@ -1,3 +1,4 @@
# include "GTID_Server_Data.h"
# include "MySQL_HostGroups_Manager.h"
# include "ev.h"
@ -40,6 +41,29 @@ static void gtid_timer_cb (struct ev_loop *loop, struct ev_timer *timer, int rev
return ;
}
/**
 * @brief Data reception callback for established GTID server connections
 *
 * This callback handles reading GTID data from established connections to binlog readers.
 * It processes incoming GTID information and manages connection failures gracefully.
 *
 * On successful read:
 * - Processes the received GTID data
 * - Calls dump() to parse and update GTID sets
 *
 * On read failure:
 * - Marks the server connection as inactive
 * - Sets gtid_missing_nodes flag to trigger reconnection
 * - Performs proper cleanup of socket and watcher resources
 * - Clears the watcher reference to maintain clean state
 *
 * @param loop The event loop (unused in this implementation)
 * @param w The I/O watcher for data reception
 * @param revents The events that triggered this callback
 *
 * @note This function is critical for maintaining GTID synchronization stability
 * @note Proper resource cleanup prevents memory leaks and maintains system stability
 */
void reader_cb ( struct ev_loop * loop , struct ev_io * w , int revents ) {
pthread_mutex_lock ( & ev_loop_mutex ) ;
if ( revents & EV_READ ) {
@ -47,21 +71,14 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) {
bool rc = true ;
rc = sd - > readall ( ) ;
if ( rc = = false ) {
//delete sd;
std : : string s1 = sd - > address ;
s1 . append ( " : " ) ;
s1 . append ( std : : to_string ( sd - > mysql_port ) ) ;
MyHGM - > gtid_missing_nodes = true ;
proxy_warning ( " GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d \n " , sd - > port , sd - > address , sd - > mysql_port ) ;
std : : unordered_map < string , GTID_Server_Data * > : : iterator it2 ;
it2 = MyHGM - > gtid_map . find ( s1 ) ;
if ( it2 ! = MyHGM - > gtid_map . end ( ) ) {
//MyHGM->gtid_map.erase(it2);
it2 - > second = NULL ;
delete sd ;
}
sd - > active = false ;
proxy_warning ( " GTID: failed to read from ProxySQL binlog reader on port %d for server %s:%d \n " , sd - > port , sd - > address , sd - > mysql_port ) ;
ev_io_stop ( MyHGM - > gtid_ev_loop , w ) ;
close ( w - > fd ) ;
free ( w ) ;
sd - > w = nullptr ;
} else {
sd - > dump ( ) ;
}
@ -69,50 +86,63 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) {
pthread_mutex_unlock ( & ev_loop_mutex ) ;
}
/**
 * @brief Connection establishment callback for GTID server connections
 *
 * Triggered when the non-blocking connect() on a binlog reader socket
 * completes (the watcher fires with EV_WRITE). The connect watcher is
 * single-use: it is stopped and freed before the outcome is inspected, so
 * everything needed afterwards (the fd and the GTID_Server_Data pointer) is
 * copied out of it first.
 *
 * On successful connection:
 * - Allocates a read watcher bound to reader_cb on the same fd
 * - Stores it in sd->w and starts it on MyHGM->gtid_ev_loop
 *
 * On connection failure (getsockopt() fails or SO_ERROR is non-zero):
 * - Sets MyHGM->gtid_missing_nodes and marks the server as inactive
 * - Logs a warning and closes the socket
 *
 * The GTID_Server_Data object itself is never deleted here; sd->w is left
 * as nullptr whenever no watcher is registered for it.
 *
 * @param loop The event loop (supplied through EV_P_; MyHGM->gtid_ev_loop is used instead)
 * @param w The I/O watcher for the connection; freed by this callback
 * @param revents The events that triggered this callback
 *
 * @note Holds ev_loop_mutex for the whole duration of the callback
 */
void connect_cb(EV_P_ ev_io *w, int revents) {
	pthread_mutex_lock(&ev_loop_mutex);
	if (revents & EV_WRITE) {
		// Copy out what we need: `w` is released right below.
		int fd = w->fd;
		GTID_Server_Data *sd = (GTID_Server_Data *)w->data;

		// connect() completed, this watcher is no longer needed
		ev_io_stop(MyHGM->gtid_ev_loop, w);
		free(w);
		sd->w = nullptr;

		// Based on fd status, proceed to next step -> waiting for read event on the socket
		int error = 0;
		socklen_t optlen = sizeof(error);
		int rc = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &optlen);
		if (rc == -1 || error != 0) {
			/* connection failed */
			MyHGM->gtid_missing_nodes = true;
			sd->active = false;
			proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port);
			close(fd);
		} else {
			struct ev_io *read_watcher = (struct ev_io *)malloc(sizeof(struct ev_io));
			if (read_watcher == NULL) {
				// Without a watcher the socket can never be read: handle it
				// like a failed connection instead of dereferencing NULL.
				MyHGM->gtid_missing_nodes = true;
				sd->active = false;
				proxy_error("GTID: unable to allocate read watcher for server %s:%d\n", sd->address, sd->mysql_port);
				close(fd);
			} else {
				read_watcher->data = sd;
				sd->w = read_watcher;
				ev_io_init(read_watcher, reader_cb, fd, EV_READ);
				ev_io_start(MyHGM->gtid_ev_loop, read_watcher);
			}
		}
	}
	pthread_mutex_unlock(&ev_loop_mutex);
}
struct ev_io * new_connect o r( char * address , uint16_t gtid_port , uint16_t mysql_port ) {
struct ev_io * new_connect _watche r( char * address , uint16_t gtid_port , uint16_t mysql_port ) {
int s ;
if ( ( s = socket ( AF_INET , SOCK_STREAM , 0 ) ) = = - 1 ) {
@ -149,8 +179,6 @@ struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_p
struct ev_io * c = ( struct ev_io * ) malloc ( sizeof ( struct ev_io ) ) ;
if ( c ) {
ev_io_init ( c , connect_cb , s , EV_WRITE ) ;
GTID_Server_Data * custom_data = new GTID_Server_Data ( c , address , gtid_port , mysql_port ) ;
c - > data = ( void * ) custom_data ;
return c ;
}
/* else error */
@ -158,8 +186,6 @@ struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_p
return NULL ;
}
GTID_Server_Data : : GTID_Server_Data ( struct ev_io * _w , char * _address , uint16_t _port , uint16_t _mysql_port ) {
active = true ;
w = _w ;
@ -187,27 +213,50 @@ GTID_Server_Data::~GTID_Server_Data() {
free ( data ) ;
}
/**
* @ brief Reads data from the GTID server connection socket
*
* Reads available data from the socket connection to the binlog reader .
* Handles different read conditions to provide robust connection management :
* - Successful read : Data is appended to internal buffer
* - EOF ( rc = = 0 ) : Connection was gracefully closed by peer
* - Error conditions : Distinguishes between transient ( EINTR / EAGAIN ) and fatal errors
*
* This function is critical for maintaining stable GTID synchronization and
* properly detecting connection failures for reconnection handling .
*
* @ return bool True if read was successful or should be retried , false on fatal errors
*
* @ note Expands buffer automatically when full to prevent data loss
* @ note EINTR and EAGAIN are not treated as errors for non - blocking sockets
*/
bool GTID_Server_Data : : readall ( ) {
bool ret = true ;
if ( size = = len ) {
// buffer is full, expand
resize ( len * 2 ) ;
resize ( len * 2 ) ;
}
int rc = 0 ;
rc = read ( w - > fd , data + len , size - len ) ;
rc = read ( w - > fd , data + len , size - len ) ;
if ( rc > 0 ) {
len + = rc ;
return true ;
}
int myerr = errno ;
if ( rc = = 0 ) {
proxy_info ( " Read returned EOF \n " ) ;
return false ;
}
// rc == -1
proxy_error ( " Read failed, error %d \n " , myerr ) ;
if ( myerr = = EINTR | | myerr = = EAGAIN ) {
// non-blocking fd, so this should not be considered as an error
return true ;
} else {
int myerr = errno ;
proxy_error ( " Read returned %d bytes, error %d \n " , rc , myerr ) ;
if (
( rc = = 0 ) | |
( rc = = - 1 & & myerr ! = EINTR & & myerr ! = EAGAIN )
) {
ret = false ;
}
return false ;
}
return ret ;
}
@ -239,9 +288,6 @@ void GTID_Server_Data::dump() {
return ;
}
read_all_gtids ( ) ;
//int rc = write(1,data+pos,len-pos);
fflush ( stdout ) ;
///pos += rc;
if ( pos > = len / 2 ) {
memmove ( data , data + pos , len - pos ) ;
len = len - pos ;
@ -285,13 +331,12 @@ bool GTID_Server_Data::read_next_gtid() {
bs [ l - 3 ] = ' \0 ' ;
char * saveptr1 = NULL ;
char * saveptr2 = NULL ;
//char *saveptr3=NULL;
char * token = NULL ;
char * subtoken = NULL ;
//char *subtoken2 = NULL;
char * str1 = NULL ;
char * str2 = NULL ;
//char *str3 = NULL;
bool updated = false ;
for ( str1 = bs ; ; str1 = NULL ) {
token = strtok_r ( str1 , " , " , & saveptr1 ) ;
if ( token = = NULL ) {
@ -312,34 +357,30 @@ bool GTID_Server_Data::read_next_gtid() {
p + + ;
}
}
//fprintf(stdout,"BS from %s\n", uuid_server);
} else { // we are reading the trxids
uint64_t trx_from ;
uint64_t trx_to ;
sscanf ( subtoken , " %lu-%lu " , & trx_from , & trx_to ) ;
//fprintf(stdout,"BS from %s:%lu-%lu\n", uuid_server, trx_from, trx_to);
std : : string s = uuid_server ;
gtid_executed [ s ] . emplace_back ( trx_from , trx_to ) ;
updated = addGtidInterval ( gtid_executed , uuid_server , trx_from , trx_to ) | | updated ;
}
}
}
pos + = l + 1 ;
free ( bs ) ;
//return true;
if ( updated ) {
events_read + + ;
}
} else {
strncpy ( rec_msg , data + pos , l ) ;
pos + = l + 1 ;
rec_msg [ l ] = 0 ;
//int rc = write(1,data+pos,l+1);
//fprintf(stdout,"%s\n", rec_msg);
if ( rec_msg [ 0 ] = = ' I ' ) {
//char rec_uuid[80];
uint64_t rec_trxid = 0 ;
char * a = NULL ;
int ul = 0 ;
switch ( rec_msg [ 1 ] ) {
case ' 1 ' :
//sscanf(rec_msg+3,"%s\:%lu",uuid_server,&rec_trxid);
a = strchr ( rec_msg + 3 , ' : ' ) ;
ul = a - rec_msg - 3 ;
strncpy ( uuid_server , rec_msg + 3 , ul ) ;
@ -347,21 +388,17 @@ bool GTID_Server_Data::read_next_gtid() {
rec_trxid = atoll ( a + 1 ) ;
break ;
case ' 2 ' :
//sscanf(rec_msg+3,"%lu",&rec_trxid);
rec_trxid = atoll ( rec_msg + 3 ) ;
break ;
default :
break ;
}
//fprintf(stdout,"%s:%lu\n", uuid_server, rec_trxid);
std : : string s = uuid_server ;
gtid_t new_gtid = std : : make_pair ( s , rec_trxid ) ;
addGtid ( new_gtid , gtid_executed ) ;
events_read + + ;
//return true;
}
}
//std::cout << "current pos " << gtid_executed_to_string(gtid_executed) << std::endl << std::endl;
return true ;
}
@ -439,6 +476,66 @@ void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) {
}
}
/**
* @ brief Adds or updates a GTID interval in the executed set
*
* This function intelligently merges GTID intervals to prevent events_count reset
* when a binlog reader reconnects and provides updated GTID sets . It handles
* reconnection scenarios where the server provides updated transaction ID ranges .
*
* For example , during reconnection :
* - Before disconnection : server_UUID : 1 - 10
* - After reconnection : server_UUID : 1 - 19
*
* This function will update the existing interval rather than replacing it ,
* preserving the events_count metric accuracy .
*
* @ param gtid_executed Reference to the GTID set to update
* @ param server_uuid The server UUID string
* @ param txid_start Starting transaction ID of the interval
* @ param txid_end Ending transaction ID of the interval
* @ return bool True if the GTID set was updated , false if interval already existed
*
* @ note This function is critical for maintaining accurate GTID metrics across
* binlog reader reconnections and preventing events_count resets .
*/
bool addGtidInterval ( gtid_set_t & gtid_executed , std : : string server_uuid , int64_t txid_start , int64_t txid_end ) {
bool updated = true ;
auto it = gtid_executed . find ( server_uuid ) ;
if ( it = = gtid_executed . end ( ) ) {
gtid_executed [ server_uuid ] . emplace_back ( txid_start , txid_end ) ;
return updated ;
}
bool insert = true ;
// When ProxySQL reconnects with binlog reader, it might
// receive updated txid intervals in the bootstrap message.
// For example,
// before disconnection -> server_UUID:1-10
// after reconnection -> server_UUID:1-19
auto & txid_intervals = it - > second ;
for ( auto & interval : txid_intervals ) {
if ( interval . first = = txid_start ) {
if ( interval . second = = txid_end ) {
updated = false ;
} else {
interval . second = txid_end ;
}
insert = false ;
break ;
}
}
if ( insert ) {
txid_intervals . emplace_back ( txid_start , txid_end ) ;
}
return updated ;
}
void * GTID_syncer_run ( ) {
//struct ev_loop * gtid_ev_loop;
//gtid_ev_loop = NULL;