diff --git a/Makefile b/Makefile index d24e1ae66..06f5fd4ef 100644 --- a/Makefile +++ b/Makefile @@ -132,7 +132,7 @@ export SOURCE_DATE_EPOCH ### rebuild SQLite with -USQLITE_ENABLE_MEMORY_MANAGEMENT in deps/Makefile O0 := -O0 -O2 := -O2 +O2 := -O2 -fno-omit-frame-pointer O1 := -O1 O3 := -O3 -mtune=native diff --git a/include/Base_Thread.h b/include/Base_Thread.h index 485fec913..255b19b18 100644 --- a/include/Base_Thread.h +++ b/include/Base_Thread.h @@ -47,7 +47,39 @@ public: private: bool maintenance_loop; - public: + + // Partition-gate state. note_pool_attempt() bumps the counters from the + // get_MyConn_from_pool() call site; update_partition_gate() consumes them + // once per outer process_all_sessions iteration. Single-threaded per worker. + unsigned int partition_pool_attempts = 0; + unsigned int partition_pool_nulls = 0; + unsigned int partition_streak = 0; + bool partition_active = false; + +public: + // Gate thresholds: NULL-ratio (NUM/DEN) classifies a tick as "stressed"; + // STREAK is the number of consecutive disagreeing ticks required to flip + // the gate state (hysteresis filter against transient bursts). + static constexpr unsigned int PARTITION_GATE_NULL_RATIO_NUM = 1; + static constexpr unsigned int PARTITION_GATE_NULL_RATIO_DEN = 20; // 5% + static constexpr unsigned int PARTITION_GATE_STREAK = 3; + // Below this attempt count a tick carries no signal: gate state and + // streak are left untouched. Avoids "2/2 NULL = 100% stressed" noise. + static constexpr unsigned int PARTITION_GATE_MIN_ATTEMPTS = 4; + + // Called by sessions inside this worker at the get_MyConn_from_pool() + // call site to feed the gate. + inline void note_pool_attempt(bool was_null) { + ++partition_pool_attempts; + if (was_null) ++partition_pool_nulls; + } + + // Runs the hysteresis state machine from per-tick counters, resets them, + // and returns whether the partition pass should run. MUST be called every + // outer iteration of process_all_sessions (even when the partition path + // is otherwise skipped) so the counters don't accumulate stale. + bool update_partition_gate(); + unsigned long long curtime; unsigned long long last_move_to_idle_thread_time; bool epoll_thread; @@ -65,7 +97,7 @@ private: template void check_for_invalid_fd(unsigned int n); template - void ProcessAllSessions_SortingSessions(); + void ProcessAllSessions_Partition(); template void ProcessAllMyDS_AfterPoll(); template diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 5b6b1a723..2a54cb61d 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -114,6 +114,7 @@ class __attribute__((aligned(64))) MySQL_Thread : public Base_Thread bool retrieve_gtids_required; // if any of the servers has gtid_port enabled, this needs to be turned on too PtrArray *cached_connections; + unsigned int push_local_counter; // round-robin counter for bounded local caching: cache 1-in-N where N = mysql_threads #ifdef IDLE_THREADS struct epoll_event events[MY_EPOLL_THREAD_MAXEVENTS]; diff --git a/include/PgSQL_Thread.h b/include/PgSQL_Thread.h index 56ce0dbe2..21b94621a 100644 --- a/include/PgSQL_Thread.h +++ b/include/PgSQL_Thread.h @@ -160,6 +160,7 @@ private: //bool maintenance_loop; PtrArray* cached_connections; + unsigned int push_local_counter; // round-robin counter for bounded local caching: cache 1-in-N where N = pgsql_threads #ifdef IDLE_THREADS struct epoll_event events[MY_EPOLL_THREAD_MAXEVENTS]; diff --git a/lib/Base_Thread.cpp b/lib/Base_Thread.cpp index 263ac1004..e5820eafc 100644 --- a/lib/Base_Thread.cpp +++ b/lib/Base_Thread.cpp @@ -11,8 +11,8 @@ // Explicitly instantiate the required template class and member functions template MySQL_Session* Base_Thread::create_new_session_and_client_data_stream(int); template PgSQL_Session* Base_Thread::create_new_session_and_client_data_stream(int); -template void Base_Thread::ProcessAllSessions_SortingSessions(); -template void Base_Thread::ProcessAllSessions_SortingSessions(); +template void Base_Thread::ProcessAllSessions_Partition(); +template void Base_Thread::ProcessAllSessions_Partition(); template void Base_Thread::ProcessAllMyDS_AfterPoll(); template void Base_Thread::ProcessAllMyDS_AfterPoll(); template void Base_Thread::ProcessAllMyDS_BeforePoll(); @@ -34,6 +34,29 @@ Base_Thread::Base_Thread() : Base_Thread::~Base_Thread() { }; +bool Base_Thread::update_partition_gate() { + // 64-bit so the multiplications below cannot overflow unsigned int. + const uint64_t attempts = partition_pool_attempts; + const uint64_t nulls = partition_pool_nulls; + partition_pool_attempts = 0; + partition_pool_nulls = 0; + + // Low-volume ticks carry no signal; leave gate and streak unchanged. + if (attempts < PARTITION_GATE_MIN_ATTEMPTS) { + return partition_active; + } + + const bool stressed = (nulls * PARTITION_GATE_NULL_RATIO_DEN + >= attempts * PARTITION_GATE_NULL_RATIO_NUM); + if (stressed == partition_active) { + partition_streak = 0; + } else if (++partition_streak >= PARTITION_GATE_STREAK) { + partition_active = stressed; + partition_streak = 0; + } + return partition_active; +} + template void Base_Thread::register_session(T thr, S _sess, bool up_start) { if (mysql_sessions==NULL) { @@ -230,34 +253,64 @@ void Base_Thread::check_for_invalid_fd(unsigned int n) { } } -// this function was inline in MySQL_Thread::process_all_sessions() + /** - * @brief Sort all sessions based on maximum connection time. - * - * This function iterates through all MySQL sessions and sorts them based on their maximum connection time. - * Sessions with a valid maximum connection time are compared, and if one session has a greater maximum connection - * time than another, their positions in the session list are swapped. The sorting is performed in-place. - * - * @note This function assumes that MySQL sessions and their associated data structures have been initialized - * and are accessible within the MySQL Thread. + * @brief Partition all sessions into three blocks by backend state. + * + * Block layout produced in mysql_sessions->pdata: + * [0, running_end) block A - running a query against the backend + * (myconn != NULL, mct == 0, status != WAITING_CLIENT_DATA) + * [running_end, idle_begin) block B - acquiring/awaiting a backend + * (mct != 0) + * [idle_begin, len) block C - idle, or holds-conn-but-WAITING_CLIENT_DATA + * + * Block A drives the backend and may release its conn at end-of-query, giving + * block B sessions a fairness chance to acquire it. Sessions parked in + * WAITING_CLIENT_DATA (idle in a transaction after BEGIN) hold the conn but + * cannot release it until the client sends the next packet, so they live in C. + * + * Classification tests max_connect_time first: it must win over A even when + * myconn != NULL, to catch CHANGING_USER_SERVER on pooled connections and the + * post-error retry path where the old conn hasn't been destroyed yet. + * + * Single O(n) pass, in place. idx walks up, idle_begin walks down, they meet + * and terminate. A previous Lomuto-style sort of the B band by max_connect_time + * was removed: measurement showed it hurt throughput by ~12% at 500 clients / + * 50-conn pool under SSL, without a corresponding tail-latency benefit. If + * reintroduced, it should be gated on an explicit starvation-age signal rather + * than run unconditionally on every iteration. */ template -void Base_Thread::ProcessAllSessions_SortingSessions() { - unsigned int a=0; - for (unsigned int n=0; nlen; n++) { - S *sess=(S *)mysql_sessions->index(n); - if (sess->mybe && sess->mybe->server_myds) { - if (sess->mybe->server_myds->max_connect_time) { - S *sess2=(S *)mysql_sessions->index(a); - if (sess2->mybe && sess2->mybe->server_myds && sess2->mybe->server_myds->max_connect_time && sess2->mybe->server_myds->max_connect_time <= sess->mybe->server_myds->max_connect_time) { - // do nothing - } else { - void *p=mysql_sessions->pdata[a]; - mysql_sessions->pdata[a]=mysql_sessions->pdata[n]; - mysql_sessions->pdata[n]=p; - a++; - } +void Base_Thread::ProcessAllSessions_Partition() { + size_t running_end = 0; + size_t idle_begin = mysql_sessions->len; + size_t idx = 0; + + while (idx < idle_begin) { + S* s = static_cast(mysql_sessions->index(idx)); + + const bool has_be = (s->mybe && s->mybe->server_myds); + const bool is_B = has_be && (s->mybe->server_myds->max_connect_time != 0); + const bool is_A = !is_B && has_be && (s->mybe->server_myds->myconn != nullptr) && (s->status != WAITING_CLIENT_DATA); + + if (is_A) { + if (idx != running_end) { + void* p = mysql_sessions->pdata[idx]; + mysql_sessions->pdata[idx] = mysql_sessions->pdata[running_end]; + mysql_sessions->pdata[running_end] = p; + } + ++running_end; + ++idx; + } else if (is_B) { + ++idx; + } else { + --idle_begin; + if (idx != idle_begin) { + void* p = mysql_sessions->pdata[idx]; + mysql_sessions->pdata[idx] = mysql_sessions->pdata[idle_begin]; + mysql_sessions->pdata[idle_begin] = p; } + // do NOT advance idx - re-examine the swapped-in element test } } } diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 892e8b2d7..131ad27c9 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -7857,6 +7857,7 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED } else { mc=MyHGM->get_MyConn_from_pool(mybe->hostgroup_id, this, (session_fast_forward || qpo->create_new_conn), NULL, 0, (int)qpo->max_lag_ms); } + thread->note_pool_attempt(mc == NULL); #ifdef STRESSTEST_POOL if (mc && (loops < NUM_SLOW_LOOPS - 1)) { if (mc->mysql) { diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index dac2df051..f39e2000d 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -4452,8 +4452,9 @@ void MySQL_Thread::process_all_sessions() { sess_sort=false; } #endif // IDLE_THREADS - if (sess_sort && mysql_sessions->len > 3) { - ProcessAllSessions_SortingSessions(); + const bool partition_wanted = update_partition_gate(); + if (sess_sort && mysql_sessions->len > 3 && partition_wanted) { + ProcessAllSessions_Partition(); } for (n=0; nlen; n++) { MySQL_Session *sess=(MySQL_Session *)mysql_sessions->index(n); @@ -4784,6 +4785,7 @@ MySQL_Thread::MySQL_Thread() { pthread_mutex_init(&thread_mutex,NULL); my_idle_conns=NULL; cached_connections=NULL; + push_local_counter=0; mysql_sessions=NULL; mirror_queue_mysql_sessions=NULL; mirror_queue_mysql_sessions_cache=NULL; @@ -6466,14 +6468,22 @@ MySQL_Connection * MySQL_Thread::get_MyConn_local(unsigned int _hid, MySQL_Sessi * @param c Pointer to the MySQL_Connection object to be pushed to the local connection pool. */ void MySQL_Thread::push_MyConn_local(MySQL_Connection *c) { - MySrvC *mysrvc=NULL; - mysrvc=(MySrvC *)c->parent; + // Bounded local cache: cache 1-in-N releases (N = mysql_threads), push the + // rest to the shared HGM pool so peer workers can pick them up. + // At N=1 always cache (no sibling to share with). + // Rationale: avoids the connection-hoarding behavior that starved sibling + // workers at high client count, while preserving most of the lock-amortization + // benefit at lower client counts. + MySrvC *mysrvc=(MySrvC *)c->parent; // reset insert_id #1093 c->mysql->insert_id = 0; if (mysrvc->get_status() == MYSQL_SERVER_STATUS_ONLINE) { if (c->async_state_machine==ASYNC_IDLE) { - cached_connections->add(c); - return; // all went well + unsigned int n = (GloMTH && GloMTH->num_threads > 0) ? GloMTH->num_threads : 1; + if ((push_local_counter++ % n) == 0) { + cached_connections->add(c); + return; + } } } MyHGM->push_MyConn_to_pool(c); diff --git a/lib/PgSQL_Data_Stream.cpp b/lib/PgSQL_Data_Stream.cpp index de2435692..c96ba335d 100644 --- a/lib/PgSQL_Data_Stream.cpp +++ b/lib/PgSQL_Data_Stream.cpp @@ -108,8 +108,13 @@ static void __dump_pkt(const char* func, unsigned char* _ptr, unsigned int len) static enum pgsql_sslstatus get_sslstatus(SSL* ssl, int n) { + // See issue #5792. + // SSL_get_error() classifies based on the return code + SSL_want() state, not on the + // thread-local OpenSSL error queue. For SSL_ERROR_NONE / WANT_READ / WANT_WRITE no error is + // pushed onto the queue, so there is nothing to clear. Only the actual error classifications + // (ZERO_RETURN / SYSCALL / SSL / default) need the queue drained so the next SSL op on this + // thread starts clean. int err = SSL_get_error(ssl, n); - ERR_clear_error(); switch (err) { case SSL_ERROR_NONE: return PGSQL_SSLSTATUS_OK; @@ -119,6 +124,9 @@ static enum pgsql_sslstatus get_sslstatus(SSL* ssl, int n) case SSL_ERROR_ZERO_RETURN: case SSL_ERROR_SYSCALL: default: + // drain the queue; any consumer that wanted the details should have read them + // before returning to this point + while (ERR_get_error()) { /* discard */ } return PGSQL_SSLSTATUS_FAIL; } } diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index 9ca13a850..fa36c3c56 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -5420,6 +5420,7 @@ void PgSQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED else { mc = PgHGM->get_MyConn_from_pool(mybe->hostgroup_id, this, (session_fast_forward || qpo->create_new_conn), NULL, 0, (int)qpo->max_lag_ms); } + thread->note_pool_attempt(mc == NULL); #ifdef STRESSTEST_POOL if (mc && (loops < NUM_SLOW_LOOPS - 1)) { if (mc->pgsql) { diff --git a/lib/PgSQL_Thread.cpp b/lib/PgSQL_Thread.cpp index 6571d90dd..69ec9d493 100644 --- a/lib/PgSQL_Thread.cpp +++ b/lib/PgSQL_Thread.cpp @@ -3832,8 +3832,9 @@ void PgSQL_Thread::process_all_sessions() { sess_sort = false; } #endif // IDLE_THREADS - if (sess_sort && mysql_sessions->len > 3) { - ProcessAllSessions_SortingSessions(); + const bool partition_wanted = update_partition_gate(); + if (sess_sort && mysql_sessions->len > 3 && partition_wanted) { + ProcessAllSessions_Partition(); } for (n = 0; n < mysql_sessions->len; n++) { PgSQL_Session* sess = (PgSQL_Session*)mysql_sessions->index(n); @@ -4209,6 +4210,7 @@ PgSQL_Thread::PgSQL_Thread() { pthread_mutex_init(&thread_mutex, NULL); my_idle_conns = NULL; cached_connections = NULL; + push_local_counter = 0; mysql_sessions = NULL; mirror_queue_mysql_sessions = NULL; mirror_queue_mysql_sessions_cache = NULL; @@ -5840,14 +5842,20 @@ PgSQL_Connection* PgSQL_Thread::get_MyConn_local(unsigned int _hid, PgSQL_Sessio } void PgSQL_Thread::push_MyConn_local(PgSQL_Connection * c) { - PgSQL_SrvC* mysrvc = NULL; - mysrvc = (PgSQL_SrvC*)c->parent; - // reset insert_id #1093 - //c->pgsql->insert_id = 0; + // Bounded local cache: cache 1-in-N releases (N = pgsql_threads), push the + // rest to the shared HGM pool so peer workers can pick them up. + // At N=1 always cache (no sibling to share with). + // Rationale: avoids the connection-hoarding behavior that starved sibling + // workers at high client count, while preserving most of the lock-amortization + // benefit at lower client counts. + PgSQL_SrvC* mysrvc = (PgSQL_SrvC*)c->parent; if (mysrvc->status == MYSQL_SERVER_STATUS_ONLINE) { if (c->async_state_machine == ASYNC_IDLE) { - cached_connections->add(c); - return; // all went well + unsigned int n = (GloPTH && GloPTH->num_threads > 0) ? GloPTH->num_threads : 1; + if ((push_local_counter++ % n) == 0) { + cached_connections->add(c); + return; + } } } PgHGM->push_MyConn_to_pool(c); diff --git a/lib/mysql_data_stream.cpp b/lib/mysql_data_stream.cpp index e26fe47f6..451d63367 100644 --- a/lib/mysql_data_stream.cpp +++ b/lib/mysql_data_stream.cpp @@ -174,8 +174,13 @@ static void __dump_pkt(const char *func, unsigned char *_ptr, unsigned int len) static enum sslstatus get_sslstatus(SSL* ssl, int n) { + // See issue #5792. + // SSL_get_error() classifies based on the return code + SSL_want() state, not on the + // thread-local OpenSSL error queue. For SSL_ERROR_NONE / WANT_READ / WANT_WRITE no error is + // pushed onto the queue, so there is nothing to clear. Only the actual error classifications + // (ZERO_RETURN / SYSCALL / SSL / default) need the queue drained so the next SSL op on this + // thread starts clean. int err = SSL_get_error(ssl, n); - ERR_clear_error(); switch (err) { case SSL_ERROR_NONE: return SSLSTATUS_OK; @@ -185,6 +190,9 @@ static enum sslstatus get_sslstatus(SSL* ssl, int n) case SSL_ERROR_ZERO_RETURN: case SSL_ERROR_SYSCALL: default: + // drain the queue; any consumer that wanted the details should have read them + // before returning to this point + while (ERR_get_error()) { /* discard */ } return SSLSTATUS_FAIL; } } diff --git a/test/tap/groups/groups.json b/test/tap/groups/groups.json index 4aaf85e9d..99d41e652 100644 --- a/test/tap/groups/groups.json +++ b/test/tap/groups/groups.json @@ -277,8 +277,8 @@ "reg_test_mariadb_stmt_store_result_async-t" : [ "legacy-g7","mysql-auto_increment_delay_multiplex=0-g2","mysql-multiplexing=false-g2","mysql-query_digests=0-g2","mysql-query_digests_keep_comment=1-g2","mysql84-g2","mysql90-g2","mysql95-g2" ], "reg_test_mariadb_stmt_store_result_libmysql-t" : [ "legacy-g7","mysql-auto_increment_delay_multiplex=0-g2","mysql-multiplexing=false-g2","mysql-query_digests=0-g2","mysql-query_digests_keep_comment=1-g2","mysql84-g2","mysql90-g2","mysql95-g2" ], "reg_test_oversize_first_pkt-t" : [ "legacy-g7","mysql-auto_increment_delay_multiplex=0-g2","mysql-multiplexing=false-g2","mysql-query_digests=0-g2","mysql-query_digests_keep_comment=1-g2","mysql84-g7","mysql90-g2","mysql95-g2" ], - "reg_test_proclist_use_after_free-t" : [ "legacy-g6","mysql-auto_increment_delay_multiplex=0-g1","mysql-multiplexing=false-g1","mysql-query_digests=0-g1","mysql-query_digests_keep_comment=1-g1","mysql84-g6","mysql90-g1","mysql95-g1" ], "reg_test_pp1_unknown_spoof-t" : [ "legacy-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3","mysql84-g3","mysql90-g3","mysql95-g3" ], + "reg_test_proclist_use_after_free-t" : [ "legacy-g6","mysql-auto_increment_delay_multiplex=0-g1","mysql-multiplexing=false-g1","mysql-query_digests=0-g1","mysql-query_digests_keep_comment=1-g1","mysql84-g6","mysql90-g1","mysql95-g1" ], "reg_test_proxy_protocol_oversized_address-t" : [ "mysql84-g6","mysql95-g1" ], "reg_test_sql_calc_found_rows-t" : [ "legacy-g7","mysql-auto_increment_delay_multiplex=0-g2","mysql-multiplexing=false-g2","mysql-query_digests=0-g2","mysql-query_digests_keep_comment=1-g2","mysql84-g2","mysql90-g2","mysql95-g2" ], "reg_test_stmt_close_short_packet-t" : [ "mysql84-g6","mysql95-g1" ],