Implementation of Automatic shun failing backends (issue #339)

pull/340/head
René Cannaò 11 years ago
parent cdffa376ca
commit 6d496d46ee

@ -47,11 +47,16 @@ class MySrvC { // MySQL Server Container
unsigned int max_connections;
unsigned int connect_OK;
unsigned int connect_ERR;
time_t time_last_detected_error;
unsigned int connect_ERR_at_time_last_detected_error;
unsigned long long queries_sent;
bool shunned_automatic;
//uint8_t charset;
MySrvConnList *ConnectionsUsed;
MySrvConnList *ConnectionsFree;
MySrvC(char *, uint16_t, unsigned int, enum MySerStatus, unsigned int, unsigned int _max_connections);
~MySrvC();
void connect_error();
};
class MySrvList { // MySQL Server List

@ -254,6 +254,8 @@ class MySQL_Threads_Handler
bool monitor_timer_cached;
int ping_interval_server;
int ping_timeout_server;
int shun_on_failures;
int shun_recovery_time;
int connect_retries_on_failure;
int connect_retries_delay;
int connect_timeout_server;

@ -676,6 +676,8 @@ __thread int mysql_thread___long_query_time;
__thread int mysql_thread___free_connections_pct;
__thread int mysql_thread___ping_interval_server;
__thread int mysql_thread___ping_timeout_server;
__thread int mysql_thread___shun_on_failures;
__thread int mysql_thread___shun_recovery_time;
__thread int mysql_thread___connect_retries_on_failure;
__thread int mysql_thread___connect_retries_delay;
__thread int mysql_thread___connect_timeout_server;
@ -726,6 +728,8 @@ extern __thread int mysql_thread___long_query_time;
extern __thread int mysql_thread___free_connections_pct;
extern __thread int mysql_thread___ping_interval_server;
extern __thread int mysql_thread___ping_timeout_server;
extern __thread int mysql_thread___shun_on_failures;
extern __thread int mysql_thread___shun_recovery_time;
extern __thread int mysql_thread___connect_retries_on_failure;
extern __thread int mysql_thread___connect_retries_delay;
extern __thread int mysql_thread___connect_timeout_server;

@ -111,12 +111,34 @@ MySrvC::MySrvC(char *add, uint16_t p, unsigned int _weight, enum MySerStatus _st
max_connections=_max_connections;
connect_OK=0;
connect_ERR=0;
queries_sent=0;
time_last_detected_error=0;
connect_ERR_at_time_last_detected_error=0;
shunned_automatic=false;
//charset=_charset;
myhgc=NULL;
ConnectionsUsed=new MySrvConnList(this);
ConnectionsFree=new MySrvConnList(this);
}
void MySrvC::connect_error() {
// NOTE: this function operates without any mutex
// although, it is not extremely important if any counter is lost
// as a single connection failure won't make a significant difference
__sync_fetch_and_add(&connect_ERR,1);
time_t t=time(NULL);
if (t!=time_last_detected_error) {
time_last_detected_error=t;
connect_ERR_at_time_last_detected_error=1;
} else {
int max_failures = ( mysql_thread___shun_on_failures > mysql_thread___connect_retries_on_failure ? mysql_thread___connect_retries_on_failure : mysql_thread___shun_on_failures) ;
if (__sync_add_and_fetch(&connect_ERR_at_time_last_detected_error,1) >= (unsigned int)max_failures) {
status=MYSQL_SERVER_STATUS_SHUNNED;
shunned_automatic=true;
}
}
}
MySrvC::~MySrvC() {
if (address) free(address);
delete ConnectionsUsed;
@ -298,6 +320,9 @@ bool MySQL_HostGroups_Manager::commit() {
if (atoi(r->fields[4])!=atoi(r->fields[9])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing status for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[4] , mysrvc->status , atoi(r->fields[9]));
mysrvc->status=(MySerStatus)atoi(r->fields[9]);
if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED) {
mysrvc->shunned_automatic=false;
}
}
if (atoi(r->fields[5])!=atoi(r->fields[10])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing compression for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[4] , mysrvc->compression , atoi(r->fields[10]));
@ -453,6 +478,23 @@ MySrvC *MyHGC::get_random_MySrvC() {
if (mysrvc->ConnectionsUsed->conns->len < mysrvc->max_connections) { // consider this server only if didn't reach max_connections
sum+=mysrvc->weight;
}
} else {
if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED) {
// try to recover shunned servers
if (mysrvc->shunned_automatic && mysql_thread___shun_recovery_time) {
time_t t;
t=time(NULL);
// we do all these changes without locking . We assume the server is not used from long
// even if the server is still in used and any of the follow command fails it is not critical
// because this is only an attempt to recover a server that is probably dead anyway
if ((t - mysrvc->time_last_detected_error) > mysql_thread___shun_recovery_time) {
mysrvc->status=MYSQL_SERVER_STATUS_ONLINE;
mysrvc->shunned_automatic=false;
mysrvc->connect_ERR_at_time_last_detected_error=0;
mysrvc->time_last_detected_error=0;
}
}
}
}
}
if (sum==0) {
@ -604,7 +646,7 @@ __exit_get_multiple_idle_connections:
}
SQLite3_result * MySQL_HostGroups_Manager::SQL3_Connection_Pool() {
const int colnum=8;
const int colnum=9;
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 4, "Dumping Connection Pool\n");
SQLite3_result *result=new SQLite3_result(colnum);
result->add_column_definition(SQLITE_TEXT,"hostgroup");
@ -615,6 +657,7 @@ SQLite3_result * MySQL_HostGroups_Manager::SQL3_Connection_Pool() {
result->add_column_definition(SQLITE_TEXT,"ConnFree");
result->add_column_definition(SQLITE_TEXT,"ConnOK");
result->add_column_definition(SQLITE_TEXT,"ConnERR");
result->add_column_definition(SQLITE_TEXT,"Queries");
wrlock();
int i,j, k;
@ -663,6 +706,8 @@ SQLite3_result * MySQL_HostGroups_Manager::SQL3_Connection_Pool() {
pta[6]=strdup(buf);
sprintf(buf,"%u", mysrvc->connect_ERR);
pta[7]=strdup(buf);
sprintf(buf,"%llu", mysrvc->queries_sent);
pta[8]=strdup(buf);
result->add_row(pta);
for (k=0; k<colnum; k++) {
if (pta[k])

@ -134,6 +134,8 @@ void MySQL_Listeners_Manager::del(unsigned int idx) {
}
static char * mysql_thread_variables_names[]= {
(char *)"shun_on_failures",
(char *)"shun_recovery_time",
(char *)"connect_retries_on_failure",
(char *)"connect_retries_delay",
(char *)"connect_timeout_server",
@ -196,6 +198,8 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() {
shutdown_=0;
spinlock_rwlock_init(&rwlock);
pthread_attr_init(&attr);
variables.shun_on_failures=5;
variables.shun_recovery_time=10;
variables.connect_retries_on_failure=5;
variables.connect_timeout_server=1000;
variables.connect_timeout_server_max=10000;
@ -352,6 +356,8 @@ int MySQL_Threads_Handler::get_variable_int(char *name) {
if (!strcasecmp(name,"monitor_query_timeout")) return (int)variables.monitor_query_timeout;
if (!strcasecmp(name,"monitor_timer_cached")) return (int)variables.monitor_timer_cached;
}
if (!strcasecmp(name,"shun_on_failures")) return (int)variables.shun_on_failures;
if (!strcasecmp(name,"shun_recovery_time")) return (int)variables.shun_recovery_time;
if (!strcasecmp(name,"connect_retries_on_failure")) return (int)variables.connect_retries_on_failure;
if (!strcasecmp(name,"connect_timeout_server")) return (int)variables.connect_timeout_server;
if (!strcasecmp(name,"connect_timeout_server_max")) return (int)variables.connect_timeout_server_max;
@ -436,6 +442,14 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f
}
return strdup(c->csname);
}
if (!strcasecmp(name,"shun_on_failures")) {
sprintf(intbuf,"%d",variables.shun_on_failures);
return strdup(intbuf);
}
if (!strcasecmp(name,"shun_recovery_time")) {
sprintf(intbuf,"%d",variables.shun_recovery_time);
return strdup(intbuf);
}
if (!strcasecmp(name,"connect_retries_on_failure")) {
sprintf(intbuf,"%d",variables.connect_retries_on_failure);
return strdup(intbuf);
@ -738,6 +752,24 @@ bool MySQL_Threads_Handler::set_variable(char *name, char *value) { // this is t
return false;
}
}
if (!strcasecmp(name,"shun_on_failures")) {
int intv=atoi(value);
if (intv >= 0 && intv <= 10000000) {
variables.shun_on_failures=intv;
return true;
} else {
return false;
}
}
if (!strcasecmp(name,"shun_recovery_time")) {
int intv=atoi(value);
if (intv >= 0 && intv <= 3600*24*365) {
variables.shun_recovery_time=intv;
return true;
} else {
return false;
}
}
if (!strcasecmp(name,"connect_retries_on_failure")) {
int intv=atoi(value);
if (intv >= 0 && intv <= 1000) {
@ -1543,6 +1575,8 @@ void MySQL_Thread::refresh_variables() {
mysql_thread___long_query_time=GloMTH->get_variable_int((char *)"long_query_time");
mysql_thread___ping_interval_server=GloMTH->get_variable_int((char *)"ping_interval_server");
mysql_thread___ping_timeout_server=GloMTH->get_variable_int((char *)"ping_timeout_server");
mysql_thread___shun_on_failures=GloMTH->get_variable_int((char *)"shun_on_failures");
mysql_thread___shun_recovery_time=GloMTH->get_variable_int((char *)"shun_recovery_time");
mysql_thread___connect_retries_on_failure=GloMTH->get_variable_int((char *)"connect_retries_on_failure");
mysql_thread___connect_timeout_server=GloMTH->get_variable_int((char *)"connect_timeout_server");
mysql_thread___connect_timeout_server_max=GloMTH->get_variable_int((char *)"connect_timeout_server_max");

@ -62,7 +62,7 @@ pthread_mutex_t admin_mutex = PTHREAD_MUTEX_INITIALIZER;
#define STATS_SQLITE_TABLE_MYSQL_QUERY_RULES "CREATE TABLE stats_mysql_query_rules (rule_id INTEGER PRIMARY KEY , hits INT NOT NULL)"
#define STATS_SQLITE_TABLE_MYSQL_COMMANDS_COUNTERS "CREATE TABLE stats_mysql_commands_counters (Command VARCHAR NOT NULL PRIMARY KEY , Total_Time_us INT NOT NULL , Total_cnt INT NOT NULL , cnt_100us INT NOT NULL , cnt_500us INT NOT NULL , cnt_1ms INT NOT NULL , cnt_5ms INT NOT NULL , cnt_10ms INT NOT NULL , cnt_50ms INT NOT NULL , cnt_100ms INT NOT NULL , cnt_500ms INT NOT NULL , cnt_1s INT NOT NULL , cnt_5s INT NOT NULL , cnt_10s INT NOT NULL , cnt_INFs)"
#define STATS_SQLITE_TABLE_MYSQL_PROCESSLIST "CREATE TABLE stats_mysql_processlist (ThreadID INT NOT NULL , SessionID INTEGER PRIMARY KEY , user VARCHAR , db VARCHAR , cli_host VARCHAR , cli_port VARCHAR , hostgroup VARCHAR , l_srv_host VARCHAR , l_srv_port VARCHAR , srv_host VARCHAR , srv_port VARCHAR , command VARCHAR , time_ms INT NOT NULL , info VARCHAR)"
#define STATS_SQLITE_TABLE_MYSQL_CONNECTION_POOL "CREATE TABLE stats_mysql_connection_pool (hostgroup VARCHAR , srv_host VARCHAR , srv_port VARCHAR , status VARCHAR , ConnUsed INT , ConnFree INT, ConnOK INT, ConnERR INT)"
#define STATS_SQLITE_TABLE_MYSQL_CONNECTION_POOL "CREATE TABLE stats_mysql_connection_pool (hostgroup VARCHAR , srv_host VARCHAR , srv_port VARCHAR , status VARCHAR , ConnUsed INT , ConnFree INT , ConnOK INT , ConnERR INT , Queries INT)"
#define STATS_SQLITE_TABLE_MYSQL_QUERY_DIGEST "CREATE TABLE stats_mysql_query_digest (schemaname VARCHAR NOT NULL , username VARCHAR NOT NULL , digest VARCHAR NOT NULL , digest_text VARCHAR NOT NULL , count_star INTEGER NOT NULL , first_seen INTEGER NOT NULL , last_seen INTEGER NOT NULL , sum_time INTEGER NOT NULL , min_time INTEGER NOT NULL , max_time INTEGER NOT NULL , PRIMARY KEY(schemaname, username, digest))"
@ -2219,15 +2219,15 @@ void ProxySQL_Admin::stats___mysql_connection_pool() {
if (resultset==NULL) return;
statsdb->execute("BEGIN");
statsdb->execute("DELETE FROM stats_mysql_connection_pool");
char *a=(char *)"INSERT INTO stats_mysql_connection_pool VALUES (\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\")";
char *a=(char *)"INSERT INTO stats_mysql_connection_pool VALUES (\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\")";
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
int arg_len=0;
for (int i=0; i<8; i++) {
for (int i=0; i<9; i++) {
arg_len+=strlen(r->fields[i]);
}
char *query=(char *)malloc(strlen(a)+arg_len+32);
sprintf(query,a,r->fields[0],r->fields[1],r->fields[2],r->fields[3],r->fields[4],r->fields[5],r->fields[6],r->fields[7]);
sprintf(query,a,r->fields[0],r->fields[1],r->fields[2],r->fields[3],r->fields[4],r->fields[5],r->fields[6],r->fields[7],r->fields[8]);
statsdb->execute(query);
free(query);
}

@ -389,11 +389,11 @@ handler_again:
__sync_fetch_and_add(&parent->connect_OK,1);
break;
case ASYNC_CONNECT_FAILED:
__sync_fetch_and_add(&parent->connect_ERR,1);
parent->connect_error();
break;
case ASYNC_CONNECT_TIMEOUT:
proxy_error("Connect timeout on %s:%d : %llu - %llu = %llu\n", parent->address, parent->port, myds->sess->thread->curtime , myds->wait_until, myds->sess->thread->curtime - myds->wait_until);
__sync_fetch_and_add(&parent->connect_ERR,1);
parent->connect_error();
break;
case ASYNC_CHANGE_USER_START:
change_user_start();
@ -454,6 +454,7 @@ handler_again:
break;
case ASYNC_QUERY_START:
real_query_start();
__sync_fetch_and_add(&parent->queries_sent,1);
if (async_exit_status) {
next_event(ASYNC_QUERY_CONT);
} else {

Loading…
Cancel
Save