diff --git a/doc/internal/Stats_API.txt b/doc/internal/Stats_API.txt new file mode 100644 index 000000000..76066b5ea --- /dev/null +++ b/doc/internal/Stats_API.txt @@ -0,0 +1,19 @@ + + + +This API should be simple. +Each module should collect statistics and stores them internally, preferibly +in a table format because this is how Admin will expect the results. + +Each module should implement 2 functions: +- stats_get_tables_list() returns an array of pointer to variables name, where + the last element is NULL; +- stat_get_table() returns the content of such table in the form of a + SQLite3_result resultset + + +Important note: +The reads should be considered in read committed isolation level. It is not +possible to read multiple tables at the same time, therefore it is possible +that the relation between these tables are not consistent. Due the temporary +nature of statistics information, this glitch is absolutely expected. diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index d4b5fcc3c..1f01905f4 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -197,6 +197,7 @@ class MySQL_Connection_Pool; class PtrArray; class ProxySQL_ConfigFile; class MySQL_Server; +class SQLite3_result; //class MySQL_Servers; class MySQL_Hostgroup_Entry; class MySQL_Hostgroup; diff --git a/include/query_processor.h b/include/query_processor.h index a061be028..7b089cff2 100644 --- a/include/query_processor.h +++ b/include/query_processor.h @@ -17,7 +17,8 @@ struct _Query_Processor_rule_t { int cache_ttl; bool apply; void *regex_engine; - uint64_t hits; + int hits; + struct _Query_Processor_rule_t *parent; // pointer to parent, to speed up parent update }; @@ -59,8 +60,12 @@ class Query_Processor { virtual void init_thread() {}; virtual void end_thread() {}; virtual void commit() {}; // this applies all the changes in memory + virtual SQLite3_result * get_current_query_rules() {return NULL;}; + + virtual SQLite3_result * get_stats_query_rules() {return NULL;}; }; + typedef Query_Processor * create_Query_Processor_t(); #endif /* __CLASS_QUERY_PROCESSOR_H */ diff --git a/include/sqlite3db.h b/include/sqlite3db.h index d8f206730..60e51b61c 100644 --- a/include/sqlite3db.h +++ b/include/sqlite3db.h @@ -47,6 +47,18 @@ class SQLite3_row { //fields[i]=(sizes[i] ? strdup((char *)sqlite3_column_text(stmt,i)) : NULL); } }; + void add_fields(char **_fields) { + int i; + for (i=0;iadd_fields(stmt); rows.push_back(row); rows_count++; - return SQLITE_ROW; + return SQLITE_ROW; + }; + int add_row(char **_fields) { + SQLite3_row *row=new SQLite3_row(columns); + row->add_fields(_fields); + rows.push_back(row); + rows_count++; + return SQLITE_ROW; }; SQLite3_result(sqlite3_stmt *stmt) { rows_count=0; @@ -93,6 +112,10 @@ class SQLite3_result { } while (add_row(stmt)==SQLITE_ROW) {}; }; + SQLite3_result(int num_columns) { + rows_count=0; + columns=num_columns; + }; ~SQLite3_result() { for (std::vector::iterator it = column_definition.begin() ; it != column_definition.end(); ++it) { SQLite3_column *c=*it; diff --git a/lib/Standard_ProxySQL_Admin.cpp b/lib/Standard_ProxySQL_Admin.cpp index 3c947641e..a9be2c27b 100644 --- a/lib/Standard_ProxySQL_Admin.cpp +++ b/lib/Standard_ProxySQL_Admin.cpp @@ -20,6 +20,9 @@ static volatile int load_main_=0; static volatile bool nostart_=false; +static int __admin_refresh_interval=0; + + extern MySQL_Authentication *GloMyAuth; extern ProxySQL_Admin *GloAdmin; extern Query_Processor *GloQPro; @@ -42,6 +45,11 @@ pthread_mutex_t sock_mutex = PTHREAD_MUTEX_INITIALIZER; #define ADMIN_SQLITE_TABLE_MYSQL_QUERY_RULES "CREATE TABLE mysql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0, username VARCHAR, schemaname VARCHAR, flagIN INT NOT NULL DEFAULT 0, match_pattern VARCHAR, negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0, flagOUT INT, replace_pattern VARCHAR, destination_hostgroup INT DEFAULT NULL, cache_ttl INT CHECK(cache_ttl > 0), apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0, FOREIGN KEY (destination_hostgroup) REFERENCES mysql_hostgroups (hostgroup_id))" #define ADMIN_SQLITE_TABLE_GLOBAL_VARIABLES "CREATE TABLE global_variables (variable_name VARCHAR NOT NULL PRIMARY KEY, variable_value VARCHAR NOT NULL)" + +#define STATS_SQLITE_TABLE_MYSQL_QUERY_RULES "CREATE TABLE stats_mysql_query_rules (rule_id INTEGER PRIMARY KEY, hits INT NOT NULL)" + + + #ifdef DEBUG #define ADMIN_SQLITE_TABLE_DEBUG_LEVELS "CREATE TABLE debug_levels (module VARCHAR NOT NULL PRIMARY KEY, verbosity INT NOT NULL DEFAULT 0)" #endif /* DEBUG */ @@ -171,7 +179,8 @@ class Standard_ProxySQL_Admin: public ProxySQL_Admin { void __insert_or_replace_maintable_select_disktable(); void __delete_disktable(); void __insert_or_replace_disktable_select_maintable(); - void __attach_configdb_to_admindb(); +// void __attach_configdb_to_admindb(); + void __attach_db_to_admindb(SQLite3DB *db, char *alias); void __add_active_users(enum cred_username_type usertype); void __delete_inactive_users(enum cred_username_type usertype); @@ -226,6 +235,7 @@ class Standard_ProxySQL_Admin: public ProxySQL_Admin { void load_mysql_servers_to_runtime(); void save_mysql_servers_from_runtime(); char * load_mysql_query_rules_to_runtime(); + void save_mysql_query_rules_from_runtime(); void load_admin_variables_to_runtime() { flush_admin_variables___database_to_runtime(admindb, true); } void save_admin_variables_from_runtime() { flush_admin_variables___runtime_to_database(admindb, true, true, false); } @@ -233,6 +243,9 @@ class Standard_ProxySQL_Admin: public ProxySQL_Admin { void load_mysql_variables_to_runtime() { flush_mysql_variables___database_to_runtime(admindb, true); } void save_mysql_variables_from_runtime() { flush_mysql_variables___runtime_to_database(admindb, true, true, false); } + + void stats___mysql_query_rules(); + }; static Standard_ProxySQL_Admin *SPA=NULL; @@ -779,7 +792,6 @@ bool admin_handler_command_load_or_save(char *query_no_space, unsigned int query return false; } -/* if ( (query_no_space_length==strlen("SAVE MYSQL QUERY RULES TO MEMORY") && !strncasecmp("SAVE MYSQL QUERY RULES TO MEMORY",query_no_space, query_no_space_length)) || @@ -791,12 +803,11 @@ bool admin_handler_command_load_or_save(char *query_no_space, unsigned int query ) { proxy_debug(PROXY_DEBUG_ADMIN, 4, "Received %s command\n", query_no_space); Standard_ProxySQL_Admin *SPA=(Standard_ProxySQL_Admin *)pa; - SPA->save_mysql_users_runtime_to_database(); - proxy_debug(PROXY_DEBUG_ADMIN, 4, "Saved mysql users from RUNTIME\n"); + SPA->save_mysql_query_rules_from_runtime(); + proxy_debug(PROXY_DEBUG_ADMIN, 4, "Saved mysql query rules from RUNTIME\n"); SPA->send_MySQL_OK(&sess->myprot_client, NULL); return false; } -*/ } if ((query_no_space_length>21) && ( (!strncasecmp("SAVE ADMIN VARIABLES ", query_no_space, 21)) || (!strncasecmp("LOAD ADMIN VARIABLES ", query_no_space, 21))) ) { @@ -1018,6 +1029,9 @@ void *child_mysql(void *arg) { //sess->myprot_client.generate_pkt_initial_handshake(sess->client_myds,true,NULL,NULL); sess->myprot_client.generate_pkt_initial_handshake(true,NULL,NULL); + + unsigned long oldtime=monotonic_time(); + unsigned long curtime=monotonic_time(); while (__sync_fetch_and_add(&glovars.shutdown,0)==0) { if (myds->available_data_out()) { @@ -1026,7 +1040,17 @@ void *child_mysql(void *arg) { fds[0].events=POLLIN; } fds[0].revents=0; - rc=poll(fds,nfds,2000); + //rc=poll(fds,nfds,2000); + rc=poll(fds,nfds,__sync_fetch_and_add(&__admin_refresh_interval,0)); + { + //FIXME: cleanup this block + curtime=monotonic_time(); + if (curtime>oldtime+__admin_refresh_interval) { + oldtime=curtime; + Standard_ProxySQL_Admin *SPA=(Standard_ProxySQL_Admin *)GloAdmin; + SPA->stats___mysql_query_rules(); + } + } if (rc == -1) { if (errno == EINTR) { continue; @@ -1343,10 +1367,16 @@ bool Standard_ProxySQL_Admin::init() { #endif /* DEBUG */ + insert_into_tables_defs(tables_defs_stats,"mysql_query_rules", STATS_SQLITE_TABLE_MYSQL_QUERY_RULES); + + check_and_build_standard_tables(admindb, tables_defs_admin); check_and_build_standard_tables(configdb, tables_defs_config); + check_and_build_standard_tables(statsdb, tables_defs_stats); - __attach_configdb_to_admindb(); + //__attach_configdb_to_admindb(); + __attach_db_to_admindb(configdb, (char *)"disk"); + __attach_db_to_admindb(statsdb, (char *)"stats"); #ifdef DEBUG flush_debug_levels_runtime_to_database(configdb, false); flush_debug_levels_runtime_to_database(admindb, true); @@ -1819,6 +1849,7 @@ bool Standard_ProxySQL_Admin::set_variable(char *name, char *value) { // this i int intv=atoi(value); if (intv > 100 && intv < 100000) { variables.refresh_interval=intv; + __admin_refresh_interval=intv; return true; } else { return false; @@ -1841,6 +1872,55 @@ bool Standard_ProxySQL_Admin::set_variable(char *name, char *value) { // this i } + + + +void Standard_ProxySQL_Admin::stats___mysql_query_rules() { + SQLite3_result * resultset=GloQPro->get_stats_query_rules(); + if (resultset==NULL) return; +// fprintf(stderr,"Number of columns: %d, rows: %d\n", result->columns, result->rows_count); + statsdb->execute("DELETE FROM stats_mysql_query_rules"); + char *a=(char *)"INSERT INTO stats_mysql_query_rules VALUES (\"%s\",\"%s\")"; + for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { + SQLite3_row *r=*it; + int arg_len=0; + for (int i=0; i<2; i++) { + arg_len+=strlen(r->fields[i]); + } + char *query=(char *)malloc(strlen(a)+arg_len+32); + sprintf(query,a,r->fields[0],r->fields[1]); + //fprintf(stderr,"%s\n",query); + statsdb->execute(query); + free(query); + } + delete resultset; +} + +void Standard_ProxySQL_Admin::save_mysql_query_rules_from_runtime() { + SQLite3_result * resultset=GloQPro->get_current_query_rules(); + if (resultset==NULL) return; +// fprintf(stderr,"Number of columns: %d, rows: %d\n", result->columns, result->rows_count); + admindb->execute("DELETE FROM mysql_query_rules"); + char *a=(char *)"INSERT INTO mysql_query_rules VALUES (\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\")"; + for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { + SQLite3_row *r=*it; + int arg_len=0; + for (int i=0; i<12; i++) { + arg_len+=strlen(r->fields[i]); + } + char *query=(char *)malloc(strlen(a)+arg_len+32); + sprintf(query,a,r->fields[0],r->fields[1],r->fields[2],r->fields[3],r->fields[4],r->fields[5],r->fields[6],r->fields[7],r->fields[8],r->fields[9],r->fields[10],r->fields[11]); + fprintf(stderr,"%s\n",query); + admindb->execute(query); + free(query); + } + + //admindb->execute("UPDATE mysql_query_rules SET username=NULL WHERE username=\"\""); + //admindb->execute("UPDATE mysql_query_rules SET schemaname=NULL WHERE schemaname=\"\""); + + delete resultset; +} + void Standard_ProxySQL_Admin::flush_admin_variables___runtime_to_database(SQLite3DB *db, bool replace, bool del, bool onlyifempty) { proxy_debug(PROXY_DEBUG_ADMIN, 4, "Flushing ADMIN variables. Replace:%d, Delete:%d, Only_If_Empty:%d\n", replace, del, onlyifempty); if (onlyifempty) { @@ -2039,13 +2119,21 @@ void Standard_ProxySQL_Admin::flush_mysql_query_rules__from_memory_to_disk() { -void Standard_ProxySQL_Admin::__attach_configdb_to_admindb() { - const char *a="ATTACH DATABASE '%s' AS disk"; +void Standard_ProxySQL_Admin::__attach_db_to_admindb(SQLite3DB *db, char *alias) { +/* + * const char *a="ATTACH DATABASE '%s' AS disk"; int l=strlen(a)+strlen(configdb->get_url())+5; char *cmd=(char *)malloc(l); sprintf(cmd,a,configdb->get_url()); admindb->execute(cmd); free(cmd); +*/ + const char *a="ATTACH DATABASE '%s' AS %s"; + int l=strlen(a)+strlen(db->get_url())+strlen(alias)+5; + char *cmd=(char *)malloc(l); + sprintf(cmd,a,db->get_url(), alias); + admindb->execute(cmd); + free(cmd); } diff --git a/lib/Standard_Query_Processor.cpp b/lib/Standard_Query_Processor.cpp index dbe902cdf..a50a46fc5 100644 --- a/lib/Standard_Query_Processor.cpp +++ b/lib/Standard_Query_Processor.cpp @@ -11,6 +11,102 @@ #define QUERY_PROCESSOR_VERSION "0.1.728" +#define strdup_null(__c) ( __c ? strdup(__c) : __c ) +#define char_malloc (char *)malloc +#define free_null(__c) { if(__c) { free(__c); __c=NULL; } } + +#define itostr(__s, __i) { __s=char_malloc(16); sprintf(__s, "%d", __i); } + +class QP_rule_text_hitsonly { + public: + char **pta; + QP_rule_text_hitsonly(QP_rule_t *QPr) { + pta=NULL; + pta=(char **)malloc(sizeof(char *)*2); + itostr(pta[0], QPr->rule_id); + itostr(pta[1], QPr->hits); + } + ~QP_rule_text_hitsonly() { + for(int i=0; i<2; i++) { + free_null(pta[i]); + } + free(pta); + } +}; + +class QP_rule_text { + public: + char **pta; +/* + char * rule_id; + char * active; + char * username; + char * schemaname; + char * flagIN; + char * match_pattern; + char * negate_match_pattern; + char * flagOUT; + char * replace_pattern; + char * destination_hostgroup; + char * cache_ttl; + char * apply; + char * hits; +*/ + QP_rule_text(QP_rule_t *QPr) { + pta=NULL; + pta=(char **)malloc(sizeof(char *)*13); + itostr(pta[0], QPr->rule_id); + itostr(pta[1], QPr->active); + pta[2]=strdup_null(QPr->username); + pta[3]=strdup_null(QPr->schemaname); + itostr(pta[4], QPr->flagIN); + pta[5]=strdup_null(QPr->match_pattern); + itostr(pta[6], QPr->negate_match_pattern); + itostr(pta[7], QPr->flagOUT); + pta[8]=strdup_null(QPr->replace_pattern); + itostr(pta[9], QPr->destination_hostgroup); + itostr(pta[10], QPr->cache_ttl); + itostr(pta[11], QPr->apply); + itostr(pta[12], QPr->hits); +/* + itostr(rule_id, QPr->rule_id); + itostr(active, QPr->active); + username=strdup_null(QPr->username); + schemaname=strdup_null(QPr->schemaname); + itostr(flagIN, QPr->flagIN); + match_pattern=strdup_null(QPr->match_pattern); + itostr(negate_match_pattern, QPr->negate_match_pattern); + itostr(flagOUT, QPr->flagOUT); + replace_pattern=strdup_null(QPr->replace_pattern); + itostr(destination_hostgroup, QPr->destination_hostgroup); + itostr(cache_ttl, QPr->cache_ttl); + itostr(apply, QPr->apply); + itostr(hits, QPr->hits); +*/ + } + ~QP_rule_text() { + for(int i=0; i<13; i++) { + free_null(pta[i]); + } + free(pta); +/* + free_null(rule_id); + free_null(active); + free_null(username); + free_null(schemaname); + free_null(flagIN); + free_null(match_pattern); + free_null(negate_match_pattern); + free_null(flagOUT); + free_null(replace_pattern); + free_null(destination_hostgroup); + free_null(cache_ttl); + free_null(apply); + free_null(hits); +*/ + } +}; + struct __RE2_objects_t { re2::RE2::Options *opt; @@ -191,6 +287,56 @@ virtual void commit() { }; +virtual SQLite3_result * get_stats_query_rules() { + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping query rules statistics, using Global version %d\n", version); + SQLite3_result *result=new SQLite3_result(2); + spin_rdlock(&rwlock); + QP_rule_t *qr1; + result->add_column_definition(SQLITE_TEXT,"rule_id"); + result->add_column_definition(SQLITE_TEXT,"hits"); + for (std::vector::iterator it=rules.begin(); it!=rules.end(); ++it) { + qr1=*it; + if (qr1->active) { + QP_rule_text_hitsonly *qt=new QP_rule_text_hitsonly(qr1); + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping Query Rule id: %d\n", qr1->rule_id); + result->add_row(qt->pta); + delete qt; + } + } + spin_rdunlock(&rwlock); + return result; +} + +virtual SQLite3_result * get_current_query_rules() { + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping current query rules, using Global version %d\n", version); + SQLite3_result *result=new SQLite3_result(13); + spin_rdlock(&rwlock); + QP_rule_t *qr1; + result->add_column_definition(SQLITE_TEXT,"rule_id"); + result->add_column_definition(SQLITE_TEXT,"active"); + result->add_column_definition(SQLITE_TEXT,"username"); + result->add_column_definition(SQLITE_TEXT,"schemaname"); + result->add_column_definition(SQLITE_TEXT,"flagIN"); + result->add_column_definition(SQLITE_TEXT,"match_pattern"); + result->add_column_definition(SQLITE_TEXT,"negate_match_pattern"); + result->add_column_definition(SQLITE_TEXT,"flagOUT"); + result->add_column_definition(SQLITE_TEXT,"replace_pattern"); + result->add_column_definition(SQLITE_TEXT,"destination_hostgroup"); + result->add_column_definition(SQLITE_TEXT,"cache_ttl"); + result->add_column_definition(SQLITE_TEXT,"apply"); + result->add_column_definition(SQLITE_TEXT,"hits"); + for (std::vector::iterator it=rules.begin(); it!=rules.end(); ++it) { + qr1=*it; + QP_rule_text *qt=new QP_rule_text(qr1); + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping Query Rule id: %d\n", qr1->rule_id); + result->add_row(qt->pta); + delete qt; + } + spin_rdunlock(&rwlock); + return result; +} + + virtual QP_out_t * process_mysql_query(MySQL_Session *sess, void *ptr, unsigned int size, bool delete_original) { QP_out_t *ret=NULL; unsigned int len=size-sizeof(mysql_hdr)-1; @@ -210,6 +356,7 @@ virtual QP_out_t * process_mysql_query(MySQL_Session *sess, void *ptr, unsigned if (qr1->active) { proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Copying Query Rule id: %d\n", qr1->rule_id); qr2=new_query_rule(qr1->rule_id, qr1->active, qr1->username, qr1->schemaname, qr1->flagIN, qr1->match_pattern, qr1->negate_match_pattern, qr1->flagOUT, qr1->replace_pattern, qr1->destination_hostgroup, qr1->cache_ttl, qr1->apply); + qr2->parent=qr1; // pointer to parent to speed up parent update (hits) if (qr2->match_pattern) { proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Compiling regex for rule_id: %d, match_pattern: \n", qr2->rule_id, qr2->match_pattern); qr2->regex_engine=(void *)compile_query_rule(qr2); @@ -273,6 +420,27 @@ virtual QP_out_t * process_mysql_query(MySQL_Session *sess, void *ptr, unsigned qr->hits++; // this is done without atomic function because it updates only the local variables //ret=(QP_out_t *)malloc(sizeof(QP_out_t)); +{ + // FIXME: this block of code is only for testing + if ((qr->hits%20)==0) { + spin_rdlock(&rwlock); + if (__sync_add_and_fetch(&version,0) == _thr_SQP_version) { // extra safety check to avoid race conditions + __sync_fetch_and_add(&qr->parent->hits,20); + } +/* + QP_rule_t *qrg; + for (std::vector::iterator it=rules.begin(); it!=rules.end(); ++it) { + qrg=*it; + if (qrg->rule_id==qr->rule_id) { + __sync_fetch_and_add(&qrg->hits,20); + break; + } + } +*/ + spin_rdunlock(&rwlock); + } +} + if (qr->flagOUT >= 0) { proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has changed flagOUT\n", qr->rule_id); flagIN=qr->flagOUT; diff --git a/lib/mysql_session.cpp b/lib/mysql_session.cpp index 300ee428b..0bbc1a96c 100644 --- a/lib/mysql_session.cpp +++ b/lib/mysql_session.cpp @@ -754,15 +754,17 @@ __exit_DSS__STATE_NOT_INITIALIZED: status=WAITING_CLIENT_DATA; client_myds->DSS=STATE_SLEEP; if (qpo) { + if (qpo->cache_ttl>0) { // Fixed bug #145 + client_myds->PSarrayOUT->copy_add(server_myds->resultset,0,server_myds->resultset->len); + unsigned char *aa=server_myds->resultset2buffer(false); + while (server_myds->resultset->len) server_myds->resultset->remove_index(server_myds->resultset->len-1,NULL); + GloQC->set((unsigned char *)client_myds->query_SQL,strlen((char *)client_myds->query_SQL)+1,aa,server_myds->resultset_length,30); + l_free(server_myds->resultset_length,aa); + server_myds->resultset_length=0; + l_free(strlen((char *)client_myds->query_SQL)+1,client_myds->query_SQL); + } GloQPro->delete_QP_out(qpo); qpo=NULL; - client_myds->PSarrayOUT->copy_add(server_myds->resultset,0,server_myds->resultset->len); - unsigned char *aa=server_myds->resultset2buffer(false); - while (server_myds->resultset->len) server_myds->resultset->remove_index(server_myds->resultset->len-1,NULL); - GloQC->set((unsigned char *)client_myds->query_SQL,strlen((char *)client_myds->query_SQL)+1,aa,server_myds->resultset_length,30); - l_free(server_myds->resultset_length,aa); - server_myds->resultset_length=0; - l_free(strlen((char *)client_myds->query_SQL)+1,client_myds->query_SQL); } } break;