From 281e9cd619e1f22fa1aef2c23861418fad67e31c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Thu, 8 Jun 2017 23:44:37 +0200 Subject: [PATCH] Remove spinlock from Query Processor #977 --- include/query_processor.h | 9 +++ lib/Query_Processor.cpp | 124 +++++++++++++++++++++++++++++++++++--- 2 files changed, 124 insertions(+), 9 deletions(-) diff --git a/include/query_processor.h b/include/query_processor.h index 993e93587..a78b995b4 100644 --- a/include/query_processor.h +++ b/include/query_processor.h @@ -3,6 +3,7 @@ #include "proxysql.h" #include "cpp.h" +#define PROXYSQL_QPRO_PTHREAD_MUTEX typedef std::unordered_map umap_query_digest; @@ -162,11 +163,19 @@ class Command_Counter { class Query_Processor { private: umap_query_digest digest_umap; +#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX + pthread_rwlock_t digest_rwlock; +#else rwlock_t digest_rwlock; int64_t padding; // to get rwlock cache aligned +#endif enum MYSQL_COM_QUERY_command __query_parser_command_type(SQP_par_t *qp); protected: +#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX + pthread_rwlock_t rwlock; +#else rwlock_t rwlock; +#endif std::vector rules; Command_Counter * commands_counters[MYSQL_COM_QUERY___NONE]; volatile unsigned int version; diff --git a/lib/Query_Processor.cpp b/lib/Query_Processor.cpp index b14bfa2ac..2e961424d 100644 --- a/lib/Query_Processor.cpp +++ b/lib/Query_Processor.cpp @@ -310,8 +310,13 @@ Query_Processor::Query_Processor() { exit(EXIT_FAILURE); } proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Initializing Query Processor with version=0\n"); +#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX + pthread_rwlock_init(&rwlock, NULL); + pthread_rwlock_init(&digest_rwlock, NULL); +#else spinlock_rwlock_init(&rwlock); spinlock_rwlock_init(&digest_rwlock); +#endif version=0; for (int i=0; iadd_column_definition(SQLITE_TEXT,"rule_id"); result->add_column_definition(SQLITE_TEXT,"hits"); @@ -544,14 +599,22 @@ SQLite3_result * Query_Processor::get_stats_query_rules() { delete qt; } } +#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX + pthread_rwlock_unlock(&rwlock); +#else spin_rdunlock(&rwlock); +#endif return result; } SQLite3_result * Query_Processor::get_current_query_rules() { proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping current query rules, using Global version %d\n", version); SQLite3_result *result=new SQLite3_result(30); +#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX + pthread_rwlock_rdlock(&rwlock); +#else spin_rdlock(&rwlock); +#endif QP_rule_t *qr1; result->add_column_definition(SQLITE_TEXT,"rule_id"); result->add_column_definition(SQLITE_TEXT,"active"); @@ -591,14 +654,22 @@ SQLite3_result * Query_Processor::get_current_query_rules() { result->add_row(qt->pta); delete qt; } +#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX + pthread_rwlock_unlock(&rwlock); +#else spin_rdunlock(&rwlock); +#endif return result; } SQLite3_result * Query_Processor::get_query_digests() { proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping current query digest\n"); SQLite3_result *result=new SQLite3_result(11); +#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX + pthread_rwlock_rdlock(&digest_rwlock); +#else spin_rdlock(&digest_rwlock); +#endif result->add_column_definition(SQLITE_TEXT,"hid"); result->add_column_definition(SQLITE_TEXT,"schemaname"); result->add_column_definition(SQLITE_TEXT,"usernname"); @@ -616,13 +687,21 @@ SQLite3_result * Query_Processor::get_query_digests() { result->add_row(pta); qds->free_row(pta); } +#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX + pthread_rwlock_unlock(&digest_rwlock); +#else spin_rdunlock(&digest_rwlock); +#endif return result; } SQLite3_result * Query_Processor::get_query_digests_reset() { SQLite3_result *result=new SQLite3_result(11); +#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX + pthread_rwlock_wrlock(&digest_rwlock); +#else spin_wrlock(&digest_rwlock); +#endif result->add_column_definition(SQLITE_TEXT,"hid"); result->add_column_definition(SQLITE_TEXT,"schemaname"); result->add_column_definition(SQLITE_TEXT,"usernname"); @@ -642,7 +721,11 @@ SQLite3_result * Query_Processor::get_query_digests_reset() { delete qds; } digest_umap.erase(digest_umap.begin(),digest_umap.end()); +#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX + pthread_rwlock_unlock(&digest_rwlock); +#else spin_wrunlock(&digest_rwlock); +#endif return result; } @@ -670,7 +753,11 @@ Query_Processor_Output * Query_Processor::process_mysql_query(MySQL_Session *ses if (__sync_add_and_fetch(&version,0) > _thr_SQP_version) { // update local rules; proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Detected a changed in version. Global:%d , local:%d . Refreshing...\n", version, _thr_SQP_version); +#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX + pthread_rwlock_rdlock(&rwlock); +#else spin_rdlock(&rwlock); +#endif _thr_SQP_version=__sync_add_and_fetch(&version,0); __reset_rules(_thr_SQP_rules); QP_rule_t *qr1; @@ -712,7 +799,11 @@ Query_Processor_Output * Query_Processor::process_mysql_query(MySQL_Session *ses _thr_SQP_rules->push_back(qr2); } } +#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX + pthread_rwlock_unlock(&rwlock); +#else spin_rdunlock(&rwlock); // unlock should be after the copy +#endif } QP_rule_t *qr; re2_t *re2p; @@ -980,7 +1071,11 @@ void Query_Processor::update_query_processor_stats() { // It acquires a read lock to ensure that the rules table doesn't change // Yet, because it has to update vales, it uses atomic operations proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Updating query rules statistics\n"); +#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX + pthread_rwlock_rdlock(&rwlock); +#else spin_rdlock(&rwlock); +#endif if (__sync_add_and_fetch(&version,0) == _thr_SQP_version) { QP_rule_t *qr; for (std::vector::iterator it=_thr_SQP_rules->begin(); it!=_thr_SQP_rules->end(); ++it) { @@ -991,7 +1086,11 @@ void Query_Processor::update_query_processor_stats() { } } } +#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX + pthread_rwlock_unlock(&rwlock); +#else spin_rdunlock(&rwlock); +#endif for (int i=0; icounters[j]) { @@ -1086,8 +1185,11 @@ unsigned long long Query_Processor::query_parser_update_counters(MySQL_Session * } void Query_Processor::update_query_digest(SQP_par_t *qp, int hid, MySQL_Connection_userinfo *ui, unsigned long long t, unsigned long long n, MySQL_STMT_Global_info *_stmt_info) { +#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX + pthread_rwlock_wrlock(&digest_rwlock); +#else spin_wrlock(&digest_rwlock); - +#endif QP_query_digest_stats *qds; std::unordered_map::iterator it; @@ -1106,7 +1208,11 @@ void Query_Processor::update_query_digest(SQP_par_t *qp, int hid, MySQL_Connecti digest_umap.insert(std::make_pair(qp->digest_total,(void *)qds)); } +#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX + pthread_rwlock_unlock(&digest_rwlock); +#else spin_wrunlock(&digest_rwlock); +#endif } char * Query_Processor::get_digest_text(SQP_par_t *qp) {