|
|
|
|
@ -1,3 +1,4 @@
|
|
|
|
|
#include "GTID_Server_Data.h"
|
|
|
|
|
#include "MySQL_HostGroups_Manager.h"
|
|
|
|
|
|
|
|
|
|
#include "ev.h"
|
|
|
|
|
@ -47,20 +48,12 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) {
|
|
|
|
|
bool rc = true;
|
|
|
|
|
rc = sd->readall();
|
|
|
|
|
if (rc == false) {
|
|
|
|
|
//delete sd;
|
|
|
|
|
std::string s1 = sd->address;
|
|
|
|
|
s1.append(":");
|
|
|
|
|
s1.append(std::to_string(sd->mysql_port));
|
|
|
|
|
MyHGM->gtid_missing_nodes = true;
|
|
|
|
|
proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port);
|
|
|
|
|
std::unordered_map <string, GTID_Server_Data *>::iterator it2;
|
|
|
|
|
it2 = MyHGM->gtid_map.find(s1);
|
|
|
|
|
if (it2 != MyHGM->gtid_map.end()) {
|
|
|
|
|
//MyHGM->gtid_map.erase(it2);
|
|
|
|
|
it2->second = NULL;
|
|
|
|
|
delete sd;
|
|
|
|
|
}
|
|
|
|
|
sd->active = false;
|
|
|
|
|
proxy_warning("GTID: failed to read from ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port);
|
|
|
|
|
|
|
|
|
|
ev_io_stop(MyHGM->gtid_ev_loop, w);
|
|
|
|
|
close(w->fd);
|
|
|
|
|
free(w);
|
|
|
|
|
} else {
|
|
|
|
|
sd->dump();
|
|
|
|
|
@ -71,48 +64,36 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) {
|
|
|
|
|
|
|
|
|
|
void connect_cb(EV_P_ ev_io *w, int revents) {
|
|
|
|
|
pthread_mutex_lock(&ev_loop_mutex);
|
|
|
|
|
struct ev_io * c = w;
|
|
|
|
|
if (revents & EV_WRITE) {
|
|
|
|
|
int optval = 0;
|
|
|
|
|
socklen_t optlen = sizeof(optval);
|
|
|
|
|
if ((getsockopt(w->fd, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1) ||
|
|
|
|
|
(optval != 0)) {
|
|
|
|
|
/* Connection failed; try the next address in the list. */
|
|
|
|
|
//int errnum = optval ? optval : errno;
|
|
|
|
|
ev_io_stop(MyHGM->gtid_ev_loop, w);
|
|
|
|
|
close(w->fd);
|
|
|
|
|
int fd = w->fd;
|
|
|
|
|
GTID_Server_Data *sd = (GTID_Server_Data *)w->data;
|
|
|
|
|
|
|
|
|
|
// connect() completed, this watcher is no longer needed
|
|
|
|
|
ev_io_stop(MyHGM->gtid_ev_loop, w);
|
|
|
|
|
free(w);
|
|
|
|
|
|
|
|
|
|
// Based on fd status, proceed to next step -> waiting for read event on the socket
|
|
|
|
|
int error = 0;
|
|
|
|
|
socklen_t optlen = sizeof(error);
|
|
|
|
|
int rc = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &optlen);
|
|
|
|
|
if (rc == -1 || error != 0) {
|
|
|
|
|
/* connection failed */
|
|
|
|
|
MyHGM->gtid_missing_nodes = true;
|
|
|
|
|
GTID_Server_Data * custom_data = (GTID_Server_Data *)w->data;
|
|
|
|
|
GTID_Server_Data *sd = custom_data;
|
|
|
|
|
std::string s1 = sd->address;
|
|
|
|
|
s1.append(":");
|
|
|
|
|
s1.append(std::to_string(sd->mysql_port));
|
|
|
|
|
sd->active = false;
|
|
|
|
|
proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port);
|
|
|
|
|
std::unordered_map <string, GTID_Server_Data *>::iterator it2;
|
|
|
|
|
it2 = MyHGM->gtid_map.find(s1);
|
|
|
|
|
if (it2 != MyHGM->gtid_map.end()) {
|
|
|
|
|
//MyHGM->gtid_map.erase(it2);
|
|
|
|
|
it2->second = NULL;
|
|
|
|
|
delete sd;
|
|
|
|
|
}
|
|
|
|
|
//delete custom_data;
|
|
|
|
|
free(c);
|
|
|
|
|
close(fd);
|
|
|
|
|
} else {
|
|
|
|
|
ev_io_stop(MyHGM->gtid_ev_loop, w);
|
|
|
|
|
int fd=w->fd;
|
|
|
|
|
struct ev_io * new_w = (struct ev_io*) malloc(sizeof(struct ev_io));
|
|
|
|
|
new_w->data = w->data;
|
|
|
|
|
GTID_Server_Data * custom_data = (GTID_Server_Data *)new_w->data;
|
|
|
|
|
custom_data->w = new_w;
|
|
|
|
|
free(w);
|
|
|
|
|
ev_io_init(new_w, reader_cb, fd, EV_READ);
|
|
|
|
|
ev_io_start(MyHGM->gtid_ev_loop, new_w);
|
|
|
|
|
struct ev_io *read_watcher = (struct ev_io *) malloc(sizeof(struct ev_io));
|
|
|
|
|
read_watcher->data = sd;
|
|
|
|
|
sd->w = read_watcher;
|
|
|
|
|
ev_io_init(read_watcher, reader_cb, fd, EV_READ);
|
|
|
|
|
ev_io_start(MyHGM->gtid_ev_loop, read_watcher);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
pthread_mutex_unlock(&ev_loop_mutex);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_port) {
|
|
|
|
|
struct ev_io * new_connect_watcher(char *address, uint16_t gtid_port, uint16_t mysql_port) {
|
|
|
|
|
int s;
|
|
|
|
|
|
|
|
|
|
if ((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
|
|
|
|
|
@ -149,8 +130,6 @@ struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_p
|
|
|
|
|
struct ev_io *c = (struct ev_io *)malloc(sizeof(struct ev_io));
|
|
|
|
|
if (c) {
|
|
|
|
|
ev_io_init(c, connect_cb, s, EV_WRITE);
|
|
|
|
|
GTID_Server_Data * custom_data = new GTID_Server_Data(c, address, gtid_port, mysql_port);
|
|
|
|
|
c->data = (void *)custom_data;
|
|
|
|
|
return c;
|
|
|
|
|
}
|
|
|
|
|
/* else error */
|
|
|
|
|
@ -158,8 +137,6 @@ struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_p
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
GTID_Server_Data::GTID_Server_Data(struct ev_io *_w, char *_address, uint16_t _port, uint16_t _mysql_port) {
|
|
|
|
|
active = true;
|
|
|
|
|
w = _w;
|
|
|
|
|
@ -188,26 +165,32 @@ GTID_Server_Data::~GTID_Server_Data() {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool GTID_Server_Data::readall() {
|
|
|
|
|
bool ret = true;
|
|
|
|
|
if (size == len) {
|
|
|
|
|
// buffer is full, expand
|
|
|
|
|
resize(len*2);
|
|
|
|
|
resize(len * 2);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int rc = 0;
|
|
|
|
|
rc = read(w->fd,data+len,size-len);
|
|
|
|
|
rc = read(w->fd, data+len, size-len);
|
|
|
|
|
if (rc > 0) {
|
|
|
|
|
len += rc;
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int myerr = errno;
|
|
|
|
|
if (rc == 0) {
|
|
|
|
|
proxy_info("Read returned EOF\n");
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// rc == -1
|
|
|
|
|
proxy_error("Read failed, error %d\n", myerr);
|
|
|
|
|
if(myerr == EINTR || myerr == EAGAIN) {
|
|
|
|
|
// non-blocking fd, so this should not be considered as an error
|
|
|
|
|
return true;
|
|
|
|
|
} else {
|
|
|
|
|
int myerr = errno;
|
|
|
|
|
proxy_error("Read returned %d bytes, error %d\n", rc, myerr);
|
|
|
|
|
if (
|
|
|
|
|
(rc == 0) ||
|
|
|
|
|
(rc==-1 && myerr != EINTR && myerr != EAGAIN)
|
|
|
|
|
) {
|
|
|
|
|
ret = false;
|
|
|
|
|
}
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -239,9 +222,6 @@ void GTID_Server_Data::dump() {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
read_all_gtids();
|
|
|
|
|
//int rc = write(1,data+pos,len-pos);
|
|
|
|
|
fflush(stdout);
|
|
|
|
|
///pos += rc;
|
|
|
|
|
if (pos >= len/2) {
|
|
|
|
|
memmove(data,data+pos,len-pos);
|
|
|
|
|
len = len-pos;
|
|
|
|
|
@ -285,13 +265,12 @@ bool GTID_Server_Data::read_next_gtid() {
|
|
|
|
|
bs[l-3] = '\0';
|
|
|
|
|
char *saveptr1=NULL;
|
|
|
|
|
char *saveptr2=NULL;
|
|
|
|
|
//char *saveptr3=NULL;
|
|
|
|
|
char *token = NULL;
|
|
|
|
|
char *subtoken = NULL;
|
|
|
|
|
//char *subtoken2 = NULL;
|
|
|
|
|
char *str1 = NULL;
|
|
|
|
|
char *str2 = NULL;
|
|
|
|
|
//char *str3 = NULL;
|
|
|
|
|
bool updated = false;
|
|
|
|
|
|
|
|
|
|
for (str1 = bs; ; str1 = NULL) {
|
|
|
|
|
token = strtok_r(str1, ",", &saveptr1);
|
|
|
|
|
if (token == NULL) {
|
|
|
|
|
@ -312,34 +291,30 @@ bool GTID_Server_Data::read_next_gtid() {
|
|
|
|
|
p++;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
//fprintf(stdout,"BS from %s\n", uuid_server);
|
|
|
|
|
} else { // we are reading the trxids
|
|
|
|
|
uint64_t trx_from;
|
|
|
|
|
uint64_t trx_to;
|
|
|
|
|
sscanf(subtoken,"%lu-%lu",&trx_from,&trx_to);
|
|
|
|
|
//fprintf(stdout,"BS from %s:%lu-%lu\n", uuid_server, trx_from, trx_to);
|
|
|
|
|
std::string s = uuid_server;
|
|
|
|
|
gtid_executed[s].emplace_back(trx_from, trx_to);
|
|
|
|
|
updated = addGtidInterval(gtid_executed, uuid_server, trx_from, trx_to) || updated;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
pos += l+1;
|
|
|
|
|
free(bs);
|
|
|
|
|
//return true;
|
|
|
|
|
|
|
|
|
|
if (updated) {
|
|
|
|
|
events_read++;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
strncpy(rec_msg,data+pos,l);
|
|
|
|
|
pos += l+1;
|
|
|
|
|
rec_msg[l] = 0;
|
|
|
|
|
//int rc = write(1,data+pos,l+1);
|
|
|
|
|
//fprintf(stdout,"%s\n", rec_msg);
|
|
|
|
|
if (rec_msg[0]=='I') {
|
|
|
|
|
//char rec_uuid[80];
|
|
|
|
|
uint64_t rec_trxid = 0;
|
|
|
|
|
char *a = NULL;
|
|
|
|
|
int ul = 0;
|
|
|
|
|
switch (rec_msg[1]) {
|
|
|
|
|
case '1':
|
|
|
|
|
//sscanf(rec_msg+3,"%s\:%lu",uuid_server,&rec_trxid);
|
|
|
|
|
a = strchr(rec_msg+3,':');
|
|
|
|
|
ul = a-rec_msg-3;
|
|
|
|
|
strncpy(uuid_server,rec_msg+3,ul);
|
|
|
|
|
@ -347,21 +322,17 @@ bool GTID_Server_Data::read_next_gtid() {
|
|
|
|
|
rec_trxid=atoll(a+1);
|
|
|
|
|
break;
|
|
|
|
|
case '2':
|
|
|
|
|
//sscanf(rec_msg+3,"%lu",&rec_trxid);
|
|
|
|
|
rec_trxid=atoll(rec_msg+3);
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
//fprintf(stdout,"%s:%lu\n", uuid_server, rec_trxid);
|
|
|
|
|
std::string s = uuid_server;
|
|
|
|
|
gtid_t new_gtid = std::make_pair(s,rec_trxid);
|
|
|
|
|
addGtid(new_gtid,gtid_executed);
|
|
|
|
|
events_read++;
|
|
|
|
|
//return true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
//std::cout << "current pos " << gtid_executed_to_string(gtid_executed) << std::endl << std::endl;
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -439,6 +410,43 @@ void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool addGtidInterval(gtid_set_t& gtid_executed, std::string server_uuid, int64_t txid_start, int64_t txid_end) {
|
|
|
|
|
bool updated = true;
|
|
|
|
|
|
|
|
|
|
auto it = gtid_executed.find(server_uuid);
|
|
|
|
|
if (it == gtid_executed.end()) {
|
|
|
|
|
gtid_executed[server_uuid].emplace_back(txid_start, txid_end);
|
|
|
|
|
return updated;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool insert = true;
|
|
|
|
|
|
|
|
|
|
// When ProxySQL reconnects with binlog reader, it might
|
|
|
|
|
// receive updated txid intervals in the bootstrap message.
|
|
|
|
|
// For example,
|
|
|
|
|
// before disconnection -> server_UUID:1-10
|
|
|
|
|
// after reconnection -> server_UUID:1-19
|
|
|
|
|
auto &txid_intervals = it->second;
|
|
|
|
|
for (auto &interval : txid_intervals) {
|
|
|
|
|
if (interval.first == txid_start) {
|
|
|
|
|
if(interval.second == txid_end) {
|
|
|
|
|
updated = false;
|
|
|
|
|
} else {
|
|
|
|
|
interval.second = txid_end;
|
|
|
|
|
}
|
|
|
|
|
insert = false;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (insert) {
|
|
|
|
|
txid_intervals.emplace_back(txid_start, txid_end);
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return updated;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void * GTID_syncer_run() {
|
|
|
|
|
//struct ev_loop * gtid_ev_loop;
|
|
|
|
|
//gtid_ev_loop = NULL;
|
|
|
|
|
|