Perform atomic update of Cluster fetched module checksum epochs

pull/3921/head
Javier Jaramago Fernández 4 years ago
parent e6083250b5
commit 3dacb77321

@ -538,7 +538,7 @@ class MySQL_HostGroups_Manager {
void wrlock();
void wrunlock();
int servers_add(SQLite3_result *resultset);
bool commit();
bool commit(const std::string& checksum = "", const time_t epoch = 0);
void set_incoming_replication_hostgroups(SQLite3_result *);
void set_incoming_group_replication_hostgroups(SQLite3_result *);

@ -385,9 +385,9 @@ class ProxySQL_Cluster {
void p_update_metrics();
void thread_ending(pthread_t);
void join_term_thread();
void pull_mysql_query_rules_from_peer(const std::string& expected_checksum);
void pull_mysql_servers_from_peer();
void pull_mysql_users_from_peer(const std::string& expected_checksum);
void pull_mysql_query_rules_from_peer(const std::string& expected_checksum, const time_t epoch);
void pull_mysql_servers_from_peer(const std::string& expected_checksum, const time_t epoch);
void pull_mysql_users_from_peer(const std::string& expected_checksum, const time_t epoch);
/**
* @brief Pulls from peer the specified global variables by the type parameter.
* @param type A string specifying the type of global variables to pull from the peer, supported
@ -395,7 +395,7 @@ class ProxySQL_Cluster {
* - 'mysql'.
* - 'admin'.
*/
void pull_global_variables_from_peer(const std::string& type, const std::string& expected_checksum);
void pull_proxysql_servers_from_peer(const char *expected_checksum);
void pull_global_variables_from_peer(const std::string& type, const std::string& expected_checksum, const time_t epoch);
void pull_proxysql_servers_from_peer(const std::string& expected_checksum, const time_t epoch);
};
#endif /* CLASS_PROXYSQL_CLUSTER_H */

@ -235,15 +235,15 @@ class ProxySQL_Admin {
void __add_active_users(enum cred_username_type usertype, char *user=NULL, uint64_t *hash1 = NULL);
void __delete_inactive_users(enum cred_username_type usertype);
void add_admin_users();
void __refresh_users();
void __refresh_users(const std::string& checksum = "", const time_t epoch = 0);
void __add_active_users_ldap();
void flush_mysql_variables___runtime_to_database(SQLite3DB *db, bool replace, bool del, bool onlyifempty, bool runtime=false, bool use_lock=true);
void flush_mysql_variables___database_to_runtime(SQLite3DB *db, bool replace);
void flush_mysql_variables___database_to_runtime(SQLite3DB *db, bool replace, const std::string& checksum = "", const time_t epoch = 0);
char **get_variables_list();
bool set_variable(char *name, char *value);
void flush_admin_variables___database_to_runtime(SQLite3DB *db, bool replace);
void flush_admin_variables___database_to_runtime(SQLite3DB *db, bool replace, const std::string& checksum = "", const time_t epoch = 0);
void flush_admin_variables___runtime_to_database(SQLite3DB *db, bool replace, bool del, bool onlyifempty, bool runtime=false);
void disk_upgrade_mysql_query_rules();
void disk_upgrade_mysql_servers();
@ -273,7 +273,7 @@ class ProxySQL_Admin {
// LDAP
void flush_ldap_variables___runtime_to_database(SQLite3DB *db, bool replace, bool del, bool onlyifempty, bool runtime=false);
void flush_ldap_variables___database_to_runtime(SQLite3DB *db, bool replace);
void flush_ldap_variables___database_to_runtime(SQLite3DB *db, bool replace, const std::string& checksum = "", const time_t epoch = 0);
public:
pthread_mutex_t sql_query_global_mutex;
@ -306,7 +306,7 @@ class ProxySQL_Admin {
bool get_read_only() { return variables.admin_read_only; }
bool set_read_only(bool ro) { variables.admin_read_only=ro; return variables.admin_read_only; }
bool has_variable(const char *name);
void init_users();
void init_users(const std::string& checksum = "", const time_t epoch = 0);
void init_mysql_servers();
void init_mysql_query_rules();
void init_mysql_firewall();
@ -339,9 +339,33 @@ class ProxySQL_Admin {
// void flush_admin_variables__from_disk_to_memory(); // commented in 2.3 because unused
void flush_admin_variables__from_memory_to_disk();
void flush_ldap_variables__from_memory_to_disk();
void load_mysql_servers_to_runtime();
void load_mysql_servers_to_runtime(const std::string& checksum = "", const time_t epoch = 0);
void save_mysql_servers_from_runtime();
char * load_mysql_query_rules_to_runtime(SQLite3_result *SQLite3_query_rules_resultset=NULL, SQLite3_result *SQLite3_query_rules_fast_routing_resultset=NULL);
/**
* @brief Performs the load to runtime of the current configuration in 'main' for 'mysql_query_rules' and
* 'mysql_query_rules_fast_routing' and computes the 'mysql_query_rules' module checksum.
*
* @param SQLite3_query_rules_resultset If this parameter is provided, current rows on
* 'mysql_query_rules' are not queried, instead, the contents of the resultset are used. Must
* be the outcome of query 'CLUSTER_QUERY_MYSQL_QUERY_RULES', it's UNSAFE to supply other
* resultset to the function.
* @param SQLite3_query_rules_fast_routing_resultset If this parameter is provided, current rows on
* 'mysql_query_rules_fast_routing' are not queried, instead, the contents of the resultset are used. Must
* be the outcome of query 'CLUSTER_QUERY_MYSQL_QUERY_RULES_FAST_ROUTING', it's UNSAFE to supply other
* resultset to the function.
* @param checksum When used, this parameter must match several requirements depending on the other
* supplied parameters:
* - If the previous two resultset parameters are supplied to this function, this parameter MUST BE the
* already computed checksum from both resultsets combined.
* - When used in combination with the epoch parameter, if the checksum computed for the values from
* tables 'mysql_query_rules' and 'mysql_query_rules_fast_routing' matches this supplied checksum, the
* epoch of 'GloVars.checksums_values.mysql_query_rules.epoch' is updated to be the supplied epoch.
* @param epoch When 'checksum' parameter is supplied, this is the epoch to which the computed checksum
* is to be updated if it matches 'checksum' parameter.
*
* @return Error message in case of not being able to perform the operation, 'NULL' otherwise.
*/
char* load_mysql_query_rules_to_runtime(SQLite3_result* SQLite3_query_rules_resultset=NULL, SQLite3_result* SQLite3_query_rules_fast_routing_resultset=NULL, const std::string& checksum = "", const time_t epoch = 0);
void save_mysql_query_rules_from_runtime(bool);
void save_mysql_query_rules_fast_routing_from_runtime(bool);
char * load_mysql_firewall_to_runtime();
@ -355,12 +379,12 @@ class ProxySQL_Admin {
void flush_scheduler__from_memory_to_disk();
void flush_scheduler__from_disk_to_memory();
void load_admin_variables_to_runtime() { flush_admin_variables___database_to_runtime(admindb, true); }
void load_admin_variables_to_runtime(const std::string& checksum = "", const time_t epoch = 0) { flush_admin_variables___database_to_runtime(admindb, true, checksum, epoch); }
void save_admin_variables_from_runtime() { flush_admin_variables___runtime_to_database(admindb, true, true, false); }
void load_or_update_global_settings(SQLite3DB *);
void load_mysql_variables_to_runtime() { flush_mysql_variables___database_to_runtime(admindb, true); }
void load_mysql_variables_to_runtime(const std::string& checksum = "", const time_t epoch = 0) { flush_mysql_variables___database_to_runtime(admindb, true, checksum, epoch); }
void save_mysql_variables_from_runtime() { flush_mysql_variables___runtime_to_database(admindb, true, true, false); }
void p_update_metrics();
@ -406,7 +430,7 @@ class ProxySQL_Admin {
void flush_configdb(); // 923
// Cluster
void load_proxysql_servers_to_runtime(bool _lock=true);
void load_proxysql_servers_to_runtime(bool _lock=true, const std::string& checksum = "", const time_t epoch = 0);
void flush_proxysql_servers__from_memory_to_disk();
void flush_proxysql_servers__from_disk_to_memory();
void save_proxysql_servers_runtime_to_database(bool);
@ -414,7 +438,7 @@ class ProxySQL_Admin {
// LDAP
void init_ldap_variables();
void load_ldap_variables_to_runtime() { flush_ldap_variables___database_to_runtime(admindb, true); }
void load_ldap_variables_to_runtime(const std::string& checksum = "", const time_t epoch = 0) { flush_ldap_variables___database_to_runtime(admindb, true, checksum, epoch); }
void save_ldap_variables_from_runtime() { flush_ldap_variables___runtime_to_database(admindb, true, true, false); }
void save_mysql_ldap_mapping_runtime_to_database(bool);

@ -1644,7 +1644,7 @@ SQLite3_result * MySQL_HostGroups_Manager::execute_query(char *query, char **err
return resultset;
}
bool MySQL_HostGroups_Manager::commit() {
bool MySQL_HostGroups_Manager::commit(const std::string& checksum, const time_t epoch) {
unsigned long long curtime1=monotonic_time();
wrlock();
@ -2051,7 +2051,11 @@ bool MySQL_HostGroups_Manager::commit() {
//struct timespec ts;
//clock_gettime(CLOCK_REALTIME, &ts);
time_t t = time(NULL);
GloVars.checksums_values.mysql_servers.epoch = t;
if (epoch != 0 && checksum != "" && GloVars.checksums_values.mysql_servers.checksum == checksum) {
GloVars.checksums_values.mysql_servers.epoch = epoch;
} else {
GloVars.checksums_values.mysql_servers.epoch = t;
}
GloVars.checksums_values.updates_cnt++;
GloVars.generate_global_checksum();
GloVars.epoch_version = t;

@ -6397,7 +6397,7 @@ void ProxySQL_Admin::load_or_update_global_settings(SQLite3DB *db) {
}
}
void ProxySQL_Admin::flush_admin_variables___database_to_runtime(SQLite3DB *db, bool replace) {
void ProxySQL_Admin::flush_admin_variables___database_to_runtime(SQLite3DB *db, bool replace, const string& checksum, const time_t epoch) {
proxy_debug(PROXY_DEBUG_ADMIN, 4, "Flushing ADMIN variables. Replace:%d\n", replace);
char *error=NULL;
int cols=0;
@ -6464,11 +6464,19 @@ void ProxySQL_Admin::flush_admin_variables___database_to_runtime(SQLite3DB *db,
GloVars.checksums_values.admin_variables.set_checksum(buf);
GloVars.checksums_values.admin_variables.version++;
time_t t = time(NULL);
GloVars.checksums_values.admin_variables.epoch = t;
if (epoch != 0 && checksum != "" && GloVars.checksums_values.admin_variables.checksum == checksum) {
GloVars.checksums_values.admin_variables.epoch = epoch;
} else {
GloVars.checksums_values.admin_variables.epoch = t;
}
GloVars.epoch_version = t;
GloVars.generate_global_checksum();
GloVars.checksums_values.updates_cnt++;
pthread_mutex_unlock(&GloVars.checksum_mutex);
proxy_info(
"Computed checksum for 'LOAD ADMIN VARIABLES TO RUNTIME' was '%s', with epoch '%d'\n",
GloVars.checksums_values.admin_variables.checksum, GloVars.checksums_values.admin_variables.epoch
);
delete resultset;
}
wrunlock();
@ -6631,7 +6639,7 @@ void ProxySQL_Admin::flush_admin_variables___database_to_runtime(SQLite3DB *db,
if (resultset) delete resultset;
}
void ProxySQL_Admin::flush_mysql_variables___database_to_runtime(SQLite3DB *db, bool replace) {
void ProxySQL_Admin::flush_mysql_variables___database_to_runtime(SQLite3DB *db, bool replace, const std::string& checksum, const time_t epoch) {
proxy_debug(PROXY_DEBUG_ADMIN, 4, "Flushing MySQL variables. Replace:%d\n", replace);
char *error=NULL;
int cols=0;
@ -6805,13 +6813,21 @@ void ProxySQL_Admin::flush_mysql_variables___database_to_runtime(SQLite3DB *db,
GloVars.checksums_values.mysql_variables.set_checksum(buf);
GloVars.checksums_values.mysql_variables.version++;
time_t t = time(NULL);
GloVars.checksums_values.mysql_variables.epoch = t;
if (epoch != 0 && checksum != "" && GloVars.checksums_values.mysql_variables.checksum == checksum) {
GloVars.checksums_values.mysql_variables.epoch = epoch;
} else {
GloVars.checksums_values.mysql_variables.epoch = t;
}
GloVars.epoch_version = t;
GloVars.generate_global_checksum();
GloVars.checksums_values.updates_cnt++;
pthread_mutex_unlock(&GloVars.checksum_mutex);
delete resultset;
}
proxy_info(
"Computed checksum for 'LOAD MYSQL VARIABLES TO RUNTIME' was '%s', with epoch '%d'\n",
GloVars.checksums_values.mysql_variables.checksum, GloVars.checksums_values.mysql_variables.epoch
);
}
if (resultset) delete resultset;
}
@ -7392,7 +7408,7 @@ void ProxySQL_Admin::flush_mysql_variables___runtime_to_database(SQLite3DB *db,
free(varnames);
}
void ProxySQL_Admin::flush_ldap_variables___database_to_runtime(SQLite3DB *db, bool replace) {
void ProxySQL_Admin::flush_ldap_variables___database_to_runtime(SQLite3DB *db, bool replace, const std::string& checksum, const time_t epoch) {
proxy_debug(PROXY_DEBUG_ADMIN, 4, "Flushing LDAP variables. Replace:%d\n", replace);
if (GloMyLdapAuth == NULL) {
return;
@ -7460,13 +7476,21 @@ void ProxySQL_Admin::flush_ldap_variables___database_to_runtime(SQLite3DB *db, b
GloVars.checksums_values.ldap_variables.set_checksum(buf);
GloVars.checksums_values.ldap_variables.version++;
time_t t = time(NULL);
GloVars.checksums_values.ldap_variables.epoch = t;
if (epoch != 0 && checksum != "" && GloVars.checksums_values.ldap_variables.checksum == checksum) {
GloVars.checksums_values.ldap_variables.epoch = epoch;
} else {
GloVars.checksums_values.ldap_variables.epoch = t;
}
GloVars.epoch_version = t;
GloVars.generate_global_checksum();
GloVars.checksums_values.updates_cnt++;
pthread_mutex_unlock(&GloVars.checksum_mutex);
delete resultset;
}
proxy_info(
"Computed checksum for 'LOAD LDAP VARIABLES TO RUNTIME' was '%s', with epoch '%d'\n",
GloVars.checksums_values.ldap_variables.checksum, GloVars.checksums_values.ldap_variables.epoch
);
}
if (resultset) delete resultset;
}
@ -10457,9 +10481,9 @@ void ProxySQL_Admin::__attach_db(SQLite3DB *db1, SQLite3DB *db2, char *alias) {
}
void ProxySQL_Admin::init_users() {
void ProxySQL_Admin::init_users(const std::string& checksum, const time_t epoch) {
pthread_mutex_lock(&users_mutex);
__refresh_users();
__refresh_users(checksum, epoch);
pthread_mutex_unlock(&users_mutex);
}
@ -10499,7 +10523,7 @@ void ProxySQL_Admin::add_admin_users() {
#endif /* DEBUG */
}
void ProxySQL_Admin::__refresh_users() {
void ProxySQL_Admin::__refresh_users(const std::string& checksum, const time_t epoch) {
bool calculate_checksum = false;
if (checksum_variables.checksum_mysql_users) {
calculate_checksum = true;
@ -10540,12 +10564,20 @@ void ProxySQL_Admin::__refresh_users() {
GloVars.checksums_values.mysql_users.set_checksum(buf);
GloVars.checksums_values.mysql_users.version++;
time_t t = time(NULL);
GloVars.checksums_values.mysql_users.epoch = t;
if (epoch != 0 && checksum != "" && GloVars.checksums_values.mysql_users.checksum == checksum) {
GloVars.checksums_values.mysql_users.epoch = epoch;
} else {
GloVars.checksums_values.mysql_users.epoch = t;
}
GloVars.epoch_version = t;
GloVars.generate_global_checksum();
GloVars.checksums_values.updates_cnt++;
pthread_mutex_unlock(&GloVars.checksum_mutex);
}
proxy_info(
"Computed checksum for 'LOAD MYSQL USERS TO RUNTIME' was '%s', with epoch '%d'\n",
GloVars.checksums_values.mysql_users.checksum, GloVars.checksums_values.mysql_users.epoch
);
}
#ifdef PROXYSQLCLICKHOUSE
@ -11684,7 +11716,7 @@ void ProxySQL_Admin::load_scheduler_to_runtime() {
resultset=NULL;
}
void ProxySQL_Admin::load_mysql_servers_to_runtime() {
void ProxySQL_Admin::load_mysql_servers_to_runtime(const std::string& checksum, const time_t epoch) {
// make sure that the caller has called mysql_servers_wrlock()
char *error=NULL;
int cols=0;
@ -11819,7 +11851,7 @@ void ProxySQL_Admin::load_mysql_servers_to_runtime() {
MyHGM->set_incoming_aws_aurora_hostgroups(resultset_aws_aurora);
}
// commit all the changes
MyHGM->commit();
MyHGM->commit(checksum, epoch);
GloAdmin->save_mysql_servers_runtime_to_database(true);
// clean up
@ -11899,7 +11931,7 @@ char * ProxySQL_Admin::load_mysql_firewall_to_runtime() {
return NULL;
}
char * ProxySQL_Admin::load_mysql_query_rules_to_runtime(SQLite3_result *SQLite3_query_rules_resultset, SQLite3_result *SQLite3_query_rules_fast_routing_resultset) {
char* ProxySQL_Admin::load_mysql_query_rules_to_runtime(SQLite3_result* SQLite3_query_rules_resultset, SQLite3_result* SQLite3_query_rules_fast_routing_resultset, const std::string& checksum, const time_t epoch) {
// About the queries used here, see notes about CLUSTER_QUERY_MYSQL_QUERY_RULES and
// CLUSTER_QUERY_MYSQL_QUERY_RULES_FAST_ROUTING in ProxySQL_Cluster.hpp
char *error=NULL;
@ -11938,21 +11970,52 @@ char * ProxySQL_Admin::load_mysql_query_rules_to_runtime(SQLite3_result *SQLite3
#endif // BENCHMARK_FASTROUTING_LOAD
if (checksum_variables.checksum_mysql_query_rules) {
pthread_mutex_lock(&GloVars.checksum_mutex);
uint64_t hash1 = resultset->raw_checksum();
uint64_t hash2 = resultset2->raw_checksum();
hash1 += hash2;
uint32_t d32[2];
char* buff = nullptr;
char buf[20];
memcpy(&d32, &hash1, sizeof(hash1));
sprintf(buf,"0x%0X%0X", d32[0], d32[1]);
GloVars.checksums_values.mysql_query_rules.set_checksum(buf);
// If both the resultsets are supplied, then the supplied checksum is the already computed one.
if (
SQLite3_query_rules_resultset == nullptr ||
SQLite3_query_rules_fast_routing_resultset == nullptr
) {
uint64_t hash1 = resultset->raw_checksum();
uint64_t hash2 = resultset2->raw_checksum();
hash1 += hash2;
uint32_t d32[2];
memcpy(&d32, &hash1, sizeof(hash1));
sprintf(buf,"0x%0X%0X", d32[0], d32[1]);
buff = buf;
} else {
buff = const_cast<char*>(checksum.c_str());
}
GloVars.checksums_values.mysql_query_rules.set_checksum(buff);
GloVars.checksums_values.mysql_query_rules.version++;
time_t t = time(NULL);
GloVars.checksums_values.mysql_query_rules.epoch = t;
// Since the supplied checksum is the already computed one and both resultset are
// supplied there is no need for comparsion, because we will be comparing it with itself.
bool same_checksum =
SQLite3_query_rules_resultset != nullptr &&
SQLite3_query_rules_fast_routing_resultset != nullptr;
bool matching_checksums =
same_checksum || (GloVars.checksums_values.mysql_query_rules.checksum == checksum);
if (epoch != 0 && checksum != "" && matching_checksums) {
GloVars.checksums_values.mysql_query_rules.epoch = epoch;
} else {
GloVars.checksums_values.mysql_query_rules.epoch = t;
}
GloVars.epoch_version = t;
GloVars.generate_global_checksum();
GloVars.checksums_values.updates_cnt++;
pthread_mutex_unlock(&GloVars.checksum_mutex);
proxy_info(
"Computed checksum for 'LOAD MYSQL QUERY RULES TO RUNTIME' was '%s', with epoch '%d'\n",
GloVars.checksums_values.mysql_query_rules.checksum, GloVars.checksums_values.mysql_query_rules.epoch
);
}
GloQPro->reset_all(false);
QP_rule_t * nqpr;
@ -12840,7 +12903,7 @@ unsigned long long ProxySQL_External_Scheduler::run_once() {
return next_run;
}
void ProxySQL_Admin::load_proxysql_servers_to_runtime(bool _lock) {
void ProxySQL_Admin::load_proxysql_servers_to_runtime(bool _lock, const std::string& checksum, const time_t epoch) {
// make sure that the caller has called mysql_servers_wrlock()
char *error=NULL;
int cols=0;
@ -12863,11 +12926,19 @@ void ProxySQL_Admin::load_proxysql_servers_to_runtime(bool _lock) {
GloVars.checksums_values.proxysql_servers.set_checksum(buf);
GloVars.checksums_values.proxysql_servers.version++;
time_t t = time(NULL);
GloVars.checksums_values.proxysql_servers.epoch = t;
if (epoch != 0 && checksum != "" && GloVars.checksums_values.proxysql_servers.checksum == checksum) {
GloVars.checksums_values.proxysql_servers.epoch = epoch;
} else {
GloVars.checksums_values.proxysql_servers.epoch = t;
}
GloVars.epoch_version = t;
GloVars.generate_global_checksum();
GloVars.checksums_values.updates_cnt++;
pthread_mutex_unlock(&GloVars.checksum_mutex);
proxy_info(
"Computed checksum for 'LOAD PROXYSQL SERVERS TO RUNTIME' was '%s', with epoch '%d'\n",
GloVars.checksums_values.proxysql_servers.checksum, GloVars.checksums_values.proxysql_servers.epoch
);
// }
}
if (resultset) delete resultset;

@ -611,11 +611,7 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
) {
if (v->diff_check >= diff_mqr) {
proxy_info("Cluster: detected a peer %s:%d with mysql_query_rules version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch);
GloProxyCluster->pull_mysql_query_rules_from_peer(v_exp_checksum);
if (strncmp(v->checksum, GloVars.checksums_values.mysql_query_rules.checksum, 20)==0) {
// we copied from the remote server, let's also copy the same epoch
GloVars.checksums_values.mysql_query_rules.epoch = v->epoch;
}
GloProxyCluster->pull_mysql_query_rules_from_peer(v_exp_checksum, v->epoch);
}
}
if ((v->epoch == own_epoch) && v->diff_check && ((v->diff_check % (diff_mqr*10)) == 0)) {
@ -634,6 +630,8 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
unsigned long long own_version = __sync_fetch_and_add(&GloVars.checksums_values.mysql_servers.version,0);
unsigned long long own_epoch = __sync_fetch_and_add(&GloVars.checksums_values.mysql_servers.epoch,0);
char* own_checksum = __sync_fetch_and_add(&GloVars.checksums_values.mysql_servers.checksum,0);
const std::string v_exp_checksum { v->checksum };
if (v->version > 1) {
if (
(own_version == 1) // we just booted
@ -642,11 +640,7 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
) {
if (v->diff_check >= diff_ms) {
proxy_info("Cluster: detected a peer %s:%d with mysql_servers version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch);
GloProxyCluster->pull_mysql_servers_from_peer();
if (strncmp(v->checksum, GloVars.checksums_values.mysql_servers.checksum, 20)==0) {
// we copied from the remote server, let's also copy the same epoch
GloVars.checksums_values.mysql_servers.epoch = v->epoch;
}
GloProxyCluster->pull_mysql_servers_from_peer(v_exp_checksum, v->epoch);
}
}
if ((v->epoch == own_epoch) && v->diff_check && ((v->diff_check % (diff_ms*10)) == 0)) {
@ -675,11 +669,7 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
) {
if (v->diff_check >= diff_mu) {
proxy_info("Cluster: detected a peer %s:%d with mysql_users version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch);
GloProxyCluster->pull_mysql_users_from_peer(v_exp_checksum);
if (strncmp(v->checksum, GloVars.checksums_values.mysql_users.checksum, 20)==0) {
// we copied from the remote server, let's also copy the same epoch
GloVars.checksums_values.mysql_users.epoch = v->epoch;
}
GloProxyCluster->pull_mysql_users_from_peer(v_exp_checksum, v->epoch);
}
}
if ((v->epoch == own_epoch) && v->diff_check && ((v->diff_check % (diff_mu*10)) == 0)) {
@ -708,11 +698,7 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
) {
if (v->diff_check >= diff_mv) {
proxy_info("Cluster: detected a peer %s:%d with mysql_variables version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch);
GloProxyCluster->pull_global_variables_from_peer("mysql", expected_checksum);
if (strncmp(v->checksum, GloVars.checksums_values.mysql_variables.checksum, 20)==0) {
// we copied from the remote server, let's also copy the same epoch
GloVars.checksums_values.mysql_variables.epoch = v->epoch;
}
GloProxyCluster->pull_global_variables_from_peer("mysql", expected_checksum, v->epoch);
}
}
if ((v->epoch == own_epoch) && v->diff_check && ((v->diff_check % (diff_mv*10)) == 0)) {
@ -741,11 +727,7 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
) {
if (v->diff_check >= diff_av) {
proxy_info("Cluster: detected a peer %s:%d with admin_variables version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch);
GloProxyCluster->pull_global_variables_from_peer("admin", expected_checksum);
if (strncmp(v->checksum, GloVars.checksums_values.admin_variables.checksum, 20)==0) {
// we copied from the remote server, let's also copy the same epoch
GloVars.checksums_values.admin_variables.epoch = v->epoch;
}
GloProxyCluster->pull_global_variables_from_peer("admin", expected_checksum, v->epoch);
}
}
if ((v->epoch == own_epoch) && v->diff_check && ((v->diff_check % (diff_av*10)) == 0)) {
@ -774,11 +756,7 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
) {
if (v->diff_check >= diff_lv) {
proxy_info("Cluster: detected a peer %s:%d with ldap_variables version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch);
GloProxyCluster->pull_global_variables_from_peer("ldap", expected_checksum);
if (strncmp(v->checksum, GloVars.checksums_values.ldap_variables.checksum, 20)==0) {
// we copied from the remote server, let's also copy the same epoch
GloVars.checksums_values.ldap_variables.epoch = v->epoch;
}
GloProxyCluster->pull_global_variables_from_peer("ldap", expected_checksum, v->epoch);
}
}
if ((v->epoch == own_epoch) && v->diff_check && ((v->diff_check % (diff_lv*10)) == 0)) {
@ -804,7 +782,7 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
unsigned long long v_epoch = v->epoch;
unsigned long long v_version = v->version;
unsigned int v_diff_check = v->diff_check;
char* v_exp_checksum = strdup(v->checksum);
const string v_exp_checksum { v->checksum };
if (
(own_version == 1) // we just booted
@ -813,20 +791,13 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
) {
if (v->diff_check >= diff_ps) {
proxy_info("Cluster: detected a peer %s:%d with proxysql_servers version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch);
// thus we need to copy it now
GloProxyCluster->pull_proxysql_servers_from_peer((const char *)v_exp_checksum);
if (strncmp(v_exp_checksum, GloVars.checksums_values.proxysql_servers.checksum, 20)==0) {
// we copied from the remote server, let's also copy the same epoch
GloVars.checksums_values.proxysql_servers.epoch = v_epoch;
}
GloProxyCluster->pull_proxysql_servers_from_peer(v_exp_checksum, v->epoch);
}
}
if ((v_epoch == own_epoch) && v_diff_check && ((v_diff_check % (diff_ps*10)) == 0)) {
proxy_error("Cluster: detected a peer %s:%d with proxysql_servers version %llu, epoch %llu, diff_check %u, checksum %s. Own version: %llu, epoch: %llu, checksum %s. Sync conflict, epoch times are EQUAL, can't determine which server holds the latest config, we won't sync. This message will be repeated every %llu checks until LOAD MYSQL SERVERS TO RUNTIME is executed on candidate master.\n", hostname, port, v_version, v_epoch, v_diff_check, v_exp_checksum, own_version, own_epoch, own_checksum, (diff_ps*10));
proxy_error("Cluster: detected a peer %s:%d with proxysql_servers version %llu, epoch %llu, diff_check %u, checksum %s. Own version: %llu, epoch: %llu, checksum %s. Sync conflict, epoch times are EQUAL, can't determine which server holds the latest config, we won't sync. This message will be repeated every %llu checks until LOAD MYSQL SERVERS TO RUNTIME is executed on candidate master.\n", hostname, port, v_version, v_epoch, v_diff_check, v->checksum, own_version, own_epoch, own_checksum, (diff_ps*10));
GloProxyCluster->metrics.p_counter_array[p_cluster_counter::sync_conflict_proxysql_servers_share_epoch]->Increment();
}
free(v_exp_checksum);
} else {
if (v->diff_check && (v->diff_check % (diff_ps*10)) == 0) {
proxy_warning("Cluster: detected a peer %s:%d with proxysql_servers version %llu, epoch %llu, diff_check %u. Own version: %llu, epoch: %llu. diff_check is increasing, but version 1 doesn't allow sync. This message will be repeated every %llu checks until LOAD PROXYSQL SERVERS TO RUNTIME is executed on candidate master.\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch, (diff_ps*10));
@ -884,7 +855,7 @@ uint64_t mysql_raw_checksum(MYSQL_RES* resultset) {
return res_hash;
}
void ProxySQL_Cluster::pull_mysql_query_rules_from_peer(const string& expected_checksum) {
void ProxySQL_Cluster::pull_mysql_query_rules_from_peer(const string& expected_checksum, const time_t epoch) {
char * hostname = NULL;
uint16_t port = 0;
pthread_mutex_lock(&GloProxyCluster->update_mysql_query_rules_mutex);
@ -1032,7 +1003,9 @@ void ProxySQL_Cluster::pull_mysql_query_rules_from_peer(const string& expected_c
GloAdmin->admindb->execute("COMMIT");
// We release the ownership of the memory for 'SQLite3' resultsets here since now it's no longer
// our responsability to free the memory, they should be directly passed to the 'Query Processor'
GloAdmin->load_mysql_query_rules_to_runtime(SQLite3_query_rules_resultset.release(), SQLite3_query_rules_fast_routing_resultset.release());
GloAdmin->load_mysql_query_rules_to_runtime(
SQLite3_query_rules_resultset.release(), SQLite3_query_rules_fast_routing_resultset.release(), expected_checksum, epoch
);
if (GloProxyCluster->cluster_mysql_query_rules_save_to_disk == true) {
proxy_info("Cluster: Saving to disk MySQL Query Rules from peer %s:%d\n", hostname, port);
GloAdmin->flush_mysql_query_rules__from_memory_to_disk();
@ -1196,7 +1169,7 @@ void update_ldap_mappings(MYSQL_RES* result) {
}
}
void ProxySQL_Cluster::pull_mysql_users_from_peer(const string& expected_checksum) {
void ProxySQL_Cluster::pull_mysql_users_from_peer(const string& expected_checksum, const time_t epoch) {
char * hostname = NULL;
uint16_t port = 0;
pthread_mutex_lock(&GloProxyCluster->update_mysql_users_mutex);
@ -1278,7 +1251,7 @@ void ProxySQL_Cluster::pull_mysql_users_from_peer(const string& expected_checksu
proxy_info("Cluster: Loading to runtime LDAP Mappings from peer %s:%d\n", hostname, port);
}
GloAdmin->init_users();
GloAdmin->init_users(expected_checksum, epoch);
if (GloProxyCluster->cluster_mysql_users_save_to_disk == true) {
proxy_info("Cluster: Saving to disk MySQL Users from peer %s:%d\n", hostname, port);
if (GloMyLdapAuth) {
@ -1491,7 +1464,7 @@ uint64_t compute_servers_tables_raw_checksum(const vector<MYSQL_RES*>& results)
return servers_hash;
}
void ProxySQL_Cluster::pull_mysql_servers_from_peer() {
void ProxySQL_Cluster::pull_mysql_servers_from_peer(const std::string& checksum, const time_t epoch) {
char * hostname = NULL;
uint16_t port = 0;
char * peer_checksum = NULL;
@ -1783,7 +1756,7 @@ void ProxySQL_Cluster::pull_mysql_servers_from_peer() {
delete resultset;
proxy_info("Cluster: Loading to runtime MySQL Servers from peer %s:%d\n", hostname, port);
GloAdmin->load_mysql_servers_to_runtime();
GloAdmin->load_mysql_servers_to_runtime(checksum, epoch);
if (GloProxyCluster->cluster_mysql_servers_save_to_disk == true) {
proxy_info("Cluster: Saving to disk MySQL Servers from peer %s:%d\n", hostname, port);
GloAdmin->flush_mysql_servers__from_memory_to_disk();
@ -1822,7 +1795,7 @@ __exit_pull_mysql_servers_from_peer:
pthread_mutex_unlock(&GloProxyCluster->update_mysql_servers_mutex);
}
void ProxySQL_Cluster::pull_global_variables_from_peer(const string& var_type, const string& expected_checksum) {
void ProxySQL_Cluster::pull_global_variables_from_peer(const string& var_type, const string& expected_checksum, const time_t epoch) {
char * hostname = NULL;
uint16_t port = 0;
char* vars_type_str = nullptr;
@ -1941,21 +1914,21 @@ void ProxySQL_Cluster::pull_global_variables_from_peer(const string& var_type, c
proxy_info("Cluster: Loading to runtime %s Variables from peer %s:%d\n", vars_type_str, hostname, port);
if (var_type == "mysql") {
GloAdmin->load_mysql_variables_to_runtime();
GloAdmin->load_mysql_variables_to_runtime(expected_checksum, epoch);
if (GloProxyCluster->cluster_mysql_variables_save_to_disk == true) {
proxy_info("Cluster: Saving to disk MySQL Variables from peer %s:%d\n", hostname, port);
GloAdmin->flush_mysql_variables__from_memory_to_disk();
}
} else if (var_type == "admin") {
GloAdmin->load_admin_variables_to_runtime();
GloAdmin->load_admin_variables_to_runtime(expected_checksum, epoch);
if (GloProxyCluster->cluster_admin_variables_save_to_disk == true) {
proxy_info("Cluster: Saving to disk Admin Variables from peer %s:%d\n", hostname, port);
GloAdmin->flush_admin_variables__from_memory_to_disk();
}
} else if (var_type == "ldap") {
GloAdmin->load_ldap_variables_to_runtime();
GloAdmin->load_ldap_variables_to_runtime(expected_checksum, epoch);
if (GloProxyCluster->cluster_ldap_variables_save_to_disk == true) {
proxy_info("Cluster: Saving to disk LDAP Variables from peer %s:%d\n", hostname, port);
@ -1994,7 +1967,7 @@ __exit_pull_mysql_variables_from_peer:
pthread_mutex_unlock(&GloProxyCluster->update_mysql_variables_mutex);
}
void ProxySQL_Cluster::pull_proxysql_servers_from_peer(const char *expected_checksum) {
void ProxySQL_Cluster::pull_proxysql_servers_from_peer(const std::string& expected_checksum, const time_t epoch) {
char * hostname = NULL;
uint16_t port = 0;
pthread_mutex_lock(&GloProxyCluster->update_proxysql_servers_mutex);
@ -2018,7 +1991,10 @@ void ProxySQL_Cluster::pull_proxysql_servers_from_peer(const char *expected_chec
//mysql_options(conn, MYSQL_OPT_READ_TIMEOUT, &timeout_long);
//mysql_options(conn, MYSQL_OPT_WRITE_TIMEOUT, &timeout);
{ unsigned char val = 1; mysql_options(conn, MYSQL_OPT_SSL_ENFORCE, &val); }
proxy_info("Cluster: Fetching ProxySQL Servers from peer %s:%d started. Expected checksum: %s\n", hostname, port, expected_checksum);
proxy_info(
"Cluster: Fetching ProxySQL Servers from peer %s:%d started. Expected checksum: %s\n",
hostname, port, expected_checksum.c_str()
);
rc_conn = mysql_real_connect(conn, hostname, username, password, NULL, port, NULL, 0);
if (rc_conn) {
rc_query = mysql_query(conn,"SELECT hostname, port, weight, comment FROM runtime_proxysql_servers ORDER BY hostname, port");
@ -2058,7 +2034,7 @@ void ProxySQL_Cluster::pull_proxysql_servers_from_peer(const char *expected_chec
delete resultset;
proxy_info("Cluster: Loading to runtime ProxySQL Servers from peer %s:%d\n", hostname, port);
GloAdmin->load_proxysql_servers_to_runtime(false);
GloAdmin->load_proxysql_servers_to_runtime(false, expected_checksum, epoch);
if (GloProxyCluster->cluster_proxysql_servers_save_to_disk == true) {
proxy_info("Cluster: Saving to disk ProxySQL Servers from peer %s:%d\n", hostname, port);
GloAdmin->flush_proxysql_servers__from_memory_to_disk();
@ -2069,7 +2045,7 @@ void ProxySQL_Cluster::pull_proxysql_servers_from_peer(const char *expected_chec
} else {
proxy_info(
"Cluster: Fetching ProxySQL Servers from peer %s:%d failed: Checksum changed from %s to %s\n",
hostname, port, expected_checksum, computed_cks.c_str()
hostname, port, expected_checksum.c_str(), computed_cks.c_str()
);
metrics.p_counter_array[p_cluster_counter::pulled_proxysql_servers_failure]->Increment();
}

Loading…
Cancel
Save