From 75606bb94db205f6fc5ea776f41969a61fb924b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Mon, 6 Apr 2020 21:20:15 +0200 Subject: [PATCH] Simplifying MySQL_Thread::run() phase 1 --- include/MySQL_Session.h | 1 + include/MySQL_Thread.h | 8 +++ lib/MySQL_Session.cpp | 10 +++ lib/MySQL_Thread.cpp | 154 +++++++++++++++++++++++----------------- 4 files changed, 109 insertions(+), 64 deletions(-) diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index ecd2f8ffd..6c0b05706 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -263,6 +263,7 @@ class MySQL_Session void generate_proxysql_internal_session_json(json &); bool known_query_for_locked_on_hostgroup(uint64_t); void unable_to_parse_set_statement(bool *); + bool has_any_backend(); }; #define KILL_QUERY 1 diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index c69fb53f1..5ccf4be10 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -98,6 +98,14 @@ class MySQL_Thread Session_Regex **match_regexes; +#ifdef IDLE_THREADS + void worker_thread_assigns_sessions_to_idle_thread(MySQL_Thread *thr); + void worker_threads_get_sessions_from_idle_threads(); +#endif // IDLE_THREADS + + unsigned int find_session_idx_in_mysql_sessions(MySQL_Session *sess); + bool move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, unsigned int n); + protected: int nfds; diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 442697927..785554f65 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -7226,3 +7226,13 @@ void MySQL_Session::unable_to_parse_set_statement(bool *lock_hostgroup) { } } +bool MySQL_Session::has_any_backend() { + for (unsigned int j=0;j < mybes->len;j++) { + MySQL_Backend *tmp_mybe=(MySQL_Backend *)mybes->index(j); + MySQL_Data_Stream *__myds=tmp_mybe->server_myds; + if (__myds->myconn) { + return true; + } + } + return false; +} diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index e97132a0a..fec7aa866 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -3879,39 +3879,9 @@ __mysql_thread_exit_add_mirror: // here we try to move it to the maintenance thread if (myds->myds_type==MYDS_FRONTEND && myds->sess) { if (myds->DSS==STATE_SLEEP && myds->sess->status==WAITING_CLIENT_DATA) { - unsigned long long _tmp_idle = mypolls.last_recv[n] > mypolls.last_sent[n] ? mypolls.last_recv[n] : mypolls.last_sent[n] ; - if (_tmp_idle < ( (curtime > (unsigned int)mysql_thread___session_idle_ms * 1000) ? (curtime - mysql_thread___session_idle_ms * 1000) : 0)) { - // make sure data stream has no pending data out and session is not throttled (#1939) - // because epoll thread does not handle data stream with data out - if (myds->sess->client_myds == myds && !myds->available_data_out() && myds->sess->pause_until <= curtime) { - unsigned int j; - int conns=0; - for (j=0;jsess->mybes->len;j++) { - MySQL_Backend *tmp_mybe=(MySQL_Backend *)myds->sess->mybes->index(j); - MySQL_Data_Stream *__myds=tmp_mybe->server_myds; - if (__myds->myconn) { - conns++; - } - } - unsigned long long idle_since = curtime - myds->sess->IdleTime(); - if (conns==0) { - mypolls.remove_index_fast(n); - myds->mypolls=NULL; - unsigned int i; - for (i=0;ilen;i++) { - MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(i); - if (mysess==myds->sess) { - mysess->thread=NULL; - unregister_session(i); - mysess->idle_since = idle_since; - idle_mysql_sessions->add(mysess); - break; - } - } - n--; // compensate mypolls.remove_index_fast(n) and n++ of loop - continue; - } - } + if (move_session_to_idle_mysql_sessions(myds, n)) { + n--; // compensate mypolls.remove_index_fast(n) and n++ of loop + continue; } } } @@ -3979,37 +3949,8 @@ __mysql_thread_exit_add_mirror: if (idle_maintenance_thread==false) { int r=rand()%(GloMTH->num_threads); MySQL_Thread *thr=GloMTH->mysql_threads_idles[r].worker; - if (shutdown==0 && thr->shutdown==0 && idle_mysql_sessions->len) { - pthread_mutex_lock(&thr->myexchange.mutex_idles); - bool empty_queue=true; - if (thr->myexchange.idle_mysql_sessions->len) { - // there are already sessions in the queues. We assume someone already notified worker 0 - empty_queue=false; - } - while (idle_mysql_sessions->len) { - MySQL_Session *mysess=(MySQL_Session *)idle_mysql_sessions->remove_index_fast(0); - thr->myexchange.idle_mysql_sessions->add(mysess); - } - pthread_mutex_unlock(&thr->myexchange.mutex_idles); - if (empty_queue==true) { - unsigned char c=1; - int fd=thr->pipefd[1]; - if (write(fd,&c,1)==-1) { - //proxy_error("Error while signaling maintenance thread\n"); - } - } - } - pthread_mutex_lock(&myexchange.mutex_resumes); - if (myexchange.resume_mysql_sessions->len) { - //unsigned int maxsess=GloMTH->resume_mysql_sessions->len; - while (myexchange.resume_mysql_sessions->len) { - MySQL_Session *mysess=(MySQL_Session *)myexchange.resume_mysql_sessions->remove_index_fast(0); - register_session(mysess, false); - MySQL_Data_Stream *myds=mysess->client_myds; - mypolls.add(POLLIN, myds->fd, myds, monotonic_time()); - } - } - pthread_mutex_unlock(&myexchange.mutex_resumes); + worker_thread_assigns_sessions_to_idle_thread(thr); + worker_threads_get_sessions_from_idle_threads(); } } @@ -4342,6 +4283,58 @@ __run_skip_2: } } +unsigned int MySQL_Thread::find_session_idx_in_mysql_sessions(MySQL_Session *sess) { + int i=0; + for (i=0;ilen;i++) { + MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(i); + if (mysess==sess) { + return i; + } + } + return i; +} + +#ifdef IDLE_THREADS +void MySQL_Thread::worker_thread_assigns_sessions_to_idle_thread(MySQL_Thread *thr) { + if (shutdown==0 && thr->shutdown==0 && idle_mysql_sessions->len) { + pthread_mutex_lock(&thr->myexchange.mutex_idles); + bool empty_queue=true; + if (thr->myexchange.idle_mysql_sessions->len) { + // there are already sessions in the queues. We assume someone already notified worker 0 + empty_queue=false; + } + while (idle_mysql_sessions->len) { + MySQL_Session *mysess=(MySQL_Session *)idle_mysql_sessions->remove_index_fast(0); + thr->myexchange.idle_mysql_sessions->add(mysess); + } + pthread_mutex_unlock(&thr->myexchange.mutex_idles); + if (empty_queue==true) { + unsigned char c=1; + int fd=thr->pipefd[1]; + if (write(fd,&c,1)==-1) { + //proxy_error("Error while signaling maintenance thread\n"); + } + } + } +} + +void MySQL_Thread::worker_threads_get_sessions_from_idle_threads() { + worker_threads_get_sessions_from_idle_threads(); + pthread_mutex_lock(&myexchange.mutex_resumes); + if (myexchange.resume_mysql_sessions->len) { + //unsigned int maxsess=GloMTH->resume_mysql_sessions->len; + while (myexchange.resume_mysql_sessions->len) { + MySQL_Session *mysess=(MySQL_Session *)myexchange.resume_mysql_sessions->remove_index_fast(0); + register_session(mysess, false); + MySQL_Data_Stream *myds=mysess->client_myds; + mypolls.add(POLLIN, myds->fd, myds, monotonic_time()); + } + } + pthread_mutex_unlock(&myexchange.mutex_resumes); +} +#endif // IDLE_THREADS + + bool MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned int n) { if (mypolls.fds[n].revents) { #ifdef IDLE_THREADS @@ -6748,3 +6741,36 @@ void MySQL_Thread::Scan_Sessions_to_Kill(PtrArray *mysess) { } } } + +bool MySQL_Thread::move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, unsigned int n) { + unsigned long long _tmp_idle = mypolls.last_recv[n] > mypolls.last_sent[n] ? mypolls.last_recv[n] : mypolls.last_sent[n] ; + if (_tmp_idle < ( (curtime > (unsigned int)mysql_thread___session_idle_ms * 1000) ? (curtime - mysql_thread___session_idle_ms * 1000) : 0)) { + // make sure data stream has no pending data out and session is not throttled (#1939) + // because epoll thread does not handle data stream with data out + if (myds->sess->client_myds == myds && !myds->available_data_out() && myds->sess->pause_until <= curtime) { + //unsigned int j; + bool has_backends = myds->sess->has_any_backend(); +/* + for (j=0;jsess->mybes->len;j++) { + MySQL_Backend *tmp_mybe=(MySQL_Backend *)myds->sess->mybes->index(j); + MySQL_Data_Stream *__myds=tmp_mybe->server_myds; + if (__myds->myconn) { + conns++; + } + } +*/ + if (has_backends==false) { + unsigned long long idle_since = curtime - myds->sess->IdleTime(); + mypolls.remove_index_fast(n); + myds->mypolls=NULL; + unsigned int i = find_session_idx_in_mysql_sessions(myds->sess); + myds->sess->thread=NULL; + unregister_session(i); + myds->sess->idle_since = idle_since; + idle_mysql_sessions->add(myds->sess); + return true; + } + } + } + return false; +}