You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
proxysql/test/tap/tests/test_ffto_pgsql_pipeline-t.cpp

239 lines
9.7 KiB

/**
* @file test_ffto_pgsql_pipeline-t.cpp
* @brief FFTO E2E TAP test — PostgreSQL extended query pipelining.
*
* Validates that PgSQLFFTO correctly tracks queries sent in a pipelined
* fashion via the PostgreSQL extended query protocol. Pipelining means
* multiple Parse+Bind+Execute sequences are sent before a single Sync,
* and the server responds with multiple CommandComplete messages followed
* by a single ReadyForQuery.
*
* Uses pg_lite_client for raw protocol access — libpq doesn't expose
* true pipelining (it sends Sync after each PQexecPrepared).
*
* @par Test scenarios
* 1. 3 different queries pipelined before Sync → 3 separate digests
* 2. Same prepared statement executed 10 times in pipeline → count_star=10
*
* @pre ProxySQL running with a PostgreSQL backend.
*
* @see PgSQLFFTO.cpp — m_pending_queries deque handles pipelined queries
* @see pg_lite_client.h — PgConnection for raw protocol access
*/
#include <string>
#include <stdio.h>
#include <cstring>
#include <unistd.h>
#include <vector>
#include <cstdint>
/* pg_lite_client.h must come before mysql.h to avoid PROTOCOL_VERSION macro conflict */
#include "pg_lite_client.h"
#include "mysql.h"
#include "tap.h"
#include "command_line.h"
#include "utils.h"
/**
* @brief Total number of planned TAP assertions.
*
* Breakdown:
* - Setup: 1 (connect)
* - Scenario 1 (3 queries): 3 x 3 = 9 (3 verify_pg_digest calls)
* - Scenario 2 (10x exec): 1 x 3 = 3 (1 verify_pg_digest call)
* Total = 13
*/
static constexpr int kPlannedTests = 13;
#define FAIL_AND_SKIP_REMAINING(cleanup_label, fmt, ...) \
do { \
diag(fmt, ##__VA_ARGS__); \
int remaining = kPlannedTests - tests_last(); \
if (remaining > 0) { \
skip(remaining, "Skipping remaining assertions after setup failure"); \
} \
goto cleanup_label; \
} while (0)
void verify_pg_digest(MYSQL* admin, const char* template_text, int expected_count,
uint64_t expected_rows_affected = 0, uint64_t expected_rows_sent = 0) {
char query[1024];
snprintf(query, sizeof(query),
"SELECT count_star, sum_rows_affected, sum_rows_sent, digest_text "
"FROM stats_pgsql_query_digest WHERE digest_text LIKE '%%%s%%'",
template_text);
MYSQL_RES* res = NULL;
MYSQL_ROW row = NULL;
for (int attempt = 0; attempt < 20; attempt++) {
int rc = run_q(admin, query);
if (rc != 0) { usleep(100000); continue; }
res = mysql_store_result(admin);
if (!res) { usleep(100000); continue; }
row = mysql_fetch_row(res);
if (row) break;
mysql_free_result(res); res = NULL;
usleep(100000);
}
if (row) {
int count = atoi(row[0]);
uint64_t ra = strtoull(row[1], NULL, 10);
uint64_t rs_val = strtoull(row[2], NULL, 10);
ok(count >= expected_count, "PG digest '%s': count=%d (>= %d)", row[3], count, expected_count);
ok(ra == expected_rows_affected, "PG rows_affected '%s': %llu (== %llu)",
row[3], (unsigned long long)ra, (unsigned long long)expected_rows_affected);
ok(rs_val == expected_rows_sent, "PG rows_sent '%s': %llu (== %llu)",
row[3], (unsigned long long)rs_val, (unsigned long long)expected_rows_sent);
} else {
ok(0, "PG digest NOT found: %s", template_text);
ok(0, "Skipping rows_affected"); ok(0, "Skipping rows_sent");
diag("Dumping stats_pgsql_query_digest:");
run_q(admin, "SELECT digest_text, count_star FROM stats_pgsql_query_digest");
MYSQL_RES* dr = mysql_store_result(admin);
MYSQL_ROW drw;
while (dr && (drw = mysql_fetch_row(dr))) diag(" %s count:%s", drw[0], drw[1]);
if (dr) mysql_free_result(dr);
}
if (res) mysql_free_result(res);
}
static void clear_pg_stats(MYSQL* admin) {
mysql_query(admin, "SELECT * FROM stats_pgsql_query_digest_reset");
MYSQL_RES* r = mysql_store_result(admin);
if (r) mysql_free_result(r);
}
int main(int argc, char** argv) {
CommandLine cl;
if (cl.getEnv()) { diag("Failed to get env vars."); return -1; }
diag("=== FFTO PostgreSQL Pipeline Test ===");
diag("Validates FFTO with pipelined extended query protocol.");
plan(kPlannedTests);
MYSQL* admin = mysql_init(NULL);
PgConnection* pgc = NULL;
if (!mysql_real_connect(admin, cl.host, cl.admin_username, cl.admin_password,
NULL, cl.admin_port, NULL, 0)) {
diag("Admin connection failed"); return -1;
}
MYSQL_QUERY(admin, "UPDATE global_variables SET variable_value='true' "
"WHERE variable_name='pgsql-ffto_enabled'");
MYSQL_QUERY(admin, "UPDATE global_variables SET variable_value='1048576' "
"WHERE variable_name='pgsql-ffto_max_buffer_size'");
MYSQL_QUERY(admin, "LOAD PGSQL VARIABLES TO RUNTIME");
{
char eu[2 * strlen(cl.pgsql_root_username) + 1]; char ep[2 * strlen(cl.pgsql_root_password) + 1];
mysql_real_escape_string(admin, eu, cl.pgsql_root_username, strlen(cl.pgsql_root_username));
mysql_real_escape_string(admin, ep, cl.pgsql_root_password, strlen(cl.pgsql_root_password));
char uq[1024];
snprintf(uq, sizeof(uq),
"INSERT OR REPLACE INTO pgsql_users (username, password, fast_forward) "
"VALUES ('%s', '%s', 1)", eu, ep);
MYSQL_QUERY(admin, uq);
MYSQL_QUERY(admin, "LOAD PGSQL USERS TO RUNTIME");
}
{
char sq[1024];
snprintf(sq, sizeof(sq),
"INSERT OR REPLACE INTO pgsql_servers (hostgroup_id, hostname, port) "
"VALUES (0, '%s', %d)", cl.pgsql_server_host, cl.pgsql_server_port);
MYSQL_QUERY(admin, sq);
MYSQL_QUERY(admin, "LOAD PGSQL SERVERS TO RUNTIME");
}
/* ── PgConnection (raw protocol) ────────────────────────────────── */
try {
pgc = new PgConnection(5000);
pgc->connect(cl.pgsql_host, cl.pgsql_port, "postgres",
cl.pgsql_root_username, cl.pgsql_root_password);
} catch (const PgException& e) {
diag("PgConnection failed: %s", e.what());
FAIL_AND_SKIP_REMAINING(cleanup, "PgConnection failed");
}
ok(pgc != NULL && pgc->isConnected(), "Connected via pg_lite_client");
/* Create test table via simple query */
pgc->execute("DROP TABLE IF EXISTS ffto_pg_pipe");
pgc->execute("CREATE TABLE ffto_pg_pipe (id INT PRIMARY KEY, val TEXT)");
pgc->execute("INSERT INTO ffto_pg_pipe VALUES (1,'a'), (2,'b'), (3,'c')");
/* ================================================================
* Scenario 1: 3 different queries pipelined before Sync
*
* Send Parse+Bind+Execute for 3 different queries, then Sync.
* PgSQLFFTO queues them in m_pending_queries and finalizes each
* on its respective CommandComplete, then ReadyForQuery.
* ================================================================ */
diag("--- Scenario 1: 3 pipelined queries ---");
clear_pg_stats(admin);
try {
/* Parse 3 different statements without sending Sync */
pgc->prepareStatement("pipe_sel", "SELECT val FROM ffto_pg_pipe WHERE id = $1", false);
pgc->prepareStatement("pipe_ins", "INSERT INTO ffto_pg_pipe VALUES ($1, $2)", false);
pgc->prepareStatement("pipe_upd", "UPDATE ffto_pg_pipe SET val = $2 WHERE id = $1", false);
/* Send Sync to get ParseComplete responses */
pgc->sendSync();
pgc->consumeInputUntilReady();
/* Bind+Execute all 3 without Sync between them */
pgc->bindStatement("pipe_sel", "",
{{std::string("1"), 0}}, {}, false);
pgc->executePortal("", 0, false);
pgc->bindStatement("pipe_ins", "",
{{std::string("10"), 0}, {std::string("pipelined"), 0}}, {}, false);
pgc->executePortal("", 0, false);
pgc->bindStatement("pipe_upd", "",
{{std::string("1"), 0}, {std::string("pipe_updated"), 0}}, {}, false);
pgc->executePortal("", 0, false);
/* Single Sync for all 3 */
pgc->sendSync();
pgc->consumeInputUntilReady();
} catch (const PgException& e) {
diag("Pipeline scenario 1 failed: %s", e.what());
FAIL_AND_SKIP_REMAINING(cleanup, "Pipeline failed");
}
verify_pg_digest(admin, "SELECT val FROM ffto_pg_pipe WHERE id = $1", 1, 0, 1);
verify_pg_digest(admin, "INSERT INTO ffto_pg_pipe VALUES ($1,$2)", 1, 1, 0);
verify_pg_digest(admin, "UPDATE ffto_pg_pipe SET val = $2 WHERE id = $1", 1, 1, 0);
/* ================================================================
* Scenario 2: Same statement executed 10 times in pipeline
*
* Bind+Execute the same prepared statement 10 times, then Sync.
* All 10 should aggregate into one digest with count_star=10.
* ================================================================ */
diag("--- Scenario 2: 10x pipelined execution ---");
clear_pg_stats(admin);
try {
for (int i = 0; i < 10; i++) {
char id_str[8];
snprintf(id_str, sizeof(id_str), "%d", (i % 3) + 1);
pgc->bindStatement("pipe_sel", "",
{{std::string(id_str), 0}}, {}, false);
pgc->executePortal("", 0, false);
}
pgc->sendSync();
pgc->consumeInputUntilReady();
} catch (const PgException& e) {
diag("Pipeline scenario 2 failed: %s", e.what());
FAIL_AND_SKIP_REMAINING(cleanup, "Pipeline 10x failed");
}
verify_pg_digest(admin, "SELECT val FROM ffto_pg_pipe WHERE id = $1", 10, 0, 10);
cleanup:
if (pgc) { delete pgc; }
if (admin) mysql_close(admin);
return exit_status();
}