From df606f2c70dc705d182addda43062c39a7099caa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Sun, 25 Mar 2018 12:46:45 +0200 Subject: [PATCH] Added mysql variable reset_connection_algorithm Variable reset_connection_algorithm could either be: 1 = algorithm used up too version 1.4 2 = algorithm new since ProxySQL 2.0 (now default) When reset_connection_algorithm = 2 , MySQL_Thread itself tries to reset connections instead of relying on connections purger HGCU_thread_run() --- include/MySQL_Session.h | 2 + include/MySQL_Thread.h | 1 + include/proxysql_structs.h | 7 ++- lib/MySQL_Session.cpp | 113 ++++++++++++++++++++++++++++++++++++- lib/MySQL_Thread.cpp | 17 ++++++ lib/mysql_connection.cpp | 27 +++++++-- lib/mysql_data_stream.cpp | 14 ++++- 7 files changed, 169 insertions(+), 12 deletions(-) diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index edcd4f5a4..5f73d17fb 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -92,6 +92,7 @@ class MySQL_Session void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___create_mirror_session(); int handler_again___status_PINGING_SERVER(); + int handler_again___status_RESETTING_CONNECTION(); void handler_again___new_thread_to_kill_connection(); bool handler_again___verify_backend_charset(); @@ -215,6 +216,7 @@ class MySQL_Session void reset_all_backends(); void writeout(); void Memory_Stats(); + void create_new_session_and_reset_connection(MySQL_Data_Stream *_myds); }; #endif /* __CLASS_MYSQL_SESSION_ H */ diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 0b3f068bc..64af1a94d 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -395,6 +395,7 @@ class MySQL_Threads_Handler int default_query_timeout; int query_processor_iterations; int query_processor_regex; + int reset_connection_algorithm; int long_query_time; int hostgroup_manager_verbose; int binlog_reader_connect_retry_msec; diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 5000080d0..978722d21 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -28,6 +28,7 @@ enum MDB_ASYNC_ST { // MariaDB Async State Machine ASYNC_CHANGE_USER_END, ASYNC_CHANGE_USER_SUCCESSFUL, ASYNC_CHANGE_USER_FAILED, + ASYNC_CHANGE_USER_TIMEOUT, ASYNC_PING_START, ASYNC_PING_CONT, ASYNC_PING_END, @@ -127,6 +128,7 @@ enum session_status { CHANGING_AUTOCOMMIT, CHANGING_USER_CLIENT, CHANGING_USER_SERVER, + RESETTING_CONNECTION, SETTING_INIT_CONNECT, SETTING_SQL_LOG_BIN, SETTING_SQL_MODE, @@ -620,6 +622,7 @@ __thread int mysql_thread___connect_timeout_server; __thread int mysql_thread___connect_timeout_server_max; __thread int mysql_thread___query_processor_iterations; __thread int mysql_thread___query_processor_regex; +__thread int mysql_thread___reset_connection_algorithm; __thread uint16_t mysql_thread___server_capabilities; __thread uint8_t mysql_thread___default_charset; __thread int mysql_thread___poll_timeout; @@ -726,6 +729,7 @@ extern __thread int mysql_thread___connect_timeout_server; extern __thread int mysql_thread___connect_timeout_server_max; extern __thread int mysql_thread___query_processor_iterations; extern __thread int mysql_thread___query_processor_regex; +extern __thread int mysql_thread___reset_connection_algorithm; extern __thread uint16_t mysql_thread___server_capabilities; extern __thread uint8_t mysql_thread___default_charset; extern __thread int mysql_thread___poll_timeout; @@ -791,6 +795,3 @@ extern __thread bool mysql_thread___session_debug; #endif /* DEBUG */ extern __thread unsigned int g_seed; #endif /* PROXYSQL_EXTERN */ - - - diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 8be87a774..5ca7fdb75 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -976,6 +976,57 @@ int MySQL_Session::handler_again___status_PINGING_SERVER() { return 0; } +int MySQL_Session::handler_again___status_RESETTING_CONNECTION() { + assert(mybe->server_myds->myconn); + MySQL_Data_Stream *myds=mybe->server_myds; + MySQL_Connection *myconn=myds->myconn; + if (myds->mypolls==NULL) { + thread->mypolls.add(POLLIN|POLLOUT, myds->fd, myds, thread->curtime); + } + myds->DSS=STATE_MARIADB_QUERY; + // we recreate local_stmts : see issue #752 + delete myconn->local_stmts; + myconn->local_stmts=new MySQL_STMTs_local_v14(false); // false by default, it is a backend + int rc=myconn->async_change_user(myds->revents); + if (rc==0) { + __sync_fetch_and_add(&MyHGM->status.backend_change_user, 1); + //myds->myconn->userinfo->set(client_myds->myconn->userinfo); + myds->myconn->reset(); + myconn->async_state_machine=ASYNC_IDLE; +// if (mysql_thread___multiplexing && (myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) { + myds->return_MySQL_Connection_To_Pool(); +// } else { +// myds->destroy_MySQL_Connection_From_Pool(true); +// } + delete mybe->server_myds; + mybe->server_myds=NULL; + set_status(NONE); + return -1; + } else { + if (rc==-1 || rc==-2) { + if (rc==-2) { + proxy_error("Change user timeout during COM_CHANGE_USER on %s , %d\n", myconn->parent->address, myconn->parent->port); + } else { // rc==-1 + int myerr=mysql_errno(myconn->mysql); + proxy_error("Detected an error during COM_CHANGE_USER on (%d,%s,%d) , FD (Conn:%d , MyDS:%d) : %d, %s\n", myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myds->fd, myds->myconn->fd, myerr, mysql_error(myconn->mysql)); + } + myds->destroy_MySQL_Connection_From_Pool(false); + myds->fd=0; + //delete mybe->server_myds; + //mybe->server_myds=NULL; + RequestEnd(myds); //fix bug #682 + return -1; + } else { + // rc==1 , nothing to do for now + if (myds->mypolls==NULL) { + thread->mypolls.add(POLLIN|POLLOUT, myds->fd, myds, thread->curtime); + } + } + } + return 0; +} + + void MySQL_Session::handler_again___new_thread_to_kill_connection() { MySQL_Data_Stream *myds=mybe->server_myds; if (myds->myconn && myds->myconn->mysql) { @@ -2482,6 +2533,16 @@ handler_again: } break; + case RESETTING_CONNECTION: + { + int rc = handler_again___status_RESETTING_CONNECTION(); + if (rc==-1) { // we always destroy the session + handler_ret = -1; + return handler_ret; + } + } + break; + case PROCESSING_STMT_PREPARE: case PROCESSING_STMT_EXECUTE: case PROCESSING_QUERY: @@ -2707,7 +2768,11 @@ handler_again: myds->wait_until=0; myds->DSS=STATE_NOT_INITIALIZED; if (mysql_thread___autocommit_false_not_reusable && myds->myconn->IsAutoCommit()==false) { - myds->destroy_MySQL_Connection_From_Pool(true); + if (mysql_thread___reset_connection_algorithm == 2) { + create_new_session_and_reset_connection(myds); + } else { + myds->destroy_MySQL_Connection_From_Pool(true); + } } else { myds->return_MySQL_Connection_To_Pool(); } @@ -2864,7 +2929,11 @@ handler_again: proxy_warning("Retrying query.\n"); } } - myds->destroy_MySQL_Connection_From_Pool(true); + if (mysql_thread___reset_connection_algorithm == 2) { + create_new_session_and_reset_connection(myds); + } else { + myds->destroy_MySQL_Connection_From_Pool(true); + } myds->fd=0; if (retry_conn) { myds->DSS=STATE_NOT_INITIALIZED; @@ -2932,7 +3001,11 @@ handler_again: if (mysql_thread___multiplexing && (myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) { myds->DSS=STATE_NOT_INITIALIZED; if (mysql_thread___autocommit_false_not_reusable && myds->myconn->IsAutoCommit()==false) { - myds->destroy_MySQL_Connection_From_Pool(true); + if (mysql_thread___reset_connection_algorithm == 2) { + create_new_session_and_reset_connection(myds); + } else { + myds->destroy_MySQL_Connection_From_Pool(true); + } } else { myds->return_MySQL_Connection_To_Pool(); } @@ -4382,3 +4455,37 @@ void MySQL_Session::Memory_Stats() { thread->status_variables.mysql_frontend_buffers_bytes+=frontend; thread->status_variables.mysql_session_internal_bytes+=internal; } + + +void MySQL_Session::create_new_session_and_reset_connection(MySQL_Data_Stream *_myds) { + MySQL_Data_Stream *new_myds = NULL; + MySQL_Connection * mc = _myds->myconn; + // we remove the connection from the original data stream + _myds->detach_connection(); + _myds->unplug_backend(); + + // we create a brand new session, a new data stream, and attach the connection to it + MySQL_Session * new_sess = new MySQL_Session(); + new_sess->mybe = new_sess->find_or_create_backend(mc->parent->myhgc->hid); + + new_myds = new_sess->mybe->server_myds; + new_myds->attach_connection(mc); + new_myds->assign_fd_from_mysql_conn(); + new_myds->myds_type = MYDS_BACKEND; + new_sess->to_process = 1; + new_myds->wait_until = thread->curtime + mysql_thread___connect_timeout_server*1000; // max_timeout + mc->last_time_used = thread->curtime; + new_myds->myprot.init(&new_myds, new_myds->myconn->userinfo, NULL); + new_sess->status = RESETTING_CONNECTION; + new_myds->DSS = STATE_MARIADB_QUERY; + thread->register_session_connection_handler(new_sess,true); + if (new_myds->mypolls==NULL) { + thread->mypolls.add(POLLIN|POLLOUT, new_myds->fd, new_myds, thread->curtime); + } + int rc = new_sess->handler(); + if (rc==-1) { + unsigned int sess_idx = thread->mysql_sessions->len-1; + thread->unregister_session(sess_idx); + delete new_sess; + } +} diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 672fb5662..6d91a85e9 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -277,6 +277,7 @@ static char * mysql_thread_variables_names[]= { (char *)"default_query_timeout", (char *)"query_processor_iterations", (char *)"query_processor_regex", + (char *)"reset_connection_algorithm", (char *)"long_query_time", (char *)"query_cache_size_MB", (char *)"ping_interval_server_msec", @@ -389,6 +390,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.default_query_timeout=24*3600*1000; variables.query_processor_iterations=0; variables.query_processor_regex=1; + variables.reset_connection_algorithm=2; variables.long_query_time=1000; variables.query_cache_size_MB=256; variables.init_connect=NULL; @@ -650,6 +652,7 @@ int MySQL_Threads_Handler::get_variable_int(char *name) { if (!strcasecmp(name,"default_query_timeout")) return (int)variables.default_query_timeout; if (!strcasecmp(name,"query_processor_iterations")) return (int)variables.query_processor_iterations; if (!strcasecmp(name,"query_processor_regex")) return (int)variables.query_processor_regex; + if (!strcasecmp(name,"reset_connection_algorithm")) return (int)variables.reset_connection_algorithm; if (!strcasecmp(name,"default_max_latency_ms")) return (int)variables.default_max_latency_ms; if (!strcasecmp(name,"long_query_time")) return (int)variables.long_query_time; if (!strcasecmp(name,"query_cache_size_MB")) return (int)variables.query_cache_size_MB; @@ -973,6 +976,10 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f sprintf(intbuf,"%d",variables.query_processor_regex); return strdup(intbuf); } + if (!strcasecmp(name,"reset_connection_algorithm")) { + sprintf(intbuf,"%d",variables.reset_connection_algorithm); + return strdup(intbuf); + } if (!strcasecmp(name,"default_max_latency_ms")) { sprintf(intbuf,"%d",variables.default_max_latency_ms); return strdup(intbuf); @@ -1511,6 +1518,15 @@ bool MySQL_Threads_Handler::set_variable(char *name, char *value) { // this is t return false; } } + if (!strcasecmp(name,"reset_connection_algorithm")) { + int intv=atoi(value); + if (intv >= 1 && intv <= 2) { + variables.reset_connection_algorithm=intv; + return true; + } else { + return false; + } + } if (!strcasecmp(name,"default_max_latency_ms")) { int intv=atoi(value); if (intv >= 0 && intv <= 20*24*3600*1000) { @@ -3301,6 +3317,7 @@ void MySQL_Thread::refresh_variables() { mysql_thread___default_query_timeout=GloMTH->get_variable_int((char *)"default_query_timeout"); mysql_thread___query_processor_iterations=GloMTH->get_variable_int((char *)"query_processor_iterations"); mysql_thread___query_processor_regex=GloMTH->get_variable_int((char *)"query_processor_regex"); + mysql_thread___reset_connection_algorithm=GloMTH->get_variable_int((char *)"reset_connection_algorithm"); mysql_thread___default_max_latency_ms=GloMTH->get_variable_int((char *)"default_max_latency_ms"); mysql_thread___long_query_time=GloMTH->get_variable_int((char *)"long_query_time"); mysql_thread___query_cache_size_MB=GloMTH->get_variable_int((char *)"query_cache_size_MB"); diff --git a/lib/mysql_connection.cpp b/lib/mysql_connection.cpp index 3c9dd500a..e566f10aa 100644 --- a/lib/mysql_connection.cpp +++ b/lib/mysql_connection.cpp @@ -419,9 +419,15 @@ void MySQL_Connection::connect_cont(short event) { void MySQL_Connection::change_user_start() { PROXY_TRACE(); //fprintf(stderr,"change_user_start FD %d\n", fd); - MySQL_Connection_userinfo *_ui=myds->sess->client_myds->myconn->userinfo; + MySQL_Connection_userinfo *_ui = NULL; + if (myds->sess->client_myds == NULL) { + // if client_myds is not defined, we are using CHANGE_USER to reset the connection + _ui = userinfo; + } else { + _ui = myds->sess->client_myds->myconn->userinfo; + userinfo->set(_ui); // fix for bug #605 + } char *auth_password=NULL; - userinfo->set(_ui); // fix for bug #605 if (userinfo->password) { if (userinfo->password[0]=='*') { // we don't have the real password, let's pass sha1 auth_password=userinfo->sha1_pass; @@ -650,10 +656,14 @@ handler_again: } break; case ASYNC_CHANGE_USER_CONT: - assert(myds->sess->status==CHANGING_USER_SERVER); + assert(myds->sess->status==CHANGING_USER_SERVER || myds->sess->status==RESETTING_CONNECTION); change_user_cont(event); if (async_exit_status) { - next_event(ASYNC_CHANGE_USER_CONT); + if (myds->sess->thread->curtime >= myds->wait_until) { + NEXT_IMMEDIATE(ASYNC_CHANGE_USER_TIMEOUT); + } else { + next_event(ASYNC_CHANGE_USER_CONT); + } } else { NEXT_IMMEDIATE(ASYNC_CHANGE_USER_END); } @@ -670,6 +680,8 @@ handler_again: break; case ASYNC_CHANGE_USER_FAILED: break; + case ASYNC_CHANGE_USER_TIMEOUT: + break; case ASYNC_PING_START: ping_start(); if (async_exit_status) { @@ -1253,6 +1265,7 @@ int MySQL_Connection::async_query(short event, char *stmt, unsigned long length, // 0 when the ping is completed successfully // -1 when the ping is completed not successfully // 1 when the ping is not completed +// -2 on timeout // the calling function should check mysql error in mysql struct int MySQL_Connection::async_ping(short event) { PROXY_TRACE(); @@ -1307,6 +1320,9 @@ int MySQL_Connection::async_change_user(short event) { case ASYNC_CHANGE_USER_FAILED: return -1; break; + case ASYNC_CHANGE_USER_TIMEOUT: + return -2; + break; case ASYNC_IDLE: async_state_machine=ASYNC_CHANGE_USER_START; default: @@ -1323,6 +1339,9 @@ int MySQL_Connection::async_change_user(short event) { case ASYNC_CHANGE_USER_FAILED: return -1; break; + case ASYNC_CHANGE_USER_TIMEOUT: + return -2; + break; default: return 1; break; diff --git a/lib/mysql_data_stream.cpp b/lib/mysql_data_stream.cpp index bbbcdaf35..3223832c9 100644 --- a/lib/mysql_data_stream.cpp +++ b/lib/mysql_data_stream.cpp @@ -5,6 +5,8 @@ #define UNIX_PATH_MAX 108 #endif +extern MySQL_Threads_Handler *GloMTH; + #ifdef DEBUG static void __dump_pkt(const char *func, unsigned char *_ptr, unsigned int len) { @@ -1244,8 +1246,16 @@ void MySQL_Data_Stream::return_MySQL_Connection_To_Pool() { mc->last_time_used=sess->thread->curtime; unsigned long long intv = mysql_thread___connection_max_age_ms; intv *= 1000; - if ((intv) && (mc->last_time_used > mc->creation_time + intv)) { - destroy_MySQL_Connection_From_Pool(true); + if ( + ( (intv) && (mc->last_time_used > mc->creation_time + intv) ) + || + ( mc->local_stmts->get_num_backend_stmts() > (unsigned int)GloMTH->variables.max_stmts_per_connection ) + ) { + if (mysql_thread___reset_connection_algorithm == 2) { + sess->create_new_session_and_reset_connection(this); + } else { + destroy_MySQL_Connection_From_Pool(true); + } } else { detach_connection(); unplug_backend();