Fix current 'auto_increment_delay_multiplex_timeout_ms' behavior #3923

pull/3946/head
Javier Jaramago Fernández 4 years ago
parent f960a4d9b7
commit a67db17709

@ -1,5 +1,9 @@
#ifndef __CLASS_MYSQL_SESSION_H
#define __CLASS_MYSQL_SESSION_H
#include <functional>
#include <vector>
#include "proxysql.h"
#include "cpp.h"
#include "MySQL_Variables.h"
@ -155,7 +159,11 @@ class MySQL_Session
void init();
void reset();
void add_ldap_comment_to_pkt(PtrSize_t *);
/**
* @brief Performs the required housekeeping operations over the session and its connections before
* performing any processing on received client packets.
*/
void housekeeping_before_pkts();
int get_pkts_from_client(bool&, PtrSize_t&);
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_RESET(PtrSize_t&);
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_CLOSE(PtrSize_t&);
@ -215,6 +223,12 @@ class MySQL_Session
PtrArray *mybes;
MySQL_Data_Stream *client_myds;
MySQL_Data_Stream *server_myds;
/*
* @brief Store the hostgroups that hold connections that have been flagged as 'expired' by the
* maintenance thread. This values will be used to release the retained connection in the specific
* hostgroup in housekeeping operations, before client packet processing. Currently 'housekeeping_before_pkts'.
*/
std::vector<int32_t> hgs_expired_conns {};
char * default_schema;
char * user_attributes;
@ -319,6 +333,7 @@ class MySQL_Session
void Memory_Stats();
void create_new_session_and_reset_connection(MySQL_Data_Stream *_myds);
bool handle_command_query_kill(PtrSize_t *);
void update_expired_conns(const std::vector<std::function<bool(MySQL_Connection*)>>&);
/**
* @brief Performs the final operations after current query has finished to be executed. It updates the session
* 'transaction_persistent_hostgroup', and updates the 'MySQL_Data_Stream' and 'MySQL_Connection' before

@ -95,6 +95,8 @@ class MySQL_Connection {
char scramble_buff[40];
unsigned long long creation_time;
unsigned long long last_time_used;
/* @brief Time at which the last 'event' was processed by 'handler' */
unsigned long long last_event_time;
unsigned long long timeout;
int auto_increment_delay_token;
int fd;
@ -217,11 +219,13 @@ class MySQL_Connection {
bool IsServerOffline();
bool IsAutoCommit();
bool AutocommitFalse_AndSavepoint();
bool MultiplexDisabled();
bool MultiplexDisabled(bool check_delay_token = true);
bool IsKeepMultiplexEnabledVariables(char *query_digest_text);
void ProcessQueryAndSetStatusFlags(char *query_digest_text);
void optimize();
void close_mysql();
uint64_t idle_time(uint64_t curtime);
bool expire_auto_increment_delay(uint64_t curtime, uint64_t timeout);
void set_is_client(); // used for local_stmts

@ -51,6 +51,8 @@
#define EXPMARIA
using std::function;
using std::vector;
static inline char is_digit(char c) {
if(c >= '0' && c <= '9')
@ -686,6 +688,28 @@ MySQL_Backend * MySQL_Session::find_backend(int hostgroup_id) {
return NULL; // NULL = backend not found
};
void MySQL_Session::update_expired_conns(const vector<function<bool(MySQL_Connection*)>>& checks) {
for (uint32_t i = 0; i < mybes->len; i++) {
MySQL_Backend* mybe = static_cast<MySQL_Backend*>(mybes->index(i));
MySQL_Data_Stream* myds = mybe != nullptr ? mybe->server_myds : nullptr;
MySQL_Connection* myconn = myds != nullptr ? myds->myconn : nullptr;
if (myconn != nullptr) {
const bool is_active_transaction = myconn->IsActiveTransaction();
const bool multiplex_disabled = myconn->MultiplexDisabled(false);
// Make sure the connection is reusable before performing any check
if (myconn->reusable==true && is_active_transaction==false && multiplex_disabled==false) {
for (const function<bool(MySQL_Connection*)>& check : checks) {
if (check(myconn)) {
this->hgs_expired_conns.push_back(mybe->hostgroup_id);
break;
}
}
}
}
}
}
MySQL_Backend * MySQL_Session::create_backend(int hostgroup_id, MySQL_Data_Stream *_myds) {
MySQL_Backend *_mybe=new MySQL_Backend();
@ -4374,6 +4398,33 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA() {
}
}
void MySQL_Session::housekeeping_before_pkts() {
if (mysql_thread___multiplexing) {
for (const int hg_id : hgs_expired_conns) {
MySQL_Backend* mybe = find_backend(hg_id);
if (mybe != nullptr) {
MySQL_Data_Stream* myds = mybe->server_myds;
if (mysql_thread___autocommit_false_not_reusable && myds->myconn->IsAutoCommit()==false) {
if (mysql_thread___reset_connection_algorithm == 2) {
create_new_session_and_reset_connection(myds);
} else {
myds->destroy_MySQL_Connection_From_Pool(true);
}
} else {
myds->return_MySQL_Connection_To_Pool();
}
}
}
// We are required to perform a cleanup after consuming the elements, thus preventing any subsequent
// 'handler' call to perform recomputing of the already processed elements.
if (hgs_expired_conns.empty() == false) {
hgs_expired_conns.clear();
}
}
}
// this function was inline
void MySQL_Session::handler_rc0_Process_GTID(MySQL_Connection *myconn) {
if (myconn->get_gtid(mybe->gtid_uuid,&mybe->gtid_trxid)) {
@ -4426,6 +4477,7 @@ int MySQL_Session::handler() {
}
}
housekeeping_before_pkts();
handler_ret = get_pkts_from_client(wrong_pass, pkt);
if (handler_ret != 0) {
return handler_ret;

@ -1,4 +1,8 @@
//#define __CLASS_STANDARD_MYSQL_THREAD_H
#include <functional>
#include <vector>
#include "MySQL_HostGroups_Manager.h"
#include "prometheus_helpers.h"
#define MYSQL_THREAD_IMPLEMENTATION
@ -17,6 +21,9 @@
#include "MySQL_PreparedStatement.h"
#include "MySQL_Logger.hpp"
using std::vector;
using std::function;
#ifdef DEBUG
MySQL_Session *sess_stopat;
#endif
@ -3733,12 +3740,19 @@ void MySQL_Thread::ProcessAllSessions_MaintenanceLoop(MySQL_Session *sess, unsig
}
}
if (sess->mybe && sess->mybe->server_myds && sess->mybe->server_myds->myconn) {
MySQL_Connection* myconn = sess->mybe->server_myds->myconn;
// Perform the maintenance of expired 'auto_increment_delay_multiplex' for connections on the session
if (mysql_thread___multiplexing) {
const auto auto_incr_delay_multiplex_check = [curtime=this->curtime] (MySQL_Connection* myconn) -> bool {
const uint64_t multiplex_timeout_us = static_cast<uint64_t>(mysql_thread___auto_increment_delay_multiplex_timeout_ms) * 1000;
const bool timeout_expired = multiplex_timeout_us != 0 && myconn->expire_auto_increment_delay(curtime, multiplex_timeout_us);
return timeout_expired;
};
if (mysql_thread___auto_increment_delay_multiplex_timeout_ms != 0 && (sess_time/1000 > (unsigned long long)mysql_thread___auto_increment_delay_multiplex_timeout_ms)) {
myconn->auto_increment_delay_token = 0;
}
const vector<function<bool(MySQL_Connection*)>> expire_conn_checks {
auto_incr_delay_multiplex_check
};
sess->update_expired_conns(expire_conn_checks);
}
}

@ -560,6 +560,32 @@ bool MySQL_Connection::get_status(uint32_t status_flag) {
return this->status_flags & status_flag;
}
uint64_t MySQL_Connection::idle_time(uint64_t curtime) {
// ASYNC_QUERY_END not required due to being transient state
const bool is_idle = this->async_state_machine == ASYNC_IDLE;
if (is_idle) {
return curtime - this->last_event_time;
} else {
return 0;
}
}
bool MySQL_Connection::expire_auto_increment_delay(uint64_t curtime, uint64_t timeout) {
if (timeout == 0) {
return false;
}
uint64_t idle_time = this->idle_time(curtime);
if (idle_time > timeout) {
this->auto_increment_delay_token = 0;
return true;
} else {
return false;
}
}
void MySQL_Connection::set_status_sql_log_bin0(bool v) {
if (v) {
status_flags |= STATUS_MYSQL_CONNECTION_SQL_LOG_BIN0;
@ -1013,6 +1039,7 @@ void MySQL_Connection::set_is_client() {
#define NEXT_IMMEDIATE(new_st) do { async_state_machine = new_st; goto handler_again; } while (0)
MDB_ASYNC_ST MySQL_Connection::handler(short event) {
this->last_event_time = myds->sess->thread->curtime;
unsigned long long processed_bytes=0; // issue #527 : this variable will store the amount of bytes processed during this event
if (mysql==NULL) {
// it is the first time handler() is being called
@ -2321,14 +2348,14 @@ bool MySQL_Connection::IsAutoCommit() {
return ret;
}
bool MySQL_Connection::MultiplexDisabled() {
bool MySQL_Connection::MultiplexDisabled(bool check_delay_token) {
// status_flags stores information about the status of the connection
// can be used to determine if multiplexing can be enabled or not
bool ret=false;
if (status_flags & (STATUS_MYSQL_CONNECTION_TRANSACTION|STATUS_MYSQL_CONNECTION_USER_VARIABLE|STATUS_MYSQL_CONNECTION_PREPARED_STATEMENT|STATUS_MYSQL_CONNECTION_LOCK_TABLES|STATUS_MYSQL_CONNECTION_TEMPORARY_TABLE|STATUS_MYSQL_CONNECTION_GET_LOCK|STATUS_MYSQL_CONNECTION_NO_MULTIPLEX|STATUS_MYSQL_CONNECTION_SQL_LOG_BIN0|STATUS_MYSQL_CONNECTION_FOUND_ROWS|STATUS_MYSQL_CONNECTION_HAS_SAVEPOINT) ) {
ret=true;
}
if (auto_increment_delay_token) return true;
if (check_delay_token && auto_increment_delay_token) return true;
return ret;
}

Loading…
Cancel
Save