From 50c60284e6052742da1534e0aee5a0a72b6bff03 Mon Sep 17 00:00:00 2001 From: Wazir Ahmed Date: Sun, 14 Sep 2025 10:38:49 +0530 Subject: [PATCH] gtid: Refactor reconnect logic & prevent `events_count` reset - This patch was originally added by commit 0a70fd5 and reverted by 8d1b5b5, prior to the release of `v3.0.3`. - The following issues are addressed in this update, - Fix for `use-after-free` issue which occured during CI test. - Fix for deadlock issue between `GTID_syncer` and `MySQL_Worker`. Signed-off-by: Wazir Ahmed --- include/GTID_Server_Data.h | 9 ++ include/btree.h | 1 + include/proxysql_gtid.h | 1 + lib/GTID_Server_Data.cpp | 170 ++++++++++++++++--------------- lib/MySQL_HostGroups_Manager.cpp | 144 ++++++++++++-------------- 5 files changed, 167 insertions(+), 158 deletions(-) diff --git a/include/GTID_Server_Data.h b/include/GTID_Server_Data.h index 9bc9219fd..0c4ca00cb 100644 --- a/include/GTID_Server_Data.h +++ b/include/GTID_Server_Data.h @@ -1,5 +1,11 @@ #ifndef CLASS_GTID_Server_Data_H #define CLASS_GTID_Server_Data_H + +#include +#include +#include +#include + class GTID_Server_Data { public: char *address; @@ -24,4 +30,7 @@ class GTID_Server_Data { void read_all_gtids(); void dump(); }; + +bool addGtidInterval(gtid_set_t& gtid_executed, std::string server_uuid, int64_t txid_start, int64_t txid_end); + #endif // CLASS_GTID_Server_Data_H diff --git a/include/btree.h b/include/btree.h index cd54d2fbe..02cee7fbc 100644 --- a/include/btree.h +++ b/include/btree.h @@ -104,6 +104,7 @@ #include #include #include +#include #include #include #include diff --git a/include/proxysql_gtid.h b/include/proxysql_gtid.h index b2e11614c..836e95c7a 100644 --- a/include/proxysql_gtid.h +++ b/include/proxysql_gtid.h @@ -2,6 +2,7 @@ #define PROXYSQL_GTID // highly inspired by libslave // https://github.com/vozbu/libslave/ +#include #include #include #include diff --git a/lib/GTID_Server_Data.cpp b/lib/GTID_Server_Data.cpp index c05828398..0e8146fb1 100644 --- a/lib/GTID_Server_Data.cpp +++ b/lib/GTID_Server_Data.cpp @@ -1,3 +1,4 @@ +#include "GTID_Server_Data.h" #include "MySQL_HostGroups_Manager.h" #include "ev.h" @@ -47,21 +48,14 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) { bool rc = true; rc = sd->readall(); if (rc == false) { - //delete sd; - std::string s1 = sd->address; - s1.append(":"); - s1.append(std::to_string(sd->mysql_port)); MyHGM->gtid_missing_nodes = true; - proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port); - std::unordered_map ::iterator it2; - it2 = MyHGM->gtid_map.find(s1); - if (it2 != MyHGM->gtid_map.end()) { - //MyHGM->gtid_map.erase(it2); - it2->second = NULL; - delete sd; - } + sd->active = false; + proxy_warning("GTID: failed to read from ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port); + ev_io_stop(MyHGM->gtid_ev_loop, w); + close(w->fd); free(w); + sd->w = nullptr; } else { sd->dump(); } @@ -71,48 +65,37 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) { void connect_cb(EV_P_ ev_io *w, int revents) { pthread_mutex_lock(&ev_loop_mutex); - struct ev_io * c = w; if (revents & EV_WRITE) { - int optval = 0; - socklen_t optlen = sizeof(optval); - if ((getsockopt(w->fd, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1) || - (optval != 0)) { - /* Connection failed; try the next address in the list. */ - //int errnum = optval ? optval : errno; - ev_io_stop(MyHGM->gtid_ev_loop, w); - close(w->fd); + int fd = w->fd; + GTID_Server_Data *sd = (GTID_Server_Data *)w->data; + + // connect() completed, this watcher is no longer needed + ev_io_stop(MyHGM->gtid_ev_loop, w); + free(w); + sd->w = nullptr; + + // Based on fd status, proceed to next step -> waiting for read event on the socket + int error = 0; + socklen_t optlen = sizeof(error); + int rc = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &optlen); + if (rc == -1 || error != 0) { + /* connection failed */ MyHGM->gtid_missing_nodes = true; - GTID_Server_Data * custom_data = (GTID_Server_Data *)w->data; - GTID_Server_Data *sd = custom_data; - std::string s1 = sd->address; - s1.append(":"); - s1.append(std::to_string(sd->mysql_port)); + sd->active = false; proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port); - std::unordered_map ::iterator it2; - it2 = MyHGM->gtid_map.find(s1); - if (it2 != MyHGM->gtid_map.end()) { - //MyHGM->gtid_map.erase(it2); - it2->second = NULL; - delete sd; - } - //delete custom_data; - free(c); + close(fd); } else { - ev_io_stop(MyHGM->gtid_ev_loop, w); - int fd=w->fd; - struct ev_io * new_w = (struct ev_io*) malloc(sizeof(struct ev_io)); - new_w->data = w->data; - GTID_Server_Data * custom_data = (GTID_Server_Data *)new_w->data; - custom_data->w = new_w; - free(w); - ev_io_init(new_w, reader_cb, fd, EV_READ); - ev_io_start(MyHGM->gtid_ev_loop, new_w); + struct ev_io *read_watcher = (struct ev_io *) malloc(sizeof(struct ev_io)); + read_watcher->data = sd; + sd->w = read_watcher; + ev_io_init(read_watcher, reader_cb, fd, EV_READ); + ev_io_start(MyHGM->gtid_ev_loop, read_watcher); } } pthread_mutex_unlock(&ev_loop_mutex); } -struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_port) { +struct ev_io * new_connect_watcher(char *address, uint16_t gtid_port, uint16_t mysql_port) { int s; if ((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) { @@ -149,8 +132,6 @@ struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_p struct ev_io *c = (struct ev_io *)malloc(sizeof(struct ev_io)); if (c) { ev_io_init(c, connect_cb, s, EV_WRITE); - GTID_Server_Data * custom_data = new GTID_Server_Data(c, address, gtid_port, mysql_port); - c->data = (void *)custom_data; return c; } /* else error */ @@ -158,8 +139,6 @@ struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_p return NULL; } - - GTID_Server_Data::GTID_Server_Data(struct ev_io *_w, char *_address, uint16_t _port, uint16_t _mysql_port) { active = true; w = _w; @@ -188,26 +167,32 @@ GTID_Server_Data::~GTID_Server_Data() { } bool GTID_Server_Data::readall() { - bool ret = true; if (size == len) { // buffer is full, expand - resize(len*2); + resize(len * 2); } + int rc = 0; - rc = read(w->fd,data+len,size-len); + rc = read(w->fd, data+len, size-len); if (rc > 0) { len += rc; + return true; + } + + int myerr = errno; + if (rc == 0) { + proxy_info("Read returned EOF\n"); + return false; + } + + // rc == -1 + proxy_error("Read failed, error %d\n", myerr); + if(myerr == EINTR || myerr == EAGAIN) { + // non-blocking fd, so this should not be considered as an error + return true; } else { - int myerr = errno; - proxy_error("Read returned %d bytes, error %d\n", rc, myerr); - if ( - (rc == 0) || - (rc==-1 && myerr != EINTR && myerr != EAGAIN) - ) { - ret = false; - } + return false; } - return ret; } @@ -239,9 +224,6 @@ void GTID_Server_Data::dump() { return; } read_all_gtids(); - //int rc = write(1,data+pos,len-pos); - fflush(stdout); - ///pos += rc; if (pos >= len/2) { memmove(data,data+pos,len-pos); len = len-pos; @@ -285,13 +267,12 @@ bool GTID_Server_Data::read_next_gtid() { bs[l-3] = '\0'; char *saveptr1=NULL; char *saveptr2=NULL; - //char *saveptr3=NULL; char *token = NULL; char *subtoken = NULL; - //char *subtoken2 = NULL; char *str1 = NULL; char *str2 = NULL; - //char *str3 = NULL; + bool updated = false; + for (str1 = bs; ; str1 = NULL) { token = strtok_r(str1, ",", &saveptr1); if (token == NULL) { @@ -312,34 +293,30 @@ bool GTID_Server_Data::read_next_gtid() { p++; } } - //fprintf(stdout,"BS from %s\n", uuid_server); } else { // we are reading the trxids uint64_t trx_from; uint64_t trx_to; sscanf(subtoken,"%lu-%lu",&trx_from,&trx_to); - //fprintf(stdout,"BS from %s:%lu-%lu\n", uuid_server, trx_from, trx_to); - std::string s = uuid_server; - gtid_executed[s].emplace_back(trx_from, trx_to); + updated = addGtidInterval(gtid_executed, uuid_server, trx_from, trx_to) || updated; } } } pos += l+1; free(bs); - //return true; + + if (updated) { + events_read++; + } } else { strncpy(rec_msg,data+pos,l); pos += l+1; rec_msg[l] = 0; - //int rc = write(1,data+pos,l+1); - //fprintf(stdout,"%s\n", rec_msg); if (rec_msg[0]=='I') { - //char rec_uuid[80]; uint64_t rec_trxid = 0; char *a = NULL; int ul = 0; switch (rec_msg[1]) { case '1': - //sscanf(rec_msg+3,"%s\:%lu",uuid_server,&rec_trxid); a = strchr(rec_msg+3,':'); ul = a-rec_msg-3; strncpy(uuid_server,rec_msg+3,ul); @@ -347,21 +324,17 @@ bool GTID_Server_Data::read_next_gtid() { rec_trxid=atoll(a+1); break; case '2': - //sscanf(rec_msg+3,"%lu",&rec_trxid); rec_trxid=atoll(rec_msg+3); break; default: break; } - //fprintf(stdout,"%s:%lu\n", uuid_server, rec_trxid); std::string s = uuid_server; gtid_t new_gtid = std::make_pair(s,rec_trxid); addGtid(new_gtid,gtid_executed); events_read++; - //return true; } } - //std::cout << "current pos " << gtid_executed_to_string(gtid_executed) << std::endl << std::endl; return true; } @@ -439,6 +412,43 @@ void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) { } } +bool addGtidInterval(gtid_set_t& gtid_executed, std::string server_uuid, int64_t txid_start, int64_t txid_end) { + bool updated = true; + + auto it = gtid_executed.find(server_uuid); + if (it == gtid_executed.end()) { + gtid_executed[server_uuid].emplace_back(txid_start, txid_end); + return updated; + } + + bool insert = true; + + // When ProxySQL reconnects with binlog reader, it might + // receive updated txid intervals in the bootstrap message. + // For example, + // before disconnection -> server_UUID:1-10 + // after reconnection -> server_UUID:1-19 + auto &txid_intervals = it->second; + for (auto &interval : txid_intervals) { + if (interval.first == txid_start) { + if(interval.second == txid_end) { + updated = false; + } else { + interval.second = txid_end; + } + insert = false; + break; + } + } + + if (insert) { + txid_intervals.emplace_back(txid_start, txid_end); + + } + + return updated; +} + void * GTID_syncer_run() { //struct ev_loop * gtid_ev_loop; //gtid_ev_loop = NULL; diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 7b579a231..4e5e2b136 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -59,7 +59,7 @@ class MyHGC; const int MYSQL_ERRORS_STATS_FIELD_NUM = 11; -struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_port); +struct ev_io * new_connect_watcher(char *address, uint16_t gtid_port, uint16_t mysql_port); void * GTID_syncer_run(); static int wait_for_mysql(MYSQL *mysql, int status) { @@ -1681,79 +1681,76 @@ bool MySQL_HostGroups_Manager::gtid_exists(MySrvC *mysrvc, char * gtid_uuid, uin } void MySQL_HostGroups_Manager::generate_mysql_gtid_executed_tables() { + // NOTE: We are required to lock while iterating over 'MyHostGroups'. Otherwise race conditions could take place, + // e.g. servers could be purged by 'purge_mysql_servers_table' and invalid memory be accessed. + wrlock(); + pthread_rwlock_wrlock(>id_rwlock); - // first, set them all as active = false + + // first, add them all as stale entries + std::unordered_set stale_server; std::unordered_map::iterator it = gtid_map.begin(); while(it != gtid_map.end()) { - GTID_Server_Data * gtid_si = it->second; - if (gtid_si) { - gtid_si->active = false; - } + stale_server.emplace(it->first); it++; } - // NOTE: We are required to lock while iterating over 'MyHostGroups'. Otherwise race conditions could take place, - // e.g. servers could be purged by 'purge_mysql_servers_table' and invalid memory be accessed. - wrlock(); for (unsigned int i=0; ilen; i++) { MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i); MySrvC *mysrvc=NULL; for (unsigned int j=0; jmysrvs->servers->len; j++) { mysrvc=myhgc->mysrvs->idx(j); if (mysrvc->gtid_port) { - std::string s1 = mysrvc->address; - s1.append(":"); - s1.append(std::to_string(mysrvc->port)); - std::unordered_map ::iterator it2; - it2 = gtid_map.find(s1); - GTID_Server_Data *gtid_is=NULL; - if (it2!=gtid_map.end()) { - gtid_is=it2->second; - if (gtid_is == NULL) { - gtid_map.erase(it2); - } + std::string srv = mysrvc->address; + srv.append(":"); + srv.append(std::to_string(mysrvc->port)); + + GTID_Server_Data *gtid_sd = nullptr; + it = gtid_map.find(srv); + if (it != gtid_map.end()) { + gtid_sd = it->second; + stale_server.erase(srv); } - if (gtid_is) { - gtid_is->active = true; - } else if (mysrvc->get_status() != MYSQL_SERVER_STATUS_OFFLINE_HARD) { - // we didn't find it. Create it - /* - struct ev_io *watcher = (struct ev_io *)malloc(sizeof(struct ev_io)); - gtid_is = new GTID_Server_Data(watcher, mysrvc->address, mysrvc->port, mysrvc->gtid_port); - gtid_map.emplace(s1,gtid_is); - */ - struct ev_io * c = NULL; - c = new_connector(mysrvc->address, mysrvc->gtid_port, mysrvc->port); - if (c) { - gtid_is = (GTID_Server_Data *)c->data; - gtid_map.emplace(s1,gtid_is); - //pthread_mutex_lock(&ev_loop_mutex); - ev_io_start(MyHGM->gtid_ev_loop,c); - //pthread_mutex_unlock(&ev_loop_mutex); + + if (gtid_sd && gtid_sd->active) { + continue; + } + + if (mysrvc->get_status() != MYSQL_SERVER_STATUS_OFFLINE_HARD) { + // a new server with gtid port + // OR an existing server, but we lost connection with binlog_reader + struct ev_io *cw = new_connect_watcher(mysrvc->address, mysrvc->gtid_port, mysrvc->port); + if (cw) { + if (!gtid_sd) { + gtid_sd = new GTID_Server_Data(cw, mysrvc->address, mysrvc->gtid_port, mysrvc->port); + cw->data = (void *)gtid_sd; + gtid_map.emplace(srv, gtid_sd); + } else { + gtid_sd->w = cw; + gtid_sd->active = true; + cw->data = (void *)gtid_sd; + } + ev_io_start(MyHGM->gtid_ev_loop, cw); } } } } } - wrunlock(); - std::vector to_remove; - it = gtid_map.begin(); - while(it != gtid_map.end()) { - GTID_Server_Data * gtid_si = it->second; - if (gtid_si && gtid_si->active == false) { - to_remove.push_back(it->first); + + for (auto &srv : stale_server) { + it = gtid_map.find(srv); + GTID_Server_Data *gtid_sd = it->second; + if (gtid_sd->w) { + ev_io_stop(MyHGM->gtid_ev_loop, gtid_sd->w); + close(gtid_sd->w->fd); + free(gtid_sd->w); } - it++; - } - for (std::vector::iterator it3=to_remove.begin(); it3!=to_remove.end(); ++it3) { - it = gtid_map.find(*it3); - GTID_Server_Data * gtid_si = it->second; - ev_io_stop(MyHGM->gtid_ev_loop, gtid_si->w); - close(gtid_si->w->fd); - free(gtid_si->w); - gtid_map.erase(*it3); + gtid_map.erase(srv); + delete gtid_sd; } + pthread_rwlock_unlock(>id_rwlock); + wrunlock(); } /** @@ -5807,48 +5804,39 @@ void MySQL_HostGroups_Manager::p_update_mysql_gtid_executed() { std::unordered_map::iterator it = gtid_map.begin(); while(it != gtid_map.end()) { - GTID_Server_Data* gtid_si = it->second; - std::string address {}; - std::string port {}; - std::string endpoint_id {}; - - if (gtid_si) { - address = std::string(gtid_si->address); - port = std::to_string(gtid_si->mysql_port); - } else { - std::string s = it->first; - std::size_t found = s.find_last_of(":"); - address = s.substr(0, found); - port = s.substr(found + 1); + GTID_Server_Data* gtid_sd = it->second; + if (!gtid_sd) { + // invalid state + continue; } - endpoint_id = address + ":" + port; - const auto& gitd_id_counter = this->status.p_gtid_executed_map.find(endpoint_id); - prometheus::Counter* gtid_counter = nullptr; + std::string endpoint = it->first; + std::string address = std::string(gtid_sd->address); + std::string port = std::to_string(gtid_sd->mysql_port); - if (gitd_id_counter == this->status.p_gtid_executed_map.end()) { - auto& gitd_counter = + prometheus::Counter* gtid_counter = nullptr; + const auto& pc_itr = this->status.p_gtid_executed_map.find(endpoint); + if (pc_itr == this->status.p_gtid_executed_map.end()) { + auto& gtid_counter_group = this->status.p_dyn_counter_array[p_hg_dyn_counter::gtid_executed]; - gtid_counter = std::addressof(gitd_counter->Add({ + gtid_counter = std::addressof(gtid_counter_group->Add({ { "hostname", address }, { "port", port }, })); this->status.p_gtid_executed_map.insert( { - endpoint_id, + endpoint, gtid_counter } ); } else { - gtid_counter = gitd_id_counter->second; + gtid_counter = pc_itr->second; } - if (gtid_si) { - const auto& cur_executed_gtid = gtid_counter->Value(); - gtid_counter->Increment(gtid_si->events_read - cur_executed_gtid); - } + const auto& cur_executed_gtid = gtid_counter->Value(); + gtid_counter->Increment(gtid_sd->events_read - cur_executed_gtid); it++; }