@ -2473,44 +2473,7 @@ void MySQL_Thread::unregister_session(int idx) {
mysql_sessions - > remove_index_fast ( idx ) ;
}
// main loop
void MySQL_Thread : : run ( ) {
unsigned int n ;
int rc ;
# ifdef IDLE_THREADS
bool idle_maintenance_thread = epoll_thread ;
if ( idle_maintenance_thread ) {
// we check if it is the first time we are called
if ( efd = = - 1 ) {
efd = EPOLL_CREATE ;
int fd = pipefd [ 0 ] ;
struct epoll_event event ;
memset ( & event , 0 , sizeof ( event ) ) ; // let's make valgrind happy
event . events = EPOLLIN ;
event . data . u32 = 0 ; // special value to point to the pipe
epoll_ctl ( efd , EPOLL_CTL_ADD , fd , & event ) ;
}
}
# endif // IDLE_THREADS
curtime = monotonic_time ( ) ;
atomic_curtime = curtime ;
# ifdef PROXYSQL_MYSQL_PTHREAD_MUTEX
pthread_mutex_lock ( & thread_mutex ) ;
# else
spin_wrlock ( & thread_mutex ) ;
# endif
while ( shutdown = = 0 ) {
# ifdef IDLE_THREADS
if ( idle_maintenance_thread ) {
goto __run_skip_1 ;
}
# endif // IDLE_THREADS
void MySQL_Thread : : ping_idle_connections ( ) {
int num_idles ;
if ( processing_idles = = true & & ( last_processing_idles < curtime - mysql_thread___ping_timeout_server * 1000 ) ) {
processing_idles = false ;
@ -2546,66 +2509,285 @@ void MySQL_Thread::run() {
processing_idles = true ;
last_processing_idles = curtime ;
}
}
# ifdef IDLE_THREADS
__run_skip_1 :
if ( idle_maintenance_thread ) {
pthread_mutex_lock ( & myexchange . mutex_idles ) ;
while ( myexchange . idle_mysql_sessions - > len ) {
MySQL_Session * mysess = ( MySQL_Session * ) myexchange . idle_mysql_sessions - > remove_index_fast ( 0 ) ;
register_session ( mysess , false ) ;
MySQL_Data_Stream * myds = mysess - > client_myds ;
mypolls . add ( POLLIN , myds - > fd , myds , monotonic_time ( ) ) ;
// add in epoll()
struct epoll_event event ;
memset ( & event , 0 , sizeof ( event ) ) ; // let's make valgrind happy
event . data . u32 = mysess - > thread_session_id ;
event . events = EPOLLIN ;
epoll_ctl ( efd , EPOLL_CTL_ADD , myds - > fd , & event ) ;
// we map thread_id -> position in mysql_session (end of the list)
sessmap [ mysess - > thread_session_id ] = mysql_sessions - > len - 1 ;
//fprintf(stderr,"Adding session %p idx, DS %p idx %d\n",mysess,myds,myds->poll_fds_idx);
void MySQL_Thread : : process_mirror_sessions ( ) {
while ( mirror_queue_mysql_sessions - > len ) {
if ( __sync_add_and_fetch ( & GloMTH - > status_variables . mirror_sessions_current , 1 ) > ( unsigned int ) mysql_thread___mirror_max_concurrency ) {
__sync_sub_and_fetch ( & GloMTH - > status_variables . mirror_sessions_current , 1 ) ;
//goto __mysql_thread_exit_add_mirror; // we can't add more mirror sessions at runtime
return ; // we can't add more mirror sessions at runtime
} else {
int idx ;
idx = fastrand ( ) % ( mirror_queue_mysql_sessions - > len ) ;
MySQL_Session * newsess = ( MySQL_Session * ) mirror_queue_mysql_sessions - > remove_index_fast ( idx ) ;
register_session ( newsess ) ;
newsess - > handler ( ) ; // execute immediately
if ( newsess - > status = = WAITING_CLIENT_DATA ) { // the mirror session has completed
unregister_session ( mysql_sessions - > len - 1 ) ;
unsigned int l = ( unsigned int ) mysql_thread___mirror_max_concurrency ;
if ( mirror_queue_mysql_sessions - > len * 0.3 > l ) l = mirror_queue_mysql_sessions - > len * 0.3 ;
if ( mirror_queue_mysql_sessions_cache - > len < = l ) {
bool to_cache = true ;
if ( newsess - > mybe ) {
if ( newsess - > mybe - > server_myds ) {
to_cache = false ;
}
}
if ( to_cache ) {
__sync_sub_and_fetch ( & GloMTH - > status_variables . mirror_sessions_current , 1 ) ;
mirror_queue_mysql_sessions_cache - > add ( newsess ) ;
} else {
delete newsess ;
}
} else {
delete newsess ;
}
}
pthread_mutex_unlock ( & myexchange . mutex_idles ) ;
goto __run_skip_1a ;
//newsess->to_process=0;
}
# endif // IDLE_THREADS
while ( mirror_queue_mysql_sessions - > len ) {
if ( __sync_add_and_fetch ( & GloMTH - > status_variables . mirror_sessions_current , 1 ) > ( unsigned int ) mysql_thread___mirror_max_concurrency ) {
__sync_sub_and_fetch ( & GloMTH - > status_variables . mirror_sessions_current , 1 ) ;
goto __mysql_thread_exit_add_mirror ; // we can't add more mirror sessions at runtime
} else {
int idx ;
idx = fastrand ( ) % ( mirror_queue_mysql_sessions - > len ) ;
MySQL_Session * newsess = ( MySQL_Session * ) mirror_queue_mysql_sessions - > remove_index_fast ( idx ) ;
register_session ( newsess ) ;
newsess - > handler ( ) ; // execute immediately
if ( newsess - > status = = WAITING_CLIENT_DATA ) { // the mirror session has completed
unregister_session ( mysql_sessions - > len - 1 ) ;
unsigned int l = ( unsigned int ) mysql_thread___mirror_max_concurrency ;
if ( mirror_queue_mysql_sessions - > len * 0.3 > l ) l = mirror_queue_mysql_sessions - > len * 0.3 ;
if ( mirror_queue_mysql_sessions_cache - > len < = l ) {
bool to_cache = true ;
if ( newsess - > mybe ) {
if ( newsess - > mybe - > server_myds ) {
to_cache = false ;
}
}
}
void MySQL_Thread : : MoveMydsToMaintenanceThreadIfNeeded ( MySQL_Data_Stream * myds , unsigned int n ) {
// here we try to move it to the maintenance thread
if ( myds - > myds_type = = MYDS_FRONTEND & & myds - > sess ) {
if ( myds - > DSS = = STATE_SLEEP & & myds - > sess - > status = = WAITING_CLIENT_DATA ) {
unsigned long long _tmp_idle = mypolls . last_recv [ n ] > mypolls . last_sent [ n ] ? mypolls . last_recv [ n ] : mypolls . last_sent [ n ] ;
if ( _tmp_idle < ( ( curtime > ( unsigned int ) mysql_thread___session_idle_ms * 1000 ) ? ( curtime - mysql_thread___session_idle_ms * 1000 ) : 0 ) ) {
if ( myds - > sess - > client_myds = = myds & & myds - > PSarrayOUT - > len = = 0 & & ( myds - > queueOUT . head - myds - > queueOUT . tail ) = = 0 ) { // extra check
unsigned int j ;
int conns = 0 ;
for ( j = 0 ; j < myds - > sess - > mybes - > len ; j + + ) {
MySQL_Backend * tmp_mybe = ( MySQL_Backend * ) myds - > sess - > mybes - > index ( j ) ;
MySQL_Data_Stream * __myds = tmp_mybe - > server_myds ;
if ( __myds - > myconn ) {
conns + + ;
}
if ( to_cache ) {
__sync_sub_and_fetch ( & GloMTH - > status_variables . mirror_sessions_current , 1 ) ;
mirror_queue_mysql_sessions_cache - > add ( newsess ) ;
} else {
delete newsess ;
}
unsigned long long idle_since = curtime - myds - > sess - > IdleTime ( ) ;
bool exit_cond = false ;
if ( conns = = 0 ) {
mypolls . remove_index_fast ( n ) ;
myds - > mypolls = NULL ;
unsigned int i ;
for ( i = 0 ; i < mysql_sessions - > len ; i + + ) {
MySQL_Session * mysess = ( MySQL_Session * ) mysql_sessions - > index ( i ) ;
if ( mysess = = myds - > sess ) {
mysess - > thread = NULL ;
unregister_session ( i ) ;
exit_cond = true ;
mysess - > idle_since = idle_since ;
idle_mysql_sessions - > add ( mysess ) ;
break ;
}
}
} else {
delete newsess ;
}
if ( exit_cond ) {
//continue;
return ;
}
}
//newsess->to_process=0;
}
}
__mysql_thread_exit_add_mirror :
}
}
void MySQL_Thread : : HandleEventFromEpoll ( int i ) {
if ( events [ i ] . data . u32 ) {
// NOTE: not sure why, sometime events returns odd values. If set, we take it out as normal worker threads know how to handle it
if ( events [ i ] . events ) {
uint32_t sess_thr_id = events [ i ] . data . u32 ;
uint32_t sess_pos = sessmap [ sess_thr_id ] ;
MySQL_Session * mysess = ( MySQL_Session * ) mysql_sessions - > index ( sess_pos ) ;
MySQL_Data_Stream * tmp_myds = mysess - > client_myds ;
int dsidx = tmp_myds - > poll_fds_idx ;
//fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx);
mypolls . remove_index_fast ( dsidx ) ;
tmp_myds - > mypolls = NULL ;
mysess - > thread = NULL ;
// we first delete the association in sessmap
sessmap . erase ( mysess - > thread_session_id ) ;
if ( mysql_sessions - > len > 1 ) {
// take the last element and adjust the map
MySQL_Session * mysess_last = ( MySQL_Session * ) mysql_sessions - > index ( mysql_sessions - > len - 1 ) ;
if ( mysess - > thread_session_id ! = mysess_last - > thread_session_id )
sessmap [ mysess_last - > thread_session_id ] = sess_pos ;
}
unregister_session ( sess_pos ) ;
resume_mysql_sessions - > add ( mysess ) ;
epoll_ctl ( efd , EPOLL_CTL_DEL , tmp_myds - > fd , NULL ) ;
}
}
}
void MySQL_Thread : : ScanIdleClients ( ) {
# define SESS_TO_SCAN 128
if ( mysess_idx + SESS_TO_SCAN > mysql_sessions - > len ) {
mysess_idx = 0 ;
}
unsigned int i ;
unsigned long long min_idle = 0 ;
if ( curtime > ( unsigned long long ) mysql_thread___wait_timeout * 1000 ) {
min_idle = curtime - ( unsigned long long ) mysql_thread___wait_timeout * 1000 ;
}
for ( i = 0 ; i < SESS_TO_SCAN & & mysess_idx < mysql_sessions - > len ; i + + ) {
uint32_t sess_pos = mysess_idx ;
MySQL_Session * mysess = ( MySQL_Session * ) mysql_sessions - > index ( sess_pos ) ;
if ( mysess - > idle_since < min_idle ) {
mysess - > killed = true ;
MySQL_Data_Stream * tmp_myds = mysess - > client_myds ;
int dsidx = tmp_myds - > poll_fds_idx ;
//fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx);
mypolls . remove_index_fast ( dsidx ) ;
tmp_myds - > mypolls = NULL ;
mysess - > thread = NULL ;
// we first delete the association in sessmap
sessmap . erase ( mysess - > thread_session_id ) ;
if ( mysql_sessions - > len > 1 ) {
// take the last element and adjust the map
MySQL_Session * mysess_last = ( MySQL_Session * ) mysql_sessions - > index ( mysql_sessions - > len - 1 ) ;
if ( mysess - > thread_session_id ! = mysess_last - > thread_session_id )
sessmap [ mysess_last - > thread_session_id ] = sess_pos ;
}
unregister_session ( sess_pos ) ;
resume_mysql_sessions - > add ( mysess ) ;
epoll_ctl ( efd , EPOLL_CTL_DEL , tmp_myds - > fd , NULL ) ;
}
mysess_idx + + ;
}
}
void MySQL_Thread : : ProcessEventOnDataStream ( MySQL_Data_Stream * myds , unsigned int n ) {
if ( myds & & myds - > myds_type = = MYDS_FRONTEND & & myds - > DSS = = STATE_SLEEP & & myds - > sess & & myds - > sess - > status = = WAITING_CLIENT_DATA ) {
mypolls . myds [ n ] - > set_pollout ( ) ;
} else {
if ( mypolls . myds [ n ] - > DSS > STATE_MARIADB_BEGIN & & mypolls . myds [ n ] - > DSS < STATE_MARIADB_END ) {
mypolls . fds [ n ] . events = POLLIN ;
if ( mypolls . myds [ n ] - > myconn - > async_exit_status & MYSQL_WAIT_WRITE )
mypolls . fds [ n ] . events | = POLLOUT ;
} else {
mypolls . myds [ n ] - > set_pollout ( ) ;
}
}
if ( myds & & myds - > sess - > pause_until > curtime ) {
if ( myds - > myds_type = = MYDS_FRONTEND ) {
mypolls . myds [ n ] - > remove_pollout ( ) ;
}
if ( myds - > myds_type = = MYDS_BACKEND ) {
if ( mysql_thread___throttle_ratio_server_to_client ) {
mypolls . fds [ n ] . events = 0 ;
}
}
}
if ( myds & & myds - > myds_type = = MYDS_BACKEND ) {
if ( myds - > sess & & myds - > sess - > client_myds & & myds - > sess - > mirror = = false ) {
unsigned int buffered_data = 0 ;
buffered_data = myds - > sess - > client_myds - > PSarrayOUT - > len * RESULTSET_BUFLEN ;
buffered_data + = myds - > sess - > client_myds - > resultset - > len * RESULTSET_BUFLEN ;
// we pause receiving from backend at mysql_thread___threshold_resultset_size * 8
// but assuming that client isn't completely blocked, we will stop checking for data
// only at mysql_thread___threshold_resultset_size * 4
if ( buffered_data > ( unsigned int ) mysql_thread___threshold_resultset_size * 4 ) {
mypolls . fds [ n ] . events = 0 ;
}
}
}
}
void MySQL_Thread : : ComputeMinTimeToWait ( MySQL_Data_Stream * myds ) {
if ( myds - > wait_until ) {
if ( myds - > wait_until > curtime ) {
//if (mypolls.poll_timeout==0 || (myds->wait_until - curtime < mypolls.poll_timeout) ) {
if ( myds - > wait_until - curtime < mypolls . poll_timeout ) {
//if (myds->wait_until < mypolls.abs_poll_timeout) {
// mypolls.abs_poll_timeout = myds->wait_until;
mypolls . poll_timeout = myds - > wait_until - curtime ;
}
}
}
if ( myds - > sess ) {
if ( myds - > sess - > pause_until > 0 ) {
//if (mypolls.poll_timeout==0 || (myds->sess->pause_until - curtime < mypolls.poll_timeout) ) {
if ( myds - > sess - > pause_until - curtime < mypolls . poll_timeout ) {
mypolls . poll_timeout = myds - > sess - > pause_until - curtime ;
//if (myds->sess->pause_until < mypolls.abs_poll_timeout) {
// mypolls.abs_poll_timeout = myds->sess->pause_until;
}
}
}
}
// main loop
void MySQL_Thread : : run ( ) {
unsigned int n ;
int rc ;
# ifdef IDLE_THREADS
bool idle_maintenance_thread = epoll_thread ;
if ( idle_maintenance_thread ) {
// we check if it is the first time we are called
if ( efd = = - 1 ) {
efd = EPOLL_CREATE ;
int fd = pipefd [ 0 ] ;
struct epoll_event event ;
memset ( & event , 0 , sizeof ( event ) ) ; // let's make valgrind happy
event . events = EPOLLIN ;
event . data . u32 = 0 ; // special value to point to the pipe
epoll_ctl ( efd , EPOLL_CTL_ADD , fd , & event ) ;
}
}
# endif // IDLE_THREADS
curtime = monotonic_time ( ) ;
atomic_curtime = curtime ;
# ifdef PROXYSQL_MYSQL_PTHREAD_MUTEX
pthread_mutex_lock ( & thread_mutex ) ;
# else
spin_wrlock ( & thread_mutex ) ;
# endif
while ( shutdown = = 0 ) {
# ifdef IDLE_THREADS
if ( idle_maintenance_thread ) {
goto __run_skip_1 ;
}
# endif // IDLE_THREADS
ping_idle_connections ( ) ;
# ifdef IDLE_THREADS
__run_skip_1 :
if ( idle_maintenance_thread ) {
pthread_mutex_lock ( & myexchange . mutex_idles ) ;
while ( myexchange . idle_mysql_sessions - > len ) {
MySQL_Session * mysess = ( MySQL_Session * ) myexchange . idle_mysql_sessions - > remove_index_fast ( 0 ) ;
register_session ( mysess , false ) ;
MySQL_Data_Stream * myds = mysess - > client_myds ;
mypolls . add ( POLLIN , myds - > fd , myds , monotonic_time ( ) ) ;
// add in epoll()
struct epoll_event event ;
memset ( & event , 0 , sizeof ( event ) ) ; // let's make valgrind happy
event . data . u32 = mysess - > thread_session_id ;
event . events = EPOLLIN ;
epoll_ctl ( efd , EPOLL_CTL_ADD , myds - > fd , & event ) ;
// we map thread_id -> position in mysql_session (end of the list)
sessmap [ mysess - > thread_session_id ] = mysql_sessions - > len - 1 ;
//fprintf(stderr,"Adding session %p idx, DS %p idx %d\n",mysess,myds,myds->poll_fds_idx);
}
pthread_mutex_unlock ( & myexchange . mutex_idles ) ;
goto __run_skip_1a ;
}
# endif // IDLE_THREADS
process_mirror_sessions ( ) ;
//__mysql_thread_exit_add_mirror:
mypolls . poll_timeout = mysql_thread___poll_timeout / 1000 ; // predefined default value
//mypolls.abs_poll_timeout = curtime + mypolls.poll_timeout;
for ( n = 0 ; n < mypolls . len ; n + + ) {
MySQL_Data_Stream * myds = NULL ;
myds = mypolls . myds [ n ] ;
@ -2613,104 +2795,28 @@ __mysql_thread_exit_add_mirror:
if ( myds ) {
# ifdef IDLE_THREADS
if ( GloVars . global . idle_threads ) {
// here we try to move it to the maintenance thread
if ( myds - > myds_type = = MYDS_FRONTEND & & myds - > sess ) {
if ( myds - > DSS = = STATE_SLEEP & & myds - > sess - > status = = WAITING_CLIENT_DATA ) {
unsigned long long _tmp_idle = mypolls . last_recv [ n ] > mypolls . last_sent [ n ] ? mypolls . last_recv [ n ] : mypolls . last_sent [ n ] ;
if ( _tmp_idle < ( ( curtime > ( unsigned int ) mysql_thread___session_idle_ms * 1000 ) ? ( curtime - mysql_thread___session_idle_ms * 1000 ) : 0 ) ) {
if ( myds - > sess - > client_myds = = myds & & myds - > PSarrayOUT - > len = = 0 & & ( myds - > queueOUT . head - myds - > queueOUT . tail ) = = 0 ) { // extra check
unsigned int j ;
int conns = 0 ;
for ( j = 0 ; j < myds - > sess - > mybes - > len ; j + + ) {
MySQL_Backend * tmp_mybe = ( MySQL_Backend * ) myds - > sess - > mybes - > index ( j ) ;
MySQL_Data_Stream * __myds = tmp_mybe - > server_myds ;
if ( __myds - > myconn ) {
conns + + ;
}
}
unsigned long long idle_since = curtime - myds - > sess - > IdleTime ( ) ;
bool exit_cond = false ;
if ( conns = = 0 ) {
mypolls . remove_index_fast ( n ) ;
myds - > mypolls = NULL ;
unsigned int i ;
for ( i = 0 ; i < mysql_sessions - > len ; i + + ) {
MySQL_Session * mysess = ( MySQL_Session * ) mysql_sessions - > index ( i ) ;
if ( mysess = = myds - > sess ) {
mysess - > thread = NULL ;
unregister_session ( i ) ;
exit_cond = true ;
mysess - > idle_since = idle_since ;
idle_mysql_sessions - > add ( mysess ) ;
break ;
}
}
}
if ( exit_cond ) {
continue ;
}
}
}
}
}
MoveMydsToMaintenanceThreadIfNeeded ( myds , n ) ;
}
# endif // IDLE_THREADS
if ( myds - > wait_until ) {
if ( myds - > wait_until > curtime ) {
if ( mypolls . poll_timeout = = 0 | | ( myds - > wait_until - curtime < mypolls . poll_timeout ) ) {
mypolls . poll_timeout = myds - > wait_until - curtime ;
}
}
}
if ( myds - > sess ) {
if ( myds - > sess - > pause_until > 0 ) {
if ( mypolls . poll_timeout = = 0 | | ( myds - > sess - > pause_until - curtime < mypolls . poll_timeout ) ) {
mypolls . poll_timeout = myds - > sess - > pause_until - curtime ;
}
}
}
ComputeMinTimeToWait ( myds ) ;
}
if ( myds ) myds - > revents = 0 ;
if ( mypolls . myds [ n ] & & mypolls . myds [ n ] - > myds_type ! = MYDS_LISTENER ) {
if ( myds & & myds - > myds_type = = MYDS_FRONTEND & & myds - > DSS = = STATE_SLEEP & & myds - > sess & & myds - > sess - > status = = WAITING_CLIENT_DATA ) {
mypolls . myds [ n ] - > set_pollout ( ) ;
} else {
if ( mypolls . myds [ n ] - > DSS > STATE_MARIADB_BEGIN & & mypolls . myds [ n ] - > DSS < STATE_MARIADB_END ) {
mypolls . fds [ n ] . events = POLLIN ;
if ( mypolls . myds [ n ] - > myconn - > async_exit_status & MYSQL_WAIT_WRITE )
mypolls . fds [ n ] . events | = POLLOUT ;
} else {
mypolls . myds [ n ] - > set_pollout ( ) ;
}
}
if ( myds & & myds - > sess - > pause_until > curtime ) {
if ( myds - > myds_type = = MYDS_FRONTEND ) {
mypolls . myds [ n ] - > remove_pollout ( ) ;
}
if ( myds - > myds_type = = MYDS_BACKEND ) {
if ( mysql_thread___throttle_ratio_server_to_client ) {
mypolls . fds [ n ] . events = 0 ;
}
}
}
if ( myds & & myds - > myds_type = = MYDS_BACKEND ) {
if ( myds - > sess & & myds - > sess - > client_myds & & myds - > sess - > mirror = = false ) {
unsigned int buffered_data = 0 ;
buffered_data = myds - > sess - > client_myds - > PSarrayOUT - > len * RESULTSET_BUFLEN ;
buffered_data + = myds - > sess - > client_myds - > resultset - > len * RESULTSET_BUFLEN ;
// we pause receiving from backend at mysql_thread___threshold_resultset_size * 8
// but assuming that client isn't completely blocked, we will stop checking for data
// only at mysql_thread___threshold_resultset_size * 4
if ( buffered_data > ( unsigned int ) mysql_thread___threshold_resultset_size * 4 ) {
mypolls . fds [ n ] . events = 0 ;
}
}
}
ProcessEventOnDataStream ( myds , n ) ;
}
proxy_debug ( PROXY_DEBUG_NET , 1 , " Poll for DataStream=%p will be called with FD=%d and events=%d \n " , mypolls . myds [ n ] , mypolls . fds [ n ] . fd , mypolls . fds [ n ] . events ) ;
}
/*
if ( mypolls . abs_poll_timeout > curtime ) {
mypolls . abs_poll_timeout - = curtime ;
if ( mypolls . abs_poll_timeout < mypolls . poll_timeout ) {
mypolls . poll_timeout = mypolls . abs_poll_timeout ;
}
} else {
// something went wrong
mypolls . poll_timeout = 0 ;
}
*/
# ifdef IDLE_THREADS
if ( GloVars . global . idle_threads ) {
if ( idle_maintenance_thread = = false ) {
@ -2787,7 +2893,9 @@ __run_skip_1a:
//this is the only portion of code not protected by a global mutex
proxy_debug ( PROXY_DEBUG_NET , 5 , " Calling poll with timeout %d \n " , ( mypolls . poll_timeout ? ( mypolls . poll_timeout / 1000 > ( unsigned int ) mysql_thread___poll_timeout ? mypolls . poll_timeout / 1000 : mysql_thread___poll_timeout ) : mysql_thread___poll_timeout ) ) ;
// poll is called with a timeout of mypolls.poll_timeout if set , or mysql_thread___poll_timeout
rc = poll ( mypolls . fds , mypolls . len , ( mypolls . poll_timeout ? ( mypolls . poll_timeout / 1000 < ( unsigned int ) mysql_thread___poll_timeout ? mypolls . poll_timeout / 1000 : mysql_thread___poll_timeout ) : mysql_thread___poll_timeout ) ) ;
//rc=poll(mypolls.fds,mypolls.len, ( mypolls.poll_timeout ? ( mypolls.poll_timeout/1000 < (unsigned int) mysql_thread___poll_timeout ? mypolls.poll_timeout/1000 : mysql_thread___poll_timeout ) : mysql_thread___poll_timeout ) );
//rc=poll(mypolls.fds,mypolls.len, 0);
rc = poll ( mypolls . fds , mypolls . len , mypolls . poll_timeout / 1000 ) ;
proxy_debug ( PROXY_DEBUG_NET , 5 , " %s \n " , " Returning poll " ) ;
# ifdef IDLE_THREADS
}
@ -2831,9 +2939,15 @@ __run_skip_1a:
}
// update polls statistics
mypolls . loops + + ;
mypolls . loop_counters - > incr ( curtime / 1000000 ) ;
# ifdef DEBUG
if ( mypolls . loops = = 0 & & rc > 0 ) {
mypolls . first_loops_at = curtime ;
mypolls . loops + + ;
}
if ( mypolls . loops )
mypolls . loops + + ;
//mypolls.loop_counters->incr(curtime/1000000);
# endif // DEBUG
if ( maintenance_loop ) {
// house keeping
unsigned int l = ( unsigned int ) mysql_thread___mirror_max_concurrency ;
@ -2856,9 +2970,9 @@ __run_skip_1a:
exit ( EXIT_FAILURE ) ;
}
if ( __sync_add_and_fetch ( & __global_MySQL_Thread_Variables_version , 0 ) > __thread_MySQL_Thread_Variables_version ) {
refresh_variables ( ) ;
}
// if (__sync_add_and_fetch(&__global_MySQL_Thread_Variables_version,0) > __thread_MySQL_Thread_Variables_version) {
// refresh_variables();
// }
# ifdef IDLE_THREADS
if ( idle_maintenance_thread = = false ) {
@ -2877,31 +2991,7 @@ __run_skip_1a:
if ( rc ) {
int i ;
for ( i = 0 ; i < rc ; i + + ) {
if ( events [ i ] . data . u32 ) {
// NOTE: not sure why, sometime events returns odd values. If set, we take it out as normal worker threads know how to handle it
if ( events [ i ] . events ) {
uint32_t sess_thr_id = events [ i ] . data . u32 ;
uint32_t sess_pos = sessmap [ sess_thr_id ] ;
MySQL_Session * mysess = ( MySQL_Session * ) mysql_sessions - > index ( sess_pos ) ;
MySQL_Data_Stream * tmp_myds = mysess - > client_myds ;
int dsidx = tmp_myds - > poll_fds_idx ;
//fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx);
mypolls . remove_index_fast ( dsidx ) ;
tmp_myds - > mypolls = NULL ;
mysess - > thread = NULL ;
// we first delete the association in sessmap
sessmap . erase ( mysess - > thread_session_id ) ;
if ( mysql_sessions - > len > 1 ) {
// take the last element and adjust the map
MySQL_Session * mysess_last = ( MySQL_Session * ) mysql_sessions - > index ( mysql_sessions - > len - 1 ) ;
if ( mysess - > thread_session_id ! = mysess_last - > thread_session_id )
sessmap [ mysess_last - > thread_session_id ] = sess_pos ;
}
unregister_session ( sess_pos ) ;
resume_mysql_sessions - > add ( mysess ) ;
epoll_ctl ( efd , EPOLL_CTL_DEL , tmp_myds - > fd , NULL ) ;
}
}
HandleEventFromEpoll ( i ) ;
}
for ( i = 0 ; i < rc ; i + + ) {
if ( events [ i ] . events = = EPOLLIN & & events [ i ] . data . u32 = = 0 ) {
@ -2915,40 +3005,7 @@ __run_skip_1a:
}
}
if ( mysql_sessions - > len & & maintenance_loop ) {
# define SESS_TO_SCAN 128
if ( mysess_idx + SESS_TO_SCAN > mysql_sessions - > len ) {
mysess_idx = 0 ;
}
unsigned int i ;
unsigned long long min_idle = 0 ;
if ( curtime > ( unsigned long long ) mysql_thread___wait_timeout * 1000 ) {
min_idle = curtime - ( unsigned long long ) mysql_thread___wait_timeout * 1000 ;
}
for ( i = 0 ; i < SESS_TO_SCAN & & mysess_idx < mysql_sessions - > len ; i + + ) {
uint32_t sess_pos = mysess_idx ;
MySQL_Session * mysess = ( MySQL_Session * ) mysql_sessions - > index ( sess_pos ) ;
if ( mysess - > idle_since < min_idle ) {
mysess - > killed = true ;
MySQL_Data_Stream * tmp_myds = mysess - > client_myds ;
int dsidx = tmp_myds - > poll_fds_idx ;
//fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx);
mypolls . remove_index_fast ( dsidx ) ;
tmp_myds - > mypolls = NULL ;
mysess - > thread = NULL ;
// we first delete the association in sessmap
sessmap . erase ( mysess - > thread_session_id ) ;
if ( mysql_sessions - > len > 1 ) {
// take the last element and adjust the map
MySQL_Session * mysess_last = ( MySQL_Session * ) mysql_sessions - > index ( mysql_sessions - > len - 1 ) ;
if ( mysess - > thread_session_id ! = mysess_last - > thread_session_id )
sessmap [ mysess_last - > thread_session_id ] = sess_pos ;
}
unregister_session ( sess_pos ) ;
resume_mysql_sessions - > add ( mysess ) ;
epoll_ctl ( efd , EPOLL_CTL_DEL , tmp_myds - > fd , NULL ) ;
}
mysess_idx + + ;
}
ScanIdleClients ( ) ;
}
goto __run_skip_2 ;
}
@ -3040,40 +3097,7 @@ __run_skip_1a:
# ifdef IDLE_THREADS
__run_skip_2 :
if ( GloVars . global . idle_threads & & idle_maintenance_thread ) {
unsigned int w = rand ( ) % ( GloMTH - > num_threads ) ;
MySQL_Thread * thr = GloMTH - > mysql_threads [ w ] . worker ;
if ( resume_mysql_sessions - > len ) {
pthread_mutex_lock ( & thr - > myexchange . mutex_resumes ) ;
unsigned int ims ;
if ( shutdown = = 0 & & thr - > shutdown = = 0 )
for ( ims = 0 ; ims < resume_mysql_sessions - > len ; ims + + ) {
MySQL_Session * mysess = ( MySQL_Session * ) resume_mysql_sessions - > remove_index_fast ( 0 ) ;
thr - > myexchange . resume_mysql_sessions - > add ( mysess ) ;
}
pthread_mutex_unlock ( & thr - > myexchange . mutex_resumes ) ;
{
unsigned char c = 0 ;
//MySQL_Thread *thr=GloMTH->mysql_threads[w].worker;
int fd = thr - > pipefd [ 1 ] ;
if ( write ( fd , & c , 1 ) = = - 1 ) {
//proxy_error("Error while signaling maintenance thread\n");
}
}
} else {
VALGRIND_DISABLE_ERROR_REPORTING ;
pthread_mutex_lock ( & thr - > myexchange . mutex_resumes ) ;
VALGRIND_ENABLE_ERROR_REPORTING ;
if ( shutdown = = 0 & & thr - > shutdown = = 0 & & thr - > myexchange . resume_mysql_sessions - > len ) {
unsigned char c = 0 ;
int fd = thr - > pipefd [ 1 ] ;
if ( write ( fd , & c , 1 ) = = - 1 ) {
//proxy_error("Error while signaling maintenance thread\n");
}
}
VALGRIND_DISABLE_ERROR_REPORTING ;
pthread_mutex_unlock ( & thr - > myexchange . mutex_resumes ) ;
VALGRIND_ENABLE_ERROR_REPORTING ;
}
MoveMydsBackToWorkerThreadIfNeeded ( ) ;
} else {
# endif // IDLE_THREADS
// iterate through all sessions and process the session logic
@ -3086,6 +3110,43 @@ __run_skip_2:
}
}
void MySQL_Thread : : MoveMydsBackToWorkerThreadIfNeeded ( ) {
unsigned int w = rand ( ) % ( GloMTH - > num_threads ) ;
MySQL_Thread * thr = GloMTH - > mysql_threads [ w ] . worker ;
if ( resume_mysql_sessions - > len ) {
pthread_mutex_lock ( & thr - > myexchange . mutex_resumes ) ;
unsigned int ims ;
if ( shutdown = = 0 & & thr - > shutdown = = 0 )
for ( ims = 0 ; ims < resume_mysql_sessions - > len ; ims + + ) {
MySQL_Session * mysess = ( MySQL_Session * ) resume_mysql_sessions - > remove_index_fast ( 0 ) ;
thr - > myexchange . resume_mysql_sessions - > add ( mysess ) ;
}
pthread_mutex_unlock ( & thr - > myexchange . mutex_resumes ) ;
{
unsigned char c = 0 ;
//MySQL_Thread *thr=GloMTH->mysql_threads[w].worker;
int fd = thr - > pipefd [ 1 ] ;
if ( write ( fd , & c , 1 ) = = - 1 ) {
//proxy_error("Error while signaling maintenance thread\n");
}
}
} else {
VALGRIND_DISABLE_ERROR_REPORTING ;
pthread_mutex_lock ( & thr - > myexchange . mutex_resumes ) ;
VALGRIND_ENABLE_ERROR_REPORTING ;
if ( shutdown = = 0 & & thr - > shutdown = = 0 & & thr - > myexchange . resume_mysql_sessions - > len ) {
unsigned char c = 0 ;
int fd = thr - > pipefd [ 1 ] ;
if ( write ( fd , & c , 1 ) = = - 1 ) {
//proxy_error("Error while signaling maintenance thread\n");
}
}
VALGRIND_DISABLE_ERROR_REPORTING ;
pthread_mutex_unlock ( & thr - > myexchange . mutex_resumes ) ;
VALGRIND_ENABLE_ERROR_REPORTING ;
}
}
bool MySQL_Thread : : process_data_on_data_stream ( MySQL_Data_Stream * myds , unsigned int n ) {
if ( mypolls . fds [ n ] . revents ) {
# ifdef IDLE_THREADS
@ -3310,8 +3371,9 @@ void MySQL_Thread::process_all_sessions() {
}
}
unsigned int total_active_transactions_tmp ;
total_active_transactions_tmp = __sync_add_and_fetch ( & status_variables . active_transactions , 0 ) ;
__sync_bool_compare_and_swap ( & status_variables . active_transactions , total_active_transactions_tmp , total_active_transactions_ ) ;
status_variables . active_transactions = total_active_transactions_ ;
//total_active_transactions_tmp=__sync_add_and_fetch(&status_variables.active_transactions,0);
//__sync_bool_compare_and_swap(&status_variables.active_transactions,total_active_transactions_tmp,total_active_transactions_);
}
void MySQL_Thread : : refresh_variables ( ) {
@ -4534,7 +4596,8 @@ unsigned int MySQL_Threads_Handler::get_active_transations() {
if ( mysql_threads ) {
MySQL_Thread * thr = ( MySQL_Thread * ) mysql_threads [ i ] . worker ;
if ( thr )
q + = __sync_fetch_and_add ( & thr - > status_variables . active_transactions , 0 ) ;
//q+=__sync_fetch_and_add(&thr->status_variables.active_transactions,0);
q + = thr - > status_variables . active_transactions ; // using volatile
}
}
return q ;