refactor: unify ChecksumModuleInfo and SyncModuleConfig structures

Complete architectural unification by eliminating redundant SyncModuleConfig
structure and extending ChecksumModuleInfo to include all sync decision fields.
This final unification removes architectural duplication and creates a single
comprehensive configuration structure for all cluster sync operations.

Key improvements:
- Eliminated redundant SyncModuleConfig structure entirely
- Extended ChecksumModuleInfo with sync decision fields (sync_command,
  load_runtime_command, sync_conflict_counter, sync_delayed_counter)
- Added sync_enabled_modules unordered_set for selective processing
- Simplified loop to iterate through unified modules array
- Reduced architectural complexity while maintaining functionality
- Added #include <unordered_set> header for std::unordered_set support

All sync operations now use consistent data-driven architecture with
enabled_check() pattern for conditional module dependencies.
fix/postgresql-cluster-sync
Rene Cannao 6 months ago
parent c2a05aebe4
commit c97cca0d31

@ -1,4 +1,5 @@
#include <utility>
#include <unordered_set>
#include "proxysql.h"
#include "proxysql_utils.h"
@ -533,30 +534,57 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
pthread_mutex_lock(&GloVars.checksum_mutex);
// Data-driven mapping of module names to their checksum fields and diff thresholds
// Data-driven mapping of module names to their checksum fields and sync configuration
struct ChecksumModuleInfo {
const char* module_name;
ProxySQL_Checksum_Value_2* local_checksum;
ProxySQL_Checksum_Value* global_checksum;
std::atomic<int> ProxySQL_Cluster::*diff_member;
// Sync decision fields (used only for modules that need special sync processing)
const char* sync_command; // "admin", "mysql", "ldap" for pull_global_variables_from_peer()
const char* load_runtime_command; // Command name for warning messages
int sync_conflict_counter; // Counter for epoch conflicts
int sync_delayed_counter; // Counter for version=1 delays
bool (*enabled_check)(); // Function to check if module is enabled (nullptr for always enabled)
};
// Initialize all supported modules with their respective checksum field pointers
ChecksumModuleInfo modules[] = {
{"admin_variables", &checksums_values.admin_variables, &GloVars.checksums_values.admin_variables, &ProxySQL_Cluster::cluster_admin_variables_diffs_before_sync, nullptr},
{"mysql_query_rules", &checksums_values.mysql_query_rules, &GloVars.checksums_values.mysql_query_rules, &ProxySQL_Cluster::cluster_mysql_query_rules_diffs_before_sync, nullptr},
{"mysql_servers", &checksums_values.mysql_servers, &GloVars.checksums_values.mysql_servers, &ProxySQL_Cluster::cluster_mysql_servers_diffs_before_sync, nullptr},
{"mysql_servers_v2", &checksums_values.mysql_servers_v2, &GloVars.checksums_values.mysql_servers_v2, &ProxySQL_Cluster::cluster_mysql_servers_diffs_before_sync, nullptr},
{"mysql_users", &checksums_values.mysql_users, &GloVars.checksums_values.mysql_users, &ProxySQL_Cluster::cluster_mysql_users_diffs_before_sync, nullptr},
{"mysql_variables", &checksums_values.mysql_variables, &GloVars.checksums_values.mysql_variables, &ProxySQL_Cluster::cluster_mysql_variables_diffs_before_sync, nullptr},
{"proxysql_servers", &checksums_values.proxysql_servers, &GloVars.checksums_values.proxysql_servers, &ProxySQL_Cluster::cluster_proxysql_servers_diffs_before_sync, nullptr},
{"ldap_variables", &checksums_values.ldap_variables, &GloVars.checksums_values.ldap_variables, &ProxySQL_Cluster::cluster_ldap_variables_diffs_before_sync, []() { return GloMyLdapAuth != nullptr; }},
{"pgsql_query_rules", &checksums_values.pgsql_query_rules, &GloVars.checksums_values.pgsql_query_rules, &ProxySQL_Cluster::cluster_pgsql_query_rules_diffs_before_sync, nullptr},
{"pgsql_servers", &checksums_values.pgsql_servers, &GloVars.checksums_values.pgsql_servers, &ProxySQL_Cluster::cluster_pgsql_servers_diffs_before_sync, nullptr},
{"pgsql_servers_v2", &checksums_values.pgsql_servers_v2, &GloVars.checksums_values.pgsql_servers_v2, &ProxySQL_Cluster::cluster_pgsql_servers_diffs_before_sync, nullptr},
{"pgsql_users", &checksums_values.pgsql_users, &GloVars.checksums_values.pgsql_users, &ProxySQL_Cluster::cluster_pgsql_users_diffs_before_sync, nullptr},
{"pgsql_variables", &checksums_values.pgsql_variables, &GloVars.checksums_values.pgsql_variables, &ProxySQL_Cluster::cluster_pgsql_variables_diffs_before_sync, nullptr}
{"admin_variables", &checksums_values.admin_variables, &GloVars.checksums_values.admin_variables, &ProxySQL_Cluster::cluster_admin_variables_diffs_before_sync,
"admin", "LOAD ADMIN VARIABLES TO RUNTIME",
static_cast<int>(p_cluster_counter::sync_conflict_admin_variables_share_epoch),
static_cast<int>(p_cluster_counter::sync_delayed_admin_variables_version_one), nullptr},
{"mysql_query_rules", &checksums_values.mysql_query_rules, &GloVars.checksums_values.mysql_query_rules, &ProxySQL_Cluster::cluster_mysql_query_rules_diffs_before_sync,
nullptr, nullptr, 0, 0, nullptr},
{"mysql_servers", &checksums_values.mysql_servers, &GloVars.checksums_values.mysql_servers, &ProxySQL_Cluster::cluster_mysql_servers_diffs_before_sync,
nullptr, nullptr, 0, 0, nullptr},
{"mysql_servers_v2", &checksums_values.mysql_servers_v2, &GloVars.checksums_values.mysql_servers_v2, &ProxySQL_Cluster::cluster_mysql_servers_diffs_before_sync,
nullptr, nullptr, 0, 0, nullptr},
{"mysql_users", &checksums_values.mysql_users, &GloVars.checksums_values.mysql_users, &ProxySQL_Cluster::cluster_mysql_users_diffs_before_sync,
nullptr, nullptr, 0, 0, nullptr},
{"mysql_variables", &checksums_values.mysql_variables, &GloVars.checksums_values.mysql_variables, &ProxySQL_Cluster::cluster_mysql_variables_diffs_before_sync,
"mysql", "LOAD MYSQL VARIABLES TO RUNTIME",
static_cast<int>(p_cluster_counter::sync_conflict_mysql_variables_share_epoch),
static_cast<int>(p_cluster_counter::sync_delayed_mysql_variables_version_one), nullptr},
{"proxysql_servers", &checksums_values.proxysql_servers, &GloVars.checksums_values.proxysql_servers, &ProxySQL_Cluster::cluster_proxysql_servers_diffs_before_sync,
nullptr, nullptr, 0, 0, nullptr},
{"ldap_variables", &checksums_values.ldap_variables, &GloVars.checksums_values.ldap_variables, &ProxySQL_Cluster::cluster_ldap_variables_diffs_before_sync,
"ldap", "LOAD LDAP VARIABLES TO RUNTIME",
static_cast<int>(p_cluster_counter::sync_conflict_ldap_variables_share_epoch),
static_cast<int>(p_cluster_counter::sync_delayed_ldap_variables_version_one),
[]() { return GloMyLdapAuth != nullptr; }},
{"pgsql_query_rules", &checksums_values.pgsql_query_rules, &GloVars.checksums_values.pgsql_query_rules, &ProxySQL_Cluster::cluster_pgsql_query_rules_diffs_before_sync,
nullptr, nullptr, 0, 0, nullptr},
{"pgsql_servers", &checksums_values.pgsql_servers, &GloVars.checksums_values.pgsql_servers, &ProxySQL_Cluster::cluster_pgsql_servers_diffs_before_sync,
nullptr, nullptr, 0, 0, nullptr},
{"pgsql_servers_v2", &checksums_values.pgsql_servers_v2, &GloVars.checksums_values.pgsql_servers_v2, &ProxySQL_Cluster::cluster_pgsql_servers_diffs_before_sync,
nullptr, nullptr, 0, 0, nullptr},
{"pgsql_users", &checksums_values.pgsql_users, &GloVars.checksums_values.pgsql_users, &ProxySQL_Cluster::cluster_pgsql_users_diffs_before_sync,
nullptr, nullptr, 0, 0, nullptr},
{"pgsql_variables", &checksums_values.pgsql_variables, &GloVars.checksums_values.pgsql_variables, &ProxySQL_Cluster::cluster_pgsql_variables_diffs_before_sync,
nullptr, nullptr, 0, 0, nullptr}
};
while ( _r && (row = mysql_fetch_row(_r))) {
@ -625,43 +653,34 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
// note that this is done outside the critical section
// as mutex on GloVars.checksum_mutex is already released
// Data-driven approach for sync decisions of admin_variables, mysql_variables, and ldap_variables
struct SyncModuleConfig {
const char* name;
unsigned int diff_threshold;
const char* sync_command;
const char* load_runtime_command;
int sync_conflict_counter;
int sync_delayed_counter;
ProxySQL_Checksum_Value_2* local_checksum;
ProxySQL_Checksum_Value* global_checksum;
bool (*enabled_check)(); // Function to check if module is enabled (nullptr for always enabled)
// Set of modules that need special sync decision processing (admin_variables, mysql_variables, ldap_variables)
const std::unordered_set<std::string> sync_enabled_modules = {
"admin_variables",
"mysql_variables",
"ldap_variables"
};
SyncModuleConfig sync_modules[] = {
{"admin_variables", diff_av, "admin", "LOAD ADMIN VARIABLES TO RUNTIME",
static_cast<int>(p_cluster_counter::sync_conflict_admin_variables_share_epoch),
static_cast<int>(p_cluster_counter::sync_delayed_admin_variables_version_one),
&checksums_values.admin_variables, &GloVars.checksums_values.admin_variables, nullptr},
{"mysql_variables", diff_mv, "mysql", "LOAD MYSQL VARIABLES TO RUNTIME",
static_cast<int>(p_cluster_counter::sync_conflict_mysql_variables_share_epoch),
static_cast<int>(p_cluster_counter::sync_delayed_mysql_variables_version_one),
&checksums_values.mysql_variables, &GloVars.checksums_values.mysql_variables, nullptr},
{"ldap_variables", diff_lv, "ldap", "LOAD LDAP VARIABLES TO RUNTIME",
static_cast<int>(p_cluster_counter::sync_conflict_ldap_variables_share_epoch),
static_cast<int>(p_cluster_counter::sync_delayed_ldap_variables_version_one),
&checksums_values.ldap_variables, &GloVars.checksums_values.ldap_variables,
[]() { return GloMyLdapAuth != nullptr; }}
};
// Process sync decisions for modules that need special sync processing
for (const auto& module : modules) {
// Only process modules that are in the sync_enabled_modules set
if (sync_enabled_modules.find(module.module_name) == sync_enabled_modules.end()) {
continue;
}
// Process sync decisions for admin_variables, mysql_variables, and ldap_variables using loop
for (const auto& module : sync_modules) {
// Skip module if not enabled (for modules with optional dependencies like LDAP)
if (module.enabled_check && !module.enabled_check()) {
continue;
}
if (module.diff_threshold > 0) {
// Skip modules that don't have sync configuration (those with nullptr sync_command)
if (!module.sync_command) {
continue;
}
// Get diff threshold using member pointer with atomic load
unsigned int diff_threshold = (unsigned int)(GloProxyCluster->*(module.diff_member)).load();
if (diff_threshold > 0) {
ProxySQL_Checksum_Value_2 *v = module.local_checksum;
unsigned long long own_version = __sync_fetch_and_add(&module.global_checksum->version, 0);
unsigned long long own_epoch = __sync_fetch_and_add(&module.global_checksum->epoch, 0);
@ -674,21 +693,21 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
||
(v->epoch > own_epoch) // epoch is newer
) {
if (v->diff_check >= module.diff_threshold) {
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Detected peer %s:%d with %s version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, module.name, v->version, v->epoch, v->diff_check, own_version, own_epoch);
proxy_info("Cluster: detected a peer %s:%d with %s version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, module.name, v->version, v->epoch, v->diff_check, own_version, own_epoch);
if (v->diff_check >= diff_threshold) {
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Detected peer %s:%d with %s version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, module.module_name, v->version, v->epoch, v->diff_check, own_version, own_epoch);
proxy_info("Cluster: detected a peer %s:%d with %s version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, module.module_name, v->version, v->epoch, v->diff_check, own_version, own_epoch);
GloProxyCluster->pull_global_variables_from_peer(module.sync_command, expected_checksum, v->epoch);
}
}
if ((v->epoch == own_epoch) && v->diff_check && ((v->diff_check % (module.diff_threshold*10)) == 0)) {
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Detected peer %s:%d with %s version %llu, epoch %llu, diff_check %u, checksum %s. Own version: %llu, epoch: %llu, checksum %s. Sync conflict, epoch times are EQUAL, can't determine which server holds the latest config, we won't sync. This message will be repeated every %u checks until %s is executed on candidate master.\n", hostname, port, module.name, v->version, v->epoch, v->diff_check, v->checksum, own_version, own_epoch, own_checksum, (module.diff_threshold * 10), module.load_runtime_command);
proxy_error("Cluster: detected a peer %s:%d with %s version %llu, epoch %llu, diff_check %u, checksum %s. Own version: %llu, epoch: %llu, checksum %s. Sync conflict, epoch times are EQUAL, can't determine which server holds the latest config, we won't sync. This message will be repeated every %u checks until %s is executed on candidate master.\n", hostname, port, module.name, v->version, v->epoch, v->diff_check, v->checksum, own_version, own_epoch, own_checksum, (module.diff_threshold*10), module.load_runtime_command);
if ((v->epoch == own_epoch) && v->diff_check && ((v->diff_check % (diff_threshold*10)) == 0)) {
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Detected peer %s:%d with %s version %llu, epoch %llu, diff_check %u, checksum %s. Own version: %llu, epoch: %llu, checksum %s. Sync conflict, epoch times are EQUAL, can't determine which server holds the latest config, we won't sync. This message will be repeated every %u checks until %s is executed on candidate master.\n", hostname, port, module.module_name, v->version, v->epoch, v->diff_check, v->checksum, own_version, own_epoch, own_checksum, (diff_threshold * 10), module.load_runtime_command);
proxy_error("Cluster: detected a peer %s:%d with %s version %llu, epoch %llu, diff_check %u, checksum %s. Own version: %llu, epoch: %llu, checksum %s. Sync conflict, epoch times are EQUAL, can't determine which server holds the latest config, we won't sync. This message will be repeated every %u checks until %s is executed on candidate master.\n", hostname, port, module.module_name, v->version, v->epoch, v->diff_check, v->checksum, own_version, own_epoch, own_checksum, (diff_threshold*10), module.load_runtime_command);
GloProxyCluster->metrics.p_counter_array[module.sync_conflict_counter]->Increment();
}
} else {
if (v->diff_check && (v->diff_check % (module.diff_threshold*10)) == 0) {
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Detected peer %s:%d with %s version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. diff_check is increasing, but version 1 doesn't allow sync. This message will be repeated every %u checks until %s is executed on candidate master.\n", hostname, port, module.name, v->version, v->epoch, v->diff_check, own_version, own_epoch, (module.diff_threshold * 10), module.load_runtime_command);
proxy_warning("Cluster: detected a peer %s:%d with %s version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. diff_check is increasing, but version 1 doesn't allow sync. This message will be repeated every %u checks until %s is executed on candidate master.\n", hostname, port, module.name, v->version, v->epoch, v->diff_check, own_version, own_epoch, (module.diff_threshold*10), module.load_runtime_command);
if (v->diff_check && (v->diff_check % (diff_threshold*10)) == 0) {
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Detected peer %s:%d with %s version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. diff_check is increasing, but version 1 doesn't allow sync. This message will be repeated every %u checks until %s is executed on candidate master.\n", hostname, port, module.module_name, v->version, v->epoch, v->diff_check, own_version, own_epoch, (diff_threshold * 10), module.load_runtime_command);
proxy_warning("Cluster: detected a peer %s:%d with %s version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. diff_check is increasing, but version 1 doesn't allow sync. This message will be repeated every %u checks until %s is executed on candidate master.\n", hostname, port, module.module_name, v->version, v->epoch, v->diff_check, own_version, own_epoch, (diff_threshold*10), module.load_runtime_command);
GloProxyCluster->metrics.p_counter_array[module.sync_delayed_counter]->Increment();
}
}

Loading…
Cancel
Save