mcp discovery: enforce target-scoped run model and add protocol-aware static harvesting (mysql+pgsql)

Refactor the MCP query/discovery stack to remove prototype-era MySQL-only assumptions and make discovery/catalog semantics explicitly target-scoped. This commit is intentionally not backward compatible with legacy single-target catalog metadata.

Key implementation details:
- Discovery_Schema:
  - runs table now stores target_id, protocol, and server_version
  - resolve_run_id() now requires target_id and resolves schemas within target scope
  - create_run() now records target/protocol/server version
  - added legacy schema detection + destructive catalog table rebuild when old runs layout is found
  - fixed resultset lifetime/escaping issues in target-aware run resolution
- New PostgreSQL static harvester:
  - added PgSQL_Static_Harvester class and build wiring
  - harvests schemas/objects/columns/indexes/fks/view definitions into Discovery_Schema
  - maps pg metadata to object_id-based insert APIs used by Discovery_Schema
- Query_Tool_Handler:
  - constructor simplified to catalog_path only (no mcp-mysql_* runtime ctor deps)
  - discovery.run_static now requires target_id and dispatches protocol-aware harvester at runtime
  - catalog.*, agent.run_start, and llm.* tool contracts now require target_id when resolving run_id
  - all run_id resolution call sites switched to resolve_run_id(target_id, ...)
  - list_schemas catalog query now scoped by target_id via runs join
  - digest logging path now resolves run_id in target context
- ProxySQL_MCP_Server:
  - updated Query_Tool_Handler construction to new signature
- Docs updated:
  - added explicit target_id scoping guidance for discovery/catalog/agent/llm workflows
  - clarified protocol-aware routing and noted legacy examples still present in large script docs

Build validation:
- Recompiled changed objects with PROXYSQLGENAI=1:
  - Query_Tool_Handler.oo
  - Discovery_Schema.oo
  - PgSQL_Static_Harvester.oo
  - ProxySQL_MCP_Server.oo
v3.0-MCP_multi
Rene Cannao 5 days ago
parent 7e54883ba5
commit 9685cdaa4b

@@ -207,6 +207,17 @@ The agents use these MCP tools for database analysis:
- `catalog_upsert` - Store findings in catalog
- `catalog_list` / `catalog_get` - Retrieve findings from catalog
### Target Scoping Requirement
Discovery, catalog, agent, and LLM tools are target-scoped. Always pass `target_id`:
- `discovery.run_static(target_id=..., schema_filter=...)`
- `catalog.*(target_id=..., run_id=...)`
- `agent.run_start(target_id=..., run_id=...)`
- `llm.*(target_id=..., run_id=...)`
`run_id` resolution is no longer global. The same schema name can exist on multiple targets, so `target_id` is required to resolve the correct discovery run.
## Benefits of Multi-Agent Approach
1. **Parallel Execution**: All 4 agents run simultaneously

@@ -206,27 +206,32 @@ public:
void close();
/**
* @brief Resolve schema name or run_id to a run_id
* @brief Resolve schema name or run_id to a run_id within a target scope
*
* If input is a numeric run_id, it is validated against the target scope.
* If input is a schema name, finds the latest run_id for that schema on the target.
*
* @param target_id Required target scope identifier
* @param run_id_or_schema Either a numeric run_id or a schema name
* @return run_id on success, -1 if not found within the target scope
*/
int resolve_run_id(const std::string& run_id_or_schema);
int resolve_run_id(const std::string& target_id, const std::string& run_id_or_schema);
/**
* @brief Create a new discovery run
*
* @param target_id Logical target identifier that produced this run
* @param protocol Backend protocol for this run (mysql|pgsql)
* @param source_dsn Data source identifier (e.g., "mysql://host:port/")
* @param mysql_version MySQL server version
* @param server_version Backend server version string
* @param notes Optional notes for this run
* @return run_id on success, -1 on error
*/
int create_run(
const std::string& target_id,
const std::string& protocol,
const std::string& source_dsn,
const std::string& mysql_version,
const std::string& server_version,
const std::string& notes = ""
);

@@ -0,0 +1,70 @@
#ifndef CLASS_PGSQL_STATIC_HARVESTER_H
#define CLASS_PGSQL_STATIC_HARVESTER_H
#ifdef PROXYSQLGENAI
#include "Discovery_Schema.h"
#include <string>
#include <vector>
#include <memory>
#include <pthread.h>
typedef struct pg_conn PGconn;
class PgSQL_Static_Harvester {
private:
std::string pgsql_host;
int pgsql_port;
std::string pgsql_user;
std::string pgsql_password;
std::string pgsql_dbname;
PGconn* pgsql_conn;
pthread_mutex_t conn_lock;
Discovery_Schema* catalog;
int current_run_id;
std::string source_dsn;
std::string server_version;
int connect_pgsql();
void disconnect_pgsql();
int execute_query(const std::string& query, std::vector<std::vector<std::string>>& results);
std::string get_pgsql_version();
static bool is_valid_schema_name(const std::string& name);
static std::string escape_sql_string(const std::string& str);
public:
PgSQL_Static_Harvester(
const std::string& host,
int port,
const std::string& user,
const std::string& password,
const std::string& dbname,
const std::string& catalog_path
);
~PgSQL_Static_Harvester();
int init();
void close();
int start_run(const std::string& target_id, const std::string& notes = "");
int finish_run(const std::string& notes = "");
int get_run_id() const { return current_run_id; }
int harvest_schemas(const std::string& only_schema = "");
int harvest_objects(const std::string& only_schema = "");
int harvest_columns(const std::string& only_schema = "");
int harvest_indexes(const std::string& only_schema = "");
int harvest_foreign_keys(const std::string& only_schema = "");
int harvest_view_definitions(const std::string& only_schema = "");
int build_quick_profiles();
int rebuild_fts_index();
int run_full_harvest(const std::string& target_id, const std::string& only_schema = "", const std::string& notes = "");
std::string get_harvest_stats();
std::string get_harvest_stats(int run_id);
};
#endif /* PROXYSQLGENAI */
#endif /* CLASS_PGSQL_STATIC_HARVESTER_H */

@@ -9,6 +9,7 @@
// Forward declaration to avoid circular include
class Static_Harvester;
class PgSQL_Static_Harvester;
/**
* @brief Query Tool Handler for /mcp/query endpoint
@@ -43,17 +44,12 @@ private:
bool executable; ///< True if current handler can execute against this target
};
// MySQL connection configuration
std::string mysql_hosts;
std::string mysql_ports;
std::string mysql_user;
std::string mysql_password;
std::string mysql_schema;
std::string default_target_id;
// Discovery components (NEW - replaces MySQL_Tool_Handler wrapper)
Discovery_Schema* catalog; ///< Discovery catalog (replaces old MySQL_Catalog)
Static_Harvester* harvester; ///< Static harvester for Phase 1
Static_Harvester* mysql_harvester; ///< MySQL static harvester for Phase 1
PgSQL_Static_Harvester* pgsql_harvester; ///< PostgreSQL static harvester for Phase 1
// Connection pool for MySQL queries
struct MySQLConnection {
@@ -218,11 +214,6 @@ public:
* @brief Constructor (creates catalog and harvester)
*/
Query_Tool_Handler(
const std::string& hosts,
const std::string& ports,
const std::string& user,
const std::string& password,
const std::string& schema,
const std::string& catalog_path
);
@@ -247,7 +238,7 @@ public:
/**
* @brief Get the static harvester
*/
Static_Harvester* get_harvester() const { return harvester; }
Static_Harvester* get_harvester() const { return mysql_harvester; }
/**
* @brief Get tool usage statistics (thread-safe copy)

@@ -155,7 +155,7 @@ public:
* @param notes Optional notes for this run
* @return run_id on success, -1 on error
*/
int start_run(const std::string& notes = "");
int start_run(const std::string& target_id, const std::string& notes = "");
/**
* @brief Finish the current discovery run
@@ -281,7 +281,7 @@ public:
* @param notes Optional run notes
* @return run_id on success, -1 on error
*/
int run_full_harvest(const std::string& only_schema = "", const std::string& notes = "");
int run_full_harvest(const std::string& target_id, const std::string& only_schema = "", const std::string& notes = "");
/**
* @brief Get harvest statistics

@@ -51,6 +51,7 @@
#include "ProxySQL_MCP_Server.hpp"
#include "Query_Tool_Handler.h"
#include "RAG_Tool_Handler.h"
#include "PgSQL_Static_Harvester.h"
#include "Static_Harvester.h"
#endif /* PROXYSQLGENAI */

@@ -84,21 +84,53 @@ void Discovery_Schema::close() {
}
}
int Discovery_Schema::resolve_run_id(const std::string& run_id_or_schema) {
int Discovery_Schema::resolve_run_id(const std::string& target_id, const std::string& run_id_or_schema) {
if (target_id.empty()) {
proxy_warning("resolve_run_id called without target_id\n");
return -1;
}
// If it's already a number (run_id), validate it against the target scope
if (!run_id_or_schema.empty() && std::isdigit(run_id_or_schema[0])) {
return std::stoi(run_id_or_schema);
int run_id = std::stoi(run_id_or_schema);
char* error = NULL;
int cols = 0, affected = 0;
SQLite3_result* resultset = NULL;
std::string esc_target_id = target_id;
replace_str(esc_target_id, "'", "''");
std::ostringstream verify_sql;
verify_sql << "SELECT 1 FROM runs WHERE run_id=" << run_id
<< " AND target_id='" << esc_target_id << "' LIMIT 1;";
db->execute_statement(verify_sql.str().c_str(), &error, &cols, &affected, &resultset);
if (error) {
proxy_error("Failed to validate run_id '%d' for target '%s': %s\n", run_id, target_id.c_str(), error);
free(error);
if (resultset) {
delete resultset;
}
return -1;
}
bool found = (resultset && resultset->rows_count > 0);
if (resultset) {
delete resultset;
}
return found ? run_id : -1;
}
// It's a schema name - find the latest run_id for this schema
char* error = NULL;
int cols = 0, affected = 0;
SQLite3_result* resultset = NULL;
std::string esc_target_id = target_id;
std::string esc_run_id_or_schema = run_id_or_schema;
replace_str(esc_target_id, "'", "''");
replace_str(esc_run_id_or_schema, "'", "''");
std::ostringstream sql;
sql << "SELECT r.run_id FROM runs r "
<< "INNER JOIN schemas s ON s.run_id = r.run_id "
<< "WHERE s.schema_name = '" << run_id_or_schema << "' "
<< "WHERE r.target_id = '" << esc_target_id << "' "
<< "AND s.schema_name = '" << esc_run_id_or_schema << "' "
<< "ORDER BY r.started_at DESC LIMIT 1;";
db->execute_statement(sql.str().c_str(), &error, &cols, &affected, &resultset);
@@ -111,7 +143,7 @@ int Discovery_Schema::resolve_run_id(const std::string& run_id_or_schema) {
if (!resultset || resultset->rows_count == 0) {
proxy_warning("No run found for schema '%s'\n", run_id_or_schema.c_str());
if (resultset) {
free(resultset);
delete resultset;
resultset = NULL;
}
return -1;
@@ -120,7 +152,7 @@ int Discovery_Schema::resolve_run_id(const std::string& run_id_or_schema) {
SQLite3_row* row = resultset->rows[0];
int run_id = atoi(row->fields[0]);
free(resultset);
delete resultset;
return run_id;
}
@@ -128,6 +160,77 @@ int Discovery_Schema::init_schema() {
// Enable foreign keys
db->execute("PRAGMA foreign_keys = ON");
// The prototype schema had MySQL-only runs metadata. Backward compatibility is intentionally
// not preserved: if we detect legacy runs layout, rebuild catalog tables from scratch.
{
char* error = NULL;
int cols = 0, affected = 0;
SQLite3_result* resultset = NULL;
db->execute_statement("PRAGMA table_info(runs);", &error, &cols, &affected, &resultset);
if (error) {
proxy_error("Discovery_Schema: failed reading runs table layout: %s\n", error);
free(error);
if (resultset) {
delete resultset;
}
return -1;
}
bool has_rows = (resultset && !resultset->rows.empty());
bool has_target_id = false;
bool has_protocol = false;
bool has_server_version = false;
for (size_t i = 0; resultset && i < resultset->rows.size(); i++) {
SQLite3_row* row = resultset->rows[i];
const char* col_name = (row->cnt > 1 && row->fields[1]) ? row->fields[1] : "";
if (!strcmp(col_name, "target_id")) {
has_target_id = true;
} else if (!strcmp(col_name, "protocol")) {
has_protocol = true;
} else if (!strcmp(col_name, "server_version")) {
has_server_version = true;
}
}
if (resultset) {
delete resultset;
}
if (has_rows && (!has_target_id || !has_protocol || !has_server_version)) {
proxy_warning("Discovery_Schema: legacy catalog schema detected, rebuilding catalog tables\n");
db->execute("PRAGMA foreign_keys = OFF");
db->execute("DROP TABLE IF EXISTS schema_docs");
db->execute("DROP TABLE IF EXISTS query_tool_calls");
db->execute("DROP TABLE IF EXISTS rag_search_log");
db->execute("DROP TABLE IF EXISTS llm_search_log");
db->execute("DROP TABLE IF EXISTS llm_notes");
db->execute("DROP TABLE IF EXISTS llm_question_templates");
db->execute("DROP TABLE IF EXISTS llm_metrics");
db->execute("DROP TABLE IF EXISTS llm_domain_members");
db->execute("DROP TABLE IF EXISTS llm_domains");
db->execute("DROP TABLE IF EXISTS llm_relationships");
db->execute("DROP TABLE IF EXISTS llm_object_summaries");
db->execute("DROP TABLE IF EXISTS agent_events");
db->execute("DROP TABLE IF EXISTS agent_runs");
db->execute("DROP TABLE IF EXISTS stats_mcp_query_digest_reset");
db->execute("DROP TABLE IF EXISTS stats_mcp_query_digest");
db->execute("DROP TABLE IF EXISTS mcp_query_rules");
db->execute("DROP TABLE IF EXISTS profiles");
db->execute("DROP TABLE IF EXISTS inferred_relationships");
db->execute("DROP TABLE IF EXISTS view_dependencies");
db->execute("DROP TABLE IF EXISTS foreign_key_columns");
db->execute("DROP TABLE IF EXISTS foreign_keys");
db->execute("DROP TABLE IF EXISTS index_columns");
db->execute("DROP TABLE IF EXISTS indexes");
db->execute("DROP TABLE IF EXISTS columns");
db->execute("DROP TABLE IF EXISTS objects");
db->execute("DROP TABLE IF EXISTS schemas");
db->execute("DROP TABLE IF EXISTS runs");
db->execute("DROP TABLE IF EXISTS fts_objects");
db->execute("DROP TABLE IF EXISTS fts_llm");
db->execute("PRAGMA foreign_keys = ON");
}
}
// Create all tables
int rc = create_deterministic_tables();
if (rc) {
@@ -166,13 +269,16 @@ int Discovery_Schema::create_deterministic_tables() {
db->execute(
"CREATE TABLE IF NOT EXISTS runs ("
" run_id INTEGER PRIMARY KEY , "
" target_id TEXT NOT NULL , "
" protocol TEXT NOT NULL CHECK(protocol IN ('mysql','pgsql')) , "
" started_at TEXT NOT NULL DEFAULT (datetime('now')) , "
" finished_at TEXT , "
" source_dsn TEXT , "
" mysql_version TEXT , "
" server_version TEXT , "
" notes TEXT"
");"
);
db->execute("CREATE INDEX IF NOT EXISTS idx_runs_target_started ON runs(target_id, started_at DESC);");
// Schemas table
db->execute(
@@ -650,20 +756,24 @@ int Discovery_Schema::create_fts_tables() {
// ============================================================================
int Discovery_Schema::create_run(
const std::string& target_id,
const std::string& protocol,
const std::string& source_dsn,
const std::string& mysql_version,
const std::string& server_version,
const std::string& notes
) {
sqlite3_stmt* stmt = NULL;
const char* sql = "INSERT INTO runs(source_dsn, mysql_version, notes) VALUES(?1, ?2 , ?3);";
const char* sql = "INSERT INTO runs(target_id, protocol, source_dsn, server_version, notes) VALUES(?1, ?2, ?3, ?4, ?5);";
auto [rc, stmt_unique] = db->prepare_v2(sql);
stmt = stmt_unique.get();
if (rc != SQLITE_OK) return -1;
(*proxy_sqlite3_bind_text)(stmt, 1, source_dsn.c_str(), -1, SQLITE_TRANSIENT);
(*proxy_sqlite3_bind_text)(stmt, 2, mysql_version.c_str(), -1, SQLITE_TRANSIENT);
(*proxy_sqlite3_bind_text)(stmt, 3, notes.c_str(), -1, SQLITE_TRANSIENT);
(*proxy_sqlite3_bind_text)(stmt, 1, target_id.c_str(), -1, SQLITE_TRANSIENT);
(*proxy_sqlite3_bind_text)(stmt, 2, protocol.c_str(), -1, SQLITE_TRANSIENT);
(*proxy_sqlite3_bind_text)(stmt, 3, source_dsn.c_str(), -1, SQLITE_TRANSIENT);
(*proxy_sqlite3_bind_text)(stmt, 4, server_version.c_str(), -1, SQLITE_TRANSIENT);
(*proxy_sqlite3_bind_text)(stmt, 5, notes.c_str(), -1, SQLITE_TRANSIENT);
SAFE_SQLITE3_STEP2(stmt);
int run_id = (int)(*proxy_sqlite3_last_insert_rowid)(db->get_db());
@@ -693,7 +803,7 @@ std::string Discovery_Schema::get_run_info(int run_id) {
SQLite3_result* resultset = NULL;
std::ostringstream sql;
sql << "SELECT run_id, started_at, finished_at, source_dsn, mysql_version , notes "
sql << "SELECT run_id, target_id, protocol, started_at, finished_at, source_dsn, server_version , notes "
<< "FROM runs WHERE run_id = " << run_id << ";";
db->execute_statement(sql.str().c_str(), &error, &cols, &affected, &resultset);
@@ -702,11 +812,13 @@ std::string Discovery_Schema::get_run_info(int run_id) {
if (resultset && !resultset->rows.empty()) {
SQLite3_row* row = resultset->rows[0];
result["run_id"] = run_id;
result["started_at"] = std::string(row->fields[0] ? row->fields[0] : "");
result["finished_at"] = std::string(row->fields[1] ? row->fields[1] : "");
result["source_dsn"] = std::string(row->fields[2] ? row->fields[2] : "");
result["mysql_version"] = std::string(row->fields[3] ? row->fields[3] : "");
result["notes"] = std::string(row->fields[4] ? row->fields[4] : "");
result["target_id"] = std::string(row->fields[1] ? row->fields[1] : "");
result["protocol"] = std::string(row->fields[2] ? row->fields[2] : "");
result["started_at"] = std::string(row->fields[3] ? row->fields[3] : "");
result["finished_at"] = std::string(row->fields[4] ? row->fields[4] : "");
result["source_dsn"] = std::string(row->fields[5] ? row->fields[5] : "");
result["server_version"] = std::string(row->fields[6] ? row->fields[6] : "");
result["notes"] = std::string(row->fields[7] ? row->fields[7] : "");
} else {
result["error"] = "Run not found";
}

@@ -96,7 +96,7 @@ _OBJ_CXX += GenAI_Thread.oo \
Admin_Tool_Handler.oo Cache_Tool_Handler.oo Observe_Tool_Handler.oo \
AI_Features_Manager.oo LLM_Bridge.oo LLM_Clients.oo Anomaly_Detector.oo AI_Vector_Storage.oo AI_Tool_Handler.oo \
RAG_Tool_Handler.oo \
Discovery_Schema.oo Static_Harvester.oo
Discovery_Schema.oo Static_Harvester.oo PgSQL_Static_Harvester.oo
endif
OBJ_CXX := $(patsubst %,$(ODIR)/%,$(_OBJ_CXX))

@@ -0,0 +1,699 @@
#ifdef PROXYSQLGENAI
#include "PgSQL_Static_Harvester.h"
#include "proxysql.h"
#include "cpp.h"
#include <libpq-fe.h>
#include <sstream>
#include <map>
#include <algorithm>
#include <cctype>
#include "../deps/json/json.hpp"
using json = nlohmann::json;
PgSQL_Static_Harvester::PgSQL_Static_Harvester(
const std::string& host,
int port,
const std::string& user,
const std::string& password,
const std::string& dbname,
const std::string& catalog_path
) : pgsql_host(host),
pgsql_port(port),
pgsql_user(user),
pgsql_password(password),
pgsql_dbname(dbname),
pgsql_conn(NULL),
catalog(NULL),
current_run_id(-1)
{
pthread_mutex_init(&conn_lock, NULL);
catalog = new Discovery_Schema(catalog_path);
}
PgSQL_Static_Harvester::~PgSQL_Static_Harvester() {
close();
if (catalog) {
delete catalog;
catalog = NULL;
}
pthread_mutex_destroy(&conn_lock);
}
int PgSQL_Static_Harvester::init() {
if (catalog->init()) {
proxy_error("PgSQL_Static_Harvester: Failed to initialize catalog\n");
return -1;
}
return 0;
}
void PgSQL_Static_Harvester::close() {
disconnect_pgsql();
}
int PgSQL_Static_Harvester::connect_pgsql() {
pthread_mutex_lock(&conn_lock);
if (pgsql_conn) {
pthread_mutex_unlock(&conn_lock);
return 0;
}
std::ostringstream conninfo;
conninfo << "host=" << pgsql_host
<< " port=" << pgsql_port
<< " user=" << pgsql_user
<< " password=" << pgsql_password
<< " connect_timeout=30";
if (!pgsql_dbname.empty()) {
conninfo << " dbname=" << pgsql_dbname;
}
pgsql_conn = PQconnectdb(conninfo.str().c_str());
if (pgsql_conn == NULL || PQstatus(pgsql_conn) != CONNECTION_OK) {
proxy_error("PgSQL_Static_Harvester: PQconnectdb failed: %s\n",
pgsql_conn ? PQerrorMessage(pgsql_conn) : "NULL connection");
if (pgsql_conn) {
PQfinish(pgsql_conn);
pgsql_conn = NULL;
}
pthread_mutex_unlock(&conn_lock);
return -1;
}
server_version = get_pgsql_version();
source_dsn = "pgsql://" + pgsql_user + "@" + pgsql_host + ":" + std::to_string(pgsql_port) + "/" + pgsql_dbname;
proxy_info("PgSQL_Static_Harvester: Connected to PostgreSQL %s at %s:%d\n",
server_version.c_str(), pgsql_host.c_str(), pgsql_port);
pthread_mutex_unlock(&conn_lock);
return 0;
}
void PgSQL_Static_Harvester::disconnect_pgsql() {
pthread_mutex_lock(&conn_lock);
if (pgsql_conn) {
PQfinish(pgsql_conn);
pgsql_conn = NULL;
}
pthread_mutex_unlock(&conn_lock);
}
int PgSQL_Static_Harvester::execute_query(const std::string& query, std::vector<std::vector<std::string>>& results) {
if (!pgsql_conn) {
proxy_error("PgSQL_Static_Harvester: Not connected to PostgreSQL\n");
return -1;
}
PGresult* res = PQexec(pgsql_conn, query.c_str());
if (!res) {
proxy_error("PgSQL_Static_Harvester: PQexec returned NULL\n");
return -1;
}
ExecStatusType st = PQresultStatus(res);
if (st != PGRES_TUPLES_OK && st != PGRES_COMMAND_OK) {
proxy_error("PgSQL_Static_Harvester: Query failed: %s\n", PQresultErrorMessage(res));
PQclear(res);
return -1;
}
if (st == PGRES_COMMAND_OK) {
PQclear(res);
return 0;
}
int nrows = PQntuples(res);
int ncols = PQnfields(res);
results.clear();
results.reserve(nrows);
for (int r = 0; r < nrows; r++) {
std::vector<std::string> row;
row.reserve(ncols);
for (int c = 0; c < ncols; c++) {
row.push_back(PQgetisnull(res, r, c) ? "" : PQgetvalue(res, r, c));
}
results.push_back(row);
}
PQclear(res);
return 0;
}
std::string PgSQL_Static_Harvester::get_pgsql_version() {
std::vector<std::vector<std::string>> rows;
if (execute_query("SELECT version();", rows) == 0 && !rows.empty() && !rows[0].empty()) {
return rows[0][0];
}
return "";
}
bool PgSQL_Static_Harvester::is_valid_schema_name(const std::string& name) {
if (name.empty()) {
return true;
}
for (char c : name) {
if (!std::isalnum(static_cast<unsigned char>(c)) && c != '_' && c != '$') {
return false;
}
}
return true;
}
std::string PgSQL_Static_Harvester::escape_sql_string(const std::string& str) {
std::string escaped;
escaped.reserve(str.length() * 2);
for (char c : str) {
if (c == '\'') {
escaped += "''";
} else {
escaped += c;
}
}
return escaped;
}
int PgSQL_Static_Harvester::start_run(const std::string& target_id, const std::string& notes) {
if (current_run_id >= 0) {
proxy_error("PgSQL_Static_Harvester: Run already active (run_id=%d)\n", current_run_id);
return -1;
}
if (connect_pgsql()) {
return -1;
}
current_run_id = catalog->create_run(target_id, "pgsql", source_dsn, server_version, notes);
if (current_run_id < 0) {
proxy_error("PgSQL_Static_Harvester: Failed to create run\n");
return -1;
}
proxy_info("PgSQL_Static_Harvester: Started run_id=%d\n", current_run_id);
return current_run_id;
}
int PgSQL_Static_Harvester::finish_run(const std::string& notes) {
if (current_run_id < 0) {
proxy_error("PgSQL_Static_Harvester: No active run\n");
return -1;
}
int rc = catalog->finish_run(current_run_id, notes);
if (rc) {
proxy_error("PgSQL_Static_Harvester: Failed to finish run\n");
return -1;
}
proxy_info("PgSQL_Static_Harvester: Finished run_id=%d\n", current_run_id);
current_run_id = -1;
return 0;
}
int PgSQL_Static_Harvester::harvest_schemas(const std::string& only_schema) {
if (current_run_id < 0) {
return -1;
}
if (!is_valid_schema_name(only_schema)) {
return -1;
}
std::ostringstream sql;
sql << "SELECT schema_name, default_character_set_name, default_collation_name "
<< "FROM information_schema.schemata "
<< "WHERE schema_name NOT IN ('pg_catalog','information_schema') "
<< "AND schema_name NOT LIKE 'pg_toast%'";
if (!only_schema.empty()) {
sql << " AND schema_name='" << only_schema << "'";
}
sql << " ORDER BY schema_name";
std::vector<std::vector<std::string>> rows;
if (execute_query(sql.str(), rows)) {
return -1;
}
int count = 0;
for (const auto& r : rows) {
if (r.size() < 3) continue;
if (catalog->insert_schema(current_run_id, r[0], r[1], r[2]) >= 0) {
count++;
}
}
return count;
}
int PgSQL_Static_Harvester::harvest_objects(const std::string& only_schema) {
if (current_run_id < 0) {
return -1;
}
if (!is_valid_schema_name(only_schema)) {
return -1;
}
std::vector<std::vector<std::string>> rows;
std::ostringstream sql_tables;
sql_tables << "SELECT table_schema, table_name, "
<< "CASE WHEN table_type='BASE TABLE' THEN 'table' ELSE 'view' END "
<< "FROM information_schema.tables "
<< "WHERE table_schema NOT IN ('pg_catalog','information_schema') "
<< "AND table_schema NOT LIKE 'pg_toast%'";
if (!only_schema.empty()) {
sql_tables << " AND table_schema='" << only_schema << "'";
}
if (execute_query(sql_tables.str(), rows)) {
return -1;
}
int count = 0;
for (const auto& r : rows) {
if (r.size() < 3) continue;
if (catalog->insert_object(
current_run_id, r[0], r[1], r[2],
"", 0, 0, 0, "", "", "", ""
) >= 0) {
count++;
}
}
rows.clear();
std::ostringstream sql_routines;
sql_routines << "SELECT routine_schema, routine_name, 'routine', COALESCE(routine_definition,'') "
<< "FROM information_schema.routines "
<< "WHERE routine_schema NOT IN ('pg_catalog','information_schema')";
if (!only_schema.empty()) {
sql_routines << " AND routine_schema='" << only_schema << "'";
}
if (execute_query(sql_routines.str(), rows) == 0) {
for (const auto& r : rows) {
if (r.size() < 4) continue;
if (catalog->insert_object(
current_run_id, r[0], r[1], r[2],
"", 0, 0, 0, "", "", "", r[3]
) >= 0) {
count++;
}
}
}
rows.clear();
std::ostringstream sql_triggers;
sql_triggers << "SELECT trigger_schema, trigger_name, 'trigger', COALESCE(action_statement,'') "
<< "FROM information_schema.triggers "
<< "WHERE trigger_schema NOT IN ('pg_catalog','information_schema')";
if (!only_schema.empty()) {
sql_triggers << " AND trigger_schema='" << only_schema << "'";
}
if (execute_query(sql_triggers.str(), rows) == 0) {
for (const auto& r : rows) {
if (r.size() < 4) continue;
if (catalog->insert_object(
current_run_id, r[0], r[1], r[2],
"", 0, 0, 0, "", "", "", r[3]
) >= 0) {
count++;
}
}
}
return count;
}
int PgSQL_Static_Harvester::harvest_columns(const std::string& only_schema) {
if (current_run_id < 0) return -1;
if (!is_valid_schema_name(only_schema)) return -1;
std::ostringstream sql;
sql << "SELECT table_schema, table_name, ordinal_position, column_name, data_type, "
<< "COALESCE(udt_name,''), CASE WHEN is_nullable='YES' THEN 1 ELSE 0 END, "
<< "COALESCE(column_default,''), '', COALESCE(character_set_name,''), "
<< "COALESCE(collation_name,''), COALESCE(column_name,'') "
<< "FROM information_schema.columns "
<< "WHERE table_schema NOT IN ('pg_catalog','information_schema') "
<< "AND table_schema NOT LIKE 'pg_toast%'";
if (!only_schema.empty()) {
sql << " AND table_schema='" << only_schema << "'";
}
sql << " ORDER BY table_schema, table_name, ordinal_position";
std::vector<std::vector<std::string>> rows;
if (execute_query(sql.str(), rows)) return -1;
int count = 0;
for (const auto& r : rows) {
if (r.size() < 12) continue;
char* error = NULL;
int cols = 0, affected = 0;
SQLite3_result* resultset = NULL;
std::ostringstream obj_sql;
obj_sql << "SELECT object_id FROM objects "
<< "WHERE run_id=" << current_run_id
<< " AND schema_name='" << escape_sql_string(r[0]) << "'"
<< " AND object_name='" << escape_sql_string(r[1]) << "'"
<< " AND object_type IN ('table','view') LIMIT 1;";
catalog->get_db()->execute_statement(obj_sql.str().c_str(), &error, &cols, &affected, &resultset);
if (error || !resultset || resultset->rows.empty()) {
if (error) {
free(error);
}
if (resultset) {
delete resultset;
}
continue;
}
int object_id = atoi(resultset->rows[0]->fields[0]);
delete resultset;
int is_time = 0;
if (r[4] == "date" || r[4] == "time" || r[4] == "timestamp" ||
r[4] == "timestamptz" || r[4] == "timetz") {
is_time = 1;
}
int is_id_like = (r[3] == "id" || (r[3].size() > 3 && r[3].substr(r[3].size() - 3) == "_id")) ? 1 : 0;
if (catalog->insert_column(
object_id,
atoi(r[2].c_str()), r[3], r[4], r[5], atoi(r[6].c_str()),
r[7], r[8], r[9], r[10], r[11],
0, 0, 0, is_time, is_id_like
) >= 0) {
count++;
}
}
catalog->update_object_flags(current_run_id);
return count;
}
int PgSQL_Static_Harvester::harvest_indexes(const std::string& only_schema) {
if (current_run_id < 0) return -1;
if (!is_valid_schema_name(only_schema)) return -1;
std::ostringstream sql;
sql << "SELECT ns.nspname, t.relname, i.relname, "
<< "CASE WHEN ix.indisunique THEN 1 ELSE 0 END, "
<< "COALESCE(am.amname,''), k.ord, COALESCE(a.attname,''), 0, '', 0 "
<< "FROM pg_class t "
<< "JOIN pg_namespace ns ON ns.oid=t.relnamespace "
<< "JOIN pg_index ix ON ix.indrelid=t.oid "
<< "JOIN pg_class i ON i.oid=ix.indexrelid "
<< "JOIN pg_am am ON am.oid=i.relam "
<< "JOIN LATERAL unnest(ix.indkey) WITH ORDINALITY AS k(attnum, ord) ON TRUE "
<< "LEFT JOIN pg_attribute a ON a.attrelid=t.oid AND a.attnum=k.attnum "
<< "WHERE t.relkind IN ('r','p') "
<< "AND ns.nspname NOT IN ('pg_catalog','information_schema') "
<< "AND ns.nspname NOT LIKE 'pg_toast%'";
if (!only_schema.empty()) {
sql << " AND ns.nspname='" << only_schema << "'";
}
sql << " ORDER BY ns.nspname, t.relname, i.relname, k.ord";
std::vector<std::vector<std::string>> rows;
if (execute_query(sql.str(), rows)) return -1;
int count = 0;
std::map<std::string, std::vector<std::vector<std::string>>> grouped;
for (const auto& r : rows) {
if (r.size() < 10) continue;
std::string key = r[0] + "." + r[1] + "." + r[2];
grouped[key].push_back(r);
}
for (const auto& entry : grouped) {
const auto& rows_for_index = entry.second;
if (rows_for_index.empty()) continue;
const auto& first = rows_for_index[0];
char* error = NULL;
int cols = 0, affected = 0;
SQLite3_result* resultset = NULL;
std::ostringstream obj_sql;
obj_sql << "SELECT object_id FROM objects "
<< "WHERE run_id=" << current_run_id
<< " AND schema_name='" << escape_sql_string(first[0]) << "'"
<< " AND object_name='" << escape_sql_string(first[1]) << "'"
<< " AND object_type='table' LIMIT 1;";
catalog->get_db()->execute_statement(obj_sql.str().c_str(), &error, &cols, &affected, &resultset);
if (error || !resultset || resultset->rows.empty()) {
if (error) {
free(error);
}
if (resultset) {
delete resultset;
}
continue;
}
int object_id = atoi(resultset->rows[0]->fields[0]);
delete resultset;
int is_primary = (first[2] == "PRIMARY" || first[2] == first[1] + "_pkey") ? 1 : 0;
int index_id = catalog->insert_index(
object_id, first[2], atoi(first[3].c_str()), is_primary, first[4], atol(first[9].c_str())
);
if (index_id < 0) {
continue;
}
for (const auto& idx_col : rows_for_index) {
catalog->insert_index_column(index_id, atoi(idx_col[5].c_str()), idx_col[6], atoi(idx_col[7].c_str()), idx_col[8]);
}
count++;
}
catalog->update_object_flags(current_run_id);
return count;
}
int PgSQL_Static_Harvester::harvest_foreign_keys(const std::string& only_schema) {
if (current_run_id < 0) return -1;
if (!is_valid_schema_name(only_schema)) return -1;
std::ostringstream sql;
sql << "SELECT tc.table_schema, tc.table_name, tc.constraint_name, "
<< "kcu.column_name, ccu.table_schema, ccu.table_name, ccu.column_name, "
<< "kcu.ordinal_position, COALESCE(rc.update_rule,''), COALESCE(rc.delete_rule,'') "
<< "FROM information_schema.table_constraints tc "
<< "JOIN information_schema.key_column_usage kcu "
<< "ON tc.constraint_name=kcu.constraint_name "
<< "AND tc.table_schema=kcu.table_schema "
<< "JOIN information_schema.constraint_column_usage ccu "
<< "ON ccu.constraint_name=tc.constraint_name "
<< "AND ccu.constraint_schema=tc.constraint_schema "
<< "LEFT JOIN information_schema.referential_constraints rc "
<< "ON rc.constraint_name=tc.constraint_name "
<< "AND rc.constraint_schema=tc.constraint_schema "
<< "WHERE tc.constraint_type='FOREIGN KEY' "
<< "AND tc.table_schema NOT IN ('pg_catalog','information_schema') "
<< "AND tc.table_schema NOT LIKE 'pg_toast%'";
if (!only_schema.empty()) {
sql << " AND tc.table_schema='" << only_schema << "'";
}
sql << " ORDER BY tc.table_schema, tc.table_name, tc.constraint_name, kcu.ordinal_position";
std::vector<std::vector<std::string>> rows;
if (execute_query(sql.str(), rows)) return -1;
int count = 0;
int last_fk_id = -1;
std::string last_fk_key;
for (const auto& r : rows) {
if (r.size() < 10) continue;
std::string fk_key = r[0] + "." + r[1] + "." + r[2];
if (fk_key != last_fk_key) {
char* error = NULL;
int cols = 0, affected = 0;
SQLite3_result* resultset = NULL;
std::ostringstream obj_sql;
obj_sql << "SELECT object_id FROM objects "
<< "WHERE run_id=" << current_run_id
<< " AND schema_name='" << escape_sql_string(r[0]) << "'"
<< " AND object_name='" << escape_sql_string(r[1]) << "'"
<< " AND object_type='table' LIMIT 1;";
catalog->get_db()->execute_statement(obj_sql.str().c_str(), &error, &cols, &affected, &resultset);
if (error || !resultset || resultset->rows.empty()) {
if (error) {
free(error);
}
if (resultset) {
delete resultset;
}
last_fk_id = -1;
last_fk_key.clear();
continue;
}
int child_object_id = atoi(resultset->rows[0]->fields[0]);
delete resultset;
last_fk_id = catalog->insert_foreign_key(
current_run_id, child_object_id, r[2], r[4], r[5], r[8], r[9]
);
last_fk_key = fk_key;
}
if (last_fk_id >= 0) {
catalog->insert_foreign_key_column(last_fk_id, atoi(r[7].c_str()), r[3], r[6]);
count++;
}
}
catalog->update_object_flags(current_run_id);
return count;
}
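The foreign-key loop above relies on the rows arriving ordered by (schema, table, constraint, ordinal_position): a new composite key opens a new foreign_keys record, and every subsequent row with the same key only appends another column pair to it. A minimal standalone sketch of that grouping pass (the `Fk`/`FkColumn` structs are illustrative, not the Discovery_Schema API):

```cpp
#include <string>
#include <vector>

struct FkColumn { std::string child_col, parent_col; };
struct Fk { std::string key; std::vector<FkColumn> cols; };

// Rows are (constraint_key, child_column, parent_column), pre-sorted by key
// and ordinal position, mirroring the ORDER BY in the harvester query.
std::vector<Fk> group_fk_rows(const std::vector<std::vector<std::string>>& rows) {
    std::vector<Fk> out;
    for (const auto& r : rows) {
        if (r.size() < 3) continue;
        if (out.empty() || out.back().key != r[0]) {
            out.push_back(Fk{r[0], {}});              // new composite key -> new FK record
        }
        out.back().cols.push_back(FkColumn{r[1], r[2]}); // same key -> append column pair
    }
    return out;
}
```

Because the grouping only compares against the previous key, the ORDER BY in the query is load-bearing: unsorted input would split one multi-column FK into several records.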
int PgSQL_Static_Harvester::harvest_view_definitions(const std::string& only_schema) {
if (current_run_id < 0) return -1;
if (!is_valid_schema_name(only_schema)) return -1;
std::ostringstream sql;
sql << "SELECT schemaname, viewname, definition FROM pg_views "
<< "WHERE schemaname NOT IN ('pg_catalog','information_schema') "
<< "AND schemaname NOT LIKE 'pg_toast%'";
if (!only_schema.empty()) {
sql << " AND schemaname='" << only_schema << "'";
}
std::vector<std::vector<std::string>> rows;
if (execute_query(sql.str(), rows)) return -1;
int count = 0;
for (const auto& row : rows) {
if (row.size() < 3) continue;
char* error = NULL;
int cols = 0, affected = 0;
std::ostringstream update_sql;
update_sql << "UPDATE objects SET definition_sql = '" << escape_sql_string(row[2]) << "' "
<< "WHERE run_id = " << current_run_id
<< " AND schema_name = '" << escape_sql_string(row[0]) << "'"
<< " AND object_name = '" << escape_sql_string(row[1]) << "'"
<< " AND object_type = 'view';";
catalog->get_db()->execute_statement(update_sql.str().c_str(), &error, &cols, &affected);
if (error) {
free(error);
}
if (affected > 0) {
count++;
}
}
return count;
}
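The UPDATE above builds its SQL by string concatenation, so the view definition must pass through escape_sql_string() first. Assuming that helper follows the usual SQLite convention, it simply doubles embedded single quotes; a sketch of that behavior (parameter binding would avoid the string building entirely):

```cpp
#include <string>

// Quote-doubling escape for SQLite string literals: each embedded single
// quote becomes two single quotes. A sketch of what escape_sql_string() is
// assumed to do in the harvester; its real definition is not shown here.
std::string escape_sql_string(const std::string& in) {
    std::string out;
    out.reserve(in.size());
    for (char c : in) {
        out.push_back(c);
        if (c == '\'') out.push_back('\'');  // ' -> ''
    }
    return out;
}
```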
int PgSQL_Static_Harvester::build_quick_profiles() {
if (current_run_id < 0) return -1;
char* error = NULL;
int cols = 0, affected = 0;
SQLite3_result* resultset = NULL;
std::ostringstream sql;
sql << "SELECT object_id, object_name, table_rows_est, data_length, index_length, "
"has_primary_key, has_foreign_keys, has_time_column "
"FROM objects WHERE run_id=" << current_run_id << " AND object_type='table'";
catalog->get_db()->execute_statement(sql.str().c_str(), &error, &cols, &affected, &resultset);
if (error) {
free(error);
if (resultset) delete resultset;
return -1;
}
int count = 0;
if (resultset) {
for (auto row : resultset->rows) {
if (row->cnt < 8) continue;
const std::string name = row->fields[1] ? row->fields[1] : "";
std::string kind = "unknown";
std::string lname = name;
std::transform(lname.begin(), lname.end(), lname.begin(), ::tolower);
if (lname.find("log") != std::string::npos || lname.find("event") != std::string::npos || lname.find("audit") != std::string::npos) {
kind = "log";
} else if (lname.find("order") != std::string::npos || lname.find("invoice") != std::string::npos || lname.find("payment") != std::string::npos || lname.find("transaction") != std::string::npos) {
kind = "fact";
} else if (lname.find("user") != std::string::npos || lname.find("customer") != std::string::npos || lname.find("account") != std::string::npos || lname.find("product") != std::string::npos) {
kind = "entity";
}
json p;
p["guessed_kind"] = kind;
p["rows_est"] = row->fields[2] ? atoll(row->fields[2]) : 0;
p["size_bytes"] = (row->fields[3] ? atoll(row->fields[3]) : 0) + (row->fields[4] ? atoll(row->fields[4]) : 0);
p["engine"] = "pgsql";
p["has_primary_key"] = row->fields[5] ? atoi(row->fields[5]) : 0;
p["has_foreign_keys"] = row->fields[6] ? atoi(row->fields[6]) : 0;
p["has_time_column"] = row->fields[7] ? atoi(row->fields[7]) : 0;
if (catalog->upsert_profile(current_run_id, atoi(row->fields[0]), "table_quick", p.dump()) == 0) {
count++;
}
}
delete resultset;
}
return count;
}
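The guessed_kind classification in build_quick_profiles() is a purely lexical, first-match heuristic over the lowercased table name; real classification would also weigh row counts, FKs, and time columns. Extracted as a standalone function for clarity:

```cpp
#include <algorithm>
#include <cctype>
#include <string>

// Lowercase-substring heuristic mirroring build_quick_profiles():
// precedence is log > fact > entity, first match wins.
std::string guess_table_kind(std::string name) {
    std::transform(name.begin(), name.end(), name.begin(),
                   [](unsigned char c) { return std::tolower(c); });
    auto has = [&](const char* n) { return name.find(n) != std::string::npos; };
    if (has("log") || has("event") || has("audit")) return "log";
    if (has("order") || has("invoice") || has("payment") || has("transaction")) return "fact";
    if (has("user") || has("customer") || has("account") || has("product")) return "entity";
    return "unknown";
}
```

Note the precedence: a name like `customer_orders` matches both the fact and entity keyword lists, and the earlier "fact" branch wins.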
int PgSQL_Static_Harvester::rebuild_fts_index() {
if (current_run_id < 0) return -1;
return catalog->rebuild_fts_index(current_run_id);
}
int PgSQL_Static_Harvester::run_full_harvest(const std::string& target_id, const std::string& only_schema, const std::string& notes) {
if (start_run(target_id, notes) < 0) return -1;
if (harvest_schemas(only_schema) < 0) { finish_run("Failed during schema harvest"); return -1; }
if (harvest_objects(only_schema) < 0) { finish_run("Failed during object harvest"); return -1; }
if (harvest_columns(only_schema) < 0) { finish_run("Failed during column harvest"); return -1; }
if (harvest_indexes(only_schema) < 0) { finish_run("Failed during index harvest"); return -1; }
if (harvest_foreign_keys(only_schema) < 0) { finish_run("Failed during foreign key harvest"); return -1; }
if (harvest_view_definitions(only_schema) < 0) { finish_run("Failed during view definition harvest"); return -1; }
if (build_quick_profiles() < 0) { finish_run("Failed during profile building"); return -1; }
if (rebuild_fts_index() < 0) { finish_run("Failed during FTS rebuild"); return -1; }
int final_run_id = current_run_id;
finish_run("Harvest completed successfully");
return final_run_id;
}
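run_full_harvest() is a fail-fast pipeline: each stage returns a negative value on failure, the first failure aborts the remaining stages, and finish_run() records the failing stage's reason. The control flow can be sketched generically (hypothetical `run_stages` helper, not part of the harvester):

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Fail-fast stage runner: each stage returns <0 on failure; the first
// failure stops the pipeline and reports which stage failed (the real code
// passes that reason to finish_run()).
int run_stages(const std::vector<std::pair<std::string, std::function<int()>>>& stages,
               std::string& failed_stage) {
    for (const auto& s : stages) {
        if (s.second() < 0) {
            failed_stage = s.first;
            return -1;
        }
    }
    return 0;
}
```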
std::string PgSQL_Static_Harvester::get_harvest_stats() {
if (current_run_id < 0) {
return "{\"error\": \"No active run\"}";
}
return get_harvest_stats(current_run_id);
}
std::string PgSQL_Static_Harvester::get_harvest_stats(int run_id) {
char* error = NULL;
int cols = 0, affected = 0;
SQLite3_result* resultset = NULL;
json stats;
stats["run_id"] = run_id;
stats["protocol"] = "pgsql";
std::ostringstream q1;
q1 << "SELECT object_type, COUNT(*) FROM objects WHERE run_id=" << run_id << " GROUP BY object_type";
catalog->get_db()->execute_statement(q1.str().c_str(), &error, &cols, &affected, &resultset);
json objects = json::object();
if (!error && resultset) {
for (auto row : resultset->rows) {
if (row->cnt >= 2) {
objects[row->fields[0] ? row->fields[0] : "unknown"] = row->fields[1] ? atoi(row->fields[1]) : 0;
}
}
delete resultset;
} else if (error) {
free(error);
error = NULL;
if (resultset) delete resultset;
}
stats["objects"] = objects;
auto get_count = [&](const std::string& table) -> int {
char* e = NULL;
int c = 0, a = 0;
SQLite3_result* rs = NULL;
std::ostringstream q;
q << "SELECT COUNT(*) FROM " << table << " WHERE run_id=" << run_id;
catalog->get_db()->execute_statement(q.str().c_str(), &e, &c, &a, &rs);
int out = 0;
if (!e && rs && !rs->rows.empty() && rs->rows[0]->cnt > 0 && rs->rows[0]->fields[0]) {
out = atoi(rs->rows[0]->fields[0]);
}
if (e) free(e);
if (rs) delete rs;
return out;
};
stats["columns"] = get_count("columns");
stats["foreign_keys"] = get_count("foreign_keys");
return stats.dump();
}
#endif /* PROXYSQLGENAI */

@ -97,11 +97,6 @@ ProxySQL_MCP_Server::ProxySQL_MCP_Server(int p, MCP_Threads_Handler* h)
std::string catalog_path = std::string(GloVars.datadir) + "/mcp_catalog.db";
handler->query_tool_handler = new Query_Tool_Handler(
handler->variables.mcp_mysql_hosts ? handler->variables.mcp_mysql_hosts : "",
handler->variables.mcp_mysql_ports ? handler->variables.mcp_mysql_ports : "",
handler->variables.mcp_mysql_user ? handler->variables.mcp_mysql_user : "",
handler->variables.mcp_mysql_password ? handler->variables.mcp_mysql_password : "",
handler->variables.mcp_mysql_schema ? handler->variables.mcp_mysql_schema : "",
catalog_path.c_str()
);
if (handler->query_tool_handler->init() == 0) {

@ -12,6 +12,7 @@ using json = nlohmann::json;
#include "proxysql_debug.h"
#include "proxysql_admin.h"
#include "Static_Harvester.h"
#include "PgSQL_Static_Harvester.h"
#include <vector>
#include <map>
@ -251,60 +252,28 @@ static std::string escape_string_literal(const std::string& value) {
}
Query_Tool_Handler::Query_Tool_Handler(
const std::string& hosts,
const std::string& ports,
const std::string& user,
const std::string& password,
const std::string& schema,
const std::string& catalog_path)
: catalog(NULL),
harvester(NULL),
mysql_harvester(NULL),
pgsql_harvester(NULL),
pool_size(0),
pg_pool_size(0),
max_rows(200),
timeout_ms(2000),
allow_select_star(false)
{
// Parse hosts
std::istringstream h(hosts);
std::string host;
while (std::getline(h, host, ',')) {
host.erase(0, host.find_first_not_of(" \t"));
host.erase(host.find_last_not_of(" \t") + 1);
if (!host.empty()) {
// Store hosts for later
}
}
// Parse ports
std::istringstream p(ports);
std::string port;
while (std::getline(p, port, ',')) {
port.erase(0, port.find_first_not_of(" \t"));
port.erase(port.find_last_not_of(" \t") + 1);
}
mysql_hosts = hosts;
mysql_ports = ports;
mysql_user = user;
mysql_password = password;
mysql_schema = schema;
// Initialize pool mutex
pthread_mutex_init(&pool_lock, NULL);
// Initialize counters mutex
pthread_mutex_init(&counters_lock, NULL);
// Create discovery schema and harvester
// Create discovery schema and protocol-specific harvesters.
catalog = new Discovery_Schema(catalog_path);
harvester = new Static_Harvester(
hosts.empty() ? "127.0.0.1" : hosts,
ports.empty() ? 3306 : std::stoi(ports),
user, password, schema, catalog_path
);
mysql_harvester = new Static_Harvester("127.0.0.1", 3306, "", "", "", catalog_path);
pgsql_harvester = new PgSQL_Static_Harvester("127.0.0.1", 5432, "", "", "", catalog_path);
proxy_debug(PROXY_DEBUG_GENERIC, 3, "Query_Tool_Handler created with Discovery_Schema\n");
proxy_debug(PROXY_DEBUG_GENERIC, 3, "Query_Tool_Handler created with Discovery_Schema and protocol harvesters\n");
}
Query_Tool_Handler::~Query_Tool_Handler() {
@ -315,9 +284,14 @@ Query_Tool_Handler::~Query_Tool_Handler() {
catalog = NULL;
}
if (harvester) {
delete harvester;
harvester = NULL;
if (mysql_harvester) {
delete mysql_harvester;
mysql_harvester = NULL;
}
if (pgsql_harvester) {
delete pgsql_harvester;
pgsql_harvester = NULL;
}
pthread_mutex_destroy(&pool_lock);
@ -332,9 +306,13 @@ int Query_Tool_Handler::init() {
return -1;
}
// Initialize harvester (but don't connect yet)
if (harvester->init()) {
proxy_error("Query_Tool_Handler: Failed to initialize Static_Harvester\n");
// Initialize protocol-specific harvesters (lazy backend connect).
if (mysql_harvester->init()) {
proxy_error("Query_Tool_Handler: Failed to initialize MySQL Static_Harvester\n");
return -1;
}
if (pgsql_harvester->init()) {
proxy_error("Query_Tool_Handler: Failed to initialize PgSQL Static_Harvester\n");
return -1;
}
@ -344,7 +322,7 @@ int Query_Tool_Handler::init() {
return -1;
}
proxy_info("Query_Tool_Handler initialized with Discovery_Schema and Static_Harvester\n");
proxy_info("Query_Tool_Handler initialized with Discovery_Schema and protocol harvesters\n");
return 0;
}
@ -1381,9 +1359,9 @@ json Query_Tool_Handler::get_tool_list() {
// ============================================================
tools.push_back(create_tool_schema(
"discovery.run_static",
"Trigger ProxySQL to perform static metadata harvest from MySQL INFORMATION_SCHEMA for a single schema. Returns the new run_id for subsequent LLM analysis.",
{"schema_filter"},
{{"notes", "string"}}
"Trigger ProxySQL to perform a static metadata harvest for a specific logical target. target_id is required; the harvester is dispatched according to the target's protocol (mysql/pgsql).",
{"target_id"},
{{"schema_filter", "string"}, {"notes", "string"}}
));
// ============================================================
@ -1399,28 +1377,28 @@ json Query_Tool_Handler::get_tool_list() {
tools.push_back(create_tool_schema(
"catalog.search",
"Full-text search over discovered objects (tables/views/routines) using FTS5. Returns ranked object_keys and basic metadata.",
{"run_id", "query"},
{"target_id", "run_id", "query"},
{{"limit", "integer"}, {"object_type", "string"}, {"schema_name", "string"}}
));
tools.push_back(create_tool_schema(
"catalog.get_object",
"Fetch a discovered object and its columns/indexes/foreign keys by object_key (schema.object) or by object_id.",
{"run_id"},
{"target_id", "run_id"},
{{"object_id", "integer"}, {"object_key", "string"}, {"include_definition", "boolean"}, {"include_profiles", "boolean"}}
));
tools.push_back(create_tool_schema(
"catalog.list_objects",
"List objects (paged) for a run, optionally filtered by schema/type, ordered by name or size/rows estimate.",
{"run_id"},
{"target_id", "run_id"},
{{"schema_name", "string"}, {"object_type", "string"}, {"order_by", "string"}, {"page_size", "integer"}, {"page_token", "string"}}
));
tools.push_back(create_tool_schema(
"catalog.get_relationships",
"Get relationships for a given object: foreign keys, view deps, inferred relationships (deterministic + LLM).",
{"run_id"},
{"target_id", "run_id"},
{{"object_id", "integer"}, {"object_key", "string"}, {"include_inferred", "boolean"}, {"min_confidence", "number"}}
));
@ -1430,7 +1408,7 @@ json Query_Tool_Handler::get_tool_list() {
tools.push_back(create_tool_schema(
"agent.run_start",
"Create a new LLM agent run bound to a deterministic discovery run_id.",
{"run_id", "model_name"},
{"target_id", "run_id", "model_name"},
{{"prompt_hash", "string"}, {"budget", "object"}}
));
@ -1454,63 +1432,63 @@ json Query_Tool_Handler::get_tool_list() {
tools.push_back(create_tool_schema(
"llm.summary_upsert",
"Upsert a structured semantic summary for an object (table/view/routine). This is the main LLM 'memory' per object.",
{"agent_run_id", "run_id", "object_id", "summary"},
{"target_id", "agent_run_id", "run_id", "object_id", "summary"},
{{"confidence", "number"}, {"status", "string"}, {"sources", "object"}}
));
tools.push_back(create_tool_schema(
"llm.summary_get",
"Get the LLM semantic summary for an object, optionally for a specific agent_run_id.",
{"run_id", "object_id"},
{"target_id", "run_id", "object_id"},
{{"agent_run_id", "integer"}, {"latest", "boolean"}}
));
tools.push_back(create_tool_schema(
"llm.relationship_upsert",
"Upsert an LLM-inferred relationship (join edge) between objects/columns with confidence and evidence.",
{"agent_run_id", "run_id", "child_object_id", "child_column", "parent_object_id", "parent_column", "confidence"},
{"target_id", "agent_run_id", "run_id", "child_object_id", "child_column", "parent_object_id", "parent_column", "confidence"},
{{"rel_type", "string"}, {"evidence", "object"}}
));
tools.push_back(create_tool_schema(
"llm.domain_upsert",
"Create or update a domain (cluster) like 'billing' and its description.",
{"agent_run_id", "run_id", "domain_key"},
{"target_id", "agent_run_id", "run_id", "domain_key"},
{{"title", "string"}, {"description", "string"}, {"confidence", "number"}}
));
tools.push_back(create_tool_schema(
"llm.domain_set_members",
"Replace members of a domain with a provided list of object_ids and optional roles/confidences.",
{"agent_run_id", "run_id", "domain_key", "members"},
{"target_id", "agent_run_id", "run_id", "domain_key", "members"},
{}
));
tools.push_back(create_tool_schema(
"llm.metric_upsert",
"Upsert a metric/KPI definition with optional SQL template and dependencies.",
{"agent_run_id", "run_id", "metric_key", "title"},
{"target_id", "agent_run_id", "run_id", "metric_key", "title"},
{{"description", "string"}, {"domain_key", "string"}, {"grain", "string"}, {"unit", "string"}, {"sql_template", "string"}, {"depends", "object"}, {"confidence", "number"}}
));
tools.push_back(create_tool_schema(
"llm.question_template_add",
"Add a question template (NL) mapped to a structured query plan. Extract table/view names from example_sql and populate related_objects. agent_run_id is optional; if not provided, the last agent run for the schema is used.",
{"run_id", "title", "question_nl", "template"},
{"target_id", "run_id", "title", "question_nl", "template"},
{{"agent_run_id", "integer"}, {"example_sql", "string"}, {"related_objects", "array"}, {"confidence", "number"}}
));
tools.push_back(create_tool_schema(
"llm.note_add",
"Add a durable free-form note (global/schema/object/domain scoped) for the agent memory.",
{"agent_run_id", "run_id", "scope", "body"},
{"target_id", "agent_run_id", "run_id", "scope", "body"},
{{"object_id", "integer"}, {"domain_key", "string"}, {"title", "string"}, {"tags", "array"}}
));
tools.push_back(create_tool_schema(
"llm.search",
"Full-text search across LLM artifacts. For question_templates, returns example_sql, related_objects, template_json, and confidence. Use include_objects=true with a non-empty query to get full object schema details (for search mode only). Empty query (list mode) returns only templates without objects to avoid huge responses.",
{"run_id"},
{"target_id", "run_id"},
{{"query", "string"}, {"limit", "integer"}, {"include_objects", "boolean"}}
));
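Every catalog.*, agent.*, and llm.* contract above now lists target_id among its required keys, and the handlers fail uniformly when a required argument is missing or empty. A sketch of that shared precondition check over a plain key/value map (the real handlers read arguments from nlohmann::json, where json_string() returns "" for absent keys):

```cpp
#include <map>
#include <string>
#include <vector>

// Returns the first required key that is missing or empty, or "" if all
// are present. Empty strings count as missing, matching how the handlers
// treat json_string() results.
std::string first_missing(const std::map<std::string, std::string>& args,
                          const std::vector<std::string>& required) {
    for (const auto& key : required) {
        auto it = args.find(key);
        if (it == args.end() || it->second.empty()) return key;
    }
    return "";
}
```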
@ -1544,10 +1522,13 @@ json Query_Tool_Handler::get_tool_description(const std::string& tool_name) {
* Returns "(no schema)" for tools without schema context
*/
static std::string extract_schema_name(const std::string& tool_name, const json& arguments, Discovery_Schema* catalog) {
(void)tool_name;
std::string target_id = json_string(arguments, "target_id");
// Tools that use run_id (can be resolved to schema)
if (arguments.contains("run_id")) {
if (arguments.contains("run_id") && !target_id.empty()) {
std::string run_id_str = json_string(arguments, "run_id");
int run_id = catalog->resolve_run_id(run_id_str);
int run_id = catalog->resolve_run_id(target_id, run_id_str);
if (run_id > 0) {
// Look up schema name from catalog
char* error = NULL;
@ -1629,21 +1610,26 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
std::string target_id = json_string(arguments, "target_id");
std::string page_token = json_string(arguments, "page_token");
int page_size = json_int(arguments, "page_size", 50);
if (!target_id.empty()) {
refresh_target_registry();
const QueryTarget* target = resolve_target(target_id);
if (target == NULL) {
return create_error_response("Unknown target_id: " + target_id);
}
refresh_target_registry();
std::string resolved_target_id = target_id.empty() ? default_target_id : target_id;
if (resolved_target_id.empty()) {
return create_error_response("target_id is required because no default target is available");
}
const QueryTarget* target = resolve_target(resolved_target_id);
if (target == NULL) {
return create_error_response("Unknown target_id: " + resolved_target_id);
}
// Query catalog's schemas table instead of live database
// Query catalog schemas for the resolved target only.
char* error = NULL;
int cols = 0, affected = 0;
SQLite3_result* resultset = NULL;
std::ostringstream sql;
sql << "SELECT DISTINCT schema_name FROM schemas ORDER BY schema_name";
sql << "SELECT DISTINCT s.schema_name"
<< " FROM schemas s JOIN runs r ON r.run_id=s.run_id"
<< " WHERE r.target_id='" << escape_string_literal(resolved_target_id) << "'"
<< " ORDER BY s.schema_name";
if (page_size > 0) {
sql << " LIMIT " << page_size;
if (!page_token.empty()) {
@ -1755,31 +1741,91 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
// DISCOVERY TOOLS
// ============================================================
else if (tool_name == "discovery.run_static") {
if (!harvester) {
result = create_error_response("Static harvester not configured");
std::string target_id = json_string(arguments, "target_id");
std::string schema_filter = json_string(arguments, "schema_filter");
std::string notes = json_string(arguments, "notes", "Static discovery harvest");
if (target_id.empty()) {
result = create_error_response("target_id is required");
} else {
std::string schema_filter = json_string(arguments, "schema_filter");
if (schema_filter.empty()) {
result = create_error_response("schema_filter is required and must not be empty");
refresh_target_registry();
const QueryTarget* target = resolve_target(target_id);
if (target == NULL) {
result = create_error_response("Unknown target_id: " + target_id);
} else if (!target->executable) {
result = create_error_response(format_target_unavailable_error(target_id));
} else {
std::string notes = json_string(arguments, "notes", "Static discovery harvest");
int run_id = harvester->run_full_harvest(schema_filter, notes);
if (run_id < 0) {
result = create_error_response("Static discovery failed");
int run_id = -1;
if (target->protocol == "pgsql") {
if (!pgsql_harvester) {
result = create_error_response("PgSQL static harvester not configured");
} else {
PgSQL_Static_Harvester harvester(
target->host,
target->port > 0 ? target->port : 5432,
target->db_username,
target->db_password,
target->default_schema,
catalog->get_db_path()
);
if (harvester.init()) {
result = create_error_response("Failed to initialize PgSQL static harvester");
} else {
run_id = harvester.run_full_harvest(target->target_id, schema_filter, notes);
if (run_id >= 0) {
std::string stats_str = harvester.get_harvest_stats(run_id);
try {
json stats = json::parse(stats_str);
stats["target_id"] = target->target_id;
stats["protocol"] = target->protocol;
result = create_success_response(stats);
} catch (...) {
json stats;
stats["run_id"] = run_id;
stats["target_id"] = target->target_id;
stats["protocol"] = target->protocol;
result = create_success_response(stats);
}
}
}
}
} else {
// Get stats using the run_id (after finish_run() has reset current_run_id)
std::string stats_str = harvester->get_harvest_stats(run_id);
json stats;
try {
stats = json::parse(stats_str);
} catch (...) {
stats["run_id"] = run_id;
if (!mysql_harvester) {
result = create_error_response("MySQL static harvester not configured");
} else {
Static_Harvester harvester(
target->host,
target->port > 0 ? target->port : 3306,
target->db_username,
target->db_password,
target->default_schema,
catalog->get_db_path()
);
if (harvester.init()) {
result = create_error_response("Failed to initialize MySQL static harvester");
} else {
run_id = harvester.run_full_harvest(target->target_id, schema_filter, notes);
if (run_id >= 0) {
std::string stats_str = harvester.get_harvest_stats(run_id);
try {
json stats = json::parse(stats_str);
stats["target_id"] = target->target_id;
stats["protocol"] = target->protocol;
result = create_success_response(stats);
} catch (...) {
json stats;
stats["run_id"] = run_id;
stats["target_id"] = target->target_id;
stats["protocol"] = target->protocol;
result = create_success_response(stats);
}
}
}
}
}
stats["started_at"] = "";
stats["mysql_version"] = "";
result = create_success_response(stats);
if (run_id < 0) {
result = create_error_response("Static discovery failed");
}
}
}
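Both branches of the discovery.run_static dispatch apply a protocol-specific default when the target omits its port: 5432 for pgsql, 3306 otherwise. Factored out as a small helper for illustration (hypothetical `effective_port`, not a function in this codebase):

```cpp
#include <string>

// Protocol-aware port defaulting as done inline in discovery.run_static:
// a configured positive port wins; otherwise pgsql targets fall back to
// 5432 and everything else to the MySQL default 3306.
int effective_port(const std::string& protocol, int configured_port) {
    if (configured_port > 0) return configured_port;
    return protocol == "pgsql" ? 5432 : 3306;
}
```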
@ -1801,21 +1847,24 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
}
else if (tool_name == "catalog.search") {
std::string target_id = json_string(arguments, "target_id");
std::string run_id_or_schema = json_string(arguments, "run_id");
std::string query = json_string(arguments, "query");
int limit = json_int(arguments, "limit", 25);
std::string object_type = json_string(arguments, "object_type");
std::string schema_name = json_string(arguments, "schema_name");
if (run_id_or_schema.empty()) {
if (target_id.empty()) {
result = create_error_response("target_id is required");
} else if (run_id_or_schema.empty()) {
result = create_error_response("run_id is required");
} else if (query.empty()) {
result = create_error_response("query is required");
} else {
// Resolve schema name to run_id if needed
int run_id = catalog->resolve_run_id(run_id_or_schema);
int run_id = catalog->resolve_run_id(target_id, run_id_or_schema);
if (run_id < 0) {
result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema);
result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema);
} else {
std::string search_results = catalog->fts_search(run_id, query, limit, object_type, schema_name);
try {
@ -1828,19 +1877,22 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
}
else if (tool_name == "catalog.get_object") {
std::string target_id = json_string(arguments, "target_id");
std::string run_id_or_schema = json_string(arguments, "run_id");
int object_id = json_int(arguments, "object_id", -1);
std::string object_key = json_string(arguments, "object_key");
bool include_definition = json_int(arguments, "include_definition", 0) != 0;
bool include_profiles = json_int(arguments, "include_profiles", 1) != 0;
if (run_id_or_schema.empty()) {
if (target_id.empty()) {
result = create_error_response("target_id is required");
} else if (run_id_or_schema.empty()) {
result = create_error_response("run_id is required");
} else {
// Resolve schema name to run_id if needed
int run_id = catalog->resolve_run_id(run_id_or_schema);
int run_id = catalog->resolve_run_id(target_id, run_id_or_schema);
if (run_id < 0) {
result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema);
result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema);
} else {
std::string schema_name, object_name;
if (!object_key.empty()) {
@ -1870,6 +1922,7 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
}
else if (tool_name == "catalog.list_objects") {
std::string target_id = json_string(arguments, "target_id");
std::string run_id_or_schema = json_string(arguments, "run_id");
std::string schema_name = json_string(arguments, "schema_name");
std::string object_type = json_string(arguments, "object_type");
@ -1877,13 +1930,15 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
int page_size = json_int(arguments, "page_size", 50);
std::string page_token = json_string(arguments, "page_token");
if (run_id_or_schema.empty()) {
if (target_id.empty()) {
result = create_error_response("target_id is required");
} else if (run_id_or_schema.empty()) {
result = create_error_response("run_id is required");
} else {
// Resolve schema name to run_id if needed
int run_id = catalog->resolve_run_id(run_id_or_schema);
int run_id = catalog->resolve_run_id(target_id, run_id_or_schema);
if (run_id < 0) {
result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema);
result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema);
} else {
std::string list_result = catalog->list_objects(
run_id, schema_name, object_type, order_by, page_size, page_token
@ -1898,19 +1953,22 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
}
else if (tool_name == "catalog.get_relationships") {
std::string target_id = json_string(arguments, "target_id");
std::string run_id_or_schema = json_string(arguments, "run_id");
int object_id = json_int(arguments, "object_id", -1);
std::string object_key = json_string(arguments, "object_key");
bool include_inferred = json_int(arguments, "include_inferred", 1) != 0;
double min_confidence = json_double(arguments, "min_confidence", 0.0);
if (run_id_or_schema.empty()) {
if (target_id.empty()) {
result = create_error_response("target_id is required");
} else if (run_id_or_schema.empty()) {
result = create_error_response("run_id is required");
} else {
// Resolve schema name to run_id if needed
int run_id = catalog->resolve_run_id(run_id_or_schema);
int run_id = catalog->resolve_run_id(target_id, run_id_or_schema);
if (run_id < 0) {
result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema);
result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema);
} else {
// Resolve object_key to object_id if needed
if (object_id < 0 && !object_key.empty()) {
@ -1963,6 +2021,7 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
// AGENT TOOLS
// ============================================================
else if (tool_name == "agent.run_start") {
std::string target_id = json_string(arguments, "target_id");
std::string run_id_or_schema = json_string(arguments, "run_id");
std::string model_name = json_string(arguments, "model_name");
std::string prompt_hash = json_string(arguments, "prompt_hash");
@ -1972,15 +2031,17 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
budget_json = arguments["budget"].dump();
}
if (run_id_or_schema.empty()) {
if (target_id.empty()) {
result = create_error_response("target_id is required");
} else if (run_id_or_schema.empty()) {
result = create_error_response("run_id is required");
} else if (model_name.empty()) {
result = create_error_response("model_name is required");
} else {
// Resolve schema name to run_id if needed
int run_id = catalog->resolve_run_id(run_id_or_schema);
int run_id = catalog->resolve_run_id(target_id, run_id_or_schema);
if (run_id < 0) {
result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema);
result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema);
} else {
int agent_run_id = catalog->create_agent_run(run_id, model_name, prompt_hash, budget_json);
if (agent_run_id < 0) {
@ -2048,6 +2109,7 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
// LLM MEMORY TOOLS
// ============================================================
else if (tool_name == "llm.summary_upsert") {
std::string target_id = json_string(arguments, "target_id");
int agent_run_id = json_int(arguments, "agent_run_id");
std::string run_id_or_schema = json_string(arguments, "run_id");
int object_id = json_int(arguments, "object_id");
@ -2065,15 +2127,17 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
sources_json = arguments["sources"].dump();
}
if (agent_run_id <= 0 || run_id_or_schema.empty() || object_id <= 0) {
if (target_id.empty()) {
result = create_error_response("target_id is required");
} else if (agent_run_id <= 0 || run_id_or_schema.empty() || object_id <= 0) {
result = create_error_response("agent_run_id, run_id, and object_id are required");
} else if (summary_json.empty()) {
result = create_error_response("summary is required");
} else {
// Resolve schema name to run_id if needed
int run_id = catalog->resolve_run_id(run_id_or_schema);
int run_id = catalog->resolve_run_id(target_id, run_id_or_schema);
if (run_id < 0) {
result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema);
result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema);
} else {
int rc = catalog->upsert_llm_summary(
agent_run_id, run_id, object_id, summary_json,
@ -2092,18 +2156,21 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
}
else if (tool_name == "llm.summary_get") {
std::string target_id = json_string(arguments, "target_id");
std::string run_id_or_schema = json_string(arguments, "run_id");
int object_id = json_int(arguments, "object_id");
int agent_run_id = json_int(arguments, "agent_run_id", -1);
bool latest = json_int(arguments, "latest", 1) != 0;
if (run_id_or_schema.empty() || object_id <= 0) {
if (target_id.empty()) {
result = create_error_response("target_id is required");
} else if (run_id_or_schema.empty() || object_id <= 0) {
result = create_error_response("run_id and object_id are required");
} else {
// Resolve schema name to run_id if needed
int run_id = catalog->resolve_run_id(run_id_or_schema);
int run_id = catalog->resolve_run_id(target_id, run_id_or_schema);
if (run_id < 0) {
result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema);
result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema);
} else {
std::string sum_result = catalog->get_llm_summary(run_id, object_id, agent_run_id, latest);
try {
@ -2121,6 +2188,7 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
}
else if (tool_name == "llm.relationship_upsert") {
std::string target_id = json_string(arguments, "target_id");
int agent_run_id = json_int(arguments, "agent_run_id");
std::string run_id_or_schema = json_string(arguments, "run_id");
int child_object_id = json_int(arguments, "child_object_id");
@ -2135,15 +2203,17 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
evidence_json = arguments["evidence"].dump();
}
-if (agent_run_id <= 0 || run_id_or_schema.empty() || child_object_id <= 0 || parent_object_id <= 0) {
+if (target_id.empty()) {
+result = create_error_response("target_id is required");
+} else if (agent_run_id <= 0 || run_id_or_schema.empty() || child_object_id <= 0 || parent_object_id <= 0) {
result = create_error_response("agent_run_id, run_id, child_object_id, and parent_object_id are required");
} else if (child_column.empty() || parent_column.empty()) {
result = create_error_response("child_column and parent_column are required");
} else {
// Resolve schema name to run_id if needed
-int run_id = catalog->resolve_run_id(run_id_or_schema);
+int run_id = catalog->resolve_run_id(target_id, run_id_or_schema);
if (run_id < 0) {
-result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema);
+result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema);
} else {
int rc = catalog->upsert_llm_relationship(
agent_run_id, run_id, child_object_id, child_column,
@@ -2161,6 +2231,7 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
}
else if (tool_name == "llm.domain_upsert") {
+std::string target_id = json_string(arguments, "target_id");
int agent_run_id = json_int(arguments, "agent_run_id");
std::string run_id_or_schema = json_string(arguments, "run_id");
std::string domain_key = json_string(arguments, "domain_key");
@@ -2168,13 +2239,15 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
std::string description = json_string(arguments, "description");
double confidence = json_double(arguments, "confidence", 0.6);
-if (agent_run_id <= 0 || run_id_or_schema.empty() || domain_key.empty()) {
+if (target_id.empty()) {
+result = create_error_response("target_id is required");
+} else if (agent_run_id <= 0 || run_id_or_schema.empty() || domain_key.empty()) {
result = create_error_response("agent_run_id, run_id, and domain_key are required");
} else {
// Resolve schema name to run_id if needed
-int run_id = catalog->resolve_run_id(run_id_or_schema);
+int run_id = catalog->resolve_run_id(target_id, run_id_or_schema);
if (run_id < 0) {
-result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema);
+result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema);
} else {
int domain_id = catalog->upsert_llm_domain(
agent_run_id, run_id, domain_key, title, description, confidence
@@ -2192,6 +2265,7 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
}
else if (tool_name == "llm.domain_set_members") {
+std::string target_id = json_string(arguments, "target_id");
int agent_run_id = json_int(arguments, "agent_run_id");
std::string run_id_or_schema = json_string(arguments, "run_id");
std::string domain_key = json_string(arguments, "domain_key");
@@ -2208,7 +2282,9 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
}
}
-if (agent_run_id <= 0 || run_id_or_schema.empty() || domain_key.empty()) {
+if (target_id.empty()) {
+result = create_error_response("target_id is required");
+} else if (agent_run_id <= 0 || run_id_or_schema.empty() || domain_key.empty()) {
result = create_error_response("agent_run_id, run_id, and domain_key are required");
} else if (members_json.empty()) {
proxy_error("llm.domain_set_members: members not provided or invalid type (got: %s)\n",
@@ -2216,9 +2292,9 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
result = create_error_response("members array is required");
} else {
// Resolve schema name to run_id if needed
-int run_id = catalog->resolve_run_id(run_id_or_schema);
+int run_id = catalog->resolve_run_id(target_id, run_id_or_schema);
if (run_id < 0) {
-result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema);
+result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema);
} else {
proxy_debug(PROXY_DEBUG_GENERIC, 3, "llm.domain_set_members: setting members='%s'\n", members_json.c_str());
int rc = catalog->set_domain_members(agent_run_id, run_id, domain_key, members_json);
@@ -2236,6 +2312,7 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
}
else if (tool_name == "llm.metric_upsert") {
+std::string target_id = json_string(arguments, "target_id");
int agent_run_id = json_int(arguments, "agent_run_id");
std::string run_id_or_schema = json_string(arguments, "run_id");
std::string metric_key = json_string(arguments, "metric_key");
@@ -2253,13 +2330,15 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
double confidence = json_double(arguments, "confidence", 0.6);
-if (agent_run_id <= 0 || run_id_or_schema.empty() || metric_key.empty() || title.empty()) {
+if (target_id.empty()) {
+result = create_error_response("target_id is required");
+} else if (agent_run_id <= 0 || run_id_or_schema.empty() || metric_key.empty() || title.empty()) {
result = create_error_response("agent_run_id, run_id, metric_key, and title are required");
} else {
// Resolve schema name to run_id if needed
-int run_id = catalog->resolve_run_id(run_id_or_schema);
+int run_id = catalog->resolve_run_id(target_id, run_id_or_schema);
if (run_id < 0) {
-result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema);
+result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema);
} else {
int metric_id = catalog->upsert_llm_metric(
agent_run_id, run_id, metric_key, title, description, domain_key,
@@ -2278,6 +2357,7 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
}
else if (tool_name == "llm.question_template_add") {
+std::string target_id = json_string(arguments, "target_id");
int agent_run_id = json_int(arguments, "agent_run_id", 0); // Optional, default 0
std::string run_id_or_schema = json_string(arguments, "run_id");
std::string title = json_string(arguments, "title");
@@ -2297,15 +2377,17 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
related_objects = arguments["related_objects"].dump();
}
-if (run_id_or_schema.empty() || title.empty() || question_nl.empty()) {
+if (target_id.empty()) {
+result = create_error_response("target_id is required");
+} else if (run_id_or_schema.empty() || title.empty() || question_nl.empty()) {
result = create_error_response("run_id, title, and question_nl are required");
} else if (template_json.empty()) {
result = create_error_response("template is required");
} else {
// Resolve schema name to run_id if needed
-int run_id = catalog->resolve_run_id(run_id_or_schema);
+int run_id = catalog->resolve_run_id(target_id, run_id_or_schema);
if (run_id < 0) {
-result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema);
+result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema);
} else {
// If agent_run_id not provided, get the last one for this run_id
if (agent_run_id <= 0) {
@@ -2336,6 +2418,7 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
}
else if (tool_name == "llm.note_add") {
+std::string target_id = json_string(arguments, "target_id");
int agent_run_id = json_int(arguments, "agent_run_id");
std::string run_id_or_schema = json_string(arguments, "run_id");
std::string scope = json_string(arguments, "scope");
@@ -2349,13 +2432,15 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
tags_json = arguments["tags"].dump();
}
-if (agent_run_id <= 0 || run_id_or_schema.empty() || scope.empty() || body.empty()) {
+if (target_id.empty()) {
+result = create_error_response("target_id is required");
+} else if (agent_run_id <= 0 || run_id_or_schema.empty() || scope.empty() || body.empty()) {
result = create_error_response("agent_run_id, run_id, scope, and body are required");
} else {
// Resolve schema name to run_id if needed
-int run_id = catalog->resolve_run_id(run_id_or_schema);
+int run_id = catalog->resolve_run_id(target_id, run_id_or_schema);
if (run_id < 0) {
-result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema);
+result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema);
} else {
int note_id = catalog->add_llm_note(
agent_run_id, run_id, scope, object_id, domain_key, title, body, tags_json
@@ -2372,18 +2457,21 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
}
else if (tool_name == "llm.search") {
+std::string target_id = json_string(arguments, "target_id");
std::string run_id_or_schema = json_string(arguments, "run_id");
std::string query = json_string(arguments, "query");
int limit = json_int(arguments, "limit", 25);
bool include_objects = json_int(arguments, "include_objects", 0) != 0;
-if (run_id_or_schema.empty()) {
+if (target_id.empty()) {
+result = create_error_response("target_id is required");
+} else if (run_id_or_schema.empty()) {
result = create_error_response("run_id is required");
} else {
// Resolve schema name to run_id if needed
-int run_id = catalog->resolve_run_id(run_id_or_schema);
+int run_id = catalog->resolve_run_id(target_id, run_id_or_schema);
if (run_id < 0) {
-result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema);
+result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema);
} else {
// Log the search query
catalog->log_llm_search(run_id, query, limit);
@@ -2513,7 +2601,7 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
uint64_t digest = Discovery_Schema::compute_mcp_digest(tool_name, arguments);
std::string digest_text = Discovery_Schema::fingerprint_mcp_args(arguments);
unsigned long long duration = monotonic_time() - start_time;
-int digest_run_id = schema.empty() ? 0 : catalog->resolve_run_id(schema);
+int digest_run_id = schema.empty() ? 0 : catalog->resolve_run_id(target->target_id, schema);
catalog->update_mcp_query_digest(
tool_name,
digest_run_id,
@@ -2654,8 +2742,11 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
// Log tool invocation to catalog
int run_id = 0;
std::string run_id_str = json_string(arguments, "run_id");
+std::string run_target_id = json_string(arguments, "target_id");
if (!run_id_str.empty()) {
-run_id = catalog->resolve_run_id(run_id_str);
+if (!run_target_id.empty()) {
+run_id = catalog->resolve_run_id(run_target_id, run_id_str);
+}
}
// Extract error message if present

@@ -389,7 +389,7 @@ std::string Static_Harvester::escape_sql_string(const std::string& str) {
// - Only one run can be active at a time per harvester instance
// - Automatically connects to MySQL if not already connected
// - Records source DSN and MySQL version in the run metadata
-int Static_Harvester::start_run(const std::string& notes) {
+int Static_Harvester::start_run(const std::string& target_id, const std::string& notes) {
if (current_run_id >= 0) {
proxy_error("Static_Harvester: Run already active (run_id=%d)\n", current_run_id);
return -1;
@@ -399,7 +399,7 @@ int Static_Harvester::start_run(const std::string& notes) {
return -1;
}
-current_run_id = catalog->create_run(source_dsn, mysql_version, notes);
+current_run_id = catalog->create_run(target_id, "mysql", source_dsn, mysql_version, notes);
if (current_run_id < 0) {
proxy_error("Static_Harvester: Failed to create run\n");
return -1;
@@ -1278,8 +1278,8 @@ int Static_Harvester::rebuild_fts_index() {
//
// Returns:
// run_id on success, -1 on error
-int Static_Harvester::run_full_harvest(const std::string& only_schema, const std::string& notes) {
-if (start_run(notes) < 0) {
+int Static_Harvester::run_full_harvest(const std::string& target_id, const std::string& only_schema, const std::string& notes) {
+if (start_run(target_id, notes) < 0) {
return -1;
}

@@ -1,6 +1,6 @@
# MCP Module Testing Suite
-This directory contains scripts to test the ProxySQL MCP (Model Context Protocol) module with MySQL connection pool and exploration tools.
+This directory contains scripts to test the ProxySQL MCP (Model Context Protocol) module with target-profile routing and exploration tools across MySQL and PostgreSQL backends.
## Table of Contents
@@ -11,6 +11,8 @@ This directory contains scripts to test the ProxySQL MCP (Model Context Protocol
5. [Detailed Documentation](#detailed-documentation)
6. [Troubleshooting](#troubleshooting)
+> Note: parts of this README still show legacy single-MySQL examples (`mcp-mysql_*`). The current MCP routing model uses target/auth profiles (`runtime_mcp_target_profiles`, `runtime_mcp_auth_profiles`) and `target_id` in query/discovery/catalog/llm workflows.
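As a hedged illustration of the target-scoped convention (the `target_id` value `mysql-prod` is made up for this sketch; the envelope follows the standard MCP `tools/call` shape), a `discovery.run_static` request might look like:

```json
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/call",
  "params": {
    "name": "discovery.run_static",
    "arguments": {
      "target_id": "mysql-prod"
    }
  }
}
```

The server uses `target_id` to pick the protocol-aware harvester (MySQL or PostgreSQL) at runtime.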
---
## Architecture Overview
@@ -32,7 +34,7 @@ MCP (Model Context Protocol) is a JSON-RPC 2.0 protocol that allows AI/LLM appli
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ ProxySQL Admin Interface (Port 6032) │ │
-│ │ Configure: mcp-enabled, mcp-mysql_hosts, mcp-port, etc. │ │
+│ │ Configure: mcp-enabled, mcp-port, MCP profiles, etc. │ │
│ └──────────────────────────┬──────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────────▼──────────────────────────────────┐ │
@@ -151,7 +153,7 @@ Where:
| **Relationships** | `suggest_joins`, `find_reference_candidates` | Infer table relationships |
| **Profiling** | `table_profile`, `column_profile` | Analyze data distributions and statistics |
| **Catalog** | `catalog_upsert`, `catalog_get`, `catalog_search`, `catalog_delete`, `catalog_list`, `catalog_merge` | Store/retrieve LLM discoveries |
-| **Discovery** | `discovery.run_static` | Run Phase 1 of two-phase discovery |
+| **Discovery** | `discovery.run_static` | Run Phase 1 of two-phase discovery for a specific `target_id` |
| **Agent Coordination** | `agent.run_start`, `agent.run_finish`, `agent.event_append` | Coordinate LLM agent discovery runs |
| **LLM Interaction** | `llm.summary_upsert`, `llm.summary_get`, `llm.relationship_upsert`, `llm.domain_upsert`, `llm.domain_set_members`, `llm.metric_upsert`, `llm.question_template_add`, `llm.note_add`, `llm.search` | Store and retrieve LLM-generated insights |
| **RAG** | `rag.search_fts`, `rag.search_vector`, `rag.search_hybrid`, `rag.get_chunks`, `rag.get_docs`, `rag.fetch_from_source`, `rag.admin.stats` | Retrieval-Augmented Generation tools |
@@ -179,11 +181,8 @@ Where:
| `mcp-cache_endpoint_auth` | (empty) | Auth token for /cache endpoint |
| `mcp-ai_endpoint_auth` | (empty) | Auth token for /ai endpoint |
| `mcp-timeout_ms` | 30000 | Query timeout in milliseconds |
-| `mcp-mysql_hosts` | 127.0.0.1 | MySQL server(s) for tool execution |
-| `mcp-mysql_ports` | 3306 | MySQL port(s) |
-| `mcp-mysql_user` | (empty) | MySQL username for connections |
-| `mcp-mysql_password` | (empty) | MySQL password for connections |
-| `mcp-mysql_schema` | (empty) | Default schema for connections |
+| `runtime_mcp_target_profiles` | n/a | Logical targets (`target_id`, protocol, hostgroup, auth profile) |
+| `runtime_mcp_auth_profiles` | n/a | Backend credentials bound to target profiles |
**RAG Configuration Variables:**
@@ -219,11 +218,11 @@ Where:
- **Observe_Tool_Handler** - Monitoring and metrics
- **AI_Tool_Handler** - AI and LLM features
-### 3. MySQL Connection Pools
+### 3. Protocol-Aware Connection Pools
**Location:** Each Tool_Handler manages its own connection pool
-**Purpose:** Manages reusable connections to backend MySQL servers for tool execution.
+**Purpose:** Manages reusable connections to backend MySQL/PostgreSQL targets for tool execution.
**Features:**
- Thread-safe connection pooling with `pthread_mutex_t`

@@ -98,13 +98,13 @@ Once connected, the following tools will be available in Claude Code:
- `find_reference_candidates` - Find potential foreign key relationships
### Two-Phase Discovery Tools
-- `discovery.run_static` - Run Phase 1 of two-phase discovery (static harvest)
-- `agent.run_start` - Start a new agent run for discovery coordination
+- `discovery.run_static` - Run Phase 1 of two-phase discovery (static harvest), requires `target_id`
+- `agent.run_start` - Start a new agent run for discovery coordination, requires `target_id` + `run_id`
- `agent.run_finish` - Mark an agent run as completed
- `agent.event_append` - Append an event to an agent run
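A minimal sketch of starting an agent run under the new scoping (the values `pg-analytics` and `sales_db` are illustrative; any further `agent.run_start` arguments are unchanged from the existing contract). Note that `run_id` may also be a schema name, which the server resolves to a discovery run within the target's scope:

```json
{
  "name": "agent.run_start",
  "arguments": {
    "target_id": "pg-analytics",
    "run_id": "sales_db"
  }
}
```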
### LLM Interaction Tools
-- `llm.summary_upsert` - Store or update a table/column summary generated by LLM
+- `llm.summary_upsert` - Store or update a table/column summary generated by LLM (`target_id` + `run_id`)
- `llm.summary_get` - Retrieve LLM-generated summary for a table or column
- `llm.relationship_upsert` - Store or update an inferred relationship between tables
- `llm.domain_upsert` - Store or update a business domain classification
@@ -114,6 +114,8 @@ Once connected, the following tools will be available in Claude Code:
- `llm.note_add` - Add a general note or insight about the data
- `llm.search` - Search LLM-generated content and insights
+Note: catalog/agent/llm tools are target-scoped. Always include `target_id` when a tool accepts `run_id`.
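For example, `llm.summary_get` now takes `target_id` alongside `run_id` (which may be a schema name) and `object_id`; a sketch with illustrative values (`mysql-prod`, `sales_db`, and the object id are placeholders):

```json
{
  "name": "llm.summary_get",
  "arguments": {
    "target_id": "mysql-prod",
    "run_id": "sales_db",
    "object_id": 42,
    "latest": 1
  }
}
```

If the `run_id`/schema does not resolve within that target's scope, the server returns an error naming both the `target_id` and the unresolved `run_id`.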
### Catalog Tools
- `catalog_upsert` - Store data in the catalog
- `catalog_get` - Retrieve from the catalog
