diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 8378b4a53..a6ed700d4 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -310,6 +310,7 @@ class MySQL_Threads_Handler int query_retries_on_failure; int connect_retries_on_failure; int connect_retries_delay; + int connection_delay_multiplex_ms; int connection_max_age_ms; int connect_timeout_server; int connect_timeout_server_max; diff --git a/include/mysql_connection.h b/include/mysql_connection.h index 217af46d2..d740945a7 100644 --- a/include/mysql_connection.h +++ b/include/mysql_connection.h @@ -90,6 +90,7 @@ class MySQL_Connection { bool processing_prepared_statement_prepare; bool processing_prepared_statement_execute; bool processing_multi_statement; + bool multiplex_delayed; MySQL_Connection(); ~MySQL_Connection(); bool set_autocommit(bool); diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 32d4d25c7..2bc3e8300 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -609,6 +609,7 @@ __thread int mysql_thread___shun_recovery_time_sec; __thread int mysql_thread___query_retries_on_failure; __thread int mysql_thread___connect_retries_on_failure; __thread int mysql_thread___connect_retries_delay; +__thread int mysql_thread___connection_delay_multiplex_ms; __thread int mysql_thread___connection_max_age_ms; __thread int mysql_thread___connect_timeout_server; __thread int mysql_thread___connect_timeout_server_max; @@ -706,6 +707,7 @@ extern __thread int mysql_thread___shun_recovery_time_sec; extern __thread int mysql_thread___query_retries_on_failure; extern __thread int mysql_thread___connect_retries_on_failure; extern __thread int mysql_thread___connect_retries_delay; +extern __thread int mysql_thread___connection_delay_multiplex_ms; extern __thread int mysql_thread___connection_max_age_ms; extern __thread int mysql_thread___connect_timeout_server; extern __thread int mysql_thread___connect_timeout_server_max; diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 1351e0ae5..3ca6f4231 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -2212,6 +2212,29 @@ __get_pkts_from_client: handler_again: switch (status) { + case WAITING_CLIENT_DATA: + // housekeeping + if (mybes) { + MySQL_Backend *_mybe; + unsigned int i; + for (i=0; i < mybes->len; i++) { + _mybe=(MySQL_Backend *)mybes->index(i); + if (_mybe->server_myds) { + MySQL_Data_Stream *_myds=_mybe->server_myds; + if (_myds->myconn) { + if (_myds->myconn->multiplex_delayed) { + if (_myds->wait_until <= thread->curtime) { + _myds->wait_until=0; + _myds->myconn->multiplex_delayed=false; + _myds->DSS=STATE_NOT_INITIALIZED; + _myds->return_MySQL_Connection_To_Pool(); + } + } + } + } + } + } + break; case FAST_FORWARD: if (mybe->server_myds->mypolls==NULL) { // register the mysql_data_stream @@ -2440,12 +2463,22 @@ handler_again: RequestEnd(myds); if (mysql_thread___multiplexing && (myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) { - myds->DSS=STATE_NOT_INITIALIZED; - myds->return_MySQL_Connection_To_Pool(); + if (mysql_thread___connection_delay_multiplex_ms && mirror==false) { + myds->wait_until=thread->curtime+mysql_thread___connection_delay_multiplex_ms*1000; + myconn->async_state_machine=ASYNC_IDLE; + myconn->multiplex_delayed=true; + myds->DSS=STATE_MARIADB_GENERIC; + } else { + myconn->multiplex_delayed=false; + myds->wait_until=0; + myds->DSS=STATE_NOT_INITIALIZED; + myds->return_MySQL_Connection_To_Pool(); + } if (transaction_persistent==true) { transaction_persistent_hostgroup=-1; } } else { + myconn->multiplex_delayed=false; myconn->async_state_machine=ASYNC_IDLE; myds->DSS=STATE_MARIADB_GENERIC; if (transaction_persistent==true) { diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index be4fb46da..6ccbf8b82 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -197,6 +197,7 @@ static char * mysql_thread_variables_names[]= { (char *)"query_retries_on_failure", (char *)"connect_retries_on_failure", (char *)"connect_retries_delay", + (char *)"connection_delay_multiplex_ms", (char *)"connection_max_age_ms", (char *)"connect_timeout_server", (char *)"connect_timeout_server_max", @@ -304,6 +305,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.shun_recovery_time_sec=10; variables.query_retries_on_failure=1; variables.connect_retries_on_failure=10; + variables.connection_delay_multiplex_ms=0; variables.connection_max_age_ms=0; variables.connect_timeout_server=1000; variables.connect_timeout_server_max=10000; @@ -567,6 +569,7 @@ int MySQL_Threads_Handler::get_variable_int(char *name) { if (!strcasecmp(name,"shun_recovery_time_sec")) return (int)variables.shun_recovery_time_sec; if (!strcasecmp(name,"query_retries_on_failure")) return (int)variables.query_retries_on_failure; if (!strcasecmp(name,"connect_retries_on_failure")) return (int)variables.connect_retries_on_failure; + if (!strcasecmp(name,"connection_delay_multiplex_ms")) return (int)variables.connection_delay_multiplex_ms; if (!strcasecmp(name,"connection_max_age_ms")) return (int)variables.connection_max_age_ms; if (!strcasecmp(name,"connect_timeout_server")) return (int)variables.connect_timeout_server; if (!strcasecmp(name,"connect_timeout_server_max")) return (int)variables.connect_timeout_server_max; @@ -779,6 +782,10 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f sprintf(intbuf,"%d",variables.connect_retries_on_failure); return strdup(intbuf); } + if (!strcasecmp(name,"connection_delay_multiplex_ms")) { + sprintf(intbuf,"%d",variables.connection_delay_multiplex_ms); + return strdup(intbuf); + } if (!strcasecmp(name,"connection_max_age_ms")) { sprintf(intbuf,"%d",variables.connection_max_age_ms); return strdup(intbuf); @@ -1412,6 +1419,15 @@ bool MySQL_Threads_Handler::set_variable(char *name, char *value) { // this is t return false; } } + if (!strcasecmp(name,"connection_delay_multiplex_ms")) { + int intv=atoi(value); + if (intv >= 0 && intv <= 300*1000) { + variables.connection_delay_multiplex_ms=intv; + return true; + } else { + return false; + } + } if (!strcasecmp(name,"connection_max_age_ms")) { int intv=atoi(value); if (intv >= 0 && intv <= 3600*24*1000) { @@ -2933,6 +2949,7 @@ void MySQL_Thread::refresh_variables() { mysql_thread___shun_recovery_time_sec=GloMTH->get_variable_int((char *)"shun_recovery_time_sec"); mysql_thread___query_retries_on_failure=GloMTH->get_variable_int((char *)"query_retries_on_failure"); mysql_thread___connect_retries_on_failure=GloMTH->get_variable_int((char *)"connect_retries_on_failure"); + mysql_thread___connection_delay_multiplex_ms=GloMTH->get_variable_int((char *)"connection_delay_multiplex_ms"); mysql_thread___connection_max_age_ms=GloMTH->get_variable_int((char *)"connection_max_age_ms"); mysql_thread___connect_timeout_server=GloMTH->get_variable_int((char *)"connect_timeout_server"); mysql_thread___connect_timeout_server_max=GloMTH->get_variable_int((char *)"connect_timeout_server_max"); diff --git a/lib/mysql_connection.cpp b/lib/mysql_connection.cpp index f1746e960..a08d22f5a 100644 --- a/lib/mysql_connection.cpp +++ b/lib/mysql_connection.cpp @@ -176,6 +176,7 @@ MySQL_Connection::MySQL_Connection() { query.stmt_meta=NULL; query.stmt_result=NULL; largest_query_length=0; + multiplex_delayed=false; MyRS=NULL; creation_time=0; processing_multi_statement=false;