From 44aa606caa0a8fe9f1a9cba2439d4241262d8893 Mon Sep 17 00:00:00 2001 From: Rene Cannao Date: Tue, 11 Nov 2025 09:23:04 +0000 Subject: [PATCH] Implement fast forward grace close feature to prevent data loss Problem: In fast forward mode, ProxySQL forwards packets directly from client to backend without buffering them. If the backend connection closes unexpectedly (e.g., due to server crash, network failure, or other issues), ProxySQL immediately closes the client session. This can result in data loss because the client may have sent additional data that hasn't been fully transmitted yet, as ProxySQL does not wait for the output buffers to drain. Solution: Implement a configurable grace period for session closure in fast forward mode. When the backend closes unexpectedly, instead of closing the session immediately, ProxySQL waits for a configurable timeout (fast_forward_grace_close_ms, default 5000ms) to allow any pending client output data to be sent. During this grace period: - If the client output buffers become empty, the session closes gracefully. - If the timeout expires, the session closes anyway to prevent indefinite hanging. Changes: - Added global variable mysql_thread___fast_forward_grace_close_ms (0-3600000ms) - Added session flags: backend_closed_in_fast_forward, fast_forward_grace_start_time - Added data stream flag: defer_close_due_to_fast_forward - Modified MySQL_Data_Stream::read_from_net() to detect backend EOF and initiate grace close if client buffers are not empty - Modified MySQL_Session::handler() FAST_FORWARD case to implement grace close logic with timeout and buffer checks - Added extensive inline documentation explaining the feature and its mechanics This prevents data loss in fast forward scenarios while maintaining bounded session lifetime. --- include/MySQL_Data_Stream.h | 3 +++ include/MySQL_Session.h | 4 ++++ include/MySQL_Thread.h | 3 +++ include/proxysql_structs.h | 2 ++ lib/MySQL_Session.cpp | 23 +++++++++++++++++++++++ lib/MySQL_Thread.cpp | 8 ++++++++ lib/mysql_data_stream.cpp | 28 ++++++++++++++++++++++++++-- 7 files changed, 69 insertions(+), 2 deletions(-) diff --git a/include/MySQL_Data_Stream.h b/include/MySQL_Data_Stream.h index c513aac2f..86c281188 100644 --- a/include/MySQL_Data_Stream.h +++ b/include/MySQL_Data_Stream.h @@ -176,6 +176,9 @@ class MySQL_Data_Stream char kill_type; bool encrypted; + // defer_close_due_to_fast_forward: Flag to prevent immediate closure of data stream + // during fast forward grace close, allowing buffers to drain. + bool defer_close_due_to_fast_forward; bool net_failure; uint8_t pkt_sid; diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index 3319c5896..9d4d6fe39 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -403,6 +403,10 @@ class MySQL_Session: public Base_Sessionserver_myds->PSarrayOUT->add(pkt.ptr, pkt.size); + // Fast Forward Grace Close Logic: + // If the backend closed during fast forward mode, we defer session closure to allow + // pending client output buffers to drain, preventing data loss. + // Detect if backend closed during fast forward + if (mybe->server_myds->status == MYSQL_SERVER_STATUS_OFFLINE_HARD || mybe->server_myds->fd == -1) { + if (!backend_closed_in_fast_forward) { + backend_closed_in_fast_forward = true; + fast_forward_grace_start_time = thread->curtime; + } + } + if (backend_closed_in_fast_forward) { + if (client_myds->PSarrayOUT->len == 0 && (client_myds->queueOUT.head - client_myds->queueOUT.tail) == 0) { + // buffers empty, close + handler_ret = -1; + } else if (thread->curtime - fast_forward_grace_start_time > (unsigned long long)mysql_thread___fast_forward_grace_close_ms * 1000) { + // timeout, close + handler_ret = -1; + } + } break; // This state is required because it covers the following situation: // 1. A new connection is created by a client and the 'FAST_FORWARD' mode is enabled. diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 8abfdb80d..f9fb9688d 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -112,6 +112,8 @@ extern MySQL_Threads_Handler *GloMTH; extern MySQL_Monitor *GloMyMon; extern MySQL_Logger *GloMyLogger; +//__thread int mysql_thread___fast_forward_grace_close_ms; + typedef struct mythr_st_vars { enum MySQL_Thread_status_variable v_idx; p_th_counter::metric m_idx; @@ -511,6 +513,7 @@ static char * mysql_thread_variables_names[]= { (char *)"proxy_protocol_networks", (char *)"protocol_compression_level", (char *)"ignore_min_gtid_annotations", + (char *)"fast_forward_grace_close_ms", NULL }; @@ -1073,8 +1076,12 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.default_variables[i]=strdup(mysql_tracked_variables[i].default_value); } variables.default_session_track_gtids=strdup((char *)MYSQL_DEFAULT_SESSION_TRACK_GTIDS); + // fast_forward_grace_close_ms: Configurable timeout (in milliseconds) for the "fast forward grace close" feature. + // This feature prevents data loss in fast forward mode by deferring session closure when the backend + // connection closes unexpectedly, allowing time for pending client output to drain. variables.ping_interval_server_msec=10000; variables.ping_timeout_server=200; + variables.fast_forward_grace_close_ms=5000; variables.default_schema=strdup((char *)"information_schema"); variables.handle_unknown_charset=1; variables.interfaces=strdup((char *)""); @@ -2283,6 +2290,7 @@ char ** MySQL_Threads_Handler::get_variables_list() { VariablesPointers_int["handle_unknown_charset"] = make_tuple(&variables.handle_unknown_charset, 0, HANDLE_UNKNOWN_CHARSET__MAX_HANDLE_VALUE, false); VariablesPointers_int["ping_interval_server_msec"] = make_tuple(&variables.ping_interval_server_msec, 1000, 7*24*3600*1000, false); VariablesPointers_int["ping_timeout_server"] = make_tuple(&variables.ping_timeout_server, 10, 600*1000, false); + VariablesPointers_int["fast_forward_grace_close_ms"] = make_tuple(&variables.fast_forward_grace_close_ms, 0, 3600*1000, false); VariablesPointers_int["client_host_cache_size"] = make_tuple(&variables.client_host_cache_size, 0, 1024*1024, false); VariablesPointers_int["client_host_error_counts"] = make_tuple(&variables.client_host_error_counts, 0, 1024*1024, false); VariablesPointers_int["handle_warnings"] = make_tuple(&variables.handle_warnings, 0, 1, false); diff --git a/lib/mysql_data_stream.cpp b/lib/mysql_data_stream.cpp index 17a2c9f16..d307183b5 100644 --- a/lib/mysql_data_stream.cpp +++ b/lib/mysql_data_stream.cpp @@ -576,7 +576,15 @@ int MySQL_Data_Stream::read_from_net() { } else { // Shutdown if we either received the EOF, or operation failed with non-retryable error. if (ssl_recv_bytes==0 || (ssl_recv_bytes==-1 && errno != EINTR && errno != EAGAIN)) { - proxy_debug(PROXY_DEBUG_NET, 5, "Received EOF, shutting down soft socket -- Session=%p, Datastream=%p\n", sess, this); + if (myds_type == MYDS_BACKEND && sess && sess->session_fast_forward && ssl_recv_bytes==0) { + if (sess->client_myds->PSarrayOUT->len > 0 || queue_data(sess->client_myds->queueOUT) > 0) { + sess->backend_closed_in_fast_forward = true; + sess->fast_forward_grace_start_time = sess->thread->curtime; + sess->client_myds->defer_close_due_to_fast_forward = true; + return 0; + } + } + proxy_debug(PROXY_DEBUG_NET, 5, "Received EOF, shutting down soft socket -- Session=%p, Datastream=%p", sess, this); shut_soft(); return -1; } @@ -590,6 +598,14 @@ int MySQL_Data_Stream::read_from_net() { if (encrypted==false) { int myds_errno=errno; if (r==0 || (r==-1 && myds_errno != EINTR && myds_errno != EAGAIN)) { + if (myds_type == MYDS_BACKEND && sess && sess->session_fast_forward && r==0) { + if (sess->client_myds->PSarrayOUT->len > 0 || queue_data(sess->client_myds->queueOUT) > 0) { + sess->backend_closed_in_fast_forward = true; + sess->fast_forward_grace_start_time = sess->thread->curtime; + sess->client_myds->defer_close_due_to_fast_forward = true; + return 0; + } + } shut_soft(); } } else { @@ -622,7 +638,15 @@ int MySQL_Data_Stream::read_from_net() { if ( (revents & POLLHUP) ) { // this is a final check // Only if the amount of data read is 0 or less, then we check POLLHUP - proxy_debug(PROXY_DEBUG_NET, 5, "Session=%p, Datastream=%p -- shutdown soft. revents=%d , bytes read = %d\n", sess, this, revents, r); + if (myds_type == MYDS_BACKEND && sess && sess->session_fast_forward) { + if (sess->client_myds->PSarrayOUT->len > 0 || queue_data(sess->client_myds->queueOUT) > 0) { + sess->backend_closed_in_fast_forward = true; + sess->fast_forward_grace_start_time = sess->thread->curtime; + sess->client_myds->defer_close_due_to_fast_forward = true; + return 0; + } + } + proxy_debug(PROXY_DEBUG_NET, 5, "Session=%p, Datastream=%p -- shutdown soft. revents=%d , bytes read = %d", sess, this, revents, r); shut_soft(); } } else {