A lot of changes all related to Cluster

In `MySQL_Cluster` class added functions to sync with remote node, like:
- `pull_mysql_query_rules_from_peer()`
- `pull_mysql_servers_from_peer()`
- `pull_mysql_users_from_peer()`
- `pull_proxysql_servers_from_peer()`

Added 8 new global variables in Admin.
4 variables determine after how many different checks the remote configuration will be synced:
- cluster_mysql_query_rules_diffs_before_sync
- cluster_mysql_servers_diffs_before_sync
- cluster_mysql_users_diffs_before_sync
- cluster_proxysql_servers_diffs_before_sync
4 variables determine if after a remote sync the changes need to be written to disk:
- cluster_mysql_query_rules_save_to_disk
- cluster_mysql_servers_save_to_disk
- cluster_mysql_users_save_to_disk
- cluster_proxysql_servers_save_to_disk

Table `proxysql_servers` is now automatically loaded from disk to memory and into runtime at startup.

Added new Admin's command `LOAD PROXYSQL SERVERS FROM CONFIG` to load `proxysql_servers` from config file to memory (not runtime).

Internal structures with credentials in MySQL_Authentication moved from `unsorted_map` to `map` : this to ensure the right order when generating the checksum.

Config file supports both `address` and `hostname` for `mysql_servers` and `proxysql_servers` , #1091

`ProxySQL_Admin::load_proxysql_servers_to_runtime()` now has a lock or no lock option, to avoid deadlock

For now, Cluster module is quite verbose.
pull/1169/head
René Cannaò 9 years ago
parent 2a76f95e31
commit e3c4cb9964

@ -30,7 +30,7 @@ typedef struct _account_details_t {
#endif /* DEBUG */
#define MYSQL_AUTHENTICATION_VERSION "0.2.0902" DEB
typedef std::unordered_map<uint64_t, account_details_t *> umap_auth;
typedef std::map<uint64_t, account_details_t *> umap_auth;
class PtrArray;
@ -49,6 +49,7 @@ class MySQL_Authentication {
creds_group_t creds_backends;
creds_group_t creds_frontends;
bool _reset(enum cred_username_type usertype);
uint64_t _get_runtime_checksum(enum cred_username_type usertype);
public:
MySQL_Authentication();
~MySQL_Authentication();
@ -64,6 +65,7 @@ class MySQL_Authentication {
void remove_inactives(enum cred_username_type usertype);
bool set_SHA1(char *username, enum cred_username_type usertype, void *sha_pass);
unsigned int memory_usage();
uint64_t get_runtime_checksum();
};
#endif /* __CLASS_MYSQL_AUTHENTICATION_H */

@ -51,6 +51,9 @@ class ProxySQL_Node_Entry {
int metrics_idx_prev;
int metrics_idx;
ProxySQL_Node_Metrics **metrics;
// void pull_mysql_query_rules_from_peer();
// void pull_mysql_servers_from_peer();
// void pull_proxysql_servers_from_peer();
public:
uint64_t get_hash();
@ -95,13 +98,17 @@ class ProxySQL_Cluster_Nodes {
public:
ProxySQL_Cluster_Nodes();
~ProxySQL_Cluster_Nodes();
void load_servers_list(SQLite3_result *);
void load_servers_list(SQLite3_result *, bool _lock);
bool Update_Node_Metrics(char * _h, uint16_t _p, MYSQL_RES *_r, unsigned long long _response_time);
bool Update_Global_Checksum(char * _h, uint16_t _p, MYSQL_RES *_r);
bool Update_Node_Checksums(char * _h, uint16_t _p, MYSQL_RES *_r);
SQLite3_result * dump_table_proxysql_servers();
SQLite3_result * stats_proxysql_servers_checksums();
SQLite3_result * stats_proxysql_servers_metrics();
void get_peer_to_sync_mysql_query_rules(char **host, uint16_t *port);
void get_peer_to_sync_mysql_servers(char **host, uint16_t *port);
void get_peer_to_sync_mysql_users(char **host, uint16_t *port);
void get_peer_to_sync_proxysql_servers(char **host, uint16_t *port);
};
@ -113,14 +120,26 @@ class ProxySQL_Cluster {
char *cluster_username;
char *cluster_password;
public:
pthread_mutex_t update_mysql_query_rules_mutex;
pthread_mutex_t update_mysql_servers_mutex;
pthread_mutex_t update_mysql_users_mutex;
pthread_mutex_t update_proxysql_servers_mutex;
int cluster_check_interval_ms;
int cluster_check_status_frequency;
int cluster_mysql_query_rules_diffs_before_sync;
int cluster_mysql_servers_diffs_before_sync;
int cluster_mysql_users_diffs_before_sync;
int cluster_proxysql_servers_diffs_before_sync;
bool cluster_mysql_query_rules_save_to_disk;
bool cluster_mysql_servers_save_to_disk;
bool cluster_mysql_users_save_to_disk;
bool cluster_proxysql_servers_save_to_disk;
ProxySQL_Cluster();
~ProxySQL_Cluster();
void init() {};
void print_version();
void load_servers_list(SQLite3_result *r) {
nodes.load_servers_list(r);
void load_servers_list(SQLite3_result *r, bool _lock = true) {
nodes.load_servers_list(r, _lock);
}
void get_credentials(char **, char **);
void set_username(char *);
@ -145,5 +164,9 @@ class ProxySQL_Cluster {
}
void thread_ending(pthread_t);
void join_term_thread();
void pull_mysql_query_rules_from_peer();
void pull_mysql_servers_from_peer();
void pull_mysql_users_from_peer();
void pull_proxysql_servers_from_peer();
};
#endif /* CLASS_PROXYSQL_CLUSTER_H */

@ -88,6 +88,14 @@ class ProxySQL_Admin {
char * cluster_password;
int cluster_check_interval_ms;
int cluster_check_status_frequency;
int cluster_mysql_query_rules_diffs_before_sync;
int cluster_mysql_servers_diffs_before_sync;
int cluster_mysql_users_diffs_before_sync;
int cluster_proxysql_servers_diffs_before_sync;
bool cluster_mysql_query_rules_save_to_disk;
bool cluster_mysql_servers_save_to_disk;
bool cluster_mysql_users_save_to_disk;
bool cluster_proxysql_servers_save_to_disk;
#ifdef DEBUG
bool debug;
#endif /* DEBUG */
@ -165,6 +173,7 @@ class ProxySQL_Admin {
void init_users();
void init_mysql_servers();
void init_mysql_query_rules();
void init_proxysql_servers();
void save_mysql_users_runtime_to_database(bool _runtime);
void save_mysql_servers_runtime_to_database(bool);
void admin_shutdown();
@ -215,6 +224,7 @@ class ProxySQL_Admin {
int Read_MySQL_Query_Rules_from_configfile();
int Read_MySQL_Servers_from_configfile();
int Read_Scheduler_from_configfile();
int Read_ProxySQL_Servers_from_configfile();
void flush_error_log();
void GenericRefreshStatistics(const char *query_no_space, unsigned int query_no_space_length, bool admin);
@ -230,7 +240,7 @@ class ProxySQL_Admin {
void flush_configdb(); // 923
// Cluster
void load_proxysql_servers_to_runtime();
void load_proxysql_servers_to_runtime(bool _lock=true);
void flush_proxysql_servers__from_memory_to_disk();
void flush_proxysql_servers__from_disk_to_memory();
void save_proxysql_servers_runtime_to_database(bool);

@ -90,7 +90,7 @@ bool MySQL_Authentication::add(char * username, char * password, enum cred_usern
#else
spin_wrlock(&cg.lock);
#endif
std::unordered_map<uint64_t, account_details_t *>::iterator lookup;
std::map<uint64_t, account_details_t *>::iterator lookup;
lookup = cg.bt_map.find(hash1);
// few changes will follow, due to issue #802
account_details_t *ad=NULL;
@ -273,7 +273,7 @@ int MySQL_Authentication::increase_frontend_user_connections(char *username, int
#else
spin_wrlock(&cg.lock);
#endif
std::unordered_map<uint64_t, account_details_t *>::iterator it;
std::map<uint64_t, account_details_t *>::iterator it;
it = cg.bt_map.find(hash1);
if (it != cg.bt_map.end()) {
account_details_t *ad=it->second;
@ -306,7 +306,7 @@ void MySQL_Authentication::decrease_frontend_user_connections(char *username) {
#else
spin_wrlock(&cg.lock);
#endif
std::unordered_map<uint64_t, account_details_t *>::iterator it;
std::map<uint64_t, account_details_t *>::iterator it;
it = cg.bt_map.find(hash1);
if (it != cg.bt_map.end()) {
account_details_t *ad=it->second;
@ -338,7 +338,7 @@ bool MySQL_Authentication::del(char * username, enum cred_username_type usertype
#else
spin_wrlock(&cg.lock);
#endif
std::unordered_map<uint64_t, account_details_t *>::iterator lookup;
std::map<uint64_t, account_details_t *>::iterator lookup;
lookup = cg.bt_map.find(hash1);
if (lookup != cg.bt_map.end()) {
account_details_t *ad=lookup->second;
@ -376,7 +376,7 @@ bool MySQL_Authentication::set_SHA1(char * username, enum cred_username_type use
#else
spin_wrlock(&cg.lock);
#endif
std::unordered_map<uint64_t, account_details_t *>::iterator lookup;
std::map<uint64_t, account_details_t *>::iterator lookup;
lookup = cg.bt_map.find(hash1);
if (lookup != cg.bt_map.end()) {
account_details_t *ad=lookup->second;
@ -410,7 +410,7 @@ char * MySQL_Authentication::lookup(char * username, enum cred_username_type use
#else
spin_rdlock(&cg.lock);
#endif
std::unordered_map<uint64_t, account_details_t *>::iterator lookup;
std::map<uint64_t, account_details_t *>::iterator lookup;
lookup = cg.bt_map.find(hash1);
if (lookup != cg.bt_map.end()) {
account_details_t *ad=lookup->second;
@ -446,7 +446,7 @@ bool MySQL_Authentication::_reset(enum cred_username_type usertype) {
#else
spin_wrlock(&cg.lock);
#endif
std::unordered_map<uint64_t, account_details_t *>::iterator lookup;
std::map<uint64_t, account_details_t *>::iterator lookup;
while (cg.bt_map.size()) {
lookup = cg.bt_map.begin();
@ -476,3 +476,35 @@ bool MySQL_Authentication::reset() {
_reset(USERNAME_FRONTEND);
return true;
}
uint64_t MySQL_Authentication::_get_runtime_checksum(enum cred_username_type usertype) {
creds_group_t &cg=(usertype==USERNAME_BACKEND ? creds_backends : creds_frontends);
std::map<uint64_t, account_details_t *>::iterator it;
if (cg.bt_map.size() == 0) {
return 0;
}
SpookyHash myhash;
myhash.Init(13,4);
for (it = cg.bt_map.begin(); it != cg.bt_map.end(); ) {
account_details_t *ad=it->second;
myhash.Update(&ad->use_ssl,sizeof(ad->use_ssl));
myhash.Update(&ad->default_hostgroup,sizeof(ad->default_hostgroup));
myhash.Update(&ad->schema_locked,sizeof(ad->schema_locked));
myhash.Update(&ad->transaction_persistent,sizeof(ad->transaction_persistent));
myhash.Update(&ad->fast_forward,sizeof(ad->fast_forward));
myhash.Update(&ad->max_connections,sizeof(ad->max_connections));
myhash.Update(ad->username,strlen(ad->username));
myhash.Update(ad->password,strlen(ad->password));
it++;
}
uint64_t hash1, hash2;
myhash.Final(&hash1, &hash2);
return hash1;
}
uint64_t MySQL_Authentication::get_runtime_checksum() {
uint64_t hashB = _get_runtime_checksum(USERNAME_BACKEND);
uint64_t hashF = _get_runtime_checksum(USERNAME_FRONTEND);
return hashB+hashF;
}

@ -702,14 +702,19 @@ bool MySQL_HostGroups_Manager::commit() {
if ( GloAdmin && GloAdmin->checksum_variables.checksum_mysql_servers ) {
uint64_t hash1, hash2;
uint64_t hash1=0, hash2=0;
SpookyHash myhash;
char buf[80];
myhash.Init(19,3);
bool init = false;
/* removing all this code, because we need them ordered
MySrvC *mysrvc=NULL;
for (unsigned int i=0; i<MyHostGroups->len; i++) {
MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i);
for (unsigned int j=0; j<myhgc->mysrvs->servers->len; j++) {
if (init == false) {
init = true;
myhash.Init(19,3);
}
mysrvc=myhgc->mysrvs->idx(j);
// hostgroup
sprintf(buf,"%u",mysrvc->myhgc->hid);
@ -747,7 +752,69 @@ bool MySQL_HostGroups_Manager::commit() {
} else { myhash.Update("",0); }
}
}
myhash.Final(&hash1, &hash2);
*/
{
mydb->execute("DELETE FROM mysql_servers");
generate_mysql_servers_table();
char *error=NULL;
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
char *query=(char *)"SELECT * FROM mysql_servers WHERE ORDER BY hostgroup_id, hostname, port";
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
if (resultset) {
if (resultset->rows_count) {
if (init == false) {
init = true;
myhash.Init(19,3);
}
uint64_t hash1_ = resultset->raw_checksum();
myhash.Update(&hash1_, sizeof(hash1_));
}
delete resultset;
}
}
{
char *error=NULL;
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
char *query=(char *)"SELECT * FROM mysql_replication_hostgroups ORDER BY writer_hostgroup";
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
if (resultset) {
if (resultset->rows_count) {
if (init == false) {
init = true;
myhash.Init(19,3);
}
uint64_t hash1_ = resultset->raw_checksum();
myhash.Update(&hash1_, sizeof(hash1_));
}
delete resultset;
}
}
{
char *error=NULL;
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
char *query=(char *)"SELECT * FROM mysql_group_replication_hostgroups ORDER BY writer_hostgroup";
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
if (resultset) {
if (resultset->rows_count) {
if (init == false) {
init = true;
myhash.Init(19,3);
}
uint64_t hash1_ = resultset->raw_checksum();
myhash.Update(&hash1_, sizeof(hash1_));
}
delete resultset;
}
}
if (init == true) {
myhash.Final(&hash1, &hash2);
}
uint32_t d32[2];
memcpy(&d32,&hash1,sizeof(hash1));
sprintf(buf,"0x%0X%0X", d32[0], d32[1]);

@ -282,6 +282,14 @@ static char * admin_variables_names[]= {
(char *)"cluster_password",
(char *)"cluster_check_interval_ms",
(char *)"cluster_check_status_frequency",
(char *)"cluster_mysql_query_rules_diffs_before_sync",
(char *)"cluster_mysql_servers_diffs_before_sync",
(char *)"cluster_mysql_users_diffs_before_sync",
(char *)"cluster_proxysql_servers_diffs_before_sync",
(char *)"cluster_mysql_query_rules_save_to_disk",
(char *)"cluster_mysql_servers_save_to_disk",
(char *)"cluster_mysql_users_save_to_disk",
(char *)"cluster_proxysql_servers_save_to_disk",
(char *)"checksum_mysql_query_rules",
(char *)"checksum_mysql_servers",
(char *)"checksum_mysql_users",
@ -1275,6 +1283,34 @@ bool admin_handler_command_load_or_save(char *query_no_space, unsigned int query
return false;
}
if (
(query_no_space_length==strlen("LOAD PROXYSQL SERVERS FROM CONFIG") && !strncasecmp("LOAD PROXYSQL SERVERS FROM CONFIG",query_no_space, query_no_space_length))
) {
proxy_info("Received %s command\n", query_no_space);
if (GloVars.configfile_open) {
proxy_debug(PROXY_DEBUG_ADMIN, 4, "Loading from file %s\n", GloVars.config_file);
if (GloVars.confFile->OpenFile(NULL)==true) {
ProxySQL_Admin *SPA=(ProxySQL_Admin *)pa;
int rows=0;
rows=SPA->Read_ProxySQL_Servers_from_configfile();
proxy_debug(PROXY_DEBUG_ADMIN, 4, "Loaded ProxySQL servers from CONFIG\n");
SPA->send_MySQL_OK(&sess->client_myds->myprot, NULL, rows);
GloVars.confFile->CloseFile();
} else {
proxy_debug(PROXY_DEBUG_ADMIN, 4, "Unable to open or parse config file %s\n", GloVars.config_file);
char *s=(char *)"Unable to open or parse config file %s";
char *m=(char *)malloc(strlen(s)+strlen(GloVars.config_file)+1);
sprintf(m,s,GloVars.config_file);
SPA->send_MySQL_ERR(&sess->client_myds->myprot, m);
free(m);
}
} else {
proxy_debug(PROXY_DEBUG_ADMIN, 4, "Unknown config file\n");
SPA->send_MySQL_ERR(&sess->client_myds->myprot, (char *)"Config file unknown");
}
return false;
}
}
if ((query_no_space_length>23) && ( (!strncasecmp("SAVE MYSQL QUERY RULES ", query_no_space, 23)) || (!strncasecmp("LOAD MYSQL QUERY RULES ", query_no_space, 23))) ) {
@ -2791,9 +2827,17 @@ ProxySQL_Admin::ProxySQL_Admin() {
variables.cluster_password=strdup((char *)"");
variables.cluster_check_interval_ms=1000;
variables.cluster_check_status_frequency=10;
variables.cluster_mysql_query_rules_diffs_before_sync = 3;
variables.cluster_mysql_servers_diffs_before_sync = 3;
variables.cluster_mysql_users_diffs_before_sync = 3;
variables.cluster_proxysql_servers_diffs_before_sync = 3;
checksum_variables.checksum_mysql_query_rules = true;
checksum_variables.checksum_mysql_servers = true;
checksum_variables.checksum_mysql_users = true;
variables.cluster_mysql_query_rules_save_to_disk = true;
variables.cluster_mysql_servers_save_to_disk = true;
variables.cluster_mysql_users_save_to_disk = true;
variables.cluster_proxysql_servers_save_to_disk = true;
#ifdef DEBUG
variables.debug=GloVars.global.gdbg;
#endif /* DEBUG */
@ -3010,6 +3054,7 @@ bool ProxySQL_Admin::init() {
Read_MySQL_Users_from_configfile();
Read_MySQL_Query_Rules_from_configfile();
Read_Scheduler_from_configfile();
Read_ProxySQL_Servers_from_configfile();
__insert_or_replace_disktable_select_maintable();
} else {
if (GloVars.confFile->OpenFile(GloVars.config_file)==true) {
@ -3019,6 +3064,7 @@ bool ProxySQL_Admin::init() {
Read_Global_Variables_from_configfile("admin");
Read_Global_Variables_from_configfile("mysql");
Read_Scheduler_from_configfile();
Read_ProxySQL_Servers_from_configfile();
__insert_or_replace_disktable_select_maintable();
}
}
@ -3378,6 +3424,34 @@ char * ProxySQL_Admin::get_variable(char *name) {
sprintf(intbuf,"%d",variables.cluster_check_status_frequency);
return strdup(intbuf);
}
if (!strcasecmp(name,"cluster_mysql_query_rules_diffs_before_sync")) {
sprintf(intbuf,"%d",variables.cluster_mysql_query_rules_diffs_before_sync);
return strdup(intbuf);
}
if (!strcasecmp(name,"cluster_mysql_servers_diffs_before_sync")) {
sprintf(intbuf,"%d",variables.cluster_mysql_servers_diffs_before_sync);
return strdup(intbuf);
}
if (!strcasecmp(name,"cluster_mysql_users_diffs_before_sync")) {
sprintf(intbuf,"%d",variables.cluster_mysql_users_diffs_before_sync);
return strdup(intbuf);
}
if (!strcasecmp(name,"cluster_proxysql_servers_diffs_before_sync")) {
sprintf(intbuf,"%d",variables.cluster_proxysql_servers_diffs_before_sync);
return strdup(intbuf);
}
if (!strcasecmp(name,"cluster_mysql_query_rules_save_to_disk")) {
return strdup((variables.cluster_mysql_query_rules_save_to_disk ? "true" : "false"));
}
if (!strcasecmp(name,"cluster_mysql_servers_save_to_disk")) {
return strdup((variables.cluster_mysql_servers_save_to_disk ? "true" : "false"));
}
if (!strcasecmp(name,"cluster_mysql_users_save_to_disk")) {
return strdup((variables.cluster_mysql_users_save_to_disk ? "true" : "false"));
}
if (!strcasecmp(name,"cluster_proxysql_servers_save_to_disk")) {
return strdup((variables.cluster_proxysql_servers_save_to_disk ? "true" : "false"));
}
if (!strcasecmp(name,"refresh_interval")) {
sprintf(intbuf,"%d",variables.refresh_interval);
return strdup(intbuf);
@ -3598,6 +3672,46 @@ bool ProxySQL_Admin::set_variable(char *name, char *value) { // this is the pub
return false;
}
}
if (!strcasecmp(name,"cluster_mysql_query_rules_diffs_before_sync")) {
int intv=atoi(value);
if (intv >= 0 && intv <= 1000) {
variables.cluster_mysql_query_rules_diffs_before_sync=intv;
__sync_lock_test_and_set(&GloProxyCluster->cluster_mysql_query_rules_diffs_before_sync, intv);
return true;
} else {
return false;
}
}
if (!strcasecmp(name,"cluster_mysql_servers_diffs_before_sync")) {
int intv=atoi(value);
if (intv >= 0 && intv <= 1000) {
variables.cluster_mysql_servers_diffs_before_sync=intv;
__sync_lock_test_and_set(&GloProxyCluster->cluster_mysql_servers_diffs_before_sync, intv);
return true;
} else {
return false;
}
}
if (!strcasecmp(name,"cluster_mysql_users_diffs_before_sync")) {
int intv=atoi(value);
if (intv >= 0 && intv <= 1000) {
variables.cluster_mysql_users_diffs_before_sync=intv;
__sync_lock_test_and_set(&GloProxyCluster->cluster_mysql_users_diffs_before_sync, intv);
return true;
} else {
return false;
}
}
if (!strcasecmp(name,"cluster_proxysql_servers_diffs_before_sync")) {
int intv=atoi(value);
if (intv >= 0 && intv <= 1000) {
variables.cluster_proxysql_servers_diffs_before_sync=intv;
__sync_lock_test_and_set(&GloProxyCluster->cluster_proxysql_servers_diffs_before_sync, intv);
return true;
} else {
return false;
}
}
if (!strcasecmp(name,"version")) {
if (strcasecmp(value,(char *)PROXYSQL_VERSION)==0) {
return true;
@ -3616,6 +3730,58 @@ bool ProxySQL_Admin::set_variable(char *name, char *value) { // this is the pub
}
return false;
}
if (!strcasecmp(name,"cluster_mysql_query_rules_save_to_disk")) {
if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) {
variables.cluster_mysql_query_rules_save_to_disk=true;
__sync_lock_test_and_set(&GloProxyCluster->cluster_mysql_query_rules_save_to_disk, true);
return true;
}
if (strcasecmp(value,"false")==0 || strcasecmp(value,"0")==0) {
variables.cluster_mysql_query_rules_save_to_disk=false;
__sync_lock_test_and_set(&GloProxyCluster->cluster_mysql_query_rules_save_to_disk, false);
return true;
}
return false;
}
if (!strcasecmp(name,"cluster_mysql_servers_save_to_disk")) {
if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) {
variables.cluster_mysql_servers_save_to_disk=true;
__sync_lock_test_and_set(&GloProxyCluster->cluster_mysql_servers_save_to_disk, true);
return true;
}
if (strcasecmp(value,"false")==0 || strcasecmp(value,"0")==0) {
variables.cluster_mysql_servers_save_to_disk=false;
__sync_lock_test_and_set(&GloProxyCluster->cluster_mysql_servers_save_to_disk, false);
return true;
}
return false;
}
if (!strcasecmp(name,"cluster_mysql_users_save_to_disk")) {
if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) {
variables.cluster_mysql_users_save_to_disk=true;
__sync_lock_test_and_set(&GloProxyCluster->cluster_mysql_users_save_to_disk, true);
return true;
}
if (strcasecmp(value,"false")==0 || strcasecmp(value,"0")==0) {
variables.cluster_mysql_users_save_to_disk=false;
__sync_lock_test_and_set(&GloProxyCluster->cluster_mysql_users_save_to_disk, false);
return true;
}
return false;
}
if (!strcasecmp(name,"cluster_proxysql_servers_save_to_disk")) {
if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) {
variables.cluster_proxysql_servers_save_to_disk=true;
__sync_lock_test_and_set(&GloProxyCluster->cluster_proxysql_servers_save_to_disk, true);
return true;
}
if (strcasecmp(value,"false")==0 || strcasecmp(value,"0")==0) {
variables.cluster_proxysql_servers_save_to_disk=false;
__sync_lock_test_and_set(&GloProxyCluster->cluster_proxysql_servers_save_to_disk, false);
return true;
}
return false;
}
if (!strcasecmp(name,"checksum_mysql_query_rules")) {
if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) {
checksum_variables.checksum_mysql_query_rules=true;
@ -4443,58 +4609,62 @@ int ProxySQL_Admin::flush_debug_levels_database_to_runtime(SQLite3DB *db) {
void ProxySQL_Admin::__insert_or_ignore_maintable_select_disktable() {
admindb->execute("PRAGMA foreign_keys = OFF");
admindb->execute("INSERT OR IGNORE INTO main.mysql_servers SELECT * FROM disk.mysql_servers");
admindb->execute("INSERT OR IGNORE INTO main.mysql_replication_hostgroups SELECT * FROM disk.mysql_replication_hostgroups");
admindb->execute("INSERT OR IGNORE INTO main.mysql_group_replication_hostgroups SELECT * FROM disk.mysql_group_replication_hostgroups");
admindb->execute("INSERT OR IGNORE INTO main.mysql_users SELECT * FROM disk.mysql_users");
admindb->execute("PRAGMA foreign_keys = OFF");
admindb->execute("INSERT OR IGNORE INTO main.mysql_servers SELECT * FROM disk.mysql_servers");
admindb->execute("INSERT OR IGNORE INTO main.mysql_replication_hostgroups SELECT * FROM disk.mysql_replication_hostgroups");
admindb->execute("INSERT OR IGNORE INTO main.mysql_group_replication_hostgroups SELECT * FROM disk.mysql_group_replication_hostgroups");
admindb->execute("INSERT OR IGNORE INTO main.mysql_users SELECT * FROM disk.mysql_users");
admindb->execute("INSERT OR IGNORE INTO main.mysql_query_rules SELECT * FROM disk.mysql_query_rules");
admindb->execute("INSERT OR IGNORE INTO main.global_variables SELECT * FROM disk.global_variables");
admindb->execute("INSERT OR IGNORE INTO main.scheduler SELECT * FROM disk.scheduler");
admindb->execute("INSERT OR IGNORE INTO main.proxysql_servers SELECT * FROM disk.proxysql_servers");
#ifdef DEBUG
admindb->execute("INSERT OR IGNORE INTO main.debug_levels SELECT * FROM disk.debug_levels");
admindb->execute("INSERT OR IGNORE INTO main.debug_levels SELECT * FROM disk.debug_levels");
#endif /* DEBUG */
admindb->execute("PRAGMA foreign_keys = ON");
admindb->execute("PRAGMA foreign_keys = ON");
}
void ProxySQL_Admin::__insert_or_replace_maintable_select_disktable() {
admindb->execute("PRAGMA foreign_keys = OFF");
admindb->execute("INSERT OR REPLACE INTO main.mysql_servers SELECT * FROM disk.mysql_servers");
admindb->execute("INSERT OR REPLACE INTO main.mysql_replication_hostgroups SELECT * FROM disk.mysql_replication_hostgroups");
admindb->execute("INSERT OR REPLACE INTO main.mysql_group_replication_hostgroups SELECT * FROM disk.mysql_group_replication_hostgroups");
admindb->execute("INSERT OR REPLACE INTO main.mysql_users SELECT * FROM disk.mysql_users");
admindb->execute("PRAGMA foreign_keys = OFF");
admindb->execute("INSERT OR REPLACE INTO main.mysql_servers SELECT * FROM disk.mysql_servers");
admindb->execute("INSERT OR REPLACE INTO main.mysql_replication_hostgroups SELECT * FROM disk.mysql_replication_hostgroups");
admindb->execute("INSERT OR REPLACE INTO main.mysql_group_replication_hostgroups SELECT * FROM disk.mysql_group_replication_hostgroups");
admindb->execute("INSERT OR REPLACE INTO main.mysql_users SELECT * FROM disk.mysql_users");
admindb->execute("INSERT OR REPLACE INTO main.mysql_query_rules SELECT * FROM disk.mysql_query_rules");
admindb->execute("INSERT OR REPLACE INTO main.global_variables SELECT * FROM disk.global_variables");
admindb->execute("INSERT OR REPLACE INTO main.scheduler SELECT * FROM disk.scheduler");
admindb->execute("INSERT OR REPLACE INTO main.proxysql_servers SELECT * FROM disk.proxysql_servers");
#ifdef DEBUG
admindb->execute("INSERT OR IGNORE INTO main.debug_levels SELECT * FROM disk.debug_levels");
admindb->execute("INSERT OR IGNORE INTO main.debug_levels SELECT * FROM disk.debug_levels");
#endif /* DEBUG */
admindb->execute("PRAGMA foreign_keys = ON");
admindb->execute("PRAGMA foreign_keys = ON");
}
void ProxySQL_Admin::__delete_disktable() {
admindb->execute("DELETE FROM disk.mysql_servers");
admindb->execute("DELETE FROM disk.mysql_replication_hostgroups");
admindb->execute("DELETE FROM disk.mysql_users");
admindb->execute("DELETE FROM disk.mysql_servers");
admindb->execute("DELETE FROM disk.mysql_replication_hostgroups");
admindb->execute("DELETE FROM disk.mysql_users");
admindb->execute("DELETE FROM disk.mysql_query_rules");
admindb->execute("DELETE FROM disk.global_variables");
admindb->execute("DELETE FROM disk.scheduler");
admindb->execute("DELETE FROM disk.proxysql_servers");
#ifdef DEBUG
admindb->execute("DELETE FROM disk.debug_levels");
admindb->execute("DELETE FROM disk.debug_levels");
#endif /* DEBUG */
}
void ProxySQL_Admin::__insert_or_replace_disktable_select_maintable() {
admindb->execute("INSERT OR REPLACE INTO disk.mysql_servers SELECT * FROM main.mysql_servers");
admindb->execute("INSERT OR REPLACE INTO disk.mysql_replication_hostgroups SELECT * FROM main.mysql_replication_hostgroups");
admindb->execute("INSERT OR REPLACE INTO disk.mysql_group_replication_hostgroups SELECT * FROM main.mysql_group_replication_hostgroups");
admindb->execute("INSERT OR REPLACE INTO disk.mysql_query_rules SELECT * FROM main.mysql_query_rules");
admindb->execute("INSERT OR REPLACE INTO disk.mysql_users SELECT * FROM main.mysql_users");
admindb->execute("INSERT OR REPLACE INTO disk.mysql_servers SELECT * FROM main.mysql_servers");
admindb->execute("INSERT OR REPLACE INTO disk.mysql_replication_hostgroups SELECT * FROM main.mysql_replication_hostgroups");
admindb->execute("INSERT OR REPLACE INTO disk.mysql_group_replication_hostgroups SELECT * FROM main.mysql_group_replication_hostgroups");
admindb->execute("INSERT OR REPLACE INTO disk.mysql_query_rules SELECT * FROM main.mysql_query_rules");
admindb->execute("INSERT OR REPLACE INTO disk.mysql_users SELECT * FROM main.mysql_users");
admindb->execute("INSERT OR REPLACE INTO disk.mysql_query_rules SELECT * FROM main.mysql_query_rules");
admindb->execute("INSERT OR REPLACE INTO disk.global_variables SELECT * FROM main.global_variables");
admindb->execute("INSERT OR REPLACE INTO disk.scheduler SELECT * FROM main.scheduler");
admindb->execute("INSERT OR REPLACE INTO disk.proxysql_servers SELECT * FROM main.proxysql_servers");
#ifdef DEBUG
admindb->execute("INSERT OR REPLACE INTO disk.debug_levels SELECT * FROM main.debug_levels");
admindb->execute("INSERT OR REPLACE INTO disk.debug_levels SELECT * FROM main.debug_levels");
#endif /* DEBUG */
}
@ -4597,6 +4767,10 @@ void ProxySQL_Admin::init_mysql_servers() {
mysql_servers_wrunlock();
}
void ProxySQL_Admin::init_proxysql_servers() {
load_proxysql_servers_to_runtime();
}
void ProxySQL_Admin::init_mysql_query_rules() {
load_mysql_query_rules_to_runtime();
}
@ -4623,19 +4797,24 @@ void ProxySQL_Admin::__refresh_users() {
GloMyAuth->set_all_inactive(USERNAME_BACKEND);
GloMyAuth->set_all_inactive(USERNAME_FRONTEND);
add_admin_users();
uint64_t hashB, hashF;
if (calculate_checksum) {
__add_active_users(USERNAME_BACKEND, NULL, &hashB);
__add_active_users(USERNAME_FRONTEND, NULL, &hashF);
} else {
// uint64_t hashB, hashF;
// if (calculate_checksum) {
// __add_active_users(USERNAME_BACKEND, NULL, &hashB);
// __add_active_users(USERNAME_FRONTEND, NULL, &hashF);
// } else {
__add_active_users(USERNAME_BACKEND);
__add_active_users(USERNAME_FRONTEND);
}
// }
GloMyAuth->remove_inactives(USERNAME_BACKEND);
GloMyAuth->remove_inactives(USERNAME_FRONTEND);
uint64_t hash1 = 0;
if (calculate_checksum) {
}
set_variable((char *)"admin_credentials",(char *)"");
if (calculate_checksum) {
uint64_t hash1 = hashB + hashF; // overflow allowed
hash1 = GloMyAuth->get_runtime_checksum();
//uint64_t hash1 = hashB + hashF; // overflow allowed
uint32_t d32[2];
char buf[20];
memcpy(&d32, &hash1, sizeof(hash1));
@ -4693,6 +4872,7 @@ void ProxySQL_Admin::__add_active_users(enum cred_username_type usertype, char *
char *error=NULL;
int cols=0;
int affected_rows=0;
bool empty = true;
SpookyHash myhash;
if (hash1) {
myhash.Init(19,3);
@ -4731,6 +4911,7 @@ void ProxySQL_Admin::__add_active_users(enum cred_username_type usertype, char *
SQLite3_row *r=new SQLite3_row(cols);
r->add_fields(statement);
if (hash1) {
empty = false;
for (int i=0; i<cols;i++) {
if (r->fields[i]) {
myhash.Update(r->fields[i],r->sizes[i]);
@ -4800,6 +4981,9 @@ void ProxySQL_Admin::__add_active_users(enum cred_username_type usertype, char *
uint64_t h1, h2;
myhash.Final(&h1, &h2);
*hash1 = h1;
if (empty) {
*hash1 = 0;
}
}
#else
if (resultset) delete resultset;
@ -5366,7 +5550,7 @@ char * ProxySQL_Admin::load_mysql_query_rules_to_runtime() {
int affected_rows=0;
if (GloQPro==NULL) return (char *)"Global Query Processor not started: command impossible to run";
SQLite3_result *resultset=NULL;
char *query=(char *)"SELECT rule_id, username, schemaname, flagIN, client_addr, proxy_addr, proxy_port, digest, match_digest, match_pattern, negate_match_pattern, re_modifiers, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, retries, delay, next_query_flagIN, mirror_flagOUT, mirror_hostgroup, error_msg, ok_msg, sticky_conn, multiplex, log, apply, comment FROM main.mysql_query_rules WHERE active=1";
char *query=(char *)"SELECT rule_id, username, schemaname, flagIN, client_addr, proxy_addr, proxy_port, digest, match_digest, match_pattern, negate_match_pattern, re_modifiers, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, retries, delay, next_query_flagIN, mirror_flagOUT, mirror_hostgroup, error_msg, ok_msg, sticky_conn, multiplex, log, apply, comment FROM main.mysql_query_rules WHERE active=1 ORDER BY rule_id";
admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
if (error) {
proxy_error("Error on %s : %s\n", query, error);
@ -5886,7 +6070,11 @@ int ProxySQL_Admin::Read_MySQL_Servers_from_configfile() {
int use_ssl=0;
int max_latency_ms=0;
std::string comment="";
if (server.lookupValue("address", address)==false) continue;
if (server.lookupValue("address", address)==false) {
if (server.lookupValue("hostname", address)==false) {
continue;
}
}
if (server.lookupValue("port", port)==false) continue;
if (server.lookupValue("hostgroup", hostgroup)==false) continue;
server.lookupValue("status", status);
@ -5945,6 +6133,47 @@ int ProxySQL_Admin::Read_MySQL_Servers_from_configfile() {
return rows;
}
int ProxySQL_Admin::Read_ProxySQL_Servers_from_configfile() {
const Setting& root = GloVars.confFile->cfg->getRoot();
int i;
int rows=0;
admindb->execute("PRAGMA foreign_keys = OFF");
if (root.exists("proxysql_servers")==true) {
const Setting &mysql_servers = root["proxysql_servers"];
int count = mysql_servers.getLength();
//fprintf(stderr, "Found %d servers\n",count);
char *q=(char *)"INSERT OR REPLACE INTO proxysql_servers (hostname, port, weight, comment) VALUES (\"%s\", %d, %d, '%s')";
for (i=0; i< count; i++) {
const Setting &server = mysql_servers[i];
std::string address;
int port;
int weight=0;
std::string comment="";
if (server.lookupValue("address", address)==false) {
if (server.lookupValue("hostname", address)==false) {
continue;
}
}
if (server.lookupValue("port", port)==false) continue;
server.lookupValue("weight", weight);
server.lookupValue("comment", comment);
char *o1=strdup(comment.c_str());
char *o=escape_string_single_quotes(o1, false);
char *query=(char *)malloc(strlen(q)+strlen(address.c_str())+strlen(o)+128);
sprintf(query, q, address.c_str(), port, weight, o);
proxy_info("Cluster: Adding ProxySQL Servers %s:%d from config file\n", address.c_str(), port);
//fprintf(stderr, "%s\n", query);
admindb->execute(query);
if (o!=o1) free(o);
free(o1);
free(query);
rows++;
}
}
admindb->execute("PRAGMA foreign_keys = ON");
return rows;
}
extern "C" ProxySQL_Admin * create_ProxySQL_Admin_func() {
return new ProxySQL_Admin();
}
@ -6422,19 +6651,19 @@ unsigned long long ProxySQL_External_Scheduler::run_once() {
return next_run;
}
void ProxySQL_Admin::load_proxysql_servers_to_runtime() {
void ProxySQL_Admin::load_proxysql_servers_to_runtime(bool _lock) {
// make sure that the caller has called mysql_servers_wrlock()
char *error=NULL;
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
char *query=(char *)"SELECT hostname,port,weight,comment FROM proxysql_servers";
char *query=(char *)"SELECT hostname, port, weight, comment FROM proxysql_servers ORDER BY hostname, port";
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
if (error) {
proxy_error("Error on %s : %s\n", query, error);
} else {
GloProxyCluster->load_servers_list(resultset);
GloProxyCluster->load_servers_list(resultset, _lock);
// if (checksum_variables.checksum_mysql_query_rules) {
pthread_mutex_lock(&GloVars.checksum_mutex);
uint64_t hash1 = resultset->raw_checksum();

@ -9,10 +9,25 @@
#endif /* DEBUG */
#define PROXYSQL_CLUSTER_VERSION "0.1.0702" DEB
#define SAFE_SQLITE3_STEP(_stmt) do {\
do {\
rc=sqlite3_step(_stmt);\
if (rc!=SQLITE_DONE) {\
assert(rc==SQLITE_LOCKED);\
usleep(10);\
}\
} while (rc!=SQLITE_DONE);\
} while (0)
static char *NODE_COMPUTE_DELIMITER=(char *)"-gtyw23a-"; // a random string used for hashing
extern ProxySQL_Cluster * GloProxyCluster;
extern ProxySQL_Admin *GloAdmin;
typedef struct _proxy_node_address_t {
pthread_t thrid;
uint64_t hash; // unused for now
@ -35,6 +50,7 @@ void * ProxySQL_Cluster_Monitor_thread(void *args) {
mysql_thread_init();
pthread_detach(pthread_self());
proxy_info("Cluster: starting thread for peer %s:%d\n", node->hostname, node->port);
char *query1 = (char *)"SELECT GLOBAL_CHECKSUM()"; // in future this will be used for "light check"
char *query2 = (char *)"SELECT * FROM stats_mysql_global ORDER BY Variable_Name";
char *query3 = (char *)"SELECT * FROM runtime_checksums_values ORDER BY name";
@ -153,6 +169,7 @@ void * ProxySQL_Cluster_Monitor_thread(void *args) {
mysql_close(conn);
}
} else {
proxy_warning("Cluster: unable to connect to peer %s:%d . Error: %s\n", node->hostname, node->port, mysql_error(conn));
mysql_close(conn);
conn = mysql_init(NULL);
int ci = __sync_fetch_and_add(&GloProxyCluster->cluster_check_interval_ms,0);
@ -167,6 +184,7 @@ __exit_monitor_thread:
if (conn->net.vio) {
mysql_close(conn);
}
proxy_info("Cluster: closing thread for peer %s:%d\n", node->hostname, node->port);
free(node->hostname);
free(node);
//pthread_exit(0);
@ -217,9 +235,11 @@ ProxySQL_Node_Entry::ProxySQL_Node_Entry(char *_hostname, uint16_t _port, uint64
for (int i = 0; i < PROXYSQL_NODE_METRICS_LEN ; i++) {
metrics[i] = new ProxySQL_Node_Metrics();
}
proxy_info("Created new Cluster Node Entry for host %s:%d\n",hostname,port);
}
ProxySQL_Node_Entry::~ProxySQL_Node_Entry() {
proxy_info("Destroyed Cluster Node Entry for host %s:%d\n",hostname,port);
if (hostname) {
free(hostname);
hostname = NULL;
@ -302,6 +322,10 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
strcpy(checksums_values.mysql_query_rules.checksum, row[3]);
checksums_values.mysql_query_rules.last_changed = now;
checksums_values.mysql_query_rules.diff_check = 1;
proxy_info("Cluster: detected a new checksum for mysql_query_rules from peer %s:%d, version %llu, epoch %llu, checksum %s . Not syncing yet ...\n", hostname, port, checksums_values.mysql_query_rules.version, checksums_values.mysql_query_rules.epoch, checksums_values.mysql_query_rules.checksum);
if (strcmp(checksums_values.mysql_query_rules.checksum, GloVars.checksums_values.mysql_query_rules.checksum) == 0) {
proxy_info("Cluster: checksum for mysql_query_rules from peer %s:%d matches with local checksum %s , we won't sync.\n", hostname, port, GloVars.checksums_values.mysql_query_rules.checksum);
}
} else {
checksums_values.mysql_query_rules.diff_check++;
}
@ -318,6 +342,10 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
strcpy(checksums_values.mysql_servers.checksum, row[3]);
checksums_values.mysql_servers.last_changed = now;
checksums_values.mysql_servers.diff_check = 1;
proxy_info("Cluster: detected a new checksum for mysql_servers from peer %s:%d, version %llu, epoch %llu, checksum %s . Not syncing yet ...\n", hostname, port, checksums_values.mysql_servers.version, checksums_values.mysql_servers.epoch, checksums_values.mysql_servers.checksum);
if (strcmp(checksums_values.mysql_servers.checksum, GloVars.checksums_values.mysql_servers.checksum) == 0) {
proxy_info("Cluster: checksum for mysql_servers from peer %s:%d matches with local checksum %s , we won't sync.\n", hostname, port, GloVars.checksums_values.mysql_servers.checksum);
}
} else {
checksums_values.mysql_servers.diff_check++;
}
@ -334,6 +362,10 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
strcpy(checksums_values.mysql_users.checksum, row[3]);
checksums_values.mysql_users.last_changed = now;
checksums_values.mysql_users.diff_check = 1;
proxy_info("Cluster: detected a new checksum for mysql_users from peer %s:%d, version %llu, epoch %llu, checksum %s . Not syncing yet ...\n", hostname, port, checksums_values.mysql_users.version, checksums_values.mysql_users.epoch, checksums_values.mysql_users.checksum);
if (strcmp(checksums_values.mysql_users.checksum, GloVars.checksums_values.mysql_users.checksum) == 0) {
proxy_info("Cluster: checksum for mysql_users from peer %s:%d matches with local checksum %s , we won't sync.\n", hostname, port, GloVars.checksums_values.mysql_users.checksum);
}
} else {
checksums_values.mysql_users.diff_check++;
}
@ -366,6 +398,10 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
strcpy(checksums_values.proxysql_servers.checksum, row[3]);
checksums_values.proxysql_servers.last_changed = now;
checksums_values.proxysql_servers.diff_check = 1;
proxy_info("Cluster: detected a new checksum for proxysql_servers from peer %s:%d, version %llu, epoch %llu, checksum %s . Not syncing yet ...\n", hostname, port, checksums_values.proxysql_servers.version, checksums_values.proxysql_servers.epoch, checksums_values.proxysql_servers.checksum);
if (strcmp(checksums_values.proxysql_servers.checksum, GloVars.checksums_values.proxysql_servers.checksum) == 0) {
proxy_info("Cluster: checksum for proxysql_servers from peer %s:%d matches with local checksum %s , we won't sync.\n", hostname, port, GloVars.checksums_values.proxysql_servers.checksum);
}
} else {
checksums_values.proxysql_servers.diff_check++;
}
@ -383,14 +419,23 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
v->diff_check++;
v = &checksums_values.mysql_query_rules;
v->last_updated = now;
if (strcmp(v->checksum, GloVars.checksums_values.mysql_query_rules.checksum) == 0) {
v->diff_check = 0;
}
if (v->diff_check)
v->diff_check++;
v = &checksums_values.mysql_servers;
v->last_updated = now;
if (strcmp(v->checksum, GloVars.checksums_values.mysql_servers.checksum) == 0) {
v->diff_check = 0;
}
if (v->diff_check)
v->diff_check++;
v = &checksums_values.mysql_users;
v->last_updated = now;
if (strcmp(v->checksum, GloVars.checksums_values.mysql_users.checksum) == 0) {
v->diff_check = 0;
}
if (v->diff_check)
v->diff_check++;
v = &checksums_values.mysql_variables;
@ -399,10 +444,460 @@ void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
v->diff_check++;
v = &checksums_values.proxysql_servers;
v->last_updated = now;
if (strcmp(v->checksum, GloVars.checksums_values.proxysql_servers.checksum) == 0) {
v->diff_check = 0;
}
if (v->diff_check)
v->diff_check++;
}
pthread_mutex_unlock(&GloVars.checksum_mutex);
// we now do a series of checks, and we take action
// note that this is done outside the critical section
// as mutex on GloVars.checksum_mutex is already released
unsigned int diff_mqr = (unsigned int)__sync_fetch_and_add(&GloProxyCluster->cluster_mysql_query_rules_diffs_before_sync,0);
unsigned int diff_ms = (unsigned int)__sync_fetch_and_add(&GloProxyCluster->cluster_mysql_servers_diffs_before_sync,0);
unsigned int diff_mu = (unsigned int)__sync_fetch_and_add(&GloProxyCluster->cluster_mysql_users_diffs_before_sync,0);
unsigned int diff_ps = (unsigned int)__sync_fetch_and_add(&GloProxyCluster->cluster_proxysql_servers_diffs_before_sync,0);
ProxySQL_Checksum_Value_2 *v = NULL;
if (diff_mqr) {
unsigned long long own_version = __sync_fetch_and_add(&GloVars.checksums_values.mysql_query_rules.version,0);
unsigned long long own_epoch = __sync_fetch_and_add(&GloVars.checksums_values.mysql_query_rules.epoch,0);
v = &checksums_values.mysql_query_rules;
if (v->version > 1) {
if (
(own_version == 1) // we just booted
||
(v->epoch > own_version) // epoch is newer
) {
if (v->diff_check > diff_mqr) {
proxy_info("Cluster: detected a peer %s:%d with mysql_query_rules version %llu, epoch %llu, diff_check %llu. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch);
GloProxyCluster->pull_mysql_query_rules_from_peer();
}
}
} else {
if (v->diff_check && (v->diff_check % (diff_mqr*10)) == 0) {
proxy_warning("Cluster: detected a peer %s:%d with mysql_query_rules version %llu, epoch %llu, diff_check %llu. Own version: %llu, epoch: %llu. diff_check is increasing, but version 1 doesn't allow sync. This message will be repeated every %llu checks until LOAD MYSQL QUERY RULES TO RUNTIME is executed on candidate master.\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch, (diff_mqr*10));
}
}
}
if (diff_ms) {
v = &checksums_values.mysql_servers;
unsigned long long own_version = __sync_fetch_and_add(&GloVars.checksums_values.mysql_servers.version,0);
unsigned long long own_epoch = __sync_fetch_and_add(&GloVars.checksums_values.mysql_servers.epoch,0);
if (v->version > 1) {
if (
(own_version == 1) // we just booted
||
(v->epoch > own_version) // epoch is newer
) {
if (v->diff_check > diff_ms) {
proxy_info("Cluster: detected a peer %s:%d with mysql_servers version %llu, epoch %llu, diff_check %llu. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch);
GloProxyCluster->pull_mysql_servers_from_peer();
}
}
} else {
if (v->diff_check && (v->diff_check % (diff_ms*10)) == 0) {
proxy_warning("Cluster: detected a peer %s:%d with mysql_servers version %llu, epoch %llu, diff_check %llu. Own version: %llu, epoch: %llu. diff_check is increasing, but version 1 doesn't allow sync. This message will be repeated every %llu checks until LOAD MYSQL SERVERS TO RUNTIME is executed on candidate master.\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch, (diff_ms*10));
}
}
}
if (diff_mu) {
v = &checksums_values.mysql_users;
unsigned long long own_version = __sync_fetch_and_add(&GloVars.checksums_values.mysql_users.version,0);
unsigned long long own_epoch = __sync_fetch_and_add(&GloVars.checksums_values.mysql_users.epoch,0);
if (v->version > 1) {
if (
(own_version == 1) // we just booted
||
(v->epoch > own_version) // epoch is newer
) {
if (v->diff_check > diff_mu) {
proxy_info("Cluster: detected a peer %s:%d with mysql_users version %llu, epoch %llu, diff_check %llu. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch);
GloProxyCluster->pull_mysql_users_from_peer();
}
}
} else {
if (v->diff_check && (v->diff_check % (diff_mu*10)) == 0) {
proxy_warning("Cluster: detected a peer %s:%d with mysql_users version %llu, epoch %llu, diff_check %llu. Own version: %llu, epoch: %llu. diff_check is increasing, but version 1 doesn't allow sync. This message will be repeated every %llu checks until LOAD MYSQL SERVERS TO RUNTIME is executed on candidate master.\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch, (diff_mu*10));
}
}
}
if (diff_ps) {
v = &checksums_values.proxysql_servers;
unsigned long long own_version = __sync_fetch_and_add(&GloVars.checksums_values.proxysql_servers.version,0);
unsigned long long own_epoch = __sync_fetch_and_add(&GloVars.checksums_values.proxysql_servers.epoch,0);
if (v->version > 1) {
if (
(own_version == 1) // we just booted
||
(v->epoch > own_version) // epoch is newer
) {
if (v->diff_check > diff_ps) {
proxy_info("Cluster: detected a peer %s:%d with proxysql_servers version %llu, epoch %llu, diff_check %llu. Own version: %llu, epoch: %llu. Proceeding with remote sync\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch);
GloProxyCluster->pull_proxysql_servers_from_peer();
}
}
} else {
if (v->diff_check && (v->diff_check % (diff_ms*10)) == 0) {
proxy_warning("Cluster: detected a peer %s:%d with proxysql_servers version %llu, epoch %llu, diff_check %llu. Own version: %llu, epoch: %llu. diff_check is increasing, but version 1 doesn't allow sync. This message will be repeated every %llu checks until LOAD PROXYSQL SERVERS TO RUNTIME is executed on candidate master.\n", hostname, port, v->version, v->epoch, v->diff_check, own_version, own_epoch, (diff_ps*10));
}
}
}
}
void ProxySQL_Cluster::pull_mysql_query_rules_from_peer() {
char * hostname = NULL;
uint16_t port = 0;
pthread_mutex_lock(&GloProxyCluster->update_mysql_query_rules_mutex);
nodes.get_peer_to_sync_mysql_query_rules(&hostname, &port);
if (hostname) {
char *username = NULL;
char *password = NULL;
bool rc_bool = true;
MYSQL *rc_conn;
int rc_query;
int rc;
MYSQL *conn = mysql_init(NULL);
if (conn==NULL) {
proxy_error("Unable to run mysql_init()\n");
goto __exit_pull_mysql_query_rules_from_peer;
}
GloProxyCluster->get_credentials(&username, &password);
if (strlen(username)) { // do not monitor if the username is empty
unsigned int timeout = 1;
unsigned int timeout_long = 60;
mysql_options(conn, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);
mysql_options(conn, MYSQL_OPT_READ_TIMEOUT, &timeout_long);
mysql_options(conn, MYSQL_OPT_WRITE_TIMEOUT, &timeout);
proxy_info("Cluster: Fetching MySQL Query Rules from peer %s:%d started\n", hostname, port);
rc_conn = mysql_real_connect(conn, hostname, username, password, NULL, port, NULL, 0);
if (rc_conn) {
rc_query = mysql_query(conn,"SELECT rule_id, username, schemaname, flagIN, client_addr, proxy_addr, proxy_port, digest, match_digest, match_pattern, negate_match_pattern, re_modifiers, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, retries, delay, next_query_flagIN, mirror_flagOUT, mirror_hostgroup, error_msg, ok_msg, sticky_conn, multiplex, log, apply, comment FROM runtime_mysql_query_rules");
if ( rc_query == 0 ) {
MYSQL_RES *result = mysql_store_result(conn);
GloAdmin->admindb->execute("DELETE FROM mysql_query_rules");
MYSQL_ROW row;
char *q = (char *)"INSERT INTO mysql_query_rules (rule_id, username, schemaname, flagIN, client_addr, proxy_addr, proxy_port, digest, match_digest, match_pattern, negate_match_pattern, re_modifiers, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, retries, delay, next_query_flagIN, mirror_flagOUT, mirror_hostgroup, error_msg, ok_msg, sticky_conn, multiplex, log, apply, comment) VALUES (?1 , ?2 , ?3 , ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24, ?25, ?26, ?27, ?28, ?29, ?30)";
sqlite3_stmt *statement1 = NULL;
sqlite3 *mydb3 = GloAdmin->admindb->get_db();
rc=sqlite3_prepare_v2(mydb3, q, -1, &statement1, 0);
assert(rc==SQLITE_OK);
while ((row = mysql_fetch_row(result))) {
rc=sqlite3_bind_int64(statement1, 1, atoll(row[0])); assert(rc==SQLITE_OK); // rule_id
rc=sqlite3_bind_text(statement1, 2, row[1], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // username
rc=sqlite3_bind_text(statement1, 3, row[2], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // schemaname
rc=sqlite3_bind_int64(statement1, 4, atoll(row[3])); assert(rc==SQLITE_OK); // flagIN
rc=sqlite3_bind_text(statement1, 5, row[4], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // client_addr
rc=sqlite3_bind_text(statement1, 6, row[5], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // proxy_addr
rc=sqlite3_bind_int64(statement1, 7, atoll(row[6])); assert(rc==SQLITE_OK); // proxy_port
rc=sqlite3_bind_text(statement1, 8, row[7], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // digest
rc=sqlite3_bind_text(statement1, 9, row[8], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // match_digest
rc=sqlite3_bind_text(statement1, 10, row[9], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // match_pattern
rc=sqlite3_bind_int64(statement1, 11, atoll(row[10])); assert(rc==SQLITE_OK); // negate_match_pattern
rc=sqlite3_bind_text(statement1, 12, row[11], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // re_modifiers
rc=sqlite3_bind_int64(statement1, 13, atoll(row[12])); assert(rc==SQLITE_OK); // flagOUT
rc=sqlite3_bind_text(statement1, 14, row[13], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // replace_pattern
rc=sqlite3_bind_int64(statement1, 15, atoll(row[14])); assert(rc==SQLITE_OK); // destination_hostgroup
rc=sqlite3_bind_int64(statement1, 16, atoll(row[15])); assert(rc==SQLITE_OK); // cache_ttl
rc=sqlite3_bind_int64(statement1, 17, atoll(row[16])); assert(rc==SQLITE_OK); // reconnect
rc=sqlite3_bind_int64(statement1, 18, atoll(row[17])); assert(rc==SQLITE_OK); // timeout
rc=sqlite3_bind_int64(statement1, 19, atoll(row[18])); assert(rc==SQLITE_OK); // retries
rc=sqlite3_bind_int64(statement1, 20, atoll(row[19])); assert(rc==SQLITE_OK); // delay
rc=sqlite3_bind_int64(statement1, 21, atoll(row[20])); assert(rc==SQLITE_OK); // next_query_flagIN
rc=sqlite3_bind_int64(statement1, 22, atoll(row[21])); assert(rc==SQLITE_OK); // mirror_flagOUT
rc=sqlite3_bind_int64(statement1, 23, atoll(row[22])); assert(rc==SQLITE_OK); // mirror_hostgroup
rc=sqlite3_bind_text(statement1, 24, row[23], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // error_msg
rc=sqlite3_bind_text(statement1, 25, row[24], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // OK_msg
rc=sqlite3_bind_int64(statement1, 26, atoll(row[25])); assert(rc==SQLITE_OK); // sticky_conn
rc=sqlite3_bind_int64(statement1, 27, atoll(row[26])); assert(rc==SQLITE_OK); // multiplex
rc=sqlite3_bind_int64(statement1, 28, atoll(row[27])); assert(rc==SQLITE_OK); // log
rc=sqlite3_bind_int64(statement1, 29, atoll(row[28])); assert(rc==SQLITE_OK); // apply
rc=sqlite3_bind_text(statement1, 30, row[29], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // comment
SAFE_SQLITE3_STEP(statement1);
rc=sqlite3_clear_bindings(statement1); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement1); assert(rc==SQLITE_OK);
}
mysql_free_result(result);
proxy_info("Cluster: Fetching MySQL Query Rules from peer %s:%d completed\n", hostname, port);
proxy_info("Cluster: Loading to runtime MySQL Servers from peer %s:%d\n", hostname, port);
GloAdmin->load_mysql_query_rules_to_runtime();
if (__sync_fetch_and_add(&GloProxyCluster->cluster_mysql_query_rules_save_to_disk,0) == true) {
proxy_info("Cluster: Saving to disk MySQL Query Rules from peer %s:%d\n", hostname, port);
GloAdmin->flush_mysql_query_rules__from_memory_to_disk();
}
} else {
proxy_info("Cluster: Fetching MySQL Query Rules from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn));
}
} else {
proxy_info("Cluster: Fetching MySQL Query Rules from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn));
}
}
__exit_pull_mysql_query_rules_from_peer:
if (conn) {
if (conn->net.vio) {
mysql_close(conn);
}
}
free(hostname);
}
pthread_mutex_unlock(&GloProxyCluster->update_mysql_query_rules_mutex);
}
void ProxySQL_Cluster::pull_mysql_users_from_peer() {
char * hostname = NULL;
uint16_t port = 0;
pthread_mutex_lock(&GloProxyCluster->update_mysql_users_mutex);
nodes.get_peer_to_sync_mysql_users(&hostname, &port);
if (hostname) {
char *username = NULL;
char *password = NULL;
bool rc_bool = true;
MYSQL *rc_conn;
int rc_query;
int rc;
MYSQL *conn = mysql_init(NULL);
if (conn==NULL) {
proxy_error("Unable to run mysql_init()\n");
goto __exit_pull_mysql_users_from_peer;
}
GloProxyCluster->get_credentials(&username, &password);
if (strlen(username)) { // do not monitor if the username is empty
unsigned int timeout = 1;
unsigned int timeout_long = 60;
mysql_options(conn, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);
mysql_options(conn, MYSQL_OPT_READ_TIMEOUT, &timeout_long);
mysql_options(conn, MYSQL_OPT_WRITE_TIMEOUT, &timeout);
proxy_info("Cluster: Fetching MySQL Users from peer %s:%d started\n", hostname, port);
rc_conn = mysql_real_connect(conn, hostname, username, password, NULL, port, NULL, 0);
if (rc_conn) {
rc_query = mysql_query(conn, "SELECT username, password, active, use_ssl, default_hostgroup, default_schema, schema_locked, transaction_persistent, fast_forward, backend, frontend, max_connections FROM runtime_mysql_users");
if ( rc_query == 0 ) {
MYSQL_RES *result = mysql_store_result(conn);
GloAdmin->admindb->execute("DELETE FROM mysql_users");
MYSQL_ROW row;
char *q = (char *)"INSERT INTO mysql_users (username, password, active, use_ssl, default_hostgroup, default_schema, schema_locked, transaction_persistent, fast_forward, backend, frontend, max_connections) VALUES (?1 , ?2 , ?3 , ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)";
sqlite3_stmt *statement1 = NULL;
sqlite3 *mydb3 = GloAdmin->admindb->get_db();
rc=sqlite3_prepare_v2(mydb3, q, -1, &statement1, 0);
assert(rc==SQLITE_OK);
while ((row = mysql_fetch_row(result))) {
rc=sqlite3_bind_text(statement1, 1, row[0], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // username
rc=sqlite3_bind_text(statement1, 2, row[1], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // password
rc=sqlite3_bind_int64(statement1, 3, atoll(row[2])); assert(rc==SQLITE_OK); // active
rc=sqlite3_bind_int64(statement1, 4, atoll(row[3])); assert(rc==SQLITE_OK); // use_ssl
rc=sqlite3_bind_int64(statement1, 5, atoll(row[4])); assert(rc==SQLITE_OK); // default_hostgroup
rc=sqlite3_bind_text(statement1, 6, row[5], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // default_schema
rc=sqlite3_bind_int64(statement1, 7, atoll(row[6])); assert(rc==SQLITE_OK); // schema_locked
rc=sqlite3_bind_int64(statement1, 8, atoll(row[7])); assert(rc==SQLITE_OK); // transaction_persistent
rc=sqlite3_bind_int64(statement1, 9, atoll(row[8])); assert(rc==SQLITE_OK); // fast_forward
rc=sqlite3_bind_int64(statement1, 10, atoll(row[9])); assert(rc==SQLITE_OK); // backend
rc=sqlite3_bind_int64(statement1, 11, atoll(row[10])); assert(rc==SQLITE_OK); // frontend
rc=sqlite3_bind_int64(statement1, 12, atoll(row[11])); assert(rc==SQLITE_OK); // max_connection
SAFE_SQLITE3_STEP(statement1);
rc=sqlite3_clear_bindings(statement1); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement1); assert(rc==SQLITE_OK);
}
mysql_free_result(result);
proxy_info("Cluster: Fetching MySQL Users from peer %s:%d completed\n", hostname, port);
proxy_info("Cluster: Loading to runtime MySQL Users from peer %s:%d\n", hostname, port);
GloAdmin->init_users();
if (__sync_fetch_and_add(&GloProxyCluster->cluster_mysql_query_rules_save_to_disk,0) == true) {
proxy_info("Cluster: Saving to disk MySQL Query Rules from peer %s:%d\n", hostname, port);
GloAdmin->flush_mysql_users__from_memory_to_disk();
}
} else {
proxy_info("Cluster: Fetching MySQL Users from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn));
}
} else {
proxy_info("Cluster: Fetching MySQL Users from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn));
}
}
__exit_pull_mysql_users_from_peer:
if (conn) {
if (conn->net.vio) {
mysql_close(conn);
}
}
free(hostname);
}
pthread_mutex_unlock(&GloProxyCluster->update_mysql_users_mutex);
}
void ProxySQL_Cluster::pull_mysql_servers_from_peer() {
char * hostname = NULL;
uint16_t port = 0;
pthread_mutex_lock(&GloProxyCluster->update_mysql_servers_mutex);
nodes.get_peer_to_sync_mysql_servers(&hostname, &port);
if (hostname) {
char *username = NULL;
char *password = NULL;
bool rc_bool = true;
MYSQL *rc_conn;
int rc_query;
MYSQL *conn = mysql_init(NULL);
if (conn==NULL) {
proxy_error("Unable to run mysql_init()\n");
goto __exit_pull_mysql_servers_from_peer;
}
GloProxyCluster->get_credentials(&username, &password);
if (strlen(username)) { // do not monitor if the username is empty
unsigned int timeout = 1;
unsigned int timeout_long = 60;
mysql_options(conn, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);
mysql_options(conn, MYSQL_OPT_READ_TIMEOUT, &timeout_long);
mysql_options(conn, MYSQL_OPT_WRITE_TIMEOUT, &timeout);
proxy_info("Cluster: Fetching MySQL Servers from peer %s:%d started\n", hostname, port);
rc_conn = mysql_real_connect(conn, hostname, username, password, NULL, port, NULL, 0);
if (rc_conn) {
rc_query = mysql_query(conn,"SELECT hostgroup_id, hostname, port, status, weight, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM runtime_mysql_servers");
if ( rc_query == 0 ) {
MYSQL_RES *result = mysql_store_result(conn);
GloAdmin->admindb->execute("DELETE FROM mysql_servers");
MYSQL_ROW row;
char *q=(char *)"INSERT INTO mysql_servers (hostgroup_id, hostname, port, status, weight, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment) VALUES (%s, \"%s\", %s, \"%s\", %s, %s, %s, %s, %s, %s, '%s')";
while ((row = mysql_fetch_row(result))) {
int i;
int l=0;
for (i=0; i<10; i++) {
l+=strlen(row[i]);
}
char *o=escape_string_single_quotes(row[10],false);
char *query = (char *)malloc(strlen(q)+i+strlen(o)+64);
sprintf(query,q,row[0],row[1],row[2],row[3],row[4],row[5],row[6],row[7],row[8],row[9],o);
if (o!=row[10]) { // there was a copy
free(o);
}
GloAdmin->admindb->execute(query);
free(query);
}
mysql_free_result(result);
rc_query = mysql_query(conn,"SELECT writer_hostgroup, reader_hostgroup, comment FROM mysql_replication_hostgroups");
if ( rc_query == 0 ) {
MYSQL_RES *result = mysql_store_result(conn);
GloAdmin->admindb->execute("DELETE FROM mysql_replication_hostgroups");
MYSQL_ROW row;
char *q=(char *)"INSERT INTO mysql_replication_hostgroups (writer_hostgroup, reader_hostgroup, comment) VALUES (%s, %s, '%s')";
while ((row = mysql_fetch_row(result))) {
int i;
int l=0;
for (i=0; i<2; i++) {
l+=strlen(row[i]);
}
char *o=escape_string_single_quotes(row[2],false);
char *query = (char *)malloc(strlen(q)+i+strlen(o)+64);
sprintf(query,q,row[0],row[1],o);
if (o!=row[2]) { // there was a copy
free(o);
}
GloAdmin->admindb->execute(query);
free(query);
}
mysql_free_result(result);
proxy_info("Cluster: Fetching MySQL Servers from peer %s:%d completed\n", hostname, port);
proxy_info("Cluster: Loading to runtime MySQL Servers from peer %s:%d\n", hostname, port);
GloAdmin->load_mysql_servers_to_runtime();
if (__sync_fetch_and_add(&GloProxyCluster->cluster_mysql_servers_save_to_disk,0) == true) {
proxy_info("Cluster: Saving to disk MySQL Servers from peer %s:%d\n", hostname, port);
GloAdmin->flush_mysql_servers__from_memory_to_disk();
}
} else {
proxy_info("Cluster: Fetching MySQL Servers from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn));
}
} else {
proxy_info("Cluster: Fetching MySQL Servers from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn));
}
} else {
proxy_info("Cluster: Fetching MySQL Servers from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn));
}
}
__exit_pull_mysql_servers_from_peer:
if (conn) {
if (conn->net.vio) {
mysql_close(conn);
}
}
free(hostname);
}
pthread_mutex_unlock(&GloProxyCluster->update_mysql_servers_mutex);
}
void ProxySQL_Cluster::pull_proxysql_servers_from_peer() {
char * hostname = NULL;
uint16_t port = 0;
pthread_mutex_lock(&GloProxyCluster->update_proxysql_servers_mutex);
nodes.get_peer_to_sync_proxysql_servers(&hostname, &port);
if (hostname) {
char *username = NULL;
char *password = NULL;
bool rc_bool = true;
MYSQL *rc_conn;
int rc_query;
MYSQL *conn = mysql_init(NULL);
if (conn==NULL) {
proxy_error("Unable to run mysql_init()\n");
goto __exit_pull_proxysql_servers_from_peer;
}
GloProxyCluster->get_credentials(&username, &password);
if (strlen(username)) { // do not monitor if the username is empty
unsigned int timeout = 1;
unsigned int timeout_long = 60;
mysql_options(conn, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);
mysql_options(conn, MYSQL_OPT_READ_TIMEOUT, &timeout_long);
mysql_options(conn, MYSQL_OPT_WRITE_TIMEOUT, &timeout);
proxy_info("Cluster: Fetching ProxySQL Servers from peer %s:%d started\n", hostname, port);
rc_conn = mysql_real_connect(conn, hostname, username, password, NULL, port, NULL, 0);
if (rc_conn) {
rc_query = mysql_query(conn,"SELECT hostname, port, weight, comment FROM runtime_proxysql_servers");
if ( rc_query == 0 ) {
MYSQL_RES *result = mysql_store_result(conn);
GloAdmin->admindb->execute("DELETE FROM proxysql_servers");
MYSQL_ROW row;
char *q=(char *)"INSERT INTO proxysql_servers (hostname, port, weight, comment) VALUES (\"%s\", %s, %s, '%s')";
while ((row = mysql_fetch_row(result))) {
int i;
int l=0;
for (i=0; i<3; i++) {
l+=strlen(row[i]);
}
char *o=escape_string_single_quotes(row[3],false);
char *query = (char *)malloc(strlen(q)+i+strlen(o)+64);
sprintf(query,q,row[0],row[1],row[2],o);
if (o!=row[3]) { // there was a copy
free(o);
}
GloAdmin->admindb->execute(query);
free(query);
}
mysql_free_result(result);
proxy_info("Cluster: Fetching ProxySQL Servers from peer %s:%d completed\n", hostname, port);
proxy_info("Cluster: Loading to runtime ProxySQL Servers from peer %s:%d\n", hostname, port);
GloAdmin->load_proxysql_servers_to_runtime(false);
if (__sync_fetch_and_add(&GloProxyCluster->cluster_proxysql_servers_save_to_disk,0) == true) {
proxy_info("Cluster: Saving to disk ProxySQL Servers from peer %s:%d\n", hostname, port);
GloAdmin->flush_proxysql_servers__from_memory_to_disk();
}
} else {
proxy_info("Cluster: Fetching ProxySQL Servers from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn));
}
} else {
proxy_info("Cluster: Fetching ProxySQL Servers from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn));
}
}
__exit_pull_proxysql_servers_from_peer:
if (conn) {
if (conn->net.vio) {
mysql_close(conn);
}
}
free(hostname);
}
pthread_mutex_unlock(&GloProxyCluster->update_proxysql_servers_mutex);
}
void ProxySQL_Node_Entry::set_metrics(MYSQL_RES *_r, unsigned long long _response_time) {
@ -487,8 +982,9 @@ uint64_t ProxySQL_Cluster_Nodes::generate_hash(char *_hostname, uint16_t _port)
return hash_;
}
void ProxySQL_Cluster_Nodes::load_servers_list(SQLite3_result *resultset) {
pthread_mutex_lock(&mutex);
void ProxySQL_Cluster_Nodes::load_servers_list(SQLite3_result *resultset, bool _lock) {
if (_lock)
pthread_mutex_lock(&mutex);
set_all_inactive();
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
@ -521,7 +1017,8 @@ void ProxySQL_Cluster_Nodes::load_servers_list(SQLite3_result *resultset) {
}
}
remove_inactives();
pthread_mutex_unlock(&mutex);
if (_lock)
pthread_mutex_unlock(&mutex);
}
// if it returns false , the node doesn't exist anymore and the monitor should stop
@ -578,6 +1075,130 @@ bool ProxySQL_Cluster_Nodes::Update_Node_Metrics(char * _h, uint16_t _p, MYSQL_R
return ret;
}
void ProxySQL_Cluster_Nodes::get_peer_to_sync_mysql_query_rules(char **host, uint16_t *port) {
unsigned long long version = 0;
unsigned long long epoch = 0;
char *hostname = NULL;
uint16_t p = 0;
// pthread_mutex_lock(&mutex);
//unsigned long long curtime = monotonic_time();
for( std::unordered_map<uint64_t, ProxySQL_Node_Entry *>::iterator it = umap_proxy_nodes.begin(); it != umap_proxy_nodes.end(); ) {
ProxySQL_Node_Entry * node = it->second;
ProxySQL_Checksum_Value_2 * v = &node->checksums_values.mysql_query_rules;
if (v->version > 1) {
if ( v->epoch > epoch && v->diff_check > 3) {
epoch = v->epoch;
version = v->version;
if (hostname) {
free(hostname);
}
hostname=strdup(node->get_hostname());
p = node->get_port();
}
}
it++;
}
// pthread_mutex_unlock(&mutex);
if (hostname) {
*host = hostname;
*port = p;
}
proxy_info("Cluster: detected peer %s:%d with mysql_query_rules version %llu, epoch %llu\n", hostname, p, version, epoch);
}
void ProxySQL_Cluster_Nodes::get_peer_to_sync_mysql_servers(char **host, uint16_t *port) {
unsigned long long version = 0;
unsigned long long epoch = 0;
char *hostname = NULL;
uint16_t p = 0;
// pthread_mutex_lock(&mutex);
//unsigned long long curtime = monotonic_time();
for( std::unordered_map<uint64_t, ProxySQL_Node_Entry *>::iterator it = umap_proxy_nodes.begin(); it != umap_proxy_nodes.end(); ) {
ProxySQL_Node_Entry * node = it->second;
ProxySQL_Checksum_Value_2 * v = &node->checksums_values.mysql_servers;
if (v->version > 1) {
if ( v->epoch > epoch && v->diff_check > 3) {
epoch = v->epoch;
version = v->version;
if (hostname) {
free(hostname);
}
hostname=strdup(node->get_hostname());
p = node->get_port();
}
}
it++;
}
// pthread_mutex_unlock(&mutex);
if (hostname) {
*host = hostname;
*port = p;
}
proxy_info("Cluster: detected peer %s:%d with mysql_servers version %llu, epoch %llu\n", hostname, p, version, epoch);
}
void ProxySQL_Cluster_Nodes::get_peer_to_sync_mysql_users(char **host, uint16_t *port) {
unsigned long long version = 0;
unsigned long long epoch = 0;
char *hostname = NULL;
uint16_t p = 0;
// pthread_mutex_lock(&mutex);
//unsigned long long curtime = monotonic_time();
for( std::unordered_map<uint64_t, ProxySQL_Node_Entry *>::iterator it = umap_proxy_nodes.begin(); it != umap_proxy_nodes.end(); ) {
ProxySQL_Node_Entry * node = it->second;
ProxySQL_Checksum_Value_2 * v = &node->checksums_values.mysql_users;
if (v->version > 1) {
if ( v->epoch > epoch && v->diff_check > 3) {
epoch = v->epoch;
version = v->version;
if (hostname) {
free(hostname);
}
hostname=strdup(node->get_hostname());
p = node->get_port();
}
}
it++;
}
// pthread_mutex_unlock(&mutex);
if (hostname) {
*host = hostname;
*port = p;
}
proxy_info("Cluster: detected peer %s:%d with mysql_users version %llu, epoch %llu\n", hostname, p, version, epoch);
}
void ProxySQL_Cluster_Nodes::get_peer_to_sync_proxysql_servers(char **host, uint16_t *port) {
unsigned long long version = 0;
unsigned long long epoch = 0;
char *hostname = NULL;
uint16_t p = 0;
// pthread_mutex_lock(&mutex);
//unsigned long long curtime = monotonic_time();
for( std::unordered_map<uint64_t, ProxySQL_Node_Entry *>::iterator it = umap_proxy_nodes.begin(); it != umap_proxy_nodes.end(); ) {
ProxySQL_Node_Entry * node = it->second;
ProxySQL_Checksum_Value_2 * v = &node->checksums_values.proxysql_servers;
if (v->version > 1) {
if ( v->epoch > epoch && v->diff_check > 3) {
epoch = v->epoch;
version = v->version;
if (hostname) {
free(hostname);
}
hostname=strdup(node->get_hostname());
p = node->get_port();
}
}
it++;
}
// pthread_mutex_unlock(&mutex);
if (hostname) {
*host = hostname;
*port = p;
}
proxy_info("Cluster: detected peer %s:%d with proxysql_servers version %llu, epoch %llu\n", hostname, p, version, epoch);
}
SQLite3_result * ProxySQL_Cluster_Nodes::stats_proxysql_servers_checksums() {
const int colnum=9;
SQLite3_result *result=new SQLite3_result(colnum);
@ -746,10 +1367,22 @@ SQLite3_result * ProxySQL_Cluster_Nodes::dump_table_proxysql_servers() {
ProxySQL_Cluster::ProxySQL_Cluster() {
pthread_mutex_init(&mutex,NULL);
pthread_mutex_init(&update_mysql_query_rules_mutex,NULL);
pthread_mutex_init(&update_mysql_servers_mutex,NULL);
pthread_mutex_init(&update_mysql_users_mutex,NULL);
pthread_mutex_init(&update_proxysql_servers_mutex,NULL);
cluster_username = strdup((char *)"");
cluster_password = strdup((char *)"");
cluster_check_interval_ms = 1000;
cluster_check_status_frequency = 10;
cluster_mysql_query_rules_diffs_before_sync = 3;
cluster_mysql_servers_diffs_before_sync = 3;
cluster_mysql_users_diffs_before_sync = 3;
cluster_proxysql_servers_diffs_before_sync = 3;
cluster_mysql_query_rules_save_to_disk = true;
cluster_mysql_servers_save_to_disk = true;
cluster_mysql_users_save_to_disk = true;
cluster_proxysql_servers_save_to_disk = true;
}
ProxySQL_Cluster::~ProxySQL_Cluster() {

@ -569,6 +569,7 @@ void ProxySQL_Main_init_phase3___start_all() {
{
cpu_timer t;
GloAdmin->init_mysql_servers();
GloAdmin->init_proxysql_servers();
GloAdmin->load_scheduler_to_runtime();
#ifdef DEBUG
std::cerr << "Main phase3 : GloAdmin initialized in ";

Loading…
Cancel
Save