Added mysql variable reset_connection_algorithm

Variable reset_connection_algorithm could either be:
1 = algorithm used up too version 1.4
2 = algorithm new since ProxySQL 2.0 (now default)

When reset_connection_algorithm = 2 , MySQL_Thread itself tries to reset connections instead of relying on connections purger HGCU_thread_run()
pull/1434/head
René Cannaò 8 years ago
parent be7f687a43
commit df606f2c70

@ -92,6 +92,7 @@ class MySQL_Session
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___create_mirror_session();
int handler_again___status_PINGING_SERVER();
int handler_again___status_RESETTING_CONNECTION();
void handler_again___new_thread_to_kill_connection();
bool handler_again___verify_backend_charset();
@ -215,6 +216,7 @@ class MySQL_Session
void reset_all_backends();
void writeout();
void Memory_Stats();
void create_new_session_and_reset_connection(MySQL_Data_Stream *_myds);
};
#endif /* __CLASS_MYSQL_SESSION_ H */

@ -395,6 +395,7 @@ class MySQL_Threads_Handler
int default_query_timeout;
int query_processor_iterations;
int query_processor_regex;
int reset_connection_algorithm;
int long_query_time;
int hostgroup_manager_verbose;
int binlog_reader_connect_retry_msec;

@ -28,6 +28,7 @@ enum MDB_ASYNC_ST { // MariaDB Async State Machine
ASYNC_CHANGE_USER_END,
ASYNC_CHANGE_USER_SUCCESSFUL,
ASYNC_CHANGE_USER_FAILED,
ASYNC_CHANGE_USER_TIMEOUT,
ASYNC_PING_START,
ASYNC_PING_CONT,
ASYNC_PING_END,
@ -127,6 +128,7 @@ enum session_status {
CHANGING_AUTOCOMMIT,
CHANGING_USER_CLIENT,
CHANGING_USER_SERVER,
RESETTING_CONNECTION,
SETTING_INIT_CONNECT,
SETTING_SQL_LOG_BIN,
SETTING_SQL_MODE,
@ -620,6 +622,7 @@ __thread int mysql_thread___connect_timeout_server;
__thread int mysql_thread___connect_timeout_server_max;
__thread int mysql_thread___query_processor_iterations;
__thread int mysql_thread___query_processor_regex;
__thread int mysql_thread___reset_connection_algorithm;
__thread uint16_t mysql_thread___server_capabilities;
__thread uint8_t mysql_thread___default_charset;
__thread int mysql_thread___poll_timeout;
@ -726,6 +729,7 @@ extern __thread int mysql_thread___connect_timeout_server;
extern __thread int mysql_thread___connect_timeout_server_max;
extern __thread int mysql_thread___query_processor_iterations;
extern __thread int mysql_thread___query_processor_regex;
extern __thread int mysql_thread___reset_connection_algorithm;
extern __thread uint16_t mysql_thread___server_capabilities;
extern __thread uint8_t mysql_thread___default_charset;
extern __thread int mysql_thread___poll_timeout;
@ -791,6 +795,3 @@ extern __thread bool mysql_thread___session_debug;
#endif /* DEBUG */
extern __thread unsigned int g_seed;
#endif /* PROXYSQL_EXTERN */

@ -976,6 +976,57 @@ int MySQL_Session::handler_again___status_PINGING_SERVER() {
return 0;
}
int MySQL_Session::handler_again___status_RESETTING_CONNECTION() {
assert(mybe->server_myds->myconn);
MySQL_Data_Stream *myds=mybe->server_myds;
MySQL_Connection *myconn=myds->myconn;
if (myds->mypolls==NULL) {
thread->mypolls.add(POLLIN|POLLOUT, myds->fd, myds, thread->curtime);
}
myds->DSS=STATE_MARIADB_QUERY;
// we recreate local_stmts : see issue #752
delete myconn->local_stmts;
myconn->local_stmts=new MySQL_STMTs_local_v14(false); // false by default, it is a backend
int rc=myconn->async_change_user(myds->revents);
if (rc==0) {
__sync_fetch_and_add(&MyHGM->status.backend_change_user, 1);
//myds->myconn->userinfo->set(client_myds->myconn->userinfo);
myds->myconn->reset();
myconn->async_state_machine=ASYNC_IDLE;
// if (mysql_thread___multiplexing && (myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) {
myds->return_MySQL_Connection_To_Pool();
// } else {
// myds->destroy_MySQL_Connection_From_Pool(true);
// }
delete mybe->server_myds;
mybe->server_myds=NULL;
set_status(NONE);
return -1;
} else {
if (rc==-1 || rc==-2) {
if (rc==-2) {
proxy_error("Change user timeout during COM_CHANGE_USER on %s , %d\n", myconn->parent->address, myconn->parent->port);
} else { // rc==-1
int myerr=mysql_errno(myconn->mysql);
proxy_error("Detected an error during COM_CHANGE_USER on (%d,%s,%d) , FD (Conn:%d , MyDS:%d) : %d, %s\n", myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myds->fd, myds->myconn->fd, myerr, mysql_error(myconn->mysql));
}
myds->destroy_MySQL_Connection_From_Pool(false);
myds->fd=0;
//delete mybe->server_myds;
//mybe->server_myds=NULL;
RequestEnd(myds); //fix bug #682
return -1;
} else {
// rc==1 , nothing to do for now
if (myds->mypolls==NULL) {
thread->mypolls.add(POLLIN|POLLOUT, myds->fd, myds, thread->curtime);
}
}
}
return 0;
}
void MySQL_Session::handler_again___new_thread_to_kill_connection() {
MySQL_Data_Stream *myds=mybe->server_myds;
if (myds->myconn && myds->myconn->mysql) {
@ -2482,6 +2533,16 @@ handler_again:
}
break;
case RESETTING_CONNECTION:
{
int rc = handler_again___status_RESETTING_CONNECTION();
if (rc==-1) { // we always destroy the session
handler_ret = -1;
return handler_ret;
}
}
break;
case PROCESSING_STMT_PREPARE:
case PROCESSING_STMT_EXECUTE:
case PROCESSING_QUERY:
@ -2707,7 +2768,11 @@ handler_again:
myds->wait_until=0;
myds->DSS=STATE_NOT_INITIALIZED;
if (mysql_thread___autocommit_false_not_reusable && myds->myconn->IsAutoCommit()==false) {
myds->destroy_MySQL_Connection_From_Pool(true);
if (mysql_thread___reset_connection_algorithm == 2) {
create_new_session_and_reset_connection(myds);
} else {
myds->destroy_MySQL_Connection_From_Pool(true);
}
} else {
myds->return_MySQL_Connection_To_Pool();
}
@ -2864,7 +2929,11 @@ handler_again:
proxy_warning("Retrying query.\n");
}
}
myds->destroy_MySQL_Connection_From_Pool(true);
if (mysql_thread___reset_connection_algorithm == 2) {
create_new_session_and_reset_connection(myds);
} else {
myds->destroy_MySQL_Connection_From_Pool(true);
}
myds->fd=0;
if (retry_conn) {
myds->DSS=STATE_NOT_INITIALIZED;
@ -2932,7 +3001,11 @@ handler_again:
if (mysql_thread___multiplexing && (myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) {
myds->DSS=STATE_NOT_INITIALIZED;
if (mysql_thread___autocommit_false_not_reusable && myds->myconn->IsAutoCommit()==false) {
myds->destroy_MySQL_Connection_From_Pool(true);
if (mysql_thread___reset_connection_algorithm == 2) {
create_new_session_and_reset_connection(myds);
} else {
myds->destroy_MySQL_Connection_From_Pool(true);
}
} else {
myds->return_MySQL_Connection_To_Pool();
}
@ -4382,3 +4455,37 @@ void MySQL_Session::Memory_Stats() {
thread->status_variables.mysql_frontend_buffers_bytes+=frontend;
thread->status_variables.mysql_session_internal_bytes+=internal;
}
void MySQL_Session::create_new_session_and_reset_connection(MySQL_Data_Stream *_myds) {
MySQL_Data_Stream *new_myds = NULL;
MySQL_Connection * mc = _myds->myconn;
// we remove the connection from the original data stream
_myds->detach_connection();
_myds->unplug_backend();
// we create a brand new session, a new data stream, and attach the connection to it
MySQL_Session * new_sess = new MySQL_Session();
new_sess->mybe = new_sess->find_or_create_backend(mc->parent->myhgc->hid);
new_myds = new_sess->mybe->server_myds;
new_myds->attach_connection(mc);
new_myds->assign_fd_from_mysql_conn();
new_myds->myds_type = MYDS_BACKEND;
new_sess->to_process = 1;
new_myds->wait_until = thread->curtime + mysql_thread___connect_timeout_server*1000; // max_timeout
mc->last_time_used = thread->curtime;
new_myds->myprot.init(&new_myds, new_myds->myconn->userinfo, NULL);
new_sess->status = RESETTING_CONNECTION;
new_myds->DSS = STATE_MARIADB_QUERY;
thread->register_session_connection_handler(new_sess,true);
if (new_myds->mypolls==NULL) {
thread->mypolls.add(POLLIN|POLLOUT, new_myds->fd, new_myds, thread->curtime);
}
int rc = new_sess->handler();
if (rc==-1) {
unsigned int sess_idx = thread->mysql_sessions->len-1;
thread->unregister_session(sess_idx);
delete new_sess;
}
}

@ -277,6 +277,7 @@ static char * mysql_thread_variables_names[]= {
(char *)"default_query_timeout",
(char *)"query_processor_iterations",
(char *)"query_processor_regex",
(char *)"reset_connection_algorithm",
(char *)"long_query_time",
(char *)"query_cache_size_MB",
(char *)"ping_interval_server_msec",
@ -389,6 +390,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() {
variables.default_query_timeout=24*3600*1000;
variables.query_processor_iterations=0;
variables.query_processor_regex=1;
variables.reset_connection_algorithm=2;
variables.long_query_time=1000;
variables.query_cache_size_MB=256;
variables.init_connect=NULL;
@ -650,6 +652,7 @@ int MySQL_Threads_Handler::get_variable_int(char *name) {
if (!strcasecmp(name,"default_query_timeout")) return (int)variables.default_query_timeout;
if (!strcasecmp(name,"query_processor_iterations")) return (int)variables.query_processor_iterations;
if (!strcasecmp(name,"query_processor_regex")) return (int)variables.query_processor_regex;
if (!strcasecmp(name,"reset_connection_algorithm")) return (int)variables.reset_connection_algorithm;
if (!strcasecmp(name,"default_max_latency_ms")) return (int)variables.default_max_latency_ms;
if (!strcasecmp(name,"long_query_time")) return (int)variables.long_query_time;
if (!strcasecmp(name,"query_cache_size_MB")) return (int)variables.query_cache_size_MB;
@ -973,6 +976,10 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f
sprintf(intbuf,"%d",variables.query_processor_regex);
return strdup(intbuf);
}
if (!strcasecmp(name,"reset_connection_algorithm")) {
sprintf(intbuf,"%d",variables.reset_connection_algorithm);
return strdup(intbuf);
}
if (!strcasecmp(name,"default_max_latency_ms")) {
sprintf(intbuf,"%d",variables.default_max_latency_ms);
return strdup(intbuf);
@ -1511,6 +1518,15 @@ bool MySQL_Threads_Handler::set_variable(char *name, char *value) { // this is t
return false;
}
}
if (!strcasecmp(name,"reset_connection_algorithm")) {
int intv=atoi(value);
if (intv >= 1 && intv <= 2) {
variables.reset_connection_algorithm=intv;
return true;
} else {
return false;
}
}
if (!strcasecmp(name,"default_max_latency_ms")) {
int intv=atoi(value);
if (intv >= 0 && intv <= 20*24*3600*1000) {
@ -3301,6 +3317,7 @@ void MySQL_Thread::refresh_variables() {
mysql_thread___default_query_timeout=GloMTH->get_variable_int((char *)"default_query_timeout");
mysql_thread___query_processor_iterations=GloMTH->get_variable_int((char *)"query_processor_iterations");
mysql_thread___query_processor_regex=GloMTH->get_variable_int((char *)"query_processor_regex");
mysql_thread___reset_connection_algorithm=GloMTH->get_variable_int((char *)"reset_connection_algorithm");
mysql_thread___default_max_latency_ms=GloMTH->get_variable_int((char *)"default_max_latency_ms");
mysql_thread___long_query_time=GloMTH->get_variable_int((char *)"long_query_time");
mysql_thread___query_cache_size_MB=GloMTH->get_variable_int((char *)"query_cache_size_MB");

@ -419,9 +419,15 @@ void MySQL_Connection::connect_cont(short event) {
void MySQL_Connection::change_user_start() {
PROXY_TRACE();
//fprintf(stderr,"change_user_start FD %d\n", fd);
MySQL_Connection_userinfo *_ui=myds->sess->client_myds->myconn->userinfo;
MySQL_Connection_userinfo *_ui = NULL;
if (myds->sess->client_myds == NULL) {
// if client_myds is not defined, we are using CHANGE_USER to reset the connection
_ui = userinfo;
} else {
_ui = myds->sess->client_myds->myconn->userinfo;
userinfo->set(_ui); // fix for bug #605
}
char *auth_password=NULL;
userinfo->set(_ui); // fix for bug #605
if (userinfo->password) {
if (userinfo->password[0]=='*') { // we don't have the real password, let's pass sha1
auth_password=userinfo->sha1_pass;
@ -650,10 +656,14 @@ handler_again:
}
break;
case ASYNC_CHANGE_USER_CONT:
assert(myds->sess->status==CHANGING_USER_SERVER);
assert(myds->sess->status==CHANGING_USER_SERVER || myds->sess->status==RESETTING_CONNECTION);
change_user_cont(event);
if (async_exit_status) {
next_event(ASYNC_CHANGE_USER_CONT);
if (myds->sess->thread->curtime >= myds->wait_until) {
NEXT_IMMEDIATE(ASYNC_CHANGE_USER_TIMEOUT);
} else {
next_event(ASYNC_CHANGE_USER_CONT);
}
} else {
NEXT_IMMEDIATE(ASYNC_CHANGE_USER_END);
}
@ -670,6 +680,8 @@ handler_again:
break;
case ASYNC_CHANGE_USER_FAILED:
break;
case ASYNC_CHANGE_USER_TIMEOUT:
break;
case ASYNC_PING_START:
ping_start();
if (async_exit_status) {
@ -1253,6 +1265,7 @@ int MySQL_Connection::async_query(short event, char *stmt, unsigned long length,
// 0 when the ping is completed successfully
// -1 when the ping is completed not successfully
// 1 when the ping is not completed
// -2 on timeout
// the calling function should check mysql error in mysql struct
int MySQL_Connection::async_ping(short event) {
PROXY_TRACE();
@ -1307,6 +1320,9 @@ int MySQL_Connection::async_change_user(short event) {
case ASYNC_CHANGE_USER_FAILED:
return -1;
break;
case ASYNC_CHANGE_USER_TIMEOUT:
return -2;
break;
case ASYNC_IDLE:
async_state_machine=ASYNC_CHANGE_USER_START;
default:
@ -1323,6 +1339,9 @@ int MySQL_Connection::async_change_user(short event) {
case ASYNC_CHANGE_USER_FAILED:
return -1;
break;
case ASYNC_CHANGE_USER_TIMEOUT:
return -2;
break;
default:
return 1;
break;

@ -5,6 +5,8 @@
#define UNIX_PATH_MAX 108
#endif
extern MySQL_Threads_Handler *GloMTH;
#ifdef DEBUG
static void __dump_pkt(const char *func, unsigned char *_ptr, unsigned int len) {
@ -1244,8 +1246,16 @@ void MySQL_Data_Stream::return_MySQL_Connection_To_Pool() {
mc->last_time_used=sess->thread->curtime;
unsigned long long intv = mysql_thread___connection_max_age_ms;
intv *= 1000;
if ((intv) && (mc->last_time_used > mc->creation_time + intv)) {
destroy_MySQL_Connection_From_Pool(true);
if (
( (intv) && (mc->last_time_used > mc->creation_time + intv) )
||
( mc->local_stmts->get_num_backend_stmts() > (unsigned int)GloMTH->variables.max_stmts_per_connection )
) {
if (mysql_thread___reset_connection_algorithm == 2) {
sess->create_new_session_and_reset_connection(this);
} else {
destroy_MySQL_Connection_From_Pool(true);
}
} else {
detach_connection();
unplug_backend();

Loading…
Cancel
Save