From a7dac5ef3d7481e6fb6df5869bd70763a6c7aafb Mon Sep 17 00:00:00 2001 From: Rene Cannao Date: Fri, 16 Jan 2026 23:59:22 +0000 Subject: [PATCH] feat: Make NL2SQL use async GenAI path instead of blocking calls MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is a critical architectural fix - NL2SQL was making blocking calls to LLMs which would block the entire MySQL thread. Now NL2SQL uses the same async socketpair pattern as the GENAI embed/rerank operations. Changes: - Added nl2sql operation type to process_json_query() in GenAI module - Updated NL2SQL handler to construct JSON query and use async GENAI path - Added extern declaration for GloAI in GenAI_Thread.cpp - Falls back to synchronous path only on systems without epoll Architecture: - Before: NL2SQL: query → blocking nl2sql->convert() → blocks MySQL thread - After: NL2SQL: query → JSON GENAI request → async socketpair → non-blocking JSON protocol for NL2SQL: GENAI: {"type": "nl2sql", "query": "Show customers", "schema": "mydb"} The NL2SQL result is delivered asynchronously through the existing GENAI response handler, making the system fully non-blocking. Related to: https://github.com/ProxySQL/proxysql-vec/pull/13 --- lib/GenAI_Thread.cpp | 80 ++++++++++++++++++++++++++++++++++++++++++- lib/MySQL_Session.cpp | 66 +++++++++++++++++++++++++++++------ 2 files changed, 135 insertions(+), 11 deletions(-) diff --git a/lib/GenAI_Thread.cpp b/lib/GenAI_Thread.cpp index 1f4942703..bfecf7323 100644 --- a/lib/GenAI_Thread.cpp +++ b/lib/GenAI_Thread.cpp @@ -1,4 +1,5 @@ #include "GenAI_Thread.h" +#include "AI_Features_Manager.h" #include "proxysql_debug.h" #include #include @@ -14,6 +15,9 @@ using json = nlohmann::json; +// Global AI Features Manager - needed for NL2SQL operations +extern AI_Features_Manager *GloAI; + // Platform compatibility #ifndef EFD_CLOEXEC #define EFD_CLOEXEC 0200000 @@ -1692,8 +1696,82 @@ std::string GenAI_Threads_Handler::process_json_query(const std::string& json_qu return result.dump(); } + // Handle nl2sql operation + if (op_type == "nl2sql") { + // Check if AI manager is available + if (!GloAI) { + result["error"] = "AI features manager is not initialized"; + return result.dump(); + } + + // Extract natural language query + if (!query_json.contains("query") || !query_json["query"].is_string()) { + result["error"] = "NL2SQL operation requires a 'query' string"; + return result.dump(); + } + std::string nl_query = query_json["query"].get(); + + if (nl_query.empty()) { + result["error"] = "NL2SQL query cannot be empty"; + return result.dump(); + } + + // Extract optional schema name + std::string schema_name; + if (query_json.contains("schema") && query_json["schema"].is_string()) { + schema_name = query_json["schema"].get(); + } + + // Extract optional cache flag + bool allow_cache = true; + if (query_json.contains("allow_cache") && query_json["allow_cache"].is_boolean()) { + allow_cache = query_json["allow_cache"].get(); + } + + // Get NL2SQL converter + NL2SQL_Converter* nl2sql = GloAI->get_nl2sql(); + if (!nl2sql) { + result["error"] = "NL2SQL converter is not initialized"; + return result.dump(); + } + + // Build NL2SQL request + NL2SQLRequest req; + req.natural_language = nl_query; + req.schema_name = schema_name; + req.allow_cache = allow_cache; + req.max_latency_ms = 0; // No specific latency requirement + + // Convert (this will use cache if available) + NL2SQLResult sql_result = nl2sql->convert(req); + + if (sql_result.sql_query.empty() || sql_result.sql_query.find("NL2SQL conversion failed") == 0) { + result["error"] = "Failed to convert natural language to SQL: " + sql_result.explanation; + return result.dump(); + } + + // Build result + result["columns"] = json::array({"sql_query", "confidence", "explanation", "cached"}); + + json rows = json::array(); + json row = json::array(); + row.push_back(sql_result.sql_query); + + char conf_buf[32]; + snprintf(conf_buf, sizeof(conf_buf), "%.2f", sql_result.confidence); + row.push_back(std::string(conf_buf)); + + row.push_back(sql_result.explanation); + row.push_back(sql_result.cached ? "true" : "false"); + + rows.push_back(row); + result["rows"] = rows; + + return result.dump(); + } + // Unknown operation type - result["error"] = "Unknown operation type: " + op_type + ". Use 'embed' or 'rerank'"; + result["error"] = "Unknown operation type: " + op_type + ". Use 'embed', 'rerank', or 'nl2sql'"; return result.dump(); } catch (const json::parse_error& e) { diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index fecdbde60..3042515d1 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -3875,6 +3875,9 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C // Query format: // NL2SQL: Show me top 10 customers by revenue // Returns: Resultset with the generated SQL query +// +// Note: This now uses the async GENAI path to avoid blocking MySQL threads. +// The NL2SQL query is converted to a JSON GENAI request and sent asynchronously. void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___nl2sql(const char* query, size_t query_len, PtrSize_t* pkt) { // Skip leading space after "NL2SQL:" while (query_len > 0 && (*query == ' ' || *query == '\t')) { @@ -3892,10 +3895,20 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C return; } - // Check AI module is initialized + // Check GenAI module is initialized (NL2SQL now uses GenAI module) + if (!GloGATH) { + client_myds->DSS = STATE_QUERY_SENT_NET; + client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1241, (char*)"HY000", "GenAI module is not initialized", true); + l_free(pkt->size, pkt->ptr); + client_myds->DSS = STATE_SLEEP; + status = WAITING_CLIENT_DATA; + return; + } + + // Check AI manager is available for NL2SQL converter if (!GloAI) { client_myds->DSS = STATE_QUERY_SENT_NET; - client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1241, (char*)"HY000", "AI features module is not initialized", true); + client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1242, (char*)"HY000", "AI features module is not initialized", true); l_free(pkt->size, pkt->ptr); client_myds->DSS = STATE_SLEEP; status = WAITING_CLIENT_DATA; @@ -3906,13 +3919,44 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C NL2SQL_Converter* nl2sql = GloAI->get_nl2sql(); if (!nl2sql) { client_myds->DSS = STATE_QUERY_SENT_NET; - client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1242, (char*)"HY000", "NL2SQL converter is not initialized", true); + client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1243, (char*)"HY000", "NL2SQL converter is not initialized", true); l_free(pkt->size, pkt->ptr); client_myds->DSS = STATE_SLEEP; status = WAITING_CLIENT_DATA; return; } + // Increment total requests counter + GloAI->increment_nl2sql_total_requests(); + +#ifdef epoll_create1 + // Build JSON query for NL2SQL operation + json json_query; + json_query["type"] = "nl2sql"; + json_query["query"] = std::string(query, query_len); + json_query["allow_cache"] = true; + + // Add schema if available + if (client_myds->myconn->userinfo->schemaname) { + json_query["schema"] = std::string(client_myds->myconn->userinfo->schemaname); + } + + std::string json_str = json_query.dump(); + + // Use async GENAI path to avoid blocking + if (!handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___genai_send_async(json_str.c_str(), json_str.length(), pkt)) { + // Async send failed - error already sent to client + l_free(pkt->size, pkt->ptr); + client_myds->DSS = STATE_SLEEP; + status = WAITING_CLIENT_DATA; + return; + } + + // Request sent asynchronously - don't free pkt, will be freed in response handler + // Return immediately, session is now free to handle other queries + proxy_debug(PROXY_DEBUG_NL2SQL, 2, "NL2SQL: Query sent asynchronously via GenAI: %s\n", std::string(query, query_len).c_str()); +#else + // Fallback to synchronous blocking path for systems without epoll // Build NL2SQL request NL2SQLRequest req; req.natural_language = std::string(query, query_len); @@ -3920,10 +3964,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C req.allow_cache = true; req.max_latency_ms = 0; // No specific latency requirement - // Increment total requests counter - GloAI->increment_nl2sql_total_requests(); - - // Call NL2SQL converter (synchronous for Phase 2) + // Call NL2SQL converter (blocking fallback) NL2SQLResult result = nl2sql->convert(req); // Update performance counters based on result @@ -3948,9 +3989,13 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C } // Update model call counters + char* prefer_local = GloGATH->get_variable((char*)"prefer_local_models"); + bool prefer_local_models = prefer_local && (strcmp(prefer_local, "true") == 0); + if (prefer_local) free(prefer_local); + if (result.provider_used == "openai") { // Check if it's a local call (Ollama) or cloud call - if (GloAI->get_variable("ai_prefer_local_models") && + if (prefer_local_models && (result.explanation.find("localhost") != std::string::npos || result.explanation.find("127.0.0.1") != std::string::npos)) { GloAI->increment_nl2sql_local_model_calls(); @@ -3967,7 +4012,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C std::string err_msg = "Failed to convert natural language to SQL: "; err_msg += result.explanation; client_myds->DSS = STATE_QUERY_SENT_NET; - client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1243, (char*)"HY000", (char*)err_msg.c_str(), true); + client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1244, (char*)"HY000", (char*)err_msg.c_str(), true); l_free(pkt->size, pkt->ptr); client_myds->DSS = STATE_SLEEP; status = WAITING_CLIENT_DATA; @@ -4009,8 +4054,9 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C client_myds->DSS = STATE_SLEEP; status = WAITING_CLIENT_DATA; - proxy_debug(PROXY_DEBUG_NL2SQL, 2, "NL2SQL: Converted '%s' to SQL (confidence: %.2f)\n", + proxy_debug(PROXY_DEBUG_NL2SQL, 2, "NL2SQL: Converted '%s' to SQL (confidence: %.2f) [blocking fallback]\n", req.natural_language.c_str(), result.confidence); +#endif } #ifdef epoll_create1