Remove redundant exception guards, remove parseLong, add checksum computation after adding a new server, and other small changes based on feedback

pull/4406/head
anphucbui 2 years ago
parent e0db02df27
commit bbb6176ebf

@ -1092,8 +1092,7 @@ class MySQL_HostGroups_Manager {
void set_server_current_latency_us(char *hostname, int port, unsigned int _current_latency_us);
unsigned long long Get_Memory_Stats();
int add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector<tuple<string, int, int>> new_servers);
void rebuild_hostname_hostgroup_mapping();
void add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector<tuple<string, int, int>>& new_servers);
void update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error);
void update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error);

@ -447,7 +447,7 @@ class MySQL_Monitor {
static bool update_dns_cache_from_mysql_conn(const MYSQL* mysql);
static void trigger_dns_cache_update();
void process_discovered_topology(const std::string& originating_server_hostname, vector<MYSQL_ROW> discovered_servers, int reader_hostgroup);
void process_discovered_topology(const std::string& originating_server_hostname, const vector<MYSQL_ROW>& discovered_servers, int reader_hostgroup);
private:
std::vector<table_def_t *> *tables_defs_monitor;

@ -258,6 +258,4 @@ void close_all_non_term_fd(std::vector<int> excludeFDs);
*/
std::pair<int,const char*> get_dollar_quote_error(const char* version);
long parseLong(const char* s);
#endif

@ -8254,133 +8254,92 @@ void MySQL_HostGroups_Manager::HostGroup_Server_Mapping::remove_HGM(MySrvC* srv)
srv->ConnectionsFree->drop_all_connections();
}
MySQLServers_SslParams * MySQL_HostGroups_Manager::get_Server_SSL_Params(char *hostname, int port, char *username) {
string MapKey = string(hostname) + string(rand_del) + to_string(port) + string(rand_del) + string(username);
std::lock_guard<std::mutex> lock(Servers_SSL_Params_map_mutex);
auto it = Servers_SSL_Params_map.find(MapKey);
if (it != Servers_SSL_Params_map.end()) {
MySQLServers_SslParams * MSSP = new MySQLServers_SslParams(it->second);
return MSSP;
} else {
MapKey = string(hostname) + string(rand_del) + to_string(port) + string(rand_del) + ""; // search for empty username
it = Servers_SSL_Params_map.find(MapKey);
if (it != Servers_SSL_Params_map.end()) {
MySQLServers_SslParams * MSSP = new MySQLServers_SslParams(it->second);
return MSSP;
}
}
return NULL;
}
/**
* @brief Updates replication hostgroups by adding autodiscovered mysql servers.
* @details Adds each server from 'new_servers' to the 'runtime_mysql_servers' table.
* We then rebuild the 'mysql_servers' table as well as the internal 'hostname_hostgroup_mapping'.
* @param new_servers A vector of tuples where each tuple contains the values needed to add each new server.
*
* @return Returns EXIT_FAILURE code on failure and EXIT_SUCCESS code on success.
*/
int MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector<tuple<string, int, int>> new_servers) {
int exit_code = EXIT_SUCCESS;
bool added_new_server = false;
void MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector<tuple<string, int, int>>& new_servers) {
int added_new_server;
wrlock();
try {
for (tuple<string, int, int> s : new_servers) {
string host = std::get<0>(s);
uint16_t port = std::get<1>(s);
long int hostgroup_id = std::get<2>(s);
// Add the discovered server with default values
MySrvC* mysrvc = new MySrvC(
const_cast<char*>(host.c_str()), port, 0, 1, MYSQL_SERVER_STATUS_ONLINE, 0, -1, 0, -1, 0, const_cast<char*>("Discovered endpoint")
);
add(mysrvc, hostgroup_id);
proxy_info(
"Adding new discovered server %s:%d with: hostgroup=%d, weight=%ld, max_connections=%ld, use_ssl=%d\n",
host.c_str(), port, hostgroup_id, mysrvc->weight, mysrvc->max_connections, mysrvc->use_ssl
);
added_new_server = true;
}
// Add the discovered server with default values
for (tuple<string, int, int> s : new_servers) {
string host = std::get<0>(s);
uint16_t port = std::get<1>(s);
long int hostgroup_id = std::get<2>(s);
srv_info_t srv_info { host.c_str(), port, "AWS RDS" };
srv_opts_t srv_opts { -1, -1, -1 };
if (added_new_server) {
purge_mysql_servers_table();
mydb->execute("DELETE FROM mysql_servers");
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n");
generate_mysql_servers_table();
update_table_mysql_servers_for_monitor(false);
rebuild_hostname_hostgroup_mapping();
}
} catch (...) {
exit_code = EXIT_FAILURE;
added_new_server = create_new_server_in_hg(hostgroup_id, srv_info, srv_opts);
}
wrunlock();
return exit_code;
}
/**
* @brief Rebuilds the 'hostname_hostgroup_mapping'
* @details Rebuilds the internal 'hostname_hostgroup_mapping' assuming new data has been entered
* and calculates new checksums for 'mysql_servers' and 'mysql_replication_hostgroups'.
*/
void MySQL_HostGroups_Manager::rebuild_hostname_hostgroup_mapping() {
proxy_info("Rebuilding 'Hostgroup_Manager_Mapping' due to checksums change - mysql_servers { old: 0x%lX, new: 0x%lX }, mysql_replication_hostgroups { old:0x%lX, new:0x%lX }\n",
hgsm_mysql_servers_checksum, table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS],
hgsm_mysql_replication_hostgroups_checksum, table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]);
char* error = NULL;
int cols = 0;
int affected_rows = 0;
SQLite3_result* resultset = NULL;
const char* query = "SELECT DISTINCT hostname, port, '1' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=writer_hostgroup WHERE status<>3 \
UNION \
SELECT DISTINCT hostname, port, '0' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=reader_hostgroup WHERE status<>3 \
ORDER BY hostname, port";
mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset);
hostgroup_server_mapping.clear();
if (resultset && resultset->rows_count) {
std::string fetched_server_id;
HostGroup_Server_Mapping* fetched_server_mapping = NULL;
// If servers were added, perform necessary updates to internal structures
if (added_new_server > -1) {
purge_mysql_servers_table();
mydb->execute("DELETE FROM mysql_servers");
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n");
generate_mysql_servers_table();
for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); it++) {
SQLite3_row *r = *it;
// Update the global checksums after 'mysql_servers' regeneration
{
unique_ptr<SQLite3_result> resultset { get_admin_runtime_mysql_servers(mydb) };
string mysrvs_checksum { get_checksum_from_hash(resultset ? resultset->raw_checksum() : 0) };
save_runtime_mysql_servers(resultset.release());
const std::string& server_id = std::string(r->fields[0]) + ":::" + r->fields[1];
// Update the runtime_mysql_servers checksum with the new checksum
uint64_t raw_checksum = this->runtime_mysql_servers ? this->runtime_mysql_servers->raw_checksum() : 0;
table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] = raw_checksum;
if (fetched_server_mapping == NULL || server_id != fetched_server_id) {
auto itr = hostgroup_server_mapping.find(server_id);
// This is required for preserving coherence in the checksums, otherwise they would be inconsistent with `commit` generated checksums
SpookyHash rep_hgs_hash {};
bool init = false;
uint64_t servers_v2_hash = table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS_V2];
if (itr == hostgroup_server_mapping.end()) {
std::unique_ptr<HostGroup_Server_Mapping> server_mapping(new HostGroup_Server_Mapping(this));
fetched_server_mapping = server_mapping.get();
hostgroup_server_mapping.insert( std::pair<std::string,std::unique_ptr<MySQL_HostGroups_Manager::HostGroup_Server_Mapping>> {
server_id, std::move(server_mapping)
} );
} else {
fetched_server_mapping = itr->second.get();
if (servers_v2_hash) {
if (init == false) {
init = true;
rep_hgs_hash.Init(19, 3);
}
fetched_server_id = server_id;
rep_hgs_hash.Update(&servers_v2_hash, sizeof(servers_v2_hash));
}
HostGroup_Server_Mapping::Node node;
node.reader_hostgroup_id = atoi(r->fields[4]);
node.writer_hostgroup_id = atoi(r->fields[5]);
node.srv = reinterpret_cast<MySrvC*>(atoll(r->fields[6]));
HostGroup_Server_Mapping::Type type = (r->fields[2] && r->fields[2][0] == '1') ? HostGroup_Server_Mapping::Type::WRITER : HostGroup_Server_Mapping::Type::READER;
fetched_server_mapping->add(type, node);
}
}
delete resultset;
CUCFT1(
rep_hgs_hash, init, "mysql_replication_hostgroups", "writer_hostgroup",
table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS]
);
hgsm_mysql_servers_checksum = table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS];
hgsm_mysql_replication_hostgroups_checksum = table_resultset_checksum[HGM_TABLES::MYSQL_REPLICATION_HOSTGROUPS];
}
proxy_info("Checksum for table %s is %s\n", "mysql_servers", mysrvs_checksum.c_str());
MySQLServers_SslParams * MySQL_HostGroups_Manager::get_Server_SSL_Params(char *hostname, int port, char *username) {
string MapKey = string(hostname) + string(rand_del) + to_string(port) + string(rand_del) + string(username);
std::lock_guard<std::mutex> lock(Servers_SSL_Params_map_mutex);
auto it = Servers_SSL_Params_map.find(MapKey);
if (it != Servers_SSL_Params_map.end()) {
MySQLServers_SslParams * MSSP = new MySQLServers_SslParams(it->second);
return MSSP;
} else {
MapKey = string(hostname) + string(rand_del) + to_string(port) + string(rand_del) + ""; // search for empty username
it = Servers_SSL_Params_map.find(MapKey);
if (it != Servers_SSL_Params_map.end()) {
MySQLServers_SslParams * MSSP = new MySQLServers_SslParams(it->second);
return MSSP;
pthread_mutex_lock(&GloVars.checksum_mutex);
update_glovars_mysql_servers_checksum(mysrvs_checksum);
pthread_mutex_unlock(&GloVars.checksum_mutex);
}
update_table_mysql_servers_for_monitor(false);
update_hostgroup_manager_mappings();
}
return NULL;
wrunlock();
}

@ -3295,7 +3295,7 @@ VALGRIND_ENABLE_ERROR_REPORTING;
* @param discovered_servers A vector of servers discovered when querying the cluster's topology.
* @param reader_hostgroup Reader hostgroup to which we will add the discovered servers.
*/
void MySQL_Monitor::process_discovered_topology(const std::string& originating_server_hostname, vector<MYSQL_ROW> discovered_servers, int reader_hostgroup) {
void MySQL_Monitor::process_discovered_topology(const std::string& originating_server_hostname, const vector<MYSQL_ROW>& discovered_servers, int reader_hostgroup) {
char *error = NULL;
int cols = 0;
int affected_rows = 0;
@ -3323,10 +3323,18 @@ void MySQL_Monitor::process_discovered_topology(const std::string& originating_s
// Loop through discovered servers and process the ones we haven't saved yet
for (MYSQL_ROW s : discovered_servers) {
string current_discovered_hostname = s[2];
string current_discovered_port = s[3];
string current_discovered_port_string = s[3];
int current_discovered_port_int;
try {
current_discovered_port_int = stoi(s[3]);
} catch (...) {
proxy_error("Unable to parse the port value during topology discovery: [%s]. Terminating discovery early.", current_discovered_port_string);
return;
}
if (find(saved_hostnames.begin(), saved_hostnames.end(), current_discovered_hostname) == saved_hostnames.end()) {
tuple<string, int, int> new_server(current_discovered_hostname, parseLong(current_discovered_port.c_str()), reader_hostgroup);
tuple<string, int, int> new_server(current_discovered_hostname, current_discovered_port_int, reader_hostgroup);
new_servers.push_back(new_server);
saved_hostnames.push_back(current_discovered_hostname);
}
@ -3334,13 +3342,7 @@ void MySQL_Monitor::process_discovered_topology(const std::string& originating_s
// Add the new servers if any
if (!new_servers.empty()) {
int successfully_added_all_servers = MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(new_servers);
if (successfully_added_all_servers == EXIT_FAILURE) {
proxy_info("Inserting auto-discovered servers failed.\n");
} else {
proxy_info("Inserting auto-discovered servers succeeded.\n");
}
MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(new_servers);
}
}
}
@ -7372,13 +7374,7 @@ VALGRIND_ENABLE_ERROR_REPORTING;
// Process the discovered servers and add them to 'runtime_mysql_servers'
if (!discovered_servers.empty()) {
try {
process_discovered_topology(originating_server_hostname, discovered_servers, mmsd->reader_hostgroup);
} catch (std::runtime_error &e) {
proxy_error("Error during topology auto-discovery: %s\n", e.what());
} catch (...) {
proxy_error("Unknown error during topology auto-discovery.\n");
}
process_discovered_topology(originating_server_hostname, discovered_servers, mmsd->reader_hostgroup);
}
} else {
proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port);

@ -504,22 +504,3 @@ std::pair<int,const char*> get_dollar_quote_error(const char* version) {
}
}
}
/**
* @brief Parses a string into a long.
* @details Parses a string into a long, with error checks. Throws an exception if parse fails.
* @param s The string to parse.
*
* @return The parsed value of the string as a long.
*/
long parseLong(const char* s) {
errno = 0;
char *temp;
long val = strtol(s, &temp, 0);
if (temp == s || *temp != '\0' || ((val == LONG_MIN || val == LONG_MAX) && errno == ERANGE)) {
throw std::runtime_error("Could not parse long.");
}
return val;
}

Loading…
Cancel
Save