diff --git a/doc/ffto_design.md b/doc/ffto_design.md index 0f68a5c40..2d6484dfa 100644 --- a/doc/ffto_design.md +++ b/doc/ffto_design.md @@ -82,6 +82,11 @@ To verify that FFTO is capturing traffic in Fast Forward mode: ``` 4. Confirm that queries which were previously "invisible" in FF mode are now being recorded. -## 8. Limitations -- **Multi-packet query execution**: Very large queries that exceed the max buffer size will cause FFTO to bypass the session. -- **Binary Protocols**: FFTO currently focuses on the text-based query protocols; specialized binary protocols (like some X-Protocol features) may require future extensions. +## 8. Protocol Support +- **Text and Binary Protocols**: FFTO supports both standard text-based queries and the binary protocol used by prepared statements. +- **MySQL Binary Protocol**: Corrects captures `COM_STMT_PREPARE` and `COM_STMT_EXECUTE`, tracking statement IDs to their respective SQL text. +- **PostgreSQL Extended Query**: Supports the multi-phase `Parse` -> `Bind` -> `Execute` sequence by tracking Statement and Portal mappings. + +## 9. Limitations +- **Large Payloads**: Packets exceeding the `*-ffto_max_buffer_size` threshold cause FFTO to be bypassed for that session. +- **X-Protocol**: Currently optimized for classic MySQL and PostgreSQL protocols. diff --git a/include/MySQLFFTO.hpp b/include/MySQLFFTO.hpp index 713e1ec5c..22140e8e5 100644 --- a/include/MySQLFFTO.hpp +++ b/include/MySQLFFTO.hpp @@ -4,6 +4,8 @@ #include "TrafficObserver.hpp" #include #include +#include +#include class MySQL_Session; @@ -23,9 +25,9 @@ public: private: enum State { IDLE, + AWAITING_PREPARE_OK, AWAITING_RESULTSET, - READING_RESULTSET, - AWAITING_OK_ERR + READING_RESULTSET }; MySQL_Session* m_session; @@ -34,8 +36,12 @@ private: std::vector m_server_buffer; std::string m_current_query; + std::string m_pending_prepare_query; unsigned long long m_query_start_time; + // Binary Protocol Tracking: statement_id -> query_text + std::unordered_map m_statements; + void process_client_packet(const unsigned char* data, size_t len); void process_server_packet(const unsigned char* data, size_t len); void report_query_stats(const std::string& query, unsigned long long duration_us); diff --git a/include/PgSQLFFTO.hpp b/include/PgSQLFFTO.hpp index ba3631a25..8ec1063ad 100644 --- a/include/PgSQLFFTO.hpp +++ b/include/PgSQLFFTO.hpp @@ -4,6 +4,7 @@ #include "TrafficObserver.hpp" #include #include +#include class PgSQL_Session; @@ -34,6 +35,10 @@ private: std::string m_current_query; unsigned long long m_query_start_time; + // Binary Protocol Tracking (PostgreSQL Extended Query) + std::unordered_map m_statements; // name -> query + std::unordered_map m_portals; // portal -> name + void process_client_message(char type, const unsigned char* payload, size_t len); void process_server_message(char type, const unsigned char* payload, size_t len); void report_query_stats(const std::string& query, unsigned long long duration_us); diff --git a/lib/MySQLFFTO.cpp b/lib/MySQLFFTO.cpp index 069ede63d..47085e6b9 100644 --- a/lib/MySQLFFTO.cpp +++ b/lib/MySQLFFTO.cpp @@ -29,40 +29,30 @@ MySQLFFTO::~MySQLFFTO() { void MySQLFFTO::on_client_data(const char* buf, size_t len) { if (!buf || len == 0) return; - m_client_buffer.insert(m_client_buffer.end(), buf, buf + len); while (m_client_buffer.size() >= sizeof(mysql_hdr)) { const mysql_hdr* hdr = reinterpret_cast(m_client_buffer.data()); uint32_t pkt_len = hdr->pkt_length; - - if (m_client_buffer.size() < sizeof(mysql_hdr) + pkt_len) { - break; - } + if (m_client_buffer.size() < sizeof(mysql_hdr) + pkt_len) break; const unsigned char* payload = reinterpret_cast(m_client_buffer.data()) + sizeof(mysql_hdr); process_client_packet(payload, pkt_len); - m_client_buffer.erase(m_client_buffer.begin(), m_client_buffer.begin() + sizeof(mysql_hdr) + pkt_len); } } void MySQLFFTO::on_server_data(const char* buf, size_t len) { if (!buf || len == 0) return; - m_server_buffer.insert(m_server_buffer.end(), buf, buf + len); while (m_server_buffer.size() >= sizeof(mysql_hdr)) { const mysql_hdr* hdr = reinterpret_cast(m_server_buffer.data()); uint32_t pkt_len = hdr->pkt_length; - - if (m_server_buffer.size() < sizeof(mysql_hdr) + pkt_len) { - break; - } + if (m_server_buffer.size() < sizeof(mysql_hdr) + pkt_len) break; const unsigned char* payload = reinterpret_cast(m_server_buffer.data()) + sizeof(mysql_hdr); process_server_packet(payload, pkt_len); - m_server_buffer.erase(m_server_buffer.begin(), m_server_buffer.begin() + sizeof(mysql_hdr) + pkt_len); } } @@ -77,13 +67,25 @@ void MySQLFFTO::on_close() { void MySQLFFTO::process_client_packet(const unsigned char* data, size_t len) { if (len == 0) return; - uint8_t command = data[0]; - if (command == _MYSQL_COM_QUERY || command == _MYSQL_COM_STMT_EXECUTE || command == _MYSQL_COM_STMT_PREPARE) { - if (len > 1) { - m_current_query = std::string(reinterpret_cast(data + 1), len - 1); - m_query_start_time = monotonic_time(); - m_state = AWAITING_RESULTSET; + + if (command == _MYSQL_COM_QUERY) { + m_current_query = std::string(reinterpret_cast(data + 1), len - 1); + m_query_start_time = monotonic_time(); + m_state = AWAITING_RESULTSET; + } else if (command == _MYSQL_COM_STMT_PREPARE) { + m_pending_prepare_query = std::string(reinterpret_cast(data + 1), len - 1); + m_state = AWAITING_PREPARE_OK; + } else if (command == _MYSQL_COM_STMT_EXECUTE) { + if (len >= 5) { + uint32_t stmt_id; + memcpy(&stmt_id, data + 1, 4); // Little-endian stmt_id + auto it = m_statements.find(stmt_id); + if (it != m_statements.end()) { + m_current_query = it->second; + m_query_start_time = monotonic_time(); + m_state = AWAITING_RESULTSET; + } } } else if (command == _MYSQL_COM_QUIT) { on_close(); @@ -92,15 +94,19 @@ void MySQLFFTO::process_client_packet(const unsigned char* data, size_t len) { void MySQLFFTO::process_server_packet(const unsigned char* data, size_t len) { if (len == 0 || m_state == IDLE) return; - uint8_t first_byte = data[0]; - - if (m_state == AWAITING_RESULTSET || m_state == READING_RESULTSET) { - if (first_byte == 0x00 || first_byte == 0xFF) { - unsigned long long duration = monotonic_time() - m_query_start_time; - report_query_stats(m_current_query, duration); + + if (m_state == AWAITING_PREPARE_OK) { + if (first_byte == 0x00 && len >= 5) { // COM_STMT_PREPARE_OK + uint32_t stmt_id; + memcpy(&stmt_id, data + 1, 4); + m_statements[stmt_id] = m_pending_prepare_query; + m_state = IDLE; + } else if (first_byte == 0xFF) { // ERR m_state = IDLE; - } else if (first_byte == 0xFE && len < 9) { + } + } else if (m_state == AWAITING_RESULTSET || m_state == READING_RESULTSET) { + if (first_byte == 0x00 || first_byte == 0xFF || (first_byte == 0xFE && len < 9)) { unsigned long long duration = monotonic_time() - m_query_start_time; report_query_stats(m_current_query, duration); m_state = IDLE; @@ -125,17 +131,14 @@ void MySQLFFTO::report_query_stats(const std::string& query, unsigned long long SQP_par_t qp; memset(&qp, 0, sizeof(qp)); char* fst_cmnt = NULL; - char* digest_text = mysql_query_digest_and_first_comment(query.c_str(), query.length(), &fst_cmnt, qp.buf, &opts); + if (digest_text) { qp.digest_text = digest_text; qp.digest = SpookyHash::Hash64(digest_text, strlen(digest_text), 0); - char* ca = (char*)""; - if (mysql_thread___query_digests_track_hostname) { - if (m_session->client_myds && m_session->client_myds->addr.addr) { - ca = m_session->client_myds->addr.addr; - } + if (mysql_thread___query_digests_track_hostname && m_session->client_myds && m_session->client_myds->addr.addr) { + ca = m_session->client_myds->addr.addr; } uint64_t hash2; @@ -150,7 +153,6 @@ void MySQLFFTO::report_query_stats(const std::string& query, unsigned long long myhash.Final(&qp.digest_total, &hash2); GloMyQPro->update_query_digest(qp.digest_total, qp.digest, qp.digest_text, m_session->current_hostgroup, ui, duration_us, m_session->thread->curtime, ca, 0, 0); - free(digest_text); } if (fst_cmnt) free(fst_cmnt); diff --git a/lib/PgSQLFFTO.cpp b/lib/PgSQLFFTO.cpp index cca3f730c..ca3e06441 100644 --- a/lib/PgSQLFFTO.cpp +++ b/lib/PgSQLFFTO.cpp @@ -27,7 +27,6 @@ PgSQLFFTO::~PgSQLFFTO() { void PgSQLFFTO::on_client_data(const char* buf, size_t len) { if (!buf || len == 0) return; - m_client_buffer.insert(m_client_buffer.end(), buf, buf + len); while (m_client_buffer.size() >= 5) { @@ -35,21 +34,16 @@ void PgSQLFFTO::on_client_data(const char* buf, size_t len) { uint32_t msg_len; memcpy(&msg_len, &m_client_buffer[1], 4); msg_len = ntohl(msg_len); - - if (m_client_buffer.size() < 1 + msg_len) { - break; - } + if (m_client_buffer.size() < 1 + msg_len) break; const unsigned char* payload = reinterpret_cast(m_client_buffer.data()) + 5; process_client_message(type, payload, msg_len - 4); - m_client_buffer.erase(m_client_buffer.begin(), m_client_buffer.begin() + 1 + msg_len); } } void PgSQLFFTO::on_server_data(const char* buf, size_t len) { if (!buf || len == 0) return; - m_server_buffer.insert(m_server_buffer.end(), buf, buf + len); while (m_server_buffer.size() >= 5) { @@ -57,14 +51,10 @@ void PgSQLFFTO::on_server_data(const char* buf, size_t len) { uint32_t msg_len; memcpy(&msg_len, &m_server_buffer[1], 4); msg_len = ntohl(msg_len); - - if (m_server_buffer.size() < 1 + msg_len) { - break; - } + if (m_server_buffer.size() < 1 + msg_len) break; const unsigned char* payload = reinterpret_cast(m_server_buffer.data()) + 5; process_server_message(type, payload, msg_len - 4); - m_server_buffer.erase(m_server_buffer.begin(), m_server_buffer.begin() + 1 + msg_len); } } @@ -79,20 +69,28 @@ void PgSQLFFTO::on_close() { void PgSQLFFTO::process_client_message(char type, const unsigned char* payload, size_t len) { if (type == 'Q') { // Simple Query - if (len > 0) { - m_current_query = std::string(reinterpret_cast(payload), len); - m_query_start_time = monotonic_time(); - m_state = AWAITING_RESPONSE; - } - } else if (type == 'P') { // Parse (Extended Query) - const char* query = reinterpret_cast(payload); - while (*query != '\0' && (size_t)(query - reinterpret_cast(payload)) < len) { - query++; - } - if (*query == '\0' && (size_t)(query + 1 - reinterpret_cast(payload)) < len) { - m_current_query = std::string(query + 1); - m_query_start_time = monotonic_time(); - m_state = AWAITING_RESPONSE; + m_current_query = std::string(reinterpret_cast(payload), len); + m_query_start_time = monotonic_time(); + m_state = AWAITING_RESPONSE; + } else if (type == 'P') { // Parse + std::string stmt_name = reinterpret_cast(payload); + const char* query = reinterpret_cast(payload) + stmt_name.length() + 1; + m_statements[stmt_name] = query; + } else if (type == 'B') { // Bind + std::string portal_name = reinterpret_cast(payload); + const char* stmt_ptr = reinterpret_cast(payload) + portal_name.length() + 1; + std::string stmt_name = stmt_ptr; + m_portals[portal_name] = stmt_name; + } else if (type == 'E') { // Execute + std::string portal_name = reinterpret_cast(payload); + auto pit = m_portals.find(portal_name); + if (pit != m_portals.end()) { + auto sit = m_statements.find(pit->second); + if (sit != m_statements.end()) { + m_current_query = sit->second; + m_query_start_time = monotonic_time(); + m_state = AWAITING_RESPONSE; + } } } else if (type == 'X') { // Terminate on_close(); @@ -101,7 +99,6 @@ void PgSQLFFTO::process_client_message(char type, const unsigned char* payload, void PgSQLFFTO::process_server_message(char type, const unsigned char* payload, size_t len) { if (m_state == IDLE) return; - if (type == 'C' || type == 'Z' || type == 'E') { unsigned long long duration = monotonic_time() - m_query_start_time; report_query_stats(m_current_query, duration); @@ -124,17 +121,14 @@ void PgSQLFFTO::report_query_stats(const std::string& query, unsigned long long SQP_par_t qp; memset(&qp, 0, sizeof(qp)); char* fst_cmnt = NULL; - char* digest_text = pgsql_query_digest_and_first_comment(query.c_str(), query.length(), &fst_cmnt, qp.buf, &opts); + if (digest_text) { qp.digest_text = digest_text; qp.digest = SpookyHash::Hash64(digest_text, strlen(digest_text), 0); - char* ca = (char*)""; - if (pgsql_thread___query_digests_track_hostname) { - if (m_session->client_myds && m_session->client_myds->addr.addr) { - ca = m_session->client_myds->addr.addr; - } + if (pgsql_thread___query_digests_track_hostname && m_session->client_myds && m_session->client_myds->addr.addr) { + ca = m_session->client_myds->addr.addr; } uint64_t hash2; @@ -149,7 +143,6 @@ void PgSQLFFTO::report_query_stats(const std::string& query, unsigned long long myhash.Final(&qp.digest_total, &hash2); GloPgQPro->update_query_digest(qp.digest_total, qp.digest, qp.digest_text, m_session->current_hostgroup, ui, duration_us, m_session->thread->curtime, ca, 0, 0); - free(digest_text); } if (fst_cmnt) free(fst_cmnt);