Niels Arke 2 weeks ago committed by GitHub
commit be52229891
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1087,7 +1087,7 @@ class MySQL_HostGroups_Manager : public Base_HostGroups_Manager<MyHGC> {
void set_Readyset_status(char *hostname, int port, enum MySerStatus status);
unsigned long long Get_Memory_Stats();
void add_discovered_servers_to_mysql_servers_and_replication_hostgroups(const vector<tuple<string, int, int>>& new_servers);
void add_discovered_servers_to_mysql_servers_and_replication_hostgroups(const vector<tuple<string, uint16_t, uint32_t, int64_t, int32_t>>& new_servers);
void update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error);
void update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error);

@ -59,8 +59,12 @@ struct cmp_str {
#define N_L_ASE 16
#define AWS_ENDPOINT_SUFFIX_STRING "rds.amazonaws.com"
#define QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY "SELECT @@global.read_only read_only, id, endpoint, port from mysql.rds_topology"
#define QUERY_READ_ONLY_AND_AWS_RDS_TOPOLOGY_DISCOVERY "SELECT @@global.read_only read_only, id, endpoint, port from mysql.rds_topology"
#define QUERY_INNODB_READ_ONLY_AND_AWS_RDS_TOPOLOGY_DISCOVERY "SELECT @@global.innodb_read_only read_only, id, endpoint, port from mysql.rds_topology"
#define QUERY_READ_ONLY_AND_AWS_BLUE_GREEN_TOPOLOGY_DISCOVERY "SELECT @@global.read_only AS read_only, id, endpoint, port, role, status, version FROM mysql.rds_topology"
#define QUERY_INNODB_READ_ONLY_AND_AWS_BLUE_GREEN_TOPOLOGY_DISCOVERY "SELECT @@global.innodb_read_only AS read_only, id, endpoint, port, role, status, version FROM mysql.rds_topology"
#define SUPPORTED_AWS_RDS_TOPOLOGY_VERSION "1.0"
/*
Implementation of monitoring in AWS Aurora will be different than previous modules
@ -127,6 +131,16 @@ class AWS_Aurora_monitor_node {
}
};
class AWS_RDS_topology_server {
public:
string addr;
int port;
unsigned int writer_hostgroup;
unordered_set<string> hosts_in_topology;
AWS_RDS_topology_server(const string &_str_a, int _p, int _whg);
};
typedef struct _Galera_status_entry_t {
unsigned long long start_time;
unsigned long long check_time;
@ -203,7 +217,16 @@ enum MySQL_Monitor_State_Data_Task_Type {
MON_REPLICATION_LAG,
MON_GALERA,
MON_AWS_AURORA,
MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY
MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY,
MON_INNODB_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY,
MON_READ_ONLY__AND__AWS_RDS_BLUE_GREEN_TOPOLOGY_DISCOVERY,
MON_INNODB_READ_ONLY__AND__AWS_RDS_BLUE_GREEN_TOPOLOGY_DISCOVERY,
};
enum MySQL_Monitor_Aws_Metadata_Check {
AWS_RDS_TOPOLOGY_CHECK,
AWS_RDS_BLUE_GREEN_DEPLOYMENT_STATE_CHECK,
NONE
};
enum class MySQL_Monitor_State_Data_Task_Result {
@ -451,7 +474,6 @@ struct DNS_Resolve_Data {
unsigned int refresh_intv = 0;
};
class MySQL_Monitor {
public:
static std::string dns_lookup(const std::string& hostname, bool return_hostname_if_lookup_fails = true, size_t* ip_count = NULL);
@ -459,8 +481,12 @@ class MySQL_Monitor {
static bool update_dns_cache_from_mysql_conn(const MYSQL* mysql);
static void trigger_dns_cache_update();
void process_discovered_topology(const std::string& originating_server_hostname, const vector<MYSQL_ROW>& discovered_servers, int reader_hostgroup);
bool is_aws_rds_multi_az_db_cluster_topology(const std::vector<MYSQL_ROW>& discovered_servers);
void process_discovered_topology(const std::string& originating_server_hostname, const vector<MYSQL_ROW>& discovered_servers, const MySQL_Monitor_State_Data* mmsd, int num_fields);
bool is_aws_rds_multi_az_db_cluster_topology(const string& originating_server_hostname, const vector<tuple<string, uint16_t, uint32_t, int64_t, int32_t>>& discovered_servers);
bool is_aws_rds_topology_query_task(const MySQL_Monitor_State_Data_Task_Type& task_type);
bool mysql_row_matches_query_task(const unordered_set<string> &field_names, const MySQL_Monitor_State_Data_Task_Type &task_type);
void add_topology_query_to_task(MySQL_Monitor_State_Data_Task_Type &task_type);
bool is_aws_rds_topology_version_supported(const string& version);
private:
std::vector<table_def_t *> *tables_defs_monitor;
@ -483,6 +509,7 @@ class MySQL_Monitor {
SQLite3_result *Galera_Hosts_resultset;
std::map<std::string, AWS_Aurora_monitor_node *> AWS_Aurora_Hosts_Map;
SQLite3_result *AWS_Aurora_Hosts_resultset;
std::map<std::string, shared_ptr<AWS_RDS_topology_server>> AWS_RDS_Topology_Server_Map;
uint64_t AWS_Aurora_Hosts_resultset_checksum;
unsigned int num_threads;
unsigned int aux_threads;
@ -510,6 +537,8 @@ class MySQL_Monitor {
bool shutdown;
pthread_mutex_t mon_en_mutex;
bool monitor_enabled;
MySQL_Monitor_Aws_Metadata_Check rds_topology_check_type = MySQL_Monitor_Aws_Metadata_Check::AWS_RDS_TOPOLOGY_CHECK;
int topology_loop = 0;
SQLite3DB *admindb; // internal database
SQLite3DB *monitordb; // internal database
SQLite3DB *monitor_internal_db; // internal database
@ -572,7 +601,7 @@ private:
* Note: Calling init_async is mandatory before executing tasks asynchronously.
*/
void monitor_ping_async(SQLite3_result* resultset);
void monitor_read_only_async(SQLite3_result* resultset, bool do_discovery_check);
void monitor_read_only_async(SQLite3_result* resultset);
void monitor_replication_lag_async(SQLite3_result* resultset);
void monitor_group_replication_async();
void monitor_galera_async();

@ -7026,7 +7026,7 @@ MySQLServers_SslParams * MySQL_HostGroups_Manager::get_Server_SSL_Params(char *h
* @param new_servers A vector of tuples where each tuple contains the values needed to add each new server.
*/
void MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups(
const vector<tuple<string, int, int>>& new_servers
const vector<tuple<string, uint16_t, uint32_t, int64_t, int32_t>>& new_servers
) {
int added_new_server = -1;
@ -7034,15 +7034,20 @@ void MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_repli
wrlock();
// Add the discovered server with default values
for (const tuple<string, int, int>& s : new_servers) {
for (const tuple<string, uint16_t, uint32_t, int64_t, int32_t>& s : new_servers) {
string host = std::get<0>(s);
uint16_t port = std::get<1>(s);
long int hostgroup_id = std::get<2>(s);
srv_info_t srv_info { host.c_str(), port, "AWS RDS" };
srv_opts_t srv_opts { -1, -1, -1 };
uint32_t hostgroup_id = std::get<2>(s);
int64_t weight = std::get<3>(s);
int32_t use_ssl = std::get<4>(s);
srv_info_t srv_info { host.c_str(), (uint16_t)port, "AWS RDS" };
srv_opts_t srv_opts { weight, -1, use_ssl };
added_new_server = create_new_server_in_hg(hostgroup_id, srv_info, srv_opts);
int res = create_new_server_in_hg(hostgroup_id, srv_info, srv_opts);
if (added_new_server < 0) {
added_new_server = res;
}
}
// If servers were added, perform necessary updates to internal structures

@ -671,7 +671,25 @@ void MySQL_Monitor_State_Data::init_async() {
task_handler_ = &MySQL_Monitor_State_Data::read_only_handler;
break;
case MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY:
query_ = QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY;
query_ = QUERY_READ_ONLY_AND_AWS_RDS_TOPOLOGY_DISCOVERY;
async_state_machine_ = ASYNC_QUERY_START;
task_timeout_ = mysql_thread___monitor_read_only_timeout;
task_handler_ = &MySQL_Monitor_State_Data::read_only_handler;
break;
case MON_INNODB_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY:
query_ = QUERY_INNODB_READ_ONLY_AND_AWS_RDS_TOPOLOGY_DISCOVERY;
async_state_machine_ = ASYNC_QUERY_START;
task_timeout_ = mysql_thread___monitor_read_only_timeout;
task_handler_ = &MySQL_Monitor_State_Data::read_only_handler;
break;
case MON_READ_ONLY__AND__AWS_RDS_BLUE_GREEN_TOPOLOGY_DISCOVERY:
query_ = QUERY_READ_ONLY_AND_AWS_BLUE_GREEN_TOPOLOGY_DISCOVERY;
async_state_machine_ = ASYNC_QUERY_START;
task_timeout_ = mysql_thread___monitor_read_only_timeout;
task_handler_ = &MySQL_Monitor_State_Data::read_only_handler;
break;
case MON_INNODB_READ_ONLY__AND__AWS_RDS_BLUE_GREEN_TOPOLOGY_DISCOVERY:
query_ = QUERY_INNODB_READ_ONLY_AND_AWS_BLUE_GREEN_TOPOLOGY_DISCOVERY;
async_state_machine_ = ASYNC_QUERY_START;
task_timeout_ = mysql_thread___monitor_read_only_timeout;
task_handler_ = &MySQL_Monitor_State_Data::read_only_handler;
@ -1676,7 +1694,13 @@ void * monitor_read_only_thread(const std::vector<MySQL_Monitor_State_Data*>& da
} else if (mmsd->get_task_type() == MON_READ_ONLY__OR__INNODB_READ_ONLY) {
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT @@global.read_only|@@global.innodb_read_only read_only");
} else if (mmsd->get_task_type() == MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY) {
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql, QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY);
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql, QUERY_READ_ONLY_AND_AWS_RDS_TOPOLOGY_DISCOVERY);
} else if (mmsd->get_task_type() == MON_INNODB_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY) {
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql, QUERY_INNODB_READ_ONLY_AND_AWS_RDS_TOPOLOGY_DISCOVERY);
} else if (mmsd->get_task_type() == MON_READ_ONLY__AND__AWS_RDS_BLUE_GREEN_TOPOLOGY_DISCOVERY) {
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql, QUERY_READ_ONLY_AND_AWS_BLUE_GREEN_TOPOLOGY_DISCOVERY);
} else if (mmsd->get_task_type() == MON_INNODB_READ_ONLY__AND__AWS_RDS_BLUE_GREEN_TOPOLOGY_DISCOVERY) {
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql, QUERY_INNODB_READ_ONLY_AND_AWS_BLUE_GREEN_TOPOLOGY_DISCOVERY);
} else { // default
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT @@global.read_only read_only");
}
@ -1770,7 +1794,7 @@ __exit_monitor_read_only_thread:
int j=-1;
num_fields = mysql_num_fields(mmsd->result);
fields = mysql_fetch_fields(mmsd->result);
if (fields && num_fields == 1) {
if (fields && num_fields >= 1) {
for(k = 0; k < num_fields; k++) {
if (strcmp((char *)"read_only", (char *)fields[k].name)==0) {
j=k;
@ -3361,58 +3385,97 @@ VALGRIND_ENABLE_ERROR_REPORTING;
* @param discovered_servers A vector of servers discovered when querying the cluster's topology.
* @param reader_hostgroup Reader hostgroup to which we will add the discovered servers.
*/
void MySQL_Monitor::process_discovered_topology(const std::string& originating_server_hostname, const vector<MYSQL_ROW>& discovered_servers, int reader_hostgroup) {
char *error = NULL;
int cols = 0;
int affected_rows = 0;
SQLite3_result *runtime_mysql_servers = NULL;
void MySQL_Monitor::process_discovered_topology(const std::string& originating_server_hostname, const vector<MYSQL_ROW>& discovered_servers, const MySQL_Monitor_State_Data* mmsd, int num_fields) {
// Check if the query needs to be changed because it matches a blue/green deployment: exactly 3 entries for Multi-AZ DB Clusters, even number for blue/green deployment
if (rds_topology_check_type == AWS_RDS_TOPOLOGY_CHECK && discovered_servers.size() % 2 == 0) {
// With the AWS_RDS_TOPOLOGY_CHECK, we didn't get the role and status data, so we retry with the correct query on the next read_only check
rds_topology_check_type = AWS_RDS_BLUE_GREEN_DEPLOYMENT_STATE_CHECK;
topology_loop = mysql_thread___monitor_aws_rds_topology_discovery_interval;
return;
} else if ((rds_topology_check_type == AWS_RDS_TOPOLOGY_CHECK && discovered_servers.size() != 3)
|| (rds_topology_check_type == AWS_RDS_BLUE_GREEN_DEPLOYMENT_STATE_CHECK && discovered_servers.size() % 2 != 0)) {
// Query result matches neither a Multi_AZ DB Cluster nor a Blue/Green deployment
// TODO: Account for topology metadata towards the end of a blue/green deployment switchover (possibly odd number of entries)
rds_topology_check_type = AWS_RDS_TOPOLOGY_CHECK; // Set back to default rds_topology check
proxy_debug(PROXY_DEBUG_MONITOR, 7, "Got a query result for the rds_topology metadata table but it matches neither Multi-AZ DB Clusters, nor a blue/green deployment. Number of records: %d\n", discovered_servers.size());
return;
}
char *query=(char *)"SELECT DISTINCT hostname FROM monitor_internal.mysql_servers ORDER BY hostname";
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
monitordb->execute_statement(query, &error, &cols, &affected_rows, &runtime_mysql_servers);
if (error) {
proxy_error("Error on %s : %s\n", query, error);
} else {
vector<tuple<string, int, int>> new_servers;
vector<string> saved_hostnames;
saved_hostnames.push_back(originating_server_hostname);
if (num_fields < 4) {
proxy_error("Received row with too few fields. num_field = %d\n", num_fields);
return;
}
// Do an initial loop through the query results to save existing runtime server hostnames
for (std::vector<SQLite3_row *>::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) {
SQLite3_row *r1 = *it;
string current_runtime_hostname = r1->fields[0];
uint32_t reader_hostgroup = (uint32_t)(mmsd->reader_hostgroup);
saved_hostnames.push_back(current_runtime_hostname);
}
// Add the queried server or update its entry in the topology server map with the current timestamp
if (AWS_RDS_Topology_Server_Map.count(originating_server_hostname) == 0) {
auto queried_server = make_shared<AWS_RDS_topology_server>(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup);
// Loop through discovered servers and process the ones we haven't saved yet
for (MYSQL_ROW s : discovered_servers) {
string current_discovered_hostname = s[2];
string current_discovered_port_string = s[3];
int current_discovered_port_int;
AWS_RDS_Topology_Server_Map.insert({ originating_server_hostname, queried_server });
}
try {
current_discovered_port_int = stoi(s[3]);
} catch (...) {
proxy_error(
"Unable to parse port value coming from '%s' during topology discovery ('%s':%s). Terminating discovery early.\n",
originating_server_hostname.c_str(), current_discovered_hostname.c_str(), current_discovered_port_string.c_str()
);
return;
}
vector<tuple<string, uint16_t, uint32_t, int64_t, int32_t>> new_servers;
if (find(saved_hostnames.begin(), saved_hostnames.end(), current_discovered_hostname) == saved_hostnames.end()) {
tuple<string, int, int> new_server(current_discovered_hostname, current_discovered_port_int, reader_hostgroup);
new_servers.push_back(new_server);
saved_hostnames.push_back(current_discovered_hostname);
}
// Loop through discovered servers and process the ones we haven't saved yet
for (MYSQL_ROW row : discovered_servers) {
if ( !row ) {
proxy_warning("Received empty RDS topology record from %s.\n", originating_server_hostname.c_str());
continue;
}
int current_discovered_read_only = 1;
VALGRIND_DISABLE_ERROR_REPORTING;
if (row[0]) {
if (!strcmp(row[0], "0") || !strcasecmp(row[0], "OFF"))
current_discovered_read_only = 0;
}
VALGRIND_ENABLE_ERROR_REPORTING;
string current_discovered_hostname = row[2];
string current_discovered_port_string = row[3];
uint16_t current_discovered_port;
try {
current_discovered_port = (uint16_t)stoi(current_discovered_port_string);
} catch (...) {
proxy_error(
"Unable to parse port value coming from '%s' during topology discovery ('%s':%s). Terminating discovery early.\n",
originating_server_hostname.c_str(), current_discovered_hostname.c_str(), current_discovered_port_string.c_str()
);
return;
}
string current_discovered_role, current_discovered_status, current_discovered_version;
if (rds_topology_check_type == AWS_RDS_BLUE_GREEN_DEPLOYMENT_STATE_CHECK && num_fields >= 7) {
current_discovered_role = row[4];
current_discovered_status = row[5];
current_discovered_version = row[6];
}
if (!current_discovered_version.empty() && !is_aws_rds_topology_version_supported(current_discovered_version)) {
proxy_warning("Discovered topology version (%s) is not compatible with supported version (%s)\n",
current_discovered_version.c_str(), SUPPORTED_AWS_RDS_TOPOLOGY_VERSION);
return;
}
int64_t current_determined_weight = (int64_t)(-1L); // TODO: Add logic for selecting a different weight based on discovered role and status
int32_t use_ssl = 0;
if (mmsd->use_ssl) {
use_ssl = 1;
}
// Add the new servers if any
if (!new_servers.empty()) {
MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(new_servers);
tuple<string, uint16_t, uint32_t, int64_t, int32_t> discovered_server(current_discovered_hostname, current_discovered_port, reader_hostgroup, current_determined_weight, use_ssl);
if (!AWS_RDS_Topology_Server_Map.count(current_discovered_hostname)) { // TODO: update to also check for updated fields
// Server isn't in either hostgroup yet, adding as reader
proxy_info("%d: Adding new host '%s' to new server list in hostgroup [%ld].\n", __LINE__, std::get<0>(discovered_server).c_str(), std::get<2>(discovered_server));
new_servers.push_back(discovered_server);
auto new_rds_topology_server = make_shared<AWS_RDS_topology_server>(current_discovered_hostname, current_discovered_port, reader_hostgroup);
AWS_RDS_Topology_Server_Map.insert({ current_discovered_hostname, new_rds_topology_server });
}
AWS_RDS_Topology_Server_Map[originating_server_hostname]->hosts_in_topology.insert(current_discovered_hostname);
// TODO: Add logic to remove hosts if they disappear from metadata
}
// Add the new servers if any. The AWS_RDS_TOPOLOGY_CHECK is currently meant to only be used with RDS Multi-AZ DB clusters
if (!new_servers.empty() && (rds_topology_check_type != AWS_RDS_TOPOLOGY_CHECK || is_aws_rds_multi_az_db_cluster_topology(originating_server_hostname, new_servers))) {
MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(new_servers);
}
}
@ -3422,27 +3485,65 @@ void MySQL_Monitor::process_discovered_topology(const std::string& originating_s
* @param discovered_servers A vector of servers discovered when querying the cluster's topology.
* @return Returns 'true' if all conditions are met and 'false' otherwise.
*/
bool MySQL_Monitor::is_aws_rds_multi_az_db_cluster_topology(const std::vector<MYSQL_ROW>& discovered_servers) {
if (discovered_servers.size() != 3) {
bool MySQL_Monitor::is_aws_rds_multi_az_db_cluster_topology(const string& originating_servername, const std::vector<tuple<string, uint16_t, uint32_t, int64_t, int32_t>>& discovered_servers) {
if (discovered_servers.size() != 2) {
return false;
}
const std::vector<std::string> instance_names = {"-instance-1", "-instance-2", "-instance-3"};
int identified_hosts = 0;
for (const std::string& instance_str : instance_names) {
for (MYSQL_ROW server : discovered_servers) {
if (server[2] == NULL || (server[2][0] == '\0')) {
continue;
}
vector<string> hostnames(1, originating_servername);
for (tuple<string, uint16_t, uint32_t, int64_t, int32_t> server : discovered_servers) {
string hostname = std::get<0>(server);
if (hostname.empty()) {
continue;
}
hostnames.push_back(hostname);
}
std::string current_discovered_hostname = server[2];
if (current_discovered_hostname.find(instance_str) != std::string::npos) {
++identified_hosts;
break;
}
const unordered_set<string> expected_instance_names = {"-instance-1", "-instance-2", "-instance-3"};
unordered_set<string> discovered_instance_names;
for( string hostname : hostnames) {
size_t domain_start = hostname.find('.');
if (domain_start != string::npos && domain_start > 11) {
string prospect_instance_suffix = hostname.substr(domain_start - 11, 11);
discovered_instance_names.insert(prospect_instance_suffix);
}
}
return (identified_hosts == 3);
return (expected_instance_names == discovered_instance_names);
}
bool MySQL_Monitor::is_aws_rds_topology_query_task(const MySQL_Monitor_State_Data_Task_Type &task_type) {
return task_type == MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY
|| task_type == MON_INNODB_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY
|| task_type == MON_READ_ONLY__AND__AWS_RDS_BLUE_GREEN_TOPOLOGY_DISCOVERY
|| task_type == MON_INNODB_READ_ONLY__AND__AWS_RDS_BLUE_GREEN_TOPOLOGY_DISCOVERY;
}
bool MySQL_Monitor::mysql_row_matches_query_task(const unordered_set<string> &field_names, const MySQL_Monitor_State_Data_Task_Type &task_type)
{
if (task_type == MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY || task_type == MON_INNODB_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY) {
return field_names == unordered_set<string>({ "read_only", "id", "endpoint", "port"});
}
return field_names == unordered_set<string>({"read_only", "id", "endpoint", "port", "role", "status", "version"});
}
void MySQL_Monitor::add_topology_query_to_task(MySQL_Monitor_State_Data_Task_Type &task_type)
{
switch (rds_topology_check_type) {
case AWS_RDS_TOPOLOGY_CHECK:
if (task_type == MON_READ_ONLY)
task_type = MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY;
else if (task_type == MON_INNODB_READ_ONLY)
task_type = MON_INNODB_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY;
break;
case AWS_RDS_BLUE_GREEN_DEPLOYMENT_STATE_CHECK:
if (task_type == MON_READ_ONLY)
task_type = MON_READ_ONLY__AND__AWS_RDS_BLUE_GREEN_TOPOLOGY_DISCOVERY;
else if (task_type == MON_INNODB_READ_ONLY)
task_type = MON_INNODB_READ_ONLY__AND__AWS_RDS_BLUE_GREEN_TOPOLOGY_DISCOVERY;
break;
default:
proxy_warning("Attempting to add rds_topology query to unsupported read_only check.");
}
}
void * MySQL_Monitor::monitor_read_only() {
@ -3459,12 +3560,9 @@ void * MySQL_Monitor::monitor_read_only() {
unsigned long long t1;
unsigned long long t2;
unsigned long long next_loop_at=0;
int topology_loop = 0;
while (GloMyMon->shutdown==false && mysql_thread___monitor_enabled==true) {
int topology_loop_max = mysql_thread___monitor_aws_rds_topology_discovery_interval;
bool do_discovery_check = false;
unsigned int glover;
char *error=NULL;
SQLite3_result *resultset=NULL;
@ -3480,7 +3578,6 @@ void * MySQL_Monitor::monitor_read_only() {
next_loop_at=0;
}
if (t1 < next_loop_at) {
goto __sleep_monitor_read_only;
}
@ -3499,14 +3596,19 @@ void * MySQL_Monitor::monitor_read_only() {
if (topology_loop_max > 0) { // if the discovery interval is set to zero, do not query for the topology
if (topology_loop >= topology_loop_max) {
do_discovery_check = true;
if (rds_topology_check_type == NONE) {
proxy_info("Setting topology check to aws_rds_topology_check\n");
rds_topology_check_type = AWS_RDS_TOPOLOGY_CHECK;
}
topology_loop = 0;
}
topology_loop += 1;
} else {
rds_topology_check_type = NONE;
}
// resultset must be initialized before calling monitor_read_only_async
monitor_read_only_async(resultset, do_discovery_check);
monitor_read_only_async(resultset);
if (shutdown) return NULL;
__end_monitor_read_only_loop:
@ -5920,6 +6022,7 @@ bool AWS_Aurora_monitor_node::add_entry(AWS_Aurora_status_entry *ase) {
return ret; // for now ignored
}
AWS_RDS_topology_server::AWS_RDS_topology_server(const string &_str_a, int _p, int _whg) : addr(_str_a), port(_p), writer_hostgroup(_whg) {}
typedef struct _host_def_t {
char *host;
@ -7546,21 +7649,25 @@ bool MySQL_Monitor::monitor_read_only_process_ready_tasks(const std::vector<MySQ
int num_fields = 0;
int k = 0;
MYSQL_FIELD* fields = mysql_fetch_fields(mmsd->result);
int j = -1;
int i_ro = -1;
num_fields = mysql_num_fields(mmsd->result);
fields = mysql_fetch_fields(mmsd->result);
if (fields && num_fields == 1) {
unordered_set<string> field_names;
MYSQL_ROW row;
if (fields && num_fields >= 1) {
for (k = 0; k < num_fields; k++) {
if (strcmp((char*)"read_only", (char*)fields[k].name) == 0) {
j = k;
i_ro = k;
}
// For multiple task types we add extra fields. To make sure all the expected fields are there, we store them
field_names.insert(string(fields[k].name));
}
if (j > -1) {
MYSQL_ROW row = mysql_fetch_row(mmsd->result);
if (i_ro > -1) {
row = mysql_fetch_row(mmsd->result);
if (row) {
VALGRIND_DISABLE_ERROR_REPORTING;
if (row[j]) {
if (!strcmp(row[j], "0") || !strcasecmp(row[j], "OFF"))
if (row[i_ro]) {
if (!strcmp(row[i_ro], "0") || !strcasecmp(row[i_ro], "OFF"))
read_only = 0;
}
VALGRIND_ENABLE_ERROR_REPORTING;
@ -7568,37 +7675,22 @@ VALGRIND_ENABLE_ERROR_REPORTING;
}
rc = (*proxy_sqlite3_bind_int64)(statement, 5, read_only); ASSERT_SQLITE_OK(rc, mmsd->mondb);
} else if (fields && mmsd->get_task_type() == MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY) {
// Process the read_only field as above and store the first server
vector<MYSQL_ROW> discovered_servers;
for (k = 0; k < num_fields; k++) {
if (strcmp((char*)"read_only", (char*)fields[k].name) == 0) {
j = k;
}
}
if (j > -1) {
MYSQL_ROW row = mysql_fetch_row(mmsd->result);
if (row) {
if (is_aws_rds_topology_query_task(mmsd->get_task_type()) && mysql_row_matches_query_task(field_names, mmsd->get_task_type())) {
// Process the read_only field as above and store the first server
vector<MYSQL_ROW> discovered_servers;
discovered_servers.push_back(row);
// Store the remaining servers
int num_rows = mysql_num_rows(mmsd->result);
for (int i = 1; i < num_rows; i++) {
row = mysql_fetch_row(mmsd->result);
discovered_servers.push_back(row);
VALGRIND_DISABLE_ERROR_REPORTING;
if (row[j]) {
if (!strcmp(row[j], "0") || !strcasecmp(row[j], "OFF"))
read_only = 0;
}
VALGRIND_ENABLE_ERROR_REPORTING;
}
}
// Store the remaining servers
int num_rows = mysql_num_rows(mmsd->result);
for (int i = 1; i < num_rows; i++) {
MYSQL_ROW row = mysql_fetch_row(mmsd->result);
discovered_servers.push_back(row);
}
// Process the discovered servers and add them to 'runtime_mysql_servers' (process only for AWS RDS Multi-AZ DB Clusters)
if (!discovered_servers.empty() && is_aws_rds_multi_az_db_cluster_topology(discovered_servers)) {
process_discovered_topology(originating_server_hostname, discovered_servers, mmsd->reader_hostgroup);
// Process the discovered servers and add them to 'runtime_mysql_servers'
if (!discovered_servers.empty()) {
process_discovered_topology(originating_server_hostname, discovered_servers, mmsd, num_fields);
}
}
} else {
proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port);
@ -7659,7 +7751,7 @@ VALGRIND_ENABLE_ERROR_REPORTING;
return true;
}
void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset, bool do_discovery_check) {
void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset) {
assert(resultset);
std::vector<std::unique_ptr<MySQL_Monitor_State_Data>> mmsds;
@ -7685,8 +7777,8 @@ void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset, bool do_d
// Change task type if it's time to do discovery check. Only for aws rds endpoints
string hostname = r->fields[0];
if (do_discovery_check && hostname.find(AWS_ENDPOINT_SUFFIX_STRING) != std::string::npos) {
task_type = MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY;
if (hostname.find(AWS_ENDPOINT_SUFFIX_STRING) != std::string::npos && rds_topology_check_type != NONE) {
add_topology_query_to_task(task_type);
}
}
@ -8530,3 +8622,8 @@ void MySQL_Monitor::monitor_galera_async() {
template class WorkItem<MySQL_Monitor_State_Data>;
template class WorkItem<DNS_Resolve_Data>;
bool MySQL_Monitor::is_aws_rds_topology_version_supported(const string& version) {
// TODO: implement better check that considers minor and major versions
return version == SUPPORTED_AWS_RDS_TOPOLOGY_VERSION;
}

Loading…
Cancel
Save