From 1258a1f2f25195cdfd8530774dd8555e4ab20b2a Mon Sep 17 00:00:00 2001 From: Rene Cannao Date: Thu, 19 Feb 2026 23:16:02 +0000 Subject: [PATCH] Address PR reviews: improve performance, safety, and robustness of FFTO --- include/MySQLFFTO.hpp | 8 +- include/MySQL_Session.h | 10 +-- include/MySQL_Thread.h | 4 +- include/PgSQLFFTO.hpp | 14 +-- include/PgSQL_Thread.h | 4 +- include/TrafficObserver.hpp | 8 +- lib/MySQLFFTO.cpp | 56 +++++++++--- lib/MySQL_Session.cpp | 38 ++++---- lib/MySQL_Thread.cpp | 2 +- lib/PgSQLFFTO.cpp | 119 +++++++++++++++++++------- lib/PgSQL_Session.cpp | 35 ++++---- lib/PgSQL_Thread.cpp | 2 +- src/main.cpp | 1 - test/tap/tests/test_ffto_bypass-t.cpp | 6 +- test/tap/tests/test_ffto_mysql-t.cpp | 4 + test/tap/tests/test_ffto_pgsql-t.cpp | 4 + 16 files changed, 202 insertions(+), 113 deletions(-) diff --git a/include/MySQLFFTO.hpp b/include/MySQLFFTO.hpp index 96afcb3be..92ae34752 100644 --- a/include/MySQLFFTO.hpp +++ b/include/MySQLFFTO.hpp @@ -28,21 +28,21 @@ public: /** * @brief Destructor for MySQLFFTO. Ensures cleanup and reports any pending query stats. */ - virtual ~MySQLFFTO(); + ~MySQLFFTO() override; /** * @brief Entry point for data received from the client. * @param buf Pointer to the raw data buffer. * @param len Length of the data in bytes. */ - void on_client_data(const char* buf, size_t len) override; + void on_client_data(const char* buf, std::size_t len) override; /** * @brief Entry point for data received from the MySQL server. * @param buf Pointer to the raw data buffer. * @param len Length of the data in bytes. */ - void on_server_data(const char* buf, size_t len) override; + void on_server_data(const char* buf, std::size_t len) override; /** * @brief Called when the session is closing. Ensures the final query's metrics are reported. @@ -66,6 +66,8 @@ private: State m_state; ///< Current state of the protocol parser. std::vector m_client_buffer; ///< Temporary buffer for client-side packet reassembly. std::vector m_server_buffer; ///< Temporary buffer for server-side packet reassembly. + std::size_t m_client_offset {0}; ///< Current read offset in m_client_buffer. + std::size_t m_server_offset {0}; ///< Current read offset in m_server_buffer. std::string m_current_query; ///< The query currently being tracked. std::string m_pending_prepare_query; ///< The SQL text of a pending prepare statement. diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index bbc3630cc..f3243c751 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -486,11 +486,11 @@ class MySQL_Session: public Base_Session m_ffto; - bool ffto_bypassed; - - ProxySQL_Node_Address * proxysql_node_address; + Session_Regex **match_regexes; + std::unique_ptr m_ffto; + bool ffto_bypassed; + + ProxySQL_Node_Address * proxysql_node_address; // this is used ONLY for Admin, and only if the other party is another proxysql instance part of a cluster bool use_ldap_auth; diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 84ee1cf78..a4f9b4e2d 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -415,8 +415,6 @@ class MySQL_Threads_Handler bool monitor_wait_timeout; bool monitor_writer_is_also_reader; bool monitor_replication_lag_group_by_host; - bool ffto_enabled; - int ffto_max_buffer_size; //! How frequently a replication lag check is performed. Unit: 'ms'. int monitor_replication_lag_interval; //! Read only check timeout. Unit: 'ms'. @@ -597,6 +595,8 @@ class MySQL_Threads_Handler int processlist_max_query_length; bool ignore_min_gtid_annotations; + bool ffto_enabled; + int ffto_max_buffer_size; } variables; struct { unsigned int mirror_sessions_current; diff --git a/include/PgSQLFFTO.hpp b/include/PgSQLFFTO.hpp index 3a5b26c09..b61fa970e 100644 --- a/include/PgSQLFFTO.hpp +++ b/include/PgSQLFFTO.hpp @@ -8,10 +8,6 @@ class PgSQL_Session; -/** - * @class PgSQLFFTO - * @brief PostgreSQL-specific implementation of TrafficObserver. - */ /** * @class PgSQLFFTO * @brief Observer class for PostgreSQL traffic in Fast-Forward mode. @@ -31,21 +27,21 @@ public: /** * @brief Destructor for PgSQLFFTO. Ensures cleanup and metrics reporting. */ - virtual ~PgSQLFFTO(); + ~PgSQLFFTO() override; /** * @brief Entry point for data received from the PostgreSQL client. * @param buf Pointer to the raw data buffer. * @param len Length of the data in bytes. */ - void on_client_data(const char* buf, size_t len) override; + void on_client_data(const char* buf, std::size_t len) override; /** * @brief Entry point for data received from the PostgreSQL server. * @param buf Pointer to the raw data buffer. * @param len Length of the data in bytes. */ - void on_server_data(const char* buf, size_t len) override; + void on_server_data(const char* buf, std::size_t len) override; /** * @brief Called when the PostgreSQL session is closing. Ensures final stats are reported. @@ -66,9 +62,13 @@ private: State m_state; ///< Current state of the protocol observer. std::vector m_client_buffer; ///< Temporary buffer for client message reassembly. std::vector m_server_buffer; ///< Temporary buffer for server message reassembly. + std::size_t m_client_offset {0}; ///< Current read offset in m_client_buffer. + std::size_t m_server_offset {0}; ///< Current read offset in m_server_buffer. std::string m_current_query; ///< The SQL query currently being tracked. unsigned long long m_query_start_time; ///< Start timestamp of the current query in microseconds. + uint64_t m_affected_rows {0}; ///< Accumulated affected rows for the current query. + uint64_t m_rows_sent {0}; ///< Accumulated rows sent for the current query. // Binary Protocol Tracking (PostgreSQL Extended Query) std::unordered_map m_statements; ///< Map of statement names to original SQL text. diff --git a/include/PgSQL_Thread.h b/include/PgSQL_Thread.h index 68e2d9ec1..61a1a9f9f 100644 --- a/include/PgSQL_Thread.h +++ b/include/PgSQL_Thread.h @@ -902,8 +902,6 @@ public: bool monitor_wait_timeout; bool monitor_writer_is_also_reader; bool monitor_replication_lag_group_by_host; - bool ffto_enabled; - int ffto_max_buffer_size; //! How frequently a replication lag check is performed. Unit: 'ms'. int monitor_replication_lag_interval; //! Read only check timeout. Unit: 'ms'. @@ -1068,6 +1066,8 @@ public: #endif int show_processlist_extended; int processlist_max_query_length; + bool ffto_enabled; + int ffto_max_buffer_size; } variables; struct { unsigned int mirror_sessions_current; diff --git a/include/TrafficObserver.hpp b/include/TrafficObserver.hpp index 43587b626..1057810b2 100644 --- a/include/TrafficObserver.hpp +++ b/include/TrafficObserver.hpp @@ -1,7 +1,7 @@ #ifndef TRAFFIC_OBSERVER_HPP #define TRAFFIC_OBSERVER_HPP -#include +#include /** * @class TrafficObserver @@ -13,21 +13,21 @@ */ class TrafficObserver { public: - virtual ~TrafficObserver() {} + virtual ~TrafficObserver() = default; /** * @brief Called when data is received from the client. * @param buf Pointer to the raw protocol data buffer. * @param len Length of the data in the buffer. */ - virtual void on_client_data(const char* buf, size_t len) = 0; + virtual void on_client_data(const char* buf, std::size_t len) = 0; /** * @brief Called when data is received from the server. * @param buf Pointer to the raw protocol data buffer. * @param len Length of the data in the buffer. */ - virtual void on_server_data(const char* buf, size_t len) = 0; + virtual void on_server_data(const char* buf, std::size_t len) = 0; /** * @brief Called when the session is closing. diff --git a/lib/MySQLFFTO.cpp b/lib/MySQLFFTO.cpp index 61d4f47bd..5a4a20964 100644 --- a/lib/MySQLFFTO.cpp +++ b/lib/MySQLFFTO.cpp @@ -64,29 +64,47 @@ MySQLFFTO::~MySQLFFTO() { on_close(); } -void MySQLFFTO::on_client_data(const char* buf, size_t len) { +void MySQLFFTO::on_client_data(const char* buf, std::size_t len) { if (!buf || len == 0) return; m_client_buffer.insert(m_client_buffer.end(), buf, buf + len); - while (m_client_buffer.size() >= sizeof(mysql_hdr)) { - const mysql_hdr* hdr = reinterpret_cast(m_client_buffer.data()); + while (m_client_buffer.size() - m_client_offset >= sizeof(mysql_hdr)) { + const mysql_hdr* hdr = reinterpret_cast(m_client_buffer.data() + m_client_offset); uint32_t pkt_len = hdr->pkt_length; - if (m_client_buffer.size() < sizeof(mysql_hdr) + pkt_len) break; - const unsigned char* payload = reinterpret_cast(m_client_buffer.data()) + sizeof(mysql_hdr); + if (m_client_buffer.size() - m_client_offset < sizeof(mysql_hdr) + pkt_len) break; + const unsigned char* payload = reinterpret_cast(m_client_buffer.data()) + m_client_offset + sizeof(mysql_hdr); process_client_packet(payload, pkt_len); - m_client_buffer.erase(m_client_buffer.begin(), m_client_buffer.begin() + sizeof(mysql_hdr) + pkt_len); + m_client_offset += sizeof(mysql_hdr) + pkt_len; + } + if (m_client_offset > 0) { + if (m_client_offset >= m_client_buffer.size()) { + m_client_buffer.clear(); + m_client_offset = 0; + } else if (m_client_offset > 4096) { // Compact if offset is large + m_client_buffer.erase(m_client_buffer.begin(), m_client_buffer.begin() + m_client_offset); + m_client_offset = 0; + } } } -void MySQLFFTO::on_server_data(const char* buf, size_t len) { +void MySQLFFTO::on_server_data(const char* buf, std::size_t len) { if (!buf || len == 0) return; m_server_buffer.insert(m_server_buffer.end(), buf, buf + len); - while (m_server_buffer.size() >= sizeof(mysql_hdr)) { - const mysql_hdr* hdr = reinterpret_cast(m_server_buffer.data()); + while (m_server_buffer.size() - m_server_offset >= sizeof(mysql_hdr)) { + const mysql_hdr* hdr = reinterpret_cast(m_server_buffer.data() + m_server_offset); uint32_t pkt_len = hdr->pkt_length; - if (m_server_buffer.size() < sizeof(mysql_hdr) + pkt_len) break; - const unsigned char* payload = reinterpret_cast(m_server_buffer.data()) + sizeof(mysql_hdr); + if (m_server_buffer.size() - m_server_offset < sizeof(mysql_hdr) + pkt_len) break; + const unsigned char* payload = reinterpret_cast(m_server_buffer.data()) + m_server_offset + sizeof(mysql_hdr); process_server_packet(payload, pkt_len); - m_server_buffer.erase(m_server_buffer.begin(), m_server_buffer.begin() + sizeof(mysql_hdr) + pkt_len); + m_server_offset += sizeof(mysql_hdr) + pkt_len; + } + if (m_server_offset > 0) { + if (m_server_offset >= m_server_buffer.size()) { + m_server_buffer.clear(); + m_server_offset = 0; + } else if (m_server_offset > 4096) { // Compact if offset is large + m_server_buffer.erase(m_server_buffer.begin(), m_server_buffer.begin() + m_server_offset); + m_server_offset = 0; + } } } @@ -120,6 +138,11 @@ void MySQLFFTO::process_client_packet(const unsigned char* data, size_t len) { m_affected_rows = 0; m_rows_sent = 0; } } + } else if (command == _MYSQL_COM_STMT_CLOSE) { + if (len >= 5) { + uint32_t stmt_id; memcpy(&stmt_id, data + 1, 4); + m_statements.erase(stmt_id); + } } else if (command == _MYSQL_COM_QUIT) { on_close(); } @@ -129,6 +152,11 @@ void MySQLFFTO::process_server_packet(const unsigned char* data, size_t len) { if (len == 0 || m_state == IDLE) return; uint8_t first_byte = data[0]; + bool deprecate_eof = false; + if (m_session && m_session->client_myds && m_session->client_myds->myconn) { + deprecate_eof = (m_session->client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF); + } + if (m_state == AWAITING_PREPARE_OK) { if (first_byte == 0x00 && len >= 9) { uint32_t stmt_id; memcpy(&stmt_id, data + 1, 4); @@ -156,7 +184,7 @@ void MySQLFFTO::process_server_packet(const unsigned char* data, size_t len) { } else if (m_state == READING_COLUMNS) { if (first_byte == 0xFE && len < 9) { // EOF after columns m_state = READING_ROWS; - } else if (first_byte == 0x00 && len >= 7 && (m_session->client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF)) { + } else if (first_byte == 0xFE && len >= 9 && deprecate_eof) { m_state = READING_ROWS; } } else if (m_state == READING_ROWS) { @@ -164,7 +192,7 @@ void MySQLFFTO::process_server_packet(const unsigned char* data, size_t len) { unsigned long long duration = monotonic_time() - m_query_start_time; report_query_stats(m_current_query, duration, 0, m_rows_sent); m_state = IDLE; - } else if (first_byte == 0x00 && len >= 7 && (m_session->client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF)) { + } else if (first_byte == 0xFE && len >= 9 && deprecate_eof) { unsigned long long duration = monotonic_time() - m_query_start_time; report_query_stats(m_current_query, duration, 0, m_rows_sent); m_state = IDLE; diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index d2d0c0eda..ac095769f 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -729,14 +729,15 @@ void MySQL_Session::reset() { delete mybes; mybes=NULL; } - mybe=NULL; - - with_gtid = false; - backend_closed_in_fast_forward = false; - fast_forward_grace_start_time = 0; - - //gtid_trxid = 0; - gtid_hid = -1; + mybe=NULL; + + with_gtid = false; + backend_closed_in_fast_forward = false; + fast_forward_grace_start_time = 0; + ffto_bypassed = false; + m_ffto.reset(); + + //gtid_trxid = 0; gtid_hid = -1; memset(gtid_buf,0,sizeof(gtid_buf)); if (session_type == PROXYSQL_SESSION_SQLITE) { SQLite3_Session *sqlite_sess = (SQLite3_Session *)thread->gen_args; @@ -6023,17 +6024,16 @@ handler_again: // register the mysql_data_stream thread->mypolls.add(POLLIN|POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime); } - if (mysql_thread___ffto_enabled && !ffto_bypassed) { - if (!m_ffto) { - m_ffto = std::make_unique(this); - } - if (m_ffto) { - for (unsigned int i = 0; i < mybe->server_myds->PSarrayIN->len; i++) { - m_ffto->on_server_data((const char*)mybe->server_myds->PSarrayIN->pdata[i].ptr, mybe->server_myds->PSarrayIN->pdata[i].size); - } - } - } - client_myds->PSarrayOUT->copy_add(mybe->server_myds->PSarrayIN, 0, mybe->server_myds->PSarrayIN->len); + if (mysql_thread___ffto_enabled && !ffto_bypassed && m_ffto) { + for (unsigned int i = 0; i < mybe->server_myds->PSarrayIN->len; i++) { + if (mybe->server_myds->PSarrayIN->pdata[i].size > (size_t)mysql_thread___ffto_max_buffer_size) { + ffto_bypassed = true; + m_ffto.reset(); + break; + } + m_ffto->on_server_data((const char*)mybe->server_myds->PSarrayIN->pdata[i].ptr, mybe->server_myds->PSarrayIN->pdata[i].size); + } + } client_myds->PSarrayOUT->copy_add(mybe->server_myds->PSarrayIN, 0, mybe->server_myds->PSarrayIN->len); while (mybe->server_myds->PSarrayIN->len) mybe->server_myds->PSarrayIN->remove_index(mybe->server_myds->PSarrayIN->len-1,NULL); break; case CONNECTING_CLIENT: diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 33a4d9c01..7c8e934e5 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -2562,7 +2562,7 @@ char ** MySQL_Threads_Handler::get_variables_list() { VariablesPointers_int["ping_interval_server_msec"] = make_tuple(&variables.ping_interval_server_msec, 1000, 7*24*3600*1000, false); VariablesPointers_int["ping_timeout_server"] = make_tuple(&variables.ping_timeout_server, 10, 600*1000, false); VariablesPointers_int["fast_forward_grace_close_ms"] = make_tuple(&variables.fast_forward_grace_close_ms, 0, 3600*1000, false); - VariablesPointers_int["ffto_max_buffer_size"] = make_tuple(&variables.ffto_max_buffer_size, 0, 1024*1024*1024, false); + VariablesPointers_int["ffto_max_buffer_size"] = make_tuple(&variables.ffto_max_buffer_size, 1, 1024*1024*1024, false); VariablesPointers_int["client_host_cache_size"] = make_tuple(&variables.client_host_cache_size, 0, 1024*1024, false); VariablesPointers_int["client_host_error_counts"] = make_tuple(&variables.client_host_error_counts, 0, 1024*1024, false); VariablesPointers_int["handle_warnings"] = make_tuple(&variables.handle_warnings, 0, 1, false); diff --git a/lib/PgSQLFFTO.cpp b/lib/PgSQLFFTO.cpp index c23c7e30e..fec395e58 100644 --- a/lib/PgSQLFFTO.cpp +++ b/lib/PgSQLFFTO.cpp @@ -29,9 +29,10 @@ extern class PgSQL_Query_Processor* GloPgQPro; * @return The number of rows affected or sent. */ static uint64_t extract_pg_rows_affected(const unsigned char* payload, size_t len, bool& is_select) { + if (len == 0) return 0; std::string command_tag(reinterpret_cast(payload), len); is_select = false; - std::regex re("(INSERT|UPDATE|DELETE|SELECT|MOVE|FETCH)\\s+\\S*\\s*(\\d+)"); + static const std::regex re("(INSERT|UPDATE|DELETE|SELECT|MOVE|FETCH)\\b.*?\\b(\\d+)$"); std::smatch matches; if (std::regex_search(command_tag, matches, re)) { if (matches.size() == 3) { @@ -45,7 +46,7 @@ static uint64_t extract_pg_rows_affected(const unsigned char* payload, size_t le } PgSQLFFTO::PgSQLFFTO(PgSQL_Session* session) - : m_session(session), m_state(IDLE), m_query_start_time(0) { + : m_session(session), m_state(IDLE), m_query_start_time(0), m_affected_rows(0), m_rows_sent(0) { m_client_buffer.reserve(1024); m_server_buffer.reserve(4096); } @@ -54,65 +55,119 @@ PgSQLFFTO::~PgSQLFFTO() { on_close(); } -void PgSQLFFTO::on_client_data(const char* buf, size_t len) { +void PgSQLFFTO::on_client_data(const char* buf, std::size_t len) { if (!buf || len == 0) return; m_client_buffer.insert(m_client_buffer.end(), buf, buf + len); - while (m_client_buffer.size() >= 5) { - char type = m_client_buffer[0]; - uint32_t msg_len; memcpy(&msg_len, &m_client_buffer[1], 4); msg_len = ntohl(msg_len); - if (m_client_buffer.size() < 1 + msg_len) break; - const unsigned char* payload = reinterpret_cast(m_client_buffer.data()) + 5; + while (m_client_buffer.size() - m_client_offset >= 5) { + char type = m_client_buffer[m_client_offset]; + uint32_t msg_len; memcpy(&msg_len, &m_client_buffer[m_client_offset + 1], 4); msg_len = ntohl(msg_len); + if (msg_len < 4 || msg_len > 1024 * 1024 * 1024) { // Sanity check + on_close(); + m_client_buffer.clear(); m_client_offset = 0; + return; + } + if (m_client_buffer.size() - m_client_offset < 1 + msg_len) break; + const unsigned char* payload = reinterpret_cast(m_client_buffer.data()) + m_client_offset + 5; process_client_message(type, payload, msg_len - 4); - m_client_buffer.erase(m_client_buffer.begin(), m_client_buffer.begin() + 1 + msg_len); + m_client_offset += 1 + msg_len; + } + if (m_client_offset > 0) { + if (m_client_offset >= m_client_buffer.size()) { + m_client_buffer.clear(); + m_client_offset = 0; + } else if (m_client_offset > 4096) { + m_client_buffer.erase(m_client_buffer.begin(), m_client_buffer.begin() + m_client_offset); + m_client_offset = 0; + } } } -void PgSQLFFTO::on_server_data(const char* buf, size_t len) { +void PgSQLFFTO::on_server_data(const char* buf, std::size_t len) { if (!buf || len == 0) return; m_server_buffer.insert(m_server_buffer.end(), buf, buf + len); - while (m_server_buffer.size() >= 5) { - char type = m_server_buffer[0]; - uint32_t msg_len; memcpy(&msg_len, &m_server_buffer[1], 4); msg_len = ntohl(msg_len); - if (m_server_buffer.size() < 1 + msg_len) break; - const unsigned char* payload = reinterpret_cast(m_server_buffer.data()) + 5; + while (m_server_buffer.size() - m_server_offset >= 5) { + char type = m_server_buffer[m_server_offset]; + uint32_t msg_len; memcpy(&msg_len, &m_server_buffer[m_server_offset + 1], 4); msg_len = ntohl(msg_len); + if (msg_len < 4 || msg_len > 1024 * 1024 * 1024) { // Sanity check + on_close(); + m_server_buffer.clear(); m_server_offset = 0; + return; + } + if (m_server_buffer.size() - m_server_offset < 1 + msg_len) break; + const unsigned char* payload = reinterpret_cast(m_server_buffer.data()) + m_server_offset + 5; process_server_message(type, payload, msg_len - 4); - m_server_buffer.erase(m_server_buffer.begin(), m_server_buffer.begin() + 1 + msg_len); + m_server_offset += 1 + msg_len; + } + if (m_server_offset > 0) { + if (m_server_offset >= m_server_buffer.size()) { + m_server_buffer.clear(); + m_server_offset = 0; + } else if (m_server_offset > 4096) { + m_server_buffer.erase(m_server_buffer.begin(), m_server_buffer.begin() + m_server_offset); + m_server_offset = 0; + } } } void PgSQLFFTO::on_close() { if (m_state != IDLE && m_query_start_time != 0) { unsigned long long duration = monotonic_time() - m_query_start_time; - report_query_stats(m_current_query, duration); + report_query_stats(m_current_query, duration, m_affected_rows, m_rows_sent); m_state = IDLE; } } void PgSQLFFTO::process_client_message(char type, const unsigned char* payload, size_t len) { if (type == 'Q') { - m_current_query = std::string(reinterpret_cast(payload), len); + size_t query_len = (len > 0 && payload[len - 1] == 0) ? len - 1 : len; + m_current_query = std::string(reinterpret_cast(payload), query_len); m_query_start_time = monotonic_time(); m_state = AWAITING_RESPONSE; + m_affected_rows = 0; m_rows_sent = 0; } else if (type == 'P') { - std::string stmt_name = reinterpret_cast(payload); - const char* query = reinterpret_cast(payload) + stmt_name.length() + 1; - m_statements[stmt_name] = query; + const char* p = reinterpret_cast(payload); + size_t name_len = strnlen(p, len); + if (name_len >= len) return; // No null terminator + std::string stmt_name(p, name_len); + const char* query_ptr = p + name_len + 1; + size_t rem = len - (name_len + 1); + size_t query_text_len = strnlen(query_ptr, rem); + m_statements[stmt_name] = std::string(query_ptr, query_text_len); } else if (type == 'B') { - std::string portal_name = reinterpret_cast(payload); - const char* stmt_ptr = reinterpret_cast(payload) + portal_name.length() + 1; - std::string stmt_name = stmt_ptr; - m_portals[portal_name] = stmt_name; + const char* p = reinterpret_cast(payload); + size_t portal_len = strnlen(p, len); + if (portal_len >= len) return; + std::string portal_name(p, portal_len); + const char* stmt_ptr = p + portal_len + 1; + size_t rem = len - (portal_len + 1); + size_t stmt_name_len = strnlen(stmt_ptr, rem); + if (stmt_name_len >= rem) return; + m_portals[portal_name] = std::string(stmt_ptr, stmt_name_len); } else if (type == 'E') { - std::string portal_name = reinterpret_cast(payload); + const char* p = reinterpret_cast(payload); + size_t portal_len = strnlen(p, len); + std::string portal_name(p, std::min(portal_len, len)); auto pit = m_portals.find(portal_name); if (pit != m_portals.end()) { auto sit = m_statements.find(pit->second); if (sit != m_statements.end()) { + if (m_state == AWAITING_RESPONSE) { + on_close(); // Finalize previous if any + } m_current_query = sit->second; m_query_start_time = monotonic_time(); m_state = AWAITING_RESPONSE; + m_affected_rows = 0; m_rows_sent = 0; } } + } else if (type == 'C') { // Frontend Close + if (len < 1) return; + char close_type = static_cast(payload[0]); + const char* name_ptr = reinterpret_cast(payload) + 1; + size_t name_len = strnlen(name_ptr, len - 1); + std::string name(name_ptr, name_len); + if (close_type == 'S') m_statements.erase(name); + else if (close_type == 'P') m_portals.erase(name); } else if (type == 'X') { on_close(); } @@ -121,20 +176,18 @@ void PgSQLFFTO::process_client_message(char type, const unsigned char* payload, void PgSQLFFTO::process_server_message(char type, const unsigned char* payload, size_t len) { if (m_state == IDLE) return; if (type == 'C') { - unsigned long long duration = monotonic_time() - m_query_start_time; bool is_select = false; uint64_t rows = extract_pg_rows_affected(payload, len, is_select); - if (is_select) report_query_stats(m_current_query, duration, 0, rows); - else report_query_stats(m_current_query, duration, rows, 0); - m_state = IDLE; + if (is_select) m_rows_sent += rows; + else m_affected_rows += rows; } else if (type == 'Z' || type == 'E') { - // ReadyForQuery or ErrorResponse. We don't always want to report stats here if 'C' already did it. - // But if we didn't get 'C', report what we have. + // ReadyForQuery or ErrorResponse if (m_state == AWAITING_RESPONSE) { unsigned long long duration = monotonic_time() - m_query_start_time; - report_query_stats(m_current_query, duration); + report_query_stats(m_current_query, duration, m_affected_rows, m_rows_sent); } m_state = IDLE; + m_affected_rows = 0; m_rows_sent = 0; } } diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index 124e889f0..c096514a1 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -306,13 +306,13 @@ void PgSQL_Session::reset() { } } } - if (client_myds && client_myds->myconn) { - client_myds->myconn->reset(); - } - extended_query_phase = EXTQ_PHASE_IDLE; -} - -PgSQL_Session::~PgSQL_Session() { + if (client_myds && client_myds->myconn) { + client_myds->myconn->reset(); + } + extended_query_phase = EXTQ_PHASE_IDLE; + ffto_bypassed = false; + m_ffto.reset(); + }PgSQL_Session::~PgSQL_Session() { if (m_ffto) { m_ffto->on_close(); } @@ -2729,17 +2729,16 @@ handler_again: // register the PgSQL_Data_Stream thread->mypolls.add(POLLIN | POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime); } - if (pgsql_thread___ffto_enabled && !ffto_bypassed) { - if (!m_ffto) { - m_ffto = std::make_unique(this); - } - if (m_ffto) { - for (unsigned int i = 0; i < mybe->server_myds->PSarrayIN->len; i++) { - m_ffto->on_server_data((const char*)mybe->server_myds->PSarrayIN->pdata[i].ptr, mybe->server_myds->PSarrayIN->pdata[i].size); - } - } - } - client_myds->PSarrayOUT->copy_add(mybe->server_myds->PSarrayIN, 0, mybe->server_myds->PSarrayIN->len); + if (pgsql_thread___ffto_enabled && !ffto_bypassed && m_ffto) { + for (unsigned int i = 0; i < mybe->server_myds->PSarrayIN->len; i++) { + if (mybe->server_myds->PSarrayIN->pdata[i].size > (size_t)pgsql_thread___ffto_max_buffer_size) { + ffto_bypassed = true; + m_ffto.reset(); + break; + } + m_ffto->on_server_data((const char*)mybe->server_myds->PSarrayIN->pdata[i].ptr, mybe->server_myds->PSarrayIN->pdata[i].size); + } + } client_myds->PSarrayOUT->copy_add(mybe->server_myds->PSarrayIN, 0, mybe->server_myds->PSarrayIN->len); constexpr unsigned char ready_packet[] = { 0x5A, 0x00, 0x00, 0x00, 0x05 }; bool is_copy_ready_packet = false; diff --git a/lib/PgSQL_Thread.cpp b/lib/PgSQL_Thread.cpp index c3e25f535..cd4a966a1 100644 --- a/lib/PgSQL_Thread.cpp +++ b/lib/PgSQL_Thread.cpp @@ -2223,7 +2223,7 @@ char** PgSQL_Threads_Handler::get_variables_list() { VariablesPointers_int["poll_timeout_on_failure"] = make_tuple(&variables.poll_timeout_on_failure, 10, 20000, false); VariablesPointers_int["shun_on_failures"] = make_tuple(&variables.shun_on_failures, 0, 10000000, false); VariablesPointers_int["shun_recovery_time_sec"] = make_tuple(&variables.shun_recovery_time_sec, 0, 3600 * 24 * 365, false); - VariablesPointers_int["ffto_max_buffer_size"] = make_tuple(&variables.ffto_max_buffer_size, 0, 1024 * 1024 * 1024, false); + VariablesPointers_int["ffto_max_buffer_size"] = make_tuple(&variables.ffto_max_buffer_size, 1, 1024 * 1024 * 1024, false); VariablesPointers_int["unshun_algorithm"] = make_tuple(&variables.unshun_algorithm, 0, 1, false); VariablesPointers_int["hostgroup_manager_verbose"] = make_tuple(&variables.hostgroup_manager_verbose, 0, 3, false); VariablesPointers_int["tcp_keepalive_time"] = make_tuple(&variables.tcp_keepalive_time, 0, 7200, false); diff --git a/src/main.cpp b/src/main.cpp index 36ff4ee7d..75c775d3f 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1505,7 +1505,6 @@ void ProxySQL_Main_init_phase2___not_started(const bootstrap_info_t& boostrap_in } #endif /* PROXYSQLGENAI */ ProxySQL_Main_init_Admin_module(boostrap_info); - proxy_info("TEST_LOG: ProxySQL is starting\n"); GloMTH->print_version(); { diff --git a/test/tap/tests/test_ffto_bypass-t.cpp b/test/tap/tests/test_ffto_bypass-t.cpp index 7a0e2ffb8..8c4b80cf2 100644 --- a/test/tap/tests/test_ffto_bypass-t.cpp +++ b/test/tap/tests/test_ffto_bypass-t.cpp @@ -54,14 +54,14 @@ int main(int argc, char** argv) { for(int i=0; i<200; i++) large_query += "x"; large_query += "'"; - mysql_query(conn, large_query.c_str()); + MYSQL_QUERY(conn, large_query.c_str()); // Verify that NO digest was recorded for this query because it was bypassed - int rc = run_q(admin, "SELECT count(*) FROM stats_mysql_query_digest WHERE digest_text LIKE '%xxxx%'"); + int rc = run_q(admin, "SELECT count(*) FROM stats_mysql_query_digest"); MYSQL_RES* res = mysql_store_result(admin); MYSQL_ROW row = mysql_fetch_row(res); int count = atoi(row[0]); - ok(count == 0, "Query larger than threshold was correctly bypassed (count: %d)", count); + ok(count == 0, "No digests recorded for queries exceeding threshold (count: %d)", count); mysql_free_result(res); mysql_close(conn); diff --git a/test/tap/tests/test_ffto_mysql-t.cpp b/test/tap/tests/test_ffto_mysql-t.cpp index 55fe07be1..e14448998 100644 --- a/test/tap/tests/test_ffto_mysql-t.cpp +++ b/test/tap/tests/test_ffto_mysql-t.cpp @@ -29,6 +29,8 @@ void verify_digest(MYSQL* admin, const char* template_text, int expected_count, int rc = run_q(admin, query); if (rc != 0) { ok(0, "Failed to query stats_mysql_query_digest for %s", template_text); + ok(0, "Skipping rows_affected check due to query failure"); + ok(0, "Skipping rows_sent check due to query failure"); return; } MYSQL_RES* res = mysql_store_result(admin); @@ -43,6 +45,8 @@ void verify_digest(MYSQL* admin, const char* template_text, int expected_count, ok(rows_sent == expected_rows_sent, "Sent rows for %s: %llu (expected: %llu)", row[3], (unsigned long long)rows_sent, (unsigned long long)expected_rows_sent); } else { ok(0, "Digest NOT found for pattern: %s", template_text); + ok(0, "Skipping rows_affected check (digest not found)"); + ok(0, "Skipping rows_sent check (digest not found)"); // Dump the table to see what's actually in there diag("Dumping stats_mysql_query_digest for debugging:"); run_q(admin, "SELECT digest_text, count_star FROM stats_mysql_query_digest"); diff --git a/test/tap/tests/test_ffto_pgsql-t.cpp b/test/tap/tests/test_ffto_pgsql-t.cpp index d55ba4eb5..3fb837292 100644 --- a/test/tap/tests/test_ffto_pgsql-t.cpp +++ b/test/tap/tests/test_ffto_pgsql-t.cpp @@ -30,6 +30,8 @@ void verify_pg_digest(MYSQL* admin, const char* template_text, int expected_coun int rc = run_q(admin, query); if (rc != 0) { ok(0, "Failed to query stats_pgsql_query_digest for %s", template_text); + ok(0, "Skipping PG rows_affected check due to query failure"); + ok(0, "Skipping PG rows_sent check due to query failure"); return; } MYSQL_RES* res = mysql_store_result(admin); @@ -44,6 +46,8 @@ void verify_pg_digest(MYSQL* admin, const char* template_text, int expected_coun ok(rows_sent == expected_rows_sent, "Sent rows for %s: %llu (expected: %llu)", row[3], (unsigned long long)rows_sent, (unsigned long long)expected_rows_sent); } else { ok(0, "PG Digest NOT found for pattern: %s", template_text); + ok(0, "Skipping PG rows_affected check (digest not found)"); + ok(0, "Skipping PG rows_sent check (digest not found)"); diag("Dumping stats_pgsql_query_digest for debugging:"); run_q(admin, "SELECT digest_text, count_star FROM stats_pgsql_query_digest"); MYSQL_RES* dump_res = mysql_store_result(admin);