From 72912f0b88254ada55df35b3abc3ab742b5b6b95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Mon, 6 Apr 2020 22:05:39 +0200 Subject: [PATCH] Simplifying MySQL_Thread::run() phase 3 --- include/MySQL_Thread.h | 4 ++- lib/MySQL_Thread.cpp | 73 ++++++++++++++++++++++++------------------ 2 files changed, 45 insertions(+), 32 deletions(-) diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index e02cdacc7..49f6c7e90 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -100,13 +100,15 @@ class MySQL_Thread #ifdef IDLE_THREADS void worker_thread_assigns_sessions_to_idle_thread(MySQL_Thread *thr); - void worker_threads_get_sessions_from_idle_threads(); + void worker_thread_gets_sessions_from_idle_thread(); + void idle_thread_gets_sessions_from_worker_thread(); void idle_thread_assigns_sessions_to_worker_thread(MySQL_Thread *thr); void idle_thread_check_if_worker_thread_has_unprocess_resumed_sessions_and_signal_it(MySQL_Thread *thr); #endif // IDLE_THREADS unsigned int find_session_idx_in_mysql_sessions(MySQL_Session *sess); bool move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, unsigned int n); + bool set_backend_to_be_skipped_if_frontend_is_slow(MySQL_Data_Stream *myds, unsigned int n); protected: int nfds; diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 06546f44c..9a3ab5626 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -3814,23 +3814,7 @@ void MySQL_Thread::run() { __run_skip_1: if (idle_maintenance_thread) { - pthread_mutex_lock(&myexchange.mutex_idles); - while (myexchange.idle_mysql_sessions->len) { - MySQL_Session *mysess=(MySQL_Session *)myexchange.idle_mysql_sessions->remove_index_fast(0); - register_session(mysess, false); - MySQL_Data_Stream *myds=mysess->client_myds; - mypolls.add(POLLIN, myds->fd, myds, monotonic_time()); - // add in epoll() - struct epoll_event event; - memset(&event,0,sizeof(event)); // let's make valgrind happy - event.data.u32=mysess->thread_session_id; - event.events = EPOLLIN; - epoll_ctl (efd, EPOLL_CTL_ADD, myds->fd, &event); - // we map thread_id -> position in mysql_session (end of the list) - sessmap[mysess->thread_session_id]=mysql_sessions->len-1; - //fprintf(stderr,"Adding session %p idx, DS %p idx %d\n",mysess,myds,myds->poll_fds_idx); - } - pthread_mutex_unlock(&myexchange.mutex_idles); + idle_thread_gets_sessions_from_worker_thread(); goto __run_skip_1a; } #endif // IDLE_THREADS @@ -3927,17 +3911,7 @@ __mysql_thread_exit_add_mirror: } } if (myds->myds_type==MYDS_BACKEND) { - if (myds->sess && myds->sess->client_myds && myds->sess->mirror==false) { - unsigned int buffered_data=0; - buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN; - buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN; - // we pause receiving from backend at mysql_thread___threshold_resultset_size * 8 - // but assuming that client isn't completely blocked, we will stop checking for data - // only at mysql_thread___threshold_resultset_size * 4 - if (buffered_data > (unsigned int)mysql_thread___threshold_resultset_size*4) { - mypolls.fds[n].events = 0; - } - } + set_backend_to_be_skipped_if_frontend_is_slow(myds, n); } } } @@ -3950,7 +3924,7 @@ __mysql_thread_exit_add_mirror: int r=rand()%(GloMTH->num_threads); MySQL_Thread *thr=GloMTH->mysql_threads_idles[r].worker; worker_thread_assigns_sessions_to_idle_thread(thr); - worker_threads_get_sessions_from_idle_threads(); + worker_thread_gets_sessions_from_idle_thread(); } } @@ -4238,6 +4212,7 @@ __run_skip_1a: #ifdef IDLE_THREADS __run_skip_2: if (GloVars.global.idle_threads && idle_maintenance_thread) { + // this is an idle thread unsigned int w=rand()%(GloMTH->num_threads); MySQL_Thread *thr=GloMTH->mysql_threads[w].worker; if (resume_mysql_sessions->len) { @@ -4256,6 +4231,7 @@ __run_skip_2: #endif // IDLE_THREADS } } +// end of ::run() unsigned int MySQL_Thread::find_session_idx_in_mysql_sessions(MySQL_Session *sess) { int i=0; @@ -4323,8 +4299,7 @@ void MySQL_Thread::worker_thread_assigns_sessions_to_idle_thread(MySQL_Thread *t } } -void MySQL_Thread::worker_threads_get_sessions_from_idle_threads() { - worker_threads_get_sessions_from_idle_threads(); +void MySQL_Thread::worker_thread_gets_sessions_from_idle_thread() { pthread_mutex_lock(&myexchange.mutex_resumes); if (myexchange.resume_mysql_sessions->len) { //unsigned int maxsess=GloMTH->resume_mysql_sessions->len; @@ -6779,3 +6754,39 @@ bool MySQL_Thread::move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, } return false; } + +bool MySQL_Thread::set_backend_to_be_skipped_if_frontend_is_slow(MySQL_Data_Stream *myds, unsigned int n) { + if (myds->sess && myds->sess->client_myds && myds->sess->mirror==false) { + unsigned int buffered_data=0; + buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN; + buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN; + // we pause receiving from backend at mysql_thread___threshold_resultset_size * 8 + // but assuming that client isn't completely blocked, we will stop checking for data + // only at mysql_thread___threshold_resultset_size * 4 + if (buffered_data > (unsigned int)mysql_thread___threshold_resultset_size*4) { + mypolls.fds[n].events = 0; + return true; + } + } + return false; +} + +void MySQL_Thread::idle_thread_gets_sessions_from_worker_thread() { + pthread_mutex_lock(&myexchange.mutex_idles); + while (myexchange.idle_mysql_sessions->len) { + MySQL_Session *mysess=(MySQL_Session *)myexchange.idle_mysql_sessions->remove_index_fast(0); + register_session(mysess, false); + MySQL_Data_Stream *myds=mysess->client_myds; + mypolls.add(POLLIN, myds->fd, myds, monotonic_time()); + // add in epoll() + struct epoll_event event; + memset(&event,0,sizeof(event)); // let's make valgrind happy + event.data.u32=mysess->thread_session_id; + event.events = EPOLLIN; + epoll_ctl (efd, EPOLL_CTL_ADD, myds->fd, &event); + // we map thread_id -> position in mysql_session (end of the list) + sessmap[mysess->thread_session_id]=mysql_sessions->len-1; + //fprintf(stderr,"Adding session %p idx, DS %p idx %d\n",mysess,myds,myds->poll_fds_idx); + } + pthread_mutex_unlock(&myexchange.mutex_idles); +}