diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index ea75310aa..c83b44961 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -816,7 +816,7 @@ class MySQL_HostGroups_Manager { struct serverDetails { long int hostgroup_id; - string hostname; + string originating_hostname; uint16_t port; uint16_t gtid_port; string status; @@ -829,7 +829,7 @@ class MySQL_HostGroups_Manager { string comment; }; - int add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector servers_to_add, unordered_map hostname_values_mapping); + int add_discovered_servers_to_mysql_servers_and_replication_hostgroups(unordered_map new_server_values_mapping); void rebuild_hostname_hostgroup_mapping(); void update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error); diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index 1d5eb3bc8..667fc26dd 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -57,6 +57,7 @@ struct cmp_str { #define N_L_ASE 16 #define AWS_ENDPOINT_SUFFIX_STRING "rds.amazonaws.com" +#define QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY "SELECT @@global.read_only read_only, id, endpoint, port from mysql.rds_topology" /* @@ -191,7 +192,8 @@ enum MySQL_Monitor_State_Data_Task_Type { MON_GROUP_REPLICATION, MON_REPLICATION_LAG, MON_GALERA, - MON_AWS_AURORA + MON_AWS_AURORA, + MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY }; enum class MySQL_Monitor_State_Data_Task_Result { @@ -420,8 +422,7 @@ class MySQL_Monitor { static bool update_dns_cache_from_mysql_conn(const MYSQL* mysql); static void trigger_dns_cache_update(); - vector discover_topology(const char* hostname, int port); - void discover_topology_and_add_to_mysql_servers(); + void process_discovered_topology(const std::string& originating_server_hostname, vector discovered_servers); private: std::vector *tables_defs_monitor; @@ -533,7 +534,7 @@ private: * Note: Calling init_async is mandatory before executing tasks asynchronously. */ void monitor_ping_async(SQLite3_result* resultset); - void monitor_read_only_async(SQLite3_result* resultset); + void monitor_read_only_async(SQLite3_result* resultset, bool do_discovery_check); void monitor_replication_lag_async(SQLite3_result* resultset); void monitor_group_replication_async(); void monitor_galera_async(); diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 20e39a504..66d8f740d 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -305,7 +305,7 @@ struct p_th_gauge { mysql_monitor_ping_interval, mysql_monitor_ping_timeout, mysql_monitor_ping_max_failures, - mysql_monitor_topology_discovery_interval, + mysql_monitor_aws_rds_topology_discovery_interval, mysql_monitor_read_only_interval, mysql_monitor_read_only_timeout, mysql_monitor_writer_is_also_reader, @@ -387,8 +387,8 @@ class MySQL_Threads_Handler int monitor_ping_max_failures; //! Monitor ping timeout. Unit: 'ms'. int monitor_ping_timeout; - //! Monitor topology discovery interval. Unit: 'one discovery check per X monitor_read_only checks'. - int monitor_topology_discovery_interval; + //! Monitor aws rds topology discovery interval. Unit: 'one discovery check per X monitor_read_only checks'. + int monitor_aws_rds_topology_discovery_interval; //! Monitor read only timeout. Unit: 'ms'. int monitor_read_only_interval; //! Monitor read only timeout. Unit: 'ms'. diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 56e55682c..81c72d65e 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -899,7 +899,7 @@ __thread int mysql_thread___monitor_connect_timeout; __thread int mysql_thread___monitor_ping_interval; __thread int mysql_thread___monitor_ping_max_failures; __thread int mysql_thread___monitor_ping_timeout; -__thread int mysql_thread___monitor_topology_discovery_interval; +__thread int mysql_thread___monitor_aws_rds_topology_discovery_interval; __thread int mysql_thread___monitor_read_only_interval; __thread int mysql_thread___monitor_read_only_timeout; __thread int mysql_thread___monitor_read_only_max_timeout_count; @@ -1069,7 +1069,7 @@ extern __thread int mysql_thread___monitor_connect_timeout; extern __thread int mysql_thread___monitor_ping_interval; extern __thread int mysql_thread___monitor_ping_max_failures; extern __thread int mysql_thread___monitor_ping_timeout; -extern __thread int mysql_thread___monitor_topology_discovery_interval; +extern __thread int mysql_thread___monitor_aws_rds_topology_discovery_interval; extern __thread int mysql_thread___monitor_read_only_interval; extern __thread int mysql_thread___monitor_read_only_timeout; extern __thread int mysql_thread___monitor_read_only_max_timeout_count; diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 90e899443..51a46548e 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -7922,29 +7922,36 @@ void MySQL_HostGroups_Manager::HostGroup_Server_Mapping::remove_HGM(MySrvC* srv) /** * @brief Updates replication hostgroups by adding autodiscovered mysql servers. -* @details Adds each server from 'servers_to_add' to the 'runtime_mysql_servers' table. +* @details Adds each server from 'new_server_values_mapping' to the 'runtime_mysql_servers' table. * We then rebuild the 'mysql_servers' table as well as the internal 'hostname_hostgroup_mapping'. -* @param servers_to_add A vector containing the strings representing the hostnames of servers to add to 'mysql_servers'. -* @param hostname_values_mapping A mapping containing the hostname of the discovered server mapped to the metadata of server from which it was discovered, stored in a 'serverDetails' struct. +* @param new_server_values_mapping A mapping containing the hostname of the discovered server mapped to the metadata of server from which it was discovered, stored in a 'serverDetails' struct. * * @return Returns EXIT_FAILURE code on failure and EXIT_SUCCESS code on success. */ -int MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups(vector servers_to_add, unordered_map hostname_values_mapping) { +int MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups(unordered_map new_server_values_mapping) { int exit_code = EXIT_SUCCESS; + bool added_new_server = false; wrlock(); try { - for (string host : servers_to_add) { - long int hostgroup_id = hostname_values_mapping[host].hostgroup_id; - uint16_t port = hostname_values_mapping[host].port; - - uint16_t gtid_port = hostname_values_mapping[host].gtid_port; - int64_t weight = hostname_values_mapping[host].weight; - unsigned int compression = hostname_values_mapping[host].compression; - int64_t max_connections = hostname_values_mapping[host].max_connections; - unsigned int max_replication_lag = hostname_values_mapping[host].max_replication_lag; - int32_t use_ssl = hostname_values_mapping[host].use_ssl; - unsigned int max_latency_ms = hostname_values_mapping[host].max_latency_ms; + for (const auto &s : new_server_values_mapping) { + if (new_server_values_mapping.find(s.first) == new_server_values_mapping.end()) { + continue; + } + + string host = s.first; + MySQL_HostGroups_Manager::serverDetails new_server_values = new_server_values_mapping[host]; + + long int hostgroup_id = new_server_values.hostgroup_id; + uint16_t port = new_server_values.port; + + uint16_t gtid_port = new_server_values.gtid_port; + int64_t weight = new_server_values.weight; + unsigned int compression = new_server_values.compression; + int64_t max_connections = new_server_values.max_connections; + unsigned int max_replication_lag = new_server_values.max_replication_lag; + int32_t use_ssl = new_server_values.use_ssl; + unsigned int max_latency_ms = new_server_values.max_latency_ms; MySrvC* mysrvc = new MySrvC( const_cast(host.c_str()), port, gtid_port, weight, MYSQL_SERVER_STATUS_ONLINE, @@ -7956,15 +7963,18 @@ int MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replic "Adding new discovered server %s:%d with: hostgroup=%d, weight=%ld, max_connections=%ld, use_ssl=%d\n", host.c_str(), port, hostgroup_id, mysrvc->weight, mysrvc->max_connections, mysrvc->use_ssl ); - } - purge_mysql_servers_table(); - mydb->execute("DELETE FROM mysql_servers"); - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n"); - generate_mysql_servers_table(); - update_table_mysql_servers_for_monitor(false); - rebuild_hostname_hostgroup_mapping(); + added_new_server = true; + } + if (added_new_server) { + purge_mysql_servers_table(); + mydb->execute("DELETE FROM mysql_servers"); + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n"); + generate_mysql_servers_table(); + update_table_mysql_servers_for_monitor(false); + rebuild_hostname_hostgroup_mapping(); + } } catch (...) { exit_code = EXIT_FAILURE; } diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 536d312db..79c6988d9 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -592,6 +592,12 @@ void MySQL_Monitor_State_Data::init_async() { task_timeout_ = mysql_thread___monitor_read_only_timeout; task_handler_ = &MySQL_Monitor_State_Data::read_only_handler; break; + case MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY: + query_ = QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY; + async_state_machine_ = ASYNC_QUERY_START; + task_timeout_ = mysql_thread___monitor_read_only_timeout; + task_handler_ = &MySQL_Monitor_State_Data::read_only_handler; + break; #else // TEST_READONLY case MON_READ_ONLY: case MON_INNODB_READ_ONLY: @@ -1597,6 +1603,8 @@ void * monitor_read_only_thread(void *arg) { mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT @@global.read_only&@@global.innodb_read_only read_only"); } else if (mmsd->get_task_type() == MON_READ_ONLY__OR__INNODB_READ_ONLY) { mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT @@global.read_only|@@global.innodb_read_only read_only"); + } else if (mmsd->get_task_type() == MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY) { + mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql, QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY); } else { // default mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT @@global.read_only read_only"); } @@ -3256,159 +3264,14 @@ VALGRIND_ENABLE_ERROR_REPORTING; return ret; } - -/** -* @brief Discovers the topology of a server. -* @details Discovers the topology of the server specified by hostname and port. -* The monitor user must explicitly be granted permissions to view 'mysql.rds_topology'. -* @param hostname Hostname of the server. -* @param port Server port. -* -* @return Returns a vector of 'MYSQL_ROW' objects which contain the discovered servers. -*/ -vector MySQL_Monitor::discover_topology(const char* hostname, int port) { - std::unique_ptr mmsd(new MySQL_Monitor_State_Data(MON_CONNECT, const_cast (hostname), port)); - mmsd->mondb = monitordb; - mmsd->mysql = My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get()); - - unsigned long long start_time = monotonic_time(); - mmsd->t1=start_time; - - bool read_only_success = false; - bool crc = false; - if (mmsd->mysql == NULL) { // we don't have a connection, let's create it - bool rc; - rc = mmsd->create_new_connection(); - if (mmsd->mysql) { - GloMyMon->My_Conn_Pool->conn_register(mmsd.get()); - } - crc = true; - if (rc == false) { - unsigned long long now = monotonic_time(); - char *new_error = (char *) malloc(50 + strlen(mmsd->mysql_error_msg)); - snprintf(new_error, sizeof(mmsd->mysql_error_msg), "timeout on creating new connection: %s", mmsd->mysql_error_msg); - free(mmsd->mysql_error_msg); - mmsd->mysql_error_msg = new_error; - proxy_error("Timeout on discover_topology check for %s:%d after %lldms. Unable to create a connection. If the server is overload, increase mysql-monitor_connect_timeout. Error: %s.\n", mmsd->hostname, mmsd->port, (now-mmsd->t1)/1000, new_error); - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_READ_ONLY_CHECK_CONN_TIMEOUT); - goto __exit_monitor_discover_topology; - } - } - - mmsd->interr = 0; // reset the value - mmsd->async_exit_status = mysql_query_start(&mmsd->interr,mmsd->mysql, "SELECT * from mysql.rds_topology"); - while (mmsd->async_exit_status) { - const unsigned long long now = monotonic_time(); - mmsd->async_exit_status = wait_for_mysql(mmsd->mysql, mmsd->async_exit_status); - - if (now > mmsd->t1 + mysql_thread___monitor_read_only_timeout * 1000) { - mmsd->mysql_error_msg = strdup("timeout check"); - proxy_error("Timeout on discover_topology check for %s:%d after %lldms. If the server is overload, increase mysql-monitor_read_only_timeout.\n", mmsd->hostname, mmsd->port, (now-mmsd->t1)/1000); - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_READ_ONLY_CHECK_TIMEOUT); - goto __exit_monitor_discover_topology; - } - - if (mmsd->interr) { - // error during query - mmsd->mysql_error_msg = strdup(mysql_error(mmsd->mysql)); - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); - goto __exit_monitor_discover_topology; - } - - if ((mmsd->async_exit_status & MYSQL_WAIT_TIMEOUT) == 0) { - mmsd->async_exit_status = mysql_query_cont(&mmsd->interr, mmsd->mysql, mmsd->async_exit_status); - } - } - - if (mmsd->interr) { - // error during query - mmsd->mysql_error_msg = strdup(mysql_error(mmsd->mysql)); - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); - goto __exit_monitor_discover_topology; - } - - mmsd->async_exit_status = mysql_store_result_start(&mmsd->result,mmsd->mysql); - while (mmsd->async_exit_status && ((mmsd->async_exit_status & MYSQL_WAIT_TIMEOUT) == 0)) { - mmsd->async_exit_status = wait_for_mysql(mmsd->mysql, mmsd->async_exit_status); - const unsigned long long now = monotonic_time(); - - if (now > mmsd->t1 + mysql_thread___monitor_read_only_timeout * 1000) { - mmsd->mysql_error_msg = strdup("timeout check"); - proxy_error("Timeout on discover_topology check for %s:%d after %lldms. If the server is overload, increase mysql-monitor_read_only_timeout.\n", mmsd->hostname, mmsd->port, (now-mmsd->t1)/1000); - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_READ_ONLY_CHECK_TIMEOUT); - goto __exit_monitor_discover_topology; - } - - if ((mmsd->async_exit_status & MYSQL_WAIT_TIMEOUT) == 0) { - mmsd->async_exit_status = mysql_store_result_cont(&mmsd->result, mmsd->mysql, mmsd->async_exit_status); - } - } - - if (mmsd->interr) { // ping failed - mmsd->mysql_error_msg = strdup(mysql_error(mmsd->mysql)); - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); - } - -__exit_monitor_discover_topology: - if (mmsd->mysql) { - // if we reached here we didn't put the connection back - if (mmsd->mysql_error_msg) { - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); - GloMyMon->My_Conn_Pool->conn_unregister(mmsd.get()); - mysql_close(mmsd->mysql); // if we reached here we should destroy it - mmsd->mysql = NULL; - } else { - if (crc) { - bool rc = mmsd->set_wait_timeout(); - if (rc) { - GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname, mmsd->port, mmsd->mysql); - } else { - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); - GloMyMon->My_Conn_Pool->conn_unregister(mmsd.get()); - mysql_close(mmsd->mysql); // set_wait_timeout failed - } - mmsd->mysql = NULL; - } else { // really not sure how we reached here, drop it - MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)); - GloMyMon->My_Conn_Pool->conn_unregister(mmsd.get()); - mysql_close(mmsd->mysql); - mmsd->mysql = NULL; - } - } - } - - // Process the output of the query, if any - vector discovered_rows; - if (mmsd->result) { - MYSQL_FIELD *fields = mysql_fetch_fields(mmsd->result); - int num_fields = mysql_num_fields(mmsd->result); - - for (int i = 0; i < num_fields; i++) { - MYSQL_ROW curr_row = mysql_fetch_row(mmsd->result); - string discovered_hostname = curr_row[1]; - string discovered_port = curr_row[2]; - - if (strcmp(hostname, curr_row[1]) != 0) { - discovered_rows.push_back(curr_row); - } - } - - mysql_free_result(mmsd->result); - mmsd->result = NULL; - } else { - proxy_info("Unable to query for topology.\n"); - } - - return discovered_rows; -} - /** -* @brief Discovers the topology of a server and adds the discovered servers to 'mysql_servers'. -* @details Helper method which calls the 'discover_topology' method as well as -* 'MySQL_HostGroups_Manager::add_discovered_servers_to_mysql_servers_and_replication_hostgroups' in order to discover topology -* and then add it to 'mysql_servers'. +* @brief Processes the discovered servers to eventually add them to 'runtime_mysql_servers'. +* @details This method takes a vector of discovered servers, compares them against the existing servers, and adds the new servers to 'runtime_mysql_servers' with the +* values from their originating server. +* @param originating_server_hostname A string which denotes the hostname of the originating server, from which the discovered servers were queried and found. +* @param discovered_servers A vector of servers discovered when querying the cluster's topology. */ -void MySQL_Monitor::discover_topology_and_add_to_mysql_servers() { +void MySQL_Monitor::process_discovered_topology(const std::string& originating_server_hostname, vector discovered_servers) { char *error = NULL; int cols = 0; int affected_rows = 0; @@ -3421,65 +3284,59 @@ void MySQL_Monitor::discover_topology_and_add_to_mysql_servers() { if (error) { proxy_error("Error on %s : %s\n", query, error); } else { - set existing_servers; - vector servers_to_add; - unordered_map hostname_values_mapping; + set existing_runtime_servers; + unordered_map new_server_values_mapping; - // Do an initial loop through query results to keep track of existing server hostnames + // Do an initial loop through the query results to keep track of existing runtime server hostnames for (std::vector::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) { SQLite3_row *r1 = *it; - string current_hostname = r1->fields[1]; - if (std::find(existing_servers.begin(), existing_servers.end(), current_hostname) == existing_servers.end()) { - existing_servers.insert(current_hostname); - } - } - - // Discover topology for each server in runtime_mysql_servers that have an aws endpoint - for (std::vector::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) { - SQLite3_row *r1 = *it; - - long int hostgroup = parseLong(r1->fields[0]); - string current_hostname = r1->fields[1]; - long int port = parseLong(r1->fields[2]); - - if (current_hostname.find(AWS_ENDPOINT_SUFFIX_STRING) != std::string::npos) { - vector discovered_servers = GloMyMon->discover_topology(current_hostname.c_str(), port); - - if (!discovered_servers.empty()) { - for (MYSQL_ROW s: discovered_servers) { - vector value_vector; - string discovered_id = s[0]; - string discovered_hostname = s[1]; - string discovered_port = s[2]; - - // Add discovered servers that don't already exist in runtime_mysql_servers - if (std::find(existing_servers.begin(), existing_servers.end(), discovered_hostname) == existing_servers.end()) { - servers_to_add.push_back(discovered_hostname); - - MySQL_HostGroups_Manager::serverDetails original_server_values = { - parseLong(r1->fields[0]), // hostgroup_id - r1->fields[1], // hostname - parseLong(discovered_port.c_str()), // port, use from topology discovery instead of from originating server - parseLong(r1->fields[3]), // gtid_port - r1->fields[4], // status, but not using it - parseLong(r1->fields[5]), // weight - parseLong(r1->fields[6]), // compression - parseLong(r1->fields[7]), // max_connections - parseLong(r1->fields[8]), // max_replication_lag - parseLong(r1->fields[9]), // use_ssl - parseLong(r1->fields[10]), // max_latency_ms - r1->fields[11] // comment, but not using it - }; - - hostname_values_mapping[discovered_hostname] = original_server_values; - } + string current_runtime_hostname = r1->fields[1]; + if (std::find(existing_runtime_servers.begin(), existing_runtime_servers.end(), current_runtime_hostname) == existing_runtime_servers.end()) { + existing_runtime_servers.insert(current_runtime_hostname); + } + } + + // Loop through discovered servers and process the ones we plan to add + for (MYSQL_ROW s : discovered_servers) { + string current_discovered_id = s[1]; + string current_discovered_hostname = s[2]; + string current_discovered_port = s[3]; + + // We only add the discovered server if it is not the originating server and it does not already exist in 'runtime_mysql_servers' and it is not already saved to be added + bool already_exists = std::find(existing_runtime_servers.begin(), existing_runtime_servers.end(), current_discovered_hostname) != existing_runtime_servers.end(); + bool already_saved = new_server_values_mapping.find(current_discovered_hostname) != new_server_values_mapping.end(); + if (current_discovered_hostname != originating_server_hostname && !already_exists && !already_saved) { + // Search for the originating server's values and store it + for (std::vector::iterator it = runtime_mysql_servers->rows.begin(); it != runtime_mysql_servers->rows.end(); it++) { + SQLite3_row *r1 = *it; + + string current_runtime_hostname = r1->fields[1]; + if (current_runtime_hostname == originating_server_hostname) { + MySQL_HostGroups_Manager::serverDetails originating_server_values; + + originating_server_values.hostgroup_id = parseLong(r1->fields[0]); + originating_server_values.originating_hostname = current_runtime_hostname; + originating_server_values.port = parseLong(current_discovered_port.c_str()); + originating_server_values.gtid_port = parseLong(r1->fields[3]); + originating_server_values.status = r1->fields[4]; // not used + originating_server_values.weight = parseLong(r1->fields[5]); + originating_server_values.compression = parseLong(r1->fields[6]); + originating_server_values.max_connections = parseLong(r1->fields[7]); + originating_server_values.max_replication_lag = parseLong(r1->fields[8]); + originating_server_values.use_ssl = parseLong(r1->fields[9]); + originating_server_values.max_latency_ms = parseLong(r1->fields[10]); + originating_server_values.comment = r1->fields[11]; // not used + + new_server_values_mapping[current_discovered_hostname] = originating_server_values; } } } } - if (!servers_to_add.empty()) { - int successfully_added_all_servers = MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(servers_to_add, hostname_values_mapping); + + // Add the new servers if any + if (!new_server_values_mapping.empty()) { + int successfully_added_all_servers = MyHGM->add_discovered_servers_to_mysql_servers_and_replication_hostgroups(new_server_values_mapping); if (successfully_added_all_servers == EXIT_FAILURE) { proxy_info("Inserting auto-discovered servers failed.\n"); @@ -3504,9 +3361,10 @@ void * MySQL_Monitor::monitor_read_only() { unsigned long long t2; unsigned long long next_loop_at=0; int topology_loop = 0; - int topology_loop_max = mysql_thread___monitor_topology_discovery_interval; + int topology_loop_max = mysql_thread___monitor_aws_rds_topology_discovery_interval; while (GloMyMon->shutdown==false && mysql_thread___monitor_enabled==true) { + bool do_discovery_check = false; unsigned int glover; char *error=NULL; @@ -3523,17 +3381,6 @@ void * MySQL_Monitor::monitor_read_only() { next_loop_at=0; } - if (topology_loop >= topology_loop_max) { - try { - discover_topology_and_add_to_mysql_servers(); - topology_loop = 0; - } catch (std::runtime_error &e) { - proxy_error("Error during topology auto-discovery: %s\n", e.what()); - } catch (...) { - proxy_error("Unknown error during topology auto-discovery.\n"); - } - } - topology_loop += 1; if (t1 < next_loop_at) { goto __sleep_monitor_read_only; @@ -3551,8 +3398,14 @@ void * MySQL_Monitor::monitor_read_only() { goto __end_monitor_read_only_loop; } + if (topology_loop >= topology_loop_max) { + do_discovery_check = true; + topology_loop = 0; + } + topology_loop += 1; + // resultset must be initialized before calling monitor_read_only_async - monitor_read_only_async(resultset); + monitor_read_only_async(resultset, do_discovery_check); if (shutdown) return NULL; __end_monitor_read_only_loop: @@ -7417,7 +7270,7 @@ bool MySQL_Monitor::monitor_read_only_process_ready_tasks(const std::vector mysql_servers; for (auto& mmsd : mmsds) { - + string originating_server_hostname = mmsd->hostname; const auto task_result = mmsd->get_task_result(); assert(task_result != MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_PENDING); @@ -7487,6 +7340,44 @@ VALGRIND_ENABLE_ERROR_REPORTING; } rc = (*proxy_sqlite3_bind_int64)(statement, 5, read_only); ASSERT_SQLITE_OK(rc, mmsd->mondb); + } else if (fields && mmsd->get_task_type() == MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY) { + // Process the read_only field as above and store the first server + vector discovered_servers; + for (k = 0; k < num_fields; k++) { + if (strcmp((char*)"read_only", (char*)fields[k].name) == 0) { + j = k; + } + } + if (j > -1) { + MYSQL_ROW row = mysql_fetch_row(mmsd->result); + if (row) { + discovered_servers.push_back(row); +VALGRIND_DISABLE_ERROR_REPORTING; + if (row[j]) { + if (!strcmp(row[j], "0") || !strcasecmp(row[j], "OFF")) + read_only = 0; + } +VALGRIND_ENABLE_ERROR_REPORTING; + } + } + + // Store the remaining servers + int num_rows = mysql_num_rows(mmsd->result); + for (int i = 1; i < num_rows; i++) { + MYSQL_ROW row = mysql_fetch_row(mmsd->result); + discovered_servers.push_back(row); + } + + // Process the discovered servers and add them to 'runtime_mysql_servers' + if (!discovered_servers.empty()) { + try { + process_discovered_topology(originating_server_hostname, discovered_servers); + } catch (std::runtime_error &e) { + proxy_error("Error during topology auto-discovery: %s\n", e.what()); + } catch (...) { + proxy_error("Unknown error during topology auto-discovery.\n"); + } + } } else { proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port); rc = (*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb); @@ -7547,7 +7438,7 @@ VALGRIND_ENABLE_ERROR_REPORTING; return true; } -void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset) { +void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset, bool do_discovery_check) { assert(resultset); std::vector> mmsds; @@ -7570,6 +7461,12 @@ void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset) { } else if (strcasecmp(r->fields[3], (char*)"read_only|innodb_read_only") == 0) { task_type = MON_READ_ONLY__OR__INNODB_READ_ONLY; } + + // Change task type if it's time to do discovery check. Only for aws rds endpoints + string hostname = r->fields[0]; + if (do_discovery_check && hostname.find(AWS_ENDPOINT_SUFFIX_STRING) != std::string::npos) { + task_type = MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY; + } } std::unique_ptr mmsd( diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 1efbbfb19..544414d43 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -307,7 +307,7 @@ static char * mysql_thread_variables_names[]= { (char *)"monitor_ping_interval", (char *)"monitor_ping_max_failures", (char *)"monitor_ping_timeout", - (char *)"monitor_topology_discovery_interval", + (char *)"monitor_aws_rds_topology_discovery_interval", (char *)"monitor_read_only_interval", (char *)"monitor_read_only_timeout", (char *)"monitor_read_only_max_timeout_count", @@ -824,8 +824,8 @@ th_metrics_map = std::make_tuple( metric_tags {} ), std::make_tuple ( - p_th_gauge::mysql_monitor_topology_discovery_interval, - "proxysql_mysql_monitor_topology_discovery_interval", + p_th_gauge::mysql_monitor_aws_rds_topology_discovery_interval, + "proxysql_mysql_monitor_aws_rds_topology_discovery_interval", "How frequently a topology discovery is performed, e.g. a value of 500 means one topology discovery every 500 read-only checks ", metric_tags {} ), @@ -919,7 +919,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.monitor_ping_interval=8000; variables.monitor_ping_max_failures=3; variables.monitor_ping_timeout=1000; - variables.monitor_topology_discovery_interval=1000; + variables.monitor_aws_rds_topology_discovery_interval=1000; variables.monitor_read_only_interval=1000; variables.monitor_read_only_timeout=800; variables.monitor_read_only_max_timeout_count=3; @@ -2029,7 +2029,7 @@ char ** MySQL_Threads_Handler::get_variables_list() { VariablesPointers_int["monitor_ping_timeout"] = make_tuple(&variables.monitor_ping_timeout, 100, 600*1000, false); VariablesPointers_int["monitor_ping_max_failures"] = make_tuple(&variables.monitor_ping_max_failures, 1, 1000*1000, false); - VariablesPointers_int["monitor_topology_discovery_interval"] = make_tuple(&variables.monitor_topology_discovery_interval, 1, 100000, false); + VariablesPointers_int["monitor_aws_rds_topology_discovery_interval"] = make_tuple(&variables.monitor_aws_rds_topology_discovery_interval, 1, 100000, false); VariablesPointers_int["monitor_read_only_interval"] = make_tuple(&variables.monitor_read_only_interval, 100, 7*24*3600*1000, false); VariablesPointers_int["monitor_read_only_timeout"] = make_tuple(&variables.monitor_read_only_timeout, 100, 600*1000, false); VariablesPointers_int["monitor_read_only_max_timeout_count"] = make_tuple(&variables.monitor_read_only_max_timeout_count, 1, 1000*1000, false); @@ -3931,7 +3931,7 @@ void MySQL_Thread::refresh_variables() { mysql_thread___monitor_ping_interval=GloMTH->get_variable_int((char *)"monitor_ping_interval"); mysql_thread___monitor_ping_max_failures=GloMTH->get_variable_int((char *)"monitor_ping_max_failures"); mysql_thread___monitor_ping_timeout=GloMTH->get_variable_int((char *)"monitor_ping_timeout"); - mysql_thread___monitor_topology_discovery_interval=GloMTH->get_variable_int((char *)"monitor_topology_discovery_interval"); + mysql_thread___monitor_aws_rds_topology_discovery_interval=GloMTH->get_variable_int((char *)"monitor_aws_rds_topology_discovery_interval"); mysql_thread___monitor_read_only_interval=GloMTH->get_variable_int((char *)"monitor_read_only_interval"); mysql_thread___monitor_read_only_timeout=GloMTH->get_variable_int((char *)"monitor_read_only_timeout"); mysql_thread___monitor_read_only_max_timeout_count=GloMTH->get_variable_int((char *)"monitor_read_only_max_timeout_count"); @@ -5211,7 +5211,7 @@ void MySQL_Threads_Handler::p_update_metrics() { this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_enabled]->Set(this->variables.monitor_enabled); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_ping_timeout]->Set(this->variables.monitor_ping_timeout/1000.0); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_ping_max_failures]->Set(this->variables.monitor_ping_max_failures); - this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_topology_discovery_interval]->Set(this->variables.monitor_topology_discovery_interval); + this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_aws_rds_topology_discovery_interval]->Set(this->variables.monitor_aws_rds_topology_discovery_interval); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_read_only_interval]->Set(this->variables.monitor_read_only_interval/1000.0); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_read_only_timeout]->Set(this->variables.monitor_read_only_timeout/1000.0); this->status_variables.p_gauge_array[p_th_gauge::mysql_monitor_writer_is_also_reader]->Set(this->variables.monitor_writer_is_also_reader);