@ -41,6 +41,29 @@ static void gtid_timer_cb (struct ev_loop *loop, struct ev_timer *timer, int rev
return ;
}
/**
* @ brief Data reception callback for established GTID server connections
*
* This callback handles reading GTID data from established connections to binlog readers .
* It processes incoming GTID information and manages connection failures gracefully .
*
* On successful read :
* - Processes the received GTID data
* - Calls dump ( ) to parse and update GTID sets
*
* On read failure :
* - Marks the server connection as inactive
* - Sets gtid_missing_nodes flag to trigger reconnection
* - Performs proper cleanup of socket and watcher resources
* - Clears the watcher reference to maintain clean state
*
* @ param loop The event loop ( unused in this implementation )
* @ param w The I / O watcher for data reception
* @ param revents The events that triggered this callback
*
* @ note This function is critical for maintaining GTID synchronization stability
* @ note Proper resource cleanup prevents memory leaks and maintains system stability
*/
void reader_cb ( struct ev_loop * loop , struct ev_io * w , int revents ) {
pthread_mutex_lock ( & ev_loop_mutex ) ;
if ( revents & EV_READ ) {
@ -63,6 +86,30 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) {
pthread_mutex_unlock ( & ev_loop_mutex ) ;
}
/**
* @ brief Connection establishment callback for GTID server connections
*
* This callback is triggered when a non - blocking connect ( ) operation completes .
* It handles both successful connections and connection failures with proper
* resource cleanup and state management .
*
* On successful connection :
* - Stops and frees the connect watcher
* - Creates a new read watcher for data reception
* - Starts the read watcher to begin GTID data processing
*
* On connection failure :
* - Marks server as inactive
* - Logs appropriate warning messages
* - Performs proper cleanup of socket and watcher resources
*
* @ param loop The event loop ( unused in this implementation )
* @ param w The I / O watcher for the connection
* @ param revents The events that triggered this callback
*
* @ note This function ensures proper resource management to prevent memory leaks
* @ note Takes ev_loop_mutex to ensure thread - safe operations
*/
void connect_cb ( EV_P_ ev_io * w , int revents ) {
pthread_mutex_lock ( & ev_loop_mutex ) ;
if ( revents & EV_WRITE ) {
@ -166,6 +213,23 @@ GTID_Server_Data::~GTID_Server_Data() {
free ( data ) ;
}
/**
* @ brief Reads data from the GTID server connection socket
*
* Reads available data from the socket connection to the binlog reader .
* Handles different read conditions to provide robust connection management :
* - Successful read : Data is appended to internal buffer
* - EOF ( rc = = 0 ) : Connection was gracefully closed by peer
* - Error conditions : Distinguishes between transient ( EINTR / EAGAIN ) and fatal errors
*
* This function is critical for maintaining stable GTID synchronization and
* properly detecting connection failures for reconnection handling .
*
* @ return bool True if read was successful or should be retried , false on fatal errors
*
* @ note Expands buffer automatically when full to prevent data loss
* @ note EINTR and EAGAIN are not treated as errors for non - blocking sockets
*/
bool GTID_Server_Data : : readall ( ) {
if ( size = = len ) {
// buffer is full, expand
@ -412,6 +476,29 @@ void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) {
}
}
/**
* @ brief Adds or updates a GTID interval in the executed set
*
* This function intelligently merges GTID intervals to prevent events_count reset
* when a binlog reader reconnects and provides updated GTID sets . It handles
* reconnection scenarios where the server provides updated transaction ID ranges .
*
* For example , during reconnection :
* - Before disconnection : server_UUID : 1 - 10
* - After reconnection : server_UUID : 1 - 19
*
* This function will update the existing interval rather than replacing it ,
* preserving the events_count metric accuracy .
*
* @ param gtid_executed Reference to the GTID set to update
* @ param server_uuid The server UUID string
* @ param txid_start Starting transaction ID of the interval
* @ param txid_end Ending transaction ID of the interval
* @ return bool True if the GTID set was updated , false if interval already existed
*
* @ note This function is critical for maintaining accurate GTID metrics across
* binlog reader reconnections and preventing events_count resets .
*/
bool addGtidInterval ( gtid_set_t & gtid_executed , std : : string server_uuid , int64_t txid_start , int64_t txid_end ) {
bool updated = true ;