diff --git a/include/MySQL_Data_Stream.h b/include/MySQL_Data_Stream.h index 86c281188..337a1a743 100644 --- a/include/MySQL_Data_Stream.h +++ b/include/MySQL_Data_Stream.h @@ -151,6 +151,7 @@ class MySQL_Data_Stream unsigned int connect_tries; int query_retries_on_failure; int connect_retries_on_failure; + uint32_t unexp_com_pings { 0 }; enum mysql_data_stream_status DSS; enum MySQL_DS_type myds_type; diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 7391b5ab5..0caccecef 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -80,6 +80,7 @@ enum MySQL_Thread_status_variable { st_var_backend_lagging_during_query, st_var_backend_offline_during_query, st_var_unexpected_com_quit, + st_var_unexpected_com_ping, st_var_unexpected_packet, st_var_killed_connections, st_var_killed_queries, @@ -280,6 +281,7 @@ struct p_th_counter { queries_with_max_lag_ms__delayed, queries_with_max_lag_ms__total_wait_time_us, mysql_unexpected_frontend_com_quit, + mysql_unexpected_frontend_com_ping, hostgroup_locked_set_cmds, hostgroup_locked_queries, mysql_unexpected_frontend_packets, diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 17b452e3d..e64513056 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -7768,6 +7768,37 @@ void MySQL_Session::RequestEnd(MySQL_Data_Stream *myds,const unsigned int myerrn } myds->free_mysql_real_query(); } + + // NOTE: Unexpected-Ping-Handling: Section 2-2 + /////////////////////////////////////////////////////////////////////////////////////////////// + // Implements part-2 of the temporary workaround for the handling of unexpected 'COM_PING' packets + // received during query processing, while a resultset is yet being streamed to the client. We send a + // number of OK packets matching the 'queued' pings. This should ALWAYS be done before the session status + // goes back to 'WAITING_CLIENT_DATA', otherwise the flow between client-server could be compromised. By + // always sending the OK packets before this transisiton we ensure that the client doesn't hang waiting + // for response. + // + // @note This is a "temporary" solution that should be removed if packet queueing is implemented, since + // it will make the 'unexp_com_pings' field obsolete. + /////////////////////////////////////////////////////////////////////////////////////////////// + if (client_myds->unexp_com_pings) { + client_myds->setDSS_STATE_QUERY_SENT_NET(); + + while (client_myds->unexp_com_pings) { + proxy_warning("Sending OK packet for unexpected COM_PING packet\n"); + + client_myds->pkt_sid += 1; + uint16_t st = NumActiveTransactions() ? SERVER_STATUS_IN_TRANS : 0; + if (autocommit) { st |= SERVER_STATUS_AUTOCOMMIT; } + + client_myds->myprot.generate_pkt_OK(true, NULL, NULL, client_myds->pkt_sid, 0, 0, st, 0, NULL); + client_myds->unexp_com_pings--; + } + + client_myds->DSS = STATE_SLEEP; + } + /////////////////////////////////////////////////////////////////////////// + if (session_fast_forward == SESSION_FORWARD_TYPE_NONE) { // reset status of the session status=WAITING_CLIENT_DATA; diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 330c37af0..5ca0dfd42 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -153,6 +153,7 @@ mythr_st_vars_t MySQL_Thread_status_variables_counter_array[] { { st_var_hostgroup_locked_set_cmds, p_th_counter::hostgroup_locked_set_cmds, (char *)"hostgroup_locked_set_cmds" }, { st_var_hostgroup_locked_queries, p_th_counter::hostgroup_locked_queries, (char *)"hostgroup_locked_queries" }, { st_var_unexpected_com_quit, p_th_counter::mysql_unexpected_frontend_com_quit,(char *)"mysql_unexpected_frontend_com_quit" }, + { st_var_unexpected_com_ping, p_th_counter::mysql_unexpected_frontend_com_ping,(char *)"mysql_unexpected_frontend_com_ping" }, { st_var_unexpected_packet, p_th_counter::mysql_unexpected_frontend_packets,(char *)"mysql_unexpected_frontend_packets" }, { st_var_queries_with_max_lag_ms__total_wait_time_us , p_th_counter::queries_with_max_lag_ms__total_wait_time_us, (char *)"queries_with_max_lag_ms__total_wait_time_us" }, { st_var_queries_with_max_lag_ms__delayed , p_th_counter::queries_with_max_lag_ms__delayed, (char *)"queries_with_max_lag_ms__delayed" }, @@ -750,6 +751,12 @@ th_metrics_map = std::make_tuple( "Total waited time due to connection selection because of 'max_lag' annotation.", metric_tags {} ), + std::make_tuple ( + p_th_counter::mysql_unexpected_frontend_com_ping, + "proxysql_mysql_unexpected_frontend_com_ping_total", + "Unexpected 'COM_PING' received from the client.", + metric_tags {} + ), std::make_tuple ( p_th_counter::mysql_unexpected_frontend_com_quit, "proxysql_mysql_unexpected_frontend_com_quit_total", diff --git a/lib/mysql_data_stream.cpp b/lib/mysql_data_stream.cpp index 22e875da1..bb28d9194 100644 --- a/lib/mysql_data_stream.cpp +++ b/lib/mysql_data_stream.cpp @@ -475,6 +475,38 @@ void MySQL_Data_Stream::shut_hard() { */ void MySQL_Data_Stream::check_data_flow() { if ( (PSarrayIN->len || queue_data(queueIN) ) && ( PSarrayOUT->len || queue_data(queueOUT) ) ){ + // NOTE: Unexpected-Ping-Handling: Section 1-2 + /////////////////////////////////////////////////////////////////////////////////////////////// + // Implements part-1 of the temporary workaround for the handling of unexpected 'COM_PING' packets + // received during query processing, while a resultset is yet being streamed to the client. Received + // 'COM_PING' packets are queued in the form of a counter. This counter is later used to sent the + // corresponding number of 'OK' packets to the client. This should be ALWAYS done before + // 'MySQL_Session' transitions back to 'WAITING_CLIENT_DATA'. + // + // @note This is a "temporary" solution that should be removed if packet queueing is implemented, since + // it will make the 'unexp_com_pings' field obsolete. + /////////////////////////////////////////////////////////////////////////////////////////////// + if (PSarrayIN->len >= 1 && PSarrayIN->pdata[0].size == 5) { + const uint8_t c = *(static_cast(PSarrayIN->pdata[0].ptr) + sizeof(mysql_hdr)); + + if (c == _MYSQL_COM_PING && this->sess->status != WAITING_CLIENT_DATA) { + proxy_warning("Handling unexpected COM_PING packet\n"); + + // Queue the COM_PING for later handling at MySQL_Session level + this->unexp_com_pings += 1; + this->sess->thread->status_variables.stvar[st_var_unexpected_com_ping] += 1; + + // Discard the packet before session attempts to handle it + PtrSize_t pkt {}; + PSarrayIN->remove_index(0, &pkt); + l_free(pkt.size, pkt.ptr); + + // Return without further checks + return; + } + } + /////////////////////////////////////////////////////////////////////////////////////////////// + if (sess && sess->status == FAST_FORWARD && sess->session_fast_forward == SESSION_FORWARD_TYPE_PERMANENT) { // Permanent fast-forward sessions: log warning but continue proxy_warning("Session=%p, DataStream=%p -- Data at both ends of a MySQL data stream: IN <%d bytes %d packets> , OUT <%d bytes %d packets>\n", sess, this, queue_data(queueIN), PSarrayIN->len, queue_data(queueOUT), PSarrayOUT->len);