stats_history.history_mysql_query_digest #2368

Implementation of new table stats_history.history_mysql_query_digest #2368

Implemented command `SAVE MYSQL DIGEST TO DISK`.
Save to disk with really minimal memory footprint compared to querying `stats_mysql_query_digest` or `stats_mysql_query_digest_reset` .

Implemented new admin variable `admin-stats_mysql_query_digest_to_disk` to automatically save to disk.
pull/2370/head
René Cannaò 6 years ago
parent fc4d229e38
commit 6bed334db1

@ -63,6 +63,8 @@
#define STATSDB_SQLITE_TABLE_MYSQL_QUERY_CACHE_DAY "CREATE TABLE mysql_query_cache_day (timestamp INT NOT NULL, count_GET INT NOT NULL, count_GET_OK INT NOT NULL, count_SET INT NOT NULL, bytes_IN INT NOT NULL, bytes_OUT INT NOT NULL, Entries_Purged INT NOT NULL, Entries_In_Cache INT NOT NULL, Memory_Bytes INT NOT NULL, PRIMARY KEY (timestamp))"
#define STATSDB_SQLITE_TABLE_HISTORY_MYSQL_QUERY_DIGEST "CREATE TABLE history_mysql_query_digest (hostgroup INT , schemaname VARCHAR NOT NULL , username VARCHAR NOT NULL , client_address VARCHAR NOT NULL , digest VARCHAR NOT NULL , digest_text VARCHAR NOT NULL , count_star INTEGER NOT NULL , first_seen INTEGER NOT NULL , last_seen INTEGER NOT NULL , sum_time INTEGER NOT NULL , min_time INTEGER NOT NULL , max_time INTEGER NOT NULL , sum_rows_affected INTEGER NOT NULL , sum_rows_sent INTEGER NOT NULL)"
class ProxySQL_Statistics {
SQLite3DB *statsdb_mem; // internal statistics DB
std::vector<table_def_t *> *tables_defs_statsdb_mem;
@ -72,6 +74,7 @@ class ProxySQL_Statistics {
void drop_tables_defs(std::vector<table_def_t *> *tables_defs);
void check_and_build_standard_tables(SQLite3DB *db, std::vector<table_def_t *> *tables_defs);
unsigned long long next_timer_MySQL_Threads_Handler;
unsigned long long next_timer_mysql_query_digest_to_disk;
unsigned long long next_timer_system_cpu;
#ifndef NOJEM
unsigned long long next_timer_system_memory;
@ -83,6 +86,7 @@ class ProxySQL_Statistics {
int stats_mysql_connections;
int stats_mysql_query_cache;
int stats_system_cpu;
int stats_mysql_query_digest_to_disk;
#ifndef NOJEM
int stats_system_memory;
#endif
@ -93,6 +97,7 @@ class ProxySQL_Statistics {
void init();
void print_version();
bool MySQL_Threads_Handler_timetoget(unsigned long long);
bool mysql_query_digest_to_disk_timetoget(unsigned long long);
bool system_cpu_timetoget(unsigned long long);
#ifndef NOJEM
bool system_memory_timetoget(unsigned long long);

@ -100,6 +100,7 @@ class ProxySQL_Admin {
int stats_mysql_connection_pool;
int stats_mysql_connections;
int stats_mysql_query_cache;
int stats_mysql_query_digest_to_disk;
int stats_system_cpu;
int stats_system_memory;
int mysql_show_processlist_extended;
@ -309,7 +310,7 @@ class ProxySQL_Admin {
#endif /* PROXYSQLCLICKHOUSE */
void vacuum_stats(bool);
int FlushDigestTableToDisk(SQLite3DB *);
#ifdef TEST_AURORA
void enable_aurora_testing();

@ -3,6 +3,14 @@
#include "proxysql.h"
#include "cpp.h"
// Optimization introduced in 2.0.6
// to avoid a lot of unnecessary copy
#define DIGEST_STATS_FAST_1
#define DIGEST_STATS_FAST_MINSIZE 100000
#define DIGEST_STATS_FAST_THREADS 4
#define FAST_ROUTING_NEW208
@ -15,6 +23,60 @@ KHASH_MAP_INIT_STR(khStrInt, int)
typedef std::unordered_map<std::uint64_t, void *> umap_query_digest;
typedef std::unordered_map<std::uint64_t, char *> umap_query_digest_text;
#ifdef DIGEST_STATS_FAST_1
typedef struct _query_digest_stats_pointers_t {
char *pta[14];
char digest[24];
char count_star[24];
char first_seen[24];
char last_seen[24];
char sum_time[24];
char min_time[24];
char max_time[24];
char hid[24];
char rows_affected[24];
char rows_sent[24];
} query_digest_stats_pointers_t;
#endif
class QP_query_digest_stats {
public:
uint64_t digest;
char *digest_text;
char *username;
char *schemaname;
char *client_address;
#ifdef DIGEST_STATS_FAST_1
char username_buf[24];
char schemaname_buf[24];
char client_address_buf[24];
#endif
time_t first_seen;
time_t last_seen;
unsigned int count_star;
unsigned long long sum_time;
unsigned long long min_time;
unsigned long long max_time;
unsigned long long rows_affected;
unsigned long long rows_sent;
int hid;
QP_query_digest_stats(char *u, char *s, uint64_t d, char *dt, int h, char *ca);
void add_time(unsigned long long t, unsigned long long n, unsigned long long ra, unsigned long long rs);
~QP_query_digest_stats();
#ifdef DIGEST_STATS_FAST_1
char **get_row(umap_query_digest_text *digest_text_umap, query_digest_stats_pointers_t *qdsp);
#else
char **get_row(umap_query_digest_text *digest_text_umap);
#endif
#ifdef DIGEST_STATS_FAST_1
#else
void free_row(char **pta);
#endif
};
struct _Query_Processor_rule_t {
int rule_id;
bool active;
@ -253,6 +315,7 @@ class Query_Processor {
SQLite3_result * get_stats_commands_counters();
SQLite3_result * get_query_digests();
SQLite3_result * get_query_digests_reset();
void get_query_digests_reset(umap_query_digest *uqd, umap_query_digest_text *uqdt);
unsigned long long purge_query_digests(bool async_purge, bool parallel, char **msg);
unsigned long long purge_query_digests_async(char **msg);
unsigned long long purge_query_digests_sync(bool parallel);
@ -260,7 +323,6 @@ class Query_Processor {
unsigned long long get_query_digests_total_size();
unsigned long long get_rules_mem_used();
// fast routing
SQLite3_result * fast_routing_resultset;
void load_fast_routing(SQLite3_result *resultset);

@ -401,6 +401,7 @@ static char * admin_variables_names[]= {
(char *)"stats_mysql_connections",
(char *)"stats_mysql_connection_pool",
(char *)"stats_mysql_query_cache",
(char *)"stats_mysql_query_digest_to_disk",
(char *)"stats_system_cpu",
(char *)"stats_system_memory",
(char *)"mysql_ifaces",
@ -439,18 +440,161 @@ static ProxySQL_Admin *SPA=NULL;
static void * (*child_func[3]) (void *arg);
int ProxySQL_Test___GetDigestTable(bool reset) {
int ProxySQL_Test___GetDigestTable(bool reset, bool use_swap) {
int r = 0;
if (!GloQPro) return 0;
SQLite3_result * resultset=NULL;
if (reset==true) {
resultset=GloQPro->get_query_digests_reset();
if (use_swap == false) {
SQLite3_result * resultset=NULL;
if (reset==true) {
resultset=GloQPro->get_query_digests_reset();
} else {
resultset=GloQPro->get_query_digests();
}
if (resultset==NULL) return 0;
r = resultset->rows_count;
delete resultset;
} else {
resultset=GloQPro->get_query_digests();
umap_query_digest uqd;
umap_query_digest_text uqdt;
GloQPro->get_query_digests_reset(&uqd, &uqdt);
r = uqd.size();
for (std::unordered_map<uint64_t, void *>::iterator it=uqd.begin(); it!=uqd.end(); ++it) {
QP_query_digest_stats * qds = (QP_query_digest_stats *)it->second;
delete qds;
}
uqd.erase(uqd.begin(),uqd.end());
for (std::unordered_map<uint64_t, char *>::iterator it=uqdt.begin(); it!=uqdt.end(); ++it) {
free(it->second);
}
uqdt.erase(uqdt.begin(),uqdt.end());
}
if (resultset==NULL) return 0;
r = resultset->rows_count;
delete resultset;
return r;
}
int ProxySQL_Admin::FlushDigestTableToDisk(SQLite3DB *_db) {
int r = 0;
if (!GloQPro) return 0;
umap_query_digest uqd;
umap_query_digest_text uqdt;
GloQPro->get_query_digests_reset(&uqd, &uqdt);
r = uqd.size();
SQLite3DB * sdb = _db;
sdb->execute("BEGIN");
int rc;
sqlite3_stmt *statement1=NULL;
sqlite3_stmt *statement32=NULL;
char *query1=NULL;
char *query32=NULL;
query1=(char *)"INSERT INTO history_mysql_query_digest VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)";
query32=(char *)"INSERT INTO history_mysql_query_digest VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14), (?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24, ?25, ?26, ?27, ?28), (?29, ?30, ?31, ?32, ?33, ?34, ?35, ?36, ?37, ?38, ?39, ?40, ?41, ?42), (?43, ?44, ?45, ?46, ?47, ?48, ?49, ?50, ?51, ?52, ?53, ?54, ?55, ?56), (?57, ?58, ?59, ?60, ?61, ?62, ?63, ?64, ?65, ?66, ?67, ?68, ?69, ?70), (?71, ?72, ?73, ?74, ?75, ?76, ?77, ?78, ?79, ?80, ?81, ?82, ?83, ?84), (?85, ?86, ?87, ?88, ?89, ?90, ?91, ?92, ?93, ?94, ?95, ?96, ?97, ?98), (?99, ?100, ?101, ?102, ?103, ?104, ?105, ?106, ?107, ?108, ?109, ?110, ?111, ?112), (?113, ?114, ?115, ?116, ?117, ?118, ?119, ?120, ?121, ?122, ?123, ?124, ?125, ?126), (?127, ?128, ?129, ?130, ?131, ?132, ?133, ?134, ?135, ?136, ?137, ?138, ?139, ?140), (?141, ?142, ?143, ?144, ?145, ?146, ?147, ?148, ?149, ?150, ?151, ?152, ?153, ?154), (?155, ?156, ?157, ?158, ?159, ?160, ?161, ?162, ?163, ?164, ?165, ?166, ?167, ?168), (?169, ?170, ?171, ?172, ?173, ?174, ?175, ?176, ?177, ?178, ?179, ?180, ?181, ?182), (?183, ?184, ?185, ?186, ?187, ?188, ?189, ?190, ?191, ?192, ?193, ?194, ?195, ?196), (?197, ?198, ?199, ?200, ?201, ?202, ?203, ?204, ?205, ?206, ?207, ?208, ?209, ?210), (?211, ?212, ?213, ?214, ?215, ?216, ?217, ?218, ?219, ?220, ?221, ?222, ?223, ?224), (?225, ?226, ?227, ?228, ?229, ?230, ?231, ?232, ?233, ?234, ?235, ?236, ?237, ?238), (?239, ?240, ?241, ?242, ?243, ?244, ?245, ?246, ?247, ?248, ?249, ?250, ?251, ?252), (?253, ?254, ?255, ?256, ?257, ?258, ?259, ?260, ?261, ?262, ?263, ?264, ?265, ?266), (?267, ?268, ?269, ?270, ?271, ?272, ?273, ?274, ?275, ?276, ?277, ?278, ?279, ?280), (?281, ?282, ?283, ?284, ?285, ?286, ?287, ?288, ?289, ?290, ?291, ?292, ?293, ?294), (?295, ?296, ?297, ?298, ?299, ?300, ?301, ?302, ?303, ?304, ?305, ?306, ?307, ?308), (?309, ?310, ?311, ?312, ?313, ?314, ?315, ?316, ?317, ?318, ?319, ?320, ?321, ?322), (?323, ?324, ?325, ?326, ?327, ?328, ?329, ?330, ?331, ?332, ?333, ?334, ?335, ?336), (?337, ?338, ?339, ?340, ?341, ?342, ?343, ?344, ?345, ?346, ?347, ?348, ?349, ?350), (?351, ?352, ?353, ?354, ?355, ?356, ?357, ?358, ?359, ?360, ?361, ?362, ?363, ?364), (?365, ?366, ?367, ?368, ?369, ?370, ?371, ?372, ?373, ?374, ?375, ?376, ?377, ?378), (?379, ?380, ?381, ?382, ?383, ?384, ?385, ?386, ?387, ?388, ?389, ?390, ?391, ?392), (?393, ?394, ?395, ?396, ?397, ?398, ?399, ?400, ?401, ?402, ?403, ?404, ?405, ?406), (?407, ?408, ?409, ?410, ?411, ?412, ?413, ?414, ?415, ?416, ?417, ?418, ?419, ?420), (?421, ?422, ?423, ?424, ?425, ?426, ?427, ?428, ?429, ?430, ?431, ?432, ?433, ?434), (?435, ?436, ?437, ?438, ?439, ?440, ?441, ?442, ?443, ?444, ?445, ?446, ?447, ?448)";
rc = sdb->prepare_v2(query1, &statement1);
ASSERT_SQLITE_OK(rc, sdb);
rc = sdb->prepare_v2(query32, &statement32);
ASSERT_SQLITE_OK(rc, sdb);
int row_idx=0;
int max_bulk_row_idx=r/32;
max_bulk_row_idx=max_bulk_row_idx*32;
query_digest_stats_pointers_t qdsp;
time_t __now;
time(&__now);
unsigned long long curtime=monotonic_time();
time_t seen_time;
for (std::unordered_map<uint64_t, void *>::iterator it=uqd.begin(); it!=uqd.end(); ++it) {
QP_query_digest_stats * qds = (QP_query_digest_stats *)it->second;
int idx=row_idx%32;
if (row_idx<max_bulk_row_idx) { // bulk
rc=sqlite3_bind_int64(statement32, (idx*14)+1, qds->hid); ASSERT_SQLITE_OK(rc, sdb);
rc=sqlite3_bind_text(statement32, (idx*14)+2, qds->schemaname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, sdb);
rc=sqlite3_bind_text(statement32, (idx*14)+3, qds->username, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, sdb);
rc=sqlite3_bind_text(statement32, (idx*14)+4, qds->client_address, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, sdb);
sprintf(qdsp.digest,"0x%016llX", (long long unsigned int)qds->digest);
rc=sqlite3_bind_text(statement32, (idx*14)+5, qdsp.digest, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, sdb);
if (qds->digest_text) {
rc=sqlite3_bind_text(statement32, (idx*14)+6, qds->digest_text, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, sdb);
} else {
std::unordered_map<uint64_t, char *>::iterator it2;
it2=uqdt.find(qds->digest);
if (it2 != uqdt.end()) {
rc=sqlite3_bind_text(statement32, (idx*14)+6, it2->second, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, sdb);
} else {
assert(0);
}
}
rc=sqlite3_bind_int64(statement32, (idx*14)+7, qds->count_star); ASSERT_SQLITE_OK(rc, sdb);
{
seen_time = __now - curtime/1000000 + qds->first_seen/1000000;
rc=sqlite3_bind_int64(statement32, (idx*14)+8, seen_time); ASSERT_SQLITE_OK(rc, sdb);
}
{
seen_time = __now - curtime/1000000 + qds->last_seen/1000000;
rc=sqlite3_bind_int64(statement32, (idx*14)+9, seen_time); ASSERT_SQLITE_OK(rc, sdb);
}
rc=sqlite3_bind_int64(statement32, (idx*14)+10, qds->sum_time); ASSERT_SQLITE_OK(rc, sdb);
rc=sqlite3_bind_int64(statement32, (idx*14)+11, qds->min_time); ASSERT_SQLITE_OK(rc, sdb);
rc=sqlite3_bind_int64(statement32, (idx*14)+12, qds->max_time); ASSERT_SQLITE_OK(rc, sdb);
rc=sqlite3_bind_int64(statement32, (idx*14)+13, qds->rows_affected); ASSERT_SQLITE_OK(rc, sdb); // rows affected
rc=sqlite3_bind_int64(statement32, (idx*14)+14, qds->rows_sent); ASSERT_SQLITE_OK(rc, sdb); // rows sent
if (idx==31) {
SAFE_SQLITE3_STEP2(statement32);
rc=sqlite3_clear_bindings(statement32); ASSERT_SQLITE_OK(rc, sdb);
rc=sqlite3_reset(statement32); ASSERT_SQLITE_OK(rc, sdb);
if (row_idx%100==0) {
sdb->execute("COMMIT");
sdb->execute("BEGIN");
}
}
} else { // single row
rc=sqlite3_bind_int64(statement1, 1, qds->hid); ASSERT_SQLITE_OK(rc, sdb);
assert(qds->schemaname);
rc=sqlite3_bind_text(statement1, 2, qds->schemaname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, sdb);
rc=sqlite3_bind_text(statement1, 3, qds->username, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, sdb);
rc=sqlite3_bind_text(statement1, 4, qds->client_address, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, sdb);
sprintf(qdsp.digest,"0x%016llX", (long long unsigned int)qds->digest);
rc=sqlite3_bind_text(statement1, 5, qdsp.digest, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, sdb);
if (qds->digest_text) {
rc=sqlite3_bind_text(statement1, 6, qds->digest_text, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, sdb);
} else {
std::unordered_map<uint64_t, char *>::iterator it2;
it2=uqdt.find(qds->digest);
if (it2 != uqdt.end()) {
rc=sqlite3_bind_text(statement1, 6, it2->second, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, sdb);
} else {
assert(0);
}
}
rc=sqlite3_bind_int64(statement1, 7, qds->count_star); ASSERT_SQLITE_OK(rc, sdb);
{
seen_time = __now - curtime/1000000 + qds->first_seen/1000000;
rc=sqlite3_bind_int64(statement1, 8, seen_time); ASSERT_SQLITE_OK(rc, sdb);
}
{
seen_time = __now - curtime/1000000 + qds->last_seen/1000000;
rc=sqlite3_bind_int64(statement1, 9, seen_time); ASSERT_SQLITE_OK(rc, sdb);
}
rc=sqlite3_bind_int64(statement1, 10, qds->sum_time); ASSERT_SQLITE_OK(rc, sdb);
rc=sqlite3_bind_int64(statement1, 11, qds->min_time); ASSERT_SQLITE_OK(rc, sdb);
rc=sqlite3_bind_int64(statement1, 12, qds->max_time); ASSERT_SQLITE_OK(rc, sdb);
rc=sqlite3_bind_int64(statement1, 13, qds->rows_affected); ASSERT_SQLITE_OK(rc, sdb); // rows affected
rc=sqlite3_bind_int64(statement1, 14, qds->rows_sent); ASSERT_SQLITE_OK(rc, sdb); // rows sent
SAFE_SQLITE3_STEP2(statement1);
rc=sqlite3_clear_bindings(statement1); ASSERT_SQLITE_OK(rc, sdb);
rc=sqlite3_reset(statement1); ASSERT_SQLITE_OK(rc, sdb);
}
row_idx++;
}
sqlite3_finalize(statement1);
sqlite3_finalize(statement32);
sdb->execute("COMMIT");
for (std::unordered_map<uint64_t, void *>::iterator it=uqd.begin(); it!=uqd.end(); ++it) {
QP_query_digest_stats * qds = (QP_query_digest_stats *)it->second;
delete qds;
}
uqd.erase(uqd.begin(),uqd.end());
for (std::unordered_map<uint64_t, char *>::iterator it=uqdt.begin(); it!=uqdt.end(); ++it) {
free(it->second);
}
uqdt.erase(uqdt.begin(),uqdt.end());
return r;
}
@ -1269,6 +1413,17 @@ bool admin_handler_command_load_or_save(char *query_no_space, unsigned int query
}
#endif /* PROXYSQLCLICKHOUSE */
if ((query_no_space_length>17) && ( (!strcasecmp("SAVE MYSQL DIGEST TO DISK", query_no_space) ) )) {
proxy_info("Received %s command\n", query_no_space);
unsigned long long curtime1=monotonic_time();
int r1 = SPA->FlushDigestTableToDisk(SPA->statsdb_disk);
unsigned long long curtime2=monotonic_time();
curtime1 = curtime1/1000;
curtime2 = curtime2/1000;
proxy_info("Saved stats_mysql_query_digest to disk: %llums to write %llu entries\n", curtime2-curtime1, r1);
SPA->send_MySQL_OK(&sess->client_myds->myprot, NULL, r1);
return false;
}
if ((query_no_space_length>17) && ( (!strncasecmp("SAVE MYSQL USERS ", query_no_space, 17)) || (!strncasecmp("LOAD MYSQL USERS ", query_no_space, 17))) ) {
if (
@ -2744,14 +2899,14 @@ void admin_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *pkt) {
case 2:
// get all the entries from the digest map, but without writing to DB
// it uses multiple threads
r1 = ProxySQL_Test___GetDigestTable(false);
r1 = ProxySQL_Test___GetDigestTable(false, false);
SPA->send_MySQL_OK(&sess->client_myds->myprot, NULL, r1);
run_query=false;
break;
case 3:
// get all the entries from the digest map and reset, but without writing to DB
// it uses multiple threads
r1 = ProxySQL_Test___GetDigestTable(true);
r1 = ProxySQL_Test___GetDigestTable(true, false);
SPA->send_MySQL_OK(&sess->client_myds->myprot, NULL, r1);
run_query=false;
break;
@ -2774,6 +2929,20 @@ void admin_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *pkt) {
free(msg);
run_query=false;
break;
case 7:
// get all the entries from the digest map and reset, but without writing to DB
// it uses multiple threads
// it locks for a very short time and doesn't use SQLite3_result, but swap
r1 = ProxySQL_Test___GetDigestTable(true, true);
SPA->send_MySQL_OK(&sess->client_myds->myprot, NULL, r1);
run_query=false;
break;
case 8:
// get all the entries from the digest map and reset, AND write to DB
r1 = SPA->FlushDigestTableToDisk(SPA->statsdb_disk);
SPA->send_MySQL_OK(&sess->client_myds->myprot, NULL, r1);
run_query=false;
break;
case 11:
// generate random mysql_query_rules_fast_routing
if (test_arg1==0) {
@ -3945,6 +4114,14 @@ __end_while_pool:
}
}
}
if (GloProxyStats->mysql_query_digest_to_disk_timetoget(curtime)) {
unsigned long long curtime1=monotonic_time();
int r1 = SPA->FlushDigestTableToDisk(SPA->statsdb_disk);
unsigned long long curtime2=monotonic_time();
curtime1 = curtime1/1000;
curtime2 = curtime2/1000;
proxy_info("Automatically saved stats_mysql_query_digest to disk: %llums to write %llu entries\n", curtime2-curtime1, r1);
}
if (GloProxyStats->system_cpu_timetoget(curtime)) {
GloProxyStats->system_cpu_sets();
}
@ -4087,11 +4264,13 @@ ProxySQL_Admin::ProxySQL_Admin() {
variables.stats_mysql_connection_pool = 60;
variables.stats_mysql_connections = 60;
variables.stats_mysql_query_cache = 60;
variables.stats_mysql_query_digest_to_disk = 0;
variables.stats_system_cpu = 60;
variables.stats_system_memory = 60;
GloProxyStats->variables.stats_mysql_connection_pool = 60;
GloProxyStats->variables.stats_mysql_connections = 60;
GloProxyStats->variables.stats_mysql_query_cache = 60;
GloProxyStats->variables.stats_mysql_query_digest_to_disk = 0;
GloProxyStats->variables.stats_system_cpu = 60;
#ifndef NOJEM
GloProxyStats->variables.stats_system_memory = 60;
@ -5297,6 +5476,10 @@ char * ProxySQL_Admin::get_variable(char *name) {
sprintf(intbuf,"%d",variables.stats_mysql_query_cache);
return strdup(intbuf);
}
if (!strcasecmp(name,"stats_mysql_query_digest_to_disk")) {
sprintf(intbuf,"%d",variables.stats_mysql_query_digest_to_disk);
return strdup(intbuf);
}
if (!strcasecmp(name,"stats_system_cpu")) {
sprintf(intbuf,"%d",variables.stats_system_cpu);
return strdup(intbuf);
@ -5583,6 +5766,16 @@ bool ProxySQL_Admin::set_variable(char *name, char *value) { // this is the pub
return false;
}
}
if (!strcasecmp(name,"stats_mysql_query_digest_to_disk")) {
int intv=atoi(value);
if (intv >= 0 && intv < 24*3600) {
variables.stats_mysql_query_digest_to_disk=intv;
GloProxyStats->variables.stats_mysql_query_digest_to_disk=intv;
return true;
} else {
return false;
}
}
if (!strcasecmp(name,"stats_system_cpu")) {
int intv=atoi(value);
if (intv >= 0 && intv <= 600) {

@ -55,6 +55,7 @@ ProxySQL_Statistics::ProxySQL_Statistics() {
statsdb_disk->execute("PRAGMA synchronous=0");
next_timer_MySQL_Threads_Handler = 0;
next_timer_mysql_query_digest_to_disk = 0;
next_timer_system_cpu = 0;
#ifndef NOJEM
next_timer_system_memory = 0;
@ -100,10 +101,19 @@ void ProxySQL_Statistics::init() {
insert_into_tables_defs(tables_defs_statsdb_disk,"myhgm_connections_hour", STATSDB_SQLITE_TABLE_MYHGM_CONNECTIONS_HOUR);
insert_into_tables_defs(tables_defs_statsdb_disk,"myhgm_connections_day", STATSDB_SQLITE_TABLE_MYHGM_CONNECTIONS_DAY);
insert_into_tables_defs(tables_defs_statsdb_disk,"history_mysql_query_digest", STATSDB_SQLITE_TABLE_HISTORY_MYSQL_QUERY_DIGEST);
disk_upgrade_mysql_connections();
check_and_build_standard_tables(statsdb_mem, tables_defs_statsdb_disk);
check_and_build_standard_tables(statsdb_disk, tables_defs_statsdb_disk);
// statsdb_disk->execute("CREATE INDEX IF NOT EXISTS idx_history_mysql_query_digest_hostgroup ON history_mysql_query_digest (hostgroup)");
// statsdb_disk->execute("CREATE INDEX IF NOT EXISTS idx_history_mysql_query_digest_username ON history_mysql_query_digest (username)");
// statsdb_disk->execute("CREATE INDEX IF NOT EXISTS idx_history_mysql_query_digest_schemaname ON history_mysql_query_digest (schemaname)");
// statsdb_disk->execute("CREATE INDEX IF NOT EXISTS idx_history_mysql_query_digest_digest ON history_mysql_query_digest (digest)");
statsdb_disk->execute("CREATE INDEX IF NOT EXISTS idx_history_mysql_query_digest_first_seen ON history_mysql_query_digest (first_seen)");
// statsdb_disk->execute("CREATE INDEX IF NOT EXISTS idx_history_mysql_query_digest_last_seen ON history_mysql_query_digest (last_seen)");
}
void ProxySQL_Statistics::disk_upgrade_mysql_connections() {
@ -201,6 +211,22 @@ bool ProxySQL_Statistics::MySQL_Query_Cache_timetoget(unsigned long long curtime
return false;
}
bool ProxySQL_Statistics::mysql_query_digest_to_disk_timetoget(unsigned long long curtime) {
unsigned int i = (unsigned int)variables.stats_mysql_query_digest_to_disk;
if (i) {
if (
( curtime > next_timer_mysql_query_digest_to_disk )
||
( curtime + i*1000*1000 < next_timer_mysql_query_digest_to_disk )
) {
next_timer_mysql_query_digest_to_disk = curtime/1000/1000 + i;
next_timer_mysql_query_digest_to_disk = next_timer_mysql_query_digest_to_disk * 1000 * 1000;
return true;
}
}
return false;
}
bool ProxySQL_Statistics::system_cpu_timetoget(unsigned long long curtime) {
unsigned int i = (unsigned int)variables.stats_system_cpu;
if (i) {

@ -20,11 +20,6 @@
#define QP_RE_MOD_CASELESS 1
#define QP_RE_MOD_GLOBAL 2
// Optimization introduced in 2.0.6
// to avoid a lot of unnecessary copy
#define DIGEST_STATS_FAST_1
#define DIGEST_STATS_FAST_MINSIZE 100000
#define DIGEST_STATS_FAST_THREADS 4
#ifdef DIGEST_STATS_FAST_1
#include <thread>
@ -145,279 +140,235 @@ void my_itoa(char s[], unsigned long long n)
reverse(s);
}
typedef struct _query_digest_stats_pointers_t {
char *pta[14];
char digest[24];
char count_star[24];
char first_seen[24];
char last_seen[24];
char sum_time[24];
char min_time[24];
char max_time[24];
char hid[24];
char rows_affected[24];
char rows_sent[24];
} query_digest_stats_pointers_t;
#endif
class QP_query_digest_stats {
public:
uint64_t digest;
char *digest_text;
char *username;
char *schemaname;
char *client_address;
#ifdef DIGEST_STATS_FAST_1
char username_buf[24];
char schemaname_buf[24];
char client_address_buf[24];
#endif
time_t first_seen;
time_t last_seen;
unsigned int count_star;
unsigned long long sum_time;
unsigned long long min_time;
unsigned long long max_time;
unsigned long long rows_affected;
unsigned long long rows_sent;
int hid;
QP_query_digest_stats(char *u, char *s, uint64_t d, char *dt, int h, char *ca) {
digest=d;
digest_text=NULL;
if (dt) {
digest_text=strndup(dt, mysql_thread___query_digests_max_digest_length);
}
QP_query_digest_stats::QP_query_digest_stats(char *u, char *s, uint64_t d, char *dt, int h, char *ca) {
digest=d;
digest_text=NULL;
if (dt) {
digest_text=strndup(dt, mysql_thread___query_digests_max_digest_length);
}
#ifdef DIGEST_STATS_FAST_1
if (strlen(u) < sizeof(username_buf)) {
strcpy(username_buf,u);
username = username_buf;
} else {
username=strdup(u);
}
if (strlen(s) < sizeof(schemaname_buf)) {
strcpy(schemaname_buf,s);
schemaname = schemaname_buf;
} else {
schemaname=strdup(s);
}
if (strlen(ca) < sizeof(client_address_buf)) {
strcpy(client_address_buf,ca);
client_address = client_address_buf;
} else {
client_address=strdup(ca);
}
#else
if (strlen(u) < sizeof(username_buf)) {
strcpy(username_buf,u);
username = username_buf;
} else {
username=strdup(u);
}
if (strlen(s) < sizeof(schemaname_buf)) {
strcpy(schemaname_buf,s);
schemaname = schemaname_buf;
} else {
schemaname=strdup(s);
}
if (strlen(ca) < sizeof(client_address_buf)) {
strcpy(client_address_buf,ca);
client_address = client_address_buf;
} else {
client_address=strdup(ca);
}
#else
username=strdup(u);
schemaname=strdup(s);
client_address=strdup(ca);
#endif
count_star=0;
first_seen=0;
last_seen=0;
sum_time=0;
min_time=0;
max_time=0;
rows_affected=0;
rows_sent=0;
hid=h;
}
void add_time(unsigned long long t, unsigned long long n, unsigned long long ra, unsigned long long rs) {
count_star++;
sum_time+=t;
rows_affected+=ra;
rows_sent+=rs;
if (t < min_time || min_time==0) {
if (t) min_time = t;
}
if (t > max_time) {
max_time = t;
}
if (first_seen==0) {
first_seen=n;
}
last_seen=n;
}
~QP_query_digest_stats() {
if (digest_text) {
free(digest_text);
digest_text=NULL;
}
if (username) {
count_star=0;
first_seen=0;
last_seen=0;
sum_time=0;
min_time=0;
max_time=0;
rows_affected=0;
rows_sent=0;
hid=h;
}
void QP_query_digest_stats::add_time(unsigned long long t, unsigned long long n, unsigned long long ra, unsigned long long rs) {
count_star++;
sum_time+=t;
rows_affected+=ra;
rows_sent+=rs;
if (t < min_time || min_time==0) {
if (t) min_time = t;
}
if (t > max_time) {
max_time = t;
}
if (first_seen==0) {
first_seen=n;
}
last_seen=n;
}
QP_query_digest_stats::~QP_query_digest_stats() {
if (digest_text) {
free(digest_text);
digest_text=NULL;
}
if (username) {
#ifdef DIGEST_STATS_FAST_1
if (username == username_buf) {
} else {
free(username);
}
#else
if (username == username_buf) {
} else {
free(username);
#endif
username=NULL;
}
if (schemaname) {
#ifdef DIGEST_STATS_FAST_1
if (schemaname == schemaname_buf) {
} else {
free(schemaname);
}
#else
free(schemaname);
free(username);
#endif
schemaname=NULL;
}
if (client_address) {
username=NULL;
}
if (schemaname) {
#ifdef DIGEST_STATS_FAST_1
if (client_address == client_address_buf) {
} else {
free(client_address);
}
if (schemaname == schemaname_buf) {
} else {
free(schemaname);
}
#else
free(client_address);
free(schemaname);
#endif
client_address=NULL;
schemaname=NULL;
}
if (client_address) {
#ifdef DIGEST_STATS_FAST_1
if (client_address == client_address_buf) {
} else {
free(client_address);
}
#else
free(client_address);
#endif
client_address=NULL;
}
}
#ifdef DIGEST_STATS_FAST_1
char **get_row(umap_query_digest_text *digest_text_umap, query_digest_stats_pointers_t *qdsp) {
char **pta=qdsp->pta;
char **QP_query_digest_stats::get_row(umap_query_digest_text *digest_text_umap, query_digest_stats_pointers_t *qdsp) {
char **pta=qdsp->pta;
#else
char **get_row(umap_query_digest_text *digest_text_umap) {
char buf[128];
char **pta=(char **)malloc(sizeof(char *)*14);
char **QP_query_digest_stats::get_row(umap_query_digest_text *digest_text_umap) {
char buf[128];
char **pta=(char **)malloc(sizeof(char *)*14);
#endif
#ifdef DIGEST_STATS_FAST_1
assert(schemaname);
pta[0]=schemaname;
assert(username);
pta[1]=username;
assert(client_address);
pta[2]=client_address;
assert(schemaname);
pta[0]=schemaname;
assert(username);
pta[1]=username;
assert(client_address);
pta[2]=client_address;
#else
assert(schemaname);
pta[0]=strdup(schemaname);
assert(username);
pta[1]=strdup(username);
assert(client_address);
pta[2]=strdup(client_address);
assert(schemaname);
pta[0]=strdup(schemaname);
assert(username);
pta[1]=strdup(username);
assert(client_address);
pta[2]=strdup(client_address);
#endif
#ifdef DIGEST_STATS_FAST_1
sprintf(qdsp->digest,"0x%016llX", (long long unsigned int)digest);
pta[3]=qdsp->digest;
sprintf(qdsp->digest,"0x%016llX", (long long unsigned int)digest);
pta[3]=qdsp->digest;
#else
sprintf(buf,"0x%016llX", (long long unsigned int)digest);
pta[3]=strdup(buf);
sprintf(buf,"0x%016llX", (long long unsigned int)digest);
pta[3]=strdup(buf);
#endif
if (digest_text) {
if (digest_text) {
#ifdef DIGEST_STATS_FAST_1
pta[4]=digest_text;
pta[4]=digest_text;
#else
pta[4]=strdup(digest_text);
pta[4]=strdup(digest_text);
#endif
} else {
std::unordered_map<uint64_t, char *>::iterator it;
it=digest_text_umap->find(digest);
if (it != digest_text_umap->end()) {
} else {
std::unordered_map<uint64_t, char *>::iterator it;
it=digest_text_umap->find(digest);
if (it != digest_text_umap->end()) {
#ifdef DIGEST_STATS_FAST_1
pta[4] = it->second;
pta[4] = it->second;
#else
pta[4] = strdup(it->second);
pta[4] = strdup(it->second);
#endif
} else {
assert(0);
}
} else {
assert(0);
}
}
#ifdef DIGEST_STATS_FAST_1
//sprintf(qdsp->count_star,"%u",count_star);
my_itoa(qdsp->count_star, count_star);
pta[5]=qdsp->count_star;
//sprintf(qdsp->count_star,"%u",count_star);
my_itoa(qdsp->count_star, count_star);
pta[5]=qdsp->count_star;
#else
sprintf(buf,"%u",count_star);
pta[5]=strdup(buf);
sprintf(buf,"%u",count_star);
pta[5]=strdup(buf);
#endif
time_t __now;
time(&__now);
unsigned long long curtime=monotonic_time();
time_t seen_time;
seen_time= __now - curtime/1000000 + first_seen/1000000;
time_t __now;
time(&__now);
unsigned long long curtime=monotonic_time();
time_t seen_time;
seen_time= __now - curtime/1000000 + first_seen/1000000;
#ifdef DIGEST_STATS_FAST_1
//sprintf(qdsp->first_seen,"%ld", seen_time);
my_itoa(qdsp->first_seen, seen_time);
pta[6]=qdsp->first_seen;
//sprintf(qdsp->first_seen,"%ld", seen_time);
my_itoa(qdsp->first_seen, seen_time);
pta[6]=qdsp->first_seen;
#else
sprintf(buf,"%ld", seen_time);
pta[6]=strdup(buf);
sprintf(buf,"%ld", seen_time);
pta[6]=strdup(buf);
#endif
seen_time= __now - curtime/1000000 + last_seen/1000000;
seen_time= __now - curtime/1000000 + last_seen/1000000;
#ifdef DIGEST_STATS_FAST_1
//sprintf(qdsp->last_seen,"%ld", seen_time);
my_itoa(qdsp->last_seen, seen_time);
pta[7]=qdsp->last_seen;
//sprintf(qdsp->sum_time,"%llu",sum_time);
my_itoa(qdsp->sum_time,sum_time);
pta[8]=qdsp->sum_time;
//sprintf(qdsp->min_time,"%llu",min_time);
my_itoa(qdsp->min_time,min_time);
pta[9]=qdsp->min_time;
//sprintf(qdsp->max_time,"%llu",max_time);
my_itoa(qdsp->max_time,max_time);
pta[10]=qdsp->max_time;
// we are reverting this back to the use of sprintf instead of my_itoa
// because with my_itoa we are losing the sign
// see issue #2285
sprintf(qdsp->hid,"%d",hid);
//my_itoa(qdsp->hid,hid);
pta[11]=qdsp->hid;
//sprintf(qdsp->rows_affected,"%llu",rows_affected);
my_itoa(qdsp->rows_affected,rows_affected);
pta[12]=qdsp->rows_affected;
//sprintf(qdsp->rows_sent,"%llu",rows_sent);
my_itoa(qdsp->rows_sent,rows_sent);
pta[13]=qdsp->rows_sent;
#else
sprintf(buf,"%ld", seen_time);
pta[7]=strdup(buf);
sprintf(buf,"%llu",sum_time);
pta[8]=strdup(buf);
sprintf(buf,"%llu",min_time);
pta[9]=strdup(buf);
sprintf(buf,"%llu",max_time);
pta[10]=strdup(buf);
sprintf(buf,"%d",hid);
pta[11]=strdup(buf);
sprintf(buf,"%llu",rows_affected);
pta[12]=strdup(buf);
sprintf(buf,"%llu",rows_sent);
pta[13]=strdup(buf);
#endif
return pta;
}
/*
void free_row(query_digest_stats_pointers_t *qdsp) {
//free(qdsp->pta[0]);
//free(qdsp->pta[1]);
//free(qdsp->pta[2]);
//free(qdsp->pta[4]);
}
*/
//sprintf(qdsp->last_seen,"%ld", seen_time);
my_itoa(qdsp->last_seen, seen_time);
pta[7]=qdsp->last_seen;
//sprintf(qdsp->sum_time,"%llu",sum_time);
my_itoa(qdsp->sum_time,sum_time);
pta[8]=qdsp->sum_time;
//sprintf(qdsp->min_time,"%llu",min_time);
my_itoa(qdsp->min_time,min_time);
pta[9]=qdsp->min_time;
//sprintf(qdsp->max_time,"%llu",max_time);
my_itoa(qdsp->max_time,max_time);
pta[10]=qdsp->max_time;
// we are reverting this back to the use of sprintf instead of my_itoa
// because with my_itoa we are losing the sign
// see issue #2285
sprintf(qdsp->hid,"%d",hid);
//my_itoa(qdsp->hid,hid);
pta[11]=qdsp->hid;
//sprintf(qdsp->rows_affected,"%llu",rows_affected);
my_itoa(qdsp->rows_affected,rows_affected);
pta[12]=qdsp->rows_affected;
//sprintf(qdsp->rows_sent,"%llu",rows_sent);
my_itoa(qdsp->rows_sent,rows_sent);
pta[13]=qdsp->rows_sent;
#else
sprintf(buf,"%ld", seen_time);
pta[7]=strdup(buf);
sprintf(buf,"%llu",sum_time);
pta[8]=strdup(buf);
sprintf(buf,"%llu",min_time);
pta[9]=strdup(buf);
sprintf(buf,"%llu",max_time);
pta[10]=strdup(buf);
sprintf(buf,"%d",hid);
pta[11]=strdup(buf);
sprintf(buf,"%llu",rows_affected);
pta[12]=strdup(buf);
sprintf(buf,"%llu",rows_sent);
pta[13]=strdup(buf);
#endif
return pta;
}
#ifdef DIGEST_STATS_FAST_1
#else
void free_row(char **pta) {
int i;
for (i=0;i<14;i++) {
assert(pta[i]);
free(pta[i]);
}
free(pta);
void QP_query_digest_stats::free_row(char **pta) {
int i;
for (i=0;i<14;i++) {
assert(pta[i]);
free(pta[i]);
}
free(pta);
}
#endif
};
struct __RE2_objects_t {
pcrecpp::RE_Options *opt1;
@ -1476,6 +1427,22 @@ SQLite3_result * Query_Processor::get_query_digests() {
return result;
}
void Query_Processor::get_query_digests_reset(umap_query_digest *uqd, umap_query_digest_text *uqdt) {
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_wrlock(&digest_rwlock);
#else
spin_wrlock(&digest_rwlock);
#endif
digest_umap.swap(*uqd);
digest_text_umap.swap(*uqdt);
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_unlock(&digest_rwlock);
#else
spin_wrunlock(&digest_rwlock);
#endif
}
SQLite3_result * Query_Processor::get_query_digests_reset() {
SQLite3_result *result = NULL;
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX

Loading…
Cancel
Save