diff --git a/include/proxysql_admin.h b/include/proxysql_admin.h index ce15efa67..81576bbc0 100644 --- a/include/proxysql_admin.h +++ b/include/proxysql_admin.h @@ -5,6 +5,7 @@ #include #include +#include "query_processor.h" #include "proxy_defines.h" #include "proxysql.h" #include "cpp.h" @@ -417,6 +418,10 @@ class ProxySQL_Admin { void p_update_metrics(); void stats___mysql_query_rules(); + void stats___save_mysql_query_digest_to_sqlite( + const bool reset, const bool copy, const SQLite3_result *resultset, + const umap_query_digest *digest_umap, const umap_query_digest_text *digest_text_umap + ); void stats___mysql_query_digests(bool reset, bool copy=false); //void stats___mysql_query_digests_reset(); void stats___mysql_commands_counters(); diff --git a/include/query_processor.h b/include/query_processor.h index d9110d13f..2e8cf247d 100644 --- a/include/query_processor.h +++ b/include/query_processor.h @@ -323,8 +323,8 @@ class Query_Processor { SQLite3_result * get_stats_commands_counters(); SQLite3_result * get_query_digests(); SQLite3_result * get_query_digests_reset(); - SQLite3_result * get_query_digests_v2(); - SQLite3_result * get_query_digests_reset_v2(); + SQLite3_result * get_query_digests_v2(const bool use_resultset = true); + SQLite3_result * get_query_digests_reset_v2(const bool use_resultset = true); void get_query_digests_reset(umap_query_digest *uqd, umap_query_digest_text *uqdt); unsigned long long purge_query_digests(bool async_purge, bool parallel, char **msg); unsigned long long purge_query_digests_async(char **msg); diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index feafdcefd..abe51f9bd 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -1,4 +1,5 @@ #include // std::cout +#include // std::stringstream #include #include // std::sort #include @@ -9412,6 +9413,105 @@ void ProxySQL_Admin::stats___proxysql_message_metrics(bool reset) { delete resultset; } +void ProxySQL_Admin::stats___save_mysql_query_digest_to_sqlite( + const bool reset, const bool copy, const SQLite3_result *resultset, const umap_query_digest *digest_umap, + const umap_query_digest_text *digest_text_umap +) { + statsdb->execute("BEGIN"); + int rc; + sqlite3_stmt *statement1=NULL; + sqlite3_stmt *statement32=NULL; + char *query1=NULL; + char *query32=NULL; + std::string query32s = ""; + statsdb->execute("DELETE FROM stats_mysql_query_digest_reset"); + statsdb->execute("DELETE FROM stats_mysql_query_digest"); + if (reset) { + query1=(char *)"INSERT INTO stats_mysql_query_digest_reset VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)"; + query32s = "INSERT INTO stats_mysql_query_digest_reset VALUES " + generate_multi_rows_query(32,14); + query32 = (char *)query32s.c_str(); + } else { + query1=(char *)"INSERT INTO stats_mysql_query_digest VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)"; + query32s = "INSERT INTO stats_mysql_query_digest VALUES " + generate_multi_rows_query(32,14); + query32 = (char *)query32s.c_str(); + } + + rc = statsdb->prepare_v2(query1, &statement1); + ASSERT_SQLITE_OK(rc, statsdb); + rc = statsdb->prepare_v2(query32, &statement32); + ASSERT_SQLITE_OK(rc, statsdb); + int row_idx=0; + int num_rows = resultset ? resultset->rows_count : digest_umap->size(); + int max_bulk_row_idx = num_rows/32; + max_bulk_row_idx=max_bulk_row_idx*32; + auto it = digest_umap->cbegin(); + int i = 0; + // If the function do not receives a resultset, it gets the values directly from the digest_umap + while (resultset ? i != resultset->rows_count : it != digest_umap->end()) { + QP_query_digest_stats *qds = (QP_query_digest_stats *)it->second; + SQLite3_row *row = resultset ? resultset->rows[i] : NULL; i++; + string digest_hex_str; + if (!resultset) { + std::ostringstream digest_stream; + digest_stream << "0x" << std::hex << qds->digest; + digest_hex_str = digest_stream.str(); + } + int idx=row_idx%32; + if (row_idxfields[11]) : qds->hid); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_text)(statement32, (idx*14)+2, resultset ? row->fields[0] : qds->schemaname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_text)(statement32, (idx*14)+3, resultset ? row->fields[1] : qds->username, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_text)(statement32, (idx*14)+4, resultset ? row->fields[2] : qds->client_address, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_text)(statement32, (idx*14)+5, resultset ? row->fields[3] : digest_hex_str.c_str(), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_text)(statement32, (idx*14)+6, resultset ? row->fields[4] : qds->get_digest_text(digest_text_umap), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+7, resultset ? atoll(row->fields[5]) : qds->count_star); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+8, resultset ? atoll(row->fields[6]) : qds->first_seen); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+9, resultset ? atoll(row->fields[7]) : qds->last_seen); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+10, resultset ? atoll(row->fields[8]) : qds->sum_time); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+11, resultset ? atoll(row->fields[9]) : qds->min_time); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+12, resultset ? atoll(row->fields[10]) : qds->max_time); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+13, resultset ? atoll(row->fields[12]) : qds->rows_affected); ASSERT_SQLITE_OK(rc, statsdb); // rows affected + rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+14, resultset ? atoll(row->fields[13]) : qds->rows_sent); ASSERT_SQLITE_OK(rc, statsdb); // rows sent + if (idx==31) { + SAFE_SQLITE3_STEP2(statement32); + rc=(*proxy_sqlite3_clear_bindings)(statement32); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_reset)(statement32); ASSERT_SQLITE_OK(rc, statsdb); + } + } else { // single row + rc=(*proxy_sqlite3_bind_int64)(statement1, 1, resultset ? atoll(row->fields[11]) : qds->hid); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_text)(statement1, 2, resultset ? row->fields[0] : qds->schemaname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_text)(statement1, 3, resultset ? row->fields[1] : qds->username, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_text)(statement1, 4, resultset ? row->fields[2] : qds->client_address, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_text)(statement1, 5, resultset ? row->fields[3] : digest_hex_str.c_str(), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_text)(statement1, 6, resultset ? row->fields[4] : qds->get_digest_text(digest_text_umap), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement1, 7, resultset ? atoll(row->fields[5]) : qds->count_star); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement1, 8, resultset ? atoll(row->fields[6]) : qds->first_seen); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement1, 9, resultset ? atoll(row->fields[7]) : qds->last_seen); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement1, 10, resultset ? atoll(row->fields[8]) : qds->sum_time); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement1, 11, resultset ? atoll(row->fields[9]) : qds->min_time); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement1, 12, resultset ? atoll(row->fields[10]) : qds->max_time); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement1, 13, resultset ? atoll(row->fields[12]) : qds->rows_affected); ASSERT_SQLITE_OK(rc, statsdb); // rows affected + rc=(*proxy_sqlite3_bind_int64)(statement1, 14, resultset ? atoll(row->fields[13]) : qds->rows_sent); ASSERT_SQLITE_OK(rc, statsdb); // rows sent + SAFE_SQLITE3_STEP2(statement1); + rc=(*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, statsdb); + } + row_idx++; + if (resultset) + i++; + else + it++; + } + (*proxy_sqlite3_finalize)(statement1); + (*proxy_sqlite3_finalize)(statement32); + if (reset) { + if (copy) { + statsdb->execute("INSERT INTO stats_mysql_query_digest SELECT * FROM stats_mysql_query_digest_reset"); + } + } + statsdb->execute("COMMIT"); +} + void ProxySQL_Admin::stats___mysql_query_digests(bool reset, bool copy) { if (!GloQPro) return; SQLite3_result * resultset=NULL; diff --git a/lib/Query_Processor.cpp b/lib/Query_Processor.cpp index 2bc89256a..ac2ba66cb 100644 --- a/lib/Query_Processor.cpp +++ b/lib/Query_Processor.cpp @@ -31,6 +31,7 @@ #include #include extern MySQL_Threads_Handler *GloMTH; +extern ProxySQL_Admin *GloAdmin; static int int_cmp(const void *a, const void *b) { const unsigned long long *ia = (const unsigned long long *)a; @@ -1188,7 +1189,7 @@ unsigned long long Query_Processor::get_query_digests_total_size() { return ret; } -SQLite3_result * Query_Processor::get_query_digests_v2() { +SQLite3_result * Query_Processor::get_query_digests_v2(const bool use_resultset) { proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping current query digest\n"); SQLite3_result *result = NULL; // Create two auxiliary maps and swap its content with the main maps. This @@ -1203,60 +1204,64 @@ SQLite3_result * Query_Processor::get_query_digests_v2() { unsigned long long curtime1; unsigned long long curtime2; size_t map_size = digest_umap_aux.size(); - if (map_size >= DIGEST_STATS_FAST_MINSIZE) { - result = new SQLite3_result(14, true); - curtime1 = monotonic_time(); - } else { - result = new SQLite3_result(14); - } - result->add_column_definition(SQLITE_TEXT,"hid"); - result->add_column_definition(SQLITE_TEXT,"schemaname"); - result->add_column_definition(SQLITE_TEXT,"username"); - result->add_column_definition(SQLITE_TEXT,"client_address"); - result->add_column_definition(SQLITE_TEXT,"digest"); - result->add_column_definition(SQLITE_TEXT,"digest_text"); - result->add_column_definition(SQLITE_TEXT,"count_star"); - result->add_column_definition(SQLITE_TEXT,"first_seen"); - result->add_column_definition(SQLITE_TEXT,"last_seen"); - result->add_column_definition(SQLITE_TEXT,"sum_time"); - result->add_column_definition(SQLITE_TEXT,"min_time"); - result->add_column_definition(SQLITE_TEXT,"max_time"); - result->add_column_definition(SQLITE_TEXT,"rows_affected"); - result->add_column_definition(SQLITE_TEXT,"rows_sent"); - if (map_size >= DIGEST_STATS_FAST_MINSIZE) { - int n=DIGEST_STATS_FAST_THREADS; - get_query_digests_parallel_args args[n]; - for (int i=0; i= DIGEST_STATS_FAST_MINSIZE) { + result = new SQLite3_result(14, true); + curtime1 = monotonic_time(); + } else { + result = new SQLite3_result(14); + } + result->add_column_definition(SQLITE_TEXT,"hid"); + result->add_column_definition(SQLITE_TEXT,"schemaname"); + result->add_column_definition(SQLITE_TEXT,"username"); + result->add_column_definition(SQLITE_TEXT,"client_address"); + result->add_column_definition(SQLITE_TEXT,"digest"); + result->add_column_definition(SQLITE_TEXT,"digest_text"); + result->add_column_definition(SQLITE_TEXT,"count_star"); + result->add_column_definition(SQLITE_TEXT,"first_seen"); + result->add_column_definition(SQLITE_TEXT,"last_seen"); + result->add_column_definition(SQLITE_TEXT,"sum_time"); + result->add_column_definition(SQLITE_TEXT,"min_time"); + result->add_column_definition(SQLITE_TEXT,"max_time"); + result->add_column_definition(SQLITE_TEXT,"rows_affected"); + result->add_column_definition(SQLITE_TEXT,"rows_sent"); + if (map_size >= DIGEST_STATS_FAST_MINSIZE) { + int n=DIGEST_STATS_FAST_THREADS; + get_query_digests_parallel_args args[n]; + for (int i=0; i::iterator it = digest_umap_aux.begin(); + it != digest_umap_aux.end(); + ++it + ) { + QP_query_digest_stats *qds=(QP_query_digest_stats *)it->second; + query_digest_stats_pointers_t *a = (query_digest_stats_pointers_t *)malloc(sizeof(query_digest_stats_pointers_t)); + char **pta=qds->get_row(&digest_text_umap_aux, a); + result->add_row(pta); + free(a); } - } - for (int i=0; i::iterator it = digest_umap_aux.begin(); - it != digest_umap_aux.end(); - ++it - ) { - QP_query_digest_stats *qds=(QP_query_digest_stats *)it->second; - query_digest_stats_pointers_t *a = (query_digest_stats_pointers_t *)malloc(sizeof(query_digest_stats_pointers_t)); - char **pta=qds->get_row(&digest_text_umap_aux, a); - result->add_row(pta); - free(a); } } + GloAdmin->stats___save_mysql_query_digest_to_sqlite( + false, false, result, &digest_umap_aux, &digest_text_umap_aux + ); if (map_size >= DIGEST_STATS_FAST_MINSIZE) { curtime2=monotonic_time(); curtime1 = curtime1/1000; @@ -1359,7 +1364,7 @@ SQLite3_result * Query_Processor::get_query_digests() { return result; } -SQLite3_result * Query_Processor::get_query_digests_reset_v2() { +SQLite3_result * Query_Processor::get_query_digests_reset_v2(const bool use_resultset) { SQLite3_result *result = NULL; umap_query_digest digest_umap_aux; umap_query_digest_text digest_text_umap_aux; @@ -1369,68 +1374,73 @@ SQLite3_result * Query_Processor::get_query_digests_reset_v2() { pthread_rwlock_unlock(&digest_rwlock); unsigned long long curtime1; unsigned long long curtime2; - bool free_me = true; - bool defer_free = true; + size_t map_size = digest_umap.size(); + bool free_me = false; + bool defer_free = false; int n=DIGEST_STATS_FAST_THREADS; get_query_digests_parallel_args args[n]; - size_t map_size = digest_umap_aux.size(); - if (map_size >= DIGEST_STATS_FAST_MINSIZE) { - curtime1=monotonic_time(); - result = new SQLite3_result(14, true); - } else { - result = new SQLite3_result(14); - } - result->add_column_definition(SQLITE_TEXT,"hid"); - result->add_column_definition(SQLITE_TEXT,"schemaname"); - result->add_column_definition(SQLITE_TEXT,"username"); - result->add_column_definition(SQLITE_TEXT,"client_address"); - result->add_column_definition(SQLITE_TEXT,"digest"); - result->add_column_definition(SQLITE_TEXT,"digest_text"); - result->add_column_definition(SQLITE_TEXT,"count_star"); - result->add_column_definition(SQLITE_TEXT,"first_seen"); - result->add_column_definition(SQLITE_TEXT,"last_seen"); - result->add_column_definition(SQLITE_TEXT,"sum_time"); - result->add_column_definition(SQLITE_TEXT,"min_time"); - result->add_column_definition(SQLITE_TEXT,"max_time"); - result->add_column_definition(SQLITE_TEXT,"rows_affected"); - result->add_column_definition(SQLITE_TEXT,"rows_sent"); - if (map_size >= DIGEST_STATS_FAST_MINSIZE) { - for (int i=0; i= DIGEST_STATS_FAST_MINSIZE) { + curtime1=monotonic_time(); + result = new SQLite3_result(14, true); + } else { + result = new SQLite3_result(14); + } + result->add_column_definition(SQLITE_TEXT,"hid"); + result->add_column_definition(SQLITE_TEXT,"schemaname"); + result->add_column_definition(SQLITE_TEXT,"username"); + result->add_column_definition(SQLITE_TEXT,"client_address"); + result->add_column_definition(SQLITE_TEXT,"digest"); + result->add_column_definition(SQLITE_TEXT,"digest_text"); + result->add_column_definition(SQLITE_TEXT,"count_star"); + result->add_column_definition(SQLITE_TEXT,"first_seen"); + result->add_column_definition(SQLITE_TEXT,"last_seen"); + result->add_column_definition(SQLITE_TEXT,"sum_time"); + result->add_column_definition(SQLITE_TEXT,"min_time"); + result->add_column_definition(SQLITE_TEXT,"max_time"); + result->add_column_definition(SQLITE_TEXT,"rows_affected"); + result->add_column_definition(SQLITE_TEXT,"rows_sent"); + if (map_size >= DIGEST_STATS_FAST_MINSIZE) { + for (int i=0; i::iterator it=digest_umap_aux.begin(); it!=digest_umap_aux.end(); ++it) { + QP_query_digest_stats *qds=(QP_query_digest_stats *)it->second; + delete qds; + } + } + } else { for (std::unordered_map::iterator it=digest_umap_aux.begin(); it!=digest_umap_aux.end(); ++it) { QP_query_digest_stats *qds=(QP_query_digest_stats *)it->second; + query_digest_stats_pointers_t *a = (query_digest_stats_pointers_t *)malloc(sizeof(query_digest_stats_pointers_t)); + char **pta=qds->get_row(&digest_text_umap_aux, a); + result->add_row(pta); + free(a); delete qds; } } - } else { - for (std::unordered_map::iterator it=digest_umap_aux.begin(); it!=digest_umap_aux.end(); ++it) { - QP_query_digest_stats *qds=(QP_query_digest_stats *)it->second; - query_digest_stats_pointers_t *a = (query_digest_stats_pointers_t *)malloc(sizeof(query_digest_stats_pointers_t)); - char **pta=qds->get_row(&digest_text_umap_aux, a); - result->add_row(pta); - //qds->free_row(pta); - free(a); - delete qds; - } } + GloAdmin->stats___save_mysql_query_digest_to_sqlite( + false, false, result, &digest_umap_aux, &digest_text_umap_aux + ); digest_umap_aux.clear(); // this part is always single-threaded for (std::unordered_map::iterator it=digest_text_umap_aux.begin(); it!=digest_text_umap_aux.end(); ++it) {