fix: address code review issues in PostgreSQL cluster sync

- Fix fetch_and_store incrementing success counter on failure
- Fix sqlite3_stmt leak in pull_pgsql_variables_from_peer (use RAII)
- Convert get_peer_to_sync_pgsql_query_rules and _servers_v2 to
  unified framework delegates (fixes atomic data race on .load())
- Merge dual scan loops in set_checksums to respect enabled_check
- Remove spurious "deprecated" label from new checksum_pgsql_variables
- Replace sprintf with snprintf in pgsql diffs_before_sync getters
- Fix indentation on cluster_pgsql_variables blocks
- Fix TAP plan mismatch: replace MYSQL_QUERY with inline mysql_query
- Fix trivially-true table accessibility assertions
- Add mysql_close on failed connect in test
- Move test_cluster_sync_pgsql-t to legacy-g5 group
fix/postgresql-cluster-sync_2
René Cannaò 2 months ago
parent 8a61232517
commit 93eabfc7bc

@ -3933,19 +3933,19 @@ char * ProxySQL_Admin::get_variable(char *name) {
return strdup(intbuf);
}
if (!strcasecmp(name,"cluster_pgsql_query_rules_diffs_before_sync")) {
sprintf(intbuf,"%d",variables.cluster_pgsql_query_rules_diffs_before_sync);
snprintf(intbuf, sizeof(intbuf),"%d",variables.cluster_pgsql_query_rules_diffs_before_sync);
return strdup(intbuf);
}
if (!strcasecmp(name,"cluster_pgsql_servers_diffs_before_sync")) {
sprintf(intbuf,"%d",variables.cluster_pgsql_servers_diffs_before_sync);
snprintf(intbuf, sizeof(intbuf),"%d",variables.cluster_pgsql_servers_diffs_before_sync);
return strdup(intbuf);
}
if (!strcasecmp(name,"cluster_pgsql_users_diffs_before_sync")) {
sprintf(intbuf,"%d",variables.cluster_pgsql_users_diffs_before_sync);
snprintf(intbuf, sizeof(intbuf),"%d",variables.cluster_pgsql_users_diffs_before_sync);
return strdup(intbuf);
}
if (!strcasecmp(name,"cluster_pgsql_variables_diffs_before_sync")) {
sprintf(intbuf,"%d",variables.cluster_pgsql_variables_diffs_before_sync);
if (!strcasecmp(name,"cluster_pgsql_variables_diffs_before_sync")) {
snprintf(intbuf, sizeof(intbuf),"%d",variables.cluster_pgsql_variables_diffs_before_sync);
return strdup(intbuf);
}
if (!strcasecmp(name,"cluster_mysql_servers_sync_algorithm")) {
@ -4561,7 +4561,7 @@ bool ProxySQL_Admin::set_variable(char *name, char *value, bool lock) { // this
return false;
}
}
if (!strcasecmp(name,"cluster_pgsql_variables_diffs_before_sync")) {
if (!strcasecmp(name,"cluster_pgsql_variables_diffs_before_sync")) {
int intv=atoi(value);
if (intv >= 0 && intv <= 1000) {
intv = checksum_variables.checksum_pgsql_variables ? intv : 0;
@ -4817,7 +4817,7 @@ bool ProxySQL_Admin::set_variable(char *name, char *value, bool lock) { // this
}
return rt;
}
if (!strcasecmp(name,"cluster_pgsql_variables_save_to_disk")) {
if (!strcasecmp(name,"cluster_pgsql_variables_save_to_disk")) {
bool rt = false;
if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) {
variables.cluster_pgsql_variables_save_to_disk=true;
@ -4967,7 +4967,7 @@ bool ProxySQL_Admin::set_variable(char *name, char *value, bool lock) { // this
checksum_variables.checksum_pgsql_variables=false;
variables.cluster_pgsql_variables_diffs_before_sync = 0;
GloProxyCluster->cluster_pgsql_variables_diffs_before_sync = 0;
proxy_warning("Disabling deprecated 'admin-checksum_pgsql_variables', setting 'admin-cluster_pgsql_variables_diffs_before_sync=0'\n");
proxy_warning("Disabling 'admin-checksum_pgsql_variables', setting 'admin-cluster_pgsql_variables_diffs_before_sync=0'\n");
return true;
}
return false;

@ -697,43 +697,30 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
while ( _r && (row = mysql_fetch_row(_r))) {
// Data-driven approach: find the matching module and process it
bool module_found = false;
// Search for the module in our data structure and check if it's enabled
for (const auto& module : modules) {
if (strcmp(row[0], module.module_name) == 0) {
// Skip module if not enabled (for modules with optional dependencies like LDAP)
if (module.enabled_check && !module.enabled_check()) {
module_found = true;
break;
}
module_found = true;
break;
}
}
if (module_found) {
// Find the module and get its diff threshold
for (const auto& module : modules) {
if (strcmp(row[0], module.module_name) == 0) {
// Get diff threshold using member pointer with atomic load
unsigned int diff_threshold = (unsigned int)(GloProxyCluster->*(module.diff_member)).load();
// Generate generalized sync message using the actual variable suffix
char sync_msg[256];
const char* var_suffix = module.sync_var_suffix ? module.sync_var_suffix : module.module_name;
snprintf(sync_msg, sizeof(sync_msg), ErrorMessages::DIFFS_BEFORE_SYNC_FORMAT, var_suffix);
process_component_checksum(
row,
*module.local_checksum,
*module.global_checksum,
now, diff_threshold,
sync_msg,
hostname, port
);
break;
}
// Get diff threshold using member pointer with atomic load
unsigned int diff_threshold = (unsigned int)(GloProxyCluster->*(module.diff_member)).load();
// Generate generalized sync message using the actual variable suffix
char sync_msg[256];
const char* var_suffix = module.sync_var_suffix ? module.sync_var_suffix : module.module_name;
snprintf(sync_msg, sizeof(sync_msg), ErrorMessages::DIFFS_BEFORE_SYNC_FORMAT, var_suffix);
process_component_checksum(
row,
*module.local_checksum,
*module.global_checksum,
now, diff_threshold,
sync_msg,
hostname, port
);
break;
}
}
}
@ -1804,7 +1791,7 @@ int ProxySQL_Cluster::fetch_and_store(MYSQL* conn, const fetch_query& f_query, M
proxy_info("%s", msgs[1].c_str());
}
if (f_query.success_counter != p_cluster_counter::metric(-1)) {
if (query_res == 0 && f_query.success_counter != p_cluster_counter::metric(-1)) {
metrics.p_counter_array[f_query.success_counter]->Increment();
}
@ -3076,9 +3063,10 @@ void ProxySQL_Cluster::pull_pgsql_variables_from_peer(const std::string& expecte
// Insert new pgsql variables
MYSQL_ROW row;
char *q = (char *)"INSERT OR REPLACE INTO global_variables (variable_name, variable_value) VALUES (?1 , ?2)";
sqlite3_stmt *statement1 = NULL;
int rc = GloAdmin->admindb->prepare_v2(q, &statement1);
ASSERT_SQLITE_OK(rc, GloAdmin->admindb);
auto [rc1, statement1_unique] = GloAdmin->admindb->prepare_v2(q);
ASSERT_SQLITE_OK(rc1, GloAdmin->admindb);
sqlite3_stmt *statement1 = statement1_unique.get();
int rc;
while ((row = mysql_fetch_row(pgsql_variables_result))) {
rc=(*proxy_sqlite3_bind_text)(statement1, 1, row[0], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, GloAdmin->admindb); // variable_name
@ -4330,47 +4318,7 @@ void ProxySQL_Cluster_Nodes::get_peer_to_sync_pgsql_users(char **host, uint16_t
* @see ProxySQL_Checksum_Value_2::pgsql_query_rules
*/
void ProxySQL_Cluster_Nodes::get_peer_to_sync_pgsql_query_rules(char **host, uint16_t *port, char** ip_address) {
unsigned long long version = 0;
unsigned long long epoch = 0;
unsigned long long max_epoch = 0;
char *hostname = NULL;
char *ip_addr = NULL;
uint16_t p = 0;
unsigned int diff_mu = (unsigned int)GloProxyCluster->cluster_pgsql_query_rules_diffs_before_sync;
for( std::unordered_map<uint64_t, ProxySQL_Node_Entry *>::iterator it = umap_proxy_nodes.begin(); it != umap_proxy_nodes.end(); ) {
ProxySQL_Node_Entry * node = it->second;
ProxySQL_Checksum_Value_2 * v = &node->checksums_values.pgsql_query_rules;
if (v->version > 1) {
if ( v->epoch > epoch ) {
max_epoch = v->epoch;
if (v->diff_check >= diff_mu) {
epoch = v->epoch;
version = v->version;
const char* ip = node->get_ipaddress();
if (!safe_update_peer_info(&hostname, &ip_addr, node->get_hostname(), ip)) {
proxy_error("Memory allocation failed while updating pgsql_query_rules peer info\n");
return;
}
p = node->get_port();
}
}
}
it++;
}
if (epoch) {
if (max_epoch > epoch) {
proxy_warning("Cluster: detected a peer with pgsql_query_rules epoch %llu , but not enough diff_check. We won't sync from epoch %llu: temporarily skipping sync\n", max_epoch, epoch);
// Clean up allocated memory using helper function
safe_update_peer_info(&hostname, &ip_addr, NULL, NULL);
}
}
if (hostname) {
*host = hostname;
*port = p;
*ip_address = ip_addr;
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Detected peer %s:%d with pgsql_query_rules version %llu, epoch %llu\n", hostname, p, version, epoch);
proxy_info("Cluster: detected peer %s:%d with pgsql_query_rules version %llu, epoch %llu\n", hostname, p, version, epoch);
}
get_peer_to_sync_variables_module("pgsql_query_rules", host, port, ip_address, nullptr, nullptr);
}
/**
@ -4433,92 +4381,7 @@ void ProxySQL_Cluster_Nodes::get_peer_to_sync_runtime_pgsql_servers(char **host,
*/
void ProxySQL_Cluster_Nodes::get_peer_to_sync_pgsql_servers_v2(char** host, uint16_t* port, char** peer_pgsql_servers_v2_checksum,
char** peer_runtime_pgsql_servers_checksum, char** ip_address) {
unsigned long long version = 0;
unsigned long long epoch = 0;
unsigned long long max_epoch = 0;
char *hostname = NULL;
char *ip_addr = NULL;
uint16_t p = 0;
char *checksum_v2 = NULL;
char *checksum_runtime = NULL;
unsigned int diff_ms = (unsigned int)GloProxyCluster->cluster_pgsql_servers_diffs_before_sync;
for( std::unordered_map<uint64_t, ProxySQL_Node_Entry *>::iterator it = umap_proxy_nodes.begin(); it != umap_proxy_nodes.end(); ) {
ProxySQL_Node_Entry * node = it->second;
ProxySQL_Checksum_Value_2 * v = &node->checksums_values.pgsql_servers_v2;
if (v->version > 1) {
if ( v->epoch > epoch ) {
max_epoch = v->epoch;
if (v->diff_check >= diff_ms) {
epoch = v->epoch;
version = v->version;
if (hostname) {
free(hostname);
}
if (ip_addr) {
free(ip_addr);
}
if (checksum_v2) {
free(checksum_v2);
}
if (checksum_runtime) {
free(checksum_runtime);
}
hostname=strdup(node->get_hostname());
const char* ip = node->get_ipaddress();
if (ip)
ip_addr = strdup(ip);
p = node->get_port();
checksum_v2 = strdup(v->checksum);
// Get runtime checksum as well
ProxySQL_Checksum_Value_2 * v_runtime = &node->checksums_values.pgsql_servers;
if (v_runtime->version > 1) {
checksum_runtime = strdup(v_runtime->checksum);
}
}
}
}
it++;
}
if (epoch) {
if (max_epoch > epoch) {
proxy_warning("Cluster: detected a peer with pgsql_servers_v2 epoch %llu , but not enough diff_check. We won't sync from epoch %llu: temporarily skipping sync\n", max_epoch, epoch);
if (hostname) {
free(hostname);
hostname = NULL;
}
if (ip_addr) {
free(ip_addr);
ip_addr = NULL;
}
if (checksum_v2) {
free(checksum_v2);
checksum_v2 = NULL;
}
if (checksum_runtime) {
free(checksum_runtime);
checksum_runtime = NULL;
}
}
}
if (hostname) {
*host = hostname;
*port = p;
*ip_address = ip_addr;
if (peer_pgsql_servers_v2_checksum) {
*peer_pgsql_servers_v2_checksum = checksum_v2;
} else {
if (checksum_v2)
free(checksum_v2);
}
if (peer_runtime_pgsql_servers_checksum) {
*peer_runtime_pgsql_servers_checksum = checksum_runtime;
} else {
if (checksum_runtime)
free(checksum_runtime);
}
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Detected peer %s:%d with pgsql_servers_v2 version %llu, epoch %llu\n", hostname, p, version, epoch);
proxy_info("Cluster: detected peer %s:%d with pgsql_servers_v2 version %llu, epoch %llu\n", hostname, p, version, epoch);
}
get_peer_to_sync_variables_module("pgsql_servers_v2", host, port, ip_address, peer_pgsql_servers_v2_checksum, peer_runtime_pgsql_servers_checksum);
}

@ -270,7 +270,7 @@
"test_cluster1-t" : [ "legacy-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ],
"test_cluster_sync-t" : [ "legacy-g5","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ],
"test_cluster_sync_mysql_servers-t" : [ "legacy-g5","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ],
"test_cluster_sync_pgsql-t" : [ "legacy-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ],
"test_cluster_sync_pgsql-t" : [ "legacy-g5","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ],
"test_com_binlog_dump_enables_fast_forward-t" : [ "legacy-binlog-g1" ],
"test_com_register_slave_enables_fast_forward-t" : [ "legacy-g3","mysql84-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ],
"test_com_reset_connection_com_change_user-t" : [ "legacy-g3","mysql84-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ],

@ -606,6 +606,7 @@ int main(int argc, char** argv) {
if (!mysql_real_connect(proxysql_admin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) {
diag("Failed to connect to primary admin: %s", mysql_error(proxysql_admin));
mysql_close(proxysql_admin);
return exit_status();
}
@ -625,7 +626,11 @@ int main(int argc, char** argv) {
char query[256];
snprintf(query, sizeof(query), t_check_checksum, checksum_name);
MYSQL_QUERY(proxysql_admin, query);
if (mysql_query(proxysql_admin, query)) {
diag("Query failed: %s — %s", query, mysql_error(proxysql_admin));
ok(false, "PostgreSQL checksum '%s' found in runtime_checksums_values", checksum_name);
continue;
}
MYSQL_RES* result = mysql_store_result(proxysql_admin);
if (!result) {
diag("Failed to store result from query: %s", query);
@ -655,33 +660,25 @@ int main(int argc, char** argv) {
ok(res == EXIT_SUCCESS, "PostgreSQL checksum validation passed");
// Test basic PostgreSQL configuration is supported
MYSQL_QUERY(proxysql_admin, "SELECT 1 FROM pgsql_servers LIMIT 1");
MYSQL_RES* pgsql_servers_result = mysql_store_result(proxysql_admin);
ok(mysql_errno(proxysql_admin) == 0, "PostgreSQL servers table is accessible");
if (pgsql_servers_result) {
mysql_free_result(pgsql_servers_result);
}
MYSQL_QUERY(proxysql_admin, "SELECT 1 FROM pgsql_users LIMIT 1");
MYSQL_RES* pgsql_users_result = mysql_store_result(proxysql_admin);
ok(mysql_errno(proxysql_admin) == 0, "PostgreSQL users table is accessible");
if (pgsql_users_result) {
mysql_free_result(pgsql_users_result);
}
MYSQL_QUERY(proxysql_admin, "SELECT 1 FROM pgsql_query_rules LIMIT 1");
MYSQL_RES* pgsql_query_rules_result = mysql_store_result(proxysql_admin);
ok(mysql_errno(proxysql_admin) == 0, "PostgreSQL query rules table is accessible");
if (pgsql_query_rules_result) {
mysql_free_result(pgsql_query_rules_result);
}
// Check cluster variables exist
MYSQL_QUERY(proxysql_admin, "SHOW VARIABLES LIKE 'cluster_pgsql_%'");
MYSQL_RES* pgsql_cluster_vars_result = mysql_store_result(proxysql_admin);
ok(mysql_errno(proxysql_admin) == 0, "PostgreSQL cluster variables are accessible");
if (pgsql_cluster_vars_result) {
mysql_free_result(pgsql_cluster_vars_result);
{
struct table_check {
const char* query;
const char* desc;
};
const table_check checks[] = {
{"SELECT 1 FROM pgsql_servers LIMIT 1", "PostgreSQL servers table is accessible"},
{"SELECT 1 FROM pgsql_users LIMIT 1", "PostgreSQL users table is accessible"},
{"SELECT 1 FROM pgsql_query_rules LIMIT 1", "PostgreSQL query rules table is accessible"},
{"SHOW VARIABLES LIKE 'cluster_pgsql_%'", "PostgreSQL cluster variables are accessible"},
};
for (const auto& check : checks) {
int rc = mysql_query(proxysql_admin, check.query);
if (rc == 0) {
MYSQL_RES* res = mysql_store_result(proxysql_admin);
if (res) mysql_free_result(res);
}
ok(rc == 0, "%s", check.desc);
}
}
{

Loading…
Cancel
Save