Second commit for GR support

v1.4.0
René Cannaò 10 years ago
parent 3c2ced2a7d
commit 7ea78394f0

@ -132,6 +132,27 @@ class MyHGC { // MySQL Host Group Container
MySrvC *get_random_MySrvC();
};
class Group_Replication_Info {
public:
int writer_hostgroup;
int backup_writer_hostgroup;
int reader_hostgroup;
int offline_hostgroup;
int max_writers;
int max_transactions_behind;
char *comment;
bool active;
bool writer_is_also_reader;
bool __active;
int current_num_writers;
int current_num_backup_writers;
int current_num_readers;
int current_num_offline;
Group_Replication_Info(int w, int b, int r, int o, int mw, int mtb, bool _a, bool _w, char *c);
bool update(int b, int r, int o, int mw, int mtb, bool _a, bool _w, char *c);
~Group_Replication_Info();
};
class MySQL_HostGroups_Manager {
private:
SQLite3DB *admindb;
@ -154,6 +175,9 @@ class MySQL_HostGroups_Manager {
void generate_mysql_group_replication_hostgroups_table();
SQLite3_result *incoming_group_replication_hostgroups;
pthread_mutex_t Group_Replication_Info_mutex;
std::map<int , Group_Replication_Info *> Group_Replication_Info_Map;
std::thread *HGCU_thread;
public:
@ -199,6 +223,7 @@ class MySQL_HostGroups_Manager {
SQLite3_result * execute_query(char *query, char **error);
SQLite3_result *dump_table_mysql_servers();
SQLite3_result *dump_table_mysql_replication_hostgroups();
SQLite3_result *dump_table_mysql_group_replication_hostgroups();
MyHGC * MyHGC_lookup(unsigned int);
void MyConn_add_to_pool(MySQL_Connection *);

@ -111,6 +111,7 @@ class MySQL_Monitor {
void drop_tables_defs(std::vector<table_def_t *> *tables_defs);
void check_and_build_standard_tables(SQLite3DB *db, std::vector<table_def_t *> *tables_defs);
public:
pthread_mutex_t group_replication_mutex; // for simplicity, a mutex instead of a rwlock
std::map<char *, MyGR_monitor_node *, cmp_str> group_replication_hosts;
unsigned int num_threads;
wqueue<WorkItem*> queue;

@ -375,6 +375,7 @@ MySQL_HostGroups_Manager::MySQL_HostGroups_Manager() {
status.frontend_init_db=0;
status.frontend_set_names=0;
status.frontend_use_db=0;
pthread_mutex_init(&Group_Replication_Info_mutex, NULL);
#ifdef MHM_PTHREAD_MUTEX
pthread_mutex_init(&lock, NULL);
#else
@ -729,6 +730,12 @@ bool MySQL_HostGroups_Manager::commit() {
//generate_mysql_servers_table();
generate_mysql_replication_hostgroups_table();
// group replication
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_group_replication_hostgroups\n");
mydb->execute("DELETE FROM mysql_group_replication_hostgroups");
generate_mysql_group_replication_hostgroups_table();
__sync_fetch_and_add(&status.servers_table_version,1);
wrunlock();
if (GloMTH) {
@ -914,7 +921,70 @@ void MySQL_HostGroups_Manager::generate_mysql_group_replication_hostgroups_table
if (incoming_group_replication_hostgroups==NULL) {
return;
}
int rc;
sqlite3_stmt *statement=NULL;
sqlite3 *mydb3=mydb->get_db();
char *query=(char *)"INSERT INTO mysql_group_replication_hostgroups(writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup,active,max_writers,writer_is_also_reader,max_transactions_behind,comment) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)";
rc=sqlite3_prepare_v2(mydb3, query, -1, &statement, 0);
assert(rc==SQLITE_OK);
proxy_info("New mysql_group_replication_hostgroups table\n");
pthread_mutex_lock(&Group_Replication_Info_mutex);
for (std::map<int , Group_Replication_Info *>::iterator it1 = Group_Replication_Info_Map.begin() ; it1 != Group_Replication_Info_Map.end(); ++it1) {
Group_Replication_Info *info=NULL;
info=it1->second;
info->__active=false;
}
for (std::vector<SQLite3_row *>::iterator it = incoming_group_replication_hostgroups->rows.begin() ; it != incoming_group_replication_hostgroups->rows.end(); ++it) {
SQLite3_row *r=*it;
int writer_hostgroup=atoi(r->fields[0]);
int backup_writer_hostgroup=atoi(r->fields[1]);
int reader_hostgroup=atoi(r->fields[2]);
int offline_hostgroup=atoi(r->fields[3]);
int active=atoi(r->fields[4]);
int max_writers=atoi(r->fields[5]);
int writer_is_also_reader=atoi(r->fields[6]);
int max_transactions_behind=atoi(r->fields[7]);
proxy_info("Loading MySQL Group Replication info for (%d,%d,%d,%d,%s,%d,%d,%d,\"%s\")\n", writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup,(active ? "on" : "off"),max_writers,writer_is_also_reader,max_transactions_behind,r->fields[8]);
rc=sqlite3_bind_int64(statement, 1, writer_hostgroup); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 2, backup_writer_hostgroup); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 3, reader_hostgroup); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 4, offline_hostgroup); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 5, active); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 6, max_writers); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 7, writer_is_also_reader); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 8, max_transactions_behind); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement, 9, r->fields[8], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
SAFE_SQLITE3_STEP(statement);
rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement); assert(rc==SQLITE_OK);
std::map<int , Group_Replication_Info *>::iterator it2;
it2 = Group_Replication_Info_Map.find(writer_hostgroup);
Group_Replication_Info *info=NULL;
if (it2!=Group_Replication_Info_Map.end()) {
info=it2->second;
bool changed=false;
changed=info->update(backup_writer_hostgroup,reader_hostgroup,offline_hostgroup, max_writers, max_transactions_behind, (bool)active, (bool)writer_is_also_reader, r->fields[8]);
} else {
info=new Group_Replication_Info(writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup, max_writers, max_transactions_behind, (bool)active, (bool)writer_is_also_reader, r->fields[8]);
Group_Replication_Info_Map.insert(Group_Replication_Info_Map.begin(), std::pair<int, Group_Replication_Info *>(writer_hostgroup,info));
}
}
sqlite3_finalize(statement);
incoming_group_replication_hostgroups=NULL;
// remove missing ones
for (auto it3 = Group_Replication_Info_Map.begin(); it3 != Group_Replication_Info_Map.end(); ) {
Group_Replication_Info *info=it3->second;
if (info->__active==false) {
delete info;
it3 = Group_Replication_Info_Map.erase(it3);
} else {
it3++;
}
}
// TODO: it is now time to compute all the changes
pthread_mutex_unlock(&Group_Replication_Info_mutex);
}
SQLite3_result * MySQL_HostGroups_Manager::dump_table_mysql_servers() {
@ -951,6 +1021,19 @@ SQLite3_result * MySQL_HostGroups_Manager::dump_table_mysql_replication_hostgrou
return resultset;
}
SQLite3_result * MySQL_HostGroups_Manager::dump_table_mysql_group_replication_hostgroups() {
wrlock();
char *error=NULL;
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
char *query=(char *)"SELECT writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup,active,max_writers,writer_is_also_reader,max_transactions_behind,comment FROM mysql_group_replication_hostgroups";
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query);
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
wrunlock();
return resultset;
}
MyHGC * MySQL_HostGroups_Manager::MyHGC_create(unsigned int _hid) {
MyHGC *myhgc=new MyHGC(_hid);
return myhgc;
@ -1704,3 +1787,81 @@ unsigned long long MySQL_HostGroups_Manager::Get_Memory_Stats() {
wrunlock();
return intsize;
}
Group_Replication_Info::Group_Replication_Info(int w, int b, int r, int o, int mw, int mtb, bool _a, bool _w, char *c) {
comment=NULL;
if (c) {
comment=strdup(c);
}
writer_hostgroup=w;
backup_writer_hostgroup=b;
reader_hostgroup=r;
offline_hostgroup=o;
max_writers=mw;
max_transactions_behind=mtb;
active=_a;
writer_is_also_reader=_w;
current_num_writers=0;
current_num_backup_writers=0;
current_num_readers=0;
current_num_offline=0;
__active=true;
}
Group_Replication_Info::~Group_Replication_Info() {
if (comment) {
free(comment);
comment=NULL;
}
}
bool Group_Replication_Info::update(int b, int r, int o, int mw, int mtb, bool _a, bool _w, char *c) {
bool ret=false;
__active=true;
if (backup_writer_hostgroup!=b) {
backup_writer_hostgroup=b;
ret=true;
}
if (reader_hostgroup!=r) {
reader_hostgroup=r;
ret=true;
}
if (offline_hostgroup!=o) {
offline_hostgroup=o;
ret=true;
}
if (max_writers!=mw) {
max_writers=mw;
ret=true;
}
if (max_transactions_behind!=mtb) {
max_transactions_behind=mtb;
ret=true;
}
if (active!=_a) {
active=_a;
ret=true;
}
if (writer_is_also_reader!=_w) {
writer_is_also_reader=_w;
ret=true;
}
// for comment we don't change return value
if (comment) {
if (c) {
if (strcmp(comment,c)) {
free(comment);
comment=strdup(c);
}
} else {
free(comment);
comment=NULL;
}
} else {
if (c) {
comment=strdup(c);
}
}
return ret;
}

@ -331,6 +331,8 @@ MySQL_Monitor::MySQL_Monitor() {
My_Conn_Pool=new MySQL_Monitor_Connection_Pool();
pthread_mutex_init(&group_replication_mutex,NULL);
shutdown=false;
monitor_enabled=true; // default
// create new SQLite datatabase

@ -4308,6 +4308,49 @@ void ProxySQL_Admin::save_mysql_servers_runtime_to_database(bool _runtime) {
}
if(resultset) delete resultset;
resultset=NULL;
// dump mysql_group_replication_hostgroups
if (_runtime) {
query=(char *)"DELETE FROM main.runtime_mysql_group_replication_hostgroups";
} else {
query=(char *)"DELETE FROM main.mysql_group_replication_hostgroups";
}
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
admindb->execute(query);
resultset=MyHGM->dump_table_mysql_group_replication_hostgroups();
if (resultset) {
int rc;
sqlite3_stmt *statement=NULL;
sqlite3 *mydb3=admindb->get_db();
char *query=NULL;
if (_runtime) {
query=(char *)"INSERT INTO runtime_mysql_group_replication_hostgroups(writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup,active,max_writers,writer_is_also_reader,max_transactions_behind,comment) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)";
} else {
query=(char *)"INSERT INTO mysql_group_replication_hostgroups(writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup,active,max_writers,writer_is_also_reader,max_transactions_behind,comment) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)";
}
rc=sqlite3_prepare_v2(mydb3, query, -1, &statement, 0);
assert(rc==SQLITE_OK);
//proxy_info("New mysql_group_replication_hostgroups table\n");
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
rc=sqlite3_bind_int64(statement, 1, atoi(r->fields[0])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 2, atoi(r->fields[1])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 3, atoi(r->fields[2])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 4, atoi(r->fields[3])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 5, atoi(r->fields[4])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 6, atoi(r->fields[5])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 7, atoi(r->fields[6])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 8, atoi(r->fields[7])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement, 9, r->fields[8], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
SAFE_SQLITE3_STEP(statement);
rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement); assert(rc==SQLITE_OK);
}
sqlite3_finalize(statement);
}
if(resultset) delete resultset;
resultset=NULL;
}
@ -4333,6 +4376,8 @@ void ProxySQL_Admin::load_mysql_servers_to_runtime() {
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
SQLite3_result *resultset_replication=NULL;
SQLite3_result *resultset_group_replication=NULL;
char *query=(char *)"SELECT hostgroup_id,hostname,port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment FROM main.mysql_servers";
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
@ -4391,16 +4436,57 @@ void ProxySQL_Admin::load_mysql_servers_to_runtime() {
query=(char *)"SELECT a.* FROM mysql_replication_hostgroups a LEFT JOIN mysql_replication_hostgroups b ON a.writer_hostgroup=b.reader_hostgroup WHERE b.reader_hostgroup IS NULL";
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset_replication);
//MyHGH->wrlock();
if (error) {
proxy_error("Error on %s : %s\n", query, error);
} else {
MyHGM->set_incoming_replication_hostgroups(resultset);
MyHGM->set_incoming_replication_hostgroups(resultset_replication);
}
if (resultset) delete resultset;
resultset=NULL;
// support for Group Replication, table mysql_group_replication_hostgroups
// look for invalid combinations
query=(char *)"SELECT a.* FROM mysql_group_replication_hostgroups a JOIN mysql_group_replication_hostgroups b ON a.writer_hostgroup=b.reader_hostgroup WHERE b.reader_hostgroup UNION ALL SELECT a.* FROM mysql_group_replication_hostgroups a JOIN mysql_group_replication_hostgroups b ON a.writer_hostgroup=b.backup_writer_hostgroup WHERE b.backup_writer_hostgroup UNION ALL SELECT a.* FROM mysql_group_replication_hostgroups a JOIN mysql_group_replication_hostgroups b ON a.writer_hostgroup=b.offline_hostgroup WHERE b.offline_hostgroup";
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
if (error) {
proxy_error("Error on %s : %s\n", query, error);
} else {
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
proxy_error("Incompatible entry in mysql_group_replication_hostgroups will be ignored : ( %s , %s , %s , %s )\n", r->fields[0], r->fields[1], r->fields[2], r->fields[3]);
}
}
if (resultset) delete resultset;
resultset=NULL;
query=(char *)"SELECT a.* FROM mysql_group_replication_hostgroups a LEFT JOIN mysql_group_replication_hostgroups b ON (a.writer_hostgroup=b.reader_hostgroup OR a.writer_hostgroup=b.backup_writer_hostgroup OR a.writer_hostgroup=b.offline_hostgroup) WHERE b.reader_hostgroup IS NULL AND b.backup_writer_hostgroup IS NULL AND b.offline_hostgroup IS NULL";
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset_group_replication);
if (error) {
proxy_error("Error on %s : %s\n", query, error);
} else {
MyHGM->set_incoming_group_replication_hostgroups(resultset_group_replication);
}
// commit all the changes
MyHGM->commit();
// clean up
if (resultset) delete resultset;
resultset=NULL;
if (resultset_replication) {
delete resultset_replication;
resultset_replication=NULL;
}
if (resultset_group_replication) {
delete resultset_replication;
resultset_group_replication=NULL;
}
}

Loading…
Cancel
Save