@ -592,6 +592,12 @@ void MySQL_Monitor_State_Data::init_async() {
task_timeout_ = mysql_thread___monitor_read_only_timeout ;
task_handler_ = & MySQL_Monitor_State_Data : : read_only_handler ;
break ;
case MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY :
query_ = QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY ;
async_state_machine_ = ASYNC_QUERY_START ;
task_timeout_ = mysql_thread___monitor_read_only_timeout ;
task_handler_ = & MySQL_Monitor_State_Data : : read_only_handler ;
break ;
# else // TEST_READONLY
case MON_READ_ONLY :
case MON_INNODB_READ_ONLY :
@ -1597,6 +1603,8 @@ void * monitor_read_only_thread(void *arg) {
mmsd - > async_exit_status = mysql_query_start ( & mmsd - > interr , mmsd - > mysql , " SELECT @@global.read_only&@@global.innodb_read_only read_only " ) ;
} else if ( mmsd - > get_task_type ( ) = = MON_READ_ONLY__OR__INNODB_READ_ONLY ) {
mmsd - > async_exit_status = mysql_query_start ( & mmsd - > interr , mmsd - > mysql , " SELECT @@global.read_only|@@global.innodb_read_only read_only " ) ;
} else if ( mmsd - > get_task_type ( ) = = MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY ) {
mmsd - > async_exit_status = mysql_query_start ( & mmsd - > interr , mmsd - > mysql , QUERY_READ_ONLY_AND_AWS_TOPOLOGY_DISCOVERY ) ;
} else { // default
mmsd - > async_exit_status = mysql_query_start ( & mmsd - > interr , mmsd - > mysql , " SELECT @@global.read_only read_only " ) ;
}
@ -3256,159 +3264,14 @@ VALGRIND_ENABLE_ERROR_REPORTING;
return ret ;
}
/**
* @ brief Discovers the topology of a server .
* @ details Discovers the topology of the server specified by hostname and port .
* The monitor user must explicitly be granted permissions to view ' mysql . rds_topology ' .
* @ param hostname Hostname of the server .
* @ param port Server port .
*
* @ return Returns a vector of ' MYSQL_ROW ' objects which contain the discovered servers .
*/
vector < MYSQL_ROW > MySQL_Monitor : : discover_topology ( const char * hostname , int port ) {
std : : unique_ptr < MySQL_Monitor_State_Data > mmsd ( new MySQL_Monitor_State_Data ( MON_CONNECT , const_cast < char * > ( hostname ) , port ) ) ;
mmsd - > mondb = monitordb ;
mmsd - > mysql = My_Conn_Pool - > get_connection ( mmsd - > hostname , mmsd - > port , mmsd . get ( ) ) ;
unsigned long long start_time = monotonic_time ( ) ;
mmsd - > t1 = start_time ;
bool read_only_success = false ;
bool crc = false ;
if ( mmsd - > mysql = = NULL ) { // we don't have a connection, let's create it
bool rc ;
rc = mmsd - > create_new_connection ( ) ;
if ( mmsd - > mysql ) {
GloMyMon - > My_Conn_Pool - > conn_register ( mmsd . get ( ) ) ;
}
crc = true ;
if ( rc = = false ) {
unsigned long long now = monotonic_time ( ) ;
char * new_error = ( char * ) malloc ( 50 + strlen ( mmsd - > mysql_error_msg ) ) ;
snprintf ( new_error , sizeof ( mmsd - > mysql_error_msg ) , " timeout on creating new connection: %s " , mmsd - > mysql_error_msg ) ;
free ( mmsd - > mysql_error_msg ) ;
mmsd - > mysql_error_msg = new_error ;
proxy_error ( " Timeout on discover_topology check for %s:%d after %lldms. Unable to create a connection. If the server is overload, increase mysql-monitor_connect_timeout. Error: %s. \n " , mmsd - > hostname , mmsd - > port , ( now - mmsd - > t1 ) / 1000 , new_error ) ;
MyHGM - > p_update_mysql_error_counter ( p_mysql_error_type : : proxysql , mmsd - > hostgroup_id , mmsd - > hostname , mmsd - > port , ER_PROXYSQL_READ_ONLY_CHECK_CONN_TIMEOUT ) ;
goto __exit_monitor_discover_topology ;
}
}
mmsd - > interr = 0 ; // reset the value
mmsd - > async_exit_status = mysql_query_start ( & mmsd - > interr , mmsd - > mysql , " SELECT * from mysql.rds_topology " ) ;
while ( mmsd - > async_exit_status ) {
const unsigned long long now = monotonic_time ( ) ;
mmsd - > async_exit_status = wait_for_mysql ( mmsd - > mysql , mmsd - > async_exit_status ) ;
if ( now > mmsd - > t1 + mysql_thread___monitor_read_only_timeout * 1000 ) {
mmsd - > mysql_error_msg = strdup ( " timeout check " ) ;
proxy_error ( " Timeout on discover_topology check for %s:%d after %lldms. If the server is overload, increase mysql-monitor_read_only_timeout. \n " , mmsd - > hostname , mmsd - > port , ( now - mmsd - > t1 ) / 1000 ) ;
MyHGM - > p_update_mysql_error_counter ( p_mysql_error_type : : proxysql , mmsd - > hostgroup_id , mmsd - > hostname , mmsd - > port , ER_PROXYSQL_READ_ONLY_CHECK_TIMEOUT ) ;
goto __exit_monitor_discover_topology ;
}
if ( mmsd - > interr ) {
// error during query
mmsd - > mysql_error_msg = strdup ( mysql_error ( mmsd - > mysql ) ) ;
MyHGM - > p_update_mysql_error_counter ( p_mysql_error_type : : proxysql , mmsd - > hostgroup_id , mmsd - > hostname , mmsd - > port , mysql_errno ( mmsd - > mysql ) ) ;
goto __exit_monitor_discover_topology ;
}
if ( ( mmsd - > async_exit_status & MYSQL_WAIT_TIMEOUT ) = = 0 ) {
mmsd - > async_exit_status = mysql_query_cont ( & mmsd - > interr , mmsd - > mysql , mmsd - > async_exit_status ) ;
}
}
if ( mmsd - > interr ) {
// error during query
mmsd - > mysql_error_msg = strdup ( mysql_error ( mmsd - > mysql ) ) ;
MyHGM - > p_update_mysql_error_counter ( p_mysql_error_type : : proxysql , mmsd - > hostgroup_id , mmsd - > hostname , mmsd - > port , mysql_errno ( mmsd - > mysql ) ) ;
goto __exit_monitor_discover_topology ;
}
mmsd - > async_exit_status = mysql_store_result_start ( & mmsd - > result , mmsd - > mysql ) ;
while ( mmsd - > async_exit_status & & ( ( mmsd - > async_exit_status & MYSQL_WAIT_TIMEOUT ) = = 0 ) ) {
mmsd - > async_exit_status = wait_for_mysql ( mmsd - > mysql , mmsd - > async_exit_status ) ;
const unsigned long long now = monotonic_time ( ) ;
if ( now > mmsd - > t1 + mysql_thread___monitor_read_only_timeout * 1000 ) {
mmsd - > mysql_error_msg = strdup ( " timeout check " ) ;
proxy_error ( " Timeout on discover_topology check for %s:%d after %lldms. If the server is overload, increase mysql-monitor_read_only_timeout. \n " , mmsd - > hostname , mmsd - > port , ( now - mmsd - > t1 ) / 1000 ) ;
MyHGM - > p_update_mysql_error_counter ( p_mysql_error_type : : proxysql , mmsd - > hostgroup_id , mmsd - > hostname , mmsd - > port , ER_PROXYSQL_READ_ONLY_CHECK_TIMEOUT ) ;
goto __exit_monitor_discover_topology ;
}
if ( ( mmsd - > async_exit_status & MYSQL_WAIT_TIMEOUT ) = = 0 ) {
mmsd - > async_exit_status = mysql_store_result_cont ( & mmsd - > result , mmsd - > mysql , mmsd - > async_exit_status ) ;
}
}
if ( mmsd - > interr ) { // ping failed
mmsd - > mysql_error_msg = strdup ( mysql_error ( mmsd - > mysql ) ) ;
MyHGM - > p_update_mysql_error_counter ( p_mysql_error_type : : proxysql , mmsd - > hostgroup_id , mmsd - > hostname , mmsd - > port , mysql_errno ( mmsd - > mysql ) ) ;
}
__exit_monitor_discover_topology :
if ( mmsd - > mysql ) {
// if we reached here we didn't put the connection back
if ( mmsd - > mysql_error_msg ) {
MyHGM - > p_update_mysql_error_counter ( p_mysql_error_type : : proxysql , mmsd - > hostgroup_id , mmsd - > hostname , mmsd - > port , mysql_errno ( mmsd - > mysql ) ) ;
GloMyMon - > My_Conn_Pool - > conn_unregister ( mmsd . get ( ) ) ;
mysql_close ( mmsd - > mysql ) ; // if we reached here we should destroy it
mmsd - > mysql = NULL ;
} else {
if ( crc ) {
bool rc = mmsd - > set_wait_timeout ( ) ;
if ( rc ) {
GloMyMon - > My_Conn_Pool - > put_connection ( mmsd - > hostname , mmsd - > port , mmsd - > mysql ) ;
} else {
MyHGM - > p_update_mysql_error_counter ( p_mysql_error_type : : proxysql , mmsd - > hostgroup_id , mmsd - > hostname , mmsd - > port , mysql_errno ( mmsd - > mysql ) ) ;
GloMyMon - > My_Conn_Pool - > conn_unregister ( mmsd . get ( ) ) ;
mysql_close ( mmsd - > mysql ) ; // set_wait_timeout failed
}
mmsd - > mysql = NULL ;
} else { // really not sure how we reached here, drop it
MyHGM - > p_update_mysql_error_counter ( p_mysql_error_type : : proxysql , mmsd - > hostgroup_id , mmsd - > hostname , mmsd - > port , mysql_errno ( mmsd - > mysql ) ) ;
GloMyMon - > My_Conn_Pool - > conn_unregister ( mmsd . get ( ) ) ;
mysql_close ( mmsd - > mysql ) ;
mmsd - > mysql = NULL ;
}
}
}
// Process the output of the query, if any
vector < MYSQL_ROW > discovered_rows ;
if ( mmsd - > result ) {
MYSQL_FIELD * fields = mysql_fetch_fields ( mmsd - > result ) ;
int num_fields = mysql_num_fields ( mmsd - > result ) ;
for ( int i = 0 ; i < num_fields ; i + + ) {
MYSQL_ROW curr_row = mysql_fetch_row ( mmsd - > result ) ;
string discovered_hostname = curr_row [ 1 ] ;
string discovered_port = curr_row [ 2 ] ;
if ( strcmp ( hostname , curr_row [ 1 ] ) ! = 0 ) {
discovered_rows . push_back ( curr_row ) ;
}
}
mysql_free_result ( mmsd - > result ) ;
mmsd - > result = NULL ;
} else {
proxy_info ( " Unable to query for topology. \n " ) ;
}
return discovered_rows ;
}
/**
* @ brief Discovers the topology of a server and adds the discovered servers to ' mysql_servers ' .
* @ details Helper method which calls the ' discover_topology ' method as well as
* ' MySQL_HostGroups_Manager : : add_discovered_servers_to_mysql_servers_and_replication_hostgroups ' in order to discover topology
* and then add it to ' mysql_servers ' .
* @ brief Processes the discovered servers to eventually add them to ' runtime_mysql_servers ' .
* @ details This method takes a vector of discovered servers , compares them against the existing servers , and adds the new servers to ' runtime_mysql_servers ' with the
* values from their originating server .
* @ param originating_server_hostname A string which denotes the hostname of the originating server , from which the discovered servers were queried and found .
* @ param discovered_servers A vector of servers discovered when querying the cluster ' s topology .
*/
void MySQL_Monitor : : discover_topology_and_add_to_mysql_servers( ) {
void MySQL_Monitor : : process_discovered_topology ( const std : : string & originating_server_hostname , vector < MYSQL_ROW > discovered_servers ) {
char * error = NULL ;
int cols = 0 ;
int affected_rows = 0 ;
@ -3421,65 +3284,59 @@ void MySQL_Monitor::discover_topology_and_add_to_mysql_servers() {
if ( error ) {
proxy_error ( " Error on %s : %s \n " , query , error ) ;
} else {
set < string > existing_servers ;
vector < string > servers_to_add ;
unordered_map < string , MySQL_HostGroups_Manager : : serverDetails > hostname_values_mapping ;
set < string > existing_runtime_servers ;
unordered_map < string , MySQL_HostGroups_Manager : : serverDetails > new_server_values_mapping ;
// Do an initial loop through query results to keep track of existing server hostnames
// Do an initial loop through the query results to keep track of existing runtime server hostnames
for ( std : : vector < SQLite3_row * > : : iterator it = runtime_mysql_servers - > rows . begin ( ) ; it ! = runtime_mysql_servers - > rows . end ( ) ; it + + ) {
SQLite3_row * r1 = * it ;
string current_hostname = r1 - > fields [ 1 ] ;
if ( std : : find ( existing_servers . begin ( ) , existing_servers . end ( ) , current_hostname ) = = existing_servers . end ( ) ) {
existing_servers . insert ( current_hostname ) ;
}
}
// Discover topology for each server in runtime_mysql_servers that have an aws endpoint
for ( std : : vector < SQLite3_row * > : : iterator it = runtime_mysql_servers - > rows . begin ( ) ; it ! = runtime_mysql_servers - > rows . end ( ) ; it + + ) {
SQLite3_row * r1 = * it ;
long int hostgroup = parseLong ( r1 - > fields [ 0 ] ) ;
string current_hostname = r1 - > fields [ 1 ] ;
long int port = parseLong ( r1 - > fields [ 2 ] ) ;
if ( current_hostname . find ( AWS_ENDPOINT_SUFFIX_STRING ) ! = std : : string : : npos ) {
vector < MYSQL_ROW > discovered_servers = GloMyMon - > discover_topology ( current_hostname . c_str ( ) , port ) ;
if ( ! discovered_servers . empty ( ) ) {
for ( MYSQL_ROW s : discovered_servers ) {
vector < string > value_vector ;
string discovered_id = s [ 0 ] ;
string discovered_hostname = s [ 1 ] ;
string discovered_port = s [ 2 ] ;
// Add discovered servers that don't already exist in runtime_mysql_servers
if ( std : : find ( existing_servers . begin ( ) , existing_servers . end ( ) , discovered_hostname ) = = existing_servers . end ( ) ) {
servers_to_add . push_back ( discovered_hostname ) ;
MySQL_HostGroups_Manager : : serverDetails original_server_values = {
parseLong ( r1 - > fields [ 0 ] ) , // hostgroup_id
r1 - > fields [ 1 ] , // hostname
parseLong ( discovered_port . c_str ( ) ) , // port, use from topology discovery instead of from originating server
parseLong ( r1 - > fields [ 3 ] ) , // gtid_port
r1 - > fields [ 4 ] , // status, but not using it
parseLong ( r1 - > fields [ 5 ] ) , // weight
parseLong ( r1 - > fields [ 6 ] ) , // compression
parseLong ( r1 - > fields [ 7 ] ) , // max_connections
parseLong ( r1 - > fields [ 8 ] ) , // max_replication_lag
parseLong ( r1 - > fields [ 9 ] ) , // use_ssl
parseLong ( r1 - > fields [ 10 ] ) , // max_latency_ms
r1 - > fields [ 11 ] // comment, but not using it
} ;
hostname_values_mapping [ discovered_hostname ] = original_server_values ;
}
string current_runtime_hostname = r1 - > fields [ 1 ] ;
if ( std : : find ( existing_runtime_servers . begin ( ) , existing_runtime_servers . end ( ) , current_runtime_hostname ) = = existing_runtime_servers . end ( ) ) {
existing_runtime_servers . insert ( current_runtime_hostname ) ;
}
}
// Loop through discovered servers and process the ones we plan to add
for ( MYSQL_ROW s : discovered_servers ) {
string current_discovered_id = s [ 1 ] ;
string current_discovered_hostname = s [ 2 ] ;
string current_discovered_port = s [ 3 ] ;
// We only add the discovered server if it is not the originating server and it does not already exist in 'runtime_mysql_servers' and it is not already saved to be added
bool already_exists = std : : find ( existing_runtime_servers . begin ( ) , existing_runtime_servers . end ( ) , current_discovered_hostname ) ! = existing_runtime_servers . end ( ) ;
bool already_saved = new_server_values_mapping . find ( current_discovered_hostname ) ! = new_server_values_mapping . end ( ) ;
if ( current_discovered_hostname ! = originating_server_hostname & & ! already_exists & & ! already_saved ) {
// Search for the originating server's values and store it
for ( std : : vector < SQLite3_row * > : : iterator it = runtime_mysql_servers - > rows . begin ( ) ; it ! = runtime_mysql_servers - > rows . end ( ) ; it + + ) {
SQLite3_row * r1 = * it ;
string current_runtime_hostname = r1 - > fields [ 1 ] ;
if ( current_runtime_hostname = = originating_server_hostname ) {
MySQL_HostGroups_Manager : : serverDetails originating_server_values ;
originating_server_values . hostgroup_id = parseLong ( r1 - > fields [ 0 ] ) ;
originating_server_values . originating_hostname = current_runtime_hostname ;
originating_server_values . port = parseLong ( current_discovered_port . c_str ( ) ) ;
originating_server_values . gtid_port = parseLong ( r1 - > fields [ 3 ] ) ;
originating_server_values . status = r1 - > fields [ 4 ] ; // not used
originating_server_values . weight = parseLong ( r1 - > fields [ 5 ] ) ;
originating_server_values . compression = parseLong ( r1 - > fields [ 6 ] ) ;
originating_server_values . max_connections = parseLong ( r1 - > fields [ 7 ] ) ;
originating_server_values . max_replication_lag = parseLong ( r1 - > fields [ 8 ] ) ;
originating_server_values . use_ssl = parseLong ( r1 - > fields [ 9 ] ) ;
originating_server_values . max_latency_ms = parseLong ( r1 - > fields [ 10 ] ) ;
originating_server_values . comment = r1 - > fields [ 11 ] ; // not used
new_server_values_mapping [ current_discovered_hostname ] = originating_server_values ;
}
}
}
}
if ( ! servers_to_add . empty ( ) ) {
int successfully_added_all_servers = MyHGM - > add_discovered_servers_to_mysql_servers_and_replication_hostgroups ( servers_to_add , hostname_values_mapping ) ;
// Add the new servers if any
if ( ! new_server_values_mapping . empty ( ) ) {
int successfully_added_all_servers = MyHGM - > add_discovered_servers_to_mysql_servers_and_replication_hostgroups ( new_server_values_mapping ) ;
if ( successfully_added_all_servers = = EXIT_FAILURE ) {
proxy_info ( " Inserting auto-discovered servers failed. \n " ) ;
@ -3504,9 +3361,10 @@ void * MySQL_Monitor::monitor_read_only() {
unsigned long long t2 ;
unsigned long long next_loop_at = 0 ;
int topology_loop = 0 ;
int topology_loop_max = mysql_thread___monitor_ topology_discovery_interval;
int topology_loop_max = mysql_thread___monitor_ aws_rds_ topology_discovery_interval;
while ( GloMyMon - > shutdown = = false & & mysql_thread___monitor_enabled = = true ) {
bool do_discovery_check = false ;
unsigned int glover ;
char * error = NULL ;
@ -3523,17 +3381,6 @@ void * MySQL_Monitor::monitor_read_only() {
next_loop_at = 0 ;
}
if ( topology_loop > = topology_loop_max ) {
try {
discover_topology_and_add_to_mysql_servers ( ) ;
topology_loop = 0 ;
} catch ( std : : runtime_error & e ) {
proxy_error ( " Error during topology auto-discovery: %s \n " , e . what ( ) ) ;
} catch ( . . . ) {
proxy_error ( " Unknown error during topology auto-discovery. \n " ) ;
}
}
topology_loop + = 1 ;
if ( t1 < next_loop_at ) {
goto __sleep_monitor_read_only ;
@ -3551,8 +3398,14 @@ void * MySQL_Monitor::monitor_read_only() {
goto __end_monitor_read_only_loop ;
}
if ( topology_loop > = topology_loop_max ) {
do_discovery_check = true ;
topology_loop = 0 ;
}
topology_loop + = 1 ;
// resultset must be initialized before calling monitor_read_only_async
monitor_read_only_async ( resultset ) ;
monitor_read_only_async ( resultset , do_discovery_check );
if ( shutdown ) return NULL ;
__end_monitor_read_only_loop :
@ -7417,7 +7270,7 @@ bool MySQL_Monitor::monitor_read_only_process_ready_tasks(const std::vector<MySQ
std : : list < read_only_server_t > mysql_servers ;
for ( auto & mmsd : mmsds ) {
string originating_server_hostname = mmsd - > hostname ;
const auto task_result = mmsd - > get_task_result ( ) ;
assert ( task_result ! = MySQL_Monitor_State_Data_Task_Result : : TASK_RESULT_PENDING ) ;
@ -7487,6 +7340,44 @@ VALGRIND_ENABLE_ERROR_REPORTING;
}
rc = ( * proxy_sqlite3_bind_int64 ) ( statement , 5 , read_only ) ; ASSERT_SQLITE_OK ( rc , mmsd - > mondb ) ;
} else if ( fields & & mmsd - > get_task_type ( ) = = MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY ) {
// Process the read_only field as above and store the first server
vector < MYSQL_ROW > discovered_servers ;
for ( k = 0 ; k < num_fields ; k + + ) {
if ( strcmp ( ( char * ) " read_only " , ( char * ) fields [ k ] . name ) = = 0 ) {
j = k ;
}
}
if ( j > - 1 ) {
MYSQL_ROW row = mysql_fetch_row ( mmsd - > result ) ;
if ( row ) {
discovered_servers . push_back ( row ) ;
VALGRIND_DISABLE_ERROR_REPORTING ;
if ( row [ j ] ) {
if ( ! strcmp ( row [ j ] , " 0 " ) | | ! strcasecmp ( row [ j ] , " OFF " ) )
read_only = 0 ;
}
VALGRIND_ENABLE_ERROR_REPORTING ;
}
}
// Store the remaining servers
int num_rows = mysql_num_rows ( mmsd - > result ) ;
for ( int i = 1 ; i < num_rows ; i + + ) {
MYSQL_ROW row = mysql_fetch_row ( mmsd - > result ) ;
discovered_servers . push_back ( row ) ;
}
// Process the discovered servers and add them to 'runtime_mysql_servers'
if ( ! discovered_servers . empty ( ) ) {
try {
process_discovered_topology ( originating_server_hostname , discovered_servers ) ;
} catch ( std : : runtime_error & e ) {
proxy_error ( " Error during topology auto-discovery: %s \n " , e . what ( ) ) ;
} catch ( . . . ) {
proxy_error ( " Unknown error during topology auto-discovery. \n " ) ;
}
}
} else {
proxy_error ( " mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994 \n " , mmsd - > hostname , mmsd - > port ) ;
rc = ( * proxy_sqlite3_bind_null ) ( statement , 5 ) ; ASSERT_SQLITE_OK ( rc , mmsd - > mondb ) ;
@ -7547,7 +7438,7 @@ VALGRIND_ENABLE_ERROR_REPORTING;
return true ;
}
void MySQL_Monitor : : monitor_read_only_async ( SQLite3_result * resultset ) {
void MySQL_Monitor : : monitor_read_only_async ( SQLite3_result * resultset , bool do_discovery_check ) {
assert ( resultset ) ;
std : : vector < std : : unique_ptr < MySQL_Monitor_State_Data > > mmsds ;
@ -7570,6 +7461,12 @@ void MySQL_Monitor::monitor_read_only_async(SQLite3_result* resultset) {
} else if ( strcasecmp ( r - > fields [ 3 ] , ( char * ) " read_only|innodb_read_only " ) = = 0 ) {
task_type = MON_READ_ONLY__OR__INNODB_READ_ONLY ;
}
// Change task type if it's time to do discovery check. Only for aws rds endpoints
string hostname = r - > fields [ 0 ] ;
if ( do_discovery_check & & hostname . find ( AWS_ENDPOINT_SUFFIX_STRING ) ! = std : : string : : npos ) {
task_type = MON_READ_ONLY__AND__AWS_RDS_TOPOLOGY_DISCOVERY ;
}
}
std : : unique_ptr < MySQL_Monitor_State_Data > mmsd (