From c3a435e2efff8635564c7c807e2f2f02b3c59542 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Fri, 17 Feb 2017 10:40:50 +0000 Subject: [PATCH] Make idle threads optional #904 --- include/MySQL_Thread.h | 12 ++ include/proxy_defines.h | 2 + include/proxysql_glovars.hpp | 4 + lib/MySQL_Thread.cpp | 343 +++++++++++++++++++++++++---------- lib/ProxySQL_GloVars.cpp | 14 +- src/main.cpp | 16 +- 6 files changed, 297 insertions(+), 94 deletions(-) diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index f450bab39..26993dfef 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -22,12 +22,14 @@ static unsigned int near_pow_2 (unsigned int n) { return i ? i : n; } +#ifdef IDLE_THREADS typedef struct __attribute__((aligned(CACHE_LINE_SIZE))) _conn_exchange_t { pthread_mutex_t mutex_idles; PtrArray *idle_mysql_sessions; pthread_mutex_t mutex_resumes; PtrArray *resume_mysql_sessions; } conn_exchange_t; +#endif // IDLE_THREADS class ProxySQL_Poll { @@ -152,10 +154,12 @@ class MySQL_Thread PtrArray *cached_connections; +#ifdef IDLE_THREADS struct epoll_event events[MY_EPOLL_THREAD_MAXEVENTS]; int efd; unsigned int mysess_idx; std::map sessmap; +#endif // IDLE_THREADS protected: int nfds; @@ -168,10 +172,12 @@ class MySQL_Thread unsigned long long pre_poll_time; unsigned long long last_maintenance_time; PtrArray *mysql_sessions; +#ifdef IDLE_THREADS PtrArray *idle_mysql_sessions; PtrArray *resume_mysql_sessions; conn_exchange_t myexchange; +#endif // IDLE_THREADS int pipefd[2]; int shutdown; @@ -301,8 +307,10 @@ class MySQL_Threads_Handler int connect_timeout_server; int connect_timeout_server_max; int free_connections_pct; +#ifdef IDLE_THREADS int session_idle_ms; bool session_idle_show_processlist; +#endif // IDLE_THREADS bool sessions_sort; char *default_schema; char *interfaces; @@ -356,7 +364,9 @@ class MySQL_Threads_Handler } variables; unsigned int num_threads; proxysql_mysql_thread_t *mysql_threads; +#ifdef IDLE_THREADS proxysql_mysql_thread_t *mysql_threads_idles; +#endif // IDLE_THREADS unsigned int get_global_version(); void wrlock(); void wrunlock(); @@ -395,7 +405,9 @@ class MySQL_Threads_Handler unsigned long long get_queries_backends_bytes_recv(); unsigned long long get_queries_backends_bytes_sent(); unsigned int get_active_transations(); +#ifdef IDLE_THREADS unsigned int get_non_idle_client_connections(); +#endif // IDLE_THREADS unsigned long long get_query_processor_time(); unsigned long long get_backend_query_time(); unsigned long long get_mysql_backend_buffers_bytes(); diff --git a/include/proxy_defines.h b/include/proxy_defines.h index 20b0bfa1c..39f1c38f3 100644 --- a/include/proxy_defines.h +++ b/include/proxy_defines.h @@ -2,3 +2,5 @@ // If defined then active pthread mutex in ProxySQL_Admin else use the wrlock #define PA_PTHREAD_MUTEX +// If enabled, it adds support for auxiliary threads +#define IDLE_THREADS diff --git a/include/proxysql_glovars.hpp b/include/proxysql_glovars.hpp index cbbd046a5..8dc1b2428 100644 --- a/include/proxysql_glovars.hpp +++ b/include/proxysql_glovars.hpp @@ -2,6 +2,7 @@ #define __CLASS_PROXYSQL_GLOVARS_H #include "configfile.hpp" +#include "proxy_defines.h" namespace ez { class ezOptionParser; @@ -33,6 +34,9 @@ class ProxySQL_GlobalVariables { #ifdef SO_REUSEPORT bool reuseport; #endif /* SO_REUSEPORT */ +#ifdef IDLE_THREADS + bool idle_threads; +#endif /* IDLE_THREADS */ pthread_mutex_t start_mutex; bool foreground; #ifdef DEBUG diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 927abfb05..bd8e91007 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -204,7 +204,9 @@ static char * mysql_thread_variables_names[]= { (char *)"eventslog_filesize", (char *)"default_charset", (char *)"free_connections_pct", +#ifdef IDLE_THREADS (char *)"session_idle_ms", +#endif // IDLE_THREADS (char *)"have_compress", (char *)"client_found_rows", (char *)"interfaces", @@ -254,7 +256,9 @@ static char * mysql_thread_variables_names[]= { (char *)"server_capabilities", (char *)"server_version", (char *)"sessions_sort", +#ifdef IDLE_THREADS (char *)"session_idle_show_processlist", +#endif // IDLE_THREADS (char *)"commands_stats", (char *)"query_digests", (char *)"query_digests_lowercase", @@ -284,7 +288,9 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { } num_threads=0; mysql_threads=NULL; +#ifdef IDLE_THREADS mysql_threads_idles=NULL; +#endif // IDLE_THREADS stacksize=0; shutdown_=0; spinlock_rwlock_init(&rwlock); @@ -297,7 +303,6 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.connect_timeout_server=1000; variables.connect_timeout_server_max=10000; variables.free_connections_pct=10; - variables.session_idle_ms=1000; variables.connect_retries_delay=1; variables.monitor_enabled=true; variables.monitor_history=600000; @@ -357,7 +362,10 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.query_digests=true; variables.query_digests_lowercase=false; variables.sessions_sort=true; +#ifdef IDLE_THREADS + variables.session_idle_ms=1000; variables.session_idle_show_processlist=false; +#endif // IDLE_THREADS variables.servers_stats=true; variables.default_reconnect=true; variables.ssl_p2s_ca=NULL; @@ -568,7 +576,9 @@ int MySQL_Threads_Handler::get_variable_int(char *name) { if (!strcasecmp(name,"long_query_time")) return (int)variables.long_query_time; if (!strcasecmp(name,"query_cache_size_MB")) return (int)variables.query_cache_size_MB; if (!strcasecmp(name,"free_connections_pct")) return (int)variables.free_connections_pct; +#ifdef IDLE_THREADS if (!strcasecmp(name,"session_idle_ms")) return (int)variables.session_idle_ms; +#endif // IDLE_THREADS if (!strcasecmp(name,"ping_interval_server_msec")) return (int)variables.ping_interval_server_msec; if (!strcasecmp(name,"ping_timeout_server")) return (int)variables.ping_timeout_server; if (!strcasecmp(name,"have_compress")) return (int)variables.have_compress; @@ -580,7 +590,9 @@ int MySQL_Threads_Handler::get_variable_int(char *name) { if (!strcasecmp(name,"query_digests")) return (int)variables.query_digests; if (!strcasecmp(name,"query_digests_lowercase")) return (int)variables.query_digests_lowercase; if (!strcasecmp(name,"sessions_sort")) return (int)variables.sessions_sort; +#ifdef IDLE_THREADS if (!strcasecmp(name,"session_idle_show_processlist")) return (int)variables.session_idle_show_processlist; +#endif // IDLE_THREADS if (!strcasecmp(name,"servers_stats")) return (int)variables.servers_stats; if (!strcasecmp(name,"default_reconnect")) return (int)variables.default_reconnect; if (!strcasecmp(name,"poll_timeout")) return variables.poll_timeout; @@ -755,10 +767,12 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f sprintf(intbuf,"%d",variables.free_connections_pct); return strdup(intbuf); } +#ifdef IDLE_THREADS if (!strcasecmp(name,"session_idle_ms")) { sprintf(intbuf,"%d",variables.session_idle_ms); return strdup(intbuf); } +#endif // IDLE_THREADS if (!strcasecmp(name,"connect_retries_delay")) { sprintf(intbuf,"%d",variables.connect_retries_delay); return strdup(intbuf); @@ -895,9 +909,11 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f if (!strcasecmp(name,"sessions_sort")) { return strdup((variables.sessions_sort ? "true" : "false")); } +#ifdef IDLE_THREADS if (!strcasecmp(name,"session_idle_show_processlist")) { return strdup((variables.session_idle_show_processlist ? "true" : "false")); } +#endif // IDLE_THREADS if (!strcasecmp(name,"servers_stats")) { return strdup((variables.servers_stats ? "true" : "false")); } @@ -1158,6 +1174,7 @@ bool MySQL_Threads_Handler::set_variable(char *name, char *value) { // this is t return false; } } +#ifdef IDLE_THREADS if (!strcasecmp(name,"session_idle_ms")) { int intv=atoi(value); if (intv >= 100 && intv <= 3600*1000) { @@ -1167,6 +1184,7 @@ bool MySQL_Threads_Handler::set_variable(char *name, char *value) { // this is t return false; } } +#endif // IDLE_THREADS if (!strcasecmp(name,"max_connections")) { int intv=atoi(value); if (intv >= 1 && intv <= 1000*1000) { @@ -1634,6 +1652,7 @@ bool MySQL_Threads_Handler::set_variable(char *name, char *value) { // this is t } return false; } +#ifdef IDLE_THREADS if (!strcasecmp(name,"session_idle_show_processlist")) { if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) { variables.session_idle_show_processlist=true; @@ -1645,6 +1664,7 @@ bool MySQL_Threads_Handler::set_variable(char *name, char *value) { // this is t } return false; } +#endif // IDLE_THREADS if (!strcasecmp(name,"sessions_sort")) { if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) { variables.sessions_sort=true; @@ -1721,14 +1741,18 @@ void MySQL_Threads_Handler::init(unsigned int num, size_t stack) { int rc=pthread_attr_setstacksize(&attr, stacksize); assert(rc==0); mysql_threads=(proxysql_mysql_thread_t *)malloc(sizeof(proxysql_mysql_thread_t)*num_threads); +#ifdef IDLE_THREADS mysql_threads_idles=(proxysql_mysql_thread_t *)malloc(sizeof(proxysql_mysql_thread_t)*num_threads); +#endif // IDLE_THREADS } proxysql_mysql_thread_t * MySQL_Threads_Handler::create_thread(unsigned int tn, void *(*start_routine) (void *), bool idles) { if (idles==false) { pthread_create(&mysql_threads[tn].thread_id, &attr, start_routine , &mysql_threads[tn]); +#ifdef IDLE_THREADS } else { pthread_create(&mysql_threads_idles[tn].thread_id, &attr, start_routine , &mysql_threads_idles[tn]); +#endif // IDLE_THREADS } return NULL; } @@ -1741,16 +1765,24 @@ void MySQL_Threads_Handler::shutdown_threads() { if (mysql_threads[i].worker) mysql_threads[i].worker->shutdown=1; } - for (i=0; ishutdown=1; +#ifdef IDLE_THREADS + if (GloVars.global.idle_threads) { + for (i=0; ishutdown=1; + } } +#endif /* IDLE_THREADS */ signal_all_threads(1); for (i=0; ilen) { - MySQL_Session *sess=(MySQL_Session *)idle_mysql_sessions->remove_index_fast(0); - delete sess; - } - delete idle_mysql_sessions; - } +#ifdef IDLE_THREADS + if (GloVars.global.idle_threads) { + if (idle_mysql_sessions) { + while(idle_mysql_sessions->len) { + MySQL_Session *sess=(MySQL_Session *)idle_mysql_sessions->remove_index_fast(0); + delete sess; + } + delete idle_mysql_sessions; + } - if (resume_mysql_sessions) { - while(resume_mysql_sessions->len) { - MySQL_Session *sess=(MySQL_Session *)resume_mysql_sessions->remove_index_fast(0); - delete sess; - } - delete resume_mysql_sessions; - } + if (resume_mysql_sessions) { + while(resume_mysql_sessions->len) { + MySQL_Session *sess=(MySQL_Session *)resume_mysql_sessions->remove_index_fast(0); + delete sess; + } + delete resume_mysql_sessions; + } - if (myexchange.idle_mysql_sessions) { - while(myexchange.idle_mysql_sessions->len) { - MySQL_Session *sess=(MySQL_Session *)myexchange.idle_mysql_sessions->remove_index_fast(0); - delete sess; - } - delete myexchange.idle_mysql_sessions; - } + if (myexchange.idle_mysql_sessions) { + while(myexchange.idle_mysql_sessions->len) { + MySQL_Session *sess=(MySQL_Session *)myexchange.idle_mysql_sessions->remove_index_fast(0); + delete sess; + } + delete myexchange.idle_mysql_sessions; + } - if (myexchange.resume_mysql_sessions) { - while(myexchange.resume_mysql_sessions->len) { - MySQL_Session *sess=(MySQL_Session *)myexchange.resume_mysql_sessions->remove_index_fast(0); - delete sess; - } - delete myexchange.resume_mysql_sessions; + if (myexchange.resume_mysql_sessions) { + while(myexchange.resume_mysql_sessions->len) { + MySQL_Session *sess=(MySQL_Session *)myexchange.resume_mysql_sessions->remove_index_fast(0); + delete sess; + } + delete myexchange.resume_mysql_sessions; + } } +#endif // IDLE_THREADS if (cached_connections) { while(cached_connections->len) { @@ -1906,18 +1946,21 @@ MySQL_Session * MySQL_Thread::create_new_session_and_client_data_stream(int _fd) bool MySQL_Thread::init() { int i; mysql_sessions = new PtrArray(); + cached_connections = new PtrArray(); + assert(mysql_sessions); + +#ifdef IDLE_THREADS idle_mysql_sessions = new PtrArray(); resume_mysql_sessions = new PtrArray(); - cached_connections = new PtrArray(); myexchange.idle_mysql_sessions = new PtrArray(); myexchange.resume_mysql_sessions = new PtrArray(); pthread_mutex_init(&myexchange.mutex_idles,NULL); pthread_mutex_init(&myexchange.mutex_resumes,NULL); - - assert(mysql_sessions); assert(idle_mysql_sessions); assert(resume_mysql_sessions); +#endif // IDLE_THREADS + shutdown=0; my_idle_conns=(MySQL_Connection **)malloc(sizeof(MySQL_Connection *)*SESSIONS_FOR_CONNECTIONS_HANDLER); memset(my_idle_conns,0,sizeof(MySQL_Connection *)*SESSIONS_FOR_CONNECTIONS_HANDLER); @@ -1976,6 +2019,7 @@ void MySQL_Thread::run() { unsigned int n; int rc; +#ifdef IDLE_THREADS bool idle_maintenance_thread=epoll_thread; if (idle_maintenance_thread) { // we check if it is the first time we are called @@ -1989,6 +2033,7 @@ void MySQL_Thread::run() { epoll_ctl(efd, EPOLL_CTL_ADD, fd, &event); } } +#endif // IDLE_THREADS curtime=monotonic_time(); @@ -1996,9 +2041,11 @@ void MySQL_Thread::run() { while (shutdown==0) { +#ifdef IDLE_THREADS if (idle_maintenance_thread) { goto __run_skip_1; } +#endif // IDLE_THREADS int num_idles; if (processing_idles==true && (last_processing_idles < curtime-mysql_thread___ping_timeout_server*1000)) { @@ -2036,6 +2083,7 @@ void MySQL_Thread::run() { last_processing_idles=curtime; } +#ifdef IDLE_THREADS __run_skip_1: if (idle_maintenance_thread) { @@ -2058,11 +2106,14 @@ __run_skip_1: pthread_mutex_unlock(&myexchange.mutex_idles); goto __run_skip_1a; } +#endif // IDLE_THREADS for (n = 0; n < mypolls.len; n++) { MySQL_Data_Stream *myds=NULL; myds=mypolls.myds[n]; mypolls.fds[n].revents=0; if (myds) { +#ifdef IDLE_THREADS + if (GloVars.global.idle_threads) { // here we try to move it to the maintenance thread if (myds->myds_type==MYDS_FRONTEND && myds->sess) { if (myds->DSS==STATE_SLEEP && myds->sess->status==WAITING_CLIENT_DATA) { @@ -2103,6 +2154,8 @@ __run_skip_1: } } } + } +#endif // IDLE_THREADS if (myds->wait_until) { if (myds->wait_until > curtime) { if (mypolls.poll_timeout==0 || (myds->wait_until - curtime < mypolls.poll_timeout) ) { @@ -2135,43 +2188,50 @@ __run_skip_1: proxy_debug(PROXY_DEBUG_NET,1,"Poll for DataStream=%p will be called with FD=%d and events=%d\n", mypolls.myds[n], mypolls.fds[n].fd, mypolls.fds[n].events); } - if (idle_maintenance_thread==false) { - int r=rand()%(GloMTH->num_threads); - MySQL_Thread *thr=GloMTH->mysql_threads_idles[r].worker; - if (shutdown==0 && thr->shutdown==0 && idle_mysql_sessions->len) { - unsigned int ims=0; - pthread_mutex_lock(&thr->myexchange.mutex_idles); - bool empty_queue=true; - if (thr->myexchange.idle_mysql_sessions->len) { - // there are already sessions in the queues. We assume someone already notified worker 0 - empty_queue=false; - } - for (ims=0; imslen; ims++) { - MySQL_Session *mysess=(MySQL_Session *)idle_mysql_sessions->remove_index_fast(0); - thr->myexchange.idle_mysql_sessions->add(mysess); - } - pthread_mutex_unlock(&thr->myexchange.mutex_idles); - if (empty_queue==true) { - unsigned char c=1; - int fd=thr->pipefd[1]; - if (write(fd,&c,1)==-1) { - //proxy_error("Error while signaling maintenance thread\n"); +#ifdef IDLE_THREADS + if (GloVars.global.idle_threads) { + if (idle_maintenance_thread==false) { + int r=rand()%(GloMTH->num_threads); + MySQL_Thread *thr=GloMTH->mysql_threads_idles[r].worker; + if (shutdown==0 && thr->shutdown==0 && idle_mysql_sessions->len) { + unsigned int ims=0; + pthread_mutex_lock(&thr->myexchange.mutex_idles); + bool empty_queue=true; + if (thr->myexchange.idle_mysql_sessions->len) { + // there are already sessions in the queues. We assume someone already notified worker 0 + empty_queue=false; + } + for (ims=0; imslen; ims++) { + MySQL_Session *mysess=(MySQL_Session *)idle_mysql_sessions->remove_index_fast(0); + thr->myexchange.idle_mysql_sessions->add(mysess); + } + pthread_mutex_unlock(&thr->myexchange.mutex_idles); + if (empty_queue==true) { + unsigned char c=1; + int fd=thr->pipefd[1]; + if (write(fd,&c,1)==-1) { + //proxy_error("Error while signaling maintenance thread\n"); + } } } - } - pthread_mutex_lock(&myexchange.mutex_resumes); - if (myexchange.resume_mysql_sessions->len) { - //unsigned int maxsess=GloMTH->resume_mysql_sessions->len; - while (myexchange.resume_mysql_sessions->len) { - MySQL_Session *mysess=(MySQL_Session *)myexchange.resume_mysql_sessions->remove_index_fast(0); - register_session(mysess, false); - MySQL_Data_Stream *myds=mysess->client_myds; - mypolls.add(POLLIN, myds->fd, myds, monotonic_time()); + pthread_mutex_lock(&myexchange.mutex_resumes); + if (myexchange.resume_mysql_sessions->len) { + //unsigned int maxsess=GloMTH->resume_mysql_sessions->len; + while (myexchange.resume_mysql_sessions->len) { + MySQL_Session *mysess=(MySQL_Session *)myexchange.resume_mysql_sessions->remove_index_fast(0); + register_session(mysess, false); + MySQL_Data_Stream *myds=mysess->client_myds; + mypolls.add(POLLIN, myds->fd, myds, monotonic_time()); + } } + pthread_mutex_unlock(&myexchange.mutex_resumes); } - pthread_mutex_unlock(&myexchange.mutex_resumes); } + + __run_skip_1a: +#endif // IDLE_THREADS + spin_wrunlock(&thread_mutex); while ((n=__sync_add_and_fetch(&mypolls.pending_listener_add,0))) { // spin here @@ -2191,17 +2251,21 @@ __run_skip_1a: GloMyLogger->flush(); pre_poll_time=curtime; - if (idle_maintenance_thread) { +#ifdef IDLE_THREADS + if (GloVars.global.idle_threads && idle_maintenance_thread) { memset(events,0,sizeof(struct epoll_event)*MY_EPOLL_THREAD_MAXEVENTS); // let's make valgrind happy. It also seems that needs to be zeroed anyway // we call epoll() rc = epoll_wait (efd, events, MY_EPOLL_THREAD_MAXEVENTS, mysql_thread___poll_timeout); } else { +#endif // IDLE_THREADS //this is the only portion of code not protected by a global mutex proxy_debug(PROXY_DEBUG_NET,5,"Calling poll with timeout %d\n", ( mypolls.poll_timeout ? ( mypolls.poll_timeout/1000 > (unsigned int) mysql_thread___poll_timeout ? mypolls.poll_timeout/1000 : mysql_thread___poll_timeout ) : mysql_thread___poll_timeout ) ); // poll is called with a timeout of mypolls.poll_timeout if set , or mysql_thread___poll_timeout rc=poll(mypolls.fds,mypolls.len, ( mypolls.poll_timeout ? ( mypolls.poll_timeout/1000 < (unsigned int) mysql_thread___poll_timeout ? mypolls.poll_timeout/1000 : mysql_thread___poll_timeout ) : mysql_thread___poll_timeout ) ); proxy_debug(PROXY_DEBUG_NET,5,"%s\n", "Returning poll"); +#ifdef IDLE_THREADS } +#endif // IDLE_THREADS while ((n=__sync_add_and_fetch(&mypolls.pending_listener_del,0))) { // spin here poll_listener_del(n); @@ -2212,15 +2276,20 @@ __run_skip_1a: mypolls.poll_timeout=0; // always reset this to 0 . If a session needs a specific timeout, it will set this one curtime=monotonic_time(); - if (idle_maintenance_thread==false && (curtime >= pre_poll_time + mypolls.poll_timeout)) { - poll_timeout_bool=true; - } else { - poll_timeout_bool=false; - } + poll_timeout_bool=false; + if ( +#ifdef IDLE_THREADS + idle_maintenance_thread==false && +#endif // IDLE_THREADS + (curtime >= pre_poll_time + mypolls.poll_timeout)) { + poll_timeout_bool=true; + } unsigned int maintenance_interval = 1000000; // hardcoded value for now +#ifdef IDLE_THREADS if (idle_maintenance_thread) { maintenance_interval=maintenance_interval*2; } +#endif // IDLE_THREADS if (curtime > last_maintenance_time + maintenance_interval) { last_maintenance_time=curtime; maintenance_loop=true; @@ -2248,15 +2317,20 @@ __run_skip_1a: refresh_variables(); } +#ifdef IDLE_THREADS if (idle_maintenance_thread==false) { +#endif // IDLE_THREADS for (n=0; nlen; n++) { MySQL_Session *_sess=(MySQL_Session *)mysql_sessions->index(n); _sess->to_process=0; } +#ifdef IDLE_THREADS } +#endif // IDLE_THREADS +#ifdef IDLE_THREADS // here we handle epoll_wait() - if (idle_maintenance_thread) { + if (GloVars.global.idle_threads && idle_maintenance_thread) { if (rc) { int i; for (i=0; inum_threads); MySQL_Thread *thr=GloMTH->mysql_threads[w].worker; if (resume_mysql_sessions->len) { @@ -2446,16 +2522,20 @@ __run_skip_2: VALGRIND_ENABLE_ERROR_REPORTING; } } else { +#endif // IDLE_THREADS // iterate through all sessions and process the session logic process_all_sessions(); return_local_connections(); +#ifdef IDLE_THREADS } +#endif // IDLE_THREADS } } bool MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned int n) { if (mypolls.fds[n].revents) { +#ifdef IDLE_THREADS if (myds->myds_type==MYDS_FRONTEND) { if (epoll_thread) { mypolls.remove_index_fast(n); @@ -2473,6 +2553,7 @@ bool MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned } } } +#endif // IDLE_THREADS mypolls.last_recv[n]=curtime; myds->revents=mypolls.fds[n].revents; myds->sess->to_process=1; @@ -2534,12 +2615,16 @@ bool MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned void MySQL_Thread::process_all_sessions() { unsigned int n; unsigned int total_active_transactions_=0; +#ifdef IDLE_THREADS bool idle_maintenance_thread=epoll_thread; +#endif // IDLE_THREADS int rc; bool sess_sort=mysql_thread___sessions_sort; +#ifdef IDLE_THREADS if (idle_maintenance_thread) { sess_sort=false; } +#endif // IDLE_THREADS if (sess_sort && mysql_sessions->len > 3) { unsigned int a=0; for (n=0; nlen; n++) { @@ -2577,7 +2662,10 @@ void MySQL_Thread::process_all_sessions() { if (maintenance_loop) { unsigned int numTrx=0; unsigned long long sess_time = sess->IdleTime(); - if (idle_maintenance_thread==false) { +#ifdef IDLE_THREADS + if (idle_maintenance_thread==false) +#endif // IDLE_THREADS + { sess->to_process=1; if ( (sess_time/1000 > (unsigned long long)mysql_thread___max_transaction_time) || (sess_time/1000 > (unsigned long long)mysql_thread___wait_timeout) ) { numTrx = sess->NumActiveTransactions(); @@ -2589,12 +2677,16 @@ void MySQL_Thread::process_all_sessions() { if (sess_time/1000 > (unsigned long long)mysql_thread___wait_timeout) sess->killed=true; } } - } else { + } +#ifdef IDLE_THREADS + else + { if ( (sess_time/1000 > (unsigned long long)mysql_thread___wait_timeout) ) { sess->killed=true; sess->to_process=1; } } +#endif // IDLE_THREADS } if (sess->healthy==0) { unregister_session(n); @@ -2660,7 +2752,9 @@ void MySQL_Thread::refresh_variables() { mysql_thread___connect_timeout_server=GloMTH->get_variable_int((char *)"connect_timeout_server"); mysql_thread___connect_timeout_server_max=GloMTH->get_variable_int((char *)"connect_timeout_server_max"); mysql_thread___free_connections_pct=GloMTH->get_variable_int((char *)"free_connections_pct"); +#ifdef IDLE_THREADS mysql_thread___session_idle_ms=GloMTH->get_variable_int((char *)"session_idle_ms"); +#endif // IDLE_THREADS mysql_thread___connect_retries_delay=GloMTH->get_variable_int((char *)"connect_retries_delay"); if (mysql_thread___monitor_username) free(mysql_thread___monitor_username); @@ -2721,7 +2815,9 @@ void MySQL_Thread::refresh_variables() { mysql_thread___query_digests=(bool)GloMTH->get_variable_int((char *)"query_digests"); mysql_thread___query_digests_lowercase=(bool)GloMTH->get_variable_int((char *)"query_digests_lowercase"); mysql_thread___sessions_sort=(bool)GloMTH->get_variable_int((char *)"sessions_sort"); +#ifdef IDLE_THREADS mysql_thread___session_idle_show_processlist=(bool)GloMTH->get_variable_int((char *)"session_idle_show_processlist"); +#endif // IDLE_THREADS mysql_thread___servers_stats=(bool)GloMTH->get_variable_int((char *)"servers_stats"); mysql_thread___default_reconnect=(bool)GloMTH->get_variable_int((char *)"default_reconnect"); #ifdef DEBUG @@ -2731,17 +2827,19 @@ void MySQL_Thread::refresh_variables() { } MySQL_Thread::MySQL_Thread() { - efd=-1; - epoll_thread=false; spinlock_rwlock_init(&thread_mutex); - mysess_idx=0; my_idle_conns=NULL; cached_connections=NULL; mysql_sessions=NULL; +#ifdef IDLE_THREADS + efd=-1; + epoll_thread=false; + mysess_idx=0; idle_mysql_sessions=NULL; resume_mysql_sessions=NULL; myexchange.idle_mysql_sessions=NULL; myexchange.resume_mysql_sessions=NULL; +#endif // IDLE_THREADS processing_idles=false; last_processing_idles=0; __thread_MySQL_Thread_Variables_version=0; @@ -2914,12 +3012,14 @@ SQLite3_result * MySQL_Threads_Handler::SQL3_GlobalStatus() { pta[1]=buf; result->add_row(pta); } +#ifdef IDLE_THREADS { // Connections non idle pta[0]=(char *)"Client_Connections_non_idle"; sprintf(buf,"%u",get_non_idle_client_connections()); pta[1]=buf; result->add_row(pta); } +#endif // IDLE_THREADS { // Queries bytes recv pta[0]=(char *)"Queries_backends_bytes_recv"; sprintf(buf,"%llu",get_queries_backends_bytes_recv()); @@ -3109,29 +3209,48 @@ SQLite3_result * MySQL_Threads_Handler::SQL3_GlobalStatus() { void MySQL_Threads_Handler::Get_Memory_Stats() { unsigned int i; + unsigned int j; signal_all_threads(1); MySQL_Thread *thr=NULL; - for (i=0;ithread_mutex); } - for (i=0;iGet_Memory_Stats(); } - for (i=0;ithread_mutex); } @@ -3157,24 +3276,40 @@ SQLite3_result * MySQL_Threads_Handler::SQL3_Processlist() { result->add_column_definition(SQLITE_TEXT,"time_ms"); result->add_column_definition(SQLITE_TEXT,"info"); unsigned int i; + unsigned int i2; signal_all_threads(1); MySQL_Thread *thr=NULL; - for (i=0;ithread_mutex); +#ifdef IDLE_THREADS } else { if (mysql_thread___session_idle_show_processlist) { thr=(MySQL_Thread *)mysql_threads_idles[i-num_threads].worker; spin_wrlock(&thr->thread_mutex); } +#endif // IDLE_THREADS } } - for (i=0;i < ( mysql_thread___session_idle_show_processlist ? num_threads*2 : num_threads); i++) { +#ifdef IDLE_THREADS + for (i=0;i < ( (mysql_thread___session_idle_show_processlist && GloVars.global.idle_threads ) ? num_threads*2 : num_threads); i++) { +#else + for (i=0;i < num_threads; i++) { +#endif // IDLE_THREADS if (imysql_sessions->len; j++) { @@ -3348,15 +3483,21 @@ SQLite3_result * MySQL_Threads_Handler::SQL3_Processlist() { } } } - for (i=0;ithread_mutex); +#ifdef IDLE_THREADS } else { if (mysql_thread___session_idle_show_processlist) { thr=(MySQL_Thread *)mysql_threads_idles[i-num_threads].worker; spin_wrunlock(&thr->thread_mutex); } +#endif // IDLE_THREADS } } return result; @@ -3373,6 +3514,8 @@ void MySQL_Threads_Handler::signal_all_threads(unsigned char _c) { proxy_error("Error during write in signal_all_threads()\n"); } } +#ifdef IDLE_THREADS + if (GloVars.global.idle_threads) for (i=0;ipipefd[1]; @@ -3380,6 +3523,7 @@ void MySQL_Threads_Handler::signal_all_threads(unsigned char _c) { proxy_error("Error during write in signal_all_threads()\n"); } } +#endif // IDLE_THREADS } bool MySQL_Threads_Handler::kill_session(uint32_t _thread_session_id) { @@ -3390,10 +3534,13 @@ bool MySQL_Threads_Handler::kill_session(uint32_t _thread_session_id) { MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker; spin_wrlock(&thr->thread_mutex); } +#ifdef IDLE_THREADS + if (GloVars.global.idle_threads) for (i=0;ithread_mutex); } +#endif // IDLE_THREADS for (i=0;ithread_mutex); } +#ifdef IDLE_THREADS + if (GloVars.global.idle_threads) for (i=0;ithread_mutex); } +#endif // IDLE_THREADS return ret; } @@ -3534,6 +3687,7 @@ unsigned int MySQL_Threads_Handler::get_active_transations() { return q; } +#ifdef IDLE_THREADS unsigned int MySQL_Threads_Handler::get_non_idle_client_connections() { unsigned long long q=0; unsigned int i; @@ -3546,6 +3700,7 @@ unsigned int MySQL_Threads_Handler::get_non_idle_client_connections() { } return q; } +#endif // IDLE_THREADS unsigned long long MySQL_Threads_Handler::get_query_processor_time() { unsigned long long q=0; @@ -3596,6 +3751,8 @@ unsigned long long MySQL_Threads_Handler::get_mysql_frontend_buffers_bytes() { q+=__sync_fetch_and_add(&thr->status_variables.mysql_frontend_buffers_bytes,0); } } +#ifdef IDLE_THREADS + if (GloVars.global.idle_threads) for (i=0;istatus_variables.mysql_frontend_buffers_bytes,0); } } +#endif // IDLE_THREADS return q; } @@ -3615,11 +3773,14 @@ unsigned long long MySQL_Threads_Handler::get_mysql_session_internal_bytes() { if (thr) q+=__sync_fetch_and_add(&thr->status_variables.mysql_session_internal_bytes,0); } +#ifdef IDLE_THREADS + if (GloVars.global.idle_threads) if (mysql_threads_idles) { MySQL_Thread *thr=(MySQL_Thread *)mysql_threads_idles[i].worker; if (thr) q+=__sync_fetch_and_add(&thr->status_variables.mysql_session_internal_bytes,0); } +#endif // IDLE_THREADS } return q; } diff --git a/lib/ProxySQL_GloVars.cpp b/lib/ProxySQL_GloVars.cpp index 80e41ffe4..c66714e76 100644 --- a/lib/ProxySQL_GloVars.cpp +++ b/lib/ProxySQL_GloVars.cpp @@ -55,6 +55,9 @@ ProxySQL_GlobalVariables::ProxySQL_GlobalVariables() { #ifdef SO_REUSEPORT global.reuseport=false; #endif /* SO_REUSEPORT */ +#ifdef IDLE_THREADS + global.idle_threads=false; +#endif /* IDLE_THREADS */ // global.use_proxysql_mem=false; pthread_mutex_init(&global.start_mutex,NULL); #ifdef DEBUG @@ -83,6 +86,9 @@ ProxySQL_GlobalVariables::ProxySQL_GlobalVariables() { opt->add((const char *)"",0,1,0,(const char *)"Datadir",(const char *)"-D",(const char *)"--datadir"); opt->add((const char *)"",0,0,0,(const char *)"Rename/empty database file",(const char *)"--initial"); opt->add((const char *)"",0,0,0,(const char *)"Merge config file into database file",(const char *)"--reload"); +#ifdef IDLE_THREADS + opt->add((const char *)"",0,0,0,(const char *)"Create auxiliary threads to handle idle connections",(const char *)"--idle-threads"); +#endif /* IDLE_THREADS */ opt->add((const char *)"",0,1,0,(const char *)"Administration Unix Socket",(const char *)"-S",(const char *)"--admin-socket"); confFile=new ProxySQL_ConfigFile(); @@ -144,7 +150,13 @@ void ProxySQL_GlobalVariables::process_opts_pre() { if (opt->isSet("--reload")) { __cmd_proxysql_reload=true; } - + +#ifdef IDLE_THREADS + if (opt->isSet("--idle-threads")) { + global.idle_threads=true; + } +#endif /* IDLE_THREADS */ + config_file=GloVars.__cmd_proxysql_config_file; if (config_file==NULL) { diff --git a/src/main.cpp b/src/main.cpp index 5d676b2ce..bd0f2e03e 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -173,6 +173,7 @@ void * mysql_worker_thread_func(void *arg) { return NULL; } +#ifdef IDLE_THREADS void * mysql_worker_thread_func_idles(void *arg) { // __thr_sfp=l_mem_init(); @@ -192,6 +193,7 @@ void * mysql_worker_thread_func_idles(void *arg) { // l_mem_destroy(__thr_sfp); return NULL; } +#endif // IDLE_THREADS void * mysql_shared_query_cache_funct(void *arg) { GloQC->purgeHash_thread(NULL); @@ -294,10 +296,20 @@ void ProxySQL_Main_init_Query_module() { void ProxySQL_Main_init_MySQL_Threads_Handler_module() { unsigned int i; GloMTH->init(); - load_ = GloMTH->num_threads * 2 + 1; + load_ = 1; + load_ += GloMTH->num_threads; +#ifdef IDLE_THREADS + if (GloVars.global.idle_threads) { + load_ += GloMTH->num_threads; + } +#endif // IDLE_THREADS for (i=0; inum_threads; i++) { GloMTH->create_thread(i,mysql_worker_thread_func, false); - GloMTH->create_thread(i,mysql_worker_thread_func_idles, true); +#ifdef IDLE_THREADS + if (GloVars.global.idle_threads) { + GloMTH->create_thread(i,mysql_worker_thread_func_idles, true); + } +#endif // IDLE_THREADS } }