You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
proxysql/lib/PgSQLFFTO.cpp

179 lines
6.9 KiB

#include "proxysql.h"
#include "PgSQL_HostGroups_Manager.h"
#include "PgSQL_Thread.h"
#include "PgSQL_Session.h"
#include "PgSQL_Data_Stream.h"
#include "PgSQL_Query_Processor.h"
#include "PgSQLFFTO.hpp"
#ifndef SPOOKYV2
#include "SpookyV2.h"
#define SPOOKYV2
#endif
#include "c_tokenizer.h"
#include <arpa/inet.h>
#include <cstring>
#include <regex>
extern class PgSQL_Query_Processor* GloPgQPro;
/**
 * @brief Extracts the row count from a PostgreSQL CommandComplete tag.
 *
 * Recognised tags start with INSERT, UPDATE, DELETE, SELECT, MOVE or FETCH and are
 * followed by either one numeric field ("UPDATE 42") or two fields of which the
 * second is the count ("INSERT 0 15"). Parsing is done by hand instead of with
 * std::regex: the previous pattern let a greedy `\S*` swallow the leading digits
 * of single-field counts ("SELECT 123" was reported as 3), std::stoull could
 * throw on oversized input, and a regex was compiled on every call.
 *
 * @param payload   Tag bytes. A NUL terminator (and anything after it) is ignored.
 * @param len       Number of readable bytes at @p payload.
 * @param is_select Set to true when the tag is SELECT, FETCH or MOVE and a count was
 *                  parsed; false in every other case.
 * @return The parsed count (saturated at the uint64_t maximum), or 0 when the tag
 *         is not recognised or its count field is not purely numeric.
 */
static uint64_t extract_pg_rows_affected(const unsigned char* payload, size_t len, bool& is_select) {
	is_select = false;
	if (payload == nullptr || len == 0) return 0;

	const char* cur = reinterpret_cast<const char*>(payload);
	const char* end = cur + len;
	// The tag travels as a C string; never look at or beyond its terminator.
	if (const void* nul = memchr(cur, '\0', len)) {
		end = static_cast<const char*>(nul);
	}

	const auto is_space = [](char c) {
		return c == ' ' || c == '\t' || c == '\n' || c == '\r' || c == '\f' || c == '\v';
	};
	// Advances `cur` past the next whitespace-delimited token and reports its bounds.
	const auto next_token = [&](const char*& first, const char*& last) -> bool {
		while (cur != end && is_space(*cur)) ++cur;
		first = cur;
		while (cur != end && !is_space(*cur)) ++cur;
		last = cur;
		return first != last;
	};

	const char* tok_begin = nullptr;
	const char* tok_end = nullptr;
	if (!next_token(tok_begin, tok_end)) return 0;

	const std::string command(tok_begin, tok_end);
	const bool returns_rows = (command == "SELECT" || command == "FETCH" || command == "MOVE");
	const bool modifies_rows = (command == "INSERT" || command == "UPDATE" || command == "DELETE");
	if (!returns_rows && !modifies_rows) return 0;

	// First field after the command; if a second field follows (INSERT's oid + rows),
	// the second one is the count.
	if (!next_token(tok_begin, tok_end)) return 0;
	const char* num_begin = tok_begin;
	const char* num_end = tok_end;
	if (next_token(tok_begin, tok_end)) {
		num_begin = tok_begin;
		num_end = tok_end;
	}

	const uint64_t max_rows = ~static_cast<uint64_t>(0);
	uint64_t rows = 0;
	for (const char* p = num_begin; p != num_end; ++p) {
		if (*p < '0' || *p > '9') return 0;
		const uint64_t digit = static_cast<uint64_t>(*p - '0');
		if (rows > (max_rows - digit) / 10) {
			rows = max_rows; // saturate instead of overflowing or throwing
		} else {
			rows = rows * 10 + digit;
		}
	}

	is_select = returns_rows;
	return rows;
}
/**
 * @brief Creates an observer for @p session that starts idle with no query timer running.
 *
 * @param session Session pointer stored for later use when reporting statistics.
 */
PgSQLFFTO::PgSQLFFTO(PgSQL_Session* session)
	: m_session(session), m_state(IDLE), m_query_start_time(0) {
	// Give both reassembly buffers some initial capacity up front.
	constexpr size_t kServerBufferReserve = 4096;
	constexpr size_t kClientBufferReserve = 1024;
	m_server_buffer.reserve(kServerBufferReserve);
	m_client_buffer.reserve(kClientBufferReserve);
}
/** Runs the same wrap-up as on_close() so a query still being timed gets reported. */
PgSQLFFTO::~PgSQLFFTO() {
	this->on_close();
}
/**
 * @brief Appends client bytes to the reassembly buffer and dispatches every complete message.
 *
 * Each message is framed as one type byte followed by a 4-byte big-endian length
 * that counts itself but not the type byte. Complete frames are handed to
 * process_client_message(); a trailing partial frame stays buffered for the next call.
 *
 * A length field smaller than 4 cannot be valid. Previously it made `msg_len - 4`
 * wrap around to an enormous payload size; now the buffered bytes are dropped.
 *
 * NOTE(review): untyped messages (startup / SSL request) would be misread by this
 * framing — presumably the observer is attached only after the handshake; confirm.
 *
 * @param buf Bytes received from the client; ignored when null.
 * @param len Number of bytes at @p buf; ignored when zero.
 */
void PgSQLFFTO::on_client_data(const char* buf, size_t len) {
	if (!buf || len == 0) return;
	m_client_buffer.insert(m_client_buffer.end(), buf, buf + len);

	const size_t header_size = 5;       // type byte + length field
	const size_t length_field_size = 4; // the length field counts itself
	size_t consumed = 0;
	while (m_client_buffer.size() - consumed >= header_size) {
		const size_t available = m_client_buffer.size() - consumed;
		const char type = m_client_buffer[consumed];
		uint32_t msg_len;
		memcpy(&msg_len, &m_client_buffer[consumed + 1], sizeof(msg_len));
		msg_len = ntohl(msg_len);
		if (msg_len < length_field_size) {
			// Stream is out of sync; there is no safe way to find the next frame.
			m_client_buffer.clear();
			return;
		}
		// Compared this way round so that a length of 0xFFFFFFFF cannot wrap.
		if (static_cast<size_t>(msg_len) > available - 1) break;
		const unsigned char* payload =
			reinterpret_cast<const unsigned char*>(m_client_buffer.data()) + consumed + header_size;
		process_client_message(type, payload, msg_len - length_field_size);
		consumed += 1 + static_cast<size_t>(msg_len);
	}
	// One erase per call instead of one per message.
	if (consumed != 0) {
		m_client_buffer.erase(m_client_buffer.begin(), m_client_buffer.begin() + consumed);
	}
}
/**
 * @brief Appends server bytes to the reassembly buffer and dispatches every complete message.
 *
 * Each message is framed as one type byte followed by a 4-byte big-endian length
 * that counts itself but not the type byte. Complete frames are handed to
 * process_server_message(); a trailing partial frame stays buffered for the next call.
 *
 * A length field smaller than 4 cannot be valid. Previously it made `msg_len - 4`
 * wrap around to an enormous payload size; now the buffered bytes are dropped.
 *
 * @param buf Bytes received from the server; ignored when null.
 * @param len Number of bytes at @p buf; ignored when zero.
 */
void PgSQLFFTO::on_server_data(const char* buf, size_t len) {
	if (!buf || len == 0) return;
	m_server_buffer.insert(m_server_buffer.end(), buf, buf + len);

	const size_t header_size = 5;       // type byte + length field
	const size_t length_field_size = 4; // the length field counts itself
	size_t consumed = 0;
	while (m_server_buffer.size() - consumed >= header_size) {
		const size_t available = m_server_buffer.size() - consumed;
		const char type = m_server_buffer[consumed];
		uint32_t msg_len;
		memcpy(&msg_len, &m_server_buffer[consumed + 1], sizeof(msg_len));
		msg_len = ntohl(msg_len);
		if (msg_len < length_field_size) {
			// Stream is out of sync; there is no safe way to find the next frame.
			m_server_buffer.clear();
			return;
		}
		// Compared this way round so that a length of 0xFFFFFFFF cannot wrap.
		if (static_cast<size_t>(msg_len) > available - 1) break;
		const unsigned char* payload =
			reinterpret_cast<const unsigned char*>(m_server_buffer.data()) + consumed + header_size;
		process_server_message(type, payload, msg_len - length_field_size);
		consumed += 1 + static_cast<size_t>(msg_len);
	}
	// One erase per call instead of one per message.
	if (consumed != 0) {
		m_server_buffer.erase(m_server_buffer.begin(), m_server_buffer.begin() + consumed);
	}
}
/**
 * @brief Reports the query currently being timed, if any, and returns to the idle state.
 *
 * Does nothing when the observer is already idle or no start time was recorded.
 * The report carries only the elapsed time; row counters keep their defaults.
 */
void PgSQLFFTO::on_close() {
	const bool query_in_flight = (m_state != IDLE) && (m_query_start_time != 0);
	if (!query_in_flight) return;

	const unsigned long long elapsed = monotonic_time() - m_query_start_time;
	report_query_stats(m_current_query, elapsed);
	m_state = IDLE;
}
/**
 * @brief Tracks one client message so the query being executed can be timed and named.
 *
 * Handled message types:
 *  - 'Q' Query: records the SQL text and starts timing.
 *  - 'P' Parse: remembers statement name -> SQL text.
 *  - 'B' Bind: remembers portal name -> statement name.
 *  - 'E' Execute: resolves portal -> statement -> SQL text and starts timing.
 *  - 'C' Close: forgets the named statement ('S') or portal ('P') so the lookup
 *    tables do not grow without bound.
 *  - 'X' Terminate: flushes any query in flight via on_close().
 * Every other type is ignored.
 *
 * All strings are read with an explicit bound: a message whose string lacks a NUL
 * terminator inside @p len is skipped instead of being read past its end. The
 * terminator itself is no longer stored as part of the 'Q' query text.
 *
 * @param type    Message type byte.
 * @param payload Message body (everything after the length field).
 * @param len     Number of bytes at @p payload.
 */
void PgSQLFFTO::process_client_message(char type, const unsigned char* payload, size_t len) {
	const char* cur = reinterpret_cast<const char*>(payload);
	const char* const end = (cur != nullptr) ? cur + len : cur;

	// Reads one NUL-terminated string starting at `cur` without looking past `end`.
	auto read_cstring = [&cur, end](std::string& out) -> bool {
		if (cur == nullptr || cur >= end) return false;
		const void* nul = memchr(cur, '\0', static_cast<size_t>(end - cur));
		if (nul == nullptr) return false;
		const char* terminator = static_cast<const char*>(nul);
		out.assign(cur, terminator);
		cur = terminator + 1;
		return true;
	};

	switch (type) {
	case 'Q': {
		std::string sql;
		if (!read_cstring(sql) && cur != nullptr && cur < end) {
			sql.assign(cur, end); // unterminated text: keep the bytes that are there
		}
		m_current_query.swap(sql);
		m_query_start_time = monotonic_time();
		m_state = AWAITING_RESPONSE;
		break;
	}
	case 'P': {
		std::string stmt_name;
		std::string sql;
		if (read_cstring(stmt_name) && read_cstring(sql)) {
			m_statements[stmt_name] = sql;
		}
		break;
	}
	case 'B': {
		std::string portal_name;
		std::string stmt_name;
		if (read_cstring(portal_name) && read_cstring(stmt_name)) {
			m_portals[portal_name] = stmt_name;
		}
		break;
	}
	case 'E': {
		std::string portal_name;
		if (!read_cstring(portal_name)) break;
		auto pit = m_portals.find(portal_name);
		if (pit == m_portals.end()) break;
		auto sit = m_statements.find(pit->second);
		if (sit == m_statements.end()) break;
		m_current_query = sit->second;
		m_query_start_time = monotonic_time();
		m_state = AWAITING_RESPONSE;
		break;
	}
	case 'C': {
		if (cur == nullptr || cur >= end) break;
		const char target = *cur++; // 'S' = prepared statement, 'P' = portal
		std::string name;
		if (!read_cstring(name)) break;
		if (target == 'S') {
			m_statements.erase(name);
		} else if (target == 'P') {
			m_portals.erase(name);
		}
		break;
	}
	case 'X':
		on_close();
		break;
	default:
		break;
	}
}
/**
 * @brief Closes out the query being timed when the server signals its outcome.
 *
 * Ignored entirely while idle. A 'C' message is parsed for its row count, which is
 * reported as rows sent for SELECT-like tags and as rows affected otherwise. A 'Z'
 * or 'E' message reports the elapsed time only. In each of these cases the
 * observer goes back to idle; all other message types leave the state untouched.
 *
 * @param type    Message type byte.
 * @param payload Message body (everything after the length field).
 * @param len     Number of bytes at @p payload.
 */
void PgSQLFFTO::process_server_message(char type, const unsigned char* payload, size_t len) {
	if (m_state == IDLE) return;

	switch (type) {
	case 'C': {
		const unsigned long long elapsed = monotonic_time() - m_query_start_time;
		bool returned_rows = false;
		const uint64_t count = extract_pg_rows_affected(payload, len, returned_rows);
		const uint64_t affected = returned_rows ? 0 : count;
		const uint64_t sent = returned_rows ? count : 0;
		report_query_stats(m_current_query, elapsed, affected, sent);
		m_state = IDLE;
		break;
	}
	case 'Z':
	case 'E': {
		const unsigned long long elapsed = monotonic_time() - m_query_start_time;
		report_query_stats(m_current_query, elapsed);
		m_state = IDLE;
		break;
	}
	default:
		break;
	}
}
/**
 * @brief Computes the digest of @p query and records one execution in the query processor.
 *
 * The digest text is hashed on its own, then combined with username, schema name,
 * current hostgroup and (when hostname tracking is enabled) the client address to
 * form the per-context digest key passed to GloPgQPro->update_query_digest().
 *
 * Returns without reporting when the query is empty, the global query processor is
 * not set, or any object on the session -> client data stream -> connection ->
 * userinfo chain (or the session's thread) is missing. Previously only part of that
 * chain was checked before it was dereferenced.
 *
 * @param query         SQL text to digest.
 * @param duration_us   Execution time forwarded to the query processor.
 * @param affected_rows Rows-affected counter forwarded to the query processor.
 * @param rows_sent     Rows-sent counter forwarded to the query processor.
 */
void PgSQLFFTO::report_query_stats(const std::string& query, unsigned long long duration_us, uint64_t affected_rows, uint64_t rows_sent) {
	if (query.empty() || !GloPgQPro) return;
	if (!m_session || !m_session->thread) return;
	if (!m_session->client_myds || !m_session->client_myds->myconn) return;
	auto* ui = m_session->client_myds->myconn->userinfo;
	if (!ui) return;

	options opts;
	opts.lowercase = pgsql_thread___query_digests_lowercase;
	opts.replace_null = pgsql_thread___query_digests_replace_null;
	opts.replace_number = !pgsql_thread___query_digests_no_digits;
	opts.keep_comment = pgsql_thread___query_digests_keep_comment;
	opts.grouping_limit = pgsql_thread___query_digests_grouping_limit;
	opts.groups_grouping_limit = pgsql_thread___query_digests_groups_grouping_limit;
	opts.max_query_length = pgsql_thread___query_digests_max_query_length;

	SQP_par_t qp;
	memset(&qp, 0, sizeof(qp));
	char* fst_cmnt = nullptr;
	// Only lend the fixed-size scratch buffer when the query fits in it.
	// NOTE(review): assumes the digest helper writes into the supplied buffer when one
	// is given and heap-allocates when passed null, and that qp.buf is a char array
	// (so sizeof is its capacity) — confirm against c_tokenizer.h / SQP_par_t.
	char* scratch = (query.length() < sizeof(qp.buf)) ? qp.buf : nullptr;
	char* digest_text = pgsql_query_digest_and_first_comment(query.c_str(), query.length(), &fst_cmnt, scratch, &opts);
	if (digest_text) {
		qp.digest_text = digest_text;
		qp.digest = SpookyHash::Hash64(digest_text, strlen(digest_text), 0);

		char* ca = const_cast<char*>("");
		if (pgsql_thread___query_digests_track_hostname && m_session->client_myds->addr.addr) {
			ca = m_session->client_myds->addr.addr;
		}
		// strlen() on a null pointer is undefined; hash an empty string instead.
		const char* username = ui->username ? ui->username : "";
		const char* schemaname = ui->schemaname ? ui->schemaname : "";

		// The order of the Update() calls defines the key and must not change.
		uint64_t hash2;
		SpookyHash myhash;
		myhash.Init(19, 3);
		myhash.Update(username, strlen(username));
		myhash.Update(&qp.digest, sizeof(qp.digest));
		myhash.Update(schemaname, strlen(schemaname));
		myhash.Update(&m_session->current_hostgroup, sizeof(m_session->current_hostgroup));
		myhash.Update(ca, strlen(ca));
		myhash.Final(&qp.digest_total, &hash2);

		GloPgQPro->update_query_digest(qp.digest_total, qp.digest, qp.digest_text, m_session->current_hostgroup, ui, duration_us, m_session->thread->curtime, ca, affected_rows, rows_sent);

		// qp lives on the stack: never free the digest when it aliases qp.buf.
		if (digest_text != qp.buf) free(digest_text);
	}
	if (fst_cmnt) free(fst_cmnt);
}