feat(rag): incremental sync with hash-based update and delete detection (Feature D)

Upgrade RAG ingestion from insert-only to full incremental sync:
detect updated rows, re-chunk and re-embed changed documents, and
soft-delete documents no longer present in the source.

Schema change:
- Add content_hash VARCHAR(64) column to rag_documents
- Idempotent ALTER TABLE migration for existing databases

SHA-256 content hashing:
- compute_content_hash() using OpenSSL SHA256 over title|body|metadata_json
- Hash computed per row, compared against stored hash to detect changes

Update detection (replaces skip-existing):
- Query existing content_hash for each doc_id
- If hash matches: skip (unchanged)
- If hash differs: soft-delete old doc + chunks + FTS + vec entries,
  then insert new version with updated content_hash
- If new doc: insert with content_hash

Delete detection:
- After processing all rows for a source, query all active doc_ids
  for that source_id and compare against seen_doc_ids set
- Documents not seen in this batch are soft-deleted (deleted=1)
  with their chunks, FTS entries, and vector entries cleaned up

Logging:
- Track ingested_docs, updated_docs, skipped_docs separately
- Report soft-deleted count per source
issue-5729-stats-projection-abi
Rene Cannao 2 weeks ago
parent 1fe9c3a805
commit 1bb79135c0

@@ -129,6 +129,7 @@
#include <vector>
#include <unordered_map>
#include <optional>
#include <set>
#include <getopt.h>
#include "json.hpp"
@@ -477,6 +478,19 @@ static std::string sql_escape_single_quotes(const std::string& s) {
return out;
}
#include <openssl/sha.h>
static std::string compute_content_hash(const std::string& input) {
unsigned char hash[SHA256_DIGEST_LENGTH];
SHA256(reinterpret_cast<const unsigned char*>(input.c_str()), input.size(), hash);
char hex[SHA256_DIGEST_LENGTH * 2 + 1];
for (int i = 0; i < SHA256_DIGEST_LENGTH; i++) {
snprintf(hex + i * 2, 3, "%02x", hash[i]);
}
return std::string(hex);
}
// Serialize a JSON value on a single line (dump() with its default
// arguments produces no indentation or newlines), suitable for storage
// in a TEXT column such as metadata_json.
static std::string json_dump_compact(const json& j) {
	std::string compact = j.dump();
	return compact;
}
@@ -1015,7 +1029,8 @@ static void insert_doc(MySQLDB& db,
const std::string& pk_json,
const std::string& title,
const std::string& body,
const std::string& meta_json) {
const std::string& meta_json,
const std::string& content_hash) {
g_logger.debug(std::string("Inserting document: ") + doc_id +
", title_length=" + std::to_string(title.size()) +
", body_length=" + std::to_string(body.size()));
@@ -1026,12 +1041,12 @@ static void insert_doc(MySQLDB& db,
std::string e_title = sql_escape_single_quotes(title);
std::string e_body = sql_escape_single_quotes(body);
std::string e_meta = sql_escape_single_quotes(meta_json);
std::string e_hash = sql_escape_single_quotes(content_hash);
// Use std::ostringstream to avoid fixed buffer size issues
std::ostringstream sql;
sql << "INSERT INTO rag_documents(doc_id, source_id, source_name, pk_json, title, body, metadata_json) "
sql << "INSERT INTO rag_documents(doc_id, source_id, source_name, pk_json, title, body, metadata_json, content_hash) "
<< "VALUES('" << e_doc_id << "', " << source_id << ", '" << e_source_name << "', '"
<< e_pk_json << "', '" << e_title << "', '" << e_body << "', '" << e_meta << "')";
<< e_pk_json << "', '" << e_title << "', '" << e_body << "', '" << e_meta << "', '" << e_hash << "')";
db.execute(sql.str().c_str());
g_logger.trace("Document inserted successfully");
@@ -1617,9 +1632,11 @@ static void ingest_source(MySQLDB& db, const RagSource& src) {
std::uint64_t ingested_docs = 0;
std::uint64_t skipped_docs = 0;
std::uint64_t updated_docs = 0;
std::uint64_t total_chunks = 0;
std::uint64_t embedding_batches = 0;
std::vector<PendingEmbedding> pending_embeddings;
std::set<std::string> seen_doc_ids;
MYSQL_ROW r;
bool max_set = false;
@@ -1670,14 +1687,47 @@ static void ingest_source(MySQLDB& db, const RagSource& src) {
g_logger.trace(std::string("Processing doc_id=") + doc.doc_id);
if (doc_exists(db, doc.doc_id)) {
g_logger.trace(std::string("Skipping existing doc_id=") + doc.doc_id);
skipped_docs++;
continue;
std::string content_input = doc.title + "|" + doc.body + "|" + doc.metadata_json;
std::string content_hash = compute_content_hash(content_input);
seen_doc_ids.insert(doc.doc_id);
std::string existing_hash;
bool doc_changed = false;
{
std::string escaped_id = sql_escape_single_quotes(doc.doc_id);
std::ostringstream qsql;
qsql << "SELECT content_hash FROM rag_documents WHERE doc_id = '" << escaped_id << "' AND deleted = 0 LIMIT 1";
MYSQL_RES* qres = db.query(qsql.str().c_str());
if (qres) {
MYSQL_ROW qrow = mysql_fetch_row(qres);
if (qrow && qrow[0]) {
existing_hash = qrow[0];
}
mysql_free_result(qres);
}
}
if (!existing_hash.empty()) {
if (existing_hash == content_hash) {
g_logger.trace(std::string("Document unchanged: ") + doc.doc_id);
skipped_docs++;
continue;
}
doc_changed = true;
g_logger.debug(std::string("Document changed: ") + doc.doc_id + " (hash mismatch)");
std::string escaped_id = sql_escape_single_quotes(doc.doc_id);
db.execute(("UPDATE rag_documents SET deleted = 1, updated_at = unixepoch() WHERE doc_id = '" + escaped_id + "'").c_str());
db.execute(("DELETE FROM rag_chunks WHERE doc_id = '" + escaped_id + "'").c_str());
db.execute(("DELETE FROM rag_fts_chunks WHERE chunk_id LIKE '" + escaped_id + "#%'").c_str());
db.execute(("DELETE FROM rag_vec_chunks WHERE doc_id = '" + escaped_id + "'").c_str());
} else {
g_logger.debug(std::string("New document: ") + doc.doc_id);
}
insert_doc(db, src.source_id, src.name,
doc.doc_id, doc.pk_json, doc.title, doc.body, doc.metadata_json);
doc.doc_id, doc.pk_json, doc.title, doc.body, doc.metadata_json, content_hash);
std::vector<std::string> chunks = chunk_text_chars(doc.body, ccfg);
g_logger.debug(std::string("Created ") + std::to_string(chunks.size()) + " chunks for doc_id=" + doc.doc_id);
@@ -1707,9 +1757,14 @@ static void ingest_source(MySQLDB& db, const RagSource& src) {
}
}
ingested_docs++;
if (ingested_docs % 1000 == 0) {
if (doc_changed) {
updated_docs++;
} else {
ingested_docs++;
}
if ((ingested_docs + updated_docs) % 1000 == 0) {
g_logger.info(std::string("Progress: ingested_docs=") + std::to_string(ingested_docs) +
", updated_docs=" + std::to_string(updated_docs) +
", skipped_docs=" + std::to_string(skipped_docs) +
", chunks=" + std::to_string(total_chunks));
}
@@ -1724,6 +1779,34 @@ static void ingest_source(MySQLDB& db, const RagSource& src) {
mysql_free_result(res);
mysql_close(mdb);
g_logger.info("Detecting removed documents...");
{
std::ostringstream qsql;
qsql << "SELECT doc_id FROM rag_documents WHERE source_id = " << src.source_id << " AND deleted = 0";
MYSQL_RES* dres = db.query(qsql.str().c_str());
int soft_deleted = 0;
if (dres) {
MYSQL_ROW drow;
while ((drow = mysql_fetch_row(dres)) != nullptr) {
std::string existing_doc_id = drow[0] ? drow[0] : "";
if (!existing_doc_id.empty() && seen_doc_ids.find(existing_doc_id) == seen_doc_ids.end()) {
g_logger.debug(std::string("Document removed from source: ") + existing_doc_id + " (soft deleting)");
std::string escaped_id = sql_escape_single_quotes(existing_doc_id);
db.execute(("UPDATE rag_documents SET deleted = 1, updated_at = unixepoch() WHERE doc_id = '" + escaped_id + "'").c_str());
db.execute(("DELETE FROM rag_chunks WHERE doc_id = '" + escaped_id + "'").c_str());
db.execute(("DELETE FROM rag_fts_chunks WHERE chunk_id LIKE '" + escaped_id + "#%'").c_str());
db.execute(("DELETE FROM rag_vec_chunks WHERE doc_id = '" + escaped_id + "'").c_str());
soft_deleted++;
}
}
mysql_free_result(dres);
}
if (soft_deleted > 0) {
g_logger.info(std::string("Soft-deleted ") + std::to_string(soft_deleted) +
" documents no longer in source '" + src.name + "'");
}
}
g_logger.info("Updating sync state with new watermark...");
if (!cursor_json.is_object()) cursor_json = json::object();
if (!cursor.column.empty()) cursor_json["column"] = cursor.column;
@@ -1740,6 +1823,7 @@ static void ingest_source(MySQLDB& db, const RagSource& src) {
g_logger.info(std::string("=== Source ingestion complete: ") + src.name + " ===");
g_logger.info(std::string(" ingested_docs=") + std::to_string(ingested_docs) +
", updated_docs=" + std::to_string(updated_docs) +
", skipped_docs=" + std::to_string(skipped_docs) +
", total_chunks=" + std::to_string(total_chunks));
if (ecfg.enabled) {
@@ -1846,11 +1930,14 @@ static bool init_schema(MySQLDB& db, int vec_dim = 1536) {
" title TEXT,"
" body TEXT,"
" metadata_json TEXT NOT NULL DEFAULT '{}',"
" content_hash VARCHAR(64),"
" updated_at INTEGER NOT NULL DEFAULT (unixepoch()),"
" deleted INTEGER NOT NULL DEFAULT 0"
")"
);
db.execute("ALTER TABLE rag_documents ADD COLUMN content_hash VARCHAR(64)");
db.execute("CREATE INDEX IF NOT EXISTS idx_rag_documents_source_updated ON rag_documents(source_id, updated_at)");
db.execute("CREATE INDEX IF NOT EXISTS idx_rag_documents_source_deleted ON rag_documents(source_id, deleted)");
g_logger.trace("rag_documents table created");

Loading…
Cancel
Save