diff --git a/include/GTID_Server_Data.h b/include/GTID_Server_Data.h index 0c4ca00cb..9bc9219fd 100644 --- a/include/GTID_Server_Data.h +++ b/include/GTID_Server_Data.h @@ -1,11 +1,5 @@ #ifndef CLASS_GTID_Server_Data_H #define CLASS_GTID_Server_Data_H - -#include -#include -#include -#include - class GTID_Server_Data { public: char *address; @@ -30,7 +24,4 @@ class GTID_Server_Data { void read_all_gtids(); void dump(); }; - -bool addGtidInterval(gtid_set_t& gtid_executed, std::string server_uuid, int64_t txid_start, int64_t txid_end); - #endif // CLASS_GTID_Server_Data_H diff --git a/include/btree.h b/include/btree.h index 02cee7fbc..cd54d2fbe 100644 --- a/include/btree.h +++ b/include/btree.h @@ -104,7 +104,6 @@ #include #include #include -#include #include #include #include diff --git a/include/proxysql_gtid.h b/include/proxysql_gtid.h index 836e95c7a..b2e11614c 100644 --- a/include/proxysql_gtid.h +++ b/include/proxysql_gtid.h @@ -2,7 +2,6 @@ #define PROXYSQL_GTID // highly inspired by libslave // https://github.com/vozbu/libslave/ -#include #include #include #include diff --git a/lib/GTID_Server_Data.cpp b/lib/GTID_Server_Data.cpp index 828d1fd06..c05828398 100644 --- a/lib/GTID_Server_Data.cpp +++ b/lib/GTID_Server_Data.cpp @@ -1,4 +1,3 @@ -#include "GTID_Server_Data.h" #include "MySQL_HostGroups_Manager.h" #include "ev.h" @@ -48,12 +47,20 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) { bool rc = true; rc = sd->readall(); if (rc == false) { + //delete sd; + std::string s1 = sd->address; + s1.append(":"); + s1.append(std::to_string(sd->mysql_port)); MyHGM->gtid_missing_nodes = true; - sd->active = false; - proxy_warning("GTID: failed to read from ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port); - + proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port); + std::unordered_map ::iterator it2; + it2 = MyHGM->gtid_map.find(s1); + if (it2 != MyHGM->gtid_map.end()) { + //MyHGM->gtid_map.erase(it2); + it2->second = NULL; + delete sd; + } ev_io_stop(MyHGM->gtid_ev_loop, w); - close(w->fd); free(w); } else { sd->dump(); @@ -64,36 +71,48 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) { void connect_cb(EV_P_ ev_io *w, int revents) { pthread_mutex_lock(&ev_loop_mutex); + struct ev_io * c = w; if (revents & EV_WRITE) { - int fd = w->fd; - GTID_Server_Data *sd = (GTID_Server_Data *)w->data; - - // connect() completed, this watcher is no longer needed - ev_io_stop(MyHGM->gtid_ev_loop, w); - free(w); - - // Based on fd status, proceed to next step -> waiting for read event on the socket - int error = 0; - socklen_t optlen = sizeof(error); - int rc = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &optlen); - if (rc == -1 || error != 0) { - /* connection failed */ + int optval = 0; + socklen_t optlen = sizeof(optval); + if ((getsockopt(w->fd, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1) || + (optval != 0)) { + /* Connection failed; try the next address in the list. */ + //int errnum = optval ? optval : errno; + ev_io_stop(MyHGM->gtid_ev_loop, w); + close(w->fd); MyHGM->gtid_missing_nodes = true; - sd->active = false; + GTID_Server_Data * custom_data = (GTID_Server_Data *)w->data; + GTID_Server_Data *sd = custom_data; + std::string s1 = sd->address; + s1.append(":"); + s1.append(std::to_string(sd->mysql_port)); proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port); - close(fd); + std::unordered_map ::iterator it2; + it2 = MyHGM->gtid_map.find(s1); + if (it2 != MyHGM->gtid_map.end()) { + //MyHGM->gtid_map.erase(it2); + it2->second = NULL; + delete sd; + } + //delete custom_data; + free(c); } else { - struct ev_io *read_watcher = (struct ev_io *) malloc(sizeof(struct ev_io)); - read_watcher->data = sd; - sd->w = read_watcher; - ev_io_init(read_watcher, reader_cb, fd, EV_READ); - ev_io_start(MyHGM->gtid_ev_loop, read_watcher); + ev_io_stop(MyHGM->gtid_ev_loop, w); + int fd=w->fd; + struct ev_io * new_w = (struct ev_io*) malloc(sizeof(struct ev_io)); + new_w->data = w->data; + GTID_Server_Data * custom_data = (GTID_Server_Data *)new_w->data; + custom_data->w = new_w; + free(w); + ev_io_init(new_w, reader_cb, fd, EV_READ); + ev_io_start(MyHGM->gtid_ev_loop, new_w); } } pthread_mutex_unlock(&ev_loop_mutex); } -struct ev_io * new_connect_watcher(char *address, uint16_t gtid_port, uint16_t mysql_port) { +struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_port) { int s; if ((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) { @@ -130,6 +149,8 @@ struct ev_io * new_connect_watcher(char *address, uint16_t gtid_port, uint16_t m struct ev_io *c = (struct ev_io *)malloc(sizeof(struct ev_io)); if (c) { ev_io_init(c, connect_cb, s, EV_WRITE); + GTID_Server_Data * custom_data = new GTID_Server_Data(c, address, gtid_port, mysql_port); + c->data = (void *)custom_data; return c; } /* else error */ @@ -137,6 +158,8 @@ struct ev_io * new_connect_watcher(char *address, uint16_t gtid_port, uint16_t m return NULL; } + + GTID_Server_Data::GTID_Server_Data(struct ev_io *_w, char *_address, uint16_t _port, uint16_t _mysql_port) { active = true; w = _w; @@ -165,32 +188,26 @@ GTID_Server_Data::~GTID_Server_Data() { } bool GTID_Server_Data::readall() { + bool ret = true; if (size == len) { // buffer is full, expand - resize(len * 2); + resize(len*2); } - int rc = 0; - rc = read(w->fd, data+len, size-len); + rc = read(w->fd,data+len,size-len); if (rc > 0) { len += rc; - return true; - } - - int myerr = errno; - if (rc == 0) { - proxy_info("Read returned EOF\n"); - return false; - } - - // rc == -1 - proxy_error("Read failed, error %d\n", myerr); - if(myerr == EINTR || myerr == EAGAIN) { - // non-blocking fd, so this should not be considered as an error - return true; } else { - return false; + int myerr = errno; + proxy_error("Read returned %d bytes, error %d\n", rc, myerr); + if ( + (rc == 0) || + (rc==-1 && myerr != EINTR && myerr != EAGAIN) + ) { + ret = false; + } } + return ret; } @@ -222,6 +239,9 @@ void GTID_Server_Data::dump() { return; } read_all_gtids(); + //int rc = write(1,data+pos,len-pos); + fflush(stdout); + ///pos += rc; if (pos >= len/2) { memmove(data,data+pos,len-pos); len = len-pos; @@ -265,12 +285,13 @@ bool GTID_Server_Data::read_next_gtid() { bs[l-3] = '\0'; char *saveptr1=NULL; char *saveptr2=NULL; + //char *saveptr3=NULL; char *token = NULL; char *subtoken = NULL; + //char *subtoken2 = NULL; char *str1 = NULL; char *str2 = NULL; - bool updated = false; - + //char *str3 = NULL; for (str1 = bs; ; str1 = NULL) { token = strtok_r(str1, ",", &saveptr1); if (token == NULL) { @@ -291,30 +312,34 @@ bool GTID_Server_Data::read_next_gtid() { p++; } } + //fprintf(stdout,"BS from %s\n", uuid_server); } else { // we are reading the trxids uint64_t trx_from; uint64_t trx_to; sscanf(subtoken,"%lu-%lu",&trx_from,&trx_to); - updated = addGtidInterval(gtid_executed, uuid_server, trx_from, trx_to) || updated; + //fprintf(stdout,"BS from %s:%lu-%lu\n", uuid_server, trx_from, trx_to); + std::string s = uuid_server; + gtid_executed[s].emplace_back(trx_from, trx_to); } } } pos += l+1; free(bs); - - if (updated) { - events_read++; - } + //return true; } else { strncpy(rec_msg,data+pos,l); pos += l+1; rec_msg[l] = 0; + //int rc = write(1,data+pos,l+1); + //fprintf(stdout,"%s\n", rec_msg); if (rec_msg[0]=='I') { + //char rec_uuid[80]; uint64_t rec_trxid = 0; char *a = NULL; int ul = 0; switch (rec_msg[1]) { case '1': + //sscanf(rec_msg+3,"%s\:%lu",uuid_server,&rec_trxid); a = strchr(rec_msg+3,':'); ul = a-rec_msg-3; strncpy(uuid_server,rec_msg+3,ul); @@ -322,17 +347,21 @@ bool GTID_Server_Data::read_next_gtid() { rec_trxid=atoll(a+1); break; case '2': + //sscanf(rec_msg+3,"%lu",&rec_trxid); rec_trxid=atoll(rec_msg+3); break; default: break; } + //fprintf(stdout,"%s:%lu\n", uuid_server, rec_trxid); std::string s = uuid_server; gtid_t new_gtid = std::make_pair(s,rec_trxid); addGtid(new_gtid,gtid_executed); events_read++; + //return true; } } + //std::cout << "current pos " << gtid_executed_to_string(gtid_executed) << std::endl << std::endl; return true; } @@ -410,43 +439,6 @@ void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) { } } -bool addGtidInterval(gtid_set_t& gtid_executed, std::string server_uuid, int64_t txid_start, int64_t txid_end) { - bool updated = true; - - auto it = gtid_executed.find(server_uuid); - if (it == gtid_executed.end()) { - gtid_executed[server_uuid].emplace_back(txid_start, txid_end); - return updated; - } - - bool insert = true; - - // When ProxySQL reconnects with binlog reader, it might - // receive updated txid intervals in the bootstrap message. - // For example, - // before disconnection -> server_UUID:1-10 - // after reconnection -> server_UUID:1-19 - auto &txid_intervals = it->second; - for (auto &interval : txid_intervals) { - if (interval.first == txid_start) { - if(interval.second == txid_end) { - updated = false; - } else { - interval.second = txid_end; - } - insert = false; - break; - } - } - - if (insert) { - txid_intervals.emplace_back(txid_start, txid_end); - - } - - return updated; -} - void * GTID_syncer_run() { //struct ev_loop * gtid_ev_loop; //gtid_ev_loop = NULL; diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 2babdf459..7b579a231 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -59,7 +59,7 @@ class MyHGC; const int MYSQL_ERRORS_STATS_FIELD_NUM = 11; -struct ev_io * new_connect_watcher(char *address, uint16_t gtid_port, uint16_t mysql_port); +struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_port); void * GTID_syncer_run(); static int wait_for_mysql(MYSQL *mysql, int status) { @@ -1682,12 +1682,13 @@ bool MySQL_HostGroups_Manager::gtid_exists(MySrvC *mysrvc, char * gtid_uuid, uin void MySQL_HostGroups_Manager::generate_mysql_gtid_executed_tables() { pthread_rwlock_wrlock(>id_rwlock); - - // first, add them all as stale entries - std::unordered_set stale_server; + // first, set them all as active = false std::unordered_map::iterator it = gtid_map.begin(); while(it != gtid_map.end()) { - stale_server.emplace(it->first); + GTID_Server_Data * gtid_si = it->second; + if (gtid_si) { + gtid_si->active = false; + } it++; } @@ -1700,52 +1701,58 @@ void MySQL_HostGroups_Manager::generate_mysql_gtid_executed_tables() { for (unsigned int j=0; jmysrvs->servers->len; j++) { mysrvc=myhgc->mysrvs->idx(j); if (mysrvc->gtid_port) { - std::string srv = mysrvc->address; - srv.append(":"); - srv.append(std::to_string(mysrvc->port)); - - GTID_Server_Data *gtid_sd = nullptr; - it = gtid_map.find(srv); - if (it != gtid_map.end()) { - gtid_sd = it->second; - stale_server.erase(srv); - } - - if (gtid_sd && gtid_sd->active) { - continue; + std::string s1 = mysrvc->address; + s1.append(":"); + s1.append(std::to_string(mysrvc->port)); + std::unordered_map ::iterator it2; + it2 = gtid_map.find(s1); + GTID_Server_Data *gtid_is=NULL; + if (it2!=gtid_map.end()) { + gtid_is=it2->second; + if (gtid_is == NULL) { + gtid_map.erase(it2); + } } - - if (mysrvc->get_status() != MYSQL_SERVER_STATUS_OFFLINE_HARD) { - // a new server with gtid port - // OR an existing server, but we lost connection with binlog_reader - struct ev_io *cw = new_connect_watcher(mysrvc->address, mysrvc->gtid_port, mysrvc->port); - if (cw) { - if (!gtid_sd) { - gtid_sd = new GTID_Server_Data(cw, mysrvc->address, mysrvc->gtid_port, mysrvc->port); - cw->data = (void *)gtid_sd; - gtid_map.emplace(srv, gtid_sd); - } else { - gtid_sd->w = cw; - gtid_sd->active = true; - cw->data = (void *)gtid_sd; - } - ev_io_start(MyHGM->gtid_ev_loop, cw); + if (gtid_is) { + gtid_is->active = true; + } else if (mysrvc->get_status() != MYSQL_SERVER_STATUS_OFFLINE_HARD) { + // we didn't find it. Create it + /* + struct ev_io *watcher = (struct ev_io *)malloc(sizeof(struct ev_io)); + gtid_is = new GTID_Server_Data(watcher, mysrvc->address, mysrvc->port, mysrvc->gtid_port); + gtid_map.emplace(s1,gtid_is); + */ + struct ev_io * c = NULL; + c = new_connector(mysrvc->address, mysrvc->gtid_port, mysrvc->port); + if (c) { + gtid_is = (GTID_Server_Data *)c->data; + gtid_map.emplace(s1,gtid_is); + //pthread_mutex_lock(&ev_loop_mutex); + ev_io_start(MyHGM->gtid_ev_loop,c); + //pthread_mutex_unlock(&ev_loop_mutex); } } } } } wrunlock(); - - for (auto &srv : stale_server) { - it = gtid_map.find(srv); - GTID_Server_Data *gtid_sd = it->second; - ev_io_stop(MyHGM->gtid_ev_loop, gtid_sd->w); - close(gtid_sd->w->fd); - free(gtid_sd->w); - gtid_map.erase(srv); + std::vector to_remove; + it = gtid_map.begin(); + while(it != gtid_map.end()) { + GTID_Server_Data * gtid_si = it->second; + if (gtid_si && gtid_si->active == false) { + to_remove.push_back(it->first); + } + it++; + } + for (std::vector::iterator it3=to_remove.begin(); it3!=to_remove.end(); ++it3) { + it = gtid_map.find(*it3); + GTID_Server_Data * gtid_si = it->second; + ev_io_stop(MyHGM->gtid_ev_loop, gtid_si->w); + close(gtid_si->w->fd); + free(gtid_si->w); + gtid_map.erase(*it3); } - pthread_rwlock_unlock(>id_rwlock); } @@ -5800,39 +5807,48 @@ void MySQL_HostGroups_Manager::p_update_mysql_gtid_executed() { std::unordered_map::iterator it = gtid_map.begin(); while(it != gtid_map.end()) { - GTID_Server_Data* gtid_sd = it->second; - if (!gtid_sd) { - // invalid state - continue; - } + GTID_Server_Data* gtid_si = it->second; + std::string address {}; + std::string port {}; + std::string endpoint_id {}; - std::string endpoint = it->first; - std::string address = std::string(gtid_sd->address); - std::string port = std::to_string(gtid_sd->mysql_port); + if (gtid_si) { + address = std::string(gtid_si->address); + port = std::to_string(gtid_si->mysql_port); + } else { + std::string s = it->first; + std::size_t found = s.find_last_of(":"); + address = s.substr(0, found); + port = s.substr(found + 1); + } + endpoint_id = address + ":" + port; + const auto& gitd_id_counter = this->status.p_gtid_executed_map.find(endpoint_id); prometheus::Counter* gtid_counter = nullptr; - const auto& pc_itr = this->status.p_gtid_executed_map.find(endpoint); - if (pc_itr == this->status.p_gtid_executed_map.end()) { - auto& gtid_counter_group = + + if (gitd_id_counter == this->status.p_gtid_executed_map.end()) { + auto& gitd_counter = this->status.p_dyn_counter_array[p_hg_dyn_counter::gtid_executed]; - gtid_counter = std::addressof(gtid_counter_group->Add({ + gtid_counter = std::addressof(gitd_counter->Add({ { "hostname", address }, { "port", port }, })); this->status.p_gtid_executed_map.insert( { - endpoint, + endpoint_id, gtid_counter } ); } else { - gtid_counter = pc_itr->second; + gtid_counter = gitd_id_counter->second; } - const auto& cur_executed_gtid = gtid_counter->Value(); - gtid_counter->Increment(gtid_sd->events_read - cur_executed_gtid); + if (gtid_si) { + const auto& cur_executed_gtid = gtid_counter->Value(); + gtid_counter->Increment(gtid_si->events_read - cur_executed_gtid); + } it++; }