From ae93966603274852a1ec602a3efd020a3d0fad99 Mon Sep 17 00:00:00 2001 From: Rene Cannao Date: Wed, 12 Nov 2025 17:31:13 +0100 Subject: [PATCH] Add TAP test for fast forward grace close feature - Rename and modify test to use MySQL C API mysql_binlog_* functions - Implement throttled binlog reading with 5 iterations (no limit, 2s, 5s, 20s, 60s targets) - Add diagnostics for debugging binlog fetch issues - Set RPL options for file, position, server_id, and non-blocking flag - Update Makefile to compile with MySQL client library --- lib/MySQL_Session.cpp | 7 +- lib/MySQL_Thread.cpp | 37 +++- lib/mysql_data_stream.cpp | 48 +++-- test/tap/tests/Makefile | 4 + test/tap/tests/fast_forward_grace_close.cpp | 190 ++++++++++++++++++ ...com_binlog_dump_enables_fast_forward-t.cpp | 3 +- 6 files changed, 266 insertions(+), 23 deletions(-) create mode 100644 test/tap/tests/fast_forward_grace_close.cpp diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index ab79d532a..c42333cbe 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -3789,11 +3789,16 @@ int MySQL_Session::GPFC_Statuses2(bool& wrong_pass, PtrSize_t& pkt) { if (mybe->server_myds->status == MYSQL_SERVER_STATUS_OFFLINE_HARD || mybe->server_myds->fd == -1) { if (!backend_closed_in_fast_forward) { backend_closed_in_fast_forward = true; + cerr << __FILE__ << ":" << __LINE__ << " grace_start_time from " << fast_forward_grace_start_time << " to " << thread->curtime << endl; fast_forward_grace_start_time = thread->curtime; } } if (backend_closed_in_fast_forward) { - if (client_myds->PSarrayOUT->len == 0 && (client_myds->queueOUT.head - client_myds->queueOUT.tail) == 0) { + if ( + ( mybe->server_myds == nullptr || ( mybe->server_myds && mybe->server_myds->PSarrayIN->len == 0 ) ) + && + (client_myds->PSarrayOUT->len == 0 && (client_myds->queueOUT.head - client_myds->queueOUT.tail) == 0) + ) { // buffers empty, close handler_ret = -1; } else if (thread->curtime - fast_forward_grace_start_time > (unsigned long long)mysql_thread___fast_forward_grace_close_ms * 1000) { diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index f9fb9688d..e299d3dab 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -112,8 +112,6 @@ extern MySQL_Threads_Handler *GloMTH; extern MySQL_Monitor *GloMyMon; extern MySQL_Logger *GloMyLogger; -//__thread int mysql_thread___fast_forward_grace_close_ms; - typedef struct mythr_st_vars { enum MySQL_Thread_status_variable v_idx; p_th_counter::metric m_idx; @@ -2290,7 +2288,7 @@ char ** MySQL_Threads_Handler::get_variables_list() { VariablesPointers_int["handle_unknown_charset"] = make_tuple(&variables.handle_unknown_charset, 0, HANDLE_UNKNOWN_CHARSET__MAX_HANDLE_VALUE, false); VariablesPointers_int["ping_interval_server_msec"] = make_tuple(&variables.ping_interval_server_msec, 1000, 7*24*3600*1000, false); VariablesPointers_int["ping_timeout_server"] = make_tuple(&variables.ping_timeout_server, 10, 600*1000, false); - VariablesPointers_int["fast_forward_grace_close_ms"] = make_tuple(&variables.fast_forward_grace_close_ms, 0, 3600*1000, false); + VariablesPointers_int["fast_forward_grace_close_ms"] = make_tuple(&variables.fast_forward_grace_close_ms, 0, 3600*1000, false); VariablesPointers_int["client_host_cache_size"] = make_tuple(&variables.client_host_cache_size, 0, 1024*1024, false); VariablesPointers_int["client_host_error_counts"] = make_tuple(&variables.client_host_error_counts, 0, 1024*1024, false); VariablesPointers_int["handle_warnings"] = make_tuple(&variables.handle_warnings, 0, 1, false); @@ -3756,7 +3754,25 @@ bool MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned // if this is a backend without fast_forward, do not set unhealthy: it will be handled by client library if (myds->sess->session_fast_forward) { // if fast forward if (myds->myds_type==MYDS_BACKEND) { // and backend - myds->sess->set_unhealthy(); // set unhealthy + // myds->sess->set_unhealthy(); // set unhealthy + // Fast Forward Grace Close Logic: + // If the backend closed during fast forward mode, we defer session closure to allow + // pending client output buffers to drain, preventing data loss. + // Detect if backend closed during fast forward + if (myds->sess->backend_closed_in_fast_forward == false) { + myds->sess->backend_closed_in_fast_forward = true; + //cerr << __FILE__ << ":" << __LINE__ << " grace_start_time from " << myds->sess->fast_forward_grace_start_time << " to " << curtime << endl; + myds->sess->fast_forward_grace_start_time = curtime; + } + if (myds->sess->backend_closed_in_fast_forward) { + if (myds->PSarrayIN->len == 0 && myds->sess->client_myds->PSarrayOUT->len == 0 && (myds->sess->client_myds->queueOUT.head - myds->sess->client_myds->queueOUT.tail) == 0) { + // buffers empty, close + myds->sess->set_unhealthy(); // set unhealthy + } else if (curtime - myds->sess->fast_forward_grace_start_time > (unsigned long long)mysql_thread___fast_forward_grace_close_ms * 1000) { + // timeout, close + myds->sess->set_unhealthy(); // set unhealthy + } + } } } } @@ -3933,9 +3949,17 @@ void MySQL_Thread::ProcessAllSessions_Healthy0(MySQL_Session *sess, unsigned int sess->client_myds->addr.port ); } else { + string extra_info = ""; + if (sess->backend_closed_in_fast_forward == true) { + unsigned long long lapse = curtime - sess->fast_forward_grace_start_time; + extra_info = "Yes , " + to_string(lapse/1000) + " ms ago"; + } else { + extra_info = "No"; + } proxy_warning( - "Closing 'fast_forward' client connection %s:%d\n", sess->client_myds->addr.addr, - sess->client_myds->addr.port + "Closing 'fast_forward' client connection %s:%d . Backend already close: %s\n", + sess->client_myds->addr.addr, sess->client_myds->addr.port, + extra_info.c_str() ); } } @@ -4145,6 +4169,7 @@ void MySQL_Thread::refresh_variables() { REFRESH_VARIABLE_INT(connect_timeout_server); REFRESH_VARIABLE_INT(connect_timeout_server_max); REFRESH_VARIABLE_INT(free_connections_pct); + REFRESH_VARIABLE_INT(fast_forward_grace_close_ms); #ifdef IDLE_THREADS REFRESH_VARIABLE_INT(session_idle_ms); #endif // IDLE_THREADS diff --git a/lib/mysql_data_stream.cpp b/lib/mysql_data_stream.cpp index d307183b5..8616746fc 100644 --- a/lib/mysql_data_stream.cpp +++ b/lib/mysql_data_stream.cpp @@ -577,11 +577,14 @@ int MySQL_Data_Stream::read_from_net() { // Shutdown if we either received the EOF, or operation failed with non-retryable error. if (ssl_recv_bytes==0 || (ssl_recv_bytes==-1 && errno != EINTR && errno != EAGAIN)) { if (myds_type == MYDS_BACKEND && sess && sess->session_fast_forward && ssl_recv_bytes==0) { - if (sess->client_myds->PSarrayOUT->len > 0 || queue_data(sess->client_myds->queueOUT) > 0) { - sess->backend_closed_in_fast_forward = true; - sess->fast_forward_grace_start_time = sess->thread->curtime; - sess->client_myds->defer_close_due_to_fast_forward = true; - return 0; + if (PSarrayIN->len > 0 || sess->client_myds->PSarrayOUT->len > 0 || queue_data(sess->client_myds->queueOUT) > 0) { + if (sess->backend_closed_in_fast_forward == false) { + sess->backend_closed_in_fast_forward = true; + //cerr << __FILE__ << ":" << __LINE__ << " grace_start_time from " << sess->fast_forward_grace_start_time << " to " << sess->thread->curtime << endl; + sess->fast_forward_grace_start_time = sess->thread->curtime; + sess->client_myds->defer_close_due_to_fast_forward = true; + } + //return 0; } } proxy_debug(PROXY_DEBUG_NET, 5, "Received EOF, shutting down soft socket -- Session=%p, Datastream=%p", sess, this); @@ -599,11 +602,14 @@ int MySQL_Data_Stream::read_from_net() { int myds_errno=errno; if (r==0 || (r==-1 && myds_errno != EINTR && myds_errno != EAGAIN)) { if (myds_type == MYDS_BACKEND && sess && sess->session_fast_forward && r==0) { - if (sess->client_myds->PSarrayOUT->len > 0 || queue_data(sess->client_myds->queueOUT) > 0) { - sess->backend_closed_in_fast_forward = true; - sess->fast_forward_grace_start_time = sess->thread->curtime; - sess->client_myds->defer_close_due_to_fast_forward = true; - return 0; + if (PSarrayIN->len > 0 || sess->client_myds->PSarrayOUT->len > 0 || queue_data(sess->client_myds->queueOUT) > 0) { + if (sess->backend_closed_in_fast_forward == false) { + sess->backend_closed_in_fast_forward = true; + //cerr << __FILE__ << ":" << __LINE__ << " grace_start_time from " << sess->fast_forward_grace_start_time << " to " << sess->thread->curtime << endl; + sess->fast_forward_grace_start_time = sess->thread->curtime; + sess->client_myds->defer_close_due_to_fast_forward = true; + } + //return 0; } } shut_soft(); @@ -639,11 +645,14 @@ int MySQL_Data_Stream::read_from_net() { // this is a final check // Only if the amount of data read is 0 or less, then we check POLLHUP if (myds_type == MYDS_BACKEND && sess && sess->session_fast_forward) { - if (sess->client_myds->PSarrayOUT->len > 0 || queue_data(sess->client_myds->queueOUT) > 0) { - sess->backend_closed_in_fast_forward = true; - sess->fast_forward_grace_start_time = sess->thread->curtime; - sess->client_myds->defer_close_due_to_fast_forward = true; - return 0; + if (PSarrayIN->len > 0 || sess->client_myds->PSarrayOUT->len > 0 || queue_data(sess->client_myds->queueOUT) > 0) { + if (sess->backend_closed_in_fast_forward == false) { + sess->backend_closed_in_fast_forward = true; + //cerr << __FILE__ << ":" << __LINE__ << " grace_start_time from " << sess->fast_forward_grace_start_time << " to " << sess->thread->curtime << endl; + sess->fast_forward_grace_start_time = sess->thread->curtime; + sess->client_myds->defer_close_due_to_fast_forward = true; + } + //return 0; } } proxy_debug(PROXY_DEBUG_NET, 5, "Session=%p, Datastream=%p -- shutdown soft. revents=%d , bytes read = %d", sess, this, revents, r); @@ -789,6 +798,15 @@ void MySQL_Data_Stream::set_pollout() { _pollfd->events = myconn->wait_events; } else { _pollfd->events = POLLIN; + if (myds_type == MYDS_BACKEND && sess && sess->session_fast_forward && sess->backend_closed_in_fast_forward == true) { + // this is a fast forward session where the backend connection was already closed + // if we set POLLIN : the thread will spin on poll() until the socket is closed + // if we do not set POLLIN : we won't be able to timeout + if (sess->thread->curtime - sess->fast_forward_grace_start_time < (unsigned long long)mysql_thread___fast_forward_grace_close_ms * 1000) { + // for the reason listed above, we remove POLLIN unless the timeout has reached + _pollfd->events = 0; + } + } //if (PSarrayOUT->len || available_data_out() || queueOUT.partial || (encrypted && !SSL_is_init_finished(ssl))) { if (PSarrayOUT->len || available_data_out() || queueOUT.partial) { _pollfd->events |= POLLOUT; diff --git a/test/tap/tests/Makefile b/test/tap/tests/Makefile index ae8833715..136d7db01 100644 --- a/test/tap/tests/Makefile +++ b/test/tap/tests/Makefile @@ -121,6 +121,7 @@ tests: tests-cpp \ reg_test_stmt_resultset_err_no_rows_libmysql-t \ prepare_statement_err3024_libmysql-t \ prepare_statement_err3024_async-t \ + fast_forward_grace_close_libmysql-t \ reg_test_mariadb_stmt_store_result_libmysql-t \ reg_test_mariadb_stmt_store_result_async-t tests: @@ -229,6 +230,9 @@ mysql_reconnect_libmariadb-t: mysql_reconnect.cpp $(TAP_LDIR)/libtap.so mysql_reconnect_libmysql-t: mysql_reconnect.cpp $(TAP_LDIR)/libtap_mysql8.a $(CXX) -DLIBMYSQL_HELPER8 -DDISABLE_WARNING_COUNT_LOGGING $< -I$(TEST_MYSQL8_IDIR) -I$(TEST_MYSQL8_EDIR) -L$(TEST_MYSQL8_LDIR) -lmysqlclient -ltap_mysql8 -lresolv $(CUSTOMARGS) -o $@ +fast_forward_grace_close_libmysql-t: fast_forward_grace_close.cpp $(TAP_LDIR)/libtap_mysql8.a + $(CXX) -DLIBMYSQL_HELPER8 -DDISABLE_WARNING_COUNT_LOGGING $< -I$(TEST_MYSQL8_IDIR) -I$(TEST_MYSQL8_EDIR) -L$(TEST_MYSQL8_LDIR) -lmysqlclient -ltap_mysql8 -lresolv $(CUSTOMARGS) -o $@ + test_match_eof_conn_cap_libmysql-t: test_match_eof_conn_cap.cpp $(TAP_LDIR)/libtap_mysql8.a $(CXX) -DLIBMYSQL_HELPER8 -DDISABLE_WARNING_COUNT_LOGGING $< -I$(TEST_MYSQL8_IDIR) -I$(TEST_MYSQL8_EDIR) -L$(TEST_MYSQL8_LDIR) -lmysqlclient -ltap_mysql8 -lresolv $(CUSTOMARGS) -o $@ diff --git a/test/tap/tests/fast_forward_grace_close.cpp b/test/tap/tests/fast_forward_grace_close.cpp new file mode 100644 index 000000000..624938b91 --- /dev/null +++ b/test/tap/tests/fast_forward_grace_close.cpp @@ -0,0 +1,190 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "mysql.h" +#include "mysqld_error.h" + +#include "tap.h" +#include "command_line.h" +#include "utils.h" + +#ifndef BINLOG_DUMP_NON_BLOCK +#define BINLOG_DUMP_NON_BLOCK 1 +#endif // BINLOG_DUMP_NON_BLOCK + +using std::string; + +#if 0 + +// on the first iteration we used pv to throttle traffic +// Function to check if pv is available +bool is_pv_available() { + return system("which pv > /dev/null 2>&1") == 0; +} +#endif // 0 + +int main() { + // 0 means no limit + // we skip 8 because or the edge + std::vector target_times = {0, 1, 2, 3, 4, 5, 6, 7, /* 8, */ 20, 30, 60}; + plan(9 + target_times.size()); + + CommandLine cl; + if (cl.getEnv()) { + diag("Failed to get the required environmental variables."); + return -1; + } + + // 1. Generate a large binlog file + MYSQL* backend_conn = mysql_init(NULL); + if (!mysql_real_connect(backend_conn, cl.host, cl.username, cl.password, "test", cl.port, NULL, 0)) { + diag("Backend connection failed: %s", mysql_error(backend_conn)); + return -1; + } + ok(1, "Connected to backend server"); + MYSQL_QUERY(backend_conn, "CREATE TABLE IF NOT EXISTS dummy_log_table (id INT PRIMARY KEY AUTO_INCREMENT, data LONGTEXT)"); +// MYSQL_QUERY(backend_conn, "INSERT INTO dummy_log_table (data) VALUES (REPEAT('a', 1024*50))"); +// MYSQL_QUERY(backend_conn, "INSERT INTO dummy_log_table (data) VALUES (REPEAT('a', 1024*50))"); +// MYSQL_QUERY(backend_conn, "INSERT INTO dummy_log_table (data) VALUES (REPEAT('a', 1024*50))"); + int rc = mysql_query(backend_conn, "FLUSH LOGS"); + ok(rc == 0, "Generated data and flushed logs on backend"); + + // 2. Configure ProxySQL + MYSQL* proxysql_admin = mysql_init(NULL); + if (!mysql_real_connect(proxysql_admin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) { + diag("Admin connection failed: %s", mysql_error(proxysql_admin)); + mysql_close(backend_conn); + return -1; + } + ok(1, "Connected to ProxySQL admin"); + + rc = mysql_query(proxysql_admin, "UPDATE global_variables SET variable_value='8000' WHERE variable_name='mysql-fast_forward_grace_close_ms'"); + ok(rc == 0, "Set mysql-fast_forward_grace_close_ms=8000"); + rc = mysql_query(proxysql_admin, "SET mysql-have_ssl=0"); + ok(rc == 0, "Set mysql-have_ssl=0"); + rc = mysql_query(proxysql_admin, "LOAD MYSQL VARIABLES TO RUNTIME"); + ok(rc == 0, "Loaded MYSQL variables to runtime"); + + // 3. Get first binary log file name and estimate total bytes + string binlog_file; + long total_bytes = 0; + if (mysql_query(backend_conn, "SHOW BINARY LOGS") == 0) { + MYSQL_RES *res = mysql_store_result(backend_conn); + if (res) { + MYSQL_ROW row; + while ((row = mysql_fetch_row(res))) { + if (!binlog_file.empty() || row[0]) { + if (binlog_file.empty()) { + binlog_file = row[0]; + } + total_bytes += atol(row[1]); + } + } + mysql_free_result(res); + } + } + mysql_close(backend_conn); + ok(!binlog_file.empty(), "Retrieved first binary log file name: %s", binlog_file.c_str()); + diag("Estimated total bytes: %ld", total_bytes); + +#if 0 + // 4. Check if pv is available + if (!is_pv_available()) { + diag("pv is not available, cannot run the test"); + mysql_close(proxysql_admin); + return -1; + } + ok(1, "pv is available"); +#endif // 0 + + // 5. Run 5 iterations with different throttling using MariaDB RPL API + for (int i = 0; i < target_times.size() ; i++) { + // Connect for binlog reading + MYSQL* binlog_conn = mysql_init(NULL); + if (!mysql_real_connect(binlog_conn, cl.host, cl.mysql_username, cl.mysql_password, NULL, cl.port, NULL, 0)) { + diag("Binlog connection failed for iteration %d: %s", i, mysql_error(binlog_conn)); + mysql_close(proxysql_admin); + return -1; + } + + MYSQL_QUERY(binlog_conn, "SET @source_binlog_checksum='NONE'"); + MYSQL_RPL rpl {}; + rpl.file_name = const_cast(binlog_file.c_str()); + rpl.start_position = 4; + rpl.server_id = 12345; + rpl.flags = BINLOG_DUMP_NON_BLOCK; + int rc = mysql_binlog_open(binlog_conn, &rpl); + if (rc != 0) { + diag("mysql_binlog_open failed for iteration %d: %s", i, mysql_error(binlog_conn)); + mysql_close(binlog_conn); + mysql_close(proxysql_admin); + return -1; + } + diag("mysql_binlog_open succeeded for iteration %d", i); + + long bytes_read = 0; + time_t start_time = time(NULL); + long target_rate = (i == 0) ? 0 : total_bytes / target_times[i]; + bool reached_EOF = false; + + while (true) { + rc = mysql_binlog_fetch(binlog_conn, &rpl); + if (rc != 0) break; + long tmp_bytes_read = bytes_read; + bytes_read += rpl.size; + const long chunk_size = 1024*1024; + if (bytes_read/chunk_size > tmp_bytes_read/chunk_size) { + diag("Bytes read: %ld", bytes_read); + } + if (target_rate > 0) { + usleep((rpl.size * 1000000LL) / target_rate); + } + if (rpl.size == 0) { + //when size is 0 , we reached EOF + reached_EOF = true; + break; + } + } + if (target_times[i] <= 8) { + ok(reached_EOF == true , "Reached EOF: %s . Total Bytes read: %ld", (reached_EOF == true ? "TRUE" : "FALSE") , bytes_read); + } else { + diag("Target time greater than grace time, it should fail -- Reached EOF should be FALSE"); + ok(reached_EOF == false , "Reached EOF: %s . Total Bytes read: %ld", (reached_EOF == true ? "TRUE" : "FALSE") , bytes_read); + } + time_t end_time = time(NULL); + diag("Binlog fetch ended with rc=%d, error=%s", rc, (rc == 0 ? "None" : mysql_error(binlog_conn))); + mysql_binlog_close(binlog_conn, &rpl); + mysql_close(binlog_conn); + + long taken = (long)(end_time - start_time); + char desc[50]; + if (i == 0) strcpy(desc, "no limit"); + else sprintf(desc, "target %ld s", target_times[i]); + diag("Iteration %d (%s): time %ld seconds, bytes %ld", i, desc, taken, bytes_read); + //ok(1, "Iteration %d completed", i); + } + + // 8. Cleanup + rc = mysql_query(proxysql_admin, "UPDATE global_variables SET variable_value='0' WHERE variable_name='mysql-fast_forward_grace_close_ms'"); + ok(rc == 0, "Reset mysql-fast_forward_grace_close_ms"); + rc = mysql_query(proxysql_admin, "LOAD MYSQL VARIABLES TO RUNTIME"); + ok(rc == 0, "Loaded MYSQL variables to runtime for cleanup"); + + backend_conn = mysql_init(NULL); + mysql_real_connect(backend_conn, cl.host, cl.username, cl.password, "test", cl.port, NULL, 0); + MYSQL_QUERY(backend_conn, "DROP TABLE dummy_log_table"); + mysql_close(backend_conn); + + mysql_close(proxysql_admin); + + return exit_status(); +} diff --git a/test/tap/tests/test_com_binlog_dump_enables_fast_forward-t.cpp b/test/tap/tests/test_com_binlog_dump_enables_fast_forward-t.cpp index 581cf39bf..f62f752a9 100644 --- a/test/tap/tests/test_com_binlog_dump_enables_fast_forward-t.cpp +++ b/test/tap/tests/test_com_binlog_dump_enables_fast_forward-t.cpp @@ -20,7 +20,8 @@ int main(int argc, char** argv) { } const std::string user = "root"; - const std::string test_deps_path = getenv("TEST_DEPS"); + const char * tdp = getenv("TEST_DEPS"); + const std::string test_deps_path = ( tdp == nullptr ? "" : std::string(tdp) ); const int mysqlbinlog_res = system((test_deps_path + "/mysqlbinlog mysql1-bin.000001 " "--read-from-remote-server --user " + user + " --password=" + user +