Merge pull request #4406 from anphucbui/v2.x

Add support for AWS RDS MySQL Multi-AZ Cluster auto-discovery
pull/4490/head
Javier Jaramago Fernández 2 years ago committed by GitHub
commit 5d7e5d9e00
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1092,6 +1092,8 @@ class MySQL_HostGroups_Manager {
void set_server_current_latency_us(char *hostname, int port, unsigned int _current_latency_us);
unsigned long long Get_Memory_Stats();
void add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector<tuple<string, int, int>>& new_servers);
void update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error);
void update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error);
void update_group_replication_set_writer(char *_hostname, int _port, int _writer_hostgroup);

@ -56,6 +56,9 @@ struct cmp_str {
#define N_L_ASE 16
#define AWS_ENDPOINT_SUFFIX_STRING "rds.amazonaws.com"
#define QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY "SELECT @@global.read_only read_only, id, endpoint, port from mysql.rds_topology"
/*
Implementation of monitoring in AWS Aurora will be different than previous modules
@ -197,7 +200,8 @@ enum MySQL_Monitor_State_Data_Task_Type {
MON_GROUP_REPLICATION,
MON_REPLICATION_LAG,
MON_GALERA,
MON_AWS_AURORA
MON_AWS_AURORA,
MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY
};
enum class MySQL_Monitor_State_Data_Task_Result {
@ -229,6 +233,7 @@ public:
char *hostname;
int port;
int writer_hostgroup; // used only by group replication
int reader_hostgroup;
bool writer_is_also_reader; // used only by group replication
int max_transactions_behind; // used only by group replication
int max_transactions_behind_count; // used only by group replication
@ -442,6 +447,7 @@ class MySQL_Monitor {
static bool update_dns_cache_from_mysql_conn(const MYSQL* mysql);
static void trigger_dns_cache_update();
void process_discovered_topology(const std::string& originating_server_hostname, const vector<MYSQL_ROW>& discovered_servers, int reader_hostgroup);
private:
std::vector<table_def_t *> *tables_defs_monitor;
@ -553,7 +559,7 @@ private:
* Note: Calling init_async is mandatory before executing tasks asynchronously.
*/
void monitor_ping_async(SQLite3_result* resultset);
void monitor_read_only_async(SQLite3_result* resultset);
void monitor_read_only_async(SQLite3_result* resultset, bool do_discovery_check);
void monitor_replication_lag_async(SQLite3_result* resultset);
void monitor_group_replication_async();
void monitor_galera_async();

@ -304,6 +304,7 @@ struct p_th_gauge {
mysql_monitor_ping_interval,
mysql_monitor_ping_timeout,
mysql_monitor_ping_max_failures,
mysql_monitor_aws_rds_topology_discovery_interval,
mysql_monitor_read_only_interval,
mysql_monitor_read_only_timeout,
mysql_monitor_writer_is_also_reader,
@ -385,6 +386,8 @@ class MySQL_Threads_Handler
int monitor_ping_max_failures;
//! Monitor ping timeout. Unit: 'ms'.
int monitor_ping_timeout;
//! Monitor aws rds topology discovery interval. Unit: 'one discovery check per X monitor_read_only checks'.
int monitor_aws_rds_topology_discovery_interval;
//! Monitor read only timeout. Unit: 'ms'.
int monitor_read_only_interval;
//! Monitor read only timeout. Unit: 'ms'.

@ -902,6 +902,7 @@ __thread int mysql_thread___monitor_connect_timeout;
__thread int mysql_thread___monitor_ping_interval;
__thread int mysql_thread___monitor_ping_max_failures;
__thread int mysql_thread___monitor_ping_timeout;
__thread int mysql_thread___monitor_aws_rds_topology_discovery_interval;
__thread int mysql_thread___monitor_read_only_interval;
__thread int mysql_thread___monitor_read_only_timeout;
__thread int mysql_thread___monitor_read_only_max_timeout_count;
@ -1073,6 +1074,7 @@ extern __thread int mysql_thread___monitor_connect_timeout;
extern __thread int mysql_thread___monitor_ping_interval;
extern __thread int mysql_thread___monitor_ping_max_failures;
extern __thread int mysql_thread___monitor_ping_timeout;
extern __thread int mysql_thread___monitor_aws_rds_topology_discovery_interval;
extern __thread int mysql_thread___monitor_read_only_interval;
extern __thread int mysql_thread___monitor_read_only_timeout;
extern __thread int mysql_thread___monitor_read_only_max_timeout_count;

@ -2261,6 +2261,7 @@ bool MySQL_HostGroups_Manager::commit(
// fill Hostgroup_Manager_Mapping with latest records
update_hostgroup_manager_mappings();
ev_async_send(gtid_ev_loop, gtid_ev_async);
__sync_fetch_and_add(&status.servers_table_version,1);
@ -8583,3 +8584,75 @@ MySQLServers_SslParams * MySQL_HostGroups_Manager::get_Server_SSL_Params(char *h
}
return NULL;
}
/**
* @brief Updates replication hostgroups by adding autodiscovered mysql servers.
* @details Adds each server from 'new_servers' to the 'runtime_mysql_servers' table.
* We then rebuild the 'mysql_servers' table as well as the internal 'hostname_hostgroup_mapping'.
* @param new_servers A vector of tuples where each tuple contains the values needed to add each new server.
*/
void MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector<tuple<string, int, int>>& new_servers) {
int added_new_server;
wrlock();
// Add the discovered server with default values
for (tuple<string, int, int> s : new_servers) {
string host = std::get<0>(s);
uint16_t port = std::get<1>(s);
long int hostgroup_id = std::get<2>(s);
srv_info_t srv_info { host.c_str(), port, "AWS RDS" };
srv_opts_t srv_opts { -1, -1, -1 };
added_new_server = create_new_server_in_hg(hostgroup_id, srv_info, srv_opts);
}
// If servers were added, perform necessary updates to internal structures
if (added_new_server > -1) {
purge_mysql_servers_table();
mydb->execute("DELETE FROM mysql_servers");
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n");
generate_mysql_servers_table();
// Update the global checksums after 'mysql_servers' regeneration
{
unique_ptr<SQLite3_result> resultset { get_admin_runtime_mysql_servers(mydb) };
string mysrvs_checksum { get_checksum_from_hash(resultset ? resultset->raw_checksum() : 0) };
save_runtime_mysql_servers(resultset.release());
// Update the runtime_mysql_servers checksum with the new checksum
uint64_t raw_checksum = this->runtime_mysql_servers ? this->runtime_mysql_servers->raw_checksum() : 0;
table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] = raw_checksum;
// This is required for preserving coherence in the checksums, otherwise they would be inconsistent with `commit` generated checksums
SpookyHash rep_hgs_hash {};
bool init = false;
uint64_t servers_v2_hash = table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS_V2];
if (servers_v2_hash) {
if (init == false) {
init = true;
rep_hgs_hash.Init(19, 3);
}
rep_hgs_hash.Update(&servers_v2_hash, sizeof(servers_v2_hash));
}
CUCFT1(
rep_hgs_hash, init, "mysql_replication_hostgroups", "writer_hostgroup",
table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]
);
proxy_info("Checksum for table %s is %s\n", "mysql_servers", mysrvs_checksum.c_str());
pthread_mutex_lock(&GloVars.checksum_mutex);
update_glovars_mysql_servers_checksum(mysrvs_checksum);
pthread_mutex_unlock(&GloVars.checksum_mutex);
}
update_table_mysql_servers_for_monitor(false);
update_hostgroup_manager_mappings();
}
wrunlock();
}

@ -616,6 +616,12 @@ void MySQL_Monitor_State_Data::init_async() {
task_timeout_ = mysql_thread___monitor_read_only_timeout;
task_handler_ = &MySQL_Monitor_State_Data::read_only_handler;
break;
case MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY:
query_ = QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY;
async_state_machine_ = ASYNC_QUERY_START;
task_timeout_ = mysql_thread___monitor_read_only_timeout;
task_handler_ = &MySQL_Monitor_State_Data::read_only_handler;
break;
#else // TEST_READONLY
case MON_READ_ONLY:
case MON_INNODB_READ_ONLY:
@ -1623,6 +1629,8 @@ void * monitor_read_only_thread(void *arg) {
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT @@global.read_only&@@global.innodb_read_only read_only");
} else if (mmsd->get_task_type() == MON_READ_ONLY__OR__INNODB_READ_ONLY) {
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT @@global.read_only|@@global.innodb_read_only read_only");
} else if (mmsd->get_task_type() == MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY) {
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql, QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY);
} else { // default
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT @@global.read_only read_only");
}
@ -3282,6 +3290,65 @@ VALGRIND_ENABLE_ERROR_REPORTING;
return ret;
}
/**
* @brief Processes the discovered servers to eventually add them to 'runtime_mysql_servers'.
* @details This method takes a vector of discovered servers, compares them against the existing servers, and adds the new servers to 'runtime_mysql_servers'.
* @param originating_server_hostname A string which denotes the hostname of the originating server, from which the discovered servers were queried and found.
* @param discovered_servers A vector of servers discovered when querying the cluster's topology.
* @param reader_hostgroup Reader hostgroup to which we will add the discovered servers.
*/
void MySQL_Monitor::process_discovered_topology(const std::string& originating_server_hostname, const vector<MYSQL_ROW>& discovered_servers, int reader_hostgroup) {
char *error = NULL;
int cols = 0;
int affected_rows = 0;
SQLite3_result *runtime_mysql_servers = NULL;
char *query=(char *)"SELECT DISTINCT hostname FROM monitor_internal.mysql_servers ORDER BY hostname";
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
monitordb->execute_statement(query, &error, &cols, &affected_rows, &runtime_mysql_servers);
if (error) {
proxy_error("Error on %s : %s\n", query, error);
} else {
vector<tuple<string, int, int>> new_servers;
vector<string> saved_hostnames;
saved_hostnames.push_back(originating_server_hostname);
// Do an initial loop through the query results to save existing runtime server hostnames
for (std::vector<SQLite3_row *>::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) {
SQLite3_row *r1 = *it;
string current_runtime_hostname = r1->fields[0];
saved_hostnames.push_back(current_runtime_hostname);
}
// Loop through discovered servers and process the ones we haven't saved yet
for (MYSQL_ROW s : discovered_servers) {
string current_discovered_hostname = s[2];
string current_discovered_port_string = s[3];
int current_discovered_port_int;
try {
current_discovered_port_int = stoi(s[3]);
} catch (...) {
proxy_error("Unable to parse the port value during topology discovery: [%s]. Terminating discovery early.", current_discovered_port_string);
return;
}
if (find(saved_hostnames.begin(), saved_hostnames.end(), current_discovered_hostname) == saved_hostnames.end()) {
tuple<string, int, int> new_server(current_discovered_hostname, current_discovered_port_int, reader_hostgroup);
new_servers.push_back(new_server);
saved_hostnames.push_back(current_discovered_hostname);
}
}
// Add the new servers if any
if (!new_servers.empty()) {
MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(new_servers);
}
}
}
void * MySQL_Monitor::monitor_read_only() {
mysql_close(mysql_init(NULL));
// initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it)
@ -3295,14 +3362,17 @@ void * MySQL_Monitor::monitor_read_only() {
unsigned long long t1;
unsigned long long t2;
unsigned long long next_loop_at=0;
int topology_loop = 0;
int topology_loop_max = mysql_thread___monitor_aws_rds_topology_discovery_interval;
while (GloMyMon->shutdown==false && mysql_thread___monitor_enabled==true) {
bool do_discovery_check = false;
unsigned int glover;
char *error=NULL;
SQLite3_result *resultset=NULL;
// add support for SSL
char *query=(char *)"SELECT hostname, port, MAX(use_ssl) use_ssl, check_type FROM mysql_servers JOIN mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE status NOT IN (2,3) GROUP BY hostname, port ORDER BY RANDOM()";
char *query=(char *)"SELECT hostname, port, MAX(use_ssl) use_ssl, check_type, reader_hostgroup FROM mysql_servers JOIN mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE status NOT IN (2,3) GROUP BY hostname, port ORDER BY RANDOM()";
t1=monotonic_time();
if (!GloMTH) return NULL; // quick exit during shutdown/restart
@ -3313,6 +3383,7 @@ void * MySQL_Monitor::monitor_read_only() {
next_loop_at=0;
}
if (t1 < next_loop_at) {
goto __sleep_monitor_read_only;
}
@ -3329,8 +3400,14 @@ void * MySQL_Monitor::monitor_read_only() {
goto __end_monitor_read_only_loop;
}
if (topology_loop >= topology_loop_max) {
do_discovery_check = true;
topology_loop = 0;
}
topology_loop += 1;
// resultset must be initialized before calling monitor_read_only_async
monitor_read_only_async(resultset);
monitor_read_only_async(resultset, do_discovery_check);
if (shutdown) return NULL;
__end_monitor_read_only_loop:
@ -7199,7 +7276,7 @@ bool MySQL_Monitor::monitor_read_only_process_ready_tasks(const std::vector<MySQ
std::list<read_only_server_t> mysql_servers;
for (auto& mmsd : mmsds) {
string originating_server_hostname = mmsd->hostname;
const auto task_result = mmsd->get_task_result();
assert(task_result != MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_PENDING);
@ -7269,6 +7346,38 @@ VALGRIND_ENABLE_ERROR_REPORTING;
}
rc = (*proxy_sqlite3_bind_int64)(statement, 5, read_only); ASSERT_SQLITE_OK(rc, mmsd->mondb);
} else if (fields && mmsd->get_task_type() == MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY) {
// Process the read_only field as above and store the first server
vector<MYSQL_ROW> discovered_servers;
for (k = 0; k < num_fields; k++) {
if (strcmp((char*)"read_only", (char*)fields[k].name) == 0) {
j = k;
}
}
if (j > -1) {
MYSQL_ROW row = mysql_fetch_row(mmsd->result);
if (row) {
discovered_servers.push_back(row);
VALGRIND_DISABLE_ERROR_REPORTING;
if (row[j]) {
if (!strcmp(row[j], "0") || !strcasecmp(row[j], "OFF"))
read_only = 0;
}
VALGRIND_ENABLE_ERROR_REPORTING;
}
}
// Store the remaining servers
int num_rows = mysql_num_rows(mmsd->result);
for (int i = 1; i < num_rows; i++) {
MYSQL_ROW row = mysql_fetch_row(mmsd->result);
discovered_servers.push_back(row);
}
// Process the discovered servers and add them to 'runtime_mysql_servers'
if (!discovered_servers.empty()) {
process_discovered_topology(originating_server_hostname, discovered_servers, mmsd->reader_hostgroup);
}
} else {
proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port);
rc = (*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb);
@ -7329,7 +7438,7 @@ VALGRIND_ENABLE_ERROR_REPORTING;
return true;
}
void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset) {
void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset, bool do_discovery_check) {
assert(resultset);
std::vector<std::unique_ptr<MySQL_Monitor_State_Data>> mmsds;
@ -7352,11 +7461,18 @@ void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset) {
} else if (strcasecmp(r->fields[3], (char*)"read_only|innodb_read_only") == 0) {
task_type = MON_READ_ONLY__OR__INNODB_READ_ONLY;
}
// Change task type if it's time to do discovery check. Only for aws rds endpoints
string hostname = r->fields[0];
if (do_discovery_check && hostname.find(AWS_ENDPOINT_SUFFIX_STRING) != std::string::npos) {
task_type = MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY;
}
}
std::unique_ptr<MySQL_Monitor_State_Data> mmsd(
new MySQL_Monitor_State_Data(task_type, r->fields[0], atoi(r->fields[1]), atoi(r->fields[2])));
mmsd->reader_hostgroup = atoi(r->fields[4]); // set reader_hostgroup
mmsd->mondb = monitordb;
mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get());

@ -307,6 +307,7 @@ static char * mysql_thread_variables_names[]= {
(char *)"monitor_ping_interval",
(char *)"monitor_ping_max_failures",
(char *)"monitor_ping_timeout",
(char *)"monitor_aws_rds_topology_discovery_interval",
(char *)"monitor_read_only_interval",
(char *)"monitor_read_only_timeout",
(char *)"monitor_read_only_max_timeout_count",
@ -823,6 +824,12 @@ th_metrics_map = std::make_tuple(
"Reached maximum ping attempts from monitor.",
metric_tags {}
),
std::make_tuple (
p_th_gauge::mysql_monitor_aws_rds_topology_discovery_interval,
"proxysql_mysql_monitor_aws_rds_topology_discovery_interval",
"How frequently a topology discovery is performed, e.g. a value of 500 means one topology discovery every 500 read-only checks ",
metric_tags {}
),
std::make_tuple (
p_th_gauge::mysql_monitor_read_only_interval,
"proxysql_mysql_monitor_read_only_interval_seconds",
@ -914,6 +921,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() {
variables.monitor_ping_interval=8000;
variables.monitor_ping_max_failures=3;
variables.monitor_ping_timeout=1000;
variables.monitor_aws_rds_topology_discovery_interval=1000;
variables.monitor_read_only_interval=1000;
variables.monitor_read_only_timeout=800;
variables.monitor_read_only_max_timeout_count=3;
@ -2081,6 +2089,7 @@ char ** MySQL_Threads_Handler::get_variables_list() {
VariablesPointers_int["monitor_ping_timeout"] = make_tuple(&variables.monitor_ping_timeout, 100, 600*1000, false);
VariablesPointers_int["monitor_ping_max_failures"] = make_tuple(&variables.monitor_ping_max_failures, 1, 1000*1000, false);
VariablesPointers_int["monitor_aws_rds_topology_discovery_interval"] = make_tuple(&variables.monitor_aws_rds_topology_discovery_interval, 1, 100000, false);
VariablesPointers_int["monitor_read_only_interval"] = make_tuple(&variables.monitor_read_only_interval, 100, 7*24*3600*1000, false);
VariablesPointers_int["monitor_read_only_timeout"] = make_tuple(&variables.monitor_read_only_timeout, 100, 600*1000, false);
VariablesPointers_int["monitor_read_only_max_timeout_count"] = make_tuple(&variables.monitor_read_only_max_timeout_count, 1, 1000*1000, false);
@ -4230,6 +4239,7 @@ void MySQL_Thread::refresh_variables() {
mysql_thread___monitor_ping_interval=GloMTH->get_variable_int((char *)"monitor_ping_interval");
mysql_thread___monitor_ping_max_failures=GloMTH->get_variable_int((char *)"monitor_ping_max_failures");
mysql_thread___monitor_ping_timeout=GloMTH->get_variable_int((char *)"monitor_ping_timeout");
mysql_thread___monitor_aws_rds_topology_discovery_interval=GloMTH->get_variable_int((char *)"monitor_aws_rds_topology_discovery_interval");
mysql_thread___monitor_read_only_interval=GloMTH->get_variable_int((char *)"monitor_read_only_interval");
mysql_thread___monitor_read_only_timeout=GloMTH->get_variable_int((char *)"monitor_read_only_timeout");
mysql_thread___monitor_read_only_max_timeout_count=GloMTH->get_variable_int((char *)"monitor_read_only_max_timeout_count");
@ -5555,6 +5565,7 @@ void MySQL_Threads_Handler::p_update_metrics() {
this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_enabled]->Set(this->variables.monitor_enabled);
this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_ping_timeout]->Set(this->variables.monitor_ping_timeout/1000.0);
this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_ping_max_failures]->Set(this->variables.monitor_ping_max_failures);
this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_aws_rds_topology_discovery_interval]->Set(this->variables.monitor_aws_rds_topology_discovery_interval);
this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_read_only_interval]->Set(this->variables.monitor_read_only_interval/1000.0);
this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_read_only_timeout]->Set(this->variables.monitor_read_only_timeout/1000.0);
this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_writer_is_also_reader]->Set(this->variables.monitor_writer_is_also_reader);

@ -5,6 +5,7 @@
#include <functional>
#include <sstream>
#include <algorithm>
#include <climits>
#include <fcntl.h>
#include <poll.h>

Loading…
Cancel
Save