Refactor several TAP tests utilities

- Simplified and unified condition waiting checks.
- Added new helper function for fetching cluster core nodes.
pull/4588/head
Javier Jaramago Fernández 2 years ago
parent a0841714e5
commit cca2a88fa4

@ -1741,42 +1741,49 @@ pair<int,pool_state_t> fetch_conn_stats(MYSQL* admin, const vector<uint32_t> hgs
}
}
int wait_for_cond(MYSQL* mysql, const std::string& query, uint32_t timeout) {
int result = EXIT_FAILURE;
int check_cond(MYSQL* mysql, const string& q) {
diag("Checking condition '%s' in ('%s':%d)", q.c_str(), mysql->host, mysql->port);
auto start = std::chrono::system_clock::now();
std::chrono::duration<double> elapsed {};
int rc = mysql_query(mysql, q.c_str());
int res = 1;
while (elapsed.count() < timeout && result == EXIT_FAILURE) {
int rc = mysql_query(mysql, query.c_str());
fprintf(
stderr, "# %s: Waiting for condition '%s' in ('%s':%d)\n",
get_formatted_time().c_str(), query.c_str(), mysql->host, mysql->port
);
if (rc == 0) {
MYSQL_RES* myres = mysql_store_result(mysql);
if (rc == EXIT_SUCCESS) {
MYSQL_RES* myres = mysql_store_result(mysql);
if (myres) {
uint32_t field_num = mysql_num_fields(myres);
uint32_t row_num = mysql_num_rows(myres);
if (myres) {
uint32_t field_num = mysql_num_fields(myres);
uint32_t row_num = mysql_num_rows(myres);
if (field_num == 1 && row_num == 1) {
MYSQL_ROW row = mysql_fetch_row(myres);
if (field_num == 1 && row_num == 1) {
MYSQL_ROW myrow = mysql_fetch_row(myres);
if (row && strcasecmp("TRUE", row[0]) == 0) {
result = EXIT_SUCCESS;
}
if (myrow && strcasecmp("TRUE", myrow[0]) == 0) {
res = 0;
} else if (myrow && atoi(myrow[0]) >= 1) {
res = 0;
}
}
}
} else {
diag("Check failed with error '%s'", mysql_error(mysql));
res = -1;
}
mysql_free_result(myres);
return res;
}
if (result == EXIT_SUCCESS) {
break;
}
}
} else {
diag("Condition query failed with error: ('%d','%s')", mysql_errno(mysql), mysql_error(mysql));
result = EXIT_FAILURE;
int wait_for_cond(MYSQL* mysql, const string& q, uint32_t to) {
diag("Waiting for condition '%s' in ('%s':%d)", q.c_str(), mysql->host, mysql->port);
int result = 1;
std::chrono::duration<double> elapsed {};
auto start = std::chrono::system_clock::now();
while (elapsed.count() < to && result == EXIT_FAILURE) {
result = check_cond(mysql, q);
if (result == 0 || result == -1) {
break;
}
@ -1789,6 +1796,74 @@ int wait_for_cond(MYSQL* mysql, const std::string& query, uint32_t timeout) {
return result;
}
vector<check_res_t> wait_for_conds(MYSQL* mysql, const vector<string>& qs, uint32_t to) {
diag("Waiting multiple conditions in ('%s':%d):", mysql->host, mysql->port);
for (const string& q : qs) {
diag(" - cond: '%s'", q.c_str());
}
std::chrono::duration<double> elapsed {};
vector<check_res_t> res {};
std::transform(qs.begin(), qs.end(), std::back_inserter(res),
[] (const string& q) {
return check_res_t { 1, q };
}
);
auto start = std::chrono::system_clock::now();
while (elapsed.count() < to) {
int chk_res = 0;
for (std::size_t i = 0; i < qs.size(); i++) {
chk_res = check_cond(mysql, qs[i]);
if (chk_res == -1) {
diag("Error during query. Aborting further checks");
res[i].first = -1;
break;
} else if (chk_res == 0) {
res[i].first = 0;
}
}
int acc = std::accumulate(res.begin(), res.end(), size_t(0),
[] (size_t acc, const check_res_t& p) -> size_t {
if (p.first == 0) {
return acc + 1;
} else {
return acc;
}
});
if (acc == qs.size() || chk_res == -1) {
break;
} else {
usleep(500 * 1000);
auto it_end = std::chrono::system_clock::now();
elapsed = it_end - start;
}
}
return res;
}
int proc_wait_checks(const vector<check_res_t>& chks) {
int res = 0;
for (const check_res_t& r : chks) {
if (r.first == -1) {
res = -1;
diag("Waiting check FAILED to execute '%s'", r.second.c_str());
} else if (r.first == 1) {
res = res == 0 ? 1 : res;
diag("Waiting check TIMEOUT '%s'", r.second.c_str());
}
}
return res;
}
void check_conn_count(MYSQL* admin, const string& conn_type, uint32_t conn_num, int32_t hg) {
const string hg_s { to_string(hg) };
const string conn_num_s { to_string(conn_num) };
@ -1858,8 +1933,67 @@ void check_query_count(MYSQL* admin, vector<uint32_t> queries, uint32_t hg) {
}
};
const char* get_env_str(const char* envname, const char* envdefault) {
pair<int,vector<srv_addr_t>> fetch_cluster_nodes(MYSQL* admin, bool dump_fetch) {
int rc = mysql_query_t(admin, "SELECT hostname,port FROM proxysql_servers");
if (rc) { return { static_cast<int>(mysql_errno(admin)), {} }; }
MYSQL_RES* myres = mysql_store_result(admin);
if (myres == NULL) {
diag("Storing resultset failed error:%s", mysql_error(admin));
return { static_cast<int>(mysql_errno(admin)), {} };
}
if (dump_fetch) {
const string res_table { dump_as_table(myres) };
diag("Dumping fetched cluster nodes:");
printf("%s", res_table.c_str());
}
vector<mysql_res_row> nodes_rows { extract_mysql_rows(myres) };
mysql_free_result(myres);
vector<srv_addr_t> nodes {};
std::transform(nodes_rows.begin(), nodes_rows.end(), std::back_inserter(nodes),
[] (const mysql_res_row& row) {
return srv_addr_t { row[0], std::atoi(row[1].c_str()) };
}
);
return { 0, nodes };
}
int check_nodes_sync(
const CommandLine& cl, const vector<srv_addr_t>& nodes, const string& check, uint32_t to
) {
for (const auto& node : nodes) {
MYSQL* admin = mysql_init(NULL);
if (
!mysql_real_connect(
admin, node.host.c_str(), cl.admin_username, cl.admin_password, NULL, node.port, NULL, 0
)
) {
diag("File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(admin));
return EXIT_FAILURE;
}
const vector<check_res_t> wres { wait_for_conds(admin, { check }, to) };
int node_sync = proc_wait_checks(wres);
if (node_sync != EXIT_SUCCESS) {
const string err { "Node '" + node.host + ":" + std::to_string(node.port) + "' sync timed out" };
diag("File %s, line %d, Error: %s\n", __FILE__, __LINE__, err.c_str());
return EXIT_FAILURE;
}
mysql_close(admin);
}
return EXIT_SUCCESS;
}
const char* get_env_str(const char* envname, const char* envdefault) {
const char* envval = std::getenv(envname);
if (envval != NULL)
@ -1869,13 +2003,11 @@ const char* get_env_str(const char* envname, const char* envdefault) {
};
int get_env_int(const char* envname, int envdefault) {
const char* envval = std::getenv(envname);
int res = envdefault;
if (envval != NULL)
res = strtol(envval, NULL, 0);
// diag("%s: %s='%s' >>> %d", __FUNCTION__, envname, envval, res);
return res;
};
@ -1903,7 +2035,5 @@ bool get_env_bool(const char* envname, bool envdefault) {
}
}
// diag("%s: %s='%s' >>> %d", __FUNCTION__, envname, envval, res);
return (bool) res;
};

@ -100,8 +100,7 @@ int mysql_query_t__(MYSQL* mysql, const char* query, const char* f, int ln, cons
#define MYSQL_QUERY_T(mysql, query) \
do { \
const std::string time { get_formatted_time() }; \
fprintf(stderr, "# %s: Issuing query '%s' to ('%s':%d)\n", time.c_str(), query, mysql->host, mysql->port); \
diag("Issuing query '%s' to ('%s':%d)", query, mysql->host, mysql->port); \
if (mysql_query(mysql, query)) { \
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(mysql)); \
return EXIT_FAILURE; \
@ -231,7 +230,7 @@ std::string get_ext_val_err(MYSQL* mysql, const ext_val_t<T>& ext_val) {
} else if (ext_val.err == -2) {
return "Failed to parse response value '" + ext_val.str + "'";
} else {
return "Query failed with error '" + std::string { mysql_error(mysql) } + "'";
return std::string { mysql_error(mysql) };
}
}
@ -731,11 +730,56 @@ std::pair<int,pool_state_t> fetch_conn_stats(MYSQL* admin, const std::vector<uin
*/
int wait_for_cond(MYSQL* mysql, const std::string& query, uint32_t timeout);
using check_res_t = std::pair<int,std::string>;
/**
* @brief Waits for multiple conditions to take place before returning.
* @param mysql Already oppened connection in which to execute the queries.
* @param qs Conditions represented as queries; must pass 'check_cond' requirements.
* @param to Timeout in which all the conditions should be accomplished.
* @return Vector of pairs of shape '{err, check}'.
*/
std::vector<check_res_t> wait_for_conds(MYSQL* mysql, const std::vector<std::string>& qs, uint32_t to);
/**
* @brief Reduces a vector of 'check_res_t' to either success or failure.
* @param chks Vector to be fold into single value.
* @return -1 in case a check failed to execute, 1 if any check timedout, 0 for success.
*/
int proc_wait_checks(const std::vector<check_res_t>& chks);
/**
* @brief Encapsulates a server address.
*/
struct srv_addr_t {
const std::string host;
const int port;
};
// Helpers using 'wait_for_cond' on 'stats_mysql_connection'
void check_conn_count(MYSQL* admin, const std::string& conn_type, uint32_t conn_num, int32_t hg=-1);
void check_query_count(MYSQL* admin, uint32_t queries, uint32_t hg);
void check_query_count(MYSQL* admin, std::vector<uint32_t> queries, uint32_t hg);
/**
* @brief Fetches the ProxySQL nodes configured in the supplied instance.
* @param cl Parameters for performing the connection to the instance.
* @return Pair of shape '{err, {srv_addr}}'.
*/
std::pair<int,std::vector<srv_addr_t>> fetch_cluster_nodes(MYSQL* admin, bool dump_fetch=false);
/**
* @brief Helper function that waits for a check in all the supplied nodes.
* @param cl Used for credentials to open conns to the nodes.
* @param nodes The nodes addresses in which to perform the checks.
* @param check The check itself to be performed in all the nodes. Must pass 'check_cond' requirements.
* @param to Timeout for synchronization to take place.
* @return 0 in case of success, 1 in case of timeout, and -1 in case of check failure.
*/
int check_nodes_sync(
const CommandLine& cl, const std::vector<srv_addr_t>& nodes, const std::string& check, uint32_t to
);
/**
* @brief fetches and converts env var value to str/int/bool if possible otherwise uses default
* @details helper function for fetching str/int/bool from env

@ -24,20 +24,16 @@
*/
#include <unistd.h>
#include <signal.h>
#include <pthread.h>
#include <stdio.h>
#include <time.h>
#include <errno.h>
#include <atomic>
#include <vector>
#include <string>
#include <thread>
#include <iostream>
#include <fstream>
#include <functional>
#include <regex>
#include <utility>
#include "libconfig.h"
@ -53,7 +49,9 @@
#include "command_line.h"
#include "utils.h"
using std::pair;
using std::string;
using std::vector;
#define MYSQL_QUERY__(mysql, query) \
do { \
@ -132,83 +130,6 @@ uint64_t mysql_servers_raw_checksum(MYSQL_RES* resultset) {
return res_hash;
}
/**
* @brief Helper function to verify that the sync of a table (or variable) have been performed.
*
* @param r_proxy_admin An already opened connection to ProxySQL.
* @param queries Queries to be executed that should return a **non-zero** value after the sync has taken place.
* @param sync_timeout Timeout for the sync to happen.
*
* @return EXIT_SUCCESS in case of success, otherwise:
* - '-1' if a query against Admin fails to be performed (failure is logged).
* - '-2' if timeout expired without sync being completed.
*/
int sync_checker(MYSQL* r_proxy_admin, const std::vector<std::string>& queries, uint32_t sync_timeout) {
bool not_synced_query = false;
uint waited = 0;
while (waited < sync_timeout) {
not_synced_query = false;
// Check that all the entries have been synced
for (const auto& query : queries) {
int q_res = mysql_query(r_proxy_admin, query.c_str());
if (q_res != EXIT_SUCCESS) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(r_proxy_admin));
return -1;
}
MYSQL_RES* proxysql_servers_res = mysql_store_result(r_proxy_admin);
MYSQL_ROW row = mysql_fetch_row(proxysql_servers_res);
int row_value = atoi(row[0]);
mysql_free_result(proxysql_servers_res);
if (row_value == 0) {
not_synced_query = true;
break;
}
}
if (not_synced_query) {
waited += 1;
sleep(1);
} else {
break;
}
}
if (not_synced_query) {
return -2;
} else {
return EXIT_SUCCESS;
}
}
int check_nodes_sync(
const CommandLine& cl, const std::vector<mysql_res_row>& core_nodes, const std::string& check_query, uint32_t sync_timeout
) {
for (const auto& node : core_nodes) {
const std::string host { node[0] };
const int port = std::stol(node[1]);
MYSQL* c_node_admin = mysql_init(NULL);
if (!mysql_real_connect(c_node_admin, host.c_str(), cl.admin_username, cl.admin_password, NULL, port, NULL, 0)) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(c_node_admin));
return EXIT_FAILURE;
}
int not_synced = sync_checker(c_node_admin, { check_query }, sync_timeout);
if (not_synced != EXIT_SUCCESS) {
const std::string err_msg { "Node '" + host + ":" + std::to_string(port) + "' sync timed out" };
fprintf(stderr, "File %s, line %d, Error: `%s`\n", __FILE__, __LINE__, err_msg.c_str());
return EXIT_FAILURE;
}
}
return EXIT_SUCCESS;
}
int insert_mysql_servers_records(MYSQL* proxy_admin, const std::vector<mysql_server_tuple>& insert_mysql_servers_values,
const std::vector<replication_hostgroups_tuple>& insert_replication_hostgroups_values) {
@ -860,15 +781,15 @@ int main(int, char**) {
// 2. Remove primary from Core nodes
MYSQL_QUERY(proxy_admin, "DELETE FROM proxysql_servers WHERE hostname=='127.0.0.1' AND PORT==6032");
MYSQL_QUERY(proxy_admin, "LOAD PROXYSQL SERVERS TO RUNTIME");
MYSQL_QUERY(proxy_admin, "SELECT hostname,port FROM proxysql_servers");
MYSQL_RES* my_res = mysql_store_result(proxy_admin);
std::vector<mysql_res_row> core_nodes { extract_mysql_rows(my_res) };
mysql_free_result(my_res);
pair<int,vector<srv_addr_t>> core_nodes { fetch_cluster_nodes(proxy_admin) };
if (core_nodes.first) { return EXIT_FAILURE; }
// 2.1 If core nodes are not reachable, assume no cluster is running; make test gracefully exit
if (core_nodes.size()) {
const string host { core_nodes[0][0] };
const int port = std::stol(core_nodes[0][1]);
if (core_nodes.second.size()) {
const string host { core_nodes.second[0].host };
const int port { core_nodes.second[0].port };
MYSQL* c_node_admin = mysql_init(NULL);
if (!mysql_real_connect(c_node_admin, host.c_str(), cl.admin_username, cl.admin_password, NULL, port, NULL, 0)) {
@ -894,7 +815,7 @@ int main(int, char**) {
check_no_primary_query, cl.host, cl.admin_port
);
int check_res = check_nodes_sync(cl, core_nodes, check_no_primary_query, SYNC_TIMEOUT);
int check_res = check_nodes_sync(cl, core_nodes.second, check_no_primary_query, SYNC_TIMEOUT);
if (check_res != EXIT_SUCCESS) { return EXIT_FAILURE; }
// 4. Remove all current servers from primary instance (only secondary sync matters)
@ -1048,9 +969,9 @@ cleanup:
insert_query, cl.host, cl.admin_port
);
for (const auto& row : core_nodes) {
const std::string host { row[0] };
const int port = std::stol(row[1]);
for (const auto& node : core_nodes.second) {
const std::string host { node.host };
const int port = node.port;
MYSQL* c_node_admin = mysql_init(NULL);
diag("RESTORING: Inserting into node '%s:%d'", host.c_str(), port);
@ -1084,7 +1005,7 @@ cleanup:
);
// Wait for the other nodes to sync ProxySQL servers to include Primary
int check_res = check_nodes_sync(cl, core_nodes, check_no_primary_query, SYNC_TIMEOUT);
int check_res = check_nodes_sync(cl, core_nodes.second, check_no_primary_query, SYNC_TIMEOUT);
if (check_res != EXIT_SUCCESS) { return EXIT_FAILURE; }
// Recover the old ProxySQL servers from backup in primary

@ -1,8 +1,8 @@
#include <unistd.h>
#include <atomic>
#include <vector>
#include <string>
#include <utility>
#include <vector>
#include "mysql.h"
#include "tap.h"
@ -15,6 +15,9 @@
//#define BACKEND_SERVER_USER "root"
//#define BACKEND_SERVER_PASS "root"
using std::vector;
using std::pair;
#define MYSQL_QUERY__(mysql, query) \
do { \
if (mysql_query(mysql, query)) { \
@ -49,87 +52,6 @@ __exit:
return mysql;
}
/**
* @brief Helper function to verify that the sync of a table (or variable) have been performed.
*
* @param r_proxy_admin An already opened connection to ProxySQL.
* @param queries Queries to be executed that should return a **non-zero** value after the sync has taken place.
* @param sync_timeout Timeout for the sync to happen.
*
* @return EXIT_SUCCESS in case of success, otherwise:
* - '-1' if a query against Admin fails to be performed (failure is logged).
* - '-2' if timeout expired without sync being completed.
*/
int sync_checker(MYSQL* r_proxy_admin, const std::vector<std::string>& queries, uint32_t sync_timeout) {
bool not_synced_query = false;
uint waited = 0;
while (waited < sync_timeout) {
not_synced_query = false;
// Check that all the entries have been synced
for (const auto& query : queries) {
int q_res = mysql_query(r_proxy_admin, query.c_str());
if (q_res != EXIT_SUCCESS) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(r_proxy_admin));
return -1;
}
MYSQL_RES* proxysql_servers_res = mysql_store_result(r_proxy_admin);
MYSQL_ROW row = mysql_fetch_row(proxysql_servers_res);
int row_value = atoi(row[0]);
mysql_free_result(proxysql_servers_res);
if (row_value == 0) {
not_synced_query = true;
break;
}
}
if (not_synced_query) {
waited += 1;
sleep(1);
} else {
break;
}
}
if (not_synced_query) {
return -2;
} else {
return EXIT_SUCCESS;
}
}
int check_nodes_sync(
const CommandLine& cl, const std::vector<mysql_res_row>& core_nodes, const std::string& check_query, uint32_t sync_timeout
) {
int ret_status = EXIT_FAILURE;
for (const auto& node : core_nodes) {
const std::string host { node[0] };
const int port = std::stol(node[1]);
MYSQL* c_node_admin = mysql_init(NULL);
if (!mysql_real_connect(c_node_admin, host.c_str(), cl.admin_username, cl.admin_password, NULL, port, NULL, 0)) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(c_node_admin));
goto __exit;
}
int not_synced = sync_checker(c_node_admin, { check_query }, sync_timeout);
if (not_synced != EXIT_SUCCESS) {
const std::string err_msg { "Node '" + host + ":" + std::to_string(port) + "' sync timed out" };
fprintf(stderr, "File %s, line %d, Error: `%s`\n", __FILE__, __LINE__, err_msg.c_str());
goto __exit;
}
}
ret_status = EXIT_SUCCESS;
__exit:
return ret_status;
}
int insert_mysql_servers_records(MYSQL* proxy_admin, const std::vector<mysql_server_tuple>& insert_mysql_servers_values,
const std::vector<replication_hostgroups_tuple>& insert_replication_hostgroups_values) {
@ -535,7 +457,7 @@ cleanup:
int test_read_only_offline_hard_servers(MYSQL* proxy_admin, const CommandLine& cl, bool isolate_primary_node) {
std::vector<mysql_res_row> core_nodes;
pair<int,vector<srv_addr_t>> core_nodes;
std::string check_no_primary_query;
if (isolate_primary_node) {
@ -553,10 +475,9 @@ int test_read_only_offline_hard_servers(MYSQL* proxy_admin, const CommandLine& c
// 2. Remove primary from Core nodes
MYSQL_QUERY__(proxy_admin, "DELETE FROM proxysql_servers WHERE hostname=='127.0.0.1' AND PORT==6032");
MYSQL_QUERY__(proxy_admin, "LOAD PROXYSQL SERVERS TO RUNTIME");
MYSQL_QUERY__(proxy_admin, "SELECT hostname,port FROM proxysql_servers");
MYSQL_RES* my_res = mysql_store_result(proxy_admin);
core_nodes = { extract_mysql_rows(my_res) };
mysql_free_result(my_res);
core_nodes = fetch_cluster_nodes(proxy_admin);
if (core_nodes.first) { goto cleanup; }
// 3. Wait for all Core nodes to sync (confirm primary out of core nodes)
string_format(
@ -564,7 +485,7 @@ int test_read_only_offline_hard_servers(MYSQL* proxy_admin, const CommandLine& c
check_no_primary_query, cl.host, cl.admin_port
);
int check_res = check_nodes_sync(cl, core_nodes, check_no_primary_query, SYNC_TIMEOUT);
int check_res = check_nodes_sync(cl, core_nodes.second, check_no_primary_query, SYNC_TIMEOUT);
if (check_res != EXIT_SUCCESS) {
goto cleanup;
}
@ -601,9 +522,9 @@ cleanup:
insert_query, cl.host, cl.admin_port
);
for (const auto& row : core_nodes) {
const std::string host{ row[0] };
const int port = std::stol(row[1]);
for (const auto& row : core_nodes.second) {
const std::string host{ row.host };
const int port = row.port;
MYSQL* c_node_admin = mysql_init(NULL);
diag("RESTORING: Inserting into node '%s:%d'", host.c_str(), port);
@ -637,7 +558,7 @@ cleanup:
);
// Wait for the other nodes to sync ProxySQL servers to include Primary
int check_res = check_nodes_sync(cl, core_nodes, check_no_primary_query, SYNC_TIMEOUT);
int check_res = check_nodes_sync(cl, core_nodes.second, check_no_primary_query, SYNC_TIMEOUT);
if (check_res != EXIT_SUCCESS) { return EXIT_FAILURE; }
// Recover the old ProxySQL servers from backup in primary

Loading…
Cancel
Save