Simplifying MySQL_Thread::run() phase 1

pull/2651/head
René Cannaò 6 years ago
parent cdbf70817a
commit 75606bb94d

@ -263,6 +263,7 @@ class MySQL_Session
void generate_proxysql_internal_session_json(json &);
bool known_query_for_locked_on_hostgroup(uint64_t);
void unable_to_parse_set_statement(bool *);
bool has_any_backend();
};
#define KILL_QUERY 1

@ -98,6 +98,14 @@ class MySQL_Thread
Session_Regex **match_regexes;
#ifdef IDLE_THREADS
void worker_thread_assigns_sessions_to_idle_thread(MySQL_Thread *thr);
void worker_threads_get_sessions_from_idle_threads();
#endif // IDLE_THREADS
unsigned int find_session_idx_in_mysql_sessions(MySQL_Session *sess);
bool move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, unsigned int n);
protected:
int nfds;

@ -7226,3 +7226,13 @@ void MySQL_Session::unable_to_parse_set_statement(bool *lock_hostgroup) {
}
}
bool MySQL_Session::has_any_backend() {
for (unsigned int j=0;j < mybes->len;j++) {
MySQL_Backend *tmp_mybe=(MySQL_Backend *)mybes->index(j);
MySQL_Data_Stream *__myds=tmp_mybe->server_myds;
if (__myds->myconn) {
return true;
}
}
return false;
}

@ -3879,39 +3879,9 @@ __mysql_thread_exit_add_mirror:
// here we try to move it to the maintenance thread
if (myds->myds_type==MYDS_FRONTEND && myds->sess) {
if (myds->DSS==STATE_SLEEP && myds->sess->status==WAITING_CLIENT_DATA) {
unsigned long long _tmp_idle = mypolls.last_recv[n] > mypolls.last_sent[n] ? mypolls.last_recv[n] : mypolls.last_sent[n] ;
if (_tmp_idle < ( (curtime > (unsigned int)mysql_thread___session_idle_ms * 1000) ? (curtime - mysql_thread___session_idle_ms * 1000) : 0)) {
// make sure data stream has no pending data out and session is not throttled (#1939)
// because epoll thread does not handle data stream with data out
if (myds->sess->client_myds == myds && !myds->available_data_out() && myds->sess->pause_until <= curtime) {
unsigned int j;
int conns=0;
for (j=0;j<myds->sess->mybes->len;j++) {
MySQL_Backend *tmp_mybe=(MySQL_Backend *)myds->sess->mybes->index(j);
MySQL_Data_Stream *__myds=tmp_mybe->server_myds;
if (__myds->myconn) {
conns++;
}
}
unsigned long long idle_since = curtime - myds->sess->IdleTime();
if (conns==0) {
mypolls.remove_index_fast(n);
myds->mypolls=NULL;
unsigned int i;
for (i=0;i<mysql_sessions->len;i++) {
MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(i);
if (mysess==myds->sess) {
mysess->thread=NULL;
unregister_session(i);
mysess->idle_since = idle_since;
idle_mysql_sessions->add(mysess);
break;
}
}
n--; // compensate mypolls.remove_index_fast(n) and n++ of loop
continue;
}
}
if (move_session_to_idle_mysql_sessions(myds, n)) {
n--; // compensate mypolls.remove_index_fast(n) and n++ of loop
continue;
}
}
}
@ -3979,37 +3949,8 @@ __mysql_thread_exit_add_mirror:
if (idle_maintenance_thread==false) {
int r=rand()%(GloMTH->num_threads);
MySQL_Thread *thr=GloMTH->mysql_threads_idles[r].worker;
if (shutdown==0 && thr->shutdown==0 && idle_mysql_sessions->len) {
pthread_mutex_lock(&thr->myexchange.mutex_idles);
bool empty_queue=true;
if (thr->myexchange.idle_mysql_sessions->len) {
// there are already sessions in the queues. We assume someone already notified worker 0
empty_queue=false;
}
while (idle_mysql_sessions->len) {
MySQL_Session *mysess=(MySQL_Session *)idle_mysql_sessions->remove_index_fast(0);
thr->myexchange.idle_mysql_sessions->add(mysess);
}
pthread_mutex_unlock(&thr->myexchange.mutex_idles);
if (empty_queue==true) {
unsigned char c=1;
int fd=thr->pipefd[1];
if (write(fd,&c,1)==-1) {
//proxy_error("Error while signaling maintenance thread\n");
}
}
}
pthread_mutex_lock(&myexchange.mutex_resumes);
if (myexchange.resume_mysql_sessions->len) {
//unsigned int maxsess=GloMTH->resume_mysql_sessions->len;
while (myexchange.resume_mysql_sessions->len) {
MySQL_Session *mysess=(MySQL_Session *)myexchange.resume_mysql_sessions->remove_index_fast(0);
register_session(mysess, false);
MySQL_Data_Stream *myds=mysess->client_myds;
mypolls.add(POLLIN, myds->fd, myds, monotonic_time());
}
}
pthread_mutex_unlock(&myexchange.mutex_resumes);
worker_thread_assigns_sessions_to_idle_thread(thr);
worker_threads_get_sessions_from_idle_threads();
}
}
@ -4342,6 +4283,58 @@ __run_skip_2:
}
}
unsigned int MySQL_Thread::find_session_idx_in_mysql_sessions(MySQL_Session *sess) {
int i=0;
for (i=0;i<mysql_sessions->len;i++) {
MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(i);
if (mysess==sess) {
return i;
}
}
return i;
}
#ifdef IDLE_THREADS
void MySQL_Thread::worker_thread_assigns_sessions_to_idle_thread(MySQL_Thread *thr) {
if (shutdown==0 && thr->shutdown==0 && idle_mysql_sessions->len) {
pthread_mutex_lock(&thr->myexchange.mutex_idles);
bool empty_queue=true;
if (thr->myexchange.idle_mysql_sessions->len) {
// there are already sessions in the queues. We assume someone already notified worker 0
empty_queue=false;
}
while (idle_mysql_sessions->len) {
MySQL_Session *mysess=(MySQL_Session *)idle_mysql_sessions->remove_index_fast(0);
thr->myexchange.idle_mysql_sessions->add(mysess);
}
pthread_mutex_unlock(&thr->myexchange.mutex_idles);
if (empty_queue==true) {
unsigned char c=1;
int fd=thr->pipefd[1];
if (write(fd,&c,1)==-1) {
//proxy_error("Error while signaling maintenance thread\n");
}
}
}
}
void MySQL_Thread::worker_threads_get_sessions_from_idle_threads() {
worker_threads_get_sessions_from_idle_threads();
pthread_mutex_lock(&myexchange.mutex_resumes);
if (myexchange.resume_mysql_sessions->len) {
//unsigned int maxsess=GloMTH->resume_mysql_sessions->len;
while (myexchange.resume_mysql_sessions->len) {
MySQL_Session *mysess=(MySQL_Session *)myexchange.resume_mysql_sessions->remove_index_fast(0);
register_session(mysess, false);
MySQL_Data_Stream *myds=mysess->client_myds;
mypolls.add(POLLIN, myds->fd, myds, monotonic_time());
}
}
pthread_mutex_unlock(&myexchange.mutex_resumes);
}
#endif // IDLE_THREADS
bool MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned int n) {
if (mypolls.fds[n].revents) {
#ifdef IDLE_THREADS
@ -6748,3 +6741,36 @@ void MySQL_Thread::Scan_Sessions_to_Kill(PtrArray *mysess) {
}
}
}
bool MySQL_Thread::move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, unsigned int n) {
unsigned long long _tmp_idle = mypolls.last_recv[n] > mypolls.last_sent[n] ? mypolls.last_recv[n] : mypolls.last_sent[n] ;
if (_tmp_idle < ( (curtime > (unsigned int)mysql_thread___session_idle_ms * 1000) ? (curtime - mysql_thread___session_idle_ms * 1000) : 0)) {
// make sure data stream has no pending data out and session is not throttled (#1939)
// because epoll thread does not handle data stream with data out
if (myds->sess->client_myds == myds && !myds->available_data_out() && myds->sess->pause_until <= curtime) {
//unsigned int j;
bool has_backends = myds->sess->has_any_backend();
/*
for (j=0;j<myds->sess->mybes->len;j++) {
MySQL_Backend *tmp_mybe=(MySQL_Backend *)myds->sess->mybes->index(j);
MySQL_Data_Stream *__myds=tmp_mybe->server_myds;
if (__myds->myconn) {
conns++;
}
}
*/
if (has_backends==false) {
unsigned long long idle_since = curtime - myds->sess->IdleTime();
mypolls.remove_index_fast(n);
myds->mypolls=NULL;
unsigned int i = find_session_idx_in_mysql_sessions(myds->sess);
myds->sess->thread=NULL;
unregister_session(i);
myds->sess->idle_since = idle_since;
idle_mysql_sessions->add(myds->sess);
return true;
}
}
}
return false;
}

Loading…
Cancel
Save