Implemented explicit task memory ownership management in Monitor_Poll.

pull/5199/head
Rahim Kanji 5 months ago
parent 50fae0b02a
commit 6a80c3f288

@ -119,7 +119,7 @@ class ConsumerThread : public Thread {
// available to process.
for (int i = 0; (thrn ? i < thrn : 1); i++) {
//VALGRIND_DISABLE_ERROR_REPORTING;
WorkItem<T>* item = m_queue.remove();
WorkItem<T>* item = static_cast<WorkItem<T>*>(m_queue.remove());
//VALGRIND_ENABLE_ERROR_REPORTING;
if (item == NULL) {
if (thrn) {
@ -5182,7 +5182,7 @@ void * MySQL_Monitor::run() {
__monitor_run:
while (queue->size()) { // this is a clean up in case Monitor was restarted
WorkItem<MySQL_Monitor_State_Data>* item = queue->remove();
WorkItem<MySQL_Monitor_State_Data>* item = static_cast<WorkItem<MySQL_Monitor_State_Data>*>(queue->remove());
if (item) {
for (auto ptr : item->data)
delete ptr;
@ -7056,14 +7056,18 @@ public:
MySQL_Monitor* mysql_monitor_;
};
Monitor_Poll(unsigned int capacity) {
Monitor_Poll(unsigned int capacity, bool owns_task_memory = false) {
len_ = 0;
owns_task_memory_ = owns_task_memory; // if true, this object takes ownership of task memory and will delete unprocessed tasks on destruction
capacity_ = capacity;
fds_ = (struct pollfd*)malloc(capacity_ * sizeof(struct pollfd));
mmsds_ = (MySQL_Monitor_State_Data**)malloc(capacity_ * sizeof(MySQL_Monitor_State_Data*));
}
~Monitor_Poll() {
if (owns_task_memory_) {
cleanup_unprocessed_tasks(); // free remaining unprocessed tasks
}
free(fds_);
free(mmsds_);
}
@ -7079,24 +7083,16 @@ public:
void add(short _events, MySQL_Monitor_State_Data* mmsd) {
assert(mmsd);
assert(_events);
if (len_ == capacity_) {
expand(1);
}
if (_events > 0) {
assert(mmsd->mysql);
fds_[len_].fd = mysql_get_socket(mmsd->mysql);
} else {
fds_[len_].fd = -1; // will be set in event_loop phase 1
}
fds_[len_].fd = -1; // will be set in event_loop phase 1
fds_[len_].events = _events;
fds_[len_].revents = 0;
mmsds_[len_] = mmsd;
len_++;
mmsd->init_async();
if (_events > 0) {
mmsd->task_handler(-1, _events);
}
}
void remove_index_fast(unsigned int i) {
@ -7163,7 +7159,6 @@ public:
// Arm socket
poll_entry.fd = mysql_get_socket(mmsds_[idx]->mysql);
assert(poll_entry.fd != -1);
poll_entry.events = (POLLIN | POLLOUT | POLLPRI);
poll_entry.revents = 0;
// Kick off the task state machine
@ -7242,10 +7237,22 @@ private:
return i ? i : n;
}
// Deletes any task objects that were not processed
inline
void cleanup_unprocessed_tasks() {
if (len_ == 0) return;
for (unsigned int i = 0; i < len_; ++i) {
delete mmsds_[i];
mmsds_[i] = nullptr;
}
len_ = 0;
}
unsigned int len_;
unsigned int capacity_;
struct pollfd* fds_;
MySQL_Monitor_State_Data** mmsds_;
bool owns_task_memory_;
};
MySQL_Monitor_State_Data_Task_Result MySQL_Monitor_State_Data::task_handler(short event_, short& wait_event) {
@ -7452,7 +7459,7 @@ bool MySQL_Monitor::monitor_ping_process_ready_tasks(const std::vector<MySQL_Mon
void MySQL_Monitor::monitor_ping_async(SQLite3_result* resultset) {
assert(resultset);
Monitor_Poll monitor_poll(resultset->rows_count);
Monitor_Poll monitor_poll(resultset->rows_count, true);
for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) {
const SQLite3_row* r = *it;
@ -7464,7 +7471,7 @@ void MySQL_Monitor::monitor_ping_async(SQLite3_result* resultset) {
if (mmsd->mysql) {
// Register the task; don't dispatch it yet.
monitor_poll.add(0, mmsd.release());
monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.release());
} else {
WorkItem<MySQL_Monitor_State_Data>* item
= new WorkItem<MySQL_Monitor_State_Data>(mmsd.release(), monitor_ping_thread);
@ -7796,7 +7803,7 @@ void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset, bool do_d
if (mmsd->mysql) {
// Register the task; don't dispatch it yet.
monitor_poll.add(0, mmsd.get());
monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get());
mmsds.push_back(std::move(mmsd));
} else {
WorkItem<MySQL_Monitor_State_Data>* item =
@ -8003,7 +8010,7 @@ void MySQL_Monitor::monitor_group_replication_async() {
if (mmsd->mysql) {
// Register the task; don't dispatch it yet.
monitor_poll.add(0, mmsd.get());
monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get());
mmsds.push_back(std::move(mmsd));
} else {
WorkItem<MySQL_Monitor_State_Data>* item =
@ -8045,7 +8052,7 @@ void MySQL_Monitor::monitor_gr_async_actions_handler(
for (const unique_ptr<MySQL_Monitor_State_Data>& mmsd : mmsds) {
// Register the task; don't dispatch it yet.
monitor_poll.add(0, mmsd.get());
monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get());
}
Monitor_Poll::Process_Ready_Task_Callback_Args args(
@ -8263,7 +8270,7 @@ void MySQL_Monitor::monitor_replication_lag_async(SQLite3_result* resultset) {
if (mmsd->mysql) {
// Register the task; don't dispatch it yet.
monitor_poll.add(0, mmsd.get());
monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get());
mmsds.push_back(std::move(mmsd));
} else {
WorkItem<MySQL_Monitor_State_Data>* item =
@ -8572,7 +8579,7 @@ void MySQL_Monitor::monitor_galera_async() {
if (mmsd->mysql) {
// Register the task; don't dispatch it yet.
monitor_poll.add(0, mmsd.get());
monitor_poll.add((POLLIN|POLLOUT|POLLPRI), mmsd.get());
mmsds.push_back(std::move(mmsd));
} else {
WorkItem<MySQL_Monitor_State_Data>* item =

Loading…
Cancel
Save