@ -73,6 +73,7 @@ bool check_genai_initialized(MYSQL* admin) {
*/
bool execute_genai_query ( MYSQL * client , const string & json_query , int expected_rows = - 1 ) {
string full_query = " GENAI: " + json_query ;
diag ( " Executing: %s " , full_query . c_str ( ) ) ;
int rc = mysql_query ( client , full_query . c_str ( ) ) ;
if ( rc ! = 0 ) {
@ -87,6 +88,7 @@ bool execute_genai_query(MYSQL* client, const string& json_query, int expected_r
}
int num_rows = mysql_num_rows ( res ) ;
diag ( " Result: %d rows " , num_rows ) ;
mysql_free_result ( res ) ;
if ( expected_rows > = 0 & & num_rows ! = expected_rows ) {
@ -106,17 +108,20 @@ bool execute_genai_query(MYSQL* client, const string& json_query, int expected_r
*/
bool execute_genai_query_expect_error ( MYSQL * client , const string & json_query ) {
string full_query = " GENAI: " + json_query ;
diag ( " Executing (expecting error): %s " , full_query . c_str ( ) ) ;
int rc = mysql_query ( client , full_query . c_str ( ) ) ;
// Query should either fail or return an error result set
if ( rc ! = 0 ) {
// Query failed at MySQL level - this is expected for errors
diag ( " Query failed as expected: %s " , mysql_error ( client ) ) ;
return true ;
}
MYSQL_RES * res = mysql_store_result ( client ) ;
if ( ! res ) {
// No result set - error condition
diag ( " No result set returned (treated as error) " ) ;
return true ;
}
@ -127,6 +132,7 @@ bool execute_genai_query_expect_error(MYSQL* client, const string& json_query) {
if ( num_fields > = 1 ) {
MYSQL_ROW row = mysql_fetch_row ( res ) ;
if ( row & & row [ 0 ] ) {
diag ( " Result row 0: %s " , row [ 0 ] ) ;
// Check if the first column contains "error"
if ( strstr ( row [ 0 ] , " \" error \" " ) | | strstr ( row [ 0 ] , " error " ) ) {
has_error = true ;
@ -134,6 +140,12 @@ bool execute_genai_query_expect_error(MYSQL* client, const string& json_query) {
}
}
if ( ! has_error ) {
diag ( " Query succeeded but error was expected " ) ;
} else {
diag ( " Found error message in result set as expected " ) ;
}
mysql_free_result ( res ) ;
return has_error ;
}
@ -147,7 +159,7 @@ bool execute_genai_query_expect_error(MYSQL* client, const string& json_query) {
int test_single_async_request ( MYSQL * client ) {
int test_count = 0 ;
diag ( " Testing single async GenAI request " ) ;
diag ( " Testing single async GenAI request : embedding for 1 document " ) ;
// Test 1: Single embedding request - should return immediately (async)
auto start = std : : chrono : : steady_clock : : now ( ) ;
@ -159,10 +171,6 @@ int test_single_async_request(MYSQL* client) {
ok ( success , " Single async embedding request succeeds " ) ;
test_count + + ;
// Note: For async, the query returns quickly but the actual processing
// happens in the worker thread. We can't easily test the non-blocking
// behavior from a single connection, but we can verify it works.
diag ( " Async request completed in %ld ms " , elapsed ) ;
return test_count ;
@ -177,11 +185,12 @@ int test_single_async_request(MYSQL* client) {
int test_sequential_async_requests ( MYSQL * client ) {
int test_count = 0 ;
diag ( " Testing multiple sequential async requests " ) ;
diag ( " Testing multiple sequential async requests : 5 embeddings followed by 5 reranks " ) ;
// Test 1: Send 5 sequential embedding requests
int success_count = 0 ;
for ( int i = 0 ; i < 5 ; i + + ) {
diag ( " Sequential embedding %d/5 " , i + 1 ) ;
string json = R " ({ " type " : " embed " , " documents " : [ " Sequential test document ) " +
std : : to_string ( i ) + R " ( " ] } ) " ;
if ( execute_genai_query ( client , json , 1 ) ) {
@ -195,6 +204,7 @@ int test_sequential_async_requests(MYSQL* client) {
// Test 2: Send 5 sequential rerank requests
success_count = 0 ;
for ( int i = 0 ; i < 5 ; i + + ) {
diag ( " Sequential rerank %d/5 " , i + 1 ) ;
string json = R " ({
" type " : " rerank " ,
" query " : " Sequential test query ) " + std : : to_string ( i ) + R " ( " ,
@ -220,7 +230,7 @@ int test_sequential_async_requests(MYSQL* client) {
int test_batch_async_requests ( MYSQL * client ) {
int test_count = 0 ;
diag ( " Testing batch async requests " ) ;
diag ( " Testing batch async requests : 10 documents in a single request " ) ;
// Test 1: Batch embedding with 10 documents
string json = R " ({ " type " : " embed " , " documents " : [) " ;
@ -230,6 +240,7 @@ int test_batch_async_requests(MYSQL* client) {
}
json + = " ]} " ;
diag ( " Executing batch embedding (10 docs) " ) ;
ok ( execute_genai_query ( client , json , 10 ) ,
" Batch embedding with 10 documents returns 10 rows " ) ;
test_count + + ;
@ -245,6 +256,7 @@ int test_batch_async_requests(MYSQL* client) {
}
json + = " ]} " ;
diag ( " Executing batch rerank (10 docs) " ) ;
ok ( execute_genai_query ( client , json , 10 ) ,
" Batch rerank with 10 documents returns 10 rows " ) ;
test_count + + ;
@ -261,11 +273,12 @@ int test_batch_async_requests(MYSQL* client) {
int test_mixed_requests ( MYSQL * client ) {
int test_count = 0 ;
diag ( " Testing mixed embedding and rerank requests " ) ;
diag ( " Testing mixed embedding and rerank requests interleaved " ) ;
// Test 1: Interleave embedding and rerank requests
int success_count = 0 ;
for ( int i = 0 ; i < 3 ; i + + ) {
diag ( " Mixed pair %d/3 " , i + 1 ) ;
// Embedding
string json = R " ({ " type " : " embed " , " documents " : [ " Mixed test ) " +
std : : to_string ( i ) + R " ( " ] } ) " ;
@ -299,13 +312,14 @@ int test_mixed_requests(MYSQL* client) {
int test_request_response_matching ( MYSQL * client ) {
int test_count = 0 ;
diag ( " Testing request/response matching " ) ;
diag ( " Testing request/response matching by varying document counts " ) ;
// Test 1: Send requests with different document counts and verify
std : : vector < int > doc_counts = { 1 , 3 , 5 , 7 } ;
int success_count = 0 ;
for ( int doc_count : doc_counts ) {
diag ( " Testing request with %d documents " , doc_count ) ;
string json = R " ({ " type " : " embed " , " documents " : [) " ;
for ( int i = 0 ; i < doc_count ; i + + ) {
if ( i > 0 ) json + = " , " ;
@ -335,39 +349,45 @@ int test_request_response_matching(MYSQL* client) {
int test_async_error_handling ( MYSQL * client ) {
int test_count = 0 ;
diag ( " Testing async error handling " ) ;
diag ( " Testing async error handling for invalid inputs " ) ;
// Test 1: Invalid JSON - should return error immediately
diag ( " Case 1: Invalid JSON (unterminated array) " ) ;
string json = R " ({ " type " : " embed " , " documents " : [) " ;
ok ( execute_genai_query_expect_error ( client , json ) ,
" Invalid JSON returns error in async mode " ) ;
test_count + + ;
// Test 2: Missing documents array
diag ( " Case 2: Missing 'documents' array " ) ;
json = R " ({ " type " : " embed " }) " ;
ok ( execute_genai_query_expect_error ( client , json ) ,
" Missing documents array returns error in async mode " ) ;
test_count + + ;
// Test 3: Empty documents array
diag ( " Case 3: Empty 'documents' array " ) ;
json = R " ({ " type " : " embed " , " documents " : []}) " ;
ok ( execute_genai_query_expect_error ( client , json ) ,
" Empty documents array returns error in async mode " ) ;
test_count + + ;
// Test 4: Rerank without query
diag ( " Case 4: Rerank without 'query' field " ) ;
json = R " ({ " type " : " rerank " , " documents " : [ " doc1 " ]}) " ;
ok ( execute_genai_query_expect_error ( client , json ) ,
" Rerank without query returns error in async mode " ) ;
test_count + + ;
// Test 5: Unknown operation type
diag ( " Case 5: Unknown 'type' field " ) ;
json = R " ({ " type " : " unknown " , " documents " : [ " doc1 " ]}) " ;
ok ( execute_genai_query_expect_error ( client , json ) ,
" Unknown operation type returns error in async mode " ) ;
test_count + + ;
// Test 6: Verify connection still works after errors
diag ( " Case 6: Connection recovery after errors " ) ;
json = R " ({ " type " : " embed " , " documents " : [ " Recovery test " ]}) " ;
ok ( execute_genai_query ( client , json , 1 ) ,
" Connection still works after error requests " ) ;
@ -385,33 +405,38 @@ int test_async_error_handling(MYSQL* client) {
int test_special_characters ( MYSQL * client ) {
int test_count = 0 ;
diag ( " Testing special characters in async queries " ) ;
diag ( " Testing special characters in async queries (quotes, paths, unicode) " ) ;
// Test 1: Quotes and apostrophes
diag ( " Embedding with quotes and apostrophes " ) ;
string json = R " ({ " type " : " embed " , " documents " : [ " Test with \ " quotes \" and 'apostrophes' " ] } ) " ;
ok ( execute_genai_query ( client , json , 1 ) ,
" Embedding with quotes and apostrophes succeeds " ) ;
test_count + + ;
// Test 2: Backslashes
diag ( " Embedding with backslashes " ) ;
json = R " ({ " type " : " embed " , " documents " : [ " Path : C : \ \ Users \ \ test " ]}) " ;
ok ( execute_genai_query ( client , json , 1 ) ,
" Embedding with backslashes succeeds " ) ;
test_count + + ;
// Test 3: Newlines and tabs
diag ( " Embedding with newlines and tabs " ) ;
json = R " ({ " type " : " embed " , " documents " : [ " Line1 \ nLine2 \ tTabbed " ]}) " ;
ok ( execute_genai_query ( client , json , 1 ) ,
" Embedding with newlines and tabs succeeds " ) ;
test_count + + ;
// Test 4: Unicode characters
diag ( " Embedding with unicode characters " ) ;
json = R " ({ " type " : " embed " , " documents " : [ " Unicode : 你 好 世 界 🌍 🚀 " ]}) " ;
ok ( execute_genai_query ( client , json , 1 ) ,
" Embedding with unicode characters succeeds " ) ;
test_count + + ;
// Test 5: Rerank with special characters in query
diag ( " Rerank with quoted query " ) ;
json = R " ({
" type " : " rerank " ,
" query " : " What is \" SQL \" injection? " ,
@ -433,9 +458,10 @@ int test_special_characters(MYSQL* client) {
int test_large_documents ( MYSQL * client ) {
int test_count = 0 ;
diag ( " Testing large documents in async mode " ) ;
diag ( " Testing large documents in async mode (5KB+) " ) ;
// Test 1: Single large document (several KB)
diag ( " Embedding 5KB document " ) ;
string large_doc ( 5000 , ' A ' ) ; // 5KB of 'A's
string json = R " ({ " type " : " embed " , " documents " : [ " ) " + large_doc + R " ( " ]}) " ;
ok ( execute_genai_query ( client , json , 1 ) ,
@ -443,6 +469,7 @@ int test_large_documents(MYSQL* client) {
test_count + + ;
// Test 2: Multiple large documents
diag ( " Embedding 5x1KB documents " ) ;
json = R " ({ " type " : " embed " , " documents " : [) " ;
for ( int i = 0 ; i < 5 ; i + + ) {
if ( i > 0 ) json + = " , " ;
@ -466,9 +493,10 @@ int test_large_documents(MYSQL* client) {
int test_top_n_parameter ( MYSQL * client ) {
int test_count = 0 ;
diag ( " Testing top_n parameter in async mode " ) ;
diag ( " Testing top_n parameter in rerank requests " ) ;
// Test 1: top_n = 3 with 10 documents
diag ( " Rerank with top_n=3 (10 documents) " ) ;
string json = R " ({
" type " : " rerank " ,
" query " : " Test query " ,
@ -480,6 +508,7 @@ int test_top_n_parameter(MYSQL* client) {
test_count + + ;
// Test 2: top_n = 1
diag ( " Rerank with top_n=1 (3 documents) " ) ;
json = R " ({
" type " : " rerank " ,
" query " : " Test query " ,
@ -491,6 +520,7 @@ int test_top_n_parameter(MYSQL* client) {
test_count + + ;
// Test 3: top_n = 0 (return all)
diag ( " Rerank with top_n=0 (all 5 documents) " ) ;
json = R " ({
" type " : " rerank " ,
" query " : " Test query " ,
@ -513,9 +543,10 @@ int test_top_n_parameter(MYSQL* client) {
int test_columns_parameter ( MYSQL * client ) {
int test_count = 0 ;
diag ( " Testing columns parameter in async mode " ) ;
diag ( " Testing columns parameter in rerank requests " ) ;
// Test 1: columns = 2 (index and score only)
diag ( " Rerank with columns=2 (index, score) " ) ;
string json = R " ({
" type " : " rerank " ,
" query " : " Test query " ,
@ -527,6 +558,7 @@ int test_columns_parameter(MYSQL* client) {
test_count + + ;
// Test 2: columns = 3 (index, score, document) - default
diag ( " Rerank with columns=3 (index, score, document) " ) ;
json = R " ({
" type " : " rerank " ,
" query " : " Test query " ,
@ -538,6 +570,7 @@ int test_columns_parameter(MYSQL* client) {
test_count + + ;
// Test 3: Invalid columns value
diag ( " Rerank with invalid columns=5 " ) ;
json = R " ({
" type " : " rerank " ,
" query " : " Test query " ,
@ -564,7 +597,7 @@ int test_concurrent_connections(const char* host, const char* username,
const char * password , int port ) {
int test_count = 0 ;
diag ( " Testing concurrent requests from multiple connections " ) ;
diag ( " Testing concurrent requests from multiple connections (3 concurrent) " ) ;
// Create 3 separate connections
const int num_conns = 3 ;
@ -584,6 +617,7 @@ int test_concurrent_connections(const char* host, const char* username,
conns [ i ] = NULL ;
continue ;
}
diag ( " Connection %d connected " , i ) ;
}
// Count successful connections
@ -605,6 +639,7 @@ int test_concurrent_connections(const char* host, const char* username,
}
// Send requests from all connections concurrently
diag ( " Launching %d concurrent worker threads " , num_conns ) ;
std : : atomic < int > success_count { 0 } ;
std : : vector < std : : thread > threads ;
@ -612,6 +647,7 @@ int test_concurrent_connections(const char* host, const char* username,
threads . push_back ( std : : thread ( [ & , i ] ( ) {
string json = R " ({ " type " : " embed " , " documents " : [ " Concurrent test ) " +
std : : to_string ( i ) + R " ( " ] } ) " ;
diag ( " Thread %d executing query " , i ) ;
if ( execute_genai_query ( conns [ i ] , json , 1 ) ) {
success_count + + ;
}
@ -623,6 +659,7 @@ int test_concurrent_connections(const char* host, const char* username,
t . join ( ) ;
}
diag ( " All %d threads completed. Successes: %d " , num_conns , success_count . load ( ) ) ;
ok ( success_count = = num_conns ,
" All %d concurrent requests succeeded " , num_conns ) ;
test_count + + ;
@ -647,6 +684,16 @@ int main() {
return EXIT_FAILURE ;
}
diag ( " Starting genai_async-t " ) ;
diag ( " This test verifies the GenAI module's asynchronous architecture. " ) ;
diag ( " It tests: " ) ;
diag ( " - Concurrent execution of embedding and rerank requests. " ) ;
diag ( " - Internal communication via socketpair and epoll. " ) ;
diag ( " - Proper request/response matching across multiple connections. " ) ;
diag ( " - Error handling for invalid JSON or missing parameters. " ) ;
diag ( " - Handling of large documents and special characters. " ) ;
diag ( " Note: Requires genai-enabled=true and running backend AI services. " ) ;
// Initialize connections
MYSQL * admin = mysql_init ( NULL ) ;
if ( ! admin ) {
@ -662,18 +709,48 @@ int main() {
diag ( " Connected to ProxySQL admin interface at %s:%d " , cl . admin_host , cl . admin_port ) ;
MYSQL * client = mysql_init ( NULL ) ;
if ( ! client ) {
fprintf ( stderr , " File %s, line %d, Error: mysql_init failed \n " , __FILE__ , __LINE__ ) ;
mysql_close ( admin ) ;
return EXIT_FAILURE ;
// Set writable vector DB path to prevent crash during GenAI initialization
const char * vdb_path = " ./ai_features.db " ;
char query [ 256 ] ;
snprintf ( query , sizeof ( query ) , " UPDATE global_variables SET variable_value='%s' WHERE variable_name='genai-vector_db_path' " , vdb_path ) ;
diag ( " Admin: %s " , query ) ;
mysql_query ( admin , query ) ;
// Enable GenAI
diag ( " Admin: UPDATE global_variables SET variable_value='true' WHERE variable_name='genai-enabled' " ) ;
mysql_query ( admin , " UPDATE global_variables SET variable_value='true' WHERE variable_name='genai-enabled' " ) ;
diag ( " Admin: LOAD GENAI VARIABLES TO RUNTIME " ) ;
mysql_query ( admin , " LOAD GENAI VARIABLES TO RUNTIME " ) ;
// Wait for GenAI to initialize
diag ( " Waiting 2 seconds for GenAI threads to start... " ) ;
sleep ( 2 ) ;
MYSQL * client = NULL ;
int retry = 0 ;
while ( retry < 5 ) {
client = mysql_init ( NULL ) ;
if ( ! client ) {
fprintf ( stderr , " File %s, line %d, Error: mysql_init failed \n " , __FILE__ , __LINE__ ) ;
mysql_close ( admin ) ;
return EXIT_FAILURE ;
}
if ( mysql_real_connect ( client , cl . host , cl . username , cl . password ,
NULL , cl . port , NULL , 0 ) ) {
break ;
}
diag ( " Failed to connect to ProxySQL client (retry %d): %s " , retry , mysql_error ( client ) ) ;
mysql_close ( client ) ;
client = NULL ;
retry + + ;
sleep ( 1 ) ;
}
if ( ! mysql_real_connect ( client , cl . host , cl . username , cl . password ,
NULL , cl . port , NULL , 0 ) ) {
fprintf ( stderr , " File %s, line %d, Error: %s \n " , __FILE__ , __LINE__ , mysql_error ( client ) ) ;
if ( ! client ) {
fprintf ( stderr , " Failed to connect to ProxySQL client after 5 retries \n " ) ;
mysql_close ( admin ) ;
mysql_close ( client ) ;
return EXIT_FAILURE ;
}