gtid: Refactor reconnect logic & prevent `events_count` reset

- This patch was originally added by commit 0a70fd5 and
  reverted by 8d1b5b5, prior to the release of `v3.0.3`.
- The following issues are addressed in this update,
  - Fix for `use-after-free` issue which occured during CI test.
  - Fix for deadlock issue between `GTID_syncer` and `MySQL_Worker`.

Signed-off-by: Wazir Ahmed <wazir@proxysql.com>
refactor-gtid-events-count_doc
Wazir Ahmed 5 months ago
parent dd847a15e1
commit 50c60284e6

@ -1,5 +1,11 @@
#ifndef CLASS_GTID_Server_Data_H
#define CLASS_GTID_Server_Data_H
#include <cstddef>
#include <cstdint>
#include <string>
#include <proxysql_gtid.h>
class GTID_Server_Data {
public:
char *address;
@ -24,4 +30,7 @@ class GTID_Server_Data {
void read_all_gtids();
void dump();
};
bool addGtidInterval(gtid_set_t& gtid_executed, std::string server_uuid, int64_t txid_start, int64_t txid_end);
#endif // CLASS_GTID_Server_Data_H

@ -104,6 +104,7 @@
#include <stddef.h>
#include <string.h>
#include <sys/types.h>
#include <cstdint>
#include <algorithm>
#include <functional>
#include <iostream>

@ -2,6 +2,7 @@
#define PROXYSQL_GTID
// highly inspired by libslave
// https://github.com/vozbu/libslave/
#include <string>
#include <unordered_map>
#include <list>
#include <utility>

@ -1,3 +1,4 @@
#include "GTID_Server_Data.h"
#include "MySQL_HostGroups_Manager.h"
#include "ev.h"
@ -47,21 +48,14 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) {
bool rc = true;
rc = sd->readall();
if (rc == false) {
//delete sd;
std::string s1 = sd->address;
s1.append(":");
s1.append(std::to_string(sd->mysql_port));
MyHGM->gtid_missing_nodes = true;
proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port);
std::unordered_map <string, GTID_Server_Data *>::iterator it2;
it2 = MyHGM->gtid_map.find(s1);
if (it2 != MyHGM->gtid_map.end()) {
//MyHGM->gtid_map.erase(it2);
it2->second = NULL;
delete sd;
}
sd->active = false;
proxy_warning("GTID: failed to read from ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port);
ev_io_stop(MyHGM->gtid_ev_loop, w);
close(w->fd);
free(w);
sd->w = nullptr;
} else {
sd->dump();
}
@ -71,48 +65,37 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) {
void connect_cb(EV_P_ ev_io *w, int revents) {
pthread_mutex_lock(&ev_loop_mutex);
struct ev_io * c = w;
if (revents & EV_WRITE) {
int optval = 0;
socklen_t optlen = sizeof(optval);
if ((getsockopt(w->fd, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1) ||
(optval != 0)) {
/* Connection failed; try the next address in the list. */
//int errnum = optval ? optval : errno;
ev_io_stop(MyHGM->gtid_ev_loop, w);
close(w->fd);
int fd = w->fd;
GTID_Server_Data *sd = (GTID_Server_Data *)w->data;
// connect() completed, this watcher is no longer needed
ev_io_stop(MyHGM->gtid_ev_loop, w);
free(w);
sd->w = nullptr;
// Based on fd status, proceed to next step -> waiting for read event on the socket
int error = 0;
socklen_t optlen = sizeof(error);
int rc = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &optlen);
if (rc == -1 || error != 0) {
/* connection failed */
MyHGM->gtid_missing_nodes = true;
GTID_Server_Data * custom_data = (GTID_Server_Data *)w->data;
GTID_Server_Data *sd = custom_data;
std::string s1 = sd->address;
s1.append(":");
s1.append(std::to_string(sd->mysql_port));
sd->active = false;
proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port);
std::unordered_map <string, GTID_Server_Data *>::iterator it2;
it2 = MyHGM->gtid_map.find(s1);
if (it2 != MyHGM->gtid_map.end()) {
//MyHGM->gtid_map.erase(it2);
it2->second = NULL;
delete sd;
}
//delete custom_data;
free(c);
close(fd);
} else {
ev_io_stop(MyHGM->gtid_ev_loop, w);
int fd=w->fd;
struct ev_io * new_w = (struct ev_io*) malloc(sizeof(struct ev_io));
new_w->data = w->data;
GTID_Server_Data * custom_data = (GTID_Server_Data *)new_w->data;
custom_data->w = new_w;
free(w);
ev_io_init(new_w, reader_cb, fd, EV_READ);
ev_io_start(MyHGM->gtid_ev_loop, new_w);
struct ev_io *read_watcher = (struct ev_io *) malloc(sizeof(struct ev_io));
read_watcher->data = sd;
sd->w = read_watcher;
ev_io_init(read_watcher, reader_cb, fd, EV_READ);
ev_io_start(MyHGM->gtid_ev_loop, read_watcher);
}
}
pthread_mutex_unlock(&ev_loop_mutex);
}
struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_port) {
struct ev_io * new_connect_watcher(char *address, uint16_t gtid_port, uint16_t mysql_port) {
int s;
if ((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
@ -149,8 +132,6 @@ struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_p
struct ev_io *c = (struct ev_io *)malloc(sizeof(struct ev_io));
if (c) {
ev_io_init(c, connect_cb, s, EV_WRITE);
GTID_Server_Data * custom_data = new GTID_Server_Data(c, address, gtid_port, mysql_port);
c->data = (void *)custom_data;
return c;
}
/* else error */
@ -158,8 +139,6 @@ struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_p
return NULL;
}
GTID_Server_Data::GTID_Server_Data(struct ev_io *_w, char *_address, uint16_t _port, uint16_t _mysql_port) {
active = true;
w = _w;
@ -188,26 +167,32 @@ GTID_Server_Data::~GTID_Server_Data() {
}
bool GTID_Server_Data::readall() {
bool ret = true;
if (size == len) {
// buffer is full, expand
resize(len*2);
resize(len * 2);
}
int rc = 0;
rc = read(w->fd,data+len,size-len);
rc = read(w->fd, data+len, size-len);
if (rc > 0) {
len += rc;
return true;
}
int myerr = errno;
if (rc == 0) {
proxy_info("Read returned EOF\n");
return false;
}
// rc == -1
proxy_error("Read failed, error %d\n", myerr);
if(myerr == EINTR || myerr == EAGAIN) {
// non-blocking fd, so this should not be considered as an error
return true;
} else {
int myerr = errno;
proxy_error("Read returned %d bytes, error %d\n", rc, myerr);
if (
(rc == 0) ||
(rc==-1 && myerr != EINTR && myerr != EAGAIN)
) {
ret = false;
}
return false;
}
return ret;
}
@ -239,9 +224,6 @@ void GTID_Server_Data::dump() {
return;
}
read_all_gtids();
//int rc = write(1,data+pos,len-pos);
fflush(stdout);
///pos += rc;
if (pos >= len/2) {
memmove(data,data+pos,len-pos);
len = len-pos;
@ -285,13 +267,12 @@ bool GTID_Server_Data::read_next_gtid() {
bs[l-3] = '\0';
char *saveptr1=NULL;
char *saveptr2=NULL;
//char *saveptr3=NULL;
char *token = NULL;
char *subtoken = NULL;
//char *subtoken2 = NULL;
char *str1 = NULL;
char *str2 = NULL;
//char *str3 = NULL;
bool updated = false;
for (str1 = bs; ; str1 = NULL) {
token = strtok_r(str1, ",", &saveptr1);
if (token == NULL) {
@ -312,34 +293,30 @@ bool GTID_Server_Data::read_next_gtid() {
p++;
}
}
//fprintf(stdout,"BS from %s\n", uuid_server);
} else { // we are reading the trxids
uint64_t trx_from;
uint64_t trx_to;
sscanf(subtoken,"%lu-%lu",&trx_from,&trx_to);
//fprintf(stdout,"BS from %s:%lu-%lu\n", uuid_server, trx_from, trx_to);
std::string s = uuid_server;
gtid_executed[s].emplace_back(trx_from, trx_to);
updated = addGtidInterval(gtid_executed, uuid_server, trx_from, trx_to) || updated;
}
}
}
pos += l+1;
free(bs);
//return true;
if (updated) {
events_read++;
}
} else {
strncpy(rec_msg,data+pos,l);
pos += l+1;
rec_msg[l] = 0;
//int rc = write(1,data+pos,l+1);
//fprintf(stdout,"%s\n", rec_msg);
if (rec_msg[0]=='I') {
//char rec_uuid[80];
uint64_t rec_trxid = 0;
char *a = NULL;
int ul = 0;
switch (rec_msg[1]) {
case '1':
//sscanf(rec_msg+3,"%s\:%lu",uuid_server,&rec_trxid);
a = strchr(rec_msg+3,':');
ul = a-rec_msg-3;
strncpy(uuid_server,rec_msg+3,ul);
@ -347,21 +324,17 @@ bool GTID_Server_Data::read_next_gtid() {
rec_trxid=atoll(a+1);
break;
case '2':
//sscanf(rec_msg+3,"%lu",&rec_trxid);
rec_trxid=atoll(rec_msg+3);
break;
default:
break;
}
//fprintf(stdout,"%s:%lu\n", uuid_server, rec_trxid);
std::string s = uuid_server;
gtid_t new_gtid = std::make_pair(s,rec_trxid);
addGtid(new_gtid,gtid_executed);
events_read++;
//return true;
}
}
//std::cout << "current pos " << gtid_executed_to_string(gtid_executed) << std::endl << std::endl;
return true;
}
@ -439,6 +412,43 @@ void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) {
}
}
bool addGtidInterval(gtid_set_t& gtid_executed, std::string server_uuid, int64_t txid_start, int64_t txid_end) {
bool updated = true;
auto it = gtid_executed.find(server_uuid);
if (it == gtid_executed.end()) {
gtid_executed[server_uuid].emplace_back(txid_start, txid_end);
return updated;
}
bool insert = true;
// When ProxySQL reconnects with binlog reader, it might
// receive updated txid intervals in the bootstrap message.
// For example,
// before disconnection -> server_UUID:1-10
// after reconnection -> server_UUID:1-19
auto &txid_intervals = it->second;
for (auto &interval : txid_intervals) {
if (interval.first == txid_start) {
if(interval.second == txid_end) {
updated = false;
} else {
interval.second = txid_end;
}
insert = false;
break;
}
}
if (insert) {
txid_intervals.emplace_back(txid_start, txid_end);
}
return updated;
}
void * GTID_syncer_run() {
//struct ev_loop * gtid_ev_loop;
//gtid_ev_loop = NULL;

@ -59,7 +59,7 @@ class MyHGC;
const int MYSQL_ERRORS_STATS_FIELD_NUM = 11;
struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_port);
struct ev_io * new_connect_watcher(char *address, uint16_t gtid_port, uint16_t mysql_port);
void * GTID_syncer_run();
static int wait_for_mysql(MYSQL *mysql, int status) {
@ -1681,79 +1681,76 @@ bool MySQL_HostGroups_Manager::gtid_exists(MySrvC *mysrvc, char * gtid_uuid, uin
}
void MySQL_HostGroups_Manager::generate_mysql_gtid_executed_tables() {
// NOTE: We are required to lock while iterating over 'MyHostGroups'. Otherwise race conditions could take place,
// e.g. servers could be purged by 'purge_mysql_servers_table' and invalid memory be accessed.
wrlock();
pthread_rwlock_wrlock(&gtid_rwlock);
// first, set them all as active = false
// first, add them all as stale entries
std::unordered_set<string> stale_server;
std::unordered_map<string, GTID_Server_Data *>::iterator it = gtid_map.begin();
while(it != gtid_map.end()) {
GTID_Server_Data * gtid_si = it->second;
if (gtid_si) {
gtid_si->active = false;
}
stale_server.emplace(it->first);
it++;
}
// NOTE: We are required to lock while iterating over 'MyHostGroups'. Otherwise race conditions could take place,
// e.g. servers could be purged by 'purge_mysql_servers_table' and invalid memory be accessed.
wrlock();
for (unsigned int i=0; i<MyHostGroups->len; i++) {
MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i);
MySrvC *mysrvc=NULL;
for (unsigned int j=0; j<myhgc->mysrvs->servers->len; j++) {
mysrvc=myhgc->mysrvs->idx(j);
if (mysrvc->gtid_port) {
std::string s1 = mysrvc->address;
s1.append(":");
s1.append(std::to_string(mysrvc->port));
std::unordered_map <string, GTID_Server_Data *>::iterator it2;
it2 = gtid_map.find(s1);
GTID_Server_Data *gtid_is=NULL;
if (it2!=gtid_map.end()) {
gtid_is=it2->second;
if (gtid_is == NULL) {
gtid_map.erase(it2);
}
std::string srv = mysrvc->address;
srv.append(":");
srv.append(std::to_string(mysrvc->port));
GTID_Server_Data *gtid_sd = nullptr;
it = gtid_map.find(srv);
if (it != gtid_map.end()) {
gtid_sd = it->second;
stale_server.erase(srv);
}
if (gtid_is) {
gtid_is->active = true;
} else if (mysrvc->get_status() != MYSQL_SERVER_STATUS_OFFLINE_HARD) {
// we didn't find it. Create it
/*
struct ev_io *watcher = (struct ev_io *)malloc(sizeof(struct ev_io));
gtid_is = new GTID_Server_Data(watcher, mysrvc->address, mysrvc->port, mysrvc->gtid_port);
gtid_map.emplace(s1,gtid_is);
*/
struct ev_io * c = NULL;
c = new_connector(mysrvc->address, mysrvc->gtid_port, mysrvc->port);
if (c) {
gtid_is = (GTID_Server_Data *)c->data;
gtid_map.emplace(s1,gtid_is);
//pthread_mutex_lock(&ev_loop_mutex);
ev_io_start(MyHGM->gtid_ev_loop,c);
//pthread_mutex_unlock(&ev_loop_mutex);
if (gtid_sd && gtid_sd->active) {
continue;
}
if (mysrvc->get_status() != MYSQL_SERVER_STATUS_OFFLINE_HARD) {
// a new server with gtid port
// OR an existing server, but we lost connection with binlog_reader
struct ev_io *cw = new_connect_watcher(mysrvc->address, mysrvc->gtid_port, mysrvc->port);
if (cw) {
if (!gtid_sd) {
gtid_sd = new GTID_Server_Data(cw, mysrvc->address, mysrvc->gtid_port, mysrvc->port);
cw->data = (void *)gtid_sd;
gtid_map.emplace(srv, gtid_sd);
} else {
gtid_sd->w = cw;
gtid_sd->active = true;
cw->data = (void *)gtid_sd;
}
ev_io_start(MyHGM->gtid_ev_loop, cw);
}
}
}
}
}
wrunlock();
std::vector<string> to_remove;
it = gtid_map.begin();
while(it != gtid_map.end()) {
GTID_Server_Data * gtid_si = it->second;
if (gtid_si && gtid_si->active == false) {
to_remove.push_back(it->first);
for (auto &srv : stale_server) {
it = gtid_map.find(srv);
GTID_Server_Data *gtid_sd = it->second;
if (gtid_sd->w) {
ev_io_stop(MyHGM->gtid_ev_loop, gtid_sd->w);
close(gtid_sd->w->fd);
free(gtid_sd->w);
}
it++;
}
for (std::vector<string>::iterator it3=to_remove.begin(); it3!=to_remove.end(); ++it3) {
it = gtid_map.find(*it3);
GTID_Server_Data * gtid_si = it->second;
ev_io_stop(MyHGM->gtid_ev_loop, gtid_si->w);
close(gtid_si->w->fd);
free(gtid_si->w);
gtid_map.erase(*it3);
gtid_map.erase(srv);
delete gtid_sd;
}
pthread_rwlock_unlock(&gtid_rwlock);
wrunlock();
}
/**
@ -5807,48 +5804,39 @@ void MySQL_HostGroups_Manager::p_update_mysql_gtid_executed() {
std::unordered_map<string, GTID_Server_Data*>::iterator it = gtid_map.begin();
while(it != gtid_map.end()) {
GTID_Server_Data* gtid_si = it->second;
std::string address {};
std::string port {};
std::string endpoint_id {};
if (gtid_si) {
address = std::string(gtid_si->address);
port = std::to_string(gtid_si->mysql_port);
} else {
std::string s = it->first;
std::size_t found = s.find_last_of(":");
address = s.substr(0, found);
port = s.substr(found + 1);
GTID_Server_Data* gtid_sd = it->second;
if (!gtid_sd) {
// invalid state
continue;
}
endpoint_id = address + ":" + port;
const auto& gitd_id_counter = this->status.p_gtid_executed_map.find(endpoint_id);
prometheus::Counter* gtid_counter = nullptr;
std::string endpoint = it->first;
std::string address = std::string(gtid_sd->address);
std::string port = std::to_string(gtid_sd->mysql_port);
if (gitd_id_counter == this->status.p_gtid_executed_map.end()) {
auto& gitd_counter =
prometheus::Counter* gtid_counter = nullptr;
const auto& pc_itr = this->status.p_gtid_executed_map.find(endpoint);
if (pc_itr == this->status.p_gtid_executed_map.end()) {
auto& gtid_counter_group =
this->status.p_dyn_counter_array[p_hg_dyn_counter::gtid_executed];
gtid_counter = std::addressof(gitd_counter->Add({
gtid_counter = std::addressof(gtid_counter_group->Add({
{ "hostname", address },
{ "port", port },
}));
this->status.p_gtid_executed_map.insert(
{
endpoint_id,
endpoint,
gtid_counter
}
);
} else {
gtid_counter = gitd_id_counter->second;
gtid_counter = pc_itr->second;
}
if (gtid_si) {
const auto& cur_executed_gtid = gtid_counter->Value();
gtid_counter->Increment(gtid_si->events_read - cur_executed_gtid);
}
const auto& cur_executed_gtid = gtid_counter->Value();
gtid_counter->Increment(gtid_sd->events_read - cur_executed_gtid);
it++;
}

Loading…
Cancel
Save