diff --git a/include/ProxySQL_Cluster.hpp b/include/ProxySQL_Cluster.hpp index 122f797db..c05d3ddf7 100644 --- a/include/ProxySQL_Cluster.hpp +++ b/include/ProxySQL_Cluster.hpp @@ -205,6 +205,8 @@ */ #define CLUSTER_QUERY_PGSQL_QUERY_RULES_FAST_ROUTING "PROXY_SELECT username, database, flagIN, destination_hostgroup, comment FROM runtime_pgsql_query_rules_fast_routing ORDER BY username, database, flagIN" +#define CLUSTER_QUERY_PGSQL_VARIABLES "PROXY_SELECT variable_name, variable_value FROM runtime_pgsql_variables ORDER BY variable_name" + class ProxySQL_Checksum_Value_2: public ProxySQL_Checksum_Value { public: time_t last_updated; diff --git a/lib/PgSQL_Variables_Validator.cpp b/lib/PgSQL_Variables_Validator.cpp index a0a375279..9ef3780cc 100644 --- a/lib/PgSQL_Variables_Validator.cpp +++ b/lib/PgSQL_Variables_Validator.cpp @@ -638,4 +638,33 @@ const pgsql_variable_validator pgsql_variable_validator_search_path = { .type = VARIABLE_TYPE_STRING, .validate = &pgsql_variable_validate_search_path, .params = {} +}; + +/** + * @brief Validates an integer variable for PostgreSQL. + * + * This function checks if the provided value is a valid integer representation + * and falls within the specified range. The range is defined by the params + * parameter. + * + * @param value The value to validate. + * @param params The parameter structure containing the integer range. + * @param session Unused parameter. + * @param transformed_value If not null, will be set to null. + * @return true if the value is a valid integer representation within the specified range, false otherwise. + */ +bool pgsql_variable_validate_integer(const char* value, const params_t* params, PgSQL_Session* session, char** transformed_value) { + (void)session; + if (transformed_value) *transformed_value = nullptr; + char* end = nullptr; + long num = strtol(value, &end, 10); + if (end == value || *end != '\0') return false; + if (num < params->int_range.min || num > params->int_range.max) return false; + return true; +} + +const pgsql_variable_validator pgsql_variable_validator_integer = { + .type = VARIABLE_TYPE_INT, + .validate = &pgsql_variable_validate_integer, + .params = {} }; \ No newline at end of file diff --git a/lib/ProxySQL_Cluster.cpp b/lib/ProxySQL_Cluster.cpp index 2330afb84..a38582939 100644 --- a/lib/ProxySQL_Cluster.cpp +++ b/lib/ProxySQL_Cluster.cpp @@ -1006,6 +1006,40 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) { } } + // PostgreSQL Variables Sync + if (diff_mv_pgsql) { + v = &checksums_values.pgsql_variables; + unsigned long long own_version = __sync_fetch_and_add(&GloVars.checksums_values.pgsql_variables.version,0); + unsigned long long own_epoch = __sync_fetch_and_add(&GloVars.checksums_values.pgsql_variables.epoch,0); + char* own_checksum = __sync_fetch_and_add(&GloVars.checksums_values.pgsql_variables.checksum,0); + const std::string v_exp_checksum { v->checksum }; + + if (v->version > 1) { + if ( + (own_version == 1) // we just booted + || + (v->epoch > own_epoch) // epoch is newer + ) { + if (v->diff_check >= diff_mv_pgsql) { + proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Detected peer %s:%d with pgsql_variables version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch); + proxy_info("Cluster: detected a peer %s:%d with pgsql_variables version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch); + GloProxyCluster->pull_pgsql_variables_from_peer(v_exp_checksum, v->epoch); + } + } + if ((v->epoch == own_epoch) && v->diff_check && ((v->diff_check % (diff_mv_pgsql*10)) == 0)) { + proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Detected peer %s:%d with pgsql_variables version %llu, epoch %llu, diff_check %u, checksum %s. Own version: %llu, epoch: %llu, checksum %s. Sync conflict, epoch times are EQUAL, can't determine which server holds the latest config, we won't sync. This message will be repeated every %u checks until LOAD PGSQL VARIABLES TO RUNTIME is executed on candidate master.\n", hostname, port, v->version, v->epoch, v->diff_check, v->checksum, own_version, own_epoch, own_checksum, (diff_mv_pgsql * 10)); + proxy_error("Cluster: detected a peer %s:%d with pgsql_variables version %llu, epoch %llu, diff_check %u, checksum %s. Own version: %llu, epoch: %llu, checksum %s. Sync conflict, epoch times are EQUAL, can't determine which server holds the latest config, we won't sync. This message will be repeated every %u checks until LOAD PGSQL VARIABLES TO RUNTIME is executed on candidate master.\n", hostname, port, v->version, v->epoch, v->diff_check, v->checksum, own_version, own_epoch, own_checksum, (diff_mv_pgsql*10)); + GloProxyCluster->metrics.p_counter_array[p_cluster_counter::sync_conflict_pgsql_variables_share_epoch]->Increment(); + } + } else { + if (v->diff_check && (v->diff_check % (diff_mv_pgsql*10)) == 0) { + proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Detected a peer %s:%d with pgsql_variables version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. diff_check is increasing, but version 1 doesn't allow sync. This message will be repeated every %u checks until LOAD PGSQL VARIABLES TO RUNTIME is executed on candidate master.\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch, (diff_mv_pgsql * 10)); + proxy_warning("Cluster: detected a peer %s:%d with pgsql_variables version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. diff_check is increasing, but version 1 doesn't allow sync. This message will be repeated every %u checks until LOAD PGSQL VARIABLES TO RUNTIME is executed on candidate master.\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch, (diff_mv_pgsql*10)); + GloProxyCluster->metrics.p_counter_array[p_cluster_counter::sync_delayed_pgsql_variables_version_one]->Increment(); + } + } + } + /** * @brief Computes the checksum from a MySQL resultset in the same we already do in 'SQLite3_result::raw_checksum'. * @details For each received column computing the field length via 'strlen' is required, this is because we @@ -2779,6 +2813,113 @@ __exit_pull_pgsql_users_from_peer: if (fetch_failed == true) sleep(1); } +void ProxySQL_Cluster::pull_pgsql_variables_from_peer(const std::string& expected_checksum, const time_t epoch) { + char * hostname = NULL; + char * ip_address = NULL; + uint16_t port = 0; + bool fetch_failed = false; + pthread_mutex_lock(&GloProxyCluster->update_mysql_variables_mutex); // Reuse mysql_variables mutex for pgsql_variables + nodes.get_peer_to_sync_pgsql_variables(&hostname, &port, &ip_address); + if (hostname) { + cluster_creds_t creds {}; + + MYSQL *conn = mysql_init(NULL); + if (conn==NULL) { + proxy_error("Unable to run mysql_init()\n"); + goto __exit_pull_pgsql_variables_from_peer; + } + + creds = GloProxyCluster->get_credentials(); + if (creds.user.size()) { // do not monitor if the username is empty + // READ/WRITE timeouts were enforced as an attempt to prevent deadlocks in the original + // implementation. They were proven unnecessary, leaving only 'CONNECT_TIMEOUT'. + unsigned int timeout = 1; + mysql_options(conn, MYSQL_OPT_CONNECT_TIMEOUT, &timeout); + { + unsigned char val = 1; mysql_options(conn, MYSQL_OPT_SSL_ENFORCE, &val); + mysql_options(conn, MARIADB_OPT_SSL_KEYLOG_CALLBACK, (void*)proxysql_keylog_write_line_callback); + } + proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Fetching PostgreSQL Variables from peer %s:%d started. Expected checksum: %s\n", hostname, port, expected_checksum.c_str()); + proxy_info("Cluster: Fetching PostgreSQL Variables from peer %s:%d started. Expected checksum: %s\n", hostname, port, expected_checksum.c_str()); + + MYSQL* rc_conn = mysql_real_connect(conn, ip_address ? ip_address : hostname, creds.user.c_str(), creds.pass.c_str(), NULL, port, NULL, 0); + if (rc_conn == nullptr) { + proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Fetching PostgreSQL Variables from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn)); + proxy_info("Cluster: Fetching PostgreSQL Variables from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn)); + metrics.p_counter_array[p_cluster_counter::pulled_pgsql_variables_failure]->Increment(); + fetch_failed = true; + goto __exit_pull_pgsql_variables_from_peer; + } + + MySQL_Monitor::update_dns_cache_from_mysql_conn(conn); + + int rc_query = mysql_query(conn, CLUSTER_QUERY_PGSQL_VARIABLES); + if (rc_query == 0) { + MYSQL_RES* pgsql_variables_result = mysql_store_result(conn); + + proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Fetching PostgreSQL Variables from peer %s:%d completed\n", hostname, port); + proxy_info("Cluster: Fetching PostgreSQL Variables from peer %s:%d completed\n", hostname, port); + + unique_ptr pgsql_variables_resultset { nullptr }; + const uint64_t variables_raw_checksum = get_global_variables_checksum(pgsql_variables_result, "pgsql", pgsql_variables_resultset); + const string computed_checksum { get_checksum_from_hash(variables_raw_checksum) }; + proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Computed checksum for PostgreSQL Variables from peer %s:%d : %s\n", hostname, port, computed_checksum.c_str()); + proxy_info("Cluster: Computed checksum for PostgreSQL Variables from peer %s:%d : %s\n", hostname, port, computed_checksum.c_str()); + + if (expected_checksum == computed_checksum) { + update_global_variables(pgsql_variables_result, "pgsql"); // Reuse update_global_variables for pgsql_variables + mysql_free_result(pgsql_variables_result); + + proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Loading to runtime PostgreSQL Variables from peer %s:%d\n", hostname, port); + proxy_info("Cluster: Loading to runtime PostgreSQL Variables from peer %s:%d\n", hostname, port); + + GloAdmin->init_global_variables(std::move(pgsql_variables_resultset), "pgsql", expected_checksum, epoch); + if (GloProxyCluster->cluster_pgsql_variables_save_to_disk == true) { + proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Saving to disk PostgreSQL Variables from peer %s:%d\n", hostname, port); + proxy_info("Cluster: Saving to disk PostgreSQL Variables from peer %s:%d\n", hostname, port); + GloAdmin->flush_pgsql_variables__from_memory_to_disk(); + } else { + proxy_debug(PROXY_DEBUG_CLUSTER, 5, "NOT saving to disk PostgreSQL Variables from peer %s:%d\n", hostname, port); + proxy_info("Cluster: NOT saving to disk PostgreSQL Variables from peer %s:%d\n", hostname, port); + } + + metrics.p_counter_array[p_cluster_counter::pulled_pgsql_variables_success]->Increment(); + } else { + if (pgsql_variables_result) { + mysql_free_result(pgsql_variables_result); + } + + proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Fetching PostgreSQL Variables from peer %s:%d failed: Checksum changed from %s to %s\n", + hostname, port, expected_checksum.c_str(), computed_checksum.c_str()); + proxy_info( + "Cluster: Fetching PostgreSQL Variables from peer %s:%d failed: Checksum changed from %s to %s\n", + hostname, port, expected_checksum.c_str(), computed_checksum.c_str() + ); + metrics.p_counter_array[p_cluster_counter::pulled_pgsql_variables_failure]->Increment(); + fetch_failed = true; + } + } else { + proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Fetching PostgreSQL Variables from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn)); + proxy_info("Cluster: Fetching PostgreSQL Variables from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn)); + metrics.p_counter_array[p_cluster_counter::pulled_pgsql_variables_failure]->Increment(); + fetch_failed = true; + } + } +__exit_pull_pgsql_variables_from_peer: + if (conn) { + if (conn->net.pvio) { + mysql_close(conn); + } + } + free(hostname); + + if (ip_address) + free(ip_address); + } + pthread_mutex_unlock(&GloProxyCluster->update_mysql_variables_mutex); + if (fetch_failed == true) sleep(1); +} + /** * @brief Pulls PostgreSQL query rules configuration from a cluster peer node. *