Merge rahim/v4.0_rag_ingest into v4.0_rag_ingest_2

Merge changes from PR #5318 (RAG ingestion feature) into the new
development branch v4.0_rag_ingest_2.

Changes include:
- RAG ingestion tool (rag_ingest) with chunking and embeddings
- MySQL_Catalog fixes for NULL pointer handling
- MCP server updates for RAG tools
- Comprehensive documentation and test scripts
Ref: pull/5324/head
Author: Rene Cannao
Commit: 42a67ebaf2
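
For orientation, the data written by rag_ingest is meant to be read back through SQLite FTS5 (keyword search ranked by BM25) and the vec0 extension (vector KNN). Below is a minimal sketch of both read paths, using the table names and a sample chunk id ('posts:3#0') taken from the test script further down; it is illustrative only and not part of the diff:

-- Keyword search over ingested chunks, best BM25 match first
SELECT chunk_id
FROM rag_fts_chunks
WHERE rag_fts_chunks MATCH 'ProxySQL'
ORDER BY bm25(rag_fts_chunks)
LIMIT 5;

-- Vector KNN via vec0; the probe embedding is borrowed from an existing
-- chunk here instead of calling an embedding provider
SELECT chunk_id, distance
FROM rag_vec_chunks
WHERE embedding MATCH (SELECT embedding FROM rag_vec_chunks WHERE chunk_id = 'posts:3#0')
ORDER BY distance
LIMIT 5;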

@@ -152,6 +152,15 @@ build_lib_debug: $(if $(LEGACY_BUILD),build_lib_debug_legacy,build_lib_debug_def
.PHONY: build_src_debug
build_src_debug: $(if $(LEGACY_BUILD),build_src_debug_legacy,build_src_debug_default)
# RAG ingester (PoC)
.PHONY: rag_ingest
rag_ingest: build_deps
	cd RAG_POC && ${MAKE} CC=${CC} CXX=${CXX} CXXFLAGS="${CXXFLAGS}"
.PHONY: rag_ingest_clean
rag_ingest_clean:
	cd RAG_POC && ${MAKE} clean
# legacy build targets (pre c++17)
.PHONY: build_deps_legacy
build_deps_legacy:

@@ -0,0 +1,33 @@
CXX ?= g++
CXXFLAGS ?= -std=c++17 -O2
ROOT_DIR := ..
INCLUDES := \
-I$(ROOT_DIR)/deps/json \
-I$(ROOT_DIR)/deps/mariadb-client-library/mariadb_client/include \
-I$(ROOT_DIR)/deps/sqlite3/sqlite-amalgamation-3500400 \
-I$(ROOT_DIR)/deps/curl/curl/include
LIBDIRS := \
-L$(ROOT_DIR)/deps/mariadb-client-library/mariadb_client/libmariadb
SQLITE3_OBJ := $(ROOT_DIR)/deps/sqlite3/sqlite-amalgamation-3500400/sqlite3.o
# Use static libcurl
CURL_STATIC_LIB := $(ROOT_DIR)/deps/curl/curl/lib/.libs/libcurl.a
LIBS := -lmariadbclient -lssl -lcrypto -lcrypt -ldl -lpthread $(CURL_STATIC_LIB) -lz
TARGET := rag_ingest
SOURCES := rag_ingest.cpp
.PHONY: all clean
all: $(TARGET)
$(TARGET): $(SOURCES)
	$(CXX) $(CXXFLAGS) $(INCLUDES) $(LIBDIRS) $(SQLITE3_OBJ) $^ -o $@ $(LIBS)
clean:
	rm -f $(TARGET)

(File diff suppressed because it is too large.)

@@ -0,0 +1,29 @@
-- Sample MySQL dataset for rag_ingest testing
-- Creates a simple posts table and inserts a few rows.
CREATE DATABASE IF NOT EXISTS rag_test;
USE rag_test;
DROP TABLE IF EXISTS posts;
CREATE TABLE posts (
Id BIGINT NOT NULL PRIMARY KEY,
Title VARCHAR(255) NOT NULL,
Body TEXT NOT NULL,
Tags VARCHAR(255) NULL,
Score INT NOT NULL DEFAULT 0,
CreationDate DATETIME NOT NULL,
UpdatedAt DATETIME NULL
);
INSERT INTO posts (Id, Title, Body, Tags, Score, CreationDate, UpdatedAt) VALUES
(1, 'Hello RAG', 'This is the first test document. It contains sample text for chunking.', 'rag,test', 10, '2024-01-01 10:00:00', '2024-01-02 12:00:00'),
(2, 'Second Doc', 'A second document body. It has more text to ensure chunking works across boundaries.', 'example,docs', 5, '2024-01-03 09:30:00', '2024-01-03 11:00:00'),
(3, 'ProxySQL RAG', 'ProxySQL adds MCP and RAG support. This row is for ingestion testing.', 'proxysql,rag', 7, '2024-01-05 08:15:00', NULL),
(4, 'Short Note', 'Tiny.', 'misc', 1, '2024-01-06 13:00:00', NULL),
(5, 'Chunk Stress', 'This row contains a longer body to force multiple chunk boundaries when chunking is enabled. Repeat: This row contains a longer body to force multiple chunk boundaries when chunking is enabled.', 'long,chunk', 12, '2024-01-07 18:45:00', '2024-01-08 07:10:00'),
(6, 'Filter Candidate', 'This document should be filtered out by a high score threshold.', 'filter,test', 2, '2024-01-09 14:20:00', NULL),
(7, 'Tag Variation', 'Contains tags and mixed content for metadata pick/rename testing.', 'rag,meta,tag', 9, '2024-01-10 09:00:00', '2024-01-10 10:00:00'),
(8, 'Null Updated', 'Document with NULL UpdatedAt for null handling in source.', 'nulls', 6, '2024-01-11 16:30:00', NULL),
(9, 'High Score', 'This is a high score document for where_sql tests.', 'score,high', 20, '2024-01-12 08:00:00', '2024-01-12 09:30:00'),
(10, 'Low Score', 'Low score entry to test filters.', 'score,low', 0, '2024-01-13 12:00:00', NULL);
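-- For reference: the where_sql filter exercised later in the test script
-- ("Score >= 7") should match exactly five of the rows above:
--   SELECT Id, Title, Score FROM posts WHERE Score >= 7 ORDER BY Id;
--   -- expected Ids: 1, 3, 5, 7, 9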

@@ -0,0 +1,38 @@
-- Sample SQLite setup for rag_ingest testing
-- Inserts a sample rag_sources row that points to the MySQL sample.
-- Note: schema.sql must be loaded separately before this script.
-- insert a sample source
INSERT INTO rag_sources (
source_id,
name,
enabled,
backend_type,
backend_host,
backend_port,
backend_user,
backend_pass,
backend_db,
table_name,
pk_column,
where_sql,
doc_map_json,
chunking_json,
embedding_json
) VALUES (
1,
'mysql_posts',
1,
'mysql',
'127.0.0.1',
3306,
'root',
'root',
'rag_test',
'posts',
'Id',
'',
'{"doc_id":{"format":"posts:{Id}"},"title":{"concat":[{"col":"Title"}]},"body":{"concat":[{"col":"Body"}]},"metadata":{"pick":["Id","Tags","Score","CreationDate"],"rename":{"CreationDate":"CreationDate"}}}',
'{"enabled":true,"unit":"chars","chunk_size":4000,"overlap":400,"min_chunk_size":800}',
'{"enabled":true,"dim":1536,"model":"text-embedding-3-large","input":{"concat":[{"col":"Title"},{"lit":"\\nTags: "},{"col":"Tags"},{"lit":"\\n\\n"},{"chunk_body":true}]}}'
);
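-- Illustration only: given the doc_map_json above, the first MySQL row is
-- expected to land in rag_documents roughly as follows after an ingest run
-- (metadata field names follow the "pick" list; see the test assertions):
--   SELECT doc_id, title,
--          json_extract(metadata_json, '$.Tags')  AS tags,
--          json_extract(metadata_json, '$.Score') AS score
--   FROM rag_documents WHERE doc_id = 'posts:1';
--   -- expected: posts:1 | Hello RAG | rag,test | 10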

@@ -0,0 +1,377 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_ROOT="$(cd "${ROOT_DIR}/.." && pwd)"
SQLITE_BIN="${SQLITE_BIN:-${REPO_ROOT}/deps/sqlite3/sqlite3/sqlite3}"
MYSQL_BIN="${MYSQL_BIN:-mysql}"
MYSQL_HOST="${MYSQL_HOST:-127.0.0.1}"
MYSQL_PORT="${MYSQL_PORT:-3306}"
MYSQL_USER="${MYSQL_USER:-root}"
MYSQL_PASS="${MYSQL_PASS:-root}"
# Embedding provider configuration (for phase 4/5)
EMBEDDING_PROVIDER="${EMBEDDING_PROVIDER:-stub}"
EMBEDDING_DIM="${EMBEDDING_DIM:-1536}"
OPENAI_API_BASE="${OPENAI_API_BASE:-}"
OPENAI_API_KEY="${OPENAI_API_KEY:-}"
OPENAI_MODEL="${OPENAI_MODEL:-hf:nomic-ai/nomic-embed-text-v1.5}"
OPENAI_EMBEDDING_DIM="${OPENAI_EMBEDDING_DIM:-}"
if [[ -z "${OPENAI_EMBEDDING_DIM}" ]]; then
if [[ "${OPENAI_MODEL}" == "hf:nomic-ai/nomic-embed-text-v1.5" ]]; then
OPENAI_EMBEDDING_DIM=768
else
OPENAI_EMBEDDING_DIM="${EMBEDDING_DIM}"
fi
fi
# Uncomment (and fill in) to test OpenAI-compatible embeddings; leaving these
# unset skips the optional phase 5 below.
# export EMBEDDING_PROVIDER=openai
# export EMBEDDING_DIM=1536
# export OPENAI_API_BASE="https://api.synthetic.new/openai/v1"
# export OPENAI_API_KEY="your_api_key_here"
# export OPENAI_MODEL="hf:nomic-ai/nomic-embed-text-v1.5"
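# Alternatively, the OpenAI-compatible test (phase 5) can be driven without
# editing this script by setting the variables in the environment, e.g.
# (endpoint, key and model below are placeholders):
#   OPENAI_API_BASE=https://example.invalid/v1 OPENAI_API_KEY=sk-placeholder \
#   OPENAI_MODEL=hf:nomic-ai/nomic-embed-text-v1.5 bash <this-script>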
DB1="${ROOT_DIR}/rag_ingest_test.db"
DB_OPENAI="${ROOT_DIR}/rag_ingest_test_openai.db"
VEC_EXT="${REPO_ROOT}/deps/sqlite3/sqlite3/vec0.so"
export RAG_VEC0_EXT="${VEC_EXT}"
if [[ ! -f "${VEC_EXT}" ]]; then
echo "FATAL: vec0.so not found at ${VEC_EXT}" >&2
exit 1
fi
run_sqlite() {
local db="$1"
local sql="$2"
"${SQLITE_BIN}" "${db}" <<SQL
.load ${VEC_EXT}
${sql}
SQL
}
apply_schema_and_source() {
local db="$1"
local where_sql="$2"
local load_schema="$3"
local chunking_json_override="${4:-}"
local embedding_json_override="${5:-}"
local schema_override_path="${6:-}"
local schema_cmd=""
if [[ "${load_schema}" == "true" ]]; then
if [[ -n "${schema_override_path}" ]]; then
schema_cmd=".read ${schema_override_path}"$'\n'".read ${ROOT_DIR}/sample_sqlite.sql"
else
schema_cmd=".read ${ROOT_DIR}/schema.sql"$'\n'".read ${ROOT_DIR}/sample_sqlite.sql"
fi
fi
echo "==> SQLite DB: ${db}"
echo "==> load_schema: ${load_schema}"
echo "==> where_sql: ${where_sql:-<empty>}"
local chunking_json_value='{"enabled":false,"unit":"chars","chunk_size":4000,"overlap":400,"min_chunk_size":800}'
if [[ -n "${chunking_json_override}" ]]; then
chunking_json_value="${chunking_json_override}"
fi
echo "==> chunking_json: ${chunking_json_value}"
local embedding_json_value='{"enabled":false}'
if [[ -n "${embedding_json_override}" ]]; then
embedding_json_value="${embedding_json_override}"
fi
echo "==> embedding_json: ${embedding_json_value}"
"${SQLITE_BIN}" "${db}" <<SQL
.load ${VEC_EXT}
.bail on
.mode list
.separator |
.nullvalue NULL
${schema_cmd}
UPDATE rag_sources
SET chunking_json='${chunking_json_value}'
WHERE source_id=1;
UPDATE rag_sources
SET embedding_json='${embedding_json_value}'
WHERE source_id=1;
UPDATE rag_sources
SET where_sql='${where_sql}'
WHERE source_id=1;
SQL
}
import_mysql_seed() {
"${MYSQL_BIN}" \
-h"${MYSQL_HOST}" -P"${MYSQL_PORT}" \
-u"${MYSQL_USER}" -p"${MYSQL_PASS}" \
< "${ROOT_DIR}/sample_mysql.sql"
}
run_mysql_sql() {
local sql="$1"
"${MYSQL_BIN}" \
-h"${MYSQL_HOST}" -P"${MYSQL_PORT}" \
-u"${MYSQL_USER}" -p"${MYSQL_PASS}" \
-e "${sql}"
}
assert_eq() {
local label="$1"
local expected="$2"
local actual="$3"
if [[ "${expected}" != "${actual}" ]]; then
echo "FAIL: ${label} expected ${expected}, got ${actual}" >&2
exit 1
fi
echo "OK: ${label} = ${actual}"
}
fts_count() {
local db="$1"
local q="$2"
run_sqlite "${db}" "SELECT COUNT(*) FROM rag_fts_chunks WHERE rag_fts_chunks MATCH '${q}';"
}
fts_bm25_top() {
local db="$1"
local q="$2"
run_sqlite "${db}" "SELECT chunk_id FROM rag_fts_chunks WHERE rag_fts_chunks MATCH '${q}' ORDER BY bm25(rag_fts_chunks) LIMIT 1;"
}
vec_self_match() {
local db="$1"
local chunk_id="$2"
run_sqlite "${db}" "SELECT chunk_id FROM rag_vec_chunks WHERE embedding MATCH (SELECT embedding FROM rag_vec_chunks WHERE chunk_id='${chunk_id}') ORDER BY distance LIMIT 1;"
}
print_samples() {
local db="$1"
echo "==> Sample rag_documents"
run_sqlite "${db}" "SELECT doc_id, source_id, substr(title,1,40) AS title, json_extract(metadata_json,'$.Score') AS score FROM rag_documents ORDER BY doc_id LIMIT 5;"
echo "==> Sample rag_chunks"
run_sqlite "${db}" "SELECT chunk_id, doc_id, chunk_index, substr(body,1,50) AS body FROM rag_chunks ORDER BY chunk_id LIMIT 5;"
echo "==> Sample rag_fts_chunks matches for 'ProxySQL'"
run_sqlite "${db}" "SELECT chunk_id, substr(title,1,40) AS title FROM rag_fts_chunks WHERE rag_fts_chunks MATCH 'ProxySQL' ORDER BY chunk_id LIMIT 5;"
}
cleanup_db() {
rm -f "${DB1}"
rm -f "${DB_OPENAI}"
}
cleanup_db
# Phase 1: load schema + source, chunking disabled, no where filter
apply_schema_and_source "${DB1}" "" "true"
# Seed MySQL
import_mysql_seed
# Run rag_ingest
"${ROOT_DIR}/rag_ingest" "${DB1}"
# Validate counts (sample_mysql has 10 rows)
DOCS_COUNT="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_documents;")"
CHUNKS_COUNT="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_chunks;")"
FTS_COUNT="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_fts_chunks;")"
VEC_COUNT="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_vec_chunks;")"
assert_eq "rag_documents" "10" "${DOCS_COUNT}"
assert_eq "rag_chunks (chunking disabled)" "10" "${CHUNKS_COUNT}"
assert_eq "rag_fts_chunks" "10" "${FTS_COUNT}"
assert_eq "rag_vec_chunks (embedding disabled)" "0" "${VEC_COUNT}"
print_samples "${DB1}"
# FTS tests (phase 1)
FTS_PHRASE_1="$(fts_count "${DB1}" '"ProxySQL adds MCP"')"
FTS_SHORT_1="$(fts_count "${DB1}" 'Short')"
FTS_TAG_1="$(fts_count "${DB1}" 'Tag')"
FTS_BM25_1="$(fts_bm25_top "${DB1}" 'ProxySQL')"
assert_eq "fts phrase (ProxySQL adds MCP)" "1" "${FTS_PHRASE_1}"
assert_eq "fts term (Short)" "1" "${FTS_SHORT_1}"
assert_eq "fts term (Tag)" "1" "${FTS_TAG_1}"
assert_eq "fts bm25 top (ProxySQL)" "posts:3#0" "${FTS_BM25_1}"
# Phase 1a: update skip behavior (existing docs are not updated)
run_mysql_sql "USE rag_test; UPDATE posts SET Title='Hello RAG UPDATED' WHERE Id=1;"
"${ROOT_DIR}/rag_ingest" "${DB1}"
TITLE_AFTER_UPDATE="$(run_sqlite "${DB1}" "SELECT title FROM rag_documents WHERE doc_id='posts:1';")"
assert_eq "rag_documents title unchanged on update" "Hello RAG" "${TITLE_AFTER_UPDATE}"
# Reset MySQL data after update test
import_mysql_seed
# Phase 1b: rag_sync_state watermark (incremental ingestion)
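# (rag_ingest records its progress per source in rag_sync_state.cursor_json,
#  e.g. {"column":"Id","value":"10"}; on the next run only rows beyond that
#  watermark are fetched, which the assertions below depend on.)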
SYNC_COL_1="$(run_sqlite "${DB1}" "SELECT json_extract(cursor_json,'$.column') FROM rag_sync_state WHERE source_id=1;")"
SYNC_VAL_1="$(run_sqlite "${DB1}" "SELECT json_extract(cursor_json,'$.value') FROM rag_sync_state WHERE source_id=1;")"
assert_eq "rag_sync_state column" "Id" "${SYNC_COL_1}"
assert_eq "rag_sync_state value (initial)" "10" "${SYNC_VAL_1}"
# Delete one doc to verify watermark prevents backfill
run_sqlite "${DB1}" "DELETE FROM rag_vec_chunks WHERE chunk_id LIKE 'posts:5#%';"
run_sqlite "${DB1}" "DELETE FROM rag_fts_chunks WHERE chunk_id LIKE 'posts:5#%';"
run_sqlite "${DB1}" "DELETE FROM rag_chunks WHERE doc_id='posts:5';"
run_sqlite "${DB1}" "DELETE FROM rag_documents WHERE doc_id='posts:5';"
DOCS_AFTER_DELETE="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_documents;")"
assert_eq "rag_documents after delete" "9" "${DOCS_AFTER_DELETE}"
"${ROOT_DIR}/rag_ingest" "${DB1}"
DOCS_AFTER_REINGEST="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_documents;")"
CHUNKS_AFTER_REINGEST="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_chunks;")"
FTS_AFTER_REINGEST="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_fts_chunks;")"
assert_eq "rag_documents after watermark reingest" "9" "${DOCS_AFTER_REINGEST}"
assert_eq "rag_chunks after watermark reingest" "9" "${CHUNKS_AFTER_REINGEST}"
assert_eq "rag_fts_chunks after watermark reingest" "9" "${FTS_AFTER_REINGEST}"
# Insert a new source row and ensure only it is ingested
run_mysql_sql "USE rag_test; INSERT INTO posts (Id, Title, Body, Tags, Score, CreationDate, UpdatedAt) VALUES (11, 'Watermark New', 'This row should be ingested via watermark.', 'wm,test', 1, '2024-01-14 10:00:00', '2024-01-14 11:00:00');"
"${ROOT_DIR}/rag_ingest" "${DB1}"
DOCS_AFTER_NEW="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_documents;")"
SYNC_VAL_2="$(run_sqlite "${DB1}" "SELECT json_extract(cursor_json,'$.value') FROM rag_sync_state WHERE source_id=1;")"
assert_eq "rag_documents after new row" "10" "${DOCS_AFTER_NEW}"
assert_eq "rag_sync_state value (after new row)" "11" "${SYNC_VAL_2}"
# Reset sync state for subsequent phases
run_sqlite "${DB1}" "DELETE FROM rag_sync_state;"
# Reset MySQL data after watermark insert
import_mysql_seed
# Phase 1c: UpdatedAt-based watermark filtering
run_sqlite "${DB1}" "DELETE FROM rag_vec_chunks;"
run_sqlite "${DB1}" "DELETE FROM rag_fts_chunks;"
run_sqlite "${DB1}" "DELETE FROM rag_chunks;"
run_sqlite "${DB1}" "DELETE FROM rag_documents;"
run_sqlite "${DB1}" "INSERT OR REPLACE INTO rag_sync_state(source_id, mode, cursor_json, last_ok_at, last_error) VALUES (1, 'poll', '{\"column\":\"UpdatedAt\",\"value\":\"2024-01-10 10:00:00\"}', NULL, NULL);"
"${ROOT_DIR}/rag_ingest" "${DB1}"
DOCS_UPDATED_AT="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_documents;")"
SYNC_UPDATED_AT="$(run_sqlite "${DB1}" "SELECT json_extract(cursor_json,'$.value') FROM rag_sync_state WHERE source_id=1;")"
assert_eq "rag_documents (UpdatedAt watermark)" "1" "${DOCS_UPDATED_AT}"
assert_eq "rag_sync_state value (UpdatedAt)" "2024-01-12 09:30:00" "${SYNC_UPDATED_AT}"
# Reset sync state for subsequent phases
run_sqlite "${DB1}" "DELETE FROM rag_sync_state;"
# Phase 2: apply where filter, re-ingest after cleanup
run_sqlite "${DB1}" "DELETE FROM rag_vec_chunks;"
run_sqlite "${DB1}" "DELETE FROM rag_fts_chunks;"
run_sqlite "${DB1}" "DELETE FROM rag_chunks;"
run_sqlite "${DB1}" "DELETE FROM rag_documents;"
apply_schema_and_source "${DB1}" "Score >= 7" "false"
"${ROOT_DIR}/rag_ingest" "${DB1}"
DOCS_COUNT_2="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_documents;")"
CHUNKS_COUNT_2="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_chunks;")"
FTS_COUNT_2="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_fts_chunks;")"
VEC_COUNT_2="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_vec_chunks;")"
# In sample_mysql: Score >= 7 matches Id 1,3,5,7,9 => 5 docs
assert_eq "rag_documents (where_sql)" "5" "${DOCS_COUNT_2}"
assert_eq "rag_chunks (where_sql)" "5" "${CHUNKS_COUNT_2}"
assert_eq "rag_fts_chunks (where_sql)" "5" "${FTS_COUNT_2}"
assert_eq "rag_vec_chunks (where_sql, embedding disabled)" "0" "${VEC_COUNT_2}"
print_samples "${DB1}"
# FTS tests (phase 2)
FTS_PROXYSQL_2="$(fts_count "${DB1}" 'ProxySQL')"
FTS_HIGH_2="$(fts_count "${DB1}" 'High')"
FTS_LOW_2="$(fts_count "${DB1}" 'Low')"
FTS_BM25_2="$(fts_bm25_top "${DB1}" 'High')"
assert_eq "fts term (ProxySQL)" "1" "${FTS_PROXYSQL_2}"
assert_eq "fts term (High)" "1" "${FTS_HIGH_2}"
assert_eq "fts term (Low)" "0" "${FTS_LOW_2}"
assert_eq "fts bm25 top (High)" "posts:9#0" "${FTS_BM25_2}"
# Phase 3: enable chunking and ensure rows split into multiple chunks
run_sqlite "${DB1}" "DELETE FROM rag_sync_state;"
run_sqlite "${DB1}" "DELETE FROM rag_vec_chunks;"
run_sqlite "${DB1}" "DELETE FROM rag_fts_chunks;"
run_sqlite "${DB1}" "DELETE FROM rag_chunks;"
run_sqlite "${DB1}" "DELETE FROM rag_documents;"
apply_schema_and_source "${DB1}" "" "false" '{"enabled":true,"unit":"chars","chunk_size":50,"overlap":10,"min_chunk_size":10}'
"${ROOT_DIR}/rag_ingest" "${DB1}"
DOCS_COUNT_3="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_documents;")"
CHUNKS_COUNT_3="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_chunks;")"
LONG_DOC_CHUNKS="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_chunks WHERE doc_id='posts:5';")"
assert_eq "rag_documents (chunking enabled)" "10" "${DOCS_COUNT_3}"
if [[ "${CHUNKS_COUNT_3}" -le "${DOCS_COUNT_3}" ]]; then
echo "FAIL: rag_chunks should be greater than rag_documents when chunking enabled" >&2
exit 1
fi
if [[ "${LONG_DOC_CHUNKS}" -le "1" ]]; then
echo "FAIL: posts:5 should produce multiple chunks" >&2
exit 1
fi
print_samples "${DB1}"
# Phase 4: enable embeddings (stub) and validate vec rows
run_sqlite "${DB1}" "DELETE FROM rag_sync_state;"
run_sqlite "${DB1}" "DELETE FROM rag_vec_chunks;"
run_sqlite "${DB1}" "DELETE FROM rag_fts_chunks;"
run_sqlite "${DB1}" "DELETE FROM rag_chunks;"
run_sqlite "${DB1}" "DELETE FROM rag_documents;"
apply_schema_and_source "${DB1}" "" "false" '' "{\"enabled\":true,\"provider\":\"${EMBEDDING_PROVIDER}\",\"dim\":${EMBEDDING_DIM},\"input\":{\"concat\":[{\"col\":\"Title\"},{\"lit\":\"\\n\"},{\"chunk_body\":true}]}}"
"${ROOT_DIR}/rag_ingest" "${DB1}"
DOCS_COUNT_4="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_documents;")"
CHUNKS_COUNT_4="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_chunks;")"
VEC_COUNT_4="$(run_sqlite "${DB1}" "SELECT COUNT(*) FROM rag_vec_chunks;")"
assert_eq "rag_documents (embeddings enabled)" "10" "${DOCS_COUNT_4}"
assert_eq "rag_chunks (embeddings enabled)" "10" "${CHUNKS_COUNT_4}"
assert_eq "rag_vec_chunks (embeddings enabled)" "10" "${VEC_COUNT_4}"
VEC_MATCH_1="$(vec_self_match "${DB1}" 'posts:1#0')"
assert_eq "vec self-match (posts:1#0)" "posts:1#0" "${VEC_MATCH_1}"
print_samples "${DB1}"
# Phase 5: optional OpenAI-compatible embeddings test (requires env vars)
if [[ -n "${OPENAI_API_BASE}" && -n "${OPENAI_API_KEY}" ]]; then
OPENAI_SCHEMA_TMP="${ROOT_DIR}/schema_openai_tmp.sql"
sed "s/embedding float\[1536\]/embedding float[${OPENAI_EMBEDDING_DIM}]/" "${ROOT_DIR}/schema.sql" > "${OPENAI_SCHEMA_TMP}"
apply_schema_and_source "${DB_OPENAI}" "" "true" '' "{\"enabled\":true,\"provider\":\"openai\",\"api_base\":\"${OPENAI_API_BASE}\",\"api_key\":\"${OPENAI_API_KEY}\",\"model\":\"${OPENAI_MODEL}\",\"dim\":${OPENAI_EMBEDDING_DIM},\"input\":{\"concat\":[{\"col\":\"Title\"},{\"lit\":\"\\n\"},{\"chunk_body\":true}]}}" "${OPENAI_SCHEMA_TMP}"
"${ROOT_DIR}/rag_ingest" "${DB_OPENAI}"
DOCS_COUNT_5="$(run_sqlite "${DB_OPENAI}" "SELECT COUNT(*) FROM rag_documents;")"
CHUNKS_COUNT_5="$(run_sqlite "${DB_OPENAI}" "SELECT COUNT(*) FROM rag_chunks;")"
VEC_COUNT_5="$(run_sqlite "${DB_OPENAI}" "SELECT COUNT(*) FROM rag_vec_chunks;")"
assert_eq "rag_documents (openai embeddings)" "10" "${DOCS_COUNT_5}"
assert_eq "rag_chunks (openai embeddings)" "10" "${CHUNKS_COUNT_5}"
assert_eq "rag_vec_chunks (openai embeddings)" "10" "${VEC_COUNT_5}"
print_samples "${DB_OPENAI}"
rm -f "${OPENAI_SCHEMA_TMP}"
else
echo "==> OpenAI embeddings test skipped (set OPENAI_API_BASE and OPENAI_API_KEY)"
fi
echo "All tests passed."

@@ -287,31 +287,55 @@ std::string MySQL_Catalog::search(
int limit,
int offset
) {
// Build SQL query with parameterized conditions to prevent SQL injection
std::ostringstream sql;
sql << "SELECT schema, kind, key, document, tags , links FROM catalog WHERE 1=1";
// FTS5 search requires a query
if (query.empty()) {
proxy_error("Catalog search requires a query parameter\n");
nlohmann::json error_result = {{"error", "Catalog search requires a query parameter"}};
return error_result.dump();
}
// Helper lambda to escape single quotes for SQLite SQL literals
auto escape_sql = [](const std::string& str) -> std::string {
std::string result;
result.reserve(str.length() * 2); // Reserve space for potential escaping
for (char c : str) {
if (c == '\'') {
result += '\''; // Escape single quote by doubling it
}
result += c;
}
return result;
};
bool has_schema = !schema.empty();
bool has_kind = !kind.empty();
bool has_tags = !tags.empty();
bool has_query = !query.empty();
// Escape query for use in FTS5 MATCH (MATCH doesn't support parameter binding)
std::string escaped_query = escape_sql(query);
if (has_schema) {
sql << " AND schema = ?";
}
if (has_kind) {
sql << " AND kind = ?";
// Build SQL query with FTS5 - include schema column
std::ostringstream sql;
sql << "SELECT c.schema, c.kind, c.key, c.document, c.tags, c.links "
<< "FROM catalog c "
<< "INNER JOIN catalog_fts f ON c.id = f.rowid "
<< "WHERE catalog_fts MATCH '" << escaped_query << "'";
// Add schema filter
if (!schema.empty()) {
sql << " AND c.schema = ?";
}
if (has_tags) {
sql << " AND tags LIKE ?";
// Add kind filter
if (!kind.empty()) {
sql << " AND c.kind = ?";
}
if (has_query) {
sql << " AND (key LIKE ? OR document LIKE ? OR tags LIKE ?)";
// Add tags filter
if (!tags.empty()) {
sql << " AND c.tags LIKE ?";
}
sql << " ORDER BY updated_at DESC LIMIT ? OFFSET ?";
// Order by relevance (BM25) and recency
sql << " ORDER BY bm25(f) ASC, c.updated_at DESC LIMIT ? OFFSET ?";
// Prepare statement
// Prepare the statement
sqlite3_stmt* stmt = NULL;
int rc = db->prepare_v2(sql.str().c_str(), &stmt);
if (rc != SQLITE_OK) {
@@ -321,24 +345,16 @@ std::string MySQL_Catalog::search(
// Bind parameters
int param_idx = 1;
if (has_schema) {
std::string schema_pattern = schema;
(*proxy_sqlite3_bind_text)(stmt, param_idx++, schema_pattern.c_str(), -1, SQLITE_TRANSIENT);
if (!schema.empty()) {
(*proxy_sqlite3_bind_text)(stmt, param_idx++, schema.c_str(), -1, SQLITE_TRANSIENT);
}
if (has_kind) {
std::string kind_pattern = kind;
(*proxy_sqlite3_bind_text)(stmt, param_idx++, kind_pattern.c_str(), -1, SQLITE_TRANSIENT);
if (!kind.empty()) {
(*proxy_sqlite3_bind_text)(stmt, param_idx++, kind.c_str(), -1, SQLITE_TRANSIENT);
}
if (has_tags) {
if (!tags.empty()) {
std::string tags_pattern = "%" + tags + "%";
(*proxy_sqlite3_bind_text)(stmt, param_idx++, tags_pattern.c_str(), -1, SQLITE_TRANSIENT);
}
if (has_query) {
std::string query_pattern = "%" + query + "%";
(*proxy_sqlite3_bind_text)(stmt, param_idx++, query_pattern.c_str(), -1, SQLITE_TRANSIENT);
(*proxy_sqlite3_bind_text)(stmt, param_idx++, query_pattern.c_str(), -1, SQLITE_TRANSIENT);
(*proxy_sqlite3_bind_text)(stmt, param_idx++, query_pattern.c_str(), -1, SQLITE_TRANSIENT);
}
(*proxy_sqlite3_bind_int)(stmt, param_idx++, limit);
(*proxy_sqlite3_bind_int)(stmt, param_idx++, offset);
@@ -349,6 +365,8 @@ std::string MySQL_Catalog::search(
int step_rc;
while ((step_rc = (*proxy_sqlite3_step)(stmt)) == SQLITE_ROW) {
nlohmann::json entry;
// Columns: 0=schema, 1=kind, 2=key, 3=document, 4=tags, 5=links
entry["schema"] = std::string((const char*)(*proxy_sqlite3_column_text)(stmt, 0));
entry["kind"] = std::string((const char*)(*proxy_sqlite3_column_text)(stmt, 1));
entry["key"] = std::string((const char*)(*proxy_sqlite3_column_text)(stmt, 2));
@@ -366,8 +384,10 @@ std::string MySQL_Catalog::search(
entry["document"] = nullptr;
}
entry["tags"] = std::string((const char*)(*proxy_sqlite3_column_text)(stmt, 4));
entry["links"] = std::string((const char*)(*proxy_sqlite3_column_text)(stmt, 5));
const char* tags_str = (const char*)(*proxy_sqlite3_column_text)(stmt, 4);
entry["tags"] = tags_str ? std::string(tags_str) : nullptr;
const char* links_str = (const char*)(*proxy_sqlite3_column_text)(stmt, 5);
entry["links"] = links_str ? std::string(links_str) : nullptr;
results.push_back(entry);
}
@@ -486,8 +506,10 @@ std::string MySQL_Catalog::list(
entry["document"] = nullptr;
}
entry["tags"] = std::string((const char*)(*proxy_sqlite3_column_text)(stmt, 4));
entry["links"] = std::string((const char*)(*proxy_sqlite3_column_text)(stmt, 5));
const char* tags_str = (const char*)(*proxy_sqlite3_column_text)(stmt, 4);
entry["tags"] = tags_str ? std::string(tags_str) : nullptr;
const char* links_str = (const char*)(*proxy_sqlite3_column_text)(stmt, 5);
entry["links"] = links_str ? std::string(links_str) : nullptr;
results.push_back(entry);
}
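
For reference, with a non-empty query plus schema and tags filters, the statement assembled by the rewritten search() above comes out roughly as follows (the MATCH term is escaped and inlined because FTS5 MATCH is not parameter-bound; the remaining values are bound at the '?' placeholders; 'orders' is just an example search term):

SELECT c.schema, c.kind, c.key, c.document, c.tags, c.links
FROM catalog c
INNER JOIN catalog_fts f ON c.id = f.rowid
WHERE catalog_fts MATCH 'orders'
AND c.schema = ?
AND c.tags LIKE ?
ORDER BY bm25(f) ASC, c.updated_at DESC LIMIT ? OFFSET ?;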

@@ -216,6 +216,14 @@ ProxySQL_MCP_Server::~ProxySQL_MCP_Server() {
// Clean up all tool handlers stored in the handler object
if (handler) {
// Clean up MySQL Tool Handler
if (handler->mysql_tool_handler) {
proxy_info("Cleaning up MySQL Tool Handler...\n");
delete handler->mysql_tool_handler;
handler->mysql_tool_handler = NULL;
}
// Clean up Config Tool Handler
if (handler->config_tool_handler) {
proxy_info("Cleaning up Config Tool Handler...\n");
