diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index 9a5db2b9c..3d8b8d0a9 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -165,7 +165,7 @@ class MySQL_HostGroups_Manager { void add(MySrvC *, unsigned int); void purge_mysql_servers_table(); - void generate_mysql_servers_table(); + void generate_mysql_servers_table(int *_onlyhg=NULL); void generate_mysql_replication_hostgroups_table(); SQLite3_result *incoming_replication_hostgroups; void generate_mysql_group_replication_hostgroups_table(); @@ -238,6 +238,10 @@ class MySQL_HostGroups_Manager { void shun_and_killall(char *hostname, int port); void set_server_current_latency_us(char *hostname, int port, unsigned int _current_latency_us); unsigned long long Get_Memory_Stats(); + + void update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error); + void update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error); + void update_group_replication_set_writer(char *_hostname, int _port, int _writer_hostgroup); }; #endif /* __CLASS_MYSQL_HOSTGROUPS_MANAGER_H */ diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index 4fc514a0c..984707e2b 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -18,6 +18,7 @@ #define MONITOR_SQLITE_TABLE_MYSQL_SERVER_REPLICATION_LAG_LOG "CREATE TABLE mysql_server_replication_lag_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start_us INT NOT NULL DEFAULT 0 , success_time_us INT DEFAULT 0 , repl_lag INT DEFAULT 0 , error VARCHAR , PRIMARY KEY (hostname, port, time_start_us))" +#define MONITOR_SQLITE_TABLE_MYSQL_SERVER_GROUP_REPLICATION_LOG "CREATE TABLE mysql_server_group_replication_log (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start_us INT NOT NULL DEFAULT 0 , success_time_us INT DEFAULT 0 , viable_candidate VARCHAR NOT NULL DEFAULT 'NO' , read_only VARCHAR NOT NULL DEFAULT 'YES' , transactions_behind INT DEFAULT 0 , error VARCHAR , PRIMARY KEY (hostname, port, time_start_us))" /* struct cmp_str { @@ -28,11 +29,12 @@ struct cmp_str { }; */ -#define MyGR_Nentries 10 +#define MyGR_Nentries 50 typedef struct _MyGR_status_entry_t { // char *address; // int port; + unsigned long long start_time; unsigned long long check_time; long long transactions_behind; bool primary_partition; @@ -51,7 +53,7 @@ class MyGR_monitor_node { MyGR_status_entry_t last_entries[MyGR_Nentries]; MyGR_monitor_node(char *_a, int _p, int _whg); ~MyGR_monitor_node(); - bool add_entry(unsigned long long _ct, long long _tb, bool _pp, bool _ro, char *_error); // return true if status changed + bool add_entry(unsigned long long _st, unsigned long long _ct, long long _tb, bool _pp, bool _ro, char *_error); // return true if status changed }; @@ -74,6 +76,8 @@ class MySQL_Monitor_State_Data { char *hostname; int port; int writer_hostgroup; // used only by group replication + bool writer_is_also_reader; // used only by group replication + int max_transactions_behind; // used only by group replication bool use_ssl; MYSQL *mysql; MYSQL_RES *result; @@ -130,6 +134,7 @@ class MySQL_Monitor { void * monitor_group_replication(); void * monitor_replication_lag(); void * run(); + void populate_monitor_mysql_server_group_replication_log(); }; #endif /* __CLASS_MYSQL_MONITOR_H */ diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 1aa1392d1..2d16972f0 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -352,6 +352,7 @@ void MySQL_HostGroups_Manager::wrunlock() { #endif } + unsigned int MySQL_HostGroups_Manager::get_servers_table_version() { return __sync_fetch_and_add(&status.servers_table_version,0); } @@ -623,10 +624,11 @@ bool MySQL_HostGroups_Manager::commit() { // group replication - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_group_replication_hostgroups\n"); - mydb->execute("DELETE FROM mysql_group_replication_hostgroups"); - generate_mysql_group_replication_hostgroups_table(); - + if (incoming_group_replication_hostgroups) { + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_group_replication_hostgroups\n"); + mydb->execute("DELETE FROM mysql_group_replication_hostgroups"); + generate_mysql_group_replication_hostgroups_table(); + } __sync_fetch_and_add(&status.servers_table_version,1); wrunlock(); if (GloMTH) { @@ -653,7 +655,7 @@ void MySQL_HostGroups_Manager::purge_mysql_servers_table() { } } -void MySQL_HostGroups_Manager::generate_mysql_servers_table() { +void MySQL_HostGroups_Manager::generate_mysql_servers_table(int *_onlyhg) { int rc; sqlite3_stmt *statement1=NULL; sqlite3_stmt *statement32=NULL; @@ -667,8 +669,16 @@ void MySQL_HostGroups_Manager::generate_mysql_servers_table() { rc=sqlite3_prepare_v2(mydb3, query32, -1, &statement32, 0); assert(rc==SQLITE_OK); + proxy_info("Dumping current MySQL Servers structures\n"); for (unsigned int i=0; ilen; i++) { MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i); + if (_onlyhg) { + int hidonly=*_onlyhg; + if (myhgc->hid!=(unsigned int)hidonly) { + // skipping this HG + continue; + } + } MySrvC *mysrvc=NULL; for (unsigned int j=0; jmysrvs->servers->len; j++) { mysrvc=myhgc->mysrvs->idx(j); @@ -847,7 +857,7 @@ void MySQL_HostGroups_Manager::generate_mysql_group_replication_hostgroups_table int cols=0; int affected_rows=0; SQLite3_result *resultset=NULL; - char *query=(char *)"SELECT writer_hostgroup, hostname, port, MAX(use_ssl) use_ssl FROM mysql_servers JOIN mysql_group_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=reader_hostgroup OR hostgroup_id=offline_hostgroup WHERE status NOT IN (2,3) GROUP BY hostname, port"; + char *query=(char *)"SELECT writer_hostgroup, hostname, port, MAX(use_ssl) use_ssl , writer_is_also_reader , max_transactions_behind FROM mysql_servers JOIN mysql_group_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=reader_hostgroup OR hostgroup_id=offline_hostgroup WHERE status NOT IN (2,3) GROUP BY hostname, port"; mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset); if (resultset) { if (GloMyMon->Group_Replication_Hosts_resultset) { @@ -1694,3 +1704,230 @@ bool Group_Replication_Info::update(int b, int r, int o, int mw, int mtb, bool _ } return ret; } + +void MySQL_HostGroups_Manager::update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *_error) { + int cols=0; + int affected_rows=0; + SQLite3_result *resultset=NULL; + char *query=NULL; + char *q=NULL; + char *error=NULL; + q=(char *)"SELECT hostgroup_id FROM mysql_servers JOIN mysql_group_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE hostname='%s' AND port=%d AND status<>3"; + query=(char *)malloc(strlen(q)+strlen(_hostname)+32); + sprintf(query,q,_hostname,_port); + mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset); + if (error) { + free(error); + error=NULL; + } + free(query); + if (resultset) { // we lock only if needed + if (resultset->rows_count) { + proxy_warning("Group Replication: setting host %s:%d offline because: %s\n", _hostname, _port, _error); + GloAdmin->mysql_servers_wrlock(); + mydb->execute("DELETE FROM mysql_servers_incoming"); + mydb->execute("INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers"); + q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d) WHERE hostname='%s' AND port=%d AND hostgroup_id<>(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)"; + query=(char *)malloc(strlen(q)+strlen(_hostname)+64); + sprintf(query,q,_writer_hostgroup,_hostname,_port,_writer_hostgroup); + mydb->execute(query); + //free(query); + q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id<>(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)"; + //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); + sprintf(query,q,_hostname,_port,_writer_hostgroup); + mydb->execute(query); + //free(query); + q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s' AND port=%d AND hostgroup_id=(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)"; + //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); + sprintf(query,q,_hostname,_port,_writer_hostgroup); + mydb->execute(query); + //free(query); + commit(); + wrlock(); + SQLite3_result *resultset2=NULL; + q=(char *)"SELECT writer_hostgroup, backup_writer_hostgroup, reader_hostgroup, offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d"; + //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); + sprintf(query,q,_port,_writer_hostgroup); + mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset2); + if (resultset2) { + if (resultset2->rows_count) { + for (std::vector::iterator it = resultset2->rows.begin() ; it != resultset2->rows.end(); ++it) { + SQLite3_row *r=*it; + int writer_hostgroup=atoi(r->fields[0]); + int backup_writer_hostgroup=atoi(r->fields[1]); + int reader_hostgroup=atoi(r->fields[2]); + int offline_hostgroup=atoi(r->fields[3]); + q=(char *)"DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d , %d , %d)"; + sprintf(query,q,_port,_writer_hostgroup); + mydb->execute(query); + generate_mysql_servers_table(&writer_hostgroup); + generate_mysql_servers_table(&backup_writer_hostgroup); + generate_mysql_servers_table(&reader_hostgroup); + generate_mysql_servers_table(&offline_hostgroup); + } + } + delete resultset2; + resultset2=NULL; + } + wrunlock(); + GloAdmin->mysql_servers_wrunlock(); + free(query); + } + } + if (resultset) { + delete resultset; + resultset=NULL; + } +} + +void MySQL_HostGroups_Manager::update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *_error) { + int cols=0; + int affected_rows=0; + SQLite3_result *resultset=NULL; + char *query=NULL; + char *q=NULL; + char *error=NULL; + q=(char *)"SELECT hostgroup_id FROM mysql_servers JOIN mysql_group_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=offline_hostgroup WHERE hostname='%s' AND port=%d AND status<>3"; + query=(char *)malloc(strlen(q)+strlen(_hostname)+32); + sprintf(query,q,_hostname,_port); + mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset); + if (error) { + free(error); + error=NULL; + } + free(query); + if (resultset) { // we lock only if needed + if (resultset->rows_count) { + proxy_warning("Group Replication: setting host %s:%d read_only because: %s\n", _hostname, _port, _error); + GloAdmin->mysql_servers_wrlock(); + mydb->execute("DELETE FROM mysql_servers_incoming"); + mydb->execute("INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers"); + q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d) WHERE hostname='%s' AND port=%d AND hostgroup_id<>(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)"; + query=(char *)malloc(strlen(q)+strlen(_hostname)+64); + sprintf(query,q,_writer_hostgroup,_hostname,_port,_writer_hostgroup); + mydb->execute(query); + //free(query); + q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id<>(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)"; + //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); + sprintf(query,q,_hostname,_port,_writer_hostgroup); + mydb->execute(query); + //free(query); + q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s' AND port=%d AND hostgroup_id=(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)"; + //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); + sprintf(query,q,_hostname,_port,_writer_hostgroup); + mydb->execute(query); + //free(query); + commit(); + wrlock(); + SQLite3_result *resultset2=NULL; + q=(char *)"SELECT writer_hostgroup, backup_writer_hostgroup, reader_hostgroup, offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d"; + //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); + sprintf(query,q,_port,_writer_hostgroup); + mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset2); + if (resultset2) { + if (resultset2->rows_count) { + for (std::vector::iterator it = resultset2->rows.begin() ; it != resultset2->rows.end(); ++it) { + SQLite3_row *r=*it; + int writer_hostgroup=atoi(r->fields[0]); + int backup_writer_hostgroup=atoi(r->fields[1]); + int reader_hostgroup=atoi(r->fields[2]); + int offline_hostgroup=atoi(r->fields[3]); + q=(char *)"DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d , %d , %d)"; + sprintf(query,q,_port,_writer_hostgroup); + mydb->execute(query); + generate_mysql_servers_table(&writer_hostgroup); + generate_mysql_servers_table(&backup_writer_hostgroup); + generate_mysql_servers_table(&reader_hostgroup); + generate_mysql_servers_table(&offline_hostgroup); + } + } + delete resultset2; + resultset2=NULL; + } + wrunlock(); + GloAdmin->mysql_servers_wrunlock(); + free(query); + } + } + if (resultset) { + delete resultset; + resultset=NULL; + } +} + +void MySQL_HostGroups_Manager::update_group_replication_set_writer(char *_hostname, int _port, int _writer_hostgroup) { + int cols=0; + int affected_rows=0; + SQLite3_result *resultset=NULL; + char *query=NULL; + char *q=NULL; + char *error=NULL; + q=(char *)"SELECT hostgroup_id FROM mysql_servers JOIN mysql_group_replication_hostgroups ON hostgroup_id=reader_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=offline_hostgroup WHERE hostname='%s' AND port=%d AND status<>3"; + query=(char *)malloc(strlen(q)+strlen(_hostname)+32); + sprintf(query,q,_hostname,_port); + mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset); + if (error) { + free(error); + error=NULL; + } + free(query); + if (resultset) { // we lock only if needed + if (resultset->rows_count) { + proxy_warning("Group Replication: setting host %s:%d as writer\n", _hostname, _port); + GloAdmin->mysql_servers_wrlock(); + mydb->execute("DELETE FROM mysql_servers_incoming"); + mydb->execute("INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers"); + q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=%d WHERE hostname='%s' AND port=%d AND hostgroup_id<>%d"; + query=(char *)malloc(strlen(q)+strlen(_hostname)+64); + sprintf(query,q,_writer_hostgroup,_hostname,_port,_writer_hostgroup); + mydb->execute(query); + //free(query); + q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id<>%d"; + //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); + sprintf(query,q,_hostname,_port,_writer_hostgroup); + mydb->execute(query); + //free(query); + q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s' AND port=%d AND hostgroup_id=%d"; + //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); + sprintf(query,q,_hostname,_port,_writer_hostgroup); + mydb->execute(query); + //free(query); + commit(); + wrlock(); + SQLite3_result *resultset2=NULL; + q=(char *)"SELECT writer_hostgroup, backup_writer_hostgroup, reader_hostgroup, offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d"; + //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); + sprintf(query,q,_port,_writer_hostgroup); + mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset2); + if (resultset2) { + if (resultset2->rows_count) { + for (std::vector::iterator it = resultset2->rows.begin() ; it != resultset2->rows.end(); ++it) { + SQLite3_row *r=*it; + int writer_hostgroup=atoi(r->fields[0]); + int backup_writer_hostgroup=atoi(r->fields[1]); + int reader_hostgroup=atoi(r->fields[2]); + int offline_hostgroup=atoi(r->fields[3]); + q=(char *)"DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d , %d , %d)"; + sprintf(query,q,_port,_writer_hostgroup); + mydb->execute(query); + generate_mysql_servers_table(&writer_hostgroup); + generate_mysql_servers_table(&backup_writer_hostgroup); + generate_mysql_servers_table(&reader_hostgroup); + generate_mysql_servers_table(&offline_hostgroup); + } + } + delete resultset2; + resultset2=NULL; + } + wrunlock(); + GloAdmin->mysql_servers_wrunlock(); + free(query); + } + } + if (resultset) { + delete resultset; + resultset=NULL; + } +} + + diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 8735220ff..c54e9c00b 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -274,12 +274,14 @@ MySQL_Monitor::MySQL_Monitor() { insert_into_tables_defs(tables_defs_monitor,"mysql_server_ping_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_PING_LOG); insert_into_tables_defs(tables_defs_monitor,"mysql_server_read_only_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_READ_ONLY_LOG); insert_into_tables_defs(tables_defs_monitor,"mysql_server_replication_lag_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_REPLICATION_LAG_LOG); + insert_into_tables_defs(tables_defs_monitor,"mysql_server_group_replication_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_GROUP_REPLICATION_LOG); // create monitoring tables check_and_build_standard_tables(monitordb, tables_defs_monitor); monitordb->execute("CREATE INDEX IF NOT EXISTS idx_connect_log_time_start ON mysql_server_connect_log (time_start_us)"); monitordb->execute("CREATE INDEX IF NOT EXISTS idx_ping_log_time_start ON mysql_server_ping_log (time_start_us)"); monitordb->execute("CREATE INDEX IF NOT EXISTS idx_read_only_log_time_start ON mysql_server_read_only_log (time_start_us)"); monitordb->execute("CREATE INDEX IF NOT EXISTS idx_replication_lag_log_time_start ON mysql_server_replication_lag_log (time_start_us)"); + monitordb->execute("CREATE INDEX IF NOT EXISTS idx_group_replication_log_time_start ON mysql_server_group_replication_log (time_start_us)"); num_threads=8; if (GloMTH) { @@ -791,11 +793,16 @@ __exit_monitor_group_replication_thread: read_only=false; } transactions_behind=atol(row[2]); + mysql_free_result(mmsd->result); + mmsd->result=NULL; } __end_process_group_replication_result: - proxy_info("GR: %s:%d , viable=%s , ro=%s, trx=%ld, err=%s\n", mmsd->hostname, mmsd->port, (viable_candidate ? "YES": "NO") , (read_only ? "YES": "NO") , transactions_behind, ( mmsd->mysql_error_msg ? mmsd->mysql_error_msg : "") ); - + //proxy_info("GR: %s:%d , viable=%s , ro=%s, trx=%ld, err=%s\n", mmsd->hostname, mmsd->port, (viable_candidate ? "YES": "NO") , (read_only ? "YES": "NO") , transactions_behind, ( mmsd->mysql_error_msg ? mmsd->mysql_error_msg : "") ); + if (mmsd->mysql_error_msg) { + //proxy_warning("GR: %s:%d , viable=%s , ro=%s, trx=%ld, err=%s\n", mmsd->hostname, mmsd->port, (viable_candidate ? "YES": "NO") , (read_only ? "YES": "NO") , transactions_behind, ( mmsd->mysql_error_msg ? mmsd->mysql_error_msg : "") ); + } unsigned long long time_now=realtime_time(); + time_now=time_now-(mmsd->t2 - start_time); pthread_mutex_lock(&GloMyMon->group_replication_mutex); //auto it = // TODO : complete this @@ -804,13 +811,36 @@ __end_process_group_replication_result: MyGR_monitor_node *node=NULL; if (it2!=GloMyMon->Group_Replication_Hosts_Map.end()) { node=it2->second; - node->add_entry(time_now,transactions_behind,viable_candidate,read_only,NULL); + node->add_entry(time_now, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1) , transactions_behind,viable_candidate,read_only,mmsd->mysql_error_msg); } else { node = new MyGR_monitor_node(mmsd->hostname,mmsd->port,mmsd->writer_hostgroup); - node->add_entry(time_now,transactions_behind,viable_candidate,read_only,NULL); + node->add_entry(time_now, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1) , transactions_behind,viable_candidate,read_only,mmsd->mysql_error_msg); GloMyMon->Group_Replication_Hosts_Map.insert(std::make_pair(s,node)); } pthread_mutex_unlock(&GloMyMon->group_replication_mutex); + + // NOTE: we update MyHGM outside the mutex group_replication_mutex + if (mmsd->mysql_error_msg) { // there was an error checking the status of the server, surely we need to reconfigure GR + MyHGM->update_group_replication_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, mmsd->mysql_error_msg); + } else { + if (viable_candidate==false) { + MyHGM->update_group_replication_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"viable_candidate=NO"); + } else { + if (read_only==true) { + if (transactions_behind > mmsd->max_transactions_behind) { + MyHGM->update_group_replication_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"slave is lagging"); + } else { + MyHGM->update_group_replication_set_read_only(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"read_only=YES"); + } + } else { + // the node is a writer + // TODO: for now we don't care about the number of writers + MyHGM->update_group_replication_set_writer(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup); + } + } + } + + // clean up if (l<110) { } else { free(s); @@ -1554,7 +1584,9 @@ void * MySQL_Monitor::monitor_group_replication() { for (std::vector::iterator it = Group_Replication_Hosts_resultset->rows.begin() ; it != Group_Replication_Hosts_resultset->rows.end(); ++it) { SQLite3_row *r=*it; MySQL_Monitor_State_Data *mmsd=new MySQL_Monitor_State_Data(r->fields[1],atoi(r->fields[2]), NULL, atoi(r->fields[3])); - mmsd->writer_hostgroup=atoi(r->fields[1]); + mmsd->writer_hostgroup=atoi(r->fields[0]); + mmsd->writer_is_also_reader=atoi(r->fields[4]); + mmsd->max_transactions_behind=atoi(r->fields[5]); mmsd->mondb=monitordb; //pthread_t thr_; //if ( pthread_create(&thr_, &attr, monitor_read_only_thread, (void *)mmsd) != 0 ) { @@ -1846,6 +1878,7 @@ MyGR_monitor_node::MyGR_monitor_node(char *_a, int _p, int _whg) { int i; for (i=0;i=MyGR_Nentries) { idx_last_entry=0; } + last_entries[idx_last_entry].start_time=_st; last_entries[idx_last_entry].check_time=_ct; last_entries[idx_last_entry].transactions_behind=_tb; last_entries[idx_last_entry].primary_partition=_pp; @@ -1894,3 +1928,42 @@ bool MyGR_monitor_node::add_entry(unsigned long long _ct, long long _tb, bool _p } return ret; } + +void MySQL_Monitor::populate_monitor_mysql_server_group_replication_log() { + sqlite3 *mondb=monitordb->get_db(); + int rc; + //char *query=NULL; + char *query1=NULL; + query1=(char *)"INSERT INTO mysql_server_group_replication_log VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)"; + sqlite3_stmt *statement1=NULL; + pthread_mutex_lock(&GloMyMon->group_replication_mutex); + rc=sqlite3_prepare_v2(mondb, query1, -1, &statement1, 0); + assert(rc==SQLITE_OK); + monitordb->execute((char *)"DELETE FROM mysql_server_group_replication_log"); + std::map::iterator it2; + MyGR_monitor_node *node=NULL; + for (it2=GloMyMon->Group_Replication_Hosts_Map.begin(); it2!=GloMyMon->Group_Replication_Hosts_Map.end(); ++it2) { + std::string s=it2->first; + node=it2->second; + std::size_t found=s.find_last_of(":"); + std::string host=s.substr(0,found); + std::string port=s.substr(found+1); + int i; + for (i=0; ilast_entries[i].start_time) { + rc=sqlite3_bind_text(statement1, 1, host.c_str(), -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement1, 2, atoi(port.c_str())); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement1, 3, node->last_entries[i].start_time ); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement1, 4, node->last_entries[i].check_time ); assert(rc==SQLITE_OK); + rc=sqlite3_bind_text(statement1, 5, ( node->last_entries[i].primary_partition ? (char *)"YES" : (char *)"NO" ) , -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); + rc=sqlite3_bind_text(statement1, 6, ( node->last_entries[i].read_only ? (char *)"YES" : (char *)"NO" ) , -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement1, 7, node->last_entries[i].transactions_behind ); assert(rc==SQLITE_OK); + rc=sqlite3_bind_text(statement1, 8, node->last_entries[i].error , -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); + SAFE_SQLITE3_STEP(statement1); + rc=sqlite3_clear_bindings(statement1); assert(rc==SQLITE_OK); + rc=sqlite3_reset(statement1); assert(rc==SQLITE_OK); + } + } + } + pthread_mutex_unlock(&GloMyMon->group_replication_mutex); +} diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index 357ca4fcc..75a790fa8 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -97,6 +97,8 @@ extern Query_Processor *GloQPro; extern MySQL_Threads_Handler *GloMTH; extern MySQL_Logger *GloMyLogger; extern MySQL_STMT_Manager *GloMyStmt; +extern MySQL_Monitor *GloMyMon; + #define PANIC(msg) { perror(msg); exit(EXIT_FAILURE); } int rc, arg_on=1, arg_off=0; @@ -1222,6 +1224,8 @@ void ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign bool runtime_mysql_servers=false; bool runtime_mysql_query_rules=false; + bool monitor_mysql_server_group_replication_log=false; + if (strcasestr(query_no_space,"processlist")) // This will match the following usecases: // SHOW PROCESSLIST @@ -1264,6 +1268,9 @@ void ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign } } } + if (strstr(query_no_space,"mysql_server_group_replication_log")) { + monitor_mysql_server_group_replication_log=true; refresh=true; + } // if (stats_mysql_processlist || stats_mysql_connection_pool || stats_mysql_query_digest || stats_mysql_query_digest_reset) { if (refresh==true) { pthread_mutex_lock(&admin_mutex); @@ -1303,6 +1310,11 @@ void ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign save_scheduler_runtime_to_database(true); } } + if (monitor_mysql_server_group_replication_log) { + if (GloMyMon) { + GloMyMon->populate_monitor_mysql_server_group_replication_log(); + } + } pthread_mutex_unlock(&admin_mutex); } }