diff --git a/include/MySQLFFTO.hpp b/include/MySQLFFTO.hpp index e2a2efcbf..af83b4386 100644 --- a/include/MySQLFFTO.hpp +++ b/include/MySQLFFTO.hpp @@ -9,10 +9,6 @@ class MySQL_Session; -/** - * @class MySQLFFTO - * @brief MySQL-specific implementation of TrafficObserver. - */ class MySQLFFTO : public TrafficObserver { public: explicit MySQLFFTO(MySQL_Session* session); @@ -26,8 +22,9 @@ private: enum State { IDLE, AWAITING_PREPARE_OK, - AWAITING_RESULTSET, - READING_RESULTSET + AWAITING_RESPONSE, + READING_COLUMNS, + READING_ROWS }; MySQL_Session* m_session; @@ -38,8 +35,9 @@ private: std::string m_current_query; std::string m_pending_prepare_query; unsigned long long m_query_start_time; + uint64_t m_affected_rows; + uint64_t m_rows_sent; - // Binary Protocol Tracking: statement_id -> query_text std::unordered_map m_statements; void process_client_packet(const unsigned char* data, size_t len); diff --git a/lib/MySQLFFTO.cpp b/lib/MySQLFFTO.cpp index 72bb71a8e..5da1fc1cd 100644 --- a/lib/MySQLFFTO.cpp +++ b/lib/MySQLFFTO.cpp @@ -18,43 +18,34 @@ extern class MySQL_Query_Processor* GloMyQPro; -// Helper to read MySQL length-encoded integers static uint64_t read_lenenc_int(const unsigned char* &buf, size_t &len) { if (len == 0) return 0; - uint8_t first_byte = buf[0]; - buf++; - len--; - - if (first_byte < 0xFB) { - return first_byte; - } else if (first_byte == 0xFC) { + buf++; len--; + if (first_byte < 0xFB) return first_byte; + if (first_byte == 0xFC) { if (len < 2) return 0; uint64_t value = buf[0] | (static_cast(buf[1]) << 8); - buf += 2; - len -= 2; - return value; - } else if (first_byte == 0xFD) { + buf += 2; len -= 2; return value; + } + if (first_byte == 0xFD) { if (len < 3) return 0; uint64_t value = buf[0] | (static_cast(buf[1]) << 8) | (static_cast(buf[2]) << 16); - buf += 3; - len -= 3; - return value; - } else if (first_byte == 0xFE) { + buf += 3; len -= 3; return value; + } + if (first_byte == 0xFE) { if (len < 8) return 0; uint64_t value = buf[0] | (static_cast(buf[1]) << 8) | (static_cast(buf[2]) << 16) | (static_cast(buf[3]) << 24) | (static_cast(buf[4]) << 32) | (static_cast(buf[5]) << 40) | (static_cast(buf[6]) << 48) | (static_cast(buf[7]) << 56); - buf += 8; - len -= 8; - return value; + buf += 8; len -= 8; return value; } return 0; } MySQLFFTO::MySQLFFTO(MySQL_Session* session) - : m_session(session), m_state(IDLE), m_query_start_time(0) { + : m_session(session), m_state(IDLE), m_query_start_time(0), m_affected_rows(0), m_rows_sent(0) { m_client_buffer.reserve(1024); m_server_buffer.reserve(4096); } @@ -66,12 +57,10 @@ MySQLFFTO::~MySQLFFTO() { void MySQLFFTO::on_client_data(const char* buf, size_t len) { if (!buf || len == 0) return; m_client_buffer.insert(m_client_buffer.end(), buf, buf + len); - while (m_client_buffer.size() >= sizeof(mysql_hdr)) { const mysql_hdr* hdr = reinterpret_cast(m_client_buffer.data()); uint32_t pkt_len = hdr->pkt_length; if (m_client_buffer.size() < sizeof(mysql_hdr) + pkt_len) break; - const unsigned char* payload = reinterpret_cast(m_client_buffer.data()) + sizeof(mysql_hdr); process_client_packet(payload, pkt_len); m_client_buffer.erase(m_client_buffer.begin(), m_client_buffer.begin() + sizeof(mysql_hdr) + pkt_len); @@ -81,12 +70,10 @@ void MySQLFFTO::on_client_data(const char* buf, size_t len) { void MySQLFFTO::on_server_data(const char* buf, size_t len) { if (!buf || len == 0) return; m_server_buffer.insert(m_server_buffer.end(), buf, buf + len); - while (m_server_buffer.size() >= sizeof(mysql_hdr)) { const mysql_hdr* hdr = reinterpret_cast(m_server_buffer.data()); uint32_t pkt_len = hdr->pkt_length; if (m_server_buffer.size() < sizeof(mysql_hdr) + pkt_len) break; - const unsigned char* payload = reinterpret_cast(m_server_buffer.data()) + sizeof(mysql_hdr); process_server_packet(payload, pkt_len); m_server_buffer.erase(m_server_buffer.begin(), m_server_buffer.begin() + sizeof(mysql_hdr) + pkt_len); @@ -96,7 +83,7 @@ void MySQLFFTO::on_server_data(const char* buf, size_t len) { void MySQLFFTO::on_close() { if (m_state != IDLE && m_query_start_time != 0) { unsigned long long duration = monotonic_time() - m_query_start_time; - report_query_stats(m_current_query, duration); + report_query_stats(m_current_query, duration, m_affected_rows, m_rows_sent); m_state = IDLE; } } @@ -104,23 +91,23 @@ void MySQLFFTO::on_close() { void MySQLFFTO::process_client_packet(const unsigned char* data, size_t len) { if (len == 0) return; uint8_t command = data[0]; - if (command == _MYSQL_COM_QUERY) { m_current_query = std::string(reinterpret_cast(data + 1), len - 1); m_query_start_time = monotonic_time(); - m_state = AWAITING_RESULTSET; + m_state = AWAITING_RESPONSE; + m_affected_rows = 0; m_rows_sent = 0; } else if (command == _MYSQL_COM_STMT_PREPARE) { m_pending_prepare_query = std::string(reinterpret_cast(data + 1), len - 1); m_state = AWAITING_PREPARE_OK; } else if (command == _MYSQL_COM_STMT_EXECUTE) { if (len >= 5) { - uint32_t stmt_id; - memcpy(&stmt_id, data + 1, 4); + uint32_t stmt_id; memcpy(&stmt_id, data + 1, 4); auto it = m_statements.find(stmt_id); if (it != m_statements.end()) { m_current_query = it->second; m_query_start_time = monotonic_time(); - m_state = AWAITING_RESULTSET; + m_state = AWAITING_RESPONSE; + m_affected_rows = 0; m_rows_sent = 0; } } } else if (command == _MYSQL_COM_QUIT) { @@ -133,34 +120,57 @@ void MySQLFFTO::process_server_packet(const unsigned char* data, size_t len) { uint8_t first_byte = data[0]; if (m_state == AWAITING_PREPARE_OK) { - if (first_byte == 0x00 && len >= 5) { - uint32_t stmt_id; - memcpy(&stmt_id, data + 1, 4); + if (first_byte == 0x00 && len >= 9) { + uint32_t stmt_id; memcpy(&stmt_id, data + 1, 4); m_statements[stmt_id] = m_pending_prepare_query; m_state = IDLE; } else if (first_byte == 0xFF) { m_state = IDLE; } - } else if (m_state == AWAITING_RESULTSET || m_state == READING_RESULTSET) { - if (first_byte == 0x00) { - const unsigned char* pos = data + 1; - size_t remaining_len = len - 1; - uint64_t affected_rows = read_lenenc_int(pos, remaining_len); + } else if (m_state == AWAITING_RESPONSE) { + if (first_byte == 0x00) { // OK + const unsigned char* pos = data + 1; size_t rem = len - 1; + m_affected_rows = read_lenenc_int(pos, rem); unsigned long long duration = monotonic_time() - m_query_start_time; - report_query_stats(m_current_query, duration, affected_rows, 0); + report_query_stats(m_current_query, duration, m_affected_rows, 0); m_state = IDLE; - } else if (first_byte == 0xFF || (first_byte == 0xFE && len < 9)) { + } else if (first_byte == 0xFF) { // ERR unsigned long long duration = monotonic_time() - m_query_start_time; report_query_stats(m_current_query, duration); m_state = IDLE; + } else if (first_byte == 0xFE && len < 9) { // EOF + m_state = IDLE; + } else { // Result Set started (first_byte is column count) + m_state = READING_COLUMNS; + } + } else if (m_state == READING_COLUMNS) { + if (first_byte == 0xFE && len < 9) { // EOF after columns + m_state = READING_ROWS; + } else if (first_byte == 0x00 && len >= 7 && (m_session->client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF)) { + m_state = READING_ROWS; + } + } else if (m_state == READING_ROWS) { + if (first_byte == 0xFE && len < 9) { // EOF after rows + unsigned long long duration = monotonic_time() - m_query_start_time; + report_query_stats(m_current_query, duration, 0, m_rows_sent); + m_state = IDLE; + } else if (first_byte == 0x00 && len >= 7 && (m_session->client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF)) { + unsigned long long duration = monotonic_time() - m_query_start_time; + report_query_stats(m_current_query, duration, 0, m_rows_sent); + m_state = IDLE; + } else if (first_byte == 0xFF) { // ERR + m_state = IDLE; } else { - m_state = READING_RESULTSET; + m_rows_sent++; } } } void MySQLFFTO::report_query_stats(const std::string& query, unsigned long long duration_us, uint64_t affected_rows, uint64_t rows_sent) { - if (query.empty() || !GloMyQPro) return; + if (query.empty() || !GloMyQPro || !m_session) return; + if (!m_session->client_myds || !m_session->client_myds->myconn || !m_session->client_myds->myconn->userinfo) return; + auto* ui = m_session->client_myds->myconn->userinfo; + if (!ui->username || !ui->schemaname) return; options opts; opts.lowercase = mysql_thread___query_digests_lowercase; @@ -171,32 +181,23 @@ void MySQLFFTO::report_query_stats(const std::string& query, unsigned long long opts.groups_grouping_limit = mysql_thread___query_digests_groups_grouping_limit; opts.max_query_length = mysql_thread___query_digests_max_query_length; - SQP_par_t qp; - memset(&qp, 0, sizeof(qp)); + SQP_par_t qp; memset(&qp, 0, sizeof(qp)); char* fst_cmnt = NULL; char* digest_text = mysql_query_digest_and_first_comment(query.c_str(), query.length(), &fst_cmnt, qp.buf, &opts); - if (digest_text) { qp.digest_text = digest_text; qp.digest = SpookyHash::Hash64(digest_text, strlen(digest_text), 0); char* ca = (char*)""; - if (mysql_thread___query_digests_track_hostname && m_session->client_myds && m_session->client_myds->addr.addr) { - ca = m_session->client_myds->addr.addr; - } - - uint64_t hash2; - SpookyHash myhash; - myhash.Init(19, 3); - auto* ui = m_session->client_myds->myconn->userinfo; + if (mysql_thread___query_digests_track_hostname && m_session->client_myds->addr.addr) ca = m_session->client_myds->addr.addr; + uint64_t hash2; SpookyHash myhash; myhash.Init(19, 3); myhash.Update(ui->username, strlen(ui->username)); myhash.Update(&qp.digest, sizeof(qp.digest)); myhash.Update(ui->schemaname, strlen(ui->schemaname)); myhash.Update(&m_session->current_hostgroup, sizeof(m_session->current_hostgroup)); myhash.Update(ca, strlen(ca)); myhash.Final(&qp.digest_total, &hash2); - GloMyQPro->update_query_digest(qp.digest_total, qp.digest, qp.digest_text, m_session->current_hostgroup, ui, duration_us, m_session->thread->curtime, ca, affected_rows, rows_sent); - free(digest_text); + if (digest_text != qp.buf) free(digest_text); } if (fst_cmnt) free(fst_cmnt); } diff --git a/lib/PgSQLFFTO.cpp b/lib/PgSQLFFTO.cpp index 909dbe11d..7d2241a86 100644 --- a/lib/PgSQLFFTO.cpp +++ b/lib/PgSQLFFTO.cpp @@ -16,21 +16,16 @@ extern class PgSQL_Query_Processor* GloPgQPro; -// Helper to extract rows affected/sent from PostgreSQL CommandComplete tag static uint64_t extract_pg_rows_affected(const unsigned char* payload, size_t len, bool& is_select) { std::string command_tag(reinterpret_cast(payload), len); is_select = false; - std::regex re("(INSERT|UPDATE|DELETE|SELECT|MOVE|FETCH)\\s+\\S*\\s*(\\d+)"); std::smatch matches; - if (std::regex_search(command_tag, matches, re)) { if (matches.size() == 3) { std::string command_type = matches[1].str(); uint64_t rows = std::stoull(matches[2].str()); - if (command_type == "SELECT" || command_type == "FETCH" || command_type == "MOVE") { - is_select = true; - } + if (command_type == "SELECT" || command_type == "FETCH" || command_type == "MOVE") is_select = true; return rows; } } @@ -50,14 +45,10 @@ PgSQLFFTO::~PgSQLFFTO() { void PgSQLFFTO::on_client_data(const char* buf, size_t len) { if (!buf || len == 0) return; m_client_buffer.insert(m_client_buffer.end(), buf, buf + len); - while (m_client_buffer.size() >= 5) { char type = m_client_buffer[0]; - uint32_t msg_len; - memcpy(&msg_len, &m_client_buffer[1], 4); - msg_len = ntohl(msg_len); + uint32_t msg_len; memcpy(&msg_len, &m_client_buffer[1], 4); msg_len = ntohl(msg_len); if (m_client_buffer.size() < 1 + msg_len) break; - const unsigned char* payload = reinterpret_cast(m_client_buffer.data()) + 5; process_client_message(type, payload, msg_len - 4); m_client_buffer.erase(m_client_buffer.begin(), m_client_buffer.begin() + 1 + msg_len); @@ -67,14 +58,10 @@ void PgSQLFFTO::on_client_data(const char* buf, size_t len) { void PgSQLFFTO::on_server_data(const char* buf, size_t len) { if (!buf || len == 0) return; m_server_buffer.insert(m_server_buffer.end(), buf, buf + len); - while (m_server_buffer.size() >= 5) { char type = m_server_buffer[0]; - uint32_t msg_len; - memcpy(&msg_len, &m_server_buffer[1], 4); - msg_len = ntohl(msg_len); + uint32_t msg_len; memcpy(&msg_len, &m_server_buffer[1], 4); msg_len = ntohl(msg_len); if (m_server_buffer.size() < 1 + msg_len) break; - const unsigned char* payload = reinterpret_cast(m_server_buffer.data()) + 5; process_server_message(type, payload, msg_len - 4); m_server_buffer.erase(m_server_buffer.begin(), m_server_buffer.begin() + 1 + msg_len); @@ -129,14 +116,21 @@ void PgSQLFFTO::process_server_message(char type, const unsigned char* payload, else report_query_stats(m_current_query, duration, rows, 0); m_state = IDLE; } else if (type == 'Z' || type == 'E') { - unsigned long long duration = monotonic_time() - m_query_start_time; - report_query_stats(m_current_query, duration); + // ReadyForQuery or ErrorResponse. We don't always want to report stats here if 'C' already did it. + // But if we didn't get 'C', report what we have. + if (m_state == AWAITING_RESPONSE) { + unsigned long long duration = monotonic_time() - m_query_start_time; + report_query_stats(m_current_query, duration); + } m_state = IDLE; } } void PgSQLFFTO::report_query_stats(const std::string& query, unsigned long long duration_us, uint64_t affected_rows, uint64_t rows_sent) { - if (query.empty() || !GloPgQPro) return; + if (query.empty() || !GloPgQPro || !m_session) return; + if (!m_session->client_myds || !m_session->client_myds->myconn || !m_session->client_myds->myconn->userinfo) return; + auto* ui = m_session->client_myds->myconn->userinfo; + if (!ui->username || !ui->schemaname) return; options opts; opts.lowercase = pgsql_thread___query_digests_lowercase; @@ -147,32 +141,23 @@ void PgSQLFFTO::report_query_stats(const std::string& query, unsigned long long opts.groups_grouping_limit = pgsql_thread___query_digests_groups_grouping_limit; opts.max_query_length = pgsql_thread___query_digests_max_query_length; - SQP_par_t qp; - memset(&qp, 0, sizeof(qp)); + SQP_par_t qp; memset(&qp, 0, sizeof(qp)); char* fst_cmnt = NULL; char* digest_text = pgsql_query_digest_and_first_comment(query.c_str(), query.length(), &fst_cmnt, qp.buf, &opts); - if (digest_text) { qp.digest_text = digest_text; qp.digest = SpookyHash::Hash64(digest_text, strlen(digest_text), 0); char* ca = (char*)""; - if (pgsql_thread___query_digests_track_hostname && m_session->client_myds && m_session->client_myds->addr.addr) { - ca = m_session->client_myds->addr.addr; - } - - uint64_t hash2; - SpookyHash myhash; - myhash.Init(19, 3); - auto* ui = m_session->client_myds->myconn->userinfo; + if (pgsql_thread___query_digests_track_hostname && m_session->client_myds->addr.addr) ca = m_session->client_myds->addr.addr; + uint64_t hash2; SpookyHash myhash; myhash.Init(19, 3); myhash.Update(ui->username, strlen(ui->username)); myhash.Update(&qp.digest, sizeof(qp.digest)); myhash.Update(ui->schemaname, strlen(ui->schemaname)); myhash.Update(&m_session->current_hostgroup, sizeof(m_session->current_hostgroup)); myhash.Update(ca, strlen(ca)); myhash.Final(&qp.digest_total, &hash2); - GloPgQPro->update_query_digest(qp.digest_total, qp.digest, qp.digest_text, m_session->current_hostgroup, ui, duration_us, m_session->thread->curtime, ca, affected_rows, rows_sent); - free(digest_text); + if (digest_text != qp.buf) free(digest_text); } if (fst_cmnt) free(fst_cmnt); } diff --git a/test/tap/tests/test_ffto_mysql-t.cpp b/test/tap/tests/test_ffto_mysql-t.cpp index 4b28206ae..55fe07be1 100644 --- a/test/tap/tests/test_ffto_mysql-t.cpp +++ b/test/tap/tests/test_ffto_mysql-t.cpp @@ -8,9 +8,24 @@ #include "command_line.h" #include "utils.h" +#define EXEC_QUERY(conn, q) \ + if (mysql_query(conn, q)) { \ + diag("Query failed: %s", mysql_error(conn)); \ + ok(0, "Query failed: %s", q); \ + return -1; \ + } else { \ + MYSQL_RES* dummy_res = mysql_store_result(conn); \ + if (dummy_res) mysql_free_result(dummy_res); \ + else if (mysql_field_count(conn) > 0) { \ + diag("Error storing result: %s", mysql_error(conn)); \ + return -1; \ + } \ + } + void verify_digest(MYSQL* admin, const char* template_text, int expected_count, uint64_t expected_rows_affected = 0, uint64_t expected_rows_sent = 0) { char query[1024]; - sprintf(query, "SELECT count_star, sum_rows_affected, sum_rows_sent FROM stats_mysql_query_digest WHERE digest_text LIKE '%%%s%%'", template_text); + // Use a more relaxed LIKE pattern to handle potential normalization differences + sprintf(query, "SELECT count_star, sum_rows_affected, sum_rows_sent, digest_text FROM stats_mysql_query_digest WHERE digest_text LIKE '%%%s%%'", template_text); int rc = run_q(admin, query); if (rc != 0) { ok(0, "Failed to query stats_mysql_query_digest for %s", template_text); @@ -23,11 +38,20 @@ void verify_digest(MYSQL* admin, const char* template_text, int expected_count, uint64_t rows_affected = strtoull(row[1], NULL, 10); uint64_t rows_sent = strtoull(row[2], NULL, 10); - ok(count >= expected_count, "Found digest: %s (count: %d, expected: %d)", template_text, count, expected_count); - ok(rows_affected == expected_rows_affected, "Affected rows for %s: %llu (expected: %llu)", template_text, (unsigned long long)rows_affected, (unsigned long long)expected_rows_affected); - ok(rows_sent == expected_rows_sent, "Sent rows for %s: %llu (expected: %llu)", template_text, (unsigned long long)rows_sent, (unsigned long long)expected_rows_sent); + ok(count >= expected_count, "Found digest: %s (count: %d, expected: %d)", row[3], count, expected_count); + ok(rows_affected == expected_rows_affected, "Affected rows for %s: %llu (expected: %llu)", row[3], (unsigned long long)rows_affected, (unsigned long long)expected_rows_affected); + ok(rows_sent == expected_rows_sent, "Sent rows for %s: %llu (expected: %llu)", row[3], (unsigned long long)rows_sent, (unsigned long long)expected_rows_sent); } else { - ok(0, "Digest NOT found: %s", template_text); + ok(0, "Digest NOT found for pattern: %s", template_text); + // Dump the table to see what's actually in there + diag("Dumping stats_mysql_query_digest for debugging:"); + run_q(admin, "SELECT digest_text, count_star FROM stats_mysql_query_digest"); + MYSQL_RES* dump_res = mysql_store_result(admin); + MYSQL_ROW dump_row; + while (dump_res && (dump_row = mysql_fetch_row(dump_res))) { + diag(" Actual digest in table: %s", dump_row[0]); + } + if (dump_res) mysql_free_result(dump_res); } mysql_free_result(res); } @@ -39,11 +63,7 @@ int main(int argc, char** argv) { return -1; } - // We plan for: - // 1. Connection setup (2 ok) - // 2. Text CRUD (6 queries, each with 3 verifications = 18 ok) - // 3. Binary Prepared Stmts (1 insert, 1 select. 2 templates, each with 3 verifications = 6 ok) - plan(2 + (6*3) + (2*3)); // 2 + 18 + 6 = 26 + plan(1 + (6*3) + (1*3)); // 1 + 18 + 3 = 22 MYSQL* admin = mysql_init(NULL); if (!mysql_real_connect(admin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) { @@ -55,39 +75,57 @@ int main(int argc, char** argv) { MYSQL_QUERY(admin, "UPDATE global_variables SET variable_value='true' WHERE variable_name='mysql-ffto_enabled'"); MYSQL_QUERY(admin, "UPDATE global_variables SET variable_value='1048576' WHERE variable_name='mysql-ffto_max_buffer_size'"); MYSQL_QUERY(admin, "LOAD MYSQL VARIABLES TO RUNTIME"); - MYSQL_QUERY(admin, "UPDATE mysql_users SET fast_forward=1"); + + // Ensure root user has fast_forward enabled + MYSQL_QUERY(admin, "INSERT OR REPLACE INTO mysql_users (username, password, default_hostgroup, fast_forward) VALUES ('root', 'root', 0, 1)"); MYSQL_QUERY(admin, "LOAD MYSQL USERS TO RUNTIME"); + + // Ensure backend server exists + char server_query[1024]; + sprintf(server_query, "INSERT OR REPLACE INTO mysql_servers (hostgroup_id, hostname, port) VALUES (0, '%s', %d)", cl.mysql_host, cl.mysql_port); + MYSQL_QUERY(admin, server_query); + MYSQL_QUERY(admin, "LOAD MYSQL SERVERS TO RUNTIME"); + MYSQL_QUERY(admin, "DELETE FROM stats_mysql_query_digest"); // Reset stats + // USE ROOT FOR CLIENT CONNECTION MYSQL* conn = mysql_init(NULL); - if (!mysql_real_connect(conn, cl.host, cl.username, cl.password, NULL, cl.port, NULL, 0)) { - diag("Client connection failed"); + if (!mysql_real_connect(conn, cl.host, "root", "root", NULL, cl.port, NULL, 0)) { + diag("Client connection failed: %s", mysql_error(conn)); return -1; } ok(conn != NULL, "Connected to ProxySQL in Fast Forward mode"); + // Create and use test database + EXEC_QUERY(conn, "CREATE DATABASE IF NOT EXISTS ffto_db"); + EXEC_QUERY(conn, "USE ffto_db"); + // --- Part 1: Text Protocol CRUD --- - mysql_query(conn, "DROP TABLE IF EXISTS ffto_test"); - mysql_query(conn, "CREATE TABLE ffto_test (id INT PRIMARY KEY, val VARCHAR(255))"); - mysql_query(conn, "INSERT INTO ffto_test VALUES (1, 'val1'), (2, 'val2')"); // 2 rows affected - mysql_query(conn, "UPDATE ffto_test SET val = 'updated' WHERE id = 1"); // 1 row affected - mysql_query(conn, "SELECT val FROM ffto_test WHERE id = 1"); // 1 row sent - mysql_query(conn, "DELETE FROM ffto_test WHERE id = 2"); // 1 row affected + EXEC_QUERY(conn, "DROP TABLE IF EXISTS ffto_test"); + EXEC_QUERY(conn, "CREATE TABLE ffto_test (id INT PRIMARY KEY, val VARCHAR(255))"); + EXEC_QUERY(conn, "INSERT INTO ffto_test VALUES (1, 'val1'), (2, 'val2')"); + EXEC_QUERY(conn, "UPDATE ffto_test SET val = 'updated' WHERE id = 1"); + EXEC_QUERY(conn, "SELECT val FROM ffto_test WHERE id = 1"); + EXEC_QUERY(conn, "DELETE FROM ffto_test WHERE id = 2"); // Verify Text Stats - verify_digest(admin, "DROP TABLE IF EXISTS ffto_test", 1, 0, 0); // DDL doesn't affect rows - verify_digest(admin, "CREATE TABLE ffto_test", 1, 0, 0); // DDL doesn't affect rows + verify_digest(admin, "DROP TABLE IF EXISTS ffto_test", 1, 0, 0); + verify_digest(admin, "CREATE TABLE ffto_test", 1, 0, 0); verify_digest(admin, "INSERT INTO ffto_test VALUES", 1, 2, 0); verify_digest(admin, "UPDATE ffto_test SET val", 1, 1, 0); - verify_digest(admin, "SELECT val FROM ffto_test WHERE id", 1, 0, 1); // 1 row sent + verify_digest(admin, "SELECT val FROM ffto_test WHERE id", 1, 0, 1); verify_digest(admin, "DELETE FROM ffto_test WHERE id", 1, 1, 0); // --- Part 2: Binary Protocol (Prepared Statements) --- - MYSQL_QUERY(admin, "DELETE FROM stats_mysql_query_digest"); // Reset stats for prepared statements + MYSQL_QUERY(admin, "DELETE FROM stats_mysql_query_digest"); MYSQL_STMT *stmt = mysql_stmt_init(conn); const char* ins_query = "INSERT INTO ffto_test (id, val) VALUES (?, ?)"; - mysql_stmt_prepare(stmt, ins_query, strlen(ins_query)); + if (mysql_stmt_prepare(stmt, ins_query, strlen(ins_query))) { + diag("mysql_stmt_prepare failed: %s", mysql_stmt_error(stmt)); + ok(0, "mysql_stmt_prepare failed"); + return -1; + } MYSQL_BIND bind[2]; int int_data = 10; @@ -102,12 +140,22 @@ int main(int argc, char** argv) { bind[1].buffer_length = 20; bind[1].length = &str_len; - mysql_stmt_bind_param(stmt, bind); - mysql_stmt_execute(stmt); // Run once, 1 row affected - mysql_stmt_execute(stmt); // Run twice to check count_star, 1 row affected + if (mysql_stmt_bind_param(stmt, bind)) { + diag("mysql_stmt_bind_param failed: %s", mysql_stmt_error(stmt)); + return -1; + } + if (mysql_stmt_execute(stmt)) { + diag("mysql_stmt_execute (1) failed: %s", mysql_stmt_error(stmt)); + return -1; + } + int_data = 11; // Change ID for second insert + if (mysql_stmt_execute(stmt)) { + diag("mysql_stmt_execute (2) failed: %s", mysql_stmt_error(stmt)); + return -1; + } // Verify Binary Stats - verify_digest(admin, "INSERT INTO ffto_test (id, val) VALUES (?, ?)", 2, 2, 0); // Each execute affects 1 row, executed twice = 2 + verify_digest(admin, "INSERT INTO ffto_test (id,val) VALUES (?,?)", 2, 2, 0); mysql_stmt_close(stmt); diff --git a/test/tap/tests/test_ffto_pgsql-t.cpp b/test/tap/tests/test_ffto_pgsql-t.cpp index 5c23f61f1..d55ba4eb5 100644 --- a/test/tap/tests/test_ffto_pgsql-t.cpp +++ b/test/tap/tests/test_ffto_pgsql-t.cpp @@ -8,13 +8,25 @@ #include "command_line.h" #include "tap.h" #include "utils.h" -#include "mysql.h" // For admin connection +#include "mysql.h" CommandLine cl; +#define EXEC_PG_QUERY(conn, q) \ + { \ + PGresult* res_exec = PQexec(conn, q); \ + if (PQresultStatus(res_exec) != PGRES_COMMAND_OK && PQresultStatus(res_exec) != PGRES_TUPLES_OK) { \ + diag("PG Query failed: %s", PQerrorMessage(conn)); \ + ok(0, "PG Query failed: %s", q); \ + PQclear(res_exec); \ + return -1; \ + } \ + PQclear(res_exec); \ + } + void verify_pg_digest(MYSQL* admin, const char* template_text, int expected_count, uint64_t expected_rows_affected = 0, uint64_t expected_rows_sent = 0) { char query[1024]; - sprintf(query, "SELECT count_star, sum_rows_affected, sum_rows_sent FROM stats_pgsql_query_digest WHERE digest_text LIKE '%%%s%%'", template_text); + sprintf(query, "SELECT count_star, sum_rows_affected, sum_rows_sent, digest_text FROM stats_pgsql_query_digest WHERE digest_text LIKE '%%%s%%'", template_text); int rc = run_q(admin, query); if (rc != 0) { ok(0, "Failed to query stats_pgsql_query_digest for %s", template_text); @@ -27,11 +39,19 @@ void verify_pg_digest(MYSQL* admin, const char* template_text, int expected_coun uint64_t rows_affected = strtoull(row[1], NULL, 10); uint64_t rows_sent = strtoull(row[2], NULL, 10); - ok(count >= expected_count, "Found PG digest: %s (count: %d, expected: %d)", template_text, count, expected_count); - ok(rows_affected == expected_rows_affected, "Affected rows for %s: %llu (expected: %llu)", template_text, (unsigned long long)rows_affected, (unsigned long long)expected_rows_affected); - ok(rows_sent == expected_rows_sent, "Sent rows for %s: %llu (expected: %llu)", template_text, (unsigned long long)rows_sent, (unsigned long long)expected_rows_sent); + ok(count >= expected_count, "Found PG digest: %s (count: %d, expected: %d)", row[3], count, expected_count); + ok(rows_affected == expected_rows_affected, "Affected rows for %s: %llu (expected: %llu)", row[3], (unsigned long long)rows_affected, (unsigned long long)expected_rows_affected); + ok(rows_sent == expected_rows_sent, "Sent rows for %s: %llu (expected: %llu)", row[3], (unsigned long long)rows_sent, (unsigned long long)expected_rows_sent); } else { - ok(0, "PG Digest NOT found: %s", template_text); + ok(0, "PG Digest NOT found for pattern: %s", template_text); + diag("Dumping stats_pgsql_query_digest for debugging:"); + run_q(admin, "SELECT digest_text, count_star FROM stats_pgsql_query_digest"); + MYSQL_RES* dump_res = mysql_store_result(admin); + MYSQL_ROW dump_row; + while (dump_res && (dump_row = mysql_fetch_row(dump_res))) { + diag(" Actual PG digest in table: %s", dump_row[0]); + } + if (dump_res) mysql_free_result(dump_res); } mysql_free_result(res); } @@ -42,11 +62,7 @@ int main(int argc, char** argv) { return -1; } - // Plan: - // 1. Connection setup (2 ok) - // 2. Simple CRUD (5 queries, each with 3 verifications = 15 ok) - // 3. Extended Query (1 template, with 3 verifications = 3 ok) - plan(2 + (5*3) + 3); // 2 + 15 + 3 = 20 + plan(1 + (6*3) + (1*3)); // 1 (connect) + 18 (simple) + 3 (extended) = 22 MYSQL* admin = mysql_init(NULL); if (!mysql_real_connect(admin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) { @@ -58,14 +74,23 @@ int main(int argc, char** argv) { MYSQL_QUERY(admin, "UPDATE global_variables SET variable_value='true' WHERE variable_name='pgsql-ffto_enabled'"); MYSQL_QUERY(admin, "UPDATE global_variables SET variable_value='1048576' WHERE variable_name='pgsql-ffto_max_buffer_size'"); MYSQL_QUERY(admin, "LOAD PGSQL VARIABLES TO RUNTIME"); - MYSQL_QUERY(admin, "UPDATE pgsql_users SET fast_forward=1"); + + // Ensure root user exists + MYSQL_QUERY(admin, "INSERT OR REPLACE INTO pgsql_users (username, password, fast_forward) VALUES ('postgres', 'postgres', 1)"); MYSQL_QUERY(admin, "LOAD PGSQL USERS TO RUNTIME"); + + // Ensure backend server exists + char server_query[1024]; + sprintf(server_query, "INSERT OR REPLACE INTO pgsql_servers (hostgroup_id, hostname, port) VALUES (0, '%s', %d)", cl.pgsql_server_host, cl.pgsql_server_port); + MYSQL_QUERY(admin, server_query); + MYSQL_QUERY(admin, "LOAD PGSQL SERVERS TO RUNTIME"); + MYSQL_QUERY(admin, "DELETE FROM stats_pgsql_query_digest"); - // Standard libpq connection + // Standard libpq connection using root (postgres) char conninfo[1024]; sprintf(conninfo, "host=%s port=%d user=%s password=%s dbname=postgres sslmode=disable", - cl.pgsql_host, cl.pgsql_port, cl.pgsql_username, cl.pgsql_password); + cl.pgsql_host, cl.pgsql_port, cl.pgsql_root_username, cl.pgsql_root_password); PGconn* conn = PQconnectdb(conninfo); if (PQstatus(conn) != CONNECTION_OK) { @@ -75,32 +100,37 @@ int main(int argc, char** argv) { ok(conn != NULL, "Connected to PostgreSQL via ProxySQL"); // --- Part 1: Simple Query Protocol --- - PQclear(PQexec(conn, "DROP TABLE IF EXISTS ffto_pg_test")); - PQclear(PQexec(conn, "CREATE TABLE ffto_pg_test (id INT PRIMARY KEY, data TEXT)")); - PQclear(PQexec(conn, "INSERT INTO ffto_pg_test VALUES (1, 'val1'), (2, 'val2')")); // 2 rows affected - PQclear(PQexec(conn, "SELECT data FROM ffto_pg_test WHERE id = 1")); // 1 row sent - PQclear(PQexec(conn, "UPDATE ffto_pg_test SET data = 'updated' WHERE id = 1")); // 1 row affected - PQclear(PQexec(conn, "DELETE FROM ffto_pg_test WHERE id = 2")); // 1 row affected - - verify_pg_digest(admin, "DROP TABLE IF EXISTS ffto_pg_test", 1, 0, 0); // DDL - verify_pg_digest(admin, "CREATE TABLE ffto_pg_test", 1, 0, 0); // DDL + EXEC_PG_QUERY(conn, "DROP TABLE IF EXISTS ffto_pg_test"); + EXEC_PG_QUERY(conn, "CREATE TABLE ffto_pg_test (id INT PRIMARY KEY, data TEXT)"); + EXEC_PG_QUERY(conn, "INSERT INTO ffto_pg_test VALUES (1, 'val1'), (2, 'val2')"); + EXEC_PG_QUERY(conn, "SELECT data FROM ffto_pg_test WHERE id = 1"); + EXEC_PG_QUERY(conn, "UPDATE ffto_pg_test SET data = 'updated' WHERE id = 1"); + EXEC_PG_QUERY(conn, "DELETE FROM ffto_pg_test WHERE id = 2"); + + verify_pg_digest(admin, "DROP TABLE IF EXISTS ffto_pg_test", 1, 0, 0); + verify_pg_digest(admin, "CREATE TABLE ffto_pg_test", 1, 0, 0); verify_pg_digest(admin, "INSERT INTO ffto_pg_test VALUES", 1, 2, 0); - verify_pg_digest(admin, "SELECT data FROM ffto_pg_test WHERE id = $1", 1, 0, 1); // 1 row sent for SELECT + verify_pg_digest(admin, "SELECT data FROM ffto_pg_test WHERE id = $1", 1, 0, 1); verify_pg_digest(admin, "UPDATE ffto_pg_test SET data", 1, 1, 0); verify_pg_digest(admin, "DELETE FROM ffto_pg_test WHERE id", 1, 1, 0); // --- Part 2: Extended Query Protocol --- - MYSQL_QUERY(admin, "DELETE FROM stats_pgsql_query_digest"); // Reset stats for prepared statements + MYSQL_QUERY(admin, "DELETE FROM stats_pgsql_query_digest"); const char* ext_query = "SELECT data FROM ffto_pg_test WHERE id = $1"; PGresult* res_prep = PQprepare(conn, "stmt1", ext_query, 1, NULL); if (PQresultStatus(res_prep) != PGRES_COMMAND_OK) { diag("PQprepare failed: %s", PQerrorMessage(conn)); + return -1; } PQclear(res_prep); const char* paramValues[1] = {"1"}; PGresult* res_exec = PQexecPrepared(conn, "stmt1", 1, paramValues, NULL, NULL, 0); + if (PQresultStatus(res_exec) != PGRES_TUPLES_OK) { + diag("PQexecPrepared failed: %s", PQerrorMessage(conn)); + return -1; + } PQclear(res_exec); verify_pg_digest(admin, "SELECT data FROM ffto_pg_test WHERE id = $1", 1, 0, 1);