Splitting Thread::Run ito multiple functions

This allowed also to identify few bottleneck and rewrite some code
v2.0-lab
René Cannaò 8 years ago
parent bdca8ee6bb
commit e95d1cd0de

@ -61,8 +61,10 @@ class ProxySQL_Poll {
public:
unsigned int poll_timeout;
#ifdef DEBUG
unsigned long loops;
StatCounters *loop_counters;
#endif // DEBUG
unsigned int len;
unsigned int size;
struct pollfd *fds;
@ -73,13 +75,15 @@ class ProxySQL_Poll {
volatile int pending_listener_del;
ProxySQL_Poll() {
#ifdef DEBUG
#ifdef PROXYSQL_STATSCOUNTERS_NOLOCK
loop_counters=new StatCounters(15,10);
#else
loop_counters=new StatCounters(15,10,false);
#endif
poll_timeout=0;
loops=0;
#endif // DEBUG
poll_timeout=0;
len=0;
pending_listener_add=0;
pending_listener_del=0;
@ -103,7 +107,16 @@ class ProxySQL_Poll {
free(fds);
free(last_recv);
free(last_sent);
delete loop_counters;
#ifdef DEBUG
delete loop_counters;
if (loops) {
unsigned long long last_loops_at=monotonic_time();
unsigned long long elapsed_time = last_loops_at-first_loops_at;
elapsed_time /= 1000;
float lpms = loops/elapsed_time;
proxy_info("Loops/ms = %f\n", lpms);
}
#endif // DEBUG
};
void add(uint32_t _events, int _fd, MySQL_Data_Stream *_myds, unsigned long long sent_time) {
@ -187,7 +200,8 @@ class MySQL_Thread
unsigned long long curtime;
unsigned long long pre_poll_time;
unsigned long long last_maintenance_time;
std::atomic<unsigned long long> atomic_curtime;
//std::atomic<unsigned long long> atomic_curtime;
volatile unsigned long long atomic_curtime; // changed from atomic to volatile
PtrArray *mysql_sessions;
PtrArray *mirror_queue_mysql_sessions;
PtrArray *mirror_queue_mysql_sessions_cache;
@ -231,7 +245,8 @@ class MySQL_Thread
unsigned long long ConnPool_get_conn_failure;
unsigned long long gtid_binlog_collected;
unsigned long long gtid_session_collected;
unsigned int active_transactions;
volatile unsigned int active_transactions;
//std::atomic<unsigned int> active_transactions;
} status_variables;
struct {
@ -264,6 +279,14 @@ class MySQL_Thread
MySQL_Connection * get_MyConn_local(unsigned int, MySQL_Session *sess, char *gtid_uuid, uint64_t gtid_trxid);
void push_MyConn_local(MySQL_Connection *);
void return_local_connections();
void ping_idle_connections();
void process_mirror_sessions();
void MoveMydsToMaintenanceThreadIfNeeded(MySQL_Data_Stream *myds, unsigned int n);
void HandleEventFromEpoll(int i);
void ScanIdleClients();
void ProcessEventOnDataStream(MySQL_Data_Stream *myds, unsigned int n);
void MoveMydsBackToWorkerThreadIfNeeded();
void ComputeMinTimeToWait(MySQL_Data_Stream *myds);
};

@ -2473,44 +2473,7 @@ void MySQL_Thread::unregister_session(int idx) {
mysql_sessions->remove_index_fast(idx);
}
// main loop
void MySQL_Thread::run() {
unsigned int n;
int rc;
#ifdef IDLE_THREADS
bool idle_maintenance_thread=epoll_thread;
if (idle_maintenance_thread) {
// we check if it is the first time we are called
if (efd==-1) {
efd = EPOLL_CREATE;
int fd=pipefd[0];
struct epoll_event event;
memset(&event,0,sizeof(event)); // let's make valgrind happy
event.events = EPOLLIN;
event.data.u32=0; // special value to point to the pipe
epoll_ctl(efd, EPOLL_CTL_ADD, fd, &event);
}
}
#endif // IDLE_THREADS
curtime=monotonic_time();
atomic_curtime=curtime;
#ifdef PROXYSQL_MYSQL_PTHREAD_MUTEX
pthread_mutex_lock(&thread_mutex);
#else
spin_wrlock(&thread_mutex);
#endif
while (shutdown==0) {
#ifdef IDLE_THREADS
if (idle_maintenance_thread) {
goto __run_skip_1;
}
#endif // IDLE_THREADS
void MySQL_Thread::ping_idle_connections() {
int num_idles;
if (processing_idles==true && (last_processing_idles < curtime-mysql_thread___ping_timeout_server*1000)) {
processing_idles=false;
@ -2546,66 +2509,285 @@ void MySQL_Thread::run() {
processing_idles=true;
last_processing_idles=curtime;
}
}
#ifdef IDLE_THREADS
__run_skip_1:
if (idle_maintenance_thread) {
pthread_mutex_lock(&myexchange.mutex_idles);
while (myexchange.idle_mysql_sessions->len) {
MySQL_Session *mysess=(MySQL_Session *)myexchange.idle_mysql_sessions->remove_index_fast(0);
register_session(mysess, false);
MySQL_Data_Stream *myds=mysess->client_myds;
mypolls.add(POLLIN, myds->fd, myds, monotonic_time());
// add in epoll()
struct epoll_event event;
memset(&event,0,sizeof(event)); // let's make valgrind happy
event.data.u32=mysess->thread_session_id;
event.events = EPOLLIN;
epoll_ctl (efd, EPOLL_CTL_ADD, myds->fd, &event);
// we map thread_id -> position in mysql_session (end of the list)
sessmap[mysess->thread_session_id]=mysql_sessions->len-1;
//fprintf(stderr,"Adding session %p idx, DS %p idx %d\n",mysess,myds,myds->poll_fds_idx);
void MySQL_Thread::process_mirror_sessions() {
while (mirror_queue_mysql_sessions->len) {
if (__sync_add_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1) > (unsigned int)mysql_thread___mirror_max_concurrency ) {
__sync_sub_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1);
//goto __mysql_thread_exit_add_mirror; // we can't add more mirror sessions at runtime
return; // we can't add more mirror sessions at runtime
} else {
int idx;
idx=fastrand()%(mirror_queue_mysql_sessions->len);
MySQL_Session *newsess=(MySQL_Session *)mirror_queue_mysql_sessions->remove_index_fast(idx);
register_session(newsess);
newsess->handler(); // execute immediately
if (newsess->status==WAITING_CLIENT_DATA) { // the mirror session has completed
unregister_session(mysql_sessions->len-1);
unsigned int l = (unsigned int)mysql_thread___mirror_max_concurrency;
if (mirror_queue_mysql_sessions->len*0.3 > l) l=mirror_queue_mysql_sessions->len*0.3;
if (mirror_queue_mysql_sessions_cache->len <= l) {
bool to_cache=true;
if (newsess->mybe) {
if (newsess->mybe->server_myds) {
to_cache=false;
}
}
if (to_cache) {
__sync_sub_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1);
mirror_queue_mysql_sessions_cache->add(newsess);
} else {
delete newsess;
}
} else {
delete newsess;
}
}
pthread_mutex_unlock(&myexchange.mutex_idles);
goto __run_skip_1a;
//newsess->to_process=0;
}
#endif // IDLE_THREADS
while (mirror_queue_mysql_sessions->len) {
if (__sync_add_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1) > (unsigned int)mysql_thread___mirror_max_concurrency ) {
__sync_sub_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1);
goto __mysql_thread_exit_add_mirror; // we can't add more mirror sessions at runtime
} else {
int idx;
idx=fastrand()%(mirror_queue_mysql_sessions->len);
MySQL_Session *newsess=(MySQL_Session *)mirror_queue_mysql_sessions->remove_index_fast(idx);
register_session(newsess);
newsess->handler(); // execute immediately
if (newsess->status==WAITING_CLIENT_DATA) { // the mirror session has completed
unregister_session(mysql_sessions->len-1);
unsigned int l = (unsigned int)mysql_thread___mirror_max_concurrency;
if (mirror_queue_mysql_sessions->len*0.3 > l) l=mirror_queue_mysql_sessions->len*0.3;
if (mirror_queue_mysql_sessions_cache->len <= l) {
bool to_cache=true;
if (newsess->mybe) {
if (newsess->mybe->server_myds) {
to_cache=false;
}
}
}
void MySQL_Thread::MoveMydsToMaintenanceThreadIfNeeded(MySQL_Data_Stream *myds, unsigned int n) {
// here we try to move it to the maintenance thread
if (myds->myds_type==MYDS_FRONTEND && myds->sess) {
if (myds->DSS==STATE_SLEEP && myds->sess->status==WAITING_CLIENT_DATA) {
unsigned long long _tmp_idle = mypolls.last_recv[n] > mypolls.last_sent[n] ? mypolls.last_recv[n] : mypolls.last_sent[n] ;
if (_tmp_idle < ( (curtime > (unsigned int)mysql_thread___session_idle_ms * 1000) ? (curtime - mysql_thread___session_idle_ms * 1000) : 0)) {
if (myds->sess->client_myds == myds && myds->PSarrayOUT->len==0 && (myds->queueOUT.head - myds->queueOUT.tail)==0 ) { // extra check
unsigned int j;
int conns=0;
for (j=0;j<myds->sess->mybes->len;j++) {
MySQL_Backend *tmp_mybe=(MySQL_Backend *)myds->sess->mybes->index(j);
MySQL_Data_Stream *__myds=tmp_mybe->server_myds;
if (__myds->myconn) {
conns++;
}
if (to_cache) {
__sync_sub_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1);
mirror_queue_mysql_sessions_cache->add(newsess);
} else {
delete newsess;
}
unsigned long long idle_since = curtime - myds->sess->IdleTime();
bool exit_cond=false;
if (conns==0) {
mypolls.remove_index_fast(n);
myds->mypolls=NULL;
unsigned int i;
for (i=0;i<mysql_sessions->len;i++) {
MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(i);
if (mysess==myds->sess) {
mysess->thread=NULL;
unregister_session(i);
exit_cond=true;
mysess->idle_since = idle_since;
idle_mysql_sessions->add(mysess);
break;
}
}
} else {
delete newsess;
}
if (exit_cond) {
//continue;
return;
}
}
//newsess->to_process=0;
}
}
__mysql_thread_exit_add_mirror:
}
}
void MySQL_Thread::HandleEventFromEpoll(int i) {
if (events[i].data.u32) {
// NOTE: not sure why, sometime events returns odd values. If set, we take it out as normal worker threads know how to handle it
if (events[i].events) {
uint32_t sess_thr_id=events[i].data.u32;
uint32_t sess_pos=sessmap[sess_thr_id];
MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(sess_pos);
MySQL_Data_Stream *tmp_myds=mysess->client_myds;
int dsidx=tmp_myds->poll_fds_idx;
//fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx);
mypolls.remove_index_fast(dsidx);
tmp_myds->mypolls=NULL;
mysess->thread=NULL;
// we first delete the association in sessmap
sessmap.erase(mysess->thread_session_id);
if (mysql_sessions->len > 1) {
// take the last element and adjust the map
MySQL_Session *mysess_last=(MySQL_Session *)mysql_sessions->index(mysql_sessions->len-1);
if (mysess->thread_session_id != mysess_last->thread_session_id)
sessmap[mysess_last->thread_session_id]=sess_pos;
}
unregister_session(sess_pos);
resume_mysql_sessions->add(mysess);
epoll_ctl(efd, EPOLL_CTL_DEL, tmp_myds->fd, NULL);
}
}
}
void MySQL_Thread::ScanIdleClients() {
#define SESS_TO_SCAN 128
if (mysess_idx + SESS_TO_SCAN > mysql_sessions->len) {
mysess_idx=0;
}
unsigned int i;
unsigned long long min_idle = 0;
if (curtime > (unsigned long long)mysql_thread___wait_timeout*1000) {
min_idle = curtime - (unsigned long long)mysql_thread___wait_timeout*1000;
}
for (i=0;i<SESS_TO_SCAN && mysess_idx < mysql_sessions->len; i++) {
uint32_t sess_pos=mysess_idx;
MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(sess_pos);
if (mysess->idle_since < min_idle) {
mysess->killed=true;
MySQL_Data_Stream *tmp_myds=mysess->client_myds;
int dsidx=tmp_myds->poll_fds_idx;
//fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx);
mypolls.remove_index_fast(dsidx);
tmp_myds->mypolls=NULL;
mysess->thread=NULL;
// we first delete the association in sessmap
sessmap.erase(mysess->thread_session_id);
if (mysql_sessions->len > 1) {
// take the last element and adjust the map
MySQL_Session *mysess_last=(MySQL_Session *)mysql_sessions->index(mysql_sessions->len-1);
if (mysess->thread_session_id != mysess_last->thread_session_id)
sessmap[mysess_last->thread_session_id]=sess_pos;
}
unregister_session(sess_pos);
resume_mysql_sessions->add(mysess);
epoll_ctl(efd, EPOLL_CTL_DEL, tmp_myds->fd, NULL);
}
mysess_idx++;
}
}
void MySQL_Thread::ProcessEventOnDataStream(MySQL_Data_Stream *myds, unsigned int n) {
if (myds && myds->myds_type==MYDS_FRONTEND && myds->DSS==STATE_SLEEP && myds->sess && myds->sess->status==WAITING_CLIENT_DATA) {
mypolls.myds[n]->set_pollout();
} else {
if (mypolls.myds[n]->DSS > STATE_MARIADB_BEGIN && mypolls.myds[n]->DSS < STATE_MARIADB_END) {
mypolls.fds[n].events = POLLIN;
if (mypolls.myds[n]->myconn->async_exit_status & MYSQL_WAIT_WRITE)
mypolls.fds[n].events |= POLLOUT;
} else {
mypolls.myds[n]->set_pollout();
}
}
if (myds && myds->sess->pause_until > curtime) {
if (myds->myds_type==MYDS_FRONTEND) {
mypolls.myds[n]->remove_pollout();
}
if (myds->myds_type==MYDS_BACKEND) {
if (mysql_thread___throttle_ratio_server_to_client) {
mypolls.fds[n].events = 0;
}
}
}
if (myds && myds->myds_type==MYDS_BACKEND) {
if (myds->sess && myds->sess->client_myds && myds->sess->mirror==false) {
unsigned int buffered_data=0;
buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN;
buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN;
// we pause receiving from backend at mysql_thread___threshold_resultset_size * 8
// but assuming that client isn't completely blocked, we will stop checking for data
// only at mysql_thread___threshold_resultset_size * 4
if (buffered_data > (unsigned int)mysql_thread___threshold_resultset_size*4) {
mypolls.fds[n].events = 0;
}
}
}
}
void MySQL_Thread::ComputeMinTimeToWait(MySQL_Data_Stream *myds) {
if (myds->wait_until) {
if (myds->wait_until > curtime) {
//if (mypolls.poll_timeout==0 || (myds->wait_until - curtime < mypolls.poll_timeout) ) {
if (myds->wait_until - curtime < mypolls.poll_timeout) {
//if (myds->wait_until < mypolls.abs_poll_timeout) {
// mypolls.abs_poll_timeout = myds->wait_until;
mypolls.poll_timeout= myds->wait_until - curtime;
}
}
}
if (myds->sess) {
if (myds->sess->pause_until > 0) {
//if (mypolls.poll_timeout==0 || (myds->sess->pause_until - curtime < mypolls.poll_timeout) ) {
if (myds->sess->pause_until - curtime < mypolls.poll_timeout) {
mypolls.poll_timeout= myds->sess->pause_until - curtime;
//if (myds->sess->pause_until < mypolls.abs_poll_timeout) {
// mypolls.abs_poll_timeout = myds->sess->pause_until;
}
}
}
}
// main loop
void MySQL_Thread::run() {
unsigned int n;
int rc;
#ifdef IDLE_THREADS
bool idle_maintenance_thread=epoll_thread;
if (idle_maintenance_thread) {
// we check if it is the first time we are called
if (efd==-1) {
efd = EPOLL_CREATE;
int fd=pipefd[0];
struct epoll_event event;
memset(&event,0,sizeof(event)); // let's make valgrind happy
event.events = EPOLLIN;
event.data.u32=0; // special value to point to the pipe
epoll_ctl(efd, EPOLL_CTL_ADD, fd, &event);
}
}
#endif // IDLE_THREADS
curtime=monotonic_time();
atomic_curtime=curtime;
#ifdef PROXYSQL_MYSQL_PTHREAD_MUTEX
pthread_mutex_lock(&thread_mutex);
#else
spin_wrlock(&thread_mutex);
#endif
while (shutdown==0) {
#ifdef IDLE_THREADS
if (idle_maintenance_thread) {
goto __run_skip_1;
}
#endif // IDLE_THREADS
ping_idle_connections();
#ifdef IDLE_THREADS
__run_skip_1:
if (idle_maintenance_thread) {
pthread_mutex_lock(&myexchange.mutex_idles);
while (myexchange.idle_mysql_sessions->len) {
MySQL_Session *mysess=(MySQL_Session *)myexchange.idle_mysql_sessions->remove_index_fast(0);
register_session(mysess, false);
MySQL_Data_Stream *myds=mysess->client_myds;
mypolls.add(POLLIN, myds->fd, myds, monotonic_time());
// add in epoll()
struct epoll_event event;
memset(&event,0,sizeof(event)); // let's make valgrind happy
event.data.u32=mysess->thread_session_id;
event.events = EPOLLIN;
epoll_ctl (efd, EPOLL_CTL_ADD, myds->fd, &event);
// we map thread_id -> position in mysql_session (end of the list)
sessmap[mysess->thread_session_id]=mysql_sessions->len-1;
//fprintf(stderr,"Adding session %p idx, DS %p idx %d\n",mysess,myds,myds->poll_fds_idx);
}
pthread_mutex_unlock(&myexchange.mutex_idles);
goto __run_skip_1a;
}
#endif // IDLE_THREADS
process_mirror_sessions();
//__mysql_thread_exit_add_mirror:
mypolls.poll_timeout = mysql_thread___poll_timeout/1000; // predefined default value
//mypolls.abs_poll_timeout = curtime + mypolls.poll_timeout;
for (n = 0; n < mypolls.len; n++) {
MySQL_Data_Stream *myds=NULL;
myds=mypolls.myds[n];
@ -2613,104 +2795,28 @@ __mysql_thread_exit_add_mirror:
if (myds) {
#ifdef IDLE_THREADS
if (GloVars.global.idle_threads) {
// here we try to move it to the maintenance thread
if (myds->myds_type==MYDS_FRONTEND && myds->sess) {
if (myds->DSS==STATE_SLEEP && myds->sess->status==WAITING_CLIENT_DATA) {
unsigned long long _tmp_idle = mypolls.last_recv[n] > mypolls.last_sent[n] ? mypolls.last_recv[n] : mypolls.last_sent[n] ;
if (_tmp_idle < ( (curtime > (unsigned int)mysql_thread___session_idle_ms * 1000) ? (curtime - mysql_thread___session_idle_ms * 1000) : 0)) {
if (myds->sess->client_myds == myds && myds->PSarrayOUT->len==0 && (myds->queueOUT.head - myds->queueOUT.tail)==0 ) { // extra check
unsigned int j;
int conns=0;
for (j=0;j<myds->sess->mybes->len;j++) {
MySQL_Backend *tmp_mybe=(MySQL_Backend *)myds->sess->mybes->index(j);
MySQL_Data_Stream *__myds=tmp_mybe->server_myds;
if (__myds->myconn) {
conns++;
}
}
unsigned long long idle_since = curtime - myds->sess->IdleTime();
bool exit_cond=false;
if (conns==0) {
mypolls.remove_index_fast(n);
myds->mypolls=NULL;
unsigned int i;
for (i=0;i<mysql_sessions->len;i++) {
MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(i);
if (mysess==myds->sess) {
mysess->thread=NULL;
unregister_session(i);
exit_cond=true;
mysess->idle_since = idle_since;
idle_mysql_sessions->add(mysess);
break;
}
}
}
if (exit_cond) {
continue;
}
}
}
}
}
MoveMydsToMaintenanceThreadIfNeeded(myds,n);
}
#endif // IDLE_THREADS
if (myds->wait_until) {
if (myds->wait_until > curtime) {
if (mypolls.poll_timeout==0 || (myds->wait_until - curtime < mypolls.poll_timeout) ) {
mypolls.poll_timeout= myds->wait_until - curtime;
}
}
}
if (myds->sess) {
if (myds->sess->pause_until > 0) {
if (mypolls.poll_timeout==0 || (myds->sess->pause_until - curtime < mypolls.poll_timeout) ) {
mypolls.poll_timeout= myds->sess->pause_until - curtime;
}
}
}
ComputeMinTimeToWait(myds);
}
if (myds) myds->revents=0;
if (mypolls.myds[n] && mypolls.myds[n]->myds_type!=MYDS_LISTENER) {
if (myds && myds->myds_type==MYDS_FRONTEND && myds->DSS==STATE_SLEEP && myds->sess && myds->sess->status==WAITING_CLIENT_DATA) {
mypolls.myds[n]->set_pollout();
} else {
if (mypolls.myds[n]->DSS > STATE_MARIADB_BEGIN && mypolls.myds[n]->DSS < STATE_MARIADB_END) {
mypolls.fds[n].events = POLLIN;
if (mypolls.myds[n]->myconn->async_exit_status & MYSQL_WAIT_WRITE)
mypolls.fds[n].events |= POLLOUT;
} else {
mypolls.myds[n]->set_pollout();
}
}
if (myds && myds->sess->pause_until > curtime) {
if (myds->myds_type==MYDS_FRONTEND) {
mypolls.myds[n]->remove_pollout();
}
if (myds->myds_type==MYDS_BACKEND) {
if (mysql_thread___throttle_ratio_server_to_client) {
mypolls.fds[n].events = 0;
}
}
}
if (myds && myds->myds_type==MYDS_BACKEND) {
if (myds->sess && myds->sess->client_myds && myds->sess->mirror==false) {
unsigned int buffered_data=0;
buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN;
buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN;
// we pause receiving from backend at mysql_thread___threshold_resultset_size * 8
// but assuming that client isn't completely blocked, we will stop checking for data
// only at mysql_thread___threshold_resultset_size * 4
if (buffered_data > (unsigned int)mysql_thread___threshold_resultset_size*4) {
mypolls.fds[n].events = 0;
}
}
}
ProcessEventOnDataStream(myds,n);
}
proxy_debug(PROXY_DEBUG_NET,1,"Poll for DataStream=%p will be called with FD=%d and events=%d\n", mypolls.myds[n], mypolls.fds[n].fd, mypolls.fds[n].events);
}
/*
if (mypolls.abs_poll_timeout > curtime) {
mypolls.abs_poll_timeout -= curtime;
if (mypolls.abs_poll_timeout < mypolls.poll_timeout) {
mypolls.poll_timeout = mypolls.abs_poll_timeout;
}
} else {
// something went wrong
mypolls.poll_timeout = 0;
}
*/
#ifdef IDLE_THREADS
if (GloVars.global.idle_threads) {
if (idle_maintenance_thread==false) {
@ -2787,7 +2893,9 @@ __run_skip_1a:
//this is the only portion of code not protected by a global mutex
proxy_debug(PROXY_DEBUG_NET,5,"Calling poll with timeout %d\n", ( mypolls.poll_timeout ? ( mypolls.poll_timeout/1000 > (unsigned int) mysql_thread___poll_timeout ? mypolls.poll_timeout/1000 : mysql_thread___poll_timeout ) : mysql_thread___poll_timeout ) );
// poll is called with a timeout of mypolls.poll_timeout if set , or mysql_thread___poll_timeout
rc=poll(mypolls.fds,mypolls.len, ( mypolls.poll_timeout ? ( mypolls.poll_timeout/1000 < (unsigned int) mysql_thread___poll_timeout ? mypolls.poll_timeout/1000 : mysql_thread___poll_timeout ) : mysql_thread___poll_timeout ) );
//rc=poll(mypolls.fds,mypolls.len, ( mypolls.poll_timeout ? ( mypolls.poll_timeout/1000 < (unsigned int) mysql_thread___poll_timeout ? mypolls.poll_timeout/1000 : mysql_thread___poll_timeout ) : mysql_thread___poll_timeout ) );
//rc=poll(mypolls.fds,mypolls.len, 0);
rc=poll(mypolls.fds,mypolls.len, mypolls.poll_timeout/1000);
proxy_debug(PROXY_DEBUG_NET,5,"%s\n", "Returning poll");
#ifdef IDLE_THREADS
}
@ -2831,9 +2939,15 @@ __run_skip_1a:
}
// update polls statistics
mypolls.loops++;
mypolls.loop_counters->incr(curtime/1000000);
#ifdef DEBUG
if (mypolls.loops==0 && rc > 0) {
mypolls.first_loops_at = curtime;
mypolls.loops++;
}
if (mypolls.loops)
mypolls.loops++;
//mypolls.loop_counters->incr(curtime/1000000);
#endif // DEBUG
if (maintenance_loop) {
// house keeping
unsigned int l = (unsigned int)mysql_thread___mirror_max_concurrency;
@ -2856,9 +2970,9 @@ __run_skip_1a:
exit(EXIT_FAILURE);
}
if (__sync_add_and_fetch(&__global_MySQL_Thread_Variables_version,0) > __thread_MySQL_Thread_Variables_version) {
refresh_variables();
}
// if (__sync_add_and_fetch(&__global_MySQL_Thread_Variables_version,0) > __thread_MySQL_Thread_Variables_version) {
// refresh_variables();
// }
#ifdef IDLE_THREADS
if (idle_maintenance_thread==false) {
@ -2877,31 +2991,7 @@ __run_skip_1a:
if (rc) {
int i;
for (i=0; i<rc; i++) {
if (events[i].data.u32) {
// NOTE: not sure why, sometime events returns odd values. If set, we take it out as normal worker threads know how to handle it
if (events[i].events) {
uint32_t sess_thr_id=events[i].data.u32;
uint32_t sess_pos=sessmap[sess_thr_id];
MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(sess_pos);
MySQL_Data_Stream *tmp_myds=mysess->client_myds;
int dsidx=tmp_myds->poll_fds_idx;
//fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx);
mypolls.remove_index_fast(dsidx);
tmp_myds->mypolls=NULL;
mysess->thread=NULL;
// we first delete the association in sessmap
sessmap.erase(mysess->thread_session_id);
if (mysql_sessions->len > 1) {
// take the last element and adjust the map
MySQL_Session *mysess_last=(MySQL_Session *)mysql_sessions->index(mysql_sessions->len-1);
if (mysess->thread_session_id != mysess_last->thread_session_id)
sessmap[mysess_last->thread_session_id]=sess_pos;
}
unregister_session(sess_pos);
resume_mysql_sessions->add(mysess);
epoll_ctl(efd, EPOLL_CTL_DEL, tmp_myds->fd, NULL);
}
}
HandleEventFromEpoll(i);
}
for (i=0; i<rc; i++) {
if (events[i].events == EPOLLIN && events[i].data.u32==0) {
@ -2915,40 +3005,7 @@ __run_skip_1a:
}
}
if (mysql_sessions->len && maintenance_loop) {
#define SESS_TO_SCAN 128
if (mysess_idx + SESS_TO_SCAN > mysql_sessions->len) {
mysess_idx=0;
}
unsigned int i;
unsigned long long min_idle = 0;
if (curtime > (unsigned long long)mysql_thread___wait_timeout*1000) {
min_idle = curtime - (unsigned long long)mysql_thread___wait_timeout*1000;
}
for (i=0;i<SESS_TO_SCAN && mysess_idx < mysql_sessions->len; i++) {
uint32_t sess_pos=mysess_idx;
MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(sess_pos);
if (mysess->idle_since < min_idle) {
mysess->killed=true;
MySQL_Data_Stream *tmp_myds=mysess->client_myds;
int dsidx=tmp_myds->poll_fds_idx;
//fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx);
mypolls.remove_index_fast(dsidx);
tmp_myds->mypolls=NULL;
mysess->thread=NULL;
// we first delete the association in sessmap
sessmap.erase(mysess->thread_session_id);
if (mysql_sessions->len > 1) {
// take the last element and adjust the map
MySQL_Session *mysess_last=(MySQL_Session *)mysql_sessions->index(mysql_sessions->len-1);
if (mysess->thread_session_id != mysess_last->thread_session_id)
sessmap[mysess_last->thread_session_id]=sess_pos;
}
unregister_session(sess_pos);
resume_mysql_sessions->add(mysess);
epoll_ctl(efd, EPOLL_CTL_DEL, tmp_myds->fd, NULL);
}
mysess_idx++;
}
ScanIdleClients();
}
goto __run_skip_2;
}
@ -3040,40 +3097,7 @@ __run_skip_1a:
#ifdef IDLE_THREADS
__run_skip_2:
if (GloVars.global.idle_threads && idle_maintenance_thread) {
unsigned int w=rand()%(GloMTH->num_threads);
MySQL_Thread *thr=GloMTH->mysql_threads[w].worker;
if (resume_mysql_sessions->len) {
pthread_mutex_lock(&thr->myexchange.mutex_resumes);
unsigned int ims;
if (shutdown==0 && thr->shutdown==0)
for (ims=0; ims<resume_mysql_sessions->len; ims++) {
MySQL_Session *mysess=(MySQL_Session *)resume_mysql_sessions->remove_index_fast(0);
thr->myexchange.resume_mysql_sessions->add(mysess);
}
pthread_mutex_unlock(&thr->myexchange.mutex_resumes);
{
unsigned char c=0;
//MySQL_Thread *thr=GloMTH->mysql_threads[w].worker;
int fd=thr->pipefd[1];
if (write(fd,&c,1)==-1) {
//proxy_error("Error while signaling maintenance thread\n");
}
}
} else {
VALGRIND_DISABLE_ERROR_REPORTING;
pthread_mutex_lock(&thr->myexchange.mutex_resumes);
VALGRIND_ENABLE_ERROR_REPORTING;
if (shutdown==0 && thr->shutdown==0 && thr->myexchange.resume_mysql_sessions->len) {
unsigned char c=0;
int fd=thr->pipefd[1];
if (write(fd,&c,1)==-1) {
//proxy_error("Error while signaling maintenance thread\n");
}
}
VALGRIND_DISABLE_ERROR_REPORTING;
pthread_mutex_unlock(&thr->myexchange.mutex_resumes);
VALGRIND_ENABLE_ERROR_REPORTING;
}
MoveMydsBackToWorkerThreadIfNeeded();
} else {
#endif // IDLE_THREADS
// iterate through all sessions and process the session logic
@ -3086,6 +3110,43 @@ __run_skip_2:
}
}
void MySQL_Thread::MoveMydsBackToWorkerThreadIfNeeded() {
unsigned int w=rand()%(GloMTH->num_threads);
MySQL_Thread *thr=GloMTH->mysql_threads[w].worker;
if (resume_mysql_sessions->len) {
pthread_mutex_lock(&thr->myexchange.mutex_resumes);
unsigned int ims;
if (shutdown==0 && thr->shutdown==0)
for (ims=0; ims<resume_mysql_sessions->len; ims++) {
MySQL_Session *mysess=(MySQL_Session *)resume_mysql_sessions->remove_index_fast(0);
thr->myexchange.resume_mysql_sessions->add(mysess);
}
pthread_mutex_unlock(&thr->myexchange.mutex_resumes);
{
unsigned char c=0;
//MySQL_Thread *thr=GloMTH->mysql_threads[w].worker;
int fd=thr->pipefd[1];
if (write(fd,&c,1)==-1) {
//proxy_error("Error while signaling maintenance thread\n");
}
}
} else {
VALGRIND_DISABLE_ERROR_REPORTING;
pthread_mutex_lock(&thr->myexchange.mutex_resumes);
VALGRIND_ENABLE_ERROR_REPORTING;
if (shutdown==0 && thr->shutdown==0 && thr->myexchange.resume_mysql_sessions->len) {
unsigned char c=0;
int fd=thr->pipefd[1];
if (write(fd,&c,1)==-1) {
//proxy_error("Error while signaling maintenance thread\n");
}
}
VALGRIND_DISABLE_ERROR_REPORTING;
pthread_mutex_unlock(&thr->myexchange.mutex_resumes);
VALGRIND_ENABLE_ERROR_REPORTING;
}
}
bool MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned int n) {
if (mypolls.fds[n].revents) {
#ifdef IDLE_THREADS
@ -3310,8 +3371,9 @@ void MySQL_Thread::process_all_sessions() {
}
}
unsigned int total_active_transactions_tmp;
total_active_transactions_tmp=__sync_add_and_fetch(&status_variables.active_transactions,0);
__sync_bool_compare_and_swap(&status_variables.active_transactions,total_active_transactions_tmp,total_active_transactions_);
status_variables.active_transactions = total_active_transactions_;
//total_active_transactions_tmp=__sync_add_and_fetch(&status_variables.active_transactions,0);
//__sync_bool_compare_and_swap(&status_variables.active_transactions,total_active_transactions_tmp,total_active_transactions_);
}
void MySQL_Thread::refresh_variables() {
@ -4534,7 +4596,8 @@ unsigned int MySQL_Threads_Handler::get_active_transations() {
if (mysql_threads) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker;
if (thr)
q+=__sync_fetch_and_add(&thr->status_variables.active_transactions,0);
//q+=__sync_fetch_and_add(&thr->status_variables.active_transactions,0);
q+= thr->status_variables.active_transactions; // using volatile
}
}
return q;

Loading…
Cancel
Save