From 1bb79135c0eeccb05b564fba075efc35f95b5192 Mon Sep 17 00:00:00 2001 From: Rene Cannao Date: Mon, 4 May 2026 03:20:15 +0000 Subject: [PATCH] feat(rag): incremental sync with hash-based update and delete detection (Feature D) Upgrade RAG ingestion from insert-only to full incremental sync: detect updated rows, re-chunk and re-embed changed documents, and soft-delete documents no longer present in the source. Schema change: - Add content_hash VARCHAR(64) column to rag_documents - Idempotent ALTER TABLE migration for existing databases SHA-256 content hashing: - compute_content_hash() using OpenSSL SHA256 over title|body|metadata - Hash computed per row, compared against stored hash to detect changes Update detection (replaces skip-existing): - Query existing content_hash for each doc_id - If hash matches: skip (unchanged) - If hash differs: soft-delete old doc + chunks + FTS + vec entries, then insert new version with updated content_hash - If new doc: insert with content_hash Delete detection: - After processing all rows for a source, query all active doc_ids for that source_id and compare against seen_doc_ids set - Documents not seen in this batch are soft-deleted (deleted=1) with their chunks, FTS entries, and vector entries cleaned up Logging: - Track ingested_docs, updated_docs, skipped_docs separately - Report soft-deleted count per source --- RAG_POC/rag_ingest.cpp | 109 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 98 insertions(+), 11 deletions(-) diff --git a/RAG_POC/rag_ingest.cpp b/RAG_POC/rag_ingest.cpp index fd64ae173..99daadea6 100644 --- a/RAG_POC/rag_ingest.cpp +++ b/RAG_POC/rag_ingest.cpp @@ -129,6 +129,7 @@ #include #include #include +#include #include #include "json.hpp" @@ -477,6 +478,19 @@ static std::string sql_escape_single_quotes(const std::string& s) { return out; } +#include + +static std::string compute_content_hash(const std::string& input) { + unsigned char hash[SHA256_DIGEST_LENGTH]; + 
SHA256(reinterpret_cast<const unsigned char*>(input.c_str()), input.size(), hash);
max_set = false; @@ -1670,14 +1687,47 @@ static void ingest_source(MySQLDB& db, const RagSource& src) { g_logger.trace(std::string("Processing doc_id=") + doc.doc_id); - if (doc_exists(db, doc.doc_id)) { - g_logger.trace(std::string("Skipping existing doc_id=") + doc.doc_id); - skipped_docs++; - continue; + std::string content_input = doc.title + "|" + doc.body + "|" + doc.metadata_json; + std::string content_hash = compute_content_hash(content_input); + + seen_doc_ids.insert(doc.doc_id); + + std::string existing_hash; + bool doc_changed = false; + { + std::string escaped_id = sql_escape_single_quotes(doc.doc_id); + std::ostringstream qsql; + qsql << "SELECT content_hash FROM rag_documents WHERE doc_id = '" << escaped_id << "' AND deleted = 0 LIMIT 1"; + MYSQL_RES* qres = db.query(qsql.str().c_str()); + if (qres) { + MYSQL_ROW qrow = mysql_fetch_row(qres); + if (qrow && qrow[0]) { + existing_hash = qrow[0]; + } + mysql_free_result(qres); + } + } + + if (!existing_hash.empty()) { + if (existing_hash == content_hash) { + g_logger.trace(std::string("Document unchanged: ") + doc.doc_id); + skipped_docs++; + continue; + } + doc_changed = true; + g_logger.debug(std::string("Document changed: ") + doc.doc_id + " (hash mismatch)"); + + std::string escaped_id = sql_escape_single_quotes(doc.doc_id); + db.execute(("UPDATE rag_documents SET deleted = 1, updated_at = unixepoch() WHERE doc_id = '" + escaped_id + "'").c_str()); + db.execute(("DELETE FROM rag_chunks WHERE doc_id = '" + escaped_id + "'").c_str()); + db.execute(("DELETE FROM rag_fts_chunks WHERE chunk_id LIKE '" + escaped_id + "#%'").c_str()); + db.execute(("DELETE FROM rag_vec_chunks WHERE doc_id = '" + escaped_id + "'").c_str()); + } else { + g_logger.debug(std::string("New document: ") + doc.doc_id); } insert_doc(db, src.source_id, src.name, - doc.doc_id, doc.pk_json, doc.title, doc.body, doc.metadata_json); + doc.doc_id, doc.pk_json, doc.title, doc.body, doc.metadata_json, content_hash); std::vector chunks = 
chunk_text_chars(doc.body, ccfg); g_logger.debug(std::string("Created ") + std::to_string(chunks.size()) + " chunks for doc_id=" + doc.doc_id); @@ -1707,9 +1757,14 @@ static void ingest_source(MySQLDB& db, const RagSource& src) { } } - ingested_docs++; - if (ingested_docs % 1000 == 0) { + if (doc_changed) { + updated_docs++; + } else { + ingested_docs++; + } + if ((ingested_docs + updated_docs) % 1000 == 0) { g_logger.info(std::string("Progress: ingested_docs=") + std::to_string(ingested_docs) + + ", updated_docs=" + std::to_string(updated_docs) + ", skipped_docs=" + std::to_string(skipped_docs) + ", chunks=" + std::to_string(total_chunks)); } @@ -1724,6 +1779,34 @@ static void ingest_source(MySQLDB& db, const RagSource& src) { mysql_free_result(res); mysql_close(mdb); + g_logger.info("Detecting removed documents..."); + { + std::ostringstream qsql; + qsql << "SELECT doc_id FROM rag_documents WHERE source_id = " << src.source_id << " AND deleted = 0"; + MYSQL_RES* dres = db.query(qsql.str().c_str()); + int soft_deleted = 0; + if (dres) { + MYSQL_ROW drow; + while ((drow = mysql_fetch_row(dres)) != nullptr) { + std::string existing_doc_id = drow[0] ? 
drow[0] : ""; + if (!existing_doc_id.empty() && seen_doc_ids.find(existing_doc_id) == seen_doc_ids.end()) { + g_logger.debug(std::string("Document removed from source: ") + existing_doc_id + " (soft deleting)"); + std::string escaped_id = sql_escape_single_quotes(existing_doc_id); + db.execute(("UPDATE rag_documents SET deleted = 1, updated_at = unixepoch() WHERE doc_id = '" + escaped_id + "'").c_str()); + db.execute(("DELETE FROM rag_chunks WHERE doc_id = '" + escaped_id + "'").c_str()); + db.execute(("DELETE FROM rag_fts_chunks WHERE chunk_id LIKE '" + escaped_id + "#%'").c_str()); + db.execute(("DELETE FROM rag_vec_chunks WHERE doc_id = '" + escaped_id + "'").c_str()); + soft_deleted++; + } + } + mysql_free_result(dres); + } + if (soft_deleted > 0) { + g_logger.info(std::string("Soft-deleted ") + std::to_string(soft_deleted) + + " documents no longer in source '" + src.name + "'"); + } + } + g_logger.info("Updating sync state with new watermark..."); if (!cursor_json.is_object()) cursor_json = json::object(); if (!cursor.column.empty()) cursor_json["column"] = cursor.column; @@ -1740,6 +1823,7 @@ static void ingest_source(MySQLDB& db, const RagSource& src) { g_logger.info(std::string("=== Source ingestion complete: ") + src.name + " ==="); g_logger.info(std::string(" ingested_docs=") + std::to_string(ingested_docs) + + ", updated_docs=" + std::to_string(updated_docs) + ", skipped_docs=" + std::to_string(skipped_docs) + ", total_chunks=" + std::to_string(total_chunks)); if (ecfg.enabled) { @@ -1846,11 +1930,14 @@ static bool init_schema(MySQLDB& db, int vec_dim = 1536) { " title TEXT," " body TEXT," " metadata_json TEXT NOT NULL DEFAULT '{}'," + " content_hash VARCHAR(64)," " updated_at INTEGER NOT NULL DEFAULT (unixepoch())," " deleted INTEGER NOT NULL DEFAULT 0" ")" ); + db.execute("ALTER TABLE rag_documents ADD COLUMN content_hash VARCHAR(64)"); + db.execute("CREATE INDEX IF NOT EXISTS idx_rag_documents_source_updated ON rag_documents(source_id, updated_at)"); 
db.execute("CREATE INDEX IF NOT EXISTS idx_rag_documents_source_deleted ON rag_documents(source_id, deleted)"); g_logger.trace("rag_documents table created");