diff --git a/doc/MCP/Architecture.md b/doc/MCP/Architecture.md index ad8a0883f..0bcd5987e 100644 --- a/doc/MCP/Architecture.md +++ b/doc/MCP/Architecture.md @@ -175,6 +175,7 @@ Each MCP endpoint has its own dedicated tool handler with specific tools designe **Purpose**: Safe database exploration and query execution **Tools**: +- `list_targets` - List logical backend targets (hostgroup-backed routing handles) - `list_schemas` - List databases - `list_tables` - List tables in schema - `describe_table` - Get table structure @@ -202,9 +203,37 @@ Each MCP endpoint has its own dedicated tool handler with specific tools designe - Data analysis and discovery - Query optimization assistance - Two-phase discovery (static harvest + LLM analysis) +- Multi-backend routing via opaque `target_id` values instead of direct host/protocol details **Authentication**: `mcp-query_endpoint_auth` (Bearer token) +#### Query Target Routing Model + +`/mcp/query` now supports a dynamic routing model based on logical targets: + +- Clients call `list_targets` to discover routable targets. +- Each target exposes: + - `target_id` (opaque identifier) + - `description` (human-readable summary) + - `capabilities` (for example `inventory`, `readonly_sql`, `explain`) +- Query tools (for example `run_sql_readonly`, `explain_sql`, `list_tables`) accept an optional `target_id`. +- If `target_id` is omitted, the server uses a default executable target when available. + +Internally, the server resolves each `target_id` to runtime hostgroup metadata, a configured auth profile, and protocol-specific execution paths. + +Credential separation model: + +- MCP endpoint authentication (Bearer token) identifies and authorizes the MCP client. +- Backend database credentials are server-managed in MCP runtime profile tables: + - `runtime_mcp_target_profiles` + - `runtime_mcp_auth_profiles` +- Query execution pools are keyed by `target_id + auth_profile_id`. + +Current execution support: + +- `mysql` targets: `list_tables`, `run_sql_readonly`, `explain_sql` +- `pgsql` targets: `list_tables`, `run_sql_readonly`, `explain_sql` + --- #### `/mcp/admin` - Administration Endpoint diff --git a/doc/MCP/Tool_Discovery_Guide.md b/doc/MCP/Tool_Discovery_Guide.md index 113af68f4..a95a8d218 100644 --- a/doc/MCP/Tool_Discovery_Guide.md +++ b/doc/MCP/Tool_Discovery_Guide.md @@ -90,12 +90,23 @@ curl -k -X POST "https://127.0.0.1:6071/mcp/query?token=YOUR_TOKEN" \ ### Inventory Tools +#### list_targets +List logical query targets. Each target identifier maps internally to a ProxySQL hostgroup and routing policy. + +Notes: +- Targets are loaded from server-side runtime profile tables and include backend auth mapping. +- MCP clients only use `target_id`; backend credentials are never sent in tool calls. + +**Parameters:** +- None + #### list_schemas List all available schemas/databases. **Parameters:** - `page_token` (string, optional) - Pagination token - `page_size` (integer, optional) - Results per page (default: 50) +- `target_id` (string, optional) - Logical query target identifier #### list_tables List tables in a schema. @@ -105,6 +116,10 @@ List tables in a schema. - `page_token` (string, optional) - Pagination token - `page_size` (integer, optional) - Results per page (default: 50) - `name_filter` (string, optional) - Filter table names by pattern +- `target_id` (string, optional) - Logical query target identifier + +**Routing behavior:** +- Uses `SHOW TABLES` for MySQL targets and `information_schema.tables` for PostgreSQL targets. ### Structure Tools @@ -171,9 +186,14 @@ Execute a read-only SQL query with safety guardrails enforced. **Parameters:** - `sql` (string, **required**) - SQL query to execute +- `target_id` (string, optional) - Logical query target identifier - `max_rows` (integer, optional) - Maximum rows to return (default: 200) - `timeout_sec` (integer, optional) - Query timeout (default: 2) +**Routing behavior:** +- If `target_id` points to a MySQL target, query executes with MySQL protocol. +- If `target_id` points to a PostgreSQL target, query executes with PostgreSQL protocol. + **Safety rules:** - Must start with SELECT - No dangerous keywords (DROP, DELETE, INSERT, UPDATE, etc.) @@ -184,6 +204,10 @@ Explain a query execution plan using EXPLAIN or EXPLAIN ANALYZE. **Parameters:** - `sql` (string, **required**) - SQL query to explain +- `target_id` (string, optional) - Logical query target identifier + +**Routing behavior:** +- Uses protocol-specific execution based on `target_id` (`mysql` or `pgsql` target). ### Relationship Inference Tools diff --git a/doc/MCP/VARIABLES.md b/doc/MCP/VARIABLES.md index ceede8c04..0c84ae5cf 100644 --- a/doc/MCP/VARIABLES.md +++ b/doc/MCP/VARIABLES.md @@ -121,6 +121,7 @@ The following variables control authentication (Bearer tokens) for specific MCP The Query Tool Handler provides LLM-based tools for MySQL database exploration and two-phase discovery, including: - **inventory** - List databases and tables +- **targets** - Discover logical routing targets (`list_targets`) - **structure** - Get table schema - **profiling** - Analyze query performance - **sampling** - Sample table data @@ -131,10 +132,29 @@ The Query Tool Handler provides LLM-based tools for MySQL database exploration a - **agent** - Agent coordination tools - **llm** - LLM interaction tools +### Dynamic Target Discovery and Routing + +Query tools use a logical `target_id` routing model with server-managed credentials: + +- Use `list_targets` to retrieve discoverable backend targets. +- Active targets come from `runtime_mcp_target_profiles` joined with `runtime_mcp_auth_profiles`. +- The MCP server maps `target_id -> auth_profile_id` and applies backend credentials internally. +- MCP clients must never send backend credentials in tool arguments. +- Clients should pass `target_id` to query tools instead of host/protocol details. + +### Backend Credential Model + +Backend credentials are defined in MCP tables, not in client requests: + +- `mcp_auth_profiles` / `runtime_mcp_auth_profiles` +- `mcp_target_profiles` / `runtime_mcp_target_profiles` + +The in-memory target/auth map is loaded by `MCP_Threads_Handler` from runtime tables and used by the query executor connection pools. + #### `mcp-mysql_hosts` - **Type:** String (comma-separated) - **Default:** `"127.0.0.1"` -- **Description:** Comma-separated list of MySQL host addresses +- **Description:** Legacy POC variable used by non-routed components (for example static harvester defaults). Routed query execution uses MCP profile tables. - **Runtime:** Yes - **Example:** ```sql @@ -145,7 +165,7 @@ The Query Tool Handler provides LLM-based tools for MySQL database exploration a #### `mcp-mysql_ports` - **Type:** String (comma-separated) - **Default:** `"3306"` -- **Description:** Comma-separated list of MySQL ports (corresponds to `mcp-mysql_hosts`) +- **Description:** Legacy POC variable used by non-routed components (for example static harvester defaults). Routed query execution uses MCP profile tables. - **Runtime:** Yes - **Example:** ```sql @@ -156,7 +176,7 @@ The Query Tool Handler provides LLM-based tools for MySQL database exploration a #### `mcp-mysql_user` - **Type:** String - **Default:** `""` (empty) -- **Description:** MySQL username for tool handler connections +- **Description:** Legacy POC variable. Routed query execution uses credentials from `mcp_auth_profiles`. - **Runtime:** Yes - **Example:** ```sql @@ -167,7 +187,7 @@ The Query Tool Handler provides LLM-based tools for MySQL database exploration a #### `mcp-mysql_password` - **Type:** String - **Default:** `""` (empty) -- **Description:** MySQL password for tool handler connections +- **Description:** Legacy POC variable. Routed query execution uses credentials from `mcp_auth_profiles`. - **Runtime:** Yes - **Note:** Password is stored in plaintext in `global_variables`. Use restrictive MySQL user permissions. - **Example:** @@ -179,7 +199,7 @@ The Query Tool Handler provides LLM-based tools for MySQL database exploration a #### `mcp-mysql_schema` - **Type:** String - **Default:** `""` (empty) -- **Description:** Default database/schema to use for tool operations +- **Description:** Legacy POC variable. Routed query execution uses `default_schema` from `mcp_auth_profiles`. - **Runtime:** Yes - **Example:** ```sql @@ -222,6 +242,27 @@ LOAD MCP VARIABLES TO RUNTIME; SAVE MCP VARIABLES TO DISK; ``` +### Profile Commands + +Unified command family for MCP backend profiles (auth + target together): + +```sql +-- Disk -> Memory +LOAD MCP PROFILES FROM DISK; +LOAD MCP PROFILES TO MEMORY; + +-- Memory -> Runtime +LOAD MCP PROFILES TO RUNTIME; +LOAD MCP PROFILES FROM MEMORY; + +-- Runtime -> Memory +SAVE MCP PROFILES TO MEMORY; +SAVE MCP PROFILES FROM RUNTIME; + +-- Memory -> Disk +SAVE MCP PROFILES TO DISK; +``` + ### Checksum Commands ```sql diff --git a/include/MCP_Thread.h b/include/MCP_Thread.h index 5912b9889..38922eb6b 100644 --- a/include/MCP_Thread.h +++ b/include/MCP_Thread.h @@ -8,6 +8,9 @@ #include #include #include +#include +#include +#include // Forward declarations class ProxySQL_MCP_Server; @@ -20,6 +23,7 @@ class Cache_Tool_Handler; class Observe_Tool_Handler; class AI_Tool_Handler; class RAG_Tool_Handler; +class SQLite3_result; /** * @brief MCP Threads Handler class for managing MCP module configuration @@ -37,6 +41,21 @@ private: pthread_rwlock_t rwlock; ///< Read-write lock for thread-safe access public: + struct MCP_Target_Auth_Context { + std::string target_id; + std::string protocol; + int hostgroup_id; + std::string auth_profile_id; + std::string db_username; + std::string db_password; + std::string default_schema; + int max_rows; + int timeout_ms; + bool allow_explain; + bool allow_discovery; + std::string description; + }; + /** * @brief Structure holding MCP module configuration variables * @@ -201,6 +220,25 @@ public: * Outputs the MCP module version to stderr. */ void print_version(); + + /** + * @brief Load MCP target/auth profiles from a joined runtime resultset into memory map + * @return 0 on success, -1 on failure + */ + int load_target_auth_map(SQLite3_result* resultset); + + /** + * @brief Resolve backend auth/policy context for a target_id + */ + bool get_target_auth_context(const std::string& target_id, MCP_Target_Auth_Context& out_ctx); + + /** + * @brief Return all active target auth contexts (thread-safe copy) + */ + std::vector get_all_target_auth_contexts(); + +private: + std::map target_auth_map; }; // Global instance of the MCP Threads Handler diff --git a/include/ProxySQL_Admin_Tables_Definitions.h b/include/ProxySQL_Admin_Tables_Definitions.h index bcff6668f..c8d1b3f69 100644 --- a/include/ProxySQL_Admin_Tables_Definitions.h +++ b/include/ProxySQL_Admin_Tables_Definitions.h @@ -396,6 +396,56 @@ " comment VARCHAR" \ ")" +// MCP backend authentication profiles (server-side credentials) +#define ADMIN_SQLITE_TABLE_MCP_AUTH_PROFILES "CREATE TABLE mcp_auth_profiles (" \ + " auth_profile_id VARCHAR PRIMARY KEY NOT NULL ," \ + " db_username VARCHAR NOT NULL ," \ + " db_password VARCHAR NOT NULL ," \ + " default_schema VARCHAR DEFAULT '' ," \ + " use_ssl INT CHECK (use_ssl IN (0,1)) NOT NULL DEFAULT 0 ," \ + " ssl_mode VARCHAR DEFAULT '' ," \ + " comment VARCHAR DEFAULT ''" \ + ")" + +#define ADMIN_SQLITE_TABLE_RUNTIME_MCP_AUTH_PROFILES "CREATE TABLE runtime_mcp_auth_profiles (" \ + " auth_profile_id VARCHAR PRIMARY KEY NOT NULL ," \ + " db_username VARCHAR NOT NULL ," \ + " db_password VARCHAR NOT NULL ," \ + " default_schema VARCHAR DEFAULT '' ," \ + " use_ssl INT CHECK (use_ssl IN (0,1)) NOT NULL DEFAULT 0 ," \ + " ssl_mode VARCHAR DEFAULT '' ," \ + " comment VARCHAR DEFAULT ''" \ + ")" + +// MCP target routing profiles: target_id -> (protocol, hostgroup, auth_profile, policy) +#define ADMIN_SQLITE_TABLE_MCP_TARGET_PROFILES "CREATE TABLE mcp_target_profiles (" \ + " target_id VARCHAR PRIMARY KEY NOT NULL ," \ + " protocol VARCHAR NOT NULL CHECK (protocol IN ('mysql','pgsql')) ," \ + " hostgroup_id INT CHECK (hostgroup_id >= 0) NOT NULL ," \ + " auth_profile_id VARCHAR NOT NULL ," \ + " description VARCHAR DEFAULT '' ," \ + " max_rows INT CHECK (max_rows > 0) NOT NULL DEFAULT 200 ," \ + " timeout_ms INT CHECK (timeout_ms >= 0) NOT NULL DEFAULT 2000 ," \ + " allow_explain INT CHECK (allow_explain IN (0,1)) NOT NULL DEFAULT 1 ," \ + " allow_discovery INT CHECK (allow_discovery IN (0,1)) NOT NULL DEFAULT 1 ," \ + " active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1 ," \ + " comment VARCHAR DEFAULT ''" \ + ")" + +#define ADMIN_SQLITE_TABLE_RUNTIME_MCP_TARGET_PROFILES "CREATE TABLE runtime_mcp_target_profiles (" \ + " target_id VARCHAR PRIMARY KEY NOT NULL ," \ + " protocol VARCHAR NOT NULL CHECK (protocol IN ('mysql','pgsql')) ," \ + " hostgroup_id INT CHECK (hostgroup_id >= 0) NOT NULL ," \ + " auth_profile_id VARCHAR NOT NULL ," \ + " description VARCHAR DEFAULT '' ," \ + " max_rows INT CHECK (max_rows > 0) NOT NULL DEFAULT 200 ," \ + " timeout_ms INT CHECK (timeout_ms >= 0) NOT NULL DEFAULT 2000 ," \ + " allow_explain INT CHECK (allow_explain IN (0,1)) NOT NULL DEFAULT 1 ," \ + " allow_discovery INT CHECK (allow_discovery IN (0,1)) NOT NULL DEFAULT 1 ," \ + " active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1 ," \ + " comment VARCHAR DEFAULT ''" \ + ")" + // MCP query digest statistics table #define STATS_SQLITE_TABLE_MCP_QUERY_DIGEST "CREATE TABLE stats_mcp_query_digest (" \ " tool_name VARCHAR NOT NULL ," \ diff --git a/include/Query_Tool_Handler.h b/include/Query_Tool_Handler.h index 1dcbdf53e..f9e3b1846 100644 --- a/include/Query_Tool_Handler.h +++ b/include/Query_Tool_Handler.h @@ -28,12 +28,28 @@ class Static_Harvester; */ class Query_Tool_Handler : public MCP_Tool_Handler { private: + // Logical query targets discovered from runtime hostgroups + struct QueryTarget { + std::string target_id; ///< Opaque MCP-visible target identifier + std::string description; ///< Human-readable target description + std::string protocol; ///< Internal protocol: mysql|pgsql + int hostgroup_id; ///< Internal hostgroup identifier + std::string auth_profile_id; ///< Server-side auth profile identifier + std::string db_username; ///< Backend DB username (from auth profile) + std::string db_password; ///< Backend DB password (from auth profile) + std::string default_schema; ///< Backend default schema/database + std::string host; ///< Concrete host used by executor + int port; ///< Concrete port used by executor + bool executable; ///< True if current handler can execute against this target + }; + // MySQL connection configuration std::string mysql_hosts; std::string mysql_ports; std::string mysql_user; std::string mysql_password; std::string mysql_schema; + std::string default_target_id; // Discovery components (NEW - replaces MySQL_Tool_Handler wrapper) Discovery_Schema* catalog; ///< Discovery catalog (replaces old MySQL_Catalog) @@ -42,14 +58,28 @@ private: // Connection pool for MySQL queries struct MySQLConnection { void* mysql; ///< MySQL connection handle (MYSQL*) + std::string target_id; + std::string auth_profile_id; + std::string host; + int port; + bool in_use; + std::string current_schema; ///< Track current schema for this connection + }; + struct PgSQLConnection { + void* pgconn; ///< PostgreSQL connection handle (PGconn*) + std::string target_id; + std::string auth_profile_id; std::string host; int port; bool in_use; std::string current_schema; ///< Track current schema for this connection }; std::vector connection_pool; + std::vector pgsql_connection_pool; + std::vector target_registry; pthread_mutex_t pool_lock; - int pool_size; + int pool_size; ///< MySQL pool size + int pg_pool_size; ///< PostgreSQL pool size // Query guardrails int max_rows; @@ -106,10 +136,21 @@ private: */ int init_connection_pool(); + /** + * @brief Discover logical targets from runtime hostgroups + */ + void refresh_target_registry(); + + /** + * @brief Resolve target id (or default target if empty) + */ + const QueryTarget* resolve_target(const std::string& target_id); + /** * @brief Get a connection from the pool */ - void* get_connection(); + void* get_connection(const std::string& target_id); + void* get_pgsql_connection(const std::string& target_id); /** * @brief Return a connection to the pool @@ -123,11 +164,12 @@ private: * @note Caller should NOT hold pool_lock when calling this */ MySQLConnection* find_connection(void* mysql_ptr); + PgSQLConnection* find_pgsql_connection(void* pgconn_ptr); /** * @brief Execute a query and return results as JSON */ - std::string execute_query(const std::string& query); + std::string execute_query(const std::string& query, const std::string& target_id = ""); /** * @brief Execute a query with optional schema switching @@ -137,7 +179,8 @@ private: */ std::string execute_query_with_schema( const std::string& query, - const std::string& schema + const std::string& schema, + const std::string& target_id ); /** diff --git a/lib/Admin_Bootstrap.cpp b/lib/Admin_Bootstrap.cpp index 1751eeacf..7ff81e759 100644 --- a/lib/Admin_Bootstrap.cpp +++ b/lib/Admin_Bootstrap.cpp @@ -822,8 +822,14 @@ bool ProxySQL_Admin::init(const bootstrap_info_t& bootstrap_info) { // MCP query rules insert_into_tables_defs(tables_defs_admin, "mcp_query_rules", ADMIN_SQLITE_TABLE_MCP_QUERY_RULES); insert_into_tables_defs(tables_defs_admin, "runtime_mcp_query_rules", ADMIN_SQLITE_TABLE_RUNTIME_MCP_QUERY_RULES); + insert_into_tables_defs(tables_defs_admin, "mcp_auth_profiles", ADMIN_SQLITE_TABLE_MCP_AUTH_PROFILES); + insert_into_tables_defs(tables_defs_admin, "runtime_mcp_auth_profiles", ADMIN_SQLITE_TABLE_RUNTIME_MCP_AUTH_PROFILES); + insert_into_tables_defs(tables_defs_admin, "mcp_target_profiles", ADMIN_SQLITE_TABLE_MCP_TARGET_PROFILES); + insert_into_tables_defs(tables_defs_admin, "runtime_mcp_target_profiles", ADMIN_SQLITE_TABLE_RUNTIME_MCP_TARGET_PROFILES); insert_into_tables_defs(tables_defs_config, "mcp_query_rules", ADMIN_SQLITE_TABLE_MCP_QUERY_RULES); + insert_into_tables_defs(tables_defs_config, "mcp_auth_profiles", ADMIN_SQLITE_TABLE_MCP_AUTH_PROFILES); + insert_into_tables_defs(tables_defs_config, "mcp_target_profiles", ADMIN_SQLITE_TABLE_MCP_TARGET_PROFILES); #endif /* PROXYSQLGENAI */ insert_into_tables_defs(tables_defs_config, "pgsql_servers", ADMIN_SQLITE_TABLE_PGSQL_SERVERS); diff --git a/lib/Admin_Handler.cpp b/lib/Admin_Handler.cpp index d0c48bda4..98443b945 100644 --- a/lib/Admin_Handler.cpp +++ b/lib/Admin_Handler.cpp @@ -2534,6 +2534,164 @@ bool admin_handler_command_load_or_save(char *query_no_space, unsigned int query // MCP QUERY RULES COMMAND HANDLERS // ============================================================ #ifdef PROXYSQLGENAI + // ============================================================ + // MCP PROFILES COMMAND HANDLERS (auth + target together) + // ============================================================ + if ((query_no_space_length > 17) && + ((!strncasecmp("SAVE MCP PROFILES ", query_no_space, 18)) || + (!strncasecmp("LOAD MCP PROFILES ", query_no_space, 18)))) { + + ProxySQL_Admin *SPA = (ProxySQL_Admin *)pa; + + const auto load_target_auth_map_from_runtime = [&]() -> bool { + char* error = NULL; + int cols = 0; + int affected_rows = 0; + SQLite3_result* resultset = NULL; + const char* q = + "SELECT t.target_id, t.protocol, t.hostgroup_id, t.auth_profile_id," + " t.max_rows, t.timeout_ms, t.allow_explain, t.allow_discovery, t.description," + " a.db_username, a.db_password, a.default_schema" + " FROM runtime_mcp_target_profiles t" + " JOIN runtime_mcp_auth_profiles a ON a.auth_profile_id=t.auth_profile_id" + " WHERE t.active=1" + " ORDER BY t.target_id"; + SPA->admindb->execute_statement(q, &error, &cols, &affected_rows, &resultset); + if (error) { + proxy_error("Failed to load MCP target auth map: %s\n", error); + free(error); + if (resultset) { + delete resultset; + } + return false; + } + if (GloMCPH) { + GloMCPH->load_target_auth_map(resultset); + } else if (resultset) { + delete resultset; + } + return true; + }; + + // LOAD MCP PROFILES FROM DISK / TO MEMORY + if ( + (query_no_space_length == strlen("LOAD MCP PROFILES FROM DISK") && + !strncasecmp("LOAD MCP PROFILES FROM DISK", query_no_space, query_no_space_length)) || + (query_no_space_length == strlen("LOAD MCP PROFILES TO MEMORY") && + !strncasecmp("LOAD MCP PROFILES TO MEMORY", query_no_space, query_no_space_length)) + ) { + if (!SPA->admindb->execute("BEGIN")) { + SPA->send_error_msg_to_client(sess, (char *)"Failed to BEGIN transaction"); + return false; + } + if (!SPA->admindb->execute("DELETE FROM main.mcp_auth_profiles") || + !SPA->admindb->execute("INSERT OR REPLACE INTO main.mcp_auth_profiles SELECT * FROM disk.mcp_auth_profiles") || + !SPA->admindb->execute("DELETE FROM main.mcp_target_profiles") || + !SPA->admindb->execute("INSERT OR REPLACE INTO main.mcp_target_profiles SELECT * FROM disk.mcp_target_profiles")) { + SPA->admindb->execute("ROLLBACK"); + SPA->send_error_msg_to_client(sess, (char *)"Failed to load MCP profiles from disk"); + return false; + } + if (!SPA->admindb->execute("COMMIT")) { + SPA->send_error_msg_to_client(sess, (char *)"Failed to COMMIT transaction"); + return false; + } + SPA->send_ok_msg_to_client(sess, NULL, 0, query_no_space); + return false; + } + + // SAVE MCP PROFILES TO DISK + if ( + (query_no_space_length == strlen("SAVE MCP PROFILES TO DISK") && + !strncasecmp("SAVE MCP PROFILES TO DISK", query_no_space, query_no_space_length)) + ) { + if (!SPA->admindb->execute("BEGIN")) { + SPA->send_error_msg_to_client(sess, (char *)"Failed to BEGIN transaction"); + return false; + } + if (!SPA->admindb->execute("DELETE FROM disk.mcp_auth_profiles") || + !SPA->admindb->execute("INSERT OR REPLACE INTO disk.mcp_auth_profiles SELECT * FROM main.mcp_auth_profiles") || + !SPA->admindb->execute("DELETE FROM disk.mcp_target_profiles") || + !SPA->admindb->execute("INSERT OR REPLACE INTO disk.mcp_target_profiles SELECT * FROM main.mcp_target_profiles")) { + SPA->admindb->execute("ROLLBACK"); + SPA->send_error_msg_to_client(sess, (char *)"Failed to save MCP profiles to disk"); + return false; + } + if (!SPA->admindb->execute("COMMIT")) { + SPA->send_error_msg_to_client(sess, (char *)"Failed to COMMIT transaction"); + return false; + } + SPA->send_ok_msg_to_client(sess, NULL, 0, query_no_space); + return false; + } + + // LOAD MCP PROFILES TO RUNTIME / FROM MEMORY + if ( + (query_no_space_length == strlen("LOAD MCP PROFILES TO RUNTIME") && + !strncasecmp("LOAD MCP PROFILES TO RUNTIME", query_no_space, query_no_space_length)) || + (query_no_space_length == strlen("LOAD MCP PROFILES TO RUN") && + !strncasecmp("LOAD MCP PROFILES TO RUN", query_no_space, query_no_space_length)) || + (query_no_space_length == strlen("LOAD MCP PROFILES FROM MEMORY") && + !strncasecmp("LOAD MCP PROFILES FROM MEMORY", query_no_space, query_no_space_length)) || + (query_no_space_length == strlen("LOAD MCP PROFILES FROM MEM") && + !strncasecmp("LOAD MCP PROFILES FROM MEM", query_no_space, query_no_space_length)) + ) { + if (!SPA->admindb->execute("BEGIN")) { + SPA->send_error_msg_to_client(sess, (char *)"Failed to BEGIN transaction"); + return false; + } + if (!SPA->admindb->execute("DELETE FROM runtime_mcp_auth_profiles") || + !SPA->admindb->execute("INSERT OR REPLACE INTO runtime_mcp_auth_profiles SELECT * FROM main.mcp_auth_profiles") || + !SPA->admindb->execute("DELETE FROM runtime_mcp_target_profiles") || + !SPA->admindb->execute("INSERT OR REPLACE INTO runtime_mcp_target_profiles SELECT * FROM main.mcp_target_profiles")) { + SPA->admindb->execute("ROLLBACK"); + SPA->send_error_msg_to_client(sess, (char *)"Failed to load MCP profiles to runtime"); + return false; + } + if (!SPA->admindb->execute("COMMIT")) { + SPA->send_error_msg_to_client(sess, (char *)"Failed to COMMIT transaction"); + return false; + } + if (!load_target_auth_map_from_runtime()) { + SPA->send_error_msg_to_client(sess, (char *)"Failed to refresh MCP runtime profile map"); + return false; + } + SPA->send_ok_msg_to_client(sess, NULL, 0, query_no_space); + return false; + } + + // SAVE MCP PROFILES FROM RUNTIME / TO MEMORY + if ( + (query_no_space_length == strlen("SAVE MCP PROFILES TO MEMORY") && + !strncasecmp("SAVE MCP PROFILES TO MEMORY", query_no_space, query_no_space_length)) || + (query_no_space_length == strlen("SAVE MCP PROFILES TO MEM") && + !strncasecmp("SAVE MCP PROFILES TO MEM", query_no_space, query_no_space_length)) || + (query_no_space_length == strlen("SAVE MCP PROFILES FROM RUNTIME") && + !strncasecmp("SAVE MCP PROFILES FROM RUNTIME", query_no_space, query_no_space_length)) || + (query_no_space_length == strlen("SAVE MCP PROFILES FROM RUN") && + !strncasecmp("SAVE MCP PROFILES FROM RUN", query_no_space, query_no_space_length)) + ) { + if (!SPA->admindb->execute("BEGIN")) { + SPA->send_error_msg_to_client(sess, (char *)"Failed to BEGIN transaction"); + return false; + } + if (!SPA->admindb->execute("DELETE FROM main.mcp_auth_profiles") || + !SPA->admindb->execute("INSERT OR REPLACE INTO main.mcp_auth_profiles SELECT * FROM runtime_mcp_auth_profiles") || + !SPA->admindb->execute("DELETE FROM main.mcp_target_profiles") || + !SPA->admindb->execute("INSERT OR REPLACE INTO main.mcp_target_profiles SELECT * FROM runtime_mcp_target_profiles")) { + SPA->admindb->execute("ROLLBACK"); + SPA->send_error_msg_to_client(sess, (char *)"Failed to save MCP profiles from runtime"); + return false; + } + if (!SPA->admindb->execute("COMMIT")) { + SPA->send_error_msg_to_client(sess, (char *)"Failed to COMMIT transaction"); + return false; + } + SPA->send_ok_msg_to_client(sess, NULL, 0, query_no_space); + return false; + } + } + // Supported commands: // LOAD MCP QUERY RULES FROM DISK - Copy from disk to memory // LOAD MCP QUERY RULES TO MEMORY - Copy from disk to memory (alias) diff --git a/lib/MCP_Thread.cpp b/lib/MCP_Thread.cpp index c1c13f9f6..615c7110a 100644 --- a/lib/MCP_Thread.cpp +++ b/lib/MCP_Thread.cpp @@ -389,4 +389,64 @@ void MCP_Threads_Handler::print_version() { fprintf(stderr, "MCP Threads Handler rev. %s -- %s -- %s\n", MCP_THREAD_VERSION, __FILE__, __TIMESTAMP__); } +int MCP_Threads_Handler::load_target_auth_map(SQLite3_result* resultset) { + if (!resultset) { + return -1; + } + std::map new_map; + for (auto row : resultset->rows) { + if (row->cnt < 12 || !row->fields[0] || !row->fields[1] || !row->fields[2] || !row->fields[3] || + !row->fields[9] || !row->fields[10]) { + continue; + } + + MCP_Target_Auth_Context ctx; + ctx.target_id = row->fields[0]; + ctx.protocol = row->fields[1]; + ctx.hostgroup_id = atoi(row->fields[2]); + ctx.auth_profile_id = row->fields[3]; + ctx.max_rows = row->fields[4] ? atoi(row->fields[4]) : 200; + ctx.timeout_ms = row->fields[5] ? atoi(row->fields[5]) : 2000; + ctx.allow_explain = row->fields[6] ? (atoi(row->fields[6]) != 0) : true; + ctx.allow_discovery = row->fields[7] ? (atoi(row->fields[7]) != 0) : true; + ctx.description = row->fields[8] ? row->fields[8] : ""; + ctx.db_username = row->fields[9]; + ctx.db_password = row->fields[10]; + ctx.default_schema = row->fields[11] ? row->fields[11] : ""; + + new_map[ctx.target_id] = ctx; + } + delete resultset; + + pthread_rwlock_wrlock(&rwlock); + target_auth_map.swap(new_map); + pthread_rwlock_unlock(&rwlock); + + proxy_info("MCP_Threads_Handler: loaded %zu target auth profile mapping(s)\n", target_auth_map.size()); + return 0; +} + +bool MCP_Threads_Handler::get_target_auth_context(const std::string& target_id, MCP_Target_Auth_Context& out_ctx) { + pthread_rwlock_rdlock(&rwlock); + auto it = target_auth_map.find(target_id); + if (it == target_auth_map.end()) { + pthread_rwlock_unlock(&rwlock); + return false; + } + out_ctx = it->second; + pthread_rwlock_unlock(&rwlock); + return true; +} + +std::vector MCP_Threads_Handler::get_all_target_auth_contexts() { + std::vector out; + pthread_rwlock_rdlock(&rwlock); + out.reserve(target_auth_map.size()); + for (const auto& kv : target_auth_map) { + out.push_back(kv.second); + } + pthread_rwlock_unlock(&rwlock); + return out; +} + #endif /* PROXYSQLGENAI */ diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index d52096852..8474813a6 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -2977,6 +2977,35 @@ void ProxySQL_Admin::init_mcp_variables() { flush_mcp_variables___runtime_to_database(configdb, false, false, false, false, false); flush_mcp_variables___runtime_to_database(admindb, false, true, false, false, false); flush_mcp_variables___database_to_runtime(admindb, true, "", 0); + + // Load MCP target/auth profiles into runtime tables and then in-memory map. + admindb->execute("DELETE FROM runtime_mcp_auth_profiles"); + admindb->execute("INSERT OR REPLACE INTO runtime_mcp_auth_profiles SELECT * FROM main.mcp_auth_profiles"); + admindb->execute("DELETE FROM runtime_mcp_target_profiles"); + admindb->execute("INSERT OR REPLACE INTO runtime_mcp_target_profiles SELECT * FROM main.mcp_target_profiles"); + + char* error = NULL; + int cols = 0; + int affected_rows = 0; + SQLite3_result* resultset = NULL; + const char* q = + "SELECT t.target_id, t.protocol, t.hostgroup_id, t.auth_profile_id," + " t.max_rows, t.timeout_ms, t.allow_explain, t.allow_discovery, t.description," + " a.db_username, a.db_password, a.default_schema" + " FROM runtime_mcp_target_profiles t" + " JOIN runtime_mcp_auth_profiles a ON a.auth_profile_id=t.auth_profile_id" + " WHERE t.active=1" + " ORDER BY t.target_id"; + admindb->execute_statement(q, &error, &cols, &affected_rows, &resultset); + if (error) { + proxy_error("Failed to load MCP target auth map: %s\n", error); + free(error); + if (resultset) { + delete resultset; + } + } else { + GloMCPH->load_target_auth_map(resultset); + } } } diff --git a/lib/Query_Tool_Handler.cpp b/lib/Query_Tool_Handler.cpp index 4db769084..f69f3162c 100644 --- a/lib/Query_Tool_Handler.cpp +++ b/lib/Query_Tool_Handler.cpp @@ -8,7 +8,9 @@ using json = nlohmann::json; #define PROXYJSON #include "Query_Tool_Handler.h" +#include "MCP_Thread.h" #include "proxysql_debug.h" +#include "proxysql_admin.h" #include "Static_Harvester.h" #include @@ -18,6 +20,9 @@ using json = nlohmann::json; // MySQL client library #include +#include + +extern ProxySQL_Admin *GloAdmin; // ============================================================ // JSON Helper Functions @@ -203,6 +208,7 @@ Query_Tool_Handler::Query_Tool_Handler( : catalog(NULL), harvester(NULL), pool_size(0), + pg_pool_size(0), max_rows(200), timeout_ms(2000), allow_select_star(false) @@ -302,58 +308,42 @@ void Query_Tool_Handler::close() { connection_pool.clear(); pool_size = 0; - pthread_mutex_unlock(&pool_lock); -} - -int Query_Tool_Handler::init_connection_pool() { - // Parse hosts - std::vector host_list; - std::istringstream h(mysql_hosts); - std::string host; - while (std::getline(h, host, ',')) { - host.erase(0, host.find_first_not_of(" \t")); - host.erase(host.find_last_not_of(" \t") + 1); - if (!host.empty()) { - host_list.push_back(host); + for (auto& conn : pgsql_connection_pool) { + if (conn.pgconn) { + PQfinish(static_cast(conn.pgconn)); + conn.pgconn = NULL; } } + pgsql_connection_pool.clear(); + pg_pool_size = 0; - // Parse ports - std::vector port_list; - std::istringstream p(mysql_ports); - std::string port; - while (std::getline(p, port, ',')) { - port.erase(0, port.find_first_not_of(" \t")); - port.erase(port.find_last_not_of(" \t") + 1); - if (!port.empty()) { - port_list.push_back(atoi(port.c_str())); - } - } - - // Ensure ports array matches hosts array size - while (port_list.size() < host_list.size()) { - port_list.push_back(3306); - } + pthread_mutex_unlock(&pool_lock); +} - if (host_list.empty()) { - proxy_error("Query_Tool_Handler: No hosts configured\n"); - return -1; - } +int Query_Tool_Handler::init_connection_pool() { + refresh_target_registry(); pthread_mutex_lock(&pool_lock); + pool_size = 0; + pg_pool_size = 0; - for (size_t i = 0; i < host_list.size(); i++) { + for (const auto& target : target_registry) { + if (!target.executable || target.protocol != "mysql") { + continue; + } MySQLConnection conn; - conn.host = host_list[i]; - conn.port = port_list[i]; + conn.target_id = target.target_id; + conn.auth_profile_id = target.auth_profile_id; + conn.host = target.host; + conn.port = target.port; conn.in_use = false; + conn.current_schema = target.default_schema; MYSQL* mysql = mysql_init(NULL); if (!mysql) { proxy_error("Query_Tool_Handler: mysql_init failed for %s:%d\n", conn.host.c_str(), conn.port); - pthread_mutex_unlock(&pool_lock); - return -1; + continue; } unsigned int timeout = 5; @@ -364,9 +354,9 @@ int Query_Tool_Handler::init_connection_pool() { if (!mysql_real_connect( mysql, conn.host.c_str(), - mysql_user.c_str(), - mysql_password.c_str(), - mysql_schema.empty() ? NULL : mysql_schema.c_str(), + target.db_username.c_str(), + target.db_password.c_str(), + target.default_schema.empty() ? NULL : target.default_schema.c_str(), conn.port, NULL, CLIENT_MULTI_STATEMENTS @@ -374,28 +364,222 @@ int Query_Tool_Handler::init_connection_pool() { proxy_error("Query_Tool_Handler: mysql_real_connect failed for %s:%d: %s\n", conn.host.c_str(), conn.port, mysql_error(mysql)); mysql_close(mysql); - pthread_mutex_unlock(&pool_lock); - return -1; + continue; } conn.mysql = mysql; connection_pool.push_back(conn); pool_size++; - proxy_info("Query_Tool_Handler: Connected to %s:%d\n", - conn.host.c_str(), conn.port); + proxy_info("Query_Tool_Handler: Connected target '%s' to %s:%d\n", + conn.target_id.c_str(), conn.host.c_str(), conn.port); + + if (default_target_id.empty()) { + default_target_id = conn.target_id; + } + } + + for (const auto& target : target_registry) { + if (!target.executable || target.protocol != "pgsql") { + continue; + } + + PgSQLConnection conn; + conn.target_id = target.target_id; + conn.auth_profile_id = target.auth_profile_id; + conn.host = target.host; + conn.port = target.port; + conn.in_use = false; + conn.current_schema = target.default_schema; + + std::ostringstream conninfo; + conninfo << "host=" << conn.host + << " port=" << conn.port + << " user=" << target.db_username + << " password=" << target.db_password + << " connect_timeout=5"; + if (!target.default_schema.empty()) { + conninfo << " dbname=" << target.default_schema; + } + + PGconn* pgconn = PQconnectdb(conninfo.str().c_str()); + if (pgconn == NULL || PQstatus(pgconn) != CONNECTION_OK) { + proxy_error( + "Query_Tool_Handler: PQconnectdb failed for %s:%d: %s\n", + conn.host.c_str(), conn.port, pgconn ? PQerrorMessage(pgconn) : "null connection" + ); + if (pgconn) { + PQfinish(pgconn); + } + continue; + } + + conn.pgconn = pgconn; + pgsql_connection_pool.push_back(conn); + pg_pool_size++; + + proxy_info("Query_Tool_Handler: Connected target '%s' to pgsql %s:%d\n", + conn.target_id.c_str(), conn.host.c_str(), conn.port); + + if (default_target_id.empty()) { + default_target_id = conn.target_id; + } } pthread_mutex_unlock(&pool_lock); - proxy_info("Query_Tool_Handler: Connection pool initialized with %d connection(s)\n", pool_size); + if ((pool_size + pg_pool_size) == 0) { + proxy_error("Query_Tool_Handler: No executable targets available\n"); + return -1; + } + + proxy_info( + "Query_Tool_Handler: Connection pools initialized mysql=%d pgsql=%d, default target '%s'\n", + pool_size, pg_pool_size, default_target_id.c_str() + ); return 0; } -void* Query_Tool_Handler::get_connection() { +void Query_Tool_Handler::refresh_target_registry() { + target_registry.clear(); + default_target_id.clear(); + + if (!GloMCPH) { + return; + } + + // Refresh MCP target/auth map from runtime profile tables before resolving targets. + if (GloAdmin && GloAdmin->admindb) { + char* error = NULL; + int cols = 0; + int affected_rows = 0; + SQLite3_result* resultset = NULL; + const char* q = + "SELECT t.target_id, t.protocol, t.hostgroup_id, t.auth_profile_id," + " t.max_rows, t.timeout_ms, t.allow_explain, t.allow_discovery, t.description," + " a.db_username, a.db_password, a.default_schema" + " FROM runtime_mcp_target_profiles t" + " JOIN runtime_mcp_auth_profiles a ON a.auth_profile_id=t.auth_profile_id" + " WHERE t.active=1" + " ORDER BY t.target_id"; + GloAdmin->admindb->execute_statement(q, &error, &cols, &affected_rows, &resultset); + if (error) { + proxy_warning("Query_Tool_Handler: failed refreshing target auth map: %s\n", error); + free(error); + if (resultset) { + delete resultset; + } + } else { + GloMCPH->load_target_auth_map(resultset); + } + } + + const auto profiles = GloMCPH->get_all_target_auth_contexts(); + + const auto resolve_endpoint = [&]( + const std::string& protocol, + int hostgroup_id, + std::string& host, + int& port, + int& backends + ) -> bool { + if (GloAdmin == NULL || GloAdmin->admindb == NULL) { + return false; + } + const char* table_name = (protocol == "pgsql") ? "runtime_pgsql_servers" : "runtime_mysql_servers"; + char* error = NULL; + int cols = 0; + int affected_rows = 0; + SQLite3_result* resultset = NULL; + + std::ostringstream sql; + sql << "SELECT hostname, port FROM " << table_name + << " WHERE hostgroup_id=" << hostgroup_id + << " AND UPPER(status)='ONLINE'" + << " ORDER BY weight DESC, hostname, port"; + GloAdmin->admindb->execute_statement(sql.str().c_str(), &error, &cols, &affected_rows, &resultset); + if (error) { + proxy_warning("Query_Tool_Handler: endpoint resolution failed for %s/%d: %s\n", + protocol.c_str(), hostgroup_id, error); + free(error); + if (resultset) { + delete resultset; + } + return false; + } + if (!resultset || resultset->rows.empty()) { + if (resultset) { + delete resultset; + } + return false; + } + + backends = resultset->rows.size(); + host = resultset->rows[0]->fields[0] ? resultset->rows[0]->fields[0] : ""; + port = resultset->rows[0]->fields[1] ? atoi(resultset->rows[0]->fields[1]) : ((protocol == "pgsql") ? 5432 : 3306); + delete resultset; + return !host.empty(); + }; + + for (const auto& ctx : profiles) { + QueryTarget target; + target.target_id = ctx.target_id; + target.protocol = ctx.protocol; + target.hostgroup_id = ctx.hostgroup_id; + target.auth_profile_id = ctx.auth_profile_id; + target.db_username = ctx.db_username; + target.db_password = ctx.db_password; + target.default_schema = ctx.default_schema; + target.description = ctx.description; + target.executable = false; + + int backend_count = 0; + if (resolve_endpoint(target.protocol, target.hostgroup_id, target.host, target.port, backend_count)) { + target.executable = !target.db_username.empty(); + if (target.description.empty()) { + target.description = "Hostgroup " + std::to_string(target.hostgroup_id) + + " (" + std::to_string(backend_count) + " backend(s))"; + } + } else { + if (target.description.empty()) { + target.description = "Hostgroup " + std::to_string(target.hostgroup_id) + " (no ONLINE backends)"; + } + } + + target_registry.push_back(target); + } + + for (const auto& target : target_registry) { + if (target.executable) { + default_target_id = target.target_id; + break; + } + } +} + +const Query_Tool_Handler::QueryTarget* Query_Tool_Handler::resolve_target(const std::string& target_id) { + const std::string& resolved_target_id = target_id.empty() ? default_target_id : target_id; + if (resolved_target_id.empty()) { + return NULL; + } + for (const auto& target : target_registry) { + if (target.target_id == resolved_target_id) { + return ⌖ + } + } + return NULL; +} + +void* Query_Tool_Handler::get_connection(const std::string& target_id) { + const std::string resolved_target = target_id.empty() ? default_target_id : target_id; + const QueryTarget* target = resolve_target(resolved_target); + if (target == NULL) { + return NULL; + } + pthread_mutex_lock(&pool_lock); for (auto& conn : connection_pool) { - if (!conn.in_use) { + if (!conn.in_use && conn.target_id == resolved_target && conn.auth_profile_id == target->auth_profile_id) { conn.in_use = true; pthread_mutex_unlock(&pool_lock); return conn.mysql; @@ -403,7 +587,29 @@ void* Query_Tool_Handler::get_connection() { } pthread_mutex_unlock(&pool_lock); - proxy_error("Query_Tool_Handler: No available connection\n"); + proxy_error("Query_Tool_Handler: No available connection for target '%s'\n", resolved_target.c_str()); + return NULL; +} + +void* Query_Tool_Handler::get_pgsql_connection(const std::string& target_id) { + const std::string resolved_target = target_id.empty() ? default_target_id : target_id; + const QueryTarget* target = resolve_target(resolved_target); + if (target == NULL) { + return NULL; + } + + pthread_mutex_lock(&pool_lock); + + for (auto& conn : pgsql_connection_pool) { + if (!conn.in_use && conn.target_id == resolved_target && conn.auth_profile_id == target->auth_profile_id) { + conn.in_use = true; + pthread_mutex_unlock(&pool_lock); + return conn.pgconn; + } + } + + pthread_mutex_unlock(&pool_lock); + proxy_error("Query_Tool_Handler: No available pgsql connection for target '%s'\n", resolved_target.c_str()); return NULL; } @@ -415,7 +621,16 @@ void Query_Tool_Handler::return_connection(void* mysql_ptr) { for (auto& conn : connection_pool) { if (conn.mysql == mysql_ptr) { conn.in_use = false; - break; + pthread_mutex_unlock(&pool_lock); + return; + } + } + + for (auto& conn : pgsql_connection_pool) { + if (conn.pgconn == mysql_ptr) { + conn.in_use = false; + pthread_mutex_unlock(&pool_lock); + return; } } @@ -435,12 +650,101 @@ Query_Tool_Handler::MySQLConnection* Query_Tool_Handler::find_connection(void* m return nullptr; } -std::string Query_Tool_Handler::execute_query(const std::string& query) { - void* mysql = get_connection(); - if (!mysql) { - return "{\"error\": \"No available connection\"}"; +// Helper to find pgsql connection wrapper by PGconn pointer (thread-safe, acquires pool_lock) +Query_Tool_Handler::PgSQLConnection* Query_Tool_Handler::find_pgsql_connection(void* pgconn_ptr) { + pthread_mutex_lock(&pool_lock); + for (auto& conn : pgsql_connection_pool) { + if (conn.pgconn == pgconn_ptr) { + pthread_mutex_unlock(&pool_lock); + return &conn; + } + } + pthread_mutex_unlock(&pool_lock); + return nullptr; +} + +std::string Query_Tool_Handler::execute_query(const std::string& query, const std::string& target_id) { + const QueryTarget* target = resolve_target(target_id); + if (target == NULL) { + json j; + j["success"] = false; + j["error"] = std::string("Unknown target: ") + + (target_id.empty() ? default_target_id : target_id); + return j.dump(); } + if (target->protocol == "pgsql") { + void* pgconn_v = get_pgsql_connection(target_id); + if (!pgconn_v) { + json j; + j["success"] = false; + j["error"] = std::string("No available pgsql connection for target: ") + + (target_id.empty() ? default_target_id : target_id); + return j.dump(); + } + PGconn* pgconn = static_cast(pgconn_v); + PGresult* res = PQexec(pgconn, query.c_str()); + if (res == NULL) { + return_connection(pgconn_v); + json j; + j["success"] = false; + j["error"] = std::string("PQexec returned null result"); + return j.dump(); + } + + ExecStatusType st = PQresultStatus(res); + if (st != PGRES_TUPLES_OK && st != PGRES_COMMAND_OK) { + std::string err = PQresultErrorMessage(res); + PQclear(res); + return_connection(pgconn_v); + json j; + j["success"] = false; + j["error"] = err; + return j.dump(); + } + + if (st == PGRES_COMMAND_OK) { + const char* tuples = PQcmdTuples(res); + long affected = 0; + if (tuples && tuples[0] != '\0') { + affected = atol(tuples); + } + PQclear(res); + return_connection(pgconn_v); + json j; + j["success"] = true; + j["affected_rows"] = affected; + return j.dump(); + } + + int num_fields = PQnfields(res); + int num_rows = PQntuples(res); + json results = json::array(); + for (int r = 0; r < num_rows; r++) { + json row_data = json::array(); + for (int c = 0; c < num_fields; c++) { + row_data.push_back(PQgetisnull(res, r, c) ? "" : PQgetvalue(res, r, c)); + } + results.push_back(row_data); + } + PQclear(res); + return_connection(pgconn_v); + + json j; + j["success"] = true; + j["columns"] = num_fields; + j["rows"] = results; + return j.dump(); + } + + void* mysql = get_connection(target_id); + if (!mysql) { + json j; + j["success"] = false; + j["error"] = std::string("No available mysql connection for target: ") + + (target_id.empty() ? default_target_id : target_id); + return j.dump(); + } MYSQL* mysql_ptr = static_cast(mysql); if (mysql_query(mysql_ptr, query.c_str())) { @@ -490,13 +794,119 @@ std::string Query_Tool_Handler::execute_query(const std::string& query) { // Execute query with optional schema switching std::string Query_Tool_Handler::execute_query_with_schema( const std::string& query, - const std::string& schema + const std::string& schema, + const std::string& target_id ) { - void* mysql = get_connection(); - if (!mysql) { - return "{\"error\": \"No available connection\"}"; + const QueryTarget* target = resolve_target(target_id); + if (target == NULL) { + json j; + j["success"] = false; + j["error"] = std::string("Unknown target: ") + + (target_id.empty() ? default_target_id : target_id); + return j.dump(); + } + + if (target->protocol == "pgsql") { + void* pgconn_v = get_pgsql_connection(target_id); + if (!pgconn_v) { + json j; + j["success"] = false; + j["error"] = std::string("No available pgsql connection for target: ") + + (target_id.empty() ? default_target_id : target_id); + return j.dump(); + } + PGconn* pgconn = static_cast(pgconn_v); + PgSQLConnection* conn_wrapper = find_pgsql_connection(pgconn_v); + + if (!schema.empty() && conn_wrapper && conn_wrapper->current_schema != schema) { + std::string validated_schema = validate_sql_identifier_sqlite(schema); + if (validated_schema.empty()) { + return_connection(pgconn_v); + json j; + j["success"] = false; + j["error"] = "Invalid schema name: contains unsafe characters"; + return j.dump(); + } + std::string set_search_path = "SET search_path TO " + validated_schema; + PGresult* set_res = PQexec(pgconn, set_search_path.c_str()); + if (set_res == NULL || PQresultStatus(set_res) != PGRES_COMMAND_OK) { + std::string err = set_res ? PQresultErrorMessage(set_res) : "set search_path failed"; + if (set_res) { + PQclear(set_res); + } + return_connection(pgconn_v); + json j; + j["success"] = false; + j["error"] = err; + return j.dump(); + } + PQclear(set_res); + conn_wrapper->current_schema = validated_schema; + } + + PGresult* res = PQexec(pgconn, query.c_str()); + if (res == NULL) { + return_connection(pgconn_v); + json j; + j["success"] = false; + j["error"] = std::string("PQexec returned null result"); + return j.dump(); + } + + ExecStatusType st = PQresultStatus(res); + if (st != PGRES_TUPLES_OK && st != PGRES_COMMAND_OK) { + std::string err = PQresultErrorMessage(res); + PQclear(res); + return_connection(pgconn_v); + json j; + j["success"] = false; + j["error"] = err; + return j.dump(); + } + + if (st == PGRES_COMMAND_OK) { + const char* tuples = PQcmdTuples(res); + long affected = 0; + if (tuples && tuples[0] != '\0') { + affected = atol(tuples); + } + PQclear(res); + return_connection(pgconn_v); + json j; + j["success"] = true; + j["affected_rows"] = affected; + return j.dump(); + } + + int num_fields = PQnfields(res); + int num_rows = PQntuples(res); + json results = json::array(); + for (int r = 0; r < num_rows; r++) { + json row_data = json::array(); + for (int c = 0; c < num_fields; c++) { + row_data.push_back(PQgetisnull(res, r, c) ? "" : PQgetvalue(res, r, c)); + } + results.push_back(row_data); + } + + PQclear(res); + return_connection(pgconn_v); + + json j; + j["success"] = true; + j["columns"] = num_fields; + j["rows"] = results; + return j.dump(); } + void* mysql = get_connection(target_id); + if (!mysql) { + json j; + j["success"] = false; + j["error"] = std::string("No available mysql connection for target: ") + + (target_id.empty() ? default_target_id : target_id); + return j.dump(); + } MYSQL* mysql_ptr = static_cast(mysql); MySQLConnection* conn_wrapper = find_connection(mysql); @@ -686,18 +1096,25 @@ json Query_Tool_Handler::get_tool_list() { // ============================================================ // INVENTORY TOOLS // ============================================================ + tools.push_back(create_tool_schema( + "list_targets", + "List logical query targets. Each target maps internally to a ProxySQL hostgroup and routing policy.", + {}, + {} + )); + tools.push_back(create_tool_schema( "list_schemas", "List all available schemas/databases", {}, - {{"page_token", "string"}, {"page_size", "integer"}} + {{"page_token", "string"}, {"page_size", "integer"}, {"target_id", "string"}} )); tools.push_back(create_tool_schema( "list_tables", "List tables in a schema", {"schema"}, - {{"page_token", "string"}, {"page_size", "integer"}, {"name_filter", "string"}} + {{"page_token", "string"}, {"page_size", "integer"}, {"name_filter", "string"}, {"target_id", "string"}} )); // ============================================================ @@ -732,16 +1149,16 @@ json Query_Tool_Handler::get_tool_list() { // ============================================================ tools.push_back(create_tool_schema( "run_sql_readonly", - "Execute a read-only SQL query with safety guardrails enforced. Optional schema parameter switches database context before query execution.", + "Execute a read-only SQL query with safety guardrails enforced. Optional schema parameter switches database context before query execution. target_id routes the query to a logical backend target.", {"sql"}, - {{"schema", "string"}, {"max_rows", "integer"}, {"timeout_sec", "integer"}} + {{"schema", "string"}, {"target_id", "string"}, {"max_rows", "integer"}, {"timeout_sec", "integer"}} )); tools.push_back(create_tool_schema( "explain_sql", "Explain a query execution plan using EXPLAIN or EXPLAIN ANALYZE", {"sql"}, - {} + {{"target_id", "string"}} )); // ============================================================ @@ -988,9 +1405,39 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& // ============================================================ // INVENTORY TOOLS // ============================================================ - if (tool_name == "list_schemas") { + if (tool_name == "list_targets") { + refresh_target_registry(); + json targets = json::array(); + for (const auto& target : target_registry) { + json t; + t["target_id"] = target.target_id; + t["description"] = target.description; + json capabilities = json::array(); + capabilities.push_back("inventory"); + if (target.executable) { + capabilities.push_back("readonly_sql"); + capabilities.push_back("explain"); + } + t["capabilities"] = capabilities; + targets.push_back(t); + } + json payload; + payload["targets"] = targets; + payload["default_target_id"] = default_target_id; + result = create_success_response(payload); + } + + else if (tool_name == "list_schemas") { + std::string target_id = json_string(arguments, "target_id"); std::string page_token = json_string(arguments, "page_token"); int page_size = json_int(arguments, "page_size", 50); + if (!target_id.empty()) { + refresh_target_registry(); + const QueryTarget* target = resolve_target(target_id); + if (target == NULL) { + return create_error_response("Unknown target_id: " + target_id); + } + } // Query catalog's schemas table instead of live database char* error = NULL; @@ -1038,9 +1485,25 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& else if (tool_name == "list_tables") { std::string schema = json_string(arguments, "schema"); + std::string target_id = json_string(arguments, "target_id"); std::string page_token = json_string(arguments, "page_token"); int page_size = json_int(arguments, "page_size", 50); std::string name_filter = json_string(arguments, "name_filter"); + (void)page_token; + (void)page_size; + + refresh_target_registry(); + const QueryTarget* target = resolve_target(target_id); + if (target == NULL) { + result = create_error_response( + target_id.empty() ? "No executable default target available" : "Unknown target_id: " + target_id + ); + return result; + } + if (!target->executable) { + result = create_error_response("Target is not executable by this handler"); + return result; + } // Validate schema identifier if provided if (!schema.empty()) { @@ -1053,17 +1516,27 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& } } - // TODO: Implement using MySQL connection std::ostringstream sql; - sql << "SHOW TABLES"; - if (!schema.empty()) { - sql << " FROM " << schema; - } - if (!name_filter.empty()) { - // Escape the name_filter to prevent SQL injection - sql << " LIKE '" << escape_string_literal(name_filter) << "'"; + if (target->protocol == "pgsql") { + sql << "SELECT table_name FROM information_schema.tables WHERE table_type='BASE TABLE'"; + if (!schema.empty()) { + sql << " AND table_schema='" << escape_string_literal(schema) << "'"; + } + if (!name_filter.empty()) { + sql << " AND table_name LIKE '" << escape_string_literal(name_filter) << "'"; + } + sql << " ORDER BY table_name"; + } else { + sql << "SHOW TABLES"; + if (!schema.empty()) { + sql << " FROM " << schema; + } + if (!name_filter.empty()) { + // Escape the name_filter to prevent SQL injection + sql << " LIKE '" << escape_string_literal(name_filter) << "'"; + } } - std::string query_result = execute_query(sql.str()); + std::string query_result = execute_query_with_schema(sql.str(), schema, target->target_id); result = create_success_response(json::parse(query_result)); } @@ -1733,12 +2206,28 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& else if (tool_name == "run_sql_readonly") { std::string sql = json_string(arguments, "sql"); std::string schema = json_string(arguments, "schema"); + std::string target_id = json_string(arguments, "target_id"); int max_rows = json_int(arguments, "max_rows", 200); int timeout_sec = json_int(arguments, "timeout_sec", 2); + (void)max_rows; + (void)timeout_sec; if (sql.empty()) { result = create_error_response("sql is required"); } else { + refresh_target_registry(); + const QueryTarget* target = resolve_target(target_id); + if (target == NULL) { + result = create_error_response( + target_id.empty() ? "No executable default target available" : "Unknown target_id: " + target_id + ); + return result; + } + if (!target->executable) { + result = create_error_response("Target is not executable by this handler"); + return result; + } + // ============================================================ // MCP QUERY RULES EVALUATION // ============================================================ @@ -1796,7 +2285,7 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& } else if (is_dangerous_query(sql)) { result = create_error_response("SQL contains dangerous operations"); } else { - std::string query_result = execute_query_with_schema(sql, schema); + std::string query_result = execute_query_with_schema(sql, schema, target->target_id); try { json result_json = json::parse(query_result); // Check if query actually failed @@ -1844,10 +2333,24 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json& else if (tool_name == "explain_sql") { std::string sql = json_string(arguments, "sql"); + std::string target_id = json_string(arguments, "target_id"); if (sql.empty()) { result = create_error_response("sql is required"); } else { - std::string query_result = execute_query("EXPLAIN " + sql); + refresh_target_registry(); + const QueryTarget* target = resolve_target(target_id); + if (target == NULL) { + result = create_error_response( + target_id.empty() ? "No executable default target available" : "Unknown target_id: " + target_id + ); + return result; + } + if (!target->executable) { + result = create_error_response("Target is not executable by this handler"); + return result; + } + + std::string query_result = execute_query("EXPLAIN " + sql, target->target_id); try { result = create_success_response(json::parse(query_result)); } catch (...) {