@ -12,6 +12,8 @@
# include "re2/regexp.h"
# include "ProxySQL_Data_Stream.h"
# include "MySQL_Data_Stream.h"
# include "MySQL_Session.h"
# include "query_processor.h"
# include "StatCounters.h"
# include "MySQL_PreparedStatement.h"
@ -2801,11 +2803,11 @@ ProxyWorker_Thread::~ProxyWorker_Thread() {
}
Client _Session * ProxyWorker_Thread : : create_new_session_and_client _data_stream( int _fd ) {
MySQL _Session * ProxyWorker_Thread : : create_new_session_and_client _mysql _data_stream( int _fd ) {
int arg_on = 1 ;
Client_Session * sess = new Client _Session;
MySQL_Session * sess = new MySQL _Session;
register_session ( sess ) ; // register session
sess - > client_myds = new Prox ySQL_Data_Stream( ) ;
sess - > client_myds = new M ySQL_Data_Stream( ) ;
sess - > client_myds - > fd = _fd ;
setsockopt ( sess - > client_myds - > fd , IPPROTO_TCP , TCP_NODELAY , ( char * ) & arg_on , sizeof ( arg_on ) ) ;
@ -2945,9 +2947,9 @@ void ProxyWorker_Thread::run___get_multiple_idle_connections(int& num_idles) {
int i ;
num_idles = MyHGM - > get_multiple_idle_connections ( - 1 , curtime - mysql_thread___ping_interval_server_msec * 1000 , my_idle_conns , SESSIONS_FOR_CONNECTIONS_HANDLER ) ;
for ( i = 0 ; i < num_idles ; i + + ) {
Prox ySQL_Data_Stream * myds ;
M ySQL_Data_Stream * myds ;
MySQL_Connection * mc = my_idle_conns [ i ] ;
Client_Session * sess = new Client _Session( ) ;
MySQL_Session * sess = new MySQL _Session( ) ;
sess - > mybe = sess - > find_or_create_mysql_backend ( mc - > parent - > myhgc - > hid ) ;
myds = sess - > mybe - > server_myds ;
@ -3012,7 +3014,7 @@ void ProxyWorker_Thread::ProcessAllMyDS_BeforePoll() {
}
myds - > revents = 0 ;
if ( myds - > myds_type ! = MYDS_LISTENER ) {
configure_pollout ( myds , n ) ;
configure_pollout ( ( MySQL_Data_Stream * ) myds , n ) ;
}
}
proxy_debug ( PROXY_DEBUG_NET , 1 , " Poll for DataStream=%p will be called with FD=%d and events=%d \n " , mypolls . myds [ n ] , mypolls . fds [ n ] . fd , mypolls . fds [ n ] . events ) ;
@ -3355,10 +3357,10 @@ void ProxyWorker_Thread::idle_thread_to_kill_idle_sessions() {
}
for ( i = 0 ; i < SESS_TO_SCAN & & mysess_idx < mysql_sessions - > len ; i + + ) {
uint32_t sess_pos = mysess_idx ;
Client_Session * mysess = ( Client _Session * ) mysql_sessions - > index ( sess_pos ) ;
MySQL_Session * mysess = ( MySQL _Session * ) mysql_sessions - > index ( sess_pos ) ;
if ( mysess - > idle_since < min_idle | | mysess - > killed = = true ) {
mysess - > killed = true ;
Prox ySQL_Data_Stream * tmp_myds = mysess - > client_myds ;
M ySQL_Data_Stream * tmp_myds = mysess - > client_myds ;
int dsidx = tmp_myds - > poll_fds_idx ;
//fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx);
mypolls . remove_index_fast ( dsidx ) ;
@ -3368,7 +3370,7 @@ void ProxyWorker_Thread::idle_thread_to_kill_idle_sessions() {
sessmap . erase ( mysess - > thread_session_id ) ;
if ( mysql_sessions - > len > 1 ) {
// take the last element and adjust the map
Client_Session * mysess_last = ( Client _Session * ) mysql_sessions - > index ( mysql_sessions - > len - 1 ) ;
MySQL_Session * mysess_last = ( MySQL _Session * ) mysql_sessions - > index ( mysql_sessions - > len - 1 ) ;
if ( mysess - > thread_session_id ! = mysess_last - > thread_session_id )
sessmap [ mysess_last - > thread_session_id ] = sess_pos ;
}
@ -3385,8 +3387,8 @@ void ProxyWorker_Thread::idle_thread_prepares_session_to_send_to_worker_thread(i
if ( events [ i ] . events ) {
uint32_t sess_thr_id = events [ i ] . data . u32 ;
uint32_t sess_pos = sessmap [ sess_thr_id ] ;
Client_Session * mysess = ( Client _Session * ) mysql_sessions - > index ( sess_pos ) ;
Prox ySQL_Data_Stream * tmp_myds = mysess - > client_myds ;
MySQL_Session * mysess = ( MySQL _Session * ) mysql_sessions - > index ( sess_pos ) ;
M ySQL_Data_Stream * tmp_myds = mysess - > client_myds ;
int dsidx = tmp_myds - > poll_fds_idx ;
//fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx);
mypolls . remove_index_fast ( dsidx ) ;
@ -3473,7 +3475,7 @@ void ProxyWorker_Thread::worker_thread_gets_sessions_from_idle_thread() {
while ( myexchange . resume_mysql_sessions - > len ) {
Client_Session * mysess = ( Client_Session * ) myexchange . resume_mysql_sessions - > remove_index_fast ( 0 ) ;
register_session ( mysess , false ) ;
ProxySQL_Data_Stream * myds = mysess - > client_myds ;
ProxySQL_Data_Stream * myds = ( ( MySQL_Session * ) mysess ) - > client_myds ;
mypolls . add ( POLLIN , myds - > fd , myds , monotonic_time ( ) ) ;
}
}
@ -3528,10 +3530,11 @@ bool ProxyWorker_Thread::process_data_on_mysql_data_stream(ProxySQL_Data_Stream
//
// this can happen, for example, with a low wait_timeout and running transaction
if ( myds - > sess - > status = = WAITING_CLIENT_DATA ) {
if ( myds - > myconn - > async_state_machine = = ASYNC_IDLE ) {
proxy_warning ( " Detected broken idle connection on %s:%d \n " , myds - > myconn - > parent - > address , myds - > myconn - > parent - > port ) ;
myds - > destroy_MySQL_Connection_From_Pool ( false ) ;
myds - > sess - > set_unhealthy ( ) ;
MySQL_Data_Stream * _myds = ( MySQL_Data_Stream * ) myds ;
if ( _myds - > myconn - > async_state_machine = = ASYNC_IDLE ) {
proxy_warning ( " Detected broken idle connection on %s:%d \n " , _myds - > myconn - > parent - > address , _myds - > myconn - > parent - > port ) ;
_myds - > destroy_MySQL_Connection_From_Pool ( false ) ;
_myds - > sess - > set_unhealthy ( ) ;
return false ;
}
}
@ -3543,14 +3546,15 @@ bool ProxyWorker_Thread::process_data_on_mysql_data_stream(ProxySQL_Data_Stream
// only if we aren't using MariaDB Client Library
int rb = 0 ;
do {
rb = myds - > read_from_net ( ) ;
if ( rb > 0 & & myds - > myds_type = = MYDS_FRONTEND ) {
MySQL_Data_Stream * _myds = ( MySQL_Data_Stream * ) myds ;
rb = _myds - > read_from_net ( ) ;
if ( rb > 0 & & _myds - > myds_type = = MYDS_FRONTEND ) {
status_variables . stvar [ st_var_queries_frontends_bytes_recv ] + = rb ;
}
myds- > read_pkts ( ) ;
_ myds- > read_pkts ( ) ;
if ( rb > 0 & & myds- > myds_type = = MYDS_BACKEND ) {
if ( myds- > sess - > session_fast_forward ) {
if ( rb > 0 & & _ myds- > myds_type = = MYDS_BACKEND ) {
if ( _ myds- > sess - > session_fast_forward ) {
struct pollfd _fds ;
nfds_t _nfds = 1 ;
_fds . fd = mypolls . fds [ n ] . fd ;
@ -3559,7 +3563,7 @@ bool ProxyWorker_Thread::process_data_on_mysql_data_stream(ProxySQL_Data_Stream
int _rc = poll ( & _fds , _nfds , 0 ) ;
if ( ( _rc > 0 ) & & _fds . revents = = POLLIN ) {
// there is more data
myds- > revents = _fds . revents ;
_ myds- > revents = _fds . revents ;
} else {
rb = 0 ; // exit loop
}
@ -3568,10 +3572,10 @@ bool ProxyWorker_Thread::process_data_on_mysql_data_stream(ProxySQL_Data_Stream
}
} else {
bool set_rb_zero = true ;
if ( rb > 0 & & myds- > myds_type = = MYDS_FRONTEND ) {
if ( myds- > encrypted = = true ) {
if ( SSL_is_init_finished ( myds- > ssl ) ) {
if ( myds- > data_in_rbio ( ) ) {
if ( rb > 0 & & _ myds- > myds_type = = MYDS_FRONTEND ) {
if ( _ myds- > encrypted = = true ) {
if ( SSL_is_init_finished ( _ myds- > ssl ) ) {
if ( _ myds- > data_in_rbio ( ) ) {
set_rb_zero = false ;
}
}
@ -3584,7 +3588,8 @@ bool ProxyWorker_Thread::process_data_on_mysql_data_stream(ProxySQL_Data_Stream
} else {
if ( mypolls . fds [ n ] . revents ) {
myds - > myconn - > handler ( mypolls . fds [ n ] . revents ) ;
MySQL_Data_Stream * _myds = ( MySQL_Data_Stream * ) myds ;
_myds - > myconn - > handler ( mypolls . fds [ n ] . revents ) ;
}
}
if ( ( mypolls . fds [ n ] . events & POLLOUT )
@ -3593,20 +3598,21 @@ bool ProxyWorker_Thread::process_data_on_mysql_data_stream(ProxySQL_Data_Stream
) {
myds - > set_net_failure ( ) ;
}
myds - > check_data_flow ( ) ;
( ( MySQL_Data_Stream * ) myds ) - > check_data_flow ( ) ;
}
if ( myds - > active = = 0 ) {
if ( myds - > sess - > client_myds = = myds ) {
proxy_debug ( PROXY_DEBUG_NET , 1 , " Session=%p, DataStream=%p -- Deleting FD %d \n " , myds - > sess , myds , myds - > fd ) ;
myds - > sess - > set_unhealthy ( ) ;
MySQL_Session * mysess = ( MySQL_Session * ) ( myds - > sess ) ;
if ( mysess - > client_myds = = myds ) {
proxy_debug ( PROXY_DEBUG_NET , 1 , " Session=%p, DataStream=%p -- Deleting FD %d \n " , mysess , myds , myds - > fd ) ;
mysess - > set_unhealthy ( ) ;
} else {
// if this is a backend with fast_forward, set unhealthy
// if this is a backend without fast_forward, do not set unhealthy: it will be handled by client library
if ( my ds- > sess- > session_fast_forward ) { // if fast forward
if ( my sess- > session_fast_forward ) { // if fast forward
if ( myds - > myds_type = = MYDS_BACKEND ) { // and backend
my ds- > sess- > set_unhealthy ( ) ; // set unhealthy
my sess- > set_unhealthy ( ) ; // set unhealthy
}
}
}
@ -3664,7 +3670,7 @@ void ProxyWorker_Thread::ProcessAllSessions_CompletedMirrorSession(unsigned int&
// this function was inline in ProxyWorker_Thread::process_all_sessions()
void ProxyWorker_Thread : : ProcessAllSessions_MaintenanceLoop ( Client _Session * sess , unsigned long long sess_time , unsigned int & total_active_transactions_ ) {
void ProxyWorker_Thread : : ProcessAllSessions_MaintenanceLoop ( MySQL _Session * sess , unsigned long long sess_time , unsigned int & total_active_transactions_ ) {
unsigned int numTrx = 0 ;
sess - > active_transactions = sess - > NumActiveTransactions ( ) ;
{
@ -3759,7 +3765,11 @@ void ProxyWorker_Thread::process_all_sessions() {
ProcessAllSessions_SortingSessions ( ) ;
}
for ( n = 0 ; n < mysql_sessions - > len ; n + + ) {
Client_Session * sess = ( Client_Session * ) mysql_sessions - > index ( n ) ;
Client_Session * c_sess = ( Client_Session * ) mysql_sessions - > index ( n ) ;
if ( c_sess - > session_type ! = PROXYSQL_SESSION_MYSQL ) {
continue ;
}
MySQL_Session * sess = ( MySQL_Session * ) c_sess ;
# ifdef DEBUG
if ( sess = = sess_stopat ) {
sess_stopat = sess ;
@ -3787,7 +3797,9 @@ void ProxyWorker_Thread::process_all_sessions() {
if ( idle_maintenance_thread = = false )
# endif // IDLE_THREADS
{
ProcessAllSessions_MaintenanceLoop ( sess , sess_time , total_active_transactions_ ) ;
if ( sess - > session_type = = PROXYSQL_SESSION_MYSQL ) {
ProcessAllSessions_MaintenanceLoop ( sess , sess_time , total_active_transactions_ ) ;
}
}
# ifdef IDLE_THREADS
else
@ -4172,7 +4184,7 @@ void ProxyWorker_Thread::listener_handle_new_connection(ProxySQL_Data_Stream *my
// create a new client connection
mypolls . fds [ n ] . revents = 0 ;
Client _Session * sess = create_new_session_and_client _data_stream( c ) ;
MySQL _Session * sess = create_new_session_and_client _mysql _data_stream( c ) ;
__sync_add_and_fetch ( & MyHGM - > status . client_connections_created , 1 ) ;
if ( __sync_add_and_fetch ( & MyHGM - > status . client_connections , 1 ) > mysql_thread___max_connections ) {
sess - > max_connections_reached = true ;
@ -4621,7 +4633,11 @@ SQLite3_result * ProxyWorker_Threads_Handler::SQL3_Processlist() {
pthread_mutex_lock ( & thr - > thread_mutex ) ;
unsigned int j ;
for ( j = 0 ; j < thr - > mysql_sessions - > len ; j + + ) {
Client_Session * sess = ( Client_Session * ) thr - > mysql_sessions - > pdata [ j ] ;
Client_Session * c_sess = ( Client_Session * ) thr - > mysql_sessions - > pdata [ j ] ;
if ( c_sess - > session_type ! = PROXYSQL_SESSION_MYSQL ) {
continue ;
}
MySQL_Session * sess = ( MySQL_Session * ) c_sess ;
if ( sess - > client_myds ) {
char buf [ 1024 ] ;
char * * pta = ( char * * ) malloc ( sizeof ( char * ) * colnum ) ;
@ -5231,9 +5247,13 @@ void ProxyWorker_Thread::Get_Memory_Stats() {
}
MySQL_Connection * ProxyWorker_Thread : : get_MyConn_local ( unsigned int _hid , Client_Session * sess, char * gtid_uuid , uint64_t gtid_trxid , int max_lag_ms ) {
MySQL_Connection * ProxyWorker_Thread : : get_MyConn_local ( unsigned int _hid , Client_Session * _ sess, char * gtid_uuid , uint64_t gtid_trxid , int max_lag_ms ) {
// some sanity check
if ( sess = = NULL ) return NULL ;
if ( _sess = = NULL ) return NULL ;
if ( _sess - > session_type ! = PROXYSQL_SESSION_MYSQL ) {
return NULL ;
}
MySQL_Session * sess = ( MySQL_Session * ) _sess ;
if ( sess - > client_myds = = NULL ) return NULL ;
if ( sess - > client_myds - > myconn = = NULL ) return NULL ;
if ( sess - > client_myds - > myconn - > userinfo = = NULL ) return NULL ;
@ -5360,7 +5380,11 @@ void ProxyWorker_Thread::Scan_Sessions_to_Kill_All() {
void ProxyWorker_Thread : : Scan_Sessions_to_Kill ( PtrArray * mysess ) {
for ( unsigned int n = 0 ; n < mysess - > len & & ( kq . conn_ids . size ( ) + kq . query_ids . size ( ) ) ; n + + ) {
Client_Session * _sess = ( Client_Session * ) mysess - > index ( n ) ;
Client_Session * c_sess = ( Client_Session * ) mysess - > index ( n ) ;
if ( c_sess - > session_type ! = PROXYSQL_SESSION_MYSQL ) {
continue ;
}
MySQL_Session * _sess = ( MySQL_Session * ) c_sess ;
bool cont = true ;
for ( std : : vector < thr_id_usr * > : : iterator it = kq . conn_ids . begin ( ) ; cont & & it ! = kq . conn_ids . end ( ) ; + + it ) {
thr_id_usr * t = * it ;
@ -5405,9 +5429,13 @@ bool ProxyWorker_Thread::move_session_to_idle_mysql_sessions(ProxySQL_Data_Strea
if ( _tmp_idle < ( ( curtime > ( unsigned int ) mysql_thread___session_idle_ms * 1000 ) ? ( curtime - mysql_thread___session_idle_ms * 1000 ) : 0 ) ) {
// make sure data stream has no pending data out and session is not throttled (#1939)
// because epoll thread does not handle data stream with data out
if ( myds - > sess - > client_myds = = myds & & ! myds - > available_data_out ( ) & & myds - > sess - > pause_until < = curtime ) {
if ( myds - > sess - > session_type ! = PROXYSQL_SESSION_MYSQL ) {
return false ;
}
MySQL_Session * mysess = ( MySQL_Session * ) ( myds - > sess ) ;
if ( mysess - > client_myds = = myds & & ! myds - > available_data_out ( ) & & mysess - > pause_until < = curtime ) {
//unsigned int j;
bool has_backends = myds - > sess - > has_any_backend ( ) ;
bool has_backends = my sess- > has_any_backend ( ) ;
/*
for ( j = 0 ; j < myds - > sess - > mybes - > len ; j + + ) {
MySQL_Backend * tmp_mybe = ( MySQL_Backend * ) myds - > sess - > mybes - > index ( j ) ;
@ -5418,14 +5446,14 @@ bool ProxyWorker_Thread::move_session_to_idle_mysql_sessions(ProxySQL_Data_Strea
}
*/
if ( has_backends = = false ) {
unsigned long long idle_since = curtime - my ds- > sess- > IdleTime ( ) ;
unsigned long long idle_since = curtime - my sess- > IdleTime ( ) ;
mypolls . remove_index_fast ( n ) ;
myds - > mypolls = NULL ;
unsigned int i = find_session_idx_in_mysql_sessions ( my ds- > sess) ;
my ds- > sess- > thread = NULL ;
unsigned int i = find_session_idx_in_mysql_sessions ( my sess) ;
my sess- > thread = NULL ;
unregister_session ( i ) ;
my ds- > sess- > idle_since = idle_since ;
idle_mysql_sessions - > add ( my ds- > sess) ;
my sess- > idle_since = idle_since ;
idle_mysql_sessions - > add ( my sess) ;
return true ;
}
}
@ -5435,10 +5463,14 @@ bool ProxyWorker_Thread::move_session_to_idle_mysql_sessions(ProxySQL_Data_Strea
# endif // IDLE_THREADS
bool ProxyWorker_Thread : : set_backend_to_be_skipped_if_frontend_is_slow ( ProxySQL_Data_Stream * myds , unsigned int n ) {
if ( myds - > sess & & myds - > sess - > client_myds & & myds - > sess - > mirror = = false ) {
if ( myds - > sess - > session_type ! = PROXYSQL_SESSION_MYSQL ) {
return false ;
}
MySQL_Session * mysess = ( MySQL_Session * ) ( myds - > sess ) ;
if ( mysess & & mysess - > client_myds & & mysess - > mirror = = false ) {
unsigned int buffered_data = 0 ;
buffered_data = myds - > sess - > client_myds - > PSarrayOUT - > len * RESULTSET_BUFLEN ;
buffered_data + = myds - > sess - > client_myds - > resultset - > len * RESULTSET_BUFLEN ;
buffered_data = my sess- > client_myds - > PSarrayOUT - > len * RESULTSET_BUFLEN ;
buffered_data + = my sess- > client_myds - > resultset - > len * RESULTSET_BUFLEN ;
// we pause receiving from backend at mysql_thread___threshold_resultset_size * 8
// but assuming that client isn't completely blocked, we will stop checking for data
// only at mysql_thread___threshold_resultset_size * 4
@ -5454,9 +5486,9 @@ bool ProxyWorker_Thread::set_backend_to_be_skipped_if_frontend_is_slow(ProxySQL_
void ProxyWorker_Thread : : idle_thread_gets_sessions_from_worker_thread ( ) {
pthread_mutex_lock ( & myexchange . mutex_idles ) ;
while ( myexchange . idle_mysql_sessions - > len ) {
Client_Session * mysess = ( Client _Session * ) myexchange . idle_mysql_sessions - > remove_index_fast ( 0 ) ;
MySQL_Session * mysess = ( MySQL _Session * ) myexchange . idle_mysql_sessions - > remove_index_fast ( 0 ) ;
register_session ( mysess , false ) ;
Prox ySQL_Data_Stream * myds = mysess - > client_myds ;
M ySQL_Data_Stream * myds = mysess - > client_myds ;
mypolls . add ( POLLIN , myds - > fd , myds , monotonic_time ( ) ) ;
// add in epoll()
struct epoll_event event ;
@ -5541,15 +5573,18 @@ void ProxyWorker_Thread::check_for_invalid_fd(unsigned int n) {
// check if the FD is valid
if ( mypolls . fds [ n ] . revents = = POLLNVAL ) {
// debugging output before assert
ProxySQL_Data_Stream * _myds = mypolls . myds [ n ] ;
if ( _myds ) {
if ( _myds - > myconn ) {
proxy_error ( " revents==POLLNVAL for FD=%d, events=%d, MyDSFD=%d, MyConnFD=%d \n " , mypolls . fds [ n ] . fd , mypolls . fds [ n ] . events , _myds - > fd , _myds - > myconn - > fd ) ;
assert ( mypolls . fds [ n ] . revents ! = POLLNVAL ) ;
ProxySQL_Data_Stream * pds = mypolls . myds [ n ] ;
if ( pds ) {
if ( pds - > sess - > session_type = = PROXYSQL_SESSION_MYSQL ) {
MySQL_Data_Stream * _myds = ( MySQL_Data_Stream * ) pds ;
if ( _myds - > myconn ) {
proxy_error ( " revents==POLLNVAL for FD=%d, events=%d, MyDSFD=%d, MyConnFD=%d \n " , mypolls . fds [ n ] . fd , mypolls . fds [ n ] . events , _myds - > fd , _myds - > myconn - > fd ) ;
assert ( mypolls . fds [ n ] . revents ! = POLLNVAL ) ;
}
}
}
// if we reached her, we didn't assert() yet
proxy_error ( " revents==POLLNVAL for FD=%d, events=%d, MyDSFD=%d \n " , mypolls . fds [ n ] . fd , mypolls . fds [ n ] . events , _my ds- > fd ) ;
proxy_error ( " revents==POLLNVAL for FD=%d, events=%d, MyDSFD=%d \n " , mypolls . fds [ n ] . fd , mypolls . fds [ n ] . events , p ds- > fd ) ;
assert ( mypolls . fds [ n ] . revents ! = POLLNVAL ) ;
}
}
@ -5590,13 +5625,14 @@ void ProxyWorker_Thread::tune_timeout_for_session_needs_pause(ProxySQL_Data_Stre
}
}
void ProxyWorker_Thread : : configure_pollout ( Prox ySQL_Data_Stream * myds , unsigned int n ) {
void ProxyWorker_Thread : : configure_pollout ( M ySQL_Data_Stream * myds , unsigned int n ) {
if ( myds - > myds_type = = MYDS_FRONTEND & & myds - > DSS = = STATE_SLEEP & & myds - > sess & & myds - > sess - > status = = WAITING_CLIENT_DATA ) {
myds - > set_pollout ( ) ;
} else {
if ( myds - > DSS > STATE_MARIADB_BEGIN & & myds - > DSS < STATE_MARIADB_END ) {
mypolls . fds [ n ] . events = POLLIN ;
if ( mypolls . myds [ n ] - > myconn - > async_exit_status & MYSQL_WAIT_WRITE )
assert ( myds = = mypolls . myds [ n ] ) ;
if ( myds - > myconn - > async_exit_status & MYSQL_WAIT_WRITE )
mypolls . fds [ n ] . events | = POLLOUT ;
} else {
myds - > set_pollout ( ) ;