From 6a80c3f2881139c2b8bff4622e1729adf314422c Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Sun, 9 Nov 2025 17:43:52 +0500 Subject: [PATCH] Implemented explicit task memory ownership management in Monitor_Poll. --- lib/MySQL_Monitor.cpp | 49 ++++++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index c8657ca4a..26f9549dc 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -119,7 +119,7 @@ class ConsumerThread : public Thread { // available to process. for (int i = 0; (thrn ? i < thrn : 1); i++) { //VALGRIND_DISABLE_ERROR_REPORTING; - WorkItem* item = m_queue.remove(); + WorkItem* item = static_cast*>(m_queue.remove()); //VALGRIND_ENABLE_ERROR_REPORTING; if (item == NULL) { if (thrn) { @@ -5182,7 +5182,7 @@ void * MySQL_Monitor::run() { __monitor_run: while (queue->size()) { // this is a clean up in case Monitor was restarted - WorkItem* item = queue->remove(); + WorkItem* item = static_cast*>(queue->remove()); if (item) { for (auto ptr : item->data) delete ptr; @@ -7056,14 +7056,18 @@ public: MySQL_Monitor* mysql_monitor_; }; - Monitor_Poll(unsigned int capacity) { + Monitor_Poll(unsigned int capacity, bool owns_task_memory = false) { len_ = 0; + owns_task_memory_ = owns_task_memory; // if true, this object takes ownership of task memory and will delete unprocessed tasks on destruction capacity_ = capacity; fds_ = (struct pollfd*)malloc(capacity_ * sizeof(struct pollfd)); mmsds_ = (MySQL_Monitor_State_Data**)malloc(capacity_ * sizeof(MySQL_Monitor_State_Data*)); } ~Monitor_Poll() { + if (owns_task_memory_) { + cleanup_unprocessed_tasks(); // free remaining unprocessed tasks + } free(fds_); free(mmsds_); } @@ -7079,24 +7083,16 @@ public: void add(short _events, MySQL_Monitor_State_Data* mmsd) { assert(mmsd); - + assert(_events); if (len_ == capacity_) { expand(1); } - if (_events > 0) { - assert(mmsd->mysql); - fds_[len_].fd = mysql_get_socket(mmsd->mysql); - } else { - fds_[len_].fd = -1; // will be set in event_loop phase 1 - } + fds_[len_].fd = -1; // will be set in event_loop phase 1 fds_[len_].events = _events; fds_[len_].revents = 0; mmsds_[len_] = mmsd; len_++; mmsd->init_async(); - if (_events > 0) { - mmsd->task_handler(-1, _events); - } } void remove_index_fast(unsigned int i) { @@ -7163,7 +7159,6 @@ public: // Arm socket poll_entry.fd = mysql_get_socket(mmsds_[idx]->mysql); assert(poll_entry.fd != -1); - poll_entry.events = (POLLIN | POLLOUT | POLLPRI); poll_entry.revents = 0; // Kick off the task state machine @@ -7242,10 +7237,22 @@ private: return i ? i : n; } + // Deletes any task objects that were not processed + inline + void cleanup_unprocessed_tasks() { + if (len_ == 0) return; + for (unsigned int i = 0; i < len_; ++i) { + delete mmsds_[i]; + mmsds_[i] = nullptr; + } + len_ = 0; + } + unsigned int len_; unsigned int capacity_; struct pollfd* fds_; MySQL_Monitor_State_Data** mmsds_; + bool owns_task_memory_; }; MySQL_Monitor_State_Data_Task_Result MySQL_Monitor_State_Data::task_handler(short event_, short& wait_event) { @@ -7452,7 +7459,7 @@ bool MySQL_Monitor::monitor_ping_process_ready_tasks(const std::vectorrows_count); + Monitor_Poll monitor_poll(resultset->rows_count, true); for (std::vector::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) { const SQLite3_row* r = *it; @@ -7464,7 +7471,7 @@ void MySQL_Monitor::monitor_ping_async(SQLite3_result* resultset) { if (mmsd->mysql) { // Register the task; don't dispatch it yet. - monitor_poll.add(0, mmsd.release()); + monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.release()); } else { WorkItem* item = new WorkItem(mmsd.release(), monitor_ping_thread); @@ -7796,7 +7803,7 @@ void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset, bool do_d if (mmsd->mysql) { // Register the task; don't dispatch it yet. - monitor_poll.add(0, mmsd.get()); + monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get()); mmsds.push_back(std::move(mmsd)); } else { WorkItem* item = @@ -8003,7 +8010,7 @@ void MySQL_Monitor::monitor_group_replication_async() { if (mmsd->mysql) { // Register the task; don't dispatch it yet. - monitor_poll.add(0, mmsd.get()); + monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get()); mmsds.push_back(std::move(mmsd)); } else { WorkItem* item = @@ -8045,7 +8052,7 @@ void MySQL_Monitor::monitor_gr_async_actions_handler( for (const unique_ptr& mmsd : mmsds) { // Register the task; don't dispatch it yet. - monitor_poll.add(0, mmsd.get()); + monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get()); } Monitor_Poll::Process_Ready_Task_Callback_Args args( @@ -8263,7 +8270,7 @@ void MySQL_Monitor::monitor_replication_lag_async(SQLite3_result* resultset) { if (mmsd->mysql) { // Register the task; don't dispatch it yet. - monitor_poll.add(0, mmsd.get()); + monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get()); mmsds.push_back(std::move(mmsd)); } else { WorkItem* item = @@ -8572,7 +8579,7 @@ void MySQL_Monitor::monitor_galera_async() { if (mmsd->mysql) { // Register the task; don't dispatch it yet. - monitor_poll.add(0, mmsd.get()); + monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get()); mmsds.push_back(std::move(mmsd)); } else { WorkItem* item =