Merge branch 'v3.0' into fix/issue-4855

Signed-off-by: René Cannaò <rene@proxysql.com>
pull/5232/head
René Cannaò 4 months ago committed by GitHub
commit d188715a7d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,5 +1,11 @@
#ifndef CLASS_GTID_Server_Data_H
#define CLASS_GTID_Server_Data_H
#include <cstddef>
#include <cstdint>
#include <string>
#include <proxysql_gtid.h>
class GTID_Server_Data {
public:
char *address;
@ -24,4 +30,7 @@ class GTID_Server_Data {
void read_all_gtids();
void dump();
};
bool addGtidInterval(gtid_set_t& gtid_executed, std::string server_uuid, int64_t txid_start, int64_t txid_end);
#endif // CLASS_GTID_Server_Data_H

@ -104,6 +104,7 @@
#include <stddef.h>
#include <string.h>
#include <sys/types.h>
#include <cstdint>
#include <algorithm>
#include <functional>
#include <iostream>

@ -2,6 +2,7 @@
#define PROXYSQL_GTID
// highly inspired by libslave
// https://github.com/vozbu/libslave/
#include <string>
#include <unordered_map>
#include <list>
#include <utility>

@ -549,6 +549,36 @@ void ProxySQL_Admin::flush_mysql_variables___database_to_runtime(SQLite3DB *db,
flush_GENERIC_variables__checksum__database_to_runtime("mysql", checksum, epoch);
pthread_mutex_unlock(&GloVars.checksum_mutex);
}
/**
* @brief Check and warn if TCP keepalive is disabled for MySQL connections.
*
* This safety check warns users when mysql-use_tcp_keepalive is set to false,
* which can cause connection instability in certain deployment scenarios.
*
* @warning Disabling TCP keepalive is unsafe when ProxySQL is deployed behind:
* - Network load balancers with idle connection timeouts
* - NAT firewalls with connection state timeout
* - Cloud environments with connection pooling
* - Any intermediate network device that drops idle connections
*
* @why_unsafe TCP keepalive sends periodic keep-alive packets on idle connections.
* When disabled:
* - Load balancers may drop connections from their connection pools
* - NAT devices may remove connection state from their tables
* - Cloud load balancers (AWS ELB, GCP Load Balancer, etc.) may terminate
* connections during idle periods
* - Results in sudden connection failures and "connection reset" errors
* - Can cause application downtime and poor user experience
*
* @recommendation Always set mysql-use_tcp_keepalive=true when deploying
* behind load balancers or in cloud environments.
*/
// Check for TCP keepalive setting and warn if disabled
int mysql_use_tcp_keepalive = GloMTH->get_variable_int((char *)"use_tcp_keepalive");
if (mysql_use_tcp_keepalive == 0) {
proxy_warning("mysql-use_tcp_keepalive is set to false. This may cause connection drops when ProxySQL is behind a network load balancer. Consider setting this to true.\n");
}
}
if (resultset) delete resultset;
}
@ -885,6 +915,40 @@ void ProxySQL_Admin::flush_pgsql_variables___database_to_runtime(SQLite3DB* db,
GloVars.checksums_values.mysql_variables.checksum, GloVars.checksums_values.mysql_variables.epoch
);
*/
/**
* @brief Check and warn if TCP keepalive is disabled for PostgreSQL connections.
*
* This safety check warns users when pgsql-use_tcp_keepalive is set to false,
* which can cause connection instability in certain deployment scenarios.
*
* @warning Disabling TCP keepalive is unsafe when ProxySQL is deployed behind:
* - Network load balancers with idle connection timeouts
* - NAT firewalls with connection state timeout
* - Cloud environments with connection pooling
* - Any intermediate network device that drops idle connections
*
* @why_unsafe TCP keepalive sends periodic keep-alive packets on idle connections.
* When disabled for PostgreSQL:
* - Load balancers may drop connections from their connection pools
* - NAT devices may remove connection state from their tables
* - Cloud load balancers (AWS ELB, GCP Load Balancer, etc.) may terminate
* connections during idle periods
* - PostgreSQL connections may appear "stale" to the database server
* - Results in sudden connection failures and "connection reset" errors
* - Can cause application downtime and poor user experience
*
* @note PostgreSQL connections are often long-lived and benefit greatly from
* TCP keepalive, especially in connection-pooled environments.
*
* @recommendation Always set pgsql-use_tcp_keepalive=true when deploying
* behind load balancers or in cloud environments.
*/
// Check for TCP keepalive setting and warn if disabled
int pgsql_use_tcp_keepalive = GloPTH->get_variable_int((char *)"use_tcp_keepalive");
if (pgsql_use_tcp_keepalive == 0) {
proxy_warning("pgsql-use_tcp_keepalive is set to false. This may cause connection drops when ProxySQL is behind a network load balancer. Consider setting this to true.\n");
}
}
if (resultset) delete resultset;
}

@ -1,3 +1,4 @@
#include "GTID_Server_Data.h"
#include "MySQL_HostGroups_Manager.h"
#include "ev.h"
@ -40,6 +41,29 @@ static void gtid_timer_cb (struct ev_loop *loop, struct ev_timer *timer, int rev
return;
}
/**
* @brief Data reception callback for established GTID server connections
*
* This callback handles reading GTID data from established connections to binlog readers.
* It processes incoming GTID information and manages connection failures gracefully.
*
* On successful read:
* - Processes the received GTID data
* - Calls dump() to parse and update GTID sets
*
* On read failure:
* - Marks the server connection as inactive
* - Sets gtid_missing_nodes flag to trigger reconnection
* - Performs proper cleanup of socket and watcher resources
* - Clears the watcher reference to maintain clean state
*
* @param loop The event loop (unused in this implementation)
* @param w The I/O watcher for data reception
* @param revents The events that triggered this callback
*
* @note This function is critical for maintaining GTID synchronization stability
* @note Proper resource cleanup prevents memory leaks and maintains system stability
*/
void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) {
pthread_mutex_lock(&ev_loop_mutex);
if (revents & EV_READ) {
@ -47,21 +71,14 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) {
bool rc = true;
rc = sd->readall();
if (rc == false) {
//delete sd;
std::string s1 = sd->address;
s1.append(":");
s1.append(std::to_string(sd->mysql_port));
MyHGM->gtid_missing_nodes = true;
proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port);
std::unordered_map <string, GTID_Server_Data *>::iterator it2;
it2 = MyHGM->gtid_map.find(s1);
if (it2 != MyHGM->gtid_map.end()) {
//MyHGM->gtid_map.erase(it2);
it2->second = NULL;
delete sd;
}
sd->active = false;
proxy_warning("GTID: failed to read from ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port);
ev_io_stop(MyHGM->gtid_ev_loop, w);
close(w->fd);
free(w);
sd->w = nullptr;
} else {
sd->dump();
}
@ -69,50 +86,63 @@ void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) {
pthread_mutex_unlock(&ev_loop_mutex);
}
/**
* @brief Connection establishment callback for GTID server connections
*
* This callback is triggered when a non-blocking connect() operation completes.
* It handles both successful connections and connection failures with proper
* resource cleanup and state management.
*
* On successful connection:
* - Stops and frees the connect watcher
* - Creates a new read watcher for data reception
* - Starts the read watcher to begin GTID data processing
*
* On connection failure:
* - Marks server as inactive
* - Logs appropriate warning messages
* - Performs proper cleanup of socket and watcher resources
*
* @param loop The event loop (unused in this implementation)
* @param w The I/O watcher for the connection
* @param revents The events that triggered this callback
*
* @note This function ensures proper resource management to prevent memory leaks
* @note Takes ev_loop_mutex to ensure thread-safe operations
*/
void connect_cb(EV_P_ ev_io *w, int revents) {
pthread_mutex_lock(&ev_loop_mutex);
struct ev_io * c = w;
if (revents & EV_WRITE) {
int optval = 0;
socklen_t optlen = sizeof(optval);
if ((getsockopt(w->fd, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1) ||
(optval != 0)) {
/* Connection failed; try the next address in the list. */
//int errnum = optval ? optval : errno;
ev_io_stop(MyHGM->gtid_ev_loop, w);
close(w->fd);
int fd = w->fd;
GTID_Server_Data *sd = (GTID_Server_Data *)w->data;
// connect() completed, this watcher is no longer needed
ev_io_stop(MyHGM->gtid_ev_loop, w);
free(w);
sd->w = nullptr;
// Based on fd status, proceed to next step -> waiting for read event on the socket
int error = 0;
socklen_t optlen = sizeof(error);
int rc = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &optlen);
if (rc == -1 || error != 0) {
/* connection failed */
MyHGM->gtid_missing_nodes = true;
GTID_Server_Data * custom_data = (GTID_Server_Data *)w->data;
GTID_Server_Data *sd = custom_data;
std::string s1 = sd->address;
s1.append(":");
s1.append(std::to_string(sd->mysql_port));
sd->active = false;
proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port);
std::unordered_map <string, GTID_Server_Data *>::iterator it2;
it2 = MyHGM->gtid_map.find(s1);
if (it2 != MyHGM->gtid_map.end()) {
//MyHGM->gtid_map.erase(it2);
it2->second = NULL;
delete sd;
}
//delete custom_data;
free(c);
close(fd);
} else {
ev_io_stop(MyHGM->gtid_ev_loop, w);
int fd=w->fd;
struct ev_io * new_w = (struct ev_io*) malloc(sizeof(struct ev_io));
new_w->data = w->data;
GTID_Server_Data * custom_data = (GTID_Server_Data *)new_w->data;
custom_data->w = new_w;
free(w);
ev_io_init(new_w, reader_cb, fd, EV_READ);
ev_io_start(MyHGM->gtid_ev_loop, new_w);
struct ev_io *read_watcher = (struct ev_io *) malloc(sizeof(struct ev_io));
read_watcher->data = sd;
sd->w = read_watcher;
ev_io_init(read_watcher, reader_cb, fd, EV_READ);
ev_io_start(MyHGM->gtid_ev_loop, read_watcher);
}
}
pthread_mutex_unlock(&ev_loop_mutex);
}
struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_port) {
struct ev_io * new_connect_watcher(char *address, uint16_t gtid_port, uint16_t mysql_port) {
int s;
if ((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
@ -149,8 +179,6 @@ struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_p
struct ev_io *c = (struct ev_io *)malloc(sizeof(struct ev_io));
if (c) {
ev_io_init(c, connect_cb, s, EV_WRITE);
GTID_Server_Data * custom_data = new GTID_Server_Data(c, address, gtid_port, mysql_port);
c->data = (void *)custom_data;
return c;
}
/* else error */
@ -158,8 +186,6 @@ struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_p
return NULL;
}
GTID_Server_Data::GTID_Server_Data(struct ev_io *_w, char *_address, uint16_t _port, uint16_t _mysql_port) {
active = true;
w = _w;
@ -187,27 +213,50 @@ GTID_Server_Data::~GTID_Server_Data() {
free(data);
}
/**
* @brief Reads data from the GTID server connection socket
*
* Reads available data from the socket connection to the binlog reader.
* Handles different read conditions to provide robust connection management:
* - Successful read: Data is appended to internal buffer
* - EOF (rc == 0): Connection was gracefully closed by peer
* - Error conditions: Distinguishes between transient (EINTR/EAGAIN) and fatal errors
*
* This function is critical for maintaining stable GTID synchronization and
* properly detecting connection failures for reconnection handling.
*
* @return bool True if read was successful or should be retried, false on fatal errors
*
* @note Expands buffer automatically when full to prevent data loss
* @note EINTR and EAGAIN are not treated as errors for non-blocking sockets
*/
bool GTID_Server_Data::readall() {
bool ret = true;
if (size == len) {
// buffer is full, expand
resize(len*2);
resize(len * 2);
}
int rc = 0;
rc = read(w->fd,data+len,size-len);
rc = read(w->fd, data+len, size-len);
if (rc > 0) {
len += rc;
return true;
}
int myerr = errno;
if (rc == 0) {
proxy_info("Read returned EOF\n");
return false;
}
// rc == -1
proxy_error("Read failed, error %d\n", myerr);
if(myerr == EINTR || myerr == EAGAIN) {
// non-blocking fd, so this should not be considered as an error
return true;
} else {
int myerr = errno;
proxy_error("Read returned %d bytes, error %d\n", rc, myerr);
if (
(rc == 0) ||
(rc==-1 && myerr != EINTR && myerr != EAGAIN)
) {
ret = false;
}
return false;
}
return ret;
}
@ -239,9 +288,6 @@ void GTID_Server_Data::dump() {
return;
}
read_all_gtids();
//int rc = write(1,data+pos,len-pos);
fflush(stdout);
///pos += rc;
if (pos >= len/2) {
memmove(data,data+pos,len-pos);
len = len-pos;
@ -285,13 +331,12 @@ bool GTID_Server_Data::read_next_gtid() {
bs[l-3] = '\0';
char *saveptr1=NULL;
char *saveptr2=NULL;
//char *saveptr3=NULL;
char *token = NULL;
char *subtoken = NULL;
//char *subtoken2 = NULL;
char *str1 = NULL;
char *str2 = NULL;
//char *str3 = NULL;
bool updated = false;
for (str1 = bs; ; str1 = NULL) {
token = strtok_r(str1, ",", &saveptr1);
if (token == NULL) {
@ -312,34 +357,30 @@ bool GTID_Server_Data::read_next_gtid() {
p++;
}
}
//fprintf(stdout,"BS from %s\n", uuid_server);
} else { // we are reading the trxids
uint64_t trx_from;
uint64_t trx_to;
sscanf(subtoken,"%lu-%lu",&trx_from,&trx_to);
//fprintf(stdout,"BS from %s:%lu-%lu\n", uuid_server, trx_from, trx_to);
std::string s = uuid_server;
gtid_executed[s].emplace_back(trx_from, trx_to);
updated = addGtidInterval(gtid_executed, uuid_server, trx_from, trx_to) || updated;
}
}
}
pos += l+1;
free(bs);
//return true;
if (updated) {
events_read++;
}
} else {
strncpy(rec_msg,data+pos,l);
pos += l+1;
rec_msg[l] = 0;
//int rc = write(1,data+pos,l+1);
//fprintf(stdout,"%s\n", rec_msg);
if (rec_msg[0]=='I') {
//char rec_uuid[80];
uint64_t rec_trxid = 0;
char *a = NULL;
int ul = 0;
switch (rec_msg[1]) {
case '1':
//sscanf(rec_msg+3,"%s\:%lu",uuid_server,&rec_trxid);
a = strchr(rec_msg+3,':');
ul = a-rec_msg-3;
strncpy(uuid_server,rec_msg+3,ul);
@ -347,21 +388,17 @@ bool GTID_Server_Data::read_next_gtid() {
rec_trxid=atoll(a+1);
break;
case '2':
//sscanf(rec_msg+3,"%lu",&rec_trxid);
rec_trxid=atoll(rec_msg+3);
break;
default:
break;
}
//fprintf(stdout,"%s:%lu\n", uuid_server, rec_trxid);
std::string s = uuid_server;
gtid_t new_gtid = std::make_pair(s,rec_trxid);
addGtid(new_gtid,gtid_executed);
events_read++;
//return true;
}
}
//std::cout << "current pos " << gtid_executed_to_string(gtid_executed) << std::endl << std::endl;
return true;
}
@ -439,6 +476,66 @@ void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) {
}
}
/**
 * @brief Merge a GTID transaction-id interval into an executed-GTID set.
 *
 * Used when a binlog reader (re)sends its bootstrap GTID sets: after a
 * reconnection the reader may report a wider range for an interval that is
 * already tracked (e.g. "uuid:1-10" before the disconnect, "uuid:1-19"
 * afterwards). Instead of blindly appending a new interval, an existing
 * interval with the same starting txid is extended in place, which keeps
 * the events_read / events_count metrics from being reset.
 *
 * @param gtid_executed Executed-GTID set to update (uuid -> interval list)
 * @param server_uuid   UUID of the server the interval belongs to
 * @param txid_start    First transaction id of the interval
 * @param txid_end      Last transaction id of the interval
 * @return true when the set changed (new interval, or an existing one was
 *         extended); false when the exact interval was already present.
 */
bool addGtidInterval(gtid_set_t& gtid_executed, std::string server_uuid, int64_t txid_start, int64_t txid_end) {
	auto entry = gtid_executed.find(server_uuid);
	if (entry == gtid_executed.end()) {
		// First interval ever seen for this server UUID: create the list.
		gtid_executed[server_uuid].emplace_back(txid_start, txid_end);
		return true;
	}
	// Look for an interval that starts at the same txid. When ProxySQL
	// reconnects to the binlog reader, the bootstrap message may carry an
	// updated (wider) end for an interval we already track.
	for (auto& interval : entry->second) {
		if (interval.first != txid_start) {
			continue;
		}
		if (interval.second == txid_end) {
			// Exact interval already tracked: nothing changed.
			return false;
		}
		// Same start, different end: update the tracked interval in place.
		interval.second = txid_end;
		return true;
	}
	// No interval with this start txid: append it as a new one.
	entry->second.emplace_back(txid_start, txid_end);
	return true;
}
void * GTID_syncer_run() {
//struct ev_loop * gtid_ev_loop;
//gtid_ev_loop = NULL;

@ -59,7 +59,7 @@ class MyHGC;
const int MYSQL_ERRORS_STATS_FIELD_NUM = 11;
struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_port);
struct ev_io * new_connect_watcher(char *address, uint16_t gtid_port, uint16_t mysql_port);
void * GTID_syncer_run();
static int wait_for_mysql(MYSQL *mysql, int status) {
@ -1680,80 +1680,101 @@ bool MySQL_HostGroups_Manager::gtid_exists(MySrvC *mysrvc, char * gtid_uuid, uin
return ret;
}
/**
* @brief Generates and manages GTID connection tables for all MySQL servers
*
* This function synchronizes the GTID server connections with the current MySQL server
* configuration. It handles server additions, removals, and reconnections with improved
* lifecycle management for stable operation.
*
* The function operates in several phases:
* 1. Mark all existing GTID connections as potentially stale
* 2. Iterate through configured MySQL servers to validate existing connections
* 3. Establish new connections for servers that don't have active GTID connections
* 4. Clean up stale connections for servers that are no longer configured
*
* Key improvements in this implementation:
* - Uses stale_server tracking for efficient cleanup
* - Maintains proper lock ordering (gtid_rwlock wrlock)
* - Reuses existing GTID_Server_Data objects when possible
* - Proper cleanup of ev_io watchers and socket resources
*
* @note This function must be called with appropriate locking to prevent race conditions
* @note Servers with OFFLINE_HARD status are skipped for connection establishment
*
* @thread_safety Thread-safe when called with proper external synchronization
*/
void MySQL_HostGroups_Manager::generate_mysql_gtid_executed_tables() {
// NOTE: We are required to lock while iterating over 'MyHostGroups'. Otherwise race conditions could take place,
// e.g. servers could be purged by 'purge_mysql_servers_table' and invalid memory be accessed.
wrlock();
pthread_rwlock_wrlock(&gtid_rwlock);
// first, set them all as active = false
// first, add them all as stale entries
std::unordered_set<string> stale_server;
std::unordered_map<string, GTID_Server_Data *>::iterator it = gtid_map.begin();
while(it != gtid_map.end()) {
GTID_Server_Data * gtid_si = it->second;
if (gtid_si) {
gtid_si->active = false;
}
stale_server.emplace(it->first);
it++;
}
// NOTE: We are required to lock while iterating over 'MyHostGroups'. Otherwise race conditions could take place,
// e.g. servers could be purged by 'purge_mysql_servers_table' and invalid memory be accessed.
wrlock();
for (unsigned int i=0; i<MyHostGroups->len; i++) {
MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i);
MySrvC *mysrvc=NULL;
for (unsigned int j=0; j<myhgc->mysrvs->servers->len; j++) {
mysrvc=myhgc->mysrvs->idx(j);
if (mysrvc->gtid_port) {
std::string s1 = mysrvc->address;
s1.append(":");
s1.append(std::to_string(mysrvc->port));
std::unordered_map <string, GTID_Server_Data *>::iterator it2;
it2 = gtid_map.find(s1);
GTID_Server_Data *gtid_is=NULL;
if (it2!=gtid_map.end()) {
gtid_is=it2->second;
if (gtid_is == NULL) {
gtid_map.erase(it2);
}
std::string srv = mysrvc->address;
srv.append(":");
srv.append(std::to_string(mysrvc->port));
GTID_Server_Data *gtid_sd = nullptr;
it = gtid_map.find(srv);
if (it != gtid_map.end()) {
gtid_sd = it->second;
stale_server.erase(srv);
}
if (gtid_sd && gtid_sd->active) {
continue;
}
if (gtid_is) {
gtid_is->active = true;
} else if (mysrvc->get_status() != MYSQL_SERVER_STATUS_OFFLINE_HARD) {
// we didn't find it. Create it
/*
struct ev_io *watcher = (struct ev_io *)malloc(sizeof(struct ev_io));
gtid_is = new GTID_Server_Data(watcher, mysrvc->address, mysrvc->port, mysrvc->gtid_port);
gtid_map.emplace(s1,gtid_is);
*/
struct ev_io * c = NULL;
c = new_connector(mysrvc->address, mysrvc->gtid_port, mysrvc->port);
if (c) {
gtid_is = (GTID_Server_Data *)c->data;
gtid_map.emplace(s1,gtid_is);
//pthread_mutex_lock(&ev_loop_mutex);
ev_io_start(MyHGM->gtid_ev_loop,c);
//pthread_mutex_unlock(&ev_loop_mutex);
if (mysrvc->get_status() != MYSQL_SERVER_STATUS_OFFLINE_HARD) {
// a new server with gtid port
// OR an existing server, but we lost connection with binlog_reader
struct ev_io *cw = new_connect_watcher(mysrvc->address, mysrvc->gtid_port, mysrvc->port);
if (cw) {
if (!gtid_sd) {
gtid_sd = new GTID_Server_Data(cw, mysrvc->address, mysrvc->gtid_port, mysrvc->port);
cw->data = (void *)gtid_sd;
gtid_map.emplace(srv, gtid_sd);
} else {
gtid_sd->w = cw;
gtid_sd->active = true;
cw->data = (void *)gtid_sd;
}
ev_io_start(MyHGM->gtid_ev_loop, cw);
}
}
}
}
}
wrunlock();
std::vector<string> to_remove;
it = gtid_map.begin();
while(it != gtid_map.end()) {
GTID_Server_Data * gtid_si = it->second;
if (gtid_si && gtid_si->active == false) {
to_remove.push_back(it->first);
for (auto &srv : stale_server) {
it = gtid_map.find(srv);
GTID_Server_Data *gtid_sd = it->second;
if (gtid_sd->w) {
ev_io_stop(MyHGM->gtid_ev_loop, gtid_sd->w);
close(gtid_sd->w->fd);
free(gtid_sd->w);
}
it++;
}
for (std::vector<string>::iterator it3=to_remove.begin(); it3!=to_remove.end(); ++it3) {
it = gtid_map.find(*it3);
GTID_Server_Data * gtid_si = it->second;
ev_io_stop(MyHGM->gtid_ev_loop, gtid_si->w);
close(gtid_si->w->fd);
free(gtid_si->w);
gtid_map.erase(*it3);
gtid_map.erase(srv);
delete gtid_sd;
}
pthread_rwlock_unlock(&gtid_rwlock);
wrunlock();
}
/**
@ -5807,48 +5828,39 @@ void MySQL_HostGroups_Manager::p_update_mysql_gtid_executed() {
std::unordered_map<string, GTID_Server_Data*>::iterator it = gtid_map.begin();
while(it != gtid_map.end()) {
GTID_Server_Data* gtid_si = it->second;
std::string address {};
std::string port {};
std::string endpoint_id {};
if (gtid_si) {
address = std::string(gtid_si->address);
port = std::to_string(gtid_si->mysql_port);
} else {
std::string s = it->first;
std::size_t found = s.find_last_of(":");
address = s.substr(0, found);
port = s.substr(found + 1);
GTID_Server_Data* gtid_sd = it->second;
if (!gtid_sd) {
// invalid state
continue;
}
endpoint_id = address + ":" + port;
const auto& gitd_id_counter = this->status.p_gtid_executed_map.find(endpoint_id);
prometheus::Counter* gtid_counter = nullptr;
std::string endpoint = it->first;
std::string address = std::string(gtid_sd->address);
std::string port = std::to_string(gtid_sd->mysql_port);
if (gitd_id_counter == this->status.p_gtid_executed_map.end()) {
auto& gitd_counter =
prometheus::Counter* gtid_counter = nullptr;
const auto& pc_itr = this->status.p_gtid_executed_map.find(endpoint);
if (pc_itr == this->status.p_gtid_executed_map.end()) {
auto& gtid_counter_group =
this->status.p_dyn_counter_array[p_hg_dyn_counter::gtid_executed];
gtid_counter = std::addressof(gitd_counter->Add({
gtid_counter = std::addressof(gtid_counter_group->Add({
{ "hostname", address },
{ "port", port },
}));
this->status.p_gtid_executed_map.insert(
{
endpoint_id,
endpoint,
gtid_counter
}
);
} else {
gtid_counter = gitd_id_counter->second;
gtid_counter = pc_itr->second;
}
if (gtid_si) {
const auto& cur_executed_gtid = gtid_counter->Value();
gtid_counter->Increment(gtid_si->events_read - cur_executed_gtid);
}
const auto& cur_executed_gtid = gtid_counter->Value();
gtid_counter->Increment(gtid_sd->events_read - cur_executed_gtid);
it++;
}

@ -32,6 +32,7 @@
"mysql-reg_test_4723_query_cache_stores_empty_result-t" : [ "default-g1","mysql-auto_increment_delay_multiplex=0-g1","mysql-multiplexing=false-g1","mysql-query_digests=0-g1","mysql-query_digests_keep_comment=1-g1" ],
"mysql-reg_test_4867_query_rules-t" : [ "default-g1","mysql-auto_increment_delay_multiplex=0-g1","mysql-multiplexing=false-g1","mysql-query_digests=0-g1","mysql-query_digests_keep_comment=1-g1" ],
"reg_test_4855_affected_rows_ddl-t" : [ "default-g1","mysql-auto_increment_delay_multiplex=0-g1","mysql-multiplexing=false-g1","mysql-query_digests=0-g1","mysql-query_digests_keep_comment=1-g1" ],
"reg_test_5212_tcp_keepalive_warnings-t" : [ "default-g1" ],
"mysql-set_transaction-t" : [ "default-g1","mysql-auto_increment_delay_multiplex=0-g1","mysql-multiplexing=false-g1","mysql-query_digests=0-g1","mysql-query_digests_keep_comment=1-g1" ],
"mysql-sql_log_bin-error-t" : [ "default-g1","mysql-auto_increment_delay_multiplex=0-g1","mysql-multiplexing=false-g1","mysql-query_digests=0-g1","mysql-query_digests_keep_comment=1-g1" ],
"mysql_stmt_send_long_data_large-t" : [ "default-g1","mysql-auto_increment_delay_multiplex=0-g1","mysql-multiplexing=false-g1","mysql-query_digests=0-g1","mysql-query_digests_keep_comment=1-g1" ],

@ -1,6 +1,7 @@
#include <algorithm>
#include <chrono>
#include <cstring>
#include <deque>
#include <fcntl.h>
#include <iostream>
#include <numeric>
@ -13,6 +14,7 @@
#include "json.hpp"
#include "re2/re2.h"
#include <regex>
#include "proxysql_utils.h"
@ -1627,6 +1629,84 @@ pair<size_t,vector<line_match_t>> get_matching_lines(
return { insp_lines, found_matches };
}
std::pair<size_t,std::vector<line_match_t>> get_matching_lines_from_filename(
const std::string& filename, const std::string& s_regex, bool get_matches, size_t max_lines
) {
vector<line_match_t> found_matches {};
// Open file for reading
std::ifstream file(filename);
if (!file.is_open()) {
diag("get_matching_lines_from_filename ERROR: Cannot open file '%s'", filename.c_str());
return { 0, found_matches };
}
// Read file line by line, keeping only the last max_lines in a queue
std::deque<string> recent_lines {};
size_t total_lines_read = 0;
string next_line;
while (getline(file, next_line)) {
total_lines_read++;
// Add to queue and maintain size
recent_lines.push_back(next_line);
if (recent_lines.size() > max_lines) {
recent_lines.pop_front();
}
}
// Create regex object once before the loop
std::regex regex;
try {
regex = std::regex(s_regex);
} catch (const std::regex_error& e) {
diag("get_matching_lines_from_filename ERROR: Invalid regex '%s': %s", s_regex.c_str(), e.what());
return { 0, found_matches };
}
// Process the recent lines from the queue
for (const string& line : recent_lines) {
std::smatch match;
if (get_matches) {
if (std::regex_search(line, match, regex)) {
found_matches.push_back({ static_cast<fstream::pos_type>(0), line, match.str() });
}
} else {
if (std::regex_search(line, regex)) {
found_matches.push_back({ static_cast<fstream::pos_type>(0), line, "" });
}
}
}
// Debug output
diag("get_matching_lines_from_filename DEBUG: filename='%s', total_lines_read=%zu, max_lines=%zu, lines_examined=%zu, matches_found=%zu",
filename.c_str(), total_lines_read, max_lines, recent_lines.size(), found_matches.size());
#if 0
// Print the last lines being examined for debugging
diag("=== DEBUG: Last %zu lines examined from '%s' ===", recent_lines.size(), filename.c_str());
for (size_t i = 0; i < recent_lines.size(); i++) {
diag("Line %zu: %s", i+1, recent_lines[i].c_str());
}
diag("=== END DEBUG LINES ===");
// Print all matching lines for debugging
for (size_t i = 0; i < found_matches.size(); i++) {
const string& match_line = std::get<LINE>(found_matches[i]);
diag("Match %zu: %s", i+1, match_line.c_str());
}
#endif // 0
// Close file
file.close();
// Return actual number of matches found, not lines examined
return { found_matches.size(), found_matches };
}
const uint32_t USLEEP_SQLITE_LOCKED = 100;
int open_sqlite3_db(const string& f_path, sqlite3** db, int flags) {

@ -766,6 +766,57 @@ std::pair<size_t,std::vector<line_match_t>> get_matching_lines(
std::fstream& f_stream, const std::string& regex, bool get_matches=false
);
/**
* @brief Scan last N lines from a file and find lines matching a regex pattern.
*
* This function provides memory-efficient scanning of log files by processing only
* the last N lines instead of loading the entire file into memory. It uses a queue-based
* approach to maintain the most recent lines and applies regex matching to identify
* lines containing specific patterns.
*
* This is particularly useful for TAP tests that need to verify log messages were
* written during test execution, allowing efficient examination of recent log entries
* without loading entire log files that may be very large.
*
* @param filename Path to the file to scan
* @param regex Regular expression pattern to match against line content
* @param get_matches If true, capture and return the matched substrings; if false, only track matching lines
* @param max_lines Maximum number of lines from end of file to examine (controls memory usage and focus)
*
* @return std::pair<size_t, std::vector<line_match_t>> where:
* - first: Number of matches found
* - second: Vector of line_match_t tuples containing match information:
* * POS: File position (placeholder 0 for this implementation)
* * LINE: Complete line content that matched the pattern
* * MATCH: Matched substring if get_matches=true, empty string otherwise
*
* @note This function avoids stream sharing issues by opening its own file handle,
* making it safe for multiple concurrent calls within the same test.
*
* @warning This function reads the entire file to extract the last max_lines, so
* very large files will still incur I/O overhead, but memory usage is bounded.
*
* @example
* // Find TCP keepalive warnings in last 10 lines of ProxySQL log
* auto [match_count, matches] = get_matching_lines_from_filename(
* "/var/log/proxysql.log",
* ".*WARNING.*tcp_keepalive.*",
* true,
* 10
* );
* if (match_count > 0) {
* // Found TCP keepalive warnings
* for (const auto& match : matches) {
* const string& line = std::get<LINE>(match);
* printf("Warning: %s\n", line.c_str());
* }
* }
*/
std::pair<size_t,std::vector<line_match_t>> get_matching_lines_from_filename(
const std::string& filename, const std::string& regex, bool get_matches, size_t max_lines
);
/**
* @brief Row entries from 'debug_log' table, from debug database.
*/

@ -0,0 +1,110 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <iostream>
#include <fstream>
#include "mysql.h"
#include "tap.h"
#include "command_line.h"
#include "utils.h"
using namespace std;
int main(int argc, char** argv) {
CommandLine cl;
// Plan for 6 tests
plan(6);
// Get connections
MYSQL* admin = mysql_init(NULL);
if (!admin) {
diag("Failed to initialize admin connection");
return exit_status();
}
if (!mysql_real_connect(admin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) {
diag("Failed to connect to ProxySQL admin: %s", mysql_error(admin));
mysql_close(admin);
return exit_status();
}
// Get the log file path
const string log_path { get_env("REGULAR_INFRA_DATADIR") + "/proxysql.log" };
// Test 1: MySQL TCP keepalive warning
diag("Testing MySQL TCP keepalive warning when set to false");
{
// Set MySQL TCP keepalive to false
int query_err = mysql_query(admin, "SET mysql-use_tcp_keepalive='false'");
ok(query_err == 0, "SET mysql-use_tcp_keepalive='false' should succeed");
if (query_err != 0) {
diag("Error setting mysql-use_tcp_keepalive: %s", mysql_error(admin));
mysql_close(admin);
return exit_status();
}
// Load MySQL variables to runtime to trigger warning
query_err = mysql_query(admin, "LOAD MYSQL VARIABLES TO RUNTIME");
ok(query_err == 0, "LOAD MYSQL VARIABLES TO RUNTIME should succeed");
if (query_err != 0) {
diag("Error loading MySQL variables: %s", mysql_error(admin));
mysql_close(admin);
return exit_status();
}
// Wait a bit for the warning to be written to log
usleep(200000); // 200ms
// Check for the warning in the log - scan only last 10 lines using filename-based function
const string warning_regex { ".*WARNING.*mysql-use_tcp_keepalive is set to false.*" };
const auto& [match_count, warning_lines] = get_matching_lines_from_filename(log_path, warning_regex, true, 10);
// Scanning only last 10 lines ensures we're looking at recent log entries
ok(match_count > 0, "MySQL TCP keepalive warning should appear in log when set to false");
if (match_count == 0) {
diag("Expected MySQL TCP keepalive warning not found in last 10 lines of log");
}
}
// Test 2: PostgreSQL TCP keepalive warning
diag("Testing PostgreSQL TCP keepalive warning when set to false");
{
// Set PostgreSQL TCP keepalive to false
int query_err = mysql_query(admin, "SET pgsql-use_tcp_keepalive='false'");
ok(query_err == 0, "SET pgsql-use_tcp_keepalive='false' should succeed");
if (query_err != 0) {
diag("Error setting pgsql-use_tcp_keepalive: %s", mysql_error(admin));
mysql_close(admin);
return exit_status();
}
// Load PgSQL variables to runtime to trigger warning
query_err = mysql_query(admin, "LOAD PGSQL VARIABLES TO RUNTIME");
ok(query_err == 0, "LOAD PGSQL VARIABLES TO RUNTIME should succeed");
if (query_err != 0) {
diag("Error loading PgSQL variables: %s", mysql_error(admin));
mysql_close(admin);
return exit_status();
}
// Wait a bit for the warning to be written to log
usleep(500000); // 500ms
// Check for the warning in the log - scan only last 10 lines using filename-based function
const string warning_regex { ".*WARNING.*pgsql-use_tcp_keepalive is set to false.*" };
const auto& [match_count, warning_lines] = get_matching_lines_from_filename(log_path, warning_regex, true, 10);
// Scanning only last 10 lines ensures we're looking at recent log entries
ok(match_count > 0, "PostgreSQL TCP keepalive warning should appear in log when set to false");
if (match_count == 0) {
diag("Expected PostgreSQL TCP keepalive warning not found in last 10 lines of log");
}
}
mysql_close(admin);
return exit_status();
}

@ -90,6 +90,7 @@ int check_prepare_statement_mem_usage(MYSQL* proxysql_admin, MYSQL* proxysql, co
old_prep_stmt_backend_mem, new_prep_stmt_backend_mem);
mysql_stmt_close(stmt);
usleep(10000);
return EXIT_SUCCESS;
}

Loading…
Cancel
Save