From 2e276f06f4fb8e14586c9bd7bc29be3ad7c6e5cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Thu, 10 Mar 2016 22:23:24 +0000 Subject: [PATCH] Second commit to implement mirroring Expanded table `mysql_query_rules` to add `mirror_hostgroup` Expanded struct `_Query_Processor_rule_t` and class `Query_Processor_Output` to support `mirror_hostgroup` Added new variable `mirror_hostgroup` in `MySQL_Session` The following logic ensure that a mirror_hostgroup set when processing the query in the original session is preserved when reprocessing the query in the mirror session: * when a new mirror session is *created* `mirror_hostgroup` is set according to the result of Query_Processor * when a mirror session is *executed* , `mirror_hostgroup` is copied into `default_hostgroup` Current limitations: * queries larger than 15MB aren't mirrorred * upgrade from 1.1 is still not possible it will wipe table mysql_query_rules --- include/MySQL_Session.h | 1 + include/query_processor.h | 5 ++++- lib/MySQL_Session.cpp | 13 +++++++++---- lib/ProxySQL_Admin.cpp | 32 +++++++++++++++++++------------- lib/Query_Processor.cpp | 28 +++++++++++++++++++++------- 5 files changed, 54 insertions(+), 25 deletions(-) diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index 1470a28c6..00d841995 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -94,6 +94,7 @@ class MySQL_Session enum session_status status; int current_hostgroup; int default_hostgroup; + int mirror_hostgroup; int active_transactions; int autocommit_on_hostgroup; char * default_schema; diff --git a/include/query_processor.h b/include/query_processor.h index 4e3317cad..3fa5aeee2 100644 --- a/include/query_processor.h +++ b/include/query_processor.h @@ -72,6 +72,7 @@ struct _Query_Processor_rule_t { int reconnect; int timeout; int delay; + int mirror_hostgroup; char *error_msg; bool apply; void *regex_engine1; @@ -100,6 +101,7 @@ class Query_Processor_Output { void *ptr; unsigned int size; int destination_hostgroup; + int mirror_hostgroup; int cache_ttl; int reconnect; int timeout; @@ -116,6 +118,7 @@ class Query_Processor_Output { ptr=NULL; size=0; destination_hostgroup=-1; + mirror_hostgroup=-1; cache_ttl=-1; reconnect=-1; timeout=-1; @@ -207,7 +210,7 @@ class Query_Processor { void wrunlock(); // explicit write unlock bool insert(QP_rule_t *qr, bool lock=true); // insert a new rule. Uses a generic void pointer to a structure that may vary depending from the Query Processor // virtual bool insert_locked(QP_rule_t *qr) {return false;}; // call this instead of insert() in case lock was already acquired via wrlock() - QP_rule_t * new_query_rule(int rule_id, bool active, char *username, char *schemaname, int flagIN, char *match_digest, char *match_pattern, bool negate_match_pattern, int flagOUT, char *replace_pattern, int destination_hostgroup, int cache_ttl, int reconnect, int timeout, int delay, char *error_msg, bool apply); // to use a generic query rule struct, this is generated by this function and returned as generic void pointer + QP_rule_t * new_query_rule(int rule_id, bool active, char *username, char *schemaname, int flagIN, char *match_digest, char *match_pattern, bool negate_match_pattern, int flagOUT, char *replace_pattern, int destination_hostgroup, int cache_ttl, int reconnect, int timeout, int delay, int mirror_hostgroup, char *error_msg, bool apply); // to use a generic query rule struct, this is generated by this function and returned as generic void pointer void delete_query_rule(QP_rule_t *qr); // destructor //virtual bool remove(int rule_id, bool lock=true) {return false;}; // FIXME: not implemented yet, should be implemented at all ? // virtual bool remove_locked(int rule_id) {return false;}; // call this instead of remove() in case lock was already acquired via wrlock() diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 3c3a51e86..ed4d30388 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -674,10 +674,11 @@ __get_pkts_from_client: assert(qpo); // GloQPro->process_mysql_query() should always return a qpo rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt); if (rc_break==true) { break; } - + if (mirror==true) { + default_hostgroup=mirror_hostgroup; + } if (mirror==false) { - if (pkt.size < 1000000 && CurrentQuery.is_select_NOT_for_update()==true) { - // this is a prototype for creating a mirror, only for SELECT + if (qpo->mirror_hostgroup >= 0 && pkt.size < 15*1024*1024 ) { MySQL_Session *newsess=new MySQL_Session(); newsess->client_myds = new MySQL_Data_Stream(); newsess->client_myds->DSS=STATE_SLEEP; @@ -691,7 +692,7 @@ __get_pkts_from_client: newsess->client_myds->attach_connection(myconn); newsess->client_myds->myprot.init(&newsess->client_myds, newsess->client_myds->myconn->userinfo, newsess); newsess->to_process=1; - newsess->default_hostgroup=default_hostgroup; + newsess->mirror_hostgroup=qpo->mirror_hostgroup; // in the new session we copy the mirror hostgroup newsess->default_schema=strdup(default_schema); newsess->mirror=true; newsess->mirrorPkt.size=pkt.size; @@ -1773,6 +1774,10 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C CurrentQuery.begin((unsigned char *)pkt->ptr,pkt->size,true); delete qpo->new_query; } + if (mirror==true) { // for mirror session we exit here + current_hostgroup=mirror_hostgroup; + return false; + } if (qpo->cache_ttl>0) { uint32_t resbuf=0; unsigned char *aa=GloQC->get( diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index 21603a533..d81d02b5a 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -58,7 +58,7 @@ pthread_mutex_t admin_mutex = PTHREAD_MUTEX_INITIALIZER; #define ADMIN_SQLITE_TABLE_MYSQL_SERVERS "CREATE TABLE mysql_servers (hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , status VARCHAR CHECK (UPPER(status) IN ('ONLINE','SHUNNED','OFFLINE_SOFT', 'OFFLINE_HARD')) NOT NULL DEFAULT 'ONLINE' , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0 , PRIMARY KEY (hostgroup_id, hostname, port) )" #define ADMIN_SQLITE_TABLE_MYSQL_USERS "CREATE TABLE mysql_users (username VARCHAR NOT NULL , password VARCHAR , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1 , use_ssl INT CHECK (use_ssl IN (0,1)) NOT NULL DEFAULT 0 , default_hostgroup INT NOT NULL DEFAULT 0 , default_schema VARCHAR , schema_locked INT CHECK (schema_locked IN (0,1)) NOT NULL DEFAULT 0 , transaction_persistent INT CHECK (transaction_persistent IN (0,1)) NOT NULL DEFAULT 0 , fast_forward INT CHECK (fast_forward IN (0,1)) NOT NULL DEFAULT 0 , backend INT CHECK (backend IN (0,1)) NOT NULL DEFAULT 1 , frontend INT CHECK (frontend IN (0,1)) NOT NULL DEFAULT 1 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 10000 , PRIMARY KEY (username, backend) , UNIQUE (username, frontend))" -#define ADMIN_SQLITE_TABLE_MYSQL_QUERY_RULES "CREATE TABLE mysql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 , username VARCHAR , schemaname VARCHAR , flagIN INT NOT NULL DEFAULT 0 , match_digest VARCHAR , match_pattern VARCHAR , negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 , flagOUT INT , replace_pattern VARCHAR , destination_hostgroup INT DEFAULT NULL , cache_ttl INT CHECK(cache_ttl > 0) , reconnect INT CHECK (reconnect IN (0,1)) DEFAULT NULL , timeout INT UNSIGNED , delay INT UNSIGNED , error_msg VARCHAR , apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0)" +#define ADMIN_SQLITE_TABLE_MYSQL_QUERY_RULES "CREATE TABLE mysql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 , username VARCHAR , schemaname VARCHAR , flagIN INT NOT NULL DEFAULT 0 , match_digest VARCHAR , match_pattern VARCHAR , negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 , flagOUT INT , replace_pattern VARCHAR , destination_hostgroup INT DEFAULT NULL , cache_ttl INT CHECK(cache_ttl > 0) , reconnect INT CHECK (reconnect IN (0,1)) DEFAULT NULL , timeout INT UNSIGNED , delay INT UNSIGNED , mirror_hostgroup INT UNSIGNED , error_msg VARCHAR , apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0)" #define ADMIN_SQLITE_TABLE_GLOBAL_VARIABLES "CREATE TABLE global_variables (variable_name VARCHAR NOT NULL PRIMARY KEY , variable_value VARCHAR NOT NULL)" #define ADMIN_SQLITE_TABLE_MYSQL_REPLICATION_HOSTGROUPS "CREATE TABLE mysql_replication_hostgroups (writer_hostgroup INT CHECK (writer_hostgroup>=0) NOT NULL PRIMARY KEY , reader_hostgroup INT NOT NULL CHECK (reader_hostgroup<>writer_hostgroup AND reader_hostgroup>0) , UNIQUE (reader_hostgroup))" @@ -69,7 +69,7 @@ pthread_mutex_t admin_mutex = PTHREAD_MUTEX_INITIALIZER; #define ADMIN_SQLITE_TABLE_RUNTIME_MYSQL_REPLICATION_HOSTGROUPS "CREATE TABLE runtime_mysql_replication_hostgroups (writer_hostgroup INT CHECK (writer_hostgroup>=0) NOT NULL PRIMARY KEY , reader_hostgroup INT NOT NULL CHECK (reader_hostgroup<>writer_hostgroup AND reader_hostgroup>0) , UNIQUE (reader_hostgroup))" -#define ADMIN_SQLITE_TABLE_RUNTIME_MYSQL_QUERY_RULES "CREATE TABLE runtime_mysql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 , username VARCHAR , schemaname VARCHAR , flagIN INT NOT NULL DEFAULT 0 , match_digest VARCHAR , match_pattern VARCHAR , negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 , flagOUT INT , replace_pattern VARCHAR , destination_hostgroup INT DEFAULT NULL , cache_ttl INT CHECK(cache_ttl > 0) , reconnect INT CHECK (reconnect IN (0,1)) DEFAULT NULL , timeout INT UNSIGNED , delay INT UNSIGNED , error_msg VARCHAR , apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0)" +#define ADMIN_SQLITE_TABLE_RUNTIME_MYSQL_QUERY_RULES "CREATE TABLE runtime_mysql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 , username VARCHAR , schemaname VARCHAR , flagIN INT NOT NULL DEFAULT 0 , match_digest VARCHAR , match_pattern VARCHAR , negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 , flagOUT INT , replace_pattern VARCHAR , destination_hostgroup INT DEFAULT NULL , cache_ttl INT CHECK(cache_ttl > 0) , reconnect INT CHECK (reconnect IN (0,1)) DEFAULT NULL , timeout INT UNSIGNED , delay INT UNSIGNED , mirror_hostgroup INT UNSIGNED , error_msg VARCHAR , apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0)" #define STATS_SQLITE_TABLE_MYSQL_QUERY_RULES "CREATE TABLE stats_mysql_query_rules (rule_id INTEGER PRIMARY KEY , hits INT NOT NULL)" #define STATS_SQLITE_TABLE_MYSQL_COMMANDS_COUNTERS "CREATE TABLE stats_mysql_commands_counters (Command VARCHAR NOT NULL PRIMARY KEY , Total_Time_us INT NOT NULL , Total_cnt INT NOT NULL , cnt_100us INT NOT NULL , cnt_500us INT NOT NULL , cnt_1ms INT NOT NULL , cnt_5ms INT NOT NULL , cnt_10ms INT NOT NULL , cnt_50ms INT NOT NULL , cnt_100ms INT NOT NULL , cnt_500ms INT NOT NULL , cnt_1s INT NOT NULL , cnt_5s INT NOT NULL , cnt_10s INT NOT NULL , cnt_INFs)" @@ -2883,15 +2883,15 @@ void ProxySQL_Admin::save_mysql_query_rules_from_runtime(bool _runtime) { //char *a=(char *)"INSERT INTO mysql_query_rules VALUES (\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\")"; char *a=NULL; if (_runtime) { - a=(char *)"INSERT INTO runtime_mysql_query_rules (rule_id, active, username, schemaname, flagIN, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, delay, error_msg, apply) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"; + a=(char *)"INSERT INTO runtime_mysql_query_rules (rule_id, active, username, schemaname, flagIN, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, delay, mirror_hostgroup, error_msg, apply) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"; } else { - a=(char *)"INSERT INTO mysql_query_rules (rule_id, active, username, schemaname, flagIN, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, delay, error_msg, apply) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"; + a=(char *)"INSERT INTO mysql_query_rules (rule_id, active, username, schemaname, flagIN, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, delay, mirror_hostgroup, error_msg, apply) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"; } for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { SQLite3_row *r=*it; int arg_len=0; - char *buffs[17]; - for (int i=0; i<17; i++) { + char *buffs[18]; + for (int i=0; i<18; i++) { if (r->fields[i]) { int l=strlen(r->fields[i])+4; arg_len+=l; @@ -2922,12 +2922,13 @@ void ProxySQL_Admin::save_mysql_query_rules_from_runtime(bool _runtime) { ( strcmp(r->fields[12],"-1")==0 ? "NULL" : r->fields[12] ), // reconnect ( strcmp(r->fields[13],"-1")==0 ? "NULL" : r->fields[13] ), // timeout ( strcmp(r->fields[14],"-1")==0 ? "NULL" : r->fields[14] ), // delay - buffs[15], // error_msg - ( strcmp(r->fields[16],"-1")==0 ? "NULL" : r->fields[16] ) // apply + ( strcmp(r->fields[15],"-1")==0 ? "NULL" : r->fields[15] ), // mirror_hostgroup + buffs[16], // error_msg + ( strcmp(r->fields[17],"-1")==0 ? "NULL" : r->fields[17] ) // apply ); //fprintf(stderr,"%s\n",query); admindb->execute(query); - for (int i=0; i<16; i++) { + for (int i=0; i<18; i++) { free(buffs[i]); } free(query); @@ -3444,7 +3445,7 @@ char * ProxySQL_Admin::load_mysql_query_rules_to_runtime() { int affected_rows=0; if (GloQPro==NULL) return (char *)"Global Query Processor not started: command impossible to run"; SQLite3_result *resultset=NULL; - char *query=(char *)"SELECT rule_id, username, schemaname, flagIN, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, delay, error_msg, apply FROM main.mysql_query_rules WHERE active=1"; + char *query=(char *)"SELECT rule_id, username, schemaname, flagIN, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, delay, mirror_hostgroup, error_msg, apply FROM main.mysql_query_rules WHERE active=1"; admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset); if (error) { proxy_error("Error on %s : %s\n", query, error); @@ -3470,8 +3471,9 @@ char * ProxySQL_Admin::load_mysql_query_rules_to_runtime() { (r->fields[11]==NULL ? -1 : atol(r->fields[11])), (r->fields[12]==NULL ? -1 : atol(r->fields[12])), (r->fields[13]==NULL ? -1 : atol(r->fields[13])), - r->fields[14], // error_msg - (atoi(r->fields[15])==1 ? true : false) + (r->fields[14]==NULL ? -1 : atol(r->fields[14])), // mirror_hostgroup + r->fields[15], // error_msg + (atoi(r->fields[16])==1 ? true : false) ); GloQPro->insert(nqpr, false); } @@ -3575,7 +3577,7 @@ int ProxySQL_Admin::Read_MySQL_Query_Rules_from_configfile() { int i; int rows=0; admindb->execute("PRAGMA foreign_keys = OFF"); - char *q=(char *)"INSERT OR REPLACE INTO mysql_query_rules (rule_id, active, username, schemaname, flagIN, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, delay, error_msg, apply) VALUES (%d, %d, %s, %s, %s, %s, %s, %d, %s, %s, %s, %s, %s, %s, %s, %s, %d)"; + char *q=(char *)"INSERT OR REPLACE INTO mysql_query_rules (rule_id, active, username, schemaname, flagIN, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, delay, mirror_hostgroup, error_msg, apply) VALUES (%d, %d, %s, %s, %s, %s, %s, %d, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d)"; for (i=0; i< count; i++) { const Setting &rule = mysql_query_rules[i]; int rule_id; @@ -3594,6 +3596,7 @@ int ProxySQL_Admin::Read_MySQL_Query_Rules_from_configfile() { bool replace_pattern_exists=false; std::string replace_pattern; int destination_hostgroup=-1; + int mirror_hostgroup=-1; int cache_ttl=-1; int reconnect=-1; int timeout=-1; @@ -3612,6 +3615,7 @@ int ProxySQL_Admin::Read_MySQL_Query_Rules_from_configfile() { rule.lookupValue("flagOUT", flagOUT); if (rule.lookupValue("replace_pattern", replace_pattern)) replace_pattern_exists=true; rule.lookupValue("destination_hostgroup", destination_hostgroup); + rule.lookupValue("mirror_hostgroup", mirror_hostgroup); rule.lookupValue("cache_ttl", cache_ttl); rule.lookupValue("reconnect", reconnect); rule.lookupValue("timeout", timeout); @@ -3635,6 +3639,7 @@ int ProxySQL_Admin::Read_MySQL_Query_Rules_from_configfile() { strlen(std::to_string(cache_ttl).c_str()) + 4 + strlen(std::to_string(reconnect).c_str()) + 4 + strlen(std::to_string(timeout).c_str()) + 4 + + strlen(std::to_string(mirror_hostgroup).c_str()) + 4 + strlen(std::to_string(delay).c_str()) + 4 + ( error_msg_exists ? strlen(error_msg.c_str()) : 0 ) + 4 + strlen(std::to_string(apply).c_str()) + 4 + @@ -3679,6 +3684,7 @@ int ProxySQL_Admin::Read_MySQL_Query_Rules_from_configfile() { ( reconnect >= 0 ? std::to_string(reconnect).c_str() : "NULL") , ( timeout >= 0 ? std::to_string(timeout).c_str() : "NULL") , ( delay >= 0 ? std::to_string(delay).c_str() : "NULL") , + ( mirror_hostgroup >= 0 ? std::to_string(mirror_hostgroup).c_str() : "NULL") , error_msg.c_str(), ( apply == 0 ? 0 : 1) ); diff --git a/lib/Query_Processor.cpp b/lib/Query_Processor.cpp index a59b932c2..dddcb7477 100644 --- a/lib/Query_Processor.cpp +++ b/lib/Query_Processor.cpp @@ -37,7 +37,7 @@ class QP_rule_text { char **pta; int num_fields; QP_rule_text(QP_rule_t *QPr) { - num_fields=18; + num_fields=19; pta=NULL; pta=(char **)malloc(sizeof(char *)*num_fields); itostr(pta[0], (long long)QPr->rule_id); @@ -55,9 +55,10 @@ class QP_rule_text { itostr(pta[12], (long long)QPr->reconnect); itostr(pta[13], (long long)QPr->timeout); itostr(pta[14], (long long)QPr->delay); - pta[15]=strdup_null(QPr->error_msg); - itostr(pta[16], (long long)QPr->apply); - itostr(pta[17], (long long)QPr->hits); + itostr(pta[15], (long long)QPr->mirror_hostgroup); + pta[16]=strdup_null(QPr->error_msg); + itostr(pta[17], (long long)QPr->apply); + itostr(pta[18], (long long)QPr->hits); } ~QP_rule_text() { for(int i=0; irule_id=rule_id; newQR->active=active; @@ -440,6 +441,7 @@ QP_rule_t * Query_Processor::new_query_rule(int rule_id, bool active, char *user newQR->reconnect=reconnect; newQR->timeout=timeout; newQR->delay=delay; + newQR->mirror_hostgroup=mirror_hostgroup; newQR->error_msg=(error_msg ? strdup(error_msg) : NULL); newQR->apply=apply; newQR->regex_engine1=NULL; @@ -533,7 +535,7 @@ SQLite3_result * Query_Processor::get_stats_query_rules() { SQLite3_result * Query_Processor::get_current_query_rules() { proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping current query rules, using Global version %d\n", version); - SQLite3_result *result=new SQLite3_result(18); + SQLite3_result *result=new SQLite3_result(19); spin_rdlock(&rwlock); QP_rule_t *qr1; result->add_column_definition(SQLITE_TEXT,"rule_id"); @@ -551,6 +553,7 @@ SQLite3_result * Query_Processor::get_current_query_rules() { result->add_column_definition(SQLITE_TEXT,"reconnect"); result->add_column_definition(SQLITE_TEXT,"timeout"); result->add_column_definition(SQLITE_TEXT,"delay"); + result->add_column_definition(SQLITE_TEXT,"mirror_hostgroup"); result->add_column_definition(SQLITE_TEXT,"error_msg"); result->add_column_definition(SQLITE_TEXT,"apply"); result->add_column_definition(SQLITE_TEXT,"hits"); @@ -644,7 +647,7 @@ Query_Processor_Output * Query_Processor::process_mysql_query(MySQL_Session *ses qr1=*it; if (qr1->active) { proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Copying Query Rule id: %d\n", qr1->rule_id); - qr2=new_query_rule(qr1->rule_id, qr1->active, qr1->username, qr1->schemaname, qr1->flagIN, qr1->match_digest, qr1->match_pattern, qr1->negate_match_pattern, qr1->flagOUT, qr1->replace_pattern, qr1->destination_hostgroup, qr1->cache_ttl, qr1->reconnect, qr1->timeout, qr1->delay, qr1->error_msg, qr1->apply); + qr2=new_query_rule(qr1->rule_id, qr1->active, qr1->username, qr1->schemaname, qr1->flagIN, qr1->match_digest, qr1->match_pattern, qr1->negate_match_pattern, qr1->flagOUT, qr1->replace_pattern, qr1->destination_hostgroup, qr1->cache_ttl, qr1->reconnect, qr1->timeout, qr1->delay, qr1->mirror_hostgroup, qr1->error_msg, qr1->apply); qr2->parent=qr1; // pointer to parent to speed up parent update (hits) if (qr2->match_digest) { proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Compiling regex for rule_id: %d, match_digest: \n", qr2->rule_id, qr2->match_digest); @@ -759,6 +762,11 @@ Query_Processor_Output * Query_Processor::process_mysql_query(MySQL_Session *ses proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set delay: %d. Session will%s be paused for %dms\n", qr->rule_id, qr->delay, (qr->delay == 0 ? " NOT" : "" ) , qr->delay); ret->delay=qr->delay; } + if (qr->mirror_hostgroup >= 0) { + // Note: negative mirror_hostgroup means this rule doesn't change the mirror + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set mirror hostgroup: %d. A new session will be created\n", qr->rule_id, qr->mirror_hostgroup); + ret->mirror_hostgroup=qr->mirror_hostgroup; + } if (qr->error_msg) { proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set error_msg: %s\n", qr->rule_id, qr->error_msg); proxy_warning("User \"%s\" has issued query that has been filtered: %s \n " , sess->client_myds->myconn->userinfo->username, query); @@ -1084,6 +1092,12 @@ bool Query_Processor::query_parser_first_comment(Query_Processor_Output *qpo, ch qpo->destination_hostgroup=t; } } + if (!strcasecmp(key,"mirror")) { + if (c >= '0' && c <= '9') { // it is a digit + int t=atoi(value); + qpo->mirror_hostgroup=t; + } + } } free(key); free(value);