diff --git a/test/tap/tests/test_cluster_sync-t.cpp b/test/tap/tests/test_cluster_sync-t.cpp index 7764bb6b7..dc21c4753 100644 --- a/test/tap/tests/test_cluster_sync-t.cpp +++ b/test/tap/tests/test_cluster_sync-t.cpp @@ -31,14 +31,18 @@ #include #include #include +#include +#include #include -#include #include #include #include #include #include +#include +#include +#include #include @@ -62,6 +66,8 @@ using std::string; using std::vector; using std::tuple; +using std::fstream; +using std::function; /** * @brief Helper function to verify that the sync of a table (or variable) have been performed. @@ -391,6 +397,539 @@ int check_mysql_servers_sync( return EXIT_SUCCESS; } +struct sync_payload_t { + function update_module_val; + string module; + string sync_variable; +}; + +int64_t fetch_single_int_res(MYSQL* admin) { + MYSQL_RES* myres = mysql_store_result(admin); + MYSQL_ROW myrow = mysql_fetch_row(myres); + + int64_t val = -1; + + if (myrow && myrow[0]) { + char* endptr = NULL; + val = std::strtol(myrow[0], &endptr, 10); + + if (myrow[0] == endptr) { + val = -1; + } + } + + mysql_free_result(myres); + + return val; +} + +int update_variable_val(const CommandLine& cl, MYSQL* admin, const string& type, const string& var_name) { + cfmt_t select_query { + cstr_format("SELECT variable_value FROM global_variables WHERE variable_name='%s'", var_name.c_str()) + }; + + MYSQL_QUERY_T(admin, select_query.str.c_str()); + int64_t cur_val = fetch_single_int_res(admin); + if (cur_val == -1) { + return EXIT_FAILURE; + } + + cfmt_t update_query { cstr_format("SET %s=%ld", var_name.c_str(), cur_val + 1) }; + MYSQL_QUERY_T(admin, update_query.str.c_str()); + + if (type == "admin") { + MYSQL_QUERY_T(admin, "LOAD ADMIN VARIABLES TO RUNTIME"); + } else if (type == "mysql") { + MYSQL_QUERY_T(admin, "LOAD MYSQL VARIABLES TO RUNTIME"); + } + + return EXIT_SUCCESS; +} + +int update_mysql_servers(const CommandLine& cl, MYSQL* admin) { + const char select_max_conns_t[] { + "SELECT max_connections FROM mysql_servers WHERE hostgroup_id=" + "(SELECT default_hostgroup FROM mysql_users WHERE username='%s')" + }; + const char update_max_conns_t[] { + "UPDATE mysql_servers SET max_connections=%ld WHERE hostgroup_id=" + "(SELECT default_hostgroup FROM mysql_users WHERE username='%s')" + }; + + cfmt_t select_max_conns { cstr_format(select_max_conns_t, cl.username) }; + MYSQL_QUERY_T(admin, select_max_conns.str.c_str()); + int64_t cur_val = fetch_single_int_res(admin); + if (cur_val == -1) { + return EXIT_FAILURE; + } + + cfmt_t update_query { cstr_format(update_max_conns_t, cur_val + 1, cl.username) }; + MYSQL_QUERY_T(admin, update_query.str.c_str()); + MYSQL_QUERY_T(admin, "LOAD MYSQL SERVERS TO RUNTIME"); + + return EXIT_SUCCESS; +} + +int update_mysql_query_rules(const CommandLine& cl, MYSQL* admin) { + const char update_mysql_query_rules[] { + "INSERT INTO mysql_query_rules (active) VALUES (1)" + }; + + MYSQL_QUERY_T(admin, update_mysql_query_rules); + MYSQL_QUERY_T(admin, "LOAD MYSQL QUERY RULES TO RUNTIME"); + + return EXIT_SUCCESS; +} + +/** + * @brief Assumes that 'proxysql_servers' holds at least the one entry required for this test. + * @details It's assumed that primary ProxySQL is part of a Cluster. + */ +int update_proxysql_servers(const CommandLine& cl, MYSQL* admin) { + const char update_proxysql_servers_t[] { + "UPDATE proxysql_servers SET comment='%s' WHERE hostname='%s' and port=%d" + }; + + cfmt_t update_servers { + cstr_format(update_proxysql_servers_t, std::to_string(time(NULL)).c_str(), cl.host, cl.admin_port) + }; + MYSQL_QUERY_T(admin, update_servers.str.c_str()); + MYSQL_QUERY_T(admin, "LOAD PROXYSQL SERVERS TO RUNTIME"); + + return EXIT_SUCCESS; +} + +const vector module_sync_payloads { + { + update_mysql_servers, + "mysql_servers", + "admin-cluster_mysql_servers_diffs_before_sync", + }, + { + update_mysql_query_rules, + "mysql_query_rules", + "admin-cluster_mysql_query_rules_diffs_before_sync", + }, + { + update_proxysql_servers, + "proxysql_servers", + "admin-cluster_proxysql_servers_diffs_before_sync", + }, + { + [] (const CommandLine& cl, MYSQL* admin) -> int { + return update_variable_val(cl, admin, "mysql", "mysql-ping_timeout_server"); + }, + "mysql_variables", + "admin-cluster_mysql_variables_diffs_before_sync" + }, + { + [] (const CommandLine& cl, MYSQL* admin) -> int { + return update_variable_val(cl, admin, "admin", "admin-refresh_interval"); + }, + "admin_variables", + "admin-cluster_admin_variables_diffs_before_sync" + }, + // TODO: LDAP pluging currently not loaded for this test + // { + // update_ldap_variables, + // "proxysql_servers", + // "admin-cluster_proxysql_servers_diffs_before_sync", + // }, +}; + +using line_match_t = tuple; +enum LINE_MATCH_T { POS, LINE, MATCHES }; + +int open_file_and_seek_end(const string& f_path, fstream& f_logfile) { + f_logfile.open(f_path.c_str(), fstream::in | fstream::out); + + if (!f_logfile.is_open() || !f_logfile.good()) { + printf("Failed to open 'proxysql.log' file: { path: %s, error: %d }", f_path.c_str(), errno); + return EXIT_FAILURE; + } + + f_logfile.seekg(0, std::ios::end); + + return EXIT_SUCCESS; +} + +vector get_matching_lines(fstream& f_logfile, const string& line_regex) { + vector found_matches {}; + + string s_logline {}; + fstream::pos_type line_pos {}; + + while (std::getline(f_logfile, s_logline)) { + std::regex regex_err_line { line_regex }; + std::smatch match_results {}; + + if (std::regex_search(s_logline, match_results, regex_err_line)) { + found_matches.push_back({ f_logfile.tellg(), s_logline, match_results }); + } + } + + return found_matches; +} + +int wait_for_node_sync(MYSQL* admin, const vector queries, uint32_t timeout) { + uint waited = 0; + bool not_synced = false; + std::string failed_query {}; + + while (waited < timeout) { + not_synced = false; + + // Check that all the entries have been synced + for (const auto& query : queries) { + if (mysql_query(admin, query.c_str())) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(admin)); + return -1; + } + + MYSQL_RES* myres = mysql_store_result(admin); + MYSQL_ROW myrow = mysql_fetch_row(myres); + int row_value = -1; + + if (myrow && myrow[0]) { + row_value = std::atoi(myrow[0]); + } + + mysql_free_result(myres); + + if (row_value == 0) { + not_synced = true; + failed_query = query; + + diag("Not synced yet - Result: %d, Query: %s", row_value, query.c_str()); + break; + } + } + + if (not_synced) { + waited += 1; + sleep(1); + } else { + break; + } + } + + if (not_synced) { + diag("'wait_for_node_sync' timeout for query '%s'", failed_query.c_str()); + } + + return not_synced; +}; + +string fetch_remote_checksum(MYSQL* admin, const CommandLine& cl, const string& module) { + const char select_core_module_checksum_t[] { + "SELECT checksum FROM stats_proxysql_servers_checksums WHERE hostname='%s' AND port='%d' AND name='%s'" + }; + + cfmt_t select_checksum { cstr_format(select_core_module_checksum_t, cl.host, cl.admin_port, module.c_str()) }; + if (mysql_query(admin, select_checksum.str.c_str())) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(admin)); + return {}; + } + + string checksum {}; + + { + MYSQL_RES* myres = mysql_store_result(admin); + MYSQL_ROW myrow = mysql_fetch_row(myres); + + if (myrow && myrow[0]) { + checksum = myrow[0]; + } + + mysql_free_result(myres); + } + + return checksum; +}; + +string fetch_runtime_checksum(MYSQL* admin, const string& module) { + const char select_core_module_checksum_t[] { + "SELECT checksum FROM runtime_checksums_values WHERE name='%s'" + }; + + cfmt_t select_checksum { cstr_format(select_core_module_checksum_t, module.c_str()) }; + if (mysql_query(admin, select_checksum.str.c_str())) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(admin)); + return {}; + } + + string checksum {}; + + { + MYSQL_RES* myres = mysql_store_result(admin); + MYSQL_ROW myrow = mysql_fetch_row(myres); + + if (myrow && myrow[0]) { + checksum = myrow[0]; + } + + mysql_free_result(myres); + } + + return checksum; +}; + +int check_module_checksums_sync( + MYSQL* admin, MYSQL* r_admin, const CommandLine& cl, const sync_payload_t& module_sync, int diffs_sync +) { + const char new_remote_checksum_query_t[] { + "SELECT count(*) FROM stats_proxysql_servers_checksums WHERE " + "hostname='%s' AND port='%d' AND name='%s' AND checksum!='%s'" + }; + const char synced_runtime_checksums_query_t[] { + "SELECT COUNT(*) FROM runtime_checksums_values WHERE name='%s' AND checksum='%s'" + }; + + // Store current remote checksum value + const string& module { module_sync.module }; + + // Checksum can not be present if we have just added the remote + uint32_t CHECKSUM_SYNC_TIMEOUT = 3; + + const char wait_remote_checksums_init_t[] { + "SELECT LENGTH(checksum) FROM stats_proxysql_servers_checksums WHERE " + "hostname='%s' AND port='%d' AND name='%s'" + }; + cfmt_t wait_remote_checksums_init { + cstr_format(wait_remote_checksums_init_t, cl.host, cl.admin_port, module.c_str()) + }; + + int checksum_present = wait_for_node_sync( r_admin, { wait_remote_checksums_init.str }, CHECKSUM_SYNC_TIMEOUT); + if (checksum_present) { + diag("No checksum (or zero) detected int the target remote server for module '%s'", module.c_str()); + return EXIT_FAILURE; + } + + string cur_remote_checksum { fetch_remote_checksum(r_admin, cl, module) }; + if (cur_remote_checksum.empty()) { + diag("Failed to fetch current fetch for module '%s'", module.c_str()); + return EXIT_FAILURE; + } + + // Open the error log and fetch the final position + const string r_stderr { string(cl.workdir) + "test_cluster_sync_config/cluster_sync_node_stderr.txt" }; + fstream s_logfile {}; + + int of_err = open_file_and_seek_end(r_stderr, s_logfile); + if (of_err != EXIT_SUCCESS) { return of_err; } + + // Perform update operation + int upd_res = module_sync.update_module_val(cl, admin); + if (upd_res) { + diag("Failed to perform the update operation for module '%s'", module.c_str()); + return EXIT_FAILURE; + } + + // Wait for new checksum to be detected + cfmt_t new_remote_checksum_query { + cstr_format(new_remote_checksum_query_t, cl.host, cl.admin_port, module.c_str(), cur_remote_checksum.c_str()) + }; + int sync_res = wait_for_node_sync(r_admin, { new_remote_checksum_query.str }, CHECKSUM_SYNC_TIMEOUT); + + // Fetch the new remote checksum after the synchronization + string new_remote_checksum { fetch_remote_checksum(r_admin, cl, module) }; + if (new_remote_checksum.empty()) { + diag("Failed to fetch current fetch for module '%s'", module.c_str()); + return EXIT_FAILURE; + } + + ok( + sync_res == 0 && cur_remote_checksum != new_remote_checksum, + "New checksum SHOULD be DETECTED and SYNCED for module '%s' - old: '%s', new: '%s'", + module.c_str(), cur_remote_checksum.c_str(), new_remote_checksum.c_str() + ); + + // Get the current diff_check for the new detected checksum + cfmt_t select_diff_check { + cstr_format( + "SELECT diff_check FROM stats_proxysql_servers_checksums WHERE name='%s' AND hostname='%s' AND port=%d AND checksum='%s'", + module.c_str(), cl.host, cl.admin_port, new_remote_checksum.c_str() + ) + }; + MYSQL_QUERY_T(r_admin, select_diff_check.str.c_str()); + int64_t cur_diff_check = fetch_single_int_res(r_admin); + if (cur_diff_check == -1) { + diag("Failed to fetch current 'diff_check' for module '%s'", module.c_str()); + return EXIT_FAILURE; + } + + // We automatically fails this test if the checksum isn't even detected + if (sync_res == 0) { + MYSQL_QUERY_T(r_admin, "SELECT variable_value FROM global_variables WHERE variable_name='admin-cluster_check_interval_ms'"); + int64_t cluster_check_interval_ms = fetch_single_int_res(r_admin); + if (cluster_check_interval_ms == -1) { + diag("Failed to fetch 'cluster_check_interval_ms'"); + return EXIT_FAILURE; + } + + const double cluster_check_interval_s = static_cast(cluster_check_interval_ms) / 1000; + const int SYNC_TIMEOUT = 5; + + // Check that configuration was properly applied by checking 'runtime_checksums' for module + cfmt_t synced_runtime_checksums_query { + cstr_format(synced_runtime_checksums_query_t, module.c_str(), new_remote_checksum.c_str()) + }; + int sync_res = wait_for_node_sync(r_admin, { synced_runtime_checksums_query.str }, SYNC_TIMEOUT); + string runtime_checksum { fetch_runtime_checksum(r_admin, module.c_str()) }; + + if (diffs_sync) { + // Check that error log has a new two new entries matching the exp 'diff_checks' + const string diff_check_regex { + "Cluster: detected a peer .* with " + module + " version \\d+, epoch \\d+, diff_check \\d+." + }; + vector new_matching_lines { get_matching_lines(s_logfile, diff_check_regex) }; + diag("Regex used find loglines: `%s`", diff_check_regex.c_str()); + + for (const line_match_t& line_match : new_matching_lines) { + diag( + "Found matching logline - pos: %ld, line: `%s`", + static_cast(std::get(line_match)), + std::get(line_match).c_str() + ); + } + + ok( + diffs_sync - 1 == new_matching_lines.size(), + "Expected to find 'diff_checks minus one' loglines matching regex - diff_checks: %d, found_lines: %ld", + diffs_sync, new_matching_lines.size() + ); + + ok( + sync_res == 0 && new_remote_checksum == runtime_checksum, + "Config SHOULD be fetched and synced checksum MATCH runtime - detected: %s, runtime: %s", + new_remote_checksum.c_str(), runtime_checksum.c_str() + ); + } else { + const string no_syncing_regex { + "Cluster: detected a new checksum for " + module + " from peer .*:\\d+, version \\d+, epoch \\d+, checksum .*." + " Not syncing due to '" + module_sync.sync_variable + "=0'" + }; + + vector new_matching_lines { get_matching_lines(s_logfile, no_syncing_regex) }; + diag("Regex used find loglines: `%s`", no_syncing_regex.c_str()); + + for (const line_match_t& line_match : new_matching_lines) { + diag( + "Found matching logline - pos: %ld, line: `%s`", + static_cast(std::get(line_match)), + std::get(line_match).c_str() + ); + } + + ok( + new_matching_lines.size() == 1, + "Expected to find ONE logline matching regex - diff_checks: %d, found_lines: %ld", + diffs_sync, new_matching_lines.size() + ); + + ok( + sync_res == 1 && new_remote_checksum != runtime_checksum, + "Config SHOULDN'T be fetched and synced checksum DON'T MATCH runtime - detected: %s, runtime: %s", + new_remote_checksum.c_str(), runtime_checksum.c_str() + ); + + // Check that 'diff_check' increased + MYSQL_QUERY_T(r_admin, select_diff_check.str.c_str()); + int64_t new_diff_check = fetch_single_int_res(r_admin); + + ok( + (new_diff_check - cur_diff_check) >= (SYNC_TIMEOUT - 1) / cluster_check_interval_s, + "There needs to be at least a difference of '%lf' in diff_check - old: %ld, new: %ld", + (SYNC_TIMEOUT - 1) / cluster_check_interval_s, cur_diff_check, new_diff_check + ); + + diag("Enabling sync for module '%s'", module.c_str()); + + // TODO: Re-enable the module and check that sync takes place + MYSQL_QUERY_T(r_admin, string {"SET " + module_sync.sync_variable + "=" + std::to_string(3)}.c_str()); + MYSQL_QUERY_T(r_admin, "LOAD ADMIN VARIABLES TO RUNTIME"); + + diag("Check that sync takes place with 'admin_variables' module exception (due to newer checksum)"); + + // Wait for sync to take place and fetch the new runtime_checksum + cfmt_t synced_runtime_checksums_query { + cstr_format(synced_runtime_checksums_query_t, module.c_str(), new_remote_checksum.c_str()) + }; + int sync_res = wait_for_node_sync(r_admin, { synced_runtime_checksums_query.str }, SYNC_TIMEOUT); + string runtime_checksum { fetch_runtime_checksum(r_admin, module.c_str()) }; + + if (module != "admin_variables") { + ok( + sync_res == 0 && new_remote_checksum == runtime_checksum, + "Config SHOULD be fetched and synced checksum MATCH runtime - detected: %s, runtime: %s", + new_remote_checksum.c_str(), runtime_checksum.c_str() + ); + } else { + ok( + sync_res == 1 && new_remote_checksum != runtime_checksum, + "Config SHOULDN'T be fetched and synced checksum DON'T MATCH runtime - detected: %s, runtime: %s", + new_remote_checksum.c_str(), runtime_checksum.c_str() + ); + } + } + } + + return EXIT_SUCCESS; +} + +int check_modules_checksums_sync(MYSQL* admin, MYSQL* r_admin, const CommandLine& cl) { + const int module_diffs_sync = 2; + + for (const sync_payload_t& sync_payload : module_sync_payloads) { + const string set_query { "SET " + sync_payload.sync_variable + "=" + std::to_string(module_diffs_sync) }; + MYSQL_QUERY_T(r_admin, set_query.c_str()); + } + MYSQL_QUERY_T(r_admin, "LOAD ADMIN VARIABLES TO RUNTIME"); + + printf("\n"); + diag("Start test with sync Enabled for all modules"); + + for (const sync_payload_t& sync_payload : module_sync_payloads) { + int check_sync = check_module_checksums_sync(admin, r_admin, cl, sync_payload, module_diffs_sync); + if (check_sync) { + diag("Enabled sync test failed for module '%s'. Aborting further testing.", sync_payload.module.c_str()); + return EXIT_FAILURE; + } + } + + for (size_t dis_module = 0; dis_module < module_sync_payloads.size(); dis_module++) { + printf("\n"); + diag("Start test with sync Disabled for module '%s'", module_sync_payloads[dis_module].module.c_str()); + + for (const sync_payload_t& sync_payload : module_sync_payloads) { + const string set_query { "SET " + sync_payload.sync_variable + "=" + std::to_string(module_diffs_sync) }; + MYSQL_QUERY_T(r_admin, set_query.c_str()); + } + MYSQL_QUERY_T(r_admin, "LOAD ADMIN VARIABLES TO RUNTIME"); + + const string& module_sync_var { module_sync_payloads[dis_module].sync_variable }; + MYSQL_QUERY_T(r_admin, string {"SET " + module_sync_var + "=0"}.c_str()); + MYSQL_QUERY_T(r_admin, "LOAD ADMIN VARIABLES TO RUNTIME"); + + for (size_t j = 0; j < module_sync_payloads.size(); j++) { + const sync_payload_t& sync_payload = module_sync_payloads[j]; + const int diffs_sync = j == dis_module ? 0 : module_diffs_sync; + + int check_sync = check_module_checksums_sync(admin, r_admin, cl, sync_payload, diffs_sync); + if (check_sync) { + if (diffs_sync) { + diag("Enabled sync test failed for module '%s'. Aborting further testing.", sync_payload.module.c_str()); + } else { + diag("Disabled sync test failed for module '%s'. Aborting further testing.", sync_payload.module.c_str()); + } + return EXIT_FAILURE; + } + } + } + + return EXIT_SUCCESS; +} + int main(int, char**) { int res = 0; CommandLine cl; @@ -401,7 +940,15 @@ int main(int, char**) { return EXIT_FAILURE; } - plan(15); + const size_t num_payloads = module_sync_payloads.size(); + plan( + // Sync tests by values + 15 + + // All modules enabled sync checksum tests + num_payloads * 3 + + // Module with disabled sync checksum tests + (num_payloads + ((num_payloads-1) * 3)) * 5 + ); const std::string fmt_config_file = std::string(cl.workdir) + "test_cluster_sync_config/test_cluster_sync.cnf"; @@ -460,21 +1007,23 @@ int main(int, char**) { // Launch proxysql with cluster config std::thread proxy_replica_th([&save_proxy_stderr, &cl] () { - const std::string cluster_sync_node_stderr { - std::string(cl.workdir) + "test_cluster_sync_config/cluster_sync_node_stderr.txt" - }; + const string replica_stderr { string(cl.workdir) + "test_cluster_sync_config/cluster_sync_node_stderr.txt" }; const std::string proxysql_db = std::string(cl.workdir) + "test_cluster_sync_config/proxysql.db"; const std::string stats_db = std::string(cl.workdir) + "test_cluster_sync_config/proxysql_stats.db"; const std::string fmt_config_file = std::string(cl.workdir) + "test_cluster_sync_config/test_cluster_sync.cnf"; std::string proxy_stdout {}; std::string proxy_stderr {}; - int exec_res = wexecvp( - std::string(cl.workdir) + "../../../src/proxysql", { "-f", "-M", "-c", fmt_config_file.c_str() }, {}, - proxy_stdout, proxy_stderr - ); + const string proxy_binary_path { string { cl.workdir } + "../../../src/proxysql" }; - ok(exec_res == 0, "proxysql cluster node should execute and shutdown nicely. 'wexecvp' result was: %d", exec_res); + const string proxy_command { + proxy_binary_path + " -f -M -c " + fmt_config_file + " > " + replica_stderr + " 2>&1" + }; + + diag("Launching replica ProxySQL via 'system' with command: `%s`", proxy_command.c_str()); + int exec_res = system(proxy_command.c_str()); + + ok(exec_res == 0, "proxysql cluster node should execute and shutdown nicely. 'system' result was: %d", exec_res); // In case of error place in log the reason if (exec_res || save_proxy_stderr.load()) { @@ -485,12 +1034,6 @@ int main(int, char**) { } } - // Always log child process output to file - std::ofstream error_log_file {}; - error_log_file.open(cluster_sync_node_stderr); - error_log_file << proxy_stderr; - error_log_file.close(); - remove(proxysql_db.c_str()); remove(stats_db.c_str()); }); @@ -1744,6 +2287,16 @@ int main(int, char**) { MYSQL_QUERY__(proxy_admin, "LOAD ADMIN VARIABLES TO RUNTIME"); } + sleep(2); + + // Check sync disable via 'admin-cluster_*_sync' variables + { + int checksum_sync_res = check_modules_checksums_sync(proxy_admin, r_proxy_admin, cl); + if (checksum_sync_res != EXIT_SUCCESS) { + goto cleanup; + } + } + cleanup: // Teardown config