diff --git a/include/Base_Thread.h b/include/Base_Thread.h index 485fec913..b4fb25c1a 100644 --- a/include/Base_Thread.h +++ b/include/Base_Thread.h @@ -65,7 +65,7 @@ private: template void check_for_invalid_fd(unsigned int n); template - void ProcessAllSessions_SortingSessions(); + void ProcessAllSessions_Partition(); template void ProcessAllMyDS_AfterPoll(); template diff --git a/lib/Base_Thread.cpp b/lib/Base_Thread.cpp index 263ac1004..25155dbf6 100644 --- a/lib/Base_Thread.cpp +++ b/lib/Base_Thread.cpp @@ -11,8 +11,8 @@ // Explicitly instantiate the required template class and member functions template MySQL_Session* Base_Thread::create_new_session_and_client_data_stream(int); template PgSQL_Session* Base_Thread::create_new_session_and_client_data_stream(int); -template void Base_Thread::ProcessAllSessions_SortingSessions(); -template void Base_Thread::ProcessAllSessions_SortingSessions(); +template void Base_Thread::ProcessAllSessions_Partition(); +template void Base_Thread::ProcessAllSessions_Partition(); template void Base_Thread::ProcessAllMyDS_AfterPoll(); template void Base_Thread::ProcessAllMyDS_AfterPoll(); template void Base_Thread::ProcessAllMyDS_BeforePoll(); @@ -230,34 +230,58 @@ void Base_Thread::check_for_invalid_fd(unsigned int n) { } } -// this function was inline in MySQL_Thread::process_all_sessions() + /** - * @brief Sort all sessions based on maximum connection time. - * - * This function iterates through all MySQL sessions and sorts them based on their maximum connection time. - * Sessions with a valid maximum connection time are compared, and if one session has a greater maximum connection - * time than another, their positions in the session list are swapped. The sorting is performed in-place. - * - * @note This function assumes that MySQL sessions and their associated data structures have been initialized - * and are accessible within the MySQL Thread. + * @brief Partition all sessions into three contiguous blocks + * + * Layout produced in mysql_sessions->pdata: + * [0, running_end) block A: running a query against the backend + * [running_end, idle_begin) block B: acquiring a backend (max_connect_time != 0) + * [idle_begin, len) block C: idle (no backend, or holds-conn-but-WAITING_CLIENT_DATA) + * + * Block A is "session is currently driving the backend" — these sessions have a chance + * of releasing the connection at end-of-query (depending on multiplexing eligibility, + * transaction state, etc.), giving block B sessions a fairness chance to acquire it. + * Sessions parked in WAITING_CLIENT_DATA (e.g., idle in a transaction after BEGIN) hold + * the conn but cannot release it until the client sends the next packet, so they live in C. + * + * Classification tests max_connect_time first: it must win over A even when myconn != NULL, + * to catch CHANGING_USER_SERVER on pooled connections and the post-error retry path where + * the old conn hasn't been destroyed yet. + * + * Single O(n) pass, in place. idx walks up, idle_begin walks down, they meet and terminate. */ template -void Base_Thread::ProcessAllSessions_SortingSessions() { - unsigned int a=0; - for (unsigned int n=0; nlen; n++) { - S *sess=(S *)mysql_sessions->index(n); - if (sess->mybe && sess->mybe->server_myds) { - if (sess->mybe->server_myds->max_connect_time) { - S *sess2=(S *)mysql_sessions->index(a); - if (sess2->mybe && sess2->mybe->server_myds && sess2->mybe->server_myds->max_connect_time && sess2->mybe->server_myds->max_connect_time <= sess->mybe->server_myds->max_connect_time) { - // do nothing - } else { - void *p=mysql_sessions->pdata[a]; - mysql_sessions->pdata[a]=mysql_sessions->pdata[n]; - mysql_sessions->pdata[n]=p; - a++; - } +void Base_Thread::ProcessAllSessions_Partition() { + size_t running_end = 0; + size_t idle_begin = mysql_sessions->len; + size_t idx = 0; + + while (idx < idle_begin) { + S* s = static_cast(mysql_sessions->index(idx)); + + const bool has_be = (s->mybe && s->mybe->server_myds); + const bool is_B = has_be && (s->mybe->server_myds->max_connect_time != 0); + const bool is_A = !is_B && has_be && (s->mybe->server_myds->myconn != nullptr) && (s->status != WAITING_CLIENT_DATA); + + if (is_A) { + if (idx != running_end) { + void* p = mysql_sessions->pdata[idx]; + mysql_sessions->pdata[idx] = mysql_sessions->pdata[running_end]; + mysql_sessions->pdata[running_end] = p; + } + ++running_end; + ++idx; + } else if (is_B) { + ++idx; + } else { + --idle_begin; + if (idx != idle_begin) { + void* p = mysql_sessions->pdata[idx]; + mysql_sessions->pdata[idx] = mysql_sessions->pdata[idle_begin]; + mysql_sessions->pdata[idle_begin] = p; } + // do NOT advance idx - re-examine the swapped-in element test } } } diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 1e4847663..26abae76e 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -4453,7 +4453,7 @@ void MySQL_Thread::process_all_sessions() { } #endif // IDLE_THREADS if (sess_sort && mysql_sessions->len > 3) { - ProcessAllSessions_SortingSessions(); + ProcessAllSessions_Partition(); } for (n=0; nlen; n++) { MySQL_Session *sess=(MySQL_Session *)mysql_sessions->index(n); diff --git a/lib/PgSQL_Thread.cpp b/lib/PgSQL_Thread.cpp index e99e3f8c5..94874e39e 100644 --- a/lib/PgSQL_Thread.cpp +++ b/lib/PgSQL_Thread.cpp @@ -3833,7 +3833,7 @@ void PgSQL_Thread::process_all_sessions() { } #endif // IDLE_THREADS if (sess_sort && mysql_sessions->len > 3) { - ProcessAllSessions_SortingSessions(); + ProcessAllSessions_Partition(); } for (n = 0; n < mysql_sessions->len; n++) { PgSQL_Session* sess = (PgSQL_Session*)mysql_sessions->index(n);