ffto: harden MySQL/PgSQL observer state handling and align tests/docs

Address outstanding review findings for FFTO on v3.0-ff_inspect and tighten
protocol-state correctness for both engines.

MySQL FFTO
- Restrict on_close() reporting to true in-flight states and always clear query
  tracking state after close.
- Add explicit active-query cleanup helpers and invoke them on state transitions
  to IDLE.
- Preserve accounting on mid-resultset server ERR packets by reporting current
  query in READING_COLUMNS/READING_ROWS before reset.
- Keep prepared-statement lifecycle cleanup robust (pending prepare cleared on
  prepare completion paths).

MySQL session integration
- Extract duplicated FAST_FORWARD client FFTO feed logic into
  observe_ffto_client_packet() and reuse it from all call sites.

PostgreSQL FFTO
- Replace regex-based CommandComplete parsing with lightweight token parsing,
  including NUL/whitespace trimming and strict numeric validation.
- Add queued tracking for pipelined extended-protocol executes so query text and
  response attribution stay aligned under Parse/Bind/Execute pipelining.
- Distinguish finalize semantics (execute-finalize on CommandComplete vs
  sync-finalize on ReadyForQuery) and centralize finalize/activation helpers.
- Add frontend Close ('C') handling to evict statement/portal mappings.
- Harden client/server message parsing with additional length checks.
- Extend affected-row command tag coverage to COPY and MERGE.

TAP tests
- Stabilize test plans for failure paths by replacing early returns with a
  fail-and-skip-remaining flow and shared cleanup labels.
- Ensure both MySQL and PgSQL FFTO tests preserve planned assertion counts under
  setup/prepare/execute failures.

Documentation
- Align FFT0 design doc state/response descriptions with current implementation
  (ReadyForQuery handling, pipelined queueing, supported PG command tags).
- Fix wording/typo issues in protocol section.

Validation performed
- make -C lib -j4
- make -C test/tap/tests test_ffto_mysql-t test_ffto_pgsql-t -j4

Runtime execution of the two TAP binaries remains environment-dependent (admin
endpoint connectivity required).
pull/5393/head
Rene Cannao 2 months ago
parent 51a0395076
commit 90462f6d5a

@ -27,14 +27,14 @@ Defined in `include/TrafficObserver.hpp`. This interface decouples protocol-spec
### 4.2. MySQL FFTO Implementation (`MySQLFFTO`)
Implements the MySQL wire protocol (version 10) state machine.
- **States**: `IDLE`, `WAITING_FOR_RESPONSE`, `READING_COLUMN_DEFS`, `READING_ROWS`, `SKIP_PREPARE_RESPONSE`.
- **States**: `IDLE`, `AWAITING_PREPARE_OK`, `AWAITING_RESPONSE`, `READING_COLUMNS`, `READING_ROWS`.
- **Prepared Statement Tracking**: Maintains a session-local map of `stmt_id` to query templates captured during the `PREPARE` phase.
### 4.3. PostgreSQL FFTO Implementation (`PgSQLFFTO`)
Handles the message-oriented PostgreSQL protocol.
- **Request Identification**: Detects `Query` ('Q'), `Parse` ('P'), `Bind` ('B'), and `Execute` ('E') messages.
- **Response Identification**: Tracks `CommandComplete` ('C') and `ErrorResponse` ('E').
- **Extended Query Tracking**: Tracks the association between Portals and Prepared Statements.
- **Response Identification**: Tracks `CommandComplete` ('C'), `ReadyForQuery` ('Z'), and `ErrorResponse` ('E').
- **Extended Query Tracking**: Tracks the association between Portals and Prepared Statements, and queues pipelined executes so responses are attributed to the correct query text.
## 5. Protocol and Security Details
- **Encryption**: FFTO operates on protocol packets that are already decrypted by ProxySQL's session handler. This allows ProxySQL to mix encrypted and unencrypted backend/frontend connections while maintaining consistent monitoring in FF mode.
@ -84,9 +84,10 @@ To verify that FFTO is capturing traffic in Fast Forward mode:
## 8. Protocol Support
- **Text and Binary Protocols**: FFTO supports both standard text-based queries and the binary protocol used by prepared statements.
- **MySQL Binary Protocol**: Corrects captures `COM_STMT_PREPARE` and `COM_STMT_EXECUTE`, tracking statement IDs to their respective SQL text.
- **MySQL Binary Protocol**: Correctly captures `COM_STMT_PREPARE` and `COM_STMT_EXECUTE`, tracking statement IDs to their respective SQL text.
- **PostgreSQL Extended Query**: Supports the multi-phase `Parse` -> `Bind` -> `Execute` sequence by tracking Statement and Portal mappings.
## 9. Limitations
- **Large Payloads**: Packets exceeding the `*-ffto_max_buffer_size` threshold cause FFTO to be bypassed for that session.
- **X-Protocol**: Currently optimized for classic MySQL and PostgreSQL protocols.
- **PostgreSQL Command Tags**: `sum_rows_affected`/`sum_rows_sent` are derived from `CommandComplete` tags and currently cover common commands (`INSERT`, `UPDATE`, `DELETE`, `COPY`, `MERGE`, `SELECT`, `FETCH`, `MOVE`).

@ -91,6 +91,9 @@ private:
*/
void process_server_packet(const unsigned char* data, size_t len);
bool is_in_flight_query_state() const;
void clear_active_query();
/**
* @brief Computes and records query metrics into the ProxySQL query digests.
* @param query The SQL query text.

@ -274,6 +274,7 @@ class MySQL_Session: public Base_Session<MySQL_Session, MySQL_Data_Stream, MySQL
// GPFC_ functions are subfunctions of get_pkts_from_client()
int GPFC_Statuses2(bool&, PtrSize_t&);
void GPFC_DetectedMultiPacket_SetDDS();
void observe_ffto_client_packet(const PtrSize_t& pkt);
int GPFC_WaitingClientData_FastForwardSession(PtrSize_t&);
void GPFC_PreparedStatements(PtrSize_t&, unsigned char);
int GPFC_Replication_SwitchToFastForward(PtrSize_t&, unsigned char);

@ -2,6 +2,7 @@
#define PGSQL_FFTO_HPP
#include "TrafficObserver.hpp"
#include <deque>
#include <vector>
#include <string>
#include <unordered_map>
@ -69,6 +70,14 @@ private:
unsigned long long m_query_start_time; ///< Start timestamp of the current query in microseconds.
uint64_t m_affected_rows {0}; ///< Accumulated affected rows for the current query.
uint64_t m_rows_sent {0}; ///< Accumulated rows sent for the current query.
bool m_current_finalize_on_sync {false}; ///< Whether current query finalizes on ReadyForQuery ('Z').
struct PendingQuery {
std::string query;
unsigned long long start_time {0};
bool finalize_on_sync {false};
};
std::deque<PendingQuery> m_pending_queries; ///< Queued queries for pipelined traffic.
// Binary Protocol Tracking (PostgreSQL Extended Query)
std::unordered_map<std::string, std::string> m_statements; ///< Map of statement names to original SQL text.
@ -90,6 +99,11 @@ private:
*/
void process_server_message(char type, const unsigned char* payload, size_t len);
void track_query(std::string query, bool finalize_on_sync);
void clear_current_query();
void activate_next_query();
void finalize_current_query();
/**
* @brief Computes and records query metrics into the ProxySQL query digests.
* @param query The SQL query text.

@ -109,11 +109,24 @@ void MySQLFFTO::on_server_data(const char* buf, std::size_t len) {
}
void MySQLFFTO::on_close() {
if (m_state != IDLE && m_query_start_time != 0) {
if (is_in_flight_query_state() && m_query_start_time != 0) {
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration, m_affected_rows, m_rows_sent);
m_state = IDLE;
}
m_state = IDLE;
clear_active_query();
m_pending_prepare_query.clear();
}
bool MySQLFFTO::is_in_flight_query_state() const {
return m_state == AWAITING_RESPONSE || m_state == READING_COLUMNS || m_state == READING_ROWS;
}
void MySQLFFTO::clear_active_query() {
m_current_query.clear();
m_query_start_time = 0;
m_affected_rows = 0;
m_rows_sent = 0;
}
void MySQLFFTO::process_client_packet(const unsigned char* data, size_t len) {
@ -162,8 +175,10 @@ void MySQLFFTO::process_server_packet(const unsigned char* data, size_t len) {
uint32_t stmt_id; memcpy(&stmt_id, data + 1, 4);
m_statements[stmt_id] = m_pending_prepare_query;
m_state = IDLE;
m_pending_prepare_query.clear();
} else if (first_byte == 0xFF) {
m_state = IDLE;
m_pending_prepare_query.clear();
}
} else if (m_state == AWAITING_RESPONSE) {
if (first_byte == 0x00) { // OK
@ -172,12 +187,15 @@ void MySQLFFTO::process_server_packet(const unsigned char* data, size_t len) {
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration, m_affected_rows, 0);
m_state = IDLE;
clear_active_query();
} else if (first_byte == 0xFF) { // ERR
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration);
m_state = IDLE;
clear_active_query();
} else if (first_byte == 0xFE && len < 9) { // EOF
m_state = IDLE;
clear_active_query();
} else { // Result Set started (first_byte is column count)
m_state = READING_COLUMNS;
}
@ -186,18 +204,28 @@ void MySQLFFTO::process_server_packet(const unsigned char* data, size_t len) {
m_state = READING_ROWS;
} else if (first_byte == 0xFE && len >= 9 && deprecate_eof) {
m_state = READING_ROWS;
} else if (first_byte == 0xFF) { // ERR while reading column metadata
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration);
m_state = IDLE;
clear_active_query();
}
} else if (m_state == READING_ROWS) {
if (first_byte == 0xFE && len < 9) { // EOF after rows
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration, 0, m_rows_sent);
m_state = IDLE;
clear_active_query();
} else if (first_byte == 0xFE && len >= 9 && deprecate_eof) {
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration, 0, m_rows_sent);
m_state = IDLE;
clear_active_query();
} else if (first_byte == 0xFF) { // ERR
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration, 0, m_rows_sent);
m_state = IDLE;
clear_active_query();
} else {
m_rows_sent++;
}

@ -4747,19 +4747,7 @@ int MySQL_Session::GPFC_Statuses2(bool& wrong_pass, PtrSize_t& pkt) {
}
break;
case FAST_FORWARD:
if (mysql_thread___ffto_enabled && !ffto_bypassed) {
if (pkt.size > (size_t)mysql_thread___ffto_max_buffer_size) {
ffto_bypassed = true;
m_ffto.reset();
} else {
if (!m_ffto) {
m_ffto = std::make_unique<MySQLFFTO>(this);
}
if (m_ffto) {
m_ffto->on_client_data((const char*)pkt.ptr, pkt.size);
}
}
}
observe_ffto_client_packet(pkt);
mybe->server_myds->PSarrayOUT->add(pkt.ptr, pkt.size);
/*
* Fast Forward Grace Close Logic:
@ -4806,19 +4794,7 @@ int MySQL_Session::GPFC_Statuses2(bool& wrong_pass, PtrSize_t& pkt) {
// 'FAST_FORWARD' should be pushed to 'PSarrayOUT'.
case CONNECTING_SERVER:
if (previous_status.empty() == false && previous_status.top() == FAST_FORWARD) {
if (mysql_thread___ffto_enabled && !ffto_bypassed) {
if (pkt.size > (size_t)mysql_thread___ffto_max_buffer_size) {
ffto_bypassed = true;
m_ffto.reset();
} else {
if (!m_ffto) {
m_ffto = std::make_unique<MySQLFFTO>(this);
}
if (m_ffto) {
m_ffto->on_client_data((const char*)pkt.ptr, pkt.size);
}
}
}
observe_ffto_client_packet(pkt);
mybe->server_myds->PSarrayOUT->add(pkt.ptr, pkt.size);
break;
}
@ -4832,6 +4808,24 @@ int MySQL_Session::GPFC_Statuses2(bool& wrong_pass, PtrSize_t& pkt) {
return handler_ret;
}
void MySQL_Session::observe_ffto_client_packet(const PtrSize_t& pkt) {
if (!pkt.ptr || pkt.size == 0) return;
if (!mysql_thread___ffto_enabled || ffto_bypassed) return;
if (pkt.size > (size_t)mysql_thread___ffto_max_buffer_size) {
ffto_bypassed = true;
m_ffto.reset();
return;
}
if (!m_ffto) {
m_ffto = std::make_unique<MySQLFFTO>(this);
}
if (m_ffto) {
m_ffto->on_client_data((const char*)pkt.ptr, pkt.size);
}
}
void MySQL_Session::GPFC_DetectedMultiPacket_SetDDS() {
// this is handled only for real traffic, not mirror
switch (client_myds->DSS) { // real traffic only
@ -4864,19 +4858,7 @@ int MySQL_Session::GPFC_WaitingClientData_FastForwardSession(PtrSize_t& pkt) {
mybe=find_or_create_backend(current_hostgroup); // set a backend
mybe->server_myds->reinit_queues(); // reinitialize the queues in the myds . By default, they are not active
if (mysql_thread___ffto_enabled && !ffto_bypassed) {
if (pkt.size > (size_t)mysql_thread___ffto_max_buffer_size) {
ffto_bypassed = true;
m_ffto.reset();
} else {
if (!m_ffto) {
m_ffto = std::make_unique<MySQLFFTO>(this);
}
if (m_ffto) {
m_ffto->on_client_data((const char*)pkt.ptr, pkt.size);
}
}
}
observe_ffto_client_packet(pkt);
mybe->server_myds->PSarrayOUT->add(pkt.ptr, pkt.size); // move the first packet
previous_status.push(FAST_FORWARD); // next status will be FAST_FORWARD . Now we need a connection

@ -11,8 +11,9 @@
#endif
#include "c_tokenizer.h"
#include <arpa/inet.h>
#include <cctype>
#include <cstdlib>
#include <cstring>
#include <regex>
extern class PgSQL_Query_Processor* GloPgQPro;
@ -20,8 +21,8 @@ extern class PgSQL_Query_Processor* GloPgQPro;
* @brief Parses the PostgreSQL CommandComplete ('C') message payload to extract row counts.
*
* PostgreSQL encodes row counts into the message tag string (e.g., "INSERT 0 10", "SELECT 50").
* This function uses regular expressions to extract these values and determine if the message
* corresponds to a result-generating command (SELECT, FETCH, MOVE) or a DML command.
* This function performs lightweight token parsing to extract these values and determine if
* the message corresponds to a result-generating command (SELECT, FETCH, MOVE) or a DML command.
*
* @param payload Pointer to the CommandComplete message payload (the tag string).
* @param len Length of the payload.
@ -29,20 +30,38 @@ extern class PgSQL_Query_Processor* GloPgQPro;
* @return The number of rows affected or sent.
*/
static uint64_t extract_pg_rows_affected(const unsigned char* payload, size_t len, bool& is_select) {
if (len == 0) return 0;
std::string command_tag(reinterpret_cast<const char*>(payload), len);
is_select = false;
static const std::regex re("(INSERT|UPDATE|DELETE|SELECT|MOVE|FETCH)\\b.*?\\b(\\d+)$");
std::smatch matches;
if (std::regex_search(command_tag, matches, re)) {
if (matches.size() == 3) {
std::string command_type = matches[1].str();
uint64_t rows = std::stoull(matches[2].str());
if (command_type == "SELECT" || command_type == "FETCH" || command_type == "MOVE") is_select = true;
return rows;
}
if (len == 0) return 0;
size_t begin = 0;
while (begin < len && std::isspace(payload[begin])) begin++;
while (len > begin && (payload[len - 1] == '\0' || std::isspace(payload[len - 1]))) len--;
if (begin >= len) return 0;
std::string command_tag(reinterpret_cast<const char*>(payload + begin), len - begin);
size_t first_space = command_tag.find(' ');
if (first_space == std::string::npos) return 0;
std::string command_type = command_tag.substr(0, first_space);
if (command_type == "SELECT" || command_type == "FETCH" || command_type == "MOVE") {
is_select = true;
} else if (command_type != "INSERT" && command_type != "UPDATE" &&
command_type != "DELETE" && command_type != "COPY" &&
command_type != "MERGE") {
return 0;
}
size_t last_space = command_tag.rfind(' ');
if (last_space == std::string::npos || last_space + 1 >= command_tag.size()) return 0;
const char* rows_str = command_tag.c_str() + last_space + 1;
char* endptr = nullptr;
unsigned long long rows = std::strtoull(rows_str, &endptr, 10);
if (endptr == rows_str || *endptr != '\0') {
return 0;
}
return 0;
return rows;
}
PgSQLFFTO::PgSQLFFTO(PgSQL_Session* session)
@ -110,20 +129,69 @@ void PgSQLFFTO::on_server_data(const char* buf, std::size_t len) {
}
void PgSQLFFTO::on_close() {
if (m_state != IDLE && m_query_start_time != 0) {
if (m_state == AWAITING_RESPONSE && !m_current_query.empty() && m_query_start_time != 0) {
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration, m_affected_rows, m_rows_sent);
}
clear_current_query();
m_pending_queries.clear();
m_state = IDLE;
}
void PgSQLFFTO::track_query(std::string query, bool finalize_on_sync) {
if (query.empty()) return;
PendingQuery pending { std::move(query), monotonic_time(), finalize_on_sync };
if (m_state == IDLE || m_current_query.empty()) {
m_current_query = std::move(pending.query);
m_query_start_time = pending.start_time;
m_current_finalize_on_sync = pending.finalize_on_sync;
m_affected_rows = 0;
m_rows_sent = 0;
m_state = AWAITING_RESPONSE;
return;
}
m_pending_queries.emplace_back(std::move(pending));
}
void PgSQLFFTO::clear_current_query() {
m_current_query.clear();
m_query_start_time = 0;
m_affected_rows = 0;
m_rows_sent = 0;
m_current_finalize_on_sync = false;
}
void PgSQLFFTO::activate_next_query() {
if (m_pending_queries.empty()) {
clear_current_query();
m_state = IDLE;
return;
}
PendingQuery next_query = std::move(m_pending_queries.front());
m_pending_queries.pop_front();
m_current_query = std::move(next_query.query);
m_query_start_time = next_query.start_time;
m_current_finalize_on_sync = next_query.finalize_on_sync;
m_affected_rows = 0;
m_rows_sent = 0;
m_state = AWAITING_RESPONSE;
}
void PgSQLFFTO::finalize_current_query() {
if (!m_current_query.empty() && m_query_start_time != 0) {
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration, m_affected_rows, m_rows_sent);
}
activate_next_query();
}
void PgSQLFFTO::process_client_message(char type, const unsigned char* payload, size_t len) {
if (type == 'Q') {
size_t query_len = (len > 0 && payload[len - 1] == 0) ? len - 1 : len;
m_current_query = std::string(reinterpret_cast<const char*>(payload), query_len);
m_query_start_time = monotonic_time();
m_state = AWAITING_RESPONSE;
m_affected_rows = 0; m_rows_sent = 0;
track_query(std::string(reinterpret_cast<const char*>(payload), query_len), true);
} else if (type == 'P') {
const char* p = reinterpret_cast<const char*>(payload);
size_t name_len = strnlen(p, len);
@ -132,6 +200,7 @@ void PgSQLFFTO::process_client_message(char type, const unsigned char* payload,
const char* query_ptr = p + name_len + 1;
size_t rem = len - (name_len + 1);
size_t query_text_len = strnlen(query_ptr, rem);
if (query_text_len >= rem) return;
m_statements[stmt_name] = std::string(query_ptr, query_text_len);
} else if (type == 'B') {
const char* p = reinterpret_cast<const char*>(payload);
@ -146,25 +215,22 @@ void PgSQLFFTO::process_client_message(char type, const unsigned char* payload,
} else if (type == 'E') {
const char* p = reinterpret_cast<const char*>(payload);
size_t portal_len = strnlen(p, len);
std::string portal_name(p, std::min(portal_len, len));
if (portal_len >= len) return;
if (len < portal_len + 1 + 4) return; // portal name + '\0' + max-rows
std::string portal_name(p, portal_len);
auto pit = m_portals.find(portal_name);
if (pit != m_portals.end()) {
auto sit = m_statements.find(pit->second);
if (sit != m_statements.end()) {
if (m_state == AWAITING_RESPONSE) {
on_close(); // Finalize previous if any
}
m_current_query = sit->second;
m_query_start_time = monotonic_time();
m_state = AWAITING_RESPONSE;
m_affected_rows = 0; m_rows_sent = 0;
track_query(sit->second, false);
}
}
} else if (type == 'C') { // Frontend Close
if (len < 1) return;
if (len < 2) return;
char close_type = static_cast<char>(payload[0]);
const char* name_ptr = reinterpret_cast<const char*>(payload) + 1;
size_t name_len = strnlen(name_ptr, len - 1);
if (name_len >= len - 1) return;
std::string name(name_ptr, name_len);
if (close_type == 'S') m_statements.erase(name);
else if (close_type == 'P') m_portals.erase(name);
@ -180,14 +246,19 @@ void PgSQLFFTO::process_server_message(char type, const unsigned char* payload,
uint64_t rows = extract_pg_rows_affected(payload, len, is_select);
if (is_select) m_rows_sent += rows;
else m_affected_rows += rows;
} else if (type == 'Z' || type == 'E') {
// ReadyForQuery or ErrorResponse
if (m_state == AWAITING_RESPONSE) {
if (!m_current_finalize_on_sync) {
finalize_current_query();
}
} else if (type == 'Z') {
finalize_current_query();
} else if (type == 'E') {
if (!m_current_query.empty() && m_query_start_time != 0) {
unsigned long long duration = monotonic_time() - m_query_start_time;
report_query_stats(m_current_query, duration, m_affected_rows, m_rows_sent);
}
clear_current_query();
m_pending_queries.clear();
m_state = IDLE;
m_affected_rows = 0; m_rows_sent = 0;
}
}

@ -8,19 +8,32 @@
#include "command_line.h"
#include "utils.h"
static constexpr int kPlannedTests = 22;
#define FAIL_AND_SKIP_REMAINING(cleanup_label, fmt, ...) \
do { \
diag(fmt, ##__VA_ARGS__); \
int remaining = kPlannedTests - tests_last(); \
if (remaining > 0) { \
skip(remaining, "Skipping remaining assertions after setup failure"); \
} \
goto cleanup_label; \
} while (0)
#define EXEC_QUERY(conn, q) \
if (mysql_query(conn, q)) { \
diag("Query failed: %s", mysql_error(conn)); \
ok(0, "Query failed: %s", q); \
return -1; \
} else { \
do { \
if (mysql_query(conn, q)) { \
ok(0, "Query failed: %s", q); \
FAIL_AND_SKIP_REMAINING(cleanup, "Query failed: %s", mysql_error(conn)); \
} \
MYSQL_RES* dummy_res = mysql_store_result(conn); \
if (dummy_res) mysql_free_result(dummy_res); \
else if (mysql_field_count(conn) > 0) { \
diag("Error storing result: %s", mysql_error(conn)); \
return -1; \
if (dummy_res) { \
mysql_free_result(dummy_res); \
} else if (mysql_field_count(conn) > 0) { \
ok(0, "Failed to store result for query: %s", q); \
FAIL_AND_SKIP_REMAINING(cleanup, "Error storing result: %s", mysql_error(conn)); \
} \
}
} while (0)
void verify_digest(MYSQL* admin, const char* template_text, int expected_count, uint64_t expected_rows_affected = 0, uint64_t expected_rows_sent = 0) {
char query[1024];
@ -67,9 +80,17 @@ int main(int argc, char** argv) {
return -1;
}
plan(1 + (6*3) + (1*3)); // 1 + 18 + 3 = 22
plan(kPlannedTests); // 1 + 18 + 3 = 22
MYSQL* admin = mysql_init(NULL);
MYSQL* conn = NULL;
MYSQL_STMT* stmt = NULL;
char server_query[1024];
const char* ins_query = "INSERT INTO ffto_test (id, val) VALUES (?, ?)";
MYSQL_BIND bind[2];
int int_data = 10;
char str_data[20] = "binary_val";
unsigned long str_len = strlen(str_data);
if (!mysql_real_connect(admin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) {
diag("Admin connection failed");
return -1;
@ -85,7 +106,6 @@ int main(int argc, char** argv) {
MYSQL_QUERY(admin, "LOAD MYSQL USERS TO RUNTIME");
// Ensure backend server exists
char server_query[1024];
snprintf(server_query, sizeof(server_query), "INSERT OR REPLACE INTO mysql_servers (hostgroup_id, hostname, port) VALUES (0, '%s', %d)", cl.mysql_host, cl.mysql_port);
MYSQL_QUERY(admin, server_query);
MYSQL_QUERY(admin, "LOAD MYSQL SERVERS TO RUNTIME");
@ -93,7 +113,7 @@ int main(int argc, char** argv) {
MYSQL_QUERY(admin, "DELETE FROM stats_mysql_query_digest"); // Reset stats
// USE ROOT FOR CLIENT CONNECTION
MYSQL* conn = mysql_init(NULL);
conn = mysql_init(NULL);
if (!mysql_real_connect(conn, cl.host, "root", "root", NULL, cl.port, NULL, 0)) {
diag("Client connection failed: %s", mysql_error(conn));
return -1;
@ -123,19 +143,12 @@ int main(int argc, char** argv) {
// --- Part 2: Binary Protocol (Prepared Statements) ---
MYSQL_QUERY(admin, "DELETE FROM stats_mysql_query_digest");
MYSQL_STMT *stmt = mysql_stmt_init(conn);
const char* ins_query = "INSERT INTO ffto_test (id, val) VALUES (?, ?)";
stmt = mysql_stmt_init(conn);
if (mysql_stmt_prepare(stmt, ins_query, strlen(ins_query))) {
diag("mysql_stmt_prepare failed: %s", mysql_stmt_error(stmt));
ok(0, "mysql_stmt_prepare failed");
return -1;
FAIL_AND_SKIP_REMAINING(cleanup, "mysql_stmt_prepare failed: %s", mysql_stmt_error(stmt));
}
MYSQL_BIND bind[2];
int int_data = 10;
char str_data[20] = "binary_val";
unsigned long str_len = strlen(str_data);
memset(bind, 0, sizeof(bind));
bind[0].buffer_type = MYSQL_TYPE_LONG;
bind[0].buffer = (char *)&int_data;
@ -145,26 +158,26 @@ int main(int argc, char** argv) {
bind[1].length = &str_len;
if (mysql_stmt_bind_param(stmt, bind)) {
diag("mysql_stmt_bind_param failed: %s", mysql_stmt_error(stmt));
return -1;
ok(0, "mysql_stmt_bind_param failed");
FAIL_AND_SKIP_REMAINING(cleanup, "mysql_stmt_bind_param failed: %s", mysql_stmt_error(stmt));
}
if (mysql_stmt_execute(stmt)) {
diag("mysql_stmt_execute (1) failed: %s", mysql_stmt_error(stmt));
return -1;
ok(0, "mysql_stmt_execute (1) failed");
FAIL_AND_SKIP_REMAINING(cleanup, "mysql_stmt_execute (1) failed: %s", mysql_stmt_error(stmt));
}
int_data = 11; // Change ID for second insert
if (mysql_stmt_execute(stmt)) {
diag("mysql_stmt_execute (2) failed: %s", mysql_stmt_error(stmt));
return -1;
ok(0, "mysql_stmt_execute (2) failed");
FAIL_AND_SKIP_REMAINING(cleanup, "mysql_stmt_execute (2) failed: %s", mysql_stmt_error(stmt));
}
// Verify Binary Stats
verify_digest(admin, "INSERT INTO ffto_test (id,val) VALUES (?,?)", 2, 2, 0);
mysql_stmt_close(stmt);
mysql_close(conn);
mysql_close(admin);
cleanup:
if (stmt) mysql_stmt_close(stmt);
if (conn) mysql_close(conn);
if (admin) mysql_close(admin);
return exit_status();
}

@ -12,14 +12,29 @@
CommandLine cl;
static constexpr int kPlannedTests = 22;
#define FAIL_AND_SKIP_REMAINING(cleanup_label, fmt, ...) \
do { \
diag(fmt, ##__VA_ARGS__); \
int remaining = kPlannedTests - tests_last(); \
if (remaining > 0) { \
skip(remaining, "Skipping remaining assertions after setup failure"); \
} \
goto cleanup_label; \
} while (0)
#define EXEC_PG_QUERY(conn, q) \
{ \
PGresult* res_exec = PQexec(conn, q); \
if (!res_exec) { \
ok(0, "PG Query failed: %s", q); \
FAIL_AND_SKIP_REMAINING(cleanup, "PG Query returned no result: %s", PQerrorMessage(conn)); \
} \
if (PQresultStatus(res_exec) != PGRES_COMMAND_OK && PQresultStatus(res_exec) != PGRES_TUPLES_OK) { \
diag("PG Query failed: %s", PQerrorMessage(conn)); \
ok(0, "PG Query failed: %s", q); \
PQclear(res_exec); \
return -1; \
FAIL_AND_SKIP_REMAINING(cleanup, "PG Query failed: %s", PQerrorMessage(conn)); \
} \
PQclear(res_exec); \
}
@ -66,9 +81,16 @@ int main(int argc, char** argv) {
return -1;
}
plan(1 + (6*3) + (1*3)); // 1 (connect) + 18 (simple) + 3 (extended) = 22
plan(kPlannedTests); // 1 (connect) + 18 (simple) + 3 (extended) = 22
MYSQL* admin = mysql_init(NULL);
PGconn* conn = NULL;
char server_query[1024];
char conninfo[1024];
const char* ext_query = "SELECT data FROM ffto_pg_test WHERE id = $1";
PGresult* res_prep = NULL;
const char* paramValues[1] = {"1"};
PGresult* res_exec = NULL;
if (!mysql_real_connect(admin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) {
diag("Admin connection failed");
return -1;
@ -84,7 +106,6 @@ int main(int argc, char** argv) {
MYSQL_QUERY(admin, "LOAD PGSQL USERS TO RUNTIME");
// Ensure backend server exists
char server_query[1024];
snprintf(server_query, sizeof(server_query), "INSERT OR REPLACE INTO pgsql_servers (hostgroup_id, hostname, port) VALUES (0, '%s', %d)", cl.pgsql_server_host, cl.pgsql_server_port);
MYSQL_QUERY(admin, server_query);
MYSQL_QUERY(admin, "LOAD PGSQL SERVERS TO RUNTIME");
@ -92,11 +113,10 @@ int main(int argc, char** argv) {
MYSQL_QUERY(admin, "DELETE FROM stats_pgsql_query_digest");
// Standard libpq connection using root (postgres)
char conninfo[1024];
snprintf(conninfo, sizeof(conninfo), "host=%s port=%d user=%s password=%s dbname=postgres sslmode=disable",
cl.pgsql_host, cl.pgsql_port, cl.pgsql_root_username, cl.pgsql_root_password);
PGconn* conn = PQconnectdb(conninfo);
conn = PQconnectdb(conninfo);
if (PQstatus(conn) != CONNECTION_OK) {
diag("PG Connection failed: %s", PQerrorMessage(conn));
return -1;
@ -121,26 +141,35 @@ int main(int argc, char** argv) {
// --- Part 2: Extended Query Protocol ---
MYSQL_QUERY(admin, "DELETE FROM stats_pgsql_query_digest");
const char* ext_query = "SELECT data FROM ffto_pg_test WHERE id = $1";
PGresult* res_prep = PQprepare(conn, "stmt1", ext_query, 1, NULL);
res_prep = PQprepare(conn, "stmt1", ext_query, 1, NULL);
if (!res_prep) {
ok(0, "PQprepare failed");
FAIL_AND_SKIP_REMAINING(cleanup, "PQprepare returned no result: %s", PQerrorMessage(conn));
}
if (PQresultStatus(res_prep) != PGRES_COMMAND_OK) {
diag("PQprepare failed: %s", PQerrorMessage(conn));
return -1;
ok(0, "PQprepare failed");
PQclear(res_prep);
FAIL_AND_SKIP_REMAINING(cleanup, "PQprepare failed: %s", PQerrorMessage(conn));
}
PQclear(res_prep);
const char* paramValues[1] = {"1"};
PGresult* res_exec = PQexecPrepared(conn, "stmt1", 1, paramValues, NULL, NULL, 0);
res_exec = PQexecPrepared(conn, "stmt1", 1, paramValues, NULL, NULL, 0);
if (!res_exec) {
ok(0, "PQexecPrepared failed");
FAIL_AND_SKIP_REMAINING(cleanup, "PQexecPrepared returned no result: %s", PQerrorMessage(conn));
}
if (PQresultStatus(res_exec) != PGRES_TUPLES_OK) {
diag("PQexecPrepared failed: %s", PQerrorMessage(conn));
return -1;
ok(0, "PQexecPrepared failed");
PQclear(res_exec);
FAIL_AND_SKIP_REMAINING(cleanup, "PQexecPrepared failed: %s", PQerrorMessage(conn));
}
PQclear(res_exec);
verify_pg_digest(admin, "SELECT data FROM ffto_pg_test WHERE id = $1", 1, 0, 1);
PQfinish(conn);
mysql_close(admin);
cleanup:
if (conn) PQfinish(conn);
if (admin) mysql_close(admin);
return exit_status();
}

Loading…
Cancel
Save