test: refine PgSQL noise v2 and update integrated tests

- Optimized PgSQL Traffic v2 with database creation and improved setup logic.
- Implemented synchronized reporting and detailed failure tracking.
- Injected noise routines into multiple administrative and protocol tests.
- Adjusted test plans to accommodate background noise threads.
pull/5408/head
Rene Cannao 2 months ago
parent ceb89fb915
commit 09284faea4

@ -37,6 +37,14 @@ static void pg_noise_query(PGconn* conn, const char* query) {
if (res) PQclear(res);
}
// Helper to get string option from map
static std::string get_opt_str(const NoiseOptions& opt, const std::string& key, const std::string& default_val) {
if (opt.find(key) != opt.end()) {
return opt.at(key);
}
return default_val;
}
// Helper to get int option from map
static int get_opt_int(const NoiseOptions& opt, const std::string& key, int default_val) {
if (opt.find(key) != opt.end()) {
@ -83,7 +91,7 @@ int get_internal_noise_threads_count() {
// --- Standard Internal Noise Functions Implementation ---
void internal_noise_admin_pinger(const CommandLine& cl, const NoiseOptions& opt, std::atomic<bool>& stop) {
int interval_ms = get_opt_int(opt, "interval_ms", 500);
int interval_ms = get_opt_int(opt, "interval_ms", 200);
int max_retries = get_opt_int(opt, "max_retries", 5);
int retries = 0;
uint64_t total_pings = 0;
@ -306,10 +314,12 @@ void internal_noise_random_stats_poller(const CommandLine& cl, const NoiseOption
MYSQL* admin_my = mysql_init(NULL);
PGconn* admin_pg = NULL;
std::vector<std::string> my_tables = {"stats_mysql_query_digest", "stats_mysql_connection_pool", "stats_mysql_processlist", "stats_mysql_global", "stats_mysql_user_stats", "stats_mysql_query_rules", "stats_mysql_commands_counters"};
std::vector<std::string> pg_tables = {"stats_pgsql_query_digest", "stats_pgsql_connection_pool", "stats_pgsql_processlist", "stats_pgsql_global", "stats_pgsql_commands_counters"};
std::vector<std::string> stats_tables;
bool tables_fetched = false;
std::random_device rd;
std::mt19937 g(rd());
std::uniform_int_distribution<int> limit_dist(10, 1000);
while (!stop) {
bool my_ok = true;
@ -339,18 +349,49 @@ void internal_noise_random_stats_poller(const CommandLine& cl, const NoiseOption
register_noise_failure("Random Stats Poller");
break;
}
} else {
retries = 0;
std::shuffle(my_tables.begin(), my_tables.end(), g);
std::shuffle(pg_tables.begin(), pg_tables.end(), g);
std::this_thread::sleep_for(std::chrono::milliseconds(interval_ms));
continue;
}
retries = 0;
// Fetch tables once
if (!tables_fetched && my_ok) {
if (mysql_query(admin_my, "SHOW TABLES FROM stats") == 0) {
MYSQL_RES* res = mysql_store_result(admin_my);
if (res) {
MYSQL_ROW row;
while ((row = mysql_fetch_row(res))) {
std::string table = row[0];
// Ignore tables ending in _reset
if (table.size() < 6 || table.compare(table.size() - 6, 6, "_reset") != 0) {
stats_tables.push_back(table);
}
}
mysql_free_result(res);
tables_fetched = true;
}
}
}
if (tables_fetched && !stats_tables.empty()) {
std::shuffle(stats_tables.begin(), stats_tables.end(), g);
for (size_t i = 0; i < 3; ++i) {
if (stop) break;
total_queries++;
if (my_ok && mysql_query(admin_my, ("SELECT * FROM " + my_tables[i % my_tables.size()] + " LIMIT 10").c_str()) == 0) {
MYSQL_RES* res = mysql_store_result(admin_my);
if (res) mysql_free_result(res);
std::string table = stats_tables[i % stats_tables.size()];
int limit = limit_dist(g);
std::string query = "SELECT * FROM stats." + table + " LIMIT " + std::to_string(limit);
if (my_ok) {
if (mysql_query(admin_my, query.c_str()) == 0) {
MYSQL_RES* res = mysql_store_result(admin_my);
if (res) mysql_free_result(res);
}
}
if (pg_ok) {
pg_noise_query(admin_pg, query.c_str());
}
if (pg_ok) pg_noise_query(admin_pg, ("SELECT * FROM " + pg_tables[i % pg_tables.size()] + " LIMIT 10").c_str());
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(interval_ms));
@ -416,7 +457,7 @@ void internal_noise_pgsql_traffic(const CommandLine& cl, const NoiseOptions& opt
if (conn) PQfinish(conn);
std::string conninfo = "host=" + std::string(cl.host) + " port=" + std::to_string(cl.pgsql_port) +
" user=" + std::string(cl.pgsql_username) + " password=" + std::string(cl.pgsql_password) +
" dbname=test connect_timeout=2";
" dbname=postgres connect_timeout=2";
conn = PQconnectdb(conninfo.c_str());
if (PQstatus(conn) != CONNECTION_OK) {
retries++;
@ -438,3 +479,161 @@ void internal_noise_pgsql_traffic(const CommandLine& cl, const NoiseOptions& opt
if (conn) PQfinish(conn);
noise_log("[NOISE] PgSQL Traffic report: total_queries=" + std::to_string(total_queries) + "\n");
}
void internal_noise_pgsql_traffic_v2(const CommandLine& cl, const NoiseOptions& opt, std::atomic<bool>& stop) {
std::string tablename = get_opt_str(opt, "tablename", "pgsql_noise_test");
int num_connections = get_opt_int(opt, "num_connections", 20);
int reconnect_interval = get_opt_int(opt, "reconnect_interval", 200);
int max_retries = get_opt_int(opt, "max_retries", 5);
// Use postgres user for setup and load to ensure permissions
std::string conninfo = "host=" + std::string(cl.host) + " port=" + std::to_string(cl.pgsql_port) +
" user=postgres password=postgres" +
" dbname=postgres connect_timeout=5";
// --- Phase A: Ensure table exists ---
PGconn* setup_conn = PQconnectdb(conninfo.c_str());
if (PQstatus(setup_conn) != CONNECTION_OK) {
noise_log("[NOISE] PgSQL Traffic v2: Setup connection failure: " + std::string(PQerrorMessage(setup_conn)) + "\n");
PQfinish(setup_conn);
register_noise_failure("PgSQL Traffic v2 (Setup)");
return;
}
std::string check_table = "SELECT 1 FROM information_schema.tables WHERE table_name = '" + tablename + "'";
PGresult* res = PQexec(setup_conn, check_table.c_str());
if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res) == 0) {
PQclear(res);
noise_log("[NOISE] PgSQL Traffic v2: Creating table " + tablename + " in postgres database\n");
std::string create_sql = "CREATE TABLE " + tablename + " (id SERIAL PRIMARY KEY, val TEXT, counter INT)";
res = PQexec(setup_conn, create_sql.c_str());
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
noise_log("[NOISE] PgSQL Traffic v2: Table creation failed: " + std::string(PQresultErrorMessage(res)) + "\n");
} else {
noise_log("[NOISE] PgSQL Traffic v2: Table " + tablename + " created successfully\n");
}
PQclear(res);
std::this_thread::sleep_for(std::chrono::seconds(1));
} else {
noise_log("[NOISE] PgSQL Traffic v2: Table " + tablename + " already exists or verification failed\n");
PQclear(res);
}
// --- Phase B: Ensure 10,000 rows ---
while (!stop) {
std::string count_sql = "SELECT COUNT(*) FROM " + tablename;
res = PQexec(setup_conn, count_sql.c_str());
if (PQresultStatus(res) == PGRES_TUPLES_OK) {
long current_rows = std::stol(PQgetvalue(res, 0, 0));
PQclear(res);
if (current_rows < 10000) {
noise_log("[NOISE] PgSQL Traffic v2: " + std::to_string(current_rows) + " rows found, adding 5000...\n");
std::string insert_sql = "INSERT INTO " + tablename + " (val, counter) SELECT 'noise_data', generate_series(1, 5000)";
res = PQexec(setup_conn, insert_sql.c_str());
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
noise_log("[NOISE] PgSQL Traffic v2: Row insertion failed: " + std::string(PQresultErrorMessage(res)) + "\n");
}
PQclear(res);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
} else {
break;
}
} else {
noise_log("[NOISE] PgSQL Traffic v2: Row count failed: " + std::string(PQresultErrorMessage(res)) + "\n");
PQclear(res);
break;
}
}
PQfinish(setup_conn);
if (stop) return;
// --- Phase C, D, E: Multi-threaded load ---
std::atomic<uint64_t> total_queries{0};
std::atomic<uint64_t> total_connections_opened{0};
std::atomic<uint64_t> total_connections_closed{0};
std::vector<std::thread> workers;
for (int i = 0; i < num_connections; ++i) {
workers.emplace_back([&, conninfo, tablename, reconnect_interval]() {
PGconn* conn = nullptr;
uint64_t worker_queries = 0;
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> op_dist(0, 3);
std::uniform_int_distribution<> id_dist(1, 10000);
std::uniform_int_distribution<> delay_dist(50, 200);
std::uniform_int_distribution<> start_dist(0, 500);
// Staggered start
std::this_thread::sleep_for(std::chrono::milliseconds(start_dist(gen)));
auto connect = [&]() {
if (conn) {
PQfinish(conn);
total_connections_closed++;
}
conn = PQconnectdb(conninfo.c_str());
if (PQstatus(conn) == CONNECTION_OK) {
total_connections_opened++;
return true;
}
return false;
};
if (!connect()) {
noise_log("[NOISE] PgSQL Traffic v2: Worker failed initial connection\n");
return;
}
while (!stop) {
int op = op_dist(gen);
std::string sql;
int target_id = id_dist(gen);
switch (op) {
case 0: // SELECT
sql = "SELECT * FROM " + tablename + " WHERE id = " + std::to_string(target_id);
break;
case 1: // INSERT
sql = "INSERT INTO " + tablename + " (val, counter) VALUES ('extra_noise', " + std::to_string(target_id) + ")";
break;
case 2: // UPDATE
sql = "UPDATE " + tablename + " SET counter = counter + 1 WHERE id = " + std::to_string(target_id);
break;
case 3: // DELETE
sql = "DELETE FROM " + tablename + " WHERE id = " + std::to_string(target_id);
break;
}
PGresult* r = PQexec(conn, sql.c_str());
if (r) PQclear(r);
worker_queries++;
uint64_t current_total = ++total_queries;
if (current_total % 1000 == 0) {
noise_log("[NOISE] PgSQL Traffic v2 progress: " + std::to_string(current_total) + " queries executed...\n");
}
if (worker_queries % reconnect_interval == 0) {
if (!connect()) break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(delay_dist(gen)));
}
if (conn) {
PQfinish(conn);
total_connections_closed++;
}
});
}
for (auto& w : workers) {
if (w.joinable()) w.join();
}
noise_log("[NOISE] PgSQL Traffic v2 report: total_queries=" + std::to_string(total_queries) +
", total_connections_opened=" + std::to_string(total_connections_opened) +
", total_connections_closed=" + std::to_string(total_connections_closed) + "\n");
}

@ -95,6 +95,11 @@ void internal_noise_mysql_traffic(const CommandLine& cl, const NoiseOptions& opt
*/
void internal_noise_pgsql_traffic(const CommandLine& cl, const NoiseOptions& opt, std::atomic<bool>& stop);
/**
* @brief Version 2 of PgSQL Traffic: complex multi-threaded load with table setup and reconnections.
*/
void internal_noise_pgsql_traffic_v2(const CommandLine& cl, const NoiseOptions& opt, std::atomic<bool>& stop);
/**
* @brief Periodically fetches Prometheus metrics via the REST API.
*/

@ -24,8 +24,8 @@ int main(int argc, char** argv) {
// Force noise to be enabled for this test
cl.use_noise = true;
// We have 7 internal noise tools
const int noise_tools_count = 7;
// We have 8 internal noise tools
const int noise_tools_count = 8;
plan(noise_tools_count);
diag("Spawning all available noise tools...");
@ -36,10 +36,11 @@ int main(int argc, char** argv) {
spawn_internal_noise(cl, internal_noise_random_stats_poller);
spawn_internal_noise(cl, internal_noise_mysql_traffic);
spawn_internal_noise(cl, internal_noise_pgsql_traffic);
spawn_internal_noise(cl, internal_noise_pgsql_traffic_v2, {{"num_connections", "100"}, {"reconnect_interval", "100"}});
spawn_internal_noise(cl, internal_noise_rest_prometheus_poller, {{"enable_rest_api", "true"}, {"port", "6070"}});
diag("Sleeping for 10 seconds to let noises work...");
sleep(10);
diag("Sleeping for 60 seconds to let noises work...");
sleep(60);
diag("Exiting test, noise reports should follow...");

Loading…
Cancel
Save