diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index 617d1bf3c..30c11f7db 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -131,6 +131,16 @@ class AWS_Aurora_monitor_node { } }; +class AWS_RDS_topology_server { + public: + string addr; + int port; + unsigned int writer_hostgroup; + unordered_set hosts_in_topology; + + AWS_RDS_topology_server(const string &_str_a, int _p, int _whg); +}; + typedef struct _Galera_status_entry_t { unsigned long long start_time; unsigned long long check_time; @@ -469,7 +479,6 @@ class MySQL_Monitor { void add_topology_query_to_task(MySQL_Monitor_State_Data_Task_Type &task_type); bool is_aws_rds_topology_version_supported(const string& version); - private: std::vector *tables_defs_monitor; std::vector *tables_defs_monitor_internal; @@ -491,6 +500,7 @@ class MySQL_Monitor { SQLite3_result *Galera_Hosts_resultset; std::map AWS_Aurora_Hosts_Map; SQLite3_result *AWS_Aurora_Hosts_resultset; + std::map> AWS_RDS_Topology_Server_Map; uint64_t AWS_Aurora_Hosts_resultset_checksum; unsigned int num_threads; unsigned int aux_threads; diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 1dfb93f37..57b2ccf85 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -3472,15 +3472,16 @@ VALGRIND_ENABLE_ERROR_REPORTING; * @param reader_hostgroup Reader hostgroup to which we will add the discovered servers. */ void MySQL_Monitor::process_discovered_topology(const std::string& originating_server_hostname, const vector& discovered_servers, const MySQL_Monitor_State_Data* mmsd, int num_fields) { - // Check if the query result matches the query task type: exactly 3 for Multi-AZ DB Clusters, even number for blue/green deployment + // Check if the query needs to be changed because it matches a blue/green deployment: exactly 3 entries for Multi-AZ DB Clusters, even number for blue/green deployment if (rds_topology_check_type == AWS_RDS_TOPOLOGY_CHECK && discovered_servers.size() % 2 == 0) { - // With the AWS_RDS_TOPOLOGY_CHECK, we didn't get the role and status data, so we should retry on the next read_only check with the correct query + // With the AWS_RDS_TOPOLOGY_CHECK, we didn't get the role and status data, so we retry with the correct query on the next read_only check rds_topology_check_type = AWS_RDS_BLUE_GREEN_DEPLOYMENT_STATE_CHECK; topology_loop = mysql_thread___monitor_aws_rds_topology_discovery_interval; return; } else if ((rds_topology_check_type == AWS_RDS_TOPOLOGY_CHECK && discovered_servers.size() != 3) || (rds_topology_check_type == AWS_RDS_BLUE_GREEN_DEPLOYMENT_STATE_CHECK && discovered_servers.size() % 2 != 0)) { // Query result matches neither a Multi_AZ DB Cluster nor a Blue/Green deployment + // TODO: Account for topology metadata towards the end of a blue/green deployment switchover (possibly odd number of entries) rds_topology_check_type = AWS_RDS_TOPOLOGY_CHECK; // Set back to default rds_topology check proxy_debug(PROXY_DEBUG_MONITOR, 7, "Got a query result for the rds_topology metadata table but it matches neither Multi-AZ DB Clusters, nor a blue/green deployment. Number of records: %d\n", discovered_servers.size()); return; @@ -3494,83 +3495,73 @@ void MySQL_Monitor::process_discovered_topology(const std::string& originating_s uint32_t reader_hostgroup = (uint32_t)(mmsd->reader_hostgroup); - char *error = NULL; - int cols = 0; - int affected_rows = 0; - SQLite3_result *runtime_mysql_servers = NULL; + // Add the queried server or update its entry in the topology server map with the current timestamp + if (AWS_RDS_Topology_Server_Map.count(originating_server_hostname) == 0) { + auto queried_server = make_shared(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup); - char *query=(char *)"SELECT DISTINCT hostname FROM monitor_internal.mysql_servers ORDER BY hostname"; - proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); - monitordb->execute_statement(query, &error, &cols, &affected_rows, &runtime_mysql_servers); + AWS_RDS_Topology_Server_Map.insert({ originating_server_hostname, queried_server }); + } - if (error) { - proxy_error("Error on %s : %s\n", query, error); - } else { - unordered_set saved_hostnames; - saved_hostnames.insert(originating_server_hostname); - vector> new_servers; + vector> new_servers; - // Do a loop through the query results to save existing runtime server hostnames - for (std::vector::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) { - SQLite3_row *r1 = *it; - string current_runtime_hostname = r1->fields[0]; - saved_hostnames.insert(current_runtime_hostname); + // Loop through discovered servers and process the ones we haven't saved yet + for (MYSQL_ROW row : discovered_servers) { + if ( !row ) { + proxy_warning("Received empty RDS topology record from %s.\n", originating_server_hostname.c_str()); + continue; } - - // Loop through discovered servers and process the ones we haven't saved yet - for (MYSQL_ROW row : discovered_servers) { - if ( !row ) { - proxy_warning("Received empty RDS topology record from %s.\n", originating_server_hostname.c_str()); - continue; - } - int current_discovered_read_only = 1; + int current_discovered_read_only = 1; VALGRIND_DISABLE_ERROR_REPORTING; - if (row[0]) { - if (!strcmp(row[0], "0") || !strcasecmp(row[0], "OFF")) - current_discovered_read_only = 0; - } + if (row[0]) { + if (!strcmp(row[0], "0") || !strcasecmp(row[0], "OFF")) + current_discovered_read_only = 0; + } VALGRIND_ENABLE_ERROR_REPORTING; - string current_discovered_hostname = row[2]; - string current_discovered_port_string = row[3]; - uint16_t current_discovered_port; - try { - current_discovered_port = (uint16_t)stoi(current_discovered_port_string); - } catch (...) { - proxy_error( - "Unable to parse port value coming from '%s' during topology discovery ('%s':%s). Terminating discovery early.\n", - originating_server_hostname.c_str(), current_discovered_hostname.c_str(), current_discovered_port_string.c_str() - ); - return; - } - string current_discovered_role, current_discovered_status, current_discovered_version; - if (rds_topology_check_type == AWS_RDS_BLUE_GREEN_DEPLOYMENT_STATE_CHECK && num_fields >= 7) { - current_discovered_role = row[4]; - current_discovered_status = row[5]; - current_discovered_version = row[6]; - } - if (!current_discovered_version.empty() && !is_aws_rds_topology_version_supported(current_discovered_version)) { - proxy_warning("Discovered topology version (%s) is not compatible with supported version (%s)\n", - current_discovered_version.c_str(), SUPPORTED_AWS_RDS_TOPOLOGY_VERSION); - return; - } + string current_discovered_hostname = row[2]; + string current_discovered_port_string = row[3]; + uint16_t current_discovered_port; + try { + current_discovered_port = (uint16_t)stoi(current_discovered_port_string); + } catch (...) { + proxy_error( + "Unable to parse port value coming from '%s' during topology discovery ('%s':%s). Terminating discovery early.\n", + originating_server_hostname.c_str(), current_discovered_hostname.c_str(), current_discovered_port_string.c_str() + ); + return; + } + string current_discovered_role, current_discovered_status, current_discovered_version; + if (rds_topology_check_type == AWS_RDS_BLUE_GREEN_DEPLOYMENT_STATE_CHECK && num_fields >= 7) { + current_discovered_role = row[4]; + current_discovered_status = row[5]; + current_discovered_version = row[6]; + } + if (!current_discovered_version.empty() && !is_aws_rds_topology_version_supported(current_discovered_version)) { + proxy_warning("Discovered topology version (%s) is not compatible with supported version (%s)\n", + current_discovered_version.c_str(), SUPPORTED_AWS_RDS_TOPOLOGY_VERSION); + return; + } - int64_t current_determined_weight = (int64_t)(-1L); // TODO: Add logic for selecting a different weight based on discovered role and status - int32_t use_ssl = 0; - if (mmsd->use_ssl) { - use_ssl = 1; - } - tuple discovered_server(current_discovered_hostname, current_discovered_port, reader_hostgroup, current_determined_weight, use_ssl); - if (!saved_hostnames.count(current_discovered_hostname)) { - // Server isn't in either hostgroup yet, adding as reader - proxy_info("%d: Adding new host '%s' to new server list in hostgroup [%ld].\n", __LINE__, std::get<0>(discovered_server).c_str(), std::get<2>(discovered_server)); - new_servers.push_back(discovered_server); - } + int64_t current_determined_weight = (int64_t)(-1L); // TODO: Add logic for selecting a different weight based on discovered role and status + int32_t use_ssl = 0; + if (mmsd->use_ssl) { + use_ssl = 1; } - // Add the new servers if any - if (!new_servers.empty() && (rds_topology_check_type != AWS_RDS_TOPOLOGY_CHECK || is_aws_rds_multi_az_db_cluster_topology(originating_server_hostname, new_servers))) { - MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(new_servers); + tuple discovered_server(current_discovered_hostname, current_discovered_port, reader_hostgroup, current_determined_weight, use_ssl); + if (!AWS_RDS_Topology_Server_Map.count(current_discovered_hostname)) { // TODO: update to also check for updated fields + // Server isn't in either hostgroup yet, adding as reader + proxy_info("%d: Adding new host '%s' to new server list in hostgroup [%ld].\n", __LINE__, std::get<0>(discovered_server).c_str(), std::get<2>(discovered_server)); + new_servers.push_back(discovered_server); + auto new_rds_topology_server = make_shared(current_discovered_hostname, current_discovered_port, reader_hostgroup); + AWS_RDS_Topology_Server_Map.insert({ current_discovered_hostname, new_rds_topology_server }); } + AWS_RDS_Topology_Server_Map[originating_server_hostname]->hosts_in_topology.insert(current_discovered_hostname); + // TODO: Add logic to remove hosts if they disappear from metadata + } + + // Add the new servers if any. The AWS_RDS_TOPOLOGY_CHECK is currently meant to only be used with RDS Multi-AZ DB clusters + if (!new_servers.empty() && (rds_topology_check_type != AWS_RDS_TOPOLOGY_CHECK || is_aws_rds_multi_az_db_cluster_topology(originating_server_hostname, new_servers))) { + MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(new_servers); } } @@ -6143,6 +6134,7 @@ bool AWS_Aurora_monitor_node::add_entry(AWS_Aurora_status_entry *ase) { return ret; // for now ignored } +AWS_RDS_topology_server::AWS_RDS_topology_server(const string &_str_a, int _p, int _whg) : addr(_str_a), port(_p), writer_hostgroup(_whg) {} typedef struct _host_def_t { char *host;