Rewriting connection pool in Monitor

A lot of debugging code.
Most of it should be removed!
pull/2025/head
René Cannaò 7 years ago
parent 483e7ae9da
commit 92d96815c6

@ -169,7 +169,7 @@ class MySQL_Monitor {
unsigned long long read_only_check_ERR;
unsigned long long replication_lag_check_OK;
unsigned long long replication_lag_check_ERR;
wqueue<WorkItem*> queue;
wqueue<WorkItem*> * queue = NULL;
MySQL_Monitor_Connection_Pool *My_Conn_Pool;
bool shutdown;
bool monitor_enabled;

@ -69,7 +69,7 @@ class ConsumerThread : public Thread {
if (thrn) {
// we took a NULL item that wasn't meant to reach here! Add it again
WorkItem *item=NULL;
GloMyMon->queue.add(item);
GloMyMon->queue->add(item);
}
// this is intentional to EXIT immediately
return NULL;
@ -133,17 +133,121 @@ static void close_mysql(MYSQL *my) {
mysql_close_no_command(my);
}
class MonMySrvC {
public:
char *address;
uint16_t port;
PtrArray *conns;
MonMySrvC(char *a, uint16_t p) {
address = strdup(a);
port = p;
conns = new PtrArray();
};
~MonMySrvC() {
free(address);
delete conns;
}
};
class MySQL_Monitor_Connection_Pool {
private:
std::mutex mutex;
std::map<std::pair<std::string, int>, std::vector<MYSQL*> > my_connections;
pthread_mutex_t m2;
PtrArray *conns;
// std::map<std::pair<std::string, int>, std::vector<MYSQL*> > my_connections;
PtrArray *servers;
public:
MYSQL * get_connection(char *hostname, int port);
void put_connection(char *hostname, int port, MYSQL *my);
void purge_idle_connections();
// void purge_idle_connections();
MySQL_Monitor_Connection_Pool() {
servers = new PtrArray();
conns = new PtrArray();
pthread_mutex_init(&m2, NULL);
};
void conn_register(MYSQL *my) {
std::lock_guard<std::mutex> lock(mutex);
pthread_mutex_lock(&m2);
for (unsigned int i=0; i<conns->len; i++) {
MYSQL *my1 = (MYSQL *)conns->index(i);
assert(my!=my1);
assert(my->net.fd!=my1->net.fd);
}
fprintf(stderr,"Registering MYSQL with FD %d\n", my->net.fd);
conns->add(my);
pthread_mutex_unlock(&m2);
};
void conn_unregister(MYSQL *my) {
std::lock_guard<std::mutex> lock(mutex);
pthread_mutex_lock(&m2);
for (unsigned int i=0; i<conns->len; i++) {
MYSQL *my1 = (MYSQL *)conns->index(i);
if (my1 == my) {
conns->remove_index_fast(i);
fprintf(stderr,"Un-registering MYSQL with FD %d\n", my->net.fd);
pthread_mutex_unlock(&m2);
return;
}
}
assert(0);
};
};
MYSQL * MySQL_Monitor_Connection_Pool::get_connection(char *hostname, int port) {
std::lock_guard<std::mutex> lock(mutex);
pthread_mutex_lock(&m2);
MYSQL *my = NULL;
for (unsigned int i=0; i<servers->len; i++) {
MonMySrvC *srv = (MonMySrvC *)servers->index(i);
if (srv->port == port && strcmp(hostname,srv->address)==0) {
if (srv->conns->len) {
for (unsigned int j=0; j<srv->conns->len; j++) {
MYSQL *my1 = (MYSQL *)srv->conns->index(j);
for (unsigned int k=0; k<srv->conns->len; k++) {
if (k!=j) {
MYSQL *my2 = (MYSQL *)srv->conns->index(k);
assert(my1!=my2);
assert(my1->net.fd!=my2->net.fd);
}
}
}
unsigned int idx = rand()%srv->conns->len;
my = (MYSQL *)srv->conns->remove_index_fast(idx);
for (unsigned int j=0; j<conns->len; j++) {
MYSQL *my1 = (MYSQL *)conns->index(j);
assert(my!=my1);
assert(my->net.fd!=my1->net.fd);
}
}
pthread_mutex_unlock(&m2);
return my;
}
}
pthread_mutex_unlock(&m2);
return my;
}
void MySQL_Monitor_Connection_Pool::put_connection(char *hostname, int port, MYSQL *my) {
unsigned long long now = monotonic_time();
std::lock_guard<std::mutex> lock(mutex);
pthread_mutex_lock(&m2);
*(unsigned long long*)my->net.buff = now;
for (unsigned int i=0; i<servers->len; i++) {
MonMySrvC *srv = (MonMySrvC *)servers->index(i);
if (srv->port == port && strcmp(hostname,srv->address)==0) {
srv->conns->add(my);
pthread_mutex_unlock(&m2);
return;
}
}
// if no server was found
MonMySrvC *srv = new MonMySrvC(hostname,port);
srv->conns->add(my);
servers->add(srv);
pthread_mutex_unlock(&m2);
}
/*
void MySQL_Monitor_Connection_Pool::purge_idle_connections() {
unsigned long long now = monotonic_time();
std::lock_guard<std::mutex> lock(mutex);
@ -171,8 +275,8 @@ void MySQL_Monitor_Connection_Pool::purge_idle_connections() {
}
}
}
*/
/*
MYSQL * MySQL_Monitor_Connection_Pool::get_connection(char *hostname, int port) {
std::lock_guard<std::mutex> lock(mutex);
auto it = my_connections.find(std::make_pair(hostname, port));
@ -200,7 +304,7 @@ void MySQL_Monitor_Connection_Pool::put_connection(char *hostname, int port, MYS
my_connections[std::make_pair(hostname,port)].push_back(my);
}
}
*/
MySQL_Monitor_State_Data::MySQL_Monitor_State_Data(char *h, int p, struct event_base *b, bool _use_ssl, int g) {
task_id=MON_CONNECT;
mysql=NULL;
@ -313,6 +417,8 @@ MySQL_Monitor::MySQL_Monitor() {
My_Conn_Pool=new MySQL_Monitor_Connection_Pool();
queue = new wqueue<WorkItem*>();
pthread_mutex_init(&group_replication_mutex,NULL);
Group_Replication_Hosts_resultset=NULL;
@ -429,7 +535,7 @@ void MySQL_Monitor::check_and_build_standard_tables(SQLite3DB *db, std::vector<t
};
void * monitor_connect_thread(void *arg) {
mysql_thread_init();
mysql_close(mysql_init(NULL));
MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg;
if (!GloMTH) return NULL; // quick exit during shutdown/restart
MySQL_Thread * mysql_thr = new MySQL_Thread();
@ -484,7 +590,7 @@ void * monitor_connect_thread(void *arg) {
}
void * monitor_ping_thread(void *arg) {
mysql_thread_init();
mysql_close(mysql_init(NULL));
MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg;
if (!GloMTH) return NULL; // quick exit during shutdown/restart
MySQL_Thread * mysql_thr = new MySQL_Thread();
@ -500,10 +606,15 @@ void * monitor_ping_thread(void *arg) {
if (mmsd->mysql==NULL) { // we don't have a connection, let's create it
bool rc;
rc=mmsd->create_new_connection();
if (mmsd->mysql) {
GloMyMon->My_Conn_Pool->conn_register(mmsd->mysql);
}
crc=true;
if (rc==false) {
goto __exit_monitor_ping_thread;
}
} else {
GloMyMon->My_Conn_Pool->conn_register(mmsd->mysql);
}
mmsd->t1=monotonic_time();
@ -529,6 +640,7 @@ void * monitor_ping_thread(void *arg) {
} else {
if (crc==false) {
GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql);
GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql);
mmsd->mysql=NULL;
}
}
@ -563,17 +675,21 @@ __fast_exit_monitor_ping_thread:
// if we reached here we didn't put the connection back
if (mmsd->mysql_error_msg) {
mysql_close(mmsd->mysql); // if we reached here we should destroy it
GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql);
mmsd->mysql=NULL;
} else {
if (crc) {
bool rc=mmsd->set_wait_timeout();
if (rc) {
GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql);
GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql);
} else {
GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql);
mysql_close(mmsd->mysql); // set_wait_timeout failed
}
mmsd->mysql=NULL;
} else { // really not sure how we reached here, drop it
GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql);
mysql_close(mmsd->mysql);
mmsd->mysql=NULL;
}
@ -642,6 +758,8 @@ bool MySQL_Monitor_State_Data::create_new_connection() {
}
if (myrc==NULL) {
mysql_error_msg=strdup(mysql_error(mysql));
mysql_close(mysql);
mysql = NULL;
return false;
} else {
// mariadb client library disables NONBLOCK for SSL connections ... re-enable it!
@ -658,7 +776,7 @@ bool MySQL_Monitor_State_Data::create_new_connection() {
}
void * monitor_read_only_thread(void *arg) {
mysql_thread_init();
mysql_close(mysql_init(NULL));
bool timeout_reached = false;
MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg;
if (!GloMTH) return NULL; // quick exit during shutdown/restart
@ -887,7 +1005,7 @@ __fast_exit_monitor_read_only_thread:
}
void * monitor_group_replication_thread(void *arg) {
mysql_thread_init();
mysql_close(mysql_init(NULL));
MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg;
MySQL_Thread * mysql_thr = new MySQL_Thread();
mysql_thr->curtime=monotonic_time();
@ -1152,7 +1270,7 @@ __fast_exit_monitor_group_replication_thread:
}
void * monitor_galera_thread(void *arg) {
mysql_thread_init();
mysql_close(mysql_init(NULL));
MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg;
MySQL_Thread * mysql_thr = new MySQL_Thread();
mysql_thr->curtime=monotonic_time();
@ -1410,13 +1528,17 @@ __fast_exit_monitor_galera_thread:
}
void * monitor_replication_lag_thread(void *arg) {
mysql_thread_init();
mysql_close(mysql_init(NULL));
MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg;
if (!GloMTH) return NULL; // quick exit during shutdown/restart
MySQL_Thread * mysql_thr = new MySQL_Thread();
mysql_thr->curtime=monotonic_time();
mysql_thr->refresh_variables();
#ifdef DEBUG
MYSQL *mysqlcopy = NULL;
#endif // DEBUG
mmsd->mysql=GloMyMon->My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port);
unsigned long long start_time=mysql_thr->curtime;
@ -1431,12 +1553,21 @@ void * monitor_replication_lag_thread(void *arg) {
if (mmsd->mysql==NULL) { // we don't have a connection, let's create it
bool rc;
rc=mmsd->create_new_connection();
if (mmsd->mysql) {
GloMyMon->My_Conn_Pool->conn_register(mmsd->mysql);
}
crc=true;
if (rc==false) {
goto __fast_exit_monitor_replication_lag_thread;
}
} else {
GloMyMon->My_Conn_Pool->conn_register(mmsd->mysql);
}
#ifdef DEBUG
mysqlcopy = mmsd->mysql;
#endif // DEBUG
mmsd->t1=monotonic_time();
mmsd->interr=0; // reset the value
if (percona_heartbeat_table) {
@ -1491,11 +1622,13 @@ void * monitor_replication_lag_thread(void *arg) {
if (mmsd->interr) { // replication lag check failed
mmsd->mysql_error_msg=strdup(mysql_error(mmsd->mysql));
if (mmsd->mysql) {
GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql);
mysql_close(mmsd->mysql);
mmsd->mysql=NULL;
}
} else {
if (crc==false) {
GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql);
GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql);
mmsd->mysql=NULL;
}
@ -1577,12 +1710,14 @@ __exit_monitor_replication_lag_thread:
}
if (mmsd->interr) { // check failed
if (mmsd->mysql) {
GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql);
mysql_close(mmsd->mysql);
mmsd->mysql=NULL;
}
} else {
if (mmsd->mysql) {
GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql);
GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql);
mmsd->mysql=NULL;
}
}
@ -1597,11 +1732,14 @@ __fast_exit_monitor_replication_lag_thread:
bool rc=mmsd->set_wait_timeout();
if (rc) {
GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql);
GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql);
} else {
GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql);
mysql_close(mmsd->mysql); // set_wait_timeout failed
}
mmsd->mysql=NULL;
} else { // really not sure how we reached here, drop it
GloMyMon->My_Conn_Pool->conn_unregister(mmsd->mysql);
mysql_close(mmsd->mysql);
mmsd->mysql=NULL;
}
@ -1685,7 +1823,7 @@ void * MySQL_Monitor::monitor_connect() {
mmsd->mondb=monitordb;
WorkItem* item;
item=new WorkItem(mmsd,monitor_connect_thread);
GloMyMon->queue.add(item);
GloMyMon->queue->add(item);
usleep(us);
}
if (GloMyMon->shutdown) return NULL;
@ -1733,7 +1871,7 @@ __sleep_monitor_connect_loop:
}
for (unsigned int i=0;i<num_threads; i++) {
WorkItem *item=NULL;
GloMyMon->queue.add(item);
GloMyMon->queue->add(item);
}
return NULL;
}
@ -1804,7 +1942,7 @@ void * MySQL_Monitor::monitor_ping() {
mmsd->mondb=monitordb;
WorkItem* item;
item=new WorkItem(mmsd,monitor_ping_thread);
GloMyMon->queue.add(item);
GloMyMon->queue->add(item);
usleep(us);
if (GloMyMon->shutdown) return NULL;
}
@ -1971,7 +2109,7 @@ __sleep_monitor_ping_loop:
}
for (unsigned int i=0;i<num_threads; i++) {
WorkItem *item=NULL;
GloMyMon->queue.add(item);
GloMyMon->queue->add(item);
}
return NULL;
}
@ -2085,7 +2223,7 @@ void * MySQL_Monitor::monitor_read_only() {
mmsd->mondb=monitordb;
WorkItem* item;
item=new WorkItem(mmsd,monitor_read_only_thread);
GloMyMon->queue.add(item);
GloMyMon->queue->add(item);
usleep(us);
}
if (GloMyMon->shutdown) return NULL;
@ -2133,7 +2271,7 @@ __sleep_monitor_read_only:
}
for (unsigned int i=0;i<num_threads; i++) {
WorkItem *item=NULL;
GloMyMon->queue.add(item);
GloMyMon->queue->add(item);
}
return NULL;
}
@ -2205,7 +2343,7 @@ void * MySQL_Monitor::monitor_group_replication() {
mmsd->mondb=monitordb;
WorkItem* item;
item=new WorkItem(mmsd,monitor_group_replication_thread);
GloMyMon->queue.add(item);
GloMyMon->queue->add(item);
usleep(us);
}
if (GloMyMon->shutdown) {
@ -2262,7 +2400,7 @@ __sleep_monitor_group_replication:
}
for (unsigned int i=0;i<num_threads; i++) {
WorkItem *item=NULL;
GloMyMon->queue.add(item);
GloMyMon->queue->add(item);
}
return NULL;
}
@ -2321,7 +2459,7 @@ void * MySQL_Monitor::monitor_galera() {
mmsd->mondb=monitordb;
WorkItem* item;
item=new WorkItem(mmsd,monitor_galera_thread);
GloMyMon->queue.add(item);
GloMyMon->queue->add(item);
usleep(us);
}
if (GloMyMon->shutdown) {
@ -2354,7 +2492,7 @@ __sleep_monitor_galera:
}
for (unsigned int i=0;i<num_threads; i++) {
WorkItem *item=NULL;
GloMyMon->queue.add(item);
GloMyMon->queue->add(item);
}
return NULL;
}
@ -2417,7 +2555,7 @@ void * MySQL_Monitor::monitor_replication_lag() {
mmsd->mondb=monitordb;
WorkItem* item;
item=new WorkItem(mmsd,monitor_replication_lag_thread);
GloMyMon->queue.add(item);
GloMyMon->queue->add(item);
usleep(us);
}
if (GloMyMon->shutdown) return NULL;
@ -2465,7 +2603,7 @@ __sleep_monitor_replication_lag:
}
for (unsigned int i=0;i<num_threads; i++) {
WorkItem *item=NULL;
GloMyMon->queue.add(item);
GloMyMon->queue->add(item);
}
return NULL;
}
@ -2484,8 +2622,8 @@ void * MySQL_Monitor::run() {
mysql_thr->refresh_variables();
//if (!GloMTH) return NULL; // quick exit during shutdown/restart
__monitor_run:
while (queue.size()) { // this is a clean up in case Monitor was restarted
WorkItem* item = (WorkItem*)queue.remove();
while (queue->size()) { // this is a clean up in case Monitor was restarted
WorkItem* item = (WorkItem*)queue->remove();
if (item) {
if (item->mmsd) {
delete item->mmsd;
@ -2495,7 +2633,7 @@ __monitor_run:
}
ConsumerThread **threads= (ConsumerThread **)malloc(sizeof(ConsumerThread *)*num_threads);
for (unsigned int i=0;i<num_threads; i++) {
threads[i] = new ConsumerThread(queue, 0);
threads[i] = new ConsumerThread(*queue, 0);
threads[i]->start(2048,false);
}
started_threads += num_threads;
@ -2546,7 +2684,7 @@ __monitor_run:
threads= (ConsumerThread **)realloc(threads, sizeof(ConsumerThread *)*num_threads);
started_threads += (num_threads - old_num_threads);
for (unsigned int i = old_num_threads ; i < num_threads ; i++) {
threads[i] = new ConsumerThread(queue, 0);
threads[i] = new ConsumerThread(*queue, 0);
threads[i]->start(2048,false);
}
}
@ -2554,10 +2692,10 @@ __monitor_run:
}
monitor_enabled=mysql_thread___monitor_enabled;
if ( rand()%5 == 0) { // purge once in a while
My_Conn_Pool->purge_idle_connections();
//My_Conn_Pool->purge_idle_connections();
}
usleep(200000);
int qsize=queue.size();
int qsize=queue->size();
if (qsize > mysql_thread___monitor_threads_queue_maxsize/4) {
proxy_warning("Monitor queue too big: %d\n", qsize);
unsigned int threads_max = (unsigned int)mysql_thread___monitor_threads_max;
@ -2572,14 +2710,14 @@ __monitor_run:
threads= (ConsumerThread **)realloc(threads, sizeof(ConsumerThread *)*num_threads);
started_threads += new_threads;
for (unsigned int i = old_num_threads ; i < num_threads ; i++) {
threads[i] = new ConsumerThread(queue, 0);
threads[i] = new ConsumerThread(*queue, 0);
threads[i]->start(2048,false);
}
}
}
// check again. Do we need also aux threads?
usleep(50000);
qsize=queue.size();
qsize=queue->size();
if (qsize > mysql_thread___monitor_threads_queue_maxsize) {
qsize=qsize/50;
unsigned int threads_max = (unsigned int)mysql_thread___monitor_threads_max;
@ -2592,7 +2730,7 @@ __monitor_run:
aux_threads = qsize;
started_threads += aux_threads;
for (int i=0; i<qsize; i++) {
threads_aux[i] = new ConsumerThread(queue, 245);
threads_aux[i] = new ConsumerThread(*queue, 245);
threads_aux[i]->start(2048,false);
}
for (int i=0; i<qsize; i++) {
@ -2607,7 +2745,7 @@ __monitor_run:
}
for (unsigned int i=0;i<num_threads; i++) {
WorkItem *item=NULL;
GloMyMon->queue.add(item);
GloMyMon->queue->add(item);
}
for (unsigned int i=0;i<num_threads; i++) {
threads[i]->join();

Loading…
Cancel
Save