Modified process_discovered_topology to use an in-memory list of discovered hosts

pull/4992/head
Niels Vogell 9 months ago
parent 80e100a153
commit cee3185e2a

@ -131,6 +131,16 @@ class AWS_Aurora_monitor_node {
}
};
class AWS_RDS_topology_server {
public:
string addr;
int port;
unsigned int writer_hostgroup;
unordered_set<string> hosts_in_topology;
AWS_RDS_topology_server(const string &_str_a, int _p, int _whg);
};
typedef struct _Galera_status_entry_t {
unsigned long long start_time;
unsigned long long check_time;
@ -469,7 +479,6 @@ class MySQL_Monitor {
void add_topology_query_to_task(MySQL_Monitor_State_Data_Task_Type &task_type);
bool is_aws_rds_topology_version_supported(const string& version);
private:
std::vector<table_def_t *> *tables_defs_monitor;
std::vector<table_def_t *> *tables_defs_monitor_internal;
@ -491,6 +500,7 @@ class MySQL_Monitor {
SQLite3_result *Galera_Hosts_resultset;
std::map<std::string, AWS_Aurora_monitor_node *> AWS_Aurora_Hosts_Map;
SQLite3_result *AWS_Aurora_Hosts_resultset;
std::map<std::string, shared_ptr<AWS_RDS_topology_server>> AWS_RDS_Topology_Server_Map;
uint64_t AWS_Aurora_Hosts_resultset_checksum;
unsigned int num_threads;
unsigned int aux_threads;

@ -3472,15 +3472,16 @@ VALGRIND_ENABLE_ERROR_REPORTING;
* @param reader_hostgroup Reader hostgroup to which we will add the discovered servers.
*/
void MySQL_Monitor::process_discovered_topology(const std::string& originating_server_hostname, const vector<MYSQL_ROW>& discovered_servers, const MySQL_Monitor_State_Data* mmsd, int num_fields) {
// Check if the query result matches the query task type: exactly 3 for Multi-AZ DB Clusters, even number for blue/green deployment
// Check if the query needs to be changed because it matches a blue/green deployment: exactly 3 entries for Multi-AZ DB Clusters, even number for blue/green deployment
if (rds_topology_check_type == AWS_RDS_TOPOLOGY_CHECK && discovered_servers.size() % 2 == 0) {
// With the AWS_RDS_TOPOLOGY_CHECK, we didn't get the role and status data, so we should retry on the next read_only check with the correct query
// With the AWS_RDS_TOPOLOGY_CHECK, we didn't get the role and status data, so we retry with the correct query on the next read_only check
rds_topology_check_type = AWS_RDS_BLUE_GREEN_DEPLOYMENT_STATE_CHECK;
topology_loop = mysql_thread___monitor_aws_rds_topology_discovery_interval;
return;
} else if ((rds_topology_check_type == AWS_RDS_TOPOLOGY_CHECK && discovered_servers.size() != 3)
|| (rds_topology_check_type == AWS_RDS_BLUE_GREEN_DEPLOYMENT_STATE_CHECK && discovered_servers.size() % 2 != 0)) {
// Query result matches neither a Multi_AZ DB Cluster nor a Blue/Green deployment
// TODO: Account for topology metadata towards the end of a blue/green deployment switchover (possibly odd number of entries)
rds_topology_check_type = AWS_RDS_TOPOLOGY_CHECK; // Set back to default rds_topology check
proxy_debug(PROXY_DEBUG_MONITOR, 7, "Got a query result for the rds_topology metadata table but it matches neither Multi-AZ DB Clusters, nor a blue/green deployment. Number of records: %d\n", discovered_servers.size());
return;
@ -3494,83 +3495,73 @@ void MySQL_Monitor::process_discovered_topology(const std::string& originating_s
uint32_t reader_hostgroup = (uint32_t)(mmsd->reader_hostgroup);
char *error = NULL;
int cols = 0;
int affected_rows = 0;
SQLite3_result *runtime_mysql_servers = NULL;
// Add the queried server or update its entry in the topology server map with the current timestamp
if (AWS_RDS_Topology_Server_Map.count(originating_server_hostname) == 0) {
auto queried_server = make_shared<AWS_RDS_topology_server>(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup);
char *query=(char *)"SELECT DISTINCT hostname FROM monitor_internal.mysql_servers ORDER BY hostname";
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
monitordb->execute_statement(query, &error, &cols, &affected_rows, &runtime_mysql_servers);
AWS_RDS_Topology_Server_Map.insert({ originating_server_hostname, queried_server });
}
if (error) {
proxy_error("Error on %s : %s\n", query, error);
} else {
unordered_set<string> saved_hostnames;
saved_hostnames.insert(originating_server_hostname);
vector<tuple<string, uint16_t, uint32_t, int64_t, int32_t>> new_servers;
vector<tuple<string, uint16_t, uint32_t, int64_t, int32_t>> new_servers;
// Do a loop through the query results to save existing runtime server hostnames
for (std::vector<SQLite3_row *>::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) {
SQLite3_row *r1 = *it;
string current_runtime_hostname = r1->fields[0];
saved_hostnames.insert(current_runtime_hostname);
// Loop through discovered servers and process the ones we haven't saved yet
for (MYSQL_ROW row : discovered_servers) {
if ( !row ) {
proxy_warning("Received empty RDS topology record from %s.\n", originating_server_hostname.c_str());
continue;
}
// Loop through discovered servers and process the ones we haven't saved yet
for (MYSQL_ROW row : discovered_servers) {
if ( !row ) {
proxy_warning("Received empty RDS topology record from %s.\n", originating_server_hostname.c_str());
continue;
}
int current_discovered_read_only = 1;
int current_discovered_read_only = 1;
VALGRIND_DISABLE_ERROR_REPORTING;
if (row[0]) {
if (!strcmp(row[0], "0") || !strcasecmp(row[0], "OFF"))
current_discovered_read_only = 0;
}
if (row[0]) {
if (!strcmp(row[0], "0") || !strcasecmp(row[0], "OFF"))
current_discovered_read_only = 0;
}
VALGRIND_ENABLE_ERROR_REPORTING;
string current_discovered_hostname = row[2];
string current_discovered_port_string = row[3];
uint16_t current_discovered_port;
try {
current_discovered_port = (uint16_t)stoi(current_discovered_port_string);
} catch (...) {
proxy_error(
"Unable to parse port value coming from '%s' during topology discovery ('%s':%s). Terminating discovery early.\n",
originating_server_hostname.c_str(), current_discovered_hostname.c_str(), current_discovered_port_string.c_str()
);
return;
}
string current_discovered_role, current_discovered_status, current_discovered_version;
if (rds_topology_check_type == AWS_RDS_BLUE_GREEN_DEPLOYMENT_STATE_CHECK && num_fields >= 7) {
current_discovered_role = row[4];
current_discovered_status = row[5];
current_discovered_version = row[6];
}
if (!current_discovered_version.empty() && !is_aws_rds_topology_version_supported(current_discovered_version)) {
proxy_warning("Discovered topology version (%s) is not compatible with supported version (%s)\n",
current_discovered_version.c_str(), SUPPORTED_AWS_RDS_TOPOLOGY_VERSION);
return;
}
string current_discovered_hostname = row[2];
string current_discovered_port_string = row[3];
uint16_t current_discovered_port;
try {
current_discovered_port = (uint16_t)stoi(current_discovered_port_string);
} catch (...) {
proxy_error(
"Unable to parse port value coming from '%s' during topology discovery ('%s':%s). Terminating discovery early.\n",
originating_server_hostname.c_str(), current_discovered_hostname.c_str(), current_discovered_port_string.c_str()
);
return;
}
string current_discovered_role, current_discovered_status, current_discovered_version;
if (rds_topology_check_type == AWS_RDS_BLUE_GREEN_DEPLOYMENT_STATE_CHECK && num_fields >= 7) {
current_discovered_role = row[4];
current_discovered_status = row[5];
current_discovered_version = row[6];
}
if (!current_discovered_version.empty() && !is_aws_rds_topology_version_supported(current_discovered_version)) {
proxy_warning("Discovered topology version (%s) is not compatible with supported version (%s)\n",
current_discovered_version.c_str(), SUPPORTED_AWS_RDS_TOPOLOGY_VERSION);
return;
}
int64_t current_determined_weight = (int64_t)(-1L); // TODO: Add logic for selecting a different weight based on discovered role and status
int32_t use_ssl = 0;
if (mmsd->use_ssl) {
use_ssl = 1;
}
tuple<string, uint16_t, uint32_t, int64_t, int32_t> discovered_server(current_discovered_hostname, current_discovered_port, reader_hostgroup, current_determined_weight, use_ssl);
if (!saved_hostnames.count(current_discovered_hostname)) {
// Server isn't in either hostgroup yet, adding as reader
proxy_info("%d: Adding new host '%s' to new server list in hostgroup [%ld].\n", __LINE__, std::get<0>(discovered_server).c_str(), std::get<2>(discovered_server));
new_servers.push_back(discovered_server);
}
int64_t current_determined_weight = (int64_t)(-1L); // TODO: Add logic for selecting a different weight based on discovered role and status
int32_t use_ssl = 0;
if (mmsd->use_ssl) {
use_ssl = 1;
}
// Add the new servers if any
if (!new_servers.empty() && (rds_topology_check_type != AWS_RDS_TOPOLOGY_CHECK || is_aws_rds_multi_az_db_cluster_topology(originating_server_hostname, new_servers))) {
MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(new_servers);
tuple<string, uint16_t, uint32_t, int64_t, int32_t> discovered_server(current_discovered_hostname, current_discovered_port, reader_hostgroup, current_determined_weight, use_ssl);
if (!AWS_RDS_Topology_Server_Map.count(current_discovered_hostname)) { // TODO: update to also check for updated fields
// Server isn't in either hostgroup yet, adding as reader
proxy_info("%d: Adding new host '%s' to new server list in hostgroup [%ld].\n", __LINE__, std::get<0>(discovered_server).c_str(), std::get<2>(discovered_server));
new_servers.push_back(discovered_server);
auto new_rds_topology_server = make_shared<AWS_RDS_topology_server>(current_discovered_hostname, current_discovered_port, reader_hostgroup);
AWS_RDS_Topology_Server_Map.insert({ current_discovered_hostname, new_rds_topology_server });
}
AWS_RDS_Topology_Server_Map[originating_server_hostname]->hosts_in_topology.insert(current_discovered_hostname);
// TODO: Add logic to remove hosts if they disappear from metadata
}
// Add the new servers if any. The AWS_RDS_TOPOLOGY_CHECK is currently meant to only be used with RDS Multi-AZ DB clusters
if (!new_servers.empty() && (rds_topology_check_type != AWS_RDS_TOPOLOGY_CHECK || is_aws_rds_multi_az_db_cluster_topology(originating_server_hostname, new_servers))) {
MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(new_servers);
}
}
@ -6143,6 +6134,7 @@ bool AWS_Aurora_monitor_node::add_entry(AWS_Aurora_status_entry *ase) {
return ret; // for now ignored
}
AWS_RDS_topology_server::AWS_RDS_topology_server(const string &_str_a, int _p, int _whg) : addr(_str_a), port(_p), writer_hostgroup(_whg) {}
typedef struct _host_def_t {
char *host;

Loading…
Cancel
Save