From 48a3547575c29ee5aa43bbbc4223d3a9e596b3d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Tue, 9 Apr 2019 14:11:46 +1000 Subject: [PATCH 1/5] Further error handling for Monitor --- lib/MySQL_Monitor.cpp | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 8ebd0136c..c75ba61ab 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -1491,14 +1491,20 @@ __exit_monitor_replication_lag_thread: time_now=time_now-(mmsd->t2 - start_time); rc=sqlite3_bind_int64(statement, 3, time_now); assert(rc==SQLITE_OK); rc=sqlite3_bind_int64(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1)); assert(rc==SQLITE_OK); - if (mmsd->result) { + if (mmsd->interr == 0 && mmsd->result) { int num_fields=0; int k=0; MYSQL_FIELD * fields=NULL; int j=-1; num_fields = mysql_num_fields(mmsd->result); fields = mysql_fetch_fields(mmsd->result); - if (fields && num_fields == 1) { + if ( + fields && ( + ( num_fields == 1 && use_percona_heartbeat == true ) + || + ( num_fields > 30 && use_percona_heartbeat == false ) + ) + ) { for(k = 0; k < num_fields; k++) { if (fields[k].name) { if (strcmp("Seconds_Behind_Master", fields[k].name)==0) { @@ -1522,7 +1528,7 @@ __exit_monitor_replication_lag_thread: rc=sqlite3_bind_null(statement, 5); assert(rc==SQLITE_OK); } } else { - proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is not 1 ( %d ). See bug #1994\n", num_fields); + proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. See bug #1994\n"); rc=sqlite3_bind_null(statement, 5); assert(rc==SQLITE_OK); } mysql_free_result(mmsd->result); From d11ec3827757f5f176df944e7d0c449541f5427b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Tue, 9 Apr 2019 14:51:48 +1000 Subject: [PATCH 2/5] More error handling for Galera #1994 --- lib/MySQL_Monitor.cpp | 78 ++++++++++++++++++++++++++++--------------- 1 file changed, 52 insertions(+), 26 deletions(-) diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index c75ba61ab..9ffbeb6f3 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -841,6 +841,10 @@ VALGRIND_ENABLE_ERROR_REPORTING; } } if (mmsd->interr) { // check failed + if (mmsd->mysql) { + mysql_close(mmsd->mysql); + mmsd->mysql=NULL; + } } else { if (crc==false) { if (mmsd->mysql) { @@ -1106,6 +1110,10 @@ __end_process_group_replication_result: } if (mmsd->interr) { // check failed + if (mmsd->mysql) { + mysql_close(mmsd->mysql); + mmsd->mysql=NULL; + } } else { if (crc==false) { if (mmsd->mysql) { @@ -1235,10 +1243,12 @@ __exit_monitor_galera_thread: bool wsrep_reject_queries = true; bool wsrep_sst_donor_rejects_queries = true; long long wsrep_local_recv_queue=0; + MYSQL_FIELD * fields=NULL; if (mmsd->interr == 0 && mmsd->result) { int num_fields=0; int num_rows=0; num_fields = mysql_num_fields(mmsd->result); + fields = mysql_fetch_fields(mmsd->result); if (num_fields!=7) { proxy_error("Incorrect number of fields, please report a bug\n"); goto __end_process_galera_result; @@ -1310,39 +1320,43 @@ __end_process_galera_result: if (mmsd->mysql_error_msg) { // there was an error checking the status of the server, surely we need to reconfigure GR MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, mmsd->mysql_error_msg); } else { - if (primary_partition == false || wsrep_desync == true || wsrep_local_state!=4) { - if (primary_partition == false) { - MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"primary_partition=NO"); - } else { - if (wsrep_desync == true) { - MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"wsrep_desync=YES"); + if (fields) { // if we didn't get any error, but fileds is NULL, we are likely hitting bug #1994 + if (primary_partition == false || wsrep_desync == true || wsrep_local_state!=4) { + if (primary_partition == false) { + MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"primary_partition=NO"); } else { - char msg[80]; - sprintf(msg,"wsrep_local_state=%d",wsrep_local_state); - MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, msg); + if (wsrep_desync == true) { + MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"wsrep_desync=YES"); + } else { + char msg[80]; + sprintf(msg,"wsrep_local_state=%d",wsrep_local_state); + MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, msg); + } } - } - } else { - //if (wsrep_sst_donor_rejects_queries || wsrep_reject_queries) { - if (wsrep_reject_queries) { - MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"wsrep_reject_queries=true"); - // } else { - // // wsrep_sst_donor_rejects_queries - // MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"wsrep_sst_donor_rejects_queries=true"); - // } } else { - if (read_only==true) { - if (wsrep_local_recv_queue > mmsd->max_transactions_behind) { - MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"slave is lagging"); + //if (wsrep_sst_donor_rejects_queries || wsrep_reject_queries) { + if (wsrep_reject_queries) { + MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"wsrep_reject_queries=true"); + // } else { + // // wsrep_sst_donor_rejects_queries + // MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"wsrep_sst_donor_rejects_queries=true"); + // } + } else { + if (read_only==true) { + if (wsrep_local_recv_queue > mmsd->max_transactions_behind) { + MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"slave is lagging"); + } else { + MyHGM->update_galera_set_read_only(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"read_only=YES"); + } } else { - MyHGM->update_galera_set_read_only(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"read_only=YES"); + // the node is a writer + // TODO: for now we don't care about the number of writers + MyHGM->update_galera_set_writer(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup); } - } else { - // the node is a writer - // TODO: for now we don't care about the number of writers - MyHGM->update_galera_set_writer(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup); } } + } else { + proxy_error("mysql_fetch_fields returns NULL. See bug #1994\n"); } } @@ -1353,6 +1367,10 @@ __end_process_galera_result: } } if (mmsd->interr) { // check failed + if (mmsd->mysql) { + mysql_close(mmsd->mysql); + mmsd->mysql=NULL; + } } else { if (crc==false) { if (mmsd->mysql) { @@ -1466,6 +1484,10 @@ void * monitor_replication_lag_thread(void *arg) { } if (mmsd->interr) { // replication lag check failed mmsd->mysql_error_msg=strdup(mysql_error(mmsd->mysql)); + if (mmsd->mysql) { + mysql_close(mmsd->mysql); + mmsd->mysql=NULL; + } } else { if (crc==false) { GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql); @@ -1548,6 +1570,10 @@ __exit_monitor_replication_lag_thread: } if (mmsd->interr) { // check failed + if (mmsd->mysql) { + mysql_close(mmsd->mysql); + mmsd->mysql=NULL; + } } else { if (mmsd->mysql) { GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql); From 0fc8694a05653c941023cb7bacd8daaaea47a4e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Tue, 9 Apr 2019 15:27:20 +1000 Subject: [PATCH 3/5] Increased verbosity for bug #1994 --- lib/MySQL_Monitor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 9ffbeb6f3..13a64aa9d 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -1356,7 +1356,7 @@ __end_process_galera_result: } } } else { - proxy_error("mysql_fetch_fields returns NULL. See bug #1994\n"); + proxy_error("mysql_fetch_fields returns NULL. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port); } } @@ -1550,7 +1550,7 @@ __exit_monitor_replication_lag_thread: rc=sqlite3_bind_null(statement, 5); assert(rc==SQLITE_OK); } } else { - proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. See bug #1994\n"); + proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port); rc=sqlite3_bind_null(statement, 5); assert(rc==SQLITE_OK); } mysql_free_result(mmsd->result); From 483e7ae9da9ef194b91e808f4db5fd57b4b96910 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Tue, 9 Apr 2019 20:37:17 +1000 Subject: [PATCH 4/5] Adding mysql_thread_init() in Monitor --- lib/MySQL_Monitor.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 13a64aa9d..d8bd5100f 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -429,6 +429,7 @@ void MySQL_Monitor::check_and_build_standard_tables(SQLite3DB *db, std::vectorcurtime=monotonic_time(); @@ -1148,6 +1152,7 @@ __fast_exit_monitor_group_replication_thread: } void * monitor_galera_thread(void *arg) { + mysql_thread_init(); MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg; MySQL_Thread * mysql_thr = new MySQL_Thread(); mysql_thr->curtime=monotonic_time(); @@ -1405,6 +1410,7 @@ __fast_exit_monitor_galera_thread: } void * monitor_replication_lag_thread(void *arg) { + mysql_thread_init(); MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg; if (!GloMTH) return NULL; // quick exit during shutdown/restart MySQL_Thread * mysql_thr = new MySQL_Thread(); From 92d96815c6fc872e57620aea2346f982701e5062 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Wed, 10 Apr 2019 19:40:01 +1000 Subject: [PATCH 5/5] Rewriting connection pool in Monitor A lot of debugging code. Most of it should be removed! --- include/MySQL_Monitor.hpp | 2 +- lib/MySQL_Monitor.cpp | 206 +++++++++++++++++++++++++++++++------- 2 files changed, 173 insertions(+), 35 deletions(-) diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index b40a2a42a..d8e79026f 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -169,7 +169,7 @@ class MySQL_Monitor { unsigned long long read_only_check_ERR; unsigned long long replication_lag_check_OK; unsigned long long replication_lag_check_ERR; - wqueue queue; + wqueue * queue = NULL; MySQL_Monitor_Connection_Pool *My_Conn_Pool; bool shutdown; bool monitor_enabled; diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index d8bd5100f..76fd7e079 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -69,7 +69,7 @@ class ConsumerThread : public Thread { if (thrn) { // we took a NULL item that wasn't meant to reach here! Add it again WorkItem *item=NULL; - GloMyMon->queue.add(item); + GloMyMon->queue->add(item); } // this is intentional to EXIT immediately return NULL; @@ -133,17 +133,121 @@ static void close_mysql(MYSQL *my) { mysql_close_no_command(my); } +class MonMySrvC { + public: + char *address; + uint16_t port; + PtrArray *conns; + MonMySrvC(char *a, uint16_t p) { + address = strdup(a); + port = p; + conns = new PtrArray(); + }; + ~MonMySrvC() { + free(address); + delete conns; + } +}; class MySQL_Monitor_Connection_Pool { private: std::mutex mutex; - std::map, std::vector > my_connections; + pthread_mutex_t m2; + PtrArray *conns; +// std::map, std::vector > my_connections; + PtrArray *servers; public: MYSQL * get_connection(char *hostname, int port); void put_connection(char *hostname, int port, MYSQL *my); - void purge_idle_connections(); +// void purge_idle_connections(); + MySQL_Monitor_Connection_Pool() { + servers = new PtrArray(); + conns = new PtrArray(); + pthread_mutex_init(&m2, NULL); + }; + void conn_register(MYSQL *my) { + std::lock_guard lock(mutex); + pthread_mutex_lock(&m2); + for (unsigned int i=0; ilen; i++) { + MYSQL *my1 = (MYSQL *)conns->index(i); + assert(my!=my1); + assert(my->net.fd!=my1->net.fd); + } + fprintf(stderr,"Registering MYSQL with FD %d\n", my->net.fd); + conns->add(my); + pthread_mutex_unlock(&m2); + }; + void conn_unregister(MYSQL *my) { + std::lock_guard lock(mutex); + pthread_mutex_lock(&m2); + for (unsigned int i=0; ilen; i++) { + MYSQL *my1 = (MYSQL *)conns->index(i); + if (my1 == my) { + conns->remove_index_fast(i); + fprintf(stderr,"Un-registering MYSQL with FD %d\n", my->net.fd); + pthread_mutex_unlock(&m2); + return; + } + } + assert(0); + }; }; +MYSQL * MySQL_Monitor_Connection_Pool::get_connection(char *hostname, int port) { + std::lock_guard lock(mutex); + pthread_mutex_lock(&m2); + MYSQL *my = NULL; + for (unsigned int i=0; ilen; i++) { + MonMySrvC *srv = (MonMySrvC *)servers->index(i); + if (srv->port == port && strcmp(hostname,srv->address)==0) { + if (srv->conns->len) { + for (unsigned int j=0; jconns->len; j++) { + MYSQL *my1 = (MYSQL *)srv->conns->index(j); + for (unsigned int k=0; kconns->len; k++) { + if (k!=j) { + MYSQL *my2 = (MYSQL *)srv->conns->index(k); + assert(my1!=my2); + assert(my1->net.fd!=my2->net.fd); + } + } + } + unsigned int idx = rand()%srv->conns->len; + my = (MYSQL *)srv->conns->remove_index_fast(idx); + for (unsigned int j=0; jlen; j++) { + MYSQL *my1 = (MYSQL *)conns->index(j); + assert(my!=my1); + assert(my->net.fd!=my1->net.fd); + } + } + pthread_mutex_unlock(&m2); + return my; + } + } + pthread_mutex_unlock(&m2); + return my; +} + +void MySQL_Monitor_Connection_Pool::put_connection(char *hostname, int port, MYSQL *my) { + unsigned long long now = monotonic_time(); + std::lock_guard lock(mutex); + pthread_mutex_lock(&m2); + *(unsigned long long*)my->net.buff = now; + for (unsigned int i=0; ilen; i++) { + MonMySrvC *srv = (MonMySrvC *)servers->index(i); + if (srv->port == port && strcmp(hostname,srv->address)==0) { + srv->conns->add(my); + pthread_mutex_unlock(&m2); + return; + } + } + // if no server was found + MonMySrvC *srv = new MonMySrvC(hostname,port); + srv->conns->add(my); + servers->add(srv); + pthread_mutex_unlock(&m2); +} + +/* void MySQL_Monitor_Connection_Pool::purge_idle_connections() { unsigned long long now = monotonic_time(); std::lock_guard lock(mutex); @@ -171,8 +275,8 @@ void MySQL_Monitor_Connection_Pool::purge_idle_connections() { } } } - - +*/ +/* MYSQL * MySQL_Monitor_Connection_Pool::get_connection(char *hostname, int port) { std::lock_guard lock(mutex); auto it = my_connections.find(std::make_pair(hostname, port)); @@ -200,7 +304,7 @@ void MySQL_Monitor_Connection_Pool::put_connection(char *hostname, int port, MYS my_connections[std::make_pair(hostname,port)].push_back(my); } } - +*/ MySQL_Monitor_State_Data::MySQL_Monitor_State_Data(char *h, int p, struct event_base *b, bool _use_ssl, int g) { task_id=MON_CONNECT; mysql=NULL; @@ -313,6 +417,8 @@ MySQL_Monitor::MySQL_Monitor() { My_Conn_Pool=new MySQL_Monitor_Connection_Pool(); + queue = new wqueue(); + pthread_mutex_init(&group_replication_mutex,NULL); Group_Replication_Hosts_resultset=NULL; @@ -429,7 +535,7 @@ void MySQL_Monitor::check_and_build_standard_tables(SQLite3DB *db, std::vectormysql==NULL) { // we don't have a connection, let's create it bool rc; rc=mmsd->create_new_connection(); + if (mmsd->mysql) { + GloMyMon->My_Conn_Pool->conn_register(mmsd->mysql); + } crc=true; if (rc==false) { goto __exit_monitor_ping_thread; } + } else { + GloMyMon->My_Conn_Pool->conn_register(mmsd->mysql); } mmsd->t1=monotonic_time(); @@ -529,6 +640,7 @@ void * monitor_ping_thread(void *arg) { } else { if (crc==false) { GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql); + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); mmsd->mysql=NULL; } } @@ -563,17 +675,21 @@ __fast_exit_monitor_ping_thread: // if we reached here we didn't put the connection back if (mmsd->mysql_error_msg) { mysql_close(mmsd->mysql); // if we reached here we should destroy it + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); mmsd->mysql=NULL; } else { if (crc) { bool rc=mmsd->set_wait_timeout(); if (rc) { GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql); + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); } else { + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); mysql_close(mmsd->mysql); // set_wait_timeout failed } mmsd->mysql=NULL; } else { // really not sure how we reached here, drop it + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); mysql_close(mmsd->mysql); mmsd->mysql=NULL; } @@ -642,6 +758,8 @@ bool MySQL_Monitor_State_Data::create_new_connection() { } if (myrc==NULL) { mysql_error_msg=strdup(mysql_error(mysql)); + mysql_close(mysql); + mysql = NULL; return false; } else { // mariadb client library disables NONBLOCK for SSL connections ... re-enable it! @@ -658,7 +776,7 @@ bool MySQL_Monitor_State_Data::create_new_connection() { } void * monitor_read_only_thread(void *arg) { - mysql_thread_init(); + mysql_close(mysql_init(NULL)); bool timeout_reached = false; MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg; if (!GloMTH) return NULL; // quick exit during shutdown/restart @@ -887,7 +1005,7 @@ __fast_exit_monitor_read_only_thread: } void * monitor_group_replication_thread(void *arg) { - mysql_thread_init(); + mysql_close(mysql_init(NULL)); MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg; MySQL_Thread * mysql_thr = new MySQL_Thread(); mysql_thr->curtime=monotonic_time(); @@ -1152,7 +1270,7 @@ __fast_exit_monitor_group_replication_thread: } void * monitor_galera_thread(void *arg) { - mysql_thread_init(); + mysql_close(mysql_init(NULL)); MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg; MySQL_Thread * mysql_thr = new MySQL_Thread(); mysql_thr->curtime=monotonic_time(); @@ -1410,13 +1528,17 @@ __fast_exit_monitor_galera_thread: } void * monitor_replication_lag_thread(void *arg) { - mysql_thread_init(); + mysql_close(mysql_init(NULL)); MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg; if (!GloMTH) return NULL; // quick exit during shutdown/restart MySQL_Thread * mysql_thr = new MySQL_Thread(); mysql_thr->curtime=monotonic_time(); mysql_thr->refresh_variables(); +#ifdef DEBUG + MYSQL *mysqlcopy = NULL; +#endif // DEBUG + mmsd->mysql=GloMyMon->My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port); unsigned long long start_time=mysql_thr->curtime; @@ -1431,12 +1553,21 @@ void * monitor_replication_lag_thread(void *arg) { if (mmsd->mysql==NULL) { // we don't have a connection, let's create it bool rc; rc=mmsd->create_new_connection(); + if (mmsd->mysql) { + GloMyMon->My_Conn_Pool->conn_register(mmsd->mysql); + } crc=true; if (rc==false) { goto __fast_exit_monitor_replication_lag_thread; } + } else { + GloMyMon->My_Conn_Pool->conn_register(mmsd->mysql); } +#ifdef DEBUG + mysqlcopy = mmsd->mysql; +#endif // DEBUG + mmsd->t1=monotonic_time(); mmsd->interr=0; // reset the value if (percona_heartbeat_table) { @@ -1491,11 +1622,13 @@ void * monitor_replication_lag_thread(void *arg) { if (mmsd->interr) { // replication lag check failed mmsd->mysql_error_msg=strdup(mysql_error(mmsd->mysql)); if (mmsd->mysql) { + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); mysql_close(mmsd->mysql); mmsd->mysql=NULL; } } else { if (crc==false) { + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql); mmsd->mysql=NULL; } @@ -1577,12 +1710,14 @@ __exit_monitor_replication_lag_thread: } if (mmsd->interr) { // check failed if (mmsd->mysql) { + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); mysql_close(mmsd->mysql); mmsd->mysql=NULL; } } else { if (mmsd->mysql) { GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql); + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); mmsd->mysql=NULL; } } @@ -1597,11 +1732,14 @@ __fast_exit_monitor_replication_lag_thread: bool rc=mmsd->set_wait_timeout(); if (rc) { GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql); + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); } else { + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); mysql_close(mmsd->mysql); // set_wait_timeout failed } mmsd->mysql=NULL; } else { // really not sure how we reached here, drop it + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); mysql_close(mmsd->mysql); mmsd->mysql=NULL; } @@ -1685,7 +1823,7 @@ void * MySQL_Monitor::monitor_connect() { mmsd->mondb=monitordb; WorkItem* item; item=new WorkItem(mmsd,monitor_connect_thread); - GloMyMon->queue.add(item); + GloMyMon->queue->add(item); usleep(us); } if (GloMyMon->shutdown) return NULL; @@ -1733,7 +1871,7 @@ __sleep_monitor_connect_loop: } for (unsigned int i=0;iqueue.add(item); + GloMyMon->queue->add(item); } return NULL; } @@ -1804,7 +1942,7 @@ void * MySQL_Monitor::monitor_ping() { mmsd->mondb=monitordb; WorkItem* item; item=new WorkItem(mmsd,monitor_ping_thread); - GloMyMon->queue.add(item); + GloMyMon->queue->add(item); usleep(us); if (GloMyMon->shutdown) return NULL; } @@ -1971,7 +2109,7 @@ __sleep_monitor_ping_loop: } for (unsigned int i=0;iqueue.add(item); + GloMyMon->queue->add(item); } return NULL; } @@ -2085,7 +2223,7 @@ void * MySQL_Monitor::monitor_read_only() { mmsd->mondb=monitordb; WorkItem* item; item=new WorkItem(mmsd,monitor_read_only_thread); - GloMyMon->queue.add(item); + GloMyMon->queue->add(item); usleep(us); } if (GloMyMon->shutdown) return NULL; @@ -2133,7 +2271,7 @@ __sleep_monitor_read_only: } for (unsigned int i=0;iqueue.add(item); + GloMyMon->queue->add(item); } return NULL; } @@ -2205,7 +2343,7 @@ void * MySQL_Monitor::monitor_group_replication() { mmsd->mondb=monitordb; WorkItem* item; item=new WorkItem(mmsd,monitor_group_replication_thread); - GloMyMon->queue.add(item); + GloMyMon->queue->add(item); usleep(us); } if (GloMyMon->shutdown) { @@ -2262,7 +2400,7 @@ __sleep_monitor_group_replication: } for (unsigned int i=0;iqueue.add(item); + GloMyMon->queue->add(item); } return NULL; } @@ -2321,7 +2459,7 @@ void * MySQL_Monitor::monitor_galera() { mmsd->mondb=monitordb; WorkItem* item; item=new WorkItem(mmsd,monitor_galera_thread); - GloMyMon->queue.add(item); + GloMyMon->queue->add(item); usleep(us); } if (GloMyMon->shutdown) { @@ -2354,7 +2492,7 @@ __sleep_monitor_galera: } for (unsigned int i=0;iqueue.add(item); + GloMyMon->queue->add(item); } return NULL; } @@ -2417,7 +2555,7 @@ void * MySQL_Monitor::monitor_replication_lag() { mmsd->mondb=monitordb; WorkItem* item; item=new WorkItem(mmsd,monitor_replication_lag_thread); - GloMyMon->queue.add(item); + GloMyMon->queue->add(item); usleep(us); } if (GloMyMon->shutdown) return NULL; @@ -2465,7 +2603,7 @@ __sleep_monitor_replication_lag: } for (unsigned int i=0;iqueue.add(item); + GloMyMon->queue->add(item); } return NULL; } @@ -2484,8 +2622,8 @@ void * MySQL_Monitor::run() { mysql_thr->refresh_variables(); //if (!GloMTH) return NULL; // quick exit during shutdown/restart __monitor_run: - while (queue.size()) { // this is a clean up in case Monitor was restarted - WorkItem* item = (WorkItem*)queue.remove(); + while (queue->size()) { // this is a clean up in case Monitor was restarted + WorkItem* item = (WorkItem*)queue->remove(); if (item) { if (item->mmsd) { delete item->mmsd; @@ -2495,7 +2633,7 @@ __monitor_run: } ConsumerThread **threads= (ConsumerThread **)malloc(sizeof(ConsumerThread *)*num_threads); for (unsigned int i=0;istart(2048,false); } started_threads += num_threads; @@ -2546,7 +2684,7 @@ __monitor_run: threads= (ConsumerThread **)realloc(threads, sizeof(ConsumerThread *)*num_threads); started_threads += (num_threads - old_num_threads); for (unsigned int i = old_num_threads ; i < num_threads ; i++) { - threads[i] = new ConsumerThread(queue, 0); + threads[i] = new ConsumerThread(*queue, 0); threads[i]->start(2048,false); } } @@ -2554,10 +2692,10 @@ __monitor_run: } monitor_enabled=mysql_thread___monitor_enabled; if ( rand()%5 == 0) { // purge once in a while - My_Conn_Pool->purge_idle_connections(); + //My_Conn_Pool->purge_idle_connections(); } usleep(200000); - int qsize=queue.size(); + int qsize=queue->size(); if (qsize > mysql_thread___monitor_threads_queue_maxsize/4) { proxy_warning("Monitor queue too big: %d\n", qsize); unsigned int threads_max = (unsigned int)mysql_thread___monitor_threads_max; @@ -2572,14 +2710,14 @@ __monitor_run: threads= (ConsumerThread **)realloc(threads, sizeof(ConsumerThread *)*num_threads); started_threads += new_threads; for (unsigned int i = old_num_threads ; i < num_threads ; i++) { - threads[i] = new ConsumerThread(queue, 0); + threads[i] = new ConsumerThread(*queue, 0); threads[i]->start(2048,false); } } } // check again. Do we need also aux threads? usleep(50000); - qsize=queue.size(); + qsize=queue->size(); if (qsize > mysql_thread___monitor_threads_queue_maxsize) { qsize=qsize/50; unsigned int threads_max = (unsigned int)mysql_thread___monitor_threads_max; @@ -2592,7 +2730,7 @@ __monitor_run: aux_threads = qsize; started_threads += aux_threads; for (int i=0; istart(2048,false); } for (int i=0; iqueue.add(item); + GloMyMon->queue->add(item); } for (unsigned int i=0;ijoin();