mcp stats: refresh stats tables under admin mutex before direct reads

Problem
MCP stats tools were querying stats.* tables directly through admindb.
Unlike admin-session SQL, that path bypassed GenericRefreshStatistics() and could
return stale or empty data for runtime-populated tables.

What changed
- Updated Stats_Tool_Handler::execute_admin_query() to mirror admin-session
  semantics for stats reads:
  - acquire GloAdmin->sql_query_global_mutex
  - optionally invoke ProxySQL_Admin::GenericRefreshStatistics(sql, ..., false)
  - execute admindb statement
  - release sql_query_global_mutex
- Added strict input validation and explicit lock/unlock error reporting.
- Added refresh_before_query parameter (default true) to avoid duplicate refresh
  passes for secondary count queries.
- Switched secondary COUNT(*) calls in show_processlist/show_queries/
  show_errors/show_query_rules to refresh_before_query=false.

Documentation
- Expanded Doxygen in include/Stats_Tool_Handler.h for execute_admin_query()
  to document locking, refresh behavior, and performance tradeoff.
- Added detailed Doxygen above the execute_admin_query() implementation in
  lib/Stats_Tool_Handler.cpp.

Testing
- Added TAP integration test test/tap/tests/mcp_stats_refresh-t.cpp.
- The test injects a synthetic stale marker row into stats.stats_mysql_global,
  calls /mcp/stats show_status, and verifies the marker disappears after
  refresh-before-read.
- Added helper-level Doxygen in the test for setup/parsing behavior.

Build verification
- Compiled lib object: make -C lib obj/Stats_Tool_Handler.oo
- Compiled TAP target: make -C test/tap/tests mcp_stats_refresh-t
- Direct runtime execution of the TAP test in this workspace could not fully run
  because no local admin listener was available (connection to 127.0.0.1:6032 failed).
pull/5398/head
Rene Cannao 3 months ago
parent 33fce1ca35
commit c126e63a23

@ -177,13 +177,32 @@ private:
// =========================================================================
/**
* @brief Execute a SQL query against GloAdmin->admindb
* @param sql The SQL query to execute
* @param resultset Output pointer for the result set (caller must delete)
* @param cols Output for number of columns
* @return Empty string on success, error message on failure
* @brief Execute a SQL query against `GloAdmin->admindb` with Admin-session semantics.
*
* This helper intentionally mirrors the critical part of the Admin SQL execution path
* used by `ProxySQL_Admin::admin_session_handler()`:
*
* 1. Acquire `GloAdmin->sql_query_global_mutex` to serialize access to the internal
* in-memory SQLite databases.
* 2. Optionally invoke `ProxySQL_Admin::GenericRefreshStatistics()` on the current SQL
* statement so `stats_*` tables are repopulated before they are read.
* 3. Execute the SQL statement via `admindb->execute_statement()`.
* 4. Release `sql_query_global_mutex`.
*
* Using this flow avoids stale reads from runtime-populated `stats.*` tables when tools
* access them directly through `admindb`.
*
* @param sql SQL statement to execute. Must not be `nullptr` or empty.
* @param resultset Output pointer receiving the result set on success. Caller owns it.
* @param cols Output pointer receiving the number of columns on success.
* @param refresh_before_query
* If true (default), call `GenericRefreshStatistics()` before executing @p sql.
* Set to false for secondary queries (for example `COUNT(*)` immediately after a
* primary read) to avoid redundant expensive refresh passes.
*
* @return Empty string on success, otherwise a human-readable error message.
*/
std::string execute_admin_query(const char* sql, SQLite3_result** resultset, int* cols);
std::string execute_admin_query(const char* sql, SQLite3_result** resultset, int* cols, bool refresh_before_query = true);
/**
* @brief Execute a SQL query against GloAdmin->statsdb_disk (historical data)

@ -129,18 +129,73 @@ void Stats_Tool_Handler::close() {
// Helper Methods
// ============================================================================
std::string Stats_Tool_Handler::execute_admin_query(const char* sql, SQLite3_result** resultset, int* cols) {
/**
* @brief Execute a statement against Admin in-memory DB under Admin global SQL mutex.
*
* This method is the temporary correctness bridge for MCP stats tools:
* the tools build SQL directly over `stats.*` tables, but most of these tables are
* refreshed on demand by Admin interception logic. To preserve data freshness when MCP
* bypasses Admin SQL parsing, this helper optionally triggers
* `ProxySQL_Admin::GenericRefreshStatistics()` before running the statement.
*
* The refresh and statement execution are performed while holding
* `GloAdmin->sql_query_global_mutex`, mirroring Admin session serialization.
*
* @param sql SQL statement to execute.
* @param resultset Output result set pointer. Set to NULL on failure.
* @param cols Output column count pointer. Set to 0 on failure.
* @param refresh_before_query If true, run GenericRefreshStatistics for @p sql before execution.
* @return Empty string on success, or descriptive error text on failure.
*/
std::string Stats_Tool_Handler::execute_admin_query(const char* sql, SQLite3_result** resultset, int* cols, bool refresh_before_query) {
if (!GloAdmin || !GloAdmin->admindb) {
return "ProxySQL Admin not available";
}
if (!resultset || !cols) {
return "Invalid output pointers for admin query execution";
}
if (!sql || sql[0] == '\0') {
*resultset = NULL;
*cols = 0;
return "Empty SQL query";
}
char* error = NULL;
int affected_rows = 0;
*resultset = NULL;
*cols = 0;
int lock_rc = pthread_mutex_lock(&GloAdmin->sql_query_global_mutex);
if (lock_rc != 0) {
return std::string("Failed to lock sql_query_global_mutex: ") + std::strerror(lock_rc);
}
if (refresh_before_query) {
GloAdmin->GenericRefreshStatistics(sql, static_cast<unsigned int>(strlen(sql)), false);
}
char* error = NULL;
int affected_rows = 0;
GloAdmin->admindb->execute_statement(sql, &error, cols, &affected_rows, resultset);
int unlock_rc = pthread_mutex_unlock(&GloAdmin->sql_query_global_mutex);
if (unlock_rc != 0) {
if (error) {
std::string err_msg(error);
free(error);
if (*resultset) {
delete *resultset;
*resultset = NULL;
}
return "Admin query error: " + err_msg +
"; also failed to unlock sql_query_global_mutex: " + std::string(std::strerror(unlock_rc));
}
if (*resultset) {
delete *resultset;
*resultset = NULL;
}
*cols = 0;
return std::string("Failed to unlock sql_query_global_mutex: ") + std::strerror(unlock_rc);
}
if (error) {
std::string err_msg(error);
free(error);
@ -1124,7 +1179,7 @@ json Stats_Tool_Handler::handle_show_processlist(const json& arguments) {
SQLite3_result* count_rs = NULL;
int count_cols = 0;
int total_sessions = 0;
std::string count_err = execute_admin_query(count_sql.c_str(), &count_rs, &count_cols);
std::string count_err = execute_admin_query(count_sql.c_str(), &count_rs, &count_cols, false);
if (!count_err.empty()) {
if (count_rs) {
delete count_rs;
@ -1260,7 +1315,7 @@ json Stats_Tool_Handler::handle_show_queries(const json& arguments) {
SQLite3_result* count_rs = NULL;
int count_cols = 0;
int total_digests = 0;
std::string count_err = execute_admin_query(count_sql.c_str(), &count_rs, &count_cols);
std::string count_err = execute_admin_query(count_sql.c_str(), &count_rs, &count_cols, false);
if (!count_err.empty()) {
if (count_rs) {
delete count_rs;
@ -1691,7 +1746,7 @@ json Stats_Tool_Handler::handle_show_errors(const json& arguments) {
int count_cols = 0;
int total_error_types = 0;
long long total_error_count = 0;
std::string count_err = execute_admin_query(count_sql.c_str(), &count_rs, &count_cols);
std::string count_err = execute_admin_query(count_sql.c_str(), &count_rs, &count_cols, false);
if (!count_err.empty()) {
if (count_rs) {
delete count_rs;
@ -1975,7 +2030,7 @@ json Stats_Tool_Handler::handle_show_query_rules(const json& arguments) {
SQLite3_result* count_rs = NULL;
int count_cols = 0;
int total_rules = 0;
std::string count_err = execute_admin_query(count_sql.c_str(), &count_rs, &count_cols);
std::string count_err = execute_admin_query(count_sql.c_str(), &count_rs, &count_cols, false);
if (!count_err.empty()) {
if (count_rs) {
delete count_rs;

@ -0,0 +1,248 @@
/**
* @file mcp_stats_refresh-t.cpp
* @brief TAP integration test for MCP stats refresh-on-read behavior.
*
* This test validates the temporary MCP stats correctness strategy implemented in
* `Stats_Tool_Handler::execute_admin_query()`:
*
* 1. MCP stats queries are serialized with `GloAdmin->sql_query_global_mutex`.
* 2. `ProxySQL_Admin::GenericRefreshStatistics()` is executed before reading
* runtime-populated `stats.*` tables.
*
* Test strategy:
* - Inject a synthetic marker row directly into `stats.stats_mysql_global`.
* - Query `show_status` over `/mcp/stats` for that marker.
* - Expect the marker to disappear because refresh repopulates the table from
* runtime state, dropping synthetic stale rows.
*
* A second `show_status` call validates normal data retrieval (`ProxySQL_Uptime`).
*/
#include <string>
#include <vector>
#include "mysql.h"
#include "tap.h"
#include "command_line.h"
#include "utils.h"
#include "mcp_client.h"
using json = nlohmann::json;
namespace {
static const char* k_marker_name = "MCP_REFRESH_MARKER";
static const char* k_marker_value = "mcp_stale_value";
/**
* @brief Execute an admin SQL statement and report success/failure.
*
* @param admin Open admin connection.
* @param query SQL statement to execute.
* @param context Human-readable label used in diagnostics.
* @return true on success, false on failure.
*/
bool run_admin_stmt(MYSQL* admin, const std::string& query, const char* context) {
if (!admin) {
diag("%s: admin connection is null", context);
return false;
}
if (run_q(admin, query.c_str()) != 0) {
diag("%s failed: %s", context, mysql_error(admin));
return false;
}
return true;
}
/**
* @brief Configure MCP runtime variables required by this test.
*
* The test enables MCP endpoint handling and clears stats endpoint auth so the
* TAP client can call `/mcp/stats` without a token.
*
* @param admin Open admin connection.
* @param cl TAP command-line environment with target MCP port.
* @return true if all configuration statements succeeded.
*/
bool configure_mcp_stats_endpoint(MYSQL* admin, const CommandLine& cl) {
const std::vector<std::string> statements = {
"SET mcp-port=" + std::to_string(cl.mcp_port),
"SET mcp-enabled=true",
"SET mcp-stats_endpoint_auth=''",
"LOAD MCP VARIABLES TO RUNTIME"
};
for (const auto& stmt : statements) {
if (!run_admin_stmt(admin, stmt, "MCP stats config")) {
return false;
}
}
return true;
}
/**
* @brief Parse and validate the payload returned by MCP `show_status`.
*
* Expected payload shape:
* `{ "success": true, "result": { "variables": [...] } }`
*
* @param response MCP response object.
* @param variables Output JSON array of variables.
* @param error Output error text on failure.
* @return true when payload structure is valid and tool-level success is true.
*/
bool extract_show_status_variables(const MCPResponse& response, json& variables, std::string& error) {
if (!response.is_success()) {
error = response.get_error_message();
return false;
}
const json& payload = response.get_result();
if (!payload.is_object()) {
error = "show_status payload is not a JSON object";
return false;
}
if (!payload.value("success", false)) {
error = payload.value("error", std::string("show_status returned tool error"));
return false;
}
if (!payload.contains("result") || !payload["result"].is_object()) {
error = "show_status payload missing object field 'result'";
return false;
}
const json& result_obj = payload["result"];
if (!result_obj.contains("variables") || !result_obj["variables"].is_array()) {
error = "show_status payload missing array field 'result.variables'";
return false;
}
variables = result_obj["variables"];
return true;
}
} // namespace
int main(int argc, char** argv) {
(void)argc;
(void)argv;
plan(10);
CommandLine cl;
if (cl.getEnv()) {
diag("Failed to read TAP environment");
return exit_status();
}
MYSQL* admin = nullptr;
MCPClient* mcp = nullptr;
bool can_continue = true;
admin = init_mysql_conn(cl.admin_host, cl.admin_port, cl.admin_username, cl.admin_password);
ok(admin != nullptr, "Admin connection established");
if (!admin) {
skip(9, "Cannot continue without admin connection");
can_continue = false;
}
bool configured = false;
if (can_continue) {
configured = configure_mcp_stats_endpoint(admin, cl);
ok(configured, "Configured and loaded MCP runtime variables for /mcp/stats");
if (!configured) {
skip(8, "Cannot continue without MCP runtime configuration");
can_continue = false;
}
}
if (can_continue) {
mcp = new MCPClient(cl.admin_host, cl.mcp_port);
if (strlen(cl.mcp_auth_token) > 0) {
mcp->set_auth_token(cl.mcp_auth_token);
}
}
bool mcp_reachable = false;
if (can_continue) {
mcp_reachable = mcp->check_server();
ok(mcp_reachable, "MCP server reachable at %s", mcp->get_connection_info().c_str());
if (!mcp_reachable) {
skip(7, "Cannot continue without MCP connectivity");
can_continue = false;
}
}
bool marker_deleted = false;
bool marker_inserted = false;
if (can_continue) {
// Inject synthetic stale row into stats global table.
marker_deleted = run_admin_stmt(
admin,
"DELETE FROM stats.stats_mysql_global WHERE Variable_Name='MCP_REFRESH_MARKER'",
"Delete stale marker row"
);
marker_inserted = run_admin_stmt(
admin,
"INSERT OR REPLACE INTO stats.stats_mysql_global (Variable_Name, Variable_Value) VALUES ('"
+ std::string(k_marker_name) + "', '" + std::string(k_marker_value) + "')",
"Insert stale marker row"
);
ok(marker_deleted && marker_inserted, "Injected synthetic stale marker into stats.stats_mysql_global");
if (!(marker_deleted && marker_inserted)) {
skip(6, "Cannot continue without marker row setup");
can_continue = false;
}
}
if (can_continue) {
const MCPResponse marker_resp = mcp->call_tool(
"stats",
"show_status",
json{{"db_type", "mysql"}, {"variable_name", k_marker_name}}
);
ok(marker_resp.is_success(), "MCP call stats.show_status(marker) transport/protocol success");
json marker_vars = json::array();
std::string marker_err;
const bool marker_payload_ok = extract_show_status_variables(marker_resp, marker_vars, marker_err);
ok(marker_payload_ok, "stats.show_status(marker) payload valid%s%s",
marker_payload_ok ? "" : ": ", marker_payload_ok ? "" : marker_err.c_str());
const size_t marker_row_count = marker_payload_ok ? marker_vars.size() : 0;
ok(marker_payload_ok && marker_row_count == 0,
"Marker row removed after refresh-before-read (variables=%zu)", marker_row_count);
const MCPResponse uptime_resp = mcp->call_tool(
"stats",
"show_status",
json{{"db_type", "mysql"}, {"variable_name", "ProxySQL_Uptime"}}
);
ok(uptime_resp.is_success(), "MCP call stats.show_status(ProxySQL_Uptime) transport/protocol success");
json uptime_vars = json::array();
std::string uptime_err;
const bool uptime_payload_ok = extract_show_status_variables(uptime_resp, uptime_vars, uptime_err);
ok(uptime_payload_ok, "stats.show_status(ProxySQL_Uptime) payload valid%s%s",
uptime_payload_ok ? "" : ": ", uptime_payload_ok ? "" : uptime_err.c_str());
ok(uptime_payload_ok && !uptime_vars.empty(),
"stats.show_status(ProxySQL_Uptime) returned at least one variable row");
}
if (admin) {
run_q(admin, "DELETE FROM stats.stats_mysql_global WHERE Variable_Name='MCP_REFRESH_MARKER'");
run_q(admin, "SET mcp-stats_endpoint_auth=''");
run_q(admin, "SET mcp-enabled=false");
run_q(admin, "LOAD MCP VARIABLES TO RUNTIME");
mysql_close(admin);
}
if (mcp) {
delete mcp;
}
return exit_status();
}
Loading…
Cancel
Save