From 483b8335d3cf7ff8b0aafd380fdd257deec3971c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Wed, 8 May 2024 10:54:24 +0000 Subject: [PATCH] More functions moved to Base_Thread * move_session_to_idle_mysql_sessions * find_session_idx_in_mysql_sessions * ProcessAllMyDS_BeforePoll --- include/Base_Thread.h | 6 +++ include/MySQL_Thread.h | 8 +-- include/PgSQL_Thread.h | 8 +-- lib/Base_Thread.cpp | 111 ++++++++++++++++++++++++++++++++++++----- lib/MySQL_Thread.cpp | 95 +---------------------------------- lib/PgSQL_Thread.cpp | 94 +--------------------------------- 6 files changed, 115 insertions(+), 207 deletions(-) diff --git a/include/Base_Thread.h b/include/Base_Thread.h index 8c16f9331..c41889d59 100644 --- a/include/Base_Thread.h +++ b/include/Base_Thread.h @@ -41,6 +41,7 @@ class Base_Thread { bool maintenance_loop; public: unsigned long long curtime; + unsigned long long last_move_to_idle_thread_time; int shutdown; PtrArray *mysql_sessions; Session_Regex **match_regexes; @@ -68,6 +69,11 @@ class Base_Thread { void configure_pollout(DS * myds, unsigned int n); template bool set_backend_to_be_skipped_if_frontend_is_slow(DS * myds, unsigned int n); +#ifdef IDLE_THREADS + template bool move_session_to_idle_mysql_sessions(DS * myds, unsigned int n); +#endif // IDLE_THREADS + template unsigned int find_session_idx_in_mysql_sessions(S * sess); + template void ProcessAllMyDS_BeforePoll(); friend class MySQL_Thread; friend class PgSQL_Thread; diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 6b328a606..83c824abd 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -120,11 +120,11 @@ class __attribute__((aligned(64))) MySQL_Thread : public Base_Thread void idle_thread_check_if_worker_thread_has_unprocess_resumed_sessions_and_signal_it(MySQL_Thread *thr); void idle_thread_prepares_session_to_send_to_worker_thread(int i); void idle_thread_to_kill_idle_sessions(); - bool move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, unsigned int n); + //bool move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, unsigned int n); void run_Handle_epoll_wait(int); #endif // IDLE_THREADS - unsigned int find_session_idx_in_mysql_sessions(MySQL_Session *sess); + //unsigned int find_session_idx_in_mysql_sessions(MySQL_Session *sess); //bool set_backend_to_be_skipped_if_frontend_is_slow(MySQL_Data_Stream *myds, unsigned int n); void handle_mirror_queue_mysql_sessions(); void handle_kill_queues(); @@ -154,7 +154,7 @@ class __attribute__((aligned(64))) MySQL_Thread : public Base_Thread // unsigned long long curtime; unsigned long long pre_poll_time; unsigned long long last_maintenance_time; - unsigned long long last_move_to_idle_thread_time; + //unsigned long long last_move_to_idle_thread_time; std::atomic atomic_curtime; //PtrArray *mysql_sessions; PtrArray *mirror_queue_mysql_sessions; @@ -200,7 +200,7 @@ class __attribute__((aligned(64))) MySQL_Thread : public Base_Thread bool init(); void run___get_multiple_idle_connections(int& num_idles); void run___cleanup_mirror_queue(); - void ProcessAllMyDS_BeforePoll(); + //void ProcessAllMyDS_BeforePoll(); //void ProcessAllMyDS_AfterPoll(); void run(); void poll_listener_add(int sock); diff --git a/include/PgSQL_Thread.h b/include/PgSQL_Thread.h index eb1f541e3..f05b732a3 100644 --- a/include/PgSQL_Thread.h +++ b/include/PgSQL_Thread.h @@ -141,10 +141,10 @@ private: void idle_thread_check_if_worker_thread_has_unprocess_resumed_sessions_and_signal_it(PgSQL_Thread * thr); void idle_thread_prepares_session_to_send_to_worker_thread(int i); void idle_thread_to_kill_idle_sessions(); - bool move_session_to_idle_mysql_sessions(PgSQL_Data_Stream * myds, unsigned int n); + //bool move_session_to_idle_mysql_sessions(PgSQL_Data_Stream * myds, unsigned int n); #endif // IDLE_THREADS - unsigned int find_session_idx_in_mysql_sessions(PgSQL_Session * sess); + //unsigned int find_session_idx_in_mysql_sessions(PgSQL_Session * sess); //bool set_backend_to_be_skipped_if_frontend_is_slow(PgSQL_Data_Stream * myds, unsigned int n); void handle_mirror_queue_mysql_sessions(); void handle_kill_queues(); @@ -166,7 +166,7 @@ public: pthread_t thread_id; unsigned long long pre_poll_time; unsigned long long last_maintenance_time; - unsigned long long last_move_to_idle_thread_time; + //unsigned long long last_move_to_idle_thread_time; std::atomic atomic_curtime; //PtrArray* mysql_sessions; PtrArray* mirror_queue_mysql_sessions; @@ -211,7 +211,7 @@ public: bool init(); void run___get_multiple_idle_connections(int& num_idles); void run___cleanup_mirror_queue(); - void ProcessAllMyDS_BeforePoll(); + //void ProcessAllMyDS_BeforePoll(); //void ProcessAllMyDS_AfterPoll(); void run(); void poll_listener_add(int sock); diff --git a/lib/Base_Thread.cpp b/lib/Base_Thread.cpp index 558b0fc8a..e6b41c806 100644 --- a/lib/Base_Thread.cpp +++ b/lib/Base_Thread.cpp @@ -11,22 +11,12 @@ // Explicitly instantiate the required template class and member functions template MySQL_Session* Base_Thread::create_new_session_and_client_data_stream(int); template PgSQL_Session* Base_Thread::create_new_session_and_client_data_stream(int); -template void Base_Thread::check_timing_out_session(unsigned int); -template void Base_Thread::check_timing_out_session(unsigned int); -template void Base_Thread::check_for_invalid_fd(unsigned int); -template void Base_Thread::check_for_invalid_fd(unsigned int); template void Base_Thread::ProcessAllSessions_SortingSessions(); template void Base_Thread::ProcessAllSessions_SortingSessions(); template void Base_Thread::ProcessAllMyDS_AfterPoll(); template void Base_Thread::ProcessAllMyDS_AfterPoll(); -template void Base_Thread::read_one_byte_from_pipe(unsigned int n); -template void Base_Thread::read_one_byte_from_pipe(unsigned int n); -template void Base_Thread::tune_timeout_for_myds_needs_pause(MySQL_Data_Stream *); -template void Base_Thread::tune_timeout_for_myds_needs_pause(PgSQL_Data_Stream *); -template void Base_Thread::tune_timeout_for_session_needs_pause(MySQL_Data_Stream *); -template void Base_Thread::tune_timeout_for_session_needs_pause(PgSQL_Data_Stream *); -template void Base_Thread::configure_pollout(MySQL_Data_Stream *, unsigned int); -template void Base_Thread::configure_pollout(PgSQL_Data_Stream *, unsigned int); +template void Base_Thread::ProcessAllMyDS_BeforePoll(); +template void Base_Thread::ProcessAllMyDS_BeforePoll(); Base_Thread::Base_Thread() { @@ -380,3 +370,100 @@ bool Base_Thread::set_backend_to_be_skipped_if_frontend_is_slow(DS * myds, unsig return false; } +#ifdef IDLE_THREADS +/** + * @brief Moves a session to the idle session array if it meets the idle criteria. + * + * This function checks if a session should be moved to the idle session array based on its idle time + * and other conditions. If the session meets the idle criteria, it is moved to the idle session array. + * + * @param myds Pointer to the MySQL data stream associated with the session. + * @param n The index of the session in the poll array. + * @return True if the session is moved to the idle session array, false otherwise. + */ +template +bool Base_Thread::move_session_to_idle_mysql_sessions(DS * myds, unsigned int n) { + T* thr = static_cast(this); + unsigned long long _tmp_idle = thr->mypolls.last_recv[n] > thr->mypolls.last_sent[n] ? thr->mypolls.last_recv[n] : thr->mypolls.last_sent[n] ; + if (_tmp_idle < ( (curtime > (unsigned int)mysql_thread___session_idle_ms * 1000) ? (curtime - mysql_thread___session_idle_ms * 1000) : 0)) { + // make sure data stream has no pending data out and session is not throttled (#1939) + // because epoll thread does not handle data stream with data out + if (myds->sess->client_myds == myds && !myds->available_data_out() && myds->sess->pause_until <= curtime) { + //unsigned int j; + bool has_backends = myds->sess->has_any_backend(); + if (has_backends==false) { + unsigned long long idle_since = curtime - myds->sess->IdleTime(); + thr->mypolls.remove_index_fast(n); + myds->mypolls=NULL; + unsigned int i = find_session_idx_in_mysql_sessions(myds->sess); + myds->sess->thread=NULL; + thr->unregister_session(i); + myds->sess->idle_since = idle_since; + thr->idle_mysql_sessions->add(myds->sess); + return true; + } + } + } + return false; +} +#endif // IDLE_THREADS + +template +unsigned int Base_Thread::find_session_idx_in_mysql_sessions(S * sess) { + T* thr = static_cast(this); + unsigned int i=0; + for (i=0;ilen;i++) { + S *mysess=(S *)thr->mysql_sessions->index(i); + if (mysess==sess) { + return i; + } + } + return i; +} + +template +void Base_Thread::ProcessAllMyDS_BeforePoll() { + T* thr = static_cast(this); + bool check_if_move_to_idle_thread = false; +#ifdef IDLE_THREADS + if (GloVars.global.idle_threads) { + if (curtime > last_move_to_idle_thread_time + (unsigned long long)mysql_thread___session_idle_ms * 1000) { + last_move_to_idle_thread_time=curtime; + check_if_move_to_idle_thread=true; + } + } +#endif + for (unsigned int n = 0; n < thr->mypolls.len; n++) { + auto * myds=thr->mypolls.myds[n]; + thr->mypolls.fds[n].revents=0; + if (myds) { +#ifdef IDLE_THREADS + if (check_if_move_to_idle_thread == true) { + // here we try to move it to the maintenance thread + if (myds->myds_type==MYDS_FRONTEND && myds->sess) { + if (myds->DSS==STATE_SLEEP && myds->sess->status==WAITING_CLIENT_DATA) { + if (move_session_to_idle_mysql_sessions(myds, n)) { + n--; // compensate mypolls.remove_index_fast(n) and n++ of loop + continue; + } + } + } + } +#endif // IDLE_THREADS + if (unlikely(myds->wait_until)) { + tune_timeout_for_myds_needs_pause(myds); + } + if (myds->sess) { + if (unlikely(myds->sess->pause_until > 0)) { + tune_timeout_for_session_needs_pause(myds); + } + } + myds->revents=0; + if (myds->myds_type!=MYDS_LISTENER) { + configure_pollout(myds, n); + } + } + proxy_debug(PROXY_DEBUG_NET,1,"Poll for DataStream=%p will be called with FD=%d and events=%d\n", thr->mypolls.myds[n], thr->mypolls.fds[n].fd, thr->mypolls.fds[n].events); + } +} + diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index ea6f1a210..88fe791b6 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -3005,52 +3005,6 @@ void MySQL_Thread::run___get_multiple_idle_connections(int& num_idles) { last_processing_idles=curtime; } -// this function was inline in MySQL_Thread::run() -void MySQL_Thread::ProcessAllMyDS_BeforePoll() { - bool check_if_move_to_idle_thread = false; -#ifdef IDLE_THREADS - if (GloVars.global.idle_threads) { - if (curtime > last_move_to_idle_thread_time + (unsigned long long)mysql_thread___session_idle_ms * 1000) { - last_move_to_idle_thread_time=curtime; - check_if_move_to_idle_thread=true; - } - } -#endif - for (unsigned int n = 0; n < mypolls.len; n++) { - MySQL_Data_Stream *myds=NULL; - myds=mypolls.myds[n]; - mypolls.fds[n].revents=0; - if (myds) { -#ifdef IDLE_THREADS - if (check_if_move_to_idle_thread == true) { - // here we try to move it to the maintenance thread - if (myds->myds_type==MYDS_FRONTEND && myds->sess) { - if (myds->DSS==STATE_SLEEP && myds->sess->status==WAITING_CLIENT_DATA) { - if (move_session_to_idle_mysql_sessions(myds, n)) { - n--; // compensate mypolls.remove_index_fast(n) and n++ of loop - continue; - } - } - } - } -#endif // IDLE_THREADS - if (unlikely(myds->wait_until)) { - tune_timeout_for_myds_needs_pause(myds); - } - if (myds->sess) { - if (unlikely(myds->sess->pause_until > 0)) { - tune_timeout_for_session_needs_pause(myds); - } - } - myds->revents=0; - if (myds->myds_type!=MYDS_LISTENER) { - configure_pollout(myds, n); - } - } - proxy_debug(PROXY_DEBUG_NET,1,"Poll for DataStream=%p will be called with FD=%d and events=%d\n", mypolls.myds[n], mypolls.fds[n].fd, mypolls.fds[n].events); - } -} - // this function was inline in MySQL_Thread::run() /** * @brief Cleans up the mirror queue by removing excess sessions. @@ -3258,7 +3212,7 @@ __run_skip_1: handle_mirror_queue_mysql_sessions(); - ProcessAllMyDS_BeforePoll(); + ProcessAllMyDS_BeforePoll(); #ifdef IDLE_THREADS run_MoveSessionsBetweenThreads(); @@ -3390,17 +3344,6 @@ __run_skip_1: } // end of ::run() -unsigned int MySQL_Thread::find_session_idx_in_mysql_sessions(MySQL_Session *sess) { - unsigned int i=0; - for (i=0;ilen;i++) { - MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(i); - if (mysess==sess) { - return i; - } - } - return i; -} - #ifdef IDLE_THREADS /** @@ -5742,42 +5685,6 @@ void MySQL_Thread::Scan_Sessions_to_Kill(PtrArray *mysess) { } } -#ifdef IDLE_THREADS -/** - * @brief Moves a session to the idle session array if it meets the idle criteria. - * - * This function checks if a session should be moved to the idle session array based on its idle time - * and other conditions. If the session meets the idle criteria, it is moved to the idle session array. - * - * @param myds Pointer to the MySQL data stream associated with the session. - * @param n The index of the session in the poll array. - * @return True if the session is moved to the idle session array, false otherwise. - */ -bool MySQL_Thread::move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, unsigned int n) { - unsigned long long _tmp_idle = mypolls.last_recv[n] > mypolls.last_sent[n] ? mypolls.last_recv[n] : mypolls.last_sent[n] ; - if (_tmp_idle < ( (curtime > (unsigned int)mysql_thread___session_idle_ms * 1000) ? (curtime - mysql_thread___session_idle_ms * 1000) : 0)) { - // make sure data stream has no pending data out and session is not throttled (#1939) - // because epoll thread does not handle data stream with data out - if (myds->sess->client_myds == myds && !myds->available_data_out() && myds->sess->pause_until <= curtime) { - //unsigned int j; - bool has_backends = myds->sess->has_any_backend(); - if (has_backends==false) { - unsigned long long idle_since = curtime - myds->sess->IdleTime(); - mypolls.remove_index_fast(n); - myds->mypolls=NULL; - unsigned int i = find_session_idx_in_mysql_sessions(myds->sess); - myds->sess->thread=NULL; - unregister_session(i); - myds->sess->idle_since = idle_since; - idle_mysql_sessions->add(myds->sess); - return true; - } - } - } - return false; -} -#endif // IDLE_THREADS - #ifdef IDLE_THREADS /** * @brief Moves sessions from the idle thread's session array to the worker thread's session array. diff --git a/lib/PgSQL_Thread.cpp b/lib/PgSQL_Thread.cpp index a13791c85..d663a6667 100644 --- a/lib/PgSQL_Thread.cpp +++ b/lib/PgSQL_Thread.cpp @@ -2858,52 +2858,6 @@ void PgSQL_Thread::run___get_multiple_idle_connections(int& num_idles) { last_processing_idles = curtime; } -// this function was inline in PgSQL_Thread::run() -void PgSQL_Thread::ProcessAllMyDS_BeforePoll() { - bool check_if_move_to_idle_thread = false; -#ifdef IDLE_THREADS - if (GloVars.global.idle_threads) { - if (curtime > last_move_to_idle_thread_time + (unsigned long long)mysql_thread___session_idle_ms * 1000) { - last_move_to_idle_thread_time = curtime; - check_if_move_to_idle_thread = true; - } - } -#endif - for (unsigned int n = 0; n < mypolls.len; n++) { - PgSQL_Data_Stream* myds = NULL; - myds = mypolls.myds[n]; - mypolls.fds[n].revents = 0; - if (myds) { -#ifdef IDLE_THREADS - if (check_if_move_to_idle_thread == true) { - // here we try to move it to the maintenance thread - if (myds->myds_type == MYDS_FRONTEND && myds->sess) { - if (myds->DSS == STATE_SLEEP && myds->sess->status == WAITING_CLIENT_DATA) { - if (move_session_to_idle_mysql_sessions(myds, n)) { - n--; // compensate mypolls.remove_index_fast(n) and n++ of loop - continue; - } - } - } - } -#endif // IDLE_THREADS - if (unlikely(myds->wait_until)) { - tune_timeout_for_myds_needs_pause(myds); - } - if (myds->sess) { - if (unlikely(myds->sess->pause_until > 0)) { - tune_timeout_for_session_needs_pause(myds); - } - } - myds->revents = 0; - if (myds->myds_type != MYDS_LISTENER) { - configure_pollout(myds, n); - } - } - proxy_debug(PROXY_DEBUG_NET, 1, "Poll for DataStream=%p will be called with FD=%d and events=%d\n", mypolls.myds[n], mypolls.fds[n].fd, mypolls.fds[n].events); - } -} - // this function was inline in PgSQL_Thread::run() void PgSQL_Thread::run___cleanup_mirror_queue() { unsigned int l = (unsigned int)mysql_thread___mirror_max_concurrency; @@ -2969,7 +2923,7 @@ void PgSQL_Thread::run() { handle_mirror_queue_mysql_sessions(); - ProcessAllMyDS_BeforePoll(); + ProcessAllMyDS_BeforePoll(); #ifdef IDLE_THREADS if (GloVars.global.idle_threads) { @@ -3195,17 +3149,6 @@ void PgSQL_Thread::run() { } // end of ::run() -unsigned int PgSQL_Thread::find_session_idx_in_mysql_sessions(PgSQL_Session * sess) { - unsigned int i = 0; - for (i = 0; i < mysql_sessions->len; i++) { - PgSQL_Session* mysess = (PgSQL_Session*)mysql_sessions->index(i); - if (mysess == sess) { - return i; - } - } - return i; -} - #ifdef IDLE_THREADS void PgSQL_Thread::idle_thread_to_kill_idle_sessions() { #define SESS_TO_SCAN 128 @@ -5369,41 +5312,6 @@ void PgSQL_Thread::Scan_Sessions_to_Kill(PtrArray * mysess) { } } -#ifdef IDLE_THREADS -bool PgSQL_Thread::move_session_to_idle_mysql_sessions(PgSQL_Data_Stream * myds, unsigned int n) { - unsigned long long _tmp_idle = mypolls.last_recv[n] > mypolls.last_sent[n] ? mypolls.last_recv[n] : mypolls.last_sent[n]; - if (_tmp_idle < ((curtime > (unsigned int)mysql_thread___session_idle_ms * 1000) ? (curtime - mysql_thread___session_idle_ms * 1000) : 0)) { - // make sure data stream has no pending data out and session is not throttled (#1939) - // because epoll thread does not handle data stream with data out - if (myds->sess->client_myds == myds && !myds->available_data_out() && myds->sess->pause_until <= curtime) { - //unsigned int j; - bool has_backends = myds->sess->has_any_backend(); - /* - for (j=0;jsess->mybes->len;j++) { - MySQL_Backend *tmp_mybe=(MySQL_Backend *)myds->sess->mybes->index(j); - PgSQL_Data_Stream *__myds=tmp_mybe->server_myds; - if (__myds->myconn) { - conns++; - } - } - */ - if (has_backends == false) { - unsigned long long idle_since = curtime - myds->sess->IdleTime(); - mypolls.remove_index_fast(n); - myds->mypolls = NULL; - unsigned int i = find_session_idx_in_mysql_sessions(myds->sess); - myds->sess->thread = NULL; - unregister_session(i); - myds->sess->idle_since = idle_since; - idle_mysql_sessions->add(myds->sess); - return true; - } - } - } - return false; -} -#endif // IDLE_THREADS - #ifdef IDLE_THREADS void PgSQL_Thread::idle_thread_gets_sessions_from_worker_thread() { pthread_mutex_lock(&myexchange.mutex_idles);