diff --git a/doc/ffto_design.md b/doc/ffto_design.md index 2d6484dfa..65826175d 100644 --- a/doc/ffto_design.md +++ b/doc/ffto_design.md @@ -27,14 +27,14 @@ Defined in `include/TrafficObserver.hpp`. This interface decouples protocol-spec ### 4.2. MySQL FFTO Implementation (`MySQLFFTO`) Implements the MySQL wire protocol (version 10) state machine. -- **States**: `IDLE`, `WAITING_FOR_RESPONSE`, `READING_COLUMN_DEFS`, `READING_ROWS`, `SKIP_PREPARE_RESPONSE`. +- **States**: `IDLE`, `AWAITING_PREPARE_OK`, `AWAITING_RESPONSE`, `READING_COLUMNS`, `READING_ROWS`. - **Prepared Statement Tracking**: Maintains a session-local map of `stmt_id` to query templates captured during the `PREPARE` phase. ### 4.3. PostgreSQL FFTO Implementation (`PgSQLFFTO`) Handles the message-oriented PostgreSQL protocol. - **Request Identification**: Detects `Query` ('Q'), `Parse` ('P'), `Bind` ('B'), and `Execute` ('E') messages. -- **Response Identification**: Tracks `CommandComplete` ('C') and `ErrorResponse` ('E'). -- **Extended Query Tracking**: Tracks the association between Portals and Prepared Statements. +- **Response Identification**: Tracks `CommandComplete` ('C'), `ReadyForQuery` ('Z'), and `ErrorResponse` ('E'). +- **Extended Query Tracking**: Tracks the association between Portals and Prepared Statements, and queues pipelined executes so responses are attributed to the correct query text. ## 5. Protocol and Security Details - **Encryption**: FFTO operates on protocol packets that are already decrypted by ProxySQL's session handler. This allows ProxySQL to mix encrypted and unencrypted backend/frontend connections while maintaining consistent monitoring in FF mode. @@ -84,9 +84,10 @@ To verify that FFTO is capturing traffic in Fast Forward mode: ## 8. Protocol Support - **Text and Binary Protocols**: FFTO supports both standard text-based queries and the binary protocol used by prepared statements. -- **MySQL Binary Protocol**: Corrects captures `COM_STMT_PREPARE` and `COM_STMT_EXECUTE`, tracking statement IDs to their respective SQL text. +- **MySQL Binary Protocol**: Correctly captures `COM_STMT_PREPARE` and `COM_STMT_EXECUTE`, tracking statement IDs to their respective SQL text. - **PostgreSQL Extended Query**: Supports the multi-phase `Parse` -> `Bind` -> `Execute` sequence by tracking Statement and Portal mappings. ## 9. Limitations - **Large Payloads**: Packets exceeding the `*-ffto_max_buffer_size` threshold cause FFTO to be bypassed for that session. - **X-Protocol**: Currently optimized for classic MySQL and PostgreSQL protocols. +- **PostgreSQL Command Tags**: `sum_rows_affected`/`sum_rows_sent` are derived from `CommandComplete` tags and currently cover common commands (`INSERT`, `UPDATE`, `DELETE`, `COPY`, `MERGE`, `SELECT`, `FETCH`, `MOVE`). diff --git a/include/MySQLFFTO.hpp b/include/MySQLFFTO.hpp index 92ae34752..375bf0f81 100644 --- a/include/MySQLFFTO.hpp +++ b/include/MySQLFFTO.hpp @@ -91,6 +91,9 @@ private: */ void process_server_packet(const unsigned char* data, size_t len); + bool is_in_flight_query_state() const; + void clear_active_query(); + /** * @brief Computes and records query metrics into the ProxySQL query digests. * @param query The SQL query text. diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index f3243c751..9ba70990e 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -274,6 +274,7 @@ class MySQL_Session: public Base_Session #include #include #include @@ -69,6 +70,14 @@ private: unsigned long long m_query_start_time; ///< Start timestamp of the current query in microseconds. uint64_t m_affected_rows {0}; ///< Accumulated affected rows for the current query. uint64_t m_rows_sent {0}; ///< Accumulated rows sent for the current query. + bool m_current_finalize_on_sync {false}; ///< Whether current query finalizes on ReadyForQuery ('Z'). + + struct PendingQuery { + std::string query; + unsigned long long start_time {0}; + bool finalize_on_sync {false}; + }; + std::deque m_pending_queries; ///< Queued queries for pipelined traffic. // Binary Protocol Tracking (PostgreSQL Extended Query) std::unordered_map m_statements; ///< Map of statement names to original SQL text. @@ -90,6 +99,11 @@ private: */ void process_server_message(char type, const unsigned char* payload, size_t len); + void track_query(std::string query, bool finalize_on_sync); + void clear_current_query(); + void activate_next_query(); + void finalize_current_query(); + /** * @brief Computes and records query metrics into the ProxySQL query digests. * @param query The SQL query text. diff --git a/lib/MySQLFFTO.cpp b/lib/MySQLFFTO.cpp index 5a4a20964..4a9408775 100644 --- a/lib/MySQLFFTO.cpp +++ b/lib/MySQLFFTO.cpp @@ -109,11 +109,24 @@ void MySQLFFTO::on_server_data(const char* buf, std::size_t len) { } void MySQLFFTO::on_close() { - if (m_state != IDLE && m_query_start_time != 0) { + if (is_in_flight_query_state() && m_query_start_time != 0) { unsigned long long duration = monotonic_time() - m_query_start_time; report_query_stats(m_current_query, duration, m_affected_rows, m_rows_sent); - m_state = IDLE; } + m_state = IDLE; + clear_active_query(); + m_pending_prepare_query.clear(); +} + +bool MySQLFFTO::is_in_flight_query_state() const { + return m_state == AWAITING_RESPONSE || m_state == READING_COLUMNS || m_state == READING_ROWS; +} + +void MySQLFFTO::clear_active_query() { + m_current_query.clear(); + m_query_start_time = 0; + m_affected_rows = 0; + m_rows_sent = 0; } void MySQLFFTO::process_client_packet(const unsigned char* data, size_t len) { @@ -162,8 +175,10 @@ void MySQLFFTO::process_server_packet(const unsigned char* data, size_t len) { uint32_t stmt_id; memcpy(&stmt_id, data + 1, 4); m_statements[stmt_id] = m_pending_prepare_query; m_state = IDLE; + m_pending_prepare_query.clear(); } else if (first_byte == 0xFF) { m_state = IDLE; + m_pending_prepare_query.clear(); } } else if (m_state == AWAITING_RESPONSE) { if (first_byte == 0x00) { // OK @@ -172,12 +187,15 @@ void MySQLFFTO::process_server_packet(const unsigned char* data, size_t len) { unsigned long long duration = monotonic_time() - m_query_start_time; report_query_stats(m_current_query, duration, m_affected_rows, 0); m_state = IDLE; + clear_active_query(); } else if (first_byte == 0xFF) { // ERR unsigned long long duration = monotonic_time() - m_query_start_time; report_query_stats(m_current_query, duration); m_state = IDLE; + clear_active_query(); } else if (first_byte == 0xFE && len < 9) { // EOF m_state = IDLE; + clear_active_query(); } else { // Result Set started (first_byte is column count) m_state = READING_COLUMNS; } @@ -186,18 +204,28 @@ void MySQLFFTO::process_server_packet(const unsigned char* data, size_t len) { m_state = READING_ROWS; } else if (first_byte == 0xFE && len >= 9 && deprecate_eof) { m_state = READING_ROWS; + } else if (first_byte == 0xFF) { // ERR while reading column metadata + unsigned long long duration = monotonic_time() - m_query_start_time; + report_query_stats(m_current_query, duration); + m_state = IDLE; + clear_active_query(); } } else if (m_state == READING_ROWS) { if (first_byte == 0xFE && len < 9) { // EOF after rows unsigned long long duration = monotonic_time() - m_query_start_time; report_query_stats(m_current_query, duration, 0, m_rows_sent); m_state = IDLE; + clear_active_query(); } else if (first_byte == 0xFE && len >= 9 && deprecate_eof) { unsigned long long duration = monotonic_time() - m_query_start_time; report_query_stats(m_current_query, duration, 0, m_rows_sent); m_state = IDLE; + clear_active_query(); } else if (first_byte == 0xFF) { // ERR + unsigned long long duration = monotonic_time() - m_query_start_time; + report_query_stats(m_current_query, duration, 0, m_rows_sent); m_state = IDLE; + clear_active_query(); } else { m_rows_sent++; } diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index ac095769f..1023307e0 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -4747,19 +4747,7 @@ int MySQL_Session::GPFC_Statuses2(bool& wrong_pass, PtrSize_t& pkt) { } break; case FAST_FORWARD: - if (mysql_thread___ffto_enabled && !ffto_bypassed) { - if (pkt.size > (size_t)mysql_thread___ffto_max_buffer_size) { - ffto_bypassed = true; - m_ffto.reset(); - } else { - if (!m_ffto) { - m_ffto = std::make_unique(this); - } - if (m_ffto) { - m_ffto->on_client_data((const char*)pkt.ptr, pkt.size); - } - } - } + observe_ffto_client_packet(pkt); mybe->server_myds->PSarrayOUT->add(pkt.ptr, pkt.size); /* * Fast Forward Grace Close Logic: @@ -4806,19 +4794,7 @@ int MySQL_Session::GPFC_Statuses2(bool& wrong_pass, PtrSize_t& pkt) { // 'FAST_FORWARD' should be pushed to 'PSarrayOUT'. case CONNECTING_SERVER: if (previous_status.empty() == false && previous_status.top() == FAST_FORWARD) { - if (mysql_thread___ffto_enabled && !ffto_bypassed) { - if (pkt.size > (size_t)mysql_thread___ffto_max_buffer_size) { - ffto_bypassed = true; - m_ffto.reset(); - } else { - if (!m_ffto) { - m_ffto = std::make_unique(this); - } - if (m_ffto) { - m_ffto->on_client_data((const char*)pkt.ptr, pkt.size); - } - } - } + observe_ffto_client_packet(pkt); mybe->server_myds->PSarrayOUT->add(pkt.ptr, pkt.size); break; } @@ -4832,6 +4808,24 @@ int MySQL_Session::GPFC_Statuses2(bool& wrong_pass, PtrSize_t& pkt) { return handler_ret; } +void MySQL_Session::observe_ffto_client_packet(const PtrSize_t& pkt) { + if (!pkt.ptr || pkt.size == 0) return; + if (!mysql_thread___ffto_enabled || ffto_bypassed) return; + + if (pkt.size > (size_t)mysql_thread___ffto_max_buffer_size) { + ffto_bypassed = true; + m_ffto.reset(); + return; + } + + if (!m_ffto) { + m_ffto = std::make_unique(this); + } + if (m_ffto) { + m_ffto->on_client_data((const char*)pkt.ptr, pkt.size); + } +} + void MySQL_Session::GPFC_DetectedMultiPacket_SetDDS() { // this is handled only for real traffic, not mirror switch (client_myds->DSS) { // real traffic only @@ -4864,19 +4858,7 @@ int MySQL_Session::GPFC_WaitingClientData_FastForwardSession(PtrSize_t& pkt) { mybe=find_or_create_backend(current_hostgroup); // set a backend mybe->server_myds->reinit_queues(); // reinitialize the queues in the myds . By default, they are not active - if (mysql_thread___ffto_enabled && !ffto_bypassed) { - if (pkt.size > (size_t)mysql_thread___ffto_max_buffer_size) { - ffto_bypassed = true; - m_ffto.reset(); - } else { - if (!m_ffto) { - m_ffto = std::make_unique(this); - } - if (m_ffto) { - m_ffto->on_client_data((const char*)pkt.ptr, pkt.size); - } - } - } + observe_ffto_client_packet(pkt); mybe->server_myds->PSarrayOUT->add(pkt.ptr, pkt.size); // move the first packet previous_status.push(FAST_FORWARD); // next status will be FAST_FORWARD . Now we need a connection diff --git a/lib/PgSQLFFTO.cpp b/lib/PgSQLFFTO.cpp index fec395e58..c085414cd 100644 --- a/lib/PgSQLFFTO.cpp +++ b/lib/PgSQLFFTO.cpp @@ -11,8 +11,9 @@ #endif #include "c_tokenizer.h" #include +#include +#include #include -#include extern class PgSQL_Query_Processor* GloPgQPro; @@ -20,8 +21,8 @@ extern class PgSQL_Query_Processor* GloPgQPro; * @brief Parses the PostgreSQL CommandComplete ('C') message payload to extract row counts. * * PostgreSQL encodes row counts into the message tag string (e.g., "INSERT 0 10", "SELECT 50"). - * This function uses regular expressions to extract these values and determine if the message - * corresponds to a result-generating command (SELECT, FETCH, MOVE) or a DML command. + * This function performs lightweight token parsing to extract these values and determine if + * the message corresponds to a result-generating command (SELECT, FETCH, MOVE) or a DML command. * * @param payload Pointer to the CommandComplete message payload (the tag string). * @param len Length of the payload. @@ -29,20 +30,38 @@ extern class PgSQL_Query_Processor* GloPgQPro; * @return The number of rows affected or sent. */ static uint64_t extract_pg_rows_affected(const unsigned char* payload, size_t len, bool& is_select) { - if (len == 0) return 0; - std::string command_tag(reinterpret_cast(payload), len); is_select = false; - static const std::regex re("(INSERT|UPDATE|DELETE|SELECT|MOVE|FETCH)\\b.*?\\b(\\d+)$"); - std::smatch matches; - if (std::regex_search(command_tag, matches, re)) { - if (matches.size() == 3) { - std::string command_type = matches[1].str(); - uint64_t rows = std::stoull(matches[2].str()); - if (command_type == "SELECT" || command_type == "FETCH" || command_type == "MOVE") is_select = true; - return rows; - } + if (len == 0) return 0; + + size_t begin = 0; + while (begin < len && std::isspace(payload[begin])) begin++; + while (len > begin && (payload[len - 1] == '\0' || std::isspace(payload[len - 1]))) len--; + if (begin >= len) return 0; + + std::string command_tag(reinterpret_cast(payload + begin), len - begin); + + size_t first_space = command_tag.find(' '); + if (first_space == std::string::npos) return 0; + + std::string command_type = command_tag.substr(0, first_space); + if (command_type == "SELECT" || command_type == "FETCH" || command_type == "MOVE") { + is_select = true; + } else if (command_type != "INSERT" && command_type != "UPDATE" && + command_type != "DELETE" && command_type != "COPY" && + command_type != "MERGE") { + return 0; + } + + size_t last_space = command_tag.rfind(' '); + if (last_space == std::string::npos || last_space + 1 >= command_tag.size()) return 0; + + const char* rows_str = command_tag.c_str() + last_space + 1; + char* endptr = nullptr; + unsigned long long rows = std::strtoull(rows_str, &endptr, 10); + if (endptr == rows_str || *endptr != '\0') { + return 0; } - return 0; + return rows; } PgSQLFFTO::PgSQLFFTO(PgSQL_Session* session) @@ -110,20 +129,69 @@ void PgSQLFFTO::on_server_data(const char* buf, std::size_t len) { } void PgSQLFFTO::on_close() { - if (m_state != IDLE && m_query_start_time != 0) { + if (m_state == AWAITING_RESPONSE && !m_current_query.empty() && m_query_start_time != 0) { unsigned long long duration = monotonic_time() - m_query_start_time; report_query_stats(m_current_query, duration, m_affected_rows, m_rows_sent); + } + clear_current_query(); + m_pending_queries.clear(); + m_state = IDLE; +} + +void PgSQLFFTO::track_query(std::string query, bool finalize_on_sync) { + if (query.empty()) return; + + PendingQuery pending { std::move(query), monotonic_time(), finalize_on_sync }; + if (m_state == IDLE || m_current_query.empty()) { + m_current_query = std::move(pending.query); + m_query_start_time = pending.start_time; + m_current_finalize_on_sync = pending.finalize_on_sync; + m_affected_rows = 0; + m_rows_sent = 0; + m_state = AWAITING_RESPONSE; + return; + } + + m_pending_queries.emplace_back(std::move(pending)); +} + +void PgSQLFFTO::clear_current_query() { + m_current_query.clear(); + m_query_start_time = 0; + m_affected_rows = 0; + m_rows_sent = 0; + m_current_finalize_on_sync = false; +} + +void PgSQLFFTO::activate_next_query() { + if (m_pending_queries.empty()) { + clear_current_query(); m_state = IDLE; + return; + } + + PendingQuery next_query = std::move(m_pending_queries.front()); + m_pending_queries.pop_front(); + m_current_query = std::move(next_query.query); + m_query_start_time = next_query.start_time; + m_current_finalize_on_sync = next_query.finalize_on_sync; + m_affected_rows = 0; + m_rows_sent = 0; + m_state = AWAITING_RESPONSE; +} + +void PgSQLFFTO::finalize_current_query() { + if (!m_current_query.empty() && m_query_start_time != 0) { + unsigned long long duration = monotonic_time() - m_query_start_time; + report_query_stats(m_current_query, duration, m_affected_rows, m_rows_sent); } + activate_next_query(); } void PgSQLFFTO::process_client_message(char type, const unsigned char* payload, size_t len) { if (type == 'Q') { size_t query_len = (len > 0 && payload[len - 1] == 0) ? len - 1 : len; - m_current_query = std::string(reinterpret_cast(payload), query_len); - m_query_start_time = monotonic_time(); - m_state = AWAITING_RESPONSE; - m_affected_rows = 0; m_rows_sent = 0; + track_query(std::string(reinterpret_cast(payload), query_len), true); } else if (type == 'P') { const char* p = reinterpret_cast(payload); size_t name_len = strnlen(p, len); @@ -132,6 +200,7 @@ void PgSQLFFTO::process_client_message(char type, const unsigned char* payload, const char* query_ptr = p + name_len + 1; size_t rem = len - (name_len + 1); size_t query_text_len = strnlen(query_ptr, rem); + if (query_text_len >= rem) return; m_statements[stmt_name] = std::string(query_ptr, query_text_len); } else if (type == 'B') { const char* p = reinterpret_cast(payload); @@ -146,25 +215,22 @@ void PgSQLFFTO::process_client_message(char type, const unsigned char* payload, } else if (type == 'E') { const char* p = reinterpret_cast(payload); size_t portal_len = strnlen(p, len); - std::string portal_name(p, std::min(portal_len, len)); + if (portal_len >= len) return; + if (len < portal_len + 1 + 4) return; // portal name + '\0' + max-rows + std::string portal_name(p, portal_len); auto pit = m_portals.find(portal_name); if (pit != m_portals.end()) { auto sit = m_statements.find(pit->second); if (sit != m_statements.end()) { - if (m_state == AWAITING_RESPONSE) { - on_close(); // Finalize previous if any - } - m_current_query = sit->second; - m_query_start_time = monotonic_time(); - m_state = AWAITING_RESPONSE; - m_affected_rows = 0; m_rows_sent = 0; + track_query(sit->second, false); } } } else if (type == 'C') { // Frontend Close - if (len < 1) return; + if (len < 2) return; char close_type = static_cast(payload[0]); const char* name_ptr = reinterpret_cast(payload) + 1; size_t name_len = strnlen(name_ptr, len - 1); + if (name_len >= len - 1) return; std::string name(name_ptr, name_len); if (close_type == 'S') m_statements.erase(name); else if (close_type == 'P') m_portals.erase(name); @@ -180,14 +246,19 @@ void PgSQLFFTO::process_server_message(char type, const unsigned char* payload, uint64_t rows = extract_pg_rows_affected(payload, len, is_select); if (is_select) m_rows_sent += rows; else m_affected_rows += rows; - } else if (type == 'Z' || type == 'E') { - // ReadyForQuery or ErrorResponse - if (m_state == AWAITING_RESPONSE) { + if (!m_current_finalize_on_sync) { + finalize_current_query(); + } + } else if (type == 'Z') { + finalize_current_query(); + } else if (type == 'E') { + if (!m_current_query.empty() && m_query_start_time != 0) { unsigned long long duration = monotonic_time() - m_query_start_time; report_query_stats(m_current_query, duration, m_affected_rows, m_rows_sent); } + clear_current_query(); + m_pending_queries.clear(); m_state = IDLE; - m_affected_rows = 0; m_rows_sent = 0; } } diff --git a/test/tap/tests/test_ffto_mysql-t.cpp b/test/tap/tests/test_ffto_mysql-t.cpp index ea75c1969..e5bcb588d 100644 --- a/test/tap/tests/test_ffto_mysql-t.cpp +++ b/test/tap/tests/test_ffto_mysql-t.cpp @@ -8,19 +8,32 @@ #include "command_line.h" #include "utils.h" +static constexpr int kPlannedTests = 22; + +#define FAIL_AND_SKIP_REMAINING(cleanup_label, fmt, ...) \ + do { \ + diag(fmt, ##__VA_ARGS__); \ + int remaining = kPlannedTests - tests_last(); \ + if (remaining > 0) { \ + skip(remaining, "Skipping remaining assertions after setup failure"); \ + } \ + goto cleanup_label; \ + } while (0) + #define EXEC_QUERY(conn, q) \ - if (mysql_query(conn, q)) { \ - diag("Query failed: %s", mysql_error(conn)); \ - ok(0, "Query failed: %s", q); \ - return -1; \ - } else { \ + do { \ + if (mysql_query(conn, q)) { \ + ok(0, "Query failed: %s", q); \ + FAIL_AND_SKIP_REMAINING(cleanup, "Query failed: %s", mysql_error(conn)); \ + } \ MYSQL_RES* dummy_res = mysql_store_result(conn); \ - if (dummy_res) mysql_free_result(dummy_res); \ - else if (mysql_field_count(conn) > 0) { \ - diag("Error storing result: %s", mysql_error(conn)); \ - return -1; \ + if (dummy_res) { \ + mysql_free_result(dummy_res); \ + } else if (mysql_field_count(conn) > 0) { \ + ok(0, "Failed to store result for query: %s", q); \ + FAIL_AND_SKIP_REMAINING(cleanup, "Error storing result: %s", mysql_error(conn)); \ } \ - } + } while (0) void verify_digest(MYSQL* admin, const char* template_text, int expected_count, uint64_t expected_rows_affected = 0, uint64_t expected_rows_sent = 0) { char query[1024]; @@ -67,9 +80,17 @@ int main(int argc, char** argv) { return -1; } - plan(1 + (6*3) + (1*3)); // 1 + 18 + 3 = 22 + plan(kPlannedTests); // 1 + 18 + 3 = 22 MYSQL* admin = mysql_init(NULL); + MYSQL* conn = NULL; + MYSQL_STMT* stmt = NULL; + char server_query[1024]; + const char* ins_query = "INSERT INTO ffto_test (id, val) VALUES (?, ?)"; + MYSQL_BIND bind[2]; + int int_data = 10; + char str_data[20] = "binary_val"; + unsigned long str_len = strlen(str_data); if (!mysql_real_connect(admin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) { diag("Admin connection failed"); return -1; @@ -85,7 +106,6 @@ int main(int argc, char** argv) { MYSQL_QUERY(admin, "LOAD MYSQL USERS TO RUNTIME"); // Ensure backend server exists - char server_query[1024]; snprintf(server_query, sizeof(server_query), "INSERT OR REPLACE INTO mysql_servers (hostgroup_id, hostname, port) VALUES (0, '%s', %d)", cl.mysql_host, cl.mysql_port); MYSQL_QUERY(admin, server_query); MYSQL_QUERY(admin, "LOAD MYSQL SERVERS TO RUNTIME"); @@ -93,7 +113,7 @@ int main(int argc, char** argv) { MYSQL_QUERY(admin, "DELETE FROM stats_mysql_query_digest"); // Reset stats // USE ROOT FOR CLIENT CONNECTION - MYSQL* conn = mysql_init(NULL); + conn = mysql_init(NULL); if (!mysql_real_connect(conn, cl.host, "root", "root", NULL, cl.port, NULL, 0)) { diag("Client connection failed: %s", mysql_error(conn)); return -1; @@ -123,19 +143,12 @@ int main(int argc, char** argv) { // --- Part 2: Binary Protocol (Prepared Statements) --- MYSQL_QUERY(admin, "DELETE FROM stats_mysql_query_digest"); - MYSQL_STMT *stmt = mysql_stmt_init(conn); - const char* ins_query = "INSERT INTO ffto_test (id, val) VALUES (?, ?)"; + stmt = mysql_stmt_init(conn); if (mysql_stmt_prepare(stmt, ins_query, strlen(ins_query))) { - diag("mysql_stmt_prepare failed: %s", mysql_stmt_error(stmt)); ok(0, "mysql_stmt_prepare failed"); - return -1; + FAIL_AND_SKIP_REMAINING(cleanup, "mysql_stmt_prepare failed: %s", mysql_stmt_error(stmt)); } - MYSQL_BIND bind[2]; - int int_data = 10; - char str_data[20] = "binary_val"; - unsigned long str_len = strlen(str_data); - memset(bind, 0, sizeof(bind)); bind[0].buffer_type = MYSQL_TYPE_LONG; bind[0].buffer = (char *)&int_data; @@ -145,26 +158,26 @@ int main(int argc, char** argv) { bind[1].length = &str_len; if (mysql_stmt_bind_param(stmt, bind)) { - diag("mysql_stmt_bind_param failed: %s", mysql_stmt_error(stmt)); - return -1; + ok(0, "mysql_stmt_bind_param failed"); + FAIL_AND_SKIP_REMAINING(cleanup, "mysql_stmt_bind_param failed: %s", mysql_stmt_error(stmt)); } if (mysql_stmt_execute(stmt)) { - diag("mysql_stmt_execute (1) failed: %s", mysql_stmt_error(stmt)); - return -1; + ok(0, "mysql_stmt_execute (1) failed"); + FAIL_AND_SKIP_REMAINING(cleanup, "mysql_stmt_execute (1) failed: %s", mysql_stmt_error(stmt)); } int_data = 11; // Change ID for second insert if (mysql_stmt_execute(stmt)) { - diag("mysql_stmt_execute (2) failed: %s", mysql_stmt_error(stmt)); - return -1; + ok(0, "mysql_stmt_execute (2) failed"); + FAIL_AND_SKIP_REMAINING(cleanup, "mysql_stmt_execute (2) failed: %s", mysql_stmt_error(stmt)); } // Verify Binary Stats verify_digest(admin, "INSERT INTO ffto_test (id,val) VALUES (?,?)", 2, 2, 0); - mysql_stmt_close(stmt); - - mysql_close(conn); - mysql_close(admin); +cleanup: + if (stmt) mysql_stmt_close(stmt); + if (conn) mysql_close(conn); + if (admin) mysql_close(admin); return exit_status(); } diff --git a/test/tap/tests/test_ffto_pgsql-t.cpp b/test/tap/tests/test_ffto_pgsql-t.cpp index 7e6003fbf..0c668dcb2 100644 --- a/test/tap/tests/test_ffto_pgsql-t.cpp +++ b/test/tap/tests/test_ffto_pgsql-t.cpp @@ -12,14 +12,29 @@ CommandLine cl; +static constexpr int kPlannedTests = 22; + +#define FAIL_AND_SKIP_REMAINING(cleanup_label, fmt, ...) \ + do { \ + diag(fmt, ##__VA_ARGS__); \ + int remaining = kPlannedTests - tests_last(); \ + if (remaining > 0) { \ + skip(remaining, "Skipping remaining assertions after setup failure"); \ + } \ + goto cleanup_label; \ + } while (0) + #define EXEC_PG_QUERY(conn, q) \ { \ PGresult* res_exec = PQexec(conn, q); \ + if (!res_exec) { \ + ok(0, "PG Query failed: %s", q); \ + FAIL_AND_SKIP_REMAINING(cleanup, "PG Query returned no result: %s", PQerrorMessage(conn)); \ + } \ if (PQresultStatus(res_exec) != PGRES_COMMAND_OK && PQresultStatus(res_exec) != PGRES_TUPLES_OK) { \ - diag("PG Query failed: %s", PQerrorMessage(conn)); \ ok(0, "PG Query failed: %s", q); \ PQclear(res_exec); \ - return -1; \ + FAIL_AND_SKIP_REMAINING(cleanup, "PG Query failed: %s", PQerrorMessage(conn)); \ } \ PQclear(res_exec); \ } @@ -66,9 +81,16 @@ int main(int argc, char** argv) { return -1; } - plan(1 + (6*3) + (1*3)); // 1 (connect) + 18 (simple) + 3 (extended) = 22 + plan(kPlannedTests); // 1 (connect) + 18 (simple) + 3 (extended) = 22 MYSQL* admin = mysql_init(NULL); + PGconn* conn = NULL; + char server_query[1024]; + char conninfo[1024]; + const char* ext_query = "SELECT data FROM ffto_pg_test WHERE id = $1"; + PGresult* res_prep = NULL; + const char* paramValues[1] = {"1"}; + PGresult* res_exec = NULL; if (!mysql_real_connect(admin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) { diag("Admin connection failed"); return -1; @@ -84,7 +106,6 @@ int main(int argc, char** argv) { MYSQL_QUERY(admin, "LOAD PGSQL USERS TO RUNTIME"); // Ensure backend server exists - char server_query[1024]; snprintf(server_query, sizeof(server_query), "INSERT OR REPLACE INTO pgsql_servers (hostgroup_id, hostname, port) VALUES (0, '%s', %d)", cl.pgsql_server_host, cl.pgsql_server_port); MYSQL_QUERY(admin, server_query); MYSQL_QUERY(admin, "LOAD PGSQL SERVERS TO RUNTIME"); @@ -92,11 +113,10 @@ int main(int argc, char** argv) { MYSQL_QUERY(admin, "DELETE FROM stats_pgsql_query_digest"); // Standard libpq connection using root (postgres) - char conninfo[1024]; snprintf(conninfo, sizeof(conninfo), "host=%s port=%d user=%s password=%s dbname=postgres sslmode=disable", cl.pgsql_host, cl.pgsql_port, cl.pgsql_root_username, cl.pgsql_root_password); - PGconn* conn = PQconnectdb(conninfo); + conn = PQconnectdb(conninfo); if (PQstatus(conn) != CONNECTION_OK) { diag("PG Connection failed: %s", PQerrorMessage(conn)); return -1; @@ -121,26 +141,35 @@ int main(int argc, char** argv) { // --- Part 2: Extended Query Protocol --- MYSQL_QUERY(admin, "DELETE FROM stats_pgsql_query_digest"); - const char* ext_query = "SELECT data FROM ffto_pg_test WHERE id = $1"; - PGresult* res_prep = PQprepare(conn, "stmt1", ext_query, 1, NULL); + res_prep = PQprepare(conn, "stmt1", ext_query, 1, NULL); + if (!res_prep) { + ok(0, "PQprepare failed"); + FAIL_AND_SKIP_REMAINING(cleanup, "PQprepare returned no result: %s", PQerrorMessage(conn)); + } if (PQresultStatus(res_prep) != PGRES_COMMAND_OK) { - diag("PQprepare failed: %s", PQerrorMessage(conn)); - return -1; + ok(0, "PQprepare failed"); + PQclear(res_prep); + FAIL_AND_SKIP_REMAINING(cleanup, "PQprepare failed: %s", PQerrorMessage(conn)); } PQclear(res_prep); - const char* paramValues[1] = {"1"}; - PGresult* res_exec = PQexecPrepared(conn, "stmt1", 1, paramValues, NULL, NULL, 0); + res_exec = PQexecPrepared(conn, "stmt1", 1, paramValues, NULL, NULL, 0); + if (!res_exec) { + ok(0, "PQexecPrepared failed"); + FAIL_AND_SKIP_REMAINING(cleanup, "PQexecPrepared returned no result: %s", PQerrorMessage(conn)); + } if (PQresultStatus(res_exec) != PGRES_TUPLES_OK) { - diag("PQexecPrepared failed: %s", PQerrorMessage(conn)); - return -1; + ok(0, "PQexecPrepared failed"); + PQclear(res_exec); + FAIL_AND_SKIP_REMAINING(cleanup, "PQexecPrepared failed: %s", PQerrorMessage(conn)); } PQclear(res_exec); verify_pg_digest(admin, "SELECT data FROM ffto_pg_test WHERE id = $1", 1, 0, 1); - PQfinish(conn); - mysql_close(admin); +cleanup: + if (conn) PQfinish(conn); + if (admin) mysql_close(admin); return exit_status(); }