Simplifying MySQL_Thread::run() phase 5

pull/2651/head
René Cannaò 6 years ago
parent da21ca1b24
commit 9c2750027d

@ -104,12 +104,20 @@ class MySQL_Thread
void idle_thread_gets_sessions_from_worker_thread();
void idle_thread_assigns_sessions_to_worker_thread(MySQL_Thread *thr);
void idle_thread_check_if_worker_thread_has_unprocess_resumed_sessions_and_signal_it(MySQL_Thread *thr);
void idle_thread_prepares_session_to_send_to_worker_thread(int i);
#endif // IDLE_THREADS
unsigned int find_session_idx_in_mysql_sessions(MySQL_Session *sess);
bool move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, unsigned int n);
bool set_backend_to_be_skipped_if_frontend_is_slow(MySQL_Data_Stream *myds, unsigned int n);
void handle_mirror_queue_mysql_sessions();
void handle_kill_queues();
void check_timing_out_session(unsigned int n);
void check_for_invalid_fd(unsigned int n);
void read_one_byte_from_pipe(unsigned int n);
void tune_timeout_for_myds_needs_pause(MySQL_Data_Stream *myds);
void tune_timeout_for_session_needs_pause(MySQL_Data_Stream *myds);
void configure_pollout(MySQL_Data_Stream *myds, unsigned int n);
protected:
int nfds;

@ -3841,47 +3841,16 @@ __run_skip_1:
}
#endif // IDLE_THREADS
if (unlikely(myds->wait_until)) {
if (myds->wait_until > curtime) {
if (mypolls.poll_timeout==0 || (myds->wait_until - curtime < mypolls.poll_timeout) ) {
mypolls.poll_timeout= myds->wait_until - curtime;
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 7, "Session=%p , poll_timeout=%llu , wait_until=%llu , curtime=%llu\n", mypolls.poll_timeout, myds->wait_until, curtime);
}
}
tune_timeout_for_myds_needs_pause(myds);
}
if (myds->sess) {
if (unlikely(myds->sess->pause_until > 0)) {
if (mypolls.poll_timeout==0 || (myds->sess->pause_until - curtime < mypolls.poll_timeout) ) {
mypolls.poll_timeout= myds->sess->pause_until - curtime;
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 7, "Session=%p , poll_timeout=%llu , pause_until=%llu , curtime=%llu\n", mypolls.poll_timeout, myds->pause_until, curtime);
}
tune_timeout_for_session_needs_pause(myds);
}
}
myds->revents=0;
if (myds->myds_type!=MYDS_LISTENER) {
if (myds->myds_type==MYDS_FRONTEND && myds->DSS==STATE_SLEEP && myds->sess && myds->sess->status==WAITING_CLIENT_DATA) {
myds->set_pollout();
} else {
if (myds->DSS > STATE_MARIADB_BEGIN && myds->DSS < STATE_MARIADB_END) {
mypolls.fds[n].events = POLLIN;
if (mypolls.myds[n]->myconn->async_exit_status & MYSQL_WAIT_WRITE)
mypolls.fds[n].events |= POLLOUT;
} else {
myds->set_pollout();
}
}
if (unlikely(myds->sess->pause_until > curtime)) {
if (myds->myds_type==MYDS_FRONTEND) {
myds->remove_pollout();
}
if (myds->myds_type==MYDS_BACKEND) {
if (mysql_thread___throttle_ratio_server_to_client) {
mypolls.fds[n].events = 0;
}
}
}
if (myds->myds_type==MYDS_BACKEND) {
set_backend_to_be_skipped_if_frontend_is_slow(myds, n);
}
configure_pollout(myds, n);
}
}
proxy_debug(PROXY_DEBUG_NET,1,"Poll for DataStream=%p will be called with FD=%d and events=%d\n", mypolls.myds[n], mypolls.fds[n].fd, mypolls.fds[n].events);
@ -3972,12 +3941,7 @@ __run_skip_1a:
maintenance_loop=false;
}
pthread_mutex_lock(&kq.m);
if (kq.conn_ids.size() + kq.query_ids.size()) {
Scan_Sessions_to_Kill_All();
maintenance_loop=true;
}
pthread_mutex_unlock(&kq.m);
handle_kill_queues();
// update polls statistics
mypolls.loops++;
@ -4027,31 +3991,10 @@ __run_skip_1a:
int i;
for (i=0; i<rc; i++) {
if (events[i].data.u32) {
// NOTE: not sure why, sometime events returns odd values. If set, we take it out as normal worker threads know how to handle it
if (events[i].events) {
uint32_t sess_thr_id=events[i].data.u32;
uint32_t sess_pos=sessmap[sess_thr_id];
MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(sess_pos);
MySQL_Data_Stream *tmp_myds=mysess->client_myds;
int dsidx=tmp_myds->poll_fds_idx;
//fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx);
mypolls.remove_index_fast(dsidx);
tmp_myds->mypolls=NULL;
mysess->thread=NULL;
// we first delete the association in sessmap
sessmap.erase(mysess->thread_session_id);
if (mysql_sessions->len > 1) {
// take the last element and adjust the map
MySQL_Session *mysess_last=(MySQL_Session *)mysql_sessions->index(mysql_sessions->len-1);
if (mysess->thread_session_id != mysess_last->thread_session_id)
sessmap[mysess_last->thread_session_id]=sess_pos;
}
unregister_session(sess_pos);
resume_mysql_sessions->add(mysess);
epoll_ctl(efd, EPOLL_CTL_DEL, tmp_myds->fd, NULL);
}
idle_thread_prepares_session_to_send_to_worker_thread(i);
}
}
// FIXME: this loop seems suboptimal, it can be combined with the previous one
for (i=0; i<rc; i++) {
if (events[i].events == EPOLLIN && events[i].data.u32==0) {
unsigned char c;
@ -4108,60 +4051,17 @@ __run_skip_1a:
MySQL_Data_Stream *myds=mypolls.myds[n];
if (myds==NULL) {
if (mypolls.fds[n].revents) {
unsigned char c;
if (read(mypolls.fds[n].fd, &c, 1)==-1) {// read just one byte
proxy_error("Error during read from signal_all_threads()\n");
}
proxy_debug(PROXY_DEBUG_GENERIC,3, "Got signal from admin , done nothing\n");
//fprintf(stderr,"Got signal from admin , done nothing\n"); // FIXME: this is just the skeleton for issue #253
if (c) {
// we are being signaled to sleep for some ms. Before going to sleep we also release the mutex
pthread_mutex_unlock(&thread_mutex);
usleep(c*1000);
pthread_mutex_lock(&thread_mutex);
// we enter in maintenance loop only if c is set
// when threads are signaling each other, there is no need to set maintenance_loop
maintenance_loop=true;
}
}
continue;
read_one_byte_from_pipe(n);
continue;
}
if (mypolls.fds[n].revents==0) {
// FIXME: this logic was removed completely because we added mariadb client library. Yet, we need to implement a way to manage connection timeout
// check for timeout
// no events. This section is copied from process_data_on_data_stream()
if (poll_timeout_bool) {
MySQL_Data_Stream *_myds=mypolls.myds[n];
if (_myds && _myds->sess) {
if (_myds->wait_until && curtime > _myds->wait_until) {
// timeout
_myds->sess->to_process=1;
} else {
if (_myds->sess->pause_until && curtime > _myds->sess->pause_until) {
// timeout
_myds->sess->to_process=1;
}
}
}
check_timing_out_session(n);
}
} else {
// check if the FD is valid
if (mypolls.fds[n].revents==POLLNVAL) {
// debugging output before assert
MySQL_Data_Stream *_myds=mypolls.myds[n];
if (_myds) {
if (_myds->myconn) {
proxy_error("revents==POLLNVAL for FD=%d, events=%d, MyDSFD=%d, MyConnFD=%d\n", mypolls.fds[n].fd, mypolls.fds[n].events, myds->fd, myds->myconn->fd);
assert(mypolls.fds[n].revents!=POLLNVAL);
}
}
// if we reached her, we didn't assert() yet
proxy_error("revents==POLLNVAL for FD=%d, events=%d, MyDSFD=%d\n", mypolls.fds[n].fd, mypolls.fds[n].events, myds->fd);
assert(mypolls.fds[n].revents!=POLLNVAL);
}
check_for_invalid_fd(n); // this is designed to assert in case of failure
switch(myds->myds_type) {
// Note: this logic that was here was removed completely because we added mariadb client library.
// Note: this logic that was here was removed completely because we added mariadb client library.
case MYDS_LISTENER:
// we got a new connection!
listener_handle_new_connection(myds,n);
@ -4175,7 +4075,7 @@ __run_skip_1a:
if (rc==false) {
n--;
}
}
}
}
#ifdef IDLE_THREADS
@ -4214,6 +4114,32 @@ unsigned int MySQL_Thread::find_session_idx_in_mysql_sessions(MySQL_Session *ses
}
#ifdef IDLE_THREADS
void MySQL_Thread::idle_thread_prepares_session_to_send_to_worker_thread(int i) {
// NOTE: not sure why, sometime events returns odd values. If set, we take it out as normal worker threads know how to handle it
if (events[i].events) {
uint32_t sess_thr_id=events[i].data.u32;
uint32_t sess_pos=sessmap[sess_thr_id];
MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(sess_pos);
MySQL_Data_Stream *tmp_myds=mysess->client_myds;
int dsidx=tmp_myds->poll_fds_idx;
//fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx);
mypolls.remove_index_fast(dsidx);
tmp_myds->mypolls=NULL;
mysess->thread=NULL;
// we first delete the association in sessmap
sessmap.erase(mysess->thread_session_id);
if (mysql_sessions->len > 1) {
// take the last element and adjust the map
MySQL_Session *mysess_last=(MySQL_Session *)mysql_sessions->index(mysql_sessions->len-1);
if (mysess->thread_session_id != mysess_last->thread_session_id)
sessmap[mysess_last->thread_session_id]=sess_pos;
}
unregister_session(sess_pos);
resume_mysql_sessions->add(mysess);
epoll_ctl(efd, EPOLL_CTL_DEL, tmp_myds->fd, NULL);
}
}
void MySQL_Thread::idle_thread_check_if_worker_thread_has_unprocess_resumed_sessions_and_signal_it(MySQL_Thread *thr) {
pthread_mutex_lock(&thr->myexchange.mutex_resumes);
if (shutdown==0 && thr->shutdown==0 && thr->myexchange.resume_mysql_sessions->len) {
@ -6797,3 +6723,110 @@ void MySQL_Thread::handle_mirror_queue_mysql_sessions() {
}
}
}
void MySQL_Thread::handle_kill_queues() {
pthread_mutex_lock(&kq.m);
if (kq.conn_ids.size() + kq.query_ids.size()) {
Scan_Sessions_to_Kill_All();
maintenance_loop=true;
}
pthread_mutex_unlock(&kq.m);
}
void MySQL_Thread::check_timing_out_session(unsigned int n) {
// FIXME: this logic was removed completely because we added mariadb client library. Yet, we need to implement a way to manage connection timeout
// check for timeout
// no events. This section is copied from process_data_on_data_stream()
MySQL_Data_Stream *_myds=mypolls.myds[n];
if (_myds && _myds->sess) {
if (_myds->wait_until && curtime > _myds->wait_until) {
// timeout
_myds->sess->to_process=1;
} else {
if (_myds->sess->pause_until && curtime > _myds->sess->pause_until) {
// timeout
_myds->sess->to_process=1;
}
}
}
}
void MySQL_Thread::check_for_invalid_fd(unsigned int n) {
// check if the FD is valid
if (mypolls.fds[n].revents==POLLNVAL) {
// debugging output before assert
MySQL_Data_Stream *_myds=mypolls.myds[n];
if (_myds) {
if (_myds->myconn) {
proxy_error("revents==POLLNVAL for FD=%d, events=%d, MyDSFD=%d, MyConnFD=%d\n", mypolls.fds[n].fd, mypolls.fds[n].events, _myds->fd, _myds->myconn->fd);
assert(mypolls.fds[n].revents!=POLLNVAL);
}
}
// if we reached her, we didn't assert() yet
proxy_error("revents==POLLNVAL for FD=%d, events=%d, MyDSFD=%d\n", mypolls.fds[n].fd, mypolls.fds[n].events, _myds->fd);
assert(mypolls.fds[n].revents!=POLLNVAL);
}
}
void MySQL_Thread::read_one_byte_from_pipe(unsigned int n) {
if (mypolls.fds[n].revents) {
unsigned char c;
if (read(mypolls.fds[n].fd, &c, 1)==-1) {// read just one byte
proxy_error("Error during read from signal_all_threads()\n");
}
proxy_debug(PROXY_DEBUG_GENERIC,3, "Got signal from admin , done nothing\n");
//fprintf(stderr,"Got signal from admin , done nothing\n"); // FIXME: this is just the skeleton for issue #253
if (c) {
// we are being signaled to sleep for some ms. Before going to sleep we also release the mutex
pthread_mutex_unlock(&thread_mutex);
usleep(c*1000);
pthread_mutex_lock(&thread_mutex);
// we enter in maintenance loop only if c is set
// when threads are signaling each other, there is no need to set maintenance_loop
maintenance_loop=true;
}
}
}
void MySQL_Thread::tune_timeout_for_myds_needs_pause(MySQL_Data_Stream *myds) {
if (myds->wait_until > curtime) {
if (mypolls.poll_timeout==0 || (myds->wait_until - curtime < mypolls.poll_timeout) ) {
mypolls.poll_timeout= myds->wait_until - curtime;
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 7, "Session=%p , poll_timeout=%llu , wait_until=%llu , curtime=%llu\n", mypolls.poll_timeout, myds->wait_until, curtime);
}
}
}
void MySQL_Thread::tune_timeout_for_session_needs_pause(MySQL_Data_Stream *myds) {
if (mypolls.poll_timeout==0 || (myds->sess->pause_until - curtime < mypolls.poll_timeout) ) {
mypolls.poll_timeout= myds->sess->pause_until - curtime;
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 7, "Session=%p , poll_timeout=%llu , pause_until=%llu , curtime=%llu\n", mypolls.poll_timeout, myds->sess->pause_until, curtime);
}
}
void MySQL_Thread::configure_pollout(MySQL_Data_Stream *myds, unsigned int n) {
if (myds->myds_type==MYDS_FRONTEND && myds->DSS==STATE_SLEEP && myds->sess && myds->sess->status==WAITING_CLIENT_DATA) {
myds->set_pollout();
} else {
if (myds->DSS > STATE_MARIADB_BEGIN && myds->DSS < STATE_MARIADB_END) {
mypolls.fds[n].events = POLLIN;
if (mypolls.myds[n]->myconn->async_exit_status & MYSQL_WAIT_WRITE)
mypolls.fds[n].events |= POLLOUT;
} else {
myds->set_pollout();
}
}
if (unlikely(myds->sess->pause_until > curtime)) {
if (myds->myds_type==MYDS_FRONTEND) {
myds->remove_pollout();
}
if (myds->myds_type==MYDS_BACKEND) {
if (mysql_thread___throttle_ratio_server_to_client) {
mypolls.fds[n].events = 0;
}
}
}
if (myds->myds_type==MYDS_BACKEND) {
set_backend_to_be_skipped_if_frontend_is_slow(myds, n);
}
}

Loading…
Cancel
Save