Simplifying MySQL_Thread::run() phase 3

pull/2651/head
René Cannaò 6 years ago
parent 64bc42a122
commit 72912f0b88

@ -100,13 +100,15 @@ class MySQL_Thread
#ifdef IDLE_THREADS
void worker_thread_assigns_sessions_to_idle_thread(MySQL_Thread *thr);
void worker_threads_get_sessions_from_idle_threads();
void worker_thread_gets_sessions_from_idle_thread();
void idle_thread_gets_sessions_from_worker_thread();
void idle_thread_assigns_sessions_to_worker_thread(MySQL_Thread *thr);
void idle_thread_check_if_worker_thread_has_unprocess_resumed_sessions_and_signal_it(MySQL_Thread *thr);
#endif // IDLE_THREADS
unsigned int find_session_idx_in_mysql_sessions(MySQL_Session *sess);
bool move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, unsigned int n);
bool set_backend_to_be_skipped_if_frontend_is_slow(MySQL_Data_Stream *myds, unsigned int n);
protected:
int nfds;

@ -3814,23 +3814,7 @@ void MySQL_Thread::run() {
__run_skip_1:
if (idle_maintenance_thread) {
pthread_mutex_lock(&myexchange.mutex_idles);
while (myexchange.idle_mysql_sessions->len) {
MySQL_Session *mysess=(MySQL_Session *)myexchange.idle_mysql_sessions->remove_index_fast(0);
register_session(mysess, false);
MySQL_Data_Stream *myds=mysess->client_myds;
mypolls.add(POLLIN, myds->fd, myds, monotonic_time());
// add in epoll()
struct epoll_event event;
memset(&event,0,sizeof(event)); // let's make valgrind happy
event.data.u32=mysess->thread_session_id;
event.events = EPOLLIN;
epoll_ctl (efd, EPOLL_CTL_ADD, myds->fd, &event);
// we map thread_id -> position in mysql_session (end of the list)
sessmap[mysess->thread_session_id]=mysql_sessions->len-1;
//fprintf(stderr,"Adding session %p idx, DS %p idx %d\n",mysess,myds,myds->poll_fds_idx);
}
pthread_mutex_unlock(&myexchange.mutex_idles);
idle_thread_gets_sessions_from_worker_thread();
goto __run_skip_1a;
}
#endif // IDLE_THREADS
@ -3927,17 +3911,7 @@ __mysql_thread_exit_add_mirror:
}
}
if (myds->myds_type==MYDS_BACKEND) {
if (myds->sess && myds->sess->client_myds && myds->sess->mirror==false) {
unsigned int buffered_data=0;
buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN;
buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN;
// we pause receiving from backend at mysql_thread___threshold_resultset_size * 8
// but assuming that client isn't completely blocked, we will stop checking for data
// only at mysql_thread___threshold_resultset_size * 4
if (buffered_data > (unsigned int)mysql_thread___threshold_resultset_size*4) {
mypolls.fds[n].events = 0;
}
}
set_backend_to_be_skipped_if_frontend_is_slow(myds, n);
}
}
}
@ -3950,7 +3924,7 @@ __mysql_thread_exit_add_mirror:
int r=rand()%(GloMTH->num_threads);
MySQL_Thread *thr=GloMTH->mysql_threads_idles[r].worker;
worker_thread_assigns_sessions_to_idle_thread(thr);
worker_threads_get_sessions_from_idle_threads();
worker_thread_gets_sessions_from_idle_thread();
}
}
@ -4238,6 +4212,7 @@ __run_skip_1a:
#ifdef IDLE_THREADS
__run_skip_2:
if (GloVars.global.idle_threads && idle_maintenance_thread) {
// this is an idle thread
unsigned int w=rand()%(GloMTH->num_threads);
MySQL_Thread *thr=GloMTH->mysql_threads[w].worker;
if (resume_mysql_sessions->len) {
@ -4256,6 +4231,7 @@ __run_skip_2:
#endif // IDLE_THREADS
}
}
// end of ::run()
unsigned int MySQL_Thread::find_session_idx_in_mysql_sessions(MySQL_Session *sess) {
int i=0;
@ -4323,8 +4299,7 @@ void MySQL_Thread::worker_thread_assigns_sessions_to_idle_thread(MySQL_Thread *t
}
}
void MySQL_Thread::worker_threads_get_sessions_from_idle_threads() {
worker_threads_get_sessions_from_idle_threads();
void MySQL_Thread::worker_thread_gets_sessions_from_idle_thread() {
pthread_mutex_lock(&myexchange.mutex_resumes);
if (myexchange.resume_mysql_sessions->len) {
//unsigned int maxsess=GloMTH->resume_mysql_sessions->len;
@ -6779,3 +6754,39 @@ bool MySQL_Thread::move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds,
}
return false;
}
bool MySQL_Thread::set_backend_to_be_skipped_if_frontend_is_slow(MySQL_Data_Stream *myds, unsigned int n) {
if (myds->sess && myds->sess->client_myds && myds->sess->mirror==false) {
unsigned int buffered_data=0;
buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN;
buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN;
// we pause receiving from backend at mysql_thread___threshold_resultset_size * 8
// but assuming that client isn't completely blocked, we will stop checking for data
// only at mysql_thread___threshold_resultset_size * 4
if (buffered_data > (unsigned int)mysql_thread___threshold_resultset_size*4) {
mypolls.fds[n].events = 0;
return true;
}
}
return false;
}
void MySQL_Thread::idle_thread_gets_sessions_from_worker_thread() {
pthread_mutex_lock(&myexchange.mutex_idles);
while (myexchange.idle_mysql_sessions->len) {
MySQL_Session *mysess=(MySQL_Session *)myexchange.idle_mysql_sessions->remove_index_fast(0);
register_session(mysess, false);
MySQL_Data_Stream *myds=mysess->client_myds;
mypolls.add(POLLIN, myds->fd, myds, monotonic_time());
// add in epoll()
struct epoll_event event;
memset(&event,0,sizeof(event)); // let's make valgrind happy
event.data.u32=mysess->thread_session_id;
event.events = EPOLLIN;
epoll_ctl (efd, EPOLL_CTL_ADD, myds->fd, &event);
// we map thread_id -> position in mysql_session (end of the list)
sessmap[mysess->thread_session_id]=mysql_sessions->len-1;
//fprintf(stderr,"Adding session %p idx, DS %p idx %d\n",mysess,myds,myds->poll_fds_idx);
}
pthread_mutex_unlock(&myexchange.mutex_idles);
}

Loading…
Cancel
Save