diff --git a/RAG_POC/Makefile b/RAG_POC/Makefile index 36402f56e..d97133a68 100644 --- a/RAG_POC/Makefile +++ b/RAG_POC/Makefile @@ -6,14 +6,18 @@ ROOT_DIR := .. INCLUDES := \ -I$(ROOT_DIR)/deps/json \ -I$(ROOT_DIR)/deps/mariadb-client-library/mariadb_client/include \ - -I$(ROOT_DIR)/deps/sqlite3/sqlite-amalgamation-3500400 + -I$(ROOT_DIR)/deps/sqlite3/sqlite-amalgamation-3500400 \ + -I$(ROOT_DIR)/deps/curl/curl/include LIBDIRS := \ -L$(ROOT_DIR)/deps/mariadb-client-library/mariadb_client/libmariadb SQLITE3_OBJ := $(ROOT_DIR)/deps/sqlite3/sqlite-amalgamation-3500400/sqlite3.o -LIBS := -lmariadbclient -lssl -lcrypto -lcrypt -ldl -lpthread +# Use static libcurl +CURL_STATIC_LIB := $(ROOT_DIR)/deps/curl/curl/lib/.libs/libcurl.a + +LIBS := -lmariadbclient -lssl -lcrypto -lcrypt -ldl -lpthread $(CURL_STATIC_LIB) -lz TARGET := rag_ingest SOURCES := rag_ingest.cpp diff --git a/RAG_POC/rag_ingest.cpp b/RAG_POC/rag_ingest.cpp index df30e3556..9c8b628ca 100644 --- a/RAG_POC/rag_ingest.cpp +++ b/RAG_POC/rag_ingest.cpp @@ -1,101 +1,224 @@ -// rag_ingest.cpp -// -// ------------------------------------------------------------ -// ProxySQL RAG Ingestion PoC (General-Purpose) -// ------------------------------------------------------------ -// -// What this program does (v0): -// 1) Opens the SQLite "RAG index" database (schema.sql must already be applied). -// 2) Reads enabled sources from rag_sources. -// 3) For each source: -// - Connects to MySQL (for now). -// - Builds a SELECT that fetches only needed columns. -// - For each row: -// * Builds doc_id / title / body / metadata_json using doc_map_json. -// * Chunks body using chunking_json. -// * Inserts into: -// rag_documents -// rag_chunks -// rag_fts_chunks (FTS5 contentless table) -// * Optionally builds embedding input text using embedding_json and inserts -// embeddings into rag_vec_chunks (sqlite3-vec) via a stub embedding provider. -// - Skips docs that already exist (v0 requirement). 
-// -// Later (v1+): -// - Add rag_sync_state usage for incremental ingestion (watermark/CDC). -// - Add hashing to detect changed docs/chunks and update/reindex accordingly. -// - Replace the embedding stub with a real embedding generator. -// -// ------------------------------------------------------------ -// Dependencies -// ------------------------------------------------------------ -// - sqlite3 -// - MySQL client library (mysqlclient / libmysqlclient) -// - nlohmann/json (single header json.hpp) -// -// Build example (Linux/macOS): -// g++ -std=c++17 -O2 rag_ingest.cpp -o rag_ingest \ -// -lsqlite3 -lmysqlclient -// -// Usage: -// ./rag_ingest /path/to/rag_index.sqlite -// -// Notes: -// - This is a blueprint-grade PoC, written to be readable and modifiable. -// - It uses a conservative JSON mapping language so ingestion is deterministic. -// - It avoids advanced C++ patterns on purpose. -// -// ------------------------------------------------------------ -// Supported JSON Specs -// ------------------------------------------------------------ -// -// doc_map_json (required): -// { -// "doc_id": { "format": "posts:{Id}" }, -// "title": { "concat": [ {"col":"Title"} ] }, -// "body": { "concat": [ {"col":"Body"} ] }, -// "metadata": { -// "pick": ["Id","Tags","Score","CreaionDate"], -// "rename": {"CreaionDate":"CreationDate"} -// } -// } -// -// chunking_json (required, v0 chunks doc "body" only): -// { -// "enabled": true, -// "unit": "chars", // v0 supports "chars" only -// "chunk_size": 4000, -// "overlap": 400, -// "min_chunk_size": 800 -// } -// -// embedding_json (optional): -// { -// "enabled": true, -// "dim": 1536, -// "model": "text-embedding-3-large", // informational -// "input": { "concat": [ -// {"col":"Title"}, -// {"lit":"\nTags: "}, {"col":"Tags"}, -// {"lit":"\n\n"}, -// {"chunk_body": true} -// ]} -// } -// -// ------------------------------------------------------------ -// sqlite3-vec binding note -// 
------------------------------------------------------------ -// sqlite3-vec "vec0(embedding float[N])" generally expects a vector value. -// The exact binding format can vary by build/config of sqlite3-vec. -// This program includes a "best effort" binder that binds a float array as a BLOB. -// If your sqlite3-vec build expects a different representation (e.g. a function to -// pack vectors), adapt bind_vec_embedding() accordingly. -// ------------------------------------------------------------ - -#include -#include -#include -#include +/** + * @file rag_ingest.cpp + * @brief ProxySQL RAG (Retrieval-Augmented Generation) Ingestion Tool + * + * @verbatim + * ProxySQL RAG Ingestion PoC (General-Purpose) + * @endverbatim + * + * @section overview Overview + * + * This program is a general-purpose ingestion tool for ProxySQL's RAG index. + * It reads data from external sources (currently MySQL), transforms it according + * to configurable JSON specifications, chunks the content, builds full-text + * search indexes, and optionally generates vector embeddings for semantic search. 
+ * + * @section v0_features v0 Features + * + * - Reads enabled sources from rag_sources table + * - Connects to MySQL backend and fetches data using configurable SELECT queries + * - Transforms rows using doc_map_json specification + * - Chunks document bodies using configurable chunking parameters + * - Inserts into rag_documents, rag_chunks, rag_fts_chunks (FTS5) + * - Optionally generates embeddings and inserts into rag_vec_chunks (sqlite3-vec) + * - Skips documents that already exist (no upsert in v0) + * - Supports incremental sync using watermark-based cursor tracking + * + * @section future_plans Future Plans (v1+) + * + * - Add hash-based change detection for efficient updates + * - Support document updates (not just skip-if-exists) + * - Add PostgreSQL backend support + * - Add async embedding workers + * - Add operational metrics and monitoring + * + * @section dependencies Dependencies + * + * - sqlite3: For RAG index storage + * - mysqlclient / libmysqlclient: For MySQL backend connections + * - libcurl: For HTTP-based embedding providers (OpenAI-compatible) + * - nlohmann/json: Single-header JSON library (json.hpp) + * - libcrypt: For sha256_crypt_r weak alias (platform compatibility) + * + * @section building Building + * + * @verbatim + * g++ -std=c++17 -O2 rag_ingest.cpp -o rag_ingest \ + * -lsqlite3 -lmysqlclient -lcurl -lcrypt + * @endverbatim + * + * @section usage Usage + * + * @verbatim + * ./rag_ingest /path/to/rag_index.sqlite + * @endverbatim + * + * The RAG_VEC0_EXT environment variable can be set to specify the path + * to the sqlite3-vec extension: + * + * @verbatim + * export RAG_VEC0_EXT=/usr/local/lib/sqlite3-vec.so + * ./rag_ingest /path/to/rag_index.sqlite + * @endverbatim + * + * @section architecture Architecture + * + * @subsection ingestion_flow Ingestion Flow + * + *
+ * 1. Open SQLite RAG index database
+ * 2. Load sqlite3-vec extension (if RAG_VEC0_EXT is set)
+ * 3. Load enabled sources from rag_sources table
+ * 4. For each source:
+ *    a. Parse chunking_json and embedding_json configurations
+ *    b. Load sync cursor (watermark) from rag_sync_state
+ *    c. Connect to MySQL backend
+ *    d. Build minimal SELECT query (only fetch needed columns)
+ *    e. Add incremental filter based on watermark
+ *    f. For each row:
+ *       i. Build document using doc_map_json specification
+ *       ii. Check if doc_id already exists (skip if yes)
+ *       iii. Insert document into rag_documents
+ *       iv. Chunk the document body
+ *       v. For each chunk:
+ *          - Insert into rag_chunks
+ *          - Insert into rag_fts_chunks (FTS5)
+ *          - If embedding enabled: generate and insert embedding
+ *    g. Update sync cursor with max watermark value
+ * 5. Commit transaction or rollback on error
+ * 
+ * + * @subsection json_specs JSON Configuration Specifications + * + * @subsubsection doc_map_json doc_map_json (Required) + * + * Defines how to transform a source row into a canonical document: + * + * @verbatim + * { + * "doc_id": { "format": "posts:{Id}" }, + * "title": { "concat": [ {"col":"Title"} ] }, + * "body": { "concat": [ {"col":"Body"} ] }, + * "metadata": { + * "pick": ["Id","Tags","Score","CreationDate"], + * "rename": {"CreationDate":"Created"} + * } + * } + * @endverbatim + * + * Fields: + * - doc_id.format: Template string with {ColumnName} placeholders + * - title.concat: Array of column references and literals + * - body.concat: Array of column references and literals + * - metadata.pick: Columns to include in metadata JSON + * - metadata.rename: Map of old_name -> new_name for metadata keys + * + * @subsubsection chunking_json chunking_json (Required) + * + * Defines how to split document bodies into chunks: + * + * @verbatim + * { + * "enabled": true, + * "unit": "chars", // v0 only supports "chars" + * "chunk_size": 4000, // Target chunk size in characters + * "overlap": 400, // Overlap between consecutive chunks + * "min_chunk_size": 800 // Minimum chunk size (avoid tiny tail chunks) + * } + * @endverbatim + * + * @subsubsection embedding_json embedding_json (Optional) + * + * Defines how to generate embeddings for chunks: + * + * @verbatim + * { + * "enabled": true, + * "dim": 1536, + * "model": "text-embedding-3-large", + * "provider": "openai", // "stub" or "openai" + * "api_base": "https://api.openai.com/v1", + * "api_key": "sk-...", + * "batch_size": 16, + * "timeout_ms": 20000, + * "input": { "concat": [ + * {"col":"Title"}, + * {"lit":"\nTags: "}, {"col":"Tags"}, + * {"lit":"\n\n"}, + * {"chunk_body": true} + * ]} + * } + * @endverbatim + * + * The concat array supports: + * - {"col":"ColumnName"}: Include column value + * - {"lit":"literal text"}: Include literal string + * - {"chunk_body":true}: Include the chunk body text + * + * 
@subsection concat_spec Concat Specification + * + * The concat specification is used in multiple places (title, body, embedding input). + * It's a JSON array of objects, each describing one part to concatenate: + * + * @verbatim + * [ + * {"col": "Title"}, // Use value from "Title" column + * {"lit": "\nTags: "}, // Use literal string + * {"col": "Tags"}, // Use value from "Tags" column + * {"lit": "\n\n"}, + * {"chunk_body": true} // Use chunk body (embedding only) + * ] + * @endverbatim + * + * @subsection sqlite3_vec sqlite3-vec Integration + * + * The sqlite3-vec extension provides vector similarity search capabilities. + * This program binds float32 arrays as BLOBs for the embedding column. + * + * The binding format may vary by sqlite3-vec build. If your build expects + * a different format, modify bind_vec_embedding() accordingly. + * + * @subsection sync_cursor Sync Cursor (Watermark) + * + * The sync cursor enables incremental ingestion by tracking the last processed + * value from a monotonic column (e.g., auto-increment ID, timestamp). + * + * Stored in rag_sync_state.cursor_json: + * @verbatim + * { + * "column": "Id", // Column name for watermark + * "value": 12345 // Last processed value + * } + * @endverbatim + * + * On next run, only rows WHERE column > value are fetched. + * + * @section error_handling Error Handling + * + * This program uses a "fail-fast" approach: + * - JSON parsing errors are fatal (invalid configuration) + * - Database connection errors are fatal + * - SQL execution errors are fatal + * - Errors during ingestion rollback the entire transaction + * + * All fatal errors call fatal() which prints the message and exits with code 1. + * + * @section threading Threading Model + * + * v0 is single-threaded. 
Future versions may support: + * - Concurrent source ingestion + * - Async embedding generation + * - Parallel chunk processing + * + * @author ProxySQL Development Team + * @version 0.1.0 + * @date 2024 + */ + +#include "sqlite3.h" +#include "mysql.h" +#include "crypt.h" +#include "curl/curl.h" #include #include @@ -111,6 +234,23 @@ #include "json.hpp" using json = nlohmann::json; +/** + * @brief Weak alias for sha256_crypt_r function + * + * This weak symbol alias provides compatibility across platforms where + * sha256_crypt_r may or may not be available in libcrypt. If the + * symbol exists in libcrypt, it will be used. Otherwise, this + * implementation using crypt_r() will be linked. + * + * @param key The key/password to hash + * @param salt The salt string + * @param buffer Output buffer for the hashed result + * @param buflen Size of the output buffer + * @return Pointer to hashed result on success, nullptr on failure + * + * @note This is a workaround for library inconsistencies across platforms. + * The actual hashing is delegated to crypt_r(). + */ extern "C" __attribute__((weak)) char *sha256_crypt_r( const char *key, const char *salt, @@ -133,19 +273,51 @@ extern "C" __attribute__((weak)) char *sha256_crypt_r( return buffer; } -// ------------------------- -// Small helpers -// ------------------------- - +// =========================================================================== +// Utility Functions +// =========================================================================== + +/** + * @brief Print fatal error message and exit + * + * This function prints an error message to stderr and terminates the + * program with exit code 1. It is used for unrecoverable errors where + * continuing execution is impossible or unsafe. + * + * @param msg The error message to print + * + * @note This function does not return. It calls std::exit(1). 
+ */ static void fatal(const std::string& msg) { std::cerr << "FATAL: " << msg << "\n"; std::exit(1); } +/** + * @brief Safely convert C string to std::string + * + * Converts a potentially null C string pointer to a std::string. + * If the pointer is null, returns an empty string. This prevents + * undefined behavior from constructing std::string(nullptr). + * + * @param p Pointer to C string (may be null) + * @return std::string containing the C string content, or empty string if p is null + */ static std::string str_or_empty(const char* p) { return p ? std::string(p) : std::string(); } +/** + * @brief Execute a SQL statement on SQLite + * + * Executes a SQL statement that does not return rows (e.g., INSERT, + * UPDATE, DELETE, PRAGMA). If an error occurs, it prints the error + * message and SQL to stderr. + * + * @param db SQLite database connection + * @param sql SQL statement to execute + * @return int SQLite return code (SQLITE_OK on success, error code otherwise) + */ static int sqlite_exec(sqlite3* db, const std::string& sql) { char* err = nullptr; int rc = sqlite3_exec(db, sql.c_str(), nullptr, nullptr, &err); @@ -157,6 +329,15 @@ static int sqlite_exec(sqlite3* db, const std::string& sql) { return rc; } +/** + * @brief Check if a string represents an integer + * + * Determines if the given string consists only of digits, optionally + * preceded by a minus sign. Used for watermark value type detection. + * + * @param s String to check + * @return true if string is a valid integer representation, false otherwise + */ static bool is_integer_string(const std::string& s) { if (s.empty()) return false; size_t i = 0; @@ -170,6 +351,17 @@ static bool is_integer_string(const std::string& s) { return true; } +/** + * @brief Escape single quotes in SQL string literals + * + * Doubles single quotes in a string for safe SQL literal construction. + * This is used for building incremental filter conditions. 
+ * + * @param s Input string to escape + * @return std::string with single quotes doubled (e.g., "it's" -> "it''s") + * + * @note This is basic escaping. For user input, prefer parameter binding. + */ static std::string sql_escape_single_quotes(const std::string& s) { std::string out; out.reserve(s.size() + 8); @@ -180,11 +372,31 @@ static std::string sql_escape_single_quotes(const std::string& s) { return out; } +/** + * @brief Serialize JSON to compact string + * + * Converts a JSON object to a compact string representation without + * whitespace or pretty-printing. Used for efficient storage. + * + * @param j JSON object to serialize + * @return std::string Compact JSON representation + */ static std::string json_dump_compact(const json& j) { - // Compact output (no pretty printing) to keep storage small. return j.dump(); } +/** + * @brief Load sqlite3-vec extension if configured + * + * Loads the sqlite3-vec extension from the path specified in the + * RAG_VEC0_EXT environment variable. This is required for vector + * similarity search functionality. + * + * @param db SQLite database connection + * + * @note If RAG_VEC0_EXT is not set or empty, this function does nothing. + * The vec0 extension is only required when embedding generation is enabled. + */ static void sqlite_load_vec_extension(sqlite3* db) { const char* ext = std::getenv("RAG_VEC0_EXT"); if (!ext || std::strlen(ext) == 0) return; @@ -199,69 +411,111 @@ static void sqlite_load_vec_extension(sqlite3* db) { } } -// ------------------------- -// Data model -// ------------------------- +// =========================================================================== +// Data Structures +// =========================================================================== +/** + * @brief Configuration for a single RAG data source + * + * Represents all configuration needed to ingest data from a single + * external source into the RAG index. Loaded from the rag_sources table. 
+ */ struct RagSource { - int source_id = 0; - std::string name; - int enabled = 0; - - // backend connection - std::string backend_type; // "mysql" for now - std::string host; - int port = 3306; - std::string user; - std::string pass; - std::string db; - - // table - std::string table_name; - std::string pk_column; - std::string where_sql; // optional - - // transformation config - json doc_map_json; - json chunking_json; - json embedding_json; // optional; may be null/object + int source_id = 0; ///< Unique source identifier from database + std::string name; ///< Human-readable source name + int enabled = 0; ///< Whether this source is enabled (0/1) + + // Backend connection settings + std::string backend_type; ///< Type of backend ("mysql" in v0) + std::string host; ///< Backend server hostname or IP + int port = 3306; ///< Backend server port + std::string user; ///< Backend authentication username + std::string pass; ///< Backend authentication password + std::string db; ///< Backend database name + + // Source table settings + std::string table_name; ///< Name of the table to read from + std::string pk_column; ///< Primary key column name + std::string where_sql; ///< Optional WHERE clause for row filtering + + // Transformation configuration (JSON) + json doc_map_json; ///< Document transformation specification + json chunking_json; ///< Chunking configuration + json embedding_json; ///< Embedding configuration (may be null) }; +/** + * @brief Parsed chunking configuration + * + * Controls how document bodies are split into chunks for indexing. + * Chunks improve retrieval precision by matching smaller, more + * focused text segments. 
+ */ struct ChunkingConfig { - bool enabled = true; - std::string unit = "chars"; // v0 only supports chars - int chunk_size = 4000; - int overlap = 400; - int min_chunk_size = 800; + bool enabled = true; ///< Whether chunking is enabled + std::string unit = "chars"; ///< Unit of measurement ("chars" in v0) + int chunk_size = 4000; ///< Target size of each chunk + int overlap = 400; ///< Overlap between consecutive chunks + int min_chunk_size = 800; ///< Minimum size to avoid tiny tail chunks }; +/** + * @brief Parsed embedding configuration + * + * Controls how vector embeddings are generated for chunks. + * Embeddings enable semantic similarity search. + */ struct EmbeddingConfig { - bool enabled = false; - int dim = 1536; - std::string model = "unknown"; - json input_spec; // expects {"concat":[...]} - std::string provider = "stub"; // stub | openai - std::string api_base; - std::string api_key; - int batch_size = 16; - int timeout_ms = 20000; + bool enabled = false; ///< Whether embedding generation is enabled + int dim = 1536; ///< Vector dimension (model-specific) + std::string model = "unknown"; ///< Model name (for observability) + json input_spec; ///< Embedding input template {"concat":[...]} + std::string provider = "stub"; ///< Provider type: "stub" or "openai" + std::string api_base; ///< API endpoint URL (for "openai" provider) + std::string api_key; ///< API authentication key + int batch_size = 16; ///< Maximum inputs per API call (unused in v0) + int timeout_ms = 20000; ///< Request timeout in milliseconds }; +/** + * @brief Sync cursor for incremental ingestion + * + * Tracks the last processed watermark value for incremental sync. + * Only rows with watermark > cursor.value are fetched on subsequent runs. 
+ */ struct SyncCursor { - std::string column; - bool has_value = false; - bool numeric = false; - std::int64_t num_value = 0; - std::string str_value; + std::string column; ///< Column name used for watermark + bool has_value = false; ///< Whether a cursor value has been set + bool numeric = false; ///< Whether the value is numeric (vs string) + std::int64_t num_value = 0; ///< Numeric watermark value (if numeric=true) + std::string str_value; ///< String watermark value (if numeric=false) }; -// A row fetched from MySQL, as a name->string map. +/** + * @brief Row representation from MySQL backend + * + * A map from column name to string value. All MySQL values are + * represented as strings for simplicity; conversion happens + * during document building. + */ typedef std::unordered_map<std::string, std::string> RowMap; -// ------------------------- -// JSON parsing -// ------------------------- - +// =========================================================================== +// JSON Parsing Functions +// =========================================================================== + +/** + * @brief Parse chunking_json into ChunkingConfig struct + * + * Extracts and validates chunking configuration from JSON. + * Applies sensible defaults for missing or invalid values. + * + * @param j JSON object containing chunking configuration + * @return ChunkingConfig Parsed configuration with defaults applied + * + * @note Only "chars" unit is supported in v0. Other units fall back to "chars". 
+ */ static ChunkingConfig parse_chunking_json(const json& j) { ChunkingConfig cfg; if (!j.is_object()) return cfg; @@ -272,6 +526,7 @@ static ChunkingConfig parse_chunking_json(const json& j) { if (j.contains("overlap")) cfg.overlap = j["overlap"].get(); if (j.contains("min_chunk_size")) cfg.min_chunk_size = j["min_chunk_size"].get(); + // Apply sanity checks and defaults if (cfg.chunk_size <= 0) cfg.chunk_size = 4000; if (cfg.overlap < 0) cfg.overlap = 0; if (cfg.overlap >= cfg.chunk_size) cfg.overlap = cfg.chunk_size / 4; @@ -287,6 +542,15 @@ static ChunkingConfig parse_chunking_json(const json& j) { return cfg; } +/** + * @brief Parse embedding_json into EmbeddingConfig struct + * + * Extracts and validates embedding configuration from JSON. + * Applies sensible defaults for missing or invalid values. + * + * @param j JSON object containing embedding configuration (may be null) + * @return EmbeddingConfig Parsed configuration with defaults applied + */ static EmbeddingConfig parse_embedding_json(const json& j) { EmbeddingConfig cfg; if (!j.is_object()) return cfg; @@ -301,27 +565,53 @@ static EmbeddingConfig parse_embedding_json(const json& j) { if (j.contains("batch_size")) cfg.batch_size = j["batch_size"].get(); if (j.contains("timeout_ms")) cfg.timeout_ms = j["timeout_ms"].get(); + // Apply defaults if (cfg.dim <= 0) cfg.dim = 1536; if (cfg.batch_size <= 0) cfg.batch_size = 16; if (cfg.timeout_ms <= 0) cfg.timeout_ms = 20000; return cfg; } -// ------------------------- -// Row access -// ------------------------- - +// =========================================================================== +// Row Access Helpers +// =========================================================================== + +/** + * @brief Get value from RowMap by key + * + * Safely retrieves a value from a RowMap (column name -> value mapping). + * Returns std::nullopt if the key is not present, allowing the caller + * to distinguish between missing keys and empty values. 
+ * + * @param row The RowMap to query + * @param key Column name to look up + * @return std::optional<std::string> Value if present, nullopt otherwise + */ static std::optional<std::string> row_get(const RowMap& row, const std::string& key) { auto it = row.find(key); if (it == row.end()) return std::nullopt; return it->second; } -// ------------------------- -// doc_id.format implementation -// ------------------------- -// Replaces occurrences of {ColumnName} with the value from the row map. -// Example: "posts:{Id}" -> "posts:12345" +// =========================================================================== +// Format String Template Engine +// =========================================================================== + +/** + * @brief Apply format template with row data substitution + * + * Replaces {ColumnName} placeholders in a format string with actual + * values from the row. Used for generating document IDs. + * + * Example: "posts:{Id}" with row["Id"] = "123" becomes "posts:123" + * + * @param fmt Format string with {ColumnName} placeholders + * @param row Row data for substitution + * @return std::string Formatted string with substitutions applied + * + * @note Unmatched '{' characters are treated as literals. + * Missing column names result in empty substitution. 
+ */ static std::string apply_format(const std::string& fmt, const RowMap& row) { std::string out; out.reserve(fmt.size() + 32); @@ -346,14 +636,27 @@ static std::string apply_format(const std::string& fmt, const RowMap& row) { return out; } -// ------------------------- -// concat spec implementation -// ------------------------- -// Supported elements in concat array: -// {"col":"Title"} -> append row["Title"] if present -// {"lit":"\n\n"} -> append literal -// {"chunk_body": true} -> append chunk body (only in embedding_json input) -// +// =========================================================================== +// Concat Specification Evaluator +// =========================================================================== + +/** + * @brief Evaluate concat specification to build string + * + * Processes a concat specification (JSON array) to build a string by + * concatenating column values, literals, and optionally the chunk body. + * + * Supported concat elements: + * - {"col":"ColumnName"} - Include value from column + * - {"lit":"literal"} - Include literal string + * - {"chunk_body":true} - Include chunk body (only for embedding input) + * + * @param concat_spec JSON array with concat specification + * @param row Row data for column substitutions + * @param chunk_body Chunk body text (used if allow_chunk_body=true) + * @param allow_chunk_body Whether to allow chunk_body references + * @return std::string Concatenated result + */ static std::string eval_concat(const json& concat_spec, const RowMap& row, const std::string& chunk_body, @@ -365,12 +668,15 @@ static std::string eval_concat(const json& concat_spec, if (!part.is_object()) continue; if (part.contains("col")) { + // Column reference: {"col":"ColumnName"} std::string col = part["col"].get(); auto v = row_get(row, col); if (v.has_value()) out += v.value(); } else if (part.contains("lit")) { + // Literal string: {"lit":"some text"} out += part["lit"].get(); } else if (allow_chunk_body && 
part.contains("chunk_body")) { + // Chunk body reference: {"chunk_body":true} bool yes = part["chunk_body"].get(); if (yes) out += chunk_body; } @@ -378,16 +684,33 @@ static std::string eval_concat(const json& concat_spec, return out; } -// ------------------------- -// metadata builder -// ------------------------- -// metadata spec: -// "metadata": { "pick":[...], "rename":{...} } +// =========================================================================== +// Metadata Builder +// =========================================================================== + +/** + * @brief Build metadata JSON from row using specification + * + * Creates a metadata JSON object by picking specified columns from + * the row and optionally renaming keys. + * + * @param meta_spec Metadata specification with "pick" and "rename" keys + * @param row Source row data + * @return json Metadata object with picked and renamed fields + * + * The meta_spec format: + * @verbatim + * { + * "pick": ["Col1", "Col2"], // Columns to include + * "rename": {"Col1": "col1"} // Old name -> new name mapping + * } + * @endverbatim + */ static json build_metadata(const json& meta_spec, const RowMap& row) { json meta = json::object(); if (meta_spec.is_object()) { - // pick fields + // Pick specified fields if (meta_spec.contains("pick") && meta_spec["pick"].is_array()) { for (const auto& colv : meta_spec["pick"]) { if (!colv.is_string()) continue; @@ -397,7 +720,7 @@ static json build_metadata(const json& meta_spec, const RowMap& row) { } } - // rename keys + // Rename keys (must be done in two passes to avoid iterator invalidation) if (meta_spec.contains("rename") && meta_spec["rename"].is_object()) { std::vector> renames; for (auto it = meta_spec["rename"].begin(); it != meta_spec["rename"].end(); ++it) { @@ -418,10 +741,25 @@ static json build_metadata(const json& meta_spec, const RowMap& row) { return meta; } -// ------------------------- -// Chunking (chars-based) -// ------------------------- - +// 
=========================================================================== +// Text Chunking +// =========================================================================== + +/** + * @brief Split text into chunks based on character count + * + * Divides a text string into overlapping chunks of approximately + * chunk_size characters. Chunks overlap by 'overlap' characters to + * preserve context at boundaries. Tiny final chunks are appended + * to the previous chunk to avoid fragmentation. + * + * @param text Input text to chunk + * @param cfg Chunking configuration + * @return std::vector Vector of chunk strings + * + * @note If chunking is disabled, returns a single chunk containing the full text. + * @note If text is smaller than chunk_size, returns a single chunk. + */ static std::vector chunk_text_chars(const std::string& text, const ChunkingConfig& cfg) { std::vector chunks; @@ -458,10 +796,22 @@ static std::vector chunk_text_chars(const std::string& text, const return chunks; } -// ------------------------- -// MySQL helpers -// ------------------------- - +// =========================================================================== +// MySQL Backend Functions +// =========================================================================== + +/** + * @brief Connect to MySQL backend or terminate + * + * Establishes a connection to a MySQL server using the provided + * source configuration. Sets charset to utf8mb4 for proper handling + * of Unicode content (e.g., emojis, international text). + * + * @param s Source configuration with connection parameters + * @return MYSQL* MySQL connection handle + * + * @note On failure, prints error message and calls fatal() to exit. 
+ */ static MYSQL* mysql_connect_or_die(const RagSource& s) { MYSQL* conn = mysql_init(nullptr); if (!conn) fatal("mysql_init failed"); @@ -484,6 +834,18 @@ static MYSQL* mysql_connect_or_die(const RagSource& s) { return conn; } +/** + * @brief Convert MySQL result row to RowMap + * + * Transforms a raw MySQL row into a column-name -> value map. + * All values are converted to strings; NULL values become empty strings. + * + * @param res MySQL result set (for field metadata) + * @param row Raw MySQL row data + * @return RowMap Map of column names to string values + * + * @note Uses str_or_empty() to safely handle NULL column values. + */ static RowMap mysql_row_to_map(MYSQL_RES* res, MYSQL_ROW row) { RowMap m; unsigned int n = mysql_num_fields(res); @@ -499,8 +861,19 @@ static RowMap mysql_row_to_map(MYSQL_RES* res, MYSQL_ROW row) { return m; } -// Collect columns used by doc_map_json + embedding_json so SELECT is minimal. -// v0: we intentionally keep this conservative (include pk + all referenced col parts + metadata.pick). +// =========================================================================== +// Column Collection +// =========================================================================== + +/** + * @brief Add string to vector if not already present + * + * Helper function to maintain a list of unique column names. + * Used when building the SELECT query to avoid duplicate columns. + * + * @param cols Vector of column names (modified in-place) + * @param c Column name to add (if not present) + */ static void add_unique(std::vector& cols, const std::string& c) { for (size_t i = 0; i < cols.size(); i++) { if (cols[i] == c) return; @@ -508,6 +881,15 @@ static void add_unique(std::vector& cols, const std::string& c) { cols.push_back(c); } +/** + * @brief Extract column names from concat specification + * + * Parses a concat specification and adds all referenced column names + * to the output vector. Used to build the minimal SELECT query. 
+ * + * @param cols Vector of column names (modified in-place) + * @param concat_spec JSON concat specification to scan + */ static void collect_cols_from_concat(std::vector& cols, const json& concat_spec) { if (!concat_spec.is_array()) return; for (const auto& part : concat_spec) { @@ -517,6 +899,20 @@ static void collect_cols_from_concat(std::vector& cols, const json& } } +/** + * @brief Collect all columns needed for source ingestion + * + * Analyzes doc_map_json and embedding_json to determine which columns + * must be fetched from the source table. Builds a minimal SELECT query + * by only including required columns. + * + * @param s Source configuration with JSON specifications + * @param ecfg Embedding configuration + * @return std::vector List of required column names + * + * @note The primary key column is always included. + * @note Columns in doc_id.format must be manually included in metadata.pick or concat. + */ static std::vector collect_needed_columns(const RagSource& s, const EmbeddingConfig& ecfg) { std::vector cols; add_unique(cols, s.pk_column); @@ -540,12 +936,21 @@ static std::vector collect_needed_columns(const RagSource& s, const collect_cols_from_concat(cols, ecfg.input_spec["concat"]); } - // doc_id.format: we do not try to parse all placeholders; best practice is doc_id uses pk only. - // If you want doc_id.format to reference other columns, include them in metadata.pick or concat. - return cols; } +/** + * @brief Build SELECT SQL query for source + * + * Constructs a SELECT query that fetches only the required columns + * from the source table, with optional WHERE clause combining the + * source's where_sql and an incremental filter. 
+ * + * @param s Source configuration + * @param cols List of column names to SELECT + * @param extra_filter Additional WHERE clause (e.g., incremental filter) + * @return std::string Complete SELECT SQL statement + */ static std::string build_select_sql(const RagSource& s, const std::vector& cols, const std::string& extra_filter) { @@ -566,9 +971,24 @@ static std::string build_select_sql(const RagSource& s, return sql; } -static void sqlite_prepare_or_die(sqlite3* db, sqlite3_stmt** st, const char* sql); -static void sqlite_bind_text(sqlite3_stmt* st, int idx, const std::string& v); +// =========================================================================== +// Sync Cursor (Watermark) Management +// =========================================================================== +// Forward declarations +static void sqlite_prepare_or_die(sqlite3* db, sqlite3_stmt** st, const char* sql); +static void bind_text(sqlite3_stmt* st, int idx, const std::string& v); + +/** + * @brief Load sync cursor JSON from database + * + * Retrieves the cursor_json for a source from rag_sync_state. + * Returns empty object if no cursor exists or on error. + * + * @param db SQLite database connection + * @param source_id Source identifier + * @return json Cursor JSON object (empty if not found) + */ static json load_sync_cursor_json(sqlite3* db, int source_id) { sqlite3_stmt* st = nullptr; json out = json::object(); @@ -593,6 +1013,16 @@ static json load_sync_cursor_json(sqlite3* db, int source_id) { return out; } +/** + * @brief Parse sync cursor JSON into SyncCursor struct + * + * Extracts cursor configuration from JSON and determines the + * value type (numeric vs string). Used for incremental ingestion. 
+ * + * @param cursor_json JSON cursor configuration + * @param default_col Default column name if not specified + * @return SyncCursor Parsed cursor configuration + */ static SyncCursor parse_sync_cursor(const json& cursor_json, const std::string& default_col) { SyncCursor c; c.column = default_col; @@ -623,6 +1053,17 @@ static SyncCursor parse_sync_cursor(const json& cursor_json, const std::string& return c; } +/** + * @brief Build incremental filter SQL from cursor + * + * Constructs a WHERE clause fragment for incremental ingestion. + * Returns empty string if cursor has no value set. + * + * @param c Sync cursor configuration + * @return std::string SQL filter (e.g., "`Id` > 123") or empty string + * + * @note String values are escaped to prevent SQL injection. + */ static std::string build_incremental_filter(const SyncCursor& c) { if (!c.has_value || c.column.empty()) return ""; std::string col = "`" + c.column + "`"; @@ -632,6 +1073,18 @@ static std::string build_incremental_filter(const SyncCursor& c) { return col + " > '" + sql_escape_single_quotes(c.str_value) + "'"; } +/** + * @brief Update sync state in database + * + * Upserts the cursor state into rag_sync_state after successful + * (or partial) ingestion. Updates the watermark value and last_ok_at. + * + * @param db SQLite database connection + * @param source_id Source identifier + * @param cursor_json Updated cursor JSON to store + * + * @note On error, calls fatal() to terminate the program. 
+ */ static void update_sync_state(sqlite3* db, int source_id, const json& cursor_json) { const char* sql = "INSERT INTO rag_sync_state(source_id, mode, cursor_json, last_ok_at, last_error) " @@ -642,7 +1095,7 @@ static void update_sync_state(sqlite3* db, int source_id, const json& cursor_jso sqlite_prepare_or_die(db, &st, sql); sqlite3_bind_int(st, 1, source_id); std::string cursor_str = json_dump_compact(cursor_json); - sqlite_bind_text(st, 2, cursor_str); + bind_text(st, 2, cursor_str); int rc = sqlite3_step(st); sqlite3_finalize(st); if (rc != SQLITE_DONE) { @@ -650,24 +1103,49 @@ static void update_sync_state(sqlite3* db, int source_id, const json& cursor_jso } } -// ------------------------- -// SQLite prepared statements (batched insertion) -// ------------------------- +// =========================================================================== +// SQLite Prepared Statements +// =========================================================================== +/** + * @brief Collection of prepared SQLite statements + * + * Holds all prepared statements needed for ingestion. + * Prepared statements are reused for efficiency and SQL injection safety. + */ struct SqliteStmts { - sqlite3_stmt* doc_exists = nullptr; - sqlite3_stmt* ins_doc = nullptr; - sqlite3_stmt* ins_chunk = nullptr; - sqlite3_stmt* ins_fts = nullptr; - sqlite3_stmt* ins_vec = nullptr; // optional (only used if embedding enabled) + sqlite3_stmt* doc_exists = nullptr; ///< Check if document exists + sqlite3_stmt* ins_doc = nullptr; ///< Insert document + sqlite3_stmt* ins_chunk = nullptr; ///< Insert chunk + sqlite3_stmt* ins_fts = nullptr; ///< Insert into FTS index + sqlite3_stmt* ins_vec = nullptr; ///< Insert vector embedding }; +/** + * @brief Prepare SQLite statement or terminate + * + * Prepares a SQLite statement and calls fatal() on failure. + * Used for statements that must succeed for the program to continue. 
+ * + * @param db SQLite database connection + * @param st Output parameter for prepared statement handle + * @param sql SQL statement to prepare + * + * @note On failure, prints error and calls fatal() to exit. + */ static void sqlite_prepare_or_die(sqlite3* db, sqlite3_stmt** st, const char* sql) { if (sqlite3_prepare_v2(db, sql, -1, st, nullptr) != SQLITE_OK) { fatal(std::string("SQLite prepare failed: ") + sqlite3_errmsg(db) + "\nSQL: " + sql); } } +/** + * @brief Finalize all statements in SqliteStmts + * + * Releases all prepared statement resources. + * + * @param s Statement collection to finalize (reset to default state) + */ static void sqlite_finalize_all(SqliteStmts& s) { if (s.doc_exists) sqlite3_finalize(s.doc_exists); if (s.ins_doc) sqlite3_finalize(s.ins_doc); @@ -677,30 +1155,76 @@ static void sqlite_finalize_all(SqliteStmts& s) { s = SqliteStmts{}; } -static void sqlite_bind_text(sqlite3_stmt* st, int idx, const std::string& v) { +/** + * @brief Bind text parameter to SQLite statement + * + * Binds a std::string value to a prepared statement parameter. + * Uses SQLITE_TRANSIENT for string management (SQLite makes a copy). + * + * @param st Prepared statement + * @param idx Parameter index (1-based) + * @param v String value to bind + */ +static void bind_text(sqlite3_stmt* st, int idx, const std::string& v) { sqlite3_bind_text(st, idx, v.c_str(), -1, SQLITE_TRANSIENT); } -// Best-effort binder for sqlite3-vec embeddings (float32 array). -// If your sqlite3-vec build expects a different encoding, change this function only. +/** + * @brief Bind vector embedding to SQLite statement + * + * Binds a float32 array as a BLOB for sqlite3-vec. + * This is the "best effort" binding format that works with most + * sqlite3-vec builds. Modify if your build expects different format. + * + * @param st Prepared statement + * @param idx Parameter index (1-based) + * @param emb Float vector to bind + * + * @note The binding format is build-dependent. 
If vec operations fail, + * check your sqlite3-vec build's expected format. + */ static void bind_vec_embedding(sqlite3_stmt* st, int idx, const std::vector& emb) { const void* data = (const void*)emb.data(); int bytes = (int)(emb.size() * sizeof(float)); sqlite3_bind_blob(st, idx, data, bytes, SQLITE_TRANSIENT); } -// Check if doc exists +/** + * @brief Check if document exists in database + * + * Tests whether a document with the given doc_id already exists + * in rag_documents. Used for skip-if-exists logic in v0. + * + * @param ss Statement collection + * @param doc_id Document identifier to check + * @return true if document exists, false otherwise + */ static bool sqlite_doc_exists(SqliteStmts& ss, const std::string& doc_id) { sqlite3_reset(ss.doc_exists); sqlite3_clear_bindings(ss.doc_exists); - sqlite_bind_text(ss.doc_exists, 1, doc_id); + bind_text(ss.doc_exists, 1, doc_id); int rc = sqlite3_step(ss.doc_exists); return (rc == SQLITE_ROW); } -// Insert doc +/** + * @brief Insert document into rag_documents table + * + * Inserts a new document record with all metadata. + * + * @param ss Statement collection + * @param source_id Source identifier + * @param source_name Source name + * @param doc_id Document identifier + * @param pk_json Primary key JSON (for refetch) + * @param title Document title + * @param body Document body + * @param meta_json Metadata JSON object + * + * @note On failure, calls fatal() to terminate. 
+ */ static void sqlite_insert_doc(SqliteStmts& ss, int source_id, const std::string& source_name, @@ -712,13 +1236,13 @@ static void sqlite_insert_doc(SqliteStmts& ss, sqlite3_reset(ss.ins_doc); sqlite3_clear_bindings(ss.ins_doc); - sqlite_bind_text(ss.ins_doc, 1, doc_id); + bind_text(ss.ins_doc, 1, doc_id); sqlite3_bind_int(ss.ins_doc, 2, source_id); - sqlite_bind_text(ss.ins_doc, 3, source_name); - sqlite_bind_text(ss.ins_doc, 4, pk_json); - sqlite_bind_text(ss.ins_doc, 5, title); - sqlite_bind_text(ss.ins_doc, 6, body); - sqlite_bind_text(ss.ins_doc, 7, meta_json); + bind_text(ss.ins_doc, 3, source_name); + bind_text(ss.ins_doc, 4, pk_json); + bind_text(ss.ins_doc, 5, title); + bind_text(ss.ins_doc, 6, body); + bind_text(ss.ins_doc, 7, meta_json); int rc = sqlite3_step(ss.ins_doc); if (rc != SQLITE_DONE) { @@ -726,7 +1250,22 @@ static void sqlite_insert_doc(SqliteStmts& ss, } } -// Insert chunk +/** + * @brief Insert chunk into rag_chunks table + * + * Inserts a new chunk record with title, body, and metadata. + * + * @param ss Statement collection + * @param chunk_id Chunk identifier (typically doc_id#index) + * @param doc_id Parent document identifier + * @param source_id Source identifier + * @param chunk_index Zero-based chunk index within document + * @param title Chunk title + * @param body Chunk body text + * @param meta_json Chunk metadata JSON + * + * @note On failure, calls fatal() to terminate. 
+ */ static void sqlite_insert_chunk(SqliteStmts& ss, const std::string& chunk_id, const std::string& doc_id, @@ -738,13 +1277,13 @@ static void sqlite_insert_chunk(SqliteStmts& ss, sqlite3_reset(ss.ins_chunk); sqlite3_clear_bindings(ss.ins_chunk); - sqlite_bind_text(ss.ins_chunk, 1, chunk_id); - sqlite_bind_text(ss.ins_chunk, 2, doc_id); + bind_text(ss.ins_chunk, 1, chunk_id); + bind_text(ss.ins_chunk, 2, doc_id); sqlite3_bind_int(ss.ins_chunk, 3, source_id); sqlite3_bind_int(ss.ins_chunk, 4, chunk_index); - sqlite_bind_text(ss.ins_chunk, 5, title); - sqlite_bind_text(ss.ins_chunk, 6, body); - sqlite_bind_text(ss.ins_chunk, 7, meta_json); + bind_text(ss.ins_chunk, 5, title); + bind_text(ss.ins_chunk, 6, body); + bind_text(ss.ins_chunk, 7, meta_json); int rc = sqlite3_step(ss.ins_chunk); if (rc != SQLITE_DONE) { @@ -752,7 +1291,20 @@ static void sqlite_insert_chunk(SqliteStmts& ss, } } -// Insert into FTS +/** + * @brief Insert chunk into FTS index + * + * Inserts a chunk into the rag_fts_chunks FTS5 table for + * full-text search. The FTS table is contentless (text stored + * only in rag_chunks, FTS only maintains index). + * + * @param ss Statement collection + * @param chunk_id Chunk identifier + * @param title Chunk title + * @param body Chunk body text + * + * @note On failure, calls fatal() to terminate. 
+ */ static void sqlite_insert_fts(SqliteStmts& ss, const std::string& chunk_id, const std::string& title, @@ -760,9 +1312,9 @@ static void sqlite_insert_fts(SqliteStmts& ss, sqlite3_reset(ss.ins_fts); sqlite3_clear_bindings(ss.ins_fts); - sqlite_bind_text(ss.ins_fts, 1, chunk_id); - sqlite_bind_text(ss.ins_fts, 2, title); - sqlite_bind_text(ss.ins_fts, 3, body); + bind_text(ss.ins_fts, 1, chunk_id); + bind_text(ss.ins_fts, 2, title); + bind_text(ss.ins_fts, 3, body); int rc = sqlite3_step(ss.ins_fts); if (rc != SQLITE_DONE) { @@ -770,8 +1322,24 @@ static void sqlite_insert_fts(SqliteStmts& ss, } } -// Insert vector row (sqlite3-vec) -// Schema: rag_vec_chunks(embedding, chunk_id, doc_id, source_id, updated_at) +/** + * @brief Insert vector embedding into rag_vec_chunks table + * + * Inserts a vector embedding along with metadata for vector + * similarity search using sqlite3-vec. + * + * Schema: rag_vec_chunks(embedding, chunk_id, doc_id, source_id, updated_at) + * + * @param ss Statement collection + * @param emb Float32 vector embedding + * @param chunk_id Chunk identifier + * @param doc_id Parent document identifier + * @param source_id Source identifier + * @param updated_at_unixepoch Unix epoch timestamp (0 for "now") + * + * @note If ss.ins_vec is null (embedding disabled), returns without error. + * @note On failure, calls fatal() with vec-specific error message. 
+ */ static void sqlite_insert_vec(SqliteStmts& ss, const std::vector& emb, const std::string& chunk_id, @@ -784,48 +1352,57 @@ static void sqlite_insert_vec(SqliteStmts& ss, sqlite3_clear_bindings(ss.ins_vec); bind_vec_embedding(ss.ins_vec, 1, emb); - sqlite_bind_text(ss.ins_vec, 2, chunk_id); - sqlite_bind_text(ss.ins_vec, 3, doc_id); + bind_text(ss.ins_vec, 2, chunk_id); + bind_text(ss.ins_vec, 3, doc_id); sqlite3_bind_int(ss.ins_vec, 4, source_id); sqlite3_bind_int64(ss.ins_vec, 5, (sqlite3_int64)updated_at_unixepoch); int rc = sqlite3_step(ss.ins_vec); if (rc != SQLITE_DONE) { - // In practice, sqlite3-vec may return errors if binding format is wrong. - // Keep the message loud and actionable. + // Vec-specific error message for troubleshooting fatal(std::string("SQLite insert rag_vec_chunks failed (check vec binding format): ") + sqlite3_errmsg(sqlite3_db_handle(ss.ins_vec))); } } -// ------------------------- -// Embedding stub -// ------------------------- -// This function is a placeholder. It returns a deterministic pseudo-embedding from the text. -// Replace it with a real embedding model call in ProxySQL later. -// -// Why deterministic? -// - Helps test end-to-end ingestion + vector SQL without needing an ML runtime. -// - Keeps behavior stable across runs. -// +// =========================================================================== +// Embedding Generation +// =========================================================================== + +/** + * @brief Generate deterministic pseudo-embedding from text + * + * This is a stub implementation that generates a deterministic but + * non-semantic embedding from input text. Used for testing when + * a real embedding service is not available. + * + * The algorithm uses a rolling hash to distribute values across + * vector dimensions, then normalizes. Results are deterministic + * (same input always produces same output) but NOT semantic. 
+ * + * @param text Input text to "embed" + * @param dim Vector dimension + * @return std::vector Deterministic pseudo-embedding + * + * @warning This does NOT produce semantic embeddings. Only for testing. + */ static std::vector pseudo_embedding(const std::string& text, int dim) { std::vector v; v.resize((size_t)dim, 0.0f); - // Simple rolling hash-like accumulation into float bins. - // NOT a semantic embedding; only for wiring/testing. - std::uint64_t h = 1469598103934665603ULL; + // FNV-1a-like rolling hash accumulation into float bins + std::uint64_t h = 1469598103934665603ULL; // FNV offset basis for (size_t i = 0; i < text.size(); i++) { h ^= (unsigned char)text[i]; - h *= 1099511628211ULL; + h *= 1099511628211ULL; // FNV prime - // Spread influence into bins + // Spread influence into bins based on hash size_t idx = (size_t)(h % (std::uint64_t)dim); - float val = (float)((h >> 32) & 0xFFFF) / 65535.0f; // 0..1 - v[idx] += (val - 0.5f); + float val = (float)((h >> 32) & 0xFFFF) / 65535.0f; // Normalize to 0..1 + v[idx] += (val - 0.5f); // Center around 0 } - // Very rough normalization + // L2 normalization double norm = 0.0; for (int i = 0; i < dim; i++) norm += (double)v[(size_t)i] * (double)v[(size_t)i]; norm = std::sqrt(norm); @@ -835,16 +1412,47 @@ static std::vector pseudo_embedding(const std::string& text, int dim) { return v; } -// ------------------------- -// Embedding providers -// ------------------------- - +/** + * @brief Abstract embedding provider interface + * + * Base class for embedding providers. Concrete implementations + * include StubEmbeddingProvider and OpenAIEmbeddingProvider. 
+ */ struct EmbeddingProvider { + /** + * @brief Virtual destructor for safe polymorphic deletion + */ virtual ~EmbeddingProvider() = default; + + /** + * @brief Generate embeddings for multiple input strings + * + * @param inputs Vector of input texts to embed + * @param dim Expected embedding dimension + * @return std::vector> Vector of embeddings (one per input) + * + * @note The returned vector must have exactly inputs.size() embeddings. + * @note Each embedding must have exactly dim float values. + * + * @throws std::runtime_error on embedding generation failure + */ virtual std::vector> embed(const std::vector& inputs, int dim) = 0; }; +/** + * @brief Stub embedding provider for testing + * + * Uses pseudo_embedding() to generate deterministic but non-semantic + * embeddings. Used when no real embedding service is available. + */ struct StubEmbeddingProvider : public EmbeddingProvider { + /** + * @brief Generate pseudo-embeddings for inputs + * + * @param inputs Vector of input texts + * @param dim Embedding dimension + * @return std::vector> Deterministic pseudo-embeddings + */ std::vector> embed(const std::vector& inputs, int dim) override { std::vector> out; out.reserve(inputs.size()); @@ -853,10 +1461,28 @@ struct StubEmbeddingProvider : public EmbeddingProvider { } }; +/** + * @brief Buffer for curl HTTP response data + * + * Simple string buffer used by curl write callback to accumulate + * HTTP response data. + */ struct CurlBuffer { - std::string data; + std::string data; ///< Accumulated response data }; +/** + * @brief Curl write callback function + * + * Callback function passed to libcurl to handle incoming response data. + * Appends received data to the CurlBuffer. 
+ * + * @param contents Pointer to received data + * @param size Size of each data element + * @param nmemb Number of elements + * @param userp User pointer (must point to CurlBuffer) + * @return size_t Total bytes processed + */ static size_t curl_write_cb(void* contents, size_t size, size_t nmemb, void* userp) { size_t total = size * nmemb; CurlBuffer* buf = static_cast(userp); @@ -864,16 +1490,46 @@ static size_t curl_write_cb(void* contents, size_t size, size_t nmemb, void* use return total; } +/** + * @brief OpenAI-compatible embedding provider + * + * Calls an OpenAI-compatible HTTP API to generate embeddings. + * Works with OpenAI, Azure OpenAI, or any compatible service. + */ struct OpenAIEmbeddingProvider : public EmbeddingProvider { - std::string api_base; - std::string api_key; - std::string model; - int timeout_ms = 20000; - + std::string api_base; ///< API base URL (e.g., "https://api.openai.com/v1") + std::string api_key; ///< API authentication key + std::string model; ///< Model name (e.g., "text-embedding-3-large") + int timeout_ms = 20000; ///< Request timeout in milliseconds + + /** + * @brief Construct OpenAI embedding provider + * + * @param base API base URL + * @param key API key + * @param mdl Model name + * @param timeout Request timeout + */ OpenAIEmbeddingProvider(std::string base, std::string key, std::string mdl, int timeout) : api_base(std::move(base)), api_key(std::move(key)), model(std::move(mdl)), timeout_ms(timeout) {} + /** + * @brief Generate embeddings via OpenAI-compatible API + * + * Sends a batch of texts to the embedding API and returns + * the generated embeddings. + * + * @param inputs Vector of input texts to embed + * @param dim Expected embedding dimension + * @return std::vector> Generated embeddings + * + * @throws std::runtime_error on API error, timeout, or invalid response + * + * @note The inputs are sent in a single batch. For large batches, + * consider splitting into multiple calls. 
+ */ std::vector> embed(const std::vector& inputs, int dim) override { + // Validate configuration if (api_base.empty()) { throw std::runtime_error("embedding api_base is empty"); } @@ -883,10 +1539,12 @@ struct OpenAIEmbeddingProvider : public EmbeddingProvider { if (model.empty()) { throw std::runtime_error("embedding model is empty"); } + // Note: Some providers expect "hf:" prefix for HuggingFace models if (model.rfind("hf:", 0) != 0) { std::cerr << "WARN: embedding model should be prefixed with 'hf:' per Synthetic docs\n"; } + // Build request JSON json req; req["model"] = model; req["input"] = inputs; @@ -895,10 +1553,12 @@ struct OpenAIEmbeddingProvider : public EmbeddingProvider { } std::string body = req.dump(); + // Build URL std::string url = api_base; if (!url.empty() && url.back() == '/') url.pop_back(); url += "/embeddings"; + // Execute HTTP request CURL* curl = curl_easy_init(); if (!curl) { throw std::runtime_error("curl_easy_init failed"); @@ -926,6 +1586,7 @@ struct OpenAIEmbeddingProvider : public EmbeddingProvider { curl_slist_free_all(headers); curl_easy_cleanup(curl); + // Check for errors if (res != CURLE_OK) { throw std::runtime_error(std::string("curl error: ") + curl_easy_strerror(res)); } @@ -933,11 +1594,13 @@ struct OpenAIEmbeddingProvider : public EmbeddingProvider { throw std::runtime_error("embedding request failed with status " + std::to_string(status)); } + // Parse response json resp = json::parse(buf.data); if (!resp.contains("data") || !resp["data"].is_array()) { throw std::runtime_error("embedding response missing data array"); } + // Extract embeddings std::vector> out; out.reserve(resp["data"].size()); for (const auto& item : resp["data"]) { @@ -963,6 +1626,18 @@ struct OpenAIEmbeddingProvider : public EmbeddingProvider { } }; +/** + * @brief Build embedding provider from configuration + * + * Factory function that creates the appropriate embedding provider + * based on the configuration. 
+ * + * @param cfg Embedding configuration + * @return std::unique_ptr Configured provider instance + * + * @note Currently supports "stub" and "openai" providers. + * "stub" is returned for unknown provider types. + */ static std::unique_ptr build_embedding_provider(const EmbeddingConfig& cfg) { if (cfg.provider == "openai") { return std::make_unique(cfg.api_base, cfg.api_key, cfg.model, cfg.timeout_ms); @@ -970,10 +1645,22 @@ static std::unique_ptr build_embedding_provider(const Embeddi return std::make_unique(); } -// ------------------------- -// Load rag_sources from SQLite -// ------------------------- - +// =========================================================================== +// Source Loading +// =========================================================================== + +/** + * @brief Load enabled sources from rag_sources table + * + * Queries the rag_sources table and parses all enabled sources + * into RagSource structs. Validates JSON configurations. + * + * @param db SQLite database connection + * @return std::vector Vector of enabled source configurations + * + * @note On JSON parsing error, calls fatal() to terminate. + * @note Only sources with enabled=1 are loaded. + */ static std::vector load_sources(sqlite3* db) { std::vector out; @@ -1035,37 +1722,54 @@ static std::vector load_sources(sqlite3* db) { return out; } -// ------------------------- -// Build a canonical document from a source row -// ------------------------- +// =========================================================================== +// Document Building +// =========================================================================== +/** + * @brief Canonical document representation + * + * Contains all fields of a document in its canonical form, + * ready for insertion into rag_documents and chunking. 
+ */ struct BuiltDoc { - std::string doc_id; - std::string pk_json; - std::string title; - std::string body; - std::string metadata_json; + std::string doc_id; ///< Unique document identifier + std::string pk_json; ///< Primary key JSON (for refetch) + std::string title; ///< Document title + std::string body; ///< Document body (to be chunked) + std::string metadata_json; ///< Metadata JSON object }; +/** + * @brief Build canonical document from source row + * + * Transforms a raw backend row into a canonical document using + * the doc_map_json specification. Handles doc_id formatting, + * title/body concatenation, and metadata building. + * + * @param src Source configuration with doc_map_json + * @param row Raw row data from backend + * @return BuiltDoc Canonical document representation + */ static BuiltDoc build_document_from_row(const RagSource& src, const RowMap& row) { BuiltDoc d; - // doc_id + // Build doc_id from format template if (src.doc_map_json.contains("doc_id") && src.doc_map_json["doc_id"].is_object() && src.doc_map_json["doc_id"].contains("format") && src.doc_map_json["doc_id"]["format"].is_string()) { d.doc_id = apply_format(src.doc_map_json["doc_id"]["format"].get(), row); } else { - // fallback: table:pk + // Fallback: table:pk format auto pk = row_get(row, src.pk_column).value_or(""); d.doc_id = src.table_name + ":" + pk; } - // pk_json (refetch pointer) + // Build pk_json (refetch pointer) json pk = json::object(); pk[src.pk_column] = row_get(row, src.pk_column).value_or(""); d.pk_json = json_dump_compact(pk); - // title/body + // Build title from concat spec if (src.doc_map_json.contains("title") && src.doc_map_json["title"].is_object() && src.doc_map_json["title"].contains("concat")) { d.title = eval_concat(src.doc_map_json["title"]["concat"], row, "", false); @@ -1073,6 +1777,7 @@ static BuiltDoc build_document_from_row(const RagSource& src, const RowMap& row) d.title = ""; } + // Build body from concat spec if 
(src.doc_map_json.contains("body") && src.doc_map_json["body"].is_object() && src.doc_map_json["body"].contains("concat")) { d.body = eval_concat(src.doc_map_json["body"]["concat"], row, "", false); @@ -1080,7 +1785,7 @@ static BuiltDoc build_document_from_row(const RagSource& src, const RowMap& row) d.body = ""; } - // metadata_json + // Build metadata JSON json meta = json::object(); if (src.doc_map_json.contains("metadata")) { meta = build_metadata(src.doc_map_json["metadata"], row); @@ -1090,10 +1795,21 @@ static BuiltDoc build_document_from_row(const RagSource& src, const RowMap& row) return d; } -// ------------------------- -// Embedding input builder (optional) -// ------------------------- - +// =========================================================================== +// Embedding Input Builder +// =========================================================================== + +/** + * @brief Build embedding input text for a chunk + * + * Constructs the text to be embedded by applying the embedding_json + * input specification. Typically includes title, tags, and chunk body. 
+ * + * @param ecfg Embedding configuration + * @param row Source row data (for column values) + * @param chunk_body Chunk body text + * @return std::string Text to embed (empty if embedding disabled) + */ static std::string build_embedding_input(const EmbeddingConfig& ecfg, const RowMap& row, const std::string& chunk_body) { @@ -1107,10 +1823,23 @@ static std::string build_embedding_input(const EmbeddingConfig& ecfg, return chunk_body; } -// ------------------------- -// Ingest one source -// ------------------------- - +// =========================================================================== +// Statement Preparation +// =========================================================================== + +/** + * @brief Prepare all SQLite statements for ingestion + * + * Creates prepared statements for document existence check, + * document insertion, chunk insertion, FTS insertion, and + * optionally vector insertion. + * + * @param db SQLite database connection + * @param want_vec Whether to prepare vector insert statement + * @return SqliteStmts Collection of prepared statements + * + * @note On failure, calls fatal() to terminate. + */ static SqliteStmts prepare_sqlite_statements(sqlite3* db, bool want_vec) { SqliteStmts ss; @@ -1134,7 +1863,6 @@ static SqliteStmts prepare_sqlite_statements(sqlite3* db, bool want_vec) { // Insert vector (optional) if (want_vec) { - // NOTE: If your sqlite3-vec build expects different binding format, adapt bind_vec_embedding(). 
sqlite_prepare_or_die(db, &ss.ins_vec, "INSERT INTO rag_vec_chunks(embedding, chunk_id, doc_id, source_id, updated_at) " "VALUES(?,?,?,?,?)"); @@ -1143,18 +1871,41 @@ static SqliteStmts prepare_sqlite_statements(sqlite3* db, bool want_vec) { return ss; } +// =========================================================================== +// Source Ingestion +// =========================================================================== + +/** + * @brief Ingest data from a single source + * + * Main ingestion function for a single source. Performs the following: + * 1. Parse chunking and embedding configurations + * 2. Load sync cursor (watermark) + * 3. Connect to backend + * 4. Build and execute SELECT query with incremental filter + * 5. For each row: build document, check existence, insert, chunk, embed + * 6. Update sync cursor with max watermark value + * + * @param sdb SQLite database connection + * @param src Source configuration + * + * @note Only "mysql" backend_type is supported in v0. + * @note Skips documents that already exist (no update in v0). + * @note All inserts happen in a single transaction (managed by caller). 
+ */ static void ingest_source(sqlite3* sdb, const RagSource& src) { std::cerr << "Ingesting source_id=" << src.source_id << " name=" << src.name << " backend=" << src.backend_type << " table=" << src.table_name << "\n"; + // v0 only supports MySQL if (src.backend_type != "mysql") { std::cerr << " Skipping: backend_type not supported in v0.\n"; return; } - // Parse chunking & embedding config + // Parse configurations ChunkingConfig ccfg = parse_chunking_json(src.chunking_json); EmbeddingConfig ecfg = parse_embedding_json(src.embedding_json); std::unique_ptr embedder; @@ -1162,17 +1913,17 @@ static void ingest_source(sqlite3* sdb, const RagSource& src) { embedder = build_embedding_provider(ecfg); } - // Load sync cursor (watermark) + // Load sync cursor (watermark for incremental sync) json cursor_json = load_sync_cursor_json(sdb, src.source_id); SyncCursor cursor = parse_sync_cursor(cursor_json, src.pk_column); - // Prepare SQLite statements for this run + // Prepare SQLite statements SqliteStmts ss = prepare_sqlite_statements(sdb, ecfg.enabled); - // Connect MySQL + // Connect to MySQL MYSQL* mdb = mysql_connect_or_die(src); - // Build SELECT (include watermark column if needed) + // Build SELECT with incremental filter std::vector cols = collect_needed_columns(src, ecfg); if (!cursor.column.empty()) add_unique(cols, cursor.column); std::string extra_filter = build_incremental_filter(cursor); @@ -1196,21 +1947,25 @@ static void ingest_source(sqlite3* sdb, const RagSource& src) { std::uint64_t ingested_docs = 0; std::uint64_t skipped_docs = 0; + // Track max watermark value (for next run) MYSQL_ROW r; bool max_set = false; bool max_numeric = false; std::int64_t max_num = 0; std::string max_str; + + // Process each row while ((r = mysql_fetch_row(res)) != nullptr) { RowMap row = mysql_row_to_map(res, r); - // Track max watermark value from source rows (even if doc is skipped) + // Track max watermark value (even for skipped docs) if (!cursor.column.empty()) { auto 
it = row.find(cursor.column); if (it != row.end()) { const std::string& v = it->second; if (!v.empty()) { if (!max_set) { + // First value - determine type and store if (cursor.numeric || is_integer_string(v)) { try { max_numeric = true; @@ -1230,6 +1985,7 @@ static void ingest_source(sqlite3* sdb, const RagSource& src) { } max_set = true; } else if (max_numeric) { + // Already numeric - try to compare as number if (is_integer_string(v)) { try { std::int64_t nv = std::stoll(v); @@ -1243,15 +1999,17 @@ static void ingest_source(sqlite3* sdb, const RagSource& src) { } } } else { + // Already string - lexicographic comparison if (v > max_str) max_str = v; } } } } + // Build document from row BuiltDoc doc = build_document_from_row(src, row); - // v0: skip if exists + // v0: Skip if document already exists if (sqlite_doc_exists(ss, doc.doc_id)) { skipped_docs++; continue; } @@ -1261,30 +2019,32 @@ static void ingest_source(sqlite3* sdb, const RagSource& src) { sqlite_insert_doc(ss, src.source_id, src.name, doc.doc_id, doc.pk_json, doc.title, doc.body, doc.metadata_json); - // Chunk and insert chunks + FTS (+ optional vec) + // Chunk document body std::vector<std::string> chunks = chunk_text_chars(doc.body, ccfg); - // Use SQLite's unixepoch() for updated_at normally; vec table also stores updated_at as unix epoch. - // Here we store a best-effort "now" from SQLite (unixepoch()) would require a query; instead store 0 - // or a local time. For v0, we store 0 and let schema default handle other tables. - // If you want accuracy, query SELECT unixepoch() once per run and reuse it. + // Note: updated_at is set to 0 for v0. For accurate timestamps, + // query SELECT unixepoch() once at the start of main(). 
std::int64_t now_epoch = 0; + // Process each chunk for (size_t i = 0; i < chunks.size(); i++) { std::string chunk_id = doc.doc_id + "#" + std::to_string(i); - // Chunk metadata (minimal) + // Chunk metadata (minimal - just index) json cmeta = json::object(); cmeta["chunk_index"] = (int)i; - std::string chunk_title = doc.title; // simple: repeat doc title + // Use document title as chunk title (simple approach) + std::string chunk_title = doc.title; + // Insert chunk sqlite_insert_chunk(ss, chunk_id, doc.doc_id, src.source_id, (int)i, chunk_title, chunks[i], json_dump_compact(cmeta)); + // Insert into FTS index sqlite_insert_fts(ss, chunk_id, chunk_title, chunks[i]); - // Optional vectors + // Generate and insert embedding (if enabled) if (ecfg.enabled) { std::string emb_input = build_embedding_input(ecfg, row, chunks[i]); std::vector<std::string> batch_inputs = {emb_input}; @@ -1300,10 +2060,12 @@ static void ingest_source(sqlite3* sdb, const RagSource& src) { } } + // Cleanup mysql_free_result(res); mysql_close(mdb); sqlite_finalize_all(ss); + // Update sync cursor with max watermark value if (!cursor_json.is_object()) cursor_json = json::object(); if (!cursor.column.empty()) cursor_json["column"] = cursor.column; if (max_set) { @@ -1320,34 +2082,62 @@ static void ingest_source(sqlite3* sdb, const RagSource& src) { << " skipped_docs=" << skipped_docs << "\n"; } -// ------------------------- -// Main -// ------------------------- - +// =========================================================================== +// Main Entry Point +// =========================================================================== + +/** + * @brief Main entry point for RAG ingestion tool + * + * Entry point for the RAG ingestion tool. Orchestrates the entire + * ingestion process: + * + * 1. Validates command-line arguments + * 2. Initializes libcurl + * 3. Opens SQLite RAG index database + * 4. Loads sqlite3-vec extension (if configured) + * 5. Sets SQLite pragmas (foreign keys, WAL mode) + * 6. 
Begins transaction + * 7. Loads and ingests all enabled sources + * 8. Commits or rolls back transaction + * 9. Cleanup and exit + * + * @param argc Argument count + * @param argv Argument values + * @return int Exit code (0 on success, 1 on error, 2 on usage error) + * + * Usage: + * @verbatim + * ./rag_ingest /path/to/rag_index.sqlite + * @endverbatim + */ int main(int argc, char** argv) { + // Validate arguments if (argc != 2) { std::cerr << "Usage: " << argv[0] << " <sqlite_db_path>\n"; return 2; } + // Initialize libcurl (for embedding API calls) curl_global_init(CURL_GLOBAL_DEFAULT); const char* sqlite_path = argv[1]; + // Open SQLite database sqlite3* db = nullptr; if (sqlite3_open(sqlite_path, &db) != SQLITE_OK) { fatal("Could not open SQLite DB: " + std::string(sqlite_path)); } - // Load vec0 if configured (needed for rag_vec_chunks inserts) + // Load sqlite3-vec extension if configured sqlite_load_vec_extension(db); - // Pragmas (safe defaults) + // Set safe SQLite pragmas sqlite_exec(db, "PRAGMA foreign_keys = ON;"); sqlite_exec(db, "PRAGMA journal_mode = WAL;"); sqlite_exec(db, "PRAGMA synchronous = NORMAL;"); - // Single transaction for speed + // Begin single transaction for speed if (sqlite_exec(db, "BEGIN IMMEDIATE;") != SQLITE_OK) { sqlite3_close(db); fatal("Failed to begin transaction"); @@ -1355,6 +2145,7 @@ int main(int argc, char** argv) { bool ok = true; try { + // Load and ingest all enabled sources std::vector<RagSource> sources = load_sources(db); if (sources.empty()) { std::cerr << "No enabled sources found in rag_sources.\n"; @@ -1370,6 +2161,7 @@ int main(int argc, char** argv) { ok = false; } + // Commit or rollback if (ok) { if (sqlite_exec(db, "COMMIT;") != SQLITE_OK) { sqlite_exec(db, "ROLLBACK;"); @@ -1383,8 +2175,8 @@ int main(int argc, char** argv) { return 1; } + // Cleanup sqlite3_close(db); curl_global_cleanup(); return 0; } -