From 9c2750027d1d1005d5a544304c0041eb5d7c1fca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Mon, 6 Apr 2020 23:41:53 +0200 Subject: [PATCH] Simplifying MySQL_Thread::run() phase 5 --- include/MySQL_Thread.h | 8 ++ lib/MySQL_Thread.cpp | 257 +++++++++++++++++++++++------------------ 2 files changed, 153 insertions(+), 112 deletions(-) diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 072607973..d73ac709e 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -104,12 +104,20 @@ class MySQL_Thread void idle_thread_gets_sessions_from_worker_thread(); void idle_thread_assigns_sessions_to_worker_thread(MySQL_Thread *thr); void idle_thread_check_if_worker_thread_has_unprocess_resumed_sessions_and_signal_it(MySQL_Thread *thr); + void idle_thread_prepares_session_to_send_to_worker_thread(int i); #endif // IDLE_THREADS unsigned int find_session_idx_in_mysql_sessions(MySQL_Session *sess); bool move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, unsigned int n); bool set_backend_to_be_skipped_if_frontend_is_slow(MySQL_Data_Stream *myds, unsigned int n); void handle_mirror_queue_mysql_sessions(); + void handle_kill_queues(); + void check_timing_out_session(unsigned int n); + void check_for_invalid_fd(unsigned int n); + void read_one_byte_from_pipe(unsigned int n); + void tune_timeout_for_myds_needs_pause(MySQL_Data_Stream *myds); + void tune_timeout_for_session_needs_pause(MySQL_Data_Stream *myds); + void configure_pollout(MySQL_Data_Stream *myds, unsigned int n); protected: int nfds; diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index fd83afad0..31ddc97d8 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -3841,47 +3841,16 @@ __run_skip_1: } #endif // IDLE_THREADS if (unlikely(myds->wait_until)) { - if (myds->wait_until > curtime) { - if (mypolls.poll_timeout==0 || (myds->wait_until - curtime < mypolls.poll_timeout) ) { - mypolls.poll_timeout= myds->wait_until - curtime; - proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 7, "Session=%p , poll_timeout=%llu , wait_until=%llu , curtime=%llu\n", mypolls.poll_timeout, myds->wait_until, curtime); - } - } + tune_timeout_for_myds_needs_pause(myds); } if (myds->sess) { if (unlikely(myds->sess->pause_until > 0)) { - if (mypolls.poll_timeout==0 || (myds->sess->pause_until - curtime < mypolls.poll_timeout) ) { - mypolls.poll_timeout= myds->sess->pause_until - curtime; - proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 7, "Session=%p , poll_timeout=%llu , pause_until=%llu , curtime=%llu\n", mypolls.poll_timeout, myds->pause_until, curtime); - } + tune_timeout_for_session_needs_pause(myds); } } myds->revents=0; if (myds->myds_type!=MYDS_LISTENER) { - if (myds->myds_type==MYDS_FRONTEND && myds->DSS==STATE_SLEEP && myds->sess && myds->sess->status==WAITING_CLIENT_DATA) { - myds->set_pollout(); - } else { - if (myds->DSS > STATE_MARIADB_BEGIN && myds->DSS < STATE_MARIADB_END) { - mypolls.fds[n].events = POLLIN; - if (mypolls.myds[n]->myconn->async_exit_status & MYSQL_WAIT_WRITE) - mypolls.fds[n].events |= POLLOUT; - } else { - myds->set_pollout(); - } - } - if (unlikely(myds->sess->pause_until > curtime)) { - if (myds->myds_type==MYDS_FRONTEND) { - myds->remove_pollout(); - } - if (myds->myds_type==MYDS_BACKEND) { - if (mysql_thread___throttle_ratio_server_to_client) { - mypolls.fds[n].events = 0; - } - } - } - if (myds->myds_type==MYDS_BACKEND) { - set_backend_to_be_skipped_if_frontend_is_slow(myds, n); - } + configure_pollout(myds, n); } } proxy_debug(PROXY_DEBUG_NET,1,"Poll for DataStream=%p will be called with FD=%d and events=%d\n", mypolls.myds[n], mypolls.fds[n].fd, mypolls.fds[n].events); @@ -3972,12 +3941,7 @@ __run_skip_1a: maintenance_loop=false; } - pthread_mutex_lock(&kq.m); - if (kq.conn_ids.size() + kq.query_ids.size()) { - Scan_Sessions_to_Kill_All(); - maintenance_loop=true; - } - pthread_mutex_unlock(&kq.m); + handle_kill_queues(); // update polls statistics mypolls.loops++; @@ -4027,31 +3991,10 @@ __run_skip_1a: int i; for (i=0; iindex(sess_pos); - MySQL_Data_Stream *tmp_myds=mysess->client_myds; - int dsidx=tmp_myds->poll_fds_idx; - //fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx); - mypolls.remove_index_fast(dsidx); - tmp_myds->mypolls=NULL; - mysess->thread=NULL; - // we first delete the association in sessmap - sessmap.erase(mysess->thread_session_id); - if (mysql_sessions->len > 1) { - // take the last element and adjust the map - MySQL_Session *mysess_last=(MySQL_Session *)mysql_sessions->index(mysql_sessions->len-1); - if (mysess->thread_session_id != mysess_last->thread_session_id) - sessmap[mysess_last->thread_session_id]=sess_pos; - } - unregister_session(sess_pos); - resume_mysql_sessions->add(mysess); - epoll_ctl(efd, EPOLL_CTL_DEL, tmp_myds->fd, NULL); - } + idle_thread_prepares_session_to_send_to_worker_thread(i); } } + // FIXME: this loop seems suboptimal, it can be combined with the previous one for (i=0; isess) { - if (_myds->wait_until && curtime > _myds->wait_until) { - // timeout - _myds->sess->to_process=1; - } else { - if (_myds->sess->pause_until && curtime > _myds->sess->pause_until) { - // timeout - _myds->sess->to_process=1; - } - } - } + check_timing_out_session(n); } } else { - // check if the FD is valid - if (mypolls.fds[n].revents==POLLNVAL) { - // debugging output before assert - MySQL_Data_Stream *_myds=mypolls.myds[n]; - if (_myds) { - if (_myds->myconn) { - proxy_error("revents==POLLNVAL for FD=%d, events=%d, MyDSFD=%d, MyConnFD=%d\n", mypolls.fds[n].fd, mypolls.fds[n].events, myds->fd, myds->myconn->fd); - assert(mypolls.fds[n].revents!=POLLNVAL); - } - } - // if we reached her, we didn't assert() yet - proxy_error("revents==POLLNVAL for FD=%d, events=%d, MyDSFD=%d\n", mypolls.fds[n].fd, mypolls.fds[n].events, myds->fd); - assert(mypolls.fds[n].revents!=POLLNVAL); - } + check_for_invalid_fd(n); // this is designed to assert in case of failure switch(myds->myds_type) { - // Note: this logic that was here was removed completely because we added mariadb client library. + // Note: this logic that was here was removed completely because we added mariadb client library. case MYDS_LISTENER: // we got a new connection! listener_handle_new_connection(myds,n); @@ -4175,7 +4075,7 @@ __run_skip_1a: if (rc==false) { n--; } - } + } } #ifdef IDLE_THREADS @@ -4214,6 +4114,32 @@ unsigned int MySQL_Thread::find_session_idx_in_mysql_sessions(MySQL_Session *ses } #ifdef IDLE_THREADS +void MySQL_Thread::idle_thread_prepares_session_to_send_to_worker_thread(int i) { + // NOTE: not sure why, sometime events returns odd values. If set, we take it out as normal worker threads know how to handle it + if (events[i].events) { + uint32_t sess_thr_id=events[i].data.u32; + uint32_t sess_pos=sessmap[sess_thr_id]; + MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(sess_pos); + MySQL_Data_Stream *tmp_myds=mysess->client_myds; + int dsidx=tmp_myds->poll_fds_idx; + //fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx); + mypolls.remove_index_fast(dsidx); + tmp_myds->mypolls=NULL; + mysess->thread=NULL; + // we first delete the association in sessmap + sessmap.erase(mysess->thread_session_id); + if (mysql_sessions->len > 1) { + // take the last element and adjust the map + MySQL_Session *mysess_last=(MySQL_Session *)mysql_sessions->index(mysql_sessions->len-1); + if (mysess->thread_session_id != mysess_last->thread_session_id) + sessmap[mysess_last->thread_session_id]=sess_pos; + } + unregister_session(sess_pos); + resume_mysql_sessions->add(mysess); + epoll_ctl(efd, EPOLL_CTL_DEL, tmp_myds->fd, NULL); + } +} + void MySQL_Thread::idle_thread_check_if_worker_thread_has_unprocess_resumed_sessions_and_signal_it(MySQL_Thread *thr) { pthread_mutex_lock(&thr->myexchange.mutex_resumes); if (shutdown==0 && thr->shutdown==0 && thr->myexchange.resume_mysql_sessions->len) { @@ -6797,3 +6723,110 @@ void MySQL_Thread::handle_mirror_queue_mysql_sessions() { } } } + +void MySQL_Thread::handle_kill_queues() { + pthread_mutex_lock(&kq.m); + if (kq.conn_ids.size() + kq.query_ids.size()) { + Scan_Sessions_to_Kill_All(); + maintenance_loop=true; + } + pthread_mutex_unlock(&kq.m); +} + +void MySQL_Thread::check_timing_out_session(unsigned int n) { + // FIXME: this logic was removed completely because we added mariadb client library. Yet, we need to implement a way to manage connection timeout + // check for timeout + // no events. This section is copied from process_data_on_data_stream() + MySQL_Data_Stream *_myds=mypolls.myds[n]; + if (_myds && _myds->sess) { + if (_myds->wait_until && curtime > _myds->wait_until) { + // timeout + _myds->sess->to_process=1; + } else { + if (_myds->sess->pause_until && curtime > _myds->sess->pause_until) { + // timeout + _myds->sess->to_process=1; + } + } + } +} + +void MySQL_Thread::check_for_invalid_fd(unsigned int n) { + // check if the FD is valid + if (mypolls.fds[n].revents==POLLNVAL) { + // debugging output before assert + MySQL_Data_Stream *_myds=mypolls.myds[n]; + if (_myds) { + if (_myds->myconn) { + proxy_error("revents==POLLNVAL for FD=%d, events=%d, MyDSFD=%d, MyConnFD=%d\n", mypolls.fds[n].fd, mypolls.fds[n].events, _myds->fd, _myds->myconn->fd); + assert(mypolls.fds[n].revents!=POLLNVAL); + } + } + // if we reached her, we didn't assert() yet + proxy_error("revents==POLLNVAL for FD=%d, events=%d, MyDSFD=%d\n", mypolls.fds[n].fd, mypolls.fds[n].events, _myds->fd); + assert(mypolls.fds[n].revents!=POLLNVAL); + } +} + +void MySQL_Thread::read_one_byte_from_pipe(unsigned int n) { + if (mypolls.fds[n].revents) { + unsigned char c; + if (read(mypolls.fds[n].fd, &c, 1)==-1) {// read just one byte + proxy_error("Error during read from signal_all_threads()\n"); + } + proxy_debug(PROXY_DEBUG_GENERIC,3, "Got signal from admin , done nothing\n"); + //fprintf(stderr,"Got signal from admin , done nothing\n"); // FIXME: this is just the skeleton for issue #253 + if (c) { + // we are being signaled to sleep for some ms. Before going to sleep we also release the mutex + pthread_mutex_unlock(&thread_mutex); + usleep(c*1000); + pthread_mutex_lock(&thread_mutex); + // we enter in maintenance loop only if c is set + // when threads are signaling each other, there is no need to set maintenance_loop + maintenance_loop=true; + } + } +} + +void MySQL_Thread::tune_timeout_for_myds_needs_pause(MySQL_Data_Stream *myds) { + if (myds->wait_until > curtime) { + if (mypolls.poll_timeout==0 || (myds->wait_until - curtime < mypolls.poll_timeout) ) { + mypolls.poll_timeout= myds->wait_until - curtime; + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 7, "Session=%p , poll_timeout=%llu , wait_until=%llu , curtime=%llu\n", mypolls.poll_timeout, myds->wait_until, curtime); + } + } +} + +void MySQL_Thread::tune_timeout_for_session_needs_pause(MySQL_Data_Stream *myds) { + if (mypolls.poll_timeout==0 || (myds->sess->pause_until - curtime < mypolls.poll_timeout) ) { + mypolls.poll_timeout= myds->sess->pause_until - curtime; + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 7, "Session=%p , poll_timeout=%llu , pause_until=%llu , curtime=%llu\n", mypolls.poll_timeout, myds->sess->pause_until, curtime); + } +} + +void MySQL_Thread::configure_pollout(MySQL_Data_Stream *myds, unsigned int n) { + if (myds->myds_type==MYDS_FRONTEND && myds->DSS==STATE_SLEEP && myds->sess && myds->sess->status==WAITING_CLIENT_DATA) { + myds->set_pollout(); + } else { + if (myds->DSS > STATE_MARIADB_BEGIN && myds->DSS < STATE_MARIADB_END) { + mypolls.fds[n].events = POLLIN; + if (mypolls.myds[n]->myconn->async_exit_status & MYSQL_WAIT_WRITE) + mypolls.fds[n].events |= POLLOUT; + } else { + myds->set_pollout(); + } + } + if (unlikely(myds->sess->pause_until > curtime)) { + if (myds->myds_type==MYDS_FRONTEND) { + myds->remove_pollout(); + } + if (myds->myds_type==MYDS_BACKEND) { + if (mysql_thread___throttle_ratio_server_to_client) { + mypolls.fds[n].events = 0; + } + } + } + if (myds->myds_type==MYDS_BACKEND) { + set_backend_to_be_skipped_if_frontend_is_slow(myds, n); + } +}