diff --git a/lib/Base_HostGroups_Manager.cpp b/lib/Base_HostGroups_Manager.cpp index 34679ad73..4a27e7888 100644 --- a/lib/Base_HostGroups_Manager.cpp +++ b/lib/Base_HostGroups_Manager.cpp @@ -1905,205 +1905,6 @@ void MySQL_HostGroups_Manager::generate_mysql_replication_hostgroups_table() { } -void MySQL_HostGroups_Manager::generate_mysql_group_replication_hostgroups_table() { - if (incoming_group_replication_hostgroups==NULL) { - return; - } - int rc; - sqlite3_stmt *statement=NULL; - //sqlite3 *mydb3=mydb->get_db(); - char *query=(char *)"INSERT INTO mysql_group_replication_hostgroups(writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup,active,max_writers,writer_is_also_reader,max_transactions_behind,comment) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)"; - //rc=(*proxy_sqlite3_prepare_v2)(mydb3, query, -1, &statement, 0); - rc = mydb->prepare_v2(query, &statement); - ASSERT_SQLITE_OK(rc, mydb); - proxy_info("New mysql_group_replication_hostgroups table\n"); - pthread_mutex_lock(&Group_Replication_Info_mutex); - for (std::map::iterator it1 = Group_Replication_Info_Map.begin() ; it1 != Group_Replication_Info_Map.end(); ++it1) { - Group_Replication_Info *info=NULL; - info=it1->second; - info->__active=false; - } - for (std::vector::iterator it = incoming_group_replication_hostgroups->rows.begin() ; it != incoming_group_replication_hostgroups->rows.end(); ++it) { - SQLite3_row *r=*it; - int writer_hostgroup=atoi(r->fields[0]); - int backup_writer_hostgroup=atoi(r->fields[1]); - int reader_hostgroup=atoi(r->fields[2]); - int offline_hostgroup=atoi(r->fields[3]); - int active=atoi(r->fields[4]); - int max_writers=atoi(r->fields[5]); - int writer_is_also_reader=atoi(r->fields[6]); - int max_transactions_behind=atoi(r->fields[7]); - proxy_info("Loading MySQL Group Replication info for (%d,%d,%d,%d,%s,%d,%d,%d,\"%s\")\n", writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup,(active ? "on" : "off"),max_writers,writer_is_also_reader,max_transactions_behind,r->fields[8]); - rc=(*proxy_sqlite3_bind_int64)(statement, 1, writer_hostgroup); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 2, backup_writer_hostgroup); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 3, reader_hostgroup); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 4, offline_hostgroup); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 5, active); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 6, max_writers); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 7, writer_is_also_reader); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 8, max_transactions_behind); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_text)(statement, 9, r->fields[8], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb); - - SAFE_SQLITE3_STEP2(statement); - rc=(*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mydb); - std::map::iterator it2; - it2 = Group_Replication_Info_Map.find(writer_hostgroup); - Group_Replication_Info *info=NULL; - if (it2!=Group_Replication_Info_Map.end()) { - info=it2->second; - bool changed=false; - changed=info->update(backup_writer_hostgroup,reader_hostgroup,offline_hostgroup, max_writers, max_transactions_behind, (bool)active, writer_is_also_reader, r->fields[8]); - if (changed) { - //info->need_converge=true; - } - } else { - info=new Group_Replication_Info(writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup, max_writers, max_transactions_behind, (bool)active, writer_is_also_reader, r->fields[8]); - //info->need_converge=true; - Group_Replication_Info_Map.insert(Group_Replication_Info_Map.begin(), std::pair(writer_hostgroup,info)); - } - } - (*proxy_sqlite3_finalize)(statement); - delete incoming_group_replication_hostgroups; - incoming_group_replication_hostgroups=NULL; - - // remove missing ones - for (auto it3 = Group_Replication_Info_Map.begin(); it3 != Group_Replication_Info_Map.end(); ) { - Group_Replication_Info *info=it3->second; - if (info->__active==false) { - delete info; - it3 = Group_Replication_Info_Map.erase(it3); - } else { - it3++; - } - } - // TODO: it is now time to compute all the changes - - - // it is now time to build a new structure in Monitor - generate_mysql_group_replication_hostgroups_monitor_resultset(); - pthread_mutex_unlock(&Group_Replication_Info_mutex); -} - -void MySQL_HostGroups_Manager::generate_mysql_group_replication_hostgroups_monitor_resultset() { - pthread_mutex_lock(&GloMyMon->group_replication_mutex); - { - char *error=NULL; - int cols=0; - int affected_rows=0; - SQLite3_result *resultset=NULL; - char *query=(char *)"SELECT writer_hostgroup, hostname, port, MAX(use_ssl) use_ssl , writer_is_also_reader , max_transactions_behind FROM " - " mysql_servers JOIN mysql_group_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=backup_writer_hostgroup OR " - " hostgroup_id=reader_hostgroup OR hostgroup_id=offline_hostgroup WHERE active=1 GROUP BY hostname, port"; - mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset); - if (resultset) { - if (GloMyMon->Group_Replication_Hosts_resultset) { - delete GloMyMon->Group_Replication_Hosts_resultset; - } - GloMyMon->Group_Replication_Hosts_resultset=resultset; - } - } - pthread_mutex_unlock(&GloMyMon->group_replication_mutex); -} - -void MySQL_HostGroups_Manager::generate_mysql_galera_hostgroups_table() { - if (incoming_galera_hostgroups==NULL) { - return; - } - int rc; - sqlite3_stmt *statement=NULL; - //sqlite3 *mydb3=mydb->get_db(); - char *query=(char *)"INSERT INTO mysql_galera_hostgroups(writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup,active,max_writers,writer_is_also_reader,max_transactions_behind,comment) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)"; - //rc=(*proxy_sqlite3_prepare_v2)(mydb3, query, -1, &statement, 0); - rc = mydb->prepare_v2(query, &statement); - ASSERT_SQLITE_OK(rc, mydb); - proxy_info("New mysql_galera_hostgroups table\n"); - pthread_mutex_lock(&Galera_Info_mutex); - for (std::map::iterator it1 = Galera_Info_Map.begin() ; it1 != Galera_Info_Map.end(); ++it1) { - Galera_Info *info=NULL; - info=it1->second; - info->__active=false; - } - for (std::vector::iterator it = incoming_galera_hostgroups->rows.begin() ; it != incoming_galera_hostgroups->rows.end(); ++it) { - SQLite3_row *r=*it; - int writer_hostgroup=atoi(r->fields[0]); - int backup_writer_hostgroup=atoi(r->fields[1]); - int reader_hostgroup=atoi(r->fields[2]); - int offline_hostgroup=atoi(r->fields[3]); - int active=atoi(r->fields[4]); - int max_writers=atoi(r->fields[5]); - int writer_is_also_reader=atoi(r->fields[6]); - int max_transactions_behind=atoi(r->fields[7]); - proxy_info("Loading Galera info for (%d,%d,%d,%d,%s,%d,%d,%d,\"%s\")\n", writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup,(active ? "on" : "off"),max_writers,writer_is_also_reader,max_transactions_behind,r->fields[8]); - rc=(*proxy_sqlite3_bind_int64)(statement, 1, writer_hostgroup); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 2, backup_writer_hostgroup); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 3, reader_hostgroup); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 4, offline_hostgroup); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 5, active); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 6, max_writers); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 7, writer_is_also_reader); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 8, max_transactions_behind); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_text)(statement, 9, r->fields[8], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb); - - SAFE_SQLITE3_STEP2(statement); - rc=(*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mydb); - std::map::iterator it2; - it2 = Galera_Info_Map.find(writer_hostgroup); - Galera_Info *info=NULL; - if (it2!=Galera_Info_Map.end()) { - info=it2->second; - bool changed=false; - changed=info->update(backup_writer_hostgroup,reader_hostgroup,offline_hostgroup, max_writers, max_transactions_behind, (bool)active, writer_is_also_reader, r->fields[8]); - if (changed) { - //info->need_converge=true; - } - } else { - info=new Galera_Info(writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup, max_writers, max_transactions_behind, (bool)active, writer_is_also_reader, r->fields[8]); - //info->need_converge=true; - Galera_Info_Map.insert(Galera_Info_Map.begin(), std::pair(writer_hostgroup,info)); - } - } - (*proxy_sqlite3_finalize)(statement); - delete incoming_galera_hostgroups; - incoming_galera_hostgroups=NULL; - - // remove missing ones - for (auto it3 = Galera_Info_Map.begin(); it3 != Galera_Info_Map.end(); ) { - Galera_Info *info=it3->second; - if (info->__active==false) { - delete info; - it3 = Galera_Info_Map.erase(it3); - } else { - it3++; - } - } - // TODO: it is now time to compute all the changes - - - // it is now time to build a new structure in Monitor - pthread_mutex_lock(&GloMyMon->galera_mutex); - { - char *error=NULL; - int cols=0; - int affected_rows=0; - SQLite3_result *resultset=NULL; - char *query=(char *)"SELECT writer_hostgroup, hostname, port, MAX(use_ssl) use_ssl , writer_is_also_reader , max_transactions_behind " - " FROM mysql_servers JOIN mysql_galera_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=backup_writer_hostgroup OR " - " hostgroup_id=reader_hostgroup OR hostgroup_id=offline_hostgroup WHERE active=1 GROUP BY writer_hostgroup, hostname, port"; - mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset); - if (resultset) { - if (GloMyMon->Galera_Hosts_resultset) { - delete GloMyMon->Galera_Hosts_resultset; - } - GloMyMon->Galera_Hosts_resultset=resultset; - } - } - pthread_mutex_unlock(&GloMyMon->galera_mutex); - - pthread_mutex_unlock(&Galera_Info_mutex); -} - void MySQL_HostGroups_Manager::update_table_mysql_servers_for_monitor(bool lock) { if (lock) { wrlock(); @@ -2758,127 +2559,6 @@ void MySQL_HostGroups_Manager::replication_lag_action(const std::listmysrvs->cnt(); j++) { - MySrvC *mysrvc=(MySrvC *)myhgc->mysrvs->servers->index(j); - proxy_debug( - PROXY_DEBUG_MONITOR, 6, "Server 'MySrvC' - address: %s, port: %d, status: %d\n", mysrvc->address, - mysrvc->port, (int)mysrvc->get_status() - ); - - if (strcmp(mysrvc->address,address)==0 && mysrvc->port==port) { - - if (enable == true) { - if (mysrvc->get_status() == MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG || mysrvc->get_status() == MYSQL_SERVER_STATUS_SHUNNED) { - mysrvc->set_status(MYSQL_SERVER_STATUS_ONLINE); - proxy_info("Re-enabling server %u:%s:%d from replication lag\n", myhgc->hid, address, port); - } - } else { - if (mysrvc->get_status()==MYSQL_SERVER_STATUS_ONLINE) { - proxy_warning("Shunning 'soft' server %u:%s:%d with replication lag, count number: %d\n", myhgc->hid, address, port, lag_count); - mysrvc->set_status(MYSQL_SERVER_STATUS_SHUNNED); - } else { - if (mysrvc->get_status()==MYSQL_SERVER_STATUS_SHUNNED) { - if (lag_count >= ( mysql_thread___monitor_groupreplication_max_transactions_behind_count * 2 )) { - proxy_warning("Shunning 'hard' server %u:%s:%d with replication lag, count number: %d\n", myhgc->hid, address, port, lag_count); - mysrvc->set_status(MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG); - } - } - } - } - } - } -} - -void MySQL_HostGroups_Manager::group_replication_lag_action( - int _hid, char *address, unsigned int port, int lag_counts, bool read_only, bool enable -) { - GloAdmin->mysql_servers_wrlock(); - wrlock(); - - int reader_hostgroup = 0; - // bool writer_is_also_reader = false; - - // Get the reader_hostgroup for the supplied writter hostgroup - const std::string t_reader_hostgroup_query { - "SELECT reader_hostgroup,writer_is_also_reader FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d" - }; - std::string reader_hostgroup_query {}; - string_format(t_reader_hostgroup_query, reader_hostgroup_query, _hid); - - int cols=0; - char *error=NULL; - int affected_rows=0; - SQLite3_result* rhid_res=NULL; - SQLite3_row* rhid_row=nullptr; - - mydb->execute_statement( - reader_hostgroup_query.c_str(), &error , &cols , &affected_rows , &rhid_res - ); - - // If the server isn't present in the supplied hostgroup, there is nothing to do. - if (rhid_res->rows.empty() || rhid_res->rows[0]->get_size() == 0) { - goto __exit_replication_lag_action; - } - - rhid_row = rhid_res->rows[0]; - reader_hostgroup = atoi(rhid_row->fields[0]); - - { - MyHGC* myhgc = nullptr; - - if ( - mysql_thread___monitor_groupreplication_max_transactions_behind_for_read_only == 0 || - mysql_thread___monitor_groupreplication_max_transactions_behind_for_read_only == 2 || - enable - ) { - if (read_only == false) { - myhgc = MyHGM->MyHGC_find(_hid); - group_replication_lag_action_set_server_status(myhgc, address, port, lag_counts, enable); - } - } - - if ( - mysql_thread___monitor_groupreplication_max_transactions_behind_for_read_only == 1 || - mysql_thread___monitor_groupreplication_max_transactions_behind_for_read_only == 2 || - enable - ) { - myhgc = MyHGM->MyHGC_find(reader_hostgroup); - group_replication_lag_action_set_server_status(myhgc, address, port, lag_counts, enable); - } - } - - if (rhid_res != nullptr) { - delete rhid_res; - rhid_res = nullptr; - } - -__exit_replication_lag_action: - - wrunlock(); - GloAdmin->mysql_servers_wrunlock(); -} - void MySQL_HostGroups_Manager::drop_all_idle_connections() { // NOTE: the caller should hold wrlock int i, j; @@ -4326,1484 +4006,51 @@ bool Group_Replication_Info::update(int b, int r, int o, int mw, int mtb, bool _ return ret; } -void MySQL_HostGroups_Manager::update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *_error) { - int cols=0; - int affected_rows=0; - SQLite3_result *resultset=NULL; - char *query=NULL; - char *q=NULL; - char *error=NULL; - q=(char *)"SELECT hostgroup_id FROM mysql_servers JOIN mysql_group_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE hostname='%s' AND port=%d AND status<>3"; - query=(char *)malloc(strlen(q)+strlen(_hostname)+32); - sprintf(query,q,_hostname,_port); - mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset); - if (error) { - free(error); - error=NULL; - } - free(query); - if (resultset) { // we lock only if needed - if (resultset->rows_count) { - proxy_warning("Group Replication: setting host %s:%d offline because: %s\n", _hostname, _port, _error); - GloAdmin->mysql_servers_wrlock(); - mydb->execute("DELETE FROM mysql_servers_incoming"); - mydb->execute("INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers"); - // NOTE: Only updated the servers that have belong to the same cluster. - q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d) WHERE hostname='%s' AND port=%d AND hostgroup_id IN (" - " SELECT %d UNION ALL" - " SELECT backup_writer_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d UNION ALL" - " SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d" - ")"; - query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,_writer_hostgroup,_hostname,_port,_writer_hostgroup,_writer_hostgroup,_writer_hostgroup); - mydb->execute(query); - // NOTE: Only delete the servers that have belong to the same cluster. - q=(char*)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id IN (" - " SELECT %d UNION ALL" - " SELECT backup_writer_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d UNION ALL" - " SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d" - ")"; - sprintf(query,q,_hostname,_port,_writer_hostgroup,_writer_hostgroup,_writer_hostgroup); - mydb->execute(query); - //free(query); - // q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s' AND port=%d AND hostgroup_id=(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)"; - // sprintf(query,q,_hostname,_port,_writer_hostgroup); - q=(char *)"UPDATE mysql_servers_incoming SET status=(CASE " - " (SELECT status FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND" - " hostgroup_id=(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)) WHEN 2 THEN 2 ELSE 0 END)" - " WHERE hostname='%s' AND port=%d AND hostgroup_id=(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)"; - sprintf(query,q,_hostname,_port,_writer_hostgroup,_hostname,_port,_writer_hostgroup); - mydb->execute(query); - //free(query); - converge_group_replication_config(_writer_hostgroup); - commit(); - wrlock(); - SQLite3_result *resultset2=NULL; - q=(char *)"SELECT writer_hostgroup, backup_writer_hostgroup, reader_hostgroup, offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,_writer_hostgroup); - mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset2); - if (resultset2) { - if (resultset2->rows_count) { - for (std::vector::iterator it = resultset2->rows.begin() ; it != resultset2->rows.end(); ++it) { - SQLite3_row *r=*it; - int writer_hostgroup=atoi(r->fields[0]); - int backup_writer_hostgroup=atoi(r->fields[1]); - int reader_hostgroup=atoi(r->fields[2]); - int offline_hostgroup=atoi(r->fields[3]); - q=(char *)"DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d , %d , %d)"; - sprintf(query,q,_writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup); - mydb->execute(query); - generate_mysql_servers_table(&writer_hostgroup); - generate_mysql_servers_table(&backup_writer_hostgroup); - generate_mysql_servers_table(&reader_hostgroup); - generate_mysql_servers_table(&offline_hostgroup); - } - } - delete resultset2; - resultset2=NULL; - } - wrunlock(); - GloAdmin->mysql_servers_wrunlock(); - free(query); - } - } - if (resultset) { - delete resultset; - resultset=NULL; - } -} - -/** - * @brief Set the server specified by the supplied 'hostname:port' and hostgroup as 'reader'. - * @details Tries to find the server in any hostgroup other than in the desired on, in case of not - * finding it: - * 1. Move the server to the reader hostgroup, preserving status if required. - * 2. Deletes the server from other hostgroups in the cluster (determined by 'writer_hostgroup'). - * 3. Converge the current configuration and rebuild the hostgroups. - * - * PRESERVE-OFFLINE_SOFT: When moving the target server, always preserve the OFFLINE_SOFT state. - * - * @param _hostname Hostname of the target server to be set as writer. - * @param _port Port of the target server to be set as writer. - * @param _writer_hostgroup 'writer_hostgroup' of the cluster in which server is going to be placed as writer. - * @param _error Reason why the server has beens et as 'read_only'. - */ -void MySQL_HostGroups_Manager::update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *_error) { - int cols=0; - int affected_rows=0; - SQLite3_result *resultset=NULL; - char *query=NULL; - char *q=NULL; - char *error=NULL; - q=(char *)"SELECT hostgroup_id FROM mysql_servers JOIN mysql_group_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=offline_hostgroup WHERE hostname='%s' AND port=%d AND status<>3"; - query=(char *)malloc(strlen(q)+strlen(_hostname)+32); - sprintf(query,q,_hostname,_port); - mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset); - if (error) { - free(error); - error=NULL; - } - free(query); - if (resultset) { // we lock only if needed - if (resultset->rows_count) { - proxy_warning("Group Replication: setting host %s:%d (part of cluster with writer_hostgroup=%d) in read_only because: %s\n", _hostname, _port, _writer_hostgroup, _error); - GloAdmin->mysql_servers_wrlock(); - mydb->execute("DELETE FROM mysql_servers_incoming"); - mydb->execute("INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers"); - // NOTE: Only updated the servers that have belong to the same cluster. - q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d) WHERE hostname='%s' AND port=%d AND hostgroup_id IN (" - " SELECT %d UNION ALL" - " SELECT backup_writer_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d UNION ALL" - " SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d" - ")"; - query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,_writer_hostgroup,_hostname,_port,_writer_hostgroup,_writer_hostgroup,_writer_hostgroup); - mydb->execute(query); - // NOTE: Only delete the servers that have belong to the same cluster. - q=(char*)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id IN (" - " SELECT %d UNION ALL" - " SELECT backup_writer_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d UNION ALL" - " SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d" - ")"; - sprintf(query,q,_hostname,_port,_writer_hostgroup,_writer_hostgroup,_writer_hostgroup); - mydb->execute(query); - //free(query); - // NOTE: In case of the server being 'OFFLINE_SOFT' we preserve this status. Otherwise we set the server as 'ONLINE'. - q=(char *)"UPDATE mysql_servers_incoming SET status=(CASE " - " (SELECT status FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND" - " hostgroup_id=(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)) WHEN 2 THEN 2 ELSE 0 END)" - " WHERE hostname='%s' AND port=%d AND hostgroup_id=(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)"; - sprintf(query,q,_hostname,_port,_writer_hostgroup,_hostname,_port,_writer_hostgroup); - mydb->execute(query); - //free(query); - converge_group_replication_config(_writer_hostgroup); - commit(); - wrlock(); - SQLite3_result *resultset2=NULL; - q=(char *)"SELECT writer_hostgroup, backup_writer_hostgroup, reader_hostgroup, offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,_writer_hostgroup); - mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset2); - if (resultset2) { - if (resultset2->rows_count) { - for (std::vector::iterator it = resultset2->rows.begin() ; it != resultset2->rows.end(); ++it) { - SQLite3_row *r=*it; - int writer_hostgroup=atoi(r->fields[0]); - int backup_writer_hostgroup=atoi(r->fields[1]); - int reader_hostgroup=atoi(r->fields[2]); - int offline_hostgroup=atoi(r->fields[3]); - q=(char *)"DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d , %d , %d)"; - sprintf(query,q,writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup); - mydb->execute(query); - generate_mysql_servers_table(&writer_hostgroup); - generate_mysql_servers_table(&backup_writer_hostgroup); - generate_mysql_servers_table(&reader_hostgroup); - generate_mysql_servers_table(&offline_hostgroup); - } - } - delete resultset2; - resultset2=NULL; - } - wrunlock(); - GloAdmin->mysql_servers_wrunlock(); - free(query); +class MySQL_Errors_stats { + public: + int hostgroup; + char *hostname; + int port; + char *username; + char *client_address; + char *schemaname; + int err_no; + char *last_error; + time_t first_seen; + time_t last_seen; + unsigned long long count_star; + MySQL_Errors_stats(int hostgroup_, char *hostname_, int port_, char *username_, char *address_, char *schemaname_, int err_no_, char *last_error_, time_t tn) { + hostgroup = hostgroup_; + if (hostname_) { + hostname = strdup(hostname_); + } else { + hostname = strdup((char *)""); } - } - if (resultset) { - delete resultset; - resultset=NULL; - } -} - -/** - * @brief Set the server specified by the supplied 'hostname:port' and hostgroup as 'writer'. - * @details Tries to find the server as an already present 'writer' in the desired hostgroup, in case of not - * finding it: - * 1. Move the server to target writer, preserving status if required. - * 2. Deletes the server from other hostgroups in the cluster (determined by 'writer_hostgroup'). - * 3. If 'writer_is_also_reader', place the server also in the 'reader_hostgroup'. - * 4. Converge the current configuration and rebuild the hostgroups. - * - * If the server is already found, no action should be taken, and no reconfiguration triggered. - * - * FOUND_AS_SHUNNED: If writer is found as SHUNNED is considered as a found writer, since we don't take - * reconfiguration actions based on SHUNNED state. - * - * FOUND_AS_BACKUP_WRITER: If server is found in the 'backup_writer_hostgroup' is because the server has been - * previously considered as a writer, and due to an exceeding number of writers, the server - * ended in the 'backup_writer_hostgroup'. If the server is to be removed from this hostgroup - * and placed in other one, is something that should be done when converging to the final state - * after other actions 'set_offline|set_read_only' (via 'converge_group_replication_config'), - * otherwise, we will continously trying to place the already WRITER server as WRITER and - * constantly retriggering a servers reconfiguration everytime the available number of writers - * exceeds 'max_writers'. - * - * PRESERVE-OFFLINE_SOFT: When server is not found as writer, but is found as 'OFFLINE_SOFT' this state is - * present when setting this server in the 'writer_hostgroup'. - * - * @param _hostname Hostname of the target server to be set as writer. - * @param _port Port of the target server to be set as writer. - * @param _writer_hostgroup 'writer_hostgroup' of the cluster in which server is going to be placed as writer. - */ -void MySQL_HostGroups_Manager::update_group_replication_set_writer(char *_hostname, int _port, int _writer_hostgroup) { - int cols=0; - int affected_rows=0; - SQLite3_result *resultset=NULL; - char *query=NULL; - char *q=NULL; - char *error=NULL; - q=(char *)"SELECT hostgroup_id, status FROM mysql_servers JOIN mysql_group_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=offline_hostgroup WHERE hostname='%s' AND port=%d AND status<>3"; - query=(char *)malloc(strlen(q)+strlen(_hostname)+32); - sprintf(query,q,_hostname,_port); - mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset); - if (error) { - free(error); - error=NULL; - } - free(query); - - int writer_is_also_reader=0; - bool found_writer=false; - bool found_reader=false; - int read_HG=-1; - int offline_HG=-1; - int backup_writer_HG=-1; - bool need_converge=false; - int status=0; - bool offline_soft_found=false; - - if (resultset) { - // let's get info about this cluster - pthread_mutex_lock(&Group_Replication_Info_mutex); - std::map::iterator it2; - it2 = Group_Replication_Info_Map.find(_writer_hostgroup); - Group_Replication_Info *info=NULL; - if (it2!=Group_Replication_Info_Map.end()) { - info=it2->second; - writer_is_also_reader=info->writer_is_also_reader; - read_HG=info->reader_hostgroup; - offline_HG=info->offline_hostgroup; - backup_writer_HG=info->backup_writer_hostgroup; - need_converge=info->need_converge; - info->need_converge=false; + port = port_; + if (username_) { + username = strdup(username_); + } else { + username = strdup((char *)""); } - pthread_mutex_unlock(&Group_Replication_Info_mutex); - - if (resultset->rows_count) { - for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { - SQLite3_row *r=*it; - int hostgroup=atoi(r->fields[0]); - offline_soft_found = atoi(r->fields[1]) == 2 ? true : false; - - if (hostgroup==_writer_hostgroup) { - status = atoi(r->fields[1]); - if (status == 0 || status == 2) { - found_writer=true; - } - // NOTE: 'SHUNNED' state is possible if server has reached 'myhgm.mysql_servers' table. - // This can occur due to a reconfiguration that triggered a call to 'generate_mysql_servers_table' - // (e.g. selecting from 'runtime_mysql_servers' would have this effect). In this case, we - // don't want to trigger any reconfiguration as the server is indeed found and we don't perform - // reconfigurations based on 'SHUNNED' state. - if (status == 1) { - found_writer=true; - } - } - if (read_HG>=0) { - if (hostgroup==read_HG) { - found_reader=true; - } - } - // NOTE: See 'FOUND_AS_BACKUP_WRITER' on function documentation. - if (hostgroup == backup_writer_HG) { - found_writer = true; - } - } + if (address_) { + client_address = strdup(address_); + } else { + client_address = strdup((char *)""); } - - // NOTE: In case of a writer not being found but a 'OFFLINE_SOFT' status - // is found in a hostgroup, 'OFFLINE_SOFT' status should be preserved. - if (found_writer == false) { - if (offline_soft_found) { - status = 2; - } + if (schemaname_) { + schemaname = strdup(schemaname_); + } else { + schemaname = strdup((char *)""); } - if (need_converge==false) { - if (found_writer) { // maybe no-op - if ( - (writer_is_also_reader==0 && found_reader==false) - || - (writer_is_also_reader > 0 && found_reader==true) - ) { // either both true or both false - delete resultset; - resultset=NULL; - } - } + err_no = err_no_; + if (last_error_) { + last_error = strdup(last_error_); + } else { + last_error = strdup((char *)""); } - } - - if (resultset) { // if we reach there, there is some action to perform - if (resultset->rows_count) { - need_converge=false; - proxy_warning("Group Replication: setting host %s:%d as writer\n", _hostname, _port); - - GloAdmin->mysql_servers_wrlock(); - mydb->execute("DELETE FROM mysql_servers_incoming"); - mydb->execute("INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers"); - // NOTE: Only updated the servers that have belong to the same cluster. - q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=%d WHERE hostname='%s' AND port=%d AND hostgroup_id IN (%d, %d, %d)"; - query=(char *)malloc(strlen(q)+strlen(_hostname)+256); - sprintf(query,q,_writer_hostgroup,_hostname,_port,backup_writer_HG,read_HG,offline_HG); - mydb->execute(query); - // NOTE: Only delete the servers that have belong to the same cluster. - q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id IN (%d, %d, %d)"; - sprintf(query,q,_hostname,_port,backup_writer_HG,read_HG,offline_HG); - mydb->execute(query); - q=(char *)"UPDATE mysql_servers_incoming SET status=%d WHERE hostname='%s' AND port=%d AND hostgroup_id=%d"; - // NOTE: In case of the server being 'OFFLINE_SOFT' we preserve this status. Otherwise - // we set the server as 'ONLINE'. - sprintf(query, q, (status == 2 ? 2 : 0 ), _hostname, _port, _writer_hostgroup); - mydb->execute(query); - //free(query); - if (writer_is_also_reader && read_HG>=0) { - q=(char *)"INSERT OR IGNORE INTO mysql_servers_incoming (hostgroup_id,hostname,port,gtid_port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment) SELECT %d,hostname,port,gtid_port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment FROM mysql_servers_incoming WHERE hostgroup_id=%d AND hostname='%s' AND port=%d"; - free(query); - query=(char *)malloc(strlen(q)+strlen(_hostname)+256); - sprintf(query,q,read_HG,_writer_hostgroup,_hostname,_port); - mydb->execute(query); - } - converge_group_replication_config(_writer_hostgroup); - commit(); - wrlock(); - SQLite3_result *resultset2=NULL; - q=(char *)"SELECT writer_hostgroup, backup_writer_hostgroup, reader_hostgroup, offline_hostgroup, max_writers, writer_is_also_reader FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,_writer_hostgroup); - mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset2); - if (resultset2) { - if (resultset2->rows_count) { - for (std::vector::iterator it = resultset2->rows.begin() ; it != resultset2->rows.end(); ++it) { - SQLite3_row *r=*it; - int writer_hostgroup=atoi(r->fields[0]); - int backup_writer_hostgroup=atoi(r->fields[1]); - int reader_hostgroup=atoi(r->fields[2]); - int offline_hostgroup=atoi(r->fields[3]); -// int max_writers=atoi(r->fields[4]); -// int int_writer_is_also_reader=atoi(r->fields[5]); - q=(char *)"DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d , %d , %d)"; - sprintf(query,q,_writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup); - mydb->execute(query); - generate_mysql_servers_table(&writer_hostgroup); - generate_mysql_servers_table(&backup_writer_hostgroup); - generate_mysql_servers_table(&reader_hostgroup); - generate_mysql_servers_table(&offline_hostgroup); - } - } - delete resultset2; - resultset2=NULL; - } - wrunlock(); - GloAdmin->mysql_servers_wrunlock(); - free(query); - } - } - if (resultset) { - delete resultset; - resultset=NULL; - } -} - -/** - * @brief Completes the tuning of 'mysql_servers_incoming', dealing with final writers placing. - * @details This functions assumes this pre-conditions: - * - GloAdmin->mysql_servers_wrlock() was already called. - * - mysql_servers_incoming has already entries copied from mysql_servers and ready to be loaded. - * - * Checks that the following conditions are met over the supplied hostgroup: - * 1. Number of placed writers exceeds the max number of writers. - * 2. Not enough writers are placed in the hostgroup, place 'backup_writers' if any. - * 3. In case 'writer_is_also_reader' is '2', place *ONLY* 'backup_writers' as readers. - * - * NOTE: Right now we consider 'SHUNNED' and 'ONLINE' writers equivalent in terms of server placement. This - * means that when counting writers for either placement or removal from 'backup_writer_hostgroup', we - * require taking into account 'SHUNNED' and 'ONLINE' writers. - * - * @param _writer_hostgroup Target hostgroup on which perform the final server placement. - */ -void MySQL_HostGroups_Manager::converge_group_replication_config(int _writer_hostgroup) { - - // we first gather info about the cluster - pthread_mutex_lock(&Group_Replication_Info_mutex); - std::map::iterator it2; - it2 = Group_Replication_Info_Map.find(_writer_hostgroup); - Group_Replication_Info *info=NULL; - if (it2!=Group_Replication_Info_Map.end()) { - info=it2->second; - int cols=0; - int affected_rows=0; - SQLite3_result *resultset=NULL; - char *query=NULL; - char *q=NULL; - char *error=NULL; - // We are required to consider both 'ONLINE' and 'SHUNNED' servers for 'backup_writer_hostgroup' - // placement since they are equivalent for server placement. Check 'NOTE' at function @details. - q=(char *)"SELECT hostgroup_id,hostname,port FROM mysql_servers_incoming WHERE status=0 OR status=1 AND hostgroup_id IN (%d, %d, %d, %d) ORDER BY weight DESC, hostname DESC, port DESC"; - query=(char *)malloc(strlen(q)+256); - sprintf(query, q, info->writer_hostgroup, info->backup_writer_hostgroup, info->reader_hostgroup, info->offline_hostgroup); - mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset); - free(query); - if (resultset) { - if (resultset->rows_count) { - int num_writers=0; - int num_backup_writers=0; - for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { - SQLite3_row *r=*it; - int hostgroup=atoi(r->fields[0]); - if (hostgroup==info->writer_hostgroup) { - num_writers++; - } else { - if (hostgroup==info->backup_writer_hostgroup) { - num_backup_writers++; - } - } - } - if (num_writers > info->max_writers) { // there are more writers than allowed - int to_move=num_writers-info->max_writers; - if (GloMTH->variables.hostgroup_manager_verbose > 1) { - proxy_info("Group replication: max_writers=%d , moving %d nodes from writer HG %d to backup HG %d\n", info->max_writers, to_move, info->writer_hostgroup, info->backup_writer_hostgroup); - } - for (std::vector::reverse_iterator it = resultset->rows.rbegin() ; it != resultset->rows.rend(); ++it) { - SQLite3_row *r=*it; - if (to_move) { - int hostgroup=atoi(r->fields[0]); - if (hostgroup==info->writer_hostgroup) { - q=(char *)"UPDATE OR REPLACE mysql_servers_incoming SET status=0, hostgroup_id=%d WHERE hostgroup_id=%d AND hostname='%s' AND port=%d"; - query=(char *)malloc(strlen(q)+strlen(r->fields[1])+128); - sprintf(query,q,info->backup_writer_hostgroup,info->writer_hostgroup,r->fields[1],atoi(r->fields[2])); - mydb->execute(query); - free(query); - to_move--; - } - } - } - } else { - if (num_writers < info->max_writers && num_backup_writers) { // or way too low writer - int to_move= ( (info->max_writers - num_writers) < num_backup_writers ? (info->max_writers - num_writers) : num_backup_writers); - if (GloMTH->variables.hostgroup_manager_verbose) { - proxy_info("Group replication: max_writers=%d , moving %d nodes from backup HG %d to writer HG %d\n", info->max_writers, to_move, info->backup_writer_hostgroup, info->writer_hostgroup); - } - for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { - SQLite3_row *r=*it; - if (to_move) { - int hostgroup=atoi(r->fields[0]); - if (hostgroup==info->backup_writer_hostgroup) { - q=(char *)"UPDATE OR REPLACE mysql_servers_incoming SET status=0, hostgroup_id=%d WHERE hostgroup_id=%d AND hostname='%s' AND port=%d"; - query=(char *)malloc(strlen(q)+strlen(r->fields[1])+128); - sprintf(query,q,info->writer_hostgroup,info->backup_writer_hostgroup,r->fields[1],atoi(r->fields[2])); - mydb->execute(query); - free(query); - to_move--; - } - } - } - } - } - } - } - if (resultset) { - delete resultset; - resultset=NULL; - } - if (info->writer_is_also_reader==2) { - q=(char *)"SELECT hostgroup_id,hostname,port FROM mysql_servers_incoming WHERE status=0 AND hostgroup_id IN (%d, %d, %d, %d) ORDER BY weight DESC, hostname DESC, port DESC"; - query=(char *)malloc(strlen(q)+256); - sprintf(query, q, info->writer_hostgroup, info->backup_writer_hostgroup, info->reader_hostgroup, info->offline_hostgroup); - mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset); - free(query); - if (resultset) { - if (resultset->rows_count) { - int num_writers=0; - int num_backup_writers=0; - for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { - SQLite3_row *r=*it; - int hostgroup=atoi(r->fields[0]); - if (hostgroup==info->writer_hostgroup) { - num_writers++; - } else { - if (hostgroup==info->backup_writer_hostgroup) { - num_backup_writers++; - } - } - } - if (num_backup_writers) { // there are backup writers, only these will be used as readers - q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostgroup_id=%d"; - query=(char *)malloc(strlen(q) + 128); - sprintf(query,q, info->reader_hostgroup); - mydb->execute(query); - free(query); - q=(char *)"INSERT OR IGNORE INTO mysql_servers_incoming (hostgroup_id,hostname,port,gtid_port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment) SELECT %d,hostname,port,gtid_port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment FROM mysql_servers_incoming WHERE hostgroup_id=%d"; - query=(char *)malloc(strlen(q) + 128); - sprintf(query,q, info->reader_hostgroup, info->backup_writer_hostgroup); - mydb->execute(query); - free(query); - } - } - delete resultset; - resultset=NULL; - } - } - } else { - // we couldn't find the cluster, exits - } - pthread_mutex_unlock(&Group_Replication_Info_mutex); -} - -void MySQL_HostGroups_Manager::update_group_replication_add_autodiscovered( - const string& _host, int _port, int _wr_hg -) { - pthread_mutex_lock(&Group_Replication_Info_mutex); - const auto gr_info_map_it = this->Group_Replication_Info_Map.find(_wr_hg); - int32_t reader_hg = -1; - - if (gr_info_map_it == Group_Replication_Info_Map.end()) { - assert(0); - } else { - reader_hg = gr_info_map_it->second->reader_hostgroup; - } - pthread_mutex_unlock(&Group_Replication_Info_mutex); - - wrlock(); - - MyHGC *myhgc = MyHGC_lookup(reader_hg); - bool srv_found = false; - bool srv_found_offline = false; - - for (uint32_t j = 0; j < myhgc->mysrvs->cnt(); j++) { - MySrvC* mysrvc = static_cast(myhgc->mysrvs->servers->index(j)); - - // If the server is found as 'OFFLINE_HARD' we reset the 'MySrvC' values corresponding with the - // 'servers_defaults' (as in a new 'MySrvC' creation). We then later update these values with the - // 'servers_defaults' attributes from its corresponding 'MyHGC'. This way we ensure uniform behavior - // of new servers, and 'OFFLINE_HARD' ones when a user update 'servers_defaults' values, and reloads - // the servers to runtime. - if (strcmp(mysrvc->address,_host.c_str())==0 && mysrvc->port==_port) { - srv_found = true; - if (mysrvc->get_status() == MYSQL_SERVER_STATUS_OFFLINE_HARD) { - reset_hg_attrs_server_defaults(mysrvc); - update_hg_attrs_server_defaults(mysrvc, mysrvc->myhgc); - proxy_info( - "Found healthy previously discovered GR node %s:%d as 'OFFLINE_HARD', setting back as 'ONLINE' with:" - " hostgroup=%d, weight=%ld, max_connections=%ld, use_ssl=%d\n", - _host.c_str(), _port, reader_hg, mysrvc->weight, mysrvc->max_connections, mysrvc->use_ssl - ); - mysrvc->set_status(MYSQL_SERVER_STATUS_ONLINE); - srv_found_offline = true; - } - } - } - - if (srv_found == false) { - MySrvC* mysrvc = new MySrvC( - const_cast(_host.c_str()), _port, 0, -1, MYSQL_SERVER_STATUS_ONLINE, 0, -1, 0, -1, 0, const_cast("") - ); - add(mysrvc, reader_hg); - proxy_info( - "Adding new discovered GR node %s:%d with: hostgroup=%d, weight=%ld, max_connections=%ld, use_ssl=%d\n", - _host.c_str(), _port, reader_hg, mysrvc->weight, mysrvc->max_connections, mysrvc->use_ssl - ); - } - - if (srv_found == false || srv_found_offline) { - purge_mysql_servers_table(); - - mydb->execute("DELETE FROM mysql_servers"); - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n"); - - generate_mysql_servers_table(); - - // Update the global checksums after 'mysql_servers' regeneration - { - unique_ptr resultset { get_admin_runtime_mysql_servers(mydb) }; - string mysrvs_checksum { get_checksum_from_hash(resultset ? resultset->raw_checksum() : 0) }; - save_runtime_mysql_servers(resultset.release()); - proxy_info("Checksum for table %s is %s\n", "mysql_servers", mysrvs_checksum.c_str()); - - pthread_mutex_lock(&GloVars.checksum_mutex); - update_glovars_mysql_servers_checksum(mysrvs_checksum); - pthread_mutex_unlock(&GloVars.checksum_mutex); - } - - update_table_mysql_servers_for_monitor(false); - generate_mysql_group_replication_hostgroups_monitor_resultset(); - } - - wrunlock(); -} - -Galera_Info::Galera_Info(int w, int b, int r, int o, int mw, int mtb, bool _a, int _w, char *c) { - comment=NULL; - if (c) { - comment=strdup(c); - } - writer_hostgroup=w; - backup_writer_hostgroup=b; - reader_hostgroup=r; - offline_hostgroup=o; - max_writers=mw; - max_transactions_behind=mtb; - active=_a; - writer_is_also_reader=_w; - current_num_writers=0; - current_num_backup_writers=0; - current_num_readers=0; - current_num_offline=0; - __active=true; - need_converge=true; -} - -Galera_Info::~Galera_Info() { - if (comment) { - free(comment); - comment=NULL; - } -} - -bool Galera_Info::update(int b, int r, int o, int mw, int mtb, bool _a, int _w, char *c) { - bool ret=false; - __active=true; - if (backup_writer_hostgroup!=b) { - backup_writer_hostgroup=b; - ret=true; - } - if (reader_hostgroup!=r) { - reader_hostgroup=r; - ret=true; - } - if (offline_hostgroup!=o) { - offline_hostgroup=o; - ret=true; - } - if (max_writers!=mw) { - max_writers=mw; - ret=true; - } - if (max_transactions_behind!=mtb) { - max_transactions_behind=mtb; - ret=true; - } - if (active!=_a) { - active=_a; - ret=true; - } - if (writer_is_also_reader!=_w) { - writer_is_also_reader=_w; - ret=true; - } - // for comment we don't change return value - if (comment) { - if (c) { - if (strcmp(comment,c)) { - free(comment); - comment=strdup(c); - } - } else { - free(comment); - comment=NULL; - } - } else { - if (c) { - comment=strdup(c); - } - } - return ret; -} - -static void print_galera_nodes_last_status() { - std::unique_ptr result { new SQLite3_result(13) }; - - result->add_column_definition(SQLITE_TEXT,"hostname"); - result->add_column_definition(SQLITE_TEXT,"port"); - result->add_column_definition(SQLITE_TEXT,"start_time"); - result->add_column_definition(SQLITE_TEXT,"check_time"); - result->add_column_definition(SQLITE_TEXT,"primary_partition"); - result->add_column_definition(SQLITE_TEXT,"read_only"); - result->add_column_definition(SQLITE_TEXT,"wsrep_local_recv_queue"); - result->add_column_definition(SQLITE_TEXT,"wsrep_local_state"); - result->add_column_definition(SQLITE_TEXT,"wsrep_desync"); - result->add_column_definition(SQLITE_TEXT,"wsrep_reject_queries"); - result->add_column_definition(SQLITE_TEXT,"wsrep_sst_donor_rejects_queries"); - result->add_column_definition(SQLITE_TEXT,"pxc_maint_mode"); - result->add_column_definition(SQLITE_TEXT,"error"); - - pthread_mutex_lock(&GloMyMon->galera_mutex); - - for (auto node_it = GloMyMon->Galera_Hosts_Map.begin(); node_it != GloMyMon->Galera_Hosts_Map.end(); node_it++) { - std::string s { node_it->first }; - std::size_t colon_pos { s.find_last_of(":") }; - std::string host { s.substr(0, colon_pos) }; - std::string port { s.substr(colon_pos + 1) }; - Galera_monitor_node* node { node_it->second }; - - if (node->last_entry()->start_time) { - std::string error { "" }; - - if (node->last_entry()->error) { - error = std::string { node->last_entry()->error }; - } - - result->add_row( - host.c_str(), - port.c_str(), - std::to_string(node->last_entry()->start_time).c_str(), - std::to_string(node->last_entry()->check_time).c_str(), - std::to_string(node->last_entry()->primary_partition).c_str(), - std::to_string(node->last_entry()->read_only).c_str(), - std::to_string(node->last_entry()->wsrep_local_recv_queue).c_str(), - std::to_string(node->last_entry()->wsrep_local_state).c_str(), - std::to_string(node->last_entry()->wsrep_desync).c_str(), - std::to_string(node->last_entry()->wsrep_reject_queries).c_str(), - std::to_string(node->last_entry()->wsrep_sst_donor_rejects_queries).c_str(), - std::to_string(node->last_entry()->pxc_maint_mode).c_str(), - error.c_str(), - NULL - ); - } - } - - pthread_mutex_unlock(&GloMyMon->galera_mutex); - - proxy_info("Galera: Node status changed by ProxySQL, dumping all galera nodes status:\n"); - result->dump_to_stderr(); -} - -void MySQL_HostGroups_Manager::update_galera_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *_error, bool soft) { - bool set_offline = false; - int cols=0; - int affected_rows=0; - SQLite3_result *resultset=NULL; - char *query=NULL; - char *q=NULL; - char *error=NULL; - q=(char *)"SELECT hostgroup_id FROM mysql_servers JOIN mysql_galera_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE hostname='%s' AND port=%d AND status=0"; - query=(char *)malloc(strlen(q)+strlen(_hostname)+1024); // increased this buffer as it is used for other queries too - sprintf(query,q,_hostname,_port); - mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset); - if (error) { -// free(error); - error=NULL; - } - //free(query); - GloAdmin->mysql_servers_wrlock(); - if (resultset) { // we lock only if needed - if (resultset->rows_count) { - // the server was found. It needs to be set offline - set_offline = true; - } else { // the server is already offline, but we check if needs to be taken back online because there are no other writers - SQLite3_result *numw_result = NULL; - // we search for writers - q=(char *)"SELECT 1 FROM mysql_servers WHERE hostgroup_id=%d AND status=0"; - //query=(char *)malloc(strlen(q) + (sizeof(_writer_hostgroup) * 8 + 1)); - sprintf(query,q,_writer_hostgroup); - mydb->execute_statement(query, &error , &cols , &affected_rows , &numw_result); - //free(query); - if (numw_result) { - if (numw_result->rows_count == 0) { // we have no writers - set_offline = true; - } - delete numw_result; - } - } - - auto info = get_galera_node_info(_writer_hostgroup); - if (set_offline && info) { - mydb->execute("DELETE FROM mysql_servers_incoming"); - mydb->execute("INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers"); - if (soft==false) { // default behavior - q=(char *)"UPDATE OR REPLACE mysql_servers_incoming SET hostgroup_id=%d, status=0 WHERE hostname='%s' AND port=%d AND hostgroup_id in (%d, %d, %d)"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+128); - sprintf(query,q,info->offline_hostgroup,_hostname,_port,_writer_hostgroup, info->backup_writer_hostgroup, info->reader_hostgroup); - mydb->execute(query); - //free(query); - q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id in (%d, %d, %d)"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,_hostname,_port,_writer_hostgroup, info->backup_writer_hostgroup, info->reader_hostgroup); - mydb->execute(query); - //free(query); - q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s' AND port=%d AND hostgroup_id in (%d, %d, %d)"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,_hostname,_port,_writer_hostgroup, info->backup_writer_hostgroup, info->reader_hostgroup); - mydb->execute(query); - //free(query); - } else { - q=(char *)"INSERT OR REPLACE INTO mysql_servers_incoming SELECT %d, hostname, port, gtid_port, weight, 0, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id in (%d, %d, %d)"; - sprintf(query,q,info->offline_hostgroup,_hostname,_port,_writer_hostgroup, info->backup_writer_hostgroup, info->reader_hostgroup); - mydb->execute(query); - // we just delete the servers from the 'backup_writer_hostgroup', to keep servers from reader hostgroup, - // so they can be 'SHUNNED'. See #3182 - q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id=%d"; - sprintf(query,q,_hostname,_port, info->backup_writer_hostgroup); - mydb->execute(query); - // we update the servers from 'mysql_servers_incoming' to be SHUNNED in both, 'writer_hostgroup' and 'reader_hostgroup' - // this way we prevent it's removal from the hostgroup, and the closing of its current connections. See #3182 - q=(char *)"UPDATE mysql_servers_incoming SET status=1 WHERE hostname='%s' AND port=%d AND hostgroup_id in (%d, %d)"; - sprintf(query,q,_hostname,_port,_writer_hostgroup,info->reader_hostgroup); - mydb->execute(query); - } - converge_galera_config(_writer_hostgroup); - uint64_t checksum_current = 0; - uint64_t checksum_incoming = 0; - { - int cols=0; - int affected_rows=0; - SQLite3_result *resultset_servers=NULL; - char *query_local=NULL; - char *q1 = NULL; - char *q2 = NULL; - char *error=NULL; - q1 = (char *)"SELECT DISTINCT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, mysql_servers.comment FROM mysql_servers JOIN mysql_galera_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE writer_hostgroup=%d ORDER BY hostgroup_id, hostname, port"; - q2 = (char *)"SELECT DISTINCT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, mysql_servers_incoming.comment FROM mysql_servers_incoming JOIN mysql_galera_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE writer_hostgroup=%d ORDER BY hostgroup_id, hostname, port"; - query_local = (char *)malloc(strlen(q2)+128); - sprintf(query_local,q1,_writer_hostgroup); - mydb->execute_statement(query_local, &error , &cols , &affected_rows , &resultset_servers); - if (error == NULL) { - if (resultset_servers) { - checksum_current = resultset_servers->raw_checksum(); - } - } - if (resultset_servers) { - delete resultset_servers; - resultset_servers = NULL; - } - sprintf(query_local,q2,_writer_hostgroup); - mydb->execute_statement(query_local, &error , &cols , &affected_rows , &resultset_servers); - if (error == NULL) { - if (resultset_servers) { - checksum_incoming = resultset_servers->raw_checksum(); - } - } - if (resultset_servers) { - delete resultset_servers; - resultset_servers = NULL; - } - free(query_local); - } - if (checksum_incoming!=checksum_current) { - proxy_warning("Galera: setting host %s:%d offline because: %s\n", _hostname, _port, _error); - print_galera_nodes_last_status(); - commit(); - wrlock(); - SQLite3_result *resultset2=NULL; - q=(char *)"SELECT writer_hostgroup, backup_writer_hostgroup, reader_hostgroup, offline_hostgroup FROM mysql_galera_hostgroups WHERE writer_hostgroup=%d"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,_writer_hostgroup); - mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset2); - if (resultset2) { - if (resultset2->rows_count) { - for (std::vector::iterator it = resultset2->rows.begin() ; it != resultset2->rows.end(); ++it) { - SQLite3_row *r=*it; - int writer_hostgroup=atoi(r->fields[0]); - int backup_writer_hostgroup=atoi(r->fields[1]); - int reader_hostgroup=atoi(r->fields[2]); - int offline_hostgroup=atoi(r->fields[3]); - q=(char *)"DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d , %d , %d)"; - sprintf(query,q,_writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup); - mydb->execute(query); - generate_mysql_servers_table(&writer_hostgroup); - generate_mysql_servers_table(&backup_writer_hostgroup); - generate_mysql_servers_table(&reader_hostgroup); - generate_mysql_servers_table(&offline_hostgroup); - } - } - delete resultset2; - resultset2=NULL; - } - wrunlock(); - } else { - proxy_warning("Galera: skipping setting offline node %s:%d from hostgroup %d because won't change the list of ONLINE nodes\n", _hostname, _port, _writer_hostgroup); - print_galera_nodes_last_status(); - } - } - } - free(query); - GloAdmin->mysql_servers_wrunlock(); - if (resultset) { - delete resultset; - resultset=NULL; - } -} - -void MySQL_HostGroups_Manager::update_galera_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *_error) { - int cols=0; - int affected_rows=0; - SQLite3_result *resultset=NULL; - char *query=NULL; - char *q=NULL; - char *error=NULL; - q=(char *)"SELECT hostgroup_id FROM mysql_servers JOIN mysql_galera_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=offline_hostgroup WHERE hostname='%s' AND port=%d"; - query=(char *)malloc(strlen(q)+strlen(_hostname)+32); - sprintf(query,q,_hostname,_port); - mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset); - if (error) { - free(error); - error=NULL; - } - free(query); - - auto info = get_galera_node_info(_writer_hostgroup); - if (resultset && info) { // we lock only if needed - if (resultset->rows_count) { - proxy_warning("Galera: setting host %s:%d (part of cluster with writer_hostgroup=%d) in read_only because: %s\n", _hostname, _port, _writer_hostgroup, _error); - print_galera_nodes_last_status(); - GloAdmin->mysql_servers_wrlock(); - mydb->execute("DELETE FROM mysql_servers_incoming"); - mydb->execute("INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers"); - q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=%d WHERE hostname='%s' AND port=%d AND hostgroup_id in (%d, %d, %d)"; - query=(char *)malloc(strlen(q)+strlen(_hostname)+512); - sprintf(query, q, info->reader_hostgroup, _hostname, _port, info->writer_hostgroup, info->backup_writer_hostgroup, info->offline_hostgroup); - mydb->execute(query); - //free(query); - q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id in (%d, %d, %d)"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,_hostname,_port, info->offline_hostgroup, info->backup_writer_hostgroup, info->writer_hostgroup, info->writer_hostgroup); - mydb->execute(query); - //free(query); - q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s' AND port=%d AND hostgroup_id=%d"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,_hostname,_port,info->reader_hostgroup); - mydb->execute(query); - //free(query); - converge_galera_config(_writer_hostgroup); - commit(); - wrlock(); - SQLite3_result *resultset2=NULL; - q=(char *)"SELECT writer_hostgroup, backup_writer_hostgroup, reader_hostgroup, offline_hostgroup FROM mysql_galera_hostgroups WHERE writer_hostgroup=%d"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,_writer_hostgroup); - mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset2); - if (resultset2) { - if (resultset2->rows_count) { - for (std::vector::iterator it = resultset2->rows.begin() ; it != resultset2->rows.end(); ++it) { - SQLite3_row *r=*it; - int writer_hostgroup=atoi(r->fields[0]); - int backup_writer_hostgroup=atoi(r->fields[1]); - int reader_hostgroup=atoi(r->fields[2]); - int offline_hostgroup=atoi(r->fields[3]); - q=(char *)"DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d , %d , %d)"; - sprintf(query,q,writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup); - mydb->execute(query); - generate_mysql_servers_table(&writer_hostgroup); - generate_mysql_servers_table(&backup_writer_hostgroup); - generate_mysql_servers_table(&reader_hostgroup); - generate_mysql_servers_table(&offline_hostgroup); - } - } - delete resultset2; - resultset2=NULL; - } - wrunlock(); - GloAdmin->mysql_servers_wrunlock(); - free(query); - } - } - if (resultset) { - delete resultset; - resultset=NULL; - } -} - -Galera_Info *MySQL_HostGroups_Manager::get_galera_node_info(int hostgroup) { - pthread_mutex_lock(&Galera_Info_mutex); - auto it2 = Galera_Info_Map.find(hostgroup); - Galera_Info *info = nullptr; - if (it2 != Galera_Info_Map.end()) { - info = it2->second; - } - pthread_mutex_unlock(&Galera_Info_mutex); - - return info; -} - -void MySQL_HostGroups_Manager::update_galera_set_writer(char *_hostname, int _port, int _writer_hostgroup) { - std::lock_guard lock(galera_set_writer_mutex); - int cols=0; - int affected_rows=0; - SQLite3_result *resultset=NULL; - char *query=NULL; - char *q=NULL; - char *error=NULL; - q=(char *)"SELECT hostgroup_id,status FROM mysql_servers JOIN mysql_galera_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=offline_hostgroup WHERE hostname='%s' AND port=%d"; - query=(char *)malloc(strlen(q)+strlen(_hostname)+32); - sprintf(query,q,_hostname,_port); - mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset); - if (error) { - free(error); - error=NULL; - } - free(query); - - int writer_is_also_reader=0; - bool found_writer=false; - bool found_reader=false; - int read_HG=-1; - bool need_converge=false; - int max_writers = 0; - Galera_Info *info=NULL; - - if (resultset) { - // let's get info about this cluster - pthread_mutex_lock(&Galera_Info_mutex); - std::map::iterator it2; - it2 = Galera_Info_Map.find(_writer_hostgroup); - - if (it2!=Galera_Info_Map.end()) { - info=it2->second; - writer_is_also_reader=info->writer_is_also_reader; - read_HG=info->reader_hostgroup; - need_converge=info->need_converge; - info->need_converge=false; - max_writers = info->max_writers; - } - pthread_mutex_unlock(&Galera_Info_mutex); - - if (resultset->rows_count) { - for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { - SQLite3_row *r=*it; - int hostgroup=atoi(r->fields[0]); - if (hostgroup==_writer_hostgroup) { - int status=atoi(r->fields[1]); - if (status==0) - found_writer=true; - } - if (read_HG>=0) { - if (hostgroup==read_HG) { - found_reader=true; - } - } - } - } - - if (need_converge == false) { - SQLite3_result *resultset2=NULL; - q = (char *)"SELECT COUNT(*) FROM mysql_servers WHERE hostgroup_id=%d AND status=0"; - query=(char *)malloc(strlen(q)+32); - sprintf(query,q,_writer_hostgroup); - mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset2); - if (resultset2) { - if (resultset2->rows_count) { - for (std::vector::iterator it = resultset2->rows.begin() ; it != resultset2->rows.end(); ++it) { - SQLite3_row *r=*it; - int nwriters = atoi(r->fields[0]); - if (nwriters > max_writers) { - proxy_warning("Galera: too many writers in HG %d. Max=%d, current=%d\n", _writer_hostgroup, max_writers, nwriters); - need_converge = true; - } - } - } - delete resultset2; - } - free(query); - } - - if (need_converge==false) { - if (found_writer) { // maybe no-op - if ( - (writer_is_also_reader==0 && found_reader==false) - || - (writer_is_also_reader == 1 && found_reader==true) - || - (writer_is_also_reader == 2) - ) { // either both true or both false - delete resultset; - resultset=NULL; - } - } - } - } - - if (resultset) { // if we reach there, there is some action to perform - if (resultset->rows_count) { - need_converge=false; - - GloAdmin->mysql_servers_wrlock(); - mydb->execute("DELETE FROM mysql_servers_incoming"); - mydb->execute("INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers"); - q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=%d WHERE hostname='%s' AND port=%d AND hostgroup_id in (%d, %d, %d, %d)"; - query=(char *)malloc(strlen(q)+strlen(_hostname)+1024); // increased this buffer as it is used for other queries too - sprintf(query,q,_writer_hostgroup,_hostname,_port,_writer_hostgroup, info->reader_hostgroup, info->backup_writer_hostgroup, info->offline_hostgroup); - mydb->execute(query); - q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s' AND port=%d AND hostgroup_id=%d"; - sprintf(query,q,_hostname,_port,_writer_hostgroup); - mydb->execute(query); - //free(query); - q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id in (%d, %d, %d)"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,_hostname,_port, info->reader_hostgroup, info->backup_writer_hostgroup, info->offline_hostgroup); - mydb->execute(query); - //free(query); - q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s' AND port=%d AND hostgroup_id=%d"; - //query=(char *)malloc(strlen(q)+strlen(_hostname)+64); - sprintf(query,q,_hostname,_port,_writer_hostgroup); - mydb->execute(query); - //free(query); - if (writer_is_also_reader && read_HG>=0) { - q=(char *)"INSERT OR IGNORE INTO mysql_servers_incoming (hostgroup_id,hostname,port,gtid_port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment) SELECT %d,hostname,port,gtid_port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment FROM mysql_servers_incoming WHERE hostgroup_id=%d AND hostname='%s' AND port=%d"; - sprintf(query,q,read_HG,_writer_hostgroup,_hostname,_port); - mydb->execute(query); - } - converge_galera_config(_writer_hostgroup); - uint64_t checksum_current = 0; - uint64_t checksum_incoming = 0; - { - int cols=0; - int affected_rows=0; - SQLite3_result *resultset_servers=NULL; - char *query=NULL; - char *q1 = NULL; - char *q2 = NULL; - char *error=NULL; - q1 = (char *)"SELECT DISTINCT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, mysql_servers.comment FROM mysql_servers JOIN mysql_galera_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE writer_hostgroup=%d ORDER BY hostgroup_id, hostname, port"; - q2 = (char *)"SELECT DISTINCT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, mysql_servers_incoming.comment FROM mysql_servers_incoming JOIN mysql_galera_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE writer_hostgroup=%d ORDER BY hostgroup_id, hostname, port"; - query = (char *)malloc(strlen(q2)+128); - sprintf(query,q1,_writer_hostgroup); - mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset_servers); - if (error == NULL) { - if (resultset_servers) { - checksum_current = resultset_servers->raw_checksum(); - } - } - if (resultset_servers) { - delete resultset_servers; - resultset_servers = NULL; - } - sprintf(query,q2,_writer_hostgroup); - mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset_servers); - if (error == NULL) { - if (resultset_servers) { - checksum_incoming = resultset_servers->raw_checksum(); - } - } - if (resultset_servers) { - delete resultset_servers; - resultset_servers = NULL; - } - free(query); - } - if (checksum_incoming!=checksum_current) { - proxy_warning("Galera: setting host %s:%d as writer\n", _hostname, _port); - print_galera_nodes_last_status(); - commit(); - wrlock(); - SQLite3_result *resultset2=NULL; - q=(char *)"SELECT writer_hostgroup, backup_writer_hostgroup, reader_hostgroup, offline_hostgroup, max_writers, writer_is_also_reader FROM mysql_galera_hostgroups WHERE writer_hostgroup=%d"; - sprintf(query,q,_writer_hostgroup); - mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset2); - if (resultset2) { - if (resultset2->rows_count) { - for (std::vector::iterator it = resultset2->rows.begin() ; it != resultset2->rows.end(); ++it) { - SQLite3_row *r=*it; - int writer_hostgroup=atoi(r->fields[0]); - int backup_writer_hostgroup=atoi(r->fields[1]); - int reader_hostgroup=atoi(r->fields[2]); - int offline_hostgroup=atoi(r->fields[3]); - q=(char *)"DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d , %d , %d)"; - sprintf(query,q,_writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup); - mydb->execute(query); - generate_mysql_servers_table(&writer_hostgroup); - generate_mysql_servers_table(&backup_writer_hostgroup); - generate_mysql_servers_table(&reader_hostgroup); - generate_mysql_servers_table(&offline_hostgroup); - } - } - delete resultset2; - resultset2=NULL; - } - wrunlock(); - } else { - if (GloMTH->variables.hostgroup_manager_verbose > 1) { - proxy_warning("Galera: skipping setting node %s:%d from hostgroup %d as writer because won't change the list of ONLINE nodes in writer hostgroup\n", _hostname, _port, _writer_hostgroup); - } - } - GloAdmin->mysql_servers_wrunlock(); - free(query); - } - } - if (resultset) { - delete resultset; - resultset=NULL; - } -} - -// this function completes the tuning of mysql_servers_incoming -// it assumes that before calling converge_galera_config() -// * GloAdmin->mysql_servers_wrlock() was already called -// * mysql_servers_incoming has already entries copied from mysql_servers and ready to be loaded -// at this moment, it is only used to check if there are more than one writer -void MySQL_HostGroups_Manager::converge_galera_config(int _writer_hostgroup) { - - // we first gather info about the cluster - pthread_mutex_lock(&Galera_Info_mutex); - std::map::iterator it2; - it2 = Galera_Info_Map.find(_writer_hostgroup); - Galera_Info *info=NULL; - if (it2!=Galera_Info_Map.end()) { - info=it2->second; - int cols=0; - int affected_rows=0; - SQLite3_result *resultset=NULL; - char *query=NULL; - char *q=NULL; - char *error=NULL; - q=(char *)"SELECT hostgroup_id,hostname,port FROM mysql_servers_incoming WHERE status=0 AND hostgroup_id IN (%d, %d, %d, %d) ORDER BY weight DESC, hostname DESC, port DESC"; - query=(char *)malloc(strlen(q)+256); - sprintf(query, q, info->writer_hostgroup, info->backup_writer_hostgroup, info->reader_hostgroup, info->offline_hostgroup); - mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset); - free(query); - if (resultset) { - if (resultset->rows_count) { - int num_writers=0; - int num_backup_writers=0; - for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { - SQLite3_row *r=*it; - int hostgroup=atoi(r->fields[0]); - if (hostgroup==info->writer_hostgroup) { - num_writers++; - } else { - if (hostgroup==info->backup_writer_hostgroup) { - num_backup_writers++; - } - } - } - if (num_writers > info->max_writers) { // there are more writers than allowed - int to_move=num_writers-info->max_writers; - int to_keep = info->max_writers; - if (GloMTH->variables.hostgroup_manager_verbose > 1) { - proxy_info("Galera: max_writers=%d , moving %d nodes from writer HG %d to backup HG %d\n", info->max_writers, to_move, info->writer_hostgroup, info->backup_writer_hostgroup); - } - //for (std::vector::reverse_iterator it = resultset->rows.rbegin() ; it != resultset->rows.rend(); ++it) { - // note: we change the iterator from reverse_iterator to forward iterator - for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { - SQLite3_row *r=*it; - int hostgroup=atoi(r->fields[0]); - if (hostgroup==info->writer_hostgroup) { - if (to_keep) { - q=(char *)"UPDATE OR REPLACE mysql_servers_incoming SET status=0 WHERE hostgroup_id=%d AND hostname='%s' AND port=%d"; - query=(char *)malloc(strlen(q)+strlen(r->fields[1])+128); - sprintf(query,q,info->writer_hostgroup,r->fields[1],atoi(r->fields[2])); - mydb->execute(query); - free(query); - to_keep--; - continue; - } - if (to_move) { - // if the server is already in writer hostgroup, we set to shunned #2656 - q=(char *)"UPDATE OR REPLACE mysql_servers_incoming SET status=1 WHERE hostgroup_id=%d AND hostname='%s' AND port=%d"; - query=(char *)malloc(strlen(q)+strlen(r->fields[1])+128); - sprintf(query,q,info->writer_hostgroup,r->fields[1],atoi(r->fields[2])); - mydb->execute(query); - free(query); - //q=(char *)"UPDATE OR REPLACE mysql_servers_incoming SET status=0, hostgroup_id=%d WHERE hostgroup_id=%d AND hostname='%s' AND port=%d"; - // we copy the server from the writer hostgroup in the backup writer hostgroup #2656 - q=(char *)"INSERT OR IGNORE INTO mysql_servers_incoming SELECT %d, hostname, port, gtid_port, weight, 0, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers_incoming WHERE hostgroup_id=%d AND hostname='%s' AND port=%d"; - query=(char *)malloc(strlen(q)+strlen(r->fields[1])+128); - sprintf(query,q,info->backup_writer_hostgroup,info->writer_hostgroup,r->fields[1],atoi(r->fields[2])); - mydb->execute(query); - free(query); - to_move--; - } - } - } - } else { - if (num_writers < info->max_writers && num_backup_writers) { // or way too low writer - int to_move= ( (info->max_writers - num_writers) < num_backup_writers ? (info->max_writers - num_writers) : num_backup_writers); - if (GloMTH->variables.hostgroup_manager_verbose) { - proxy_info("Galera: max_writers=%d , moving %d nodes from backup HG %d to writer HG %d\n", info->max_writers, to_move, info->backup_writer_hostgroup, info->writer_hostgroup); - } - for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { - SQLite3_row *r=*it; - if (to_move) { - int hostgroup=atoi(r->fields[0]); - if (hostgroup==info->backup_writer_hostgroup) { - q=(char *)"UPDATE OR REPLACE mysql_servers_incoming SET status=0, hostgroup_id=%d WHERE hostgroup_id=%d AND hostname='%s' AND port=%d"; - query=(char *)malloc(strlen(q)+strlen(r->fields[1])+128); - sprintf(query,q,info->writer_hostgroup,info->backup_writer_hostgroup,r->fields[1],atoi(r->fields[2])); - if (GloMTH->variables.hostgroup_manager_verbose) { - proxy_info("Galera: %s\n", query); - } - mydb->execute(query); - free(query); - to_move--; - } - } - } - } else { - if (num_writers == 0 && num_backup_writers == 0) { - proxy_warning("Galera: we couldn't find any healthy node for writer HG %d\n", info->writer_hostgroup); - // ask Monitor to get the status of the whole cluster - std::vector * pn = GloMyMon->galera_find_possible_last_nodes(info->writer_hostgroup); - if (pn->size()) { - std::vector::iterator it2; - for (it2=pn->begin(); it2!=pn->end(); ++it2) { - string s0 = *it2; - proxy_info("Galera: possible writer candidate for HG %d: %s\n", info->writer_hostgroup, s0.c_str()); - } - char *error=NULL; - int cols; - int affected_rows; - SQLite3_result *resultset2=NULL; - q = (char *)"SELECT hostname, port FROM mysql_servers_incoming WHERE hostgroup_id IN (%d, %d, %d, %d) ORDER BY weight DESC, hostname DESC, port DESC"; - query=(char *)malloc(strlen(q) + 256); - sprintf(query,q, info->writer_hostgroup, info->backup_writer_hostgroup, info->reader_hostgroup, info->offline_hostgroup); - mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset2); - free(query); - if (resultset2) { - bool stop = false; - for (std::vector::iterator it = resultset2->rows.begin() ; (it != resultset2->rows.end()) && !stop ; ++it) { - SQLite3_row *r=*it; - char *h = r->fields[0]; - int p = atoi(r->fields[1]); - if (h) { - for (it2=pn->begin(); (it2!=pn->end()) && !stop; ++it2) { - std::string s = string(*it2); - std::size_t found=s.find_last_of(":"); - std::string host=s.substr(0,found); - std::string port=s.substr(found+1); - int port_n = atoi(port.c_str()); - if (strcmp(h,host.c_str())==0) { - if (p == port_n) { - stop = true; // we found a host to make a writer - proxy_info("Galera: trying to use server %s:%s as a writer for HG %d\n", host.c_str(), port.c_str(), info->writer_hostgroup); - q=(char *)"UPDATE OR REPLACE mysql_servers_incoming SET status=0, hostgroup_id=%d WHERE hostgroup_id IN (%d, %d, %d, %d) AND hostname='%s' AND port=%d"; - query=(char *)malloc(strlen(q) + s.length() + 512); - sprintf(query,q,info->writer_hostgroup, info->writer_hostgroup, info->backup_writer_hostgroup, info->reader_hostgroup, info->offline_hostgroup, host.c_str(), port_n); - mydb->execute(query); - free(query); - int writer_is_also_reader = info->writer_is_also_reader; - if (writer_is_also_reader) { - int read_HG = info->reader_hostgroup; - q=(char *)"INSERT OR IGNORE INTO mysql_servers_incoming (hostgroup_id,hostname,port,gtid_port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment) SELECT %d,hostname,port,gtid_port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment FROM mysql_servers_incoming WHERE hostgroup_id=%d AND hostname='%s' AND port=%d"; - query=(char *)malloc(strlen(q) + s.length() + 128); - sprintf(query,q,read_HG, info->writer_hostgroup, host.c_str(), port_n); - mydb->execute(query); - free(query); - } - } - } - } - } - } - delete resultset2; - } - } - delete pn; - } - } - } - } - } - if (resultset) { - delete resultset; - resultset=NULL; - } - if (info->writer_is_also_reader==2) { - q=(char *)"SELECT hostgroup_id,hostname,port FROM mysql_servers_incoming WHERE status=0 AND hostgroup_id IN (%d, %d, %d, %d) ORDER BY weight DESC, hostname DESC, port DESC"; - query=(char *)malloc(strlen(q)+256); - sprintf(query, q, info->writer_hostgroup, info->backup_writer_hostgroup, info->reader_hostgroup, info->offline_hostgroup); - mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset); - free(query); - if (resultset) { - if (resultset->rows_count) { - int num_writers=0; - int num_backup_writers=0; - for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { - SQLite3_row *r=*it; - int hostgroup=atoi(r->fields[0]); - if (hostgroup==info->writer_hostgroup) { - num_writers++; - } else { - if (hostgroup==info->backup_writer_hostgroup) { - num_backup_writers++; - } - } - } - // just delete the readers which are right now part of the writer hostgroup, preserving - // any current reader which is only in the reader hostgroup. This is because if a server - // is only part of the reader hostgroup at this point, means that it's there because of a - // reason beyond ProxySQL control, e.g. having READ_ONLY=1. - // Update for #3182: - // We just want to remove 'readers' which are 'ONLINE' right now, otherwise, - // we could be removing the introduced 'SHUNNED' readers, placed there by an 'offline soft' - // operation. - q=(char*)"DELETE FROM mysql_servers_incoming where hostgroup_id=%d and (hostname,port) in (SELECT hostname,port FROM mysql_servers_incoming WHERE hostgroup_id=%d AND status=0)"; - query=(char*)malloc(strlen(q) + 128); - sprintf(query, q, info->reader_hostgroup, info->writer_hostgroup); - mydb->execute(query); - free(query); - - if (num_backup_writers) { // there are backup writers, only these will be used as readers - q=(char *)"INSERT OR IGNORE INTO mysql_servers_incoming (hostgroup_id,hostname,port,gtid_port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment) SELECT %d,hostname,port,gtid_port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment FROM mysql_servers_incoming WHERE hostgroup_id=%d"; - query=(char *)malloc(strlen(q) + 128); - sprintf(query,q, info->reader_hostgroup, info->backup_writer_hostgroup); - mydb->execute(query); - free(query); - } - } - delete resultset; - resultset=NULL; - } - } - } else { - // we couldn't find the cluster, exits - } - pthread_mutex_unlock(&Galera_Info_mutex); -} - - -class MySQL_Errors_stats { - public: - int hostgroup; - char *hostname; - int port; - char *username; - char *client_address; - char *schemaname; - int err_no; - char *last_error; - time_t first_seen; - time_t last_seen; - unsigned long long count_star; - MySQL_Errors_stats(int hostgroup_, char *hostname_, int port_, char *username_, char *address_, char *schemaname_, int err_no_, char *last_error_, time_t tn) { - hostgroup = hostgroup_; - if (hostname_) { - hostname = strdup(hostname_); - } else { - hostname = strdup((char *)""); - } - port = port_; - if (username_) { - username = strdup(username_); - } else { - username = strdup((char *)""); - } - if (address_) { - client_address = strdup(address_); - } else { - client_address = strdup((char *)""); - } - if (schemaname_) { - schemaname = strdup(schemaname_); - } else { - schemaname = strdup((char *)""); - } - err_no = err_no_; - if (last_error_) { - last_error = strdup(last_error_); - } else { - last_error = strdup((char *)""); - } - last_seen = tn; - first_seen = tn; - count_star = 1; + last_seen = tn; + first_seen = tn; + count_star = 1; } ~MySQL_Errors_stats() { if (hostname) { @@ -5922,163 +4169,46 @@ void MySQL_HostGroups_Manager::add_mysql_errors(int hostgroup, char *hostname, i /* mes->last_seen = tn; if (strcmp(mes->last_error,last_error)) { - free(mes->last_error); - mes->last_error = strdup(last_error); - mes->count_star++; - } -*/ - } else { - mes = new MySQL_Errors_stats(hostgroup, hostname, port, username, address, schemaname, err_no, last_error, tn); - mysql_errors_umap.insert(std::make_pair(hash1,(void *)mes)); - } - pthread_mutex_unlock(&mysql_errors_mutex); -} - -SQLite3_result * MySQL_HostGroups_Manager::get_mysql_errors(bool reset) { - SQLite3_result *result=new SQLite3_result(MYSQL_ERRORS_STATS_FIELD_NUM); - pthread_mutex_lock(&mysql_errors_mutex); - result->add_column_definition(SQLITE_TEXT,"hid"); - result->add_column_definition(SQLITE_TEXT,"hostname"); - result->add_column_definition(SQLITE_TEXT,"port"); - result->add_column_definition(SQLITE_TEXT,"username"); - result->add_column_definition(SQLITE_TEXT,"client_address"); - result->add_column_definition(SQLITE_TEXT,"schemaname"); - result->add_column_definition(SQLITE_TEXT,"err_no"); - result->add_column_definition(SQLITE_TEXT,"count_star"); - result->add_column_definition(SQLITE_TEXT,"first_seen"); - result->add_column_definition(SQLITE_TEXT,"last_seen"); - result->add_column_definition(SQLITE_TEXT,"last_error"); - for (std::unordered_map::iterator it=mysql_errors_umap.begin(); it!=mysql_errors_umap.end(); ++it) { - MySQL_Errors_stats *mes=(MySQL_Errors_stats *)it->second; - char **pta=mes->get_row(); - result->add_row(pta); - mes->free_row(pta); - if (reset) { - delete mes; - } - } - if (reset) { - mysql_errors_umap.erase(mysql_errors_umap.begin(),mysql_errors_umap.end()); - } - pthread_mutex_unlock(&mysql_errors_mutex); - return result; -} - -AWS_Aurora_Info::AWS_Aurora_Info(int w, int r, int _port, char *_end_addr, int maxl, int al, int minl, int lnc, int ci, int ct, bool _a, int wiar, int nrw, char *c) { - comment=NULL; - if (c) { - comment=strdup(c); - } - writer_hostgroup=w; - reader_hostgroup=r; - max_lag_ms=maxl; - add_lag_ms=al; - min_lag_ms=minl; - lag_num_checks=lnc; - check_interval_ms=ci; - check_timeout_ms=ct; - writer_is_also_reader=wiar; - new_reader_weight=nrw; - active=_a; - __active=true; - //need_converge=true; - aurora_port = _port; - domain_name = strdup(_end_addr); -} - -AWS_Aurora_Info::~AWS_Aurora_Info() { - if (comment) { - free(comment); - comment=NULL; - } - if (domain_name) { - free(domain_name); - domain_name=NULL; - } -} - -bool AWS_Aurora_Info::update(int r, int _port, char *_end_addr, int maxl, int al, int minl, int lnc, int ci, int ct, bool _a, int wiar, int nrw, char *c) { - bool ret=false; - __active=true; - if (reader_hostgroup!=r) { - reader_hostgroup=r; - ret=true; - } - if (max_lag_ms!=maxl) { - max_lag_ms=maxl; - ret=true; - } - if (add_lag_ms!=al) { - add_lag_ms=al; - ret=true; - } - if (min_lag_ms!=minl) { - min_lag_ms=minl; - ret=true; - } - if (lag_num_checks!=lnc) { - lag_num_checks=lnc; - ret=true; - } - if (check_interval_ms!=ci) { - check_interval_ms=ci; - ret=true; - } - if (check_timeout_ms!=ct) { - check_timeout_ms=ct; - ret=true; - } - if (writer_is_also_reader != wiar) { - writer_is_also_reader = wiar; - ret = true; - } - if (new_reader_weight != nrw) { - new_reader_weight = nrw; - ret = true; - } - if (active!=_a) { - active=_a; - ret=true; - } - if (aurora_port != _port) { - aurora_port = _port; - ret = true; - } - if (domain_name) { - if (_end_addr) { - if (strcmp(domain_name,_end_addr)) { - free(domain_name); - domain_name = strdup(_end_addr); - ret = true; - } - } else { - free(domain_name); - domain_name=NULL; - ret = true; + free(mes->last_error); + mes->last_error = strdup(last_error); + mes->count_star++; } +*/ } else { - if (_end_addr) { - domain_name=strdup(_end_addr); - ret = true; - } + mes = new MySQL_Errors_stats(hostgroup, hostname, port, username, address, schemaname, err_no, last_error, tn); + mysql_errors_umap.insert(std::make_pair(hash1,(void *)mes)); } - // for comment we don't change return value - if (comment) { - if (c) { - if (strcmp(comment,c)) { - free(comment); - comment=strdup(c); - } - } else { - free(comment); - comment=NULL; - } - } else { - if (c) { - comment=strdup(c); + pthread_mutex_unlock(&mysql_errors_mutex); +} + +SQLite3_result * MySQL_HostGroups_Manager::get_mysql_errors(bool reset) { + SQLite3_result *result=new SQLite3_result(MYSQL_ERRORS_STATS_FIELD_NUM); + pthread_mutex_lock(&mysql_errors_mutex); + result->add_column_definition(SQLITE_TEXT,"hid"); + result->add_column_definition(SQLITE_TEXT,"hostname"); + result->add_column_definition(SQLITE_TEXT,"port"); + result->add_column_definition(SQLITE_TEXT,"username"); + result->add_column_definition(SQLITE_TEXT,"client_address"); + result->add_column_definition(SQLITE_TEXT,"schemaname"); + result->add_column_definition(SQLITE_TEXT,"err_no"); + result->add_column_definition(SQLITE_TEXT,"count_star"); + result->add_column_definition(SQLITE_TEXT,"first_seen"); + result->add_column_definition(SQLITE_TEXT,"last_seen"); + result->add_column_definition(SQLITE_TEXT,"last_error"); + for (std::unordered_map::iterator it=mysql_errors_umap.begin(); it!=mysql_errors_umap.end(); ++it) { + MySQL_Errors_stats *mes=(MySQL_Errors_stats *)it->second; + char **pta=mes->get_row(); + result->add_row(pta); + mes->free_row(pta); + if (reset) { + delete mes; } } - return ret; + if (reset) { + mysql_errors_umap.erase(mysql_errors_umap.begin(),mysql_errors_umap.end()); + } + pthread_mutex_unlock(&mysql_errors_mutex); + return result; } /** @@ -6340,208 +4470,6 @@ void MySQL_HostGroups_Manager::generate_mysql_servers_ssl_params_table() { incoming_mysql_servers_ssl_params=NULL; } -void MySQL_HostGroups_Manager::generate_mysql_aws_aurora_hostgroups_table() { - if (incoming_aws_aurora_hostgroups==NULL) { - return; - } - int rc; - sqlite3_stmt *statement=NULL; - //sqlite3 *mydb3=mydb->get_db(); - char *query=(char *)"INSERT INTO mysql_aws_aurora_hostgroups(writer_hostgroup,reader_hostgroup,active,aurora_port,domain_name,max_lag_ms,check_interval_ms," - "check_timeout_ms,writer_is_also_reader,new_reader_weight,add_lag_ms,min_lag_ms,lag_num_checks,comment) VALUES " - "(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)"; - //rc=(*proxy_sqlite3_prepare_v2)(mydb3, query, -1, &statement, 0); - rc = mydb->prepare_v2(query, &statement); - ASSERT_SQLITE_OK(rc, mydb); - proxy_info("New mysql_aws_aurora_hostgroups table\n"); - pthread_mutex_lock(&AWS_Aurora_Info_mutex); - for (std::map::iterator it1 = AWS_Aurora_Info_Map.begin() ; it1 != AWS_Aurora_Info_Map.end(); ++it1) { - AWS_Aurora_Info *info=NULL; - info=it1->second; - info->__active=false; - } - for (std::vector::iterator it = incoming_aws_aurora_hostgroups->rows.begin() ; it != incoming_aws_aurora_hostgroups->rows.end(); ++it) { - SQLite3_row *r=*it; - int writer_hostgroup=atoi(r->fields[0]); - int reader_hostgroup=atoi(r->fields[1]); - int active=atoi(r->fields[2]); - int aurora_port = atoi(r->fields[3]); - int max_lag_ms = atoi(r->fields[5]); - int check_interval_ms = atoi(r->fields[6]); - int check_timeout_ms = atoi(r->fields[7]); - int writer_is_also_reader = atoi(r->fields[8]); - int new_reader_weight = atoi(r->fields[9]); - int add_lag_ms = atoi(r->fields[10]); - int min_lag_ms = atoi(r->fields[11]); - int lag_num_checks = atoi(r->fields[12]); - proxy_info("Loading AWS Aurora info for (%d,%d,%s,%d,\"%s\",%d,%d,%d,%d,%d,%d,\"%s\")\n", writer_hostgroup,reader_hostgroup,(active ? "on" : "off"),aurora_port, - r->fields[4],max_lag_ms,add_lag_ms,min_lag_ms,lag_num_checks,check_interval_ms,check_timeout_ms,r->fields[13]); - rc=(*proxy_sqlite3_bind_int64)(statement, 1, writer_hostgroup); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 2, reader_hostgroup); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 3, active); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 4, aurora_port); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_text)(statement, 5, r->fields[4], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 6, max_lag_ms); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 7, check_interval_ms); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 8, check_timeout_ms); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 9, writer_is_also_reader); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 10, new_reader_weight); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 11, add_lag_ms); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 12, min_lag_ms); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_int64)(statement, 13, lag_num_checks); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_bind_text)(statement, 14, r->fields[13], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb); - - SAFE_SQLITE3_STEP2(statement); - rc=(*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mydb); - rc=(*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mydb); - std::map::iterator it2; - it2 = AWS_Aurora_Info_Map.find(writer_hostgroup); - AWS_Aurora_Info *info=NULL; - if (it2!=AWS_Aurora_Info_Map.end()) { - info=it2->second; - bool changed=false; - changed=info->update(reader_hostgroup, aurora_port, r->fields[4], max_lag_ms, add_lag_ms, min_lag_ms, lag_num_checks, check_interval_ms, check_timeout_ms, (bool)active, writer_is_also_reader, new_reader_weight, r->fields[10]); - if (changed) { - //info->need_converge=true; - } - } else { - info=new AWS_Aurora_Info(writer_hostgroup, reader_hostgroup, aurora_port, r->fields[4], max_lag_ms, add_lag_ms, min_lag_ms, lag_num_checks, check_interval_ms, check_timeout_ms, (bool)active, writer_is_also_reader, new_reader_weight, r->fields[10]); - //info->need_converge=true; - AWS_Aurora_Info_Map.insert(AWS_Aurora_Info_Map.begin(), std::pair(writer_hostgroup,info)); - } - } - (*proxy_sqlite3_finalize)(statement); - delete incoming_aws_aurora_hostgroups; - incoming_aws_aurora_hostgroups=NULL; - - // remove missing ones - for (auto it3 = AWS_Aurora_Info_Map.begin(); it3 != AWS_Aurora_Info_Map.end(); ) { - AWS_Aurora_Info *info=it3->second; - if (info->__active==false) { - delete info; - it3 = AWS_Aurora_Info_Map.erase(it3); - } else { - it3++; - } - } - // TODO: it is now time to compute all the changes - - - // it is now time to build a new structure in Monitor - pthread_mutex_lock(&GloMyMon->aws_aurora_mutex); - update_aws_aurora_hosts_monitor_resultset(false); - pthread_mutex_unlock(&GloMyMon->aws_aurora_mutex); - - pthread_mutex_unlock(&AWS_Aurora_Info_mutex); -} - - - -//void MySQL_HostGroups_Manager::aws_aurora_replication_lag_action(int _whid, int _rhid, char *address, unsigned int port, float current_replication_lag, bool enable, bool verbose) { -// this function returns false is the server is in the wrong HG -bool MySQL_HostGroups_Manager::aws_aurora_replication_lag_action(int _whid, int _rhid, char *_server_id, float current_replication_lag_ms, bool enable, bool is_writer, bool verbose) { - bool ret = false; // return false by default - bool reader_found_in_whg = false; - if (is_writer) { - // if the server is a writer, we will set ret back to true once found - ret = false; - } - unsigned port = 3306; - char *domain_name = strdup((char *)""); - { - pthread_mutex_lock(&AWS_Aurora_Info_mutex); - std::map::iterator it2; - it2 = AWS_Aurora_Info_Map.find(_whid); - AWS_Aurora_Info *info=NULL; - if (it2!=AWS_Aurora_Info_Map.end()) { - info=it2->second; - if (info->domain_name) { - free(domain_name); - domain_name = strdup(info->domain_name); - } - port = info->aurora_port; - } - pthread_mutex_unlock(&AWS_Aurora_Info_mutex); - } - char *address = (char *)malloc(strlen(_server_id)+strlen(domain_name)+1); - sprintf(address,"%s%s",_server_id,domain_name); - GloAdmin->mysql_servers_wrlock(); - wrlock(); - int i,j; - for (i=0; i<(int)MyHostGroups->len; i++) { - MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i); - if (_whid!=(int)myhgc->hid && _rhid!=(int)myhgc->hid) continue; - for (j=0; j<(int)myhgc->mysrvs->cnt(); j++) { - MySrvC *mysrvc=(MySrvC *)myhgc->mysrvs->servers->index(j); - if (strcmp(mysrvc->address,address)==0 && mysrvc->port==port) { - // we found the server - if (enable==false) { - if (mysrvc->get_status() == MYSQL_SERVER_STATUS_ONLINE) { - if (verbose) { - proxy_warning("Shunning server %s:%d from HG %u with replication lag of %f microseconds\n", address, port, myhgc->hid, current_replication_lag_ms); - } - mysrvc->set_status(MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG); - } - } else { - if (mysrvc->get_status() == MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG) { - if (verbose) { - proxy_warning("Re-enabling server %s:%d from HG %u with replication lag of %f microseconds\n", address, port, myhgc->hid, current_replication_lag_ms); - } - mysrvc->set_status(MYSQL_SERVER_STATUS_ONLINE); - } - } - mysrvc->aws_aurora_current_lag_us = current_replication_lag_ms * 1000; - if (mysrvc->get_status() == MYSQL_SERVER_STATUS_ONLINE || mysrvc->get_status() == MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG) { - // we perform check only if ONLINE or lagging - if (ret) { - if (_whid==(int)myhgc->hid && is_writer==false) { - // the server should be a reader - // but it is in the writer hostgroup - ret = false; - reader_found_in_whg = true; - } - } else { - if (is_writer==true) { - if (_whid==(int)myhgc->hid) { - // the server should be a writer - // and we found it in the writer hostgroup - ret = true; - } - } else { - if (_rhid==(int)myhgc->hid) { - // the server should be a reader - // and we found it in the reader hostgroup - ret = true; - } - } - } - } - if (ret==false) - if (is_writer==true) - if (enable==true) - if (_whid==(int)myhgc->hid) - if (mysrvc->get_status() == MYSQL_SERVER_STATUS_OFFLINE_HARD) { - mysrvc->set_status(MYSQL_SERVER_STATUS_ONLINE); - proxy_warning("Re-enabling server %s:%d from HG %u because it is a writer\n", address, port, myhgc->hid); - ret = true; - } - //goto __exit_aws_aurora_replication_lag_action; - } - } - } -//__exit_aws_aurora_replication_lag_action: - wrunlock(); - GloAdmin->mysql_servers_wrunlock(); - if (ret == true) { - if (reader_found_in_whg == true) { - ret = false; - } - } - free(address); - free(domain_name); - return ret; -} - int MySQL_HostGroups_Manager::create_new_server_in_hg( uint32_t hid, const srv_info_t& srv_info, const srv_opts_t& srv_opts ) { @@ -6622,420 +4550,6 @@ int MySQL_HostGroups_Manager::remove_server_in_hg(uint32_t hid, const string& ad return 0; } -// FIXME: complete this!! -void MySQL_HostGroups_Manager::update_aws_aurora_set_writer(int _whid, int _rhid, char *_server_id, bool verbose) { - int cols=0; - int affected_rows=0; - SQLite3_result *resultset=NULL; - char *query=NULL; - char *q=NULL; - char *error=NULL; - //q=(char *)"SELECT hostgroup_id FROM mysql_servers JOIN mysql_galera_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=offline_hostgroup WHERE hostname='%s' AND port=%d AND status<>3"; - q=(char *)"SELECT hostgroup_id FROM mysql_servers JOIN mysql_aws_aurora_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE hostname='%s%s' AND port=%d AND status<>3 AND hostgroup_id IN (%d, %d)"; - - int writer_is_also_reader=0; - int new_reader_weight = 1; - bool found_writer=false; - bool found_reader=false; - int _writer_hostgroup = _whid; - int aurora_port = 3306; - char *domain_name = strdup((char *)""); - int read_HG=-1; - { - pthread_mutex_lock(&AWS_Aurora_Info_mutex); - std::map::iterator it2; - it2 = AWS_Aurora_Info_Map.find(_writer_hostgroup); - AWS_Aurora_Info *info=NULL; - if (it2!=AWS_Aurora_Info_Map.end()) { - info=it2->second; - writer_is_also_reader=info->writer_is_also_reader; - new_reader_weight = info->new_reader_weight; - read_HG = info->reader_hostgroup; - if (info->domain_name) { - free(domain_name); - domain_name = strdup(info->domain_name); - } - aurora_port = info->aurora_port; - } - pthread_mutex_unlock(&AWS_Aurora_Info_mutex); - } - - query=(char *)malloc(strlen(q)+strlen(_server_id)+strlen(domain_name)+1024*1024); - sprintf(query, q, _server_id, domain_name, aurora_port, _whid, _rhid); - mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset); - if (error) { - free(error); - error=NULL; - } - - if (resultset) { - if (resultset->rows_count) { - for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { - SQLite3_row *r=*it; - int hostgroup=atoi(r->fields[0]); - if (hostgroup==_writer_hostgroup) { - found_writer=true; - } - if (read_HG>=0) { - if (hostgroup==read_HG) { - found_reader=true; - } - } - } - } - - if (found_writer) { // maybe no-op - if ( - (writer_is_also_reader==0 && found_reader==false) - || - (writer_is_also_reader > 0 && found_reader==true) - ) { // either both true or both false - delete resultset; - resultset=NULL; - } - } - } - - if (resultset) { - // If we reach there, there is some action to perform. - // This should be the case most of the time, - // because the calling function knows if an action is required. - if (resultset->rows_count) { - GloAdmin->mysql_servers_wrlock(); - mydb->execute("DELETE FROM mysql_servers_incoming"); - q=(char *)"INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers WHERE hostgroup_id=%d"; - sprintf(query,q,_rhid); - mydb->execute(query); - q=(char *)"INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers WHERE hostgroup_id=%d AND hostname='%s%s' AND port=%d"; - sprintf(query, q, _writer_hostgroup, _server_id, domain_name, aurora_port); - mydb->execute(query); - q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=%d WHERE hostname='%s%s' AND port=%d AND hostgroup_id<>%d"; - sprintf(query, q, _writer_hostgroup, _server_id, domain_name, aurora_port, _writer_hostgroup); - mydb->execute(query); - q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s%s' AND port=%d AND hostgroup_id<>%d"; - sprintf(query, q, _server_id, domain_name, aurora_port, _writer_hostgroup); - mydb->execute(query); - q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s%s' AND port=%d AND hostgroup_id=%d"; - sprintf(query, q, _server_id, domain_name, aurora_port, _writer_hostgroup); - mydb->execute(query); - - // we need to move the old writer into the reader HG - q=(char *)"DELETE FROM mysql_servers_incoming WHERE status=3 AND hostgroup_id=%d"; - sprintf(query,q,_rhid); - mydb->execute(query); - q=(char *)"INSERT OR IGNORE INTO mysql_servers_incoming SELECT %d, hostname, port, gtid_port, %d, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers WHERE hostgroup_id=%d AND status=0"; - sprintf(query,q,_rhid, new_reader_weight, _whid); - mydb->execute(query); - - if (writer_is_also_reader && read_HG>=0) { - q=(char *)"INSERT OR IGNORE INTO mysql_servers_incoming (hostgroup_id,hostname,port,gtid_port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment) SELECT %d,hostname,port,gtid_port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment FROM mysql_servers_incoming WHERE hostgroup_id=%d AND hostname='%s%s' AND port=%d"; - sprintf(query, q, read_HG, _writer_hostgroup, _server_id, domain_name, aurora_port); - mydb->execute(query); - q = (char *)"UPDATE mysql_servers_incoming SET weight=%d WHERE hostgroup_id=%d AND hostname='%s%s' AND port=%d"; - sprintf(query, q, new_reader_weight, read_HG, _server_id, domain_name, aurora_port); - mydb->execute(query); - } - uint64_t checksum_current = 0; - uint64_t checksum_incoming = 0; - { - int cols=0; - int affected_rows=0; - SQLite3_result *resultset_servers=NULL; - char *query=NULL; - char *q1 = NULL; - char *q2 = NULL; - char *error=NULL; - q1 = (char *)"SELECT DISTINCT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, mysql_servers.comment FROM mysql_servers JOIN mysql_aws_aurora_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE writer_hostgroup=%d ORDER BY hostgroup_id, hostname, port"; - q2 = (char *)"SELECT DISTINCT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, mysql_servers_incoming.comment FROM mysql_servers_incoming JOIN mysql_aws_aurora_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE writer_hostgroup=%d ORDER BY hostgroup_id, hostname, port"; - query = (char *)malloc(strlen(q2)+128); - sprintf(query,q1,_writer_hostgroup); - mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset_servers); - if (error == NULL) { - if (resultset_servers) { - checksum_current = resultset_servers->raw_checksum(); - } - } - if (resultset_servers) { - delete resultset_servers; - resultset_servers = NULL; - } - sprintf(query,q2,_writer_hostgroup); - mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset_servers); - if (error == NULL) { - if (resultset_servers) { - checksum_incoming = resultset_servers->raw_checksum(); - } - } - if (resultset_servers) { - delete resultset_servers; - resultset_servers = NULL; - } - free(query); - } - if (checksum_incoming!=checksum_current) { - proxy_warning("AWS Aurora: setting host %s%s:%d as writer\n", _server_id, domain_name, aurora_port); - q = (char *)"INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers WHERE hostgroup_id NOT IN (%d, %d)"; - sprintf(query, q, _rhid, _whid); - mydb->execute(query); - commit(); - wrlock(); - q=(char *)"DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d)"; - sprintf(query,q,_whid,_rhid); - mydb->execute(query); - generate_mysql_servers_table(&_whid); - generate_mysql_servers_table(&_rhid); - wrunlock(); - } else { - if (GloMTH->variables.hostgroup_manager_verbose > 1) { - proxy_warning("AWS Aurora: skipping setting node %s%s:%d from hostgroup %d as writer because won't change the list of ONLINE nodes in writer hostgroup\n", _server_id, domain_name, aurora_port, _writer_hostgroup); - } - } - GloAdmin->mysql_servers_wrunlock(); - free(query); - query = NULL; - } else { - string full_hostname { string { _server_id } + string { domain_name } }; - - GloAdmin->mysql_servers_wrlock(); - wrlock(); - - srv_info_t srv_info { full_hostname, static_cast(aurora_port), "Aurora AWS" }; - srv_opts_t wr_srv_opts { -1, -1, -1 }; - - int wr_res = create_new_server_in_hg(_writer_hostgroup, srv_info, wr_srv_opts); - int rd_res = -1; - - // WRITER can also be placed as READER, or could previously be one - if (writer_is_also_reader && read_HG >= 0) { - srv_opts_t rd_srv_opts { new_reader_weight, -1, -1 }; - rd_res = create_new_server_in_hg(read_HG, srv_info, rd_srv_opts); - } - - // A new server has been created, or an OFFLINE_HARD brought back as ONLINE - if (wr_res == 0 || rd_res == 0) { - proxy_info( - "AWS Aurora: setting new auto-discovered host %s:%d as writer\n", full_hostname.c_str(), aurora_port - ); - purge_mysql_servers_table(); - - const char del_srvs_query_t[] { "DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d)" }; - const string del_srvs_query { cstr_format(del_srvs_query_t, _whid, _rhid).str }; - mydb->execute(del_srvs_query.c_str()); - - generate_mysql_servers_table(&_whid); - generate_mysql_servers_table(&_rhid); - - // Update the global checksums after 'mysql_servers' regeneration - { - unique_ptr resultset { get_admin_runtime_mysql_servers(mydb) }; - string mysrvs_checksum { get_checksum_from_hash(resultset ? resultset->raw_checksum() : 0) }; - save_runtime_mysql_servers(resultset.release()); - proxy_info("Checksum for table %s is %s\n", "mysql_servers", mysrvs_checksum.c_str()); - - pthread_mutex_lock(&GloVars.checksum_mutex); - update_glovars_mysql_servers_checksum(mysrvs_checksum); - pthread_mutex_unlock(&GloVars.checksum_mutex); - } - - // Because 'commit' isn't called, we are required to update 'mysql_servers_for_monitor'. - update_table_mysql_servers_for_monitor(false); - // Update AWS Aurora resultset used for monitoring - update_aws_aurora_hosts_monitor_resultset(true); - } - - wrunlock(); - GloAdmin->mysql_servers_wrunlock(); - } - } - if (resultset) { - delete resultset; - resultset=NULL; - } - if (query) { - free(query); - } - free(domain_name); -} - -void MySQL_HostGroups_Manager::update_aws_aurora_set_reader(int _whid, int _rhid, char *_server_id) { - int cols=0; - int affected_rows=0; - SQLite3_result *resultset=NULL; - char *query=NULL; - char *q=NULL; - char *error=NULL; - int _writer_hostgroup = _whid; - int aurora_port = 3306; - int new_reader_weight = 0; - char *domain_name = strdup((char *)""); - { - pthread_mutex_lock(&AWS_Aurora_Info_mutex); - std::map::iterator it2; - it2 = AWS_Aurora_Info_Map.find(_writer_hostgroup); - AWS_Aurora_Info *info=NULL; - if (it2!=AWS_Aurora_Info_Map.end()) { - info=it2->second; - if (info->domain_name) { - free(domain_name); - domain_name = strdup(info->domain_name); - } - aurora_port = info->aurora_port; - new_reader_weight = info->new_reader_weight; - } - pthread_mutex_unlock(&AWS_Aurora_Info_mutex); - } - q = (char*)"SELECT hostgroup_id FROM mysql_servers JOIN mysql_aws_aurora_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE hostname='%s%s' AND port=%d AND status<>3 AND hostgroup_id IN (%d,%d)"; - query=(char *)malloc(strlen(q)+strlen(_server_id)+strlen(domain_name)+32+32+32); - sprintf(query, q, _server_id, domain_name, aurora_port, _whid, _rhid); - mydb->execute_statement(query, &error, &cols , &affected_rows , &resultset); - if (error) { - free(error); - error=NULL; - } - free(query); - if (resultset) { // we lock only if needed - if (resultset->rows_count) { - proxy_warning("AWS Aurora: setting host %s%s:%d (part of cluster with writer_hostgroup=%d) in a reader, moving from writer_hostgroup %d to reader_hostgroup %d\n", _server_id, domain_name, aurora_port, _whid, _whid, _rhid); - GloAdmin->mysql_servers_wrlock(); - mydb->execute("DELETE FROM mysql_servers_incoming"); - mydb->execute("INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers"); - // If server present as WRITER try moving it to 'reader_hostgroup'. - q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=%d WHERE hostname='%s%s' AND port=%d AND hostgroup_id=%d"; - query=(char *)malloc(strlen(q)+strlen(_server_id)+strlen(domain_name)+512); - sprintf(query, q, _rhid, _server_id, domain_name, aurora_port, _whid); - mydb->execute(query); - // Reader could previously be also a reader, in which case previous operation 'UPDATE OR IGNORE' - // did nothing. If server is still in the 'writer_hostgroup', we should remove it. - q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s%s' AND port=%d AND hostgroup_id=%d"; - sprintf(query, q, _server_id, domain_name, aurora_port, _whid); - mydb->execute(query); - q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s%s' AND port=%d AND hostgroup_id=%d"; - sprintf(query, q, _server_id, domain_name, aurora_port, _rhid); - mydb->execute(query); - commit(); - wrlock(); - - q=(char *)"DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d)"; - sprintf(query,q,_whid,_rhid); - mydb->execute(query); - generate_mysql_servers_table(&_whid); - generate_mysql_servers_table(&_rhid); - - wrunlock(); - GloAdmin->mysql_servers_wrunlock(); - free(query); - } else { - // we couldn't find the server - // autodiscovery algorithm here - string full_hostname { string { _server_id } + string { domain_name } }; - GloAdmin->mysql_servers_wrlock(); - wrlock(); - - srv_info_t srv_info { full_hostname, static_cast(aurora_port), "Aurora AWS" }; - srv_opts_t srv_opts { new_reader_weight, -1, -1 }; - int wr_res = create_new_server_in_hg(_rhid, srv_info, srv_opts); - - // A new server has been created, or an OFFLINE_HARD brought back as ONLINE - if (wr_res == 0) { - purge_mysql_servers_table(); - - const char del_srvs_query_t[] { "DELETE FROM mysql_servers WHERE hostgroup_id IN (%d , %d)" }; - const string del_srvs_query { cstr_format(del_srvs_query_t, _whid, _rhid).str }; - mydb->execute(del_srvs_query.c_str()); - - generate_mysql_servers_table(&_whid); - generate_mysql_servers_table(&_rhid); - - // Update the global checksums after 'mysql_servers' regeneration - { - unique_ptr resultset { get_admin_runtime_mysql_servers(mydb) }; - string mysrvs_checksum { get_checksum_from_hash(resultset ? resultset->raw_checksum() : 0) }; - save_runtime_mysql_servers(resultset.release()); - proxy_info("Checksum for table %s is %s\n", "mysql_servers", mysrvs_checksum.c_str()); - - pthread_mutex_lock(&GloVars.checksum_mutex); - update_glovars_mysql_servers_checksum(mysrvs_checksum); - pthread_mutex_unlock(&GloVars.checksum_mutex); - } - - // Because 'commit' isn't called, we are required to update 'mysql_servers_for_monitor'. - update_table_mysql_servers_for_monitor(false); - // Update AWS Aurora resultset used for monitoring - update_aws_aurora_hosts_monitor_resultset(true); - } - - wrunlock(); - GloAdmin->mysql_servers_wrunlock(); - } - } - if (resultset) { - delete resultset; - resultset=NULL; - } - free(domain_name); -} - -const char SELECT_AWS_AURORA_SERVERS_FOR_MONITOR[] { - "SELECT writer_hostgroup, reader_hostgroup, hostname, port, MAX(use_ssl) use_ssl, max_lag_ms, check_interval_ms," - " check_timeout_ms, add_lag_ms, min_lag_ms, lag_num_checks FROM mysql_servers" - " JOIN mysql_aws_aurora_hostgroups ON" - " hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE active=1 AND status NOT IN (2,3)" - " GROUP BY writer_hostgroup, hostname, port" -}; - -void MySQL_HostGroups_Manager::update_aws_aurora_hosts_monitor_resultset(bool lock) { - if (lock) { - pthread_mutex_lock(&AWS_Aurora_Info_mutex); - pthread_mutex_lock(&GloMyMon->aws_aurora_mutex); - } - - SQLite3_result* resultset = nullptr; - { - char* error = nullptr; - int cols = 0; - int affected_rows = 0; - mydb->execute_statement(SELECT_AWS_AURORA_SERVERS_FOR_MONITOR, &error, &cols, &affected_rows, &resultset); - } - - if (resultset) { - if (GloMyMon->AWS_Aurora_Hosts_resultset) { - delete GloMyMon->AWS_Aurora_Hosts_resultset; - } - GloMyMon->AWS_Aurora_Hosts_resultset=resultset; - GloMyMon->AWS_Aurora_Hosts_resultset_checksum=resultset->raw_checksum(); - } - - if (lock) { - pthread_mutex_unlock(&GloMyMon->aws_aurora_mutex); - pthread_mutex_unlock(&AWS_Aurora_Info_mutex); - } -} - -MySrvC* MySQL_HostGroups_Manager::find_server_in_hg(unsigned int _hid, const std::string& addr, int port) { - MySrvC* f_server = nullptr; - - MyHGC* myhgc = nullptr; - for (uint32_t i = 0; i < MyHostGroups->len; i++) { - myhgc = static_cast(MyHostGroups->index(i)); - - if (myhgc->hid == _hid) { - break; - } - } - - if (myhgc != nullptr) { - for (uint32_t j = 0; j < myhgc->mysrvs->cnt(); j++) { - MySrvC* mysrvc = static_cast(myhgc->mysrvs->servers->index(j)); - - if (strcmp(mysrvc->address, addr.c_str()) == 0 && mysrvc->port == port) { - f_server = mysrvc; - } - } - } - - return f_server; -} - void MySQL_HostGroups_Manager::HostGroup_Server_Mapping::copy_if_not_exists(Type dest_type, Type src_type) { assert(dest_type != src_type); diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index f6a01530d..4f7925959 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -2230,79 +2230,6 @@ SQLite3_result * MySQL_HostGroups_Manager::dump_table_mysql(const string& name) return resultset; } -#if 0 -/** - * @brief Create a new MySQL host group container. - * - * This function creates a new instance of the MySQL host group container (`MyHGC`) with - * the specified host group ID and returns a pointer to it. - * - * @param _hid The host group ID for the new container. - * @return A pointer to the newly created `MyHGC` instance. - */ -MyHGC * MySQL_HostGroups_Manager::MyHGC_create(unsigned int _hid) { - MyHGC *myhgc=new MyHGC(_hid); - return myhgc; -} - -/** - * @brief Find a MySQL host group container by host group ID. - * - * This function searches for a MySQL host group container with the specified host group ID - * in the list of host groups. If found, it returns a pointer to the container; otherwise, - * it returns a null pointer. - * - * @param _hid The host group ID to search for. - * @return A pointer to the found `MyHGC` instance if found; otherwise, a null pointer. - */ -MyHGC * MySQL_HostGroups_Manager::MyHGC_find(unsigned int _hid) { - if (MyHostGroups->len < 100) { - // for few HGs, we use the legacy search - for (unsigned int i=0; ilen; i++) { - MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i); - if (myhgc->hid==_hid) { - return myhgc; - } - } - } else { - // for a large number of HGs, we use the unordered_map - // this search is slower for a small number of HGs, therefore we use - // it only for large number of HGs - std::unordered_map::const_iterator it = MyHostGroups_map.find(_hid); - if (it != MyHostGroups_map.end()) { - MyHGC *myhgc = it->second; - return myhgc; - } - } - return NULL; -} -/** - * @brief Lookup or create a MySQL host group container by host group ID. - * - * This function looks up a MySQL host group container with the specified host group ID. If - * found, it returns a pointer to the existing container; otherwise, it creates a new container - * with the specified host group ID, adds it to the list of host groups, and returns a pointer - * to it. - * - * @param _hid The host group ID to lookup or create. - * @return A pointer to the found or newly created `MyHGC` instance. - * @note The function assertion fails if a newly created container is not found. - */ -MyHGC * MySQL_HostGroups_Manager::MyHGC_lookup(unsigned int _hid) { - MyHGC *myhgc=NULL; - myhgc=MyHGC_find(_hid); - if (myhgc==NULL) { - myhgc=MyHGC_create(_hid); - } else { - return myhgc; - } - assert(myhgc); - MyHostGroups->add(myhgc); - MyHostGroups_map.emplace(_hid,myhgc); - return myhgc; -} -#endif // 0 - void MySQL_HostGroups_Manager::increase_reset_counter() { wrlock(); status.myconnpoll_reset++;