diff --git a/include/MCP_Thread.h b/include/MCP_Thread.h index 103399b60..e0eb93bc9 100644 --- a/include/MCP_Thread.h +++ b/include/MCP_Thread.h @@ -83,6 +83,14 @@ public: * hardcoded safety maximum in `Stats_Tool_Handler`. */ int mcp_stats_show_queries_max_rows; + /** + * @brief Runtime cap for `stats.show_processlist` returned rows. + * + * The handler applies this as an upper bound for caller-requested page + * size (`limit`). The configurable value is itself clamped by a + * hardcoded safety maximum in `Stats_Tool_Handler`. + */ + int mcp_stats_show_processlist_max_rows; // MySQL Tool Handler configuration char* mcp_mysql_hosts; ///< Comma-separated list of MySQL hosts char* mcp_mysql_ports; ///< Comma-separated list of MySQL ports diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 3d844dc6a..b8aafa39e 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -712,6 +712,17 @@ class MySQL_Threads_Handler void start_listeners(); void stop_listeners(); void signal_all_threads(unsigned char _c=0); + /** + * @brief Build an in-memory processlist snapshot for MySQL sessions. + * + * The returned resultset always uses the canonical `stats_mysql_processlist` + * column layout. When `args.query_options.enabled=true`, the snapshot is + * post-processed in memory using typed filters, deterministic sorting, and + * pagination controls from `processlist_query_options_t`. + * + * @param args Processlist rendering options and optional query options. + * @return Newly allocated resultset owned by the caller. + */ SQLite3_result * SQL3_Processlist(processlist_config_t args); SQLite3_result * SQL3_GlobalStatus(bool _memory); bool kill_session(uint32_t _thread_session_id); diff --git a/include/PgSQL_Thread.h b/include/PgSQL_Thread.h index bb874463f..3caedbf32 100644 --- a/include/PgSQL_Thread.h +++ b/include/PgSQL_Thread.h @@ -1515,7 +1515,10 @@ public: /** * @brief Retrieves a process list for all threads in the thread pool. * - * @param args Processlist configuration of PgSQL. + * @param args + * Processlist rendering options and optional typed query controls. + * When `args.query_options.enabled=true`, filtering/sorting/pagination is + * applied in memory after the live snapshot is collected. * * @return A `SQLite3_result` object containing the process list, or `NULL` if an error * occurred. diff --git a/include/Stats_Tool_Handler.h b/include/Stats_Tool_Handler.h index ce8d88abc..06fbcfed3 100644 --- a/include/Stats_Tool_Handler.h +++ b/include/Stats_Tool_Handler.h @@ -58,7 +58,7 @@ private: /** * @brief Shows all currently active sessions - * @param arguments JSON with db_type, username, hostgroup, min_time_ms, limit, offset + * @param arguments JSON with db_type, filters, sort_by, sort_order, limit, offset */ json handle_show_processlist(const json& arguments); diff --git a/include/proxysql_admin.h b/include/proxysql_admin.h index 5a8ccc99b..a84a6dec8 100644 --- a/include/proxysql_admin.h +++ b/include/proxysql_admin.h @@ -242,12 +242,62 @@ struct peer_pgsql_servers_v2_t { peer_pgsql_servers_v2_t(SQLite3_result*, const pgsql_servers_v2_checksum_t&); }; +/** + * @brief Sort keys supported by in-memory processlist queries. + * + * These keys are intentionally limited to fields that are available for both + * MySQL and PgSQL processlist rows so callers can use a single contract. + */ +enum class processlist_sort_by_t { + none, ///< Keep producer order (no explicit sorting). + time_ms, ///< Sort by session runtime in milliseconds. + session_id, ///< Sort by ProxySQL session id. + username, ///< Sort by authenticated username. + hostgroup, ///< Sort by current hostgroup id. + command ///< Sort by command/status text. +}; + +/** + * @brief Typed filtering, ordering, and pagination options for processlist. + * + * The options are consumed by `MySQL_Threads_Handler::SQL3_Processlist()` and + * `PgSQL_Threads_Handler::SQL3_Processlist()` to query live in-memory session + * state without going through runtime-populated `stats_*_processlist` tables. + * + * All string filters are exact matches unless noted otherwise. + */ +struct processlist_query_options_t { + bool enabled {false}; ///< Enables query-options processing. + std::string username {}; ///< Optional exact username filter. + std::string database {}; ///< Optional exact schema/database filter. + int hostgroup {-1}; ///< Optional hostgroup filter (`-1` disables). + std::string command {}; ///< Optional exact command/status filter. + int min_time_ms {-1}; ///< Optional minimum runtime (`-1` disables). + bool has_session_id {false}; ///< Whether @ref session_id is active. + uint32_t session_id {0}; ///< Optional exact session identifier. + std::string match_info {}; ///< Optional substring filter on `info`. + bool info_case_sensitive {false}; ///< Case sensitivity mode for @ref match_info. + processlist_sort_by_t sort_by {processlist_sort_by_t::none}; ///< Optional primary sort key. + bool sort_desc {true}; ///< Sort direction for @ref sort_by. + bool disable_pagination {false}; ///< If true, ignore @ref limit and @ref offset. + uint32_t limit {0}; ///< Page size (`0` means return zero rows). + uint32_t offset {0}; ///< Number of rows to skip before page. +}; + +/** + * @brief Processlist extraction configuration. + * + * This structure combines legacy processlist rendering controls + * (`show_extended`, `max_query_length`) with optional query-time filters, + * ordering, and pagination in @ref query_options. + */ struct processlist_config_t { #ifdef IDLE_THREADS bool show_idle_session; #endif int show_extended; int max_query_length; + processlist_query_options_t query_options {}; }; class ProxySQL_Admin { diff --git a/lib/MCP_Thread.cpp b/lib/MCP_Thread.cpp index 30a304835..158172386 100644 --- a/lib/MCP_Thread.cpp +++ b/lib/MCP_Thread.cpp @@ -34,6 +34,7 @@ static const char* mcp_thread_variables_names[] = { "rag_endpoint_auth", "timeout_ms", "stats_show_queries_max_rows", + "stats_show_processlist_max_rows", // MySQL Tool Handler configuration "mysql_hosts", "mysql_ports", @@ -62,6 +63,7 @@ MCP_Threads_Handler::MCP_Threads_Handler() { variables.mcp_rag_endpoint_auth = strdup(""); variables.mcp_timeout_ms = 30000; variables.mcp_stats_show_queries_max_rows = 200; + variables.mcp_stats_show_processlist_max_rows = 200; // MySQL Tool Handler default values variables.mcp_mysql_hosts = strdup("127.0.0.1"); variables.mcp_mysql_ports = strdup("3306"); @@ -236,6 +238,10 @@ int MCP_Threads_Handler::get_variable(const char* name, char* val) { sprintf(val, "%d", variables.mcp_stats_show_queries_max_rows); return 0; } + if (!strcmp(name, "stats_show_processlist_max_rows")) { + sprintf(val, "%d", variables.mcp_stats_show_processlist_max_rows); + return 0; + } // MySQL Tool Handler configuration if (!strcmp(name, "mysql_hosts")) { sprintf(val, "%s", variables.mcp_mysql_hosts ? variables.mcp_mysql_hosts : ""); @@ -357,6 +363,19 @@ int MCP_Threads_Handler::set_variable(const char* name, const char* value) { } return -1; } + if (!strcmp(name, "stats_show_processlist_max_rows")) { + /** + * Hard safety cap: do not allow configuring values above 1000. + * This keeps MCP show_processlist bounded even if callers request + * large pages. + */ + int max_rows = atoi(value); + if (max_rows >= 1 && max_rows <= 1000) { + variables.mcp_stats_show_processlist_max_rows = max_rows; + return 0; + } + return -1; + } // MySQL Tool Handler configuration if (!strcmp(name, "mysql_hosts")) { if (variables.mcp_mysql_hosts) diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 8350ad508..b5309cbed 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -5,6 +5,9 @@ using json = nlohmann::json; //#define __CLASS_STANDARD_MYSQL_THREAD_H #include +#include +#include +#include #include #include "proxysql_utils.h" @@ -5277,6 +5280,268 @@ void MySQL_Threads_Handler::Get_Memory_Stats() { } } +namespace { + +/** + * @brief Column indexes used by MySQL processlist filtering and sorting helpers. + */ +struct mysql_processlist_columns_t { + static constexpr int session_id = 1; + static constexpr int username = 2; + static constexpr int database = 3; + static constexpr int hostgroup = 6; + static constexpr int command = 11; + static constexpr int time_ms = 12; + static constexpr int info = 13; +}; + +/** + * @brief Safely return a row field or an empty string when missing. + * + * @param row Source processlist row. + * @param idx Field index in the processlist result. + * @return Pointer to the requested field or an empty-string literal. + */ +static const char* mysql_pl_field(const SQLite3_row* row, int idx) { + if (!row || idx < 0 || idx >= row->cnt || !row->fields[idx]) { + return ""; + } + return row->fields[idx]; +} + +/** + * @brief Parse a processlist numeric field as unsigned integer. + * + * Invalid or empty values are normalized to zero so sorting and filtering remain + * deterministic for partial rows. + * + * @param value Text field containing an integer representation. + * @return Parsed unsigned value, or `0` on parse failure. + */ +static uint64_t mysql_pl_to_u64(const char* value) { + if (!value || !value[0]) { + return 0; + } + + char* end = nullptr; + errno = 0; + unsigned long long parsed = strtoull(value, &end, 10); + if (end == value || *end != '\0' || errno != 0) { + return 0; + } + + return static_cast(parsed); +} + +/** + * @brief Case-(in)sensitive substring matcher used by `match_info`. + * + * @param haystack Candidate text. + * @param needle Substring to search for. + * @param case_sensitive Whether matching should be case-sensitive. + * @return true when @p needle is found in @p haystack. + */ +static bool mysql_pl_contains(const std::string& haystack, const std::string& needle, bool case_sensitive) { + if (needle.empty()) { + return true; + } + + if (case_sensitive) { + return haystack.find(needle) != std::string::npos; + } + + auto it = std::search( + haystack.begin(), + haystack.end(), + needle.begin(), + needle.end(), + [](char lhs, char rhs) { + return std::tolower(static_cast(lhs)) == + std::tolower(static_cast(rhs)); + } + ); + return it != haystack.end(); +} + +/** + * @brief Evaluate whether a MySQL processlist row matches caller filters. + * + * @param row Processlist row from `SQL3_Processlist`. + * @param opts Typed query options supplied by the caller. + * @return true when the row satisfies all active filters. + */ +static bool mysql_pl_row_matches(const SQLite3_row* row, const processlist_query_options_t& opts) { + if (!opts.username.empty() && opts.username != mysql_pl_field(row, mysql_processlist_columns_t::username)) { + return false; + } + if (!opts.database.empty() && opts.database != mysql_pl_field(row, mysql_processlist_columns_t::database)) { + return false; + } + if (opts.hostgroup >= 0) { + const int row_hostgroup = static_cast(mysql_pl_to_u64(mysql_pl_field(row, mysql_processlist_columns_t::hostgroup))); + if (row_hostgroup != opts.hostgroup) { + return false; + } + } + if (!opts.command.empty() && opts.command != mysql_pl_field(row, mysql_processlist_columns_t::command)) { + return false; + } + if (opts.min_time_ms >= 0) { + const int row_time_ms = static_cast(mysql_pl_to_u64(mysql_pl_field(row, mysql_processlist_columns_t::time_ms))); + if (row_time_ms < opts.min_time_ms) { + return false; + } + } + if (opts.has_session_id) { + const uint64_t row_session_id = mysql_pl_to_u64(mysql_pl_field(row, mysql_processlist_columns_t::session_id)); + if (row_session_id != opts.session_id) { + return false; + } + } + if (!opts.match_info.empty()) { + const std::string info = mysql_pl_field(row, mysql_processlist_columns_t::info); + if (!mysql_pl_contains(info, opts.match_info, opts.info_case_sensitive)) { + return false; + } + } + + return true; +} + +/** + * @brief Compare two MySQL processlist rows according to typed sort options. + * + * The comparison applies a deterministic tie-breaker on `SessionID` so paging + * remains stable across repeated calls with identical data. + * + * @param lhs Left-hand row. + * @param rhs Right-hand row. + * @param opts Query options carrying sort key and direction. + * @return true when @p lhs should be ordered before @p rhs. + */ +static bool mysql_pl_row_less(const SQLite3_row* lhs, const SQLite3_row* rhs, const processlist_query_options_t& opts) { + const uint64_t lhs_session_id = mysql_pl_to_u64(mysql_pl_field(lhs, mysql_processlist_columns_t::session_id)); + const uint64_t rhs_session_id = mysql_pl_to_u64(mysql_pl_field(rhs, mysql_processlist_columns_t::session_id)); + + auto string_compare = [&](int idx) -> int { + const std::string lhs_value = mysql_pl_field(lhs, idx); + const std::string rhs_value = mysql_pl_field(rhs, idx); + if (lhs_value < rhs_value) { + return -1; + } + if (lhs_value > rhs_value) { + return 1; + } + return 0; + }; + + auto numeric_compare = [&](int idx) -> int { + const uint64_t lhs_value = mysql_pl_to_u64(mysql_pl_field(lhs, idx)); + const uint64_t rhs_value = mysql_pl_to_u64(mysql_pl_field(rhs, idx)); + if (lhs_value < rhs_value) { + return -1; + } + if (lhs_value > rhs_value) { + return 1; + } + return 0; + }; + + int cmp = 0; + switch (opts.sort_by) { + case processlist_sort_by_t::time_ms: + cmp = numeric_compare(mysql_processlist_columns_t::time_ms); + break; + case processlist_sort_by_t::session_id: + cmp = numeric_compare(mysql_processlist_columns_t::session_id); + break; + case processlist_sort_by_t::username: + cmp = string_compare(mysql_processlist_columns_t::username); + break; + case processlist_sort_by_t::hostgroup: + cmp = numeric_compare(mysql_processlist_columns_t::hostgroup); + break; + case processlist_sort_by_t::command: + cmp = string_compare(mysql_processlist_columns_t::command); + break; + case processlist_sort_by_t::none: + default: + cmp = 0; + break; + } + + if (cmp != 0) { + return opts.sort_desc ? (cmp > 0) : (cmp < 0); + } + + return lhs_session_id < rhs_session_id; +} + +/** + * @brief Apply typed filtering, sorting, and pagination to MySQL processlist rows. + * + * This post-processing stage is intentionally local to `SQL3_Processlist()` so + * the same API can serve: + * - legacy Admin stats refreshes (options disabled, full result) + * - MCP live queries (options enabled with filters and page controls) + * + * @param result Mutable resultset generated by `SQL3_Processlist()`. + * @param opts Query options controlling filtering/sorting/pagination. + */ +static void apply_mysql_processlist_query_options(SQLite3_result* result, const processlist_query_options_t& opts) { + if (!result || !opts.enabled) { + return; + } + + std::vector filtered_rows; + filtered_rows.reserve(result->rows.size()); + + for (SQLite3_row* row : result->rows) { + if (mysql_pl_row_matches(row, opts)) { + filtered_rows.push_back(row); + } else { + delete row; + } + } + + if (opts.sort_by != processlist_sort_by_t::none && filtered_rows.size() > 1) { + std::stable_sort( + filtered_rows.begin(), + filtered_rows.end(), + [&opts](const SQLite3_row* lhs, const SQLite3_row* rhs) { + return mysql_pl_row_less(lhs, rhs, opts); + } + ); + } + + size_t begin = std::min(opts.offset, filtered_rows.size()); + size_t end = begin; + if (opts.disable_pagination) { + begin = 0; + end = filtered_rows.size(); + } else { + const uint64_t requested_end = static_cast(begin) + static_cast(opts.limit); + end = std::min(filtered_rows.size(), static_cast(requested_end)); + } + + std::vector paged_rows; + paged_rows.reserve(end > begin ? (end - begin) : 0); + + for (size_t idx = 0; idx < filtered_rows.size(); ++idx) { + SQLite3_row* row = filtered_rows[idx]; + if (idx >= begin && idx < end) { + paged_rows.push_back(row); + } else { + delete row; + } + } + + result->rows.swap(paged_rows); + result->rows_count = static_cast(result->rows.size()); +} + +} // namespace + SQLite3_result * MySQL_Threads_Handler::SQL3_Processlist(processlist_config_t args) { const int colnum=16; char port[NI_MAXSERV]; @@ -5558,6 +5823,14 @@ SQLite3_result * MySQL_Threads_Handler::SQL3_Processlist(processlist_config_t ar } pthread_mutex_unlock(&thr->thread_mutex); } + + /** + * Apply optional in-memory query options used by MCP and other internal + * consumers. Legacy callers keep `query_options.enabled=false`, so their + * behavior remains unchanged and they still receive the full processlist. + */ + apply_mysql_processlist_query_options(result, args.query_options); + return result; } diff --git a/lib/PgSQL_Thread.cpp b/lib/PgSQL_Thread.cpp index c3aa74136..385bfadb0 100644 --- a/lib/PgSQL_Thread.cpp +++ b/lib/PgSQL_Thread.cpp @@ -5,6 +5,9 @@ using json = nlohmann::json; //#define __CLASS_STANDARD_MYSQL_THREAD_H #include +#include +#include +#include #include #include "proxysql_utils.h" @@ -4647,6 +4650,268 @@ void PgSQL_Threads_Handler::Get_Memory_Stats() { } } +namespace { + +/** + * @brief Column indexes used by PgSQL processlist filtering and sorting helpers. + */ +struct pgsql_processlist_columns_t { + static constexpr int session_id = 1; + static constexpr int username = 2; + static constexpr int database = 3; + static constexpr int hostgroup = 6; + static constexpr int command = 13; + static constexpr int time_ms = 14; + static constexpr int info = 15; +}; + +/** + * @brief Safely return a row field or an empty string when missing. + * + * @param row Source processlist row. + * @param idx Field index in the processlist result. + * @return Pointer to the requested field or an empty-string literal. + */ +static const char* pgsql_pl_field(const SQLite3_row* row, int idx) { + if (!row || idx < 0 || idx >= row->cnt || !row->fields[idx]) { + return ""; + } + return row->fields[idx]; +} + +/** + * @brief Parse a processlist numeric field as unsigned integer. + * + * Invalid or empty values are normalized to zero so sorting and filtering remain + * deterministic for partial rows. + * + * @param value Text field containing an integer representation. + * @return Parsed unsigned value, or `0` on parse failure. + */ +static uint64_t pgsql_pl_to_u64(const char* value) { + if (!value || !value[0]) { + return 0; + } + + char* end = nullptr; + errno = 0; + unsigned long long parsed = strtoull(value, &end, 10); + if (end == value || *end != '\0' || errno != 0) { + return 0; + } + + return static_cast(parsed); +} + +/** + * @brief Case-(in)sensitive substring matcher used by `match_info`. + * + * @param haystack Candidate text. + * @param needle Substring to search for. + * @param case_sensitive Whether matching should be case-sensitive. + * @return true when @p needle is found in @p haystack. + */ +static bool pgsql_pl_contains(const std::string& haystack, const std::string& needle, bool case_sensitive) { + if (needle.empty()) { + return true; + } + + if (case_sensitive) { + return haystack.find(needle) != std::string::npos; + } + + auto it = std::search( + haystack.begin(), + haystack.end(), + needle.begin(), + needle.end(), + [](char lhs, char rhs) { + return std::tolower(static_cast(lhs)) == + std::tolower(static_cast(rhs)); + } + ); + return it != haystack.end(); +} + +/** + * @brief Evaluate whether a PgSQL processlist row matches caller filters. + * + * @param row Processlist row from `SQL3_Processlist`. + * @param opts Typed query options supplied by the caller. + * @return true when the row satisfies all active filters. + */ +static bool pgsql_pl_row_matches(const SQLite3_row* row, const processlist_query_options_t& opts) { + if (!opts.username.empty() && opts.username != pgsql_pl_field(row, pgsql_processlist_columns_t::username)) { + return false; + } + if (!opts.database.empty() && opts.database != pgsql_pl_field(row, pgsql_processlist_columns_t::database)) { + return false; + } + if (opts.hostgroup >= 0) { + const int row_hostgroup = static_cast(pgsql_pl_to_u64(pgsql_pl_field(row, pgsql_processlist_columns_t::hostgroup))); + if (row_hostgroup != opts.hostgroup) { + return false; + } + } + if (!opts.command.empty() && opts.command != pgsql_pl_field(row, pgsql_processlist_columns_t::command)) { + return false; + } + if (opts.min_time_ms >= 0) { + const int row_time_ms = static_cast(pgsql_pl_to_u64(pgsql_pl_field(row, pgsql_processlist_columns_t::time_ms))); + if (row_time_ms < opts.min_time_ms) { + return false; + } + } + if (opts.has_session_id) { + const uint64_t row_session_id = pgsql_pl_to_u64(pgsql_pl_field(row, pgsql_processlist_columns_t::session_id)); + if (row_session_id != opts.session_id) { + return false; + } + } + if (!opts.match_info.empty()) { + const std::string info = pgsql_pl_field(row, pgsql_processlist_columns_t::info); + if (!pgsql_pl_contains(info, opts.match_info, opts.info_case_sensitive)) { + return false; + } + } + + return true; +} + +/** + * @brief Compare two PgSQL processlist rows according to typed sort options. + * + * The comparison applies a deterministic tie-breaker on `SessionID` so paging + * remains stable across repeated calls with identical data. + * + * @param lhs Left-hand row. + * @param rhs Right-hand row. + * @param opts Query options carrying sort key and direction. + * @return true when @p lhs should be ordered before @p rhs. + */ +static bool pgsql_pl_row_less(const SQLite3_row* lhs, const SQLite3_row* rhs, const processlist_query_options_t& opts) { + const uint64_t lhs_session_id = pgsql_pl_to_u64(pgsql_pl_field(lhs, pgsql_processlist_columns_t::session_id)); + const uint64_t rhs_session_id = pgsql_pl_to_u64(pgsql_pl_field(rhs, pgsql_processlist_columns_t::session_id)); + + auto string_compare = [&](int idx) -> int { + const std::string lhs_value = pgsql_pl_field(lhs, idx); + const std::string rhs_value = pgsql_pl_field(rhs, idx); + if (lhs_value < rhs_value) { + return -1; + } + if (lhs_value > rhs_value) { + return 1; + } + return 0; + }; + + auto numeric_compare = [&](int idx) -> int { + const uint64_t lhs_value = pgsql_pl_to_u64(pgsql_pl_field(lhs, idx)); + const uint64_t rhs_value = pgsql_pl_to_u64(pgsql_pl_field(rhs, idx)); + if (lhs_value < rhs_value) { + return -1; + } + if (lhs_value > rhs_value) { + return 1; + } + return 0; + }; + + int cmp = 0; + switch (opts.sort_by) { + case processlist_sort_by_t::time_ms: + cmp = numeric_compare(pgsql_processlist_columns_t::time_ms); + break; + case processlist_sort_by_t::session_id: + cmp = numeric_compare(pgsql_processlist_columns_t::session_id); + break; + case processlist_sort_by_t::username: + cmp = string_compare(pgsql_processlist_columns_t::username); + break; + case processlist_sort_by_t::hostgroup: + cmp = numeric_compare(pgsql_processlist_columns_t::hostgroup); + break; + case processlist_sort_by_t::command: + cmp = string_compare(pgsql_processlist_columns_t::command); + break; + case processlist_sort_by_t::none: + default: + cmp = 0; + break; + } + + if (cmp != 0) { + return opts.sort_desc ? (cmp > 0) : (cmp < 0); + } + + return lhs_session_id < rhs_session_id; +} + +/** + * @brief Apply typed filtering, sorting, and pagination to PgSQL processlist rows. + * + * This post-processing stage is intentionally local to `SQL3_Processlist()` so + * the same API can serve: + * - legacy Admin stats refreshes (options disabled, full result) + * - MCP live queries (options enabled with filters and page controls) + * + * @param result Mutable resultset generated by `SQL3_Processlist()`. + * @param opts Query options controlling filtering/sorting/pagination. + */ +static void apply_pgsql_processlist_query_options(SQLite3_result* result, const processlist_query_options_t& opts) { + if (!result || !opts.enabled) { + return; + } + + std::vector filtered_rows; + filtered_rows.reserve(result->rows.size()); + + for (SQLite3_row* row : result->rows) { + if (pgsql_pl_row_matches(row, opts)) { + filtered_rows.push_back(row); + } else { + delete row; + } + } + + if (opts.sort_by != processlist_sort_by_t::none && filtered_rows.size() > 1) { + std::stable_sort( + filtered_rows.begin(), + filtered_rows.end(), + [&opts](const SQLite3_row* lhs, const SQLite3_row* rhs) { + return pgsql_pl_row_less(lhs, rhs, opts); + } + ); + } + + size_t begin = std::min(opts.offset, filtered_rows.size()); + size_t end = begin; + if (opts.disable_pagination) { + begin = 0; + end = filtered_rows.size(); + } else { + const uint64_t requested_end = static_cast(begin) + static_cast(opts.limit); + end = std::min(filtered_rows.size(), static_cast(requested_end)); + } + + std::vector paged_rows; + paged_rows.reserve(end > begin ? (end - begin) : 0); + + for (size_t idx = 0; idx < filtered_rows.size(); ++idx) { + SQLite3_row* row = filtered_rows[idx]; + if (idx >= begin && idx < end) { + paged_rows.push_back(row); + } else { + delete row; + } + } + + result->rows.swap(paged_rows); + result->rows_count = static_cast(result->rows.size()); +} + +} // namespace + SQLite3_result* PgSQL_Threads_Handler::SQL3_Processlist(processlist_config_t args) { const int colnum = 18; char port[NI_MAXSERV]; @@ -4912,6 +5177,14 @@ SQLite3_result* PgSQL_Threads_Handler::SQL3_Processlist(processlist_config_t arg } pthread_mutex_unlock(&thr->thread_mutex); } + + /** + * Apply optional in-memory query options used by MCP and other internal + * consumers. Legacy callers keep `query_options.enabled=false`, so their + * behavior remains unchanged and they still receive the full processlist. + */ + apply_pgsql_processlist_query_options(result, args.query_options); + return result; } diff --git a/lib/Stats_Tool_Handler.cpp b/lib/Stats_Tool_Handler.cpp index 266f7d2eb..4c742c24d 100644 --- a/lib/Stats_Tool_Handler.cpp +++ b/lib/Stats_Tool_Handler.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include "../deps/json/json.hpp" using json = nlohmann::json; @@ -20,6 +21,8 @@ using json = nlohmann::json; extern ProxySQL_Admin *GloAdmin; extern MySQL_Logger *GloMyLogger; +extern MySQL_Threads_Handler *GloMTH; +extern PgSQL_Threads_Handler *GloPTH; // Latency bucket thresholds in microseconds for commands_counters histogram static const std::vector LATENCY_BUCKET_THRESHOLDS = { @@ -72,6 +75,14 @@ static const std::map> CATEGORY_PREFIXES = */ static constexpr uint32_t SHOW_QUERIES_MAX_LIMIT_HARDCODED = 1000; +/** + * Hard upper bound for configurable `mcp_stats_show_processlist_max_rows`. + * + * The runtime MCP variable can reduce this value, but cannot exceed it. + * It protects the process from unbounded processlist page windows. + */ +static constexpr uint32_t SHOW_PROCESSLIST_MAX_LIMIT_HARDCODED = 1000; + /** * @brief Parse and validate a backend filter in `host:port` format. * @@ -412,14 +423,47 @@ json Stats_Tool_Handler::get_tool_list() { {"type", "string"}, {"description", "Filter by username"} }}, + {"database", { + {"type", "string"}, + {"description", "Filter by schema/database name"} + }}, {"hostgroup", { {"type", "integer"}, {"description", "Filter by hostgroup ID"} }}, + {"command", { + {"type", "string"}, + {"description", "Filter by command/status (for example Query, Sleep, Connect)"} + }}, + {"session_id", { + {"type", "integer"}, + {"description", "Filter by ProxySQL SessionID"} + }}, {"min_time_ms", { {"type", "integer"}, {"description", "Only show sessions running longer than N milliseconds"} }}, + {"match_info", { + {"type", "string"}, + {"description", "Substring filter on current query text/info"} + }}, + {"info_case_sensitive", { + {"type", "boolean"}, + {"description", "Case-sensitive matching for match_info (default: false)"}, + {"default", false} + }}, + {"sort_by", { + {"type", "string"}, + {"enum", {"time_ms", "session_id", "username", "hostgroup", "command"}}, + {"description", "Sort key (default: time_ms)"}, + {"default", "time_ms"} + }}, + {"sort_order", { + {"type", "string"}, + {"enum", {"asc", "desc"}}, + {"description", "Sort direction (default: desc)"}, + {"default", "desc"} + }}, {"limit", { {"type", "integer"}, {"description", "Maximum number of sessions to return (default: 100)"}, @@ -1185,108 +1229,223 @@ json Stats_Tool_Handler::handle_show_status(const json& arguments) { /** * @brief Shows all currently active sessions being processed by ProxySQL * - * Returns detailed information about each active session including client/backend - * connection details, current command, execution time, and query info. Includes - * summary statistics grouped by user, hostgroup, and command type. + * Reads live in-memory processlist state via `SQL3_Processlist()` and applies + * typed filters/sort/pagination through `processlist_query_options_t`. + * + * This avoids stale reads from runtime-populated `stats_*_processlist` tables. * * @param arguments JSON object with optional parameters: * - db_type: "mysql" (default) or "pgsql" * - username: Filter by username + * - database: Filter by schema/database * - hostgroup: Filter by hostgroup ID + * - command: Filter by command/status text + * - session_id: Filter by SessionID * - min_time_ms: Only show sessions running longer than N milliseconds + * - match_info: Optional substring filter on query/info text + * - info_case_sensitive: Optional case-sensitive toggle for match_info + * - sort_by: "time_ms" (default), "session_id", "username", "hostgroup", "command" + * - sort_order: "desc" (default) or "asc" * - limit: Maximum number of sessions to return (default: 100) * - offset: Skip first N results (default: 0) * - * @return JSON response with sessions array and summary statistics + * @return JSON response with filtered session rows and summary buckets. */ json Stats_Tool_Handler::handle_show_processlist(const json& arguments) { std::string db_type = arguments.value("db_type", "mysql"); std::string username = arguments.value("username", ""); + std::string database = arguments.value("database", ""); int hostgroup = arguments.value("hostgroup", -1); + std::string command = arguments.value("command", ""); + long long session_id = arguments.value("session_id", -1LL); int min_time_ms = arguments.value("min_time_ms", -1); + std::string match_info = arguments.value("match_info", ""); + bool info_case_sensitive = arguments.value("info_case_sensitive", false); + std::string sort_by = arguments.value("sort_by", "time_ms"); + std::string sort_order = arguments.value("sort_order", "desc"); int limit = arguments.value("limit", 100); int offset = arguments.value("offset", 0); - std::string table = (db_type == "pgsql") ? "stats_pgsql_processlist" : "stats_mysql_processlist"; - std::string db_col = (db_type == "pgsql") ? "database" : "db"; - - std::string sql = "SELECT ThreadID, SessionID, user, " + db_col + ", cli_host, cli_port, " - "hostgroup, srv_host, srv_port, command, time_ms, info " - "FROM stats." + table + " WHERE 1=1"; - - if (!username.empty()) { - sql += " AND user = '" + sql_escape(username) + "'"; + if (limit < 0) { + return create_error_response("limit must be >= 0"); } - if (hostgroup >= 0) { - sql += " AND hostgroup = " + std::to_string(hostgroup); + if (offset < 0) { + return create_error_response("offset must be >= 0"); + } + if (hostgroup < -1) { + return create_error_response("hostgroup must be >= -1"); + } + if (min_time_ms < -1) { + return create_error_response("min_time_ms must be >= -1"); + } + if (session_id < -1) { + return create_error_response("session_id must be >= -1"); } - if (min_time_ms >= 0) { - sql += " AND time_ms >= " + std::to_string(min_time_ms); + if (session_id > static_cast(std::numeric_limits::max())) { + return create_error_response("session_id is too large"); } - sql += " ORDER BY time_ms DESC LIMIT " + std::to_string(limit) + " OFFSET " + std::to_string(offset); + processlist_sort_by_t sort_mode = processlist_sort_by_t::time_ms; + if (sort_by == "session_id") { + sort_mode = processlist_sort_by_t::session_id; + } else if (sort_by == "username") { + sort_mode = processlist_sort_by_t::username; + } else if (sort_by == "hostgroup") { + sort_mode = processlist_sort_by_t::hostgroup; + } else if (sort_by == "command") { + sort_mode = processlist_sort_by_t::command; + } else if (sort_by != "time_ms") { + return create_error_response("Invalid sort_by: " + sort_by); + } - SQLite3_result* resultset = NULL; - int cols = 0; - std::string err = execute_admin_query(sql.c_str(), &resultset, &cols); + bool sort_desc = true; + if (sort_order == "asc") { + sort_desc = false; + } else if (sort_order != "desc") { + return create_error_response("Invalid sort_order: " + sort_order); + } - if (!err.empty()) { - return create_error_response("Failed to query processlist: " + err); + if (!GloAdmin) { + return create_error_response("ProxySQL Admin not available"); } - json sessions = json::array(); - std::map by_user, by_hostgroup, by_command; + const bool is_pgsql = (db_type == "pgsql"); + if (!is_pgsql && db_type != "mysql") { + return create_error_response("Invalid db_type: " + db_type); + } + if (is_pgsql && !GloPTH) { + return create_error_response("PgSQL threads handler not available"); + } + if (!is_pgsql && !GloMTH) { + return create_error_response("MySQL threads handler not available"); + } - // Get total count - std::string count_sql = "SELECT COUNT(*) FROM stats." + table; - SQLite3_result* count_rs = NULL; - int count_cols = 0; - int total_sessions = 0; - std::string count_err = execute_admin_query(count_sql.c_str(), &count_rs, &count_cols, false); - if (!count_err.empty()) { - if (count_rs) { - delete count_rs; + uint32_t configured_cap = 200; + if (mcp_handler) { + const int configured_value = mcp_handler->variables.mcp_stats_show_processlist_max_rows; + if (configured_value > 0) { + configured_cap = static_cast(configured_value); } - proxy_error("show_processlist: failed to count rows: %s\n", count_err.c_str()); - return create_error_response("Failed to count processlist rows: " + count_err); } - if (count_rs && count_rs->rows_count > 0 && count_rs->rows[0]->fields[0]) { - total_sessions = std::stoi(count_rs->rows[0]->fields[0]); + if (configured_cap > SHOW_PROCESSLIST_MAX_LIMIT_HARDCODED) { + configured_cap = SHOW_PROCESSLIST_MAX_LIMIT_HARDCODED; } - if (count_rs) delete count_rs; - if (resultset) { - for (const auto& row : resultset->rows) { - json session; - session["session_id"] = row->fields[1] ? std::stoll(row->fields[1]) : 0; - session["thread_id"] = row->fields[0] ? std::stoi(row->fields[0]) : 0; - session["user"] = row->fields[2] ? row->fields[2] : ""; - session["database"] = row->fields[3] ? row->fields[3] : ""; - session["client_host"] = row->fields[4] ? row->fields[4] : ""; - session["client_port"] = row->fields[5] ? std::stoi(row->fields[5]) : 0; - session["hostgroup"] = row->fields[6] ? std::stoi(row->fields[6]) : 0; - session["backend_host"] = row->fields[7] ? row->fields[7] : ""; - session["backend_port"] = row->fields[8] ? std::stoi(row->fields[8]) : 0; - session["command"] = row->fields[9] ? row->fields[9] : ""; - session["time_ms"] = row->fields[10] ? std::stoi(row->fields[10]) : 0; - session["info"] = row->fields[11] ? row->fields[11] : ""; - sessions.push_back(session); - - // Aggregate summaries - std::string u = row->fields[2] ? row->fields[2] : "unknown"; - std::string hg = row->fields[6] ? row->fields[6] : "unknown"; - std::string cmd = row->fields[9] ? row->fields[9] : "unknown"; - by_user[u]++; - by_hostgroup[hg]++; - by_command[cmd]++; + const uint32_t requested_limit = static_cast(limit); + const uint32_t requested_offset = static_cast(offset); + const uint32_t effective_limit = std::min(requested_limit, configured_cap); + const uint32_t capped_offset = std::min(requested_offset, configured_cap); + + processlist_query_options_t query_opts {}; + query_opts.enabled = true; + query_opts.username = username; + query_opts.database = database; + query_opts.hostgroup = hostgroup; + query_opts.command = command; + query_opts.min_time_ms = min_time_ms; + query_opts.has_session_id = (session_id >= 0); + query_opts.session_id = (session_id >= 0) ? static_cast(session_id) : 0; + query_opts.match_info = match_info; + query_opts.info_case_sensitive = info_case_sensitive; + query_opts.sort_by = sort_mode; + query_opts.sort_desc = sort_desc; + query_opts.limit = effective_limit; + query_opts.offset = capped_offset; + + processlist_config_t base_cfg {}; +#ifdef IDLE_THREADS + base_cfg.show_idle_session = is_pgsql + ? GloPTH->variables.session_idle_show_processlist + : GloMTH->variables.session_idle_show_processlist; +#endif + base_cfg.show_extended = is_pgsql + ? GloPTH->variables.show_processlist_extended + : GloMTH->variables.show_processlist_extended; + base_cfg.max_query_length = is_pgsql + ? GloPTH->variables.processlist_max_query_length + : GloMTH->variables.processlist_max_query_length; + base_cfg.query_options = query_opts; + + /** + * Compute the full matched cardinality before pagination so the MCP payload + * can expose deterministic metadata (`total_sessions`) regardless of page. + */ + processlist_config_t count_cfg = base_cfg; + count_cfg.query_options.sort_by = processlist_sort_by_t::none; + count_cfg.query_options.disable_pagination = true; + + SQLite3_result* count_rs = is_pgsql ? GloPTH->SQL3_Processlist(count_cfg) : GloMTH->SQL3_Processlist(count_cfg); + if (!count_rs) { + return create_error_response("Failed to read in-memory processlist for total count"); + } + const int total_sessions = count_rs->rows_count; + delete count_rs; + + SQLite3_result* resultset = is_pgsql ? GloPTH->SQL3_Processlist(base_cfg) : GloMTH->SQL3_Processlist(base_cfg); + if (!resultset) { + return create_error_response("Failed to read in-memory processlist rows"); + } + + auto to_int64 = [](const char* value) -> int64_t { + if (!value || !value[0]) { + return 0; + } + char* end = nullptr; + errno = 0; + long long parsed = strtoll(value, &end, 10); + if (end == value || *end != '\0' || errno != 0) { + return 0; } - delete resultset; - } + return static_cast(parsed); + }; + + json sessions = json::array(); + std::map by_user, by_hostgroup, by_command; + const int command_idx = is_pgsql ? 13 : 11; + const int time_ms_idx = is_pgsql ? 14 : 12; + const int info_idx = is_pgsql ? 15 : 13; + + for (const auto& row : resultset->rows) { + json session; + session["session_id"] = static_cast(to_int64(row->fields[1])); + session["thread_id"] = static_cast(to_int64(row->fields[0])); + session["user"] = row->fields[2] ? row->fields[2] : ""; + session["database"] = row->fields[3] ? row->fields[3] : ""; + session["client_host"] = row->fields[4] ? row->fields[4] : ""; + session["client_port"] = static_cast(to_int64(row->fields[5])); + session["hostgroup"] = static_cast(to_int64(row->fields[6])); + session["local_backend_host"] = row->fields[7] ? row->fields[7] : ""; + session["local_backend_port"] = static_cast(to_int64(row->fields[8])); + session["backend_host"] = row->fields[9] ? row->fields[9] : ""; + session["backend_port"] = static_cast(to_int64(row->fields[10])); + if (is_pgsql) { + session["backend_pid"] = static_cast(to_int64(row->fields[11])); + session["backend_state"] = row->fields[12] ? row->fields[12] : ""; + } + session["command"] = row->fields[command_idx] ? row->fields[command_idx] : ""; + session["time_ms"] = static_cast(to_int64(row->fields[time_ms_idx])); + session["info"] = row->fields[info_idx] ? row->fields[info_idx] : ""; + sessions.push_back(session); + + const std::string summary_user = row->fields[2] ? row->fields[2] : "unknown"; + const std::string summary_hg = row->fields[6] ? row->fields[6] : "unknown"; + const std::string summary_cmd = row->fields[command_idx] ? row->fields[command_idx] : "unknown"; + by_user[summary_user]++; + by_hostgroup[summary_hg]++; + by_command[summary_cmd]++; + } + delete resultset; json result; result["db_type"] = db_type; result["total_sessions"] = total_sessions; result["sessions"] = sessions; + result["requested_limit"] = requested_limit; + result["requested_offset"] = requested_offset; + result["effective_limit"] = effective_limit; + result["limit_cap"] = configured_cap; + result["sort_by"] = sort_by; + result["sort_order"] = sort_desc ? "desc" : "asc"; result["summary"] = { {"by_user", by_user}, {"by_hostgroup", by_hostgroup}, diff --git a/src/proxysql.cfg b/src/proxysql.cfg index 604e64c3f..23f0b5729 100644 --- a/src/proxysql.cfg +++ b/src/proxysql.cfg @@ -71,6 +71,7 @@ mcp_variables= mcp_rag_endpoint_auth="" mcp_timeout_ms=30000 mcp_stats_show_queries_max_rows=200 + mcp_stats_show_processlist_max_rows=200 } # GenAI module configuration diff --git a/test/tap/tests/mcp_pgsql_concurrency_stress-t.cpp b/test/tap/tests/mcp_pgsql_concurrency_stress-t.cpp new file mode 100644 index 000000000..d4fcbfceb --- /dev/null +++ b/test/tap/tests/mcp_pgsql_concurrency_stress-t.cpp @@ -0,0 +1,1013 @@ +/** + * @file mcp_pgsql_concurrency_stress-t.cpp + * @brief TAP stress test for concurrent PgSQL traffic with parallel MCP stats polling. + * + * This test generates sustained traffic through ProxySQL's PgSQL frontend while + * concurrently querying MCP stats tools from multiple threads. + * + * Workload characteristics: + * 1. Many concurrent PgSQL client connections. + * 2. Mixed query stream: + * - simple reads (`SELECT 1`, `SELECT 1 + 2`) + * - mutable workload over a test-created table (`INSERT/UPDATE/DELETE/SELECT`) + * - randomized sleeps (`SELECT pg_sleep(x)`) + * 3. Parallel MCP polling while workload is active: + * - `stats.show_processlist` with sorting, metadata, and `match_info` filtering + * - `stats.show_queries` with sorting, metadata, and `match_digest_text` filtering + * + * The primary goal is validating that MCP stats remains correct and responsive + * under concurrent client traffic. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "mysql.h" +#include "libpq-fe.h" + +#include "tap.h" +#include "command_line.h" +#include "utils.h" +#include "mcp_client.h" + +using json = nlohmann::json; + +namespace { + +/** + * Number of parallel PgSQL workers used to generate traffic. + */ +static constexpr int k_worker_threads = 24; + +/** + * Total stress runtime in seconds. + */ +static constexpr int k_runtime_seconds = 12; + +/** + * Configured MCP cap for `stats.show_processlist`. + */ +static constexpr int k_processlist_cap = 80; + +/** + * Configured MCP cap for `stats.show_queries`. + */ +static constexpr int k_show_queries_cap = 120; + +/** + * Requested processlist limit used to verify cap metadata. + */ +static constexpr int k_processlist_requested_limit = 500; + +/** + * Requested show_queries limit used to verify cap metadata. + */ +static constexpr int k_show_queries_requested_limit = 500; + +/** + * Minimum query volume expected from the mixed workload. + */ +static constexpr uint64_t k_min_total_queries = 500; + +/** + * Minimum successful poll count expected from each MCP poller thread. + */ +static constexpr uint64_t k_min_successful_polls = 10; + +using PGConnPtr = std::unique_ptr; + +/** + * @brief Aggregated counters for the PgSQL workload generator. + */ +struct workload_stats_t { + std::atomic connected_workers {0}; + std::atomic connection_failures {0}; + std::atomic total_queries {0}; + std::atomic failed_queries {0}; + std::atomic simple_queries {0}; + std::atomic insert_queries {0}; + std::atomic update_queries {0}; + std::atomic delete_queries {0}; + std::atomic table_select_queries {0}; + std::atomic sleep_queries {0}; +}; + +/** + * @brief Aggregated counters for the processlist MCP poller. + */ +struct processlist_poll_stats_t { + std::atomic calls_total {0}; + std::atomic calls_ok {0}; + std::atomic calls_failed {0}; + std::atomic parse_failures {0}; + std::atomic metadata_failures {0}; + std::atomic row_shape_failures {0}; + std::atomic non_empty_snapshots {0}; + std::atomic filtered_calls_ok {0}; + std::atomic filtered_non_empty_snapshots {0}; + std::atomic filtered_invalid_rows {0}; + std::atomic max_sessions_observed {0}; +}; + +/** + * @brief Aggregated counters for the show_queries MCP poller. + */ +struct show_queries_poll_stats_t { + std::atomic calls_total {0}; + std::atomic calls_ok {0}; + std::atomic calls_failed {0}; + std::atomic parse_failures {0}; + std::atomic metadata_failures {0}; + std::atomic sorted_failures {0}; + std::atomic digests_seen_snapshots {0}; + std::atomic filtered_calls_ok {0}; + std::atomic filtered_non_empty_snapshots {0}; + std::atomic filtered_invalid_rows {0}; +}; + +/** + * @brief Execute an admin SQL statement and consume any result set. + * + * @param admin Open admin connection. + * @param query SQL statement to execute. + * @param context Diagnostic label used on failure. + * @return true on success, false on failure. + */ +bool run_admin_stmt(MYSQL* admin, const std::string& query, const char* context) { + if (!admin) { + diag("%s: admin connection is null", context); + return false; + } + if (mysql_query(admin, query.c_str()) != 0) { + diag("%s failed: %s", context, mysql_error(admin)); + return false; + } + MYSQL_RES* res = mysql_store_result(admin); + if (res) { + mysql_free_result(res); + } + return true; +} + +/** + * @brief Configure MCP runtime for concurrent stats polling. + * + * @param admin Open admin connection. + * @param cl TAP command-line configuration. + * @return true if all setup statements succeeded. + */ +bool configure_mcp_runtime(MYSQL* admin, const CommandLine& cl) { + const std::vector statements = { + "SET mcp-port=" + std::to_string(cl.mcp_port), + "SET mcp-use_ssl=false", + "SET mcp-enabled=true", + "SET mcp-stats_endpoint_auth=''", + "SET mcp-stats_show_processlist_max_rows=" + std::to_string(k_processlist_cap), + "SET mcp-stats_show_queries_max_rows=" + std::to_string(k_show_queries_cap), + "LOAD MCP VARIABLES TO RUNTIME" + }; + + for (const auto& stmt : statements) { + if (!run_admin_stmt(admin, stmt, "MCP stress setup")) { + return false; + } + } + return true; +} + +/** + * @brief Restore MCP variables changed by this test. + * + * @param admin Open admin connection. + */ +void restore_mcp_runtime(MYSQL* admin) { + if (!admin) { + return; + } + run_q(admin, "SET mcp-stats_show_processlist_max_rows=200"); + run_q(admin, "SET mcp-stats_show_queries_max_rows=200"); + run_q(admin, "SET mcp-stats_endpoint_auth=''"); + run_q(admin, "SET mcp-enabled=false"); + run_q(admin, "LOAD MCP VARIABLES TO RUNTIME"); +} + +/** + * @brief Build a PgSQL connection string from TAP environment settings. + * + * @param cl TAP command-line configuration. + * @param use_root_credentials Whether to use privileged pgsql root credentials. + * @param application_name Optional application_name value. + * @return Connection string suitable for `PQconnectdb`. + */ +std::string build_pg_conninfo(const CommandLine& cl, bool use_root_credentials, const std::string& application_name) { + const char* host = use_root_credentials ? cl.pgsql_root_host : cl.pgsql_host; + const int port = use_root_credentials ? cl.pgsql_root_port : cl.pgsql_port; + const char* user = use_root_credentials ? cl.pgsql_root_username : cl.pgsql_username; + const char* pass = use_root_credentials ? cl.pgsql_root_password : cl.pgsql_password; + + std::ostringstream ss; + ss << "host=" << host + << " port=" << port + << " dbname=postgres" + << " user=" << user + << " password=" << pass + << " sslmode=disable" + << " connect_timeout=5"; + + if (!application_name.empty()) { + ss << " application_name=" << application_name; + } + + return ss.str(); +} + +/** + * @brief Open a PgSQL connection through ProxySQL. + * + * @param cl TAP command-line configuration. + * @param use_root_credentials Whether to use privileged pgsql root credentials. + * @param application_name Optional application name. + * @param error Output error text on failure. + * @return Managed connection pointer (null on failure). + */ +PGConnPtr create_pg_connection( + const CommandLine& cl, + bool use_root_credentials, + const std::string& application_name, + std::string& error +) { + const std::string conninfo = build_pg_conninfo(cl, use_root_credentials, application_name); + PGconn* raw = PQconnectdb(conninfo.c_str()); + PGConnPtr conn(raw, &PQfinish); + if (!raw || PQstatus(raw) != CONNECTION_OK) { + error = raw ? PQerrorMessage(raw) : "PQconnectdb returned null connection"; + return PGConnPtr(nullptr, &PQfinish); + } + return conn; +} + +/** + * @brief Execute a SQL statement on PgSQL and validate status class. + * + * @param conn Open PgSQL connection. + * @param sql SQL statement to execute. + * @param error Output error text on failure. + * @return true on success (command or tuple result), false on failure. + */ +bool execute_pg_sql(PGconn* conn, const std::string& sql, std::string& error) { + if (!conn) { + error = "PgSQL connection is null"; + return false; + } + + PGresult* res = PQexec(conn, sql.c_str()); + if (!res) { + error = PQerrorMessage(conn); + return false; + } + + const ExecStatusType status = PQresultStatus(res); + const bool ok = (status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK); + if (!ok) { + error = PQresultErrorMessage(res) ? PQresultErrorMessage(res) : "unknown PgSQL execution error"; + } + PQclear(res); + return ok; +} + +/** + * @brief Case-insensitive substring check. + * + * @param haystack Candidate text. + * @param needle Search token. + * @return true when @p needle appears in @p haystack ignoring case. + */ +bool contains_icase(const std::string& haystack, const std::string& needle) { + if (needle.empty()) { + return true; + } + for (size_t p = 0; p < haystack.size(); ++p) { + size_t i = 0; + while (i < needle.size() && (p + i) < haystack.size()) { + const unsigned char lhs = static_cast(haystack[p + i]); + const unsigned char rhs = static_cast(needle[i]); + if (std::tolower(lhs) != std::tolower(rhs)) { + break; + } + ++i; + } + if (i == needle.size()) { + return true; + } + } + return false; +} + +/** + * @brief Parse successful MCP tool payload into a result object. + * + * Supported payload shapes: + * - direct result object: `{ ... }` + * - legacy wrapped payload: `{ "success": true, "result": { ... } }` + * + * @param response MCP response object. + * @param result_obj Output parsed result object. + * @param error Output error text on failure. + * @return true on successful parsing. + */ +bool extract_tool_result(const MCPResponse& response, json& result_obj, std::string& error) { + if (!response.is_success()) { + error = response.get_error_message(); + return false; + } + + const json& payload = response.get_result(); + if (!payload.is_object()) { + error = "MCP payload is not an object"; + return false; + } + + if (!payload.contains("success") && !payload.contains("result")) { + result_obj = payload; + return true; + } + + if (!payload.value("success", false)) { + error = payload.value("error", std::string("tool returned error payload")); + return false; + } + + if (!payload.contains("result") || !payload["result"].is_object()) { + error = "wrapped MCP payload missing object field 'result'"; + return false; + } + + result_obj = payload["result"]; + return true; +} + +/** + * @brief Atomically update an integer maximum value. + * + * @tparam T Unsigned integer type. + * @param max_value Atomic max target. + * @param candidate Candidate max value. + */ +template +void atomic_update_max(std::atomic& max_value, T candidate) { + T current = max_value.load(std::memory_order_relaxed); + while (candidate > current && !max_value.compare_exchange_weak( + current, candidate, std::memory_order_relaxed, std::memory_order_relaxed + )) { + } +} + +/** + * @brief Create workload table used by concurrent PgSQL workers. + * + * @param conn Open setup connection. + * @param table_name Test table name. + * @param error Output error text on failure. + * @return true when table creation succeeds. + */ +bool create_workload_table(PGconn* conn, const std::string& table_name, std::string& error) { + const std::string drop_sql = "DROP TABLE IF EXISTS " + table_name; + if (!execute_pg_sql(conn, drop_sql, error)) { + return false; + } + + const std::string create_sql = + "CREATE TABLE " + table_name + " (" + "id BIGSERIAL PRIMARY KEY, " + "worker_id INT NOT NULL, " + "v INT NOT NULL, " + "payload TEXT NOT NULL, " + "created_at TIMESTAMP NOT NULL DEFAULT NOW()" + ")"; + + return execute_pg_sql(conn, create_sql, error); +} + +/** + * @brief Drop workload table created by this test. + * + * @param conn Open setup connection. + * @param table_name Test table name. + * @param error Output error text on failure. + * @return true when table drop succeeds. + */ +bool drop_workload_table(PGconn* conn, const std::string& table_name, std::string& error) { + const std::string drop_sql = "DROP TABLE IF EXISTS " + table_name; + return execute_pg_sql(conn, drop_sql, error); +} + +/** + * @brief Worker thread body that issues mixed PgSQL traffic continuously. + * + * @param worker_id Worker identifier. + * @param cl TAP command-line configuration. + * @param table_name Shared workload table. + * @param stop Stop flag set by main thread. + * @param stats Shared workload counters. + */ +void run_pgsql_worker( + int worker_id, + const CommandLine& cl, + const std::string& table_name, + const std::atomic& stop, + workload_stats_t& stats +) { + std::string err; + PGConnPtr conn = create_pg_connection(cl, true, "mcp_stress_worker_" + std::to_string(worker_id), err); + if (!conn) { + conn = create_pg_connection(cl, false, "mcp_stress_worker_" + std::to_string(worker_id), err); + } + if (!conn) { + stats.connection_failures.fetch_add(1, std::memory_order_relaxed); + return; + } + + stats.connected_workers.fetch_add(1, std::memory_order_relaxed); + + std::mt19937 rng(static_cast( + std::chrono::steady_clock::now().time_since_epoch().count() ^ (worker_id * 2654435761U) + )); + std::uniform_int_distribution op_dist(0, 99); + std::uniform_int_distribution val_dist(1, 100000); + std::uniform_int_distribution sleep_ms_dist(50, 350); + + uint64_t local_seq = 0; + while (!stop.load(std::memory_order_relaxed)) { + const int roll = op_dist(rng); + std::string sql; + enum class op_t { simple, ins, upd, del, sel, sleep } op = op_t::simple; + + if (roll < 20) { + sql = "SELECT 1"; + op = op_t::simple; + } else if (roll < 35) { + sql = "SELECT 1 + 2"; + op = op_t::simple; + } else if (roll < 55) { + sql = "INSERT INTO " + table_name + " (worker_id, v, payload) VALUES (" + + std::to_string(worker_id) + ", " + std::to_string(val_dist(rng)) + ", 'p_" + + std::to_string(worker_id) + "_" + std::to_string(local_seq) + "')"; + op = op_t::ins; + } else if (roll < 70) { + sql = "UPDATE " + table_name + " SET v = v + 1, payload = payload || '_u' " + "WHERE id = (SELECT id FROM " + table_name + " WHERE worker_id=" + + std::to_string(worker_id) + " ORDER BY id DESC LIMIT 1)"; + op = op_t::upd; + } else if (roll < 82) { + sql = "DELETE FROM " + table_name + " WHERE id IN (SELECT id FROM " + table_name + + " WHERE worker_id=" + std::to_string(worker_id) + " ORDER BY id ASC LIMIT 1)"; + op = op_t::del; + } else if (roll < 90) { + sql = "SELECT COUNT(*) FROM " + table_name + " WHERE worker_id=" + std::to_string(worker_id); + op = op_t::sel; + } else { + const double sleep_s = static_cast(sleep_ms_dist(rng)) / 1000.0; + std::ostringstream ss; + ss.setf(std::ios::fixed); + ss.precision(3); + ss << "SELECT pg_sleep(" << sleep_s << ")"; + sql = ss.str(); + op = op_t::sleep; + } + + std::string query_error; + if (!execute_pg_sql(conn.get(), sql, query_error)) { + stats.failed_queries.fetch_add(1, std::memory_order_relaxed); + } else { + stats.total_queries.fetch_add(1, std::memory_order_relaxed); + switch (op) { + case op_t::simple: + stats.simple_queries.fetch_add(1, std::memory_order_relaxed); + break; + case op_t::ins: + stats.insert_queries.fetch_add(1, std::memory_order_relaxed); + break; + case op_t::upd: + stats.update_queries.fetch_add(1, std::memory_order_relaxed); + break; + case op_t::del: + stats.delete_queries.fetch_add(1, std::memory_order_relaxed); + break; + case op_t::sel: + stats.table_select_queries.fetch_add(1, std::memory_order_relaxed); + break; + case op_t::sleep: + stats.sleep_queries.fetch_add(1, std::memory_order_relaxed); + break; + } + } + + ++local_seq; + } +} + +/** + * @brief MCP poller thread that validates `stats.show_processlist` during load. + * + * @param cl TAP command-line configuration. + * @param stop Stop flag set by main thread. + * @param stats Shared processlist poll counters. + */ +void run_processlist_poller( + const CommandLine& cl, + const std::atomic& stop, + processlist_poll_stats_t& stats +) { + MCPClient mcp(cl.admin_host, cl.mcp_port); + if (std::strlen(cl.mcp_auth_token) > 0) { + mcp.set_auth_token(cl.mcp_auth_token); + } + + uint64_t iter = 0; + while (!stop.load(std::memory_order_relaxed)) { + stats.calls_total.fetch_add(1, std::memory_order_relaxed); + const MCPResponse resp = mcp.call_tool( + "stats", + "show_processlist", + json{ + {"db_type", "pgsql"}, + {"sort_by", "time_ms"}, + {"sort_order", "desc"}, + {"limit", k_processlist_requested_limit} + } + ); + + json result_obj = json::object(); + std::string parse_error; + if (!extract_tool_result(resp, result_obj, parse_error)) { + stats.calls_failed.fetch_add(1, std::memory_order_relaxed); + stats.parse_failures.fetch_add(1, std::memory_order_relaxed); + } else { + stats.calls_ok.fetch_add(1, std::memory_order_relaxed); + + const int requested_limit = result_obj.value("requested_limit", -1); + const int effective_limit = result_obj.value("effective_limit", -1); + const int limit_cap = result_obj.value("limit_cap", -1); + if (!(requested_limit == k_processlist_requested_limit && + effective_limit >= 0 && + limit_cap == k_processlist_cap && + effective_limit <= limit_cap)) { + stats.metadata_failures.fetch_add(1, std::memory_order_relaxed); + } + + if (!result_obj.contains("sessions") || !result_obj["sessions"].is_array()) { + stats.row_shape_failures.fetch_add(1, std::memory_order_relaxed); + } else { + const json& sessions = result_obj["sessions"]; + if (!sessions.empty()) { + stats.non_empty_snapshots.fetch_add(1, std::memory_order_relaxed); + atomic_update_max(stats.max_sessions_observed, static_cast(sessions.size())); + + const json& first = sessions[0]; + const bool shape_ok = + first.contains("session_id") && + first.contains("user") && + first.contains("database") && + first.contains("command") && + first.contains("time_ms") && + first.contains("info"); + if (!shape_ok) { + stats.row_shape_failures.fetch_add(1, std::memory_order_relaxed); + } + } + } + } + + if ((iter % 4U) == 0U) { + const MCPResponse filter_resp = mcp.call_tool( + "stats", + "show_processlist", + json{ + {"db_type", "pgsql"}, + {"sort_by", "time_ms"}, + {"sort_order", "desc"}, + {"limit", 50}, + {"match_info", "pg_sleep"}, + {"info_case_sensitive", false} + } + ); + + json filter_obj = json::object(); + std::string filter_error; + if (extract_tool_result(filter_resp, filter_obj, filter_error)) { + stats.filtered_calls_ok.fetch_add(1, std::memory_order_relaxed); + if (filter_obj.contains("sessions") && filter_obj["sessions"].is_array()) { + const json& sessions = filter_obj["sessions"]; + if (!sessions.empty()) { + stats.filtered_non_empty_snapshots.fetch_add(1, std::memory_order_relaxed); + for (const auto& row : sessions) { + const std::string info = row.value("info", std::string("")); + if (!contains_icase(info, "pg_sleep")) { + stats.filtered_invalid_rows.fetch_add(1, std::memory_order_relaxed); + break; + } + } + } + } + } + } + + ++iter; + std::this_thread::sleep_for(std::chrono::milliseconds(40)); + } +} + +/** + * @brief MCP poller thread that validates `stats.show_queries` during load. + * + * @param cl TAP command-line configuration. + * @param stop Stop flag set by main thread. + * @param stats Shared show_queries poll counters. + */ +void run_show_queries_poller( + const CommandLine& cl, + const std::atomic& stop, + show_queries_poll_stats_t& stats +) { + MCPClient mcp(cl.admin_host, cl.mcp_port); + if (std::strlen(cl.mcp_auth_token) > 0) { + mcp.set_auth_token(cl.mcp_auth_token); + } + + uint64_t iter = 0; + while (!stop.load(std::memory_order_relaxed)) { + stats.calls_total.fetch_add(1, std::memory_order_relaxed); + const MCPResponse resp = mcp.call_tool( + "stats", + "show_queries", + json{ + {"db_type", "pgsql"}, + {"sort_by", "count"}, + {"limit", k_show_queries_requested_limit} + } + ); + + json result_obj = json::object(); + std::string parse_error; + if (!extract_tool_result(resp, result_obj, parse_error)) { + stats.calls_failed.fetch_add(1, std::memory_order_relaxed); + stats.parse_failures.fetch_add(1, std::memory_order_relaxed); + } else { + stats.calls_ok.fetch_add(1, std::memory_order_relaxed); + + const int requested_limit = result_obj.value("requested_limit", -1); + const int effective_limit = result_obj.value("effective_limit", -1); + const int limit_cap = result_obj.value("limit_cap", -1); + if (!(requested_limit == k_show_queries_requested_limit && + effective_limit >= 0 && + limit_cap == k_show_queries_cap && + effective_limit <= limit_cap)) { + stats.metadata_failures.fetch_add(1, std::memory_order_relaxed); + } + + const uint64_t total_digests = result_obj.value("total_digests", static_cast(0)); + if (total_digests > 0) { + stats.digests_seen_snapshots.fetch_add(1, std::memory_order_relaxed); + } + + if (result_obj.contains("queries") && result_obj["queries"].is_array()) { + const json& queries = result_obj["queries"]; + uint64_t prev = std::numeric_limits::max(); + for (const auto& row : queries) { + const uint64_t count_star = row.value("count_star", static_cast(0)); + if (count_star > prev) { + stats.sorted_failures.fetch_add(1, std::memory_order_relaxed); + break; + } + prev = count_star; + } + } + } + + if ((iter % 3U) == 0U) { + const MCPResponse filter_resp = mcp.call_tool( + "stats", + "show_queries", + json{ + {"db_type", "pgsql"}, + {"sort_by", "count"}, + {"limit", 50}, + {"match_digest_text", "PG_SLEEP"}, + {"digest_text_case_sensitive", false} + } + ); + + json filter_obj = json::object(); + std::string filter_error; + if (extract_tool_result(filter_resp, filter_obj, filter_error)) { + stats.filtered_calls_ok.fetch_add(1, std::memory_order_relaxed); + if (filter_obj.contains("queries") && filter_obj["queries"].is_array()) { + const json& queries = filter_obj["queries"]; + if (!queries.empty()) { + stats.filtered_non_empty_snapshots.fetch_add(1, std::memory_order_relaxed); + for (const auto& row : queries) { + const std::string digest_text = row.value("digest_text", std::string("")); + if (!contains_icase(digest_text, "PG_SLEEP")) { + stats.filtered_invalid_rows.fetch_add(1, std::memory_order_relaxed); + break; + } + } + } + } + } + } + + ++iter; + std::this_thread::sleep_for(std::chrono::milliseconds(60)); + } +} + +} // namespace + +int main(int argc, char** argv) { + (void)argc; + (void)argv; + + plan(31); + + CommandLine cl; + if (cl.getEnv()) { + diag("Failed to load TAP environment"); + return exit_status(); + } + + MYSQL* admin = nullptr; + bool can_continue = true; + + admin = init_mysql_conn(cl.admin_host, cl.admin_port, cl.admin_username, cl.admin_password); + ok(admin != nullptr, "Admin connection established"); + if (!admin) { + skip(30, "Cannot continue without admin connection"); + can_continue = false; + } + + if (can_continue) { + const bool configured = configure_mcp_runtime(admin, cl); + ok(configured, "Configured MCP runtime for PgSQL concurrency stress"); + if (!configured) { + skip(29, "Cannot continue without MCP runtime configuration"); + can_continue = false; + } + } + + MCPClient* mcp = nullptr; + if (can_continue) { + mcp = new MCPClient(cl.admin_host, cl.mcp_port); + if (std::strlen(cl.mcp_auth_token) > 0) { + mcp->set_auth_token(cl.mcp_auth_token); + } + const bool mcp_ready = mcp->check_server(); + ok(mcp_ready, "MCP server reachable at %s", mcp->get_connection_info().c_str()); + if (!mcp_ready) { + skip(28, "Cannot continue without MCP connectivity"); + can_continue = false; + } + } + + PGConnPtr setup_conn(nullptr, &PQfinish); + std::string setup_error; + if (can_continue) { + setup_conn = create_pg_connection(cl, true, "mcp_stress_setup", setup_error); + if (!setup_conn) { + setup_conn = create_pg_connection(cl, false, "mcp_stress_setup", setup_error); + } + if (setup_conn) { + ok(true, "PgSQL workload connection established via ProxySQL"); + } else { + ok(false, "PgSQL workload connection established via ProxySQL: %s", setup_error.c_str()); + skip(27, "Cannot continue without PgSQL connectivity"); + can_continue = false; + } + } + + std::string table_name = "mcp_pgsql_stress_" + std::to_string(static_cast(getpid())); + if (can_continue) { + std::string table_error; + const bool table_created = create_workload_table(setup_conn.get(), table_name, table_error); + ok(table_created, "Created stress workload table `%s`", table_name.c_str()); + if (!table_created) { + diag("Table setup error: %s", table_error.c_str()); + skip(26, "Cannot continue without workload table"); + can_continue = false; + } + } + + workload_stats_t workload_stats {}; + processlist_poll_stats_t processlist_stats {}; + show_queries_poll_stats_t show_queries_stats {}; + + if (can_continue) { + std::atomic stop {false}; + std::vector workers; + workers.reserve(k_worker_threads); + for (int i = 0; i < k_worker_threads; ++i) { + workers.emplace_back(run_pgsql_worker, i, std::cref(cl), std::cref(table_name), std::cref(stop), std::ref(workload_stats)); + } + + std::thread processlist_poller( + run_processlist_poller, std::cref(cl), std::cref(stop), std::ref(processlist_stats) + ); + std::thread show_queries_poller( + run_show_queries_poller, std::cref(cl), std::cref(stop), std::ref(show_queries_stats) + ); + + std::this_thread::sleep_for(std::chrono::seconds(k_runtime_seconds)); + stop.store(true, std::memory_order_relaxed); + + for (auto& t : workers) { + if (t.joinable()) { + t.join(); + } + } + if (processlist_poller.joinable()) { + processlist_poller.join(); + } + if (show_queries_poller.joinable()) { + show_queries_poller.join(); + } + } + + if (can_continue) { + const uint64_t connected_workers = workload_stats.connected_workers.load(std::memory_order_relaxed); + const uint64_t total_queries = workload_stats.total_queries.load(std::memory_order_relaxed); + const uint64_t failed_queries = workload_stats.failed_queries.load(std::memory_order_relaxed); + const uint64_t simple_queries = workload_stats.simple_queries.load(std::memory_order_relaxed); + const uint64_t insert_queries = workload_stats.insert_queries.load(std::memory_order_relaxed); + const uint64_t update_queries = workload_stats.update_queries.load(std::memory_order_relaxed); + const uint64_t delete_queries = workload_stats.delete_queries.load(std::memory_order_relaxed); + const uint64_t table_select_queries = workload_stats.table_select_queries.load(std::memory_order_relaxed); + const uint64_t sleep_queries = workload_stats.sleep_queries.load(std::memory_order_relaxed); + + ok( + connected_workers >= static_cast(k_worker_threads / 2), + "At least half of worker connections succeeded (%llu/%d)", + static_cast(connected_workers), + k_worker_threads + ); + ok( + total_queries >= k_min_total_queries, + "Workload generated sufficient traffic (%llu >= %llu)", + static_cast(total_queries), + static_cast(k_min_total_queries) + ); + ok(simple_queries > 0, "Simple SELECT workload executed"); + ok( + insert_queries > 0 && update_queries > 0 && delete_queries > 0 && table_select_queries > 0, + "R/W workload mix executed (ins=%llu upd=%llu del=%llu sel=%llu)", + static_cast(insert_queries), + static_cast(update_queries), + static_cast(delete_queries), + static_cast(table_select_queries) + ); + ok(sleep_queries > 0, "pg_sleep workload executed"); + ok(failed_queries == 0, "Workload queries completed without execution errors"); + } + + if (can_continue) { + const uint64_t pl_calls_ok = processlist_stats.calls_ok.load(std::memory_order_relaxed); + const uint64_t pl_calls_failed = processlist_stats.calls_failed.load(std::memory_order_relaxed); + const uint64_t pl_non_empty = processlist_stats.non_empty_snapshots.load(std::memory_order_relaxed); + const uint64_t pl_max_sessions = processlist_stats.max_sessions_observed.load(std::memory_order_relaxed); + const uint64_t pl_metadata_failures = processlist_stats.metadata_failures.load(std::memory_order_relaxed); + const uint64_t pl_row_shape_failures = processlist_stats.row_shape_failures.load(std::memory_order_relaxed); + const uint64_t pl_filtered_ok = processlist_stats.filtered_calls_ok.load(std::memory_order_relaxed); + const uint64_t pl_filtered_non_empty = processlist_stats.filtered_non_empty_snapshots.load(std::memory_order_relaxed); + const uint64_t pl_filtered_invalid = processlist_stats.filtered_invalid_rows.load(std::memory_order_relaxed); + + ok( + pl_calls_ok >= k_min_successful_polls, + "show_processlist poller collected enough successful samples (%llu)", + static_cast(pl_calls_ok) + ); + ok( + pl_calls_ok > pl_calls_failed, + "show_processlist poller success dominates failures (ok=%llu fail=%llu)", + static_cast(pl_calls_ok), + static_cast(pl_calls_failed) + ); + ok(pl_non_empty > 0, "show_processlist observed non-empty snapshots during load"); + ok( + pl_max_sessions >= 3, + "show_processlist observed at least 3 concurrent sessions (max=%llu)", + static_cast(pl_max_sessions) + ); + ok(pl_metadata_failures == 0, "show_processlist limit-cap metadata remained consistent"); + ok(pl_row_shape_failures == 0, "show_processlist row shape remained consistent"); + ok(pl_filtered_ok > 0, "show_processlist filtered polling executed"); + ok(pl_filtered_non_empty > 0, "show_processlist filter returned non-empty snapshots"); + ok(pl_filtered_invalid == 0, "show_processlist filter rows matched expected `pg_sleep` token"); + } + + if (can_continue) { + const uint64_t q_calls_ok = show_queries_stats.calls_ok.load(std::memory_order_relaxed); + const uint64_t q_calls_failed = show_queries_stats.calls_failed.load(std::memory_order_relaxed); + const uint64_t q_metadata_failures = show_queries_stats.metadata_failures.load(std::memory_order_relaxed); + const uint64_t q_sorted_failures = show_queries_stats.sorted_failures.load(std::memory_order_relaxed); + const uint64_t q_digests_seen = show_queries_stats.digests_seen_snapshots.load(std::memory_order_relaxed); + const uint64_t q_filtered_ok = show_queries_stats.filtered_calls_ok.load(std::memory_order_relaxed); + const uint64_t q_filtered_non_empty = show_queries_stats.filtered_non_empty_snapshots.load(std::memory_order_relaxed); + const uint64_t q_filtered_invalid = show_queries_stats.filtered_invalid_rows.load(std::memory_order_relaxed); + + ok( + q_calls_ok >= k_min_successful_polls, + "show_queries poller collected enough successful samples (%llu)", + static_cast(q_calls_ok) + ); + ok( + q_calls_ok > q_calls_failed, + "show_queries poller success dominates failures (ok=%llu fail=%llu)", + static_cast(q_calls_ok), + static_cast(q_calls_failed) + ); + ok(q_digests_seen > 0, "show_queries observed non-zero digest snapshots"); + ok(q_metadata_failures == 0, "show_queries limit-cap metadata remained consistent"); + ok(q_sorted_failures == 0, "show_queries snapshots remained sorted by count_star DESC"); + ok(q_filtered_ok > 0, "show_queries filtered polling executed"); + ok(q_filtered_non_empty > 0, "show_queries filter returned non-empty snapshots"); + ok(q_filtered_invalid == 0, "show_queries filtered rows matched expected digest token"); + } + + if (can_continue) { + const MCPResponse final_queries_resp = mcp->call_tool( + "stats", + "show_queries", + json{ + {"db_type", "pgsql"}, + {"sort_by", "count"}, + {"limit", 50}, + {"match_digest_text", table_name}, + {"digest_text_case_sensitive", false} + } + ); + json final_queries_result = json::object(); + std::string final_queries_error; + const bool final_queries_ok = extract_tool_result(final_queries_resp, final_queries_result, final_queries_error); + ok( + final_queries_ok && final_queries_result.value("db_type", std::string("")) == "pgsql", + "Final show_queries call succeeded for pgsql%s%s", + final_queries_ok ? "" : ": ", + final_queries_ok ? "" : final_queries_error.c_str() + ); + + const MCPResponse final_processlist_resp = mcp->call_tool( + "stats", + "show_processlist", + json{ + {"db_type", "pgsql"}, + {"sort_by", "time_ms"}, + {"sort_order", "desc"}, + {"limit", 20} + } + ); + json final_processlist_result = json::object(); + std::string final_processlist_error; + const bool final_processlist_ok = extract_tool_result( + final_processlist_resp, final_processlist_result, final_processlist_error + ); + ok( + final_processlist_ok && final_processlist_result.value("db_type", std::string("")) == "pgsql", + "Final show_processlist call succeeded for pgsql%s%s", + final_processlist_ok ? "" : ": ", + final_processlist_ok ? "" : final_processlist_error.c_str() + ); + } + + if (can_continue) { + std::string drop_error; + const bool dropped = drop_workload_table(setup_conn.get(), table_name, drop_error); + ok(dropped, "Dropped stress workload table `%s`", table_name.c_str()); + if (!dropped) { + diag("Table cleanup error: %s", drop_error.c_str()); + } + } + + if (setup_conn) { + setup_conn.reset(); + } + if (mcp) { + delete mcp; + } + if (admin) { + restore_mcp_runtime(admin); + mysql_close(admin); + } + + return exit_status(); +}