Enhancement on Query Processor and draft statistics interface

- changed hits from uint64_t to int;
- QP_rule_t has a pointer to its parent : this is done to speed up lookup
- implemented get_current_query_rules() to execute SAVE MYSQL QUERY RULES FROM RUNTIME (issue #93)
- implemented get_stats_query_rules() to save statistics on stats_mysql_query_rules (issue #139)
- created 2 new classes to easily process statistics in Query Processor : QP_rule_text and QP_rule_text_hitsonly
- extended classes SQLite3_row and SQLite3_result
- every __admin_refresh_interval milliseconds Admin refreshes statistics
- defined table stats_mysql_query_rules
- deprecated __attach_configdb_to_admindb() , replaced with __attach_db_to_admindb (used in issue #142)
- to admindb are now attached both configdb and statsdb (issue #142)
- drafted a way to update hits on Query Processor . This method will go away when issue #146 will be implemented
- fixed crashing bug in issue #145
pull/190/head
René Cannaò 12 years ago
parent b01ab34e49
commit 322a4f021f

@ -0,0 +1,19 @@
This API should be simple.
Each module should collect statistics and stores them internally, preferibly
in a table format because this is how Admin will expect the results.
Each module should implement 2 functions:
- stats_get_tables_list() returns an array of pointer to variables name, where
the last element is NULL;
- stat_get_table() returns the content of such table in the form of a
SQLite3_result resultset
Important note:
The reads should be considered in read committed isolation level. It is not
possible to read multiple tables at the same time, therefore it is possible
that the relation between these tables are not consistent. Due the temporary
nature of statistics information, this glitch is absolutely expected.

@ -197,6 +197,7 @@ class MySQL_Connection_Pool;
class PtrArray;
class ProxySQL_ConfigFile;
class MySQL_Server;
class SQLite3_result;
//class MySQL_Servers;
class MySQL_Hostgroup_Entry;
class MySQL_Hostgroup;

@ -17,7 +17,8 @@ struct _Query_Processor_rule_t {
int cache_ttl;
bool apply;
void *regex_engine;
uint64_t hits;
int hits;
struct _Query_Processor_rule_t *parent; // pointer to parent, to speed up parent update
};
@ -59,8 +60,12 @@ class Query_Processor {
virtual void init_thread() {};
virtual void end_thread() {};
virtual void commit() {}; // this applies all the changes in memory
virtual SQLite3_result * get_current_query_rules() {return NULL;};
virtual SQLite3_result * get_stats_query_rules() {return NULL;};
};
typedef Query_Processor * create_Query_Processor_t();
#endif /* __CLASS_QUERY_PROCESSOR_H */

@ -47,6 +47,18 @@ class SQLite3_row {
//fields[i]=(sizes[i] ? strdup((char *)sqlite3_column_text(stmt,i)) : NULL);
}
};
void add_fields(char **_fields) {
int i;
for (i=0;i<cnt;i++) {
if (_fields[i]) {
sizes[i]=strlen(_fields[i]);
fields[i]=strdup(_fields[i]);
} else {
sizes[i]=0;
fields[i]=strdup((char *)"");
}
}
};
};
class SQLite3_column {
@ -83,7 +95,14 @@ class SQLite3_result {
row->add_fields(stmt);
rows.push_back(row);
rows_count++;
return SQLITE_ROW;
return SQLITE_ROW;
};
int add_row(char **_fields) {
SQLite3_row *row=new SQLite3_row(columns);
row->add_fields(_fields);
rows.push_back(row);
rows_count++;
return SQLITE_ROW;
};
SQLite3_result(sqlite3_stmt *stmt) {
rows_count=0;
@ -93,6 +112,10 @@ class SQLite3_result {
}
while (add_row(stmt)==SQLITE_ROW) {};
};
SQLite3_result(int num_columns) {
rows_count=0;
columns=num_columns;
};
~SQLite3_result() {
for (std::vector<SQLite3_column *>::iterator it = column_definition.begin() ; it != column_definition.end(); ++it) {
SQLite3_column *c=*it;

@ -20,6 +20,9 @@
static volatile int load_main_=0;
static volatile bool nostart_=false;
static int __admin_refresh_interval=0;
extern MySQL_Authentication *GloMyAuth;
extern ProxySQL_Admin *GloAdmin;
extern Query_Processor *GloQPro;
@ -42,6 +45,11 @@ pthread_mutex_t sock_mutex = PTHREAD_MUTEX_INITIALIZER;
#define ADMIN_SQLITE_TABLE_MYSQL_QUERY_RULES "CREATE TABLE mysql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0, username VARCHAR, schemaname VARCHAR, flagIN INT NOT NULL DEFAULT 0, match_pattern VARCHAR, negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0, flagOUT INT, replace_pattern VARCHAR, destination_hostgroup INT DEFAULT NULL, cache_ttl INT CHECK(cache_ttl > 0), apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0, FOREIGN KEY (destination_hostgroup) REFERENCES mysql_hostgroups (hostgroup_id))"
#define ADMIN_SQLITE_TABLE_GLOBAL_VARIABLES "CREATE TABLE global_variables (variable_name VARCHAR NOT NULL PRIMARY KEY, variable_value VARCHAR NOT NULL)"
#define STATS_SQLITE_TABLE_MYSQL_QUERY_RULES "CREATE TABLE stats_mysql_query_rules (rule_id INTEGER PRIMARY KEY, hits INT NOT NULL)"
#ifdef DEBUG
#define ADMIN_SQLITE_TABLE_DEBUG_LEVELS "CREATE TABLE debug_levels (module VARCHAR NOT NULL PRIMARY KEY, verbosity INT NOT NULL DEFAULT 0)"
#endif /* DEBUG */
@ -171,7 +179,8 @@ class Standard_ProxySQL_Admin: public ProxySQL_Admin {
void __insert_or_replace_maintable_select_disktable();
void __delete_disktable();
void __insert_or_replace_disktable_select_maintable();
void __attach_configdb_to_admindb();
// void __attach_configdb_to_admindb();
void __attach_db_to_admindb(SQLite3DB *db, char *alias);
void __add_active_users(enum cred_username_type usertype);
void __delete_inactive_users(enum cred_username_type usertype);
@ -226,6 +235,7 @@ class Standard_ProxySQL_Admin: public ProxySQL_Admin {
void load_mysql_servers_to_runtime();
void save_mysql_servers_from_runtime();
char * load_mysql_query_rules_to_runtime();
void save_mysql_query_rules_from_runtime();
void load_admin_variables_to_runtime() { flush_admin_variables___database_to_runtime(admindb, true); }
void save_admin_variables_from_runtime() { flush_admin_variables___runtime_to_database(admindb, true, true, false); }
@ -233,6 +243,9 @@ class Standard_ProxySQL_Admin: public ProxySQL_Admin {
void load_mysql_variables_to_runtime() { flush_mysql_variables___database_to_runtime(admindb, true); }
void save_mysql_variables_from_runtime() { flush_mysql_variables___runtime_to_database(admindb, true, true, false); }
void stats___mysql_query_rules();
};
static Standard_ProxySQL_Admin *SPA=NULL;
@ -779,7 +792,6 @@ bool admin_handler_command_load_or_save(char *query_no_space, unsigned int query
return false;
}
/*
if (
(query_no_space_length==strlen("SAVE MYSQL QUERY RULES TO MEMORY") && !strncasecmp("SAVE MYSQL QUERY RULES TO MEMORY",query_no_space, query_no_space_length))
||
@ -791,12 +803,11 @@ bool admin_handler_command_load_or_save(char *query_no_space, unsigned int query
) {
proxy_debug(PROXY_DEBUG_ADMIN, 4, "Received %s command\n", query_no_space);
Standard_ProxySQL_Admin *SPA=(Standard_ProxySQL_Admin *)pa;
SPA->save_mysql_users_runtime_to_database();
proxy_debug(PROXY_DEBUG_ADMIN, 4, "Saved mysql users from RUNTIME\n");
SPA->save_mysql_query_rules_from_runtime();
proxy_debug(PROXY_DEBUG_ADMIN, 4, "Saved mysql query rules from RUNTIME\n");
SPA->send_MySQL_OK(&sess->myprot_client, NULL);
return false;
}
*/
}
if ((query_no_space_length>21) && ( (!strncasecmp("SAVE ADMIN VARIABLES ", query_no_space, 21)) || (!strncasecmp("LOAD ADMIN VARIABLES ", query_no_space, 21))) ) {
@ -1018,6 +1029,9 @@ void *child_mysql(void *arg) {
//sess->myprot_client.generate_pkt_initial_handshake(sess->client_myds,true,NULL,NULL);
sess->myprot_client.generate_pkt_initial_handshake(true,NULL,NULL);
unsigned long oldtime=monotonic_time();
unsigned long curtime=monotonic_time();
while (__sync_fetch_and_add(&glovars.shutdown,0)==0) {
if (myds->available_data_out()) {
@ -1026,7 +1040,17 @@ void *child_mysql(void *arg) {
fds[0].events=POLLIN;
}
fds[0].revents=0;
rc=poll(fds,nfds,2000);
//rc=poll(fds,nfds,2000);
rc=poll(fds,nfds,__sync_fetch_and_add(&__admin_refresh_interval,0));
{
//FIXME: cleanup this block
curtime=monotonic_time();
if (curtime>oldtime+__admin_refresh_interval) {
oldtime=curtime;
Standard_ProxySQL_Admin *SPA=(Standard_ProxySQL_Admin *)GloAdmin;
SPA->stats___mysql_query_rules();
}
}
if (rc == -1) {
if (errno == EINTR) {
continue;
@ -1343,10 +1367,16 @@ bool Standard_ProxySQL_Admin::init() {
#endif /* DEBUG */
insert_into_tables_defs(tables_defs_stats,"mysql_query_rules", STATS_SQLITE_TABLE_MYSQL_QUERY_RULES);
check_and_build_standard_tables(admindb, tables_defs_admin);
check_and_build_standard_tables(configdb, tables_defs_config);
check_and_build_standard_tables(statsdb, tables_defs_stats);
__attach_configdb_to_admindb();
//__attach_configdb_to_admindb();
__attach_db_to_admindb(configdb, (char *)"disk");
__attach_db_to_admindb(statsdb, (char *)"stats");
#ifdef DEBUG
flush_debug_levels_runtime_to_database(configdb, false);
flush_debug_levels_runtime_to_database(admindb, true);
@ -1819,6 +1849,7 @@ bool Standard_ProxySQL_Admin::set_variable(char *name, char *value) { // this i
int intv=atoi(value);
if (intv > 100 && intv < 100000) {
variables.refresh_interval=intv;
__admin_refresh_interval=intv;
return true;
} else {
return false;
@ -1841,6 +1872,55 @@ bool Standard_ProxySQL_Admin::set_variable(char *name, char *value) { // this i
}
void Standard_ProxySQL_Admin::stats___mysql_query_rules() {
SQLite3_result * resultset=GloQPro->get_stats_query_rules();
if (resultset==NULL) return;
// fprintf(stderr,"Number of columns: %d, rows: %d\n", result->columns, result->rows_count);
statsdb->execute("DELETE FROM stats_mysql_query_rules");
char *a=(char *)"INSERT INTO stats_mysql_query_rules VALUES (\"%s\",\"%s\")";
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
int arg_len=0;
for (int i=0; i<2; i++) {
arg_len+=strlen(r->fields[i]);
}
char *query=(char *)malloc(strlen(a)+arg_len+32);
sprintf(query,a,r->fields[0],r->fields[1]);
//fprintf(stderr,"%s\n",query);
statsdb->execute(query);
free(query);
}
delete resultset;
}
void Standard_ProxySQL_Admin::save_mysql_query_rules_from_runtime() {
SQLite3_result * resultset=GloQPro->get_current_query_rules();
if (resultset==NULL) return;
// fprintf(stderr,"Number of columns: %d, rows: %d\n", result->columns, result->rows_count);
admindb->execute("DELETE FROM mysql_query_rules");
char *a=(char *)"INSERT INTO mysql_query_rules VALUES (\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\")";
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
int arg_len=0;
for (int i=0; i<12; i++) {
arg_len+=strlen(r->fields[i]);
}
char *query=(char *)malloc(strlen(a)+arg_len+32);
sprintf(query,a,r->fields[0],r->fields[1],r->fields[2],r->fields[3],r->fields[4],r->fields[5],r->fields[6],r->fields[7],r->fields[8],r->fields[9],r->fields[10],r->fields[11]);
fprintf(stderr,"%s\n",query);
admindb->execute(query);
free(query);
}
//admindb->execute("UPDATE mysql_query_rules SET username=NULL WHERE username=\"\"");
//admindb->execute("UPDATE mysql_query_rules SET schemaname=NULL WHERE schemaname=\"\"");
delete resultset;
}
void Standard_ProxySQL_Admin::flush_admin_variables___runtime_to_database(SQLite3DB *db, bool replace, bool del, bool onlyifempty) {
proxy_debug(PROXY_DEBUG_ADMIN, 4, "Flushing ADMIN variables. Replace:%d, Delete:%d, Only_If_Empty:%d\n", replace, del, onlyifempty);
if (onlyifempty) {
@ -2039,13 +2119,21 @@ void Standard_ProxySQL_Admin::flush_mysql_query_rules__from_memory_to_disk() {
void Standard_ProxySQL_Admin::__attach_configdb_to_admindb() {
const char *a="ATTACH DATABASE '%s' AS disk";
void Standard_ProxySQL_Admin::__attach_db_to_admindb(SQLite3DB *db, char *alias) {
/*
* const char *a="ATTACH DATABASE '%s' AS disk";
int l=strlen(a)+strlen(configdb->get_url())+5;
char *cmd=(char *)malloc(l);
sprintf(cmd,a,configdb->get_url());
admindb->execute(cmd);
free(cmd);
*/
const char *a="ATTACH DATABASE '%s' AS %s";
int l=strlen(a)+strlen(db->get_url())+strlen(alias)+5;
char *cmd=(char *)malloc(l);
sprintf(cmd,a,db->get_url(), alias);
admindb->execute(cmd);
free(cmd);
}

@ -11,6 +11,102 @@
#define QUERY_PROCESSOR_VERSION "0.1.728"
#define strdup_null(__c) ( __c ? strdup(__c) : __c )
#define char_malloc (char *)malloc
#define free_null(__c) { if(__c) { free(__c); __c=NULL; } }
#define itostr(__s, __i) { __s=char_malloc(16); sprintf(__s, "%d", __i); }
class QP_rule_text_hitsonly {
public:
char **pta;
QP_rule_text_hitsonly(QP_rule_t *QPr) {
pta=NULL;
pta=(char **)malloc(sizeof(char *)*2);
itostr(pta[0], QPr->rule_id);
itostr(pta[1], QPr->hits);
}
~QP_rule_text_hitsonly() {
for(int i=0; i<2; i++) {
free_null(pta[i]);
}
free(pta);
}
};
class QP_rule_text {
public:
char **pta;
/*
char * rule_id;
char * active;
char * username;
char * schemaname;
char * flagIN;
char * match_pattern;
char * negate_match_pattern;
char * flagOUT;
char * replace_pattern;
char * destination_hostgroup;
char * cache_ttl;
char * apply;
char * hits;
*/
QP_rule_text(QP_rule_t *QPr) {
pta=NULL;
pta=(char **)malloc(sizeof(char *)*13);
itostr(pta[0], QPr->rule_id);
itostr(pta[1], QPr->active);
pta[2]=strdup_null(QPr->username);
pta[3]=strdup_null(QPr->schemaname);
itostr(pta[4], QPr->flagIN);
pta[5]=strdup_null(QPr->match_pattern);
itostr(pta[6], QPr->negate_match_pattern);
itostr(pta[7], QPr->flagOUT);
pta[8]=strdup_null(QPr->replace_pattern);
itostr(pta[9], QPr->destination_hostgroup);
itostr(pta[10], QPr->cache_ttl);
itostr(pta[11], QPr->apply);
itostr(pta[12], QPr->hits);
/*
itostr(rule_id, QPr->rule_id);
itostr(active, QPr->active);
username=strdup_null(QPr->username);
schemaname=strdup_null(QPr->schemaname);
itostr(flagIN, QPr->flagIN);
match_pattern=strdup_null(QPr->match_pattern);
itostr(negate_match_pattern, QPr->negate_match_pattern);
itostr(flagOUT, QPr->flagOUT);
replace_pattern=strdup_null(QPr->replace_pattern);
itostr(destination_hostgroup, QPr->destination_hostgroup);
itostr(cache_ttl, QPr->cache_ttl);
itostr(apply, QPr->apply);
itostr(hits, QPr->hits);
*/
}
~QP_rule_text() {
for(int i=0; i<13; i++) {
free_null(pta[i]);
}
free(pta);
/*
free_null(rule_id);
free_null(active);
free_null(username);
free_null(schemaname);
free_null(flagIN);
free_null(match_pattern);
free_null(negate_match_pattern);
free_null(flagOUT);
free_null(replace_pattern);
free_null(destination_hostgroup);
free_null(cache_ttl);
free_null(apply);
free_null(hits);
*/
}
};
struct __RE2_objects_t {
re2::RE2::Options *opt;
@ -191,6 +287,56 @@ virtual void commit() {
};
virtual SQLite3_result * get_stats_query_rules() {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping query rules statistics, using Global version %d\n", version);
SQLite3_result *result=new SQLite3_result(2);
spin_rdlock(&rwlock);
QP_rule_t *qr1;
result->add_column_definition(SQLITE_TEXT,"rule_id");
result->add_column_definition(SQLITE_TEXT,"hits");
for (std::vector<QP_rule_t *>::iterator it=rules.begin(); it!=rules.end(); ++it) {
qr1=*it;
if (qr1->active) {
QP_rule_text_hitsonly *qt=new QP_rule_text_hitsonly(qr1);
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping Query Rule id: %d\n", qr1->rule_id);
result->add_row(qt->pta);
delete qt;
}
}
spin_rdunlock(&rwlock);
return result;
}
virtual SQLite3_result * get_current_query_rules() {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping current query rules, using Global version %d\n", version);
SQLite3_result *result=new SQLite3_result(13);
spin_rdlock(&rwlock);
QP_rule_t *qr1;
result->add_column_definition(SQLITE_TEXT,"rule_id");
result->add_column_definition(SQLITE_TEXT,"active");
result->add_column_definition(SQLITE_TEXT,"username");
result->add_column_definition(SQLITE_TEXT,"schemaname");
result->add_column_definition(SQLITE_TEXT,"flagIN");
result->add_column_definition(SQLITE_TEXT,"match_pattern");
result->add_column_definition(SQLITE_TEXT,"negate_match_pattern");
result->add_column_definition(SQLITE_TEXT,"flagOUT");
result->add_column_definition(SQLITE_TEXT,"replace_pattern");
result->add_column_definition(SQLITE_TEXT,"destination_hostgroup");
result->add_column_definition(SQLITE_TEXT,"cache_ttl");
result->add_column_definition(SQLITE_TEXT,"apply");
result->add_column_definition(SQLITE_TEXT,"hits");
for (std::vector<QP_rule_t *>::iterator it=rules.begin(); it!=rules.end(); ++it) {
qr1=*it;
QP_rule_text *qt=new QP_rule_text(qr1);
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping Query Rule id: %d\n", qr1->rule_id);
result->add_row(qt->pta);
delete qt;
}
spin_rdunlock(&rwlock);
return result;
}
virtual QP_out_t * process_mysql_query(MySQL_Session *sess, void *ptr, unsigned int size, bool delete_original) {
QP_out_t *ret=NULL;
unsigned int len=size-sizeof(mysql_hdr)-1;
@ -210,6 +356,7 @@ virtual QP_out_t * process_mysql_query(MySQL_Session *sess, void *ptr, unsigned
if (qr1->active) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Copying Query Rule id: %d\n", qr1->rule_id);
qr2=new_query_rule(qr1->rule_id, qr1->active, qr1->username, qr1->schemaname, qr1->flagIN, qr1->match_pattern, qr1->negate_match_pattern, qr1->flagOUT, qr1->replace_pattern, qr1->destination_hostgroup, qr1->cache_ttl, qr1->apply);
qr2->parent=qr1; // pointer to parent to speed up parent update (hits)
if (qr2->match_pattern) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Compiling regex for rule_id: %d, match_pattern: \n", qr2->rule_id, qr2->match_pattern);
qr2->regex_engine=(void *)compile_query_rule(qr2);
@ -273,6 +420,27 @@ virtual QP_out_t * process_mysql_query(MySQL_Session *sess, void *ptr, unsigned
qr->hits++; // this is done without atomic function because it updates only the local variables
//ret=(QP_out_t *)malloc(sizeof(QP_out_t));
{
// FIXME: this block of code is only for testing
if ((qr->hits%20)==0) {
spin_rdlock(&rwlock);
if (__sync_add_and_fetch(&version,0) == _thr_SQP_version) { // extra safety check to avoid race conditions
__sync_fetch_and_add(&qr->parent->hits,20);
}
/*
QP_rule_t *qrg;
for (std::vector<QP_rule_t *>::iterator it=rules.begin(); it!=rules.end(); ++it) {
qrg=*it;
if (qrg->rule_id==qr->rule_id) {
__sync_fetch_and_add(&qrg->hits,20);
break;
}
}
*/
spin_rdunlock(&rwlock);
}
}
if (qr->flagOUT >= 0) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has changed flagOUT\n", qr->rule_id);
flagIN=qr->flagOUT;

@ -754,15 +754,17 @@ __exit_DSS__STATE_NOT_INITIALIZED:
status=WAITING_CLIENT_DATA;
client_myds->DSS=STATE_SLEEP;
if (qpo) {
if (qpo->cache_ttl>0) { // Fixed bug #145
client_myds->PSarrayOUT->copy_add(server_myds->resultset,0,server_myds->resultset->len);
unsigned char *aa=server_myds->resultset2buffer(false);
while (server_myds->resultset->len) server_myds->resultset->remove_index(server_myds->resultset->len-1,NULL);
GloQC->set((unsigned char *)client_myds->query_SQL,strlen((char *)client_myds->query_SQL)+1,aa,server_myds->resultset_length,30);
l_free(server_myds->resultset_length,aa);
server_myds->resultset_length=0;
l_free(strlen((char *)client_myds->query_SQL)+1,client_myds->query_SQL);
}
GloQPro->delete_QP_out(qpo);
qpo=NULL;
client_myds->PSarrayOUT->copy_add(server_myds->resultset,0,server_myds->resultset->len);
unsigned char *aa=server_myds->resultset2buffer(false);
while (server_myds->resultset->len) server_myds->resultset->remove_index(server_myds->resultset->len-1,NULL);
GloQC->set((unsigned char *)client_myds->query_SQL,strlen((char *)client_myds->query_SQL)+1,aa,server_myds->resultset_length,30);
l_free(server_myds->resultset_length,aa);
server_myds->resultset_length=0;
l_free(strlen((char *)client_myds->query_SQL)+1,client_myds->query_SQL);
}
}
break;

Loading…
Cancel
Save