feat(session): Step 4.A — remove the GENAI:/LLM: query-prefix escape hatches

Per decision Q2 in the GenAI plugin carve-out design doc, the in-line
MySQL-protocol prefixes that bypassed routing, ACLs, and the query
processor are removed.  Users reach GenAI features through MCP, admin
SQL, or the REST endpoint, so once the rest of the carve-out finishes
nothing else needs the in-core GENAI:/LLM: handlers.

Removals:

  lib/MySQL_Session.cpp (-933 lines)
    - handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___genai
    - handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___llm
    - handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___genai_send_async
    - handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_genai_response
    - genai_cleanup_request
    - check_genai_events
    - The COM_QUERY prefix-detection block that dispatched to those
      handlers
    - The check_genai_events poll-loop hook in WAITING_CLIENT_DATA

  include/MySQL_Session.h (-67 lines)
    - All matching method declarations, with their doxygen

  include/Base_Session.h (-15 lines)
    - GenAI_PendingRequest struct
    - pending_genai_requests_ map
    - next_genai_request_id_ counter
    - genai_epoll_fd_ per-session epoll fd

  lib/Base_Session.cpp (-9 lines)
    - The init block that created the per-session genai_epoll_fd_
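
For reference, the per-session epoll fd and its poll-loop hook followed
roughly the pattern below.  This is a minimal standalone sketch, not the
removed code: demo_poll_ready and demo_epoll_hook are hypothetical
names, and a plain socketpair stands in for the GenAI worker channel.

```cpp
#include <cassert>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

// Non-blocking readiness check: timeout 0 makes epoll_wait return at once.
// Returns the number of ready fds (0 if none), or -1 on error.
int demo_poll_ready(int epfd) {
    struct epoll_event events[8];
    return epoll_wait(epfd, events, 8, 0);
}

// Hypothetical demo: returns 1 if the hook sees nothing before the peer
// writes and exactly one ready fd afterwards, 0 otherwise, -1 on setup error.
int demo_epoll_hook() {
    int epfd = epoll_create1(EPOLL_CLOEXEC);
    if (epfd < 0) return -1;

    int fds[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) {
        close(epfd);
        return -1;
    }

    // Watch the "session" end for incoming data.
    struct epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = fds[0];
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, fds[0], &ev) < 0) {
        close(fds[0]);
        close(fds[1]);
        close(epfd);
        return -1;
    }

    int before = demo_poll_ready(epfd);   // nothing written yet

    char byte = 'x';
    bool wrote = (write(fds[1], &byte, 1) == 1);  // "worker" end responds

    int after = demo_poll_ready(epfd);    // fds[0] should now be readable

    close(fds[0]);
    close(fds[1]);
    close(epfd);
    return (wrote && before == 0 && after == 1) ? 1 : 0;
}
```

With the prefix handlers gone, no session owns such an fd, so both the
init block and the hook have nothing left to serve.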

LLM: was deleted alongside GENAI: even though only GENAI: is named in
the design doc.  They share the same `#ifdef PROXYSQLGENAI` gate, the
same async-genai socketpair plumbing, and the same rationale.  Keeping
LLM: would have left a handler that still calls the removed
genai_send_async path; if we want LLM: back later, the plugin can
re-implement it (or a more general request-multiplexer ABI extension
can land).

The async-genai socketpair protocol (GenAI_RequestHeader /
GenAI_ResponseHeader, defined in include/GenAI_Thread.h) is no longer
referenced outside lib/GenAI_Thread.cpp; those structs disappear when
GenAI_Thread moves to plugins/genai/ in Step 5.
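
For readers who never touched that protocol, its framing was a
fixed-size header followed by a JSON payload in each direction.  The
sketch below shows one round trip over a socketpair under assumed field
widths.  DemoRequestHeader, DemoResponseHeader, and demo_round_trip are
hypothetical stand-ins, not the real GenAI_RequestHeader /
GenAI_ResponseHeader layouts, and the "worker" side simply echoes the
payload.

```cpp
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <string>
#include <sys/socket.h>
#include <unistd.h>

// Assumed layouts for illustration only; the real headers carry more fields.
struct DemoRequestHeader  { uint64_t request_id; uint32_t operation;   uint32_t query_len;  };
struct DemoResponseHeader { uint64_t request_id; uint32_t status_code; uint32_t result_len; };

// Write exactly len bytes, retrying on short writes and EINTR.
static bool write_all(int fd, const void* buf, size_t len) {
    const char* p = static_cast<const char*>(buf);
    while (len > 0) {
        ssize_t w = write(fd, p, len);
        if (w < 0) { if (errno == EINTR) continue; return false; }
        p += w;
        len -= static_cast<size_t>(w);
    }
    return true;
}

// Read exactly len bytes; EOF before len bytes counts as failure.
static bool read_all(int fd, void* buf, size_t len) {
    char* p = static_cast<char*>(buf);
    while (len > 0) {
        ssize_t r = read(fd, p, len);
        if (r < 0) { if (errno == EINTR) continue; return false; }
        if (r == 0) return false;
        p += r;
        len -= static_cast<size_t>(r);
    }
    return true;
}

// One request/response round trip. Single-threaded, so the payload must fit
// in the socket buffer; returns the echoed payload, or "" on any failure.
std::string demo_round_trip(const std::string& query) {
    int fds[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) return "";

    // Session side (fds[0]): header first, then the JSON payload.
    DemoRequestHeader req{1, 0, static_cast<uint32_t>(query.size())};
    bool ok = write_all(fds[0], &req, sizeof(req)) &&
              write_all(fds[0], query.data(), query.size());

    // Worker side (fds[1]): read header, then exactly query_len bytes.
    DemoRequestHeader got{};
    std::string payload;
    if (ok) ok = read_all(fds[1], &got, sizeof(got));
    if (ok) {
        payload.resize(got.query_len);
        ok = read_all(fds[1], &payload[0], payload.size());
    }

    // Worker replies with the same request_id so the session can match it.
    if (ok) {
        DemoResponseHeader resp{got.request_id, 0,
                                static_cast<uint32_t>(payload.size())};
        ok = write_all(fds[1], &resp, sizeof(resp)) &&
             write_all(fds[1], payload.data(), payload.size());
    }

    // Session side: read the response header, then result_len bytes.
    DemoResponseHeader back{};
    std::string result;
    if (ok) ok = read_all(fds[0], &back, sizeof(back));
    if (ok) ok = (back.request_id == req.request_id && back.status_code == 0);
    if (ok) {
        result.resize(back.result_len);
        ok = read_all(fds[0], &result[0], result.size());
    }

    close(fds[0]);
    close(fds[1]);
    return ok ? result : "";
}
```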

Verified:
  - clean rebuild of libproxysql.a + proxysql + ProxySQL_GenAI_Plugin.so
  - all 60 unit-test binaries pass
  - smoke test: daemon starts, SELECT through admin works, SIGTERM
    shuts down cleanly

Net diff: -1010 / +28 lines.  Largest deletion of the carve-out so far.
Rene Cannao 2 months ago
parent b59a1b94eb
commit a79a27a9e3

@@ -99,20 +99,10 @@ class Base_Session {
//MySQL_STMTs_meta *sess_STMTs_meta;
//StmtLongDataHandler *SLDH;
// GenAI async support
#ifdef epoll_create1
struct GenAI_PendingRequest {
uint64_t request_id;
int client_fd; // MySQL side of socketpair
std::string json_query;
std::chrono::steady_clock::time_point start_time;
PtrSize_t *original_pkt; // Original packet to complete
};
std::unordered_map<uint64_t, GenAI_PendingRequest> pending_genai_requests_;
uint64_t next_genai_request_id_;
int genai_epoll_fd_; // For monitoring GenAI response fds
#endif
// GenAI async support (per-session epoll fd + pending-request map)
// removed in Step 4 of the GenAI plugin carve-out -- the
// GENAI:/LLM: prefix handlers that owned this state are gone.
// See decision Q2 in the design doc.

@ -385,74 +385,15 @@ class MySQL_Session: public Base_Session<MySQL_Session, MySQL_Data_Stream, MySQL
void handler_rc0_RefreshActiveTransactions(MySQL_Connection* myconn);
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_INIT_DB_replace_CLICKHOUSE(PtrSize_t& pkt);
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___not_mysql(PtrSize_t& pkt);
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___genai(const char* query, size_t query_len, PtrSize_t* pkt);
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___llm(const char* query, size_t query_len, PtrSize_t* pkt);
#ifdef epoll_create1
/**
* @brief Handle GenAI response from socketpair
*
* Called when epoll notifies that a GenAI response is available on a client fd.
* Reads the GenAI_ResponseHeader and JSON result, then sends the resultset
* to the MySQL client.
*
* @param fd The socketpair fd (MySQL side) with data available to read
*
* @see handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___genai_send_async()
* @see check_genai_events()
*/
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_genai_response(int fd);
// MYSQL_COM_QUERY___genai / MYSQL_COM_QUERY___llm and the entire
// async-genai socketpair infrastructure (handle_genai_response,
// genai_send_async, genai_cleanup_request, check_genai_events) were
// removed in Step 4 of the GenAI plugin carve-out (decision Q2 in
// the design doc). GenAI now reaches clients through MCP / admin
// SQL / REST -- the in-line MySQL-protocol "GENAI:" / "LLM:"
// prefix escape hatches were a debug/POC convenience that bypassed
// routing, ACLs, and the query processor.
/**
* @brief Send GenAI request asynchronously via socketpair
*
* Creates a socketpair for async communication with the GenAI module:
* 1. Creates socketpair(fds)
* 2. Registers fds[1] with GenAI module
* 3. Sends GenAI_RequestHeader + JSON query via fds[0]
* 4. Adds fds[0] to session's epoll for response notification
* 5. Returns immediately (MySQL thread is free to process other queries)
*
* The response will be handled by handle_genai_response() when ready.
*
* @param query The JSON query string (after "GENAI:" prefix)
* @param query_len Length of the query string
* @param pkt Original packet (stored for later cleanup)
* @return true if request was sent successfully, false on error
*
* @see handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_genai_response()
*/
bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___genai_send_async(const char* query, size_t query_len, PtrSize_t* pkt);
/**
* @brief Cleanup a GenAI pending request
*
* Removes the request from the pending map, closes the socketpair fd,
* removes from epoll, and frees the original packet. Called after
* the response is processed or on error.
*
* @param request_id The request ID to clean up
*
* @see handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___genai_send_async()
*/
void genai_cleanup_request(uint64_t request_id);
/**
* @brief Check for pending GenAI responses
*
* Performs a non-blocking epoll_wait on the session's GenAI epoll fd
* to check if any responses are ready. If a response is found, it's
* processed immediately by calling handle_genai_response().
*
* This is called from the main handler() loop in the WAITING_CLIENT_DATA
* case to ensure GenAI responses are processed promptly even when
* there's no new client data.
*
* @return true if a response was processed, false if no responses were ready
*
* @see handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_genai_response()
*/
bool check_genai_events();
#endif
bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_detect_SQLi();
bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP_MULTI_PACKET(PtrSize_t& pkt);
bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM__various(PtrSize_t* pkt, bool* wrong_pass);

@@ -86,15 +86,9 @@ void Base_Session<S,DS,B,T>::init() {
MySQL_Session* mysession = static_cast<S*>(this);
mysession->sess_STMTs_meta = new MySQL_STMTs_meta();
mysession->SLDH = new StmtLongDataHandler();
#ifdef epoll_create1
// Initialize GenAI async support
mysession->next_genai_request_id_ = 1;
mysession->genai_epoll_fd_ = epoll_create1(EPOLL_CLOEXEC);
if (mysession->genai_epoll_fd_ < 0) {
proxy_error("Failed to create GenAI epoll fd: %s\n", strerror(errno));
mysession->genai_epoll_fd_ = -1;
}
#endif
// GenAI async epoll-fd init removed in Step 4 of the GenAI
// plugin carve-out (Base_Session.h GenAI async members are
// gone with the GENAI:/LLM: prefix handlers).
}
};

@ -3711,899 +3711,6 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
return false;
}
#ifdef PROXYSQLGENAI
// Handler for GENAI: queries - experimental GenAI integration
// Query formats:
// GENAI: {"type": "embed", "documents": ["doc1", "doc2", ...]}
// GENAI: {"type": "rerank", "query": "...", "documents": [...], "top_n": 5, "columns": 3}
// Returns: Resultset with embeddings or reranked documents
void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___genai(const char* query, size_t query_len, PtrSize_t* pkt) {
// Skip leading space after "GENAI:"
while (query_len > 0 && (*query == ' ' || *query == '\t')) {
query++;
query_len--;
}
if (query_len == 0) {
// Empty query after GENAI:
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1234, (char*)"HY000", "Empty GENAI: query", true);
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
return;
}
// Check GenAI module is initialized
if (!GloGATH) {
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1237, (char*)"HY000", "GenAI module is not initialized", true);
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
return;
}
if (!GloGATH->variables.genai_enabled) {
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1238, (char*)"HY000",
"GenAI is disabled (set genai-enabled=true)", true);
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
return;
}
#ifdef epoll_create1
// Use async path with socketpair for non-blocking operation
if (!handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___genai_send_async(query, query_len, pkt)) {
// Async send failed - error already sent to client
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
return;
}
// Request sent asynchronously - don't free pkt, will be freed in response handler
// Return immediately, session is now free to handle other queries
proxy_debug(PROXY_DEBUG_GENAI, 3, "GenAI: Query sent asynchronously, session continuing\n");
#else
// Fallback to synchronous blocking path for systems without epoll
// Pass JSON query to GenAI module for autonomous processing
std::string json_query(query, query_len);
std::string result_json = GloGATH->process_json_query(json_query);
if (result_json.empty()) {
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1250, (char*)"HY000", "GenAI query processing failed", true);
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
return;
}
// Parse the JSON result and build MySQL resultset
try {
json result = json::parse(result_json);
if (!result.is_object()) {
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1251, (char*)"HY000", "GenAI returned invalid result format", true);
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
return;
}
// Check if result is an error
if (result.contains("error") && result["error"].is_string()) {
std::string error_msg = result["error"].get<std::string>();
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1252, (char*)"HY000", (char*)error_msg.c_str(), true);
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
return;
}
// Extract resultset data
if (!result.contains("columns") || !result["columns"].is_array()) {
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1253, (char*)"HY000", "GenAI result missing 'columns' field", true);
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
return;
}
if (!result.contains("rows") || !result["rows"].is_array()) {
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1254, (char*)"HY000", "GenAI result missing 'rows' field", true);
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
return;
}
auto columns = result["columns"];
auto rows = result["rows"];
// Build SQLite3 resultset
std::unique_ptr<SQLite3_result> resultset(new SQLite3_result(columns.size()));
// Add column definitions
for (size_t i = 0; i < columns.size(); i++) {
if (columns[i].is_string()) {
std::string col_name = columns[i].get<std::string>();
resultset->add_column_definition(SQLITE_TEXT, (char*)col_name.c_str());
}
}
// Add rows
for (const auto& row : rows) {
if (!row.is_array()) continue;
// Create row data array
char** row_data = (char**)malloc(columns.size() * sizeof(char*));
size_t valid_cols = 0;
for (size_t i = 0; i < columns.size() && i < row.size(); i++) {
if (row[i].is_string()) {
std::string val = row[i].get<std::string>();
row_data[valid_cols++] = strdup(val.c_str());
} else if (row[i].is_null()) {
row_data[valid_cols++] = NULL;
} else {
// Convert to string
std::string val = row[i].dump();
// Remove quotes if present
if (val.size() >= 2 && val[0] == '"' && val[val.size()-1] == '"') {
val = val.substr(1, val.size() - 2);
}
row_data[valid_cols++] = strdup(val.c_str());
}
}
resultset->add_row(row_data);
// Free row data
for (size_t i = 0; i < valid_cols; i++) {
if (row_data[i]) free(row_data[i]);
}
free(row_data);
}
// Send resultset to client
SQLite3_to_MySQL(resultset.get(), NULL, 0, &client_myds->myprot, false,
(client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF));
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
} catch (const json::parse_error& e) {
client_myds->DSS = STATE_QUERY_SENT_NET;
std::string err_msg = "Failed to parse GenAI result: ";
err_msg += e.what();
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1255, (char*)"HY000", (char*)err_msg.c_str(), true);
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
} catch (const std::exception& e) {
client_myds->DSS = STATE_QUERY_SENT_NET;
std::string err_msg = "Error processing GenAI result: ";
err_msg += e.what();
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1256, (char*)"HY000", (char*)err_msg.c_str(), true);
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
}
#endif // epoll_create1 - fallback blocking path
}
// Handler for LLM: queries - Generic LLM bridge processing
// Query format:
// LLM: Summarize the customer feedback
// LLM: Generate a Python function to validate emails
// LLM: Explain this SQL query: SELECT * FROM users
// Returns: Resultset with the text response from LLM
//
// Note: This now uses the async GENAI path to avoid blocking MySQL threads.
// The LLM query is converted to a JSON GENAI request and sent asynchronously.
void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___llm(const char* query, size_t query_len, PtrSize_t* pkt) {
// Skip leading space after "LLM:"
while (query_len > 0 && (*query == ' ' || *query == '\t')) {
query++;
query_len--;
}
if (query_len == 0) {
// Empty query after LLM:
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1240, (char*)"HY000", "Empty LLM: query", true);
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
return;
}
// Check GenAI module is initialized (LLM now uses GenAI module)
if (!GloGATH) {
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1241, (char*)"HY000", "GenAI module is not initialized", true);
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
return;
}
if (!GloGATH->variables.genai_enabled || !GloGATH->variables.genai_llm_enabled) {
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1245, (char*)"HY000",
"LLM is disabled (set genai-enabled=true and genai-llm_enabled=true)", true);
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
return;
}
// Check AI manager is available for LLM bridge
if (!GloAI) {
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1242, (char*)"HY000", "AI features module is not initialized", true);
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
return;
}
// Get LLM bridge from AI manager
LLM_Bridge* llm_bridge = GloAI->get_llm_bridge();
if (!llm_bridge) {
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1243, (char*)"HY000", "LLM bridge is not initialized", true);
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
return;
}
// Increment total requests counter
GloAI->increment_llm_total_requests();
#ifdef epoll_create1
// Build JSON query for LLM operation
json json_query;
json_query["type"] = "llm";
json_query["prompt"] = std::string(query, query_len);
json_query["allow_cache"] = true;
// Add schema if available (for context)
if (client_myds->myconn->userinfo->schemaname) {
json_query["schema"] = std::string(client_myds->myconn->userinfo->schemaname);
}
std::string json_str = json_query.dump();
// Use async GENAI path to avoid blocking
if (!handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___genai_send_async(json_str.c_str(), json_str.length(), pkt)) {
// Async send failed - error already sent to client
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
return;
}
// Request sent asynchronously - don't free pkt, will be freed in response handler
// Return immediately, session is now free to handle other queries
proxy_debug(PROXY_DEBUG_GENAI, 2, "LLM: Query sent asynchronously via GenAI: %s\n", std::string(query, query_len).c_str());
#else
// Fallback to synchronous blocking path for systems without epoll
// Build LLM request
LLMRequest req;
req.prompt = std::string(query, query_len);
req.schema_name = client_myds->myconn->userinfo->schemaname ? client_myds->myconn->userinfo->schemaname : "";
req.allow_cache = true;
req.max_latency_ms = 0; // No specific latency requirement
// Call LLM bridge (blocking fallback)
LLMResult result = llm_bridge->process(req);
// Update performance counters based on result
if (result.cache_hit) {
GloAI->increment_llm_cache_hits();
} else {
GloAI->increment_llm_cache_misses();
}
// Update timing counters
GloAI->add_llm_response_time_ms(result.total_time_ms);
GloAI->add_llm_cache_lookup_time_ms(result.cache_lookup_time_ms);
GloAI->increment_llm_cache_lookups();
if (result.cache_hit) {
// For cache hits, we're done
} else {
// For cache misses, also count LLM call time and cache store time
GloAI->add_llm_cache_store_time_ms(result.cache_store_time_ms);
if (result.cache_store_time_ms > 0) {
GloAI->increment_llm_cache_stores();
}
// Update model call counters
char* prefer_local = GloGATH->get_variable((char*)"prefer_local_models");
bool prefer_local_models = prefer_local && (strcmp(prefer_local, "true") == 0);
if (prefer_local) free(prefer_local);
if (result.provider_used == "openai") {
// Check if it's a local call (Ollama) or cloud call
if (prefer_local_models &&
(result.explanation.find("localhost") != std::string::npos ||
result.explanation.find("127.0.0.1") != std::string::npos)) {
GloAI->increment_llm_local_model_calls();
} else {
GloAI->increment_llm_cloud_model_calls();
}
} else if (result.provider_used == "anthropic") {
GloAI->increment_llm_cloud_model_calls();
}
}
if (result.text_response.empty() && !result.error_code.empty()) {
// LLM processing failed
std::string err_msg = "LLM processing failed: ";
err_msg += result.error_code;
if (!result.error_details.empty()) {
err_msg += " - ";
err_msg += result.error_details;
}
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1244, (char*)"HY000", (char*)err_msg.c_str(), true);
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
return;
}
// Build resultset with the generated text response
std::vector<std::string> columns = {"text_response", "explanation", "cached", "provider"};
std::unique_ptr<SQLite3_result> resultset(new SQLite3_result(columns.size()));
// Add column definitions
for (size_t i = 0; i < columns.size(); i++) {
resultset->add_column_definition(SQLITE_TEXT, (char*)columns[i].c_str());
}
// Add single row with the result
char** row_data = (char**)malloc(columns.size() * sizeof(char*));
row_data[0] = strdup(result.text_response.c_str());
row_data[1] = strdup(result.explanation.c_str());
row_data[2] = strdup(result.cached ? "true" : "false");
row_data[3] = strdup(result.provider_used.c_str());
resultset->add_row(row_data);
// Free row data
for (size_t i = 0; i < columns.size(); i++) {
free(row_data[i]);
}
free(row_data);
// Send resultset to client
SQLite3_to_MySQL(resultset.get(), NULL, 0, &client_myds->myprot, false,
(client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF));
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
proxy_debug(PROXY_DEBUG_GENAI, 2, "LLM: Processed prompt '%s' [blocking fallback]\n",
req.prompt.c_str());
#endif
}
#ifdef epoll_create1
/**
* @brief Send GenAI request asynchronously via socketpair
*
* This function implements the non-blocking async GenAI request path. It creates
* a socketpair for bidirectional communication with the GenAI module and sends
* the request immediately without waiting for the response.
*
* Async flow:
* 1. Create socketpair(fds) for bidirectional communication
* 2. Register fds[1] (GenAI side) with GenAI module via register_client()
* 3. Store request in pending_genai_requests_ map for later response matching
* 4. Send GenAI_RequestHeader + JSON query via fds[0]
* 5. Add fds[0] to session's genai_epoll_fd_ for response notification
* 6. Return immediately (MySQL thread is now free to process other queries)
*
* The response will be delivered asynchronously and handled by
* handle_genai_response() when the GenAI worker completes processing.
*
* Error handling:
* - On socketpair failure: Send ERR packet to client, return false
* - On register_client failure: Cleanup fds, send ERR packet, return false
* - On write failure: Cleanup request via genai_cleanup_request(), send ERR packet
* - On epoll add failure: Log warning but continue (request was sent successfully)
*
* Memory management:
* - Original packet is copied to pending.original_pkt for response generation
* - Memory is freed in genai_cleanup_request() when response is processed
*
* @param query JSON query string to send to GenAI module
* @param query_len Length of the query string
* @param pkt Original MySQL packet (for command number and later response)
* @return true if request was sent successfully, false on error
*
* @note This function is non-blocking and returns immediately after sending.
* The actual GenAI processing happens in worker threads, not MySQL threads.
* @see handle_genai_response(), genai_cleanup_request(), check_genai_events()
*/
bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___genai_send_async(
const char* query, size_t query_len, PtrSize_t* pkt) {
// Create socketpair for async communication
int fds[2];
if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) {
proxy_error("GenAI: socketpair failed: %s\n", strerror(errno));
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1260, (char*)"HY000",
"Failed to create GenAI communication channel", true);
return false;
}
// Set MySQL side to non-blocking
int flags = fcntl(fds[0], F_GETFL, 0);
fcntl(fds[0], F_SETFL, flags | O_NONBLOCK);
// Register GenAI side with GenAI module
if (!GloGATH->register_client(fds[1])) {
proxy_error("GenAI: Failed to register client fd %d with GenAI module\n", fds[1]);
close(fds[0]);
close(fds[1]);
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1261, (char*)"HY000",
"Failed to register with GenAI module", true);
return false;
}
// Prepare request header
GenAI_RequestHeader hdr;
hdr.request_id = next_genai_request_id_++;
hdr.operation = GENAI_OP_JSON;
hdr.query_len = query_len;
hdr.flags = 0;
hdr.top_n = 0;
// Store request in pending map
GenAI_PendingRequest pending;
pending.request_id = hdr.request_id;
pending.client_fd = fds[0];
pending.json_query = std::string(query, query_len);
pending.start_time = std::chrono::steady_clock::now();
// Copy the original packet for later response
pending.original_pkt = (PtrSize_t*)malloc(sizeof(PtrSize_t));
if (!pending.original_pkt) {
proxy_error("GenAI: Failed to allocate memory for packet copy\n");
close(fds[0]);
close(fds[1]);
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1262, (char*)"HY000",
"Memory allocation failed", true);
return false;
}
pending.original_pkt->ptr = pkt->ptr;
pending.original_pkt->size = pkt->size;
pending_genai_requests_[hdr.request_id] = pending;
// Send request header
ssize_t written = write(fds[0], &hdr, sizeof(hdr));
if (written != sizeof(hdr)) {
proxy_error("GenAI: Failed to write request header to fd %d: %s\n",
fds[0], strerror(errno));
auto it = pending_genai_requests_.find(hdr.request_id);
if (it != pending_genai_requests_.end() && it->second.original_pkt) {
it->second.original_pkt->ptr = nullptr;
it->second.original_pkt->size = 0;
}
genai_cleanup_request(hdr.request_id);
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1263, (char*)"HY000",
"Failed to send request to GenAI module", true);
return false;
}
// Send JSON query
size_t total_written = 0;
while (total_written < query_len) {
ssize_t w = write(fds[0], query + total_written, query_len - total_written);
if (w <= 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
usleep(1000);
continue;
}
proxy_error("GenAI: Failed to write JSON query to fd %d: %s\n",
fds[0], strerror(errno));
auto it = pending_genai_requests_.find(hdr.request_id);
if (it != pending_genai_requests_.end() && it->second.original_pkt) {
it->second.original_pkt->ptr = nullptr;
it->second.original_pkt->size = 0;
}
genai_cleanup_request(hdr.request_id);
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1264, (char*)"HY000",
"Failed to send query to GenAI module", true);
return false;
}
total_written += w;
}
// Add to epoll for response notification
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = fds[0];
if (epoll_ctl(genai_epoll_fd_, EPOLL_CTL_ADD, fds[0], &ev) < 0) {
proxy_error("GenAI: Failed to add fd %d to epoll: %s\n", fds[0], strerror(errno));
// Request is sent, but we won't be notified of response
// This is not fatal - we'll timeout eventually
}
proxy_debug(PROXY_DEBUG_GENAI, 3,
"GenAI: Sent async request %lu via fd %d (query_len=%zu)\n",
hdr.request_id, fds[0], query_len);
return true; // Success - request sent asynchronously
}
/**
* @brief Handle GenAI response from socketpair
*
* This function is called when epoll notifies that data is available on a
* GenAI response file descriptor. It reads the response from the GenAI worker
* thread, processes the result, and sends the MySQL result packet to the client.
*
* Response handling flow:
* 1. Read GenAI_ResponseHeader from socketpair
* 2. Find matching pending request via request_id in pending_genai_requests_
* 3. Read JSON result payload (if result_len > 0)
* 4. Parse JSON and convert to MySQL resultset format
* 5. Send result packet (or ERR packet on error) to client
* 6. Cleanup resources via genai_cleanup_request()
*
* Response format (from GenAI worker):
* - GenAI_ResponseHeader (request_id, status_code, result_len, processing_time_ms)
* - JSON result payload (if result_len > 0)
*
* Error handling:
* - On read error: Find and cleanup pending request, return
* - On incomplete header: Log error, return
* - On unknown request_id: Log error, close fd, return
* - On status_code != 0: Send ERR packet to client with error details
* - On JSON parse error: Send ERR packet to client
*
* RTT (Round-Trip Time) tracking:
* - Calculates RTT from request start to response receipt
* - Logs RTT along with GenAI processing time for monitoring
*
* @param fd The MySQL side file descriptor from socketpair (fds[0])
*
* @note This function is called from check_genai_events() which is invoked
* from the main handler() loop. It runs in the MySQL thread context.
* @see genai_send_async(), genai_cleanup_request(), check_genai_events()
*/
void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_genai_response(int fd) {
// Read response header
GenAI_ResponseHeader resp;
ssize_t n = read(fd, &resp, sizeof(resp));
if (n < 0) {
// Check for non-blocking read - not an error, just no data yet
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return;
}
// Real error - log and cleanup
proxy_error("GenAI: Error reading response header from fd %d: %s\n",
fd, strerror(errno));
} else if (n == 0) {
// Connection closed (EOF) - cleanup
} else {
// Successfully read header, continue processing
goto process_response;
}
// Cleanup path for error or EOF
for (auto& pair : pending_genai_requests_) {
if (pair.second.client_fd == fd) {
genai_cleanup_request(pair.first);
break;
}
}
return;
process_response:
if (n != sizeof(resp)) {
proxy_error("GenAI: Incomplete response header from fd %d: got %zd, expected %zu\n",
fd, n, sizeof(resp));
return;
}
// Find the pending request
auto it = pending_genai_requests_.find(resp.request_id);
if (it == pending_genai_requests_.end()) {
proxy_error("GenAI: Received response for unknown request %lu\n", resp.request_id);
close(fd);
return;
}
GenAI_PendingRequest& pending = it->second;
// Read JSON result
std::string json_result;
if (resp.result_len > 0) {
json_result.resize(resp.result_len);
size_t total_read = 0;
while (total_read < resp.result_len) {
ssize_t r = read(fd, &json_result[total_read],
resp.result_len - total_read);
if (r <= 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
usleep(1000);
continue;
}
proxy_error("GenAI: Error reading JSON result from fd %d: %s\n",
fd, strerror(errno));
json_result.clear();
break;
}
total_read += r;
}
}
// Process the result
auto end_time = std::chrono::steady_clock::now();
int rtt_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
end_time - pending.start_time).count();
proxy_debug(PROXY_DEBUG_GENAI, 3,
"GenAI: Received response %lu (status=%u, result_len=%u, rtt=%dms, proc=%dms)\n",
resp.request_id, resp.status_code, resp.result_len, rtt_ms, resp.processing_time_ms);
// Check for errors
if (resp.status_code != 0 || json_result.empty()) {
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1265, (char*)"HY000",
"GenAI query processing failed", true);
} else {
// Parse JSON result and send resultset
try {
json result = json::parse(json_result);
if (!result.is_object()) {
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1266, (char*)"HY000",
"GenAI returned invalid result format", true);
} else if (result.contains("error") && result["error"].is_string()) {
std::string error_msg = result["error"].get<std::string>();
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1267, (char*)"HY000",
(char*)error_msg.c_str(), true);
} else if (!result.contains("columns") || !result.contains("rows")) {
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1268, (char*)"HY000",
"GenAI result missing required fields", true);
} else {
// Build and send resultset
auto columns = result["columns"];
auto rows = result["rows"];
std::unique_ptr<SQLite3_result> resultset(new SQLite3_result(columns.size()));
// Add column definitions
for (size_t i = 0; i < columns.size(); i++) {
if (columns[i].is_string()) {
std::string col_name = columns[i].get<std::string>();
resultset->add_column_definition(SQLITE_TEXT, (char*)col_name.c_str());
}
}
// Add rows
for (const auto& row : rows) {
if (!row.is_array()) continue;
size_t num_cols = row.size();
if (num_cols > columns.size()) num_cols = columns.size();
char** row_data = (char**)malloc(num_cols * sizeof(char*));
size_t valid_cols = 0;
for (size_t i = 0; i < num_cols; i++) {
if (!row[i].is_null()) {
std::string val;
if (row[i].is_string()) {
val = row[i].get<std::string>();
} else {
val = row[i].dump();
}
row_data[valid_cols++] = strdup(val.c_str());
}
}
resultset->add_row(row_data);
for (size_t i = 0; i < valid_cols; i++) {
if (row_data[i]) free(row_data[i]);
}
free(row_data);
}
// Send resultset to client
SQLite3_to_MySQL(resultset.get(), NULL, 0, &client_myds->myprot, false,
(client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF));
}
} catch (const json::parse_error& e) {
client_myds->DSS = STATE_QUERY_SENT_NET;
std::string err_msg = "Failed to parse GenAI result: ";
err_msg += e.what();
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1269, (char*)"HY000",
(char*)err_msg.c_str(), true);
} catch (const std::exception& e) {
client_myds->DSS = STATE_QUERY_SENT_NET;
std::string err_msg = "Error processing GenAI result: ";
err_msg += e.what();
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1270, (char*)"HY000",
(char*)err_msg.c_str(), true);
}
}
// Cleanup the request
genai_cleanup_request(resp.request_id);
// Return to waiting state
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
}
/**
* @brief Cleanup a GenAI pending request
*
* This function cleans up all resources associated with a GenAI pending request.
* It is called after a response has been processed or when an error occurs.
*
* Cleanup operations:
* 1. Remove request from pending_genai_requests_ map
* 2. Close the socketpair file descriptor (client_fd)
* 3. Remove fd from genai_epoll_fd_ monitoring
* 4. Free the original packet memory (original_pkt)
*
* Resource cleanup details:
* - client_fd: The MySQL side of the socketpair (fds[0]) is closed
* - epoll: The fd is removed from genai_epoll_fd_ to stop monitoring
* - original_pkt: The copied packet memory is freed (ptr and size)
* - pending map: The request entry is removed from the map
*
* This function must be called exactly once per request to avoid:
* - File descriptor leaks (unclosed sockets)
* - Memory leaks (unfreed packets)
* - Epoll monitoring stale fds (removed from map but still in epoll)
*
* @param request_id The request ID to cleanup (must exist in pending_genai_requests_)
*
* @note This function is idempotent - if the request_id is not found, it safely
* returns without error (useful for error paths where cleanup might be
* called multiple times).
* @note If the request is not found in the map, this function silently returns
* without error (this is intentional to avoid crashes on double cleanup).
*
* @see genai_send_async(), handle_genai_response()
*/
void MySQL_Session::genai_cleanup_request(uint64_t request_id) {
auto it = pending_genai_requests_.find(request_id);
if (it == pending_genai_requests_.end()) {
return;
}
GenAI_PendingRequest& pending = it->second;
// Remove from epoll
epoll_ctl(genai_epoll_fd_, EPOLL_CTL_DEL, pending.client_fd, nullptr);
// Close socketpair fds
close(pending.client_fd);
// Free the original packet
if (pending.original_pkt) {
l_free(pending.original_pkt->size, pending.original_pkt->ptr);
free(pending.original_pkt);
}
pending_genai_requests_.erase(it);
proxy_debug(PROXY_DEBUG_GENAI, 3, "GenAI: Cleaned up request %lu\n", request_id);
}
/**
* @brief Check for pending GenAI responses
*
* This function performs a non-blocking epoll_wait on the session's GenAI epoll
* file descriptor to check if any responses from GenAI workers are ready to be
* processed. It is called from the main handler() loop in the WAITING_CLIENT_DATA
* state to interleave GenAI response processing with normal client query handling.
*
* Event checking flow:
* 1. Early return if no pending requests (empty pending_genai_requests_)
* 2. Non-blocking epoll_wait with timeout=0 on genai_epoll_fd_
* 3. For each ready fd, find matching pending request
* 4. Call handle_genai_response() to process the response
* 5. Return true after processing one response (to re-check for more)
*
* Integration with main loop:
* ```cpp
* handler_again:
* switch (status) {
* case WAITING_CLIENT_DATA:
* handler___status_WAITING_CLIENT_DATA();
* #ifdef epoll_create1
* // Check for GenAI responses before processing new client data
* if (check_genai_events()) {
* // GenAI response was processed, check for more
* goto handler_again;
* }
* #endif
* break;
* }
* ```
*
* Non-blocking behavior:
* - epoll_wait timeout is 0 (immediate return)
* - Returns true only if a response was actually processed
* - Allows main loop to continue processing client queries between responses
*
* Return value:
* - true: A GenAI response was processed (caller should re-check for more)
* - false: No responses ready (caller can proceed to normal client handling)
*
* @return true if a GenAI response was processed, false otherwise
*
* @note This function is called from the main handler() loop on every iteration
* when in WAITING_CLIENT_DATA state. It must return quickly to avoid
* delaying normal client query processing.
* @note Only processes one response per call to avoid starving client handling.
* The main loop will call again to process additional responses.
*
* @see handle_genai_response(), genai_send_async()
*/
bool MySQL_Session::check_genai_events() {
#ifdef epoll_create1
if (pending_genai_requests_.empty()) {
return false;
}
const int MAX_EVENTS = 16;
struct epoll_event events[MAX_EVENTS];
int nfds = epoll_wait(genai_epoll_fd_, events, MAX_EVENTS, 0); // Non-blocking check
if (nfds <= 0) {
return false;
}
for (int i = 0; i < nfds; i++) {
int fd = events[i].data.fd;
// Find the pending request for this fd
for (auto it = pending_genai_requests_.begin(); it != pending_genai_requests_.end(); ++it) {
if (it->second.client_fd == fd) {
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_genai_response(fd);
return true; // Processed one response
}
}
}
return false;
#else
return false;
#endif
}
#endif /* epoll_create1 (was mislabeled PROXYSQLGENAI in original) */
#endif /* PROXYSQLGENAI (the outer block opened just before the GENAI: handler) */
// this function was inline inside MySQL_Session::get_pkts_from_client
// where:
@@ -6146,13 +5253,13 @@ handler_again:
case WAITING_CLIENT_DATA:
// housekeeping
handler___status_WAITING_CLIENT_DATA();
#ifdef epoll_create1
// Check for GenAI responses before processing new client data
if (check_genai_events()) {
// GenAI response was processed, check for more
goto handler_again;
}
#endif
// In-core check_genai_events() poll-loop hook removed in Step 4
// of the GenAI plugin carve-out -- the async-genai socketpair
// protocol it monitored is gone with the GENAI:/LLM: prefix
// handlers. Plugins that need their own per-session polling
// hook will get one through a future ABI extension; today the
// only consumer (the genai plugin) handles everything inline
// from the query hook.
break;
case FAST_FORWARD:
if (mybe->server_myds->mypolls==NULL) {
@@ -7251,26 +6358,12 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
bool exit_after_SetParse = true;
unsigned char command_type=*((unsigned char *)pkt->ptr+sizeof(mysql_hdr));
#ifdef PROXYSQLGENAI
// Check for GENAI: queries - experimental GenAI integration
if (pkt->size > sizeof(mysql_hdr) + 7) { // Need at least "GENAI: " (7 chars after header)
const char* query_ptr = (const char*)pkt->ptr + sizeof(mysql_hdr) + 1;
size_t query_len = pkt->size - sizeof(mysql_hdr) - 1;
if (query_len >= 7 && strncasecmp(query_ptr, "GENAI:", 6) == 0) {
// This is a GENAI: query - handle with GenAI module
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___genai(query_ptr + 6, query_len - 6, pkt);
return true;
}
// Check for LLM: queries - Generic LLM bridge processing
if (query_len >= 5 && strncasecmp(query_ptr, "LLM:", 4) == 0) {
// This is a LLM: query - handle with LLM bridge
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___llm(query_ptr + 4, query_len - 4, pkt);
return true;
}
}
#endif // PROXYSQLGENAI
// The "GENAI:" / "LLM:" query-prefix escape hatches were removed in
// Step 4 of the GenAI plugin carve-out (decision Q2 in the design
// doc). Users now reach GenAI features through MCP, admin SQL, or
// the REST endpoint -- the in-line MySQL-protocol prefix was a
// debug/POC convenience that bypassed routing, ACLs, and the
// query processor.
if (qpo->new_query) {
handler_WCD_SS_MCQ_qpo_QueryRewrite(pkt);
