Address PR reviews: improve performance, safety, and robustness of FFTO

pull/5393/head
Rene Cannao 3 months ago
parent db04a54c92
commit 1258a1f2f2

@ -28,21 +28,21 @@ public:
/**
* @brief Destructor for MySQLFFTO. Ensures cleanup and reports any pending query stats.
*/
virtual ~MySQLFFTO();
~MySQLFFTO() override;
/**
* @brief Entry point for data received from the client.
* @param buf Pointer to the raw data buffer.
* @param len Length of the data in bytes.
*/
void on_client_data(const char* buf, size_t len) override;
void on_client_data(const char* buf, std::size_t len) override;
/**
* @brief Entry point for data received from the MySQL server.
* @param buf Pointer to the raw data buffer.
* @param len Length of the data in bytes.
*/
void on_server_data(const char* buf, size_t len) override;
void on_server_data(const char* buf, std::size_t len) override;
/**
* @brief Called when the session is closing. Ensures the final query's metrics are reported.
@ -66,6 +66,8 @@ private:
State m_state; ///< Current state of the protocol parser.
std::vector<char> m_client_buffer; ///< Temporary buffer for client-side packet reassembly.
std::vector<char> m_server_buffer; ///< Temporary buffer for server-side packet reassembly.
std::size_t m_client_offset {0}; ///< Current read offset in m_client_buffer.
std::size_t m_server_offset {0}; ///< Current read offset in m_server_buffer.
std::string m_current_query; ///< The query currently being tracked.
std::string m_pending_prepare_query; ///< The SQL text of a pending prepare statement.

@ -486,11 +486,11 @@ class MySQL_Session: public Base_Session<MySQL_Session, MySQL_Data_Stream, MySQL
MySQL_STMTs_meta *sess_STMTs_meta;
StmtLongDataHandler *SLDH;
Session_Regex **match_regexes;
std::unique_ptr<TrafficObserver> m_ffto;
bool ffto_bypassed;
ProxySQL_Node_Address * proxysql_node_address;
Session_Regex **match_regexes;
std::unique_ptr<TrafficObserver> m_ffto;
bool ffto_bypassed;
ProxySQL_Node_Address * proxysql_node_address;
// this is used ONLY for Admin, and only if the other party is another proxysql instance part of a cluster
bool use_ldap_auth;

@ -415,8 +415,6 @@ class MySQL_Threads_Handler
bool monitor_wait_timeout;
bool monitor_writer_is_also_reader;
bool monitor_replication_lag_group_by_host;
bool ffto_enabled;
int ffto_max_buffer_size;
//! How frequently a replication lag check is performed. Unit: 'ms'.
int monitor_replication_lag_interval;
//! Read only check timeout. Unit: 'ms'.
@ -597,6 +595,8 @@ class MySQL_Threads_Handler
int processlist_max_query_length;
bool ignore_min_gtid_annotations;
bool ffto_enabled;
int ffto_max_buffer_size;
} variables;
struct {
unsigned int mirror_sessions_current;

@ -8,10 +8,6 @@
class PgSQL_Session;
/**
* @class PgSQLFFTO
* @brief PostgreSQL-specific implementation of TrafficObserver.
*/
/**
* @class PgSQLFFTO
* @brief Observer class for PostgreSQL traffic in Fast-Forward mode.
@ -31,21 +27,21 @@ public:
/**
* @brief Destructor for PgSQLFFTO. Ensures cleanup and metrics reporting.
*/
virtual ~PgSQLFFTO();
~PgSQLFFTO() override;
/**
* @brief Entry point for data received from the PostgreSQL client.
* @param buf Pointer to the raw data buffer.
* @param len Length of the data in bytes.
*/
void on_client_data(const char* buf, size_t len) override;
void on_client_data(const char* buf, std::size_t len) override;
/**
* @brief Entry point for data received from the PostgreSQL server.
* @param buf Pointer to the raw data buffer.
* @param len Length of the data in bytes.
*/
void on_server_data(const char* buf, size_t len) override;
void on_server_data(const char* buf, std::size_t len) override;
/**
* @brief Called when the PostgreSQL session is closing. Ensures final stats are reported.
@ -66,9 +62,13 @@ private:
State m_state; ///< Current state of the protocol observer.
std::vector<char> m_client_buffer; ///< Temporary buffer for client message reassembly.
std::vector<char> m_server_buffer; ///< Temporary buffer for server message reassembly.
std::size_t m_client_offset {0}; ///< Current read offset in m_client_buffer.
std::size_t m_server_offset {0}; ///< Current read offset in m_server_buffer.
std::string m_current_query; ///< The SQL query currently being tracked.
unsigned long long m_query_start_time; ///< Start timestamp of the current query in microseconds.
uint64_t m_affected_rows {0}; ///< Accumulated affected rows for the current query.
uint64_t m_rows_sent {0}; ///< Accumulated rows sent for the current query.
// Binary Protocol Tracking (PostgreSQL Extended Query)
std::unordered_map<std::string, std::string> m_statements; ///< Map of statement names to original SQL text.

@ -902,8 +902,6 @@ public:
bool monitor_wait_timeout;
bool monitor_writer_is_also_reader;
bool monitor_replication_lag_group_by_host;
bool ffto_enabled;
int ffto_max_buffer_size;
//! How frequently a replication lag check is performed. Unit: 'ms'.
int monitor_replication_lag_interval;
//! Read only check timeout. Unit: 'ms'.
@ -1068,6 +1066,8 @@ public:
#endif
int show_processlist_extended;
int processlist_max_query_length;
bool ffto_enabled;
int ffto_max_buffer_size;
} variables;
struct {
unsigned int mirror_sessions_current;

@ -1,7 +1,7 @@
#ifndef TRAFFIC_OBSERVER_HPP
#define TRAFFIC_OBSERVER_HPP
#include <stddef.h>
#include <cstddef>
/**
* @class TrafficObserver
@ -13,21 +13,21 @@
*/
class TrafficObserver {
public:
virtual ~TrafficObserver() {}
virtual ~TrafficObserver() = default;
/**
* @brief Called when data is received from the client.
* @param buf Pointer to the raw protocol data buffer.
* @param len Length of the data in the buffer.
*/
virtual void on_client_data(const char* buf, size_t len) = 0;
virtual void on_client_data(const char* buf, std::size_t len) = 0;
/**
* @brief Called when data is received from the server.
* @param buf Pointer to the raw protocol data buffer.
* @param len Length of the data in the buffer.
*/
virtual void on_server_data(const char* buf, size_t len) = 0;
virtual void on_server_data(const char* buf, std::size_t len) = 0;
/**
* @brief Called when the session is closing.

@ -64,29 +64,47 @@ MySQLFFTO::~MySQLFFTO() {
on_close();
}
void MySQLFFTO::on_client_data(const char* buf, size_t len) {
void MySQLFFTO::on_client_data(const char* buf, std::size_t len) {
if (!buf || len == 0) return;
m_client_buffer.insert(m_client_buffer.end(), buf, buf + len);
while (m_client_buffer.size() >= sizeof(mysql_hdr)) {
const mysql_hdr* hdr = reinterpret_cast<const mysql_hdr*>(m_client_buffer.data());
while (m_client_buffer.size() - m_client_offset >= sizeof(mysql_hdr)) {
const mysql_hdr* hdr = reinterpret_cast<const mysql_hdr*>(m_client_buffer.data() + m_client_offset);
uint32_t pkt_len = hdr->pkt_length;
if (m_client_buffer.size() < sizeof(mysql_hdr) + pkt_len) break;
const unsigned char* payload = reinterpret_cast<const unsigned char*>(m_client_buffer.data()) + sizeof(mysql_hdr);
if (m_client_buffer.size() - m_client_offset < sizeof(mysql_hdr) + pkt_len) break;
const unsigned char* payload = reinterpret_cast<const unsigned char*>(m_client_buffer.data()) + m_client_offset + sizeof(mysql_hdr);
process_client_packet(payload, pkt_len);
m_client_buffer.erase(m_client_buffer.begin(), m_client_buffer.begin() + sizeof(mysql_hdr) + pkt_len);
m_client_offset += sizeof(mysql_hdr) + pkt_len;
}
if (m_client_offset > 0) {
if (m_client_offset >= m_client_buffer.size()) {
m_client_buffer.clear();
m_client_offset = 0;
} else if (m_client_offset > 4096) { // Compact if offset is large
m_client_buffer.erase(m_client_buffer.begin(), m_client_buffer.begin() + m_client_offset);
m_client_offset = 0;
}
}
}
void MySQLFFTO::on_server_data(const char* buf, size_t len) {
void MySQLFFTO::on_server_data(const char* buf, std::size_t len) {
if (!buf || len == 0) return;
m_server_buffer.insert(m_server_buffer.end(), buf, buf + len);
while (m_server_buffer.size() >= sizeof(mysql_hdr)) {
const mysql_hdr* hdr = reinterpret_cast<const mysql_hdr*>(m_server_buffer.data());
while (m_server_buffer.size() - m_server_offset >= sizeof(mysql_hdr)) {
const mysql_hdr* hdr = reinterpret_cast<const mysql_hdr*>(m_server_buffer.data() + m_server_offset);
uint32_t pkt_len = hdr->pkt_length;
if (m_server_buffer.size() < sizeof(mysql_hdr) + pkt_len) break;
const unsigned char* payload = reinterpret_cast<const unsigned char*>(m_server_buffer.data()) + sizeof(mysql_hdr);
if (m_server_buffer.size() - m_server_offset < sizeof(mysql_hdr) + pkt_len) break;
const unsigned char* payload = reinterpret_cast<const unsigned char*>(m_server_buffer.data()) + m_server_offset + sizeof(mysql_hdr);
process_server_packet(payload, pkt_len);
m_server_buffer.erase(m_server_buffer.begin(), m_server_buffer.begin() + sizeof(mysql_hdr) + pkt_len);
m_server_offset += sizeof(mysql_hdr) + pkt_len;
}
if (m_server_offset > 0) {
if (m_server_offset >= m_server_buffer.size()) {
m_server_buffer.clear();
m_server_offset = 0;
} else if (m_server_offset > 4096) { // Compact if offset is large
m_server_buffer.erase(m_server_buffer.begin(), m_server_buffer.begin() + m_server_offset);
m_server_offset = 0;
}
}
}
@ -120,6 +138,11 @@ void MySQLFFTO::process_client_packet(const unsigned char* data, size_t len) {
m_affected_rows = 0; m_rows_sent = 0;
}
}
} else if (command == _MYSQL_COM_STMT_CLOSE) {
if (len >= 5) {
uint32_t stmt_id; memcpy(&stmt_id, data + 1, 4);
m_statements.erase(stmt_id);
}
} else if (command == _MYSQL_COM_QUIT) {
on_close();
}
@ -129,6 +152,11 @@ void MySQLFFTO::process_server_packet(const unsigned char* data, size_t len) {
if (len == 0 || m_state == IDLE) return;
uint8_t first_byte = data[0];
bool deprecate_eof = false;
if (m_session && m_session->client_myds && m_session->client_myds->myconn) {
deprecate_eof = (m_session->client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF);
}
if (m_state == AWAITING_PREPARE_OK) {
if (first_byte == 0x00 && len >= 9) {
uint32_t stmt_id; memcpy(&stmt_id, data + 1, 4);
@ -156,7 +184,7 @@ void MySQLFFTO::process_server_packet(const unsigned char* data, size_t len) {
} else if (m_state == READING_COLUMNS) {
if (first_byte == 0xFE && len < 9) { // EOF after columns
m_state = READING_ROWS;
} else if (first_byte == 0x00 && len >= 7 && (m_session->client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF)) {
} else if (first_byte == 0xFE && len >= 9 && deprecate_eof) {
m_state = READING_ROWS;
}
} else if (m_state == READING_ROWS) {
@ -164,7 +192,7 @@ void MySQLFFTO::process_server_packet(const unsigned char* data, size_t len) {
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration, 0, m_rows_sent);
m_state = IDLE;
} else if (first_byte == 0x00 && len >= 7 && (m_session->client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF)) {
} else if (first_byte == 0xFE && len >= 9 && deprecate_eof) {
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration, 0, m_rows_sent);
m_state = IDLE;

@ -729,14 +729,15 @@ void MySQL_Session::reset() {
delete mybes;
mybes=NULL;
}
mybe=NULL;
with_gtid = false;
backend_closed_in_fast_forward = false;
fast_forward_grace_start_time = 0;
//gtid_trxid = 0;
gtid_hid = -1;
mybe=NULL;
with_gtid = false;
backend_closed_in_fast_forward = false;
fast_forward_grace_start_time = 0;
ffto_bypassed = false;
m_ffto.reset();
//gtid_trxid = 0; gtid_hid = -1;
memset(gtid_buf,0,sizeof(gtid_buf));
if (session_type == PROXYSQL_SESSION_SQLITE) {
SQLite3_Session *sqlite_sess = (SQLite3_Session *)thread->gen_args;
@ -6023,17 +6024,16 @@ handler_again:
// register the mysql_data_stream
thread->mypolls.add(POLLIN|POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime);
}
if (mysql_thread___ffto_enabled && !ffto_bypassed) {
if (!m_ffto) {
m_ffto = std::make_unique<MySQLFFTO>(this);
}
if (m_ffto) {
for (unsigned int i = 0; i < mybe->server_myds->PSarrayIN->len; i++) {
m_ffto->on_server_data((const char*)mybe->server_myds->PSarrayIN->pdata[i].ptr, mybe->server_myds->PSarrayIN->pdata[i].size);
}
}
}
client_myds->PSarrayOUT->copy_add(mybe->server_myds->PSarrayIN, 0, mybe->server_myds->PSarrayIN->len);
if (mysql_thread___ffto_enabled && !ffto_bypassed && m_ffto) {
for (unsigned int i = 0; i < mybe->server_myds->PSarrayIN->len; i++) {
if (mybe->server_myds->PSarrayIN->pdata[i].size > (size_t)mysql_thread___ffto_max_buffer_size) {
ffto_bypassed = true;
m_ffto.reset();
break;
}
m_ffto->on_server_data((const char*)mybe->server_myds->PSarrayIN->pdata[i].ptr, mybe->server_myds->PSarrayIN->pdata[i].size);
}
} client_myds->PSarrayOUT->copy_add(mybe->server_myds->PSarrayIN, 0, mybe->server_myds->PSarrayIN->len);
while (mybe->server_myds->PSarrayIN->len) mybe->server_myds->PSarrayIN->remove_index(mybe->server_myds->PSarrayIN->len-1,NULL);
break;
case CONNECTING_CLIENT:

@ -2562,7 +2562,7 @@ char ** MySQL_Threads_Handler::get_variables_list() {
VariablesPointers_int["ping_interval_server_msec"] = make_tuple(&variables.ping_interval_server_msec, 1000, 7*24*3600*1000, false);
VariablesPointers_int["ping_timeout_server"] = make_tuple(&variables.ping_timeout_server, 10, 600*1000, false);
VariablesPointers_int["fast_forward_grace_close_ms"] = make_tuple(&variables.fast_forward_grace_close_ms, 0, 3600*1000, false);
VariablesPointers_int["ffto_max_buffer_size"] = make_tuple(&variables.ffto_max_buffer_size, 0, 1024*1024*1024, false);
VariablesPointers_int["ffto_max_buffer_size"] = make_tuple(&variables.ffto_max_buffer_size, 1, 1024*1024*1024, false);
VariablesPointers_int["client_host_cache_size"] = make_tuple(&variables.client_host_cache_size, 0, 1024*1024, false);
VariablesPointers_int["client_host_error_counts"] = make_tuple(&variables.client_host_error_counts, 0, 1024*1024, false);
VariablesPointers_int["handle_warnings"] = make_tuple(&variables.handle_warnings, 0, 1, false);

@ -29,9 +29,10 @@ extern class PgSQL_Query_Processor* GloPgQPro;
* @return The number of rows affected or sent.
*/
static uint64_t extract_pg_rows_affected(const unsigned char* payload, size_t len, bool& is_select) {
if (len == 0) return 0;
std::string command_tag(reinterpret_cast<const char*>(payload), len);
is_select = false;
std::regex re("(INSERT|UPDATE|DELETE|SELECT|MOVE|FETCH)\\s+\\S*\\s*(\\d+)");
static const std::regex re("(INSERT|UPDATE|DELETE|SELECT|MOVE|FETCH)\\b.*?\\b(\\d+)$");
std::smatch matches;
if (std::regex_search(command_tag, matches, re)) {
if (matches.size() == 3) {
@ -45,7 +46,7 @@ static uint64_t extract_pg_rows_affected(const unsigned char* payload, size_t le
}
PgSQLFFTO::PgSQLFFTO(PgSQL_Session* session)
: m_session(session), m_state(IDLE), m_query_start_time(0) {
: m_session(session), m_state(IDLE), m_query_start_time(0), m_affected_rows(0), m_rows_sent(0) {
m_client_buffer.reserve(1024);
m_server_buffer.reserve(4096);
}
@ -54,65 +55,119 @@ PgSQLFFTO::~PgSQLFFTO() {
on_close();
}
void PgSQLFFTO::on_client_data(const char* buf, size_t len) {
void PgSQLFFTO::on_client_data(const char* buf, std::size_t len) {
if (!buf || len == 0) return;
m_client_buffer.insert(m_client_buffer.end(), buf, buf + len);
while (m_client_buffer.size() >= 5) {
char type = m_client_buffer[0];
uint32_t msg_len; memcpy(&msg_len, &m_client_buffer[1], 4); msg_len = ntohl(msg_len);
if (m_client_buffer.size() < 1 + msg_len) break;
const unsigned char* payload = reinterpret_cast<const unsigned char*>(m_client_buffer.data()) + 5;
while (m_client_buffer.size() - m_client_offset >= 5) {
char type = m_client_buffer[m_client_offset];
uint32_t msg_len; memcpy(&msg_len, &m_client_buffer[m_client_offset + 1], 4); msg_len = ntohl(msg_len);
if (msg_len < 4 || msg_len > 1024 * 1024 * 1024) { // Sanity check
on_close();
m_client_buffer.clear(); m_client_offset = 0;
return;
}
if (m_client_buffer.size() - m_client_offset < 1 + msg_len) break;
const unsigned char* payload = reinterpret_cast<const unsigned char*>(m_client_buffer.data()) + m_client_offset + 5;
process_client_message(type, payload, msg_len - 4);
m_client_buffer.erase(m_client_buffer.begin(), m_client_buffer.begin() + 1 + msg_len);
m_client_offset += 1 + msg_len;
}
if (m_client_offset > 0) {
if (m_client_offset >= m_client_buffer.size()) {
m_client_buffer.clear();
m_client_offset = 0;
} else if (m_client_offset > 4096) {
m_client_buffer.erase(m_client_buffer.begin(), m_client_buffer.begin() + m_client_offset);
m_client_offset = 0;
}
}
}
void PgSQLFFTO::on_server_data(const char* buf, size_t len) {
void PgSQLFFTO::on_server_data(const char* buf, std::size_t len) {
if (!buf || len == 0) return;
m_server_buffer.insert(m_server_buffer.end(), buf, buf + len);
while (m_server_buffer.size() >= 5) {
char type = m_server_buffer[0];
uint32_t msg_len; memcpy(&msg_len, &m_server_buffer[1], 4); msg_len = ntohl(msg_len);
if (m_server_buffer.size() < 1 + msg_len) break;
const unsigned char* payload = reinterpret_cast<const unsigned char*>(m_server_buffer.data()) + 5;
while (m_server_buffer.size() - m_server_offset >= 5) {
char type = m_server_buffer[m_server_offset];
uint32_t msg_len; memcpy(&msg_len, &m_server_buffer[m_server_offset + 1], 4); msg_len = ntohl(msg_len);
if (msg_len < 4 || msg_len > 1024 * 1024 * 1024) { // Sanity check
on_close();
m_server_buffer.clear(); m_server_offset = 0;
return;
}
if (m_server_buffer.size() - m_server_offset < 1 + msg_len) break;
const unsigned char* payload = reinterpret_cast<const unsigned char*>(m_server_buffer.data()) + m_server_offset + 5;
process_server_message(type, payload, msg_len - 4);
m_server_buffer.erase(m_server_buffer.begin(), m_server_buffer.begin() + 1 + msg_len);
m_server_offset += 1 + msg_len;
}
if (m_server_offset > 0) {
if (m_server_offset >= m_server_buffer.size()) {
m_server_buffer.clear();
m_server_offset = 0;
} else if (m_server_offset > 4096) {
m_server_buffer.erase(m_server_buffer.begin(), m_server_buffer.begin() + m_server_offset);
m_server_offset = 0;
}
}
}
void PgSQLFFTO::on_close() {
if (m_state != IDLE && m_query_start_time != 0) {
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration);
report_query_stats(m_current_query, duration, m_affected_rows, m_rows_sent);
m_state = IDLE;
}
}
void PgSQLFFTO::process_client_message(char type, const unsigned char* payload, size_t len) {
if (type == 'Q') {
m_current_query = std::string(reinterpret_cast<const char*>(payload), len);
size_t query_len = (len > 0 && payload[len - 1] == 0) ? len - 1 : len;
m_current_query = std::string(reinterpret_cast<const char*>(payload), query_len);
m_query_start_time = monotonic_time();
m_state = AWAITING_RESPONSE;
m_affected_rows = 0; m_rows_sent = 0;
} else if (type == 'P') {
std::string stmt_name = reinterpret_cast<const char*>(payload);
const char* query = reinterpret_cast<const char*>(payload) + stmt_name.length() + 1;
m_statements[stmt_name] = query;
const char* p = reinterpret_cast<const char*>(payload);
size_t name_len = strnlen(p, len);
if (name_len >= len) return; // No null terminator
std::string stmt_name(p, name_len);
const char* query_ptr = p + name_len + 1;
size_t rem = len - (name_len + 1);
size_t query_text_len = strnlen(query_ptr, rem);
m_statements[stmt_name] = std::string(query_ptr, query_text_len);
} else if (type == 'B') {
std::string portal_name = reinterpret_cast<const char*>(payload);
const char* stmt_ptr = reinterpret_cast<const char*>(payload) + portal_name.length() + 1;
std::string stmt_name = stmt_ptr;
m_portals[portal_name] = stmt_name;
const char* p = reinterpret_cast<const char*>(payload);
size_t portal_len = strnlen(p, len);
if (portal_len >= len) return;
std::string portal_name(p, portal_len);
const char* stmt_ptr = p + portal_len + 1;
size_t rem = len - (portal_len + 1);
size_t stmt_name_len = strnlen(stmt_ptr, rem);
if (stmt_name_len >= rem) return;
m_portals[portal_name] = std::string(stmt_ptr, stmt_name_len);
} else if (type == 'E') {
std::string portal_name = reinterpret_cast<const char*>(payload);
const char* p = reinterpret_cast<const char*>(payload);
size_t portal_len = strnlen(p, len);
std::string portal_name(p, std::min(portal_len, len));
auto pit = m_portals.find(portal_name);
if (pit != m_portals.end()) {
auto sit = m_statements.find(pit->second);
if (sit != m_statements.end()) {
if (m_state == AWAITING_RESPONSE) {
on_close(); // Finalize previous if any
}
m_current_query = sit->second;
m_query_start_time = monotonic_time();
m_state = AWAITING_RESPONSE;
m_affected_rows = 0; m_rows_sent = 0;
}
}
} else if (type == 'C') { // Frontend Close
if (len < 1) return;
char close_type = static_cast<char>(payload[0]);
const char* name_ptr = reinterpret_cast<const char*>(payload) + 1;
size_t name_len = strnlen(name_ptr, len - 1);
std::string name(name_ptr, name_len);
if (close_type == 'S') m_statements.erase(name);
else if (close_type == 'P') m_portals.erase(name);
} else if (type == 'X') {
on_close();
}
@ -121,20 +176,18 @@ void PgSQLFFTO::process_client_message(char type, const unsigned char* payload,
void PgSQLFFTO::process_server_message(char type, const unsigned char* payload, size_t len) {
if (m_state == IDLE) return;
if (type == 'C') {
unsigned long long duration = monotonic_time() - m_query_start_time;
bool is_select = false;
uint64_t rows = extract_pg_rows_affected(payload, len, is_select);
if (is_select) report_query_stats(m_current_query, duration, 0, rows);
else report_query_stats(m_current_query, duration, rows, 0);
m_state = IDLE;
if (is_select) m_rows_sent += rows;
else m_affected_rows += rows;
} else if (type == 'Z' || type == 'E') {
// ReadyForQuery or ErrorResponse. We don't always want to report stats here if 'C' already did it.
// But if we didn't get 'C', report what we have.
// ReadyForQuery or ErrorResponse
if (m_state == AWAITING_RESPONSE) {
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration);
report_query_stats(m_current_query, duration, m_affected_rows, m_rows_sent);
}
m_state = IDLE;
m_affected_rows = 0; m_rows_sent = 0;
}
}

@ -306,13 +306,13 @@ void PgSQL_Session::reset() {
}
}
}
if (client_myds && client_myds->myconn) {
client_myds->myconn->reset();
}
extended_query_phase = EXTQ_PHASE_IDLE;
}
PgSQL_Session::~PgSQL_Session() {
if (client_myds && client_myds->myconn) {
client_myds->myconn->reset();
}
extended_query_phase = EXTQ_PHASE_IDLE;
ffto_bypassed = false;
m_ffto.reset();
}PgSQL_Session::~PgSQL_Session() {
if (m_ffto) {
m_ffto->on_close();
}
@ -2729,17 +2729,16 @@ handler_again:
// register the PgSQL_Data_Stream
thread->mypolls.add(POLLIN | POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime);
}
if (pgsql_thread___ffto_enabled && !ffto_bypassed) {
if (!m_ffto) {
m_ffto = std::make_unique<PgSQLFFTO>(this);
}
if (m_ffto) {
for (unsigned int i = 0; i < mybe->server_myds->PSarrayIN->len; i++) {
m_ffto->on_server_data((const char*)mybe->server_myds->PSarrayIN->pdata[i].ptr, mybe->server_myds->PSarrayIN->pdata[i].size);
}
}
}
client_myds->PSarrayOUT->copy_add(mybe->server_myds->PSarrayIN, 0, mybe->server_myds->PSarrayIN->len);
if (pgsql_thread___ffto_enabled && !ffto_bypassed && m_ffto) {
for (unsigned int i = 0; i < mybe->server_myds->PSarrayIN->len; i++) {
if (mybe->server_myds->PSarrayIN->pdata[i].size > (size_t)pgsql_thread___ffto_max_buffer_size) {
ffto_bypassed = true;
m_ffto.reset();
break;
}
m_ffto->on_server_data((const char*)mybe->server_myds->PSarrayIN->pdata[i].ptr, mybe->server_myds->PSarrayIN->pdata[i].size);
}
} client_myds->PSarrayOUT->copy_add(mybe->server_myds->PSarrayIN, 0, mybe->server_myds->PSarrayIN->len);
constexpr unsigned char ready_packet[] = { 0x5A, 0x00, 0x00, 0x00, 0x05 };
bool is_copy_ready_packet = false;

@ -2223,7 +2223,7 @@ char** PgSQL_Threads_Handler::get_variables_list() {
VariablesPointers_int["poll_timeout_on_failure"] = make_tuple(&variables.poll_timeout_on_failure, 10, 20000, false);
VariablesPointers_int["shun_on_failures"] = make_tuple(&variables.shun_on_failures, 0, 10000000, false);
VariablesPointers_int["shun_recovery_time_sec"] = make_tuple(&variables.shun_recovery_time_sec, 0, 3600 * 24 * 365, false);
VariablesPointers_int["ffto_max_buffer_size"] = make_tuple(&variables.ffto_max_buffer_size, 0, 1024 * 1024 * 1024, false);
VariablesPointers_int["ffto_max_buffer_size"] = make_tuple(&variables.ffto_max_buffer_size, 1, 1024 * 1024 * 1024, false);
VariablesPointers_int["unshun_algorithm"] = make_tuple(&variables.unshun_algorithm, 0, 1, false);
VariablesPointers_int["hostgroup_manager_verbose"] = make_tuple(&variables.hostgroup_manager_verbose, 0, 3, false);
VariablesPointers_int["tcp_keepalive_time"] = make_tuple(&variables.tcp_keepalive_time, 0, 7200, false);

@ -1505,7 +1505,6 @@ void ProxySQL_Main_init_phase2___not_started(const bootstrap_info_t& boostrap_in
}
#endif /* PROXYSQLGENAI */
ProxySQL_Main_init_Admin_module(boostrap_info);
proxy_info("TEST_LOG: ProxySQL is starting\n");
GloMTH->print_version();
{

@ -54,14 +54,14 @@ int main(int argc, char** argv) {
for(int i=0; i<200; i++) large_query += "x";
large_query += "'";
mysql_query(conn, large_query.c_str());
MYSQL_QUERY(conn, large_query.c_str());
// Verify that NO digest was recorded for this query because it was bypassed
int rc = run_q(admin, "SELECT count(*) FROM stats_mysql_query_digest WHERE digest_text LIKE '%xxxx%'");
int rc = run_q(admin, "SELECT count(*) FROM stats_mysql_query_digest");
MYSQL_RES* res = mysql_store_result(admin);
MYSQL_ROW row = mysql_fetch_row(res);
int count = atoi(row[0]);
ok(count == 0, "Query larger than threshold was correctly bypassed (count: %d)", count);
ok(count == 0, "No digests recorded for queries exceeding threshold (count: %d)", count);
mysql_free_result(res);
mysql_close(conn);

@ -29,6 +29,8 @@ void verify_digest(MYSQL* admin, const char* template_text, int expected_count,
int rc = run_q(admin, query);
if (rc != 0) {
ok(0, "Failed to query stats_mysql_query_digest for %s", template_text);
ok(0, "Skipping rows_affected check due to query failure");
ok(0, "Skipping rows_sent check due to query failure");
return;
}
MYSQL_RES* res = mysql_store_result(admin);
@ -43,6 +45,8 @@ void verify_digest(MYSQL* admin, const char* template_text, int expected_count,
ok(rows_sent == expected_rows_sent, "Sent rows for %s: %llu (expected: %llu)", row[3], (unsigned long long)rows_sent, (unsigned long long)expected_rows_sent);
} else {
ok(0, "Digest NOT found for pattern: %s", template_text);
ok(0, "Skipping rows_affected check (digest not found)");
ok(0, "Skipping rows_sent check (digest not found)");
// Dump the table to see what's actually in there
diag("Dumping stats_mysql_query_digest for debugging:");
run_q(admin, "SELECT digest_text, count_star FROM stats_mysql_query_digest");

@ -30,6 +30,8 @@ void verify_pg_digest(MYSQL* admin, const char* template_text, int expected_coun
int rc = run_q(admin, query);
if (rc != 0) {
ok(0, "Failed to query stats_pgsql_query_digest for %s", template_text);
ok(0, "Skipping PG rows_affected check due to query failure");
ok(0, "Skipping PG rows_sent check due to query failure");
return;
}
MYSQL_RES* res = mysql_store_result(admin);
@ -44,6 +46,8 @@ void verify_pg_digest(MYSQL* admin, const char* template_text, int expected_coun
ok(rows_sent == expected_rows_sent, "Sent rows for %s: %llu (expected: %llu)", row[3], (unsigned long long)rows_sent, (unsigned long long)expected_rows_sent);
} else {
ok(0, "PG Digest NOT found for pattern: %s", template_text);
ok(0, "Skipping PG rows_affected check (digest not found)");
ok(0, "Skipping PG rows_sent check (digest not found)");
diag("Dumping stats_pgsql_query_digest for debugging:");
run_q(admin, "SELECT digest_text, count_star FROM stats_pgsql_query_digest");
MYSQL_RES* dump_res = mysql_store_result(admin);

Loading…
Cancel
Save