diff --git a/RAG_POC/Makefile b/RAG_POC/Makefile
index d97133a68..9a66bb123 100644
--- a/RAG_POC/Makefile
+++ b/RAG_POC/Makefile
@@ -6,14 +6,11 @@ ROOT_DIR := ..
INCLUDES := \
-I$(ROOT_DIR)/deps/json \
-I$(ROOT_DIR)/deps/mariadb-client-library/mariadb_client/include \
- -I$(ROOT_DIR)/deps/sqlite3/sqlite-amalgamation-3500400 \
-I$(ROOT_DIR)/deps/curl/curl/include
LIBDIRS := \
-L$(ROOT_DIR)/deps/mariadb-client-library/mariadb_client/libmariadb
-SQLITE3_OBJ := $(ROOT_DIR)/deps/sqlite3/sqlite-amalgamation-3500400/sqlite3.o
-
# Use static libcurl
CURL_STATIC_LIB := $(ROOT_DIR)/deps/curl/curl/lib/.libs/libcurl.a
@@ -27,7 +24,7 @@ SOURCES := rag_ingest.cpp
all: $(TARGET)
$(TARGET): $(SOURCES)
- $(CXX) $(CXXFLAGS) $(INCLUDES) $(LIBDIRS) $(SQLITE3_OBJ) $^ -o $@ $(LIBS)
+ $(CXX) $(CXXFLAGS) $(INCLUDES) $(LIBDIRS) $^ -o $@ $(LIBS)
clean:
rm -f $(TARGET)
diff --git a/RAG_POC/rag_ingest.cpp b/RAG_POC/rag_ingest.cpp
index cbfce8262..1ea4932d3 100644
--- a/RAG_POC/rag_ingest.cpp
+++ b/RAG_POC/rag_ingest.cpp
@@ -1,9 +1,9 @@
/**
* @file rag_ingest.cpp
- * @brief ProxySQL RAG (Retrieval-Augmented Generation) Ingestion Tool
+ * @brief ProxySQL RAG (Retrieval-Augmented Generation) Ingestion Tool - MySQL Protocol Version
*
* @verbatim
- * ProxySQL RAG Ingestion PoC (General-Purpose)
+ * ProxySQL RAG Ingestion PoC (General-Purpose) - MySQL Protocol Version
* @endverbatim
*
* @section overview Overview
@@ -13,9 +13,39 @@
* to configurable JSON specifications, chunks the content, builds full-text
* search indexes, and optionally generates vector embeddings for semantic search.
*
+ * @section architecture Architecture
+ *
+ * Two-Port Design:
+ *
+ *
+ * rag_ingest
+ * |
+ * MySQL Protocol (mariadb client)
+ * |
+ * v
+ * +------------------------+
+ * | ProxySQL SQLite3 Server| Port 6030 (default)
+ * | (MySQL Protocol |
+ * | Gateway to SQLite) |
+ * +------------------------+
+ * |
+ * | SQLite engine
+ * v
+ * +------------------------+
+ * | RAG Database |
+ * | - rag_* tables |
+ * | - FTS5 index |
+ * | - vec0 index |
+ * +------------------------+
+ *
+ * rag_sources table points to backend MySQL:
+ * - backend_host: 127.0.0.1 (default)
+ * - backend_port: 3306 (default)
+ *
+ *
* @section v0_features v0 Features
*
- * - Reads enabled sources from rag_sources table
+ * - Reads enabled sources from rag_sources table (via MySQL protocol to SQLite gateway)
* - Connects to MySQL backend and fetches data using configurable SELECT queries
* - Transforms rows using doc_map_json specification
* - Chunks document bodies using configurable chunking parameters
@@ -24,18 +54,9 @@
* - Skips documents that already exist (no upsert in v0)
* - Supports incremental sync using watermark-based cursor tracking
*
- * @section future_plans Future Plans (v1+)
- *
- * - Add hash-based change detection for efficient updates
- * - Support document updates (not just skip-if-exists)
- * - Add PostgreSQL backend support
- * - Add async embedding workers
- * - Add operational metrics and monitoring
- *
* @section dependencies Dependencies
*
- * - sqlite3: For RAG index storage
- * - mysqlclient / libmysqlclient: For MySQL backend connections
+ * - mysqlclient / mariadb-client: For MySQL protocol connections
* - libcurl: For HTTP-based embedding providers (OpenAI-compatible)
* - nlohmann/json: Single-header JSON library (json.hpp)
* - libcrypt: For sha256_crypt_r weak alias (platform compatibility)
@@ -44,35 +65,32 @@
*
* @verbatim
* g++ -std=c++17 -O2 rag_ingest.cpp -o rag_ingest \
- * -lsqlite3 -lmysqlclient -lcurl -lcrypt
+ * -lmysqlclient -lcurl -lcrypt
* @endverbatim
*
* @section usage Usage
*
* @verbatim
- * ./rag_ingest /path/to/rag_index.sqlite
- * @endverbatim
+ * # Initialize schema (SQLite Server via MySQL protocol gateway)
+ * ./rag_ingest init --host=127.0.0.1 --port=6030 --user=root --password=root --database=rag_db
*
- * The RAG_VEC0_EXT environment variable can be set to specify the path
- * to the sqlite3-vec extension:
+ * # Run ingestion
+ * ./rag_ingest ingest --host=127.0.0.1 --port=6030 --user=root --password=root --database=rag_db
*
- * @verbatim
- * export RAG_VEC0_EXT=/usr/local/lib/sqlite3-vec.so
- * ./rag_ingest /path/to/rag_index.sqlite
+ * # Short options
+ * ./rag_ingest init -h 127.0.0.1 -P 6030 -u root -p root -D rag_db
+ * ./rag_ingest ingest -h 127.0.0.1 -P 6030 -u root -p root -D rag_db
* @endverbatim
*
- * @section architecture Architecture
- *
- * @subsection ingestion_flow Ingestion Flow
+ * @section ingestion_flow Ingestion Flow
*
*
- * 1. Open SQLite RAG index database
- * 2. Load sqlite3-vec extension (if RAG_VEC0_EXT is set)
- * 3. Load enabled sources from rag_sources table
- * 4. For each source:
+ * 1. Connect to SQLite Server (via MySQL protocol on port 6030)
+ * 2. Load enabled sources from rag_sources table
+ * 3. For each source:
* a. Parse chunking_json and embedding_json configurations
* b. Load sync cursor (watermark) from rag_sync_state
- * c. Connect to MySQL backend
+ * c. Connect to MySQL backend (configured in rag_sources)
* d. Build minimal SELECT query (only fetch needed columns)
* e. Add incremental filter based on watermark
* f. For each row:
@@ -85,137 +103,14 @@
* - Insert into rag_fts_chunks (FTS5)
* - If embedding enabled: generate and insert embedding
* g. Update sync cursor with max watermark value
- * 5. Commit transaction or rollback on error
+ * 4. Commit transaction or rollback on error
*
*
- * @subsection json_specs JSON Configuration Specifications
- *
- * @subsubsection doc_map_json doc_map_json (Required)
- *
- * Defines how to transform a source row into a canonical document:
- *
- * @verbatim
- * {
- * "doc_id": { "format": "posts:{Id}" },
- * "title": { "concat": [ {"col":"Title"} ] },
- * "body": { "concat": [ {"col":"Body"} ] },
- * "metadata": {
- * "pick": ["Id","Tags","Score","CreationDate"],
- * "rename": {"CreationDate":"Created"}
- * }
- * }
- * @endverbatim
- *
- * Fields:
- * - doc_id.format: Template string with {ColumnName} placeholders
- * - title.concat: Array of column references and literals
- * - body.concat: Array of column references and literals
- * - metadata.pick: Columns to include in metadata JSON
- * - metadata.rename: Map of old_name -> new_name for metadata keys
- *
- * @subsubsection chunking_json chunking_json (Required)
- *
- * Defines how to split document bodies into chunks:
- *
- * @verbatim
- * {
- * "enabled": true,
- * "unit": "chars", // v0 only supports "chars"
- * "chunk_size": 4000, // Target chunk size in characters
- * "overlap": 400, // Overlap between consecutive chunks
- * "min_chunk_size": 800 // Minimum chunk size (avoid tiny tail chunks)
- * }
- * @endverbatim
- *
- * @subsubsection embedding_json embedding_json (Optional)
- *
- * Defines how to generate embeddings for chunks:
- *
- * @verbatim
- * {
- * "enabled": true,
- * "dim": 1536,
- * "model": "text-embedding-3-large",
- * "provider": "openai", // "stub" or "openai"
- * "api_base": "https://api.openai.com/v1",
- * "api_key": "sk-...",
- * "batch_size": 16,
- * "timeout_ms": 20000,
- * "input": { "concat": [
- * {"col":"Title"},
- * {"lit":"\nTags: "}, {"col":"Tags"},
- * {"lit":"\n\n"},
- * {"chunk_body": true}
- * ]}
- * }
- * @endverbatim
- *
- * The concat array supports:
- * - {"col":"ColumnName"}: Include column value
- * - {"lit":"literal text"}: Include literal string
- * - {"chunk_body":true}: Include the chunk body text
- *
- * @subsection concat_spec Concat Specification
- *
- * The concat specification is used in multiple places (title, body, embedding input).
- * It's a JSON array of objects, each describing one part to concatenate:
- *
- * @verbatim
- * [
- * {"col": "Title"}, // Use value from "Title" column
- * {"lit": "\nTags: "}, // Use literal string
- * {"col": "Tags"}, // Use value from "Tags" column
- * {"lit": "\n\n"},
- * {"chunk_body": true} // Use chunk body (embedding only)
- * ]
- * @endverbatim
- *
- * @subsection sqlite3_vec sqlite3-vec Integration
- *
- * The sqlite3-vec extension provides vector similarity search capabilities.
- * This program binds float32 arrays as BLOBs for the embedding column.
- *
- * The binding format may vary by sqlite3-vec build. If your build expects
- * a different format, modify bind_vec_embedding() accordingly.
- *
- * @subsection sync_cursor Sync Cursor (Watermark)
- *
- * The sync cursor enables incremental ingestion by tracking the last processed
- * value from a monotonic column (e.g., auto-increment ID, timestamp).
- *
- * Stored in rag_sync_state.cursor_json:
- * @verbatim
- * {
- * "column": "Id", // Column name for watermark
- * "value": 12345 // Last processed value
- * }
- * @endverbatim
- *
- * On next run, only rows WHERE column > value are fetched.
- *
- * @section error_handling Error Handling
- *
- * This program uses a "fail-fast" approach:
- * - JSON parsing errors are fatal (invalid configuration)
- * - Database connection errors are fatal
- * - SQL execution errors are fatal
- * - Errors during ingestion rollback the entire transaction
- *
- * All fatal errors call fatal() which prints the message and exits with code 1.
- *
- * @section threading Threading Model
- *
- * v0 is single-threaded. Future versions may support:
- * - Concurrent source ingestion
- * - Async embedding generation
- * - Parallel chunk processing
- *
* @author ProxySQL Development Team
- * @version 0.1.0
- * @date 2024
+ * @version 0.2.0 (MySQL Protocol)
+ * @date 2026
*/
-#include "sqlite3.h"
#include "mysql.h"
#include "crypt.h"
#include "curl/curl.h"
@@ -224,1931 +119,1343 @@
#include
#include
#include
+#include <cstdint>
#include
#include
#include
#include
#include
+#include
#include "json.hpp"
using json = nlohmann::json;
-/**
- * @brief Weak alias for sha256_crypt_r function
- *
- * This weak symbol alias provides compatibility across platforms where
- * sha256_crypt_r may or may not be available in libcrypt. If the
- * symbol exists in libcrypt, it will be used. Otherwise, this
- * implementation using crypt_r() will be linked.
- *
- * @param key The key/password to hash
- * @param salt The salt string
- * @param buffer Output buffer for the hashed result
- * @param buflen Size of the output buffer
- * @return Pointer to hashed result on success, nullptr on failure
- *
- * @note This is a workaround for library inconsistencies across platforms.
- * The actual hashing is delegated to crypt_r().
- */
-extern "C" __attribute__((weak)) char *sha256_crypt_r(
- const char *key,
- const char *salt,
- char *buffer,
- int buflen) {
- if (!key || !salt || !buffer || buflen <= 0) {
- return nullptr;
- }
- struct crypt_data data;
- std::memset(&data, 0, sizeof(data));
- char *res = crypt_r(key, salt, &data);
- if (!res) {
- return nullptr;
- }
- size_t len = std::strlen(res);
- if (len + 1 > static_cast<size_t>(buflen)) {
- return nullptr;
- }
- std::memcpy(buffer, res, len + 1);
- return buffer;
-}
-
// ===========================================================================
// Utility Functions
// ===========================================================================
-/**
- * @brief Print fatal error message and exit
- *
- * This function prints an error message to stderr and terminates the
- * program with exit code 1. It is used for unrecoverable errors where
- * continuing execution is impossible or unsafe.
- *
- * @param msg The error message to print
- *
- * @note This function does not return. It calls std::exit(1).
- */
static void fatal(const std::string& msg) {
- std::cerr << "FATAL: " << msg << "\n";
- std::exit(1);
+ std::cerr << "FATAL: " << msg << "\n";
+ std::exit(1);
}
-/**
- * @brief Safely convert C string to std::string
- *
- * Converts a potentially null C string pointer to a std::string.
- * If the pointer is null, returns an empty string. This prevents
- * undefined behavior from constructing std::string(nullptr).
- *
- * @param p Pointer to C string (may be null)
- * @return std::string containing the C string content, or empty string if p is null
- */
-static std::string str_or_empty(const char* p) {
- return p ? std::string(p) : std::string();
+// Helper: Convert float to hex string for SQLite BLOB (X'...')
+// Handles endianness correctly for IEEE 754 float32
+static std::string float_to_hex_blob(float f) {
+ // Use memcpy for safe type-punning (avoids strict aliasing violation)
+ uint32_t bits = 0;
+ std::memcpy(&bits, &f, sizeof(float));
+
+ // Format as little-endian hex (matches typical x86_64 architecture)
+ char buf[16];
+ snprintf(buf, sizeof(buf), "%02x%02x%02x%02x",
+ static_cast<unsigned>(bits & 0xFF),
+ static_cast<unsigned>((bits >> 8) & 0xFF),
+ static_cast<unsigned>((bits >> 16) & 0xFF),
+ static_cast<unsigned>((bits >> 24) & 0xFF));
+ return std::string(buf);
}
+// ===========================================================================
+// MySQL Connection Wrapper
+// ===========================================================================
+
/**
- * @brief Execute a SQL statement on SQLite
- *
- * Executes a SQL statement that does not return rows (e.g., INSERT,
- * UPDATE, DELETE, PRAGMA). If an error occurs, it prints the error
- * message and SQL to stderr.
+ * @brief MySQL connection wrapper for RAG database
*
- * @param db SQLite database connection
- * @param sql SQL statement to execute
- * @return int SQLite return code (SQLITE_OK on success, error code otherwise)
+ * Wraps MYSQL* connection with RAII and helper methods.
+ * The backend is SQLite, but accessed via MySQL protocol gateway.
*/
-static int sqlite_exec(sqlite3* db, const std::string& sql) {
- char* err = nullptr;
- int rc = sqlite3_exec(db, sql.c_str(), nullptr, nullptr, &err);
- if (rc != SQLITE_OK) {
- std::string e = err ? err : "(unknown sqlite error)";
- sqlite3_free(err);
- std::cerr << "SQLite error: " << e << "\nSQL: " << sql << "\n";
- }
- return rc;
+struct MySQLDB {
+ MYSQL* conn = nullptr;
+
+ // Default constructor
+ MySQLDB() = default;
+
+ // RAII: prevent copying
+ MySQLDB(const MySQLDB&) = delete;
+ MySQLDB& operator=(const MySQLDB&) = delete;
+
+ // Allow moving
+ MySQLDB(MySQLDB&& other) noexcept : conn(other.conn) {
+ other.conn = nullptr;
+ }
+
+ MySQLDB& operator=(MySQLDB&& other) noexcept {
+ if (this != &other) {
+ if (conn) mysql_close(conn);
+ conn = other.conn;
+ other.conn = nullptr;
+ }
+ return *this;
+ }
+
+ ~MySQLDB() {
+ if (conn) mysql_close(conn);
+ }
+
+ /**
+ * @brief Connect to MySQL server
+ * @param host Server hostname or IP
+ * @param port Server port
+ * @param user Username
+ * @param pass Password
+ * @param db Database name
+ */
+ void connect(const char* host, int port, const char* user,
+ const char* pass, const char* db) {
+ conn = mysql_init(nullptr);
+ if (!conn) fatal("mysql_init failed");
+
+ mysql_options(conn, MYSQL_SET_CHARSET_NAME, "utf8mb4");
+
+ if (!mysql_real_connect(conn, host, user, pass, db, port, nullptr, 0)) {
+ fatal(std::string("MySQL connect failed: ") + mysql_error(conn));
+ }
+ }
+
+ /**
+ * @brief Execute a simple SQL statement
+ * @param sql SQL statement to execute
+ */
+ void execute(const char* sql) {
+ if (mysql_query(conn, sql) != 0) {
+ std::cerr << "MySQL error: " << mysql_error(conn) << "\nSQL: " << sql << "\n";
+ fatal("Query failed");
+ }
+ }
+
+ /**
+ * @brief Execute SQL statement and return true on success, false on error
+ * @param sql SQL statement to execute
+ * @return true if successful, false otherwise
+ */
+ bool try_execute(const char* sql) {
+ if (mysql_query(conn, sql) != 0) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * @brief Execute query and return result
+ * @param sql SQL query to execute
+ * @return MYSQL_RES* Result set (caller must free with mysql_free_result)
+ */
+ MYSQL_RES* query(const char* sql) {
+ if (mysql_query(conn, sql) != 0) {
+ fatal(std::string("MySQL query failed: ") + mysql_error(conn) + "\nSQL: " + sql);
+ }
+ MYSQL_RES* res = mysql_store_result(conn);
+ if (!res) {
+ fatal(std::string("mysql_store_result failed: ") + mysql_error(conn));
+ }
+ return res;
+ }
+};
+
+// ===========================================================================
+// Utility Functions
+// ===========================================================================
+
+static std::string str_or_empty(const char* p) {
+ return p ? std::string(p) : std::string();
}
-/**
- * @brief Check if a string represents an integer
- *
- * Determines if the given string consists only of digits, optionally
- * preceded by a minus sign. Used for watermark value type detection.
- *
- * @param s String to check
- * @return true if string is a valid integer representation, false otherwise
- */
static bool is_integer_string(const std::string& s) {
- if (s.empty()) return false;
- size_t i = 0;
- if (s[0] == '-') {
- if (s.size() == 1) return false;
- i = 1;
- }
- for (; i < s.size(); i++) {
- if (s[i] < '0' || s[i] > '9') return false;
- }
- return true;
+ if (s.empty()) return false;
+ size_t i = 0;
+ if (s[0] == '-') {
+ if (s.size() == 1) return false;
+ i = 1;
+ }
+ for (; i < s.size(); i++) {
+ if (s[i] < '0' || s[i] > '9') return false;
+ }
+ return true;
}
-/**
- * @brief Escape single quotes in SQL string literals
- *
- * Doubles single quotes in a string for safe SQL literal construction.
- * This is used for building incremental filter conditions.
- *
- * @param s Input string to escape
- * @return std::string with single quotes doubled (e.g., "it's" -> "it''s")
- *
- * @note This is basic escaping. For user input, prefer parameter binding.
- */
static std::string sql_escape_single_quotes(const std::string& s) {
- std::string out;
- out.reserve(s.size() + 8);
- for (char c : s) {
- if (c == '\'') out.push_back('\'');
- out.push_back(c);
- }
- return out;
+ std::string out;
+ out.reserve(s.size() * 2); // Reserve more space for escapes
+ for (char c : s) {
+ if (c == '\'') {
+ out.push_back('\''); // Escape single quote as ''
+ out.push_back('\'');
+ } else if (c == '\\') {
+ out.push_back('\\'); // Escape backslash as \\
+ out.push_back('\\');
+ } else {
+ out.push_back(c);
+ }
+ }
+ return out;
}
-/**
- * @brief Serialize JSON to compact string
- *
- * Converts a JSON object to a compact string representation without
- * whitespace or pretty-printing. Used for efficient storage.
- *
- * @param j JSON object to serialize
- * @return std::string Compact JSON representation
- */
static std::string json_dump_compact(const json& j) {
- return j.dump();
-}
-
-/**
- * @brief Load sqlite3-vec extension if configured
- *
- * Loads the sqlite3-vec extension from the path specified in the
- * RAG_VEC0_EXT environment variable. This is required for vector
- * similarity search functionality.
- *
- * @param db SQLite database connection
- *
- * @note If RAG_VEC0_EXT is not set or empty, this function does nothing.
- * The vec0 extension is only required when embedding generation is enabled.
- */
-static void sqlite_load_vec_extension(sqlite3* db) {
- const char* ext = std::getenv("RAG_VEC0_EXT");
- if (!ext || std::strlen(ext) == 0) return;
-
- sqlite3_enable_load_extension(db, 1);
- char* err = nullptr;
- int rc = sqlite3_load_extension(db, ext, nullptr, &err);
- if (rc != SQLITE_OK) {
- std::string e = err ? err : "(unknown error)";
- sqlite3_free(err);
- fatal("Failed to load vec0 extension: " + e + " (" + std::string(ext) + ")");
- }
+ return j.dump();
}
// ===========================================================================
// Data Structures
// ===========================================================================
-/**
- * @brief Configuration for a single RAG data source
- *
- * Represents all configuration needed to ingest data from a single
- * external source into the RAG index. Loaded from the rag_sources table.
- */
struct RagSource {
- int source_id = 0; ///< Unique source identifier from database
- std::string name; ///< Human-readable source name
- int enabled = 0; ///< Whether this source is enabled (0/1)
-
- // Backend connection settings
- std::string backend_type; ///< Type of backend ("mysql" in v0)
- std::string host; ///< Backend server hostname or IP
- int port = 3306; ///< Backend server port
- std::string user; ///< Backend authentication username
- std::string pass; ///< Backend authentication password
- std::string db; ///< Backend database name
-
- // Source table settings
- std::string table_name; ///< Name of the table to read from
- std::string pk_column; ///< Primary key column name
- std::string where_sql; ///< Optional WHERE clause for row filtering
-
- // Transformation configuration (JSON)
- json doc_map_json; ///< Document transformation specification
- json chunking_json; ///< Chunking configuration
- json embedding_json; ///< Embedding configuration (may be null)
+ int source_id = 0;
+ std::string name;
+ int enabled = 0;
+
+ std::string backend_type;
+ std::string host;
+ int port = 3306;
+ std::string user;
+ std::string pass;
+ std::string db;
+
+ std::string table_name;
+ std::string pk_column;
+ std::string where_sql;
+
+ json doc_map_json;
+ json chunking_json;
+ json embedding_json;
};
-/**
- * @brief Parsed chunking configuration
- *
- * Controls how document bodies are split into chunks for indexing.
- * Chunks improve retrieval precision by matching smaller, more
- * focused text segments.
- */
struct ChunkingConfig {
- bool enabled = true; ///< Whether chunking is enabled
- std::string unit = "chars"; ///< Unit of measurement ("chars" in v0)
- int chunk_size = 4000; ///< Target size of each chunk
- int overlap = 400; ///< Overlap between consecutive chunks
- int min_chunk_size = 800; ///< Minimum size to avoid tiny tail chunks
+ bool enabled = true;
+ std::string unit = "chars";
+ int chunk_size = 4000;
+ int overlap = 400;
+ int min_chunk_size = 800;
};
-/**
- * @brief Parsed embedding configuration
- *
- * Controls how vector embeddings are generated for chunks.
- * Embeddings enable semantic similarity search.
- */
struct EmbeddingConfig {
- bool enabled = false; ///< Whether embedding generation is enabled
- int dim = 1536; ///< Vector dimension (model-specific)
- std::string model = "unknown"; ///< Model name (for observability)
- json input_spec; ///< Embedding input template {"concat":[...]}
- std::string provider = "stub"; ///< Provider type: "stub" or "openai"
- std::string api_base; ///< API endpoint URL (for "openai" provider)
- std::string api_key; ///< API authentication key
- int batch_size = 16; ///< Maximum inputs per API call (unused in v0)
- int timeout_ms = 20000; ///< Request timeout in milliseconds
+ bool enabled = false;
+ int dim = 1536;
+ std::string model = "unknown";
+ json input_spec;
+ std::string provider = "stub";
+ std::string api_base;
+ std::string api_key;
+ int batch_size = 16;
+ int timeout_ms = 20000;
};
-/**
- * @brief Sync cursor for incremental ingestion
- *
- * Tracks the last processed watermark value for incremental sync.
- * Only rows with watermark > cursor.value are fetched on subsequent runs.
- */
struct SyncCursor {
- std::string column; ///< Column name used for watermark
- bool has_value = false; ///< Whether a cursor value has been set
- bool numeric = false; ///< Whether the value is numeric (vs string)
- std::int64_t num_value = 0; ///< Numeric watermark value (if numeric=true)
- std::string str_value; ///< String watermark value (if numeric=false)
+ std::string column;
+ bool has_value = false;
+ bool numeric = false;
+ std::int64_t num_value = 0;
+ std::string str_value;
};
-/**
- * @brief Row representation from MySQL backend
- *
- * A map from column name to string value. All MySQL values are
- * represented as strings for simplicity; conversion happens
- * during document building.
- */
typedef std::unordered_map<std::string, std::string> RowMap;
-/**
- * @brief Pending embedding for batched processing
- *
- * Holds metadata for a chunk that needs embedding generation.
- * Used to batch multiple chunks together for efficient API calls.
- */
struct PendingEmbedding {
- std::string chunk_id; ///< Unique chunk identifier (doc_id#index)
- std::string doc_id; ///< Parent document identifier
- int source_id; ///< Source identifier
- std::string input_text; ///< Text to embed (already built via build_embedding_input)
+ std::string chunk_id;
+ std::string doc_id;
+ int source_id;
+ std::string input_text;
};
// ===========================================================================
// JSON Parsing Functions
// ===========================================================================
-/**
- * @brief Parse chunking_json into ChunkingConfig struct
- *
- * Extracts and validates chunking configuration from JSON.
- * Applies sensible defaults for missing or invalid values.
- *
- * @param j JSON object containing chunking configuration
- * @return ChunkingConfig Parsed configuration with defaults applied
- *
- * @note Only "chars" unit is supported in v0. Other units fall back to "chars".
- */
static ChunkingConfig parse_chunking_json(const json& j) {
- ChunkingConfig cfg;
- if (!j.is_object()) return cfg;
-
- if (j.contains("enabled")) cfg.enabled = j["enabled"].get<bool>();
- if (j.contains("unit")) cfg.unit = j["unit"].get<std::string>();
- if (j.contains("chunk_size")) cfg.chunk_size = j["chunk_size"].get<int>();
- if (j.contains("overlap")) cfg.overlap = j["overlap"].get<int>();
- if (j.contains("min_chunk_size")) cfg.min_chunk_size = j["min_chunk_size"].get<int>();
-
- // Apply sanity checks and defaults
- if (cfg.chunk_size <= 0) cfg.chunk_size = 4000;
- if (cfg.overlap < 0) cfg.overlap = 0;
- if (cfg.overlap >= cfg.chunk_size) cfg.overlap = cfg.chunk_size / 4;
- if (cfg.min_chunk_size < 0) cfg.min_chunk_size = 0;
-
- // v0 only supports chars
- if (cfg.unit != "chars") {
- std::cerr << "WARN: chunking_json.unit=" << cfg.unit
- << " not supported in v0. Falling back to chars.\n";
- cfg.unit = "chars";
- }
-
- return cfg;
+ ChunkingConfig cfg;
+ if (!j.is_object()) return cfg;
+
+ if (j.contains("enabled")) cfg.enabled = j["enabled"].get<bool>();
+ if (j.contains("unit")) cfg.unit = j["unit"].get<std::string>();
+ if (j.contains("chunk_size")) cfg.chunk_size = j["chunk_size"].get<int>();
+ if (j.contains("overlap")) cfg.overlap = j["overlap"].get<int>();
+ if (j.contains("min_chunk_size")) cfg.min_chunk_size = j["min_chunk_size"].get<int>();
+
+ if (cfg.chunk_size <= 0) cfg.chunk_size = 4000;
+ if (cfg.overlap < 0) cfg.overlap = 0;
+ if (cfg.overlap >= cfg.chunk_size) cfg.overlap = cfg.chunk_size / 4;
+ if (cfg.min_chunk_size < 0) cfg.min_chunk_size = 0;
+
+ if (cfg.unit != "chars") {
+ std::cerr << "WARN: chunking_json.unit=" << cfg.unit
+ << " not supported in v0. Falling back to chars.\n";
+ cfg.unit = "chars";
+ }
+
+ return cfg;
}
-/**
- * @brief Parse embedding_json into EmbeddingConfig struct
- *
- * Extracts and validates embedding configuration from JSON.
- * Applies sensible defaults for missing or invalid values.
- *
- * @param j JSON object containing embedding configuration (may be null)
- * @return EmbeddingConfig Parsed configuration with defaults applied
- */
static EmbeddingConfig parse_embedding_json(const json& j) {
- EmbeddingConfig cfg;
- if (!j.is_object()) return cfg;
-
- if (j.contains("enabled")) cfg.enabled = j["enabled"].get<bool>();
- if (j.contains("dim")) cfg.dim = j["dim"].get<int>();
- if (j.contains("model")) cfg.model = j["model"].get<std::string>();
- if (j.contains("input")) cfg.input_spec = j["input"];
- if (j.contains("provider")) cfg.provider = j["provider"].get<std::string>();
- if (j.contains("api_base")) cfg.api_base = j["api_base"].get<std::string>();
- if (j.contains("api_key")) cfg.api_key = j["api_key"].get<std::string>();
- if (j.contains("batch_size")) cfg.batch_size = j["batch_size"].get<int>();
- if (j.contains("timeout_ms")) cfg.timeout_ms = j["timeout_ms"].get<int>();
-
- // Apply defaults
- if (cfg.dim <= 0) cfg.dim = 1536;
- if (cfg.batch_size <= 0) cfg.batch_size = 16;
- if (cfg.timeout_ms <= 0) cfg.timeout_ms = 20000;
- return cfg;
+ EmbeddingConfig cfg;
+ if (!j.is_object()) return cfg;
+
+ if (j.contains("enabled")) cfg.enabled = j["enabled"].get<bool>();
+ if (j.contains("dim")) cfg.dim = j["dim"].get<int>();
+ if (j.contains("model")) cfg.model = j["model"].get<std::string>();
+ if (j.contains("input")) cfg.input_spec = j["input"];
+ if (j.contains("provider")) cfg.provider = j["provider"].get<std::string>();
+ if (j.contains("api_base")) cfg.api_base = j["api_base"].get<std::string>();
+ if (j.contains("api_key")) cfg.api_key = j["api_key"].get<std::string>();
+ if (j.contains("batch_size")) cfg.batch_size = j["batch_size"].get<int>();
+ if (j.contains("timeout_ms")) cfg.timeout_ms = j["timeout_ms"].get<int>();
+
+ if (cfg.dim <= 0) cfg.dim = 1536;
+ if (cfg.batch_size <= 0) cfg.batch_size = 16;
+ if (cfg.timeout_ms <= 0) cfg.timeout_ms = 20000;
+ return cfg;
}
// ===========================================================================
// Row Access Helpers
// ===========================================================================
-/**
- * @brief Get value from RowMap by key
- *
- * Safely retrieves a value from a RowMap (column name -> value mapping).
- * Returns std::nullopt if the key is not present, allowing the caller
- * to distinguish between missing keys and empty values.
- *
- * @param row The RowMap to query
- * @param key Column name to look up
- * @return std::optional<std::string> Value if present, nullopt otherwise
- */
static std::optional<std::string> row_get(const RowMap& row, const std::string& key) {
- auto it = row.find(key);
- if (it == row.end()) return std::nullopt;
- return it->second;
+ auto it = row.find(key);
+ if (it == row.end()) return std::nullopt;
+ return it->second;
}
// ===========================================================================
// Format String Template Engine
// ===========================================================================
-/**
- * @brief Apply format template with row data substitution
- *
- * Replaces {ColumnName} placeholders in a format string with actual
- * values from the row. Used for generating document IDs.
- *
- * Example: "posts:{Id}" with row["Id"] = "123" becomes "posts:123"
- *
- * @param fmt Format string with {ColumnName} placeholders
- * @param row Row data for substitution
- * @return std::string Formatted string with substitutions applied
- *
- * @note Unmatched '{' characters are treated as literals.
- * Missing column names result in empty substitution.
- */
static std::string apply_format(const std::string& fmt, const RowMap& row) {
- std::string out;
- out.reserve(fmt.size() + 32);
-
- for (size_t i = 0; i < fmt.size(); i++) {
- char c = fmt[i];
- if (c == '{') {
- size_t j = fmt.find('}', i + 1);
- if (j == std::string::npos) {
- // unmatched '{' -> treat as literal
- out.push_back(c);
- continue;
- }
- std::string col = fmt.substr(i + 1, j - (i + 1));
- auto v = row_get(row, col);
- if (v.has_value()) out += v.value();
- i = j; // jump past '}'
- } else {
- out.push_back(c);
+ std::string out;
+ out.reserve(fmt.size() + 32);
+
+ for (size_t i = 0; i < fmt.size(); i++) {
+ char c = fmt[i];
+ if (c == '{') {
+ size_t j = fmt.find('}', i + 1);
+ if (j == std::string::npos) {
+ out.push_back(c);
+ continue;
+ }
+ std::string col = fmt.substr(i + 1, j - (i + 1));
+ auto v = row_get(row, col);
+ if (v.has_value()) out += v.value();
+ i = j;
+ } else {
+ out.push_back(c);
+ }
}
- }
- return out;
+ return out;
}
// ===========================================================================
// Concat Specification Evaluator
// ===========================================================================
-/**
- * @brief Evaluate concat specification to build string
- *
- * Processes a concat specification (JSON array) to build a string by
- * concatenating column values, literals, and optionally the chunk body.
- *
- * Supported concat elements:
- * - {"col":"ColumnName"} - Include value from column
- * - {"lit":"literal"} - Include literal string
- * - {"chunk_body":true} - Include chunk body (only for embedding input)
- *
- * @param concat_spec JSON array with concat specification
- * @param row Row data for column substitutions
- * @param chunk_body Chunk body text (used if allow_chunk_body=true)
- * @param allow_chunk_body Whether to allow chunk_body references
- * @return std::string Concatenated result
- */
static std::string eval_concat(const json& concat_spec,
const RowMap& row,
const std::string& chunk_body,
bool allow_chunk_body) {
- if (!concat_spec.is_array()) return "";
-
- std::string out;
- for (const auto& part : concat_spec) {
- if (!part.is_object()) continue;
-
- if (part.contains("col")) {
- // Column reference: {"col":"ColumnName"}
- std::string col = part["col"].get();
- auto v = row_get(row, col);
- if (v.has_value()) out += v.value();
- } else if (part.contains("lit")) {
- // Literal string: {"lit":"some text"}
- out += part["lit"].get();
- } else if (allow_chunk_body && part.contains("chunk_body")) {
- // Chunk body reference: {"chunk_body":true}
- bool yes = part["chunk_body"].get();
- if (yes) out += chunk_body;
+ if (!concat_spec.is_array()) return "";
+
+ std::string out;
+ for (const auto& part : concat_spec) {
+ if (!part.is_object()) continue;
+
+ if (part.contains("col")) {
+ std::string col = part["col"].get();
+ auto v = row_get(row, col);
+ if (v.has_value()) out += v.value();
+ } else if (part.contains("lit")) {
+ out += part["lit"].get();
+ } else if (allow_chunk_body && part.contains("chunk_body")) {
+ bool yes = part["chunk_body"].get();
+ if (yes) out += chunk_body;
+ }
}
- }
- return out;
+ return out;
}
// ===========================================================================
// Metadata Builder
// ===========================================================================
-/**
- * @brief Build metadata JSON from row using specification
- *
- * Creates a metadata JSON object by picking specified columns from
- * the row and optionally renaming keys.
- *
- * @param meta_spec Metadata specification with "pick" and "rename" keys
- * @param row Source row data
- * @return json Metadata object with picked and renamed fields
- *
- * The meta_spec format:
- * @verbatim
- * {
- * "pick": ["Col1", "Col2"], // Columns to include
- * "rename": {"Col1": "col1"} // Old name -> new name mapping
- * }
- * @endverbatim
- */
static json build_metadata(const json& meta_spec, const RowMap& row) {
- json meta = json::object();
-
- if (meta_spec.is_object()) {
- // Pick specified fields
- if (meta_spec.contains("pick") && meta_spec["pick"].is_array()) {
- for (const auto& colv : meta_spec["pick"]) {
- if (!colv.is_string()) continue;
- std::string col = colv.get();
- auto v = row_get(row, col);
- if (v.has_value()) meta[col] = v.value();
- }
- }
+ json meta = json::object();
+
+ if (meta_spec.is_object()) {
+ if (meta_spec.contains("pick") && meta_spec["pick"].is_array()) {
+ for (const auto& colv : meta_spec["pick"]) {
+ if (!colv.is_string()) continue;
+ std::string col = colv.get();
+ auto v = row_get(row, col);
+ if (v.has_value()) meta[col] = v.value();
+ }
+ }
- // Rename keys (must be done in two passes to avoid iterator invalidation)
- if (meta_spec.contains("rename") && meta_spec["rename"].is_object()) {
- std::vector> renames;
- for (auto it = meta_spec["rename"].begin(); it != meta_spec["rename"].end(); ++it) {
- if (!it.value().is_string()) continue;
- renames.push_back({it.key(), it.value().get()});
- }
- for (size_t i = 0; i < renames.size(); i++) {
- const std::string& oldk = renames[i].first;
- const std::string& newk = renames[i].second;
- if (meta.contains(oldk)) {
- meta[newk] = meta[oldk];
- meta.erase(oldk);
+ if (meta_spec.contains("rename") && meta_spec["rename"].is_object()) {
+ std::vector> renames;
+ for (auto it = meta_spec["rename"].begin(); it != meta_spec["rename"].end(); ++it) {
+ if (!it.value().is_string()) continue;
+ renames.push_back({it.key(), it.value().get()});
+ }
+ for (size_t i = 0; i < renames.size(); i++) {
+ const std::string& oldk = renames[i].first;
+ const std::string& newk = renames[i].second;
+ if (meta.contains(oldk)) {
+ meta[newk] = meta[oldk];
+ meta.erase(oldk);
+ }
+ }
}
- }
}
- }
- return meta;
+ return meta;
}
// ===========================================================================
// Text Chunking
// ===========================================================================
-/**
- * @brief Split text into chunks based on character count
- *
- * Divides a text string into overlapping chunks of approximately
- * chunk_size characters. Chunks overlap by 'overlap' characters to
- * preserve context at boundaries. Tiny final chunks are appended
- * to the previous chunk to avoid fragmentation.
- *
- * @param text Input text to chunk
- * @param cfg Chunking configuration
- * @return std::vector Vector of chunk strings
- *
- * @note If chunking is disabled, returns a single chunk containing the full text.
- * @note If text is smaller than chunk_size, returns a single chunk.
- */
static std::vector chunk_text_chars(const std::string& text, const ChunkingConfig& cfg) {
- std::vector chunks;
+ std::vector chunks;
- if (!cfg.enabled) {
- chunks.push_back(text);
- return chunks;
- }
+ if (!cfg.enabled) {
+ chunks.push_back(text);
+ return chunks;
+ }
- if ((int)text.size() <= cfg.chunk_size) {
- chunks.push_back(text);
- return chunks;
- }
+ if ((int)text.size() <= cfg.chunk_size) {
+ chunks.push_back(text);
+ return chunks;
+ }
- int step = cfg.chunk_size - cfg.overlap;
- if (step <= 0) step = cfg.chunk_size;
+ int step = cfg.chunk_size - cfg.overlap;
+ if (step <= 0) step = cfg.chunk_size;
- for (int start = 0; start < (int)text.size(); start += step) {
- int end = start + cfg.chunk_size;
- if (end > (int)text.size()) end = (int)text.size();
- int len = end - start;
- if (len <= 0) break;
+ for (int start = 0; start < (int)text.size(); start += step) {
+ int end = start + cfg.chunk_size;
+ if (end > (int)text.size()) end = (int)text.size();
+ int len = end - start;
+ if (len <= 0) break;
- // Avoid tiny final chunk by appending it to the previous chunk
- if (len < cfg.min_chunk_size && !chunks.empty()) {
- chunks.back() += text.substr(start, len);
- break;
- }
+ if (len < cfg.min_chunk_size && !chunks.empty()) {
+ chunks.back() += text.substr(start, len);
+ break;
+ }
- chunks.push_back(text.substr(start, len));
+ chunks.push_back(text.substr(start, len));
- if (end == (int)text.size()) break;
- }
+ if (end == (int)text.size()) break;
+ }
- return chunks;
+ return chunks;
}
// ===========================================================================
// MySQL Backend Functions
// ===========================================================================
-/**
- * @brief Connect to MySQL backend or terminate
- *
- * Establishes a connection to a MySQL server using the provided
- * source configuration. Sets charset to utf8mb4 for proper handling
- * of Unicode content (e.g., emojis, international text).
- *
- * @param s Source configuration with connection parameters
- * @return MYSQL* MySQL connection handle
- *
- * @note On failure, prints error message and calls fatal() to exit.
- */
static MYSQL* mysql_connect_or_die(const RagSource& s) {
- MYSQL* conn = mysql_init(nullptr);
- if (!conn) fatal("mysql_init failed");
-
- // Set utf8mb4 for safety with StackOverflow-like content
- mysql_options(conn, MYSQL_SET_CHARSET_NAME, "utf8mb4");
-
- if (!mysql_real_connect(conn,
- s.host.c_str(),
- s.user.c_str(),
- s.pass.c_str(),
- s.db.c_str(),
- s.port,
- nullptr,
- 0)) {
- std::string err = mysql_error(conn);
- mysql_close(conn);
- fatal("MySQL connect failed: " + err);
- }
- return conn;
+ MYSQL* conn = mysql_init(nullptr);
+ if (!conn) fatal("mysql_init failed");
+
+ mysql_options(conn, MYSQL_SET_CHARSET_NAME, "utf8mb4");
+
+ if (!mysql_real_connect(conn,
+ s.host.c_str(),
+ s.user.c_str(),
+ s.pass.c_str(),
+ s.db.c_str(),
+ s.port,
+ nullptr,
+ 0)) {
+ std::string err = mysql_error(conn);
+ mysql_close(conn);
+ fatal("MySQL connect failed: " + err);
+ }
+ return conn;
}
-/**
- * @brief Convert MySQL result row to RowMap
- *
- * Transforms a raw MySQL row into a column-name -> value map.
- * All values are converted to strings; NULL values become empty strings.
- *
- * @param res MySQL result set (for field metadata)
- * @param row Raw MySQL row data
- * @return RowMap Map of column names to string values
- *
- * @note Uses str_or_empty() to safely handle NULL column values.
- */
static RowMap mysql_row_to_map(MYSQL_RES* res, MYSQL_ROW row) {
- RowMap m;
- unsigned int n = mysql_num_fields(res);
- MYSQL_FIELD* fields = mysql_fetch_fields(res);
-
- for (unsigned int i = 0; i < n; i++) {
- const char* name = fields[i].name;
- const char* val = row[i];
- if (name) {
- m[name] = str_or_empty(val);
+ RowMap m;
+ unsigned int n = mysql_num_fields(res);
+ MYSQL_FIELD* fields = mysql_fetch_fields(res);
+
+ for (unsigned int i = 0; i < n; i++) {
+ const char* name = fields[i].name;
+ const char* val = row[i];
+ if (name) {
+ m[name] = str_or_empty(val);
+ }
}
- }
- return m;
+ return m;
}
// ===========================================================================
// Column Collection
// ===========================================================================
/**
 * @brief Append a column name to the vector only if not already present.
 *
 * Keeps the column list duplicate-free while preserving insertion order;
 * used when assembling the minimal SELECT column list.
 *
 * @param cols Vector of column names (modified in-place)
 * @param c Column name to add if absent
 */
static void add_unique(std::vector<std::string>& cols, const std::string& c) {
    for (size_t i = 0; i < cols.size(); i++) {
        if (cols[i] == c) return;
    }
    cols.push_back(c);
}
-/**
- * @brief Extract column names from concat specification
- *
- * Parses a concat specification and adds all referenced column names
- * to the output vector. Used to build the minimal SELECT query.
- *
- * @param cols Vector of column names (modified in-place)
- * @param concat_spec JSON concat specification to scan
- */
static void collect_cols_from_concat(std::vector& cols, const json& concat_spec) {
- if (!concat_spec.is_array()) return;
- for (const auto& part : concat_spec) {
- if (part.is_object() && part.contains("col") && part["col"].is_string()) {
- add_unique(cols, part["col"].get());
+ if (!concat_spec.is_array()) return;
+ for (const auto& part : concat_spec) {
+ if (part.is_object() && part.contains("col") && part["col"].is_string()) {
+ add_unique(cols, part["col"].get());
+ }
}
- }
}
-/**
- * @brief Collect all columns needed for source ingestion
- *
- * Analyzes doc_map_json and embedding_json to determine which columns
- * must be fetched from the source table. Builds a minimal SELECT query
- * by only including required columns.
- *
- * @param s Source configuration with JSON specifications
- * @param ecfg Embedding configuration
- * @return std::vector List of required column names
- *
- * @note The primary key column is always included.
- * @note Columns in doc_id.format must be manually included in metadata.pick or concat.
- */
static std::vector collect_needed_columns(const RagSource& s, const EmbeddingConfig& ecfg) {
- std::vector cols;
- add_unique(cols, s.pk_column);
-
- // title/body concat
- if (s.doc_map_json.contains("title") && s.doc_map_json["title"].contains("concat"))
- collect_cols_from_concat(cols, s.doc_map_json["title"]["concat"]);
- if (s.doc_map_json.contains("body") && s.doc_map_json["body"].contains("concat"))
- collect_cols_from_concat(cols, s.doc_map_json["body"]["concat"]);
-
- // metadata.pick
- if (s.doc_map_json.contains("metadata") && s.doc_map_json["metadata"].contains("pick")) {
- const auto& pick = s.doc_map_json["metadata"]["pick"];
- if (pick.is_array()) {
- for (const auto& c : pick) if (c.is_string()) add_unique(cols, c.get());
+ std::vector cols;
+ add_unique(cols, s.pk_column);
+
+ if (s.doc_map_json.contains("title") && s.doc_map_json["title"].contains("concat"))
+ collect_cols_from_concat(cols, s.doc_map_json["title"]["concat"]);
+ if (s.doc_map_json.contains("body") && s.doc_map_json["body"].contains("concat"))
+ collect_cols_from_concat(cols, s.doc_map_json["body"]["concat"]);
+
+ if (s.doc_map_json.contains("metadata") && s.doc_map_json["metadata"].contains("pick")) {
+ const auto& pick = s.doc_map_json["metadata"]["pick"];
+ if (pick.is_array()) {
+ for (const auto& c : pick) if (c.is_string()) add_unique(cols, c.get());
+ }
}
- }
- // embedding input concat (optional)
- if (ecfg.enabled && ecfg.input_spec.is_object() && ecfg.input_spec.contains("concat")) {
- collect_cols_from_concat(cols, ecfg.input_spec["concat"]);
- }
+ if (ecfg.enabled && ecfg.input_spec.is_object() && ecfg.input_spec.contains("concat")) {
+ collect_cols_from_concat(cols, ecfg.input_spec["concat"]);
+ }
- return cols;
+ return cols;
}
-/**
- * @brief Build SELECT SQL query for source
- *
- * Constructs a SELECT query that fetches only the required columns
- * from the source table, with optional WHERE clause combining the
- * source's where_sql and an incremental filter.
- *
- * @param s Source configuration
- * @param cols List of column names to SELECT
- * @param extra_filter Additional WHERE clause (e.g., incremental filter)
- * @return std::string Complete SELECT SQL statement
- */
static std::string build_select_sql(const RagSource& s,
const std::vector& cols,
const std::string& extra_filter) {
- std::string sql = "SELECT ";
- for (size_t i = 0; i < cols.size(); i++) {
- if (i) sql += ", ";
- sql += "`" + cols[i] + "`";
- }
- sql += " FROM `" + s.table_name + "`";
- if (!s.where_sql.empty() || !extra_filter.empty()) {
- sql += " WHERE ";
- if (!s.where_sql.empty()) {
- sql += "(" + s.where_sql + ")";
- if (!extra_filter.empty()) sql += " AND ";
+ std::string sql = "SELECT ";
+ for (size_t i = 0; i < cols.size(); i++) {
+ if (i) sql += ", ";
+ sql += "`" + cols[i] + "`";
}
- if (!extra_filter.empty()) sql += "(" + extra_filter + ")";
- }
- return sql;
+ sql += " FROM `" + s.table_name + "`";
+ if (!s.where_sql.empty() || !extra_filter.empty()) {
+ sql += " WHERE ";
+ if (!s.where_sql.empty()) {
+ sql += "(" + s.where_sql + ")";
+ if (!extra_filter.empty()) sql += " AND ";
+ }
+ if (!extra_filter.empty()) sql += "(" + extra_filter + ")";
+ }
+ return sql;
}
// ===========================================================================
// Sync Cursor (Watermark) Management
// ===========================================================================
-// Forward declarations
-static void sqlite_prepare_or_die(sqlite3* db, sqlite3_stmt** st, const char* sql);
-static void bind_text(sqlite3_stmt* st, int idx, const std::string& v);
+static json load_sync_cursor_json(MySQLDB& db, int source_id) {
+ char sql[256];
+ snprintf(sql, sizeof(sql), "SELECT cursor_json FROM rag_sync_state WHERE source_id=%d", source_id);
-/**
- * @brief Load sync cursor JSON from database
- *
- * Retrieves the cursor_json for a source from rag_sync_state.
- * Returns empty object if no cursor exists or on error.
- *
- * @param db SQLite database connection
- * @param source_id Source identifier
- * @return json Cursor JSON object (empty if not found)
- */
-static json load_sync_cursor_json(sqlite3* db, int source_id) {
- sqlite3_stmt* st = nullptr;
- json out = json::object();
- const char* sql = "SELECT cursor_json FROM rag_sync_state WHERE source_id=?";
- if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) != SQLITE_OK) {
- return out;
- }
- sqlite3_bind_int(st, 1, source_id);
- int rc = sqlite3_step(st);
- if (rc == SQLITE_ROW) {
- const unsigned char* txt = sqlite3_column_text(st, 0);
- if (txt) {
- try {
- out = json::parse(reinterpret_cast(txt));
- } catch (...) {
- out = json::object();
- }
+ MYSQL_RES* res = db.query(sql);
+ json out = json::object();
+
+ MYSQL_ROW row = mysql_fetch_row(res);
+ if (row && row[0]) {
+ try {
+ out = json::parse(row[0]);
+ } catch (...) {
+ out = json::object();
+ }
}
- }
- sqlite3_finalize(st);
- if (!out.is_object()) out = json::object();
- return out;
+
+ mysql_free_result(res);
+ if (!out.is_object()) out = json::object();
+ return out;
}
-/**
- * @brief Parse sync cursor JSON into SyncCursor struct
- *
- * Extracts cursor configuration from JSON and determines the
- * value type (numeric vs string). Used for incremental ingestion.
- *
- * @param cursor_json JSON cursor configuration
- * @param default_col Default column name if not specified
- * @return SyncCursor Parsed cursor configuration
- */
static SyncCursor parse_sync_cursor(const json& cursor_json, const std::string& default_col) {
- SyncCursor c;
- c.column = default_col;
- if (cursor_json.is_object()) {
- if (cursor_json.contains("column") && cursor_json["column"].is_string()) {
- c.column = cursor_json["column"].get();
- }
- if (cursor_json.contains("value")) {
- const auto& v = cursor_json["value"];
- if (v.is_number_integer()) {
- c.has_value = true;
- c.numeric = true;
- c.num_value = v.get();
- } else if (v.is_number_float()) {
- c.has_value = true;
- c.numeric = true;
- c.num_value = static_cast(v.get());
- } else if (v.is_string()) {
- c.has_value = true;
- c.str_value = v.get();
- if (is_integer_string(c.str_value)) {
- c.numeric = true;
- c.num_value = std::stoll(c.str_value);
+ SyncCursor c;
+ c.column = default_col;
+ if (cursor_json.is_object()) {
+ if (cursor_json.contains("column") && cursor_json["column"].is_string()) {
+ c.column = cursor_json["column"].get();
+ }
+ if (cursor_json.contains("value")) {
+ const auto& v = cursor_json["value"];
+ if (v.is_number_integer()) {
+ c.has_value = true;
+ c.numeric = true;
+ c.num_value = v.get();
+ } else if (v.is_number_float()) {
+ c.has_value = true;
+ c.numeric = true;
+ c.num_value = static_cast(v.get());
+ } else if (v.is_string()) {
+ c.has_value = true;
+ c.str_value = v.get();
+ if (is_integer_string(c.str_value)) {
+ c.numeric = true;
+ c.num_value = std::stoll(c.str_value);
+ }
+ }
}
- }
}
- }
- return c;
+ return c;
}
-/**
- * @brief Build incremental filter SQL from cursor
- *
- * Constructs a WHERE clause fragment for incremental ingestion.
- * Returns empty string if cursor has no value set.
- *
- * @param c Sync cursor configuration
- * @return std::string SQL filter (e.g., "`Id` > 123") or empty string
- *
- * @note String values are escaped to prevent SQL injection.
- */
static std::string build_incremental_filter(const SyncCursor& c) {
- if (!c.has_value || c.column.empty()) return "";
- std::string col = "`" + c.column + "`";
- if (c.numeric) {
- return col + " > " + std::to_string(c.num_value);
- }
- return col + " > '" + sql_escape_single_quotes(c.str_value) + "'";
+ if (!c.has_value || c.column.empty()) return "";
+ std::string col = "`" + c.column + "`";
+ if (c.numeric) {
+ return col + " > " + std::to_string(c.num_value);
+ }
+ return col + " > '" + sql_escape_single_quotes(c.str_value) + "'";
}
-/**
- * @brief Update sync state in database
- *
- * Upserts the cursor state into rag_sync_state after successful
- * (or partial) ingestion. Updates the watermark value and last_ok_at.
- *
- * @param db SQLite database connection
- * @param source_id Source identifier
- * @param cursor_json Updated cursor JSON to store
- *
- * @note On error, calls fatal() to terminate the program.
- */
-static void update_sync_state(sqlite3* db, int source_id, const json& cursor_json) {
- const char* sql =
- "INSERT INTO rag_sync_state(source_id, mode, cursor_json, last_ok_at, last_error) "
- "VALUES(?, 'poll', ?, unixepoch(), NULL) "
- "ON CONFLICT(source_id) DO UPDATE SET "
- "cursor_json=excluded.cursor_json, last_ok_at=excluded.last_ok_at, last_error=NULL";
- sqlite3_stmt* st = nullptr;
- sqlite_prepare_or_die(db, &st, sql);
- sqlite3_bind_int(st, 1, source_id);
- std::string cursor_str = json_dump_compact(cursor_json);
- bind_text(st, 2, cursor_str);
- int rc = sqlite3_step(st);
- sqlite3_finalize(st);
- if (rc != SQLITE_DONE) {
- fatal(std::string("SQLite upsert rag_sync_state failed: ") + sqlite3_errmsg(db));
- }
+static void update_sync_state(MySQLDB& db, int source_id, const json& cursor_json) {
+ std::string cursor_str = json_dump_compact(cursor_json);
+ std::string escaped_cursor = sql_escape_single_quotes(cursor_str);
+
+ // Use std::ostringstream to avoid fixed buffer size issues
+ std::ostringstream sql;
+ sql << "INSERT INTO rag_sync_state(source_id, mode, cursor_json, last_ok_at, last_error) "
+ << "VALUES(" << source_id << ", 'poll', '" << escaped_cursor << "', unixepoch(), NULL) "
+ << "ON CONFLICT(source_id) DO UPDATE SET "
+ << "cursor_json='" << escaped_cursor << "', last_ok_at=unixepoch(), last_error=NULL";
+
+ db.execute(sql.str().c_str());
}
// ===========================================================================
-// SQLite Prepared Statements
+// Document Operations (via MySQL)
// ===========================================================================
-/**
- * @brief Collection of prepared SQLite statements
- *
- * Holds all prepared statements needed for ingestion.
- * Prepared statements are reused for efficiency and SQL injection safety.
- */
-struct SqliteStmts {
- sqlite3_stmt* doc_exists = nullptr; ///< Check if document exists
- sqlite3_stmt* ins_doc = nullptr; ///< Insert document
- sqlite3_stmt* ins_chunk = nullptr; ///< Insert chunk
- sqlite3_stmt* ins_fts = nullptr; ///< Insert into FTS index
- sqlite3_stmt* ins_vec = nullptr; ///< Insert vector embedding
-};
-
-/**
- * @brief Prepare SQLite statement or terminate
- *
- * Prepares a SQLite statement and calls fatal() on failure.
- * Used for statements that must succeed for the program to continue.
- *
- * @param db SQLite database connection
- * @param st Output parameter for prepared statement handle
- * @param sql SQL statement to prepare
- *
- * @note On failure, prints error and calls fatal() to exit.
- */
-static void sqlite_prepare_or_die(sqlite3* db, sqlite3_stmt** st, const char* sql) {
- if (sqlite3_prepare_v2(db, sql, -1, st, nullptr) != SQLITE_OK) {
- fatal(std::string("SQLite prepare failed: ") + sqlite3_errmsg(db) + "\nSQL: " + sql);
- }
-}
-
-/**
- * @brief Finalize all statements in SqliteStmts
- *
- * Releases all prepared statement resources.
- *
- * @param s Statement collection to finalize (reset to default state)
- */
-static void sqlite_finalize_all(SqliteStmts& s) {
- if (s.doc_exists) sqlite3_finalize(s.doc_exists);
- if (s.ins_doc) sqlite3_finalize(s.ins_doc);
- if (s.ins_chunk) sqlite3_finalize(s.ins_chunk);
- if (s.ins_fts) sqlite3_finalize(s.ins_fts);
- if (s.ins_vec) sqlite3_finalize(s.ins_vec);
- s = SqliteStmts{};
-}
+static bool doc_exists(MySQLDB& db, const std::string& doc_id) {
+ std::string escaped_id = sql_escape_single_quotes(doc_id);
+ std::ostringstream sql;
+ sql << "SELECT 1 FROM rag_documents WHERE doc_id = '" << escaped_id << "' LIMIT 1";
-/**
- * @brief Bind text parameter to SQLite statement
- *
- * Binds a std::string value to a prepared statement parameter.
- * Uses SQLITE_TRANSIENT for string management (SQLite makes a copy).
- *
- * @param st Prepared statement
- * @param idx Parameter index (1-based)
- * @param v String value to bind
- */
-static void bind_text(sqlite3_stmt* st, int idx, const std::string& v) {
- sqlite3_bind_text(st, idx, v.c_str(), -1, SQLITE_TRANSIENT);
-}
+ MYSQL_RES* res = db.query(sql.str().c_str());
+ my_ulonglong rows = mysql_num_rows(res);
+ mysql_free_result(res);
-/**
- * @brief Bind vector embedding to SQLite statement
- *
- * Binds a float32 array as a BLOB for sqlite3-vec.
- * This is the "best effort" binding format that works with most
- * sqlite3-vec builds. Modify if your build expects different format.
- *
- * @param st Prepared statement
- * @param idx Parameter index (1-based)
- * @param emb Float vector to bind
- *
- * @note The binding format is build-dependent. If vec operations fail,
- * check your sqlite3-vec build's expected format.
- */
-static void bind_vec_embedding(sqlite3_stmt* st, int idx, const std::vector& emb) {
- const void* data = (const void*)emb.data();
- int bytes = (int)(emb.size() * sizeof(float));
- sqlite3_bind_blob(st, idx, data, bytes, SQLITE_TRANSIENT);
+ return rows > 0;
}
-/**
- * @brief Check if document exists in database
- *
- * Tests whether a document with the given doc_id already exists
- * in rag_documents. Used for skip-if-exists logic in v0.
- *
- * @param ss Statement collection
- * @param doc_id Document identifier to check
- * @return true if document exists, false otherwise
- */
-static bool sqlite_doc_exists(SqliteStmts& ss, const std::string& doc_id) {
- sqlite3_reset(ss.doc_exists);
- sqlite3_clear_bindings(ss.doc_exists);
-
- bind_text(ss.doc_exists, 1, doc_id);
-
- int rc = sqlite3_step(ss.doc_exists);
- return (rc == SQLITE_ROW);
+static void insert_doc(MySQLDB& db,
+ int source_id,
+ const std::string& source_name,
+ const std::string& doc_id,
+ const std::string& pk_json,
+ const std::string& title,
+ const std::string& body,
+ const std::string& meta_json) {
+ std::string e_doc_id = sql_escape_single_quotes(doc_id);
+ std::string e_source_name = sql_escape_single_quotes(source_name);
+ std::string e_pk_json = sql_escape_single_quotes(pk_json);
+ std::string e_title = sql_escape_single_quotes(title);
+ std::string e_body = sql_escape_single_quotes(body);
+ std::string e_meta = sql_escape_single_quotes(meta_json);
+
+ // Use std::ostringstream to avoid fixed buffer size issues
+ std::ostringstream sql;
+ sql << "INSERT INTO rag_documents(doc_id, source_id, source_name, pk_json, title, body, metadata_json) "
+ << "VALUES('" << e_doc_id << "', " << source_id << ", '" << e_source_name << "', '"
+ << e_pk_json << "', '" << e_title << "', '" << e_body << "', '" << e_meta << "')";
+
+ db.execute(sql.str().c_str());
}
-/**
- * @brief Insert document into rag_documents table
- *
- * Inserts a new document record with all metadata.
- *
- * @param ss Statement collection
- * @param source_id Source identifier
- * @param source_name Source name
- * @param doc_id Document identifier
- * @param pk_json Primary key JSON (for refetch)
- * @param title Document title
- * @param body Document body
- * @param meta_json Metadata JSON object
- *
- * @note On failure, calls fatal() to terminate.
- */
-static void sqlite_insert_doc(SqliteStmts& ss,
- int source_id,
- const std::string& source_name,
- const std::string& doc_id,
- const std::string& pk_json,
- const std::string& title,
- const std::string& body,
- const std::string& meta_json) {
- sqlite3_reset(ss.ins_doc);
- sqlite3_clear_bindings(ss.ins_doc);
-
- bind_text(ss.ins_doc, 1, doc_id);
- sqlite3_bind_int(ss.ins_doc, 2, source_id);
- bind_text(ss.ins_doc, 3, source_name);
- bind_text(ss.ins_doc, 4, pk_json);
- bind_text(ss.ins_doc, 5, title);
- bind_text(ss.ins_doc, 6, body);
- bind_text(ss.ins_doc, 7, meta_json);
-
- int rc = sqlite3_step(ss.ins_doc);
- if (rc != SQLITE_DONE) {
- fatal(std::string("SQLite insert rag_documents failed: ") + sqlite3_errmsg(sqlite3_db_handle(ss.ins_doc)));
- }
+static void insert_chunk(MySQLDB& db,
+ const std::string& chunk_id,
+ const std::string& doc_id,
+ int source_id,
+ int chunk_index,
+ const std::string& title,
+ const std::string& body,
+ const std::string& meta_json) {
+ std::string e_chunk_id = sql_escape_single_quotes(chunk_id);
+ std::string e_doc_id = sql_escape_single_quotes(doc_id);
+ std::string e_title = sql_escape_single_quotes(title);
+ std::string e_body = sql_escape_single_quotes(body);
+ std::string e_meta = sql_escape_single_quotes(meta_json);
+
+ // Use std::ostringstream to avoid fixed buffer size issues
+ std::ostringstream sql;
+ sql << "INSERT INTO rag_chunks(chunk_id, doc_id, source_id, chunk_index, title, body, metadata_json) "
+ << "VALUES('" << e_chunk_id << "', '" << e_doc_id << "', " << source_id << ", " << chunk_index
+ << ", '" << e_title << "', '" << e_body << "', '" << e_meta << "')";
+
+ db.execute(sql.str().c_str());
}
-/**
- * @brief Insert chunk into rag_chunks table
- *
- * Inserts a new chunk record with title, body, and metadata.
- *
- * @param ss Statement collection
- * @param chunk_id Chunk identifier (typically doc_id#index)
- * @param doc_id Parent document identifier
- * @param source_id Source identifier
- * @param chunk_index Zero-based chunk index within document
- * @param title Chunk title
- * @param body Chunk body text
- * @param meta_json Chunk metadata JSON
- *
- * @note On failure, calls fatal() to terminate.
- */
-static void sqlite_insert_chunk(SqliteStmts& ss,
- const std::string& chunk_id,
- const std::string& doc_id,
- int source_id,
- int chunk_index,
- const std::string& title,
- const std::string& body,
- const std::string& meta_json) {
- sqlite3_reset(ss.ins_chunk);
- sqlite3_clear_bindings(ss.ins_chunk);
-
- bind_text(ss.ins_chunk, 1, chunk_id);
- bind_text(ss.ins_chunk, 2, doc_id);
- sqlite3_bind_int(ss.ins_chunk, 3, source_id);
- sqlite3_bind_int(ss.ins_chunk, 4, chunk_index);
- bind_text(ss.ins_chunk, 5, title);
- bind_text(ss.ins_chunk, 6, body);
- bind_text(ss.ins_chunk, 7, meta_json);
-
- int rc = sqlite3_step(ss.ins_chunk);
- if (rc != SQLITE_DONE) {
- fatal(std::string("SQLite insert rag_chunks failed: ") + sqlite3_errmsg(sqlite3_db_handle(ss.ins_chunk)));
- }
-}
+static void insert_fts(MySQLDB& db,
+ const std::string& chunk_id,
+ const std::string& title,
+ const std::string& body) {
+ std::string e_chunk_id = sql_escape_single_quotes(chunk_id);
+ std::string e_title = sql_escape_single_quotes(title);
+ std::string e_body = sql_escape_single_quotes(body);
-/**
- * @brief Insert chunk into FTS index
- *
- * Inserts a chunk into the rag_fts_chunks FTS5 table for
- * full-text search. The FTS table is contentless (text stored
- * only in rag_chunks, FTS only maintains index).
- *
- * @param ss Statement collection
- * @param chunk_id Chunk identifier
- * @param title Chunk title
- * @param body Chunk body text
- *
- * @note On failure, calls fatal() to terminate.
- */
-static void sqlite_insert_fts(SqliteStmts& ss,
- const std::string& chunk_id,
- const std::string& title,
- const std::string& body) {
- sqlite3_reset(ss.ins_fts);
- sqlite3_clear_bindings(ss.ins_fts);
-
- bind_text(ss.ins_fts, 1, chunk_id);
- bind_text(ss.ins_fts, 2, title);
- bind_text(ss.ins_fts, 3, body);
-
- int rc = sqlite3_step(ss.ins_fts);
- if (rc != SQLITE_DONE) {
- fatal(std::string("SQLite insert rag_fts_chunks failed: ") + sqlite3_errmsg(sqlite3_db_handle(ss.ins_fts)));
- }
-}
+ // Use std::ostringstream to avoid fixed buffer size issues
+ std::ostringstream sql;
+ sql << "INSERT INTO rag_fts_chunks(chunk_id, title, body) "
+ << "VALUES('" << e_chunk_id << "', '" << e_title << "', '" << e_body << "')";
-/**
- * @brief Insert vector embedding into rag_vec_chunks table
- *
- * Inserts a vector embedding along with metadata for vector
- * similarity search using sqlite3-vec.
- *
- * Schema: rag_vec_chunks(embedding, chunk_id, doc_id, source_id, updated_at)
- *
- * @param ss Statement collection
- * @param emb Float32 vector embedding
- * @param chunk_id Chunk identifier
- * @param doc_id Parent document identifier
- * @param source_id Source identifier
- * @param updated_at_unixepoch Unix epoch timestamp (0 for "now")
- *
- * @note If ss.ins_vec is null (embedding disabled), returns without error.
- * @note On failure, calls fatal() with vec-specific error message.
- */
-static void sqlite_insert_vec(SqliteStmts& ss,
- const std::vector& emb,
- const std::string& chunk_id,
- const std::string& doc_id,
- int source_id,
- std::int64_t updated_at_unixepoch) {
- if (!ss.ins_vec) return;
-
- sqlite3_reset(ss.ins_vec);
- sqlite3_clear_bindings(ss.ins_vec);
-
- bind_vec_embedding(ss.ins_vec, 1, emb);
- bind_text(ss.ins_vec, 2, chunk_id);
- bind_text(ss.ins_vec, 3, doc_id);
- sqlite3_bind_int(ss.ins_vec, 4, source_id);
- sqlite3_bind_int64(ss.ins_vec, 5, (sqlite3_int64)updated_at_unixepoch);
-
- int rc = sqlite3_step(ss.ins_vec);
- if (rc != SQLITE_DONE) {
- // Vec-specific error message for troubleshooting
- fatal(std::string("SQLite insert rag_vec_chunks failed (check vec binding format): ")
- + sqlite3_errmsg(sqlite3_db_handle(ss.ins_vec)));
- }
+ db.execute(sql.str().c_str());
}
// ===========================================================================
// Embedding Generation
// ===========================================================================
-/**
- * @brief Generate deterministic pseudo-embedding from text
- *
- * This is a stub implementation that generates a deterministic but
- * non-semantic embedding from input text. Used for testing when
- * a real embedding service is not available.
- *
- * The algorithm uses a rolling hash to distribute values across
- * vector dimensions, then normalizes. Results are deterministic
- * (same input always produces same output) but NOT semantic.
- *
- * @param text Input text to "embed"
- * @param dim Vector dimension
- * @return std::vector<float> Deterministic pseudo-embedding
- *
- * @warning This does NOT produce semantic embeddings. Only for testing.
- */
static std::vector<float> pseudo_embedding(const std::string& text, int dim) {
- std::vector<float> v;
- v.resize((size_t)dim, 0.0f);
-
- // FNV-1a-like rolling hash accumulation into float bins
- std::uint64_t h = 1469598103934665603ULL; // FNV offset basis
- for (size_t i = 0; i < text.size(); i++) {
- h ^= (unsigned char)text[i];
- h *= 1099511628211ULL; // FNV prime
-
- // Spread influence into bins based on hash
- size_t idx = (size_t)(h % (std::uint64_t)dim);
- float val = (float)((h >> 32) & 0xFFFF) / 65535.0f; // Normalize to 0..1
- v[idx] += (val - 0.5f); // Center around 0
- }
-
- // L2 normalization
- double norm = 0.0;
- for (int i = 0; i < dim; i++) norm += (double)v[(size_t)i] * (double)v[(size_t)i];
- norm = std::sqrt(norm);
- if (norm > 1e-12) {
- for (int i = 0; i < dim; i++) v[(size_t)i] = (float)(v[(size_t)i] / norm);
- }
- return v;
+ std::vector<float> v;
+ v.resize((size_t)dim, 0.0f);
+
+ std::uint64_t h = 1469598103934665603ULL;
+ for (size_t i = 0; i < text.size(); i++) {
+ h ^= (unsigned char)text[i];
+ h *= 1099511628211ULL;
+
+ size_t idx = (size_t)(h % (std::uint64_t)dim);
+ float val = (float)((h >> 32) & 0xFFFF) / 65535.0f;
+ v[idx] += (val - 0.5f);
+ }
+
+ double norm = 0.0;
+ for (int i = 0; i < dim; i++) norm += (double)v[(size_t)i] * (double)v[(size_t)i];
+ norm = std::sqrt(norm);
+ if (norm > 1e-12) {
+ for (int i = 0; i < dim; i++) v[(size_t)i] = (float)(v[(size_t)i] / norm);
+ }
+ return v;
}
-/**
- * @brief Abstract embedding provider interface
- *
- * Base class for embedding providers. Concrete implementations
- * include StubEmbeddingProvider and OpenAIEmbeddingProvider.
- */
struct EmbeddingProvider {
- /**
- * @brief Virtual destructor for safe polymorphic deletion
- */
- virtual ~EmbeddingProvider() = default;
-
- /**
- * @brief Generate embeddings for multiple input strings
- *
- * @param inputs Vector of input texts to embed
- * @param dim Expected embedding dimension
- * @return std::vector<std::vector<float>> Vector of embeddings (one per input)
- *
- * @note The returned vector must have exactly inputs.size() embeddings.
- * @note Each embedding must have exactly dim float values.
- *
- * @throws std::runtime_error on embedding generation failure
- */
- virtual std::vector<std::vector<float>> embed(const std::vector<std::string>& inputs, int dim) = 0;
+ virtual ~EmbeddingProvider() = default;
+ virtual std::vector<std::vector<float>> embed(const std::vector<std::string>& inputs, int dim) = 0;
};
-/**
- * @brief Stub embedding provider for testing
- *
- * Uses pseudo_embedding() to generate deterministic but non-semantic
- * embeddings. Used when no real embedding service is available.
- */
struct StubEmbeddingProvider : public EmbeddingProvider {
- /**
- * @brief Generate pseudo-embeddings for inputs
- *
- * @param inputs Vector of input texts
- * @param dim Embedding dimension
- * @return std::vector<std::vector<float>> Deterministic pseudo-embeddings
- */
- std::vector<std::vector<float>> embed(const std::vector<std::string>& inputs, int dim) override {
- std::vector<std::vector<float>> out;
- out.reserve(inputs.size());
- for (const auto& s : inputs) out.push_back(pseudo_embedding(s, dim));
- return out;
- }
+ std::vector<std::vector<float>> embed(const std::vector<std::string>& inputs, int dim) override {
+ std::vector<std::vector<float>> out;
+ out.reserve(inputs.size());
+ for (const auto& s : inputs) out.push_back(pseudo_embedding(s, dim));
+ return out;
+ }
};
-/**
- * @brief Buffer for curl HTTP response data
- *
- * Simple string buffer used by curl write callback to accumulate
- * HTTP response data.
- */
struct CurlBuffer {
- std::string data; ///< Accumulated response data
+ std::string data;
};
-/**
- * @brief Curl write callback function
- *
- * Callback function passed to libcurl to handle incoming response data.
- * Appends received data to the CurlBuffer.
- *
- * @param contents Pointer to received data
- * @param size Size of each data element
- * @param nmemb Number of elements
- * @param userp User pointer (must point to CurlBuffer)
- * @return size_t Total bytes processed
- */
static size_t curl_write_cb(void* contents, size_t size, size_t nmemb, void* userp) {
- size_t total = size * nmemb;
- CurlBuffer* buf = static_cast<CurlBuffer*>(userp);
- buf->data.append(static_cast<const char*>(contents), total);
- return total;
+ size_t total = size * nmemb;
+ CurlBuffer* buf = static_cast<CurlBuffer*>(userp);
+ buf->data.append(static_cast<const char*>(contents), total);
+ return total;
}
-/**
- * @brief OpenAI-compatible embedding provider
- *
- * Calls an OpenAI-compatible HTTP API to generate embeddings.
- * Works with OpenAI, Azure OpenAI, or any compatible service.
- */
struct OpenAIEmbeddingProvider : public EmbeddingProvider {
- std::string api_base; ///< API base URL (e.g., "https://api.openai.com/v1")
- std::string api_key; ///< API authentication key
- std::string model; ///< Model name (e.g., "text-embedding-3-large")
- int timeout_ms = 20000; ///< Request timeout in milliseconds
-
- /**
- * @brief Construct OpenAI embedding provider
- *
- * @param base API base URL
- * @param key API key
- * @param mdl Model name
- * @param timeout Request timeout
- */
- OpenAIEmbeddingProvider(std::string base, std::string key, std::string mdl, int timeout)
- : api_base(std::move(base)), api_key(std::move(key)), model(std::move(mdl)), timeout_ms(timeout) {}
-
- /**
- * @brief Generate embeddings via OpenAI-compatible API
- *
- * Sends a batch of texts to the embedding API and returns
- * the generated embeddings.
- *
- * @param inputs Vector of input texts to embed
- * @param dim Expected embedding dimension
- * @return std::vector<std::vector<float>> Generated embeddings
- *
- * @throws std::runtime_error on API error, timeout, or invalid response
- *
- * @note The inputs are sent in a single batch. For large batches,
- * consider splitting into multiple calls.
- */
- std::vector<std::vector<float>> embed(const std::vector<std::string>& inputs, int dim) override {
- // Validate configuration
- if (api_base.empty()) {
- throw std::runtime_error("embedding api_base is empty");
- }
- if (api_key.empty()) {
- throw std::runtime_error("embedding api_key is empty");
- }
- if (model.empty()) {
- throw std::runtime_error("embedding model is empty");
- }
- // Note: Some providers expect "hf:" prefix for HuggingFace models
- if (model.rfind("hf:", 0) != 0) {
- std::cerr << "WARN: embedding model should be prefixed with 'hf:' per Synthetic docs\n";
- }
-
- // Build request JSON
- json req;
- req["model"] = model;
- req["input"] = inputs;
- if (dim > 0) {
- req["dimensions"] = dim;
- }
- std::string body = req.dump();
-
- // Build URL
- std::string url = api_base;
- if (!url.empty() && url.back() == '/') url.pop_back();
- url += "/embeddings";
-
- // Execute HTTP request
- CURL* curl = curl_easy_init();
- if (!curl) {
- throw std::runtime_error("curl_easy_init failed");
- }
-
- CurlBuffer buf;
- struct curl_slist* headers = nullptr;
- std::string auth = "Authorization: Bearer " + api_key;
- headers = curl_slist_append(headers, "Content-Type: application/json");
- headers = curl_slist_append(headers, auth.c_str());
-
- curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
- curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
- curl_easy_setopt(curl, CURLOPT_POST, 1L);
- curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str());
- curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)body.size());
- curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write_cb);
- curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buf);
- curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, timeout_ms);
-
- CURLcode res = curl_easy_perform(curl);
- long status = 0;
- curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
-
- curl_slist_free_all(headers);
- curl_easy_cleanup(curl);
-
- // Check for errors
- if (res != CURLE_OK) {
- throw std::runtime_error(std::string("curl error: ") + curl_easy_strerror(res));
- }
- if (status < 200 || status >= 300) {
- throw std::runtime_error("embedding request failed with status " + std::to_string(status));
- }
-
- // Parse response
- json resp = json::parse(buf.data);
- if (!resp.contains("data") || !resp["data"].is_array()) {
- throw std::runtime_error("embedding response missing data array");
- }
-
- // Extract embeddings
- std::vector<std::vector<float>> out;
- out.reserve(resp["data"].size());
- for (const auto& item : resp["data"]) {
- if (!item.contains("embedding") || !item["embedding"].is_array()) {
- throw std::runtime_error("embedding item missing embedding array");
- }
- std::vector<float> vec;
- vec.reserve(item["embedding"].size());
- for (const auto& v : item["embedding"]) {
- vec.push_back(v.get<float>());
- }
- if ((int)vec.size() != dim) {
- throw std::runtime_error("embedding dimension mismatch: expected " + std::to_string(dim)
- + ", got " + std::to_string(vec.size()));
- }
- out.push_back(std::move(vec));
- }
+ std::string api_base;
+ std::string api_key;
+ std::string model;
+ int timeout_ms = 20000;
+
+ OpenAIEmbeddingProvider(std::string base, std::string key, std::string mdl, int timeout)
+ : api_base(std::move(base)), api_key(std::move(key)), model(std::move(mdl)), timeout_ms(timeout) {}
+
+ std::vector<std::vector<float>> embed(const std::vector<std::string>& inputs, int dim) override {
+ if (api_base.empty()) throw std::runtime_error("embedding api_base is empty");
+ if (api_key.empty()) throw std::runtime_error("embedding api_key is empty");
+ if (model.empty()) throw std::runtime_error("embedding model is empty");
+
+ json req;
+ req["model"] = model;
+ req["input"] = inputs;
+ if (dim > 0) req["dimensions"] = dim;
+ std::string body = req.dump();
+
+ std::string url = api_base;
+ if (!url.empty() && url.back() == '/') url.pop_back();
+ url += "/embeddings";
+
+ CURL* curl = curl_easy_init();
+ if (!curl) throw std::runtime_error("curl_easy_init failed");
+
+ CurlBuffer buf;
+ struct curl_slist* headers = nullptr;
+ std::string auth = "Authorization: Bearer " + api_key;
+ headers = curl_slist_append(headers, "Content-Type: application/json");
+ headers = curl_slist_append(headers, auth.c_str());
+
+ curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
+ curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
+ curl_easy_setopt(curl, CURLOPT_POST, 1L);
+ curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str());
+ curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)body.size());
+ curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write_cb);
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buf);
+ curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, timeout_ms);
+
+ CURLcode res = curl_easy_perform(curl);
+ long status = 0;
+ curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
+
+ curl_slist_free_all(headers);
+ curl_easy_cleanup(curl);
+
+ if (res != CURLE_OK) throw std::runtime_error(std::string("curl error: ") + curl_easy_strerror(res));
+ if (status < 200 || status >= 300) throw std::runtime_error("embedding request failed with status " + std::to_string(status));
+
+ json resp = json::parse(buf.data);
+ if (!resp.contains("data") || !resp["data"].is_array()) throw std::runtime_error("embedding response missing data array");
+
+ std::vector<std::vector<float>> out;
+ out.reserve(resp["data"].size());
+ for (const auto& item : resp["data"]) {
+ if (!item.contains("embedding") || !item["embedding"].is_array()) throw std::runtime_error("embedding item missing embedding array");
+ std::vector<float> vec;
+ vec.reserve(item["embedding"].size());
+ for (const auto& v : item["embedding"]) vec.push_back(v.get<float>());
+ if ((int)vec.size() != dim) throw std::runtime_error("embedding dimension mismatch");
+ out.push_back(std::move(vec));
+ }
- if (out.size() != inputs.size()) {
- throw std::runtime_error("embedding response size mismatch");
+ if (out.size() != inputs.size()) throw std::runtime_error("embedding response size mismatch");
+ return out;
}
- return out;
- }
};
-/**
- * @brief Build embedding provider from configuration
- *
- * Factory function that creates the appropriate embedding provider
- * based on the configuration.
- *
- * @param cfg Embedding configuration
- * @return std::unique_ptr<EmbeddingProvider> Configured provider instance
- *
- * @note Currently supports "stub" and "openai" providers.
- * "stub" is returned for unknown provider types.
- */
static std::unique_ptr<EmbeddingProvider> build_embedding_provider(const EmbeddingConfig& cfg) {
- if (cfg.provider == "openai") {
- return std::make_unique<OpenAIEmbeddingProvider>(cfg.api_base, cfg.api_key, cfg.model, cfg.timeout_ms);
- }
- return std::make_unique<StubEmbeddingProvider>();
+ if (cfg.provider == "openai") {
+ return std::make_unique<OpenAIEmbeddingProvider>(cfg.api_base, cfg.api_key, cfg.model, cfg.timeout_ms);
+ }
+ return std::make_unique<StubEmbeddingProvider>();
}
// ===========================================================================
// Source Loading
// ===========================================================================
-/**
- * @brief Load enabled sources from rag_sources table
- *
- * Queries the rag_sources table and parses all enabled sources
- * into RagSource structs. Validates JSON configurations.
- *
- * @param db SQLite database connection
- * @return std::vector<RagSource> Vector of enabled source configurations
- *
- * @note On JSON parsing error, calls fatal() to terminate.
- * @note Only sources with enabled=1 are loaded.
- */
-static std::vector<RagSource> load_sources(sqlite3* db) {
- std::vector<RagSource> out;
-
- const char* sql =
- "SELECT source_id, name, enabled, "
- "backend_type, backend_host, backend_port, backend_user, backend_pass, backend_db, "
- "table_name, pk_column, COALESCE(where_sql,''), "
- "doc_map_json, chunking_json, COALESCE(embedding_json,'') "
- "FROM rag_sources WHERE enabled = 1";
-
- sqlite3_stmt* st = nullptr;
- sqlite_prepare_or_die(db, &st, sql);
-
- while (sqlite3_step(st) == SQLITE_ROW) {
- RagSource s;
- s.source_id = sqlite3_column_int(st, 0);
- s.name = str_or_empty((const char*)sqlite3_column_text(st, 1));
- s.enabled = sqlite3_column_int(st, 2);
-
- s.backend_type = str_or_empty((const char*)sqlite3_column_text(st, 3));
- s.host = str_or_empty((const char*)sqlite3_column_text(st, 4));
- s.port = sqlite3_column_int(st, 5);
- s.user = str_or_empty((const char*)sqlite3_column_text(st, 6));
- s.pass = str_or_empty((const char*)sqlite3_column_text(st, 7));
- s.db = str_or_empty((const char*)sqlite3_column_text(st, 8));
-
- s.table_name = str_or_empty((const char*)sqlite3_column_text(st, 9));
- s.pk_column = str_or_empty((const char*)sqlite3_column_text(st, 10));
- s.where_sql = str_or_empty((const char*)sqlite3_column_text(st, 11));
-
- const char* doc_map = (const char*)sqlite3_column_text(st, 12);
- const char* chunk_j = (const char*)sqlite3_column_text(st, 13);
- const char* emb_j = (const char*)sqlite3_column_text(st, 14);
-
- try {
- s.doc_map_json = json::parse(doc_map ? doc_map : "{}");
- s.chunking_json = json::parse(chunk_j ? chunk_j : "{}");
- if (emb_j && std::strlen(emb_j) > 0) s.embedding_json = json::parse(emb_j);
- else s.embedding_json = json(); // null
- } catch (const std::exception& e) {
- sqlite3_finalize(st);
- fatal("Invalid JSON in rag_sources.source_id=" + std::to_string(s.source_id) + ": " + e.what());
- }
+static std::vector<RagSource> load_sources(MySQLDB& db) {
+ std::vector<RagSource> out;
+
+ const char* sql =
+ "SELECT source_id, name, enabled, "
+ "backend_type, backend_host, backend_port, backend_user, backend_pass, backend_db, "
+ "table_name, pk_column, COALESCE(where_sql,''), "
+ "doc_map_json, chunking_json, COALESCE(embedding_json,'') "
+ "FROM rag_sources WHERE enabled = 1";
+
+ MYSQL_RES* res = db.query(sql);
+ MYSQL_FIELD* fields = mysql_fetch_fields(res);
+
+ MYSQL_ROW row;
+ while ((row = mysql_fetch_row(res)) != nullptr) {
+ RagSource s;
+ s.source_id = atoi(row[0]);
+ s.name = str_or_empty(row[1]);
+ s.enabled = atoi(row[2]);
+
+ s.backend_type = str_or_empty(row[3]);
+ s.host = str_or_empty(row[4]);
+ s.port = atoi(row[5]);
+ s.user = str_or_empty(row[6]);
+ s.pass = str_or_empty(row[7]);
+ s.db = str_or_empty(row[8]);
+
+ s.table_name = str_or_empty(row[9]);
+ s.pk_column = str_or_empty(row[10]);
+ s.where_sql = str_or_empty(row[11]);
+
+ const char* doc_map = row[12];
+ const char* chunk_j = row[13];
+ const char* emb_j = row[14];
+
+ try {
+ s.doc_map_json = json::parse(doc_map ? doc_map : "{}");
+ s.chunking_json = json::parse(chunk_j ? chunk_j : "{}");
+ if (emb_j && std::strlen(emb_j) > 0) s.embedding_json = json::parse(emb_j);
+ else s.embedding_json = json();
+ } catch (const std::exception& e) {
+ mysql_free_result(res);
+ fatal("Invalid JSON in rag_sources.source_id=" + std::to_string(s.source_id) + ": " + e.what());
+ }
- // Basic validation (fail fast)
- if (!s.doc_map_json.is_object()) {
- sqlite3_finalize(st);
- fatal("doc_map_json must be a JSON object for source_id=" + std::to_string(s.source_id));
- }
- if (!s.chunking_json.is_object()) {
- sqlite3_finalize(st);
- fatal("chunking_json must be a JSON object for source_id=" + std::to_string(s.source_id));
- }
+ if (!s.doc_map_json.is_object()) {
+ mysql_free_result(res);
+ fatal("doc_map_json must be a JSON object for source_id=" + std::to_string(s.source_id));
+ }
+ if (!s.chunking_json.is_object()) {
+ mysql_free_result(res);
+ fatal("chunking_json must be a JSON object for source_id=" + std::to_string(s.source_id));
+ }
- out.push_back(std::move(s));
- }
+ out.push_back(std::move(s));
+ }
- sqlite3_finalize(st);
- return out;
+ mysql_free_result(res);
+ return out;
}
// ===========================================================================
// Document Building
// ===========================================================================
-/**
- * @brief Canonical document representation
- *
- * Contains all fields of a document in its canonical form,
- * ready for insertion into rag_documents and chunking.
- */
struct BuiltDoc {
- std::string doc_id; ///< Unique document identifier
- std::string pk_json; ///< Primary key JSON (for refetch)
- std::string title; ///< Document title
- std::string body; ///< Document body (to be chunked)
- std::string metadata_json; ///< Metadata JSON object
+ std::string doc_id;
+ std::string pk_json;
+ std::string title;
+ std::string body;
+ std::string metadata_json;
};
-/**
- * @brief Build canonical document from source row
- *
- * Transforms a raw backend row into a canonical document using
- * the doc_map_json specification. Handles doc_id formatting,
- * title/body concatenation, and metadata building.
- *
- * @param src Source configuration with doc_map_json
- * @param row Raw row data from backend
- * @return BuiltDoc Canonical document representation
- */
static BuiltDoc build_document_from_row(const RagSource& src, const RowMap& row) {
- BuiltDoc d;
-
- // Build doc_id from format template
- if (src.doc_map_json.contains("doc_id") && src.doc_map_json["doc_id"].is_object()
- && src.doc_map_json["doc_id"].contains("format") && src.doc_map_json["doc_id"]["format"].is_string()) {
- d.doc_id = apply_format(src.doc_map_json["doc_id"]["format"].get<std::string>(), row);
- } else {
- // Fallback: table:pk format
- auto pk = row_get(row, src.pk_column).value_or("");
- d.doc_id = src.table_name + ":" + pk;
- }
-
- // Build pk_json (refetch pointer)
- json pk = json::object();
- pk[src.pk_column] = row_get(row, src.pk_column).value_or("");
- d.pk_json = json_dump_compact(pk);
-
- // Build title from concat spec
- if (src.doc_map_json.contains("title") && src.doc_map_json["title"].is_object()
- && src.doc_map_json["title"].contains("concat")) {
- d.title = eval_concat(src.doc_map_json["title"]["concat"], row, "", false);
- } else {
- d.title = "";
- }
-
- // Build body from concat spec
- if (src.doc_map_json.contains("body") && src.doc_map_json["body"].is_object()
- && src.doc_map_json["body"].contains("concat")) {
- d.body = eval_concat(src.doc_map_json["body"]["concat"], row, "", false);
- } else {
- d.body = "";
- }
-
- // Build metadata JSON
- json meta = json::object();
- if (src.doc_map_json.contains("metadata")) {
- meta = build_metadata(src.doc_map_json["metadata"], row);
- }
- d.metadata_json = json_dump_compact(meta);
-
- return d;
+ BuiltDoc d;
+
+ if (src.doc_map_json.contains("doc_id") && src.doc_map_json["doc_id"].is_object()
+ && src.doc_map_json["doc_id"].contains("format") && src.doc_map_json["doc_id"]["format"].is_string()) {
+ d.doc_id = apply_format(src.doc_map_json["doc_id"]["format"].get<std::string>(), row);
+ } else {
+ auto pk = row_get(row, src.pk_column).value_or("");
+ d.doc_id = src.table_name + ":" + pk;
+ }
+
+ json pk = json::object();
+ pk[src.pk_column] = row_get(row, src.pk_column).value_or("");
+ d.pk_json = json_dump_compact(pk);
+
+ if (src.doc_map_json.contains("title") && src.doc_map_json["title"].is_object()
+ && src.doc_map_json["title"].contains("concat")) {
+ d.title = eval_concat(src.doc_map_json["title"]["concat"], row, "", false);
+ } else {
+ d.title = "";
+ }
+
+ if (src.doc_map_json.contains("body") && src.doc_map_json["body"].is_object()
+ && src.doc_map_json["body"].contains("concat")) {
+ d.body = eval_concat(src.doc_map_json["body"]["concat"], row, "", false);
+ } else {
+ d.body = "";
+ }
+
+ json meta = json::object();
+ if (src.doc_map_json.contains("metadata")) {
+ meta = build_metadata(src.doc_map_json["metadata"], row);
+ }
+ d.metadata_json = json_dump_compact(meta);
+
+ return d;
}
// ===========================================================================
// Embedding Input Builder
// ===========================================================================
-/**
- * @brief Build embedding input text for a chunk
- *
- * Constructs the text to be embedded by applying the embedding_json
- * input specification. Typically includes title, tags, and chunk body.
- *
- * @param ecfg Embedding configuration
- * @param row Source row data (for column values)
- * @param chunk_body Chunk body text
- * @return std::string Text to embed (empty if embedding disabled)
- */
static std::string build_embedding_input(const EmbeddingConfig& ecfg,
const RowMap& row,
const std::string& chunk_body) {
- if (!ecfg.enabled) return "";
- if (!ecfg.input_spec.is_object()) return chunk_body;
+ if (!ecfg.enabled) return "";
+ if (!ecfg.input_spec.is_object()) return chunk_body;
- if (ecfg.input_spec.contains("concat") && ecfg.input_spec["concat"].is_array()) {
- return eval_concat(ecfg.input_spec["concat"], row, chunk_body, true);
- }
+ if (ecfg.input_spec.contains("concat") && ecfg.input_spec["concat"].is_array()) {
+ return eval_concat(ecfg.input_spec["concat"], row, chunk_body, true);
+ }
- return chunk_body;
+ return chunk_body;
+}
+
+// ===========================================================================
+// Vector Insert (BLOB storage for SQLite backend)
+// ===========================================================================
+
+static void insert_vec(MySQLDB& db,
+ const std::vector<float>& emb,
+ const std::string& chunk_id,
+ const std::string& doc_id,
+ int source_id) {
+ // Convert float vector to hex string for SQLite BLOB literal syntax X'...'
+ std::string hex_blob;
+ hex_blob.reserve(emb.size() * 8);
+ for (float f : emb) {
+ hex_blob += float_to_hex_blob(f);
+ }
+
+ std::string e_chunk_id = sql_escape_single_quotes(chunk_id);
+ std::string e_doc_id = sql_escape_single_quotes(doc_id);
+
+ // Use SQLite's X'' hex literal syntax - works through MySQL protocol gateway
+ // Use stringstream to avoid fixed buffer size issues
+ std::ostringstream sql;
+ sql << "INSERT INTO rag_vec_chunks(embedding, chunk_id, doc_id, source_id, updated_at) "
+ << "VALUES(X'" << hex_blob << "', '" << e_chunk_id << "', '" << e_doc_id
+ << "', " << source_id << ", unixepoch())";
+
+ db.execute(sql.str().c_str());
}
-/**
- * @brief Process a batch of pending embeddings
- *
- * Generates embeddings for all pending chunks in a single API call
- * and inserts the resulting vectors into the database. This is much
- * more efficient than generating embeddings one chunk at a time.
- *
- * @param pending Vector of pending embeddings to process
- * @param embedder Embedding provider instance
- * @param ecfg Embedding configuration (dim, etc.)
- * @param ss Prepared SQLite statements
- * @param now_epoch Current epoch time for updated_at field
- * @return size_t Number of embeddings processed
- *
- * @note Clears the pending vector after processing.
- * @note Throws std::runtime_error on embedding API failure.
- */
static size_t flush_embedding_batch(std::vector& pending,
EmbeddingProvider* embedder,
const EmbeddingConfig& ecfg,
- SqliteStmts& ss,
- std::int64_t now_epoch) {
- if (pending.empty()) return 0;
-
- // Build batch inputs
- std::vector<std::string> inputs;
- inputs.reserve(pending.size());
- for (const auto& p : pending) {
- inputs.push_back(p.input_text);
- }
-
- // Generate embeddings in a single API call
- std::vector<std::vector<float>> embeddings = embedder->embed(inputs, ecfg.dim);
-
- // Insert all vectors into the database
- for (size_t i = 0; i < pending.size() && i < embeddings.size(); i++) {
- const auto& p = pending[i];
- sqlite_insert_vec(ss, embeddings[i], p.chunk_id, p.doc_id, p.source_id, now_epoch);
- }
-
- size_t count = pending.size();
- pending.clear();
- return count;
-}
+ MySQLDB& db) {
+ if (pending.empty()) return 0;
-// ===========================================================================
-// Statement Preparation
-// ===========================================================================
+ std::vector<std::string> inputs;
+ inputs.reserve(pending.size());
+ for (const auto& p : pending) {
+ inputs.push_back(p.input_text);
+ }
-/**
- * @brief Prepare all SQLite statements for ingestion
- *
- * Creates prepared statements for document existence check,
- * document insertion, chunk insertion, FTS insertion, and
- * optionally vector insertion.
- *
- * @param db SQLite database connection
- * @param want_vec Whether to prepare vector insert statement
- * @return SqliteStmts Collection of prepared statements
- *
- * @note On failure, calls fatal() to terminate.
- */
-static SqliteStmts prepare_sqlite_statements(sqlite3* db, bool want_vec) {
- SqliteStmts ss;
-
- // Existence check
- sqlite_prepare_or_die(db, &ss.doc_exists,
- "SELECT 1 FROM rag_documents WHERE doc_id = ? LIMIT 1");
-
- // Insert document (v0: no upsert)
- sqlite_prepare_or_die(db, &ss.ins_doc,
- "INSERT INTO rag_documents(doc_id, source_id, source_name, pk_json, title, body, metadata_json) "
- "VALUES(?,?,?,?,?,?,?)");
-
- // Insert chunk
- sqlite_prepare_or_die(db, &ss.ins_chunk,
- "INSERT INTO rag_chunks(chunk_id, doc_id, source_id, chunk_index, title, body, metadata_json) "
- "VALUES(?,?,?,?,?,?,?)");
-
- // Insert FTS
- sqlite_prepare_or_die(db, &ss.ins_fts,
- "INSERT INTO rag_fts_chunks(chunk_id, title, body) VALUES(?,?,?)");
-
- // Insert vector (optional)
- if (want_vec) {
- sqlite_prepare_or_die(db, &ss.ins_vec,
- "INSERT INTO rag_vec_chunks(embedding, chunk_id, doc_id, source_id, updated_at) "
- "VALUES(?,?,?,?,?)");
- }
-
- return ss;
+ std::vector<std::vector<float>> embeddings = embedder->embed(inputs, ecfg.dim);
+
+ for (size_t i = 0; i < pending.size() && i < embeddings.size(); i++) {
+ const auto& p = pending[i];
+ insert_vec(db, embeddings[i], p.chunk_id, p.doc_id, p.source_id);
+ }
+
+ size_t count = pending.size();
+ pending.clear();
+ return count;
}
// ===========================================================================
// Source Ingestion
// ===========================================================================
-/**
- * @brief Ingest data from a single source
- *
- * Main ingestion function for a single source. Performs the following:
- * 1. Parse chunking and embedding configurations
- * 2. Load sync cursor (watermark)
- * 3. Connect to backend
- * 4. Build and execute SELECT query with incremental filter
- * 5. For each row: build document, check existence, insert, chunk, embed
- * 6. Update sync cursor with max watermark value
- *
- * @param sdb SQLite database connection
- * @param src Source configuration
- *
- * @note Only "mysql" backend_type is supported in v0.
- * @note Skips documents that already exist (no update in v0).
- * @note All inserts happen in a single transaction (managed by caller).
- */
-static void ingest_source(sqlite3* sdb, const RagSource& src) {
- std::cerr << "Ingesting source_id=" << src.source_id
- << " name=" << src.name
- << " backend=" << src.backend_type
- << " table=" << src.table_name << "\n";
-
- // v0 only supports MySQL
- if (src.backend_type != "mysql") {
- std::cerr << " Skipping: backend_type not supported in v0.\n";
- return;
- }
-
- // Parse configurations
- ChunkingConfig ccfg = parse_chunking_json(src.chunking_json);
- EmbeddingConfig ecfg = parse_embedding_json(src.embedding_json);
- std::unique_ptr<EmbeddingProvider> embedder;
- if (ecfg.enabled) {
- embedder = build_embedding_provider(ecfg);
- }
-
- // Load sync cursor (watermark for incremental sync)
- json cursor_json = load_sync_cursor_json(sdb, src.source_id);
- SyncCursor cursor = parse_sync_cursor(cursor_json, src.pk_column);
-
- // Prepare SQLite statements
- SqliteStmts ss = prepare_sqlite_statements(sdb, ecfg.enabled);
-
- // Connect to MySQL
- MYSQL* mdb = mysql_connect_or_die(src);
-
- // Build SELECT with incremental filter
- std::vector<std::string> cols = collect_needed_columns(src, ecfg);
- if (!cursor.column.empty()) add_unique(cols, cursor.column);
- std::string extra_filter = build_incremental_filter(cursor);
- std::string sel = build_select_sql(src, cols, extra_filter);
-
- if (mysql_query(mdb, sel.c_str()) != 0) {
- std::string err = mysql_error(mdb);
- mysql_close(mdb);
- sqlite_finalize_all(ss);
- fatal("MySQL query failed: " + err + "\nSQL: " + sel);
- }
+static void ingest_source(MySQLDB& db, const RagSource& src) {
+ std::cerr << "Ingesting source_id=" << src.source_id
+ << " name=" << src.name
+ << " backend=" << src.backend_type
+ << " table=" << src.table_name << "\n";
- MYSQL_RES* res = mysql_store_result(mdb);
- if (!res) {
- std::string err = mysql_error(mdb);
- mysql_close(mdb);
- sqlite_finalize_all(ss);
- fatal("mysql_store_result failed: " + err);
- }
-
- std::uint64_t ingested_docs = 0;
- std::uint64_t skipped_docs = 0;
-
- // Note: updated_at is set to 0 for v0. For accurate timestamps,
- // query SELECT unixepoch() once at the start of main().
- std::int64_t now_epoch = 0;
-
- // Batch embeddings for efficiency
- std::vector<PendingEmbedding> pending_embeddings;
-
- // Track max watermark value (for next run)
- MYSQL_ROW r;
- bool max_set = false;
- bool max_numeric = false;
- std::int64_t max_num = 0;
- std::string max_str;
-
- // Process each row
- while ((r = mysql_fetch_row(res)) != nullptr) {
- RowMap row = mysql_row_to_map(res, r);
-
- // Track max watermark value (even for skipped docs)
- if (!cursor.column.empty()) {
- auto it = row.find(cursor.column);
- if (it != row.end()) {
- const std::string& v = it->second;
- if (!v.empty()) {
- if (!max_set) {
- // First value - determine type and store
- if (cursor.numeric || is_integer_string(v)) {
- try {
- max_numeric = true;
- max_num = std::stoll(v);
- } catch (const std::out_of_range& e) {
- // Huge integer - fall back to string comparison
- max_numeric = false;
- max_str = v;
- } catch (const std::invalid_argument& e) {
- // Not actually a number despite is_integer_string check
- max_numeric = false;
- max_str = v;
- }
- } else {
- max_numeric = false;
- max_str = v;
- }
- max_set = true;
- } else if (max_numeric) {
- // Already numeric - try to compare as number
- if (is_integer_string(v)) {
- try {
- std::int64_t nv = std::stoll(v);
- if (nv > max_num) max_num = nv;
- } catch (const std::out_of_range& e) {
- // Huge integer - fall back to string comparison
- max_numeric = false;
- max_str = v;
- } catch (const std::invalid_argument& e) {
- // Not actually a number - skip this value
- }
- }
- } else {
- // Already string - lexicographic comparison
- if (v > max_str) max_str = v;
- }
- }
- }
+ if (src.backend_type != "mysql") {
+ std::cerr << " Skipping: backend_type not supported in v0.\n";
+ return;
+ }
+
+ ChunkingConfig ccfg = parse_chunking_json(src.chunking_json);
+ EmbeddingConfig ecfg = parse_embedding_json(src.embedding_json);
+ std::unique_ptr<EmbeddingProvider> embedder;
+ if (ecfg.enabled) {
+ embedder = build_embedding_provider(ecfg);
}
- // Build document from row
- BuiltDoc doc = build_document_from_row(src, row);
+ json cursor_json = load_sync_cursor_json(db, src.source_id);
+ SyncCursor cursor = parse_sync_cursor(cursor_json, src.pk_column);
+
+ MYSQL* mdb = mysql_connect_or_die(src);
+
+ std::vector<std::string> cols = collect_needed_columns(src, ecfg);
+ if (!cursor.column.empty()) add_unique(cols, cursor.column);
+ std::string extra_filter = build_incremental_filter(cursor);
+ std::string sel = build_select_sql(src, cols, extra_filter);
- // v0: Skip if document already exists
- if (sqlite_doc_exists(ss, doc.doc_id)) {
- skipped_docs++;
- continue;
+ if (mysql_query(mdb, sel.c_str()) != 0) {
+ std::string err = mysql_error(mdb);
+ mysql_close(mdb);
+ fatal("MySQL query failed: " + err + "\nSQL: " + sel);
}
- // Insert document
- sqlite_insert_doc(ss, src.source_id, src.name,
- doc.doc_id, doc.pk_json, doc.title, doc.body, doc.metadata_json);
+ MYSQL_RES* res = mysql_store_result(mdb);
+ if (!res) {
+ std::string err = mysql_error(mdb);
+ mysql_close(mdb);
+ fatal("mysql_store_result failed: " + err);
+ }
- // Chunk document body
- std::vector chunks = chunk_text_chars(doc.body, ccfg);
+ std::uint64_t ingested_docs = 0;
+ std::uint64_t skipped_docs = 0;
+ std::vector<PendingEmbedding> pending_embeddings;
+
+ MYSQL_ROW r;
+ bool max_set = false;
+ bool max_numeric = false;
+ std::int64_t max_num = 0;
+ std::string max_str;
+
+ while ((r = mysql_fetch_row(res)) != nullptr) {
+ RowMap row = mysql_row_to_map(res, r);
+
+ if (!cursor.column.empty()) {
+ auto it = row.find(cursor.column);
+ if (it != row.end()) {
+ const std::string& v = it->second;
+ if (!v.empty()) {
+ if (!max_set) {
+ if (cursor.numeric || is_integer_string(v)) {
+ try {
+ max_numeric = true;
+ max_num = std::stoll(v);
+ } catch (...) {
+ max_numeric = false;
+ max_str = v;
+ }
+ } else {
+ max_numeric = false;
+ max_str = v;
+ }
+ max_set = true;
+ } else if (max_numeric) {
+ if (is_integer_string(v)) {
+ try {
+ std::int64_t nv = std::stoll(v);
+ if (nv > max_num) max_num = nv;
+ } catch (...) {
+ max_numeric = false;
+ max_str = v;
+ }
+ }
+ } else {
+ if (v > max_str) max_str = v;
+ }
+ }
+ }
+ }
- // Process each chunk
- for (size_t i = 0; i < chunks.size(); i++) {
- std::string chunk_id = doc.doc_id + "#" + std::to_string(i);
+ BuiltDoc doc = build_document_from_row(src, row);
+
+ if (doc_exists(db, doc.doc_id)) {
+ skipped_docs++;
+ continue;
+ }
- // Chunk metadata (minimal - just index)
- json cmeta = json::object();
- cmeta["chunk_index"] = (int)i;
+ insert_doc(db, src.source_id, src.name,
+ doc.doc_id, doc.pk_json, doc.title, doc.body, doc.metadata_json);
- // Use document title as chunk title (simple approach)
- std::string chunk_title = doc.title;
+ std::vector chunks = chunk_text_chars(doc.body, ccfg);
- // Insert chunk
- sqlite_insert_chunk(ss, chunk_id, doc.doc_id, src.source_id, (int)i,
- chunk_title, chunks[i], json_dump_compact(cmeta));
+ for (size_t i = 0; i < chunks.size(); i++) {
+ std::string chunk_id = doc.doc_id + "#" + std::to_string(i);
- // Insert into FTS index
- sqlite_insert_fts(ss, chunk_id, chunk_title, chunks[i]);
+ json cmeta = json::object();
+ cmeta["chunk_index"] = (int)i;
- // Collect embedding for batched processing (if enabled)
- if (ecfg.enabled) {
- std::string emb_input = build_embedding_input(ecfg, row, chunks[i]);
- pending_embeddings.push_back({chunk_id, doc.doc_id, src.source_id, emb_input});
+ std::string chunk_title = doc.title;
- // Flush batch if full
- if ((int)pending_embeddings.size() >= ecfg.batch_size) {
- flush_embedding_batch(pending_embeddings, embedder.get(), ecfg, ss, now_epoch);
+ insert_chunk(db, chunk_id, doc.doc_id, src.source_id, (int)i,
+ chunk_title, chunks[i], json_dump_compact(cmeta));
+
+ insert_fts(db, chunk_id, chunk_title, chunks[i]);
+
+ if (ecfg.enabled) {
+ std::string emb_input = build_embedding_input(ecfg, row, chunks[i]);
+ pending_embeddings.push_back({chunk_id, doc.doc_id, src.source_id, emb_input});
+
+ if ((int)pending_embeddings.size() >= ecfg.batch_size) {
+ flush_embedding_batch(pending_embeddings, embedder.get(), ecfg, db);
+ }
+ }
+ }
+
+ ingested_docs++;
+ if (ingested_docs % 1000 == 0) {
+ std::cerr << " progress: ingested_docs=" << ingested_docs
+ << " skipped_docs=" << skipped_docs << "\n";
}
- }
}
- ingested_docs++;
- if (ingested_docs % 1000 == 0) {
- std::cerr << " progress: ingested_docs=" << ingested_docs
- << " skipped_docs=" << skipped_docs << "\n";
+ if (ecfg.enabled && !pending_embeddings.empty()) {
+ flush_embedding_batch(pending_embeddings, embedder.get(), ecfg, db);
}
- }
-
- // Flush any remaining pending embeddings
- if (ecfg.enabled && !pending_embeddings.empty()) {
- flush_embedding_batch(pending_embeddings, embedder.get(), ecfg, ss, now_epoch);
- }
-
- // Cleanup
- mysql_free_result(res);
- mysql_close(mdb);
- sqlite_finalize_all(ss);
-
- // Update sync cursor with max watermark value
- if (!cursor_json.is_object()) cursor_json = json::object();
- if (!cursor.column.empty()) cursor_json["column"] = cursor.column;
- if (max_set) {
- if (max_numeric) {
- cursor_json["value"] = max_num;
- } else {
- cursor_json["value"] = max_str;
+
+ mysql_free_result(res);
+ mysql_close(mdb);
+
+ if (!cursor_json.is_object()) cursor_json = json::object();
+ if (!cursor.column.empty()) cursor_json["column"] = cursor.column;
+ if (max_set) {
+ if (max_numeric) {
+ cursor_json["value"] = max_num;
+ } else {
+ cursor_json["value"] = max_str;
+ }
+ }
+ update_sync_state(db, src.source_id, cursor_json);
+
+ std::cerr << "Done source " << src.name
+ << " ingested_docs=" << ingested_docs
+ << " skipped_docs=" << skipped_docs << "\n";
+}
+
+// ===========================================================================
+// Schema Initialization
+// ===========================================================================
+
+/**
+ * @brief Check if a table exists in the database
+ * @param db Database connection
+ * @param table_name Name of the table to check
+ * @return true if table exists, false otherwise
+ */
+static bool table_exists(MySQLDB& db, const std::string& table_name) {
+ // Check connection validity
+ if (!db.conn) {
+ return false;
+ }
+
+ // Try to query the table - if it fails, table doesn't exist
+ std::string escaped = sql_escape_single_quotes(table_name);
+ std::ostringstream sql;
+ sql << "SELECT COUNT(*) FROM `" << escaped << "` LIMIT 1";
+
+ // Suppress error output for this check
+ if (mysql_query(db.conn, sql.str().c_str()) != 0) {
+ return false; // Table doesn't exist
+ }
+
+ // Check for actual errors (like "table doesn't exist")
+ unsigned int err = mysql_errno(db.conn);
+ if (err != 0) {
+ return false; // Table doesn't exist
+ }
+
+ MYSQL_RES* res = mysql_store_result(db.conn);
+ if (res) {
+ mysql_free_result(res);
+ return true; // Table exists
+ }
+ return false; // Table doesn't exist
+}
+
+/**
+ * @brief Initialize RAG schema in the database
+ * @param db Database connection
+ * @param vec_dim Vector dimension for rag_vec_chunks table
+ * @return true if schema was created, false if already exists
+ */
+static bool init_schema(MySQLDB& db, int vec_dim = 1536) {
+ // Check if schema is complete by checking for rag_sync_state table
+ // (rag_sync_state is created last, so if it exists, schema is complete)
+ bool schema_complete = table_exists(db, "rag_sync_state");
+
+ // Note: PRAGMA commands are SQLite-specific and not supported through MySQL protocol
+ // The SQLite backend should have these configured already
+
+ // Create rag_sources table
+ db.execute(
+ "CREATE TABLE IF NOT EXISTS rag_sources ("
+ " source_id INTEGER PRIMARY KEY,"
+ " name TEXT NOT NULL UNIQUE,"
+ " enabled INTEGER NOT NULL DEFAULT 1,"
+ " backend_type TEXT NOT NULL,"
+ " backend_host TEXT NOT NULL,"
+ " backend_port INTEGER NOT NULL,"
+ " backend_user TEXT NOT NULL,"
+ " backend_pass TEXT NOT NULL,"
+ " backend_db TEXT NOT NULL,"
+ " table_name TEXT NOT NULL,"
+ " pk_column TEXT NOT NULL,"
+ " where_sql TEXT,"
+ " doc_map_json TEXT NOT NULL,"
+ " chunking_json TEXT NOT NULL,"
+ " embedding_json TEXT,"
+ " created_at INTEGER NOT NULL DEFAULT (unixepoch()),"
+ " updated_at INTEGER NOT NULL DEFAULT (unixepoch())"
+ ")"
+ );
+
+ db.execute("CREATE INDEX IF NOT EXISTS idx_rag_sources_enabled ON rag_sources(enabled)");
+ db.execute("CREATE INDEX IF NOT EXISTS idx_rag_sources_backend ON rag_sources(backend_type, backend_host, backend_port, backend_db, table_name)");
+
+ // Create rag_documents table
+ db.execute(
+ "CREATE TABLE IF NOT EXISTS rag_documents ("
+ " doc_id TEXT PRIMARY KEY,"
+ " source_id INTEGER NOT NULL REFERENCES rag_sources(source_id),"
+ " source_name TEXT NOT NULL,"
+ " pk_json TEXT NOT NULL,"
+ " title TEXT,"
+ " body TEXT,"
+ " metadata_json TEXT NOT NULL DEFAULT '{}',"
+ " updated_at INTEGER NOT NULL DEFAULT (unixepoch()),"
+ " deleted INTEGER NOT NULL DEFAULT 0"
+ ")"
+ );
+
+ db.execute("CREATE INDEX IF NOT EXISTS idx_rag_documents_source_updated ON rag_documents(source_id, updated_at)");
+ db.execute("CREATE INDEX IF NOT EXISTS idx_rag_documents_source_deleted ON rag_documents(source_id, deleted)");
+
+ // Create rag_chunks table
+ db.execute(
+ "CREATE TABLE IF NOT EXISTS rag_chunks ("
+ " chunk_id TEXT PRIMARY KEY,"
+ " doc_id TEXT NOT NULL REFERENCES rag_documents(doc_id),"
+ " source_id INTEGER NOT NULL REFERENCES rag_sources(source_id),"
+ " chunk_index INTEGER NOT NULL,"
+ " title TEXT,"
+ " body TEXT NOT NULL,"
+ " metadata_json TEXT NOT NULL DEFAULT '{}',"
+ " updated_at INTEGER NOT NULL DEFAULT (unixepoch()),"
+ " deleted INTEGER NOT NULL DEFAULT 0"
+ ")"
+ );
+
+ db.execute("CREATE UNIQUE INDEX IF NOT EXISTS uq_rag_chunks_doc_idx ON rag_chunks(doc_id, chunk_index)");
+ db.execute("CREATE INDEX IF NOT EXISTS idx_rag_chunks_source_doc ON rag_chunks(source_id, doc_id)");
+ db.execute("CREATE INDEX IF NOT EXISTS idx_rag_chunks_deleted ON rag_chunks(deleted)");
+
+ // Create FTS5 virtual table
+ db.execute(
+ "CREATE VIRTUAL TABLE IF NOT EXISTS rag_fts_chunks "
+ "USING fts5("
+ " chunk_id UNINDEXED,"
+ " title,"
+ " body,"
+ " tokenize = 'unicode61'"
+ ")"
+ );
+
+ // Create vec0 virtual table for embeddings
+ // Note: This may fail if sqlite-vec extension is not loaded
+ std::ostringstream vec_sql;
+ vec_sql << "CREATE VIRTUAL TABLE IF NOT EXISTS rag_vec_chunks "
+ << "USING vec0("
+ << " embedding float[" << vec_dim << "],"
+ << " chunk_id TEXT,"
+ << " doc_id TEXT,"
+ << " source_id INTEGER,"
+ << " updated_at INTEGER"
+ << ")";
+ if (!db.try_execute(vec_sql.str().c_str())) {
+ std::cerr << "Warning: vec0 table creation failed (sqlite-vec extension not available). Vector embeddings will be disabled.\n";
}
- }
- update_sync_state(sdb, src.source_id, cursor_json);
- std::cerr << "Done source " << src.name
- << " ingested_docs=" << ingested_docs
- << " skipped_docs=" << skipped_docs << "\n";
+ // Create convenience view
+ db.execute(
+ "CREATE VIEW IF NOT EXISTS rag_chunk_view AS "
+ "SELECT "
+ " c.chunk_id, "
+ " c.doc_id, "
+ " c.source_id, "
+ " d.source_name, "
+ " d.pk_json, "
+ " COALESCE(c.title, d.title) AS title, "
+ " c.body, "
+ " d.metadata_json AS doc_metadata_json, "
+ " c.metadata_json AS chunk_metadata_json, "
+ " c.updated_at "
+ "FROM rag_chunks c "
+ "JOIN rag_documents d ON d.doc_id = c.doc_id "
+ "WHERE c.deleted = 0 AND d.deleted = 0"
+ );
+
+ // Create sync state table
+ db.execute(
+ "CREATE TABLE IF NOT EXISTS rag_sync_state ("
+ " source_id INTEGER PRIMARY KEY REFERENCES rag_sources(source_id),"
+ " mode TEXT NOT NULL DEFAULT 'poll',"
+ " cursor_json TEXT NOT NULL DEFAULT '{}',"
+ " last_ok_at INTEGER,"
+ " last_error TEXT"
+ ")"
+ );
+
+ return !schema_complete; // Return true if we created it, false if it was already complete
}
// ===========================================================================
@@ -2156,96 +1463,354 @@ static void ingest_source(sqlite3* sdb, const RagSource& src) {
// ===========================================================================
/**
- * @brief Main entry point for RAG ingestion tool
- *
- * Entry point for the RAG ingestion tool. Orchestrates the entire
- * ingestion process:
- *
- * 1. Validates command-line arguments
- * 2. Initializes libcurl
- * 3. Opens SQLite RAG index database
- * 4. Loads sqlite3-vec extension (if configured)
- * 5. Sets SQLite pragmas (foreign keys, WAL mode)
- * 6. Begins transaction
- * 7. Loads and ingests all enabled sources
- * 8. Commits or rolls back transaction
- * 9. Cleanup and exit
- *
+ * @brief Connection parameters
+ */
+struct ConnParams {
+ std::string host = "127.0.0.1";
+ int port = 6030;
+ std::string user;
+ std::string pass;
+ std::string database;
+
+ // Query-specific parameters
+ std::string query_text;
+ int source_id = -1;
+ int limit = 5;
+
+ // Init-specific parameters
+ int vec_dim = 1536; // Vector dimension for vec0 table
+};
+
+static void print_usage(const char* prog_name) {
+ std::cerr << "Usage:\n";
+ std::cerr << " Initialize schema:\n";
+ std::cerr << " " << prog_name << " init [OPTIONS]\n";
+ std::cerr << "\n";
+ std::cerr << " Run ingestion:\n";
+ std::cerr << " " << prog_name << " ingest [OPTIONS]\n";
+ std::cerr << "\n";
+ std::cerr << " Vector similarity search:\n";
+ std::cerr << " " << prog_name << " query --text=\"your query\" [OPTIONS]\n";
+ std::cerr << "\n";
+ std::cerr << "Common Options (SQLite Server via MySQL protocol gateway):\n";
+ std::cerr << " -h, --host=name SQLite Server host (default: 127.0.0.1)\n";
+ std::cerr << " -P, --port=# SQLite Server port - MySQL protocol gateway (default: 6030)\n";
+ std::cerr << " -u, --user=name User for login\n";
+ std::cerr << " -p, --password=name Password to use\n";
+ std::cerr << " -D, --database=name Database to use (required)\n";
+ std::cerr << " -?, --help Show this help message\n";
+ std::cerr << "\n";
+ std::cerr << "Init Options:\n";
+ std::cerr << " --vec-dim=# Vector dimension for rag_vec_chunks table (default: 1536)\n";
+ std::cerr << "\n";
+ std::cerr << "Query Options:\n";
+ std::cerr << " -t, --text=text Query text to search for (required for query)\n";
+ std::cerr << " -s, --source-id=# Source ID to search (default: all enabled sources)\n";
+ std::cerr << " -l, --limit=# Maximum results to return (default: 5)\n";
+}
+
+/**
+ * @brief Parse connection parameters from command-line arguments
* @param argc Argument count
* @param argv Argument values
- * @return int Exit code (0 on success, 1 on error, 2 on usage error)
- *
- * Usage:
- * @verbatim
- * ./rag_ingest /path/to/rag_index.sqlite
- * @endverbatim
+ * @param params Output connection parameters
+ * @return Command name ("init", "ingest", or "query") or empty string on error
*/
+static std::string parse_args(int argc, char** argv, ConnParams& params) {
+ static struct option long_options[] = {
+ {"host", required_argument, 0, 'h'},
+ {"port", required_argument, 0, 'P'},
+ {"user", required_argument, 0, 'u'},
+ {"password", required_argument, 0, 'p'},
+ {"database", required_argument, 0, 'D'},
+ {"text", required_argument, 0, 't'},
+ {"source-id",required_argument, 0, 's'},
+ {"limit", required_argument, 0, 'l'},
+ {"vec-dim", required_argument, 0, 1000}, // Using 1000 as short code
+ {"help", no_argument, 0, '?'},
+ {0, 0, 0, 0}
+ };
+
+ std::string command;
+ int opt;
+ int option_index = 0;
+
+ // Parse command as first argument
+ if (argc < 2) {
+ return "";
+ }
+
+ command = argv[1];
+
+ // Validate command
+ if (command != "init" && command != "ingest" && command != "query") {
+ return "";
+ }
+
+ // Shift argv for getopt so command is processed separately
+ argc--;
+ argv++;
+
+ // Parse options using getopt_long
+ while ((opt = getopt_long(argc, argv, "h:P:u:p:D:t:s:l:?", long_options, &option_index)) != -1) {
+ switch (opt) {
+ case 'h':
+ params.host = optarg;
+ break;
+ case 'P':
+ params.port = std::atoi(optarg);
+ break;
+ case 'u':
+ params.user = optarg;
+ break;
+ case 'p':
+ params.pass = optarg;
+ break;
+ case 'D':
+ params.database = optarg;
+ break;
+ case 't':
+ params.query_text = optarg;
+ break;
+ case 's':
+ params.source_id = std::atoi(optarg);
+ break;
+ case 'l':
+ params.limit = std::atoi(optarg);
+ break;
+ case 1000: // --vec-dim
+ params.vec_dim = std::atoi(optarg);
+ if (params.vec_dim <= 0) {
+ std::cerr << "Error: --vec-dim must be positive\n";
+ return "";
+ }
+ break;
+ case '?':
+ default:
+ return "";
+ }
+ }
+
+ // Validate required parameters
+ if (params.database.empty()) {
+ std::cerr << "Error: Required parameter missing: --database is required\n";
+ return "";
+ }
+
+ // For query command, query_text is required
+ if (command == "query" && params.query_text.empty()) {
+ std::cerr << "Error: --text is required for query command\n";
+ return "";
+ }
+
+ return command;
+}
+
int main(int argc, char** argv) {
- // Validate arguments
- if (argc != 2) {
- std::cerr << "Usage: " << argv[0] << " \n";
- return 2;
- }
-
- // Initialize libcurl (for embedding API calls)
- curl_global_init(CURL_GLOBAL_DEFAULT);
-
- const char* sqlite_path = argv[1];
-
- // Open SQLite database
- sqlite3* db = nullptr;
- if (sqlite3_open(sqlite_path, &db) != SQLITE_OK) {
- fatal("Could not open SQLite DB: " + std::string(sqlite_path));
- }
-
- // Load sqlite3-vec extension if configured
- sqlite_load_vec_extension(db);
-
- // Set safe SQLite pragmas
- sqlite_exec(db, "PRAGMA foreign_keys = ON;");
- sqlite_exec(db, "PRAGMA journal_mode = WAL;");
- sqlite_exec(db, "PRAGMA synchronous = NORMAL;");
-
- // Begin single transaction for speed
- if (sqlite_exec(db, "BEGIN IMMEDIATE;") != SQLITE_OK) {
- sqlite3_close(db);
- fatal("Failed to begin transaction");
- }
-
- bool ok = true;
- try {
- // Load and ingest all enabled sources
- std::vector<RagSource> sources = load_sources(db);
- if (sources.empty()) {
- std::cerr << "No enabled sources found in rag_sources.\n";
+ ConnParams params;
+ std::string command = parse_args(argc, argv, params);
+
+ if (command.empty()) {
+ print_usage(argv[0]);
+ return 2;
}
- for (size_t i = 0; i < sources.size(); i++) {
- ingest_source(db, sources[i]);
+
+ // Initialize command
+ if (command == "init") {
+ MySQLDB db;
+ db.connect(params.host.c_str(), params.port, params.user.c_str(),
+ params.pass.c_str(), params.database.c_str());
+
+ bool created = init_schema(db, params.vec_dim);
+ if (created) {
+ std::cout << "Schema created successfully (vec_dim=" << params.vec_dim << ").\n";
+ } else {
+ std::cout << "Schema already exists.\n";
+ }
+
+ return 0;
}
- } catch (const std::exception& e) {
- std::cerr << "Exception: " << e.what() << "\n";
- ok = false;
- } catch (...) {
- std::cerr << "Unknown exception\n";
- ok = false;
- }
-
- // Commit or rollback
- if (ok) {
- if (sqlite_exec(db, "COMMIT;") != SQLITE_OK) {
- sqlite_exec(db, "ROLLBACK;");
- sqlite3_close(db);
- fatal("Failed to commit transaction");
+
+ // Ingest command
+ if (command == "ingest") {
+ MySQLDB db;
+ db.connect(params.host.c_str(), params.port, params.user.c_str(),
+ params.pass.c_str(), params.database.c_str());
+
+ // Check if schema exists before proceeding
+ if (!table_exists(db, "rag_sources")) {
+ std::cerr << "Error: RAG schema not found. Please run 'init' command first:\n";
+ std::cerr << " " << argv[0] << " init -h " << params.host
+ << " -P " << params.port << " -u " << params.user
+ << " -p " << params.pass << " -D " << params.database << "\n";
+ return 1;
+ }
+
+ curl_global_init(CURL_GLOBAL_DEFAULT);
+
+ // Note: PRAGMA commands are SQLite-specific and not supported through MySQL protocol
+ // The SQLite backend should have these configured already
+ db.execute("BEGIN IMMEDIATE;");
+
+ bool ok = true;
+ try {
+ std::vector<RagSource> sources = load_sources(db);
+ if (sources.empty()) {
+ std::cerr << "No enabled sources found in rag_sources.\n";
+ }
+ for (size_t i = 0; i < sources.size(); i++) {
+ ingest_source(db, sources[i]);
+ }
+ } catch (const std::exception& e) {
+ std::cerr << "Exception: " << e.what() << "\n";
+ ok = false;
+ } catch (...) {
+ std::cerr << "Unknown exception\n";
+ ok = false;
+ }
+
+ db.execute(ok ? "COMMIT;" : "ROLLBACK;");
+ curl_global_cleanup();
+ return ok ? 0 : 1;
}
- } else {
- sqlite_exec(db, "ROLLBACK;");
- sqlite3_close(db);
- curl_global_cleanup();
- return 1;
- }
-
- // Cleanup
- sqlite3_close(db);
- curl_global_cleanup();
- return 0;
+
+ // Query command
+ if (command == "query") {
+ MySQLDB db;
+ db.connect(params.host.c_str(), params.port, params.user.c_str(),
+ params.pass.c_str(), params.database.c_str());
+
+ // Check if schema exists
+ if (!table_exists(db, "rag_sources")) {
+ std::cerr << "Error: RAG schema not found. Please run 'init' command first:\n";
+ std::cerr << " " << argv[0] << " init -h " << params.host
+ << " -P " << params.port << " -u " << params.user
+ << " -p " << params.pass << " -D " << params.database << "\n";
+ return 1;
+ }
+
+ curl_global_init(CURL_GLOBAL_DEFAULT);
+
+ try {
+ // Load sources
+ std::vector<RagSource> sources = load_sources(db);
+
+ // Filter by source_id if specified
+ if (params.source_id >= 0) {
+ auto it = std::remove_if(sources.begin(), sources.end(),
+ [params](const RagSource& s) { return s.source_id != params.source_id; });
+ sources.erase(it, sources.end());
+ }
+
+ if (sources.empty()) {
+ std::cerr << "No enabled sources found";
+ if (params.source_id >= 0) {
+ std::cerr << " for source_id=" << params.source_id;
+ }
+ std::cerr << ".\n";
+ curl_global_cleanup();
+ return 1;
+ }
+
+ // Use the first source's embedding config
+ RagSource& source = sources[0];
+
+ if (source.embedding_json.empty()) {
+ std::cerr << "Error: Embeddings not configured for source " << source.source_id << "\n";
+ curl_global_cleanup();
+ return 1;
+ }
+
+ EmbeddingConfig emb_cfg = parse_embedding_json(source.embedding_json);
+ if (!emb_cfg.enabled) {
+ std::cerr << "Error: Embeddings not enabled for source " << source.source_id << "\n";
+ curl_global_cleanup();
+ return 1;
+ }
+
+ std::cout << "Generating embedding for query using: " << emb_cfg.provider << "\n";
+
+ // Build embedding provider
+ auto embedder = build_embedding_provider(emb_cfg);
+
+ // Generate embedding for query
+ std::vector<std::string> query_inputs = {params.query_text};
+ std::vector<std::vector<float>> query_embeddings = embedder->embed(query_inputs, emb_cfg.dim);
+
+ if (query_embeddings.empty() || query_embeddings[0].empty()) {
+ std::cerr << "Error: Failed to generate embedding for query\n";
+ curl_global_cleanup();
+ return 1;
+ }
+
+ // Convert embedding to hex string for vec0 MATCH
+ std::string query_hex;
+ query_hex.reserve(query_embeddings[0].size() * 8);
+ for (float f : query_embeddings[0]) {
+ query_hex += float_to_hex_blob(f);
+ }
+
+ // Build search query
+ // vec0 knn requires subquery approach: MATCH (SELECT ... LIMIT 1) AND k = ?
+ std::string source_filter;
+ if (params.source_id >= 0) {
+ source_filter = "AND c.source_id = " + std::to_string(params.source_id);
+ }
+
+ // Use subquery with VALUES to provide the query embedding
+ // This creates a temporary single-row result with the query embedding
+ std::string search_sql =
+ "SELECT c.chunk_id, c.source_id, SUBSTR(c.body, 1, 200) as content, "
+ "v.distance, d.title "
+ "FROM rag_vec_chunks v "
+ "JOIN rag_chunks c ON c.chunk_id = v.chunk_id "
+ "JOIN rag_documents d ON d.doc_id = c.doc_id "
+ "WHERE v.embedding MATCH ("
+ " SELECT X'" + query_hex + "' AS embedding"
+ ") AND k = " + std::to_string(params.limit) + " "
+ + source_filter + " "
+ "ORDER BY v.distance";
+
+ // Execute search
+ MYSQL_RES* result = db.query(search_sql.c_str());
+ if (result) {
+ MYSQL_ROW row;
+ int row_count = 0;
+ while ((row = mysql_fetch_row(result))) {
+ std::cout << "\n--- Result " << (++row_count) << " ---\n";
+ unsigned long* lengths = mysql_fetch_lengths(result);
+ int field_count = mysql_num_fields(result);
+ for (int i = 0; i < field_count; i++) {
+ MYSQL_FIELD* field = mysql_fetch_field_direct(result, i);
+ if (row[i]) {
+ std::cout << field->name << ": " << row[i] << "\n";
+ }
+ }
+ }
+ mysql_free_result(result);
+
+ if (row_count == 0) {
+ std::cout << "No results found.\n";
+ } else {
+ std::cout << "\nFound " << row_count << " result(s).\n";
+ }
+ }
+
+ } catch (const std::exception& e) {
+ std::cerr << "Exception: " << e.what() << "\n";
+ curl_global_cleanup();
+ return 1;
+ } catch (...) {
+ std::cerr << "Unknown exception\n";
+ curl_global_cleanup();
+ return 1;
+ }
+
+ curl_global_cleanup();
+ return 0;
+ }
+
+ // Unknown command
+ print_usage(argv[0]);
+ return 2;
}
diff --git a/lib/proxy_sqlite3_symbols.cpp b/lib/proxy_sqlite3_symbols.cpp
index 600c8a116..573ab3eca 100644
--- a/lib/proxy_sqlite3_symbols.cpp
+++ b/lib/proxy_sqlite3_symbols.cpp
@@ -50,9 +50,17 @@ int (*proxy_sqlite3_prepare_v2)(sqlite3*, const char*, int, sqlite3_stmt**, cons
int (*proxy_sqlite3_open_v2)(const char*, sqlite3**, int, const char*) = sqlite3_open_v2;
int (*proxy_sqlite3_exec)(sqlite3*, const char*, int (*)(void*,int,char**,char**), void*, char**) = sqlite3_exec;
-// Optional hooks used by sqlite-vec (function pointers will be set by LoadPlugin or remain NULL)
-void (*proxy_sqlite3_vec_init)(sqlite3*, char**, const sqlite3_api_routines*) = NULL;
-void (*proxy_sqlite3_rembed_init)(sqlite3*, char**, const sqlite3_api_routines*) = NULL;
+// Optional hooks used by sqlite-vec and sqlite-rembed
+// These are statically linked extensions that need to be initialized
+
+// Forward declarations for the extension init functions
+extern "C" int sqlite3_vec_init(sqlite3 *db, char **pzErrMsg, const sqlite3_api_routines *pApi);
+extern "C" int sqlite3_rembed_init(sqlite3 *db, char **pzErrMsg, const sqlite3_api_routines *pApi);
+
+// Initialize the extension pointers to the statically linked functions
+// These allow Admin_Bootstrap.cpp to register them as auto-extensions
+int (*proxy_sqlite3_vec_init)(sqlite3 *db, char **pzErrMsg, const sqlite3_api_routines *pApi) = sqlite3_vec_init;
+int (*proxy_sqlite3_rembed_init)(sqlite3 *db, char **pzErrMsg, const sqlite3_api_routines *pApi) = sqlite3_rembed_init;
// Internal helpers used by admin stats batching; keep defaults as NULL