From e3c4cb9964aea2b40f8d98374c29037c234cb695 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Fri, 1 Sep 2017 21:11:48 +0200 Subject: [PATCH] A lot of changes all related to Cluster In `MySQL_Cluster` class added functions to sync with remote node, like: - `pull_mysql_query_rules_from_peer()` - `pull_mysql_servers_from_peer()` - `pull_mysql_users_from_peer()` - `pull_proxysql_servers_from_peer()` Added 8 new global variables in Admin. 4 variables determine after how many different checks the remote configuration will be synced: - cluster_mysql_query_rules_diffs_before_sync - cluster_mysql_servers_diffs_before_sync - cluster_mysql_users_diffs_before_sync - cluster_proxysql_servers_diffs_before_sync 4 variables determine if after a remote sync the changes need to be written to disk: - cluster_mysql_query_rules_save_to_disk - cluster_mysql_servers_save_to_disk - cluster_mysql_users_save_to_disk - cluster_proxysql_servers_save_to_disk Table `proxysql_servers` is now automatically loaded from disk to memory and into runtime at startup. Added new Admin's command `LOAD PROXYSQL SERVERS FROM CONFIG` to load `proxysql_servers` from config file to memory (not runtime). Internal structures with credentials in MySQL_Authentication moved from `unsorted_map` to `map` : this to ensure the right order when generating the checksum. Config file supports both `address` and `hostname` for `mysql_servers` and `proxysql_servers` , #1091 `ProxySQL_Admin::load_proxysql_servers_to_runtime()` now has a lock or no lock option, to avoid deadlock For now, Cluster module is quite verbose. --- include/MySQL_Authentication.hpp | 4 +- include/ProxySQL_Cluster.hpp | 29 +- include/proxysql_admin.h | 12 +- lib/MySQL_Authentication.cpp | 46 ++- lib/MySQL_HostGroups_Manager.cpp | 73 +++- lib/ProxySQL_Admin.cpp | 301 +++++++++++++-- lib/ProxySQL_Cluster.cpp | 639 ++++++++++++++++++++++++++++++- src/main.cpp | 1 + 8 files changed, 1051 insertions(+), 54 deletions(-) diff --git a/include/MySQL_Authentication.hpp b/include/MySQL_Authentication.hpp index 116e68cd3..6381bfd46 100644 --- a/include/MySQL_Authentication.hpp +++ b/include/MySQL_Authentication.hpp @@ -30,7 +30,7 @@ typedef struct _account_details_t { #endif /* DEBUG */ #define MYSQL_AUTHENTICATION_VERSION "0.2.0902" DEB -typedef std::unordered_map umap_auth; +typedef std::map umap_auth; class PtrArray; @@ -49,6 +49,7 @@ class MySQL_Authentication { creds_group_t creds_backends; creds_group_t creds_frontends; bool _reset(enum cred_username_type usertype); + uint64_t _get_runtime_checksum(enum cred_username_type usertype); public: MySQL_Authentication(); ~MySQL_Authentication(); @@ -64,6 +65,7 @@ class MySQL_Authentication { void remove_inactives(enum cred_username_type usertype); bool set_SHA1(char *username, enum cred_username_type usertype, void *sha_pass); unsigned int memory_usage(); + uint64_t get_runtime_checksum(); }; #endif /* __CLASS_MYSQL_AUTHENTICATION_H */ diff --git a/include/ProxySQL_Cluster.hpp b/include/ProxySQL_Cluster.hpp index 649d49323..b49f864f9 100644 --- a/include/ProxySQL_Cluster.hpp +++ b/include/ProxySQL_Cluster.hpp @@ -51,6 +51,9 @@ class ProxySQL_Node_Entry { int metrics_idx_prev; int metrics_idx; ProxySQL_Node_Metrics **metrics; +// void pull_mysql_query_rules_from_peer(); +// void pull_mysql_servers_from_peer(); +// void pull_proxysql_servers_from_peer(); public: uint64_t get_hash(); @@ -95,13 +98,17 @@ class ProxySQL_Cluster_Nodes { public: ProxySQL_Cluster_Nodes(); ~ProxySQL_Cluster_Nodes(); - void load_servers_list(SQLite3_result *); + void load_servers_list(SQLite3_result *, bool _lock); bool Update_Node_Metrics(char * _h, uint16_t _p, MYSQL_RES *_r, unsigned long long _response_time); bool Update_Global_Checksum(char * _h, uint16_t _p, MYSQL_RES *_r); bool Update_Node_Checksums(char * _h, uint16_t _p, MYSQL_RES *_r); SQLite3_result * dump_table_proxysql_servers(); SQLite3_result * stats_proxysql_servers_checksums(); SQLite3_result * stats_proxysql_servers_metrics(); + void get_peer_to_sync_mysql_query_rules(char **host, uint16_t *port); + void get_peer_to_sync_mysql_servers(char **host, uint16_t *port); + void get_peer_to_sync_mysql_users(char **host, uint16_t *port); + void get_peer_to_sync_proxysql_servers(char **host, uint16_t *port); }; @@ -113,14 +120,26 @@ class ProxySQL_Cluster { char *cluster_username; char *cluster_password; public: + pthread_mutex_t update_mysql_query_rules_mutex; + pthread_mutex_t update_mysql_servers_mutex; + pthread_mutex_t update_mysql_users_mutex; + pthread_mutex_t update_proxysql_servers_mutex; int cluster_check_interval_ms; int cluster_check_status_frequency; + int cluster_mysql_query_rules_diffs_before_sync; + int cluster_mysql_servers_diffs_before_sync; + int cluster_mysql_users_diffs_before_sync; + int cluster_proxysql_servers_diffs_before_sync; + bool cluster_mysql_query_rules_save_to_disk; + bool cluster_mysql_servers_save_to_disk; + bool cluster_mysql_users_save_to_disk; + bool cluster_proxysql_servers_save_to_disk; ProxySQL_Cluster(); ~ProxySQL_Cluster(); void init() {}; void print_version(); - void load_servers_list(SQLite3_result *r) { - nodes.load_servers_list(r); + void load_servers_list(SQLite3_result *r, bool _lock = true) { + nodes.load_servers_list(r, _lock); } void get_credentials(char **, char **); void set_username(char *); @@ -145,5 +164,9 @@ class ProxySQL_Cluster { } void thread_ending(pthread_t); void join_term_thread(); + void pull_mysql_query_rules_from_peer(); + void pull_mysql_servers_from_peer(); + void pull_mysql_users_from_peer(); + void pull_proxysql_servers_from_peer(); }; #endif /* CLASS_PROXYSQL_CLUSTER_H */ diff --git a/include/proxysql_admin.h b/include/proxysql_admin.h index fc7b84690..9a86bb35f 100644 --- a/include/proxysql_admin.h +++ b/include/proxysql_admin.h @@ -88,6 +88,14 @@ class ProxySQL_Admin { char * cluster_password; int cluster_check_interval_ms; int cluster_check_status_frequency; + int cluster_mysql_query_rules_diffs_before_sync; + int cluster_mysql_servers_diffs_before_sync; + int cluster_mysql_users_diffs_before_sync; + int cluster_proxysql_servers_diffs_before_sync; + bool cluster_mysql_query_rules_save_to_disk; + bool cluster_mysql_servers_save_to_disk; + bool cluster_mysql_users_save_to_disk; + bool cluster_proxysql_servers_save_to_disk; #ifdef DEBUG bool debug; #endif /* DEBUG */ @@ -165,6 +173,7 @@ class ProxySQL_Admin { void init_users(); void init_mysql_servers(); void init_mysql_query_rules(); + void init_proxysql_servers(); void save_mysql_users_runtime_to_database(bool _runtime); void save_mysql_servers_runtime_to_database(bool); void admin_shutdown(); @@ -215,6 +224,7 @@ class ProxySQL_Admin { int Read_MySQL_Query_Rules_from_configfile(); int Read_MySQL_Servers_from_configfile(); int Read_Scheduler_from_configfile(); + int Read_ProxySQL_Servers_from_configfile(); void flush_error_log(); void GenericRefreshStatistics(const char *query_no_space, unsigned int query_no_space_length, bool admin); @@ -230,7 +240,7 @@ class ProxySQL_Admin { void flush_configdb(); // 923 // Cluster - void load_proxysql_servers_to_runtime(); + void load_proxysql_servers_to_runtime(bool _lock=true); void flush_proxysql_servers__from_memory_to_disk(); void flush_proxysql_servers__from_disk_to_memory(); void save_proxysql_servers_runtime_to_database(bool); diff --git a/lib/MySQL_Authentication.cpp b/lib/MySQL_Authentication.cpp index c81ab36ab..4a860de3e 100644 --- a/lib/MySQL_Authentication.cpp +++ b/lib/MySQL_Authentication.cpp @@ -90,7 +90,7 @@ bool MySQL_Authentication::add(char * username, char * password, enum cred_usern #else spin_wrlock(&cg.lock); #endif - std::unordered_map::iterator lookup; + std::map::iterator lookup; lookup = cg.bt_map.find(hash1); // few changes will follow, due to issue #802 account_details_t *ad=NULL; @@ -273,7 +273,7 @@ int MySQL_Authentication::increase_frontend_user_connections(char *username, int #else spin_wrlock(&cg.lock); #endif - std::unordered_map::iterator it; + std::map::iterator it; it = cg.bt_map.find(hash1); if (it != cg.bt_map.end()) { account_details_t *ad=it->second; @@ -306,7 +306,7 @@ void MySQL_Authentication::decrease_frontend_user_connections(char *username) { #else spin_wrlock(&cg.lock); #endif - std::unordered_map::iterator it; + std::map::iterator it; it = cg.bt_map.find(hash1); if (it != cg.bt_map.end()) { account_details_t *ad=it->second; @@ -338,7 +338,7 @@ bool MySQL_Authentication::del(char * username, enum cred_username_type usertype #else spin_wrlock(&cg.lock); #endif - std::unordered_map::iterator lookup; + std::map::iterator lookup; lookup = cg.bt_map.find(hash1); if (lookup != cg.bt_map.end()) { account_details_t *ad=lookup->second; @@ -376,7 +376,7 @@ bool MySQL_Authentication::set_SHA1(char * username, enum cred_username_type use #else spin_wrlock(&cg.lock); #endif - std::unordered_map::iterator lookup; + std::map::iterator lookup; lookup = cg.bt_map.find(hash1); if (lookup != cg.bt_map.end()) { account_details_t *ad=lookup->second; @@ -410,7 +410,7 @@ char * MySQL_Authentication::lookup(char * username, enum cred_username_type use #else spin_rdlock(&cg.lock); #endif - std::unordered_map::iterator lookup; + std::map::iterator lookup; lookup = cg.bt_map.find(hash1); if (lookup != cg.bt_map.end()) { account_details_t *ad=lookup->second; @@ -446,7 +446,7 @@ bool MySQL_Authentication::_reset(enum cred_username_type usertype) { #else spin_wrlock(&cg.lock); #endif - std::unordered_map::iterator lookup; + std::map::iterator lookup; while (cg.bt_map.size()) { lookup = cg.bt_map.begin(); @@ -476,3 +476,35 @@ bool MySQL_Authentication::reset() { _reset(USERNAME_FRONTEND); return true; } + + +uint64_t MySQL_Authentication::_get_runtime_checksum(enum cred_username_type usertype) { + creds_group_t &cg=(usertype==USERNAME_BACKEND ? creds_backends : creds_frontends); + std::map::iterator it; + if (cg.bt_map.size() == 0) { + return 0; + } + SpookyHash myhash; + myhash.Init(13,4); + for (it = cg.bt_map.begin(); it != cg.bt_map.end(); ) { + account_details_t *ad=it->second; + myhash.Update(&ad->use_ssl,sizeof(ad->use_ssl)); + myhash.Update(&ad->default_hostgroup,sizeof(ad->default_hostgroup)); + myhash.Update(&ad->schema_locked,sizeof(ad->schema_locked)); + myhash.Update(&ad->transaction_persistent,sizeof(ad->transaction_persistent)); + myhash.Update(&ad->fast_forward,sizeof(ad->fast_forward)); + myhash.Update(&ad->max_connections,sizeof(ad->max_connections)); + myhash.Update(ad->username,strlen(ad->username)); + myhash.Update(ad->password,strlen(ad->password)); + it++; + } + uint64_t hash1, hash2; + myhash.Final(&hash1, &hash2); + return hash1; +} + +uint64_t MySQL_Authentication::get_runtime_checksum() { + uint64_t hashB = _get_runtime_checksum(USERNAME_BACKEND); + uint64_t hashF = _get_runtime_checksum(USERNAME_FRONTEND); + return hashB+hashF; +} diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 79014e5e5..a1ccef31e 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -702,14 +702,19 @@ bool MySQL_HostGroups_Manager::commit() { if ( GloAdmin && GloAdmin->checksum_variables.checksum_mysql_servers ) { - uint64_t hash1, hash2; + uint64_t hash1=0, hash2=0; SpookyHash myhash; char buf[80]; - myhash.Init(19,3); + bool init = false; +/* removing all this code, because we need them ordered MySrvC *mysrvc=NULL; for (unsigned int i=0; ilen; i++) { MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i); for (unsigned int j=0; jmysrvs->servers->len; j++) { + if (init == false) { + init = true; + myhash.Init(19,3); + } mysrvc=myhgc->mysrvs->idx(j); // hostgroup sprintf(buf,"%u",mysrvc->myhgc->hid); @@ -747,7 +752,69 @@ bool MySQL_HostGroups_Manager::commit() { } else { myhash.Update("",0); } } } - myhash.Final(&hash1, &hash2); +*/ + { + mydb->execute("DELETE FROM mysql_servers"); + generate_mysql_servers_table(); + char *error=NULL; + int cols=0; + int affected_rows=0; + SQLite3_result *resultset=NULL; + char *query=(char *)"SELECT * FROM mysql_servers WHERE ORDER BY hostgroup_id, hostname, port"; + mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset); + if (resultset) { + if (resultset->rows_count) { + if (init == false) { + init = true; + myhash.Init(19,3); + } + uint64_t hash1_ = resultset->raw_checksum(); + myhash.Update(&hash1_, sizeof(hash1_)); + } + delete resultset; + } + } + { + char *error=NULL; + int cols=0; + int affected_rows=0; + SQLite3_result *resultset=NULL; + char *query=(char *)"SELECT * FROM mysql_replication_hostgroups ORDER BY writer_hostgroup"; + mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset); + if (resultset) { + if (resultset->rows_count) { + if (init == false) { + init = true; + myhash.Init(19,3); + } + uint64_t hash1_ = resultset->raw_checksum(); + myhash.Update(&hash1_, sizeof(hash1_)); + } + delete resultset; + } + } + { + char *error=NULL; + int cols=0; + int affected_rows=0; + SQLite3_result *resultset=NULL; + char *query=(char *)"SELECT * FROM mysql_group_replication_hostgroups ORDER BY writer_hostgroup"; + mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset); + if (resultset) { + if (resultset->rows_count) { + if (init == false) { + init = true; + myhash.Init(19,3); + } + uint64_t hash1_ = resultset->raw_checksum(); + myhash.Update(&hash1_, sizeof(hash1_)); + } + delete resultset; + } + } + if (init == true) { + myhash.Final(&hash1, &hash2); + } uint32_t d32[2]; memcpy(&d32,&hash1,sizeof(hash1)); sprintf(buf,"0x%0X%0X", d32[0], d32[1]); diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index 2d27c442a..01f5b8081 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -282,6 +282,14 @@ static char * admin_variables_names[]= { (char *)"cluster_password", (char *)"cluster_check_interval_ms", (char *)"cluster_check_status_frequency", + (char *)"cluster_mysql_query_rules_diffs_before_sync", + (char *)"cluster_mysql_servers_diffs_before_sync", + (char *)"cluster_mysql_users_diffs_before_sync", + (char *)"cluster_proxysql_servers_diffs_before_sync", + (char *)"cluster_mysql_query_rules_save_to_disk", + (char *)"cluster_mysql_servers_save_to_disk", + (char *)"cluster_mysql_users_save_to_disk", + (char *)"cluster_proxysql_servers_save_to_disk", (char *)"checksum_mysql_query_rules", (char *)"checksum_mysql_servers", (char *)"checksum_mysql_users", @@ -1275,6 +1283,34 @@ bool admin_handler_command_load_or_save(char *query_no_space, unsigned int query return false; } + if ( + (query_no_space_length==strlen("LOAD PROXYSQL SERVERS FROM CONFIG") && !strncasecmp("LOAD PROXYSQL SERVERS FROM CONFIG",query_no_space, query_no_space_length)) + ) { + proxy_info("Received %s command\n", query_no_space); + if (GloVars.configfile_open) { + proxy_debug(PROXY_DEBUG_ADMIN, 4, "Loading from file %s\n", GloVars.config_file); + if (GloVars.confFile->OpenFile(NULL)==true) { + ProxySQL_Admin *SPA=(ProxySQL_Admin *)pa; + int rows=0; + rows=SPA->Read_ProxySQL_Servers_from_configfile(); + proxy_debug(PROXY_DEBUG_ADMIN, 4, "Loaded ProxySQL servers from CONFIG\n"); + SPA->send_MySQL_OK(&sess->client_myds->myprot, NULL, rows); + GloVars.confFile->CloseFile(); + } else { + proxy_debug(PROXY_DEBUG_ADMIN, 4, "Unable to open or parse config file %s\n", GloVars.config_file); + char *s=(char *)"Unable to open or parse config file %s"; + char *m=(char *)malloc(strlen(s)+strlen(GloVars.config_file)+1); + sprintf(m,s,GloVars.config_file); + SPA->send_MySQL_ERR(&sess->client_myds->myprot, m); + free(m); + } + } else { + proxy_debug(PROXY_DEBUG_ADMIN, 4, "Unknown config file\n"); + SPA->send_MySQL_ERR(&sess->client_myds->myprot, (char *)"Config file unknown"); + } + return false; + } + } if ((query_no_space_length>23) && ( (!strncasecmp("SAVE MYSQL QUERY RULES ", query_no_space, 23)) || (!strncasecmp("LOAD MYSQL QUERY RULES ", query_no_space, 23))) ) { @@ -2791,9 +2827,17 @@ ProxySQL_Admin::ProxySQL_Admin() { variables.cluster_password=strdup((char *)""); variables.cluster_check_interval_ms=1000; variables.cluster_check_status_frequency=10; + variables.cluster_mysql_query_rules_diffs_before_sync = 3; + variables.cluster_mysql_servers_diffs_before_sync = 3; + variables.cluster_mysql_users_diffs_before_sync = 3; + variables.cluster_proxysql_servers_diffs_before_sync = 3; checksum_variables.checksum_mysql_query_rules = true; checksum_variables.checksum_mysql_servers = true; checksum_variables.checksum_mysql_users = true; + variables.cluster_mysql_query_rules_save_to_disk = true; + variables.cluster_mysql_servers_save_to_disk = true; + variables.cluster_mysql_users_save_to_disk = true; + variables.cluster_proxysql_servers_save_to_disk = true; #ifdef DEBUG variables.debug=GloVars.global.gdbg; #endif /* DEBUG */ @@ -3010,6 +3054,7 @@ bool ProxySQL_Admin::init() { Read_MySQL_Users_from_configfile(); Read_MySQL_Query_Rules_from_configfile(); Read_Scheduler_from_configfile(); + Read_ProxySQL_Servers_from_configfile(); __insert_or_replace_disktable_select_maintable(); } else { if (GloVars.confFile->OpenFile(GloVars.config_file)==true) { @@ -3019,6 +3064,7 @@ bool ProxySQL_Admin::init() { Read_Global_Variables_from_configfile("admin"); Read_Global_Variables_from_configfile("mysql"); Read_Scheduler_from_configfile(); + Read_ProxySQL_Servers_from_configfile(); __insert_or_replace_disktable_select_maintable(); } } @@ -3378,6 +3424,34 @@ char * ProxySQL_Admin::get_variable(char *name) { sprintf(intbuf,"%d",variables.cluster_check_status_frequency); return strdup(intbuf); } + if (!strcasecmp(name,"cluster_mysql_query_rules_diffs_before_sync")) { + sprintf(intbuf,"%d",variables.cluster_mysql_query_rules_diffs_before_sync); + return strdup(intbuf); + } + if (!strcasecmp(name,"cluster_mysql_servers_diffs_before_sync")) { + sprintf(intbuf,"%d",variables.cluster_mysql_servers_diffs_before_sync); + return strdup(intbuf); + } + if (!strcasecmp(name,"cluster_mysql_users_diffs_before_sync")) { + sprintf(intbuf,"%d",variables.cluster_mysql_users_diffs_before_sync); + return strdup(intbuf); + } + if (!strcasecmp(name,"cluster_proxysql_servers_diffs_before_sync")) { + sprintf(intbuf,"%d",variables.cluster_proxysql_servers_diffs_before_sync); + return strdup(intbuf); + } + if (!strcasecmp(name,"cluster_mysql_query_rules_save_to_disk")) { + return strdup((variables.cluster_mysql_query_rules_save_to_disk ? "true" : "false")); + } + if (!strcasecmp(name,"cluster_mysql_servers_save_to_disk")) { + return strdup((variables.cluster_mysql_servers_save_to_disk ? "true" : "false")); + } + if (!strcasecmp(name,"cluster_mysql_users_save_to_disk")) { + return strdup((variables.cluster_mysql_users_save_to_disk ? "true" : "false")); + } + if (!strcasecmp(name,"cluster_proxysql_servers_save_to_disk")) { + return strdup((variables.cluster_proxysql_servers_save_to_disk ? "true" : "false")); + } if (!strcasecmp(name,"refresh_interval")) { sprintf(intbuf,"%d",variables.refresh_interval); return strdup(intbuf); @@ -3598,6 +3672,46 @@ bool ProxySQL_Admin::set_variable(char *name, char *value) { // this is the pub return false; } } + if (!strcasecmp(name,"cluster_mysql_query_rules_diffs_before_sync")) { + int intv=atoi(value); + if (intv >= 0 && intv <= 1000) { + variables.cluster_mysql_query_rules_diffs_before_sync=intv; + __sync_lock_test_and_set(&GloProxyCluster->cluster_mysql_query_rules_diffs_before_sync, intv); + return true; + } else { + return false; + } + } + if (!strcasecmp(name,"cluster_mysql_servers_diffs_before_sync")) { + int intv=atoi(value); + if (intv >= 0 && intv <= 1000) { + variables.cluster_mysql_servers_diffs_before_sync=intv; + __sync_lock_test_and_set(&GloProxyCluster->cluster_mysql_servers_diffs_before_sync, intv); + return true; + } else { + return false; + } + } + if (!strcasecmp(name,"cluster_mysql_users_diffs_before_sync")) { + int intv=atoi(value); + if (intv >= 0 && intv <= 1000) { + variables.cluster_mysql_users_diffs_before_sync=intv; + __sync_lock_test_and_set(&GloProxyCluster->cluster_mysql_users_diffs_before_sync, intv); + return true; + } else { + return false; + } + } + if (!strcasecmp(name,"cluster_proxysql_servers_diffs_before_sync")) { + int intv=atoi(value); + if (intv >= 0 && intv <= 1000) { + variables.cluster_proxysql_servers_diffs_before_sync=intv; + __sync_lock_test_and_set(&GloProxyCluster->cluster_proxysql_servers_diffs_before_sync, intv); + return true; + } else { + return false; + } + } if (!strcasecmp(name,"version")) { if (strcasecmp(value,(char *)PROXYSQL_VERSION)==0) { return true; @@ -3616,6 +3730,58 @@ bool ProxySQL_Admin::set_variable(char *name, char *value) { // this is the pub } return false; } + if (!strcasecmp(name,"cluster_mysql_query_rules_save_to_disk")) { + if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) { + variables.cluster_mysql_query_rules_save_to_disk=true; + __sync_lock_test_and_set(&GloProxyCluster->cluster_mysql_query_rules_save_to_disk, true); + return true; + } + if (strcasecmp(value,"false")==0 || strcasecmp(value,"0")==0) { + variables.cluster_mysql_query_rules_save_to_disk=false; + __sync_lock_test_and_set(&GloProxyCluster->cluster_mysql_query_rules_save_to_disk, false); + return true; + } + return false; + } + if (!strcasecmp(name,"cluster_mysql_servers_save_to_disk")) { + if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) { + variables.cluster_mysql_servers_save_to_disk=true; + __sync_lock_test_and_set(&GloProxyCluster->cluster_mysql_servers_save_to_disk, true); + return true; + } + if (strcasecmp(value,"false")==0 || strcasecmp(value,"0")==0) { + variables.cluster_mysql_servers_save_to_disk=false; + __sync_lock_test_and_set(&GloProxyCluster->cluster_mysql_servers_save_to_disk, false); + return true; + } + return false; + } + if (!strcasecmp(name,"cluster_mysql_users_save_to_disk")) { + if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) { + variables.cluster_mysql_users_save_to_disk=true; + __sync_lock_test_and_set(&GloProxyCluster->cluster_mysql_users_save_to_disk, true); + return true; + } + if (strcasecmp(value,"false")==0 || strcasecmp(value,"0")==0) { + variables.cluster_mysql_users_save_to_disk=false; + __sync_lock_test_and_set(&GloProxyCluster->cluster_mysql_users_save_to_disk, false); + return true; + } + return false; + } + if (!strcasecmp(name,"cluster_proxysql_servers_save_to_disk")) { + if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) { + variables.cluster_proxysql_servers_save_to_disk=true; + __sync_lock_test_and_set(&GloProxyCluster->cluster_proxysql_servers_save_to_disk, true); + return true; + } + if (strcasecmp(value,"false")==0 || strcasecmp(value,"0")==0) { + variables.cluster_proxysql_servers_save_to_disk=false; + __sync_lock_test_and_set(&GloProxyCluster->cluster_proxysql_servers_save_to_disk, false); + return true; + } + return false; + } if (!strcasecmp(name,"checksum_mysql_query_rules")) { if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) { checksum_variables.checksum_mysql_query_rules=true; @@ -4443,58 +4609,62 @@ int ProxySQL_Admin::flush_debug_levels_database_to_runtime(SQLite3DB *db) { void ProxySQL_Admin::__insert_or_ignore_maintable_select_disktable() { - admindb->execute("PRAGMA foreign_keys = OFF"); - admindb->execute("INSERT OR IGNORE INTO main.mysql_servers SELECT * FROM disk.mysql_servers"); - admindb->execute("INSERT OR IGNORE INTO main.mysql_replication_hostgroups SELECT * FROM disk.mysql_replication_hostgroups"); - admindb->execute("INSERT OR IGNORE INTO main.mysql_group_replication_hostgroups SELECT * FROM disk.mysql_group_replication_hostgroups"); - admindb->execute("INSERT OR IGNORE INTO main.mysql_users SELECT * FROM disk.mysql_users"); + admindb->execute("PRAGMA foreign_keys = OFF"); + admindb->execute("INSERT OR IGNORE INTO main.mysql_servers SELECT * FROM disk.mysql_servers"); + admindb->execute("INSERT OR IGNORE INTO main.mysql_replication_hostgroups SELECT * FROM disk.mysql_replication_hostgroups"); + admindb->execute("INSERT OR IGNORE INTO main.mysql_group_replication_hostgroups SELECT * FROM disk.mysql_group_replication_hostgroups"); + admindb->execute("INSERT OR IGNORE INTO main.mysql_users SELECT * FROM disk.mysql_users"); admindb->execute("INSERT OR IGNORE INTO main.mysql_query_rules SELECT * FROM disk.mysql_query_rules"); admindb->execute("INSERT OR IGNORE INTO main.global_variables SELECT * FROM disk.global_variables"); admindb->execute("INSERT OR IGNORE INTO main.scheduler SELECT * FROM disk.scheduler"); + admindb->execute("INSERT OR IGNORE INTO main.proxysql_servers SELECT * FROM disk.proxysql_servers"); #ifdef DEBUG - admindb->execute("INSERT OR IGNORE INTO main.debug_levels SELECT * FROM disk.debug_levels"); + admindb->execute("INSERT OR IGNORE INTO main.debug_levels SELECT * FROM disk.debug_levels"); #endif /* DEBUG */ - admindb->execute("PRAGMA foreign_keys = ON"); + admindb->execute("PRAGMA foreign_keys = ON"); } void ProxySQL_Admin::__insert_or_replace_maintable_select_disktable() { - admindb->execute("PRAGMA foreign_keys = OFF"); - admindb->execute("INSERT OR REPLACE INTO main.mysql_servers SELECT * FROM disk.mysql_servers"); - admindb->execute("INSERT OR REPLACE INTO main.mysql_replication_hostgroups SELECT * FROM disk.mysql_replication_hostgroups"); - admindb->execute("INSERT OR REPLACE INTO main.mysql_group_replication_hostgroups SELECT * FROM disk.mysql_group_replication_hostgroups"); - admindb->execute("INSERT OR REPLACE INTO main.mysql_users SELECT * FROM disk.mysql_users"); + admindb->execute("PRAGMA foreign_keys = OFF"); + admindb->execute("INSERT OR REPLACE INTO main.mysql_servers SELECT * FROM disk.mysql_servers"); + admindb->execute("INSERT OR REPLACE INTO main.mysql_replication_hostgroups SELECT * FROM disk.mysql_replication_hostgroups"); + admindb->execute("INSERT OR REPLACE INTO main.mysql_group_replication_hostgroups SELECT * FROM disk.mysql_group_replication_hostgroups"); + admindb->execute("INSERT OR REPLACE INTO main.mysql_users SELECT * FROM disk.mysql_users"); admindb->execute("INSERT OR REPLACE INTO main.mysql_query_rules SELECT * FROM disk.mysql_query_rules"); admindb->execute("INSERT OR REPLACE INTO main.global_variables SELECT * FROM disk.global_variables"); admindb->execute("INSERT OR REPLACE INTO main.scheduler SELECT * FROM disk.scheduler"); + admindb->execute("INSERT OR REPLACE INTO main.proxysql_servers SELECT * FROM disk.proxysql_servers"); #ifdef DEBUG - admindb->execute("INSERT OR IGNORE INTO main.debug_levels SELECT * FROM disk.debug_levels"); + admindb->execute("INSERT OR IGNORE INTO main.debug_levels SELECT * FROM disk.debug_levels"); #endif /* DEBUG */ - admindb->execute("PRAGMA foreign_keys = ON"); + admindb->execute("PRAGMA foreign_keys = ON"); } void ProxySQL_Admin::__delete_disktable() { - admindb->execute("DELETE FROM disk.mysql_servers"); - admindb->execute("DELETE FROM disk.mysql_replication_hostgroups"); - admindb->execute("DELETE FROM disk.mysql_users"); + admindb->execute("DELETE FROM disk.mysql_servers"); + admindb->execute("DELETE FROM disk.mysql_replication_hostgroups"); + admindb->execute("DELETE FROM disk.mysql_users"); admindb->execute("DELETE FROM disk.mysql_query_rules"); admindb->execute("DELETE FROM disk.global_variables"); admindb->execute("DELETE FROM disk.scheduler"); + admindb->execute("DELETE FROM disk.proxysql_servers"); #ifdef DEBUG - admindb->execute("DELETE FROM disk.debug_levels"); + admindb->execute("DELETE FROM disk.debug_levels"); #endif /* DEBUG */ } void ProxySQL_Admin::__insert_or_replace_disktable_select_maintable() { - admindb->execute("INSERT OR REPLACE INTO disk.mysql_servers SELECT * FROM main.mysql_servers"); - admindb->execute("INSERT OR REPLACE INTO disk.mysql_replication_hostgroups SELECT * FROM main.mysql_replication_hostgroups"); - admindb->execute("INSERT OR REPLACE INTO disk.mysql_group_replication_hostgroups SELECT * FROM main.mysql_group_replication_hostgroups"); - admindb->execute("INSERT OR REPLACE INTO disk.mysql_query_rules SELECT * FROM main.mysql_query_rules"); - admindb->execute("INSERT OR REPLACE INTO disk.mysql_users SELECT * FROM main.mysql_users"); + admindb->execute("INSERT OR REPLACE INTO disk.mysql_servers SELECT * FROM main.mysql_servers"); + admindb->execute("INSERT OR REPLACE INTO disk.mysql_replication_hostgroups SELECT * FROM main.mysql_replication_hostgroups"); + admindb->execute("INSERT OR REPLACE INTO disk.mysql_group_replication_hostgroups SELECT * FROM main.mysql_group_replication_hostgroups"); + admindb->execute("INSERT OR REPLACE INTO disk.mysql_query_rules SELECT * FROM main.mysql_query_rules"); + admindb->execute("INSERT OR REPLACE INTO disk.mysql_users SELECT * FROM main.mysql_users"); admindb->execute("INSERT OR REPLACE INTO disk.mysql_query_rules SELECT * FROM main.mysql_query_rules"); admindb->execute("INSERT OR REPLACE INTO disk.global_variables SELECT * FROM main.global_variables"); admindb->execute("INSERT OR REPLACE INTO disk.scheduler SELECT * FROM main.scheduler"); + admindb->execute("INSERT OR REPLACE INTO disk.proxysql_servers SELECT * FROM main.proxysql_servers"); #ifdef DEBUG - admindb->execute("INSERT OR REPLACE INTO disk.debug_levels SELECT * FROM main.debug_levels"); + admindb->execute("INSERT OR REPLACE INTO disk.debug_levels SELECT * FROM main.debug_levels"); #endif /* DEBUG */ } @@ -4597,6 +4767,10 @@ void ProxySQL_Admin::init_mysql_servers() { mysql_servers_wrunlock(); } +void ProxySQL_Admin::init_proxysql_servers() { + load_proxysql_servers_to_runtime(); +} + void ProxySQL_Admin::init_mysql_query_rules() { load_mysql_query_rules_to_runtime(); } @@ -4623,19 +4797,24 @@ void ProxySQL_Admin::__refresh_users() { GloMyAuth->set_all_inactive(USERNAME_BACKEND); GloMyAuth->set_all_inactive(USERNAME_FRONTEND); add_admin_users(); - uint64_t hashB, hashF; - if (calculate_checksum) { - __add_active_users(USERNAME_BACKEND, NULL, &hashB); - __add_active_users(USERNAME_FRONTEND, NULL, &hashF); - } else { + +// uint64_t hashB, hashF; +// if (calculate_checksum) { +// __add_active_users(USERNAME_BACKEND, NULL, &hashB); +// __add_active_users(USERNAME_FRONTEND, NULL, &hashF); +// } else { __add_active_users(USERNAME_BACKEND); __add_active_users(USERNAME_FRONTEND); - } +// } GloMyAuth->remove_inactives(USERNAME_BACKEND); GloMyAuth->remove_inactives(USERNAME_FRONTEND); + uint64_t hash1 = 0; + if (calculate_checksum) { + } set_variable((char *)"admin_credentials",(char *)""); if (calculate_checksum) { - uint64_t hash1 = hashB + hashF; // overflow allowed + hash1 = GloMyAuth->get_runtime_checksum(); + //uint64_t hash1 = hashB + hashF; // overflow allowed uint32_t d32[2]; char buf[20]; memcpy(&d32, &hash1, sizeof(hash1)); @@ -4693,6 +4872,7 @@ void ProxySQL_Admin::__add_active_users(enum cred_username_type usertype, char * char *error=NULL; int cols=0; int affected_rows=0; + bool empty = true; SpookyHash myhash; if (hash1) { myhash.Init(19,3); @@ -4731,6 +4911,7 @@ void ProxySQL_Admin::__add_active_users(enum cred_username_type usertype, char * SQLite3_row *r=new SQLite3_row(cols); r->add_fields(statement); if (hash1) { + empty = false; for (int i=0; ifields[i]) { myhash.Update(r->fields[i],r->sizes[i]); @@ -4800,6 +4981,9 @@ void ProxySQL_Admin::__add_active_users(enum cred_username_type usertype, char * uint64_t h1, h2; myhash.Final(&h1, &h2); *hash1 = h1; + if (empty) { + *hash1 = 0; + } } #else if (resultset) delete resultset; @@ -5366,7 +5550,7 @@ char * ProxySQL_Admin::load_mysql_query_rules_to_runtime() { int affected_rows=0; if (GloQPro==NULL) return (char *)"Global Query Processor not started: command impossible to run"; SQLite3_result *resultset=NULL; - char *query=(char *)"SELECT rule_id, username, schemaname, flagIN, client_addr, proxy_addr, proxy_port, digest, match_digest, match_pattern, negate_match_pattern, re_modifiers, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, retries, delay, next_query_flagIN, mirror_flagOUT, mirror_hostgroup, error_msg, ok_msg, sticky_conn, multiplex, log, apply, comment FROM main.mysql_query_rules WHERE active=1"; + char *query=(char *)"SELECT rule_id, username, schemaname, flagIN, client_addr, proxy_addr, proxy_port, digest, match_digest, match_pattern, negate_match_pattern, re_modifiers, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, retries, delay, next_query_flagIN, mirror_flagOUT, mirror_hostgroup, error_msg, ok_msg, sticky_conn, multiplex, log, apply, comment FROM main.mysql_query_rules WHERE active=1 ORDER BY rule_id"; admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset); if (error) { proxy_error("Error on %s : %s\n", query, error); @@ -5886,7 +6070,11 @@ int ProxySQL_Admin::Read_MySQL_Servers_from_configfile() { int use_ssl=0; int max_latency_ms=0; std::string comment=""; - if (server.lookupValue("address", address)==false) continue; + if (server.lookupValue("address", address)==false) { + if (server.lookupValue("hostname", address)==false) { + continue; + } + } if (server.lookupValue("port", port)==false) continue; if (server.lookupValue("hostgroup", hostgroup)==false) continue; server.lookupValue("status", status); @@ -5945,6 +6133,47 @@ int ProxySQL_Admin::Read_MySQL_Servers_from_configfile() { return rows; } +int ProxySQL_Admin::Read_ProxySQL_Servers_from_configfile() { + const Setting& root = GloVars.confFile->cfg->getRoot(); + int i; + int rows=0; + admindb->execute("PRAGMA foreign_keys = OFF"); + if (root.exists("proxysql_servers")==true) { + const Setting &mysql_servers = root["proxysql_servers"]; + int count = mysql_servers.getLength(); + //fprintf(stderr, "Found %d servers\n",count); + char *q=(char *)"INSERT OR REPLACE INTO proxysql_servers (hostname, port, weight, comment) VALUES (\"%s\", %d, %d, '%s')"; + for (i=0; i< count; i++) { + const Setting &server = mysql_servers[i]; + std::string address; + int port; + int weight=0; + std::string comment=""; + if (server.lookupValue("address", address)==false) { + if (server.lookupValue("hostname", address)==false) { + continue; + } + } + if (server.lookupValue("port", port)==false) continue; + server.lookupValue("weight", weight); + server.lookupValue("comment", comment); + char *o1=strdup(comment.c_str()); + char *o=escape_string_single_quotes(o1, false); + char *query=(char *)malloc(strlen(q)+strlen(address.c_str())+strlen(o)+128); + sprintf(query, q, address.c_str(), port, weight, o); + proxy_info("Cluster: Adding ProxySQL Servers %s:%d from config file\n", address.c_str(), port); + //fprintf(stderr, "%s\n", query); + admindb->execute(query); + if (o!=o1) free(o); + free(o1); + free(query); + rows++; + } + } + admindb->execute("PRAGMA foreign_keys = ON"); + return rows; +} + extern "C" ProxySQL_Admin * create_ProxySQL_Admin_func() { return new ProxySQL_Admin(); } @@ -6422,19 +6651,19 @@ unsigned long long ProxySQL_External_Scheduler::run_once() { return next_run; } -void ProxySQL_Admin::load_proxysql_servers_to_runtime() { +void ProxySQL_Admin::load_proxysql_servers_to_runtime(bool _lock) { // make sure that the caller has called mysql_servers_wrlock() char *error=NULL; int cols=0; int affected_rows=0; SQLite3_result *resultset=NULL; - char *query=(char *)"SELECT hostname,port,weight,comment FROM proxysql_servers"; + char *query=(char *)"SELECT hostname, port, weight, comment FROM proxysql_servers ORDER BY hostname, port"; proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset); if (error) { proxy_error("Error on %s : %s\n", query, error); } else { - GloProxyCluster->load_servers_list(resultset); + GloProxyCluster->load_servers_list(resultset, _lock); // if (checksum_variables.checksum_mysql_query_rules) { pthread_mutex_lock(&GloVars.checksum_mutex); uint64_t hash1 = resultset->raw_checksum(); diff --git a/lib/ProxySQL_Cluster.cpp b/lib/ProxySQL_Cluster.cpp index 572890fbf..1577c58ba 100644 --- a/lib/ProxySQL_Cluster.cpp +++ b/lib/ProxySQL_Cluster.cpp @@ -9,10 +9,25 @@ #endif /* DEBUG */ #define PROXYSQL_CLUSTER_VERSION "0.1.0702" DEB + +#define SAFE_SQLITE3_STEP(_stmt) do {\ + do {\ + rc=sqlite3_step(_stmt);\ + if (rc!=SQLITE_DONE) {\ + assert(rc==SQLITE_LOCKED);\ + usleep(10);\ + }\ + } while (rc!=SQLITE_DONE);\ +} while (0) + + + static char *NODE_COMPUTE_DELIMITER=(char *)"-gtyw23a-"; // a random string used for hashing extern ProxySQL_Cluster * GloProxyCluster; +extern ProxySQL_Admin *GloAdmin; + typedef struct _proxy_node_address_t { pthread_t thrid; uint64_t hash; // unused for now @@ -35,6 +50,7 @@ void * ProxySQL_Cluster_Monitor_thread(void *args) { mysql_thread_init(); pthread_detach(pthread_self()); + proxy_info("Cluster: starting thread for peer %s:%d\n", node->hostname, node->port); char *query1 = (char *)"SELECT GLOBAL_CHECKSUM()"; // in future this will be used for "light check" char *query2 = (char *)"SELECT * FROM stats_mysql_global ORDER BY Variable_Name"; char *query3 = (char *)"SELECT * FROM runtime_checksums_values ORDER BY name"; @@ -153,6 +169,7 @@ void * ProxySQL_Cluster_Monitor_thread(void *args) { mysql_close(conn); } } else { + proxy_warning("Cluster: unable to connect to peer %s:%d . Error: %s\n", node->hostname, node->port, mysql_error(conn)); mysql_close(conn); conn = mysql_init(NULL); int ci = __sync_fetch_and_add(&GloProxyCluster->cluster_check_interval_ms,0); @@ -167,6 +184,7 @@ __exit_monitor_thread: if (conn->net.vio) { mysql_close(conn); } + proxy_info("Cluster: closing thread for peer %s:%d\n", node->hostname, node->port); free(node->hostname); free(node); //pthread_exit(0); @@ -217,9 +235,11 @@ ProxySQL_Node_Entry::ProxySQL_Node_Entry(char *_hostname, uint16_t _port, uint64 for (int i = 0; i < PROXYSQL_NODE_METRICS_LEN ; i++) { metrics[i] = new ProxySQL_Node_Metrics(); } + proxy_info("Created new Cluster Node Entry for host %s:%d\n",hostname,port); } ProxySQL_Node_Entry::~ProxySQL_Node_Entry() { + proxy_info("Destroyed Cluster Node Entry for host %s:%d\n",hostname,port); if (hostname) { free(hostname); hostname = NULL; @@ -302,6 +322,10 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) { strcpy(checksums_values.mysql_query_rules.checksum, row[3]); checksums_values.mysql_query_rules.last_changed = now; checksums_values.mysql_query_rules.diff_check = 1; + proxy_info("Cluster: detected a new checksum for mysql_query_rules from peer %s:%d, version %llu, epoch %llu, checksum %s . Not syncing yet ...\n", hostname, port, checksums_values.mysql_query_rules.version, checksums_values.mysql_query_rules.epoch, checksums_values.mysql_query_rules.checksum); + if (strcmp(checksums_values.mysql_query_rules.checksum, GloVars.checksums_values.mysql_query_rules.checksum) == 0) { + proxy_info("Cluster: checksum for mysql_query_rules from peer %s:%d matches with local checksum %s , we won't sync.\n", hostname, port, GloVars.checksums_values.mysql_query_rules.checksum); + } } else { checksums_values.mysql_query_rules.diff_check++; } @@ -318,6 +342,10 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) { strcpy(checksums_values.mysql_servers.checksum, row[3]); checksums_values.mysql_servers.last_changed = now; checksums_values.mysql_servers.diff_check = 1; + proxy_info("Cluster: detected a new checksum for mysql_servers from peer %s:%d, version %llu, epoch %llu, checksum %s . Not syncing yet ...\n", hostname, port, checksums_values.mysql_servers.version, checksums_values.mysql_servers.epoch, checksums_values.mysql_servers.checksum); + if (strcmp(checksums_values.mysql_servers.checksum, GloVars.checksums_values.mysql_servers.checksum) == 0) { + proxy_info("Cluster: checksum for mysql_servers from peer %s:%d matches with local checksum %s , we won't sync.\n", hostname, port, GloVars.checksums_values.mysql_servers.checksum); + } } else { checksums_values.mysql_servers.diff_check++; } @@ -334,6 +362,10 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) { strcpy(checksums_values.mysql_users.checksum, row[3]); checksums_values.mysql_users.last_changed = now; checksums_values.mysql_users.diff_check = 1; + proxy_info("Cluster: detected a new checksum for mysql_users from peer %s:%d, version %llu, epoch %llu, checksum %s . Not syncing yet ...\n", hostname, port, checksums_values.mysql_users.version, checksums_values.mysql_users.epoch, checksums_values.mysql_users.checksum); + if (strcmp(checksums_values.mysql_users.checksum, GloVars.checksums_values.mysql_users.checksum) == 0) { + proxy_info("Cluster: checksum for mysql_users from peer %s:%d matches with local checksum %s , we won't sync.\n", hostname, port, GloVars.checksums_values.mysql_users.checksum); + } } else { checksums_values.mysql_users.diff_check++; } @@ -366,6 +398,10 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) { strcpy(checksums_values.proxysql_servers.checksum, row[3]); checksums_values.proxysql_servers.last_changed = now; checksums_values.proxysql_servers.diff_check = 1; + proxy_info("Cluster: detected a new checksum for proxysql_servers from peer %s:%d, version %llu, epoch %llu, checksum %s . Not syncing yet ...\n", hostname, port, checksums_values.proxysql_servers.version, checksums_values.proxysql_servers.epoch, checksums_values.proxysql_servers.checksum); + if (strcmp(checksums_values.proxysql_servers.checksum, GloVars.checksums_values.proxysql_servers.checksum) == 0) { + proxy_info("Cluster: checksum for proxysql_servers from peer %s:%d matches with local checksum %s , we won't sync.\n", hostname, port, GloVars.checksums_values.proxysql_servers.checksum); + } } else { checksums_values.proxysql_servers.diff_check++; } @@ -383,14 +419,23 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) { v->diff_check++; v = &checksums_values.mysql_query_rules; v->last_updated = now; + if (strcmp(v->checksum, GloVars.checksums_values.mysql_query_rules.checksum) == 0) { + v->diff_check = 0; + } if (v->diff_check) v->diff_check++; v = &checksums_values.mysql_servers; v->last_updated = now; + if (strcmp(v->checksum, GloVars.checksums_values.mysql_servers.checksum) == 0) { + v->diff_check = 0; + } if (v->diff_check) v->diff_check++; v = &checksums_values.mysql_users; v->last_updated = now; + if (strcmp(v->checksum, GloVars.checksums_values.mysql_users.checksum) == 0) { + v->diff_check = 0; + } if (v->diff_check) v->diff_check++; v = &checksums_values.mysql_variables; @@ -399,10 +444,460 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) { v->diff_check++; v = &checksums_values.proxysql_servers; v->last_updated = now; + if (strcmp(v->checksum, GloVars.checksums_values.proxysql_servers.checksum) == 0) { + v->diff_check = 0; + } if (v->diff_check) v->diff_check++; } pthread_mutex_unlock(&GloVars.checksum_mutex); + // we now do a series of checks, and we take action + // note that this is done outside the critical section + // as mutex on GloVars.checksum_mutex is already released + unsigned int diff_mqr = (unsigned int)__sync_fetch_and_add(&GloProxyCluster->cluster_mysql_query_rules_diffs_before_sync,0); + unsigned int diff_ms = (unsigned int)__sync_fetch_and_add(&GloProxyCluster->cluster_mysql_servers_diffs_before_sync,0); + unsigned int diff_mu = (unsigned int)__sync_fetch_and_add(&GloProxyCluster->cluster_mysql_users_diffs_before_sync,0); + unsigned int diff_ps = (unsigned int)__sync_fetch_and_add(&GloProxyCluster->cluster_proxysql_servers_diffs_before_sync,0); + ProxySQL_Checksum_Value_2 *v = NULL; + if (diff_mqr) { + unsigned long long own_version = __sync_fetch_and_add(&GloVars.checksums_values.mysql_query_rules.version,0); + unsigned long long own_epoch = __sync_fetch_and_add(&GloVars.checksums_values.mysql_query_rules.epoch,0); + v = &checksums_values.mysql_query_rules; + if (v->version > 1) { + if ( + (own_version == 1) // we just booted + || + (v->epoch > own_version) // epoch is newer + ) { + if (v->diff_check > diff_mqr) { + proxy_info("Cluster: detected a peer %s:%d with mysql_query_rules version %llu, epoch %llu, diff_check %llu. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch); + GloProxyCluster->pull_mysql_query_rules_from_peer(); + } + } + } else { + if (v->diff_check && (v->diff_check % (diff_mqr*10)) == 0) { + proxy_warning("Cluster: detected a peer %s:%d with mysql_query_rules version %llu, epoch %llu, diff_check %llu. Own version: %llu, epoch: %llu. diff_check is increasing, but version 1 doesn't allow sync. This message will be repeated every %llu checks until LOAD MYSQL QUERY RULES TO RUNTIME is executed on candidate master.\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch, (diff_mqr*10)); + } + } + } + if (diff_ms) { + v = &checksums_values.mysql_servers; + unsigned long long own_version = __sync_fetch_and_add(&GloVars.checksums_values.mysql_servers.version,0); + unsigned long long own_epoch = __sync_fetch_and_add(&GloVars.checksums_values.mysql_servers.epoch,0); + if (v->version > 1) { + if ( + (own_version == 1) // we just booted + || + (v->epoch > own_version) // epoch is newer + ) { + if (v->diff_check > diff_ms) { + proxy_info("Cluster: detected a peer %s:%d with mysql_servers version %llu, epoch %llu, diff_check %llu. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch); + GloProxyCluster->pull_mysql_servers_from_peer(); + } + } + } else { + if (v->diff_check && (v->diff_check % (diff_ms*10)) == 0) { + proxy_warning("Cluster: detected a peer %s:%d with mysql_servers version %llu, epoch %llu, diff_check %llu. Own version: %llu, epoch: %llu. diff_check is increasing, but version 1 doesn't allow sync. This message will be repeated every %llu checks until LOAD MYSQL SERVERS TO RUNTIME is executed on candidate master.\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch, (diff_ms*10)); + } + } + } + if (diff_mu) { + v = &checksums_values.mysql_users; + unsigned long long own_version = __sync_fetch_and_add(&GloVars.checksums_values.mysql_users.version,0); + unsigned long long own_epoch = __sync_fetch_and_add(&GloVars.checksums_values.mysql_users.epoch,0); + if (v->version > 1) { + if ( + (own_version == 1) // we just booted + || + (v->epoch > own_version) // epoch is newer + ) { + if (v->diff_check > diff_mu) { + proxy_info("Cluster: detected a peer %s:%d with mysql_users version %llu, epoch %llu, diff_check %llu. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch); + GloProxyCluster->pull_mysql_users_from_peer(); + } + } + } else { + if (v->diff_check && (v->diff_check % (diff_mu*10)) == 0) { + proxy_warning("Cluster: detected a peer %s:%d with mysql_users version %llu, epoch %llu, diff_check %llu. Own version: %llu, epoch: %llu. diff_check is increasing, but version 1 doesn't allow sync. This message will be repeated every %llu checks until LOAD MYSQL SERVERS TO RUNTIME is executed on candidate master.\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch, (diff_mu*10)); + } + } + } + if (diff_ps) { + v = &checksums_values.proxysql_servers; + unsigned long long own_version = __sync_fetch_and_add(&GloVars.checksums_values.proxysql_servers.version,0); + unsigned long long own_epoch = __sync_fetch_and_add(&GloVars.checksums_values.proxysql_servers.epoch,0); + if (v->version > 1) { + if ( + (own_version == 1) // we just booted + || + (v->epoch > own_version) // epoch is newer + ) { + if (v->diff_check > diff_ps) { + proxy_info("Cluster: detected a peer %s:%d with proxysql_servers version %llu, epoch %llu, diff_check %llu. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch); + GloProxyCluster->pull_proxysql_servers_from_peer(); + } + } + } else { + if (v->diff_check && (v->diff_check % (diff_ms*10)) == 0) { + proxy_warning("Cluster: detected a peer %s:%d with proxysql_servers version %llu, epoch %llu, diff_check %llu. Own version: %llu, epoch: %llu. diff_check is increasing, but version 1 doesn't allow sync. This message will be repeated every %llu checks until LOAD PROXYSQL SERVERS TO RUNTIME is executed on candidate master.\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch, (diff_ps*10)); + } + } + } +} + +void ProxySQL_Cluster::pull_mysql_query_rules_from_peer() { + char * hostname = NULL; + uint16_t port = 0; + pthread_mutex_lock(&GloProxyCluster->update_mysql_query_rules_mutex); + nodes.get_peer_to_sync_mysql_query_rules(&hostname, &port); + if (hostname) { + char *username = NULL; + char *password = NULL; + bool rc_bool = true; + MYSQL *rc_conn; + int rc_query; + int rc; + MYSQL *conn = mysql_init(NULL); + if (conn==NULL) { + proxy_error("Unable to run mysql_init()\n"); + goto __exit_pull_mysql_query_rules_from_peer; + } + GloProxyCluster->get_credentials(&username, &password); + if (strlen(username)) { // do not monitor if the username is empty + unsigned int timeout = 1; + unsigned int timeout_long = 60; + mysql_options(conn, MYSQL_OPT_CONNECT_TIMEOUT, &timeout); + mysql_options(conn, MYSQL_OPT_READ_TIMEOUT, &timeout_long); + mysql_options(conn, MYSQL_OPT_WRITE_TIMEOUT, &timeout); + proxy_info("Cluster: Fetching MySQL Query Rules from peer %s:%d started\n", hostname, port); + rc_conn = mysql_real_connect(conn, hostname, username, password, NULL, port, NULL, 0); + if (rc_conn) { + rc_query = mysql_query(conn,"SELECT rule_id, username, schemaname, flagIN, client_addr, proxy_addr, proxy_port, digest, match_digest, match_pattern, negate_match_pattern, re_modifiers, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, retries, delay, next_query_flagIN, mirror_flagOUT, mirror_hostgroup, error_msg, ok_msg, sticky_conn, multiplex, log, apply, comment FROM runtime_mysql_query_rules"); + if ( rc_query == 0 ) { + MYSQL_RES *result = mysql_store_result(conn); + GloAdmin->admindb->execute("DELETE FROM mysql_query_rules"); + MYSQL_ROW row; + char *q = (char *)"INSERT INTO mysql_query_rules (rule_id, username, schemaname, flagIN, client_addr, proxy_addr, proxy_port, digest, match_digest, match_pattern, negate_match_pattern, re_modifiers, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, retries, delay, next_query_flagIN, mirror_flagOUT, mirror_hostgroup, error_msg, ok_msg, sticky_conn, multiplex, log, apply, comment) VALUES (?1 , ?2 , ?3 , ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24, ?25, ?26, ?27, ?28, ?29, ?30)"; + sqlite3_stmt *statement1 = NULL; + sqlite3 *mydb3 = GloAdmin->admindb->get_db(); + rc=sqlite3_prepare_v2(mydb3, q, -1, &statement1, 0); + assert(rc==SQLITE_OK); + while ((row = mysql_fetch_row(result))) { + rc=sqlite3_bind_int64(statement1, 1, atoll(row[0])); assert(rc==SQLITE_OK); // rule_id + rc=sqlite3_bind_text(statement1, 2, row[1], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // username + rc=sqlite3_bind_text(statement1, 3, row[2], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // schemaname + rc=sqlite3_bind_int64(statement1, 4, atoll(row[3])); assert(rc==SQLITE_OK); // flagIN + rc=sqlite3_bind_text(statement1, 5, row[4], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // client_addr + rc=sqlite3_bind_text(statement1, 6, row[5], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // proxy_addr + rc=sqlite3_bind_int64(statement1, 7, atoll(row[6])); assert(rc==SQLITE_OK); // proxy_port + rc=sqlite3_bind_text(statement1, 8, row[7], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // digest + rc=sqlite3_bind_text(statement1, 9, row[8], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // match_digest + rc=sqlite3_bind_text(statement1, 10, row[9], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // match_pattern + rc=sqlite3_bind_int64(statement1, 11, atoll(row[10])); assert(rc==SQLITE_OK); // negate_match_pattern + rc=sqlite3_bind_text(statement1, 12, row[11], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // re_modifiers + rc=sqlite3_bind_int64(statement1, 13, atoll(row[12])); assert(rc==SQLITE_OK); // flagOUT + rc=sqlite3_bind_text(statement1, 14, row[13], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // replace_pattern + rc=sqlite3_bind_int64(statement1, 15, atoll(row[14])); assert(rc==SQLITE_OK); // destination_hostgroup + rc=sqlite3_bind_int64(statement1, 16, atoll(row[15])); assert(rc==SQLITE_OK); // cache_ttl + rc=sqlite3_bind_int64(statement1, 17, atoll(row[16])); assert(rc==SQLITE_OK); // reconnect + rc=sqlite3_bind_int64(statement1, 18, atoll(row[17])); assert(rc==SQLITE_OK); // timeout + rc=sqlite3_bind_int64(statement1, 19, atoll(row[18])); assert(rc==SQLITE_OK); // retries + rc=sqlite3_bind_int64(statement1, 20, atoll(row[19])); assert(rc==SQLITE_OK); // delay + rc=sqlite3_bind_int64(statement1, 21, atoll(row[20])); assert(rc==SQLITE_OK); // next_query_flagIN + rc=sqlite3_bind_int64(statement1, 22, atoll(row[21])); assert(rc==SQLITE_OK); // mirror_flagOUT + rc=sqlite3_bind_int64(statement1, 23, atoll(row[22])); assert(rc==SQLITE_OK); // mirror_hostgroup + rc=sqlite3_bind_text(statement1, 24, row[23], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // error_msg + rc=sqlite3_bind_text(statement1, 25, row[24], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // OK_msg + rc=sqlite3_bind_int64(statement1, 26, atoll(row[25])); assert(rc==SQLITE_OK); // sticky_conn + rc=sqlite3_bind_int64(statement1, 27, atoll(row[26])); assert(rc==SQLITE_OK); // multiplex + rc=sqlite3_bind_int64(statement1, 28, atoll(row[27])); assert(rc==SQLITE_OK); // log + rc=sqlite3_bind_int64(statement1, 29, atoll(row[28])); assert(rc==SQLITE_OK); // apply + rc=sqlite3_bind_text(statement1, 30, row[29], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // comment + + SAFE_SQLITE3_STEP(statement1); + rc=sqlite3_clear_bindings(statement1); assert(rc==SQLITE_OK); + rc=sqlite3_reset(statement1); assert(rc==SQLITE_OK); + } + mysql_free_result(result); + proxy_info("Cluster: Fetching MySQL Query Rules from peer %s:%d completed\n", hostname, port); + proxy_info("Cluster: Loading to runtime MySQL Servers from peer %s:%d\n", hostname, port); + GloAdmin->load_mysql_query_rules_to_runtime(); + if (__sync_fetch_and_add(&GloProxyCluster->cluster_mysql_query_rules_save_to_disk,0) == true) { + proxy_info("Cluster: Saving to disk MySQL Query Rules from peer %s:%d\n", hostname, port); + GloAdmin->flush_mysql_query_rules__from_memory_to_disk(); + } + } else { + proxy_info("Cluster: Fetching MySQL Query Rules from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn)); + } + } else { + proxy_info("Cluster: Fetching MySQL Query Rules from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn)); + } + } +__exit_pull_mysql_query_rules_from_peer: + if (conn) { + if (conn->net.vio) { + mysql_close(conn); + } + } + free(hostname); + } + pthread_mutex_unlock(&GloProxyCluster->update_mysql_query_rules_mutex); +} + +void ProxySQL_Cluster::pull_mysql_users_from_peer() { + char * hostname = NULL; + uint16_t port = 0; + pthread_mutex_lock(&GloProxyCluster->update_mysql_users_mutex); + nodes.get_peer_to_sync_mysql_users(&hostname, &port); + if (hostname) { + char *username = NULL; + char *password = NULL; + bool rc_bool = true; + MYSQL *rc_conn; + int rc_query; + int rc; + MYSQL *conn = mysql_init(NULL); + if (conn==NULL) { + proxy_error("Unable to run mysql_init()\n"); + goto __exit_pull_mysql_users_from_peer; + } + GloProxyCluster->get_credentials(&username, &password); + if (strlen(username)) { // do not monitor if the username is empty + unsigned int timeout = 1; + unsigned int timeout_long = 60; + mysql_options(conn, MYSQL_OPT_CONNECT_TIMEOUT, &timeout); + mysql_options(conn, MYSQL_OPT_READ_TIMEOUT, &timeout_long); + mysql_options(conn, MYSQL_OPT_WRITE_TIMEOUT, &timeout); + proxy_info("Cluster: Fetching MySQL Users from peer %s:%d started\n", hostname, port); + rc_conn = mysql_real_connect(conn, hostname, username, password, NULL, port, NULL, 0); + if (rc_conn) { + rc_query = mysql_query(conn, "SELECT username, password, active, use_ssl, default_hostgroup, default_schema, schema_locked, transaction_persistent, fast_forward, backend, frontend, max_connections FROM runtime_mysql_users"); + if ( rc_query == 0 ) { + MYSQL_RES *result = mysql_store_result(conn); + GloAdmin->admindb->execute("DELETE FROM mysql_users"); + MYSQL_ROW row; + char *q = (char *)"INSERT INTO mysql_users (username, password, active, use_ssl, default_hostgroup, default_schema, schema_locked, transaction_persistent, fast_forward, backend, frontend, max_connections) VALUES (?1 , ?2 , ?3 , ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)"; + sqlite3_stmt *statement1 = NULL; + sqlite3 *mydb3 = GloAdmin->admindb->get_db(); + rc=sqlite3_prepare_v2(mydb3, q, -1, &statement1, 0); + assert(rc==SQLITE_OK); + while ((row = mysql_fetch_row(result))) { + rc=sqlite3_bind_text(statement1, 1, row[0], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // username + rc=sqlite3_bind_text(statement1, 2, row[1], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // password + rc=sqlite3_bind_int64(statement1, 3, atoll(row[2])); assert(rc==SQLITE_OK); // active + rc=sqlite3_bind_int64(statement1, 4, atoll(row[3])); assert(rc==SQLITE_OK); // use_ssl + rc=sqlite3_bind_int64(statement1, 5, atoll(row[4])); assert(rc==SQLITE_OK); // default_hostgroup + rc=sqlite3_bind_text(statement1, 6, row[5], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // default_schema + rc=sqlite3_bind_int64(statement1, 7, atoll(row[6])); assert(rc==SQLITE_OK); // schema_locked + rc=sqlite3_bind_int64(statement1, 8, atoll(row[7])); assert(rc==SQLITE_OK); // transaction_persistent + rc=sqlite3_bind_int64(statement1, 9, atoll(row[8])); assert(rc==SQLITE_OK); // fast_forward + rc=sqlite3_bind_int64(statement1, 10, atoll(row[9])); assert(rc==SQLITE_OK); // backend + rc=sqlite3_bind_int64(statement1, 11, atoll(row[10])); assert(rc==SQLITE_OK); // frontend + rc=sqlite3_bind_int64(statement1, 12, atoll(row[11])); assert(rc==SQLITE_OK); // max_connection + + SAFE_SQLITE3_STEP(statement1); + rc=sqlite3_clear_bindings(statement1); assert(rc==SQLITE_OK); + rc=sqlite3_reset(statement1); assert(rc==SQLITE_OK); + } + mysql_free_result(result); + proxy_info("Cluster: Fetching MySQL Users from peer %s:%d completed\n", hostname, port); + proxy_info("Cluster: Loading to runtime MySQL Users from peer %s:%d\n", hostname, port); + GloAdmin->init_users(); + if (__sync_fetch_and_add(&GloProxyCluster->cluster_mysql_query_rules_save_to_disk,0) == true) { + proxy_info("Cluster: Saving to disk MySQL Query Rules from peer %s:%d\n", hostname, port); + GloAdmin->flush_mysql_users__from_memory_to_disk(); + } + } else { + proxy_info("Cluster: Fetching MySQL Users from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn)); + } + } else { + proxy_info("Cluster: Fetching MySQL Users from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn)); + } + } +__exit_pull_mysql_users_from_peer: + if (conn) { + if (conn->net.vio) { + mysql_close(conn); + } + } + free(hostname); + } + pthread_mutex_unlock(&GloProxyCluster->update_mysql_users_mutex); +} + +void ProxySQL_Cluster::pull_mysql_servers_from_peer() { + char * hostname = NULL; + uint16_t port = 0; + pthread_mutex_lock(&GloProxyCluster->update_mysql_servers_mutex); + nodes.get_peer_to_sync_mysql_servers(&hostname, &port); + if (hostname) { + char *username = NULL; + char *password = NULL; + bool rc_bool = true; + MYSQL *rc_conn; + int rc_query; + MYSQL *conn = mysql_init(NULL); + if (conn==NULL) { + proxy_error("Unable to run mysql_init()\n"); + goto __exit_pull_mysql_servers_from_peer; + } + GloProxyCluster->get_credentials(&username, &password); + if (strlen(username)) { // do not monitor if the username is empty + unsigned int timeout = 1; + unsigned int timeout_long = 60; + mysql_options(conn, MYSQL_OPT_CONNECT_TIMEOUT, &timeout); + mysql_options(conn, MYSQL_OPT_READ_TIMEOUT, &timeout_long); + mysql_options(conn, MYSQL_OPT_WRITE_TIMEOUT, &timeout); + proxy_info("Cluster: Fetching MySQL Servers from peer %s:%d started\n", hostname, port); + rc_conn = mysql_real_connect(conn, hostname, username, password, NULL, port, NULL, 0); + if (rc_conn) { + rc_query = mysql_query(conn,"SELECT hostgroup_id, hostname, port, status, weight, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM runtime_mysql_servers"); + if ( rc_query == 0 ) { + MYSQL_RES *result = mysql_store_result(conn); + GloAdmin->admindb->execute("DELETE FROM mysql_servers"); + MYSQL_ROW row; + char *q=(char *)"INSERT INTO mysql_servers (hostgroup_id, hostname, port, status, weight, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment) VALUES (%s, \"%s\", %s, \"%s\", %s, %s, %s, %s, %s, %s, '%s')"; + while ((row = mysql_fetch_row(result))) { + int i; + int l=0; + for (i=0; i<10; i++) { + l+=strlen(row[i]); + } + char *o=escape_string_single_quotes(row[10],false); + char *query = (char *)malloc(strlen(q)+i+strlen(o)+64); + + sprintf(query,q,row[0],row[1],row[2],row[3],row[4],row[5],row[6],row[7],row[8],row[9],o); + if (o!=row[10]) { // there was a copy + free(o); + } + GloAdmin->admindb->execute(query); + free(query); + } + mysql_free_result(result); + + rc_query = mysql_query(conn,"SELECT writer_hostgroup, reader_hostgroup, comment FROM mysql_replication_hostgroups"); + if ( rc_query == 0 ) { + MYSQL_RES *result = mysql_store_result(conn); + GloAdmin->admindb->execute("DELETE FROM mysql_replication_hostgroups"); + MYSQL_ROW row; + char *q=(char *)"INSERT INTO mysql_replication_hostgroups (writer_hostgroup, reader_hostgroup, comment) VALUES (%s, %s, '%s')"; + while ((row = mysql_fetch_row(result))) { + int i; + int l=0; + for (i=0; i<2; i++) { + l+=strlen(row[i]); + } + char *o=escape_string_single_quotes(row[2],false); + char *query = (char *)malloc(strlen(q)+i+strlen(o)+64); + + sprintf(query,q,row[0],row[1],o); + if (o!=row[2]) { // there was a copy + free(o); + } + GloAdmin->admindb->execute(query); + free(query); + } + mysql_free_result(result); + proxy_info("Cluster: Fetching MySQL Servers from peer %s:%d completed\n", hostname, port); + proxy_info("Cluster: Loading to runtime MySQL Servers from peer %s:%d\n", hostname, port); + GloAdmin->load_mysql_servers_to_runtime(); + if (__sync_fetch_and_add(&GloProxyCluster->cluster_mysql_servers_save_to_disk,0) == true) { + proxy_info("Cluster: Saving to disk MySQL Servers from peer %s:%d\n", hostname, port); + GloAdmin->flush_mysql_servers__from_memory_to_disk(); + } + } else { + proxy_info("Cluster: Fetching MySQL Servers from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn)); + } + } else { + proxy_info("Cluster: Fetching MySQL Servers from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn)); + } + } else { + proxy_info("Cluster: Fetching MySQL Servers from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn)); + } + } +__exit_pull_mysql_servers_from_peer: + if (conn) { + if (conn->net.vio) { + mysql_close(conn); + } + } + free(hostname); + } + pthread_mutex_unlock(&GloProxyCluster->update_mysql_servers_mutex); +} + +void ProxySQL_Cluster::pull_proxysql_servers_from_peer() { + char * hostname = NULL; + uint16_t port = 0; + pthread_mutex_lock(&GloProxyCluster->update_proxysql_servers_mutex); + nodes.get_peer_to_sync_proxysql_servers(&hostname, &port); + if (hostname) { + char *username = NULL; + char *password = NULL; + bool rc_bool = true; + MYSQL *rc_conn; + int rc_query; + MYSQL *conn = mysql_init(NULL); + if (conn==NULL) { + proxy_error("Unable to run mysql_init()\n"); + goto __exit_pull_proxysql_servers_from_peer; + } + GloProxyCluster->get_credentials(&username, &password); + if (strlen(username)) { // do not monitor if the username is empty + unsigned int timeout = 1; + unsigned int timeout_long = 60; + mysql_options(conn, MYSQL_OPT_CONNECT_TIMEOUT, &timeout); + mysql_options(conn, MYSQL_OPT_READ_TIMEOUT, &timeout_long); + mysql_options(conn, MYSQL_OPT_WRITE_TIMEOUT, &timeout); + proxy_info("Cluster: Fetching ProxySQL Servers from peer %s:%d started\n", hostname, port); + rc_conn = mysql_real_connect(conn, hostname, username, password, NULL, port, NULL, 0); + if (rc_conn) { + rc_query = mysql_query(conn,"SELECT hostname, port, weight, comment FROM runtime_proxysql_servers"); + if ( rc_query == 0 ) { + MYSQL_RES *result = mysql_store_result(conn); + GloAdmin->admindb->execute("DELETE FROM proxysql_servers"); + MYSQL_ROW row; + char *q=(char *)"INSERT INTO proxysql_servers (hostname, port, weight, comment) VALUES (\"%s\", %s, %s, '%s')"; + while ((row = mysql_fetch_row(result))) { + int i; + int l=0; + for (i=0; i<3; i++) { + l+=strlen(row[i]); + } + char *o=escape_string_single_quotes(row[3],false); + char *query = (char *)malloc(strlen(q)+i+strlen(o)+64); + + sprintf(query,q,row[0],row[1],row[2],o); + if (o!=row[3]) { // there was a copy + free(o); + } + GloAdmin->admindb->execute(query); + free(query); + } + mysql_free_result(result); + proxy_info("Cluster: Fetching ProxySQL Servers from peer %s:%d completed\n", hostname, port); + proxy_info("Cluster: Loading to runtime ProxySQL Servers from peer %s:%d\n", hostname, port); + GloAdmin->load_proxysql_servers_to_runtime(false); + if (__sync_fetch_and_add(&GloProxyCluster->cluster_proxysql_servers_save_to_disk,0) == true) { + proxy_info("Cluster: Saving to disk ProxySQL Servers from peer %s:%d\n", hostname, port); + GloAdmin->flush_proxysql_servers__from_memory_to_disk(); + } + } else { + proxy_info("Cluster: Fetching ProxySQL Servers from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn)); + } + } else { + proxy_info("Cluster: Fetching ProxySQL Servers from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn)); + } + } +__exit_pull_proxysql_servers_from_peer: + if (conn) { + if (conn->net.vio) { + mysql_close(conn); + } + } + free(hostname); + } + pthread_mutex_unlock(&GloProxyCluster->update_proxysql_servers_mutex); } void ProxySQL_Node_Entry::set_metrics(MYSQL_RES *_r, unsigned long long _response_time) { @@ -487,8 +982,9 @@ uint64_t ProxySQL_Cluster_Nodes::generate_hash(char *_hostname, uint16_t _port) return hash_; } -void ProxySQL_Cluster_Nodes::load_servers_list(SQLite3_result *resultset) { - pthread_mutex_lock(&mutex); +void ProxySQL_Cluster_Nodes::load_servers_list(SQLite3_result *resultset, bool _lock) { + if (_lock) + pthread_mutex_lock(&mutex); set_all_inactive(); for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { SQLite3_row *r=*it; @@ -521,7 +1017,8 @@ void ProxySQL_Cluster_Nodes::load_servers_list(SQLite3_result *resultset) { } } remove_inactives(); - pthread_mutex_unlock(&mutex); + if (_lock) + pthread_mutex_unlock(&mutex); } // if it returns false , the node doesn't exist anymore and the monitor should stop @@ -578,6 +1075,130 @@ bool ProxySQL_Cluster_Nodes::Update_Node_Metrics(char * _h, uint16_t _p, MYSQL_R return ret; } +void ProxySQL_Cluster_Nodes::get_peer_to_sync_mysql_query_rules(char **host, uint16_t *port) { + unsigned long long version = 0; + unsigned long long epoch = 0; + char *hostname = NULL; + uint16_t p = 0; +// pthread_mutex_lock(&mutex); + //unsigned long long curtime = monotonic_time(); + for( std::unordered_map::iterator it = umap_proxy_nodes.begin(); it != umap_proxy_nodes.end(); ) { + ProxySQL_Node_Entry * node = it->second; + ProxySQL_Checksum_Value_2 * v = &node->checksums_values.mysql_query_rules; + if (v->version > 1) { + if ( v->epoch > epoch && v->diff_check > 3) { + epoch = v->epoch; + version = v->version; + if (hostname) { + free(hostname); + } + hostname=strdup(node->get_hostname()); + p = node->get_port(); + } + } + it++; + } +// pthread_mutex_unlock(&mutex); + if (hostname) { + *host = hostname; + *port = p; + } + proxy_info("Cluster: detected peer %s:%d with mysql_query_rules version %llu, epoch %llu\n", hostname, p, version, epoch); +} + +void ProxySQL_Cluster_Nodes::get_peer_to_sync_mysql_servers(char **host, uint16_t *port) { + unsigned long long version = 0; + unsigned long long epoch = 0; + char *hostname = NULL; + uint16_t p = 0; +// pthread_mutex_lock(&mutex); + //unsigned long long curtime = monotonic_time(); + for( std::unordered_map::iterator it = umap_proxy_nodes.begin(); it != umap_proxy_nodes.end(); ) { + ProxySQL_Node_Entry * node = it->second; + ProxySQL_Checksum_Value_2 * v = &node->checksums_values.mysql_servers; + if (v->version > 1) { + if ( v->epoch > epoch && v->diff_check > 3) { + epoch = v->epoch; + version = v->version; + if (hostname) { + free(hostname); + } + hostname=strdup(node->get_hostname()); + p = node->get_port(); + } + } + it++; + } +// pthread_mutex_unlock(&mutex); + if (hostname) { + *host = hostname; + *port = p; + } + proxy_info("Cluster: detected peer %s:%d with mysql_servers version %llu, epoch %llu\n", hostname, p, version, epoch); +} + +void ProxySQL_Cluster_Nodes::get_peer_to_sync_mysql_users(char **host, uint16_t *port) { + unsigned long long version = 0; + unsigned long long epoch = 0; + char *hostname = NULL; + uint16_t p = 0; +// pthread_mutex_lock(&mutex); + //unsigned long long curtime = monotonic_time(); + for( std::unordered_map::iterator it = umap_proxy_nodes.begin(); it != umap_proxy_nodes.end(); ) { + ProxySQL_Node_Entry * node = it->second; + ProxySQL_Checksum_Value_2 * v = &node->checksums_values.mysql_users; + if (v->version > 1) { + if ( v->epoch > epoch && v->diff_check > 3) { + epoch = v->epoch; + version = v->version; + if (hostname) { + free(hostname); + } + hostname=strdup(node->get_hostname()); + p = node->get_port(); + } + } + it++; + } +// pthread_mutex_unlock(&mutex); + if (hostname) { + *host = hostname; + *port = p; + } + proxy_info("Cluster: detected peer %s:%d with mysql_users version %llu, epoch %llu\n", hostname, p, version, epoch); +} + +void ProxySQL_Cluster_Nodes::get_peer_to_sync_proxysql_servers(char **host, uint16_t *port) { + unsigned long long version = 0; + unsigned long long epoch = 0; + char *hostname = NULL; + uint16_t p = 0; +// pthread_mutex_lock(&mutex); + //unsigned long long curtime = monotonic_time(); + for( std::unordered_map::iterator it = umap_proxy_nodes.begin(); it != umap_proxy_nodes.end(); ) { + ProxySQL_Node_Entry * node = it->second; + ProxySQL_Checksum_Value_2 * v = &node->checksums_values.proxysql_servers; + if (v->version > 1) { + if ( v->epoch > epoch && v->diff_check > 3) { + epoch = v->epoch; + version = v->version; + if (hostname) { + free(hostname); + } + hostname=strdup(node->get_hostname()); + p = node->get_port(); + } + } + it++; + } +// pthread_mutex_unlock(&mutex); + if (hostname) { + *host = hostname; + *port = p; + } + proxy_info("Cluster: detected peer %s:%d with proxysql_servers version %llu, epoch %llu\n", hostname, p, version, epoch); +} + SQLite3_result * ProxySQL_Cluster_Nodes::stats_proxysql_servers_checksums() { const int colnum=9; SQLite3_result *result=new SQLite3_result(colnum); @@ -746,10 +1367,22 @@ SQLite3_result * ProxySQL_Cluster_Nodes::dump_table_proxysql_servers() { ProxySQL_Cluster::ProxySQL_Cluster() { pthread_mutex_init(&mutex,NULL); + pthread_mutex_init(&update_mysql_query_rules_mutex,NULL); + pthread_mutex_init(&update_mysql_servers_mutex,NULL); + pthread_mutex_init(&update_mysql_users_mutex,NULL); + pthread_mutex_init(&update_proxysql_servers_mutex,NULL); cluster_username = strdup((char *)""); cluster_password = strdup((char *)""); cluster_check_interval_ms = 1000; cluster_check_status_frequency = 10; + cluster_mysql_query_rules_diffs_before_sync = 3; + cluster_mysql_servers_diffs_before_sync = 3; + cluster_mysql_users_diffs_before_sync = 3; + cluster_proxysql_servers_diffs_before_sync = 3; + cluster_mysql_query_rules_save_to_disk = true; + cluster_mysql_servers_save_to_disk = true; + cluster_mysql_users_save_to_disk = true; + cluster_proxysql_servers_save_to_disk = true; } ProxySQL_Cluster::~ProxySQL_Cluster() { diff --git a/src/main.cpp b/src/main.cpp index 300d5d712..9f8852402 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -569,6 +569,7 @@ void ProxySQL_Main_init_phase3___start_all() { { cpu_timer t; GloAdmin->init_mysql_servers(); + GloAdmin->init_proxysql_servers(); GloAdmin->load_scheduler_to_runtime(); #ifdef DEBUG std::cerr << "Main phase3 : GloAdmin initialized in ";