diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 0c48d3b90..5e915b5f7 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -66,7 +66,9 @@ class ConsumerThread : public Thread { // this is intentional to EXIT immediately return NULL; } - item->routine((void *)item->mmsd); + if (item->routine) { // NULL is allowed, do nothing for it + item->routine((void *)item->mmsd); + } //routine((void *)mmsd); delete item->mmsd; delete item; @@ -136,7 +138,8 @@ class MySQL_Monitor_Connection_Pool { ~MySQL_Monitor_Connection_Pool(); MYSQL * get_connection(char *hostname, int port); void put_connection(char *hostname, int port, MYSQL *my); - void purge_missing_servers(SQLite3_result *resultset); + //void purge_missing_servers(SQLite3_result *resultset); + void purge_idle_connections(); }; MySQL_Monitor_Connection_Pool::MySQL_Monitor_Connection_Pool() { @@ -145,9 +148,56 @@ MySQL_Monitor_Connection_Pool::MySQL_Monitor_Connection_Pool() { } MySQL_Monitor_Connection_Pool::~MySQL_Monitor_Connection_Pool() { - purge_missing_servers(NULL); +// purge_missing_servers(NULL); +} + +void MySQL_Monitor_Connection_Pool::purge_idle_connections() { + pthread_mutex_lock(&mutex); + std::map*>::iterator it; + for(it = my_connections.begin(); it != my_connections.end(); it++) { + std::list *lst=it->second; + if (!lst->empty()) { + std::list::const_iterator it3; + for(it3 = lst->begin(); it3 != lst->end(); it3++) { + //it3=lst->begin(); + MYSQL *my=*it3; + unsigned long long now=monotonic_time(); + unsigned long long then=0; + memcpy(&then,my->net.buff,sizeof(unsigned long long)); + if (now > then + mysql_thread___monitor_ping_interval * 3) { + MySQL_Monitor_State_Data *mmsd= new MySQL_Monitor_State_Data((char *)"",0,NULL,false); + mmsd->mysql=my; + WorkItem *item; + item=new WorkItem(mmsd,NULL); + GloMyMon->queue.add(item); + lst->remove(*it3); + } +/* + if (memcmp(my->net.buff,pollute_buf,8)) { + // the buffer is not polluted, it means it didn't match previously + while(!lst->empty()) { + my=lst->front(); + lst->pop_front(); + purge_lst->push_back(my); + } + } else { + // try to keep maximum 2 free connections + // dropping all the others + while(lst->size() > 2) { + my=lst->front(); + lst->pop_front(); + purge_lst->push_back(my); + } +*/ + } + } else { + my_connections.erase(it); + } + } + pthread_mutex_unlock(&mutex); } +/* void MySQL_Monitor_Connection_Pool::purge_missing_servers(SQLite3_result *resultset) { #define POLLUTE_LENGTH 8 char pollute_buf[POLLUTE_LENGTH]; @@ -225,6 +275,7 @@ __purge_all: } delete purge_lst; } +*/ MYSQL * MySQL_Monitor_Connection_Pool::get_connection(char *hostname, int port) { std::map* , cmp_str >::iterator it; @@ -241,7 +292,7 @@ MYSQL * MySQL_Monitor_Connection_Pool::get_connection(char *hostname, int port) lst->pop_front(); size--; pthread_mutex_unlock(&mutex); - memset(ret->net.buff,0,8); // reset what was polluted + memset(ret->net.buff,0,sizeof(unsigned long long)); // reset what was polluted return ret; } } @@ -254,6 +305,8 @@ void MySQL_Monitor_Connection_Pool::put_connection(char *hostname, int port, MYS std::map* , cmp_str >::iterator it; char * buf=(char *)malloc(16+strlen(hostname)); sprintf(buf,"%s:%d",hostname,port); + unsigned long long now=monotonic_time(); + memcpy(my->net.buff,&now,sizeof(unsigned long long)); //mark insert time pthread_mutex_lock(&mutex); it = my_connections.find(buf); std::list *lst=NULL; @@ -835,7 +888,7 @@ void * MySQL_Monitor::monitor_connect() { proxy_error("Error on %s : %s\n", query, error); goto __end_monitor_connect_loop; } else { - GloMyMon->My_Conn_Pool->purge_missing_servers(resultset); + //GloMyMon->My_Conn_Pool->purge_missing_servers(resultset); if (resultset->rows_count==0) { goto __end_monitor_connect_loop; } @@ -1349,7 +1402,7 @@ void * MySQL_Monitor::run() { if (GloMTH) mysql_thr->refresh_variables(); //proxy_error("%s\n","MySQL_Monitor refreshing variables"); - My_Conn_Pool->purge_missing_servers(NULL); + //My_Conn_Pool->purge_missing_servers(NULL); } usleep(500000); int qsize=queue.size();