MCP: introduce profile-based target/auth routing and unified LOAD/SAVE MCP PROFILES commands

This commit introduces the core MCP routing architecture that decouples client-facing target selection from backend connection details.

Architecture introduced:
- New MCP auth profile table to store backend credentials/policy (`mcp_auth_profiles`).
- New MCP target profile table to map opaque `target_id` -> protocol/hostgroup/auth profile (`mcp_target_profiles`).
- Runtime mirrors for both profile tables.

Behavioral changes:
- MCP query tools resolve execution context by `target_id` and route internally by protocol + hostgroup.
- Client no longer needs backend-specific connection details; only logical target identifiers.
- Unified admin commands for profile lifecycle management:
  - `LOAD MCP PROFILES TO/FROM MEMORY/RUNTIME/DISK`
  - `SAVE MCP PROFILES TO/FROM MEMORY/RUNTIME/DISK`

Design goals:
- Support multi-backend MCP execution (MySQL + PostgreSQL) behind one endpoint.
- Provide server-side credential management and cleaner operational model.
- Replace legacy one-backend POC configuration with scalable target/auth abstractions.

Result:
- MCP routing is now dynamic, protocol-agnostic at the client layer, and fully managed through profile tables and unified load/save commands.
pull/5386/head
Rene Cannao 2 months ago
parent 5de836a4c1
commit 013864b36f

@ -175,6 +175,7 @@ Each MCP endpoint has its own dedicated tool handler with specific tools designe
**Purpose**: Safe database exploration and query execution
**Tools**:
- `list_targets` - List logical backend targets (hostgroup-backed routing handles)
- `list_schemas` - List databases
- `list_tables` - List tables in schema
- `describe_table` - Get table structure
@ -202,9 +203,37 @@ Each MCP endpoint has its own dedicated tool handler with specific tools designe
- Data analysis and discovery
- Query optimization assistance
- Two-phase discovery (static harvest + LLM analysis)
- Multi-backend routing via opaque `target_id` values instead of direct host/protocol details
**Authentication**: `mcp-query_endpoint_auth` (Bearer token)
#### Query Target Routing Model
`/mcp/query` now supports a dynamic routing model based on logical targets:
- Clients call `list_targets` to discover routable targets.
- Each target exposes:
- `target_id` (opaque identifier)
- `description` (human-readable summary)
- `capabilities` (for example `inventory`, `readonly_sql`, `explain`)
- Query tools (for example `run_sql_readonly`, `explain_sql`, `list_tables`) accept an optional `target_id`.
- If `target_id` is omitted, the server uses a default executable target when available.
Internally, the server resolves each `target_id` to runtime hostgroup metadata, a configured auth profile, and protocol-specific execution paths.
Credential separation model:
- MCP endpoint authentication (Bearer token) identifies and authorizes the MCP client.
- Backend database credentials are server-managed in MCP runtime profile tables:
- `runtime_mcp_target_profiles`
- `runtime_mcp_auth_profiles`
- Query execution pools are keyed by `target_id + auth_profile_id`.
Current execution support:
- `mysql` targets: `list_tables`, `run_sql_readonly`, `explain_sql`
- `pgsql` targets: `list_tables`, `run_sql_readonly`, `explain_sql`
---
#### `/mcp/admin` - Administration Endpoint

@ -90,12 +90,23 @@ curl -k -X POST "https://127.0.0.1:6071/mcp/query?token=YOUR_TOKEN" \
### Inventory Tools
#### list_targets
List logical query targets. Each target identifier maps internally to a ProxySQL hostgroup and routing policy.
Notes:
- Targets are loaded from server-side runtime profile tables and include backend auth mapping.
- MCP clients only use `target_id`; backend credentials are never sent in tool calls.
**Parameters:**
- None
#### list_schemas
List all available schemas/databases.
**Parameters:**
- `page_token` (string, optional) - Pagination token
- `page_size` (integer, optional) - Results per page (default: 50)
- `target_id` (string, optional) - Logical query target identifier
#### list_tables
List tables in a schema.
@ -105,6 +116,10 @@ List tables in a schema.
- `page_token` (string, optional) - Pagination token
- `page_size` (integer, optional) - Results per page (default: 50)
- `name_filter` (string, optional) - Filter table names by pattern
- `target_id` (string, optional) - Logical query target identifier
**Routing behavior:**
- Uses `SHOW TABLES` for MySQL targets and `information_schema.tables` for PostgreSQL targets.
### Structure Tools
@ -171,9 +186,14 @@ Execute a read-only SQL query with safety guardrails enforced.
**Parameters:**
- `sql` (string, **required**) - SQL query to execute
- `target_id` (string, optional) - Logical query target identifier
- `max_rows` (integer, optional) - Maximum rows to return (default: 200)
- `timeout_sec` (integer, optional) - Query timeout (default: 2)
**Routing behavior:**
- If `target_id` points to a MySQL target, query executes with MySQL protocol.
- If `target_id` points to a PostgreSQL target, query executes with PostgreSQL protocol.
**Safety rules:**
- Must start with SELECT
- No dangerous keywords (DROP, DELETE, INSERT, UPDATE, etc.)
@ -184,6 +204,10 @@ Explain a query execution plan using EXPLAIN or EXPLAIN ANALYZE.
**Parameters:**
- `sql` (string, **required**) - SQL query to explain
- `target_id` (string, optional) - Logical query target identifier
**Routing behavior:**
- Uses protocol-specific execution based on `target_id` (`mysql` or `pgsql` target).
### Relationship Inference Tools

@ -121,6 +121,7 @@ The following variables control authentication (Bearer tokens) for specific MCP
The Query Tool Handler provides LLM-based tools for MySQL database exploration and two-phase discovery, including:
- **inventory** - List databases and tables
- **targets** - Discover logical routing targets (`list_targets`)
- **structure** - Get table schema
- **profiling** - Analyze query performance
- **sampling** - Sample table data
@ -131,10 +132,29 @@ The Query Tool Handler provides LLM-based tools for MySQL database exploration a
- **agent** - Agent coordination tools
- **llm** - LLM interaction tools
### Dynamic Target Discovery and Routing
Query tools use a logical `target_id` routing model with server-managed credentials:
- Use `list_targets` to retrieve discoverable backend targets.
- Active targets come from `runtime_mcp_target_profiles` joined with `runtime_mcp_auth_profiles`.
- The MCP server maps `target_id -> auth_profile_id` and applies backend credentials internally.
- MCP clients must never send backend credentials in tool arguments.
- Clients should pass `target_id` to query tools instead of host/protocol details.
### Backend Credential Model
Backend credentials are defined in MCP tables, not in client requests:
- `mcp_auth_profiles` / `runtime_mcp_auth_profiles`
- `mcp_target_profiles` / `runtime_mcp_target_profiles`
The in-memory target/auth map is loaded by `MCP_Threads_Handler` from runtime tables and used by the query executor connection pools.
#### `mcp-mysql_hosts`
- **Type:** String (comma-separated)
- **Default:** `"127.0.0.1"`
- **Description:** Comma-separated list of MySQL host addresses
- **Description:** Legacy POC variable used by non-routed components (for example static harvester defaults). Routed query execution uses MCP profile tables.
- **Runtime:** Yes
- **Example:**
```sql
@ -145,7 +165,7 @@ The Query Tool Handler provides LLM-based tools for MySQL database exploration a
#### `mcp-mysql_ports`
- **Type:** String (comma-separated)
- **Default:** `"3306"`
- **Description:** Comma-separated list of MySQL ports (corresponds to `mcp-mysql_hosts`)
- **Description:** Legacy POC variable used by non-routed components (for example static harvester defaults). Routed query execution uses MCP profile tables.
- **Runtime:** Yes
- **Example:**
```sql
@ -156,7 +176,7 @@ The Query Tool Handler provides LLM-based tools for MySQL database exploration a
#### `mcp-mysql_user`
- **Type:** String
- **Default:** `""` (empty)
- **Description:** MySQL username for tool handler connections
- **Description:** Legacy POC variable. Routed query execution uses credentials from `mcp_auth_profiles`.
- **Runtime:** Yes
- **Example:**
```sql
@ -167,7 +187,7 @@ The Query Tool Handler provides LLM-based tools for MySQL database exploration a
#### `mcp-mysql_password`
- **Type:** String
- **Default:** `""` (empty)
- **Description:** MySQL password for tool handler connections
- **Description:** Legacy POC variable. Routed query execution uses credentials from `mcp_auth_profiles`.
- **Runtime:** Yes
- **Note:** Password is stored in plaintext in `global_variables`. Use restrictive MySQL user permissions.
- **Example:**
@ -179,7 +199,7 @@ The Query Tool Handler provides LLM-based tools for MySQL database exploration a
#### `mcp-mysql_schema`
- **Type:** String
- **Default:** `""` (empty)
- **Description:** Default database/schema to use for tool operations
- **Description:** Legacy POC variable. Routed query execution uses `default_schema` from `mcp_auth_profiles`.
- **Runtime:** Yes
- **Example:**
```sql
@ -222,6 +242,27 @@ LOAD MCP VARIABLES TO RUNTIME;
SAVE MCP VARIABLES TO DISK;
```
### Profile Commands
Unified command family for MCP backend profiles (auth + target together):
```sql
-- Disk -> Memory
LOAD MCP PROFILES FROM DISK;
LOAD MCP PROFILES TO MEMORY;
-- Memory -> Runtime
LOAD MCP PROFILES TO RUNTIME;
LOAD MCP PROFILES FROM MEMORY;
-- Runtime -> Memory
SAVE MCP PROFILES TO MEMORY;
SAVE MCP PROFILES FROM RUNTIME;
-- Memory -> Disk
SAVE MCP PROFILES TO DISK;
```
### Checksum Commands
```sql

@ -8,6 +8,9 @@
#include <pthread.h>
#include <cstring>
#include <cstdlib>
#include <map>
#include <string>
#include <vector>
// Forward declarations
class ProxySQL_MCP_Server;
@ -20,6 +23,7 @@ class Cache_Tool_Handler;
class Observe_Tool_Handler;
class AI_Tool_Handler;
class RAG_Tool_Handler;
class SQLite3_result;
/**
* @brief MCP Threads Handler class for managing MCP module configuration
@ -37,6 +41,21 @@ private:
pthread_rwlock_t rwlock; ///< Read-write lock for thread-safe access
public:
struct MCP_Target_Auth_Context {
std::string target_id;
std::string protocol;
int hostgroup_id;
std::string auth_profile_id;
std::string db_username;
std::string db_password;
std::string default_schema;
int max_rows;
int timeout_ms;
bool allow_explain;
bool allow_discovery;
std::string description;
};
/**
* @brief Structure holding MCP module configuration variables
*
@ -201,6 +220,25 @@ public:
* Outputs the MCP module version to stderr.
*/
void print_version();
/**
* @brief Load MCP target/auth profiles from a joined runtime resultset into memory map
* @return 0 on success, -1 on failure
*/
int load_target_auth_map(SQLite3_result* resultset);
/**
* @brief Resolve backend auth/policy context for a target_id
*/
bool get_target_auth_context(const std::string& target_id, MCP_Target_Auth_Context& out_ctx);
/**
* @brief Return all active target auth contexts (thread-safe copy)
*/
std::vector<MCP_Target_Auth_Context> get_all_target_auth_contexts();
private:
std::map<std::string, MCP_Target_Auth_Context> target_auth_map;
};
// Global instance of the MCP Threads Handler

@ -396,6 +396,56 @@
" comment VARCHAR" \
")"
// MCP backend authentication profiles (server-side credentials)
#define ADMIN_SQLITE_TABLE_MCP_AUTH_PROFILES "CREATE TABLE mcp_auth_profiles (" \
" auth_profile_id VARCHAR PRIMARY KEY NOT NULL ," \
" db_username VARCHAR NOT NULL ," \
" db_password VARCHAR NOT NULL ," \
" default_schema VARCHAR DEFAULT '' ," \
" use_ssl INT CHECK (use_ssl IN (0,1)) NOT NULL DEFAULT 0 ," \
" ssl_mode VARCHAR DEFAULT '' ," \
" comment VARCHAR DEFAULT ''" \
")"
#define ADMIN_SQLITE_TABLE_RUNTIME_MCP_AUTH_PROFILES "CREATE TABLE runtime_mcp_auth_profiles (" \
" auth_profile_id VARCHAR PRIMARY KEY NOT NULL ," \
" db_username VARCHAR NOT NULL ," \
" db_password VARCHAR NOT NULL ," \
" default_schema VARCHAR DEFAULT '' ," \
" use_ssl INT CHECK (use_ssl IN (0,1)) NOT NULL DEFAULT 0 ," \
" ssl_mode VARCHAR DEFAULT '' ," \
" comment VARCHAR DEFAULT ''" \
")"
// MCP target routing profiles: target_id -> (protocol, hostgroup, auth_profile, policy)
#define ADMIN_SQLITE_TABLE_MCP_TARGET_PROFILES "CREATE TABLE mcp_target_profiles (" \
" target_id VARCHAR PRIMARY KEY NOT NULL ," \
" protocol VARCHAR NOT NULL CHECK (protocol IN ('mysql','pgsql')) ," \
" hostgroup_id INT CHECK (hostgroup_id >= 0) NOT NULL ," \
" auth_profile_id VARCHAR NOT NULL ," \
" description VARCHAR DEFAULT '' ," \
" max_rows INT CHECK (max_rows > 0) NOT NULL DEFAULT 200 ," \
" timeout_ms INT CHECK (timeout_ms >= 0) NOT NULL DEFAULT 2000 ," \
" allow_explain INT CHECK (allow_explain IN (0,1)) NOT NULL DEFAULT 1 ," \
" allow_discovery INT CHECK (allow_discovery IN (0,1)) NOT NULL DEFAULT 1 ," \
" active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1 ," \
" comment VARCHAR DEFAULT ''" \
")"
#define ADMIN_SQLITE_TABLE_RUNTIME_MCP_TARGET_PROFILES "CREATE TABLE runtime_mcp_target_profiles (" \
" target_id VARCHAR PRIMARY KEY NOT NULL ," \
" protocol VARCHAR NOT NULL CHECK (protocol IN ('mysql','pgsql')) ," \
" hostgroup_id INT CHECK (hostgroup_id >= 0) NOT NULL ," \
" auth_profile_id VARCHAR NOT NULL ," \
" description VARCHAR DEFAULT '' ," \
" max_rows INT CHECK (max_rows > 0) NOT NULL DEFAULT 200 ," \
" timeout_ms INT CHECK (timeout_ms >= 0) NOT NULL DEFAULT 2000 ," \
" allow_explain INT CHECK (allow_explain IN (0,1)) NOT NULL DEFAULT 1 ," \
" allow_discovery INT CHECK (allow_discovery IN (0,1)) NOT NULL DEFAULT 1 ," \
" active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1 ," \
" comment VARCHAR DEFAULT ''" \
")"
// MCP query digest statistics table
#define STATS_SQLITE_TABLE_MCP_QUERY_DIGEST "CREATE TABLE stats_mcp_query_digest (" \
" tool_name VARCHAR NOT NULL ," \

@ -28,12 +28,28 @@ class Static_Harvester;
*/
class Query_Tool_Handler : public MCP_Tool_Handler {
private:
// Logical query targets discovered from runtime hostgroups
struct QueryTarget {
std::string target_id; ///< Opaque MCP-visible target identifier
std::string description; ///< Human-readable target description
std::string protocol; ///< Internal protocol: mysql|pgsql
int hostgroup_id; ///< Internal hostgroup identifier
std::string auth_profile_id; ///< Server-side auth profile identifier
std::string db_username; ///< Backend DB username (from auth profile)
std::string db_password; ///< Backend DB password (from auth profile)
std::string default_schema; ///< Backend default schema/database
std::string host; ///< Concrete host used by executor
int port; ///< Concrete port used by executor
bool executable; ///< True if current handler can execute against this target
};
// MySQL connection configuration
std::string mysql_hosts;
std::string mysql_ports;
std::string mysql_user;
std::string mysql_password;
std::string mysql_schema;
std::string default_target_id;
// Discovery components (NEW - replaces MySQL_Tool_Handler wrapper)
Discovery_Schema* catalog; ///< Discovery catalog (replaces old MySQL_Catalog)
@ -42,14 +58,28 @@ private:
// Connection pool for MySQL queries
struct MySQLConnection {
void* mysql; ///< MySQL connection handle (MYSQL*)
std::string target_id;
std::string auth_profile_id;
std::string host;
int port;
bool in_use;
std::string current_schema; ///< Track current schema for this connection
};
struct PgSQLConnection {
void* pgconn; ///< PostgreSQL connection handle (PGconn*)
std::string target_id;
std::string auth_profile_id;
std::string host;
int port;
bool in_use;
std::string current_schema; ///< Track current schema for this connection
};
std::vector<MySQLConnection> connection_pool;
std::vector<PgSQLConnection> pgsql_connection_pool;
std::vector<QueryTarget> target_registry;
pthread_mutex_t pool_lock;
int pool_size;
int pool_size; ///< MySQL pool size
int pg_pool_size; ///< PostgreSQL pool size
// Query guardrails
int max_rows;
@ -106,10 +136,21 @@ private:
*/
int init_connection_pool();
/**
* @brief Discover logical targets from runtime hostgroups
*/
void refresh_target_registry();
/**
* @brief Resolve target id (or default target if empty)
*/
const QueryTarget* resolve_target(const std::string& target_id);
/**
* @brief Get a connection from the pool
*/
void* get_connection();
void* get_connection(const std::string& target_id);
void* get_pgsql_connection(const std::string& target_id);
/**
* @brief Return a connection to the pool
@ -123,11 +164,12 @@ private:
* @note Caller should NOT hold pool_lock when calling this
*/
MySQLConnection* find_connection(void* mysql_ptr);
PgSQLConnection* find_pgsql_connection(void* pgconn_ptr);
/**
* @brief Execute a query and return results as JSON
*/
std::string execute_query(const std::string& query);
std::string execute_query(const std::string& query, const std::string& target_id = "");
/**
* @brief Execute a query with optional schema switching
@ -137,7 +179,8 @@ private:
*/
std::string execute_query_with_schema(
const std::string& query,
const std::string& schema
const std::string& schema,
const std::string& target_id
);
/**

@ -822,8 +822,14 @@ bool ProxySQL_Admin::init(const bootstrap_info_t& bootstrap_info) {
// MCP query rules
insert_into_tables_defs(tables_defs_admin, "mcp_query_rules", ADMIN_SQLITE_TABLE_MCP_QUERY_RULES);
insert_into_tables_defs(tables_defs_admin, "runtime_mcp_query_rules", ADMIN_SQLITE_TABLE_RUNTIME_MCP_QUERY_RULES);
insert_into_tables_defs(tables_defs_admin, "mcp_auth_profiles", ADMIN_SQLITE_TABLE_MCP_AUTH_PROFILES);
insert_into_tables_defs(tables_defs_admin, "runtime_mcp_auth_profiles", ADMIN_SQLITE_TABLE_RUNTIME_MCP_AUTH_PROFILES);
insert_into_tables_defs(tables_defs_admin, "mcp_target_profiles", ADMIN_SQLITE_TABLE_MCP_TARGET_PROFILES);
insert_into_tables_defs(tables_defs_admin, "runtime_mcp_target_profiles", ADMIN_SQLITE_TABLE_RUNTIME_MCP_TARGET_PROFILES);
insert_into_tables_defs(tables_defs_config, "mcp_query_rules", ADMIN_SQLITE_TABLE_MCP_QUERY_RULES);
insert_into_tables_defs(tables_defs_config, "mcp_auth_profiles", ADMIN_SQLITE_TABLE_MCP_AUTH_PROFILES);
insert_into_tables_defs(tables_defs_config, "mcp_target_profiles", ADMIN_SQLITE_TABLE_MCP_TARGET_PROFILES);
#endif /* PROXYSQLGENAI */
insert_into_tables_defs(tables_defs_config, "pgsql_servers", ADMIN_SQLITE_TABLE_PGSQL_SERVERS);

@ -2534,6 +2534,164 @@ bool admin_handler_command_load_or_save(char *query_no_space, unsigned int query
// MCP QUERY RULES COMMAND HANDLERS
// ============================================================
#ifdef PROXYSQLGENAI
// ============================================================
// MCP PROFILES COMMAND HANDLERS (auth + target together)
// ============================================================
if ((query_no_space_length > 17) &&
((!strncasecmp("SAVE MCP PROFILES ", query_no_space, 18)) ||
(!strncasecmp("LOAD MCP PROFILES ", query_no_space, 18)))) {
ProxySQL_Admin *SPA = (ProxySQL_Admin *)pa;
const auto load_target_auth_map_from_runtime = [&]() -> bool {
char* error = NULL;
int cols = 0;
int affected_rows = 0;
SQLite3_result* resultset = NULL;
const char* q =
"SELECT t.target_id, t.protocol, t.hostgroup_id, t.auth_profile_id,"
" t.max_rows, t.timeout_ms, t.allow_explain, t.allow_discovery, t.description,"
" a.db_username, a.db_password, a.default_schema"
" FROM runtime_mcp_target_profiles t"
" JOIN runtime_mcp_auth_profiles a ON a.auth_profile_id=t.auth_profile_id"
" WHERE t.active=1"
" ORDER BY t.target_id";
SPA->admindb->execute_statement(q, &error, &cols, &affected_rows, &resultset);
if (error) {
proxy_error("Failed to load MCP target auth map: %s\n", error);
free(error);
if (resultset) {
delete resultset;
}
return false;
}
if (GloMCPH) {
GloMCPH->load_target_auth_map(resultset);
} else if (resultset) {
delete resultset;
}
return true;
};
// LOAD MCP PROFILES FROM DISK / TO MEMORY
if (
(query_no_space_length == strlen("LOAD MCP PROFILES FROM DISK") &&
!strncasecmp("LOAD MCP PROFILES FROM DISK", query_no_space, query_no_space_length)) ||
(query_no_space_length == strlen("LOAD MCP PROFILES TO MEMORY") &&
!strncasecmp("LOAD MCP PROFILES TO MEMORY", query_no_space, query_no_space_length))
) {
if (!SPA->admindb->execute("BEGIN")) {
SPA->send_error_msg_to_client(sess, (char *)"Failed to BEGIN transaction");
return false;
}
if (!SPA->admindb->execute("DELETE FROM main.mcp_auth_profiles") ||
!SPA->admindb->execute("INSERT OR REPLACE INTO main.mcp_auth_profiles SELECT * FROM disk.mcp_auth_profiles") ||
!SPA->admindb->execute("DELETE FROM main.mcp_target_profiles") ||
!SPA->admindb->execute("INSERT OR REPLACE INTO main.mcp_target_profiles SELECT * FROM disk.mcp_target_profiles")) {
SPA->admindb->execute("ROLLBACK");
SPA->send_error_msg_to_client(sess, (char *)"Failed to load MCP profiles from disk");
return false;
}
if (!SPA->admindb->execute("COMMIT")) {
SPA->send_error_msg_to_client(sess, (char *)"Failed to COMMIT transaction");
return false;
}
SPA->send_ok_msg_to_client(sess, NULL, 0, query_no_space);
return false;
}
// SAVE MCP PROFILES TO DISK
if (
(query_no_space_length == strlen("SAVE MCP PROFILES TO DISK") &&
!strncasecmp("SAVE MCP PROFILES TO DISK", query_no_space, query_no_space_length))
) {
if (!SPA->admindb->execute("BEGIN")) {
SPA->send_error_msg_to_client(sess, (char *)"Failed to BEGIN transaction");
return false;
}
if (!SPA->admindb->execute("DELETE FROM disk.mcp_auth_profiles") ||
!SPA->admindb->execute("INSERT OR REPLACE INTO disk.mcp_auth_profiles SELECT * FROM main.mcp_auth_profiles") ||
!SPA->admindb->execute("DELETE FROM disk.mcp_target_profiles") ||
!SPA->admindb->execute("INSERT OR REPLACE INTO disk.mcp_target_profiles SELECT * FROM main.mcp_target_profiles")) {
SPA->admindb->execute("ROLLBACK");
SPA->send_error_msg_to_client(sess, (char *)"Failed to save MCP profiles to disk");
return false;
}
if (!SPA->admindb->execute("COMMIT")) {
SPA->send_error_msg_to_client(sess, (char *)"Failed to COMMIT transaction");
return false;
}
SPA->send_ok_msg_to_client(sess, NULL, 0, query_no_space);
return false;
}
// LOAD MCP PROFILES TO RUNTIME / FROM MEMORY
if (
(query_no_space_length == strlen("LOAD MCP PROFILES TO RUNTIME") &&
!strncasecmp("LOAD MCP PROFILES TO RUNTIME", query_no_space, query_no_space_length)) ||
(query_no_space_length == strlen("LOAD MCP PROFILES TO RUN") &&
!strncasecmp("LOAD MCP PROFILES TO RUN", query_no_space, query_no_space_length)) ||
(query_no_space_length == strlen("LOAD MCP PROFILES FROM MEMORY") &&
!strncasecmp("LOAD MCP PROFILES FROM MEMORY", query_no_space, query_no_space_length)) ||
(query_no_space_length == strlen("LOAD MCP PROFILES FROM MEM") &&
!strncasecmp("LOAD MCP PROFILES FROM MEM", query_no_space, query_no_space_length))
) {
if (!SPA->admindb->execute("BEGIN")) {
SPA->send_error_msg_to_client(sess, (char *)"Failed to BEGIN transaction");
return false;
}
if (!SPA->admindb->execute("DELETE FROM runtime_mcp_auth_profiles") ||
!SPA->admindb->execute("INSERT OR REPLACE INTO runtime_mcp_auth_profiles SELECT * FROM main.mcp_auth_profiles") ||
!SPA->admindb->execute("DELETE FROM runtime_mcp_target_profiles") ||
!SPA->admindb->execute("INSERT OR REPLACE INTO runtime_mcp_target_profiles SELECT * FROM main.mcp_target_profiles")) {
SPA->admindb->execute("ROLLBACK");
SPA->send_error_msg_to_client(sess, (char *)"Failed to load MCP profiles to runtime");
return false;
}
if (!SPA->admindb->execute("COMMIT")) {
SPA->send_error_msg_to_client(sess, (char *)"Failed to COMMIT transaction");
return false;
}
if (!load_target_auth_map_from_runtime()) {
SPA->send_error_msg_to_client(sess, (char *)"Failed to refresh MCP runtime profile map");
return false;
}
SPA->send_ok_msg_to_client(sess, NULL, 0, query_no_space);
return false;
}
// SAVE MCP PROFILES FROM RUNTIME / TO MEMORY
if (
(query_no_space_length == strlen("SAVE MCP PROFILES TO MEMORY") &&
!strncasecmp("SAVE MCP PROFILES TO MEMORY", query_no_space, query_no_space_length)) ||
(query_no_space_length == strlen("SAVE MCP PROFILES TO MEM") &&
!strncasecmp("SAVE MCP PROFILES TO MEM", query_no_space, query_no_space_length)) ||
(query_no_space_length == strlen("SAVE MCP PROFILES FROM RUNTIME") &&
!strncasecmp("SAVE MCP PROFILES FROM RUNTIME", query_no_space, query_no_space_length)) ||
(query_no_space_length == strlen("SAVE MCP PROFILES FROM RUN") &&
!strncasecmp("SAVE MCP PROFILES FROM RUN", query_no_space, query_no_space_length))
) {
if (!SPA->admindb->execute("BEGIN")) {
SPA->send_error_msg_to_client(sess, (char *)"Failed to BEGIN transaction");
return false;
}
if (!SPA->admindb->execute("DELETE FROM main.mcp_auth_profiles") ||
!SPA->admindb->execute("INSERT OR REPLACE INTO main.mcp_auth_profiles SELECT * FROM runtime_mcp_auth_profiles") ||
!SPA->admindb->execute("DELETE FROM main.mcp_target_profiles") ||
!SPA->admindb->execute("INSERT OR REPLACE INTO main.mcp_target_profiles SELECT * FROM runtime_mcp_target_profiles")) {
SPA->admindb->execute("ROLLBACK");
SPA->send_error_msg_to_client(sess, (char *)"Failed to save MCP profiles from runtime");
return false;
}
if (!SPA->admindb->execute("COMMIT")) {
SPA->send_error_msg_to_client(sess, (char *)"Failed to COMMIT transaction");
return false;
}
SPA->send_ok_msg_to_client(sess, NULL, 0, query_no_space);
return false;
}
}
// Supported commands:
// LOAD MCP QUERY RULES FROM DISK - Copy from disk to memory
// LOAD MCP QUERY RULES TO MEMORY - Copy from disk to memory (alias)

@ -389,4 +389,64 @@ void MCP_Threads_Handler::print_version() {
fprintf(stderr, "MCP Threads Handler rev. %s -- %s -- %s\n", MCP_THREAD_VERSION, __FILE__, __TIMESTAMP__);
}
int MCP_Threads_Handler::load_target_auth_map(SQLite3_result* resultset) {
if (!resultset) {
return -1;
}
std::map<std::string, MCP_Target_Auth_Context> new_map;
for (auto row : resultset->rows) {
if (row->cnt < 12 || !row->fields[0] || !row->fields[1] || !row->fields[2] || !row->fields[3] ||
!row->fields[9] || !row->fields[10]) {
continue;
}
MCP_Target_Auth_Context ctx;
ctx.target_id = row->fields[0];
ctx.protocol = row->fields[1];
ctx.hostgroup_id = atoi(row->fields[2]);
ctx.auth_profile_id = row->fields[3];
ctx.max_rows = row->fields[4] ? atoi(row->fields[4]) : 200;
ctx.timeout_ms = row->fields[5] ? atoi(row->fields[5]) : 2000;
ctx.allow_explain = row->fields[6] ? (atoi(row->fields[6]) != 0) : true;
ctx.allow_discovery = row->fields[7] ? (atoi(row->fields[7]) != 0) : true;
ctx.description = row->fields[8] ? row->fields[8] : "";
ctx.db_username = row->fields[9];
ctx.db_password = row->fields[10];
ctx.default_schema = row->fields[11] ? row->fields[11] : "";
new_map[ctx.target_id] = ctx;
}
delete resultset;
pthread_rwlock_wrlock(&rwlock);
target_auth_map.swap(new_map);
pthread_rwlock_unlock(&rwlock);
proxy_info("MCP_Threads_Handler: loaded %zu target auth profile mapping(s)\n", target_auth_map.size());
return 0;
}
bool MCP_Threads_Handler::get_target_auth_context(const std::string& target_id, MCP_Target_Auth_Context& out_ctx) {
pthread_rwlock_rdlock(&rwlock);
auto it = target_auth_map.find(target_id);
if (it == target_auth_map.end()) {
pthread_rwlock_unlock(&rwlock);
return false;
}
out_ctx = it->second;
pthread_rwlock_unlock(&rwlock);
return true;
}
std::vector<MCP_Threads_Handler::MCP_Target_Auth_Context> MCP_Threads_Handler::get_all_target_auth_contexts() {
std::vector<MCP_Target_Auth_Context> out;
pthread_rwlock_rdlock(&rwlock);
out.reserve(target_auth_map.size());
for (const auto& kv : target_auth_map) {
out.push_back(kv.second);
}
pthread_rwlock_unlock(&rwlock);
return out;
}
#endif /* PROXYSQLGENAI */

@ -2977,6 +2977,35 @@ void ProxySQL_Admin::init_mcp_variables() {
flush_mcp_variables___runtime_to_database(configdb, false, false, false, false, false);
flush_mcp_variables___runtime_to_database(admindb, false, true, false, false, false);
flush_mcp_variables___database_to_runtime(admindb, true, "", 0);
// Load MCP target/auth profiles into runtime tables and then in-memory map.
admindb->execute("DELETE FROM runtime_mcp_auth_profiles");
admindb->execute("INSERT OR REPLACE INTO runtime_mcp_auth_profiles SELECT * FROM main.mcp_auth_profiles");
admindb->execute("DELETE FROM runtime_mcp_target_profiles");
admindb->execute("INSERT OR REPLACE INTO runtime_mcp_target_profiles SELECT * FROM main.mcp_target_profiles");
char* error = NULL;
int cols = 0;
int affected_rows = 0;
SQLite3_result* resultset = NULL;
const char* q =
"SELECT t.target_id, t.protocol, t.hostgroup_id, t.auth_profile_id,"
" t.max_rows, t.timeout_ms, t.allow_explain, t.allow_discovery, t.description,"
" a.db_username, a.db_password, a.default_schema"
" FROM runtime_mcp_target_profiles t"
" JOIN runtime_mcp_auth_profiles a ON a.auth_profile_id=t.auth_profile_id"
" WHERE t.active=1"
" ORDER BY t.target_id";
admindb->execute_statement(q, &error, &cols, &affected_rows, &resultset);
if (error) {
proxy_error("Failed to load MCP target auth map: %s\n", error);
free(error);
if (resultset) {
delete resultset;
}
} else {
GloMCPH->load_target_auth_map(resultset);
}
}
}

@ -8,7 +8,9 @@ using json = nlohmann::json;
#define PROXYJSON
#include "Query_Tool_Handler.h"
#include "MCP_Thread.h"
#include "proxysql_debug.h"
#include "proxysql_admin.h"
#include "Static_Harvester.h"
#include <vector>
@ -18,6 +20,9 @@ using json = nlohmann::json;
// MySQL client library
#include <mysql.h>
#include <libpq-fe.h>
extern ProxySQL_Admin *GloAdmin;
// ============================================================
// JSON Helper Functions
@ -203,6 +208,7 @@ Query_Tool_Handler::Query_Tool_Handler(
: catalog(NULL),
harvester(NULL),
pool_size(0),
pg_pool_size(0),
max_rows(200),
timeout_ms(2000),
allow_select_star(false)
@ -302,58 +308,42 @@ void Query_Tool_Handler::close() {
connection_pool.clear();
pool_size = 0;
pthread_mutex_unlock(&pool_lock);
}
int Query_Tool_Handler::init_connection_pool() {
// Parse hosts
std::vector<std::string> host_list;
std::istringstream h(mysql_hosts);
std::string host;
while (std::getline(h, host, ',')) {
host.erase(0, host.find_first_not_of(" \t"));
host.erase(host.find_last_not_of(" \t") + 1);
if (!host.empty()) {
host_list.push_back(host);
for (auto& conn : pgsql_connection_pool) {
if (conn.pgconn) {
PQfinish(static_cast<PGconn*>(conn.pgconn));
conn.pgconn = NULL;
}
}
pgsql_connection_pool.clear();
pg_pool_size = 0;
// Parse ports
std::vector<int> port_list;
std::istringstream p(mysql_ports);
std::string port;
while (std::getline(p, port, ',')) {
port.erase(0, port.find_first_not_of(" \t"));
port.erase(port.find_last_not_of(" \t") + 1);
if (!port.empty()) {
port_list.push_back(atoi(port.c_str()));
}
}
// Ensure ports array matches hosts array size
while (port_list.size() < host_list.size()) {
port_list.push_back(3306);
}
pthread_mutex_unlock(&pool_lock);
}
if (host_list.empty()) {
proxy_error("Query_Tool_Handler: No hosts configured\n");
return -1;
}
int Query_Tool_Handler::init_connection_pool() {
refresh_target_registry();
pthread_mutex_lock(&pool_lock);
pool_size = 0;
pg_pool_size = 0;
for (size_t i = 0; i < host_list.size(); i++) {
for (const auto& target : target_registry) {
if (!target.executable || target.protocol != "mysql") {
continue;
}
MySQLConnection conn;
conn.host = host_list[i];
conn.port = port_list[i];
conn.target_id = target.target_id;
conn.auth_profile_id = target.auth_profile_id;
conn.host = target.host;
conn.port = target.port;
conn.in_use = false;
conn.current_schema = target.default_schema;
MYSQL* mysql = mysql_init(NULL);
if (!mysql) {
proxy_error("Query_Tool_Handler: mysql_init failed for %s:%d\n",
conn.host.c_str(), conn.port);
pthread_mutex_unlock(&pool_lock);
return -1;
continue;
}
unsigned int timeout = 5;
@ -364,9 +354,9 @@ int Query_Tool_Handler::init_connection_pool() {
if (!mysql_real_connect(
mysql,
conn.host.c_str(),
mysql_user.c_str(),
mysql_password.c_str(),
mysql_schema.empty() ? NULL : mysql_schema.c_str(),
target.db_username.c_str(),
target.db_password.c_str(),
target.default_schema.empty() ? NULL : target.default_schema.c_str(),
conn.port,
NULL,
CLIENT_MULTI_STATEMENTS
@ -374,28 +364,222 @@ int Query_Tool_Handler::init_connection_pool() {
proxy_error("Query_Tool_Handler: mysql_real_connect failed for %s:%d: %s\n",
conn.host.c_str(), conn.port, mysql_error(mysql));
mysql_close(mysql);
pthread_mutex_unlock(&pool_lock);
return -1;
continue;
}
conn.mysql = mysql;
connection_pool.push_back(conn);
pool_size++;
proxy_info("Query_Tool_Handler: Connected to %s:%d\n",
conn.host.c_str(), conn.port);
proxy_info("Query_Tool_Handler: Connected target '%s' to %s:%d\n",
conn.target_id.c_str(), conn.host.c_str(), conn.port);
if (default_target_id.empty()) {
default_target_id = conn.target_id;
}
}
for (const auto& target : target_registry) {
if (!target.executable || target.protocol != "pgsql") {
continue;
}
PgSQLConnection conn;
conn.target_id = target.target_id;
conn.auth_profile_id = target.auth_profile_id;
conn.host = target.host;
conn.port = target.port;
conn.in_use = false;
conn.current_schema = target.default_schema;
std::ostringstream conninfo;
conninfo << "host=" << conn.host
<< " port=" << conn.port
<< " user=" << target.db_username
<< " password=" << target.db_password
<< " connect_timeout=5";
if (!target.default_schema.empty()) {
conninfo << " dbname=" << target.default_schema;
}
PGconn* pgconn = PQconnectdb(conninfo.str().c_str());
if (pgconn == NULL || PQstatus(pgconn) != CONNECTION_OK) {
proxy_error(
"Query_Tool_Handler: PQconnectdb failed for %s:%d: %s\n",
conn.host.c_str(), conn.port, pgconn ? PQerrorMessage(pgconn) : "null connection"
);
if (pgconn) {
PQfinish(pgconn);
}
continue;
}
conn.pgconn = pgconn;
pgsql_connection_pool.push_back(conn);
pg_pool_size++;
proxy_info("Query_Tool_Handler: Connected target '%s' to pgsql %s:%d\n",
conn.target_id.c_str(), conn.host.c_str(), conn.port);
if (default_target_id.empty()) {
default_target_id = conn.target_id;
}
}
pthread_mutex_unlock(&pool_lock);
proxy_info("Query_Tool_Handler: Connection pool initialized with %d connection(s)\n", pool_size);
if ((pool_size + pg_pool_size) == 0) {
proxy_error("Query_Tool_Handler: No executable targets available\n");
return -1;
}
proxy_info(
"Query_Tool_Handler: Connection pools initialized mysql=%d pgsql=%d, default target '%s'\n",
pool_size, pg_pool_size, default_target_id.c_str()
);
return 0;
}
void* Query_Tool_Handler::get_connection() {
void Query_Tool_Handler::refresh_target_registry() {
target_registry.clear();
default_target_id.clear();
if (!GloMCPH) {
return;
}
// Refresh MCP target/auth map from runtime profile tables before resolving targets.
if (GloAdmin && GloAdmin->admindb) {
char* error = NULL;
int cols = 0;
int affected_rows = 0;
SQLite3_result* resultset = NULL;
const char* q =
"SELECT t.target_id, t.protocol, t.hostgroup_id, t.auth_profile_id,"
" t.max_rows, t.timeout_ms, t.allow_explain, t.allow_discovery, t.description,"
" a.db_username, a.db_password, a.default_schema"
" FROM runtime_mcp_target_profiles t"
" JOIN runtime_mcp_auth_profiles a ON a.auth_profile_id=t.auth_profile_id"
" WHERE t.active=1"
" ORDER BY t.target_id";
GloAdmin->admindb->execute_statement(q, &error, &cols, &affected_rows, &resultset);
if (error) {
proxy_warning("Query_Tool_Handler: failed refreshing target auth map: %s\n", error);
free(error);
if (resultset) {
delete resultset;
}
} else {
GloMCPH->load_target_auth_map(resultset);
}
}
const auto profiles = GloMCPH->get_all_target_auth_contexts();
const auto resolve_endpoint = [&](
const std::string& protocol,
int hostgroup_id,
std::string& host,
int& port,
int& backends
) -> bool {
if (GloAdmin == NULL || GloAdmin->admindb == NULL) {
return false;
}
const char* table_name = (protocol == "pgsql") ? "runtime_pgsql_servers" : "runtime_mysql_servers";
char* error = NULL;
int cols = 0;
int affected_rows = 0;
SQLite3_result* resultset = NULL;
std::ostringstream sql;
sql << "SELECT hostname, port FROM " << table_name
<< " WHERE hostgroup_id=" << hostgroup_id
<< " AND UPPER(status)='ONLINE'"
<< " ORDER BY weight DESC, hostname, port";
GloAdmin->admindb->execute_statement(sql.str().c_str(), &error, &cols, &affected_rows, &resultset);
if (error) {
proxy_warning("Query_Tool_Handler: endpoint resolution failed for %s/%d: %s\n",
protocol.c_str(), hostgroup_id, error);
free(error);
if (resultset) {
delete resultset;
}
return false;
}
if (!resultset || resultset->rows.empty()) {
if (resultset) {
delete resultset;
}
return false;
}
backends = resultset->rows.size();
host = resultset->rows[0]->fields[0] ? resultset->rows[0]->fields[0] : "";
port = resultset->rows[0]->fields[1] ? atoi(resultset->rows[0]->fields[1]) : ((protocol == "pgsql") ? 5432 : 3306);
delete resultset;
return !host.empty();
};
for (const auto& ctx : profiles) {
QueryTarget target;
target.target_id = ctx.target_id;
target.protocol = ctx.protocol;
target.hostgroup_id = ctx.hostgroup_id;
target.auth_profile_id = ctx.auth_profile_id;
target.db_username = ctx.db_username;
target.db_password = ctx.db_password;
target.default_schema = ctx.default_schema;
target.description = ctx.description;
target.executable = false;
int backend_count = 0;
if (resolve_endpoint(target.protocol, target.hostgroup_id, target.host, target.port, backend_count)) {
target.executable = !target.db_username.empty();
if (target.description.empty()) {
target.description = "Hostgroup " + std::to_string(target.hostgroup_id) +
" (" + std::to_string(backend_count) + " backend(s))";
}
} else {
if (target.description.empty()) {
target.description = "Hostgroup " + std::to_string(target.hostgroup_id) + " (no ONLINE backends)";
}
}
target_registry.push_back(target);
}
for (const auto& target : target_registry) {
if (target.executable) {
default_target_id = target.target_id;
break;
}
}
}
const Query_Tool_Handler::QueryTarget* Query_Tool_Handler::resolve_target(const std::string& target_id) {
const std::string& resolved_target_id = target_id.empty() ? default_target_id : target_id;
if (resolved_target_id.empty()) {
return NULL;
}
for (const auto& target : target_registry) {
if (target.target_id == resolved_target_id) {
return &target;
}
}
return NULL;
}
void* Query_Tool_Handler::get_connection(const std::string& target_id) {
const std::string resolved_target = target_id.empty() ? default_target_id : target_id;
const QueryTarget* target = resolve_target(resolved_target);
if (target == NULL) {
return NULL;
}
pthread_mutex_lock(&pool_lock);
for (auto& conn : connection_pool) {
if (!conn.in_use) {
if (!conn.in_use && conn.target_id == resolved_target && conn.auth_profile_id == target->auth_profile_id) {
conn.in_use = true;
pthread_mutex_unlock(&pool_lock);
return conn.mysql;
@ -403,7 +587,29 @@ void* Query_Tool_Handler::get_connection() {
}
pthread_mutex_unlock(&pool_lock);
proxy_error("Query_Tool_Handler: No available connection\n");
proxy_error("Query_Tool_Handler: No available connection for target '%s'\n", resolved_target.c_str());
return NULL;
}
void* Query_Tool_Handler::get_pgsql_connection(const std::string& target_id) {
const std::string resolved_target = target_id.empty() ? default_target_id : target_id;
const QueryTarget* target = resolve_target(resolved_target);
if (target == NULL) {
return NULL;
}
pthread_mutex_lock(&pool_lock);
for (auto& conn : pgsql_connection_pool) {
if (!conn.in_use && conn.target_id == resolved_target && conn.auth_profile_id == target->auth_profile_id) {
conn.in_use = true;
pthread_mutex_unlock(&pool_lock);
return conn.pgconn;
}
}
pthread_mutex_unlock(&pool_lock);
proxy_error("Query_Tool_Handler: No available pgsql connection for target '%s'\n", resolved_target.c_str());
return NULL;
}
@ -415,7 +621,16 @@ void Query_Tool_Handler::return_connection(void* mysql_ptr) {
for (auto& conn : connection_pool) {
if (conn.mysql == mysql_ptr) {
conn.in_use = false;
break;
pthread_mutex_unlock(&pool_lock);
return;
}
}
for (auto& conn : pgsql_connection_pool) {
if (conn.pgconn == mysql_ptr) {
conn.in_use = false;
pthread_mutex_unlock(&pool_lock);
return;
}
}
@ -435,12 +650,101 @@ Query_Tool_Handler::MySQLConnection* Query_Tool_Handler::find_connection(void* m
return nullptr;
}
std::string Query_Tool_Handler::execute_query(const std::string& query) {
void* mysql = get_connection();
if (!mysql) {
return "{\"error\": \"No available connection\"}";
// Helper to find pgsql connection wrapper by PGconn pointer (thread-safe, acquires pool_lock)
Query_Tool_Handler::PgSQLConnection* Query_Tool_Handler::find_pgsql_connection(void* pgconn_ptr) {
pthread_mutex_lock(&pool_lock);
for (auto& conn : pgsql_connection_pool) {
if (conn.pgconn == pgconn_ptr) {
pthread_mutex_unlock(&pool_lock);
return &conn;
}
}
pthread_mutex_unlock(&pool_lock);
return nullptr;
}
std::string Query_Tool_Handler::execute_query(const std::string& query, const std::string& target_id) {
const QueryTarget* target = resolve_target(target_id);
if (target == NULL) {
json j;
j["success"] = false;
j["error"] = std::string("Unknown target: ") +
(target_id.empty() ? default_target_id : target_id);
return j.dump();
}
if (target->protocol == "pgsql") {
void* pgconn_v = get_pgsql_connection(target_id);
if (!pgconn_v) {
json j;
j["success"] = false;
j["error"] = std::string("No available pgsql connection for target: ") +
(target_id.empty() ? default_target_id : target_id);
return j.dump();
}
PGconn* pgconn = static_cast<PGconn*>(pgconn_v);
PGresult* res = PQexec(pgconn, query.c_str());
if (res == NULL) {
return_connection(pgconn_v);
json j;
j["success"] = false;
j["error"] = std::string("PQexec returned null result");
return j.dump();
}
ExecStatusType st = PQresultStatus(res);
if (st != PGRES_TUPLES_OK && st != PGRES_COMMAND_OK) {
std::string err = PQresultErrorMessage(res);
PQclear(res);
return_connection(pgconn_v);
json j;
j["success"] = false;
j["error"] = err;
return j.dump();
}
if (st == PGRES_COMMAND_OK) {
const char* tuples = PQcmdTuples(res);
long affected = 0;
if (tuples && tuples[0] != '\0') {
affected = atol(tuples);
}
PQclear(res);
return_connection(pgconn_v);
json j;
j["success"] = true;
j["affected_rows"] = affected;
return j.dump();
}
int num_fields = PQnfields(res);
int num_rows = PQntuples(res);
json results = json::array();
for (int r = 0; r < num_rows; r++) {
json row_data = json::array();
for (int c = 0; c < num_fields; c++) {
row_data.push_back(PQgetisnull(res, r, c) ? "" : PQgetvalue(res, r, c));
}
results.push_back(row_data);
}
PQclear(res);
return_connection(pgconn_v);
json j;
j["success"] = true;
j["columns"] = num_fields;
j["rows"] = results;
return j.dump();
}
void* mysql = get_connection(target_id);
if (!mysql) {
json j;
j["success"] = false;
j["error"] = std::string("No available mysql connection for target: ") +
(target_id.empty() ? default_target_id : target_id);
return j.dump();
}
MYSQL* mysql_ptr = static_cast<MYSQL*>(mysql);
if (mysql_query(mysql_ptr, query.c_str())) {
@ -490,13 +794,119 @@ std::string Query_Tool_Handler::execute_query(const std::string& query) {
// Execute query with optional schema switching
std::string Query_Tool_Handler::execute_query_with_schema(
const std::string& query,
const std::string& schema
const std::string& schema,
const std::string& target_id
) {
void* mysql = get_connection();
if (!mysql) {
return "{\"error\": \"No available connection\"}";
const QueryTarget* target = resolve_target(target_id);
if (target == NULL) {
json j;
j["success"] = false;
j["error"] = std::string("Unknown target: ") +
(target_id.empty() ? default_target_id : target_id);
return j.dump();
}
if (target->protocol == "pgsql") {
void* pgconn_v = get_pgsql_connection(target_id);
if (!pgconn_v) {
json j;
j["success"] = false;
j["error"] = std::string("No available pgsql connection for target: ") +
(target_id.empty() ? default_target_id : target_id);
return j.dump();
}
PGconn* pgconn = static_cast<PGconn*>(pgconn_v);
PgSQLConnection* conn_wrapper = find_pgsql_connection(pgconn_v);
if (!schema.empty() && conn_wrapper && conn_wrapper->current_schema != schema) {
std::string validated_schema = validate_sql_identifier_sqlite(schema);
if (validated_schema.empty()) {
return_connection(pgconn_v);
json j;
j["success"] = false;
j["error"] = "Invalid schema name: contains unsafe characters";
return j.dump();
}
std::string set_search_path = "SET search_path TO " + validated_schema;
PGresult* set_res = PQexec(pgconn, set_search_path.c_str());
if (set_res == NULL || PQresultStatus(set_res) != PGRES_COMMAND_OK) {
std::string err = set_res ? PQresultErrorMessage(set_res) : "set search_path failed";
if (set_res) {
PQclear(set_res);
}
return_connection(pgconn_v);
json j;
j["success"] = false;
j["error"] = err;
return j.dump();
}
PQclear(set_res);
conn_wrapper->current_schema = validated_schema;
}
PGresult* res = PQexec(pgconn, query.c_str());
if (res == NULL) {
return_connection(pgconn_v);
json j;
j["success"] = false;
j["error"] = std::string("PQexec returned null result");
return j.dump();
}
ExecStatusType st = PQresultStatus(res);
if (st != PGRES_TUPLES_OK && st != PGRES_COMMAND_OK) {
std::string err = PQresultErrorMessage(res);
PQclear(res);
return_connection(pgconn_v);
json j;
j["success"] = false;
j["error"] = err;
return j.dump();
}
if (st == PGRES_COMMAND_OK) {
const char* tuples = PQcmdTuples(res);
long affected = 0;
if (tuples && tuples[0] != '\0') {
affected = atol(tuples);
}
PQclear(res);
return_connection(pgconn_v);
json j;
j["success"] = true;
j["affected_rows"] = affected;
return j.dump();
}
int num_fields = PQnfields(res);
int num_rows = PQntuples(res);
json results = json::array();
for (int r = 0; r < num_rows; r++) {
json row_data = json::array();
for (int c = 0; c < num_fields; c++) {
row_data.push_back(PQgetisnull(res, r, c) ? "" : PQgetvalue(res, r, c));
}
results.push_back(row_data);
}
PQclear(res);
return_connection(pgconn_v);
json j;
j["success"] = true;
j["columns"] = num_fields;
j["rows"] = results;
return j.dump();
}
void* mysql = get_connection(target_id);
if (!mysql) {
json j;
j["success"] = false;
j["error"] = std::string("No available mysql connection for target: ") +
(target_id.empty() ? default_target_id : target_id);
return j.dump();
}
MYSQL* mysql_ptr = static_cast<MYSQL*>(mysql);
MySQLConnection* conn_wrapper = find_connection(mysql);
@ -686,18 +1096,25 @@ json Query_Tool_Handler::get_tool_list() {
// ============================================================
// INVENTORY TOOLS
// ============================================================
tools.push_back(create_tool_schema(
"list_targets",
"List logical query targets. Each target maps internally to a ProxySQL hostgroup and routing policy.",
{},
{}
));
tools.push_back(create_tool_schema(
"list_schemas",
"List all available schemas/databases",
{},
{{"page_token", "string"}, {"page_size", "integer"}}
{{"page_token", "string"}, {"page_size", "integer"}, {"target_id", "string"}}
));
tools.push_back(create_tool_schema(
"list_tables",
"List tables in a schema",
{"schema"},
{{"page_token", "string"}, {"page_size", "integer"}, {"name_filter", "string"}}
{{"page_token", "string"}, {"page_size", "integer"}, {"name_filter", "string"}, {"target_id", "string"}}
));
// ============================================================
@ -732,16 +1149,16 @@ json Query_Tool_Handler::get_tool_list() {
// ============================================================
tools.push_back(create_tool_schema(
"run_sql_readonly",
"Execute a read-only SQL query with safety guardrails enforced. Optional schema parameter switches database context before query execution.",
"Execute a read-only SQL query with safety guardrails enforced. Optional schema parameter switches database context before query execution. target_id routes the query to a logical backend target.",
{"sql"},
{{"schema", "string"}, {"max_rows", "integer"}, {"timeout_sec", "integer"}}
{{"schema", "string"}, {"target_id", "string"}, {"max_rows", "integer"}, {"timeout_sec", "integer"}}
));
tools.push_back(create_tool_schema(
"explain_sql",
"Explain a query execution plan using EXPLAIN or EXPLAIN ANALYZE",
{"sql"},
{}
{{"target_id", "string"}}
));
// ============================================================
@ -988,9 +1405,39 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
// ============================================================
// INVENTORY TOOLS
// ============================================================
if (tool_name == "list_schemas") {
if (tool_name == "list_targets") {
refresh_target_registry();
json targets = json::array();
for (const auto& target : target_registry) {
json t;
t["target_id"] = target.target_id;
t["description"] = target.description;
json capabilities = json::array();
capabilities.push_back("inventory");
if (target.executable) {
capabilities.push_back("readonly_sql");
capabilities.push_back("explain");
}
t["capabilities"] = capabilities;
targets.push_back(t);
}
json payload;
payload["targets"] = targets;
payload["default_target_id"] = default_target_id;
result = create_success_response(payload);
}
else if (tool_name == "list_schemas") {
std::string target_id = json_string(arguments, "target_id");
std::string page_token = json_string(arguments, "page_token");
int page_size = json_int(arguments, "page_size", 50);
if (!target_id.empty()) {
refresh_target_registry();
const QueryTarget* target = resolve_target(target_id);
if (target == NULL) {
return create_error_response("Unknown target_id: " + target_id);
}
}
// Query catalog's schemas table instead of live database
char* error = NULL;
@ -1038,9 +1485,25 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
else if (tool_name == "list_tables") {
std::string schema = json_string(arguments, "schema");
std::string target_id = json_string(arguments, "target_id");
std::string page_token = json_string(arguments, "page_token");
int page_size = json_int(arguments, "page_size", 50);
std::string name_filter = json_string(arguments, "name_filter");
(void)page_token;
(void)page_size;
refresh_target_registry();
const QueryTarget* target = resolve_target(target_id);
if (target == NULL) {
result = create_error_response(
target_id.empty() ? "No executable default target available" : "Unknown target_id: " + target_id
);
return result;
}
if (!target->executable) {
result = create_error_response("Target is not executable by this handler");
return result;
}
// Validate schema identifier if provided
if (!schema.empty()) {
@ -1053,17 +1516,27 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
}
}
// TODO: Implement using MySQL connection
std::ostringstream sql;
sql << "SHOW TABLES";
if (!schema.empty()) {
sql << " FROM " << schema;
}
if (!name_filter.empty()) {
// Escape the name_filter to prevent SQL injection
sql << " LIKE '" << escape_string_literal(name_filter) << "'";
if (target->protocol == "pgsql") {
sql << "SELECT table_name FROM information_schema.tables WHERE table_type='BASE TABLE'";
if (!schema.empty()) {
sql << " AND table_schema='" << escape_string_literal(schema) << "'";
}
if (!name_filter.empty()) {
sql << " AND table_name LIKE '" << escape_string_literal(name_filter) << "'";
}
sql << " ORDER BY table_name";
} else {
sql << "SHOW TABLES";
if (!schema.empty()) {
sql << " FROM " << schema;
}
if (!name_filter.empty()) {
// Escape the name_filter to prevent SQL injection
sql << " LIKE '" << escape_string_literal(name_filter) << "'";
}
}
std::string query_result = execute_query(sql.str());
std::string query_result = execute_query_with_schema(sql.str(), schema, target->target_id);
result = create_success_response(json::parse(query_result));
}
@ -1733,12 +2206,28 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
else if (tool_name == "run_sql_readonly") {
std::string sql = json_string(arguments, "sql");
std::string schema = json_string(arguments, "schema");
std::string target_id = json_string(arguments, "target_id");
int max_rows = json_int(arguments, "max_rows", 200);
int timeout_sec = json_int(arguments, "timeout_sec", 2);
(void)max_rows;
(void)timeout_sec;
if (sql.empty()) {
result = create_error_response("sql is required");
} else {
refresh_target_registry();
const QueryTarget* target = resolve_target(target_id);
if (target == NULL) {
result = create_error_response(
target_id.empty() ? "No executable default target available" : "Unknown target_id: " + target_id
);
return result;
}
if (!target->executable) {
result = create_error_response("Target is not executable by this handler");
return result;
}
// ============================================================
// MCP QUERY RULES EVALUATION
// ============================================================
@ -1796,7 +2285,7 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
} else if (is_dangerous_query(sql)) {
result = create_error_response("SQL contains dangerous operations");
} else {
std::string query_result = execute_query_with_schema(sql, schema);
std::string query_result = execute_query_with_schema(sql, schema, target->target_id);
try {
json result_json = json::parse(query_result);
// Check if query actually failed
@ -1844,10 +2333,24 @@ json Query_Tool_Handler::execute_tool(const std::string& tool_name, const json&
else if (tool_name == "explain_sql") {
std::string sql = json_string(arguments, "sql");
std::string target_id = json_string(arguments, "target_id");
if (sql.empty()) {
result = create_error_response("sql is required");
} else {
std::string query_result = execute_query("EXPLAIN " + sql);
refresh_target_registry();
const QueryTarget* target = resolve_target(target_id);
if (target == NULL) {
result = create_error_response(
target_id.empty() ? "No executable default target available" : "Unknown target_id: " + target_id
);
return result;
}
if (!target->executable) {
result = create_error_response("Target is not executable by this handler");
return result;
}
std::string query_result = execute_query("EXPLAIN " + sql, target->target_id);
try {
result = create_success_response(json::parse(query_result));
} catch (...) {

Loading…
Cancel
Save