Merge pull request #5819 from sysown/v3.0_partition-gate

perf: session-partition gate (supersedes #5776 and #5799)
feature/ci-codecov-tap-legacy-g2
René Cannaò 1 month ago committed by GitHub
commit a1ca6a0ca3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -132,7 +132,7 @@ export SOURCE_DATE_EPOCH
### rebuild SQLite with -USQLITE_ENABLE_MEMORY_MANAGEMENT in deps/Makefile
O0 := -O0
O2 := -O2
O2 := -O2 -fno-omit-frame-pointer
O1 := -O1
O3 := -O3 -mtune=native

@ -47,7 +47,39 @@ public:
private:
bool maintenance_loop;
public:
// Partition-gate state. note_pool_attempt() bumps the counters from the
// get_MyConn_from_pool() call site; update_partition_gate() consumes them
// once per outer process_all_sessions iteration. Single-threaded per worker.
unsigned int partition_pool_attempts = 0;
unsigned int partition_pool_nulls = 0;
unsigned int partition_streak = 0;
bool partition_active = false;
public:
// Gate thresholds: NULL-ratio (NUM/DEN) classifies a tick as "stressed";
// STREAK is the number of consecutive disagreeing ticks required to flip
// the gate state (hysteresis filter against transient bursts).
static constexpr unsigned int PARTITION_GATE_NULL_RATIO_NUM = 1;
static constexpr unsigned int PARTITION_GATE_NULL_RATIO_DEN = 20; // 5%
static constexpr unsigned int PARTITION_GATE_STREAK = 3;
// Below this attempt count a tick carries no signal: gate state and
// streak are left untouched. Avoids "2/2 NULL = 100% stressed" noise.
static constexpr unsigned int PARTITION_GATE_MIN_ATTEMPTS = 4;
// Called by sessions inside this worker at the get_MyConn_from_pool()
// call site to feed the gate.
inline void note_pool_attempt(bool was_null) {
	// Every call counts as one pool attempt for the current tick.
	partition_pool_attempts += 1;
	// A NULL result additionally counts towards the per-tick NULL tally.
	partition_pool_nulls += (was_null ? 1u : 0u);
}
// Runs the hysteresis state machine on the per-tick counters, resets them,
// and returns whether the partition pass should run. MUST be called on every
// outer iteration of process_all_sessions (even when the partition path is
// otherwise skipped) so that stale counts do not carry over into later ticks.
bool update_partition_gate();
unsigned long long curtime;
unsigned long long last_move_to_idle_thread_time;
bool epoll_thread;
@ -65,7 +97,7 @@ private:
template<typename T>
void check_for_invalid_fd(unsigned int n);
template<typename S>
void ProcessAllSessions_SortingSessions();
void ProcessAllSessions_Partition();
template<typename T>
void ProcessAllMyDS_AfterPoll();
template<typename T>

@ -114,6 +114,7 @@ class __attribute__((aligned(64))) MySQL_Thread : public Base_Thread
bool retrieve_gtids_required; // if any of the servers has gtid_port enabled, this needs to be turned on too
PtrArray *cached_connections;
unsigned int push_local_counter; // round-robin counter for bounded local caching: cache 1-in-N where N = mysql_threads
#ifdef IDLE_THREADS
struct epoll_event events[MY_EPOLL_THREAD_MAXEVENTS];

@ -160,6 +160,7 @@ private:
//bool maintenance_loop;
PtrArray* cached_connections;
unsigned int push_local_counter; // round-robin counter for bounded local caching: cache 1-in-N where N = pgsql_threads
#ifdef IDLE_THREADS
struct epoll_event events[MY_EPOLL_THREAD_MAXEVENTS];

@ -11,8 +11,8 @@
// Explicitly instantiate the required template class and member functions
template MySQL_Session* Base_Thread::create_new_session_and_client_data_stream<MySQL_Thread, MySQL_Session*>(int);
template PgSQL_Session* Base_Thread::create_new_session_and_client_data_stream<PgSQL_Thread, PgSQL_Session*>(int);
template void Base_Thread::ProcessAllSessions_SortingSessions<MySQL_Session>();
template void Base_Thread::ProcessAllSessions_SortingSessions<PgSQL_Session>();
template void Base_Thread::ProcessAllSessions_Partition<MySQL_Session>();
template void Base_Thread::ProcessAllSessions_Partition<PgSQL_Session>();
template void Base_Thread::ProcessAllMyDS_AfterPoll<MySQL_Thread>();
template void Base_Thread::ProcessAllMyDS_AfterPoll<PgSQL_Thread>();
template void Base_Thread::ProcessAllMyDS_BeforePoll<MySQL_Thread>();
@ -34,6 +34,29 @@ Base_Thread::Base_Thread() :
Base_Thread::~Base_Thread() {
};
/**
 * @brief Consume the per-tick pool counters and advance the partition gate.
 *
 * Takes a snapshot of partition_pool_attempts / partition_pool_nulls, clears
 * both, and then:
 *  - if the tick saw fewer than PARTITION_GATE_MIN_ATTEMPTS attempts, leaves
 *    partition_active and partition_streak exactly as they were;
 *  - otherwise classifies the tick as "stressed" when
 *    nulls / attempts >= PARTITION_GATE_NULL_RATIO_NUM / PARTITION_GATE_NULL_RATIO_DEN
 *    (evaluated by cross-multiplication, no division);
 *  - a tick that agrees with the current gate state clears the streak, a tick
 *    that disagrees extends it, and once the streak reaches
 *    PARTITION_GATE_STREAK the gate flips to the tick's classification and
 *    the streak restarts from zero.
 *
 * @return the value of partition_active after this tick has been processed.
 */
bool Base_Thread::update_partition_gate() {
	// Widen to 64 bits before multiplying so the products cannot wrap around
	// the range of unsigned int.
	const uint64_t tick_attempts = partition_pool_attempts;
	const uint64_t tick_nulls = partition_pool_nulls;
	partition_pool_nulls = 0;
	partition_pool_attempts = 0;

	if (tick_attempts >= PARTITION_GATE_MIN_ATTEMPTS) {
		const uint64_t scaled_nulls = tick_nulls * PARTITION_GATE_NULL_RATIO_DEN;
		const uint64_t scaled_attempts = tick_attempts * PARTITION_GATE_NULL_RATIO_NUM;
		const bool tick_stressed = (scaled_nulls >= scaled_attempts);

		if (tick_stressed != partition_active) {
			// Disagreeing tick: extend the streak, flip once it is long enough.
			partition_streak += 1;
			if (partition_streak >= PARTITION_GATE_STREAK) {
				partition_active = tick_stressed;
				partition_streak = 0;
			}
		} else {
			// Agreeing tick: any pending flip is cancelled.
			partition_streak = 0;
		}
	}
	// Low-volume ticks fall through with gate and streak untouched.
	return partition_active;
}
template<typename T, typename S>
void Base_Thread::register_session(T thr, S _sess, bool up_start) {
if (mysql_sessions==NULL) {
@ -230,34 +253,64 @@ void Base_Thread::check_for_invalid_fd(unsigned int n) {
}
}
// this function was inline in MySQL_Thread::process_all_sessions()
/**
* @brief Sort all sessions based on maximum connection time.
*
* This function iterates through all MySQL sessions and sorts them based on their maximum connection time.
* Sessions with a valid maximum connection time are compared, and if one session has a greater maximum connection
* time than another, their positions in the session list are swapped. The sorting is performed in-place.
*
* @note This function assumes that MySQL sessions and their associated data structures have been initialized
* and are accessible within the MySQL Thread.
* @brief Partition all sessions into three blocks by backend state.
*
* Block layout produced in mysql_sessions->pdata:
* [0, running_end) block A - running a query against the backend
* (myconn != NULL, mct == 0, status != WAITING_CLIENT_DATA)
* [running_end, idle_begin) block B - acquiring/awaiting a backend
* (mct != 0)
* [idle_begin, len) block C - idle, or holds-conn-but-WAITING_CLIENT_DATA
*
* Block A drives the backend and may release its conn at end-of-query, giving
* block B sessions a fairness chance to acquire it. Sessions parked in
* WAITING_CLIENT_DATA (idle in a transaction after BEGIN) hold the conn but
* cannot release it until the client sends the next packet, so they live in C.
*
* Classification tests max_connect_time first: it must win over A even when
* myconn != NULL, to catch CHANGING_USER_SERVER on pooled connections and the
* post-error retry path where the old conn hasn't been destroyed yet.
*
* Single O(n) pass, in place. idx walks up, idle_begin walks down, they meet
* and terminate. A previous Lomuto-style sort of the B band by max_connect_time
* was removed: measurement showed it hurt throughput by ~12% at 500 clients /
* 50-conn pool under SSL, without a corresponding tail-latency benefit. If
* reintroduced, it should be gated on an explicit starvation-age signal rather
* than run unconditionally on every iteration.
*/
template<typename S>
void Base_Thread::ProcessAllSessions_SortingSessions() {
unsigned int a=0;
for (unsigned int n=0; n<mysql_sessions->len; n++) {
S *sess=(S *)mysql_sessions->index(n);
if (sess->mybe && sess->mybe->server_myds) {
if (sess->mybe->server_myds->max_connect_time) {
S *sess2=(S *)mysql_sessions->index(a);
if (sess2->mybe && sess2->mybe->server_myds && sess2->mybe->server_myds->max_connect_time && sess2->mybe->server_myds->max_connect_time <= sess->mybe->server_myds->max_connect_time) {
// do nothing
} else {
void *p=mysql_sessions->pdata[a];
mysql_sessions->pdata[a]=mysql_sessions->pdata[n];
mysql_sessions->pdata[n]=p;
a++;
}
void Base_Thread::ProcessAllSessions_Partition() {
size_t running_end = 0;
size_t idle_begin = mysql_sessions->len;
size_t idx = 0;
while (idx < idle_begin) {
S* s = static_cast<S*>(mysql_sessions->index(idx));
const bool has_be = (s->mybe && s->mybe->server_myds);
const bool is_B = has_be && (s->mybe->server_myds->max_connect_time != 0);
const bool is_A = !is_B && has_be && (s->mybe->server_myds->myconn != nullptr) && (s->status != WAITING_CLIENT_DATA);
if (is_A) {
if (idx != running_end) {
void* p = mysql_sessions->pdata[idx];
mysql_sessions->pdata[idx] = mysql_sessions->pdata[running_end];
mysql_sessions->pdata[running_end] = p;
}
++running_end;
++idx;
} else if (is_B) {
++idx;
} else {
--idle_begin;
if (idx != idle_begin) {
void* p = mysql_sessions->pdata[idx];
mysql_sessions->pdata[idx] = mysql_sessions->pdata[idle_begin];
mysql_sessions->pdata[idle_begin] = p;
}
// do NOT advance idx - re-examine the swapped-in element test
}
}
}

@ -7857,6 +7857,7 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED
} else {
mc=MyHGM->get_MyConn_from_pool(mybe->hostgroup_id, this, (session_fast_forward || qpo->create_new_conn), NULL, 0, (int)qpo->max_lag_ms);
}
thread->note_pool_attempt(mc == NULL);
#ifdef STRESSTEST_POOL
if (mc && (loops < NUM_SLOW_LOOPS - 1)) {
if (mc->mysql) {

@ -4452,8 +4452,9 @@ void MySQL_Thread::process_all_sessions() {
sess_sort=false;
}
#endif // IDLE_THREADS
if (sess_sort && mysql_sessions->len > 3) {
ProcessAllSessions_SortingSessions<MySQL_Session>();
const bool partition_wanted = update_partition_gate();
if (sess_sort && mysql_sessions->len > 3 && partition_wanted) {
ProcessAllSessions_Partition<MySQL_Session>();
}
for (n=0; n<mysql_sessions->len; n++) {
MySQL_Session *sess=(MySQL_Session *)mysql_sessions->index(n);
@ -4784,6 +4785,7 @@ MySQL_Thread::MySQL_Thread() {
pthread_mutex_init(&thread_mutex,NULL);
my_idle_conns=NULL;
cached_connections=NULL;
push_local_counter=0;
mysql_sessions=NULL;
mirror_queue_mysql_sessions=NULL;
mirror_queue_mysql_sessions_cache=NULL;
@ -6466,14 +6468,22 @@ MySQL_Connection * MySQL_Thread::get_MyConn_local(unsigned int _hid, MySQL_Sessi
* @param c Pointer to the MySQL_Connection object to be pushed to the local connection pool.
*/
void MySQL_Thread::push_MyConn_local(MySQL_Connection *c) {
MySrvC *mysrvc=NULL;
mysrvc=(MySrvC *)c->parent;
// Bounded local cache: cache 1-in-N releases (N = mysql_threads), push the
// rest to the shared HGM pool so peer workers can pick them up.
// At N=1 always cache (no sibling to share with).
// Rationale: avoids the connection-hoarding behavior that starved sibling
// workers at high client count, while preserving most of the lock-amortization
// benefit at lower client counts.
MySrvC *mysrvc=(MySrvC *)c->parent;
// reset insert_id #1093
c->mysql->insert_id = 0;
if (mysrvc->get_status() == MYSQL_SERVER_STATUS_ONLINE) {
if (c->async_state_machine==ASYNC_IDLE) {
cached_connections->add(c);
return; // all went well
unsigned int n = (GloMTH && GloMTH->num_threads > 0) ? GloMTH->num_threads : 1;
if ((push_local_counter++ % n) == 0) {
cached_connections->add(c);
return;
}
}
}
MyHGM->push_MyConn_to_pool(c);

@ -108,8 +108,13 @@ static void __dump_pkt(const char* func, unsigned char* _ptr, unsigned int len)
static enum pgsql_sslstatus get_sslstatus(SSL* ssl, int n)
{
// See issue #5792.
// SSL_get_error() classifies based on the return code + SSL_want() state, not on the
// thread-local OpenSSL error queue. For SSL_ERROR_NONE / WANT_READ / WANT_WRITE no error is
// pushed onto the queue, so there is nothing to clear. Only the actual error classifications
// (ZERO_RETURN / SYSCALL / SSL / default) need the queue drained so the next SSL op on this
// thread starts clean.
int err = SSL_get_error(ssl, n);
ERR_clear_error();
switch (err) {
case SSL_ERROR_NONE:
return PGSQL_SSLSTATUS_OK;
@ -119,6 +124,9 @@ static enum pgsql_sslstatus get_sslstatus(SSL* ssl, int n)
case SSL_ERROR_ZERO_RETURN:
case SSL_ERROR_SYSCALL:
default:
// drain the queue; any consumer that wanted the details should have read them
// before returning to this point
while (ERR_get_error()) { /* discard */ }
return PGSQL_SSLSTATUS_FAIL;
}
}

@ -5420,6 +5420,7 @@ void PgSQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED
else {
mc = PgHGM->get_MyConn_from_pool(mybe->hostgroup_id, this, (session_fast_forward || qpo->create_new_conn), NULL, 0, (int)qpo->max_lag_ms);
}
thread->note_pool_attempt(mc == NULL);
#ifdef STRESSTEST_POOL
if (mc && (loops < NUM_SLOW_LOOPS - 1)) {
if (mc->pgsql) {

@ -3832,8 +3832,9 @@ void PgSQL_Thread::process_all_sessions() {
sess_sort = false;
}
#endif // IDLE_THREADS
if (sess_sort && mysql_sessions->len > 3) {
ProcessAllSessions_SortingSessions<PgSQL_Session>();
const bool partition_wanted = update_partition_gate();
if (sess_sort && mysql_sessions->len > 3 && partition_wanted) {
ProcessAllSessions_Partition<PgSQL_Session>();
}
for (n = 0; n < mysql_sessions->len; n++) {
PgSQL_Session* sess = (PgSQL_Session*)mysql_sessions->index(n);
@ -4209,6 +4210,7 @@ PgSQL_Thread::PgSQL_Thread() {
pthread_mutex_init(&thread_mutex, NULL);
my_idle_conns = NULL;
cached_connections = NULL;
push_local_counter = 0;
mysql_sessions = NULL;
mirror_queue_mysql_sessions = NULL;
mirror_queue_mysql_sessions_cache = NULL;
@ -5840,14 +5842,20 @@ PgSQL_Connection* PgSQL_Thread::get_MyConn_local(unsigned int _hid, PgSQL_Sessio
}
void PgSQL_Thread::push_MyConn_local(PgSQL_Connection * c) {
PgSQL_SrvC* mysrvc = NULL;
mysrvc = (PgSQL_SrvC*)c->parent;
// reset insert_id #1093
//c->pgsql->insert_id = 0;
// Bounded local cache: cache 1-in-N releases (N = pgsql_threads), push the
// rest to the shared HGM pool so peer workers can pick them up.
// At N=1 always cache (no sibling to share with).
// Rationale: avoids the connection-hoarding behavior that starved sibling
// workers at high client count, while preserving most of the lock-amortization
// benefit at lower client counts.
PgSQL_SrvC* mysrvc = (PgSQL_SrvC*)c->parent;
if (mysrvc->status == MYSQL_SERVER_STATUS_ONLINE) {
if (c->async_state_machine == ASYNC_IDLE) {
cached_connections->add(c);
return; // all went well
unsigned int n = (GloPTH && GloPTH->num_threads > 0) ? GloPTH->num_threads : 1;
if ((push_local_counter++ % n) == 0) {
cached_connections->add(c);
return;
}
}
}
PgHGM->push_MyConn_to_pool(c);

@ -174,8 +174,13 @@ static void __dump_pkt(const char *func, unsigned char *_ptr, unsigned int len)
static enum sslstatus get_sslstatus(SSL* ssl, int n)
{
// See issue #5792.
// SSL_get_error() classifies based on the return code + SSL_want() state, not on the
// thread-local OpenSSL error queue. For SSL_ERROR_NONE / WANT_READ / WANT_WRITE no error is
// pushed onto the queue, so there is nothing to clear. Only the actual error classifications
// (ZERO_RETURN / SYSCALL / SSL / default) need the queue drained so the next SSL op on this
// thread starts clean.
int err = SSL_get_error(ssl, n);
ERR_clear_error();
switch (err) {
case SSL_ERROR_NONE:
return SSLSTATUS_OK;
@ -185,6 +190,9 @@ static enum sslstatus get_sslstatus(SSL* ssl, int n)
case SSL_ERROR_ZERO_RETURN:
case SSL_ERROR_SYSCALL:
default:
// drain the queue; any consumer that wanted the details should have read them
// before returning to this point
while (ERR_get_error()) { /* discard */ }
return SSLSTATUS_FAIL;
}
}

@ -277,8 +277,8 @@
"reg_test_mariadb_stmt_store_result_async-t" : [ "legacy-g7","mysql-auto_increment_delay_multiplex=0-g2","mysql-multiplexing=false-g2","mysql-query_digests=0-g2","mysql-query_digests_keep_comment=1-g2","mysql84-g2","mysql90-g2","mysql95-g2" ],
"reg_test_mariadb_stmt_store_result_libmysql-t" : [ "legacy-g7","mysql-auto_increment_delay_multiplex=0-g2","mysql-multiplexing=false-g2","mysql-query_digests=0-g2","mysql-query_digests_keep_comment=1-g2","mysql84-g2","mysql90-g2","mysql95-g2" ],
"reg_test_oversize_first_pkt-t" : [ "legacy-g7","mysql-auto_increment_delay_multiplex=0-g2","mysql-multiplexing=false-g2","mysql-query_digests=0-g2","mysql-query_digests_keep_comment=1-g2","mysql84-g7","mysql90-g2","mysql95-g2" ],
"reg_test_proclist_use_after_free-t" : [ "legacy-g6","mysql-auto_increment_delay_multiplex=0-g1","mysql-multiplexing=false-g1","mysql-query_digests=0-g1","mysql-query_digests_keep_comment=1-g1","mysql84-g6","mysql90-g1","mysql95-g1" ],
"reg_test_pp1_unknown_spoof-t" : [ "legacy-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3","mysql84-g3","mysql90-g3","mysql95-g3" ],
"reg_test_proclist_use_after_free-t" : [ "legacy-g6","mysql-auto_increment_delay_multiplex=0-g1","mysql-multiplexing=false-g1","mysql-query_digests=0-g1","mysql-query_digests_keep_comment=1-g1","mysql84-g6","mysql90-g1","mysql95-g1" ],
"reg_test_proxy_protocol_oversized_address-t" : [ "mysql84-g6","mysql95-g1" ],
"reg_test_sql_calc_found_rows-t" : [ "legacy-g7","mysql-auto_increment_delay_multiplex=0-g2","mysql-multiplexing=false-g2","mysql-query_digests=0-g2","mysql-query_digests_keep_comment=1-g2","mysql84-g2","mysql90-g2","mysql95-g2" ],
"reg_test_stmt_close_short_packet-t" : [ "mysql84-g6","mysql95-g1" ],

Loading…
Cancel
Save