Remove spinlock from Query Processor #977

pull/1056/head
René Cannaò 9 years ago
parent a0bdae67d1
commit 281e9cd619

@ -3,6 +3,7 @@
#include "proxysql.h"
#include "cpp.h"
#define PROXYSQL_QPRO_PTHREAD_MUTEX
typedef std::unordered_map<std::uint64_t, void *> umap_query_digest;
@ -162,11 +163,19 @@ class Command_Counter {
class Query_Processor {
private:
umap_query_digest digest_umap;
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_t digest_rwlock;
#else
rwlock_t digest_rwlock;
int64_t padding; // to get rwlock cache aligned
#endif
enum MYSQL_COM_QUERY_command __query_parser_command_type(SQP_par_t *qp);
protected:
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_t rwlock;
#else
rwlock_t rwlock;
#endif
std::vector<QP_rule_t *> rules;
Command_Counter * commands_counters[MYSQL_COM_QUERY___NONE];
volatile unsigned int version;

@ -310,8 +310,13 @@ Query_Processor::Query_Processor() {
exit(EXIT_FAILURE);
}
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Initializing Query Processor with version=0\n");
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_init(&rwlock, NULL);
pthread_rwlock_init(&digest_rwlock, NULL);
#else
spinlock_rwlock_init(&rwlock);
spinlock_rwlock_init(&digest_rwlock);
#endif
version=0;
for (int i=0; i<MYSQL_COM_QUERY___NONE; i++) commands_counters[i]=new Command_Counter(i);
@ -400,11 +405,19 @@ void Query_Processor::print_version() {
};
void Query_Processor::wrlock() {
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_wrlock(&rwlock);
#else
spin_wrlock(&rwlock);
#endif
};
void Query_Processor::wrunlock() {
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_unlock(&rwlock);
#else
spin_wrunlock(&rwlock);
#endif
};
QP_rule_t * Query_Processor::new_query_rule(int rule_id, bool active, char *username, char *schemaname, int flagIN, char *client_addr, char *proxy_addr, int proxy_port, char *digest, char *match_digest, char *match_pattern, bool negate_match_pattern, char *re_modifiers, int flagOUT, char *replace_pattern, int destination_hostgroup, int cache_ttl, int reconnect, int timeout, int retries, int delay, int next_query_flagIN, int mirror_flagOUT, int mirror_hostgroup, char *error_msg, int sticky_conn, int multiplex, int log, bool apply, char *comment) {
@ -474,33 +487,71 @@ void Query_Processor::delete_query_rule(QP_rule_t *qr) {
};
void Query_Processor::reset_all(bool lock) {
if (lock) spin_wrlock(&rwlock);
if (lock)
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_wrlock(&rwlock);
#else
spin_wrlock(&rwlock);
#endif
__reset_rules(&rules);
if (lock) spin_wrunlock(&rwlock);
if (lock)
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_unlock(&rwlock);
#else
spin_wrunlock(&rwlock);
#endif
};
bool Query_Processor::insert(QP_rule_t *qr, bool lock) {
bool ret=true;
if (lock) spin_wrlock(&rwlock);
if (lock)
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_wrlock(&rwlock);
#else
spin_wrlock(&rwlock);
#endif
rules.push_back(qr);
if (lock) spin_wrunlock(&rwlock);
if (lock)
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_unlock(&rwlock);
#else
spin_wrunlock(&rwlock);
#endif
return ret;
};
void Query_Processor::sort(bool lock) {
if (lock) spin_wrlock(&rwlock);
if (lock)
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_wrlock(&rwlock);
#else
spin_wrlock(&rwlock);
#endif
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Sorting rules\n");
std::sort (rules.begin(), rules.end(), rules_sort_comp_function);
if (lock) spin_wrunlock(&rwlock);
if (lock)
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_unlock(&rwlock);
#else
spin_wrunlock(&rwlock);
#endif
};
// when commit is called, the version number is increased and the this will trigger the mysql threads to get a new Query Processor Table
// The operation is asynchronous
void Query_Processor::commit() {
spin_wrlock(&rwlock);
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_wrlock(&rwlock);
#else
spin_wrlock(&rwlock);
#endif
__sync_add_and_fetch(&version,1);
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Increasing version number to %d - all threads will notice this and refresh their rules\n", version);
spin_wrunlock(&rwlock);
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_unlock(&rwlock);
#else
spin_wrunlock(&rwlock);
#endif
};
SQLite3_result * Query_Processor::get_stats_commands_counters() {
@ -531,7 +582,11 @@ SQLite3_result * Query_Processor::get_stats_commands_counters() {
SQLite3_result * Query_Processor::get_stats_query_rules() {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping query rules statistics, using Global version %d\n", version);
SQLite3_result *result=new SQLite3_result(2);
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_rdlock(&rwlock);
#else
spin_rdlock(&rwlock);
#endif
QP_rule_t *qr1;
result->add_column_definition(SQLITE_TEXT,"rule_id");
result->add_column_definition(SQLITE_TEXT,"hits");
@ -544,14 +599,22 @@ SQLite3_result * Query_Processor::get_stats_query_rules() {
delete qt;
}
}
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_unlock(&rwlock);
#else
spin_rdunlock(&rwlock);
#endif
return result;
}
SQLite3_result * Query_Processor::get_current_query_rules() {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping current query rules, using Global version %d\n", version);
SQLite3_result *result=new SQLite3_result(30);
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_rdlock(&rwlock);
#else
spin_rdlock(&rwlock);
#endif
QP_rule_t *qr1;
result->add_column_definition(SQLITE_TEXT,"rule_id");
result->add_column_definition(SQLITE_TEXT,"active");
@ -591,14 +654,22 @@ SQLite3_result * Query_Processor::get_current_query_rules() {
result->add_row(qt->pta);
delete qt;
}
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_unlock(&rwlock);
#else
spin_rdunlock(&rwlock);
#endif
return result;
}
SQLite3_result * Query_Processor::get_query_digests() {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping current query digest\n");
SQLite3_result *result=new SQLite3_result(11);
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_rdlock(&digest_rwlock);
#else
spin_rdlock(&digest_rwlock);
#endif
result->add_column_definition(SQLITE_TEXT,"hid");
result->add_column_definition(SQLITE_TEXT,"schemaname");
result->add_column_definition(SQLITE_TEXT,"usernname");
@ -616,13 +687,21 @@ SQLite3_result * Query_Processor::get_query_digests() {
result->add_row(pta);
qds->free_row(pta);
}
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_unlock(&digest_rwlock);
#else
spin_rdunlock(&digest_rwlock);
#endif
return result;
}
SQLite3_result * Query_Processor::get_query_digests_reset() {
SQLite3_result *result=new SQLite3_result(11);
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_wrlock(&digest_rwlock);
#else
spin_wrlock(&digest_rwlock);
#endif
result->add_column_definition(SQLITE_TEXT,"hid");
result->add_column_definition(SQLITE_TEXT,"schemaname");
result->add_column_definition(SQLITE_TEXT,"usernname");
@ -642,7 +721,11 @@ SQLite3_result * Query_Processor::get_query_digests_reset() {
delete qds;
}
digest_umap.erase(digest_umap.begin(),digest_umap.end());
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_unlock(&digest_rwlock);
#else
spin_wrunlock(&digest_rwlock);
#endif
return result;
}
@ -670,7 +753,11 @@ Query_Processor_Output * Query_Processor::process_mysql_query(MySQL_Session *ses
if (__sync_add_and_fetch(&version,0) > _thr_SQP_version) {
// update local rules;
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Detected a changed in version. Global:%d , local:%d . Refreshing...\n", version, _thr_SQP_version);
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_rdlock(&rwlock);
#else
spin_rdlock(&rwlock);
#endif
_thr_SQP_version=__sync_add_and_fetch(&version,0);
__reset_rules(_thr_SQP_rules);
QP_rule_t *qr1;
@ -712,7 +799,11 @@ Query_Processor_Output * Query_Processor::process_mysql_query(MySQL_Session *ses
_thr_SQP_rules->push_back(qr2);
}
}
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_unlock(&rwlock);
#else
spin_rdunlock(&rwlock); // unlock should be after the copy
#endif
}
QP_rule_t *qr;
re2_t *re2p;
@ -980,7 +1071,11 @@ void Query_Processor::update_query_processor_stats() {
// It acquires a read lock to ensure that the rules table doesn't change
// Yet, because it has to update vales, it uses atomic operations
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Updating query rules statistics\n");
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_rdlock(&rwlock);
#else
spin_rdlock(&rwlock);
#endif
if (__sync_add_and_fetch(&version,0) == _thr_SQP_version) {
QP_rule_t *qr;
for (std::vector<QP_rule_t *>::iterator it=_thr_SQP_rules->begin(); it!=_thr_SQP_rules->end(); ++it) {
@ -991,7 +1086,11 @@ void Query_Processor::update_query_processor_stats() {
}
}
}
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_unlock(&rwlock);
#else
spin_rdunlock(&rwlock);
#endif
for (int i=0; i<MYSQL_COM_QUERY___NONE; i++) {
for (int j=0; j<13; j++) {
if (_thr_commands_counters[i]->counters[j]) {
@ -1086,8 +1185,11 @@ unsigned long long Query_Processor::query_parser_update_counters(MySQL_Session *
}
void Query_Processor::update_query_digest(SQP_par_t *qp, int hid, MySQL_Connection_userinfo *ui, unsigned long long t, unsigned long long n, MySQL_STMT_Global_info *_stmt_info) {
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_wrlock(&digest_rwlock);
#else
spin_wrlock(&digest_rwlock);
#endif
QP_query_digest_stats *qds;
std::unordered_map<uint64_t, void *>::iterator it;
@ -1106,7 +1208,11 @@ void Query_Processor::update_query_digest(SQP_par_t *qp, int hid, MySQL_Connecti
digest_umap.insert(std::make_pair(qp->digest_total,(void *)qds));
}
#ifdef PROXYSQL_QPRO_PTHREAD_MUTEX
pthread_rwlock_unlock(&digest_rwlock);
#else
spin_wrunlock(&digest_rwlock);
#endif
}
char * Query_Processor::get_digest_text(SQP_par_t *qp) {

Loading…
Cancel
Save