MCP stats: add in-memory processlist filtering and PgSQL concurrency stress TAP

Replace direct stats-schema reads in MCP show_processlist with in-memory processlist access via SQL3_Processlist query options.

Add processlist query option types in proxysql_admin interfaces and implement filtering, sorting, and pagination post-processing in both MySQL and PgSQL thread implementations.

Introduce configurable MCP cap mcp_stats_show_processlist_max_rows (default 200, max 1000), wire it through MCP variable loading, and expose it in default config.

Expand Stats_Tool_Handler processlist handling to parse filter/sort arguments, enforce cap metadata, and return consistent MCP payload fields from in-memory snapshots.

Add extensive doxygen documentation in the touched headers and source blocks to describe API contracts, filtering behavior, and runtime constraints.

Add new TAP test mcp_pgsql_concurrency_stress-t that generates sustained concurrent PgSQL traffic (simple reads, read/write table workload, randomized pg_sleep) while polling MCP show_processlist and show_queries in parallel with filter and ordering assertions.

Validation performed locally: mcp_pgsql_concurrency_stress-t (31 assertions) and mcp_show_queries_topk-t (12 assertions).
v4.0-mcp-stats2
Rene Cannao 3 weeks ago
parent 70b61a86e3
commit e762f91cda

@ -83,6 +83,14 @@ public:
* hardcoded safety maximum in `Stats_Tool_Handler`.
*/
int mcp_stats_show_queries_max_rows;
/**
* @brief Runtime cap for `stats.show_processlist` returned rows.
*
* The handler applies this as an upper bound for caller-requested page
* size (`limit`). The configurable value is itself clamped by a
* hardcoded safety maximum in `Stats_Tool_Handler`.
*/
int mcp_stats_show_processlist_max_rows;
// MySQL Tool Handler configuration
char* mcp_mysql_hosts; ///< Comma-separated list of MySQL hosts
char* mcp_mysql_ports; ///< Comma-separated list of MySQL ports

@ -712,6 +712,17 @@ class MySQL_Threads_Handler
void start_listeners();
void stop_listeners();
void signal_all_threads(unsigned char _c=0);
/**
* @brief Build an in-memory processlist snapshot for MySQL sessions.
*
* The returned resultset always uses the canonical `stats_mysql_processlist`
* column layout. When `args.query_options.enabled=true`, the snapshot is
* post-processed in memory using typed filters, deterministic sorting, and
* pagination controls from `processlist_query_options_t`.
*
* @param args Processlist rendering options and optional query options.
* @return Newly allocated resultset owned by the caller.
*/
SQLite3_result * SQL3_Processlist(processlist_config_t args);
SQLite3_result * SQL3_GlobalStatus(bool _memory);
bool kill_session(uint32_t _thread_session_id);

@ -1515,7 +1515,10 @@ public:
/**
* @brief Retrieves a process list for all threads in the thread pool.
*
* @param args Processlist configuration of PgSQL.
* @param args
* Processlist rendering options and optional typed query controls.
* When `args.query_options.enabled=true`, filtering/sorting/pagination is
* applied in memory after the live snapshot is collected.
*
* @return A `SQLite3_result` object containing the process list, or `NULL` if an error
* occurred.

@ -58,7 +58,7 @@ private:
/**
* @brief Shows all currently active sessions
* @param arguments JSON with db_type, username, hostgroup, min_time_ms, limit, offset
* @param arguments JSON with db_type, filters, sort_by, sort_order, limit, offset
*/
json handle_show_processlist(const json& arguments);

@ -242,12 +242,62 @@ struct peer_pgsql_servers_v2_t {
peer_pgsql_servers_v2_t(SQLite3_result*, const pgsql_servers_v2_checksum_t&);
};
/**
* @brief Sort keys supported by in-memory processlist queries.
*
* These keys are intentionally limited to fields that are available for both
* MySQL and PgSQL processlist rows so callers can use a single contract.
*
* The MCP `show_processlist` handler maps its `sort_by` string argument
* ("time_ms", "session_id", "username", "hostgroup", "command") onto these
* enumerators. The per-protocol comparison helpers order `time_ms`,
* `session_id` and `hostgroup` as unsigned integers, and `username` and
* `command` using plain `std::string` ordering.
*/
enum class processlist_sort_by_t {
none, ///< Keep producer order; the post-processing helpers skip the sort step entirely.
time_ms, ///< Sort by session runtime in milliseconds (numeric comparison).
session_id, ///< Sort by ProxySQL session id (numeric comparison).
username, ///< Sort by authenticated username (string comparison).
hostgroup, ///< Sort by current hostgroup id (numeric comparison).
command ///< Sort by command/status text (string comparison).
};
/**
 * @brief Filter, ordering and paging request for an in-memory processlist read.
 *
 * Both `MySQL_Threads_Handler::SQL3_Processlist()` and
 * `PgSQL_Threads_Handler::SQL3_Processlist()` accept this structure (embedded
 * in `processlist_config_t`) and apply it to the live session snapshot, so
 * callers do not need to go through the runtime-populated
 * `stats_*_processlist` tables.
 *
 * String filters compare for exact equality unless stated otherwise, and an
 * empty string means "filter not set".
 */
struct processlist_query_options_t {
	/// Master switch; when false none of the fields below are consulted.
	bool enabled = false;
	/// Exact username to keep (empty: no username filter).
	std::string username;
	/// Exact schema/database to keep (empty: no database filter).
	std::string database;
	/// Hostgroup id to keep; any negative value (default `-1`) disables the filter.
	int hostgroup = -1;
	/// Exact command/status text to keep (empty: no command filter).
	std::string command;
	/// Inclusive lower bound on runtime in ms; any negative value (default `-1`) disables it.
	int min_time_ms = -1;
	/// Set to true to activate the @ref session_id filter.
	bool has_session_id = false;
	/// Session identifier to keep; only read when @ref has_session_id is true.
	uint32_t session_id = 0;
	/// Substring that must appear in the `info` column (empty: no info filter).
	std::string match_info;
	/// When true, @ref match_info is matched case-sensitively; otherwise case is folded.
	bool info_case_sensitive = false;
	/// Primary sort key; `none` leaves the producer order untouched.
	processlist_sort_by_t sort_by = processlist_sort_by_t::none;
	/// Direction for @ref sort_by: true sorts descending, false ascending.
	bool sort_desc = true;
	/// When true, @ref limit and @ref offset are ignored and every matching row is returned.
	bool disable_pagination = false;
	/// Page size; `0` yields an empty page unless pagination is disabled.
	uint32_t limit = 0;
	/// Number of matching rows to skip before the page starts.
	uint32_t offset = 0;
};
/**
 * @brief Processlist extraction configuration.
 *
 * Bundles the legacy rendering controls (`show_extended`,
 * `max_query_length` and, when built with `IDLE_THREADS`,
 * `show_idle_session`) with the optional typed query controls in
 * @ref query_options.
 *
 * Every member carries a default initializer so that a default-constructed
 * instance (`processlist_config_t cfg;`) never exposes indeterminate values
 * to `SQL3_Processlist()`. Callers that already assign each field explicitly
 * are unaffected.
 *
 * NOTE(review): the exact meaning of `show_extended` and `max_query_length`
 * values is defined by the `SQL3_Processlist()` implementations, which are not
 * visible here; `0` is used only as a deterministic starting value.
 */
struct processlist_config_t {
#ifdef IDLE_THREADS
	bool show_idle_session {false};   ///< Legacy rendering flag for idle sessions.
#endif
	int show_extended {0};            ///< Legacy "extended info" rendering mode.
	int max_query_length {0};         ///< Legacy limit applied to rendered query text.
	processlist_query_options_t query_options {}; ///< Optional filter/sort/paging controls (disabled by default).
};
class ProxySQL_Admin {

@ -34,6 +34,7 @@ static const char* mcp_thread_variables_names[] = {
"rag_endpoint_auth",
"timeout_ms",
"stats_show_queries_max_rows",
"stats_show_processlist_max_rows",
// MySQL Tool Handler configuration
"mysql_hosts",
"mysql_ports",
@ -62,6 +63,7 @@ MCP_Threads_Handler::MCP_Threads_Handler() {
variables.mcp_rag_endpoint_auth = strdup("");
variables.mcp_timeout_ms = 30000;
variables.mcp_stats_show_queries_max_rows = 200;
variables.mcp_stats_show_processlist_max_rows = 200;
// MySQL Tool Handler default values
variables.mcp_mysql_hosts = strdup("127.0.0.1");
variables.mcp_mysql_ports = strdup("3306");
@ -236,6 +238,10 @@ int MCP_Threads_Handler::get_variable(const char* name, char* val) {
sprintf(val, "%d", variables.mcp_stats_show_queries_max_rows);
return 0;
}
if (!strcmp(name, "stats_show_processlist_max_rows")) {
sprintf(val, "%d", variables.mcp_stats_show_processlist_max_rows);
return 0;
}
// MySQL Tool Handler configuration
if (!strcmp(name, "mysql_hosts")) {
sprintf(val, "%s", variables.mcp_mysql_hosts ? variables.mcp_mysql_hosts : "");
@ -357,6 +363,19 @@ int MCP_Threads_Handler::set_variable(const char* name, const char* value) {
}
return -1;
}
if (!strcmp(name, "stats_show_processlist_max_rows")) {
/**
* Hard safety cap: do not allow configuring values above 1000.
* This keeps MCP show_processlist bounded even if callers request
* large pages.
*/
int max_rows = atoi(value);
if (max_rows >= 1 && max_rows <= 1000) {
variables.mcp_stats_show_processlist_max_rows = max_rows;
return 0;
}
return -1;
}
// MySQL Tool Handler configuration
if (!strcmp(name, "mysql_hosts")) {
if (variables.mcp_mysql_hosts)

@ -5,6 +5,9 @@ using json = nlohmann::json;
//#define __CLASS_STANDARD_MYSQL_THREAD_H
#include <functional>
#include <algorithm>
#include <cerrno>
#include <cctype>
#include <vector>
#include "proxysql_utils.h"
@ -5277,6 +5280,268 @@ void MySQL_Threads_Handler::Get_Memory_Stats() {
}
}
namespace {
/**
 * @brief Zero-based column positions read by the MySQL processlist helpers.
 *
 * Only the columns needed for filtering and sorting are listed. The values
 * must stay aligned with the column order emitted by
 * `MySQL_Threads_Handler::SQL3_Processlist()` (layout not visible here --
 * verify when columns are added or reordered).
 */
struct mysql_processlist_columns_t {
	static constexpr int session_id{1};  ///< Session identifier column.
	static constexpr int username{2};    ///< Username column.
	static constexpr int database{3};    ///< Schema/database column.
	static constexpr int hostgroup{6};   ///< Hostgroup column.
	static constexpr int command{11};    ///< Command/status column.
	static constexpr int time_ms{12};    ///< Runtime (milliseconds) column.
	static constexpr int info{13};       ///< Query text / info column.
};
/**
 * @brief Null-safe accessor for one text field of a processlist row.
 *
 * @param row Row to read from; may be null.
 * @param idx Zero-based field position.
 * @return The stored C string when @p row is non-null, @p idx lies within
 *         `[0, row->cnt)` and the field pointer is set; otherwise a pointer
 *         to an empty string literal. The result is never null.
 */
static const char* mysql_pl_field(const SQLite3_row* row, int idx) {
	const bool index_valid = (row != nullptr) && (idx >= 0) && (idx < row->cnt);
	if (index_valid && row->fields[idx] != nullptr) {
		return row->fields[idx];
	}
	return "";
}
/**
 * @brief Convert a textual processlist field to an unsigned 64-bit integer.
 *
 * The whole string must be consumed by `strtoull` (base 10) and the
 * conversion must not report a range error; anything else yields `0` so
 * that filtering and sorting stay deterministic for partial rows.
 *
 * Note that `strtoull` itself accepts leading whitespace and an optional
 * sign, so a value such as "-1" is not rejected: it converts to the negated
 * value in the unsigned type.
 *
 * @param value Null-terminated text, or null.
 * @return The parsed value, or `0` for null, empty, malformed or
 *         out-of-range input.
 */
static uint64_t mysql_pl_to_u64(const char* value) {
	if (value == nullptr || *value == '\0') {
		return 0;
	}

	errno = 0;                       // strtoull only sets errno on failure
	char* tail = nullptr;
	const unsigned long long number = strtoull(value, &tail, 10);

	const bool consumed_everything = (tail != value) && (*tail == '\0');
	const bool conversion_ok = consumed_everything && (errno == 0);
	return conversion_ok ? static_cast<uint64_t>(number) : 0;
}
/**
 * @brief Substring test backing the `match_info` filter.
 *
 * @param haystack       Text to search in.
 * @param needle         Text to search for; an empty needle always matches.
 * @param case_sensitive When true, bytes are compared as-is; when false, both
 *                       sides are folded with `std::tolower` before comparing.
 * @return true if @p needle occurs somewhere inside @p haystack.
 */
static bool mysql_pl_contains(const std::string& haystack, const std::string& needle, bool case_sensitive) {
	if (needle.empty()) {
		return true;
	}

	if (!case_sensitive) {
		// Cast through unsigned char so std::tolower never sees a negative value.
		const auto same_when_folded = [](char a, char b) {
			const int folded_a = std::tolower(static_cast<unsigned char>(a));
			const int folded_b = std::tolower(static_cast<unsigned char>(b));
			return folded_a == folded_b;
		};
		const auto hit = std::search(haystack.begin(), haystack.end(),
		                             needle.begin(), needle.end(),
		                             same_when_folded);
		return hit != haystack.end();
	}

	return haystack.find(needle) != std::string::npos;
}
/**
* @brief Evaluate whether a MySQL processlist row matches caller filters.
*
* @param row Processlist row from `SQL3_Processlist`.
* @param opts Typed query options supplied by the caller.
* @return true when the row satisfies all active filters.
*/
static bool mysql_pl_row_matches(const SQLite3_row* row, const processlist_query_options_t& opts) {
if (!opts.username.empty() && opts.username != mysql_pl_field(row, mysql_processlist_columns_t::username)) {
return false;
}
if (!opts.database.empty() && opts.database != mysql_pl_field(row, mysql_processlist_columns_t::database)) {
return false;
}
if (opts.hostgroup >= 0) {
const int row_hostgroup = static_cast<int>(mysql_pl_to_u64(mysql_pl_field(row, mysql_processlist_columns_t::hostgroup)));
if (row_hostgroup != opts.hostgroup) {
return false;
}
}
if (!opts.command.empty() && opts.command != mysql_pl_field(row, mysql_processlist_columns_t::command)) {
return false;
}
if (opts.min_time_ms >= 0) {
const int row_time_ms = static_cast<int>(mysql_pl_to_u64(mysql_pl_field(row, mysql_processlist_columns_t::time_ms)));
if (row_time_ms < opts.min_time_ms) {
return false;
}
}
if (opts.has_session_id) {
const uint64_t row_session_id = mysql_pl_to_u64(mysql_pl_field(row, mysql_processlist_columns_t::session_id));
if (row_session_id != opts.session_id) {
return false;
}
}
if (!opts.match_info.empty()) {
const std::string info = mysql_pl_field(row, mysql_processlist_columns_t::info);
if (!mysql_pl_contains(info, opts.match_info, opts.info_case_sensitive)) {
return false;
}
}
return true;
}
/**
* @brief Compare two MySQL processlist rows according to typed sort options.
*
* The comparison applies a deterministic tie-breaker on `SessionID` so paging
* remains stable across repeated calls with identical data.
*
* @param lhs Left-hand row.
* @param rhs Right-hand row.
* @param opts Query options carrying sort key and direction.
* @return true when @p lhs should be ordered before @p rhs.
*/
static bool mysql_pl_row_less(const SQLite3_row* lhs, const SQLite3_row* rhs, const processlist_query_options_t& opts) {
const uint64_t lhs_session_id = mysql_pl_to_u64(mysql_pl_field(lhs, mysql_processlist_columns_t::session_id));
const uint64_t rhs_session_id = mysql_pl_to_u64(mysql_pl_field(rhs, mysql_processlist_columns_t::session_id));
auto string_compare = [&](int idx) -> int {
const std::string lhs_value = mysql_pl_field(lhs, idx);
const std::string rhs_value = mysql_pl_field(rhs, idx);
if (lhs_value < rhs_value) {
return -1;
}
if (lhs_value > rhs_value) {
return 1;
}
return 0;
};
auto numeric_compare = [&](int idx) -> int {
const uint64_t lhs_value = mysql_pl_to_u64(mysql_pl_field(lhs, idx));
const uint64_t rhs_value = mysql_pl_to_u64(mysql_pl_field(rhs, idx));
if (lhs_value < rhs_value) {
return -1;
}
if (lhs_value > rhs_value) {
return 1;
}
return 0;
};
int cmp = 0;
switch (opts.sort_by) {
case processlist_sort_by_t::time_ms:
cmp = numeric_compare(mysql_processlist_columns_t::time_ms);
break;
case processlist_sort_by_t::session_id:
cmp = numeric_compare(mysql_processlist_columns_t::session_id);
break;
case processlist_sort_by_t::username:
cmp = string_compare(mysql_processlist_columns_t::username);
break;
case processlist_sort_by_t::hostgroup:
cmp = numeric_compare(mysql_processlist_columns_t::hostgroup);
break;
case processlist_sort_by_t::command:
cmp = string_compare(mysql_processlist_columns_t::command);
break;
case processlist_sort_by_t::none:
default:
cmp = 0;
break;
}
if (cmp != 0) {
return opts.sort_desc ? (cmp > 0) : (cmp < 0);
}
return lhs_session_id < rhs_session_id;
}
/**
 * @brief Filter, sort and page a MySQL processlist resultset in place.
 *
 * Does nothing when @p result is null or `opts.enabled` is false, so callers
 * that never populate the query options keep receiving the full processlist.
 *
 * Processing order:
 *  1. rows rejected by `mysql_pl_row_matches()` are deleted;
 *  2. surviving rows are stable-sorted unless `opts.sort_by` is `none`;
 *  3. unless `opts.disable_pagination` is set, only the window
 *     `[offset, offset + limit)` is kept and every other row is deleted.
 *
 * Rows removed at any stage are destroyed with `delete`; the function then
 * replaces `result->rows` and refreshes `result->rows_count`.
 *
 * The page window is clamped to the number of matching rows while still in
 * 64-bit arithmetic and only narrowed to `size_t` afterwards, so a large
 * `offset + limit` cannot wrap where `size_t` is narrower than 64 bits.
 *
 * @param result Mutable resultset generated by `SQL3_Processlist()`.
 * @param opts   Query options controlling filtering/sorting/pagination.
 */
static void apply_mysql_processlist_query_options(SQLite3_result* result, const processlist_query_options_t& opts) {
	if (!result || !opts.enabled) {
		return;
	}

	// Stage 1: keep matching rows, free the rest.
	std::vector<SQLite3_row*> filtered_rows;
	filtered_rows.reserve(result->rows.size());
	for (SQLite3_row* row : result->rows) {
		if (mysql_pl_row_matches(row, opts)) {
			filtered_rows.push_back(row);
		} else {
			delete row;
		}
	}

	// Stage 2: optional ordering.
	if (opts.sort_by != processlist_sort_by_t::none && filtered_rows.size() > 1) {
		std::stable_sort(
			filtered_rows.begin(),
			filtered_rows.end(),
			[&opts](const SQLite3_row* lhs, const SQLite3_row* rhs) {
				return mysql_pl_row_less(lhs, rhs, opts);
			}
		);
	}

	// Stage 3: compute the half-open page window [begin, end).
	size_t begin = 0;
	size_t end = filtered_rows.size();
	if (!opts.disable_pagination) {
		const uint64_t total = filtered_rows.size();
		const uint64_t first = std::min<uint64_t>(opts.offset, total);
		const uint64_t last = std::min<uint64_t>(total, first + static_cast<uint64_t>(opts.limit));
		// Both bounds are <= total, hence representable as size_t.
		begin = static_cast<size_t>(first);
		end = static_cast<size_t>(last);
	}

	std::vector<SQLite3_row*> paged_rows;
	paged_rows.reserve(end - begin);
	for (size_t idx = 0; idx < filtered_rows.size(); ++idx) {
		SQLite3_row* row = filtered_rows[idx];
		if (idx >= begin && idx < end) {
			paged_rows.push_back(row);
		} else {
			delete row;
		}
	}

	result->rows.swap(paged_rows);
	result->rows_count = static_cast<int>(result->rows.size());
}
} // namespace
SQLite3_result * MySQL_Threads_Handler::SQL3_Processlist(processlist_config_t args) {
const int colnum=16;
char port[NI_MAXSERV];
@ -5558,6 +5823,14 @@ SQLite3_result * MySQL_Threads_Handler::SQL3_Processlist(processlist_config_t ar
}
pthread_mutex_unlock(&thr->thread_mutex);
}
/**
* Apply optional in-memory query options used by MCP and other internal
* consumers. Legacy callers keep `query_options.enabled=false`, so their
* behavior remains unchanged and they still receive the full processlist.
*/
apply_mysql_processlist_query_options(result, args.query_options);
return result;
}

@ -5,6 +5,9 @@ using json = nlohmann::json;
//#define __CLASS_STANDARD_MYSQL_THREAD_H
#include <functional>
#include <algorithm>
#include <cerrno>
#include <cctype>
#include <vector>
#include "proxysql_utils.h"
@ -4647,6 +4650,268 @@ void PgSQL_Threads_Handler::Get_Memory_Stats() {
}
}
namespace {
/**
 * @brief Zero-based column positions read by the PgSQL processlist helpers.
 *
 * Only the columns needed for filtering and sorting are listed. The values
 * must stay aligned with the column order emitted by
 * `PgSQL_Threads_Handler::SQL3_Processlist()` (layout not visible here --
 * verify when columns are added or reordered).
 */
struct pgsql_processlist_columns_t {
	static constexpr int session_id{1};  ///< Session identifier column.
	static constexpr int username{2};    ///< Username column.
	static constexpr int database{3};    ///< Database column.
	static constexpr int hostgroup{6};   ///< Hostgroup column.
	static constexpr int command{13};    ///< Command/status column.
	static constexpr int time_ms{14};    ///< Runtime (milliseconds) column.
	static constexpr int info{15};       ///< Query text / info column.
};
/**
 * @brief Null-safe accessor for one text field of a processlist row.
 *
 * @param row Row to read from; may be null.
 * @param idx Zero-based field position.
 * @return The stored C string when @p row is non-null, @p idx lies within
 *         `[0, row->cnt)` and the field pointer is set; otherwise a pointer
 *         to an empty string literal. The result is never null.
 */
static const char* pgsql_pl_field(const SQLite3_row* row, int idx) {
	const bool index_valid = (row != nullptr) && (idx >= 0) && (idx < row->cnt);
	if (index_valid && row->fields[idx] != nullptr) {
		return row->fields[idx];
	}
	return "";
}
/**
 * @brief Convert a textual processlist field to an unsigned 64-bit integer.
 *
 * The whole string must be consumed by `strtoull` (base 10) and the
 * conversion must not report a range error; anything else yields `0` so
 * that filtering and sorting stay deterministic for partial rows.
 *
 * Note that `strtoull` itself accepts leading whitespace and an optional
 * sign, so a value such as "-1" is not rejected: it converts to the negated
 * value in the unsigned type.
 *
 * @param value Null-terminated text, or null.
 * @return The parsed value, or `0` for null, empty, malformed or
 *         out-of-range input.
 */
static uint64_t pgsql_pl_to_u64(const char* value) {
	if (value == nullptr || *value == '\0') {
		return 0;
	}

	errno = 0;                       // strtoull only sets errno on failure
	char* tail = nullptr;
	const unsigned long long number = strtoull(value, &tail, 10);

	const bool consumed_everything = (tail != value) && (*tail == '\0');
	const bool conversion_ok = consumed_everything && (errno == 0);
	return conversion_ok ? static_cast<uint64_t>(number) : 0;
}
/**
 * @brief Substring test backing the `match_info` filter.
 *
 * @param haystack       Text to search in.
 * @param needle         Text to search for; an empty needle always matches.
 * @param case_sensitive When true, bytes are compared as-is; when false, both
 *                       sides are folded with `std::tolower` before comparing.
 * @return true if @p needle occurs somewhere inside @p haystack.
 */
static bool pgsql_pl_contains(const std::string& haystack, const std::string& needle, bool case_sensitive) {
	if (needle.empty()) {
		return true;
	}

	if (!case_sensitive) {
		// Cast through unsigned char so std::tolower never sees a negative value.
		const auto same_when_folded = [](char a, char b) {
			const int folded_a = std::tolower(static_cast<unsigned char>(a));
			const int folded_b = std::tolower(static_cast<unsigned char>(b));
			return folded_a == folded_b;
		};
		const auto hit = std::search(haystack.begin(), haystack.end(),
		                             needle.begin(), needle.end(),
		                             same_when_folded);
		return hit != haystack.end();
	}

	return haystack.find(needle) != std::string::npos;
}
/**
* @brief Evaluate whether a PgSQL processlist row matches caller filters.
*
* @param row Processlist row from `SQL3_Processlist`.
* @param opts Typed query options supplied by the caller.
* @return true when the row satisfies all active filters.
*/
static bool pgsql_pl_row_matches(const SQLite3_row* row, const processlist_query_options_t& opts) {
if (!opts.username.empty() && opts.username != pgsql_pl_field(row, pgsql_processlist_columns_t::username)) {
return false;
}
if (!opts.database.empty() && opts.database != pgsql_pl_field(row, pgsql_processlist_columns_t::database)) {
return false;
}
if (opts.hostgroup >= 0) {
const int row_hostgroup = static_cast<int>(pgsql_pl_to_u64(pgsql_pl_field(row, pgsql_processlist_columns_t::hostgroup)));
if (row_hostgroup != opts.hostgroup) {
return false;
}
}
if (!opts.command.empty() && opts.command != pgsql_pl_field(row, pgsql_processlist_columns_t::command)) {
return false;
}
if (opts.min_time_ms >= 0) {
const int row_time_ms = static_cast<int>(pgsql_pl_to_u64(pgsql_pl_field(row, pgsql_processlist_columns_t::time_ms)));
if (row_time_ms < opts.min_time_ms) {
return false;
}
}
if (opts.has_session_id) {
const uint64_t row_session_id = pgsql_pl_to_u64(pgsql_pl_field(row, pgsql_processlist_columns_t::session_id));
if (row_session_id != opts.session_id) {
return false;
}
}
if (!opts.match_info.empty()) {
const std::string info = pgsql_pl_field(row, pgsql_processlist_columns_t::info);
if (!pgsql_pl_contains(info, opts.match_info, opts.info_case_sensitive)) {
return false;
}
}
return true;
}
/**
* @brief Compare two PgSQL processlist rows according to typed sort options.
*
* The comparison applies a deterministic tie-breaker on `SessionID` so paging
* remains stable across repeated calls with identical data.
*
* @param lhs Left-hand row.
* @param rhs Right-hand row.
* @param opts Query options carrying sort key and direction.
* @return true when @p lhs should be ordered before @p rhs.
*/
static bool pgsql_pl_row_less(const SQLite3_row* lhs, const SQLite3_row* rhs, const processlist_query_options_t& opts) {
const uint64_t lhs_session_id = pgsql_pl_to_u64(pgsql_pl_field(lhs, pgsql_processlist_columns_t::session_id));
const uint64_t rhs_session_id = pgsql_pl_to_u64(pgsql_pl_field(rhs, pgsql_processlist_columns_t::session_id));
auto string_compare = [&](int idx) -> int {
const std::string lhs_value = pgsql_pl_field(lhs, idx);
const std::string rhs_value = pgsql_pl_field(rhs, idx);
if (lhs_value < rhs_value) {
return -1;
}
if (lhs_value > rhs_value) {
return 1;
}
return 0;
};
auto numeric_compare = [&](int idx) -> int {
const uint64_t lhs_value = pgsql_pl_to_u64(pgsql_pl_field(lhs, idx));
const uint64_t rhs_value = pgsql_pl_to_u64(pgsql_pl_field(rhs, idx));
if (lhs_value < rhs_value) {
return -1;
}
if (lhs_value > rhs_value) {
return 1;
}
return 0;
};
int cmp = 0;
switch (opts.sort_by) {
case processlist_sort_by_t::time_ms:
cmp = numeric_compare(pgsql_processlist_columns_t::time_ms);
break;
case processlist_sort_by_t::session_id:
cmp = numeric_compare(pgsql_processlist_columns_t::session_id);
break;
case processlist_sort_by_t::username:
cmp = string_compare(pgsql_processlist_columns_t::username);
break;
case processlist_sort_by_t::hostgroup:
cmp = numeric_compare(pgsql_processlist_columns_t::hostgroup);
break;
case processlist_sort_by_t::command:
cmp = string_compare(pgsql_processlist_columns_t::command);
break;
case processlist_sort_by_t::none:
default:
cmp = 0;
break;
}
if (cmp != 0) {
return opts.sort_desc ? (cmp > 0) : (cmp < 0);
}
return lhs_session_id < rhs_session_id;
}
/**
 * @brief Filter, sort and page a PgSQL processlist resultset in place.
 *
 * Does nothing when @p result is null or `opts.enabled` is false, so callers
 * that never populate the query options keep receiving the full processlist.
 *
 * Processing order:
 *  1. rows rejected by `pgsql_pl_row_matches()` are deleted;
 *  2. surviving rows are stable-sorted unless `opts.sort_by` is `none`;
 *  3. unless `opts.disable_pagination` is set, only the window
 *     `[offset, offset + limit)` is kept and every other row is deleted.
 *
 * Rows removed at any stage are destroyed with `delete`; the function then
 * replaces `result->rows` and refreshes `result->rows_count`.
 *
 * The page window is clamped to the number of matching rows while still in
 * 64-bit arithmetic and only narrowed to `size_t` afterwards, so a large
 * `offset + limit` cannot wrap where `size_t` is narrower than 64 bits.
 *
 * @param result Mutable resultset generated by `SQL3_Processlist()`.
 * @param opts   Query options controlling filtering/sorting/pagination.
 */
static void apply_pgsql_processlist_query_options(SQLite3_result* result, const processlist_query_options_t& opts) {
	if (!result || !opts.enabled) {
		return;
	}

	// Stage 1: keep matching rows, free the rest.
	std::vector<SQLite3_row*> filtered_rows;
	filtered_rows.reserve(result->rows.size());
	for (SQLite3_row* row : result->rows) {
		if (pgsql_pl_row_matches(row, opts)) {
			filtered_rows.push_back(row);
		} else {
			delete row;
		}
	}

	// Stage 2: optional ordering.
	if (opts.sort_by != processlist_sort_by_t::none && filtered_rows.size() > 1) {
		std::stable_sort(
			filtered_rows.begin(),
			filtered_rows.end(),
			[&opts](const SQLite3_row* lhs, const SQLite3_row* rhs) {
				return pgsql_pl_row_less(lhs, rhs, opts);
			}
		);
	}

	// Stage 3: compute the half-open page window [begin, end).
	size_t begin = 0;
	size_t end = filtered_rows.size();
	if (!opts.disable_pagination) {
		const uint64_t total = filtered_rows.size();
		const uint64_t first = std::min<uint64_t>(opts.offset, total);
		const uint64_t last = std::min<uint64_t>(total, first + static_cast<uint64_t>(opts.limit));
		// Both bounds are <= total, hence representable as size_t.
		begin = static_cast<size_t>(first);
		end = static_cast<size_t>(last);
	}

	std::vector<SQLite3_row*> paged_rows;
	paged_rows.reserve(end - begin);
	for (size_t idx = 0; idx < filtered_rows.size(); ++idx) {
		SQLite3_row* row = filtered_rows[idx];
		if (idx >= begin && idx < end) {
			paged_rows.push_back(row);
		} else {
			delete row;
		}
	}

	result->rows.swap(paged_rows);
	result->rows_count = static_cast<int>(result->rows.size());
}
} // namespace
SQLite3_result* PgSQL_Threads_Handler::SQL3_Processlist(processlist_config_t args) {
const int colnum = 18;
char port[NI_MAXSERV];
@ -4912,6 +5177,14 @@ SQLite3_result* PgSQL_Threads_Handler::SQL3_Processlist(processlist_config_t arg
}
pthread_mutex_unlock(&thr->thread_mutex);
}
/**
* Apply optional in-memory query options used by MCP and other internal
* consumers. Legacy callers keep `query_options.enabled=false`, so their
* behavior remains unchanged and they still receive the full processlist.
*/
apply_pgsql_processlist_query_options(result, args.query_options);
return result;
}

@ -5,6 +5,7 @@
#include <cstdlib>
#include <ctime>
#include <cmath>
#include <limits>
#include "../deps/json/json.hpp"
using json = nlohmann::json;
@ -20,6 +21,8 @@ using json = nlohmann::json;
extern ProxySQL_Admin *GloAdmin;
extern MySQL_Logger *GloMyLogger;
extern MySQL_Threads_Handler *GloMTH;
extern PgSQL_Threads_Handler *GloPTH;
// Latency bucket thresholds in microseconds for commands_counters histogram
static const std::vector<int> LATENCY_BUCKET_THRESHOLDS = {
@ -72,6 +75,14 @@ static const std::map<std::string, std::vector<std::string>> CATEGORY_PREFIXES =
*/
static constexpr uint32_t SHOW_QUERIES_MAX_LIMIT_HARDCODED = 1000;
/**
* @brief Hard upper bound for configurable `mcp_stats_show_processlist_max_rows`.
*
* The runtime MCP variable can reduce this value, but cannot exceed it:
* `handle_show_processlist()` clamps the configured cap down to this constant
* before using it. It protects the process from unbounded processlist page
* windows.
*
* NOTE(review): the MCP variable setter enforces the same 1..1000 range on its
* own, so the two limits should be kept in sync if either is changed.
*/
static constexpr uint32_t SHOW_PROCESSLIST_MAX_LIMIT_HARDCODED = 1000;
/**
* @brief Parse and validate a backend filter in `host:port` format.
*
@ -412,14 +423,47 @@ json Stats_Tool_Handler::get_tool_list() {
{"type", "string"},
{"description", "Filter by username"}
}},
{"database", {
{"type", "string"},
{"description", "Filter by schema/database name"}
}},
{"hostgroup", {
{"type", "integer"},
{"description", "Filter by hostgroup ID"}
}},
{"command", {
{"type", "string"},
{"description", "Filter by command/status (for example Query, Sleep, Connect)"}
}},
{"session_id", {
{"type", "integer"},
{"description", "Filter by ProxySQL SessionID"}
}},
{"min_time_ms", {
{"type", "integer"},
{"description", "Only show sessions running longer than N milliseconds"}
}},
{"match_info", {
{"type", "string"},
{"description", "Substring filter on current query text/info"}
}},
{"info_case_sensitive", {
{"type", "boolean"},
{"description", "Case-sensitive matching for match_info (default: false)"},
{"default", false}
}},
{"sort_by", {
{"type", "string"},
{"enum", {"time_ms", "session_id", "username", "hostgroup", "command"}},
{"description", "Sort key (default: time_ms)"},
{"default", "time_ms"}
}},
{"sort_order", {
{"type", "string"},
{"enum", {"asc", "desc"}},
{"description", "Sort direction (default: desc)"},
{"default", "desc"}
}},
{"limit", {
{"type", "integer"},
{"description", "Maximum number of sessions to return (default: 100)"},
@ -1185,108 +1229,223 @@ json Stats_Tool_Handler::handle_show_status(const json& arguments) {
/**
* @brief Shows all currently active sessions being processed by ProxySQL
*
* Returns detailed information about each active session including client/backend
* connection details, current command, execution time, and query info. Includes
* summary statistics grouped by user, hostgroup, and command type.
* Reads live in-memory processlist state via `SQL3_Processlist()` and applies
* typed filters/sort/pagination through `processlist_query_options_t`.
*
* This avoids stale reads from runtime-populated `stats_*_processlist` tables.
*
* @param arguments JSON object with optional parameters:
* - db_type: "mysql" (default) or "pgsql"
* - username: Filter by username
* - database: Filter by schema/database
* - hostgroup: Filter by hostgroup ID
* - command: Filter by command/status text
* - session_id: Filter by SessionID
* - min_time_ms: Only show sessions running longer than N milliseconds
* - match_info: Optional substring filter on query/info text
* - info_case_sensitive: Optional case-sensitive toggle for match_info
* - sort_by: "time_ms" (default), "session_id", "username", "hostgroup", "command"
* - sort_order: "desc" (default) or "asc"
* - limit: Maximum number of sessions to return (default: 100)
* - offset: Skip first N results (default: 0)
*
* @return JSON response with sessions array and summary statistics
* @return JSON response with filtered session rows and summary buckets.
*/
json Stats_Tool_Handler::handle_show_processlist(const json& arguments) {
std::string db_type = arguments.value("db_type", "mysql");
std::string username = arguments.value("username", "");
std::string database = arguments.value("database", "");
int hostgroup = arguments.value("hostgroup", -1);
std::string command = arguments.value("command", "");
long long session_id = arguments.value("session_id", -1LL);
int min_time_ms = arguments.value("min_time_ms", -1);
std::string match_info = arguments.value("match_info", "");
bool info_case_sensitive = arguments.value("info_case_sensitive", false);
std::string sort_by = arguments.value("sort_by", "time_ms");
std::string sort_order = arguments.value("sort_order", "desc");
int limit = arguments.value("limit", 100);
int offset = arguments.value("offset", 0);
std::string table = (db_type == "pgsql") ? "stats_pgsql_processlist" : "stats_mysql_processlist";
std::string db_col = (db_type == "pgsql") ? "database" : "db";
std::string sql = "SELECT ThreadID, SessionID, user, " + db_col + ", cli_host, cli_port, "
"hostgroup, srv_host, srv_port, command, time_ms, info "
"FROM stats." + table + " WHERE 1=1";
if (!username.empty()) {
sql += " AND user = '" + sql_escape(username) + "'";
if (limit < 0) {
return create_error_response("limit must be >= 0");
}
if (hostgroup >= 0) {
sql += " AND hostgroup = " + std::to_string(hostgroup);
if (offset < 0) {
return create_error_response("offset must be >= 0");
}
if (hostgroup < -1) {
return create_error_response("hostgroup must be >= -1");
}
if (min_time_ms < -1) {
return create_error_response("min_time_ms must be >= -1");
}
if (session_id < -1) {
return create_error_response("session_id must be >= -1");
}
if (min_time_ms >= 0) {
sql += " AND time_ms >= " + std::to_string(min_time_ms);
if (session_id > static_cast<long long>(std::numeric_limits<uint32_t>::max())) {
return create_error_response("session_id is too large");
}
sql += " ORDER BY time_ms DESC LIMIT " + std::to_string(limit) + " OFFSET " + std::to_string(offset);
processlist_sort_by_t sort_mode = processlist_sort_by_t::time_ms;
if (sort_by == "session_id") {
sort_mode = processlist_sort_by_t::session_id;
} else if (sort_by == "username") {
sort_mode = processlist_sort_by_t::username;
} else if (sort_by == "hostgroup") {
sort_mode = processlist_sort_by_t::hostgroup;
} else if (sort_by == "command") {
sort_mode = processlist_sort_by_t::command;
} else if (sort_by != "time_ms") {
return create_error_response("Invalid sort_by: " + sort_by);
}
SQLite3_result* resultset = NULL;
int cols = 0;
std::string err = execute_admin_query(sql.c_str(), &resultset, &cols);
bool sort_desc = true;
if (sort_order == "asc") {
sort_desc = false;
} else if (sort_order != "desc") {
return create_error_response("Invalid sort_order: " + sort_order);
}
if (!err.empty()) {
return create_error_response("Failed to query processlist: " + err);
if (!GloAdmin) {
return create_error_response("ProxySQL Admin not available");
}
json sessions = json::array();
std::map<std::string, int> by_user, by_hostgroup, by_command;
const bool is_pgsql = (db_type == "pgsql");
if (!is_pgsql && db_type != "mysql") {
return create_error_response("Invalid db_type: " + db_type);
}
if (is_pgsql && !GloPTH) {
return create_error_response("PgSQL threads handler not available");
}
if (!is_pgsql && !GloMTH) {
return create_error_response("MySQL threads handler not available");
}
// Get total count
std::string count_sql = "SELECT COUNT(*) FROM stats." + table;
SQLite3_result* count_rs = NULL;
int count_cols = 0;
int total_sessions = 0;
std::string count_err = execute_admin_query(count_sql.c_str(), &count_rs, &count_cols, false);
if (!count_err.empty()) {
if (count_rs) {
delete count_rs;
uint32_t configured_cap = 200;
if (mcp_handler) {
const int configured_value = mcp_handler->variables.mcp_stats_show_processlist_max_rows;
if (configured_value > 0) {
configured_cap = static_cast<uint32_t>(configured_value);
}
proxy_error("show_processlist: failed to count rows: %s\n", count_err.c_str());
return create_error_response("Failed to count processlist rows: " + count_err);
}
if (count_rs && count_rs->rows_count > 0 && count_rs->rows[0]->fields[0]) {
total_sessions = std::stoi(count_rs->rows[0]->fields[0]);
if (configured_cap > SHOW_PROCESSLIST_MAX_LIMIT_HARDCODED) {
configured_cap = SHOW_PROCESSLIST_MAX_LIMIT_HARDCODED;
}
if (count_rs) delete count_rs;
if (resultset) {
for (const auto& row : resultset->rows) {
json session;
session["session_id"] = row->fields[1] ? std::stoll(row->fields[1]) : 0;
session["thread_id"] = row->fields[0] ? std::stoi(row->fields[0]) : 0;
session["user"] = row->fields[2] ? row->fields[2] : "";
session["database"] = row->fields[3] ? row->fields[3] : "";
session["client_host"] = row->fields[4] ? row->fields[4] : "";
session["client_port"] = row->fields[5] ? std::stoi(row->fields[5]) : 0;
session["hostgroup"] = row->fields[6] ? std::stoi(row->fields[6]) : 0;
session["backend_host"] = row->fields[7] ? row->fields[7] : "";
session["backend_port"] = row->fields[8] ? std::stoi(row->fields[8]) : 0;
session["command"] = row->fields[9] ? row->fields[9] : "";
session["time_ms"] = row->fields[10] ? std::stoi(row->fields[10]) : 0;
session["info"] = row->fields[11] ? row->fields[11] : "";
sessions.push_back(session);
// Aggregate summaries
std::string u = row->fields[2] ? row->fields[2] : "unknown";
std::string hg = row->fields[6] ? row->fields[6] : "unknown";
std::string cmd = row->fields[9] ? row->fields[9] : "unknown";
by_user[u]++;
by_hostgroup[hg]++;
by_command[cmd]++;
const uint32_t requested_limit = static_cast<uint32_t>(limit);
const uint32_t requested_offset = static_cast<uint32_t>(offset);
const uint32_t effective_limit = std::min(requested_limit, configured_cap);
const uint32_t capped_offset = std::min(requested_offset, configured_cap);
processlist_query_options_t query_opts {};
query_opts.enabled = true;
query_opts.username = username;
query_opts.database = database;
query_opts.hostgroup = hostgroup;
query_opts.command = command;
query_opts.min_time_ms = min_time_ms;
query_opts.has_session_id = (session_id >= 0);
query_opts.session_id = (session_id >= 0) ? static_cast<uint32_t>(session_id) : 0;
query_opts.match_info = match_info;
query_opts.info_case_sensitive = info_case_sensitive;
query_opts.sort_by = sort_mode;
query_opts.sort_desc = sort_desc;
query_opts.limit = effective_limit;
query_opts.offset = capped_offset;
processlist_config_t base_cfg {};
#ifdef IDLE_THREADS
base_cfg.show_idle_session = is_pgsql
? GloPTH->variables.session_idle_show_processlist
: GloMTH->variables.session_idle_show_processlist;
#endif
base_cfg.show_extended = is_pgsql
? GloPTH->variables.show_processlist_extended
: GloMTH->variables.show_processlist_extended;
base_cfg.max_query_length = is_pgsql
? GloPTH->variables.processlist_max_query_length
: GloMTH->variables.processlist_max_query_length;
base_cfg.query_options = query_opts;
/**
* Compute the full matched cardinality before pagination so the MCP payload
* can expose deterministic metadata (`total_sessions`) regardless of page.
*/
processlist_config_t count_cfg = base_cfg;
count_cfg.query_options.sort_by = processlist_sort_by_t::none;
count_cfg.query_options.disable_pagination = true;
SQLite3_result* count_rs = is_pgsql ? GloPTH->SQL3_Processlist(count_cfg) : GloMTH->SQL3_Processlist(count_cfg);
if (!count_rs) {
return create_error_response("Failed to read in-memory processlist for total count");
}
const int total_sessions = count_rs->rows_count;
delete count_rs;
SQLite3_result* resultset = is_pgsql ? GloPTH->SQL3_Processlist(base_cfg) : GloMTH->SQL3_Processlist(base_cfg);
if (!resultset) {
return create_error_response("Failed to read in-memory processlist rows");
}
/**
 * Parse a base-10 integer from a nullable C string taken from a resultset
 * field, falling back to 0 instead of throwing.
 *
 * Returns 0 when `value` is null or empty, when no digits could be consumed,
 * when any trailing character follows the number, or when `strtoll` reports
 * an error through `errno` (e.g. out-of-range input). Otherwise returns the
 * parsed value.
 *
 * The lambda captures nothing, so it must not touch handler state such as
 * the resultset; ownership of the resultset stays with the caller.
 */
auto to_int64 = [](const char* value) -> int64_t {
	if (!value || !value[0]) {
		return 0;
	}
	char* end = nullptr;
	// strtoll only sets errno on failure, so clear it first to detect overflow.
	errno = 0;
	const long long parsed = strtoll(value, &end, 10);
	// Reject "no digits", trailing garbage, and range errors alike.
	if (end == value || *end != '\0' || errno != 0) {
		return 0;
	}
	return static_cast<int64_t>(parsed);
};
json sessions = json::array();
std::map<std::string, int> by_user, by_hostgroup, by_command;
const int command_idx = is_pgsql ? 13 : 11;
const int time_ms_idx = is_pgsql ? 14 : 12;
const int info_idx = is_pgsql ? 15 : 13;
for (const auto& row : resultset->rows) {
json session;
session["session_id"] = static_cast<uint64_t>(to_int64(row->fields[1]));
session["thread_id"] = static_cast<int>(to_int64(row->fields[0]));
session["user"] = row->fields[2] ? row->fields[2] : "";
session["database"] = row->fields[3] ? row->fields[3] : "";
session["client_host"] = row->fields[4] ? row->fields[4] : "";
session["client_port"] = static_cast<int>(to_int64(row->fields[5]));
session["hostgroup"] = static_cast<int>(to_int64(row->fields[6]));
session["local_backend_host"] = row->fields[7] ? row->fields[7] : "";
session["local_backend_port"] = static_cast<int>(to_int64(row->fields[8]));
session["backend_host"] = row->fields[9] ? row->fields[9] : "";
session["backend_port"] = static_cast<int>(to_int64(row->fields[10]));
if (is_pgsql) {
session["backend_pid"] = static_cast<int>(to_int64(row->fields[11]));
session["backend_state"] = row->fields[12] ? row->fields[12] : "";
}
session["command"] = row->fields[command_idx] ? row->fields[command_idx] : "";
session["time_ms"] = static_cast<int>(to_int64(row->fields[time_ms_idx]));
session["info"] = row->fields[info_idx] ? row->fields[info_idx] : "";
sessions.push_back(session);
const std::string summary_user = row->fields[2] ? row->fields[2] : "unknown";
const std::string summary_hg = row->fields[6] ? row->fields[6] : "unknown";
const std::string summary_cmd = row->fields[command_idx] ? row->fields[command_idx] : "unknown";
by_user[summary_user]++;
by_hostgroup[summary_hg]++;
by_command[summary_cmd]++;
}
delete resultset;
json result;
result["db_type"] = db_type;
result["total_sessions"] = total_sessions;
result["sessions"] = sessions;
result["requested_limit"] = requested_limit;
result["requested_offset"] = requested_offset;
result["effective_limit"] = effective_limit;
result["limit_cap"] = configured_cap;
result["sort_by"] = sort_by;
result["sort_order"] = sort_desc ? "desc" : "asc";
result["summary"] = {
{"by_user", by_user},
{"by_hostgroup", by_hostgroup},

@ -71,6 +71,7 @@ mcp_variables=
mcp_rag_endpoint_auth=""
mcp_timeout_ms=30000
mcp_stats_show_queries_max_rows=200
mcp_stats_show_processlist_max_rows=200
}
# GenAI module configuration

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save