From 7ea78394f0ffc01be946c3835d5b6968fa2e2f0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Tue, 13 Dec 2016 19:01:03 +0000 Subject: [PATCH] Second commit for GR support --- include/MySQL_HostGroups_Manager.h | 25 +++++ include/MySQL_Monitor.hpp | 1 + lib/MySQL_HostGroups_Manager.cpp | 161 +++++++++++++++++++++++++++++ lib/MySQL_Monitor.cpp | 2 + lib/ProxySQL_Admin.cpp | 90 +++++++++++++++- 5 files changed, 277 insertions(+), 2 deletions(-) diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index 030e96188..4b86fbc2a 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -132,6 +132,27 @@ class MyHGC { // MySQL Host Group Container MySrvC *get_random_MySrvC(); }; +class Group_Replication_Info { + public: + int writer_hostgroup; + int backup_writer_hostgroup; + int reader_hostgroup; + int offline_hostgroup; + int max_writers; + int max_transactions_behind; + char *comment; + bool active; + bool writer_is_also_reader; + bool __active; + int current_num_writers; + int current_num_backup_writers; + int current_num_readers; + int current_num_offline; + Group_Replication_Info(int w, int b, int r, int o, int mw, int mtb, bool _a, bool _w, char *c); + bool update(int b, int r, int o, int mw, int mtb, bool _a, bool _w, char *c); + ~Group_Replication_Info(); +}; + class MySQL_HostGroups_Manager { private: SQLite3DB *admindb; @@ -154,6 +175,9 @@ class MySQL_HostGroups_Manager { void generate_mysql_group_replication_hostgroups_table(); SQLite3_result *incoming_group_replication_hostgroups; + pthread_mutex_t Group_Replication_Info_mutex; + std::map Group_Replication_Info_Map; + std::thread *HGCU_thread; public: @@ -199,6 +223,7 @@ class MySQL_HostGroups_Manager { SQLite3_result * execute_query(char *query, char **error); SQLite3_result *dump_table_mysql_servers(); SQLite3_result *dump_table_mysql_replication_hostgroups(); + SQLite3_result *dump_table_mysql_group_replication_hostgroups(); MyHGC * MyHGC_lookup(unsigned int); void MyConn_add_to_pool(MySQL_Connection *); diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index e8b20a05e..898e7685d 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -111,6 +111,7 @@ class MySQL_Monitor { void drop_tables_defs(std::vector *tables_defs); void check_and_build_standard_tables(SQLite3DB *db, std::vector *tables_defs); public: + pthread_mutex_t group_replication_mutex; // for simplicity, a mutex instead of a rwlock std::map group_replication_hosts; unsigned int num_threads; wqueue queue; diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index e8c191f96..10faadda4 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -375,6 +375,7 @@ MySQL_HostGroups_Manager::MySQL_HostGroups_Manager() { status.frontend_init_db=0; status.frontend_set_names=0; status.frontend_use_db=0; + pthread_mutex_init(&Group_Replication_Info_mutex, NULL); #ifdef MHM_PTHREAD_MUTEX pthread_mutex_init(&lock, NULL); #else @@ -729,6 +730,12 @@ bool MySQL_HostGroups_Manager::commit() { //generate_mysql_servers_table(); generate_mysql_replication_hostgroups_table(); + + // group replication + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_group_replication_hostgroups\n"); + mydb->execute("DELETE FROM mysql_group_replication_hostgroups"); + generate_mysql_group_replication_hostgroups_table(); + __sync_fetch_and_add(&status.servers_table_version,1); wrunlock(); if (GloMTH) { @@ -914,7 +921,70 @@ void MySQL_HostGroups_Manager::generate_mysql_group_replication_hostgroups_table if (incoming_group_replication_hostgroups==NULL) { return; } + int rc; + sqlite3_stmt *statement=NULL; + sqlite3 *mydb3=mydb->get_db(); + char *query=(char *)"INSERT INTO mysql_group_replication_hostgroups(writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup,active,max_writers,writer_is_also_reader,max_transactions_behind,comment) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)"; + rc=sqlite3_prepare_v2(mydb3, query, -1, &statement, 0); + assert(rc==SQLITE_OK); proxy_info("New mysql_group_replication_hostgroups table\n"); + pthread_mutex_lock(&Group_Replication_Info_mutex); + for (std::map::iterator it1 = Group_Replication_Info_Map.begin() ; it1 != Group_Replication_Info_Map.end(); ++it1) { + Group_Replication_Info *info=NULL; + info=it1->second; + info->__active=false; + } + for (std::vector::iterator it = incoming_group_replication_hostgroups->rows.begin() ; it != incoming_group_replication_hostgroups->rows.end(); ++it) { + SQLite3_row *r=*it; + int writer_hostgroup=atoi(r->fields[0]); + int backup_writer_hostgroup=atoi(r->fields[1]); + int reader_hostgroup=atoi(r->fields[2]); + int offline_hostgroup=atoi(r->fields[3]); + int active=atoi(r->fields[4]); + int max_writers=atoi(r->fields[5]); + int writer_is_also_reader=atoi(r->fields[6]); + int max_transactions_behind=atoi(r->fields[7]); + proxy_info("Loading MySQL Group Replication info for (%d,%d,%d,%d,%s,%d,%d,%d,\"%s\")\n", writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup,(active ? "on" : "off"),max_writers,writer_is_also_reader,max_transactions_behind,r->fields[8]); + rc=sqlite3_bind_int64(statement, 1, writer_hostgroup); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 2, backup_writer_hostgroup); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 3, reader_hostgroup); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 4, offline_hostgroup); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 5, active); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 6, max_writers); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 7, writer_is_also_reader); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 8, max_transactions_behind); assert(rc==SQLITE_OK); + rc=sqlite3_bind_text(statement, 9, r->fields[8], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); + + SAFE_SQLITE3_STEP(statement); + rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK); + rc=sqlite3_reset(statement); assert(rc==SQLITE_OK); + std::map::iterator it2; + it2 = Group_Replication_Info_Map.find(writer_hostgroup); + Group_Replication_Info *info=NULL; + if (it2!=Group_Replication_Info_Map.end()) { + info=it2->second; + bool changed=false; + changed=info->update(backup_writer_hostgroup,reader_hostgroup,offline_hostgroup, max_writers, max_transactions_behind, (bool)active, (bool)writer_is_also_reader, r->fields[8]); + } else { + info=new Group_Replication_Info(writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup, max_writers, max_transactions_behind, (bool)active, (bool)writer_is_also_reader, r->fields[8]); + Group_Replication_Info_Map.insert(Group_Replication_Info_Map.begin(), std::pair(writer_hostgroup,info)); + } + } + sqlite3_finalize(statement); + incoming_group_replication_hostgroups=NULL; + + // remove missing ones + for (auto it3 = Group_Replication_Info_Map.begin(); it3 != Group_Replication_Info_Map.end(); ) { + Group_Replication_Info *info=it3->second; + if (info->__active==false) { + delete info; + it3 = Group_Replication_Info_Map.erase(it3); + } else { + it3++; + } + } + // TODO: it is now time to compute all the changes + pthread_mutex_unlock(&Group_Replication_Info_mutex); } SQLite3_result * MySQL_HostGroups_Manager::dump_table_mysql_servers() { @@ -951,6 +1021,19 @@ SQLite3_result * MySQL_HostGroups_Manager::dump_table_mysql_replication_hostgrou return resultset; } +SQLite3_result * MySQL_HostGroups_Manager::dump_table_mysql_group_replication_hostgroups() { + wrlock(); + char *error=NULL; + int cols=0; + int affected_rows=0; + SQLite3_result *resultset=NULL; + char *query=(char *)"SELECT writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup,active,max_writers,writer_is_also_reader,max_transactions_behind,comment FROM mysql_group_replication_hostgroups"; + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query); + mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset); + wrunlock(); + return resultset; +} + MyHGC * MySQL_HostGroups_Manager::MyHGC_create(unsigned int _hid) { MyHGC *myhgc=new MyHGC(_hid); return myhgc; @@ -1704,3 +1787,81 @@ unsigned long long MySQL_HostGroups_Manager::Get_Memory_Stats() { wrunlock(); return intsize; } + + +Group_Replication_Info::Group_Replication_Info(int w, int b, int r, int o, int mw, int mtb, bool _a, bool _w, char *c) { + comment=NULL; + if (c) { + comment=strdup(c); + } + writer_hostgroup=w; + backup_writer_hostgroup=b; + reader_hostgroup=r; + offline_hostgroup=o; + max_writers=mw; + max_transactions_behind=mtb; + active=_a; + writer_is_also_reader=_w; + current_num_writers=0; + current_num_backup_writers=0; + current_num_readers=0; + current_num_offline=0; + __active=true; +} + +Group_Replication_Info::~Group_Replication_Info() { + if (comment) { + free(comment); + comment=NULL; + } +} + +bool Group_Replication_Info::update(int b, int r, int o, int mw, int mtb, bool _a, bool _w, char *c) { + bool ret=false; + __active=true; + if (backup_writer_hostgroup!=b) { + backup_writer_hostgroup=b; + ret=true; + } + if (reader_hostgroup!=r) { + reader_hostgroup=r; + ret=true; + } + if (offline_hostgroup!=o) { + offline_hostgroup=o; + ret=true; + } + if (max_writers!=mw) { + max_writers=mw; + ret=true; + } + if (max_transactions_behind!=mtb) { + max_transactions_behind=mtb; + ret=true; + } + if (active!=_a) { + active=_a; + ret=true; + } + if (writer_is_also_reader!=_w) { + writer_is_also_reader=_w; + ret=true; + } + // for comment we don't change return value + if (comment) { + if (c) { + if (strcmp(comment,c)) { + free(comment); + comment=strdup(c); + } + } else { + free(comment); + comment=NULL; + } + } else { + if (c) { + comment=strdup(c); + } + } + return ret; +} diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 3b6b21cdf..533948db0 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -331,6 +331,8 @@ MySQL_Monitor::MySQL_Monitor() { My_Conn_Pool=new MySQL_Monitor_Connection_Pool(); + pthread_mutex_init(&group_replication_mutex,NULL); + shutdown=false; monitor_enabled=true; // default // create new SQLite datatabase diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index 638b4d039..0fb5b7cdf 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -4308,6 +4308,49 @@ void ProxySQL_Admin::save_mysql_servers_runtime_to_database(bool _runtime) { } if(resultset) delete resultset; resultset=NULL; + + // dump mysql_group_replication_hostgroups + if (_runtime) { + query=(char *)"DELETE FROM main.runtime_mysql_group_replication_hostgroups"; + } else { + query=(char *)"DELETE FROM main.mysql_group_replication_hostgroups"; + } + proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); + admindb->execute(query); + resultset=MyHGM->dump_table_mysql_group_replication_hostgroups(); + if (resultset) { + int rc; + sqlite3_stmt *statement=NULL; + sqlite3 *mydb3=admindb->get_db(); + char *query=NULL; + if (_runtime) { + query=(char *)"INSERT INTO runtime_mysql_group_replication_hostgroups(writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup,active,max_writers,writer_is_also_reader,max_transactions_behind,comment) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)"; + } else { + query=(char *)"INSERT INTO mysql_group_replication_hostgroups(writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup,active,max_writers,writer_is_also_reader,max_transactions_behind,comment) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)"; + } + rc=sqlite3_prepare_v2(mydb3, query, -1, &statement, 0); + assert(rc==SQLITE_OK); + //proxy_info("New mysql_group_replication_hostgroups table\n"); + for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { + SQLite3_row *r=*it; + rc=sqlite3_bind_int64(statement, 1, atoi(r->fields[0])); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 2, atoi(r->fields[1])); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 3, atoi(r->fields[2])); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 4, atoi(r->fields[3])); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 5, atoi(r->fields[4])); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 6, atoi(r->fields[5])); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 7, atoi(r->fields[6])); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 8, atoi(r->fields[7])); assert(rc==SQLITE_OK); + rc=sqlite3_bind_text(statement, 9, r->fields[8], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); + + SAFE_SQLITE3_STEP(statement); + rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK); + rc=sqlite3_reset(statement); assert(rc==SQLITE_OK); + } + sqlite3_finalize(statement); + } + if(resultset) delete resultset; + resultset=NULL; } @@ -4333,6 +4376,8 @@ void ProxySQL_Admin::load_mysql_servers_to_runtime() { int cols=0; int affected_rows=0; SQLite3_result *resultset=NULL; + SQLite3_result *resultset_replication=NULL; + SQLite3_result *resultset_group_replication=NULL; char *query=(char *)"SELECT hostgroup_id,hostname,port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment FROM main.mysql_servers"; proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset); @@ -4391,16 +4436,57 @@ void ProxySQL_Admin::load_mysql_servers_to_runtime() { query=(char *)"SELECT a.* FROM mysql_replication_hostgroups a LEFT JOIN mysql_replication_hostgroups b ON a.writer_hostgroup=b.reader_hostgroup WHERE b.reader_hostgroup IS NULL"; proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); - admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset); + admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset_replication); //MyHGH->wrlock(); if (error) { proxy_error("Error on %s : %s\n", query, error); } else { - MyHGM->set_incoming_replication_hostgroups(resultset); + MyHGM->set_incoming_replication_hostgroups(resultset_replication); } + if (resultset) delete resultset; + resultset=NULL; + + // support for Group Replication, table mysql_group_replication_hostgroups + + // look for invalid combinations + query=(char *)"SELECT a.* FROM mysql_group_replication_hostgroups a JOIN mysql_group_replication_hostgroups b ON a.writer_hostgroup=b.reader_hostgroup WHERE b.reader_hostgroup UNION ALL SELECT a.* FROM mysql_group_replication_hostgroups a JOIN mysql_group_replication_hostgroups b ON a.writer_hostgroup=b.backup_writer_hostgroup WHERE b.backup_writer_hostgroup UNION ALL SELECT a.* FROM mysql_group_replication_hostgroups a JOIN mysql_group_replication_hostgroups b ON a.writer_hostgroup=b.offline_hostgroup WHERE b.offline_hostgroup"; + proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); + admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset); + if (error) { + proxy_error("Error on %s : %s\n", query, error); + } else { + for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { + SQLite3_row *r=*it; + proxy_error("Incompatible entry in mysql_group_replication_hostgroups will be ignored : ( %s , %s , %s , %s )\n", r->fields[0], r->fields[1], r->fields[2], r->fields[3]); + } + } + if (resultset) delete resultset; + resultset=NULL; + + query=(char *)"SELECT a.* FROM mysql_group_replication_hostgroups a LEFT JOIN mysql_group_replication_hostgroups b ON (a.writer_hostgroup=b.reader_hostgroup OR a.writer_hostgroup=b.backup_writer_hostgroup OR a.writer_hostgroup=b.offline_hostgroup) WHERE b.reader_hostgroup IS NULL AND b.backup_writer_hostgroup IS NULL AND b.offline_hostgroup IS NULL"; + proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); + admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset_group_replication); + if (error) { + proxy_error("Error on %s : %s\n", query, error); + } else { + MyHGM->set_incoming_group_replication_hostgroups(resultset_group_replication); + } + + // commit all the changes MyHGM->commit(); + + // clean up if (resultset) delete resultset; + resultset=NULL; + if (resultset_replication) { + delete resultset_replication; + resultset_replication=NULL; + } + if (resultset_group_replication) { + delete resultset_replication; + resultset_group_replication=NULL; + } }