Merge pull request #5581 from sysown/fix-fc-parsing

Preserve prepared statement `min_gtid` in `first_comment_parsing` mode
feature/gtid-range-update
René Cannaò 1 month ago committed by GitHub
commit 6cab7bd13b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

3
.gitignore vendored

@ -10,6 +10,9 @@ lint/
# direnv
.envrc.local
# bear/clangd
compile_commands.json
compile_commands.events.json
# Intelij
.idea/

Binary file not shown.

@ -186,6 +186,8 @@ class MySQL_STMTs_local_v14 {
public:
// this map associate client_stmt_id to global_stmt_id : this is used only for client connections
std::map<uint32_t, uint64_t> client_stmt_to_global_ids;
// this map associates client_stmt_id to prepare-time min_gtid annotations
std::map<uint32_t, std::string> client_stmt_to_min_gtid;
// this multimap associate global_stmt_id to client_stmt_id : this is used only for client connections
std::multimap<uint64_t, uint32_t> global_stmt_to_client_ids;
@ -202,6 +204,7 @@ class MySQL_STMTs_local_v14 {
sess = NULL;
is_client_ = _ic;
client_stmt_to_global_ids = std::map<uint32_t, uint64_t>();
client_stmt_to_min_gtid = std::map<uint32_t, std::string>();
global_stmt_to_client_ids = std::multimap<uint64_t, uint32_t>();
backend_stmt_to_global_ids = std::map<uint32_t, uint64_t>();
global_stmt_to_backend_ids = std::map<uint64_t, uint32_t>();
@ -221,6 +224,9 @@ class MySQL_STMTs_local_v14 {
unsigned int get_num_backend_stmts() { return backend_stmt_to_global_ids.size(); }
uint32_t generate_new_client_stmt_id(uint64_t global_statement_id);
uint64_t find_global_stmt_id_from_client(uint32_t client_stmt_id);
void set_client_min_gtid(uint32_t client_stmt_id, const char* min_gtid);
const char* find_client_min_gtid(uint32_t client_stmt_id) const;
void erase_client_min_gtid(uint32_t client_stmt_id);
bool client_close(uint32_t client_statement_id);
MYSQL_STMT * find_backend_stmt_by_global_id(uint32_t global_statement_id) {
auto s=global_stmt_to_backend_stmt.find(global_statement_id);

@ -794,10 +794,31 @@ uint64_t MySQL_STMTs_local_v14::find_global_stmt_id_from_client(uint32_t client_
return ret;
}
void MySQL_STMTs_local_v14::set_client_min_gtid(uint32_t client_stmt_id, const char* min_gtid) {
if (min_gtid && min_gtid[0] != '\0') {
client_stmt_to_min_gtid[client_stmt_id] = min_gtid;
} else {
client_stmt_to_min_gtid.erase(client_stmt_id);
}
}
const char* MySQL_STMTs_local_v14::find_client_min_gtid(uint32_t client_stmt_id) const {
auto s = client_stmt_to_min_gtid.find(client_stmt_id);
if (s != client_stmt_to_min_gtid.end()) {
return s->second.c_str();
}
return NULL;
}
void MySQL_STMTs_local_v14::erase_client_min_gtid(uint32_t client_stmt_id) {
client_stmt_to_min_gtid.erase(client_stmt_id);
}
bool MySQL_STMTs_local_v14::client_close(uint32_t client_statement_id) {
auto s = client_stmt_to_global_ids.find(client_statement_id);
if (s != client_stmt_to_global_ids.end()) { // found
uint64_t global_stmt_id = s->second;
erase_client_min_gtid(client_statement_id);
client_stmt_to_global_ids.erase(s);
GloMyStmt->ref_count_client(global_stmt_id, -1);
//auto s2 = global_stmt_to_client_ids.find(global_stmt_id);

@ -3379,6 +3379,16 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
// we will now generate a unique stmt and send it to the client
uint32_t new_stmt_id=client_myds->myconn->local_stmts->generate_new_client_stmt_id(stmt_info->statement_id);
CurrentQuery.stmt_client_id=new_stmt_id;
// When first_comment_parsing is set to 1 (before query rules) or 3 (before_and_after query rules),
// query rules may strip the min_gtid annotation during STMT_PREPARE. Persist it per client
// statement ID so that STMT_EXECUTE can restore the original routing constraint.
// For more context, refer to https://github.com/sysown/proxysql/issues/5384
int first_comment_parsing = mysql_thread___query_processor_first_comment_parsing;
if (first_comment_parsing == 1 || first_comment_parsing == 3) {
client_myds->myconn->local_stmts->set_client_min_gtid(new_stmt_id, qpo->min_gtid);
}
client_myds->setDSS_STATE_QUERY_SENT_NET();
client_myds->myprot.generate_STMT_PREPARE_RESPONSE(client_myds->pkt_sid+1,stmt_info,new_stmt_id);
LogQuery(NULL);
@ -3494,10 +3504,22 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
// but as we reached here, stmt_meta is not null and we save the metadata
sess_STMTs_meta->insert(stmt_global_id,stmt_meta);
}
// else
CurrentQuery.stmt_meta=stmt_meta;
//current_hostgroup=qpo->destination_hostgroup;
// When first_comment_parsing is set to 1 (before query rules) or 3 (before_and_after query rules),
// query rules may strip the min_gtid annotation during STMT_PREPARE. So we persist it per client
// statement ID and restore it during STMT_EXECUTE.
// For more context, refer to https://github.com/sysown/proxysql/issues/5384
int first_comment_parsing = mysql_thread___query_processor_first_comment_parsing;
if (!qpo->min_gtid && (first_comment_parsing == 1 || first_comment_parsing == 3)) {
const char *saved_min_gtid = client_myds->myconn->local_stmts->find_client_min_gtid(client_stmt_id);
if (saved_min_gtid) {
qpo->min_gtid = strdup(saved_min_gtid);
}
}
rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt, &lock_hostgroup, ps_type_execute_stmt);
if (rc_break==true) {
return;
@ -5621,6 +5643,16 @@ bool MySQL_Session::handler_rc0_PROCESSING_STMT_PREPARE(enum session_status& st,
CurrentQuery.stmt_client_id=client_stmtid;
}
CurrentQuery.mysql_stmt=NULL;
// When first_comment_parsing is set to 1 (before query rules) or 3 (before_and_after query rules),
// query rules may strip the min_gtid annotation during STMT_PREPARE. Persist it per client
// statement ID so that STMT_EXECUTE can restore the original routing constraint.
// For more context, refer to https://github.com/sysown/proxysql/issues/5384
int first_comment_parsing = mysql_thread___query_processor_first_comment_parsing;
if (previous_status.size() == 0 && (first_comment_parsing == 1 || first_comment_parsing == 3)) {
client_myds->myconn->local_stmts->set_client_min_gtid(client_stmtid, qpo->min_gtid);
}
st=status;
size_t sts=previous_status.size();
if (sts) {

@ -383,5 +383,7 @@
"transaction_state_unit-t" : [ "unit-tests-g1" ],
"unit-strip_schema_from_query-t" : [ "unit-tests-g1" ],
"vector_db_performance-t" : [ "ai-g1","@proxysql_min_version:4.0" ],
"vector_features-t" : [ "ai-g1","@proxysql_min_version:4.0" ]
"vector_features-t" : [ "ai-g1","@proxysql_min_version:4.0" ],
"test_ps_min_gtid_fc-t" : [ "legacy-binlog-g1" ],
"test_ps_min_gtid-t" : [ "legacy-binlog-g1" ]
}

@ -2565,3 +2565,125 @@ void spawn_noise(const CommandLine& cl, const std::string& tool_path, const std:
fprintf(stderr, "Failed to fork for noise tool: %s\n", tool_path.c_str());
}
}
int get_backend_gtid_position(
MYSQL* admin,
const std::string& backend_host,
uint32_t backend_port,
std::string& server_uuid,
uint64_t& max_trxid
) {
auto trim = [](const std::string& s) -> std::string {
size_t start = s.find_first_not_of(" \t\n\r");
if (start == std::string::npos) return "";
size_t end = s.find_last_not_of(" \t\n\r");
return s.substr(start, end - start + 1);
};
auto strip_dashes = [](const std::string& uuid) -> std::string {
std::string result;
result.reserve(uuid.size());
for (char c : uuid) {
if (c != '-') result.push_back(c);
}
return result;
};
auto parse_interval_end = [](const std::string& token, uint64_t& interval_end) -> bool {
size_t dash_pos = token.find('-');
if (dash_pos == std::string::npos) {
interval_end = std::stoull(token);
return true;
}
std::string to_str = token.substr(dash_pos + 1);
if (to_str.empty()) return false;
interval_end = std::stoull(to_str);
return true;
};
// A GTID executed set may contain multiple comma-separated GTID entries,
// we parse only the first GTID entry from the set.
auto parse_first_gtid = [&](const std::string& gtid_executed_raw) -> bool {
std::string gtid_executed = trim(gtid_executed_raw);
if (gtid_executed.empty()) return false;
size_t comma_pos = gtid_executed.find(',');
std::string group = trim(
(comma_pos == std::string::npos) ? gtid_executed : gtid_executed.substr(0, comma_pos)
);
if (group.empty()) return false;
size_t colon_pos = group.find(':');
if (colon_pos == std::string::npos || colon_pos == 0 || colon_pos == group.size() - 1) {
return false;
}
server_uuid = strip_dashes(group.substr(0, colon_pos));
if (server_uuid.empty()) return false;
std::string intervals_str = group.substr(colon_pos + 1);
max_trxid = 0;
bool found = false;
size_t ipos = 0;
while (ipos < intervals_str.size()) {
size_t next_colon = intervals_str.find(':', ipos);
std::string token = trim(
(next_colon == std::string::npos) ? intervals_str.substr(ipos) : intervals_str.substr(ipos, next_colon - ipos)
);
if (!token.empty()) {
uint64_t interval_end = 0;
if (!parse_interval_end(token, interval_end)) {
return false;
}
if (interval_end > max_trxid) {
max_trxid = interval_end;
}
found = true;
}
if (next_colon == std::string::npos) {
break;
}
ipos = next_colon + 1;
}
return found;
};
std::string gtid_query = "SELECT gtid_executed FROM stats.stats_mysql_gtid_executed"
" WHERE hostname='" + backend_host + "' AND port=" + std::to_string(backend_port) +
" AND gtid_executed IS NOT NULL AND gtid_executed != ''";
int rc = mysql_query(admin, gtid_query.c_str());
if (rc != 0) {
diag("Failed to query gtid_executed from stats: %s", mysql_error(admin));
return -1;
}
MYSQL_RES* res = mysql_store_result(admin);
MYSQL_ROW row = nullptr;
if (!res) {
diag("Failed to store result for gtid_executed query");
return -1;
}
row = mysql_fetch_row(res);
if (!row || !row[0]) {
mysql_free_result(res);
diag("No GTID info for backend %s:%d", backend_host.c_str(), backend_port);
return -1;
}
std::string gtid_executed = row[0];
mysql_free_result(res);
if (!parse_first_gtid(gtid_executed)) {
diag("Failed to parse GTID entry from gtid_executed: %s", gtid_executed.c_str());
return -1;
}
return 0;
}

@ -981,6 +981,23 @@ using pool_state_t = std::map<uint32_t,mysql_row_t>;
*/
std::pair<int,pool_state_t> fetch_conn_stats(MYSQL* admin, const std::vector<uint32_t> hgs);
/**
* @brief Fetches GTID info for a backend from the gtid_executed set.
* @param admin An already opened connection to ProxySQL admin interface.
* @param backend_host The hostname of the backend server.
* @param backend_port The port of the backend server.
* @param server_uuid Output: the UUID of the first GTID entry in gtid_executed set, with dashes stripped.
* @param max_trxid Output: the maximum transaction ID found in the first GTID entry.
* @return 0 on success, -1 on failure (query error, missing UUID, parse error).
*/
int get_backend_gtid_position(
MYSQL* admin,
const std::string& backend_host,
uint32_t backend_port,
std::string& server_uuid,
uint64_t& max_trxid
);
/**
* @brief Waits for a generic condition.
* @details Wait finishes by a non-zero return code by the condition or by timeout.

@ -0,0 +1,212 @@
/**
* @file test_ps_min_gtid-t.cpp
* @brief Test min_gtid based routing for prepared statements
*
* This test verifies that min_gtid annotation is handled properly
* during STMT_PREPARE and STMT_EXECUTE
*
* Test flow:
* 1. Test 1: Prepare/execute with a reachable GTID - should succeed
* 2. Test 2: Prepare/execute with a future GTID - should fail with timeout
*/
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <cstdint>
#include <string>
#include "mysql.h"
#include "tap.h"
#include "command_line.h"
#include "utils.h"
static const char* EXPECTED_PS_QUERY = "SELECT id FROM test.ps_min_gtid WHERE id=?";
static int get_ps_cache_count(MYSQL* admin) {
std::string query = std::string("SELECT COUNT(*) FROM stats.stats_mysql_prepared_statements_info")
+ " WHERE query like '%" + EXPECTED_PS_QUERY + "'";
ext_val_t<int32_t> result = mysql_query_ext_val(admin, query, int32_t(0));
if (result.err != 0) {
diag("Failed to query PS cache count: err=%d, val=%d", result.err, result.val);
return -1;
}
return result.val;
}
static int run_prepared_stmt(
MYSQL* proxy,
const std::string& gtid,
unsigned int& stmt_errno,
std::string& stmt_errmsg
) {
std::string query = "/*+ ;min_gtid=" + gtid + " */ SELECT id FROM test.ps_min_gtid WHERE id=?";
MYSQL_STMT* stmt = mysql_stmt_init(proxy);
if (!stmt) {
stmt_errno = mysql_errno(proxy);
stmt_errmsg = mysql_error(proxy) ? mysql_error(proxy) : "(null)";
diag("mysql_stmt_init() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str());
return -1;
}
if (mysql_stmt_prepare(stmt, query.c_str(), query.length()) != 0) {
stmt_errno = mysql_stmt_errno(stmt);
stmt_errmsg = mysql_stmt_error(stmt) ? mysql_stmt_error(stmt) : "(null)";
diag("mysql_stmt_prepare() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str());
mysql_stmt_close(stmt);
return -1;
}
diag("Prepared statement_id: %lu", stmt->stmt_id);
int id_val = 1;
unsigned long len = 0;
my_bool is_null = 0;
MYSQL_BIND bind_param;
memset(&bind_param, 0, sizeof(bind_param));
bind_param.buffer_type = MYSQL_TYPE_LONG;
bind_param.buffer = &id_val;
bind_param.buffer_length = sizeof(id_val);
bind_param.length = &len;
bind_param.is_null = &is_null;
if (mysql_stmt_bind_param(stmt, &bind_param) != 0) {
stmt_errno = mysql_stmt_errno(stmt);
stmt_errmsg = mysql_stmt_error(stmt) ? mysql_stmt_error(stmt) : "(null)";
diag("mysql_stmt_bind_param() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str());
mysql_stmt_close(stmt);
return -1;
}
if (mysql_stmt_execute(stmt) != 0) {
stmt_errno = mysql_stmt_errno(stmt);
stmt_errmsg = mysql_stmt_error(stmt) ? mysql_stmt_error(stmt) : "(null)";
diag("mysql_stmt_execute() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str());
mysql_stmt_close(stmt);
return -1;
}
int result_id = 0;
MYSQL_BIND bind_result;
memset(&bind_result, 0, sizeof(bind_result));
bind_result.buffer_type = MYSQL_TYPE_LONG;
bind_result.buffer = &result_id;
bind_result.buffer_length = sizeof(result_id);
if (mysql_stmt_bind_result(stmt, &bind_result) != 0) {
stmt_errno = mysql_stmt_errno(stmt);
stmt_errmsg = mysql_stmt_error(stmt) ? mysql_stmt_error(stmt) : "(null)";
diag("mysql_stmt_bind_result() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str());
mysql_stmt_close(stmt);
return -1;
}
if (mysql_stmt_fetch(stmt) != 0) {
stmt_errno = mysql_stmt_errno(stmt);
stmt_errmsg = mysql_stmt_error(stmt) ? mysql_stmt_error(stmt) : "(null)";
diag("mysql_stmt_fetch() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str());
mysql_stmt_free_result(stmt);
mysql_stmt_close(stmt);
return -1;
}
mysql_stmt_free_result(stmt);
mysql_stmt_close(stmt);
return result_id;
}
static void test_prepare_stmt_valid_gtid(MYSQL* admin, MYSQL* proxy, const std::string& current_gtid, int exp_ps_cache_count) {
diag("========== Test 1: Valid GTID ==========");
unsigned int stmt_errno = 0;
std::string stmt_errmsg;
int result_id = run_prepared_stmt(proxy, current_gtid, stmt_errno, stmt_errmsg);
ok(result_id == 1, "test_valid_gtid: Execute with valid GTID succeeded, result=%d", result_id);
int ps_count = get_ps_cache_count(admin);
if (ps_count < 0) {
BAIL_OUT("test_valid_gtid: Failed to query PS cache count");
}
ok(ps_count == exp_ps_cache_count, "test_valid_gtid: PS cache should have exactly %d entry for query, got %d", exp_ps_cache_count, ps_count);
}
static void test_prepare_stmt_future_gtid(MYSQL* admin, MYSQL* proxy, const std::string& future_gtid, int exp_ps_cache_count) {
diag("========== Test 2: Future GTID ==========");
unsigned int stmt_errno = 0;
std::string stmt_errmsg;
int result_id = run_prepared_stmt(proxy, future_gtid, stmt_errno, stmt_errmsg);
ok(result_id == -1, "test_future_gtid: Execute with future GTID should fail");
ok(stmt_errno == 9001, "test_future_gtid: Error code should be 9001 (timeout), got %u", stmt_errno);
bool has_timeout_msg = (stmt_errmsg.find("Max connect timeout") != std::string::npos);
ok(has_timeout_msg, "test_future_gtid: Error message should contain 'Max connect timeout': %s", stmt_errmsg.c_str());
int ps_count = get_ps_cache_count(admin);
if (ps_count < 0) {
BAIL_OUT("test_future_gtid: Failed to query PS cache count");
}
ok(ps_count == exp_ps_cache_count, "test_future_gtid: PS cache should have exactly %d entry for query, got %d", exp_ps_cache_count, ps_count);
}
int main(int, char**) {
CommandLine cl;
if (cl.getEnv()) {
diag("Failed to get required environmental variables");
return -1;
}
plan(6);
MYSQL* admin = init_mysql_conn(cl.host, cl.admin_port, cl.admin_username, cl.admin_password);
if (!admin) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(admin));
return exit_status();
}
MYSQL* proxy = init_mysql_conn(cl.host, cl.port, cl.username, cl.password);
if (!proxy) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxy));
mysql_close(admin);
return exit_status();
}
diag("========== Setup ==========");
MYSQL_QUERY_T(proxy, "CREATE DATABASE IF NOT EXISTS test");
MYSQL_QUERY_T(proxy, "DROP TABLE IF EXISTS test.ps_min_gtid");
MYSQL_QUERY_T(proxy, "CREATE TABLE test.ps_min_gtid (id INT PRIMARY KEY)");
MYSQL_QUERY_T(proxy, "INSERT INTO test.ps_min_gtid VALUES (1)");
std::string server_uuid;
uint64_t max_trxid = 0;
if (get_backend_gtid_position(admin, cl.mysql_host, cl.mysql_port, server_uuid, max_trxid) != 0) {
mysql_close(proxy);
mysql_close(admin);
BAIL_OUT("No GTID info available from stats.stats_mysql_gtid_executed for backend %s:%d",
cl.mysql_host, cl.mysql_port);
}
std::string current_gtid = server_uuid + ":" + std::to_string(max_trxid);
std::string future_gtid = server_uuid + ":" + std::to_string(max_trxid + 100000);
diag("Current GTID: %s", current_gtid.c_str());
diag("Future GTID: %s", future_gtid.c_str());
int ps_cache_count = get_ps_cache_count(admin);
test_prepare_stmt_valid_gtid(admin, proxy, current_gtid, (ps_cache_count + 1));
test_prepare_stmt_future_gtid(admin, proxy, future_gtid, (ps_cache_count + 1));
diag("========== Teardown ==========");
MYSQL_QUERY_T(proxy, "DROP TABLE IF EXISTS test.ps_min_gtid");
mysql_close(proxy);
mysql_close(admin);
return exit_status();
}

@ -0,0 +1,258 @@
/**
* @file test_ps_min_gtid_fc-t.cpp
* @brief Test for min_gtid preservation in prepared statements with first_comment_parsing mode 1
*
* This test verifies that min_gtid annotations are preserved across STMT_PREPARE and STMT_EXECUTE
* when using mysql-query_processor_first_comment_parsing=1 (parse before rules).
*
* Issue: https://github.com/sysown/proxysql/issues/5384
*
* Test flow:
* 1. Set first_comment_parsing=1 and install a rewrite rule to strip min_gtid
* 2. Test 1: Prepare/execute with a reachable GTID - should succeed
* 3. Test 2: Prepare/execute with a future GTID - should fail with timeout
* 4. Both tests use the same normalized SQL (rewrite rule strips min_gtid)
*/
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <cstdint>
#include <string>
#include "mysql.h"
#include "tap.h"
#include "command_line.h"
#include "utils.h"
static const int TEST_HG = 59999;
static const char* EXP_PS_CACHE_QUERY = "/*+ */ SELECT id FROM test.ps_min_gtid_fc WHERE id=?";
static int setup(MYSQL* admin, MYSQL* proxy, const CommandLine& cl) {
diag("========== Setup ==========");
MYSQL_QUERY_T(proxy, "CREATE DATABASE IF NOT EXISTS test");
MYSQL_QUERY_T(proxy, "DROP TABLE IF EXISTS test.ps_min_gtid_fc");
MYSQL_QUERY_T(proxy, "CREATE TABLE test.ps_min_gtid_fc (id INT PRIMARY KEY)");
MYSQL_QUERY_T(proxy, "INSERT INTO test.ps_min_gtid_fc VALUES (1)");
char server_query[512];
snprintf(server_query, sizeof(server_query),
"INSERT OR REPLACE INTO mysql_servers (hostgroup_id, hostname, port) VALUES (%d, '%s', %d)",
TEST_HG, cl.mysql_host, cl.mysql_port);
MYSQL_QUERY_T(admin, server_query);
MYSQL_QUERY_T(admin, "LOAD MYSQL SERVERS TO RUNTIME");
char rule_query[1024];
MYSQL_QUERY_T(admin, "DELETE FROM mysql_query_rules WHERE rule_id=42");
snprintf(rule_query, sizeof(rule_query),
"INSERT INTO mysql_query_rules (rule_id, active, match_pattern, replace_pattern, apply, destination_hostgroup, comment)"
" VALUES (42, 1, ';min_gtid=[:\\-\\w]+', '', 1, %d, 'Strip min_gtid annotation and route to HG %d')",
TEST_HG, TEST_HG);
MYSQL_QUERY_T(admin, rule_query);
MYSQL_QUERY_T(admin, "LOAD MYSQL QUERY RULES TO RUNTIME");
MYSQL_QUERY_T(admin, "SET mysql-query_processor_first_comment_parsing = 1");
MYSQL_QUERY_T(admin, "LOAD MYSQL VARIABLES TO RUNTIME");
return 0;
}
static int cleanup(MYSQL* admin, MYSQL* proxy) {
diag("========== Teardown ==========");
MYSQL_QUERY_T(admin, "DELETE FROM mysql_query_rules WHERE rule_id=42");
MYSQL_QUERY_T(admin, "LOAD MYSQL QUERY RULES TO RUNTIME");
MYSQL_QUERY_T(admin, "SET mysql-query_processor_first_comment_parsing = 2");
MYSQL_QUERY_T(admin, "LOAD MYSQL VARIABLES TO RUNTIME");
char del_query[256];
snprintf(del_query, sizeof(del_query),
"DELETE FROM mysql_servers WHERE hostgroup_id=%d", TEST_HG);
MYSQL_QUERY_T(admin, del_query);
MYSQL_QUERY_T(admin, "LOAD MYSQL SERVERS TO RUNTIME");
MYSQL_QUERY_T(proxy, "DROP TABLE IF EXISTS test.ps_min_gtid_fc");
return 0;
}
static int get_ps_cache_count(MYSQL* admin) {
std::string query = std::string("SELECT COUNT(*) FROM stats.stats_mysql_prepared_statements_info")
+ " WHERE query='" + EXP_PS_CACHE_QUERY + "'";
ext_val_t<int32_t> result = mysql_query_ext_val(admin, query, int32_t(0));
if (result.err != 0) {
diag("Failed to query PS cache count: err=%d, val=%d", result.err, result.val);
return -1;
}
return result.val;
}
static int run_prepared_stmt(
MYSQL* proxy,
const std::string& gtid,
unsigned int& stmt_errno,
std::string& stmt_errmsg
) {
std::string query = "/*+ ;min_gtid=" + gtid + " */ SELECT id FROM test.ps_min_gtid_fc WHERE id=?";
MYSQL_STMT* stmt = mysql_stmt_init(proxy);
if (!stmt) {
stmt_errno = mysql_errno(proxy);
stmt_errmsg = mysql_error(proxy) ? mysql_error(proxy) : "(null)";
diag("mysql_stmt_init() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str());
return -1;
}
if (mysql_stmt_prepare(stmt, query.c_str(), query.length()) != 0) {
stmt_errno = mysql_stmt_errno(stmt);
stmt_errmsg = mysql_stmt_error(stmt) ? mysql_stmt_error(stmt) : "(null)";
diag("mysql_stmt_prepare() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str());
mysql_stmt_close(stmt);
return -1;
}
diag("Prepared statement_id: %lu", stmt->stmt_id);
int id_val = 1;
unsigned long len = 0;
my_bool is_null = 0;
MYSQL_BIND bind_param;
memset(&bind_param, 0, sizeof(bind_param));
bind_param.buffer_type = MYSQL_TYPE_LONG;
bind_param.buffer = &id_val;
bind_param.buffer_length = sizeof(id_val);
bind_param.length = &len;
bind_param.is_null = &is_null;
if (mysql_stmt_bind_param(stmt, &bind_param) != 0) {
stmt_errno = mysql_stmt_errno(stmt);
stmt_errmsg = mysql_stmt_error(stmt) ? mysql_stmt_error(stmt) : "(null)";
diag("mysql_stmt_bind_param() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str());
mysql_stmt_close(stmt);
return -1;
}
if (mysql_stmt_execute(stmt) != 0) {
stmt_errno = mysql_stmt_errno(stmt);
stmt_errmsg = mysql_stmt_error(stmt) ? mysql_stmt_error(stmt) : "(null)";
diag("mysql_stmt_execute() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str());
mysql_stmt_close(stmt);
return -1;
}
int result_id = 0;
MYSQL_BIND bind_result;
memset(&bind_result, 0, sizeof(bind_result));
bind_result.buffer_type = MYSQL_TYPE_LONG;
bind_result.buffer = &result_id;
bind_result.buffer_length = sizeof(result_id);
if (mysql_stmt_bind_result(stmt, &bind_result) != 0) {
stmt_errno = mysql_stmt_errno(stmt);
stmt_errmsg = mysql_stmt_error(stmt) ? mysql_stmt_error(stmt) : "(null)";
diag("mysql_stmt_bind_result() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str());
mysql_stmt_close(stmt);
return -1;
}
if (mysql_stmt_fetch(stmt) != 0) {
stmt_errno = mysql_stmt_errno(stmt);
stmt_errmsg = mysql_stmt_error(stmt) ? mysql_stmt_error(stmt) : "(null)";
diag("mysql_stmt_fetch() failed: errno=%u, error=%s", stmt_errno, stmt_errmsg.c_str());
mysql_stmt_free_result(stmt);
mysql_stmt_close(stmt);
return -1;
}
mysql_stmt_free_result(stmt);
mysql_stmt_close(stmt);
return result_id;
}
static void test_prepare_stmt_valid_gtid(MYSQL* admin, MYSQL* proxy, const std::string& current_gtid) {
diag("========== Test 1: Valid GTID ==========");
unsigned int stmt_errno = 0;
std::string stmt_errmsg;
int result_id = run_prepared_stmt(proxy, current_gtid, stmt_errno, stmt_errmsg);
ok(result_id == 1, "test_valid_gtid: Execute with valid GTID succeeded, result=%d", result_id);
int ps_count = get_ps_cache_count(admin);
if (ps_count < 0) {
BAIL_OUT("test_valid_gtid: Failed to query PS cache count");
}
ok(ps_count == 1, "test_valid_gtid: PS cache should have exactly 1 entry for query, got %d", ps_count);
}
static void test_prepare_stmt_future_gtid(MYSQL* admin, MYSQL* proxy, const std::string& future_gtid) {
diag("========== Test 2: Future GTID ==========");
unsigned int stmt_errno = 0;
std::string stmt_errmsg;
int result_id = run_prepared_stmt(proxy, future_gtid, stmt_errno, stmt_errmsg);
ok(result_id == -1, "test_future_gtid: Execute with future GTID should fail");
ok(stmt_errno == 9001, "test_future_gtid: Error code should be 9001 (timeout), got %u", stmt_errno);
bool has_timeout_msg = (stmt_errmsg.find("Max connect timeout") != std::string::npos);
ok(has_timeout_msg, "test_future_gtid: Error message should contain 'Max connect timeout': %s", stmt_errmsg.c_str());
int ps_count = get_ps_cache_count(admin);
if (ps_count < 0) {
BAIL_OUT("test_future_gtid: Failed to query PS cache count");
}
ok(ps_count == 1, "test_future_gtid: PS cache should still have exactly 1 entry for query, got %d", ps_count);
}
int main(int, char**) {
CommandLine cl;
if (cl.getEnv()) {
diag("Failed to get required environmental variables");
return -1;
}
plan(6);
MYSQL* admin = init_mysql_conn(cl.host, cl.admin_port, cl.admin_username, cl.admin_password);
if (!admin) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(admin));
return exit_status();
}
MYSQL* proxy = init_mysql_conn(cl.host, cl.port, cl.username, cl.password);
if (!proxy) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxy));
mysql_close(admin);
return exit_status();
}
setup(admin, proxy, cl);
std::string server_uuid;
uint64_t max_trxid = 0;
if (get_backend_gtid_position(admin, cl.mysql_host, cl.mysql_port, server_uuid, max_trxid) != 0) {
mysql_close(proxy);
mysql_close(admin);
BAIL_OUT("No GTID info available from stats.stats_mysql_gtid_executed for backend %s:%d",
cl.mysql_host, cl.mysql_port);
}
std::string current_gtid = server_uuid + ":" + std::to_string(max_trxid);
std::string future_gtid = server_uuid + ":" + std::to_string(max_trxid + 100000);
diag("Current GTID: %s", current_gtid.c_str());
diag("Future GTID: %s", future_gtid.c_str());
test_prepare_stmt_valid_gtid(admin, proxy, current_gtid);
test_prepare_stmt_future_gtid(admin, proxy, future_gtid);
cleanup(admin, proxy);
mysql_close(proxy);
mysql_close(admin);
return exit_status();
}
Loading…
Cancel
Save