@ -1586,7 +1586,7 @@ SQLite3_result * MySQL_HostGroups_Manager::execute_query(char *query, char **err
return resultset ;
}
void MySQL_HostGroups_Manager : : CUCFT1 ( SpookyHash & myhash , bool & init , const string & TableName , const string & ColumnName ) {
void MySQL_HostGroups_Manager : : CUCFT1 ( SpookyHash & myhash , bool & init , const string & TableName , const string & ColumnName , uint64_t & raw_checksum ) {
char * error = NULL ;
int cols = 0 ;
int affected_rows = 0 ;
@ -1600,6 +1600,7 @@ void MySQL_HostGroups_Manager::CUCFT1(SpookyHash& myhash, bool& init, const stri
myhash . Init ( 19 , 3 ) ;
}
uint64_t hash1_ = resultset - > raw_checksum ( ) ;
raw_checksum = hash1_ ;
myhash . Update ( & hash1_ , sizeof ( hash1_ ) ) ;
proxy_info ( " Checksum for table %s is 0x%lX \n " , TableName . c_str ( ) , hash1_ ) ;
}
@ -1610,11 +1611,11 @@ void MySQL_HostGroups_Manager::CUCFT1(SpookyHash& myhash, bool& init, const stri
}
void MySQL_HostGroups_Manager : : commit_update_checksums_from_tables ( SpookyHash & myhash , bool & init ) {
CUCFT1 ( myhash , init , " mysql_replication_hostgroups " , " writer_hostgroup " );
CUCFT1 ( myhash , init , " mysql_group_replication_hostgroups " , " writer_hostgroup " );
CUCFT1 ( myhash , init , " mysql_galera_hostgroups " , " writer_hostgroup " );
CUCFT1 ( myhash , init , " mysql_aws_aurora_hostgroups " , " writer_hostgroup " );
CUCFT1 ( myhash , init , " mysql_hostgroup_attributes " , " hostgroup_id " );
CUCFT1 ( myhash , init , " mysql_replication_hostgroups " , " writer_hostgroup " , table_resultset_checksum [ HGM_TABLES : : MYSQL_REPLICATION_HOSTGROUPS ] );
CUCFT1 ( myhash , init , " mysql_group_replication_hostgroups " , " writer_hostgroup " , table_resultset_checksum [ HGM_TABLES : : MYSQL_GROUP_REPLICATION_HOSTGROUPS ] );
CUCFT1 ( myhash , init , " mysql_galera_hostgroups " , " writer_hostgroup " , table_resultset_checksum [ HGM_TABLES : : MYSQL_GALERA_HOSTGROUPS ] );
CUCFT1 ( myhash , init , " mysql_aws_aurora_hostgroups " , " writer_hostgroup " , table_resultset_checksum [ HGM_TABLES : : MYSQL_AWS_AURORA_HOSTGROUPS ] );
CUCFT1 ( myhash , init , " mysql_hostgroup_attributes " , " hostgroup_id " , table_resultset_checksum [ HGM_TABLES : : MYSQL_HOSTGROUP_ATTRIBUTES ] );
}
bool MySQL_HostGroups_Manager : : commit (
@ -1867,8 +1868,9 @@ bool MySQL_HostGroups_Manager::commit(
}
if ( GloAdmin & & GloAdmin - > checksum_variables . checksum_mysql_servers ) {
uint64_t hash1 = 0 , hash2 = 0 ;
//if (GloAdmin && GloAdmin->checksum_variables.checksum_mysql_servers)
{
uint64_t hash1 = 0 , hash2 = 0 ;
SpookyHash myhash ;
char buf [ 80 ] ;
bool init = false ;
@ -1919,6 +1921,9 @@ bool MySQL_HostGroups_Manager::commit(
myhash . Init ( 19 , 3 ) ;
}
uint64_t hash1_ = resultset - > raw_checksum ( ) ;
table_resultset_checksum [ HGM_TABLES : : MYSQL_SERVERS ] = hash1_ ;
myhash . Update ( & hash1_ , sizeof ( hash1_ ) ) ;
proxy_info ( " Checksum for table %s is 0x%lX \n " , " mysql_servers " , hash1_ ) ;
}
@ -1953,6 +1958,58 @@ bool MySQL_HostGroups_Manager::commit(
pthread_mutex_unlock ( & GloVars . checksum_mutex ) ;
}
// fill Hostgroup_Manager_Mapping with latest records
if ( hgsm_mysql_servers_checksum ! = table_resultset_checksum [ HGM_TABLES : : MYSQL_SERVERS ] | |
hgsm_mysql_replication_hostgroups_checksum ! = table_resultset_checksum [ HGM_TABLES : : MYSQL_REPLICATION_HOSTGROUPS ] )
{
proxy_info ( " Checksum for table 'mysql_servers': old:0x%lX new:0x%lX \n " , hgsm_mysql_servers_checksum , table_resultset_checksum [ HGM_TABLES : : MYSQL_SERVERS ] ) ;
proxy_info ( " Checksum for table 'mysql_replication_hostgroups': old:0x%lX new:0x%lX \n " , hgsm_mysql_replication_hostgroups_checksum , table_resultset_checksum [ HGM_TABLES : : MYSQL_REPLICATION_HOSTGROUPS ] ) ;
char * error = NULL ;
int cols = 0 ;
int affected_rows = 0 ;
SQLite3_result * resultset = NULL ;
const char * query = " SELECT DISTINCT hostname, port, '1' is_writer, status, reader_hostgroup, writer_hostgroup, mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=writer_hostgroup \
UNION \
SELECT DISTINCT hostname , port , ' 0 ' is_writer , status , reader_hostgroup , writer_hostgroup , mem_pointer FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id = reader_hostgroup \
ORDER BY hostname , port " ;
mydb - > execute_statement ( query , & error , & cols , & affected_rows , & resultset ) ;
hostgroup_server_mapping . clear ( ) ;
if ( resultset & & resultset - > rows_count ) {
std : : string fetched_server_id ;
HostGroup_Server_Mapping * fetched_server_mapping = NULL ;
for ( std : : vector < SQLite3_row * > : : iterator it = resultset - > rows . begin ( ) ; it ! = resultset - > rows . end ( ) ; + + it ) {
SQLite3_row * r = * it ;
const std : : string & server_id = std : : string ( r - > fields [ 0 ] ) + " ::: " + r - > fields [ 1 ] ;
if ( fetched_server_mapping = = NULL | | server_id ! = fetched_server_id ) {
fetched_server_mapping = & hostgroup_server_mapping [ server_id ] ;
fetched_server_mapping - > set_HGM ( this ) ;
fetched_server_id = server_id ;
}
HostGroup_Server_Mapping : : Node node ;
node . server_status = static_cast < MySerStatus > ( atoi ( r - > fields [ 3 ] ) ) ;
node . reader_hostgroup_id = atoi ( r - > fields [ 4 ] ) ;
node . writer_hostgroup_id = atoi ( r - > fields [ 5 ] ) ;
node . srv = reinterpret_cast < MySrvC * > ( atoll ( r - > fields [ 6 ] ) ) ;
HostGroup_Server_Mapping : : Type type = ( r - > fields [ 2 ] & & r - > fields [ 2 ] [ 0 ] = = ' 1 ' ) ? HostGroup_Server_Mapping : : Type : : WRITER : HostGroup_Server_Mapping : : Type : : READER ;
fetched_server_mapping - > add ( type , node ) ;
}
}
delete resultset ;
hgsm_mysql_servers_checksum = table_resultset_checksum [ HGM_TABLES : : MYSQL_SERVERS ] ;
hgsm_mysql_replication_hostgroups_checksum = table_resultset_checksum [ HGM_TABLES : : MYSQL_REPLICATION_HOSTGROUPS ] ;
}
ev_async_send ( gtid_ev_loop , gtid_ev_async ) ;
__sync_fetch_and_add ( & status . servers_table_version , 1 ) ;
@ -4499,6 +4556,213 @@ void MySQL_HostGroups_Manager::read_only_action(char *hostname, int port, int re
free ( query ) ;
}
void MySQL_HostGroups_Manager : : read_only_action_v2 ( const std : : list < std : : tuple < std : : string , int , int > > & mysql_servers ) {
std : : string hostname ;
int port = - 1 ;
int read_only = - 1 ;
bool update_mysql_servers_table = false ;
unsigned long long curtime1 = monotonic_time ( ) ;
wrlock ( ) ;
for ( const auto & server : mysql_servers ) {
bool is_writer = false ;
std : : tie ( hostname , port , read_only ) = server ;
const std : : string & srv_id = hostname + " ::: " + std : : to_string ( port ) ;
auto itr = hostgroup_server_mapping . find ( srv_id ) ;
if ( itr = = hostgroup_server_mapping . end ( ) ) {
assert ( 0 ) ;
}
HostGroup_Server_Mapping & host_server_mapping = itr - > second ;
const std : : vector < HostGroup_Server_Mapping : : Node > & writer_map = host_server_mapping . get ( HostGroup_Server_Mapping : : Type : : WRITER ) ;
const std : : vector < HostGroup_Server_Mapping : : Node > & reader_map = host_server_mapping . get ( HostGroup_Server_Mapping : : Type : : READER ) ;
bool removed_offline_server = false ;
for ( size_t i = 0 ; i < writer_map . size ( ) ; ) {
const HostGroup_Server_Mapping : : Node & node = writer_map [ i ] ;
// remove offline hard server node from writer map
if ( node . server_status = = MYSQL_SERVER_STATUS_OFFLINE_HARD ) {
proxy_debug ( PROXY_DEBUG_MYSQL_CONNPOOL , 5 , " Removing server %s:%d in hostgroup=%d having status='OFFLINE_HARD' \n " , hostname . c_str ( ) , port , node . writer_hostgroup_id ) ;
host_server_mapping . remove ( HostGroup_Server_Mapping : : Type : : WRITER , i ) ;
removed_offline_server = true ;
continue ;
}
i + + ;
}
is_writer = ! writer_map . empty ( ) ;
if ( read_only = = 0 ) {
if ( is_writer = = false ) {
// the server has read_only=0 (writer), but we can't find any writer,
// so we copy all reader nodes to writer
host_server_mapping . copy ( HostGroup_Server_Mapping : : Type : : WRITER , HostGroup_Server_Mapping : : Type : : READER ) ;
if ( mysql_thread___monitor_writer_is_also_reader ) {
// server is also a reader, we copy all nodes from writer to reader (previous reader nodes will be reused)
host_server_mapping . copy ( HostGroup_Server_Mapping : : Type : : READER , HostGroup_Server_Mapping : : Type : : WRITER , false ) ;
} else {
// server can only be a writer
host_server_mapping . clear ( HostGroup_Server_Mapping : : Type : : READER ) ;
}
update_mysql_servers_table = true ;
} else {
bool act = false ;
// if the server was RO=0 on the previous check then no action is needed
if ( host_server_mapping . get_readonly_flag ( ) ! = 0 ) {
// it is the first time that we detect RO on this server
act = removed_offline_server ;
if ( act = = false ) {
for ( const auto & reader_node : reader_map ) {
for ( const auto & writer_node : writer_map ) {
if ( reader_node . writer_hostgroup_id = = writer_node . writer_hostgroup_id ) {
goto __writer_found ;
}
}
act = true ;
break ;
__writer_found :
continue ;
}
}
if ( act = = false ) {
// no action required, therefore we set readonly_flag to 0
proxy_info ( " read_only_action_v2() detected RO=0 on server %s:%d for the first time after commit(), but no need to reconfigure \n " , hostname . c_str ( ) , port ) ;
host_server_mapping . set_readonly_flag ( 0 ) ;
}
} else {
// the server was already detected as RO=0
// no action required
}
if ( act = = true ) { // there are servers either missing, or with stats=OFFLINE_HARD
// copy all reader nodes to writer
host_server_mapping . copy ( HostGroup_Server_Mapping : : Type : : WRITER , HostGroup_Server_Mapping : : Type : : READER ) ;
if ( mysql_thread___monitor_writer_is_also_reader ) {
// server is also a reader, we copy all nodes from writer to reader (previous reader nodes will be reused)
host_server_mapping . copy ( HostGroup_Server_Mapping : : Type : : READER , HostGroup_Server_Mapping : : Type : : WRITER , false ) ;
} else {
// server can only be a writer
host_server_mapping . clear ( HostGroup_Server_Mapping : : Type : : READER ) ;
}
update_mysql_servers_table = true ;
}
}
} else if ( read_only = = 1 ) {
if ( is_writer ) {
// the server has read_only=1 (reader), but we find it as writer, so we copy all writer nodes to reader (previous reader nodes will be reused)
host_server_mapping . copy ( HostGroup_Server_Mapping : : Type : : READER , HostGroup_Server_Mapping : : Type : : WRITER , false ) ;
// clearing all writer nodes
host_server_mapping . clear ( HostGroup_Server_Mapping : : Type : : WRITER ) ;
update_mysql_servers_table = true ;
}
} else {
// LCOV_EXCL_START
assert ( 0 ) ;
break ;
// LCOV_EXCL_STOP
}
}
if ( update_mysql_servers_table ) {
purge_mysql_servers_table ( ) ;
proxy_debug ( PROXY_DEBUG_MYSQL_CONNPOOL , 4 , " DELETE FROM mysql_servers \n " ) ;
mydb - > execute ( " DELETE FROM mysql_servers " ) ;
generate_mysql_servers_table ( ) ;
//if (GloAdmin && GloAdmin->checksum_variables.checksum_mysql_servers)
{
char * error = NULL ;
int cols = 0 ;
int affected_rows = 0 ;
SQLite3_result * resultset = NULL ;
mydb - > execute_statement ( MYHGM_GEN_ADMIN_RUNTIME_SERVERS , & error , & cols , & affected_rows , & resultset ) ;
save_runtime_mysql_servers ( resultset ) ; // assigning runtime_mysql_servers with updated mysql server resultset
resultset = NULL ;
char * query = ( char * ) " SELECT hostgroup_id, hostname, port, gtid_port, CASE status WHEN 0 OR 1 OR 4 THEN 0 ELSE status END status, weight, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers WHERE status<>3 ORDER BY hostgroup_id, hostname, port " ;
mydb - > execute_statement ( query , & error , & cols , & affected_rows , & resultset ) ;
if ( resultset ) {
if ( resultset - > rows_count ) {
table_resultset_checksum [ HGM_TABLES : : MYSQL_SERVERS ] = resultset - > raw_checksum ( ) ;
hgsm_mysql_servers_checksum = table_resultset_checksum [ HGM_TABLES : : MYSQL_SERVERS ] ;
}
delete resultset ;
} else {
proxy_info ( " Checksum for table %s is 0x%lX \n " , " mysql_servers " , ( long unsigned int ) 0 ) ;
}
uint64_t hash = 0 , hash2 = 0 ;
SpookyHash myhash ;
myhash . Init ( 19 , 3 ) ;
hash = table_resultset_checksum [ HGM_TABLES : : MYSQL_SERVERS ] ;
myhash . Update ( & hash , sizeof ( hash ) ) ;
hash = table_resultset_checksum [ HGM_TABLES : : MYSQL_REPLICATION_HOSTGROUPS ] ;
myhash . Update ( & hash , sizeof ( hash ) ) ;
hash = table_resultset_checksum [ HGM_TABLES : : MYSQL_GROUP_REPLICATION_HOSTGROUPS ] ;
myhash . Update ( & hash , sizeof ( hash ) ) ;
hash = table_resultset_checksum [ HGM_TABLES : : MYSQL_GALERA_HOSTGROUPS ] ;
myhash . Update ( & hash , sizeof ( hash ) ) ;
hash = table_resultset_checksum [ HGM_TABLES : : MYSQL_AWS_AURORA_HOSTGROUPS ] ;
myhash . Update ( & hash , sizeof ( hash ) ) ;
hash = table_resultset_checksum [ HGM_TABLES : : MYSQL_HOSTGROUP_ATTRIBUTES ] ;
myhash . Update ( & hash , sizeof ( hash ) ) ;
myhash . Final ( & hash , & hash2 ) ;
char buf [ 80 ] ;
uint32_t d32 [ 2 ] ;
memcpy ( & d32 , & hash , sizeof ( hash ) ) ;
sprintf ( buf , " 0x%0X%0X " , d32 [ 0 ] , d32 [ 1 ] ) ;
pthread_mutex_lock ( & GloVars . checksum_mutex ) ;
GloVars . checksums_values . mysql_servers . set_checksum ( buf ) ;
GloVars . checksums_values . mysql_servers . version + + ;
//struct timespec ts;
//clock_gettime(CLOCK_REALTIME, &ts);
time_t t = time ( NULL ) ;
GloVars . checksums_values . mysql_servers . epoch = t ;
GloVars . checksums_values . updates_cnt + + ;
GloVars . generate_global_checksum ( ) ;
GloVars . epoch_version = t ;
pthread_mutex_unlock ( & GloVars . checksum_mutex ) ;
}
}
wrunlock ( ) ;
unsigned long long curtime2 = monotonic_time ( ) ;
curtime1 = curtime1 / 1000 ;
curtime2 = curtime2 / 1000 ;
proxy_info ( " MySQL_HostGroups_Manager::read_only_action_v2() locked for %llums \n " , curtime2 - curtime1 ) ;
}
// shun_and_killall
// this function is called only from MySQL_Monitor::monitor_ping()
@ -7358,3 +7622,127 @@ MySrvC* MySQL_HostGroups_Manager::find_server_in_hg(unsigned int _hid, const std
return f_server ;
}
void MySQL_HostGroups_Manager : : HostGroup_Server_Mapping : : copy ( Type dest_type , Type src_type , bool update_if_exists /*= true*/ ) {
const std : : vector < Node > & src_nodes = mapping [ src_type ] ;
if ( src_nodes . empty ( ) ) return ;
std : : vector < Node > & dest_nodes = mapping [ dest_type ] ;
std : : list < Node > append ;
for ( const auto & node : src_nodes ) {
for ( auto & parent_node : dest_nodes ) {
if ( node . reader_hostgroup_id = = parent_node . reader_hostgroup_id & &
node . writer_hostgroup_id = = parent_node . writer_hostgroup_id ) {
if ( update_if_exists ) {
MySrvC * new_srv = insert_HGM ( get_hostgroup_id ( dest_type , parent_node ) , node . srv ) ;
if ( ! new_srv ) assert ( 0 ) ;
parent_node . srv = new_srv ;
parent_node . server_status = node . server_status ;
}
goto __skip ;
}
}
append . push_back ( node ) ;
__skip :
continue ;
}
if ( append . empty ( ) ) {
return ;
}
if ( dest_nodes . capacity ( ) < ( dest_nodes . size ( ) + append . size ( ) ) )
dest_nodes . reserve ( dest_nodes . size ( ) + append . size ( ) ) ;
//dest_nodes.insert(dest_nodes.end(), append.begin(), append.end());
for ( auto & node : append ) {
MySrvC * new_srv = insert_HGM ( get_hostgroup_id ( dest_type , node ) , node . srv ) ;
if ( ! new_srv ) assert ( 0 ) ;
node . srv = new_srv ;
dest_nodes . push_back ( node ) ;
}
}
void MySQL_HostGroups_Manager : : HostGroup_Server_Mapping : : remove ( Type type , size_t index ) {
std : : vector < Node > & nodes = mapping [ type ] ;
// ensure that we're not attempting to access out of the bounds of the container.
assert ( index < nodes . size ( ) ) ;
remove_HGM ( nodes [ index ] . srv ) ;
//Swap the element with the back element, except in the case when we're the last element.
if ( index + 1 ! = nodes . size ( ) )
std : : swap ( nodes [ index ] , nodes . back ( ) ) ;
//Pop the back of the container, deleting our old element.
nodes . pop_back ( ) ;
}
void MySQL_HostGroups_Manager : : HostGroup_Server_Mapping : : clear ( Type type ) {
for ( const auto & node : mapping [ type ] ) {
remove_HGM ( node . srv ) ;
}
mapping [ type ] . clear ( ) ;
}
unsigned int MySQL_HostGroups_Manager : : HostGroup_Server_Mapping : : get_hostgroup_id ( Type type , size_t index ) const {
if ( type = = Type : : WRITER )
return mapping [ Type : : WRITER ] [ index ] . writer_hostgroup_id ;
else if ( type = = Type : : READER )
return mapping [ Type : : READER ] [ index ] . reader_hostgroup_id ;
else
assert ( 0 ) ;
}
unsigned int MySQL_HostGroups_Manager : : HostGroup_Server_Mapping : : get_hostgroup_id ( Type type , const Node & node ) const {
if ( type = = Type : : WRITER )
return node . writer_hostgroup_id ;
else if ( type = = Type : : READER )
return node . reader_hostgroup_id ;
else
assert ( 0 ) ;
}
MySrvC * MySQL_HostGroups_Manager : : HostGroup_Server_Mapping : : insert_HGM ( unsigned int hostgroup_id , const MySrvC * srv ) {
MyHGC * hostgroup_container = myHGM - > MyHGC_lookup ( hostgroup_id ) ;
if ( ! hostgroup_container )
return NULL ;
if ( GloMTH - > variables . hostgroup_manager_verbose ) {
proxy_info ( " Creating new server in HG %d : %s:%d , gtid_port=%d, weight=%d, status=%d \n " , hostgroup_id , srv - > address , srv - > port , srv - > gtid_port , srv - > weight , srv - > status ) ;
}
proxy_debug ( PROXY_DEBUG_MYSQL_CONNPOOL , 5 , " Adding new server %s:%d , weight=%d, status=%d, mem_ptr=%p into hostgroup=%d \n " , srv - > address , srv - > port , srv - > weight , srv - > status , srv , hostgroup_id ) ;
MySrvC * new_srv = new MySrvC ( srv - > address , srv - > port , srv - > gtid_port , srv - > weight , srv - > status , srv - > compression ,
srv - > max_connections , srv - > max_replication_lag , srv - > use_ssl , srv - > max_latency_us , srv - > comment ) ;
hostgroup_container - > mysrvs - > add ( new_srv ) ;
return new_srv ;
}
void MySQL_HostGroups_Manager : : HostGroup_Server_Mapping : : remove_HGM ( MySrvC * srv ) {
proxy_warning ( " Removed server at address %p, hostgroup %d, address %s port %d. Setting status OFFLINE HARD and immediately dropping all free connections. Used connections will be dropped when trying to use them \n " , ( void * ) srv , srv - > myhgc - > hid , srv - > address , srv - > port ) ;
srv - > status = MYSQL_SERVER_STATUS_OFFLINE_HARD ;
srv - > ConnectionsFree - > drop_all_connections ( ) ;
}