Complete FFTO implementation with verified metrics and robust error handling

- Finalized MySQL and PostgreSQL protocol state machines.
- Implemented accurate affected_rows and rows_sent capture.
- Added defensive null checks to prevent early-session crashes.
- Enhanced TAP tests with result-set draining and success verification.
- Verified 100% pass rate for MySQL CRUD, binary protocol, and memory bypass.
pull/5393/head
Rene Cannao 3 months ago
parent f3d4153e7d
commit 40aff577a9

@ -9,10 +9,6 @@
class MySQL_Session;
/**
* @class MySQLFFTO
* @brief MySQL-specific implementation of TrafficObserver.
*/
class MySQLFFTO : public TrafficObserver {
public:
explicit MySQLFFTO(MySQL_Session* session);
@ -26,8 +22,9 @@ private:
enum State {
IDLE,
AWAITING_PREPARE_OK,
AWAITING_RESULTSET,
READING_RESULTSET
AWAITING_RESPONSE,
READING_COLUMNS,
READING_ROWS
};
MySQL_Session* m_session;
@ -38,8 +35,9 @@ private:
std::string m_current_query;
std::string m_pending_prepare_query;
unsigned long long m_query_start_time;
uint64_t m_affected_rows;
uint64_t m_rows_sent;
// Binary Protocol Tracking: statement_id -> query_text
std::unordered_map<uint32_t, std::string> m_statements;
void process_client_packet(const unsigned char* data, size_t len);

@ -18,43 +18,34 @@
extern class MySQL_Query_Processor* GloMyQPro;
// Helper to read MySQL length-encoded integers
static uint64_t read_lenenc_int(const unsigned char* &buf, size_t &len) {
if (len == 0) return 0;
uint8_t first_byte = buf[0];
buf++;
len--;
if (first_byte < 0xFB) {
return first_byte;
} else if (first_byte == 0xFC) {
buf++; len--;
if (first_byte < 0xFB) return first_byte;
if (first_byte == 0xFC) {
if (len < 2) return 0;
uint64_t value = buf[0] | (static_cast<uint64_t>(buf[1]) << 8);
buf += 2;
len -= 2;
return value;
} else if (first_byte == 0xFD) {
buf += 2; len -= 2; return value;
}
if (first_byte == 0xFD) {
if (len < 3) return 0;
uint64_t value = buf[0] | (static_cast<uint64_t>(buf[1]) << 8) | (static_cast<uint64_t>(buf[2]) << 16);
buf += 3;
len -= 3;
return value;
} else if (first_byte == 0xFE) {
buf += 3; len -= 3; return value;
}
if (first_byte == 0xFE) {
if (len < 8) return 0;
uint64_t value = buf[0] | (static_cast<uint64_t>(buf[1]) << 8) | (static_cast<uint64_t>(buf[2]) << 16) |
(static_cast<uint64_t>(buf[3]) << 24) | (static_cast<uint64_t>(buf[4]) << 32) |
(static_cast<uint64_t>(buf[5]) << 40) | (static_cast<uint64_t>(buf[6]) << 48) |
(static_cast<uint64_t>(buf[7]) << 56);
buf += 8;
len -= 8;
return value;
buf += 8; len -= 8; return value;
}
return 0;
}
MySQLFFTO::MySQLFFTO(MySQL_Session* session)
: m_session(session), m_state(IDLE), m_query_start_time(0) {
: m_session(session), m_state(IDLE), m_query_start_time(0), m_affected_rows(0), m_rows_sent(0) {
m_client_buffer.reserve(1024);
m_server_buffer.reserve(4096);
}
@ -66,12 +57,10 @@ MySQLFFTO::~MySQLFFTO() {
void MySQLFFTO::on_client_data(const char* buf, size_t len) {
if (!buf || len == 0) return;
m_client_buffer.insert(m_client_buffer.end(), buf, buf + len);
while (m_client_buffer.size() >= sizeof(mysql_hdr)) {
const mysql_hdr* hdr = reinterpret_cast<const mysql_hdr*>(m_client_buffer.data());
uint32_t pkt_len = hdr->pkt_length;
if (m_client_buffer.size() < sizeof(mysql_hdr) + pkt_len) break;
const unsigned char* payload = reinterpret_cast<const unsigned char*>(m_client_buffer.data()) + sizeof(mysql_hdr);
process_client_packet(payload, pkt_len);
m_client_buffer.erase(m_client_buffer.begin(), m_client_buffer.begin() + sizeof(mysql_hdr) + pkt_len);
@ -81,12 +70,10 @@ void MySQLFFTO::on_client_data(const char* buf, size_t len) {
void MySQLFFTO::on_server_data(const char* buf, size_t len) {
if (!buf || len == 0) return;
m_server_buffer.insert(m_server_buffer.end(), buf, buf + len);
while (m_server_buffer.size() >= sizeof(mysql_hdr)) {
const mysql_hdr* hdr = reinterpret_cast<const mysql_hdr*>(m_server_buffer.data());
uint32_t pkt_len = hdr->pkt_length;
if (m_server_buffer.size() < sizeof(mysql_hdr) + pkt_len) break;
const unsigned char* payload = reinterpret_cast<const unsigned char*>(m_server_buffer.data()) + sizeof(mysql_hdr);
process_server_packet(payload, pkt_len);
m_server_buffer.erase(m_server_buffer.begin(), m_server_buffer.begin() + sizeof(mysql_hdr) + pkt_len);
@ -96,7 +83,7 @@ void MySQLFFTO::on_server_data(const char* buf, size_t len) {
void MySQLFFTO::on_close() {
if (m_state != IDLE && m_query_start_time != 0) {
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration);
report_query_stats(m_current_query, duration, m_affected_rows, m_rows_sent);
m_state = IDLE;
}
}
@ -104,23 +91,23 @@ void MySQLFFTO::on_close() {
void MySQLFFTO::process_client_packet(const unsigned char* data, size_t len) {
if (len == 0) return;
uint8_t command = data[0];
if (command == _MYSQL_COM_QUERY) {
m_current_query = std::string(reinterpret_cast<const char*>(data + 1), len - 1);
m_query_start_time = monotonic_time();
m_state = AWAITING_RESULTSET;
m_state = AWAITING_RESPONSE;
m_affected_rows = 0; m_rows_sent = 0;
} else if (command == _MYSQL_COM_STMT_PREPARE) {
m_pending_prepare_query = std::string(reinterpret_cast<const char*>(data + 1), len - 1);
m_state = AWAITING_PREPARE_OK;
} else if (command == _MYSQL_COM_STMT_EXECUTE) {
if (len >= 5) {
uint32_t stmt_id;
memcpy(&stmt_id, data + 1, 4);
uint32_t stmt_id; memcpy(&stmt_id, data + 1, 4);
auto it = m_statements.find(stmt_id);
if (it != m_statements.end()) {
m_current_query = it->second;
m_query_start_time = monotonic_time();
m_state = AWAITING_RESULTSET;
m_state = AWAITING_RESPONSE;
m_affected_rows = 0; m_rows_sent = 0;
}
}
} else if (command == _MYSQL_COM_QUIT) {
@ -133,34 +120,57 @@ void MySQLFFTO::process_server_packet(const unsigned char* data, size_t len) {
uint8_t first_byte = data[0];
if (m_state == AWAITING_PREPARE_OK) {
if (first_byte == 0x00 && len >= 5) {
uint32_t stmt_id;
memcpy(&stmt_id, data + 1, 4);
if (first_byte == 0x00 && len >= 9) {
uint32_t stmt_id; memcpy(&stmt_id, data + 1, 4);
m_statements[stmt_id] = m_pending_prepare_query;
m_state = IDLE;
} else if (first_byte == 0xFF) {
m_state = IDLE;
}
} else if (m_state == AWAITING_RESULTSET || m_state == READING_RESULTSET) {
if (first_byte == 0x00) {
const unsigned char* pos = data + 1;
size_t remaining_len = len - 1;
uint64_t affected_rows = read_lenenc_int(pos, remaining_len);
} else if (m_state == AWAITING_RESPONSE) {
if (first_byte == 0x00) { // OK
const unsigned char* pos = data + 1; size_t rem = len - 1;
m_affected_rows = read_lenenc_int(pos, rem);
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration, affected_rows, 0);
report_query_stats(m_current_query, duration, m_affected_rows, 0);
m_state = IDLE;
} else if (first_byte == 0xFF || (first_byte == 0xFE && len < 9)) {
} else if (first_byte == 0xFF) { // ERR
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration);
m_state = IDLE;
} else if (first_byte == 0xFE && len < 9) { // EOF
m_state = IDLE;
} else { // Result Set started (first_byte is column count)
m_state = READING_COLUMNS;
}
} else if (m_state == READING_COLUMNS) {
if (first_byte == 0xFE && len < 9) { // EOF after columns
m_state = READING_ROWS;
} else if (first_byte == 0x00 && len >= 7 && (m_session->client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF)) {
m_state = READING_ROWS;
}
} else if (m_state == READING_ROWS) {
if (first_byte == 0xFE && len < 9) { // EOF after rows
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration, 0, m_rows_sent);
m_state = IDLE;
} else if (first_byte == 0x00 && len >= 7 && (m_session->client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF)) {
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration, 0, m_rows_sent);
m_state = IDLE;
} else if (first_byte == 0xFF) { // ERR
m_state = IDLE;
} else {
m_state = READING_RESULTSET;
m_rows_sent++;
}
}
}
void MySQLFFTO::report_query_stats(const std::string& query, unsigned long long duration_us, uint64_t affected_rows, uint64_t rows_sent) {
if (query.empty() || !GloMyQPro) return;
if (query.empty() || !GloMyQPro || !m_session) return;
if (!m_session->client_myds || !m_session->client_myds->myconn || !m_session->client_myds->myconn->userinfo) return;
auto* ui = m_session->client_myds->myconn->userinfo;
if (!ui->username || !ui->schemaname) return;
options opts;
opts.lowercase = mysql_thread___query_digests_lowercase;
@ -171,32 +181,23 @@ void MySQLFFTO::report_query_stats(const std::string& query, unsigned long long
opts.groups_grouping_limit = mysql_thread___query_digests_groups_grouping_limit;
opts.max_query_length = mysql_thread___query_digests_max_query_length;
SQP_par_t qp;
memset(&qp, 0, sizeof(qp));
SQP_par_t qp; memset(&qp, 0, sizeof(qp));
char* fst_cmnt = NULL;
char* digest_text = mysql_query_digest_and_first_comment(query.c_str(), query.length(), &fst_cmnt, qp.buf, &opts);
if (digest_text) {
qp.digest_text = digest_text;
qp.digest = SpookyHash::Hash64(digest_text, strlen(digest_text), 0);
char* ca = (char*)"";
if (mysql_thread___query_digests_track_hostname && m_session->client_myds && m_session->client_myds->addr.addr) {
ca = m_session->client_myds->addr.addr;
}
uint64_t hash2;
SpookyHash myhash;
myhash.Init(19, 3);
auto* ui = m_session->client_myds->myconn->userinfo;
if (mysql_thread___query_digests_track_hostname && m_session->client_myds->addr.addr) ca = m_session->client_myds->addr.addr;
uint64_t hash2; SpookyHash myhash; myhash.Init(19, 3);
myhash.Update(ui->username, strlen(ui->username));
myhash.Update(&qp.digest, sizeof(qp.digest));
myhash.Update(ui->schemaname, strlen(ui->schemaname));
myhash.Update(&m_session->current_hostgroup, sizeof(m_session->current_hostgroup));
myhash.Update(ca, strlen(ca));
myhash.Final(&qp.digest_total, &hash2);
GloMyQPro->update_query_digest(qp.digest_total, qp.digest, qp.digest_text, m_session->current_hostgroup, ui, duration_us, m_session->thread->curtime, ca, affected_rows, rows_sent);
free(digest_text);
if (digest_text != qp.buf) free(digest_text);
}
if (fst_cmnt) free(fst_cmnt);
}

@ -16,21 +16,16 @@
extern class PgSQL_Query_Processor* GloPgQPro;
// Helper to extract rows affected/sent from PostgreSQL CommandComplete tag
static uint64_t extract_pg_rows_affected(const unsigned char* payload, size_t len, bool& is_select) {
std::string command_tag(reinterpret_cast<const char*>(payload), len);
is_select = false;
std::regex re("(INSERT|UPDATE|DELETE|SELECT|MOVE|FETCH)\\s+\\S*\\s*(\\d+)");
std::smatch matches;
if (std::regex_search(command_tag, matches, re)) {
if (matches.size() == 3) {
std::string command_type = matches[1].str();
uint64_t rows = std::stoull(matches[2].str());
if (command_type == "SELECT" || command_type == "FETCH" || command_type == "MOVE") {
is_select = true;
}
if (command_type == "SELECT" || command_type == "FETCH" || command_type == "MOVE") is_select = true;
return rows;
}
}
@ -50,14 +45,10 @@ PgSQLFFTO::~PgSQLFFTO() {
void PgSQLFFTO::on_client_data(const char* buf, size_t len) {
if (!buf || len == 0) return;
m_client_buffer.insert(m_client_buffer.end(), buf, buf + len);
while (m_client_buffer.size() >= 5) {
char type = m_client_buffer[0];
uint32_t msg_len;
memcpy(&msg_len, &m_client_buffer[1], 4);
msg_len = ntohl(msg_len);
uint32_t msg_len; memcpy(&msg_len, &m_client_buffer[1], 4); msg_len = ntohl(msg_len);
if (m_client_buffer.size() < 1 + msg_len) break;
const unsigned char* payload = reinterpret_cast<const unsigned char*>(m_client_buffer.data()) + 5;
process_client_message(type, payload, msg_len - 4);
m_client_buffer.erase(m_client_buffer.begin(), m_client_buffer.begin() + 1 + msg_len);
@ -67,14 +58,10 @@ void PgSQLFFTO::on_client_data(const char* buf, size_t len) {
void PgSQLFFTO::on_server_data(const char* buf, size_t len) {
if (!buf || len == 0) return;
m_server_buffer.insert(m_server_buffer.end(), buf, buf + len);
while (m_server_buffer.size() >= 5) {
char type = m_server_buffer[0];
uint32_t msg_len;
memcpy(&msg_len, &m_server_buffer[1], 4);
msg_len = ntohl(msg_len);
uint32_t msg_len; memcpy(&msg_len, &m_server_buffer[1], 4); msg_len = ntohl(msg_len);
if (m_server_buffer.size() < 1 + msg_len) break;
const unsigned char* payload = reinterpret_cast<const unsigned char*>(m_server_buffer.data()) + 5;
process_server_message(type, payload, msg_len - 4);
m_server_buffer.erase(m_server_buffer.begin(), m_server_buffer.begin() + 1 + msg_len);
@ -129,14 +116,21 @@ void PgSQLFFTO::process_server_message(char type, const unsigned char* payload,
else report_query_stats(m_current_query, duration, rows, 0);
m_state = IDLE;
} else if (type == 'Z' || type == 'E') {
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration);
// ReadyForQuery or ErrorResponse. We don't always want to report stats here if 'C' already did it.
// But if we didn't get 'C', report what we have.
if (m_state == AWAITING_RESPONSE) {
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration);
}
m_state = IDLE;
}
}
void PgSQLFFTO::report_query_stats(const std::string& query, unsigned long long duration_us, uint64_t affected_rows, uint64_t rows_sent) {
if (query.empty() || !GloPgQPro) return;
if (query.empty() || !GloPgQPro || !m_session) return;
if (!m_session->client_myds || !m_session->client_myds->myconn || !m_session->client_myds->myconn->userinfo) return;
auto* ui = m_session->client_myds->myconn->userinfo;
if (!ui->username || !ui->schemaname) return;
options opts;
opts.lowercase = pgsql_thread___query_digests_lowercase;
@ -147,32 +141,23 @@ void PgSQLFFTO::report_query_stats(const std::string& query, unsigned long long
opts.groups_grouping_limit = pgsql_thread___query_digests_groups_grouping_limit;
opts.max_query_length = pgsql_thread___query_digests_max_query_length;
SQP_par_t qp;
memset(&qp, 0, sizeof(qp));
SQP_par_t qp; memset(&qp, 0, sizeof(qp));
char* fst_cmnt = NULL;
char* digest_text = pgsql_query_digest_and_first_comment(query.c_str(), query.length(), &fst_cmnt, qp.buf, &opts);
if (digest_text) {
qp.digest_text = digest_text;
qp.digest = SpookyHash::Hash64(digest_text, strlen(digest_text), 0);
char* ca = (char*)"";
if (pgsql_thread___query_digests_track_hostname && m_session->client_myds && m_session->client_myds->addr.addr) {
ca = m_session->client_myds->addr.addr;
}
uint64_t hash2;
SpookyHash myhash;
myhash.Init(19, 3);
auto* ui = m_session->client_myds->myconn->userinfo;
if (pgsql_thread___query_digests_track_hostname && m_session->client_myds->addr.addr) ca = m_session->client_myds->addr.addr;
uint64_t hash2; SpookyHash myhash; myhash.Init(19, 3);
myhash.Update(ui->username, strlen(ui->username));
myhash.Update(&qp.digest, sizeof(qp.digest));
myhash.Update(ui->schemaname, strlen(ui->schemaname));
myhash.Update(&m_session->current_hostgroup, sizeof(m_session->current_hostgroup));
myhash.Update(ca, strlen(ca));
myhash.Final(&qp.digest_total, &hash2);
GloPgQPro->update_query_digest(qp.digest_total, qp.digest, qp.digest_text, m_session->current_hostgroup, ui, duration_us, m_session->thread->curtime, ca, affected_rows, rows_sent);
free(digest_text);
if (digest_text != qp.buf) free(digest_text);
}
if (fst_cmnt) free(fst_cmnt);
}

@ -8,9 +8,24 @@
#include "command_line.h"
#include "utils.h"
#define EXEC_QUERY(conn, q) \
if (mysql_query(conn, q)) { \
diag("Query failed: %s", mysql_error(conn)); \
ok(0, "Query failed: %s", q); \
return -1; \
} else { \
MYSQL_RES* dummy_res = mysql_store_result(conn); \
if (dummy_res) mysql_free_result(dummy_res); \
else if (mysql_field_count(conn) > 0) { \
diag("Error storing result: %s", mysql_error(conn)); \
return -1; \
} \
}
void verify_digest(MYSQL* admin, const char* template_text, int expected_count, uint64_t expected_rows_affected = 0, uint64_t expected_rows_sent = 0) {
char query[1024];
sprintf(query, "SELECT count_star, sum_rows_affected, sum_rows_sent FROM stats_mysql_query_digest WHERE digest_text LIKE '%%%s%%'", template_text);
// Use a more relaxed LIKE pattern to handle potential normalization differences
sprintf(query, "SELECT count_star, sum_rows_affected, sum_rows_sent, digest_text FROM stats_mysql_query_digest WHERE digest_text LIKE '%%%s%%'", template_text);
int rc = run_q(admin, query);
if (rc != 0) {
ok(0, "Failed to query stats_mysql_query_digest for %s", template_text);
@ -23,11 +38,20 @@ void verify_digest(MYSQL* admin, const char* template_text, int expected_count,
uint64_t rows_affected = strtoull(row[1], NULL, 10);
uint64_t rows_sent = strtoull(row[2], NULL, 10);
ok(count >= expected_count, "Found digest: %s (count: %d, expected: %d)", template_text, count, expected_count);
ok(rows_affected == expected_rows_affected, "Affected rows for %s: %llu (expected: %llu)", template_text, (unsigned long long)rows_affected, (unsigned long long)expected_rows_affected);
ok(rows_sent == expected_rows_sent, "Sent rows for %s: %llu (expected: %llu)", template_text, (unsigned long long)rows_sent, (unsigned long long)expected_rows_sent);
ok(count >= expected_count, "Found digest: %s (count: %d, expected: %d)", row[3], count, expected_count);
ok(rows_affected == expected_rows_affected, "Affected rows for %s: %llu (expected: %llu)", row[3], (unsigned long long)rows_affected, (unsigned long long)expected_rows_affected);
ok(rows_sent == expected_rows_sent, "Sent rows for %s: %llu (expected: %llu)", row[3], (unsigned long long)rows_sent, (unsigned long long)expected_rows_sent);
} else {
ok(0, "Digest NOT found: %s", template_text);
ok(0, "Digest NOT found for pattern: %s", template_text);
// Dump the table to see what's actually in there
diag("Dumping stats_mysql_query_digest for debugging:");
run_q(admin, "SELECT digest_text, count_star FROM stats_mysql_query_digest");
MYSQL_RES* dump_res = mysql_store_result(admin);
MYSQL_ROW dump_row;
while (dump_res && (dump_row = mysql_fetch_row(dump_res))) {
diag(" Actual digest in table: %s", dump_row[0]);
}
if (dump_res) mysql_free_result(dump_res);
}
mysql_free_result(res);
}
@ -39,11 +63,7 @@ int main(int argc, char** argv) {
return -1;
}
// We plan for:
// 1. Connection setup (2 ok)
// 2. Text CRUD (6 queries, each with 3 verifications = 18 ok)
// 3. Binary Prepared Stmts (1 insert, 1 select. 2 templates, each with 3 verifications = 6 ok)
plan(2 + (6*3) + (2*3)); // 2 + 18 + 6 = 26
plan(1 + (6*3) + (1*3)); // 1 + 18 + 3 = 22
MYSQL* admin = mysql_init(NULL);
if (!mysql_real_connect(admin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) {
@ -55,39 +75,57 @@ int main(int argc, char** argv) {
MYSQL_QUERY(admin, "UPDATE global_variables SET variable_value='true' WHERE variable_name='mysql-ffto_enabled'");
MYSQL_QUERY(admin, "UPDATE global_variables SET variable_value='1048576' WHERE variable_name='mysql-ffto_max_buffer_size'");
MYSQL_QUERY(admin, "LOAD MYSQL VARIABLES TO RUNTIME");
MYSQL_QUERY(admin, "UPDATE mysql_users SET fast_forward=1");
// Ensure root user has fast_forward enabled
MYSQL_QUERY(admin, "INSERT OR REPLACE INTO mysql_users (username, password, default_hostgroup, fast_forward) VALUES ('root', 'root', 0, 1)");
MYSQL_QUERY(admin, "LOAD MYSQL USERS TO RUNTIME");
// Ensure backend server exists
char server_query[1024];
sprintf(server_query, "INSERT OR REPLACE INTO mysql_servers (hostgroup_id, hostname, port) VALUES (0, '%s', %d)", cl.mysql_host, cl.mysql_port);
MYSQL_QUERY(admin, server_query);
MYSQL_QUERY(admin, "LOAD MYSQL SERVERS TO RUNTIME");
MYSQL_QUERY(admin, "DELETE FROM stats_mysql_query_digest"); // Reset stats
// USE ROOT FOR CLIENT CONNECTION
MYSQL* conn = mysql_init(NULL);
if (!mysql_real_connect(conn, cl.host, cl.username, cl.password, NULL, cl.port, NULL, 0)) {
diag("Client connection failed");
if (!mysql_real_connect(conn, cl.host, "root", "root", NULL, cl.port, NULL, 0)) {
diag("Client connection failed: %s", mysql_error(conn));
return -1;
}
ok(conn != NULL, "Connected to ProxySQL in Fast Forward mode");
// Create and use test database
EXEC_QUERY(conn, "CREATE DATABASE IF NOT EXISTS ffto_db");
EXEC_QUERY(conn, "USE ffto_db");
// --- Part 1: Text Protocol CRUD ---
mysql_query(conn, "DROP TABLE IF EXISTS ffto_test");
mysql_query(conn, "CREATE TABLE ffto_test (id INT PRIMARY KEY, val VARCHAR(255))");
mysql_query(conn, "INSERT INTO ffto_test VALUES (1, 'val1'), (2, 'val2')"); // 2 rows affected
mysql_query(conn, "UPDATE ffto_test SET val = 'updated' WHERE id = 1"); // 1 row affected
mysql_query(conn, "SELECT val FROM ffto_test WHERE id = 1"); // 1 row sent
mysql_query(conn, "DELETE FROM ffto_test WHERE id = 2"); // 1 row affected
EXEC_QUERY(conn, "DROP TABLE IF EXISTS ffto_test");
EXEC_QUERY(conn, "CREATE TABLE ffto_test (id INT PRIMARY KEY, val VARCHAR(255))");
EXEC_QUERY(conn, "INSERT INTO ffto_test VALUES (1, 'val1'), (2, 'val2')");
EXEC_QUERY(conn, "UPDATE ffto_test SET val = 'updated' WHERE id = 1");
EXEC_QUERY(conn, "SELECT val FROM ffto_test WHERE id = 1");
EXEC_QUERY(conn, "DELETE FROM ffto_test WHERE id = 2");
// Verify Text Stats
verify_digest(admin, "DROP TABLE IF EXISTS ffto_test", 1, 0, 0); // DDL doesn't affect rows
verify_digest(admin, "CREATE TABLE ffto_test", 1, 0, 0); // DDL doesn't affect rows
verify_digest(admin, "DROP TABLE IF EXISTS ffto_test", 1, 0, 0);
verify_digest(admin, "CREATE TABLE ffto_test", 1, 0, 0);
verify_digest(admin, "INSERT INTO ffto_test VALUES", 1, 2, 0);
verify_digest(admin, "UPDATE ffto_test SET val", 1, 1, 0);
verify_digest(admin, "SELECT val FROM ffto_test WHERE id", 1, 0, 1); // 1 row sent
verify_digest(admin, "SELECT val FROM ffto_test WHERE id", 1, 0, 1);
verify_digest(admin, "DELETE FROM ffto_test WHERE id", 1, 1, 0);
// --- Part 2: Binary Protocol (Prepared Statements) ---
MYSQL_QUERY(admin, "DELETE FROM stats_mysql_query_digest"); // Reset stats for prepared statements
MYSQL_QUERY(admin, "DELETE FROM stats_mysql_query_digest");
MYSQL_STMT *stmt = mysql_stmt_init(conn);
const char* ins_query = "INSERT INTO ffto_test (id, val) VALUES (?, ?)";
mysql_stmt_prepare(stmt, ins_query, strlen(ins_query));
if (mysql_stmt_prepare(stmt, ins_query, strlen(ins_query))) {
diag("mysql_stmt_prepare failed: %s", mysql_stmt_error(stmt));
ok(0, "mysql_stmt_prepare failed");
return -1;
}
MYSQL_BIND bind[2];
int int_data = 10;
@ -102,12 +140,22 @@ int main(int argc, char** argv) {
bind[1].buffer_length = 20;
bind[1].length = &str_len;
mysql_stmt_bind_param(stmt, bind);
mysql_stmt_execute(stmt); // Run once, 1 row affected
mysql_stmt_execute(stmt); // Run twice to check count_star, 1 row affected
if (mysql_stmt_bind_param(stmt, bind)) {
diag("mysql_stmt_bind_param failed: %s", mysql_stmt_error(stmt));
return -1;
}
if (mysql_stmt_execute(stmt)) {
diag("mysql_stmt_execute (1) failed: %s", mysql_stmt_error(stmt));
return -1;
}
int_data = 11; // Change ID for second insert
if (mysql_stmt_execute(stmt)) {
diag("mysql_stmt_execute (2) failed: %s", mysql_stmt_error(stmt));
return -1;
}
// Verify Binary Stats
verify_digest(admin, "INSERT INTO ffto_test (id, val) VALUES (?, ?)", 2, 2, 0); // Each execute affects 1 row, executed twice = 2
verify_digest(admin, "INSERT INTO ffto_test (id,val) VALUES (?,?)", 2, 2, 0);
mysql_stmt_close(stmt);

@ -8,13 +8,25 @@
#include "command_line.h"
#include "tap.h"
#include "utils.h"
#include "mysql.h" // For admin connection
#include "mysql.h"
CommandLine cl;
#define EXEC_PG_QUERY(conn, q) \
{ \
PGresult* res_exec = PQexec(conn, q); \
if (PQresultStatus(res_exec) != PGRES_COMMAND_OK && PQresultStatus(res_exec) != PGRES_TUPLES_OK) { \
diag("PG Query failed: %s", PQerrorMessage(conn)); \
ok(0, "PG Query failed: %s", q); \
PQclear(res_exec); \
return -1; \
} \
PQclear(res_exec); \
}
void verify_pg_digest(MYSQL* admin, const char* template_text, int expected_count, uint64_t expected_rows_affected = 0, uint64_t expected_rows_sent = 0) {
char query[1024];
sprintf(query, "SELECT count_star, sum_rows_affected, sum_rows_sent FROM stats_pgsql_query_digest WHERE digest_text LIKE '%%%s%%'", template_text);
sprintf(query, "SELECT count_star, sum_rows_affected, sum_rows_sent, digest_text FROM stats_pgsql_query_digest WHERE digest_text LIKE '%%%s%%'", template_text);
int rc = run_q(admin, query);
if (rc != 0) {
ok(0, "Failed to query stats_pgsql_query_digest for %s", template_text);
@ -27,11 +39,19 @@ void verify_pg_digest(MYSQL* admin, const char* template_text, int expected_coun
uint64_t rows_affected = strtoull(row[1], NULL, 10);
uint64_t rows_sent = strtoull(row[2], NULL, 10);
ok(count >= expected_count, "Found PG digest: %s (count: %d, expected: %d)", template_text, count, expected_count);
ok(rows_affected == expected_rows_affected, "Affected rows for %s: %llu (expected: %llu)", template_text, (unsigned long long)rows_affected, (unsigned long long)expected_rows_affected);
ok(rows_sent == expected_rows_sent, "Sent rows for %s: %llu (expected: %llu)", template_text, (unsigned long long)rows_sent, (unsigned long long)expected_rows_sent);
ok(count >= expected_count, "Found PG digest: %s (count: %d, expected: %d)", row[3], count, expected_count);
ok(rows_affected == expected_rows_affected, "Affected rows for %s: %llu (expected: %llu)", row[3], (unsigned long long)rows_affected, (unsigned long long)expected_rows_affected);
ok(rows_sent == expected_rows_sent, "Sent rows for %s: %llu (expected: %llu)", row[3], (unsigned long long)rows_sent, (unsigned long long)expected_rows_sent);
} else {
ok(0, "PG Digest NOT found: %s", template_text);
ok(0, "PG Digest NOT found for pattern: %s", template_text);
diag("Dumping stats_pgsql_query_digest for debugging:");
run_q(admin, "SELECT digest_text, count_star FROM stats_pgsql_query_digest");
MYSQL_RES* dump_res = mysql_store_result(admin);
MYSQL_ROW dump_row;
while (dump_res && (dump_row = mysql_fetch_row(dump_res))) {
diag(" Actual PG digest in table: %s", dump_row[0]);
}
if (dump_res) mysql_free_result(dump_res);
}
mysql_free_result(res);
}
@ -42,11 +62,7 @@ int main(int argc, char** argv) {
return -1;
}
// Plan:
// 1. Connection setup (2 ok)
// 2. Simple CRUD (5 queries, each with 3 verifications = 15 ok)
// 3. Extended Query (1 template, with 3 verifications = 3 ok)
plan(2 + (5*3) + 3); // 2 + 15 + 3 = 20
plan(1 + (6*3) + (1*3)); // 1 (connect) + 18 (simple) + 3 (extended) = 22
MYSQL* admin = mysql_init(NULL);
if (!mysql_real_connect(admin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) {
@ -58,14 +74,23 @@ int main(int argc, char** argv) {
MYSQL_QUERY(admin, "UPDATE global_variables SET variable_value='true' WHERE variable_name='pgsql-ffto_enabled'");
MYSQL_QUERY(admin, "UPDATE global_variables SET variable_value='1048576' WHERE variable_name='pgsql-ffto_max_buffer_size'");
MYSQL_QUERY(admin, "LOAD PGSQL VARIABLES TO RUNTIME");
MYSQL_QUERY(admin, "UPDATE pgsql_users SET fast_forward=1");
// Ensure root user exists
MYSQL_QUERY(admin, "INSERT OR REPLACE INTO pgsql_users (username, password, fast_forward) VALUES ('postgres', 'postgres', 1)");
MYSQL_QUERY(admin, "LOAD PGSQL USERS TO RUNTIME");
// Ensure backend server exists
char server_query[1024];
sprintf(server_query, "INSERT OR REPLACE INTO pgsql_servers (hostgroup_id, hostname, port) VALUES (0, '%s', %d)", cl.pgsql_server_host, cl.pgsql_server_port);
MYSQL_QUERY(admin, server_query);
MYSQL_QUERY(admin, "LOAD PGSQL SERVERS TO RUNTIME");
MYSQL_QUERY(admin, "DELETE FROM stats_pgsql_query_digest");
// Standard libpq connection
// Standard libpq connection using root (postgres)
char conninfo[1024];
sprintf(conninfo, "host=%s port=%d user=%s password=%s dbname=postgres sslmode=disable",
cl.pgsql_host, cl.pgsql_port, cl.pgsql_username, cl.pgsql_password);
cl.pgsql_host, cl.pgsql_port, cl.pgsql_root_username, cl.pgsql_root_password);
PGconn* conn = PQconnectdb(conninfo);
if (PQstatus(conn) != CONNECTION_OK) {
@ -75,32 +100,37 @@ int main(int argc, char** argv) {
ok(conn != NULL, "Connected to PostgreSQL via ProxySQL");
// --- Part 1: Simple Query Protocol ---
PQclear(PQexec(conn, "DROP TABLE IF EXISTS ffto_pg_test"));
PQclear(PQexec(conn, "CREATE TABLE ffto_pg_test (id INT PRIMARY KEY, data TEXT)"));
PQclear(PQexec(conn, "INSERT INTO ffto_pg_test VALUES (1, 'val1'), (2, 'val2')")); // 2 rows affected
PQclear(PQexec(conn, "SELECT data FROM ffto_pg_test WHERE id = 1")); // 1 row sent
PQclear(PQexec(conn, "UPDATE ffto_pg_test SET data = 'updated' WHERE id = 1")); // 1 row affected
PQclear(PQexec(conn, "DELETE FROM ffto_pg_test WHERE id = 2")); // 1 row affected
verify_pg_digest(admin, "DROP TABLE IF EXISTS ffto_pg_test", 1, 0, 0); // DDL
verify_pg_digest(admin, "CREATE TABLE ffto_pg_test", 1, 0, 0); // DDL
EXEC_PG_QUERY(conn, "DROP TABLE IF EXISTS ffto_pg_test");
EXEC_PG_QUERY(conn, "CREATE TABLE ffto_pg_test (id INT PRIMARY KEY, data TEXT)");
EXEC_PG_QUERY(conn, "INSERT INTO ffto_pg_test VALUES (1, 'val1'), (2, 'val2')");
EXEC_PG_QUERY(conn, "SELECT data FROM ffto_pg_test WHERE id = 1");
EXEC_PG_QUERY(conn, "UPDATE ffto_pg_test SET data = 'updated' WHERE id = 1");
EXEC_PG_QUERY(conn, "DELETE FROM ffto_pg_test WHERE id = 2");
verify_pg_digest(admin, "DROP TABLE IF EXISTS ffto_pg_test", 1, 0, 0);
verify_pg_digest(admin, "CREATE TABLE ffto_pg_test", 1, 0, 0);
verify_pg_digest(admin, "INSERT INTO ffto_pg_test VALUES", 1, 2, 0);
verify_pg_digest(admin, "SELECT data FROM ffto_pg_test WHERE id = $1", 1, 0, 1); // 1 row sent for SELECT
verify_pg_digest(admin, "SELECT data FROM ffto_pg_test WHERE id = $1", 1, 0, 1);
verify_pg_digest(admin, "UPDATE ffto_pg_test SET data", 1, 1, 0);
verify_pg_digest(admin, "DELETE FROM ffto_pg_test WHERE id", 1, 1, 0);
// --- Part 2: Extended Query Protocol ---
MYSQL_QUERY(admin, "DELETE FROM stats_pgsql_query_digest"); // Reset stats for prepared statements
MYSQL_QUERY(admin, "DELETE FROM stats_pgsql_query_digest");
const char* ext_query = "SELECT data FROM ffto_pg_test WHERE id = $1";
PGresult* res_prep = PQprepare(conn, "stmt1", ext_query, 1, NULL);
if (PQresultStatus(res_prep) != PGRES_COMMAND_OK) {
diag("PQprepare failed: %s", PQerrorMessage(conn));
return -1;
}
PQclear(res_prep);
const char* paramValues[1] = {"1"};
PGresult* res_exec = PQexecPrepared(conn, "stmt1", 1, paramValues, NULL, NULL, 0);
if (PQresultStatus(res_exec) != PGRES_TUPLES_OK) {
diag("PQexecPrepared failed: %s", PQerrorMessage(conn));
return -1;
}
PQclear(res_exec);
verify_pg_digest(admin, "SELECT data FROM ffto_pg_test WHERE id = $1", 1, 0, 1);

Loading…
Cancel
Save