From 93eabfc7bc265a274a2bf9ffe37fbbcafbfda374 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Thu, 9 Apr 2026 10:46:38 +0700 Subject: [PATCH] fix: address code review issues in PostgreSQL cluster sync - Fix fetch_and_store incrementing success counter on failure - Fix sqlite3_stmt leak in pull_pgsql_variables_from_peer (use RAII) - Convert get_peer_to_sync_pgsql_query_rules and _servers_v2 to unified framework delegates (fixes atomic data race on .load()) - Merge dual scan loops in set_checksums to respect enabled_check - Remove spurious "deprecated" label from new checksum_pgsql_variables - Replace sprintf with snprintf in pgsql diffs_before_sync getters - Fix indentation on cluster_pgsql_variables blocks - Fix TAP plan mismatch: replace MYSQL_QUERY with inline mysql_query - Fix trivially-true table accessibility assertions - Add mysql_close on failed connect in test - Move test_cluster_sync_pgsql-t to legacy-g5 group --- lib/ProxySQL_Admin.cpp | 16 +- lib/ProxySQL_Cluster.cpp | 185 +++---------------- test/tap/groups/groups.json | 2 +- test/tap/tests/test_cluster_sync_pgsql-t.cpp | 53 +++--- 4 files changed, 58 insertions(+), 198 deletions(-) diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index 07d8ca0f7..e461d8a3c 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -3933,19 +3933,19 @@ char * ProxySQL_Admin::get_variable(char *name) { return strdup(intbuf); } if (!strcasecmp(name,"cluster_pgsql_query_rules_diffs_before_sync")) { - sprintf(intbuf,"%d",variables.cluster_pgsql_query_rules_diffs_before_sync); + snprintf(intbuf, sizeof(intbuf),"%d",variables.cluster_pgsql_query_rules_diffs_before_sync); return strdup(intbuf); } if (!strcasecmp(name,"cluster_pgsql_servers_diffs_before_sync")) { - sprintf(intbuf,"%d",variables.cluster_pgsql_servers_diffs_before_sync); + snprintf(intbuf, sizeof(intbuf),"%d",variables.cluster_pgsql_servers_diffs_before_sync); return strdup(intbuf); } if (!strcasecmp(name,"cluster_pgsql_users_diffs_before_sync")) { - sprintf(intbuf,"%d",variables.cluster_pgsql_users_diffs_before_sync); + snprintf(intbuf, sizeof(intbuf),"%d",variables.cluster_pgsql_users_diffs_before_sync); return strdup(intbuf); } - if (!strcasecmp(name,"cluster_pgsql_variables_diffs_before_sync")) { - sprintf(intbuf,"%d",variables.cluster_pgsql_variables_diffs_before_sync); + if (!strcasecmp(name,"cluster_pgsql_variables_diffs_before_sync")) { + snprintf(intbuf, sizeof(intbuf),"%d",variables.cluster_pgsql_variables_diffs_before_sync); return strdup(intbuf); } if (!strcasecmp(name,"cluster_mysql_servers_sync_algorithm")) { @@ -4561,7 +4561,7 @@ bool ProxySQL_Admin::set_variable(char *name, char *value, bool lock) { // this return false; } } - if (!strcasecmp(name,"cluster_pgsql_variables_diffs_before_sync")) { + if (!strcasecmp(name,"cluster_pgsql_variables_diffs_before_sync")) { int intv=atoi(value); if (intv >= 0 && intv <= 1000) { intv = checksum_variables.checksum_pgsql_variables ? intv : 0; @@ -4817,7 +4817,7 @@ bool ProxySQL_Admin::set_variable(char *name, char *value, bool lock) { // this } return rt; } - if (!strcasecmp(name,"cluster_pgsql_variables_save_to_disk")) { + if (!strcasecmp(name,"cluster_pgsql_variables_save_to_disk")) { bool rt = false; if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) { variables.cluster_pgsql_variables_save_to_disk=true; @@ -4967,7 +4967,7 @@ bool ProxySQL_Admin::set_variable(char *name, char *value, bool lock) { // this checksum_variables.checksum_pgsql_variables=false; variables.cluster_pgsql_variables_diffs_before_sync = 0; GloProxyCluster->cluster_pgsql_variables_diffs_before_sync = 0; - proxy_warning("Disabling deprecated 'admin-checksum_pgsql_variables', setting 'admin-cluster_pgsql_variables_diffs_before_sync=0'\n"); + proxy_warning("Disabling 'admin-checksum_pgsql_variables', setting 'admin-cluster_pgsql_variables_diffs_before_sync=0'\n"); return true; } return false; diff --git a/lib/ProxySQL_Cluster.cpp b/lib/ProxySQL_Cluster.cpp index 234761de3..c7e8a52ac 100644 --- a/lib/ProxySQL_Cluster.cpp +++ b/lib/ProxySQL_Cluster.cpp @@ -697,43 +697,30 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) { while ( _r && (row = mysql_fetch_row(_r))) { // Data-driven approach: find the matching module and process it - bool module_found = false; - - // Search for the module in our data structure and check if it's enabled for (const auto& module : modules) { if (strcmp(row[0], module.module_name) == 0) { // Skip module if not enabled (for modules with optional dependencies like LDAP) if (module.enabled_check && !module.enabled_check()) { - module_found = true; break; } - module_found = true; - break; - } - } - if (module_found) { - // Find the module and get its diff threshold - for (const auto& module : modules) { - if (strcmp(row[0], module.module_name) == 0) { - // Get diff threshold using member pointer with atomic load - unsigned int diff_threshold = (unsigned int)(GloProxyCluster->*(module.diff_member)).load(); - - // Generate generalized sync message using the actual variable suffix - char sync_msg[256]; - const char* var_suffix = module.sync_var_suffix ? module.sync_var_suffix : module.module_name; - snprintf(sync_msg, sizeof(sync_msg), ErrorMessages::DIFFS_BEFORE_SYNC_FORMAT, var_suffix); - - process_component_checksum( - row, - *module.local_checksum, - *module.global_checksum, - now, diff_threshold, - sync_msg, - hostname, port - ); - break; - } + // Get diff threshold using member pointer with atomic load + unsigned int diff_threshold = (unsigned int)(GloProxyCluster->*(module.diff_member)).load(); + + // Generate generalized sync message using the actual variable suffix + char sync_msg[256]; + const char* var_suffix = module.sync_var_suffix ? module.sync_var_suffix : module.module_name; + snprintf(sync_msg, sizeof(sync_msg), ErrorMessages::DIFFS_BEFORE_SYNC_FORMAT, var_suffix); + + process_component_checksum( + row, + *module.local_checksum, + *module.global_checksum, + now, diff_threshold, + sync_msg, + hostname, port + ); + break; } } } @@ -1804,7 +1791,7 @@ int ProxySQL_Cluster::fetch_and_store(MYSQL* conn, const fetch_query& f_query, M proxy_info("%s", msgs[1].c_str()); } - if (f_query.success_counter != p_cluster_counter::metric(-1)) { + if (query_res == 0 && f_query.success_counter != p_cluster_counter::metric(-1)) { metrics.p_counter_array[f_query.success_counter]->Increment(); } @@ -3076,9 +3063,10 @@ void ProxySQL_Cluster::pull_pgsql_variables_from_peer(const std::string& expecte // Insert new pgsql variables MYSQL_ROW row; char *q = (char *)"INSERT OR REPLACE INTO global_variables (variable_name, variable_value) VALUES (?1 , ?2)"; - sqlite3_stmt *statement1 = NULL; - int rc = GloAdmin->admindb->prepare_v2(q, &statement1); - ASSERT_SQLITE_OK(rc, GloAdmin->admindb); + auto [rc1, statement1_unique] = GloAdmin->admindb->prepare_v2(q); + ASSERT_SQLITE_OK(rc1, GloAdmin->admindb); + sqlite3_stmt *statement1 = statement1_unique.get(); + int rc; while ((row = mysql_fetch_row(pgsql_variables_result))) { rc=(*proxy_sqlite3_bind_text)(statement1, 1, row[0], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, GloAdmin->admindb); // variable_name @@ -4330,47 +4318,7 @@ void ProxySQL_Cluster_Nodes::get_peer_to_sync_pgsql_users(char **host, uint16_t * @see ProxySQL_Checksum_Value_2::pgsql_query_rules */ void ProxySQL_Cluster_Nodes::get_peer_to_sync_pgsql_query_rules(char **host, uint16_t *port, char** ip_address) { - unsigned long long version = 0; - unsigned long long epoch = 0; - unsigned long long max_epoch = 0; - char *hostname = NULL; - char *ip_addr = NULL; - uint16_t p = 0; - unsigned int diff_mu = (unsigned int)GloProxyCluster->cluster_pgsql_query_rules_diffs_before_sync; - for( std::unordered_map::iterator it = umap_proxy_nodes.begin(); it != umap_proxy_nodes.end(); ) { - ProxySQL_Node_Entry * node = it->second; - ProxySQL_Checksum_Value_2 * v = &node->checksums_values.pgsql_query_rules; - if (v->version > 1) { - if ( v->epoch > epoch ) { - max_epoch = v->epoch; - if (v->diff_check >= diff_mu) { - epoch = v->epoch; - version = v->version; - const char* ip = node->get_ipaddress(); - if (!safe_update_peer_info(&hostname, &ip_addr, node->get_hostname(), ip)) { - proxy_error("Memory allocation failed while updating pgsql_query_rules peer info\n"); - return; - } - p = node->get_port(); - } - } - } - it++; - } - if (epoch) { - if (max_epoch > epoch) { - proxy_warning("Cluster: detected a peer with pgsql_query_rules epoch %llu , but not enough diff_check. We won't sync from epoch %llu: temporarily skipping sync\n", max_epoch, epoch); - // Clean up allocated memory using helper function - safe_update_peer_info(&hostname, &ip_addr, NULL, NULL); - } - } - if (hostname) { - *host = hostname; - *port = p; - *ip_address = ip_addr; - proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Detected peer %s:%d with pgsql_query_rules version %llu, epoch %llu\n", hostname, p, version, epoch); - proxy_info("Cluster: detected peer %s:%d with pgsql_query_rules version %llu, epoch %llu\n", hostname, p, version, epoch); - } + get_peer_to_sync_variables_module("pgsql_query_rules", host, port, ip_address, nullptr, nullptr); } /** @@ -4433,92 +4381,7 @@ void ProxySQL_Cluster_Nodes::get_peer_to_sync_runtime_pgsql_servers(char **host, */ void ProxySQL_Cluster_Nodes::get_peer_to_sync_pgsql_servers_v2(char** host, uint16_t* port, char** peer_pgsql_servers_v2_checksum, char** peer_runtime_pgsql_servers_checksum, char** ip_address) { - unsigned long long version = 0; - unsigned long long epoch = 0; - unsigned long long max_epoch = 0; - char *hostname = NULL; - char *ip_addr = NULL; - uint16_t p = 0; - char *checksum_v2 = NULL; - char *checksum_runtime = NULL; - unsigned int diff_ms = (unsigned int)GloProxyCluster->cluster_pgsql_servers_diffs_before_sync; - for( std::unordered_map::iterator it = umap_proxy_nodes.begin(); it != umap_proxy_nodes.end(); ) { - ProxySQL_Node_Entry * node = it->second; - ProxySQL_Checksum_Value_2 * v = &node->checksums_values.pgsql_servers_v2; - if (v->version > 1) { - if ( v->epoch > epoch ) { - max_epoch = v->epoch; - if (v->diff_check >= diff_ms) { - epoch = v->epoch; - version = v->version; - if (hostname) { - free(hostname); - } - if (ip_addr) { - free(ip_addr); - } - if (checksum_v2) { - free(checksum_v2); - } - if (checksum_runtime) { - free(checksum_runtime); - } - hostname=strdup(node->get_hostname()); - const char* ip = node->get_ipaddress(); - if (ip) - ip_addr = strdup(ip); - p = node->get_port(); - checksum_v2 = strdup(v->checksum); - // Get runtime checksum as well - ProxySQL_Checksum_Value_2 * v_runtime = &node->checksums_values.pgsql_servers; - if (v_runtime->version > 1) { - checksum_runtime = strdup(v_runtime->checksum); - } - } - } - } - it++; - } - if (epoch) { - if (max_epoch > epoch) { - proxy_warning("Cluster: detected a peer with pgsql_servers_v2 epoch %llu , but not enough diff_check. We won't sync from epoch %llu: temporarily skipping sync\n", max_epoch, epoch); - if (hostname) { - free(hostname); - hostname = NULL; - } - if (ip_addr) { - free(ip_addr); - ip_addr = NULL; - } - if (checksum_v2) { - free(checksum_v2); - checksum_v2 = NULL; - } - if (checksum_runtime) { - free(checksum_runtime); - checksum_runtime = NULL; - } - } - } - if (hostname) { - *host = hostname; - *port = p; - *ip_address = ip_addr; - if (peer_pgsql_servers_v2_checksum) { - *peer_pgsql_servers_v2_checksum = checksum_v2; - } else { - if (checksum_v2) - free(checksum_v2); - } - if (peer_runtime_pgsql_servers_checksum) { - *peer_runtime_pgsql_servers_checksum = checksum_runtime; - } else { - if (checksum_runtime) - free(checksum_runtime); - } - proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Detected peer %s:%d with pgsql_servers_v2 version %llu, epoch %llu\n", hostname, p, version, epoch); - proxy_info("Cluster: detected peer %s:%d with pgsql_servers_v2 version %llu, epoch %llu\n", hostname, p, version, epoch); - } + get_peer_to_sync_variables_module("pgsql_servers_v2", host, port, ip_address, peer_pgsql_servers_v2_checksum, peer_runtime_pgsql_servers_checksum); } diff --git a/test/tap/groups/groups.json b/test/tap/groups/groups.json index f79d55cab..e7884f086 100644 --- a/test/tap/groups/groups.json +++ b/test/tap/groups/groups.json @@ -270,7 +270,7 @@ "test_cluster1-t" : [ "legacy-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ], "test_cluster_sync-t" : [ "legacy-g5","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ], "test_cluster_sync_mysql_servers-t" : [ "legacy-g5","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ], - "test_cluster_sync_pgsql-t" : [ "legacy-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ], + "test_cluster_sync_pgsql-t" : [ "legacy-g5","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ], "test_com_binlog_dump_enables_fast_forward-t" : [ "legacy-binlog-g1" ], "test_com_register_slave_enables_fast_forward-t" : [ "legacy-g3","mysql84-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ], "test_com_reset_connection_com_change_user-t" : [ "legacy-g3","mysql84-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ], diff --git a/test/tap/tests/test_cluster_sync_pgsql-t.cpp b/test/tap/tests/test_cluster_sync_pgsql-t.cpp index d3c64a409..af8c9fc47 100644 --- a/test/tap/tests/test_cluster_sync_pgsql-t.cpp +++ b/test/tap/tests/test_cluster_sync_pgsql-t.cpp @@ -606,6 +606,7 @@ int main(int argc, char** argv) { if (!mysql_real_connect(proxysql_admin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) { diag("Failed to connect to primary admin: %s", mysql_error(proxysql_admin)); + mysql_close(proxysql_admin); return exit_status(); } @@ -625,7 +626,11 @@ int main(int argc, char** argv) { char query[256]; snprintf(query, sizeof(query), t_check_checksum, checksum_name); - MYSQL_QUERY(proxysql_admin, query); + if (mysql_query(proxysql_admin, query)) { + diag("Query failed: %s — %s", query, mysql_error(proxysql_admin)); + ok(false, "PostgreSQL checksum '%s' found in runtime_checksums_values", checksum_name); + continue; + } MYSQL_RES* result = mysql_store_result(proxysql_admin); if (!result) { diag("Failed to store result from query: %s", query); @@ -655,33 +660,25 @@ int main(int argc, char** argv) { ok(res == EXIT_SUCCESS, "PostgreSQL checksum validation passed"); // Test basic PostgreSQL configuration is supported - MYSQL_QUERY(proxysql_admin, "SELECT 1 FROM pgsql_servers LIMIT 1"); - MYSQL_RES* pgsql_servers_result = mysql_store_result(proxysql_admin); - ok(mysql_errno(proxysql_admin) == 0, "PostgreSQL servers table is accessible"); - if (pgsql_servers_result) { - mysql_free_result(pgsql_servers_result); - } - - MYSQL_QUERY(proxysql_admin, "SELECT 1 FROM pgsql_users LIMIT 1"); - MYSQL_RES* pgsql_users_result = mysql_store_result(proxysql_admin); - ok(mysql_errno(proxysql_admin) == 0, "PostgreSQL users table is accessible"); - if (pgsql_users_result) { - mysql_free_result(pgsql_users_result); - } - - MYSQL_QUERY(proxysql_admin, "SELECT 1 FROM pgsql_query_rules LIMIT 1"); - MYSQL_RES* pgsql_query_rules_result = mysql_store_result(proxysql_admin); - ok(mysql_errno(proxysql_admin) == 0, "PostgreSQL query rules table is accessible"); - if (pgsql_query_rules_result) { - mysql_free_result(pgsql_query_rules_result); - } - - // Check cluster variables exist - MYSQL_QUERY(proxysql_admin, "SHOW VARIABLES LIKE 'cluster_pgsql_%'"); - MYSQL_RES* pgsql_cluster_vars_result = mysql_store_result(proxysql_admin); - ok(mysql_errno(proxysql_admin) == 0, "PostgreSQL cluster variables are accessible"); - if (pgsql_cluster_vars_result) { - mysql_free_result(pgsql_cluster_vars_result); + { + struct table_check { + const char* query; + const char* desc; + }; + const table_check checks[] = { + {"SELECT 1 FROM pgsql_servers LIMIT 1", "PostgreSQL servers table is accessible"}, + {"SELECT 1 FROM pgsql_users LIMIT 1", "PostgreSQL users table is accessible"}, + {"SELECT 1 FROM pgsql_query_rules LIMIT 1", "PostgreSQL query rules table is accessible"}, + {"SHOW VARIABLES LIKE 'cluster_pgsql_%'", "PostgreSQL cluster variables are accessible"}, + }; + for (const auto& check : checks) { + int rc = mysql_query(proxysql_admin, check.query); + if (rc == 0) { + MYSQL_RES* res = mysql_store_result(proxysql_admin); + if (res) mysql_free_result(res); + } + ok(rc == 0, "%s", check.desc); + } } {