Improve ping accuracy

- Introduced batching for ping task dispatch (default: 30 servers per batch)
- Moved mysql_server_ping_log writes to a thread pool
pull/5199/head
Rahim Kanji 5 months ago
parent cce161bc74
commit 1c3c42959c

@ -334,13 +334,19 @@ private:
template<typename T>
class WorkItem {
public:
T *data;
void *(*routine) (void *);
WorkItem(T*_data, void *(*start_routine) (void *)) {
data=_data;
routine=start_routine;
}
~WorkItem() {}
std::vector<T*> data;
using entry_point = void *(*)(const std::vector<T*>& data);
entry_point start_routine;
WorkItem(T*_data, entry_point _start_routine) {
data.push_back(_data);
start_routine = _start_routine;
}
WorkItem(std::vector<T*>&& _data, entry_point _start_routine)
: data(std::move(_data)), start_routine(_start_routine) {}
WorkItem(const std::vector<T*>& _data, entry_point _start_routine)
: data(_data), start_routine(_start_routine) {
}
~WorkItem() = default;
};
struct p_mon_counter {

@ -119,7 +119,7 @@ class ConsumerThread : public Thread {
// available to process.
for (int i = 0; (thrn ? i < thrn : 1); i++) {
//VALGRIND_DISABLE_ERROR_REPORTING;
WorkItem<T>* item = (WorkItem<T>*)m_queue.remove();
WorkItem<T>* item = m_queue.remove();
//VALGRIND_ENABLE_ERROR_REPORTING;
if (item == NULL) {
if (thrn) {
@ -132,7 +132,7 @@ class ConsumerThread : public Thread {
}
if (item->routine) { // NULL is allowed, do nothing for it
if (item->start_routine) { // NULL is allowed, do nothing for it
bool me = true;
if (check_monitor_enabled_flag) {
@ -142,10 +142,13 @@ class ConsumerThread : public Thread {
}
if (me) {
item->routine((void *)item->data);
item->start_routine(item->data);
}
}
delete item->data;
for (auto ptr : item->data) {
delete ptr;
}
item->data.clear();
delete item;
}
cleanup:
@ -1356,9 +1359,10 @@ void MySQL_Monitor::update_monitor_proxysql_servers(SQLite3_result* resultset) {
pthread_mutex_unlock(&GloMyMon->proxysql_servers_mutex);
}
void * monitor_connect_thread(void *arg) {
void * monitor_connect_thread(const std::vector<MySQL_Monitor_State_Data*>& mmsds) {
assert(!mmsds.empty());
mysql_close(mysql_init(NULL));
MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg;
MySQL_Monitor_State_Data *mmsd = mmsds.front();
if (!GloMTH) return NULL; // quick exit during shutdown/restart
MySQL_Thread * mysql_thr = new MySQL_Thread();
mysql_thr->curtime=monotonic_time();
@ -1417,9 +1421,10 @@ void * monitor_connect_thread(void *arg) {
return NULL;
}
void * monitor_ping_thread(void *arg) {
void * monitor_ping_thread(const std::vector<MySQL_Monitor_State_Data*>& mmsds) {
assert(!mmsds.empty());
mysql_close(mysql_init(NULL));
MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg;
MySQL_Monitor_State_Data *mmsd = mmsds.front();
if (!GloMTH) return NULL; // quick exit during shutdown/restart
MySQL_Thread * mysql_thr = new MySQL_Thread();
mysql_thr->curtime=monotonic_time();
@ -1654,10 +1659,11 @@ bool MySQL_Monitor_State_Data::create_new_connection() {
return true;
}
void * monitor_read_only_thread(void *arg) {
void * monitor_read_only_thread(const std::vector<MySQL_Monitor_State_Data*>& data) {
assert(!data.empty());
mysql_close(mysql_init(NULL));
bool timeout_reached = false;
MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg;
MySQL_Monitor_State_Data *mmsd = data.front();
if (!GloMTH) return NULL; // quick exit during shutdown/restart
MySQL_Thread * mysql_thr = new MySQL_Thread();
mysql_thr->curtime=monotonic_time();
@ -1935,9 +1941,10 @@ __fast_exit_monitor_read_only_thread:
return NULL;
}
void * monitor_group_replication_thread(void *arg) {
void * monitor_group_replication_thread(const std::vector<MySQL_Monitor_State_Data*>& data) {
assert(!data.empty());
mysql_close(mysql_init(NULL));
MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg;
MySQL_Monitor_State_Data *mmsd = data.front();
MySQL_Thread * mysql_thr = new MySQL_Thread();
mysql_thr->curtime=monotonic_time();
mysql_thr->refresh_variables();
@ -2286,9 +2293,10 @@ __fast_exit_monitor_group_replication_thread:
return NULL;
}
void * monitor_galera_thread(void *arg) {
void * monitor_galera_thread(const std::vector<MySQL_Monitor_State_Data*>& data) {
assert(!data.empty());
mysql_close(mysql_init(NULL));
MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg;
MySQL_Monitor_State_Data *mmsd = data.front();
MySQL_Thread * mysql_thr = new MySQL_Thread();
mysql_thr->curtime=monotonic_time();
mysql_thr->refresh_variables();
@ -2686,9 +2694,10 @@ __fast_exit_monitor_galera_thread:
return NULL;
}
void * monitor_replication_lag_thread(void *arg) {
void * monitor_replication_lag_thread(const std::vector<MySQL_Monitor_State_Data*>& data) {
assert(!data.empty());
mysql_close(mysql_init(NULL));
MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg;
MySQL_Monitor_State_Data *mmsd = data.front();
if (!GloMTH) return NULL; // quick exit during shutdown/restart
MySQL_Thread * mysql_thr = new MySQL_Thread();
mysql_thr->curtime=monotonic_time();
@ -4756,9 +4765,9 @@ std::string debug_iplisttostring(const T& ips) {
return sstr.str();
}
void* monitor_dns_resolver_thread(void* args) {
DNS_Resolve_Data* dns_resolve_data = static_cast<DNS_Resolve_Data*>(args);
void* monitor_dns_resolver_thread(const std::vector<DNS_Resolve_Data*>& dns_resolve_data_list) {
assert(!dns_resolve_data_list.empty());
DNS_Resolve_Data* dns_resolve_data = dns_resolve_data_list.front();
struct addrinfo hints, *res = NULL;
@ -5186,11 +5195,11 @@ void * MySQL_Monitor::run() {
__monitor_run:
while (queue->size()) { // this is a clean up in case Monitor was restarted
WorkItem<MySQL_Monitor_State_Data>* item = (WorkItem<MySQL_Monitor_State_Data>*)queue->remove();
WorkItem<MySQL_Monitor_State_Data>* item = queue->remove();
if (item) {
if (item->data) {
delete item->data;
}
for (auto ptr : item->data)
delete ptr;
item->data.clear();
delete item;
}
}
@ -7034,9 +7043,10 @@ public:
public:
using process_ready_tasks_cb = bool (MySQL_Monitor::*)(const std::vector<MySQL_Monitor_State_Data*>& mmsds);
Process_Ready_Task_Callback_Args(unsigned int min_tasks_to_process, float percentage,
Process_Ready_Task_Callback_Args(unsigned int min_tasks_to_process, unsigned int max_task_to_send, float percentage,
process_ready_tasks_cb callback, MySQL_Monitor* mysql_monitor) :
min_task_to_process_(min_tasks_to_process), process_task_percentage_(percentage / 100.00), process_ready_tasks_cb_(callback),
min_task_to_process_(min_tasks_to_process), max_task_to_send_(max_task_to_send),
process_task_percentage_(percentage / 100.00), process_ready_tasks_cb_(callback),
mysql_monitor_(mysql_monitor) {
assert(mysql_monitor_);
assert(process_ready_tasks_cb_);
@ -7053,6 +7063,7 @@ public:
friend class Monitor_Poll;
unsigned int min_task_to_process_;
unsigned int max_task_to_send_;
float process_task_percentage_;
process_ready_tasks_cb process_ready_tasks_cb_;
MySQL_Monitor* mysql_monitor_;
@ -7081,19 +7092,24 @@ public:
void add(short _events, MySQL_Monitor_State_Data* mmsd) {
assert(mmsd);
assert(mmsd->mysql);
if (len_ == capacity_) {
expand(1);
}
fds_[len_].fd = mysql_get_socket(mmsd->mysql);
if (_events > 0) {
assert(mmsd->mysql);
fds_[len_].fd = mysql_get_socket(mmsd->mysql);
} else {
fds_[len_].fd = -1; // will be set in event_loop phase 1
}
fds_[len_].events = _events;
fds_[len_].revents = 0;
mmsds_[len_] = mmsd;
len_++;
mmsd->init_async();
mmsd->task_handler(-1, _events);
if (_events > 0) {
mmsd->task_handler(-1, _events);
}
}
void remove_index_fast(unsigned int i) {
@ -7120,13 +7136,17 @@ public:
bool event_loop(int poll_timeout_ms, Process_Ready_Task_Callback_Args& process_ready_task_callback_arg) {
if (len_ == 0)
return false;
if (len_ == 0) return false;
int rc = 0;
// Snapshot the number of tasks that are currently queued for this event loop iteration.
// len_ will change as tasks complete and are removed below.
const unsigned int total_tasks = len_;
// number of tasks to process based on provided percentage
unsigned int tasks_to_process_count = len_ * process_ready_task_callback_arg.process_task_percentage_;
// Number of tasks to send in each batch.
const unsigned int send_batch_size = process_ready_task_callback_arg.max_task_to_send_;
// Determine how many tasks to process before flushing the ready queue to the callback.
unsigned int tasks_to_process_count = total_tasks * process_ready_task_callback_arg.process_task_percentage_;
// if number of task to process is less than minimum task to process, overwrite it
if (tasks_to_process_count < process_ready_task_callback_arg.min_task_to_process_) {
@ -7136,14 +7156,42 @@ public:
std::vector<MySQL_Monitor_State_Data*> ready_tasks;
ready_tasks.reserve(tasks_to_process_count);
unsigned int total_sent = 0;
while (len_) {
if (GloMyMon->shutdown) {
return false;
}
rc = poll(fds_, len_, poll_timeout_ms);
// Phase 1: proactively arm inactive sockets by initializing their poll entries
// and triggering the first async step for up to `min_task_to_send` tasks.
unsigned int sockets_armed = 0;
for (unsigned int idx = 0; idx < len_ && sockets_armed < send_batch_size; ++idx) {
struct pollfd& poll_entry = fds_[idx];
// Skip already-armed entries
if (poll_entry.fd != -1)
continue;
// Arm socket
poll_entry.fd = mysql_get_socket(mmsds_[idx]->mysql);
assert(poll_entry.fd != -1);
poll_entry.events = (POLLIN | POLLOUT | POLLPRI);
poll_entry.revents = 0;
// Kick off the task state machine
mmsds_[idx]->task_handler(-1, poll_entry.events);
++sockets_armed;
}
total_sent += sockets_armed;
proxy_debug(PROXY_DEBUG_MONITOR, 7,
"Phase 1: armed %u sockets (this batch), total armed=%u, total tasks=%u\n",
sockets_armed, total_sent, total_tasks);
// If we sent all tasks, use the caller poll timeout; otherwise poll immediately (timeout=0)
int poll_timeout = (total_sent == total_tasks) ? poll_timeout_ms : 0;
int rc = poll(fds_, len_, poll_timeout);
if (rc == -1) {
if (errno == EINTR) {
continue;
@ -7152,6 +7200,8 @@ public:
}
}
// Phase 2: collect completed tasks and batch deliver to the callback once the
// tasks_to_process_count threshold is met (or no tasks remain).
for (unsigned int i = 0; i < len_;) {
if (mmsds_[i]->task_handler(fds_[i].revents, fds_[i].events) != MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_PENDING) {
@ -7164,31 +7214,32 @@ public:
tasks_to_process_count--;
// Flush the batch when threshold reached or no more tasks remain.
if (tasks_to_process_count == 0 || len_ == 0) {
proxy_debug(PROXY_DEBUG_MONITOR, 7, "Phase 2: Starting processing of %zu ready tasks\n", ready_tasks.size());
if (process_ready_task_callback_arg.process_ready_tasks(ready_tasks) == false) {
return false;
}
ready_tasks.clear();
// Recompute threshold against the current remaining queue length.
tasks_to_process_count = len_ * process_ready_task_callback_arg.process_task_percentage_;
if (tasks_to_process_count < process_ready_task_callback_arg.min_task_to_process_) {
tasks_to_process_count = process_ready_task_callback_arg.min_task_to_process_;
}
}
continue;
} else {
assert(fds_[i].events != 0);
// Not ready yet; ensure we keep polling for its events.
if (fds_[i].fd != -1)
assert(fds_[i].events != 0);
}
fds_[i].revents = 0;
i++;
}
}
assert(ready_tasks.empty());
return true;
}
@ -7222,7 +7273,7 @@ MySQL_Monitor_State_Data_Task_Result MySQL_Monitor_State_Data::task_handler(shor
#else
const unsigned long long now = monotonic_time();
#endif
if (now > task_expiry_time_) {
if (task_expiry_time_ > 0 && now > task_expiry_time_) {
#ifdef DEBUG
mark_task_as_timeout((GloMyMon->proxytest_forced_timeout == false) ? now : monotonic_time());
#else
@ -7300,80 +7351,120 @@ __again:
return result;
}
bool MySQL_Monitor::monitor_ping_process_ready_tasks(const std::vector<MySQL_Monitor_State_Data*>& mmsds) {
for (auto& mmsd : mmsds) {
const auto task_result = mmsd->get_task_result();
void* monitor_ping_process_ready_task_thread(const std::vector<MySQL_Monitor_State_Data*>& ready_mmsds) {
if (ready_mmsds.empty()) return NULL;
SQLite3DB* monitor_db = ready_mmsds.front()->mondb;
int rc;
sqlite3_stmt* statement1 = NULL;
sqlite3_stmt* statement32 = NULL;
const char* query1 = "INSERT OR REPLACE INTO mysql_server_ping_log VALUES (?1, ?2, ?3, ?4, ?5)";
std::string query32s = "INSERT OR REPLACE INTO mysql_server_ping_log VALUES " + generate_multi_rows_query(32, 5);
const char* query32 = query32s.c_str();
rc = monitor_db->prepare_v2(query1, &statement1);
ASSERT_SQLITE_OK(rc, monitor_db);
rc = monitor_db->prepare_v2(query32, &statement32);
ASSERT_SQLITE_OK(rc, monitor_db);
size_t row_idx = 0;
size_t max_bulk_row_idx = (ready_mmsds.size() / 32) * 32;
for (const auto& mmsd : ready_mmsds) {
const auto task_result = mmsd->get_task_result();
assert(task_result != MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_PENDING);
string server_version = "";
if (mmsd->mysql && mmsd->mysql->server_version) server_version = string(mmsd->mysql->server_version);
std::string server_version{};
if (mmsd->mysql && mmsd->mysql->server_version)
server_version = std::string(mmsd->mysql->server_version);
if (task_result == MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_SUCCESS) {
__sync_fetch_and_add(&ping_check_OK, 1);
My_Conn_Pool->put_connection(mmsd->hostname, mmsd->port, mmsd->mysql);
__sync_fetch_and_add(&GloMyMon->ping_check_OK, 1);
GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname, mmsd->port, mmsd->mysql);
mmsd->mysql = NULL;
} else {
__sync_fetch_and_add(&ping_check_ERR, 1);
__sync_fetch_and_add(&GloMyMon->ping_check_ERR, 1);
if (task_result == MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_TIMEOUT) {
MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_PING_TIMEOUT);
proxy_error("Timeout on ping check for %s:%d after %lldms. If the server is overload, increase mysql-monitor_ping_timeout.\n", mmsd->hostname, mmsd->port, (mmsd->t2 - mmsd->t1) / 1000);
MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql,
mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_PING_TIMEOUT);
proxy_error("Timeout on ping check for %s:%d after %lldms.\n",
mmsd->hostname, mmsd->port, (mmsd->t2 - mmsd->t1) / 1000);
} else {
MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql));
#ifdef DEBUG
proxy_error("Error after %lldms: server %s:%d , mmsd %p , MYSQL %p , FD %d : %s\n", (mmsd->t2 - mmsd->t1) / 1000, mmsd->hostname, mmsd->port, mmsd, mmsd->mysql, mmsd->mysql->net.fd, (mmsd->mysql_error_msg ? mmsd->mysql_error_msg : ""));
#else
proxy_error("Error after %lldms on server %s:%d : %s\n", (mmsd->t2 - mmsd->t1) / 1000, mmsd->hostname, mmsd->port, (mmsd->mysql_error_msg ? mmsd->mysql_error_msg : ""));
#endif // DEBUG
MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql,
mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql));
proxy_error("Error after %lldms on server %s:%d : %s\n",
(mmsd->t2 - mmsd->t1) / 1000, mmsd->hostname, mmsd->port,
(mmsd->mysql_error_msg ? mmsd->mysql_error_msg : ""));
}
//#ifdef DEBUG
// My_Conn_Pool->conn_unregister(mmsd);
//#endif // DEBUG
mysql_close(mmsd->mysql);
mmsd->mysql = NULL;
}
if (shutdown == true) {
return false;
}
if (GloMyMon->shutdown)
return NULL;
sqlite3_stmt* statement = NULL;
const char* query = "INSERT OR REPLACE INTO mysql_server_ping_log VALUES (?1 , ?2 , ?3 , ?4 , ?5)";
int rc = mmsd->mondb->prepare_v2(query, &statement);
ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc = (*proxy_sqlite3_bind_text)(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc = (*proxy_sqlite3_bind_int)(statement, 2, mmsd->port); ASSERT_SQLITE_OK(rc, mmsd->mondb);
unsigned long long time_now = realtime_time();
time_now = time_now - (mmsd->t2 - mmsd->t1);
rc = (*proxy_sqlite3_bind_int64)(statement, 3, time_now); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc = (*proxy_sqlite3_bind_int64)(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2 - mmsd->t1)); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc = (*proxy_sqlite3_bind_text)(statement, 5, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb);
SAFE_SQLITE3_STEP2(statement);
rc = (*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc = (*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
(*proxy_sqlite3_finalize)(statement);
if (strcasestr(server_version.c_str(), (const char *)SERVER_VERSION_READYSET) != NULL) {
ReadySet_Servers.insert(ServerInfo(mmsd->hostname, mmsd->port));
} else {
if (ReadySet_Servers.size() > 0) { // optimization . The following section is skipped if there are no servers
ServerInfo searchServer(mmsd->hostname, mmsd->port);
if (ReadySet_Servers.count(searchServer) > 0) {
ReadySet_Servers.erase(searchServer);
}
unsigned long long duration = (mmsd->mysql_error_msg ? 0 : mmsd->t2 - mmsd->t1);
const char* errmsg = mmsd->mysql_error_msg;
size_t idx = row_idx % 32;
if (row_idx < max_bulk_row_idx) {
// --- bulk insert path ---
rc = (*proxy_sqlite3_bind_text)(statement32, (idx * 5) + 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, monitor_db);
rc = (*proxy_sqlite3_bind_int)(statement32, (idx * 5) + 2, mmsd->port); ASSERT_SQLITE_OK(rc, monitor_db);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 5) + 3, time_now); ASSERT_SQLITE_OK(rc, monitor_db);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 5) + 4, duration); ASSERT_SQLITE_OK(rc, monitor_db);
rc = (*proxy_sqlite3_bind_text)(statement32, (idx * 5) + 5, errmsg, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, monitor_db);
// execute when we reach 32 rows
if (idx == 31) {
SAFE_SQLITE3_STEP2(statement32);
rc = (*proxy_sqlite3_clear_bindings)(statement32); ASSERT_SQLITE_OK(rc, monitor_db);
rc = (*proxy_sqlite3_reset)(statement32); ASSERT_SQLITE_OK(rc, monitor_db);
}
} else {
// --- single insert path ---
rc = (*proxy_sqlite3_bind_text)(statement1, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, monitor_db);
rc = (*proxy_sqlite3_bind_int)(statement1, 2, mmsd->port); ASSERT_SQLITE_OK(rc, monitor_db);
rc = (*proxy_sqlite3_bind_int64)(statement1, 3, time_now); ASSERT_SQLITE_OK(rc, monitor_db);
rc = (*proxy_sqlite3_bind_int64)(statement1, 4, duration); ASSERT_SQLITE_OK(rc, monitor_db);
rc = (*proxy_sqlite3_bind_text)(statement1, 5, errmsg, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, monitor_db);
SAFE_SQLITE3_STEP2(statement1);
rc = (*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, monitor_db);
rc = (*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, monitor_db);
}
if (strcasestr(server_version.c_str(), (const char*)SERVER_VERSION_READYSET) != NULL)
ReadySet_Servers.insert(ServerInfo(mmsd->hostname, mmsd->port));
else if (!ReadySet_Servers.empty())
ReadySet_Servers.erase(ServerInfo(mmsd->hostname, mmsd->port));
row_idx++;
}
(*proxy_sqlite3_finalize)(statement1);
(*proxy_sqlite3_finalize)(statement32);
return NULL;
}
bool MySQL_Monitor::monitor_ping_process_ready_tasks(const std::vector<MySQL_Monitor_State_Data*>& ready_mmsds) {
// Ensure no null tasks are dispatched to the thread pool
if (!ready_mmsds.empty()) {
WorkItem<MySQL_Monitor_State_Data>* item = new WorkItem<MySQL_Monitor_State_Data>(ready_mmsds, monitor_ping_process_ready_task_thread);
queue->add(item);
}
return true;
}
void MySQL_Monitor::monitor_ping_async(SQLite3_result* resultset) {
assert(resultset);
std::vector<std::unique_ptr<MySQL_Monitor_State_Data>> mmsds;
mmsds.reserve(resultset->rows_count);
Monitor_Poll monitor_poll(resultset->rows_count);
for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) {
@ -7385,8 +7476,8 @@ void MySQL_Monitor::monitor_ping_async(SQLite3_result* resultset) {
mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get());
if (mmsd->mysql) {
monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get());
mmsds.push_back(std::move(mmsd));
// Register the task; don't dispatch it yet.
monitor_poll.add(0, mmsd.release());
} else {
WorkItem<MySQL_Monitor_State_Data>* item
= new WorkItem<MySQL_Monitor_State_Data>(mmsd.release(), monitor_ping_thread);
@ -7396,7 +7487,7 @@ void MySQL_Monitor::monitor_ping_async(SQLite3_result* resultset) {
if (shutdown) return;
}
Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 50, &MySQL_Monitor::monitor_ping_process_ready_tasks, this);
Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 30, 50, &MySQL_Monitor::monitor_ping_process_ready_tasks, this);
if (monitor_poll.event_loop(mysql_thread___monitor_ping_timeout, args) == false) {
return;
@ -7717,7 +7808,8 @@ void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset, bool do_d
mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get());
if (mmsd->mysql) {
monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get());
// Register the task; don't dispatch it yet.
monitor_poll.add(0, mmsd.get());
mmsds.push_back(std::move(mmsd));
} else {
WorkItem<MySQL_Monitor_State_Data>* item =
@ -7729,7 +7821,7 @@ void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset, bool do_d
if (shutdown) return;
}
Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 50, &MySQL_Monitor::monitor_read_only_process_ready_tasks, this);
Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 30, 50, &MySQL_Monitor::monitor_read_only_process_ready_tasks, this);
if (monitor_poll.event_loop(mysql_thread___monitor_read_only_timeout, args) == false) {
return;
@ -7923,7 +8015,8 @@ void MySQL_Monitor::monitor_group_replication_async() {
mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get());
if (mmsd->mysql) {
monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get());
// Register the task; don't dispatch it yet.
monitor_poll.add(0, mmsd.get());
mmsds.push_back(std::move(mmsd));
} else {
WorkItem<MySQL_Monitor_State_Data>* item =
@ -7939,7 +8032,7 @@ void MySQL_Monitor::monitor_group_replication_async() {
}
pthread_mutex_unlock(&group_replication_mutex);
Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 50, &MySQL_Monitor::monitor_group_replication_process_ready_tasks, this);
Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 30, 50, &MySQL_Monitor::monitor_group_replication_process_ready_tasks, this);
if (monitor_poll.event_loop(mysql_thread___monitor_groupreplication_healthcheck_timeout, args) == false) {
return;
@ -7964,11 +8057,12 @@ void MySQL_Monitor::monitor_gr_async_actions_handler(
Monitor_Poll monitor_poll(mmsds.size());
for (const unique_ptr<MySQL_Monitor_State_Data>& mmsd : mmsds) {
monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get());
// Register the task; don't dispatch it yet.
monitor_poll.add(0, mmsd.get());
}
Monitor_Poll::Process_Ready_Task_Callback_Args args(
mmsds.size(), 100, &MySQL_Monitor::monitor_group_replication_process_ready_tasks_2, this
mmsds.size(), mmsds.size(), 100, &MySQL_Monitor::monitor_group_replication_process_ready_tasks_2, this
);
if (monitor_poll.event_loop(mysql_thread___monitor_groupreplication_healthcheck_timeout, args) == false) {
@ -8181,7 +8275,8 @@ void MySQL_Monitor::monitor_replication_lag_async(SQLite3_result* resultset) {
mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get());
if (mmsd->mysql) {
monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get());
// Register the task; don't dispatch it yet.
monitor_poll.add(0, mmsd.get());
mmsds.push_back(std::move(mmsd));
} else {
WorkItem<MySQL_Monitor_State_Data>* item =
@ -8193,7 +8288,7 @@ void MySQL_Monitor::monitor_replication_lag_async(SQLite3_result* resultset) {
if (shutdown) return;
}
Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 50, &MySQL_Monitor::monitor_replication_lag_process_ready_tasks, this);
Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 30, 50, &MySQL_Monitor::monitor_replication_lag_process_ready_tasks, this);
if (monitor_poll.event_loop(mysql_thread___monitor_replication_lag_timeout, args) == false) {
return;
@ -8489,7 +8584,8 @@ void MySQL_Monitor::monitor_galera_async() {
mmsd->mondb = monitordb;
if (mmsd->mysql) {
monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get());
// Register the task; don't dispatch it yet.
monitor_poll.add(0, mmsd.get());
mmsds.push_back(std::move(mmsd));
} else {
WorkItem<MySQL_Monitor_State_Data>* item =
@ -8505,7 +8601,7 @@ void MySQL_Monitor::monitor_galera_async() {
}
pthread_mutex_unlock(&galera_mutex);
Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 50, &MySQL_Monitor::monitor_galera_process_ready_tasks, this);
Monitor_Poll::Process_Ready_Task_Callback_Args args(5, 30, 50, &MySQL_Monitor::monitor_galera_process_ready_tasks, this);
if (monitor_poll.event_loop(mysql_thread___monitor_galera_healthcheck_timeout, args) == false) {
return;

Loading…
Cancel
Save