|
|
|
|
@ -30,6 +30,7 @@ static re2_t * compile_query_rule(QP_rule_t *qr) {
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
static void __delete_query_rule(QP_rule_t *qr) {
|
|
|
|
|
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Deleting rule in %p : rule_id:%d, active:%d, username=%s, schemaname=%s, flagIN:%d, %smatch_pattern=\"%s\", flagOUT:%d replace_pattern=\"%s\", destination_hostgroup:%d, apply:%d\n", qr, qr->rule_id, qr->active, qr->username, qr->schemaname, qr->flagIN, (qr->negate_match_pattern ? "(!)" : "") , qr->match_pattern, qr->flagOUT, qr->replace_pattern, qr->destination_hostgroup, qr->apply);
|
|
|
|
|
if (qr->username)
|
|
|
|
|
free(qr->username);
|
|
|
|
|
if (qr->schemaname)
|
|
|
|
|
@ -47,7 +48,11 @@ static void __delete_query_rule(QP_rule_t *qr) {
|
|
|
|
|
free(qr);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// delete all the query rules in a Query Processor Table
|
|
|
|
|
// Note that this function is called by GloQPro with &rules (generic table)
|
|
|
|
|
// and is called by each mysql thread with _thr_SQP_rules (per thread table)
|
|
|
|
|
static void __reset_rules(std::vector<QP_rule_t *> * qrs) {
|
|
|
|
|
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Resetting rules in Query Processor Table %p\n", qrs);
|
|
|
|
|
if (qrs==NULL) return;
|
|
|
|
|
QP_rule_t *qr;
|
|
|
|
|
for (std::vector<QP_rule_t *>::iterator it=qrs->begin(); it!=qrs->end(); ++it) {
|
|
|
|
|
@ -58,6 +63,7 @@ static void __reset_rules(std::vector<QP_rule_t *> * qrs) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// per thread variables
|
|
|
|
|
__thread unsigned int _thr_SQP_version;
|
|
|
|
|
__thread std::vector<QP_rule_t *> * _thr_SQP_rules;
|
|
|
|
|
|
|
|
|
|
@ -72,6 +78,7 @@ protected:
|
|
|
|
|
|
|
|
|
|
public:
|
|
|
|
|
Standard_Query_Processor() {
|
|
|
|
|
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Initializing Query Processor with version=0\n");
|
|
|
|
|
spinlock_rwlock_init(&rwlock);
|
|
|
|
|
version=0;
|
|
|
|
|
};
|
|
|
|
|
@ -80,13 +87,16 @@ virtual ~Standard_Query_Processor() {
|
|
|
|
|
__reset_rules(&rules);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// This function is called by each thread when it starts. It create a Query Processor Table for each thread
|
|
|
|
|
virtual void init_thread() {
|
|
|
|
|
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Initializing Per-Thread Query Processor Table with version=0\n");
|
|
|
|
|
_thr_SQP_version=0;
|
|
|
|
|
_thr_SQP_rules=new std::vector<QP_rule_t *>;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
virtual void end_thread() {
|
|
|
|
|
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Destroying Per-Thread Query Processor Table with version=%d\n", _thr_SQP_version);
|
|
|
|
|
__reset_rules(_thr_SQP_rules);
|
|
|
|
|
delete _thr_SQP_rules;
|
|
|
|
|
};
|
|
|
|
|
@ -120,6 +130,7 @@ virtual QP_rule_t * new_query_rule(int rule_id, bool active, char *username, cha
|
|
|
|
|
newQR->cache_ttl=cache_ttl;
|
|
|
|
|
newQR->apply=apply;
|
|
|
|
|
newQR->regex_engine=NULL;
|
|
|
|
|
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Creating new rule in %p : rule_id:%d, active:%d, username=%s, schemaname=%s, flagIN:%d, %smatch_pattern=\"%s\", flagOUT:%d replace_pattern=\"%s\", destination_hostgroup:%d, apply:%d\n", newQR, newQR->rule_id, newQR->active, newQR->username, newQR->schemaname, newQR->flagIN, (newQR->negate_match_pattern ? "(!)" : "") , newQR->match_pattern, newQR->flagOUT, newQR->replace_pattern, newQR->destination_hostgroup, newQR->apply);
|
|
|
|
|
return newQR;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
@ -164,13 +175,17 @@ virtual bool insert(QP_rule_t *qr, bool lock) {
|
|
|
|
|
|
|
|
|
|
virtual void sort(bool lock) {
|
|
|
|
|
if (lock) spin_wrlock(&rwlock);
|
|
|
|
|
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Sorting rules\n");
|
|
|
|
|
std::sort (rules.begin(), rules.end(), rules_sort_comp_function);
|
|
|
|
|
if (lock) spin_wrunlock(&rwlock);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// when commit is called, the version number is increased and the this will trigger the mysql threads to get a new Query Processor Table
|
|
|
|
|
// The operation is asynchronous
|
|
|
|
|
virtual void commit() {
|
|
|
|
|
spin_wrlock(&rwlock);
|
|
|
|
|
__sync_add_and_fetch(&version,1);
|
|
|
|
|
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Increasing version number to %d - all threads will notice this and refresh their rules\n", version);
|
|
|
|
|
spin_wrunlock(&rwlock);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
@ -183,22 +198,25 @@ virtual QP_out_t * process_mysql_query(MySQL_Session *sess, void *ptr, unsigned
|
|
|
|
|
query[len]=0;
|
|
|
|
|
if (__sync_add_and_fetch(&version,0) > _thr_SQP_version) {
|
|
|
|
|
// update local rules;
|
|
|
|
|
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Detected a changed in version. Global:%d , local:%d . Refreshing...\n", version, _thr_SQP_version);
|
|
|
|
|
spin_rdlock(&rwlock);
|
|
|
|
|
_thr_SQP_version=__sync_add_and_fetch(&version,0);
|
|
|
|
|
__reset_rules(_thr_SQP_rules);
|
|
|
|
|
spin_rdunlock(&rwlock);
|
|
|
|
|
QP_rule_t *qr1;
|
|
|
|
|
QP_rule_t *qr2;
|
|
|
|
|
for (std::vector<QP_rule_t *>::iterator it=rules.begin(); it!=rules.end(); ++it) {
|
|
|
|
|
qr1=*it;
|
|
|
|
|
if (qr1->active) {
|
|
|
|
|
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Copying Query Rule id: %d\n", qr1->rule_id);
|
|
|
|
|
qr2=new_query_rule(qr1->rule_id, qr1->active, qr1->username, qr1->schemaname, qr1->flagIN, qr1->match_pattern, qr1->negate_match_pattern, qr1->flagOUT, qr1->replace_pattern, qr1->destination_hostgroup, qr1->cache_ttl, qr1->apply);
|
|
|
|
|
if (qr2->match_pattern) {
|
|
|
|
|
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Compiling regex for rule_id: %d, match_pattern: \n", qr2->rule_id, qr2->match_pattern);
|
|
|
|
|
qr2->regex_engine=(void *)compile_query_rule(qr2);
|
|
|
|
|
}
|
|
|
|
|
_thr_SQP_rules->push_back(qr2);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
spin_rdunlock(&rwlock); // unlock should be after the copy
|
|
|
|
|
}
|
|
|
|
|
QP_rule_t *qr;
|
|
|
|
|
re2_t *re2p;
|
|
|
|
|
@ -218,6 +236,7 @@ __exit_process_mysql_query:
|
|
|
|
|
return ret;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// this function is called by mysql_session to free the result generated by process_mysql_query()
|
|
|
|
|
virtual void delete_QP_out(QP_out_t *o) {
|
|
|
|
|
l_free(sizeof(QP_out_t),o);
|
|
|
|
|
};
|
|
|
|
|
|