diff --git a/include/proxysql_admin.h b/include/proxysql_admin.h index 53004f20f..29f8ef25d 100644 --- a/include/proxysql_admin.h +++ b/include/proxysql_admin.h @@ -5,6 +5,7 @@ #include #include +#include "query_processor.h" #include "proxy_defines.h" #include "proxysql.h" #include "cpp.h" @@ -421,7 +422,12 @@ class ProxySQL_Admin { void p_update_metrics(); void stats___mysql_query_rules(); - void stats___mysql_query_digests(bool reset, bool copy=false); + int stats___save_mysql_query_digest_to_sqlite( + const bool reset, const bool copy, const SQLite3_result *resultset, + const umap_query_digest *digest_umap, const umap_query_digest_text *digest_text_umap + ); + int stats___mysql_query_digests(bool reset, bool copy=false); + int stats___mysql_query_digests_v2(bool reset, bool copy, bool use_resultset); //void stats___mysql_query_digests_reset(); void stats___mysql_commands_counters(); void stats___mysql_processlist(); diff --git a/include/query_processor.h b/include/query_processor.h index 15f4ceeca..5e17903c0 100644 --- a/include/query_processor.h +++ b/include/query_processor.h @@ -61,8 +61,12 @@ class QP_query_digest_stats { unsigned long long rows_sent; int hid; QP_query_digest_stats(char *u, char *s, uint64_t d, char *dt, int h, char *ca); - void add_time(unsigned long long t, unsigned long long n, unsigned long long ra, unsigned long long rs); + void add_time( + unsigned long long t, unsigned long long n, unsigned long long ra, unsigned long long rs, + unsigned long long cnt = 1 + ); ~QP_query_digest_stats(); + char *get_digest_text(const umap_query_digest_text *digest_text_umap); char **get_row(umap_query_digest_text *digest_text_umap, query_digest_stats_pointers_t *qdsp); }; @@ -319,6 +323,10 @@ class Query_Processor { SQLite3_result * get_stats_commands_counters(); SQLite3_result * get_query_digests(); SQLite3_result * get_query_digests_reset(); + std::pair get_query_digests_v2(const bool use_resultset = true); + std::pair get_query_digests_reset_v2( + const bool copy, const bool use_resultset = true + ); void get_query_digests_reset(umap_query_digest *uqd, umap_query_digest_text *uqdt); unsigned long long purge_query_digests(bool async_purge, bool parallel, char **msg); unsigned long long purge_query_digests_async(char **msg); diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index 96839466a..3169d1eac 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -1,4 +1,5 @@ #include // std::cout +#include // std::stringstream #include #include // std::sort #include @@ -3253,10 +3254,10 @@ bool ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign if (stats_mysql_processlist) stats___mysql_processlist(); if (stats_mysql_query_digest_reset) { - stats___mysql_query_digests(true, stats_mysql_query_digest); + stats___mysql_query_digests_v2(true, stats_mysql_query_digest, false); } else { if (stats_mysql_query_digest) { - stats___mysql_query_digests(false); + stats___mysql_query_digests_v2(false, false, false); } } if (stats_mysql_errors) @@ -4036,6 +4037,50 @@ void admin_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *pkt) { run_query=false; free(msg); break; + case 22: + // get all the entries from the digest map, but WRITING to DB + // it uses multiple threads + // It locks the maps while generating the resultset + r1 = SPA->stats___mysql_query_digests(false, true); + SPA->send_MySQL_OK(&sess->client_myds->myprot, NULL, r1); + run_query=false; + break; + case 23: + // get all the entries from the digest map, but WRITING to DB + // it uses multiple threads for creating the resultset + r1 = SPA->stats___mysql_query_digests_v2(false, false, true); + SPA->send_MySQL_OK(&sess->client_myds->myprot, NULL, r1); + run_query=false; + break; + case 24: + // get all the entries from the digest map, but WRITING to DB + // Do not create a resultset, uses the digest_umap + r1 = SPA->stats___mysql_query_digests_v2(false, false, false); + SPA->send_MySQL_OK(&sess->client_myds->myprot, NULL, r1); + run_query=false; + break; + case 25: + // get all the entries from the digest map AND RESET, but WRITING to DB + // it uses multiple threads + // It locks the maps while generating the resultset + r1 = SPA->stats___mysql_query_digests(true, true); + SPA->send_MySQL_OK(&sess->client_myds->myprot, NULL, r1); + run_query=false; + break; + case 26: + // get all the entries from the digest map AND RESET, but WRITING to DB + // it uses multiple threads for creating the resultset + r1 = SPA->stats___mysql_query_digests_v2(true, true, true); + SPA->send_MySQL_OK(&sess->client_myds->myprot, NULL, r1); + run_query=false; + break; + case 27: + // get all the entries from the digest map AND RESET, but WRITING to DB + // Do not create a resultset, uses the digest_umap + r1 = SPA->stats___mysql_query_digests_v2(true, true, false); + SPA->send_MySQL_OK(&sess->client_myds->myprot, NULL, r1); + run_query=false; + break; case 31: { if (test_arg1==0) { @@ -9454,15 +9499,120 @@ void ProxySQL_Admin::stats___proxysql_message_metrics(bool reset) { delete resultset; } -void ProxySQL_Admin::stats___mysql_query_digests(bool reset, bool copy) { - if (!GloQPro) return; +int ProxySQL_Admin::stats___save_mysql_query_digest_to_sqlite( + const bool reset, const bool copy, const SQLite3_result *resultset, const umap_query_digest *digest_umap, + const umap_query_digest_text *digest_text_umap +) { + statsdb->execute("BEGIN"); + int rc; + sqlite3_stmt *statement1=NULL; + sqlite3_stmt *statement32=NULL; + char *query1=NULL; + char *query32=NULL; + std::string query32s = ""; + statsdb->execute("DELETE FROM stats_mysql_query_digest_reset"); + statsdb->execute("DELETE FROM stats_mysql_query_digest"); + if (reset) { + query1=(char *)"INSERT INTO stats_mysql_query_digest_reset VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)"; + query32s = "INSERT INTO stats_mysql_query_digest_reset VALUES " + generate_multi_rows_query(32,14); + query32 = (char *)query32s.c_str(); + } else { + query1=(char *)"INSERT INTO stats_mysql_query_digest VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)"; + query32s = "INSERT INTO stats_mysql_query_digest VALUES " + generate_multi_rows_query(32,14); + query32 = (char *)query32s.c_str(); + } + + rc = statsdb->prepare_v2(query1, &statement1); + ASSERT_SQLITE_OK(rc, statsdb); + rc = statsdb->prepare_v2(query32, &statement32); + ASSERT_SQLITE_OK(rc, statsdb); + int row_idx=0; + int num_rows = resultset ? resultset->rows_count : digest_umap->size(); + int max_bulk_row_idx = num_rows/32; + max_bulk_row_idx=max_bulk_row_idx*32; + auto it = resultset ? (std::unordered_map::iterator)NULL : digest_umap->cbegin(); + int i = 0; + // If the function do not receives a resultset, it gets the values directly from the digest_umap + while (resultset ? i != resultset->rows_count : it != digest_umap->end()) { + QP_query_digest_stats *qds = (QP_query_digest_stats *)(resultset ? NULL : it->second); + SQLite3_row *row = resultset ? resultset->rows[i] : NULL; + string digest_hex_str; + if (!resultset) { + std::ostringstream digest_stream; + digest_stream << "0x" << std::hex << qds->digest; + digest_hex_str = digest_stream.str(); + } + int idx=row_idx%32; + if (row_idxfields[11]) : qds->hid); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_text)(statement32, (idx*14)+2, resultset ? row->fields[0] : qds->schemaname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_text)(statement32, (idx*14)+3, resultset ? row->fields[1] : qds->username, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_text)(statement32, (idx*14)+4, resultset ? row->fields[2] : qds->client_address, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_text)(statement32, (idx*14)+5, resultset ? row->fields[3] : digest_hex_str.c_str(), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_text)(statement32, (idx*14)+6, resultset ? row->fields[4] : qds->get_digest_text(digest_text_umap), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+7, resultset ? atoll(row->fields[5]) : qds->count_star); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+8, resultset ? atoll(row->fields[6]) : qds->first_seen); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+9, resultset ? atoll(row->fields[7]) : qds->last_seen); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+10, resultset ? atoll(row->fields[8]) : qds->sum_time); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+11, resultset ? atoll(row->fields[9]) : qds->min_time); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+12, resultset ? atoll(row->fields[10]) : qds->max_time); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+13, resultset ? atoll(row->fields[12]) : qds->rows_affected); ASSERT_SQLITE_OK(rc, statsdb); // rows affected + rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*14)+14, resultset ? atoll(row->fields[13]) : qds->rows_sent); ASSERT_SQLITE_OK(rc, statsdb); // rows sent + if (idx==31) { + SAFE_SQLITE3_STEP2(statement32); + rc=(*proxy_sqlite3_clear_bindings)(statement32); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_reset)(statement32); ASSERT_SQLITE_OK(rc, statsdb); + } + } else { // single row + rc=(*proxy_sqlite3_bind_int64)(statement1, 1, resultset ? atoll(row->fields[11]) : qds->hid); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_text)(statement1, 2, resultset ? row->fields[0] : qds->schemaname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_text)(statement1, 3, resultset ? row->fields[1] : qds->username, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_text)(statement1, 4, resultset ? row->fields[2] : qds->client_address, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_text)(statement1, 5, resultset ? row->fields[3] : digest_hex_str.c_str(), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_text)(statement1, 6, resultset ? row->fields[4] : qds->get_digest_text(digest_text_umap), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement1, 7, resultset ? atoll(row->fields[5]) : qds->count_star); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement1, 8, resultset ? atoll(row->fields[6]) : qds->first_seen); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement1, 9, resultset ? atoll(row->fields[7]) : qds->last_seen); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement1, 10, resultset ? atoll(row->fields[8]) : qds->sum_time); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement1, 11, resultset ? atoll(row->fields[9]) : qds->min_time); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement1, 12, resultset ? atoll(row->fields[10]) : qds->max_time); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_bind_int64)(statement1, 13, resultset ? atoll(row->fields[12]) : qds->rows_affected); ASSERT_SQLITE_OK(rc, statsdb); // rows affected + rc=(*proxy_sqlite3_bind_int64)(statement1, 14, resultset ? atoll(row->fields[13]) : qds->rows_sent); ASSERT_SQLITE_OK(rc, statsdb); // rows sent + SAFE_SQLITE3_STEP2(statement1); + rc=(*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, statsdb); + rc=(*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, statsdb); + } +#ifdef DEBUG + if (resultset) + assert(row_idx == i); +#endif + row_idx++; + if (resultset) + i++; + else + it++; + } + (*proxy_sqlite3_finalize)(statement1); + (*proxy_sqlite3_finalize)(statement32); + if (reset) { + if (copy) { + statsdb->execute("INSERT INTO stats_mysql_query_digest SELECT * FROM stats_mysql_query_digest_reset"); + } + } + statsdb->execute("COMMIT"); + + return row_idx; +} + +int ProxySQL_Admin::stats___mysql_query_digests(bool reset, bool copy) { + if (!GloQPro) return 0; SQLite3_result * resultset=NULL; if (reset==true) { resultset=GloQPro->get_query_digests_reset(); } else { resultset=GloQPro->get_query_digests(); } - if (resultset==NULL) return; + if (resultset==NULL) return 0; statsdb->execute("BEGIN"); int rc; sqlite3_stmt *statement1=NULL; @@ -9557,6 +9707,26 @@ void ProxySQL_Admin::stats___mysql_query_digests(bool reset, bool copy) { } statsdb->execute("COMMIT"); delete resultset; + + return row_idx; +} + +int ProxySQL_Admin::stats___mysql_query_digests_v2(bool reset, bool copy, bool use_resultset) { + if (!GloQPro) return 0; + std::pair res; + if (reset == true) { + res = GloQPro->get_query_digests_reset_v2(copy, use_resultset); + } else { + res = GloQPro->get_query_digests_v2(use_resultset); + } + + if (res.first == NULL) + return res.second; + + int num_rows = GloAdmin->stats___save_mysql_query_digest_to_sqlite(reset, copy, res.first, NULL, NULL); + delete res.first; + + return num_rows; } void ProxySQL_Admin::stats___mysql_client_host_cache(bool reset) { diff --git a/lib/Query_Processor.cpp b/lib/Query_Processor.cpp index debba9d22..22cc0367d 100644 --- a/lib/Query_Processor.cpp +++ b/lib/Query_Processor.cpp @@ -31,6 +31,7 @@ #include #include extern MySQL_Threads_Handler *GloMTH; +extern ProxySQL_Admin *GloAdmin; static int int_cmp(const void *a, const void *b) { const unsigned long long *ia = (const unsigned long long *)a; @@ -186,8 +187,11 @@ QP_query_digest_stats::QP_query_digest_stats(char *u, char *s, uint64_t d, char rows_sent=0; hid=h; } -void QP_query_digest_stats::add_time(unsigned long long t, unsigned long long n, unsigned long long ra, unsigned long long rs) { - count_star++; +void QP_query_digest_stats::add_time( + unsigned long long t, unsigned long long n, unsigned long long ra, unsigned long long rs, + unsigned long long cnt +) { + count_star += cnt; sum_time+=t; rows_affected+=ra; rows_sent+=rs; @@ -229,6 +233,30 @@ QP_query_digest_stats::~QP_query_digest_stats() { client_address=NULL; } } + +// Funtion to get the digest text associated to a QP_query_digest_stats. +// QP_query_digest_stats member type "char *digest_text" may by NULL, so we +// have to get the digest text from "digest_text_umap". +char *QP_query_digest_stats::get_digest_text(const umap_query_digest_text *digest_text_umap) { + char *digest_text_str = NULL; + + if (digest_text) { + digest_text_str = digest_text; + } else { + std::unordered_map::const_iterator it; + it = digest_text_umap->find(digest); + if (it != digest_text_umap->end()) { + digest_text_str = it->second; + } else { + // LCOV_EXCL_START + assert(0); + // LCOV_EXCL_STOP + } + } + + return digest_text_str; +} + char **QP_query_digest_stats::get_row(umap_query_digest_text *digest_text_umap, query_digest_stats_pointers_t *qdsp) { char **pta=qdsp->pta; @@ -244,19 +272,7 @@ char **QP_query_digest_stats::get_row(umap_query_digest_text *digest_text_umap, sprintf(qdsp->digest,"0x%016llX", (long long unsigned int)digest); pta[3]=qdsp->digest; - if (digest_text) { - pta[4]=digest_text; - } else { - std::unordered_map::iterator it; - it=digest_text_umap->find(digest); - if (it != digest_text_umap->end()) { - pta[4] = it->second; - } else { - // LCOV_EXCL_START - assert(0); - // LCOV_EXCL_STOP - } - } + pta[4] = get_digest_text(digest_text_umap); //sprintf(qdsp->count_star,"%u",count_star); my_itoa(qdsp->count_star, count_star); @@ -1033,51 +1049,46 @@ unsigned long long Query_Processor::purge_query_digests(bool async_purge, bool p unsigned long long Query_Processor::purge_query_digests_async(char **msg) { unsigned long long ret = 0; pthread_rwlock_wrlock(&digest_rwlock); + + + umap_query_digest digest_umap_aux; + umap_query_digest_text digest_text_umap_aux; + pthread_rwlock_wrlock(&digest_rwlock); + digest_umap.swap(digest_umap_aux); + digest_text_umap.swap(digest_text_umap_aux); + pthread_rwlock_unlock(&digest_rwlock); + int num_rows = 0; unsigned long long curtime1=monotonic_time(); - size_t map1_size = digest_umap.size(); - size_t map2_size = digest_text_umap.size(); + size_t map1_size = digest_umap_aux.size(); + size_t map2_size = digest_text_umap_aux.size(); ret = map1_size + map2_size; - unsigned long long i = 0; - QP_query_digest_stats **array1 = (QP_query_digest_stats **)malloc(sizeof(QP_query_digest_stats *)*map1_size); - char **array2 = (char **)malloc(sizeof(char *)*map2_size); - i=0; - for (std::unordered_map::iterator it=digest_umap.begin(); it!=digest_umap.end(); ++it) { - array1[i]=(QP_query_digest_stats *)it->second; - i++; - //delete qds; + + for ( + std::unordered_map::iterator it = digest_umap_aux.begin(); + it != digest_umap_aux.end(); + ++it + ) { + QP_query_digest_stats *qds = (QP_query_digest_stats *)it->second; + delete qds; } - i=0; - for (std::unordered_map::iterator it=digest_text_umap.begin(); it!=digest_text_umap.end(); ++it) { - array2[i] = it->second; - //free(it->second); - i++; + digest_umap_aux.clear(); + for (std::unordered_map::iterator it=digest_text_umap_aux.begin(); it!=digest_text_umap_aux.end(); ++it) { + free(it->second); } - digest_umap.erase(digest_umap.begin(),digest_umap.end()); - digest_text_umap.erase(digest_text_umap.begin(),digest_text_umap.end()); - pthread_rwlock_unlock(&digest_rwlock); - unsigned long long curtime2=monotonic_time(); - curtime1 = curtime1/1000; - curtime2 = curtime2/1000; + digest_text_umap_aux.clear(); + + if (map1_size >= DIGEST_STATS_FAST_MINSIZE) { - proxy_info("Purging stats_mysql_query_digest: locked for %llums to remove %lu entries\n", curtime2-curtime1, map1_size); - } - char buf[128]; - sprintf(buf, "Query digest map locked for %llums", curtime2-curtime1); - *msg = strdup(buf); - for (i=0; i Query_Processor::get_query_digests_v2(const bool use_resultset) { + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping current query digest\n"); + SQLite3_result *result = NULL; + // Create two auxiliary maps and swap its content with the main maps. This + // way, this function can read query digests stored until now while other + // threads write in the other map. We need to lock while swapping. + umap_query_digest digest_umap_aux, digest_umap_aux_2; + umap_query_digest_text digest_text_umap_aux, digest_text_umap_aux_2; + pthread_rwlock_wrlock(&digest_rwlock); + digest_umap.swap(digest_umap_aux); + digest_text_umap.swap(digest_text_umap_aux); + pthread_rwlock_unlock(&digest_rwlock); + int num_rows = 0; + unsigned long long curtime1; + unsigned long long curtime2; + size_t map_size = digest_umap_aux.size(); + curtime1 = monotonic_time(); // curtime1 must always be initialized + if (use_resultset) { + if (map_size >= DIGEST_STATS_FAST_MINSIZE) { + result = new SQLite3_result(14, true); + } else { + result = new SQLite3_result(14); + } + result->add_column_definition(SQLITE_TEXT,"hid"); + result->add_column_definition(SQLITE_TEXT,"schemaname"); + result->add_column_definition(SQLITE_TEXT,"username"); + result->add_column_definition(SQLITE_TEXT,"client_address"); + result->add_column_definition(SQLITE_TEXT,"digest"); + result->add_column_definition(SQLITE_TEXT,"digest_text"); + result->add_column_definition(SQLITE_TEXT,"count_star"); + result->add_column_definition(SQLITE_TEXT,"first_seen"); + result->add_column_definition(SQLITE_TEXT,"last_seen"); + result->add_column_definition(SQLITE_TEXT,"sum_time"); + result->add_column_definition(SQLITE_TEXT,"min_time"); + result->add_column_definition(SQLITE_TEXT,"max_time"); + result->add_column_definition(SQLITE_TEXT,"rows_affected"); + result->add_column_definition(SQLITE_TEXT,"rows_sent"); + if (map_size >= DIGEST_STATS_FAST_MINSIZE) { + int n=DIGEST_STATS_FAST_THREADS; + get_query_digests_parallel_args args[n]; + for (int i=0; i::iterator it = digest_umap_aux.begin(); + it != digest_umap_aux.end(); + ++it + ) { + QP_query_digest_stats *qds=(QP_query_digest_stats *)it->second; + query_digest_stats_pointers_t *a = (query_digest_stats_pointers_t *)malloc(sizeof(query_digest_stats_pointers_t)); + char **pta=qds->get_row(&digest_text_umap_aux, a); + result->add_row(pta); + free(a); + } + } + } else { + num_rows = GloAdmin->stats___save_mysql_query_digest_to_sqlite( + false, false, NULL, &digest_umap_aux, &digest_text_umap_aux + ); + } + if (map_size >= DIGEST_STATS_FAST_MINSIZE) { + curtime2=monotonic_time(); + curtime1 = curtime1/1000; + curtime2 = curtime2/1000; + proxy_info("Running query on stats_mysql_query_digest: (not locked) %llums to retrieve %lu entries\n", curtime2-curtime1, map_size); + } + + // Once we finish creating the resultset or writing to SQLite, we use a + // second group of auxiliary maps to swap it with the first group of + // auxiliary maps. This way, we can merge the main maps and the first + // auxiliary maps without locking the mutex during the process. This is + // useful because writing to SQLite can take a lot of time, so the first + // group of auxiliary maps could grow large. + pthread_rwlock_wrlock(&digest_rwlock); + digest_umap.swap(digest_umap_aux_2); + digest_text_umap.swap(digest_text_umap_aux_2); + pthread_rwlock_unlock(&digest_rwlock); + + // Once we do the swap, we merge the content of the first auxiliary maps + // in the main maps and clear the content of the auxiliary maps. + for (const auto& element : digest_umap_aux_2) { + uint64_t digest = element.first; + QP_query_digest_stats *qds = (QP_query_digest_stats *)element.second; + std::unordered_map::iterator it = digest_umap_aux.find(digest); + if (it != digest_umap_aux.end()) { + // found + QP_query_digest_stats *qds_equal = (QP_query_digest_stats *)it->second; + qds_equal->add_time( + qds->min_time, qds->last_seen, qds->rows_affected, qds->rows_sent, qds->count_star + ); + delete qds; + } else { + digest_umap_aux.insert(element); + } + } + digest_text_umap.insert(digest_text_umap_aux.begin(), digest_text_umap_aux.end()); + digest_umap_aux_2.clear(); + digest_text_umap_aux_2.clear(); + + // Once we finish merging the main maps and the first auxiliary maps, we + // lock and swap the main maps with the second auxiliary maps. Then, we + // merge the content of the auxiliary maps in the main maps and clear the + // content of the auxiliary maps. + pthread_rwlock_wrlock(&digest_rwlock); + digest_umap_aux.swap(digest_umap); + digest_text_umap_aux.swap(digest_text_umap); + for (const auto& element : digest_umap_aux) { + uint64_t digest = element.first; + QP_query_digest_stats *qds = (QP_query_digest_stats *)element.second; + std::unordered_map::iterator it = digest_umap.find(digest); + if (it != digest_umap.end()) { + // found + QP_query_digest_stats *qds_equal = (QP_query_digest_stats *)it->second; + qds_equal->add_time( + qds->min_time, qds->last_seen, qds->rows_affected, qds->rows_sent, qds->count_star + ); + delete qds; + } else { + digest_umap.insert(element); + } + } + digest_text_umap.insert(digest_text_umap_aux.begin(), digest_text_umap_aux.end()); + pthread_rwlock_unlock(&digest_rwlock); + digest_umap_aux.clear(); + digest_text_umap_aux.clear(); + + std::pair res{result, num_rows}; + return res; +} + SQLite3_result * Query_Processor::get_query_digests() { proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping current query digest\n"); SQLite3_result *result = NULL; @@ -1180,9 +1336,9 @@ SQLite3_result * Query_Processor::get_query_digests() { unsigned long long curtime1; unsigned long long curtime2; size_t map_size = digest_umap.size(); + curtime1 = monotonic_time(); // curtime1 must always be initialized if (map_size >= DIGEST_STATS_FAST_MINSIZE) { result = new SQLite3_result(14, true); - curtime1 = monotonic_time(); } else { result = new SQLite3_result(14); } @@ -1240,6 +1396,122 @@ SQLite3_result * Query_Processor::get_query_digests() { return result; } +std::pair Query_Processor::get_query_digests_reset_v2( + const bool copy, const bool use_resultset +) { + SQLite3_result *result = NULL; + umap_query_digest digest_umap_aux; + umap_query_digest_text digest_text_umap_aux; + pthread_rwlock_wrlock(&digest_rwlock); + digest_umap.swap(digest_umap_aux); + digest_text_umap.swap(digest_text_umap_aux); + pthread_rwlock_unlock(&digest_rwlock); + int num_rows = 0; + unsigned long long curtime1; + unsigned long long curtime2; + size_t map_size = digest_umap_aux.size(); // we need to use the new map + bool free_me = false; + bool defer_free = false; + int n=DIGEST_STATS_FAST_THREADS; + get_query_digests_parallel_args args[n]; + curtime1 = monotonic_time(); // curtime1 must always be initialized + if (use_resultset) { + free_me = true; + defer_free = true; + if (map_size >= DIGEST_STATS_FAST_MINSIZE) { + result = new SQLite3_result(14, true); + } else { + result = new SQLite3_result(14); + } + result->add_column_definition(SQLITE_TEXT,"hid"); + result->add_column_definition(SQLITE_TEXT,"schemaname"); + result->add_column_definition(SQLITE_TEXT,"username"); + result->add_column_definition(SQLITE_TEXT,"client_address"); + result->add_column_definition(SQLITE_TEXT,"digest"); + result->add_column_definition(SQLITE_TEXT,"digest_text"); + result->add_column_definition(SQLITE_TEXT,"count_star"); + result->add_column_definition(SQLITE_TEXT,"first_seen"); + result->add_column_definition(SQLITE_TEXT,"last_seen"); + result->add_column_definition(SQLITE_TEXT,"sum_time"); + result->add_column_definition(SQLITE_TEXT,"min_time"); + result->add_column_definition(SQLITE_TEXT,"max_time"); + result->add_column_definition(SQLITE_TEXT,"rows_affected"); + result->add_column_definition(SQLITE_TEXT,"rows_sent"); + if (map_size >= DIGEST_STATS_FAST_MINSIZE) { + for (int i=0; i::iterator it=digest_umap_aux.begin(); it!=digest_umap_aux.end(); ++it) { + QP_query_digest_stats *qds=(QP_query_digest_stats *)it->second; + delete qds; + } + } + } else { + for (std::unordered_map::iterator it=digest_umap_aux.begin(); it!=digest_umap_aux.end(); ++it) { + QP_query_digest_stats *qds=(QP_query_digest_stats *)it->second; + query_digest_stats_pointers_t *a = (query_digest_stats_pointers_t *)malloc(sizeof(query_digest_stats_pointers_t)); + char **pta=qds->get_row(&digest_text_umap_aux, a); + result->add_row(pta); + free(a); + delete qds; + } + } + } else { + num_rows = GloAdmin->stats___save_mysql_query_digest_to_sqlite( + true, copy, result, &digest_umap_aux, &digest_text_umap_aux + ); + for ( + std::unordered_map::iterator it = digest_umap_aux.begin(); + it != digest_umap_aux.end(); + ++it + ) { + QP_query_digest_stats *qds = (QP_query_digest_stats *)it->second; + delete qds; + } + } + digest_umap_aux.clear(); + // this part is always single-threaded + for (std::unordered_map::iterator it=digest_text_umap_aux.begin(); it!=digest_text_umap_aux.end(); ++it) { + free(it->second); + } + digest_text_umap_aux.clear(); + if (map_size >= DIGEST_STATS_FAST_MINSIZE) { + curtime2=monotonic_time(); + curtime1 = curtime1/1000; + curtime2 = curtime2/1000; + proxy_info("Running query on stats_mysql_query_digest: (not locked) %llums to retrieve %lu entries\n", curtime2-curtime1, map_size); + if (free_me) { + if (defer_free) { + for (int i=0; i res{result, num_rows}; + return res; +} void Query_Processor::get_query_digests_reset(umap_query_digest *uqd, umap_query_digest_text *uqdt) { pthread_rwlock_wrlock(&digest_rwlock); @@ -1258,8 +1530,8 @@ SQLite3_result * Query_Processor::get_query_digests_reset() { int n=DIGEST_STATS_FAST_THREADS; get_query_digests_parallel_args args[n]; size_t map_size = digest_umap.size(); + curtime1 = monotonic_time(); // curtime1 must always be initialized if (map_size >= DIGEST_STATS_FAST_MINSIZE) { - curtime1=monotonic_time(); result = new SQLite3_result(14, true); } else { result = new SQLite3_result(14); diff --git a/test/tap/tests/admin_various_commands3-t.cpp b/test/tap/tests/admin_various_commands3-t.cpp new file mode 100644 index 000000000..4a80f16b0 --- /dev/null +++ b/test/tap/tests/admin_various_commands3-t.cpp @@ -0,0 +1,132 @@ +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "tap.h" +#include "command_line.h" +#include "utils.h" + +using std::string; + +/* this test: + * enables mysql-have_ssl + * execute various command +*/ + +std::vector queries_t = { + "PROXYSQLTEST 22", + "PROXYSQLTEST 23", + "PROXYSQLTEST 24", + "PROXYSQLTEST 25", + "PROXYSQLTEST 26", + "PROXYSQLTEST 27", + "SELECT COUNT(*) FROM stats_mysql_query_digest" + }; + + +//std::vector vals = { 100, 345, 800, 999, 2037, 12345 }; +//std::vector vals = { 100, 345, 800, 999, 2037 }; +std::vector vals = { 100, 345, 800 }; + +std::vector queries = {}; + +int run_q(MYSQL *mysql, const char *q) { + MYSQL_QUERY(mysql,q); + return 0; +} +int main() { + CommandLine cl; + + if (cl.getEnv()) { + diag("Failed to get the required environmental variables."); + return -1; + } + + srandom(123); + + + for (auto it = vals.begin() ; it != vals.end() ; it++) { + std::string q = "PROXYSQLTEST 1 " + std::to_string(*it); + queries.push_back(q); + for (int i=0; i<5; i++) { + queries.push_back(queries_t[rand()%queries_t.size()]); + } + queries.push_back("SELECT COUNT(*) FROM stats_mysql_query_digest"); + for (int i=0; i<5; i++) { + queries.push_back(queries_t[rand()%queries_t.size()]); + } + if (rand()%2 == 0) { + queries.push_back("SELECT COUNT(*) FROM stats_mysql_query_digest_reset"); + } else { + queries.push_back("TRUNCATE TABLE stats.stats_mysql_query_digest"); + } + } + queries.push_back("TRUNCATE TABLE stats.stats_mysql_query_digest"); + + + + MYSQL* proxysql_admin = mysql_init(NULL); + // Initialize connections + if (!proxysql_admin) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql_admin)); + return -1; + } + + if (!mysql_real_connect(proxysql_admin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql_admin)); + return -1; + } + + MYSQL_QUERY(proxysql_admin, "SET mysql-have_ssl='true'"); + MYSQL_QUERY(proxysql_admin, "SET mysql-have_compress='true'"); + MYSQL_QUERY(proxysql_admin, "LOAD MYSQL VARIABLES TO RUNTIME"); + + + + unsigned int p = queries.size(); + for (std::vector::iterator it2 = queries.begin(); it2 != queries.end(); it2++) { + if ( + (strncasecmp(it2->c_str(), "SELECT ", 7)==0) + ) { + // extra test for each queries returning a resultset + p++; + } + } + plan(p); + diag("Running test with %lu queries", queries.size()); + + + for (std::vector::iterator it2 = queries.begin(); it2 != queries.end(); it2++) { + MYSQL* proxysql_admin = mysql_init(NULL); // local scope + if (!proxysql_admin) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql_admin)); + return -1; + } + mysql_ssl_set(proxysql_admin, NULL, NULL, NULL, NULL, NULL); + if (!mysql_real_connect(proxysql_admin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, CLIENT_SSL|CLIENT_COMPRESS)) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql_admin)); + return -1; + } + int rc = run_q(proxysql_admin, it2->c_str()); + ok(rc==0, "Query: %s" , it2->c_str()); + if ( + (strncasecmp(it2->c_str(), "SELECT ", 7)==0) + ) { + MYSQL_RES* proxy_res = mysql_store_result(proxysql_admin); + unsigned long long num_rows = mysql_num_rows(proxy_res); + ok(num_rows != 0 , "Returned rows: %llu" , num_rows); + mysql_free_result(proxy_res); + } + mysql_close(proxysql_admin); + } + mysql_close(proxysql_admin); + + return exit_status(); +} diff --git a/test/tap/tests/test_digest_umap_aux-t.cpp b/test/tap/tests/test_digest_umap_aux-t.cpp new file mode 100644 index 000000000..4f9168b87 --- /dev/null +++ b/test/tap/tests/test_digest_umap_aux-t.cpp @@ -0,0 +1,291 @@ +/** + * @file test_digest_umap_aux-t.cpp + * @brief This tests that the auxiliary digest map is working correctly. + * @details This test sends dummy queries to ProxySQL while also sending + * queries to read table stats_mysql_query_digest. Then, it checks that the + * execution time of the dummy queries has no been afected by the execution + * time of the queries that read from table stats_mysql_query_digest. Finally, + * check that the data stored in stats_mysql_query_digest is correct. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "proxysql_utils.h" +#include "command_line.h" +#include "utils.h" +#include "tap.h" + +using std::vector; +using std::string; + +CommandLine cl; +double slowest_query = 0.0; +double fastest_query = 0.0; +std::atomic_bool stop(false); + +vector DUMMY_QUERIES = { + "SELECT 1", + "SELECT 1 UNION SELECT 2 UNION SELECT 3", + "SELECT 1 UNION SELECT 2", +}; +int num_dummy_queries_executed = 0; + +struct digest_stats { + int hostgroup; + string schemaname; + string username; + string client_address; + string digest; + string digest_text; + int count_star; + int first_seen; + int last_seen; + int sum_time; + int min_time; + int max_time; + int sum_rows_affected; + int sum_rows_sent; +}; + +class timer { +public: + std::chrono::time_point lastTime; + timer() : lastTime(std::chrono::high_resolution_clock::now()) {} + inline double elapsed() { + std::chrono::time_point thisTime = std::chrono::high_resolution_clock::now(); + double deltaTime = std::chrono::duration(thisTime-lastTime).count(); + lastTime = thisTime; + return deltaTime; + } +}; + +vector get_digest_stats(MYSQL* proxy_admin) { + const char* get_digest_stats_query = + "SELECT * FROM stats_mysql_query_digest WHERE username='root' AND " + "digest_text IN ('SELECT ?', 'SELECT ? UNION SELECT ?', 'SELECT ? UNION SELECT ? UNION SELECT ?') " + "ORDER BY hostgroup, schemaname, username, client_address, digest"; + diag("Running: %s", get_digest_stats_query); + vector ds_vector; + + int err = mysql_query(proxy_admin, get_digest_stats_query); + if (err) { + diag("Failed to executed query `%s`. Error: `%s`", get_digest_stats_query, mysql_error(proxy_admin)); + return ds_vector; + } + + MYSQL_RES *res = NULL; + res = mysql_store_result(proxy_admin); + MYSQL_ROW row; + while (row = mysql_fetch_row(res)) { + digest_stats ds = {}; + ds.hostgroup = atoi(row[0]); + ds.schemaname = row[1]; + ds.username = row[2]; + ds.client_address = row[3]; + ds.digest = row[4]; + ds.digest_text = row[5]; + ds.count_star = atoi(row[6]); + ds.first_seen = atoi(row[7]); + ds.last_seen = atoi(row[8]); + ds.sum_time = atoi(row[9]); + ds.min_time = atoi(row[10]); + ds.max_time = atoi(row[11]); + ds.sum_rows_affected = atoi(row[12]); + ds.sum_rows_sent = atoi(row[13]); + ds_vector.push_back(ds); + } + mysql_free_result(res); + + return ds_vector; +} + +void run_dummy_queries() { + MYSQL* proxy_mysql = mysql_init(NULL); + + if (!mysql_real_connect(proxy_mysql, cl.host, cl.username, cl.password, NULL, cl.port, NULL, 0)) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxy_mysql)); + slowest_query = -1.0; + return; + } + + vector execution_times = {}; + MYSQL_RES *res = NULL; + while (!stop) { + for (int i = 0; i < DUMMY_QUERIES.size(); i++) { + timer stopwatch; + int err = mysql_query(proxy_mysql, DUMMY_QUERIES[i]); + execution_times.push_back(stopwatch.elapsed()); + if (err) { + diag( + "Failed to executed query `%s`. Error: `%s`", + DUMMY_QUERIES[i], mysql_error(proxy_mysql) + ); + slowest_query = -1.0; + mysql_close(proxy_mysql); + return; + } + res = mysql_store_result(proxy_mysql); + mysql_free_result(res); + } + num_dummy_queries_executed++; + } + mysql_close(proxy_mysql); + + slowest_query = *std::max_element(execution_times.begin(), execution_times.end()); +} + +void run_stats_digest_query(MYSQL* proxy_admin) { + const char *count_digest_stats_query = "SELECT COUNT(*) FROM stats_mysql_query_digest"; + vector execution_times = {}; + const int num_queries = 3; + MYSQL_RES *res; + + for (int i; i < num_queries; i++) { + diag("Running: %s", count_digest_stats_query); + timer stopwatch; + int err = mysql_query(proxy_admin, count_digest_stats_query); + execution_times.push_back(stopwatch.elapsed()); + if (err) { + diag( + "Failed to executed query `%s`. Error: `%s`", + count_digest_stats_query, mysql_error(proxy_admin) + ); + fastest_query = -1.0; + return; + } + res = mysql_store_result(proxy_admin); + mysql_free_result(res); + } + + fastest_query = *std::min_element(execution_times.begin(), execution_times.end()); +} + +int main(int argc, char** argv) { + + if (cl.getEnv()) { + diag("Failed to get the required environmental variables."); + return EXIT_FAILURE; + } + + plan(1 + DUMMY_QUERIES.size() * 3); // always specify the number of tests that are going to be performed + + MYSQL *proxy_admin = mysql_init(NULL); + if (!mysql_real_connect(proxy_admin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxy_admin)); + return EXIT_FAILURE; + } + + vector admin_queries = { + "DELETE FROM mysql_query_rules", + "LOAD MYSQL QUERY RULES TO RUNTIME", + "PROXYSQLTEST 1 1000", + }; + for (const auto &query : admin_queries) { + diag("Running: %s", query); + MYSQL_QUERY(proxy_admin, query); + } + + MYSQL *proxy_mysql = mysql_init(NULL); + if (!mysql_real_connect(proxy_mysql, cl.host, cl.username, cl.password, NULL, cl.port, NULL, 0)) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxy_mysql)); + mysql_close(proxy_admin); + return EXIT_FAILURE; + } + + MYSQL_RES *res = NULL; + for (const auto &query : DUMMY_QUERIES) { + diag("Running: %s", query); + MYSQL_QUERY(proxy_mysql, query); + res = mysql_store_result(proxy_mysql); + mysql_free_result(res); + } + mysql_close(proxy_mysql); + + vector ds_vector_before = get_digest_stats(proxy_admin); + + std::thread run_dummy_queries_thread(run_dummy_queries); + std::thread run_stats_digest_query_thread(run_stats_digest_query, proxy_admin); + + run_stats_digest_query_thread.join(); + if (fastest_query == -1.0) { + fprintf( + stderr, "File %s, line %d, Error: " + "thread run_stats_digest_query_thread finished with errors", __FILE__, __LINE__ + ); + mysql_close(proxy_admin); + return EXIT_FAILURE; + } + + stop = true; + run_dummy_queries_thread.join(); + if (slowest_query == -1.0) { + fprintf( + stderr, "File %s, line %d, Error: " + "thread run_dummy_queries_thread finished with errors", __FILE__, __LINE__ + ); + mysql_close(proxy_admin); + return EXIT_FAILURE; + } + + ok( + slowest_query < fastest_query, + "The slowest dummy query must be faster than the fastest digests stats query.\n" + " Slowest dummy query time: %f.\n" + " Fastest count digest stats query time: %f.", + slowest_query, fastest_query + ); + + vector ds_vector_after = get_digest_stats(proxy_admin); + for (int i = 0; i < DUMMY_QUERIES.size(); i++) { + ok( + ds_vector_before[i].hostgroup == ds_vector_after[i].hostgroup && + ds_vector_before[i].schemaname == ds_vector_after[i].schemaname && + ds_vector_before[i].username == ds_vector_after[i].username && + ds_vector_before[i].client_address == ds_vector_after[i].client_address && + ds_vector_before[i].digest == ds_vector_after[i].digest && + ds_vector_before[i].digest_text == ds_vector_after[i].digest_text && + ds_vector_before[i].first_seen - 1 <= ds_vector_after[i].first_seen && + ds_vector_after[i].first_seen <= ds_vector_before[i].first_seen + 1, + "Hostgroup, schemaname, username, client_address, digest, digest_test and first_seen " + "should be equal in both digest stats.\n" + " Hostgroup -> before:`%d` - after:`%d`.\n" + " Schemaname -> before:`%s` - after:`%s`.\n" + " Username -> before:`%s` - after:`%s`.\n" + " Client_address -> before:`%s` - after:`%s`.\n" + " Digests -> before:`%s` - after:`%s`.\n" + " Digests_text -> before:`%s` - after:`%s`.\n" + " First_seen -> before:`%d` - after:`%d`.", + ds_vector_before[i].hostgroup, ds_vector_after[i].hostgroup, + ds_vector_before[i].schemaname.c_str(), ds_vector_after[i].schemaname.c_str(), + ds_vector_before[i].username.c_str(), ds_vector_after[i].username.c_str(), + ds_vector_before[i].client_address.c_str(), ds_vector_after[i].client_address.c_str(), + ds_vector_before[i].digest.c_str(), ds_vector_after[i].digest.c_str(), + ds_vector_before[i].digest_text.c_str(), ds_vector_after[i].digest_text.c_str(), + ds_vector_before[i].first_seen, ds_vector_after[i].first_seen + ); + ok( + ds_vector_after[i].count_star - ds_vector_before[i].count_star == num_dummy_queries_executed, + "Query `%s` should be executed %d times. Act:'%d'", + ds_vector_after[i].digest_text.c_str(), num_dummy_queries_executed, + ds_vector_after[i].count_star - ds_vector_before[i].count_star + ); + ok( + ds_vector_before[i].last_seen < ds_vector_after[i].last_seen && + ds_vector_before[i].sum_time < ds_vector_after[i].sum_time, + "Last_seen and sum_time must have increased.\n" + " Last_seen -> before:`%d` - after:`%d`.\n" + " Sum_time -> before:`%d` - after:`%d`.", + ds_vector_before[i].last_seen, ds_vector_after[i].last_seen, + ds_vector_before[i].sum_time, ds_vector_after[i].sum_time + ); + } + + return exit_status(); +}