diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index b74d3a6a5..a2eac2e90 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -439,6 +439,8 @@ class MySQL_HostGroups_Manager { std::unordered_map gtid_map; struct ev_async * gtid_ev_async; struct ev_loop * gtid_ev_loop; + struct ev_timer * gtid_ev_timer; + bool gtid_missing_nodes; struct { unsigned int servers_table_version; pthread_mutex_t servers_table_version_lock; diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 63cc31d86..0e9503de5 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -47,12 +47,31 @@ static pthread_mutex_t ev_loop_mutex; //static std::unordered_map gtid_map; static void gtid_async_cb(struct ev_loop *loop, struct ev_async *watcher, int revents) { + if (glovars.shutdown) { + ev_break(loop); + } pthread_mutex_lock(&ev_loop_mutex); + MyHGM->gtid_missing_nodes = false; MyHGM->generate_mysql_gtid_executed_tables(); pthread_mutex_unlock(&ev_loop_mutex); return; } +static void gtid_timer_cb (struct ev_loop *loop, struct ev_timer *timer, int revents) { + ev_timer_stop(loop, timer); + ev_timer_set(timer, 3, 0); + if (glovars.shutdown) { + ev_break(loop); + } + if (MyHGM->gtid_missing_nodes) { + pthread_mutex_lock(&ev_loop_mutex); + MyHGM->gtid_missing_nodes = false; + MyHGM->generate_mysql_gtid_executed_tables(); + pthread_mutex_unlock(&ev_loop_mutex); + } + ev_timer_start(loop, timer); + return; +} static int wait_for_mysql(MYSQL *mysql, int status) { struct pollfd pfd; @@ -88,11 +107,15 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) { //delete sd; std::string s1 = sd->address; s1.append(":"); - s1.append(std::to_string(sd->port)); + s1.append(std::to_string(sd->mysql_port)); + MyHGM->gtid_missing_nodes = true; + proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port); std::unordered_map ::iterator it2; it2 = MyHGM->gtid_map.find(s1); if (it2 != MyHGM->gtid_map.end()) { - MyHGM->gtid_map.erase(it2); + //MyHGM->gtid_map.erase(it2); + it2->second = NULL; + delete sd; } ev_io_stop(MyHGM->gtid_ev_loop, w); free(w); @@ -115,8 +138,21 @@ void connect_cb(EV_P_ ev_io *w, int revents) { int errnum = optval ? optval : errno; ev_io_stop(MyHGM->gtid_ev_loop, w); close(w->fd); + MyHGM->gtid_missing_nodes = true; GTID_Server_Data * custom_data = (GTID_Server_Data *)w->data; - delete custom_data; + GTID_Server_Data *sd = custom_data; + std::string s1 = sd->address; + s1.append(":"); + s1.append(std::to_string(sd->mysql_port)); + proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port); + std::unordered_map ::iterator it2; + it2 = MyHGM->gtid_map.find(s1); + if (it2 != MyHGM->gtid_map.end()) { + //MyHGM->gtid_map.erase(it2); + it2->second = NULL; + delete sd; + } + //delete custom_data; free(c); } else { ev_io_stop(MyHGM->gtid_ev_loop, w); @@ -454,8 +490,11 @@ static void * GTID_syncer_run() { MyHGM->gtid_ev_async = (struct ev_async *)malloc(sizeof(struct ev_async)); //ev_async_init(gtid_ev_async, gtid_async_cb); //ev_async_start(gtid_ev_loop, gtid_ev_async); + MyHGM->gtid_ev_timer = (struct ev_timer *)malloc(sizeof(struct ev_timer)); ev_async_init(MyHGM->gtid_ev_async, gtid_async_cb); ev_async_start(MyHGM->gtid_ev_loop, MyHGM->gtid_ev_async); + ev_timer_init(MyHGM->gtid_ev_timer, gtid_timer_cb, 3, 0); + ev_timer_start(MyHGM->gtid_ev_loop, MyHGM->gtid_ev_timer); //ev_ref(gtid_ev_loop); ev_run(MyHGM->gtid_ev_loop, 0); //sleep(1000); @@ -794,6 +833,7 @@ MySQL_HostGroups_Manager::MySQL_HostGroups_Manager() { incoming_replication_hostgroups=NULL; incoming_group_replication_hostgroups=NULL; pthread_rwlock_init(>id_rwlock, NULL); + gtid_missing_nodes = false; gtid_ev_async = (struct ev_async *)malloc(sizeof(struct ev_async)); } void MySQL_HostGroups_Manager::init() { @@ -813,6 +853,7 @@ MySQL_HostGroups_Manager::~MySQL_HostGroups_Manager() { HGCU_thread->join(); //pthread_join(HGCU_thread_id, NULL); //pthread_join(GTID_syncer_thread_id, NULL); + ev_async_send(gtid_ev_loop, gtid_ev_async); GTID_syncer_thread->join(); free(gtid_ev_async); while (MyHostGroups->len) { @@ -1337,10 +1378,12 @@ bool MySQL_HostGroups_Manager::gtid_exists(MySrvC *mysrvc, char * gtid_uuid, uin GTID_Server_Data *gtid_is=NULL; if (it2!=gtid_map.end()) { gtid_is=it2->second; - if (gtid_is->active == true) { - ret = gtid_is->gtid_exists(gtid_uuid,gtid_trxid); + if (gtid_is) { + if (gtid_is->active == true) { + ret = gtid_is->gtid_exists(gtid_uuid,gtid_trxid); + } } - } + } //proxy_info("Checking if server %s has GTID %s:%lu . %s\n", s1.c_str(), gtid_uuid, gtid_trxid, (ret ? "YES" : "NO")); pthread_rwlock_unlock(>id_rwlock); return ret; @@ -1352,7 +1395,9 @@ void MySQL_HostGroups_Manager::generate_mysql_gtid_executed_tables() { std::unordered_map::iterator it = gtid_map.begin(); while(it != gtid_map.end()) { GTID_Server_Data * gtid_si = it->second; - gtid_si->active = false; + if (gtid_si) { + gtid_si->active = false; + } it++; } @@ -1370,6 +1415,11 @@ void MySQL_HostGroups_Manager::generate_mysql_gtid_executed_tables() { GTID_Server_Data *gtid_is=NULL; if (it2!=gtid_map.end()) { gtid_is=it2->second; + if (gtid_is == NULL) { + gtid_map.erase(it2); + } + } + if (gtid_is) { gtid_is->active = true; } else { // we didn't find it. Create it @@ -3197,14 +3247,23 @@ SQLite3_result * MySQL_HostGroups_Manager::get_stats_mysql_gtid_executed() { GTID_Server_Data * gtid_si = it->second; char buf[64]; char **pta=(char **)malloc(sizeof(char *)*colnum); - pta[0]=strdup(gtid_si->address); - sprintf(buf,"%d", (int)gtid_si->mysql_port); - pta[1]=strdup(buf); - //sprintf(buf,"%d", mysrvc->port); - string s1 = gtid_executed_to_string(gtid_si->gtid_executed); - pta[2]=strdup(s1.c_str()); - sprintf(buf,"%llu", (int)gtid_si->events_read); - pta[3]=strdup(buf); + if (gtid_si) { + pta[0]=strdup(gtid_si->address); + sprintf(buf,"%d", (int)gtid_si->mysql_port); + pta[1]=strdup(buf); + //sprintf(buf,"%d", mysrvc->port); + string s1 = gtid_executed_to_string(gtid_si->gtid_executed); + pta[2]=strdup(s1.c_str()); + sprintf(buf,"%llu", (int)gtid_si->events_read); + pta[3]=strdup(buf); + } else { + string host = it->first; + pta[0]=strdup(host.c_str()); + sprintf(buf,"%d", (int)0); + pta[1]=strdup(buf); + pta[2]=strdup((char *)"NULL"); + pta[3]=strdup((char *)"0"); + } result->add_row(pta); for (k=0; k