Create stats_mysql_query_digest statements directly from the digest_umap

pull/4096/head
Javier Sánchez Parra 3 years ago
parent 6fb18bd2ac
commit 14ba7475d4

@ -5,6 +5,7 @@
#include <prometheus/counter.h>
#include <prometheus/gauge.h>
#include "query_processor.h"
#include "proxy_defines.h"
#include "proxysql.h"
#include "cpp.h"
@ -417,6 +418,10 @@ class ProxySQL_Admin {
void p_update_metrics();
void stats___mysql_query_rules();
void stats___save_mysql_query_digest_to_sqlite(
const bool reset, const bool copy, const SQLite3_result *resultset,
const umap_query_digest *digest_umap, const umap_query_digest_text *digest_text_umap
);
void stats___mysql_query_digests(bool reset, bool copy=false);
//void stats___mysql_query_digests_reset();
void stats___mysql_commands_counters();

@ -323,8 +323,8 @@ class Query_Processor {
SQLite3_result * get_stats_commands_counters();
SQLite3_result * get_query_digests();
SQLite3_result * get_query_digests_reset();
SQLite3_result * get_query_digests_v2();
SQLite3_result * get_query_digests_reset_v2();
SQLite3_result * get_query_digests_v2(const bool use_resultset = true);
SQLite3_result * get_query_digests_reset_v2(const bool use_resultset = true);
void get_query_digests_reset(umap_query_digest *uqd, umap_query_digest_text *uqdt);
unsigned long long purge_query_digests(bool async_purge, bool parallel, char **msg);
unsigned long long purge_query_digests_async(char **msg);

@ -1,4 +1,5 @@
#include <iostream> // std::cout
#include <sstream> // std::stringstream
#include <fstream>
#include <algorithm> // std::sort
#include <memory>
@ -9412,6 +9413,105 @@ void ProxySQL_Admin::stats___proxysql_message_metrics(bool reset) {
delete resultset;
}
void ProxySQL_Admin::stats___save_mysql_query_digest_to_sqlite(
const bool reset, const bool copy, const SQLite3_result *resultset, const umap_query_digest *digest_umap,
const umap_query_digest_text *digest_text_umap
) {
statsdb->execute("BEGIN");
int rc;
sqlite3_stmt *statement1=NULL;
sqlite3_stmt *statement32=NULL;
char *query1=NULL;
char *query32=NULL;
std::string query32s = "";
statsdb->execute("DELETE FROM stats_mysql_query_digest_reset");
statsdb->execute("DELETE FROM stats_mysql_query_digest");
if (reset) {
query1=(char *)"INSERT INTO stats_mysql_query_digest_reset VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)";
query32s = "INSERT INTO stats_mysql_query_digest_reset VALUES " + generate_multi_rows_query(32,14);
query32 = (char *)query32s.c_str();
} else {
query1=(char *)"INSERT INTO stats_mysql_query_digest VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)";
query32s = "INSERT INTO stats_mysql_query_digest VALUES " + generate_multi_rows_query(32,14);
query32 = (char *)query32s.c_str();
}
rc = statsdb->prepare_v2(query1, &statement1);
ASSERT_SQLITE_OK(rc, statsdb);
rc = statsdb->prepare_v2(query32, &statement32);
ASSERT_SQLITE_OK(rc, statsdb);
int row_idx=0;
int num_rows = resultset ? resultset->rows_count : digest_umap->size();
int max_bulk_row_idx = num_rows/32;
max_bulk_row_idx=max_bulk_row_idx*32;
auto it = digest_umap->cbegin();
int i = 0;
// If the function do not receives a resultset, it gets the values directly from the digest_umap
while (resultset ? i != resultset->rows_count : it != digest_umap->end()) {
QP_query_digest_stats *qds = (QP_query_digest_stats *)it->second;
SQLite3_row *row = resultset ? resultset->rows[i] : NULL; i++;
string digest_hex_str;
if (!resultset) {
std::ostringstream digest_stream;
digest_stream << "0x" << std::hex << qds->digest;
digest_hex_str = digest_stream.str();
}
int idx=row_idx%32;
if (row_idx<max_bulk_row_idx) { // bulk
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+1, resultset ? atoll(row->fields[11]) : qds->hid); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_text)(statement32, (idx*14)+2, resultset ? row->fields[0] : qds->schemaname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_text)(statement32, (idx*14)+3, resultset ? row->fields[1] : qds->username, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_text)(statement32, (idx*14)+4, resultset ? row->fields[2] : qds->client_address, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_text)(statement32, (idx*14)+5, resultset ? row->fields[3] : digest_hex_str.c_str(), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_text)(statement32, (idx*14)+6, resultset ? row->fields[4] : qds->get_digest_text(digest_text_umap), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+7, resultset ? atoll(row->fields[5]) : qds->count_star); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+8, resultset ? atoll(row->fields[6]) : qds->first_seen); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+9, resultset ? atoll(row->fields[7]) : qds->last_seen); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+10, resultset ? atoll(row->fields[8]) : qds->sum_time); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+11, resultset ? atoll(row->fields[9]) : qds->min_time); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+12, resultset ? atoll(row->fields[10]) : qds->max_time); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+13, resultset ? atoll(row->fields[12]) : qds->rows_affected); ASSERT_SQLITE_OK(rc, statsdb); // rows affected
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+14, resultset ? atoll(row->fields[13]) : qds->rows_sent); ASSERT_SQLITE_OK(rc, statsdb); // rows sent
if (idx==31) {
SAFE_SQLITE3_STEP2(statement32);
rc=(*proxy_sqlite3_clear_bindings)(statement32); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_reset)(statement32); ASSERT_SQLITE_OK(rc, statsdb);
}
} else { // single row
rc=(*proxy_sqlite3_bind_int64)(statement1, 1, resultset ? atoll(row->fields[11]) : qds->hid); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_text)(statement1, 2, resultset ? row->fields[0] : qds->schemaname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_text)(statement1, 3, resultset ? row->fields[1] : qds->username, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_text)(statement1, 4, resultset ? row->fields[2] : qds->client_address, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_text)(statement1, 5, resultset ? row->fields[3] : digest_hex_str.c_str(), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_text)(statement1, 6, resultset ? row->fields[4] : qds->get_digest_text(digest_text_umap), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 7, resultset ? atoll(row->fields[5]) : qds->count_star); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 8, resultset ? atoll(row->fields[6]) : qds->first_seen); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 9, resultset ? atoll(row->fields[7]) : qds->last_seen); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 10, resultset ? atoll(row->fields[8]) : qds->sum_time); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 11, resultset ? atoll(row->fields[9]) : qds->min_time); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 12, resultset ? atoll(row->fields[10]) : qds->max_time); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 13, resultset ? atoll(row->fields[12]) : qds->rows_affected); ASSERT_SQLITE_OK(rc, statsdb); // rows affected
rc=(*proxy_sqlite3_bind_int64)(statement1, 14, resultset ? atoll(row->fields[13]) : qds->rows_sent); ASSERT_SQLITE_OK(rc, statsdb); // rows sent
SAFE_SQLITE3_STEP2(statement1);
rc=(*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, statsdb);
rc=(*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, statsdb);
}
row_idx++;
if (resultset)
i++;
else
it++;
}
(*proxy_sqlite3_finalize)(statement1);
(*proxy_sqlite3_finalize)(statement32);
if (reset) {
if (copy) {
statsdb->execute("INSERT INTO stats_mysql_query_digest SELECT * FROM stats_mysql_query_digest_reset");
}
}
statsdb->execute("COMMIT");
}
void ProxySQL_Admin::stats___mysql_query_digests(bool reset, bool copy) {
if (!GloQPro) return;
SQLite3_result * resultset=NULL;

@ -31,6 +31,7 @@
#include <thread>
#include <future>
extern MySQL_Threads_Handler *GloMTH;
extern ProxySQL_Admin *GloAdmin;
static int int_cmp(const void *a, const void *b) {
const unsigned long long *ia = (const unsigned long long *)a;
@ -1188,7 +1189,7 @@ unsigned long long Query_Processor::get_query_digests_total_size() {
return ret;
}
SQLite3_result * Query_Processor::get_query_digests_v2() {
SQLite3_result * Query_Processor::get_query_digests_v2(const bool use_resultset) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping current query digest\n");
SQLite3_result *result = NULL;
// Create two auxiliary maps and swap its content with the main maps. This
@ -1203,60 +1204,64 @@ SQLite3_result * Query_Processor::get_query_digests_v2() {
unsigned long long curtime1;
unsigned long long curtime2;
size_t map_size = digest_umap_aux.size();
if (map_size >= DIGEST_STATS_FAST_MINSIZE) {
result = new SQLite3_result(14, true);
curtime1 = monotonic_time();
} else {
result = new SQLite3_result(14);
}
result->add_column_definition(SQLITE_TEXT,"hid");
result->add_column_definition(SQLITE_TEXT,"schemaname");
result->add_column_definition(SQLITE_TEXT,"username");
result->add_column_definition(SQLITE_TEXT,"client_address");
result->add_column_definition(SQLITE_TEXT,"digest");
result->add_column_definition(SQLITE_TEXT,"digest_text");
result->add_column_definition(SQLITE_TEXT,"count_star");
result->add_column_definition(SQLITE_TEXT,"first_seen");
result->add_column_definition(SQLITE_TEXT,"last_seen");
result->add_column_definition(SQLITE_TEXT,"sum_time");
result->add_column_definition(SQLITE_TEXT,"min_time");
result->add_column_definition(SQLITE_TEXT,"max_time");
result->add_column_definition(SQLITE_TEXT,"rows_affected");
result->add_column_definition(SQLITE_TEXT,"rows_sent");
if (map_size >= DIGEST_STATS_FAST_MINSIZE) {
int n=DIGEST_STATS_FAST_THREADS;
get_query_digests_parallel_args args[n];
for (int i=0; i<n; i++) {
args[i].m=i;
//args[i].ret=0;
args[i].gu = &digest_umap_aux;
args[i].gtu = &digest_text_umap_aux;
args[i].result = result;
args[i].free_me = false;
}
for (int i=0; i<n; i++) {
if ( pthread_create(&args[i].thr, NULL, &get_query_digests_parallel, &args[i]) != 0 ) {
// LCOV_EXCL_START
assert(0);
// LCOV_EXCL_STOP
if (use_resultset) {
if (map_size >= DIGEST_STATS_FAST_MINSIZE) {
result = new SQLite3_result(14, true);
curtime1 = monotonic_time();
} else {
result = new SQLite3_result(14);
}
result->add_column_definition(SQLITE_TEXT,"hid");
result->add_column_definition(SQLITE_TEXT,"schemaname");
result->add_column_definition(SQLITE_TEXT,"username");
result->add_column_definition(SQLITE_TEXT,"client_address");
result->add_column_definition(SQLITE_TEXT,"digest");
result->add_column_definition(SQLITE_TEXT,"digest_text");
result->add_column_definition(SQLITE_TEXT,"count_star");
result->add_column_definition(SQLITE_TEXT,"first_seen");
result->add_column_definition(SQLITE_TEXT,"last_seen");
result->add_column_definition(SQLITE_TEXT,"sum_time");
result->add_column_definition(SQLITE_TEXT,"min_time");
result->add_column_definition(SQLITE_TEXT,"max_time");
result->add_column_definition(SQLITE_TEXT,"rows_affected");
result->add_column_definition(SQLITE_TEXT,"rows_sent");
if (map_size >= DIGEST_STATS_FAST_MINSIZE) {
int n=DIGEST_STATS_FAST_THREADS;
get_query_digests_parallel_args args[n];
for (int i=0; i<n; i++) {
args[i].m=i;
args[i].gu = &digest_umap_aux;
args[i].gtu = &digest_text_umap_aux;
args[i].result = result;
args[i].free_me = false;
}
for (int i=0; i<n; i++) {
if ( pthread_create(&args[i].thr, NULL, &get_query_digests_parallel, &args[i]) != 0 ) {
// LCOV_EXCL_START
assert(0);
// LCOV_EXCL_STOP
}
}
for (int i=0; i<n; i++) {
pthread_join(args[i].thr, NULL);
}
} else {
for (
std::unordered_map<uint64_t, void *>::iterator it = digest_umap_aux.begin();
it != digest_umap_aux.end();
++it
) {
QP_query_digest_stats *qds=(QP_query_digest_stats *)it->second;
query_digest_stats_pointers_t *a = (query_digest_stats_pointers_t *)malloc(sizeof(query_digest_stats_pointers_t));
char **pta=qds->get_row(&digest_text_umap_aux, a);
result->add_row(pta);
free(a);
}
}
for (int i=0; i<n; i++) {
pthread_join(args[i].thr, NULL);
}
} else {
for (
std::unordered_map<uint64_t, void *>::iterator it = digest_umap_aux.begin();
it != digest_umap_aux.end();
++it
) {
QP_query_digest_stats *qds=(QP_query_digest_stats *)it->second;
query_digest_stats_pointers_t *a = (query_digest_stats_pointers_t *)malloc(sizeof(query_digest_stats_pointers_t));
char **pta=qds->get_row(&digest_text_umap_aux, a);
result->add_row(pta);
free(a);
}
}
GloAdmin->stats___save_mysql_query_digest_to_sqlite(
false, false, result, &digest_umap_aux, &digest_text_umap_aux
);
if (map_size >= DIGEST_STATS_FAST_MINSIZE) {
curtime2=monotonic_time();
curtime1 = curtime1/1000;
@ -1359,7 +1364,7 @@ SQLite3_result * Query_Processor::get_query_digests() {
return result;
}
SQLite3_result * Query_Processor::get_query_digests_reset_v2() {
SQLite3_result * Query_Processor::get_query_digests_reset_v2(const bool use_resultset) {
SQLite3_result *result = NULL;
umap_query_digest digest_umap_aux;
umap_query_digest_text digest_text_umap_aux;
@ -1369,68 +1374,73 @@ SQLite3_result * Query_Processor::get_query_digests_reset_v2() {
pthread_rwlock_unlock(&digest_rwlock);
unsigned long long curtime1;
unsigned long long curtime2;
bool free_me = true;
bool defer_free = true;
size_t map_size = digest_umap.size();
bool free_me = false;
bool defer_free = false;
int n=DIGEST_STATS_FAST_THREADS;
get_query_digests_parallel_args args[n];
size_t map_size = digest_umap_aux.size();
if (map_size >= DIGEST_STATS_FAST_MINSIZE) {
curtime1=monotonic_time();
result = new SQLite3_result(14, true);
} else {
result = new SQLite3_result(14);
}
result->add_column_definition(SQLITE_TEXT,"hid");
result->add_column_definition(SQLITE_TEXT,"schemaname");
result->add_column_definition(SQLITE_TEXT,"username");
result->add_column_definition(SQLITE_TEXT,"client_address");
result->add_column_definition(SQLITE_TEXT,"digest");
result->add_column_definition(SQLITE_TEXT,"digest_text");
result->add_column_definition(SQLITE_TEXT,"count_star");
result->add_column_definition(SQLITE_TEXT,"first_seen");
result->add_column_definition(SQLITE_TEXT,"last_seen");
result->add_column_definition(SQLITE_TEXT,"sum_time");
result->add_column_definition(SQLITE_TEXT,"min_time");
result->add_column_definition(SQLITE_TEXT,"max_time");
result->add_column_definition(SQLITE_TEXT,"rows_affected");
result->add_column_definition(SQLITE_TEXT,"rows_sent");
if (map_size >= DIGEST_STATS_FAST_MINSIZE) {
for (int i=0; i<n; i++) {
args[i].m=i;
//args[i].ret=0;
args[i].gu = &digest_umap_aux;
args[i].gtu = &digest_text_umap_aux;
args[i].result = result;
args[i].free_me = free_me;
args[i].defer_free = defer_free;
}
for (int i=0; i<n; i++) {
if ( pthread_create(&args[i].thr, NULL, &get_query_digests_parallel, &args[i]) != 0 ) {
// LCOV_EXCL_START
assert(0);
// LCOV_EXCL_STOP
if (use_resultset) {
free_me = true;
defer_free = true;
if (map_size >= DIGEST_STATS_FAST_MINSIZE) {
curtime1=monotonic_time();
result = new SQLite3_result(14, true);
} else {
result = new SQLite3_result(14);
}
result->add_column_definition(SQLITE_TEXT,"hid");
result->add_column_definition(SQLITE_TEXT,"schemaname");
result->add_column_definition(SQLITE_TEXT,"username");
result->add_column_definition(SQLITE_TEXT,"client_address");
result->add_column_definition(SQLITE_TEXT,"digest");
result->add_column_definition(SQLITE_TEXT,"digest_text");
result->add_column_definition(SQLITE_TEXT,"count_star");
result->add_column_definition(SQLITE_TEXT,"first_seen");
result->add_column_definition(SQLITE_TEXT,"last_seen");
result->add_column_definition(SQLITE_TEXT,"sum_time");
result->add_column_definition(SQLITE_TEXT,"min_time");
result->add_column_definition(SQLITE_TEXT,"max_time");
result->add_column_definition(SQLITE_TEXT,"rows_affected");
result->add_column_definition(SQLITE_TEXT,"rows_sent");
if (map_size >= DIGEST_STATS_FAST_MINSIZE) {
for (int i=0; i<n; i++) {
args[i].m=i;
args[i].gu = &digest_umap_aux;
args[i].gtu = &digest_text_umap_aux;
args[i].result = result;
args[i].free_me = free_me;
args[i].defer_free = defer_free;
}
for (int i=0; i<n; i++) {
if ( pthread_create(&args[i].thr, NULL, &get_query_digests_parallel, &args[i]) != 0 ) {
// LCOV_EXCL_START
assert(0);
// LCOV_EXCL_STOP
}
}
}
for (int i=0; i<n; i++) {
pthread_join(args[i].thr, NULL);
}
if (free_me == false) {
for (int i=0; i<n; i++) {
pthread_join(args[i].thr, NULL);
}
if (free_me == false) {
for (std::unordered_map<uint64_t, void *>::iterator it=digest_umap_aux.begin(); it!=digest_umap_aux.end(); ++it) {
QP_query_digest_stats *qds=(QP_query_digest_stats *)it->second;
delete qds;
}
}
} else {
for (std::unordered_map<uint64_t, void *>::iterator it=digest_umap_aux.begin(); it!=digest_umap_aux.end(); ++it) {
QP_query_digest_stats *qds=(QP_query_digest_stats *)it->second;
query_digest_stats_pointers_t *a = (query_digest_stats_pointers_t *)malloc(sizeof(query_digest_stats_pointers_t));
char **pta=qds->get_row(&digest_text_umap_aux, a);
result->add_row(pta);
free(a);
delete qds;
}
}
} else {
for (std::unordered_map<uint64_t, void *>::iterator it=digest_umap_aux.begin(); it!=digest_umap_aux.end(); ++it) {
QP_query_digest_stats *qds=(QP_query_digest_stats *)it->second;
query_digest_stats_pointers_t *a = (query_digest_stats_pointers_t *)malloc(sizeof(query_digest_stats_pointers_t));
char **pta=qds->get_row(&digest_text_umap_aux, a);
result->add_row(pta);
//qds->free_row(pta);
free(a);
delete qds;
}
}
GloAdmin->stats___save_mysql_query_digest_to_sqlite(
false, false, result, &digest_umap_aux, &digest_text_umap_aux
);
digest_umap_aux.clear();
// this part is always single-threaded
for (std::unordered_map<uint64_t, char *>::iterator it=digest_text_umap_aux.begin(); it!=digest_text_umap_aux.end(); ++it) {

Loading…
Cancel
Save