diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 9f53bce2e..e4a4a6996 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -524,6 +524,7 @@ class MySQL_Threads_Handler int threshold_resultset_size; int query_digests_max_digest_length; int query_digests_max_query_length; + int query_rules_fast_routing_algorithm; int wait_timeout; int throttle_max_bytes_per_second_to_client; int throttle_ratio_server_to_client; diff --git a/include/proxysql_admin.h b/include/proxysql_admin.h index 29f8ef25d..d4c6838eb 100644 --- a/include/proxysql_admin.h +++ b/include/proxysql_admin.h @@ -516,7 +516,7 @@ class ProxySQL_Admin { #endif // TEST_GROUPREP unsigned int ProxySQL_Test___GenerateRandom_mysql_query_rules_fast_routing(unsigned int, bool); - bool ProxySQL_Test___Verify_mysql_query_rules_fast_routing(int *ret1, int *ret2, int cnt, int dual); + bool ProxySQL_Test___Verify_mysql_query_rules_fast_routing(int *ret1, int *ret2, int cnt, int dual, int ths, bool lock, bool maps_per_thread); void ProxySQL_Test___MySQL_HostGroups_Manager_generate_many_clusters(); unsigned long long ProxySQL_Test___MySQL_HostGroups_Manager_read_only_action(); #ifdef DEBUG diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index afa9ee3a0..6783ddf83 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -837,6 +837,7 @@ __thread bool mysql_thread___query_digests_track_hostname; __thread bool mysql_thread___query_digests_keep_comment; __thread int mysql_thread___query_digests_max_digest_length; __thread int mysql_thread___query_digests_max_query_length; +__thread int mysql_thread___query_rules_fast_routing_algorithm; __thread bool mysql_thread___parse_failure_logs_digest; __thread int mysql_thread___show_processlist_extended; __thread int mysql_thread___session_idle_ms; @@ -1003,6 +1004,7 @@ extern __thread bool mysql_thread___query_digests_track_hostname; extern __thread bool mysql_thread___query_digests_keep_comment; extern __thread int mysql_thread___query_digests_max_digest_length; extern __thread int mysql_thread___query_digests_max_query_length; +extern __thread int mysql_thread___query_rules_fast_routing_algorithm; extern __thread bool mysql_thread___parse_failure_logs_digest; extern __thread int mysql_thread___show_processlist_extended; extern __thread int mysql_thread___session_idle_ms; diff --git a/include/query_processor.h b/include/query_processor.h index 5e17903c0..7a03fa25e 100644 --- a/include/query_processor.h +++ b/include/query_processor.h @@ -255,6 +255,33 @@ class Command_Counter { } }; +/** + * @brief Frees the supplied query rules and cleans the vector. + */ +void __reset_rules(std::vector* qrs); + +/** + * @brief Helper type for performing the 'mysql_rules_fast_routing' hashmaps creation. + * @details Holds all the info 'Query_Processor' requires about the hashmap. + */ +struct fast_routing_hashmap_t { + SQLite3_result* rules_resultset; + unsigned long long rules_resultset_size; + khash_t(khStrInt)* rules_fast_routing; + char* rules_fast_routing___keys_values; + unsigned long long rules_fast_routing___keys_values___size; +}; + +/** + * @brief Helper type for backing up 'query_rules' memory structures. + * @details Used when reinitializing the query rules. + */ +struct rules_mem_sts_t { + std::vector query_rules; + char* rules_fast_routing___keys_values; + khash_t(khStrInt)* rules_fast_routing; +}; + class Query_Processor { private: char rand_del[16]; @@ -268,6 +295,7 @@ class Query_Processor { khash_t(khStrInt) * rules_fast_routing; char * rules_fast_routing___keys_values; unsigned long long rules_fast_routing___keys_values___size; + unsigned long long rules_fast_routing___number; Command_Counter * commands_counters[MYSQL_COM_QUERY___NONE]; // firewall @@ -289,7 +317,7 @@ class Query_Processor { Query_Processor(); ~Query_Processor(); void print_version(); - void reset_all(bool lock=true); + rules_mem_sts_t reset_all(bool lock=true); void wrlock(); // explicit write lock, to be used in multi-insert void wrunlock(); // explicit write unlock bool insert(QP_rule_t *qr, bool lock=true); // insert a new rule. Uses a generic void pointer to a structure that may vary depending from the Query Processor @@ -342,12 +370,25 @@ class Query_Processor { // fast routing SQLite3_result * fast_routing_resultset; // here we save a copy of resultset for query rules fast routing - void load_fast_routing(SQLite3_result *resultset); + /** + * @brief Creates a hashmap for 'rules_fast_routing' from the provided resultset. + * @param resultset A resulset from which to create a hashmap. + * @return A hashmap encapsulated into the 'fast_routing_hashmap_t' type. + */ + fast_routing_hashmap_t create_fast_routing_hashmap(SQLite3_result* resultset); + /** + * @brief Swaps the current 'rules_fast_routing' hashmap, updating all the required related info. + * @details This function assumes caller has taken write access over '' + * @param fast_routing_hashmap New hashmap and info replacing current. + * @return Old 'fast_routing_resultset' that has been replaced. Required to be freed by caller. + */ + SQLite3_result* load_fast_routing(const fast_routing_hashmap_t& fast_routing_hashmap); + int search_rules_fast_routing_dest_hg(khash_t(khStrInt)* _rules_fast_routing, const char* u, const char* s, int flagIN); SQLite3_result * get_current_query_rules_fast_routing(); SQLite3_result * get_current_query_rules_fast_routing_inner(); int get_current_query_rules_fast_routing_count(); int testing___find_HG_in_mysql_query_rules_fast_routing(char *username, char *schemaname, int flagIN); - int testing___find_HG_in_mysql_query_rules_fast_routing_dual(char *username, char *schemaname, int flagIN); + int testing___find_HG_in_mysql_query_rules_fast_routing_dual(khash_t(khStrInt)* _rules_fast_routing, char *username, char *schemaname, int flagIN, bool lock); // firewall void load_mysql_firewall(SQLite3_result *u, SQLite3_result *r, SQLite3_result *sf); diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index f4df74679..fb0d785cd 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -509,6 +509,7 @@ static char * mysql_thread_variables_names[]= { (char *)"query_digests_max_query_length", (char *)"query_digests_grouping_limit", (char *)"query_digests_groups_grouping_limit", + (char *)"query_rules_fast_routing_algorithm", (char *)"wait_timeout", (char *)"throttle_max_bytes_per_second_to_client", (char *)"throttle_ratio_server_to_client", @@ -1119,6 +1120,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.threshold_resultset_size=4*1024*1024; variables.query_digests_max_digest_length=2*1024; variables.query_digests_max_query_length=65000; // legacy default + variables.query_rules_fast_routing_algorithm=1; variables.wait_timeout=8*3600*1000; variables.throttle_max_bytes_per_second_to_client=0; variables.throttle_ratio_server_to_client=0; @@ -2203,6 +2205,7 @@ char ** MySQL_Threads_Handler::get_variables_list() { VariablesPointers_int["query_digests_groups_grouping_limit"] = make_tuple(&variables.query_digests_groups_grouping_limit, 0, 2089, false); VariablesPointers_int["query_digests_max_digest_length"] = make_tuple(&variables.query_digests_max_digest_length, 16, 1*1024*1024, false); VariablesPointers_int["query_digests_max_query_length"] = make_tuple(&variables.query_digests_max_query_length, 16, 1*1024*1024, false); + VariablesPointers_int["query_rules_fast_routing_algorithm"] = make_tuple(&variables.query_rules_fast_routing_algorithm, 1, 2, false); VariablesPointers_int["query_processor_iterations"] = make_tuple(&variables.query_processor_iterations, 0, 1000*1000, false); VariablesPointers_int["query_processor_regex"] = make_tuple(&variables.query_processor_regex, 1, 2, false); VariablesPointers_int["query_retries_on_failure"] = make_tuple(&variables.query_retries_on_failure, 0, 1000, false); @@ -3967,6 +3970,7 @@ void MySQL_Thread::refresh_variables() { mysql_thread___threshold_resultset_size=GloMTH->get_variable_int((char *)"threshold_resultset_size"); mysql_thread___query_digests_max_digest_length=GloMTH->get_variable_int((char *)"query_digests_max_digest_length"); mysql_thread___query_digests_max_query_length=GloMTH->get_variable_int((char *)"query_digests_max_query_length"); + mysql_thread___query_rules_fast_routing_algorithm=GloMTH->get_variable_int((char *)"query_rules_fast_routing_algorithm"); mysql_thread___wait_timeout=GloMTH->get_variable_int((char *)"wait_timeout"); mysql_thread___throttle_max_bytes_per_second_to_client=GloMTH->get_variable_int((char *)"throttle_max_bytes_per_second_to_client"); mysql_thread___throttle_ratio_server_to_client=GloMTH->get_variable_int((char *)"throttle_ratio_server_to_client"); diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index 3169d1eac..ffd30e0d0 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -3891,10 +3891,12 @@ void admin_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *pkt) { int test_n = 0; int test_arg1 = 0; int test_arg2 = 0; + int test_arg3 = -1; + int test_arg4 = -1; int r1 = 0; proxy_warning("Received PROXYSQLTEST command: %s\n", query_no_space); char *msg = NULL; - sscanf(query_no_space+strlen("PROXYSQLTEST "),"%d %d %d", &test_n, &test_arg1, &test_arg2); + sscanf(query_no_space+strlen("PROXYSQLTEST "),"%d %d %d %d %d", &test_n, &test_arg1, &test_arg2, &test_arg3, &test_arg4); if (test_n) { switch (test_n) { case 1: @@ -4002,13 +4004,28 @@ void admin_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *pkt) { break; case 14: // old algorithm case 17: // perform dual lookup, with and without username - // verify all mysql_query_rules_fast_routing rules + // Allows to verify and benchmark 'mysql_query_rules_fast_routing'. Every options + // verifies all 'mysql_query_rules_fast_routing' rules: + // - Test num: 14 old algorithm, 17 perform a dual lookup. + // - arg1: 1-N Number of times the computation should be repeated. + // - arg2: 1-N Number of parallel threads for the test. + // - arg3: 1-0 Wether or not to acquire a read_lock before searching in the hashmap. + // - arg4: 1-0 Wether or not to create thread specific hashmaps for the search. if (test_arg1==0) { test_arg1=1; } + // To preserve classic mode + if (test_arg3 == -1) { + test_arg3 = 1; + } + if (test_arg4 == -1) { + test_arg4 = 0; + } { int ret1, ret2; - bool bret = SPA->ProxySQL_Test___Verify_mysql_query_rules_fast_routing(&ret1, &ret2, test_arg1, (test_n==14 ? 0 : 1)); + bool bret = SPA->ProxySQL_Test___Verify_mysql_query_rules_fast_routing( + &ret1, &ret2, test_arg1, (test_n==14 ? 0 : 1), test_arg2, test_arg3, test_arg4 + ); if (bret) { SPA->send_MySQL_OK(&sess->client_myds->myprot, (char *)"Verified all rules in mysql_query_rules_fast_routing", ret1); } else { @@ -7372,59 +7389,137 @@ bool ProxySQL_Admin::ProxySQL_Test___Load_MySQL_Whitelist(int *ret1, int *ret2, } // if dual is not 0 , we call the new search algorithm -bool ProxySQL_Admin::ProxySQL_Test___Verify_mysql_query_rules_fast_routing(int *ret1, int *ret2, int cnt, int dual) { +bool ProxySQL_Admin::ProxySQL_Test___Verify_mysql_query_rules_fast_routing( + int *ret1, int *ret2, int cnt, int dual, int ths, bool lock, bool maps_per_thread +) { + // A thread param of '0' is equivalent to not testing + if (ths == 0) { ths = 1; } char *q = (char *)"SELECT username, schemaname, flagIN, destination_hostgroup FROM mysql_query_rules_fast_routing ORDER BY RANDOM()"; - char *error=NULL; - int cols=0; - int affected_rows=0; - SQLite3_result *resultset=NULL; - int matching_rows = 0; + bool ret = true; - admindb->execute_statement(q, &error , &cols , &affected_rows , &resultset); - if (error) { - proxy_error("Error on %s : %s\n", q, error); - *ret1 = -1; - return false; - } else { - *ret2 = resultset->rows_count; + int matching_rows = 0; + + SQLite3_result *resultset=NULL; + { + char *error=NULL; + int cols=0; + int affected_rows=0; + admindb->execute_statement(q, &error , &cols , &affected_rows , &resultset); + + if (error) { + proxy_error("Error on %s : %s\n", q, error); + *ret1 = -1; + return false; + } + } + *ret2 = resultset->rows_count; + + char *query2=(char *)"SELECT username, schemaname, flagIN, destination_hostgroup, comment FROM main.mysql_query_rules_fast_routing ORDER BY username, schemaname, flagIN"; + SQLite3_result* resultset2 = nullptr; + + if (maps_per_thread) { + char* error2 = nullptr; + int cols2 = 0; + int affected_rows2 = 0; + admindb->execute_statement(query2, &error2 , &cols2 , &affected_rows2 , &resultset2); + + if (error2) { + proxy_error("Error on %s : %s\n", query2, error2); + return false; + } + } + + vector results(ths, 0); + vector th_hashmaps {}; + + if (maps_per_thread) { + for (uint32_t i = 0; i < ths; i++) { + th_hashmaps.push_back(GloQPro->create_fast_routing_hashmap(resultset2)); + } + } + + const auto perform_searches = + [&results,&dual](khash_t(khStrInt)* hashmap, SQLite3_result* resultset, uint32_t pos, bool lock) -> void + { + uint32_t matching_rows = 0; + for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { SQLite3_row *r=*it; int dest_HG = atoi(r->fields[3]); - int ret_HG; - if (dual==0) { - // legacy algorithm - ret_HG = GloQPro->testing___find_HG_in_mysql_query_rules_fast_routing(r->fields[0], r->fields[1], atoi(r->fields[2])); + int ret_HG = -1; + if (dual) { + ret_HG = GloQPro->testing___find_HG_in_mysql_query_rules_fast_routing_dual( + hashmap, r->fields[0], r->fields[1], atoi(r->fields[2]), lock + ); } else { - ret_HG = GloQPro->testing___find_HG_in_mysql_query_rules_fast_routing_dual(r->fields[0], r->fields[1], atoi(r->fields[2])); + ret_HG = GloQPro->testing___find_HG_in_mysql_query_rules_fast_routing( + r->fields[0], r->fields[1], atoi(r->fields[2]) + ); } + if (dest_HG == ret_HG) { matching_rows++; } } + + results[pos] = matching_rows; + }; + + proxy_info("Test with params - cnt: %d, threads: %d, lock: %d, maps_per_thread: %d\n", cnt, ths, lock, maps_per_thread); + + unsigned long long curtime1 = monotonic_time() / 1000; + std::vector workers {}; + + for (uint32_t i = 0; i < ths; i++) { + khash_t(khStrInt)* hashmap = maps_per_thread ? th_hashmaps[i].rules_fast_routing : nullptr; + workers.push_back(std::thread(perform_searches, hashmap, resultset, i, lock)); } - if (matching_rows != resultset->rows_count) { + + for (std::thread& w : workers) { + w.join(); + } + + matching_rows = results[0]; + if (matching_rows != resultset->rows_count) { ret = false; } *ret1 = matching_rows; + if (ret == true) { if (cnt > 1) { for (int i=1 ; i < cnt; i++) { - for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { - SQLite3_row *r=*it; - int dest_HG = atoi(r->fields[3]); - int ret_HG; - if (dual==0) { - // legacy algorithm - ret_HG = GloQPro->testing___find_HG_in_mysql_query_rules_fast_routing(r->fields[0], r->fields[1], atoi(r->fields[2])); - } else { - ret_HG = GloQPro->testing___find_HG_in_mysql_query_rules_fast_routing_dual(r->fields[0], r->fields[1], atoi(r->fields[2])); - } - assert(dest_HG==ret_HG); + std::vector workers {}; + + for (uint32_t i = 0; i < ths; i++) { + khash_t(khStrInt)* hashmap = maps_per_thread ? th_hashmaps[i].rules_fast_routing : nullptr; + workers.push_back(std::thread(perform_searches, hashmap, resultset, i, lock)); + } + + for (std::thread& w : workers) { + w.join(); } } } } + + unsigned long long curtime2 = monotonic_time() / 1000; + uint32_t total_maps_size = 0; + + for (const fast_routing_hashmap_t& hashmap : th_hashmaps) { + total_maps_size += hashmap.rules_fast_routing___keys_values___size; + total_maps_size += kh_size(hashmap.rules_fast_routing) * ((sizeof(int) + sizeof(char *) + 4)); + + kh_destroy(khStrInt, hashmap.rules_fast_routing); + free(hashmap.rules_fast_routing___keys_values); + } + + proxy_info("Test took %llums\n", curtime2 - curtime1); + proxy_info("Verified rows %d\n", results[0]); + proxy_info("Total maps size %dkb\n", total_maps_size / 1024); + if (resultset) delete resultset; + if (resultset2) delete resultset2; + return ret; } @@ -8773,9 +8868,6 @@ void ProxySQL_Admin::stats___memory_metrics() { } if (GloQPro) { unsigned long long mu = GloQPro->get_rules_mem_used(); - if (GloMTH) { - mu += mu * GloMTH->num_threads; - } vn=(char *)"mysql_query_rules_memory"; sprintf(bu,"%llu",mu); query=(char *)malloc(strlen(a)+strlen(vn)+strlen(bu)+16); @@ -12340,10 +12432,24 @@ char* ProxySQL_Admin::load_mysql_query_rules_to_runtime(SQLite3_result* SQLite3_ } else if (error2) { proxy_error("Error on %s : %s\n", query2, error2); } else { - GloQPro->wrlock(); + fast_routing_hashmap_t fast_routing_hashmap { GloQPro->create_fast_routing_hashmap(resultset2) }; #ifdef BENCHMARK_FASTROUTING_LOAD for (int i=0; i<10; i++) { #endif // BENCHMARK_FASTROUTING_LOAD + // Computed resultsets checksums outside of critical sections + uint64_t hash1 = 0; + uint64_t hash2 = 0; + + if ( + SQLite3_query_rules_resultset == nullptr || + SQLite3_query_rules_fast_routing_resultset == nullptr + ) { + hash1 = resultset->raw_checksum(); + hash2 = resultset2->raw_checksum(); + } + + unsigned long long curtime1 = monotonic_time(); + GloQPro->wrlock(); if (checksum_variables.checksum_mysql_query_rules) { pthread_mutex_lock(&GloVars.checksum_mutex); char* buff = nullptr; @@ -12354,8 +12460,6 @@ char* ProxySQL_Admin::load_mysql_query_rules_to_runtime(SQLite3_result* SQLite3_ SQLite3_query_rules_resultset == nullptr || SQLite3_query_rules_fast_routing_resultset == nullptr ) { - uint64_t hash1 = resultset->raw_checksum(); - uint64_t hash2 = resultset2->raw_checksum(); hash1 += hash2; uint32_t d32[2]; memcpy(&d32, &hash1, sizeof(hash1)); @@ -12393,7 +12497,7 @@ char* ProxySQL_Admin::load_mysql_query_rules_to_runtime(SQLite3_result* SQLite3_ GloVars.checksums_values.mysql_query_rules.checksum, GloVars.checksums_values.mysql_query_rules.epoch ); } - GloQPro->reset_all(false); + rules_mem_sts_t prev_rules_data { GloQPro->reset_all(false) }; QP_rule_t * nqpr; for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { SQLite3_row *r=*it; @@ -12462,13 +12566,30 @@ char* ProxySQL_Admin::load_mysql_query_rules_to_runtime(SQLite3_result* SQLite3_ #else // load the original resultset and resultset2 GloQPro->save_query_rules(resultset); - GloQPro->load_fast_routing(resultset2); + SQLite3_result* prev_fast_routing_resultset = GloQPro->load_fast_routing(fast_routing_hashmap); #endif // BENCHMARK_FASTROUTING_LOAD GloQPro->commit(); #ifdef BENCHMARK_FASTROUTING_LOAD } #endif // BENCHMARK_FASTROUTING_LOAD GloQPro->wrunlock(); + unsigned long long curtime2 = monotonic_time(); + unsigned long long elapsed_ms = (curtime2/1000) - (curtime1/1000); + if (elapsed_ms > 5) { + proxy_info("Query processor locked for %llums\n", curtime2 - curtime1); + } + + // Free previous 'fast_routing' structures outside of critical section + { + delete prev_fast_routing_resultset; + if (prev_rules_data.rules_fast_routing) { + kh_destroy(khStrInt, prev_rules_data.rules_fast_routing); + } + if (prev_rules_data.rules_fast_routing___keys_values) { + free(prev_rules_data.rules_fast_routing___keys_values); + } + __reset_rules(&prev_rules_data.query_rules); + } } // if (resultset) delete resultset; // never delete it. GloQPro saves it // if (resultset2) delete resultset2; // never delete it. GloQPro saves it diff --git a/lib/Query_Processor.cpp b/lib/Query_Processor.cpp index 22cc0367d..b9bedfadf 100644 --- a/lib/Query_Processor.cpp +++ b/lib/Query_Processor.cpp @@ -431,7 +431,7 @@ static void __delete_query_rule(QP_rule_t *qr) { // delete all the query rules in a Query Processor Table // Note that this function is called by GloQPro with &rules (generic table) // and is called by each mysql thread with _thr_SQP_rules (per thread table) -static void __reset_rules(std::vector * qrs) { +void __reset_rules(std::vector * qrs) { proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Resetting rules in Query Processor Table %p\n", qrs); if (qrs==NULL) return; QP_rule_t *qr; @@ -546,7 +546,8 @@ Query_Processor::Query_Processor() { } query_rules_resultset = NULL; fast_routing_resultset = NULL; - rules_fast_routing = kh_init(khStrInt); // create a hashtable + // 'rules_fast_routing' structures created on demand + rules_fast_routing = nullptr; rules_fast_routing___keys_values = NULL; rules_fast_routing___keys_values___size = 0; new_req_conns_count = 0; @@ -555,7 +556,9 @@ Query_Processor::Query_Processor() { Query_Processor::~Query_Processor() { for (int i=0; i; - _thr_SQP_rules_fast_routing = kh_init(khStrInt); // create a hashtable + // per-thread 'rules_fast_routing' structures are created on demand + _thr_SQP_rules_fast_routing = nullptr; _thr___rules_fast_routing___keys_values = NULL; for (int i=0; irules.swap(hashmaps_data.query_rules); + if (rules_fast_routing) { - kh_destroy(khStrInt, rules_fast_routing); - rules_fast_routing = NULL; - rules_fast_routing = kh_init(khStrInt); // create a hashtable + hashmaps_data.rules_fast_routing = rules_fast_routing; + rules_fast_routing = nullptr; } - free(rules_fast_routing___keys_values); - rules_fast_routing___keys_values = NULL; - rules_fast_routing___keys_values___size = 0; + + if (rules_fast_routing___keys_values) { + hashmaps_data.rules_fast_routing___keys_values = rules_fast_routing___keys_values; + rules_fast_routing___keys_values = NULL; + rules_fast_routing___keys_values___size = 0; + } + if (lock) pthread_rwlock_unlock(&rwlock); rules_mem_used=0; + + return hashmaps_data; }; bool Query_Processor::insert(QP_rule_t *qr, bool lock) { @@ -914,6 +928,35 @@ SQLite3_result * Query_Processor::get_current_query_rules_fast_routing() { return result; } +int Query_Processor::search_rules_fast_routing_dest_hg( + khash_t(khStrInt)* _rules_fast_routing, const char* u, const char* s, int flagIN +) { + int dest_hg = -1; + char keybuf[256]; + char * keybuf_ptr = keybuf; + size_t keylen = strlen(u)+strlen(rand_del)+strlen(s)+30; // 30 is a big number + if (keylen > 250) { + keybuf_ptr = (char *)malloc(keylen); + } + sprintf(keybuf_ptr,"%s%s%s---%d", u, rand_del, s, flagIN); + khiter_t k = kh_get(khStrInt, _rules_fast_routing, keybuf_ptr); + if (k == kh_end(_rules_fast_routing)) { + sprintf(keybuf_ptr,"%s%s---%d", rand_del, s, flagIN); + khiter_t k2 = kh_get(khStrInt, _rules_fast_routing, keybuf_ptr); + if (k2 == kh_end(_rules_fast_routing)) { + } else { + dest_hg = kh_val(_rules_fast_routing,k2); + } + } else { + dest_hg = kh_val(_rules_fast_routing,k); + } + if (keylen > 250) { + free(keybuf_ptr); + } + + return dest_hg; +} + struct get_query_digests_parallel_args { unsigned long long ret; pthread_t thr; @@ -1703,23 +1746,39 @@ Query_Processor_Output * Query_Processor::process_mysql_query(MySQL_Session *ses _thr_SQP_rules->push_back(qr2); } } - kh_destroy(khStrInt, _thr_SQP_rules_fast_routing); - _thr_SQP_rules_fast_routing = kh_init(khStrInt); // create a hashtable - if (_thr___rules_fast_routing___keys_values) { - free(_thr___rules_fast_routing___keys_values); - _thr___rules_fast_routing___keys_values = NULL; - } - if (rules_fast_routing___keys_values___size) { - _thr___rules_fast_routing___keys_values = (char *)malloc(rules_fast_routing___keys_values___size); - memcpy(_thr___rules_fast_routing___keys_values, rules_fast_routing___keys_values, rules_fast_routing___keys_values___size); - char *ptr = _thr___rules_fast_routing___keys_values; - while (ptr < _thr___rules_fast_routing___keys_values + rules_fast_routing___keys_values___size) { - char *ptr2 = ptr+strlen(ptr)+1; - int destination_hostgroup = atoi(ptr2); - int ret; - khiter_t k = kh_put(khStrInt, _thr_SQP_rules_fast_routing, ptr, &ret); // add the key - kh_value(_thr_SQP_rules_fast_routing, k) = destination_hostgroup; // set the value of the key - ptr = ptr2+strlen(ptr2)+1; + if (mysql_thread___query_rules_fast_routing_algorithm == 1) { + if (_thr_SQP_rules_fast_routing) { + kh_destroy(khStrInt, _thr_SQP_rules_fast_routing); + _thr_SQP_rules_fast_routing = nullptr; + } + if (_thr___rules_fast_routing___keys_values) { + free(_thr___rules_fast_routing___keys_values); + _thr___rules_fast_routing___keys_values = NULL; + } + if (rules_fast_routing___keys_values___size) { + _thr_SQP_rules_fast_routing = kh_init(khStrInt); // create a hashtable + _thr___rules_fast_routing___keys_values = (char *)malloc(rules_fast_routing___keys_values___size); + memcpy(_thr___rules_fast_routing___keys_values, rules_fast_routing___keys_values, rules_fast_routing___keys_values___size); + rules_mem_used += rules_fast_routing___keys_values___size; // per-thread + char *ptr = _thr___rules_fast_routing___keys_values; + while (ptr < _thr___rules_fast_routing___keys_values + rules_fast_routing___keys_values___size) { + char *ptr2 = ptr+strlen(ptr)+1; + int destination_hostgroup = atoi(ptr2); + int ret; + khiter_t k = kh_put(khStrInt, _thr_SQP_rules_fast_routing, ptr, &ret); // add the key + kh_value(_thr_SQP_rules_fast_routing, k) = destination_hostgroup; // set the value of the key + rules_mem_used += ((sizeof(int) + sizeof(char *) + 4)); // not sure about memory overhead + ptr = ptr2+strlen(ptr2)+1; + } + } + } else { + if (_thr_SQP_rules_fast_routing) { + kh_destroy(khStrInt, _thr_SQP_rules_fast_routing); + _thr_SQP_rules_fast_routing = nullptr; + } + if (_thr___rules_fast_routing___keys_values) { + free(_thr___rules_fast_routing___keys_values); + _thr___rules_fast_routing___keys_values = nullptr; } } //for (std::unordered_map::iterator it = rules_fast_routing.begin(); it != rules_fast_routing.end(); ++it) { @@ -1993,30 +2052,23 @@ __exit_process_mysql_query: if (qr == NULL || qr->apply == false) { // now it is time to check mysql_query_rules_fast_routing // it is only check if "apply" is not true - if (_thr___rules_fast_routing___keys_values) { - char keybuf[256]; - char * keybuf_ptr = keybuf; - const char * u = sess->client_myds->myconn->userinfo->username; - const char * s = sess->client_myds->myconn->userinfo->schemaname; - size_t keylen = strlen(u)+strlen(rand_del)+strlen(s)+30; // 30 is a big number - if (keylen > 250) { - keybuf_ptr = (char *)malloc(keylen); - } - sprintf(keybuf_ptr,"%s%s%s---%d", u, rand_del, s, flagIN); - khiter_t k = kh_get(khStrInt, _thr_SQP_rules_fast_routing, keybuf_ptr); - if (k == kh_end(_thr_SQP_rules_fast_routing)) { - sprintf(keybuf_ptr,"%s%s---%d", rand_del, s, flagIN); - khiter_t k2 = kh_get(khStrInt, _thr_SQP_rules_fast_routing, keybuf_ptr); - if (k2 == kh_end(_thr_SQP_rules_fast_routing)) { - } else { - ret->destination_hostgroup = kh_val(_thr_SQP_rules_fast_routing,k2); - } - } else { - ret->destination_hostgroup = kh_val(_thr_SQP_rules_fast_routing,k); - } - if (keylen > 250) { - free(keybuf_ptr); - } + const char * u = sess->client_myds->myconn->userinfo->username; + const char * s = sess->client_myds->myconn->userinfo->schemaname; + + int dst_hg = -1; + + if (_thr_SQP_rules_fast_routing != nullptr) { + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 7, "Searching thread-local 'rules_fast_routing' hashmap with: user='%s', schema='%s', and flagIN='%d'\n", u, s, flagIN); + dst_hg = search_rules_fast_routing_dest_hg(_thr_SQP_rules_fast_routing, u, s, flagIN); + } else if (rules_fast_routing != nullptr) { + pthread_rwlock_rdlock(&this->rwlock); + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 7, "Searching global 'rules_fast_routing' hashmap with: user='%s', schema='%s', and flagIN='%d'\n", u, s, flagIN); + dst_hg = search_rules_fast_routing_dest_hg(rules_fast_routing, u, s, flagIN); + pthread_rwlock_unlock(&this->rwlock); + } + + if (dst_hg != -1) { + ret->destination_hostgroup = dst_hg; } } // FIXME : there is too much data being copied around @@ -3029,11 +3081,15 @@ void Query_Processor::save_query_rules(SQLite3_result *resultset) { query_rules_resultset = resultset; // save it } -void Query_Processor::load_fast_routing(SQLite3_result *resultset) { - unsigned long long tot_size = 0; +fast_routing_hashmap_t Query_Processor::create_fast_routing_hashmap(SQLite3_result* resultset) { + khash_t(khStrInt)* fast_routing = nullptr; + char* keys_values = nullptr; + unsigned long long keys_values_size = 0; + size_t rand_del_size = strlen(rand_del); int num_rows = resultset->rows_count; if (num_rows) { + unsigned long long tot_size = 0; for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { SQLite3_row *r=*it; size_t row_length = strlen(r->fields[0]) + strlen(r->fields[1]) + strlen(r->fields[2]) + strlen(r->fields[3]); @@ -3042,19 +3098,18 @@ void Query_Processor::load_fast_routing(SQLite3_result *resultset) { row_length += rand_del_size; tot_size += row_length; } - int nt = GloMTH->num_threads; - rules_fast_routing___keys_values = (char *)malloc(tot_size); - rules_fast_routing___keys_values___size = tot_size; - rules_mem_used += rules_fast_routing___keys_values___size; // global - rules_mem_used += rules_fast_routing___keys_values___size * nt; // per-thread - char *ptr = rules_fast_routing___keys_values; + keys_values = (char *)malloc(tot_size); + keys_values_size = tot_size; + char *ptr = keys_values; + fast_routing = kh_init(khStrInt); + for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { SQLite3_row *r=*it; sprintf(ptr,"%s%s%s---%s",r->fields[0],rand_del,r->fields[1],r->fields[2]); int destination_hostgroup = atoi(r->fields[3]); int ret; - khiter_t k = kh_put(khStrInt, rules_fast_routing, ptr, &ret); // add the key - kh_value(rules_fast_routing, k) = destination_hostgroup; // set the value of the key + khiter_t k = kh_put(khStrInt, fast_routing, ptr, &ret); // add the key + kh_value(fast_routing, k) = destination_hostgroup; // set the value of the key int l = strlen((const char *)ptr); ptr += l; ptr++; // NULL 1 @@ -3062,13 +3117,35 @@ void Query_Processor::load_fast_routing(SQLite3_result *resultset) { memcpy(ptr,r->fields[3],l+1); ptr += l; ptr++; // NULL 2 - rules_mem_used += ((sizeof(int) + sizeof(char *) + 4 )); // not sure about memory overhead - rules_mem_used += ((sizeof(int) + sizeof(char *) + 4 ) * nt); // per-thread . not sure about memory overhead } } - delete fast_routing_resultset; - fast_routing_resultset = resultset; // save it - rules_mem_used += fast_routing_resultset->get_size(); + + return { resultset, resultset->get_size(), fast_routing, keys_values, keys_values_size }; +} + +SQLite3_result* Query_Processor::load_fast_routing(const fast_routing_hashmap_t& fast_routing_hashmap) { + khash_t(khStrInt)* _rules_fast_routing = fast_routing_hashmap.rules_fast_routing; + SQLite3_result* _rules_resultset = fast_routing_hashmap.rules_resultset; + + if (_rules_fast_routing && _rules_resultset) { + // Replace map structures, assumed to be previously reset + this->rules_fast_routing___keys_values = fast_routing_hashmap.rules_fast_routing___keys_values; + this->rules_fast_routing___keys_values___size = fast_routing_hashmap.rules_fast_routing___keys_values___size; + this->rules_fast_routing = _rules_fast_routing; + // Update global memory stats + rules_mem_used += rules_fast_routing___keys_values___size; // global + khint_t map_size = kh_size(_rules_fast_routing); + rules_mem_used += map_size * ((sizeof(int) + sizeof(char *) + 4 )); // not sure about memory overhead + } + + // Backup current resultset for later freeing + SQLite3_result* prev_fast_routing_resultset = this->fast_routing_resultset; + // Save new resultset + fast_routing_resultset = _rules_resultset; + // Use resultset pre-computed size + rules_mem_used += fast_routing_hashmap.rules_resultset_size; + + return prev_fast_routing_resultset; }; // this testing function doesn't care if the user exists or not @@ -3100,44 +3177,22 @@ int Query_Processor::testing___find_HG_in_mysql_query_rules_fast_routing(char *u // this testing function implement the dual search: with and without username // if the length of username is 0 , it will search for random username (that shouldn't exist!) -int Query_Processor::testing___find_HG_in_mysql_query_rules_fast_routing_dual(char *username, char *schemaname, int flagIN) { +int Query_Processor::testing___find_HG_in_mysql_query_rules_fast_routing_dual( + khash_t(khStrInt)* _rules_fast_routing, char* username, char* schemaname, int flagIN, bool lock +) { int ret = -1; - const char * random_user = (char *)"my_ReaLLy_Rand_User_123456"; - char * u = NULL; - if (strlen(username)) { - u = username; - } else { - u = (char *)random_user; + khash_t(khStrInt)* rules_fast_routing = _rules_fast_routing ? _rules_fast_routing : this->rules_fast_routing; + + if (lock) { + pthread_rwlock_rdlock(&rwlock); } - pthread_rwlock_rdlock(&rwlock); if (rules_fast_routing) { - char keybuf[256]; - char * keybuf_ptr = keybuf; - size_t keylen = strlen(u)+strlen(rand_del)+strlen(schemaname)+30; // 30 is a big number - if (keylen > 250) { - keybuf_ptr = (char *)malloc(keylen); - } - sprintf(keybuf_ptr,"%s%s%s---%d", username, rand_del, schemaname, flagIN); - khiter_t k = kh_get(khStrInt, rules_fast_routing, keybuf_ptr); - if (k == kh_end(rules_fast_routing)) { - } else { - ret = kh_val(rules_fast_routing,k); - } - if (ret == -1) { // we didn't find it - if (strlen(username)==0) { // we need to search for empty username - sprintf(keybuf_ptr,"%s%s---%d", rand_del, schemaname, flagIN); // no username here - khiter_t k = kh_get(khStrInt, rules_fast_routing, keybuf_ptr); - if (k == kh_end(rules_fast_routing)) { - } else { - ret = kh_val(rules_fast_routing,k); - } - } - } - if (keylen > 250) { - free(keybuf_ptr); - } + ret = search_rules_fast_routing_dest_hg(rules_fast_routing, username, schemaname, flagIN); } - pthread_rwlock_unlock(&rwlock); + if (lock) { + pthread_rwlock_unlock(&rwlock); + } + return ret; }