@ -316,6 +316,15 @@ void * monitor_read_only_pthread(void *arg) {
return NULL ;
}
/*
 * pthread entry point for the group replication monitor.
 *
 * Unless the build defines NOJEM, it first writes `false` to the mallctl()
 * setting named by the literal below, then runs
 * GloMyMon->monitor_group_replication() on this thread.
 *
 * arg is not used. Always returns NULL; the value returned by
 * monitor_group_replication() is discarded.
 */
void *monitor_group_replication_pthread(void *arg) {
#ifndef NOJEM
	// NOTE(review): the key is written with a leading and a trailing blank;
	// confirm that is intended, since mallctl() names are presumably matched exactly.
	bool tcache_enabled = false;
	mallctl(" thread.tcache.enabled ", NULL, NULL, &tcache_enabled, sizeof(tcache_enabled));
#endif
	GloMyMon->monitor_group_replication();
	return NULL;
}
void * monitor_replication_lag_pthread ( void * arg ) {
# ifndef NOJEM
bool cache = false ;
@ -332,6 +341,7 @@ MySQL_Monitor::MySQL_Monitor() {
My_Conn_Pool = new MySQL_Monitor_Connection_Pool ( ) ;
pthread_mutex_init ( & group_replication_mutex , NULL ) ;
Group_Replication_Hosts_resultset = NULL ;
shutdown = false ;
monitor_enabled = true ; // default
@ -374,6 +384,10 @@ MySQL_Monitor::~MySQL_Monitor() {
delete monitordb ;
delete admindb ;
delete My_Conn_Pool ;
if ( Group_Replication_Hosts_resultset ) {
delete Group_Replication_Hosts_resultset ;
Group_Replication_Hosts_resultset = NULL ;
}
} ;
@ -768,6 +782,180 @@ __fast_exit_monitor_read_only_thread:
return NULL ;
}
void * monitor_group_replication_thread ( void * arg ) {
MySQL_Monitor_State_Data * mmsd = ( MySQL_Monitor_State_Data * ) arg ;
MySQL_Thread * mysql_thr = new MySQL_Thread ( ) ;
mysql_thr - > curtime = monotonic_time ( ) ;
mysql_thr - > refresh_variables ( ) ;
if ( ! GloMTH ) return NULL ; // quick exit during shutdown/restart
mmsd - > mysql = GloMyMon - > My_Conn_Pool - > get_connection ( mmsd - > hostname , mmsd - > port ) ;
unsigned long long start_time = mysql_thr - > curtime ;
mmsd - > t1 = start_time ;
bool crc = false ;
if ( mmsd - > mysql = = NULL ) { // we don't have a connection, let's create it
bool rc ;
rc = mmsd - > create_new_connection ( ) ;
crc = true ;
if ( rc = = false ) {
goto __fast_exit_monitor_group_replication_thread ;
}
}
mmsd - > t1 = monotonic_time ( ) ;
//async_exit_status=mysql_change_user_start(&ret_bool, mysql,"msandbox2","msandbox2","information_schema");
//mmsd->async_exit_status=mysql_ping_start(&mmsd->interr,mmsd->mysql);
mmsd - > async_exit_status = mysql_query_start ( & mmsd - > interr , mmsd - > mysql , " SELECT viable_candidate,read_only,transactions_behind FROM sys.gr_member_routing_candidate_status " ) ;
while ( mmsd - > async_exit_status ) {
mmsd - > async_exit_status = wait_for_mysql ( mmsd - > mysql , mmsd - > async_exit_status ) ;
unsigned long long now = monotonic_time ( ) ;
if ( now > mmsd - > t1 + mysql_thread___monitor_groupreplication_healthcheck_timeout * 1000 ) {
mmsd - > mysql_error_msg = strdup ( " timeout check " ) ;
proxy_error ( " Timeout on group replication health check for %s:%d after %lldms. If the server is overload, increase mysql-monitor_groupreplication_healthcheck_timeout. Assuming read_only=1 \n " , mmsd - > hostname , mmsd - > port , ( now - mmsd - > t1 ) / 1000 ) ;
goto __exit_monitor_group_replication_thread ;
}
if ( GloMyMon - > shutdown = = true ) {
goto __fast_exit_monitor_group_replication_thread ; // exit immediately
}
if ( ( mmsd - > async_exit_status & MYSQL_WAIT_TIMEOUT ) = = 0 ) {
mmsd - > async_exit_status = mysql_query_cont ( & mmsd - > interr , mmsd - > mysql , mmsd - > async_exit_status ) ;
}
}
mmsd - > async_exit_status = mysql_store_result_start ( & mmsd - > result , mmsd - > mysql ) ;
while ( mmsd - > async_exit_status ) {
mmsd - > async_exit_status = wait_for_mysql ( mmsd - > mysql , mmsd - > async_exit_status ) ;
unsigned long long now = monotonic_time ( ) ;
if ( now > mmsd - > t1 + mysql_thread___monitor_groupreplication_healthcheck_timeout * 1000 ) {
mmsd - > mysql_error_msg = strdup ( " timeout check " ) ;
proxy_error ( " Timeout on group replication health check for %s:%d after %lldms. If the server is overload, increase mysql-monitor_groupreplication_healthcheck_timeout. Assuming read_only=1 \n " , mmsd - > hostname , mmsd - > port , ( now - mmsd - > t1 ) / 1000 ) ;
goto __exit_monitor_group_replication_thread ;
}
if ( GloMyMon - > shutdown = = true ) {
goto __fast_exit_monitor_group_replication_thread ; // exit immediately
}
if ( ( mmsd - > async_exit_status & MYSQL_WAIT_TIMEOUT ) = = 0 ) {
mmsd - > async_exit_status = mysql_store_result_cont ( & mmsd - > result , mmsd - > mysql , mmsd - > async_exit_status ) ;
}
}
if ( mmsd - > interr ) { // ping failed
mmsd - > mysql_error_msg = strdup ( mysql_error ( mmsd - > mysql ) ) ;
}
__exit_monitor_group_replication_thread :
mmsd - > t2 = monotonic_time ( ) ;
{
// TODO : complete this
char buf [ 128 ] ;
char * s = NULL ;
int l = strlen ( mmsd - > hostname ) ;
if ( l < 110 ) {
s = buf ;
} else {
s = ( char * ) malloc ( l + 16 ) ;
}
sprintf ( s , " %s:%d " , mmsd - > hostname , mmsd - > port ) ;
pthread_mutex_lock ( & GloMyMon - > group_replication_mutex ) ;
auto it =
// TODO : complete this
pthread_mutex_unlock ( & GloMyMon - > group_replication_mutex ) ;
if ( l < 110 ) {
} else {
free ( s ) ;
}
/*
sqlite3_stmt * statement = NULL ;
sqlite3 * mondb = mmsd - > mondb - > get_db ( ) ;
int rc ;
char * query = NULL ;
query = ( char * ) " INSERT OR REPLACE INTO mysql_server_read_only_log VALUES (?1 , ?2 , ?3 , ?4 , ?5 , ?6) " ;
rc = sqlite3_prepare_v2 ( mondb , query , - 1 , & statement , 0 ) ;
assert ( rc = = SQLITE_OK ) ;
int read_only = 1 ; // as a safety mechanism , read_only=1 is the default
rc = sqlite3_bind_text ( statement , 1 , mmsd - > hostname , - 1 , SQLITE_TRANSIENT ) ; assert ( rc = = SQLITE_OK ) ;
rc = sqlite3_bind_int ( statement , 2 , mmsd - > port ) ; assert ( rc = = SQLITE_OK ) ;
unsigned long long time_now = realtime_time ( ) ;
time_now = time_now - ( mmsd - > t2 - start_time ) ;
rc = sqlite3_bind_int64 ( statement , 3 , time_now ) ; assert ( rc = = SQLITE_OK ) ;
rc = sqlite3_bind_int64 ( statement , 4 , ( mmsd - > mysql_error_msg ? 0 : mmsd - > t2 - mmsd - > t1 ) ) ; assert ( rc = = SQLITE_OK ) ;
if ( mmsd - > result ) {
int num_fields = 0 ;
int k = 0 ;
MYSQL_FIELD * fields = NULL ;
int j = - 1 ;
num_fields = mysql_num_fields ( mmsd - > result ) ;
fields = mysql_fetch_fields ( mmsd - > result ) ;
for ( k = 0 ; k < num_fields ; k + + ) {
//if (strcmp("VARIABLE_NAME", fields[k].name)==0) {
if ( strcmp ( ( char * ) " Value " , ( char * ) fields [ k ] . name ) = = 0 ) {
j = k ;
}
}
if ( j > - 1 ) {
MYSQL_ROW row = mysql_fetch_row ( mmsd - > result ) ;
if ( row ) {
if ( row [ j ] ) {
if ( ! strcmp ( row [ j ] , " 0 " ) | | ! strcasecmp ( row [ j ] , " OFF " ) )
read_only = 0 ;
}
}
}
// if (repl_lag>=0) {
rc = sqlite3_bind_int64 ( statement , 5 , read_only ) ; assert ( rc = = SQLITE_OK ) ;
// } else {
// rc=sqlite3_bind_null(statement, 5); assert(rc==SQLITE_OK);
// }
mysql_free_result ( mmsd - > result ) ;
mmsd - > result = NULL ;
} else {
rc = sqlite3_bind_null ( statement , 5 ) ; assert ( rc = = SQLITE_OK ) ;
}
rc = sqlite3_bind_text ( statement , 6 , mmsd - > mysql_error_msg , - 1 , SQLITE_TRANSIENT ) ; assert ( rc = = SQLITE_OK ) ;
SAFE_SQLITE3_STEP ( statement ) ;
rc = sqlite3_clear_bindings ( statement ) ; assert ( rc = = SQLITE_OK ) ;
rc = sqlite3_reset ( statement ) ; assert ( rc = = SQLITE_OK ) ;
MyHGM - > read_only_action ( mmsd - > hostname , mmsd - > port , read_only ) ;
sqlite3_finalize ( statement ) ;
*/
}
if ( mmsd - > interr ) { // check failed
} else {
if ( crc = = false ) {
if ( mmsd - > mysql ) {
GloMyMon - > My_Conn_Pool - > put_connection ( mmsd - > hostname , mmsd - > port , mmsd - > mysql ) ;
mmsd - > mysql = NULL ;
}
}
}
__fast_exit_monitor_group_replication_thread :
if ( mmsd - > mysql ) {
// if we reached here we didn't put the connection back
if ( mmsd - > mysql_error_msg ) {
mysql_close ( mmsd - > mysql ) ; // if we reached here we should destroy it
mmsd - > mysql = NULL ;
} else {
if ( crc ) {
bool rc = mmsd - > set_wait_timeout ( ) ;
if ( rc ) {
GloMyMon - > My_Conn_Pool - > put_connection ( mmsd - > hostname , mmsd - > port , mmsd - > mysql ) ;
} else {
mysql_close ( mmsd - > mysql ) ; // set_wait_timeout failed
}
mmsd - > mysql = NULL ;
} else { // really not sure how we reached here, drop it
mysql_close ( mmsd - > mysql ) ;
mmsd - > mysql = NULL ;
}
}
}
delete mysql_thr ;
return NULL ;
}
void * monitor_replication_lag_thread ( void * arg ) {
MySQL_Monitor_State_Data * mmsd = ( MySQL_Monitor_State_Data * ) arg ;
MySQL_Thread * mysql_thr = new MySQL_Thread ( ) ;
@ -1376,6 +1564,132 @@ __sleep_monitor_read_only:
return NULL ;
}
void * MySQL_Monitor : : monitor_group_replication ( ) {
// initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it)
// struct event_base *libevent_base;
unsigned int latest_table_servers_version = 0 ;
unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version ;
MySQL_Thread * mysql_thr = new MySQL_Thread ( ) ;
mysql_thr - > curtime = monotonic_time ( ) ;
MySQL_Monitor__thread_MySQL_Thread_Variables_version = GloMTH - > get_global_version ( ) ;
mysql_thr - > refresh_variables ( ) ;
if ( ! GloMTH ) return NULL ; // quick exit during shutdown/restart
unsigned long long t1 ;
unsigned long long t2 ;
unsigned long long next_loop_at = 0 ;
while ( GloMyMon - > shutdown = = false & & mysql_thread___monitor_enabled = = true ) {
unsigned int glover ;
// char *error=NULL;
// SQLite3_result *resultset=NULL;
// add support for SSL
// char *query=(char *)"SELECT hostname, port, MAX(use_ssl) use_ssl FROM mysql_servers JOIN mysql_group_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=writer_hostgroup hostgroup_id=reader_hostgroup WHERE status NOT LIKE 'OFFLINE\%' GROUP BY hostname, port";
t1 = monotonic_time ( ) ;
if ( ! GloMTH ) return NULL ; // quick exit during shutdown/restart
glover = GloMTH - > get_global_version ( ) ;
if ( MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) {
MySQL_Monitor__thread_MySQL_Thread_Variables_version = glover ;
mysql_thr - > refresh_variables ( ) ;
next_loop_at = 0 ;
}
pthread_mutex_lock ( & group_replication_mutex ) ;
if ( t1 < next_loop_at ) {
goto __sleep_monitor_group_replication ;
}
next_loop_at = t1 + 1000 * mysql_thread___monitor_groupreplication_healthcheck_interval ;
// proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
// admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
// resultset = MyHGM->execute_query(query, &error);
// assert(resultset);
if ( Group_Replication_Hosts_resultset = = NULL ) {
goto __end_monitor_group_replication_loop ;
// }
// if (error) {
// proxy_error("Error on %s : %s\n", query, error);
// goto __end_monitor_read_only_loop;
} else {
if ( Group_Replication_Hosts_resultset - > rows_count = = 0 ) {
goto __end_monitor_group_replication_loop ;
}
int us = 100 ;
if ( Group_Replication_Hosts_resultset - > rows_count ) {
us = mysql_thread___monitor_read_only_interval / 2 / Group_Replication_Hosts_resultset - > rows_count ;
}
for ( std : : vector < SQLite3_row * > : : iterator it = Group_Replication_Hosts_resultset - > rows . begin ( ) ; it ! = Group_Replication_Hosts_resultset - > rows . end ( ) ; + + it ) {
SQLite3_row * r = * it ;
MySQL_Monitor_State_Data * mmsd = new MySQL_Monitor_State_Data ( r - > fields [ 0 ] , atoi ( r - > fields [ 1 ] ) , NULL , atoi ( r - > fields [ 2 ] ) ) ;
mmsd - > mondb = monitordb ;
//pthread_t thr_;
//if ( pthread_create(&thr_, &attr, monitor_read_only_thread, (void *)mmsd) != 0 ) {
// perror("Thread creation monitor_read_only_thread");
//}
WorkItem * item ;
item = new WorkItem ( mmsd , monitor_read_only_thread ) ;
GloMyMon - > queue . add ( item ) ;
usleep ( us ) ;
if ( GloMyMon - > shutdown ) {
pthread_mutex_unlock ( & group_replication_mutex ) ;
return NULL ;
}
}
}
__end_monitor_group_replication_loop :
pthread_mutex_unlock ( & group_replication_mutex ) ;
if ( mysql_thread___monitor_enabled = = true ) {
/*
sqlite3_stmt * statement = NULL ;
sqlite3 * mondb = monitordb - > get_db ( ) ;
int rc ;
char * query = NULL ;
query = ( char * ) " DELETE FROM mysql_server_read_only_log WHERE time_start_us < ?1 " ;
rc = sqlite3_prepare_v2 ( mondb , query , - 1 , & statement , 0 ) ;
assert ( rc = = SQLITE_OK ) ;
if ( mysql_thread___monitor_history < mysql_thread___monitor_ping_interval * ( mysql_thread___monitor_ping_max_failures + 1 ) ) { // issue #626
if ( mysql_thread___monitor_ping_interval < 3600000 )
mysql_thread___monitor_history = mysql_thread___monitor_ping_interval * ( mysql_thread___monitor_ping_max_failures + 1 ) ;
}
unsigned long long time_now = realtime_time ( ) ;
rc = sqlite3_bind_int64 ( statement , 1 , time_now - ( unsigned long long ) mysql_thread___monitor_history * 1000 ) ; assert ( rc = = SQLITE_OK ) ;
SAFE_SQLITE3_STEP ( statement ) ;
rc = sqlite3_clear_bindings ( statement ) ; assert ( rc = = SQLITE_OK ) ;
rc = sqlite3_reset ( statement ) ; assert ( rc = = SQLITE_OK ) ;
sqlite3_finalize ( statement ) ;
*/
}
// if (resultset)
// delete resultset;
__sleep_monitor_group_replication :
t2 = monotonic_time ( ) ;
if ( t2 < next_loop_at ) {
unsigned long long st = 0 ;
st = next_loop_at - t2 ;
if ( st > 500000 ) {
st = 500000 ;
}
usleep ( st ) ;
}
}
if ( mysql_thr ) {
delete mysql_thr ;
mysql_thr = NULL ;
}
for ( unsigned int i = 0 ; i < num_threads ; i + + ) {
WorkItem * item = NULL ;
GloMyMon - > queue . add ( item ) ;
}
return NULL ;
}
void * MySQL_Monitor : : monitor_replication_lag ( ) {
// initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it)
//struct event_base *libevent_base;
@ -1521,6 +1835,8 @@ __monitor_run:
pthread_create ( & monitor_ping_thread , & attr , & monitor_ping_pthread , NULL ) ;
pthread_t monitor_read_only_thread ;
pthread_create ( & monitor_read_only_thread , & attr , & monitor_read_only_pthread , NULL ) ;
pthread_t monitor_group_replication_thread ;
pthread_create ( & monitor_group_replication_thread , & attr , & monitor_group_replication_pthread , NULL ) ;
pthread_t monitor_replication_lag_thread ;
pthread_create ( & monitor_replication_lag_thread , & attr , & monitor_replication_lag_pthread , NULL ) ;
while ( shutdown = = false & & mysql_thread___monitor_enabled = = true ) {
@ -1566,6 +1882,7 @@ __monitor_run:
pthread_join ( monitor_connect_thread , NULL ) ;
pthread_join ( monitor_ping_thread , NULL ) ;
pthread_join ( monitor_read_only_thread , NULL ) ;
pthread_join ( monitor_group_replication_thread , NULL ) ;
pthread_join ( monitor_replication_lag_thread , NULL ) ;
while ( shutdown = = false ) {
unsigned int glover ;
@ -1598,6 +1915,10 @@ MyGR_monitor_node::MyGR_monitor_node(char *_a, int _p, int _whg) {
port = _p ;
idx_last_entry = - 1 ;
writer_hostgroup = _whg ;
int i ;
for ( i = 0 ; i < MyGR_Nentries ; i + + ) {
last_entries [ i ] . error = NULL ;
}
}
MyGR_monitor_node : : ~ MyGR_monitor_node ( ) {
@ -1607,7 +1928,7 @@ MyGR_monitor_node::~MyGR_monitor_node() {
}
// return true if status changed
bool MyGR_monitor_node : : add_entry ( unsigned long long _ct , long long _tb , bool _pp , bool _ro ) {
bool MyGR_monitor_node : : add_entry ( unsigned long long _ct , long long _tb , bool _pp , bool _ro , char * _error ) {
bool ret = false ;
if ( idx_last_entry = = - 1 ) ret = true ;
int prev_last_entry = idx_last_entry ;
@ -1619,6 +1940,13 @@ bool MyGR_monitor_node::add_entry(unsigned long long _ct, long long _tb, bool _p
last_entries [ idx_last_entry ] . transactions_behind = _tb ;
last_entries [ idx_last_entry ] . primary_partition = _pp ;
last_entries [ idx_last_entry ] . read_only = _ro ;
if ( last_entries [ idx_last_entry ] . error ) {
free ( last_entries [ idx_last_entry ] . error ) ;
last_entries [ idx_last_entry ] . error = NULL ;
}
if ( _error ) {
last_entries [ idx_last_entry ] . error = strdup ( _error ) ; // we always copy
}
if ( ret = = false ) {
if ( last_entries [ idx_last_entry ] . primary_partition ! = last_entries [ prev_last_entry ] . primary_partition ) {
ret = true ;
@ -1626,6 +1954,15 @@ bool MyGR_monitor_node::add_entry(unsigned long long _ct, long long _tb, bool _p
if ( last_entries [ idx_last_entry ] . read_only ! = last_entries [ prev_last_entry ] . read_only ) {
ret = true ;
}
if (
( last_entries [ idx_last_entry ] . error & & last_entries [ prev_last_entry ] . error = = NULL )
| |
( last_entries [ idx_last_entry ] . error = = NULL & & last_entries [ prev_last_entry ] . error )
| |
( last_entries [ idx_last_entry ] . error & & last_entries [ prev_last_entry ] . error & & strcmp ( last_entries [ idx_last_entry ] . error , last_entries [ prev_last_entry ] . error ) )
) {
ret = true ;
}
}
return ret ;
}