More functions moved to Base_Thread

* move_session_to_idle_mysql_sessions
* find_session_idx_in_mysql_sessions
* ProcessAllMyDS_BeforePoll
v2.x_pg_PrepStmtBase_240714
René Cannaò 2 years ago
parent a881936cad
commit 483b8335d3

@ -41,6 +41,7 @@ class Base_Thread {
bool maintenance_loop;
public:
unsigned long long curtime;
unsigned long long last_move_to_idle_thread_time;
int shutdown;
PtrArray *mysql_sessions;
Session_Regex **match_regexes;
@ -68,6 +69,11 @@ class Base_Thread {
void configure_pollout(DS * myds, unsigned int n);
template<typename T, typename DS>
bool set_backend_to_be_skipped_if_frontend_is_slow(DS * myds, unsigned int n);
#ifdef IDLE_THREADS
template<typename T, typename DS> bool move_session_to_idle_mysql_sessions(DS * myds, unsigned int n);
#endif // IDLE_THREADS
template<typename T, typename S> unsigned int find_session_idx_in_mysql_sessions(S * sess);
template<typename T> void ProcessAllMyDS_BeforePoll();
friend class MySQL_Thread;
friend class PgSQL_Thread;

@ -120,11 +120,11 @@ class __attribute__((aligned(64))) MySQL_Thread : public Base_Thread
void idle_thread_check_if_worker_thread_has_unprocess_resumed_sessions_and_signal_it(MySQL_Thread *thr);
void idle_thread_prepares_session_to_send_to_worker_thread(int i);
void idle_thread_to_kill_idle_sessions();
bool move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, unsigned int n);
//bool move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, unsigned int n);
void run_Handle_epoll_wait(int);
#endif // IDLE_THREADS
unsigned int find_session_idx_in_mysql_sessions(MySQL_Session *sess);
//unsigned int find_session_idx_in_mysql_sessions(MySQL_Session *sess);
//bool set_backend_to_be_skipped_if_frontend_is_slow(MySQL_Data_Stream *myds, unsigned int n);
void handle_mirror_queue_mysql_sessions();
void handle_kill_queues();
@ -154,7 +154,7 @@ class __attribute__((aligned(64))) MySQL_Thread : public Base_Thread
// unsigned long long curtime;
unsigned long long pre_poll_time;
unsigned long long last_maintenance_time;
unsigned long long last_move_to_idle_thread_time;
//unsigned long long last_move_to_idle_thread_time;
std::atomic<unsigned long long> atomic_curtime;
//PtrArray *mysql_sessions;
PtrArray *mirror_queue_mysql_sessions;
@ -200,7 +200,7 @@ class __attribute__((aligned(64))) MySQL_Thread : public Base_Thread
bool init();
void run___get_multiple_idle_connections(int& num_idles);
void run___cleanup_mirror_queue();
void ProcessAllMyDS_BeforePoll();
//void ProcessAllMyDS_BeforePoll();
//void ProcessAllMyDS_AfterPoll();
void run();
void poll_listener_add(int sock);

@ -141,10 +141,10 @@ private:
void idle_thread_check_if_worker_thread_has_unprocess_resumed_sessions_and_signal_it(PgSQL_Thread * thr);
void idle_thread_prepares_session_to_send_to_worker_thread(int i);
void idle_thread_to_kill_idle_sessions();
bool move_session_to_idle_mysql_sessions(PgSQL_Data_Stream * myds, unsigned int n);
//bool move_session_to_idle_mysql_sessions(PgSQL_Data_Stream * myds, unsigned int n);
#endif // IDLE_THREADS
unsigned int find_session_idx_in_mysql_sessions(PgSQL_Session * sess);
//unsigned int find_session_idx_in_mysql_sessions(PgSQL_Session * sess);
//bool set_backend_to_be_skipped_if_frontend_is_slow(PgSQL_Data_Stream * myds, unsigned int n);
void handle_mirror_queue_mysql_sessions();
void handle_kill_queues();
@ -166,7 +166,7 @@ public:
pthread_t thread_id;
unsigned long long pre_poll_time;
unsigned long long last_maintenance_time;
unsigned long long last_move_to_idle_thread_time;
//unsigned long long last_move_to_idle_thread_time;
std::atomic<unsigned long long> atomic_curtime;
//PtrArray* mysql_sessions;
PtrArray* mirror_queue_mysql_sessions;
@ -211,7 +211,7 @@ public:
bool init();
void run___get_multiple_idle_connections(int& num_idles);
void run___cleanup_mirror_queue();
void ProcessAllMyDS_BeforePoll();
//void ProcessAllMyDS_BeforePoll();
//void ProcessAllMyDS_AfterPoll();
void run();
void poll_listener_add(int sock);

@ -11,22 +11,12 @@
// Explicitly instantiate the required template class and member functions
template MySQL_Session* Base_Thread::create_new_session_and_client_data_stream<MySQL_Thread, MySQL_Session*>(int);
template PgSQL_Session* Base_Thread::create_new_session_and_client_data_stream<PgSQL_Thread, PgSQL_Session*>(int);
template void Base_Thread::check_timing_out_session<MySQL_Thread>(unsigned int);
template void Base_Thread::check_timing_out_session<PgSQL_Thread>(unsigned int);
template void Base_Thread::check_for_invalid_fd<MySQL_Thread>(unsigned int);
template void Base_Thread::check_for_invalid_fd<PgSQL_Thread>(unsigned int);
template void Base_Thread::ProcessAllSessions_SortingSessions<MySQL_Session>();
template void Base_Thread::ProcessAllSessions_SortingSessions<PgSQL_Session>();
template void Base_Thread::ProcessAllMyDS_AfterPoll<MySQL_Thread>();
template void Base_Thread::ProcessAllMyDS_AfterPoll<PgSQL_Thread>();
template void Base_Thread::read_one_byte_from_pipe<MySQL_Thread>(unsigned int n);
template void Base_Thread::read_one_byte_from_pipe<PgSQL_Thread>(unsigned int n);
template void Base_Thread::tune_timeout_for_myds_needs_pause<MySQL_Thread>(MySQL_Data_Stream *);
template void Base_Thread::tune_timeout_for_myds_needs_pause<PgSQL_Thread>(PgSQL_Data_Stream *);
template void Base_Thread::tune_timeout_for_session_needs_pause<MySQL_Thread>(MySQL_Data_Stream *);
template void Base_Thread::tune_timeout_for_session_needs_pause<PgSQL_Thread>(PgSQL_Data_Stream *);
template void Base_Thread::configure_pollout<MySQL_Thread>(MySQL_Data_Stream *, unsigned int);
template void Base_Thread::configure_pollout<PgSQL_Thread>(PgSQL_Data_Stream *, unsigned int);
template void Base_Thread::ProcessAllMyDS_BeforePoll<MySQL_Thread>();
template void Base_Thread::ProcessAllMyDS_BeforePoll<PgSQL_Thread>();
Base_Thread::Base_Thread() {
@ -380,3 +370,100 @@ bool Base_Thread::set_backend_to_be_skipped_if_frontend_is_slow(DS * myds, unsig
return false;
}
#ifdef IDLE_THREADS
/**
* @brief Moves a session to the idle session array if it meets the idle criteria.
*
* This function checks if a session should be moved to the idle session array based on its idle time
* and other conditions. If the session meets the idle criteria, it is moved to the idle session array.
*
* @param myds Pointer to the MySQL data stream associated with the session.
* @param n The index of the session in the poll array.
* @return True if the session is moved to the idle session array, false otherwise.
*/
template<typename T, typename DS>
bool Base_Thread::move_session_to_idle_mysql_sessions(DS * myds, unsigned int n) {
T* thr = static_cast<T*>(this);
unsigned long long _tmp_idle = thr->mypolls.last_recv[n] > thr->mypolls.last_sent[n] ? thr->mypolls.last_recv[n] : thr->mypolls.last_sent[n] ;
if (_tmp_idle < ( (curtime > (unsigned int)mysql_thread___session_idle_ms * 1000) ? (curtime - mysql_thread___session_idle_ms * 1000) : 0)) {
// make sure data stream has no pending data out and session is not throttled (#1939)
// because epoll thread does not handle data stream with data out
if (myds->sess->client_myds == myds && !myds->available_data_out() && myds->sess->pause_until <= curtime) {
//unsigned int j;
bool has_backends = myds->sess->has_any_backend();
if (has_backends==false) {
unsigned long long idle_since = curtime - myds->sess->IdleTime();
thr->mypolls.remove_index_fast(n);
myds->mypolls=NULL;
unsigned int i = find_session_idx_in_mysql_sessions<T>(myds->sess);
myds->sess->thread=NULL;
thr->unregister_session(i);
myds->sess->idle_since = idle_since;
thr->idle_mysql_sessions->add(myds->sess);
return true;
}
}
}
return false;
}
#endif // IDLE_THREADS
template<typename T, typename S>
unsigned int Base_Thread::find_session_idx_in_mysql_sessions(S * sess) {
T* thr = static_cast<T*>(this);
unsigned int i=0;
for (i=0;i<mysql_sessions->len;i++) {
S *mysess=(S *)thr->mysql_sessions->index(i);
if (mysess==sess) {
return i;
}
}
return i;
}
template<typename T>
void Base_Thread::ProcessAllMyDS_BeforePoll() {
T* thr = static_cast<T*>(this);
bool check_if_move_to_idle_thread = false;
#ifdef IDLE_THREADS
if (GloVars.global.idle_threads) {
if (curtime > last_move_to_idle_thread_time + (unsigned long long)mysql_thread___session_idle_ms * 1000) {
last_move_to_idle_thread_time=curtime;
check_if_move_to_idle_thread=true;
}
}
#endif
for (unsigned int n = 0; n < thr->mypolls.len; n++) {
auto * myds=thr->mypolls.myds[n];
thr->mypolls.fds[n].revents=0;
if (myds) {
#ifdef IDLE_THREADS
if (check_if_move_to_idle_thread == true) {
// here we try to move it to the maintenance thread
if (myds->myds_type==MYDS_FRONTEND && myds->sess) {
if (myds->DSS==STATE_SLEEP && myds->sess->status==WAITING_CLIENT_DATA) {
if (move_session_to_idle_mysql_sessions<T>(myds, n)) {
n--; // compensate mypolls.remove_index_fast(n) and n++ of loop
continue;
}
}
}
}
#endif // IDLE_THREADS
if (unlikely(myds->wait_until)) {
tune_timeout_for_myds_needs_pause<T>(myds);
}
if (myds->sess) {
if (unlikely(myds->sess->pause_until > 0)) {
tune_timeout_for_session_needs_pause<T>(myds);
}
}
myds->revents=0;
if (myds->myds_type!=MYDS_LISTENER) {
configure_pollout<T>(myds, n);
}
}
proxy_debug(PROXY_DEBUG_NET,1,"Poll for DataStream=%p will be called with FD=%d and events=%d\n", thr->mypolls.myds[n], thr->mypolls.fds[n].fd, thr->mypolls.fds[n].events);
}
}

@ -3005,52 +3005,6 @@ void MySQL_Thread::run___get_multiple_idle_connections(int& num_idles) {
last_processing_idles=curtime;
}
// this function was inline in MySQL_Thread::run()
void MySQL_Thread::ProcessAllMyDS_BeforePoll() {
bool check_if_move_to_idle_thread = false;
#ifdef IDLE_THREADS
if (GloVars.global.idle_threads) {
if (curtime > last_move_to_idle_thread_time + (unsigned long long)mysql_thread___session_idle_ms * 1000) {
last_move_to_idle_thread_time=curtime;
check_if_move_to_idle_thread=true;
}
}
#endif
for (unsigned int n = 0; n < mypolls.len; n++) {
MySQL_Data_Stream *myds=NULL;
myds=mypolls.myds[n];
mypolls.fds[n].revents=0;
if (myds) {
#ifdef IDLE_THREADS
if (check_if_move_to_idle_thread == true) {
// here we try to move it to the maintenance thread
if (myds->myds_type==MYDS_FRONTEND && myds->sess) {
if (myds->DSS==STATE_SLEEP && myds->sess->status==WAITING_CLIENT_DATA) {
if (move_session_to_idle_mysql_sessions(myds, n)) {
n--; // compensate mypolls.remove_index_fast(n) and n++ of loop
continue;
}
}
}
}
#endif // IDLE_THREADS
if (unlikely(myds->wait_until)) {
tune_timeout_for_myds_needs_pause<MySQL_Thread>(myds);
}
if (myds->sess) {
if (unlikely(myds->sess->pause_until > 0)) {
tune_timeout_for_session_needs_pause<MySQL_Thread>(myds);
}
}
myds->revents=0;
if (myds->myds_type!=MYDS_LISTENER) {
configure_pollout<MySQL_Thread>(myds, n);
}
}
proxy_debug(PROXY_DEBUG_NET,1,"Poll for DataStream=%p will be called with FD=%d and events=%d\n", mypolls.myds[n], mypolls.fds[n].fd, mypolls.fds[n].events);
}
}
// this function was inline in MySQL_Thread::run()
/**
* @brief Cleans up the mirror queue by removing excess sessions.
@ -3258,7 +3212,7 @@ __run_skip_1:
handle_mirror_queue_mysql_sessions();
ProcessAllMyDS_BeforePoll();
ProcessAllMyDS_BeforePoll<MySQL_Thread>();
#ifdef IDLE_THREADS
run_MoveSessionsBetweenThreads();
@ -3390,17 +3344,6 @@ __run_skip_1:
}
// end of ::run()
unsigned int MySQL_Thread::find_session_idx_in_mysql_sessions(MySQL_Session *sess) {
unsigned int i=0;
for (i=0;i<mysql_sessions->len;i++) {
MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(i);
if (mysess==sess) {
return i;
}
}
return i;
}
#ifdef IDLE_THREADS
/**
@ -5742,42 +5685,6 @@ void MySQL_Thread::Scan_Sessions_to_Kill(PtrArray *mysess) {
}
}
#ifdef IDLE_THREADS
/**
* @brief Moves a session to the idle session array if it meets the idle criteria.
*
* This function checks if a session should be moved to the idle session array based on its idle time
* and other conditions. If the session meets the idle criteria, it is moved to the idle session array.
*
* @param myds Pointer to the MySQL data stream associated with the session.
* @param n The index of the session in the poll array.
* @return True if the session is moved to the idle session array, false otherwise.
*/
bool MySQL_Thread::move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, unsigned int n) {
unsigned long long _tmp_idle = mypolls.last_recv[n] > mypolls.last_sent[n] ? mypolls.last_recv[n] : mypolls.last_sent[n] ;
if (_tmp_idle < ( (curtime > (unsigned int)mysql_thread___session_idle_ms * 1000) ? (curtime - mysql_thread___session_idle_ms * 1000) : 0)) {
// make sure data stream has no pending data out and session is not throttled (#1939)
// because epoll thread does not handle data stream with data out
if (myds->sess->client_myds == myds && !myds->available_data_out() && myds->sess->pause_until <= curtime) {
//unsigned int j;
bool has_backends = myds->sess->has_any_backend();
if (has_backends==false) {
unsigned long long idle_since = curtime - myds->sess->IdleTime();
mypolls.remove_index_fast(n);
myds->mypolls=NULL;
unsigned int i = find_session_idx_in_mysql_sessions(myds->sess);
myds->sess->thread=NULL;
unregister_session(i);
myds->sess->idle_since = idle_since;
idle_mysql_sessions->add(myds->sess);
return true;
}
}
}
return false;
}
#endif // IDLE_THREADS
#ifdef IDLE_THREADS
/**
* @brief Moves sessions from the idle thread's session array to the worker thread's session array.

@ -2858,52 +2858,6 @@ void PgSQL_Thread::run___get_multiple_idle_connections(int& num_idles) {
last_processing_idles = curtime;
}
// this function was inline in PgSQL_Thread::run()
void PgSQL_Thread::ProcessAllMyDS_BeforePoll() {
bool check_if_move_to_idle_thread = false;
#ifdef IDLE_THREADS
if (GloVars.global.idle_threads) {
if (curtime > last_move_to_idle_thread_time + (unsigned long long)mysql_thread___session_idle_ms * 1000) {
last_move_to_idle_thread_time = curtime;
check_if_move_to_idle_thread = true;
}
}
#endif
for (unsigned int n = 0; n < mypolls.len; n++) {
PgSQL_Data_Stream* myds = NULL;
myds = mypolls.myds[n];
mypolls.fds[n].revents = 0;
if (myds) {
#ifdef IDLE_THREADS
if (check_if_move_to_idle_thread == true) {
// here we try to move it to the maintenance thread
if (myds->myds_type == MYDS_FRONTEND && myds->sess) {
if (myds->DSS == STATE_SLEEP && myds->sess->status == WAITING_CLIENT_DATA) {
if (move_session_to_idle_mysql_sessions(myds, n)) {
n--; // compensate mypolls.remove_index_fast(n) and n++ of loop
continue;
}
}
}
}
#endif // IDLE_THREADS
if (unlikely(myds->wait_until)) {
tune_timeout_for_myds_needs_pause<PgSQL_Thread>(myds);
}
if (myds->sess) {
if (unlikely(myds->sess->pause_until > 0)) {
tune_timeout_for_session_needs_pause<PgSQL_Thread>(myds);
}
}
myds->revents = 0;
if (myds->myds_type != MYDS_LISTENER) {
configure_pollout<PgSQL_Thread>(myds, n);
}
}
proxy_debug(PROXY_DEBUG_NET, 1, "Poll for DataStream=%p will be called with FD=%d and events=%d\n", mypolls.myds[n], mypolls.fds[n].fd, mypolls.fds[n].events);
}
}
// this function was inline in PgSQL_Thread::run()
void PgSQL_Thread::run___cleanup_mirror_queue() {
unsigned int l = (unsigned int)mysql_thread___mirror_max_concurrency;
@ -2969,7 +2923,7 @@ void PgSQL_Thread::run() {
handle_mirror_queue_mysql_sessions();
ProcessAllMyDS_BeforePoll();
ProcessAllMyDS_BeforePoll<PgSQL_Thread>();
#ifdef IDLE_THREADS
if (GloVars.global.idle_threads) {
@ -3195,17 +3149,6 @@ void PgSQL_Thread::run() {
}
// end of ::run()
unsigned int PgSQL_Thread::find_session_idx_in_mysql_sessions(PgSQL_Session * sess) {
unsigned int i = 0;
for (i = 0; i < mysql_sessions->len; i++) {
PgSQL_Session* mysess = (PgSQL_Session*)mysql_sessions->index(i);
if (mysess == sess) {
return i;
}
}
return i;
}
#ifdef IDLE_THREADS
void PgSQL_Thread::idle_thread_to_kill_idle_sessions() {
#define SESS_TO_SCAN 128
@ -5369,41 +5312,6 @@ void PgSQL_Thread::Scan_Sessions_to_Kill(PtrArray * mysess) {
}
}
#ifdef IDLE_THREADS
bool PgSQL_Thread::move_session_to_idle_mysql_sessions(PgSQL_Data_Stream * myds, unsigned int n) {
unsigned long long _tmp_idle = mypolls.last_recv[n] > mypolls.last_sent[n] ? mypolls.last_recv[n] : mypolls.last_sent[n];
if (_tmp_idle < ((curtime > (unsigned int)mysql_thread___session_idle_ms * 1000) ? (curtime - mysql_thread___session_idle_ms * 1000) : 0)) {
// make sure data stream has no pending data out and session is not throttled (#1939)
// because epoll thread does not handle data stream with data out
if (myds->sess->client_myds == myds && !myds->available_data_out() && myds->sess->pause_until <= curtime) {
//unsigned int j;
bool has_backends = myds->sess->has_any_backend();
/*
for (j=0;j<myds->sess->mybes->len;j++) {
MySQL_Backend *tmp_mybe=(MySQL_Backend *)myds->sess->mybes->index(j);
PgSQL_Data_Stream *__myds=tmp_mybe->server_myds;
if (__myds->myconn) {
conns++;
}
}
*/
if (has_backends == false) {
unsigned long long idle_since = curtime - myds->sess->IdleTime();
mypolls.remove_index_fast(n);
myds->mypolls = NULL;
unsigned int i = find_session_idx_in_mysql_sessions(myds->sess);
myds->sess->thread = NULL;
unregister_session(i);
myds->sess->idle_since = idle_since;
idle_mysql_sessions->add(myds->sess);
return true;
}
}
}
return false;
}
#endif // IDLE_THREADS
#ifdef IDLE_THREADS
void PgSQL_Thread::idle_thread_gets_sessions_from_worker_thread() {
pthread_mutex_lock(&myexchange.mutex_idles);

Loading…
Cancel
Save