From 1c3c42959c18cd704dcc909fe69b3138fb0000ee Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Sun, 9 Nov 2025 02:39:13 +0500 Subject: [PATCH] Improve ping accuracy - Introduced batching for ping task dispatch (default: 30 servers per batch) - Moved mysql_server_ping_log writes to a thread pool --- include/MySQL_Monitor.hpp | 20 ++- lib/MySQL_Monitor.cpp | 300 +++++++++++++++++++++++++------------- 2 files changed, 211 insertions(+), 109 deletions(-) diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index 6ce9b83ff..ed403a34e 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -334,13 +334,19 @@ private: template class WorkItem { public: - T *data; - void *(*routine) (void *); - WorkItem(T*_data, void *(*start_routine) (void *)) { - data=_data; - routine=start_routine; - } - ~WorkItem() {} + std::vector data; + using entry_point = void *(*)(const std::vector& data); + entry_point start_routine; + WorkItem(T*_data, entry_point _start_routine) { + data.push_back(_data); + start_routine = _start_routine; + } + WorkItem(std::vector&& _data, entry_point _start_routine) + : data(std::move(_data)), start_routine(_start_routine) {} + WorkItem(const std::vector& _data, entry_point _start_routine) + : data(_data), start_routine(_start_routine) { + } + ~WorkItem() = default; }; struct p_mon_counter { diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index e324f4278..4cd418779 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -119,7 +119,7 @@ class ConsumerThread : public Thread { // available to process. for (int i = 0; (thrn ? i < thrn : 1); i++) { //VALGRIND_DISABLE_ERROR_REPORTING; - WorkItem* item = (WorkItem*)m_queue.remove(); + WorkItem* item = m_queue.remove(); //VALGRIND_ENABLE_ERROR_REPORTING; if (item == NULL) { if (thrn) { @@ -132,7 +132,7 @@ class ConsumerThread : public Thread { } - if (item->routine) { // NULL is allowed, do nothing for it + if (item->start_routine) { // NULL is allowed, do nothing for it bool me = true; if (check_monitor_enabled_flag) { @@ -142,10 +142,13 @@ class ConsumerThread : public Thread { } if (me) { - item->routine((void *)item->data); + item->start_routine(item->data); } } - delete item->data; + for (auto ptr : item->data) { + delete ptr; + } + item->data.clear(); delete item; } cleanup: @@ -1356,9 +1359,10 @@ void MySQL_Monitor::update_monitor_proxysql_servers(SQLite3_result* resultset) { pthread_mutex_unlock(&GloMyMon->proxysql_servers_mutex); } -void * monitor_connect_thread(void *arg) { +void * monitor_connect_thread(const std::vector& mmsds) { + assert(!mmsds.empty()); mysql_close(mysql_init(NULL)); - MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg; + MySQL_Monitor_State_Data *mmsd = mmsds.front(); if (!GloMTH) return NULL; // quick exit during shutdown/restart MySQL_Thread * mysql_thr = new MySQL_Thread(); mysql_thr->curtime=monotonic_time(); @@ -1417,9 +1421,10 @@ void * monitor_connect_thread(void *arg) { return NULL; } -void * monitor_ping_thread(void *arg) { +void * monitor_ping_thread(const std::vector& mmsds) { + assert(!mmsds.empty()); mysql_close(mysql_init(NULL)); - MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg; + MySQL_Monitor_State_Data *mmsd = mmsds.front(); if (!GloMTH) return NULL; // quick exit during shutdown/restart MySQL_Thread * mysql_thr = new MySQL_Thread(); mysql_thr->curtime=monotonic_time(); @@ -1654,10 +1659,11 @@ bool MySQL_Monitor_State_Data::create_new_connection() { return true; } -void * monitor_read_only_thread(void *arg) { +void * monitor_read_only_thread(const std::vector& data) { + assert(!data.empty()); mysql_close(mysql_init(NULL)); bool timeout_reached = false; - MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg; + MySQL_Monitor_State_Data *mmsd = data.front(); if (!GloMTH) return NULL; // quick exit during shutdown/restart MySQL_Thread * mysql_thr = new MySQL_Thread(); mysql_thr->curtime=monotonic_time(); @@ -1935,9 +1941,10 @@ __fast_exit_monitor_read_only_thread: return NULL; } -void * monitor_group_replication_thread(void *arg) { +void * monitor_group_replication_thread(const std::vector& data) { + assert(!data.empty()); mysql_close(mysql_init(NULL)); - MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg; + MySQL_Monitor_State_Data *mmsd = data.front(); MySQL_Thread * mysql_thr = new MySQL_Thread(); mysql_thr->curtime=monotonic_time(); mysql_thr->refresh_variables(); @@ -2286,9 +2293,10 @@ __fast_exit_monitor_group_replication_thread: return NULL; } -void * monitor_galera_thread(void *arg) { +void * monitor_galera_thread(const std::vector& data) { + assert(!data.empty()); mysql_close(mysql_init(NULL)); - MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg; + MySQL_Monitor_State_Data *mmsd = data.front(); MySQL_Thread * mysql_thr = new MySQL_Thread(); mysql_thr->curtime=monotonic_time(); mysql_thr->refresh_variables(); @@ -2686,9 +2694,10 @@ __fast_exit_monitor_galera_thread: return NULL; } -void * monitor_replication_lag_thread(void *arg) { +void * monitor_replication_lag_thread(const std::vector& data) { + assert(!data.empty()); mysql_close(mysql_init(NULL)); - MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg; + MySQL_Monitor_State_Data *mmsd = data.front(); if (!GloMTH) return NULL; // quick exit during shutdown/restart MySQL_Thread * mysql_thr = new MySQL_Thread(); mysql_thr->curtime=monotonic_time(); @@ -4756,9 +4765,9 @@ std::string debug_iplisttostring(const T& ips) { return sstr.str(); } -void* monitor_dns_resolver_thread(void* args) { - - DNS_Resolve_Data* dns_resolve_data = static_cast(args); +void* monitor_dns_resolver_thread(const std::vector& dns_resolve_data_list) { + assert(!dns_resolve_data_list.empty()); + DNS_Resolve_Data* dns_resolve_data = dns_resolve_data_list.front(); struct addrinfo hints, *res = NULL; @@ -5186,11 +5195,11 @@ void * MySQL_Monitor::run() { __monitor_run: while (queue->size()) { // this is a clean up in case Monitor was restarted - WorkItem* item = (WorkItem*)queue->remove(); + WorkItem* item = queue->remove(); if (item) { - if (item->data) { - delete item->data; - } + for (auto ptr : item->data) + delete ptr; + item->data.clear(); delete item; } } @@ -7034,9 +7043,10 @@ public: public: using process_ready_tasks_cb = bool (MySQL_Monitor::*)(const std::vector& mmsds); - Process_Ready_Task_Callback_Args(unsigned int min_tasks_to_process, float percentage, + Process_Ready_Task_Callback_Args(unsigned int min_tasks_to_process, unsigned int max_task_to_send, float percentage, process_ready_tasks_cb callback, MySQL_Monitor* mysql_monitor) : - min_task_to_process_(min_tasks_to_process), process_task_percentage_(percentage / 100.00), process_ready_tasks_cb_(callback), + min_task_to_process_(min_tasks_to_process), max_task_to_send_(max_task_to_send), + process_task_percentage_(percentage / 100.00), process_ready_tasks_cb_(callback), mysql_monitor_(mysql_monitor) { assert(mysql_monitor_); assert(process_ready_tasks_cb_); @@ -7053,6 +7063,7 @@ public: friend class Monitor_Poll; unsigned int min_task_to_process_; + unsigned int max_task_to_send_; float process_task_percentage_; process_ready_tasks_cb process_ready_tasks_cb_; MySQL_Monitor* mysql_monitor_; @@ -7081,19 +7092,24 @@ public: void add(short _events, MySQL_Monitor_State_Data* mmsd) { assert(mmsd); - assert(mmsd->mysql); - + if (len_ == capacity_) { expand(1); } - fds_[len_].fd = mysql_get_socket(mmsd->mysql); + if (_events > 0) { + assert(mmsd->mysql); + fds_[len_].fd = mysql_get_socket(mmsd->mysql); + } else { + fds_[len_].fd = -1; // will be set in event_loop phase 1 + } fds_[len_].events = _events; fds_[len_].revents = 0; mmsds_[len_] = mmsd; len_++; - mmsd->init_async(); - mmsd->task_handler(-1, _events); + if (_events > 0) { + mmsd->task_handler(-1, _events); + } } void remove_index_fast(unsigned int i) { @@ -7120,13 +7136,17 @@ public: bool event_loop(int poll_timeout_ms, Process_Ready_Task_Callback_Args& process_ready_task_callback_arg) { - if (len_ == 0) - return false; + if (len_ == 0) return false; - int rc = 0; + // Snapshot the number of tasks that are currently queued for this event loop iteration. + // len_ will change as tasks complete and are removed below. + const unsigned int total_tasks = len_; - // number of tasks to process based on provided percentage - unsigned int tasks_to_process_count = len_ * process_ready_task_callback_arg.process_task_percentage_; + // Number of tasks to send in each batch. + const unsigned int send_batch_size = process_ready_task_callback_arg.max_task_to_send_; + + // Determine how many tasks to process before flushing the ready queue to the callback. + unsigned int tasks_to_process_count = total_tasks * process_ready_task_callback_arg.process_task_percentage_; // if number of task to process is less than minimum task to process, overwrite it if (tasks_to_process_count < process_ready_task_callback_arg.min_task_to_process_) { @@ -7136,14 +7156,42 @@ public: std::vector ready_tasks; ready_tasks.reserve(tasks_to_process_count); + unsigned int total_sent = 0; while (len_) { if (GloMyMon->shutdown) { return false; } - rc = poll(fds_, len_, poll_timeout_ms); + // Phase 1: proactively arm inactive sockets by initializing their poll entries + // and triggering the first async step for up to `min_task_to_send` tasks. + unsigned int sockets_armed = 0; + for (unsigned int idx = 0; idx < len_ && sockets_armed < send_batch_size; ++idx) { + struct pollfd& poll_entry = fds_[idx]; + + // Skip already-armed entries + if (poll_entry.fd != -1) + continue; + + // Arm socket + poll_entry.fd = mysql_get_socket(mmsds_[idx]->mysql); + assert(poll_entry.fd != -1); + poll_entry.events = (POLLIN | POLLOUT | POLLPRI); + poll_entry.revents = 0; + + // Kick off the task state machine + mmsds_[idx]->task_handler(-1, poll_entry.events); + ++sockets_armed; + } + total_sent += sockets_armed; + + proxy_debug(PROXY_DEBUG_MONITOR, 7, + "Phase 1: armed %u sockets (this batch), total armed=%u, total tasks=%u\n", + sockets_armed, total_sent, total_tasks); + // If we sent all tasks, use the caller poll timeout; otherwise poll immediately (timeout=0) + int poll_timeout = (total_sent == total_tasks) ? poll_timeout_ms : 0; + int rc = poll(fds_, len_, poll_timeout); if (rc == -1) { if (errno == EINTR) { continue; @@ -7152,6 +7200,8 @@ public: } } + // Phase 2: collect completed tasks and batch deliver to the callback once the + // tasks_to_process_count threshold is met (or no tasks remain). for (unsigned int i = 0; i < len_;) { if (mmsds_[i]->task_handler(fds_[i].revents, fds_[i].events) != MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_PENDING) { @@ -7164,31 +7214,32 @@ public: tasks_to_process_count--; + // Flush the batch when threshold reached or no more tasks remain. if (tasks_to_process_count == 0 || len_ == 0) { - + proxy_debug(PROXY_DEBUG_MONITOR, 7, "Phase 2: Starting processing of %zu ready tasks\n", ready_tasks.size()); if (process_ready_task_callback_arg.process_ready_tasks(ready_tasks) == false) { return false; } - ready_tasks.clear(); + // Recompute threshold against the current remaining queue length. tasks_to_process_count = len_ * process_ready_task_callback_arg.process_task_percentage_; - if (tasks_to_process_count < process_ready_task_callback_arg.min_task_to_process_) { tasks_to_process_count = process_ready_task_callback_arg.min_task_to_process_; } } continue; } else { - assert(fds_[i].events != 0); + // Not ready yet; ensure we keep polling for its events. + if (fds_[i].fd != -1) + assert(fds_[i].events != 0); } fds_[i].revents = 0; i++; } - } - + assert(ready_tasks.empty()); return true; } @@ -7222,7 +7273,7 @@ MySQL_Monitor_State_Data_Task_Result MySQL_Monitor_State_Data::task_handler(shor #else const unsigned long long now = monotonic_time(); #endif - if (now > task_expiry_time_) { + if (task_expiry_time_ > 0 && now > task_expiry_time_) { #ifdef DEBUG mark_task_as_timeout((GloMyMon->proxytest_forced_timeout == false) ? now : monotonic_time()); #else @@ -7300,80 +7351,120 @@ __again: return result; } -bool MySQL_Monitor::monitor_ping_process_ready_tasks(const std::vector& mmsds) { - - for (auto& mmsd : mmsds) { - const auto task_result = mmsd->get_task_result(); +void* monitor_ping_process_ready_task_thread(const std::vector& ready_mmsds) { + if (ready_mmsds.empty()) return NULL; + + SQLite3DB* monitor_db = ready_mmsds.front()->mondb; + int rc; + sqlite3_stmt* statement1 = NULL; + sqlite3_stmt* statement32 = NULL; + + const char* query1 = "INSERT OR REPLACE INTO mysql_server_ping_log VALUES (?1, ?2, ?3, ?4, ?5)"; + std::string query32s = "INSERT OR REPLACE INTO mysql_server_ping_log VALUES " + generate_multi_rows_query(32, 5); + const char* query32 = query32s.c_str(); + + rc = monitor_db->prepare_v2(query1, &statement1); + ASSERT_SQLITE_OK(rc, monitor_db); + + rc = monitor_db->prepare_v2(query32, &statement32); + ASSERT_SQLITE_OK(rc, monitor_db); + + size_t row_idx = 0; + size_t max_bulk_row_idx = (ready_mmsds.size() / 32) * 32; + for (const auto& mmsd : ready_mmsds) { + const auto task_result = mmsd->get_task_result(); assert(task_result != MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_PENDING); - string server_version = ""; - if (mmsd->mysql && mmsd->mysql->server_version) server_version = string(mmsd->mysql->server_version); + std::string server_version{}; + if (mmsd->mysql && mmsd->mysql->server_version) + server_version = std::string(mmsd->mysql->server_version); if (task_result == MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_SUCCESS) { - __sync_fetch_and_add(&ping_check_OK, 1); - My_Conn_Pool->put_connection(mmsd->hostname, mmsd->port, mmsd->mysql); + __sync_fetch_and_add(&GloMyMon->ping_check_OK, 1); + GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname, mmsd->port, mmsd->mysql); mmsd->mysql = NULL; } else { - __sync_fetch_and_add(&ping_check_ERR, 1); + __sync_fetch_and_add(&GloMyMon->ping_check_ERR, 1); if (task_result == MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_TIMEOUT) { - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_PING_TIMEOUT); - proxy_error("Timeout on ping check for %s:%d after %lldms. If the server is overload, increase mysql-monitor_ping_timeout.\n", mmsd->hostname, mmsd->port, (mmsd->t2 - mmsd->t1) / 1000); + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, + mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_PING_TIMEOUT); + proxy_error("Timeout on ping check for %s:%d after %lldms.\n", + mmsd->hostname, mmsd->port, (mmsd->t2 - mmsd->t1) / 1000); } else { - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); -#ifdef DEBUG - proxy_error("Error after %lldms: server %s:%d , mmsd %p , MYSQL %p , FD %d : %s\n", (mmsd->t2 - mmsd->t1) / 1000, mmsd->hostname, mmsd->port, mmsd, mmsd->mysql, mmsd->mysql->net.fd, (mmsd->mysql_error_msg ? mmsd->mysql_error_msg : "")); -#else - proxy_error("Error after %lldms on server %s:%d : %s\n", (mmsd->t2 - mmsd->t1) / 1000, mmsd->hostname, mmsd->port, (mmsd->mysql_error_msg ? mmsd->mysql_error_msg : "")); -#endif // DEBUG + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, + mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); + proxy_error("Error after %lldms on server %s:%d : %s\n", + (mmsd->t2 - mmsd->t1) / 1000, mmsd->hostname, mmsd->port, + (mmsd->mysql_error_msg ? mmsd->mysql_error_msg : "")); } -//#ifdef DEBUG -// My_Conn_Pool->conn_unregister(mmsd); -//#endif // DEBUG mysql_close(mmsd->mysql); mmsd->mysql = NULL; } - if (shutdown == true) { - return false; - } + if (GloMyMon->shutdown) + return NULL; - sqlite3_stmt* statement = NULL; - const char* query = "INSERT OR REPLACE INTO mysql_server_ping_log VALUES (?1 , ?2 , ?3 , ?4 , ?5)"; - int rc = mmsd->mondb->prepare_v2(query, &statement); - ASSERT_SQLITE_OK(rc, mmsd->mondb); - rc = (*proxy_sqlite3_bind_text)(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb); - rc = (*proxy_sqlite3_bind_int)(statement, 2, mmsd->port); ASSERT_SQLITE_OK(rc, mmsd->mondb); unsigned long long time_now = realtime_time(); time_now = time_now - (mmsd->t2 - mmsd->t1); - rc = (*proxy_sqlite3_bind_int64)(statement, 3, time_now); ASSERT_SQLITE_OK(rc, mmsd->mondb); - rc = (*proxy_sqlite3_bind_int64)(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2 - mmsd->t1)); ASSERT_SQLITE_OK(rc, mmsd->mondb); - rc = (*proxy_sqlite3_bind_text)(statement, 5, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb); - SAFE_SQLITE3_STEP2(statement); - rc = (*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb); - rc = (*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb); - (*proxy_sqlite3_finalize)(statement); - if (strcasestr(server_version.c_str(), (const char *)SERVER_VERSION_READYSET) != NULL) { - ReadySet_Servers.insert(ServerInfo(mmsd->hostname, mmsd->port)); - } else { - if (ReadySet_Servers.size() > 0) { // optimization . The following section is skipped if there are no servers - ServerInfo searchServer(mmsd->hostname, mmsd->port); - if (ReadySet_Servers.count(searchServer) > 0) { - ReadySet_Servers.erase(searchServer); - } + unsigned long long duration = (mmsd->mysql_error_msg ? 0 : mmsd->t2 - mmsd->t1); + const char* errmsg = mmsd->mysql_error_msg; + + size_t idx = row_idx % 32; + + if (row_idx < max_bulk_row_idx) { + // --- bulk insert path --- + rc = (*proxy_sqlite3_bind_text)(statement32, (idx * 5) + 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, monitor_db); + rc = (*proxy_sqlite3_bind_int)(statement32, (idx * 5) + 2, mmsd->port); ASSERT_SQLITE_OK(rc, monitor_db); + rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 5) + 3, time_now); ASSERT_SQLITE_OK(rc, monitor_db); + rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 5) + 4, duration); ASSERT_SQLITE_OK(rc, monitor_db); + rc = (*proxy_sqlite3_bind_text)(statement32, (idx * 5) + 5, errmsg, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, monitor_db); + + // execute when we reach 32 rows + if (idx == 31) { + SAFE_SQLITE3_STEP2(statement32); + rc = (*proxy_sqlite3_clear_bindings)(statement32); ASSERT_SQLITE_OK(rc, monitor_db); + rc = (*proxy_sqlite3_reset)(statement32); ASSERT_SQLITE_OK(rc, monitor_db); } + } else { + // --- single insert path --- + rc = (*proxy_sqlite3_bind_text)(statement1, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, monitor_db); + rc = (*proxy_sqlite3_bind_int)(statement1, 2, mmsd->port); ASSERT_SQLITE_OK(rc, monitor_db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 3, time_now); ASSERT_SQLITE_OK(rc, monitor_db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 4, duration); ASSERT_SQLITE_OK(rc, monitor_db); + rc = (*proxy_sqlite3_bind_text)(statement1, 5, errmsg, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, monitor_db); + SAFE_SQLITE3_STEP2(statement1); + rc = (*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, monitor_db); + rc = (*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, monitor_db); } + + if (strcasestr(server_version.c_str(), (const char*)SERVER_VERSION_READYSET) != NULL) + ReadySet_Servers.insert(ServerInfo(mmsd->hostname, mmsd->port)); + else if (!ReadySet_Servers.empty()) + ReadySet_Servers.erase(ServerInfo(mmsd->hostname, mmsd->port)); + + row_idx++; } + (*proxy_sqlite3_finalize)(statement1); + (*proxy_sqlite3_finalize)(statement32); + + return NULL; +} + +bool MySQL_Monitor::monitor_ping_process_ready_tasks(const std::vector& ready_mmsds) { + // Ensure no null tasks are dispatched to the thread pool + if (!ready_mmsds.empty()) { + WorkItem* item = new WorkItem(ready_mmsds, monitor_ping_process_ready_task_thread); + queue->add(item); + } return true; } void MySQL_Monitor::monitor_ping_async(SQLite3_result* resultset) { assert(resultset); - std::vector> mmsds; - mmsds.reserve(resultset->rows_count); Monitor_Poll monitor_poll(resultset->rows_count); for (std::vector::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) { @@ -7385,8 +7476,8 @@ void MySQL_Monitor::monitor_ping_async(SQLite3_result* resultset) { mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get()); if (mmsd->mysql) { - monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get()); - mmsds.push_back(std::move(mmsd)); + // Register the task; don't dispatch it yet. + monitor_poll.add(0, mmsd.release()); } else { WorkItem* item = new WorkItem(mmsd.release(), monitor_ping_thread); @@ -7396,7 +7487,7 @@ void MySQL_Monitor::monitor_ping_async(SQLite3_result* resultset) { if (shutdown) return; } - Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 50, &MySQL_Monitor::monitor_ping_process_ready_tasks, this); + Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 30, 50, &MySQL_Monitor::monitor_ping_process_ready_tasks, this); if (monitor_poll.event_loop(mysql_thread___monitor_ping_timeout, args) == false) { return; @@ -7717,7 +7808,8 @@ void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset, bool do_d mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get()); if (mmsd->mysql) { - monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get()); + // Register the task; don't dispatch it yet. + monitor_poll.add(0, mmsd.get()); mmsds.push_back(std::move(mmsd)); } else { WorkItem* item = @@ -7729,7 +7821,7 @@ void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset, bool do_d if (shutdown) return; } - Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 50, &MySQL_Monitor::monitor_read_only_process_ready_tasks, this); + Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 30, 50, &MySQL_Monitor::monitor_read_only_process_ready_tasks, this); if (monitor_poll.event_loop(mysql_thread___monitor_read_only_timeout, args) == false) { return; @@ -7923,7 +8015,8 @@ void MySQL_Monitor::monitor_group_replication_async() { mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get()); if (mmsd->mysql) { - monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get()); + // Register the task; don't dispatch it yet. + monitor_poll.add(0, mmsd.get()); mmsds.push_back(std::move(mmsd)); } else { WorkItem* item = @@ -7939,7 +8032,7 @@ void MySQL_Monitor::monitor_group_replication_async() { } pthread_mutex_unlock(&group_replication_mutex); - Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 50, &MySQL_Monitor::monitor_group_replication_process_ready_tasks, this); + Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 30, 50, &MySQL_Monitor::monitor_group_replication_process_ready_tasks, this); if (monitor_poll.event_loop(mysql_thread___monitor_groupreplication_healthcheck_timeout, args) == false) { return; @@ -7964,11 +8057,12 @@ void MySQL_Monitor::monitor_gr_async_actions_handler( Monitor_Poll monitor_poll(mmsds.size()); for (const unique_ptr& mmsd : mmsds) { - monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get()); + // Register the task; don't dispatch it yet. + monitor_poll.add(0, mmsd.get()); } Monitor_Poll::Process_Ready_Task_Callback_Args args( - mmsds.size(), 100, &MySQL_Monitor::monitor_group_replication_process_ready_tasks_2, this + mmsds.size(), mmsds.size(), 100, &MySQL_Monitor::monitor_group_replication_process_ready_tasks_2, this ); if (monitor_poll.event_loop(mysql_thread___monitor_groupreplication_healthcheck_timeout, args) == false) { @@ -8181,7 +8275,8 @@ void MySQL_Monitor::monitor_replication_lag_async(SQLite3_result* resultset) { mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get()); if (mmsd->mysql) { - monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get()); + // Register the task; don't dispatch it yet. + monitor_poll.add(0, mmsd.get()); mmsds.push_back(std::move(mmsd)); } else { WorkItem* item = @@ -8193,7 +8288,7 @@ void MySQL_Monitor::monitor_replication_lag_async(SQLite3_result* resultset) { if (shutdown) return; } - Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 50, &MySQL_Monitor::monitor_replication_lag_process_ready_tasks, this); + Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 30, 50, &MySQL_Monitor::monitor_replication_lag_process_ready_tasks, this); if (monitor_poll.event_loop(mysql_thread___monitor_replication_lag_timeout, args) == false) { return; @@ -8489,7 +8584,8 @@ void MySQL_Monitor::monitor_galera_async() { mmsd->mondb = monitordb; if (mmsd->mysql) { - monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get()); + // Register the task; don't dispatch it yet. + monitor_poll.add(0, mmsd.get()); mmsds.push_back(std::move(mmsd)); } else { WorkItem* item = @@ -8505,7 +8601,7 @@ void MySQL_Monitor::monitor_galera_async() { } pthread_mutex_unlock(&galera_mutex); - Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 50, &MySQL_Monitor::monitor_galera_process_ready_tasks, this); + Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 30, 50, &MySQL_Monitor::monitor_galera_process_ready_tasks, this); if (monitor_poll.event_loop(mysql_thread___monitor_galera_healthcheck_timeout, args) == false) { return;