Several fixes for 'test_query_rules_fast_routing_algorithm-t.cpp'

- Fixed issue with SQLite3 helper functions. Invalid use of
  'sqlite3_step' lead to fixing issue in ProxySQL itself.
- Fixed logic issues on tests checking duplicated conditions.
- Improved waiting logic checking for new entries in debug database.
- Added new utility functions for SQLite3 databases access (debug
  database) following MySQL utilities conventions.
- Simplified interface in several utilities functions with new, and more
  explicit naming convention.
pull/4833/head
Javier Jaramago Fernández 1 year ago
parent 611ea853ed
commit a247d13d76

@ -187,6 +187,60 @@ my_bool mysql_stmt_close_override(MYSQL_STMT* stmt, const char* file, int line)
#endif
long long binom_coeff(int n, int k) {
if (k > n) return 0;
if (k == 0 || k == n) return 1;
long long res = 1;
for (int i = 0; i < k; ++i) {
res *= (n - i);
res /= (i + 1);
}
return res;
}
long double prob_filled(int N, int M) {
if (N < M) return 0.0;
double prob_empty = 0.0;
for (int k = 1; k <= M; ++k) {
double binom = binom_coeff(M, k);
double base = (double(M) - k)/ double(M);
double term = binom * pow(base, double(N));
prob_empty += (k % 2 == 1 ? 1 : -1) * term;
}
return 1.0 - prob_empty;
}
int find_min_elems(double tg_prob, int M) {
int low = M, high = 20 * M, mid;
while (low < high) {
mid = (low + high) / 2;
double prob = prob_filled(mid, M);
if (prob < tg_prob) {
low = mid + 1;
} else {
high = mid;
}
}
return low;
}
string to_string(std::thread::id id) {
std::stringstream helper;
helper << id;
return helper.str();
}
pair<int,vector<MYSQL*>> disable_core_nodes_scheduler(CommandLine& cl, MYSQL* admin) {
vector<MYSQL*> nodes_conns {};
@ -553,15 +607,15 @@ std::vector<mysql_res_row> extract_mysql_rows(MYSQL_RES* my_res) {
return result;
};
pair<uint32_t,vector<mysql_res_row>> mysql_query_ext_rows(MYSQL* mysql, const string& query) {
rc_t<vector<mysql_res_row>> mysql_query_ext_rows(MYSQL* mysql, const string& query) {
int rc = mysql_query(mysql, query.c_str());
if (rc != EXIT_SUCCESS) {
return { mysql_errno(mysql), {} };
return { static_cast<int>(mysql_errno(mysql)), {} };
}
MYSQL_RES* myres = mysql_store_result(mysql);
if (myres == nullptr) {
return { mysql_errno(mysql), {} };
return { static_cast<int>(mysql_errno(mysql)), {} };
}
const vector<mysql_res_row> rows { extract_mysql_rows(myres) };
@ -1475,13 +1529,47 @@ int open_file_and_seek_end(const string& f_path, std::fstream& f_stream) {
return EXIT_SUCCESS;
}
vector<line_match_t> get_matching_lines(fstream& f_stream, const string& s_regex, bool get_matches) {
rc_t<debug_entry_t> build_debug_entry(const sq3_row_t& row) {
if (row.size() < 12) {
return { -1, {} };
}
const auto str_to_uint64 = [](const std::string& str) -> uint64_t {
return str.empty() ? 0 : std::stoull(str);
};
const auto str_to_time_t = [](const string& str) -> time_t {
return str.empty() ? 0 : static_cast<time_t>(std::stoll(str));
};
return { 0, debug_entry_t {
str_to_uint64(row[0]),
str_to_time_t(row[1]),
str_to_uint64(row[2]),
str_to_uint64(row[3]),
row[4],
str_to_uint64(row[5]),
row[6],
str_to_uint64(row[7]),
row[8],
str_to_uint64(row[9]),
row[10],
row[11]
}};
}
pair<size_t,vector<line_match_t>> get_matching_lines(
fstream& f_stream, const string& s_regex, bool get_matches
) {
vector<line_match_t> found_matches {};
size_t insp_lines { 0 };
string next_line {};
fstream::pos_type init_pos { f_stream.tellg() };
while (getline(f_stream, next_line)) {
insp_lines += 1;
re2::RE2 regex { s_regex };
re2::StringPiece match;
@ -1504,7 +1592,7 @@ vector<line_match_t> get_matching_lines(fstream& f_stream, const string& s_regex
f_stream.seekg(init_pos);
}
return found_matches;
return { insp_lines, found_matches };
}
const uint32_t USLEEP_SQLITE_LOCKED = 100;
@ -1536,7 +1624,7 @@ sq3_res_t sqlite3_execute_stmt(sqlite3* db, const string& query) {
} while (rc==SQLITE_LOCKED || rc==SQLITE_BUSY);
if (rc != SQLITE_OK) {
res = {{}, {}, {}, sqlite3_errmsg(db)};
res = {{}, {}, {}, sqlite3_errcode(db)};
goto cleanup;
}
@ -1556,7 +1644,7 @@ sq3_res_t sqlite3_execute_stmt(sqlite3* db, const string& query) {
uint32_t affected_rows = sqlite3_changes(db);
res = {{}, {}, affected_rows, {}};
} else {
res = {{}, {}, {}, sqlite3_errmsg(db)};
res = {{}, {}, {}, sqlite3_errcode(db)};
goto cleanup;
}
} else {
@ -1597,6 +1685,31 @@ cleanup:
return res;
}
rc_t<vector<debug_entry_t>> sq3_get_debug_entries(sqlite3* db, const string& conds) {
const string query {
"SELECT id, time, lapse, thread, file, line, funct, modnum, modname, verbosity, message, note"
" FROM debug_log" + (conds.empty() ? "" : " WHERE 1=1 AND " + conds)
};
const sq3_res_t rows { sqlite3_execute_stmt(db, query) };
const int sq3_err { std::get<SQ3_RES_T::SQ3_ERR>(rows) };
if (sq3_err) {
return { sq3_err, {} };
} {
vector<debug_entry_t> entries {};
for (const auto & row : std::get<SQ3_RES_T::SQ3_ROWS>(rows)) {
auto [err, entry] = build_debug_entry(row);
assert((void("Received malformed row from 'debug_log' table"), err == 0));
entries.push_back(entry);
}
return { 0, entries };
}
}
json fetch_internal_session(MYSQL* proxy, bool verbose) {
int rc = 0;

@ -2,9 +2,13 @@
#define UTILS_H
#include <algorithm>
#include <functional>
#include <string>
#include <thread>
#include <vector>
#include <fstream>
#include <unistd.h>
#include <utility>
#include "curl/curl.h"
#include "mysql.h"
@ -14,6 +18,9 @@
#include "command_line.h"
#include "mysql.h"
template <typename T>
using rc_t = std::pair<int,T>;
// Improve dependency failure compilation error
#ifndef DISABLE_WARNING_COUNT_LOGGING
@ -56,6 +63,38 @@ my_bool mysql_stmt_close_override(MYSQL_STMT* stmt, const char* file, int line);
#endif
/**
* @brief Computes the binomial coefficient C(n, k)
*/
long long binom_coeff(int n, int k);
/**
* @brief Computes the probability for all buckets has at least one elem.
* @param N Number of elements to place.
* @param M Number of buckets to fill.
* @return The computed probability.
*/
long double prob_filled(int N, int M);
/**
* @brief Computes the inverse of 'prob_filled', finding the min number of elements so the target
* probability is reached.
* @details NOTE: The inverse of 'prob_filled' isn't trivial to compute, due to this, this method
* computes it numerically using a simple binary search. The search isn't optimized or built to work
* with a generic M, yet it reasonably fits its current use cases for small M values.
* @param tg_prob The target probability for which to find the number of elements.
* @param M The number of buckets in which to place the elements.
* @return Number of elements necessary to achieve the target probability.
*/
int find_min_elems(double tg_prob, int M);
/**
* @brief Returns the string representation of an std::thread
* @param id Thread id to be converted to string.
* @return The string representation of the thread::id.
*/
std::string to_string(std::thread::id id);
/**
* @brief Helper function to disable Core nodes scheduler from ProxySQL Cluster nodes.
* @details In the CI environment, 'Scheduler' is used to induce extra load via Admin interface on
@ -144,6 +183,10 @@ int create_table_test_sbtest1(int num_rows, MYSQL *mysql);
int create_table_test_sqlite_sbtest1(int num_rows, MYSQL *mysql); // as above, but for SQLite3 server
int add_more_rows_test_sbtest1(int num_rows, MYSQL *mysql, bool sqlite=false);
/**
* @brief Returns the current monotonic time by 'clock_gettime'.
* @return Current monotonic time in microseconds (us).
*/
unsigned long long monotonic_time();
using mysql_res_row = std::vector<std::string>;
@ -164,7 +207,31 @@ std::vector<mysql_res_row> extract_mysql_rows(MYSQL_RES* my_res);
* * Query produces no resulset.
* * An error takes place during query execution or resultset retrieval.
*/
std::pair<uint32_t,std::vector<mysql_res_row>> mysql_query_ext_rows(MYSQL* mysql, const std::string& query);
rc_t<std::vector<mysql_res_row>> mysql_query_ext_rows(MYSQL* mysql, const std::string& query);
using sq3_col_def_t = std::string;
using sq3_row_t = std::vector<std::string>;
using sq3_err_t = int;
using sq3_res_t = std::tuple<std::vector<sq3_col_def_t>,std::vector<sq3_row_t>,int64_t,sq3_err_t>;
enum SQ3_RES_T {
SQ3_COLUMNS_DEF,
SQ3_ROWS,
SQ3_AFFECTED_ROWS,
SQ3_ERR
};
/**
* @brief Executes the provided query in the supplied sqlite3 db object.
* @param db Already initialized 'sqlite3' handler.
* @param query The query to be executed.
* @return An 'sq3_result_t' object holding the result, depending on the type of query and result, different
* fields will be populated, in case of success:
* - For DQL stmts COLUMN_DEF and ROWS will hold the columns definitions and the rows from the resultset.
* - For DML stmts the AFFECTED_ROWS will show the number of modified rows.
* In case of failure, ERR field will be populated and others will remain empty.
*/
sq3_res_t sqlite3_execute_stmt(sqlite3* db, const std::string& query);
/**
* @brief Holds the result of an `mysql_query_ext_val` operation.
@ -231,6 +298,32 @@ ext_val_t<T> mysql_query_ext_val(MYSQL* mysql, const std::string& query, const T
}
}
template <typename T>
ext_val_t<T> sq3_query_ext_val(sqlite3* db, const std::string& query, const T& def_val) {
const auto& sq3_res { sqlite3_execute_stmt(db, query) };
const auto& sq3_err { std::get<SQ3_RES_T::SQ3_ERR>(sq3_res) };
const auto& sq3_rows { std::get<SQ3_RES_T::SQ3_ROWS>(sq3_res) };
if (sq3_err) {
return { sq3_err, def_val };
} else if (sq3_rows.empty()) {
return { -1, def_val };
} else {
return ext_single_row_val(sq3_rows.front(), def_val);
}
}
template <typename T>
std::string sq3_get_ext_val_err(const ext_val_t<T>& ext_val) {
if (ext_val.err == -1) {
return "Received invalid empty resultset/row";
} else if (ext_val.err == -2) {
return "Failed to parse response value '" + ext_val.str + "'";
} else {
return std::string { sqlite3_errstr(ext_val.err) };
}
}
/**
* @brief Extract the error from a `ext_val_t<T>`.
* @param mysql Already oppened MYSQL connection.
@ -630,10 +723,28 @@ enum LINE_MATCH_T { POS, LINE, MATCH };
* For example, regex '\d+' should become '(\d+)'.
* @return All the lines found matching the regex.
*/
std::vector<line_match_t> get_matching_lines(
std::pair<size_t,std::vector<line_match_t>> get_matching_lines(
std::fstream& f_stream, const std::string& regex, bool get_matches=false
);
/**
* @brief Row entries from 'debug_log' table, from debug database.
*/
struct debug_entry_t {
uint64_t id;
time_t time;
uint64_t lapse;
uint64_t thread;
std::string file;
uint64_t line;
std::string funct;
uint64_t modnum;
std::string modname;
uint64_t verbosity;
std::string message;
std::string note;
};
/**
* @brief Opens a sqlite3 db file located in the supplied path with the provided flags.
* @param f_path Path to the 'db' file.
@ -642,30 +753,20 @@ std::vector<line_match_t> get_matching_lines(
* @return EXIT_SUCCESS in case of success, EXIT_FAILURE otherwise. Error cause is logged.
*/
int open_sqlite3_db(const std::string& f_path, sqlite3** db, int flags);
using sq3_col_def_t = std::string;
using sq3_row_t = std::vector<std::string>;
using sq3_err_t = std::string;
using sq3_res_t = std::tuple<std::vector<sq3_col_def_t>,std::vector<sq3_row_t>,int64_t,sq3_err_t>;
enum SQ3_RES_T {
SQ3_COLUMNS_DEF,
SQ3_ROWS,
SQ3_AFFECTED_ROWS,
SQ3_ERR
};
/**
* @brief Executes the provided query in the supplied sqlite3 db object.
* @param db Already initialized 'sqlite3' handler.
* @param query The query to be executed.
* @return An 'sq3_result_t' object holding the result, depending on the type of query and result, different
* fields will be populated, in case of success:
* - For DQL stmts COLUMN_DEF and ROWS will hold the columns definitions and the rows from the resultset.
* - For DML stmts the AFFECTED_ROWS will show the number of modified rows.
* In case of failure, ERR field will be populated and others will remain empty.
* @brief Builds a debug_entry_t from an sq3_row_t from 'debug_log' table.
* @param row The row to map to a 'debug_entry_t' struct.
* @return A pair of kind `{err_code, debug_entry_t}`.
*/
sq3_res_t sqlite3_execute_stmt(sqlite3* db, const std::string& query);
rc_t<debug_entry_t> build_debug_entry(const sq3_row_t& row);
/**
* @brief Retrieves a list of debug entries matching the supplied conditions.
* @param db An already opened connection to a 'SQLite3' database.
* @param conds Conditions for the WHERE clause for row filtering, no filtering if empty. E.g: 'id >
* $timestamp AND file="foo.cpp"'.
* @return A pair of kind `{err_code, matched_rows}`.
*/
rc_t<std::vector<debug_entry_t>> sq3_get_debug_entries(sqlite3* db, const std::string& conds);
/**
* @brief If found returns the element index, -1 otherwise.
@ -741,6 +842,36 @@ using pool_state_t = std::map<uint32_t,mysql_row_t>;
* @return A pair of the shape {err_code, pool_state_t}.
*/
std::pair<int,pool_state_t> fetch_conn_stats(MYSQL* admin, const std::vector<uint32_t> hgs);
/**
* @brief Waits for a generic condition.
* @details Wait finishes by a non-zero return code by the condition or by timeout.
* @param cond Condition to be evaluated at each wait interval.
* @param to_us Timeout at which to stop the wait, in microseconds.
* @param delay_us Delay between wait intervals, in microseconds.
* @return 0 for success, non-zero return code from 'cond' for failure, or INT_MIN for timeout.
*/
template <typename T>
rc_t<T> wait_for_cond(const std::function<rc_t<T>()>& cond, uint32_t to_us, uint32_t delay_us) {
int rc { 0 };
T res {};
uint32_t waited { 0 };
while (waited < to_us) {
std::tie(rc, res) = cond();
if (rc == 0) {
usleep(delay_us);
waited += delay_us;
} else if (rc < 0) {
break;
} else {
return { 0, res };
}
}
return { INT_MIN, res };
}
/**
* @brief Waits until the condition specified by the 'query' holds, or 'timeout' is reached.
* @details Several details about the function impl:

@ -86,7 +86,7 @@ void check_matching_logline(fstream& f_log, string regex) {
// Minimal wait for the error log to be written
usleep(500 * 1000);
std::vector<line_match_t> matching_lines { get_matching_lines(f_log, regex) };
const auto& [__1, matching_lines ] { get_matching_lines(f_log, regex) };
for (const line_match_t& line_match : matching_lines) {
diag(
"Found matching logline - pos: %ld, line: `%s`",

@ -281,7 +281,7 @@ int is_string_in_result(PGresult* result, const char* target_str) {
}
bool check_logs_for_command(std::fstream& f_proxysql_log, const std::string& command_regex) {
std::vector<line_match_t> cmd_lines{ get_matching_lines(f_proxysql_log, command_regex) };
const auto& [_, cmd_lines] { get_matching_lines(f_proxysql_log, command_regex) };
return cmd_lines.empty() ? false : true;
}

@ -772,7 +772,7 @@ int check_module_checksums_sync(
const string diff_check_regex {
"Cluster: detected a peer .* with " + module + " version \\d+, epoch \\d+, diff_check \\d+."
};
vector<line_match_t> new_matching_lines { get_matching_lines(logfile_fs, diff_check_regex) };
const auto& [_, new_matching_lines] { get_matching_lines(logfile_fs, diff_check_regex) };
diag("regex used in `%s` to find loglines: `%s`", basename(logfile_path.c_str()), diff_check_regex.c_str());
for (const line_match_t& line_match : new_matching_lines) {
@ -801,7 +801,7 @@ int check_module_checksums_sync(
" Not syncing due to '" + module_sync.sync_variable + "=0'"
};
vector<line_match_t> new_matching_lines { get_matching_lines(logfile_fs, no_syncing_regex) };
const auto& [_, new_matching_lines] { get_matching_lines(logfile_fs, no_syncing_regex) };
diag("regex used in `%s` to find loglines: `%s`", basename(logfile_path.c_str()), no_syncing_regex.c_str());
for (const line_match_t& line_match : new_matching_lines) {

@ -12,9 +12,11 @@
#include <stdio.h>
#include <unistd.h>
#include <fstream>
#include <functional>
#include <thread>
#include <map>
#include "mysql.h"
#include "mysqld_error.h"
#include "tap.h"
#include "utils.h"
@ -22,32 +24,40 @@
#include "json.hpp"
using nlohmann::json;
using std::map;
using std::pair;
using std::string;
using nlohmann::json;
using std::fstream;
using std::function;
using std::vector;
// Used for 'extract_module_host_port'
#include "modules_server_test.h"
void parse_result_json_column(MYSQL_RES *result, json& j) {
if(!result) return;
MYSQL_ROW row;
////////////////////////////////////////////////////////////////////////////////
// Borrowed from test_match_eof_conn_cap.cpp - TODO: MERGE
////////////////////////////////////////////////////////////////////////////////
while ((row = mysql_fetch_row(result))) {
j = json::parse(row[0]);
}
}
#include <dirent.h>
int extract_internal_session(MYSQL* proxy, nlohmann::json& j_internal_session) {
MYSQL_QUERY_T(proxy, "PROXYSQL INTERNAL SESSION");
MYSQL_RES* myres = mysql_store_result(proxy);
parse_result_json_column(myres, j_internal_session);
mysql_free_result(myres);
#define _S(s) ( std::string {s} )
#define _TO_S(s) ( std::to_string(s) )
return EXIT_SUCCESS;
}
#define SELECT_RUNTIME_VAR "SELECT variable_value FROM runtime_global_variables WHERE variable_name="
#define CHECK_EXT_VAL(val)\
do {\
if (val.err) {\
diag("%s:%d: Query failed err=\"%s\"", __func__, __LINE__, val.str.c_str());\
return EXIT_FAILURE;\
}\
} while(0)
const uint32_t USLEEP_SQLITE_LOCKED = 100;
////////////////////////////////////////////////////////////////////////////////
int get_query_int_res(MYSQL* admin, const string& q, int& val) {
MYSQL_QUERY_T(admin, q.c_str());
@ -76,15 +86,17 @@ int get_query_int_res(MYSQL* admin, const string& q, int& val) {
int extract_sess_qpo_dest_hg(MYSQL* proxy) {
json j_internal_session {};
int j_err = extract_internal_session(proxy, j_internal_session);
if (j_err) {
diag("Failed to extract and parse result from 'PROXYSQL INTERNAL SESSION'");
try {
j_internal_session = fetch_internal_session(proxy);
} catch (const std::exception& e) {
diag("Failed to fetch 'PROXYSQL INTERNAL SESSION' exception=\"%s\"", e.what());
return -2;
}
int dest_hg = -2;
try {
dest_hg = j_internal_session["qpo"]["destination_hostgroup"];
diag("Session information thread=%s", j_internal_session["thread"].dump().c_str());
} catch (const std::exception& e) {
diag("Processing of 'PROXYSQL INTERNAL SESSION' failed with exc: %s", e.what());
return -2;
@ -142,7 +154,11 @@ string get_last_debug_log_id(sqlite3* sq3_db) {
}
int create_mysql_servers_range(
const CommandLine& cl, MYSQL* admin, const pair<string,int>& host_port, uint32_t rng_init, uint32_t rng_end
const CommandLine& cl,
MYSQL* admin,
const pair<string,int>& host_port,
uint32_t rng_init,
uint32_t rng_end
) {
const string init { std::to_string(rng_init) };
const string end { std::to_string(rng_end) };
@ -161,7 +177,7 @@ int create_mysql_servers_range(
};
int create_fast_routing_rules_range(
const CommandLine& cl, MYSQL* admin, const pair<string,int>& host_port, uint32_t rng_init, uint32_t rng_end
const CommandLine& cl, MYSQL* admin, uint32_t rng_init, uint32_t rng_end
) {
const string init { std::to_string(rng_init) };
const string end { std::to_string(rng_end) };
@ -183,49 +199,233 @@ int create_fast_routing_rules_range(
return EXIT_SUCCESS;
};
int sq3_get_matching_msg_entries(sqlite3* db, const string& query_regex, const string& id) {
string init_db_query {
"SELECT COUNT() FROM debug_log WHERE message LIKE '%" + query_regex + "%' AND id > " + id
const char q_query_rules_mem_stats[] {
"SELECT variable_value FROM stats_memory_metrics WHERE variable_name='mysql_query_rules_memory'"
};
const char SELECT_LAST_DEBUG_ID[] { "SELECT id FROM debug_log ORDER BY id DESC limit 1" };
int check_fast_routing_rules(
const CommandLine& cl,
MYSQL* proxy,
MYSQL* admin,
uint32_t rng_init,
uint32_t rng_end,
const string& algo,
fstream& errlog,
sqlite3* sq3_db
) {
// Seek end of file for error log
errlog.seekg(0, std::ios::end);
diag("Seek end of error log file pos=%lu", size_t(errlog.tellg()));
diag("Flush debug logs to ensure getting the latest id");
MYSQL_QUERY_T(admin, "PROXYSQL FLUSH LOGS");
diag("Getting last 'debug_log' entry id");
ext_val_t<uint32_t> last_id { sq3_query_ext_val(sq3_db, SELECT_LAST_DEBUG_ID, uint32_t(0)) };
CHECK_EXT_VAL(last_id);
diag("Fetched last 'debug_log' entry id id=%d", last_id.val);
// Check that fast_routing rules are properly working for the defined range
int rc = check_fast_routing_rules(proxy, rng_init, rng_end);
if (rc) { return EXIT_FAILURE; }
const string init_search_regex { "Searching " + algo + " 'rules_fast_routing' hashmap" };
auto [insp_lines, matched_lines] { get_matching_lines(errlog, init_search_regex) };
diag(
"Inspected error log for matching lines pos=%lu insp_lines=%ld match_lines=%ld regex=\"%s\"",
size_t(errlog.tellg()), insp_lines, matched_lines.size(), init_search_regex.c_str()
);
ok(
matched_lines.size() == rng_end - rng_init,
"Number of '%s' entries in error log should match issued queries - Exp: %d, Act: %ld",
algo.c_str(), rng_end - rng_init, matched_lines.size()
);
const function<pair<int,int>()> check_sq3_entries {
[&] () -> pair<int,int> {
const string dbg_msg {
"Searching " + algo + " ''rules_fast_routing'' hashmap % schema=''test"
};
const string select_count {
"SELECT COUNT() FROM debug_log WHERE"
" id > " + _TO_S(last_id.val) + " AND message LIKE '%" + dbg_msg + "%'"
};
ext_val_t<int64_t> entries { sq3_query_ext_val(sq3_db, select_count, int64_t(-1))};
if (entries.err) {
const string err { get_ext_val_err(admin, entries) };
diag("%s:%d: Query failed err=\"%s\"", __func__, __LINE__, err.c_str());
return { -1, 0 };
}
diag(
"Checking matched entries entries=%ld last_debug_log_id=%d query=\"%s\"",
entries.val, last_id.val, select_count.c_str()
);
if (entries.val > 0) {
return { entries.val == (rng_end - rng_init), entries.val };
} else {
return { entries.val, 0 };
}
}
};
sq3_res_t sq3_entries_res { sqlite3_execute_stmt(db, init_db_query) };
const string& sq3_err { std::get<SQ3_RES_T::SQ3_ERR>(sq3_entries_res) };
if (!sq3_err.empty()) {
diag("Query failed to be executed in SQLite3 - query: `%s`, err: `%s`", init_db_query.c_str(), sq3_err.c_str());
return EXIT_FAILURE;
diag("Flush debug logs before fetching expected entries");
MYSQL_QUERY_T(admin, "PROXYSQL FLUSH LOGS");
auto [wait_res, matching_rows] = wait_for_cond(check_sq3_entries, 1000*1000, 250*1000);
if (wait_res != 0) { exit(1); }
ok(
wait_res == 0,
"Number of '%s' entries in SQLite3 'debug_log' should match issued queries - Exp: %d, Act: %d",
algo.c_str(), rng_end - rng_init, matching_rows
);
return EXIT_SUCCESS;
}
const string PROXYSQL_QLOG_DIR { get_env_str("REGULAR_INFRA_DATADIR", "/tmp/datadir") };
int threads_warmup(const CommandLine& cl, MYSQL* admin, sqlite3* sq3_db) {
ext_val_t<int> mysql_threads {
mysql_query_ext_val(admin,
"SELECT variable_value FROM global_variables WHERE variable_name='mysql-threads'", 0
)
};
CHECK_EXT_VAL(mysql_threads);
const ext_val_t<string> qlog_fname {
mysql_query_ext_val(admin, SELECT_RUNTIME_VAR"'mysql-eventslog_filename'", _S("query.log"))
};
CHECK_EXT_VAL(qlog_fname);
const string PROXYSQL_AUDIT_LOG { PROXYSQL_QLOG_DIR + "/" + qlog_fname.str };
diag("Flush debug logs to ensure getting the latest id");
MYSQL_QUERY_T(admin, "PROXYSQL FLUSH LOGS");
diag("Getting last 'debug_log' entry id");
ext_val_t<uint32_t> last_id { sq3_query_ext_val(sq3_db, SELECT_LAST_DEBUG_ID, uint32_t(0)) };
CHECK_EXT_VAL(last_id);
diag("Fetched last 'debug_log' entry id id=%d", last_id.val);
int conns { find_min_elems(1.0 - pow(10, -6), mysql_threads.val) /* * 2 */ };
double th_conns { ceil(double(conns) / mysql_threads.val) };
diag(
"Performing warm-up queries queries=%d th_queries=%lf workers=%d",
conns, th_conns, mysql_threads.val
);
vector<std::thread> workers {};
pthread_barrier_t bar;
pthread_barrier_init(&bar, NULL, mysql_threads.val);
for (int i = 0; i < mysql_threads.val; i++) {
workers.emplace_back(std::thread([&cl, th_conns, i, &bar] () {
pthread_barrier_wait(&bar);
const string th_id { to_string(std::this_thread::get_id()) };
auto curtime = monotonic_time();
diag("Worker starting warm-up conn time=%lld thread_id=%s", curtime, th_id.c_str());
for (int j = 0; j < th_conns; j++) {
MYSQL* proxy = mysql_init(NULL);
if (!mysql_real_connect(proxy, cl.host, cl.username, cl.password, NULL, cl.port, NULL, 0)) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxy));
return EXIT_FAILURE;
}
const string query { rand() % 2 ? "SELECT 1" : "SELECT 2" };
diag("Issuing query from worker thread_id=%s query=\"%s\"", th_id.c_str(), query.c_str());
MYSQL_QUERY_T(proxy, query.c_str());
mysql_free_result(mysql_store_result(proxy));
mysql_close(proxy);
}
return 0;
}));
}
const vector<sq3_row_t>& sq3_rows { std::get<SQ3_RES_T::SQ3_ROWS>(sq3_entries_res) };
int32_t matching_rows = std::atoi(sq3_rows[0][0].c_str());
for (std::thread& w : workers) {
w.join();
}
return matching_rows;
};
pthread_barrier_destroy(&bar);
diag("Flush debug logs before fetching thread dist entries");
MYSQL_QUERY_T(admin, "PROXYSQL FLUSH LOGS");
const auto [_, matched_entries] {
sq3_get_debug_entries(sq3_db,
"id > " + _TO_S(last_id.val) + " AND file='MySQL_Session.cpp'"
" AND message LIKE '%Processing received query%'"
)
};
std::map<uint32_t, uint32_t> threads_ids {};
for (const debug_entry_t& dbg_entry : matched_entries) {
threads_ids[dbg_entry.thread] += 1;
}
diag("Thread query distribution:\n%s", nlohmann::json(threads_ids).dump(4).c_str());
ok(
threads_ids.size() == mysql_threads.val + 1,
"Each thread processed at least one warm-up query threads_ids=%ld mysql_threads=%d",
threads_ids.size(), mysql_threads.val + 1
);
vector<pair<uint32_t,uint32_t>> sq3_ths {};
std::copy_if(threads_ids.begin(), threads_ids.end(), std::back_inserter(sq3_ths),
[] (const pair<uint32_t,uint32_t>& e) -> bool { return e.second == 1; }
);
ok(
sq3_ths.size() == 1,
"Only one SQLite3 thread spawned to process 'FLUSH LOGS' sq3_ths=[%s]",
nlohmann::json(sq3_ths).dump(-1).c_str()
);
return EXIT_FAILURE;
}
int test_fast_routing_algorithm(
const CommandLine& cl, MYSQL* admin, MYSQL* proxy, const pair<string,int>& host_port, fstream& errlog,
const CommandLine& cl,
MYSQL* admin,
MYSQL* proxy,
const pair<string,int>& host_port,
fstream& errlog,
int init_algo, int new_algo
) {
uint32_t rng_init = 1000;
uint32_t rng_end = 1020;
const char query_rules_mem_stats_query[] {
"SELECT variable_value FROM stats_memory_metrics WHERE variable_name='mysql_query_rules_memory'"
};
// Enable Admin debug, set debug_output to log and DB, and increase verbosity for Query_Processor
MYSQL_QUERY_T(admin, "SET admin-debug=1");
MYSQL_QUERY_T(admin, "SET admin-debug_output=3");
MYSQL_QUERY_T(admin, "LOAD ADMIN VARIABLES TO RUNTIME");
MYSQL_QUERY_T(admin, "UPDATE debug_levels SET verbosity=7 WHERE module='debug_mysql_query_processor'");
// If there is a generic debug filter (line==0) on Query_Processor.cpp , process_mysql_query() , disable it.
// If the filter was present it will be automatically recreated by the tester.
// Remove **generic debug filters** (line==0) relevant for the test.
MYSQL_QUERY_T(admin, "DELETE FROM debug_filters WHERE filename='Query_Processor.cpp' AND line=0 AND funct='process_mysql_query'");
MYSQL_QUERY_T(admin, "DELETE FROM debug_filters WHERE filename='MySQL_Session.cpp' AND line=0 AND funct='get_pkts_from_client'");
// If the filter was present it will be automatically recreated by the tester.
MYSQL_QUERY_T(admin, "LOAD DEBUG TO RUNTIME");
// Open the SQLite3 db for debugging
const string db_path { get_env("REGULAR_INFRA_DATADIR") + "/proxysql_debug.db" };
sqlite3* sq3_db = nullptr;
int odb_err = open_sqlite3_db(db_path, &sq3_db, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE);
int odb_err = open_sqlite3_db(db_path, &sq3_db, SQLITE_OPEN_READONLY);
if (odb_err) { return EXIT_FAILURE; }
char* prg_err { nullptr };
int c_err = create_mysql_servers_range(cl, admin, host_port, rng_init, rng_end);
if (c_err) { return EXIT_FAILURE; }
MYSQL_QUERY_T(admin, "LOAD MYSQL SERVERS TO RUNTIME");
@ -239,162 +439,96 @@ int test_fast_routing_algorithm(
MYSQL_QUERY_T(admin, "DELETE FROM mysql_query_rules_fast_routing");
MYSQL_QUERY_T(admin, "LOAD MYSQL QUERY RULES TO RUNTIME");
int init_rules_mem_stats = -1;
int get_mem_stats_err = get_query_int_res(admin, query_rules_mem_stats_query, init_rules_mem_stats);
if (get_mem_stats_err) { return EXIT_FAILURE; }
diag("Initial 'mysql_query_rules_memory' of '%d'", init_rules_mem_stats);
ext_val_t<int> init_mem_stats { mysql_query_ext_val(admin, q_query_rules_mem_stats, -1) };
CHECK_EXT_VAL(init_mem_stats);
diag("Initial 'mysql_query_rules_memory' of '%d'", init_mem_stats.val);
// Check that fast_routing rules are being properly triggered
c_err = create_fast_routing_rules_range(cl, admin, host_port, rng_init, rng_end);
c_err = create_fast_routing_rules_range(cl, admin, rng_init, rng_end);
if (c_err) { return EXIT_FAILURE; }
MYSQL_QUERY_T(admin, "LOAD MYSQL QUERY RULES TO RUNTIME");
usleep(1000*1000);
// Seek end of file for error log
errlog.seekg(0, std::ios::end);
// Get current last id from debug db
string last_debug_log_id { get_last_debug_log_id(sq3_db) };
if (last_debug_log_id.empty()) { return EXIT_FAILURE; }
// Check that fast_routing rules are properly working for the defined range
check_fast_routing_rules(proxy, rng_init, rng_end);
// Give some time for the error log and SQLite3 to be written
usleep(100*1000);
const string init_algo_scope { init_algo == 1 ? "thread-local" : "global" };
const string init_search_regex { "Searching " + init_algo_scope + " 'rules_fast_routing' hashmap" };
vector<line_match_t> matched_lines { get_matching_lines(errlog, init_search_regex) };
diag("Warm-up threads to ensure the QueryProcessor maps are build");
threads_warmup(cl, admin, sq3_db);
ok(
matched_lines.size() == rng_end - rng_init,
"Number of '%s' searchs in error log should match issued queries - Exp: %d, Act: %ld",
init_algo_scope.c_str(), rng_end - rng_init, matched_lines.size()
);
const string sq3_query_regex { "Searching " + init_algo_scope + " ''rules_fast_routing'' hashmap" };
int matching_rows = sq3_get_matching_msg_entries(sq3_db, sq3_query_regex, last_debug_log_id);
ok(
matching_rows == rng_end - rng_init,
"Number of '%s' entries in SQLite3 'debug_log' should match issued queries - Exp: %d, Act: %ld",
init_algo_scope.c_str(), rng_end - rng_init, matched_lines.size()
);
// Check that fast_routing rules are being properly triggered
const string init_algo_str { init_algo == 1 ? "thread-local" : "global" };
int chk_res = check_fast_routing_rules(cl, proxy, admin, rng_init, rng_end, init_algo_str, errlog, sq3_db);
if (chk_res) { return EXIT_FAILURE; }
printf("\n");
int old_mem_stats = -1;
get_mem_stats_err = get_query_int_res(admin, query_rules_mem_stats_query, old_mem_stats);
if (get_mem_stats_err) { return EXIT_FAILURE; }
ext_val_t<int> old_mem_stats { mysql_query_ext_val(admin, q_query_rules_mem_stats, -1) };
CHECK_EXT_VAL(old_mem_stats);
// Changing the algorithm shouldn't have any effect
diag("*ONLY* Changing the algorithm shouldn't have any effect");
diag("Testing 'query_rules_fast_routing_algorithm=%d'", new_algo);
MYSQL_QUERY_T(admin, ("SET mysql-query_rules_fast_routing_algorithm=" + std::to_string(new_algo)).c_str());
MYSQL_QUERY_T(admin, "LOAD MYSQL VARIABLES TO RUNTIME");
usleep(1000*1000);
// Seek end of file for error log
errlog.seekg(0, std::ios::end);
// Get current last id from debug db
last_debug_log_id = get_last_debug_log_id(sq3_db);
if (last_debug_log_id.empty()) { return EXIT_FAILURE; }
diag("Search should still be performed 'per-thread'. Only variable has changed.");
check_fast_routing_rules(proxy, rng_init, rng_end);
// Give some time for the error log to be written
usleep(100*1000);
matched_lines = get_matching_lines(errlog, init_search_regex);
ok(
matching_rows == rng_end - rng_init,
"Number of '%s' entries in SQLite3 'debug_log' should match issued queries - Exp: %d, Act: %ld",
init_algo_scope.c_str(), rng_end - rng_init, matched_lines.size()
);
matching_rows = sq3_get_matching_msg_entries(sq3_db, sq3_query_regex, last_debug_log_id);
ok(
matched_lines.size() == rng_end - rng_init,
"Number of 'thread-local' searchs in error log should match issued queries - Exp: %d, Act: %ld",
rng_end - rng_init, matched_lines.size()
);
// Check that fast_routing rules are being properly triggered
chk_res = check_fast_routing_rules(cl, proxy, admin, rng_init, rng_end, init_algo_str, errlog, sq3_db);
if (chk_res) { return EXIT_FAILURE; }
printf("\n");
int new_mem_stats = -1;
get_mem_stats_err = get_query_int_res(admin, query_rules_mem_stats_query, new_mem_stats);
if (get_mem_stats_err) { return EXIT_FAILURE; }
ext_val_t<int> new_mem_stats { mysql_query_ext_val(admin, q_query_rules_mem_stats, -1) };
CHECK_EXT_VAL(new_mem_stats);
diag("Memory SHOULDN'T have changed just because of a variable change");
ok(
old_mem_stats - init_rules_mem_stats == new_mem_stats - init_rules_mem_stats,
old_mem_stats.val - init_mem_stats.val == new_mem_stats.val - init_mem_stats.val,
"Memory stats shouldn't increase just by the variable change - old: %d, new: %d",
old_mem_stats - init_rules_mem_stats, new_mem_stats - init_rules_mem_stats
old_mem_stats.val - init_mem_stats.val, new_mem_stats.val - init_mem_stats.val
);
printf("\n");
MYSQL_QUERY_T(admin, "LOAD MYSQL QUERY RULES TO RUNTIME");
diag("Search should now be using the per thread-maps");
// Seek end of file for error log
errlog.seekg(0, std::ios::end);
check_fast_routing_rules(proxy, rng_init, rng_end);
// Give some time for the error log to be written
usleep(100*1000);
diag("Warm-up threads to ensure the QueryProcessor maps are build");
threads_warmup(cl, admin, sq3_db);
const string new_algo_scope { new_algo == 1 ? "thread-local" : "global" };
const string new_search_regex { "Searching " + new_algo_scope + " 'rules_fast_routing' hashmap" };
vector<line_match_t> new_matched_lines { get_matching_lines(errlog, new_search_regex) };
const string new_algo_str { new_algo == 1 ? "thread-local" : "global" };
diag("Search should now be using the '%s' maps", new_algo_str.c_str());
chk_res = check_fast_routing_rules(cl, proxy, admin, rng_init, rng_end, new_algo_str, errlog, sq3_db);
if (chk_res) { return EXIT_FAILURE; }
ok(
new_matched_lines.size() == rng_end - rng_init,
"Number of '%s' searchs in error log should match issued queries - Exp: %d, Act: %ld",
new_algo_scope.c_str(), rng_end - rng_init, new_matched_lines.size()
);
const string new_sq3_query_regex { "Searching " + new_algo_scope + " ''rules_fast_routing'' hashmap" };
int new_matching_rows = sq3_get_matching_msg_entries(sq3_db, sq3_query_regex, last_debug_log_id);
ok(
new_matching_rows == rng_end - rng_init,
"Number of '%s' entries in SQLite3 'debug_log' should match issued queries - Exp: %d, Act: %d",
new_algo_scope.c_str(), rng_end - rng_init, new_matching_rows
);
get_mem_stats_err = get_query_int_res(admin, query_rules_mem_stats_query, new_mem_stats);
if (get_mem_stats_err) { return EXIT_FAILURE; }
new_mem_stats = mysql_query_ext_val(admin, q_query_rules_mem_stats, -1);
CHECK_EXT_VAL(new_mem_stats);
bool mem_check_res = false;
string exp_change { "" };
const auto old_new_diff { old_mem_stats.val - init_mem_stats.val };
const auto new_init_diff { new_mem_stats.val - init_mem_stats.val };
if (init_algo == 1 && new_algo == 2) {
mem_check_res = (old_mem_stats - init_rules_mem_stats) > (new_mem_stats - init_rules_mem_stats);
mem_check_res = old_new_diff > new_init_diff;
exp_change = "decrease";
} else if (init_algo == 2 && new_algo == 1) {
mem_check_res = (old_mem_stats - init_rules_mem_stats) < (new_mem_stats - init_rules_mem_stats);
mem_check_res = old_new_diff < new_init_diff;
exp_change = "increase";
} else {
mem_check_res = (old_mem_stats - init_rules_mem_stats) == (new_mem_stats - init_rules_mem_stats);
mem_check_res = old_new_diff == new_init_diff;
exp_change = "not change";
}
ok(
mem_check_res,
"Memory stats should %s after 'LOAD MYSQL QUERY RULES TO RUNTIME' - old: %d, new: %d",
exp_change.c_str(), (old_mem_stats - init_rules_mem_stats), (new_mem_stats - init_rules_mem_stats)
exp_change.c_str(), old_new_diff, new_init_diff
);
sqlite3_close_v2(sq3_db);
return EXIT_SUCCESS;
};
int main(int argc, char** argv) {
// `5` logic checks + 20*3 checks per query rule, per test
plan((8 + 20*3) * 2);
// `12` logic checks + 20*3 checks per query rule, per test
plan((2*3 + 2 + 2*2 + 20*3) * 2);
CommandLine cl;
// Seed the random-number gen
std::srand(std::time(NULL));
if (cl.getEnv()) {
diag("Failed to get the required environmental variables.");
return EXIT_FAILURE;

@ -237,13 +237,13 @@ std::vector<std::string> sqlite_intf_queries {
int check_errorlog_for_addrinuse(MYSQL* admin, fstream& logfile) {
const string command_regex { ".*\\[INFO\\] Received LOAD SQLITESERVER VARIABLES (FROM DISK|TO RUNTIME) command" };
std::vector<line_match_t> cmd_lines { get_matching_lines(logfile, command_regex) };
const auto& [__1, cmd_lines] { get_matching_lines(logfile, command_regex) };
// NOTE: Delay for poll_timeout for SQLite3_Server - harcoded 500ms
usleep(1000 * 1000);
const string bind_err_regex { ".*\\[ERROR\\] bind\\(\\): Address already in use" };
std::vector<line_match_t> err_lines { get_matching_lines(logfile, bind_err_regex) };
const auto& [__2, err_lines ] { get_matching_lines(logfile, bind_err_regex) };
if (cmd_lines.empty()) {
diag("ERROR: Commands 'LOAD SQLITESERVER' not logged as expected");

Loading…
Cancel
Save