diff --git a/include/query_processor.h b/include/query_processor.h index 15f4ceeca..567d53b37 100644 --- a/include/query_processor.h +++ b/include/query_processor.h @@ -61,7 +61,10 @@ class QP_query_digest_stats { unsigned long long rows_sent; int hid; QP_query_digest_stats(char *u, char *s, uint64_t d, char *dt, int h, char *ca); - void add_time(unsigned long long t, unsigned long long n, unsigned long long ra, unsigned long long rs); + void add_time( + unsigned long long t, unsigned long long n, unsigned long long ra, unsigned long long rs, + unsigned long long cnt = 1 + ); ~QP_query_digest_stats(); char **get_row(umap_query_digest_text *digest_text_umap, query_digest_stats_pointers_t *qdsp); }; @@ -319,6 +322,7 @@ class Query_Processor { SQLite3_result * get_stats_commands_counters(); SQLite3_result * get_query_digests(); SQLite3_result * get_query_digests_reset(); + SQLite3_result * get_query_digests_v2(); void get_query_digests_reset(umap_query_digest *uqd, umap_query_digest_text *uqdt); unsigned long long purge_query_digests(bool async_purge, bool parallel, char **msg); unsigned long long purge_query_digests_async(char **msg); diff --git a/lib/Query_Processor.cpp b/lib/Query_Processor.cpp index debba9d22..5923ce0ed 100644 --- a/lib/Query_Processor.cpp +++ b/lib/Query_Processor.cpp @@ -186,8 +186,11 @@ QP_query_digest_stats::QP_query_digest_stats(char *u, char *s, uint64_t d, char rows_sent=0; hid=h; } -void QP_query_digest_stats::add_time(unsigned long long t, unsigned long long n, unsigned long long ra, unsigned long long rs) { - count_star++; +void QP_query_digest_stats::add_time( + unsigned long long t, unsigned long long n, unsigned long long ra, unsigned long long rs, + unsigned long long cnt +) { + count_star += cnt; sum_time+=t; rows_affected+=ra; rows_sent+=rs; @@ -1173,6 +1176,110 @@ unsigned long long Query_Processor::get_query_digests_total_size() { return ret; } +SQLite3_result * Query_Processor::get_query_digests_v2() { + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping current query digest\n"); + SQLite3_result *result = NULL; + // Create two auxiliary maps and swap its content with the main maps. This + // way, this function can read query digests stored until now while other + // threads write in the other map. We need to lock while swapping. + umap_query_digest digest_umap_aux; + umap_query_digest_text digest_text_umap_aux; + pthread_rwlock_wrlock(&digest_rwlock); + digest_umap.swap(digest_umap_aux); + digest_text_umap.swap(digest_text_umap_aux); + pthread_rwlock_unlock(&digest_rwlock); + unsigned long long curtime1; + unsigned long long curtime2; + size_t map_size = digest_umap_aux.size(); + if (map_size >= DIGEST_STATS_FAST_MINSIZE) { + result = new SQLite3_result(14, true); + curtime1 = monotonic_time(); + } else { + result = new SQLite3_result(14); + } + result->add_column_definition(SQLITE_TEXT,"hid"); + result->add_column_definition(SQLITE_TEXT,"schemaname"); + result->add_column_definition(SQLITE_TEXT,"username"); + result->add_column_definition(SQLITE_TEXT,"client_address"); + result->add_column_definition(SQLITE_TEXT,"digest"); + result->add_column_definition(SQLITE_TEXT,"digest_text"); + result->add_column_definition(SQLITE_TEXT,"count_star"); + result->add_column_definition(SQLITE_TEXT,"first_seen"); + result->add_column_definition(SQLITE_TEXT,"last_seen"); + result->add_column_definition(SQLITE_TEXT,"sum_time"); + result->add_column_definition(SQLITE_TEXT,"min_time"); + result->add_column_definition(SQLITE_TEXT,"max_time"); + result->add_column_definition(SQLITE_TEXT,"rows_affected"); + result->add_column_definition(SQLITE_TEXT,"rows_sent"); + if (map_size >= DIGEST_STATS_FAST_MINSIZE) { + int n=DIGEST_STATS_FAST_THREADS; + get_query_digests_parallel_args args[n]; + for (int i=0; i::iterator it = digest_umap_aux.begin(); + it != digest_umap_aux.end(); + ++it + ) { + QP_query_digest_stats *qds=(QP_query_digest_stats *)it->second; + query_digest_stats_pointers_t *a = (query_digest_stats_pointers_t *)malloc(sizeof(query_digest_stats_pointers_t)); + char **pta=qds->get_row(&digest_text_umap_aux, a); + result->add_row(pta); + free(a); + } + } + if (map_size >= DIGEST_STATS_FAST_MINSIZE) { + curtime2=monotonic_time(); + curtime1 = curtime1/1000; + curtime2 = curtime2/1000; + proxy_info("Running query on stats_mysql_query_digest: %llums to retrieve %lu entries\n", curtime2-curtime1, map_size); + } + + // Once the reading finishes, we lock and swap again both maps. Then, we + // merge the content of the auxiliary maps in the main maps and clear the + // content of the auxiliary maps. + pthread_rwlock_wrlock(&digest_rwlock); + digest_umap_aux.swap(digest_umap); + digest_text_umap_aux.swap(digest_text_umap); + for (const auto& element : digest_umap_aux) { + uint64_t digest = element.first; + QP_query_digest_stats *qds = (QP_query_digest_stats *)element.second; + std::unordered_map::iterator it = digest_umap.find(digest); + if (it != digest_umap.end()) { + // found + QP_query_digest_stats *qds_equal = (QP_query_digest_stats *)it->second; + qds_equal->add_time( + qds->min_time, qds->last_seen, qds->rows_affected, qds->rows_sent, qds->count_star + ); + } else { + digest_umap.insert(element); + } + } + digest_text_umap.insert(digest_text_umap_aux.begin(), digest_text_umap_aux.end()); + digest_umap_aux.clear(); + digest_text_umap_aux.clear(); + pthread_rwlock_unlock(&digest_rwlock); + + return result; +} + SQLite3_result * Query_Processor::get_query_digests() { proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping current query digest\n"); SQLite3_result *result = NULL;