diff --git a/test/tap/tap/utils.cpp b/test/tap/tap/utils.cpp index c7b124820..b20b28dac 100644 --- a/test/tap/tap/utils.cpp +++ b/test/tap/tap/utils.cpp @@ -187,6 +187,60 @@ my_bool mysql_stmt_close_override(MYSQL_STMT* stmt, const char* file, int line) #endif +long long binom_coeff(int n, int k) { + if (k > n) return 0; + if (k == 0 || k == n) return 1; + + long long res = 1; + + for (int i = 0; i < k; ++i) { + res *= (n - i); + res /= (i + 1); + } + + return res; +} + +long double prob_filled(int N, int M) { + if (N < M) return 0.0; + + double prob_empty = 0.0; + + for (int k = 1; k <= M; ++k) { + double binom = binom_coeff(M, k); + double base = (double(M) - k)/ double(M); + double term = binom * pow(base, double(N)); + + prob_empty += (k % 2 == 1 ? 1 : -1) * term; + } + + return 1.0 - prob_empty; +} + +int find_min_elems(double tg_prob, int M) { + int low = M, high = 20 * M, mid; + + while (low < high) { + mid = (low + high) / 2; + double prob = prob_filled(mid, M); + + if (prob < tg_prob) { + low = mid + 1; + } else { + high = mid; + } + } + + return low; +} + +string to_string(std::thread::id id) { + std::stringstream helper; + helper << id; + + return helper.str(); +} + pair> disable_core_nodes_scheduler(CommandLine& cl, MYSQL* admin) { vector nodes_conns {}; @@ -553,15 +607,15 @@ std::vector extract_mysql_rows(MYSQL_RES* my_res) { return result; }; -pair> mysql_query_ext_rows(MYSQL* mysql, const string& query) { +rc_t> mysql_query_ext_rows(MYSQL* mysql, const string& query) { int rc = mysql_query(mysql, query.c_str()); if (rc != EXIT_SUCCESS) { - return { mysql_errno(mysql), {} }; + return { static_cast(mysql_errno(mysql)), {} }; } MYSQL_RES* myres = mysql_store_result(mysql); if (myres == nullptr) { - return { mysql_errno(mysql), {} }; + return { static_cast(mysql_errno(mysql)), {} }; } const vector rows { extract_mysql_rows(myres) }; @@ -1475,13 +1529,47 @@ int open_file_and_seek_end(const string& f_path, std::fstream& f_stream) { return EXIT_SUCCESS; } -vector get_matching_lines(fstream& f_stream, const string& s_regex, bool get_matches) { +rc_t build_debug_entry(const sq3_row_t& row) { + if (row.size() < 12) { + return { -1, {} }; + } + + const auto str_to_uint64 = [](const std::string& str) -> uint64_t { + return str.empty() ? 0 : std::stoull(str); + }; + + const auto str_to_time_t = [](const string& str) -> time_t { + return str.empty() ? 0 : static_cast(std::stoll(str)); + }; + + return { 0, debug_entry_t { + str_to_uint64(row[0]), + str_to_time_t(row[1]), + str_to_uint64(row[2]), + str_to_uint64(row[3]), + row[4], + str_to_uint64(row[5]), + row[6], + str_to_uint64(row[7]), + row[8], + str_to_uint64(row[9]), + row[10], + row[11] + }}; +} + +pair> get_matching_lines( + fstream& f_stream, const string& s_regex, bool get_matches +) { vector found_matches {}; + size_t insp_lines { 0 }; string next_line {}; fstream::pos_type init_pos { f_stream.tellg() }; while (getline(f_stream, next_line)) { + insp_lines += 1; + re2::RE2 regex { s_regex }; re2::StringPiece match; @@ -1504,7 +1592,7 @@ vector get_matching_lines(fstream& f_stream, const string& s_regex f_stream.seekg(init_pos); } - return found_matches; + return { insp_lines, found_matches }; } const uint32_t USLEEP_SQLITE_LOCKED = 100; @@ -1536,7 +1624,7 @@ sq3_res_t sqlite3_execute_stmt(sqlite3* db, const string& query) { } while (rc==SQLITE_LOCKED || rc==SQLITE_BUSY); if (rc != SQLITE_OK) { - res = {{}, {}, {}, sqlite3_errmsg(db)}; + res = {{}, {}, {}, sqlite3_errcode(db)}; goto cleanup; } @@ -1556,7 +1644,7 @@ sq3_res_t sqlite3_execute_stmt(sqlite3* db, const string& query) { uint32_t affected_rows = sqlite3_changes(db); res = {{}, {}, affected_rows, {}}; } else { - res = {{}, {}, {}, sqlite3_errmsg(db)}; + res = {{}, {}, {}, sqlite3_errcode(db)}; goto cleanup; } } else { @@ -1597,6 +1685,31 @@ cleanup: return res; } +rc_t> sq3_get_debug_entries(sqlite3* db, const string& conds) { + const string query { + "SELECT id, time, lapse, thread, file, line, funct, modnum, modname, verbosity, message, note" + " FROM debug_log" + (conds.empty() ? "" : " WHERE 1=1 AND " + conds) + }; + + const sq3_res_t rows { sqlite3_execute_stmt(db, query) }; + const int sq3_err { std::get(rows) }; + + if (sq3_err) { + return { sq3_err, {} }; + } { + vector entries {}; + + for (const auto & row : std::get(rows)) { + auto [err, entry] = build_debug_entry(row); + assert((void("Received malformed row from 'debug_log' table"), err == 0)); + + entries.push_back(entry); + } + + return { 0, entries }; + } +} + json fetch_internal_session(MYSQL* proxy, bool verbose) { int rc = 0; diff --git a/test/tap/tap/utils.h b/test/tap/tap/utils.h index 29eeb4c73..cb1c16ca6 100644 --- a/test/tap/tap/utils.h +++ b/test/tap/tap/utils.h @@ -2,9 +2,13 @@ #define UTILS_H #include +#include #include +#include #include #include +#include +#include #include "curl/curl.h" #include "mysql.h" @@ -14,6 +18,9 @@ #include "command_line.h" #include "mysql.h" +template +using rc_t = std::pair; + // Improve dependency failure compilation error #ifndef DISABLE_WARNING_COUNT_LOGGING @@ -56,6 +63,38 @@ my_bool mysql_stmt_close_override(MYSQL_STMT* stmt, const char* file, int line); #endif +/** + * @brief Computes the binomial coefficient C(n, k) + */ +long long binom_coeff(int n, int k); + +/** + * @brief Computes the probability for all buckets has at least one elem. + * @param N Number of elements to place. + * @param M Number of buckets to fill. + * @return The computed probability. + */ +long double prob_filled(int N, int M); + +/** + * @brief Computes the inverse of 'prob_filled', finding the min number of elements so the target + * probability is reached. + * @details NOTE: The inverse of 'prob_filled' isn't trivial to compute, due to this, this method + * computes it numerically using a simple binary search. The search isn't optimized or built to work + * with a generic M, yet it reasonably fits its current use cases for small M values. + * @param tg_prob The target probability for which to find the number of elements. + * @param M The number of buckets in which to place the elements. + * @return Number of elements necessary to achieve the target probability. + */ +int find_min_elems(double tg_prob, int M); + +/** + * @brief Returns the string representation of an std::thread + * @param id Thread id to be converted to string. + * @return The string representation of the thread::id. + */ +std::string to_string(std::thread::id id); + /** * @brief Helper function to disable Core nodes scheduler from ProxySQL Cluster nodes. * @details In the CI environment, 'Scheduler' is used to induce extra load via Admin interface on @@ -144,6 +183,10 @@ int create_table_test_sbtest1(int num_rows, MYSQL *mysql); int create_table_test_sqlite_sbtest1(int num_rows, MYSQL *mysql); // as above, but for SQLite3 server int add_more_rows_test_sbtest1(int num_rows, MYSQL *mysql, bool sqlite=false); +/** + * @brief Returns the current monotonic time by 'clock_gettime'. + * @return Current monotonic time in microseconds (us). + */ unsigned long long monotonic_time(); using mysql_res_row = std::vector; @@ -164,7 +207,31 @@ std::vector extract_mysql_rows(MYSQL_RES* my_res); * * Query produces no resulset. * * An error takes place during query execution or resultset retrieval. */ -std::pair> mysql_query_ext_rows(MYSQL* mysql, const std::string& query); +rc_t> mysql_query_ext_rows(MYSQL* mysql, const std::string& query); + +using sq3_col_def_t = std::string; +using sq3_row_t = std::vector; +using sq3_err_t = int; +using sq3_res_t = std::tuple,std::vector,int64_t,sq3_err_t>; + +enum SQ3_RES_T { + SQ3_COLUMNS_DEF, + SQ3_ROWS, + SQ3_AFFECTED_ROWS, + SQ3_ERR +}; + +/** + * @brief Executes the provided query in the supplied sqlite3 db object. + * @param db Already initialized 'sqlite3' handler. + * @param query The query to be executed. + * @return An 'sq3_result_t' object holding the result, depending on the type of query and result, different + * fields will be populated, in case of success: + * - For DQL stmts COLUMN_DEF and ROWS will hold the columns definitions and the rows from the resultset. + * - For DML stmts the AFFECTED_ROWS will show the number of modified rows. + * In case of failure, ERR field will be populated and others will remain empty. + */ +sq3_res_t sqlite3_execute_stmt(sqlite3* db, const std::string& query); /** * @brief Holds the result of an `mysql_query_ext_val` operation. @@ -231,6 +298,32 @@ ext_val_t mysql_query_ext_val(MYSQL* mysql, const std::string& query, const T } } +template +ext_val_t sq3_query_ext_val(sqlite3* db, const std::string& query, const T& def_val) { + const auto& sq3_res { sqlite3_execute_stmt(db, query) }; + const auto& sq3_err { std::get(sq3_res) }; + const auto& sq3_rows { std::get(sq3_res) }; + + if (sq3_err) { + return { sq3_err, def_val }; + } else if (sq3_rows.empty()) { + return { -1, def_val }; + } else { + return ext_single_row_val(sq3_rows.front(), def_val); + } +} + +template +std::string sq3_get_ext_val_err(const ext_val_t& ext_val) { + if (ext_val.err == -1) { + return "Received invalid empty resultset/row"; + } else if (ext_val.err == -2) { + return "Failed to parse response value '" + ext_val.str + "'"; + } else { + return std::string { sqlite3_errstr(ext_val.err) }; + } +} + /** * @brief Extract the error from a `ext_val_t`. * @param mysql Already oppened MYSQL connection. @@ -630,10 +723,28 @@ enum LINE_MATCH_T { POS, LINE, MATCH }; * For example, regex '\d+' should become '(\d+)'. * @return All the lines found matching the regex. */ -std::vector get_matching_lines( +std::pair> get_matching_lines( std::fstream& f_stream, const std::string& regex, bool get_matches=false ); +/** + * @brief Row entries from 'debug_log' table, from debug database. + */ +struct debug_entry_t { + uint64_t id; + time_t time; + uint64_t lapse; + uint64_t thread; + std::string file; + uint64_t line; + std::string funct; + uint64_t modnum; + std::string modname; + uint64_t verbosity; + std::string message; + std::string note; +}; + /** * @brief Opens a sqlite3 db file located in the supplied path with the provided flags. * @param f_path Path to the 'db' file. @@ -642,30 +753,20 @@ std::vector get_matching_lines( * @return EXIT_SUCCESS in case of success, EXIT_FAILURE otherwise. Error cause is logged. */ int open_sqlite3_db(const std::string& f_path, sqlite3** db, int flags); - -using sq3_col_def_t = std::string; -using sq3_row_t = std::vector; -using sq3_err_t = std::string; -using sq3_res_t = std::tuple,std::vector,int64_t,sq3_err_t>; - -enum SQ3_RES_T { - SQ3_COLUMNS_DEF, - SQ3_ROWS, - SQ3_AFFECTED_ROWS, - SQ3_ERR -}; - /** - * @brief Executes the provided query in the supplied sqlite3 db object. - * @param db Already initialized 'sqlite3' handler. - * @param query The query to be executed. - * @return An 'sq3_result_t' object holding the result, depending on the type of query and result, different - * fields will be populated, in case of success: - * - For DQL stmts COLUMN_DEF and ROWS will hold the columns definitions and the rows from the resultset. - * - For DML stmts the AFFECTED_ROWS will show the number of modified rows. - * In case of failure, ERR field will be populated and others will remain empty. + * @brief Builds a debug_entry_t from an sq3_row_t from 'debug_log' table. + * @param row The row to map to a 'debug_entry_t' struct. + * @return A pair of kind `{err_code, debug_entry_t}`. */ -sq3_res_t sqlite3_execute_stmt(sqlite3* db, const std::string& query); +rc_t build_debug_entry(const sq3_row_t& row); +/** + * @brief Retrieves a list of debug entries matching the supplied conditions. + * @param db An already opened connection to a 'SQLite3' database. + * @param conds Conditions for the WHERE clause for row filtering, no filtering if empty. E.g: 'id > + * $timestamp AND file="foo.cpp"'. + * @return A pair of kind `{err_code, matched_rows}`. + */ +rc_t> sq3_get_debug_entries(sqlite3* db, const std::string& conds); /** * @brief If found returns the element index, -1 otherwise. @@ -741,6 +842,36 @@ using pool_state_t = std::map; * @return A pair of the shape {err_code, pool_state_t}. */ std::pair fetch_conn_stats(MYSQL* admin, const std::vector hgs); +/** + * @brief Waits for a generic condition. + * @details Wait finishes by a non-zero return code by the condition or by timeout. + * @param cond Condition to be evaluated at each wait interval. + * @param to_us Timeout at which to stop the wait, in microseconds. + * @param delay_us Delay between wait intervals, in microseconds. + * @return 0 for success, non-zero return code from 'cond' for failure, or INT_MIN for timeout. + */ +template +rc_t wait_for_cond(const std::function()>& cond, uint32_t to_us, uint32_t delay_us) { + int rc { 0 }; + T res {}; + uint32_t waited { 0 }; + + while (waited < to_us) { + std::tie(rc, res) = cond(); + + if (rc == 0) { + usleep(delay_us); + waited += delay_us; + } else if (rc < 0) { + break; + } else { + return { 0, res }; + } + } + + return { INT_MIN, res }; +} + /** * @brief Waits until the condition specified by the 'query' holds, or 'timeout' is reached. * @details Several details about the function impl: diff --git a/test/tap/tests/mysql_hostgroup_attributes-servers_defaults-t.cpp b/test/tap/tests/mysql_hostgroup_attributes-servers_defaults-t.cpp index 51d5300f9..6b4245f40 100644 --- a/test/tap/tests/mysql_hostgroup_attributes-servers_defaults-t.cpp +++ b/test/tap/tests/mysql_hostgroup_attributes-servers_defaults-t.cpp @@ -86,7 +86,7 @@ void check_matching_logline(fstream& f_log, string regex) { // Minimal wait for the error log to be written usleep(500 * 1000); - std::vector matching_lines { get_matching_lines(f_log, regex) }; + const auto& [__1, matching_lines ] { get_matching_lines(f_log, regex) }; for (const line_match_t& line_match : matching_lines) { diag( "Found matching logline - pos: %ld, line: `%s`", diff --git a/test/tap/tests/pgsql-copy_from_test-t.cpp b/test/tap/tests/pgsql-copy_from_test-t.cpp index 0c494aecb..65db67eef 100644 --- a/test/tap/tests/pgsql-copy_from_test-t.cpp +++ b/test/tap/tests/pgsql-copy_from_test-t.cpp @@ -281,7 +281,7 @@ int is_string_in_result(PGresult* result, const char* target_str) { } bool check_logs_for_command(std::fstream& f_proxysql_log, const std::string& command_regex) { - std::vector cmd_lines{ get_matching_lines(f_proxysql_log, command_regex) }; + const auto& [_, cmd_lines] { get_matching_lines(f_proxysql_log, command_regex) }; return cmd_lines.empty() ? false : true; } diff --git a/test/tap/tests/test_cluster_sync-t.cpp b/test/tap/tests/test_cluster_sync-t.cpp index 7d1e71501..168800d15 100644 --- a/test/tap/tests/test_cluster_sync-t.cpp +++ b/test/tap/tests/test_cluster_sync-t.cpp @@ -772,7 +772,7 @@ int check_module_checksums_sync( const string diff_check_regex { "Cluster: detected a peer .* with " + module + " version \\d+, epoch \\d+, diff_check \\d+." }; - vector new_matching_lines { get_matching_lines(logfile_fs, diff_check_regex) }; + const auto& [_, new_matching_lines] { get_matching_lines(logfile_fs, diff_check_regex) }; diag("regex used in `%s` to find loglines: `%s`", basename(logfile_path.c_str()), diff_check_regex.c_str()); for (const line_match_t& line_match : new_matching_lines) { @@ -801,7 +801,7 @@ int check_module_checksums_sync( " Not syncing due to '" + module_sync.sync_variable + "=0'" }; - vector new_matching_lines { get_matching_lines(logfile_fs, no_syncing_regex) }; + const auto& [_, new_matching_lines] { get_matching_lines(logfile_fs, no_syncing_regex) }; diag("regex used in `%s` to find loglines: `%s`", basename(logfile_path.c_str()), no_syncing_regex.c_str()); for (const line_match_t& line_match : new_matching_lines) { diff --git a/test/tap/tests/test_query_rules_fast_routing_algorithm-t.cpp b/test/tap/tests/test_query_rules_fast_routing_algorithm-t.cpp index ce0faa604..1dd13e5fb 100644 --- a/test/tap/tests/test_query_rules_fast_routing_algorithm-t.cpp +++ b/test/tap/tests/test_query_rules_fast_routing_algorithm-t.cpp @@ -12,9 +12,11 @@ #include #include #include +#include +#include +#include #include "mysql.h" -#include "mysqld_error.h" #include "tap.h" #include "utils.h" @@ -22,32 +24,40 @@ #include "json.hpp" +using nlohmann::json; + +using std::map; using std::pair; using std::string; -using nlohmann::json; using std::fstream; +using std::function; using std::vector; // Used for 'extract_module_host_port' #include "modules_server_test.h" -void parse_result_json_column(MYSQL_RES *result, json& j) { - if(!result) return; - MYSQL_ROW row; +//////////////////////////////////////////////////////////////////////////////// +// Borrowed from test_match_eof_conn_cap.cpp - TODO: MERGE +//////////////////////////////////////////////////////////////////////////////// - while ((row = mysql_fetch_row(result))) { - j = json::parse(row[0]); - } -} +#include -int extract_internal_session(MYSQL* proxy, nlohmann::json& j_internal_session) { - MYSQL_QUERY_T(proxy, "PROXYSQL INTERNAL SESSION"); - MYSQL_RES* myres = mysql_store_result(proxy); - parse_result_json_column(myres, j_internal_session); - mysql_free_result(myres); +#define _S(s) ( std::string {s} ) +#define _TO_S(s) ( std::to_string(s) ) - return EXIT_SUCCESS; -} +#define SELECT_RUNTIME_VAR "SELECT variable_value FROM runtime_global_variables WHERE variable_name=" + +#define CHECK_EXT_VAL(val)\ + do {\ + if (val.err) {\ + diag("%s:%d: Query failed err=\"%s\"", __func__, __LINE__, val.str.c_str());\ + return EXIT_FAILURE;\ + }\ + } while(0) + +const uint32_t USLEEP_SQLITE_LOCKED = 100; + +//////////////////////////////////////////////////////////////////////////////// int get_query_int_res(MYSQL* admin, const string& q, int& val) { MYSQL_QUERY_T(admin, q.c_str()); @@ -76,15 +86,17 @@ int get_query_int_res(MYSQL* admin, const string& q, int& val) { int extract_sess_qpo_dest_hg(MYSQL* proxy) { json j_internal_session {}; - int j_err = extract_internal_session(proxy, j_internal_session); - if (j_err) { - diag("Failed to extract and parse result from 'PROXYSQL INTERNAL SESSION'"); + try { + j_internal_session = fetch_internal_session(proxy); + } catch (const std::exception& e) { + diag("Failed to fetch 'PROXYSQL INTERNAL SESSION' exception=\"%s\"", e.what()); return -2; } int dest_hg = -2; try { dest_hg = j_internal_session["qpo"]["destination_hostgroup"]; + diag("Session information thread=%s", j_internal_session["thread"].dump().c_str()); } catch (const std::exception& e) { diag("Processing of 'PROXYSQL INTERNAL SESSION' failed with exc: %s", e.what()); return -2; @@ -142,7 +154,11 @@ string get_last_debug_log_id(sqlite3* sq3_db) { } int create_mysql_servers_range( - const CommandLine& cl, MYSQL* admin, const pair& host_port, uint32_t rng_init, uint32_t rng_end + const CommandLine& cl, + MYSQL* admin, + const pair& host_port, + uint32_t rng_init, + uint32_t rng_end ) { const string init { std::to_string(rng_init) }; const string end { std::to_string(rng_end) }; @@ -161,7 +177,7 @@ int create_mysql_servers_range( }; int create_fast_routing_rules_range( - const CommandLine& cl, MYSQL* admin, const pair& host_port, uint32_t rng_init, uint32_t rng_end + const CommandLine& cl, MYSQL* admin, uint32_t rng_init, uint32_t rng_end ) { const string init { std::to_string(rng_init) }; const string end { std::to_string(rng_end) }; @@ -183,49 +199,233 @@ int create_fast_routing_rules_range( return EXIT_SUCCESS; }; -int sq3_get_matching_msg_entries(sqlite3* db, const string& query_regex, const string& id) { - string init_db_query { - "SELECT COUNT() FROM debug_log WHERE message LIKE '%" + query_regex + "%' AND id > " + id +const char q_query_rules_mem_stats[] { + "SELECT variable_value FROM stats_memory_metrics WHERE variable_name='mysql_query_rules_memory'" +}; +const char SELECT_LAST_DEBUG_ID[] { "SELECT id FROM debug_log ORDER BY id DESC limit 1" }; + +int check_fast_routing_rules( + const CommandLine& cl, + MYSQL* proxy, + MYSQL* admin, + uint32_t rng_init, + uint32_t rng_end, + const string& algo, + fstream& errlog, + sqlite3* sq3_db +) { + // Seek end of file for error log + errlog.seekg(0, std::ios::end); + diag("Seek end of error log file pos=%lu", size_t(errlog.tellg())); + + diag("Flush debug logs to ensure getting the latest id"); + MYSQL_QUERY_T(admin, "PROXYSQL FLUSH LOGS"); + + diag("Getting last 'debug_log' entry id"); + ext_val_t last_id { sq3_query_ext_val(sq3_db, SELECT_LAST_DEBUG_ID, uint32_t(0)) }; + CHECK_EXT_VAL(last_id); + diag("Fetched last 'debug_log' entry id id=%d", last_id.val); + + // Check that fast_routing rules are properly working for the defined range + int rc = check_fast_routing_rules(proxy, rng_init, rng_end); + if (rc) { return EXIT_FAILURE; } + + const string init_search_regex { "Searching " + algo + " 'rules_fast_routing' hashmap" }; + auto [insp_lines, matched_lines] { get_matching_lines(errlog, init_search_regex) }; + diag( + "Inspected error log for matching lines pos=%lu insp_lines=%ld match_lines=%ld regex=\"%s\"", + size_t(errlog.tellg()), insp_lines, matched_lines.size(), init_search_regex.c_str() + ); + + ok( + matched_lines.size() == rng_end - rng_init, + "Number of '%s' entries in error log should match issued queries - Exp: %d, Act: %ld", + algo.c_str(), rng_end - rng_init, matched_lines.size() + ); + + const function()> check_sq3_entries { + [&] () -> pair { + const string dbg_msg { + "Searching " + algo + " ''rules_fast_routing'' hashmap % schema=''test" + }; + const string select_count { + "SELECT COUNT() FROM debug_log WHERE" + " id > " + _TO_S(last_id.val) + " AND message LIKE '%" + dbg_msg + "%'" + }; + ext_val_t entries { sq3_query_ext_val(sq3_db, select_count, int64_t(-1))}; + if (entries.err) { + const string err { get_ext_val_err(admin, entries) }; + diag("%s:%d: Query failed err=\"%s\"", __func__, __LINE__, err.c_str()); + return { -1, 0 }; + } + + diag( + "Checking matched entries entries=%ld last_debug_log_id=%d query=\"%s\"", + entries.val, last_id.val, select_count.c_str() + ); + + if (entries.val > 0) { + return { entries.val == (rng_end - rng_init), entries.val }; + } else { + return { entries.val, 0 }; + } + } }; - sq3_res_t sq3_entries_res { sqlite3_execute_stmt(db, init_db_query) }; - const string& sq3_err { std::get(sq3_entries_res) }; - if (!sq3_err.empty()) { - diag("Query failed to be executed in SQLite3 - query: `%s`, err: `%s`", init_db_query.c_str(), sq3_err.c_str()); - return EXIT_FAILURE; + + diag("Flush debug logs before fetching expected entries"); + MYSQL_QUERY_T(admin, "PROXYSQL FLUSH LOGS"); + + auto [wait_res, matching_rows] = wait_for_cond(check_sq3_entries, 1000*1000, 250*1000); + if (wait_res != 0) { exit(1); } + + ok( + wait_res == 0, + "Number of '%s' entries in SQLite3 'debug_log' should match issued queries - Exp: %d, Act: %d", + algo.c_str(), rng_end - rng_init, matching_rows + ); + + return EXIT_SUCCESS; +} + +const string PROXYSQL_QLOG_DIR { get_env_str("REGULAR_INFRA_DATADIR", "/tmp/datadir") }; + +int threads_warmup(const CommandLine& cl, MYSQL* admin, sqlite3* sq3_db) { + ext_val_t mysql_threads { + mysql_query_ext_val(admin, + "SELECT variable_value FROM global_variables WHERE variable_name='mysql-threads'", 0 + ) + }; + CHECK_EXT_VAL(mysql_threads); + + const ext_val_t qlog_fname { + mysql_query_ext_val(admin, SELECT_RUNTIME_VAR"'mysql-eventslog_filename'", _S("query.log")) + }; + CHECK_EXT_VAL(qlog_fname); + const string PROXYSQL_AUDIT_LOG { PROXYSQL_QLOG_DIR + "/" + qlog_fname.str }; + + diag("Flush debug logs to ensure getting the latest id"); + MYSQL_QUERY_T(admin, "PROXYSQL FLUSH LOGS"); + + diag("Getting last 'debug_log' entry id"); + ext_val_t last_id { sq3_query_ext_val(sq3_db, SELECT_LAST_DEBUG_ID, uint32_t(0)) }; + CHECK_EXT_VAL(last_id); + diag("Fetched last 'debug_log' entry id id=%d", last_id.val); + + int conns { find_min_elems(1.0 - pow(10, -6), mysql_threads.val) /* * 2 */ }; + double th_conns { ceil(double(conns) / mysql_threads.val) }; + + diag( + "Performing warm-up queries queries=%d th_queries=%lf workers=%d", + conns, th_conns, mysql_threads.val + ); + vector workers {}; + + pthread_barrier_t bar; + pthread_barrier_init(&bar, NULL, mysql_threads.val); + + for (int i = 0; i < mysql_threads.val; i++) { + workers.emplace_back(std::thread([&cl, th_conns, i, &bar] () { + pthread_barrier_wait(&bar); + + const string th_id { to_string(std::this_thread::get_id()) }; + auto curtime = monotonic_time(); + diag("Worker starting warm-up conn time=%lld thread_id=%s", curtime, th_id.c_str()); + + for (int j = 0; j < th_conns; j++) { + MYSQL* proxy = mysql_init(NULL); + + if (!mysql_real_connect(proxy, cl.host, cl.username, cl.password, NULL, cl.port, NULL, 0)) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxy)); + return EXIT_FAILURE; + } + + const string query { rand() % 2 ? "SELECT 1" : "SELECT 2" }; + diag("Issuing query from worker thread_id=%s query=\"%s\"", th_id.c_str(), query.c_str()); + MYSQL_QUERY_T(proxy, query.c_str()); + mysql_free_result(mysql_store_result(proxy)); + + mysql_close(proxy); + } + + return 0; + })); } - const vector& sq3_rows { std::get(sq3_entries_res) }; - int32_t matching_rows = std::atoi(sq3_rows[0][0].c_str()); + for (std::thread& w : workers) { + w.join(); + } - return matching_rows; -}; + pthread_barrier_destroy(&bar); + + diag("Flush debug logs before fetching thread dist entries"); + MYSQL_QUERY_T(admin, "PROXYSQL FLUSH LOGS"); + + const auto [_, matched_entries] { + sq3_get_debug_entries(sq3_db, + "id > " + _TO_S(last_id.val) + " AND file='MySQL_Session.cpp'" + " AND message LIKE '%Processing received query%'" + ) + }; + + std::map threads_ids {}; + + for (const debug_entry_t& dbg_entry : matched_entries) { + threads_ids[dbg_entry.thread] += 1; + } + + diag("Thread query distribution:\n%s", nlohmann::json(threads_ids).dump(4).c_str()); + + ok( + threads_ids.size() == mysql_threads.val + 1, + "Each thread processed at least one warm-up query threads_ids=%ld mysql_threads=%d", + threads_ids.size(), mysql_threads.val + 1 + ); + + vector> sq3_ths {}; + std::copy_if(threads_ids.begin(), threads_ids.end(), std::back_inserter(sq3_ths), + [] (const pair& e) -> bool { return e.second == 1; } + ); + + ok( + sq3_ths.size() == 1, + "Only one SQLite3 thread spawned to process 'FLUSH LOGS' sq3_ths=[%s]", + nlohmann::json(sq3_ths).dump(-1).c_str() + ); + + return EXIT_FAILURE; +} int test_fast_routing_algorithm( - const CommandLine& cl, MYSQL* admin, MYSQL* proxy, const pair& host_port, fstream& errlog, + const CommandLine& cl, + MYSQL* admin, + MYSQL* proxy, + const pair& host_port, + fstream& errlog, int init_algo, int new_algo ) { uint32_t rng_init = 1000; uint32_t rng_end = 1020; - const char query_rules_mem_stats_query[] { - "SELECT variable_value FROM stats_memory_metrics WHERE variable_name='mysql_query_rules_memory'" - }; // Enable Admin debug, set debug_output to log and DB, and increase verbosity for Query_Processor MYSQL_QUERY_T(admin, "SET admin-debug=1"); MYSQL_QUERY_T(admin, "SET admin-debug_output=3"); MYSQL_QUERY_T(admin, "LOAD ADMIN VARIABLES TO RUNTIME"); MYSQL_QUERY_T(admin, "UPDATE debug_levels SET verbosity=7 WHERE module='debug_mysql_query_processor'"); - // If there is a generic debug filter (line==0) on Query_Processor.cpp , process_mysql_query() , disable it. - // If the filter was present it will be automatically recreated by the tester. + + // Remove **generic debug filters** (line==0) relevant for the test. MYSQL_QUERY_T(admin, "DELETE FROM debug_filters WHERE filename='Query_Processor.cpp' AND line=0 AND funct='process_mysql_query'"); + MYSQL_QUERY_T(admin, "DELETE FROM debug_filters WHERE filename='MySQL_Session.cpp' AND line=0 AND funct='get_pkts_from_client'"); + // If the filter was present it will be automatically recreated by the tester. MYSQL_QUERY_T(admin, "LOAD DEBUG TO RUNTIME"); // Open the SQLite3 db for debugging const string db_path { get_env("REGULAR_INFRA_DATADIR") + "/proxysql_debug.db" }; sqlite3* sq3_db = nullptr; - int odb_err = open_sqlite3_db(db_path, &sq3_db, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE); + int odb_err = open_sqlite3_db(db_path, &sq3_db, SQLITE_OPEN_READONLY); if (odb_err) { return EXIT_FAILURE; } + char* prg_err { nullptr }; + int c_err = create_mysql_servers_range(cl, admin, host_port, rng_init, rng_end); if (c_err) { return EXIT_FAILURE; } MYSQL_QUERY_T(admin, "LOAD MYSQL SERVERS TO RUNTIME"); @@ -239,162 +439,96 @@ int test_fast_routing_algorithm( MYSQL_QUERY_T(admin, "DELETE FROM mysql_query_rules_fast_routing"); MYSQL_QUERY_T(admin, "LOAD MYSQL QUERY RULES TO RUNTIME"); - int init_rules_mem_stats = -1; - int get_mem_stats_err = get_query_int_res(admin, query_rules_mem_stats_query, init_rules_mem_stats); - if (get_mem_stats_err) { return EXIT_FAILURE; } - diag("Initial 'mysql_query_rules_memory' of '%d'", init_rules_mem_stats); + ext_val_t init_mem_stats { mysql_query_ext_val(admin, q_query_rules_mem_stats, -1) }; + CHECK_EXT_VAL(init_mem_stats); + diag("Initial 'mysql_query_rules_memory' of '%d'", init_mem_stats.val); - // Check that fast_routing rules are being properly triggered - c_err = create_fast_routing_rules_range(cl, admin, host_port, rng_init, rng_end); + c_err = create_fast_routing_rules_range(cl, admin, rng_init, rng_end); if (c_err) { return EXIT_FAILURE; } MYSQL_QUERY_T(admin, "LOAD MYSQL QUERY RULES TO RUNTIME"); - usleep(1000*1000); - - // Seek end of file for error log - errlog.seekg(0, std::ios::end); - // Get current last id from debug db - string last_debug_log_id { get_last_debug_log_id(sq3_db) }; - if (last_debug_log_id.empty()) { return EXIT_FAILURE; } - - // Check that fast_routing rules are properly working for the defined range - check_fast_routing_rules(proxy, rng_init, rng_end); - - // Give some time for the error log and SQLite3 to be written - usleep(100*1000); - - const string init_algo_scope { init_algo == 1 ? "thread-local" : "global" }; - const string init_search_regex { "Searching " + init_algo_scope + " 'rules_fast_routing' hashmap" }; - vector matched_lines { get_matching_lines(errlog, init_search_regex) }; + diag("Warm-up threads to ensure the QueryProcessor maps are build"); + threads_warmup(cl, admin, sq3_db); - ok( - matched_lines.size() == rng_end - rng_init, - "Number of '%s' searchs in error log should match issued queries - Exp: %d, Act: %ld", - init_algo_scope.c_str(), rng_end - rng_init, matched_lines.size() - ); - - const string sq3_query_regex { "Searching " + init_algo_scope + " ''rules_fast_routing'' hashmap" }; - int matching_rows = sq3_get_matching_msg_entries(sq3_db, sq3_query_regex, last_debug_log_id); - - ok( - matching_rows == rng_end - rng_init, - "Number of '%s' entries in SQLite3 'debug_log' should match issued queries - Exp: %d, Act: %ld", - init_algo_scope.c_str(), rng_end - rng_init, matched_lines.size() - ); + // Check that fast_routing rules are being properly triggered + const string init_algo_str { init_algo == 1 ? "thread-local" : "global" }; + int chk_res = check_fast_routing_rules(cl, proxy, admin, rng_init, rng_end, init_algo_str, errlog, sq3_db); + if (chk_res) { return EXIT_FAILURE; } printf("\n"); - int old_mem_stats = -1; - get_mem_stats_err = get_query_int_res(admin, query_rules_mem_stats_query, old_mem_stats); - if (get_mem_stats_err) { return EXIT_FAILURE; } + ext_val_t old_mem_stats { mysql_query_ext_val(admin, q_query_rules_mem_stats, -1) }; + CHECK_EXT_VAL(old_mem_stats); - // Changing the algorithm shouldn't have any effect + diag("*ONLY* Changing the algorithm shouldn't have any effect"); diag("Testing 'query_rules_fast_routing_algorithm=%d'", new_algo); MYSQL_QUERY_T(admin, ("SET mysql-query_rules_fast_routing_algorithm=" + std::to_string(new_algo)).c_str()); MYSQL_QUERY_T(admin, "LOAD MYSQL VARIABLES TO RUNTIME"); - usleep(1000*1000); - - // Seek end of file for error log - errlog.seekg(0, std::ios::end); - // Get current last id from debug db - last_debug_log_id = get_last_debug_log_id(sq3_db); - if (last_debug_log_id.empty()) { return EXIT_FAILURE; } - - diag("Search should still be performed 'per-thread'. Only variable has changed."); - check_fast_routing_rules(proxy, rng_init, rng_end); - - // Give some time for the error log to be written - usleep(100*1000); - - matched_lines = get_matching_lines(errlog, init_search_regex); - - ok( - matching_rows == rng_end - rng_init, - "Number of '%s' entries in SQLite3 'debug_log' should match issued queries - Exp: %d, Act: %ld", - init_algo_scope.c_str(), rng_end - rng_init, matched_lines.size() - ); - - matching_rows = sq3_get_matching_msg_entries(sq3_db, sq3_query_regex, last_debug_log_id); - - ok( - matched_lines.size() == rng_end - rng_init, - "Number of 'thread-local' searchs in error log should match issued queries - Exp: %d, Act: %ld", - rng_end - rng_init, matched_lines.size() - ); + // Check that fast_routing rules are being properly triggered + chk_res = check_fast_routing_rules(cl, proxy, admin, rng_init, rng_end, init_algo_str, errlog, sq3_db); + if (chk_res) { return EXIT_FAILURE; } + printf("\n"); - int new_mem_stats = -1; - get_mem_stats_err = get_query_int_res(admin, query_rules_mem_stats_query, new_mem_stats); - if (get_mem_stats_err) { return EXIT_FAILURE; } + ext_val_t new_mem_stats { mysql_query_ext_val(admin, q_query_rules_mem_stats, -1) }; + CHECK_EXT_VAL(new_mem_stats); diag("Memory SHOULDN'T have changed just because of a variable change"); ok( - old_mem_stats - init_rules_mem_stats == new_mem_stats - init_rules_mem_stats, + old_mem_stats.val - init_mem_stats.val == new_mem_stats.val - init_mem_stats.val, "Memory stats shouldn't increase just by the variable change - old: %d, new: %d", - old_mem_stats - init_rules_mem_stats, new_mem_stats - init_rules_mem_stats + old_mem_stats.val - init_mem_stats.val, new_mem_stats.val - init_mem_stats.val ); - printf("\n"); MYSQL_QUERY_T(admin, "LOAD MYSQL QUERY RULES TO RUNTIME"); - diag("Search should now be using the per thread-maps"); - - // Seek end of file for error log - errlog.seekg(0, std::ios::end); - check_fast_routing_rules(proxy, rng_init, rng_end); - // Give some time for the error log to be written - usleep(100*1000); + diag("Warm-up threads to ensure the QueryProcessor maps are build"); + threads_warmup(cl, admin, sq3_db); - const string new_algo_scope { new_algo == 1 ? "thread-local" : "global" }; - const string new_search_regex { "Searching " + new_algo_scope + " 'rules_fast_routing' hashmap" }; - vector new_matched_lines { get_matching_lines(errlog, new_search_regex) }; + const string new_algo_str { new_algo == 1 ? "thread-local" : "global" }; + diag("Search should now be using the '%s' maps", new_algo_str.c_str()); + chk_res = check_fast_routing_rules(cl, proxy, admin, rng_init, rng_end, new_algo_str, errlog, sq3_db); + if (chk_res) { return EXIT_FAILURE; } - ok( - new_matched_lines.size() == rng_end - rng_init, - "Number of '%s' searchs in error log should match issued queries - Exp: %d, Act: %ld", - new_algo_scope.c_str(), rng_end - rng_init, new_matched_lines.size() - ); - - const string new_sq3_query_regex { "Searching " + new_algo_scope + " ''rules_fast_routing'' hashmap" }; - int new_matching_rows = sq3_get_matching_msg_entries(sq3_db, sq3_query_regex, last_debug_log_id); - - ok( - new_matching_rows == rng_end - rng_init, - "Number of '%s' entries in SQLite3 'debug_log' should match issued queries - Exp: %d, Act: %d", - new_algo_scope.c_str(), rng_end - rng_init, new_matching_rows - ); - - get_mem_stats_err = get_query_int_res(admin, query_rules_mem_stats_query, new_mem_stats); - if (get_mem_stats_err) { return EXIT_FAILURE; } + new_mem_stats = mysql_query_ext_val(admin, q_query_rules_mem_stats, -1); + CHECK_EXT_VAL(new_mem_stats); bool mem_check_res = false; string exp_change { "" }; + const auto old_new_diff { old_mem_stats.val - init_mem_stats.val }; + const auto new_init_diff { new_mem_stats.val - init_mem_stats.val }; + if (init_algo == 1 && new_algo == 2) { - mem_check_res = (old_mem_stats - init_rules_mem_stats) > (new_mem_stats - init_rules_mem_stats); + mem_check_res = old_new_diff > new_init_diff; exp_change = "decrease"; } else if (init_algo == 2 && new_algo == 1) { - mem_check_res = (old_mem_stats - init_rules_mem_stats) < (new_mem_stats - init_rules_mem_stats); + mem_check_res = old_new_diff < new_init_diff; exp_change = "increase"; } else { - mem_check_res = (old_mem_stats - init_rules_mem_stats) == (new_mem_stats - init_rules_mem_stats); + mem_check_res = old_new_diff == new_init_diff; exp_change = "not change"; } ok( mem_check_res, "Memory stats should %s after 'LOAD MYSQL QUERY RULES TO RUNTIME' - old: %d, new: %d", - exp_change.c_str(), (old_mem_stats - init_rules_mem_stats), (new_mem_stats - init_rules_mem_stats) + exp_change.c_str(), old_new_diff, new_init_diff ); + sqlite3_close_v2(sq3_db); + return EXIT_SUCCESS; }; int main(int argc, char** argv) { - // `5` logic checks + 20*3 checks per query rule, per test - plan((8 + 20*3) * 2); + // `12` logic checks + 20*3 checks per query rule, per test + plan((2*3 + 2 + 2*2 + 20*3) * 2); CommandLine cl; + // Seed the random-number gen + std::srand(std::time(NULL)); + if (cl.getEnv()) { diag("Failed to get the required environmental variables."); return EXIT_FAILURE; diff --git a/test/tap/tests/test_sqlite3_server-t.cpp b/test/tap/tests/test_sqlite3_server-t.cpp index a9e2b06f9..b2cbab02e 100644 --- a/test/tap/tests/test_sqlite3_server-t.cpp +++ b/test/tap/tests/test_sqlite3_server-t.cpp @@ -237,13 +237,13 @@ std::vector sqlite_intf_queries { int check_errorlog_for_addrinuse(MYSQL* admin, fstream& logfile) { const string command_regex { ".*\\[INFO\\] Received LOAD SQLITESERVER VARIABLES (FROM DISK|TO RUNTIME) command" }; - std::vector cmd_lines { get_matching_lines(logfile, command_regex) }; + const auto& [__1, cmd_lines] { get_matching_lines(logfile, command_regex) }; // NOTE: Delay for poll_timeout for SQLite3_Server - harcoded 500ms usleep(1000 * 1000); const string bind_err_regex { ".*\\[ERROR\\] bind\\(\\): Address already in use" }; - std::vector err_lines { get_matching_lines(logfile, bind_err_regex) }; + const auto& [__2, err_lines ] { get_matching_lines(logfile, bind_err_regex) }; if (cmd_lines.empty()) { diag("ERROR: Commands 'LOAD SQLITESERVER' not logged as expected");