Add TAP test for fast forward grace close feature

- Rename and modify test to use MySQL C API mysql_binlog_* functions
- Implement throttled binlog reading with 5 iterations (no limit, 2s, 5s, 20s, 60s targets)
- Add diagnostics for debugging binlog fetch issues
- Set RPL options for file, position, server_id, and non-blocking flag
- Update Makefile to compile with MySQL client library
pull/5203/head
Rene Cannao 6 months ago
parent 44aa606caa
commit ae93966603

@ -3789,11 +3789,16 @@ int MySQL_Session::GPFC_Statuses2(bool& wrong_pass, PtrSize_t& pkt) {
if (mybe->server_myds->status == MYSQL_SERVER_STATUS_OFFLINE_HARD || mybe->server_myds->fd == -1) {
if (!backend_closed_in_fast_forward) {
backend_closed_in_fast_forward = true;
cerr << __FILE__ << ":" << __LINE__ << " grace_start_time from " << fast_forward_grace_start_time << " to " << thread->curtime << endl;
fast_forward_grace_start_time = thread->curtime;
}
}
if (backend_closed_in_fast_forward) {
if (client_myds->PSarrayOUT->len == 0 && (client_myds->queueOUT.head - client_myds->queueOUT.tail) == 0) {
if (
( mybe->server_myds == nullptr || ( mybe->server_myds && mybe->server_myds->PSarrayIN->len == 0 ) )
&&
(client_myds->PSarrayOUT->len == 0 && (client_myds->queueOUT.head - client_myds->queueOUT.tail) == 0)
) {
// buffers empty, close
handler_ret = -1;
} else if (thread->curtime - fast_forward_grace_start_time > (unsigned long long)mysql_thread___fast_forward_grace_close_ms * 1000) {

@ -112,8 +112,6 @@ extern MySQL_Threads_Handler *GloMTH;
extern MySQL_Monitor *GloMyMon;
extern MySQL_Logger *GloMyLogger;
//__thread int mysql_thread___fast_forward_grace_close_ms;
typedef struct mythr_st_vars {
enum MySQL_Thread_status_variable v_idx;
p_th_counter::metric m_idx;
@ -2290,7 +2288,7 @@ char ** MySQL_Threads_Handler::get_variables_list() {
VariablesPointers_int["handle_unknown_charset"] = make_tuple(&variables.handle_unknown_charset, 0, HANDLE_UNKNOWN_CHARSET__MAX_HANDLE_VALUE, false);
VariablesPointers_int["ping_interval_server_msec"] = make_tuple(&variables.ping_interval_server_msec, 1000, 7*24*3600*1000, false);
VariablesPointers_int["ping_timeout_server"] = make_tuple(&variables.ping_timeout_server, 10, 600*1000, false);
VariablesPointers_int["fast_forward_grace_close_ms"] = make_tuple(&variables.fast_forward_grace_close_ms, 0, 3600*1000, false);
VariablesPointers_int["fast_forward_grace_close_ms"] = make_tuple(&variables.fast_forward_grace_close_ms, 0, 3600*1000, false);
VariablesPointers_int["client_host_cache_size"] = make_tuple(&variables.client_host_cache_size, 0, 1024*1024, false);
VariablesPointers_int["client_host_error_counts"] = make_tuple(&variables.client_host_error_counts, 0, 1024*1024, false);
VariablesPointers_int["handle_warnings"] = make_tuple(&variables.handle_warnings, 0, 1, false);
@ -3756,7 +3754,25 @@ bool MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned
// if this is a backend without fast_forward, do not set unhealthy: it will be handled by client library
if (myds->sess->session_fast_forward) { // if fast forward
if (myds->myds_type==MYDS_BACKEND) { // and backend
myds->sess->set_unhealthy(); // set unhealthy
// myds->sess->set_unhealthy(); // set unhealthy
// Fast Forward Grace Close Logic:
// If the backend closed during fast forward mode, we defer session closure to allow
// pending client output buffers to drain, preventing data loss.
// Detect if backend closed during fast forward
if (myds->sess->backend_closed_in_fast_forward == false) {
myds->sess->backend_closed_in_fast_forward = true;
//cerr << __FILE__ << ":" << __LINE__ << " grace_start_time from " << myds->sess->fast_forward_grace_start_time << " to " << curtime << endl;
myds->sess->fast_forward_grace_start_time = curtime;
}
if (myds->sess->backend_closed_in_fast_forward) {
if (myds->PSarrayIN->len == 0 && myds->sess->client_myds->PSarrayOUT->len == 0 && (myds->sess->client_myds->queueOUT.head - myds->sess->client_myds->queueOUT.tail) == 0) {
// buffers empty, close
myds->sess->set_unhealthy(); // set unhealthy
} else if (curtime - myds->sess->fast_forward_grace_start_time > (unsigned long long)mysql_thread___fast_forward_grace_close_ms * 1000) {
// timeout, close
myds->sess->set_unhealthy(); // set unhealthy
}
}
}
}
}
@ -3933,9 +3949,17 @@ void MySQL_Thread::ProcessAllSessions_Healthy0(MySQL_Session *sess, unsigned int
sess->client_myds->addr.port
);
} else {
string extra_info = "";
if (sess->backend_closed_in_fast_forward == true) {
unsigned long long lapse = curtime - sess->fast_forward_grace_start_time;
extra_info = "Yes , " + to_string(lapse/1000) + " ms ago";
} else {
extra_info = "No";
}
proxy_warning(
"Closing 'fast_forward' client connection %s:%d\n", sess->client_myds->addr.addr,
sess->client_myds->addr.port
"Closing 'fast_forward' client connection %s:%d . Backend already close: %s\n",
sess->client_myds->addr.addr, sess->client_myds->addr.port,
extra_info.c_str()
);
}
}
@ -4145,6 +4169,7 @@ void MySQL_Thread::refresh_variables() {
REFRESH_VARIABLE_INT(connect_timeout_server);
REFRESH_VARIABLE_INT(connect_timeout_server_max);
REFRESH_VARIABLE_INT(free_connections_pct);
REFRESH_VARIABLE_INT(fast_forward_grace_close_ms);
#ifdef IDLE_THREADS
REFRESH_VARIABLE_INT(session_idle_ms);
#endif // IDLE_THREADS

@ -577,11 +577,14 @@ int MySQL_Data_Stream::read_from_net() {
// Shutdown if we either received the EOF, or operation failed with non-retryable error.
if (ssl_recv_bytes==0 || (ssl_recv_bytes==-1 && errno != EINTR && errno != EAGAIN)) {
if (myds_type == MYDS_BACKEND && sess && sess->session_fast_forward && ssl_recv_bytes==0) {
if (sess->client_myds->PSarrayOUT->len > 0 || queue_data(sess->client_myds->queueOUT) > 0) {
sess->backend_closed_in_fast_forward = true;
sess->fast_forward_grace_start_time = sess->thread->curtime;
sess->client_myds->defer_close_due_to_fast_forward = true;
return 0;
if (PSarrayIN->len > 0 || sess->client_myds->PSarrayOUT->len > 0 || queue_data(sess->client_myds->queueOUT) > 0) {
if (sess->backend_closed_in_fast_forward == false) {
sess->backend_closed_in_fast_forward = true;
//cerr << __FILE__ << ":" << __LINE__ << " grace_start_time from " << sess->fast_forward_grace_start_time << " to " << sess->thread->curtime << endl;
sess->fast_forward_grace_start_time = sess->thread->curtime;
sess->client_myds->defer_close_due_to_fast_forward = true;
}
//return 0;
}
}
proxy_debug(PROXY_DEBUG_NET, 5, "Received EOF, shutting down soft socket -- Session=%p, Datastream=%p", sess, this);
@ -599,11 +602,14 @@ int MySQL_Data_Stream::read_from_net() {
int myds_errno=errno;
if (r==0 || (r==-1 && myds_errno != EINTR && myds_errno != EAGAIN)) {
if (myds_type == MYDS_BACKEND && sess && sess->session_fast_forward && r==0) {
if (sess->client_myds->PSarrayOUT->len > 0 || queue_data(sess->client_myds->queueOUT) > 0) {
sess->backend_closed_in_fast_forward = true;
sess->fast_forward_grace_start_time = sess->thread->curtime;
sess->client_myds->defer_close_due_to_fast_forward = true;
return 0;
if (PSarrayIN->len > 0 || sess->client_myds->PSarrayOUT->len > 0 || queue_data(sess->client_myds->queueOUT) > 0) {
if (sess->backend_closed_in_fast_forward == false) {
sess->backend_closed_in_fast_forward = true;
//cerr << __FILE__ << ":" << __LINE__ << " grace_start_time from " << sess->fast_forward_grace_start_time << " to " << sess->thread->curtime << endl;
sess->fast_forward_grace_start_time = sess->thread->curtime;
sess->client_myds->defer_close_due_to_fast_forward = true;
}
//return 0;
}
}
shut_soft();
@ -639,11 +645,14 @@ int MySQL_Data_Stream::read_from_net() {
// this is a final check
// Only if the amount of data read is 0 or less, then we check POLLHUP
if (myds_type == MYDS_BACKEND && sess && sess->session_fast_forward) {
if (sess->client_myds->PSarrayOUT->len > 0 || queue_data(sess->client_myds->queueOUT) > 0) {
sess->backend_closed_in_fast_forward = true;
sess->fast_forward_grace_start_time = sess->thread->curtime;
sess->client_myds->defer_close_due_to_fast_forward = true;
return 0;
if (PSarrayIN->len > 0 || sess->client_myds->PSarrayOUT->len > 0 || queue_data(sess->client_myds->queueOUT) > 0) {
if (sess->backend_closed_in_fast_forward == false) {
sess->backend_closed_in_fast_forward = true;
//cerr << __FILE__ << ":" << __LINE__ << " grace_start_time from " << sess->fast_forward_grace_start_time << " to " << sess->thread->curtime << endl;
sess->fast_forward_grace_start_time = sess->thread->curtime;
sess->client_myds->defer_close_due_to_fast_forward = true;
}
//return 0;
}
}
proxy_debug(PROXY_DEBUG_NET, 5, "Session=%p, Datastream=%p -- shutdown soft. revents=%d , bytes read = %d", sess, this, revents, r);
@ -789,6 +798,15 @@ void MySQL_Data_Stream::set_pollout() {
_pollfd->events = myconn->wait_events;
} else {
_pollfd->events = POLLIN;
if (myds_type == MYDS_BACKEND && sess && sess->session_fast_forward && sess->backend_closed_in_fast_forward == true) {
// this is a fast forward session where the backend connection was already closed
// if we set POLLIN : the thread will spin on poll() until the socket is closed
// if we do not set POLLIN : we won't be able to timeout
if (sess->thread->curtime - sess->fast_forward_grace_start_time < (unsigned long long)mysql_thread___fast_forward_grace_close_ms * 1000) {
// for the reason listed above, we remove POLLIN unless the timeout has reached
_pollfd->events = 0;
}
}
//if (PSarrayOUT->len || available_data_out() || queueOUT.partial || (encrypted && !SSL_is_init_finished(ssl))) {
if (PSarrayOUT->len || available_data_out() || queueOUT.partial) {
_pollfd->events |= POLLOUT;

@ -121,6 +121,7 @@ tests: tests-cpp \
reg_test_stmt_resultset_err_no_rows_libmysql-t \
prepare_statement_err3024_libmysql-t \
prepare_statement_err3024_async-t \
fast_forward_grace_close_libmysql-t \
reg_test_mariadb_stmt_store_result_libmysql-t \
reg_test_mariadb_stmt_store_result_async-t
tests:
@ -229,6 +230,9 @@ mysql_reconnect_libmariadb-t: mysql_reconnect.cpp $(TAP_LDIR)/libtap.so
mysql_reconnect_libmysql-t: mysql_reconnect.cpp $(TAP_LDIR)/libtap_mysql8.a
$(CXX) -DLIBMYSQL_HELPER8 -DDISABLE_WARNING_COUNT_LOGGING $< -I$(TEST_MYSQL8_IDIR) -I$(TEST_MYSQL8_EDIR) -L$(TEST_MYSQL8_LDIR) -lmysqlclient -ltap_mysql8 -lresolv $(CUSTOMARGS) -o $@
fast_forward_grace_close_libmysql-t: fast_forward_grace_close.cpp $(TAP_LDIR)/libtap_mysql8.a
$(CXX) -DLIBMYSQL_HELPER8 -DDISABLE_WARNING_COUNT_LOGGING $< -I$(TEST_MYSQL8_IDIR) -I$(TEST_MYSQL8_EDIR) -L$(TEST_MYSQL8_LDIR) -lmysqlclient -ltap_mysql8 -lresolv $(CUSTOMARGS) -o $@
test_match_eof_conn_cap_libmysql-t: test_match_eof_conn_cap.cpp $(TAP_LDIR)/libtap_mysql8.a
$(CXX) -DLIBMYSQL_HELPER8 -DDISABLE_WARNING_COUNT_LOGGING $< -I$(TEST_MYSQL8_IDIR) -I$(TEST_MYSQL8_EDIR) -L$(TEST_MYSQL8_LDIR) -lmysqlclient -ltap_mysql8 -lresolv $(CUSTOMARGS) -o $@

@ -0,0 +1,190 @@
#include <algorithm>
#include <string>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <vector>
#include <tuple>
#include <sys/types.h>
#include <sys/wait.h>
#include <signal.h>
#include <ctime>
#include "mysql.h"
#include "mysqld_error.h"
#include "tap.h"
#include "command_line.h"
#include "utils.h"
#ifndef BINLOG_DUMP_NON_BLOCK
#define BINLOG_DUMP_NON_BLOCK 1
#endif // BINLOG_DUMP_NON_BLOCK
using std::string;
#if 0
// on the first iteration we used pv to throttle traffic
// Function to check if pv is available
bool is_pv_available() {
return system("which pv > /dev/null 2>&1") == 0;
}
#endif // 0
int main() {
// 0 means no limit
// we skip 8 because or the edge
std::vector<long> target_times = {0, 1, 2, 3, 4, 5, 6, 7, /* 8, */ 20, 30, 60};
plan(9 + target_times.size());
CommandLine cl;
if (cl.getEnv()) {
diag("Failed to get the required environmental variables.");
return -1;
}
// 1. Generate a large binlog file
MYSQL* backend_conn = mysql_init(NULL);
if (!mysql_real_connect(backend_conn, cl.host, cl.username, cl.password, "test", cl.port, NULL, 0)) {
diag("Backend connection failed: %s", mysql_error(backend_conn));
return -1;
}
ok(1, "Connected to backend server");
MYSQL_QUERY(backend_conn, "CREATE TABLE IF NOT EXISTS dummy_log_table (id INT PRIMARY KEY AUTO_INCREMENT, data LONGTEXT)");
// MYSQL_QUERY(backend_conn, "INSERT INTO dummy_log_table (data) VALUES (REPEAT('a', 1024*50))");
// MYSQL_QUERY(backend_conn, "INSERT INTO dummy_log_table (data) VALUES (REPEAT('a', 1024*50))");
// MYSQL_QUERY(backend_conn, "INSERT INTO dummy_log_table (data) VALUES (REPEAT('a', 1024*50))");
int rc = mysql_query(backend_conn, "FLUSH LOGS");
ok(rc == 0, "Generated data and flushed logs on backend");
// 2. Configure ProxySQL
MYSQL* proxysql_admin = mysql_init(NULL);
if (!mysql_real_connect(proxysql_admin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) {
diag("Admin connection failed: %s", mysql_error(proxysql_admin));
mysql_close(backend_conn);
return -1;
}
ok(1, "Connected to ProxySQL admin");
rc = mysql_query(proxysql_admin, "UPDATE global_variables SET variable_value='8000' WHERE variable_name='mysql-fast_forward_grace_close_ms'");
ok(rc == 0, "Set mysql-fast_forward_grace_close_ms=8000");
rc = mysql_query(proxysql_admin, "SET mysql-have_ssl=0");
ok(rc == 0, "Set mysql-have_ssl=0");
rc = mysql_query(proxysql_admin, "LOAD MYSQL VARIABLES TO RUNTIME");
ok(rc == 0, "Loaded MYSQL variables to runtime");
// 3. Get first binary log file name and estimate total bytes
string binlog_file;
long total_bytes = 0;
if (mysql_query(backend_conn, "SHOW BINARY LOGS") == 0) {
MYSQL_RES *res = mysql_store_result(backend_conn);
if (res) {
MYSQL_ROW row;
while ((row = mysql_fetch_row(res))) {
if (!binlog_file.empty() || row[0]) {
if (binlog_file.empty()) {
binlog_file = row[0];
}
total_bytes += atol(row[1]);
}
}
mysql_free_result(res);
}
}
mysql_close(backend_conn);
ok(!binlog_file.empty(), "Retrieved first binary log file name: %s", binlog_file.c_str());
diag("Estimated total bytes: %ld", total_bytes);
#if 0
// 4. Check if pv is available
if (!is_pv_available()) {
diag("pv is not available, cannot run the test");
mysql_close(proxysql_admin);
return -1;
}
ok(1, "pv is available");
#endif // 0
// 5. Run 5 iterations with different throttling using MariaDB RPL API
for (int i = 0; i < target_times.size() ; i++) {
// Connect for binlog reading
MYSQL* binlog_conn = mysql_init(NULL);
if (!mysql_real_connect(binlog_conn, cl.host, cl.mysql_username, cl.mysql_password, NULL, cl.port, NULL, 0)) {
diag("Binlog connection failed for iteration %d: %s", i, mysql_error(binlog_conn));
mysql_close(proxysql_admin);
return -1;
}
MYSQL_QUERY(binlog_conn, "SET @source_binlog_checksum='NONE'");
MYSQL_RPL rpl {};
rpl.file_name = const_cast<char*>(binlog_file.c_str());
rpl.start_position = 4;
rpl.server_id = 12345;
rpl.flags = BINLOG_DUMP_NON_BLOCK;
int rc = mysql_binlog_open(binlog_conn, &rpl);
if (rc != 0) {
diag("mysql_binlog_open failed for iteration %d: %s", i, mysql_error(binlog_conn));
mysql_close(binlog_conn);
mysql_close(proxysql_admin);
return -1;
}
diag("mysql_binlog_open succeeded for iteration %d", i);
long bytes_read = 0;
time_t start_time = time(NULL);
long target_rate = (i == 0) ? 0 : total_bytes / target_times[i];
bool reached_EOF = false;
while (true) {
rc = mysql_binlog_fetch(binlog_conn, &rpl);
if (rc != 0) break;
long tmp_bytes_read = bytes_read;
bytes_read += rpl.size;
const long chunk_size = 1024*1024;
if (bytes_read/chunk_size > tmp_bytes_read/chunk_size) {
diag("Bytes read: %ld", bytes_read);
}
if (target_rate > 0) {
usleep((rpl.size * 1000000LL) / target_rate);
}
if (rpl.size == 0) {
//when size is 0 , we reached EOF
reached_EOF = true;
break;
}
}
if (target_times[i] <= 8) {
ok(reached_EOF == true , "Reached EOF: %s . Total Bytes read: %ld", (reached_EOF == true ? "TRUE" : "FALSE") , bytes_read);
} else {
diag("Target time greater than grace time, it should fail -- Reached EOF should be FALSE");
ok(reached_EOF == false , "Reached EOF: %s . Total Bytes read: %ld", (reached_EOF == true ? "TRUE" : "FALSE") , bytes_read);
}
time_t end_time = time(NULL);
diag("Binlog fetch ended with rc=%d, error=%s", rc, (rc == 0 ? "None" : mysql_error(binlog_conn)));
mysql_binlog_close(binlog_conn, &rpl);
mysql_close(binlog_conn);
long taken = (long)(end_time - start_time);
char desc[50];
if (i == 0) strcpy(desc, "no limit");
else sprintf(desc, "target %ld s", target_times[i]);
diag("Iteration %d (%s): time %ld seconds, bytes %ld", i, desc, taken, bytes_read);
//ok(1, "Iteration %d completed", i);
}
// 8. Cleanup
rc = mysql_query(proxysql_admin, "UPDATE global_variables SET variable_value='0' WHERE variable_name='mysql-fast_forward_grace_close_ms'");
ok(rc == 0, "Reset mysql-fast_forward_grace_close_ms");
rc = mysql_query(proxysql_admin, "LOAD MYSQL VARIABLES TO RUNTIME");
ok(rc == 0, "Loaded MYSQL variables to runtime for cleanup");
backend_conn = mysql_init(NULL);
mysql_real_connect(backend_conn, cl.host, cl.username, cl.password, "test", cl.port, NULL, 0);
MYSQL_QUERY(backend_conn, "DROP TABLE dummy_log_table");
mysql_close(backend_conn);
mysql_close(proxysql_admin);
return exit_status();
}

@ -20,7 +20,8 @@ int main(int argc, char** argv) {
}
const std::string user = "root";
const std::string test_deps_path = getenv("TEST_DEPS");
const char * tdp = getenv("TEST_DEPS");
const std::string test_deps_path = ( tdp == nullptr ? "" : std::string(tdp) );
const int mysqlbinlog_res = system((test_deps_path + "/mysqlbinlog mysql1-bin.000001 "
"--read-from-remote-server --user " + user + " --password=" + user +

Loading…
Cancel
Save