diff --git a/RAG_POC/architecture-data-model.md b/RAG_POC/architecture-data-model.md new file mode 100644 index 000000000..0c672bcee --- /dev/null +++ b/RAG_POC/architecture-data-model.md @@ -0,0 +1,384 @@ +# ProxySQL RAG Index — Data Model & Ingestion Architecture (v0 Blueprint) + +This document explains the SQLite data model used to turn relational tables (e.g. MySQL `posts`) into a retrieval-friendly index hosted inside ProxySQL. It focuses on: + +- What each SQLite table does +- How tables relate to each other +- How `rag_sources` defines **explicit mapping rules** (no guessing) +- How ingestion transforms rows into documents and chunks +- How FTS and vector indexes are maintained +- What evolves later for incremental sync and updates + +--- + +## 1. Goal and core idea + +Relational databases are excellent for structured queries, but RAG-style retrieval needs: + +- Fast keyword search (error messages, identifiers, tags) +- Fast semantic search (similar meaning, paraphrased questions) +- A stable way to “refetch the authoritative data” from the source DB + +The model below implements a **canonical document layer** inside ProxySQL: + +1. Ingest selected rows from a source database (MySQL, PostgreSQL, etc.) +2. Convert each row into a **document** (title/body + metadata) +3. Split long bodies into **chunks** +4. Index chunks in: + - **FTS5** for keyword search + - **sqlite3-vec** for vector similarity +5. Serve retrieval through stable APIs (MCP or SQL), independent of where indexes physically live in the future + +--- + +## 2. 
The SQLite tables (what they are and why they exist) + +### 2.1 `rag_sources` — control plane: “what to ingest and how” + +**Purpose** +- Defines each ingestion source (a table or view in an external DB) +- Stores *explicit* transformation rules: + - which columns become `title`, `body` + - which columns go into `metadata_json` + - how to build `doc_id` +- Stores chunking strategy and embedding strategy configuration + +**Key columns** +- `backend_*`: how to connect (v0 connects directly; later may be “via ProxySQL”) +- `table_name`, `pk_column`: what to ingest +- `where_sql`: optional restriction (e.g. only questions) +- `doc_map_json`: mapping rules (required) +- `chunking_json`: chunking rules (required) +- `embedding_json`: embedding rules (optional) + +**Important**: `rag_sources` is the **only place** that defines mapping logic. +A general-purpose ingester must never “guess” which fields belong to `body` or metadata. + +--- + +### 2.2 `rag_documents` — canonical documents: “one per source row” + +**Purpose** +- Represents the canonical document created from a single source row. +- Stores: + - a stable identifier (`doc_id`) + - a refetch pointer (`pk_json`) + - document text (`title`, `body`) + - structured metadata (`metadata_json`) + +**Why store full `body` here?** +- Enables re-chunking later without re-fetching from the source DB. +- Makes debugging and inspection easier. +- Supports future update detection and diffing. + +**Key columns** +- `doc_id` (PK): stable across runs and machines (e.g. `"posts:12345"`) +- `source_id`: ties back to `rag_sources` +- `pk_json`: how to refetch the authoritative row later (e.g. `{"Id":12345}`) +- `title`, `body`: canonical text +- `metadata_json`: non-text signals used for filters/boosting +- `updated_at`, `deleted`: lifecycle fields for incremental sync later + +--- + +### 2.3 `rag_chunks` — retrieval units: “one or many per document” + +**Purpose** +- Stores chunked versions of a document’s text. 
+- Retrieval and embeddings are performed at the chunk level for better quality. + +**Why chunk at all?** +- Long bodies reduce retrieval quality: + - FTS returns large documents where only a small part is relevant + - Vector embeddings of large texts smear multiple topics together +- Chunking yields: + - better precision + - better citations (“this chunk”) and smaller context + - cheaper updates (only re-embed changed chunks later) + +**Key columns** +- `chunk_id` (PK): stable, derived from doc_id + chunk index (e.g. `"posts:12345#0"`) +- `doc_id` (FK): parent document +- `source_id`: convenience for filtering without joining documents +- `chunk_index`: 0..N-1 +- `title`, `body`: chunk text (often title repeated for context) +- `metadata_json`: optional chunk-level metadata (offsets, “has_code”, section label) +- `updated_at`, `deleted`: lifecycle for later incremental sync + +--- + +### 2.4 `rag_fts_chunks` — FTS5 index (contentless) + +**Purpose** +- Keyword search index for chunks. +- Best for: + - exact terms + - identifiers + - error messages + - tags and code tokens (depending on tokenization) + +**Design choice: contentless FTS** +- The FTS virtual table does not automatically mirror `rag_chunks`. +- The ingester explicitly inserts into FTS as chunks are created. +- This makes ingestion deterministic and avoids surprises when chunk bodies change later. + +**Stored fields** +- `chunk_id` (unindexed, acts like a row identifier) +- `title`, `body` (indexed) + +--- + +### 2.5 `rag_vec_chunks` — vector index (sqlite3-vec) + +**Purpose** +- Semantic similarity search over chunks. +- Each chunk has a vector embedding. + +**Key columns** +- `embedding float[DIM]`: embedding vector (DIM must match your model) +- `chunk_id`: join key to `rag_chunks` +- Optional metadata columns: + - `doc_id`, `source_id`, `updated_at` + - These help filtering and joining and are valuable for performance. 
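For illustration, here is a small sketch of how an ingester might serialize a chunk embedding before inserting it into this table. It assumes sqlite-vec accepts vectors either as JSON text or as a compact little-endian float32 blob; verify the accepted formats against the sqlite-vec build compiled into ProxySQL:

```python
import struct

def pack_f32(vector: list[float]) -> bytes:
    # Little-endian float32 blob; assumed compatible with sqlite-vec's
    # compact binary vector format (check against your actual build).
    return struct.pack(f"<{len(vector)}f", *vector)

def unpack_f32(blob: bytes) -> list[float]:
    # Inverse of pack_f32, useful for debugging stored vectors.
    return list(struct.unpack(f"<{len(blob) // 4}f", blob))
```

The blob is what would be bound as the `embedding` value when inserting a row keyed by `chunk_id`; DIM consistency is the ingester's responsibility.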
+ +**Note** +- The ingester decides what text is embedded (chunk body alone, or “Title + Tags + Body chunk”). + +--- + +### 2.6 Optional convenience objects +- `rag_chunk_view`: joins `rag_chunks` with `rag_documents` for debugging/inspection +- `rag_sync_state`: reserved for incremental sync later (not used in v0) + +--- + +## 3. Table relationships (the graph) + +Think of this as a data pipeline graph: + +```text +rag_sources + (defines mapping + chunking + embedding) + | + v +rag_documents (1 row per source row) + | + v +rag_chunks (1..N chunks per document) + / \ + v v +rag_fts rag_vec +``` + +**Cardinality** +- `rag_sources (1) -> rag_documents (N)` +- `rag_documents (1) -> rag_chunks (N)` +- `rag_chunks (1) -> rag_fts_chunks (1)` (insertion done by ingester) +- `rag_chunks (1) -> rag_vec_chunks (0/1+)` (0 if embeddings disabled; 1 typically) + +--- + +## 4. How mapping is defined (no guessing) + +### 4.1 Why `doc_map_json` exists +A general-purpose system cannot infer that: +- `posts.Body` should become document body +- `posts.Title` should become title +- `Score`, `Tags`, `CreationDate`, etc. should become metadata +- Or how to concatenate fields + +Therefore, `doc_map_json` is required. 
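To make this contract concrete, here is a minimal sketch of how an ingester could apply such mapping rules to one source row. The rule structure mirrors `doc_map_json`; the function and variable names are illustrative, not part of any schema:

```python
# Sketch: apply doc_map_json-style rules to a single source row (names illustrative).
def apply_doc_map(row: dict, doc_map: dict) -> dict:
    def concat(parts):
        # Concatenation spec: {"col": ...} appends a column value if present,
        # {"lit": ...} appends a literal string.
        out = []
        for p in parts:
            if "col" in p:
                v = row.get(p["col"])
                if v is not None:
                    out.append(str(v))
            elif "lit" in p:
                out.append(p["lit"])
        return "".join(out)

    # metadata.pick selects columns; metadata.rename fixes key names afterwards.
    meta = {k: row[k] for k in doc_map["metadata"]["pick"] if k in row}
    for old, new in doc_map["metadata"].get("rename", {}).items():
        if old in meta:
            meta[new] = meta.pop(old)

    return {
        # doc_id.format uses {ColumnName} placeholders, e.g. "posts:{Id}".
        "doc_id": doc_map["doc_id"]["format"].format(**row),
        "title": concat(doc_map["title"]["concat"]),
        "body": concat(doc_map["body"]["concat"]),
        "metadata": meta,
    }
```

Because every rule is explicit, two different ingester implementations given the same `doc_map_json` must produce identical documents.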
+ +### 4.2 `doc_map_json` structure (v0) +`doc_map_json` defines: + +- `doc_id.format`: string template with `{ColumnName}` placeholders +- `title.concat`: concatenation spec +- `body.concat`: concatenation spec +- `metadata.pick`: list of column names to include in metadata JSON +- `metadata.rename`: mapping of old key -> new key (useful for typos or schema differences) + +**Concatenation parts** +- `{"col":"Column"}` — appends the column value (if present) +- `{"lit":"..."} ` — appends a literal string + +Example (posts-like): + +```json +{ + "doc_id": { "format": "posts:{Id}" }, + "title": { "concat": [ { "col": "Title" } ] }, + "body": { "concat": [ { "col": "Body" } ] }, + "metadata": { + "pick": ["Id","PostTypeId","Tags","Score","CreaionDate"], + "rename": {"CreaionDate":"CreationDate"} + } +} +``` + +--- + +## 5. Chunking strategy definition + +### 5.1 Why chunking is configured per source +Different tables need different chunking: +- StackOverflow `Body` may be long -> chunking recommended +- Small “reference” tables may not need chunking at all + +Thus chunking is stored in `rag_sources.chunking_json`. + +### 5.2 `chunking_json` structure (v0) +v0 supports **chars-based** chunking (simple, robust). + +```json +{ + "enabled": true, + "unit": "chars", + "chunk_size": 4000, + "overlap": 400, + "min_chunk_size": 800 +} +``` + +**Behavior** +- If `body.length <= chunk_size` -> one chunk +- Else chunks of `chunk_size` with `overlap` +- Avoid tiny final chunks by appending the tail to the previous chunk if below `min_chunk_size` + +**Why overlap matters** +- Prevents splitting a key sentence or code snippet across boundaries +- Improves both FTS and semantic retrieval consistency + +--- + +## 6. 
Embedding strategy definition (where it fits in the model) + +### 6.1 Why embeddings are per chunk +- Better retrieval precision +- Smaller context per match +- Allows partial updates later (only re-embed changed chunks) + +### 6.2 `embedding_json` structure (v0) +```json +{ + "enabled": true, + "dim": 1536, + "model": "text-embedding-3-large", + "input": { "concat": [ + {"col":"Title"}, + {"lit":"\nTags: "}, {"col":"Tags"}, + {"lit":"\n\n"}, + {"chunk_body": true} + ]} +} +``` + +**Meaning** +- Build embedding input text from: + - title + - tags (as plain text) + - chunk body + +This improves semantic retrieval for question-like content without embedding numeric metadata. + +--- + +## 7. Ingestion lifecycle (step-by-step) + +For each enabled `rag_sources` entry: + +1. **Connect** to source DB using `backend_*` +2. **Select rows** from `table_name` (and optional `where_sql`) + - Select only needed columns determined by `doc_map_json` and `embedding_json` +3. For each row: + - Build `doc_id` using `doc_map_json.doc_id.format` + - Build `pk_json` from `pk_column` + - Build `title` using `title.concat` + - Build `body` using `body.concat` + - Build `metadata_json` using `metadata.pick` and `metadata.rename` +4. **Skip** if `doc_id` already exists (v0 behavior) +5. Insert into `rag_documents` +6. Chunk `body` using `chunking_json` +7. For each chunk: + - Insert into `rag_chunks` + - Insert into `rag_fts_chunks` + - If embeddings enabled: + - Build embedding input text using `embedding_json.input` + - Compute embedding + - Insert into `rag_vec_chunks` +8. Commit (ideally in a transaction for performance) + +--- + +## 8. What changes later (incremental sync and updates) + +v0 is “insert-only and skip-existing.” +Product-grade ingestion requires: + +### 8.1 Detecting changes +Options: +- Watermark by `LastActivityDate` / `updated_at` column +- Hash (e.g. 
`sha256(title||body||metadata)`) stored in documents table +- Compare chunk hashes to re-embed only changed chunks + +### 8.2 Updating and deleting +Needs: +- Upsert documents +- Delete or mark `deleted=1` when source row deleted +- Rebuild chunks and indexes when body changes +- Maintain FTS rows: + - delete old chunk rows from FTS + - insert updated chunk rows + +### 8.3 Checkpoints +Use `rag_sync_state` to store: +- last ingested timestamp +- GTID/LSN for CDC +- or a monotonic PK watermark + +The current schema already includes: +- `updated_at` and `deleted` +- `rag_sync_state` placeholder + +So incremental sync can be added without breaking the data model. + +--- + +## 9. Practical example: mapping `posts` table + +Given a MySQL `posts` row: + +- `Id = 12345` +- `Title = "How to parse JSON in MySQL 8?"` +- `Body = "
I tried JSON_EXTRACT...
"` +- `Tags = ""` +- `Score = 12` + +With mapping: + +- `doc_id = "posts:12345"` +- `title = Title` +- `body = Body` +- `metadata_json` includes `{ "Tags": "...", "Score": "12", ... }` +- chunking splits body into: + - `posts:12345#0`, `posts:12345#1`, etc. +- FTS is populated with the chunk text +- vectors are stored per chunk + +--- + +## 10. Summary + +This data model separates concerns cleanly: + +- `rag_sources` defines *policy* (what/how to ingest) +- `rag_documents` defines canonical *identity and refetch pointer* +- `rag_chunks` defines retrieval *units* +- `rag_fts_chunks` defines keyword search +- `rag_vec_chunks` defines semantic search + +This separation makes the system: +- general purpose (works for many schemas) +- deterministic (no magic inference) +- extensible to incremental sync, external indexes, and richer hybrid retrieval + diff --git a/RAG_POC/architecture-runtime-retrieval.md b/RAG_POC/architecture-runtime-retrieval.md new file mode 100644 index 000000000..8f033e530 --- /dev/null +++ b/RAG_POC/architecture-runtime-retrieval.md @@ -0,0 +1,344 @@ +# ProxySQL RAG Engine — Runtime Retrieval Architecture (v0 Blueprint) + +This document describes how ProxySQL becomes a **RAG retrieval engine** at runtime. The companion document (Data Model & Ingestion) explains how content enters the SQLite index. This document explains how content is **queried**, how results are **returned to agents/applications**, and how **hybrid retrieval** works in practice. + +It is written as an implementation blueprint for ProxySQL (and its MCP server) and assumes the SQLite schema contains: + +- `rag_sources` (control plane) +- `rag_documents` (canonical docs) +- `rag_chunks` (retrieval units) +- `rag_fts_chunks` (FTS5) +- `rag_vec_chunks` (sqlite3-vec vectors) + +--- + +## 1. The runtime role of ProxySQL in a RAG system + +ProxySQL becomes a RAG runtime by providing four capabilities in one bounded service: + +1. 
**Retrieval Index Host** + - Hosts the SQLite index and search primitives (FTS + vectors). + - Offers deterministic query semantics and strict budgets. + +2. **Orchestration Layer** + - Implements search flows (FTS, vector, hybrid, rerank). + - Applies filters, caps, and result shaping. + +3. **Stable API Surface (MCP-first)** + - LLM agents call MCP tools (not raw SQL). + - Tool contracts remain stable even if internal storage changes. + +4. **Authoritative Row Refetch Gateway** + - After retrieval returns `doc_id` / `pk_json`, ProxySQL can refetch the authoritative row from the source DB on-demand (optional). + - This avoids returning stale or partial data when the full row is needed. + +In production terms, this is not “ProxySQL as a general search engine.” It is a **bounded retrieval service** colocated with database access logic. + +--- + +## 2. High-level query flow (agent-centric) + +A typical RAG flow has two phases: + +### Phase A — Retrieval (fast, bounded, cheap) +- Query the index to obtain a small number of relevant chunks (and their parent doc identity). +- Output includes `chunk_id`, `doc_id`, `score`, and small metadata. + +### Phase B — Fetch (optional, authoritative, bounded) +- If the agent needs full context or structured fields, it refetches the authoritative row from the source DB using `pk_json`. +- This avoids scanning large tables and avoids shipping huge payloads in Phase A. + +**Canonical flow** +1. `rag.search_hybrid(query, filters, k)` → returns top chunk ids and scores +2. `rag.get_chunks(chunk_ids)` → returns chunk text for prompt grounding/citations +3. Optional: `rag.fetch_from_source(doc_id)` → returns full row or selected columns + +--- + +## 3. Runtime interfaces: MCP vs SQL + +ProxySQL should support two “consumption modes”: + +### 3.1 MCP tools (preferred for AI agents) +- Strict limits and predictable response schemas. +- Tools return structured results and avoid SQL injection concerns. +- Agents do not need direct DB access. 
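The MCP-first, two-phase flow can be sketched from the agent's side as follows. The tool names (`rag.search_hybrid`, `rag.get_chunks`) are the ones defined in this document; the client object and its `call` method are a hypothetical stand-in for whatever MCP client the agent runtime provides, stubbed here so the sketch is self-contained:

```python
# Sketch of Phase A (retrieval) + chunk fetch; the MCP client is a stub.
class StubMcpClient:
    """Hypothetical stand-in for a real MCP client; returns canned results."""
    def call(self, tool: str, **args):
        if tool == "rag.search_hybrid":
            # Phase A: a small, bounded evidence list — ids and scores, no bodies.
            return [{"chunk_id": "posts:12345#0", "doc_id": "posts:12345", "score": 0.031}]
        if tool == "rag.get_chunks":
            # Resolve chunk ids to text for prompt grounding and citations.
            return [{"chunk_id": cid, "doc_id": cid.split("#")[0], "body": "chunk text"}
                    for cid in args["chunk_ids"]]
        raise ValueError(f"unknown tool: {tool}")

def retrieve_evidence(client, question: str, k: int = 10):
    # Cap k client-side as well; the server enforces its own hard limit.
    hits = client.call("rag.search_hybrid", query=question, k=min(k, 50))
    chunk_ids = [h["chunk_id"] for h in hits]
    return client.call("rag.get_chunks", chunk_ids=chunk_ids)
```

The optional Phase B refetch (`rag.fetch_from_source`) would follow the same pattern, keyed by `doc_id`, when the agent needs the authoritative row.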
+ +### 3.2 SQL access (for standard applications / debugging) +- Applications may connect to ProxySQL’s SQLite admin interface (or a dedicated port) and issue SQL. +- Useful for: + - internal dashboards + - troubleshooting + - non-agent apps that want retrieval but speak SQL + +**Principle** +- MCP is the stable, long-term interface. +- SQL is optional and may be restricted to trusted callers. + +--- + +## 4. Retrieval primitives + +### 4.1 FTS retrieval (keyword / exact match) + +FTS5 is used for: +- error messages +- identifiers and function names +- tags and exact terms +- “grep-like” queries + +**Typical output** +- `chunk_id`, `score_fts`, optional highlights/snippets + +**Ranking** +- `bm25(rag_fts_chunks)` is the default. It is fast and effective for term queries. + +### 4.2 Vector retrieval (semantic similarity) + +Vector search is used for: +- paraphrased questions +- semantic similarity (“how to do X” vs “best way to achieve X”) +- conceptual matching that is poor with keyword-only search + +**Typical output** +- `chunk_id`, `score_vec` (distance/similarity), plus join metadata + +**Important** +- Vectors are generally computed per chunk. +- Filters are applied via `source_id` and joins to `rag_chunks` / `rag_documents`. + +--- + +## 5. Hybrid retrieval patterns (two recommended modes) + +Hybrid retrieval combines FTS and vector search for better quality than either alone. Two concrete modes should be implemented because they solve different problems. + +### Mode 1 — “Best of both” (parallel FTS + vector; fuse results) +**Use when** +- the query may contain both exact tokens (e.g. error messages) and semantic intent + +**Flow** +1. Run FTS top-N (e.g. N=50) +2. Run vector top-N (e.g. N=50) +3. Merge results by `chunk_id` +4. Score fusion (recommended): Reciprocal Rank Fusion (RRF) +5. Return top-k (e.g. 
k=10) + +**Why RRF** +- Robust without score calibration +- Works across heterogeneous score ranges (bm25 vs cosine distance) + +**RRF formula** +- For each candidate chunk: + - `score = w_fts/(k0 + rank_fts) + w_vec/(k0 + rank_vec)` + - Typical: `k0=60`, `w_fts=1.0`, `w_vec=1.0` + +### Mode 2 — “Broad FTS then vector refine” (candidate generation + rerank) +**Use when** +- you want strong precision anchored to exact term matches +- you want to avoid vector search over the entire corpus + +**Flow** +1. Run broad FTS query top-M (e.g. M=200) +2. Fetch chunk texts for those candidates +3. Compute vector similarity of query embedding to candidate embeddings +4. Return top-k + +This mode behaves like a two-stage retrieval pipeline: +- Stage 1: cheap recall (FTS) +- Stage 2: precise semantic rerank within candidates + +--- + +## 6. Filters, constraints, and budgets (blast-radius control) + +A RAG retrieval engine must be bounded. ProxySQL should enforce limits at the MCP layer and ideally also at SQL helper functions. + +### 6.1 Hard caps (recommended defaults) +- Maximum `k` returned: 50 +- Maximum candidates for broad-stage: 200–500 +- Maximum query length: e.g. 2–8 KB +- Maximum response bytes: e.g. 1–5 MB +- Maximum execution time per request: e.g. 50–250 ms for retrieval, 1–2 s for fetch + +### 6.2 Filter semantics +Filters should be applied consistently across retrieval modes. + +Common filters: +- `source_id` or `source_name` +- tag include/exclude (via metadata_json parsing or pre-extracted tag fields later) +- post type (question vs answer) +- minimum score +- time range (creation date / last activity) + +Implementation note: +- v0 stores metadata in JSON; filtering can be implemented in MCP layer or via SQLite JSON functions (if enabled). +- For performance, later versions should denormalize key metadata into dedicated columns or side tables. + +--- + +## 7. 
Result shaping and what the caller receives + +A retrieval response must be designed for downstream LLM usage: + +### 7.1 Retrieval results (Phase A) +Return a compact list of “evidence candidates”: + +- `chunk_id` +- `doc_id` +- `scores` (fts, vec, fused) +- short `title` +- minimal metadata (source, tags, timestamp, etc.) + +Do **not** return full bodies by default; that is what `rag.get_chunks` is for. + +### 7.2 Chunk fetch results (Phase A.2) +`rag.get_chunks(chunk_ids)` returns: + +- `chunk_id`, `doc_id` +- `title` +- `body` (chunk text) +- optionally a snippet/highlight for display + +### 7.3 Source refetch results (Phase B) +`rag.fetch_from_source(doc_id)` returns: +- either the full row +- or a selected subset of columns (recommended) + +This is the “authoritative fetch” boundary that prevents stale/partial index usage from being a correctness problem. + +--- + +## 8. SQL examples (runtime extraction) + +These are not the preferred agent interface, but they are crucial for debugging and for SQL-native apps. + +### 8.1 FTS search (top 10) +```sql +SELECT + f.chunk_id, + bm25(rag_fts_chunks) AS score_fts +FROM rag_fts_chunks f +WHERE rag_fts_chunks MATCH 'json_extract mysql' +ORDER BY score_fts +LIMIT 10; +``` + +Join to fetch text: +```sql +SELECT + f.chunk_id, + bm25(rag_fts_chunks) AS score_fts, + c.doc_id, + c.body +FROM rag_fts_chunks f +JOIN rag_chunks c ON c.chunk_id = f.chunk_id +WHERE rag_fts_chunks MATCH 'json_extract mysql' +ORDER BY score_fts +LIMIT 10; +``` + +### 8.2 Vector search (top 10) +Vector syntax depends on how you expose query vectors. 
A typical pattern is: + +1) Bind a query vector into a function / parameter +2) Use `rag_vec_chunks` to return nearest neighbors + +Example shape (conceptual): +```sql +-- Pseudocode: nearest neighbors for :query_embedding +SELECT + v.chunk_id, + v.distance +FROM rag_vec_chunks v +WHERE v.embedding MATCH :query_embedding +ORDER BY v.distance +LIMIT 10; +``` + +In production, ProxySQL MCP will typically compute the query embedding and call SQL internally with a bound parameter. + +--- + +## 9. MCP tools (runtime API surface) + +This document does not define full schemas (that is in `mcp-tools.md`), but it defines what each tool must do. + +### 9.1 Retrieval +- `rag.search_fts(query, filters, k)` +- `rag.search_vector(query_text | query_embedding, filters, k)` +- `rag.search_hybrid(query, mode, filters, k, params)` + - Mode 1: parallel + RRF fuse + - Mode 2: broad FTS candidates + vector rerank + +### 9.2 Fetch +- `rag.get_chunks(chunk_ids)` +- `rag.get_docs(doc_ids)` +- `rag.fetch_from_source(doc_ids | pk_json, columns?, limits?)` + +**MCP-first principle** +- Agents do not see SQLite schema or SQL. +- MCP tools remain stable even if you move index storage out of ProxySQL later. + +--- + +## 10. 
Operational considerations + +### 10.1 Dedicated ProxySQL instance +Run GenAI retrieval in a dedicated ProxySQL instance to reduce blast radius: +- independent CPU/memory budgets +- independent configuration and rate limits +- independent failure domain + +### 10.2 Observability and metrics (minimum) +- count of docs/chunks per source +- query counts by tool and source +- p50/p95 latency for: + - FTS + - vector + - hybrid + - refetch +- dropped/limited requests (rate limit hit, cap exceeded) +- error rate and error categories + +### 10.3 Safety controls +- strict upper bounds on `k` and candidate sizes +- strict timeouts +- response size caps +- optional allowlists for sources accessible to agents +- tenant boundaries via filters (strongly recommended for multi-tenant) + +--- + +## 11. Recommended “v0-to-v1” evolution checklist + +### v0 (PoC) +- ingestion to docs/chunks +- FTS search +- vector search (if embedding pipeline available) +- simple hybrid search +- chunk fetch +- manual/limited source refetch + +### v1 (product hardening) +- incremental sync checkpoints (`rag_sync_state`) +- update detection (hashing/versioning) +- delete handling +- robust hybrid search: + - RRF fuse + - candidate-generation rerank +- stronger filtering semantics (denormalized metadata columns) +- quotas, rate limits, per-source budgets +- full MCP tool contracts + tests + +--- + +## 12. 
Summary + +At runtime, ProxySQL RAG retrieval is implemented as: + +- **Index query** (FTS/vector/hybrid) returning a small set of chunk IDs +- **Chunk fetch** returning the text that the LLM will ground on +- Optional **authoritative refetch** from the source DB by primary key +- Strict limits and consistent filtering to keep the service bounded + diff --git a/RAG_POC/embeddings-design.md b/RAG_POC/embeddings-design.md new file mode 100644 index 000000000..796a06a57 --- /dev/null +++ b/RAG_POC/embeddings-design.md @@ -0,0 +1,353 @@ +# ProxySQL RAG Index — Embeddings & Vector Retrieval Design (Chunk-Level) (v0→v1 Blueprint) + +This document specifies how embeddings should be produced, stored, updated, and queried for chunk-level vector search in ProxySQL’s RAG index. It is intended as an implementation blueprint. + +It assumes: +- Chunking is already implemented (`rag_chunks`). +- ProxySQL includes **sqlite3-vec** and uses a `vec0(...)` virtual table (`rag_vec_chunks`). +- Retrieval is exposed primarily via MCP tools (`mcp-tools.md`). + +--- + +## 1. Design objectives + +1. **Chunk-level embeddings** + - Each chunk receives its own embedding for retrieval precision. + +2. **Deterministic embedding input** + - The text embedded is explicitly defined per source, not inferred. + +3. **Model agility** + - The system can change embedding models/dimensions without breaking stored data or APIs. + +4. **Efficient updates** + - Only recompute embeddings for chunks whose embedding input changed. + +5. **Operational safety** + - Bound cost and latency (embedding generation can be expensive). + - Allow asynchronous embedding jobs if needed later. + +--- + +## 2. 
What to embed (and what not to embed) + +### 2.1 Embed text that improves semantic retrieval +Recommended embedding input per chunk: + +- Document title (if present) +- Tags (as plain text) +- Chunk body + +Example embedding input template: +``` +{Title} +Tags: {Tags} + +{ChunkBody} +``` + +This typically improves semantic recall significantly for knowledge-base-like content (StackOverflow posts, docs, tickets, runbooks). + +### 2.2 Do NOT embed numeric metadata by default +Do not embed fields like `Score`, `ViewCount`, `OwnerUserId`, timestamps, etc. These should remain structured and be used for: +- filtering +- boosting +- tie-breaking +- result shaping + +Embedding numeric metadata into text typically adds noise and reduces semantic quality. + +### 2.3 Code and HTML considerations +If your chunk body contains HTML or code: +- **v0**: embed raw text (works, but may be noisy) +- **v1**: normalize to improve quality: + - strip HTML tags (keep text content) + - preserve code blocks as text, but consider stripping excessive markup + - optionally create specialized “code-only” chunks for code-heavy sources + +Normalization should be source-configurable. + +--- + +## 3. Where embedding input rules are defined + +Embedding input rules must be explicit and stored per source. + +### 3.1 `rag_sources.embedding_json` +Recommended schema: +```json +{ + "enabled": true, + "model": "text-embedding-3-large", + "dim": 1536, + "input": { + "concat": [ + {"col":"Title"}, + {"lit":"\nTags: "}, {"col":"Tags"}, + {"lit":"\n\n"}, + {"chunk_body": true} + ] + }, + "normalize": { + "strip_html": true, + "collapse_whitespace": true + } +} +``` + +**Semantics** +- `enabled`: whether to compute/store embeddings for this source +- `model`: logical name (for observability and compatibility checks) +- `dim`: vector dimension +- `input.concat`: how to build embedding input text +- `normalize`: optional normalization steps + +--- + +## 4. 
Storage schema and model/versioning + +### 4.1 Current v0 schema: single vector table +`rag_vec_chunks` stores: +- embedding vector +- chunk_id +- doc_id/source_id convenience columns +- updated_at + +This is appropriate for v0 when you assume a single embedding model/dimension. + +### 4.2 Recommended v1 evolution: support multiple models +In a product setting, you may want multiple embedding models (e.g. general vs code-centric). + +Two ways to support this: + +#### Option A: include model identity columns in `rag_vec_chunks` +Add columns: +- `model TEXT` +- `dim INTEGER` (optional if fixed per model) + +Then allow multiple rows per `chunk_id` (unique key becomes `(chunk_id, model)`). +This may require schema change and a different vec0 design (some vec0 configurations support metadata columns, but uniqueness must be handled carefully). + +#### Option B: one vec table per model (recommended if vec0 constraints exist) +Create: +- `rag_vec_chunks_1536_v1` +- `rag_vec_chunks_1024_code_v1` +etc. + +Then MCP tools select the table based on requested model or default configuration. + +**Recommendation** +Start with Option A only if your sqlite3-vec build makes it easy to filter by model. Otherwise, Option B is operationally cleaner. + +--- + +## 5. Embedding generation pipeline + +### 5.1 When embeddings are created +Embeddings are created during ingestion, immediately after chunk creation, if `embedding_json.enabled=true`. + +This provides a simple, synchronous pipeline: +- ingest row → create chunks → compute embedding → store vector + +### 5.2 When embeddings should be updated +Embeddings must be recomputed if the *embedding input string* changes. That depends on: +- title changes +- tags changes +- chunk body changes +- normalization rules changes (strip_html etc.) +- embedding model changes + +Therefore, update logic should be based on a **content hash** of the embedding input. + +--- + +## 6. 
Content hashing for efficient updates (v1 recommendation) + +### 6.1 Why hashing is needed +Without hashing, you might recompute embeddings unnecessarily: +- expensive +- slow +- prevents incremental sync from being efficient + +### 6.2 Recommended approach +Store `embedding_input_hash` per chunk per model. + +Implementation options: + +#### Option A: Store hash in `rag_chunks.metadata_json` +Example: +```json +{ + "chunk_index": 0, + "embedding_hash": "sha256:...", + "embedding_model": "text-embedding-3-large" +} +``` + +Pros: no schema changes. +Cons: JSON parsing overhead. + +#### Option B: Dedicated side table (recommended) +Create `rag_chunk_embedding_state`: + +```sql +CREATE TABLE rag_chunk_embedding_state ( + chunk_id TEXT NOT NULL, + model TEXT NOT NULL, + dim INTEGER NOT NULL, + input_hash TEXT NOT NULL, + updated_at INTEGER NOT NULL DEFAULT (unixepoch()), + PRIMARY KEY(chunk_id, model) +); +``` + +Pros: fast lookups; avoids JSON parsing. +Cons: extra table. + +**Recommendation** +Use Option B for v1. + +--- + +## 7. Embedding model integration options + +### 7.1 External embedding service (recommended initially) +ProxySQL calls an embedding service: +- OpenAI-compatible endpoint, or +- local service (e.g. llama.cpp server), or +- vendor-specific embedding API + +Pros: +- easy to iterate on model choice +- isolates ML runtime from ProxySQL process + +Cons: +- network latency; requires caching and timeouts + +### 7.2 Embedded model runtime inside ProxySQL +ProxySQL links to an embedding runtime (llama.cpp, etc.) + +Pros: +- no network dependency +- predictable latency if tuned + +Cons: +- increases memory footprint +- needs careful resource controls + +**Recommendation** +Start with an external embedding provider and keep a modular interface that can be swapped later. + +--- + +## 8. Query embedding generation + +Vector search needs a query embedding. Do this in the MCP layer: + +1. Take `query_text` +2. 
Apply query normalization (optional but recommended) +3. Compute query embedding using the same model used for chunks +4. Execute vector search SQL with a bound embedding vector + +**Do not** +- accept arbitrary embedding vectors from untrusted callers without validation +- allow unbounded query lengths + +--- + +## 9. Vector search semantics + +### 9.1 Distance vs similarity +Depending on the embedding model and vec search primitive, vector search may return: +- cosine distance (lower is better) +- cosine similarity (higher is better) +- L2 distance (lower is better) + +**Recommendation** +Normalize to a “higher is better” score in MCP responses: +- if distance: `score_vec = 1 / (1 + distance)` or similar monotonic transform + +Keep raw distance in debug fields if needed. + +### 9.2 Filtering +Filtering should be supported by: +- `source_id` restriction +- optional metadata filters (doc-level or chunk-level) + +In v0, filter by `source_id` is easiest because `rag_vec_chunks` stores `source_id` as metadata. + +--- + +## 10. Hybrid retrieval integration + +Embeddings are one leg of hybrid retrieval. Two recommended hybrid modes are described in `mcp-tools.md`: + +1. **Fuse**: top-N FTS and top-N vector, merged by chunk_id, fused by RRF +2. **FTS then vector**: broad FTS candidates then vector rerank within candidates + +Embeddings support both: +- Fuse mode needs global vector search top-N. +- Candidate mode needs vector search restricted to candidate chunk IDs. + +Candidate mode is often cheaper and more precise when the query includes strong exact tokens. + +--- + +## 11. 
Operational controls + +### 11.1 Resource limits +Embedding generation must be bounded by: +- max chunk size embedded +- max chunks embedded per document +- per-source embedding rate limit +- timeouts when calling embedding provider + +### 11.2 Batch embedding +To improve throughput, embed in batches: +- collect N chunks +- send embedding request for N inputs +- store results + +### 11.3 Backpressure and async embedding +For v1, consider decoupling embedding generation from ingestion: +- ingestion stores chunks +- embedding worker processes “pending” chunks and fills vectors + +This allows: +- ingestion to remain fast +- embedding to scale independently +- retries on embedding failures + +In this design, store a state record: +- pending / ok / error +- last error message +- retry count + +--- + +## 12. Recommended implementation steps (coding agent checklist) + +### v0 (synchronous embedding) +1. Implement `embedding_json` parsing in ingester +2. Build embedding input string for each chunk +3. Call embedding provider (or use a stub in development) +4. Insert vector rows into `rag_vec_chunks` +5. Implement `rag.search_vector` MCP tool using query embedding + vector SQL + +### v1 (efficient incremental embedding) +1. Add `rag_chunk_embedding_state` table +2. Store `input_hash` per chunk per model +3. Only re-embed if hash changed +4. Add async embedding worker option +5. Add metrics for embedding throughput and failures + +--- + +## 13. Summary + +- Compute embeddings per chunk, not per document. +- Define embedding input explicitly in `rag_sources.embedding_json`. +- Store vectors in `rag_vec_chunks` (vec0). +- For production, add hash-based update detection and optional async embedding workers. +- Normalize vector scores in MCP responses and keep raw distance for debugging. 
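
The fuse-mode hybrid retrieval referenced above (top-N FTS plus top-N vector, merged by RRF) can be sketched as follows. This is an illustrative sketch only; the function name `rrf_fuse` and its default parameters mirror the `rrf_k0` / `w_fts` / `w_vec` knobs described in `mcp-tools.md` but are not part of the PoC code:

```cpp
#include <algorithm>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Reciprocal Rank Fusion over two ranked lists of chunk IDs.
// score(c) = w_fts / (k0 + rank_fts(c)) + w_vec / (k0 + rank_vec(c))
// Ranks are 1-based; a chunk missing from one list contributes nothing
// from that list. Higher fused score is better.
static std::vector<std::pair<std::string, double>> rrf_fuse(
        const std::vector<std::string>& fts_ranked,
        const std::vector<std::string>& vec_ranked,
        double k0 = 60.0, double w_fts = 1.0, double w_vec = 1.0) {
    std::map<std::string, double> score;
    for (size_t i = 0; i < fts_ranked.size(); i++)
        score[fts_ranked[i]] += w_fts / (k0 + (double)(i + 1));
    for (size_t i = 0; i < vec_ranked.size(); i++)
        score[vec_ranked[i]] += w_vec / (k0 + (double)(i + 1));
    std::vector<std::pair<std::string, double>> out(score.begin(), score.end());
    std::sort(out.begin(), out.end(),
              [](const std::pair<std::string, double>& a,
                 const std::pair<std::string, double>& b) { return a.second > b.second; });
    return out;
}
```

Because RRF uses only ranks, it needs no score normalization between the FTS and vector legs, which is why it pairs well with the "higher is better" normalization recommended for MCP responses.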
+ diff --git a/RAG_POC/mcp-tools.md b/RAG_POC/mcp-tools.md new file mode 100644 index 000000000..be3fd39b5 --- /dev/null +++ b/RAG_POC/mcp-tools.md @@ -0,0 +1,465 @@ +# MCP Tooling for ProxySQL RAG Engine (v0 Blueprint) + +This document defines the MCP tool surface for querying ProxySQL’s embedded RAG index. It is intended as a stable interface for AI agents. Internally, these tools query the SQLite schema described in `schema.sql` and the retrieval logic described in `architecture-runtime-retrieval.md`. + +**Design goals** +- Stable tool contracts (do not break agents when internals change) +- Strict bounds (prevent unbounded scans / large outputs) +- Deterministic schemas (agents can reliably parse outputs) +- Separation of concerns: + - Retrieval returns identifiers and scores + - Fetch returns content + - Optional refetch returns authoritative source rows + +--- + +## 1. Conventions + +### 1.1 Identifiers +- `doc_id`: stable document identifier (e.g. `posts:12345`) +- `chunk_id`: stable chunk identifier (e.g. `posts:12345#0`) +- `source_id` / `source_name`: corresponds to `rag_sources` + +### 1.2 Scores +- FTS score: `score_fts` (bm25; lower is better in SQLite’s bm25 by default) +- Vector score: `score_vec` (distance or similarity, depending on implementation) +- Hybrid score: `score` (normalized fused score; higher is better) + +**Recommendation** +Normalize scores in MCP layer so: +- higher is always better for agent ranking +- raw internal ranking can still be returned as `score_fts_raw`, `distance_raw`, etc. if helpful + +### 1.3 Limits and budgets (recommended defaults) +All tools should enforce caps, regardless of caller input: +- `k_max = 50` +- `candidates_max = 500` +- `query_max_bytes = 8192` +- `response_max_bytes = 5_000_000` +- `timeout_ms` (per tool): 250–2000ms depending on tool type + +Tools must return a `truncated` boolean if limits reduce output. + +--- + +## 2. Shared filter model + +Many tools accept the same filter structure. 
This is intentionally simple in v0. + +### 2.1 Filter object +```json +{ + "source_ids": [1,2], + "source_names": ["stack_posts"], + "doc_ids": ["posts:12345"], + "min_score": 5, + "post_type_ids": [1], + "tags_any": ["mysql","json"], + "tags_all": ["mysql","json"], + "created_after": "2022-01-01T00:00:00Z", + "created_before": "2025-01-01T00:00:00Z" +} +``` + +**Notes** +- In v0, most filters map to `metadata_json` values. Implementation can: + - filter in SQLite if JSON functions are available, or + - filter in MCP layer after initial retrieval (acceptable for small k/candidates) +- For production, denormalize hot filters into dedicated columns for speed. + +### 2.2 Filter behavior +- If both `source_ids` and `source_names` are provided, treat as intersection. +- If no source filter is provided, default to all enabled sources **but** enforce a strict global budget. + +--- + +## 3. Tool: `rag.search_fts` + +Keyword search over `rag_fts_chunks`. + +### 3.1 Request schema +```json +{ + "query": "json_extract mysql", + "k": 10, + "offset": 0, + "filters": { }, + "return": { + "include_title": true, + "include_metadata": true, + "include_snippets": false + } +} +``` + +### 3.2 Semantics +- Executes FTS query (MATCH) over indexed content. +- Returns top-k chunk matches with scores and identifiers. +- Does not return full chunk bodies unless `include_snippets` is requested (still bounded). + +### 3.3 Response schema +```json +{ + "results": [ + { + "chunk_id": "posts:12345#0", + "doc_id": "posts:12345", + "source_id": 1, + "source_name": "stack_posts", + "score_fts": 0.73, + "title": "How to parse JSON in MySQL 8?", + "metadata": { "Tags": "", "Score": "12" } + } + ], + "truncated": false, + "stats": { + "k_requested": 10, + "k_returned": 10, + "ms": 12 + } +} +``` + +--- + +## 4. Tool: `rag.search_vector` + +Semantic search over `rag_vec_chunks`. 
+ +### 4.1 Request schema (text input) +```json +{ + "query_text": "How do I extract JSON fields in MySQL?", + "k": 10, + "filters": { }, + "embedding": { + "model": "text-embedding-3-large" + } +} +``` + +### 4.2 Request schema (precomputed vector) +```json +{ + "query_embedding": { + "dim": 1536, + "values_b64": "AAAA..." // float32 array packed and base64 encoded + }, + "k": 10, + "filters": { } +} +``` + +### 4.3 Semantics +- If `query_text` is provided, ProxySQL computes embedding internally (preferred for agents). +- If `query_embedding` is provided, ProxySQL uses it directly (useful for advanced clients). +- Returns nearest chunks by distance/similarity. + +### 4.4 Response schema +```json +{ + "results": [ + { + "chunk_id": "posts:9876#1", + "doc_id": "posts:9876", + "source_id": 1, + "source_name": "stack_posts", + "score_vec": 0.82, + "title": "Query JSON columns efficiently", + "metadata": { "Tags": "", "Score": "8" } + } + ], + "truncated": false, + "stats": { + "k_requested": 10, + "k_returned": 10, + "ms": 18 + } +} +``` + +--- + +## 5. Tool: `rag.search_hybrid` + +Hybrid search combining FTS and vectors. Supports two modes: + +- **Mode A**: parallel FTS + vector, fuse results (RRF recommended) +- **Mode B**: broad FTS candidate generation, then vector rerank + +### 5.1 Request schema (Mode A: fuse) +```json +{ + "query": "json_extract mysql", + "k": 10, + "filters": { }, + "mode": "fuse", + "fuse": { + "fts_k": 50, + "vec_k": 50, + "rrf_k0": 60, + "w_fts": 1.0, + "w_vec": 1.0 + } +} +``` + +### 5.2 Request schema (Mode B: candidates + rerank) +```json +{ + "query": "json_extract mysql", + "k": 10, + "filters": { }, + "mode": "fts_then_vec", + "fts_then_vec": { + "candidates_k": 200, + "rerank_k": 50, + "vec_metric": "cosine" + } +} +``` + +### 5.3 Semantics (Mode A) +1. Run FTS top `fts_k` +2. Run vector top `vec_k` +3. Merge candidates by `chunk_id` +4. Compute fused score (RRF recommended) +5. Return top `k` + +### 5.4 Semantics (Mode B) +1. 
Run FTS top `candidates_k` +2. Compute vector similarity within those candidates + - either by joining candidate chunk_ids to stored vectors, or + - by embedding candidate chunk text on the fly (not recommended) +3. Return top `k` reranked results +4. Optionally return debug info about candidate stages + +### 5.5 Response schema +```json +{ + "results": [ + { + "chunk_id": "posts:12345#0", + "doc_id": "posts:12345", + "source_id": 1, + "source_name": "stack_posts", + "score": 0.91, + "score_fts": 0.74, + "score_vec": 0.86, + "title": "How to parse JSON in MySQL 8?", + "metadata": { "Tags": "", "Score": "12" }, + "debug": { + "rank_fts": 3, + "rank_vec": 6 + } + } + ], + "truncated": false, + "stats": { + "mode": "fuse", + "k_requested": 10, + "k_returned": 10, + "ms": 27 + } +} +``` + +--- + +## 6. Tool: `rag.get_chunks` + +Fetch chunk bodies by chunk_id. This is how agents obtain grounding text. + +### 6.1 Request schema +```json +{ + "chunk_ids": ["posts:12345#0", "posts:9876#1"], + "return": { + "include_title": true, + "include_doc_metadata": true, + "include_chunk_metadata": true + } +} +``` + +### 6.2 Response schema +```json +{ + "chunks": [ + { + "chunk_id": "posts:12345#0", + "doc_id": "posts:12345", + "title": "How to parse JSON in MySQL 8?", + "body": "
I tried JSON_EXTRACT...
", + "doc_metadata": { "Tags": "", "Score": "12" }, + "chunk_metadata": { "chunk_index": 0 } + } + ], + "truncated": false, + "stats": { "ms": 6 } +} +``` + +**Hard limit recommendation** +- Cap total returned chunk bytes to a safe maximum (e.g. 1–2 MB). + +--- + +## 7. Tool: `rag.get_docs` + +Fetch full canonical documents by doc_id (not chunks). Useful for inspection or compact docs. + +### 7.1 Request schema +```json +{ + "doc_ids": ["posts:12345"], + "return": { + "include_body": true, + "include_metadata": true + } +} +``` + +### 7.2 Response schema +```json +{ + "docs": [ + { + "doc_id": "posts:12345", + "source_id": 1, + "source_name": "stack_posts", + "pk_json": { "Id": 12345 }, + "title": "How to parse JSON in MySQL 8?", + "body": "
...
", + "metadata": { "Tags": "", "Score": "12" } + } + ], + "truncated": false, + "stats": { "ms": 7 } +} +``` + +--- + +## 8. Tool: `rag.fetch_from_source` + +Refetch authoritative rows from the source DB using `doc_id` (via pk_json). + +### 8.1 Request schema +```json +{ + "doc_ids": ["posts:12345"], + "columns": ["Id","Title","Body","Tags","Score"], + "limits": { + "max_rows": 10, + "max_bytes": 200000 + } +} +``` + +### 8.2 Semantics +- Look up doc(s) in `rag_documents` to get `source_id` and `pk_json` +- Resolve source connection from `rag_sources` +- Execute a parameterized query by primary key +- Return requested columns only +- Enforce strict limits + +### 8.3 Response schema +```json +{ + "rows": [ + { + "doc_id": "posts:12345", + "source_name": "stack_posts", + "row": { + "Id": 12345, + "Title": "How to parse JSON in MySQL 8?", + "Score": 12 + } + } + ], + "truncated": false, + "stats": { "ms": 22 } +} +``` + +**Security note** +- This tool must not allow arbitrary SQL. +- Only allow fetching by primary key and a whitelist of columns. + +--- + +## 9. Tool: `rag.admin.stats` (recommended) + +Operational visibility for dashboards and debugging. + +### 9.1 Request +```json +{} +``` + +### 9.2 Response +```json +{ + "sources": [ + { + "source_id": 1, + "source_name": "stack_posts", + "docs": 123456, + "chunks": 456789, + "last_sync": null + } + ], + "stats": { "ms": 5 } +} +``` + +--- + +## 10. Tool: `rag.admin.sync` (optional in v0; required in v1) + +Kicks ingestion for a source or all sources. In v0, ingestion may run as a separate process; in ProxySQL product form, this would trigger an internal job. + +### 10.1 Request +```json +{ + "source_names": ["stack_posts"] +} +``` + +### 10.2 Response +```json +{ + "accepted": true, + "job_id": "sync-2026-01-19T10:00:00Z" +} +``` + +--- + +## 11. Implementation notes (what the coding agent should implement) + +1. **Input validation and caps** for every tool. +2. **Consistent filtering** across FTS/vector/hybrid. 
+3. **Stable scoring semantics** (higher-is-better recommended). +4. **Efficient joins**: + - vector search returns chunk_ids; join to `rag_chunks`/`rag_documents` for metadata. +5. **Hybrid modes**: + - Mode A (fuse): implement RRF + - Mode B (fts_then_vec): candidate set then vector rerank +6. **Error model**: + - return structured errors with codes (e.g. `INVALID_ARGUMENT`, `LIMIT_EXCEEDED`, `INTERNAL`) +7. **Observability**: + - return `stats.ms` in responses + - track tool usage counters and latency histograms + +--- + +## 12. Summary + +These MCP tools define a stable retrieval interface: + +- Search: `rag.search_fts`, `rag.search_vector`, `rag.search_hybrid` +- Fetch: `rag.get_chunks`, `rag.get_docs`, `rag.fetch_from_source` +- Admin: `rag.admin.stats`, optionally `rag.admin.sync` + diff --git a/RAG_POC/rag_ingest.cpp b/RAG_POC/rag_ingest.cpp new file mode 100644 index 000000000..415ded422 --- /dev/null +++ b/RAG_POC/rag_ingest.cpp @@ -0,0 +1,1009 @@ +// rag_ingest.cpp +// +// ------------------------------------------------------------ +// ProxySQL RAG Ingestion PoC (General-Purpose) +// ------------------------------------------------------------ +// +// What this program does (v0): +// 1) Opens the SQLite "RAG index" database (schema.sql must already be applied). +// 2) Reads enabled sources from rag_sources. +// 3) For each source: +// - Connects to MySQL (for now). +// - Builds a SELECT that fetches only needed columns. +// - For each row: +// * Builds doc_id / title / body / metadata_json using doc_map_json. +// * Chunks body using chunking_json. +// * Inserts into: +// rag_documents +// rag_chunks +// rag_fts_chunks (FTS5 contentless table) +// * Optionally builds embedding input text using embedding_json and inserts +// embeddings into rag_vec_chunks (sqlite3-vec) via a stub embedding provider. +// - Skips docs that already exist (v0 requirement). +// +// Later (v1+): +// - Add rag_sync_state usage for incremental ingestion (watermark/CDC). 
+// - Add hashing to detect changed docs/chunks and update/reindex accordingly. +// - Replace the embedding stub with a real embedding generator. +// +// ------------------------------------------------------------ +// Dependencies +// ------------------------------------------------------------ +// - sqlite3 +// - MySQL client library (mysqlclient / libmysqlclient) +// - nlohmann/json (single header json.hpp) +// +// Build example (Linux/macOS): +// g++ -std=c++17 -O2 rag_ingest.cpp -o rag_ingest \ +// -lsqlite3 -lmysqlclient +// +// Usage: +// ./rag_ingest /path/to/rag_index.sqlite +// +// Notes: +// - This is a blueprint-grade PoC, written to be readable and modifiable. +// - It uses a conservative JSON mapping language so ingestion is deterministic. +// - It avoids advanced C++ patterns on purpose. +// +// ------------------------------------------------------------ +// Supported JSON Specs +// ------------------------------------------------------------ +// +// doc_map_json (required): +// { +// "doc_id": { "format": "posts:{Id}" }, +// "title": { "concat": [ {"col":"Title"} ] }, +// "body": { "concat": [ {"col":"Body"} ] }, +// "metadata": { +// "pick": ["Id","Tags","Score","CreaionDate"], +// "rename": {"CreaionDate":"CreationDate"} +// } +// } +// +// chunking_json (required, v0 chunks doc "body" only): +// { +// "enabled": true, +// "unit": "chars", // v0 supports "chars" only +// "chunk_size": 4000, +// "overlap": 400, +// "min_chunk_size": 800 +// } +// +// embedding_json (optional): +// { +// "enabled": true, +// "dim": 1536, +// "model": "text-embedding-3-large", // informational +// "input": { "concat": [ +// {"col":"Title"}, +// {"lit":"\nTags: "}, {"col":"Tags"}, +// {"lit":"\n\n"}, +// {"chunk_body": true} +// ]} +// } +// +// ------------------------------------------------------------ +// sqlite3-vec binding note +// ------------------------------------------------------------ +// sqlite3-vec "vec0(embedding float[N])" generally expects a vector 
+value.
+// The exact binding format can vary by build/config of sqlite3-vec.
+// This program includes a "best effort" binder that binds a float array as a BLOB.
+// If your sqlite3-vec build expects a different representation (e.g. a function to
+// pack vectors), adapt bind_vec_embedding() accordingly.
+// ------------------------------------------------------------
+
+#include <sqlite3.h>
+#include <mysql/mysql.h>
+
+#include <string>
+#include <vector>
+#include <unordered_map>
+#include <optional>
+
+#include <iostream>
+#include <cstring>
+#include <cstdint>
+#include <cstdlib>
+#include <cmath>
+
+#include "json.hpp"
+using json = nlohmann::json;
+
+// -------------------------
+// Small helpers
+// -------------------------
+
+static void fatal(const std::string& msg) {
+    std::cerr << "FATAL: " << msg << "\n";
+    std::exit(1);
+}
+
+static std::string str_or_empty(const char* p) {
+    return p ? std::string(p) : std::string();
+}
+
+static int sqlite_exec(sqlite3* db, const std::string& sql) {
+    char* err = nullptr;
+    int rc = sqlite3_exec(db, sql.c_str(), nullptr, nullptr, &err);
+    if (rc != SQLITE_OK) {
+        std::string e = err ? err : "(unknown sqlite error)";
+        sqlite3_free(err);
+        std::cerr << "SQLite error: " << e << "\nSQL: " << sql << "\n";
+    }
+    return rc;
+}
+
+static std::string json_dump_compact(const json& j) {
+    // Compact output (no pretty printing) to keep storage small.
+    return j.dump();
+}
+
+// -------------------------
+// Data model
+// -------------------------
+
+struct RagSource {
+    int source_id = 0;
+    std::string name;
+    int enabled = 0;
+
+    // backend connection
+    std::string backend_type;   // "mysql" for now
+    std::string host;
+    int port = 3306;
+    std::string user;
+    std::string pass;
+    std::string db;
+
+    // table
+    std::string table_name;
+    std::string pk_column;
+    std::string where_sql;      // optional
+
+    // transformation config
+    json doc_map_json;
+    json chunking_json;
+    json embedding_json;        // optional; may be null/object
+};
+
+struct ChunkingConfig {
+    bool enabled = true;
+    std::string unit = "chars"; // v0 only supports chars
+    int chunk_size = 4000;
+    int overlap = 400;
+    int min_chunk_size = 800;
+};
+
+struct EmbeddingConfig {
+    bool enabled = false;
+    int dim = 1536;
+    std::string model = "unknown";
+    json input_spec;            // expects {"concat":[...]}
+};
+
+// A row fetched from MySQL, as a name->string map.
+typedef std::unordered_map<std::string, std::string> RowMap;
+
+// -------------------------
+// JSON parsing
+// -------------------------
+
+static ChunkingConfig parse_chunking_json(const json& j) {
+    ChunkingConfig cfg;
+    if (!j.is_object()) return cfg;
+
+    if (j.contains("enabled")) cfg.enabled = j["enabled"].get<bool>();
+    if (j.contains("unit")) cfg.unit = j["unit"].get<std::string>();
+    if (j.contains("chunk_size")) cfg.chunk_size = j["chunk_size"].get<int>();
+    if (j.contains("overlap")) cfg.overlap = j["overlap"].get<int>();
+    if (j.contains("min_chunk_size")) cfg.min_chunk_size = j["min_chunk_size"].get<int>();
+
+    if (cfg.chunk_size <= 0) cfg.chunk_size = 4000;
+    if (cfg.overlap < 0) cfg.overlap = 0;
+    if (cfg.overlap >= cfg.chunk_size) cfg.overlap = cfg.chunk_size / 4;
+    if (cfg.min_chunk_size < 0) cfg.min_chunk_size = 0;
+
+    // v0 only supports chars
+    if (cfg.unit != "chars") {
+        std::cerr << "WARN: chunking_json.unit=" << cfg.unit
+                  << " not supported in v0. Falling back to chars.\n";
+        cfg.unit = "chars";
+    }
+
+    return cfg;
+}
+
+static EmbeddingConfig parse_embedding_json(const json& j) {
+    EmbeddingConfig cfg;
+    if (!j.is_object()) return cfg;
+
+    if (j.contains("enabled")) cfg.enabled = j["enabled"].get<bool>();
+    if (j.contains("dim")) cfg.dim = j["dim"].get<int>();
+    if (j.contains("model")) cfg.model = j["model"].get<std::string>();
+    if (j.contains("input")) cfg.input_spec = j["input"];
+
+    if (cfg.dim <= 0) cfg.dim = 1536;
+    return cfg;
+}
+
+// -------------------------
+// Row access
+// -------------------------
+
+static std::optional<std::string> row_get(const RowMap& row, const std::string& key) {
+    auto it = row.find(key);
+    if (it == row.end()) return std::nullopt;
+    return it->second;
+}
+
+// -------------------------
+// doc_id.format implementation
+// -------------------------
+// Replaces occurrences of {ColumnName} with the value from the row map.
+// Example: "posts:{Id}" -> "posts:12345"
+static std::string apply_format(const std::string& fmt, const RowMap& row) {
+    std::string out;
+    out.reserve(fmt.size() + 32);
+
+    for (size_t i = 0; i < fmt.size(); i++) {
+        char c = fmt[i];
+        if (c == '{') {
+            size_t j = fmt.find('}', i + 1);
+            if (j == std::string::npos) {
+                // unmatched '{' -> treat as literal
+                out.push_back(c);
+                continue;
+            }
+            std::string col = fmt.substr(i + 1, j - (i + 1));
+            auto v = row_get(row, col);
+            if (v.has_value()) out += v.value();
+            i = j; // jump past '}'
+        } else {
+            out.push_back(c);
+        }
+    }
+    return out;
+}
+
+// -------------------------
+// concat spec implementation
+// -------------------------
+// Supported elements in concat array:
+//   {"col":"Title"}      -> append row["Title"] if present
+//   {"lit":"\n\n"}       -> append literal
+//   {"chunk_body": true} -> append chunk body (only in embedding_json input)
+//
+static std::string eval_concat(const json& concat_spec,
+                               const RowMap& row,
+                               const std::string& chunk_body,
+                               bool allow_chunk_body) {
+    if (!concat_spec.is_array()) return "";
+
+    std::string out;
+    for (const auto& part : concat_spec) {
+        if (!part.is_object()) continue;
+
+        if (part.contains("col")) {
+            std::string col = part["col"].get<std::string>();
+            auto v = row_get(row, col);
+            if (v.has_value()) out += v.value();
+        } else if (part.contains("lit")) {
+            out += part["lit"].get<std::string>();
+        } else if (allow_chunk_body && part.contains("chunk_body")) {
+            bool yes = part["chunk_body"].get<bool>();
+            if (yes) out += chunk_body;
+        }
+    }
+    return out;
+}
+
+// -------------------------
+// metadata builder
+// -------------------------
+// metadata spec:
+//   "metadata": { "pick":[...], "rename":{...} }
+static json build_metadata(const json& meta_spec, const RowMap& row) {
+    json meta = json::object();
+
+    if (meta_spec.is_object()) {
+        // pick fields
+        if (meta_spec.contains("pick") && meta_spec["pick"].is_array()) {
+            for (const auto& colv : meta_spec["pick"]) {
+                if (!colv.is_string()) continue;
+                std::string col = colv.get<std::string>();
+                auto v = row_get(row, col);
+                if (v.has_value()) meta[col] = v.value();
+            }
+        }
+
+        // rename keys
+        if (meta_spec.contains("rename") && meta_spec["rename"].is_object()) {
+            std::vector<std::pair<std::string, std::string>> renames;
+            for (auto it = meta_spec["rename"].begin(); it != meta_spec["rename"].end(); ++it) {
+                if (!it.value().is_string()) continue;
+                renames.push_back({it.key(), it.value().get<std::string>()});
+            }
+            for (size_t i = 0; i < renames.size(); i++) {
+                const std::string& oldk = renames[i].first;
+                const std::string& newk = renames[i].second;
+                if (meta.contains(oldk)) {
+                    meta[newk] = meta[oldk];
+                    meta.erase(oldk);
+                }
+            }
+        }
+    }
+
+    return meta;
+}
+
+// -------------------------
+// Chunking (chars-based)
+// -------------------------
+
+static std::vector<std::string> chunk_text_chars(const std::string& text, const ChunkingConfig& cfg) {
+    std::vector<std::string> chunks;
+
+    if (!cfg.enabled) {
+        chunks.push_back(text);
+        return chunks;
+    }
+
+    if ((int)text.size() <= cfg.chunk_size) {
+        chunks.push_back(text);
+        return chunks;
+    }
+
+    int step = cfg.chunk_size - cfg.overlap;
+    if (step <= 0) step = cfg.chunk_size;
+
+    for (int start = 0; start < (int)text.size(); start += step) {
+        int end = start + cfg.chunk_size;
+        if (end > (int)text.size()) end = (int)text.size();
+        int len = end - start;
+        if (len <= 0) break;
+
+        // Avoid tiny final chunk by appending it to the previous chunk
+        if (len < cfg.min_chunk_size && !chunks.empty()) {
+            chunks.back() += text.substr(start, len);
+            break;
+        }
+
+        chunks.push_back(text.substr(start, len));
+
+        if (end == (int)text.size()) break;
+    }
+
+    return chunks;
+}
+
+// -------------------------
+// MySQL helpers
+// -------------------------
+
+static MYSQL* mysql_connect_or_die(const RagSource& s) {
+    MYSQL* conn = mysql_init(nullptr);
+    if (!conn) fatal("mysql_init failed");
+
+    // Set utf8mb4 for safety with StackOverflow-like content
+    mysql_options(conn, MYSQL_SET_CHARSET_NAME, "utf8mb4");
+
+    if (!mysql_real_connect(conn,
+                            s.host.c_str(),
+                            s.user.c_str(),
+                            s.pass.c_str(),
+                            s.db.c_str(),
+                            s.port,
+                            nullptr,
+                            0)) {
+        std::string err = mysql_error(conn);
+        mysql_close(conn);
+        fatal("MySQL connect failed: " + err);
+    }
+    return conn;
+}
+
+static RowMap mysql_row_to_map(MYSQL_RES* res, MYSQL_ROW row) {
+    RowMap m;
+    unsigned int n = mysql_num_fields(res);
+    MYSQL_FIELD* fields = mysql_fetch_fields(res);
+
+    for (unsigned int i = 0; i < n; i++) {
+        const char* name = fields[i].name;
+        const char* val = row[i];
+        if (name) {
+            m[name] = str_or_empty(val);
+        }
+    }
+    return m;
+}
+
+// Collect columns used by doc_map_json + embedding_json so SELECT is minimal.
+// v0: we intentionally keep this conservative (include pk + all referenced col parts + metadata.pick).
+static void add_unique(std::vector& cols, const std::string& c) { + for (size_t i = 0; i < cols.size(); i++) { + if (cols[i] == c) return; + } + cols.push_back(c); +} + +static void collect_cols_from_concat(std::vector& cols, const json& concat_spec) { + if (!concat_spec.is_array()) return; + for (const auto& part : concat_spec) { + if (part.is_object() && part.contains("col") && part["col"].is_string()) { + add_unique(cols, part["col"].get()); + } + } +} + +static std::vector collect_needed_columns(const RagSource& s, const EmbeddingConfig& ecfg) { + std::vector cols; + add_unique(cols, s.pk_column); + + // title/body concat + if (s.doc_map_json.contains("title") && s.doc_map_json["title"].contains("concat")) + collect_cols_from_concat(cols, s.doc_map_json["title"]["concat"]); + if (s.doc_map_json.contains("body") && s.doc_map_json["body"].contains("concat")) + collect_cols_from_concat(cols, s.doc_map_json["body"]["concat"]); + + // metadata.pick + if (s.doc_map_json.contains("metadata") && s.doc_map_json["metadata"].contains("pick")) { + const auto& pick = s.doc_map_json["metadata"]["pick"]; + if (pick.is_array()) { + for (const auto& c : pick) if (c.is_string()) add_unique(cols, c.get()); + } + } + + // embedding input concat (optional) + if (ecfg.enabled && ecfg.input_spec.is_object() && ecfg.input_spec.contains("concat")) { + collect_cols_from_concat(cols, ecfg.input_spec["concat"]); + } + + // doc_id.format: we do not try to parse all placeholders; best practice is doc_id uses pk only. + // If you want doc_id.format to reference other columns, include them in metadata.pick or concat. 
+ + return cols; +} + +static std::string build_select_sql(const RagSource& s, const std::vector& cols) { + std::string sql = "SELECT "; + for (size_t i = 0; i < cols.size(); i++) { + if (i) sql += ", "; + sql += "`" + cols[i] + "`"; + } + sql += " FROM `" + s.table_name + "`"; + if (!s.where_sql.empty()) { + sql += " WHERE " + s.where_sql; + } + return sql; +} + +// ------------------------- +// SQLite prepared statements (batched insertion) +// ------------------------- + +struct SqliteStmts { + sqlite3_stmt* doc_exists = nullptr; + sqlite3_stmt* ins_doc = nullptr; + sqlite3_stmt* ins_chunk = nullptr; + sqlite3_stmt* ins_fts = nullptr; + sqlite3_stmt* ins_vec = nullptr; // optional (only used if embedding enabled) +}; + +static void sqlite_prepare_or_die(sqlite3* db, sqlite3_stmt** st, const char* sql) { + if (sqlite3_prepare_v2(db, sql, -1, st, nullptr) != SQLITE_OK) { + fatal(std::string("SQLite prepare failed: ") + sqlite3_errmsg(db) + "\nSQL: " + sql); + } +} + +static void sqlite_finalize_all(SqliteStmts& s) { + if (s.doc_exists) sqlite3_finalize(s.doc_exists); + if (s.ins_doc) sqlite3_finalize(s.ins_doc); + if (s.ins_chunk) sqlite3_finalize(s.ins_chunk); + if (s.ins_fts) sqlite3_finalize(s.ins_fts); + if (s.ins_vec) sqlite3_finalize(s.ins_vec); + s = SqliteStmts{}; +} + +static void sqlite_bind_text(sqlite3_stmt* st, int idx, const std::string& v) { + sqlite3_bind_text(st, idx, v.c_str(), -1, SQLITE_TRANSIENT); +} + +// Best-effort binder for sqlite3-vec embeddings (float32 array). +// If your sqlite3-vec build expects a different encoding, change this function only. 
+static void bind_vec_embedding(sqlite3_stmt* st, int idx, const std::vector& emb) { + const void* data = (const void*)emb.data(); + int bytes = (int)(emb.size() * sizeof(float)); + sqlite3_bind_blob(st, idx, data, bytes, SQLITE_TRANSIENT); +} + +// Check if doc exists +static bool sqlite_doc_exists(SqliteStmts& ss, const std::string& doc_id) { + sqlite3_reset(ss.doc_exists); + sqlite3_clear_bindings(ss.doc_exists); + + sqlite_bind_text(ss.doc_exists, 1, doc_id); + + int rc = sqlite3_step(ss.doc_exists); + return (rc == SQLITE_ROW); +} + +// Insert doc +static void sqlite_insert_doc(SqliteStmts& ss, + int source_id, + const std::string& source_name, + const std::string& doc_id, + const std::string& pk_json, + const std::string& title, + const std::string& body, + const std::string& meta_json) { + sqlite3_reset(ss.ins_doc); + sqlite3_clear_bindings(ss.ins_doc); + + sqlite_bind_text(ss.ins_doc, 1, doc_id); + sqlite3_bind_int(ss.ins_doc, 2, source_id); + sqlite_bind_text(ss.ins_doc, 3, source_name); + sqlite_bind_text(ss.ins_doc, 4, pk_json); + sqlite_bind_text(ss.ins_doc, 5, title); + sqlite_bind_text(ss.ins_doc, 6, body); + sqlite_bind_text(ss.ins_doc, 7, meta_json); + + int rc = sqlite3_step(ss.ins_doc); + if (rc != SQLITE_DONE) { + fatal(std::string("SQLite insert rag_documents failed: ") + sqlite3_errmsg(sqlite3_db_handle(ss.ins_doc))); + } +} + +// Insert chunk +static void sqlite_insert_chunk(SqliteStmts& ss, + const std::string& chunk_id, + const std::string& doc_id, + int source_id, + int chunk_index, + const std::string& title, + const std::string& body, + const std::string& meta_json) { + sqlite3_reset(ss.ins_chunk); + sqlite3_clear_bindings(ss.ins_chunk); + + sqlite_bind_text(ss.ins_chunk, 1, chunk_id); + sqlite_bind_text(ss.ins_chunk, 2, doc_id); + sqlite3_bind_int(ss.ins_chunk, 3, source_id); + sqlite3_bind_int(ss.ins_chunk, 4, chunk_index); + sqlite_bind_text(ss.ins_chunk, 5, title); + sqlite_bind_text(ss.ins_chunk, 6, body); + 
sqlite_bind_text(ss.ins_chunk, 7, meta_json); + + int rc = sqlite3_step(ss.ins_chunk); + if (rc != SQLITE_DONE) { + fatal(std::string("SQLite insert rag_chunks failed: ") + sqlite3_errmsg(sqlite3_db_handle(ss.ins_chunk))); + } +} + +// Insert into FTS +static void sqlite_insert_fts(SqliteStmts& ss, + const std::string& chunk_id, + const std::string& title, + const std::string& body) { + sqlite3_reset(ss.ins_fts); + sqlite3_clear_bindings(ss.ins_fts); + + sqlite_bind_text(ss.ins_fts, 1, chunk_id); + sqlite_bind_text(ss.ins_fts, 2, title); + sqlite_bind_text(ss.ins_fts, 3, body); + + int rc = sqlite3_step(ss.ins_fts); + if (rc != SQLITE_DONE) { + fatal(std::string("SQLite insert rag_fts_chunks failed: ") + sqlite3_errmsg(sqlite3_db_handle(ss.ins_fts))); + } +} + +// Insert vector row (sqlite3-vec) +// Schema: rag_vec_chunks(embedding, chunk_id, doc_id, source_id, updated_at) +static void sqlite_insert_vec(SqliteStmts& ss, + const std::vector& emb, + const std::string& chunk_id, + const std::string& doc_id, + int source_id, + std::int64_t updated_at_unixepoch) { + if (!ss.ins_vec) return; + + sqlite3_reset(ss.ins_vec); + sqlite3_clear_bindings(ss.ins_vec); + + bind_vec_embedding(ss.ins_vec, 1, emb); + sqlite_bind_text(ss.ins_vec, 2, chunk_id); + sqlite_bind_text(ss.ins_vec, 3, doc_id); + sqlite3_bind_int(ss.ins_vec, 4, source_id); + sqlite3_bind_int64(ss.ins_vec, 5, (sqlite3_int64)updated_at_unixepoch); + + int rc = sqlite3_step(ss.ins_vec); + if (rc != SQLITE_DONE) { + // In practice, sqlite3-vec may return errors if binding format is wrong. + // Keep the message loud and actionable. + fatal(std::string("SQLite insert rag_vec_chunks failed (check vec binding format): ") + + sqlite3_errmsg(sqlite3_db_handle(ss.ins_vec))); + } +} + +// ------------------------- +// Embedding stub +// ------------------------- +// This function is a placeholder. It returns a deterministic pseudo-embedding from the text. +// Replace it with a real embedding model call in ProxySQL later. 
+// +// Why deterministic? +// - Helps test end-to-end ingestion + vector SQL without needing an ML runtime. +// - Keeps behavior stable across runs. +// +static std::vector pseudo_embedding(const std::string& text, int dim) { + std::vector v; + v.resize((size_t)dim, 0.0f); + + // Simple rolling hash-like accumulation into float bins. + // NOT a semantic embedding; only for wiring/testing. + std::uint64_t h = 1469598103934665603ULL; + for (size_t i = 0; i < text.size(); i++) { + h ^= (unsigned char)text[i]; + h *= 1099511628211ULL; + + // Spread influence into bins + size_t idx = (size_t)(h % (std::uint64_t)dim); + float val = (float)((h >> 32) & 0xFFFF) / 65535.0f; // 0..1 + v[idx] += (val - 0.5f); + } + + // Very rough normalization + double norm = 0.0; + for (int i = 0; i < dim; i++) norm += (double)v[(size_t)i] * (double)v[(size_t)i]; + norm = std::sqrt(norm); + if (norm > 1e-12) { + for (int i = 0; i < dim; i++) v[(size_t)i] = (float)(v[(size_t)i] / norm); + } + return v; +} + +// ------------------------- +// Load rag_sources from SQLite +// ------------------------- + +static std::vector load_sources(sqlite3* db) { + std::vector out; + + const char* sql = + "SELECT source_id, name, enabled, " + "backend_type, backend_host, backend_port, backend_user, backend_pass, backend_db, " + "table_name, pk_column, COALESCE(where_sql,''), " + "doc_map_json, chunking_json, COALESCE(embedding_json,'') " + "FROM rag_sources WHERE enabled = 1"; + + sqlite3_stmt* st = nullptr; + sqlite_prepare_or_die(db, &st, sql); + + while (sqlite3_step(st) == SQLITE_ROW) { + RagSource s; + s.source_id = sqlite3_column_int(st, 0); + s.name = (const char*)sqlite3_column_text(st, 1); + s.enabled = sqlite3_column_int(st, 2); + + s.backend_type = (const char*)sqlite3_column_text(st, 3); + s.host = (const char*)sqlite3_column_text(st, 4); + s.port = sqlite3_column_int(st, 5); + s.user = (const char*)sqlite3_column_text(st, 6); + s.pass = (const char*)sqlite3_column_text(st, 7); + s.db = (const 
char*)sqlite3_column_text(st, 8);
+
+        s.table_name = (const char*)sqlite3_column_text(st, 9);
+        s.pk_column = (const char*)sqlite3_column_text(st, 10);
+        s.where_sql = (const char*)sqlite3_column_text(st, 11);
+
+        const char* doc_map = (const char*)sqlite3_column_text(st, 12);
+        const char* chunk_j = (const char*)sqlite3_column_text(st, 13);
+        const char* emb_j = (const char*)sqlite3_column_text(st, 14);
+
+        try {
+            s.doc_map_json = json::parse(doc_map ? doc_map : "{}");
+            s.chunking_json = json::parse(chunk_j ? chunk_j : "{}");
+            if (emb_j && std::strlen(emb_j) > 0) s.embedding_json = json::parse(emb_j);
+            else s.embedding_json = json(); // null
+        } catch (const std::exception& e) {
+            sqlite3_finalize(st);
+            fatal("Invalid JSON in rag_sources.source_id=" + std::to_string(s.source_id) + ": " + e.what());
+        }
+
+        // Basic validation (fail fast)
+        if (!s.doc_map_json.is_object()) {
+            sqlite3_finalize(st);
+            fatal("doc_map_json must be a JSON object for source_id=" + std::to_string(s.source_id));
+        }
+        if (!s.chunking_json.is_object()) {
+            sqlite3_finalize(st);
+            fatal("chunking_json must be a JSON object for source_id=" + std::to_string(s.source_id));
+        }
+
+        out.push_back(std::move(s));
+    }
+
+    sqlite3_finalize(st);
+    return out;
+}
+
+// -------------------------
+// Build a canonical document from a source row
+// -------------------------
+
+struct BuiltDoc {
+    std::string doc_id;
+    std::string pk_json;
+    std::string title;
+    std::string body;
+    std::string metadata_json;
+};
+
+static BuiltDoc build_document_from_row(const RagSource& src, const RowMap& row) {
+    BuiltDoc d;
+
+    // doc_id
+    if (src.doc_map_json.contains("doc_id") && src.doc_map_json["doc_id"].is_object()
+        && src.doc_map_json["doc_id"].contains("format") && src.doc_map_json["doc_id"]["format"].is_string()) {
+        d.doc_id = apply_format(src.doc_map_json["doc_id"]["format"].get<std::string>(), row);
+    } else {
+        // fallback: table:pk
+        auto pk = row_get(row, src.pk_column).value_or("");
+        d.doc_id = src.table_name 
+ ":" + pk; + } + + // pk_json (refetch pointer) + json pk = json::object(); + pk[src.pk_column] = row_get(row, src.pk_column).value_or(""); + d.pk_json = json_dump_compact(pk); + + // title/body + if (src.doc_map_json.contains("title") && src.doc_map_json["title"].is_object() + && src.doc_map_json["title"].contains("concat")) { + d.title = eval_concat(src.doc_map_json["title"]["concat"], row, "", false); + } else { + d.title = ""; + } + + if (src.doc_map_json.contains("body") && src.doc_map_json["body"].is_object() + && src.doc_map_json["body"].contains("concat")) { + d.body = eval_concat(src.doc_map_json["body"]["concat"], row, "", false); + } else { + d.body = ""; + } + + // metadata_json + json meta = json::object(); + if (src.doc_map_json.contains("metadata")) { + meta = build_metadata(src.doc_map_json["metadata"], row); + } + d.metadata_json = json_dump_compact(meta); + + return d; +} + +// ------------------------- +// Embedding input builder (optional) +// ------------------------- + +static std::string build_embedding_input(const EmbeddingConfig& ecfg, + const RowMap& row, + const std::string& chunk_body) { + if (!ecfg.enabled) return ""; + if (!ecfg.input_spec.is_object()) return chunk_body; + + if (ecfg.input_spec.contains("concat") && ecfg.input_spec["concat"].is_array()) { + return eval_concat(ecfg.input_spec["concat"], row, chunk_body, true); + } + + return chunk_body; +} + +// ------------------------- +// Ingest one source +// ------------------------- + +static SqliteStmts prepare_sqlite_statements(sqlite3* db, bool want_vec) { + SqliteStmts ss; + + // Existence check + sqlite_prepare_or_die(db, &ss.doc_exists, + "SELECT 1 FROM rag_documents WHERE doc_id = ? 
LIMIT 1");
+
+    // Insert document (v0: no upsert)
+    sqlite_prepare_or_die(db, &ss.ins_doc,
+        "INSERT INTO rag_documents(doc_id, source_id, source_name, pk_json, title, body, metadata_json) "
+        "VALUES(?,?,?,?,?,?,?)");
+
+    // Insert chunk
+    sqlite_prepare_or_die(db, &ss.ins_chunk,
+        "INSERT INTO rag_chunks(chunk_id, doc_id, source_id, chunk_index, title, body, metadata_json) "
+        "VALUES(?,?,?,?,?,?,?)");
+
+    // Insert FTS
+    sqlite_prepare_or_die(db, &ss.ins_fts,
+        "INSERT INTO rag_fts_chunks(chunk_id, title, body) VALUES(?,?,?)");
+
+    // Insert vector (optional)
+    if (want_vec) {
+        // NOTE: If your sqlite3-vec build expects a different binding format, adapt bind_vec_embedding().
+        sqlite_prepare_or_die(db, &ss.ins_vec,
+            "INSERT INTO rag_vec_chunks(embedding, chunk_id, doc_id, source_id, updated_at) "
+            "VALUES(?,?,?,?,?)");
+    }
+
+    return ss;
+}
+
+static void ingest_source(sqlite3* sdb, const RagSource& src) {
+    std::cerr << "Ingesting source_id=" << src.source_id
+              << " name=" << src.name
+              << " backend=" << src.backend_type
+              << " table=" << src.table_name << "\n";
+
+    if (src.backend_type != "mysql") {
+        std::cerr << "  Skipping: backend_type not supported in v0.\n";
+        return;
+    }
+
+    // Parse chunking & embedding config
+    ChunkingConfig ccfg = parse_chunking_json(src.chunking_json);
+    EmbeddingConfig ecfg = parse_embedding_json(src.embedding_json);
+
+    // Prepare SQLite statements for this run
+    SqliteStmts ss = prepare_sqlite_statements(sdb, ecfg.enabled);
+
+    // Connect MySQL
+    MYSQL* mdb = mysql_connect_or_die(src);
+
+    // Build SELECT
+    std::vector<std::string> cols = collect_needed_columns(src, ecfg);
+    std::string sel = build_select_sql(src, cols);
+
+    if (mysql_query(mdb, sel.c_str()) != 0) {
+        std::string err = mysql_error(mdb);
+        mysql_close(mdb);
+        sqlite_finalize_all(ss);
+        fatal("MySQL query failed: " + err + "\nSQL: " + sel);
+    }
+
+    MYSQL_RES* res = mysql_store_result(mdb);
+    if (!res) {
+        std::string err = mysql_error(mdb);
+        mysql_close(mdb);
+        
sqlite_finalize_all(ss);
+        fatal("mysql_store_result failed: " + err);
+    }
+
+    std::uint64_t ingested_docs = 0;
+    std::uint64_t skipped_docs = 0;
+
+    MYSQL_ROW r;
+    while ((r = mysql_fetch_row(res)) != nullptr) {
+        RowMap row = mysql_row_to_map(res, r);
+
+        BuiltDoc doc = build_document_from_row(src, row);
+
+        // v0: skip if exists
+        if (sqlite_doc_exists(ss, doc.doc_id)) {
+            skipped_docs++;
+            continue;
+        }
+
+        // Insert document
+        sqlite_insert_doc(ss, src.source_id, src.name,
+                          doc.doc_id, doc.pk_json, doc.title, doc.body, doc.metadata_json);
+
+        // Chunk and insert chunks + FTS (+ optional vec)
+        std::vector<std::string> chunks = chunk_text_chars(doc.body, ccfg);
+
+        // rag_documents/rag_chunks get updated_at from the schema default (unixepoch()),
+        // but the vec0 table has no column defaults, so updated_at must be supplied here.
+        // Fetching SQLite's unixepoch() would cost an extra query, so v0 stores 0.
+        // If you want accuracy, run SELECT unixepoch() once per run and reuse the value.
+        std::int64_t now_epoch = 0;
+
+        for (size_t i = 0; i < chunks.size(); i++) {
+            std::string chunk_id = doc.doc_id + "#" + std::to_string(i);
+
+            // Chunk metadata (minimal)
+            json cmeta = json::object();
+            cmeta["chunk_index"] = (int)i;
+
+            std::string chunk_title = doc.title; // simple: repeat doc title
+
+            sqlite_insert_chunk(ss, chunk_id, doc.doc_id, src.source_id, (int)i,
+                                chunk_title, chunks[i], json_dump_compact(cmeta));
+
+            sqlite_insert_fts(ss, chunk_id, chunk_title, chunks[i]);
+
+            // Optional vectors
+            if (ecfg.enabled) {
+                // Build embedding input text, then generate pseudo embedding.
+                // Replace pseudo_embedding() with a real embedding provider in ProxySQL.
+                std::string emb_input = build_embedding_input(ecfg, row, chunks[i]);
+                std::vector<float> emb = pseudo_embedding(emb_input, ecfg.dim);
+
+                // Insert into sqlite3-vec table
+                sqlite_insert_vec(ss, emb, chunk_id, doc.doc_id, src.source_id, now_epoch);
+            }
+        }
+
+        ingested_docs++;
+        if (ingested_docs % 1000 == 0) {
+            std::cerr << "  progress: ingested_docs=" << ingested_docs
+                      << " skipped_docs=" << skipped_docs << "\n";
+        }
+    }
+
+    mysql_free_result(res);
+    mysql_close(mdb);
+    sqlite_finalize_all(ss);
+
+    std::cerr << "Done source " << src.name
+              << " ingested_docs=" << ingested_docs
+              << " skipped_docs=" << skipped_docs << "\n";
+}
+
+// -------------------------
+// Main
+// -------------------------
+
+int main(int argc, char** argv) {
+    if (argc != 2) {
+        std::cerr << "Usage: " << argv[0] << " <sqlite_db_path>\n";
+        return 2;
+    }
+
+    const char* sqlite_path = argv[1];
+
+    sqlite3* db = nullptr;
+    if (sqlite3_open(sqlite_path, &db) != SQLITE_OK) {
+        fatal("Could not open SQLite DB: " + std::string(sqlite_path));
+    }
+
+    // Pragmas (safe defaults)
+    sqlite_exec(db, "PRAGMA foreign_keys = ON;");
+    sqlite_exec(db, "PRAGMA journal_mode = WAL;");
+    sqlite_exec(db, "PRAGMA synchronous = NORMAL;");
+
+    // Single transaction for speed
+    if (sqlite_exec(db, "BEGIN IMMEDIATE;") != SQLITE_OK) {
+        sqlite3_close(db);
+        fatal("Failed to begin transaction");
+    }
+
+    bool ok = true;
+    try {
+        std::vector<RagSource> sources = load_sources(db);
+        if (sources.empty()) {
+            std::cerr << "No enabled sources found in rag_sources.\n";
+        }
+        for (size_t i = 0; i < sources.size(); i++) {
+            ingest_source(db, sources[i]);
+        }
+    } catch (const std::exception& e) {
+        std::cerr << "Exception: " << e.what() << "\n";
+        ok = false;
+    } catch (...) 
{ + std::cerr << "Unknown exception\n"; + ok = false; + } + + if (ok) { + if (sqlite_exec(db, "COMMIT;") != SQLITE_OK) { + sqlite_exec(db, "ROLLBACK;"); + sqlite3_close(db); + fatal("Failed to commit transaction"); + } + } else { + sqlite_exec(db, "ROLLBACK;"); + sqlite3_close(db); + return 1; + } + + sqlite3_close(db); + return 0; +} + diff --git a/RAG_POC/schema.sql b/RAG_POC/schema.sql new file mode 100644 index 000000000..2a40c3e7a --- /dev/null +++ b/RAG_POC/schema.sql @@ -0,0 +1,172 @@ +-- ============================================================ +-- ProxySQL RAG Index Schema (SQLite) +-- v0: documents + chunks + FTS5 + sqlite3-vec embeddings +-- ============================================================ + +PRAGMA foreign_keys = ON; +PRAGMA journal_mode = WAL; +PRAGMA synchronous = NORMAL; + +-- ============================================================ +-- 1) rag_sources: control plane +-- Defines where to fetch from + how to transform + chunking. +-- ============================================================ +CREATE TABLE IF NOT EXISTS rag_sources ( + source_id INTEGER PRIMARY KEY, + name TEXT NOT NULL UNIQUE, -- e.g. "stack_posts" + enabled INTEGER NOT NULL DEFAULT 1, + + -- Where to retrieve from (PoC: connect directly; later can be "via ProxySQL") + backend_type TEXT NOT NULL, -- "mysql" | "postgres" | ... + backend_host TEXT NOT NULL, + backend_port INTEGER NOT NULL, + backend_user TEXT NOT NULL, + backend_pass TEXT NOT NULL, + backend_db TEXT NOT NULL, -- database/schema name + + table_name TEXT NOT NULL, -- e.g. "posts" + pk_column TEXT NOT NULL, -- e.g. "Id" + + -- Optional: restrict ingestion; appended to SELECT as WHERE + where_sql TEXT, -- e.g. "PostTypeId IN (1,2)" + + -- REQUIRED: mapping from source row -> rag_documents fields + -- JSON spec describing doc_id, title/body concat, metadata pick/rename, etc. + doc_map_json TEXT NOT NULL, + + -- REQUIRED: chunking strategy (enabled, chunk_size, overlap, etc.) 
+ chunking_json TEXT NOT NULL, + + -- Optional: embedding strategy (how to build embedding input text) + -- In v0 you can keep it NULL/empty; define later without schema changes. + embedding_json TEXT, + + created_at INTEGER NOT NULL DEFAULT (unixepoch()), + updated_at INTEGER NOT NULL DEFAULT (unixepoch()) +); + +CREATE INDEX IF NOT EXISTS idx_rag_sources_enabled + ON rag_sources(enabled); + +CREATE INDEX IF NOT EXISTS idx_rag_sources_backend + ON rag_sources(backend_type, backend_host, backend_port, backend_db, table_name); + + +-- ============================================================ +-- 2) rag_documents: canonical documents +-- One document per source row (e.g. one per posts.Id). +-- ============================================================ +CREATE TABLE IF NOT EXISTS rag_documents ( + doc_id TEXT PRIMARY KEY, -- stable: e.g. "posts:12345" + source_id INTEGER NOT NULL REFERENCES rag_sources(source_id), + source_name TEXT NOT NULL, -- copy of rag_sources.name for convenience + pk_json TEXT NOT NULL, -- e.g. {"Id":12345} + + title TEXT, + body TEXT, + metadata_json TEXT NOT NULL DEFAULT '{}', -- JSON object + + updated_at INTEGER NOT NULL DEFAULT (unixepoch()), + deleted INTEGER NOT NULL DEFAULT 0 +); + +CREATE INDEX IF NOT EXISTS idx_rag_documents_source_updated + ON rag_documents(source_id, updated_at); + +CREATE INDEX IF NOT EXISTS idx_rag_documents_source_deleted + ON rag_documents(source_id, deleted); + + +-- ============================================================ +-- 3) rag_chunks: chunked content +-- The unit we index in FTS and vectors. +-- ============================================================ +CREATE TABLE IF NOT EXISTS rag_chunks ( + chunk_id TEXT PRIMARY KEY, -- e.g. "posts:12345#0" + doc_id TEXT NOT NULL REFERENCES rag_documents(doc_id), + source_id INTEGER NOT NULL REFERENCES rag_sources(source_id), + + chunk_index INTEGER NOT NULL, -- 0..N-1 + title TEXT, + body TEXT NOT NULL, + + -- Optional per-chunk metadata (e.g. 
offsets, has_code, section label)
+  metadata_json TEXT NOT NULL DEFAULT '{}',
+
+  updated_at INTEGER NOT NULL DEFAULT (unixepoch()),
+  deleted INTEGER NOT NULL DEFAULT 0
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS uq_rag_chunks_doc_idx
+  ON rag_chunks(doc_id, chunk_index);
+
+CREATE INDEX IF NOT EXISTS idx_rag_chunks_source_doc
+  ON rag_chunks(source_id, doc_id);
+
+CREATE INDEX IF NOT EXISTS idx_rag_chunks_deleted
+  ON rag_chunks(deleted);
+
+
+-- ============================================================
+-- 4) rag_fts_chunks: FTS5 index (stores its own copy of the text)
+--    Maintained explicitly by the ingester.
+--    Notes:
+--    - chunk_id is stored but UNINDEXED.
+--    - Use bm25(rag_fts_chunks) for ranking (lower = better match).
+-- ============================================================
+CREATE VIRTUAL TABLE IF NOT EXISTS rag_fts_chunks
+USING fts5(
+  chunk_id UNINDEXED,
+  title,
+  body,
+  tokenize = 'unicode61'
+);
+
+
+-- ============================================================
+-- 5) rag_vec_chunks: sqlite3-vec index
+--    Stores embeddings per chunk for vector search.
+--
+-- IMPORTANT:
+--   - dimension must match your embedding model (example: 1536).
+--   - metadata columns are included to help join/filter.
+-- ============================================================ +CREATE VIRTUAL TABLE IF NOT EXISTS rag_vec_chunks +USING vec0( + embedding float[1536], -- change if you use another dimension + chunk_id TEXT, -- join key back to rag_chunks + doc_id TEXT, -- optional convenience + source_id INTEGER, -- optional convenience + updated_at INTEGER -- optional convenience +); + +-- Optional: convenience view for debugging / SQL access patterns +CREATE VIEW IF NOT EXISTS rag_chunk_view AS +SELECT + c.chunk_id, + c.doc_id, + c.source_id, + d.source_name, + d.pk_json, + COALESCE(c.title, d.title) AS title, + c.body, + d.metadata_json AS doc_metadata_json, + c.metadata_json AS chunk_metadata_json, + c.updated_at +FROM rag_chunks c +JOIN rag_documents d ON d.doc_id = c.doc_id +WHERE c.deleted = 0 AND d.deleted = 0; + + +-- ============================================================ +-- 6) (Optional) sync state placeholder for later incremental ingestion +-- Not used in v0, but reserving it avoids later schema churn. +-- ============================================================ +CREATE TABLE IF NOT EXISTS rag_sync_state ( + source_id INTEGER PRIMARY KEY REFERENCES rag_sources(source_id), + mode TEXT NOT NULL DEFAULT 'poll', -- 'poll' | 'cdc' + cursor_json TEXT NOT NULL DEFAULT '{}', -- watermark/checkpoint + last_ok_at INTEGER, + last_error TEXT +); + diff --git a/RAG_POC/sql-examples.md b/RAG_POC/sql-examples.md new file mode 100644 index 000000000..b7b52128f --- /dev/null +++ b/RAG_POC/sql-examples.md @@ -0,0 +1,348 @@ +# ProxySQL RAG Index — SQL Examples (FTS, Vectors, Hybrid) + +This file provides concrete SQL examples for querying the ProxySQL-hosted SQLite RAG index directly (for debugging, internal dashboards, or SQL-native applications). + +The **preferred interface for AI agents** remains MCP tools (`mcp-tools.md`). SQL access should typically be restricted to trusted callers. 
+ +Assumed tables: +- `rag_documents` +- `rag_chunks` +- `rag_fts_chunks` (FTS5) +- `rag_vec_chunks` (sqlite3-vec vec0 table) + +--- + +## 0. Common joins and inspection + +### 0.1 Inspect one document and its chunks +```sql +SELECT * FROM rag_documents WHERE doc_id = 'posts:12345'; +SELECT * FROM rag_chunks WHERE doc_id = 'posts:12345' ORDER BY chunk_index; +``` + +### 0.2 Use the convenience view (if enabled) +```sql +SELECT * FROM rag_chunk_view WHERE doc_id = 'posts:12345' ORDER BY chunk_id; +``` + +--- + +## 1. FTS5 examples + +### 1.1 Basic FTS search (top 10) +```sql +SELECT + f.chunk_id, + bm25(rag_fts_chunks) AS score_fts_raw +FROM rag_fts_chunks f +WHERE rag_fts_chunks MATCH 'json_extract mysql' +ORDER BY score_fts_raw +LIMIT 10; +``` + +### 1.2 Join FTS results to chunk text and document metadata +```sql +SELECT + f.chunk_id, + bm25(rag_fts_chunks) AS score_fts_raw, + c.doc_id, + COALESCE(c.title, d.title) AS title, + c.body AS chunk_body, + d.metadata_json AS doc_metadata_json +FROM rag_fts_chunks f +JOIN rag_chunks c ON c.chunk_id = f.chunk_id +JOIN rag_documents d ON d.doc_id = c.doc_id +WHERE rag_fts_chunks MATCH 'json_extract mysql' + AND c.deleted = 0 AND d.deleted = 0 +ORDER BY score_fts_raw +LIMIT 10; +``` + +### 1.3 Apply a source filter (by source_id) +```sql +SELECT + f.chunk_id, + bm25(rag_fts_chunks) AS score_fts_raw +FROM rag_fts_chunks f +JOIN rag_chunks c ON c.chunk_id = f.chunk_id +WHERE rag_fts_chunks MATCH 'replication lag' + AND c.source_id = 1 +ORDER BY score_fts_raw +LIMIT 20; +``` + +### 1.4 Phrase queries, boolean operators (FTS5) +```sql +-- phrase +SELECT chunk_id FROM rag_fts_chunks +WHERE rag_fts_chunks MATCH '"group replication"' +LIMIT 20; + +-- boolean: term1 AND term2 +SELECT chunk_id FROM rag_fts_chunks +WHERE rag_fts_chunks MATCH 'mysql AND deadlock' +LIMIT 20; + +-- boolean: term1 NOT term2 +SELECT chunk_id FROM rag_fts_chunks +WHERE rag_fts_chunks MATCH 'mysql NOT mariadb' +LIMIT 20; +``` + +--- + +## 2. 
Vector search examples (sqlite3-vec) + +Vector SQL varies slightly depending on sqlite3-vec build and how you bind vectors. +Below are **two patterns** you can implement in ProxySQL. + +### 2.1 Pattern A (recommended): ProxySQL computes embeddings; SQL receives a bound vector +In this pattern, ProxySQL: +1) Computes the query embedding in C++ +2) Executes SQL with a bound parameter `:qvec` representing the embedding + +A typical “nearest neighbors” query shape is: + +```sql +-- PSEUDOCODE: adapt to sqlite3-vec's exact operator/function in your build. +SELECT + v.chunk_id, + v.distance AS distance_raw +FROM rag_vec_chunks v +WHERE v.embedding MATCH :qvec +ORDER BY distance_raw +LIMIT 10; +``` + +Then join to chunks: +```sql +-- PSEUDOCODE: join with content and metadata +SELECT + v.chunk_id, + v.distance AS distance_raw, + c.doc_id, + c.body AS chunk_body, + d.metadata_json AS doc_metadata_json +FROM ( + SELECT chunk_id, distance + FROM rag_vec_chunks + WHERE embedding MATCH :qvec + ORDER BY distance + LIMIT 10 +) v +JOIN rag_chunks c ON c.chunk_id = v.chunk_id +JOIN rag_documents d ON d.doc_id = c.doc_id; +``` + +### 2.2 Pattern B (debug): store a query vector in a temporary table +This is useful when you want to run vector queries manually in SQL without MCP support. + +```sql +CREATE TEMP TABLE tmp_query_vec(qvec BLOB); +-- Insert the query vector (float32 array blob). The insertion is usually done by tooling, not manually. +-- INSERT INTO tmp_query_vec VALUES (X'...'); + +-- PSEUDOCODE: use tmp_query_vec.qvec as the query embedding +SELECT + v.chunk_id, + v.distance +FROM rag_vec_chunks v, tmp_query_vec t +WHERE v.embedding MATCH t.qvec +ORDER BY v.distance +LIMIT 10; +``` + +--- + +## 3. Hybrid search examples + +Hybrid retrieval is best implemented in the MCP layer because it mixes ranking systems and needs careful bounding. +However, you can approximate hybrid behavior using SQL to validate logic. 
+ +### 3.1 Hybrid Mode A: Parallel FTS + Vector then fuse (RRF) + +#### Step 1: FTS top 50 (ranked) +```sql +WITH fts AS ( + SELECT + f.chunk_id, + bm25(rag_fts_chunks) AS score_fts_raw + FROM rag_fts_chunks f + WHERE rag_fts_chunks MATCH :fts_query + ORDER BY score_fts_raw + LIMIT 50 +) +SELECT * FROM fts; +``` + +#### Step 2: Vector top 50 (ranked) +```sql +WITH vec AS ( + SELECT + v.chunk_id, + v.distance AS distance_raw + FROM rag_vec_chunks v + WHERE v.embedding MATCH :qvec + ORDER BY v.distance + LIMIT 50 +) +SELECT * FROM vec; +``` + +#### Step 3: Fuse via Reciprocal Rank Fusion (RRF) +In SQL you need ranks. SQLite supports window functions in modern builds. + +```sql +WITH +fts AS ( + SELECT + f.chunk_id, + bm25(rag_fts_chunks) AS score_fts_raw, + ROW_NUMBER() OVER (ORDER BY bm25(rag_fts_chunks)) AS rank_fts + FROM rag_fts_chunks f + WHERE rag_fts_chunks MATCH :fts_query + LIMIT 50 +), +vec AS ( + SELECT + v.chunk_id, + v.distance AS distance_raw, + ROW_NUMBER() OVER (ORDER BY v.distance) AS rank_vec + FROM rag_vec_chunks v + WHERE v.embedding MATCH :qvec + LIMIT 50 +), +merged AS ( + SELECT + COALESCE(fts.chunk_id, vec.chunk_id) AS chunk_id, + fts.rank_fts, + vec.rank_vec, + fts.score_fts_raw, + vec.distance_raw + FROM fts + FULL OUTER JOIN vec ON vec.chunk_id = fts.chunk_id +), +rrf AS ( + SELECT + chunk_id, + score_fts_raw, + distance_raw, + rank_fts, + rank_vec, + (1.0 / (60.0 + COALESCE(rank_fts, 1000000))) + + (1.0 / (60.0 + COALESCE(rank_vec, 1000000))) AS score_rrf + FROM merged +) +SELECT + r.chunk_id, + r.score_rrf, + c.doc_id, + c.body AS chunk_body +FROM rrf r +JOIN rag_chunks c ON c.chunk_id = r.chunk_id +ORDER BY r.score_rrf DESC +LIMIT 10; +``` + +**Important**: SQLite does not support `FULL OUTER JOIN` directly in all builds. +For production, implement the merge/fuse in C++ (MCP layer). This SQL is illustrative. 
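For SQLite builds older than 3.39.0 (which introduced `FULL OUTER JOIN`), the same fusion can be approximated portably with a `LEFT JOIN` plus a `UNION ALL` of the vector-only rows. This is a sketch under the same assumptions as above (the `:qvec` `MATCH` shape is pseudocode to adapt to your sqlite3-vec build):

```sql
WITH
fts AS (
  SELECT
    f.chunk_id,
    ROW_NUMBER() OVER (ORDER BY bm25(rag_fts_chunks)) AS rank_fts
  FROM rag_fts_chunks f
  WHERE rag_fts_chunks MATCH :fts_query
  LIMIT 50
),
vec AS (
  SELECT
    v.chunk_id,
    ROW_NUMBER() OVER (ORDER BY v.distance) AS rank_vec
  FROM rag_vec_chunks v
  WHERE v.embedding MATCH :qvec
  LIMIT 50
),
-- Emulate FULL OUTER JOIN: all FTS rows (with or without a vec match),
-- then the vec-only rows appended via UNION ALL.
merged AS (
  SELECT fts.chunk_id, fts.rank_fts, vec.rank_vec
  FROM fts LEFT JOIN vec ON vec.chunk_id = fts.chunk_id
  UNION ALL
  SELECT vec.chunk_id, NULL AS rank_fts, vec.rank_vec
  FROM vec
  WHERE vec.chunk_id NOT IN (SELECT chunk_id FROM fts)
)
SELECT
  chunk_id,
  (1.0 / (60.0 + COALESCE(rank_fts, 1000000)))
  + (1.0 / (60.0 + COALESCE(rank_vec, 1000000))) AS score_rrf
FROM merged
ORDER BY score_rrf DESC
LIMIT 10;
```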
+ +### 3.2 Hybrid Mode B: Broad FTS then vector rerank (candidate generation) + +#### Step 1: FTS candidate set (top 200) +```sql +WITH candidates AS ( + SELECT + f.chunk_id, + bm25(rag_fts_chunks) AS score_fts_raw + FROM rag_fts_chunks f + WHERE rag_fts_chunks MATCH :fts_query + ORDER BY score_fts_raw + LIMIT 200 +) +SELECT * FROM candidates; +``` + +#### Step 2: Vector rerank within candidates +Conceptually: +- Join candidates to `rag_vec_chunks` and compute distance to `:qvec` +- Keep top 10 + +```sql +WITH candidates AS ( + SELECT + f.chunk_id + FROM rag_fts_chunks f + WHERE rag_fts_chunks MATCH :fts_query + ORDER BY bm25(rag_fts_chunks) + LIMIT 200 +), +reranked AS ( + SELECT + v.chunk_id, + v.distance AS distance_raw + FROM rag_vec_chunks v + JOIN candidates c ON c.chunk_id = v.chunk_id + WHERE v.embedding MATCH :qvec + ORDER BY v.distance + LIMIT 10 +) +SELECT + r.chunk_id, + r.distance_raw, + ch.doc_id, + ch.body +FROM reranked r +JOIN rag_chunks ch ON ch.chunk_id = r.chunk_id; +``` + +As above, the exact `MATCH :qvec` syntax may need adaptation to your sqlite3-vec build; implement vector query execution in C++ and keep SQL as internal glue. + +--- + +## 4. 
Common “application-friendly” queries + +### 4.1 Return doc_id + score + title only (no bodies) +```sql +SELECT + f.chunk_id, + c.doc_id, + COALESCE(c.title, d.title) AS title, + bm25(rag_fts_chunks) AS score_fts_raw +FROM rag_fts_chunks f +JOIN rag_chunks c ON c.chunk_id = f.chunk_id +JOIN rag_documents d ON d.doc_id = c.doc_id +WHERE rag_fts_chunks MATCH :q +ORDER BY score_fts_raw +LIMIT 20; +``` + +### 4.2 Return top doc_ids (deduplicate by doc_id) +```sql +WITH ranked_chunks AS ( + SELECT + c.doc_id, + bm25(rag_fts_chunks) AS score_fts_raw + FROM rag_fts_chunks f + JOIN rag_chunks c ON c.chunk_id = f.chunk_id + WHERE rag_fts_chunks MATCH :q + ORDER BY score_fts_raw + LIMIT 200 +) +SELECT doc_id, MIN(score_fts_raw) AS best_score +FROM ranked_chunks +GROUP BY doc_id +ORDER BY best_score +LIMIT 20; +``` + +--- + +## 5. Practical guidance + +- Use SQL mode mainly for debugging and internal tooling. +- Prefer MCP tools for agent interaction: + - stable schemas + - strong guardrails + - consistent hybrid scoring +- Implement hybrid fusion in C++ (not in SQL) to avoid dialect limitations and to keep scoring correct.
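
The last point can be sketched in C++. `rrf_fuse` below is a hypothetical helper (not part of the ingester); it assumes each input is a list of `chunk_id`s already ordered best-first, and uses the same RRF constant (k = 60) as the SQL example above:

```cpp
#include <algorithm>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Reciprocal Rank Fusion over two best-first candidate lists.
// An item missing from one list simply contributes no score from that
// list, which is equivalent to giving it an arbitrarily large rank.
static std::vector<std::pair<std::string, double>>
rrf_fuse(const std::vector<std::string>& fts_ranked,
         const std::vector<std::string>& vec_ranked,
         double k = 60.0) {
    std::map<std::string, double> score;
    for (size_t i = 0; i < fts_ranked.size(); i++)
        score[fts_ranked[i]] += 1.0 / (k + (double)(i + 1));
    for (size_t i = 0; i < vec_ranked.size(); i++)
        score[vec_ranked[i]] += 1.0 / (k + (double)(i + 1));

    // Sort by fused score, descending (higher = better).
    std::vector<std::pair<std::string, double>> out(score.begin(), score.end());
    std::sort(out.begin(), out.end(),
              [](const auto& a, const auto& b) { return a.second > b.second; });
    return out;
}
```

Doing the fusion here, rather than in SQL, sidesteps the `FULL OUTER JOIN` portability issue entirely and keeps the scoring identical regardless of SQLite version.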