Add support for AWS RDS MySQL Multi-AZ Cluster auto-discovery

pull/4406/head
anphucbui 2 years ago
parent e6626673aa
commit 5a3037785f

@ -814,6 +814,24 @@ class MySQL_HostGroups_Manager {
void set_server_current_latency_us(char *hostname, int port, unsigned int _current_latency_us);
unsigned long long Get_Memory_Stats();
// Plain value bundle describing one row of 'mysql_servers'-style data.
// MySQL_Monitor::discover_topology_and_add_to_mysql_servers() fills one of these per
// auto-discovered hostname using positional aggregate initialization, so the ORDER and
// TYPES of the fields below must stay in sync with that initializer.
struct serverDetails {
// Hostgroup the discovered server is added to.
long int hostgroup_id;
// Hostname of the server the values were taken from (the originating endpoint).
string hostname;
// Port of the discovered server (taken from the topology query, not from the originating server).
uint16_t port;
uint16_t gtid_port;
// Copied from the originating server; not read when the discovered server is created.
string status;
int64_t weight;
unsigned int compression;
int64_t max_connections;
unsigned int max_replication_lag;
int32_t use_ssl;
unsigned int max_latency_ms;
// Copied from the originating server; not read when the discovered server is created.
string comment;
};
// Adds every hostname in 'servers_to_add' as a new server, using the values stored for it in
// 'hostname_values_mapping', then regenerates 'mysql_servers' and the hostgroup mapping.
// Returns EXIT_SUCCESS or EXIT_FAILURE.
int add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector<string> servers_to_add, unordered_map<string, MySQL_HostGroups_Manager::serverDetails> hostname_values_mapping);
// Rebuilds 'hostgroup_server_mapping' from 'mysql_servers' joined with 'mysql_replication_hostgroups'
// and records the current checksums of both tables.
void rebuild_hostname_hostgroup_mapping();
void update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error);
void update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error);
void update_group_replication_set_writer(char *_hostname, int _port, int _writer_hostgroup);

@ -56,6 +56,8 @@ struct cmp_str {
#define N_L_ASE 16
#define AWS_ENDPOINT_SUFFIX_STRING "rds.amazonaws.com"
/*
Implementation of monitoring in AWS Aurora will be different than previous modules
@ -418,6 +420,8 @@ class MySQL_Monitor {
static bool update_dns_cache_from_mysql_conn(const MYSQL* mysql);
static void trigger_dns_cache_update();
// Queries 'mysql.rds_topology' on hostname:port and returns the rows whose hostname column
// differs from 'hostname'.
vector<MYSQL_ROW> discover_topology(const char* hostname, int port);
// Runs discover_topology() for every AWS endpoint found in 'runtime_mysql_servers' and hands the
// servers that are not yet known to MySQL_HostGroups_Manager.
void discover_topology_and_add_to_mysql_servers();
private:
std::vector<table_def_t *> *tables_defs_monitor;

@ -305,6 +305,7 @@ struct p_th_gauge {
mysql_monitor_ping_interval,
mysql_monitor_ping_timeout,
mysql_monitor_ping_max_failures,
mysql_monitor_topology_discovery_interval,
mysql_monitor_read_only_interval,
mysql_monitor_read_only_timeout,
mysql_monitor_writer_is_also_reader,
@ -386,6 +387,8 @@ class MySQL_Threads_Handler
int monitor_ping_max_failures;
//! Monitor ping timeout. Unit: 'ms'.
int monitor_ping_timeout;
//! Monitor topology discovery interval. Unit: 'one discovery check per X monitor_read_only checks'.
int monitor_topology_discovery_interval;
//! Monitor read only interval. Unit: 'ms'.
int monitor_read_only_interval;
//! Monitor read only timeout. Unit: 'ms'.

@ -899,6 +899,7 @@ __thread int mysql_thread___monitor_connect_timeout;
__thread int mysql_thread___monitor_ping_interval;
__thread int mysql_thread___monitor_ping_max_failures;
__thread int mysql_thread___monitor_ping_timeout;
__thread int mysql_thread___monitor_topology_discovery_interval;
__thread int mysql_thread___monitor_read_only_interval;
__thread int mysql_thread___monitor_read_only_timeout;
__thread int mysql_thread___monitor_read_only_max_timeout_count;
@ -1068,6 +1069,7 @@ extern __thread int mysql_thread___monitor_connect_timeout;
extern __thread int mysql_thread___monitor_ping_interval;
extern __thread int mysql_thread___monitor_ping_max_failures;
extern __thread int mysql_thread___monitor_ping_timeout;
extern __thread int mysql_thread___monitor_topology_discovery_interval;
extern __thread int mysql_thread___monitor_read_only_interval;
extern __thread int mysql_thread___monitor_read_only_timeout;
extern __thread int mysql_thread___monitor_read_only_max_timeout_count;

@ -218,4 +218,6 @@ void close_all_non_term_fd(std::vector<int> excludeFDs);
*/
std::pair<int,const char*> get_dollar_quote_error(const char* version);
long parseLong(const char* s);
#endif

@ -1969,65 +1969,7 @@ bool MySQL_HostGroups_Manager::commit(
if (hgsm_mysql_servers_checksum != table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] ||
hgsm_mysql_replication_hostgroups_checksum != table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS])
{
proxy_info("Rebuilding 'Hostgroup_Manager_Mapping' due to checksums change - mysql_servers { old: 0x%lX, new: 0x%lX }, mysql_replication_hostgroups { old:0x%lX, new:0x%lX }\n",
hgsm_mysql_servers_checksum, table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS],
hgsm_mysql_replication_hostgroups_checksum, table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]);
char* error = NULL;
int cols = 0;
int affected_rows = 0;
SQLite3_result* resultset = NULL;
const char* query = "SELECT DISTINCT hostname, port, '1' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=writer_hostgroup WHERE status<>3 \
UNION \
SELECT DISTINCT hostname, port, '0' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=reader_hostgroup WHERE status<>3 \
ORDER BY hostname, port";
mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset);
hostgroup_server_mapping.clear();
if (resultset && resultset->rows_count) {
std::string fetched_server_id;
HostGroup_Server_Mapping* fetched_server_mapping = NULL;
for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) {
SQLite3_row* r = *it;
const std::string& server_id = std::string(r->fields[0]) + ":::" + r->fields[1];
if (fetched_server_mapping == NULL || server_id != fetched_server_id) {
auto itr = hostgroup_server_mapping.find(server_id);
if (itr == hostgroup_server_mapping.end()) {
std::unique_ptr<HostGroup_Server_Mapping> server_mapping(new HostGroup_Server_Mapping(this));
fetched_server_mapping = server_mapping.get();
hostgroup_server_mapping.insert( std::pair<std::string,std::unique_ptr<MySQL_HostGroups_Manager::HostGroup_Server_Mapping>> {
server_id, std::move(server_mapping)
} );
}
else {
fetched_server_mapping = itr->second.get();
}
fetched_server_id = server_id;
}
HostGroup_Server_Mapping::Node node;
//node.server_status = static_cast<MySerStatus>(atoi(r->fields[3]));
node.reader_hostgroup_id = atoi(r->fields[4]);
node.writer_hostgroup_id = atoi(r->fields[5]);
node.srv = reinterpret_cast<MySrvC*>(atoll(r->fields[6]));
HostGroup_Server_Mapping::Type type = (r->fields[2] && r->fields[2][0] == '1') ? HostGroup_Server_Mapping::Type::WRITER : HostGroup_Server_Mapping::Type::READER;
fetched_server_mapping->add(type, node);
}
}
delete resultset;
hgsm_mysql_servers_checksum = table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS];
hgsm_mysql_replication_hostgroups_checksum = table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS];
rebuild_hostname_hostgroup_mapping();
}
ev_async_send(gtid_ev_loop, gtid_ev_async);
@ -7977,3 +7919,120 @@ void MySQL_HostGroups_Manager::HostGroup_Server_Mapping::remove_HGM(MySrvC* srv)
srv->status = MYSQL_SERVER_STATUS_OFFLINE_HARD;
srv->ConnectionsFree->drop_all_connections();
}
/**
 * @brief Updates replication hostgroups by adding autodiscovered mysql servers.
 * @details Creates a 'MySrvC' for each hostname in 'servers_to_add' (status ONLINE, comment
 *  "Discovered endpoint") and adds it to the hostgroup recorded for it in 'hostname_values_mapping'.
 *  The 'mysql_servers' table is then purged and regenerated, the monitor's copy of the table is
 *  updated and the internal 'hostname_hostgroup_mapping' is rebuilt. The whole operation runs
 *  under the manager's write lock.
 * @param servers_to_add A vector containing the strings representing the hostnames of servers to add to 'mysql_servers'.
 * @param hostname_values_mapping A mapping containing the hostname of the discovered server mapped to the metadata of server from which it was discovered, stored in a 'serverDetails' struct.
 *
 * @return Returns EXIT_FAILURE code on failure (an exception was raised, or a hostname had no entry
 *  in 'hostname_values_mapping' and was skipped) and EXIT_SUCCESS code on success.
 */
int MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector<string> servers_to_add, unordered_map<string, MySQL_HostGroups_Manager::serverDetails> hostname_values_mapping) {
	int exit_code = EXIT_SUCCESS;

	wrlock();
	try {
		for (const string& host : servers_to_add) {
			// A single lookup; operator[] would silently insert a zeroed entry for an unknown
			// host and register a bogus server (port 0, hostgroup 0).
			auto details_it = hostname_values_mapping.find(host);
			if (details_it == hostname_values_mapping.end()) {
				proxy_error("No server details available for discovered server %s, skipping it\n", host.c_str());
				exit_code = EXIT_FAILURE;
				continue;
			}
			const MySQL_HostGroups_Manager::serverDetails& details = details_it->second;

			MySrvC* mysrvc = new MySrvC(
				const_cast<char*>(host.c_str()), details.port, details.gtid_port, details.weight, MYSQL_SERVER_STATUS_ONLINE,
				details.compression, details.max_connections, details.max_replication_lag, details.use_ssl, details.max_latency_ms, const_cast<char*>("Discovered endpoint")
			);
			add(mysrvc, details.hostgroup_id);

			// hostgroup_id is a 'long int', hence %ld.
			proxy_info(
				"Adding new discovered server %s:%d with: hostgroup=%ld, weight=%ld, max_connections=%ld, use_ssl=%d\n",
				host.c_str(), details.port, details.hostgroup_id, mysrvc->weight, mysrvc->max_connections, mysrvc->use_ssl
			);
		}

		// Regenerate 'mysql_servers' from the in-memory state and refresh everything derived from it.
		purge_mysql_servers_table();
		mydb->execute("DELETE FROM mysql_servers");
		proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n");
		generate_mysql_servers_table();
		update_table_mysql_servers_for_monitor(false);
		rebuild_hostname_hostgroup_mapping();
	} catch (...) {
		// Never let an exception escape while holding the write lock; report it to the caller.
		proxy_error("Unexpected exception while adding discovered servers\n");
		exit_code = EXIT_FAILURE;
	}
	wrunlock();

	return exit_code;
}
/**
* @brief Rebuilds the 'hostname_hostgroup_mapping'
* @details Rebuilds the internal 'hostname_hostgroup_mapping' assuming new data has been entered
* and calculates new checksums for 'mysql_servers' and 'mysql_replication_hostgroups'.
*/
void MySQL_HostGroups_Manager::rebuild_hostname_hostgroup_mapping() {
proxy_info("Rebuilding 'Hostgroup_Manager_Mapping' due to checksums change - mysql_servers { old: 0x%lX, new: 0x%lX }, mysql_replication_hostgroups { old:0x%lX, new:0x%lX }\n",
hgsm_mysql_servers_checksum, table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS],
hgsm_mysql_replication_hostgroups_checksum, table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]);
char* error = NULL;
int cols = 0;
int affected_rows = 0;
SQLite3_result* resultset = NULL;
const char* query = "SELECT DISTINCT hostname, port, '1' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=writer_hostgroup WHERE status<>3 \
UNION \
SELECT DISTINCT hostname, port, '0' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=reader_hostgroup WHERE status<>3 \
ORDER BY hostname, port";
mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset);
hostgroup_server_mapping.clear();
if (resultset && resultset->rows_count) {
std::string fetched_server_id;
HostGroup_Server_Mapping* fetched_server_mapping = NULL;
for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); it++) {
SQLite3_row *r = *it;
const std::string& server_id = std::string(r->fields[0]) + ":::" + r->fields[1];
if (fetched_server_mapping == NULL || server_id != fetched_server_id) {
auto itr = hostgroup_server_mapping.find(server_id);
if (itr == hostgroup_server_mapping.end()) {
std::unique_ptr<HostGroup_Server_Mapping> server_mapping(new HostGroup_Server_Mapping(this));
fetched_server_mapping = server_mapping.get();
hostgroup_server_mapping.insert( std::pair<std::string,std::unique_ptr<MySQL_HostGroups_Manager::HostGroup_Server_Mapping>> {
server_id, std::move(server_mapping)
} );
} else {
fetched_server_mapping = itr->second.get();
}
fetched_server_id = server_id;
}
HostGroup_Server_Mapping::Node node;
node.reader_hostgroup_id = atoi(r->fields[4]);
node.writer_hostgroup_id = atoi(r->fields[5]);
node.srv = reinterpret_cast<MySrvC*>(atoll(r->fields[6]));
HostGroup_Server_Mapping::Type type = (r->fields[2] && r->fields[2][0] == '1') ? HostGroup_Server_Mapping::Type::WRITER : HostGroup_Server_Mapping::Type::READER;
fetched_server_mapping->add(type, node);
}
}
delete resultset;
hgsm_mysql_servers_checksum = table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS];
hgsm_mysql_replication_hostgroups_checksum = table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS];
}

@ -3256,6 +3256,240 @@ VALGRIND_ENABLE_ERROR_REPORTING;
return ret;
}
/**
 * @brief Discovers the topology of a server.
 * @details Discovers the topology of the server specified by hostname and port by running
 *  "SELECT * from mysql.rds_topology" on a monitor connection (taken from the monitor connection
 *  pool, or newly created when the pool has none).
 *  The monitor user must explicitly be granted permissions to view 'mysql.rds_topology'.
 *  The query and the result retrieval share the 'mysql-monitor_read_only_timeout' budget.
 * @param hostname Hostname of the server.
 * @param port Server port.
 *
 * @return Returns a vector of 'MYSQL_ROW' objects which contain the discovered servers, i.e. every
 *  row whose hostname column (index 1) differs from 'hostname'. Rows with a NULL value in any of
 *  the first three columns are skipped. The returned rows point to per-thread copies of the data
 *  that stay valid only until the next call of this method on the same thread; callers must not
 *  free them. The vector is empty on any failure.
 */
vector<MYSQL_ROW> MySQL_Monitor::discover_topology(const char* hostname, int port) {
	std::unique_ptr<MySQL_Monitor_State_Data> mmsd(new MySQL_Monitor_State_Data(MON_CONNECT, const_cast<char*> (hostname), port));
	mmsd->mondb = monitordb;
	mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get());
	mmsd->t1 = monotonic_time();
	bool crc = false; // true when the connection was created here instead of coming from the pool

	if (mmsd->mysql == NULL) { // we don't have a connection, let's create it
		const bool rc = mmsd->create_new_connection();
		if (mmsd->mysql) {
			GloMyMon->My_Conn_Pool->conn_register(mmsd.get());
		}
		crc = true;
		if (rc == false) {
			unsigned long long now = monotonic_time();
			// Size the buffer once and reuse that size for snprintf; sizeof() of the
			// pointer would truncate the message to a handful of characters.
			const char *prev_error = mmsd->mysql_error_msg ? mmsd->mysql_error_msg : "";
			const size_t new_error_size = 50 + strlen(prev_error);
			char *new_error = (char *) malloc(new_error_size);
			snprintf(new_error, new_error_size, "timeout on creating new connection: %s", prev_error);
			free(mmsd->mysql_error_msg);
			mmsd->mysql_error_msg = new_error;
			proxy_error("Timeout on discover_topology check for %s:%d after %lldms. Unable to create a connection. If the server is overload, increase mysql-monitor_connect_timeout. Error: %s.\n", mmsd->hostname, mmsd->port, (now-mmsd->t1)/1000, new_error);
			MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_READ_ONLY_CHECK_CONN_TIMEOUT);
			goto __exit_monitor_discover_topology;
		}
	}

	// Send the query (non-blocking API).
	mmsd->interr = 0; // reset the value
	mmsd->async_exit_status = mysql_query_start(&mmsd->interr,mmsd->mysql, "SELECT * from mysql.rds_topology");
	while (mmsd->async_exit_status) {
		const unsigned long long now = monotonic_time();
		mmsd->async_exit_status = wait_for_mysql(mmsd->mysql, mmsd->async_exit_status);
		if (now > mmsd->t1 + mysql_thread___monitor_read_only_timeout * 1000) {
			mmsd->mysql_error_msg = strdup("timeout check");
			proxy_error("Timeout on discover_topology check for %s:%d after %lldms. If the server is overload, increase mysql-monitor_read_only_timeout.\n", mmsd->hostname, mmsd->port, (now-mmsd->t1)/1000);
			MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_READ_ONLY_CHECK_TIMEOUT);
			goto __exit_monitor_discover_topology;
		}
		if (mmsd->interr) {
			// error during query
			mmsd->mysql_error_msg = strdup(mysql_error(mmsd->mysql));
			MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql));
			goto __exit_monitor_discover_topology;
		}
		if ((mmsd->async_exit_status & MYSQL_WAIT_TIMEOUT) == 0) {
			mmsd->async_exit_status = mysql_query_cont(&mmsd->interr, mmsd->mysql, mmsd->async_exit_status);
		}
	}
	if (mmsd->interr) {
		// error during query
		mmsd->mysql_error_msg = strdup(mysql_error(mmsd->mysql));
		MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql));
		goto __exit_monitor_discover_topology;
	}

	// Fetch the whole resultset client-side.
	mmsd->async_exit_status = mysql_store_result_start(&mmsd->result,mmsd->mysql);
	while (mmsd->async_exit_status && ((mmsd->async_exit_status & MYSQL_WAIT_TIMEOUT) == 0)) {
		mmsd->async_exit_status = wait_for_mysql(mmsd->mysql, mmsd->async_exit_status);
		const unsigned long long now = monotonic_time();
		if (now > mmsd->t1 + mysql_thread___monitor_read_only_timeout * 1000) {
			mmsd->mysql_error_msg = strdup("timeout check");
			proxy_error("Timeout on discover_topology check for %s:%d after %lldms. If the server is overload, increase mysql-monitor_read_only_timeout.\n", mmsd->hostname, mmsd->port, (now-mmsd->t1)/1000);
			MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_READ_ONLY_CHECK_TIMEOUT);
			goto __exit_monitor_discover_topology;
		}
		if ((mmsd->async_exit_status & MYSQL_WAIT_TIMEOUT) == 0) {
			mmsd->async_exit_status = mysql_store_result_cont(&mmsd->result, mmsd->mysql, mmsd->async_exit_status);
		}
	}
	if (mmsd->interr) { // storing the result failed
		mmsd->mysql_error_msg = strdup(mysql_error(mmsd->mysql));
		MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql));
	}

__exit_monitor_discover_topology:
	if (mmsd->mysql) {
		if (mmsd->mysql_error_msg) {
			// Something went wrong on this connection: do not reuse it.
			MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql));
			GloMyMon->My_Conn_Pool->conn_unregister(mmsd.get());
			mysql_close(mmsd->mysql);
			mmsd->mysql = NULL;
		} else if (crc) {
			// Freshly created connection: configure its wait timeout before pooling it.
			const bool rc = mmsd->set_wait_timeout();
			if (rc) {
				GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname, mmsd->port, mmsd->mysql);
			} else {
				MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql));
				GloMyMon->My_Conn_Pool->conn_unregister(mmsd.get());
				mysql_close(mmsd->mysql); // set_wait_timeout failed
			}
			mmsd->mysql = NULL;
		} else {
			// Connection came from the pool and the check succeeded: hand it back rather than
			// closing a healthy connection and recording a spurious error on every discovery.
			GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname, mmsd->port, mmsd->mysql);
			mmsd->mysql = NULL;
		}
	}

	// Process the output of the query, if any
	vector<MYSQL_ROW> discovered_rows;
	// The rows handed out by mysql_fetch_row() die with mysql_free_result(), so the values are
	// copied into per-thread storage that outlives this call (until the next call on this thread).
	static thread_local std::vector<std::unique_ptr<std::string>> cell_storage;
	static thread_local std::vector<std::vector<char*>> row_storage;
	cell_storage.clear();
	row_storage.clear();

	if (mmsd->result) {
		const unsigned int num_fields = mysql_num_fields(mmsd->result);
		// Callers read columns 0..2 (id, hostname, port); anything narrower is unusable.
		if (num_fields >= 3) {
			MYSQL_ROW curr_row = NULL;
			// Iterate over ROWS until exhausted (not over the number of columns).
			while ((curr_row = mysql_fetch_row(mmsd->result)) != NULL) {
				if (curr_row[0] == NULL || curr_row[1] == NULL || curr_row[2] == NULL) {
					continue;
				}
				if (strcmp(hostname, curr_row[1]) == 0) {
					continue; // the server we queried is already known
				}
				row_storage.push_back(std::vector<char*>(num_fields, static_cast<char*>(NULL)));
				std::vector<char*>& row_copy = row_storage.back();
				for (unsigned int f = 0; f < num_fields; f++) {
					if (curr_row[f] != NULL) {
						cell_storage.emplace_back(new std::string(curr_row[f]));
						row_copy[f] = &(*cell_storage.back())[0];
					}
				}
			}
		}
		mysql_free_result(mmsd->result);
		mmsd->result = NULL;
		for (std::vector<char*>& stored_row : row_storage) {
			discovered_rows.push_back(stored_row.data());
		}
	} else {
		proxy_info("Unable to query for topology.\n");
	}

	return discovered_rows;
}
/**
* @brief Discovers the topology of a server and adds the discovered servers to 'mysql_servers'.
* @details Helper method which calls the 'discover_topology' method as well as
* 'MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups' in order to discover topology
* and then add it to 'mysql_servers'.
*/
void MySQL_Monitor::discover_topology_and_add_to_mysql_servers() {
char *error = NULL;
int cols = 0;
int affected_rows = 0;
SQLite3_result *runtime_mysql_servers = NULL;
char *query=(char *)"SELECT hostgroup_id,hostname,port,gtid_port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment FROM main.runtime_mysql_servers ORDER BY hostgroup_id, hostname, port";
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
admindb->execute_statement(query, &error , &cols , &affected_rows , &runtime_mysql_servers);
if (error) {
proxy_error("Error on %s : %s\n", query, error);
} else {
set<string> existing_servers;
vector<string> servers_to_add;
unordered_map<string, MySQL_HostGroups_Manager::serverDetails> hostname_values_mapping;
// Do an initial loop through query results to keep track of existing server hostnames
for (std::vector<SQLite3_row *>::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) {
SQLite3_row *r1 = *it;
string current_hostname = r1->fields[1];
if (std::find(existing_servers.begin(), existing_servers.end(), current_hostname) == existing_servers.end()) {
existing_servers.insert(current_hostname);
}
}
// Discover topology for each server in runtime_mysql_servers that have an aws endpoint
for (std::vector<SQLite3_row *>::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) {
SQLite3_row *r1 = *it;
long int hostgroup = parseLong(r1->fields[0]);
string current_hostname = r1->fields[1];
long int port = parseLong(r1->fields[2]);
if (current_hostname.find(AWS_ENDPOINT_SUFFIX_STRING) != std::string::npos) {
vector<MYSQL_ROW> discovered_servers = GloMyMon->discover_topology(current_hostname.c_str(), port);
if (!discovered_servers.empty()) {
for (MYSQL_ROW s: discovered_servers) {
vector<string> value_vector;
string discovered_id = s[0];
string discovered_hostname = s[1];
string discovered_port = s[2];
// Add discovered servers that don't already exist in runtime_mysql_servers
if (std::find(existing_servers.begin(), existing_servers.end(), discovered_hostname) == existing_servers.end()) {
servers_to_add.push_back(discovered_hostname);
MySQL_HostGroups_Manager::serverDetails original_server_values = {
parseLong(r1->fields[0]), // hostgroup_id
r1->fields[1], // hostname
parseLong(discovered_port.c_str()), // port, use from topology discovery instead of from originating server
parseLong(r1->fields[3]), // gtid_port
r1->fields[4], // status, but not using it
parseLong(r1->fields[5]), // weight
parseLong(r1->fields[6]), // compression
parseLong(r1->fields[7]), // max_connections
parseLong(r1->fields[8]), // max_replication_lag
parseLong(r1->fields[9]), // use_ssl
parseLong(r1->fields[10]), // max_latency_ms
r1->fields[11] // comment, but not using it
};
hostname_values_mapping[discovered_hostname] = original_server_values;
}
}
}
}
}
if (!servers_to_add.empty()) {
int successfully_added_all_servers = MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(servers_to_add, hostname_values_mapping);
if (successfully_added_all_servers == EXIT_FAILURE) {
proxy_info("Inserting auto-discovered servers failed.\n");
} else {
proxy_info("Inserting auto-discovered servers succeeded.\n");
}
}
}
}
void * MySQL_Monitor::monitor_read_only() {
mysql_close(mysql_init(NULL));
// initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it)
@ -3269,6 +3503,8 @@ void * MySQL_Monitor::monitor_read_only() {
unsigned long long t1;
unsigned long long t2;
unsigned long long next_loop_at=0;
int topology_loop = 0;
int topology_loop_max = mysql_thread___monitor_topology_discovery_interval;
while (GloMyMon->shutdown==false && mysql_thread___monitor_enabled==true) {
@ -3287,6 +3523,18 @@ void * MySQL_Monitor::monitor_read_only() {
next_loop_at=0;
}
if (topology_loop >= topology_loop_max) {
try {
discover_topology_and_add_to_mysql_servers();
topology_loop = 0;
} catch (std::runtime_error &e) {
proxy_error("Error during topology auto-discovery: %s\n", e.what());
} catch (...) {
proxy_error("Unknown error during topology auto-discovery.\n");
}
}
topology_loop += 1;
if (t1 < next_loop_at) {
goto __sleep_monitor_read_only;
}

@ -307,6 +307,7 @@ static char * mysql_thread_variables_names[]= {
(char *)"monitor_ping_interval",
(char *)"monitor_ping_max_failures",
(char *)"monitor_ping_timeout",
(char *)"monitor_topology_discovery_interval",
(char *)"monitor_read_only_interval",
(char *)"monitor_read_only_timeout",
(char *)"monitor_read_only_max_timeout_count",
@ -822,6 +823,12 @@ th_metrics_map = std::make_tuple(
"Reached maximum ping attempts from monitor.",
metric_tags {}
),
std::make_tuple (
p_th_gauge::mysql_monitor_topology_discovery_interval,
"proxysql_mysql_monitor_topology_discovery_interval",
"How frequently a topology discovery is performed, e.g. a value of 500 means one topology discovery every 500 read-only checks ",
metric_tags {}
),
std::make_tuple (
p_th_gauge::mysql_monitor_read_only_interval,
"proxysql_mysql_monitor_read_only_interval_seconds",
@ -912,6 +919,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() {
variables.monitor_ping_interval=8000;
variables.monitor_ping_max_failures=3;
variables.monitor_ping_timeout=1000;
variables.monitor_topology_discovery_interval=1000;
variables.monitor_read_only_interval=1000;
variables.monitor_read_only_timeout=800;
variables.monitor_read_only_max_timeout_count=3;
@ -2021,6 +2029,7 @@ char ** MySQL_Threads_Handler::get_variables_list() {
VariablesPointers_int["monitor_ping_timeout"] = make_tuple(&variables.monitor_ping_timeout, 100, 600*1000, false);
VariablesPointers_int["monitor_ping_max_failures"] = make_tuple(&variables.monitor_ping_max_failures, 1, 1000*1000, false);
VariablesPointers_int["monitor_topology_discovery_interval"] = make_tuple(&variables.monitor_topology_discovery_interval, 1, 100000, false);
VariablesPointers_int["monitor_read_only_interval"] = make_tuple(&variables.monitor_read_only_interval, 100, 7*24*3600*1000, false);
VariablesPointers_int["monitor_read_only_timeout"] = make_tuple(&variables.monitor_read_only_timeout, 100, 600*1000, false);
VariablesPointers_int["monitor_read_only_max_timeout_count"] = make_tuple(&variables.monitor_read_only_max_timeout_count, 1, 1000*1000, false);
@ -3922,6 +3931,7 @@ void MySQL_Thread::refresh_variables() {
mysql_thread___monitor_ping_interval=GloMTH->get_variable_int((char *)"monitor_ping_interval");
mysql_thread___monitor_ping_max_failures=GloMTH->get_variable_int((char *)"monitor_ping_max_failures");
mysql_thread___monitor_ping_timeout=GloMTH->get_variable_int((char *)"monitor_ping_timeout");
mysql_thread___monitor_topology_discovery_interval=GloMTH->get_variable_int((char *)"monitor_topology_discovery_interval");
mysql_thread___monitor_read_only_interval=GloMTH->get_variable_int((char *)"monitor_read_only_interval");
mysql_thread___monitor_read_only_timeout=GloMTH->get_variable_int((char *)"monitor_read_only_timeout");
mysql_thread___monitor_read_only_max_timeout_count=GloMTH->get_variable_int((char *)"monitor_read_only_max_timeout_count");
@ -5201,6 +5211,7 @@ void MySQL_Threads_Handler::p_update_metrics() {
this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_enabled]->Set(this->variables.monitor_enabled);
this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_ping_timeout]->Set(this->variables.monitor_ping_timeout/1000.0);
this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_ping_max_failures]->Set(this->variables.monitor_ping_max_failures);
this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_topology_discovery_interval]->Set(this->variables.monitor_topology_discovery_interval);
this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_read_only_interval]->Set(this->variables.monitor_read_only_interval/1000.0);
this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_read_only_timeout]->Set(this->variables.monitor_read_only_timeout/1000.0);
this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_writer_is_also_reader]->Set(this->variables.monitor_writer_is_also_reader);

@ -4,6 +4,7 @@
#include <functional>
#include <sstream>
#include <algorithm>
#include <climits>
#include <fcntl.h>
#include <poll.h>
@ -451,3 +452,22 @@ std::pair<int,const char*> get_dollar_quote_error(const char* version) {
}
}
}
/**
 * @brief Parses a string into a long.
 * @details Parses a string into a long, with error checks. The whole string must be consumed by
 *  the conversion; the base is auto-detected by 'strtol' (a "0x" prefix selects hexadecimal, a
 *  leading "0" octal, anything else decimal). Resets 'errno' as part of the range check.
 * @param s The string to parse. A NULL pointer is treated as a parse failure.
 *
 * @return The parsed value of the string as a long.
 * @throws std::runtime_error If 's' is NULL, empty, contains non-numeric characters, or the value
 *  does not fit in a long.
 */
long parseLong(const char* s) {
	// strtol() must never be handed a NULL pointer (e.g. a NULL database field).
	if (s == NULL) {
		throw std::runtime_error("Could not parse long.");
	}

	errno = 0;
	char *end = NULL;
	const long val = strtol(s, &end, 0);

	const bool no_digits = (end == s);
	const bool trailing_garbage = (*end != '\0');
	const bool out_of_range = ((val == LONG_MIN || val == LONG_MAX) && errno == ERANGE);
	if (no_digits || trailing_garbage || out_of_range) {
		throw std::runtime_error("Could not parse long.");
	}

	return val;
}

Loading…
Cancel
Save