diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index 8a6d5b3d4..6beecc8ef 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -241,7 +241,7 @@ class MySQL_Monitor { unsigned long long read_only_check_ERR; unsigned long long replication_lag_check_OK; unsigned long long replication_lag_check_ERR; - wqueue queue; + wqueue * queue = NULL; MySQL_Monitor_Connection_Pool *My_Conn_Pool; bool shutdown; pthread_mutex_t mon_en_mutex; diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 45a43c9d9..3b52424f9 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -69,7 +69,7 @@ class ConsumerThread : public Thread { if (thrn) { // we took a NULL item that wasn't meant to reach here! Add it again WorkItem *item=NULL; - GloMyMon->queue.add(item); + GloMyMon->queue->add(item); } // this is intentional to EXIT immediately return NULL; @@ -137,17 +137,121 @@ static void close_mysql(MYSQL *my) { mysql_close_no_command(my); } +class MonMySrvC { + public: + char *address; + uint16_t port; + PtrArray *conns; + MonMySrvC(char *a, uint16_t p) { + address = strdup(a); + port = p; + conns = new PtrArray(); + }; + ~MonMySrvC() { + free(address); + delete conns; + } +}; class MySQL_Monitor_Connection_Pool { private: std::mutex mutex; - std::map, std::vector > my_connections; + pthread_mutex_t m2; + PtrArray *conns; +// std::map, std::vector > my_connections; + PtrArray *servers; public: MYSQL * get_connection(char *hostname, int port); void put_connection(char *hostname, int port, MYSQL *my); - void purge_idle_connections(); +// void purge_idle_connections(); + MySQL_Monitor_Connection_Pool() { + servers = new PtrArray(); + conns = new PtrArray(); + pthread_mutex_init(&m2, NULL); + }; + void conn_register(MYSQL *my) { + std::lock_guard lock(mutex); + pthread_mutex_lock(&m2); + for (unsigned int i=0; ilen; i++) { + MYSQL *my1 = (MYSQL *)conns->index(i); + assert(my!=my1); + assert(my->net.fd!=my1->net.fd); + } + fprintf(stderr,"Registering MYSQL with FD %d\n", my->net.fd); + conns->add(my); + pthread_mutex_unlock(&m2); + }; + void conn_unregister(MYSQL *my) { + std::lock_guard lock(mutex); + pthread_mutex_lock(&m2); + for (unsigned int i=0; ilen; i++) { + MYSQL *my1 = (MYSQL *)conns->index(i); + if (my1 == my) { + conns->remove_index_fast(i); + fprintf(stderr,"Un-registering MYSQL with FD %d\n", my->net.fd); + pthread_mutex_unlock(&m2); + return; + } + } + assert(0); + }; }; +MYSQL * MySQL_Monitor_Connection_Pool::get_connection(char *hostname, int port) { + std::lock_guard lock(mutex); + pthread_mutex_lock(&m2); + MYSQL *my = NULL; + for (unsigned int i=0; ilen; i++) { + MonMySrvC *srv = (MonMySrvC *)servers->index(i); + if (srv->port == port && strcmp(hostname,srv->address)==0) { + if (srv->conns->len) { + for (unsigned int j=0; jconns->len; j++) { + MYSQL *my1 = (MYSQL *)srv->conns->index(j); + for (unsigned int k=0; kconns->len; k++) { + if (k!=j) { + MYSQL *my2 = (MYSQL *)srv->conns->index(k); + assert(my1!=my2); + assert(my1->net.fd!=my2->net.fd); + } + } + } + unsigned int idx = rand()%srv->conns->len; + my = (MYSQL *)srv->conns->remove_index_fast(idx); + for (unsigned int j=0; jlen; j++) { + MYSQL *my1 = (MYSQL *)conns->index(j); + assert(my!=my1); + assert(my->net.fd!=my1->net.fd); + } + } + pthread_mutex_unlock(&m2); + return my; + } + } + pthread_mutex_unlock(&m2); + return my; +} + +void MySQL_Monitor_Connection_Pool::put_connection(char *hostname, int port, MYSQL *my) { + unsigned long long now = monotonic_time(); + std::lock_guard lock(mutex); + pthread_mutex_lock(&m2); + *(unsigned long long*)my->net.buff = now; + for (unsigned int i=0; ilen; i++) { + MonMySrvC *srv = (MonMySrvC *)servers->index(i); + if (srv->port == port && strcmp(hostname,srv->address)==0) { + srv->conns->add(my); + pthread_mutex_unlock(&m2); + return; + } + } + // if no server was found + MonMySrvC *srv = new MonMySrvC(hostname,port); + srv->conns->add(my); + servers->add(srv); + pthread_mutex_unlock(&m2); +} + +/* void MySQL_Monitor_Connection_Pool::purge_idle_connections() { unsigned long long now = monotonic_time(); std::lock_guard lock(mutex); @@ -175,8 +279,8 @@ void MySQL_Monitor_Connection_Pool::purge_idle_connections() { } } } - - +*/ +/* MYSQL * MySQL_Monitor_Connection_Pool::get_connection(char *hostname, int port) { std::lock_guard lock(mutex); auto it = my_connections.find(std::make_pair(hostname, port)); @@ -204,7 +308,7 @@ void MySQL_Monitor_Connection_Pool::put_connection(char *hostname, int port, MYS my_connections[std::make_pair(hostname,port)].push_back(my); } } - +*/ MySQL_Monitor_State_Data::MySQL_Monitor_State_Data(char *h, int p, struct event_base *b, bool _use_ssl, int g) { task_id=MON_CONNECT; mysql=NULL; @@ -330,6 +434,8 @@ MySQL_Monitor::MySQL_Monitor() { My_Conn_Pool=new MySQL_Monitor_Connection_Pool(); + queue = new wqueue(); + pthread_mutex_init(&group_replication_mutex,NULL); Group_Replication_Hosts_resultset=NULL; @@ -535,10 +641,15 @@ void * monitor_ping_thread(void *arg) { if (mmsd->mysql==NULL) { // we don't have a connection, let's create it bool rc; rc=mmsd->create_new_connection(); + if (mmsd->mysql) { + GloMyMon->My_Conn_Pool->conn_register(mmsd->mysql); + } crc=true; if (rc==false) { goto __exit_monitor_ping_thread; } + } else { + GloMyMon->My_Conn_Pool->conn_register(mmsd->mysql); } mmsd->t1=monotonic_time(); @@ -564,6 +675,7 @@ void * monitor_ping_thread(void *arg) { } else { if (crc==false) { GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql); + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); mmsd->mysql=NULL; } } @@ -604,17 +716,21 @@ __fast_exit_monitor_ping_thread: // if we reached here we didn't put the connection back if (mmsd->mysql_error_msg) { mysql_close(mmsd->mysql); // if we reached here we should destroy it + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); mmsd->mysql=NULL; } else { if (crc) { bool rc=mmsd->set_wait_timeout(); if (rc) { GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql); + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); } else { + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); mysql_close(mmsd->mysql); // set_wait_timeout failed } mmsd->mysql=NULL; } else { // really not sure how we reached here, drop it + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); mysql_close(mmsd->mysql); mmsd->mysql=NULL; } @@ -686,6 +802,8 @@ bool MySQL_Monitor_State_Data::create_new_connection() { } if (myrc==NULL) { mysql_error_msg=strdup(mysql_error(mysql)); + mysql_close(mysql); + mysql = NULL; return false; } else { // mariadb client library disables NONBLOCK for SSL connections ... re-enable it! @@ -888,6 +1006,10 @@ VALGRIND_ENABLE_ERROR_REPORTING; } } if (mmsd->interr) { // check failed + if (mmsd->mysql) { + mysql_close(mmsd->mysql); + mmsd->mysql=NULL; + } } else { if (crc==false) { if (mmsd->mysql) { @@ -1154,6 +1276,10 @@ __end_process_group_replication_result: } if (mmsd->interr) { // check failed + if (mmsd->mysql) { + mysql_close(mmsd->mysql); + mmsd->mysql=NULL; + } } else { if (crc==false) { if (mmsd->mysql) { @@ -1284,10 +1410,12 @@ __exit_monitor_galera_thread: bool wsrep_reject_queries = true; bool wsrep_sst_donor_rejects_queries = true; long long wsrep_local_recv_queue=0; + MYSQL_FIELD * fields=NULL; if (mmsd->interr == 0 && mmsd->result) { int num_fields=0; int num_rows=0; num_fields = mysql_num_fields(mmsd->result); + fields = mysql_fetch_fields(mmsd->result); if (num_fields!=7) { proxy_error("Incorrect number of fields, please report a bug\n"); goto __end_process_galera_result; @@ -1359,39 +1487,43 @@ __end_process_galera_result: if (mmsd->mysql_error_msg) { // there was an error checking the status of the server, surely we need to reconfigure GR MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, mmsd->mysql_error_msg); } else { - if (primary_partition == false || wsrep_desync == true || wsrep_local_state!=4) { - if (primary_partition == false) { - MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"primary_partition=NO"); - } else { - if (wsrep_desync == true) { - MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"wsrep_desync=YES"); + if (fields) { // if we didn't get any error, but fileds is NULL, we are likely hitting bug #1994 + if (primary_partition == false || wsrep_desync == true || wsrep_local_state!=4) { + if (primary_partition == false) { + MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"primary_partition=NO"); } else { - char msg[80]; - sprintf(msg,"wsrep_local_state=%d",wsrep_local_state); - MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, msg); + if (wsrep_desync == true) { + MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"wsrep_desync=YES"); + } else { + char msg[80]; + sprintf(msg,"wsrep_local_state=%d",wsrep_local_state); + MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, msg); + } } - } - } else { - //if (wsrep_sst_donor_rejects_queries || wsrep_reject_queries) { - if (wsrep_reject_queries) { - MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"wsrep_reject_queries=true"); - // } else { - // // wsrep_sst_donor_rejects_queries - // MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"wsrep_sst_donor_rejects_queries=true"); - // } } else { - if (read_only==true) { - if (wsrep_local_recv_queue > mmsd->max_transactions_behind) { - MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"slave is lagging"); + //if (wsrep_sst_donor_rejects_queries || wsrep_reject_queries) { + if (wsrep_reject_queries) { + MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"wsrep_reject_queries=true"); + // } else { + // // wsrep_sst_donor_rejects_queries + // MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"wsrep_sst_donor_rejects_queries=true"); + // } + } else { + if (read_only==true) { + if (wsrep_local_recv_queue > mmsd->max_transactions_behind) { + MyHGM->update_galera_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"slave is lagging"); + } else { + MyHGM->update_galera_set_read_only(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"read_only=YES"); + } } else { - MyHGM->update_galera_set_read_only(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"read_only=YES"); + // the node is a writer + // TODO: for now we don't care about the number of writers + MyHGM->update_galera_set_writer(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup); } - } else { - // the node is a writer - // TODO: for now we don't care about the number of writers - MyHGM->update_galera_set_writer(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup); } } + } else { + proxy_error("mysql_fetch_fields returns NULL. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port); } } @@ -1402,6 +1534,10 @@ __end_process_galera_result: } } if (mmsd->interr) { // check failed + if (mmsd->mysql) { + mysql_close(mmsd->mysql); + mmsd->mysql=NULL; + } } else { if (crc==false) { if (mmsd->mysql) { @@ -1461,10 +1597,15 @@ void * monitor_replication_lag_thread(void *arg) { if (mmsd->mysql==NULL) { // we don't have a connection, let's create it bool rc; rc=mmsd->create_new_connection(); + if (mmsd->mysql) { + GloMyMon->My_Conn_Pool->conn_register(mmsd->mysql); + } crc=true; if (rc==false) { goto __fast_exit_monitor_replication_lag_thread; } + } else { + GloMyMon->My_Conn_Pool->conn_register(mmsd->mysql); } #ifdef DEBUG @@ -1524,8 +1665,14 @@ void * monitor_replication_lag_thread(void *arg) { } if (mmsd->interr) { // replication lag check failed mmsd->mysql_error_msg=strdup(mysql_error(mmsd->mysql)); + if (mmsd->mysql) { + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); + mysql_close(mmsd->mysql); + mmsd->mysql=NULL; + } } else { if (crc==false) { + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql); mmsd->mysql=NULL; } @@ -1549,14 +1696,20 @@ __exit_monitor_replication_lag_thread: time_now=time_now-(mmsd->t2 - start_time); rc=sqlite3_bind_int64(statement, 3, time_now); assert(rc==SQLITE_OK); rc=sqlite3_bind_int64(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1)); assert(rc==SQLITE_OK); - if (mmsd->result) { + if (mmsd->interr == 0 && mmsd->result) { int num_fields=0; int k=0; MYSQL_FIELD * fields=NULL; int j=-1; num_fields = mysql_num_fields(mmsd->result); fields = mysql_fetch_fields(mmsd->result); - if (fields && num_fields == 1) { + if ( + fields && ( + ( num_fields == 1 && use_percona_heartbeat == true ) + || + ( num_fields > 30 && use_percona_heartbeat == false ) + ) + ) { for(k = 0; k < num_fields; k++) { if (fields[k].name) { if (strcmp("Seconds_Behind_Master", fields[k].name)==0) { @@ -1580,7 +1733,7 @@ __exit_monitor_replication_lag_thread: rc=sqlite3_bind_null(statement, 5); assert(rc==SQLITE_OK); } } else { - proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is not 1 ( %d ). See bug #1994\n", num_fields); + proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port); rc=sqlite3_bind_null(statement, 5); assert(rc==SQLITE_OK); } mysql_free_result(mmsd->result); @@ -1600,9 +1753,15 @@ __exit_monitor_replication_lag_thread: } if (mmsd->interr) { // check failed + if (mmsd->mysql) { + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); + mysql_close(mmsd->mysql); + mmsd->mysql=NULL; + } } else { if (mmsd->mysql) { GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql); + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); mmsd->mysql=NULL; } } @@ -1617,11 +1776,14 @@ __fast_exit_monitor_replication_lag_thread: bool rc=mmsd->set_wait_timeout(); if (rc) { GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql); + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); } else { + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); mysql_close(mmsd->mysql); // set_wait_timeout failed } mmsd->mysql=NULL; } else { // really not sure how we reached here, drop it + GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql); mysql_close(mmsd->mysql); mmsd->mysql=NULL; } @@ -1705,7 +1867,7 @@ void * MySQL_Monitor::monitor_connect() { mmsd->mondb=monitordb; WorkItem* item; item=new WorkItem(mmsd,monitor_connect_thread); - GloMyMon->queue.add(item); + GloMyMon->queue->add(item); usleep(us); } if (GloMyMon->shutdown) return NULL; @@ -1753,7 +1915,7 @@ __sleep_monitor_connect_loop: } for (unsigned int i=0;iqueue.add(item); + GloMyMon->queue->add(item); } return NULL; } @@ -1824,7 +1986,7 @@ void * MySQL_Monitor::monitor_ping() { mmsd->mondb=monitordb; WorkItem* item; item=new WorkItem(mmsd,monitor_ping_thread); - GloMyMon->queue.add(item); + GloMyMon->queue->add(item); usleep(us); if (GloMyMon->shutdown) return NULL; } @@ -1991,7 +2153,7 @@ __sleep_monitor_ping_loop: } for (unsigned int i=0;iqueue.add(item); + GloMyMon->queue->add(item); } return NULL; } @@ -2105,7 +2267,7 @@ void * MySQL_Monitor::monitor_read_only() { mmsd->mondb=monitordb; WorkItem* item; item=new WorkItem(mmsd,monitor_read_only_thread); - GloMyMon->queue.add(item); + GloMyMon->queue->add(item); usleep(us); } if (GloMyMon->shutdown) return NULL; @@ -2153,7 +2315,7 @@ __sleep_monitor_read_only: } for (unsigned int i=0;iqueue.add(item); + GloMyMon->queue->add(item); } return NULL; } @@ -2225,7 +2387,7 @@ void * MySQL_Monitor::monitor_group_replication() { mmsd->mondb=monitordb; WorkItem* item; item=new WorkItem(mmsd,monitor_group_replication_thread); - GloMyMon->queue.add(item); + GloMyMon->queue->add(item); usleep(us); } if (GloMyMon->shutdown) { @@ -2282,7 +2444,7 @@ __sleep_monitor_group_replication: } for (unsigned int i=0;iqueue.add(item); + GloMyMon->queue->add(item); } return NULL; } @@ -2341,7 +2503,7 @@ void * MySQL_Monitor::monitor_galera() { mmsd->mondb=monitordb; WorkItem* item; item=new WorkItem(mmsd,monitor_galera_thread); - GloMyMon->queue.add(item); + GloMyMon->queue->add(item); usleep(us); } if (GloMyMon->shutdown) { @@ -2374,7 +2536,7 @@ __sleep_monitor_galera: } for (unsigned int i=0;iqueue.add(item); + GloMyMon->queue->add(item); } return NULL; } @@ -2437,7 +2599,7 @@ void * MySQL_Monitor::monitor_replication_lag() { mmsd->mondb=monitordb; WorkItem* item; item=new WorkItem(mmsd,monitor_replication_lag_thread); - GloMyMon->queue.add(item); + GloMyMon->queue->add(item); usleep(us); } if (GloMyMon->shutdown) return NULL; @@ -2485,7 +2647,7 @@ __sleep_monitor_replication_lag: } for (unsigned int i=0;iqueue.add(item); + GloMyMon->queue->add(item); } return NULL; } @@ -2505,8 +2667,8 @@ void * MySQL_Monitor::run() { mysql_thr->refresh_variables(); //if (!GloMTH) return NULL; // quick exit during shutdown/restart __monitor_run: - while (queue.size()) { // this is a clean up in case Monitor was restarted - WorkItem* item = (WorkItem*)queue.remove(); + while (queue->size()) { // this is a clean up in case Monitor was restarted + WorkItem* item = (WorkItem*)queue->remove(); if (item) { if (item->mmsd) { delete item->mmsd; @@ -2516,7 +2678,7 @@ __monitor_run: } ConsumerThread **threads= (ConsumerThread **)malloc(sizeof(ConsumerThread *)*num_threads); for (unsigned int i=0;istart(2048,false); } started_threads += num_threads; @@ -2572,7 +2734,7 @@ __monitor_run: threads= (ConsumerThread **)realloc(threads, sizeof(ConsumerThread *)*num_threads); started_threads += (num_threads - old_num_threads); for (unsigned int i = old_num_threads ; i < num_threads ; i++) { - threads[i] = new ConsumerThread(queue, 0); + threads[i] = new ConsumerThread(*queue, 0); threads[i]->start(2048,false); } } @@ -2582,10 +2744,10 @@ __monitor_run: monitor_enabled=mysql_thread___monitor_enabled; pthread_mutex_unlock(&mon_en_mutex); if ( rand()%5 == 0) { // purge once in a while - My_Conn_Pool->purge_idle_connections(); + //My_Conn_Pool->purge_idle_connections(); } usleep(200000); - int qsize=queue.size(); + int qsize=queue->size(); if (qsize > mysql_thread___monitor_threads_queue_maxsize/4) { proxy_warning("Monitor queue too big: %d\n", qsize); unsigned int threads_max = (unsigned int)mysql_thread___monitor_threads_max; @@ -2600,14 +2762,14 @@ __monitor_run: threads= (ConsumerThread **)realloc(threads, sizeof(ConsumerThread *)*num_threads); started_threads += new_threads; for (unsigned int i = old_num_threads ; i < num_threads ; i++) { - threads[i] = new ConsumerThread(queue, 0); + threads[i] = new ConsumerThread(*queue, 0); threads[i]->start(2048,false); } } } // check again. Do we need also aux threads? usleep(50000); - qsize=queue.size(); + qsize=queue->size(); if (qsize > mysql_thread___monitor_threads_queue_maxsize) { qsize=qsize/50; unsigned int threads_max = (unsigned int)mysql_thread___monitor_threads_max; @@ -2620,7 +2782,7 @@ __monitor_run: aux_threads = qsize; started_threads += aux_threads; for (int i=0; istart(2048,false); } for (int i=0; iqueue.add(item); + GloMyMon->queue->add(item); } for (unsigned int i=0;ijoin();