Added retry mechanism for missing binlog reader

pull/1404/merge
René Cannaò 8 years ago
parent cd44fcf470
commit 9c4d097917

@ -439,6 +439,8 @@ class MySQL_HostGroups_Manager {
std::unordered_map <string, GTID_Server_Data *> gtid_map;
struct ev_async * gtid_ev_async;
struct ev_loop * gtid_ev_loop;
struct ev_timer * gtid_ev_timer;
bool gtid_missing_nodes;
struct {
unsigned int servers_table_version;
pthread_mutex_t servers_table_version_lock;

@ -47,12 +47,31 @@ static pthread_mutex_t ev_loop_mutex;
//static std::unordered_map <string, Gtid_Server_Info *> gtid_map;
static void gtid_async_cb(struct ev_loop *loop, struct ev_async *watcher, int revents) {
if (glovars.shutdown) {
ev_break(loop);
}
pthread_mutex_lock(&ev_loop_mutex);
MyHGM->gtid_missing_nodes = false;
MyHGM->generate_mysql_gtid_executed_tables();
pthread_mutex_unlock(&ev_loop_mutex);
return;
}
static void gtid_timer_cb (struct ev_loop *loop, struct ev_timer *timer, int revents) {
ev_timer_stop(loop, timer);
ev_timer_set(timer, 3, 0);
if (glovars.shutdown) {
ev_break(loop);
}
if (MyHGM->gtid_missing_nodes) {
pthread_mutex_lock(&ev_loop_mutex);
MyHGM->gtid_missing_nodes = false;
MyHGM->generate_mysql_gtid_executed_tables();
pthread_mutex_unlock(&ev_loop_mutex);
}
ev_timer_start(loop, timer);
return;
}
static int wait_for_mysql(MYSQL *mysql, int status) {
struct pollfd pfd;
@ -88,11 +107,15 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) {
//delete sd;
std::string s1 = sd->address;
s1.append(":");
s1.append(std::to_string(sd->port));
s1.append(std::to_string(sd->mysql_port));
MyHGM->gtid_missing_nodes = true;
proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port);
std::unordered_map <string, GTID_Server_Data *>::iterator it2;
it2 = MyHGM->gtid_map.find(s1);
if (it2 != MyHGM->gtid_map.end()) {
MyHGM->gtid_map.erase(it2);
//MyHGM->gtid_map.erase(it2);
it2->second = NULL;
delete sd;
}
ev_io_stop(MyHGM->gtid_ev_loop, w);
free(w);
@ -115,8 +138,21 @@ void connect_cb(EV_P_ ev_io *w, int revents) {
int errnum = optval ? optval : errno;
ev_io_stop(MyHGM->gtid_ev_loop, w);
close(w->fd);
MyHGM->gtid_missing_nodes = true;
GTID_Server_Data * custom_data = (GTID_Server_Data *)w->data;
delete custom_data;
GTID_Server_Data *sd = custom_data;
std::string s1 = sd->address;
s1.append(":");
s1.append(std::to_string(sd->mysql_port));
proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port);
std::unordered_map <string, GTID_Server_Data *>::iterator it2;
it2 = MyHGM->gtid_map.find(s1);
if (it2 != MyHGM->gtid_map.end()) {
//MyHGM->gtid_map.erase(it2);
it2->second = NULL;
delete sd;
}
//delete custom_data;
free(c);
} else {
ev_io_stop(MyHGM->gtid_ev_loop, w);
@ -454,8 +490,11 @@ static void * GTID_syncer_run() {
MyHGM->gtid_ev_async = (struct ev_async *)malloc(sizeof(struct ev_async));
//ev_async_init(gtid_ev_async, gtid_async_cb);
//ev_async_start(gtid_ev_loop, gtid_ev_async);
MyHGM->gtid_ev_timer = (struct ev_timer *)malloc(sizeof(struct ev_timer));
ev_async_init(MyHGM->gtid_ev_async, gtid_async_cb);
ev_async_start(MyHGM->gtid_ev_loop, MyHGM->gtid_ev_async);
ev_timer_init(MyHGM->gtid_ev_timer, gtid_timer_cb, 3, 0);
ev_timer_start(MyHGM->gtid_ev_loop, MyHGM->gtid_ev_timer);
//ev_ref(gtid_ev_loop);
ev_run(MyHGM->gtid_ev_loop, 0);
//sleep(1000);
@ -794,6 +833,7 @@ MySQL_HostGroups_Manager::MySQL_HostGroups_Manager() {
incoming_replication_hostgroups=NULL;
incoming_group_replication_hostgroups=NULL;
pthread_rwlock_init(&gtid_rwlock, NULL);
gtid_missing_nodes = false;
gtid_ev_async = (struct ev_async *)malloc(sizeof(struct ev_async));
}
void MySQL_HostGroups_Manager::init() {
@ -813,6 +853,7 @@ MySQL_HostGroups_Manager::~MySQL_HostGroups_Manager() {
HGCU_thread->join();
//pthread_join(HGCU_thread_id, NULL);
//pthread_join(GTID_syncer_thread_id, NULL);
ev_async_send(gtid_ev_loop, gtid_ev_async);
GTID_syncer_thread->join();
free(gtid_ev_async);
while (MyHostGroups->len) {
@ -1337,10 +1378,12 @@ bool MySQL_HostGroups_Manager::gtid_exists(MySrvC *mysrvc, char * gtid_uuid, uin
GTID_Server_Data *gtid_is=NULL;
if (it2!=gtid_map.end()) {
gtid_is=it2->second;
if (gtid_is->active == true) {
ret = gtid_is->gtid_exists(gtid_uuid,gtid_trxid);
if (gtid_is) {
if (gtid_is->active == true) {
ret = gtid_is->gtid_exists(gtid_uuid,gtid_trxid);
}
}
}
}
//proxy_info("Checking if server %s has GTID %s:%lu . %s\n", s1.c_str(), gtid_uuid, gtid_trxid, (ret ? "YES" : "NO"));
pthread_rwlock_unlock(&gtid_rwlock);
return ret;
@ -1352,7 +1395,9 @@ void MySQL_HostGroups_Manager::generate_mysql_gtid_executed_tables() {
std::unordered_map<string, GTID_Server_Data *>::iterator it = gtid_map.begin();
while(it != gtid_map.end()) {
GTID_Server_Data * gtid_si = it->second;
gtid_si->active = false;
if (gtid_si) {
gtid_si->active = false;
}
it++;
}
@ -1370,6 +1415,11 @@ void MySQL_HostGroups_Manager::generate_mysql_gtid_executed_tables() {
GTID_Server_Data *gtid_is=NULL;
if (it2!=gtid_map.end()) {
gtid_is=it2->second;
if (gtid_is == NULL) {
gtid_map.erase(it2);
}
}
if (gtid_is) {
gtid_is->active = true;
} else {
// we didn't find it. Create it
@ -3197,14 +3247,23 @@ SQLite3_result * MySQL_HostGroups_Manager::get_stats_mysql_gtid_executed() {
GTID_Server_Data * gtid_si = it->second;
char buf[64];
char **pta=(char **)malloc(sizeof(char *)*colnum);
pta[0]=strdup(gtid_si->address);
sprintf(buf,"%d", (int)gtid_si->mysql_port);
pta[1]=strdup(buf);
//sprintf(buf,"%d", mysrvc->port);
string s1 = gtid_executed_to_string(gtid_si->gtid_executed);
pta[2]=strdup(s1.c_str());
sprintf(buf,"%llu", (int)gtid_si->events_read);
pta[3]=strdup(buf);
if (gtid_si) {
pta[0]=strdup(gtid_si->address);
sprintf(buf,"%d", (int)gtid_si->mysql_port);
pta[1]=strdup(buf);
//sprintf(buf,"%d", mysrvc->port);
string s1 = gtid_executed_to_string(gtid_si->gtid_executed);
pta[2]=strdup(s1.c_str());
sprintf(buf,"%llu", (int)gtid_si->events_read);
pta[3]=strdup(buf);
} else {
string host = it->first;
pta[0]=strdup(host.c_str());
sprintf(buf,"%d", (int)0);
pta[1]=strdup(buf);
pta[2]=strdup((char *)"NULL");
pta[3]=strdup((char *)"0");
}
result->add_row(pta);
for (k=0; k<colnum; k++) {
if (pta[k])

Loading…
Cancel
Save