gtid: Refactor reconnect logic & prevent `events_count` reset

- This patch was originally added by commit 0a70fd5 and
  reverted by 8d1b5b5, prior to the release of `v3.0.3`.
- The following issues are addressed in this update,
  - Fix for `use-after-free` issue which occured during CI test.
  - Fix for deadlock issue between `GTID_syncer` and `MySQL_Worker`.

Signed-off-by: Wazir Ahmed <wazir@proxysql.com>
refactor-gtid-events-count_doc
Wazir Ahmed 5 months ago
parent dd847a15e1
commit 50c60284e6

@ -1,5 +1,11 @@
#ifndef CLASS_GTID_Server_Data_H #ifndef CLASS_GTID_Server_Data_H
#define CLASS_GTID_Server_Data_H #define CLASS_GTID_Server_Data_H
#include <cstddef>
#include <cstdint>
#include <string>
#include <proxysql_gtid.h>
class GTID_Server_Data { class GTID_Server_Data {
public: public:
char *address; char *address;
@ -24,4 +30,7 @@ class GTID_Server_Data {
void read_all_gtids(); void read_all_gtids();
void dump(); void dump();
}; };
bool addGtidInterval(gtid_set_t& gtid_executed, std::string server_uuid, int64_t txid_start, int64_t txid_end);
#endif // CLASS_GTID_Server_Data_H #endif // CLASS_GTID_Server_Data_H

@ -104,6 +104,7 @@
#include <stddef.h> #include <stddef.h>
#include <string.h> #include <string.h>
#include <sys/types.h> #include <sys/types.h>
#include <cstdint>
#include <algorithm> #include <algorithm>
#include <functional> #include <functional>
#include <iostream> #include <iostream>

@ -2,6 +2,7 @@
#define PROXYSQL_GTID #define PROXYSQL_GTID
// highly inspired by libslave // highly inspired by libslave
// https://github.com/vozbu/libslave/ // https://github.com/vozbu/libslave/
#include <string>
#include <unordered_map> #include <unordered_map>
#include <list> #include <list>
#include <utility> #include <utility>

@ -1,3 +1,4 @@
#include "GTID_Server_Data.h"
#include "MySQL_HostGroups_Manager.h" #include "MySQL_HostGroups_Manager.h"
#include "ev.h" #include "ev.h"
@ -47,21 +48,14 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) {
bool rc = true; bool rc = true;
rc = sd->readall(); rc = sd->readall();
if (rc == false) { if (rc == false) {
//delete sd;
std::string s1 = sd->address;
s1.append(":");
s1.append(std::to_string(sd->mysql_port));
MyHGM->gtid_missing_nodes = true; MyHGM->gtid_missing_nodes = true;
proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port); sd->active = false;
std::unordered_map <string, GTID_Server_Data *>::iterator it2; proxy_warning("GTID: failed to read from ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port);
it2 = MyHGM->gtid_map.find(s1);
if (it2 != MyHGM->gtid_map.end()) {
//MyHGM->gtid_map.erase(it2);
it2->second = NULL;
delete sd;
}
ev_io_stop(MyHGM->gtid_ev_loop, w); ev_io_stop(MyHGM->gtid_ev_loop, w);
close(w->fd);
free(w); free(w);
sd->w = nullptr;
} else { } else {
sd->dump(); sd->dump();
} }
@ -71,48 +65,37 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) {
void connect_cb(EV_P_ ev_io *w, int revents) { void connect_cb(EV_P_ ev_io *w, int revents) {
pthread_mutex_lock(&ev_loop_mutex); pthread_mutex_lock(&ev_loop_mutex);
struct ev_io * c = w;
if (revents & EV_WRITE) { if (revents & EV_WRITE) {
int optval = 0; int fd = w->fd;
socklen_t optlen = sizeof(optval); GTID_Server_Data *sd = (GTID_Server_Data *)w->data;
if ((getsockopt(w->fd, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1) ||
(optval != 0)) { // connect() completed, this watcher is no longer needed
/* Connection failed; try the next address in the list. */ ev_io_stop(MyHGM->gtid_ev_loop, w);
//int errnum = optval ? optval : errno; free(w);
ev_io_stop(MyHGM->gtid_ev_loop, w); sd->w = nullptr;
close(w->fd);
// Based on fd status, proceed to next step -> waiting for read event on the socket
int error = 0;
socklen_t optlen = sizeof(error);
int rc = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &optlen);
if (rc == -1 || error != 0) {
/* connection failed */
MyHGM->gtid_missing_nodes = true; MyHGM->gtid_missing_nodes = true;
GTID_Server_Data * custom_data = (GTID_Server_Data *)w->data; sd->active = false;
GTID_Server_Data *sd = custom_data;
std::string s1 = sd->address;
s1.append(":");
s1.append(std::to_string(sd->mysql_port));
proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port); proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port);
std::unordered_map <string, GTID_Server_Data *>::iterator it2; close(fd);
it2 = MyHGM->gtid_map.find(s1);
if (it2 != MyHGM->gtid_map.end()) {
//MyHGM->gtid_map.erase(it2);
it2->second = NULL;
delete sd;
}
//delete custom_data;
free(c);
} else { } else {
ev_io_stop(MyHGM->gtid_ev_loop, w); struct ev_io *read_watcher = (struct ev_io *) malloc(sizeof(struct ev_io));
int fd=w->fd; read_watcher->data = sd;
struct ev_io * new_w = (struct ev_io*) malloc(sizeof(struct ev_io)); sd->w = read_watcher;
new_w->data = w->data; ev_io_init(read_watcher, reader_cb, fd, EV_READ);
GTID_Server_Data * custom_data = (GTID_Server_Data *)new_w->data; ev_io_start(MyHGM->gtid_ev_loop, read_watcher);
custom_data->w = new_w;
free(w);
ev_io_init(new_w, reader_cb, fd, EV_READ);
ev_io_start(MyHGM->gtid_ev_loop, new_w);
} }
} }
pthread_mutex_unlock(&ev_loop_mutex); pthread_mutex_unlock(&ev_loop_mutex);
} }
struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_port) { struct ev_io * new_connect_watcher(char *address, uint16_t gtid_port, uint16_t mysql_port) {
int s; int s;
if ((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) { if ((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
@ -149,8 +132,6 @@ struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_p
struct ev_io *c = (struct ev_io *)malloc(sizeof(struct ev_io)); struct ev_io *c = (struct ev_io *)malloc(sizeof(struct ev_io));
if (c) { if (c) {
ev_io_init(c, connect_cb, s, EV_WRITE); ev_io_init(c, connect_cb, s, EV_WRITE);
GTID_Server_Data * custom_data = new GTID_Server_Data(c, address, gtid_port, mysql_port);
c->data = (void *)custom_data;
return c; return c;
} }
/* else error */ /* else error */
@ -158,8 +139,6 @@ struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_p
return NULL; return NULL;
} }
GTID_Server_Data::GTID_Server_Data(struct ev_io *_w, char *_address, uint16_t _port, uint16_t _mysql_port) { GTID_Server_Data::GTID_Server_Data(struct ev_io *_w, char *_address, uint16_t _port, uint16_t _mysql_port) {
active = true; active = true;
w = _w; w = _w;
@ -188,26 +167,32 @@ GTID_Server_Data::~GTID_Server_Data() {
} }
bool GTID_Server_Data::readall() { bool GTID_Server_Data::readall() {
bool ret = true;
if (size == len) { if (size == len) {
// buffer is full, expand // buffer is full, expand
resize(len*2); resize(len * 2);
} }
int rc = 0; int rc = 0;
rc = read(w->fd,data+len,size-len); rc = read(w->fd, data+len, size-len);
if (rc > 0) { if (rc > 0) {
len += rc; len += rc;
return true;
}
int myerr = errno;
if (rc == 0) {
proxy_info("Read returned EOF\n");
return false;
}
// rc == -1
proxy_error("Read failed, error %d\n", myerr);
if(myerr == EINTR || myerr == EAGAIN) {
// non-blocking fd, so this should not be considered as an error
return true;
} else { } else {
int myerr = errno; return false;
proxy_error("Read returned %d bytes, error %d\n", rc, myerr);
if (
(rc == 0) ||
(rc==-1 && myerr != EINTR && myerr != EAGAIN)
) {
ret = false;
}
} }
return ret;
} }
@ -239,9 +224,6 @@ void GTID_Server_Data::dump() {
return; return;
} }
read_all_gtids(); read_all_gtids();
//int rc = write(1,data+pos,len-pos);
fflush(stdout);
///pos += rc;
if (pos >= len/2) { if (pos >= len/2) {
memmove(data,data+pos,len-pos); memmove(data,data+pos,len-pos);
len = len-pos; len = len-pos;
@ -285,13 +267,12 @@ bool GTID_Server_Data::read_next_gtid() {
bs[l-3] = '\0'; bs[l-3] = '\0';
char *saveptr1=NULL; char *saveptr1=NULL;
char *saveptr2=NULL; char *saveptr2=NULL;
//char *saveptr3=NULL;
char *token = NULL; char *token = NULL;
char *subtoken = NULL; char *subtoken = NULL;
//char *subtoken2 = NULL;
char *str1 = NULL; char *str1 = NULL;
char *str2 = NULL; char *str2 = NULL;
//char *str3 = NULL; bool updated = false;
for (str1 = bs; ; str1 = NULL) { for (str1 = bs; ; str1 = NULL) {
token = strtok_r(str1, ",", &saveptr1); token = strtok_r(str1, ",", &saveptr1);
if (token == NULL) { if (token == NULL) {
@ -312,34 +293,30 @@ bool GTID_Server_Data::read_next_gtid() {
p++; p++;
} }
} }
//fprintf(stdout,"BS from %s\n", uuid_server);
} else { // we are reading the trxids } else { // we are reading the trxids
uint64_t trx_from; uint64_t trx_from;
uint64_t trx_to; uint64_t trx_to;
sscanf(subtoken,"%lu-%lu",&trx_from,&trx_to); sscanf(subtoken,"%lu-%lu",&trx_from,&trx_to);
//fprintf(stdout,"BS from %s:%lu-%lu\n", uuid_server, trx_from, trx_to); updated = addGtidInterval(gtid_executed, uuid_server, trx_from, trx_to) || updated;
std::string s = uuid_server;
gtid_executed[s].emplace_back(trx_from, trx_to);
} }
} }
} }
pos += l+1; pos += l+1;
free(bs); free(bs);
//return true;
if (updated) {
events_read++;
}
} else { } else {
strncpy(rec_msg,data+pos,l); strncpy(rec_msg,data+pos,l);
pos += l+1; pos += l+1;
rec_msg[l] = 0; rec_msg[l] = 0;
//int rc = write(1,data+pos,l+1);
//fprintf(stdout,"%s\n", rec_msg);
if (rec_msg[0]=='I') { if (rec_msg[0]=='I') {
//char rec_uuid[80];
uint64_t rec_trxid = 0; uint64_t rec_trxid = 0;
char *a = NULL; char *a = NULL;
int ul = 0; int ul = 0;
switch (rec_msg[1]) { switch (rec_msg[1]) {
case '1': case '1':
//sscanf(rec_msg+3,"%s\:%lu",uuid_server,&rec_trxid);
a = strchr(rec_msg+3,':'); a = strchr(rec_msg+3,':');
ul = a-rec_msg-3; ul = a-rec_msg-3;
strncpy(uuid_server,rec_msg+3,ul); strncpy(uuid_server,rec_msg+3,ul);
@ -347,21 +324,17 @@ bool GTID_Server_Data::read_next_gtid() {
rec_trxid=atoll(a+1); rec_trxid=atoll(a+1);
break; break;
case '2': case '2':
//sscanf(rec_msg+3,"%lu",&rec_trxid);
rec_trxid=atoll(rec_msg+3); rec_trxid=atoll(rec_msg+3);
break; break;
default: default:
break; break;
} }
//fprintf(stdout,"%s:%lu\n", uuid_server, rec_trxid);
std::string s = uuid_server; std::string s = uuid_server;
gtid_t new_gtid = std::make_pair(s,rec_trxid); gtid_t new_gtid = std::make_pair(s,rec_trxid);
addGtid(new_gtid,gtid_executed); addGtid(new_gtid,gtid_executed);
events_read++; events_read++;
//return true;
} }
} }
//std::cout << "current pos " << gtid_executed_to_string(gtid_executed) << std::endl << std::endl;
return true; return true;
} }
@ -439,6 +412,43 @@ void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) {
} }
} }
bool addGtidInterval(gtid_set_t& gtid_executed, std::string server_uuid, int64_t txid_start, int64_t txid_end) {
bool updated = true;
auto it = gtid_executed.find(server_uuid);
if (it == gtid_executed.end()) {
gtid_executed[server_uuid].emplace_back(txid_start, txid_end);
return updated;
}
bool insert = true;
// When ProxySQL reconnects with binlog reader, it might
// receive updated txid intervals in the bootstrap message.
// For example,
// before disconnection -> server_UUID:1-10
// after reconnection -> server_UUID:1-19
auto &txid_intervals = it->second;
for (auto &interval : txid_intervals) {
if (interval.first == txid_start) {
if(interval.second == txid_end) {
updated = false;
} else {
interval.second = txid_end;
}
insert = false;
break;
}
}
if (insert) {
txid_intervals.emplace_back(txid_start, txid_end);
}
return updated;
}
void * GTID_syncer_run() { void * GTID_syncer_run() {
//struct ev_loop * gtid_ev_loop; //struct ev_loop * gtid_ev_loop;
//gtid_ev_loop = NULL; //gtid_ev_loop = NULL;

@ -59,7 +59,7 @@ class MyHGC;
const int MYSQL_ERRORS_STATS_FIELD_NUM = 11; const int MYSQL_ERRORS_STATS_FIELD_NUM = 11;
struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_port); struct ev_io * new_connect_watcher(char *address, uint16_t gtid_port, uint16_t mysql_port);
void * GTID_syncer_run(); void * GTID_syncer_run();
static int wait_for_mysql(MYSQL *mysql, int status) { static int wait_for_mysql(MYSQL *mysql, int status) {
@ -1681,79 +1681,76 @@ bool MySQL_HostGroups_Manager::gtid_exists(MySrvC *mysrvc, char * gtid_uuid, uin
} }
void MySQL_HostGroups_Manager::generate_mysql_gtid_executed_tables() { void MySQL_HostGroups_Manager::generate_mysql_gtid_executed_tables() {
// NOTE: We are required to lock while iterating over 'MyHostGroups'. Otherwise race conditions could take place,
// e.g. servers could be purged by 'purge_mysql_servers_table' and invalid memory be accessed.
wrlock();
pthread_rwlock_wrlock(&gtid_rwlock); pthread_rwlock_wrlock(&gtid_rwlock);
// first, set them all as active = false
// first, add them all as stale entries
std::unordered_set<string> stale_server;
std::unordered_map<string, GTID_Server_Data *>::iterator it = gtid_map.begin(); std::unordered_map<string, GTID_Server_Data *>::iterator it = gtid_map.begin();
while(it != gtid_map.end()) { while(it != gtid_map.end()) {
GTID_Server_Data * gtid_si = it->second; stale_server.emplace(it->first);
if (gtid_si) {
gtid_si->active = false;
}
it++; it++;
} }
// NOTE: We are required to lock while iterating over 'MyHostGroups'. Otherwise race conditions could take place,
// e.g. servers could be purged by 'purge_mysql_servers_table' and invalid memory be accessed.
wrlock();
for (unsigned int i=0; i<MyHostGroups->len; i++) { for (unsigned int i=0; i<MyHostGroups->len; i++) {
MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i); MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i);
MySrvC *mysrvc=NULL; MySrvC *mysrvc=NULL;
for (unsigned int j=0; j<myhgc->mysrvs->servers->len; j++) { for (unsigned int j=0; j<myhgc->mysrvs->servers->len; j++) {
mysrvc=myhgc->mysrvs->idx(j); mysrvc=myhgc->mysrvs->idx(j);
if (mysrvc->gtid_port) { if (mysrvc->gtid_port) {
std::string s1 = mysrvc->address; std::string srv = mysrvc->address;
s1.append(":"); srv.append(":");
s1.append(std::to_string(mysrvc->port)); srv.append(std::to_string(mysrvc->port));
std::unordered_map <string, GTID_Server_Data *>::iterator it2;
it2 = gtid_map.find(s1); GTID_Server_Data *gtid_sd = nullptr;
GTID_Server_Data *gtid_is=NULL; it = gtid_map.find(srv);
if (it2!=gtid_map.end()) { if (it != gtid_map.end()) {
gtid_is=it2->second; gtid_sd = it->second;
if (gtid_is == NULL) { stale_server.erase(srv);
gtid_map.erase(it2);
}
} }
if (gtid_is) {
gtid_is->active = true; if (gtid_sd && gtid_sd->active) {
} else if (mysrvc->get_status() != MYSQL_SERVER_STATUS_OFFLINE_HARD) { continue;
// we didn't find it. Create it }
/*
struct ev_io *watcher = (struct ev_io *)malloc(sizeof(struct ev_io)); if (mysrvc->get_status() != MYSQL_SERVER_STATUS_OFFLINE_HARD) {
gtid_is = new GTID_Server_Data(watcher, mysrvc->address, mysrvc->port, mysrvc->gtid_port); // a new server with gtid port
gtid_map.emplace(s1,gtid_is); // OR an existing server, but we lost connection with binlog_reader
*/ struct ev_io *cw = new_connect_watcher(mysrvc->address, mysrvc->gtid_port, mysrvc->port);
struct ev_io * c = NULL; if (cw) {
c = new_connector(mysrvc->address, mysrvc->gtid_port, mysrvc->port); if (!gtid_sd) {
if (c) { gtid_sd = new GTID_Server_Data(cw, mysrvc->address, mysrvc->gtid_port, mysrvc->port);
gtid_is = (GTID_Server_Data *)c->data; cw->data = (void *)gtid_sd;
gtid_map.emplace(s1,gtid_is); gtid_map.emplace(srv, gtid_sd);
//pthread_mutex_lock(&ev_loop_mutex); } else {
ev_io_start(MyHGM->gtid_ev_loop,c); gtid_sd->w = cw;
//pthread_mutex_unlock(&ev_loop_mutex); gtid_sd->active = true;
cw->data = (void *)gtid_sd;
}
ev_io_start(MyHGM->gtid_ev_loop, cw);
} }
} }
} }
} }
} }
wrunlock();
std::vector<string> to_remove; for (auto &srv : stale_server) {
it = gtid_map.begin(); it = gtid_map.find(srv);
while(it != gtid_map.end()) { GTID_Server_Data *gtid_sd = it->second;
GTID_Server_Data * gtid_si = it->second; if (gtid_sd->w) {
if (gtid_si && gtid_si->active == false) { ev_io_stop(MyHGM->gtid_ev_loop, gtid_sd->w);
to_remove.push_back(it->first); close(gtid_sd->w->fd);
free(gtid_sd->w);
} }
it++; gtid_map.erase(srv);
} delete gtid_sd;
for (std::vector<string>::iterator it3=to_remove.begin(); it3!=to_remove.end(); ++it3) {
it = gtid_map.find(*it3);
GTID_Server_Data * gtid_si = it->second;
ev_io_stop(MyHGM->gtid_ev_loop, gtid_si->w);
close(gtid_si->w->fd);
free(gtid_si->w);
gtid_map.erase(*it3);
} }
pthread_rwlock_unlock(&gtid_rwlock); pthread_rwlock_unlock(&gtid_rwlock);
wrunlock();
} }
/** /**
@ -5807,48 +5804,39 @@ void MySQL_HostGroups_Manager::p_update_mysql_gtid_executed() {
std::unordered_map<string, GTID_Server_Data*>::iterator it = gtid_map.begin(); std::unordered_map<string, GTID_Server_Data*>::iterator it = gtid_map.begin();
while(it != gtid_map.end()) { while(it != gtid_map.end()) {
GTID_Server_Data* gtid_si = it->second; GTID_Server_Data* gtid_sd = it->second;
std::string address {}; if (!gtid_sd) {
std::string port {}; // invalid state
std::string endpoint_id {}; continue;
if (gtid_si) {
address = std::string(gtid_si->address);
port = std::to_string(gtid_si->mysql_port);
} else {
std::string s = it->first;
std::size_t found = s.find_last_of(":");
address = s.substr(0, found);
port = s.substr(found + 1);
} }
endpoint_id = address + ":" + port;
const auto& gitd_id_counter = this->status.p_gtid_executed_map.find(endpoint_id); std::string endpoint = it->first;
prometheus::Counter* gtid_counter = nullptr; std::string address = std::string(gtid_sd->address);
std::string port = std::to_string(gtid_sd->mysql_port);
if (gitd_id_counter == this->status.p_gtid_executed_map.end()) { prometheus::Counter* gtid_counter = nullptr;
auto& gitd_counter = const auto& pc_itr = this->status.p_gtid_executed_map.find(endpoint);
if (pc_itr == this->status.p_gtid_executed_map.end()) {
auto& gtid_counter_group =
this->status.p_dyn_counter_array[p_hg_dyn_counter::gtid_executed]; this->status.p_dyn_counter_array[p_hg_dyn_counter::gtid_executed];
gtid_counter = std::addressof(gitd_counter->Add({ gtid_counter = std::addressof(gtid_counter_group->Add({
{ "hostname", address }, { "hostname", address },
{ "port", port }, { "port", port },
})); }));
this->status.p_gtid_executed_map.insert( this->status.p_gtid_executed_map.insert(
{ {
endpoint_id, endpoint,
gtid_counter gtid_counter
} }
); );
} else { } else {
gtid_counter = gitd_id_counter->second; gtid_counter = pc_itr->second;
} }
if (gtid_si) { const auto& cur_executed_gtid = gtid_counter->Value();
const auto& cur_executed_gtid = gtid_counter->Value(); gtid_counter->Increment(gtid_sd->events_read - cur_executed_gtid);
gtid_counter->Increment(gtid_si->events_read - cur_executed_gtid);
}
it++; it++;
} }

Loading…
Cancel
Save