diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index a826bc8ec..b4aea22ff 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -178,6 +178,7 @@ public: MYSQL * get_connection(char *hostname, int port, MySQL_Monitor_State_Data *mmsd); void put_connection(char *hostname, int port, MYSQL *my); void purge_some_connections(); + void purge_all_connections(); MySQL_Monitor_Connection_Pool() { servers = std::unique_ptr(new PtrArray()); #ifdef DEBUG @@ -186,15 +187,10 @@ public: #endif // DEBUG }; ~MySQL_Monitor_Connection_Pool() { - if (servers) { - while(servers->len) { - MonMySrvC *srv = static_cast(servers->index(0)); - if (srv) { - delete srv; - } - servers->remove_index_fast(0); - } - } + purge_all_connections(); +#ifdef DEBUG + pthread_mutex_destroy(&m2); +#endif // DEBUG } void conn_register(MySQL_Monitor_State_Data *mmsd) { #ifdef DEBUG @@ -248,12 +244,33 @@ __conn_register_label: }; }; +void MySQL_Monitor_Connection_Pool::purge_all_connections() { + std::lock_guard lock(mutex); +#ifdef DEBUG + pthread_mutex_lock(&m2); +#endif + if (servers) { + while (servers->len) { + MonMySrvC* srv = static_cast(servers->index(0)); + if (srv) { + delete srv; + } + servers->remove_index_fast(0); + } + } +#ifdef DEBUG + conns->reset(); + pthread_mutex_unlock(&m2); +#endif +} + MYSQL * MySQL_Monitor_Connection_Pool::get_connection(char *hostname, int port, MySQL_Monitor_State_Data *mmsd) { std::lock_guard lock(mutex); #ifdef DEBUG pthread_mutex_lock(&m2); #endif // DEBUG MYSQL *my = NULL; + unsigned long long now = monotonic_time(); for (unsigned int i=0; ilen; i++) { MonMySrvC *srv = (MonMySrvC *)servers->index(i); if (srv->port == port && strcmp(hostname,srv->address)==0) { @@ -270,21 +287,33 @@ MYSQL * MySQL_Monitor_Connection_Pool::get_connection(char *hostname, int port, } } #endif // DEBUG - unsigned int idx = rand()%srv->conns->len; - my = (MYSQL *)srv->conns->remove_index_fast(idx); + while (srv->conns->len) { + unsigned int idx = rand() % srv->conns->len; + MYSQL* mysql = (MYSQL*)srv->conns->remove_index_fast(idx); + + if (!mysql) continue; + + // close connection if not used for a while + unsigned long long then = *(unsigned long long*)mysql->net.buff; + if (now > (then + mysql_thread___monitor_ping_interval * 1000 * 10)) { + MySQL_Monitor_State_Data* mmsd = new MySQL_Monitor_State_Data((char*)"", 0, NULL, false); + mmsd->mysql = mysql; + GloMyMon->queue->add(new WorkItem(mmsd, NULL)); + continue; + } + + my = mysql; + break; + } #ifdef DEBUG for (unsigned int j=0; jlen; j++) { MYSQL *my1 = (MYSQL *)conns->index(j); assert(my!=my1); assert(my->net.fd!=my1->net.fd); } - for (unsigned int l=0; llen; l++) { - MYSQL *my1 = (MYSQL *)conns->index(l); - assert(my!=my1); - assert(my->net.fd!=my1->net.fd); - } //proxy_info("Registering MYSQL with FD %d from mmsd %p and MYSQL %p\n", my->net.fd, mmsd, my); - conns->add(my); + if (my) + conns->add(my); #endif // DEBUG } #ifdef DEBUG @@ -3527,6 +3556,9 @@ __monitor_run: pthread_join(monitor_galera_thread,NULL); pthread_join(monitor_aws_aurora_thread,NULL); pthread_join(monitor_replication_lag_thread,NULL); + + My_Conn_Pool->purge_all_connections(); + while (shutdown==false) { unsigned int glover; if (GloMTH) { @@ -3540,6 +3572,7 @@ __monitor_run: if (mysql_thread___monitor_enabled==true) { goto __monitor_run; } + usleep(200000); } if (mysql_thr) {