@ -3711,899 +3711,6 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
return false ;
}
# ifdef PROXYSQLGENAI
// Handler for GENAI: queries - experimental GenAI integration
// Query formats:
// GENAI: {"type": "embed", "documents": ["doc1", "doc2", ...]}
// GENAI: {"type": "rerank", "query": "...", "documents": [...], "top_n": 5, "columns": 3}
// Returns: Resultset with embeddings or reranked documents
void MySQL_Session : : handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___genai ( const char * query , size_t query_len , PtrSize_t * pkt ) {
// Skip leading space after "GENAI:"
while ( query_len > 0 & & ( * query = = ' ' | | * query = = ' \t ' ) ) {
query + + ;
query_len - - ;
}
if ( query_len = = 0 ) {
// Empty query after GENAI:
client_myds - > DSS = STATE_QUERY_SENT_NET ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1234 , ( char * ) " HY000 " , " Empty GENAI: query " , true ) ;
l_free ( pkt - > size , pkt - > ptr ) ;
client_myds - > DSS = STATE_SLEEP ;
status = WAITING_CLIENT_DATA ;
return ;
}
// Check GenAI module is initialized
if ( ! GloGATH ) {
client_myds - > DSS = STATE_QUERY_SENT_NET ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1237 , ( char * ) " HY000 " , " GenAI module is not initialized " , true ) ;
l_free ( pkt - > size , pkt - > ptr ) ;
client_myds - > DSS = STATE_SLEEP ;
status = WAITING_CLIENT_DATA ;
return ;
}
if ( ! GloGATH - > variables . genai_enabled ) {
client_myds - > DSS = STATE_QUERY_SENT_NET ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1238 , ( char * ) " HY000 " ,
" GenAI is disabled (set genai-enabled=true) " , true ) ;
l_free ( pkt - > size , pkt - > ptr ) ;
client_myds - > DSS = STATE_SLEEP ;
status = WAITING_CLIENT_DATA ;
return ;
}
# ifdef epoll_create1
// Use async path with socketpair for non-blocking operation
if ( ! handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___genai_send_async ( query , query_len , pkt ) ) {
// Async send failed - error already sent to client
l_free ( pkt - > size , pkt - > ptr ) ;
client_myds - > DSS = STATE_SLEEP ;
status = WAITING_CLIENT_DATA ;
return ;
}
// Request sent asynchronously - don't free pkt, will be freed in response handler
// Return immediately, session is now free to handle other queries
proxy_debug ( PROXY_DEBUG_GENAI , 3 , " GenAI: Query sent asynchronously, session continuing \n " ) ;
# else
// Fallback to synchronous blocking path for systems without epoll
// Pass JSON query to GenAI module for autonomous processing
std : : string json_query ( query , query_len ) ;
std : : string result_json = GloGATH - > process_json_query ( json_query ) ;
if ( result_json . empty ( ) ) {
client_myds - > DSS = STATE_QUERY_SENT_NET ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1250 , ( char * ) " HY000 " , " GenAI query processing failed " , true ) ;
l_free ( pkt - > size , pkt - > ptr ) ;
client_myds - > DSS = STATE_SLEEP ;
status = WAITING_CLIENT_DATA ;
return ;
}
// Parse the JSON result and build MySQL resultset
try {
json result = json : : parse ( result_json ) ;
if ( ! result . is_object ( ) ) {
client_myds - > DSS = STATE_QUERY_SENT_NET ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1251 , ( char * ) " HY000 " , " GenAI returned invalid result format " , true ) ;
l_free ( pkt - > size , pkt - > ptr ) ;
client_myds - > DSS = STATE_SLEEP ;
status = WAITING_CLIENT_DATA ;
return ;
}
// Check if result is an error
if ( result . contains ( " error " ) & & result [ " error " ] . is_string ( ) ) {
std : : string error_msg = result [ " error " ] . get < std : : string > ( ) ;
client_myds - > DSS = STATE_QUERY_SENT_NET ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1252 , ( char * ) " HY000 " , ( char * ) error_msg . c_str ( ) , true ) ;
l_free ( pkt - > size , pkt - > ptr ) ;
client_myds - > DSS = STATE_SLEEP ;
status = WAITING_CLIENT_DATA ;
return ;
}
// Extract resultset data
if ( ! result . contains ( " columns " ) | | ! result [ " columns " ] . is_array ( ) ) {
client_myds - > DSS = STATE_QUERY_SENT_NET ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1253 , ( char * ) " HY000 " , " GenAI result missing 'columns' field " , true ) ;
l_free ( pkt - > size , pkt - > ptr ) ;
client_myds - > DSS = STATE_SLEEP ;
status = WAITING_CLIENT_DATA ;
return ;
}
if ( ! result . contains ( " rows " ) | | ! result [ " rows " ] . is_array ( ) ) {
client_myds - > DSS = STATE_QUERY_SENT_NET ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1254 , ( char * ) " HY000 " , " GenAI result missing 'rows' field " , true ) ;
l_free ( pkt - > size , pkt - > ptr ) ;
client_myds - > DSS = STATE_SLEEP ;
status = WAITING_CLIENT_DATA ;
return ;
}
auto columns = result [ " columns " ] ;
auto rows = result [ " rows " ] ;
// Build SQLite3 resultset
std : : unique_ptr < SQLite3_result > resultset ( new SQLite3_result ( columns . size ( ) ) ) ;
// Add column definitions
for ( size_t i = 0 ; i < columns . size ( ) ; i + + ) {
if ( columns [ i ] . is_string ( ) ) {
std : : string col_name = columns [ i ] . get < std : : string > ( ) ;
resultset - > add_column_definition ( SQLITE_TEXT , ( char * ) col_name . c_str ( ) ) ;
}
}
// Add rows
for ( const auto & row : rows ) {
if ( ! row . is_array ( ) ) continue ;
// Create row data array
char * * row_data = ( char * * ) malloc ( columns . size ( ) * sizeof ( char * ) ) ;
size_t valid_cols = 0 ;
for ( size_t i = 0 ; i < columns . size ( ) & & i < row . size ( ) ; i + + ) {
if ( row [ i ] . is_string ( ) ) {
std : : string val = row [ i ] . get < std : : string > ( ) ;
row_data [ valid_cols + + ] = strdup ( val . c_str ( ) ) ;
} else if ( row [ i ] . is_null ( ) ) {
row_data [ valid_cols + + ] = NULL ;
} else {
// Convert to string
std : : string val = row [ i ] . dump ( ) ;
// Remove quotes if present
if ( val . size ( ) > = 2 & & val [ 0 ] = = ' " ' & & val [ val . size ( ) - 1 ] = = ' " ' ) {
val = val . substr ( 1 , val . size ( ) - 2 ) ;
}
row_data [ valid_cols + + ] = strdup ( val . c_str ( ) ) ;
}
}
resultset - > add_row ( row_data ) ;
// Free row data
for ( size_t i = 0 ; i < valid_cols ; i + + ) {
if ( row_data [ i ] ) free ( row_data [ i ] ) ;
}
free ( row_data ) ;
}
// Send resultset to client
SQLite3_to_MySQL ( resultset . get ( ) , NULL , 0 , & client_myds - > myprot , false ,
( client_myds - > myconn - > options . client_flag & CLIENT_DEPRECATE_EOF ) ) ;
l_free ( pkt - > size , pkt - > ptr ) ;
client_myds - > DSS = STATE_SLEEP ;
status = WAITING_CLIENT_DATA ;
} catch ( const json : : parse_error & e ) {
client_myds - > DSS = STATE_QUERY_SENT_NET ;
std : : string err_msg = " Failed to parse GenAI result: " ;
err_msg + = e . what ( ) ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1255 , ( char * ) " HY000 " , ( char * ) err_msg . c_str ( ) , true ) ;
l_free ( pkt - > size , pkt - > ptr ) ;
client_myds - > DSS = STATE_SLEEP ;
status = WAITING_CLIENT_DATA ;
} catch ( const std : : exception & e ) {
client_myds - > DSS = STATE_QUERY_SENT_NET ;
std : : string err_msg = " Error processing GenAI result: " ;
err_msg + = e . what ( ) ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1256 , ( char * ) " HY000 " , ( char * ) err_msg . c_str ( ) , true ) ;
l_free ( pkt - > size , pkt - > ptr ) ;
client_myds - > DSS = STATE_SLEEP ;
status = WAITING_CLIENT_DATA ;
}
# endif // epoll_create1 - fallback blocking path
}
// Handler for LLM: queries - Generic LLM bridge processing
// Query format:
// LLM: Summarize the customer feedback
// LLM: Generate a Python function to validate emails
// LLM: Explain this SQL query: SELECT * FROM users
// Returns: Resultset with the text response from LLM
//
// Note: This now uses the async GENAI path to avoid blocking MySQL threads.
// The LLM query is converted to a JSON GENAI request and sent asynchronously.
void MySQL_Session : : handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___llm ( const char * query , size_t query_len , PtrSize_t * pkt ) {
// Skip leading space after "LLM:"
while ( query_len > 0 & & ( * query = = ' ' | | * query = = ' \t ' ) ) {
query + + ;
query_len - - ;
}
if ( query_len = = 0 ) {
// Empty query after LLM:
client_myds - > DSS = STATE_QUERY_SENT_NET ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1240 , ( char * ) " HY000 " , " Empty LLM: query " , true ) ;
l_free ( pkt - > size , pkt - > ptr ) ;
client_myds - > DSS = STATE_SLEEP ;
status = WAITING_CLIENT_DATA ;
return ;
}
// Check GenAI module is initialized (LLM now uses GenAI module)
if ( ! GloGATH ) {
client_myds - > DSS = STATE_QUERY_SENT_NET ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1241 , ( char * ) " HY000 " , " GenAI module is not initialized " , true ) ;
l_free ( pkt - > size , pkt - > ptr ) ;
client_myds - > DSS = STATE_SLEEP ;
status = WAITING_CLIENT_DATA ;
return ;
}
if ( ! GloGATH - > variables . genai_enabled | | ! GloGATH - > variables . genai_llm_enabled ) {
client_myds - > DSS = STATE_QUERY_SENT_NET ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1245 , ( char * ) " HY000 " ,
" LLM is disabled (set genai-enabled=true and genai-llm_enabled=true) " , true ) ;
l_free ( pkt - > size , pkt - > ptr ) ;
client_myds - > DSS = STATE_SLEEP ;
status = WAITING_CLIENT_DATA ;
return ;
}
// Check AI manager is available for LLM bridge
if ( ! GloAI ) {
client_myds - > DSS = STATE_QUERY_SENT_NET ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1242 , ( char * ) " HY000 " , " AI features module is not initialized " , true ) ;
l_free ( pkt - > size , pkt - > ptr ) ;
client_myds - > DSS = STATE_SLEEP ;
status = WAITING_CLIENT_DATA ;
return ;
}
// Get LLM bridge from AI manager
LLM_Bridge * llm_bridge = GloAI - > get_llm_bridge ( ) ;
if ( ! llm_bridge ) {
client_myds - > DSS = STATE_QUERY_SENT_NET ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1243 , ( char * ) " HY000 " , " LLM bridge is not initialized " , true ) ;
l_free ( pkt - > size , pkt - > ptr ) ;
client_myds - > DSS = STATE_SLEEP ;
status = WAITING_CLIENT_DATA ;
return ;
}
// Increment total requests counter
GloAI - > increment_llm_total_requests ( ) ;
# ifdef epoll_create1
// Build JSON query for LLM operation
json json_query ;
json_query [ " type " ] = " llm " ;
json_query [ " prompt " ] = std : : string ( query , query_len ) ;
json_query [ " allow_cache " ] = true ;
// Add schema if available (for context)
if ( client_myds - > myconn - > userinfo - > schemaname ) {
json_query [ " schema " ] = std : : string ( client_myds - > myconn - > userinfo - > schemaname ) ;
}
std : : string json_str = json_query . dump ( ) ;
// Use async GENAI path to avoid blocking
if ( ! handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___genai_send_async ( json_str . c_str ( ) , json_str . length ( ) , pkt ) ) {
// Async send failed - error already sent to client
l_free ( pkt - > size , pkt - > ptr ) ;
client_myds - > DSS = STATE_SLEEP ;
status = WAITING_CLIENT_DATA ;
return ;
}
// Request sent asynchronously - don't free pkt, will be freed in response handler
// Return immediately, session is now free to handle other queries
proxy_debug ( PROXY_DEBUG_GENAI , 2 , " LLM: Query sent asynchronously via GenAI: %s \n " , std : : string ( query , query_len ) . c_str ( ) ) ;
# else
// Fallback to synchronous blocking path for systems without epoll
// Build LLM request
LLMRequest req ;
req . prompt = std : : string ( query , query_len ) ;
req . schema_name = client_myds - > myconn - > userinfo - > schemaname ? client_myds - > myconn - > userinfo - > schemaname : " " ;
req . allow_cache = true ;
req . max_latency_ms = 0 ; // No specific latency requirement
// Call LLM bridge (blocking fallback)
LLMResult result = llm_bridge - > process ( req ) ;
// Update performance counters based on result
if ( result . cache_hit ) {
GloAI - > increment_llm_cache_hits ( ) ;
} else {
GloAI - > increment_llm_cache_misses ( ) ;
}
// Update timing counters
GloAI - > add_llm_response_time_ms ( result . total_time_ms ) ;
GloAI - > add_llm_cache_lookup_time_ms ( result . cache_lookup_time_ms ) ;
GloAI - > increment_llm_cache_lookups ( ) ;
if ( result . cache_hit ) {
// For cache hits, we're done
} else {
// For cache misses, also count LLM call time and cache store time
GloAI - > add_llm_cache_store_time_ms ( result . cache_store_time_ms ) ;
if ( result . cache_store_time_ms > 0 ) {
GloAI - > increment_llm_cache_stores ( ) ;
}
// Update model call counters
char * prefer_local = GloGATH - > get_variable ( ( char * ) " prefer_local_models " ) ;
bool prefer_local_models = prefer_local & & ( strcmp ( prefer_local , " true " ) = = 0 ) ;
if ( prefer_local ) free ( prefer_local ) ;
if ( result . provider_used = = " openai " ) {
// Check if it's a local call (Ollama) or cloud call
if ( prefer_local_models & &
( result . explanation . find ( " localhost " ) ! = std : : string : : npos | |
result . explanation . find ( " 127.0.0.1 " ) ! = std : : string : : npos ) ) {
GloAI - > increment_llm_local_model_calls ( ) ;
} else {
GloAI - > increment_llm_cloud_model_calls ( ) ;
}
} else if ( result . provider_used = = " anthropic " ) {
GloAI - > increment_llm_cloud_model_calls ( ) ;
}
}
if ( result . text_response . empty ( ) & & ! result . error_code . empty ( ) ) {
// LLM processing failed
std : : string err_msg = " LLM processing failed: " ;
err_msg + = result . error_code ;
if ( ! result . error_details . empty ( ) ) {
err_msg + = " - " ;
err_msg + = result . error_details ;
}
client_myds - > DSS = STATE_QUERY_SENT_NET ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1244 , ( char * ) " HY000 " , ( char * ) err_msg . c_str ( ) , true ) ;
l_free ( pkt - > size , pkt - > ptr ) ;
client_myds - > DSS = STATE_SLEEP ;
status = WAITING_CLIENT_DATA ;
return ;
}
// Build resultset with the generated text response
std : : vector < std : : string > columns = { " text_response " , " explanation " , " cached " , " provider " } ;
std : : unique_ptr < SQLite3_result > resultset ( new SQLite3_result ( columns . size ( ) ) ) ;
// Add column definitions
for ( size_t i = 0 ; i < columns . size ( ) ; i + + ) {
resultset - > add_column_definition ( SQLITE_TEXT , ( char * ) columns [ i ] . c_str ( ) ) ;
}
// Add single row with the result
char * * row_data = ( char * * ) malloc ( columns . size ( ) * sizeof ( char * ) ) ;
row_data [ 0 ] = strdup ( result . text_response . c_str ( ) ) ;
row_data [ 1 ] = strdup ( result . explanation . c_str ( ) ) ;
row_data [ 2 ] = strdup ( result . cached ? " true " : " false " ) ;
row_data [ 3 ] = strdup ( result . provider_used . c_str ( ) ) ;
resultset - > add_row ( row_data ) ;
// Free row data
for ( size_t i = 0 ; i < columns . size ( ) ; i + + ) {
free ( row_data [ i ] ) ;
}
free ( row_data ) ;
// Send resultset to client
SQLite3_to_MySQL ( resultset . get ( ) , NULL , 0 , & client_myds - > myprot , false ,
( client_myds - > myconn - > options . client_flag & CLIENT_DEPRECATE_EOF ) ) ;
l_free ( pkt - > size , pkt - > ptr ) ;
client_myds - > DSS = STATE_SLEEP ;
status = WAITING_CLIENT_DATA ;
proxy_debug ( PROXY_DEBUG_GENAI , 2 , " LLM: Processed prompt '%s' [blocking fallback] \n " ,
req . prompt . c_str ( ) ) ;
# endif
}
# ifdef epoll_create1
/**
* @ brief Send GenAI request asynchronously via socketpair
*
* This function implements the non - blocking async GenAI request path . It creates
* a socketpair for bidirectional communication with the GenAI module and sends
* the request immediately without waiting for the response .
*
* Async flow :
* 1. Create socketpair ( fds ) for bidirectional communication
* 2. Register fds [ 1 ] ( GenAI side ) with GenAI module via register_client ( )
* 3. Store request in pending_genai_requests_ map for later response matching
* 4. Send GenAI_RequestHeader + JSON query via fds [ 0 ]
* 5. Add fds [ 0 ] to session ' s genai_epoll_fd_ for response notification
* 6. Return immediately ( MySQL thread is now free to process other queries )
*
* The response will be delivered asynchronously and handled by
* handle_genai_response ( ) when the GenAI worker completes processing .
*
* Error handling :
* - On socketpair failure : Send ERR packet to client , return false
* - On register_client failure : Cleanup fds , send ERR packet , return false
* - On write failure : Cleanup request via genai_cleanup_request ( ) , send ERR packet
* - On epoll add failure : Log warning but continue ( request was sent successfully )
*
* Memory management :
* - Original packet is copied to pending . original_pkt for response generation
* - Memory is freed in genai_cleanup_request ( ) when response is processed
*
* @ param query JSON query string to send to GenAI module
* @ param query_len Length of the query string
* @ param pkt Original MySQL packet ( for command number and later response )
* @ return true if request was sent successfully , false on error
*
* @ note This function is non - blocking and returns immediately after sending .
* The actual GenAI processing happens in worker threads , not MySQL threads .
* @ see handle_genai_response ( ) , genai_cleanup_request ( ) , check_genai_events ( )
*/
bool MySQL_Session : : handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___genai_send_async (
const char * query , size_t query_len , PtrSize_t * pkt ) {
// Create socketpair for async communication
int fds [ 2 ] ;
if ( socketpair ( AF_UNIX , SOCK_STREAM , 0 , fds ) < 0 ) {
proxy_error ( " GenAI: socketpair failed: %s \n " , strerror ( errno ) ) ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1260 , ( char * ) " HY000 " ,
" Failed to create GenAI communication channel " , true ) ;
return false ;
}
// Set MySQL side to non-blocking
int flags = fcntl ( fds [ 0 ] , F_GETFL , 0 ) ;
fcntl ( fds [ 0 ] , F_SETFL , flags | O_NONBLOCK ) ;
// Register GenAI side with GenAI module
if ( ! GloGATH - > register_client ( fds [ 1 ] ) ) {
proxy_error ( " GenAI: Failed to register client fd %d with GenAI module \n " , fds [ 1 ] ) ;
close ( fds [ 0 ] ) ;
close ( fds [ 1 ] ) ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1261 , ( char * ) " HY000 " ,
" Failed to register with GenAI module " , true ) ;
return false ;
}
// Prepare request header
GenAI_RequestHeader hdr ;
hdr . request_id = next_genai_request_id_ + + ;
hdr . operation = GENAI_OP_JSON ;
hdr . query_len = query_len ;
hdr . flags = 0 ;
hdr . top_n = 0 ;
// Store request in pending map
GenAI_PendingRequest pending ;
pending . request_id = hdr . request_id ;
pending . client_fd = fds [ 0 ] ;
pending . json_query = std : : string ( query , query_len ) ;
pending . start_time = std : : chrono : : steady_clock : : now ( ) ;
// Copy the original packet for later response
pending . original_pkt = ( PtrSize_t * ) malloc ( sizeof ( PtrSize_t ) ) ;
if ( ! pending . original_pkt ) {
proxy_error ( " GenAI: Failed to allocate memory for packet copy \n " ) ;
close ( fds [ 0 ] ) ;
close ( fds [ 1 ] ) ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1262 , ( char * ) " HY000 " ,
" Memory allocation failed " , true ) ;
return false ;
}
pending . original_pkt - > ptr = pkt - > ptr ;
pending . original_pkt - > size = pkt - > size ;
pending_genai_requests_ [ hdr . request_id ] = pending ;
// Send request header
ssize_t written = write ( fds [ 0 ] , & hdr , sizeof ( hdr ) ) ;
if ( written ! = sizeof ( hdr ) ) {
proxy_error ( " GenAI: Failed to write request header to fd %d: %s \n " ,
fds [ 0 ] , strerror ( errno ) ) ;
auto it = pending_genai_requests_ . find ( hdr . request_id ) ;
if ( it ! = pending_genai_requests_ . end ( ) & & it - > second . original_pkt ) {
it - > second . original_pkt - > ptr = nullptr ;
it - > second . original_pkt - > size = 0 ;
}
genai_cleanup_request ( hdr . request_id ) ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1263 , ( char * ) " HY000 " ,
" Failed to send request to GenAI module " , true ) ;
return false ;
}
// Send JSON query
size_t total_written = 0 ;
while ( total_written < query_len ) {
ssize_t w = write ( fds [ 0 ] , query + total_written , query_len - total_written ) ;
if ( w < = 0 ) {
if ( errno = = EAGAIN | | errno = = EWOULDBLOCK ) {
usleep ( 1000 ) ;
continue ;
}
proxy_error ( " GenAI: Failed to write JSON query to fd %d: %s \n " ,
fds [ 0 ] , strerror ( errno ) ) ;
auto it = pending_genai_requests_ . find ( hdr . request_id ) ;
if ( it ! = pending_genai_requests_ . end ( ) & & it - > second . original_pkt ) {
it - > second . original_pkt - > ptr = nullptr ;
it - > second . original_pkt - > size = 0 ;
}
genai_cleanup_request ( hdr . request_id ) ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1264 , ( char * ) " HY000 " ,
" Failed to send query to GenAI module " , true ) ;
return false ;
}
total_written + = w ;
}
// Add to epoll for response notification
struct epoll_event ev ;
ev . events = EPOLLIN ;
ev . data . fd = fds [ 0 ] ;
if ( epoll_ctl ( genai_epoll_fd_ , EPOLL_CTL_ADD , fds [ 0 ] , & ev ) < 0 ) {
proxy_error ( " GenAI: Failed to add fd %d to epoll: %s \n " , fds [ 0 ] , strerror ( errno ) ) ;
// Request is sent, but we won't be notified of response
// This is not fatal - we'll timeout eventually
}
proxy_debug ( PROXY_DEBUG_GENAI , 3 ,
" GenAI: Sent async request %lu via fd %d (query_len=%zu) \n " ,
hdr . request_id , fds [ 0 ] , query_len ) ;
return true ; // Success - request sent asynchronously
}
/**
* @ brief Handle GenAI response from socketpair
*
* This function is called when epoll notifies that data is available on a
* GenAI response file descriptor . It reads the response from the GenAI worker
* thread , processes the result , and sends the MySQL result packet to the client .
*
* Response handling flow :
* 1. Read GenAI_ResponseHeader from socketpair
* 2. Find matching pending request via request_id in pending_genai_requests_
* 3. Read JSON result payload ( if result_len > 0 )
* 4. Parse JSON and convert to MySQL resultset format
* 5. Send result packet ( or ERR packet on error ) to client
* 6. Cleanup resources via genai_cleanup_request ( )
*
* Response format ( from GenAI worker ) :
* - GenAI_ResponseHeader ( request_id , status_code , result_len , processing_time_ms )
* - JSON result payload ( if result_len > 0 )
*
* Error handling :
* - On read error : Find and cleanup pending request , return
* - On incomplete header : Log error , return
* - On unknown request_id : Log error , close fd , return
* - On status_code ! = 0 : Send ERR packet to client with error details
* - On JSON parse error : Send ERR packet to client
*
* RTT ( Round - Trip Time ) tracking :
* - Calculates RTT from request start to response receipt
* - Logs RTT along with GenAI processing time for monitoring
*
* @ param fd The MySQL side file descriptor from socketpair ( fds [ 0 ] )
*
* @ note This function is called from check_genai_events ( ) which is invoked
* from the main handler ( ) loop . It runs in the MySQL thread context .
* @ see genai_send_async ( ) , genai_cleanup_request ( ) , check_genai_events ( )
*/
void MySQL_Session : : handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_genai_response ( int fd ) {
// Read response header
GenAI_ResponseHeader resp ;
ssize_t n = read ( fd , & resp , sizeof ( resp ) ) ;
if ( n < 0 ) {
// Check for non-blocking read - not an error, just no data yet
if ( errno = = EAGAIN | | errno = = EWOULDBLOCK ) {
return ;
}
// Real error - log and cleanup
proxy_error ( " GenAI: Error reading response header from fd %d: %s \n " ,
fd , strerror ( errno ) ) ;
} else if ( n = = 0 ) {
// Connection closed (EOF) - cleanup
} else {
// Successfully read header, continue processing
goto process_response ;
}
// Cleanup path for error or EOF
for ( auto & pair : pending_genai_requests_ ) {
if ( pair . second . client_fd = = fd ) {
genai_cleanup_request ( pair . first ) ;
break ;
}
}
return ;
process_response :
if ( n ! = sizeof ( resp ) ) {
proxy_error ( " GenAI: Incomplete response header from fd %d: got %zd, expected %zu \n " ,
fd , n , sizeof ( resp ) ) ;
return ;
}
// Find the pending request
auto it = pending_genai_requests_ . find ( resp . request_id ) ;
if ( it = = pending_genai_requests_ . end ( ) ) {
proxy_error ( " GenAI: Received response for unknown request %lu \n " , resp . request_id ) ;
close ( fd ) ;
return ;
}
GenAI_PendingRequest & pending = it - > second ;
// Read JSON result
std : : string json_result ;
if ( resp . result_len > 0 ) {
json_result . resize ( resp . result_len ) ;
size_t total_read = 0 ;
while ( total_read < resp . result_len ) {
ssize_t r = read ( fd , & json_result [ total_read ] ,
resp . result_len - total_read ) ;
if ( r < = 0 ) {
if ( errno = = EAGAIN | | errno = = EWOULDBLOCK ) {
usleep ( 1000 ) ;
continue ;
}
proxy_error ( " GenAI: Error reading JSON result from fd %d: %s \n " ,
fd , strerror ( errno ) ) ;
json_result . clear ( ) ;
break ;
}
total_read + = r ;
}
}
// Process the result
auto end_time = std : : chrono : : steady_clock : : now ( ) ;
int rtt_ms = std : : chrono : : duration_cast < std : : chrono : : milliseconds > (
end_time - pending . start_time ) . count ( ) ;
proxy_debug ( PROXY_DEBUG_GENAI , 3 ,
" GenAI: Received response %lu (status=%u, result_len=%u, rtt=%dms, proc=%dms) \n " ,
resp . request_id , resp . status_code , resp . result_len , rtt_ms , resp . processing_time_ms ) ;
// Check for errors
if ( resp . status_code ! = 0 | | json_result . empty ( ) ) {
client_myds - > DSS = STATE_QUERY_SENT_NET ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1265 , ( char * ) " HY000 " ,
" GenAI query processing failed " , true ) ;
} else {
// Parse JSON result and send resultset
try {
json result = json : : parse ( json_result ) ;
if ( ! result . is_object ( ) ) {
client_myds - > DSS = STATE_QUERY_SENT_NET ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1266 , ( char * ) " HY000 " ,
" GenAI returned invalid result format " , true ) ;
} else if ( result . contains ( " error " ) & & result [ " error " ] . is_string ( ) ) {
std : : string error_msg = result [ " error " ] . get < std : : string > ( ) ;
client_myds - > DSS = STATE_QUERY_SENT_NET ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1267 , ( char * ) " HY000 " ,
( char * ) error_msg . c_str ( ) , true ) ;
} else if ( ! result . contains ( " columns " ) | | ! result . contains ( " rows " ) ) {
client_myds - > DSS = STATE_QUERY_SENT_NET ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1268 , ( char * ) " HY000 " ,
" GenAI result missing required fields " , true ) ;
} else {
// Build and send resultset
auto columns = result [ " columns " ] ;
auto rows = result [ " rows " ] ;
std : : unique_ptr < SQLite3_result > resultset ( new SQLite3_result ( columns . size ( ) ) ) ;
// Add column definitions
for ( size_t i = 0 ; i < columns . size ( ) ; i + + ) {
if ( columns [ i ] . is_string ( ) ) {
std : : string col_name = columns [ i ] . get < std : : string > ( ) ;
resultset - > add_column_definition ( SQLITE_TEXT , ( char * ) col_name . c_str ( ) ) ;
}
}
// Add rows
for ( const auto & row : rows ) {
if ( ! row . is_array ( ) ) continue ;
size_t num_cols = row . size ( ) ;
if ( num_cols > columns . size ( ) ) num_cols = columns . size ( ) ;
char * * row_data = ( char * * ) malloc ( num_cols * sizeof ( char * ) ) ;
size_t valid_cols = 0 ;
for ( size_t i = 0 ; i < num_cols ; i + + ) {
if ( ! row [ i ] . is_null ( ) ) {
std : : string val ;
if ( row [ i ] . is_string ( ) ) {
val = row [ i ] . get < std : : string > ( ) ;
} else {
val = row [ i ] . dump ( ) ;
}
row_data [ valid_cols + + ] = strdup ( val . c_str ( ) ) ;
}
}
resultset - > add_row ( row_data ) ;
for ( size_t i = 0 ; i < valid_cols ; i + + ) {
if ( row_data [ i ] ) free ( row_data [ i ] ) ;
}
free ( row_data ) ;
}
// Send resultset to client
SQLite3_to_MySQL ( resultset . get ( ) , NULL , 0 , & client_myds - > myprot , false ,
( client_myds - > myconn - > options . client_flag & CLIENT_DEPRECATE_EOF ) ) ;
}
} catch ( const json : : parse_error & e ) {
client_myds - > DSS = STATE_QUERY_SENT_NET ;
std : : string err_msg = " Failed to parse GenAI result: " ;
err_msg + = e . what ( ) ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1269 , ( char * ) " HY000 " ,
( char * ) err_msg . c_str ( ) , true ) ;
} catch ( const std : : exception & e ) {
client_myds - > DSS = STATE_QUERY_SENT_NET ;
std : : string err_msg = " Error processing GenAI result: " ;
err_msg + = e . what ( ) ;
client_myds - > myprot . generate_pkt_ERR ( true , NULL , NULL , 1 , 1270 , ( char * ) " HY000 " ,
( char * ) err_msg . c_str ( ) , true ) ;
}
}
// Cleanup the request
genai_cleanup_request ( resp . request_id ) ;
// Return to waiting state
client_myds - > DSS = STATE_SLEEP ;
status = WAITING_CLIENT_DATA ;
}
/**
* @ brief Cleanup a GenAI pending request
*
* This function cleans up all resources associated with a GenAI pending request .
* It is called after a response has been processed or when an error occurs .
*
* Cleanup operations :
* 1. Remove request from pending_genai_requests_ map
* 2. Close the socketpair file descriptor ( client_fd )
* 3. Remove fd from genai_epoll_fd_ monitoring
* 4. Free the original packet memory ( original_pkt )
*
* Resource cleanup details :
* - client_fd : The MySQL side of the socketpair ( fds [ 0 ] ) is closed
* - epoll : The fd is removed from genai_epoll_fd_ to stop monitoring
* - original_pkt : The copied packet memory is freed ( ptr and size )
* - pending map : The request entry is removed from the map
*
* This function must be called exactly once per request to avoid :
* - File descriptor leaks ( unclosed sockets )
* - Memory leaks ( unfreed packets )
* - Epoll monitoring stale fds ( removed from map but still in epoll )
*
* @ param request_id The request ID to cleanup ( must exist in pending_genai_requests_ )
*
* @ note This function is idempotent - if the request_id is not found , it safely
* returns without error ( useful for error paths where cleanup might be
* called multiple times ) .
* @ note If the request is not found in the map , this function silently returns
* without error ( this is intentional to avoid crashes on double cleanup ) .
*
* @ see genai_send_async ( ) , handle_genai_response ( )
*/
void MySQL_Session : : genai_cleanup_request ( uint64_t request_id ) {
auto it = pending_genai_requests_ . find ( request_id ) ;
if ( it = = pending_genai_requests_ . end ( ) ) {
return ;
}
GenAI_PendingRequest & pending = it - > second ;
// Remove from epoll
epoll_ctl ( genai_epoll_fd_ , EPOLL_CTL_DEL , pending . client_fd , nullptr ) ;
// Close socketpair fds
close ( pending . client_fd ) ;
// Free the original packet
if ( pending . original_pkt ) {
l_free ( pending . original_pkt - > size , pending . original_pkt - > ptr ) ;
free ( pending . original_pkt ) ;
}
pending_genai_requests_ . erase ( it ) ;
proxy_debug ( PROXY_DEBUG_GENAI , 3 , " GenAI: Cleaned up request %lu \n " , request_id ) ;
}
/**
* @ brief Check for pending GenAI responses
*
* This function performs a non - blocking epoll_wait on the session ' s GenAI epoll
* file descriptor to check if any responses from GenAI workers are ready to be
* processed . It is called from the main handler ( ) loop in the WAITING_CLIENT_DATA
* state to interleave GenAI response processing with normal client query handling .
*
* Event checking flow :
* 1. Early return if no pending requests ( empty pending_genai_requests_ )
* 2. Non - blocking epoll_wait with timeout = 0 on genai_epoll_fd_
* 3. For each ready fd , find matching pending request
* 4. Call handle_genai_response ( ) to process the response
* 5. Return true after processing one response ( to re - check for more )
*
* Integration with main loop :
* ` ` ` cpp
* handler_again :
* switch ( status ) {
* case WAITING_CLIENT_DATA :
* handler___status_WAITING_CLIENT_DATA ( ) ;
* # ifdef epoll_create1
* // Check for GenAI responses before processing new client data
* if ( check_genai_events ( ) ) {
* // GenAI response was processed, check for more
* goto handler_again ;
* }
* # endif
* break ;
* }
* ` ` `
*
* Non - blocking behavior :
* - epoll_wait timeout is 0 ( immediate return )
* - Returns true only if a response was actually processed
* - Allows main loop to continue processing client queries between responses
*
* Return value :
* - true : A GenAI response was processed ( caller should re - check for more )
* - false : No responses ready ( caller can proceed to normal client handling )
*
* @ return true if a GenAI response was processed , false otherwise
*
* @ note This function is called from the main handler ( ) loop on every iteration
* when in WAITING_CLIENT_DATA state . It must return quickly to avoid
* delaying normal client query processing .
* @ note Only processes one response per call to avoid starving client handling .
* The main loop will call again to process additional responses .
*
* @ see handle_genai_response ( ) , genai_send_async ( )
*/
bool MySQL_Session : : check_genai_events ( ) {
# ifdef epoll_create1
if ( pending_genai_requests_ . empty ( ) ) {
return false ;
}
const int MAX_EVENTS = 16 ;
struct epoll_event events [ MAX_EVENTS ] ;
int nfds = epoll_wait ( genai_epoll_fd_ , events , MAX_EVENTS , 0 ) ; // Non-blocking check
if ( nfds < = 0 ) {
return false ;
}
for ( int i = 0 ; i < nfds ; i + + ) {
int fd = events [ i ] . data . fd ;
// Find the pending request for this fd
for ( auto it = pending_genai_requests_ . begin ( ) ; it ! = pending_genai_requests_ . end ( ) ; + + it ) {
if ( it - > second . client_fd = = fd ) {
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_genai_response ( fd ) ;
return true ; // Processed one response
}
}
}
return false ;
# else
return false ;
# endif
}
# endif /* epoll_create1 (was mislabeled PROXYSQLGENAI in original) */
# endif /* PROXYSQLGENAI (the outer block opened just before the GENAI: handler) */
// this function was inline inside MySQL_Session::get_pkts_from_client
// where:
@ -6146,13 +5253,13 @@ handler_again:
case WAITING_CLIENT_DATA :
// housekeeping
handler___status_WAITING_CLIENT_DATA ( ) ;
# ifdef epoll_create1
// Check for GenAI responses before processing new client data
if ( check_genai_events ( ) ) {
// GenAI response was processed, check for more
goto handler_again ;
}
# endif
// In-core check_genai_events() poll-loop hook removed in Step 4
// of the GenAI plugin carve-out -- the async-genai socketpair
// protocol it monitored is gone with the GENAI:/LLM: prefix
// handlers. Plugins that need their own per-session polling
// hook will get one through a future ABI extension; today the
// only consumer (the genai plugin) handles everything inline
// from the query hook.
break ;
case FAST_FORWARD :
if ( mybe - > server_myds - > mypolls = = NULL ) {
@ -7251,26 +6358,12 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
bool exit_after_SetParse = true ;
unsigned char command_type = * ( ( unsigned char * ) pkt - > ptr + sizeof ( mysql_hdr ) ) ;
# ifdef PROXYSQLGENAI
// Check for GENAI: queries - experimental GenAI integration
if ( pkt - > size > sizeof ( mysql_hdr ) + 7 ) { // Need at least "GENAI: " (7 chars after header)
const char * query_ptr = ( const char * ) pkt - > ptr + sizeof ( mysql_hdr ) + 1 ;
size_t query_len = pkt - > size - sizeof ( mysql_hdr ) - 1 ;
if ( query_len > = 7 & & strncasecmp ( query_ptr , " GENAI: " , 6 ) = = 0 ) {
// This is a GENAI: query - handle with GenAI module
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___genai ( query_ptr + 6 , query_len - 6 , pkt ) ;
return true ;
}
// Check for LLM: queries - Generic LLM bridge processing
if ( query_len > = 5 & & strncasecmp ( query_ptr , " LLM: " , 4 ) = = 0 ) {
// This is a LLM: query - handle with LLM bridge
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___llm ( query_ptr + 4 , query_len - 4 , pkt ) ;
return true ;
}
}
# endif // PROXYSQLGENAI
// The "GENAI:" / "LLM:" query-prefix escape hatches were removed in
// Step 4 of the GenAI plugin carve-out (decision Q2 in the design
// doc). Users now reach GenAI features through MCP, admin SQL, or
// the REST endpoint -- the in-line MySQL-protocol prefix was a
// debug/POC convenience that bypassed routing, ACLs, and the
// query processor.
if ( qpo - > new_query ) {
handler_WCD_SS_MCQ_qpo_QueryRewrite ( pkt ) ;