diff --git a/test/tap/tap/utils.cpp b/test/tap/tap/utils.cpp index 2e4383b3c..fe50ae581 100644 --- a/test/tap/tap/utils.cpp +++ b/test/tap/tap/utils.cpp @@ -1741,42 +1741,49 @@ pair fetch_conn_stats(MYSQL* admin, const vector hgs } } -int wait_for_cond(MYSQL* mysql, const std::string& query, uint32_t timeout) { - int result = EXIT_FAILURE; +int check_cond(MYSQL* mysql, const string& q) { + diag("Checking condition '%s' in ('%s':%d)", q.c_str(), mysql->host, mysql->port); - auto start = std::chrono::system_clock::now(); - std::chrono::duration elapsed {}; + int rc = mysql_query(mysql, q.c_str()); + int res = 1; - while (elapsed.count() < timeout && result == EXIT_FAILURE) { - int rc = mysql_query(mysql, query.c_str()); - fprintf( - stderr, "# %s: Waiting for condition '%s' in ('%s':%d)\n", - get_formatted_time().c_str(), query.c_str(), mysql->host, mysql->port - ); + if (rc == 0) { + MYSQL_RES* myres = mysql_store_result(mysql); - if (rc == EXIT_SUCCESS) { - MYSQL_RES* myres = mysql_store_result(mysql); - if (myres) { - uint32_t field_num = mysql_num_fields(myres); - uint32_t row_num = mysql_num_rows(myres); + if (myres) { + uint32_t field_num = mysql_num_fields(myres); + uint32_t row_num = mysql_num_rows(myres); - if (field_num == 1 && row_num == 1) { - MYSQL_ROW row = mysql_fetch_row(myres); + if (field_num == 1 && row_num == 1) { + MYSQL_ROW myrow = mysql_fetch_row(myres); - if (row && strcasecmp("TRUE", row[0]) == 0) { - result = EXIT_SUCCESS; - } + if (myrow && strcasecmp("TRUE", myrow[0]) == 0) { + res = 0; + } else if (myrow && atoi(myrow[0]) >= 1) { + res = 0; } + } + } + } else { + diag("Check failed with error '%s'", mysql_error(mysql)); + res = -1; + } - mysql_free_result(myres); + return res; +} - if (result == EXIT_SUCCESS) { - break; - } - } - } else { - diag("Condition query failed with error: ('%d','%s')", mysql_errno(mysql), mysql_error(mysql)); - result = EXIT_FAILURE; +int wait_for_cond(MYSQL* mysql, const string& q, uint32_t to) { + diag("Waiting for condition '%s' in ('%s':%d)", q.c_str(), mysql->host, mysql->port); + + int result = 1; + std::chrono::duration elapsed {}; + + auto start = std::chrono::system_clock::now(); + + while (elapsed.count() < to && result == EXIT_FAILURE) { + result = check_cond(mysql, q); + + if (result == 0 || result == -1) { break; } @@ -1789,6 +1796,74 @@ int wait_for_cond(MYSQL* mysql, const std::string& query, uint32_t timeout) { return result; } +vector wait_for_conds(MYSQL* mysql, const vector& qs, uint32_t to) { + diag("Waiting multiple conditions in ('%s':%d):", mysql->host, mysql->port); + for (const string& q : qs) { + diag(" - cond: '%s'", q.c_str()); + } + + std::chrono::duration elapsed {}; + + vector res {}; + std::transform(qs.begin(), qs.end(), std::back_inserter(res), + [] (const string& q) { + return check_res_t { 1, q }; + } + ); + auto start = std::chrono::system_clock::now(); + + while (elapsed.count() < to) { + int chk_res = 0; + + for (std::size_t i = 0; i < qs.size(); i++) { + chk_res = check_cond(mysql, qs[i]); + + if (chk_res == -1) { + diag("Error during query. Aborting further checks"); + res[i].first = -1; + break; + } else if (chk_res == 0) { + res[i].first = 0; + } + } + + int acc = std::accumulate(res.begin(), res.end(), size_t(0), + [] (size_t acc, const check_res_t& p) -> size_t { + if (p.first == 0) { + return acc + 1; + } else { + return acc; + } + }); + + if (acc == qs.size() || chk_res == -1) { + break; + } else { + usleep(500 * 1000); + auto it_end = std::chrono::system_clock::now(); + elapsed = it_end - start; + } + } + + return res; +} + +int proc_wait_checks(const vector& chks) { + int res = 0; + + for (const check_res_t& r : chks) { + if (r.first == -1) { + res = -1; + diag("Waiting check FAILED to execute '%s'", r.second.c_str()); + } else if (r.first == 1) { + res = res == 0 ? 1 : res; + diag("Waiting check TIMEOUT '%s'", r.second.c_str()); + } + } + + return res; +} + void check_conn_count(MYSQL* admin, const string& conn_type, uint32_t conn_num, int32_t hg) { const string hg_s { to_string(hg) }; const string conn_num_s { to_string(conn_num) }; @@ -1858,8 +1933,67 @@ void check_query_count(MYSQL* admin, vector queries, uint32_t hg) { } }; -const char* get_env_str(const char* envname, const char* envdefault) { +pair> fetch_cluster_nodes(MYSQL* admin, bool dump_fetch) { + int rc = mysql_query_t(admin, "SELECT hostname,port FROM proxysql_servers"); + if (rc) { return { static_cast(mysql_errno(admin)), {} }; } + + MYSQL_RES* myres = mysql_store_result(admin); + if (myres == NULL) { + diag("Storing resultset failed error:%s", mysql_error(admin)); + return { static_cast(mysql_errno(admin)), {} }; + } + + if (dump_fetch) { + const string res_table { dump_as_table(myres) }; + diag("Dumping fetched cluster nodes:"); + + printf("%s", res_table.c_str()); + } + + vector nodes_rows { extract_mysql_rows(myres) }; + mysql_free_result(myres); + + vector nodes {}; + std::transform(nodes_rows.begin(), nodes_rows.end(), std::back_inserter(nodes), + [] (const mysql_res_row& row) { + return srv_addr_t { row[0], std::atoi(row[1].c_str()) }; + } + ); + + return { 0, nodes }; +} +int check_nodes_sync( + const CommandLine& cl, const vector& nodes, const string& check, uint32_t to +) { + for (const auto& node : nodes) { + MYSQL* admin = mysql_init(NULL); + + if ( + !mysql_real_connect( + admin, node.host.c_str(), cl.admin_username, cl.admin_password, NULL, node.port, NULL, 0 + ) + ) { + diag("File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(admin)); + return EXIT_FAILURE; + } + + const vector wres { wait_for_conds(admin, { check }, to) }; + int node_sync = proc_wait_checks(wres); + + if (node_sync != EXIT_SUCCESS) { + const string err { "Node '" + node.host + ":" + std::to_string(node.port) + "' sync timed out" }; + diag("File %s, line %d, Error: %s\n", __FILE__, __LINE__, err.c_str()); + return EXIT_FAILURE; + } + + mysql_close(admin); + } + + return EXIT_SUCCESS; +} + +const char* get_env_str(const char* envname, const char* envdefault) { const char* envval = std::getenv(envname); if (envval != NULL) @@ -1869,13 +2003,11 @@ const char* get_env_str(const char* envname, const char* envdefault) { }; int get_env_int(const char* envname, int envdefault) { - const char* envval = std::getenv(envname); int res = envdefault; if (envval != NULL) res = strtol(envval, NULL, 0); -// diag("%s: %s='%s' >>> %d", __FUNCTION__, envname, envval, res); return res; }; @@ -1903,7 +2035,5 @@ bool get_env_bool(const char* envname, bool envdefault) { } } -// diag("%s: %s='%s' >>> %d", __FUNCTION__, envname, envval, res); - return (bool) res; }; diff --git a/test/tap/tap/utils.h b/test/tap/tap/utils.h index 3456c478b..e1c753f4f 100644 --- a/test/tap/tap/utils.h +++ b/test/tap/tap/utils.h @@ -100,8 +100,7 @@ int mysql_query_t__(MYSQL* mysql, const char* query, const char* f, int ln, cons #define MYSQL_QUERY_T(mysql, query) \ do { \ - const std::string time { get_formatted_time() }; \ - fprintf(stderr, "# %s: Issuing query '%s' to ('%s':%d)\n", time.c_str(), query, mysql->host, mysql->port); \ + diag("Issuing query '%s' to ('%s':%d)", query, mysql->host, mysql->port); \ if (mysql_query(mysql, query)) { \ fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(mysql)); \ return EXIT_FAILURE; \ @@ -231,7 +230,7 @@ std::string get_ext_val_err(MYSQL* mysql, const ext_val_t& ext_val) { } else if (ext_val.err == -2) { return "Failed to parse response value '" + ext_val.str + "'"; } else { - return "Query failed with error '" + std::string { mysql_error(mysql) } + "'"; + return std::string { mysql_error(mysql) }; } } @@ -731,11 +730,56 @@ std::pair fetch_conn_stats(MYSQL* admin, const std::vector; + +/** + * @brief Waits for multiple conditions to take place before returning. + * @param mysql Already oppened connection in which to execute the queries. + * @param qs Conditions represented as queries; must pass 'check_cond' requirements. + * @param to Timeout in which all the conditions should be accomplished. + * @return Vector of pairs of shape '{err, check}'. + */ +std::vector wait_for_conds(MYSQL* mysql, const std::vector& qs, uint32_t to); + +/** + * @brief Reduces a vector of 'check_res_t' to either success or failure. + * @param chks Vector to be fold into single value. + * @return -1 in case a check failed to execute, 1 if any check timedout, 0 for success. + */ +int proc_wait_checks(const std::vector& chks); + +/** + * @brief Encapsulates a server address. + */ +struct srv_addr_t { + const std::string host; + const int port; +}; + // Helpers using 'wait_for_cond' on 'stats_mysql_connection' void check_conn_count(MYSQL* admin, const std::string& conn_type, uint32_t conn_num, int32_t hg=-1); void check_query_count(MYSQL* admin, uint32_t queries, uint32_t hg); void check_query_count(MYSQL* admin, std::vector queries, uint32_t hg); +/** + * @brief Fetches the ProxySQL nodes configured in the supplied instance. + * @param cl Parameters for performing the connection to the instance. + * @return Pair of shape '{err, {srv_addr}}'. + */ +std::pair> fetch_cluster_nodes(MYSQL* admin, bool dump_fetch=false); + +/** + * @brief Helper function that waits for a check in all the supplied nodes. + * @param cl Used for credentials to open conns to the nodes. + * @param nodes The nodes addresses in which to perform the checks. + * @param check The check itself to be performed in all the nodes. Must pass 'check_cond' requirements. + * @param to Timeout for synchronization to take place. + * @return 0 in case of success, 1 in case of timeout, and -1 in case of check failure. + */ +int check_nodes_sync( + const CommandLine& cl, const std::vector& nodes, const std::string& check, uint32_t to +); + /** * @brief fetches and converts env var value to str/int/bool if possible otherwise uses default * @details helper function for fetching str/int/bool from env diff --git a/test/tap/tests/test_cluster_sync_mysql_servers-t.cpp b/test/tap/tests/test_cluster_sync_mysql_servers-t.cpp index 4a84270c2..6b4c8f4f9 100644 --- a/test/tap/tests/test_cluster_sync_mysql_servers-t.cpp +++ b/test/tap/tests/test_cluster_sync_mysql_servers-t.cpp @@ -24,20 +24,16 @@ */ #include -#include #include #include #include -#include #include #include #include #include #include -#include #include -#include #include #include "libconfig.h" @@ -53,7 +49,9 @@ #include "command_line.h" #include "utils.h" +using std::pair; using std::string; +using std::vector; #define MYSQL_QUERY__(mysql, query) \ do { \ @@ -132,83 +130,6 @@ uint64_t mysql_servers_raw_checksum(MYSQL_RES* resultset) { return res_hash; } - -/** - * @brief Helper function to verify that the sync of a table (or variable) have been performed. - * - * @param r_proxy_admin An already opened connection to ProxySQL. - * @param queries Queries to be executed that should return a **non-zero** value after the sync has taken place. - * @param sync_timeout Timeout for the sync to happen. - * - * @return EXIT_SUCCESS in case of success, otherwise: - * - '-1' if a query against Admin fails to be performed (failure is logged). - * - '-2' if timeout expired without sync being completed. - */ -int sync_checker(MYSQL* r_proxy_admin, const std::vector& queries, uint32_t sync_timeout) { - bool not_synced_query = false; - uint waited = 0; - - while (waited < sync_timeout) { - not_synced_query = false; - - // Check that all the entries have been synced - for (const auto& query : queries) { - int q_res = mysql_query(r_proxy_admin, query.c_str()); - if (q_res != EXIT_SUCCESS) { - fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(r_proxy_admin)); - return -1; - } - - MYSQL_RES* proxysql_servers_res = mysql_store_result(r_proxy_admin); - MYSQL_ROW row = mysql_fetch_row(proxysql_servers_res); - int row_value = atoi(row[0]); - mysql_free_result(proxysql_servers_res); - - if (row_value == 0) { - not_synced_query = true; - break; - } - } - - if (not_synced_query) { - waited += 1; - sleep(1); - } else { - break; - } - } - - if (not_synced_query) { - return -2; - } else { - return EXIT_SUCCESS; - } -} - -int check_nodes_sync( - const CommandLine& cl, const std::vector& core_nodes, const std::string& check_query, uint32_t sync_timeout -) { - for (const auto& node : core_nodes) { - const std::string host { node[0] }; - const int port = std::stol(node[1]); - - MYSQL* c_node_admin = mysql_init(NULL); - if (!mysql_real_connect(c_node_admin, host.c_str(), cl.admin_username, cl.admin_password, NULL, port, NULL, 0)) { - fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(c_node_admin)); - return EXIT_FAILURE; - } - - int not_synced = sync_checker(c_node_admin, { check_query }, sync_timeout); - if (not_synced != EXIT_SUCCESS) { - const std::string err_msg { "Node '" + host + ":" + std::to_string(port) + "' sync timed out" }; - fprintf(stderr, "File %s, line %d, Error: `%s`\n", __FILE__, __LINE__, err_msg.c_str()); - return EXIT_FAILURE; - } - } - - return EXIT_SUCCESS; -} - int insert_mysql_servers_records(MYSQL* proxy_admin, const std::vector& insert_mysql_servers_values, const std::vector& insert_replication_hostgroups_values) { @@ -860,15 +781,15 @@ int main(int, char**) { // 2. Remove primary from Core nodes MYSQL_QUERY(proxy_admin, "DELETE FROM proxysql_servers WHERE hostname=='127.0.0.1' AND PORT==6032"); MYSQL_QUERY(proxy_admin, "LOAD PROXYSQL SERVERS TO RUNTIME"); - MYSQL_QUERY(proxy_admin, "SELECT hostname,port FROM proxysql_servers"); - MYSQL_RES* my_res = mysql_store_result(proxy_admin); - std::vector core_nodes { extract_mysql_rows(my_res) }; - mysql_free_result(my_res); + + pair> core_nodes { fetch_cluster_nodes(proxy_admin) }; + if (core_nodes.first) { return EXIT_FAILURE; } // 2.1 If core nodes are not reachable, assume no cluster is running; make test gracefully exit - if (core_nodes.size()) { - const string host { core_nodes[0][0] }; - const int port = std::stol(core_nodes[0][1]); + if (core_nodes.second.size()) { + const string host { core_nodes.second[0].host }; + const int port { core_nodes.second[0].port }; + MYSQL* c_node_admin = mysql_init(NULL); if (!mysql_real_connect(c_node_admin, host.c_str(), cl.admin_username, cl.admin_password, NULL, port, NULL, 0)) { @@ -894,7 +815,7 @@ int main(int, char**) { check_no_primary_query, cl.host, cl.admin_port ); - int check_res = check_nodes_sync(cl, core_nodes, check_no_primary_query, SYNC_TIMEOUT); + int check_res = check_nodes_sync(cl, core_nodes.second, check_no_primary_query, SYNC_TIMEOUT); if (check_res != EXIT_SUCCESS) { return EXIT_FAILURE; } // 4. Remove all current servers from primary instance (only secondary sync matters) @@ -1048,9 +969,9 @@ cleanup: insert_query, cl.host, cl.admin_port ); - for (const auto& row : core_nodes) { - const std::string host { row[0] }; - const int port = std::stol(row[1]); + for (const auto& node : core_nodes.second) { + const std::string host { node.host }; + const int port = node.port; MYSQL* c_node_admin = mysql_init(NULL); diag("RESTORING: Inserting into node '%s:%d'", host.c_str(), port); @@ -1084,7 +1005,7 @@ cleanup: ); // Wait for the other nodes to sync ProxySQL servers to include Primary - int check_res = check_nodes_sync(cl, core_nodes, check_no_primary_query, SYNC_TIMEOUT); + int check_res = check_nodes_sync(cl, core_nodes.second, check_no_primary_query, SYNC_TIMEOUT); if (check_res != EXIT_SUCCESS) { return EXIT_FAILURE; } // Recover the old ProxySQL servers from backup in primary diff --git a/test/tap/tests/test_read_only_actions_offline_hard_servers-t.cpp b/test/tap/tests/test_read_only_actions_offline_hard_servers-t.cpp index 90380e20f..7ab4af208 100644 --- a/test/tap/tests/test_read_only_actions_offline_hard_servers-t.cpp +++ b/test/tap/tests/test_read_only_actions_offline_hard_servers-t.cpp @@ -1,8 +1,8 @@ - #include -#include #include #include +#include +#include #include "mysql.h" #include "tap.h" @@ -15,6 +15,9 @@ //#define BACKEND_SERVER_USER "root" //#define BACKEND_SERVER_PASS "root" +using std::vector; +using std::pair; + #define MYSQL_QUERY__(mysql, query) \ do { \ if (mysql_query(mysql, query)) { \ @@ -49,87 +52,6 @@ __exit: return mysql; } -/** - * @brief Helper function to verify that the sync of a table (or variable) have been performed. - * - * @param r_proxy_admin An already opened connection to ProxySQL. - * @param queries Queries to be executed that should return a **non-zero** value after the sync has taken place. - * @param sync_timeout Timeout for the sync to happen. - * - * @return EXIT_SUCCESS in case of success, otherwise: - * - '-1' if a query against Admin fails to be performed (failure is logged). - * - '-2' if timeout expired without sync being completed. - */ -int sync_checker(MYSQL* r_proxy_admin, const std::vector& queries, uint32_t sync_timeout) { - bool not_synced_query = false; - uint waited = 0; - - while (waited < sync_timeout) { - not_synced_query = false; - - // Check that all the entries have been synced - for (const auto& query : queries) { - int q_res = mysql_query(r_proxy_admin, query.c_str()); - if (q_res != EXIT_SUCCESS) { - fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(r_proxy_admin)); - return -1; - } - - MYSQL_RES* proxysql_servers_res = mysql_store_result(r_proxy_admin); - MYSQL_ROW row = mysql_fetch_row(proxysql_servers_res); - int row_value = atoi(row[0]); - mysql_free_result(proxysql_servers_res); - - if (row_value == 0) { - not_synced_query = true; - break; - } - } - - if (not_synced_query) { - waited += 1; - sleep(1); - } else { - break; - } - } - - if (not_synced_query) { - return -2; - } else { - return EXIT_SUCCESS; - } -} - -int check_nodes_sync( - const CommandLine& cl, const std::vector& core_nodes, const std::string& check_query, uint32_t sync_timeout -) { - int ret_status = EXIT_FAILURE; - - for (const auto& node : core_nodes) { - const std::string host { node[0] }; - const int port = std::stol(node[1]); - - MYSQL* c_node_admin = mysql_init(NULL); - if (!mysql_real_connect(c_node_admin, host.c_str(), cl.admin_username, cl.admin_password, NULL, port, NULL, 0)) { - fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(c_node_admin)); - goto __exit; - } - - int not_synced = sync_checker(c_node_admin, { check_query }, sync_timeout); - if (not_synced != EXIT_SUCCESS) { - const std::string err_msg { "Node '" + host + ":" + std::to_string(port) + "' sync timed out" }; - fprintf(stderr, "File %s, line %d, Error: `%s`\n", __FILE__, __LINE__, err_msg.c_str()); - goto __exit; - } - } - - ret_status = EXIT_SUCCESS; - -__exit: - return ret_status; -} - int insert_mysql_servers_records(MYSQL* proxy_admin, const std::vector& insert_mysql_servers_values, const std::vector& insert_replication_hostgroups_values) { @@ -535,7 +457,7 @@ cleanup: int test_read_only_offline_hard_servers(MYSQL* proxy_admin, const CommandLine& cl, bool isolate_primary_node) { - std::vector core_nodes; + pair> core_nodes; std::string check_no_primary_query; if (isolate_primary_node) { @@ -553,10 +475,9 @@ int test_read_only_offline_hard_servers(MYSQL* proxy_admin, const CommandLine& c // 2. Remove primary from Core nodes MYSQL_QUERY__(proxy_admin, "DELETE FROM proxysql_servers WHERE hostname=='127.0.0.1' AND PORT==6032"); MYSQL_QUERY__(proxy_admin, "LOAD PROXYSQL SERVERS TO RUNTIME"); - MYSQL_QUERY__(proxy_admin, "SELECT hostname,port FROM proxysql_servers"); - MYSQL_RES* my_res = mysql_store_result(proxy_admin); - core_nodes = { extract_mysql_rows(my_res) }; - mysql_free_result(my_res); + + core_nodes = fetch_cluster_nodes(proxy_admin); + if (core_nodes.first) { goto cleanup; } // 3. Wait for all Core nodes to sync (confirm primary out of core nodes) string_format( @@ -564,7 +485,7 @@ int test_read_only_offline_hard_servers(MYSQL* proxy_admin, const CommandLine& c check_no_primary_query, cl.host, cl.admin_port ); - int check_res = check_nodes_sync(cl, core_nodes, check_no_primary_query, SYNC_TIMEOUT); + int check_res = check_nodes_sync(cl, core_nodes.second, check_no_primary_query, SYNC_TIMEOUT); if (check_res != EXIT_SUCCESS) { goto cleanup; } @@ -601,9 +522,9 @@ cleanup: insert_query, cl.host, cl.admin_port ); - for (const auto& row : core_nodes) { - const std::string host{ row[0] }; - const int port = std::stol(row[1]); + for (const auto& row : core_nodes.second) { + const std::string host{ row.host }; + const int port = row.port; MYSQL* c_node_admin = mysql_init(NULL); diag("RESTORING: Inserting into node '%s:%d'", host.c_str(), port); @@ -637,7 +558,7 @@ cleanup: ); // Wait for the other nodes to sync ProxySQL servers to include Primary - int check_res = check_nodes_sync(cl, core_nodes, check_no_primary_query, SYNC_TIMEOUT); + int check_res = check_nodes_sync(cl, core_nodes.second, check_no_primary_query, SYNC_TIMEOUT); if (check_res != EXIT_SUCCESS) { return EXIT_FAILURE; } // Recover the old ProxySQL servers from backup in primary