diff --git a/include/mysql_connection.h b/include/mysql_connection.h index 3b07fc741..186bc25fc 100644 --- a/include/mysql_connection.h +++ b/include/mysql_connection.h @@ -95,8 +95,6 @@ class MySQL_Connection { char scramble_buff[40]; unsigned long long creation_time; unsigned long long last_time_used; - /* @brief Time at which the last 'event' was processed by 'handler' */ - unsigned long long last_event_time; unsigned long long timeout; int auto_increment_delay_token; int fd; @@ -224,8 +222,6 @@ class MySQL_Connection { void ProcessQueryAndSetStatusFlags(char *query_digest_text); void optimize(); void close_mysql(); - uint64_t idle_time(uint64_t curtime); - bool expire_auto_increment_delay(uint64_t curtime, uint64_t timeout); void set_is_client(); // used for local_stmts diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index f19489a1c..a13f2c48c 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -2726,6 +2726,7 @@ void MySQL_HostGroups_Manager::push_MyConn_to_pool(MySQL_Connection *c, bool _lo MySrvC *mysrvc=NULL; if (_lock) wrlock(); + c->auto_increment_delay_token = 0; status.myconnpoll_push++; mysrvc=(MySrvC *)c->parent; proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL_Connection %p, server %s:%d with status %d\n", c, mysrvc->address, mysrvc->port, mysrvc->status); diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index a33860ed6..f5f7b281c 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -4377,6 +4377,9 @@ int MySQL_Session::RunQuery(MySQL_Data_Stream *myds, MySQL_Connection *myconn) { // this function was inline void MySQL_Session::handler___status_WAITING_CLIENT_DATA() { +// NOTE: Maintenance of 'multiplex_delayed' has been moved to 'housekeeping_before_pkts'. The previous impl +// is left below as an example of how to perform a more passive maintenance over session connections. +/* if (mybes) { MySQL_Backend *_mybe; unsigned int i; @@ -4397,6 +4400,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA() { } } } +*/ } void MySQL_Session::housekeeping_before_pkts() { @@ -7310,6 +7314,7 @@ void MySQL_Session::create_new_session_and_reset_connection(MySQL_Data_Stream *_ new_myds->myprot.init(&new_myds, new_myds->myconn->userinfo, NULL); new_sess->status = RESETTING_CONNECTION; mc->async_state_machine = ASYNC_IDLE; // may not be true, but is used to correctly perform error handling + mc->auto_increment_delay_token = 0; new_myds->DSS = STATE_MARIADB_QUERY; thread->register_session_connection_handler(new_sess,true); if (new_myds->mypolls==NULL) { @@ -7430,9 +7435,29 @@ void MySQL_Session::finishQuery(MySQL_Data_Stream *myds, MySQL_Connection *mycon myds->myconn->set_status(true, STATUS_MYSQL_CONNECTION_NO_MULTIPLEX); } } - if (mysql_thread___multiplexing && (myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) { - if (mysql_thread___connection_delay_multiplex_ms && mirror==false) { - myds->wait_until=thread->curtime+mysql_thread___connection_delay_multiplex_ms*1000; + + const bool is_active_transaction = myds->myconn->IsActiveTransaction(); + const bool multiplex_disabled_by_status = myds->myconn->MultiplexDisabled(false); + + const bool multiplex_delayed = myds->myconn->auto_increment_delay_token > 0; + const bool multiplex_delayed_with_timeout = + !multiplex_disabled_by_status && multiplex_delayed && mysql_thread___auto_increment_delay_multiplex_timeout_ms > 0; + + const bool multiplex_disabled = !multiplex_disabled_by_status && (!multiplex_delayed || multiplex_delayed_with_timeout); + const bool conn_is_reusable = myds->myconn->reusable == true && !is_active_transaction && multiplex_disabled; + + if (mysql_thread___multiplexing && conn_is_reusable) { + if ((mysql_thread___connection_delay_multiplex_ms || multiplex_delayed_with_timeout) && mirror==false) { + if (multiplex_delayed_with_timeout) { + uint64_t delay_multiplex_us = mysql_thread___connection_delay_multiplex_ms * 1000; + uint64_t auto_increment_delay_us = mysql_thread___auto_increment_delay_multiplex_timeout_ms * 1000; + uint64_t delay_us = delay_multiplex_us > auto_increment_delay_us ? delay_multiplex_us : auto_increment_delay_us; + + myds->wait_until=thread->curtime + delay_us; + } else { + myds->wait_until=thread->curtime+mysql_thread___connection_delay_multiplex_ms*1000; + } + myconn->async_state_machine=ASYNC_IDLE; myconn->multiplex_delayed=true; myds->DSS=STATE_MARIADB_GENERIC; diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index d38013c22..652a684ea 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -3740,16 +3740,24 @@ void MySQL_Thread::ProcessAllSessions_MaintenanceLoop(MySQL_Session *sess, unsig } } - // Perform the maintenance of expired 'auto_increment_delay_multiplex' for connections on the session + // Perform the maintenance for expired connections on the session if (mysql_thread___multiplexing) { const auto auto_incr_delay_multiplex_check = [curtime=this->curtime] (MySQL_Connection* myconn) -> bool { - const uint64_t multiplex_timeout_us = static_cast(mysql_thread___auto_increment_delay_multiplex_timeout_ms) * 1000; - const bool timeout_expired = multiplex_timeout_us != 0 && myconn->expire_auto_increment_delay(curtime, multiplex_timeout_us); + const uint64_t multiplex_timeout_ms = mysql_thread___auto_increment_delay_multiplex_timeout_ms; + const bool multiplex_delayed_enabled = multiplex_timeout_ms != 0 && myconn->auto_increment_delay_token > 0; + const bool timeout_expired = multiplex_delayed_enabled && myconn->myds->wait_until != 0 && myconn->myds->wait_until < curtime; + return timeout_expired; + }; + + const auto conn_delay_multiplex = [curtime=this->curtime] (MySQL_Connection* myconn) -> bool { + const bool multiplex_delayed = mysql_thread___connection_delay_multiplex_ms != 0 && myconn->multiplex_delayed == true; + const bool timeout_expired = multiplex_delayed && myconn->myds->wait_until != 0 && myconn->myds->wait_until < curtime; return timeout_expired; }; const vector> expire_conn_checks { - auto_incr_delay_multiplex_check + auto_incr_delay_multiplex_check, + conn_delay_multiplex }; sess->update_expired_conns(expire_conn_checks); diff --git a/lib/mysql_connection.cpp b/lib/mysql_connection.cpp index ff505ba72..81646e719 100644 --- a/lib/mysql_connection.cpp +++ b/lib/mysql_connection.cpp @@ -560,32 +560,6 @@ bool MySQL_Connection::get_status(uint32_t status_flag) { return this->status_flags & status_flag; } -uint64_t MySQL_Connection::idle_time(uint64_t curtime) { - // ASYNC_QUERY_END not required due to being transient state - const bool is_idle = this->async_state_machine == ASYNC_IDLE; - - if (is_idle) { - return curtime - this->last_event_time; - } else { - return 0; - } -} - -bool MySQL_Connection::expire_auto_increment_delay(uint64_t curtime, uint64_t timeout) { - if (timeout == 0) { - return false; - } - - uint64_t idle_time = this->idle_time(curtime); - - if (idle_time > timeout) { - this->auto_increment_delay_token = 0; - return true; - } else { - return false; - } -} - void MySQL_Connection::set_status_sql_log_bin0(bool v) { if (v) { status_flags |= STATUS_MYSQL_CONNECTION_SQL_LOG_BIN0; @@ -1039,7 +1013,6 @@ void MySQL_Connection::set_is_client() { #define NEXT_IMMEDIATE(new_st) do { async_state_machine = new_st; goto handler_again; } while (0) MDB_ASYNC_ST MySQL_Connection::handler(short event) { - this->last_event_time = myds->sess->thread->curtime; unsigned long long processed_bytes=0; // issue #527 : this variable will store the amount of bytes processed during this event if (mysql==NULL) { // it is the first time handler() is being called