feat: implement PostgreSQL variables cluster sync core functionality

Add complete PostgreSQL variables cluster synchronization including:
- CLUSTER_QUERY_PGSQL_VARIABLES constant definition
- pull_pgsql_variables_from_peer() function implementation
- PostgreSQL variables sync decision logic in cluster sync loop
- Comprehensive error handling and metrics integration
- Runtime loading and save-to-disk support
- Interface filtering for cluster_sync_interfaces=false

This addresses the critical gap where PostgreSQL variables were not
being synchronized between ProxySQL cluster nodes, completing the
PostgreSQL module integration into the unified cluster architecture.
fix/postgresql-cluster-sync
Rene Cannao 4 months ago
parent 5af4011c51
commit 1b58c783d6

@ -205,6 +205,8 @@
*/
#define CLUSTER_QUERY_PGSQL_QUERY_RULES_FAST_ROUTING "PROXY_SELECT username, database, flagIN, destination_hostgroup, comment FROM runtime_pgsql_query_rules_fast_routing ORDER BY username, database, flagIN"
#define CLUSTER_QUERY_PGSQL_VARIABLES "PROXY_SELECT variable_name, variable_value FROM runtime_pgsql_variables ORDER BY variable_name"
class ProxySQL_Checksum_Value_2: public ProxySQL_Checksum_Value {
public:
time_t last_updated;

@ -638,4 +638,33 @@ const pgsql_variable_validator pgsql_variable_validator_search_path = {
.type = VARIABLE_TYPE_STRING,
.validate = &pgsql_variable_validate_search_path,
.params = {}
};
/**
* @brief Validates an integer variable for PostgreSQL.
*
* This function checks if the provided value is a valid integer representation
* and falls within the specified range. The range is defined by the params
* parameter.
*
* @param value The value to validate.
* @param params The parameter structure containing the integer range.
* @param session Unused parameter.
* @param transformed_value If not null, will be set to null.
* @return true if the value is a valid integer representation within the specified range, false otherwise.
*/
bool pgsql_variable_validate_integer(const char* value, const params_t* params, PgSQL_Session* session, char** transformed_value) {
(void)session;
if (transformed_value) *transformed_value = nullptr;
char* end = nullptr;
long num = strtol(value, &end, 10);
if (end == value || *end != '\0') return false;
if (num < params->int_range.min || num > params->int_range.max) return false;
return true;
}
const pgsql_variable_validator pgsql_variable_validator_integer = {
.type = VARIABLE_TYPE_INT,
.validate = &pgsql_variable_validate_integer,
.params = {}
};

@ -1006,6 +1006,40 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
}
}
// PostgreSQL Variables Sync
if (diff_mv_pgsql) {
v = &checksums_values.pgsql_variables;
unsigned long long own_version = __sync_fetch_and_add(&GloVars.checksums_values.pgsql_variables.version,0);
unsigned long long own_epoch = __sync_fetch_and_add(&GloVars.checksums_values.pgsql_variables.epoch,0);
char* own_checksum = __sync_fetch_and_add(&GloVars.checksums_values.pgsql_variables.checksum,0);
const std::string v_exp_checksum { v->checksum };
if (v->version > 1) {
if (
(own_version == 1) // we just booted
||
(v->epoch > own_epoch) // epoch is newer
) {
if (v->diff_check >= diff_mv_pgsql) {
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Detected peer %s:%d with pgsql_variables version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch);
proxy_info("Cluster: detected a peer %s:%d with pgsql_variables version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch);
GloProxyCluster->pull_pgsql_variables_from_peer(v_exp_checksum, v->epoch);
}
}
if ((v->epoch == own_epoch) && v->diff_check && ((v->diff_check % (diff_mv_pgsql*10)) == 0)) {
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Detected peer %s:%d with pgsql_variables version %llu, epoch %llu, diff_check %u, checksum %s. Own version: %llu, epoch: %llu, checksum %s. Sync conflict, epoch times are EQUAL, can't determine which server holds the latest config, we won't sync. This message will be repeated every %u checks until LOAD PGSQL VARIABLES TO RUNTIME is executed on candidate master.\n", hostname, port, v->version, v->epoch, v->diff_check, v->checksum, own_version, own_epoch, own_checksum, (diff_mv_pgsql * 10));
proxy_error("Cluster: detected a peer %s:%d with pgsql_variables version %llu, epoch %llu, diff_check %u, checksum %s. Own version: %llu, epoch: %llu, checksum %s. Sync conflict, epoch times are EQUAL, can't determine which server holds the latest config, we won't sync. This message will be repeated every %u checks until LOAD PGSQL VARIABLES TO RUNTIME is executed on candidate master.\n", hostname, port, v->version, v->epoch, v->diff_check, v->checksum, own_version, own_epoch, own_checksum, (diff_mv_pgsql*10));
GloProxyCluster->metrics.p_counter_array[p_cluster_counter::sync_conflict_pgsql_variables_share_epoch]->Increment();
}
} else {
if (v->diff_check && (v->diff_check % (diff_mv_pgsql*10)) == 0) {
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Detected a peer %s:%d with pgsql_variables version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. diff_check is increasing, but version 1 doesn't allow sync. This message will be repeated every %u checks until LOAD PGSQL VARIABLES TO RUNTIME is executed on candidate master.\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch, (diff_mv_pgsql * 10));
proxy_warning("Cluster: detected a peer %s:%d with pgsql_variables version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. diff_check is increasing, but version 1 doesn't allow sync. This message will be repeated every %u checks until LOAD PGSQL VARIABLES TO RUNTIME is executed on candidate master.\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch, (diff_mv_pgsql*10));
GloProxyCluster->metrics.p_counter_array[p_cluster_counter::sync_delayed_pgsql_variables_version_one]->Increment();
}
}
}
/**
* @brief Computes the checksum from a MySQL resultset in the same we already do in 'SQLite3_result::raw_checksum'.
* @details For each received column computing the field length via 'strlen' is required, this is because we
@ -2779,6 +2813,113 @@ __exit_pull_pgsql_users_from_peer:
if (fetch_failed == true) sleep(1);
}
void ProxySQL_Cluster::pull_pgsql_variables_from_peer(const std::string& expected_checksum, const time_t epoch) {
char * hostname = NULL;
char * ip_address = NULL;
uint16_t port = 0;
bool fetch_failed = false;
pthread_mutex_lock(&GloProxyCluster->update_mysql_variables_mutex); // Reuse mysql_variables mutex for pgsql_variables
nodes.get_peer_to_sync_pgsql_variables(&hostname, &port, &ip_address);
if (hostname) {
cluster_creds_t creds {};
MYSQL *conn = mysql_init(NULL);
if (conn==NULL) {
proxy_error("Unable to run mysql_init()\n");
goto __exit_pull_pgsql_variables_from_peer;
}
creds = GloProxyCluster->get_credentials();
if (creds.user.size()) { // do not monitor if the username is empty
// READ/WRITE timeouts were enforced as an attempt to prevent deadlocks in the original
// implementation. They were proven unnecessary, leaving only 'CONNECT_TIMEOUT'.
unsigned int timeout = 1;
mysql_options(conn, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);
{
unsigned char val = 1; mysql_options(conn, MYSQL_OPT_SSL_ENFORCE, &val);
mysql_options(conn, MARIADB_OPT_SSL_KEYLOG_CALLBACK, (void*)proxysql_keylog_write_line_callback);
}
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Fetching PostgreSQL Variables from peer %s:%d started. Expected checksum: %s\n", hostname, port, expected_checksum.c_str());
proxy_info("Cluster: Fetching PostgreSQL Variables from peer %s:%d started. Expected checksum: %s\n", hostname, port, expected_checksum.c_str());
MYSQL* rc_conn = mysql_real_connect(conn, ip_address ? ip_address : hostname, creds.user.c_str(), creds.pass.c_str(), NULL, port, NULL, 0);
if (rc_conn == nullptr) {
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Fetching PostgreSQL Variables from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn));
proxy_info("Cluster: Fetching PostgreSQL Variables from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn));
metrics.p_counter_array[p_cluster_counter::pulled_pgsql_variables_failure]->Increment();
fetch_failed = true;
goto __exit_pull_pgsql_variables_from_peer;
}
MySQL_Monitor::update_dns_cache_from_mysql_conn(conn);
int rc_query = mysql_query(conn, CLUSTER_QUERY_PGSQL_VARIABLES);
if (rc_query == 0) {
MYSQL_RES* pgsql_variables_result = mysql_store_result(conn);
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Fetching PostgreSQL Variables from peer %s:%d completed\n", hostname, port);
proxy_info("Cluster: Fetching PostgreSQL Variables from peer %s:%d completed\n", hostname, port);
unique_ptr<SQLite3_result> pgsql_variables_resultset { nullptr };
const uint64_t variables_raw_checksum = get_global_variables_checksum(pgsql_variables_result, "pgsql", pgsql_variables_resultset);
const string computed_checksum { get_checksum_from_hash(variables_raw_checksum) };
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Computed checksum for PostgreSQL Variables from peer %s:%d : %s\n", hostname, port, computed_checksum.c_str());
proxy_info("Cluster: Computed checksum for PostgreSQL Variables from peer %s:%d : %s\n", hostname, port, computed_checksum.c_str());
if (expected_checksum == computed_checksum) {
update_global_variables(pgsql_variables_result, "pgsql"); // Reuse update_global_variables for pgsql_variables
mysql_free_result(pgsql_variables_result);
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Loading to runtime PostgreSQL Variables from peer %s:%d\n", hostname, port);
proxy_info("Cluster: Loading to runtime PostgreSQL Variables from peer %s:%d\n", hostname, port);
GloAdmin->init_global_variables(std::move(pgsql_variables_resultset), "pgsql", expected_checksum, epoch);
if (GloProxyCluster->cluster_pgsql_variables_save_to_disk == true) {
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Saving to disk PostgreSQL Variables from peer %s:%d\n", hostname, port);
proxy_info("Cluster: Saving to disk PostgreSQL Variables from peer %s:%d\n", hostname, port);
GloAdmin->flush_pgsql_variables__from_memory_to_disk();
} else {
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "NOT saving to disk PostgreSQL Variables from peer %s:%d\n", hostname, port);
proxy_info("Cluster: NOT saving to disk PostgreSQL Variables from peer %s:%d\n", hostname, port);
}
metrics.p_counter_array[p_cluster_counter::pulled_pgsql_variables_success]->Increment();
} else {
if (pgsql_variables_result) {
mysql_free_result(pgsql_variables_result);
}
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Fetching PostgreSQL Variables from peer %s:%d failed: Checksum changed from %s to %s\n",
hostname, port, expected_checksum.c_str(), computed_checksum.c_str());
proxy_info(
"Cluster: Fetching PostgreSQL Variables from peer %s:%d failed: Checksum changed from %s to %s\n",
hostname, port, expected_checksum.c_str(), computed_checksum.c_str()
);
metrics.p_counter_array[p_cluster_counter::pulled_pgsql_variables_failure]->Increment();
fetch_failed = true;
}
} else {
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Fetching PostgreSQL Variables from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn));
proxy_info("Cluster: Fetching PostgreSQL Variables from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn));
metrics.p_counter_array[p_cluster_counter::pulled_pgsql_variables_failure]->Increment();
fetch_failed = true;
}
}
__exit_pull_pgsql_variables_from_peer:
if (conn) {
if (conn->net.pvio) {
mysql_close(conn);
}
}
free(hostname);
if (ip_address)
free(ip_address);
}
pthread_mutex_unlock(&GloProxyCluster->update_mysql_variables_mutex);
if (fetch_failed == true) sleep(1);
}
/**
* @brief Pulls PostgreSQL query rules configuration from a cluster peer node.
*

Loading…
Cancel
Save