feat: Add MCP query tool usage counters to stats schema

Add stats_mcp_query_tools_counters and stats_mcp_query_tools_counters_reset
tables to track MCP query tool usage statistics.

- Added get_tool_usage_stats_resultset() method to Query_Tool_Handler
- Defined table schemas in ProxySQL_Admin_Tables_Definitions.h
- Registered tables in Admin_Bootstrap.cpp
- Added pattern matching in ProxySQL_Admin.cpp
- Added stats___mcp_query_tools_counters() in ProxySQL_Admin_Stats.cpp
- Fixed friend declaration for track_tool_invocation()
- Fixed Discovery_Schema.cpp log_llm_search() to use prepare_v2/finalize
pull/5318/head
Rene Cannao 4 months ago
parent 393967f511
commit a816a756d4

@ -623,6 +623,20 @@ public:
int limit = 25
);
/**
* @brief Log an LLM search query
*
* @param run_id Run ID
* @param query Search query string
* @param limit Result limit
* @return 0 on success, -1 on error
*/
int log_llm_search(
int run_id,
const std::string& query,
int limit = 25
);
/**
* @brief Get database handle for direct access
* @return SQLite3DB pointer

@ -322,6 +322,9 @@
#define STATS_SQLITE_TABLE_PGSQL_QUERY_DIGEST_RESET "CREATE TABLE stats_pgsql_query_digest_reset (hostgroup INT , database VARCHAR NOT NULL , username VARCHAR NOT NULL , client_address VARCHAR NOT NULL , digest VARCHAR NOT NULL , digest_text VARCHAR NOT NULL , count_star INTEGER NOT NULL , first_seen INTEGER NOT NULL , last_seen INTEGER NOT NULL , sum_time INTEGER NOT NULL , min_time INTEGER NOT NULL , max_time INTEGER NOT NULL , sum_rows_affected INTEGER NOT NULL , sum_rows_sent INTEGER NOT NULL , PRIMARY KEY(hostgroup, database, username, client_address, digest))"
#define STATS_SQLITE_TABLE_PGSQL_PREPARED_STATEMENTS_INFO "CREATE TABLE stats_pgsql_prepared_statements_info (global_stmt_id INT NOT NULL , database VARCHAR NOT NULL , username VARCHAR NOT NULL , digest VARCHAR NOT NULL , ref_count_client INT NOT NULL , ref_count_server INT NOT NULL , num_param_types INT NOT NULL , query VARCHAR NOT NULL)"
#define STATS_SQLITE_TABLE_MCP_QUERY_TOOLS_COUNTERS "CREATE TABLE stats_mcp_query_tools_counters (tool VARCHAR NOT NULL , schema VARCHAR NOT NULL , count INT NOT NULL , PRIMARY KEY (tool, schema))"
#define STATS_SQLITE_TABLE_MCP_QUERY_TOOLS_COUNTERS_RESET "CREATE TABLE stats_mcp_query_tools_counters_reset (tool VARCHAR NOT NULL , schema VARCHAR NOT NULL , count INT NOT NULL , PRIMARY KEY (tool, schema))"
//#define STATS_SQLITE_TABLE_MEMORY_METRICS "CREATE TABLE stats_memory_metrics (Variable_Name VARCHAR NOT NULL PRIMARY KEY , Variable_Value VARCHAR NOT NULL)"

@ -51,6 +51,12 @@ private:
int timeout_ms;
bool allow_select_star;
// Tool usage counters: tool_name -> schema_name -> count
typedef std::map<std::string, unsigned long long> SchemaCountMap;
typedef std::map<std::string, SchemaCountMap> ToolUsageMap;
ToolUsageMap tool_usage_counters;
pthread_mutex_t counters_lock;
/**
* @brief Create tool list schema for a tool
*/
@ -91,6 +97,9 @@ private:
*/
bool is_dangerous_query(const std::string& query);
// Friend function for tracking tool invocations
friend void track_tool_invocation(Query_Tool_Handler*, const std::string&, const std::string&);
public:
/**
* @brief Constructor (creates catalog and harvester)
@ -126,6 +135,19 @@ public:
* @brief Get the static harvester
*/
Static_Harvester* get_harvester() const { return harvester; }
/**
* @brief Get tool usage statistics (thread-safe copy)
* @return ToolUsageMap copy with tool_name -> schema_name -> count
*/
ToolUsageMap get_tool_usage_stats();
/**
* @brief Get tool usage statistics as SQLite3_result* with optional reset
* @param reset If true, resets internal counters after capturing data
* @return SQLite3_result* with columns: tool, schema, count. Caller must delete.
*/
SQLite3_result* get_tool_usage_stats_resultset(bool reset = false);
};
#endif /* CLASS_QUERY_TOOL_HANDLER_H */

@ -698,6 +698,7 @@ class ProxySQL_Admin {
void stats___mysql_prepared_statements_info();
void stats___mysql_gtid_executed();
void stats___mysql_client_host_cache(bool reset);
void stats___mcp_query_tools_counters(bool reset);
// Update prometheus metrics
void p_stats___memory_metrics();

@ -878,6 +878,8 @@ bool ProxySQL_Admin::init(const bootstrap_info_t& bootstrap_info) {
insert_into_tables_defs(tables_defs_stats,"stats_proxysql_servers_clients_status", STATS_SQLITE_TABLE_PROXYSQL_SERVERS_CLIENTS_STATUS);
insert_into_tables_defs(tables_defs_stats,"stats_proxysql_message_metrics", STATS_SQLITE_TABLE_PROXYSQL_MESSAGE_METRICS);
insert_into_tables_defs(tables_defs_stats,"stats_proxysql_message_metrics_reset", STATS_SQLITE_TABLE_PROXYSQL_MESSAGE_METRICS_RESET);
insert_into_tables_defs(tables_defs_stats,"stats_mcp_query_tools_counters", STATS_SQLITE_TABLE_MCP_QUERY_TOOLS_COUNTERS);
insert_into_tables_defs(tables_defs_stats,"stats_mcp_query_tools_counters_reset", STATS_SQLITE_TABLE_MCP_QUERY_TOOLS_COUNTERS_RESET);
// init ldap here
init_ldap();

@ -465,6 +465,21 @@ int Discovery_Schema::create_llm_tables() {
db->execute("CREATE INDEX IF NOT EXISTS idx_llm_notes_scope ON llm_notes(run_id, scope);");
// LLM search log table - tracks all searches performed
db->execute(
"CREATE TABLE IF NOT EXISTS llm_search_log ("
" log_id INTEGER PRIMARY KEY,"
" run_id INTEGER NOT NULL REFERENCES runs(run_id) ON DELETE CASCADE,"
" query TEXT NOT NULL,"
" limit INTEGER NOT NULL DEFAULT 25,"
" searched_at TEXT NOT NULL DEFAULT (datetime('now'))"
");"
);
db->execute("CREATE INDEX IF NOT EXISTS idx_llm_search_log_run ON llm_search_log(run_id);");
db->execute("CREATE INDEX IF NOT EXISTS idx_llm_search_log_query ON llm_search_log(query);");
db->execute("CREATE INDEX IF NOT EXISTS idx_llm_search_log_time ON llm_search_log(searched_at);");
return 0;
}
@ -1827,3 +1842,32 @@ std::string Discovery_Schema::fts_search_llm(
return results.dump();
}
int Discovery_Schema::log_llm_search(
int run_id,
const std::string& query,
int limit
) {
sqlite3_stmt* stmt = NULL;
const char* sql = "INSERT INTO llm_search_log(run_id, query, limit) VALUES(?1, ?2, ?3);";
int rc = db->prepare_v2(sql, &stmt);
if (rc != SQLITE_OK || !stmt) {
proxy_error("Failed to prepare llm_search_log insert: %d\n", rc);
return -1;
}
sqlite3_bind_int(stmt, 1, run_id);
sqlite3_bind_text(stmt, 2, query.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_int(stmt, 3, limit);
rc = sqlite3_step(stmt);
(*proxy_sqlite3_finalize)(stmt);
if (rc != SQLITE_DONE) {
proxy_error("Failed to insert llm_search_log: %d\n", rc);
return -1;
}
return 0;
}

@ -1153,6 +1153,8 @@ bool ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign
bool stats_memory_metrics=false;
bool stats_mysql_commands_counters=false;
bool stats_pgsql_commands_counters = false;
bool stats_mcp_query_tools_counters = false;
bool stats_mcp_query_tools_counters_reset = false;
bool stats_mysql_query_rules=false;
bool stats_pgsql_query_rules = false;
bool stats_mysql_users=false;
@ -1342,6 +1344,10 @@ bool ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign
{ stats_proxysql_message_metrics=true; refresh=true; }
if (strstr(query_no_space,"stats_proxysql_message_metrics_reset"))
{ stats_proxysql_message_metrics_reset=true; refresh=true; }
if (strstr(query_no_space,"stats_mcp_query_tools_counters"))
{ stats_mcp_query_tools_counters=true; refresh=true; }
if (strstr(query_no_space,"stats_mcp_query_tools_counters_reset"))
{ stats_mcp_query_tools_counters_reset=true; refresh=true; }
// temporary disabled because not implemented
/*
@ -1572,6 +1578,12 @@ bool ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign
if (stats_pgsql_client_host_cache_reset) {
stats___pgsql_client_host_cache(true);
}
if (stats_mcp_query_tools_counters) {
stats___mcp_query_tools_counters(false);
}
if (stats_mcp_query_tools_counters_reset) {
stats___mcp_query_tools_counters(true);
}
if (admin) {
if (dump_global_variables) {

@ -18,6 +18,8 @@
#include "MySQL_Query_Processor.h"
#include "PgSQL_Query_Processor.h"
#include "MySQL_Logger.hpp"
#include "MCP_Thread.h"
#include "Query_Tool_Handler.h"
#define SAFE_SQLITE3_STEP(_stmt) do {\
do {\
@ -1582,6 +1584,37 @@ void ProxySQL_Admin::stats___proxysql_message_metrics(bool reset) {
delete resultset;
}
void ProxySQL_Admin::stats___mcp_query_tools_counters(bool reset) {
if (!GloMCPH) return;
Query_Tool_Handler* qth = GloMCPH->query_tool_handler;
if (!qth) return;
SQLite3_result* resultset = qth->get_tool_usage_stats_resultset(reset);
if (resultset == NULL) return;
statsdb->execute("BEGIN");
if (reset) {
statsdb->execute("DELETE FROM stats_mcp_query_tools_counters_reset");
} else {
statsdb->execute("DELETE FROM stats_mcp_query_tools_counters");
}
for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin();
it != resultset->rows.end(); ++it) {
SQLite3_row* r = *it;
char query[512];
snprintf(query, sizeof(query),
"INSERT INTO %smcp_query_tools_counters VALUES ('%s', '%s', %s)",
reset ? "stats_mcp_query_tools_counters_" : "stats_",
r->fields[0], r->fields[1], r->fields[2]);
statsdb->execute(query);
}
statsdb->execute("COMMIT");
delete resultset;
}
int ProxySQL_Admin::stats___save_mysql_query_digest_to_sqlite(
const bool reset, const bool copy, const SQLite3_result *resultset, const umap_query_digest *digest_umap,
const umap_query_digest_text *digest_text_umap

@ -110,6 +110,9 @@ Query_Tool_Handler::Query_Tool_Handler(
// Initialize pool mutex
pthread_mutex_init(&pool_lock, NULL);
// Initialize counters mutex
pthread_mutex_init(&counters_lock, NULL);
// Create discovery schema and harvester
catalog = new Discovery_Schema(catalog_path);
harvester = new Static_Harvester(
@ -135,6 +138,7 @@ Query_Tool_Handler::~Query_Tool_Handler() {
}
pthread_mutex_destroy(&pool_lock);
pthread_mutex_destroy(&counters_lock);
proxy_debug(PROXY_DEBUG_GENERIC, 3, "Query_Tool_Handler destroyed\n");
}
@ -644,6 +648,16 @@ json Query_Tool_Handler::get_tool_list() {
{{"limit", "integer"}}
));
// ============================================================
// STATISTICS TOOLS
// ============================================================
tools.push_back(create_tool_schema(
"stats.get_tool_usage",
"Get in-memory tool usage statistics grouped by tool name and schema.",
{},
{}
));
json result;
result["tools"] = tools;
return result;
@ -659,7 +673,62 @@ json Query_Tool_Handler::get_tool_description(const std::string& tool_name) {
return create_error_response("Tool not found: " + tool_name);
}
/**
* @brief Extract schema name from tool arguments
* Returns "(no schema)" for tools without schema context
*/
static std::string extract_schema_name(const std::string& tool_name, const json& arguments, Discovery_Schema* catalog) {
// Tools that use run_id (can be resolved to schema)
if (arguments.contains("run_id")) {
std::string run_id_str = json_string(arguments, "run_id");
int run_id = catalog->resolve_run_id(run_id_str);
if (run_id > 0) {
// Look up schema name from catalog
char* error = NULL;
int cols = 0, affected = 0;
SQLite3_result* resultset = NULL;
std::ostringstream sql;
sql << "SELECT schema_name FROM schemas WHERE run_id = " << run_id << " LIMIT 1;";
catalog->get_db()->execute_statement(sql.str().c_str(), &error, &cols, &affected, &resultset);
if (resultset && resultset->rows_count > 0) {
SQLite3_row* row = resultset->rows[0];
std::string schema = std::string(row->fields[0] ? row->fields[0] : "");
free(resultset);
return schema;
}
if (resultset) free(resultset);
}
return std::to_string(run_id);
}
// Tools that use schema_name directly
if (arguments.contains("schema_name")) {
return json_string(arguments, "schema_name");
}
// Tools without schema context
return "(no schema)";
}
/**
* @brief Track tool invocation (thread-safe)
*/
void track_tool_invocation(
Query_Tool_Handler* handler,
const std::string& tool_name,
const std::string& schema_name
) {
pthread_mutex_lock(&handler->counters_lock);
handler->tool_usage_counters[tool_name][schema_name]++;
pthread_mutex_unlock(&handler->counters_lock);
}
json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& arguments) {
// Track tool invocation
std::string schema = extract_schema_name(tool_name, arguments, catalog);
track_tool_invocation(this, tool_name, schema);
// ============================================================
// INVENTORY TOOLS
// ============================================================
@ -1357,6 +1426,9 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
return create_error_response("Invalid run_id or schema not found: " + run_id_or_schema);
}
// Log the search query
catalog->log_llm_search(run_id, query, limit);
std::string results = catalog->fts_search_llm(run_id, query, limit);
try {
return create_success_response(json::parse(results));
@ -1427,8 +1499,72 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
);
}
// ============================================================
// STATISTICS TOOLS
// ============================================================
if (tool_name == "stats.get_tool_usage") {
ToolUsageMap stats = get_tool_usage_stats();
json result = json::object();
for (ToolUsageMap::const_iterator it = stats.begin(); it != stats.end(); ++it) {
const std::string& tool_name = it->first;
const SchemaCountMap& schemas = it->second;
json schema_counts = json::object();
for (SchemaCountMap::const_iterator sit = schemas.begin(); sit != schemas.end(); ++sit) {
schema_counts[sit->first] = sit->second;
}
result[tool_name] = schema_counts;
}
return create_success_response(result);
}
// ============================================================
// FALLBACK - UNKNOWN TOOL
// ============================================================
return create_error_response("Unknown tool: " + tool_name);
}
Query_Tool_Handler::ToolUsageMap Query_Tool_Handler::get_tool_usage_stats() {
// Thread-safe copy of counters
pthread_mutex_lock(&counters_lock);
ToolUsageMap copy = tool_usage_counters;
pthread_mutex_unlock(&counters_lock);
return copy;
}
SQLite3_result* Query_Tool_Handler::get_tool_usage_stats_resultset(bool reset) {
SQLite3_result* result = new SQLite3_result(3);
result->add_column_definition(SQLITE_TEXT, "tool");
result->add_column_definition(SQLITE_TEXT, "schema");
result->add_column_definition(SQLITE_TEXT, "count");
pthread_mutex_lock(&counters_lock);
for (ToolUsageMap::const_iterator tool_it = tool_usage_counters.begin();
tool_it != tool_usage_counters.end(); ++tool_it) {
const std::string& tool_name = tool_it->first;
const SchemaCountMap& schemas = tool_it->second;
for (SchemaCountMap::const_iterator schema_it = schemas.begin();
schema_it != schemas.end(); ++schema_it) {
const std::string& schema_name = schema_it->first;
unsigned long long count = schema_it->second;
char** row = new char*[3];
row[0] = strdup(tool_name.c_str());
row[1] = strdup(schema_name.c_str());
char count_str[32];
snprintf(count_str, sizeof(count_str), "%llu", count);
row[2] = strdup(count_str);
result->add_row(row);
}
}
if (reset) {
tool_usage_counters.clear();
}
pthread_mutex_unlock(&counters_lock);
return result;
}

Loading…
Cancel
Save