diff --git a/.gitignore b/.gitignore index 56f6e200b..ad2fc6aee 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,9 @@ lint/ # direnv .envrc.local +# bear/clangd +compile_commands.json +compile_commands.events.json # Intelij .idea/ diff --git a/compile_commands.events.json b/compile_commands.events.json deleted file mode 100644 index 260facad0..000000000 Binary files a/compile_commands.events.json and /dev/null differ diff --git a/include/MySQL_PreparedStatement.h b/include/MySQL_PreparedStatement.h index 8b7a64008..d7225c875 100644 --- a/include/MySQL_PreparedStatement.h +++ b/include/MySQL_PreparedStatement.h @@ -186,6 +186,8 @@ class MySQL_STMTs_local_v14 { public: // this map associate client_stmt_id to global_stmt_id : this is used only for client connections std::map client_stmt_to_global_ids; + // this map associates client_stmt_id to prepare-time min_gtid annotations + std::map client_stmt_to_min_gtid; // this multimap associate global_stmt_id to client_stmt_id : this is used only for client connections std::multimap global_stmt_to_client_ids; @@ -202,6 +204,7 @@ class MySQL_STMTs_local_v14 { sess = NULL; is_client_ = _ic; client_stmt_to_global_ids = std::map(); + client_stmt_to_min_gtid = std::map(); global_stmt_to_client_ids = std::multimap(); backend_stmt_to_global_ids = std::map(); global_stmt_to_backend_ids = std::map(); @@ -221,6 +224,9 @@ class MySQL_STMTs_local_v14 { unsigned int get_num_backend_stmts() { return backend_stmt_to_global_ids.size(); } uint32_t generate_new_client_stmt_id(uint64_t global_statement_id); uint64_t find_global_stmt_id_from_client(uint32_t client_stmt_id); + void set_client_min_gtid(uint32_t client_stmt_id, const char* min_gtid); + const char* find_client_min_gtid(uint32_t client_stmt_id) const; + void erase_client_min_gtid(uint32_t client_stmt_id); bool client_close(uint32_t client_statement_id); MYSQL_STMT * find_backend_stmt_by_global_id(uint32_t global_statement_id) { auto s=global_stmt_to_backend_stmt.find(global_statement_id); diff --git a/lib/MySQL_PreparedStatement.cpp b/lib/MySQL_PreparedStatement.cpp index 1b27c6a6e..e729ae351 100644 --- a/lib/MySQL_PreparedStatement.cpp +++ b/lib/MySQL_PreparedStatement.cpp @@ -794,10 +794,31 @@ uint64_t MySQL_STMTs_local_v14::find_global_stmt_id_from_client(uint32_t client_ return ret; } +void MySQL_STMTs_local_v14::set_client_min_gtid(uint32_t client_stmt_id, const char* min_gtid) { + if (min_gtid && min_gtid[0] != '\0') { + client_stmt_to_min_gtid[client_stmt_id] = min_gtid; + } else { + client_stmt_to_min_gtid.erase(client_stmt_id); + } +} + +const char* MySQL_STMTs_local_v14::find_client_min_gtid(uint32_t client_stmt_id) const { + auto s = client_stmt_to_min_gtid.find(client_stmt_id); + if (s != client_stmt_to_min_gtid.end()) { + return s->second.c_str(); + } + return NULL; +} + +void MySQL_STMTs_local_v14::erase_client_min_gtid(uint32_t client_stmt_id) { + client_stmt_to_min_gtid.erase(client_stmt_id); +} + bool MySQL_STMTs_local_v14::client_close(uint32_t client_statement_id) { auto s = client_stmt_to_global_ids.find(client_statement_id); if (s != client_stmt_to_global_ids.end()) { // found uint64_t global_stmt_id = s->second; + erase_client_min_gtid(client_statement_id); client_stmt_to_global_ids.erase(s); GloMyStmt->ref_count_client(global_stmt_id, -1); //auto s2 = global_stmt_to_client_ids.find(global_stmt_id); diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 623c0461d..cc6917cd1 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -3379,6 +3379,16 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C // we will now generate a unique stmt and send it to the client uint32_t new_stmt_id=client_myds->myconn->local_stmts->generate_new_client_stmt_id(stmt_info->statement_id); CurrentQuery.stmt_client_id=new_stmt_id; + + // When first_comment_parsing is set to 1 (before query rules) or 3 (before_and_after query rules), + // query rules may strip the min_gtid annotation during STMT_PREPARE. Persist it per client + // statement ID so that STMT_EXECUTE can restore the original routing constraint. + // For more context, refer to https://github.com/sysown/proxysql/issues/5384 + int first_comment_parsing = mysql_thread___query_processor_first_comment_parsing; + if (first_comment_parsing == 1 || first_comment_parsing == 3) { + client_myds->myconn->local_stmts->set_client_min_gtid(new_stmt_id, qpo->min_gtid); + } + client_myds->setDSS_STATE_QUERY_SENT_NET(); client_myds->myprot.generate_STMT_PREPARE_RESPONSE(client_myds->pkt_sid+1,stmt_info,new_stmt_id); LogQuery(NULL); @@ -3494,10 +3504,22 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C // but as we reached here, stmt_meta is not null and we save the metadata sess_STMTs_meta->insert(stmt_global_id,stmt_meta); } - // else CurrentQuery.stmt_meta=stmt_meta; //current_hostgroup=qpo->destination_hostgroup; + + // When first_comment_parsing is set to 1 (before query rules) or 3 (before_and_after query rules), + // query rules may strip the min_gtid annotation during STMT_PREPARE. So we persist it per client + // statement ID and restore it during STMT_EXECUTE. + // For more context, refer to https://github.com/sysown/proxysql/issues/5384 + int first_comment_parsing = mysql_thread___query_processor_first_comment_parsing; + if (!qpo->min_gtid && (first_comment_parsing == 1 || first_comment_parsing == 3)) { + const char *saved_min_gtid = client_myds->myconn->local_stmts->find_client_min_gtid(client_stmt_id); + if (saved_min_gtid) { + qpo->min_gtid = strdup(saved_min_gtid); + } + } + rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt, &lock_hostgroup, ps_type_execute_stmt); if (rc_break==true) { return; @@ -5621,6 +5643,16 @@ bool MySQL_Session::handler_rc0_PROCESSING_STMT_PREPARE(enum session_status& st, CurrentQuery.stmt_client_id=client_stmtid; } CurrentQuery.mysql_stmt=NULL; + + // When first_comment_parsing is set to 1 (before query rules) or 3 (before_and_after query rules), + // query rules may strip the min_gtid annotation during STMT_PREPARE. Persist it per client + // statement ID so that STMT_EXECUTE can restore the original routing constraint. + // For more context, refer to https://github.com/sysown/proxysql/issues/5384 + int first_comment_parsing = mysql_thread___query_processor_first_comment_parsing; + if (previous_status.size() == 0 && (first_comment_parsing == 1 || first_comment_parsing == 3)) { + client_myds->myconn->local_stmts->set_client_min_gtid(client_stmtid, qpo->min_gtid); + } + st=status; size_t sts=previous_status.size(); if (sts) { diff --git a/test/tap/groups/groups.json b/test/tap/groups/groups.json index 119d99c8f..f7f81e345 100644 --- a/test/tap/groups/groups.json +++ b/test/tap/groups/groups.json @@ -383,5 +383,7 @@ "transaction_state_unit-t" : [ "unit-tests-g1" ], "unit-strip_schema_from_query-t" : [ "unit-tests-g1" ], "vector_db_performance-t" : [ "ai-g1","@proxysql_min_version:4.0" ], - "vector_features-t" : [ "ai-g1","@proxysql_min_version:4.0" ] + "vector_features-t" : [ "ai-g1","@proxysql_min_version:4.0" ], + "test_ps_min_gtid_fc-t" : [ "legacy-binlog-g1" ], + "test_ps_min_gtid-t" : [ "legacy-binlog-g1" ] } diff --git a/test/tap/tap/utils.cpp b/test/tap/tap/utils.cpp index a4d494432..84d9d6271 100644 --- a/test/tap/tap/utils.cpp +++ b/test/tap/tap/utils.cpp @@ -2565,3 +2565,125 @@ void spawn_noise(const CommandLine& cl, const std::string& tool_path, const std: fprintf(stderr, "Failed to fork for noise tool: %s\n", tool_path.c_str()); } } + +int get_backend_gtid_position( + MYSQL* admin, + const std::string& backend_host, + uint32_t backend_port, + std::string& server_uuid, + uint64_t& max_trxid +) { + auto trim = [](const std::string& s) -> std::string { + size_t start = s.find_first_not_of(" \t\n\r"); + if (start == std::string::npos) return ""; + size_t end = s.find_last_not_of(" \t\n\r"); + return s.substr(start, end - start + 1); + }; + + auto strip_dashes = [](const std::string& uuid) -> std::string { + std::string result; + result.reserve(uuid.size()); + for (char c : uuid) { + if (c != '-') result.push_back(c); + } + return result; + }; + + auto parse_interval_end = [](const std::string& token, uint64_t& interval_end) -> bool { + size_t dash_pos = token.find('-'); + if (dash_pos == std::string::npos) { + interval_end = std::stoull(token); + return true; + } + std::string to_str = token.substr(dash_pos + 1); + if (to_str.empty()) return false; + interval_end = std::stoull(to_str); + return true; + }; + + // A GTID executed set may contain multiple comma-separated GTID entries, + // we parse only the first GTID entry from the set. + auto parse_first_gtid = [&](const std::string& gtid_executed_raw) -> bool { + std::string gtid_executed = trim(gtid_executed_raw); + if (gtid_executed.empty()) return false; + + size_t comma_pos = gtid_executed.find(','); + std::string group = trim( + (comma_pos == std::string::npos) ? gtid_executed : gtid_executed.substr(0, comma_pos) + ); + if (group.empty()) return false; + + size_t colon_pos = group.find(':'); + if (colon_pos == std::string::npos || colon_pos == 0 || colon_pos == group.size() - 1) { + return false; + } + + server_uuid = strip_dashes(group.substr(0, colon_pos)); + if (server_uuid.empty()) return false; + + std::string intervals_str = group.substr(colon_pos + 1); + max_trxid = 0; + bool found = false; + + size_t ipos = 0; + while (ipos < intervals_str.size()) { + size_t next_colon = intervals_str.find(':', ipos); + std::string token = trim( + (next_colon == std::string::npos) ? intervals_str.substr(ipos) : intervals_str.substr(ipos, next_colon - ipos) + ); + + if (!token.empty()) { + uint64_t interval_end = 0; + if (!parse_interval_end(token, interval_end)) { + return false; + } + if (interval_end > max_trxid) { + max_trxid = interval_end; + } + found = true; + } + + if (next_colon == std::string::npos) { + break; + } + ipos = next_colon + 1; + } + + return found; + }; + + std::string gtid_query = "SELECT gtid_executed FROM stats.stats_mysql_gtid_executed" + " WHERE hostname='" + backend_host + "' AND port=" + std::to_string(backend_port) + + " AND gtid_executed IS NOT NULL AND gtid_executed != ''"; + + int rc = mysql_query(admin, gtid_query.c_str()); + if (rc != 0) { + diag("Failed to query gtid_executed from stats: %s", mysql_error(admin)); + return -1; + } + + MYSQL_RES* res = mysql_store_result(admin); + MYSQL_ROW row = nullptr; + + if (!res) { + diag("Failed to store result for gtid_executed query"); + return -1; + } + + row = mysql_fetch_row(res); + if (!row || !row[0]) { + mysql_free_result(res); + diag("No GTID info for backend %s:%d", backend_host.c_str(), backend_port); + return -1; + } + + std::string gtid_executed = row[0]; + mysql_free_result(res); + + if (!parse_first_gtid(gtid_executed)) { + diag("Failed to parse GTID entry from gtid_executed: %s", gtid_executed.c_str()); + return -1; + } + + return 0; +} diff --git a/test/tap/tap/utils.h b/test/tap/tap/utils.h index 064a74ad4..5c8382f92 100644 --- a/test/tap/tap/utils.h +++ b/test/tap/tap/utils.h @@ -981,6 +981,23 @@ using pool_state_t = std::map; */ std::pair fetch_conn_stats(MYSQL* admin, const std::vector hgs); +/** + * @brief Fetches GTID info for a backend from the gtid_executed set. + * @param admin An already opened connection to ProxySQL admin interface. + * @param backend_host The hostname of the backend server. + * @param backend_port The port of the backend server. + * @param server_uuid Output: the UUID of the first GTID entry in gtid_executed set, with dashes stripped. + * @param max_trxid Output: the maximum transaction ID found in the first GTID entry. + * @return 0 on success, -1 on failure (query error, missing UUID, parse error). + */ +int get_backend_gtid_position( + MYSQL* admin, + const std::string& backend_host, + uint32_t backend_port, + std::string& server_uuid, + uint64_t& max_trxid +); + /** * @brief Waits for a generic condition. * @details Wait finishes by a non-zero return code by the condition or by timeout. diff --git a/test/tap/tests/test_ps_min_gtid-t.cpp b/test/tap/tests/test_ps_min_gtid-t.cpp new file mode 100644 index 000000000..bbce04100 --- /dev/null +++ b/test/tap/tests/test_ps_min_gtid-t.cpp @@ -0,0 +1,212 @@ +/** + * @file test_ps_min_gtid-t.cpp + * @brief Test min_gtid based routing for prepared statements + * + * This test verifies that min_gtid annotation is handled properly + * during STMT_PREPARE and STMT_EXECUTE + * + * Test flow: + * 1. Test 1: Prepare/execute with a reachable GTID - should succeed + * 2. Test 2: Prepare/execute with a future GTID - should fail with timeout + */ + +#include +#include +#include +#include +#include +#include "mysql.h" +#include "tap.h" +#include "command_line.h" +#include "utils.h" + +static const char* EXPECTED_PS_QUERY = "SELECT id FROM test.ps_min_gtid WHERE id=?"; + +static int get_ps_cache_count(MYSQL* admin) { + std::string query = std::string("SELECT COUNT(*) FROM stats.stats_mysql_prepared_statements_info") + + " WHERE query like '%" + EXPECTED_PS_QUERY + "'"; + + ext_val_t result = mysql_query_ext_val(admin, query, int32_t(0)); + if (result.err != 0) { + diag("Failed to query PS cache count: err=%d, val=%d", result.err, result.val); + return -1; + } + return result.val; +} + +static int run_prepared_stmt( + MYSQL* proxy, + const std::string& gtid, + unsigned int& stmt_errno, + std::string& stmt_errmsg +) { + std::string query = "/*+ ;min_gtid=" + gtid + " */ SELECT id FROM test.ps_min_gtid WHERE id=?"; + + MYSQL_STMT* stmt = mysql_stmt_init(proxy); + if (!stmt) { + stmt_errno = mysql_errno(proxy); + stmt_errmsg = mysql_error(proxy) ? mysql_error(proxy) : "(null)"; + diag("mysql_stmt_init() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str()); + return -1; + } + + if (mysql_stmt_prepare(stmt, query.c_str(), query.length()) != 0) { + stmt_errno = mysql_stmt_errno(stmt); + stmt_errmsg = mysql_stmt_error(stmt) ? mysql_stmt_error(stmt) : "(null)"; + diag("mysql_stmt_prepare() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str()); + mysql_stmt_close(stmt); + return -1; + } + + diag("Prepared statement_id: %lu", stmt->stmt_id); + + int id_val = 1; + unsigned long len = 0; + my_bool is_null = 0; + + MYSQL_BIND bind_param; + memset(&bind_param, 0, sizeof(bind_param)); + bind_param.buffer_type = MYSQL_TYPE_LONG; + bind_param.buffer = &id_val; + bind_param.buffer_length = sizeof(id_val); + bind_param.length = &len; + bind_param.is_null = &is_null; + + if (mysql_stmt_bind_param(stmt, &bind_param) != 0) { + stmt_errno = mysql_stmt_errno(stmt); + stmt_errmsg = mysql_stmt_error(stmt) ? mysql_stmt_error(stmt) : "(null)"; + diag("mysql_stmt_bind_param() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str()); + mysql_stmt_close(stmt); + return -1; + } + + if (mysql_stmt_execute(stmt) != 0) { + stmt_errno = mysql_stmt_errno(stmt); + stmt_errmsg = mysql_stmt_error(stmt) ? mysql_stmt_error(stmt) : "(null)"; + diag("mysql_stmt_execute() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str()); + mysql_stmt_close(stmt); + return -1; + } + + int result_id = 0; + MYSQL_BIND bind_result; + memset(&bind_result, 0, sizeof(bind_result)); + bind_result.buffer_type = MYSQL_TYPE_LONG; + bind_result.buffer = &result_id; + bind_result.buffer_length = sizeof(result_id); + + if (mysql_stmt_bind_result(stmt, &bind_result) != 0) { + stmt_errno = mysql_stmt_errno(stmt); + stmt_errmsg = mysql_stmt_error(stmt) ? mysql_stmt_error(stmt) : "(null)"; + diag("mysql_stmt_bind_result() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str()); + mysql_stmt_close(stmt); + return -1; + } + + if (mysql_stmt_fetch(stmt) != 0) { + stmt_errno = mysql_stmt_errno(stmt); + stmt_errmsg = mysql_stmt_error(stmt) ? mysql_stmt_error(stmt) : "(null)"; + diag("mysql_stmt_fetch() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str()); + mysql_stmt_free_result(stmt); + mysql_stmt_close(stmt); + return -1; + } + + mysql_stmt_free_result(stmt); + mysql_stmt_close(stmt); + + return result_id; +} + +static void test_prepare_stmt_valid_gtid(MYSQL* admin, MYSQL* proxy, const std::string& current_gtid, int exp_ps_cache_count) { + diag("========== Test 1: Valid GTID =========="); + + unsigned int stmt_errno = 0; + std::string stmt_errmsg; + int result_id = run_prepared_stmt(proxy, current_gtid, stmt_errno, stmt_errmsg); + + ok(result_id == 1, "test_valid_gtid: Execute with valid GTID succeeded, result=%d", result_id); + + int ps_count = get_ps_cache_count(admin); + if (ps_count < 0) { + BAIL_OUT("test_valid_gtid: Failed to query PS cache count"); + } + ok(ps_count == exp_ps_cache_count, "test_valid_gtid: PS cache should have exactly %d entry for query, got %d", exp_ps_cache_count, ps_count); +} + +static void test_prepare_stmt_future_gtid(MYSQL* admin, MYSQL* proxy, const std::string& future_gtid, int exp_ps_cache_count) { + diag("========== Test 2: Future GTID =========="); + + unsigned int stmt_errno = 0; + std::string stmt_errmsg; + int result_id = run_prepared_stmt(proxy, future_gtid, stmt_errno, stmt_errmsg); + + ok(result_id == -1, "test_future_gtid: Execute with future GTID should fail"); + ok(stmt_errno == 9001, "test_future_gtid: Error code should be 9001 (timeout), got %u", stmt_errno); + + bool has_timeout_msg = (stmt_errmsg.find("Max connect timeout") != std::string::npos); + ok(has_timeout_msg, "test_future_gtid: Error message should contain 'Max connect timeout': %s", stmt_errmsg.c_str()); + + int ps_count = get_ps_cache_count(admin); + if (ps_count < 0) { + BAIL_OUT("test_future_gtid: Failed to query PS cache count"); + } + + ok(ps_count == exp_ps_cache_count, "test_future_gtid: PS cache should have exactly %d entry for query, got %d", exp_ps_cache_count, ps_count); +} + +int main(int, char**) { + CommandLine cl; + if (cl.getEnv()) { + diag("Failed to get required environmental variables"); + return -1; + } + + plan(6); + + MYSQL* admin = init_mysql_conn(cl.host, cl.admin_port, cl.admin_username, cl.admin_password); + if (!admin) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(admin)); + return exit_status(); + } + + MYSQL* proxy = init_mysql_conn(cl.host, cl.port, cl.username, cl.password); + if (!proxy) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxy)); + mysql_close(admin); + return exit_status(); + } + + diag("========== Setup =========="); + MYSQL_QUERY_T(proxy, "CREATE DATABASE IF NOT EXISTS test"); + MYSQL_QUERY_T(proxy, "DROP TABLE IF EXISTS test.ps_min_gtid"); + MYSQL_QUERY_T(proxy, "CREATE TABLE test.ps_min_gtid (id INT PRIMARY KEY)"); + MYSQL_QUERY_T(proxy, "INSERT INTO test.ps_min_gtid VALUES (1)"); + + std::string server_uuid; + uint64_t max_trxid = 0; + if (get_backend_gtid_position(admin, cl.mysql_host, cl.mysql_port, server_uuid, max_trxid) != 0) { + mysql_close(proxy); + mysql_close(admin); + BAIL_OUT("No GTID info available from stats.stats_mysql_gtid_executed for backend %s:%d", + cl.mysql_host, cl.mysql_port); + } + + std::string current_gtid = server_uuid + ":" + std::to_string(max_trxid); + std::string future_gtid = server_uuid + ":" + std::to_string(max_trxid + 100000); + + diag("Current GTID: %s", current_gtid.c_str()); + diag("Future GTID: %s", future_gtid.c_str()); + + int ps_cache_count = get_ps_cache_count(admin); + + test_prepare_stmt_valid_gtid(admin, proxy, current_gtid, (ps_cache_count + 1)); + test_prepare_stmt_future_gtid(admin, proxy, future_gtid, (ps_cache_count + 1)); + + diag("========== Teardown =========="); + MYSQL_QUERY_T(proxy, "DROP TABLE IF EXISTS test.ps_min_gtid"); + mysql_close(proxy); + mysql_close(admin); + + return exit_status(); +} diff --git a/test/tap/tests/test_ps_min_gtid_fc-t.cpp b/test/tap/tests/test_ps_min_gtid_fc-t.cpp new file mode 100644 index 000000000..e71df8960 --- /dev/null +++ b/test/tap/tests/test_ps_min_gtid_fc-t.cpp @@ -0,0 +1,258 @@ +/** + * @file test_ps_min_gtid_fc-t.cpp + * @brief Test for min_gtid preservation in prepared statements with first_comment_parsing mode 1 + * + * This test verifies that min_gtid annotations are preserved across STMT_PREPARE and STMT_EXECUTE + * when using mysql-query_processor_first_comment_parsing=1 (parse before rules). + * + * Issue: https://github.com/sysown/proxysql/issues/5384 + * + * Test flow: + * 1. Set first_comment_parsing=1 and install a rewrite rule to strip min_gtid + * 2. Test 1: Prepare/execute with a reachable GTID - should succeed + * 3. Test 2: Prepare/execute with a future GTID - should fail with timeout + * 4. Both tests use the same normalized SQL (rewrite rule strips min_gtid) + */ + +#include +#include +#include +#include +#include +#include "mysql.h" +#include "tap.h" +#include "command_line.h" +#include "utils.h" + +static const int TEST_HG = 59999; +static const char* EXP_PS_CACHE_QUERY = "/*+ */ SELECT id FROM test.ps_min_gtid_fc WHERE id=?"; + +static int setup(MYSQL* admin, MYSQL* proxy, const CommandLine& cl) { + diag("========== Setup =========="); + + MYSQL_QUERY_T(proxy, "CREATE DATABASE IF NOT EXISTS test"); + MYSQL_QUERY_T(proxy, "DROP TABLE IF EXISTS test.ps_min_gtid_fc"); + MYSQL_QUERY_T(proxy, "CREATE TABLE test.ps_min_gtid_fc (id INT PRIMARY KEY)"); + MYSQL_QUERY_T(proxy, "INSERT INTO test.ps_min_gtid_fc VALUES (1)"); + + char server_query[512]; + snprintf(server_query, sizeof(server_query), + "INSERT OR REPLACE INTO mysql_servers (hostgroup_id, hostname, port) VALUES (%d, '%s', %d)", + TEST_HG, cl.mysql_host, cl.mysql_port); + MYSQL_QUERY_T(admin, server_query); + MYSQL_QUERY_T(admin, "LOAD MYSQL SERVERS TO RUNTIME"); + + char rule_query[1024]; + MYSQL_QUERY_T(admin, "DELETE FROM mysql_query_rules WHERE rule_id=42"); + snprintf(rule_query, sizeof(rule_query), + "INSERT INTO mysql_query_rules (rule_id, active, match_pattern, replace_pattern, apply, destination_hostgroup, comment)" + " VALUES (42, 1, ';min_gtid=[:\\-\\w]+', '', 1, %d, 'Strip min_gtid annotation and route to HG %d')", + TEST_HG, TEST_HG); + MYSQL_QUERY_T(admin, rule_query); + MYSQL_QUERY_T(admin, "LOAD MYSQL QUERY RULES TO RUNTIME"); + + MYSQL_QUERY_T(admin, "SET mysql-query_processor_first_comment_parsing = 1"); + MYSQL_QUERY_T(admin, "LOAD MYSQL VARIABLES TO RUNTIME"); + + return 0; +} + +static int cleanup(MYSQL* admin, MYSQL* proxy) { + diag("========== Teardown =========="); + + MYSQL_QUERY_T(admin, "DELETE FROM mysql_query_rules WHERE rule_id=42"); + MYSQL_QUERY_T(admin, "LOAD MYSQL QUERY RULES TO RUNTIME"); + MYSQL_QUERY_T(admin, "SET mysql-query_processor_first_comment_parsing = 2"); + MYSQL_QUERY_T(admin, "LOAD MYSQL VARIABLES TO RUNTIME"); + + char del_query[256]; + snprintf(del_query, sizeof(del_query), + "DELETE FROM mysql_servers WHERE hostgroup_id=%d", TEST_HG); + MYSQL_QUERY_T(admin, del_query); + MYSQL_QUERY_T(admin, "LOAD MYSQL SERVERS TO RUNTIME"); + + MYSQL_QUERY_T(proxy, "DROP TABLE IF EXISTS test.ps_min_gtid_fc"); + + return 0; +} + +static int get_ps_cache_count(MYSQL* admin) { + std::string query = std::string("SELECT COUNT(*) FROM stats.stats_mysql_prepared_statements_info") + + " WHERE query='" + EXP_PS_CACHE_QUERY + "'"; + + ext_val_t result = mysql_query_ext_val(admin, query, int32_t(0)); + if (result.err != 0) { + diag("Failed to query PS cache count: err=%d, val=%d", result.err, result.val); + return -1; + } + return result.val; +} + +static int run_prepared_stmt( + MYSQL* proxy, + const std::string& gtid, + unsigned int& stmt_errno, + std::string& stmt_errmsg +) { + std::string query = "/*+ ;min_gtid=" + gtid + " */ SELECT id FROM test.ps_min_gtid_fc WHERE id=?"; + + MYSQL_STMT* stmt = mysql_stmt_init(proxy); + if (!stmt) { + stmt_errno = mysql_errno(proxy); + stmt_errmsg = mysql_error(proxy) ? mysql_error(proxy) : "(null)"; + diag("mysql_stmt_init() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str()); + return -1; + } + + if (mysql_stmt_prepare(stmt, query.c_str(), query.length()) != 0) { + stmt_errno = mysql_stmt_errno(stmt); + stmt_errmsg = mysql_stmt_error(stmt) ? mysql_stmt_error(stmt) : "(null)"; + diag("mysql_stmt_prepare() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str()); + mysql_stmt_close(stmt); + return -1; + } + + diag("Prepared statement_id: %lu", stmt->stmt_id); + + int id_val = 1; + unsigned long len = 0; + my_bool is_null = 0; + + MYSQL_BIND bind_param; + memset(&bind_param, 0, sizeof(bind_param)); + bind_param.buffer_type = MYSQL_TYPE_LONG; + bind_param.buffer = &id_val; + bind_param.buffer_length = sizeof(id_val); + bind_param.length = &len; + bind_param.is_null = &is_null; + + if (mysql_stmt_bind_param(stmt, &bind_param) != 0) { + stmt_errno = mysql_stmt_errno(stmt); + stmt_errmsg = mysql_stmt_error(stmt) ? mysql_stmt_error(stmt) : "(null)"; + diag("mysql_stmt_bind_param() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str()); + mysql_stmt_close(stmt); + return -1; + } + + if (mysql_stmt_execute(stmt) != 0) { + stmt_errno = mysql_stmt_errno(stmt); + stmt_errmsg = mysql_stmt_error(stmt) ? mysql_stmt_error(stmt) : "(null)"; + diag("mysql_stmt_execute() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str()); + mysql_stmt_close(stmt); + return -1; + } + + int result_id = 0; + MYSQL_BIND bind_result; + memset(&bind_result, 0, sizeof(bind_result)); + bind_result.buffer_type = MYSQL_TYPE_LONG; + bind_result.buffer = &result_id; + bind_result.buffer_length = sizeof(result_id); + + if (mysql_stmt_bind_result(stmt, &bind_result) != 0) { + stmt_errno = mysql_stmt_errno(stmt); + stmt_errmsg = mysql_stmt_error(stmt) ? mysql_stmt_error(stmt) : "(null)"; + diag("mysql_stmt_bind_result() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str()); + mysql_stmt_close(stmt); + return -1; + } + + if (mysql_stmt_fetch(stmt) != 0) { + stmt_errno = mysql_stmt_errno(stmt); + stmt_errmsg = mysql_stmt_error(stmt) ? mysql_stmt_error(stmt) : "(null)"; + diag("mysql_stmt_fetch() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str()); + mysql_stmt_free_result(stmt); + mysql_stmt_close(stmt); + return -1; + } + + mysql_stmt_free_result(stmt); + mysql_stmt_close(stmt); + + return result_id; +} + +static void test_prepare_stmt_valid_gtid(MYSQL* admin, MYSQL* proxy, const std::string& current_gtid) { + diag("========== Test 1: Valid GTID =========="); + + unsigned int stmt_errno = 0; + std::string stmt_errmsg; + int result_id = run_prepared_stmt(proxy, current_gtid, stmt_errno, stmt_errmsg); + + ok(result_id == 1, "test_valid_gtid: Execute with valid GTID succeeded, result=%d", result_id); + + int ps_count = get_ps_cache_count(admin); + if (ps_count < 0) { + BAIL_OUT("test_valid_gtid: Failed to query PS cache count"); + } + ok(ps_count == 1, "test_valid_gtid: PS cache should have exactly 1 entry for query, got %d", ps_count); +} + +static void test_prepare_stmt_future_gtid(MYSQL* admin, MYSQL* proxy, const std::string& future_gtid) { + diag("========== Test 2: Future GTID =========="); + + unsigned int stmt_errno = 0; + std::string stmt_errmsg; + int result_id = run_prepared_stmt(proxy, future_gtid, stmt_errno, stmt_errmsg); + + ok(result_id == -1, "test_future_gtid: Execute with future GTID should fail"); + ok(stmt_errno == 9001, "test_future_gtid: Error code should be 9001 (timeout), got %u", stmt_errno); + + bool has_timeout_msg = (stmt_errmsg.find("Max connect timeout") != std::string::npos); + ok(has_timeout_msg, "test_future_gtid: Error message should contain 'Max connect timeout': %s", stmt_errmsg.c_str()); + + int ps_count = get_ps_cache_count(admin); + if (ps_count < 0) { + BAIL_OUT("test_future_gtid: Failed to query PS cache count"); + } + ok(ps_count == 1, "test_future_gtid: PS cache should still have exactly 1 entry for query, got %d", ps_count); +} + +int main(int, char**) { + CommandLine cl; + if (cl.getEnv()) { + diag("Failed to get required environmental variables"); + return -1; + } + + plan(6); + + MYSQL* admin = init_mysql_conn(cl.host, cl.admin_port, cl.admin_username, cl.admin_password); + if (!admin) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(admin)); + return exit_status(); + } + + MYSQL* proxy = init_mysql_conn(cl.host, cl.port, cl.username, cl.password); + if (!proxy) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxy)); + mysql_close(admin); + return exit_status(); + } + + setup(admin, proxy, cl); + + std::string server_uuid; + uint64_t max_trxid = 0; + if (get_backend_gtid_position(admin, cl.mysql_host, cl.mysql_port, server_uuid, max_trxid) != 0) { + mysql_close(proxy); + mysql_close(admin); + BAIL_OUT("No GTID info available from stats.stats_mysql_gtid_executed for backend %s:%d", + cl.mysql_host, cl.mysql_port); + } + + std::string current_gtid = server_uuid + ":" + std::to_string(max_trxid); + std::string future_gtid = server_uuid + ":" + std::to_string(max_trxid + 100000); + + diag("Current GTID: %s", current_gtid.c_str()); + diag("Future GTID: %s", future_gtid.c_str()); + test_prepare_stmt_valid_gtid(admin, proxy, current_gtid); + test_prepare_stmt_future_gtid(admin, proxy, future_gtid); + + cleanup(admin, proxy); + + mysql_close(proxy); + mysql_close(admin); + + return exit_status(); +}