diff --git a/doc/multi_agent_database_discovery.md b/doc/multi_agent_database_discovery.md index 69c016003..a29aa0f97 100644 --- a/doc/multi_agent_database_discovery.md +++ b/doc/multi_agent_database_discovery.md @@ -207,6 +207,17 @@ The agents use these MCP tools for database analysis: - `catalog_upsert` - Store findings in catalog - `catalog_list` / `catalog_get` - Retrieve findings from catalog +### Target Scoping Requirement + +Discovery and catalog/LLM tools are target-scoped. Always pass `target_id`: + +- `discovery.run_static(target_id=..., schema_filter=...)` +- `catalog.*(target_id=..., run_id=...)` +- `agent.run_start(target_id=..., run_id=...)` +- `llm.*(target_id=..., run_id=...)` + +`run_id` resolution is no longer global. The same schema name can exist on multiple targets, so `target_id` is required to resolve the correct discovery run. + ## Benefits of Multi-Agent Approach 1. **Parallel Execution**: All 4 agents run simultaneously diff --git a/include/Discovery_Schema.h b/include/Discovery_Schema.h index 9d9be4ffb..d39d7f3a0 100644 --- a/include/Discovery_Schema.h +++ b/include/Discovery_Schema.h @@ -206,27 +206,32 @@ public: void close(); /** - * @brief Resolve schema name or run_id to a run_id + * @brief Resolve schema name or run_id to a run_id within a target scope * * If input is a numeric run_id, returns it as-is. * If input is a schema name, finds the latest run_id for that schema. 
 * + * @param target_id Required target scope identifier + * @param run_id_or_schema Either a numeric run_id or a schema name + * @return run_id on success, -1 if schema not found */ - int resolve_run_id(const std::string& run_id_or_schema); + int resolve_run_id(const std::string& target_id, const std::string& run_id_or_schema); /** * @brief Create a new discovery run * + * @param target_id Logical target identifier that produced this run + * @param protocol Backend protocol for this run (mysql|pgsql) * @param source_dsn Data source identifier (e.g., "mysql://host:port/") - * @param mysql_version MySQL server version + * @param server_version Backend server version string * @param notes Optional notes for this run * @return run_id on success, -1 on error */ int create_run( + const std::string& target_id, + const std::string& protocol, const std::string& source_dsn, - const std::string& mysql_version, + const std::string& server_version, const std::string& notes = "" ); diff --git a/include/PgSQL_Static_Harvester.h b/include/PgSQL_Static_Harvester.h new file mode 100644 index 000000000..17dda782f --- /dev/null +++ b/include/PgSQL_Static_Harvester.h @@ -0,0 +1,70 @@ +#ifndef CLASS_PGSQL_STATIC_HARVESTER_H +#define CLASS_PGSQL_STATIC_HARVESTER_H + +#ifdef PROXYSQLGENAI + +#include "Discovery_Schema.h" +#include <string> +#include <vector> +#include <map> +#include <pthread.h> + +typedef struct pg_conn PGconn; + +class PgSQL_Static_Harvester { +private: + std::string pgsql_host; + int pgsql_port; + std::string pgsql_user; + std::string pgsql_password; + std::string pgsql_dbname; + PGconn* pgsql_conn; + pthread_mutex_t conn_lock; + + Discovery_Schema* catalog; + + int current_run_id; + std::string source_dsn; + std::string server_version; + + int connect_pgsql(); + void disconnect_pgsql(); + int execute_query(const std::string& query, std::vector<std::vector<std::string>>& results); + std::string get_pgsql_version(); + static bool is_valid_schema_name(const std::string& name); + static std::string escape_sql_string(const std::string& 
str); + +public: + PgSQL_Static_Harvester( + const std::string& host, + int port, + const std::string& user, + const std::string& password, + const std::string& dbname, + const std::string& catalog_path + ); + ~PgSQL_Static_Harvester(); + + int init(); + void close(); + + int start_run(const std::string& target_id, const std::string& notes = ""); + int finish_run(const std::string& notes = ""); + int get_run_id() const { return current_run_id; } + + int harvest_schemas(const std::string& only_schema = ""); + int harvest_objects(const std::string& only_schema = ""); + int harvest_columns(const std::string& only_schema = ""); + int harvest_indexes(const std::string& only_schema = ""); + int harvest_foreign_keys(const std::string& only_schema = ""); + int harvest_view_definitions(const std::string& only_schema = ""); + int build_quick_profiles(); + int rebuild_fts_index(); + + int run_full_harvest(const std::string& target_id, const std::string& only_schema = "", const std::string& notes = ""); + std::string get_harvest_stats(); + std::string get_harvest_stats(int run_id); +}; + +#endif /* PROXYSQLGENAI */ +#endif /* CLASS_PGSQL_STATIC_HARVESTER_H */ diff --git a/include/Query_Tool_Handler.h b/include/Query_Tool_Handler.h index d6e4045ca..6bace7dbd 100644 --- a/include/Query_Tool_Handler.h +++ b/include/Query_Tool_Handler.h @@ -9,6 +9,7 @@ // Forward declaration to avoid circular include class Static_Harvester; +class PgSQL_Static_Harvester; /** * @brief Query Tool Handler for /mcp/query endpoint @@ -43,17 +44,12 @@ private: bool executable; ///< True if current handler can execute against this target }; - // MySQL connection configuration - std::string mysql_hosts; - std::string mysql_ports; - std::string mysql_user; - std::string mysql_password; - std::string mysql_schema; std::string default_target_id; // Discovery components (NEW - replaces MySQL_Tool_Handler wrapper) Discovery_Schema* catalog; ///< Discovery catalog (replaces old MySQL_Catalog) - 
Static_Harvester* harvester; ///< Static harvester for Phase 1 + Static_Harvester* mysql_harvester; ///< MySQL static harvester for Phase 1 + PgSQL_Static_Harvester* pgsql_harvester; ///< PostgreSQL static harvester for Phase 1 // Connection pool for MySQL queries struct MySQLConnection { @@ -218,11 +214,6 @@ public: * @brief Constructor (creates catalog and harvester) */ Query_Tool_Handler( - const std::string& hosts, - const std::string& ports, - const std::string& user, - const std::string& password, - const std::string& schema, const std::string& catalog_path ); @@ -247,7 +238,7 @@ public: /** * @brief Get the static harvester */ - Static_Harvester* get_harvester() const { return harvester; } + Static_Harvester* get_harvester() const { return mysql_harvester; } /** * @brief Get tool usage statistics (thread-safe copy) diff --git a/include/Static_Harvester.h b/include/Static_Harvester.h index 2b3fd7507..bf38eaaf7 100644 --- a/include/Static_Harvester.h +++ b/include/Static_Harvester.h @@ -155,7 +155,7 @@ public: * @param notes Optional notes for this run * @return run_id on success, -1 on error */ - int start_run(const std::string& notes = ""); + int start_run(const std::string& target_id, const std::string& notes = ""); /** * @brief Finish the current discovery run @@ -281,7 +281,7 @@ public: * @param notes Optional run notes * @return run_id on success, -1 on error */ - int run_full_harvest(const std::string& only_schema = "", const std::string& notes = ""); + int run_full_harvest(const std::string& target_id, const std::string& only_schema = "", const std::string& notes = ""); /** * @brief Get harvest statistics diff --git a/include/cpp.h b/include/cpp.h index 1fcb65c1d..0198a350f 100644 --- a/include/cpp.h +++ b/include/cpp.h @@ -51,6 +51,7 @@ #include "ProxySQL_MCP_Server.hpp" #include "Query_Tool_Handler.h" #include "RAG_Tool_Handler.h" +#include "PgSQL_Static_Harvester.h" #include "Static_Harvester.h" #endif /* PROXYSQLGENAI */ diff --git 
a/lib/Discovery_Schema.cpp b/lib/Discovery_Schema.cpp index a6a425071..f62c76911 100644 --- a/lib/Discovery_Schema.cpp +++ b/lib/Discovery_Schema.cpp @@ -84,21 +84,53 @@ void Discovery_Schema::close() { } } -int Discovery_Schema::resolve_run_id(const std::string& run_id_or_schema) { +int Discovery_Schema::resolve_run_id(const std::string& target_id, const std::string& run_id_or_schema) { + if (target_id.empty()) { + proxy_warning("resolve_run_id called without target_id\n"); + return -1; + } + // If it's already a number (run_id), return it if (!run_id_or_schema.empty() && std::isdigit(run_id_or_schema[0])) { - return std::stoi(run_id_or_schema); + int run_id = std::stoi(run_id_or_schema); + char* error = NULL; + int cols = 0, affected = 0; + SQLite3_result* resultset = NULL; + std::string esc_target_id = target_id; + replace_str(esc_target_id, "'", "''"); + std::ostringstream verify_sql; + verify_sql << "SELECT 1 FROM runs WHERE run_id=" << run_id + << " AND target_id='" << esc_target_id << "' LIMIT 1;"; + db->execute_statement(verify_sql.str().c_str(), &error, &cols, &affected, &resultset); + if (error) { + proxy_error("Failed to validate run_id '%d' for target '%s': %s\n", run_id, target_id.c_str(), error); + free(error); + if (resultset) { + delete resultset; + } + return -1; + } + bool found = (resultset && resultset->rows_count > 0); + if (resultset) { + delete resultset; + } + return found ? 
run_id : -1; } // It's a schema name - find the latest run_id for this schema char* error = NULL; int cols = 0, affected = 0; SQLite3_result* resultset = NULL; + std::string esc_target_id = target_id; + std::string esc_run_id_or_schema = run_id_or_schema; + replace_str(esc_target_id, "'", "''"); + replace_str(esc_run_id_or_schema, "'", "''"); std::ostringstream sql; sql << "SELECT r.run_id FROM runs r " << "INNER JOIN schemas s ON s.run_id = r.run_id " - << "WHERE s.schema_name = '" << run_id_or_schema << "' " + << "WHERE r.target_id = '" << esc_target_id << "' " + << "AND s.schema_name = '" << esc_run_id_or_schema << "' " << "ORDER BY r.started_at DESC LIMIT 1;"; db->execute_statement(sql.str().c_str(), &error, &cols, &affected, &resultset); @@ -111,7 +143,7 @@ int Discovery_Schema::resolve_run_id(const std::string& run_id_or_schema) { if (!resultset || resultset->rows_count == 0) { proxy_warning("No run found for schema '%s'\n", run_id_or_schema.c_str()); if (resultset) { - free(resultset); + delete resultset; resultset = NULL; } return -1; @@ -120,7 +152,7 @@ int Discovery_Schema::resolve_run_id(const std::string& run_id_or_schema) { SQLite3_row* row = resultset->rows[0]; int run_id = atoi(row->fields[0]); - free(resultset); + delete resultset; return run_id; } @@ -128,6 +160,77 @@ int Discovery_Schema::init_schema() { // Enable foreign keys db->execute("PRAGMA foreign_keys = ON"); + // The prototype schema had MySQL-only runs metadata. Backward compatibility is intentionally + // not preserved: if we detect legacy runs layout, rebuild catalog tables from scratch. 
+ { + char* error = NULL; + int cols = 0, affected = 0; + SQLite3_result* resultset = NULL; + db->execute_statement("PRAGMA table_info(runs);", &error, &cols, &affected, &resultset); + if (error) { + proxy_error("Discovery_Schema: failed reading runs table layout: %s\n", error); + free(error); + if (resultset) { + delete resultset; + } + return -1; + } + + bool has_rows = (resultset && !resultset->rows.empty()); + bool has_target_id = false; + bool has_protocol = false; + bool has_server_version = false; + for (size_t i = 0; resultset && i < resultset->rows.size(); i++) { + SQLite3_row* row = resultset->rows[i]; + const char* col_name = (row->cnt > 1 && row->fields[1]) ? row->fields[1] : ""; + if (!strcmp(col_name, "target_id")) { + has_target_id = true; + } else if (!strcmp(col_name, "protocol")) { + has_protocol = true; + } else if (!strcmp(col_name, "server_version")) { + has_server_version = true; + } + } + if (resultset) { + delete resultset; + } + + if (has_rows && (!has_target_id || !has_protocol || !has_server_version)) { + proxy_warning("Discovery_Schema: legacy catalog schema detected, rebuilding catalog tables\n"); + db->execute("PRAGMA foreign_keys = OFF"); + db->execute("DROP TABLE IF EXISTS schema_docs"); + db->execute("DROP TABLE IF EXISTS query_tool_calls"); + db->execute("DROP TABLE IF EXISTS rag_search_log"); + db->execute("DROP TABLE IF EXISTS llm_search_log"); + db->execute("DROP TABLE IF EXISTS llm_notes"); + db->execute("DROP TABLE IF EXISTS llm_question_templates"); + db->execute("DROP TABLE IF EXISTS llm_metrics"); + db->execute("DROP TABLE IF EXISTS llm_domain_members"); + db->execute("DROP TABLE IF EXISTS llm_domains"); + db->execute("DROP TABLE IF EXISTS llm_relationships"); + db->execute("DROP TABLE IF EXISTS llm_object_summaries"); + db->execute("DROP TABLE IF EXISTS agent_events"); + db->execute("DROP TABLE IF EXISTS agent_runs"); + db->execute("DROP TABLE IF EXISTS stats_mcp_query_digest_reset"); + db->execute("DROP TABLE IF EXISTS 
stats_mcp_query_digest"); + db->execute("DROP TABLE IF EXISTS mcp_query_rules"); + db->execute("DROP TABLE IF EXISTS profiles"); + db->execute("DROP TABLE IF EXISTS inferred_relationships"); + db->execute("DROP TABLE IF EXISTS view_dependencies"); + db->execute("DROP TABLE IF EXISTS foreign_key_columns"); + db->execute("DROP TABLE IF EXISTS foreign_keys"); + db->execute("DROP TABLE IF EXISTS index_columns"); + db->execute("DROP TABLE IF EXISTS indexes"); + db->execute("DROP TABLE IF EXISTS columns"); + db->execute("DROP TABLE IF EXISTS objects"); + db->execute("DROP TABLE IF EXISTS schemas"); + db->execute("DROP TABLE IF EXISTS runs"); + db->execute("DROP TABLE IF EXISTS fts_objects"); + db->execute("DROP TABLE IF EXISTS fts_llm"); + db->execute("PRAGMA foreign_keys = ON"); + } + } + // Create all tables int rc = create_deterministic_tables(); if (rc) { @@ -166,13 +269,16 @@ int Discovery_Schema::create_deterministic_tables() { db->execute( "CREATE TABLE IF NOT EXISTS runs (" " run_id INTEGER PRIMARY KEY , " + " target_id TEXT NOT NULL , " + " protocol TEXT NOT NULL CHECK(protocol IN ('mysql','pgsql')) , " " started_at TEXT NOT NULL DEFAULT (datetime('now')) , " " finished_at TEXT , " " source_dsn TEXT , " - " mysql_version TEXT , " + " server_version TEXT , " " notes TEXT" ");" ); + db->execute("CREATE INDEX IF NOT EXISTS idx_runs_target_started ON runs(target_id, started_at DESC);"); // Schemas table db->execute( @@ -650,20 +756,24 @@ int Discovery_Schema::create_fts_tables() { // ============================================================================ int Discovery_Schema::create_run( + const std::string& target_id, + const std::string& protocol, const std::string& source_dsn, - const std::string& mysql_version, + const std::string& server_version, const std::string& notes ) { sqlite3_stmt* stmt = NULL; - const char* sql = "INSERT INTO runs(source_dsn, mysql_version, notes) VALUES(?1, ?2 , ?3);"; + const char* sql = "INSERT INTO runs(target_id, protocol, 
source_dsn, server_version, notes) VALUES(?1, ?2, ?3, ?4, ?5);"; auto [rc, stmt_unique] = db->prepare_v2(sql); stmt = stmt_unique.get(); if (rc != SQLITE_OK) return -1; - (*proxy_sqlite3_bind_text)(stmt, 1, source_dsn.c_str(), -1, SQLITE_TRANSIENT); - (*proxy_sqlite3_bind_text)(stmt, 2, mysql_version.c_str(), -1, SQLITE_TRANSIENT); - (*proxy_sqlite3_bind_text)(stmt, 3, notes.c_str(), -1, SQLITE_TRANSIENT); + (*proxy_sqlite3_bind_text)(stmt, 1, target_id.c_str(), -1, SQLITE_TRANSIENT); + (*proxy_sqlite3_bind_text)(stmt, 2, protocol.c_str(), -1, SQLITE_TRANSIENT); + (*proxy_sqlite3_bind_text)(stmt, 3, source_dsn.c_str(), -1, SQLITE_TRANSIENT); + (*proxy_sqlite3_bind_text)(stmt, 4, server_version.c_str(), -1, SQLITE_TRANSIENT); + (*proxy_sqlite3_bind_text)(stmt, 5, notes.c_str(), -1, SQLITE_TRANSIENT); SAFE_SQLITE3_STEP2(stmt); int run_id = (int)(*proxy_sqlite3_last_insert_rowid)(db->get_db()); @@ -693,7 +803,7 @@ std::string Discovery_Schema::get_run_info(int run_id) { SQLite3_result* resultset = NULL; std::ostringstream sql; - sql << "SELECT run_id, started_at, finished_at, source_dsn, mysql_version , notes " + sql << "SELECT run_id, target_id, protocol, started_at, finished_at, source_dsn, server_version , notes " << "FROM runs WHERE run_id = " << run_id << ";"; db->execute_statement(sql.str().c_str(), &error, &cols, &affected, &resultset); @@ -702,11 +812,13 @@ std::string Discovery_Schema::get_run_info(int run_id) { if (resultset && !resultset->rows.empty()) { SQLite3_row* row = resultset->rows[0]; result["run_id"] = run_id; - result["started_at"] = std::string(row->fields[0] ? row->fields[0] : ""); - result["finished_at"] = std::string(row->fields[1] ? row->fields[1] : ""); - result["source_dsn"] = std::string(row->fields[2] ? row->fields[2] : ""); - result["mysql_version"] = std::string(row->fields[3] ? row->fields[3] : ""); - result["notes"] = std::string(row->fields[4] ? row->fields[4] : ""); + result["target_id"] = std::string(row->fields[1] ? 
 row->fields[1] : ""); + result["protocol"] = std::string(row->fields[2] ? row->fields[2] : ""); + result["started_at"] = std::string(row->fields[3] ? row->fields[3] : ""); + result["finished_at"] = std::string(row->fields[4] ? row->fields[4] : ""); + result["source_dsn"] = std::string(row->fields[5] ? row->fields[5] : ""); + result["server_version"] = std::string(row->fields[6] ? row->fields[6] : ""); + result["notes"] = std::string(row->fields[7] ? row->fields[7] : ""); } else { result["error"] = "Run not found"; } diff --git a/lib/Makefile b/lib/Makefile index 6dcce9f39..d4b3a1ba9 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -96,7 +96,7 @@ _OBJ_CXX += GenAI_Thread.oo \ Admin_Tool_Handler.oo Cache_Tool_Handler.oo Observe_Tool_Handler.oo \ AI_Features_Manager.oo LLM_Bridge.oo LLM_Clients.oo Anomaly_Detector.oo AI_Vector_Storage.oo AI_Tool_Handler.oo \ RAG_Tool_Handler.oo \ - Discovery_Schema.oo Static_Harvester.oo + Discovery_Schema.oo Static_Harvester.oo PgSQL_Static_Harvester.oo endif OBJ_CXX := $(patsubst %,$(ODIR)/%,$(_OBJ_CXX)) diff --git a/lib/PgSQL_Static_Harvester.cpp b/lib/PgSQL_Static_Harvester.cpp new file mode 100644 index 000000000..0d9401f4b --- /dev/null +++ b/lib/PgSQL_Static_Harvester.cpp @@ -0,0 +1,699 @@ +#ifdef PROXYSQLGENAI + +#include "PgSQL_Static_Harvester.h" +#include "proxysql.h" +#include "cpp.h" + +#include <libpq-fe.h> +#include <sstream> +#include <algorithm> +#include <map> +#include <cctype> +#include "../deps/json/json.hpp" + +using json = nlohmann::json; + +PgSQL_Static_Harvester::PgSQL_Static_Harvester( + const std::string& host, + int port, + const std::string& user, + const std::string& password, + const std::string& dbname, + const std::string& catalog_path +) : pgsql_host(host), + pgsql_port(port), + pgsql_user(user), + pgsql_password(password), + pgsql_dbname(dbname), + pgsql_conn(NULL), + catalog(NULL), + current_run_id(-1) +{ + pthread_mutex_init(&conn_lock, NULL); + catalog = new Discovery_Schema(catalog_path); +} + +PgSQL_Static_Harvester::~PgSQL_Static_Harvester() { 
 + close(); + if (catalog) { + delete catalog; + catalog = NULL; + } + pthread_mutex_destroy(&conn_lock); +} + +int PgSQL_Static_Harvester::init() { + if (catalog->init()) { + proxy_error("PgSQL_Static_Harvester: Failed to initialize catalog\n"); + return -1; + } + return 0; +} + +void PgSQL_Static_Harvester::close() { + disconnect_pgsql(); +} + +int PgSQL_Static_Harvester::connect_pgsql() { + pthread_mutex_lock(&conn_lock); + + if (pgsql_conn) { + pthread_mutex_unlock(&conn_lock); + return 0; + } + + std::ostringstream conninfo; + conninfo << "host=" << pgsql_host + << " port=" << pgsql_port + << " user=" << pgsql_user + << " password=" << pgsql_password + << " connect_timeout=30"; + if (!pgsql_dbname.empty()) { + conninfo << " dbname=" << pgsql_dbname; + } + + pgsql_conn = PQconnectdb(conninfo.str().c_str()); + if (pgsql_conn == NULL || PQstatus(pgsql_conn) != CONNECTION_OK) { + proxy_error("PgSQL_Static_Harvester: PQconnectdb failed: %s\n", + pgsql_conn ? PQerrorMessage(pgsql_conn) : "NULL connection"); + if (pgsql_conn) { + PQfinish(pgsql_conn); + pgsql_conn = NULL; + } + pthread_mutex_unlock(&conn_lock); + return -1; + } + + server_version = get_pgsql_version(); + source_dsn = "pgsql://" + pgsql_user + "@" + pgsql_host + ":" + std::to_string(pgsql_port) + "/" + pgsql_dbname; + + proxy_info("PgSQL_Static_Harvester: Connected to PostgreSQL %s at %s:%d\n", + server_version.c_str(), pgsql_host.c_str(), pgsql_port); + + pthread_mutex_unlock(&conn_lock); + return 0; +} + +void PgSQL_Static_Harvester::disconnect_pgsql() { + pthread_mutex_lock(&conn_lock); + if (pgsql_conn) { + PQfinish(pgsql_conn); + pgsql_conn = NULL; + } + pthread_mutex_unlock(&conn_lock); +} + +int PgSQL_Static_Harvester::execute_query(const std::string& query, std::vector<std::vector<std::string>>& results) { + if (!pgsql_conn) { + proxy_error("PgSQL_Static_Harvester: Not connected to PostgreSQL\n"); + return -1; + } + + PGresult* res = PQexec(pgsql_conn, query.c_str()); + if (!res) { + 
proxy_error("PgSQL_Static_Harvester: PQexec returned NULL\n"); + return -1; + } + ExecStatusType st = PQresultStatus(res); + if (st != PGRES_TUPLES_OK && st != PGRES_COMMAND_OK) { + proxy_error("PgSQL_Static_Harvester: Query failed: %s\n", PQresultErrorMessage(res)); + PQclear(res); + return -1; + } + if (st == PGRES_COMMAND_OK) { + PQclear(res); + return 0; + } + + int nrows = PQntuples(res); + int ncols = PQnfields(res); + results.clear(); + results.reserve(nrows); + for (int r = 0; r < nrows; r++) { + std::vector<std::string> row; + row.reserve(ncols); + for (int c = 0; c < ncols; c++) { + row.push_back(PQgetisnull(res, r, c) ? "" : PQgetvalue(res, r, c)); + } + results.push_back(row); + } + PQclear(res); + return 0; +} + +std::string PgSQL_Static_Harvester::get_pgsql_version() { + std::vector<std::vector<std::string>> rows; + if (execute_query("SELECT version();", rows) == 0 && !rows.empty() && !rows[0].empty()) { + return rows[0][0]; + } + return ""; +} + +bool PgSQL_Static_Harvester::is_valid_schema_name(const std::string& name) { + if (name.empty()) { + return true; + } + for (char c : name) { + if (!std::isalnum(static_cast<unsigned char>(c)) && c != '_' && c != '$') { + return false; + } + } + return true; +} + +std::string PgSQL_Static_Harvester::escape_sql_string(const std::string& str) { + std::string escaped; + escaped.reserve(str.length() * 2); + for (char c : str) { + if (c == '\'') { + escaped += "''"; + } else { + escaped += c; + } + } + return escaped; +} + +int PgSQL_Static_Harvester::start_run(const std::string& target_id, const std::string& notes) { + if (current_run_id >= 0) { + proxy_error("PgSQL_Static_Harvester: Run already active (run_id=%d)\n", current_run_id); + return -1; + } + if (connect_pgsql()) { + return -1; + } + + current_run_id = catalog->create_run(target_id, "pgsql", source_dsn, server_version, notes); + if (current_run_id < 0) { + proxy_error("PgSQL_Static_Harvester: Failed to create run\n"); + return -1; + } + proxy_info("PgSQL_Static_Harvester: Started run_id=%d\n", 
current_run_id); + return current_run_id; +} + +int PgSQL_Static_Harvester::finish_run(const std::string& notes) { + if (current_run_id < 0) { + proxy_error("PgSQL_Static_Harvester: No active run\n"); + return -1; + } + int rc = catalog->finish_run(current_run_id, notes); + if (rc) { + proxy_error("PgSQL_Static_Harvester: Failed to finish run\n"); + return -1; + } + proxy_info("PgSQL_Static_Harvester: Finished run_id=%d\n", current_run_id); + current_run_id = -1; + return 0; +} + +int PgSQL_Static_Harvester::harvest_schemas(const std::string& only_schema) { + if (current_run_id < 0) { + return -1; + } + if (!is_valid_schema_name(only_schema)) { + return -1; + } + + std::ostringstream sql; + sql << "SELECT schema_name, default_character_set_name, default_collation_name " + << "FROM information_schema.schemata " + << "WHERE schema_name NOT IN ('pg_catalog','information_schema') " + << "AND schema_name NOT LIKE 'pg_toast%'"; + if (!only_schema.empty()) { + sql << " AND schema_name='" << only_schema << "'"; + } + sql << " ORDER BY schema_name"; + + std::vector<std::vector<std::string>> rows; + if (execute_query(sql.str(), rows)) { + return -1; + } + + int count = 0; + for (const auto& r : rows) { + if (r.size() < 3) continue; + if (catalog->insert_schema(current_run_id, r[0], r[1], r[2]) >= 0) { + count++; + } + } + return count; +} + +int PgSQL_Static_Harvester::harvest_objects(const std::string& only_schema) { + if (current_run_id < 0) { + return -1; + } + if (!is_valid_schema_name(only_schema)) { + return -1; + } + + std::vector<std::vector<std::string>> rows; + std::ostringstream sql_tables; + sql_tables << "SELECT table_schema, table_name, " + << "CASE WHEN table_type='BASE TABLE' THEN 'table' ELSE 'view' END " + << "FROM information_schema.tables " + << "WHERE table_schema NOT IN ('pg_catalog','information_schema') " + << "AND table_schema NOT LIKE 'pg_toast%'"; + if (!only_schema.empty()) { + sql_tables << " AND table_schema='" << only_schema << "'"; + } + if (execute_query(sql_tables.str(), rows)) { + return 
-1; + } + + int count = 0; + for (const auto& r : rows) { + if (r.size() < 3) continue; + if (catalog->insert_object( + current_run_id, r[0], r[1], r[2], + "", 0, 0, 0, "", "", "", "" + ) >= 0) { + count++; + } + } + + rows.clear(); + std::ostringstream sql_routines; + sql_routines << "SELECT routine_schema, routine_name, 'routine', COALESCE(routine_definition,'') " + << "FROM information_schema.routines " + << "WHERE routine_schema NOT IN ('pg_catalog','information_schema')"; + if (!only_schema.empty()) { + sql_routines << " AND routine_schema='" << only_schema << "'"; + } + if (execute_query(sql_routines.str(), rows) == 0) { + for (const auto& r : rows) { + if (r.size() < 4) continue; + if (catalog->insert_object( + current_run_id, r[0], r[1], r[2], + "", 0, 0, 0, "", "", "", r[3] + ) >= 0) { + count++; + } + } + } + + rows.clear(); + std::ostringstream sql_triggers; + sql_triggers << "SELECT trigger_schema, trigger_name, 'trigger', COALESCE(action_statement,'') " + << "FROM information_schema.triggers " + << "WHERE trigger_schema NOT IN ('pg_catalog','information_schema')"; + if (!only_schema.empty()) { + sql_triggers << " AND trigger_schema='" << only_schema << "'"; + } + if (execute_query(sql_triggers.str(), rows) == 0) { + for (const auto& r : rows) { + if (r.size() < 4) continue; + if (catalog->insert_object( + current_run_id, r[0], r[1], r[2], + "", 0, 0, 0, "", "", "", r[3] + ) >= 0) { + count++; + } + } + } + + return count; +} + +int PgSQL_Static_Harvester::harvest_columns(const std::string& only_schema) { + if (current_run_id < 0) return -1; + if (!is_valid_schema_name(only_schema)) return -1; + + std::ostringstream sql; + sql << "SELECT table_schema, table_name, ordinal_position, column_name, data_type, " + << "COALESCE(udt_name,''), CASE WHEN is_nullable='YES' THEN 1 ELSE 0 END, " + << "COALESCE(column_default,''), '', COALESCE(character_set_name,''), " + << "COALESCE(collation_name,''), COALESCE(column_name,'') " + << "FROM information_schema.columns 
" + << "WHERE table_schema NOT IN ('pg_catalog','information_schema') " + << "AND table_schema NOT LIKE 'pg_toast%'"; + if (!only_schema.empty()) { + sql << " AND table_schema='" << only_schema << "'"; + } + sql << " ORDER BY table_schema, table_name, ordinal_position"; + + std::vector<std::vector<std::string>> rows; + if (execute_query(sql.str(), rows)) return -1; + + int count = 0; + for (const auto& r : rows) { + if (r.size() < 12) continue; + char* error = NULL; + int cols = 0, affected = 0; + SQLite3_result* resultset = NULL; + std::ostringstream obj_sql; + obj_sql << "SELECT object_id FROM objects " + << "WHERE run_id=" << current_run_id + << " AND schema_name='" << escape_sql_string(r[0]) << "'" + << " AND object_name='" << escape_sql_string(r[1]) << "'" + << " AND object_type IN ('table','view') LIMIT 1;"; + catalog->get_db()->execute_statement(obj_sql.str().c_str(), &error, &cols, &affected, &resultset); + if (error || !resultset || resultset->rows.empty()) { + if (error) { + free(error); + } + if (resultset) { + delete resultset; + } + continue; + } + int object_id = atoi(resultset->rows[0]->fields[0]); + delete resultset; + + int is_time = 0; + if (r[4] == "date" || r[4] == "time" || r[4] == "timestamp" || + r[4] == "timestamptz" || r[4] == "timetz") { + is_time = 1; + } + int is_id_like = (r[3] == "id" || (r[3].size() > 3 && r[3].substr(r[3].size() - 3) == "_id")) ? 
1 : 0; + + if (catalog->insert_column( + object_id, + atoi(r[2].c_str()), r[3], r[4], r[5], atoi(r[6].c_str()), + r[7], r[8], r[9], r[10], r[11], + 0, 0, 0, is_time, is_id_like + ) >= 0) { + count++; + } + } + catalog->update_object_flags(current_run_id); + return count; +} + +int PgSQL_Static_Harvester::harvest_indexes(const std::string& only_schema) { + if (current_run_id < 0) return -1; + if (!is_valid_schema_name(only_schema)) return -1; + + std::ostringstream sql; + sql << "SELECT ns.nspname, t.relname, i.relname, " + << "CASE WHEN ix.indisunique THEN 1 ELSE 0 END, " + << "COALESCE(am.amname,''), k.ord, COALESCE(a.attname,''), 0, '', 0 " + << "FROM pg_class t " + << "JOIN pg_namespace ns ON ns.oid=t.relnamespace " + << "JOIN pg_index ix ON ix.indrelid=t.oid " + << "JOIN pg_class i ON i.oid=ix.indexrelid " + << "JOIN pg_am am ON am.oid=i.relam " + << "JOIN LATERAL unnest(ix.indkey) WITH ORDINALITY AS k(attnum, ord) ON TRUE " + << "LEFT JOIN pg_attribute a ON a.attrelid=t.oid AND a.attnum=k.attnum " + << "WHERE t.relkind IN ('r','p') " + << "AND ns.nspname NOT IN ('pg_catalog','information_schema') " + << "AND ns.nspname NOT LIKE 'pg_toast%'"; + if (!only_schema.empty()) { + sql << " AND ns.nspname='" << only_schema << "'"; + } + sql << " ORDER BY ns.nspname, t.relname, i.relname, k.ord"; + + std::vector<std::vector<std::string>> rows; + if (execute_query(sql.str(), rows)) return -1; + + int count = 0; + std::map<std::string, std::vector<std::vector<std::string>>> grouped; + for (const auto& r : rows) { + if (r.size() < 10) continue; + std::string key = r[0] + "." + r[1] + "." 
+ r[2]; + grouped[key].push_back(r); + } + + for (const auto& entry : grouped) { + const auto& rows_for_index = entry.second; + if (rows_for_index.empty()) continue; + const auto& first = rows_for_index[0]; + + char* error = NULL; + int cols = 0, affected = 0; + SQLite3_result* resultset = NULL; + std::ostringstream obj_sql; + obj_sql << "SELECT object_id FROM objects " + << "WHERE run_id=" << current_run_id + << " AND schema_name='" << escape_sql_string(first[0]) << "'" + << " AND object_name='" << escape_sql_string(first[1]) << "'" + << " AND object_type='table' LIMIT 1;"; + catalog->get_db()->execute_statement(obj_sql.str().c_str(), &error, &cols, &affected, &resultset); + if (error || !resultset || resultset->rows.empty()) { + if (error) { + free(error); + } + if (resultset) { + delete resultset; + } + continue; + } + int object_id = atoi(resultset->rows[0]->fields[0]); + delete resultset; + + int is_primary = (first[2] == "PRIMARY" || first[2] == first[1] + "_pkey") ? 1 : 0; + int index_id = catalog->insert_index( + object_id, first[2], atoi(first[3].c_str()), is_primary, first[4], atol(first[9].c_str()) + ); + if (index_id < 0) { + continue; + } + for (const auto& idx_col : rows_for_index) { + catalog->insert_index_column(index_id, atoi(idx_col[5].c_str()), idx_col[6], atoi(idx_col[7].c_str()), idx_col[8]); + } + count++; + } + + catalog->update_object_flags(current_run_id); + return count; +} + +int PgSQL_Static_Harvester::harvest_foreign_keys(const std::string& only_schema) { + if (current_run_id < 0) return -1; + if (!is_valid_schema_name(only_schema)) return -1; + + std::ostringstream sql; + sql << "SELECT tc.table_schema, tc.table_name, tc.constraint_name, " + << "kcu.column_name, ccu.table_schema, ccu.table_name, ccu.column_name, " + << "kcu.ordinal_position, COALESCE(rc.update_rule,''), COALESCE(rc.delete_rule,'') " + << "FROM information_schema.table_constraints tc " + << "JOIN information_schema.key_column_usage kcu " + << "ON 
tc.constraint_name=kcu.constraint_name " + << "AND tc.table_schema=kcu.table_schema " + << "JOIN information_schema.constraint_column_usage ccu " + << "ON ccu.constraint_name=tc.constraint_name " + << "AND ccu.constraint_schema=tc.constraint_schema " + << "LEFT JOIN information_schema.referential_constraints rc " + << "ON rc.constraint_name=tc.constraint_name " + << "AND rc.constraint_schema=tc.constraint_schema " + << "WHERE tc.constraint_type='FOREIGN KEY' " + << "AND tc.table_schema NOT IN ('pg_catalog','information_schema') " + << "AND tc.table_schema NOT LIKE 'pg_toast%'"; + if (!only_schema.empty()) { + sql << " AND tc.table_schema='" << only_schema << "'"; + } + sql << " ORDER BY tc.table_schema, tc.table_name, tc.constraint_name, kcu.ordinal_position"; + + std::vector<std::vector<std::string>> rows; + if (execute_query(sql.str(), rows)) return -1; + + int count = 0; + int last_fk_id = -1; + std::string last_fk_key; + for (const auto& r : rows) { + if (r.size() < 10) continue; + std::string fk_key = r[0] + "." + r[1] + "." 
+ r[2]; + if (fk_key != last_fk_key) { + char* error = NULL; + int cols = 0, affected = 0; + SQLite3_result* resultset = NULL; + std::ostringstream obj_sql; + obj_sql << "SELECT object_id FROM objects " + << "WHERE run_id=" << current_run_id + << " AND schema_name='" << escape_sql_string(r[0]) << "'" + << " AND object_name='" << escape_sql_string(r[1]) << "'" + << " AND object_type='table' LIMIT 1;"; + catalog->get_db()->execute_statement(obj_sql.str().c_str(), &error, &cols, &affected, &resultset); + if (error || !resultset || resultset->rows.empty()) { + if (error) { + free(error); + } + if (resultset) { + delete resultset; + } + last_fk_id = -1; + last_fk_key.clear(); + continue; + } + int child_object_id = atoi(resultset->rows[0]->fields[0]); + delete resultset; + last_fk_id = catalog->insert_foreign_key( + current_run_id, child_object_id, r[2], r[4], r[5], r[8], r[9] + ); + last_fk_key = fk_key; + } + if (last_fk_id >= 0) { + catalog->insert_foreign_key_column(last_fk_id, atoi(r[7].c_str()), r[3], r[6]); + count++; + } + } + + catalog->update_object_flags(current_run_id); + return count; +} + +int PgSQL_Static_Harvester::harvest_view_definitions(const std::string& only_schema) { + if (current_run_id < 0) return -1; + if (!is_valid_schema_name(only_schema)) return -1; + + std::ostringstream sql; + sql << "SELECT schemaname, viewname, definition FROM pg_views " + << "WHERE schemaname NOT IN ('pg_catalog','information_schema') " + << "AND schemaname NOT LIKE 'pg_toast%'"; + if (!only_schema.empty()) { + sql << " AND schemaname='" << only_schema << "'"; + } + + std::vector> rows; + if (execute_query(sql.str(), rows)) return -1; + + int count = 0; + for (const auto& row : rows) { + if (row.size() < 3) continue; + char* error = NULL; + int cols = 0, affected = 0; + std::ostringstream update_sql; + update_sql << "UPDATE objects SET definition_sql = '" << escape_sql_string(row[2]) << "' " + << "WHERE run_id = " << current_run_id + << " AND schema_name = '" << 
escape_sql_string(row[0]) << "'" + << " AND object_name = '" << escape_sql_string(row[1]) << "'" + << " AND object_type = 'view';"; + catalog->get_db()->execute_statement(update_sql.str().c_str(), &error, &cols, &affected); + if (error) { + free(error); + } + if (affected > 0) { + count++; + } + } + return count; +} + +int PgSQL_Static_Harvester::build_quick_profiles() { + if (current_run_id < 0) return -1; + + char* error = NULL; + int cols = 0, affected = 0; + SQLite3_result* resultset = NULL; + std::ostringstream sql; + sql << "SELECT object_id, object_name, table_rows_est, data_length, index_length, " + "has_primary_key, has_foreign_keys, has_time_column " + "FROM objects WHERE run_id=" << current_run_id << " AND object_type='table'"; + catalog->get_db()->execute_statement(sql.str().c_str(), &error, &cols, &affected, &resultset); + if (error) { + free(error); + if (resultset) delete resultset; + return -1; + } + + int count = 0; + if (resultset) { + for (auto row : resultset->rows) { + if (row->cnt < 8) continue; + const std::string name = row->fields[1] ? row->fields[1] : ""; + std::string kind = "unknown"; + std::string lname = name; + std::transform(lname.begin(), lname.end(), lname.begin(), ::tolower); + if (lname.find("log") != std::string::npos || lname.find("event") != std::string::npos || lname.find("audit") != std::string::npos) { + kind = "log"; + } else if (lname.find("order") != std::string::npos || lname.find("invoice") != std::string::npos || lname.find("payment") != std::string::npos || lname.find("transaction") != std::string::npos) { + kind = "fact"; + } else if (lname.find("user") != std::string::npos || lname.find("customer") != std::string::npos || lname.find("account") != std::string::npos || lname.find("product") != std::string::npos) { + kind = "entity"; + } + + json p; + p["guessed_kind"] = kind; + p["rows_est"] = row->fields[2] ? atoll(row->fields[2]) : 0; + p["size_bytes"] = (row->fields[3] ? 
atoll(row->fields[3]) : 0) + (row->fields[4] ? atoll(row->fields[4]) : 0); + p["engine"] = "pgsql"; + p["has_primary_key"] = row->fields[5] ? atoi(row->fields[5]) : 0; + p["has_foreign_keys"] = row->fields[6] ? atoi(row->fields[6]) : 0; + p["has_time_column"] = row->fields[7] ? atoi(row->fields[7]) : 0; + + if (catalog->upsert_profile(current_run_id, atoi(row->fields[0]), "table_quick", p.dump()) == 0) { + count++; + } + } + delete resultset; + } + return count; +} + +int PgSQL_Static_Harvester::rebuild_fts_index() { + if (current_run_id < 0) return -1; + return catalog->rebuild_fts_index(current_run_id); +} + +int PgSQL_Static_Harvester::run_full_harvest(const std::string& target_id, const std::string& only_schema, const std::string& notes) { + if (start_run(target_id, notes) < 0) return -1; + if (harvest_schemas(only_schema) < 0) { finish_run("Failed during schema harvest"); return -1; } + if (harvest_objects(only_schema) < 0) { finish_run("Failed during object harvest"); return -1; } + if (harvest_columns(only_schema) < 0) { finish_run("Failed during column harvest"); return -1; } + if (harvest_indexes(only_schema) < 0) { finish_run("Failed during index harvest"); return -1; } + if (harvest_foreign_keys(only_schema) < 0) { finish_run("Failed during foreign key harvest"); return -1; } + if (harvest_view_definitions(only_schema) < 0) { finish_run("Failed during view definition harvest"); return -1; } + if (build_quick_profiles() < 0) { finish_run("Failed during profile building"); return -1; } + if (rebuild_fts_index() < 0) { finish_run("Failed during FTS rebuild"); return -1; } + int final_run_id = current_run_id; + finish_run("Harvest completed successfully"); + return final_run_id; +} + +std::string PgSQL_Static_Harvester::get_harvest_stats() { + if (current_run_id < 0) { + return "{\"error\": \"No active run\"}"; + } + return get_harvest_stats(current_run_id); +} + +std::string PgSQL_Static_Harvester::get_harvest_stats(int run_id) { + char* error = NULL; + int 
cols = 0, affected = 0; + SQLite3_result* resultset = NULL; + json stats; + stats["run_id"] = run_id; + stats["protocol"] = "pgsql"; + + std::ostringstream q1; + q1 << "SELECT object_type, COUNT(*) FROM objects WHERE run_id=" << run_id << " GROUP BY object_type"; + catalog->get_db()->execute_statement(q1.str().c_str(), &error, &cols, &affected, &resultset); + json objects = json::object(); + if (!error && resultset) { + for (auto row : resultset->rows) { + if (row->cnt >= 2) { + objects[row->fields[0] ? row->fields[0] : "unknown"] = row->fields[1] ? atoi(row->fields[1]) : 0; + } + } + delete resultset; + } else if (error) { + free(error); + error = NULL; + if (resultset) delete resultset; + } + stats["objects"] = objects; + + auto get_count = [&](const std::string& table) -> int { + char* e = NULL; + int c = 0, a = 0; + SQLite3_result* rs = NULL; + std::ostringstream q; + q << "SELECT COUNT(*) FROM " << table << " WHERE run_id=" << run_id; + catalog->get_db()->execute_statement(q.str().c_str(), &e, &c, &a, &rs); + int out = 0; + if (!e && rs && !rs->rows.empty() && rs->rows[0]->cnt > 0 && rs->rows[0]->fields[0]) { + out = atoi(rs->rows[0]->fields[0]); + } + if (e) free(e); + if (rs) delete rs; + return out; + }; + + stats["columns"] = get_count("columns"); + stats["foreign_keys"] = get_count("foreign_keys"); + + return stats.dump(); +} + +#endif /* PROXYSQLGENAI */ diff --git a/lib/ProxySQL_MCP_Server.cpp b/lib/ProxySQL_MCP_Server.cpp index 00195d99d..61946887d 100644 --- a/lib/ProxySQL_MCP_Server.cpp +++ b/lib/ProxySQL_MCP_Server.cpp @@ -97,11 +97,6 @@ ProxySQL_MCP_Server::ProxySQL_MCP_Server(int p, MCP_Threads_Handler* h) std::string catalog_path = std::string(GloVars.datadir) + "/mcp_catalog.db"; handler->query_tool_handler = new Query_Tool_Handler( - handler->variables.mcp_mysql_hosts ? handler->variables.mcp_mysql_hosts : "", - handler->variables.mcp_mysql_ports ? handler->variables.mcp_mysql_ports : "", - handler->variables.mcp_mysql_user ? 
handler->variables.mcp_mysql_user : "", - handler->variables.mcp_mysql_password ? handler->variables.mcp_mysql_password : "", - handler->variables.mcp_mysql_schema ? handler->variables.mcp_mysql_schema : "", catalog_path.c_str() ); if (handler->query_tool_handler->init() == 0) { diff --git a/lib/Query_Tool_Handler.cpp b/lib/Query_Tool_Handler.cpp index 29ef0b3e9..c1b292e68 100644 --- a/lib/Query_Tool_Handler.cpp +++ b/lib/Query_Tool_Handler.cpp @@ -12,6 +12,7 @@ using json = nlohmann::json; #include "proxysql_debug.h" #include "proxysql_admin.h" #include "Static_Harvester.h" +#include "PgSQL_Static_Harvester.h" #include #include @@ -251,60 +252,28 @@ static std::string escape_string_literal(const std::string& value) { } Query_Tool_Handler::Query_Tool_Handler( - const std::string& hosts, - const std::string& ports, - const std::string& user, - const std::string& password, - const std::string& schema, const std::string& catalog_path) : catalog(NULL), - harvester(NULL), + mysql_harvester(NULL), + pgsql_harvester(NULL), pool_size(0), pg_pool_size(0), max_rows(200), timeout_ms(2000), allow_select_star(false) { - // Parse hosts - std::istringstream h(hosts); - std::string host; - while (std::getline(h, host, ',')) { - host.erase(0, host.find_first_not_of(" \t")); - host.erase(host.find_last_not_of(" \t") + 1); - if (!host.empty()) { - // Store hosts for later - } - } - - // Parse ports - std::istringstream p(ports); - std::string port; - while (std::getline(p, port, ',')) { - port.erase(0, port.find_first_not_of(" \t")); - port.erase(port.find_last_not_of(" \t") + 1); - } - - mysql_hosts = hosts; - mysql_ports = ports; - mysql_user = user; - mysql_password = password; - mysql_schema = schema; - // Initialize pool mutex pthread_mutex_init(&pool_lock, NULL); // Initialize counters mutex pthread_mutex_init(&counters_lock, NULL); - // Create discovery schema and harvester + // Create discovery schema and protocol-specific harvesters. 
catalog = new Discovery_Schema(catalog_path); - harvester = new Static_Harvester( - hosts.empty() ? "127.0.0.1" : hosts, - ports.empty() ? 3306 : std::stoi(ports), - user, password, schema, catalog_path - ); + mysql_harvester = new Static_Harvester("127.0.0.1", 3306, "", "", "", catalog_path); + pgsql_harvester = new PgSQL_Static_Harvester("127.0.0.1", 5432, "", "", "", catalog_path); - proxy_debug(PROXY_DEBUG_GENERIC, 3, "Query_Tool_Handler created with Discovery_Schema\n"); + proxy_debug(PROXY_DEBUG_GENERIC, 3, "Query_Tool_Handler created with Discovery_Schema and protocol harvesters\n"); } Query_Tool_Handler::~Query_Tool_Handler() { @@ -315,9 +284,14 @@ Query_Tool_Handler::~Query_Tool_Handler() { catalog = NULL; } - if (harvester) { - delete harvester; - harvester = NULL; + if (mysql_harvester) { + delete mysql_harvester; + mysql_harvester = NULL; + } + + if (pgsql_harvester) { + delete pgsql_harvester; + pgsql_harvester = NULL; } pthread_mutex_destroy(&pool_lock); @@ -332,9 +306,13 @@ int Query_Tool_Handler::init() { return -1; } - // Initialize harvester (but don't connect yet) - if (harvester->init()) { - proxy_error("Query_Tool_Handler: Failed to initialize Static_Harvester\n"); + // Initialize protocol-specific harvesters (lazy backend connect). 
+ if (mysql_harvester->init()) { + proxy_error("Query_Tool_Handler: Failed to initialize MySQL Static_Harvester\n"); + return -1; + } + if (pgsql_harvester->init()) { + proxy_error("Query_Tool_Handler: Failed to initialize PgSQL Static_Harvester\n"); return -1; } @@ -344,7 +322,7 @@ int Query_Tool_Handler::init() { return -1; } - proxy_info("Query_Tool_Handler initialized with Discovery_Schema and Static_Harvester\n"); + proxy_info("Query_Tool_Handler initialized with Discovery_Schema and protocol harvesters\n"); return 0; } @@ -1381,9 +1359,9 @@ json Query_Tool_Handler::get_tool_list() { // ============================================================ tools.push_back(create_tool_schema( "discovery.run_static", - "Trigger ProxySQL to perform static metadata harvest from MySQL INFORMATION_SCHEMA for a single schema. Returns the new run_id for subsequent LLM analysis.", - {"schema_filter"}, - {{"notes", "string"}} + "Trigger ProxySQL to perform static metadata harvest for a specific logical target. target_id is required and protocol-aware (mysql/pgsql).", + {"target_id"}, + {{"schema_filter", "string"}, {"notes", "string"}} )); // ============================================================ @@ -1399,28 +1377,28 @@ json Query_Tool_Handler::get_tool_list() { tools.push_back(create_tool_schema( "catalog.search", "Full-text search over discovered objects (tables/views/routines) using FTS5. 
Returns ranked object_keys and basic metadata.", - {"run_id", "query"}, + {"target_id", "run_id", "query"}, {{"limit", "integer"}, {"object_type", "string"}, {"schema_name", "string"}} )); tools.push_back(create_tool_schema( "catalog.get_object", "Fetch a discovered object and its columns/indexes/foreign keys by object_key (schema.object) or by object_id.", - {"run_id"}, + {"target_id", "run_id"}, {{"object_id", "integer"}, {"object_key", "string"}, {"include_definition", "boolean"}, {"include_profiles", "boolean"}} )); tools.push_back(create_tool_schema( "catalog.list_objects", "List objects (paged) for a run, optionally filtered by schema/type, ordered by name or size/rows estimate.", - {"run_id"}, + {"target_id", "run_id"}, {{"schema_name", "string"}, {"object_type", "string"}, {"order_by", "string"}, {"page_size", "integer"}, {"page_token", "string"}} )); tools.push_back(create_tool_schema( "catalog.get_relationships", "Get relationships for a given object: foreign keys, view deps, inferred relationships (deterministic + LLM).", - {"run_id"}, + {"target_id", "run_id"}, {{"object_id", "integer"}, {"object_key", "string"}, {"include_inferred", "boolean"}, {"min_confidence", "number"}} )); @@ -1430,7 +1408,7 @@ json Query_Tool_Handler::get_tool_list() { tools.push_back(create_tool_schema( "agent.run_start", "Create a new LLM agent run bound to a deterministic discovery run_id.", - {"run_id", "model_name"}, + {"target_id", "run_id", "model_name"}, {{"prompt_hash", "string"}, {"budget", "object"}} )); @@ -1454,63 +1432,63 @@ json Query_Tool_Handler::get_tool_list() { tools.push_back(create_tool_schema( "llm.summary_upsert", "Upsert a structured semantic summary for an object (table/view/routine). 
This is the main LLM 'memory' per object.", - {"agent_run_id", "run_id", "object_id", "summary"}, + {"target_id", "agent_run_id", "run_id", "object_id", "summary"}, {{"confidence", "number"}, {"status", "string"}, {"sources", "object"}} )); tools.push_back(create_tool_schema( "llm.summary_get", "Get the LLM semantic summary for an object, optionally for a specific agent_run_id.", - {"run_id", "object_id"}, + {"target_id", "run_id", "object_id"}, {{"agent_run_id", "integer"}, {"latest", "boolean"}} )); tools.push_back(create_tool_schema( "llm.relationship_upsert", "Upsert an LLM-inferred relationship (join edge) between objects/columns with confidence and evidence.", - {"agent_run_id", "run_id", "child_object_id", "child_column", "parent_object_id", "parent_column", "confidence"}, + {"target_id", "agent_run_id", "run_id", "child_object_id", "child_column", "parent_object_id", "parent_column", "confidence"}, {{"rel_type", "string"}, {"evidence", "object"}} )); tools.push_back(create_tool_schema( "llm.domain_upsert", "Create or update a domain (cluster) like 'billing' and its description.", - {"agent_run_id", "run_id", "domain_key"}, + {"target_id", "agent_run_id", "run_id", "domain_key"}, {{"title", "string"}, {"description", "string"}, {"confidence", "number"}} )); tools.push_back(create_tool_schema( "llm.domain_set_members", "Replace members of a domain with a provided list of object_ids and optional roles/confidences.", - {"agent_run_id", "run_id", "domain_key", "members"}, + {"target_id", "agent_run_id", "run_id", "domain_key", "members"}, {} )); tools.push_back(create_tool_schema( "llm.metric_upsert", "Upsert a metric/KPI definition with optional SQL template and dependencies.", - {"agent_run_id", "run_id", "metric_key", "title"}, + {"target_id", "agent_run_id", "run_id", "metric_key", "title"}, {{"description", "string"}, {"domain_key", "string"}, {"grain", "string"}, {"unit", "string"}, {"sql_template", "string"}, {"depends", "object"}, {"confidence", 
"number"}} )); tools.push_back(create_tool_schema( "llm.question_template_add", "Add a question template (NL) mapped to a structured query plan. Extract table/view names from example_sql and populate related_objects. agent_run_id is optional - if not provided, uses the last agent run for the schema.", - {"run_id", "title", "question_nl", "template"}, + {"target_id", "run_id", "title", "question_nl", "template"}, {{"agent_run_id", "integer"}, {"example_sql", "string"}, {"related_objects", "array"}, {"confidence", "number"}} )); tools.push_back(create_tool_schema( "llm.note_add", "Add a durable free-form note (global/schema/object/domain scoped) for the agent memory.", - {"agent_run_id", "run_id", "scope", "body"}, + {"target_id", "agent_run_id", "run_id", "scope", "body"}, {{"object_id", "integer"}, {"domain_key", "string"}, {"title", "string"}, {"tags", "array"}} )); tools.push_back(create_tool_schema( "llm.search", "Full-text search across LLM artifacts. For question_templates, returns example_sql, related_objects, template_json, and confidence. Use include_objects=true with a non-empty query to get full object schema details (for search mode only). 
Empty query (list mode) returns only templates without objects to avoid huge responses.", - {"run_id"}, + {"target_id", "run_id"}, {{"query", "string"}, {"limit", "integer"}, {"include_objects", "boolean"}} )); @@ -1544,10 +1522,13 @@ json Query_Tool_Handler::get_tool_description(const std::string& tool_name) { * Returns "(no schema)" for tools without schema context */ static std::string extract_schema_name(const std::string& tool_name, const json& arguments, Discovery_Schema* catalog) { + (void)tool_name; + std::string target_id = json_string(arguments, "target_id"); + // Tools that use run_id (can be resolved to schema) - if (arguments.contains("run_id")) { + if (arguments.contains("run_id") && !target_id.empty()) { std::string run_id_str = json_string(arguments, "run_id"); - int run_id = catalog->resolve_run_id(run_id_str); + int run_id = catalog->resolve_run_id(target_id, run_id_str); if (run_id > 0) { // Look up schema name from catalog char* error = NULL; @@ -1629,21 +1610,26 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& std::string target_id = json_string(arguments, "target_id"); std::string page_token = json_string(arguments, "page_token"); int page_size = json_int(arguments, "page_size", 50); - if (!target_id.empty()) { - refresh_target_registry(); - const QueryTarget* target = resolve_target(target_id); - if (target == NULL) { - return create_error_response("Unknown target_id: " + target_id); - } + refresh_target_registry(); + std::string resolved_target_id = target_id.empty() ? 
default_target_id : target_id; + if (resolved_target_id.empty()) { + return create_error_response("target_id is required because no default target is available"); + } + const QueryTarget* target = resolve_target(resolved_target_id); + if (target == NULL) { + return create_error_response("Unknown target_id: " + resolved_target_id); } - // Query catalog's schemas table instead of live database + // Query catalog schemas for the resolved target only. char* error = NULL; int cols = 0, affected = 0; SQLite3_result* resultset = NULL; std::ostringstream sql; - sql << "SELECT DISTINCT schema_name FROM schemas ORDER BY schema_name"; + sql << "SELECT DISTINCT s.schema_name" + << " FROM schemas s JOIN runs r ON r.run_id=s.run_id" + << " WHERE r.target_id='" << escape_string_literal(resolved_target_id) << "'" + << " ORDER BY s.schema_name"; if (page_size > 0) { sql << " LIMIT " << page_size; if (!page_token.empty()) { @@ -1755,31 +1741,91 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& // DISCOVERY TOOLS // ============================================================ else if (tool_name == "discovery.run_static") { - if (!harvester) { - result = create_error_response("Static harvester not configured"); + std::string target_id = json_string(arguments, "target_id"); + std::string schema_filter = json_string(arguments, "schema_filter"); + std::string notes = json_string(arguments, "notes", "Static discovery harvest"); + + if (target_id.empty()) { + result = create_error_response("target_id is required"); } else { - std::string schema_filter = json_string(arguments, "schema_filter"); - if (schema_filter.empty()) { - result = create_error_response("schema_filter is required and must not be empty"); + refresh_target_registry(); + const QueryTarget* target = resolve_target(target_id); + if (target == NULL) { + result = create_error_response("Unknown target_id: " + target_id); + } else if (!target->executable) { + result = 
create_error_response(format_target_unavailable_error(target_id)); } else { - std::string notes = json_string(arguments, "notes", "Static discovery harvest"); - - int run_id = harvester->run_full_harvest(schema_filter, notes); - if (run_id < 0) { - result = create_error_response("Static discovery failed"); + int run_id = -1; + if (target->protocol == "pgsql") { + if (!pgsql_harvester) { + result = create_error_response("PgSQL static harvester not configured"); + } else { + PgSQL_Static_Harvester harvester( + target->host, + target->port > 0 ? target->port : 5432, + target->db_username, + target->db_password, + target->default_schema, + catalog->get_db_path() + ); + if (harvester.init()) { + result = create_error_response("Failed to initialize PgSQL static harvester"); + } else { + run_id = harvester.run_full_harvest(target->target_id, schema_filter, notes); + if (run_id >= 0) { + std::string stats_str = harvester.get_harvest_stats(run_id); + try { + json stats = json::parse(stats_str); + stats["target_id"] = target->target_id; + stats["protocol"] = target->protocol; + result = create_success_response(stats); + } catch (...) { + json stats; + stats["run_id"] = run_id; + stats["target_id"] = target->target_id; + stats["protocol"] = target->protocol; + result = create_success_response(stats); + } + } + } + } } else { - // Get stats using the run_id (after finish_run() has reset current_run_id) - std::string stats_str = harvester->get_harvest_stats(run_id); - json stats; - try { - stats = json::parse(stats_str); - } catch (...) { - stats["run_id"] = run_id; + if (!mysql_harvester) { + result = create_error_response("MySQL static harvester not configured"); + } else { + Static_Harvester harvester( + target->host, + target->port > 0 ? 
target->port : 3306, + target->db_username, + target->db_password, + target->default_schema, + catalog->get_db_path() + ); + if (harvester.init()) { + result = create_error_response("Failed to initialize MySQL static harvester"); + } else { + run_id = harvester.run_full_harvest(target->target_id, schema_filter, notes); + if (run_id >= 0) { + std::string stats_str = harvester.get_harvest_stats(run_id); + try { + json stats = json::parse(stats_str); + stats["target_id"] = target->target_id; + stats["protocol"] = target->protocol; + result = create_success_response(stats); + } catch (...) { + json stats; + stats["run_id"] = run_id; + stats["target_id"] = target->target_id; + stats["protocol"] = target->protocol; + result = create_success_response(stats); + } + } + } } + } - stats["started_at"] = ""; - stats["mysql_version"] = ""; - result = create_success_response(stats); + if (run_id < 0) { + result = create_error_response("Static discovery failed"); } } } @@ -1801,21 +1847,24 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& } else if (tool_name == "catalog.search") { + std::string target_id = json_string(arguments, "target_id"); std::string run_id_or_schema = json_string(arguments, "run_id"); std::string query = json_string(arguments, "query"); int limit = json_int(arguments, "limit", 25); std::string object_type = json_string(arguments, "object_type"); std::string schema_name = json_string(arguments, "schema_name"); - if (run_id_or_schema.empty()) { + if (target_id.empty()) { + result = create_error_response("target_id is required"); + } else if (run_id_or_schema.empty()) { result = create_error_response("run_id is required"); } else if (query.empty()) { result = create_error_response("query is required"); } else { // Resolve schema name to run_id if needed - int run_id = catalog->resolve_run_id(run_id_or_schema); + int run_id = catalog->resolve_run_id(target_id, run_id_or_schema); if (run_id < 0) { - result = 
create_error_response("Invalid run_id or schema not found: " + run_id_or_schema); + result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema); } else { std::string search_results = catalog->fts_search(run_id, query, limit, object_type, schema_name); try { @@ -1828,19 +1877,22 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& } else if (tool_name == "catalog.get_object") { + std::string target_id = json_string(arguments, "target_id"); std::string run_id_or_schema = json_string(arguments, "run_id"); int object_id = json_int(arguments, "object_id", -1); std::string object_key = json_string(arguments, "object_key"); bool include_definition = json_int(arguments, "include_definition", 0) != 0; bool include_profiles = json_int(arguments, "include_profiles", 1) != 0; - if (run_id_or_schema.empty()) { + if (target_id.empty()) { + result = create_error_response("target_id is required"); + } else if (run_id_or_schema.empty()) { result = create_error_response("run_id is required"); } else { // Resolve schema name to run_id if needed - int run_id = catalog->resolve_run_id(run_id_or_schema); + int run_id = catalog->resolve_run_id(target_id, run_id_or_schema); if (run_id < 0) { - result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema); + result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema); } else { std::string schema_name, object_name; if (!object_key.empty()) { @@ -1870,6 +1922,7 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& } else if (tool_name == "catalog.list_objects") { + std::string target_id = json_string(arguments, "target_id"); std::string run_id_or_schema = json_string(arguments, "run_id"); std::string schema_name = json_string(arguments, "schema_name"); std::string object_type = json_string(arguments, "object_type"); @@ -1877,13 
+1930,15 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& int page_size = json_int(arguments, "page_size", 50); std::string page_token = json_string(arguments, "page_token"); - if (run_id_or_schema.empty()) { + if (target_id.empty()) { + result = create_error_response("target_id is required"); + } else if (run_id_or_schema.empty()) { result = create_error_response("run_id is required"); } else { // Resolve schema name to run_id if needed - int run_id = catalog->resolve_run_id(run_id_or_schema); + int run_id = catalog->resolve_run_id(target_id, run_id_or_schema); if (run_id < 0) { - result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema); + result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema); } else { std::string list_result = catalog->list_objects( run_id, schema_name, object_type, order_by, page_size, page_token @@ -1898,19 +1953,22 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& } else if (tool_name == "catalog.get_relationships") { + std::string target_id = json_string(arguments, "target_id"); std::string run_id_or_schema = json_string(arguments, "run_id"); int object_id = json_int(arguments, "object_id", -1); std::string object_key = json_string(arguments, "object_key"); bool include_inferred = json_int(arguments, "include_inferred", 1) != 0; double min_confidence = json_double(arguments, "min_confidence", 0.0); - if (run_id_or_schema.empty()) { + if (target_id.empty()) { + result = create_error_response("target_id is required"); + } else if (run_id_or_schema.empty()) { result = create_error_response("run_id is required"); } else { // Resolve schema name to run_id if needed - int run_id = catalog->resolve_run_id(run_id_or_schema); + int run_id = catalog->resolve_run_id(target_id, run_id_or_schema); if (run_id < 0) { - result = create_error_response("Invalid run_id or schema not found: " + 
run_id_or_schema); + result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema); } else { // Resolve object_key to object_id if needed if (object_id < 0 && !object_key.empty()) { @@ -1963,6 +2021,7 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& // AGENT TOOLS // ============================================================ else if (tool_name == "agent.run_start") { + std::string target_id = json_string(arguments, "target_id"); std::string run_id_or_schema = json_string(arguments, "run_id"); std::string model_name = json_string(arguments, "model_name"); std::string prompt_hash = json_string(arguments, "prompt_hash"); @@ -1972,15 +2031,17 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& budget_json = arguments["budget"].dump(); } - if (run_id_or_schema.empty()) { + if (target_id.empty()) { + result = create_error_response("target_id is required"); + } else if (run_id_or_schema.empty()) { result = create_error_response("run_id is required"); } else if (model_name.empty()) { result = create_error_response("model_name is required"); } else { // Resolve schema name to run_id if needed - int run_id = catalog->resolve_run_id(run_id_or_schema); + int run_id = catalog->resolve_run_id(target_id, run_id_or_schema); if (run_id < 0) { - result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema); + result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema); } else { int agent_run_id = catalog->create_agent_run(run_id, model_name, prompt_hash, budget_json); if (agent_run_id < 0) { @@ -2048,6 +2109,7 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& // LLM MEMORY TOOLS // ============================================================ else if (tool_name == "llm.summary_upsert") { + std::string target_id = json_string(arguments, 
"target_id"); int agent_run_id = json_int(arguments, "agent_run_id"); std::string run_id_or_schema = json_string(arguments, "run_id"); int object_id = json_int(arguments, "object_id"); @@ -2065,15 +2127,17 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& sources_json = arguments["sources"].dump(); } - if (agent_run_id <= 0 || run_id_or_schema.empty() || object_id <= 0) { + if (target_id.empty()) { + result = create_error_response("target_id is required"); + } else if (agent_run_id <= 0 || run_id_or_schema.empty() || object_id <= 0) { result = create_error_response("agent_run_id, run_id, and object_id are required"); } else if (summary_json.empty()) { result = create_error_response("summary is required"); } else { // Resolve schema name to run_id if needed - int run_id = catalog->resolve_run_id(run_id_or_schema); + int run_id = catalog->resolve_run_id(target_id, run_id_or_schema); if (run_id < 0) { - result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema); + result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema); } else { int rc = catalog->upsert_llm_summary( agent_run_id, run_id, object_id, summary_json, @@ -2092,18 +2156,21 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& } else if (tool_name == "llm.summary_get") { + std::string target_id = json_string(arguments, "target_id"); std::string run_id_or_schema = json_string(arguments, "run_id"); int object_id = json_int(arguments, "object_id"); int agent_run_id = json_int(arguments, "agent_run_id", -1); bool latest = json_int(arguments, "latest", 1) != 0; - if (run_id_or_schema.empty() || object_id <= 0) { + if (target_id.empty()) { + result = create_error_response("target_id is required"); + } else if (run_id_or_schema.empty() || object_id <= 0) { result = create_error_response("run_id and object_id are required"); } else { // Resolve schema name to 
run_id if needed - int run_id = catalog->resolve_run_id(run_id_or_schema); + int run_id = catalog->resolve_run_id(target_id, run_id_or_schema); if (run_id < 0) { - result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema); + result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema); } else { std::string sum_result = catalog->get_llm_summary(run_id, object_id, agent_run_id, latest); try { @@ -2121,6 +2188,7 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& } else if (tool_name == "llm.relationship_upsert") { + std::string target_id = json_string(arguments, "target_id"); int agent_run_id = json_int(arguments, "agent_run_id"); std::string run_id_or_schema = json_string(arguments, "run_id"); int child_object_id = json_int(arguments, "child_object_id"); @@ -2135,15 +2203,17 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& evidence_json = arguments["evidence"].dump(); } - if (agent_run_id <= 0 || run_id_or_schema.empty() || child_object_id <= 0 || parent_object_id <= 0) { + if (target_id.empty()) { + result = create_error_response("target_id is required"); + } else if (agent_run_id <= 0 || run_id_or_schema.empty() || child_object_id <= 0 || parent_object_id <= 0) { result = create_error_response("agent_run_id, run_id, child_object_id, and parent_object_id are required"); } else if (child_column.empty() || parent_column.empty()) { result = create_error_response("child_column and parent_column are required"); } else { // Resolve schema name to run_id if needed - int run_id = catalog->resolve_run_id(run_id_or_schema); + int run_id = catalog->resolve_run_id(target_id, run_id_or_schema); if (run_id < 0) { - result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema); + result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + 
run_id_or_schema); } else { int rc = catalog->upsert_llm_relationship( agent_run_id, run_id, child_object_id, child_column, @@ -2161,6 +2231,7 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& } else if (tool_name == "llm.domain_upsert") { + std::string target_id = json_string(arguments, "target_id"); int agent_run_id = json_int(arguments, "agent_run_id"); std::string run_id_or_schema = json_string(arguments, "run_id"); std::string domain_key = json_string(arguments, "domain_key"); @@ -2168,13 +2239,15 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& std::string description = json_string(arguments, "description"); double confidence = json_double(arguments, "confidence", 0.6); - if (agent_run_id <= 0 || run_id_or_schema.empty() || domain_key.empty()) { + if (target_id.empty()) { + result = create_error_response("target_id is required"); + } else if (agent_run_id <= 0 || run_id_or_schema.empty() || domain_key.empty()) { result = create_error_response("agent_run_id, run_id, and domain_key are required"); } else { // Resolve schema name to run_id if needed - int run_id = catalog->resolve_run_id(run_id_or_schema); + int run_id = catalog->resolve_run_id(target_id, run_id_or_schema); if (run_id < 0) { - result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema); + result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema); } else { int domain_id = catalog->upsert_llm_domain( agent_run_id, run_id, domain_key, title, description, confidence @@ -2192,6 +2265,7 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& } else if (tool_name == "llm.domain_set_members") { + std::string target_id = json_string(arguments, "target_id"); int agent_run_id = json_int(arguments, "agent_run_id"); std::string run_id_or_schema = json_string(arguments, "run_id"); std::string domain_key = 
json_string(arguments, "domain_key"); @@ -2208,7 +2282,9 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& } } - if (agent_run_id <= 0 || run_id_or_schema.empty() || domain_key.empty()) { + if (target_id.empty()) { + result = create_error_response("target_id is required"); + } else if (agent_run_id <= 0 || run_id_or_schema.empty() || domain_key.empty()) { result = create_error_response("agent_run_id, run_id, and domain_key are required"); } else if (members_json.empty()) { proxy_error("llm.domain_set_members: members not provided or invalid type (got: %s)\n", @@ -2216,9 +2292,9 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& result = create_error_response("members array is required"); } else { // Resolve schema name to run_id if needed - int run_id = catalog->resolve_run_id(run_id_or_schema); + int run_id = catalog->resolve_run_id(target_id, run_id_or_schema); if (run_id < 0) { - result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema); + result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema); } else { proxy_debug(PROXY_DEBUG_GENERIC, 3, "llm.domain_set_members: setting members='%s'\n", members_json.c_str()); int rc = catalog->set_domain_members(agent_run_id, run_id, domain_key, members_json); @@ -2236,6 +2312,7 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& } else if (tool_name == "llm.metric_upsert") { + std::string target_id = json_string(arguments, "target_id"); int agent_run_id = json_int(arguments, "agent_run_id"); std::string run_id_or_schema = json_string(arguments, "run_id"); std::string metric_key = json_string(arguments, "metric_key"); @@ -2253,13 +2330,15 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& double confidence = json_double(arguments, "confidence", 0.6); - if (agent_run_id <= 0 || run_id_or_schema.empty() || 
metric_key.empty() || title.empty()) { + if (target_id.empty()) { + result = create_error_response("target_id is required"); + } else if (agent_run_id <= 0 || run_id_or_schema.empty() || metric_key.empty() || title.empty()) { result = create_error_response("agent_run_id, run_id, metric_key, and title are required"); } else { // Resolve schema name to run_id if needed - int run_id = catalog->resolve_run_id(run_id_or_schema); + int run_id = catalog->resolve_run_id(target_id, run_id_or_schema); if (run_id < 0) { - result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema); + result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema); } else { int metric_id = catalog->upsert_llm_metric( agent_run_id, run_id, metric_key, title, description, domain_key, @@ -2278,6 +2357,7 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& } else if (tool_name == "llm.question_template_add") { + std::string target_id = json_string(arguments, "target_id"); int agent_run_id = json_int(arguments, "agent_run_id", 0); // Optional, default 0 std::string run_id_or_schema = json_string(arguments, "run_id"); std::string title = json_string(arguments, "title"); @@ -2297,15 +2377,17 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& related_objects = arguments["related_objects"].dump(); } - if (run_id_or_schema.empty() || title.empty() || question_nl.empty()) { + if (target_id.empty()) { + result = create_error_response("target_id is required"); + } else if (run_id_or_schema.empty() || title.empty() || question_nl.empty()) { result = create_error_response("run_id, title, and question_nl are required"); } else if (template_json.empty()) { result = create_error_response("template is required"); } else { // Resolve schema name to run_id if needed - int run_id = catalog->resolve_run_id(run_id_or_schema); + int run_id = 
catalog->resolve_run_id(target_id, run_id_or_schema); if (run_id < 0) { - result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema); + result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema); } else { // If agent_run_id not provided, get the last one for this run_id if (agent_run_id <= 0) { @@ -2336,6 +2418,7 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& } else if (tool_name == "llm.note_add") { + std::string target_id = json_string(arguments, "target_id"); int agent_run_id = json_int(arguments, "agent_run_id"); std::string run_id_or_schema = json_string(arguments, "run_id"); std::string scope = json_string(arguments, "scope"); @@ -2349,13 +2432,15 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& tags_json = arguments["tags"].dump(); } - if (agent_run_id <= 0 || run_id_or_schema.empty() || scope.empty() || body.empty()) { + if (target_id.empty()) { + result = create_error_response("target_id is required"); + } else if (agent_run_id <= 0 || run_id_or_schema.empty() || scope.empty() || body.empty()) { result = create_error_response("agent_run_id, run_id, scope, and body are required"); } else { // Resolve schema name to run_id if needed - int run_id = catalog->resolve_run_id(run_id_or_schema); + int run_id = catalog->resolve_run_id(target_id, run_id_or_schema); if (run_id < 0) { - result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema); + result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema); } else { int note_id = catalog->add_llm_note( agent_run_id, run_id, scope, object_id, domain_key, title, body, tags_json @@ -2372,18 +2457,21 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& } else if (tool_name == "llm.search") { + std::string target_id = json_string(arguments, 
"target_id"); std::string run_id_or_schema = json_string(arguments, "run_id"); std::string query = json_string(arguments, "query"); int limit = json_int(arguments, "limit", 25); bool include_objects = json_int(arguments, "include_objects", 0) != 0; - if (run_id_or_schema.empty()) { + if (target_id.empty()) { + result = create_error_response("target_id is required"); + } else if (run_id_or_schema.empty()) { result = create_error_response("run_id is required"); } else { // Resolve schema name to run_id if needed - int run_id = catalog->resolve_run_id(run_id_or_schema); + int run_id = catalog->resolve_run_id(target_id, run_id_or_schema); if (run_id < 0) { - result = create_error_response("Invalid run_id or schema not found: " + run_id_or_schema); + result = create_error_response("Invalid run_id or schema not found for target_id " + target_id + ": " + run_id_or_schema); } else { // Log the search query catalog->log_llm_search(run_id, query, limit); @@ -2513,7 +2601,7 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& uint64_t digest = Discovery_Schema::compute_mcp_digest(tool_name, arguments); std::string digest_text = Discovery_Schema::fingerprint_mcp_args(arguments); unsigned long long duration = monotonic_time() - start_time; - int digest_run_id = schema.empty() ? 0 : catalog->resolve_run_id(schema); + int digest_run_id = schema.empty() ? 
0 : catalog->resolve_run_id(target->target_id, schema); catalog->update_mcp_query_digest( tool_name, digest_run_id, @@ -2654,8 +2742,11 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& // Log tool invocation to catalog int run_id = 0; std::string run_id_str = json_string(arguments, "run_id"); + std::string run_target_id = json_string(arguments, "target_id"); if (!run_id_str.empty()) { - run_id = catalog->resolve_run_id(run_id_str); + if (!run_target_id.empty()) { + run_id = catalog->resolve_run_id(run_target_id, run_id_str); + } } // Extract error message if present diff --git a/lib/Static_Harvester.cpp b/lib/Static_Harvester.cpp index 01aa4af28..aadd62053 100644 --- a/lib/Static_Harvester.cpp +++ b/lib/Static_Harvester.cpp @@ -389,7 +389,7 @@ std::string Static_Harvester::escape_sql_string(const std::string& str) { // - Only one run can be active at a time per harvester instance // - Automatically connects to MySQL if not already connected // - Records source DSN and MySQL version in the run metadata -int Static_Harvester::start_run(const std::string& notes) { +int Static_Harvester::start_run(const std::string& target_id, const std::string& notes) { if (current_run_id >= 0) { proxy_error("Static_Harvester: Run already active (run_id=%d)\n", current_run_id); return -1; @@ -399,7 +399,7 @@ int Static_Harvester::start_run(const std::string& notes) { return -1; } - current_run_id = catalog->create_run(source_dsn, mysql_version, notes); + current_run_id = catalog->create_run(target_id, "mysql", source_dsn, mysql_version, notes); if (current_run_id < 0) { proxy_error("Static_Harvester: Failed to create run\n"); return -1; @@ -1278,8 +1278,8 @@ int Static_Harvester::rebuild_fts_index() { // // Returns: // run_id on success, -1 on error -int Static_Harvester::run_full_harvest(const std::string& only_schema, const std::string& notes) { - if (start_run(notes) < 0) { +int Static_Harvester::run_full_harvest(const std::string& target_id, const 
std::string& only_schema, const std::string& notes) { + if (start_run(target_id, notes) < 0) { return -1; } diff --git a/scripts/mcp/README.md b/scripts/mcp/README.md index 86344c74b..304d88f67 100644 --- a/scripts/mcp/README.md +++ b/scripts/mcp/README.md @@ -1,6 +1,6 @@ # MCP Module Testing Suite -This directory contains scripts to test the ProxySQL MCP (Model Context Protocol) module with MySQL connection pool and exploration tools. +This directory contains scripts to test the ProxySQL MCP (Model Context Protocol) module with target-profile routing and exploration tools across MySQL and PostgreSQL backends. ## Table of Contents @@ -11,6 +11,8 @@ This directory contains scripts to test the ProxySQL MCP (Model Context Protocol 5. [Detailed Documentation](#detailed-documentation) 6. [Troubleshooting](#troubleshooting) +> Note: parts of this README still show legacy single-MySQL examples (`mcp-mysql_*`). The current MCP routing model uses target/auth profiles (`runtime_mcp_target_profiles`, `runtime_mcp_auth_profiles`) and `target_id` in query/discovery/catalog/llm workflows. + --- ## Architecture Overview @@ -32,7 +34,7 @@ MCP (Model Context Protocol) is a JSON-RPC 2.0 protocol that allows AI/LLM appli │ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ ProxySQL Admin Interface (Port 6032) │ │ -│ │ Configure: mcp-enabled, mcp-mysql_hosts, mcp-port, etc. │ │ +│ │ Configure: mcp-enabled, mcp-port, MCP profiles, etc. 
│ │ │ └──────────────────────────┬──────────────────────────────────┘ │ │ │ │ │ ┌──────────────────────────▼──────────────────────────────────┐ │ @@ -151,7 +153,7 @@ Where: | **Relationships** | `suggest_joins`, `find_reference_candidates` | Infer table relationships | | **Profiling** | `table_profile`, `column_profile` | Analyze data distributions and statistics | | **Catalog** | `catalog_upsert`, `catalog_get`, `catalog_search`, `catalog_delete`, `catalog_list`, `catalog_merge` | Store/retrieve LLM discoveries | -| **Discovery** | `discovery.run_static` | Run Phase 1 of two-phase discovery | +| **Discovery** | `discovery.run_static` | Run Phase 1 of two-phase discovery for a specific `target_id` | | **Agent Coordination** | `agent.run_start`, `agent.run_finish`, `agent.event_append` | Coordinate LLM agent discovery runs | | **LLM Interaction** | `llm.summary_upsert`, `llm.summary_get`, `llm.relationship_upsert`, `llm.domain_upsert`, `llm.domain_set_members`, `llm.metric_upsert`, `llm.question_template_add`, `llm.note_add`, `llm.search` | Store and retrieve LLM-generated insights | | **RAG** | `rag.search_fts`, `rag.search_vector`, `rag.search_hybrid`, `rag.get_chunks`, `rag.get_docs`, `rag.fetch_from_source`, `rag.admin.stats` | Retrieval-Augmented Generation tools | @@ -179,11 +181,8 @@ Where: | `mcp-cache_endpoint_auth` | (empty) | Auth token for /cache endpoint | | `mcp-ai_endpoint_auth` | (empty) | Auth token for /ai endpoint | | `mcp-timeout_ms` | 30000 | Query timeout in milliseconds | -| `mcp-mysql_hosts` | 127.0.0.1 | MySQL server(s) for tool execution | -| `mcp-mysql_ports` | 3306 | MySQL port(s) | -| `mcp-mysql_user` | (empty) | MySQL username for connections | -| `mcp-mysql_password` | (empty) | MySQL password for connections | -| `mcp-mysql_schema` | (empty) | Default schema for connections | +| `runtime_mcp_target_profiles` | n/a | Logical targets (`target_id`, protocol, hostgroup, auth profile) | +| `runtime_mcp_auth_profiles` | n/a | Backend 
credentials bound to target profiles | **RAG Configuration Variables:** @@ -219,11 +218,11 @@ Where: - **Observe_Tool_Handler** - Monitoring and metrics - **AI_Tool_Handler** - AI and LLM features -### 3. MySQL Connection Pools +### 3. Protocol-Aware Connection Pools **Location:** Each Tool_Handler manages its own connection pool -**Purpose:** Manages reusable connections to backend MySQL servers for tool execution. +**Purpose:** Manages reusable connections to backend MySQL/PostgreSQL targets for tool execution. **Features:** - Thread-safe connection pooling with `pthread_mutex_t` diff --git a/scripts/mcp/STDIO_BRIDGE_README.md b/scripts/mcp/STDIO_BRIDGE_README.md index 9feee0a84..34f22167a 100644 --- a/scripts/mcp/STDIO_BRIDGE_README.md +++ b/scripts/mcp/STDIO_BRIDGE_README.md @@ -98,13 +98,13 @@ Once connected, the following tools will be available in Claude Code: - `find_reference_candidates` - Find potential foreign key relationships ### Two-Phase Discovery Tools -- `discovery.run_static` - Run Phase 1 of two-phase discovery (static harvest) -- `agent.run_start` - Start a new agent run for discovery coordination +- `discovery.run_static` - Run Phase 1 of two-phase discovery (static harvest), requires `target_id` +- `agent.run_start` - Start a new agent run for discovery coordination, requires `target_id` + `run_id` - `agent.run_finish` - Mark an agent run as completed - `agent.event_append` - Append an event to an agent run ### LLM Interaction Tools -- `llm.summary_upsert` - Store or update a table/column summary generated by LLM +- `llm.summary_upsert` - Store or update a table/column summary generated by LLM (`target_id` + `run_id`) - `llm.summary_get` - Retrieve LLM-generated summary for a table or column - `llm.relationship_upsert` - Store or update an inferred relationship between tables - `llm.domain_upsert` - Store or update a business domain classification @@ -114,6 +114,8 @@ Once connected, the following tools will be available in Claude Code: - 
`llm.note_add` - Add a general note or insight about the data - `llm.search` - Search LLM-generated content and insights +Note: catalog/agent/llm tools are target-scoped. Always include `target_id` when a tool accepts `run_id`. + ### Catalog Tools - `catalog_upsert` - Store data in the catalog - `catalog_get` - Retrieve from the catalog