diff --git a/genai_prototype/Makefile b/genai_prototype/Makefile index e1fa27fa2..249d5e180 100644 --- a/genai_prototype/Makefile +++ b/genai_prototype/Makefile @@ -3,7 +3,9 @@ CXX = g++ CXXFLAGS = -std=c++17 -Wall -Wextra -O2 -g -LDFLAGS = -lpthread +LDFLAGS = -lpthread -lcurl +CURL_CFLAGS = $(shell curl-config --cflags) +CURL_LDFLAGS = $(shell curl-config --libs) # Target executables TARGET_THREAD = genai_demo @@ -29,7 +31,7 @@ genai_demo: genai_demo.o genai_demo_event: genai_demo_event.o @echo "Linking genai_demo_event..." - $(CXX) genai_demo_event.o $(LDFLAGS) -o genai_demo_event + $(CXX) genai_demo_event.o $(CURL_LDFLAGS) $(LDFLAGS) -o genai_demo_event @echo "Build complete: genai_demo_event" # Compile source files @@ -39,7 +41,7 @@ genai_demo.o: genai_demo.cpp genai_demo_event.o: genai_demo_event.cpp @echo "Compiling $<..." - $(CXX) $(CXXFLAGS) -c $< -o $@ + $(CXX) $(CXXFLAGS) $(CURL_CFLAGS) -c $< -o $@ # Run the demos run: $(TARGET_THREAD) diff --git a/genai_prototype/genai_demo_event b/genai_prototype/genai_demo_event index b490baadc..c64ddff1c 100755 Binary files a/genai_prototype/genai_demo_event and b/genai_prototype/genai_demo_event differ diff --git a/genai_prototype/genai_demo_event.cpp b/genai_prototype/genai_demo_event.cpp index ad7c2d77b..ea6dc182f 100644 --- a/genai_prototype/genai_demo_event.cpp +++ b/genai_prototype/genai_demo_event.cpp @@ -1,69 +1,31 @@ /** * @file genai_demo_event.cpp - * @brief Event-driven demonstration of GenAI module architecture + * @brief Event-driven GenAI module POC with real llama-server integration * - * This program demonstrates an event-driven approach to testing the GenAI - * module, which is more realistic and provides better isolation than the - * thread-based approach. - * - * @par Key Differences from genai_demo.cpp - * - * - **Clients are objects, not threads**: Clients are managed by a main - * event loop instead of running in their own threads - * - * - **Randomized timing**: Clients are added and send requests at random - * intervals, simulating realistic traffic patterns - * - * - **Single epoll set**: Main loop monitors both client responses and - * uses timeouts for periodic tasks (adding clients, sending requests) - * - * - **Better observability**: Central event loop makes it easy to see - * queue depth, active clients, and overall system state + * This POC demonstrates the GenAI module architecture with: + * - Shared memory communication (passing pointers, not copying data) + * - Real embedding generation via llama-server HTTP API + * - Support for single or multiple documents per request + * - libcurl-based HTTP client for embedding API calls * * @par Architecture * - * ``` - * Main Event Loop - * ├─ Randomly add new clients (configurable probability) - * ├─ For each client: randomly send request if ready - * ├─ epoll_wait() for responses (with timeout) - * ├─ Process incoming responses - * ├─ Remove completed clients - * ├─ Print statistics periodically - * └─ Exit when duration elapsed or max clients completed - * ``` - * - * @par Client Lifecycle + * Client and GenAI module share the same process memory space. + * Documents and embeddings are passed by pointer to avoid copying. * - * ``` - * NEW → CONNECTED → IDLE → WAITING_FOR_RESPONSE → IDLE → ... → DONE - * ↑ │ - * └────────────────────────────────────┘ - * (after response, can send again) - * ``` + * @par Request Flow * - * @par Configuration - * - * The demo behavior can be configured via Config struct: - * - genai_workers: Number of GenAI worker threads - * - max_clients: Maximum number of concurrent clients - * - run_duration_seconds: How long to run the demo - * - client_add_probability: Chance to add a client per iteration - * - request_send_probability: Chance an idle client sends a request - * - min/max_requests_per_client: Range of requests per client - * - * @par Build and Run - * - * @code{.sh} - * # Compile - * g++ -std=c++17 -o genai_demo_event genai_demo_event.cpp -lpthread - * - * # Run - * ./genai_demo_event + * 1. Client allocates document(s) in its own memory + * 2. Client sends request with document pointers to GenAI + * 3. GenAI reads document pointers and accesses shared memory + * 4. GenAI calls llama-server via HTTP to get embeddings + * 5. GenAI allocates embedding result and passes pointer back to client + * 6. Client reads embedding from shared memory and displays length + * 7. Client waits for response before sending next request (ensures memory validity) * * @author ProxySQL Team - * @date 2025-01-08 - * @version 2.0 + * @date 2025-01-09 + * @version 3.0 - POC with real embeddings */ #include @@ -86,6 +48,10 @@ #include #include #include +#include +#include +#include +#include // Platform compatibility #ifndef EFD_CLOEXEC @@ -99,67 +65,145 @@ // Protocol Definitions // ============================================================================ +/** + * @enum Operation + * @brief GenAI operation types + */ +enum Operation : uint32_t { + OP_EMBEDDING = 0, ///< Generate embeddings for documents + OP_COMPLETION = 1, ///< Text completion (future) + OP_RAG = 2, ///< RAG query (future) +}; + +/** + * @struct Document + * @brief Document structure passed by pointer (shared memory) + * + * Client allocates this structure and passes its pointer to GenAI. + * GenAI reads the document directly from shared memory. + */ +struct Document { + const char* text; ///< Pointer to document text (owned by client) + size_t text_size; ///< Length of text in bytes + + Document() : text(nullptr), text_size(0) {} + + Document(const char* t, size_t s) : text(t), text_size(s) {} +}; + /** * @struct RequestHeader - * @brief Header structure for client requests to GenAI module + * @brief Header for GenAI requests * - * See genai_demo.cpp for full documentation. + * After this header, the client sends document_count pointers + * to Document structures (as uint64_t). */ struct RequestHeader { - uint64_t request_id; - uint32_t operation; - uint32_t input_size; - uint32_t flags; + uint64_t request_id; ///< Client's correlation ID + uint32_t operation; ///< Operation type (OP_EMBEDDING, etc.) + uint32_t document_count; ///< Number of documents (1 or more) + uint32_t flags; ///< Reserved for future use }; /** - * @struct ResponseHeader - * @brief Header structure for GenAI module responses to clients + * @struct EmbeddingResult + * @brief Embedding vector allocated by GenAI, read by client * - * See genai_demo.cpp for full documentation. + * GenAI allocates this and passes the pointer to client. + * Client reads the embedding and then frees it. */ -struct ResponseHeader { - uint64_t request_id; - uint32_t status_code; - uint32_t output_size; - uint32_t processing_time_ms; +struct EmbeddingResult { + float* data; ///< Pointer to embedding vector (owned by GenAI initially) + size_t size; ///< Number of floats in the embedding + + EmbeddingResult() : data(nullptr), size(0) {} + + ~EmbeddingResult() { + if (data) { + delete[] data; + data = nullptr; + } + } + + // Move constructor and assignment + EmbeddingResult(EmbeddingResult&& other) noexcept + : data(other.data), size(other.size) { + other.data = nullptr; + other.size = 0; + } + + EmbeddingResult& operator=(EmbeddingResult&& other) noexcept { + if (this != &other) { + if (data) delete[] data; + data = other.data; + size = other.size; + other.data = nullptr; + other.size = 0; + } + return *this; + } + + // Disable copy + EmbeddingResult(const EmbeddingResult&) = delete; + EmbeddingResult& operator=(const EmbeddingResult&) = delete; }; /** - * @enum Operation - * @brief Supported GenAI operations + * @struct ResponseHeader + * @brief Header for GenAI responses + * + * For embeddings: passes pointer to EmbeddingResult as uint64_t. */ -enum Operation { - OP_EMBEDDING = 0, - OP_COMPLETION = 1, - OP_RAG = 2 +struct ResponseHeader { + uint64_t request_id; ///< Echo client's request ID + uint32_t status_code; ///< 0=success, >0=error + uint32_t embedding_size; ///< Number of floats in embedding + uint32_t processing_time_ms;///< Time taken to process + uint64_t embedding_ptr; ///< Pointer to embedding data (as uint64_t) + uint32_t result_count; ///< Number of results (for multiple documents) }; // ============================================================================ -// GenAI Module (reused from genai_demo.cpp) +// GenAI Module // ============================================================================ /** * @class GenAIModule - * @brief Thread-pool based GenAI processing module - * - * This is the same GenAI module from genai_demo.cpp, providing - * the asynchronous request processing with thread pool. + * @brief Thread-pool based GenAI processing module with real embedding support * - * See genai_demo.cpp for detailed documentation. + * This module provides embedding generation via llama-server HTTP API. + * It uses a thread pool with epoll-based listener for async processing. */ class GenAIModule { public: + /** + * @struct Request + * @brief Internal request representation + */ struct Request { int client_fd; uint64_t request_id; uint32_t operation; - std::string input; + std::vector documents; ///< Document pointers from shared memory }; GenAIModule(int num_workers = 4) - : num_workers_(num_workers), running_(false) {} + : num_workers_(num_workers), running_(false) { + + // Initialize libcurl + curl_global_init(CURL_GLOBAL_ALL); + } + + ~GenAIModule() { + if (running_) { + stop(); + } + curl_global_cleanup(); + } + /** + * @brief Start the GenAI module (spawn threads) + */ void start() { running_ = true; @@ -190,8 +234,14 @@ public: listener_thread_ = std::thread([this]() { listener_loop(); }); std::cout << "[GenAI] Module started with " << num_workers_ << " workers\n"; + std::cout << "[GenAI] Embedding endpoint: http://127.0.0.1:8013/embedding\n"; } + /** + * @brief Register a client file descriptor with GenAI + * + * @param client_fd File descriptor to monitor (from socketpair) + */ void register_client(int client_fd) { std::lock_guard lock(clients_mutex_); @@ -202,46 +252,50 @@ public: ev.events = EPOLLIN; ev.data.fd = client_fd; if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, client_fd, &ev) < 0) { - perror("epoll_ctl client_fd"); + perror("epoll_ctl add client"); return; } client_fds_.insert(client_fd); } + /** + * @brief Stop the GenAI module + */ void stop() { running_ = false; uint64_t value = 1; write(event_fd_, &value, sizeof(value)); - queue_cv_.notify_all(); - if (listener_thread_.joinable()) { - listener_thread_.join(); - } + queue_cv_.notify_all(); for (auto& t : worker_threads_) { - if (t.joinable()) { - t.join(); - } + if (t.joinable()) t.join(); } - for (int fd : client_fds_) { - close(fd); + if (listener_thread_.joinable()) { + listener_thread_.join(); } - close(epoll_fd_); close(event_fd_); + close(epoll_fd_); std::cout << "[GenAI] Module stopped\n"; } + /** + * @brief Get current queue depth (for statistics) + */ size_t get_queue_size() const { std::lock_guard lock(queue_mutex_); return request_queue_.size(); } private: + /** + * @brief Listener loop - reads requests from clients via epoll + */ void listener_loop() { const int MAX_EVENTS = 64; struct epoll_event events[MAX_EVENTS]; @@ -272,19 +326,30 @@ private: continue; } - std::string input(header.input_size, '\0'); + // Read document pointers (passed as uint64_t) + std::vector doc_ptrs(header.document_count); size_t total_read = 0; - while (total_read < header.input_size) { - ssize_t r = read(client_fd, &input[total_read], header.input_size - total_read); + while (total_read < header.document_count * sizeof(uint64_t)) { + ssize_t r = read(client_fd, + (char*)doc_ptrs.data() + total_read, + header.document_count * sizeof(uint64_t) - total_read); if (r <= 0) break; total_read += r; } + // Build request with document pointers (shared memory) Request req; req.client_fd = client_fd; req.request_id = header.request_id; req.operation = header.operation; - req.input = std::move(input); + req.documents.reserve(header.document_count); + + for (uint32_t i = 0; i < header.document_count; i++) { + Document* doc = reinterpret_cast(doc_ptrs[i]); + if (doc && doc->text) { + req.documents.push_back(*doc); + } + } { std::lock_guard lock(queue_mutex_); @@ -296,6 +361,138 @@ private: } } + /** + * @brief Callback function for libcurl to handle HTTP response + */ + static size_t WriteCallback(void* contents, size_t size, size_t nmemb, void* userp) { + size_t totalSize = size * nmemb; + std::string* response = static_cast(userp); + response->append(static_cast(contents), totalSize); + return totalSize; + } + + /** + * @brief Call llama-server embedding API via libcurl + * + * @param text Document text to embed + * @return EmbeddingResult containing the embedding vector + */ + EmbeddingResult call_llama_embedding(const std::string& text) { + EmbeddingResult result; + CURL* curl = curl_easy_init(); + + if (!curl) { + std::cerr << "[Worker] Failed to initialize curl\n"; + return result; + } + + // Build JSON request + std::stringstream json; + json << "{\"input\":\""; + + // Escape JSON special characters + for (char c : text) { + switch (c) { + case '"': json << "\\\""; break; + case '\\': json << "\\\\"; break; + case '\n': json << "\\n"; break; + case '\r': json << "\\r"; break; + case '\t': json << "\\t"; break; + default: json << c; break; + } + } + + json << "\"}"; + + std::string json_str = json.str(); + + // Configure curl + curl_easy_setopt(curl, CURLOPT_URL, "http://127.0.0.1:8013/embedding"); + curl_easy_setopt(curl, CURLOPT_POST, 1L); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, json_str.c_str()); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); + + std::string response_data; + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response_data); + + // Add content-type header + struct curl_slist* headers = nullptr; + headers = curl_slist_append(headers, "Content-Type: application/json"); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + + // Perform request + CURLcode res = curl_easy_perform(curl); + + if (res != CURLE_OK) { + std::cerr << "[Worker] curl_easy_perform() failed: " + << curl_easy_strerror(res) << "\n"; + } else { + // Parse JSON response to extract embedding + // Response format: [{"index":0,"embedding":[0.1,0.2,...]}] + size_t embedding_pos = response_data.find("\"embedding\":"); + if (embedding_pos != std::string::npos) { + // Find the array start + size_t array_start = response_data.find("[", embedding_pos); + if (array_start != std::string::npos) { + // Find matching bracket + size_t array_end = array_start; + int bracket_count = 0; + bool in_array = false; + + for (size_t i = array_start; i < response_data.size(); i++) { + if (response_data[i] == '[') { + bracket_count++; + in_array = true; + } else if (response_data[i] == ']') { + bracket_count--; + if (bracket_count == 0 && in_array) { + array_end = i; + break; + } + } + } + + // Parse the array of floats + std::string array_str = response_data.substr(array_start + 1, array_end - array_start - 1); + std::vector embedding; + std::stringstream ss(array_str); + std::string token; + + while (std::getline(ss, token, ',')) { + // Remove whitespace and "null" values + token.erase(0, token.find_first_not_of(" \t\n\r")); + token.erase(token.find_last_not_of(" \t\n\r") + 1); + + if (token == "null" || token.empty()) { + continue; + } + + try { + float val = std::stof(token); + embedding.push_back(val); + } catch (...) { + // Skip invalid values + } + } + + if (!embedding.empty()) { + result.size = embedding.size(); + result.data = new float[embedding.size()]; + std::copy(embedding.begin(), embedding.end(), result.data); + } + } + } + } + + curl_slist_free_all(headers); + curl_easy_cleanup(curl); + + return result; + } + + /** + * @brief Worker loop - processes requests from queue + */ void worker_loop(int worker_id) { while (running_) { Request req; @@ -314,21 +511,72 @@ private: request_queue_.pop(); } - unsigned int seed = req.request_id; - int sleep_ms = 100 + (rand_r(&seed) % 400); - - std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); - - std::string output = "Processed: " + req.input; - - ResponseHeader resp; - resp.request_id = req.request_id; - resp.status_code = 0; - resp.output_size = output.size(); - resp.processing_time_ms = sleep_ms; - - write(req.client_fd, &resp, sizeof(resp)); - write(req.client_fd, output.data(), output.size()); + auto start_time = std::chrono::steady_clock::now(); + + // Process based on operation type + if (req.operation == OP_EMBEDDING) { + // For multiple documents, we'll process the first one for this POC + // TODO: Support batch embedding for multiple documents + if (!req.documents.empty()) { + const Document& doc = req.documents[0]; + std::string text(doc.text, doc.text_size); + + std::cout << "[Worker " << worker_id << "] Processing embedding for document (" + << doc.text_size << " bytes)\n"; + + EmbeddingResult embedding = call_llama_embedding(text); + + auto end_time = std::chrono::steady_clock::now(); + int processing_time_ms = std::chrono::duration_cast( + end_time - start_time).count(); + + // Prepare response + ResponseHeader resp; + resp.request_id = req.request_id; + resp.status_code = (embedding.data != nullptr) ? 0 : 1; + resp.embedding_size = embedding.size; + resp.processing_time_ms = processing_time_ms; + resp.embedding_ptr = reinterpret_cast(embedding.data); + resp.result_count = req.documents.size(); + + // Send response header + write(req.client_fd, &resp, sizeof(resp)); + + // The embedding data stays in shared memory (allocated by GenAI) + // Client will read it and then take ownership (client must free it) + embedding.data = nullptr; // Transfer ownership to client + } else { + // No documents + auto end_time = std::chrono::steady_clock::now(); + int processing_time_ms = std::chrono::duration_cast( + end_time - start_time).count(); + + ResponseHeader resp; + resp.request_id = req.request_id; + resp.status_code = 1; // Error + resp.embedding_size = 0; + resp.processing_time_ms = processing_time_ms; + resp.embedding_ptr = 0; + resp.result_count = 0; + + write(req.client_fd, &resp, sizeof(resp)); + } + } else { + // Unknown operation + auto end_time = std::chrono::steady_clock::now(); + int processing_time_ms = std::chrono::duration_cast( + end_time - start_time).count(); + + ResponseHeader resp; + resp.request_id = req.request_id; + resp.status_code = 1; // Error + resp.embedding_size = 0; + resp.processing_time_ms = processing_time_ms; + resp.embedding_ptr = 0; + resp.result_count = 0; + + write(req.client_fd, &resp, sizeof(resp)); + } } } @@ -351,41 +599,35 @@ private: /** * @struct Config - * @brief Configuration for the event-driven GenAI demo - * - * @var genai_workers - * Number of worker threads in the GenAI module pool - * - * @var max_clients - * Maximum number of concurrent client connections to create - * - * @var run_duration_seconds - * How long the demo should run before terminating - * - * @var client_add_probability - * Probability (0.0 to 1.0) of adding a new client per main loop iteration - * - * @var request_send_probability - * Probability (0.0 to 1.0) that an idle client sends a request per iteration - * - * @var min_requests_per_client - * Minimum number of requests each client must send before completing - * - * @var max_requests_per_client - * Maximum number of requests each client will send - * - * @var stats_print_interval_ms - * How often to print statistics (in milliseconds) + * @brief Configuration for the GenAI event-driven demo */ struct Config { int genai_workers = 4; - int max_clients = 15; - int run_duration_seconds = 20; - double client_add_probability = 0.15; // 15% chance per iteration - double request_send_probability = 0.08; // 8% chance per idle client (more spread out) - int min_requests_per_client = 5; - int max_requests_per_client = 15; - int stats_print_interval_ms = 500; + int max_clients = 5; // Reduced for real API calls + int run_duration_seconds = 30; + double client_add_probability = 0.10; + double request_send_probability = 0.15; + int min_documents_per_request = 1; + int max_documents_per_request = 3; + int stats_print_interval_ms = 1000; +}; + +// ============================================================================ +// Sample Documents +// ============================================================================ + +/** + * @brief Sample documents for testing embeddings + */ +const std::vector SAMPLE_DOCUMENTS = { + "The quick brown fox jumps over the lazy dog. This is a classic sentence that contains all letters of the alphabet.", + "Machine learning is a subset of artificial intelligence that enables systems to learn from data.", + "Embeddings convert text into numerical vectors that capture semantic meaning.", + "Natural language processing has revolutionized how computers understand human language.", + "Vector databases store embeddings for efficient similarity search and retrieval.", + "Transformers have become the dominant architecture for modern natural language processing tasks.", + "Large language models demonstrate remarkable capabilities in text generation and comprehension.", + "Semantic search uses embeddings to find content based on meaning rather than keyword matching." }; // ============================================================================ @@ -394,67 +636,21 @@ struct Config { /** * @class Client - * @brief Event-driven client for GenAI module testing - * - * Unlike the thread-based Client in genai_demo.cpp, this client is - * designed to be managed by a main event loop. It maintains internal - * state and processes events incrementally. - * - * @par State Machine - * - * ``` - * NEW → CONNECTED → IDLE → WAITING_FOR_RESPONSE → IDLE → ... → DONE - * ↑ │ - * └────────────────────────────────────┘ - * (after response, can send again) - * ``` - * - * @par Usage Pattern - * - * @code{.cpp} - * // Create client - * Client* client = new Client(id, config); + * @brief Client that sends embedding requests to GenAI module * - * // Connect to GenAI - * client->connect(genai_module); - * - * // In main event loop: - * if (client->can_send_request() && random() < threshold) { - * client->send_request(); - * } - * - * // After epoll event: - * if (client->has_response()) { - * client->process_response(); - * } - * - * // Check if done - * if (client->is_done()) { - * delete client; - * } - * @endcode + * The client allocates documents and passes pointers to GenAI (shared memory). + * Client waits for response before sending next request (ensures memory validity). */ class Client { public: - - /** - * @enum State - * @brief Client state for state machine - */ enum State { - NEW, ///< Client just created, not connected - CONNECTED, ///< Connected to GenAI, ready to send - IDLE, ///< Ready to send next request - WAITING_FOR_RESPONSE, ///< Request sent, waiting for response - DONE ///< All requests completed + NEW, + CONNECTED, + IDLE, + WAITING_FOR_RESPONSE, + DONE }; - /** - * @brief Construct a Client with specified ID and configuration - * - * @param id Unique identifier for this client - * @param config Configuration determining client behavior - */ Client(int id, const Config& config) : id_(id), config_(config), @@ -465,29 +661,25 @@ public: requests_sent_(0), total_requests_(0), responses_received_(0), - last_send_time_(std::chrono::steady_clock::now()), - last_response_time_(std::chrono::steady_clock::now()) { + owned_embedding_(nullptr) { - // Randomize total requests for this client std::random_device rd; std::mt19937 gen(rd()); std::uniform_int_distribution<> dist( - config_.min_requests_per_client, - config_.max_requests_per_client + config_.min_documents_per_request, + config_.max_documents_per_request ); total_requests_ = dist(gen); } - /** - * @brief Connect this client to the GenAI module - * - * Creates a socketpair and registers with GenAI module. - * - * @param genai Reference to the GenAI module - * - * @post state_ is CONNECTED - * @post read_fd_ and genai_fd_ are set - */ + ~Client() { + close(); + // Clean up any owned embedding + if (owned_embedding_) { + delete[] owned_embedding_; + } + } + void connect(GenAIModule& genai) { int fds[2]; if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) { @@ -505,59 +697,61 @@ public: genai.register_client(genai_fd_); // GenAI gets the other end - state_ = IDLE; // Ready to send requests + state_ = IDLE; std::cout << "[" << id_ << "] Connected (will send " << total_requests_ << " requests)\n"; } - /** - * @brief Check if this client can send a request - * - * @return true if client is in IDLE state and can send - */ bool can_send_request() const { return state_ == IDLE; } - /** - * @brief Send a request to the GenAI module - * - * @pre state_ is IDLE - * @post state_ is WAITING_FOR_RESPONSE - */ void send_request() { if (state_ != IDLE) return; - std::string input = "Client" + std::to_string(id_) + " req#" + - std::to_string(requests_sent_ + 1); + // Allocate documents for this request (owned by client until response) + current_documents_.clear(); + + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> doc_dist(0, SAMPLE_DOCUMENTS.size() - 1); + std::uniform_int_distribution<> count_dist(1, 3); + + int num_docs = count_dist(gen); + for (int i = 0; i < num_docs; i++) { + const std::string& sample_text = SAMPLE_DOCUMENTS[doc_dist(gen)]; + current_documents_.push_back(Document(sample_text.c_str(), sample_text.size())); + } + uint64_t request_id = next_request_id_++; RequestHeader req; req.request_id = request_id; req.operation = OP_EMBEDDING; - req.input_size = input.size(); + req.document_count = current_documents_.size(); req.flags = 0; + // Send request header write(read_fd_, &req, sizeof(req)); - write(read_fd_, input.data(), input.size()); + + // Send document pointers (as uint64_t) + std::vector doc_ptrs; + doc_ptrs.reserve(current_documents_.size()); + for (const auto& doc : current_documents_) { + doc_ptrs.push_back(reinterpret_cast(&doc)); + } + write(read_fd_, doc_ptrs.data(), doc_ptrs.size() * sizeof(uint64_t)); pending_requests_[request_id] = std::chrono::steady_clock::now(); requests_sent_++; - last_send_time_ = std::chrono::steady_clock::now(); state_ = WAITING_FOR_RESPONSE; std::cout << "[" << id_ << "] Sent request " << request_id - << " (" << requests_sent_ << "/" << total_requests_ << ")\n"; + << " with " << current_documents_.size() << " document(s) (" + << requests_sent_ << "/" << total_requests_ << ")\n"; } - /** - * @brief Check if this client has a response ready to process - * - * Non-blocking read to check for response. - * - * @return true if response was received and processed - */ bool has_response() { if (state_ != WAITING_FOR_RESPONSE) { return false; @@ -570,15 +764,6 @@ public: return false; } - // Read output data - std::string output(resp.output_size, '\0'); - size_t total_read = 0; - while (total_read < resp.output_size) { - ssize_t r = read(read_fd_, &output[total_read], resp.output_size - total_read); - if (r <= 0) break; - total_read += r; - } - auto it = pending_requests_.find(resp.request_id); if (it != pending_requests_.end()) { auto start_time = it->second; @@ -586,14 +771,31 @@ public: auto duration = std::chrono::duration_cast( end_time - start_time).count(); - std::cout << "[" << id_ << "] Received response " << resp.request_id - << " (rtt=" << duration << "ms, proc=" << resp.processing_time_ms << "ms)\n"; + if (resp.status_code == 0 && resp.embedding_size > 0) { + // Get embedding pointer from shared memory + float* embedding_ptr = reinterpret_cast(resp.embedding_ptr); + + std::cout << "[" << id_ << "] Received response " << resp.request_id + << " (rtt=" << duration << "ms, proc=" << resp.processing_time_ms + << "ms, embedding_size=" << resp.embedding_size << " floats)\n"; + + // Take ownership of the embedding + if (owned_embedding_) { + delete[] owned_embedding_; + } + owned_embedding_ = embedding_ptr; + } else { + std::cout << "[" << id_ << "] Received response " << resp.request_id + << " (rtt=" << duration << "ms, status=ERROR)\n"; + } pending_requests_.erase(it); } responses_received_++; - last_response_time_ = std::chrono::steady_clock::now(); + + // Clean up current documents (safe now that response is received) + current_documents_.clear(); // Check if we should send more requests or are done if (requests_sent_ >= total_requests_) { @@ -605,36 +807,18 @@ public: return true; } - /** - * @brief Check if this client is done (all requests completed) - * - * @return true if state_ is DONE - */ bool is_done() const { return state_ == DONE; } - /** - * @brief Get the file descriptor for monitoring responses - * - * @return read_fd_ for epoll monitoring - */ int get_read_fd() const { return read_fd_; } - /** - * @brief Get client ID - * - * @return Client's unique identifier - */ int get_id() const { return id_; } - /** - * @brief Close connection and clean up - */ void close() { if (read_fd_ >= 0) ::close(read_fd_); if (genai_fd_ >= 0) ::close(genai_fd_); @@ -642,11 +826,6 @@ public: genai_fd_ = -1; } - /** - * @brief Get current state as string - * - * @return String representation of state_ - */ const char* get_state_string() const { switch (state_) { case NEW: return "NEW"; @@ -659,20 +838,20 @@ public: } private: - int id_; ///< Client identifier - Config config_; ///< Configuration - State state_; ///< Current state + int id_; + Config config_; + State state_; - int read_fd_; ///< FD for reading responses - int genai_fd_; ///< FD for writing requests + int read_fd_; + int genai_fd_; - uint64_t next_request_id_; ///< Next request ID to use - int requests_sent_; ///< Number of requests sent - int total_requests_; ///< Total requests to send - int responses_received_; ///< Number of responses received + uint64_t next_request_id_; + int requests_sent_; + int total_requests_; + int responses_received_; - std::chrono::steady_clock::time_point last_send_time_; - std::chrono::steady_clock::time_point last_response_time_; + std::vector current_documents_; ///< Documents for current request + float* owned_embedding_; ///< Embedding received from GenAI (owned by client) std::unordered_map pending_requests_; }; @@ -681,43 +860,20 @@ private: // Main // ============================================================================ -/** - * @brief Main entry point for event-driven GenAI demonstration - * - * Creates a single event loop that: - * - Randomly adds clients over time - * - Randomly sends requests from idle clients - * - Monitors all client FDs for responses via epoll - * - Prints statistics periodically - * - Runs for a configurable duration - * - * @par Event Loop Flow - * - * 1. Check if we should add a new client (random chance) - * 2. For each client: randomly send request if idle - * 3. epoll_wait() with timeout for: - * - Client responses - * - Periodic tasks (add client, send request, print stats) - * 4. Process any responses received - * 5. Remove completed clients - * 6. Print stats periodically - * 7. Exit when duration elapsed or max clients completed - * - * @return 0 on success - */ int main() { - std::cout << "=== GenAI Module Event-Driven Demonstration ===\n\n"; + std::cout << "=== GenAI Module Event-Driven POC ===\n"; + std::cout << "Real embedding generation via llama-server\n\n"; Config config; - std::cout << "Configuration:\n"; std::cout << " GenAI workers: " << config.genai_workers << "\n"; std::cout << " Max clients: " << config.max_clients << "\n"; std::cout << " Run duration: " << config.run_duration_seconds << "s\n"; std::cout << " Client add probability: " << config.client_add_probability << "\n"; std::cout << " Request send probability: " << config.request_send_probability << "\n"; - std::cout << " Requests per client: " << config.min_requests_per_client - << "-" << config.max_requests_per_client << "\n\n"; + std::cout << " Documents per request: " << config.min_documents_per_request + << "-" << config.max_documents_per_request << "\n"; + std::cout << " Sample documents: " << SAMPLE_DOCUMENTS.size() << "\n\n"; // Create and start GenAI module GenAIModule genai(config.genai_workers); @@ -812,7 +968,7 @@ int main() { const int MAX_EVENTS = 64; struct epoll_event events[MAX_EVENTS]; - int timeout_ms = 100; // 100ms timeout for periodic checks + int timeout_ms = 100; int nfds = epoll_wait(main_epoll_fd, events, MAX_EVENTS, timeout_ms); // --------------------------------------------------------