Splitting Thread::Run ito multiple functions

This allowed also to identify few bottleneck and rewrite some code
v2.0-lab
René Cannaò 8 years ago
parent bdca8ee6bb
commit e95d1cd0de

@ -61,8 +61,10 @@ class ProxySQL_Poll {
public: public:
unsigned int poll_timeout; unsigned int poll_timeout;
#ifdef DEBUG
unsigned long loops; unsigned long loops;
StatCounters *loop_counters; StatCounters *loop_counters;
#endif // DEBUG
unsigned int len; unsigned int len;
unsigned int size; unsigned int size;
struct pollfd *fds; struct pollfd *fds;
@ -73,13 +75,15 @@ class ProxySQL_Poll {
volatile int pending_listener_del; volatile int pending_listener_del;
ProxySQL_Poll() { ProxySQL_Poll() {
#ifdef DEBUG
#ifdef PROXYSQL_STATSCOUNTERS_NOLOCK #ifdef PROXYSQL_STATSCOUNTERS_NOLOCK
loop_counters=new StatCounters(15,10); loop_counters=new StatCounters(15,10);
#else #else
loop_counters=new StatCounters(15,10,false); loop_counters=new StatCounters(15,10,false);
#endif #endif
poll_timeout=0;
loops=0; loops=0;
#endif // DEBUG
poll_timeout=0;
len=0; len=0;
pending_listener_add=0; pending_listener_add=0;
pending_listener_del=0; pending_listener_del=0;
@ -103,7 +107,16 @@ class ProxySQL_Poll {
free(fds); free(fds);
free(last_recv); free(last_recv);
free(last_sent); free(last_sent);
delete loop_counters; #ifdef DEBUG
delete loop_counters;
if (loops) {
unsigned long long last_loops_at=monotonic_time();
unsigned long long elapsed_time = last_loops_at-first_loops_at;
elapsed_time /= 1000;
float lpms = loops/elapsed_time;
proxy_info("Loops/ms = %f\n", lpms);
}
#endif // DEBUG
}; };
void add(uint32_t _events, int _fd, MySQL_Data_Stream *_myds, unsigned long long sent_time) { void add(uint32_t _events, int _fd, MySQL_Data_Stream *_myds, unsigned long long sent_time) {
@ -187,7 +200,8 @@ class MySQL_Thread
unsigned long long curtime; unsigned long long curtime;
unsigned long long pre_poll_time; unsigned long long pre_poll_time;
unsigned long long last_maintenance_time; unsigned long long last_maintenance_time;
std::atomic<unsigned long long> atomic_curtime; //std::atomic<unsigned long long> atomic_curtime;
volatile unsigned long long atomic_curtime; // changed from atomic to volatile
PtrArray *mysql_sessions; PtrArray *mysql_sessions;
PtrArray *mirror_queue_mysql_sessions; PtrArray *mirror_queue_mysql_sessions;
PtrArray *mirror_queue_mysql_sessions_cache; PtrArray *mirror_queue_mysql_sessions_cache;
@ -231,7 +245,8 @@ class MySQL_Thread
unsigned long long ConnPool_get_conn_failure; unsigned long long ConnPool_get_conn_failure;
unsigned long long gtid_binlog_collected; unsigned long long gtid_binlog_collected;
unsigned long long gtid_session_collected; unsigned long long gtid_session_collected;
unsigned int active_transactions; volatile unsigned int active_transactions;
//std::atomic<unsigned int> active_transactions;
} status_variables; } status_variables;
struct { struct {
@ -264,6 +279,14 @@ class MySQL_Thread
MySQL_Connection * get_MyConn_local(unsigned int, MySQL_Session *sess, char *gtid_uuid, uint64_t gtid_trxid); MySQL_Connection * get_MyConn_local(unsigned int, MySQL_Session *sess, char *gtid_uuid, uint64_t gtid_trxid);
void push_MyConn_local(MySQL_Connection *); void push_MyConn_local(MySQL_Connection *);
void return_local_connections(); void return_local_connections();
void ping_idle_connections();
void process_mirror_sessions();
void MoveMydsToMaintenanceThreadIfNeeded(MySQL_Data_Stream *myds, unsigned int n);
void HandleEventFromEpoll(int i);
void ScanIdleClients();
void ProcessEventOnDataStream(MySQL_Data_Stream *myds, unsigned int n);
void MoveMydsBackToWorkerThreadIfNeeded();
void ComputeMinTimeToWait(MySQL_Data_Stream *myds);
}; };

@ -2473,44 +2473,7 @@ void MySQL_Thread::unregister_session(int idx) {
mysql_sessions->remove_index_fast(idx); mysql_sessions->remove_index_fast(idx);
} }
void MySQL_Thread::ping_idle_connections() {
// main loop
void MySQL_Thread::run() {
unsigned int n;
int rc;
#ifdef IDLE_THREADS
bool idle_maintenance_thread=epoll_thread;
if (idle_maintenance_thread) {
// we check if it is the first time we are called
if (efd==-1) {
efd = EPOLL_CREATE;
int fd=pipefd[0];
struct epoll_event event;
memset(&event,0,sizeof(event)); // let's make valgrind happy
event.events = EPOLLIN;
event.data.u32=0; // special value to point to the pipe
epoll_ctl(efd, EPOLL_CTL_ADD, fd, &event);
}
}
#endif // IDLE_THREADS
curtime=monotonic_time();
atomic_curtime=curtime;
#ifdef PROXYSQL_MYSQL_PTHREAD_MUTEX
pthread_mutex_lock(&thread_mutex);
#else
spin_wrlock(&thread_mutex);
#endif
while (shutdown==0) {
#ifdef IDLE_THREADS
if (idle_maintenance_thread) {
goto __run_skip_1;
}
#endif // IDLE_THREADS
int num_idles; int num_idles;
if (processing_idles==true && (last_processing_idles < curtime-mysql_thread___ping_timeout_server*1000)) { if (processing_idles==true && (last_processing_idles < curtime-mysql_thread___ping_timeout_server*1000)) {
processing_idles=false; processing_idles=false;
@ -2546,66 +2509,285 @@ void MySQL_Thread::run() {
processing_idles=true; processing_idles=true;
last_processing_idles=curtime; last_processing_idles=curtime;
} }
}
#ifdef IDLE_THREADS void MySQL_Thread::process_mirror_sessions() {
__run_skip_1: while (mirror_queue_mysql_sessions->len) {
if (__sync_add_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1) > (unsigned int)mysql_thread___mirror_max_concurrency ) {
if (idle_maintenance_thread) { __sync_sub_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1);
pthread_mutex_lock(&myexchange.mutex_idles); //goto __mysql_thread_exit_add_mirror; // we can't add more mirror sessions at runtime
while (myexchange.idle_mysql_sessions->len) { return; // we can't add more mirror sessions at runtime
MySQL_Session *mysess=(MySQL_Session *)myexchange.idle_mysql_sessions->remove_index_fast(0); } else {
register_session(mysess, false); int idx;
MySQL_Data_Stream *myds=mysess->client_myds; idx=fastrand()%(mirror_queue_mysql_sessions->len);
mypolls.add(POLLIN, myds->fd, myds, monotonic_time()); MySQL_Session *newsess=(MySQL_Session *)mirror_queue_mysql_sessions->remove_index_fast(idx);
// add in epoll() register_session(newsess);
struct epoll_event event; newsess->handler(); // execute immediately
memset(&event,0,sizeof(event)); // let's make valgrind happy if (newsess->status==WAITING_CLIENT_DATA) { // the mirror session has completed
event.data.u32=mysess->thread_session_id; unregister_session(mysql_sessions->len-1);
event.events = EPOLLIN; unsigned int l = (unsigned int)mysql_thread___mirror_max_concurrency;
epoll_ctl (efd, EPOLL_CTL_ADD, myds->fd, &event); if (mirror_queue_mysql_sessions->len*0.3 > l) l=mirror_queue_mysql_sessions->len*0.3;
// we map thread_id -> position in mysql_session (end of the list) if (mirror_queue_mysql_sessions_cache->len <= l) {
sessmap[mysess->thread_session_id]=mysql_sessions->len-1; bool to_cache=true;
//fprintf(stderr,"Adding session %p idx, DS %p idx %d\n",mysess,myds,myds->poll_fds_idx); if (newsess->mybe) {
if (newsess->mybe->server_myds) {
to_cache=false;
}
}
if (to_cache) {
__sync_sub_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1);
mirror_queue_mysql_sessions_cache->add(newsess);
} else {
delete newsess;
}
} else {
delete newsess;
}
} }
pthread_mutex_unlock(&myexchange.mutex_idles); //newsess->to_process=0;
goto __run_skip_1a;
} }
#endif // IDLE_THREADS }
while (mirror_queue_mysql_sessions->len) { }
if (__sync_add_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1) > (unsigned int)mysql_thread___mirror_max_concurrency ) {
__sync_sub_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1); void MySQL_Thread::MoveMydsToMaintenanceThreadIfNeeded(MySQL_Data_Stream *myds, unsigned int n) {
goto __mysql_thread_exit_add_mirror; // we can't add more mirror sessions at runtime // here we try to move it to the maintenance thread
} else { if (myds->myds_type==MYDS_FRONTEND && myds->sess) {
int idx; if (myds->DSS==STATE_SLEEP && myds->sess->status==WAITING_CLIENT_DATA) {
idx=fastrand()%(mirror_queue_mysql_sessions->len); unsigned long long _tmp_idle = mypolls.last_recv[n] > mypolls.last_sent[n] ? mypolls.last_recv[n] : mypolls.last_sent[n] ;
MySQL_Session *newsess=(MySQL_Session *)mirror_queue_mysql_sessions->remove_index_fast(idx); if (_tmp_idle < ( (curtime > (unsigned int)mysql_thread___session_idle_ms * 1000) ? (curtime - mysql_thread___session_idle_ms * 1000) : 0)) {
register_session(newsess); if (myds->sess->client_myds == myds && myds->PSarrayOUT->len==0 && (myds->queueOUT.head - myds->queueOUT.tail)==0 ) { // extra check
newsess->handler(); // execute immediately unsigned int j;
if (newsess->status==WAITING_CLIENT_DATA) { // the mirror session has completed int conns=0;
unregister_session(mysql_sessions->len-1); for (j=0;j<myds->sess->mybes->len;j++) {
unsigned int l = (unsigned int)mysql_thread___mirror_max_concurrency; MySQL_Backend *tmp_mybe=(MySQL_Backend *)myds->sess->mybes->index(j);
if (mirror_queue_mysql_sessions->len*0.3 > l) l=mirror_queue_mysql_sessions->len*0.3; MySQL_Data_Stream *__myds=tmp_mybe->server_myds;
if (mirror_queue_mysql_sessions_cache->len <= l) { if (__myds->myconn) {
bool to_cache=true; conns++;
if (newsess->mybe) {
if (newsess->mybe->server_myds) {
to_cache=false;
}
} }
if (to_cache) { }
__sync_sub_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1); unsigned long long idle_since = curtime - myds->sess->IdleTime();
mirror_queue_mysql_sessions_cache->add(newsess); bool exit_cond=false;
} else { if (conns==0) {
delete newsess; mypolls.remove_index_fast(n);
myds->mypolls=NULL;
unsigned int i;
for (i=0;i<mysql_sessions->len;i++) {
MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(i);
if (mysess==myds->sess) {
mysess->thread=NULL;
unregister_session(i);
exit_cond=true;
mysess->idle_since = idle_since;
idle_mysql_sessions->add(mysess);
break;
}
} }
} else { }
delete newsess; if (exit_cond) {
//continue;
return;
} }
} }
//newsess->to_process=0;
} }
} }
__mysql_thread_exit_add_mirror: }
}
void MySQL_Thread::HandleEventFromEpoll(int i) {
if (events[i].data.u32) {
// NOTE: not sure why, sometime events returns odd values. If set, we take it out as normal worker threads know how to handle it
if (events[i].events) {
uint32_t sess_thr_id=events[i].data.u32;
uint32_t sess_pos=sessmap[sess_thr_id];
MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(sess_pos);
MySQL_Data_Stream *tmp_myds=mysess->client_myds;
int dsidx=tmp_myds->poll_fds_idx;
//fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx);
mypolls.remove_index_fast(dsidx);
tmp_myds->mypolls=NULL;
mysess->thread=NULL;
// we first delete the association in sessmap
sessmap.erase(mysess->thread_session_id);
if (mysql_sessions->len > 1) {
// take the last element and adjust the map
MySQL_Session *mysess_last=(MySQL_Session *)mysql_sessions->index(mysql_sessions->len-1);
if (mysess->thread_session_id != mysess_last->thread_session_id)
sessmap[mysess_last->thread_session_id]=sess_pos;
}
unregister_session(sess_pos);
resume_mysql_sessions->add(mysess);
epoll_ctl(efd, EPOLL_CTL_DEL, tmp_myds->fd, NULL);
}
}
}
void MySQL_Thread::ScanIdleClients() {
#define SESS_TO_SCAN 128
if (mysess_idx + SESS_TO_SCAN > mysql_sessions->len) {
mysess_idx=0;
}
unsigned int i;
unsigned long long min_idle = 0;
if (curtime > (unsigned long long)mysql_thread___wait_timeout*1000) {
min_idle = curtime - (unsigned long long)mysql_thread___wait_timeout*1000;
}
for (i=0;i<SESS_TO_SCAN && mysess_idx < mysql_sessions->len; i++) {
uint32_t sess_pos=mysess_idx;
MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(sess_pos);
if (mysess->idle_since < min_idle) {
mysess->killed=true;
MySQL_Data_Stream *tmp_myds=mysess->client_myds;
int dsidx=tmp_myds->poll_fds_idx;
//fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx);
mypolls.remove_index_fast(dsidx);
tmp_myds->mypolls=NULL;
mysess->thread=NULL;
// we first delete the association in sessmap
sessmap.erase(mysess->thread_session_id);
if (mysql_sessions->len > 1) {
// take the last element and adjust the map
MySQL_Session *mysess_last=(MySQL_Session *)mysql_sessions->index(mysql_sessions->len-1);
if (mysess->thread_session_id != mysess_last->thread_session_id)
sessmap[mysess_last->thread_session_id]=sess_pos;
}
unregister_session(sess_pos);
resume_mysql_sessions->add(mysess);
epoll_ctl(efd, EPOLL_CTL_DEL, tmp_myds->fd, NULL);
}
mysess_idx++;
}
}
void MySQL_Thread::ProcessEventOnDataStream(MySQL_Data_Stream *myds, unsigned int n) {
if (myds && myds->myds_type==MYDS_FRONTEND && myds->DSS==STATE_SLEEP && myds->sess && myds->sess->status==WAITING_CLIENT_DATA) {
mypolls.myds[n]->set_pollout();
} else {
if (mypolls.myds[n]->DSS > STATE_MARIADB_BEGIN && mypolls.myds[n]->DSS < STATE_MARIADB_END) {
mypolls.fds[n].events = POLLIN;
if (mypolls.myds[n]->myconn->async_exit_status & MYSQL_WAIT_WRITE)
mypolls.fds[n].events |= POLLOUT;
} else {
mypolls.myds[n]->set_pollout();
}
}
if (myds && myds->sess->pause_until > curtime) {
if (myds->myds_type==MYDS_FRONTEND) {
mypolls.myds[n]->remove_pollout();
}
if (myds->myds_type==MYDS_BACKEND) {
if (mysql_thread___throttle_ratio_server_to_client) {
mypolls.fds[n].events = 0;
}
}
}
if (myds && myds->myds_type==MYDS_BACKEND) {
if (myds->sess && myds->sess->client_myds && myds->sess->mirror==false) {
unsigned int buffered_data=0;
buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN;
buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN;
// we pause receiving from backend at mysql_thread___threshold_resultset_size * 8
// but assuming that client isn't completely blocked, we will stop checking for data
// only at mysql_thread___threshold_resultset_size * 4
if (buffered_data > (unsigned int)mysql_thread___threshold_resultset_size*4) {
mypolls.fds[n].events = 0;
}
}
}
}
void MySQL_Thread::ComputeMinTimeToWait(MySQL_Data_Stream *myds) {
if (myds->wait_until) {
if (myds->wait_until > curtime) {
//if (mypolls.poll_timeout==0 || (myds->wait_until - curtime < mypolls.poll_timeout) ) {
if (myds->wait_until - curtime < mypolls.poll_timeout) {
//if (myds->wait_until < mypolls.abs_poll_timeout) {
// mypolls.abs_poll_timeout = myds->wait_until;
mypolls.poll_timeout= myds->wait_until - curtime;
}
}
}
if (myds->sess) {
if (myds->sess->pause_until > 0) {
//if (mypolls.poll_timeout==0 || (myds->sess->pause_until - curtime < mypolls.poll_timeout) ) {
if (myds->sess->pause_until - curtime < mypolls.poll_timeout) {
mypolls.poll_timeout= myds->sess->pause_until - curtime;
//if (myds->sess->pause_until < mypolls.abs_poll_timeout) {
// mypolls.abs_poll_timeout = myds->sess->pause_until;
}
}
}
}
// main loop
void MySQL_Thread::run() {
unsigned int n;
int rc;
#ifdef IDLE_THREADS
bool idle_maintenance_thread=epoll_thread;
if (idle_maintenance_thread) {
// we check if it is the first time we are called
if (efd==-1) {
efd = EPOLL_CREATE;
int fd=pipefd[0];
struct epoll_event event;
memset(&event,0,sizeof(event)); // let's make valgrind happy
event.events = EPOLLIN;
event.data.u32=0; // special value to point to the pipe
epoll_ctl(efd, EPOLL_CTL_ADD, fd, &event);
}
}
#endif // IDLE_THREADS
curtime=monotonic_time();
atomic_curtime=curtime;
#ifdef PROXYSQL_MYSQL_PTHREAD_MUTEX
pthread_mutex_lock(&thread_mutex);
#else
spin_wrlock(&thread_mutex);
#endif
while (shutdown==0) {
#ifdef IDLE_THREADS
if (idle_maintenance_thread) {
goto __run_skip_1;
}
#endif // IDLE_THREADS
ping_idle_connections();
#ifdef IDLE_THREADS
__run_skip_1:
if (idle_maintenance_thread) {
pthread_mutex_lock(&myexchange.mutex_idles);
while (myexchange.idle_mysql_sessions->len) {
MySQL_Session *mysess=(MySQL_Session *)myexchange.idle_mysql_sessions->remove_index_fast(0);
register_session(mysess, false);
MySQL_Data_Stream *myds=mysess->client_myds;
mypolls.add(POLLIN, myds->fd, myds, monotonic_time());
// add in epoll()
struct epoll_event event;
memset(&event,0,sizeof(event)); // let's make valgrind happy
event.data.u32=mysess->thread_session_id;
event.events = EPOLLIN;
epoll_ctl (efd, EPOLL_CTL_ADD, myds->fd, &event);
// we map thread_id -> position in mysql_session (end of the list)
sessmap[mysess->thread_session_id]=mysql_sessions->len-1;
//fprintf(stderr,"Adding session %p idx, DS %p idx %d\n",mysess,myds,myds->poll_fds_idx);
}
pthread_mutex_unlock(&myexchange.mutex_idles);
goto __run_skip_1a;
}
#endif // IDLE_THREADS
process_mirror_sessions();
//__mysql_thread_exit_add_mirror:
mypolls.poll_timeout = mysql_thread___poll_timeout/1000; // predefined default value
//mypolls.abs_poll_timeout = curtime + mypolls.poll_timeout;
for (n = 0; n < mypolls.len; n++) { for (n = 0; n < mypolls.len; n++) {
MySQL_Data_Stream *myds=NULL; MySQL_Data_Stream *myds=NULL;
myds=mypolls.myds[n]; myds=mypolls.myds[n];
@ -2613,104 +2795,28 @@ __mysql_thread_exit_add_mirror:
if (myds) { if (myds) {
#ifdef IDLE_THREADS #ifdef IDLE_THREADS
if (GloVars.global.idle_threads) { if (GloVars.global.idle_threads) {
// here we try to move it to the maintenance thread MoveMydsToMaintenanceThreadIfNeeded(myds,n);
if (myds->myds_type==MYDS_FRONTEND && myds->sess) {
if (myds->DSS==STATE_SLEEP && myds->sess->status==WAITING_CLIENT_DATA) {
unsigned long long _tmp_idle = mypolls.last_recv[n] > mypolls.last_sent[n] ? mypolls.last_recv[n] : mypolls.last_sent[n] ;
if (_tmp_idle < ( (curtime > (unsigned int)mysql_thread___session_idle_ms * 1000) ? (curtime - mysql_thread___session_idle_ms * 1000) : 0)) {
if (myds->sess->client_myds == myds && myds->PSarrayOUT->len==0 && (myds->queueOUT.head - myds->queueOUT.tail)==0 ) { // extra check
unsigned int j;
int conns=0;
for (j=0;j<myds->sess->mybes->len;j++) {
MySQL_Backend *tmp_mybe=(MySQL_Backend *)myds->sess->mybes->index(j);
MySQL_Data_Stream *__myds=tmp_mybe->server_myds;
if (__myds->myconn) {
conns++;
}
}
unsigned long long idle_since = curtime - myds->sess->IdleTime();
bool exit_cond=false;
if (conns==0) {
mypolls.remove_index_fast(n);
myds->mypolls=NULL;
unsigned int i;
for (i=0;i<mysql_sessions->len;i++) {
MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(i);
if (mysess==myds->sess) {
mysess->thread=NULL;
unregister_session(i);
exit_cond=true;
mysess->idle_since = idle_since;
idle_mysql_sessions->add(mysess);
break;
}
}
}
if (exit_cond) {
continue;
}
}
}
}
}
} }
#endif // IDLE_THREADS #endif // IDLE_THREADS
if (myds->wait_until) { ComputeMinTimeToWait(myds);
if (myds->wait_until > curtime) {
if (mypolls.poll_timeout==0 || (myds->wait_until - curtime < mypolls.poll_timeout) ) {
mypolls.poll_timeout= myds->wait_until - curtime;
}
}
}
if (myds->sess) {
if (myds->sess->pause_until > 0) {
if (mypolls.poll_timeout==0 || (myds->sess->pause_until - curtime < mypolls.poll_timeout) ) {
mypolls.poll_timeout= myds->sess->pause_until - curtime;
}
}
}
} }
if (myds) myds->revents=0; if (myds) myds->revents=0;
if (mypolls.myds[n] && mypolls.myds[n]->myds_type!=MYDS_LISTENER) { if (mypolls.myds[n] && mypolls.myds[n]->myds_type!=MYDS_LISTENER) {
if (myds && myds->myds_type==MYDS_FRONTEND && myds->DSS==STATE_SLEEP && myds->sess && myds->sess->status==WAITING_CLIENT_DATA) { ProcessEventOnDataStream(myds,n);
mypolls.myds[n]->set_pollout();
} else {
if (mypolls.myds[n]->DSS > STATE_MARIADB_BEGIN && mypolls.myds[n]->DSS < STATE_MARIADB_END) {
mypolls.fds[n].events = POLLIN;
if (mypolls.myds[n]->myconn->async_exit_status & MYSQL_WAIT_WRITE)
mypolls.fds[n].events |= POLLOUT;
} else {
mypolls.myds[n]->set_pollout();
}
}
if (myds && myds->sess->pause_until > curtime) {
if (myds->myds_type==MYDS_FRONTEND) {
mypolls.myds[n]->remove_pollout();
}
if (myds->myds_type==MYDS_BACKEND) {
if (mysql_thread___throttle_ratio_server_to_client) {
mypolls.fds[n].events = 0;
}
}
}
if (myds && myds->myds_type==MYDS_BACKEND) {
if (myds->sess && myds->sess->client_myds && myds->sess->mirror==false) {
unsigned int buffered_data=0;
buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN;
buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN;
// we pause receiving from backend at mysql_thread___threshold_resultset_size * 8
// but assuming that client isn't completely blocked, we will stop checking for data
// only at mysql_thread___threshold_resultset_size * 4
if (buffered_data > (unsigned int)mysql_thread___threshold_resultset_size*4) {
mypolls.fds[n].events = 0;
}
}
}
} }
proxy_debug(PROXY_DEBUG_NET,1,"Poll for DataStream=%p will be called with FD=%d and events=%d\n", mypolls.myds[n], mypolls.fds[n].fd, mypolls.fds[n].events); proxy_debug(PROXY_DEBUG_NET,1,"Poll for DataStream=%p will be called with FD=%d and events=%d\n", mypolls.myds[n], mypolls.fds[n].fd, mypolls.fds[n].events);
} }
/*
if (mypolls.abs_poll_timeout > curtime) {
mypolls.abs_poll_timeout -= curtime;
if (mypolls.abs_poll_timeout < mypolls.poll_timeout) {
mypolls.poll_timeout = mypolls.abs_poll_timeout;
}
} else {
// something went wrong
mypolls.poll_timeout = 0;
}
*/
#ifdef IDLE_THREADS #ifdef IDLE_THREADS
if (GloVars.global.idle_threads) { if (GloVars.global.idle_threads) {
if (idle_maintenance_thread==false) { if (idle_maintenance_thread==false) {
@ -2787,7 +2893,9 @@ __run_skip_1a:
//this is the only portion of code not protected by a global mutex //this is the only portion of code not protected by a global mutex
proxy_debug(PROXY_DEBUG_NET,5,"Calling poll with timeout %d\n", ( mypolls.poll_timeout ? ( mypolls.poll_timeout/1000 > (unsigned int) mysql_thread___poll_timeout ? mypolls.poll_timeout/1000 : mysql_thread___poll_timeout ) : mysql_thread___poll_timeout ) ); proxy_debug(PROXY_DEBUG_NET,5,"Calling poll with timeout %d\n", ( mypolls.poll_timeout ? ( mypolls.poll_timeout/1000 > (unsigned int) mysql_thread___poll_timeout ? mypolls.poll_timeout/1000 : mysql_thread___poll_timeout ) : mysql_thread___poll_timeout ) );
// poll is called with a timeout of mypolls.poll_timeout if set , or mysql_thread___poll_timeout // poll is called with a timeout of mypolls.poll_timeout if set , or mysql_thread___poll_timeout
rc=poll(mypolls.fds,mypolls.len, ( mypolls.poll_timeout ? ( mypolls.poll_timeout/1000 < (unsigned int) mysql_thread___poll_timeout ? mypolls.poll_timeout/1000 : mysql_thread___poll_timeout ) : mysql_thread___poll_timeout ) ); //rc=poll(mypolls.fds,mypolls.len, ( mypolls.poll_timeout ? ( mypolls.poll_timeout/1000 < (unsigned int) mysql_thread___poll_timeout ? mypolls.poll_timeout/1000 : mysql_thread___poll_timeout ) : mysql_thread___poll_timeout ) );
//rc=poll(mypolls.fds,mypolls.len, 0);
rc=poll(mypolls.fds,mypolls.len, mypolls.poll_timeout/1000);
proxy_debug(PROXY_DEBUG_NET,5,"%s\n", "Returning poll"); proxy_debug(PROXY_DEBUG_NET,5,"%s\n", "Returning poll");
#ifdef IDLE_THREADS #ifdef IDLE_THREADS
} }
@ -2831,9 +2939,15 @@ __run_skip_1a:
} }
// update polls statistics // update polls statistics
mypolls.loops++; #ifdef DEBUG
mypolls.loop_counters->incr(curtime/1000000); if (mypolls.loops==0 && rc > 0) {
mypolls.first_loops_at = curtime;
mypolls.loops++;
}
if (mypolls.loops)
mypolls.loops++;
//mypolls.loop_counters->incr(curtime/1000000);
#endif // DEBUG
if (maintenance_loop) { if (maintenance_loop) {
// house keeping // house keeping
unsigned int l = (unsigned int)mysql_thread___mirror_max_concurrency; unsigned int l = (unsigned int)mysql_thread___mirror_max_concurrency;
@ -2856,9 +2970,9 @@ __run_skip_1a:
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if (__sync_add_and_fetch(&__global_MySQL_Thread_Variables_version,0) > __thread_MySQL_Thread_Variables_version) { // if (__sync_add_and_fetch(&__global_MySQL_Thread_Variables_version,0) > __thread_MySQL_Thread_Variables_version) {
refresh_variables(); // refresh_variables();
} // }
#ifdef IDLE_THREADS #ifdef IDLE_THREADS
if (idle_maintenance_thread==false) { if (idle_maintenance_thread==false) {
@ -2877,31 +2991,7 @@ __run_skip_1a:
if (rc) { if (rc) {
int i; int i;
for (i=0; i<rc; i++) { for (i=0; i<rc; i++) {
if (events[i].data.u32) { HandleEventFromEpoll(i);
// NOTE: not sure why, sometime events returns odd values. If set, we take it out as normal worker threads know how to handle it
if (events[i].events) {
uint32_t sess_thr_id=events[i].data.u32;
uint32_t sess_pos=sessmap[sess_thr_id];
MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(sess_pos);
MySQL_Data_Stream *tmp_myds=mysess->client_myds;
int dsidx=tmp_myds->poll_fds_idx;
//fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx);
mypolls.remove_index_fast(dsidx);
tmp_myds->mypolls=NULL;
mysess->thread=NULL;
// we first delete the association in sessmap
sessmap.erase(mysess->thread_session_id);
if (mysql_sessions->len > 1) {
// take the last element and adjust the map
MySQL_Session *mysess_last=(MySQL_Session *)mysql_sessions->index(mysql_sessions->len-1);
if (mysess->thread_session_id != mysess_last->thread_session_id)
sessmap[mysess_last->thread_session_id]=sess_pos;
}
unregister_session(sess_pos);
resume_mysql_sessions->add(mysess);
epoll_ctl(efd, EPOLL_CTL_DEL, tmp_myds->fd, NULL);
}
}
} }
for (i=0; i<rc; i++) { for (i=0; i<rc; i++) {
if (events[i].events == EPOLLIN && events[i].data.u32==0) { if (events[i].events == EPOLLIN && events[i].data.u32==0) {
@ -2915,40 +3005,7 @@ __run_skip_1a:
} }
} }
if (mysql_sessions->len && maintenance_loop) { if (mysql_sessions->len && maintenance_loop) {
#define SESS_TO_SCAN 128 ScanIdleClients();
if (mysess_idx + SESS_TO_SCAN > mysql_sessions->len) {
mysess_idx=0;
}
unsigned int i;
unsigned long long min_idle = 0;
if (curtime > (unsigned long long)mysql_thread___wait_timeout*1000) {
min_idle = curtime - (unsigned long long)mysql_thread___wait_timeout*1000;
}
for (i=0;i<SESS_TO_SCAN && mysess_idx < mysql_sessions->len; i++) {
uint32_t sess_pos=mysess_idx;
MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(sess_pos);
if (mysess->idle_since < min_idle) {
mysess->killed=true;
MySQL_Data_Stream *tmp_myds=mysess->client_myds;
int dsidx=tmp_myds->poll_fds_idx;
//fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx);
mypolls.remove_index_fast(dsidx);
tmp_myds->mypolls=NULL;
mysess->thread=NULL;
// we first delete the association in sessmap
sessmap.erase(mysess->thread_session_id);
if (mysql_sessions->len > 1) {
// take the last element and adjust the map
MySQL_Session *mysess_last=(MySQL_Session *)mysql_sessions->index(mysql_sessions->len-1);
if (mysess->thread_session_id != mysess_last->thread_session_id)
sessmap[mysess_last->thread_session_id]=sess_pos;
}
unregister_session(sess_pos);
resume_mysql_sessions->add(mysess);
epoll_ctl(efd, EPOLL_CTL_DEL, tmp_myds->fd, NULL);
}
mysess_idx++;
}
} }
goto __run_skip_2; goto __run_skip_2;
} }
@ -3040,40 +3097,7 @@ __run_skip_1a:
#ifdef IDLE_THREADS #ifdef IDLE_THREADS
__run_skip_2: __run_skip_2:
if (GloVars.global.idle_threads && idle_maintenance_thread) { if (GloVars.global.idle_threads && idle_maintenance_thread) {
unsigned int w=rand()%(GloMTH->num_threads); MoveMydsBackToWorkerThreadIfNeeded();
MySQL_Thread *thr=GloMTH->mysql_threads[w].worker;
if (resume_mysql_sessions->len) {
pthread_mutex_lock(&thr->myexchange.mutex_resumes);
unsigned int ims;
if (shutdown==0 && thr->shutdown==0)
for (ims=0; ims<resume_mysql_sessions->len; ims++) {
MySQL_Session *mysess=(MySQL_Session *)resume_mysql_sessions->remove_index_fast(0);
thr->myexchange.resume_mysql_sessions->add(mysess);
}
pthread_mutex_unlock(&thr->myexchange.mutex_resumes);
{
unsigned char c=0;
//MySQL_Thread *thr=GloMTH->mysql_threads[w].worker;
int fd=thr->pipefd[1];
if (write(fd,&c,1)==-1) {
//proxy_error("Error while signaling maintenance thread\n");
}
}
} else {
VALGRIND_DISABLE_ERROR_REPORTING;
pthread_mutex_lock(&thr->myexchange.mutex_resumes);
VALGRIND_ENABLE_ERROR_REPORTING;
if (shutdown==0 && thr->shutdown==0 && thr->myexchange.resume_mysql_sessions->len) {
unsigned char c=0;
int fd=thr->pipefd[1];
if (write(fd,&c,1)==-1) {
//proxy_error("Error while signaling maintenance thread\n");
}
}
VALGRIND_DISABLE_ERROR_REPORTING;
pthread_mutex_unlock(&thr->myexchange.mutex_resumes);
VALGRIND_ENABLE_ERROR_REPORTING;
}
} else { } else {
#endif // IDLE_THREADS #endif // IDLE_THREADS
// iterate through all sessions and process the session logic // iterate through all sessions and process the session logic
@ -3086,6 +3110,43 @@ __run_skip_2:
} }
} }
void MySQL_Thread::MoveMydsBackToWorkerThreadIfNeeded() {
unsigned int w=rand()%(GloMTH->num_threads);
MySQL_Thread *thr=GloMTH->mysql_threads[w].worker;
if (resume_mysql_sessions->len) {
pthread_mutex_lock(&thr->myexchange.mutex_resumes);
unsigned int ims;
if (shutdown==0 && thr->shutdown==0)
for (ims=0; ims<resume_mysql_sessions->len; ims++) {
MySQL_Session *mysess=(MySQL_Session *)resume_mysql_sessions->remove_index_fast(0);
thr->myexchange.resume_mysql_sessions->add(mysess);
}
pthread_mutex_unlock(&thr->myexchange.mutex_resumes);
{
unsigned char c=0;
//MySQL_Thread *thr=GloMTH->mysql_threads[w].worker;
int fd=thr->pipefd[1];
if (write(fd,&c,1)==-1) {
//proxy_error("Error while signaling maintenance thread\n");
}
}
} else {
VALGRIND_DISABLE_ERROR_REPORTING;
pthread_mutex_lock(&thr->myexchange.mutex_resumes);
VALGRIND_ENABLE_ERROR_REPORTING;
if (shutdown==0 && thr->shutdown==0 && thr->myexchange.resume_mysql_sessions->len) {
unsigned char c=0;
int fd=thr->pipefd[1];
if (write(fd,&c,1)==-1) {
//proxy_error("Error while signaling maintenance thread\n");
}
}
VALGRIND_DISABLE_ERROR_REPORTING;
pthread_mutex_unlock(&thr->myexchange.mutex_resumes);
VALGRIND_ENABLE_ERROR_REPORTING;
}
}
bool MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned int n) { bool MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned int n) {
if (mypolls.fds[n].revents) { if (mypolls.fds[n].revents) {
#ifdef IDLE_THREADS #ifdef IDLE_THREADS
@ -3310,8 +3371,9 @@ void MySQL_Thread::process_all_sessions() {
} }
} }
unsigned int total_active_transactions_tmp; unsigned int total_active_transactions_tmp;
total_active_transactions_tmp=__sync_add_and_fetch(&status_variables.active_transactions,0); status_variables.active_transactions = total_active_transactions_;
__sync_bool_compare_and_swap(&status_variables.active_transactions,total_active_transactions_tmp,total_active_transactions_); //total_active_transactions_tmp=__sync_add_and_fetch(&status_variables.active_transactions,0);
//__sync_bool_compare_and_swap(&status_variables.active_transactions,total_active_transactions_tmp,total_active_transactions_);
} }
void MySQL_Thread::refresh_variables() { void MySQL_Thread::refresh_variables() {
@ -4534,7 +4596,8 @@ unsigned int MySQL_Threads_Handler::get_active_transations() {
if (mysql_threads) { if (mysql_threads) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker; MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker;
if (thr) if (thr)
q+=__sync_fetch_and_add(&thr->status_variables.active_transactions,0); //q+=__sync_fetch_and_add(&thr->status_variables.active_transactions,0);
q+= thr->status_variables.active_transactions; // using volatile
} }
} }
return q; return q;

Loading…
Cancel
Save