From d885d4a7376ef73b57dd03261a473fe01668a375 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Sun, 22 May 2016 14:33:23 +0000 Subject: [PATCH] Implementation of max retries Added global variables mysql-query_retries_on_failure (default 1) Added new field mysql_query_rules.retries (default NULL) --- include/MySQL_Data_Stream.h | 1 + include/MySQL_Thread.h | 1 + include/proxysql_structs.h | 2 ++ include/query_processor.h | 5 +++- lib/MySQL_Session.cpp | 44 +++++++++++++++++++++++--------- lib/MySQL_Thread.cpp | 17 +++++++++++++ lib/ProxySQL_Admin.cpp | 50 +++++++++++++++++++++---------------- lib/Query_Processor.cpp | 36 ++++++++++++++++++-------- lib/mysql_data_stream.cpp | 1 + 9 files changed, 111 insertions(+), 46 deletions(-) diff --git a/include/MySQL_Data_Stream.h b/include/MySQL_Data_Stream.h index 35232aa0d..e95d196e0 100644 --- a/include/MySQL_Data_Stream.h +++ b/include/MySQL_Data_Stream.h @@ -115,6 +115,7 @@ class MySQL_Data_Stream } proxy_addr; unsigned int connect_tries; + int query_retries_on_failure; int connect_retries_on_failure; enum mysql_data_stream_status DSS; enum MySQL_DS_type myds_type; diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 8aaa59386..a40ae599a 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -265,6 +265,7 @@ class MySQL_Threads_Handler int ping_timeout_server; int shun_on_failures; int shun_recovery_time_sec; + int query_retries_on_failure; int connect_retries_on_failure; int connect_retries_delay; int connection_max_age_ms; diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 63e5edcf5..d541925da 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -711,6 +711,7 @@ __thread int mysql_thread___ping_interval_server_msec; __thread int mysql_thread___ping_timeout_server; __thread int mysql_thread___shun_on_failures; __thread int mysql_thread___shun_recovery_time_sec; +__thread int mysql_thread___query_retries_on_failure; __thread int mysql_thread___connect_retries_on_failure; __thread int mysql_thread___connect_retries_delay; __thread int mysql_thread___connection_max_age_ms; @@ -785,6 +786,7 @@ extern __thread int mysql_thread___ping_interval_server_msec; extern __thread int mysql_thread___ping_timeout_server; extern __thread int mysql_thread___shun_on_failures; extern __thread int mysql_thread___shun_recovery_time_sec; +extern __thread int mysql_thread___query_retries_on_failure; extern __thread int mysql_thread___connect_retries_on_failure; extern __thread int mysql_thread___connect_retries_delay; extern __thread int mysql_thread___connection_max_age_ms; diff --git a/include/query_processor.h b/include/query_processor.h index f80d47fb4..1b90a024a 100644 --- a/include/query_processor.h +++ b/include/query_processor.h @@ -75,6 +75,7 @@ struct _Query_Processor_rule_t { int cache_ttl; int reconnect; int timeout; + int retries; int delay; int mirror_hostgroup; int mirror_flagOUT; @@ -112,6 +113,7 @@ class Query_Processor_Output { int cache_ttl; int reconnect; int timeout; + int retries; int delay; char *error_msg; int log; @@ -131,6 +133,7 @@ class Query_Processor_Output { cache_ttl=-1; reconnect=-1; timeout=-1; + retries=-1; delay=-1; log=-1; new_query=NULL; @@ -218,7 +221,7 @@ class Query_Processor { void wrunlock(); // explicit write unlock bool insert(QP_rule_t *qr, bool lock=true); // insert a new rule. Uses a generic void pointer to a structure that may vary depending from the Query Processor // virtual bool insert_locked(QP_rule_t *qr) {return false;}; // call this instead of insert() in case lock was already acquired via wrlock() - QP_rule_t * new_query_rule(int rule_id, bool active, char *username, char *schemaname, int flagIN, char *client_addr, char *proxy_addr, int proxy_port, char *digest, char *match_digest, char *match_pattern, bool negate_match_pattern, int flagOUT, char *replace_pattern, int destination_hostgroup, int cache_ttl, int reconnect, int timeout, int delay, int mirror_hostgroup, int mirror_flagOUT, char *error_msg, int log, bool apply); // to use a generic query rule struct, this is generated by this function and returned as generic void pointer + QP_rule_t * new_query_rule(int rule_id, bool active, char *username, char *schemaname, int flagIN, char *client_addr, char *proxy_addr, int proxy_port, char *digest, char *match_digest, char *match_pattern, bool negate_match_pattern, int flagOUT, char *replace_pattern, int destination_hostgroup, int cache_ttl, int reconnect, int timeout, int retries, int delay, int mirror_hostgroup, int mirror_flagOUT, char *error_msg, int log, bool apply); // to use a generic query rule struct, this is generated by this function and returned as generic void pointer void delete_query_rule(QP_rule_t *qr); // destructor //virtual bool remove(int rule_id, bool lock=true) {return false;}; // FIXME: not implemented yet, should be implemented at all ? // virtual bool remove_locked(int rule_id) {return false;}; // call this instead of remove() in case lock was already acquired via wrlock() diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 33db3380b..a6df37fc0 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -745,6 +745,14 @@ __get_pkts_from_client: } mybe=find_or_create_backend(current_hostgroup); status=PROCESSING_QUERY; + // set query retries + mybe->server_myds->query_retries_on_failure=mysql_thread___query_retries_on_failure; + // if a number of retries is set in mysql_query_rules, that takes priority + if (qpo) { + if (qpo->retries >= 0) { + mybe->server_myds->query_retries_on_failure=qpo->retries; + } + } mybe->server_myds->connect_retries_on_failure=mysql_thread___connect_retries_on_failure; mybe->server_myds->wait_until=0; pause_until=0; @@ -1058,11 +1066,15 @@ handler_again: } bool retry_conn=false; proxy_error("Detected an offline server during query: %s, %d\n", myconn->parent->address, myconn->parent->port); - if ((myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) { - if (myds->myconn->MyRS && myds->myconn->MyRS->transfer_started) { - // transfer to frontend has started, we cannot retry - } else { - retry_conn=true; + if (myds->query_retries_on_failure > 0) { + myds->query_retries_on_failure--; + if ((myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) { + if (myds->myconn->MyRS && myds->myconn->MyRS->transfer_started) { + // transfer to frontend has started, we cannot retry + } else { + retry_conn=true; + proxy_warning("Retrying query.\n"); + } } } myds->destroy_MySQL_Connection_From_Pool(false); @@ -1080,11 +1092,15 @@ handler_again: // client error, serious proxy_error("Detected a broken connection during query on (%d,%s,%d) : %d, %s\n", myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myerr, mysql_error(myconn->mysql)); //if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) { - if ((myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) { - if (myds->myconn->MyRS && myds->myconn->MyRS->transfer_started) { - // transfer to frontend has started, we cannot retry - } else { - retry_conn=true; + if (myds->query_retries_on_failure > 0) { + myds->query_retries_on_failure--; + if ((myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) { + if (myds->myconn->MyRS && myds->myconn->MyRS->transfer_started) { + // transfer to frontend has started, we cannot retry + } else { + retry_conn=true; + proxy_warning("Retrying query.\n"); + } } } myds->destroy_MySQL_Connection_From_Pool(false); @@ -1110,8 +1126,12 @@ handler_again: case 1290: // read-only case 1047: // WSREP has not yet prepared node for application use case 1053: // Server shutdown in progress - if ((myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) { - retry_conn=true; + if (myds->query_retries_on_failure > 0) { + myds->query_retries_on_failure--; + if ((myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) { + retry_conn=true; + proxy_warning("Retrying query.\n"); + } } myds->destroy_MySQL_Connection_From_Pool(true); myds->fd=0; diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 2aac25d46..036962f9f 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -149,6 +149,7 @@ void MySQL_Listeners_Manager::del(unsigned int idx) { static char * mysql_thread_variables_names[]= { (char *)"shun_on_failures", (char *)"shun_recovery_time_sec", + (char *)"query_retries_on_failure", (char *)"connect_retries_on_failure", (char *)"connect_retries_delay", (char *)"connection_max_age_ms", @@ -231,6 +232,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { pthread_attr_init(&attr); variables.shun_on_failures=5; variables.shun_recovery_time_sec=10; + variables.query_retries_on_failure=1; variables.connect_retries_on_failure=5; variables.connection_max_age_ms=0; variables.connect_timeout_server=1000; @@ -445,6 +447,7 @@ int MySQL_Threads_Handler::get_variable_int(char *name) { } if (!strcasecmp(name,"shun_on_failures")) return (int)variables.shun_on_failures; if (!strcasecmp(name,"shun_recovery_time_sec")) return (int)variables.shun_recovery_time_sec; + if (!strcasecmp(name,"query_retries_on_failure")) return (int)variables.query_retries_on_failure; if (!strcasecmp(name,"connect_retries_on_failure")) return (int)variables.connect_retries_on_failure; if (!strcasecmp(name,"connection_max_age_ms")) return (int)variables.connection_max_age_ms; if (!strcasecmp(name,"connect_timeout_server")) return (int)variables.connect_timeout_server; @@ -568,6 +571,10 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f sprintf(intbuf,"%d",variables.shun_recovery_time_sec); return strdup(intbuf); } + if (!strcasecmp(name,"query_retries_on_failure")) { + sprintf(intbuf,"%d",variables.query_retries_on_failure); + return strdup(intbuf); + } if (!strcasecmp(name,"connect_retries_on_failure")) { sprintf(intbuf,"%d",variables.connect_retries_on_failure); return strdup(intbuf); @@ -1000,6 +1007,15 @@ bool MySQL_Threads_Handler::set_variable(char *name, char *value) { // this is t return false; } } + if (!strcasecmp(name,"query_retries_on_failure")) { + int intv=atoi(value); + if (intv >= 0 && intv <= 1000) { + variables.query_retries_on_failure=intv; + return true; + } else { + return false; + } + } if (!strcasecmp(name,"connect_retries_on_failure")) { int intv=atoi(value); if (intv >= 0 && intv <= 1000) { @@ -1921,6 +1937,7 @@ void MySQL_Thread::refresh_variables() { mysql_thread___ping_timeout_server=GloMTH->get_variable_int((char *)"ping_timeout_server"); mysql_thread___shun_on_failures=GloMTH->get_variable_int((char *)"shun_on_failures"); mysql_thread___shun_recovery_time_sec=GloMTH->get_variable_int((char *)"shun_recovery_time_sec"); + mysql_thread___query_retries_on_failure=GloMTH->get_variable_int((char *)"query_retries_on_failure"); mysql_thread___connect_retries_on_failure=GloMTH->get_variable_int((char *)"connect_retries_on_failure"); mysql_thread___connection_max_age_ms=GloMTH->get_variable_int((char *)"connection_max_age_ms"); mysql_thread___connect_timeout_server=GloMTH->get_variable_int((char *)"connect_timeout_server"); diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index bcd0372bd..557d32f4a 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -76,7 +76,7 @@ pthread_mutex_t admin_mutex = PTHREAD_MUTEX_INITIALIZER; #define ADMIN_SQLITE_TABLE_MYSQL_USERS "CREATE TABLE mysql_users (username VARCHAR NOT NULL , password VARCHAR , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1 , use_ssl INT CHECK (use_ssl IN (0,1)) NOT NULL DEFAULT 0 , default_hostgroup INT NOT NULL DEFAULT 0 , default_schema VARCHAR , schema_locked INT CHECK (schema_locked IN (0,1)) NOT NULL DEFAULT 0 , transaction_persistent INT CHECK (transaction_persistent IN (0,1)) NOT NULL DEFAULT 0 , fast_forward INT CHECK (fast_forward IN (0,1)) NOT NULL DEFAULT 0 , backend INT CHECK (backend IN (0,1)) NOT NULL DEFAULT 1 , frontend INT CHECK (frontend IN (0,1)) NOT NULL DEFAULT 1 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 10000 , PRIMARY KEY (username, backend) , UNIQUE (username, frontend))" -#define ADMIN_SQLITE_TABLE_MYSQL_QUERY_RULES "CREATE TABLE mysql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 , username VARCHAR , schemaname VARCHAR , flagIN INT NOT NULL DEFAULT 0 , client_addr VARCHAR , proxy_addr VARCHAR , proxy_port INT , digest VARCHAR , match_digest VARCHAR , match_pattern VARCHAR , negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 , flagOUT INT , replace_pattern VARCHAR , destination_hostgroup INT DEFAULT NULL , cache_ttl INT CHECK(cache_ttl > 0) , reconnect INT CHECK (reconnect IN (0,1)) DEFAULT NULL , timeout INT UNSIGNED , delay INT UNSIGNED , mirror_flagOUT INT UNSIGNED , mirror_hostgroup INT UNSIGNED , error_msg VARCHAR , log INT CHECK (log IN (0,1)) , apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0)" +#define ADMIN_SQLITE_TABLE_MYSQL_QUERY_RULES "CREATE TABLE mysql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 , username VARCHAR , schemaname VARCHAR , flagIN INT NOT NULL DEFAULT 0 , client_addr VARCHAR , proxy_addr VARCHAR , proxy_port INT , digest VARCHAR , match_digest VARCHAR , match_pattern VARCHAR , negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 , flagOUT INT , replace_pattern VARCHAR , destination_hostgroup INT DEFAULT NULL , cache_ttl INT CHECK(cache_ttl > 0) , reconnect INT CHECK (reconnect IN (0,1)) DEFAULT NULL , timeout INT UNSIGNED , retries INT CHECK (retries>=0 AND retries <=1000) , delay INT UNSIGNED , mirror_flagOUT INT UNSIGNED , mirror_hostgroup INT UNSIGNED , error_msg VARCHAR , log INT CHECK (log IN (0,1)) , apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0)" // mysql_query_rules in v1.1.0 #define ADMIN_SQLITE_TABLE_MYSQL_QUERY_RULES_V1_1_0 "CREATE TABLE mysql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 , username VARCHAR , schemaname VARCHAR , flagIN INT NOT NULL DEFAULT 0 , match_digest VARCHAR , match_pattern VARCHAR , negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 , flagOUT INT , replace_pattern VARCHAR , destination_hostgroup INT DEFAULT NULL , cache_ttl INT CHECK(cache_ttl > 0) , reconnect INT CHECK (reconnect IN (0,1)) DEFAULT NULL , timeout INT UNSIGNED , delay INT UNSIGNED , error_msg VARCHAR , apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0)" @@ -85,7 +85,7 @@ pthread_mutex_t admin_mutex = PTHREAD_MUTEX_INITIALIZER; #define ADMIN_SQLITE_TABLE_MYSQL_QUERY_RULES_V1_2_0a "CREATE TABLE mysql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 , username VARCHAR , schemaname VARCHAR , flagIN INT NOT NULL DEFAULT 0 , match_digest VARCHAR , match_pattern VARCHAR , negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 , flagOUT INT , replace_pattern VARCHAR , destination_hostgroup INT DEFAULT NULL , cache_ttl INT CHECK(cache_ttl > 0) , reconnect INT CHECK (reconnect IN (0,1)) DEFAULT NULL , timeout INT UNSIGNED , delay INT UNSIGNED , mirror_flagOUT INT UNSIGNED , mirror_hostgroup INT UNSIGNED , error_msg VARCHAR , apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0)" // mysql_query_rules in v1.2.0g -#define ADMIN_SQLITE_TABLE_MYSQL_QUERY_RULES_V1_2_0g "CREATE TABLE mysql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 , username VARCHAR , schemaname VARCHAR , flagIN INT NOT NULL DEFAULT 0 , client_addr VARCHAR , proxy_addr VARCHAR , proxy_port INT , digest VARCHAR , match_digest VARCHAR , match_pattern VARCHAR , negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 , flagOUT INT , replace_pattern VARCHAR , destination_hostgroup INT DEFAULT NULL , cache_ttl INT CHECK(cache_ttl > 0) , reconnect INT CHECK (reconnect IN (0,1)) DEFAULT NULL , timeout INT UNSIGNED , delay INT UNSIGNED , mirror_flagOUT INT UNSIGNED , mirror_hostgroup INT UNSIGNED , error_msg VARCHAR , log INT CHECK (log IN (0,1)) , apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0)" +#define ADMIN_SQLITE_TABLE_MYSQL_QUERY_RULES_V1_2_0g "CREATE TABLE mysql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 , username VARCHAR , schemaname VARCHAR , flagIN INT NOT NULL DEFAULT 0 , client_addr VARCHAR , proxy_addr VARCHAR , proxy_port INT , digest VARCHAR , match_digest VARCHAR , match_pattern VARCHAR , negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 , flagOUT INT , replace_pattern VARCHAR , destination_hostgroup INT DEFAULT NULL , cache_ttl INT CHECK(cache_ttl > 0) , reconnect INT CHECK (reconnect IN (0,1)) DEFAULT NULL , timeout INT UNSIGNED , retries INT CHECK (retries>=0 AND retries <=1000) , delay INT UNSIGNED , mirror_flagOUT INT UNSIGNED , mirror_hostgroup INT UNSIGNED , error_msg VARCHAR , log INT CHECK (log IN (0,1)) , apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0)" #define ADMIN_SQLITE_TABLE_GLOBAL_VARIABLES "CREATE TABLE global_variables (variable_name VARCHAR NOT NULL PRIMARY KEY , variable_value VARCHAR NOT NULL)" @@ -97,7 +97,7 @@ pthread_mutex_t admin_mutex = PTHREAD_MUTEX_INITIALIZER; #define ADMIN_SQLITE_TABLE_RUNTIME_MYSQL_REPLICATION_HOSTGROUPS "CREATE TABLE runtime_mysql_replication_hostgroups (writer_hostgroup INT CHECK (writer_hostgroup>=0) NOT NULL PRIMARY KEY , reader_hostgroup INT NOT NULL CHECK (reader_hostgroup<>writer_hostgroup AND reader_hostgroup>0) , UNIQUE (reader_hostgroup))" -#define ADMIN_SQLITE_TABLE_RUNTIME_MYSQL_QUERY_RULES "CREATE TABLE runtime_mysql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 , username VARCHAR , schemaname VARCHAR , flagIN INT NOT NULL DEFAULT 0 , client_addr VARCHAR , proxy_addr VARCHAR , proxy_port INT , digest VARCHAR , match_digest VARCHAR , match_pattern VARCHAR , negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 , flagOUT INT , replace_pattern VARCHAR , destination_hostgroup INT DEFAULT NULL , cache_ttl INT CHECK(cache_ttl > 0) , reconnect INT CHECK (reconnect IN (0,1)) DEFAULT NULL , timeout INT UNSIGNED , delay INT UNSIGNED , mirror_flagOUT INT UNSIGNED , mirror_hostgroup INT UNSIGNED , error_msg VARCHAR , log INT CHECK (log IN (0,1)) , apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0)" +#define ADMIN_SQLITE_TABLE_RUNTIME_MYSQL_QUERY_RULES "CREATE TABLE runtime_mysql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 , username VARCHAR , schemaname VARCHAR , flagIN INT NOT NULL DEFAULT 0 , client_addr VARCHAR , proxy_addr VARCHAR , proxy_port INT , digest VARCHAR , match_digest VARCHAR , match_pattern VARCHAR , negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 , flagOUT INT , replace_pattern VARCHAR , destination_hostgroup INT DEFAULT NULL , cache_ttl INT CHECK(cache_ttl > 0) , reconnect INT CHECK (reconnect IN (0,1)) DEFAULT NULL , timeout INT UNSIGNED , retries INT CHECK (retries>=0 AND retries <=1000) , delay INT UNSIGNED , mirror_flagOUT INT UNSIGNED , mirror_hostgroup INT UNSIGNED , error_msg VARCHAR , log INT CHECK (log IN (0,1)) , apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0)" #define STATS_SQLITE_TABLE_MYSQL_QUERY_RULES "CREATE TABLE stats_mysql_query_rules (rule_id INTEGER PRIMARY KEY , hits INT NOT NULL)" #define STATS_SQLITE_TABLE_MYSQL_COMMANDS_COUNTERS "CREATE TABLE stats_mysql_commands_counters (Command VARCHAR NOT NULL PRIMARY KEY , Total_Time_us INT NOT NULL , Total_cnt INT NOT NULL , cnt_100us INT NOT NULL , cnt_500us INT NOT NULL , cnt_1ms INT NOT NULL , cnt_5ms INT NOT NULL , cnt_10ms INT NOT NULL , cnt_50ms INT NOT NULL , cnt_100ms INT NOT NULL , cnt_500ms INT NOT NULL , cnt_1s INT NOT NULL , cnt_5s INT NOT NULL , cnt_10s INT NOT NULL , cnt_INFs)" @@ -3029,15 +3029,15 @@ void ProxySQL_Admin::save_mysql_query_rules_from_runtime(bool _runtime) { //char *a=(char *)"INSERT INTO mysql_query_rules VALUES (\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\")"; char *a=NULL; if (_runtime) { - a=(char *)"INSERT INTO runtime_mysql_query_rules (rule_id, active, username, schemaname, flagIN, client_addr, proxy_addr, proxy_port, digest, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, delay, mirror_flagOUT, mirror_hostgroup, error_msg, log, apply) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"; + a=(char *)"INSERT INTO runtime_mysql_query_rules (rule_id, active, username, schemaname, flagIN, client_addr, proxy_addr, proxy_port, digest, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, retries, delay, mirror_flagOUT, mirror_hostgroup, error_msg, log, apply) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"; } else { - a=(char *)"INSERT INTO mysql_query_rules (rule_id, active, username, schemaname, flagIN, client_addr, proxy_addr, proxy_port, digest, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, delay, mirror_flagOUT, mirror_hostgroup, error_msg, log, apply) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"; + a=(char *)"INSERT INTO mysql_query_rules (rule_id, active, username, schemaname, flagIN, client_addr, proxy_addr, proxy_port, digest, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, retries, delay, mirror_flagOUT, mirror_hostgroup, error_msg, log, apply) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"; } for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { SQLite3_row *r=*it; int arg_len=0; - char *buffs[24]; - for (int i=0; i<24; i++) { + char *buffs[25]; + for (int i=0; i<25; i++) { if (r->fields[i]) { int l=strlen(r->fields[i])+4; arg_len+=l; @@ -3071,16 +3071,17 @@ void ProxySQL_Admin::save_mysql_query_rules_from_runtime(bool _runtime) { ( strcmp(r->fields[15],"-1")==0 ? "NULL" : r->fields[15] ), // cache_ttl ( strcmp(r->fields[16],"-1")==0 ? "NULL" : r->fields[16] ), // reconnect ( strcmp(r->fields[17],"-1")==0 ? "NULL" : r->fields[17] ), // timeout - ( strcmp(r->fields[18],"-1")==0 ? "NULL" : r->fields[18] ), // delay - ( strcmp(r->fields[19],"-1")==0 ? "NULL" : r->fields[19] ), // mirror_flagOUT - ( strcmp(r->fields[20],"-1")==0 ? "NULL" : r->fields[20] ), // mirror_hostgroup - buffs[21], // error_msg - ( strcmp(r->fields[22],"-1")==0 ? "NULL" : r->fields[22] ), // log - ( strcmp(r->fields[23],"-1")==0 ? "NULL" : r->fields[23] ) // apply + ( strcmp(r->fields[18],"-1")==0 ? "NULL" : r->fields[18] ), // retries + ( strcmp(r->fields[19],"-1")==0 ? "NULL" : r->fields[19] ), // delay + ( strcmp(r->fields[20],"-1")==0 ? "NULL" : r->fields[20] ), // mirror_flagOUT + ( strcmp(r->fields[21],"-1")==0 ? "NULL" : r->fields[21] ), // mirror_hostgroup + buffs[22], // error_msg + ( strcmp(r->fields[23],"-1")==0 ? "NULL" : r->fields[23] ), // log + ( strcmp(r->fields[24],"-1")==0 ? "NULL" : r->fields[24] ) // apply ); //fprintf(stderr,"%s\n",query); admindb->execute(query); - for (int i=0; i<24; i++) { + for (int i=0; i<25; i++) { free(buffs[i]); } free(query); @@ -3614,7 +3615,7 @@ char * ProxySQL_Admin::load_mysql_query_rules_to_runtime() { int affected_rows=0; if (GloQPro==NULL) return (char *)"Global Query Processor not started: command impossible to run"; SQLite3_result *resultset=NULL; - char *query=(char *)"SELECT rule_id, username, schemaname, flagIN, client_addr, proxy_addr, proxy_port, digest, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, delay, mirror_flagOUT, mirror_hostgroup, error_msg, log, apply FROM main.mysql_query_rules WHERE active=1"; + char *query=(char *)"SELECT rule_id, username, schemaname, flagIN, client_addr, proxy_addr, proxy_port, digest, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, retries, delay, mirror_flagOUT, mirror_hostgroup, error_msg, log, apply FROM main.mysql_query_rules WHERE active=1"; admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset); if (error) { proxy_error("Error on %s : %s\n", query, error); @@ -3643,12 +3644,13 @@ char * ProxySQL_Admin::load_mysql_query_rules_to_runtime() { (r->fields[14]==NULL ? -1 : atol(r->fields[14])), // cache_ttl (r->fields[15]==NULL ? -1 : atol(r->fields[15])), // reconnect (r->fields[16]==NULL ? -1 : atol(r->fields[16])), // timeout - (r->fields[17]==NULL ? -1 : atol(r->fields[17])), // delay - (r->fields[18]==NULL ? -1 : atol(r->fields[18])), // mirror_flagOUT - (r->fields[19]==NULL ? -1 : atol(r->fields[19])), // mirror_hostgroup - r->fields[20], // error_msg - (r->fields[21]==NULL ? -1 : atol(r->fields[21])), // log - (atoi(r->fields[22])==1 ? true : false) + (r->fields[17]==NULL ? -1 : atol(r->fields[17])), // retries + (r->fields[18]==NULL ? -1 : atol(r->fields[18])), // delay + (r->fields[19]==NULL ? -1 : atol(r->fields[19])), // mirror_flagOUT + (r->fields[20]==NULL ? -1 : atol(r->fields[20])), // mirror_hostgroup + r->fields[21], // error_msg + (r->fields[22]==NULL ? -1 : atol(r->fields[22])), // log + (atoi(r->fields[23])==1 ? true : false) ); GloQPro->insert(nqpr, false); } @@ -3752,7 +3754,7 @@ int ProxySQL_Admin::Read_MySQL_Query_Rules_from_configfile() { int i; int rows=0; admindb->execute("PRAGMA foreign_keys = OFF"); - char *q=(char *)"INSERT OR REPLACE INTO mysql_query_rules (rule_id, active, username, schemaname, flagIN, client_addr, proxy_addr, proxy_port, digest, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, delay, mirror_flagOUT, mirror_hostgroup, error_msg, log, apply) VALUES (%d, %d, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d)"; + char *q=(char *)"INSERT OR REPLACE INTO mysql_query_rules (rule_id, active, username, schemaname, flagIN, client_addr, proxy_addr, proxy_port, digest, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, retries, delay, mirror_flagOUT, mirror_hostgroup, error_msg, log, apply) VALUES (%d, %d, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d)"; for (i=0; i< count; i++) { const Setting &rule = mysql_query_rules[i]; int rule_id; @@ -3793,6 +3795,7 @@ int ProxySQL_Admin::Read_MySQL_Query_Rules_from_configfile() { int cache_ttl=-1; int reconnect=-1; int timeout=-1; + int retries=-1; int delay=-1; bool error_msg_exists=false; std::string error_msg; @@ -3825,6 +3828,7 @@ int ProxySQL_Admin::Read_MySQL_Query_Rules_from_configfile() { rule.lookupValue("cache_ttl", cache_ttl); rule.lookupValue("reconnect", reconnect); rule.lookupValue("timeout", timeout); + rule.lookupValue("retries", retries); rule.lookupValue("delay", delay); if (rule.lookupValue("error_msg", username)) error_msg_exists=true; @@ -3855,6 +3859,7 @@ int ProxySQL_Admin::Read_MySQL_Query_Rules_from_configfile() { strlen(std::to_string(timeout).c_str()) + 4 + strlen(std::to_string(mirror_flagOUT).c_str()) + 4 + strlen(std::to_string(mirror_hostgroup).c_str()) + 4 + + strlen(std::to_string(retries).c_str()) + 4 + strlen(std::to_string(delay).c_str()) + 4 + ( error_msg_exists ? strlen(error_msg.c_str()) : 0 ) + 4 + strlen(std::to_string(log).c_str()) + 4 + @@ -3917,6 +3922,7 @@ int ProxySQL_Admin::Read_MySQL_Query_Rules_from_configfile() { ( cache_ttl >= 0 ? std::to_string(cache_ttl).c_str() : "NULL") , ( reconnect >= 0 ? std::to_string(reconnect).c_str() : "NULL") , ( timeout >= 0 ? std::to_string(timeout).c_str() : "NULL") , + ( retries >= 0 ? std::to_string(retries).c_str() : "NULL") , ( delay >= 0 ? std::to_string(delay).c_str() : "NULL") , ( mirror_flagOUT >= 0 ? std::to_string(mirror_flagOUT).c_str() : "NULL") , ( mirror_hostgroup >= 0 ? std::to_string(mirror_hostgroup).c_str() : "NULL") , diff --git a/lib/Query_Processor.cpp b/lib/Query_Processor.cpp index 801974c3a..6f40f3851 100644 --- a/lib/Query_Processor.cpp +++ b/lib/Query_Processor.cpp @@ -37,7 +37,7 @@ class QP_rule_text { char **pta; int num_fields; QP_rule_text(QP_rule_t *QPr) { - num_fields=25; + num_fields=26; pta=NULL; pta=(char **)malloc(sizeof(char *)*num_fields); itostr(pta[0], (long long)QPr->rule_id); @@ -66,13 +66,14 @@ class QP_rule_text { itostr(pta[15], (long long)QPr->cache_ttl); itostr(pta[16], (long long)QPr->reconnect); itostr(pta[17], (long long)QPr->timeout); - itostr(pta[18], (long long)QPr->delay); - itostr(pta[19], (long long)QPr->mirror_flagOUT); - itostr(pta[20], (long long)QPr->mirror_hostgroup); - pta[21]=strdup_null(QPr->error_msg); - itostr(pta[22], (long long)QPr->log); - itostr(pta[23], (long long)QPr->apply); - itostr(pta[24], (long long)QPr->hits); + itostr(pta[18], (long long)QPr->retries); + itostr(pta[19], (long long)QPr->delay); + itostr(pta[20], (long long)QPr->mirror_flagOUT); + itostr(pta[21], (long long)QPr->mirror_hostgroup); + pta[22]=strdup_null(QPr->error_msg); + itostr(pta[23], (long long)QPr->log); + itostr(pta[24], (long long)QPr->apply); + itostr(pta[25], (long long)QPr->hits); } ~QP_rule_text() { for(int i=0; irule_id=rule_id; newQR->active=active; @@ -393,6 +394,7 @@ QP_rule_t * Query_Processor::new_query_rule(int rule_id, bool active, char *user newQR->cache_ttl=cache_ttl; newQR->reconnect=reconnect; newQR->timeout=timeout; + newQR->retries=retries; newQR->delay=delay; newQR->mirror_flagOUT=mirror_flagOUT; newQR->mirror_hostgroup=mirror_hostgroup; @@ -503,7 +505,7 @@ SQLite3_result * Query_Processor::get_stats_query_rules() { SQLite3_result * Query_Processor::get_current_query_rules() { proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping current query rules, using Global version %d\n", version); - SQLite3_result *result=new SQLite3_result(25); + SQLite3_result *result=new SQLite3_result(26); spin_rdlock(&rwlock); QP_rule_t *qr1; result->add_column_definition(SQLITE_TEXT,"rule_id"); @@ -524,6 +526,7 @@ SQLite3_result * Query_Processor::get_current_query_rules() { result->add_column_definition(SQLITE_TEXT,"cache_ttl"); result->add_column_definition(SQLITE_TEXT,"reconnect"); result->add_column_definition(SQLITE_TEXT,"timeout"); + result->add_column_definition(SQLITE_TEXT,"retries"); result->add_column_definition(SQLITE_TEXT,"delay"); result->add_column_definition(SQLITE_TEXT,"mirror_flagOUT"); result->add_column_definition(SQLITE_TEXT,"mirror_hostgroup"); @@ -633,7 +636,7 @@ Query_Processor_Output * Query_Processor::process_mysql_query(MySQL_Session *ses ( qr1->digest ? buf : NULL ) , qr1->match_digest, qr1->match_pattern, qr1->negate_match_pattern, qr1->flagOUT, qr1->replace_pattern, qr1->destination_hostgroup, - qr1->cache_ttl, qr1->reconnect, qr1->timeout, qr1->delay, qr1->mirror_flagOUT, qr1->mirror_hostgroup, + qr1->cache_ttl, qr1->reconnect, qr1->timeout, qr1->retries, qr1->delay, qr1->mirror_flagOUT, qr1->mirror_hostgroup, qr1->error_msg, qr1->log, qr1->apply); qr2->parent=qr1; // pointer to parent to speed up parent update (hits) if (qr2->match_digest) { @@ -762,6 +765,11 @@ Query_Processor_Output * Query_Processor::process_mysql_query(MySQL_Session *ses proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set timeout: %d. Query will%s be interrupted if exceeding %dms\n", qr->rule_id, qr->timeout, (qr->timeout == 0 ? " NOT" : "" ) , qr->timeout); ret->timeout=qr->timeout; } + if (qr->retries >= 0) { + // Note: negative retries means this rule doesn't change + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set retries: %d. Query will%s be re-executed %d times in case of failure\n", qr->rule_id, qr->retries); + ret->retries=qr->retries; + } if (qr->delay >= 0) { // Note: negative delay means this rule doesn't change proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set delay: %d. Session will%s be paused for %dms\n", qr->rule_id, qr->delay, (qr->delay == 0 ? " NOT" : "" ) , qr->delay); @@ -1130,6 +1138,12 @@ bool Query_Processor::query_parser_first_comment(Query_Processor_Output *qpo, ch qpo->delay=t; } } + if (!strcasecmp(key,"query_retries")) { + if (c >= '0' && c <= '9') { // it is a digit + int t=atoi(value); + qpo->retries=t; + } + } if (!strcasecmp(key,"query_timeout")) { if (c >= '0' && c <= '9') { // it is a digit int t=atoi(value); diff --git a/lib/mysql_data_stream.cpp b/lib/mysql_data_stream.cpp index dd0048019..59a5f69d0 100644 --- a/lib/mysql_data_stream.cpp +++ b/lib/mysql_data_stream.cpp @@ -108,6 +108,7 @@ MySQL_Data_Stream::MySQL_Data_Stream() { mysql_real_query.QueryPtr=NULL; mysql_real_query.QuerySize=0; + query_retries_on_failure=0; connect_retries_on_failure=0; max_connect_time=0; wait_until=0;