diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index 15c826fc9..5f65a38ce 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -1,5 +1,9 @@ #ifndef __CLASS_MYSQL_SESSION_H #define __CLASS_MYSQL_SESSION_H + +#include +#include + #include "proxysql.h" #include "cpp.h" #include "MySQL_Variables.h" @@ -155,7 +159,11 @@ class MySQL_Session void init(); void reset(); void add_ldap_comment_to_pkt(PtrSize_t *); - + /** + * @brief Performs the required housekeeping operations over the session and its connections before + * performing any processing on received client packets. + */ + void housekeeping_before_pkts(); int get_pkts_from_client(bool&, PtrSize_t&); void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_RESET(PtrSize_t&); void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_CLOSE(PtrSize_t&); @@ -215,6 +223,12 @@ class MySQL_Session PtrArray *mybes; MySQL_Data_Stream *client_myds; MySQL_Data_Stream *server_myds; + /* + * @brief Store the hostgroups that hold connections that have been flagged as 'expired' by the + * maintenance thread. This values will be used to release the retained connection in the specific + * hostgroup in housekeeping operations, before client packet processing. Currently 'housekeeping_before_pkts'. + */ + std::vector hgs_expired_conns {}; char * default_schema; char * user_attributes; @@ -319,6 +333,7 @@ class MySQL_Session void Memory_Stats(); void create_new_session_and_reset_connection(MySQL_Data_Stream *_myds); bool handle_command_query_kill(PtrSize_t *); + void update_expired_conns(const std::vector>&); /** * @brief Performs the final operations after current query has finished to be executed. It updates the session * 'transaction_persistent_hostgroup', and updates the 'MySQL_Data_Stream' and 'MySQL_Connection' before diff --git a/include/mysql_connection.h b/include/mysql_connection.h index 6283e149f..3b07fc741 100644 --- a/include/mysql_connection.h +++ b/include/mysql_connection.h @@ -95,6 +95,8 @@ class MySQL_Connection { char scramble_buff[40]; unsigned long long creation_time; unsigned long long last_time_used; + /* @brief Time at which the last 'event' was processed by 'handler' */ + unsigned long long last_event_time; unsigned long long timeout; int auto_increment_delay_token; int fd; @@ -217,11 +219,13 @@ class MySQL_Connection { bool IsServerOffline(); bool IsAutoCommit(); bool AutocommitFalse_AndSavepoint(); - bool MultiplexDisabled(); + bool MultiplexDisabled(bool check_delay_token = true); bool IsKeepMultiplexEnabledVariables(char *query_digest_text); void ProcessQueryAndSetStatusFlags(char *query_digest_text); void optimize(); void close_mysql(); + uint64_t idle_time(uint64_t curtime); + bool expire_auto_increment_delay(uint64_t curtime, uint64_t timeout); void set_is_client(); // used for local_stmts diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index e4c99625e..fd0b1de22 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -51,6 +51,8 @@ #define EXPMARIA +using std::function; +using std::vector; static inline char is_digit(char c) { if(c >= '0' && c <= '9') @@ -686,6 +688,28 @@ MySQL_Backend * MySQL_Session::find_backend(int hostgroup_id) { return NULL; // NULL = backend not found }; +void MySQL_Session::update_expired_conns(const vector>& checks) { + for (uint32_t i = 0; i < mybes->len; i++) { + MySQL_Backend* mybe = static_cast(mybes->index(i)); + MySQL_Data_Stream* myds = mybe != nullptr ? mybe->server_myds : nullptr; + MySQL_Connection* myconn = myds != nullptr ? myds->myconn : nullptr; + + if (myconn != nullptr) { + const bool is_active_transaction = myconn->IsActiveTransaction(); + const bool multiplex_disabled = myconn->MultiplexDisabled(false); + + // Make sure the connection is reusable before performing any check + if (myconn->reusable==true && is_active_transaction==false && multiplex_disabled==false) { + for (const function& check : checks) { + if (check(myconn)) { + this->hgs_expired_conns.push_back(mybe->hostgroup_id); + break; + } + } + } + } + } +} MySQL_Backend * MySQL_Session::create_backend(int hostgroup_id, MySQL_Data_Stream *_myds) { MySQL_Backend *_mybe=new MySQL_Backend(); @@ -4374,6 +4398,33 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA() { } } +void MySQL_Session::housekeeping_before_pkts() { + if (mysql_thread___multiplexing) { + for (const int hg_id : hgs_expired_conns) { + MySQL_Backend* mybe = find_backend(hg_id); + + if (mybe != nullptr) { + MySQL_Data_Stream* myds = mybe->server_myds; + + if (mysql_thread___autocommit_false_not_reusable && myds->myconn->IsAutoCommit()==false) { + if (mysql_thread___reset_connection_algorithm == 2) { + create_new_session_and_reset_connection(myds); + } else { + myds->destroy_MySQL_Connection_From_Pool(true); + } + } else { + myds->return_MySQL_Connection_To_Pool(); + } + } + } + // We are required to perform a cleanup after consuming the elements, thus preventing any subsequent + // 'handler' call to perform recomputing of the already processed elements. + if (hgs_expired_conns.empty() == false) { + hgs_expired_conns.clear(); + } + } +} + // this function was inline void MySQL_Session::handler_rc0_Process_GTID(MySQL_Connection *myconn) { if (myconn->get_gtid(mybe->gtid_uuid,&mybe->gtid_trxid)) { @@ -4426,6 +4477,7 @@ int MySQL_Session::handler() { } } + housekeeping_before_pkts(); handler_ret = get_pkts_from_client(wrong_pass, pkt); if (handler_ret != 0) { return handler_ret; diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index c95d75641..d38013c22 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -1,4 +1,8 @@ //#define __CLASS_STANDARD_MYSQL_THREAD_H + +#include +#include + #include "MySQL_HostGroups_Manager.h" #include "prometheus_helpers.h" #define MYSQL_THREAD_IMPLEMENTATION @@ -17,6 +21,9 @@ #include "MySQL_PreparedStatement.h" #include "MySQL_Logger.hpp" +using std::vector; +using std::function; + #ifdef DEBUG MySQL_Session *sess_stopat; #endif @@ -3733,12 +3740,19 @@ void MySQL_Thread::ProcessAllSessions_MaintenanceLoop(MySQL_Session *sess, unsig } } - if (sess->mybe && sess->mybe->server_myds && sess->mybe->server_myds->myconn) { - MySQL_Connection* myconn = sess->mybe->server_myds->myconn; + // Perform the maintenance of expired 'auto_increment_delay_multiplex' for connections on the session + if (mysql_thread___multiplexing) { + const auto auto_incr_delay_multiplex_check = [curtime=this->curtime] (MySQL_Connection* myconn) -> bool { + const uint64_t multiplex_timeout_us = static_cast(mysql_thread___auto_increment_delay_multiplex_timeout_ms) * 1000; + const bool timeout_expired = multiplex_timeout_us != 0 && myconn->expire_auto_increment_delay(curtime, multiplex_timeout_us); + return timeout_expired; + }; - if (mysql_thread___auto_increment_delay_multiplex_timeout_ms != 0 && (sess_time/1000 > (unsigned long long)mysql_thread___auto_increment_delay_multiplex_timeout_ms)) { - myconn->auto_increment_delay_token = 0; - } + const vector> expire_conn_checks { + auto_incr_delay_multiplex_check + }; + + sess->update_expired_conns(expire_conn_checks); } } diff --git a/lib/mysql_connection.cpp b/lib/mysql_connection.cpp index c5b25b2e4..ff505ba72 100644 --- a/lib/mysql_connection.cpp +++ b/lib/mysql_connection.cpp @@ -560,6 +560,32 @@ bool MySQL_Connection::get_status(uint32_t status_flag) { return this->status_flags & status_flag; } +uint64_t MySQL_Connection::idle_time(uint64_t curtime) { + // ASYNC_QUERY_END not required due to being transient state + const bool is_idle = this->async_state_machine == ASYNC_IDLE; + + if (is_idle) { + return curtime - this->last_event_time; + } else { + return 0; + } +} + +bool MySQL_Connection::expire_auto_increment_delay(uint64_t curtime, uint64_t timeout) { + if (timeout == 0) { + return false; + } + + uint64_t idle_time = this->idle_time(curtime); + + if (idle_time > timeout) { + this->auto_increment_delay_token = 0; + return true; + } else { + return false; + } +} + void MySQL_Connection::set_status_sql_log_bin0(bool v) { if (v) { status_flags |= STATUS_MYSQL_CONNECTION_SQL_LOG_BIN0; @@ -1013,6 +1039,7 @@ void MySQL_Connection::set_is_client() { #define NEXT_IMMEDIATE(new_st) do { async_state_machine = new_st; goto handler_again; } while (0) MDB_ASYNC_ST MySQL_Connection::handler(short event) { + this->last_event_time = myds->sess->thread->curtime; unsigned long long processed_bytes=0; // issue #527 : this variable will store the amount of bytes processed during this event if (mysql==NULL) { // it is the first time handler() is being called @@ -2321,14 +2348,14 @@ bool MySQL_Connection::IsAutoCommit() { return ret; } -bool MySQL_Connection::MultiplexDisabled() { +bool MySQL_Connection::MultiplexDisabled(bool check_delay_token) { // status_flags stores information about the status of the connection // can be used to determine if multiplexing can be enabled or not bool ret=false; if (status_flags & (STATUS_MYSQL_CONNECTION_TRANSACTION|STATUS_MYSQL_CONNECTION_USER_VARIABLE|STATUS_MYSQL_CONNECTION_PREPARED_STATEMENT|STATUS_MYSQL_CONNECTION_LOCK_TABLES|STATUS_MYSQL_CONNECTION_TEMPORARY_TABLE|STATUS_MYSQL_CONNECTION_GET_LOCK|STATUS_MYSQL_CONNECTION_NO_MULTIPLEX|STATUS_MYSQL_CONNECTION_SQL_LOG_BIN0|STATUS_MYSQL_CONNECTION_FOUND_ROWS|STATUS_MYSQL_CONNECTION_HAS_SAVEPOINT) ) { ret=true; } - if (auto_increment_delay_token) return true; + if (check_delay_token && auto_increment_delay_token) return true; return ret; }