From 159176627debaa59218e2552ac733f56849d7823 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Wed, 9 Nov 2016 08:16:01 +0000 Subject: [PATCH 1/2] Thread to reset connections --- include/MySQL_HostGroups_Manager.h | 7 ++ include/MySQL_Thread.h | 6 +- include/mysql_connection.h | 1 + lib/MySQL_HostGroups_Manager.cpp | 108 ++++++++++++++++++++++++++--- lib/mysql_connection.cpp | 7 ++ 5 files changed, 117 insertions(+), 12 deletions(-) diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index 9da9d6684..86b71d394 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -3,6 +3,10 @@ #include "proxysql.h" #include "cpp.h" +#include + +#include "thread.h" +#include "wqueue.h" /* Enabling STRESSTEST_POOL ProxySQL will do a lot of loops in the connection pool @@ -139,6 +143,8 @@ class MySQL_HostGroups_Manager { void generate_mysql_replication_hostgroups_table(); SQLite3_result *incoming_replication_hostgroups; + std::thread *HGCU_thread; + public: struct { unsigned int servers_table_version; @@ -160,6 +166,7 @@ class MySQL_HostGroups_Manager { unsigned long long commit_cnt_filtered; unsigned long long rollback_cnt_filtered; } status; + wqueue queue; MySQL_HostGroups_Manager(); ~MySQL_HostGroups_Manager(); // void rdlock(); diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index aebf876a1..7b5ab51be 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -280,6 +280,9 @@ class MySQL_Threads_Handler size_t stacksize; pthread_attr_t attr; rwlock_t rwlock; + PtrArray *bind_fds; + MySQL_Listeners_Manager *MLM; + public: struct { int monitor_history; int monitor_connect_interval; @@ -356,9 +359,6 @@ class MySQL_Threads_Handler char * ssl_p2s_cipher; int query_cache_size_MB; } variables; - PtrArray *bind_fds; - MySQL_Listeners_Manager *MLM; - public: unsigned int num_threads; proxysql_mysql_thread_t *mysql_threads; proxysql_mysql_thread_t *mysql_threads_idles; diff --git a/include/mysql_connection.h b/include/mysql_connection.h index 1ad8d0c4c..b94584130 100644 --- a/include/mysql_connection.h +++ b/include/mysql_connection.h @@ -154,5 +154,6 @@ class MySQL_Connection { void set_is_client(); // used for local_stmts + void reset(); }; #endif /* __CLASS_MYSQL_CONNECTION_H */ diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 0647a449c..2ca49352b 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -4,6 +4,10 @@ #define char_malloc (char *)malloc #define itostr(__s, __i) { __s=char_malloc(32); sprintf(__s, "%lld", __i); } +#include "thread.h" +#include "wqueue.h" + + //#define MYHGM_MYSQL_SERVERS "CREATE TABLE mysql_servers ( hostgroup_id INT NOT NULL DEFAULT 0, hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306, weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , status INT CHECK (status IN (0, 1, 2, 3)) NOT NULL DEFAULT 0, PRIMARY KEY (hostgroup_id, hostname, port) )" @@ -19,6 +23,76 @@ class MySrvList; class MyHGC; + +static void * HGCU_thread_run() { + PtrArray *conn_array=new PtrArray(); + while(1) { + MySQL_Connection *myconn=(MySQL_Connection *)MyHGM->queue.remove(); + if (myconn==NULL) { + // intentionally exit immediately + return NULL; + } + conn_array->add(myconn); + while (MyHGM->queue.size()) { + myconn=(MySQL_Connection *)MyHGM->queue.remove(); + if (myconn==NULL) return NULL; + conn_array->add(myconn); + } + unsigned int l=conn_array->len; + int *errs=(int *)malloc(sizeof(int)*l); + int *statuses=(int *)malloc(sizeof(int)*l); + my_bool *ret=(my_bool *)malloc(sizeof(my_bool)*l); + int i; + for (i=0;i<(int)l;i++) { + myconn=(MySQL_Connection *)conn_array->index(i); + statuses[i]=mysql_change_user_start(&ret[i], myconn->mysql, myconn->userinfo->password, myconn->userinfo->password, myconn->userinfo->schemaname); + } + for (i=0;i<(int)conn_array->len;i++) { + if (statuses[i]==0) { + myconn=(MySQL_Connection *)conn_array->remove_index_fast(i); + if (!ret[i]) { + myconn->reset(); + MyHGM->push_MyConn_to_pool(myconn); + } else { + myconn->send_quit=false; + MyHGM->destroy_MyConn_from_pool(myconn); + } + i--; + } + } + unsigned long long now=monotonic_time(); + while (conn_array->len && ((monotonic_time() - now) < 1000000)) { + usleep(50); + for (i=0;i<(int)conn_array->len;i++) { + myconn=(MySQL_Connection *)conn_array->index(i); + statuses[i]=mysql_change_user_cont(&ret[i], myconn->mysql, statuses[i]); + } + for (i=0;i<(int)conn_array->len;i++) { + if (statuses[i]==0) { + myconn=(MySQL_Connection *)conn_array->remove_index_fast(i); + if (!ret[i]) { + myconn->reset(); + MyHGM->push_MyConn_to_pool(myconn); + } else { + myconn->send_quit=false; + MyHGM->destroy_MyConn_from_pool(myconn); + } + i--; + } + } + } + while (conn_array->len) { + // we reached here, and there are still connections + myconn=(MySQL_Connection *)conn_array->remove_index_fast(0); + myconn->send_quit=false; + MyHGM->destroy_MyConn_from_pool(myconn); + } + free(statuses); + free(errs); + } +} + + MySQL_Connection *MySrvConnList::index(unsigned int _k) { return (MySQL_Connection *)conns->index(_k); } @@ -293,6 +367,7 @@ MySQL_HostGroups_Manager::MySQL_HostGroups_Manager() { mydb->execute(MYHGM_MYSQL_REPLICATION_HOSTGROUPS); MyHostGroups=new PtrArray(); incoming_replication_hostgroups=NULL; + HGCU_thread = new std::thread(&HGCU_thread_run); } MySQL_HostGroups_Manager::~MySQL_HostGroups_Manager() { @@ -305,6 +380,8 @@ MySQL_HostGroups_Manager::~MySQL_HostGroups_Manager() { if (admindb) { delete admindb; } + queue.add(NULL); + HGCU_thread->join(); #ifdef MHM_PTHREAD_MUTEX pthread_mutex_destroy(&lock); #endif @@ -661,14 +738,16 @@ void MySQL_HostGroups_Manager::push_MyConn_to_pool(MySQL_Connection *c, bool _lo mysrvc=(MySrvC *)c->parent; proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL_Connection %p, server %s:%d with status %d\n", c, mysrvc->address, mysrvc->port, mysrvc->status); mysrvc->ConnectionsUsed->remove(c); - if (c->largest_query_length > (unsigned int)mysql_thread___threshold_query_length) { + //if (c->largest_query_length > (unsigned int)mysql_thread___threshold_query_length) { + if (c->largest_query_length > (unsigned int)GloMTH->variables.threshold_query_length) { proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Destroying MySQL_Connection %p, server %s:%d with status %d . largest_query_length = %lu\n", c, mysrvc->address, mysrvc->port, mysrvc->status, c->largest_query_length); delete c; goto __exit_push_MyConn_to_pool; } if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE) { if (c->async_state_machine==ASYNC_IDLE) { - if (c->local_stmts->get_num_entries() > mysql_thread___max_stmts_per_connection) { + //if (c->local_stmts->get_num_entries() > mysql_thread___max_stmts_per_connection) { + if (c->local_stmts->get_num_entries() > (unsigned int)GloMTH->variables.max_stmts_per_connection) { proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Destroying MySQL_Connection %p, server %s:%d with status %d because has too many prepared statements\n", c, mysrvc->address, mysrvc->port, mysrvc->status); delete c; } else { @@ -897,14 +976,25 @@ MySQL_Connection * MySQL_HostGroups_Manager::get_MyConn_from_pool(unsigned int _ } void MySQL_HostGroups_Manager::destroy_MyConn_from_pool(MySQL_Connection *c) { - wrlock(); + bool to_del=true; // the default, legacy behavior MySrvC *mysrvc=(MySrvC *)c->parent; - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Destroying MySQL_Connection %p, server %s:%d\n", c, mysrvc->address, mysrvc->port); - mysrvc->ConnectionsUsed->remove(c); - status.myconnpoll_destroy++; - //status.server_connections_connected--; - wrunlock(); - delete c; + if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE && c->send_quit) { + // overall, the backend seems healthy and so it is the connection. Try to reset it + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Trying to reset MySQL_Connection %p, server %s:%d\n", c, mysrvc->address, mysrvc->port); + to_del=false; + c->userinfo->set(mysql_thread___monitor_username,mysql_thread___monitor_password,mysql_thread___default_schema,NULL); + queue.add(c); + } else { + // we lock only this part of the code because we need to remove the connection from ConnectionsUsed + wrlock(); + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Destroying MySQL_Connection %p, server %s:%d\n", c, mysrvc->address, mysrvc->port); + mysrvc->ConnectionsUsed->remove(c); + status.myconnpoll_destroy++; + wrunlock(); + } + if (to_del) { + delete c; + } } diff --git a/lib/mysql_connection.cpp b/lib/mysql_connection.cpp index b145a057f..153fcd912 100644 --- a/lib/mysql_connection.cpp +++ b/lib/mysql_connection.cpp @@ -1587,3 +1587,10 @@ int MySQL_Connection::async_send_simple_command(short event, char *stmt, unsigne } return 1; } + +void MySQL_Connection::reset() { + status_flags=0; + reusable=true; + delete local_stmts; + local_stmts=new MySQL_STMTs_local(false); +} From 226f71d4c3c6c49e9f34e07632f6762b4aee096d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Wed, 9 Nov 2016 19:28:41 +0000 Subject: [PATCH 2/2] Connections are sent to purge thread if queue<100 --- lib/MySQL_HostGroups_Manager.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 2ca49352b..b9d61bd7e 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -978,7 +978,7 @@ MySQL_Connection * MySQL_HostGroups_Manager::get_MyConn_from_pool(unsigned int _ void MySQL_HostGroups_Manager::destroy_MyConn_from_pool(MySQL_Connection *c) { bool to_del=true; // the default, legacy behavior MySrvC *mysrvc=(MySrvC *)c->parent; - if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE && c->send_quit) { + if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE && c->send_quit && queue.size() < 100) { // overall, the backend seems healthy and so it is the connection. Try to reset it proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Trying to reset MySQL_Connection %p, server %s:%d\n", c, mysrvc->address, mysrvc->port); to_del=false;