@ -10,6 +10,7 @@
# include <map>
# include <mutex>
# include <thread>
# include <future>
# include <prometheus/counter.h>
# include "MySQL_Protocol.h"
# include "MySQL_HostGroups_Manager.h"
@ -55,45 +56,53 @@ static MySQL_Monitor *GloMyMon;
} while ( rc = = SQLITE_LOCKED | | rc = = SQLITE_BUSY ) ; \
} while ( 0 )
template < typename T , bool check_monitor_enabled_flag = true >
class ConsumerThread : public Thread {
wqueue < WorkItem *> & m_queue ;
wqueue < WorkItem <T > *> & m_queue ;
int thrn ;
public :
ConsumerThread ( wqueue < WorkItem *> & queue , int _n ) : m_queue ( queue ) {
ConsumerThread ( wqueue < WorkItem <T > *> & queue , int _n ) : m_queue ( queue ) {
thrn = _n ;
}
void * run ( ) {
// Remove 1 item at a time and process it. Blocks if no items are
// available to process.
for ( int i = 0 ; ( thrn ? i < thrn : 1 ) ; i + + ) {
//VALGRIND_DISABLE_ERROR_REPORTING;
WorkItem * item = ( WorkItem * ) m_queue . remove ( ) ;
//VALGRIND_ENABLE_ERROR_REPORTING;
if ( item = = NULL ) {
for ( int i = 0 ; ( thrn ? i < thrn : 1 ) ; i + + ) {
//VALGRIND_DISABLE_ERROR_REPORTING;
WorkItem <T > * item = ( WorkItem < T > * ) m_queue . remove ( ) ;
//VALGRIND_ENABLE_ERROR_REPORTING;
if ( item = = NULL ) {
if ( thrn ) {
// we took a NULL item that wasn't meant to reach here! Add it again
WorkItem * item = NULL ;
GloMyMon- > queue - > add ( item ) ;
WorkItem < T > * item = NULL ;
m_queue. add ( item ) ;
}
// this is intentional to EXIT immediately
return NULL ;
}
if ( item - > routine ) { // NULL is allowed, do nothing for it
bool me = true ;
if ( check_monitor_enabled_flag ) {
pthread_mutex_lock ( & GloMyMon - > mon_en_mutex ) ;
me = GloMyMon - > monitor_enabled ;
pthread_mutex_unlock ( & GloMyMon - > mon_en_mutex ) ;
}
pthread_mutex_lock ( & GloMyMon - > mon_en_mutex ) ;
bool me = GloMyMon - > monitor_enabled ;
pthread_mutex_unlock ( & GloMyMon - > mon_en_mutex ) ;
if ( me ) {
item - > routine ( ( void * ) item - > mmsd ) ;
item - > routine ( ( void * ) item - > data ) ;
}
}
delete item - > mms d;
delete item - > data ;
delete item ;
}
return NULL ;
}
} ;
using DNSResolverThread = ConsumerThread < DNS_Resolve_Data , false > ;
static int wait_for_mysql ( MYSQL * mysql , int status ) {
struct pollfd pfd ;
@ -363,7 +372,7 @@ void MySQL_Monitor_Connection_Pool::purge_some_connections() {
MYSQL * my = ( MYSQL * ) srv - > conns - > remove_index_fast ( 0 ) ;
MySQL_Monitor_State_Data * mmsd = new MySQL_Monitor_State_Data ( ( char * ) " " , 0 , NULL , false ) ;
mmsd - > mysql = my ;
GloMyMon - > queue - > add ( new WorkItem ( mmsd , NULL ) ) ;
GloMyMon - > queue - > add ( new WorkItem < MySQL_Monitor_State_Data > ( mmsd , NULL ) ) ;
}
for ( unsigned int j = 0 ; j < srv - > conns - > len ; j + + ) {
MYSQL * my = ( MYSQL * ) srv - > conns - > index ( j ) ;
@ -372,7 +381,7 @@ void MySQL_Monitor_Connection_Pool::purge_some_connections() {
srv - > conns - > remove_index_fast ( j ) ;
MySQL_Monitor_State_Data * mmsd = new MySQL_Monitor_State_Data ( ( char * ) " " , 0 , NULL , false ) ;
mmsd - > mysql = my ;
GloMyMon - > queue - > add ( new WorkItem ( mmsd , NULL ) ) ;
GloMyMon - > queue - > add ( new WorkItem < MySQL_Monitor_State_Data > ( mmsd , NULL ) ) ;
}
}
}
@ -560,6 +569,19 @@ void * monitor_replication_lag_pthread(void *arg) {
return NULL ;
}
void * monitor_dns_cache_pthread ( void * arg ) {
# ifndef NOJEM
bool cache = false ;
mallctl ( " thread.tcache.enabled " , NULL , NULL , & cache , sizeof ( bool ) ) ;
# endif
while ( GloMTH = = NULL ) {
usleep ( 50000 ) ;
}
usleep ( 100000 ) ;
GloMyMon - > monitor_dns_cache ( ) ;
return NULL ;
}
using metric_name = std : : string ;
using metric_help = std : : string ;
using metric_tags = std : : map < std : : string , std : : string > ;
@ -674,6 +696,27 @@ mon_metrics_map = std::make_tuple(
metric_tags {
{ " status " , " err " }
}
) ,
// ====================================================================
// ====================================================================
std : : make_tuple (
p_mon_counter : : mysql_monitor_dns_cache_queried ,
" proxysql_mysql_monitor_dns_cache_queried " ,
" Number of dns queried 'dns_cache_queried' from 'monitor_dns_resolver_thread'. " ,
metric_tags { }
) ,
std : : make_tuple (
p_mon_counter : : mysql_monitor_dns_cache_lookup_success ,
" proxysql_mysql_monitor_dns_cache_lookup_success " ,
" Number of dns queried 'dns_cache_lookup_success' from 'monitor_dns_resolver_thread'. " ,
metric_tags { }
) ,
std : : make_tuple (
p_mon_counter : : mysql_monitor_dns_cache_record_updated ,
" proxysql_mysql_monitor_dns_cache_record_updated " ,
" Number of dns queried 'dns_cache_record_updated' from 'monitor_dns_resolver_thread'. " ,
metric_tags { }
)
// ====================================================================
} ,
@ -694,12 +737,12 @@ mon_metrics_map = std::make_tuple(
) ;
MySQL_Monitor : : MySQL_Monitor ( ) {
dns_cache = std : : make_shared < DNS_Cache > ( ) ;
GloMyMon = this ;
My_Conn_Pool = new MySQL_Monitor_Connection_Pool ( ) ;
queue = std : : unique_ptr < wqueue < WorkItem *> > ( new wqueue < WorkItem * > ( ) ) ;
queue = std : : unique_ptr < wqueue < WorkItem <MySQL_Monitor_State_Data > *> > ( new wqueue < WorkItem < MySQL_Monitor_State_Data > * > ( ) ) ;
pthread_mutex_init ( & group_replication_mutex , NULL ) ;
Group_Replication_Hosts_resultset = NULL ;
@ -754,7 +797,7 @@ MySQL_Monitor::MySQL_Monitor() {
num_threads = 2 ;
aux_threads = 0 ;
started_threads = 0 ;
connect_check_OK = 0 ;
connect_check_ERR = 0 ;
ping_check_OK = 0 ;
@ -763,8 +806,9 @@ MySQL_Monitor::MySQL_Monitor() {
read_only_check_ERR = 0 ;
replication_lag_check_OK = 0 ;
replication_lag_check_ERR = 0 ;
dns_cache_queried = 0 ;
dns_cache_lookup_success = 0 ;
dns_cache_record_updated = 0 ;
/*
if ( GloMTH ) {
@ -826,6 +870,9 @@ void MySQL_Monitor::p_update_metrics() {
p_update_counter ( this - > metrics . p_counter_array [ p_mon_counter : : mysql_monitor_read_only_check_err ] , GloMyMon - > read_only_check_ERR ) ;
p_update_counter ( this - > metrics . p_counter_array [ p_mon_counter : : mysql_monitor_replication_lag_check_ok ] , GloMyMon - > replication_lag_check_OK ) ;
p_update_counter ( this - > metrics . p_counter_array [ p_mon_counter : : mysql_monitor_replication_lag_check_err ] , GloMyMon - > replication_lag_check_ERR ) ;
p_update_counter ( this - > metrics . p_counter_array [ p_mon_counter : : mysql_monitor_dns_cache_queried ] , GloMyMon - > dns_cache_queried ) ;
p_update_counter ( this - > metrics . p_counter_array [ p_mon_counter : : mysql_monitor_dns_cache_lookup_success ] , GloMyMon - > dns_cache_lookup_success ) ;
p_update_counter ( this - > metrics . p_counter_array [ p_mon_counter : : mysql_monitor_dns_cache_record_updated ] , GloMyMon - > dns_cache_record_updated ) ;
}
}
@ -1184,7 +1231,7 @@ bool MySQL_Monitor_State_Data::create_new_connection() {
mysql_options4 ( mysql , MYSQL_OPT_CONNECT_ATTR_ADD , " _server_host " , hostname ) ;
MYSQL * myrc = NULL ;
if ( port ) {
myrc = mysql_real_connect ( mysql , hostname, mysql_thread___monitor_username , mysql_thread___monitor_password , NULL , port , NULL , 0 ) ;
myrc = mysql_real_connect ( mysql , MySQL_Monitor: : dns_lookup ( hostname) . c_str ( ) , mysql_thread___monitor_username , mysql_thread___monitor_password , NULL , port , NULL , 0 ) ;
} else {
myrc = mysql_real_connect ( mysql , " localhost " , mysql_thread___monitor_username , mysql_thread___monitor_password , NULL , 0 , hostname , 0 ) ;
}
@ -2544,8 +2591,8 @@ void * MySQL_Monitor::monitor_connect() {
if ( rc_ping ) { // only if server is responding to pings
MySQL_Monitor_State_Data * mmsd = new MySQL_Monitor_State_Data ( r - > fields [ 0 ] , atoi ( r - > fields [ 1 ] ) , NULL , atoi ( r - > fields [ 2 ] ) ) ;
mmsd - > mondb = monitordb ;
WorkItem * item ;
item = new WorkItem ( mmsd , monitor_connect_thread ) ;
WorkItem <MySQL_Monitor_State_Data > * item ;
item = new WorkItem < MySQL_Monitor_State_Data > ( mmsd , monitor_connect_thread ) ;
GloMyMon - > queue - > add ( item ) ;
usleep ( us ) ;
}
@ -2594,7 +2641,7 @@ __sleep_monitor_connect_loop:
mysql_thr = NULL ;
}
for ( unsigned int i = 0 ; i < num_threads ; i + + ) {
WorkItem * item = NULL ;
WorkItem < MySQL_Monitor_State_Data > * item = NULL ;
GloMyMon - > queue - > add ( item ) ;
}
return NULL ;
@ -2670,8 +2717,8 @@ void * MySQL_Monitor::monitor_ping() {
SQLite3_row * r = * it ;
MySQL_Monitor_State_Data * mmsd = new MySQL_Monitor_State_Data ( r - > fields [ 0 ] , atoi ( r - > fields [ 1 ] ) , NULL , atoi ( r - > fields [ 2 ] ) ) ;
mmsd - > mondb = monitordb ;
WorkItem * item ;
item = new WorkItem ( mmsd , monitor_ping_thread ) ;
WorkItem <MySQL_Monitor_State_Data > * item ;
item = new WorkItem < MySQL_Monitor_State_Data > ( mmsd , monitor_ping_thread ) ;
GloMyMon - > queue - > add ( item ) ;
usleep ( us ) ;
if ( GloMyMon - > shutdown ) return NULL ;
@ -2839,7 +2886,7 @@ __sleep_monitor_ping_loop:
mysql_thr = NULL ;
}
for ( unsigned int i = 0 ; i < num_threads ; i + + ) {
WorkItem * item = NULL ;
WorkItem < MySQL_Monitor_State_Data > * item = NULL ;
GloMyMon - > queue - > add ( item ) ;
}
return NULL ;
@ -2957,8 +3004,8 @@ void * MySQL_Monitor::monitor_read_only() {
}
}
mmsd - > mondb = monitordb ;
WorkItem * item ;
item = new WorkItem ( mmsd , monitor_read_only_thread ) ;
WorkItem <MySQL_Monitor_State_Data > * item ;
item = new WorkItem < MySQL_Monitor_State_Data > ( mmsd , monitor_read_only_thread ) ;
GloMyMon - > queue - > add ( item ) ;
usleep ( us ) ;
}
@ -3007,7 +3054,7 @@ __sleep_monitor_read_only:
mysql_thr = NULL ;
}
for ( unsigned int i = 0 ; i < num_threads ; i + + ) {
WorkItem * item = NULL ;
WorkItem < MySQL_Monitor_State_Data > * item = NULL ;
GloMyMon - > queue - > add ( item ) ;
}
return NULL ;
@ -3079,8 +3126,8 @@ void * MySQL_Monitor::monitor_group_replication() {
mmsd - > max_transactions_behind = atoi ( r - > fields [ 5 ] ) ;
mmsd - > max_transactions_behind_count = mysql_thread___monitor_groupreplication_max_transactions_behind_count ;
mmsd - > mondb = monitordb ;
WorkItem * item ;
item = new WorkItem ( mmsd , monitor_group_replication_thread ) ;
WorkItem <MySQL_Monitor_State_Data > * item ;
item = new WorkItem < MySQL_Monitor_State_Data > ( mmsd , monitor_group_replication_thread ) ;
GloMyMon - > queue - > add ( item ) ;
usleep ( us ) ;
}
@ -3137,7 +3184,7 @@ __sleep_monitor_group_replication:
mysql_thr = NULL ;
}
for ( unsigned int i = 0 ; i < num_threads ; i + + ) {
WorkItem * item = NULL ;
WorkItem < MySQL_Monitor_State_Data > * item = NULL ;
GloMyMon - > queue - > add ( item ) ;
}
return NULL ;
@ -3194,8 +3241,8 @@ void * MySQL_Monitor::monitor_galera() {
mmsd - > writer_is_also_reader = atoi ( r - > fields [ 4 ] ) ;
mmsd - > max_transactions_behind = atoi ( r - > fields [ 5 ] ) ;
mmsd - > mondb = monitordb ;
WorkItem * item ;
item = new WorkItem ( mmsd , monitor_galera_thread ) ;
WorkItem <MySQL_Monitor_State_Data > * item ;
item = new WorkItem < MySQL_Monitor_State_Data > ( mmsd , monitor_galera_thread ) ;
GloMyMon - > queue - > add ( item ) ;
usleep ( us ) ;
}
@ -3228,7 +3275,7 @@ __sleep_monitor_galera:
mysql_thr = NULL ;
}
for ( unsigned int i = 0 ; i < num_threads ; i + + ) {
WorkItem * item = NULL ;
WorkItem < MySQL_Monitor_State_Data > * item = NULL ;
GloMyMon - > queue - > add ( item ) ;
}
return NULL ;
@ -3295,8 +3342,8 @@ void * MySQL_Monitor::monitor_replication_lag() {
if ( rc_ping ) { // only if server is responding to pings
MySQL_Monitor_State_Data * mmsd = new MySQL_Monitor_State_Data ( r - > fields [ 1 ] , atoi ( r - > fields [ 2 ] ) , NULL , atoi ( r - > fields [ 4 ] ) , atoi ( r - > fields [ 0 ] ) ) ;
mmsd - > mondb = monitordb ;
WorkItem * item ;
item = new WorkItem ( mmsd , monitor_replication_lag_thread ) ;
WorkItem <MySQL_Monitor_State_Data > * item ;
item = new WorkItem < MySQL_Monitor_State_Data > ( mmsd , monitor_replication_lag_thread ) ;
GloMyMon - > queue - > add ( item ) ;
usleep ( us ) ;
}
@ -3345,12 +3392,328 @@ __sleep_monitor_replication_lag:
mysql_thr = NULL ;
}
for ( unsigned int i = 0 ; i < num_threads ; i + + ) {
WorkItem * item = NULL ;
WorkItem < MySQL_Monitor_State_Data > * item = NULL ;
GloMyMon - > queue - > add ( item ) ;
}
return NULL ;
}
bool validate_ip ( const std : : string & ip ) {
// check if ip is vaild IPV4 ip address
struct sockaddr_in sa4 ;
if ( inet_pton ( AF_INET , ip . c_str ( ) , & ( sa4 . sin_addr ) ) ! = 0 )
return true ;
// check if ip is vaild IPV6 ip address
struct sockaddr_in6 sa6 ;
if ( inet_pton ( AF_INET6 , ip . c_str ( ) , & ( sa6 . sin6_addr ) ) ! = 0 )
return true ;
return false ;
}
void * monitor_dns_resolver_thread ( void * args ) {
DNS_Resolve_Data * dns_resolve_data = static_cast < DNS_Resolve_Data * > ( args ) ;
struct addrinfo hints , * res = NULL ;
int gai_rc ;
int rc = 0 ;
/* set hints for getaddrinfo */
memset ( & hints , 0 , sizeof ( hints ) ) ;
hints . ai_protocol = IPPROTO_TCP ; /* TCP connections only */
hints . ai_family = AF_UNSPEC ; /* includes: IPv4, IPv6 or hostname */
hints . ai_socktype = SOCK_STREAM ;
//unsigned int wait_gai = 1;
//time_t start_t = time(NULL);
//while ((gai_rc = getaddrinfo(hostname.c_str(), NULL, &hints, &res)) == EAI_AGAIN)
//{
// if (time(NULL) - start_t > (time_t)10)
// break;
// usleep(wait_gai);
// wait_gai *= 2;
//}
try {
gai_rc = getaddrinfo ( dns_resolve_data - > hostname . c_str ( ) , NULL , & hints , & res ) ;
if ( gai_rc ! = 0 | | ! res )
{
proxy_error ( " An error occurred while resolving hostname: %s [%d] \n " , dns_resolve_data - > hostname . c_str ( ) , gai_rc ) ;
dns_resolve_data - > result . set_value ( std : : make_tuple < > ( false , DNS_Cache_Record ( ) ) ) ;
return NULL ;
}
char ip_addr [ INET6_ADDRSTRLEN ] ;
switch ( res - > ai_family ) {
case AF_INET : {
struct sockaddr_in * ipv4 = ( struct sockaddr_in * ) res - > ai_addr ;
inet_ntop ( res - > ai_addr - > sa_family , & ipv4 - > sin_addr , ip_addr , INET_ADDRSTRLEN ) ;
break ;
}
case AF_INET6 : {
struct sockaddr_in6 * ipv6 = ( struct sockaddr_in6 * ) res - > ai_addr ;
inet_ntop ( res - > ai_addr - > sa_family , & ipv6 - > sin6_addr , ip_addr , INET6_ADDRSTRLEN ) ;
break ;
}
}
freeaddrinfo ( res ) ;
if ( ip_addr ) {
if ( ! dns_resolve_data - > cached_ip . empty ( ) & & strncmp ( dns_resolve_data - > cached_ip . c_str ( ) , ip_addr , sizeof ( ip_addr ) - 1 ) = = 0 ) {
dns_resolve_data - > result . set_value ( std : : make_tuple < > ( true , DNS_Cache_Record ( dns_resolve_data - > hostname , ip_addr , monotonic_time ( ) + ( 1000 * dns_resolve_data - > ttl ) ) ) ) ;
}
else {
dns_resolve_data - > dns_cache - > add ( dns_resolve_data - > hostname , ip_addr ) ;
dns_resolve_data - > result . set_value ( std : : make_tuple < > ( true , DNS_Cache_Record ( dns_resolve_data - > hostname , ip_addr , monotonic_time ( ) + ( 1000 * dns_resolve_data - > ttl ) ) ) ) ;
}
return NULL ;
}
}
catch ( . . . )
{
}
dns_resolve_data - > result . set_value ( std : : make_tuple < > ( false , DNS_Cache_Record ( ) ) ) ;
return NULL ;
}
void * MySQL_Monitor : : monitor_dns_cache ( ) {
// initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it)
unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version ;
std : : unique_ptr < MySQL_Thread > mysql_thr ( new MySQL_Thread ( ) ) ;
mysql_thr - > curtime = monotonic_time ( ) ;
MySQL_Monitor__thread_MySQL_Thread_Variables_version = GloMTH - > get_global_version ( ) ;
mysql_thr - > refresh_variables ( ) ;
if ( ! GloMTH ) return NULL ; // quick exit during shutdown/restart
unsigned long long t1 ;
unsigned long long t2 ;
unsigned long long next_loop_at = 0 ;
std : : list < DNS_Cache_Record > dns_records_bookkeeping ;
wqueue < WorkItem < DNS_Resolve_Data > * > dns_resolver_queue ;
while ( GloMyMon - > shutdown = = false ) {
// update the 'monitor_internal.mysql_servers' table with the latest 'mysql_servers' from 'MyHGM'
{
std : : lock_guard < std : : mutex > mysql_servers_guard ( MyHGM - > mysql_servers_to_monitor_mutex ) ;
update_monitor_mysql_servers ( MyHGM - > mysql_servers_to_monitor ) ;
}
//if (mysql_thread___monitor_local_dns_cache_ttl == 0 ||
// mysql_thread___monitor_local_dns_cache_refresh_interval == 0
// ) {
//
// break;
//}
unsigned int glover ;
char * error = NULL ;
int cols = 0 ;
int affected_rows = 0 ;
SQLite3_result * resultset = NULL ;
char * query = ( char * ) " SELECT DISTINCT trim(hostname) FROM monitor_internal.mysql_servers " ;
t1 = monotonic_time ( ) ;
if ( ! GloMTH ) return NULL ; // quick exit during shutdown/restart
glover = GloMTH - > get_global_version ( ) ;
if ( MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) {
MySQL_Monitor__thread_MySQL_Thread_Variables_version = glover ;
mysql_thr - > refresh_variables ( ) ;
next_loop_at = 0 ;
}
if ( t1 < next_loop_at ) {
goto __sleep_monitor_dns_cache_loop ;
}
next_loop_at = t1 + ( 1000 * mysql_thread___monitor_local_dns_cache_refresh_interval ) ;
proxy_debug ( PROXY_DEBUG_ADMIN , 4 , " %s \n " , query ) ;
admindb - > execute_statement ( query , & error , & cols , & affected_rows , & resultset ) ;
if ( error ) {
proxy_error ( " Error on %s : %s \n " , query , error ) ;
goto __end_monitor_dns_cache_loop ;
}
else {
if ( resultset - > rows_count = = 0 ) {
goto __end_monitor_dns_cache_loop ;
}
constexpr unsigned int num_dns_resolver_threads = 1 ;
constexpr unsigned int num_dns_resolver_max_threads = 16 ;
constexpr unsigned int num_dns_resolver_queue_max_size = 128 ;
std : : vector < DNSResolverThread * > dns_resolver_threads ( num_dns_resolver_threads ) ;
for ( unsigned int i = 0 ; i < num_dns_resolver_threads ; i + + ) {
dns_resolver_threads [ i ] = new DNSResolverThread ( dns_resolver_queue , 0 ) ;
dns_resolver_threads [ i ] - > start ( 2048 , false ) ;
}
std : : set < std : : string > hostnames ;
for ( const auto row : resultset - > rows ) {
const std : : string & hostname = row - > fields [ 0 ] ;
if ( ! validate_ip ( hostname ) )
hostnames . insert ( hostname ) ;
}
std : : list < std : : future < std : : tuple < bool , DNS_Cache_Record > > > dns_resolve_result ;
if ( dns_records_bookkeeping . empty ( ) = = false ) {
unsigned long long current_time = monotonic_time ( ) ;
for ( auto itr = dns_records_bookkeeping . cbegin ( ) ;
itr ! = dns_records_bookkeeping . cend ( ) ; ) {
if ( hostnames . find ( itr - > hostname ) = = hostnames . end ( ) ) {
dns_cache - > remove ( itr - > hostname ) ; // to check
itr = dns_records_bookkeeping . erase ( itr ) ;
}
else {
hostnames . erase ( itr - > hostname ) ;
if ( current_time > = itr - > ttl ) {
std : : unique_ptr < DNS_Resolve_Data > dns_resolve_data ( new DNS_Resolve_Data ( ) ) ;
dns_resolve_data - > hostname = itr - > hostname ;
dns_resolve_data - > cached_ip = itr - > ip ;
dns_resolve_data - > ttl = mysql_thread___monitor_local_dns_cache_ttl ;
dns_resolve_data - > dns_cache = dns_cache ;
dns_resolve_result . emplace_back ( dns_resolve_data - > result . get_future ( ) ) ;
dns_resolver_queue . add ( new WorkItem < DNS_Resolve_Data > ( dns_resolve_data . release ( ) , monitor_dns_resolver_thread ) ) ;
itr = dns_records_bookkeeping . erase ( itr ) ;
continue ;
}
itr + + ;
}
}
}
{
unsigned int qsize = dns_resolver_queue . size ( ) ;
unsigned int num_threads = dns_resolver_threads . size ( ) ;
if ( qsize > num_dns_resolver_queue_max_size / 8 ) {
unsigned int threads_max = num_dns_resolver_max_threads ;
if ( threads_max > num_threads ) {
unsigned int new_threads = threads_max - num_threads ;
if ( ( qsize / 8 ) < new_threads ) {
new_threads = qsize / 8 ; // try to not burst threads
}
if ( new_threads ) {
unsigned int old_num_threads = num_threads ;
num_threads + = new_threads ;
dns_resolver_threads . resize ( num_threads ) ;
for ( unsigned int i = old_num_threads ; i < num_threads ; i + + ) {
dns_resolver_threads [ i ] = new DNSResolverThread ( dns_resolver_queue , 0 ) ;
dns_resolver_threads [ i ] - > start ( 2048 , false ) ;
}
}
}
}
}
if ( hostnames . empty ( ) = = false ) {
for ( const std : : string & host_name : hostnames ) {
std : : unique_ptr < DNS_Resolve_Data > dns_resolve_data ( new DNS_Resolve_Data ( ) ) ;
dns_resolve_data - > hostname = host_name ;
dns_resolve_data - > ttl = mysql_thread___monitor_local_dns_cache_ttl ;
dns_resolve_data - > dns_cache = dns_cache ;
dns_resolve_result . emplace_back ( dns_resolve_data - > result . get_future ( ) ) ;
dns_resolver_queue . add ( new WorkItem < DNS_Resolve_Data > ( dns_resolve_data . release ( ) , monitor_dns_resolver_thread ) ) ;
}
}
{
unsigned int qsize = dns_resolver_queue . size ( ) ;
unsigned int num_threads = dns_resolver_threads . size ( ) ;
if ( qsize > num_dns_resolver_queue_max_size / 8 ) {
unsigned int threads_max = num_dns_resolver_max_threads ;
if ( threads_max > num_threads ) {
unsigned int new_threads = threads_max - num_threads ;
if ( ( qsize / 8 ) < new_threads ) {
new_threads = qsize / 8 ; // try to not burst threads
}
if ( new_threads ) {
unsigned int old_num_threads = num_threads ;
num_threads + = new_threads ;
dns_resolver_threads . resize ( num_threads ) ;
for ( unsigned int i = old_num_threads ; i < num_threads ; i + + ) {
dns_resolver_threads [ i ] = new DNSResolverThread ( dns_resolver_queue , 0 ) ;
dns_resolver_threads [ i ] - > start ( 2048 , false ) ;
}
}
}
}
}
for ( size_t i = 0 ; i < dns_resolver_threads . size ( ) ; i + + )
dns_resolver_queue . add ( NULL ) ;
for ( auto & dns_result : dns_resolve_result ) {
auto ret_value = dns_result . get ( ) ;
if ( std : : get < 0 > ( ret_value ) ) {
DNS_Cache_Record dns_record = get < 1 > ( ret_value ) ;
dns_records_bookkeeping . emplace_back ( dns_record ) ;
proxy_error ( " Hostname: %s IP: %s \n " , dns_record . hostname . c_str ( ) , dns_record . ip . c_str ( ) ) ;
}
}
for ( DNSResolverThread * const dns_resolver_thread : dns_resolver_threads ) {
dns_resolver_thread - > join ( ) ;
delete dns_resolver_thread ;
}
if ( GloMyMon - > shutdown ) return NULL ;
}
__end_monitor_dns_cache_loop :
if ( resultset ) {
delete resultset ;
resultset = NULL ;
}
__sleep_monitor_dns_cache_loop :
t2 = monotonic_time ( ) ;
if ( t2 < next_loop_at ) {
unsigned long long st = 0 ;
st = next_loop_at - t2 ;
if ( st > 500000 ) {
st = 500000 ;
}
usleep ( st ) ;
}
}
return NULL ;
}
void * MySQL_Monitor : : run ( ) {
while ( GloMTH = = NULL ) {
@ -3365,26 +3728,37 @@ void * MySQL_Monitor::run() {
MySQL_Monitor__thread_MySQL_Thread_Variables_version = GloMTH - > get_global_version ( ) ;
mysql_thr - > refresh_variables ( ) ;
//if (!GloMTH) return NULL; // quick exit during shutdown/restart
pthread_attr_t attr ;
pthread_attr_init ( & attr ) ;
pthread_attr_setstacksize ( & attr , 2048 * 1024 ) ;
pthread_t monitor_dns_cache_thread ;
if ( pthread_create ( & monitor_dns_cache_thread , & attr , & monitor_dns_cache_pthread , NULL ) ! = 0 ) {
// LCOV_EXCL_START
proxy_error ( " Thread creation \n " ) ;
assert ( 0 ) ;
// LCOV_EXCL_STOP
}
__monitor_run :
while ( queue - > size ( ) ) { // this is a clean up in case Monitor was restarted
WorkItem * item = ( WorkItem * ) queue - > remove ( ) ;
WorkItem <MySQL_Monitor_State_Data > * item = ( WorkItem < MySQL_Monitor_State_Data > * ) queue - > remove ( ) ;
if ( item ) {
if ( item - > mmsd ) {
delete item - > mmsd ;
if ( item - > data ) {
delete item - > data ;
}
delete item ;
}
}
ConsumerThread * * threads = ( ConsumerThread * * ) malloc ( sizeof ( ConsumerThread * ) * num_threads ) ;
ConsumerThread < MySQL_Monitor_State_Data > * * threads = ( ConsumerThread < MySQL_Monitor_State_Data > * * ) malloc ( sizeof ( ConsumerThread < MySQL_Monitor_State_Data > * ) * num_threads ) ;
for ( unsigned int i = 0 ; i < num_threads ; i + + ) {
threads [ i ] = new ConsumerThread ( * queue , 0 ) ;
threads [ i ] = new ConsumerThread < MySQL_Monitor_State_Data > ( * queue , 0 ) ;
threads [ i ] - > start ( 2048 , false ) ;
}
started_threads + = num_threads ;
this - > metrics . p_counter_array [ p_mon_counter : : mysql_monitor_workers_started ] - > Increment ( num_threads ) ;
pthread_attr_t attr ;
pthread_attr_init ( & attr ) ;
pthread_attr_setstacksize ( & attr , 2048 * 1024 ) ;
pthread_t monitor_connect_thread ;
if ( pthread_create ( & monitor_connect_thread , & attr , & monitor_connect_pthread , NULL ) ! = 0 ) {
// LCOV_EXCL_START
@ -3446,10 +3820,10 @@ __monitor_run:
if ( old_num_threads < threads_min ) {
num_threads = threads_min ;
this - > metrics . p_gauge_array [ p_mon_gauge : : mysql_monitor_workers ] - > Set ( threads_min ) ;
threads = ( ConsumerThread * * ) realloc ( threads , sizeof ( ConsumerThread * ) * num_threads ) ;
threads = ( ConsumerThread < MySQL_Monitor_State_Data > * * ) realloc ( threads , sizeof ( ConsumerThread < MySQL_Monitor_State_Data > * ) * num_threads ) ;
started_threads + = ( num_threads - old_num_threads ) ;
for ( unsigned int i = old_num_threads ; i < num_threads ; i + + ) {
threads [ i ] = new ConsumerThread ( * queue , 0 ) ;
threads [ i ] = new ConsumerThread < MySQL_Monitor_State_Data > ( * queue , 0 ) ;
threads [ i ] - > start ( 2048 , false ) ;
}
}
@ -3475,10 +3849,10 @@ __monitor_run:
unsigned int old_num_threads = num_threads ;
num_threads + = new_threads ;
this - > metrics . p_gauge_array [ p_mon_gauge : : mysql_monitor_workers ] - > Increment ( new_threads ) ;
threads = ( ConsumerThread * * ) realloc ( threads , sizeof ( ConsumerThread * ) * num_threads ) ;
threads = ( ConsumerThread < MySQL_Monitor_State_Data > * * ) realloc ( threads , sizeof ( ConsumerThread < MySQL_Monitor_State_Data > * ) * num_threads ) ;
started_threads + = new_threads ;
for ( unsigned int i = old_num_threads ; i < num_threads ; i + + ) {
threads [ i ] = new ConsumerThread ( * queue , 0 ) ;
threads [ i ] = new ConsumerThread < MySQL_Monitor_State_Data > ( * queue , 0 ) ;
threads [ i ] - > start ( 2048 , false ) ;
}
}
@ -3494,11 +3868,11 @@ __monitor_run:
}
if ( qsize > 0 ) {
proxy_info ( " Monitor is starting %d helper threads \n " , qsize ) ;
ConsumerThread * * threads_aux = ( ConsumerThread * * ) malloc ( sizeof ( ConsumerThread * ) * qsize ) ;
ConsumerThread < MySQL_Monitor_State_Data > * * threads_aux = ( ConsumerThread < MySQL_Monitor_State_Data > * * ) malloc ( sizeof ( ConsumerThread < MySQL_Monitor_State_Data > * ) * qsize ) ;
aux_threads = qsize ;
started_threads + = aux_threads ;
for ( unsigned int i = 0 ; i < qsize ; i + + ) {
threads_aux [ i ] = new ConsumerThread ( * queue , 245 ) ;
threads_aux [ i ] = new ConsumerThread < MySQL_Monitor_State_Data > ( * queue , 245 ) ;
threads_aux [ i ] - > start ( 2048 , false ) ;
}
for ( unsigned int i = 0 ; i < qsize ; i + + ) {
@ -3512,7 +3886,7 @@ __monitor_run:
}
}
for ( unsigned int i = 0 ; i < num_threads ; i + + ) {
WorkItem * item = NULL ;
WorkItem < MySQL_Monitor_State_Data > * item = NULL ;
GloMyMon - > queue - > add ( item ) ;
}
for ( unsigned int i = 0 ; i < num_threads ; i + + ) {
@ -3542,10 +3916,14 @@ __monitor_run:
}
usleep ( 200000 ) ;
}
pthread_join ( monitor_dns_cache_thread , NULL ) ;
if ( mysql_thr ) {
delete mysql_thr ;
mysql_thr = NULL ;
}
return NULL ;
} ;
@ -4865,7 +5243,7 @@ __sleep_monitor_aws_aurora:
mysql_thr = NULL ;
}
for ( unsigned int i = 0 ; i < num_threads ; i + + ) {
WorkItem * item = NULL ;
WorkItem < MySQL_Monitor_State_Data > * item = NULL ;
GloMyMon - > queue - > add ( item ) ;
}
return NULL ;
@ -5025,4 +5403,95 @@ void MySQL_Monitor::evaluate_aws_aurora_results(unsigned int wHG, unsigned int r
# endif // TEST_AURORA
}
std : : string MySQL_Monitor : : dns_lookup ( const std : : string & hostname ) {
static thread_local std : : shared_ptr < DNS_Cache > dns_cache_thread ;
const std : : string & hostname_trim = trim ( hostname ) ;
std : : string ip ;
if ( ! dns_cache_thread & & GloMyMon )
dns_cache_thread = GloMyMon - > dns_cache ;
if ( dns_cache_thread ) {
ip = dns_cache_thread - > lookup ( hostname_trim , false ) ;
}
return ! ip . empty ( ) ? ip : hostname_trim ;
}
std : : string MySQL_Monitor : : dns_lookup ( const char * hostname ) {
return MySQL_Monitor : : dns_lookup ( std : : string ( hostname ) ) ;
}
bool DNS_Cache : : add ( const std : : string & hostname , const std : : string & ip ) {
int rc = pthread_rwlock_wrlock ( & rwlock_ ) ;
assert ( rc = = 0 ) ;
try {
records . emplace ( hostname , ip ) ;
}
catch ( . . . ) { }
rc = pthread_rwlock_unlock ( & rwlock_ ) ;
assert ( rc = = 0 ) ;
if ( GloMyMon )
__sync_fetch_and_add ( & GloMyMon - > dns_cache_record_updated , 1 ) ;
return true ;
}
std : : string DNS_Cache : : lookup ( const std : : string & hostname , bool return_hostname_if_lookup_fails ) const {
std : : string ip ;
__sync_fetch_and_add ( & GloMyMon - > dns_cache_queried , 1 ) ;
int rc = pthread_rwlock_rdlock ( & rwlock_ ) ;
assert ( rc = = 0 ) ;
auto itr = records . find ( hostname ) ;
if ( itr ! = records . end ( ) ) {
ip = itr - > second ;
}
rc = pthread_rwlock_unlock ( & rwlock_ ) ;
assert ( rc = = 0 ) ;
if ( ! ip . empty ( ) & & GloMyMon ) {
__sync_fetch_and_add ( & GloMyMon - > dns_cache_lookup_success , 1 ) ;
}
if ( ip . empty ( ) & & return_hostname_if_lookup_fails ) {
ip = hostname ;
}
return ip ;
}
void DNS_Cache : : remove ( const std : : string & hostname ) {
int rc = pthread_rwlock_wrlock ( & rwlock_ ) ;
assert ( rc = = 0 ) ;
records . erase ( hostname ) ;
rc = pthread_rwlock_unlock ( & rwlock_ ) ;
assert ( rc = = 0 ) ;
if ( GloMyMon )
__sync_fetch_and_add ( & GloMyMon - > dns_cache_record_updated , 1 ) ;
}
void DNS_Cache : : bulk_update ( const std : : list < std : : pair < DNS_Cache_Record , OPERATION > > bulk_record ) {
pthread_rwlock_wrlock ( & rwlock_ ) ;
for ( const auto & dns_record : bulk_record ) {
if ( dns_record . second = = DNS_Cache : : OPERATION : : UPDATE ) {
records [ dns_record . first . hostname ] = dns_record . first . ip ;
}
else if ( dns_record . second = = DNS_Cache : : OPERATION : : REMOVE ) {
records . erase ( dns_record . first . hostname ) ;
}
if ( GloMyMon )
__sync_fetch_and_add ( & GloMyMon - > dns_cache_record_updated , 1 ) ;
}
pthread_rwlock_unlock ( & rwlock_ ) ;
}
template class WorkItem < MySQL_Monitor_State_Data > ;
template class WorkItem < DNS_Resolve_Data > ;