@ -70,6 +70,7 @@ struct cmp_str {
class MySQL_Monitor_Connection_Pool {
private :
pthread_mutex_t mutex ;
int size ;
//std::map<std::pair<char *, std::list<MYSQL *>* > my_connections;
std : : map < char * , std : : list < MYSQL * > * , cmp_str > my_connections ;
@ -78,13 +79,94 @@ class MySQL_Monitor_Connection_Pool {
~ MySQL_Monitor_Connection_Pool ( ) ;
MYSQL * get_connection ( char * hostname , int port ) ;
void put_connection ( char * hostname , int port , MYSQL * my ) ;
void purge_missing_servers ( SQLite3_result * resultset ) ;
} ;
MySQL_Monitor_Connection_Pool : : MySQL_Monitor_Connection_Pool ( ) {
size = 0 ;
pthread_mutex_init ( & mutex , NULL ) ;
}
MySQL_Monitor_Connection_Pool : : ~ MySQL_Monitor_Connection_Pool ( ) {
purge_missing_servers ( NULL ) ;
}
void MySQL_Monitor_Connection_Pool : : purge_missing_servers ( SQLite3_result * resultset ) {
# define POLLUTE_LENGTH 8
char pollute_buf [ POLLUTE_LENGTH ] ;
srand ( monotonic_time ( ) ) ;
for ( int i = 0 ; i < POLLUTE_LENGTH ; i + + ) {
pollute_buf [ i ] = ( char ) rand ( ) ;
}
std : : list < MYSQL * > * purge_lst = NULL ;
purge_lst = new std : : list < MYSQL * > ;
pthread_mutex_lock ( & mutex ) ;
if ( resultset = = NULL ) {
goto __purge_all ;
}
for ( std : : vector < SQLite3_row * > : : iterator it = resultset - > rows . begin ( ) ; it ! = resultset - > rows . end ( ) ; + + it ) {
// for each host configured ...
SQLite3_row * r = * it ;
char * buf = ( char * ) malloc ( 16 + strlen ( r - > fields [ 0 ] ) ) ;
sprintf ( buf , " %s:%s " , r - > fields [ 0 ] , r - > fields [ 1 ] ) ;
std : : map < char * , std : : list < MYSQL * > * , cmp_str > : : iterator it2 ;
it2 = my_connections . find ( buf ) ; // find the host
free ( buf ) ;
if ( it2 ! = my_connections . end ( ) ) { // if the host exists
std : : list < MYSQL * > * lst = it2 - > second ;
std : : list < MYSQL * > : : const_iterator it3 ;
for ( it3 = lst - > begin ( ) ; it3 ! = lst - > end ( ) ; + + it3 ) {
MYSQL * my = * it3 ;
memcpy ( my - > net . buff , pollute_buf , 8 ) ; // pollute this buffer
//
}
}
}
__purge_all :
std : : map < char * , std : : list < MYSQL * > * > : : iterator it ;
//std::map<std::string, std::map<std::string, std::string>>::iterator it_type;
for ( it = my_connections . begin ( ) ; it ! = my_connections . end ( ) ; it + + ) {
std : : list < MYSQL * > * lst = it - > second ;
if ( ! lst - > empty ( ) ) {
std : : list < MYSQL * > : : const_iterator it3 ;
it3 = lst - > begin ( ) ;
MYSQL * my = * it3 ;
if ( memcmp ( my - > net . buff , pollute_buf , 8 ) ) {
// the buffer is not polluted, it means it didn't match previously
while ( ! lst - > empty ( ) ) {
my = lst - > front ( ) ;
lst - > pop_front ( ) ;
purge_lst - > push_back ( my ) ;
}
} else {
// try to keep maximum 2 free connections
// dropping all the others
while ( lst - > size ( ) > 2 ) {
my = lst - > front ( ) ;
lst - > pop_front ( ) ;
purge_lst - > push_back ( my ) ;
}
}
}
}
pthread_mutex_unlock ( & mutex ) ;
char quit_buff [ 5 ] ;
memset ( quit_buff , 0 , 5 ) ;
quit_buff [ 0 ] = 1 ;
quit_buff [ 4 ] = 1 ;
// close all idle connections
while ( ! purge_lst - > empty ( ) ) {
MYSQL * my = purge_lst - > front ( ) ;
purge_lst - > pop_front ( ) ;
int fd = my - > net . fd ;
int wb = write ( fd , quit_buff , 5 ) ;
fd + = wb ; // dummy, to make compiler happy
fd - = wb ; // dummy, to make compiler happy
mysql_close_no_command ( my ) ;
shutdown ( fd , SHUT_RDWR ) ;
}
delete purge_lst ;
}
MYSQL * MySQL_Monitor_Connection_Pool : : get_connection ( char * hostname , int port ) {
@ -92,6 +174,7 @@ MYSQL * MySQL_Monitor_Connection_Pool::get_connection(char *hostname, int port)
//it = my_connections.find(std::make_pair(hostname,port));
char * buf = ( char * ) malloc ( 16 + strlen ( hostname ) ) ;
sprintf ( buf , " %s:%d " , hostname , port ) ;
pthread_mutex_lock ( & mutex ) ;
it = my_connections . find ( buf ) ;
free ( buf ) ;
if ( it ! = my_connections . end ( ) ) {
@ -100,9 +183,12 @@ MYSQL * MySQL_Monitor_Connection_Pool::get_connection(char *hostname, int port)
MYSQL * ret = lst - > front ( ) ;
lst - > pop_front ( ) ;
size - - ;
pthread_mutex_unlock ( & mutex ) ;
memset ( ret - > net . buff , 0 , 8 ) ; // reset what was polluted
return ret ;
}
}
pthread_mutex_unlock ( & mutex ) ;
return NULL ;
}
@ -111,6 +197,7 @@ void MySQL_Monitor_Connection_Pool::put_connection(char *hostname, int port, MYS
std : : map < char * , std : : list < MYSQL * > * , cmp_str > : : iterator it ;
char * buf = ( char * ) malloc ( 16 + strlen ( hostname ) ) ;
sprintf ( buf , " %s:%d " , hostname , port ) ;
pthread_mutex_lock ( & mutex ) ;
it = my_connections . find ( buf ) ;
std : : list < MYSQL * > * lst = NULL ;
if ( it = = my_connections . end ( ) ) {
@ -121,6 +208,7 @@ void MySQL_Monitor_Connection_Pool::put_connection(char *hostname, int port, MYS
lst = it - > second ;
}
lst - > push_back ( my ) ;
pthread_mutex_unlock ( & mutex ) ;
}
enum MySQL_Monitor_State_Data_Task_Type {
@ -565,6 +653,7 @@ MySQL_Monitor::~MySQL_Monitor() {
delete tables_defs_monitor ;
delete monitordb ;
delete admindb ;
delete My_Conn_Pool ;
} ;
@ -630,6 +719,13 @@ void * MySQL_Monitor::monitor_connect() {
unsigned int glover ;
t1 = monotonic_time ( ) ;
glover = GloMTH - > get_global_version ( ) ;
if ( MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) {
MySQL_Monitor__thread_MySQL_Thread_Variables_version = glover ;
mysql_thr - > refresh_variables ( ) ;
next_loop_at = 0 ;
}
if ( t1 < next_loop_at ) {
goto __sleep_monitor_connect_loop ;
}
@ -643,19 +739,13 @@ void * MySQL_Monitor::monitor_connect() {
// create libevent base
libevent_base = event_base_new ( ) ;
glover = GloMTH - > get_global_version ( ) ;
if ( MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) {
MySQL_Monitor__thread_MySQL_Thread_Variables_version = glover ;
mysql_thr - > refresh_variables ( ) ;
//proxy_error("%s\n", "MySQL_Monitor - CONNECT - refreshing variables");
}
proxy_debug ( PROXY_DEBUG_ADMIN , 4 , " %s \n " , query ) ;
admindb - > execute_statement ( query , & error , & cols , & affected_rows , & resultset ) ;
if ( error ) {
proxy_error ( " Error on %s : %s \n " , query , error ) ;
goto __end_monitor_connect_loop ;
} else {
GloMyMon - > My_Conn_Pool - > purge_missing_servers ( resultset ) ;
if ( resultset - > rows_count = = 0 ) {
goto __end_monitor_connect_loop ;
}
@ -725,6 +815,10 @@ __sleep_monitor_connect_loop:
usleep ( st ) ;
}
}
if ( mysql_thr ) {
delete mysql_thr ;
mysql_thr = NULL ;
}
return NULL ;
}
@ -754,9 +848,16 @@ void * MySQL_Monitor::monitor_ping() {
SQLite3_result * resultset = NULL ;
MySQL_Monitor_State_Data * * sds = NULL ;
int i = 0 ;
char * query = ( char * ) " SELECT DISTINCT hostname, port FROM mysql_servers " ;
char * query = ( char * ) " SELECT DISTINCT hostname, port FROM mysql_servers WHERE status!='OFFLINE_HARD' " ;
t1 = monotonic_time ( ) ;
glover = GloMTH - > get_global_version ( ) ;
if ( MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) {
MySQL_Monitor__thread_MySQL_Thread_Variables_version = glover ;
mysql_thr - > refresh_variables ( ) ;
next_loop_at = 0 ;
}
if ( t1 < next_loop_at ) {
goto __sleep_monitor_ping_loop ;
}
@ -770,13 +871,6 @@ void * MySQL_Monitor::monitor_ping() {
// create libevent base
libevent_base = event_base_new ( ) ;
glover = GloMTH - > get_global_version ( ) ;
if ( MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) {
MySQL_Monitor__thread_MySQL_Thread_Variables_version = glover ;
mysql_thr - > refresh_variables ( ) ;
//proxy_error("%s\n","MySQL_Monitor - PING - refreshing variables");
}
proxy_debug ( PROXY_DEBUG_ADMIN , 4 , " %s \n " , query ) ;
admindb - > execute_statement ( query , & error , & cols , & affected_rows , & resultset ) ;
if ( error ) {
@ -861,6 +955,10 @@ __sleep_monitor_ping_loop:
usleep ( st ) ;
}
}
if ( mysql_thr ) {
delete mysql_thr ;
mysql_thr = NULL ;
}
return NULL ;
}
@ -887,9 +985,16 @@ void * MySQL_Monitor::monitor_read_only() {
SQLite3_result * resultset = NULL ;
MySQL_Monitor_State_Data * * sds = NULL ;
int i = 0 ;
char * query = ( char * ) " SELECT DISTINCT hostname, port FROM mysql_servers JOIN mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup " ;
char * query = ( char * ) " SELECT DISTINCT hostname, port FROM mysql_servers JOIN mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE status!='OFFLINE_HARD' " ;
t1 = monotonic_time ( ) ;
glover = GloMTH - > get_global_version ( ) ;
if ( MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) {
MySQL_Monitor__thread_MySQL_Thread_Variables_version = glover ;
mysql_thr - > refresh_variables ( ) ;
next_loop_at = 0 ;
}
if ( t1 < next_loop_at ) {
goto __sleep_monitor_read_only ;
}
@ -903,13 +1008,6 @@ void * MySQL_Monitor::monitor_read_only() {
// create libevent base
libevent_base = event_base_new ( ) ;
glover = GloMTH - > get_global_version ( ) ;
if ( MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) {
MySQL_Monitor__thread_MySQL_Thread_Variables_version = glover ;
mysql_thr - > refresh_variables ( ) ;
//proxy_error("%s\n","MySQL_Monitor - PING - refreshing variables");
}
proxy_debug ( PROXY_DEBUG_ADMIN , 4 , " %s \n " , query ) ;
// admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
resultset = MyHGM - > execute_query ( query , & error ) ;
@ -1036,6 +1134,10 @@ __sleep_monitor_read_only:
usleep ( st ) ;
}
}
if ( mysql_thr ) {
delete mysql_thr ;
mysql_thr = NULL ;
}
return NULL ;
}
@ -1065,6 +1167,13 @@ void * MySQL_Monitor::monitor_replication_lag() {
char * query = ( char * ) " SELECT hostgroup_id, hostname, port, max_replication_lag FROM mysql_servers WHERE max_replication_lag > 0 AND status NOT LIKE 'OFFLINE%' " ;
t1 = monotonic_time ( ) ;
glover = GloMTH - > get_global_version ( ) ;
if ( MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) {
MySQL_Monitor__thread_MySQL_Thread_Variables_version = glover ;
mysql_thr - > refresh_variables ( ) ;
next_loop_at = 0 ;
}
if ( t1 < next_loop_at ) {
goto __sleep_monitor_replication_lag ;
}
@ -1078,13 +1187,6 @@ void * MySQL_Monitor::monitor_replication_lag() {
// create libevent base
libevent_base = event_base_new ( ) ;
glover = GloMTH - > get_global_version ( ) ;
if ( MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) {
MySQL_Monitor__thread_MySQL_Thread_Variables_version = glover ;
mysql_thr - > refresh_variables ( ) ;
//proxy_error("%s\n","MySQL_Monitor - PING - refreshing variables");
}
proxy_debug ( PROXY_DEBUG_ADMIN , 4 , " %s \n " , query ) ;
// admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
resultset = MyHGM - > execute_query ( query , & error ) ;
@ -1207,6 +1309,10 @@ __sleep_monitor_replication_lag:
usleep ( st ) ;
}
}
if ( mysql_thr ) {
delete mysql_thr ;
mysql_thr = NULL ;
}
return NULL ;
}
@ -1227,6 +1333,7 @@ void * MySQL_Monitor::run() {
MySQL_Monitor__thread_MySQL_Thread_Variables_version = glover ;
mysql_thr - > refresh_variables ( ) ;
//proxy_error("%s\n","MySQL_Monitor refreshing variables");
My_Conn_Pool - > purge_missing_servers ( NULL ) ;
}
usleep ( 500000 ) ;
}
@ -1234,5 +1341,9 @@ void * MySQL_Monitor::run() {
monitor_ping_thread - > join ( ) ;
monitor_read_only_thread - > join ( ) ;
monitor_replication_lag_thread - > join ( ) ;
if ( mysql_thr ) {
delete mysql_thr ;
mysql_thr = NULL ;
}
return NULL ;
} ;