First attempt for an hybrid poll/epoll implementation

pull/738/head
René Cannaò 10 years ago
parent 854120c435
commit e2b155a7a4

@ -165,6 +165,8 @@ class MySQL_Thread
int pipefd[2];
int shutdown;
bool epoll_thread;
// status variables are per thread only
// in this way, there is no need for atomic operation and there is no cache miss
// when it is needed a total, all threads are checked
@ -344,6 +346,7 @@ class MySQL_Threads_Handler
public:
unsigned int num_threads;
proxysql_mysql_thread_t *mysql_threads;
proxysql_mysql_thread_t *mysql_threads_idles;
rwlock_t rwlock_idles;
rwlock_t rwlock_resumes;
PtrArray *idle_mysql_sessions;

@ -282,6 +282,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() {
}
num_threads=0;
mysql_threads=NULL;
mysql_threads_idles=NULL;
stacksize=0;
shutdown_=0;
spinlock_rwlock_init(&rwlock);
@ -386,9 +387,9 @@ int MySQL_Threads_Handler::listener_add(const char *iface) {
unsigned int i;
if (perthrsocks==NULL) {
for (i=0;i<num_threads;i++) {
if (i==0 && num_threads >= MIN_THREADS_FOR_MAINTENANCE) {
continue;
}
//if (i==0 && num_threads >= MIN_THREADS_FOR_MAINTENANCE) {
// continue;
//}
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker;
while(!__sync_bool_compare_and_swap(&thr->mypolls.pending_listener_add,0,rc)) {
usleep(10); // pause a bit
@ -396,10 +397,10 @@ int MySQL_Threads_Handler::listener_add(const char *iface) {
}
} else {
for (i=0;i<num_threads;i++) {
if (i==0 && num_threads >= MIN_THREADS_FOR_MAINTENANCE) {
close(perthrsocks[i]);
continue;
}
//if (i==0 && num_threads >= MIN_THREADS_FOR_MAINTENANCE) {
// close(perthrsocks[i]);
// continue;
//}
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker;
while(!__sync_bool_compare_and_swap(&thr->mypolls.pending_listener_add,0,perthrsocks[i])) {
usleep(10); // pause a bit
@ -1602,10 +1603,12 @@ void MySQL_Threads_Handler::init(unsigned int num, size_t stack) {
int rc=pthread_attr_setstacksize(&attr, stacksize);
assert(rc==0);
mysql_threads=(proxysql_mysql_thread_t *)malloc(sizeof(proxysql_mysql_thread_t)*num_threads);
mysql_threads_idles=(proxysql_mysql_thread_t *)malloc(sizeof(proxysql_mysql_thread_t)*num_threads);
}
proxysql_mysql_thread_t * MySQL_Threads_Handler::create_thread(unsigned int tn, void *(*start_routine) (void *)) {
pthread_create(&mysql_threads[tn].thread_id, &attr, start_routine , &mysql_threads[tn]);
pthread_create(&mysql_threads_idles[tn].thread_id, &attr, start_routine , &mysql_threads_idles[tn]);
return NULL;
}
@ -1617,10 +1620,16 @@ void MySQL_Threads_Handler::shutdown_threads() {
if (mysql_threads[i].worker)
mysql_threads[i].worker->shutdown=1;
}
for (i=0; i<num_threads; i++) {
if (mysql_threads_idles[i].worker)
mysql_threads_idles[i].worker->shutdown=1;
}
signal_all_threads(1);
for (i=0; i<num_threads; i++) {
if (mysql_threads[i].worker)
pthread_join(mysql_threads[i].thread_id,NULL);
if (mysql_threads_idles[i].worker)
pthread_join(mysql_threads_idles[i].thread_id,NULL);
}
}
}
@ -1662,7 +1671,9 @@ MySQL_Threads_Handler::~MySQL_Threads_Handler() {
if (variables.ssl_p2s_key) free(variables.ssl_p2s_key);
if (variables.ssl_p2s_cipher) free(variables.ssl_p2s_cipher);
free(mysql_threads);
free(mysql_threads_idles);
mysql_threads=NULL;
mysql_threads_idles=NULL;
delete MLM;
MLM=NULL;
}
@ -1833,9 +1844,10 @@ void MySQL_Thread::run() {
int rc;
//int arg_on=1;
bool idle_maintenance_thread=false;
if (GloMTH->num_threads >= MIN_THREADS_FOR_MAINTENANCE && this == GloMTH->mysql_threads[0].worker) {
idle_maintenance_thread=true;
bool idle_maintenance_thread=epoll_thread;
// if (GloMTH->num_threads >= MIN_THREADS_FOR_MAINTENANCE && this == GloMTH->mysql_threads[0].worker) {
// idle_maintenance_thread=true;
if (idle_maintenance_thread) {
// we check if it is the first time we are called
if (efd==-1) {
efd = epoll_create1(0);
@ -2007,7 +2019,8 @@ __run_skip_1:
proxy_debug(PROXY_DEBUG_NET,1,"Poll for DataStream=%p will be called with FD=%d and events=%d\n", mypolls.myds[n], mypolls.fds[n].fd, mypolls.fds[n].events);
}
if (GloMTH->num_threads >= MIN_THREADS_FOR_MAINTENANCE && idle_maintenance_thread==false) {
//if (GloMTH->num_threads >= MIN_THREADS_FOR_MAINTENANCE && idle_maintenance_thread==false) {
if (idle_maintenance_thread==false) {
if (idle_mysql_sessions->len) {
unsigned int ims=0;
spin_wrlock(&GloMTH->rwlock_idles);
@ -2023,7 +2036,8 @@ __run_skip_1:
spin_wrunlock(&GloMTH->rwlock_idles);
if (empty_queue==true) {
unsigned char c=1;
MySQL_Thread *thr=GloMTH->mysql_threads[0].worker;
int r=rand()%(GloMTH->num_threads);
MySQL_Thread *thr=GloMTH->mysql_threads_idles[r].worker;
int fd=thr->pipefd[1];
if (write(fd,&c,1)==-1) {
proxy_error("Error while signaling maintenance thread\n");
@ -2123,7 +2137,7 @@ __run_skip_1a:
refresh_variables();
}
if (GloMTH->num_threads >= MIN_THREADS_FOR_MAINTENANCE && idle_maintenance_thread==false) {
if (idle_maintenance_thread==false) {
for (n=0; n<mysql_sessions->len; n++) {
MySQL_Session *_sess=(MySQL_Session *)mysql_sessions->index(n);
_sess->to_process=0;
@ -2370,8 +2384,8 @@ __run_skip_2:
}
spin_wrunlock(&GloMTH->rwlock_resumes);
{
unsigned int w=rand()%(GloMTH->num_threads-1);
w++;
unsigned int w=rand()%(GloMTH->num_threads);
//w++;
unsigned char c=0;
MySQL_Thread *thr=GloMTH->mysql_threads[w].worker;
int fd=thr->pipefd[1];
@ -2382,8 +2396,8 @@ __run_skip_2:
} else {
spin_wrlock(&GloMTH->rwlock_resumes);
if (GloMTH->resume_mysql_sessions->len) {
unsigned int w=rand()%(GloMTH->num_threads-1);
w++;
unsigned int w=rand()%(GloMTH->num_threads);
//w++;
unsigned char c=0;
MySQL_Thread *thr=GloMTH->mysql_threads[w].worker;
int fd=thr->pipefd[1];
@ -2405,7 +2419,8 @@ __run_skip_2:
bool MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned int n) {
if (mypolls.fds[n].revents) {
if (myds->myds_type==MYDS_FRONTEND) {
if (GloMTH->num_threads >= MIN_THREADS_FOR_MAINTENANCE && this == GloMTH->mysql_threads[0].worker) {
if (epoll_thread) {
//if (GloMTH->num_threads >= MIN_THREADS_FOR_MAINTENANCE && this == GloMTH->mysql_threads[0].worker) {
mypolls.remove_index_fast(n);
myds->mypolls=NULL;
unsigned int i;
@ -2482,10 +2497,10 @@ bool MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned
void MySQL_Thread::process_all_sessions() {
unsigned int n;
unsigned int total_active_transactions_=0;
bool idle_maintenance_thread=false;
if (GloMTH->num_threads >= MIN_THREADS_FOR_MAINTENANCE && this == GloMTH->mysql_threads[0].worker) {
idle_maintenance_thread=true;
}
bool idle_maintenance_thread=epoll_thread;
//if (GloMTH->num_threads >= MIN_THREADS_FOR_MAINTENANCE && this == GloMTH->mysql_threads[0].worker) {
// idle_maintenance_thread=true;
//}
int rc;
bool sess_sort=mysql_thread___sessions_sort;
if (idle_maintenance_thread) {
@ -2681,6 +2696,7 @@ void MySQL_Thread::refresh_variables() {
MySQL_Thread::MySQL_Thread() {
efd=-1;
epoll_thread=false;
spinlock_rwlock_init(&thread_mutex);
mysess_idx=0;
// mypolls.len=0;
@ -3203,6 +3219,13 @@ void MySQL_Threads_Handler::signal_all_threads(unsigned char _c) {
proxy_error("Error during write in signal_all_threads()\n");
}
}
for (i=0;i<num_threads;i++) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads_idles[i].worker;
int fd=thr->pipefd[1];
if (write(fd,&c,1)==-1) {
proxy_error("Error during write in signal_all_threads()\n");
}
}
}
bool MySQL_Threads_Handler::kill_session(uint32_t _thread_session_id) {
@ -3213,6 +3236,10 @@ bool MySQL_Threads_Handler::kill_session(uint32_t _thread_session_id) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker;
spin_wrlock(&thr->thread_mutex);
}
for (i=0;i<num_threads;i++) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads_idles[i].worker;
spin_wrlock(&thr->thread_mutex);
}
for (i=0;i<num_threads;i++) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker;
unsigned int j;
@ -3225,11 +3252,27 @@ bool MySQL_Threads_Handler::kill_session(uint32_t _thread_session_id) {
}
}
}
for (i=0;i<num_threads;i++) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads_idles[i].worker;
unsigned int j;
for (j=0; j<thr->mysql_sessions->len; j++) {
MySQL_Session *sess=(MySQL_Session *)thr->mysql_sessions->pdata[j];
if (sess->thread_session_id==_thread_session_id) {
sess->killed=true;
ret=true;
goto __exit_kill_session;
}
}
}
__exit_kill_session:
for (i=0;i<num_threads;i++) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker;
spin_wrunlock(&thr->thread_mutex);
}
for (i=0;i<num_threads;i++) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads_idles[i].worker;
spin_wrunlock(&thr->thread_mutex);
}
return ret;
}
@ -3386,6 +3429,13 @@ unsigned long long MySQL_Threads_Handler::get_mysql_frontend_buffers_bytes() {
q+=__sync_fetch_and_add(&thr->status_variables.mysql_frontend_buffers_bytes,0);
}
}
for (i=0;i<num_threads;i++) {
if (mysql_threads_idles) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads_idles[i].worker;
if (thr)
q+=__sync_fetch_and_add(&thr->status_variables.mysql_frontend_buffers_bytes,0);
}
}
return q;
}
@ -3398,6 +3448,11 @@ unsigned long long MySQL_Threads_Handler::get_mysql_session_internal_bytes() {
if (thr)
q+=__sync_fetch_and_add(&thr->status_variables.mysql_session_internal_bytes,0);
}
if (mysql_threads_idles) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads_idles[i].worker;
if (thr)
q+=__sync_fetch_and_add(&thr->status_variables.mysql_session_internal_bytes,0);
}
}
return q;
}

@ -270,7 +270,7 @@ void ProxySQL_Main_init_Query_module() {
void ProxySQL_Main_init_MySQL_Threads_Handler_module() {
unsigned int i;
GloMTH->init();
load_ = GloMTH->num_threads + 1;
load_ = GloMTH->num_threads * 2 + 1;
for (i=0; i<GloMTH->num_threads; i++) {
GloMTH->create_thread(i,mysql_worker_thread_func);
}

Loading…
Cancel
Save