feat: Make NL2SQL use async GenAI path instead of blocking calls

This is a critical architectural fix: NL2SQL was making blocking calls
to LLMs, which blocked the entire MySQL thread while the call was in
progress. NL2SQL now uses the same async socketpair pattern as the GENAI
embed/rerank operations.

Changes:
- Added nl2sql operation type to process_json_query() in GenAI module
- Updated NL2SQL handler to construct JSON query and use async GENAI path
- Added extern declaration for GloAI in GenAI_Thread.cpp
- Falls back to the synchronous (blocking) path only on systems without epoll (selected at compile time by the `#ifdef epoll_create1` guard)

Architecture:
- Before: NL2SQL: query → blocking nl2sql->convert() → blocks MySQL thread
- After: NL2SQL: query → JSON GENAI request → async socketpair → non-blocking

JSON protocol for NL2SQL:
  GENAI: {"type": "nl2sql", "query": "Show customers", "schema": "mydb"}

The NL2SQL result is delivered asynchronously through the existing
GENAI response handler, making the system fully non-blocking.

Related to: https://github.com/ProxySQL/proxysql-vec/pull/13
pull/5310/head
Rene Cannao 3 months ago
parent 527bfed297
commit a7dac5ef3d

@ -1,4 +1,5 @@
#include "GenAI_Thread.h"
#include "AI_Features_Manager.h"
#include "proxysql_debug.h"
#include <cstring>
#include <sstream>
@ -14,6 +15,9 @@
using json = nlohmann::json;
// Global AI Features Manager - needed for NL2SQL operations
extern AI_Features_Manager *GloAI;
// Platform compatibility
#ifndef EFD_CLOEXEC
#define EFD_CLOEXEC 0200000
@ -1692,8 +1696,82 @@ std::string GenAI_Threads_Handler::process_json_query(const std::string& json_qu
return result.dump();
}
// Handle nl2sql operation.
// Request fields read from query_json:
//   "query"       - required, must be a non-empty string (the natural language text)
//   "schema"      - optional string; a value of any other type is ignored
//   "allow_cache" - optional boolean, defaults to true; a value of any other type is ignored
// On success the result carries "columns" and a single entry in "rows" holding
// sql_query, confidence, explanation and cached. On every failure path an
// "error" message is set and the result is returned immediately.
if (op_type == "nl2sql") {
// Check if AI manager is available
if (!GloAI) {
result["error"] = "AI features manager is not initialized";
return result.dump();
}
// Extract natural language query (must be present and of string type)
if (!query_json.contains("query") || !query_json["query"].is_string()) {
result["error"] = "NL2SQL operation requires a 'query' string";
return result.dump();
}
std::string nl_query = query_json["query"].get<std::string>();
if (nl_query.empty()) {
result["error"] = "NL2SQL query cannot be empty";
return result.dump();
}
// Extract optional schema name (left empty when absent or not a string)
std::string schema_name;
if (query_json.contains("schema") && query_json["schema"].is_string()) {
schema_name = query_json["schema"].get<std::string>();
}
// Extract optional cache flag (stays true when absent or not a boolean)
bool allow_cache = true;
if (query_json.contains("allow_cache") && query_json["allow_cache"].is_boolean()) {
allow_cache = query_json["allow_cache"].get<bool>();
}
// Get NL2SQL converter
NL2SQL_Converter* nl2sql = GloAI->get_nl2sql();
if (!nl2sql) {
result["error"] = "NL2SQL converter is not initialized";
return result.dump();
}
// Build NL2SQL request
NL2SQLRequest req;
req.natural_language = nl_query;
req.schema_name = schema_name;
req.allow_cache = allow_cache;
req.max_latency_ms = 0; // No specific latency requirement
// Convert (this will use cache if available)
// NOTE(review): convert() is invoked synchronously here, so it occupies
// whichever thread is running process_json_query until it returns - confirm
// that this is a GenAI worker thread and never a MySQL thread.
NL2SQLResult sql_result = nl2sql->convert(req);
// An empty SQL string, or one that begins with "NL2SQL conversion failed",
// is treated as a failed conversion.
if (sql_result.sql_query.empty() || sql_result.sql_query.find("NL2SQL conversion failed") == 0) {
result["error"] = "Failed to convert natural language to SQL: " + sql_result.explanation;
return result.dump();
}
// Build result: one row whose values follow the order of "columns"
result["columns"] = json::array({"sql_query", "confidence", "explanation", "cached"});
json rows = json::array();
json row = json::array();
row.push_back(sql_result.sql_query);
// Confidence is formatted with two decimals and pushed as text
char conf_buf[32];
snprintf(conf_buf, sizeof(conf_buf), "%.2f", sql_result.confidence);
row.push_back(std::string(conf_buf));
row.push_back(sql_result.explanation);
// The cached flag is pushed as the text "true" or "false"
row.push_back(sql_result.cached ? "true" : "false");
rows.push_back(row);
result["rows"] = rows;
return result.dump();
}
// Unknown operation type
result["error"] = "Unknown operation type: " + op_type + ". Use 'embed' or 'rerank'";
result["error"] = "Unknown operation type: " + op_type + ". Use 'embed', 'rerank', or 'nl2sql'";
return result.dump();
} catch (const json::parse_error& e) {

@ -3875,6 +3875,9 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
// Query format:
// NL2SQL: Show me top 10 customers by revenue
// Returns: Resultset with the generated SQL query
//
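// Note: When the async branch is compiled in (`#ifdef epoll_create1`), the NL2SQL
// query is converted to a JSON GENAI request and sent through the async GENAI
// path so the MySQL thread is not blocked; otherwise the blocking
// nl2sql->convert() fallback is used.
// NOTE(review): epoll_create1 is normally declared as a function, not a
// preprocessor macro - confirm this #ifdef actually selects the async branch.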
void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___nl2sql(const char* query, size_t query_len, PtrSize_t* pkt) {
// Skip leading space after "NL2SQL:"
while (query_len > 0 && (*query == ' ' || *query == '\t')) {
@ -3892,10 +3895,20 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
return;
}
// Check AI module is initialized
// Check GenAI module is initialized (NL2SQL now uses GenAI module)
if (!GloGATH) {
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1241, (char*)"HY000", "GenAI module is not initialized", true);
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
return;
}
// Check AI manager is available for NL2SQL converter
if (!GloAI) {
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1241, (char*)"HY000", "AI features module is not initialized", true);
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1242, (char*)"HY000", "AI features module is not initialized", true);
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
@ -3906,13 +3919,44 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
NL2SQL_Converter* nl2sql = GloAI->get_nl2sql();
if (!nl2sql) {
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1242, (char*)"HY000", "NL2SQL converter is not initialized", true);
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1243, (char*)"HY000", "NL2SQL converter is not initialized", true);
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
return;
}
// Increment total requests counter
GloAI->increment_nl2sql_total_requests();
#ifdef epoll_create1
// Build JSON query for NL2SQL operation
json json_query;
json_query["type"] = "nl2sql";
json_query["query"] = std::string(query, query_len);
json_query["allow_cache"] = true;
// Add schema if available
if (client_myds->myconn->userinfo->schemaname) {
json_query["schema"] = std::string(client_myds->myconn->userinfo->schemaname);
}
std::string json_str = json_query.dump();
// Use async GENAI path to avoid blocking
if (!handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___genai_send_async(json_str.c_str(), json_str.length(), pkt)) {
// Async send failed - error already sent to client
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
return;
}
// Request sent asynchronously - don't free pkt, will be freed in response handler
// Return immediately, session is now free to handle other queries
proxy_debug(PROXY_DEBUG_NL2SQL, 2, "NL2SQL: Query sent asynchronously via GenAI: %s\n", std::string(query, query_len).c_str());
#else
// Fallback to synchronous blocking path for systems without epoll
// Build NL2SQL request
NL2SQLRequest req;
req.natural_language = std::string(query, query_len);
@ -3920,10 +3964,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
req.allow_cache = true;
req.max_latency_ms = 0; // No specific latency requirement
// Increment total requests counter
GloAI->increment_nl2sql_total_requests();
// Call NL2SQL converter (synchronous for Phase 2)
// Call NL2SQL converter (blocking fallback)
NL2SQLResult result = nl2sql->convert(req);
// Update performance counters based on result
@ -3948,9 +3989,13 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
}
// Update model call counters
char* prefer_local = GloGATH->get_variable((char*)"prefer_local_models");
bool prefer_local_models = prefer_local && (strcmp(prefer_local, "true") == 0);
if (prefer_local) free(prefer_local);
if (result.provider_used == "openai") {
// Check if it's a local call (Ollama) or cloud call
if (GloAI->get_variable("ai_prefer_local_models") &&
if (prefer_local_models &&
(result.explanation.find("localhost") != std::string::npos ||
result.explanation.find("127.0.0.1") != std::string::npos)) {
GloAI->increment_nl2sql_local_model_calls();
@ -3967,7 +4012,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
std::string err_msg = "Failed to convert natural language to SQL: ";
err_msg += result.explanation;
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1243, (char*)"HY000", (char*)err_msg.c_str(), true);
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 1244, (char*)"HY000", (char*)err_msg.c_str(), true);
l_free(pkt->size, pkt->ptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
@ -4009,8 +4054,9 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
proxy_debug(PROXY_DEBUG_NL2SQL, 2, "NL2SQL: Converted '%s' to SQL (confidence: %.2f)\n",
proxy_debug(PROXY_DEBUG_NL2SQL, 2, "NL2SQL: Converted '%s' to SQL (confidence: %.2f) [blocking fallback]\n",
req.natural_language.c_str(), result.confidence);
#endif
}
#ifdef epoll_create1

Loading…
Cancel
Save