diff --git a/include/MySQL_Data_Stream.h b/include/MySQL_Data_Stream.h index c513aac2f..86c281188 100644 --- a/include/MySQL_Data_Stream.h +++ b/include/MySQL_Data_Stream.h @@ -176,6 +176,9 @@ class MySQL_Data_Stream char kill_type; bool encrypted; + // defer_close_due_to_fast_forward: Flag to prevent immediate closure of data stream + // during fast forward grace close, allowing buffers to drain. + bool defer_close_due_to_fast_forward; bool net_failure; uint8_t pkt_sid; diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index 3319c5896..9d4d6fe39 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -403,6 +403,10 @@ class MySQL_Session: public Base_Sessionserver_myds->PSarrayOUT->add(pkt.ptr, pkt.size); + // Fast Forward Grace Close Logic: + // If the backend closed during fast forward mode, we defer session closure to allow + // pending client output buffers to drain, preventing data loss. + // Detect if backend closed during fast forward + if (mybe->server_myds->status == MYSQL_SERVER_STATUS_OFFLINE_HARD || mybe->server_myds->fd == -1) { + if (!backend_closed_in_fast_forward) { + backend_closed_in_fast_forward = true; + fast_forward_grace_start_time = thread->curtime; + } + } + if (backend_closed_in_fast_forward) { + if (client_myds->PSarrayOUT->len == 0 && (client_myds->queueOUT.head - client_myds->queueOUT.tail) == 0) { + // buffers empty, close + handler_ret = -1; + } else if (thread->curtime - fast_forward_grace_start_time > (unsigned long long)mysql_thread___fast_forward_grace_close_ms * 1000) { + // timeout, close + handler_ret = -1; + } + } break; // This state is required because it covers the following situation: // 1. A new connection is created by a client and the 'FAST_FORWARD' mode is enabled. diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 8abfdb80d..f9fb9688d 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -112,6 +112,8 @@ extern MySQL_Threads_Handler *GloMTH; extern MySQL_Monitor *GloMyMon; extern MySQL_Logger *GloMyLogger; +//__thread int mysql_thread___fast_forward_grace_close_ms; + typedef struct mythr_st_vars { enum MySQL_Thread_status_variable v_idx; p_th_counter::metric m_idx; @@ -511,6 +513,7 @@ static char * mysql_thread_variables_names[]= { (char *)"proxy_protocol_networks", (char *)"protocol_compression_level", (char *)"ignore_min_gtid_annotations", + (char *)"fast_forward_grace_close_ms", NULL }; @@ -1073,8 +1076,12 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.default_variables[i]=strdup(mysql_tracked_variables[i].default_value); } variables.default_session_track_gtids=strdup((char *)MYSQL_DEFAULT_SESSION_TRACK_GTIDS); + // fast_forward_grace_close_ms: Configurable timeout (in milliseconds) for the "fast forward grace close" feature. + // This feature prevents data loss in fast forward mode by deferring session closure when the backend + // connection closes unexpectedly, allowing time for pending client output to drain. variables.ping_interval_server_msec=10000; variables.ping_timeout_server=200; + variables.fast_forward_grace_close_ms=5000; variables.default_schema=strdup((char *)"information_schema"); variables.handle_unknown_charset=1; variables.interfaces=strdup((char *)""); @@ -2283,6 +2290,7 @@ char ** MySQL_Threads_Handler::get_variables_list() { VariablesPointers_int["handle_unknown_charset"] = make_tuple(&variables.handle_unknown_charset, 0, HANDLE_UNKNOWN_CHARSET__MAX_HANDLE_VALUE, false); VariablesPointers_int["ping_interval_server_msec"] = make_tuple(&variables.ping_interval_server_msec, 1000, 7*24*3600*1000, false); VariablesPointers_int["ping_timeout_server"] = make_tuple(&variables.ping_timeout_server, 10, 600*1000, false); + VariablesPointers_int["fast_forward_grace_close_ms"] = make_tuple(&variables.fast_forward_grace_close_ms, 0, 3600*1000, false); VariablesPointers_int["client_host_cache_size"] = make_tuple(&variables.client_host_cache_size, 0, 1024*1024, false); VariablesPointers_int["client_host_error_counts"] = make_tuple(&variables.client_host_error_counts, 0, 1024*1024, false); VariablesPointers_int["handle_warnings"] = make_tuple(&variables.handle_warnings, 0, 1, false); diff --git a/lib/mysql_data_stream.cpp b/lib/mysql_data_stream.cpp index 17a2c9f16..d307183b5 100644 --- a/lib/mysql_data_stream.cpp +++ b/lib/mysql_data_stream.cpp @@ -576,7 +576,15 @@ int MySQL_Data_Stream::read_from_net() { } else { // Shutdown if we either received the EOF, or operation failed with non-retryable error. if (ssl_recv_bytes==0 || (ssl_recv_bytes==-1 && errno != EINTR && errno != EAGAIN)) { - proxy_debug(PROXY_DEBUG_NET, 5, "Received EOF, shutting down soft socket -- Session=%p, Datastream=%p\n", sess, this); + if (myds_type == MYDS_BACKEND && sess && sess->session_fast_forward && ssl_recv_bytes==0) { + if (sess->client_myds->PSarrayOUT->len > 0 || queue_data(sess->client_myds->queueOUT) > 0) { + sess->backend_closed_in_fast_forward = true; + sess->fast_forward_grace_start_time = sess->thread->curtime; + sess->client_myds->defer_close_due_to_fast_forward = true; + return 0; + } + } + proxy_debug(PROXY_DEBUG_NET, 5, "Received EOF, shutting down soft socket -- Session=%p, Datastream=%p", sess, this); shut_soft(); return -1; } @@ -590,6 +598,14 @@ int MySQL_Data_Stream::read_from_net() { if (encrypted==false) { int myds_errno=errno; if (r==0 || (r==-1 && myds_errno != EINTR && myds_errno != EAGAIN)) { + if (myds_type == MYDS_BACKEND && sess && sess->session_fast_forward && r==0) { + if (sess->client_myds->PSarrayOUT->len > 0 || queue_data(sess->client_myds->queueOUT) > 0) { + sess->backend_closed_in_fast_forward = true; + sess->fast_forward_grace_start_time = sess->thread->curtime; + sess->client_myds->defer_close_due_to_fast_forward = true; + return 0; + } + } shut_soft(); } } else { @@ -622,7 +638,15 @@ int MySQL_Data_Stream::read_from_net() { if ( (revents & POLLHUP) ) { // this is a final check // Only if the amount of data read is 0 or less, then we check POLLHUP - proxy_debug(PROXY_DEBUG_NET, 5, "Session=%p, Datastream=%p -- shutdown soft. revents=%d , bytes read = %d\n", sess, this, revents, r); + if (myds_type == MYDS_BACKEND && sess && sess->session_fast_forward) { + if (sess->client_myds->PSarrayOUT->len > 0 || queue_data(sess->client_myds->queueOUT) > 0) { + sess->backend_closed_in_fast_forward = true; + sess->fast_forward_grace_start_time = sess->thread->curtime; + sess->client_myds->defer_close_due_to_fast_forward = true; + return 0; + } + } + proxy_debug(PROXY_DEBUG_NET, 5, "Session=%p, Datastream=%p -- shutdown soft. revents=%d , bytes read = %d", sess, this, revents, r); shut_soft(); } } else {