Fix binary protocol support for prepared statements in FFTO

- Implemented statement ID tracking for MySQL COM_STMT_PREPARE/EXECUTE
- Implemented Parse/Bind/Execute portal mapping for PostgreSQL Extended Query
- Corrected documentation regarding binary protocol support
- Ensured metric parity for prepared statements in Fast Forward mode
pull/5393/head
Rene Cannao 3 months ago
parent 83a631bb18
commit 3ca390dacb

@ -82,6 +82,11 @@ To verify that FFTO is capturing traffic in Fast Forward mode:
```
4. Confirm that queries which were previously "invisible" in FF mode are now being recorded.
## 8. Limitations
- **Multi-packet query execution**: Very large queries that exceed the max buffer size will cause FFTO to bypass the session.
- **Binary Protocols**: FFTO currently focuses on the text-based query protocols; specialized binary protocols (like some X-Protocol features) may require future extensions.
## 8. Protocol Support
- **Text and Binary Protocols**: FFTO supports both standard text-based queries and the binary protocol used by prepared statements.
- **MySQL Binary Protocol**: Correctly captures `COM_STMT_PREPARE` and `COM_STMT_EXECUTE`, mapping statement IDs to their respective SQL text.
- **PostgreSQL Extended Query**: Supports the multi-phase `Parse` -> `Bind` -> `Execute` sequence by tracking Statement and Portal mappings.
## 9. Limitations
- **Large Payloads**: Packets exceeding the `*-ffto_max_buffer_size` threshold cause FFTO to be bypassed for that session.
- **X-Protocol**: FFTO is currently optimized for the classic MySQL and PostgreSQL protocols; X-Protocol features may require future extensions.

@ -4,6 +4,8 @@
#include "TrafficObserver.hpp"
#include <vector>
#include <string>
#include <unordered_map>
#include <stdint.h>
class MySQL_Session;
@ -23,9 +25,9 @@ public:
private:
enum State {
IDLE,
AWAITING_PREPARE_OK,
AWAITING_RESULTSET,
READING_RESULTSET,
AWAITING_OK_ERR
READING_RESULTSET
};
MySQL_Session* m_session;
@ -34,8 +36,12 @@ private:
std::vector<char> m_server_buffer;
std::string m_current_query;
std::string m_pending_prepare_query;
unsigned long long m_query_start_time;
// Binary Protocol Tracking: statement_id -> query_text
std::unordered_map<uint32_t, std::string> m_statements;
void process_client_packet(const unsigned char* data, size_t len);
void process_server_packet(const unsigned char* data, size_t len);
void report_query_stats(const std::string& query, unsigned long long duration_us);

@ -4,6 +4,7 @@
#include "TrafficObserver.hpp"
#include <vector>
#include <string>
#include <unordered_map>
class PgSQL_Session;
@ -34,6 +35,10 @@ private:
std::string m_current_query;
unsigned long long m_query_start_time;
// Binary Protocol Tracking (PostgreSQL Extended Query)
std::unordered_map<std::string, std::string> m_statements; // name -> query
std::unordered_map<std::string, std::string> m_portals; // portal -> name
void process_client_message(char type, const unsigned char* payload, size_t len);
void process_server_message(char type, const unsigned char* payload, size_t len);
void report_query_stats(const std::string& query, unsigned long long duration_us);

@ -29,40 +29,30 @@ MySQLFFTO::~MySQLFFTO() {
// Accumulates bytes observed on the client side and hands every complete
// MySQL packet currently in the buffer to process_client_packet().
//
// buf: pointer to the newly observed bytes; a null pointer is ignored.
// len: number of bytes at buf; zero is ignored.
void MySQLFFTO::on_client_data(const char* buf, size_t len) {
if (!buf || len == 0) return;
// Append to the pending bytes so a packet that arrives split across several
// calls can be processed once the rest of it shows up.
m_client_buffer.insert(m_client_buffer.end(), buf, buf + len);
// Keep consuming for as long as at least one full packet header is buffered.
while (m_client_buffer.size() >= sizeof(mysql_hdr)) {
// The header is read in place from the front of the buffer.
const mysql_hdr* hdr = reinterpret_cast<const mysql_hdr*>(m_client_buffer.data());
uint32_t pkt_len = hdr->pkt_length;
// Payload not fully buffered yet: stop and wait for the next call.
if (m_client_buffer.size() < sizeof(mysql_hdr) + pkt_len) {
break;
}
// NOTE(review): identical condition to the check directly above, so this
// statement can never trigger; it looks like the old and new spelling of
// the same check are both present — confirm and keep only one.
if (m_client_buffer.size() < sizeof(mysql_hdr) + pkt_len) break;
// The payload starts immediately after the header.
const unsigned char* payload = reinterpret_cast<const unsigned char*>(m_client_buffer.data()) + sizeof(mysql_hdr);
process_client_packet(payload, pkt_len);
// Drop the consumed header + payload from the front of the buffer; payload
// points into the buffer and must not be used after this line.
m_client_buffer.erase(m_client_buffer.begin(), m_client_buffer.begin() + sizeof(mysql_hdr) + pkt_len);
}
}
// Accumulates bytes observed on the server side and hands every complete
// MySQL packet currently in the buffer to process_server_packet().
//
// buf: pointer to the newly observed bytes; a null pointer is ignored.
// len: number of bytes at buf; zero is ignored.
void MySQLFFTO::on_server_data(const char* buf, size_t len) {
if (!buf || len == 0) return;
// Append to the pending bytes so a packet that arrives split across several
// calls can be processed once the rest of it shows up.
m_server_buffer.insert(m_server_buffer.end(), buf, buf + len);
// Keep consuming for as long as at least one full packet header is buffered.
while (m_server_buffer.size() >= sizeof(mysql_hdr)) {
// The header is read in place from the front of the buffer.
const mysql_hdr* hdr = reinterpret_cast<const mysql_hdr*>(m_server_buffer.data());
uint32_t pkt_len = hdr->pkt_length;
// Payload not fully buffered yet: stop and wait for the next call.
if (m_server_buffer.size() < sizeof(mysql_hdr) + pkt_len) {
break;
}
// NOTE(review): identical condition to the check directly above, so this
// statement can never trigger; it looks like the old and new spelling of
// the same check are both present — confirm and keep only one.
if (m_server_buffer.size() < sizeof(mysql_hdr) + pkt_len) break;
// The payload starts immediately after the header.
const unsigned char* payload = reinterpret_cast<const unsigned char*>(m_server_buffer.data()) + sizeof(mysql_hdr);
process_server_packet(payload, pkt_len);
// Drop the consumed header + payload from the front of the buffer; payload
// points into the buffer and must not be used after this line.
m_server_buffer.erase(m_server_buffer.begin(), m_server_buffer.begin() + sizeof(mysql_hdr) + pkt_len);
}
}
@ -77,13 +67,25 @@ void MySQLFFTO::on_close() {
void MySQLFFTO::process_client_packet(const unsigned char* data, size_t len) {
if (len == 0) return;
uint8_t command = data[0];
if (command == _MYSQL_COM_QUERY || command == _MYSQL_COM_STMT_EXECUTE || command == _MYSQL_COM_STMT_PREPARE) {
if (len > 1) {
m_current_query = std::string(reinterpret_cast<const char*>(data + 1), len - 1);
m_query_start_time = monotonic_time();
m_state = AWAITING_RESULTSET;
if (command == _MYSQL_COM_QUERY) {
m_current_query = std::string(reinterpret_cast<const char*>(data + 1), len - 1);
m_query_start_time = monotonic_time();
m_state = AWAITING_RESULTSET;
} else if (command == _MYSQL_COM_STMT_PREPARE) {
m_pending_prepare_query = std::string(reinterpret_cast<const char*>(data + 1), len - 1);
m_state = AWAITING_PREPARE_OK;
} else if (command == _MYSQL_COM_STMT_EXECUTE) {
if (len >= 5) {
uint32_t stmt_id;
memcpy(&stmt_id, data + 1, 4); // Little-endian stmt_id
auto it = m_statements.find(stmt_id);
if (it != m_statements.end()) {
m_current_query = it->second;
m_query_start_time = monotonic_time();
m_state = AWAITING_RESULTSET;
}
}
} else if (command == _MYSQL_COM_QUIT) {
on_close();
@ -92,15 +94,19 @@ void MySQLFFTO::process_client_packet(const unsigned char* data, size_t len) {
void MySQLFFTO::process_server_packet(const unsigned char* data, size_t len) {
if (len == 0 || m_state == IDLE) return;
uint8_t first_byte = data[0];
if (m_state == AWAITING_RESULTSET || m_state == READING_RESULTSET) {
if (first_byte == 0x00 || first_byte == 0xFF) {
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration);
if (m_state == AWAITING_PREPARE_OK) {
if (first_byte == 0x00 && len >= 5) { // COM_STMT_PREPARE_OK
uint32_t stmt_id;
memcpy(&stmt_id, data + 1, 4);
m_statements[stmt_id] = m_pending_prepare_query;
m_state = IDLE;
} else if (first_byte == 0xFF) { // ERR
m_state = IDLE;
} else if (first_byte == 0xFE && len < 9) {
}
} else if (m_state == AWAITING_RESULTSET || m_state == READING_RESULTSET) {
if (first_byte == 0x00 || first_byte == 0xFF || (first_byte == 0xFE && len < 9)) {
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration);
m_state = IDLE;
@ -125,17 +131,14 @@ void MySQLFFTO::report_query_stats(const std::string& query, unsigned long long
SQP_par_t qp;
memset(&qp, 0, sizeof(qp));
char* fst_cmnt = NULL;
char* digest_text = mysql_query_digest_and_first_comment(query.c_str(), query.length(), &fst_cmnt, qp.buf, &opts);
if (digest_text) {
qp.digest_text = digest_text;
qp.digest = SpookyHash::Hash64(digest_text, strlen(digest_text), 0);
char* ca = (char*)"";
if (mysql_thread___query_digests_track_hostname) {
if (m_session->client_myds && m_session->client_myds->addr.addr) {
ca = m_session->client_myds->addr.addr;
}
if (mysql_thread___query_digests_track_hostname && m_session->client_myds && m_session->client_myds->addr.addr) {
ca = m_session->client_myds->addr.addr;
}
uint64_t hash2;
@ -150,7 +153,6 @@ void MySQLFFTO::report_query_stats(const std::string& query, unsigned long long
myhash.Final(&qp.digest_total, &hash2);
GloMyQPro->update_query_digest(qp.digest_total, qp.digest, qp.digest_text, m_session->current_hostgroup, ui, duration_us, m_session->thread->curtime, ca, 0, 0);
free(digest_text);
}
if (fst_cmnt) free(fst_cmnt);

@ -27,7 +27,6 @@ PgSQLFFTO::~PgSQLFFTO() {
void PgSQLFFTO::on_client_data(const char* buf, size_t len) {
if (!buf || len == 0) return;
m_client_buffer.insert(m_client_buffer.end(), buf, buf + len);
while (m_client_buffer.size() >= 5) {
@ -35,21 +34,16 @@ void PgSQLFFTO::on_client_data(const char* buf, size_t len) {
uint32_t msg_len;
memcpy(&msg_len, &m_client_buffer[1], 4);
msg_len = ntohl(msg_len);
if (m_client_buffer.size() < 1 + msg_len) {
break;
}
if (m_client_buffer.size() < 1 + msg_len) break;
const unsigned char* payload = reinterpret_cast<const unsigned char*>(m_client_buffer.data()) + 5;
process_client_message(type, payload, msg_len - 4);
m_client_buffer.erase(m_client_buffer.begin(), m_client_buffer.begin() + 1 + msg_len);
}
}
void PgSQLFFTO::on_server_data(const char* buf, size_t len) {
if (!buf || len == 0) return;
m_server_buffer.insert(m_server_buffer.end(), buf, buf + len);
while (m_server_buffer.size() >= 5) {
@ -57,14 +51,10 @@ void PgSQLFFTO::on_server_data(const char* buf, size_t len) {
uint32_t msg_len;
memcpy(&msg_len, &m_server_buffer[1], 4);
msg_len = ntohl(msg_len);
if (m_server_buffer.size() < 1 + msg_len) {
break;
}
if (m_server_buffer.size() < 1 + msg_len) break;
const unsigned char* payload = reinterpret_cast<const unsigned char*>(m_server_buffer.data()) + 5;
process_server_message(type, payload, msg_len - 4);
m_server_buffer.erase(m_server_buffer.begin(), m_server_buffer.begin() + 1 + msg_len);
}
}
@ -79,20 +69,28 @@ void PgSQLFFTO::on_close() {
void PgSQLFFTO::process_client_message(char type, const unsigned char* payload, size_t len) {
if (type == 'Q') { // Simple Query
if (len > 0) {
m_current_query = std::string(reinterpret_cast<const char*>(payload), len);
m_query_start_time = monotonic_time();
m_state = AWAITING_RESPONSE;
}
} else if (type == 'P') { // Parse (Extended Query)
const char* query = reinterpret_cast<const char*>(payload);
while (*query != '\0' && (size_t)(query - reinterpret_cast<const char*>(payload)) < len) {
query++;
}
if (*query == '\0' && (size_t)(query + 1 - reinterpret_cast<const char*>(payload)) < len) {
m_current_query = std::string(query + 1);
m_query_start_time = monotonic_time();
m_state = AWAITING_RESPONSE;
m_current_query = std::string(reinterpret_cast<const char*>(payload), len);
m_query_start_time = monotonic_time();
m_state = AWAITING_RESPONSE;
} else if (type == 'P') { // Parse
std::string stmt_name = reinterpret_cast<const char*>(payload);
const char* query = reinterpret_cast<const char*>(payload) + stmt_name.length() + 1;
m_statements[stmt_name] = query;
} else if (type == 'B') { // Bind
std::string portal_name = reinterpret_cast<const char*>(payload);
const char* stmt_ptr = reinterpret_cast<const char*>(payload) + portal_name.length() + 1;
std::string stmt_name = stmt_ptr;
m_portals[portal_name] = stmt_name;
} else if (type == 'E') { // Execute
std::string portal_name = reinterpret_cast<const char*>(payload);
auto pit = m_portals.find(portal_name);
if (pit != m_portals.end()) {
auto sit = m_statements.find(pit->second);
if (sit != m_statements.end()) {
m_current_query = sit->second;
m_query_start_time = monotonic_time();
m_state = AWAITING_RESPONSE;
}
}
} else if (type == 'X') { // Terminate
on_close();
@ -101,7 +99,6 @@ void PgSQLFFTO::process_client_message(char type, const unsigned char* payload,
void PgSQLFFTO::process_server_message(char type, const unsigned char* payload, size_t len) {
if (m_state == IDLE) return;
if (type == 'C' || type == 'Z' || type == 'E') {
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration);
@ -124,17 +121,14 @@ void PgSQLFFTO::report_query_stats(const std::string& query, unsigned long long
SQP_par_t qp;
memset(&qp, 0, sizeof(qp));
char* fst_cmnt = NULL;
char* digest_text = pgsql_query_digest_and_first_comment(query.c_str(), query.length(), &fst_cmnt, qp.buf, &opts);
if (digest_text) {
qp.digest_text = digest_text;
qp.digest = SpookyHash::Hash64(digest_text, strlen(digest_text), 0);
char* ca = (char*)"";
if (pgsql_thread___query_digests_track_hostname) {
if (m_session->client_myds && m_session->client_myds->addr.addr) {
ca = m_session->client_myds->addr.addr;
}
if (pgsql_thread___query_digests_track_hostname && m_session->client_myds && m_session->client_myds->addr.addr) {
ca = m_session->client_myds->addr.addr;
}
uint64_t hash2;
@ -149,7 +143,6 @@ void PgSQLFFTO::report_query_stats(const std::string& query, unsigned long long
myhash.Final(&qp.digest_total, &hash2);
GloPgQPro->update_query_digest(qp.digest_total, qp.digest, qp.digest_text, m_session->current_hostgroup, ui, duration_us, m_session->thread->curtime, ca, 0, 0);
free(digest_text);
}
if (fst_cmnt) free(fst_cmnt);

Loading…
Cancel
Save