From e2b155a7a4bba299c2f056774ca42c3aedccd809 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Fri, 14 Oct 2016 20:46:10 +0000 Subject: [PATCH] First attempt for an hybrid poll/epoll implementation --- include/MySQL_Thread.h | 3 ++ lib/MySQL_Thread.cpp | 99 ++++++++++++++++++++++++++++++++---------- src/main.cpp | 2 +- 3 files changed, 81 insertions(+), 23 deletions(-) diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index bcf8d723e..1c10ae919 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -165,6 +165,8 @@ class MySQL_Thread int pipefd[2]; int shutdown; + bool epoll_thread; + // status variables are per thread only // in this way, there is no need for atomic operation and there is no cache miss // when it is needed a total, all threads are checked @@ -344,6 +346,7 @@ class MySQL_Threads_Handler public: unsigned int num_threads; proxysql_mysql_thread_t *mysql_threads; + proxysql_mysql_thread_t *mysql_threads_idles; rwlock_t rwlock_idles; rwlock_t rwlock_resumes; PtrArray *idle_mysql_sessions; diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 3aca49e34..2a4862b0d 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -282,6 +282,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { } num_threads=0; mysql_threads=NULL; + mysql_threads_idles=NULL; stacksize=0; shutdown_=0; spinlock_rwlock_init(&rwlock); @@ -386,9 +387,9 @@ int MySQL_Threads_Handler::listener_add(const char *iface) { unsigned int i; if (perthrsocks==NULL) { for (i=0;i= MIN_THREADS_FOR_MAINTENANCE) { - continue; - } + //if (i==0 && num_threads >= MIN_THREADS_FOR_MAINTENANCE) { + // continue; + //} MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker; while(!__sync_bool_compare_and_swap(&thr->mypolls.pending_listener_add,0,rc)) { usleep(10); // pause a bit @@ -396,10 +397,10 @@ int MySQL_Threads_Handler::listener_add(const char *iface) { } } else { for (i=0;i= MIN_THREADS_FOR_MAINTENANCE) { - close(perthrsocks[i]); - continue; - } + //if (i==0 && num_threads >= MIN_THREADS_FOR_MAINTENANCE) { + // close(perthrsocks[i]); + // continue; + //} MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker; while(!__sync_bool_compare_and_swap(&thr->mypolls.pending_listener_add,0,perthrsocks[i])) { usleep(10); // pause a bit @@ -1602,10 +1603,12 @@ void MySQL_Threads_Handler::init(unsigned int num, size_t stack) { int rc=pthread_attr_setstacksize(&attr, stacksize); assert(rc==0); mysql_threads=(proxysql_mysql_thread_t *)malloc(sizeof(proxysql_mysql_thread_t)*num_threads); + mysql_threads_idles=(proxysql_mysql_thread_t *)malloc(sizeof(proxysql_mysql_thread_t)*num_threads); } proxysql_mysql_thread_t * MySQL_Threads_Handler::create_thread(unsigned int tn, void *(*start_routine) (void *)) { pthread_create(&mysql_threads[tn].thread_id, &attr, start_routine , &mysql_threads[tn]); + pthread_create(&mysql_threads_idles[tn].thread_id, &attr, start_routine , &mysql_threads_idles[tn]); return NULL; } @@ -1617,10 +1620,16 @@ void MySQL_Threads_Handler::shutdown_threads() { if (mysql_threads[i].worker) mysql_threads[i].worker->shutdown=1; } + for (i=0; ishutdown=1; + } signal_all_threads(1); for (i=0; inum_threads >= MIN_THREADS_FOR_MAINTENANCE && this == GloMTH->mysql_threads[0].worker) { - idle_maintenance_thread=true; + bool idle_maintenance_thread=epoll_thread; +// if (GloMTH->num_threads >= MIN_THREADS_FOR_MAINTENANCE && this == GloMTH->mysql_threads[0].worker) { +// idle_maintenance_thread=true; + if (idle_maintenance_thread) { // we check if it is the first time we are called if (efd==-1) { efd = epoll_create1(0); @@ -2007,7 +2019,8 @@ __run_skip_1: proxy_debug(PROXY_DEBUG_NET,1,"Poll for DataStream=%p will be called with FD=%d and events=%d\n", mypolls.myds[n], mypolls.fds[n].fd, mypolls.fds[n].events); } - if (GloMTH->num_threads >= MIN_THREADS_FOR_MAINTENANCE && idle_maintenance_thread==false) { + //if (GloMTH->num_threads >= MIN_THREADS_FOR_MAINTENANCE && idle_maintenance_thread==false) { + if (idle_maintenance_thread==false) { if (idle_mysql_sessions->len) { unsigned int ims=0; spin_wrlock(&GloMTH->rwlock_idles); @@ -2023,7 +2036,8 @@ __run_skip_1: spin_wrunlock(&GloMTH->rwlock_idles); if (empty_queue==true) { unsigned char c=1; - MySQL_Thread *thr=GloMTH->mysql_threads[0].worker; + int r=rand()%(GloMTH->num_threads); + MySQL_Thread *thr=GloMTH->mysql_threads_idles[r].worker; int fd=thr->pipefd[1]; if (write(fd,&c,1)==-1) { proxy_error("Error while signaling maintenance thread\n"); @@ -2123,7 +2137,7 @@ __run_skip_1a: refresh_variables(); } - if (GloMTH->num_threads >= MIN_THREADS_FOR_MAINTENANCE && idle_maintenance_thread==false) { + if (idle_maintenance_thread==false) { for (n=0; nlen; n++) { MySQL_Session *_sess=(MySQL_Session *)mysql_sessions->index(n); _sess->to_process=0; @@ -2370,8 +2384,8 @@ __run_skip_2: } spin_wrunlock(&GloMTH->rwlock_resumes); { - unsigned int w=rand()%(GloMTH->num_threads-1); - w++; + unsigned int w=rand()%(GloMTH->num_threads); + //w++; unsigned char c=0; MySQL_Thread *thr=GloMTH->mysql_threads[w].worker; int fd=thr->pipefd[1]; @@ -2382,8 +2396,8 @@ __run_skip_2: } else { spin_wrlock(&GloMTH->rwlock_resumes); if (GloMTH->resume_mysql_sessions->len) { - unsigned int w=rand()%(GloMTH->num_threads-1); - w++; + unsigned int w=rand()%(GloMTH->num_threads); + //w++; unsigned char c=0; MySQL_Thread *thr=GloMTH->mysql_threads[w].worker; int fd=thr->pipefd[1]; @@ -2405,7 +2419,8 @@ __run_skip_2: bool MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned int n) { if (mypolls.fds[n].revents) { if (myds->myds_type==MYDS_FRONTEND) { - if (GloMTH->num_threads >= MIN_THREADS_FOR_MAINTENANCE && this == GloMTH->mysql_threads[0].worker) { + if (epoll_thread) { + //if (GloMTH->num_threads >= MIN_THREADS_FOR_MAINTENANCE && this == GloMTH->mysql_threads[0].worker) { mypolls.remove_index_fast(n); myds->mypolls=NULL; unsigned int i; @@ -2482,10 +2497,10 @@ bool MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned void MySQL_Thread::process_all_sessions() { unsigned int n; unsigned int total_active_transactions_=0; - bool idle_maintenance_thread=false; - if (GloMTH->num_threads >= MIN_THREADS_FOR_MAINTENANCE && this == GloMTH->mysql_threads[0].worker) { - idle_maintenance_thread=true; - } + bool idle_maintenance_thread=epoll_thread; + //if (GloMTH->num_threads >= MIN_THREADS_FOR_MAINTENANCE && this == GloMTH->mysql_threads[0].worker) { + // idle_maintenance_thread=true; + //} int rc; bool sess_sort=mysql_thread___sessions_sort; if (idle_maintenance_thread) { @@ -2681,6 +2696,7 @@ void MySQL_Thread::refresh_variables() { MySQL_Thread::MySQL_Thread() { efd=-1; + epoll_thread=false; spinlock_rwlock_init(&thread_mutex); mysess_idx=0; // mypolls.len=0; @@ -3203,6 +3219,13 @@ void MySQL_Threads_Handler::signal_all_threads(unsigned char _c) { proxy_error("Error during write in signal_all_threads()\n"); } } + for (i=0;ipipefd[1]; + if (write(fd,&c,1)==-1) { + proxy_error("Error during write in signal_all_threads()\n"); + } + } } bool MySQL_Threads_Handler::kill_session(uint32_t _thread_session_id) { @@ -3213,6 +3236,10 @@ bool MySQL_Threads_Handler::kill_session(uint32_t _thread_session_id) { MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker; spin_wrlock(&thr->thread_mutex); } + for (i=0;ithread_mutex); + } for (i=0;imysql_sessions->len; j++) { + MySQL_Session *sess=(MySQL_Session *)thr->mysql_sessions->pdata[j]; + if (sess->thread_session_id==_thread_session_id) { + sess->killed=true; + ret=true; + goto __exit_kill_session; + } + } + } __exit_kill_session: for (i=0;ithread_mutex); } + for (i=0;ithread_mutex); + } return ret; } @@ -3386,6 +3429,13 @@ unsigned long long MySQL_Threads_Handler::get_mysql_frontend_buffers_bytes() { q+=__sync_fetch_and_add(&thr->status_variables.mysql_frontend_buffers_bytes,0); } } + for (i=0;istatus_variables.mysql_frontend_buffers_bytes,0); + } + } return q; } @@ -3398,6 +3448,11 @@ unsigned long long MySQL_Threads_Handler::get_mysql_session_internal_bytes() { if (thr) q+=__sync_fetch_and_add(&thr->status_variables.mysql_session_internal_bytes,0); } + if (mysql_threads_idles) { + MySQL_Thread *thr=(MySQL_Thread *)mysql_threads_idles[i].worker; + if (thr) + q+=__sync_fetch_and_add(&thr->status_variables.mysql_session_internal_bytes,0); + } } return q; } diff --git a/src/main.cpp b/src/main.cpp index ddac77405..38a2939bf 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -270,7 +270,7 @@ void ProxySQL_Main_init_Query_module() { void ProxySQL_Main_init_MySQL_Threads_Handler_module() { unsigned int i; GloMTH->init(); - load_ = GloMTH->num_threads + 1; + load_ = GloMTH->num_threads * 2 + 1; for (i=0; inum_threads; i++) { GloMTH->create_thread(i,mysql_worker_thread_func); }