From 93517540a526707df85dcfe971acead3e490d4b4 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Wed, 21 Jan 2026 16:25:49 +0500 Subject: [PATCH 1/9] Added Makefile for rag_ingest --- Makefile | 9 +++++++++ RAG_POC/Makefile | 29 +++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) create mode 100644 RAG_POC/Makefile diff --git a/Makefile b/Makefile index 285ef2f57..ab996f751 100644 --- a/Makefile +++ b/Makefile @@ -152,6 +152,15 @@ build_lib_debug: $(if $(LEGACY_BUILD),build_lib_debug_legacy,build_lib_debug_def .PHONY: build_src_debug build_src_debug: $(if $(LEGACY_BUILD),build_src_debug_legacy,build_src_debug_default) +# RAG ingester (PoC) +.PHONY: rag_ingest +rag_ingest: build_deps + cd RAG_POC && ${MAKE} CC=${CC} CXX=${CXX} OPTZ="${O2} -ggdb" CXXFLAGS="${O2} -ggdb" + +.PHONY: rag_ingest_clean +rag_ingest_clean: + cd RAG_POC && ${MAKE} clean + # legacy build targets (pre c++17) .PHONY: build_deps_legacy build_deps_legacy: diff --git a/RAG_POC/Makefile b/RAG_POC/Makefile new file mode 100644 index 000000000..36402f56e --- /dev/null +++ b/RAG_POC/Makefile @@ -0,0 +1,29 @@ +CXX ?= g++ +CXXFLAGS ?= -std=c++17 -O2 + +ROOT_DIR := .. 
+ +INCLUDES := \ + -I$(ROOT_DIR)/deps/json \ + -I$(ROOT_DIR)/deps/mariadb-client-library/mariadb_client/include \ + -I$(ROOT_DIR)/deps/sqlite3/sqlite-amalgamation-3500400 + +LIBDIRS := \ + -L$(ROOT_DIR)/deps/mariadb-client-library/mariadb_client/libmariadb + +SQLITE3_OBJ := $(ROOT_DIR)/deps/sqlite3/sqlite-amalgamation-3500400/sqlite3.o + +LIBS := -lmariadbclient -lssl -lcrypto -lcrypt -ldl -lpthread + +TARGET := rag_ingest +SOURCES := rag_ingest.cpp + +.PHONY: all clean + +all: $(TARGET) + +$(TARGET): $(SOURCES) + $(CXX) $(CXXFLAGS) $(INCLUDES) $(LIBDIRS) $(SQLITE3_OBJ) $^ -o $@ $(LIBS) + +clean: + rm -f $(TARGET) From 57d8c3f3b8d15123677d046004dd14d770296d64 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Wed, 21 Jan 2026 16:45:46 +0500 Subject: [PATCH 2/9] Make rag_ingest compile --- RAG_POC/rag_ingest.cpp | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/RAG_POC/rag_ingest.cpp b/RAG_POC/rag_ingest.cpp index 415ded422..bd69a0417 100644 --- a/RAG_POC/rag_ingest.cpp +++ b/RAG_POC/rag_ingest.cpp @@ -93,7 +93,8 @@ // ------------------------------------------------------------ #include -#include +#include +#include #include #include @@ -109,6 +110,28 @@ #include "json.hpp" using json = nlohmann::json; +extern "C" __attribute__((weak)) char *sha256_crypt_r( + const char *key, + const char *salt, + char *buffer, + int buflen) { + if (!key || !salt || !buffer || buflen <= 0) { + return nullptr; + } + struct crypt_data data; + std::memset(&data, 0, sizeof(data)); + char *res = crypt_r(key, salt, &data); + if (!res) { + return nullptr; + } + size_t len = std::strlen(res); + if (len + 1 > static_cast(buflen)) { + return nullptr; + } + std::memcpy(buffer, res, len + 1); + return buffer; +} + // ------------------------- // Small helpers // ------------------------- From fb3673dd9dc4c410bf94ecb22e2c42c116d78f2f Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Thu, 22 Jan 2026 13:48:27 +0500 Subject: [PATCH 3/9] added sample sql 
files --- RAG_POC/sample_mysql.sql | 29 +++++++++++++++++++++++++++ RAG_POC/sample_sqlite.sql | 41 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 RAG_POC/sample_mysql.sql create mode 100644 RAG_POC/sample_sqlite.sql diff --git a/RAG_POC/sample_mysql.sql b/RAG_POC/sample_mysql.sql new file mode 100644 index 000000000..5db4aa868 --- /dev/null +++ b/RAG_POC/sample_mysql.sql @@ -0,0 +1,29 @@ +-- Sample MySQL dataset for rag_ingest testing +-- Creates a simple posts table and inserts a few rows. + +CREATE DATABASE IF NOT EXISTS rag_test; +USE rag_test; + +DROP TABLE IF EXISTS posts; + +CREATE TABLE posts ( + Id BIGINT NOT NULL PRIMARY KEY, + Title VARCHAR(255) NOT NULL, + Body TEXT NOT NULL, + Tags VARCHAR(255) NULL, + Score INT NOT NULL DEFAULT 0, + CreationDate DATETIME NOT NULL, + UpdatedAt DATETIME NULL +); + +INSERT INTO posts (Id, Title, Body, Tags, Score, CreationDate, UpdatedAt) VALUES +(1, 'Hello RAG', 'This is the first test document. It contains sample text for chunking.', 'rag,test', 10, '2024-01-01 10:00:00', '2024-01-02 12:00:00'), +(2, 'Second Doc', 'A second document body. It has more text to ensure chunking works across boundaries.', 'example,docs', 5, '2024-01-03 09:30:00', '2024-01-03 11:00:00'), +(3, 'ProxySQL RAG', 'ProxySQL adds MCP and RAG support. This row is for ingestion testing.', 'proxysql,rag', 7, '2024-01-05 08:15:00', NULL), +(4, 'Short Note', 'Tiny.', 'misc', 1, '2024-01-06 13:00:00', NULL), +(5, 'Chunk Stress', 'This row contains a longer body to force multiple chunk boundaries when chunking is enabled. 
Repeat: This row contains a longer body to force multiple chunk boundaries when chunking is enabled.', 'long,chunk', 12, '2024-01-07 18:45:00', '2024-01-08 07:10:00'), +(6, 'Filter Candidate', 'This document should be filtered out by a high score threshold.', 'filter,test', 2, '2024-01-09 14:20:00', NULL), +(7, 'Tag Variation', 'Contains tags and mixed content for metadata pick/rename testing.', 'rag,meta,tag', 9, '2024-01-10 09:00:00', '2024-01-10 10:00:00'), +(8, 'Null Updated', 'Document with NULL UpdatedAt for null handling in source.', 'nulls', 6, '2024-01-11 16:30:00', NULL), +(9, 'High Score', 'This is a high score document for where_sql tests.', 'score,high', 20, '2024-01-12 08:00:00', '2024-01-12 09:30:00'), +(10, 'Low Score', 'Low score entry to test filters.', 'score,low', 0, '2024-01-13 12:00:00', NULL); diff --git a/RAG_POC/sample_sqlite.sql b/RAG_POC/sample_sqlite.sql new file mode 100644 index 000000000..07d35d469 --- /dev/null +++ b/RAG_POC/sample_sqlite.sql @@ -0,0 +1,41 @@ +-- Sample SQLite setup for rag_ingest testing +-- 1) Load the RAG schema (schema.sql) into a new SQLite DB. +-- 2) Insert a sample rag_sources row that points to the MySQL sample. 
+ +-- Step 1: apply schema +.read ./schema.sql + +-- Step 2: insert a sample source +INSERT INTO rag_sources ( + source_id, + name, + enabled, + backend_type, + backend_host, + backend_port, + backend_user, + backend_pass, + backend_db, + table_name, + pk_column, + where_sql, + doc_map_json, + chunking_json, + embedding_json +) VALUES ( + 1, + 'mysql_posts', + 1, + 'mysql', + '127.0.0.1', + 3306, + 'root', + 'root', + 'rag_test', + 'posts', + 'Id', + '', + '{"doc_id":{"format":"posts:{Id}"},"title":{"concat":[{"col":"Title"}]},"body":{"concat":[{"col":"Body"}]},"metadata":{"pick":["Id","Tags","Score","CreationDate"],"rename":{"CreationDate":"CreationDate"}}}', + '{"enabled":true,"unit":"chars","chunk_size":4000,"overlap":400,"min_chunk_size":800}', + '{"enabled":true,"dim":1536,"model":"text-embedding-3-large","input":{"concat":[{"col":"Title"},{"lit":"\\nTags: "},{"col":"Tags"},{"lit":"\\n\\n"},{"chunk_body":true}]}}' +); From 2f6b058f7b227c3cbfddd5ee8fca72a4ef1fc91b Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Thu, 22 Jan 2026 13:51:22 +0500 Subject: [PATCH 4/9] Added test_rag_ingest.sh --- RAG_POC/test_rag_ingest.sh | 128 +++++++++++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) create mode 100755 RAG_POC/test_rag_ingest.sh diff --git a/RAG_POC/test_rag_ingest.sh b/RAG_POC/test_rag_ingest.sh new file mode 100755 index 000000000..0bf508375 --- /dev/null +++ b/RAG_POC/test_rag_ingest.sh @@ -0,0 +1,128 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "${ROOT_DIR}/.." && pwd)" + +SQLITE_BIN="${SQLITE_BIN:-${REPO_ROOT}/deps/sqlite3/sqlite3/sqlite3}" +MYSQL_BIN="${MYSQL_BIN:-mysql}" + +MYSQL_HOST="${MYSQL_HOST:-127.0.0.1}" +MYSQL_PORT="${MYSQL_PORT:-3306}" +MYSQL_USER="${MYSQL_USER:-root}" +MYSQL_PASS="${MYSQL_PASS:-root}" + +DB1="${ROOT_DIR}/rag_ingest_test.db" + +VEC_EXT="${REPO_ROOT}/deps/sqlite3/sqlite3/vec0.so" + +if [[ ! 
-f "${VEC_EXT}" ]]; then + echo "FATAL: vec0.so not found at ${VEC_EXT}" >&2 + exit 1 +fi + +run_sqlite() { + local db="$1" + local sql="$2" + "${SQLITE_BIN}" "${db}" < SQLite DB: ${db}" + echo "==> load_schema: ${load_schema}" + echo "==> where_sql: ${where_sql:-}" + echo "==> chunking_json: {\"enabled\":false,\"unit\":\"chars\",\"chunk_size\":4000,\"overlap\":400,\"min_chunk_size\":800}" + echo "==> embedding_json: {\"enabled\":false}" + + "${SQLITE_BIN}" "${db}" <&2 + exit 1 + fi + echo "OK: ${label} = ${actual}" +} + +cleanup_db() { + rm -f "${DB1}" +} + +cleanup_db + +# Phase 1: load schema + source, chunking disabled, no where filter +apply_schema_and_source "${DB1}" "" "true" + +# Seed MySQL +import_mysql_seed + +# Run rag_ingest +"${ROOT_DIR}/rag_ingest" "${DB1}" + +# Validate counts (sample_mysql has 10 rows) +DOCS_COUNT="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_documents;")" +CHUNKS_COUNT="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_chunks;")" +FTS_COUNT="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_fts_chunks;")" +VEC_COUNT="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_vec_chunks;")" + +assert_eq "rag_documents" "10" "${DOCS_COUNT}" +assert_eq "rag_chunks (chunking disabled)" "10" "${CHUNKS_COUNT}" +assert_eq "rag_fts_chunks" "10" "${FTS_COUNT}" +assert_eq "rag_vec_chunks (embedding disabled)" "0" "${VEC_COUNT}" + +# Phase 2: apply where filter, re-ingest after cleanup +run_sqlite "${DB1}" "DELETE FROM rag_vec_chunks;" +run_sqlite "${DB1}" "DELETE FROM rag_fts_chunks;" +run_sqlite "${DB1}" "DELETE FROM rag_chunks;" +run_sqlite "${DB1}" "DELETE FROM rag_documents;" + +apply_schema_and_source "${DB1}" "Score >= 7" "false" +"${ROOT_DIR}/rag_ingest" "${DB1}" + +DOCS_COUNT_2="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_documents;")" +CHUNKS_COUNT_2="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_chunks;")" +FTS_COUNT_2="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_fts_chunks;")" +VEC_COUNT_2="$(run_sqlite "${DB1}" "SELECT COUNT(*) 
FROM rag_vec_chunks;")" + +# In sample_mysql: Score >= 7 matches Id 1,3,5,7,9 => 5 docs +assert_eq "rag_documents (where_sql)" "5" "${DOCS_COUNT_2}" +assert_eq "rag_chunks (where_sql)" "5" "${CHUNKS_COUNT_2}" +assert_eq "rag_fts_chunks (where_sql)" "5" "${FTS_COUNT_2}" +assert_eq "rag_vec_chunks (where_sql, embedding disabled)" "0" "${VEC_COUNT_2}" + +echo "All tests passed." From d47e196fc6b1daf03a761ddd0a34f810c601e722 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Thu, 22 Jan 2026 14:07:30 +0500 Subject: [PATCH 5/9] Added rag_chunks and rag_fts_chunks test --- RAG_POC/test_rag_ingest.sh | 88 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 85 insertions(+), 3 deletions(-) diff --git a/RAG_POC/test_rag_ingest.sh b/RAG_POC/test_rag_ingest.sh index 0bf508375..5992b0d4b 100755 --- a/RAG_POC/test_rag_ingest.sh +++ b/RAG_POC/test_rag_ingest.sh @@ -34,6 +34,7 @@ apply_schema_and_source() { local db="$1" local where_sql="$2" local load_schema="$3" + local chunking_json_override="${4:-}" local schema_cmd="" if [[ "${load_schema}" == "true" ]]; then @@ -43,14 +44,22 @@ apply_schema_and_source() { echo "==> SQLite DB: ${db}" echo "==> load_schema: ${load_schema}" echo "==> where_sql: ${where_sql:-}" - echo "==> chunking_json: {\"enabled\":false,\"unit\":\"chars\",\"chunk_size\":4000,\"overlap\":400,\"min_chunk_size\":800}" + local chunking_json_value='{"enabled":false,"unit":"chars","chunk_size":4000,"overlap":400,"min_chunk_size":800}' + if [[ -n "${chunking_json_override}" ]]; then + chunking_json_value="${chunking_json_override}" + fi + echo "==> chunking_json: ${chunking_json_value}" echo "==> embedding_json: {\"enabled\":false}" "${SQLITE_BIN}" "${db}" < Sample rag_documents" + run_sqlite "${db}" "SELECT doc_id, source_id, substr(title,1,40) AS title, json_extract(metadata_json,'$.Score') AS score FROM rag_documents ORDER BY doc_id LIMIT 5;" + echo "==> Sample rag_chunks" + run_sqlite "${db}" "SELECT chunk_id, doc_id, chunk_index, substr(body,1,50) AS body 
FROM rag_chunks ORDER BY chunk_id LIMIT 5;" + echo "==> Sample rag_fts_chunks matches for 'ProxySQL'" + run_sqlite "${db}" "SELECT chunk_id, substr(title,1,40) AS title FROM rag_fts_chunks WHERE rag_fts_chunks MATCH 'ProxySQL' ORDER BY chunk_id LIMIT 5;" +} + cleanup_db() { rm -f "${DB1}" } @@ -105,6 +136,19 @@ assert_eq "rag_chunks (chunking disabled)" "10" "${CHUNKS_COUNT}" assert_eq "rag_fts_chunks" "10" "${FTS_COUNT}" assert_eq "rag_vec_chunks (embedding disabled)" "0" "${VEC_COUNT}" +print_samples "${DB1}" + +# FTS tests (phase 1) +FTS_PHRASE_1="$(fts_count "${DB1}" '"ProxySQL adds MCP"')" +FTS_SHORT_1="$(fts_count "${DB1}" 'Short')" +FTS_TAG_1="$(fts_count "${DB1}" 'Tag')" +FTS_BM25_1="$(fts_bm25_top "${DB1}" 'ProxySQL')" + +assert_eq "fts phrase (ProxySQL adds MCP)" "1" "${FTS_PHRASE_1}" +assert_eq "fts term (Short)" "1" "${FTS_SHORT_1}" +assert_eq "fts term (Tag)" "1" "${FTS_TAG_1}" +assert_eq "fts bm25 top (ProxySQL)" "posts:3#0" "${FTS_BM25_1}" + # Phase 2: apply where filter, re-ingest after cleanup run_sqlite "${DB1}" "DELETE FROM rag_vec_chunks;" run_sqlite "${DB1}" "DELETE FROM rag_fts_chunks;" @@ -125,4 +169,42 @@ assert_eq "rag_chunks (where_sql)" "5" "${CHUNKS_COUNT_2}" assert_eq "rag_fts_chunks (where_sql)" "5" "${FTS_COUNT_2}" assert_eq "rag_vec_chunks (where_sql, embedding disabled)" "0" "${VEC_COUNT_2}" +print_samples "${DB1}" + +# FTS tests (phase 2) +FTS_PROXYSQL_2="$(fts_count "${DB1}" 'ProxySQL')" +FTS_HIGH_2="$(fts_count "${DB1}" 'High')" +FTS_LOW_2="$(fts_count "${DB1}" 'Low')" +FTS_BM25_2="$(fts_bm25_top "${DB1}" 'High')" + +assert_eq "fts term (ProxySQL)" "1" "${FTS_PROXYSQL_2}" +assert_eq "fts term (High)" "1" "${FTS_HIGH_2}" +assert_eq "fts term (Low)" "0" "${FTS_LOW_2}" +assert_eq "fts bm25 top (High)" "posts:9#0" "${FTS_BM25_2}" + +# Phase 3: enable chunking and ensure rows split into multiple chunks +run_sqlite "${DB1}" "DELETE FROM rag_vec_chunks;" +run_sqlite "${DB1}" "DELETE FROM rag_fts_chunks;" +run_sqlite "${DB1}" "DELETE 
FROM rag_chunks;" +run_sqlite "${DB1}" "DELETE FROM rag_documents;" + +apply_schema_and_source "${DB1}" "" "false" '{"enabled":true,"unit":"chars","chunk_size":50,"overlap":10,"min_chunk_size":10}' +"${ROOT_DIR}/rag_ingest" "${DB1}" + +DOCS_COUNT_3="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_documents;")" +CHUNKS_COUNT_3="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_chunks;")" +LONG_DOC_CHUNKS="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_chunks WHERE doc_id='posts:5';")" + +assert_eq "rag_documents (chunking enabled)" "10" "${DOCS_COUNT_3}" +if [[ "${CHUNKS_COUNT_3}" -le "${DOCS_COUNT_3}" ]]; then + echo "FAIL: rag_chunks should be greater than rag_documents when chunking enabled" >&2 + exit 1 +fi +if [[ "${LONG_DOC_CHUNKS}" -le "1" ]]; then + echo "FAIL: posts:5 should produce multiple chunks" >&2 + exit 1 +fi + +print_samples "${DB1}" + echo "All tests passed." From bc33c33295a1767cf9fdb5c45a86e593a02c2629 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Thu, 22 Jan 2026 15:47:40 +0500 Subject: [PATCH 6/9] Add vec embedding handling with stub and OpenAI providers Added rag_sync_state implementation --- RAG_POC/rag_ingest.cpp | 361 +++++++++++++++++++++++++++++++++++-- RAG_POC/sample_sqlite.sql | 9 +- RAG_POC/test_rag_ingest.sh | 173 +++++++++++++++++- 3 files changed, 523 insertions(+), 20 deletions(-) diff --git a/RAG_POC/rag_ingest.cpp b/RAG_POC/rag_ingest.cpp index bd69a0417..bd57ee840 100644 --- a/RAG_POC/rag_ingest.cpp +++ b/RAG_POC/rag_ingest.cpp @@ -95,6 +95,7 @@ #include #include #include +#include #include #include @@ -156,11 +157,48 @@ static int sqlite_exec(sqlite3* db, const std::string& sql) { return rc; } +static bool is_integer_string(const std::string& s) { + if (s.empty()) return false; + size_t i = 0; + if (s[0] == '-') { + if (s.size() == 1) return false; + i = 1; + } + for (; i < s.size(); i++) { + if (s[i] < '0' || s[i] > '9') return false; + } + return true; +} + +static std::string sql_escape_single_quotes(const std::string& s) 
{ + std::string out; + out.reserve(s.size() + 8); + for (char c : s) { + if (c == '\'') out.push_back('\''); + out.push_back(c); + } + return out; +} + static std::string json_dump_compact(const json& j) { // Compact output (no pretty printing) to keep storage small. return j.dump(); } +static void sqlite_load_vec_extension(sqlite3* db) { + const char* ext = std::getenv("RAG_VEC0_EXT"); + if (!ext || std::strlen(ext) == 0) return; + + sqlite3_enable_load_extension(db, 1); + char* err = nullptr; + int rc = sqlite3_load_extension(db, ext, nullptr, &err); + if (rc != SQLITE_OK) { + std::string e = err ? err : "(unknown error)"; + sqlite3_free(err); + fatal("Failed to load vec0 extension: " + e + " (" + std::string(ext) + ")"); + } +} + // ------------------------- // Data model // ------------------------- @@ -202,6 +240,19 @@ struct EmbeddingConfig { int dim = 1536; std::string model = "unknown"; json input_spec; // expects {"concat":[...]} + std::string provider = "stub"; // stub | openai + std::string api_base; + std::string api_key; + int batch_size = 16; + int timeout_ms = 20000; +}; + +struct SyncCursor { + std::string column; + bool has_value = false; + bool numeric = false; + std::int64_t num_value = 0; + std::string str_value; }; // A row fetched from MySQL, as a name->string map. 
@@ -244,8 +295,15 @@ static EmbeddingConfig parse_embedding_json(const json& j) { if (j.contains("dim")) cfg.dim = j["dim"].get(); if (j.contains("model")) cfg.model = j["model"].get(); if (j.contains("input")) cfg.input_spec = j["input"]; + if (j.contains("provider")) cfg.provider = j["provider"].get(); + if (j.contains("api_base")) cfg.api_base = j["api_base"].get(); + if (j.contains("api_key")) cfg.api_key = j["api_key"].get(); + if (j.contains("batch_size")) cfg.batch_size = j["batch_size"].get(); + if (j.contains("timeout_ms")) cfg.timeout_ms = j["timeout_ms"].get(); if (cfg.dim <= 0) cfg.dim = 1536; + if (cfg.batch_size <= 0) cfg.batch_size = 16; + if (cfg.timeout_ms <= 0) cfg.timeout_ms = 20000; return cfg; } @@ -488,19 +546,110 @@ static std::vector collect_needed_columns(const RagSource& s, const return cols; } -static std::string build_select_sql(const RagSource& s, const std::vector& cols) { +static std::string build_select_sql(const RagSource& s, + const std::vector& cols, + const std::string& extra_filter) { std::string sql = "SELECT "; for (size_t i = 0; i < cols.size(); i++) { if (i) sql += ", "; sql += "`" + cols[i] + "`"; } sql += " FROM `" + s.table_name + "`"; - if (!s.where_sql.empty()) { - sql += " WHERE " + s.where_sql; + if (!s.where_sql.empty() || !extra_filter.empty()) { + sql += " WHERE "; + if (!s.where_sql.empty()) { + sql += "(" + s.where_sql + ")"; + if (!extra_filter.empty()) sql += " AND "; + } + if (!extra_filter.empty()) sql += "(" + extra_filter + ")"; } return sql; } +static void sqlite_prepare_or_die(sqlite3* db, sqlite3_stmt** st, const char* sql); +static void sqlite_bind_text(sqlite3_stmt* st, int idx, const std::string& v); + +static json load_sync_cursor_json(sqlite3* db, int source_id) { + sqlite3_stmt* st = nullptr; + json out = json::object(); + const char* sql = "SELECT cursor_json FROM rag_sync_state WHERE source_id=?"; + if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) != SQLITE_OK) { + return out; + } + 
sqlite3_bind_int(st, 1, source_id); + int rc = sqlite3_step(st); + if (rc == SQLITE_ROW) { + const unsigned char* txt = sqlite3_column_text(st, 0); + if (txt) { + try { + out = json::parse(reinterpret_cast(txt)); + } catch (...) { + out = json::object(); + } + } + } + sqlite3_finalize(st); + if (!out.is_object()) out = json::object(); + return out; +} + +static SyncCursor parse_sync_cursor(const json& cursor_json, const std::string& default_col) { + SyncCursor c; + c.column = default_col; + if (cursor_json.is_object()) { + if (cursor_json.contains("column") && cursor_json["column"].is_string()) { + c.column = cursor_json["column"].get(); + } + if (cursor_json.contains("value")) { + const auto& v = cursor_json["value"]; + if (v.is_number_integer()) { + c.has_value = true; + c.numeric = true; + c.num_value = v.get(); + } else if (v.is_number_float()) { + c.has_value = true; + c.numeric = true; + c.num_value = static_cast(v.get()); + } else if (v.is_string()) { + c.has_value = true; + c.str_value = v.get(); + if (is_integer_string(c.str_value)) { + c.numeric = true; + c.num_value = std::stoll(c.str_value); + } + } + } + } + return c; +} + +static std::string build_incremental_filter(const SyncCursor& c) { + if (!c.has_value || c.column.empty()) return ""; + std::string col = "`" + c.column + "`"; + if (c.numeric) { + return col + " > " + std::to_string(c.num_value); + } + return col + " > '" + sql_escape_single_quotes(c.str_value) + "'"; +} + +static void update_sync_state(sqlite3* db, int source_id, const json& cursor_json) { + const char* sql = + "INSERT INTO rag_sync_state(source_id, mode, cursor_json, last_ok_at, last_error) " + "VALUES(?, 'poll', ?, unixepoch(), NULL) " + "ON CONFLICT(source_id) DO UPDATE SET " + "cursor_json=excluded.cursor_json, last_ok_at=excluded.last_ok_at, last_error=NULL"; + sqlite3_stmt* st = nullptr; + sqlite_prepare_or_die(db, &st, sql); + sqlite3_bind_int(st, 1, source_id); + std::string cursor_str = json_dump_compact(cursor_json); + 
sqlite_bind_text(st, 2, cursor_str); + int rc = sqlite3_step(st); + sqlite3_finalize(st); + if (rc != SQLITE_DONE) { + fatal(std::string("SQLite upsert rag_sync_state failed: ") + sqlite3_errmsg(db)); + } +} + // ------------------------- // SQLite prepared statements (batched insertion) // ------------------------- @@ -686,6 +835,141 @@ static std::vector pseudo_embedding(const std::string& text, int dim) { return v; } +// ------------------------- +// Embedding providers +// ------------------------- + +struct EmbeddingProvider { + virtual ~EmbeddingProvider() = default; + virtual std::vector> embed(const std::vector& inputs, int dim) = 0; +}; + +struct StubEmbeddingProvider : public EmbeddingProvider { + std::vector> embed(const std::vector& inputs, int dim) override { + std::vector> out; + out.reserve(inputs.size()); + for (const auto& s : inputs) out.push_back(pseudo_embedding(s, dim)); + return out; + } +}; + +struct CurlBuffer { + std::string data; +}; + +static size_t curl_write_cb(void* contents, size_t size, size_t nmemb, void* userp) { + size_t total = size * nmemb; + CurlBuffer* buf = static_cast(userp); + buf->data.append(static_cast(contents), total); + return total; +} + +struct OpenAIEmbeddingProvider : public EmbeddingProvider { + std::string api_base; + std::string api_key; + std::string model; + int timeout_ms = 20000; + + OpenAIEmbeddingProvider(std::string base, std::string key, std::string mdl, int timeout) + : api_base(std::move(base)), api_key(std::move(key)), model(std::move(mdl)), timeout_ms(timeout) {} + + std::vector> embed(const std::vector& inputs, int dim) override { + if (api_base.empty()) { + throw std::runtime_error("embedding api_base is empty"); + } + if (api_key.empty()) { + throw std::runtime_error("embedding api_key is empty"); + } + if (model.empty()) { + throw std::runtime_error("embedding model is empty"); + } + if (model.rfind("hf:", 0) != 0) { + std::cerr << "WARN: embedding model should be prefixed with 'hf:' per 
Synthetic docs\n"; + } + + json req; + req["model"] = model; + req["input"] = inputs; + if (dim > 0) { + req["dimensions"] = dim; + } + std::string body = req.dump(); + + std::string url = api_base; + if (!url.empty() && url.back() == '/') url.pop_back(); + url += "/embeddings"; + + CURL* curl = curl_easy_init(); + if (!curl) { + throw std::runtime_error("curl_easy_init failed"); + } + + CurlBuffer buf; + struct curl_slist* headers = nullptr; + std::string auth = "Authorization: Bearer " + api_key; + headers = curl_slist_append(headers, "Content-Type: application/json"); + headers = curl_slist_append(headers, auth.c_str()); + + curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + curl_easy_setopt(curl, CURLOPT_POST, 1L); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str()); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)body.size()); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write_cb); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buf); + curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, timeout_ms); + + CURLcode res = curl_easy_perform(curl); + long status = 0; + curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status); + + curl_slist_free_all(headers); + curl_easy_cleanup(curl); + + if (res != CURLE_OK) { + throw std::runtime_error(std::string("curl error: ") + curl_easy_strerror(res)); + } + if (status < 200 || status >= 300) { + throw std::runtime_error("embedding request failed with status " + std::to_string(status)); + } + + json resp = json::parse(buf.data); + if (!resp.contains("data") || !resp["data"].is_array()) { + throw std::runtime_error("embedding response missing data array"); + } + + std::vector> out; + out.reserve(resp["data"].size()); + for (const auto& item : resp["data"]) { + if (!item.contains("embedding") || !item["embedding"].is_array()) { + throw std::runtime_error("embedding item missing embedding array"); + } + std::vector vec; + vec.reserve(item["embedding"].size()); + 
for (const auto& v : item["embedding"]) { + vec.push_back(v.get()); + } + if ((int)vec.size() != dim) { + throw std::runtime_error("embedding dimension mismatch: expected " + std::to_string(dim) + + ", got " + std::to_string(vec.size())); + } + out.push_back(std::move(vec)); + } + + if (out.size() != inputs.size()) { + throw std::runtime_error("embedding response size mismatch"); + } + return out; + } +}; + +static std::unique_ptr build_embedding_provider(const EmbeddingConfig& cfg) { + if (cfg.provider == "openai") { + return std::make_unique(cfg.api_base, cfg.api_key, cfg.model, cfg.timeout_ms); + } + return std::make_unique(); +} + // ------------------------- // Load rag_sources from SQLite // ------------------------- @@ -873,6 +1157,14 @@ static void ingest_source(sqlite3* sdb, const RagSource& src) { // Parse chunking & embedding config ChunkingConfig ccfg = parse_chunking_json(src.chunking_json); EmbeddingConfig ecfg = parse_embedding_json(src.embedding_json); + std::unique_ptr embedder; + if (ecfg.enabled) { + embedder = build_embedding_provider(ecfg); + } + + // Load sync cursor (watermark) + json cursor_json = load_sync_cursor_json(sdb, src.source_id); + SyncCursor cursor = parse_sync_cursor(cursor_json, src.pk_column); // Prepare SQLite statements for this run SqliteStmts ss = prepare_sqlite_statements(sdb, ecfg.enabled); @@ -880,9 +1172,11 @@ static void ingest_source(sqlite3* sdb, const RagSource& src) { // Connect MySQL MYSQL* mdb = mysql_connect_or_die(src); - // Build SELECT + // Build SELECT (include watermark column if needed) std::vector cols = collect_needed_columns(src, ecfg); - std::string sel = build_select_sql(src, cols); + if (!cursor.column.empty()) add_unique(cols, cursor.column); + std::string extra_filter = build_incremental_filter(cursor); + std::string sel = build_select_sql(src, cols, extra_filter); if (mysql_query(mdb, sel.c_str()) != 0) { std::string err = mysql_error(mdb); @@ -903,9 +1197,40 @@ static void ingest_source(sqlite3* 
sdb, const RagSource& src) { std::uint64_t skipped_docs = 0; MYSQL_ROW r; + bool max_set = false; + bool max_numeric = false; + std::int64_t max_num = 0; + std::string max_str; while ((r = mysql_fetch_row(res)) != nullptr) { RowMap row = mysql_row_to_map(res, r); + // Track max watermark value from source rows (even if doc is skipped) + if (!cursor.column.empty()) { + auto it = row.find(cursor.column); + if (it != row.end()) { + const std::string& v = it->second; + if (!v.empty()) { + if (!max_set) { + if (cursor.numeric || is_integer_string(v)) { + max_numeric = true; + max_num = std::stoll(v); + } else { + max_numeric = false; + max_str = v; + } + max_set = true; + } else if (max_numeric) { + if (is_integer_string(v)) { + std::int64_t nv = std::stoll(v); + if (nv > max_num) max_num = nv; + } + } else { + if (v > max_str) max_str = v; + } + } + } + } + BuiltDoc doc = build_document_from_row(src, row); // v0: skip if exists @@ -943,13 +1268,10 @@ static void ingest_source(sqlite3* sdb, const RagSource& src) { // Optional vectors if (ecfg.enabled) { - // Build embedding input text, then generate pseudo embedding. - // Replace pseudo_embedding() with a real embedding provider in ProxySQL. 
std::string emb_input = build_embedding_input(ecfg, row, chunks[i]); - std::vector emb = pseudo_embedding(emb_input, ecfg.dim); - - // Insert into sqlite3-vec table - sqlite_insert_vec(ss, emb, chunk_id, doc.doc_id, src.source_id, now_epoch); + std::vector batch_inputs = {emb_input}; + std::vector> vecs = embedder->embed(batch_inputs, ecfg.dim); + sqlite_insert_vec(ss, vecs[0], chunk_id, doc.doc_id, src.source_id, now_epoch); } } @@ -964,6 +1286,17 @@ static void ingest_source(sqlite3* sdb, const RagSource& src) { mysql_close(mdb); sqlite_finalize_all(ss); + if (!cursor_json.is_object()) cursor_json = json::object(); + if (!cursor.column.empty()) cursor_json["column"] = cursor.column; + if (max_set) { + if (max_numeric) { + cursor_json["value"] = max_num; + } else { + cursor_json["value"] = max_str; + } + } + update_sync_state(sdb, src.source_id, cursor_json); + std::cerr << "Done source " << src.name << " ingested_docs=" << ingested_docs << " skipped_docs=" << skipped_docs << "\n"; @@ -979,6 +1312,8 @@ int main(int argc, char** argv) { return 2; } + curl_global_init(CURL_GLOBAL_DEFAULT); + const char* sqlite_path = argv[1]; sqlite3* db = nullptr; @@ -986,6 +1321,9 @@ int main(int argc, char** argv) { fatal("Could not open SQLite DB: " + std::string(sqlite_path)); } + // Load vec0 if configured (needed for rag_vec_chunks inserts) + sqlite_load_vec_extension(db); + // Pragmas (safe defaults) sqlite_exec(db, "PRAGMA foreign_keys = ON;"); sqlite_exec(db, "PRAGMA journal_mode = WAL;"); @@ -1027,6 +1365,7 @@ int main(int argc, char** argv) { } sqlite3_close(db); + curl_global_cleanup(); return 0; } diff --git a/RAG_POC/sample_sqlite.sql b/RAG_POC/sample_sqlite.sql index 07d35d469..c9342affc 100644 --- a/RAG_POC/sample_sqlite.sql +++ b/RAG_POC/sample_sqlite.sql @@ -1,11 +1,8 @@ -- Sample SQLite setup for rag_ingest testing --- 1) Load the RAG schema (schema.sql) into a new SQLite DB. --- 2) Insert a sample rag_sources row that points to the MySQL sample. 
+-- Inserts a sample rag_sources row that points to the MySQL sample. +-- Note: schema.sql must be loaded separately before this script. --- Step 1: apply schema -.read ./schema.sql - --- Step 2: insert a sample source +-- insert a sample source INSERT INTO rag_sources ( source_id, name, diff --git a/RAG_POC/test_rag_ingest.sh b/RAG_POC/test_rag_ingest.sh index 5992b0d4b..146d218d8 100755 --- a/RAG_POC/test_rag_ingest.sh +++ b/RAG_POC/test_rag_ingest.sh @@ -12,10 +12,36 @@ MYSQL_PORT="${MYSQL_PORT:-3306}" MYSQL_USER="${MYSQL_USER:-root}" MYSQL_PASS="${MYSQL_PASS:-root}" +# Embedding provider configuration (for phase 4/5) +EMBEDDING_PROVIDER="${EMBEDDING_PROVIDER:-stub}" +EMBEDDING_DIM="${EMBEDDING_DIM:-1536}" +OPENAI_API_BASE="${OPENAI_API_BASE:-}" +OPENAI_API_KEY="${OPENAI_API_KEY:-}" +OPENAI_MODEL="${OPENAI_MODEL:-hf:nomic-ai/nomic-embed-text-v1.5}" +OPENAI_EMBEDDING_DIM="${OPENAI_EMBEDDING_DIM:-}" + +if [[ -z "${OPENAI_EMBEDDING_DIM}" ]]; then + if [[ "${OPENAI_MODEL}" == "hf:nomic-ai/nomic-embed-text-v1.5" ]]; then + OPENAI_EMBEDDING_DIM=768 + else + OPENAI_EMBEDDING_DIM="${EMBEDDING_DIM}" + fi +fi + +# Uncomment to test OpenAI-compatible embeddings +# export EMBEDDING_PROVIDER=openai +# export EMBEDDING_DIM=1536 +export OPENAI_API_BASE="https://api.synthetic.new/openai/v1" +export OPENAI_API_KEY="your_api_key_here" +# export OPENAI_MODEL="hf:nomic-ai/nomic-embed-text-v1.5" + DB1="${ROOT_DIR}/rag_ingest_test.db" +DB_OPENAI="${ROOT_DIR}/rag_ingest_test_openai.db" VEC_EXT="${REPO_ROOT}/deps/sqlite3/sqlite3/vec0.so" +export RAG_VEC0_EXT="${VEC_EXT}" + if [[ ! 
-f "${VEC_EXT}" ]]; then echo "FATAL: vec0.so not found at ${VEC_EXT}" >&2 exit 1 @@ -35,10 +61,16 @@ apply_schema_and_source() { local where_sql="$2" local load_schema="$3" local chunking_json_override="${4:-}" + local embedding_json_override="${5:-}" + local schema_override_path="${6:-}" local schema_cmd="" if [[ "${load_schema}" == "true" ]]; then - schema_cmd=".read ${ROOT_DIR}/schema.sql"$'\n'".read ${ROOT_DIR}/sample_sqlite.sql" + if [[ -n "${schema_override_path}" ]]; then + schema_cmd=".read ${schema_override_path}"$'\n'".read ${ROOT_DIR}/sample_sqlite.sql" + else + schema_cmd=".read ${ROOT_DIR}/schema.sql"$'\n'".read ${ROOT_DIR}/sample_sqlite.sql" + fi fi echo "==> SQLite DB: ${db}" @@ -49,7 +81,11 @@ apply_schema_and_source() { chunking_json_value="${chunking_json_override}" fi echo "==> chunking_json: ${chunking_json_value}" - echo "==> embedding_json: {\"enabled\":false}" + local embedding_json_value='{"enabled":false}' + if [[ -n "${embedding_json_override}" ]]; then + embedding_json_value="${embedding_json_override}" + fi + echo "==> embedding_json: ${embedding_json_value}" "${SQLITE_BIN}" "${db}" < Sample rag_documents" @@ -112,6 +162,7 @@ print_samples() { cleanup_db() { rm -f "${DB1}" + rm -f "${DB_OPENAI}" } cleanup_db @@ -149,6 +200,76 @@ assert_eq "fts term (Short)" "1" "${FTS_SHORT_1}" assert_eq "fts term (Tag)" "1" "${FTS_TAG_1}" assert_eq "fts bm25 top (ProxySQL)" "posts:3#0" "${FTS_BM25_1}" +# Phase 1a: update skip behavior (existing docs are not updated) +run_mysql_sql "USE rag_test; UPDATE posts SET Title='Hello RAG UPDATED' WHERE Id=1;" +"${ROOT_DIR}/rag_ingest" "${DB1}" +TITLE_AFTER_UPDATE="$(run_sqlite "${DB1}" "SELECT title FROM rag_documents WHERE doc_id='posts:1';")" +assert_eq "rag_documents title unchanged on update" "Hello RAG" "${TITLE_AFTER_UPDATE}" + +# Reset MySQL data after update test +import_mysql_seed + +# Phase 1b: rag_sync_state watermark (incremental ingestion) +SYNC_COL_1="$(run_sqlite "${DB1}" "SELECT 
json_extract(cursor_json,'$.column') FROM rag_sync_state WHERE source_id=1;")" +SYNC_VAL_1="$(run_sqlite "${DB1}" "SELECT json_extract(cursor_json,'$.value') FROM rag_sync_state WHERE source_id=1;")" + +assert_eq "rag_sync_state column" "Id" "${SYNC_COL_1}" +assert_eq "rag_sync_state value (initial)" "10" "${SYNC_VAL_1}" + +# Delete one doc to verify watermark prevents backfill +run_sqlite "${DB1}" "DELETE FROM rag_vec_chunks WHERE chunk_id LIKE 'posts:5#%';" +run_sqlite "${DB1}" "DELETE FROM rag_fts_chunks WHERE chunk_id LIKE 'posts:5#%';" +run_sqlite "${DB1}" "DELETE FROM rag_chunks WHERE doc_id='posts:5';" +run_sqlite "${DB1}" "DELETE FROM rag_documents WHERE doc_id='posts:5';" + +DOCS_AFTER_DELETE="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_documents;")" +assert_eq "rag_documents after delete" "9" "${DOCS_AFTER_DELETE}" + +"${ROOT_DIR}/rag_ingest" "${DB1}" + +DOCS_AFTER_REINGEST="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_documents;")" +CHUNKS_AFTER_REINGEST="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_chunks;")" +FTS_AFTER_REINGEST="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_fts_chunks;")" + +assert_eq "rag_documents after watermark reingest" "9" "${DOCS_AFTER_REINGEST}" +assert_eq "rag_chunks after watermark reingest" "9" "${CHUNKS_AFTER_REINGEST}" +assert_eq "rag_fts_chunks after watermark reingest" "9" "${FTS_AFTER_REINGEST}" + +# Insert a new source row and ensure only it is ingested +run_mysql_sql "USE rag_test; INSERT INTO posts (Id, Title, Body, Tags, Score, CreationDate, UpdatedAt) VALUES (11, 'Watermark New', 'This row should be ingested via watermark.', 'wm,test', 1, '2024-01-14 10:00:00', '2024-01-14 11:00:00');" + +"${ROOT_DIR}/rag_ingest" "${DB1}" + +DOCS_AFTER_NEW="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_documents;")" +SYNC_VAL_2="$(run_sqlite "${DB1}" "SELECT json_extract(cursor_json,'$.value') FROM rag_sync_state WHERE source_id=1;")" + +assert_eq "rag_documents after new row" "10" "${DOCS_AFTER_NEW}" +assert_eq 
"rag_sync_state value (after new row)" "11" "${SYNC_VAL_2}" + +# Reset sync state for subsequent phases +run_sqlite "${DB1}" "DELETE FROM rag_sync_state;" + +# Reset MySQL data after watermark insert +import_mysql_seed + +# Phase 1c: UpdatedAt-based watermark filtering +run_sqlite "${DB1}" "DELETE FROM rag_vec_chunks;" +run_sqlite "${DB1}" "DELETE FROM rag_fts_chunks;" +run_sqlite "${DB1}" "DELETE FROM rag_chunks;" +run_sqlite "${DB1}" "DELETE FROM rag_documents;" +run_sqlite "${DB1}" "INSERT OR REPLACE INTO rag_sync_state(source_id, mode, cursor_json, last_ok_at, last_error) VALUES (1, 'poll', '{\"column\":\"UpdatedAt\",\"value\":\"2024-01-10 10:00:00\"}', NULL, NULL);" + +"${ROOT_DIR}/rag_ingest" "${DB1}" + +DOCS_UPDATED_AT="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_documents;")" +SYNC_UPDATED_AT="$(run_sqlite "${DB1}" "SELECT json_extract(cursor_json,'$.value') FROM rag_sync_state WHERE source_id=1;")" + +assert_eq "rag_documents (UpdatedAt watermark)" "1" "${DOCS_UPDATED_AT}" +assert_eq "rag_sync_state value (UpdatedAt)" "2024-01-12 09:30:00" "${SYNC_UPDATED_AT}" + +# Reset sync state for subsequent phases +run_sqlite "${DB1}" "DELETE FROM rag_sync_state;" + # Phase 2: apply where filter, re-ingest after cleanup run_sqlite "${DB1}" "DELETE FROM rag_vec_chunks;" run_sqlite "${DB1}" "DELETE FROM rag_fts_chunks;" @@ -183,6 +304,7 @@ assert_eq "fts term (Low)" "0" "${FTS_LOW_2}" assert_eq "fts bm25 top (High)" "posts:9#0" "${FTS_BM25_2}" # Phase 3: enable chunking and ensure rows split into multiple chunks +run_sqlite "${DB1}" "DELETE FROM rag_sync_state;" run_sqlite "${DB1}" "DELETE FROM rag_vec_chunks;" run_sqlite "${DB1}" "DELETE FROM rag_fts_chunks;" run_sqlite "${DB1}" "DELETE FROM rag_chunks;" @@ -207,4 +329,49 @@ fi print_samples "${DB1}" +# Phase 4: enable embeddings (stub) and validate vec rows +run_sqlite "${DB1}" "DELETE FROM rag_sync_state;" +run_sqlite "${DB1}" "DELETE FROM rag_vec_chunks;" +run_sqlite "${DB1}" "DELETE FROM rag_fts_chunks;" 
+run_sqlite "${DB1}" "DELETE FROM rag_chunks;" +run_sqlite "${DB1}" "DELETE FROM rag_documents;" + +apply_schema_and_source "${DB1}" "" "false" '' "{\"enabled\":true,\"provider\":\"${EMBEDDING_PROVIDER}\",\"dim\":${EMBEDDING_DIM},\"input\":{\"concat\":[{\"col\":\"Title\"},{\"lit\":\"\\n\"},{\"chunk_body\":true}]}}" +"${ROOT_DIR}/rag_ingest" "${DB1}" + +DOCS_COUNT_4="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_documents;")" +CHUNKS_COUNT_4="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_chunks;")" +VEC_COUNT_4="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_vec_chunks;")" + +assert_eq "rag_documents (embeddings enabled)" "10" "${DOCS_COUNT_4}" +assert_eq "rag_chunks (embeddings enabled)" "10" "${CHUNKS_COUNT_4}" +assert_eq "rag_vec_chunks (embeddings enabled)" "10" "${VEC_COUNT_4}" + +VEC_MATCH_1="$(vec_self_match "${DB1}" 'posts:1#0')" +assert_eq "vec self-match (posts:1#0)" "posts:1#0" "${VEC_MATCH_1}" + +print_samples "${DB1}" + +# Phase 5: optional OpenAI-compatible embeddings test (requires env vars) +if [[ -n "${OPENAI_API_BASE}" && -n "${OPENAI_API_KEY}" ]]; then + OPENAI_SCHEMA_TMP="${ROOT_DIR}/schema_openai_tmp.sql" + sed "s/embedding float\[1536\]/embedding float[${OPENAI_EMBEDDING_DIM}]/" "${ROOT_DIR}/schema.sql" > "${OPENAI_SCHEMA_TMP}" + + apply_schema_and_source "${DB_OPENAI}" "" "true" '' "{\"enabled\":true,\"provider\":\"openai\",\"api_base\":\"${OPENAI_API_BASE}\",\"api_key\":\"${OPENAI_API_KEY}\",\"model\":\"${OPENAI_MODEL}\",\"dim\":${OPENAI_EMBEDDING_DIM},\"input\":{\"concat\":[{\"col\":\"Title\"},{\"lit\":\"\\n\"},{\"chunk_body\":true}]}}" "${OPENAI_SCHEMA_TMP}" + "${ROOT_DIR}/rag_ingest" "${DB_OPENAI}" + + DOCS_COUNT_5="$(run_sqlite "${DB_OPENAI}" "SELECT COUNT(*) FROM rag_documents;")" + CHUNKS_COUNT_5="$(run_sqlite "${DB_OPENAI}" "SELECT COUNT(*) FROM rag_chunks;")" + VEC_COUNT_5="$(run_sqlite "${DB_OPENAI}" "SELECT COUNT(*) FROM rag_vec_chunks;")" + + assert_eq "rag_documents (openai embeddings)" "10" "${DOCS_COUNT_5}" + assert_eq 
"rag_chunks (openai embeddings)" "10" "${CHUNKS_COUNT_5}" + assert_eq "rag_vec_chunks (openai embeddings)" "10" "${VEC_COUNT_5}" + + print_samples "${DB_OPENAI}" + rm -f "${OPENAI_SCHEMA_TMP}" +else + echo "==> OpenAI embeddings test skipped (set OPENAI_API_BASE and OPENAI_API_KEY)" +fi + echo "All tests passed." From 9ba3df0ce7bf8514dae010fd899b6cd537bb3746 Mon Sep 17 00:00:00 2001 From: Rene Cannao Date: Fri, 23 Jan 2026 13:20:49 +0000 Subject: [PATCH 7/9] Address AI code review feedback from PR #5318 - Fix Makefile: Use $(CXXFLAGS) directly for consistency with build philosophy - Fix MySQL_Catalog: Return proper error JSON instead of empty array on missing query --- Makefile | 2 +- lib/MySQL_Catalog.cpp | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index dc21ee351..4a54e321e 100644 --- a/Makefile +++ b/Makefile @@ -155,7 +155,7 @@ build_src_debug: $(if $(LEGACY_BUILD),build_src_debug_legacy,build_src_debug_def # RAG ingester (PoC) .PHONY: rag_ingest rag_ingest: build_deps - cd RAG_POC && ${MAKE} CC=${CC} CXX=${CXX} OPTZ="${O2} -ggdb" CXXFLAGS="${O2} -ggdb" + cd RAG_POC && ${MAKE} CC=${CC} CXX=${CXX} CXXFLAGS="${CXXFLAGS}" .PHONY: rag_ingest_clean rag_ingest_clean: diff --git a/lib/MySQL_Catalog.cpp b/lib/MySQL_Catalog.cpp index 5db2b0504..35c5e2681 100644 --- a/lib/MySQL_Catalog.cpp +++ b/lib/MySQL_Catalog.cpp @@ -290,7 +290,8 @@ std::string MySQL_Catalog::search( // FTS5 search requires a query if (query.empty()) { proxy_error("Catalog search requires a query parameter\n"); - return "[]"; + nlohmann::json error_result = {{"error", "Catalog search requires a query parameter"}}; + return error_result.dump(); } // Helper lambda to escape single quotes for SQLite SQL literals From 38e5e8e56beff5bf8b6d4bd9a9648128450559b5 Mon Sep 17 00:00:00 2001 From: Rene Cannao Date: Fri, 23 Jan 2026 13:51:44 +0000 Subject: [PATCH 8/9] Fix critical issues from coderabbitai review - Fix NULL pointer dereference in rag_ingest.cpp: use 
str_or_empty() helper for all sqlite3_column_text results assigned to std::string - Fix NULL tags/links crash in MySQL_Catalog.cpp: add null guards before assigning sqlite3_column_text results to std::string - Fix missing curl_global_cleanup on error path in rag_ingest.cpp - Fix std::out_of_range exception in rag_ingest.cpp: wrap std::stoll calls in try-catch blocks, fall back to string comparison on overflow --- RAG_POC/rag_ingest.cpp | 45 ++++++++++++++++++++++++++++++------------ lib/MySQL_Catalog.cpp | 12 +++++++---- 2 files changed, 40 insertions(+), 17 deletions(-) diff --git a/RAG_POC/rag_ingest.cpp b/RAG_POC/rag_ingest.cpp index bd57ee840..df30e3556 100644 --- a/RAG_POC/rag_ingest.cpp +++ b/RAG_POC/rag_ingest.cpp @@ -990,19 +990,19 @@ static std::vector load_sources(sqlite3* db) { while (sqlite3_step(st) == SQLITE_ROW) { RagSource s; s.source_id = sqlite3_column_int(st, 0); - s.name = (const char*)sqlite3_column_text(st, 1); + s.name = str_or_empty((const char*)sqlite3_column_text(st, 1)); s.enabled = sqlite3_column_int(st, 2); - s.backend_type = (const char*)sqlite3_column_text(st, 3); - s.host = (const char*)sqlite3_column_text(st, 4); + s.backend_type = str_or_empty((const char*)sqlite3_column_text(st, 3)); + s.host = str_or_empty((const char*)sqlite3_column_text(st, 4)); s.port = sqlite3_column_int(st, 5); - s.user = (const char*)sqlite3_column_text(st, 6); - s.pass = (const char*)sqlite3_column_text(st, 7); - s.db = (const char*)sqlite3_column_text(st, 8); + s.user = str_or_empty((const char*)sqlite3_column_text(st, 6)); + s.pass = str_or_empty((const char*)sqlite3_column_text(st, 7)); + s.db = str_or_empty((const char*)sqlite3_column_text(st, 8)); - s.table_name = (const char*)sqlite3_column_text(st, 9); - s.pk_column = (const char*)sqlite3_column_text(st, 10); - s.where_sql = (const char*)sqlite3_column_text(st, 11); + s.table_name = str_or_empty((const char*)sqlite3_column_text(st, 9)); + s.pk_column = str_or_empty((const 
char*)sqlite3_column_text(st, 10)); + s.where_sql = str_or_empty((const char*)sqlite3_column_text(st, 11)); const char* doc_map = (const char*)sqlite3_column_text(st, 12); const char* chunk_j = (const char*)sqlite3_column_text(st, 13); @@ -1212,8 +1212,18 @@ static void ingest_source(sqlite3* sdb, const RagSource& src) { if (!v.empty()) { if (!max_set) { if (cursor.numeric || is_integer_string(v)) { - max_numeric = true; - max_num = std::stoll(v); + try { + max_numeric = true; + max_num = std::stoll(v); + } catch (const std::out_of_range& e) { + // Huge integer - fall back to string comparison + max_numeric = false; + max_str = v; + } catch (const std::invalid_argument& e) { + // Not actually a number despite is_integer_string check + max_numeric = false; + max_str = v; + } } else { max_numeric = false; max_str = v; @@ -1221,8 +1231,16 @@ static void ingest_source(sqlite3* sdb, const RagSource& src) { max_set = true; } else if (max_numeric) { if (is_integer_string(v)) { - std::int64_t nv = std::stoll(v); - if (nv > max_num) max_num = nv; + try { + std::int64_t nv = std::stoll(v); + if (nv > max_num) max_num = nv; + } catch (const std::out_of_range& e) { + // Huge integer - fall back to string comparison + max_numeric = false; + max_str = v; + } catch (const std::invalid_argument& e) { + // Not actually a number - skip this value + } } } else { if (v > max_str) max_str = v; @@ -1361,6 +1379,7 @@ int main(int argc, char** argv) { } else { sqlite_exec(db, "ROLLBACK;"); sqlite3_close(db); + curl_global_cleanup(); return 1; } diff --git a/lib/MySQL_Catalog.cpp b/lib/MySQL_Catalog.cpp index 35c5e2681..7a56690f8 100644 --- a/lib/MySQL_Catalog.cpp +++ b/lib/MySQL_Catalog.cpp @@ -384,8 +384,10 @@ std::string MySQL_Catalog::search( entry["document"] = nullptr; } - entry["tags"] = std::string((const char*)(*proxy_sqlite3_column_text)(stmt, 4)); - entry["links"] = std::string((const char*)(*proxy_sqlite3_column_text)(stmt, 5)); + const char* tags_str = (const 
char*)(*proxy_sqlite3_column_text)(stmt, 4); + entry["tags"] = tags_str ? std::string(tags_str) : nullptr; + const char* links_str = (const char*)(*proxy_sqlite3_column_text)(stmt, 5); + entry["links"] = links_str ? std::string(links_str) : nullptr; results.push_back(entry); } @@ -504,8 +506,10 @@ std::string MySQL_Catalog::list( entry["document"] = nullptr; } - entry["tags"] = std::string((const char*)(*proxy_sqlite3_column_text)(stmt, 4)); - entry["links"] = std::string((const char*)(*proxy_sqlite3_column_text)(stmt, 5)); + const char* tags_str = (const char*)(*proxy_sqlite3_column_text)(stmt, 4); + entry["tags"] = tags_str ? std::string(tags_str) : nullptr; + const char* links_str = (const char*)(*proxy_sqlite3_column_text)(stmt, 5); + entry["links"] = links_str ? std::string(links_str) : nullptr; results.push_back(entry); } From 950f163bfb15c080736fd67aa67a8977cd8e9ff8 Mon Sep 17 00:00:00 2001 From: Rene Cannao Date: Fri, 23 Jan 2026 14:17:20 +0000 Subject: [PATCH 9/9] Fix compilation issues: use static libcurl and improve includes - Add curl include path from deps/curl/curl/include - Link against static libcurl.a from deps - Add zlib dependency (-lz) required by libcurl - Use "" style includes for local headers (sqlite3.h, mysql.h, crypt.h, curl/curl.h) - Rename sqlite_bind_text helper to bind_text to avoid collision with sqlite3_bind_text API - Add comprehensive Doxygen documentation to rag_ingest.cpp --- RAG_POC/Makefile | 8 +- RAG_POC/rag_ingest.cpp | 1400 +++++++++++++++++++++++++++++++--------- 2 files changed, 1102 insertions(+), 306 deletions(-) diff --git a/RAG_POC/Makefile b/RAG_POC/Makefile index 36402f56e..d97133a68 100644 --- a/RAG_POC/Makefile +++ b/RAG_POC/Makefile @@ -6,14 +6,18 @@ ROOT_DIR := .. 
INCLUDES := \ -I$(ROOT_DIR)/deps/json \ -I$(ROOT_DIR)/deps/mariadb-client-library/mariadb_client/include \ - -I$(ROOT_DIR)/deps/sqlite3/sqlite-amalgamation-3500400 + -I$(ROOT_DIR)/deps/sqlite3/sqlite-amalgamation-3500400 \ + -I$(ROOT_DIR)/deps/curl/curl/include LIBDIRS := \ -L$(ROOT_DIR)/deps/mariadb-client-library/mariadb_client/libmariadb SQLITE3_OBJ := $(ROOT_DIR)/deps/sqlite3/sqlite-amalgamation-3500400/sqlite3.o -LIBS := -lmariadbclient -lssl -lcrypto -lcrypt -ldl -lpthread +# Use static libcurl +CURL_STATIC_LIB := $(ROOT_DIR)/deps/curl/curl/lib/.libs/libcurl.a + +LIBS := -lmariadbclient -lssl -lcrypto -lcrypt -ldl -lpthread $(CURL_STATIC_LIB) -lz TARGET := rag_ingest SOURCES := rag_ingest.cpp diff --git a/RAG_POC/rag_ingest.cpp b/RAG_POC/rag_ingest.cpp index df30e3556..9c8b628ca 100644 --- a/RAG_POC/rag_ingest.cpp +++ b/RAG_POC/rag_ingest.cpp @@ -1,101 +1,224 @@ -// rag_ingest.cpp -// -// ------------------------------------------------------------ -// ProxySQL RAG Ingestion PoC (General-Purpose) -// ------------------------------------------------------------ -// -// What this program does (v0): -// 1) Opens the SQLite "RAG index" database (schema.sql must already be applied). -// 2) Reads enabled sources from rag_sources. -// 3) For each source: -// - Connects to MySQL (for now). -// - Builds a SELECT that fetches only needed columns. -// - For each row: -// * Builds doc_id / title / body / metadata_json using doc_map_json. -// * Chunks body using chunking_json. -// * Inserts into: -// rag_documents -// rag_chunks -// rag_fts_chunks (FTS5 contentless table) -// * Optionally builds embedding input text using embedding_json and inserts -// embeddings into rag_vec_chunks (sqlite3-vec) via a stub embedding provider. -// - Skips docs that already exist (v0 requirement). -// -// Later (v1+): -// - Add rag_sync_state usage for incremental ingestion (watermark/CDC). -// - Add hashing to detect changed docs/chunks and update/reindex accordingly. 
-// - Replace the embedding stub with a real embedding generator. -// -// ------------------------------------------------------------ -// Dependencies -// ------------------------------------------------------------ -// - sqlite3 -// - MySQL client library (mysqlclient / libmysqlclient) -// - nlohmann/json (single header json.hpp) -// -// Build example (Linux/macOS): -// g++ -std=c++17 -O2 rag_ingest.cpp -o rag_ingest \ -// -lsqlite3 -lmysqlclient -// -// Usage: -// ./rag_ingest /path/to/rag_index.sqlite -// -// Notes: -// - This is a blueprint-grade PoC, written to be readable and modifiable. -// - It uses a conservative JSON mapping language so ingestion is deterministic. -// - It avoids advanced C++ patterns on purpose. -// -// ------------------------------------------------------------ -// Supported JSON Specs -// ------------------------------------------------------------ -// -// doc_map_json (required): -// { -// "doc_id": { "format": "posts:{Id}" }, -// "title": { "concat": [ {"col":"Title"} ] }, -// "body": { "concat": [ {"col":"Body"} ] }, -// "metadata": { -// "pick": ["Id","Tags","Score","CreaionDate"], -// "rename": {"CreaionDate":"CreationDate"} -// } -// } -// -// chunking_json (required, v0 chunks doc "body" only): -// { -// "enabled": true, -// "unit": "chars", // v0 supports "chars" only -// "chunk_size": 4000, -// "overlap": 400, -// "min_chunk_size": 800 -// } -// -// embedding_json (optional): -// { -// "enabled": true, -// "dim": 1536, -// "model": "text-embedding-3-large", // informational -// "input": { "concat": [ -// {"col":"Title"}, -// {"lit":"\nTags: "}, {"col":"Tags"}, -// {"lit":"\n\n"}, -// {"chunk_body": true} -// ]} -// } -// -// ------------------------------------------------------------ -// sqlite3-vec binding note -// ------------------------------------------------------------ -// sqlite3-vec "vec0(embedding float[N])" generally expects a vector value. -// The exact binding format can vary by build/config of sqlite3-vec. 
-// This program includes a "best effort" binder that binds a float array as a BLOB. -// If your sqlite3-vec build expects a different representation (e.g. a function to -// pack vectors), adapt bind_vec_embedding() accordingly. -// ------------------------------------------------------------ - -#include -#include -#include -#include +/** + * @file rag_ingest.cpp + * @brief ProxySQL RAG (Retrieval-Augmented Generation) Ingestion Tool + * + * @verbatim + * ProxySQL RAG Ingestion PoC (General-Purpose) + * @endverbatim + * + * @section overview Overview + * + * This program is a general-purpose ingestion tool for ProxySQL's RAG index. + * It reads data from external sources (currently MySQL), transforms it according + * to configurable JSON specifications, chunks the content, builds full-text + * search indexes, and optionally generates vector embeddings for semantic search. + * + * @section v0_features v0 Features + * + * - Reads enabled sources from rag_sources table + * - Connects to MySQL backend and fetches data using configurable SELECT queries + * - Transforms rows using doc_map_json specification + * - Chunks document bodies using configurable chunking parameters + * - Inserts into rag_documents, rag_chunks, rag_fts_chunks (FTS5) + * - Optionally generates embeddings and inserts into rag_vec_chunks (sqlite3-vec) + * - Skips documents that already exist (no upsert in v0) + * - Supports incremental sync using watermark-based cursor tracking + * + * @section future_plans Future Plans (v1+) + * + * - Add hash-based change detection for efficient updates + * - Support document updates (not just skip-if-exists) + * - Add PostgreSQL backend support + * - Add async embedding workers + * - Add operational metrics and monitoring + * + * @section dependencies Dependencies + * + * - sqlite3: For RAG index storage + * - mysqlclient / libmysqlclient: For MySQL backend connections + * - libcurl: For HTTP-based embedding providers (OpenAI-compatible) + * - nlohmann/json: 
Single-header JSON library (json.hpp) + * - libcrypt: For sha256_crypt_r weak alias (platform compatibility) + * + * @section building Building + * + * @verbatim + * g++ -std=c++17 -O2 rag_ingest.cpp -o rag_ingest \ + * -lsqlite3 -lmysqlclient -lcurl -lcrypt + * @endverbatim + * + * @section usage Usage + * + * @verbatim + * ./rag_ingest /path/to/rag_index.sqlite + * @endverbatim + * + * The RAG_VEC0_EXT environment variable can be set to specify the path + * to the sqlite3-vec extension: + * + * @verbatim + * export RAG_VEC0_EXT=/usr/local/lib/sqlite3-vec.so + * ./rag_ingest /path/to/rag_index.sqlite + * @endverbatim + * + * @section architecture Architecture + * + * @subsection ingestion_flow Ingestion Flow + * + *
+ * 1. Open SQLite RAG index database
+ * 2. Load sqlite3-vec extension (if RAG_VEC0_EXT is set)
+ * 3. Load enabled sources from rag_sources table
+ * 4. For each source:
+ *    a. Parse chunking_json and embedding_json configurations
+ *    b. Load sync cursor (watermark) from rag_sync_state
+ *    c. Connect to MySQL backend
+ *    d. Build minimal SELECT query (only fetch needed columns)
+ *    e. Add incremental filter based on watermark
+ *    f. For each row:
+ *       i. Build document using doc_map_json specification
+ *       ii. Check if doc_id already exists (skip if yes)
+ *       iii. Insert document into rag_documents
+ *       iv. Chunk the document body
+ *       v. For each chunk:
+ *          - Insert into rag_chunks
+ *          - Insert into rag_fts_chunks (FTS5)
+ *          - If embedding enabled: generate and insert embedding
+ *    g. Update sync cursor with max watermark value
+ * 5. Commit transaction or rollback on error
+ * 
+ * + * @subsection json_specs JSON Configuration Specifications + * + * @subsubsection doc_map_json doc_map_json (Required) + * + * Defines how to transform a source row into a canonical document: + * + * @verbatim + * { + * "doc_id": { "format": "posts:{Id}" }, + * "title": { "concat": [ {"col":"Title"} ] }, + * "body": { "concat": [ {"col":"Body"} ] }, + * "metadata": { + * "pick": ["Id","Tags","Score","CreationDate"], + * "rename": {"CreationDate":"Created"} + * } + * } + * @endverbatim + * + * Fields: + * - doc_id.format: Template string with {ColumnName} placeholders + * - title.concat: Array of column references and literals + * - body.concat: Array of column references and literals + * - metadata.pick: Columns to include in metadata JSON + * - metadata.rename: Map of old_name -> new_name for metadata keys + * + * @subsubsection chunking_json chunking_json (Required) + * + * Defines how to split document bodies into chunks: + * + * @verbatim + * { + * "enabled": true, + * "unit": "chars", // v0 only supports "chars" + * "chunk_size": 4000, // Target chunk size in characters + * "overlap": 400, // Overlap between consecutive chunks + * "min_chunk_size": 800 // Minimum chunk size (avoid tiny tail chunks) + * } + * @endverbatim + * + * @subsubsection embedding_json embedding_json (Optional) + * + * Defines how to generate embeddings for chunks: + * + * @verbatim + * { + * "enabled": true, + * "dim": 1536, + * "model": "text-embedding-3-large", + * "provider": "openai", // "stub" or "openai" + * "api_base": "https://api.openai.com/v1", + * "api_key": "sk-...", + * "batch_size": 16, + * "timeout_ms": 20000, + * "input": { "concat": [ + * {"col":"Title"}, + * {"lit":"\nTags: "}, {"col":"Tags"}, + * {"lit":"\n\n"}, + * {"chunk_body": true} + * ]} + * } + * @endverbatim + * + * The concat array supports: + * - {"col":"ColumnName"}: Include column value + * - {"lit":"literal text"}: Include literal string + * - {"chunk_body":true}: Include the chunk body text + * + * 
@subsection concat_spec Concat Specification + * + * The concat specification is used in multiple places (title, body, embedding input). + * It's a JSON array of objects, each describing one part to concatenate: + * + * @verbatim + * [ + * {"col": "Title"}, // Use value from "Title" column + * {"lit": "\nTags: "}, // Use literal string + * {"col": "Tags"}, // Use value from "Tags" column + * {"lit": "\n\n"}, + * {"chunk_body": true} // Use chunk body (embedding only) + * ] + * @endverbatim + * + * @subsection sqlite3_vec sqlite3-vec Integration + * + * The sqlite3-vec extension provides vector similarity search capabilities. + * This program binds float32 arrays as BLOBs for the embedding column. + * + * The binding format may vary by sqlite3-vec build. If your build expects + * a different format, modify bind_vec_embedding() accordingly. + * + * @subsection sync_cursor Sync Cursor (Watermark) + * + * The sync cursor enables incremental ingestion by tracking the last processed + * value from a monotonic column (e.g., auto-increment ID, timestamp). + * + * Stored in rag_sync_state.cursor_json: + * @verbatim + * { + * "column": "Id", // Column name for watermark + * "value": 12345 // Last processed value + * } + * @endverbatim + * + * On next run, only rows WHERE column > value are fetched. + * + * @section error_handling Error Handling + * + * This program uses a "fail-fast" approach: + * - JSON parsing errors are fatal (invalid configuration) + * - Database connection errors are fatal + * - SQL execution errors are fatal + * - Errors during ingestion rollback the entire transaction + * + * All fatal errors call fatal() which prints the message and exits with code 1. + * + * @section threading Threading Model + * + * v0 is single-threaded. 
Future versions may support: + * - Concurrent source ingestion + * - Async embedding generation + * - Parallel chunk processing + * + * @author ProxySQL Development Team + * @version 0.1.0 + * @date 2024 + */ + +#include "sqlite3.h" +#include "mysql.h" +#include "crypt.h" +#include "curl/curl.h" #include #include @@ -111,6 +234,23 @@ #include "json.hpp" using json = nlohmann::json; +/** + * @brief Weak alias for sha256_crypt_r function + * + * This weak symbol alias provides compatibility across platforms where + * sha256_crypt_r may or may not be available in libcrypt. If the + * symbol exists in libcrypt, it will be used. Otherwise, this + * implementation using crypt_r() will be linked. + * + * @param key The key/password to hash + * @param salt The salt string + * @param buffer Output buffer for the hashed result + * @param buflen Size of the output buffer + * @return Pointer to hashed result on success, nullptr on failure + * + * @note This is a workaround for library inconsistencies across platforms. + * The actual hashing is delegated to crypt_r(). + */ extern "C" __attribute__((weak)) char *sha256_crypt_r( const char *key, const char *salt, @@ -133,19 +273,51 @@ extern "C" __attribute__((weak)) char *sha256_crypt_r( return buffer; } -// ------------------------- -// Small helpers -// ------------------------- - +// =========================================================================== +// Utility Functions +// =========================================================================== + +/** + * @brief Print fatal error message and exit + * + * This function prints an error message to stderr and terminates the + * program with exit code 1. It is used for unrecoverable errors where + * continuing execution is impossible or unsafe. + * + * @param msg The error message to print + * + * @note This function does not return. It calls std::exit(1). 
+ */ static void fatal(const std::string& msg) { std::cerr << "FATAL: " << msg << "\n"; std::exit(1); } +/** + * @brief Safely convert C string to std::string + * + * Converts a potentially null C string pointer to a std::string. + * If the pointer is null, returns an empty string. This prevents + * undefined behavior from constructing std::string(nullptr). + * + * @param p Pointer to C string (may be null) + * @return std::string containing the C string content, or empty string if p is null + */ static std::string str_or_empty(const char* p) { return p ? std::string(p) : std::string(); } +/** + * @brief Execute a SQL statement on SQLite + * + * Executes a SQL statement that does not return rows (e.g., INSERT, + * UPDATE, DELETE, PRAGMA). If an error occurs, it prints the error + * message and SQL to stderr. + * + * @param db SQLite database connection + * @param sql SQL statement to execute + * @return int SQLite return code (SQLITE_OK on success, error code otherwise) + */ static int sqlite_exec(sqlite3* db, const std::string& sql) { char* err = nullptr; int rc = sqlite3_exec(db, sql.c_str(), nullptr, nullptr, &err); @@ -157,6 +329,15 @@ static int sqlite_exec(sqlite3* db, const std::string& sql) { return rc; } +/** + * @brief Check if a string represents an integer + * + * Determines if the given string consists only of digits, optionally + * preceded by a minus sign. Used for watermark value type detection. + * + * @param s String to check + * @return true if string is a valid integer representation, false otherwise + */ static bool is_integer_string(const std::string& s) { if (s.empty()) return false; size_t i = 0; @@ -170,6 +351,17 @@ static bool is_integer_string(const std::string& s) { return true; } +/** + * @brief Escape single quotes in SQL string literals + * + * Doubles single quotes in a string for safe SQL literal construction. + * This is used for building incremental filter conditions. 
+ * + * @param s Input string to escape + * @return std::string with single quotes doubled (e.g., "it's" -> "it''s") + * + * @note This is basic escaping. For user input, prefer parameter binding. + */ static std::string sql_escape_single_quotes(const std::string& s) { std::string out; out.reserve(s.size() + 8); @@ -180,11 +372,31 @@ static std::string sql_escape_single_quotes(const std::string& s) { return out; } +/** + * @brief Serialize JSON to compact string + * + * Converts a JSON object to a compact string representation without + * whitespace or pretty-printing. Used for efficient storage. + * + * @param j JSON object to serialize + * @return std::string Compact JSON representation + */ static std::string json_dump_compact(const json& j) { - // Compact output (no pretty printing) to keep storage small. return j.dump(); } +/** + * @brief Load sqlite3-vec extension if configured + * + * Loads the sqlite3-vec extension from the path specified in the + * RAG_VEC0_EXT environment variable. This is required for vector + * similarity search functionality. + * + * @param db SQLite database connection + * + * @note If RAG_VEC0_EXT is not set or empty, this function does nothing. + * The vec0 extension is only required when embedding generation is enabled. + */ static void sqlite_load_vec_extension(sqlite3* db) { const char* ext = std::getenv("RAG_VEC0_EXT"); if (!ext || std::strlen(ext) == 0) return; @@ -199,69 +411,111 @@ static void sqlite_load_vec_extension(sqlite3* db) { } } -// ------------------------- -// Data model -// ------------------------- +// =========================================================================== +// Data Structures +// =========================================================================== +/** + * @brief Configuration for a single RAG data source + * + * Represents all configuration needed to ingest data from a single + * external source into the RAG index. Loaded from the rag_sources table. 
+ */ struct RagSource { - int source_id = 0; - std::string name; - int enabled = 0; - - // backend connection - std::string backend_type; // "mysql" for now - std::string host; - int port = 3306; - std::string user; - std::string pass; - std::string db; - - // table - std::string table_name; - std::string pk_column; - std::string where_sql; // optional - - // transformation config - json doc_map_json; - json chunking_json; - json embedding_json; // optional; may be null/object + int source_id = 0; ///< Unique source identifier from database + std::string name; ///< Human-readable source name + int enabled = 0; ///< Whether this source is enabled (0/1) + + // Backend connection settings + std::string backend_type; ///< Type of backend ("mysql" in v0) + std::string host; ///< Backend server hostname or IP + int port = 3306; ///< Backend server port + std::string user; ///< Backend authentication username + std::string pass; ///< Backend authentication password + std::string db; ///< Backend database name + + // Source table settings + std::string table_name; ///< Name of the table to read from + std::string pk_column; ///< Primary key column name + std::string where_sql; ///< Optional WHERE clause for row filtering + + // Transformation configuration (JSON) + json doc_map_json; ///< Document transformation specification + json chunking_json; ///< Chunking configuration + json embedding_json; ///< Embedding configuration (may be null) }; +/** + * @brief Parsed chunking configuration + * + * Controls how document bodies are split into chunks for indexing. + * Chunks improve retrieval precision by matching smaller, more + * focused text segments. 
+ */ struct ChunkingConfig { - bool enabled = true; - std::string unit = "chars"; // v0 only supports chars - int chunk_size = 4000; - int overlap = 400; - int min_chunk_size = 800; + bool enabled = true; ///< Whether chunking is enabled + std::string unit = "chars"; ///< Unit of measurement ("chars" in v0) + int chunk_size = 4000; ///< Target size of each chunk + int overlap = 400; ///< Overlap between consecutive chunks + int min_chunk_size = 800; ///< Minimum size to avoid tiny tail chunks }; +/** + * @brief Parsed embedding configuration + * + * Controls how vector embeddings are generated for chunks. + * Embeddings enable semantic similarity search. + */ struct EmbeddingConfig { - bool enabled = false; - int dim = 1536; - std::string model = "unknown"; - json input_spec; // expects {"concat":[...]} - std::string provider = "stub"; // stub | openai - std::string api_base; - std::string api_key; - int batch_size = 16; - int timeout_ms = 20000; + bool enabled = false; ///< Whether embedding generation is enabled + int dim = 1536; ///< Vector dimension (model-specific) + std::string model = "unknown"; ///< Model name (for observability) + json input_spec; ///< Embedding input template {"concat":[...]} + std::string provider = "stub"; ///< Provider type: "stub" or "openai" + std::string api_base; ///< API endpoint URL (for "openai" provider) + std::string api_key; ///< API authentication key + int batch_size = 16; ///< Maximum inputs per API call (unused in v0) + int timeout_ms = 20000; ///< Request timeout in milliseconds }; +/** + * @brief Sync cursor for incremental ingestion + * + * Tracks the last processed watermark value for incremental sync. + * Only rows with watermark > cursor.value are fetched on subsequent runs. 
+ */ struct SyncCursor { - std::string column; - bool has_value = false; - bool numeric = false; - std::int64_t num_value = 0; - std::string str_value; + std::string column; ///< Column name used for watermark + bool has_value = false; ///< Whether a cursor value has been set + bool numeric = false; ///< Whether the value is numeric (vs string) + std::int64_t num_value = 0; ///< Numeric watermark value (if numeric=true) + std::string str_value; ///< String watermark value (if numeric=false) }; -// A row fetched from MySQL, as a name->string map. +/** + * @brief Row representation from MySQL backend + * + * A map from column name to string value. All MySQL values are + * represented as strings for simplicity; conversion happens + * during document building. + */ typedef std::unordered_map RowMap; -// ------------------------- -// JSON parsing -// ------------------------- - +// =========================================================================== +// JSON Parsing Functions +// =========================================================================== + +/** + * @brief Parse chunking_json into ChunkingConfig struct + * + * Extracts and validates chunking configuration from JSON. + * Applies sensible defaults for missing or invalid values. + * + * @param j JSON object containing chunking configuration + * @return ChunkingConfig Parsed configuration with defaults applied + * + * @note Only "chars" unit is supported in v0. Other units fall back to "chars". 
+ */ static ChunkingConfig parse_chunking_json(const json& j) { ChunkingConfig cfg; if (!j.is_object()) return cfg; @@ -272,6 +526,7 @@ static ChunkingConfig parse_chunking_json(const json& j) { if (j.contains("overlap")) cfg.overlap = j["overlap"].get(); if (j.contains("min_chunk_size")) cfg.min_chunk_size = j["min_chunk_size"].get(); + // Apply sanity checks and defaults if (cfg.chunk_size <= 0) cfg.chunk_size = 4000; if (cfg.overlap < 0) cfg.overlap = 0; if (cfg.overlap >= cfg.chunk_size) cfg.overlap = cfg.chunk_size / 4; @@ -287,6 +542,15 @@ static ChunkingConfig parse_chunking_json(const json& j) { return cfg; } +/** + * @brief Parse embedding_json into EmbeddingConfig struct + * + * Extracts and validates embedding configuration from JSON. + * Applies sensible defaults for missing or invalid values. + * + * @param j JSON object containing embedding configuration (may be null) + * @return EmbeddingConfig Parsed configuration with defaults applied + */ static EmbeddingConfig parse_embedding_json(const json& j) { EmbeddingConfig cfg; if (!j.is_object()) return cfg; @@ -301,27 +565,53 @@ static EmbeddingConfig parse_embedding_json(const json& j) { if (j.contains("batch_size")) cfg.batch_size = j["batch_size"].get(); if (j.contains("timeout_ms")) cfg.timeout_ms = j["timeout_ms"].get(); + // Apply defaults if (cfg.dim <= 0) cfg.dim = 1536; if (cfg.batch_size <= 0) cfg.batch_size = 16; if (cfg.timeout_ms <= 0) cfg.timeout_ms = 20000; return cfg; } -// ------------------------- -// Row access -// ------------------------- - +// =========================================================================== +// Row Access Helpers +// =========================================================================== + +/** + * @brief Get value from RowMap by key + * + * Safely retrieves a value from a RowMap (column name -> value mapping). + * Returns std::nullopt if the key is not present, allowing the caller + * to distinguish between missing keys and empty values. 
+ * + * @param row The RowMap to query + * @param key Column name to look up + * @return std::optional Value if present, nullopt otherwise + */ static std::optional row_get(const RowMap& row, const std::string& key) { auto it = row.find(key); if (it == row.end()) return std::nullopt; return it->second; } -// ------------------------- -// doc_id.format implementation -// ------------------------- -// Replaces occurrences of {ColumnName} with the value from the row map. -// Example: "posts:{Id}" -> "posts:12345" +// =========================================================================== +// Format String Template Engine +// =========================================================================== + +/** + * @brief Apply format template with row data substitution + * + * Replaces {ColumnName} placeholders in a format string with actual + * values from the row. Used for generating document IDs. + * + * Example: "posts:{Id}" with row["Id"] = "123" becomes "posts:123" + * + * @param fmt Format string with {ColumnName} placeholders + * @param row Row data for substitution + * @return std::string Formatted string with substitutions applied + * + * @note Unmatched '{' characters are treated as literals. + * Missing column names result in empty substitution. 
+ */ static std::string apply_format(const std::string& fmt, const RowMap& row) { std::string out; out.reserve(fmt.size() + 32); @@ -346,14 +636,27 @@ static std::string apply_format(const std::string& fmt, const RowMap& row) { return out; } -// ------------------------- -// concat spec implementation -// ------------------------- -// Supported elements in concat array: -// {"col":"Title"} -> append row["Title"] if present -// {"lit":"\n\n"} -> append literal -// {"chunk_body": true} -> append chunk body (only in embedding_json input) -// +// =========================================================================== +// Concat Specification Evaluator +// =========================================================================== + +/** + * @brief Evaluate concat specification to build string + * + * Processes a concat specification (JSON array) to build a string by + * concatenating column values, literals, and optionally the chunk body. + * + * Supported concat elements: + * - {"col":"ColumnName"} - Include value from column + * - {"lit":"literal"} - Include literal string + * - {"chunk_body":true} - Include chunk body (only for embedding input) + * + * @param concat_spec JSON array with concat specification + * @param row Row data for column substitutions + * @param chunk_body Chunk body text (used if allow_chunk_body=true) + * @param allow_chunk_body Whether to allow chunk_body references + * @return std::string Concatenated result + */ static std::string eval_concat(const json& concat_spec, const RowMap& row, const std::string& chunk_body, @@ -365,12 +668,15 @@ static std::string eval_concat(const json& concat_spec, if (!part.is_object()) continue; if (part.contains("col")) { + // Column reference: {"col":"ColumnName"} std::string col = part["col"].get(); auto v = row_get(row, col); if (v.has_value()) out += v.value(); } else if (part.contains("lit")) { + // Literal string: {"lit":"some text"} out += part["lit"].get(); } else if (allow_chunk_body && 
part.contains("chunk_body")) { + // Chunk body reference: {"chunk_body":true} bool yes = part["chunk_body"].get(); if (yes) out += chunk_body; } @@ -378,16 +684,33 @@ static std::string eval_concat(const json& concat_spec, return out; } -// ------------------------- -// metadata builder -// ------------------------- -// metadata spec: -// "metadata": { "pick":[...], "rename":{...} } +// =========================================================================== +// Metadata Builder +// =========================================================================== + +/** + * @brief Build metadata JSON from row using specification + * + * Creates a metadata JSON object by picking specified columns from + * the row and optionally renaming keys. + * + * @param meta_spec Metadata specification with "pick" and "rename" keys + * @param row Source row data + * @return json Metadata object with picked and renamed fields + * + * The meta_spec format: + * @verbatim + * { + * "pick": ["Col1", "Col2"], // Columns to include + * "rename": {"Col1": "col1"} // Old name -> new name mapping + * } + * @endverbatim + */ static json build_metadata(const json& meta_spec, const RowMap& row) { json meta = json::object(); if (meta_spec.is_object()) { - // pick fields + // Pick specified fields if (meta_spec.contains("pick") && meta_spec["pick"].is_array()) { for (const auto& colv : meta_spec["pick"]) { if (!colv.is_string()) continue; @@ -397,7 +720,7 @@ static json build_metadata(const json& meta_spec, const RowMap& row) { } } - // rename keys + // Rename keys (must be done in two passes to avoid iterator invalidation) if (meta_spec.contains("rename") && meta_spec["rename"].is_object()) { std::vector> renames; for (auto it = meta_spec["rename"].begin(); it != meta_spec["rename"].end(); ++it) { @@ -418,10 +741,25 @@ static json build_metadata(const json& meta_spec, const RowMap& row) { return meta; } -// ------------------------- -// Chunking (chars-based) -// ------------------------- - +// 
=========================================================================== +// Text Chunking +// =========================================================================== + +/** + * @brief Split text into chunks based on character count + * + * Divides a text string into overlapping chunks of approximately + * chunk_size characters. Chunks overlap by 'overlap' characters to + * preserve context at boundaries. Tiny final chunks are appended + * to the previous chunk to avoid fragmentation. + * + * @param text Input text to chunk + * @param cfg Chunking configuration + * @return std::vector Vector of chunk strings + * + * @note If chunking is disabled, returns a single chunk containing the full text. + * @note If text is smaller than chunk_size, returns a single chunk. + */ static std::vector chunk_text_chars(const std::string& text, const ChunkingConfig& cfg) { std::vector chunks; @@ -458,10 +796,22 @@ static std::vector chunk_text_chars(const std::string& text, const return chunks; } -// ------------------------- -// MySQL helpers -// ------------------------- - +// =========================================================================== +// MySQL Backend Functions +// =========================================================================== + +/** + * @brief Connect to MySQL backend or terminate + * + * Establishes a connection to a MySQL server using the provided + * source configuration. Sets charset to utf8mb4 for proper handling + * of Unicode content (e.g., emojis, international text). + * + * @param s Source configuration with connection parameters + * @return MYSQL* MySQL connection handle + * + * @note On failure, prints error message and calls fatal() to exit. 
+ */ static MYSQL* mysql_connect_or_die(const RagSource& s) { MYSQL* conn = mysql_init(nullptr); if (!conn) fatal("mysql_init failed"); @@ -484,6 +834,18 @@ static MYSQL* mysql_connect_or_die(const RagSource& s) { return conn; } +/** + * @brief Convert MySQL result row to RowMap + * + * Transforms a raw MySQL row into a column-name -> value map. + * All values are converted to strings; NULL values become empty strings. + * + * @param res MySQL result set (for field metadata) + * @param row Raw MySQL row data + * @return RowMap Map of column names to string values + * + * @note Uses str_or_empty() to safely handle NULL column values. + */ static RowMap mysql_row_to_map(MYSQL_RES* res, MYSQL_ROW row) { RowMap m; unsigned int n = mysql_num_fields(res); @@ -499,8 +861,19 @@ static RowMap mysql_row_to_map(MYSQL_RES* res, MYSQL_ROW row) { return m; } -// Collect columns used by doc_map_json + embedding_json so SELECT is minimal. -// v0: we intentionally keep this conservative (include pk + all referenced col parts + metadata.pick). +// =========================================================================== +// Column Collection +// =========================================================================== + +/** + * @brief Add string to vector if not already present + * + * Helper function to maintain a list of unique column names. + * Used when building the SELECT query to avoid duplicate columns. + * + * @param cols Vector of column names (modified in-place) + * @param c Column name to add (if not present) + */ static void add_unique(std::vector& cols, const std::string& c) { for (size_t i = 0; i < cols.size(); i++) { if (cols[i] == c) return; @@ -508,6 +881,15 @@ static void add_unique(std::vector& cols, const std::string& c) { cols.push_back(c); } +/** + * @brief Extract column names from concat specification + * + * Parses a concat specification and adds all referenced column names + * to the output vector. Used to build the minimal SELECT query. 
+ * + * @param cols Vector of column names (modified in-place) + * @param concat_spec JSON concat specification to scan + */ static void collect_cols_from_concat(std::vector& cols, const json& concat_spec) { if (!concat_spec.is_array()) return; for (const auto& part : concat_spec) { @@ -517,6 +899,20 @@ static void collect_cols_from_concat(std::vector& cols, const json& } } +/** + * @brief Collect all columns needed for source ingestion + * + * Analyzes doc_map_json and embedding_json to determine which columns + * must be fetched from the source table. Builds a minimal SELECT query + * by only including required columns. + * + * @param s Source configuration with JSON specifications + * @param ecfg Embedding configuration + * @return std::vector List of required column names + * + * @note The primary key column is always included. + * @note Columns in doc_id.format must be manually included in metadata.pick or concat. + */ static std::vector collect_needed_columns(const RagSource& s, const EmbeddingConfig& ecfg) { std::vector cols; add_unique(cols, s.pk_column); @@ -540,12 +936,21 @@ static std::vector collect_needed_columns(const RagSource& s, const collect_cols_from_concat(cols, ecfg.input_spec["concat"]); } - // doc_id.format: we do not try to parse all placeholders; best practice is doc_id uses pk only. - // If you want doc_id.format to reference other columns, include them in metadata.pick or concat. - return cols; } +/** + * @brief Build SELECT SQL query for source + * + * Constructs a SELECT query that fetches only the required columns + * from the source table, with optional WHERE clause combining the + * source's where_sql and an incremental filter. 
+ * + * @param s Source configuration + * @param cols List of column names to SELECT + * @param extra_filter Additional WHERE clause (e.g., incremental filter) + * @return std::string Complete SELECT SQL statement + */ static std::string build_select_sql(const RagSource& s, const std::vector& cols, const std::string& extra_filter) { @@ -566,9 +971,24 @@ static std::string build_select_sql(const RagSource& s, return sql; } -static void sqlite_prepare_or_die(sqlite3* db, sqlite3_stmt** st, const char* sql); -static void sqlite_bind_text(sqlite3_stmt* st, int idx, const std::string& v); +// =========================================================================== +// Sync Cursor (Watermark) Management +// =========================================================================== +// Forward declarations +static void sqlite_prepare_or_die(sqlite3* db, sqlite3_stmt** st, const char* sql); +static void bind_text(sqlite3_stmt* st, int idx, const std::string& v); + +/** + * @brief Load sync cursor JSON from database + * + * Retrieves the cursor_json for a source from rag_sync_state. + * Returns empty object if no cursor exists or on error. + * + * @param db SQLite database connection + * @param source_id Source identifier + * @return json Cursor JSON object (empty if not found) + */ static json load_sync_cursor_json(sqlite3* db, int source_id) { sqlite3_stmt* st = nullptr; json out = json::object(); @@ -593,6 +1013,16 @@ static json load_sync_cursor_json(sqlite3* db, int source_id) { return out; } +/** + * @brief Parse sync cursor JSON into SyncCursor struct + * + * Extracts cursor configuration from JSON and determines the + * value type (numeric vs string). Used for incremental ingestion. 
+ * + * @param cursor_json JSON cursor configuration + * @param default_col Default column name if not specified + * @return SyncCursor Parsed cursor configuration + */ static SyncCursor parse_sync_cursor(const json& cursor_json, const std::string& default_col) { SyncCursor c; c.column = default_col; @@ -623,6 +1053,17 @@ static SyncCursor parse_sync_cursor(const json& cursor_json, const std::string& return c; } +/** + * @brief Build incremental filter SQL from cursor + * + * Constructs a WHERE clause fragment for incremental ingestion. + * Returns empty string if cursor has no value set. + * + * @param c Sync cursor configuration + * @return std::string SQL filter (e.g., "`Id` > 123") or empty string + * + * @note String values are escaped to prevent SQL injection. + */ static std::string build_incremental_filter(const SyncCursor& c) { if (!c.has_value || c.column.empty()) return ""; std::string col = "`" + c.column + "`"; @@ -632,6 +1073,18 @@ static std::string build_incremental_filter(const SyncCursor& c) { return col + " > '" + sql_escape_single_quotes(c.str_value) + "'"; } +/** + * @brief Update sync state in database + * + * Upserts the cursor state into rag_sync_state after successful + * (or partial) ingestion. Updates the watermark value and last_ok_at. + * + * @param db SQLite database connection + * @param source_id Source identifier + * @param cursor_json Updated cursor JSON to store + * + * @note On error, calls fatal() to terminate the program. 
+ */ static void update_sync_state(sqlite3* db, int source_id, const json& cursor_json) { const char* sql = "INSERT INTO rag_sync_state(source_id, mode, cursor_json, last_ok_at, last_error) " @@ -642,7 +1095,7 @@ static void update_sync_state(sqlite3* db, int source_id, const json& cursor_jso sqlite_prepare_or_die(db, &st, sql); sqlite3_bind_int(st, 1, source_id); std::string cursor_str = json_dump_compact(cursor_json); - sqlite_bind_text(st, 2, cursor_str); + bind_text(st, 2, cursor_str); int rc = sqlite3_step(st); sqlite3_finalize(st); if (rc != SQLITE_DONE) { @@ -650,24 +1103,49 @@ static void update_sync_state(sqlite3* db, int source_id, const json& cursor_jso } } -// ------------------------- -// SQLite prepared statements (batched insertion) -// ------------------------- +// =========================================================================== +// SQLite Prepared Statements +// =========================================================================== +/** + * @brief Collection of prepared SQLite statements + * + * Holds all prepared statements needed for ingestion. + * Prepared statements are reused for efficiency and SQL injection safety. + */ struct SqliteStmts { - sqlite3_stmt* doc_exists = nullptr; - sqlite3_stmt* ins_doc = nullptr; - sqlite3_stmt* ins_chunk = nullptr; - sqlite3_stmt* ins_fts = nullptr; - sqlite3_stmt* ins_vec = nullptr; // optional (only used if embedding enabled) + sqlite3_stmt* doc_exists = nullptr; ///< Check if document exists + sqlite3_stmt* ins_doc = nullptr; ///< Insert document + sqlite3_stmt* ins_chunk = nullptr; ///< Insert chunk + sqlite3_stmt* ins_fts = nullptr; ///< Insert into FTS index + sqlite3_stmt* ins_vec = nullptr; ///< Insert vector embedding }; +/** + * @brief Prepare SQLite statement or terminate + * + * Prepares a SQLite statement and calls fatal() on failure. + * Used for statements that must succeed for the program to continue. 
+ * + * @param db SQLite database connection + * @param st Output parameter for prepared statement handle + * @param sql SQL statement to prepare + * + * @note On failure, prints error and calls fatal() to exit. + */ static void sqlite_prepare_or_die(sqlite3* db, sqlite3_stmt** st, const char* sql) { if (sqlite3_prepare_v2(db, sql, -1, st, nullptr) != SQLITE_OK) { fatal(std::string("SQLite prepare failed: ") + sqlite3_errmsg(db) + "\nSQL: " + sql); } } +/** + * @brief Finalize all statements in SqliteStmts + * + * Releases all prepared statement resources. + * + * @param s Statement collection to finalize (reset to default state) + */ static void sqlite_finalize_all(SqliteStmts& s) { if (s.doc_exists) sqlite3_finalize(s.doc_exists); if (s.ins_doc) sqlite3_finalize(s.ins_doc); @@ -677,30 +1155,76 @@ static void sqlite_finalize_all(SqliteStmts& s) { s = SqliteStmts{}; } -static void sqlite_bind_text(sqlite3_stmt* st, int idx, const std::string& v) { +/** + * @brief Bind text parameter to SQLite statement + * + * Binds a std::string value to a prepared statement parameter. + * Uses SQLITE_TRANSIENT for string management (SQLite makes a copy). + * + * @param st Prepared statement + * @param idx Parameter index (1-based) + * @param v String value to bind + */ +static void bind_text(sqlite3_stmt* st, int idx, const std::string& v) { sqlite3_bind_text(st, idx, v.c_str(), -1, SQLITE_TRANSIENT); } -// Best-effort binder for sqlite3-vec embeddings (float32 array). -// If your sqlite3-vec build expects a different encoding, change this function only. +/** + * @brief Bind vector embedding to SQLite statement + * + * Binds a float32 array as a BLOB for sqlite3-vec. + * This is the "best effort" binding format that works with most + * sqlite3-vec builds. Modify if your build expects different format. + * + * @param st Prepared statement + * @param idx Parameter index (1-based) + * @param emb Float vector to bind + * + * @note The binding format is build-dependent. 
If vec operations fail, + * check your sqlite3-vec build's expected format. + */ static void bind_vec_embedding(sqlite3_stmt* st, int idx, const std::vector& emb) { const void* data = (const void*)emb.data(); int bytes = (int)(emb.size() * sizeof(float)); sqlite3_bind_blob(st, idx, data, bytes, SQLITE_TRANSIENT); } -// Check if doc exists +/** + * @brief Check if document exists in database + * + * Tests whether a document with the given doc_id already exists + * in rag_documents. Used for skip-if-exists logic in v0. + * + * @param ss Statement collection + * @param doc_id Document identifier to check + * @return true if document exists, false otherwise + */ static bool sqlite_doc_exists(SqliteStmts& ss, const std::string& doc_id) { sqlite3_reset(ss.doc_exists); sqlite3_clear_bindings(ss.doc_exists); - sqlite_bind_text(ss.doc_exists, 1, doc_id); + bind_text(ss.doc_exists, 1, doc_id); int rc = sqlite3_step(ss.doc_exists); return (rc == SQLITE_ROW); } -// Insert doc +/** + * @brief Insert document into rag_documents table + * + * Inserts a new document record with all metadata. + * + * @param ss Statement collection + * @param source_id Source identifier + * @param source_name Source name + * @param doc_id Document identifier + * @param pk_json Primary key JSON (for refetch) + * @param title Document title + * @param body Document body + * @param meta_json Metadata JSON object + * + * @note On failure, calls fatal() to terminate. 
+ */ static void sqlite_insert_doc(SqliteStmts& ss, int source_id, const std::string& source_name, @@ -712,13 +1236,13 @@ static void sqlite_insert_doc(SqliteStmts& ss, sqlite3_reset(ss.ins_doc); sqlite3_clear_bindings(ss.ins_doc); - sqlite_bind_text(ss.ins_doc, 1, doc_id); + bind_text(ss.ins_doc, 1, doc_id); sqlite3_bind_int(ss.ins_doc, 2, source_id); - sqlite_bind_text(ss.ins_doc, 3, source_name); - sqlite_bind_text(ss.ins_doc, 4, pk_json); - sqlite_bind_text(ss.ins_doc, 5, title); - sqlite_bind_text(ss.ins_doc, 6, body); - sqlite_bind_text(ss.ins_doc, 7, meta_json); + bind_text(ss.ins_doc, 3, source_name); + bind_text(ss.ins_doc, 4, pk_json); + bind_text(ss.ins_doc, 5, title); + bind_text(ss.ins_doc, 6, body); + bind_text(ss.ins_doc, 7, meta_json); int rc = sqlite3_step(ss.ins_doc); if (rc != SQLITE_DONE) { @@ -726,7 +1250,22 @@ static void sqlite_insert_doc(SqliteStmts& ss, } } -// Insert chunk +/** + * @brief Insert chunk into rag_chunks table + * + * Inserts a new chunk record with title, body, and metadata. + * + * @param ss Statement collection + * @param chunk_id Chunk identifier (typically doc_id#index) + * @param doc_id Parent document identifier + * @param source_id Source identifier + * @param chunk_index Zero-based chunk index within document + * @param title Chunk title + * @param body Chunk body text + * @param meta_json Chunk metadata JSON + * + * @note On failure, calls fatal() to terminate. 
+ */ static void sqlite_insert_chunk(SqliteStmts& ss, const std::string& chunk_id, const std::string& doc_id, @@ -738,13 +1277,13 @@ static void sqlite_insert_chunk(SqliteStmts& ss, sqlite3_reset(ss.ins_chunk); sqlite3_clear_bindings(ss.ins_chunk); - sqlite_bind_text(ss.ins_chunk, 1, chunk_id); - sqlite_bind_text(ss.ins_chunk, 2, doc_id); + bind_text(ss.ins_chunk, 1, chunk_id); + bind_text(ss.ins_chunk, 2, doc_id); sqlite3_bind_int(ss.ins_chunk, 3, source_id); sqlite3_bind_int(ss.ins_chunk, 4, chunk_index); - sqlite_bind_text(ss.ins_chunk, 5, title); - sqlite_bind_text(ss.ins_chunk, 6, body); - sqlite_bind_text(ss.ins_chunk, 7, meta_json); + bind_text(ss.ins_chunk, 5, title); + bind_text(ss.ins_chunk, 6, body); + bind_text(ss.ins_chunk, 7, meta_json); int rc = sqlite3_step(ss.ins_chunk); if (rc != SQLITE_DONE) { @@ -752,7 +1291,20 @@ static void sqlite_insert_chunk(SqliteStmts& ss, } } -// Insert into FTS +/** + * @brief Insert chunk into FTS index + * + * Inserts a chunk into the rag_fts_chunks FTS5 table for + * full-text search. The FTS table is contentless (text stored + * only in rag_chunks, FTS only maintains index). + * + * @param ss Statement collection + * @param chunk_id Chunk identifier + * @param title Chunk title + * @param body Chunk body text + * + * @note On failure, calls fatal() to terminate. 
+ */ static void sqlite_insert_fts(SqliteStmts& ss, const std::string& chunk_id, const std::string& title, @@ -760,9 +1312,9 @@ static void sqlite_insert_fts(SqliteStmts& ss, sqlite3_reset(ss.ins_fts); sqlite3_clear_bindings(ss.ins_fts); - sqlite_bind_text(ss.ins_fts, 1, chunk_id); - sqlite_bind_text(ss.ins_fts, 2, title); - sqlite_bind_text(ss.ins_fts, 3, body); + bind_text(ss.ins_fts, 1, chunk_id); + bind_text(ss.ins_fts, 2, title); + bind_text(ss.ins_fts, 3, body); int rc = sqlite3_step(ss.ins_fts); if (rc != SQLITE_DONE) { @@ -770,8 +1322,24 @@ static void sqlite_insert_fts(SqliteStmts& ss, } } -// Insert vector row (sqlite3-vec) -// Schema: rag_vec_chunks(embedding, chunk_id, doc_id, source_id, updated_at) +/** + * @brief Insert vector embedding into rag_vec_chunks table + * + * Inserts a vector embedding along with metadata for vector + * similarity search using sqlite3-vec. + * + * Schema: rag_vec_chunks(embedding, chunk_id, doc_id, source_id, updated_at) + * + * @param ss Statement collection + * @param emb Float32 vector embedding + * @param chunk_id Chunk identifier + * @param doc_id Parent document identifier + * @param source_id Source identifier + * @param updated_at_unixepoch Unix epoch timestamp (0 for "now") + * + * @note If ss.ins_vec is null (embedding disabled), returns without error. + * @note On failure, calls fatal() with vec-specific error message. 
+ */ static void sqlite_insert_vec(SqliteStmts& ss, const std::vector& emb, const std::string& chunk_id, @@ -784,48 +1352,57 @@ static void sqlite_insert_vec(SqliteStmts& ss, sqlite3_clear_bindings(ss.ins_vec); bind_vec_embedding(ss.ins_vec, 1, emb); - sqlite_bind_text(ss.ins_vec, 2, chunk_id); - sqlite_bind_text(ss.ins_vec, 3, doc_id); + bind_text(ss.ins_vec, 2, chunk_id); + bind_text(ss.ins_vec, 3, doc_id); sqlite3_bind_int(ss.ins_vec, 4, source_id); sqlite3_bind_int64(ss.ins_vec, 5, (sqlite3_int64)updated_at_unixepoch); int rc = sqlite3_step(ss.ins_vec); if (rc != SQLITE_DONE) { - // In practice, sqlite3-vec may return errors if binding format is wrong. - // Keep the message loud and actionable. + // Vec-specific error message for troubleshooting fatal(std::string("SQLite insert rag_vec_chunks failed (check vec binding format): ") + sqlite3_errmsg(sqlite3_db_handle(ss.ins_vec))); } } -// ------------------------- -// Embedding stub -// ------------------------- -// This function is a placeholder. It returns a deterministic pseudo-embedding from the text. -// Replace it with a real embedding model call in ProxySQL later. -// -// Why deterministic? -// - Helps test end-to-end ingestion + vector SQL without needing an ML runtime. -// - Keeps behavior stable across runs. -// +// =========================================================================== +// Embedding Generation +// =========================================================================== + +/** + * @brief Generate deterministic pseudo-embedding from text + * + * This is a stub implementation that generates a deterministic but + * non-semantic embedding from input text. Used for testing when + * a real embedding service is not available. + * + * The algorithm uses a rolling hash to distribute values across + * vector dimensions, then normalizes. Results are deterministic + * (same input always produces same output) but NOT semantic. 
+ * + * @param text Input text to "embed" + * @param dim Vector dimension + * @return std::vector Deterministic pseudo-embedding + * + * @warning This does NOT produce semantic embeddings. Only for testing. + */ static std::vector pseudo_embedding(const std::string& text, int dim) { std::vector v; v.resize((size_t)dim, 0.0f); - // Simple rolling hash-like accumulation into float bins. - // NOT a semantic embedding; only for wiring/testing. - std::uint64_t h = 1469598103934665603ULL; + // FNV-1a-like rolling hash accumulation into float bins + std::uint64_t h = 1469598103934665603ULL; // FNV offset basis for (size_t i = 0; i < text.size(); i++) { h ^= (unsigned char)text[i]; - h *= 1099511628211ULL; + h *= 1099511628211ULL; // FNV prime - // Spread influence into bins + // Spread influence into bins based on hash size_t idx = (size_t)(h % (std::uint64_t)dim); - float val = (float)((h >> 32) & 0xFFFF) / 65535.0f; // 0..1 - v[idx] += (val - 0.5f); + float val = (float)((h >> 32) & 0xFFFF) / 65535.0f; // Normalize to 0..1 + v[idx] += (val - 0.5f); // Center around 0 } - // Very rough normalization + // L2 normalization double norm = 0.0; for (int i = 0; i < dim; i++) norm += (double)v[(size_t)i] * (double)v[(size_t)i]; norm = std::sqrt(norm); @@ -835,16 +1412,47 @@ static std::vector pseudo_embedding(const std::string& text, int dim) { return v; } -// ------------------------- -// Embedding providers -// ------------------------- - +/** + * @brief Abstract embedding provider interface + * + * Base class for embedding providers. Concrete implementations + * include StubEmbeddingProvider and OpenAIEmbeddingProvider. 
+ */
 struct EmbeddingProvider {
+	/**
+	 * @brief Virtual destructor for safe polymorphic deletion
+	 */
 	virtual ~EmbeddingProvider() = default;
+
+	/**
+	 * @brief Generate embeddings for multiple input strings
+	 *
+	 * @param inputs Vector of input texts to embed
+	 * @param dim Expected embedding dimension
+	 * @return std::vector<std::vector<float>> Vector of embeddings (one per input)
+	 *
+	 * @note The returned vector must have exactly inputs.size() embeddings.
+	 * @note Each embedding must have exactly dim float values.
+	 *
+	 * @throws std::runtime_error on embedding generation failure
+	 */
 	virtual std::vector<std::vector<float>> embed(const std::vector<std::string>& inputs, int dim) = 0;
 };
 
+/**
+ * @brief Stub embedding provider for testing
+ *
+ * Uses pseudo_embedding() to generate deterministic but non-semantic
+ * embeddings. Used when no real embedding service is available.
+ */
 struct StubEmbeddingProvider : public EmbeddingProvider {
+	/**
+	 * @brief Generate pseudo-embeddings for inputs
+	 *
+	 * @param inputs Vector of input texts
+	 * @param dim Embedding dimension
+	 * @return std::vector<std::vector<float>> Deterministic pseudo-embeddings
+	 */
 	std::vector<std::vector<float>> embed(const std::vector<std::string>& inputs, int dim) override {
 		std::vector<std::vector<float>> out;
 		out.reserve(inputs.size());
@@ -853,10 +1461,28 @@ struct StubEmbeddingProvider : public EmbeddingProvider {
 	}
 };
 
+/**
+ * @brief Buffer for curl HTTP response data
+ *
+ * Simple string buffer used by curl write callback to accumulate
+ * HTTP response data.
+ */
 struct CurlBuffer {
-	std::string data;
+	std::string data; ///< Accumulated response data
 };
 
+/**
+ * @brief Curl write callback function
+ *
+ * Callback function passed to libcurl to handle incoming response data.
+ * Appends received data to the CurlBuffer.
+ *
+ * @param contents Pointer to received data
+ * @param size Size of each data element
+ * @param nmemb Number of elements
+ * @param userp User pointer (must point to CurlBuffer)
+ * @return size_t Total bytes processed
+ */
 static size_t curl_write_cb(void* contents, size_t size, size_t nmemb, void* userp) {
 	size_t total = size * nmemb;
 	CurlBuffer* buf = static_cast<CurlBuffer*>(userp);
@@ -864,16 +1490,46 @@ static size_t curl_write_cb(void* contents, size_t size, size_t nmemb, void* use
 	return total;
 }
 
+/**
+ * @brief OpenAI-compatible embedding provider
+ *
+ * Calls an OpenAI-compatible HTTP API to generate embeddings.
+ * Works with OpenAI, Azure OpenAI, or any compatible service.
+ */
 struct OpenAIEmbeddingProvider : public EmbeddingProvider {
-	std::string api_base;
-	std::string api_key;
-	std::string model;
-	int timeout_ms = 20000;
-
+	std::string api_base;    ///< API base URL (e.g., "https://api.openai.com/v1")
+	std::string api_key;     ///< API authentication key
+	std::string model;       ///< Model name (e.g., "text-embedding-3-large")
+	int timeout_ms = 20000;  ///< Request timeout in milliseconds
+
+	/**
+	 * @brief Construct OpenAI embedding provider
+	 *
+	 * @param base API base URL
+	 * @param key API key
+	 * @param mdl Model name
+	 * @param timeout Request timeout
+	 */
 	OpenAIEmbeddingProvider(std::string base, std::string key, std::string mdl, int timeout)
 		: api_base(std::move(base)), api_key(std::move(key)), model(std::move(mdl)), timeout_ms(timeout) {}
 
+	/**
+	 * @brief Generate embeddings via OpenAI-compatible API
+	 *
+	 * Sends a batch of texts to the embedding API and returns
+	 * the generated embeddings.
+	 *
+	 * @param inputs Vector of input texts to embed
+	 * @param dim Expected embedding dimension
+	 * @return std::vector<std::vector<float>> Generated embeddings
+	 *
+	 * @throws std::runtime_error on API error, timeout, or invalid response
+	 *
+	 * @note The inputs are sent in a single batch. For large batches,
+	 *       consider splitting into multiple calls.
+ */ std::vector> embed(const std::vector& inputs, int dim) override { + // Validate configuration if (api_base.empty()) { throw std::runtime_error("embedding api_base is empty"); } @@ -883,10 +1539,12 @@ struct OpenAIEmbeddingProvider : public EmbeddingProvider { if (model.empty()) { throw std::runtime_error("embedding model is empty"); } + // Note: Some providers expect "hf:" prefix for HuggingFace models if (model.rfind("hf:", 0) != 0) { std::cerr << "WARN: embedding model should be prefixed with 'hf:' per Synthetic docs\n"; } + // Build request JSON json req; req["model"] = model; req["input"] = inputs; @@ -895,10 +1553,12 @@ struct OpenAIEmbeddingProvider : public EmbeddingProvider { } std::string body = req.dump(); + // Build URL std::string url = api_base; if (!url.empty() && url.back() == '/') url.pop_back(); url += "/embeddings"; + // Execute HTTP request CURL* curl = curl_easy_init(); if (!curl) { throw std::runtime_error("curl_easy_init failed"); @@ -926,6 +1586,7 @@ struct OpenAIEmbeddingProvider : public EmbeddingProvider { curl_slist_free_all(headers); curl_easy_cleanup(curl); + // Check for errors if (res != CURLE_OK) { throw std::runtime_error(std::string("curl error: ") + curl_easy_strerror(res)); } @@ -933,11 +1594,13 @@ struct OpenAIEmbeddingProvider : public EmbeddingProvider { throw std::runtime_error("embedding request failed with status " + std::to_string(status)); } + // Parse response json resp = json::parse(buf.data); if (!resp.contains("data") || !resp["data"].is_array()) { throw std::runtime_error("embedding response missing data array"); } + // Extract embeddings std::vector> out; out.reserve(resp["data"].size()); for (const auto& item : resp["data"]) { @@ -963,6 +1626,18 @@ struct OpenAIEmbeddingProvider : public EmbeddingProvider { } }; +/** + * @brief Build embedding provider from configuration + * + * Factory function that creates the appropriate embedding provider + * based on the configuration. 
+ *
+ * @param cfg Embedding configuration
+ * @return std::unique_ptr<EmbeddingProvider> Configured provider instance
+ *
+ * @note Currently supports "stub" and "openai" providers.
+ *       "stub" is returned for unknown provider types.
+ */
 static std::unique_ptr<EmbeddingProvider> build_embedding_provider(const EmbeddingConfig& cfg) {
 	if (cfg.provider == "openai") {
 		return std::make_unique<OpenAIEmbeddingProvider>(cfg.api_base, cfg.api_key, cfg.model, cfg.timeout_ms);
@@ -970,10 +1645,22 @@
 	return std::make_unique<StubEmbeddingProvider>();
 }
 
-// -------------------------
-// Load rag_sources from SQLite
-// -------------------------
-
+// ===========================================================================
+// Source Loading
+// ===========================================================================
+
+/**
+ * @brief Load enabled sources from rag_sources table
+ *
+ * Queries the rag_sources table and parses all enabled sources
+ * into RagSource structs. Validates JSON configurations.
+ *
+ * @param db SQLite database connection
+ * @return std::vector<RagSource> Vector of enabled source configurations
+ *
+ * @note On JSON parsing error, calls fatal() to terminate.
+ * @note Only sources with enabled=1 are loaded.
+ */
 static std::vector<RagSource> load_sources(sqlite3* db) {
 	std::vector<RagSource> out;
@@ -1035,37 +1722,54 @@ static std::vector<RagSource> load_sources(sqlite3* db) {
 	return out;
 }
 
-// -------------------------
-// Build a canonical document from a source row
-// -------------------------
+// ===========================================================================
+// Document Building
+// ===========================================================================
 
+/**
+ * @brief Canonical document representation
+ *
+ * Contains all fields of a document in its canonical form,
+ * ready for insertion into rag_documents and chunking.
+ */
 struct BuiltDoc {
-	std::string doc_id;
-	std::string pk_json;
-	std::string title;
-	std::string body;
-	std::string metadata_json;
+	std::string doc_id;        ///< Unique document identifier
+	std::string pk_json;       ///< Primary key JSON (for refetch)
+	std::string title;         ///< Document title
+	std::string body;          ///< Document body (to be chunked)
+	std::string metadata_json; ///< Metadata JSON object
 };
 
+/**
+ * @brief Build canonical document from source row
+ *
+ * Transforms a raw backend row into a canonical document using
+ * the doc_map_json specification. Handles doc_id formatting,
+ * title/body concatenation, and metadata building.
+ *
+ * @param src Source configuration with doc_map_json
+ * @param row Raw row data from backend
+ * @return BuiltDoc Canonical document representation
+ */
 static BuiltDoc build_document_from_row(const RagSource& src, const RowMap& row) {
 	BuiltDoc d;
 
-	// doc_id
+	// Build doc_id from format template
 	if (src.doc_map_json.contains("doc_id") && src.doc_map_json["doc_id"].is_object() &&
 	    src.doc_map_json["doc_id"].contains("format") && src.doc_map_json["doc_id"]["format"].is_string()) {
 		d.doc_id = apply_format(src.doc_map_json["doc_id"]["format"].get<std::string>(), row);
 	} else {
-		// fallback: table:pk
+		// Fallback: table:pk format
 		auto pk = row_get(row, src.pk_column).value_or("");
 		d.doc_id = src.table_name + ":" + pk;
 	}
 
-	// pk_json (refetch pointer)
+	// Build pk_json (refetch pointer)
 	json pk = json::object();
 	pk[src.pk_column] = row_get(row, src.pk_column).value_or("");
 	d.pk_json = json_dump_compact(pk);
 
-	// title/body
+	// Build title from concat spec
 	if (src.doc_map_json.contains("title") && src.doc_map_json["title"].is_object() &&
 	    src.doc_map_json["title"].contains("concat")) {
 		d.title = eval_concat(src.doc_map_json["title"]["concat"], row, "", false);
@@ -1073,6 +1777,7 @@ static BuiltDoc build_document_from_row(const RagSource& src, const RowMap& row)
 		d.title = "";
 	}
 
+	// Build body from concat spec
 	if
(src.doc_map_json.contains("body") && src.doc_map_json["body"].is_object() && src.doc_map_json["body"].contains("concat")) { d.body = eval_concat(src.doc_map_json["body"]["concat"], row, "", false); @@ -1080,7 +1785,7 @@ static BuiltDoc build_document_from_row(const RagSource& src, const RowMap& row) d.body = ""; } - // metadata_json + // Build metadata JSON json meta = json::object(); if (src.doc_map_json.contains("metadata")) { meta = build_metadata(src.doc_map_json["metadata"], row); @@ -1090,10 +1795,21 @@ static BuiltDoc build_document_from_row(const RagSource& src, const RowMap& row) return d; } -// ------------------------- -// Embedding input builder (optional) -// ------------------------- - +// =========================================================================== +// Embedding Input Builder +// =========================================================================== + +/** + * @brief Build embedding input text for a chunk + * + * Constructs the text to be embedded by applying the embedding_json + * input specification. Typically includes title, tags, and chunk body. 
+ * + * @param ecfg Embedding configuration + * @param row Source row data (for column values) + * @param chunk_body Chunk body text + * @return std::string Text to embed (empty if embedding disabled) + */ static std::string build_embedding_input(const EmbeddingConfig& ecfg, const RowMap& row, const std::string& chunk_body) { @@ -1107,10 +1823,23 @@ static std::string build_embedding_input(const EmbeddingConfig& ecfg, return chunk_body; } -// ------------------------- -// Ingest one source -// ------------------------- - +// =========================================================================== +// Statement Preparation +// =========================================================================== + +/** + * @brief Prepare all SQLite statements for ingestion + * + * Creates prepared statements for document existence check, + * document insertion, chunk insertion, FTS insertion, and + * optionally vector insertion. + * + * @param db SQLite database connection + * @param want_vec Whether to prepare vector insert statement + * @return SqliteStmts Collection of prepared statements + * + * @note On failure, calls fatal() to terminate. + */ static SqliteStmts prepare_sqlite_statements(sqlite3* db, bool want_vec) { SqliteStmts ss; @@ -1134,7 +1863,6 @@ static SqliteStmts prepare_sqlite_statements(sqlite3* db, bool want_vec) { // Insert vector (optional) if (want_vec) { - // NOTE: If your sqlite3-vec build expects different binding format, adapt bind_vec_embedding(). 
sqlite_prepare_or_die(db, &ss.ins_vec, "INSERT INTO rag_vec_chunks(embedding, chunk_id, doc_id, source_id, updated_at) " "VALUES(?,?,?,?,?)"); @@ -1143,18 +1871,41 @@ static SqliteStmts prepare_sqlite_statements(sqlite3* db, bool want_vec) { return ss; } +// =========================================================================== +// Source Ingestion +// =========================================================================== + +/** + * @brief Ingest data from a single source + * + * Main ingestion function for a single source. Performs the following: + * 1. Parse chunking and embedding configurations + * 2. Load sync cursor (watermark) + * 3. Connect to backend + * 4. Build and execute SELECT query with incremental filter + * 5. For each row: build document, check existence, insert, chunk, embed + * 6. Update sync cursor with max watermark value + * + * @param sdb SQLite database connection + * @param src Source configuration + * + * @note Only "mysql" backend_type is supported in v0. + * @note Skips documents that already exist (no update in v0). + * @note All inserts happen in a single transaction (managed by caller). 
+ */ static void ingest_source(sqlite3* sdb, const RagSource& src) { std::cerr << "Ingesting source_id=" << src.source_id << " name=" << src.name << " backend=" << src.backend_type << " table=" << src.table_name << "\n"; + // v0 only supports MySQL if (src.backend_type != "mysql") { std::cerr << " Skipping: backend_type not supported in v0.\n"; return; } - // Parse chunking & embedding config + // Parse configurations ChunkingConfig ccfg = parse_chunking_json(src.chunking_json); EmbeddingConfig ecfg = parse_embedding_json(src.embedding_json); std::unique_ptr embedder; @@ -1162,17 +1913,17 @@ static void ingest_source(sqlite3* sdb, const RagSource& src) { embedder = build_embedding_provider(ecfg); } - // Load sync cursor (watermark) + // Load sync cursor (watermark for incremental sync) json cursor_json = load_sync_cursor_json(sdb, src.source_id); SyncCursor cursor = parse_sync_cursor(cursor_json, src.pk_column); - // Prepare SQLite statements for this run + // Prepare SQLite statements SqliteStmts ss = prepare_sqlite_statements(sdb, ecfg.enabled); - // Connect MySQL + // Connect to MySQL MYSQL* mdb = mysql_connect_or_die(src); - // Build SELECT (include watermark column if needed) + // Build SELECT with incremental filter std::vector cols = collect_needed_columns(src, ecfg); if (!cursor.column.empty()) add_unique(cols, cursor.column); std::string extra_filter = build_incremental_filter(cursor); @@ -1196,21 +1947,25 @@ static void ingest_source(sqlite3* sdb, const RagSource& src) { std::uint64_t ingested_docs = 0; std::uint64_t skipped_docs = 0; + // Track max watermark value (for next run) MYSQL_ROW r; bool max_set = false; bool max_numeric = false; std::int64_t max_num = 0; std::string max_str; + + // Process each row while ((r = mysql_fetch_row(res)) != nullptr) { RowMap row = mysql_row_to_map(res, r); - // Track max watermark value from source rows (even if doc is skipped) + // Track max watermark value (even for skipped docs) if (!cursor.column.empty()) { auto 
it = row.find(cursor.column); if (it != row.end()) { const std::string& v = it->second; if (!v.empty()) { if (!max_set) { + // First value - determine type and store if (cursor.numeric || is_integer_string(v)) { try { max_numeric = true; @@ -1230,6 +1985,7 @@ static void ingest_source(sqlite3* sdb, const RagSource& src) { } max_set = true; } else if (max_numeric) { + // Already numeric - try to compare as number if (is_integer_string(v)) { try { std::int64_t nv = std::stoll(v); @@ -1243,15 +1999,17 @@ static void ingest_source(sqlite3* sdb, const RagSource& src) { } } } else { + // Already string - lexicographic comparison if (v > max_str) max_str = v; } } } } + // Build document from row BuiltDoc doc = build_document_from_row(src, row); - // v0: skip if exists + // v0: Skip if document already exists if (sqlite_doc_exists(ss, doc.doc_id)) { skipped_docs++; continue; @@ -1261,30 +2019,32 @@ static void ingest_source(sqlite3* sdb, const RagSource& src) { sqlite_insert_doc(ss, src.source_id, src.name, doc.doc_id, doc.pk_json, doc.title, doc.body, doc.metadata_json); - // Chunk and insert chunks + FTS (+ optional vec) + // Chunk document body std::vector chunks = chunk_text_chars(doc.body, ccfg); - // Use SQLite's unixepoch() for updated_at normally; vec table also stores updated_at as unix epoch. - // Here we store a best-effort "now" from SQLite (unixepoch()) would require a query; instead store 0 - // or a local time. For v0, we store 0 and let schema default handle other tables. - // If you want accuracy, query SELECT unixepoch() once per run and reuse it. + // Note: updated_at is set to 0 for v0. For accurate timestamps, + // query SELECT unixepoch() once at the start of main(). 
std::int64_t now_epoch = 0; + // Process each chunk for (size_t i = 0; i < chunks.size(); i++) { std::string chunk_id = doc.doc_id + "#" + std::to_string(i); - // Chunk metadata (minimal) + // Chunk metadata (minimal - just index) json cmeta = json::object(); cmeta["chunk_index"] = (int)i; - std::string chunk_title = doc.title; // simple: repeat doc title + // Use document title as chunk title (simple approach) + std::string chunk_title = doc.title; + // Insert chunk sqlite_insert_chunk(ss, chunk_id, doc.doc_id, src.source_id, (int)i, chunk_title, chunks[i], json_dump_compact(cmeta)); + // Insert into FTS index sqlite_insert_fts(ss, chunk_id, chunk_title, chunks[i]); - // Optional vectors + // Generate and insert embedding (if enabled) if (ecfg.enabled) { std::string emb_input = build_embedding_input(ecfg, row, chunks[i]); std::vector batch_inputs = {emb_input}; @@ -1300,10 +2060,12 @@ static void ingest_source(sqlite3* sdb, const RagSource& src) { } } + // Cleanup mysql_free_result(res); mysql_close(mdb); sqlite_finalize_all(ss); + // Update sync cursor with max watermark value if (!cursor_json.is_object()) cursor_json = json::object(); if (!cursor.column.empty()) cursor_json["column"] = cursor.column; if (max_set) { @@ -1320,34 +2082,62 @@ static void ingest_source(sqlite3* sdb, const RagSource& src) { << " skipped_docs=" << skipped_docs << "\n"; } -// ------------------------- -// Main -// ------------------------- - +// =========================================================================== +// Main Entry Point +// =========================================================================== + +/** + * @brief Main entry point for RAG ingestion tool + * + * Entry point for the RAG ingestion tool. Orchestrates the entire + * ingestion process: + * + * 1. Validates command-line arguments + * 2. Initializes libcurl + * 3. Opens SQLite RAG index database + * 4. Loads sqlite3-vec extension (if configured) + * 5. Sets SQLite pragmas (foreign keys, WAL mode) + * 6. 
Begins transaction + * 7. Loads and ingests all enabled sources + * 8. Commits or rolls back transaction + * 9. Cleanup and exit + * + * @param argc Argument count + * @param argv Argument values + * @return int Exit code (0 on success, 1 on error, 2 on usage error) + * + * Usage: + * @verbatim + * ./rag_ingest /path/to/rag_index.sqlite + * @endverbatim + */ int main(int argc, char** argv) { + // Validate arguments if (argc != 2) { std::cerr << "Usage: " << argv[0] << " \n"; return 2; } + // Initialize libcurl (for embedding API calls) curl_global_init(CURL_GLOBAL_DEFAULT); const char* sqlite_path = argv[1]; + // Open SQLite database sqlite3* db = nullptr; if (sqlite3_open(sqlite_path, &db) != SQLITE_OK) { fatal("Could not open SQLite DB: " + std::string(sqlite_path)); } - // Load vec0 if configured (needed for rag_vec_chunks inserts) + // Load sqlite3-vec extension if configured sqlite_load_vec_extension(db); - // Pragmas (safe defaults) + // Set safe SQLite pragmas sqlite_exec(db, "PRAGMA foreign_keys = ON;"); sqlite_exec(db, "PRAGMA journal_mode = WAL;"); sqlite_exec(db, "PRAGMA synchronous = NORMAL;"); - // Single transaction for speed + // Begin single transaction for speed if (sqlite_exec(db, "BEGIN IMMEDIATE;") != SQLITE_OK) { sqlite3_close(db); fatal("Failed to begin transaction"); @@ -1355,6 +2145,7 @@ int main(int argc, char** argv) { bool ok = true; try { + // Load and ingest all enabled sources std::vector sources = load_sources(db); if (sources.empty()) { std::cerr << "No enabled sources found in rag_sources.\n"; @@ -1370,6 +2161,7 @@ int main(int argc, char** argv) { ok = false; } + // Commit or rollback if (ok) { if (sqlite_exec(db, "COMMIT;") != SQLITE_OK) { sqlite_exec(db, "ROLLBACK;"); @@ -1383,8 +2175,8 @@ int main(int argc, char** argv) { return 1; } + // Cleanup sqlite3_close(db); curl_global_cleanup(); return 0; } -