@ -69,39 +69,6 @@ static pthread_mutex_t ev_loop_mutex;
const int PgSQL_ERRORS_STATS_FIELD_NUM = 11 ;
#if 0
static std : : string gtid_executed_to_string ( gtid_set_t & gtid_executed ) ;
static void addGtid ( const gtid_t & gtid , gtid_set_t & gtid_executed ) ;
static void gtid_async_cb ( struct ev_loop * loop , struct ev_async * watcher , int revents ) {
if ( glovars . shutdown ) {
ev_break ( loop ) ;
}
pthread_mutex_lock ( & ev_loop_mutex ) ;
PgHGM - > gtid_missing_nodes = false ;
PgHGM - > generate_pgsql_gtid_executed_tables ( ) ;
pthread_mutex_unlock ( & ev_loop_mutex ) ;
return ;
}
static void gtid_timer_cb ( struct ev_loop * loop , struct ev_timer * timer , int revents ) {
if ( GloMTH = = nullptr ) { return ; }
ev_timer_stop ( loop , timer ) ;
ev_timer_set ( timer , __sync_add_and_fetch ( & GloMTH - > variables . binlog_reader_connect_retry_msec , 0 ) / 1000 , 0 ) ;
if ( glovars . shutdown ) {
ev_break ( loop ) ;
}
if ( PgHGM - > gtid_missing_nodes ) {
pthread_mutex_lock ( & ev_loop_mutex ) ;
PgHGM - > gtid_missing_nodes = false ;
PgHGM - > generate_pgsql_gtid_executed_tables ( ) ;
pthread_mutex_unlock ( & ev_loop_mutex ) ;
}
ev_timer_start ( loop , timer ) ;
return ;
}
# endif // 0
static int wait_for_pgsql ( MYSQL * mysql , int status ) {
struct pollfd pfd ;
int timeout , res ;
@ -169,435 +136,6 @@ T PgSQL_j_get_srv_default_int_val(
return static_cast < T > ( - 1 ) ;
}
#if 0
static void reader_cb ( struct ev_loop * loop , struct ev_io * w , int revents ) {
pthread_mutex_lock ( & ev_loop_mutex ) ;
if ( revents & EV_READ ) {
PgSQL_GTID_Server_Data * sd = ( PgSQL_GTID_Server_Data * ) w - > data ;
bool rc = true ;
rc = sd - > readall ( ) ;
if ( rc = = false ) {
//delete sd;
std : : string s1 = sd - > address ;
s1 . append ( " : " ) ;
s1 . append ( std : : to_string ( sd - > pgsql_port ) ) ;
PgHGM - > gtid_missing_nodes = true ;
proxy_warning ( " GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d \n " , sd - > port , sd - > address , sd - > pgsql_port ) ;
std : : unordered_map < string , PgSQL_GTID_Server_Data * > : : iterator it2 ;
it2 = PgHGM - > gtid_map . find ( s1 ) ;
if ( it2 ! = PgHGM - > gtid_map . end ( ) ) {
//PgHGM->gtid_map.erase(it2);
it2 - > second = NULL ;
delete sd ;
}
ev_io_stop ( PgHGM - > gtid_ev_loop , w ) ;
free ( w ) ;
} else {
sd - > dump ( ) ;
}
}
pthread_mutex_unlock ( & ev_loop_mutex ) ;
}
static void connect_cb ( EV_P_ ev_io * w , int revents ) {
pthread_mutex_lock ( & ev_loop_mutex ) ;
struct ev_io * c = w ;
if ( revents & EV_WRITE ) {
int optval = 0 ;
socklen_t optlen = sizeof ( optval ) ;
if ( ( getsockopt ( w - > fd , SOL_SOCKET , SO_ERROR , & optval , & optlen ) = = - 1 ) | |
( optval ! = 0 ) ) {
/* Connection failed; try the next address in the list. */
//int errnum = optval ? optval : errno;
ev_io_stop ( PgHGM - > gtid_ev_loop , w ) ;
close ( w - > fd ) ;
PgHGM - > gtid_missing_nodes = true ;
PgSQL_GTID_Server_Data * custom_data = ( PgSQL_GTID_Server_Data * ) w - > data ;
PgSQL_GTID_Server_Data * sd = custom_data ;
std : : string s1 = sd - > address ;
s1 . append ( " : " ) ;
s1 . append ( std : : to_string ( sd - > pgsql_port ) ) ;
proxy_warning ( " GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d \n " , sd - > port , sd - > address , sd - > pgsql_port ) ;
std : : unordered_map < string , PgSQL_GTID_Server_Data * > : : iterator it2 ;
it2 = PgHGM - > gtid_map . find ( s1 ) ;
if ( it2 ! = PgHGM - > gtid_map . end ( ) ) {
//PgHGM->gtid_map.erase(it2);
it2 - > second = NULL ;
delete sd ;
}
//delete custom_data;
free ( c ) ;
} else {
ev_io_stop ( PgHGM - > gtid_ev_loop , w ) ;
int fd = w - > fd ;
struct ev_io * new_w = ( struct ev_io * ) malloc ( sizeof ( struct ev_io ) ) ;
new_w - > data = w - > data ;
PgSQL_GTID_Server_Data * custom_data = ( PgSQL_GTID_Server_Data * ) new_w - > data ;
custom_data - > w = new_w ;
free ( w ) ;
ev_io_init ( new_w , reader_cb , fd , EV_READ ) ;
ev_io_start ( PgHGM - > gtid_ev_loop , new_w ) ;
}
}
pthread_mutex_unlock ( & ev_loop_mutex ) ;
}
static struct ev_io * new_connector ( char * address , uint16_t gtid_port , uint16_t pgsql_port ) {
//struct sockaddr_in a;
int s ;
if ( ( s = socket ( AF_INET , SOCK_STREAM , 0 ) ) = = - 1 ) {
perror ( " socket " ) ;
close ( s ) ;
return NULL ;
}
/*
memset ( & a , 0 , sizeof ( a ) ) ;
a . sin_port = htons ( gtid_port ) ;
a . sin_family = AF_INET ;
if ( ! inet_aton ( address , ( struct in_addr * ) & a . sin_addr . s_addr ) ) {
perror ( " bad IP address format " ) ;
close ( s ) ;
return NULL ;
}
*/
ioctl_FIONBIO ( s , 1 ) ;
struct addrinfo hints ;
struct addrinfo * res = NULL ;
memset ( & hints , 0 , sizeof ( hints ) ) ;
hints . ai_protocol = IPPROTO_TCP ;
hints . ai_family = AF_UNSPEC ;
hints . ai_socktype = SOCK_STREAM ;
char str_port [ NI_MAXSERV + 1 ] ;
sprintf ( str_port , " %d " , gtid_port ) ;
int gai_rc = getaddrinfo ( address , str_port , & hints , & res ) ;
if ( gai_rc ) {
freeaddrinfo ( res ) ;
//exit here
return NULL ;
}
//int status = connect(s, (struct sockaddr *) &a, sizeof(a));
int status = connect ( s , res - > ai_addr , res - > ai_addrlen ) ;
if ( ( status = = 0 ) | | ( ( status = = - 1 ) & & ( errno = = EINPROGRESS ) ) ) {
struct ev_io * c = ( struct ev_io * ) malloc ( sizeof ( struct ev_io ) ) ;
if ( c ) {
ev_io_init ( c , connect_cb , s , EV_WRITE ) ;
PgSQL_GTID_Server_Data * custom_data = new PgSQL_GTID_Server_Data ( c , address , gtid_port , pgsql_port ) ;
c - > data = ( void * ) custom_data ;
return c ;
}
/* else error */
}
return NULL ;
}
PgSQL_GTID_Server_Data : : PgSQL_GTID_Server_Data ( struct ev_io * _w , char * _address , uint16_t _port , uint16_t _pgsql_port ) {
active = true ;
w = _w ;
size = 1024 ; // 1KB buffer
data = ( char * ) malloc ( size ) ;
memset ( uuid_server , 0 , sizeof ( uuid_server ) ) ;
pos = 0 ;
len = 0 ;
address = strdup ( _address ) ;
port = _port ;
pgsql_port = _pgsql_port ;
events_read = 0 ;
}
void PgSQL_GTID_Server_Data : : resize ( size_t _s ) {
char * data_ = ( char * ) malloc ( _s ) ;
memcpy ( data_ , data , ( _s > size ? size : _s ) ) ;
size = _s ;
free ( data ) ;
data = data_ ;
}
PgSQL_GTID_Server_Data : : ~ PgSQL_GTID_Server_Data ( ) {
free ( address ) ;
free ( data ) ;
}
bool PgSQL_GTID_Server_Data : : readall ( ) {
bool ret = true ;
if ( size = = len ) {
// buffer is full, expand
resize ( len * 2 ) ;
}
int rc = 0 ;
rc = read ( w - > fd , data + len , size - len ) ;
if ( rc > 0 ) {
len + = rc ;
} else {
int myerr = errno ;
proxy_error ( " Read returned %d bytes, error %d \n " , rc , myerr ) ;
if (
( rc = = 0 ) | |
( rc = = - 1 & & myerr ! = EINTR & & myerr ! = EAGAIN )
) {
ret = false ;
}
}
return ret ;
}
bool PgSQL_GTID_Server_Data : : gtid_exists ( char * gtid_uuid , uint64_t gtid_trxid ) {
std : : string s = gtid_uuid ;
auto it = gtid_executed . find ( s ) ;
// fprintf(stderr,"Checking if server %s:%d has GTID %s:%lu ... ", address, port, gtid_uuid, gtid_trxid);
if ( it = = gtid_executed . end ( ) ) {
// fprintf(stderr,"NO\n");
return false ;
}
for ( auto itr = it - > second . begin ( ) ; itr ! = it - > second . end ( ) ; + + itr ) {
if ( ( int64_t ) gtid_trxid > = itr - > first & & ( int64_t ) gtid_trxid < = itr - > second ) {
// fprintf(stderr,"YES\n");
return true ;
}
}
// fprintf(stderr,"NO\n");
return false ;
}
void PgSQL_GTID_Server_Data : : read_all_gtids ( ) {
while ( read_next_gtid ( ) ) {
}
}
void PgSQL_GTID_Server_Data : : dump ( ) {
if ( len = = 0 ) {
return ;
}
read_all_gtids ( ) ;
//int rc = write(1,data+pos,len-pos);
fflush ( stdout ) ;
///pos += rc;
if ( pos > = len / 2 ) {
memmove ( data , data + pos , len - pos ) ;
len = len - pos ;
pos = 0 ;
}
}
bool PgSQL_GTID_Server_Data : : writeout ( ) {
bool ret = true ;
if ( len = = 0 ) {
return ret ;
}
int rc = 0 ;
rc = write ( w - > fd , data + pos , len - pos ) ;
if ( rc > 0 ) {
pos + = rc ;
if ( pos > = len / 2 ) {
memmove ( data , data + pos , len - pos ) ;
len = len - pos ;
pos = 0 ;
}
}
return ret ;
}
bool PgSQL_GTID_Server_Data : : read_next_gtid ( ) {
if ( len = = 0 ) {
return false ;
}
void * nlp = NULL ;
nlp = memchr ( data + pos , ' \n ' , len - pos ) ;
if ( nlp = = NULL ) {
return false ;
}
int l = ( char * ) nlp - ( data + pos ) ;
char rec_msg [ 80 ] ;
if ( strncmp ( data + pos , ( char * ) " ST= " , 3 ) = = 0 ) {
// we are reading the bootstrap
char * bs = ( char * ) malloc ( l + 1 - 3 ) ; // length + 1 (null byte) - 3 (header)
memcpy ( bs , data + pos + 3 , l - 3 ) ;
bs [ l - 3 ] = ' \0 ' ;
char * saveptr1 = NULL ;
char * saveptr2 = NULL ;
//char *saveptr3=NULL;
char * token = NULL ;
char * subtoken = NULL ;
//char *subtoken2 = NULL;
char * str1 = NULL ;
char * str2 = NULL ;
//char *str3 = NULL;
for ( str1 = bs ; ; str1 = NULL ) {
token = strtok_r ( str1 , " , " , & saveptr1 ) ;
if ( token = = NULL ) {
break ;
}
int j = 0 ;
for ( str2 = token ; ; str2 = NULL ) {
subtoken = strtok_r ( str2 , " : " , & saveptr2 ) ;
if ( subtoken = = NULL ) {
break ;
}
j + + ;
if ( j % 2 = = 1 ) { // we are reading the uuid
char * p = uuid_server ;
for ( unsigned int k = 0 ; k < strlen ( subtoken ) ; k + + ) {
if ( subtoken [ k ] ! = ' - ' ) {
* p = subtoken [ k ] ;
p + + ;
}
}
//fprintf(stdout,"BS from %s\n", uuid_server);
} else { // we are reading the trxids
uint64_t trx_from ;
uint64_t trx_to ;
sscanf ( subtoken , " %lu-%lu " , & trx_from , & trx_to ) ;
//fprintf(stdout,"BS from %s:%lu-%lu\n", uuid_server, trx_from, trx_to);
std : : string s = uuid_server ;
gtid_executed [ s ] . emplace_back ( trx_from , trx_to ) ;
}
}
}
pos + = l + 1 ;
free ( bs ) ;
//return true;
} else {
strncpy ( rec_msg , data + pos , l ) ;
pos + = l + 1 ;
rec_msg [ l ] = 0 ;
//int rc = write(1,data+pos,l+1);
//fprintf(stdout,"%s\n", rec_msg);
if ( rec_msg [ 0 ] = = ' I ' ) {
//char rec_uuid[80];
uint64_t rec_trxid = 0 ;
char * a = NULL ;
int ul = 0 ;
switch ( rec_msg [ 1 ] ) {
case ' 1 ' :
//sscanf(rec_msg+3,"%s\:%lu",uuid_server,&rec_trxid);
a = strchr ( rec_msg + 3 , ' : ' ) ;
ul = a - rec_msg - 3 ;
strncpy ( uuid_server , rec_msg + 3 , ul ) ;
uuid_server [ ul ] = 0 ;
rec_trxid = atoll ( a + 1 ) ;
break ;
case ' 2 ' :
//sscanf(rec_msg+3,"%lu",&rec_trxid);
rec_trxid = atoll ( rec_msg + 3 ) ;
break ;
default :
break ;
}
//fprintf(stdout,"%s:%lu\n", uuid_server, rec_trxid);
std : : string s = uuid_server ;
gtid_t new_gtid = std : : make_pair ( s , rec_trxid ) ;
addGtid ( new_gtid , gtid_executed ) ;
events_read + + ;
//return true;
}
}
//std::cout << "current pos " << gtid_executed_to_string(gtid_executed) << std::endl << std::endl;
return true ;
}
static std : : string gtid_executed_to_string ( gtid_set_t & gtid_executed ) {
std : : string gtid_set ;
for ( auto it = gtid_executed . begin ( ) ; it ! = gtid_executed . end ( ) ; + + it ) {
std : : string s = it - > first ;
s . insert ( 8 , " - " ) ;
s . insert ( 13 , " - " ) ;
s . insert ( 18 , " - " ) ;
s . insert ( 23 , " - " ) ;
s = s + " : " ;
for ( auto itr = it - > second . begin ( ) ; itr ! = it - > second . end ( ) ; + + itr ) {
std : : string s2 = s ;
s2 = s2 + std : : to_string ( itr - > first ) ;
s2 = s2 + " - " ;
s2 = s2 + std : : to_string ( itr - > second ) ;
s2 = s2 + " , " ;
gtid_set = gtid_set + s2 ;
}
}
// Extract latest comma only in case 'gtid_executed' isn't empty
if ( gtid_set . empty ( ) = = false ) {
gtid_set . pop_back ( ) ;
}
return gtid_set ;
}
static void addGtid ( const gtid_t & gtid , gtid_set_t & gtid_executed ) {
auto it = gtid_executed . find ( gtid . first ) ;
if ( it = = gtid_executed . end ( ) )
{
gtid_executed [ gtid . first ] . emplace_back ( gtid . second , gtid . second ) ;
return ;
}
bool flag = true ;
for ( auto itr = it - > second . begin ( ) ; itr ! = it - > second . end ( ) ; + + itr )
{
if ( gtid . second > = itr - > first & & gtid . second < = itr - > second )
return ;
if ( gtid . second + 1 = = itr - > first )
{
- - itr - > first ;
flag = false ;
break ;
}
else if ( gtid . second = = itr - > second + 1 )
{
+ + itr - > second ;
flag = false ;
break ;
}
else if ( gtid . second < itr - > first )
{
it - > second . emplace ( itr , gtid . second , gtid . second ) ;
return ;
}
}
if ( flag )
it - > second . emplace_back ( gtid . second , gtid . second ) ;
for ( auto itr = it - > second . begin ( ) ; itr ! = it - > second . end ( ) ; + + itr )
{
auto next_itr = std : : next ( itr ) ;
if ( next_itr ! = it - > second . end ( ) & & itr - > second + 1 = = next_itr - > first )
{
itr - > second = next_itr - > second ;
it - > second . erase ( next_itr ) ;
break ;
}
}
}
static void * GTID_syncer_run ( ) {
//struct ev_loop * gtid_ev_loop;
//gtid_ev_loop = NULL;
PgHGM - > gtid_ev_loop = ev_loop_new ( EVBACKEND_POLL | EVFLAG_NOENV ) ;
if ( PgHGM - > gtid_ev_loop = = NULL ) {
proxy_error ( " could not initialise GTID sync loop \n " ) ;
exit ( EXIT_FAILURE ) ;
}
//ev_async_init(gtid_ev_async, gtid_async_cb);
//ev_async_start(gtid_ev_loop, gtid_ev_async);
PgHGM - > gtid_ev_timer = ( struct ev_timer * ) malloc ( sizeof ( struct ev_timer ) ) ;
ev_async_init ( PgHGM - > gtid_ev_async , gtid_async_cb ) ;
ev_async_start ( PgHGM - > gtid_ev_loop , PgHGM - > gtid_ev_async ) ;
//ev_timer_init(PgHGM->gtid_ev_timer, gtid_timer_cb, __sync_add_and_fetch(&GloMTH->variables.binlog_reader_connect_retry_msec,0)/1000, 0);
ev_timer_init ( PgHGM - > gtid_ev_timer , gtid_timer_cb , 3 , 0 ) ;
ev_timer_start ( PgHGM - > gtid_ev_loop , PgHGM - > gtid_ev_timer ) ;
//ev_ref(gtid_ev_loop);
ev_run ( PgHGM - > gtid_ev_loop , 0 ) ;
//sleep(1000);
return NULL ;
}
# endif // 0
//static void * HGCU_thread_run() {
static void * HGCU_thread_run ( ) {
PtrArray * conn_array = new PtrArray ( ) ;
@ -725,12 +263,6 @@ PgSQL_Connection * PgSQL_SrvConnList::remove(int _k) {
return ( PgSQL_Connection * ) conns - > remove_index_fast ( _k ) ;
}
/*
unsigned int PgSQL_SrvConnList : : conns_length ( ) {
return conns - > len ;
}
*/
PgSQL_SrvConnList : : PgSQL_SrvConnList ( PgSQL_SrvC * _mysrvc ) {
mysrvc = _mysrvc ;
conns = new PtrArray ( ) ;
@ -749,37 +281,6 @@ PgSQL_SrvConnList::~PgSQL_SrvConnList() {
delete conns ;
}
#if 0
PgSQL_SrvList : : PgSQL_SrvList ( PgSQL_HGC * _myhgc ) {
myhgc = _myhgc ;
servers = new PtrArray ( ) ;
}
void PgSQL_SrvList : : add ( PgSQL_SrvC * s ) {
if ( s - > myhgc = = NULL ) {
s - > myhgc = myhgc ;
}
servers - > add ( s ) ;
}
int PgSQL_SrvList : : find_idx ( PgSQL_SrvC * s ) {
for ( unsigned int i = 0 ; i < servers - > len ; i + + ) {
PgSQL_SrvC * mysrv = ( PgSQL_SrvC * ) servers - > index ( i ) ;
if ( mysrv = = s ) {
return ( unsigned int ) i ;
}
}
return - 1 ;
}
void PgSQL_SrvList : : remove ( PgSQL_SrvC * s ) {
int i = find_idx ( s ) ;
assert ( i > = 0 ) ;
servers - > remove_index_fast ( ( unsigned int ) i ) ;
}
# endif // 0
void PgSQL_SrvConnList : : drop_all_connections ( ) {
proxy_debug ( PROXY_DEBUG_MYSQL_CONNPOOL , 7 , " Dropping all connections (%u total) on PgSQL_SrvConnList %p for server %s:%d , hostgroup=%d , status=%d \n " , conns_length ( ) , this , mysrvc - > address , mysrvc - > port , mysrvc - > myhgc - > hid , mysrvc - > status ) ;
while ( conns_length ( ) ) {
@ -926,62 +427,6 @@ PgSQL_SrvC::~PgSQL_SrvC() {
delete ConnectionsFree ;
}
#if 0
PgSQL_SrvList : : ~ PgSQL_SrvList ( ) {
myhgc = NULL ;
while ( servers - > len ) {
PgSQL_SrvC * mysrvc = ( PgSQL_SrvC * ) servers - > remove_index_fast ( 0 ) ;
delete mysrvc ;
}
delete servers ;
}
PgSQL_HGC : : PgSQL_HGC ( int _hid ) {
hid = _hid ;
mysrvs = new PgSQL_SrvList ( this ) ;
current_time_now = 0 ;
new_connections_now = 0 ;
attributes . initialized = false ;
reset_attributes ( ) ;
// Uninitialized server defaults. Should later be initialized via 'pgsql_hostgroup_attributes'.
servers_defaults . weight = - 1 ;
servers_defaults . max_connections = - 1 ;
servers_defaults . use_ssl = - 1 ;
}
void PgSQL_HGC : : reset_attributes ( ) {
if ( attributes . initialized = = false ) {
attributes . init_connect = NULL ;
attributes . comment = NULL ;
attributes . ignore_session_variables_text = NULL ;
}
attributes . initialized = true ;
attributes . configured = false ;
attributes . max_num_online_servers = 1000000 ;
attributes . throttle_connections_per_sec = 1000000 ;
attributes . autocommit = - 1 ;
attributes . free_connections_pct = 10 ;
attributes . handle_warnings = - 1 ;
attributes . multiplex = true ;
attributes . connection_warming = false ;
free ( attributes . init_connect ) ;
attributes . init_connect = NULL ;
free ( attributes . comment ) ;
attributes . comment = NULL ;
free ( attributes . ignore_session_variables_text ) ;
attributes . ignore_session_variables_text = NULL ;
if ( attributes . ignore_session_variables_json ) {
delete attributes . ignore_session_variables_json ;
attributes . ignore_session_variables_json = NULL ;
}
}
PgSQL_HGC : : ~ PgSQL_HGC ( ) {
reset_attributes ( ) ; // free all memory
delete mysrvs ;
}
# endif // 0
using metric_name = std : : string ;
using metric_help = std : : string ;
using metric_tags = std : : map < std : : string , std : : string > ;