Add new variable 'mysql-query_rules_fast_routing_algorithm'

- Implemented new variable 'query_rules_fast_routing_algorithm' that
   allows choosing between the classic per-thread hashmaps for
   'query_rules_fast_routing' searches (1), and a new unique global
   hashmap (2).
 - Improved locking time on 'QueryProcessor::rwlock' during
   'load_mysql_query_rules_to_runtime'.
 - Changed computation of 'mysql_query_rules_memory' to be lazily
   updated by threads themselves while building the per-threads maps.
 - Fixed 'ProxySQL_Admin::stats___memory_metrics' report of the memory
   metrics.
 - Improved 'PROXYSQLTEST 17' command, for being able to micro-benchmark
   and check 'query_rules_fast_routing_algorithm'.
pull/4182/head
Javier Jaramago Fernández 3 years ago
parent 810f86a2f9
commit abccb39201

@ -524,6 +524,7 @@ class MySQL_Threads_Handler
int threshold_resultset_size;
int query_digests_max_digest_length;
int query_digests_max_query_length;
int query_rules_fast_routing_algorithm;
int wait_timeout;
int throttle_max_bytes_per_second_to_client;
int throttle_ratio_server_to_client;

@ -516,7 +516,7 @@ class ProxySQL_Admin {
#endif // TEST_GROUPREP
unsigned int ProxySQL_Test___GenerateRandom_mysql_query_rules_fast_routing(unsigned int, bool);
bool ProxySQL_Test___Verify_mysql_query_rules_fast_routing(int *ret1, int *ret2, int cnt, int dual);
bool ProxySQL_Test___Verify_mysql_query_rules_fast_routing(int *ret1, int *ret2, int cnt, int dual, int ths, bool lock, bool maps_per_thread);
void ProxySQL_Test___MySQL_HostGroups_Manager_generate_many_clusters();
unsigned long long ProxySQL_Test___MySQL_HostGroups_Manager_read_only_action();
#ifdef DEBUG

@ -837,6 +837,7 @@ __thread bool mysql_thread___query_digests_track_hostname;
__thread bool mysql_thread___query_digests_keep_comment;
__thread int mysql_thread___query_digests_max_digest_length;
__thread int mysql_thread___query_digests_max_query_length;
__thread int mysql_thread___query_rules_fast_routing_algorithm;
__thread bool mysql_thread___parse_failure_logs_digest;
__thread int mysql_thread___show_processlist_extended;
__thread int mysql_thread___session_idle_ms;
@ -1003,6 +1004,7 @@ extern __thread bool mysql_thread___query_digests_track_hostname;
extern __thread bool mysql_thread___query_digests_keep_comment;
extern __thread int mysql_thread___query_digests_max_digest_length;
extern __thread int mysql_thread___query_digests_max_query_length;
extern __thread int mysql_thread___query_rules_fast_routing_algorithm;
extern __thread bool mysql_thread___parse_failure_logs_digest;
extern __thread int mysql_thread___show_processlist_extended;
extern __thread int mysql_thread___session_idle_ms;

@ -255,6 +255,33 @@ class Command_Counter {
}
};
/**
* @brief Frees the supplied query rules and cleans the vector.
*/
void __reset_rules(std::vector<QP_rule_t*>* qrs);
/**
* @brief Helper type for performing the 'mysql_rules_fast_routing' hashmaps creation.
* @details Holds all the info 'Query_Processor' requires about the hashmap.
*/
struct fast_routing_hashmap_t {
SQLite3_result* rules_resultset;
unsigned long long rules_resultset_size;
khash_t(khStrInt)* rules_fast_routing;
char* rules_fast_routing___keys_values;
unsigned long long rules_fast_routing___keys_values___size;
};
/**
* @brief Helper type for backing up 'query_rules' memory structures.
* @details Used when reinitializing the query rules.
*/
struct rules_mem_sts_t {
std::vector<QP_rule_t*> query_rules;
char* rules_fast_routing___keys_values;
khash_t(khStrInt)* rules_fast_routing;
};
class Query_Processor {
private:
char rand_del[16];
@ -268,6 +295,7 @@ class Query_Processor {
khash_t(khStrInt) * rules_fast_routing;
char * rules_fast_routing___keys_values;
unsigned long long rules_fast_routing___keys_values___size;
unsigned long long rules_fast_routing___number;
Command_Counter * commands_counters[MYSQL_COM_QUERY___NONE];
// firewall
@ -289,7 +317,7 @@ class Query_Processor {
Query_Processor();
~Query_Processor();
void print_version();
void reset_all(bool lock=true);
rules_mem_sts_t reset_all(bool lock=true);
void wrlock(); // explicit write lock, to be used in multi-insert
void wrunlock(); // explicit write unlock
bool insert(QP_rule_t *qr, bool lock=true); // insert a new rule. Uses a generic void pointer to a structure that may vary depending from the Query Processor
@ -342,12 +370,25 @@ class Query_Processor {
// fast routing
SQLite3_result * fast_routing_resultset; // here we save a copy of resultset for query rules fast routing
void load_fast_routing(SQLite3_result *resultset);
/**
* @brief Creates a hashmap for 'rules_fast_routing' from the provided resultset.
* @param resultset A resulset from which to create a hashmap.
* @return A hashmap encapsulated into the 'fast_routing_hashmap_t' type.
*/
fast_routing_hashmap_t create_fast_routing_hashmap(SQLite3_result* resultset);
/**
* @brief Swaps the current 'rules_fast_routing' hashmap, updating all the required related info.
* @details This function assumes caller has taken write access over ''
* @param fast_routing_hashmap New hashmap and info replacing current.
* @return Old 'fast_routing_resultset' that has been replaced. Required to be freed by caller.
*/
SQLite3_result* load_fast_routing(const fast_routing_hashmap_t& fast_routing_hashmap);
int search_rules_fast_routing_dest_hg(khash_t(khStrInt)* _rules_fast_routing, const char* u, const char* s, int flagIN);
SQLite3_result * get_current_query_rules_fast_routing();
SQLite3_result * get_current_query_rules_fast_routing_inner();
int get_current_query_rules_fast_routing_count();
int testing___find_HG_in_mysql_query_rules_fast_routing(char *username, char *schemaname, int flagIN);
int testing___find_HG_in_mysql_query_rules_fast_routing_dual(char *username, char *schemaname, int flagIN);
int testing___find_HG_in_mysql_query_rules_fast_routing_dual(khash_t(khStrInt)* _rules_fast_routing, char *username, char *schemaname, int flagIN, bool lock);
// firewall
void load_mysql_firewall(SQLite3_result *u, SQLite3_result *r, SQLite3_result *sf);

@ -509,6 +509,7 @@ static char * mysql_thread_variables_names[]= {
(char *)"query_digests_max_query_length",
(char *)"query_digests_grouping_limit",
(char *)"query_digests_groups_grouping_limit",
(char *)"query_rules_fast_routing_algorithm",
(char *)"wait_timeout",
(char *)"throttle_max_bytes_per_second_to_client",
(char *)"throttle_ratio_server_to_client",
@ -1119,6 +1120,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() {
variables.threshold_resultset_size=4*1024*1024;
variables.query_digests_max_digest_length=2*1024;
variables.query_digests_max_query_length=65000; // legacy default
variables.query_rules_fast_routing_algorithm=1;
variables.wait_timeout=8*3600*1000;
variables.throttle_max_bytes_per_second_to_client=0;
variables.throttle_ratio_server_to_client=0;
@ -2203,6 +2205,7 @@ char ** MySQL_Threads_Handler::get_variables_list() {
VariablesPointers_int["query_digests_groups_grouping_limit"] = make_tuple(&variables.query_digests_groups_grouping_limit, 0, 2089, false);
VariablesPointers_int["query_digests_max_digest_length"] = make_tuple(&variables.query_digests_max_digest_length, 16, 1*1024*1024, false);
VariablesPointers_int["query_digests_max_query_length"] = make_tuple(&variables.query_digests_max_query_length, 16, 1*1024*1024, false);
VariablesPointers_int["query_rules_fast_routing_algorithm"] = make_tuple(&variables.query_rules_fast_routing_algorithm, 1, 2, false);
VariablesPointers_int["query_processor_iterations"] = make_tuple(&variables.query_processor_iterations, 0, 1000*1000, false);
VariablesPointers_int["query_processor_regex"] = make_tuple(&variables.query_processor_regex, 1, 2, false);
VariablesPointers_int["query_retries_on_failure"] = make_tuple(&variables.query_retries_on_failure, 0, 1000, false);
@ -3967,6 +3970,7 @@ void MySQL_Thread::refresh_variables() {
mysql_thread___threshold_resultset_size=GloMTH->get_variable_int((char *)"threshold_resultset_size");
mysql_thread___query_digests_max_digest_length=GloMTH->get_variable_int((char *)"query_digests_max_digest_length");
mysql_thread___query_digests_max_query_length=GloMTH->get_variable_int((char *)"query_digests_max_query_length");
mysql_thread___query_rules_fast_routing_algorithm=GloMTH->get_variable_int((char *)"query_rules_fast_routing_algorithm");
mysql_thread___wait_timeout=GloMTH->get_variable_int((char *)"wait_timeout");
mysql_thread___throttle_max_bytes_per_second_to_client=GloMTH->get_variable_int((char *)"throttle_max_bytes_per_second_to_client");
mysql_thread___throttle_ratio_server_to_client=GloMTH->get_variable_int((char *)"throttle_ratio_server_to_client");

@ -3891,10 +3891,12 @@ void admin_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *pkt) {
int test_n = 0;
int test_arg1 = 0;
int test_arg2 = 0;
int test_arg3 = -1;
int test_arg4 = -1;
int r1 = 0;
proxy_warning("Received PROXYSQLTEST command: %s\n", query_no_space);
char *msg = NULL;
sscanf(query_no_space+strlen("PROXYSQLTEST "),"%d %d %d", &test_n, &test_arg1, &test_arg2);
sscanf(query_no_space+strlen("PROXYSQLTEST "),"%d %d %d %d %d", &test_n, &test_arg1, &test_arg2, &test_arg3, &test_arg4);
if (test_n) {
switch (test_n) {
case 1:
@ -4002,13 +4004,28 @@ void admin_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *pkt) {
break;
case 14: // old algorithm
case 17: // perform dual lookup, with and without username
// verify all mysql_query_rules_fast_routing rules
// Allows to verify and benchmark 'mysql_query_rules_fast_routing'. Every options
// verifies all 'mysql_query_rules_fast_routing' rules:
// - Test num: 14 old algorithm, 17 perform a dual lookup.
// - arg1: 1-N Number of times the computation should be repeated.
// - arg2: 1-N Number of parallel threads for the test.
// - arg3: 1-0 Wether or not to acquire a read_lock before searching in the hashmap.
// - arg4: 1-0 Wether or not to create thread specific hashmaps for the search.
if (test_arg1==0) {
test_arg1=1;
}
// To preserve classic mode
if (test_arg3 == -1) {
test_arg3 = 1;
}
if (test_arg4 == -1) {
test_arg4 = 0;
}
{
int ret1, ret2;
bool bret = SPA->ProxySQL_Test___Verify_mysql_query_rules_fast_routing(&ret1, &ret2, test_arg1, (test_n==14 ? 0 : 1));
bool bret = SPA->ProxySQL_Test___Verify_mysql_query_rules_fast_routing(
&ret1, &ret2, test_arg1, (test_n==14 ? 0 : 1), test_arg2, test_arg3, test_arg4
);
if (bret) {
SPA->send_MySQL_OK(&sess->client_myds->myprot, (char *)"Verified all rules in mysql_query_rules_fast_routing", ret1);
} else {
@ -7372,59 +7389,137 @@ bool ProxySQL_Admin::ProxySQL_Test___Load_MySQL_Whitelist(int *ret1, int *ret2,
}
// if dual is not 0 , we call the new search algorithm
bool ProxySQL_Admin::ProxySQL_Test___Verify_mysql_query_rules_fast_routing(int *ret1, int *ret2, int cnt, int dual) {
bool ProxySQL_Admin::ProxySQL_Test___Verify_mysql_query_rules_fast_routing(
int *ret1, int *ret2, int cnt, int dual, int ths, bool lock, bool maps_per_thread
) {
// A thread param of '0' is equivalent to not testing
if (ths == 0) { ths = 1; }
char *q = (char *)"SELECT username, schemaname, flagIN, destination_hostgroup FROM mysql_query_rules_fast_routing ORDER BY RANDOM()";
char *error=NULL;
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
int matching_rows = 0;
bool ret = true;
admindb->execute_statement(q, &error , &cols , &affected_rows , &resultset);
if (error) {
proxy_error("Error on %s : %s\n", q, error);
*ret1 = -1;
return false;
} else {
*ret2 = resultset->rows_count;
int matching_rows = 0;
SQLite3_result *resultset=NULL;
{
char *error=NULL;
int cols=0;
int affected_rows=0;
admindb->execute_statement(q, &error , &cols , &affected_rows , &resultset);
if (error) {
proxy_error("Error on %s : %s\n", q, error);
*ret1 = -1;
return false;
}
}
*ret2 = resultset->rows_count;
char *query2=(char *)"SELECT username, schemaname, flagIN, destination_hostgroup, comment FROM main.mysql_query_rules_fast_routing ORDER BY username, schemaname, flagIN";
SQLite3_result* resultset2 = nullptr;
if (maps_per_thread) {
char* error2 = nullptr;
int cols2 = 0;
int affected_rows2 = 0;
admindb->execute_statement(query2, &error2 , &cols2 , &affected_rows2 , &resultset2);
if (error2) {
proxy_error("Error on %s : %s\n", query2, error2);
return false;
}
}
vector<uint32_t> results(ths, 0);
vector<fast_routing_hashmap_t> th_hashmaps {};
if (maps_per_thread) {
for (uint32_t i = 0; i < ths; i++) {
th_hashmaps.push_back(GloQPro->create_fast_routing_hashmap(resultset2));
}
}
const auto perform_searches =
[&results,&dual](khash_t(khStrInt)* hashmap, SQLite3_result* resultset, uint32_t pos, bool lock) -> void
{
uint32_t matching_rows = 0;
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
int dest_HG = atoi(r->fields[3]);
int ret_HG;
if (dual==0) {
// legacy algorithm
ret_HG = GloQPro->testing___find_HG_in_mysql_query_rules_fast_routing(r->fields[0], r->fields[1], atoi(r->fields[2]));
int ret_HG = -1;
if (dual) {
ret_HG = GloQPro->testing___find_HG_in_mysql_query_rules_fast_routing_dual(
hashmap, r->fields[0], r->fields[1], atoi(r->fields[2]), lock
);
} else {
ret_HG = GloQPro->testing___find_HG_in_mysql_query_rules_fast_routing_dual(r->fields[0], r->fields[1], atoi(r->fields[2]));
ret_HG = GloQPro->testing___find_HG_in_mysql_query_rules_fast_routing(
r->fields[0], r->fields[1], atoi(r->fields[2])
);
}
if (dest_HG == ret_HG) {
matching_rows++;
}
}
results[pos] = matching_rows;
};
proxy_info("Test with params - cnt: %d, threads: %d, lock: %d, maps_per_thread: %d\n", cnt, ths, lock, maps_per_thread);
unsigned long long curtime1 = monotonic_time() / 1000;
std::vector<std::thread> workers {};
for (uint32_t i = 0; i < ths; i++) {
khash_t(khStrInt)* hashmap = maps_per_thread ? th_hashmaps[i].rules_fast_routing : nullptr;
workers.push_back(std::thread(perform_searches, hashmap, resultset, i, lock));
}
if (matching_rows != resultset->rows_count) {
for (std::thread& w : workers) {
w.join();
}
matching_rows = results[0];
if (matching_rows != resultset->rows_count) {
ret = false;
}
*ret1 = matching_rows;
if (ret == true) {
if (cnt > 1) {
for (int i=1 ; i < cnt; i++) {
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
int dest_HG = atoi(r->fields[3]);
int ret_HG;
if (dual==0) {
// legacy algorithm
ret_HG = GloQPro->testing___find_HG_in_mysql_query_rules_fast_routing(r->fields[0], r->fields[1], atoi(r->fields[2]));
} else {
ret_HG = GloQPro->testing___find_HG_in_mysql_query_rules_fast_routing_dual(r->fields[0], r->fields[1], atoi(r->fields[2]));
}
assert(dest_HG==ret_HG);
std::vector<std::thread> workers {};
for (uint32_t i = 0; i < ths; i++) {
khash_t(khStrInt)* hashmap = maps_per_thread ? th_hashmaps[i].rules_fast_routing : nullptr;
workers.push_back(std::thread(perform_searches, hashmap, resultset, i, lock));
}
for (std::thread& w : workers) {
w.join();
}
}
}
}
unsigned long long curtime2 = monotonic_time() / 1000;
uint32_t total_maps_size = 0;
for (const fast_routing_hashmap_t& hashmap : th_hashmaps) {
total_maps_size += hashmap.rules_fast_routing___keys_values___size;
total_maps_size += kh_size(hashmap.rules_fast_routing) * ((sizeof(int) + sizeof(char *) + 4));
kh_destroy(khStrInt, hashmap.rules_fast_routing);
free(hashmap.rules_fast_routing___keys_values);
}
proxy_info("Test took %llums\n", curtime2 - curtime1);
proxy_info("Verified rows %d\n", results[0]);
proxy_info("Total maps size %dkb\n", total_maps_size / 1024);
if (resultset) delete resultset;
if (resultset2) delete resultset2;
return ret;
}
@ -8773,9 +8868,6 @@ void ProxySQL_Admin::stats___memory_metrics() {
}
if (GloQPro) {
unsigned long long mu = GloQPro->get_rules_mem_used();
if (GloMTH) {
mu += mu * GloMTH->num_threads;
}
vn=(char *)"mysql_query_rules_memory";
sprintf(bu,"%llu",mu);
query=(char *)malloc(strlen(a)+strlen(vn)+strlen(bu)+16);
@ -12340,10 +12432,24 @@ char* ProxySQL_Admin::load_mysql_query_rules_to_runtime(SQLite3_result* SQLite3_
} else if (error2) {
proxy_error("Error on %s : %s\n", query2, error2);
} else {
GloQPro->wrlock();
fast_routing_hashmap_t fast_routing_hashmap { GloQPro->create_fast_routing_hashmap(resultset2) };
#ifdef BENCHMARK_FASTROUTING_LOAD
for (int i=0; i<10; i++) {
#endif // BENCHMARK_FASTROUTING_LOAD
// Computed resultsets checksums outside of critical sections
uint64_t hash1 = 0;
uint64_t hash2 = 0;
if (
SQLite3_query_rules_resultset == nullptr ||
SQLite3_query_rules_fast_routing_resultset == nullptr
) {
hash1 = resultset->raw_checksum();
hash2 = resultset2->raw_checksum();
}
unsigned long long curtime1 = monotonic_time();
GloQPro->wrlock();
if (checksum_variables.checksum_mysql_query_rules) {
pthread_mutex_lock(&GloVars.checksum_mutex);
char* buff = nullptr;
@ -12354,8 +12460,6 @@ char* ProxySQL_Admin::load_mysql_query_rules_to_runtime(SQLite3_result* SQLite3_
SQLite3_query_rules_resultset == nullptr ||
SQLite3_query_rules_fast_routing_resultset == nullptr
) {
uint64_t hash1 = resultset->raw_checksum();
uint64_t hash2 = resultset2->raw_checksum();
hash1 += hash2;
uint32_t d32[2];
memcpy(&d32, &hash1, sizeof(hash1));
@ -12393,7 +12497,7 @@ char* ProxySQL_Admin::load_mysql_query_rules_to_runtime(SQLite3_result* SQLite3_
GloVars.checksums_values.mysql_query_rules.checksum, GloVars.checksums_values.mysql_query_rules.epoch
);
}
GloQPro->reset_all(false);
rules_mem_sts_t prev_rules_data { GloQPro->reset_all(false) };
QP_rule_t * nqpr;
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
@ -12462,13 +12566,30 @@ char* ProxySQL_Admin::load_mysql_query_rules_to_runtime(SQLite3_result* SQLite3_
#else
// load the original resultset and resultset2
GloQPro->save_query_rules(resultset);
GloQPro->load_fast_routing(resultset2);
SQLite3_result* prev_fast_routing_resultset = GloQPro->load_fast_routing(fast_routing_hashmap);
#endif // BENCHMARK_FASTROUTING_LOAD
GloQPro->commit();
#ifdef BENCHMARK_FASTROUTING_LOAD
}
#endif // BENCHMARK_FASTROUTING_LOAD
GloQPro->wrunlock();
unsigned long long curtime2 = monotonic_time();
unsigned long long elapsed_ms = (curtime2/1000) - (curtime1/1000);
if (elapsed_ms > 5) {
proxy_info("Query processor locked for %llums\n", curtime2 - curtime1);
}
// Free previous 'fast_routing' structures outside of critical section
{
delete prev_fast_routing_resultset;
if (prev_rules_data.rules_fast_routing) {
kh_destroy(khStrInt, prev_rules_data.rules_fast_routing);
}
if (prev_rules_data.rules_fast_routing___keys_values) {
free(prev_rules_data.rules_fast_routing___keys_values);
}
__reset_rules(&prev_rules_data.query_rules);
}
}
// if (resultset) delete resultset; // never delete it. GloQPro saves it
// if (resultset2) delete resultset2; // never delete it. GloQPro saves it

@ -431,7 +431,7 @@ static void __delete_query_rule(QP_rule_t *qr) {
// delete all the query rules in a Query Processor Table
// Note that this function is called by GloQPro with &rules (generic table)
// and is called by each mysql thread with _thr_SQP_rules (per thread table)
static void __reset_rules(std::vector<QP_rule_t *> * qrs) {
void __reset_rules(std::vector<QP_rule_t *> * qrs) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Resetting rules in Query Processor Table %p\n", qrs);
if (qrs==NULL) return;
QP_rule_t *qr;
@ -546,7 +546,8 @@ Query_Processor::Query_Processor() {
}
query_rules_resultset = NULL;
fast_routing_resultset = NULL;
rules_fast_routing = kh_init(khStrInt); // create a hashtable
// 'rules_fast_routing' structures created on demand
rules_fast_routing = nullptr;
rules_fast_routing___keys_values = NULL;
rules_fast_routing___keys_values___size = 0;
new_req_conns_count = 0;
@ -555,7 +556,9 @@ Query_Processor::Query_Processor() {
Query_Processor::~Query_Processor() {
for (int i=0; i<MYSQL_COM_QUERY___NONE; i++) delete commands_counters[i];
__reset_rules(&rules);
kh_destroy(khStrInt, rules_fast_routing);
if (rules_fast_routing) {
kh_destroy(khStrInt, rules_fast_routing);
}
if (rules_fast_routing___keys_values) {
free(rules_fast_routing___keys_values);
rules_fast_routing___keys_values = NULL;
@ -597,7 +600,8 @@ void Query_Processor::init_thread() {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Initializing Per-Thread Query Processor Table with version=0\n");
_thr_SQP_version=0;
_thr_SQP_rules=new std::vector<QP_rule_t *>;
_thr_SQP_rules_fast_routing = kh_init(khStrInt); // create a hashtable
// per-thread 'rules_fast_routing' structures are created on demand
_thr_SQP_rules_fast_routing = nullptr;
_thr___rules_fast_routing___keys_values = NULL;
for (int i=0; i<MYSQL_COM_QUERY___NONE; i++) _thr_commands_counters[i] = new Command_Counter(i);
};
@ -607,7 +611,9 @@ void Query_Processor::end_thread() {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Destroying Per-Thread Query Processor Table with version=%d\n", _thr_SQP_version);
__reset_rules(_thr_SQP_rules);
delete _thr_SQP_rules;
kh_destroy(khStrInt, _thr_SQP_rules_fast_routing);
if (_thr_SQP_rules_fast_routing) {
kh_destroy(khStrInt, _thr_SQP_rules_fast_routing);
}
if (_thr___rules_fast_routing___keys_values) {
free(_thr___rules_fast_routing___keys_values);
_thr___rules_fast_routing___keys_values = NULL;
@ -727,21 +733,29 @@ void Query_Processor::delete_query_rule(QP_rule_t *qr) {
__delete_query_rule(qr);
};
void Query_Processor::reset_all(bool lock) {
rules_mem_sts_t Query_Processor::reset_all(bool lock) {
if (lock)
pthread_rwlock_wrlock(&rwlock);
__reset_rules(&rules);
rules_mem_sts_t hashmaps_data {};
this->rules.swap(hashmaps_data.query_rules);
if (rules_fast_routing) {
kh_destroy(khStrInt, rules_fast_routing);
rules_fast_routing = NULL;
rules_fast_routing = kh_init(khStrInt); // create a hashtable
hashmaps_data.rules_fast_routing = rules_fast_routing;
rules_fast_routing = nullptr;
}
free(rules_fast_routing___keys_values);
rules_fast_routing___keys_values = NULL;
rules_fast_routing___keys_values___size = 0;
if (rules_fast_routing___keys_values) {
hashmaps_data.rules_fast_routing___keys_values = rules_fast_routing___keys_values;
rules_fast_routing___keys_values = NULL;
rules_fast_routing___keys_values___size = 0;
}
if (lock)
pthread_rwlock_unlock(&rwlock);
rules_mem_used=0;
return hashmaps_data;
};
bool Query_Processor::insert(QP_rule_t *qr, bool lock) {
@ -914,6 +928,35 @@ SQLite3_result * Query_Processor::get_current_query_rules_fast_routing() {
return result;
}
int Query_Processor::search_rules_fast_routing_dest_hg(
khash_t(khStrInt)* _rules_fast_routing, const char* u, const char* s, int flagIN
) {
int dest_hg = -1;
char keybuf[256];
char * keybuf_ptr = keybuf;
size_t keylen = strlen(u)+strlen(rand_del)+strlen(s)+30; // 30 is a big number
if (keylen > 250) {
keybuf_ptr = (char *)malloc(keylen);
}
sprintf(keybuf_ptr,"%s%s%s---%d", u, rand_del, s, flagIN);
khiter_t k = kh_get(khStrInt, _rules_fast_routing, keybuf_ptr);
if (k == kh_end(_rules_fast_routing)) {
sprintf(keybuf_ptr,"%s%s---%d", rand_del, s, flagIN);
khiter_t k2 = kh_get(khStrInt, _rules_fast_routing, keybuf_ptr);
if (k2 == kh_end(_rules_fast_routing)) {
} else {
dest_hg = kh_val(_rules_fast_routing,k2);
}
} else {
dest_hg = kh_val(_rules_fast_routing,k);
}
if (keylen > 250) {
free(keybuf_ptr);
}
return dest_hg;
}
struct get_query_digests_parallel_args {
unsigned long long ret;
pthread_t thr;
@ -1703,23 +1746,39 @@ Query_Processor_Output * Query_Processor::process_mysql_query(MySQL_Session *ses
_thr_SQP_rules->push_back(qr2);
}
}
kh_destroy(khStrInt, _thr_SQP_rules_fast_routing);
_thr_SQP_rules_fast_routing = kh_init(khStrInt); // create a hashtable
if (_thr___rules_fast_routing___keys_values) {
free(_thr___rules_fast_routing___keys_values);
_thr___rules_fast_routing___keys_values = NULL;
}
if (rules_fast_routing___keys_values___size) {
_thr___rules_fast_routing___keys_values = (char *)malloc(rules_fast_routing___keys_values___size);
memcpy(_thr___rules_fast_routing___keys_values, rules_fast_routing___keys_values, rules_fast_routing___keys_values___size);
char *ptr = _thr___rules_fast_routing___keys_values;
while (ptr < _thr___rules_fast_routing___keys_values + rules_fast_routing___keys_values___size) {
char *ptr2 = ptr+strlen(ptr)+1;
int destination_hostgroup = atoi(ptr2);
int ret;
khiter_t k = kh_put(khStrInt, _thr_SQP_rules_fast_routing, ptr, &ret); // add the key
kh_value(_thr_SQP_rules_fast_routing, k) = destination_hostgroup; // set the value of the key
ptr = ptr2+strlen(ptr2)+1;
if (mysql_thread___query_rules_fast_routing_algorithm == 1) {
if (_thr_SQP_rules_fast_routing) {
kh_destroy(khStrInt, _thr_SQP_rules_fast_routing);
_thr_SQP_rules_fast_routing = nullptr;
}
if (_thr___rules_fast_routing___keys_values) {
free(_thr___rules_fast_routing___keys_values);
_thr___rules_fast_routing___keys_values = NULL;
}
if (rules_fast_routing___keys_values___size) {
_thr_SQP_rules_fast_routing = kh_init(khStrInt); // create a hashtable
_thr___rules_fast_routing___keys_values = (char *)malloc(rules_fast_routing___keys_values___size);
memcpy(_thr___rules_fast_routing___keys_values, rules_fast_routing___keys_values, rules_fast_routing___keys_values___size);
rules_mem_used += rules_fast_routing___keys_values___size; // per-thread
char *ptr = _thr___rules_fast_routing___keys_values;
while (ptr < _thr___rules_fast_routing___keys_values + rules_fast_routing___keys_values___size) {
char *ptr2 = ptr+strlen(ptr)+1;
int destination_hostgroup = atoi(ptr2);
int ret;
khiter_t k = kh_put(khStrInt, _thr_SQP_rules_fast_routing, ptr, &ret); // add the key
kh_value(_thr_SQP_rules_fast_routing, k) = destination_hostgroup; // set the value of the key
rules_mem_used += ((sizeof(int) + sizeof(char *) + 4)); // not sure about memory overhead
ptr = ptr2+strlen(ptr2)+1;
}
}
} else {
if (_thr_SQP_rules_fast_routing) {
kh_destroy(khStrInt, _thr_SQP_rules_fast_routing);
_thr_SQP_rules_fast_routing = nullptr;
}
if (_thr___rules_fast_routing___keys_values) {
free(_thr___rules_fast_routing___keys_values);
_thr___rules_fast_routing___keys_values = nullptr;
}
}
//for (std::unordered_map<std::string, int>::iterator it = rules_fast_routing.begin(); it != rules_fast_routing.end(); ++it) {
@ -1993,30 +2052,23 @@ __exit_process_mysql_query:
if (qr == NULL || qr->apply == false) {
// now it is time to check mysql_query_rules_fast_routing
// it is only check if "apply" is not true
if (_thr___rules_fast_routing___keys_values) {
char keybuf[256];
char * keybuf_ptr = keybuf;
const char * u = sess->client_myds->myconn->userinfo->username;
const char * s = sess->client_myds->myconn->userinfo->schemaname;
size_t keylen = strlen(u)+strlen(rand_del)+strlen(s)+30; // 30 is a big number
if (keylen > 250) {
keybuf_ptr = (char *)malloc(keylen);
}
sprintf(keybuf_ptr,"%s%s%s---%d", u, rand_del, s, flagIN);
khiter_t k = kh_get(khStrInt, _thr_SQP_rules_fast_routing, keybuf_ptr);
if (k == kh_end(_thr_SQP_rules_fast_routing)) {
sprintf(keybuf_ptr,"%s%s---%d", rand_del, s, flagIN);
khiter_t k2 = kh_get(khStrInt, _thr_SQP_rules_fast_routing, keybuf_ptr);
if (k2 == kh_end(_thr_SQP_rules_fast_routing)) {
} else {
ret->destination_hostgroup = kh_val(_thr_SQP_rules_fast_routing,k2);
}
} else {
ret->destination_hostgroup = kh_val(_thr_SQP_rules_fast_routing,k);
}
if (keylen > 250) {
free(keybuf_ptr);
}
const char * u = sess->client_myds->myconn->userinfo->username;
const char * s = sess->client_myds->myconn->userinfo->schemaname;
int dst_hg = -1;
if (_thr_SQP_rules_fast_routing != nullptr) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 7, "Searching thread-local 'rules_fast_routing' hashmap with: user='%s', schema='%s', and flagIN='%d'\n", u, s, flagIN);
dst_hg = search_rules_fast_routing_dest_hg(_thr_SQP_rules_fast_routing, u, s, flagIN);
} else if (rules_fast_routing != nullptr) {
pthread_rwlock_rdlock(&this->rwlock);
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 7, "Searching global 'rules_fast_routing' hashmap with: user='%s', schema='%s', and flagIN='%d'\n", u, s, flagIN);
dst_hg = search_rules_fast_routing_dest_hg(rules_fast_routing, u, s, flagIN);
pthread_rwlock_unlock(&this->rwlock);
}
if (dst_hg != -1) {
ret->destination_hostgroup = dst_hg;
}
}
// FIXME : there is too much data being copied around
@ -3029,11 +3081,15 @@ void Query_Processor::save_query_rules(SQLite3_result *resultset) {
query_rules_resultset = resultset; // save it
}
void Query_Processor::load_fast_routing(SQLite3_result *resultset) {
unsigned long long tot_size = 0;
fast_routing_hashmap_t Query_Processor::create_fast_routing_hashmap(SQLite3_result* resultset) {
khash_t(khStrInt)* fast_routing = nullptr;
char* keys_values = nullptr;
unsigned long long keys_values_size = 0;
size_t rand_del_size = strlen(rand_del);
int num_rows = resultset->rows_count;
if (num_rows) {
unsigned long long tot_size = 0;
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
size_t row_length = strlen(r->fields[0]) + strlen(r->fields[1]) + strlen(r->fields[2]) + strlen(r->fields[3]);
@ -3042,19 +3098,18 @@ void Query_Processor::load_fast_routing(SQLite3_result *resultset) {
row_length += rand_del_size;
tot_size += row_length;
}
int nt = GloMTH->num_threads;
rules_fast_routing___keys_values = (char *)malloc(tot_size);
rules_fast_routing___keys_values___size = tot_size;
rules_mem_used += rules_fast_routing___keys_values___size; // global
rules_mem_used += rules_fast_routing___keys_values___size * nt; // per-thread
char *ptr = rules_fast_routing___keys_values;
keys_values = (char *)malloc(tot_size);
keys_values_size = tot_size;
char *ptr = keys_values;
fast_routing = kh_init(khStrInt);
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
sprintf(ptr,"%s%s%s---%s",r->fields[0],rand_del,r->fields[1],r->fields[2]);
int destination_hostgroup = atoi(r->fields[3]);
int ret;
khiter_t k = kh_put(khStrInt, rules_fast_routing, ptr, &ret); // add the key
kh_value(rules_fast_routing, k) = destination_hostgroup; // set the value of the key
khiter_t k = kh_put(khStrInt, fast_routing, ptr, &ret); // add the key
kh_value(fast_routing, k) = destination_hostgroup; // set the value of the key
int l = strlen((const char *)ptr);
ptr += l;
ptr++; // NULL 1
@ -3062,13 +3117,35 @@ void Query_Processor::load_fast_routing(SQLite3_result *resultset) {
memcpy(ptr,r->fields[3],l+1);
ptr += l;
ptr++; // NULL 2
rules_mem_used += ((sizeof(int) + sizeof(char *) + 4 )); // not sure about memory overhead
rules_mem_used += ((sizeof(int) + sizeof(char *) + 4 ) * nt); // per-thread . not sure about memory overhead
}
}
delete fast_routing_resultset;
fast_routing_resultset = resultset; // save it
rules_mem_used += fast_routing_resultset->get_size();
return { resultset, resultset->get_size(), fast_routing, keys_values, keys_values_size };
}
SQLite3_result* Query_Processor::load_fast_routing(const fast_routing_hashmap_t& fast_routing_hashmap) {
khash_t(khStrInt)* _rules_fast_routing = fast_routing_hashmap.rules_fast_routing;
SQLite3_result* _rules_resultset = fast_routing_hashmap.rules_resultset;
if (_rules_fast_routing && _rules_resultset) {
// Replace map structures, assumed to be previously reset
this->rules_fast_routing___keys_values = fast_routing_hashmap.rules_fast_routing___keys_values;
this->rules_fast_routing___keys_values___size = fast_routing_hashmap.rules_fast_routing___keys_values___size;
this->rules_fast_routing = _rules_fast_routing;
// Update global memory stats
rules_mem_used += rules_fast_routing___keys_values___size; // global
khint_t map_size = kh_size(_rules_fast_routing);
rules_mem_used += map_size * ((sizeof(int) + sizeof(char *) + 4 )); // not sure about memory overhead
}
// Backup current resultset for later freeing
SQLite3_result* prev_fast_routing_resultset = this->fast_routing_resultset;
// Save new resultset
fast_routing_resultset = _rules_resultset;
// Use resultset pre-computed size
rules_mem_used += fast_routing_hashmap.rules_resultset_size;
return prev_fast_routing_resultset;
};
// this testing function doesn't care if the user exists or not
@ -3100,44 +3177,22 @@ int Query_Processor::testing___find_HG_in_mysql_query_rules_fast_routing(char *u
// this testing function implement the dual search: with and without username
// if the length of username is 0 , it will search for random username (that shouldn't exist!)
int Query_Processor::testing___find_HG_in_mysql_query_rules_fast_routing_dual(char *username, char *schemaname, int flagIN) {
int Query_Processor::testing___find_HG_in_mysql_query_rules_fast_routing_dual(
khash_t(khStrInt)* _rules_fast_routing, char* username, char* schemaname, int flagIN, bool lock
) {
int ret = -1;
const char * random_user = (char *)"my_ReaLLy_Rand_User_123456";
char * u = NULL;
if (strlen(username)) {
u = username;
} else {
u = (char *)random_user;
khash_t(khStrInt)* rules_fast_routing = _rules_fast_routing ? _rules_fast_routing : this->rules_fast_routing;
if (lock) {
pthread_rwlock_rdlock(&rwlock);
}
pthread_rwlock_rdlock(&rwlock);
if (rules_fast_routing) {
char keybuf[256];
char * keybuf_ptr = keybuf;
size_t keylen = strlen(u)+strlen(rand_del)+strlen(schemaname)+30; // 30 is a big number
if (keylen > 250) {
keybuf_ptr = (char *)malloc(keylen);
}
sprintf(keybuf_ptr,"%s%s%s---%d", username, rand_del, schemaname, flagIN);
khiter_t k = kh_get(khStrInt, rules_fast_routing, keybuf_ptr);
if (k == kh_end(rules_fast_routing)) {
} else {
ret = kh_val(rules_fast_routing,k);
}
if (ret == -1) { // we didn't find it
if (strlen(username)==0) { // we need to search for empty username
sprintf(keybuf_ptr,"%s%s---%d", rand_del, schemaname, flagIN); // no username here
khiter_t k = kh_get(khStrInt, rules_fast_routing, keybuf_ptr);
if (k == kh_end(rules_fast_routing)) {
} else {
ret = kh_val(rules_fast_routing,k);
}
}
}
if (keylen > 250) {
free(keybuf_ptr);
}
ret = search_rules_fast_routing_dest_hg(rules_fast_routing, username, schemaname, flagIN);
}
pthread_rwlock_unlock(&rwlock);
if (lock) {
pthread_rwlock_unlock(&rwlock);
}
return ret;
}

Loading…
Cancel
Save