Modify previous impl for 'auto_increment_delay_multiplex_timeout_ms' and fix 'connection_delay_multiplex_ms'

This commit contains an implementation rework and a fix:
 - Impl for 'auto_increment_delay_multiplex_timeout_ms' has been
   reworked in favor of reusing 'wait_until' to share logic with
   previous 'connection_delay_multiplex_ms' variable.
 - Fix previous 'connection_delay_multiplex_ms' behavior preventing
   connection retaining when traffic unrelated to target hostgroup is
   being received by the session.
pull/3946/head
Javier Jaramago Fernández 4 years ago
parent 583218e9c5
commit bcc6532d66

@ -95,8 +95,6 @@ class MySQL_Connection {
char scramble_buff[40];
unsigned long long creation_time;
unsigned long long last_time_used;
/* @brief Time at which the last 'event' was processed by 'handler' */
unsigned long long last_event_time;
unsigned long long timeout;
int auto_increment_delay_token;
int fd;
@ -224,8 +222,6 @@ class MySQL_Connection {
void ProcessQueryAndSetStatusFlags(char *query_digest_text);
void optimize();
void close_mysql();
uint64_t idle_time(uint64_t curtime);
bool expire_auto_increment_delay(uint64_t curtime, uint64_t timeout);
void set_is_client(); // used for local_stmts

@ -2726,6 +2726,7 @@ void MySQL_HostGroups_Manager::push_MyConn_to_pool(MySQL_Connection *c, bool _lo
MySrvC *mysrvc=NULL;
if (_lock)
wrlock();
c->auto_increment_delay_token = 0;
status.myconnpoll_push++;
mysrvc=(MySrvC *)c->parent;
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL_Connection %p, server %s:%d with status %d\n", c, mysrvc->address, mysrvc->port, mysrvc->status);

@ -4377,6 +4377,9 @@ int MySQL_Session::RunQuery(MySQL_Data_Stream *myds, MySQL_Connection *myconn) {
// this function was inline
void MySQL_Session::handler___status_WAITING_CLIENT_DATA() {
// NOTE: Maintenance of 'multiplex_delayed' has been moved to 'housekeeping_before_pkts'. The previous impl
// is left below as an example of how to perform a more passive maintenance over session connections.
/*
if (mybes) {
MySQL_Backend *_mybe;
unsigned int i;
@ -4397,6 +4400,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA() {
}
}
}
*/
}
void MySQL_Session::housekeeping_before_pkts() {
@ -7310,6 +7314,7 @@ void MySQL_Session::create_new_session_and_reset_connection(MySQL_Data_Stream *_
new_myds->myprot.init(&new_myds, new_myds->myconn->userinfo, NULL);
new_sess->status = RESETTING_CONNECTION;
mc->async_state_machine = ASYNC_IDLE; // may not be true, but is used to correctly perform error handling
mc->auto_increment_delay_token = 0;
new_myds->DSS = STATE_MARIADB_QUERY;
thread->register_session_connection_handler(new_sess,true);
if (new_myds->mypolls==NULL) {
@ -7430,9 +7435,29 @@ void MySQL_Session::finishQuery(MySQL_Data_Stream *myds, MySQL_Connection *mycon
myds->myconn->set_status(true, STATUS_MYSQL_CONNECTION_NO_MULTIPLEX);
}
}
if (mysql_thread___multiplexing && (myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) {
if (mysql_thread___connection_delay_multiplex_ms && mirror==false) {
myds->wait_until=thread->curtime+mysql_thread___connection_delay_multiplex_ms*1000;
const bool is_active_transaction = myds->myconn->IsActiveTransaction();
const bool multiplex_disabled_by_status = myds->myconn->MultiplexDisabled(false);
const bool multiplex_delayed = myds->myconn->auto_increment_delay_token > 0;
const bool multiplex_delayed_with_timeout =
!multiplex_disabled_by_status && multiplex_delayed && mysql_thread___auto_increment_delay_multiplex_timeout_ms > 0;
const bool multiplex_disabled = !multiplex_disabled_by_status && (!multiplex_delayed || multiplex_delayed_with_timeout);
const bool conn_is_reusable = myds->myconn->reusable == true && !is_active_transaction && multiplex_disabled;
if (mysql_thread___multiplexing && conn_is_reusable) {
if ((mysql_thread___connection_delay_multiplex_ms || multiplex_delayed_with_timeout) && mirror==false) {
if (multiplex_delayed_with_timeout) {
uint64_t delay_multiplex_us = mysql_thread___connection_delay_multiplex_ms * 1000;
uint64_t auto_increment_delay_us = mysql_thread___auto_increment_delay_multiplex_timeout_ms * 1000;
uint64_t delay_us = delay_multiplex_us > auto_increment_delay_us ? delay_multiplex_us : auto_increment_delay_us;
myds->wait_until=thread->curtime + delay_us;
} else {
myds->wait_until=thread->curtime+mysql_thread___connection_delay_multiplex_ms*1000;
}
myconn->async_state_machine=ASYNC_IDLE;
myconn->multiplex_delayed=true;
myds->DSS=STATE_MARIADB_GENERIC;

@ -3740,16 +3740,24 @@ void MySQL_Thread::ProcessAllSessions_MaintenanceLoop(MySQL_Session *sess, unsig
}
}
// Perform the maintenance of expired 'auto_increment_delay_multiplex' for connections on the session
// Perform the maintenance for expired connections on the session
if (mysql_thread___multiplexing) {
const auto auto_incr_delay_multiplex_check = [curtime=this->curtime] (MySQL_Connection* myconn) -> bool {
const uint64_t multiplex_timeout_us = static_cast<uint64_t>(mysql_thread___auto_increment_delay_multiplex_timeout_ms) * 1000;
const bool timeout_expired = multiplex_timeout_us != 0 && myconn->expire_auto_increment_delay(curtime, multiplex_timeout_us);
const uint64_t multiplex_timeout_ms = mysql_thread___auto_increment_delay_multiplex_timeout_ms;
const bool multiplex_delayed_enabled = multiplex_timeout_ms != 0 && myconn->auto_increment_delay_token > 0;
const bool timeout_expired = multiplex_delayed_enabled && myconn->myds->wait_until != 0 && myconn->myds->wait_until < curtime;
return timeout_expired;
};
const auto conn_delay_multiplex = [curtime=this->curtime] (MySQL_Connection* myconn) -> bool {
const bool multiplex_delayed = mysql_thread___connection_delay_multiplex_ms != 0 && myconn->multiplex_delayed == true;
const bool timeout_expired = multiplex_delayed && myconn->myds->wait_until != 0 && myconn->myds->wait_until < curtime;
return timeout_expired;
};
const vector<function<bool(MySQL_Connection*)>> expire_conn_checks {
auto_incr_delay_multiplex_check
auto_incr_delay_multiplex_check,
conn_delay_multiplex
};
sess->update_expired_conns(expire_conn_checks);

@ -560,32 +560,6 @@ bool MySQL_Connection::get_status(uint32_t status_flag) {
return this->status_flags & status_flag;
}
uint64_t MySQL_Connection::idle_time(uint64_t curtime) {
// ASYNC_QUERY_END not required due to being transient state
const bool is_idle = this->async_state_machine == ASYNC_IDLE;
if (is_idle) {
return curtime - this->last_event_time;
} else {
return 0;
}
}
bool MySQL_Connection::expire_auto_increment_delay(uint64_t curtime, uint64_t timeout) {
if (timeout == 0) {
return false;
}
uint64_t idle_time = this->idle_time(curtime);
if (idle_time > timeout) {
this->auto_increment_delay_token = 0;
return true;
} else {
return false;
}
}
void MySQL_Connection::set_status_sql_log_bin0(bool v) {
if (v) {
status_flags |= STATUS_MYSQL_CONNECTION_SQL_LOG_BIN0;
@ -1039,7 +1013,6 @@ void MySQL_Connection::set_is_client() {
#define NEXT_IMMEDIATE(new_st) do { async_state_machine = new_st; goto handler_again; } while (0)
MDB_ASYNC_ST MySQL_Connection::handler(short event) {
this->last_event_time = myds->sess->thread->curtime;
unsigned long long processed_bytes=0; // issue #527 : this variable will store the amount of bytes processed during this event
if (mysql==NULL) {
// it is the first time handler() is being called

Loading…
Cancel
Save