Add auxiliary maps in get_query_digests() to improve performance

Otherwise, all queries processed by ProxySQL have to wait for
get_query_digest() to finish reading the maps.

With this patch, ProxySQL stores query digests in the auxiliary maps
while the main maps are being used by get_query_digests(). Once
get_query_digests() finish reading the main maps, it merge the auxiliary
maps in to the main maps and clear the content of the auxiliary maps.
pull/4096/head
Javier Sánchez Parra 3 years ago
parent 2790ed9fab
commit 263d645bad

@ -61,7 +61,10 @@ class QP_query_digest_stats {
unsigned long long rows_sent;
int hid;
QP_query_digest_stats(char *u, char *s, uint64_t d, char *dt, int h, char *ca);
void add_time(unsigned long long t, unsigned long long n, unsigned long long ra, unsigned long long rs);
void add_time(
unsigned long long t, unsigned long long n, unsigned long long ra, unsigned long long rs,
unsigned long long cnt = 1
);
~QP_query_digest_stats();
char **get_row(umap_query_digest_text *digest_text_umap, query_digest_stats_pointers_t *qdsp);
};
@ -319,6 +322,7 @@ class Query_Processor {
SQLite3_result * get_stats_commands_counters();
SQLite3_result * get_query_digests();
SQLite3_result * get_query_digests_reset();
SQLite3_result * get_query_digests_v2();
void get_query_digests_reset(umap_query_digest *uqd, umap_query_digest_text *uqdt);
unsigned long long purge_query_digests(bool async_purge, bool parallel, char **msg);
unsigned long long purge_query_digests_async(char **msg);

@ -186,8 +186,11 @@ QP_query_digest_stats::QP_query_digest_stats(char *u, char *s, uint64_t d, char
rows_sent=0;
hid=h;
}
void QP_query_digest_stats::add_time(unsigned long long t, unsigned long long n, unsigned long long ra, unsigned long long rs) {
count_star++;
void QP_query_digest_stats::add_time(
unsigned long long t, unsigned long long n, unsigned long long ra, unsigned long long rs,
unsigned long long cnt
) {
count_star += cnt;
sum_time+=t;
rows_affected+=ra;
rows_sent+=rs;
@ -1173,6 +1176,110 @@ unsigned long long Query_Processor::get_query_digests_total_size() {
return ret;
}
SQLite3_result * Query_Processor::get_query_digests_v2() {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping current query digest\n");
SQLite3_result *result = NULL;
// Create two auxiliary maps and swap its content with the main maps. This
// way, this function can read query digests stored until now while other
// threads write in the other map. We need to lock while swapping.
umap_query_digest digest_umap_aux;
umap_query_digest_text digest_text_umap_aux;
pthread_rwlock_wrlock(&digest_rwlock);
digest_umap.swap(digest_umap_aux);
digest_text_umap.swap(digest_text_umap_aux);
pthread_rwlock_unlock(&digest_rwlock);
unsigned long long curtime1;
unsigned long long curtime2;
size_t map_size = digest_umap_aux.size();
if (map_size >= DIGEST_STATS_FAST_MINSIZE) {
result = new SQLite3_result(14, true);
curtime1 = monotonic_time();
} else {
result = new SQLite3_result(14);
}
result->add_column_definition(SQLITE_TEXT,"hid");
result->add_column_definition(SQLITE_TEXT,"schemaname");
result->add_column_definition(SQLITE_TEXT,"username");
result->add_column_definition(SQLITE_TEXT,"client_address");
result->add_column_definition(SQLITE_TEXT,"digest");
result->add_column_definition(SQLITE_TEXT,"digest_text");
result->add_column_definition(SQLITE_TEXT,"count_star");
result->add_column_definition(SQLITE_TEXT,"first_seen");
result->add_column_definition(SQLITE_TEXT,"last_seen");
result->add_column_definition(SQLITE_TEXT,"sum_time");
result->add_column_definition(SQLITE_TEXT,"min_time");
result->add_column_definition(SQLITE_TEXT,"max_time");
result->add_column_definition(SQLITE_TEXT,"rows_affected");
result->add_column_definition(SQLITE_TEXT,"rows_sent");
if (map_size >= DIGEST_STATS_FAST_MINSIZE) {
int n=DIGEST_STATS_FAST_THREADS;
get_query_digests_parallel_args args[n];
for (int i=0; i<n; i++) {
args[i].m=i;
//args[i].ret=0;
args[i].gu = &digest_umap_aux;
args[i].gtu = &digest_text_umap_aux;
args[i].result = result;
args[i].free_me = false;
}
for (int i=0; i<n; i++) {
if ( pthread_create(&args[i].thr, NULL, &get_query_digests_parallel, &args[i]) != 0 ) {
// LCOV_EXCL_START
assert(0);
// LCOV_EXCL_STOP
}
}
for (int i=0; i<n; i++) {
pthread_join(args[i].thr, NULL);
}
} else {
for (
std::unordered_map<uint64_t, void *>::iterator it = digest_umap_aux.begin();
it != digest_umap_aux.end();
++it
) {
QP_query_digest_stats *qds=(QP_query_digest_stats *)it->second;
query_digest_stats_pointers_t *a = (query_digest_stats_pointers_t *)malloc(sizeof(query_digest_stats_pointers_t));
char **pta=qds->get_row(&digest_text_umap_aux, a);
result->add_row(pta);
free(a);
}
}
if (map_size >= DIGEST_STATS_FAST_MINSIZE) {
curtime2=monotonic_time();
curtime1 = curtime1/1000;
curtime2 = curtime2/1000;
proxy_info("Running query on stats_mysql_query_digest: %llums to retrieve %lu entries\n", curtime2-curtime1, map_size);
}
// Once the reading finishes, we lock and swap again both maps. Then, we
// merge the content of the auxiliary maps in the main maps and clear the
// content of the auxiliary maps.
pthread_rwlock_wrlock(&digest_rwlock);
digest_umap_aux.swap(digest_umap);
digest_text_umap_aux.swap(digest_text_umap);
for (const auto& element : digest_umap_aux) {
uint64_t digest = element.first;
QP_query_digest_stats *qds = (QP_query_digest_stats *)element.second;
std::unordered_map<uint64_t, void *>::iterator it = digest_umap.find(digest);
if (it != digest_umap.end()) {
// found
QP_query_digest_stats *qds_equal = (QP_query_digest_stats *)it->second;
qds_equal->add_time(
qds->min_time, qds->last_seen, qds->rows_affected, qds->rows_sent, qds->count_star
);
} else {
digest_umap.insert(element);
}
}
digest_text_umap.insert(digest_text_umap_aux.begin(), digest_text_umap_aux.end());
digest_umap_aux.clear();
digest_text_umap_aux.clear();
pthread_rwlock_unlock(&digest_rwlock);
return result;
}
SQLite3_result * Query_Processor::get_query_digests() {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping current query digest\n");
SQLite3_result *result = NULL;

Loading…
Cancel
Save