|
|
|
|
@ -66,7 +66,9 @@ class ConsumerThread : public Thread {
|
|
|
|
|
// this is intentional to EXIT immediately
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
item->routine((void *)item->mmsd);
|
|
|
|
|
if (item->routine) { // NULL is allowed, do nothing for it
|
|
|
|
|
item->routine((void *)item->mmsd);
|
|
|
|
|
}
|
|
|
|
|
//routine((void *)mmsd);
|
|
|
|
|
delete item->mmsd;
|
|
|
|
|
delete item;
|
|
|
|
|
@ -136,7 +138,8 @@ class MySQL_Monitor_Connection_Pool {
|
|
|
|
|
~MySQL_Monitor_Connection_Pool();
|
|
|
|
|
MYSQL * get_connection(char *hostname, int port);
|
|
|
|
|
void put_connection(char *hostname, int port, MYSQL *my);
|
|
|
|
|
void purge_missing_servers(SQLite3_result *resultset);
|
|
|
|
|
//void purge_missing_servers(SQLite3_result *resultset);
|
|
|
|
|
void purge_idle_connections();
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
MySQL_Monitor_Connection_Pool::MySQL_Monitor_Connection_Pool() {
|
|
|
|
|
@ -145,9 +148,56 @@ MySQL_Monitor_Connection_Pool::MySQL_Monitor_Connection_Pool() {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
MySQL_Monitor_Connection_Pool::~MySQL_Monitor_Connection_Pool() {
|
|
|
|
|
purge_missing_servers(NULL);
|
|
|
|
|
// purge_missing_servers(NULL);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void MySQL_Monitor_Connection_Pool::purge_idle_connections() {
|
|
|
|
|
pthread_mutex_lock(&mutex);
|
|
|
|
|
std::map<char *, std::list<MYSQL *>*>::iterator it;
|
|
|
|
|
for(it = my_connections.begin(); it != my_connections.end(); it++) {
|
|
|
|
|
std::list<MYSQL *> *lst=it->second;
|
|
|
|
|
if (!lst->empty()) {
|
|
|
|
|
std::list<MYSQL *>::const_iterator it3;
|
|
|
|
|
for(it3 = lst->begin(); it3 != lst->end(); it3++) {
|
|
|
|
|
//it3=lst->begin();
|
|
|
|
|
MYSQL *my=*it3;
|
|
|
|
|
unsigned long long now=monotonic_time();
|
|
|
|
|
unsigned long long then=0;
|
|
|
|
|
memcpy(&then,my->net.buff,sizeof(unsigned long long));
|
|
|
|
|
if (now > then + mysql_thread___monitor_ping_interval * 3) {
|
|
|
|
|
MySQL_Monitor_State_Data *mmsd= new MySQL_Monitor_State_Data((char *)"",0,NULL,false);
|
|
|
|
|
mmsd->mysql=my;
|
|
|
|
|
WorkItem *item;
|
|
|
|
|
item=new WorkItem(mmsd,NULL);
|
|
|
|
|
GloMyMon->queue.add(item);
|
|
|
|
|
lst->remove(*it3);
|
|
|
|
|
}
|
|
|
|
|
/*
|
|
|
|
|
if (memcmp(my->net.buff,pollute_buf,8)) {
|
|
|
|
|
// the buffer is not polluted, it means it didn't match previously
|
|
|
|
|
while(!lst->empty()) {
|
|
|
|
|
my=lst->front();
|
|
|
|
|
lst->pop_front();
|
|
|
|
|
purge_lst->push_back(my);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
// try to keep maximum 2 free connections
|
|
|
|
|
// dropping all the others
|
|
|
|
|
while(lst->size() > 2) {
|
|
|
|
|
my=lst->front();
|
|
|
|
|
lst->pop_front();
|
|
|
|
|
purge_lst->push_back(my);
|
|
|
|
|
}
|
|
|
|
|
*/
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
my_connections.erase(it);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
pthread_mutex_unlock(&mutex);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
void MySQL_Monitor_Connection_Pool::purge_missing_servers(SQLite3_result *resultset) {
|
|
|
|
|
#define POLLUTE_LENGTH 8
|
|
|
|
|
char pollute_buf[POLLUTE_LENGTH];
|
|
|
|
|
@ -225,6 +275,7 @@ __purge_all:
|
|
|
|
|
}
|
|
|
|
|
delete purge_lst;
|
|
|
|
|
}
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
MYSQL * MySQL_Monitor_Connection_Pool::get_connection(char *hostname, int port) {
|
|
|
|
|
std::map<char *, std::list<MYSQL *>* , cmp_str >::iterator it;
|
|
|
|
|
@ -241,7 +292,7 @@ MYSQL * MySQL_Monitor_Connection_Pool::get_connection(char *hostname, int port)
|
|
|
|
|
lst->pop_front();
|
|
|
|
|
size--;
|
|
|
|
|
pthread_mutex_unlock(&mutex);
|
|
|
|
|
memset(ret->net.buff,0,8); // reset what was polluted
|
|
|
|
|
memset(ret->net.buff,0,sizeof(unsigned long long)); // reset what was polluted
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@ -254,6 +305,8 @@ void MySQL_Monitor_Connection_Pool::put_connection(char *hostname, int port, MYS
|
|
|
|
|
std::map<char *, std::list<MYSQL *>* , cmp_str >::iterator it;
|
|
|
|
|
char * buf=(char *)malloc(16+strlen(hostname));
|
|
|
|
|
sprintf(buf,"%s:%d",hostname,port);
|
|
|
|
|
unsigned long long now=monotonic_time();
|
|
|
|
|
memcpy(my->net.buff,&now,sizeof(unsigned long long)); //mark insert time
|
|
|
|
|
pthread_mutex_lock(&mutex);
|
|
|
|
|
it = my_connections.find(buf);
|
|
|
|
|
std::list<MYSQL *> *lst=NULL;
|
|
|
|
|
@ -835,7 +888,7 @@ void * MySQL_Monitor::monitor_connect() {
|
|
|
|
|
proxy_error("Error on %s : %s\n", query, error);
|
|
|
|
|
goto __end_monitor_connect_loop;
|
|
|
|
|
} else {
|
|
|
|
|
GloMyMon->My_Conn_Pool->purge_missing_servers(resultset);
|
|
|
|
|
//GloMyMon->My_Conn_Pool->purge_missing_servers(resultset);
|
|
|
|
|
if (resultset->rows_count==0) {
|
|
|
|
|
goto __end_monitor_connect_loop;
|
|
|
|
|
}
|
|
|
|
|
@ -1349,7 +1402,7 @@ void * MySQL_Monitor::run() {
|
|
|
|
|
if (GloMTH)
|
|
|
|
|
mysql_thr->refresh_variables();
|
|
|
|
|
//proxy_error("%s\n","MySQL_Monitor refreshing variables");
|
|
|
|
|
My_Conn_Pool->purge_missing_servers(NULL);
|
|
|
|
|
//My_Conn_Pool->purge_missing_servers(NULL);
|
|
|
|
|
}
|
|
|
|
|
usleep(500000);
|
|
|
|
|
int qsize=queue.size();
|
|
|
|
|
|