diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index c0f93b7cd..8569d59ed 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -70,6 +70,7 @@ struct cmp_str { class MySQL_Monitor_Connection_Pool { private: + pthread_mutex_t mutex; int size; //std::map* > my_connections; std::map* , cmp_str> my_connections; @@ -78,13 +79,94 @@ class MySQL_Monitor_Connection_Pool { ~MySQL_Monitor_Connection_Pool(); MYSQL * get_connection(char *hostname, int port); void put_connection(char *hostname, int port, MYSQL *my); + void purge_missing_servers(SQLite3_result *resultset); }; MySQL_Monitor_Connection_Pool::MySQL_Monitor_Connection_Pool() { size=0; + pthread_mutex_init(&mutex,NULL); } MySQL_Monitor_Connection_Pool::~MySQL_Monitor_Connection_Pool() { + purge_missing_servers(NULL); +} + +void MySQL_Monitor_Connection_Pool::purge_missing_servers(SQLite3_result *resultset) { +#define POLLUTE_LENGTH 8 + char pollute_buf[POLLUTE_LENGTH]; + srand(monotonic_time()); + for (int i=0; i *purge_lst=NULL; + purge_lst=new std::list; + pthread_mutex_lock(&mutex); + if (resultset==NULL) { + goto __purge_all; + } + for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { + // for each host configured ... + SQLite3_row *r=*it; + char *buf=(char *)malloc(16+strlen(r->fields[0])); + sprintf(buf,"%s:%s",r->fields[0],r->fields[1]); + std::map* , cmp_str >::iterator it2; + it2 = my_connections.find(buf); // find the host + free(buf); + if (it2 != my_connections.end()) { // if the host exists + std::list *lst=it2->second; + std::list::const_iterator it3; + for (it3 = lst->begin(); it3 != lst->end(); ++it3) { + MYSQL *my=*it3; + memcpy(my->net.buff,pollute_buf,8); // pollute this buffer + // + } + } + } +__purge_all: + std::map*>::iterator it; + //std::map>::iterator it_type; + for(it = my_connections.begin(); it != my_connections.end(); it++) { + std::list *lst=it->second; + if (!lst->empty()) { + std::list::const_iterator it3; + it3=lst->begin(); + MYSQL *my=*it3; + if (memcmp(my->net.buff,pollute_buf,8)) { + // the buffer is not polluted, it means it didn't match previously + while(!lst->empty()) { + my=lst->front(); + lst->pop_front(); + purge_lst->push_back(my); + } + } else { + // try to keep maximum 2 free connections + // dropping all the others + while(lst->size() > 2) { + my=lst->front(); + lst->pop_front(); + purge_lst->push_back(my); + } + } + } + } + pthread_mutex_unlock(&mutex); + char quit_buff[5]; + memset(quit_buff,0,5); + quit_buff[0]=1; + quit_buff[4]=1; + + // close all idle connections + while (!purge_lst->empty()) { + MYSQL *my=purge_lst->front(); + purge_lst->pop_front(); + int fd=my->net.fd; + int wb=write(fd,quit_buff,5); + fd+=wb; // dummy, to make compiler happy + fd-=wb; // dummy, to make compiler happy + mysql_close_no_command(my); + shutdown(fd, SHUT_RDWR); + } + delete purge_lst; } MYSQL * MySQL_Monitor_Connection_Pool::get_connection(char *hostname, int port) { @@ -92,6 +174,7 @@ MYSQL * MySQL_Monitor_Connection_Pool::get_connection(char *hostname, int port) //it = my_connections.find(std::make_pair(hostname,port)); char *buf=(char *)malloc(16+strlen(hostname)); sprintf(buf,"%s:%d",hostname,port); + pthread_mutex_lock(&mutex); it = my_connections.find(buf); free(buf); if (it != my_connections.end()) { @@ -100,9 +183,12 @@ MYSQL * MySQL_Monitor_Connection_Pool::get_connection(char *hostname, int port) MYSQL *ret=lst->front(); lst->pop_front(); size--; + pthread_mutex_unlock(&mutex); + memset(ret->net.buff,0,8); // reset what was polluted return ret; } } + pthread_mutex_unlock(&mutex); return NULL; } @@ -111,6 +197,7 @@ void MySQL_Monitor_Connection_Pool::put_connection(char *hostname, int port, MYS std::map* , cmp_str >::iterator it; char * buf=(char *)malloc(16+strlen(hostname)); sprintf(buf,"%s:%d",hostname,port); + pthread_mutex_lock(&mutex); it = my_connections.find(buf); std::list *lst=NULL; if (it==my_connections.end()) { @@ -121,6 +208,7 @@ void MySQL_Monitor_Connection_Pool::put_connection(char *hostname, int port, MYS lst=it->second; } lst->push_back(my); + pthread_mutex_unlock(&mutex); } enum MySQL_Monitor_State_Data_Task_Type { @@ -565,6 +653,7 @@ MySQL_Monitor::~MySQL_Monitor() { delete tables_defs_monitor; delete monitordb; delete admindb; + delete My_Conn_Pool; }; @@ -630,6 +719,13 @@ void * MySQL_Monitor::monitor_connect() { unsigned int glover; t1=monotonic_time(); + glover=GloMTH->get_global_version(); + if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) { + MySQL_Monitor__thread_MySQL_Thread_Variables_version=glover; + mysql_thr->refresh_variables(); + next_loop_at=0; + } + if (t1 < next_loop_at) { goto __sleep_monitor_connect_loop; } @@ -643,19 +739,13 @@ void * MySQL_Monitor::monitor_connect() { // create libevent base libevent_base= event_base_new(); - glover=GloMTH->get_global_version(); - if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) { - MySQL_Monitor__thread_MySQL_Thread_Variables_version=glover; - mysql_thr->refresh_variables(); - //proxy_error("%s\n", "MySQL_Monitor - CONNECT - refreshing variables"); - } - proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset); if (error) { proxy_error("Error on %s : %s\n", query, error); goto __end_monitor_connect_loop; } else { + GloMyMon->My_Conn_Pool->purge_missing_servers(resultset); if (resultset->rows_count==0) { goto __end_monitor_connect_loop; } @@ -725,6 +815,10 @@ __sleep_monitor_connect_loop: usleep(st); } } + if (mysql_thr) { + delete mysql_thr; + mysql_thr=NULL; + } return NULL; } @@ -754,9 +848,16 @@ void * MySQL_Monitor::monitor_ping() { SQLite3_result *resultset=NULL; MySQL_Monitor_State_Data **sds=NULL; int i=0; - char *query=(char *)"SELECT DISTINCT hostname, port FROM mysql_servers"; + char *query=(char *)"SELECT DISTINCT hostname, port FROM mysql_servers WHERE status!='OFFLINE_HARD'"; t1=monotonic_time(); + glover=GloMTH->get_global_version(); + if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) { + MySQL_Monitor__thread_MySQL_Thread_Variables_version=glover; + mysql_thr->refresh_variables(); + next_loop_at=0; + } + if (t1 < next_loop_at) { goto __sleep_monitor_ping_loop; } @@ -770,13 +871,6 @@ void * MySQL_Monitor::monitor_ping() { // create libevent base libevent_base= event_base_new(); - glover=GloMTH->get_global_version(); - if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) { - MySQL_Monitor__thread_MySQL_Thread_Variables_version=glover; - mysql_thr->refresh_variables(); - //proxy_error("%s\n","MySQL_Monitor - PING - refreshing variables"); - } - proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset); if (error) { @@ -861,6 +955,10 @@ __sleep_monitor_ping_loop: usleep(st); } } + if (mysql_thr) { + delete mysql_thr; + mysql_thr=NULL; + } return NULL; } @@ -887,9 +985,16 @@ void * MySQL_Monitor::monitor_read_only() { SQLite3_result *resultset=NULL; MySQL_Monitor_State_Data **sds=NULL; int i=0; - char *query=(char *)"SELECT DISTINCT hostname, port FROM mysql_servers JOIN mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup"; + char *query=(char *)"SELECT DISTINCT hostname, port FROM mysql_servers JOIN mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE status!='OFFLINE_HARD'"; t1=monotonic_time(); + glover=GloMTH->get_global_version(); + if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) { + MySQL_Monitor__thread_MySQL_Thread_Variables_version=glover; + mysql_thr->refresh_variables(); + next_loop_at=0; + } + if (t1 < next_loop_at) { goto __sleep_monitor_read_only; } @@ -903,13 +1008,6 @@ void * MySQL_Monitor::monitor_read_only() { // create libevent base libevent_base= event_base_new(); - glover=GloMTH->get_global_version(); - if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) { - MySQL_Monitor__thread_MySQL_Thread_Variables_version=glover; - mysql_thr->refresh_variables(); - //proxy_error("%s\n","MySQL_Monitor - PING - refreshing variables"); - } - proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); // admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset); resultset = MyHGM->execute_query(query, &error); @@ -1036,6 +1134,10 @@ __sleep_monitor_read_only: usleep(st); } } + if (mysql_thr) { + delete mysql_thr; + mysql_thr=NULL; + } return NULL; } @@ -1065,6 +1167,13 @@ void * MySQL_Monitor::monitor_replication_lag() { char *query=(char *)"SELECT hostgroup_id, hostname, port, max_replication_lag FROM mysql_servers WHERE max_replication_lag > 0 AND status NOT LIKE 'OFFLINE%'"; t1=monotonic_time(); + glover=GloMTH->get_global_version(); + if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) { + MySQL_Monitor__thread_MySQL_Thread_Variables_version=glover; + mysql_thr->refresh_variables(); + next_loop_at=0; + } + if (t1 < next_loop_at) { goto __sleep_monitor_replication_lag; } @@ -1078,13 +1187,6 @@ void * MySQL_Monitor::monitor_replication_lag() { // create libevent base libevent_base= event_base_new(); - glover=GloMTH->get_global_version(); - if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) { - MySQL_Monitor__thread_MySQL_Thread_Variables_version=glover; - mysql_thr->refresh_variables(); - //proxy_error("%s\n","MySQL_Monitor - PING - refreshing variables"); - } - proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); // admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset); resultset = MyHGM->execute_query(query, &error); @@ -1207,6 +1309,10 @@ __sleep_monitor_replication_lag: usleep(st); } } + if (mysql_thr) { + delete mysql_thr; + mysql_thr=NULL; + } return NULL; } @@ -1227,6 +1333,7 @@ void * MySQL_Monitor::run() { MySQL_Monitor__thread_MySQL_Thread_Variables_version=glover; mysql_thr->refresh_variables(); //proxy_error("%s\n","MySQL_Monitor refreshing variables"); + My_Conn_Pool->purge_missing_servers(NULL); } usleep(500000); } @@ -1234,5 +1341,9 @@ void * MySQL_Monitor::run() { monitor_ping_thread->join(); monitor_read_only_thread->join(); monitor_replication_lag_thread->join(); + if (mysql_thr) { + delete mysql_thr; + mysql_thr=NULL; + } return NULL; };