More support for group replication

v1.4.0
René Cannaò 9 years ago
parent 63e86039e6
commit c7cffc9ea7

@ -165,7 +165,7 @@ class MySQL_HostGroups_Manager {
void add(MySrvC *, unsigned int);
void purge_mysql_servers_table();
void generate_mysql_servers_table();
void generate_mysql_servers_table(int *_onlyhg=NULL);
void generate_mysql_replication_hostgroups_table();
SQLite3_result *incoming_replication_hostgroups;
void generate_mysql_group_replication_hostgroups_table();
@ -238,6 +238,10 @@ class MySQL_HostGroups_Manager {
void shun_and_killall(char *hostname, int port);
void set_server_current_latency_us(char *hostname, int port, unsigned int _current_latency_us);
unsigned long long Get_Memory_Stats();
void update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error);
void update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error);
void update_group_replication_set_writer(char *_hostname, int _port, int _writer_hostgroup);
};
#endif /* __CLASS_MYSQL_HOSTGROUPS_MANAGER_H */

@ -18,6 +18,7 @@
#define MONITOR_SQLITE_TABLE_MYSQL_SERVER_REPLICATION_LAG_LOG "CREATE TABLE mysql_server_replication_lag_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start_us INT NOT NULL DEFAULT 0 , success_time_us INT DEFAULT 0 , repl_lag INT DEFAULT 0 , error VARCHAR , PRIMARY KEY (hostname, port, time_start_us))"
#define MONITOR_SQLITE_TABLE_MYSQL_SERVER_GROUP_REPLICATION_LOG "CREATE TABLE mysql_server_group_replication_log (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start_us INT NOT NULL DEFAULT 0 , success_time_us INT DEFAULT 0 , viable_candidate VARCHAR NOT NULL DEFAULT 'NO' , read_only VARCHAR NOT NULL DEFAULT 'YES' , transactions_behind INT DEFAULT 0 , error VARCHAR , PRIMARY KEY (hostname, port, time_start_us))"
/*
struct cmp_str {
@ -28,11 +29,12 @@ struct cmp_str {
};
*/
#define MyGR_Nentries 10
#define MyGR_Nentries 50
typedef struct _MyGR_status_entry_t {
// char *address;
// int port;
unsigned long long start_time;
unsigned long long check_time;
long long transactions_behind;
bool primary_partition;
@ -51,7 +53,7 @@ class MyGR_monitor_node {
MyGR_status_entry_t last_entries[MyGR_Nentries];
MyGR_monitor_node(char *_a, int _p, int _whg);
~MyGR_monitor_node();
bool add_entry(unsigned long long _ct, long long _tb, bool _pp, bool _ro, char *_error); // return true if status changed
bool add_entry(unsigned long long _st, unsigned long long _ct, long long _tb, bool _pp, bool _ro, char *_error); // return true if status changed
};
@ -74,6 +76,8 @@ class MySQL_Monitor_State_Data {
char *hostname;
int port;
int writer_hostgroup; // used only by group replication
bool writer_is_also_reader; // used only by group replication
int max_transactions_behind; // used only by group replication
bool use_ssl;
MYSQL *mysql;
MYSQL_RES *result;
@ -130,6 +134,7 @@ class MySQL_Monitor {
void * monitor_group_replication();
void * monitor_replication_lag();
void * run();
void populate_monitor_mysql_server_group_replication_log();
};
#endif /* __CLASS_MYSQL_MONITOR_H */

@ -352,6 +352,7 @@ void MySQL_HostGroups_Manager::wrunlock() {
#endif
}
unsigned int MySQL_HostGroups_Manager::get_servers_table_version() {
return __sync_fetch_and_add(&status.servers_table_version,0);
}
@ -623,10 +624,11 @@ bool MySQL_HostGroups_Manager::commit() {
// group replication
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_group_replication_hostgroups\n");
mydb->execute("DELETE FROM mysql_group_replication_hostgroups");
generate_mysql_group_replication_hostgroups_table();
if (incoming_group_replication_hostgroups) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_group_replication_hostgroups\n");
mydb->execute("DELETE FROM mysql_group_replication_hostgroups");
generate_mysql_group_replication_hostgroups_table();
}
__sync_fetch_and_add(&status.servers_table_version,1);
wrunlock();
if (GloMTH) {
@ -653,7 +655,7 @@ void MySQL_HostGroups_Manager::purge_mysql_servers_table() {
}
}
void MySQL_HostGroups_Manager::generate_mysql_servers_table() {
void MySQL_HostGroups_Manager::generate_mysql_servers_table(int *_onlyhg) {
int rc;
sqlite3_stmt *statement1=NULL;
sqlite3_stmt *statement32=NULL;
@ -667,8 +669,16 @@ void MySQL_HostGroups_Manager::generate_mysql_servers_table() {
rc=sqlite3_prepare_v2(mydb3, query32, -1, &statement32, 0);
assert(rc==SQLITE_OK);
proxy_info("Dumping current MySQL Servers structures\n");
for (unsigned int i=0; i<MyHostGroups->len; i++) {
MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i);
if (_onlyhg) {
int hidonly=*_onlyhg;
if (myhgc->hid!=(unsigned int)hidonly) {
// skipping this HG
continue;
}
}
MySrvC *mysrvc=NULL;
for (unsigned int j=0; j<myhgc->mysrvs->servers->len; j++) {
mysrvc=myhgc->mysrvs->idx(j);
@ -847,7 +857,7 @@ void MySQL_HostGroups_Manager::generate_mysql_group_replication_hostgroups_table
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
char *query=(char *)"SELECT writer_hostgroup, hostname, port, MAX(use_ssl) use_ssl FROM mysql_servers JOIN mysql_group_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=reader_hostgroup OR hostgroup_id=offline_hostgroup WHERE status NOT IN (2,3) GROUP BY hostname, port";
char *query=(char *)"SELECT writer_hostgroup, hostname, port, MAX(use_ssl) use_ssl , writer_is_also_reader , max_transactions_behind FROM mysql_servers JOIN mysql_group_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=reader_hostgroup OR hostgroup_id=offline_hostgroup WHERE status NOT IN (2,3) GROUP BY hostname, port";
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
if (resultset) {
if (GloMyMon->Group_Replication_Hosts_resultset) {
@ -1694,3 +1704,230 @@ bool Group_Replication_Info::update(int b, int r, int o, int mw, int mtb, bool _
}
return ret;
}
void MySQL_HostGroups_Manager::update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *_error) {
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
char *query=NULL;
char *q=NULL;
char *error=NULL;
q=(char *)"SELECT hostgroup_id FROM mysql_servers JOIN mysql_group_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE hostname='%s' AND port=%d AND status<>3";
query=(char *)malloc(strlen(q)+strlen(_hostname)+32);
sprintf(query,q,_hostname,_port);
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
if (error) {
free(error);
error=NULL;
}
free(query);
if (resultset) { // we lock only if needed
if (resultset->rows_count) {
proxy_warning("Group Replication: setting host %s:%d offline because: %s\n", _hostname, _port, _error);
GloAdmin->mysql_servers_wrlock();
mydb->execute("DELETE FROM mysql_servers_incoming");
mydb->execute("INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers");
q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d) WHERE hostname='%s' AND port=%d AND hostgroup_id<>(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)";
query=(char *)malloc(strlen(q)+strlen(_hostname)+64);
sprintf(query,q,_writer_hostgroup,_hostname,_port,_writer_hostgroup);
mydb->execute(query);
//free(query);
q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id<>(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)";
//query=(char *)malloc(strlen(q)+strlen(_hostname)+64);
sprintf(query,q,_hostname,_port,_writer_hostgroup);
mydb->execute(query);
//free(query);
q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s' AND port=%d AND hostgroup_id=(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)";
//query=(char *)malloc(strlen(q)+strlen(_hostname)+64);
sprintf(query,q,_hostname,_port,_writer_hostgroup);
mydb->execute(query);
//free(query);
commit();
wrlock();
SQLite3_result *resultset2=NULL;
q=(char *)"SELECT writer_hostgroup, backup_writer_hostgroup, reader_hostgroup, offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d";
//query=(char *)malloc(strlen(q)+strlen(_hostname)+64);
sprintf(query,q,_port,_writer_hostgroup);
mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset2);
if (resultset2) {
if (resultset2->rows_count) {
for (std::vector<SQLite3_row *>::iterator it = resultset2->rows.begin() ; it != resultset2->rows.end(); ++it) {
SQLite3_row *r=*it;
int writer_hostgroup=atoi(r->fields[0]);
int backup_writer_hostgroup=atoi(r->fields[1]);
int reader_hostgroup=atoi(r->fields[2]);
int offline_hostgroup=atoi(r->fields[3]);
q=(char *)"DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d , %d , %d)";
sprintf(query,q,_port,_writer_hostgroup);
mydb->execute(query);
generate_mysql_servers_table(&writer_hostgroup);
generate_mysql_servers_table(&backup_writer_hostgroup);
generate_mysql_servers_table(&reader_hostgroup);
generate_mysql_servers_table(&offline_hostgroup);
}
}
delete resultset2;
resultset2=NULL;
}
wrunlock();
GloAdmin->mysql_servers_wrunlock();
free(query);
}
}
if (resultset) {
delete resultset;
resultset=NULL;
}
}
void MySQL_HostGroups_Manager::update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *_error) {
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
char *query=NULL;
char *q=NULL;
char *error=NULL;
q=(char *)"SELECT hostgroup_id FROM mysql_servers JOIN mysql_group_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=offline_hostgroup WHERE hostname='%s' AND port=%d AND status<>3";
query=(char *)malloc(strlen(q)+strlen(_hostname)+32);
sprintf(query,q,_hostname,_port);
mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset);
if (error) {
free(error);
error=NULL;
}
free(query);
if (resultset) { // we lock only if needed
if (resultset->rows_count) {
proxy_warning("Group Replication: setting host %s:%d read_only because: %s\n", _hostname, _port, _error);
GloAdmin->mysql_servers_wrlock();
mydb->execute("DELETE FROM mysql_servers_incoming");
mydb->execute("INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers");
q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d) WHERE hostname='%s' AND port=%d AND hostgroup_id<>(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)";
query=(char *)malloc(strlen(q)+strlen(_hostname)+64);
sprintf(query,q,_writer_hostgroup,_hostname,_port,_writer_hostgroup);
mydb->execute(query);
//free(query);
q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id<>(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)";
//query=(char *)malloc(strlen(q)+strlen(_hostname)+64);
sprintf(query,q,_hostname,_port,_writer_hostgroup);
mydb->execute(query);
//free(query);
q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s' AND port=%d AND hostgroup_id=(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)";
//query=(char *)malloc(strlen(q)+strlen(_hostname)+64);
sprintf(query,q,_hostname,_port,_writer_hostgroup);
mydb->execute(query);
//free(query);
commit();
wrlock();
SQLite3_result *resultset2=NULL;
q=(char *)"SELECT writer_hostgroup, backup_writer_hostgroup, reader_hostgroup, offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d";
//query=(char *)malloc(strlen(q)+strlen(_hostname)+64);
sprintf(query,q,_port,_writer_hostgroup);
mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset2);
if (resultset2) {
if (resultset2->rows_count) {
for (std::vector<SQLite3_row *>::iterator it = resultset2->rows.begin() ; it != resultset2->rows.end(); ++it) {
SQLite3_row *r=*it;
int writer_hostgroup=atoi(r->fields[0]);
int backup_writer_hostgroup=atoi(r->fields[1]);
int reader_hostgroup=atoi(r->fields[2]);
int offline_hostgroup=atoi(r->fields[3]);
q=(char *)"DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d , %d , %d)";
sprintf(query,q,_port,_writer_hostgroup);
mydb->execute(query);
generate_mysql_servers_table(&writer_hostgroup);
generate_mysql_servers_table(&backup_writer_hostgroup);
generate_mysql_servers_table(&reader_hostgroup);
generate_mysql_servers_table(&offline_hostgroup);
}
}
delete resultset2;
resultset2=NULL;
}
wrunlock();
GloAdmin->mysql_servers_wrunlock();
free(query);
}
}
if (resultset) {
delete resultset;
resultset=NULL;
}
}
void MySQL_HostGroups_Manager::update_group_replication_set_writer(char *_hostname, int _port, int _writer_hostgroup) {
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
char *query=NULL;
char *q=NULL;
char *error=NULL;
q=(char *)"SELECT hostgroup_id FROM mysql_servers JOIN mysql_group_replication_hostgroups ON hostgroup_id=reader_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=offline_hostgroup WHERE hostname='%s' AND port=%d AND status<>3";
query=(char *)malloc(strlen(q)+strlen(_hostname)+32);
sprintf(query,q,_hostname,_port);
mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset);
if (error) {
free(error);
error=NULL;
}
free(query);
if (resultset) { // we lock only if needed
if (resultset->rows_count) {
proxy_warning("Group Replication: setting host %s:%d as writer\n", _hostname, _port);
GloAdmin->mysql_servers_wrlock();
mydb->execute("DELETE FROM mysql_servers_incoming");
mydb->execute("INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers");
q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=%d WHERE hostname='%s' AND port=%d AND hostgroup_id<>%d";
query=(char *)malloc(strlen(q)+strlen(_hostname)+64);
sprintf(query,q,_writer_hostgroup,_hostname,_port,_writer_hostgroup);
mydb->execute(query);
//free(query);
q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id<>%d";
//query=(char *)malloc(strlen(q)+strlen(_hostname)+64);
sprintf(query,q,_hostname,_port,_writer_hostgroup);
mydb->execute(query);
//free(query);
q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s' AND port=%d AND hostgroup_id=%d";
//query=(char *)malloc(strlen(q)+strlen(_hostname)+64);
sprintf(query,q,_hostname,_port,_writer_hostgroup);
mydb->execute(query);
//free(query);
commit();
wrlock();
SQLite3_result *resultset2=NULL;
q=(char *)"SELECT writer_hostgroup, backup_writer_hostgroup, reader_hostgroup, offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d";
//query=(char *)malloc(strlen(q)+strlen(_hostname)+64);
sprintf(query,q,_port,_writer_hostgroup);
mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset2);
if (resultset2) {
if (resultset2->rows_count) {
for (std::vector<SQLite3_row *>::iterator it = resultset2->rows.begin() ; it != resultset2->rows.end(); ++it) {
SQLite3_row *r=*it;
int writer_hostgroup=atoi(r->fields[0]);
int backup_writer_hostgroup=atoi(r->fields[1]);
int reader_hostgroup=atoi(r->fields[2]);
int offline_hostgroup=atoi(r->fields[3]);
q=(char *)"DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d , %d , %d)";
sprintf(query,q,_port,_writer_hostgroup);
mydb->execute(query);
generate_mysql_servers_table(&writer_hostgroup);
generate_mysql_servers_table(&backup_writer_hostgroup);
generate_mysql_servers_table(&reader_hostgroup);
generate_mysql_servers_table(&offline_hostgroup);
}
}
delete resultset2;
resultset2=NULL;
}
wrunlock();
GloAdmin->mysql_servers_wrunlock();
free(query);
}
}
if (resultset) {
delete resultset;
resultset=NULL;
}
}

@ -274,12 +274,14 @@ MySQL_Monitor::MySQL_Monitor() {
insert_into_tables_defs(tables_defs_monitor,"mysql_server_ping_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_PING_LOG);
insert_into_tables_defs(tables_defs_monitor,"mysql_server_read_only_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_READ_ONLY_LOG);
insert_into_tables_defs(tables_defs_monitor,"mysql_server_replication_lag_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_REPLICATION_LAG_LOG);
insert_into_tables_defs(tables_defs_monitor,"mysql_server_group_replication_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_GROUP_REPLICATION_LOG);
// create monitoring tables
check_and_build_standard_tables(monitordb, tables_defs_monitor);
monitordb->execute("CREATE INDEX IF NOT EXISTS idx_connect_log_time_start ON mysql_server_connect_log (time_start_us)");
monitordb->execute("CREATE INDEX IF NOT EXISTS idx_ping_log_time_start ON mysql_server_ping_log (time_start_us)");
monitordb->execute("CREATE INDEX IF NOT EXISTS idx_read_only_log_time_start ON mysql_server_read_only_log (time_start_us)");
monitordb->execute("CREATE INDEX IF NOT EXISTS idx_replication_lag_log_time_start ON mysql_server_replication_lag_log (time_start_us)");
monitordb->execute("CREATE INDEX IF NOT EXISTS idx_group_replication_log_time_start ON mysql_server_group_replication_log (time_start_us)");
num_threads=8;
if (GloMTH) {
@ -791,11 +793,16 @@ __exit_monitor_group_replication_thread:
read_only=false;
}
transactions_behind=atol(row[2]);
mysql_free_result(mmsd->result);
mmsd->result=NULL;
}
__end_process_group_replication_result:
proxy_info("GR: %s:%d , viable=%s , ro=%s, trx=%ld, err=%s\n", mmsd->hostname, mmsd->port, (viable_candidate ? "YES": "NO") , (read_only ? "YES": "NO") , transactions_behind, ( mmsd->mysql_error_msg ? mmsd->mysql_error_msg : "") );
//proxy_info("GR: %s:%d , viable=%s , ro=%s, trx=%ld, err=%s\n", mmsd->hostname, mmsd->port, (viable_candidate ? "YES": "NO") , (read_only ? "YES": "NO") , transactions_behind, ( mmsd->mysql_error_msg ? mmsd->mysql_error_msg : "") );
if (mmsd->mysql_error_msg) {
//proxy_warning("GR: %s:%d , viable=%s , ro=%s, trx=%ld, err=%s\n", mmsd->hostname, mmsd->port, (viable_candidate ? "YES": "NO") , (read_only ? "YES": "NO") , transactions_behind, ( mmsd->mysql_error_msg ? mmsd->mysql_error_msg : "") );
}
unsigned long long time_now=realtime_time();
time_now=time_now-(mmsd->t2 - start_time);
pthread_mutex_lock(&GloMyMon->group_replication_mutex);
//auto it =
// TODO : complete this
@ -804,13 +811,36 @@ __end_process_group_replication_result:
MyGR_monitor_node *node=NULL;
if (it2!=GloMyMon->Group_Replication_Hosts_Map.end()) {
node=it2->second;
node->add_entry(time_now,transactions_behind,viable_candidate,read_only,NULL);
node->add_entry(time_now, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1) , transactions_behind,viable_candidate,read_only,mmsd->mysql_error_msg);
} else {
node = new MyGR_monitor_node(mmsd->hostname,mmsd->port,mmsd->writer_hostgroup);
node->add_entry(time_now,transactions_behind,viable_candidate,read_only,NULL);
node->add_entry(time_now, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1) , transactions_behind,viable_candidate,read_only,mmsd->mysql_error_msg);
GloMyMon->Group_Replication_Hosts_Map.insert(std::make_pair(s,node));
}
pthread_mutex_unlock(&GloMyMon->group_replication_mutex);
// NOTE: we update MyHGM outside the mutex group_replication_mutex
if (mmsd->mysql_error_msg) { // there was an error checking the status of the server, surely we need to reconfigure GR
MyHGM->update_group_replication_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, mmsd->mysql_error_msg);
} else {
if (viable_candidate==false) {
MyHGM->update_group_replication_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"viable_candidate=NO");
} else {
if (read_only==true) {
if (transactions_behind > mmsd->max_transactions_behind) {
MyHGM->update_group_replication_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"slave is lagging");
} else {
MyHGM->update_group_replication_set_read_only(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"read_only=YES");
}
} else {
// the node is a writer
// TODO: for now we don't care about the number of writers
MyHGM->update_group_replication_set_writer(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup);
}
}
}
// clean up
if (l<110) {
} else {
free(s);
@ -1554,7 +1584,9 @@ void * MySQL_Monitor::monitor_group_replication() {
for (std::vector<SQLite3_row *>::iterator it = Group_Replication_Hosts_resultset->rows.begin() ; it != Group_Replication_Hosts_resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
MySQL_Monitor_State_Data *mmsd=new MySQL_Monitor_State_Data(r->fields[1],atoi(r->fields[2]), NULL, atoi(r->fields[3]));
mmsd->writer_hostgroup=atoi(r->fields[1]);
mmsd->writer_hostgroup=atoi(r->fields[0]);
mmsd->writer_is_also_reader=atoi(r->fields[4]);
mmsd->max_transactions_behind=atoi(r->fields[5]);
mmsd->mondb=monitordb;
//pthread_t thr_;
//if ( pthread_create(&thr_, &attr, monitor_read_only_thread, (void *)mmsd) != 0 ) {
@ -1846,6 +1878,7 @@ MyGR_monitor_node::MyGR_monitor_node(char *_a, int _p, int _whg) {
int i;
for (i=0;i<MyGR_Nentries;i++) {
last_entries[i].error=NULL;
last_entries[i].start_time=0;
}
}
@ -1856,7 +1889,7 @@ MyGR_monitor_node::~MyGR_monitor_node() {
}
// return true if status changed
bool MyGR_monitor_node::add_entry(unsigned long long _ct, long long _tb, bool _pp, bool _ro, char *_error) {
bool MyGR_monitor_node::add_entry(unsigned long long _st, unsigned long long _ct, long long _tb, bool _pp, bool _ro, char *_error) {
bool ret=false;
if (idx_last_entry==-1) ret=true;
int prev_last_entry=idx_last_entry;
@ -1864,6 +1897,7 @@ bool MyGR_monitor_node::add_entry(unsigned long long _ct, long long _tb, bool _p
if (idx_last_entry>=MyGR_Nentries) {
idx_last_entry=0;
}
last_entries[idx_last_entry].start_time=_st;
last_entries[idx_last_entry].check_time=_ct;
last_entries[idx_last_entry].transactions_behind=_tb;
last_entries[idx_last_entry].primary_partition=_pp;
@ -1894,3 +1928,42 @@ bool MyGR_monitor_node::add_entry(unsigned long long _ct, long long _tb, bool _p
}
return ret;
}
void MySQL_Monitor::populate_monitor_mysql_server_group_replication_log() {
sqlite3 *mondb=monitordb->get_db();
int rc;
//char *query=NULL;
char *query1=NULL;
query1=(char *)"INSERT INTO mysql_server_group_replication_log VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)";
sqlite3_stmt *statement1=NULL;
pthread_mutex_lock(&GloMyMon->group_replication_mutex);
rc=sqlite3_prepare_v2(mondb, query1, -1, &statement1, 0);
assert(rc==SQLITE_OK);
monitordb->execute((char *)"DELETE FROM mysql_server_group_replication_log");
std::map<std::string, MyGR_monitor_node *>::iterator it2;
MyGR_monitor_node *node=NULL;
for (it2=GloMyMon->Group_Replication_Hosts_Map.begin(); it2!=GloMyMon->Group_Replication_Hosts_Map.end(); ++it2) {
std::string s=it2->first;
node=it2->second;
std::size_t found=s.find_last_of(":");
std::string host=s.substr(0,found);
std::string port=s.substr(found+1);
int i;
for (i=0; i<MyGR_Nentries; i++) {
if (node->last_entries[i].start_time) {
rc=sqlite3_bind_text(statement1, 1, host.c_str(), -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 2, atoi(port.c_str())); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 3, node->last_entries[i].start_time ); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 4, node->last_entries[i].check_time ); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement1, 5, ( node->last_entries[i].primary_partition ? (char *)"YES" : (char *)"NO" ) , -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement1, 6, ( node->last_entries[i].read_only ? (char *)"YES" : (char *)"NO" ) , -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 7, node->last_entries[i].transactions_behind ); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement1, 8, node->last_entries[i].error , -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
SAFE_SQLITE3_STEP(statement1);
rc=sqlite3_clear_bindings(statement1); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement1); assert(rc==SQLITE_OK);
}
}
}
pthread_mutex_unlock(&GloMyMon->group_replication_mutex);
}

@ -97,6 +97,8 @@ extern Query_Processor *GloQPro;
extern MySQL_Threads_Handler *GloMTH;
extern MySQL_Logger *GloMyLogger;
extern MySQL_STMT_Manager *GloMyStmt;
extern MySQL_Monitor *GloMyMon;
#define PANIC(msg) { perror(msg); exit(EXIT_FAILURE); }
int rc, arg_on=1, arg_off=0;
@ -1222,6 +1224,8 @@ void ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign
bool runtime_mysql_servers=false;
bool runtime_mysql_query_rules=false;
bool monitor_mysql_server_group_replication_log=false;
if (strcasestr(query_no_space,"processlist"))
// This will match the following usecases:
// SHOW PROCESSLIST
@ -1264,6 +1268,9 @@ void ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign
}
}
}
if (strstr(query_no_space,"mysql_server_group_replication_log")) {
monitor_mysql_server_group_replication_log=true; refresh=true;
}
// if (stats_mysql_processlist || stats_mysql_connection_pool || stats_mysql_query_digest || stats_mysql_query_digest_reset) {
if (refresh==true) {
pthread_mutex_lock(&admin_mutex);
@ -1303,6 +1310,11 @@ void ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign
save_scheduler_runtime_to_database(true);
}
}
if (monitor_mysql_server_group_replication_log) {
if (GloMyMon) {
GloMyMon->populate_monitor_mysql_server_group_replication_log();
}
}
pthread_mutex_unlock(&admin_mutex);
}
}

Loading…
Cancel
Save