From e95d1cd0decdbaea91a4028f701ffb4921289c5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Sat, 14 Apr 2018 07:28:17 +0200 Subject: [PATCH] Splitting Thread::Run ito multiple functions This allowed also to identify few bottleneck and rewrite some code --- include/MySQL_Thread.h | 31 +- lib/MySQL_Thread.cpp | 627 +++++++++++++++++++++++------------------ 2 files changed, 372 insertions(+), 286 deletions(-) diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 619ef383a..4a8ca29ad 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -61,8 +61,10 @@ class ProxySQL_Poll { public: unsigned int poll_timeout; +#ifdef DEBUG unsigned long loops; StatCounters *loop_counters; +#endif // DEBUG unsigned int len; unsigned int size; struct pollfd *fds; @@ -73,13 +75,15 @@ class ProxySQL_Poll { volatile int pending_listener_del; ProxySQL_Poll() { +#ifdef DEBUG #ifdef PROXYSQL_STATSCOUNTERS_NOLOCK loop_counters=new StatCounters(15,10); #else loop_counters=new StatCounters(15,10,false); #endif - poll_timeout=0; loops=0; +#endif // DEBUG + poll_timeout=0; len=0; pending_listener_add=0; pending_listener_del=0; @@ -103,7 +107,16 @@ class ProxySQL_Poll { free(fds); free(last_recv); free(last_sent); - delete loop_counters; +#ifdef DEBUG + delete loop_counters; + if (loops) { + unsigned long long last_loops_at=monotonic_time(); + unsigned long long elapsed_time = last_loops_at-first_loops_at; + elapsed_time /= 1000; + float lpms = loops/elapsed_time; + proxy_info("Loops/ms = %f\n", lpms); + } +#endif // DEBUG }; void add(uint32_t _events, int _fd, MySQL_Data_Stream *_myds, unsigned long long sent_time) { @@ -187,7 +200,8 @@ class MySQL_Thread unsigned long long curtime; unsigned long long pre_poll_time; unsigned long long last_maintenance_time; - std::atomic atomic_curtime; + //std::atomic atomic_curtime; + volatile unsigned long long atomic_curtime; // changed from atomic to volatile PtrArray *mysql_sessions; PtrArray *mirror_queue_mysql_sessions; PtrArray *mirror_queue_mysql_sessions_cache; @@ -231,7 +245,8 @@ class MySQL_Thread unsigned long long ConnPool_get_conn_failure; unsigned long long gtid_binlog_collected; unsigned long long gtid_session_collected; - unsigned int active_transactions; + volatile unsigned int active_transactions; + //std::atomic active_transactions; } status_variables; struct { @@ -264,6 +279,14 @@ class MySQL_Thread MySQL_Connection * get_MyConn_local(unsigned int, MySQL_Session *sess, char *gtid_uuid, uint64_t gtid_trxid); void push_MyConn_local(MySQL_Connection *); void return_local_connections(); + void ping_idle_connections(); + void process_mirror_sessions(); + void MoveMydsToMaintenanceThreadIfNeeded(MySQL_Data_Stream *myds, unsigned int n); + void HandleEventFromEpoll(int i); + void ScanIdleClients(); + void ProcessEventOnDataStream(MySQL_Data_Stream *myds, unsigned int n); + void MoveMydsBackToWorkerThreadIfNeeded(); + void ComputeMinTimeToWait(MySQL_Data_Stream *myds); }; diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index a43b110ff..f8924a17f 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -2473,44 +2473,7 @@ void MySQL_Thread::unregister_session(int idx) { mysql_sessions->remove_index_fast(idx); } - -// main loop -void MySQL_Thread::run() { - unsigned int n; - int rc; - -#ifdef IDLE_THREADS - bool idle_maintenance_thread=epoll_thread; - if (idle_maintenance_thread) { - // we check if it is the first time we are called - if (efd==-1) { - efd = EPOLL_CREATE; - int fd=pipefd[0]; - struct epoll_event event; - memset(&event,0,sizeof(event)); // let's make valgrind happy - event.events = EPOLLIN; - event.data.u32=0; // special value to point to the pipe - epoll_ctl(efd, EPOLL_CTL_ADD, fd, &event); - } - } -#endif // IDLE_THREADS - - curtime=monotonic_time(); - atomic_curtime=curtime; - -#ifdef PROXYSQL_MYSQL_PTHREAD_MUTEX - pthread_mutex_lock(&thread_mutex); -#else - spin_wrlock(&thread_mutex); -#endif - while (shutdown==0) { - -#ifdef IDLE_THREADS - if (idle_maintenance_thread) { - goto __run_skip_1; - } -#endif // IDLE_THREADS - +void MySQL_Thread::ping_idle_connections() { int num_idles; if (processing_idles==true && (last_processing_idles < curtime-mysql_thread___ping_timeout_server*1000)) { processing_idles=false; @@ -2546,66 +2509,285 @@ void MySQL_Thread::run() { processing_idles=true; last_processing_idles=curtime; } +} -#ifdef IDLE_THREADS -__run_skip_1: - - if (idle_maintenance_thread) { - pthread_mutex_lock(&myexchange.mutex_idles); - while (myexchange.idle_mysql_sessions->len) { - MySQL_Session *mysess=(MySQL_Session *)myexchange.idle_mysql_sessions->remove_index_fast(0); - register_session(mysess, false); - MySQL_Data_Stream *myds=mysess->client_myds; - mypolls.add(POLLIN, myds->fd, myds, monotonic_time()); - // add in epoll() - struct epoll_event event; - memset(&event,0,sizeof(event)); // let's make valgrind happy - event.data.u32=mysess->thread_session_id; - event.events = EPOLLIN; - epoll_ctl (efd, EPOLL_CTL_ADD, myds->fd, &event); - // we map thread_id -> position in mysql_session (end of the list) - sessmap[mysess->thread_session_id]=mysql_sessions->len-1; - //fprintf(stderr,"Adding session %p idx, DS %p idx %d\n",mysess,myds,myds->poll_fds_idx); +void MySQL_Thread::process_mirror_sessions() { + while (mirror_queue_mysql_sessions->len) { + if (__sync_add_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1) > (unsigned int)mysql_thread___mirror_max_concurrency ) { + __sync_sub_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1); + //goto __mysql_thread_exit_add_mirror; // we can't add more mirror sessions at runtime + return; // we can't add more mirror sessions at runtime + } else { + int idx; + idx=fastrand()%(mirror_queue_mysql_sessions->len); + MySQL_Session *newsess=(MySQL_Session *)mirror_queue_mysql_sessions->remove_index_fast(idx); + register_session(newsess); + newsess->handler(); // execute immediately + if (newsess->status==WAITING_CLIENT_DATA) { // the mirror session has completed + unregister_session(mysql_sessions->len-1); + unsigned int l = (unsigned int)mysql_thread___mirror_max_concurrency; + if (mirror_queue_mysql_sessions->len*0.3 > l) l=mirror_queue_mysql_sessions->len*0.3; + if (mirror_queue_mysql_sessions_cache->len <= l) { + bool to_cache=true; + if (newsess->mybe) { + if (newsess->mybe->server_myds) { + to_cache=false; + } + } + if (to_cache) { + __sync_sub_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1); + mirror_queue_mysql_sessions_cache->add(newsess); + } else { + delete newsess; + } + } else { + delete newsess; + } } - pthread_mutex_unlock(&myexchange.mutex_idles); - goto __run_skip_1a; + //newsess->to_process=0; } -#endif // IDLE_THREADS - while (mirror_queue_mysql_sessions->len) { - if (__sync_add_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1) > (unsigned int)mysql_thread___mirror_max_concurrency ) { - __sync_sub_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1); - goto __mysql_thread_exit_add_mirror; // we can't add more mirror sessions at runtime - } else { - int idx; - idx=fastrand()%(mirror_queue_mysql_sessions->len); - MySQL_Session *newsess=(MySQL_Session *)mirror_queue_mysql_sessions->remove_index_fast(idx); - register_session(newsess); - newsess->handler(); // execute immediately - if (newsess->status==WAITING_CLIENT_DATA) { // the mirror session has completed - unregister_session(mysql_sessions->len-1); - unsigned int l = (unsigned int)mysql_thread___mirror_max_concurrency; - if (mirror_queue_mysql_sessions->len*0.3 > l) l=mirror_queue_mysql_sessions->len*0.3; - if (mirror_queue_mysql_sessions_cache->len <= l) { - bool to_cache=true; - if (newsess->mybe) { - if (newsess->mybe->server_myds) { - to_cache=false; - } + } +} + +void MySQL_Thread::MoveMydsToMaintenanceThreadIfNeeded(MySQL_Data_Stream *myds, unsigned int n) { + // here we try to move it to the maintenance thread + if (myds->myds_type==MYDS_FRONTEND && myds->sess) { + if (myds->DSS==STATE_SLEEP && myds->sess->status==WAITING_CLIENT_DATA) { + unsigned long long _tmp_idle = mypolls.last_recv[n] > mypolls.last_sent[n] ? mypolls.last_recv[n] : mypolls.last_sent[n] ; + if (_tmp_idle < ( (curtime > (unsigned int)mysql_thread___session_idle_ms * 1000) ? (curtime - mysql_thread___session_idle_ms * 1000) : 0)) { + if (myds->sess->client_myds == myds && myds->PSarrayOUT->len==0 && (myds->queueOUT.head - myds->queueOUT.tail)==0 ) { // extra check + unsigned int j; + int conns=0; + for (j=0;jsess->mybes->len;j++) { + MySQL_Backend *tmp_mybe=(MySQL_Backend *)myds->sess->mybes->index(j); + MySQL_Data_Stream *__myds=tmp_mybe->server_myds; + if (__myds->myconn) { + conns++; } - if (to_cache) { - __sync_sub_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1); - mirror_queue_mysql_sessions_cache->add(newsess); - } else { - delete newsess; + } + unsigned long long idle_since = curtime - myds->sess->IdleTime(); + bool exit_cond=false; + if (conns==0) { + mypolls.remove_index_fast(n); + myds->mypolls=NULL; + unsigned int i; + for (i=0;ilen;i++) { + MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(i); + if (mysess==myds->sess) { + mysess->thread=NULL; + unregister_session(i); + exit_cond=true; + mysess->idle_since = idle_since; + idle_mysql_sessions->add(mysess); + break; + } } - } else { - delete newsess; + } + if (exit_cond) { + //continue; + return; } } - //newsess->to_process=0; } } -__mysql_thread_exit_add_mirror: + } +} + +void MySQL_Thread::HandleEventFromEpoll(int i) { + if (events[i].data.u32) { + // NOTE: not sure why, sometime events returns odd values. If set, we take it out as normal worker threads know how to handle it + if (events[i].events) { + uint32_t sess_thr_id=events[i].data.u32; + uint32_t sess_pos=sessmap[sess_thr_id]; + MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(sess_pos); + MySQL_Data_Stream *tmp_myds=mysess->client_myds; + int dsidx=tmp_myds->poll_fds_idx; + //fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx); + mypolls.remove_index_fast(dsidx); + tmp_myds->mypolls=NULL; + mysess->thread=NULL; + // we first delete the association in sessmap + sessmap.erase(mysess->thread_session_id); + if (mysql_sessions->len > 1) { + // take the last element and adjust the map + MySQL_Session *mysess_last=(MySQL_Session *)mysql_sessions->index(mysql_sessions->len-1); + if (mysess->thread_session_id != mysess_last->thread_session_id) + sessmap[mysess_last->thread_session_id]=sess_pos; + } + unregister_session(sess_pos); + resume_mysql_sessions->add(mysess); + epoll_ctl(efd, EPOLL_CTL_DEL, tmp_myds->fd, NULL); + } + } +} + +void MySQL_Thread::ScanIdleClients() { +#define SESS_TO_SCAN 128 + if (mysess_idx + SESS_TO_SCAN > mysql_sessions->len) { + mysess_idx=0; + } + unsigned int i; + unsigned long long min_idle = 0; + if (curtime > (unsigned long long)mysql_thread___wait_timeout*1000) { + min_idle = curtime - (unsigned long long)mysql_thread___wait_timeout*1000; + } + for (i=0;ilen; i++) { + uint32_t sess_pos=mysess_idx; + MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(sess_pos); + if (mysess->idle_since < min_idle) { + mysess->killed=true; + MySQL_Data_Stream *tmp_myds=mysess->client_myds; + int dsidx=tmp_myds->poll_fds_idx; + //fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx); + mypolls.remove_index_fast(dsidx); + tmp_myds->mypolls=NULL; + mysess->thread=NULL; + // we first delete the association in sessmap + sessmap.erase(mysess->thread_session_id); + if (mysql_sessions->len > 1) { + // take the last element and adjust the map + MySQL_Session *mysess_last=(MySQL_Session *)mysql_sessions->index(mysql_sessions->len-1); + if (mysess->thread_session_id != mysess_last->thread_session_id) + sessmap[mysess_last->thread_session_id]=sess_pos; + } + unregister_session(sess_pos); + resume_mysql_sessions->add(mysess); + epoll_ctl(efd, EPOLL_CTL_DEL, tmp_myds->fd, NULL); + } + mysess_idx++; + } +} + +void MySQL_Thread::ProcessEventOnDataStream(MySQL_Data_Stream *myds, unsigned int n) { + if (myds && myds->myds_type==MYDS_FRONTEND && myds->DSS==STATE_SLEEP && myds->sess && myds->sess->status==WAITING_CLIENT_DATA) { + mypolls.myds[n]->set_pollout(); + } else { + if (mypolls.myds[n]->DSS > STATE_MARIADB_BEGIN && mypolls.myds[n]->DSS < STATE_MARIADB_END) { + mypolls.fds[n].events = POLLIN; + if (mypolls.myds[n]->myconn->async_exit_status & MYSQL_WAIT_WRITE) + mypolls.fds[n].events |= POLLOUT; + } else { + mypolls.myds[n]->set_pollout(); + } + } + if (myds && myds->sess->pause_until > curtime) { + if (myds->myds_type==MYDS_FRONTEND) { + mypolls.myds[n]->remove_pollout(); + } + if (myds->myds_type==MYDS_BACKEND) { + if (mysql_thread___throttle_ratio_server_to_client) { + mypolls.fds[n].events = 0; + } + } + } + if (myds && myds->myds_type==MYDS_BACKEND) { + if (myds->sess && myds->sess->client_myds && myds->sess->mirror==false) { + unsigned int buffered_data=0; + buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN; + buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN; + // we pause receiving from backend at mysql_thread___threshold_resultset_size * 8 + // but assuming that client isn't completely blocked, we will stop checking for data + // only at mysql_thread___threshold_resultset_size * 4 + if (buffered_data > (unsigned int)mysql_thread___threshold_resultset_size*4) { + mypolls.fds[n].events = 0; + } + } + } +} + +void MySQL_Thread::ComputeMinTimeToWait(MySQL_Data_Stream *myds) { + if (myds->wait_until) { + if (myds->wait_until > curtime) { + //if (mypolls.poll_timeout==0 || (myds->wait_until - curtime < mypolls.poll_timeout) ) { + if (myds->wait_until - curtime < mypolls.poll_timeout) { + //if (myds->wait_until < mypolls.abs_poll_timeout) { + // mypolls.abs_poll_timeout = myds->wait_until; + mypolls.poll_timeout= myds->wait_until - curtime; + } + } + } + if (myds->sess) { + if (myds->sess->pause_until > 0) { + //if (mypolls.poll_timeout==0 || (myds->sess->pause_until - curtime < mypolls.poll_timeout) ) { + if (myds->sess->pause_until - curtime < mypolls.poll_timeout) { + mypolls.poll_timeout= myds->sess->pause_until - curtime; + //if (myds->sess->pause_until < mypolls.abs_poll_timeout) { + // mypolls.abs_poll_timeout = myds->sess->pause_until; + } + } + } +} + +// main loop +void MySQL_Thread::run() { + unsigned int n; + int rc; + +#ifdef IDLE_THREADS + bool idle_maintenance_thread=epoll_thread; + if (idle_maintenance_thread) { + // we check if it is the first time we are called + if (efd==-1) { + efd = EPOLL_CREATE; + int fd=pipefd[0]; + struct epoll_event event; + memset(&event,0,sizeof(event)); // let's make valgrind happy + event.events = EPOLLIN; + event.data.u32=0; // special value to point to the pipe + epoll_ctl(efd, EPOLL_CTL_ADD, fd, &event); + } + } +#endif // IDLE_THREADS + + curtime=monotonic_time(); + atomic_curtime=curtime; + +#ifdef PROXYSQL_MYSQL_PTHREAD_MUTEX + pthread_mutex_lock(&thread_mutex); +#else + spin_wrlock(&thread_mutex); +#endif + while (shutdown==0) { + +#ifdef IDLE_THREADS + if (idle_maintenance_thread) { + goto __run_skip_1; + } +#endif // IDLE_THREADS + + ping_idle_connections(); + +#ifdef IDLE_THREADS +__run_skip_1: + + if (idle_maintenance_thread) { + pthread_mutex_lock(&myexchange.mutex_idles); + while (myexchange.idle_mysql_sessions->len) { + MySQL_Session *mysess=(MySQL_Session *)myexchange.idle_mysql_sessions->remove_index_fast(0); + register_session(mysess, false); + MySQL_Data_Stream *myds=mysess->client_myds; + mypolls.add(POLLIN, myds->fd, myds, monotonic_time()); + // add in epoll() + struct epoll_event event; + memset(&event,0,sizeof(event)); // let's make valgrind happy + event.data.u32=mysess->thread_session_id; + event.events = EPOLLIN; + epoll_ctl (efd, EPOLL_CTL_ADD, myds->fd, &event); + // we map thread_id -> position in mysql_session (end of the list) + sessmap[mysess->thread_session_id]=mysql_sessions->len-1; + //fprintf(stderr,"Adding session %p idx, DS %p idx %d\n",mysess,myds,myds->poll_fds_idx); + } + pthread_mutex_unlock(&myexchange.mutex_idles); + goto __run_skip_1a; + } +#endif // IDLE_THREADS + + process_mirror_sessions(); + +//__mysql_thread_exit_add_mirror: + + mypolls.poll_timeout = mysql_thread___poll_timeout/1000; // predefined default value + //mypolls.abs_poll_timeout = curtime + mypolls.poll_timeout; for (n = 0; n < mypolls.len; n++) { MySQL_Data_Stream *myds=NULL; myds=mypolls.myds[n]; @@ -2613,104 +2795,28 @@ __mysql_thread_exit_add_mirror: if (myds) { #ifdef IDLE_THREADS if (GloVars.global.idle_threads) { - // here we try to move it to the maintenance thread - if (myds->myds_type==MYDS_FRONTEND && myds->sess) { - if (myds->DSS==STATE_SLEEP && myds->sess->status==WAITING_CLIENT_DATA) { - unsigned long long _tmp_idle = mypolls.last_recv[n] > mypolls.last_sent[n] ? mypolls.last_recv[n] : mypolls.last_sent[n] ; - if (_tmp_idle < ( (curtime > (unsigned int)mysql_thread___session_idle_ms * 1000) ? (curtime - mysql_thread___session_idle_ms * 1000) : 0)) { - if (myds->sess->client_myds == myds && myds->PSarrayOUT->len==0 && (myds->queueOUT.head - myds->queueOUT.tail)==0 ) { // extra check - unsigned int j; - int conns=0; - for (j=0;jsess->mybes->len;j++) { - MySQL_Backend *tmp_mybe=(MySQL_Backend *)myds->sess->mybes->index(j); - MySQL_Data_Stream *__myds=tmp_mybe->server_myds; - if (__myds->myconn) { - conns++; - } - } - unsigned long long idle_since = curtime - myds->sess->IdleTime(); - bool exit_cond=false; - if (conns==0) { - mypolls.remove_index_fast(n); - myds->mypolls=NULL; - unsigned int i; - for (i=0;ilen;i++) { - MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(i); - if (mysess==myds->sess) { - mysess->thread=NULL; - unregister_session(i); - exit_cond=true; - mysess->idle_since = idle_since; - idle_mysql_sessions->add(mysess); - break; - } - } - } - if (exit_cond) { - continue; - } - } - } - } - } + MoveMydsToMaintenanceThreadIfNeeded(myds,n); } #endif // IDLE_THREADS - if (myds->wait_until) { - if (myds->wait_until > curtime) { - if (mypolls.poll_timeout==0 || (myds->wait_until - curtime < mypolls.poll_timeout) ) { - mypolls.poll_timeout= myds->wait_until - curtime; - } - } - } - if (myds->sess) { - if (myds->sess->pause_until > 0) { - if (mypolls.poll_timeout==0 || (myds->sess->pause_until - curtime < mypolls.poll_timeout) ) { - mypolls.poll_timeout= myds->sess->pause_until - curtime; - } - } - } + ComputeMinTimeToWait(myds); } if (myds) myds->revents=0; if (mypolls.myds[n] && mypolls.myds[n]->myds_type!=MYDS_LISTENER) { - if (myds && myds->myds_type==MYDS_FRONTEND && myds->DSS==STATE_SLEEP && myds->sess && myds->sess->status==WAITING_CLIENT_DATA) { - mypolls.myds[n]->set_pollout(); - } else { - if (mypolls.myds[n]->DSS > STATE_MARIADB_BEGIN && mypolls.myds[n]->DSS < STATE_MARIADB_END) { - mypolls.fds[n].events = POLLIN; - if (mypolls.myds[n]->myconn->async_exit_status & MYSQL_WAIT_WRITE) - mypolls.fds[n].events |= POLLOUT; - } else { - mypolls.myds[n]->set_pollout(); - } - } - if (myds && myds->sess->pause_until > curtime) { - if (myds->myds_type==MYDS_FRONTEND) { - mypolls.myds[n]->remove_pollout(); - } - if (myds->myds_type==MYDS_BACKEND) { - if (mysql_thread___throttle_ratio_server_to_client) { - mypolls.fds[n].events = 0; - } - } - } - if (myds && myds->myds_type==MYDS_BACKEND) { - if (myds->sess && myds->sess->client_myds && myds->sess->mirror==false) { - unsigned int buffered_data=0; - buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN; - buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN; - // we pause receiving from backend at mysql_thread___threshold_resultset_size * 8 - // but assuming that client isn't completely blocked, we will stop checking for data - // only at mysql_thread___threshold_resultset_size * 4 - if (buffered_data > (unsigned int)mysql_thread___threshold_resultset_size*4) { - mypolls.fds[n].events = 0; - } - } - } - + ProcessEventOnDataStream(myds,n); } proxy_debug(PROXY_DEBUG_NET,1,"Poll for DataStream=%p will be called with FD=%d and events=%d\n", mypolls.myds[n], mypolls.fds[n].fd, mypolls.fds[n].events); } - +/* + if (mypolls.abs_poll_timeout > curtime) { + mypolls.abs_poll_timeout -= curtime; + if (mypolls.abs_poll_timeout < mypolls.poll_timeout) { + mypolls.poll_timeout = mypolls.abs_poll_timeout; + } + } else { + // something went wrong + mypolls.poll_timeout = 0; + } +*/ #ifdef IDLE_THREADS if (GloVars.global.idle_threads) { if (idle_maintenance_thread==false) { @@ -2787,7 +2893,9 @@ __run_skip_1a: //this is the only portion of code not protected by a global mutex proxy_debug(PROXY_DEBUG_NET,5,"Calling poll with timeout %d\n", ( mypolls.poll_timeout ? ( mypolls.poll_timeout/1000 > (unsigned int) mysql_thread___poll_timeout ? mypolls.poll_timeout/1000 : mysql_thread___poll_timeout ) : mysql_thread___poll_timeout ) ); // poll is called with a timeout of mypolls.poll_timeout if set , or mysql_thread___poll_timeout - rc=poll(mypolls.fds,mypolls.len, ( mypolls.poll_timeout ? ( mypolls.poll_timeout/1000 < (unsigned int) mysql_thread___poll_timeout ? mypolls.poll_timeout/1000 : mysql_thread___poll_timeout ) : mysql_thread___poll_timeout ) ); + //rc=poll(mypolls.fds,mypolls.len, ( mypolls.poll_timeout ? ( mypolls.poll_timeout/1000 < (unsigned int) mysql_thread___poll_timeout ? mypolls.poll_timeout/1000 : mysql_thread___poll_timeout ) : mysql_thread___poll_timeout ) ); + //rc=poll(mypolls.fds,mypolls.len, 0); + rc=poll(mypolls.fds,mypolls.len, mypolls.poll_timeout/1000); proxy_debug(PROXY_DEBUG_NET,5,"%s\n", "Returning poll"); #ifdef IDLE_THREADS } @@ -2831,9 +2939,15 @@ __run_skip_1a: } // update polls statistics - mypolls.loops++; - mypolls.loop_counters->incr(curtime/1000000); - +#ifdef DEBUG + if (mypolls.loops==0 && rc > 0) { + mypolls.first_loops_at = curtime; + mypolls.loops++; + } + if (mypolls.loops) + mypolls.loops++; + //mypolls.loop_counters->incr(curtime/1000000); +#endif // DEBUG if (maintenance_loop) { // house keeping unsigned int l = (unsigned int)mysql_thread___mirror_max_concurrency; @@ -2856,9 +2970,9 @@ __run_skip_1a: exit(EXIT_FAILURE); } - if (__sync_add_and_fetch(&__global_MySQL_Thread_Variables_version,0) > __thread_MySQL_Thread_Variables_version) { - refresh_variables(); - } +// if (__sync_add_and_fetch(&__global_MySQL_Thread_Variables_version,0) > __thread_MySQL_Thread_Variables_version) { +// refresh_variables(); +// } #ifdef IDLE_THREADS if (idle_maintenance_thread==false) { @@ -2877,31 +2991,7 @@ __run_skip_1a: if (rc) { int i; for (i=0; iindex(sess_pos); - MySQL_Data_Stream *tmp_myds=mysess->client_myds; - int dsidx=tmp_myds->poll_fds_idx; - //fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx); - mypolls.remove_index_fast(dsidx); - tmp_myds->mypolls=NULL; - mysess->thread=NULL; - // we first delete the association in sessmap - sessmap.erase(mysess->thread_session_id); - if (mysql_sessions->len > 1) { - // take the last element and adjust the map - MySQL_Session *mysess_last=(MySQL_Session *)mysql_sessions->index(mysql_sessions->len-1); - if (mysess->thread_session_id != mysess_last->thread_session_id) - sessmap[mysess_last->thread_session_id]=sess_pos; - } - unregister_session(sess_pos); - resume_mysql_sessions->add(mysess); - epoll_ctl(efd, EPOLL_CTL_DEL, tmp_myds->fd, NULL); - } - } + HandleEventFromEpoll(i); } for (i=0; ilen && maintenance_loop) { -#define SESS_TO_SCAN 128 - if (mysess_idx + SESS_TO_SCAN > mysql_sessions->len) { - mysess_idx=0; - } - unsigned int i; - unsigned long long min_idle = 0; - if (curtime > (unsigned long long)mysql_thread___wait_timeout*1000) { - min_idle = curtime - (unsigned long long)mysql_thread___wait_timeout*1000; - } - for (i=0;ilen; i++) { - uint32_t sess_pos=mysess_idx; - MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(sess_pos); - if (mysess->idle_since < min_idle) { - mysess->killed=true; - MySQL_Data_Stream *tmp_myds=mysess->client_myds; - int dsidx=tmp_myds->poll_fds_idx; - //fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx); - mypolls.remove_index_fast(dsidx); - tmp_myds->mypolls=NULL; - mysess->thread=NULL; - // we first delete the association in sessmap - sessmap.erase(mysess->thread_session_id); - if (mysql_sessions->len > 1) { - // take the last element and adjust the map - MySQL_Session *mysess_last=(MySQL_Session *)mysql_sessions->index(mysql_sessions->len-1); - if (mysess->thread_session_id != mysess_last->thread_session_id) - sessmap[mysess_last->thread_session_id]=sess_pos; - } - unregister_session(sess_pos); - resume_mysql_sessions->add(mysess); - epoll_ctl(efd, EPOLL_CTL_DEL, tmp_myds->fd, NULL); - } - mysess_idx++; - } + ScanIdleClients(); } goto __run_skip_2; } @@ -3040,40 +3097,7 @@ __run_skip_1a: #ifdef IDLE_THREADS __run_skip_2: if (GloVars.global.idle_threads && idle_maintenance_thread) { - unsigned int w=rand()%(GloMTH->num_threads); - MySQL_Thread *thr=GloMTH->mysql_threads[w].worker; - if (resume_mysql_sessions->len) { - pthread_mutex_lock(&thr->myexchange.mutex_resumes); - unsigned int ims; - if (shutdown==0 && thr->shutdown==0) - for (ims=0; imslen; ims++) { - MySQL_Session *mysess=(MySQL_Session *)resume_mysql_sessions->remove_index_fast(0); - thr->myexchange.resume_mysql_sessions->add(mysess); - } - pthread_mutex_unlock(&thr->myexchange.mutex_resumes); - { - unsigned char c=0; - //MySQL_Thread *thr=GloMTH->mysql_threads[w].worker; - int fd=thr->pipefd[1]; - if (write(fd,&c,1)==-1) { - //proxy_error("Error while signaling maintenance thread\n"); - } - } - } else { - VALGRIND_DISABLE_ERROR_REPORTING; - pthread_mutex_lock(&thr->myexchange.mutex_resumes); - VALGRIND_ENABLE_ERROR_REPORTING; - if (shutdown==0 && thr->shutdown==0 && thr->myexchange.resume_mysql_sessions->len) { - unsigned char c=0; - int fd=thr->pipefd[1]; - if (write(fd,&c,1)==-1) { - //proxy_error("Error while signaling maintenance thread\n"); - } - } - VALGRIND_DISABLE_ERROR_REPORTING; - pthread_mutex_unlock(&thr->myexchange.mutex_resumes); - VALGRIND_ENABLE_ERROR_REPORTING; - } + MoveMydsBackToWorkerThreadIfNeeded(); } else { #endif // IDLE_THREADS // iterate through all sessions and process the session logic @@ -3086,6 +3110,43 @@ __run_skip_2: } } +void MySQL_Thread::MoveMydsBackToWorkerThreadIfNeeded() { + unsigned int w=rand()%(GloMTH->num_threads); + MySQL_Thread *thr=GloMTH->mysql_threads[w].worker; + if (resume_mysql_sessions->len) { + pthread_mutex_lock(&thr->myexchange.mutex_resumes); + unsigned int ims; + if (shutdown==0 && thr->shutdown==0) + for (ims=0; imslen; ims++) { + MySQL_Session *mysess=(MySQL_Session *)resume_mysql_sessions->remove_index_fast(0); + thr->myexchange.resume_mysql_sessions->add(mysess); + } + pthread_mutex_unlock(&thr->myexchange.mutex_resumes); + { + unsigned char c=0; + //MySQL_Thread *thr=GloMTH->mysql_threads[w].worker; + int fd=thr->pipefd[1]; + if (write(fd,&c,1)==-1) { + //proxy_error("Error while signaling maintenance thread\n"); + } + } + } else { + VALGRIND_DISABLE_ERROR_REPORTING; + pthread_mutex_lock(&thr->myexchange.mutex_resumes); + VALGRIND_ENABLE_ERROR_REPORTING; + if (shutdown==0 && thr->shutdown==0 && thr->myexchange.resume_mysql_sessions->len) { + unsigned char c=0; + int fd=thr->pipefd[1]; + if (write(fd,&c,1)==-1) { + //proxy_error("Error while signaling maintenance thread\n"); + } + } + VALGRIND_DISABLE_ERROR_REPORTING; + pthread_mutex_unlock(&thr->myexchange.mutex_resumes); + VALGRIND_ENABLE_ERROR_REPORTING; + } +} + bool MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned int n) { if (mypolls.fds[n].revents) { #ifdef IDLE_THREADS @@ -3310,8 +3371,9 @@ void MySQL_Thread::process_all_sessions() { } } unsigned int total_active_transactions_tmp; - total_active_transactions_tmp=__sync_add_and_fetch(&status_variables.active_transactions,0); - __sync_bool_compare_and_swap(&status_variables.active_transactions,total_active_transactions_tmp,total_active_transactions_); + status_variables.active_transactions = total_active_transactions_; + //total_active_transactions_tmp=__sync_add_and_fetch(&status_variables.active_transactions,0); + //__sync_bool_compare_and_swap(&status_variables.active_transactions,total_active_transactions_tmp,total_active_transactions_); } void MySQL_Thread::refresh_variables() { @@ -4534,7 +4596,8 @@ unsigned int MySQL_Threads_Handler::get_active_transations() { if (mysql_threads) { MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker; if (thr) - q+=__sync_fetch_and_add(&thr->status_variables.active_transactions,0); + //q+=__sync_fetch_and_add(&thr->status_variables.active_transactions,0); + q+= thr->status_variables.active_transactions; // using volatile } } return q;