Added column mirror_flagOUT in mysql_query_rules

Mirroring starts when either mirror_hostgroup or mirror_flagOUT is set
pull/525/head
René Cannaò 10 years ago
parent 2e276f06f4
commit bc8c5e175a

@ -95,6 +95,7 @@ class MySQL_Session
int current_hostgroup;
int default_hostgroup;
int mirror_hostgroup;
int mirror_flagOUT;
int active_transactions;
int autocommit_on_hostgroup;
char * default_schema;

@ -73,6 +73,7 @@ struct _Query_Processor_rule_t {
int timeout;
int delay;
int mirror_hostgroup;
int mirror_flagOUT;
char *error_msg;
bool apply;
void *regex_engine1;
@ -102,6 +103,7 @@ class Query_Processor_Output {
unsigned int size;
int destination_hostgroup;
int mirror_hostgroup;
int mirror_flagOUT;
int cache_ttl;
int reconnect;
int timeout;
@ -119,6 +121,7 @@ class Query_Processor_Output {
size=0;
destination_hostgroup=-1;
mirror_hostgroup=-1;
mirror_flagOUT=-1;
cache_ttl=-1;
reconnect=-1;
timeout=-1;
@ -210,7 +213,7 @@ class Query_Processor {
void wrunlock(); // explicit write unlock
bool insert(QP_rule_t *qr, bool lock=true); // insert a new rule. Uses a generic void pointer to a structure that may vary depending from the Query Processor
// virtual bool insert_locked(QP_rule_t *qr) {return false;}; // call this instead of insert() in case lock was already acquired via wrlock()
QP_rule_t * new_query_rule(int rule_id, bool active, char *username, char *schemaname, int flagIN, char *match_digest, char *match_pattern, bool negate_match_pattern, int flagOUT, char *replace_pattern, int destination_hostgroup, int cache_ttl, int reconnect, int timeout, int delay, int mirror_hostgroup, char *error_msg, bool apply); // to use a generic query rule struct, this is generated by this function and returned as generic void pointer
QP_rule_t * new_query_rule(int rule_id, bool active, char *username, char *schemaname, int flagIN, char *match_digest, char *match_pattern, bool negate_match_pattern, int flagOUT, char *replace_pattern, int destination_hostgroup, int cache_ttl, int reconnect, int timeout, int delay, int mirror_hostgroup, int mirror_flagOUT, char *error_msg, bool apply); // to use a generic query rule struct, this is generated by this function and returned as generic void pointer
void delete_query_rule(QP_rule_t *qr); // destructor
//virtual bool remove(int rule_id, bool lock=true) {return false;}; // FIXME: not implemented yet, should be implemented at all ?
// virtual bool remove_locked(int rule_id) {return false;}; // call this instead of remove() in case lock was already acquired via wrlock()

@ -218,6 +218,8 @@ MySQL_Session::MySQL_Session() {
current_hostgroup=-1;
default_hostgroup=-1;
mirror_hostgroup=-1;
mirror_flagOUT=-1;
transaction_persistent_hostgroup=-1;
transaction_persistent=false;
active_transactions=0;
@ -674,11 +676,14 @@ __get_pkts_from_client:
assert(qpo); // GloQPro->process_mysql_query() should always return a qpo
rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt);
if (rc_break==true) { break; }
if (mirror==true) {
default_hostgroup=mirror_hostgroup;
}
//if (mirror==true) {
// // this is not required anymore, because now
// // GloQPro->process_mysql_query() knows if we are inside a mirror session
// // and changes qpo->default_hostgroup
// default_hostgroup=mirror_hostgroup;
//}
if (mirror==false) {
if (qpo->mirror_hostgroup >= 0 && pkt.size < 15*1024*1024 ) {
if (pkt.size < 15*1024*1024 && (qpo->mirror_hostgroup >= 0 || qpo->mirror_flagOUT >= 0)) {
MySQL_Session *newsess=new MySQL_Session();
newsess->client_myds = new MySQL_Data_Stream();
newsess->client_myds->DSS=STATE_SLEEP;
@ -693,6 +698,7 @@ __get_pkts_from_client:
newsess->client_myds->myprot.init(&newsess->client_myds, newsess->client_myds->myconn->userinfo, newsess);
newsess->to_process=1;
newsess->mirror_hostgroup=qpo->mirror_hostgroup; // in the new session we copy the mirror hostgroup
newsess->mirror_flagOUT=qpo->mirror_flagOUT; // in the new session we copy the mirror flagOUT
newsess->default_schema=strdup(default_schema);
newsess->mirror=true;
newsess->mirrorPkt.size=pkt.size;
@ -1775,7 +1781,7 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
delete qpo->new_query;
}
if (mirror==true) { // for mirror session we exit here
current_hostgroup=mirror_hostgroup;
current_hostgroup=qpo->destination_hostgroup;
return false;
}
if (qpo->cache_ttl>0) {

@ -58,7 +58,7 @@ pthread_mutex_t admin_mutex = PTHREAD_MUTEX_INITIALIZER;
#define ADMIN_SQLITE_TABLE_MYSQL_SERVERS "CREATE TABLE mysql_servers (hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , status VARCHAR CHECK (UPPER(status) IN ('ONLINE','SHUNNED','OFFLINE_SOFT', 'OFFLINE_HARD')) NOT NULL DEFAULT 'ONLINE' , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0 , PRIMARY KEY (hostgroup_id, hostname, port) )"
#define ADMIN_SQLITE_TABLE_MYSQL_USERS "CREATE TABLE mysql_users (username VARCHAR NOT NULL , password VARCHAR , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1 , use_ssl INT CHECK (use_ssl IN (0,1)) NOT NULL DEFAULT 0 , default_hostgroup INT NOT NULL DEFAULT 0 , default_schema VARCHAR , schema_locked INT CHECK (schema_locked IN (0,1)) NOT NULL DEFAULT 0 , transaction_persistent INT CHECK (transaction_persistent IN (0,1)) NOT NULL DEFAULT 0 , fast_forward INT CHECK (fast_forward IN (0,1)) NOT NULL DEFAULT 0 , backend INT CHECK (backend IN (0,1)) NOT NULL DEFAULT 1 , frontend INT CHECK (frontend IN (0,1)) NOT NULL DEFAULT 1 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 10000 , PRIMARY KEY (username, backend) , UNIQUE (username, frontend))"
#define ADMIN_SQLITE_TABLE_MYSQL_QUERY_RULES "CREATE TABLE mysql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 , username VARCHAR , schemaname VARCHAR , flagIN INT NOT NULL DEFAULT 0 , match_digest VARCHAR , match_pattern VARCHAR , negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 , flagOUT INT , replace_pattern VARCHAR , destination_hostgroup INT DEFAULT NULL , cache_ttl INT CHECK(cache_ttl > 0) , reconnect INT CHECK (reconnect IN (0,1)) DEFAULT NULL , timeout INT UNSIGNED , delay INT UNSIGNED , mirror_hostgroup INT UNSIGNED , error_msg VARCHAR , apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0)"
#define ADMIN_SQLITE_TABLE_MYSQL_QUERY_RULES "CREATE TABLE mysql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 , username VARCHAR , schemaname VARCHAR , flagIN INT NOT NULL DEFAULT 0 , match_digest VARCHAR , match_pattern VARCHAR , negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 , flagOUT INT , replace_pattern VARCHAR , destination_hostgroup INT DEFAULT NULL , cache_ttl INT CHECK(cache_ttl > 0) , reconnect INT CHECK (reconnect IN (0,1)) DEFAULT NULL , timeout INT UNSIGNED , delay INT UNSIGNED , mirror_flagOUT INT UNSIGNED , mirror_hostgroup INT UNSIGNED , error_msg VARCHAR , apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0)"
#define ADMIN_SQLITE_TABLE_GLOBAL_VARIABLES "CREATE TABLE global_variables (variable_name VARCHAR NOT NULL PRIMARY KEY , variable_value VARCHAR NOT NULL)"
#define ADMIN_SQLITE_TABLE_MYSQL_REPLICATION_HOSTGROUPS "CREATE TABLE mysql_replication_hostgroups (writer_hostgroup INT CHECK (writer_hostgroup>=0) NOT NULL PRIMARY KEY , reader_hostgroup INT NOT NULL CHECK (reader_hostgroup<>writer_hostgroup AND reader_hostgroup>0) , UNIQUE (reader_hostgroup))"
@ -69,7 +69,7 @@ pthread_mutex_t admin_mutex = PTHREAD_MUTEX_INITIALIZER;
#define ADMIN_SQLITE_TABLE_RUNTIME_MYSQL_REPLICATION_HOSTGROUPS "CREATE TABLE runtime_mysql_replication_hostgroups (writer_hostgroup INT CHECK (writer_hostgroup>=0) NOT NULL PRIMARY KEY , reader_hostgroup INT NOT NULL CHECK (reader_hostgroup<>writer_hostgroup AND reader_hostgroup>0) , UNIQUE (reader_hostgroup))"
#define ADMIN_SQLITE_TABLE_RUNTIME_MYSQL_QUERY_RULES "CREATE TABLE runtime_mysql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 , username VARCHAR , schemaname VARCHAR , flagIN INT NOT NULL DEFAULT 0 , match_digest VARCHAR , match_pattern VARCHAR , negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 , flagOUT INT , replace_pattern VARCHAR , destination_hostgroup INT DEFAULT NULL , cache_ttl INT CHECK(cache_ttl > 0) , reconnect INT CHECK (reconnect IN (0,1)) DEFAULT NULL , timeout INT UNSIGNED , delay INT UNSIGNED , mirror_hostgroup INT UNSIGNED , error_msg VARCHAR , apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0)"
#define ADMIN_SQLITE_TABLE_RUNTIME_MYSQL_QUERY_RULES "CREATE TABLE runtime_mysql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 , username VARCHAR , schemaname VARCHAR , flagIN INT NOT NULL DEFAULT 0 , match_digest VARCHAR , match_pattern VARCHAR , negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 , flagOUT INT , replace_pattern VARCHAR , destination_hostgroup INT DEFAULT NULL , cache_ttl INT CHECK(cache_ttl > 0) , reconnect INT CHECK (reconnect IN (0,1)) DEFAULT NULL , timeout INT UNSIGNED , delay INT UNSIGNED , mirror_flagOUT INT UNSIGNED , mirror_hostgroup INT UNSIGNED , error_msg VARCHAR , apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0)"
#define STATS_SQLITE_TABLE_MYSQL_QUERY_RULES "CREATE TABLE stats_mysql_query_rules (rule_id INTEGER PRIMARY KEY , hits INT NOT NULL)"
#define STATS_SQLITE_TABLE_MYSQL_COMMANDS_COUNTERS "CREATE TABLE stats_mysql_commands_counters (Command VARCHAR NOT NULL PRIMARY KEY , Total_Time_us INT NOT NULL , Total_cnt INT NOT NULL , cnt_100us INT NOT NULL , cnt_500us INT NOT NULL , cnt_1ms INT NOT NULL , cnt_5ms INT NOT NULL , cnt_10ms INT NOT NULL , cnt_50ms INT NOT NULL , cnt_100ms INT NOT NULL , cnt_500ms INT NOT NULL , cnt_1s INT NOT NULL , cnt_5s INT NOT NULL , cnt_10s INT NOT NULL , cnt_INFs)"
@ -2883,15 +2883,15 @@ void ProxySQL_Admin::save_mysql_query_rules_from_runtime(bool _runtime) {
//char *a=(char *)"INSERT INTO mysql_query_rules VALUES (\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\")";
char *a=NULL;
if (_runtime) {
a=(char *)"INSERT INTO runtime_mysql_query_rules (rule_id, active, username, schemaname, flagIN, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, delay, mirror_hostgroup, error_msg, apply) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)";
a=(char *)"INSERT INTO runtime_mysql_query_rules (rule_id, active, username, schemaname, flagIN, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, delay, mirror_flagOUT, mirror_hostgroup, error_msg, apply) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)";
} else {
a=(char *)"INSERT INTO mysql_query_rules (rule_id, active, username, schemaname, flagIN, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, delay, mirror_hostgroup, error_msg, apply) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)";
a=(char *)"INSERT INTO mysql_query_rules (rule_id, active, username, schemaname, flagIN, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, delay, mirror_flagOUT, mirror_hostgroup, error_msg, apply) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)";
}
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
int arg_len=0;
char *buffs[18];
for (int i=0; i<18; i++) {
char *buffs[19];
for (int i=0; i<19; i++) {
if (r->fields[i]) {
int l=strlen(r->fields[i])+4;
arg_len+=l;
@ -2922,13 +2922,14 @@ void ProxySQL_Admin::save_mysql_query_rules_from_runtime(bool _runtime) {
( strcmp(r->fields[12],"-1")==0 ? "NULL" : r->fields[12] ), // reconnect
( strcmp(r->fields[13],"-1")==0 ? "NULL" : r->fields[13] ), // timeout
( strcmp(r->fields[14],"-1")==0 ? "NULL" : r->fields[14] ), // delay
( strcmp(r->fields[15],"-1")==0 ? "NULL" : r->fields[15] ), // mirror_hostgroup
buffs[16], // error_msg
( strcmp(r->fields[17],"-1")==0 ? "NULL" : r->fields[17] ) // apply
( strcmp(r->fields[15],"-1")==0 ? "NULL" : r->fields[15] ), // mirror_flagOUT
( strcmp(r->fields[16],"-1")==0 ? "NULL" : r->fields[16] ), // mirror_hostgroup
buffs[17], // error_msg
( strcmp(r->fields[18],"-1")==0 ? "NULL" : r->fields[18] ) // apply
);
//fprintf(stderr,"%s\n",query);
admindb->execute(query);
for (int i=0; i<18; i++) {
for (int i=0; i<19; i++) {
free(buffs[i]);
}
free(query);
@ -3445,7 +3446,7 @@ char * ProxySQL_Admin::load_mysql_query_rules_to_runtime() {
int affected_rows=0;
if (GloQPro==NULL) return (char *)"Global Query Processor not started: command impossible to run";
SQLite3_result *resultset=NULL;
char *query=(char *)"SELECT rule_id, username, schemaname, flagIN, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, delay, mirror_hostgroup, error_msg, apply FROM main.mysql_query_rules WHERE active=1";
char *query=(char *)"SELECT rule_id, username, schemaname, flagIN, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, delay, mirror_flagOUT, mirror_hostgroup, error_msg, apply FROM main.mysql_query_rules WHERE active=1";
admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
if (error) {
proxy_error("Error on %s : %s\n", query, error);
@ -3471,9 +3472,10 @@ char * ProxySQL_Admin::load_mysql_query_rules_to_runtime() {
(r->fields[11]==NULL ? -1 : atol(r->fields[11])),
(r->fields[12]==NULL ? -1 : atol(r->fields[12])),
(r->fields[13]==NULL ? -1 : atol(r->fields[13])),
(r->fields[14]==NULL ? -1 : atol(r->fields[14])), // mirror_hostgroup
r->fields[15], // error_msg
(atoi(r->fields[16])==1 ? true : false)
(r->fields[14]==NULL ? -1 : atol(r->fields[14])), // mirror_flagOUT
(r->fields[15]==NULL ? -1 : atol(r->fields[15])), // mirror_hostgroup
r->fields[16], // error_msg
(atoi(r->fields[17])==1 ? true : false)
);
GloQPro->insert(nqpr, false);
}
@ -3577,7 +3579,7 @@ int ProxySQL_Admin::Read_MySQL_Query_Rules_from_configfile() {
int i;
int rows=0;
admindb->execute("PRAGMA foreign_keys = OFF");
char *q=(char *)"INSERT OR REPLACE INTO mysql_query_rules (rule_id, active, username, schemaname, flagIN, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, delay, mirror_hostgroup, error_msg, apply) VALUES (%d, %d, %s, %s, %s, %s, %s, %d, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d)";
char *q=(char *)"INSERT OR REPLACE INTO mysql_query_rules (rule_id, active, username, schemaname, flagIN, match_digest, match_pattern, negate_match_pattern, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, delay, mirror_flagOUT, mirror_hostgroup, error_msg, apply) VALUES (%d, %d, %s, %s, %s, %s, %s, %d, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d)";
for (i=0; i< count; i++) {
const Setting &rule = mysql_query_rules[i];
int rule_id;
@ -3596,6 +3598,7 @@ int ProxySQL_Admin::Read_MySQL_Query_Rules_from_configfile() {
bool replace_pattern_exists=false;
std::string replace_pattern;
int destination_hostgroup=-1;
int mirror_flagOUT=-1;
int mirror_hostgroup=-1;
int cache_ttl=-1;
int reconnect=-1;
@ -3615,6 +3618,7 @@ int ProxySQL_Admin::Read_MySQL_Query_Rules_from_configfile() {
rule.lookupValue("flagOUT", flagOUT);
if (rule.lookupValue("replace_pattern", replace_pattern)) replace_pattern_exists=true;
rule.lookupValue("destination_hostgroup", destination_hostgroup);
rule.lookupValue("mirror_flagOUT", mirror_flagOUT);
rule.lookupValue("mirror_hostgroup", mirror_hostgroup);
rule.lookupValue("cache_ttl", cache_ttl);
rule.lookupValue("reconnect", reconnect);
@ -3639,6 +3643,7 @@ int ProxySQL_Admin::Read_MySQL_Query_Rules_from_configfile() {
strlen(std::to_string(cache_ttl).c_str()) + 4 +
strlen(std::to_string(reconnect).c_str()) + 4 +
strlen(std::to_string(timeout).c_str()) + 4 +
strlen(std::to_string(mirror_flagOUT).c_str()) + 4 +
strlen(std::to_string(mirror_hostgroup).c_str()) + 4 +
strlen(std::to_string(delay).c_str()) + 4 +
( error_msg_exists ? strlen(error_msg.c_str()) : 0 ) + 4 +
@ -3684,6 +3689,7 @@ int ProxySQL_Admin::Read_MySQL_Query_Rules_from_configfile() {
( reconnect >= 0 ? std::to_string(reconnect).c_str() : "NULL") ,
( timeout >= 0 ? std::to_string(timeout).c_str() : "NULL") ,
( delay >= 0 ? std::to_string(delay).c_str() : "NULL") ,
( mirror_flagOUT >= 0 ? std::to_string(mirror_flagOUT).c_str() : "NULL") ,
( mirror_hostgroup >= 0 ? std::to_string(mirror_hostgroup).c_str() : "NULL") ,
error_msg.c_str(),
( apply == 0 ? 0 : 1)

@ -37,7 +37,7 @@ class QP_rule_text {
char **pta;
int num_fields;
QP_rule_text(QP_rule_t *QPr) {
num_fields=19;
num_fields=20;
pta=NULL;
pta=(char **)malloc(sizeof(char *)*num_fields);
itostr(pta[0], (long long)QPr->rule_id);
@ -55,10 +55,11 @@ class QP_rule_text {
itostr(pta[12], (long long)QPr->reconnect);
itostr(pta[13], (long long)QPr->timeout);
itostr(pta[14], (long long)QPr->delay);
itostr(pta[15], (long long)QPr->mirror_hostgroup);
pta[16]=strdup_null(QPr->error_msg);
itostr(pta[17], (long long)QPr->apply);
itostr(pta[18], (long long)QPr->hits);
itostr(pta[15], (long long)QPr->mirror_flagOUT);
itostr(pta[16], (long long)QPr->mirror_hostgroup);
pta[17]=strdup_null(QPr->error_msg);
itostr(pta[18], (long long)QPr->apply);
itostr(pta[19], (long long)QPr->hits);
}
~QP_rule_text() {
for(int i=0; i<num_fields; i++) {
@ -424,7 +425,7 @@ void Query_Processor::wrunlock() {
QP_rule_t * Query_Processor::new_query_rule(int rule_id, bool active, char *username, char *schemaname, int flagIN, char *match_digest, char *match_pattern, bool negate_match_pattern, int flagOUT, char *replace_pattern, int destination_hostgroup, int cache_ttl, int reconnect, int timeout, int delay, int mirror_hostgroup, char *error_msg, bool apply) {
QP_rule_t * Query_Processor::new_query_rule(int rule_id, bool active, char *username, char *schemaname, int flagIN, char *match_digest, char *match_pattern, bool negate_match_pattern, int flagOUT, char *replace_pattern, int destination_hostgroup, int cache_ttl, int reconnect, int timeout, int delay, int mirror_flagOUT, int mirror_hostgroup, char *error_msg, bool apply) {
QP_rule_t * newQR=(QP_rule_t *)malloc(sizeof(QP_rule_t));
newQR->rule_id=rule_id;
newQR->active=active;
@ -441,6 +442,7 @@ QP_rule_t * Query_Processor::new_query_rule(int rule_id, bool active, char *user
newQR->reconnect=reconnect;
newQR->timeout=timeout;
newQR->delay=delay;
newQR->mirror_flagOUT=mirror_flagOUT;
newQR->mirror_hostgroup=mirror_hostgroup;
newQR->error_msg=(error_msg ? strdup(error_msg) : NULL);
newQR->apply=apply;
@ -535,7 +537,7 @@ SQLite3_result * Query_Processor::get_stats_query_rules() {
SQLite3_result * Query_Processor::get_current_query_rules() {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping current query rules, using Global version %d\n", version);
SQLite3_result *result=new SQLite3_result(19);
SQLite3_result *result=new SQLite3_result(20);
spin_rdlock(&rwlock);
QP_rule_t *qr1;
result->add_column_definition(SQLITE_TEXT,"rule_id");
@ -553,6 +555,7 @@ SQLite3_result * Query_Processor::get_current_query_rules() {
result->add_column_definition(SQLITE_TEXT,"reconnect");
result->add_column_definition(SQLITE_TEXT,"timeout");
result->add_column_definition(SQLITE_TEXT,"delay");
result->add_column_definition(SQLITE_TEXT,"mirror_flagOUT");
result->add_column_definition(SQLITE_TEXT,"mirror_hostgroup");
result->add_column_definition(SQLITE_TEXT,"error_msg");
result->add_column_definition(SQLITE_TEXT,"apply");
@ -647,7 +650,7 @@ Query_Processor_Output * Query_Processor::process_mysql_query(MySQL_Session *ses
qr1=*it;
if (qr1->active) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Copying Query Rule id: %d\n", qr1->rule_id);
qr2=new_query_rule(qr1->rule_id, qr1->active, qr1->username, qr1->schemaname, qr1->flagIN, qr1->match_digest, qr1->match_pattern, qr1->negate_match_pattern, qr1->flagOUT, qr1->replace_pattern, qr1->destination_hostgroup, qr1->cache_ttl, qr1->reconnect, qr1->timeout, qr1->delay, qr1->mirror_hostgroup, qr1->error_msg, qr1->apply);
qr2=new_query_rule(qr1->rule_id, qr1->active, qr1->username, qr1->schemaname, qr1->flagIN, qr1->match_digest, qr1->match_pattern, qr1->negate_match_pattern, qr1->flagOUT, qr1->replace_pattern, qr1->destination_hostgroup, qr1->cache_ttl, qr1->reconnect, qr1->timeout, qr1->delay, qr1->mirror_flagOUT, qr1->mirror_hostgroup, qr1->error_msg, qr1->apply);
qr2->parent=qr1; // pointer to parent to speed up parent update (hits)
if (qr2->match_digest) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Compiling regex for rule_id: %d, match_digest: \n", qr2->rule_id, qr2->match_digest);
@ -665,6 +668,20 @@ Query_Processor_Output * Query_Processor::process_mysql_query(MySQL_Session *ses
QP_rule_t *qr;
re2_t *re2p;
int flagIN=0;
if (sess->mirror==true) {
// we are into a mirror session
// we immediately set a destination_hostgroup
ret->destination_hostgroup=sess->mirror_hostgroup;
if (sess->mirror_flagOUT != -1) {
// the original session has set a mirror flagOUT
flagIN=sess->mirror_flagOUT;
} else {
// the original session did NOT set any mirror flagOUT
// so we exit here
// the only thing set so far is destination_hostgroup
goto __exit_process_mysql_query;
}
}
for (std::vector<QP_rule_t *>::iterator it=_thr_SQP_rules->begin(); it!=_thr_SQP_rules->end(); ++it) {
qr=*it;
if (qr->flagIN != flagIN) {
@ -762,6 +779,11 @@ Query_Processor_Output * Query_Processor::process_mysql_query(MySQL_Session *ses
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set delay: %d. Session will%s be paused for %dms\n", qr->rule_id, qr->delay, (qr->delay == 0 ? " NOT" : "" ) , qr->delay);
ret->delay=qr->delay;
}
if (qr->mirror_flagOUT >= 0) {
// Note: negative mirror_flagOUT means this rule doesn't change the mirror flagOUT
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set mirror flagOUT: %d\n", qr->rule_id, qr->mirror_flagOUT);
ret->mirror_flagOUT=qr->mirror_flagOUT;
}
if (qr->mirror_hostgroup >= 0) {
// Note: negative mirror_hostgroup means this rule doesn't change the mirror
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set mirror hostgroup: %d. A new session will be created\n", qr->rule_id, qr->mirror_hostgroup);
@ -798,9 +820,11 @@ Query_Processor_Output * Query_Processor::process_mysql_query(MySQL_Session *ses
__exit_process_mysql_query:
// FIXME : there is too much data being copied around
l_free(len+1,query);
if (qp && qp->first_comment) {
// we have a comment to parse
query_parser_first_comment(ret, qp->first_comment);
if (sess->mirror==false) { // we process comments only on original queries, not on mirrors
if (qp && qp->first_comment) {
// we have a comment to parse
query_parser_first_comment(ret, qp->first_comment);
}
}
return ret;
};

Loading…
Cancel
Save