@ -1,3 +1,4 @@
# include "GTID_Server_Data.h"
# include "MySQL_HostGroups_Manager.h"
# include "ev.h"
@ -47,21 +48,14 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) {
bool rc = true ;
rc = sd - > readall ( ) ;
if ( rc = = false ) {
//delete sd;
std : : string s1 = sd - > address ;
s1 . append ( " : " ) ;
s1 . append ( std : : to_string ( sd - > mysql_port ) ) ;
MyHGM - > gtid_missing_nodes = true ;
proxy_warning ( " GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d \n " , sd - > port , sd - > address , sd - > mysql_port ) ;
std : : unordered_map < string , GTID_Server_Data * > : : iterator it2 ;
it2 = MyHGM - > gtid_map . find ( s1 ) ;
if ( it2 ! = MyHGM - > gtid_map . end ( ) ) {
//MyHGM->gtid_map.erase(it2);
it2 - > second = NULL ;
delete sd ;
}
sd - > active = false ;
proxy_warning ( " GTID: failed to read from ProxySQL binlog reader on port %d for server %s:%d \n " , sd - > port , sd - > address , sd - > mysql_port ) ;
ev_io_stop ( MyHGM - > gtid_ev_loop , w ) ;
close ( w - > fd ) ;
free ( w ) ;
sd - > w = nullptr ;
} else {
sd - > dump ( ) ;
}
@ -71,48 +65,37 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) {
void connect_cb ( EV_P_ ev_io * w , int revents ) {
pthread_mutex_lock ( & ev_loop_mutex ) ;
struct ev_io * c = w ;
if ( revents & EV_WRITE ) {
int optval = 0 ;
socklen_t optlen = sizeof ( optval ) ;
if ( ( getsockopt ( w - > fd , SOL_SOCKET , SO_ERROR , & optval , & optlen ) = = - 1 ) | |
( optval ! = 0 ) ) {
/* Connection failed; try the next address in the list. */
//int errnum = optval ? optval : errno;
ev_io_stop ( MyHGM - > gtid_ev_loop , w ) ;
close ( w - > fd ) ;
int fd = w - > fd ;
GTID_Server_Data * sd = ( GTID_Server_Data * ) w - > data ;
// connect() completed, this watcher is no longer needed
ev_io_stop ( MyHGM - > gtid_ev_loop , w ) ;
free ( w ) ;
sd - > w = nullptr ;
// Based on fd status, proceed to next step -> waiting for read event on the socket
int error = 0 ;
socklen_t optlen = sizeof ( error ) ;
int rc = getsockopt ( fd , SOL_SOCKET , SO_ERROR , & error , & optlen ) ;
if ( rc = = - 1 | | error ! = 0 ) {
/* connection failed */
MyHGM - > gtid_missing_nodes = true ;
GTID_Server_Data * custom_data = ( GTID_Server_Data * ) w - > data ;
GTID_Server_Data * sd = custom_data ;
std : : string s1 = sd - > address ;
s1 . append ( " : " ) ;
s1 . append ( std : : to_string ( sd - > mysql_port ) ) ;
sd - > active = false ;
proxy_warning ( " GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d \n " , sd - > port , sd - > address , sd - > mysql_port ) ;
std : : unordered_map < string , GTID_Server_Data * > : : iterator it2 ;
it2 = MyHGM - > gtid_map . find ( s1 ) ;
if ( it2 ! = MyHGM - > gtid_map . end ( ) ) {
//MyHGM->gtid_map.erase(it2);
it2 - > second = NULL ;
delete sd ;
}
//delete custom_data;
free ( c ) ;
close ( fd ) ;
} else {
ev_io_stop ( MyHGM - > gtid_ev_loop , w ) ;
int fd = w - > fd ;
struct ev_io * new_w = ( struct ev_io * ) malloc ( sizeof ( struct ev_io ) ) ;
new_w - > data = w - > data ;
GTID_Server_Data * custom_data = ( GTID_Server_Data * ) new_w - > data ;
custom_data - > w = new_w ;
free ( w ) ;
ev_io_init ( new_w , reader_cb , fd , EV_READ ) ;
ev_io_start ( MyHGM - > gtid_ev_loop , new_w ) ;
struct ev_io * read_watcher = ( struct ev_io * ) malloc ( sizeof ( struct ev_io ) ) ;
read_watcher - > data = sd ;
sd - > w = read_watcher ;
ev_io_init ( read_watcher , reader_cb , fd , EV_READ ) ;
ev_io_start ( MyHGM - > gtid_ev_loop , read_watcher ) ;
}
}
pthread_mutex_unlock ( & ev_loop_mutex ) ;
}
struct ev_io * new_connect o r( char * address , uint16_t gtid_port , uint16_t mysql_port ) {
struct ev_io * new_connect _watche r( char * address , uint16_t gtid_port , uint16_t mysql_port ) {
int s ;
if ( ( s = socket ( AF_INET , SOCK_STREAM , 0 ) ) = = - 1 ) {
@ -149,8 +132,6 @@ struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_p
struct ev_io * c = ( struct ev_io * ) malloc ( sizeof ( struct ev_io ) ) ;
if ( c ) {
ev_io_init ( c , connect_cb , s , EV_WRITE ) ;
GTID_Server_Data * custom_data = new GTID_Server_Data ( c , address , gtid_port , mysql_port ) ;
c - > data = ( void * ) custom_data ;
return c ;
}
/* else error */
@ -158,8 +139,6 @@ struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_p
return NULL ;
}
GTID_Server_Data : : GTID_Server_Data ( struct ev_io * _w , char * _address , uint16_t _port , uint16_t _mysql_port ) {
active = true ;
w = _w ;
@ -188,26 +167,32 @@ GTID_Server_Data::~GTID_Server_Data() {
}
bool GTID_Server_Data : : readall ( ) {
bool ret = true ;
if ( size = = len ) {
// buffer is full, expand
resize ( len * 2 ) ;
resize ( len * 2 ) ;
}
int rc = 0 ;
rc = read ( w - > fd , data + len , size - len ) ;
rc = read ( w - > fd , data + len , size - len ) ;
if ( rc > 0 ) {
len + = rc ;
return true ;
}
int myerr = errno ;
if ( rc = = 0 ) {
proxy_info ( " Read returned EOF \n " ) ;
return false ;
}
// rc == -1
proxy_error ( " Read failed, error %d \n " , myerr ) ;
if ( myerr = = EINTR | | myerr = = EAGAIN ) {
// non-blocking fd, so this should not be considered as an error
return true ;
} else {
int myerr = errno ;
proxy_error ( " Read returned %d bytes, error %d \n " , rc , myerr ) ;
if (
( rc = = 0 ) | |
( rc = = - 1 & & myerr ! = EINTR & & myerr ! = EAGAIN )
) {
ret = false ;
}
return false ;
}
return ret ;
}
@ -239,9 +224,6 @@ void GTID_Server_Data::dump() {
return ;
}
read_all_gtids ( ) ;
//int rc = write(1,data+pos,len-pos);
fflush ( stdout ) ;
///pos += rc;
if ( pos > = len / 2 ) {
memmove ( data , data + pos , len - pos ) ;
len = len - pos ;
@ -285,13 +267,12 @@ bool GTID_Server_Data::read_next_gtid() {
bs [ l - 3 ] = ' \0 ' ;
char * saveptr1 = NULL ;
char * saveptr2 = NULL ;
//char *saveptr3=NULL;
char * token = NULL ;
char * subtoken = NULL ;
//char *subtoken2 = NULL;
char * str1 = NULL ;
char * str2 = NULL ;
//char *str3 = NULL;
bool updated = false ;
for ( str1 = bs ; ; str1 = NULL ) {
token = strtok_r ( str1 , " , " , & saveptr1 ) ;
if ( token = = NULL ) {
@ -312,34 +293,30 @@ bool GTID_Server_Data::read_next_gtid() {
p + + ;
}
}
//fprintf(stdout,"BS from %s\n", uuid_server);
} else { // we are reading the trxids
uint64_t trx_from ;
uint64_t trx_to ;
sscanf ( subtoken , " %lu-%lu " , & trx_from , & trx_to ) ;
//fprintf(stdout,"BS from %s:%lu-%lu\n", uuid_server, trx_from, trx_to);
std : : string s = uuid_server ;
gtid_executed [ s ] . emplace_back ( trx_from , trx_to ) ;
updated = addGtidInterval ( gtid_executed , uuid_server , trx_from , trx_to ) | | updated ;
}
}
}
pos + = l + 1 ;
free ( bs ) ;
//return true;
if ( updated ) {
events_read + + ;
}
} else {
strncpy ( rec_msg , data + pos , l ) ;
pos + = l + 1 ;
rec_msg [ l ] = 0 ;
//int rc = write(1,data+pos,l+1);
//fprintf(stdout,"%s\n", rec_msg);
if ( rec_msg [ 0 ] = = ' I ' ) {
//char rec_uuid[80];
uint64_t rec_trxid = 0 ;
char * a = NULL ;
int ul = 0 ;
switch ( rec_msg [ 1 ] ) {
case ' 1 ' :
//sscanf(rec_msg+3,"%s\:%lu",uuid_server,&rec_trxid);
a = strchr ( rec_msg + 3 , ' : ' ) ;
ul = a - rec_msg - 3 ;
strncpy ( uuid_server , rec_msg + 3 , ul ) ;
@ -347,21 +324,17 @@ bool GTID_Server_Data::read_next_gtid() {
rec_trxid = atoll ( a + 1 ) ;
break ;
case ' 2 ' :
//sscanf(rec_msg+3,"%lu",&rec_trxid);
rec_trxid = atoll ( rec_msg + 3 ) ;
break ;
default :
break ;
}
//fprintf(stdout,"%s:%lu\n", uuid_server, rec_trxid);
std : : string s = uuid_server ;
gtid_t new_gtid = std : : make_pair ( s , rec_trxid ) ;
addGtid ( new_gtid , gtid_executed ) ;
events_read + + ;
//return true;
}
}
//std::cout << "current pos " << gtid_executed_to_string(gtid_executed) << std::endl << std::endl;
return true ;
}
@ -439,6 +412,43 @@ void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) {
}
}
bool addGtidInterval ( gtid_set_t & gtid_executed , std : : string server_uuid , int64_t txid_start , int64_t txid_end ) {
bool updated = true ;
auto it = gtid_executed . find ( server_uuid ) ;
if ( it = = gtid_executed . end ( ) ) {
gtid_executed [ server_uuid ] . emplace_back ( txid_start , txid_end ) ;
return updated ;
}
bool insert = true ;
// When ProxySQL reconnects with binlog reader, it might
// receive updated txid intervals in the bootstrap message.
// For example,
// before disconnection -> server_UUID:1-10
// after reconnection -> server_UUID:1-19
auto & txid_intervals = it - > second ;
for ( auto & interval : txid_intervals ) {
if ( interval . first = = txid_start ) {
if ( interval . second = = txid_end ) {
updated = false ;
} else {
interval . second = txid_end ;
}
insert = false ;
break ;
}
}
if ( insert ) {
txid_intervals . emplace_back ( txid_start , txid_end ) ;
}
return updated ;
}
void * GTID_syncer_run ( ) {
//struct ev_loop * gtid_ev_loop;
//gtid_ev_loop = NULL;