FFTO E2E TAP test: mixed text and binary protocol (#5516)

Add test_ffto_mysql_mixed_protocol-t.cpp covering FFTO behavior when
alternating between text protocol (COM_QUERY) and binary protocol
(COM_STMT_PREPARE/EXECUTE/CLOSE) within the same fast-forward session.
Part of the comprehensive E2E test suite for issue #5516.

Test scenarios (22 TAP assertions):
1. Text SELECT followed by prepared SELECT on same table — verifies
   both produce digest entries and aggregate correctly when the
   normalized SQL text matches.
2. Interleaved text and binary operations (text INSERT → prepared
   INSERT → text UPDATE → prepared SELECT) — validates clean FFTO
   state machine transitions between COM_QUERY and COM_STMT_EXECUTE
   command types.
3. Prepare → execute → close → re-prepare same SQL → execute —
   exercises the COM_STMT_CLOSE path which erases the stmt_id from
   FFTO's m_statements map, followed by a fresh COM_STMT_PREPARE
   that gets a new stmt_id. Verifies both executions share one
   digest with count_star >= 2.
4. Two prepared statements active simultaneously (INSERT + SELECT)
   with interleaved execution — validates that FFTO correctly
   maintains separate m_statements map entries per stmt_id and
   attributes queries to the correct digest.

Includes helper functions exec_prepared_insert() and
exec_prepared_select() to reduce binary protocol boilerplate.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
v3.0-5516
Rene Cannao 2 months ago
parent ac926629f7
commit cf30e02c30

@ -0,0 +1,473 @@
/**
* @file test_ffto_mysql_mixed_protocol-t.cpp
* @brief FFTO E2E TAP test mixed text and binary protocol within
* the same session.
*
* Validates that the Fast Forward Traffic Observer (FFTO) correctly
* handles alternating text protocol (COM_QUERY) and binary protocol
* (COM_STMT_PREPARE / COM_STMT_EXECUTE / COM_STMT_CLOSE) commands
* within a single fast-forward session.
*
* FFTO maintains a state machine that must cleanly transition between
* text queries (which go directly to AWAITING_RESPONSE) and prepared
* statement commands (which involve the m_statements map for stmt_id
* tracking). These tests exercise the transitions.
*
* @par Test scenarios
* 1. Text SELECT followed by prepared SELECT separate digests
* 2. Interleaved text and binary (INSERT, UPDATE, SELECT mix)
* 3. Prepare execute close re-prepare execute same SQL
* 4. Two prepared statements active simultaneously, interleaved
*
* @pre ProxySQL running with a MySQL backend, reachable via the standard
* TAP environment variables.
*
* @see MySQLFFTO.cpp process_client_packet() handles COM_QUERY,
* COM_STMT_PREPARE, COM_STMT_EXECUTE, COM_STMT_CLOSE
*/
#include <string>
#include <stdio.h>
#include <cstring>
#include <unistd.h>
#include <vector>
#include <cstdint>
#include "mysql.h"
#include "tap.h"
#include "command_line.h"
#include "utils.h"
/**
* @brief Total number of planned TAP assertions.
*
* Breakdown:
* - Setup: 1 (connect)
* - Scenario 1: 2 × 3 = 6 (text SELECT + prepared SELECT digests)
* - Scenario 2: 2 × 3 = 6 (text INSERT + prepared INSERT digests)
* - Scenario 3: 1 × 3 = 3 (re-prepared INSERT digest with count >= 2)
* - Scenario 4: 2 × 3 = 6 (two concurrent prepared stmts)
* Total = 1 + 6 + 6 + 3 + 6 = 22
*/
static constexpr int kPlannedTests = 22;
/** @copydoc FAIL_AND_SKIP_REMAINING */
#define FAIL_AND_SKIP_REMAINING(cleanup_label, fmt, ...) \
do { \
diag(fmt, ##__VA_ARGS__); \
int remaining = kPlannedTests - tests_last(); \
if (remaining > 0) { \
skip(remaining, "Skipping remaining assertions after setup failure"); \
} \
goto cleanup_label; \
} while (0)
/** @copydoc EXEC_QUERY */
#define EXEC_QUERY(conn, q) \
do { \
if (mysql_query(conn, q)) { \
ok(0, "Query failed: %s", q); \
FAIL_AND_SKIP_REMAINING(cleanup, "Query failed: %s", mysql_error(conn)); \
} \
MYSQL_RES* dummy_res = mysql_store_result(conn); \
if (dummy_res) { \
mysql_free_result(dummy_res); \
} else if (mysql_field_count(conn) > 0) { \
ok(0, "Failed to store result for query: %s", q); \
FAIL_AND_SKIP_REMAINING(cleanup, "Error storing result: %s", mysql_error(conn)); \
} \
} while (0)
/**
* @brief Verify that a query digest exists with expected counters.
* @copydetails verify_digest
*/
void verify_digest(MYSQL* admin, const char* template_text, int expected_count,
uint64_t expected_rows_affected = 0, uint64_t expected_rows_sent = 0) {
char query[1024];
snprintf(query, sizeof(query),
"SELECT count_star, sum_rows_affected, sum_rows_sent, digest_text "
"FROM stats_mysql_query_digest WHERE digest_text LIKE '%%%s%%'",
template_text);
MYSQL_RES* res = NULL;
MYSQL_ROW row = NULL;
for (int attempt = 0; attempt < 20; attempt++) {
int rc = run_q(admin, query);
if (rc != 0) { usleep(100000); continue; }
res = mysql_store_result(admin);
row = mysql_fetch_row(res);
if (row) break;
mysql_free_result(res);
res = NULL;
usleep(100000);
}
if (row) {
int count = atoi(row[0]);
uint64_t rows_affected = strtoull(row[1], NULL, 10);
uint64_t rows_sent = strtoull(row[2], NULL, 10);
ok(count >= expected_count,
"Digest count for '%s': %d (expected >= %d)", row[3], count, expected_count);
ok(rows_affected == expected_rows_affected,
"rows_affected for '%s': %llu (expected %llu)",
row[3], (unsigned long long)rows_affected, (unsigned long long)expected_rows_affected);
ok(rows_sent == expected_rows_sent,
"rows_sent for '%s': %llu (expected %llu)",
row[3], (unsigned long long)rows_sent, (unsigned long long)expected_rows_sent);
} else {
ok(0, "Digest NOT found for pattern: %s", template_text);
ok(0, "Skipping rows_affected check (digest not found)");
ok(0, "Skipping rows_sent check (digest not found)");
diag("Dumping stats_mysql_query_digest for debugging:");
run_q(admin, "SELECT digest_text, count_star FROM stats_mysql_query_digest");
MYSQL_RES* dump_res = mysql_store_result(admin);
MYSQL_ROW dump_row;
while (dump_res && (dump_row = mysql_fetch_row(dump_res))) {
diag(" digest: %s count: %s", dump_row[0], dump_row[1]);
}
if (dump_res) mysql_free_result(dump_res);
}
if (res) mysql_free_result(res);
}
/**
* @brief Execute a prepared INSERT with integer id and string value.
*
* Helper to reduce boilerplate for binary-protocol INSERT operations.
*
* @param stmt Prepared statement handle (already prepared).
* @param id Integer primary key value.
* @param val String value for the 'val' column.
* @return 0 on success, non-zero on failure.
*/
static int exec_prepared_insert(MYSQL_STMT* stmt, int id, const char* val) {
MYSQL_BIND bind[2];
unsigned long val_len = strlen(val);
memset(bind, 0, sizeof(bind));
bind[0].buffer_type = MYSQL_TYPE_LONG;
bind[0].buffer = (char*)&id;
bind[1].buffer_type = MYSQL_TYPE_STRING;
bind[1].buffer = (char*)val;
bind[1].buffer_length = val_len + 1;
bind[1].length = &val_len;
if (mysql_stmt_bind_param(stmt, bind)) return -1;
if (mysql_stmt_execute(stmt)) return -1;
return 0;
}
/**
* @brief Execute a prepared SELECT with integer parameter, consume result.
*
* @param stmt Prepared statement handle (already prepared for a SELECT).
* @param id Integer parameter value.
* @return Number of rows fetched, or -1 on error.
*/
static int exec_prepared_select(MYSQL_STMT* stmt, int id) {
MYSQL_BIND bind_param[1];
memset(bind_param, 0, sizeof(bind_param));
bind_param[0].buffer_type = MYSQL_TYPE_LONG;
bind_param[0].buffer = (char*)&id;
if (mysql_stmt_bind_param(stmt, bind_param)) return -1;
if (mysql_stmt_execute(stmt)) return -1;
/* Bind result and fetch */
MYSQL_BIND bind_result[1];
char result_val[256];
unsigned long result_len = 0;
memset(bind_result, 0, sizeof(bind_result));
bind_result[0].buffer_type = MYSQL_TYPE_STRING;
bind_result[0].buffer = result_val;
bind_result[0].buffer_length = sizeof(result_val);
bind_result[0].length = &result_len;
if (mysql_stmt_bind_result(stmt, bind_result)) return -1;
if (mysql_stmt_store_result(stmt)) return -1;
int rows = 0;
while (mysql_stmt_fetch(stmt) == 0) rows++;
mysql_stmt_free_result(stmt);
return rows;
}
int main(int argc, char** argv) {
CommandLine cl;
if (cl.getEnv()) {
diag("Failed to get the required environmental variables.");
return -1;
}
diag("=== FFTO MySQL Mixed Protocol Test ===");
diag("Validates FFTO state machine transitions when alternating");
diag("between text protocol (COM_QUERY) and binary protocol");
diag("(COM_STMT_PREPARE/EXECUTE/CLOSE) within the same session.");
diag("======================================");
plan(kPlannedTests);
MYSQL* admin = mysql_init(NULL);
MYSQL* conn = NULL;
MYSQL_STMT* stmt1 = NULL;
MYSQL_STMT* stmt2 = NULL;
if (!mysql_real_connect(admin, cl.host, cl.admin_username, cl.admin_password,
NULL, cl.admin_port, NULL, 0)) {
diag("Admin connection failed: %s", mysql_error(admin));
return -1;
}
/* ── FFTO Configuration ─────────────────────────────────────────── */
MYSQL_QUERY(admin, "UPDATE global_variables SET variable_value='true' "
"WHERE variable_name='mysql-ffto_enabled'");
MYSQL_QUERY(admin, "UPDATE global_variables SET variable_value='1048576' "
"WHERE variable_name='mysql-ffto_max_buffer_size'");
MYSQL_QUERY(admin, "LOAD MYSQL VARIABLES TO RUNTIME");
{
char uq[1024];
snprintf(uq, sizeof(uq),
"INSERT OR REPLACE INTO mysql_users "
"(username, password, default_hostgroup, fast_forward) "
"VALUES ('%s', '%s', 0, 1)",
cl.root_username, cl.root_password);
MYSQL_QUERY(admin, uq);
MYSQL_QUERY(admin, "LOAD MYSQL USERS TO RUNTIME");
}
{
char sq[1024];
snprintf(sq, sizeof(sq),
"INSERT OR REPLACE INTO mysql_servers "
"(hostgroup_id, hostname, port) VALUES (0, '%s', %d)",
cl.mysql_host, cl.mysql_port);
MYSQL_QUERY(admin, sq);
MYSQL_QUERY(admin, "LOAD MYSQL SERVERS TO RUNTIME");
}
/* ── Client Connection ──────────────────────────────────────────── */
conn = mysql_init(NULL);
if (!mysql_real_connect(conn, cl.host, cl.root_username, cl.root_password,
NULL, cl.port, NULL, 0)) {
diag("Client connection failed: %s", mysql_error(conn));
return -1;
}
ok(conn != NULL, "Connected to ProxySQL in fast-forward mode");
EXEC_QUERY(conn, "CREATE DATABASE IF NOT EXISTS ffto_db");
EXEC_QUERY(conn, "USE ffto_db");
EXEC_QUERY(conn, "DROP TABLE IF EXISTS ffto_mixed");
EXEC_QUERY(conn, "CREATE TABLE ffto_mixed ("
"id INT PRIMARY KEY, "
"val VARCHAR(255))");
/* Seed initial data */
EXEC_QUERY(conn, "INSERT INTO ffto_mixed VALUES (1, 'initial_1'), (2, 'initial_2')");
/* ================================================================
* Scenario 1: Text SELECT followed by prepared SELECT
*
* Both use the same table and similar SQL, but text protocol uses
* COM_QUERY while prepared uses COM_STMT_EXECUTE. FFTO should
* produce two separate digest entries because the SQL text differs
* (literal value vs placeholder).
* ================================================================ */
diag("--- Scenario 1: text SELECT then prepared SELECT ---");
MYSQL_QUERY(admin, "DELETE FROM stats_mysql_query_digest");
/* Text protocol SELECT */
if (mysql_query(conn, "SELECT val FROM ffto_mixed WHERE id = 1")) {
diag("Text SELECT failed: %s", mysql_error(conn));
FAIL_AND_SKIP_REMAINING(cleanup, "Text SELECT failed");
}
{
MYSQL_RES* rs = mysql_store_result(conn);
if (rs) mysql_free_result(rs);
}
/* Binary protocol SELECT */
{
const char* ps_sel = "SELECT val FROM ffto_mixed WHERE id = ?";
stmt1 = mysql_stmt_init(conn);
if (mysql_stmt_prepare(stmt1, ps_sel, strlen(ps_sel))) {
diag("Prepare SELECT failed: %s", mysql_stmt_error(stmt1));
FAIL_AND_SKIP_REMAINING(cleanup, "Prepare SELECT failed");
}
int rows = exec_prepared_select(stmt1, 2);
diag("Prepared SELECT returned %d rows", rows);
mysql_stmt_close(stmt1);
stmt1 = NULL;
}
/*
* Both queries normalize to the same digest text:
* "SELECT val FROM ffto_mixed WHERE id = ?"
* So they aggregate into one digest entry with count_star >= 2.
*/
verify_digest(admin, "SELECT val FROM ffto_mixed WHERE id", 2, 0, 2);
/*
* Verify the text INSERT we did during setup is also tracked.
* This confirms the state machine handled the COM_QUERY COM_STMT_PREPARE
* transition cleanly.
*/
verify_digest(admin, "INSERT INTO ffto_mixed VALUES", 1, 2, 0);
/* ================================================================
* Scenario 2: Interleaved text and binary protocol
*
* Pattern: text INSERT prepared INSERT text UPDATE prepared SELECT
* Tests that the FFTO state machine correctly resets between
* different command types.
* ================================================================ */
diag("--- Scenario 2: interleaved text and binary ---");
MYSQL_QUERY(admin, "DELETE FROM stats_mysql_query_digest");
/* Text INSERT */
EXEC_QUERY(conn, "INSERT INTO ffto_mixed VALUES (10, 'text_ins')");
/* Prepared INSERT */
{
const char* ps_ins = "INSERT INTO ffto_mixed (id, val) VALUES (?, ?)";
stmt1 = mysql_stmt_init(conn);
if (mysql_stmt_prepare(stmt1, ps_ins, strlen(ps_ins))) {
diag("Prepare INSERT failed: %s", mysql_stmt_error(stmt1));
FAIL_AND_SKIP_REMAINING(cleanup, "Prepare INSERT failed");
}
if (exec_prepared_insert(stmt1, 11, "binary_ins")) {
diag("Prepared INSERT exec failed: %s", mysql_stmt_error(stmt1));
FAIL_AND_SKIP_REMAINING(cleanup, "Prepared INSERT exec failed");
}
mysql_stmt_close(stmt1);
stmt1 = NULL;
}
/* Text UPDATE */
EXEC_QUERY(conn, "UPDATE ffto_mixed SET val = 'updated' WHERE id = 10");
/* Prepared SELECT */
{
const char* ps_sel = "SELECT val FROM ffto_mixed WHERE id = ?";
stmt1 = mysql_stmt_init(conn);
if (mysql_stmt_prepare(stmt1, ps_sel, strlen(ps_sel))) {
diag("Prepare SELECT failed: %s", mysql_stmt_error(stmt1));
FAIL_AND_SKIP_REMAINING(cleanup, "Prepare SELECT failed");
}
exec_prepared_select(stmt1, 11);
mysql_stmt_close(stmt1);
stmt1 = NULL;
}
/* Verify text INSERT and prepared INSERT are tracked */
verify_digest(admin, "INSERT INTO ffto_mixed", 2, 2, 0);
/* Verify UPDATE tracked */
verify_digest(admin, "UPDATE ffto_mixed SET val", 1, 1, 0);
/* ================================================================
* Scenario 3: Prepare execute close re-prepare execute
*
* Tests the COM_STMT_CLOSE path which erases the stmt_id from
* FFTO's m_statements map, followed by a fresh COM_STMT_PREPARE
* that gets a new stmt_id for the same SQL.
* ================================================================ */
diag("--- Scenario 3: prepare/close/re-prepare cycle ---");
MYSQL_QUERY(admin, "DELETE FROM stats_mysql_query_digest");
/* First cycle: prepare, execute, close */
{
const char* ps_ins = "INSERT INTO ffto_mixed (id, val) VALUES (?, ?)";
stmt1 = mysql_stmt_init(conn);
if (mysql_stmt_prepare(stmt1, ps_ins, strlen(ps_ins))) {
diag("Prepare (1st) failed: %s", mysql_stmt_error(stmt1));
FAIL_AND_SKIP_REMAINING(cleanup, "Prepare (1st) failed");
}
if (exec_prepared_insert(stmt1, 20, "cycle_1")) {
diag("Execute (1st) failed: %s", mysql_stmt_error(stmt1));
FAIL_AND_SKIP_REMAINING(cleanup, "Execute (1st) failed");
}
mysql_stmt_close(stmt1);
stmt1 = NULL;
}
/* Second cycle: re-prepare same SQL, execute again */
{
const char* ps_ins = "INSERT INTO ffto_mixed (id, val) VALUES (?, ?)";
stmt1 = mysql_stmt_init(conn);
if (mysql_stmt_prepare(stmt1, ps_ins, strlen(ps_ins))) {
diag("Prepare (2nd) failed: %s", mysql_stmt_error(stmt1));
FAIL_AND_SKIP_REMAINING(cleanup, "Prepare (2nd) failed");
}
if (exec_prepared_insert(stmt1, 21, "cycle_2")) {
diag("Execute (2nd) failed: %s", mysql_stmt_error(stmt1));
FAIL_AND_SKIP_REMAINING(cleanup, "Execute (2nd) failed");
}
mysql_stmt_close(stmt1);
stmt1 = NULL;
}
/*
* Both executions share the same digest because the SQL text is
* identical. count_star should be >= 2, rows_affected should be 2.
*/
verify_digest(admin, "INSERT INTO ffto_mixed (id,val) VALUES (?,?)", 2, 2, 0);
/* ================================================================
* Scenario 4: Two prepared statements active simultaneously
*
* Prepares stmt1 (INSERT) and stmt2 (SELECT), then interleaves
* their execution. FFTO must maintain separate m_statements map
* entries for each stmt_id and correctly attribute queries.
* ================================================================ */
diag("--- Scenario 4: two concurrent prepared statements ---");
MYSQL_QUERY(admin, "DELETE FROM stats_mysql_query_digest");
{
const char* ps_ins = "INSERT INTO ffto_mixed (id, val) VALUES (?, ?)";
const char* ps_sel = "SELECT val FROM ffto_mixed WHERE id = ?";
stmt1 = mysql_stmt_init(conn);
if (mysql_stmt_prepare(stmt1, ps_ins, strlen(ps_ins))) {
diag("Prepare stmt1 failed: %s", mysql_stmt_error(stmt1));
FAIL_AND_SKIP_REMAINING(cleanup, "Prepare stmt1 failed");
}
stmt2 = mysql_stmt_init(conn);
if (mysql_stmt_prepare(stmt2, ps_sel, strlen(ps_sel))) {
diag("Prepare stmt2 failed: %s", mysql_stmt_error(stmt2));
FAIL_AND_SKIP_REMAINING(cleanup, "Prepare stmt2 failed");
}
/* Interleave: INSERT → SELECT → INSERT */
if (exec_prepared_insert(stmt1, 30, "concurrent_1")) {
diag("Concurrent INSERT 1 failed: %s", mysql_stmt_error(stmt1));
FAIL_AND_SKIP_REMAINING(cleanup, "Concurrent INSERT 1 failed");
}
int rows = exec_prepared_select(stmt2, 30);
diag("Concurrent SELECT returned %d rows", rows);
if (exec_prepared_insert(stmt1, 31, "concurrent_2")) {
diag("Concurrent INSERT 2 failed: %s", mysql_stmt_error(stmt1));
FAIL_AND_SKIP_REMAINING(cleanup, "Concurrent INSERT 2 failed");
}
mysql_stmt_close(stmt1);
stmt1 = NULL;
mysql_stmt_close(stmt2);
stmt2 = NULL;
}
verify_digest(admin, "INSERT INTO ffto_mixed (id,val) VALUES (?,?)", 2, 2, 0);
verify_digest(admin, "SELECT val FROM ffto_mixed WHERE id", 1, 0, 1);
cleanup:
if (stmt1) mysql_stmt_close(stmt1);
if (stmt2) mysql_stmt_close(stmt2);
if (conn) mysql_close(conn);
if (admin) mysql_close(admin);
return exit_status();
}
Loading…
Cancel
Save