Refactor: partition sessions by backend state instead of sorting

Replace ProcessAllSessions_SortingSessions with ProcessAllSessions_Partition.
The old function did an implicit one-pass swap-sort over mysql_sessions by
max_connect_time; the new one explicitly partitions sessions into three
contiguous blocks in pdata according to their backend state:

    [0, running_end)         A: running a query against the backend
    [running_end, idle_begin) B: acquiring a backend (max_connect_time != 0)
    [idle_begin, len)        C: idle (no backend, or holding a conn while
                                parked in WAITING_CLIENT_DATA)
v3.0_latency_consistency_improvement
Rahim Kanji 1 month ago
parent 9cc20a8775
commit ca202ceba1

@ -65,7 +65,7 @@ private:
template<typename T>
void check_for_invalid_fd(unsigned int n);
template<typename S>
void ProcessAllSessions_SortingSessions();
void ProcessAllSessions_Partition();
template<typename T>
void ProcessAllMyDS_AfterPoll();
template<typename T>

@ -11,8 +11,8 @@
// Explicitly instantiate the required template class and member functions
template MySQL_Session* Base_Thread::create_new_session_and_client_data_stream<MySQL_Thread, MySQL_Session*>(int);
template PgSQL_Session* Base_Thread::create_new_session_and_client_data_stream<PgSQL_Thread, PgSQL_Session*>(int);
template void Base_Thread::ProcessAllSessions_SortingSessions<MySQL_Session>();
template void Base_Thread::ProcessAllSessions_SortingSessions<PgSQL_Session>();
template void Base_Thread::ProcessAllSessions_Partition<MySQL_Session>();
template void Base_Thread::ProcessAllSessions_Partition<PgSQL_Session>();
template void Base_Thread::ProcessAllMyDS_AfterPoll<MySQL_Thread>();
template void Base_Thread::ProcessAllMyDS_AfterPoll<PgSQL_Thread>();
template void Base_Thread::ProcessAllMyDS_BeforePoll<MySQL_Thread>();
@ -230,34 +230,58 @@ void Base_Thread::check_for_invalid_fd(unsigned int n) {
}
}
// this function was inline in MySQL_Thread::process_all_sessions()
/**
* @brief Sort all sessions based on maximum connection time.
*
* This function iterates through all MySQL sessions and sorts them based on their maximum connection time.
* Sessions with a valid maximum connection time are compared, and if one session has a greater maximum connection
* time than another, their positions in the session list are swapped. The sorting is performed in-place.
*
* @note This function assumes that MySQL sessions and their associated data structures have been initialized
* and are accessible within the MySQL Thread.
* @brief Partition all sessions into three contiguous blocks
*
* Layout produced in mysql_sessions->pdata:
* [0, running_end) block A: running a query against the backend
* [running_end, idle_begin) block B: acquiring a backend (max_connect_time != 0)
* [idle_begin, len) block C: idle (no backend, or holds-conn-but-WAITING_CLIENT_DATA)
*
* Block A is "session is currently driving the backend" these sessions have a chance
* of releasing the connection at end-of-query (depending on multiplexing eligibility,
* transaction state, etc.), giving block B sessions a fairness chance to acquire it.
* Sessions parked in WAITING_CLIENT_DATA (e.g., idle in a transaction after BEGIN) hold
* the conn but cannot release it until the client sends the next packet, so they live in C.
*
* Classification tests max_connect_time first: it must win over A even when myconn != NULL,
* to catch CHANGING_USER_SERVER on pooled connections and the post-error retry path where
* the old conn hasn't been destroyed yet.
*
* Single O(n) pass, in place. idx walks up, idle_begin walks down, they meet and terminate.
*/
template<typename S>
void Base_Thread::ProcessAllSessions_SortingSessions() {
unsigned int a=0;
for (unsigned int n=0; n<mysql_sessions->len; n++) {
S *sess=(S *)mysql_sessions->index(n);
if (sess->mybe && sess->mybe->server_myds) {
if (sess->mybe->server_myds->max_connect_time) {
S *sess2=(S *)mysql_sessions->index(a);
if (sess2->mybe && sess2->mybe->server_myds && sess2->mybe->server_myds->max_connect_time && sess2->mybe->server_myds->max_connect_time <= sess->mybe->server_myds->max_connect_time) {
// do nothing
} else {
void *p=mysql_sessions->pdata[a];
mysql_sessions->pdata[a]=mysql_sessions->pdata[n];
mysql_sessions->pdata[n]=p;
a++;
}
void Base_Thread::ProcessAllSessions_Partition() {
size_t running_end = 0;
size_t idle_begin = mysql_sessions->len;
size_t idx = 0;
while (idx < idle_begin) {
S* s = static_cast<S*>(mysql_sessions->index(idx));
const bool has_be = (s->mybe && s->mybe->server_myds);
const bool is_B = has_be && (s->mybe->server_myds->max_connect_time != 0);
const bool is_A = !is_B && has_be && (s->mybe->server_myds->myconn != nullptr) && (s->status != WAITING_CLIENT_DATA);
if (is_A) {
if (idx != running_end) {
void* p = mysql_sessions->pdata[idx];
mysql_sessions->pdata[idx] = mysql_sessions->pdata[running_end];
mysql_sessions->pdata[running_end] = p;
}
++running_end;
++idx;
} else if (is_B) {
++idx;
} else {
--idle_begin;
if (idx != idle_begin) {
void* p = mysql_sessions->pdata[idx];
mysql_sessions->pdata[idx] = mysql_sessions->pdata[idle_begin];
mysql_sessions->pdata[idle_begin] = p;
}
// do NOT advance idx - re-examine the swapped-in element test
}
}
}

@ -4453,7 +4453,7 @@ void MySQL_Thread::process_all_sessions() {
}
#endif // IDLE_THREADS
if (sess_sort && mysql_sessions->len > 3) {
ProcessAllSessions_SortingSessions<MySQL_Session>();
ProcessAllSessions_Partition<MySQL_Session>();
}
for (n=0; n<mysql_sessions->len; n++) {
MySQL_Session *sess=(MySQL_Session *)mysql_sessions->index(n);

@ -3833,7 +3833,7 @@ void PgSQL_Thread::process_all_sessions() {
}
#endif // IDLE_THREADS
if (sess_sort && mysql_sessions->len > 3) {
ProcessAllSessions_SortingSessions<PgSQL_Session>();
ProcessAllSessions_Partition<PgSQL_Session>();
}
for (n = 0; n < mysql_sessions->len; n++) {
PgSQL_Session* sess = (PgSQL_Session*)mysql_sessions->index(n);

Loading…
Cancel
Save