From 211179bdcb29f78e0a5cd80aa984fe2aeceac128 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Mon, 26 Jan 2026 21:43:38 +0500 Subject: [PATCH] rag_ingest: Rewrite to use MySQL protocol (Sqlite_Server) instead of SQLite3 API Convert rag_ingest from direct SQLite database access to MySQL protocol communication with SQLite Server. Major changes: - Replace SQLite3 direct API with MySQL/mariadb client library - Update connection flow: rag_ingest -> MySQL protocol -> SQLite Server -> RAG DB - Remove sqlite3 amalgamation dependency from rag_ingest target - Keep all ingestion functionality intact (chunking, FTS5, vec0 embeddings) Schema initialization: - Add init command to create RAG schema via MySQL protocol - Add ingest command to run data ingestion pipeline - Add query command for vector similarity search with embeddings --- RAG_POC/Makefile | 5 +- RAG_POC/rag_ingest.cpp | 3353 ++++++++++++++------------------- lib/proxy_sqlite3_symbols.cpp | 14 +- 3 files changed, 1471 insertions(+), 1901 deletions(-) diff --git a/RAG_POC/Makefile b/RAG_POC/Makefile index d97133a68..9a66bb123 100644 --- a/RAG_POC/Makefile +++ b/RAG_POC/Makefile @@ -6,14 +6,11 @@ ROOT_DIR := .. 
INCLUDES := \ -I$(ROOT_DIR)/deps/json \ -I$(ROOT_DIR)/deps/mariadb-client-library/mariadb_client/include \ - -I$(ROOT_DIR)/deps/sqlite3/sqlite-amalgamation-3500400 \ -I$(ROOT_DIR)/deps/curl/curl/include LIBDIRS := \ -L$(ROOT_DIR)/deps/mariadb-client-library/mariadb_client/libmariadb -SQLITE3_OBJ := $(ROOT_DIR)/deps/sqlite3/sqlite-amalgamation-3500400/sqlite3.o - # Use static libcurl CURL_STATIC_LIB := $(ROOT_DIR)/deps/curl/curl/lib/.libs/libcurl.a @@ -27,7 +24,7 @@ SOURCES := rag_ingest.cpp all: $(TARGET) $(TARGET): $(SOURCES) - $(CXX) $(CXXFLAGS) $(INCLUDES) $(LIBDIRS) $(SQLITE3_OBJ) $^ -o $@ $(LIBS) + $(CXX) $(CXXFLAGS) $(INCLUDES) $(LIBDIRS) $^ -o $@ $(LIBS) clean: rm -f $(TARGET) diff --git a/RAG_POC/rag_ingest.cpp b/RAG_POC/rag_ingest.cpp index cbfce8262..1ea4932d3 100644 --- a/RAG_POC/rag_ingest.cpp +++ b/RAG_POC/rag_ingest.cpp @@ -1,9 +1,9 @@ /** * @file rag_ingest.cpp - * @brief ProxySQL RAG (Retrieval-Augmented Generation) Ingestion Tool + * @brief ProxySQL RAG (Retrieval-Augmented Generation) Ingestion Tool - MySQL Protocol Version * * @verbatim - * ProxySQL RAG Ingestion PoC (General-Purpose) + * ProxySQL RAG Ingestion PoC (General-Purpose) - MySQL Protocol Version * @endverbatim * * @section overview Overview @@ -13,9 +13,39 @@ * to configurable JSON specifications, chunks the content, builds full-text * search indexes, and optionally generates vector embeddings for semantic search. * + * @section architecture Architecture + * + * Two-Port Design: + * + *
+ *                     rag_ingest
+ *                         |
+ *         MySQL Protocol (mariadb client)
+ *                         |
+ *                         v
+ *              +------------------------+
+ *              | ProxySQL SQLite3 Server|  Port 6030 (default)
+ *              |   (MySQL Protocol      |
+ *              |    Gateway to SQLite)  |
+ *              +------------------------+
+ *                         |
+ *                         | SQLite engine
+ *                         v
+ *              +------------------------+
+ *              |   RAG Database         |
+ *              |   - rag_* tables       |
+ *              |   - FTS5 index         |
+ *              |   - vec0 index         |
+ *              +------------------------+
+ *
+ *              rag_sources table points to backend MySQL:
+ *              - backend_host: 127.0.0.1 (default)
+ *              - backend_port: 3306 (default)
+ * 
+ * * @section v0_features v0 Features * - * - Reads enabled sources from rag_sources table + * - Reads enabled sources from rag_sources table (via MySQL protocol to SQLite gateway) * - Connects to MySQL backend and fetches data using configurable SELECT queries * - Transforms rows using doc_map_json specification * - Chunks document bodies using configurable chunking parameters @@ -24,18 +54,9 @@ * - Skips documents that already exist (no upsert in v0) * - Supports incremental sync using watermark-based cursor tracking * - * @section future_plans Future Plans (v1+) - * - * - Add hash-based change detection for efficient updates - * - Support document updates (not just skip-if-exists) - * - Add PostgreSQL backend support - * - Add async embedding workers - * - Add operational metrics and monitoring - * * @section dependencies Dependencies * - * - sqlite3: For RAG index storage - * - mysqlclient / libmysqlclient: For MySQL backend connections + * - mysqlclient / mariadb-client: For MySQL protocol connections * - libcurl: For HTTP-based embedding providers (OpenAI-compatible) * - nlohmann/json: Single-header JSON library (json.hpp) * - libcrypt: For sha256_crypt_r weak alias (platform compatibility) @@ -44,35 +65,32 @@ * * @verbatim * g++ -std=c++17 -O2 rag_ingest.cpp -o rag_ingest \ - * -lsqlite3 -lmysqlclient -lcurl -lcrypt + * -lmysqlclient -lcurl -lcrypt * @endverbatim * * @section usage Usage * * @verbatim - * ./rag_ingest /path/to/rag_index.sqlite - * @endverbatim + * # Initialize schema (SQLite Server via MySQL protocol gateway) + * ./rag_ingest init --host=127.0.0.1 --port=6030 --user=root --password=root --database=rag_db * - * The RAG_VEC0_EXT environment variable can be set to specify the path - * to the sqlite3-vec extension: + * # Run ingestion + * ./rag_ingest ingest --host=127.0.0.1 --port=6030 --user=root --password=root --database=rag_db * - * @verbatim - * export RAG_VEC0_EXT=/usr/local/lib/sqlite3-vec.so - * ./rag_ingest /path/to/rag_index.sqlite + 
* # Short options + * ./rag_ingest init -h 127.0.0.1 -P 6030 -u root -p root -D rag_db + * ./rag_ingest ingest -h 127.0.0.1 -P 6030 -u root -p root -D rag_db * @endverbatim * - * @section architecture Architecture - * - * @subsection ingestion_flow Ingestion Flow + * @section ingestion_flow Ingestion Flow * *
- * 1. Open SQLite RAG index database
- * 2. Load sqlite3-vec extension (if RAG_VEC0_EXT is set)
- * 3. Load enabled sources from rag_sources table
- * 4. For each source:
+ * 1. Connect to SQLite Server (via MySQL protocol; port 6030 by default, set with --port / -P)
+ * 2. Load enabled sources from rag_sources table
+ * 3. For each source:
  *    a. Parse chunking_json and embedding_json configurations
  *    b. Load sync cursor (watermark) from rag_sync_state
- *    c. Connect to MySQL backend
+ *    c. Connect to MySQL backend (configured in rag_sources)
  *    d. Build minimal SELECT query (only fetch needed columns)
  *    e. Add incremental filter based on watermark
  *    f. For each row:
@@ -85,137 +103,14 @@
  *          - Insert into rag_fts_chunks (FTS5)
  *          - If embedding enabled: generate and insert embedding
  *    g. Update sync cursor with max watermark value
- * 5. Commit transaction or rollback on error
+ * 4. Commit transaction or rollback on error
  * 
* - * @subsection json_specs JSON Configuration Specifications - * - * @subsubsection doc_map_json doc_map_json (Required) - * - * Defines how to transform a source row into a canonical document: - * - * @verbatim - * { - * "doc_id": { "format": "posts:{Id}" }, - * "title": { "concat": [ {"col":"Title"} ] }, - * "body": { "concat": [ {"col":"Body"} ] }, - * "metadata": { - * "pick": ["Id","Tags","Score","CreationDate"], - * "rename": {"CreationDate":"Created"} - * } - * } - * @endverbatim - * - * Fields: - * - doc_id.format: Template string with {ColumnName} placeholders - * - title.concat: Array of column references and literals - * - body.concat: Array of column references and literals - * - metadata.pick: Columns to include in metadata JSON - * - metadata.rename: Map of old_name -> new_name for metadata keys - * - * @subsubsection chunking_json chunking_json (Required) - * - * Defines how to split document bodies into chunks: - * - * @verbatim - * { - * "enabled": true, - * "unit": "chars", // v0 only supports "chars" - * "chunk_size": 4000, // Target chunk size in characters - * "overlap": 400, // Overlap between consecutive chunks - * "min_chunk_size": 800 // Minimum chunk size (avoid tiny tail chunks) - * } - * @endverbatim - * - * @subsubsection embedding_json embedding_json (Optional) - * - * Defines how to generate embeddings for chunks: - * - * @verbatim - * { - * "enabled": true, - * "dim": 1536, - * "model": "text-embedding-3-large", - * "provider": "openai", // "stub" or "openai" - * "api_base": "https://api.openai.com/v1", - * "api_key": "sk-...", - * "batch_size": 16, - * "timeout_ms": 20000, - * "input": { "concat": [ - * {"col":"Title"}, - * {"lit":"\nTags: "}, {"col":"Tags"}, - * {"lit":"\n\n"}, - * {"chunk_body": true} - * ]} - * } - * @endverbatim - * - * The concat array supports: - * - {"col":"ColumnName"}: Include column value - * - {"lit":"literal text"}: Include literal string - * - {"chunk_body":true}: Include the chunk body text - * - * 
@subsection concat_spec Concat Specification - * - * The concat specification is used in multiple places (title, body, embedding input). - * It's a JSON array of objects, each describing one part to concatenate: - * - * @verbatim - * [ - * {"col": "Title"}, // Use value from "Title" column - * {"lit": "\nTags: "}, // Use literal string - * {"col": "Tags"}, // Use value from "Tags" column - * {"lit": "\n\n"}, - * {"chunk_body": true} // Use chunk body (embedding only) - * ] - * @endverbatim - * - * @subsection sqlite3_vec sqlite3-vec Integration - * - * The sqlite3-vec extension provides vector similarity search capabilities. - * This program binds float32 arrays as BLOBs for the embedding column. - * - * The binding format may vary by sqlite3-vec build. If your build expects - * a different format, modify bind_vec_embedding() accordingly. - * - * @subsection sync_cursor Sync Cursor (Watermark) - * - * The sync cursor enables incremental ingestion by tracking the last processed - * value from a monotonic column (e.g., auto-increment ID, timestamp). - * - * Stored in rag_sync_state.cursor_json: - * @verbatim - * { - * "column": "Id", // Column name for watermark - * "value": 12345 // Last processed value - * } - * @endverbatim - * - * On next run, only rows WHERE column > value are fetched. - * - * @section error_handling Error Handling - * - * This program uses a "fail-fast" approach: - * - JSON parsing errors are fatal (invalid configuration) - * - Database connection errors are fatal - * - SQL execution errors are fatal - * - Errors during ingestion rollback the entire transaction - * - * All fatal errors call fatal() which prints the message and exits with code 1. - * - * @section threading Threading Model - * - * v0 is single-threaded. 
Future versions may support: - * - Concurrent source ingestion - * - Async embedding generation - * - Parallel chunk processing - * * @author ProxySQL Development Team - * @version 0.1.0 - * @date 2024 + * @version 0.2.0 (MySQL Protocol) + * @date 2026 */ -#include "sqlite3.h" #include "mysql.h" #include "crypt.h" #include "curl/curl.h" @@ -224,1931 +119,1343 @@ #include #include #include +#include #include #include #include #include #include +#include #include "json.hpp" using json = nlohmann::json; -/** - * @brief Weak alias for sha256_crypt_r function - * - * This weak symbol alias provides compatibility across platforms where - * sha256_crypt_r may or may not be available in libcrypt. If the - * symbol exists in libcrypt, it will be used. Otherwise, this - * implementation using crypt_r() will be linked. - * - * @param key The key/password to hash - * @param salt The salt string - * @param buffer Output buffer for the hashed result - * @param buflen Size of the output buffer - * @return Pointer to hashed result on success, nullptr on failure - * - * @note This is a workaround for library inconsistencies across platforms. - * The actual hashing is delegated to crypt_r(). 
- */ -extern "C" __attribute__((weak)) char *sha256_crypt_r( - const char *key, - const char *salt, - char *buffer, - int buflen) { - if (!key || !salt || !buffer || buflen <= 0) { - return nullptr; - } - struct crypt_data data; - std::memset(&data, 0, sizeof(data)); - char *res = crypt_r(key, salt, &data); - if (!res) { - return nullptr; - } - size_t len = std::strlen(res); - if (len + 1 > static_cast(buflen)) { - return nullptr; - } - std::memcpy(buffer, res, len + 1); - return buffer; -} - // =========================================================================== // Utility Functions // =========================================================================== -/** - * @brief Print fatal error message and exit - * - * This function prints an error message to stderr and terminates the - * program with exit code 1. It is used for unrecoverable errors where - * continuing execution is impossible or unsafe. - * - * @param msg The error message to print - * - * @note This function does not return. It calls std::exit(1). - */ static void fatal(const std::string& msg) { - std::cerr << "FATAL: " << msg << "\n"; - std::exit(1); + std::cerr << "FATAL: " << msg << "\n"; + std::exit(1); } -/** - * @brief Safely convert C string to std::string - * - * Converts a potentially null C string pointer to a std::string. - * If the pointer is null, returns an empty string. This prevents - * undefined behavior from constructing std::string(nullptr). - * - * @param p Pointer to C string (may be null) - * @return std::string containing the C string content, or empty string if p is null - */ -static std::string str_or_empty(const char* p) { - return p ? 
std::string(p) : std::string(); +// Helper: Convert float to hex string for SQLite BLOB (X'...') +// Handles endianness correctly for IEEE 754 float32 +static std::string float_to_hex_blob(float f) { + // Use memcpy for safe type-punning (avoids strict aliasing violation) + uint32_t bits = 0; + std::memcpy(&bits, &f, sizeof(float)); + + // Format as little-endian hex (matches typical x86_64 architecture) + char buf[16]; + snprintf(buf, sizeof(buf), "%02x%02x%02x%02x", + static_cast(bits & 0xFF), + static_cast((bits >> 8) & 0xFF), + static_cast((bits >> 16) & 0xFF), + static_cast((bits >> 24) & 0xFF)); + return std::string(buf); } +// =========================================================================== +// MySQL Connection Wrapper +// =========================================================================== + /** - * @brief Execute a SQL statement on SQLite - * - * Executes a SQL statement that does not return rows (e.g., INSERT, - * UPDATE, DELETE, PRAGMA). If an error occurs, it prints the error - * message and SQL to stderr. + * @brief MySQL connection wrapper for RAG database * - * @param db SQLite database connection - * @param sql SQL statement to execute - * @return int SQLite return code (SQLITE_OK on success, error code otherwise) + * Wraps MYSQL* connection with RAII and helper methods. + * The backend is SQLite, but accessed via MySQL protocol gateway. */ -static int sqlite_exec(sqlite3* db, const std::string& sql) { - char* err = nullptr; - int rc = sqlite3_exec(db, sql.c_str(), nullptr, nullptr, &err); - if (rc != SQLITE_OK) { - std::string e = err ? 
err : "(unknown sqlite error)"; - sqlite3_free(err); - std::cerr << "SQLite error: " << e << "\nSQL: " << sql << "\n"; - } - return rc; +struct MySQLDB { + MYSQL* conn = nullptr; + + // Default constructor + MySQLDB() = default; + + // RAII: prevent copying + MySQLDB(const MySQLDB&) = delete; + MySQLDB& operator=(const MySQLDB&) = delete; + + // Allow moving + MySQLDB(MySQLDB&& other) noexcept : conn(other.conn) { + other.conn = nullptr; + } + + MySQLDB& operator=(MySQLDB&& other) noexcept { + if (this != &other) { + if (conn) mysql_close(conn); + conn = other.conn; + other.conn = nullptr; + } + return *this; + } + + ~MySQLDB() { + if (conn) mysql_close(conn); + } + + /** + * @brief Connect to MySQL server + * @param host Server hostname or IP + * @param port Server port + * @param user Username + * @param pass Password + * @param db Database name + */ + void connect(const char* host, int port, const char* user, + const char* pass, const char* db) { + conn = mysql_init(nullptr); + if (!conn) fatal("mysql_init failed"); + + mysql_options(conn, MYSQL_SET_CHARSET_NAME, "utf8mb4"); + + if (!mysql_real_connect(conn, host, user, pass, db, port, nullptr, 0)) { + fatal(std::string("MySQL connect failed: ") + mysql_error(conn)); + } + } + + /** + * @brief Execute a simple SQL statement + * @param sql SQL statement to execute + */ + void execute(const char* sql) { + if (mysql_query(conn, sql) != 0) { + std::cerr << "MySQL error: " << mysql_error(conn) << "\nSQL: " << sql << "\n"; + fatal("Query failed"); + } + } + + /** + * @brief Execute SQL statement and return true on success, false on error + * @param sql SQL statement to execute + * @return true if successful, false otherwise + */ + bool try_execute(const char* sql) { + if (mysql_query(conn, sql) != 0) { + return false; + } + return true; + } + + /** + * @brief Execute query and return result + * @param sql SQL query to execute + * @return MYSQL_RES* Result set (caller must free with mysql_free_result) + */ + MYSQL_RES* 
query(const char* sql) { + if (mysql_query(conn, sql) != 0) { + fatal(std::string("MySQL query failed: ") + mysql_error(conn) + "\nSQL: " + sql); + } + MYSQL_RES* res = mysql_store_result(conn); + if (!res) { + fatal(std::string("mysql_store_result failed: ") + mysql_error(conn)); + } + return res; + } +}; + +// =========================================================================== +// Utility Functions +// =========================================================================== + +static std::string str_or_empty(const char* p) { + return p ? std::string(p) : std::string(); } -/** - * @brief Check if a string represents an integer - * - * Determines if the given string consists only of digits, optionally - * preceded by a minus sign. Used for watermark value type detection. - * - * @param s String to check - * @return true if string is a valid integer representation, false otherwise - */ static bool is_integer_string(const std::string& s) { - if (s.empty()) return false; - size_t i = 0; - if (s[0] == '-') { - if (s.size() == 1) return false; - i = 1; - } - for (; i < s.size(); i++) { - if (s[i] < '0' || s[i] > '9') return false; - } - return true; + if (s.empty()) return false; + size_t i = 0; + if (s[0] == '-') { + if (s.size() == 1) return false; + i = 1; + } + for (; i < s.size(); i++) { + if (s[i] < '0' || s[i] > '9') return false; + } + return true; } -/** - * @brief Escape single quotes in SQL string literals - * - * Doubles single quotes in a string for safe SQL literal construction. - * This is used for building incremental filter conditions. - * - * @param s Input string to escape - * @return std::string with single quotes doubled (e.g., "it's" -> "it''s") - * - * @note This is basic escaping. For user input, prefer parameter binding. 
- */ static std::string sql_escape_single_quotes(const std::string& s) { - std::string out; - out.reserve(s.size() + 8); - for (char c : s) { - if (c == '\'') out.push_back('\''); - out.push_back(c); - } - return out; + std::string out; + out.reserve(s.size() * 2); // Reserve more space for escapes + for (char c : s) { + if (c == '\'') { + out.push_back('\''); // Escape single quote as '' + out.push_back('\''); + } else if (c == '\\') { + out.push_back('\\'); // Escape backslash as \\ + out.push_back('\\'); + } else { + out.push_back(c); + } + } + return out; } -/** - * @brief Serialize JSON to compact string - * - * Converts a JSON object to a compact string representation without - * whitespace or pretty-printing. Used for efficient storage. - * - * @param j JSON object to serialize - * @return std::string Compact JSON representation - */ static std::string json_dump_compact(const json& j) { - return j.dump(); -} - -/** - * @brief Load sqlite3-vec extension if configured - * - * Loads the sqlite3-vec extension from the path specified in the - * RAG_VEC0_EXT environment variable. This is required for vector - * similarity search functionality. - * - * @param db SQLite database connection - * - * @note If RAG_VEC0_EXT is not set or empty, this function does nothing. - * The vec0 extension is only required when embedding generation is enabled. - */ -static void sqlite_load_vec_extension(sqlite3* db) { - const char* ext = std::getenv("RAG_VEC0_EXT"); - if (!ext || std::strlen(ext) == 0) return; - - sqlite3_enable_load_extension(db, 1); - char* err = nullptr; - int rc = sqlite3_load_extension(db, ext, nullptr, &err); - if (rc != SQLITE_OK) { - std::string e = err ? 
err : "(unknown error)"; - sqlite3_free(err); - fatal("Failed to load vec0 extension: " + e + " (" + std::string(ext) + ")"); - } + return j.dump(); } // =========================================================================== // Data Structures // =========================================================================== -/** - * @brief Configuration for a single RAG data source - * - * Represents all configuration needed to ingest data from a single - * external source into the RAG index. Loaded from the rag_sources table. - */ struct RagSource { - int source_id = 0; ///< Unique source identifier from database - std::string name; ///< Human-readable source name - int enabled = 0; ///< Whether this source is enabled (0/1) - - // Backend connection settings - std::string backend_type; ///< Type of backend ("mysql" in v0) - std::string host; ///< Backend server hostname or IP - int port = 3306; ///< Backend server port - std::string user; ///< Backend authentication username - std::string pass; ///< Backend authentication password - std::string db; ///< Backend database name - - // Source table settings - std::string table_name; ///< Name of the table to read from - std::string pk_column; ///< Primary key column name - std::string where_sql; ///< Optional WHERE clause for row filtering - - // Transformation configuration (JSON) - json doc_map_json; ///< Document transformation specification - json chunking_json; ///< Chunking configuration - json embedding_json; ///< Embedding configuration (may be null) + int source_id = 0; + std::string name; + int enabled = 0; + + std::string backend_type; + std::string host; + int port = 3306; + std::string user; + std::string pass; + std::string db; + + std::string table_name; + std::string pk_column; + std::string where_sql; + + json doc_map_json; + json chunking_json; + json embedding_json; }; -/** - * @brief Parsed chunking configuration - * - * Controls how document bodies are split into chunks for indexing. 
- * Chunks improve retrieval precision by matching smaller, more - * focused text segments. - */ struct ChunkingConfig { - bool enabled = true; ///< Whether chunking is enabled - std::string unit = "chars"; ///< Unit of measurement ("chars" in v0) - int chunk_size = 4000; ///< Target size of each chunk - int overlap = 400; ///< Overlap between consecutive chunks - int min_chunk_size = 800; ///< Minimum size to avoid tiny tail chunks + bool enabled = true; + std::string unit = "chars"; + int chunk_size = 4000; + int overlap = 400; + int min_chunk_size = 800; }; -/** - * @brief Parsed embedding configuration - * - * Controls how vector embeddings are generated for chunks. - * Embeddings enable semantic similarity search. - */ struct EmbeddingConfig { - bool enabled = false; ///< Whether embedding generation is enabled - int dim = 1536; ///< Vector dimension (model-specific) - std::string model = "unknown"; ///< Model name (for observability) - json input_spec; ///< Embedding input template {"concat":[...]} - std::string provider = "stub"; ///< Provider type: "stub" or "openai" - std::string api_base; ///< API endpoint URL (for "openai" provider) - std::string api_key; ///< API authentication key - int batch_size = 16; ///< Maximum inputs per API call (unused in v0) - int timeout_ms = 20000; ///< Request timeout in milliseconds + bool enabled = false; + int dim = 1536; + std::string model = "unknown"; + json input_spec; + std::string provider = "stub"; + std::string api_base; + std::string api_key; + int batch_size = 16; + int timeout_ms = 20000; }; -/** - * @brief Sync cursor for incremental ingestion - * - * Tracks the last processed watermark value for incremental sync. - * Only rows with watermark > cursor.value are fetched on subsequent runs. 
- */ struct SyncCursor { - std::string column; ///< Column name used for watermark - bool has_value = false; ///< Whether a cursor value has been set - bool numeric = false; ///< Whether the value is numeric (vs string) - std::int64_t num_value = 0; ///< Numeric watermark value (if numeric=true) - std::string str_value; ///< String watermark value (if numeric=false) + std::string column; + bool has_value = false; + bool numeric = false; + std::int64_t num_value = 0; + std::string str_value; }; -/** - * @brief Row representation from MySQL backend - * - * A map from column name to string value. All MySQL values are - * represented as strings for simplicity; conversion happens - * during document building. - */ typedef std::unordered_map RowMap; -/** - * @brief Pending embedding for batched processing - * - * Holds metadata for a chunk that needs embedding generation. - * Used to batch multiple chunks together for efficient API calls. - */ struct PendingEmbedding { - std::string chunk_id; ///< Unique chunk identifier (doc_id#index) - std::string doc_id; ///< Parent document identifier - int source_id; ///< Source identifier - std::string input_text; ///< Text to embed (already built via build_embedding_input) + std::string chunk_id; + std::string doc_id; + int source_id; + std::string input_text; }; // =========================================================================== // JSON Parsing Functions // =========================================================================== -/** - * @brief Parse chunking_json into ChunkingConfig struct - * - * Extracts and validates chunking configuration from JSON. - * Applies sensible defaults for missing or invalid values. - * - * @param j JSON object containing chunking configuration - * @return ChunkingConfig Parsed configuration with defaults applied - * - * @note Only "chars" unit is supported in v0. Other units fall back to "chars". 
- */ static ChunkingConfig parse_chunking_json(const json& j) { - ChunkingConfig cfg; - if (!j.is_object()) return cfg; - - if (j.contains("enabled")) cfg.enabled = j["enabled"].get(); - if (j.contains("unit")) cfg.unit = j["unit"].get(); - if (j.contains("chunk_size")) cfg.chunk_size = j["chunk_size"].get(); - if (j.contains("overlap")) cfg.overlap = j["overlap"].get(); - if (j.contains("min_chunk_size")) cfg.min_chunk_size = j["min_chunk_size"].get(); - - // Apply sanity checks and defaults - if (cfg.chunk_size <= 0) cfg.chunk_size = 4000; - if (cfg.overlap < 0) cfg.overlap = 0; - if (cfg.overlap >= cfg.chunk_size) cfg.overlap = cfg.chunk_size / 4; - if (cfg.min_chunk_size < 0) cfg.min_chunk_size = 0; - - // v0 only supports chars - if (cfg.unit != "chars") { - std::cerr << "WARN: chunking_json.unit=" << cfg.unit - << " not supported in v0. Falling back to chars.\n"; - cfg.unit = "chars"; - } - - return cfg; + ChunkingConfig cfg; + if (!j.is_object()) return cfg; + + if (j.contains("enabled")) cfg.enabled = j["enabled"].get(); + if (j.contains("unit")) cfg.unit = j["unit"].get(); + if (j.contains("chunk_size")) cfg.chunk_size = j["chunk_size"].get(); + if (j.contains("overlap")) cfg.overlap = j["overlap"].get(); + if (j.contains("min_chunk_size")) cfg.min_chunk_size = j["min_chunk_size"].get(); + + if (cfg.chunk_size <= 0) cfg.chunk_size = 4000; + if (cfg.overlap < 0) cfg.overlap = 0; + if (cfg.overlap >= cfg.chunk_size) cfg.overlap = cfg.chunk_size / 4; + if (cfg.min_chunk_size < 0) cfg.min_chunk_size = 0; + + if (cfg.unit != "chars") { + std::cerr << "WARN: chunking_json.unit=" << cfg.unit + << " not supported in v0. Falling back to chars.\n"; + cfg.unit = "chars"; + } + + return cfg; } -/** - * @brief Parse embedding_json into EmbeddingConfig struct - * - * Extracts and validates embedding configuration from JSON. - * Applies sensible defaults for missing or invalid values. 
- * - * @param j JSON object containing embedding configuration (may be null) - * @return EmbeddingConfig Parsed configuration with defaults applied - */ static EmbeddingConfig parse_embedding_json(const json& j) { - EmbeddingConfig cfg; - if (!j.is_object()) return cfg; - - if (j.contains("enabled")) cfg.enabled = j["enabled"].get(); - if (j.contains("dim")) cfg.dim = j["dim"].get(); - if (j.contains("model")) cfg.model = j["model"].get(); - if (j.contains("input")) cfg.input_spec = j["input"]; - if (j.contains("provider")) cfg.provider = j["provider"].get(); - if (j.contains("api_base")) cfg.api_base = j["api_base"].get(); - if (j.contains("api_key")) cfg.api_key = j["api_key"].get(); - if (j.contains("batch_size")) cfg.batch_size = j["batch_size"].get(); - if (j.contains("timeout_ms")) cfg.timeout_ms = j["timeout_ms"].get(); - - // Apply defaults - if (cfg.dim <= 0) cfg.dim = 1536; - if (cfg.batch_size <= 0) cfg.batch_size = 16; - if (cfg.timeout_ms <= 0) cfg.timeout_ms = 20000; - return cfg; + EmbeddingConfig cfg; + if (!j.is_object()) return cfg; + + if (j.contains("enabled")) cfg.enabled = j["enabled"].get(); + if (j.contains("dim")) cfg.dim = j["dim"].get(); + if (j.contains("model")) cfg.model = j["model"].get(); + if (j.contains("input")) cfg.input_spec = j["input"]; + if (j.contains("provider")) cfg.provider = j["provider"].get(); + if (j.contains("api_base")) cfg.api_base = j["api_base"].get(); + if (j.contains("api_key")) cfg.api_key = j["api_key"].get(); + if (j.contains("batch_size")) cfg.batch_size = j["batch_size"].get(); + if (j.contains("timeout_ms")) cfg.timeout_ms = j["timeout_ms"].get(); + + if (cfg.dim <= 0) cfg.dim = 1536; + if (cfg.batch_size <= 0) cfg.batch_size = 16; + if (cfg.timeout_ms <= 0) cfg.timeout_ms = 20000; + return cfg; } // =========================================================================== // Row Access Helpers // =========================================================================== -/** - * @brief Get value from 
RowMap by key - * - * Safely retrieves a value from a RowMap (column name -> value mapping). - * Returns std::nullopt if the key is not present, allowing the caller - * to distinguish between missing keys and empty values. - * - * @param row The RowMap to query - * @param key Column name to look up - * @return std::optional Value if present, nullopt otherwise - */ static std::optional row_get(const RowMap& row, const std::string& key) { - auto it = row.find(key); - if (it == row.end()) return std::nullopt; - return it->second; + auto it = row.find(key); + if (it == row.end()) return std::nullopt; + return it->second; } // =========================================================================== // Format String Template Engine // =========================================================================== -/** - * @brief Apply format template with row data substitution - * - * Replaces {ColumnName} placeholders in a format string with actual - * values from the row. Used for generating document IDs. - * - * Example: "posts:{Id}" with row["Id"] = "123" becomes "posts:123" - * - * @param fmt Format string with {ColumnName} placeholders - * @param row Row data for substitution - * @return std::string Formatted string with substitutions applied - * - * @note Unmatched '{' characters are treated as literals. - * Missing column names result in empty substitution. 
- */ static std::string apply_format(const std::string& fmt, const RowMap& row) { - std::string out; - out.reserve(fmt.size() + 32); - - for (size_t i = 0; i < fmt.size(); i++) { - char c = fmt[i]; - if (c == '{') { - size_t j = fmt.find('}', i + 1); - if (j == std::string::npos) { - // unmatched '{' -> treat as literal - out.push_back(c); - continue; - } - std::string col = fmt.substr(i + 1, j - (i + 1)); - auto v = row_get(row, col); - if (v.has_value()) out += v.value(); - i = j; // jump past '}' - } else { - out.push_back(c); + std::string out; + out.reserve(fmt.size() + 32); + + for (size_t i = 0; i < fmt.size(); i++) { + char c = fmt[i]; + if (c == '{') { + size_t j = fmt.find('}', i + 1); + if (j == std::string::npos) { + out.push_back(c); + continue; + } + std::string col = fmt.substr(i + 1, j - (i + 1)); + auto v = row_get(row, col); + if (v.has_value()) out += v.value(); + i = j; + } else { + out.push_back(c); + } } - } - return out; + return out; } // =========================================================================== // Concat Specification Evaluator // =========================================================================== -/** - * @brief Evaluate concat specification to build string - * - * Processes a concat specification (JSON array) to build a string by - * concatenating column values, literals, and optionally the chunk body. 
- * - * Supported concat elements: - * - {"col":"ColumnName"} - Include value from column - * - {"lit":"literal"} - Include literal string - * - {"chunk_body":true} - Include chunk body (only for embedding input) - * - * @param concat_spec JSON array with concat specification - * @param row Row data for column substitutions - * @param chunk_body Chunk body text (used if allow_chunk_body=true) - * @param allow_chunk_body Whether to allow chunk_body references - * @return std::string Concatenated result - */ static std::string eval_concat(const json& concat_spec, const RowMap& row, const std::string& chunk_body, bool allow_chunk_body) { - if (!concat_spec.is_array()) return ""; - - std::string out; - for (const auto& part : concat_spec) { - if (!part.is_object()) continue; - - if (part.contains("col")) { - // Column reference: {"col":"ColumnName"} - std::string col = part["col"].get(); - auto v = row_get(row, col); - if (v.has_value()) out += v.value(); - } else if (part.contains("lit")) { - // Literal string: {"lit":"some text"} - out += part["lit"].get(); - } else if (allow_chunk_body && part.contains("chunk_body")) { - // Chunk body reference: {"chunk_body":true} - bool yes = part["chunk_body"].get(); - if (yes) out += chunk_body; + if (!concat_spec.is_array()) return ""; + + std::string out; + for (const auto& part : concat_spec) { + if (!part.is_object()) continue; + + if (part.contains("col")) { + std::string col = part["col"].get(); + auto v = row_get(row, col); + if (v.has_value()) out += v.value(); + } else if (part.contains("lit")) { + out += part["lit"].get(); + } else if (allow_chunk_body && part.contains("chunk_body")) { + bool yes = part["chunk_body"].get(); + if (yes) out += chunk_body; + } } - } - return out; + return out; } // =========================================================================== // Metadata Builder // =========================================================================== -/** - * @brief Build metadata JSON from row using 
specification - * - * Creates a metadata JSON object by picking specified columns from - * the row and optionally renaming keys. - * - * @param meta_spec Metadata specification with "pick" and "rename" keys - * @param row Source row data - * @return json Metadata object with picked and renamed fields - * - * The meta_spec format: - * @verbatim - * { - * "pick": ["Col1", "Col2"], // Columns to include - * "rename": {"Col1": "col1"} // Old name -> new name mapping - * } - * @endverbatim - */ static json build_metadata(const json& meta_spec, const RowMap& row) { - json meta = json::object(); - - if (meta_spec.is_object()) { - // Pick specified fields - if (meta_spec.contains("pick") && meta_spec["pick"].is_array()) { - for (const auto& colv : meta_spec["pick"]) { - if (!colv.is_string()) continue; - std::string col = colv.get(); - auto v = row_get(row, col); - if (v.has_value()) meta[col] = v.value(); - } - } + json meta = json::object(); + + if (meta_spec.is_object()) { + if (meta_spec.contains("pick") && meta_spec["pick"].is_array()) { + for (const auto& colv : meta_spec["pick"]) { + if (!colv.is_string()) continue; + std::string col = colv.get(); + auto v = row_get(row, col); + if (v.has_value()) meta[col] = v.value(); + } + } - // Rename keys (must be done in two passes to avoid iterator invalidation) - if (meta_spec.contains("rename") && meta_spec["rename"].is_object()) { - std::vector> renames; - for (auto it = meta_spec["rename"].begin(); it != meta_spec["rename"].end(); ++it) { - if (!it.value().is_string()) continue; - renames.push_back({it.key(), it.value().get()}); - } - for (size_t i = 0; i < renames.size(); i++) { - const std::string& oldk = renames[i].first; - const std::string& newk = renames[i].second; - if (meta.contains(oldk)) { - meta[newk] = meta[oldk]; - meta.erase(oldk); + if (meta_spec.contains("rename") && meta_spec["rename"].is_object()) { + std::vector> renames; + for (auto it = meta_spec["rename"].begin(); it != meta_spec["rename"].end(); 
++it) { + if (!it.value().is_string()) continue; + renames.push_back({it.key(), it.value().get()}); + } + for (size_t i = 0; i < renames.size(); i++) { + const std::string& oldk = renames[i].first; + const std::string& newk = renames[i].second; + if (meta.contains(oldk)) { + meta[newk] = meta[oldk]; + meta.erase(oldk); + } + } } - } } - } - return meta; + return meta; } // =========================================================================== // Text Chunking // =========================================================================== -/** - * @brief Split text into chunks based on character count - * - * Divides a text string into overlapping chunks of approximately - * chunk_size characters. Chunks overlap by 'overlap' characters to - * preserve context at boundaries. Tiny final chunks are appended - * to the previous chunk to avoid fragmentation. - * - * @param text Input text to chunk - * @param cfg Chunking configuration - * @return std::vector Vector of chunk strings - * - * @note If chunking is disabled, returns a single chunk containing the full text. - * @note If text is smaller than chunk_size, returns a single chunk. 
- */ static std::vector chunk_text_chars(const std::string& text, const ChunkingConfig& cfg) { - std::vector chunks; + std::vector chunks; - if (!cfg.enabled) { - chunks.push_back(text); - return chunks; - } + if (!cfg.enabled) { + chunks.push_back(text); + return chunks; + } - if ((int)text.size() <= cfg.chunk_size) { - chunks.push_back(text); - return chunks; - } + if ((int)text.size() <= cfg.chunk_size) { + chunks.push_back(text); + return chunks; + } - int step = cfg.chunk_size - cfg.overlap; - if (step <= 0) step = cfg.chunk_size; + int step = cfg.chunk_size - cfg.overlap; + if (step <= 0) step = cfg.chunk_size; - for (int start = 0; start < (int)text.size(); start += step) { - int end = start + cfg.chunk_size; - if (end > (int)text.size()) end = (int)text.size(); - int len = end - start; - if (len <= 0) break; + for (int start = 0; start < (int)text.size(); start += step) { + int end = start + cfg.chunk_size; + if (end > (int)text.size()) end = (int)text.size(); + int len = end - start; + if (len <= 0) break; - // Avoid tiny final chunk by appending it to the previous chunk - if (len < cfg.min_chunk_size && !chunks.empty()) { - chunks.back() += text.substr(start, len); - break; - } + if (len < cfg.min_chunk_size && !chunks.empty()) { + chunks.back() += text.substr(start, len); + break; + } - chunks.push_back(text.substr(start, len)); + chunks.push_back(text.substr(start, len)); - if (end == (int)text.size()) break; - } + if (end == (int)text.size()) break; + } - return chunks; + return chunks; } // =========================================================================== // MySQL Backend Functions // =========================================================================== -/** - * @brief Connect to MySQL backend or terminate - * - * Establishes a connection to a MySQL server using the provided - * source configuration. Sets charset to utf8mb4 for proper handling - * of Unicode content (e.g., emojis, international text). 
- * - * @param s Source configuration with connection parameters - * @return MYSQL* MySQL connection handle - * - * @note On failure, prints error message and calls fatal() to exit. - */ static MYSQL* mysql_connect_or_die(const RagSource& s) { - MYSQL* conn = mysql_init(nullptr); - if (!conn) fatal("mysql_init failed"); - - // Set utf8mb4 for safety with StackOverflow-like content - mysql_options(conn, MYSQL_SET_CHARSET_NAME, "utf8mb4"); - - if (!mysql_real_connect(conn, - s.host.c_str(), - s.user.c_str(), - s.pass.c_str(), - s.db.c_str(), - s.port, - nullptr, - 0)) { - std::string err = mysql_error(conn); - mysql_close(conn); - fatal("MySQL connect failed: " + err); - } - return conn; + MYSQL* conn = mysql_init(nullptr); + if (!conn) fatal("mysql_init failed"); + + mysql_options(conn, MYSQL_SET_CHARSET_NAME, "utf8mb4"); + + if (!mysql_real_connect(conn, + s.host.c_str(), + s.user.c_str(), + s.pass.c_str(), + s.db.c_str(), + s.port, + nullptr, + 0)) { + std::string err = mysql_error(conn); + mysql_close(conn); + fatal("MySQL connect failed: " + err); + } + return conn; } -/** - * @brief Convert MySQL result row to RowMap - * - * Transforms a raw MySQL row into a column-name -> value map. - * All values are converted to strings; NULL values become empty strings. - * - * @param res MySQL result set (for field metadata) - * @param row Raw MySQL row data - * @return RowMap Map of column names to string values - * - * @note Uses str_or_empty() to safely handle NULL column values. 
- */ static RowMap mysql_row_to_map(MYSQL_RES* res, MYSQL_ROW row) { - RowMap m; - unsigned int n = mysql_num_fields(res); - MYSQL_FIELD* fields = mysql_fetch_fields(res); - - for (unsigned int i = 0; i < n; i++) { - const char* name = fields[i].name; - const char* val = row[i]; - if (name) { - m[name] = str_or_empty(val); + RowMap m; + unsigned int n = mysql_num_fields(res); + MYSQL_FIELD* fields = mysql_fetch_fields(res); + + for (unsigned int i = 0; i < n; i++) { + const char* name = fields[i].name; + const char* val = row[i]; + if (name) { + m[name] = str_or_empty(val); + } } - } - return m; + return m; } // =========================================================================== // Column Collection // =========================================================================== -/** - * @brief Add string to vector if not already present - * - * Helper function to maintain a list of unique column names. - * Used when building the SELECT query to avoid duplicate columns. - * - * @param cols Vector of column names (modified in-place) - * @param c Column name to add (if not present) - */ static void add_unique(std::vector& cols, const std::string& c) { - for (size_t i = 0; i < cols.size(); i++) { - if (cols[i] == c) return; - } - cols.push_back(c); + for (size_t i = 0; i < cols.size(); i++) { + if (cols[i] == c) return; + } + cols.push_back(c); } -/** - * @brief Extract column names from concat specification - * - * Parses a concat specification and adds all referenced column names - * to the output vector. Used to build the minimal SELECT query. 
- * - * @param cols Vector of column names (modified in-place) - * @param concat_spec JSON concat specification to scan - */ static void collect_cols_from_concat(std::vector& cols, const json& concat_spec) { - if (!concat_spec.is_array()) return; - for (const auto& part : concat_spec) { - if (part.is_object() && part.contains("col") && part["col"].is_string()) { - add_unique(cols, part["col"].get()); + if (!concat_spec.is_array()) return; + for (const auto& part : concat_spec) { + if (part.is_object() && part.contains("col") && part["col"].is_string()) { + add_unique(cols, part["col"].get()); + } } - } } -/** - * @brief Collect all columns needed for source ingestion - * - * Analyzes doc_map_json and embedding_json to determine which columns - * must be fetched from the source table. Builds a minimal SELECT query - * by only including required columns. - * - * @param s Source configuration with JSON specifications - * @param ecfg Embedding configuration - * @return std::vector List of required column names - * - * @note The primary key column is always included. - * @note Columns in doc_id.format must be manually included in metadata.pick or concat. 
- */ static std::vector collect_needed_columns(const RagSource& s, const EmbeddingConfig& ecfg) { - std::vector cols; - add_unique(cols, s.pk_column); - - // title/body concat - if (s.doc_map_json.contains("title") && s.doc_map_json["title"].contains("concat")) - collect_cols_from_concat(cols, s.doc_map_json["title"]["concat"]); - if (s.doc_map_json.contains("body") && s.doc_map_json["body"].contains("concat")) - collect_cols_from_concat(cols, s.doc_map_json["body"]["concat"]); - - // metadata.pick - if (s.doc_map_json.contains("metadata") && s.doc_map_json["metadata"].contains("pick")) { - const auto& pick = s.doc_map_json["metadata"]["pick"]; - if (pick.is_array()) { - for (const auto& c : pick) if (c.is_string()) add_unique(cols, c.get()); + std::vector cols; + add_unique(cols, s.pk_column); + + if (s.doc_map_json.contains("title") && s.doc_map_json["title"].contains("concat")) + collect_cols_from_concat(cols, s.doc_map_json["title"]["concat"]); + if (s.doc_map_json.contains("body") && s.doc_map_json["body"].contains("concat")) + collect_cols_from_concat(cols, s.doc_map_json["body"]["concat"]); + + if (s.doc_map_json.contains("metadata") && s.doc_map_json["metadata"].contains("pick")) { + const auto& pick = s.doc_map_json["metadata"]["pick"]; + if (pick.is_array()) { + for (const auto& c : pick) if (c.is_string()) add_unique(cols, c.get()); + } } - } - // embedding input concat (optional) - if (ecfg.enabled && ecfg.input_spec.is_object() && ecfg.input_spec.contains("concat")) { - collect_cols_from_concat(cols, ecfg.input_spec["concat"]); - } + if (ecfg.enabled && ecfg.input_spec.is_object() && ecfg.input_spec.contains("concat")) { + collect_cols_from_concat(cols, ecfg.input_spec["concat"]); + } - return cols; + return cols; } -/** - * @brief Build SELECT SQL query for source - * - * Constructs a SELECT query that fetches only the required columns - * from the source table, with optional WHERE clause combining the - * source's where_sql and an incremental filter. 
- * - * @param s Source configuration - * @param cols List of column names to SELECT - * @param extra_filter Additional WHERE clause (e.g., incremental filter) - * @return std::string Complete SELECT SQL statement - */ static std::string build_select_sql(const RagSource& s, const std::vector& cols, const std::string& extra_filter) { - std::string sql = "SELECT "; - for (size_t i = 0; i < cols.size(); i++) { - if (i) sql += ", "; - sql += "`" + cols[i] + "`"; - } - sql += " FROM `" + s.table_name + "`"; - if (!s.where_sql.empty() || !extra_filter.empty()) { - sql += " WHERE "; - if (!s.where_sql.empty()) { - sql += "(" + s.where_sql + ")"; - if (!extra_filter.empty()) sql += " AND "; + std::string sql = "SELECT "; + for (size_t i = 0; i < cols.size(); i++) { + if (i) sql += ", "; + sql += "`" + cols[i] + "`"; } - if (!extra_filter.empty()) sql += "(" + extra_filter + ")"; - } - return sql; + sql += " FROM `" + s.table_name + "`"; + if (!s.where_sql.empty() || !extra_filter.empty()) { + sql += " WHERE "; + if (!s.where_sql.empty()) { + sql += "(" + s.where_sql + ")"; + if (!extra_filter.empty()) sql += " AND "; + } + if (!extra_filter.empty()) sql += "(" + extra_filter + ")"; + } + return sql; } // =========================================================================== // Sync Cursor (Watermark) Management // =========================================================================== -// Forward declarations -static void sqlite_prepare_or_die(sqlite3* db, sqlite3_stmt** st, const char* sql); -static void bind_text(sqlite3_stmt* st, int idx, const std::string& v); +static json load_sync_cursor_json(MySQLDB& db, int source_id) { + char sql[256]; + snprintf(sql, sizeof(sql), "SELECT cursor_json FROM rag_sync_state WHERE source_id=%d", source_id); -/** - * @brief Load sync cursor JSON from database - * - * Retrieves the cursor_json for a source from rag_sync_state. - * Returns empty object if no cursor exists or on error. 
- * - * @param db SQLite database connection - * @param source_id Source identifier - * @return json Cursor JSON object (empty if not found) - */ -static json load_sync_cursor_json(sqlite3* db, int source_id) { - sqlite3_stmt* st = nullptr; - json out = json::object(); - const char* sql = "SELECT cursor_json FROM rag_sync_state WHERE source_id=?"; - if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) != SQLITE_OK) { - return out; - } - sqlite3_bind_int(st, 1, source_id); - int rc = sqlite3_step(st); - if (rc == SQLITE_ROW) { - const unsigned char* txt = sqlite3_column_text(st, 0); - if (txt) { - try { - out = json::parse(reinterpret_cast(txt)); - } catch (...) { - out = json::object(); - } + MYSQL_RES* res = db.query(sql); + json out = json::object(); + + MYSQL_ROW row = mysql_fetch_row(res); + if (row && row[0]) { + try { + out = json::parse(row[0]); + } catch (...) { + out = json::object(); + } } - } - sqlite3_finalize(st); - if (!out.is_object()) out = json::object(); - return out; + + mysql_free_result(res); + if (!out.is_object()) out = json::object(); + return out; } -/** - * @brief Parse sync cursor JSON into SyncCursor struct - * - * Extracts cursor configuration from JSON and determines the - * value type (numeric vs string). Used for incremental ingestion. 
- * - * @param cursor_json JSON cursor configuration - * @param default_col Default column name if not specified - * @return SyncCursor Parsed cursor configuration - */ static SyncCursor parse_sync_cursor(const json& cursor_json, const std::string& default_col) { - SyncCursor c; - c.column = default_col; - if (cursor_json.is_object()) { - if (cursor_json.contains("column") && cursor_json["column"].is_string()) { - c.column = cursor_json["column"].get(); - } - if (cursor_json.contains("value")) { - const auto& v = cursor_json["value"]; - if (v.is_number_integer()) { - c.has_value = true; - c.numeric = true; - c.num_value = v.get(); - } else if (v.is_number_float()) { - c.has_value = true; - c.numeric = true; - c.num_value = static_cast(v.get()); - } else if (v.is_string()) { - c.has_value = true; - c.str_value = v.get(); - if (is_integer_string(c.str_value)) { - c.numeric = true; - c.num_value = std::stoll(c.str_value); + SyncCursor c; + c.column = default_col; + if (cursor_json.is_object()) { + if (cursor_json.contains("column") && cursor_json["column"].is_string()) { + c.column = cursor_json["column"].get(); + } + if (cursor_json.contains("value")) { + const auto& v = cursor_json["value"]; + if (v.is_number_integer()) { + c.has_value = true; + c.numeric = true; + c.num_value = v.get(); + } else if (v.is_number_float()) { + c.has_value = true; + c.numeric = true; + c.num_value = static_cast(v.get()); + } else if (v.is_string()) { + c.has_value = true; + c.str_value = v.get(); + if (is_integer_string(c.str_value)) { + c.numeric = true; + c.num_value = std::stoll(c.str_value); + } + } } - } } - } - return c; + return c; } -/** - * @brief Build incremental filter SQL from cursor - * - * Constructs a WHERE clause fragment for incremental ingestion. - * Returns empty string if cursor has no value set. 
- * - * @param c Sync cursor configuration - * @return std::string SQL filter (e.g., "`Id` > 123") or empty string - * - * @note String values are escaped to prevent SQL injection. - */ static std::string build_incremental_filter(const SyncCursor& c) { - if (!c.has_value || c.column.empty()) return ""; - std::string col = "`" + c.column + "`"; - if (c.numeric) { - return col + " > " + std::to_string(c.num_value); - } - return col + " > '" + sql_escape_single_quotes(c.str_value) + "'"; + if (!c.has_value || c.column.empty()) return ""; + std::string col = "`" + c.column + "`"; + if (c.numeric) { + return col + " > " + std::to_string(c.num_value); + } + return col + " > '" + sql_escape_single_quotes(c.str_value) + "'"; } -/** - * @brief Update sync state in database - * - * Upserts the cursor state into rag_sync_state after successful - * (or partial) ingestion. Updates the watermark value and last_ok_at. - * - * @param db SQLite database connection - * @param source_id Source identifier - * @param cursor_json Updated cursor JSON to store - * - * @note On error, calls fatal() to terminate the program. 
- */ -static void update_sync_state(sqlite3* db, int source_id, const json& cursor_json) { - const char* sql = - "INSERT INTO rag_sync_state(source_id, mode, cursor_json, last_ok_at, last_error) " - "VALUES(?, 'poll', ?, unixepoch(), NULL) " - "ON CONFLICT(source_id) DO UPDATE SET " - "cursor_json=excluded.cursor_json, last_ok_at=excluded.last_ok_at, last_error=NULL"; - sqlite3_stmt* st = nullptr; - sqlite_prepare_or_die(db, &st, sql); - sqlite3_bind_int(st, 1, source_id); - std::string cursor_str = json_dump_compact(cursor_json); - bind_text(st, 2, cursor_str); - int rc = sqlite3_step(st); - sqlite3_finalize(st); - if (rc != SQLITE_DONE) { - fatal(std::string("SQLite upsert rag_sync_state failed: ") + sqlite3_errmsg(db)); - } +static void update_sync_state(MySQLDB& db, int source_id, const json& cursor_json) { + std::string cursor_str = json_dump_compact(cursor_json); + std::string escaped_cursor = sql_escape_single_quotes(cursor_str); + + // Use std::ostringstream to avoid fixed buffer size issues + std::ostringstream sql; + sql << "INSERT INTO rag_sync_state(source_id, mode, cursor_json, last_ok_at, last_error) " + << "VALUES(" << source_id << ", 'poll', '" << escaped_cursor << "', unixepoch(), NULL) " + << "ON CONFLICT(source_id) DO UPDATE SET " + << "cursor_json='" << escaped_cursor << "', last_ok_at=unixepoch(), last_error=NULL"; + + db.execute(sql.str().c_str()); } // =========================================================================== -// SQLite Prepared Statements +// Document Operations (via MySQL) // =========================================================================== -/** - * @brief Collection of prepared SQLite statements - * - * Holds all prepared statements needed for ingestion. - * Prepared statements are reused for efficiency and SQL injection safety. 
- */ -struct SqliteStmts { - sqlite3_stmt* doc_exists = nullptr; ///< Check if document exists - sqlite3_stmt* ins_doc = nullptr; ///< Insert document - sqlite3_stmt* ins_chunk = nullptr; ///< Insert chunk - sqlite3_stmt* ins_fts = nullptr; ///< Insert into FTS index - sqlite3_stmt* ins_vec = nullptr; ///< Insert vector embedding -}; - -/** - * @brief Prepare SQLite statement or terminate - * - * Prepares a SQLite statement and calls fatal() on failure. - * Used for statements that must succeed for the program to continue. - * - * @param db SQLite database connection - * @param st Output parameter for prepared statement handle - * @param sql SQL statement to prepare - * - * @note On failure, prints error and calls fatal() to exit. - */ -static void sqlite_prepare_or_die(sqlite3* db, sqlite3_stmt** st, const char* sql) { - if (sqlite3_prepare_v2(db, sql, -1, st, nullptr) != SQLITE_OK) { - fatal(std::string("SQLite prepare failed: ") + sqlite3_errmsg(db) + "\nSQL: " + sql); - } -} - -/** - * @brief Finalize all statements in SqliteStmts - * - * Releases all prepared statement resources. - * - * @param s Statement collection to finalize (reset to default state) - */ -static void sqlite_finalize_all(SqliteStmts& s) { - if (s.doc_exists) sqlite3_finalize(s.doc_exists); - if (s.ins_doc) sqlite3_finalize(s.ins_doc); - if (s.ins_chunk) sqlite3_finalize(s.ins_chunk); - if (s.ins_fts) sqlite3_finalize(s.ins_fts); - if (s.ins_vec) sqlite3_finalize(s.ins_vec); - s = SqliteStmts{}; -} +static bool doc_exists(MySQLDB& db, const std::string& doc_id) { + std::string escaped_id = sql_escape_single_quotes(doc_id); + std::ostringstream sql; + sql << "SELECT 1 FROM rag_documents WHERE doc_id = '" << escaped_id << "' LIMIT 1"; -/** - * @brief Bind text parameter to SQLite statement - * - * Binds a std::string value to a prepared statement parameter. - * Uses SQLITE_TRANSIENT for string management (SQLite makes a copy). 
- * - * @param st Prepared statement - * @param idx Parameter index (1-based) - * @param v String value to bind - */ -static void bind_text(sqlite3_stmt* st, int idx, const std::string& v) { - sqlite3_bind_text(st, idx, v.c_str(), -1, SQLITE_TRANSIENT); -} + MYSQL_RES* res = db.query(sql.str().c_str()); + my_ulonglong rows = mysql_num_rows(res); + mysql_free_result(res); -/** - * @brief Bind vector embedding to SQLite statement - * - * Binds a float32 array as a BLOB for sqlite3-vec. - * This is the "best effort" binding format that works with most - * sqlite3-vec builds. Modify if your build expects different format. - * - * @param st Prepared statement - * @param idx Parameter index (1-based) - * @param emb Float vector to bind - * - * @note The binding format is build-dependent. If vec operations fail, - * check your sqlite3-vec build's expected format. - */ -static void bind_vec_embedding(sqlite3_stmt* st, int idx, const std::vector& emb) { - const void* data = (const void*)emb.data(); - int bytes = (int)(emb.size() * sizeof(float)); - sqlite3_bind_blob(st, idx, data, bytes, SQLITE_TRANSIENT); + return rows > 0; } -/** - * @brief Check if document exists in database - * - * Tests whether a document with the given doc_id already exists - * in rag_documents. Used for skip-if-exists logic in v0. 
- * - * @param ss Statement collection - * @param doc_id Document identifier to check - * @return true if document exists, false otherwise - */ -static bool sqlite_doc_exists(SqliteStmts& ss, const std::string& doc_id) { - sqlite3_reset(ss.doc_exists); - sqlite3_clear_bindings(ss.doc_exists); - - bind_text(ss.doc_exists, 1, doc_id); - - int rc = sqlite3_step(ss.doc_exists); - return (rc == SQLITE_ROW); +static void insert_doc(MySQLDB& db, + int source_id, + const std::string& source_name, + const std::string& doc_id, + const std::string& pk_json, + const std::string& title, + const std::string& body, + const std::string& meta_json) { + std::string e_doc_id = sql_escape_single_quotes(doc_id); + std::string e_source_name = sql_escape_single_quotes(source_name); + std::string e_pk_json = sql_escape_single_quotes(pk_json); + std::string e_title = sql_escape_single_quotes(title); + std::string e_body = sql_escape_single_quotes(body); + std::string e_meta = sql_escape_single_quotes(meta_json); + + // Use std::ostringstream to avoid fixed buffer size issues + std::ostringstream sql; + sql << "INSERT INTO rag_documents(doc_id, source_id, source_name, pk_json, title, body, metadata_json) " + << "VALUES('" << e_doc_id << "', " << source_id << ", '" << e_source_name << "', '" + << e_pk_json << "', '" << e_title << "', '" << e_body << "', '" << e_meta << "')"; + + db.execute(sql.str().c_str()); } -/** - * @brief Insert document into rag_documents table - * - * Inserts a new document record with all metadata. - * - * @param ss Statement collection - * @param source_id Source identifier - * @param source_name Source name - * @param doc_id Document identifier - * @param pk_json Primary key JSON (for refetch) - * @param title Document title - * @param body Document body - * @param meta_json Metadata JSON object - * - * @note On failure, calls fatal() to terminate. 
- */ -static void sqlite_insert_doc(SqliteStmts& ss, - int source_id, - const std::string& source_name, - const std::string& doc_id, - const std::string& pk_json, - const std::string& title, - const std::string& body, - const std::string& meta_json) { - sqlite3_reset(ss.ins_doc); - sqlite3_clear_bindings(ss.ins_doc); - - bind_text(ss.ins_doc, 1, doc_id); - sqlite3_bind_int(ss.ins_doc, 2, source_id); - bind_text(ss.ins_doc, 3, source_name); - bind_text(ss.ins_doc, 4, pk_json); - bind_text(ss.ins_doc, 5, title); - bind_text(ss.ins_doc, 6, body); - bind_text(ss.ins_doc, 7, meta_json); - - int rc = sqlite3_step(ss.ins_doc); - if (rc != SQLITE_DONE) { - fatal(std::string("SQLite insert rag_documents failed: ") + sqlite3_errmsg(sqlite3_db_handle(ss.ins_doc))); - } +static void insert_chunk(MySQLDB& db, + const std::string& chunk_id, + const std::string& doc_id, + int source_id, + int chunk_index, + const std::string& title, + const std::string& body, + const std::string& meta_json) { + std::string e_chunk_id = sql_escape_single_quotes(chunk_id); + std::string e_doc_id = sql_escape_single_quotes(doc_id); + std::string e_title = sql_escape_single_quotes(title); + std::string e_body = sql_escape_single_quotes(body); + std::string e_meta = sql_escape_single_quotes(meta_json); + + // Use std::ostringstream to avoid fixed buffer size issues + std::ostringstream sql; + sql << "INSERT INTO rag_chunks(chunk_id, doc_id, source_id, chunk_index, title, body, metadata_json) " + << "VALUES('" << e_chunk_id << "', '" << e_doc_id << "', " << source_id << ", " << chunk_index + << ", '" << e_title << "', '" << e_body << "', '" << e_meta << "')"; + + db.execute(sql.str().c_str()); } -/** - * @brief Insert chunk into rag_chunks table - * - * Inserts a new chunk record with title, body, and metadata. 
- * - * @param ss Statement collection - * @param chunk_id Chunk identifier (typically doc_id#index) - * @param doc_id Parent document identifier - * @param source_id Source identifier - * @param chunk_index Zero-based chunk index within document - * @param title Chunk title - * @param body Chunk body text - * @param meta_json Chunk metadata JSON - * - * @note On failure, calls fatal() to terminate. - */ -static void sqlite_insert_chunk(SqliteStmts& ss, - const std::string& chunk_id, - const std::string& doc_id, - int source_id, - int chunk_index, - const std::string& title, - const std::string& body, - const std::string& meta_json) { - sqlite3_reset(ss.ins_chunk); - sqlite3_clear_bindings(ss.ins_chunk); - - bind_text(ss.ins_chunk, 1, chunk_id); - bind_text(ss.ins_chunk, 2, doc_id); - sqlite3_bind_int(ss.ins_chunk, 3, source_id); - sqlite3_bind_int(ss.ins_chunk, 4, chunk_index); - bind_text(ss.ins_chunk, 5, title); - bind_text(ss.ins_chunk, 6, body); - bind_text(ss.ins_chunk, 7, meta_json); - - int rc = sqlite3_step(ss.ins_chunk); - if (rc != SQLITE_DONE) { - fatal(std::string("SQLite insert rag_chunks failed: ") + sqlite3_errmsg(sqlite3_db_handle(ss.ins_chunk))); - } -} +static void insert_fts(MySQLDB& db, + const std::string& chunk_id, + const std::string& title, + const std::string& body) { + std::string e_chunk_id = sql_escape_single_quotes(chunk_id); + std::string e_title = sql_escape_single_quotes(title); + std::string e_body = sql_escape_single_quotes(body); -/** - * @brief Insert chunk into FTS index - * - * Inserts a chunk into the rag_fts_chunks FTS5 table for - * full-text search. The FTS table is contentless (text stored - * only in rag_chunks, FTS only maintains index). - * - * @param ss Statement collection - * @param chunk_id Chunk identifier - * @param title Chunk title - * @param body Chunk body text - * - * @note On failure, calls fatal() to terminate. 
- */ -static void sqlite_insert_fts(SqliteStmts& ss, - const std::string& chunk_id, - const std::string& title, - const std::string& body) { - sqlite3_reset(ss.ins_fts); - sqlite3_clear_bindings(ss.ins_fts); - - bind_text(ss.ins_fts, 1, chunk_id); - bind_text(ss.ins_fts, 2, title); - bind_text(ss.ins_fts, 3, body); - - int rc = sqlite3_step(ss.ins_fts); - if (rc != SQLITE_DONE) { - fatal(std::string("SQLite insert rag_fts_chunks failed: ") + sqlite3_errmsg(sqlite3_db_handle(ss.ins_fts))); - } -} + // Use std::ostringstream to avoid fixed buffer size issues + std::ostringstream sql; + sql << "INSERT INTO rag_fts_chunks(chunk_id, title, body) " + << "VALUES('" << e_chunk_id << "', '" << e_title << "', '" << e_body << "')"; -/** - * @brief Insert vector embedding into rag_vec_chunks table - * - * Inserts a vector embedding along with metadata for vector - * similarity search using sqlite3-vec. - * - * Schema: rag_vec_chunks(embedding, chunk_id, doc_id, source_id, updated_at) - * - * @param ss Statement collection - * @param emb Float32 vector embedding - * @param chunk_id Chunk identifier - * @param doc_id Parent document identifier - * @param source_id Source identifier - * @param updated_at_unixepoch Unix epoch timestamp (0 for "now") - * - * @note If ss.ins_vec is null (embedding disabled), returns without error. - * @note On failure, calls fatal() with vec-specific error message. 
- */ -static void sqlite_insert_vec(SqliteStmts& ss, - const std::vector& emb, - const std::string& chunk_id, - const std::string& doc_id, - int source_id, - std::int64_t updated_at_unixepoch) { - if (!ss.ins_vec) return; - - sqlite3_reset(ss.ins_vec); - sqlite3_clear_bindings(ss.ins_vec); - - bind_vec_embedding(ss.ins_vec, 1, emb); - bind_text(ss.ins_vec, 2, chunk_id); - bind_text(ss.ins_vec, 3, doc_id); - sqlite3_bind_int(ss.ins_vec, 4, source_id); - sqlite3_bind_int64(ss.ins_vec, 5, (sqlite3_int64)updated_at_unixepoch); - - int rc = sqlite3_step(ss.ins_vec); - if (rc != SQLITE_DONE) { - // Vec-specific error message for troubleshooting - fatal(std::string("SQLite insert rag_vec_chunks failed (check vec binding format): ") - + sqlite3_errmsg(sqlite3_db_handle(ss.ins_vec))); - } + db.execute(sql.str().c_str()); } // =========================================================================== // Embedding Generation // =========================================================================== -/** - * @brief Generate deterministic pseudo-embedding from text - * - * This is a stub implementation that generates a deterministic but - * non-semantic embedding from input text. Used for testing when - * a real embedding service is not available. - * - * The algorithm uses a rolling hash to distribute values across - * vector dimensions, then normalizes. Results are deterministic - * (same input always produces same output) but NOT semantic. - * - * @param text Input text to "embed" - * @param dim Vector dimension - * @return std::vector Deterministic pseudo-embedding - * - * @warning This does NOT produce semantic embeddings. Only for testing. 
- */ static std::vector pseudo_embedding(const std::string& text, int dim) { - std::vector v; - v.resize((size_t)dim, 0.0f); - - // FNV-1a-like rolling hash accumulation into float bins - std::uint64_t h = 1469598103934665603ULL; // FNV offset basis - for (size_t i = 0; i < text.size(); i++) { - h ^= (unsigned char)text[i]; - h *= 1099511628211ULL; // FNV prime - - // Spread influence into bins based on hash - size_t idx = (size_t)(h % (std::uint64_t)dim); - float val = (float)((h >> 32) & 0xFFFF) / 65535.0f; // Normalize to 0..1 - v[idx] += (val - 0.5f); // Center around 0 - } - - // L2 normalization - double norm = 0.0; - for (int i = 0; i < dim; i++) norm += (double)v[(size_t)i] * (double)v[(size_t)i]; - norm = std::sqrt(norm); - if (norm > 1e-12) { - for (int i = 0; i < dim; i++) v[(size_t)i] = (float)(v[(size_t)i] / norm); - } - return v; + std::vector v; + v.resize((size_t)dim, 0.0f); + + std::uint64_t h = 1469598103934665603ULL; + for (size_t i = 0; i < text.size(); i++) { + h ^= (unsigned char)text[i]; + h *= 1099511628211ULL; + + size_t idx = (size_t)(h % (std::uint64_t)dim); + float val = (float)((h >> 32) & 0xFFFF) / 65535.0f; + v[idx] += (val - 0.5f); + } + + double norm = 0.0; + for (int i = 0; i < dim; i++) norm += (double)v[(size_t)i] * (double)v[(size_t)i]; + norm = std::sqrt(norm); + if (norm > 1e-12) { + for (int i = 0; i < dim; i++) v[(size_t)i] = (float)(v[(size_t)i] / norm); + } + return v; } -/** - * @brief Abstract embedding provider interface - * - * Base class for embedding providers. Concrete implementations - * include StubEmbeddingProvider and OpenAIEmbeddingProvider. 
- */ struct EmbeddingProvider { - /** - * @brief Virtual destructor for safe polymorphic deletion - */ - virtual ~EmbeddingProvider() = default; - - /** - * @brief Generate embeddings for multiple input strings - * - * @param inputs Vector of input texts to embed - * @param dim Expected embedding dimension - * @return std::vector> Vector of embeddings (one per input) - * - * @note The returned vector must have exactly inputs.size() embeddings. - * @note Each embedding must have exactly dim float values. - * - * @throws std::runtime_error on embedding generation failure - */ - virtual std::vector> embed(const std::vector& inputs, int dim) = 0; + virtual ~EmbeddingProvider() = default; + virtual std::vector> embed(const std::vector& inputs, int dim) = 0; }; -/** - * @brief Stub embedding provider for testing - * - * Uses pseudo_embedding() to generate deterministic but non-semantic - * embeddings. Used when no real embedding service is available. - */ struct StubEmbeddingProvider : public EmbeddingProvider { - /** - * @brief Generate pseudo-embeddings for inputs - * - * @param inputs Vector of input texts - * @param dim Embedding dimension - * @return std::vector> Deterministic pseudo-embeddings - */ - std::vector> embed(const std::vector& inputs, int dim) override { - std::vector> out; - out.reserve(inputs.size()); - for (const auto& s : inputs) out.push_back(pseudo_embedding(s, dim)); - return out; - } + std::vector> embed(const std::vector& inputs, int dim) override { + std::vector> out; + out.reserve(inputs.size()); + for (const auto& s : inputs) out.push_back(pseudo_embedding(s, dim)); + return out; + } }; -/** - * @brief Buffer for curl HTTP response data - * - * Simple string buffer used by curl write callback to accumulate - * HTTP response data. 
- */ struct CurlBuffer { - std::string data; ///< Accumulated response data + std::string data; }; -/** - * @brief Curl write callback function - * - * Callback function passed to libcurl to handle incoming response data. - * Appends received data to the CurlBuffer. - * - * @param contents Pointer to received data - * @param size Size of each data element - * @param nmemb Number of elements - * @param userp User pointer (must point to CurlBuffer) - * @return size_t Total bytes processed - */ static size_t curl_write_cb(void* contents, size_t size, size_t nmemb, void* userp) { - size_t total = size * nmemb; - CurlBuffer* buf = static_cast(userp); - buf->data.append(static_cast(contents), total); - return total; + size_t total = size * nmemb; + CurlBuffer* buf = static_cast(userp); + buf->data.append(static_cast(contents), total); + return total; } -/** - * @brief OpenAI-compatible embedding provider - * - * Calls an OpenAI-compatible HTTP API to generate embeddings. - * Works with OpenAI, Azure OpenAI, or any compatible service. - */ struct OpenAIEmbeddingProvider : public EmbeddingProvider { - std::string api_base; ///< API base URL (e.g., "https://api.openai.com/v1") - std::string api_key; ///< API authentication key - std::string model; ///< Model name (e.g., "text-embedding-3-large") - int timeout_ms = 20000; ///< Request timeout in milliseconds - - /** - * @brief Construct OpenAI embedding provider - * - * @param base API base URL - * @param key API key - * @param mdl Model name - * @param timeout Request timeout - */ - OpenAIEmbeddingProvider(std::string base, std::string key, std::string mdl, int timeout) - : api_base(std::move(base)), api_key(std::move(key)), model(std::move(mdl)), timeout_ms(timeout) {} - - /** - * @brief Generate embeddings via OpenAI-compatible API - * - * Sends a batch of texts to the embedding API and returns - * the generated embeddings. 
- * - * @param inputs Vector of input texts to embed - * @param dim Expected embedding dimension - * @return std::vector> Generated embeddings - * - * @throws std::runtime_error on API error, timeout, or invalid response - * - * @note The inputs are sent in a single batch. For large batches, - * consider splitting into multiple calls. - */ - std::vector> embed(const std::vector& inputs, int dim) override { - // Validate configuration - if (api_base.empty()) { - throw std::runtime_error("embedding api_base is empty"); - } - if (api_key.empty()) { - throw std::runtime_error("embedding api_key is empty"); - } - if (model.empty()) { - throw std::runtime_error("embedding model is empty"); - } - // Note: Some providers expect "hf:" prefix for HuggingFace models - if (model.rfind("hf:", 0) != 0) { - std::cerr << "WARN: embedding model should be prefixed with 'hf:' per Synthetic docs\n"; - } - - // Build request JSON - json req; - req["model"] = model; - req["input"] = inputs; - if (dim > 0) { - req["dimensions"] = dim; - } - std::string body = req.dump(); - - // Build URL - std::string url = api_base; - if (!url.empty() && url.back() == '/') url.pop_back(); - url += "/embeddings"; - - // Execute HTTP request - CURL* curl = curl_easy_init(); - if (!curl) { - throw std::runtime_error("curl_easy_init failed"); - } - - CurlBuffer buf; - struct curl_slist* headers = nullptr; - std::string auth = "Authorization: Bearer " + api_key; - headers = curl_slist_append(headers, "Content-Type: application/json"); - headers = curl_slist_append(headers, auth.c_str()); - - curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); - curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); - curl_easy_setopt(curl, CURLOPT_POST, 1L); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str()); - curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)body.size()); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write_cb); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buf); - curl_easy_setopt(curl, 
CURLOPT_TIMEOUT_MS, timeout_ms); - - CURLcode res = curl_easy_perform(curl); - long status = 0; - curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status); - - curl_slist_free_all(headers); - curl_easy_cleanup(curl); - - // Check for errors - if (res != CURLE_OK) { - throw std::runtime_error(std::string("curl error: ") + curl_easy_strerror(res)); - } - if (status < 200 || status >= 300) { - throw std::runtime_error("embedding request failed with status " + std::to_string(status)); - } - - // Parse response - json resp = json::parse(buf.data); - if (!resp.contains("data") || !resp["data"].is_array()) { - throw std::runtime_error("embedding response missing data array"); - } - - // Extract embeddings - std::vector> out; - out.reserve(resp["data"].size()); - for (const auto& item : resp["data"]) { - if (!item.contains("embedding") || !item["embedding"].is_array()) { - throw std::runtime_error("embedding item missing embedding array"); - } - std::vector vec; - vec.reserve(item["embedding"].size()); - for (const auto& v : item["embedding"]) { - vec.push_back(v.get()); - } - if ((int)vec.size() != dim) { - throw std::runtime_error("embedding dimension mismatch: expected " + std::to_string(dim) - + ", got " + std::to_string(vec.size())); - } - out.push_back(std::move(vec)); - } + std::string api_base; + std::string api_key; + std::string model; + int timeout_ms = 20000; + + OpenAIEmbeddingProvider(std::string base, std::string key, std::string mdl, int timeout) + : api_base(std::move(base)), api_key(std::move(key)), model(std::move(mdl)), timeout_ms(timeout) {} + + std::vector> embed(const std::vector& inputs, int dim) override { + if (api_base.empty()) throw std::runtime_error("embedding api_base is empty"); + if (api_key.empty()) throw std::runtime_error("embedding api_key is empty"); + if (model.empty()) throw std::runtime_error("embedding model is empty"); + + json req; + req["model"] = model; + req["input"] = inputs; + if (dim > 0) req["dimensions"] = dim; + 
std::string body = req.dump(); + + std::string url = api_base; + if (!url.empty() && url.back() == '/') url.pop_back(); + url += "/embeddings"; + + CURL* curl = curl_easy_init(); + if (!curl) throw std::runtime_error("curl_easy_init failed"); + + CurlBuffer buf; + struct curl_slist* headers = nullptr; + std::string auth = "Authorization: Bearer " + api_key; + headers = curl_slist_append(headers, "Content-Type: application/json"); + headers = curl_slist_append(headers, auth.c_str()); + + curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + curl_easy_setopt(curl, CURLOPT_POST, 1L); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str()); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)body.size()); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write_cb); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buf); + curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, timeout_ms); + + CURLcode res = curl_easy_perform(curl); + long status = 0; + curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status); + + curl_slist_free_all(headers); + curl_easy_cleanup(curl); + + if (res != CURLE_OK) throw std::runtime_error(std::string("curl error: ") + curl_easy_strerror(res)); + if (status < 200 || status >= 300) throw std::runtime_error("embedding request failed with status " + std::to_string(status)); + + json resp = json::parse(buf.data); + if (!resp.contains("data") || !resp["data"].is_array()) throw std::runtime_error("embedding response missing data array"); + + std::vector> out; + out.reserve(resp["data"].size()); + for (const auto& item : resp["data"]) { + if (!item.contains("embedding") || !item["embedding"].is_array()) throw std::runtime_error("embedding item missing embedding array"); + std::vector vec; + vec.reserve(item["embedding"].size()); + for (const auto& v : item["embedding"]) vec.push_back(v.get()); + if ((int)vec.size() != dim) throw std::runtime_error("embedding dimension mismatch"); + 
out.push_back(std::move(vec)); + } - if (out.size() != inputs.size()) { - throw std::runtime_error("embedding response size mismatch"); + if (out.size() != inputs.size()) throw std::runtime_error("embedding response size mismatch"); + return out; } - return out; - } }; -/** - * @brief Build embedding provider from configuration - * - * Factory function that creates the appropriate embedding provider - * based on the configuration. - * - * @param cfg Embedding configuration - * @return std::unique_ptr Configured provider instance - * - * @note Currently supports "stub" and "openai" providers. - * "stub" is returned for unknown provider types. - */ static std::unique_ptr build_embedding_provider(const EmbeddingConfig& cfg) { - if (cfg.provider == "openai") { - return std::make_unique(cfg.api_base, cfg.api_key, cfg.model, cfg.timeout_ms); - } - return std::make_unique(); + if (cfg.provider == "openai") { + return std::make_unique(cfg.api_base, cfg.api_key, cfg.model, cfg.timeout_ms); + } + return std::make_unique(); } // =========================================================================== // Source Loading // =========================================================================== -/** - * @brief Load enabled sources from rag_sources table - * - * Queries the rag_sources table and parses all enabled sources - * into RagSource structs. Validates JSON configurations. - * - * @param db SQLite database connection - * @return std::vector Vector of enabled source configurations - * - * @note On JSON parsing error, calls fatal() to terminate. - * @note Only sources with enabled=1 are loaded. 
- */ -static std::vector load_sources(sqlite3* db) { - std::vector out; - - const char* sql = - "SELECT source_id, name, enabled, " - "backend_type, backend_host, backend_port, backend_user, backend_pass, backend_db, " - "table_name, pk_column, COALESCE(where_sql,''), " - "doc_map_json, chunking_json, COALESCE(embedding_json,'') " - "FROM rag_sources WHERE enabled = 1"; - - sqlite3_stmt* st = nullptr; - sqlite_prepare_or_die(db, &st, sql); - - while (sqlite3_step(st) == SQLITE_ROW) { - RagSource s; - s.source_id = sqlite3_column_int(st, 0); - s.name = str_or_empty((const char*)sqlite3_column_text(st, 1)); - s.enabled = sqlite3_column_int(st, 2); - - s.backend_type = str_or_empty((const char*)sqlite3_column_text(st, 3)); - s.host = str_or_empty((const char*)sqlite3_column_text(st, 4)); - s.port = sqlite3_column_int(st, 5); - s.user = str_or_empty((const char*)sqlite3_column_text(st, 6)); - s.pass = str_or_empty((const char*)sqlite3_column_text(st, 7)); - s.db = str_or_empty((const char*)sqlite3_column_text(st, 8)); - - s.table_name = str_or_empty((const char*)sqlite3_column_text(st, 9)); - s.pk_column = str_or_empty((const char*)sqlite3_column_text(st, 10)); - s.where_sql = str_or_empty((const char*)sqlite3_column_text(st, 11)); - - const char* doc_map = (const char*)sqlite3_column_text(st, 12); - const char* chunk_j = (const char*)sqlite3_column_text(st, 13); - const char* emb_j = (const char*)sqlite3_column_text(st, 14); - - try { - s.doc_map_json = json::parse(doc_map ? doc_map : "{}"); - s.chunking_json = json::parse(chunk_j ? 
chunk_j : "{}"); - if (emb_j && std::strlen(emb_j) > 0) s.embedding_json = json::parse(emb_j); - else s.embedding_json = json(); // null - } catch (const std::exception& e) { - sqlite3_finalize(st); - fatal("Invalid JSON in rag_sources.source_id=" + std::to_string(s.source_id) + ": " + e.what()); - } +static std::vector load_sources(MySQLDB& db) { + std::vector out; + + const char* sql = + "SELECT source_id, name, enabled, " + "backend_type, backend_host, backend_port, backend_user, backend_pass, backend_db, " + "table_name, pk_column, COALESCE(where_sql,''), " + "doc_map_json, chunking_json, COALESCE(embedding_json,'') " + "FROM rag_sources WHERE enabled = 1"; + + MYSQL_RES* res = db.query(sql); + MYSQL_FIELD* fields = mysql_fetch_fields(res); + + MYSQL_ROW row; + while ((row = mysql_fetch_row(res)) != nullptr) { + RagSource s; + s.source_id = atoi(row[0]); + s.name = str_or_empty(row[1]); + s.enabled = atoi(row[2]); + + s.backend_type = str_or_empty(row[3]); + s.host = str_or_empty(row[4]); + s.port = atoi(row[5]); + s.user = str_or_empty(row[6]); + s.pass = str_or_empty(row[7]); + s.db = str_or_empty(row[8]); + + s.table_name = str_or_empty(row[9]); + s.pk_column = str_or_empty(row[10]); + s.where_sql = str_or_empty(row[11]); + + const char* doc_map = row[12]; + const char* chunk_j = row[13]; + const char* emb_j = row[14]; + + try { + s.doc_map_json = json::parse(doc_map ? doc_map : "{}"); + s.chunking_json = json::parse(chunk_j ? 
chunk_j : "{}"); + if (emb_j && std::strlen(emb_j) > 0) s.embedding_json = json::parse(emb_j); + else s.embedding_json = json(); + } catch (const std::exception& e) { + mysql_free_result(res); + fatal("Invalid JSON in rag_sources.source_id=" + std::to_string(s.source_id) + ": " + e.what()); + } - // Basic validation (fail fast) - if (!s.doc_map_json.is_object()) { - sqlite3_finalize(st); - fatal("doc_map_json must be a JSON object for source_id=" + std::to_string(s.source_id)); - } - if (!s.chunking_json.is_object()) { - sqlite3_finalize(st); - fatal("chunking_json must be a JSON object for source_id=" + std::to_string(s.source_id)); - } + if (!s.doc_map_json.is_object()) { + mysql_free_result(res); + fatal("doc_map_json must be a JSON object for source_id=" + std::to_string(s.source_id)); + } + if (!s.chunking_json.is_object()) { + mysql_free_result(res); + fatal("chunking_json must be a JSON object for source_id=" + std::to_string(s.source_id)); + } - out.push_back(std::move(s)); - } + out.push_back(std::move(s)); + } - sqlite3_finalize(st); - return out; + mysql_free_result(res); + return out; } // =========================================================================== // Document Building // =========================================================================== -/** - * @brief Canonical document representation - * - * Contains all fields of a document in its canonical form, - * ready for insertion into rag_documents and chunking. 
- */ struct BuiltDoc { - std::string doc_id; ///< Unique document identifier - std::string pk_json; ///< Primary key JSON (for refetch) - std::string title; ///< Document title - std::string body; ///< Document body (to be chunked) - std::string metadata_json; ///< Metadata JSON object + std::string doc_id; + std::string pk_json; + std::string title; + std::string body; + std::string metadata_json; }; -/** - * @brief Build canonical document from source row - * - * Transforms a raw backend row into a canonical document using - * the doc_map_json specification. Handles doc_id formatting, - * title/body concatenation, and metadata building. - * - * @param src Source configuration with doc_map_json - * @param row Raw row data from backend - * @return BuiltDoc Canonical document representation - */ static BuiltDoc build_document_from_row(const RagSource& src, const RowMap& row) { - BuiltDoc d; - - // Build doc_id from format template - if (src.doc_map_json.contains("doc_id") && src.doc_map_json["doc_id"].is_object() - && src.doc_map_json["doc_id"].contains("format") && src.doc_map_json["doc_id"]["format"].is_string()) { - d.doc_id = apply_format(src.doc_map_json["doc_id"]["format"].get(), row); - } else { - // Fallback: table:pk format - auto pk = row_get(row, src.pk_column).value_or(""); - d.doc_id = src.table_name + ":" + pk; - } - - // Build pk_json (refetch pointer) - json pk = json::object(); - pk[src.pk_column] = row_get(row, src.pk_column).value_or(""); - d.pk_json = json_dump_compact(pk); - - // Build title from concat spec - if (src.doc_map_json.contains("title") && src.doc_map_json["title"].is_object() - && src.doc_map_json["title"].contains("concat")) { - d.title = eval_concat(src.doc_map_json["title"]["concat"], row, "", false); - } else { - d.title = ""; - } - - // Build body from concat spec - if (src.doc_map_json.contains("body") && src.doc_map_json["body"].is_object() - && src.doc_map_json["body"].contains("concat")) { - d.body = 
eval_concat(src.doc_map_json["body"]["concat"], row, "", false); - } else { - d.body = ""; - } - - // Build metadata JSON - json meta = json::object(); - if (src.doc_map_json.contains("metadata")) { - meta = build_metadata(src.doc_map_json["metadata"], row); - } - d.metadata_json = json_dump_compact(meta); - - return d; + BuiltDoc d; + + if (src.doc_map_json.contains("doc_id") && src.doc_map_json["doc_id"].is_object() + && src.doc_map_json["doc_id"].contains("format") && src.doc_map_json["doc_id"]["format"].is_string()) { + d.doc_id = apply_format(src.doc_map_json["doc_id"]["format"].get(), row); + } else { + auto pk = row_get(row, src.pk_column).value_or(""); + d.doc_id = src.table_name + ":" + pk; + } + + json pk = json::object(); + pk[src.pk_column] = row_get(row, src.pk_column).value_or(""); + d.pk_json = json_dump_compact(pk); + + if (src.doc_map_json.contains("title") && src.doc_map_json["title"].is_object() + && src.doc_map_json["title"].contains("concat")) { + d.title = eval_concat(src.doc_map_json["title"]["concat"], row, "", false); + } else { + d.title = ""; + } + + if (src.doc_map_json.contains("body") && src.doc_map_json["body"].is_object() + && src.doc_map_json["body"].contains("concat")) { + d.body = eval_concat(src.doc_map_json["body"]["concat"], row, "", false); + } else { + d.body = ""; + } + + json meta = json::object(); + if (src.doc_map_json.contains("metadata")) { + meta = build_metadata(src.doc_map_json["metadata"], row); + } + d.metadata_json = json_dump_compact(meta); + + return d; } // =========================================================================== // Embedding Input Builder // =========================================================================== -/** - * @brief Build embedding input text for a chunk - * - * Constructs the text to be embedded by applying the embedding_json - * input specification. Typically includes title, tags, and chunk body. 
- * - * @param ecfg Embedding configuration - * @param row Source row data (for column values) - * @param chunk_body Chunk body text - * @return std::string Text to embed (empty if embedding disabled) - */ static std::string build_embedding_input(const EmbeddingConfig& ecfg, const RowMap& row, const std::string& chunk_body) { - if (!ecfg.enabled) return ""; - if (!ecfg.input_spec.is_object()) return chunk_body; + if (!ecfg.enabled) return ""; + if (!ecfg.input_spec.is_object()) return chunk_body; - if (ecfg.input_spec.contains("concat") && ecfg.input_spec["concat"].is_array()) { - return eval_concat(ecfg.input_spec["concat"], row, chunk_body, true); - } + if (ecfg.input_spec.contains("concat") && ecfg.input_spec["concat"].is_array()) { + return eval_concat(ecfg.input_spec["concat"], row, chunk_body, true); + } - return chunk_body; + return chunk_body; +} + +// =========================================================================== +// Vector Insert (BLOB storage for SQLite backend) +// =========================================================================== + +static void insert_vec(MySQLDB& db, + const std::vector& emb, + const std::string& chunk_id, + const std::string& doc_id, + int source_id) { + // Convert float vector to hex string for SQLite BLOB literal syntax X'...' 
+ std::string hex_blob; + hex_blob.reserve(emb.size() * 8); + for (float f : emb) { + hex_blob += float_to_hex_blob(f); + } + + std::string e_chunk_id = sql_escape_single_quotes(chunk_id); + std::string e_doc_id = sql_escape_single_quotes(doc_id); + + // Use SQLite's X'' hex literal syntax - works through MySQL protocol gateway + // Use stringstream to avoid fixed buffer size issues + std::ostringstream sql; + sql << "INSERT INTO rag_vec_chunks(embedding, chunk_id, doc_id, source_id, updated_at) " + << "VALUES(X'" << hex_blob << "', '" << e_chunk_id << "', '" << e_doc_id + << "', " << source_id << ", unixepoch())"; + + db.execute(sql.str().c_str()); } -/** - * @brief Process a batch of pending embeddings - * - * Generates embeddings for all pending chunks in a single API call - * and inserts the resulting vectors into the database. This is much - * more efficient than generating embeddings one chunk at a time. - * - * @param pending Vector of pending embeddings to process - * @param embedder Embedding provider instance - * @param ecfg Embedding configuration (dim, etc.) - * @param ss Prepared SQLite statements - * @param now_epoch Current epoch time for updated_at field - * @return size_t Number of embeddings processed - * - * @note Clears the pending vector after processing. - * @note Throws std::runtime_error on embedding API failure. 
- */ static size_t flush_embedding_batch(std::vector& pending, EmbeddingProvider* embedder, const EmbeddingConfig& ecfg, - SqliteStmts& ss, - std::int64_t now_epoch) { - if (pending.empty()) return 0; - - // Build batch inputs - std::vector inputs; - inputs.reserve(pending.size()); - for (const auto& p : pending) { - inputs.push_back(p.input_text); - } - - // Generate embeddings in a single API call - std::vector> embeddings = embedder->embed(inputs, ecfg.dim); - - // Insert all vectors into the database - for (size_t i = 0; i < pending.size() && i < embeddings.size(); i++) { - const auto& p = pending[i]; - sqlite_insert_vec(ss, embeddings[i], p.chunk_id, p.doc_id, p.source_id, now_epoch); - } - - size_t count = pending.size(); - pending.clear(); - return count; -} + MySQLDB& db) { + if (pending.empty()) return 0; -// =========================================================================== -// Statement Preparation -// =========================================================================== + std::vector inputs; + inputs.reserve(pending.size()); + for (const auto& p : pending) { + inputs.push_back(p.input_text); + } -/** - * @brief Prepare all SQLite statements for ingestion - * - * Creates prepared statements for document existence check, - * document insertion, chunk insertion, FTS insertion, and - * optionally vector insertion. - * - * @param db SQLite database connection - * @param want_vec Whether to prepare vector insert statement - * @return SqliteStmts Collection of prepared statements - * - * @note On failure, calls fatal() to terminate. - */ -static SqliteStmts prepare_sqlite_statements(sqlite3* db, bool want_vec) { - SqliteStmts ss; - - // Existence check - sqlite_prepare_or_die(db, &ss.doc_exists, - "SELECT 1 FROM rag_documents WHERE doc_id = ? 
LIMIT 1"); - - // Insert document (v0: no upsert) - sqlite_prepare_or_die(db, &ss.ins_doc, - "INSERT INTO rag_documents(doc_id, source_id, source_name, pk_json, title, body, metadata_json) " - "VALUES(?,?,?,?,?,?,?)"); - - // Insert chunk - sqlite_prepare_or_die(db, &ss.ins_chunk, - "INSERT INTO rag_chunks(chunk_id, doc_id, source_id, chunk_index, title, body, metadata_json) " - "VALUES(?,?,?,?,?,?,?)"); - - // Insert FTS - sqlite_prepare_or_die(db, &ss.ins_fts, - "INSERT INTO rag_fts_chunks(chunk_id, title, body) VALUES(?,?,?)"); - - // Insert vector (optional) - if (want_vec) { - sqlite_prepare_or_die(db, &ss.ins_vec, - "INSERT INTO rag_vec_chunks(embedding, chunk_id, doc_id, source_id, updated_at) " - "VALUES(?,?,?,?,?)"); - } - - return ss; + std::vector> embeddings = embedder->embed(inputs, ecfg.dim); + + for (size_t i = 0; i < pending.size() && i < embeddings.size(); i++) { + const auto& p = pending[i]; + insert_vec(db, embeddings[i], p.chunk_id, p.doc_id, p.source_id); + } + + size_t count = pending.size(); + pending.clear(); + return count; } // =========================================================================== // Source Ingestion // =========================================================================== -/** - * @brief Ingest data from a single source - * - * Main ingestion function for a single source. Performs the following: - * 1. Parse chunking and embedding configurations - * 2. Load sync cursor (watermark) - * 3. Connect to backend - * 4. Build and execute SELECT query with incremental filter - * 5. For each row: build document, check existence, insert, chunk, embed - * 6. Update sync cursor with max watermark value - * - * @param sdb SQLite database connection - * @param src Source configuration - * - * @note Only "mysql" backend_type is supported in v0. - * @note Skips documents that already exist (no update in v0). - * @note All inserts happen in a single transaction (managed by caller). 
- */ -static void ingest_source(sqlite3* sdb, const RagSource& src) { - std::cerr << "Ingesting source_id=" << src.source_id - << " name=" << src.name - << " backend=" << src.backend_type - << " table=" << src.table_name << "\n"; - - // v0 only supports MySQL - if (src.backend_type != "mysql") { - std::cerr << " Skipping: backend_type not supported in v0.\n"; - return; - } - - // Parse configurations - ChunkingConfig ccfg = parse_chunking_json(src.chunking_json); - EmbeddingConfig ecfg = parse_embedding_json(src.embedding_json); - std::unique_ptr embedder; - if (ecfg.enabled) { - embedder = build_embedding_provider(ecfg); - } - - // Load sync cursor (watermark for incremental sync) - json cursor_json = load_sync_cursor_json(sdb, src.source_id); - SyncCursor cursor = parse_sync_cursor(cursor_json, src.pk_column); - - // Prepare SQLite statements - SqliteStmts ss = prepare_sqlite_statements(sdb, ecfg.enabled); - - // Connect to MySQL - MYSQL* mdb = mysql_connect_or_die(src); - - // Build SELECT with incremental filter - std::vector cols = collect_needed_columns(src, ecfg); - if (!cursor.column.empty()) add_unique(cols, cursor.column); - std::string extra_filter = build_incremental_filter(cursor); - std::string sel = build_select_sql(src, cols, extra_filter); - - if (mysql_query(mdb, sel.c_str()) != 0) { - std::string err = mysql_error(mdb); - mysql_close(mdb); - sqlite_finalize_all(ss); - fatal("MySQL query failed: " + err + "\nSQL: " + sel); - } +static void ingest_source(MySQLDB& db, const RagSource& src) { + std::cerr << "Ingesting source_id=" << src.source_id + << " name=" << src.name + << " backend=" << src.backend_type + << " table=" << src.table_name << "\n"; - MYSQL_RES* res = mysql_store_result(mdb); - if (!res) { - std::string err = mysql_error(mdb); - mysql_close(mdb); - sqlite_finalize_all(ss); - fatal("mysql_store_result failed: " + err); - } - - std::uint64_t ingested_docs = 0; - std::uint64_t skipped_docs = 0; - - // Note: updated_at is set to 0 for 
v0. For accurate timestamps, - // query SELECT unixepoch() once at the start of main(). - std::int64_t now_epoch = 0; - - // Batch embeddings for efficiency - std::vector pending_embeddings; - - // Track max watermark value (for next run) - MYSQL_ROW r; - bool max_set = false; - bool max_numeric = false; - std::int64_t max_num = 0; - std::string max_str; - - // Process each row - while ((r = mysql_fetch_row(res)) != nullptr) { - RowMap row = mysql_row_to_map(res, r); - - // Track max watermark value (even for skipped docs) - if (!cursor.column.empty()) { - auto it = row.find(cursor.column); - if (it != row.end()) { - const std::string& v = it->second; - if (!v.empty()) { - if (!max_set) { - // First value - determine type and store - if (cursor.numeric || is_integer_string(v)) { - try { - max_numeric = true; - max_num = std::stoll(v); - } catch (const std::out_of_range& e) { - // Huge integer - fall back to string comparison - max_numeric = false; - max_str = v; - } catch (const std::invalid_argument& e) { - // Not actually a number despite is_integer_string check - max_numeric = false; - max_str = v; - } - } else { - max_numeric = false; - max_str = v; - } - max_set = true; - } else if (max_numeric) { - // Already numeric - try to compare as number - if (is_integer_string(v)) { - try { - std::int64_t nv = std::stoll(v); - if (nv > max_num) max_num = nv; - } catch (const std::out_of_range& e) { - // Huge integer - fall back to string comparison - max_numeric = false; - max_str = v; - } catch (const std::invalid_argument& e) { - // Not actually a number - skip this value - } - } - } else { - // Already string - lexicographic comparison - if (v > max_str) max_str = v; - } - } - } + if (src.backend_type != "mysql") { + std::cerr << " Skipping: backend_type not supported in v0.\n"; + return; + } + + ChunkingConfig ccfg = parse_chunking_json(src.chunking_json); + EmbeddingConfig ecfg = parse_embedding_json(src.embedding_json); + std::unique_ptr embedder; + if 
(ecfg.enabled) { + embedder = build_embedding_provider(ecfg); } - // Build document from row - BuiltDoc doc = build_document_from_row(src, row); + json cursor_json = load_sync_cursor_json(db, src.source_id); + SyncCursor cursor = parse_sync_cursor(cursor_json, src.pk_column); + + MYSQL* mdb = mysql_connect_or_die(src); + + std::vector cols = collect_needed_columns(src, ecfg); + if (!cursor.column.empty()) add_unique(cols, cursor.column); + std::string extra_filter = build_incremental_filter(cursor); + std::string sel = build_select_sql(src, cols, extra_filter); - // v0: Skip if document already exists - if (sqlite_doc_exists(ss, doc.doc_id)) { - skipped_docs++; - continue; + if (mysql_query(mdb, sel.c_str()) != 0) { + std::string err = mysql_error(mdb); + mysql_close(mdb); + fatal("MySQL query failed: " + err + "\nSQL: " + sel); } - // Insert document - sqlite_insert_doc(ss, src.source_id, src.name, - doc.doc_id, doc.pk_json, doc.title, doc.body, doc.metadata_json); + MYSQL_RES* res = mysql_store_result(mdb); + if (!res) { + std::string err = mysql_error(mdb); + mysql_close(mdb); + fatal("mysql_store_result failed: " + err); + } - // Chunk document body - std::vector chunks = chunk_text_chars(doc.body, ccfg); + std::uint64_t ingested_docs = 0; + std::uint64_t skipped_docs = 0; + std::vector pending_embeddings; + + MYSQL_ROW r; + bool max_set = false; + bool max_numeric = false; + std::int64_t max_num = 0; + std::string max_str; + + while ((r = mysql_fetch_row(res)) != nullptr) { + RowMap row = mysql_row_to_map(res, r); + + if (!cursor.column.empty()) { + auto it = row.find(cursor.column); + if (it != row.end()) { + const std::string& v = it->second; + if (!v.empty()) { + if (!max_set) { + if (cursor.numeric || is_integer_string(v)) { + try { + max_numeric = true; + max_num = std::stoll(v); + } catch (...) 
{ + max_numeric = false; + max_str = v; + } + } else { + max_numeric = false; + max_str = v; + } + max_set = true; + } else if (max_numeric) { + if (is_integer_string(v)) { + try { + std::int64_t nv = std::stoll(v); + if (nv > max_num) max_num = nv; + } catch (...) { + max_numeric = false; + max_str = v; + } + } + } else { + if (v > max_str) max_str = v; + } + } + } + } - // Process each chunk - for (size_t i = 0; i < chunks.size(); i++) { - std::string chunk_id = doc.doc_id + "#" + std::to_string(i); + BuiltDoc doc = build_document_from_row(src, row); + + if (doc_exists(db, doc.doc_id)) { + skipped_docs++; + continue; + } - // Chunk metadata (minimal - just index) - json cmeta = json::object(); - cmeta["chunk_index"] = (int)i; + insert_doc(db, src.source_id, src.name, + doc.doc_id, doc.pk_json, doc.title, doc.body, doc.metadata_json); - // Use document title as chunk title (simple approach) - std::string chunk_title = doc.title; + std::vector chunks = chunk_text_chars(doc.body, ccfg); - // Insert chunk - sqlite_insert_chunk(ss, chunk_id, doc.doc_id, src.source_id, (int)i, - chunk_title, chunks[i], json_dump_compact(cmeta)); + for (size_t i = 0; i < chunks.size(); i++) { + std::string chunk_id = doc.doc_id + "#" + std::to_string(i); - // Insert into FTS index - sqlite_insert_fts(ss, chunk_id, chunk_title, chunks[i]); + json cmeta = json::object(); + cmeta["chunk_index"] = (int)i; - // Collect embedding for batched processing (if enabled) - if (ecfg.enabled) { - std::string emb_input = build_embedding_input(ecfg, row, chunks[i]); - pending_embeddings.push_back({chunk_id, doc.doc_id, src.source_id, emb_input}); + std::string chunk_title = doc.title; - // Flush batch if full - if ((int)pending_embeddings.size() >= ecfg.batch_size) { - flush_embedding_batch(pending_embeddings, embedder.get(), ecfg, ss, now_epoch); + insert_chunk(db, chunk_id, doc.doc_id, src.source_id, (int)i, + chunk_title, chunks[i], json_dump_compact(cmeta)); + + insert_fts(db, chunk_id, 
chunk_title, chunks[i]); + + if (ecfg.enabled) { + std::string emb_input = build_embedding_input(ecfg, row, chunks[i]); + pending_embeddings.push_back({chunk_id, doc.doc_id, src.source_id, emb_input}); + + if ((int)pending_embeddings.size() >= ecfg.batch_size) { + flush_embedding_batch(pending_embeddings, embedder.get(), ecfg, db); + } + } + } + + ingested_docs++; + if (ingested_docs % 1000 == 0) { + std::cerr << " progress: ingested_docs=" << ingested_docs + << " skipped_docs=" << skipped_docs << "\n"; } - } } - ingested_docs++; - if (ingested_docs % 1000 == 0) { - std::cerr << " progress: ingested_docs=" << ingested_docs - << " skipped_docs=" << skipped_docs << "\n"; + if (ecfg.enabled && !pending_embeddings.empty()) { + flush_embedding_batch(pending_embeddings, embedder.get(), ecfg, db); } - } - - // Flush any remaining pending embeddings - if (ecfg.enabled && !pending_embeddings.empty()) { - flush_embedding_batch(pending_embeddings, embedder.get(), ecfg, ss, now_epoch); - } - - // Cleanup - mysql_free_result(res); - mysql_close(mdb); - sqlite_finalize_all(ss); - - // Update sync cursor with max watermark value - if (!cursor_json.is_object()) cursor_json = json::object(); - if (!cursor.column.empty()) cursor_json["column"] = cursor.column; - if (max_set) { - if (max_numeric) { - cursor_json["value"] = max_num; - } else { - cursor_json["value"] = max_str; + + mysql_free_result(res); + mysql_close(mdb); + + if (!cursor_json.is_object()) cursor_json = json::object(); + if (!cursor.column.empty()) cursor_json["column"] = cursor.column; + if (max_set) { + if (max_numeric) { + cursor_json["value"] = max_num; + } else { + cursor_json["value"] = max_str; + } + } + update_sync_state(db, src.source_id, cursor_json); + + std::cerr << "Done source " << src.name + << " ingested_docs=" << ingested_docs + << " skipped_docs=" << skipped_docs << "\n"; +} + +// =========================================================================== +// Schema Initialization +// 
=========================================================================== + +/** + * @brief Check if a table exists in the database + * @param db Database connection + * @param table_name Name of the table to check + * @return true if table exists, false otherwise + */ +static bool table_exists(MySQLDB& db, const std::string& table_name) { + // Check connection validity + if (!db.conn) { + return false; + } + + // Try to query the table - if it fails, table doesn't exist + std::string escaped = sql_escape_single_quotes(table_name); + std::ostringstream sql; + sql << "SELECT COUNT(*) FROM `" << escaped << "` LIMIT 1"; + + // Suppress error output for this check + if (mysql_query(db.conn, sql.str().c_str()) != 0) { + return false; // Table doesn't exist + } + + // Check for actual errors (like "table doesn't exist") + unsigned int err = mysql_errno(db.conn); + if (err != 0) { + return false; // Table doesn't exist + } + + MYSQL_RES* res = mysql_store_result(db.conn); + if (res) { + mysql_free_result(res); + return true; // Table exists + } + return false; // Table doesn't exist +} + +/** + * @brief Initialize RAG schema in the database + * @param db Database connection + * @param vec_dim Vector dimension for rag_vec_chunks table + * @return true if schema was created, false if already exists + */ +static bool init_schema(MySQLDB& db, int vec_dim = 1536) { + // Check if schema is complete by checking for rag_sync_state table + // (rag_sync_state is created last, so if it exists, schema is complete) + bool schema_complete = table_exists(db, "rag_sync_state"); + + // Note: PRAGMA commands are SQLite-specific and not supported through MySQL protocol + // The SQLite backend should have these configured already + + // Create rag_sources table + db.execute( + "CREATE TABLE IF NOT EXISTS rag_sources (" + " source_id INTEGER PRIMARY KEY," + " name TEXT NOT NULL UNIQUE," + " enabled INTEGER NOT NULL DEFAULT 1," + " backend_type TEXT NOT NULL," + " backend_host TEXT NOT 
NULL," + " backend_port INTEGER NOT NULL," + " backend_user TEXT NOT NULL," + " backend_pass TEXT NOT NULL," + " backend_db TEXT NOT NULL," + " table_name TEXT NOT NULL," + " pk_column TEXT NOT NULL," + " where_sql TEXT," + " doc_map_json TEXT NOT NULL," + " chunking_json TEXT NOT NULL," + " embedding_json TEXT," + " created_at INTEGER NOT NULL DEFAULT (unixepoch())," + " updated_at INTEGER NOT NULL DEFAULT (unixepoch())" + ")" + ); + + db.execute("CREATE INDEX IF NOT EXISTS idx_rag_sources_enabled ON rag_sources(enabled)"); + db.execute("CREATE INDEX IF NOT EXISTS idx_rag_sources_backend ON rag_sources(backend_type, backend_host, backend_port, backend_db, table_name)"); + + // Create rag_documents table + db.execute( + "CREATE TABLE IF NOT EXISTS rag_documents (" + " doc_id TEXT PRIMARY KEY," + " source_id INTEGER NOT NULL REFERENCES rag_sources(source_id)," + " source_name TEXT NOT NULL," + " pk_json TEXT NOT NULL," + " title TEXT," + " body TEXT," + " metadata_json TEXT NOT NULL DEFAULT '{}'," + " updated_at INTEGER NOT NULL DEFAULT (unixepoch())," + " deleted INTEGER NOT NULL DEFAULT 0" + ")" + ); + + db.execute("CREATE INDEX IF NOT EXISTS idx_rag_documents_source_updated ON rag_documents(source_id, updated_at)"); + db.execute("CREATE INDEX IF NOT EXISTS idx_rag_documents_source_deleted ON rag_documents(source_id, deleted)"); + + // Create rag_chunks table + db.execute( + "CREATE TABLE IF NOT EXISTS rag_chunks (" + " chunk_id TEXT PRIMARY KEY," + " doc_id TEXT NOT NULL REFERENCES rag_documents(doc_id)," + " source_id INTEGER NOT NULL REFERENCES rag_sources(source_id)," + " chunk_index INTEGER NOT NULL," + " title TEXT," + " body TEXT NOT NULL," + " metadata_json TEXT NOT NULL DEFAULT '{}'," + " updated_at INTEGER NOT NULL DEFAULT (unixepoch())," + " deleted INTEGER NOT NULL DEFAULT 0" + ")" + ); + + db.execute("CREATE UNIQUE INDEX IF NOT EXISTS uq_rag_chunks_doc_idx ON rag_chunks(doc_id, chunk_index)"); + db.execute("CREATE INDEX IF NOT EXISTS 
idx_rag_chunks_source_doc ON rag_chunks(source_id, doc_id)"); + db.execute("CREATE INDEX IF NOT EXISTS idx_rag_chunks_deleted ON rag_chunks(deleted)"); + + // Create FTS5 virtual table + db.execute( + "CREATE VIRTUAL TABLE IF NOT EXISTS rag_fts_chunks " + "USING fts5(" + " chunk_id UNINDEXED," + " title," + " body," + " tokenize = 'unicode61'" + ")" + ); + + // Create vec0 virtual table for embeddings + // Note: This may fail if sqlite-vec extension is not loaded + std::ostringstream vec_sql; + vec_sql << "CREATE VIRTUAL TABLE IF NOT EXISTS rag_vec_chunks " + << "USING vec0(" + << " embedding float[" << vec_dim << "]," + << " chunk_id TEXT," + << " doc_id TEXT," + << " source_id INTEGER," + << " updated_at INTEGER" + << ")"; + if (!db.try_execute(vec_sql.str().c_str())) { + std::cerr << "Warning: vec0 table creation failed (sqlite-vec extension not available). Vector embeddings will be disabled.\n"; } - } - update_sync_state(sdb, src.source_id, cursor_json); - std::cerr << "Done source " << src.name - << " ingested_docs=" << ingested_docs - << " skipped_docs=" << skipped_docs << "\n"; + // Create convenience view + db.execute( + "CREATE VIEW IF NOT EXISTS rag_chunk_view AS " + "SELECT " + " c.chunk_id, " + " c.doc_id, " + " c.source_id, " + " d.source_name, " + " d.pk_json, " + " COALESCE(c.title, d.title) AS title, " + " c.body, " + " d.metadata_json AS doc_metadata_json, " + " c.metadata_json AS chunk_metadata_json, " + " c.updated_at " + "FROM rag_chunks c " + "JOIN rag_documents d ON d.doc_id = c.doc_id " + "WHERE c.deleted = 0 AND d.deleted = 0" + ); + + // Create sync state table + db.execute( + "CREATE TABLE IF NOT EXISTS rag_sync_state (" + " source_id INTEGER PRIMARY KEY REFERENCES rag_sources(source_id)," + " mode TEXT NOT NULL DEFAULT 'poll'," + " cursor_json TEXT NOT NULL DEFAULT '{}'," + " last_ok_at INTEGER," + " last_error TEXT" + ")" + ); + + return !schema_complete; // Return true if we created it, false if it was already complete } // 
=========================================================================== @@ -2156,96 +1463,354 @@ static void ingest_source(sqlite3* sdb, const RagSource& src) { // =========================================================================== /** - * @brief Main entry point for RAG ingestion tool - * - * Entry point for the RAG ingestion tool. Orchestrates the entire - * ingestion process: - * - * 1. Validates command-line arguments - * 2. Initializes libcurl - * 3. Opens SQLite RAG index database - * 4. Loads sqlite3-vec extension (if configured) - * 5. Sets SQLite pragmas (foreign keys, WAL mode) - * 6. Begins transaction - * 7. Loads and ingests all enabled sources - * 8. Commits or rolls back transaction - * 9. Cleanup and exit - * + * @brief Connection parameters + */ +struct ConnParams { + std::string host = "127.0.0.1"; + int port = 6030; + std::string user; + std::string pass; + std::string database; + + // Query-specific parameters + std::string query_text; + int source_id = -1; + int limit = 5; + + // Init-specific parameters + int vec_dim = 1536; // Vector dimension for vec0 table +}; + +static void print_usage(const char* prog_name) { + std::cerr << "Usage:\n"; + std::cerr << " Initialize schema:\n"; + std::cerr << " " << prog_name << " init [OPTIONS]\n"; + std::cerr << "\n"; + std::cerr << " Run ingestion:\n"; + std::cerr << " " << prog_name << " ingest [OPTIONS]\n"; + std::cerr << "\n"; + std::cerr << " Vector similarity search:\n"; + std::cerr << " " << prog_name << " query --text=\"your query\" [OPTIONS]\n"; + std::cerr << "\n"; + std::cerr << "Common Options (SQLite Server via MySQL protocol gateway):\n"; + std::cerr << " -h, --host=name SQLite Server host (default: 127.0.0.1)\n"; + std::cerr << " -P, --port=# SQLite Server port - MySQL protocol gateway (default: 6030)\n"; + std::cerr << " -u, --user=name User for login\n"; + std::cerr << " -p, --password=name Password to use\n"; + std::cerr << " -D, --database=name Database to use (required)\n"; 
+ std::cerr << " -?, --help Show this help message\n"; + std::cerr << "\n"; + std::cerr << "Init Options:\n"; + std::cerr << " --vec-dim=# Vector dimension for rag_vec_chunks table (default: 1536)\n"; + std::cerr << "\n"; + std::cerr << "Query Options:\n"; + std::cerr << " -t, --text=text Query text to search for (required for query)\n"; + std::cerr << " -s, --source-id=# Source ID to search (default: all enabled sources)\n"; + std::cerr << " -l, --limit=# Maximum results to return (default: 5)\n"; +} + +/** + * @brief Parse connection parameters from command-line arguments * @param argc Argument count * @param argv Argument values - * @return int Exit code (0 on success, 1 on error, 2 on usage error) - * - * Usage: - * @verbatim - * ./rag_ingest /path/to/rag_index.sqlite - * @endverbatim + * @param params Output connection parameters + * @return Command name ("init", "ingest", or "query") or empty string on error */ +static std::string parse_args(int argc, char** argv, ConnParams& params) { + static struct option long_options[] = { + {"host", required_argument, 0, 'h'}, + {"port", required_argument, 0, 'P'}, + {"user", required_argument, 0, 'u'}, + {"password", required_argument, 0, 'p'}, + {"database", required_argument, 0, 'D'}, + {"text", required_argument, 0, 't'}, + {"source-id",required_argument, 0, 's'}, + {"limit", required_argument, 0, 'l'}, + {"vec-dim", required_argument, 0, 1000}, // Using 1000 as short code + {"help", no_argument, 0, '?'}, + {0, 0, 0, 0} + }; + + std::string command; + int opt; + int option_index = 0; + + // Parse command as first argument + if (argc < 2) { + return ""; + } + + command = argv[1]; + + // Validate command + if (command != "init" && command != "ingest" && command != "query") { + return ""; + } + + // Shift argv for getopt so command is processed separately + argc--; + argv++; + + // Parse options using getopt_long + while ((opt = getopt_long(argc, argv, "h:P:u:p:D:t:s:l:?", long_options, &option_index)) != -1) { + switch 
(opt) { + case 'h': + params.host = optarg; + break; + case 'P': + params.port = std::atoi(optarg); + break; + case 'u': + params.user = optarg; + break; + case 'p': + params.pass = optarg; + break; + case 'D': + params.database = optarg; + break; + case 't': + params.query_text = optarg; + break; + case 's': + params.source_id = std::atoi(optarg); + break; + case 'l': + params.limit = std::atoi(optarg); + break; + case 1000: // --vec-dim + params.vec_dim = std::atoi(optarg); + if (params.vec_dim <= 0) { + std::cerr << "Error: --vec-dim must be positive\n"; + return ""; + } + break; + case '?': + default: + return ""; + } + } + + // Validate required parameters + if (params.database.empty()) { + std::cerr << "Error: Required parameter missing: --database is required\n"; + return ""; + } + + // For query command, query_text is required + if (command == "query" && params.query_text.empty()) { + std::cerr << "Error: --text is required for query command\n"; + return ""; + } + + return command; +} + int main(int argc, char** argv) { - // Validate arguments - if (argc != 2) { - std::cerr << "Usage: " << argv[0] << " \n"; - return 2; - } - - // Initialize libcurl (for embedding API calls) - curl_global_init(CURL_GLOBAL_DEFAULT); - - const char* sqlite_path = argv[1]; - - // Open SQLite database - sqlite3* db = nullptr; - if (sqlite3_open(sqlite_path, &db) != SQLITE_OK) { - fatal("Could not open SQLite DB: " + std::string(sqlite_path)); - } - - // Load sqlite3-vec extension if configured - sqlite_load_vec_extension(db); - - // Set safe SQLite pragmas - sqlite_exec(db, "PRAGMA foreign_keys = ON;"); - sqlite_exec(db, "PRAGMA journal_mode = WAL;"); - sqlite_exec(db, "PRAGMA synchronous = NORMAL;"); - - // Begin single transaction for speed - if (sqlite_exec(db, "BEGIN IMMEDIATE;") != SQLITE_OK) { - sqlite3_close(db); - fatal("Failed to begin transaction"); - } - - bool ok = true; - try { - // Load and ingest all enabled sources - std::vector sources = load_sources(db); - if 
(sources.empty()) { - std::cerr << "No enabled sources found in rag_sources.\n"; + ConnParams params; + std::string command = parse_args(argc, argv, params); + + if (command.empty()) { + print_usage(argv[0]); + return 2; } - for (size_t i = 0; i < sources.size(); i++) { - ingest_source(db, sources[i]); + + // Initialize command + if (command == "init") { + MySQLDB db; + db.connect(params.host.c_str(), params.port, params.user.c_str(), + params.pass.c_str(), params.database.c_str()); + + bool created = init_schema(db, params.vec_dim); + if (created) { + std::cout << "Schema created successfully (vec_dim=" << params.vec_dim << ").\n"; + } else { + std::cout << "Schema already exists.\n"; + } + + return 0; } - } catch (const std::exception& e) { - std::cerr << "Exception: " << e.what() << "\n"; - ok = false; - } catch (...) { - std::cerr << "Unknown exception\n"; - ok = false; - } - - // Commit or rollback - if (ok) { - if (sqlite_exec(db, "COMMIT;") != SQLITE_OK) { - sqlite_exec(db, "ROLLBACK;"); - sqlite3_close(db); - fatal("Failed to commit transaction"); + + // Ingest command + if (command == "ingest") { + MySQLDB db; + db.connect(params.host.c_str(), params.port, params.user.c_str(), + params.pass.c_str(), params.database.c_str()); + + // Check if schema exists before proceeding + if (!table_exists(db, "rag_sources")) { + std::cerr << "Error: RAG schema not found. 
Please run 'init' command first:\n"; + std::cerr << " " << argv[0] << " init -h " << params.host + << " -P " << params.port << " -u " << params.user + << " -p " << params.pass << " -D " << params.database << "\n"; + return 1; + } + + curl_global_init(CURL_GLOBAL_DEFAULT); + + // Note: PRAGMA commands are SQLite-specific and not supported through MySQL protocol + // The SQLite backend should have these configured already + db.execute("BEGIN IMMEDIATE;"); + + bool ok = true; + try { + std::vector sources = load_sources(db); + if (sources.empty()) { + std::cerr << "No enabled sources found in rag_sources.\n"; + } + for (size_t i = 0; i < sources.size(); i++) { + ingest_source(db, sources[i]); + } + } catch (const std::exception& e) { + std::cerr << "Exception: " << e.what() << "\n"; + ok = false; + } catch (...) { + std::cerr << "Unknown exception\n"; + ok = false; + } + + db.execute(ok ? "COMMIT;" : "ROLLBACK;"); + curl_global_cleanup(); + return ok ? 0 : 1; } - } else { - sqlite_exec(db, "ROLLBACK;"); - sqlite3_close(db); - curl_global_cleanup(); - return 1; - } - - // Cleanup - sqlite3_close(db); - curl_global_cleanup(); - return 0; + + // Query command + if (command == "query") { + MySQLDB db; + db.connect(params.host.c_str(), params.port, params.user.c_str(), + params.pass.c_str(), params.database.c_str()); + + // Check if schema exists + if (!table_exists(db, "rag_sources")) { + std::cerr << "Error: RAG schema not found. 
Please run 'init' command first:\n"; + std::cerr << " " << argv[0] << " init -h " << params.host + << " -P " << params.port << " -u " << params.user + << " -p " << params.pass << " -D " << params.database << "\n"; + return 1; + } + + curl_global_init(CURL_GLOBAL_DEFAULT); + + try { + // Load sources + std::vector sources = load_sources(db); + + // Filter by source_id if specified + if (params.source_id >= 0) { + auto it = std::remove_if(sources.begin(), sources.end(), + [params](const RagSource& s) { return s.source_id != params.source_id; }); + sources.erase(it, sources.end()); + } + + if (sources.empty()) { + std::cerr << "No enabled sources found"; + if (params.source_id >= 0) { + std::cerr << " for source_id=" << params.source_id; + } + std::cerr << ".\n"; + curl_global_cleanup(); + return 1; + } + + // Use the first source's embedding config + RagSource& source = sources[0]; + + if (source.embedding_json.empty()) { + std::cerr << "Error: Embeddings not configured for source " << source.source_id << "\n"; + curl_global_cleanup(); + return 1; + } + + EmbeddingConfig emb_cfg = parse_embedding_json(source.embedding_json); + if (!emb_cfg.enabled) { + std::cerr << "Error: Embeddings not enabled for source " << source.source_id << "\n"; + curl_global_cleanup(); + return 1; + } + + std::cout << "Generating embedding for query using: " << emb_cfg.provider << "\n"; + + // Build embedding provider + auto embedder = build_embedding_provider(emb_cfg); + + // Generate embedding for query + std::vector query_inputs = {params.query_text}; + std::vector> query_embeddings = embedder->embed(query_inputs, emb_cfg.dim); + + if (query_embeddings.empty() || query_embeddings[0].empty()) { + std::cerr << "Error: Failed to generate embedding for query\n"; + curl_global_cleanup(); + return 1; + } + + // Convert embedding to hex string for vec0 MATCH + std::string query_hex; + query_hex.reserve(query_embeddings[0].size() * 8); + for (float f : query_embeddings[0]) { + query_hex += 
float_to_hex_blob(f); + } + + // Build search query + // vec0 knn requires subquery approach: MATCH (SELECT ... LIMIT 1) AND k = ? + std::string source_filter; + if (params.source_id >= 0) { + source_filter = "AND c.source_id = " + std::to_string(params.source_id); + } + + // Use subquery with VALUES to provide the query embedding + // This creates a temporary single-row result with the query embedding + std::string search_sql = + "SELECT c.chunk_id, c.source_id, SUBSTR(c.body, 1, 200) as content, " + "v.distance, d.title " + "FROM rag_vec_chunks v " + "JOIN rag_chunks c ON c.chunk_id = v.chunk_id " + "JOIN rag_documents d ON d.doc_id = c.doc_id " + "WHERE v.embedding MATCH (" + " SELECT X'" + query_hex + "' AS embedding" + ") AND k = " + std::to_string(params.limit) + " " + + source_filter + " " + "ORDER BY v.distance"; + + // Execute search + MYSQL_RES* result = db.query(search_sql.c_str()); + if (result) { + MYSQL_ROW row; + int row_count = 0; + while ((row = mysql_fetch_row(result))) { + std::cout << "\n--- Result " << (++row_count) << " ---\n"; + unsigned long* lengths = mysql_fetch_lengths(result); + int field_count = mysql_num_fields(result); + for (int i = 0; i < field_count; i++) { + MYSQL_FIELD* field = mysql_fetch_field_direct(result, i); + if (row[i]) { + std::cout << field->name << ": " << row[i] << "\n"; + } + } + } + mysql_free_result(result); + + if (row_count == 0) { + std::cout << "No results found.\n"; + } else { + std::cout << "\nFound " << row_count << " result(s).\n"; + } + } + + } catch (const std::exception& e) { + std::cerr << "Exception: " << e.what() << "\n"; + curl_global_cleanup(); + return 1; + } catch (...) 
{ + std::cerr << "Unknown exception\n"; + curl_global_cleanup(); + return 1; + } + + curl_global_cleanup(); + return 0; + } + + // Unknown command + print_usage(argv[0]); + return 2; } diff --git a/lib/proxy_sqlite3_symbols.cpp b/lib/proxy_sqlite3_symbols.cpp index 600c8a116..573ab3eca 100644 --- a/lib/proxy_sqlite3_symbols.cpp +++ b/lib/proxy_sqlite3_symbols.cpp @@ -50,9 +50,17 @@ int (*proxy_sqlite3_prepare_v2)(sqlite3*, const char*, int, sqlite3_stmt**, cons int (*proxy_sqlite3_open_v2)(const char*, sqlite3**, int, const char*) = sqlite3_open_v2; int (*proxy_sqlite3_exec)(sqlite3*, const char*, int (*)(void*,int,char**,char**), void*, char**) = sqlite3_exec; -// Optional hooks used by sqlite-vec (function pointers will be set by LoadPlugin or remain NULL) -void (*proxy_sqlite3_vec_init)(sqlite3*, char**, const sqlite3_api_routines*) = NULL; -void (*proxy_sqlite3_rembed_init)(sqlite3*, char**, const sqlite3_api_routines*) = NULL; +// Optional hooks used by sqlite-vec and sqlite-rembed +// These are statically linked extensions that need to be initialized + +// Forward declarations for the extension init functions +extern "C" int sqlite3_vec_init(sqlite3 *db, char **pzErrMsg, const sqlite3_api_routines *pApi); +extern "C" int sqlite3_rembed_init(sqlite3 *db, char **pzErrMsg, const sqlite3_api_routines *pApi); + +// Initialize the extension pointers to the statically linked functions +// These allow Admin_Bootstrap.cpp to register them as auto-extensions +int (*proxy_sqlite3_vec_init)(sqlite3 *db, char **pzErrMsg, const sqlite3_api_routines *pApi) = sqlite3_vec_init; +int (*proxy_sqlite3_rembed_init)(sqlite3 *db, char **pzErrMsg, const sqlite3_api_routines *pApi) = sqlite3_rembed_init; // Internal helpers used by admin stats batching; keep defaults as NULL