diff --git a/RAG_POC/INGEST_USAGE_GUIDE.md b/RAG_POC/INGEST_USAGE_GUIDE.md index 8e12c0894..60b99cd7a 100644 --- a/RAG_POC/INGEST_USAGE_GUIDE.md +++ b/RAG_POC/INGEST_USAGE_GUIDE.md @@ -23,6 +23,9 @@ mysql -h 127.0.0.1 -P 6030 -u root -proot rag_db < setup_source.sql # 4. Run ingestion ./rag_ingest ingest --host=127.0.0.1 --port=6030 --user=root --password=root --database=rag_db + +# 5. For detailed logging (optional) +./rag_ingest ingest --log-level=debug --host=127.0.0.1 -P 6030 -u root -p root -D rag_db ``` --- @@ -103,6 +106,42 @@ INSERT INTO rag_sources ( ## Command-Line Options +### Logging + +Control log verbosity with `--log-level` (available for all commands): + +```bash +--log-level=LEVEL +``` + +| Level | Output | Use Case | +|-------|--------|----------| +| `error` | Only errors | Production scripts, minimal logging | +| `warn` | Warnings + errors | Detect issues without verbose output | +| `info` | **Default** | Progress, statistics, key events | +| `debug` | Detailed info | SQL queries, configuration values, diagnostics | +| `trace` | Everything | Fine-grained function entry/exit, development | + +**Examples:** +```bash +# Minimal output (errors only) +./rag_ingest ingest --log-level=error --host=127.0.0.1 --port=6030 --user=root --password=root --database=rag_db + +# Default (info level) +./rag_ingest ingest --host=127.0.0.1 --port=6030 --user=root --password=root --database=rag_db + +# Detailed debugging +./rag_ingest ingest --log-level=debug --host=127.0.0.1 --port=6030 --user=root --password=root --database=rag_db + +# Maximum verbosity +./rag_ingest ingest --log-level=trace --host=127.0.0.1 --port=6030 --user=root --password=root --database=rag_db +``` + +**Output Format:** +- Timestamps: `[YYYY-MM-DD HH:MM:SS]` +- Log levels: `[ERROR]`, `[WARN]`, `[INFO]`, `[DEBUG]`, `[TRACE]` +- Color-coded (ANSI colors for terminal output) + ### init Initialize database schema. @@ -110,14 +149,19 @@ Initialize database schema. ```bash ./rag_ingest init [OPTIONS] -Options: +Common Options: -h, --host=name Connect to host (default: 127.0.0.1) -P, --port=# Port number to use (default: 6030) -u, --user=name User for login -p, --password=name Password to use -D, --database=name Database to use (required) - --vec-dim=# Vector dimension for rag_vec_chunks table (default: 1536) -?, --help Show this help message + +Logging Options: + --log-level=LEVEL Log verbosity: error, warn, info, debug, trace (default: info) + +Init Options: + --vec-dim=# Vector dimension for rag_vec_chunks table (default: 1536) ``` ### ingest @@ -127,13 +171,40 @@ Run ingestion from configured sources. ```bash ./rag_ingest ingest [OPTIONS] -Options: +Common Options: -h, --host=name Connect to host (default: 127.0.0.1) -P, --port=# Port number to use (default: 6030) -u, --user=name User for login -p, --password=name Password to use -D, --database=name Database to use (required) -?, --help Show this help message + +Logging Options: + --log-level=LEVEL Log verbosity: error, warn, info, debug, trace (default: info) +``` + +### query + +Vector similarity search using embeddings. + +```bash +./rag_ingest query [OPTIONS] + +Common Options: + -h, --host=name Connect to host (default: 127.0.0.1) + -P, --port=# Port number to use (default: 6030) + -u, --user=name User for login + -p, --password=name Password to use + -D, --database=name Database to use (required) + -?, --help Show this help message + +Logging Options: + --log-level=LEVEL Log verbosity: error, warn, info, debug, trace (default: info) + +Query Options: + -t, --text=text Query text to search for (required) + -s, --source-id=# Source ID to search (default: all enabled sources) + -l, --limit=# Maximum results to return (default: 5) ``` --- @@ -372,18 +443,121 @@ The tool tracks the last processed primary key value in `rag_sync_state`. Subseq --- +## Transaction Handling + +### Per-Source Commits + +Each data source is processed in its own transaction: + +```text +Source 1: BEGIN IMMEDIATE → ingest data → COMMIT ✅ +Source 2: BEGIN IMMEDIATE → ingest data → ROLLBACK ❌ (error occurred) +Source 3: BEGIN IMMEDIATE → ingest data → COMMIT ✅ +``` + +**Benefits:** +- **Isolated failures**: If source 2 fails, sources 1 and 3 are still committed +- **Shorter locks**: Each table is only locked during its own ingestion +- **Better recovery**: Partial progress is preserved on failures +- **Lower memory**: Changes are flushed per source instead of held until end + +### Transaction Logging + +```bash +./rag_ingest ingest -h 127.0.0.1 -P 6030 -u root -p root -D rag_index +# Output: +[INFO] Processing source 1 of 3 +[DEBUG] Starting transaction for source 1... +[INFO] Committing source 1... + +[INFO] Processing source 2 of 3 +[DEBUG] Starting transaction for source 2... +[WARN] Rolling back source 2 due to errors + +[INFO] Processing source 3 of 3 +[DEBUG] Starting transaction for source 3... +[INFO] Committing source 3... + +[INFO] === 'ingest' command complete === + Succeeded: 2 + Failed: 1 +``` + +### Multiple Sources Example + +```sql +-- Configure multiple sources +INSERT INTO rag_sources (name, enabled, backend_type, ...) +VALUES + ('stack_overflow', 1, 'mysql', '127.0.0.1', 3306, ...), + ('github_issues', 1, 'mysql', '127.0.0.1', 3306, ...), + ('discussions', 1, 'mysql', '127.0.0.1', 3306, ...); +``` + +If `github_issues` fails (e.g., connection timeout), the other two sources are still ingested successfully. + +--- + ## Monitoring Progress +### Default Logging (INFO level) + ```bash -# Progress is printed to stderr ./rag_ingest ingest -h 127.0.0.1 -P 6030 -u root -p root -D rag_index # Output: -# Ingesting source_id=1 name=my_source backend=mysql table=posts -# progress: ingested_docs=1000 skipped_docs=50 -# progress: ingested_docs=2000 skipped_docs=100 -# Done source my_source ingested_docs=2500 skipped_docs=120 +[2026-01-28 12:34:56] [INFO] === RAG Ingestion Tool Starting === +[2026-01-28 12:34:56] [INFO] Loaded 1 enabled source(s) +[2026-01-28 12:34:57] [INFO] === Starting ingestion for source_id=1, name=my_source === +[2026-01-28 12:34:58] [INFO] Backend query returned 10000 row(s) to process +[2026-01-28 12:35:00] [INFO] Progress: ingested_docs=1000, skipped_docs=50, chunks=4000 +[2026-01-28 12:35:02] [INFO] Progress: ingested_docs=2000, skipped_docs=100, chunks=8000 +[2026-01-28 12:35:10] [INFO] === Source ingestion complete: my_source === +[2026-01-28 12:35:10] [INFO] ingested_docs=9850, skipped_docs=150, total_chunks=39400 +[2026-01-28 12:35:10] [INFO] embedding_batches=2463 +``` + +### Detailed Logging (DEBUG level) + +```bash +./rag_ingest ingest --log-level=debug -h 127.0.0.1 -P 6030 -u root -p root -D rag_index +# Output includes: +# - Connection parameters +# - SQL queries executed +# - Configuration parsing (chunking, embeddings) +# - Per-document operations +# - Chunk counts per document +# - Embedding batch operations +# - Sync state updates ``` +### Maximum Verbosity (TRACE level) + +```bash +./rag_ingest ingest --log-level=trace -h 127.0.0.1 -P 6030 -u root -p root -D rag_index +# Output includes EVERYTHING: +# - Function entry/exit +# - Individual SQL statement execution +# - Per-chunk operations +# - Internal state changes +``` + +### Progress Indicators + +| Interval | Trigger | Output | +|----------|---------|--------| +| Per-command | Start/end | `=== RAG Ingestion Tool Starting ===` | +| Per-source | Start/end | `=== Starting ingestion for source_id=X, name=Y ===` | +| Every 1000 docs | During processing | `Progress: ingested_docs=1000, skipped_docs=50, chunks=4000` | +| Per-batch | Embeddings | `Generating embeddings for batch of 16 chunks...` | +| End of source | Summary | `ingested_docs=9850, skipped_docs=150, total_chunks=39400` | + +### Understanding the Output + +- **ingested_docs**: New documents added to the index +- **skipped_docs**: Documents already in the index (not re-processed) +- **total_chunks**: Total chunks created across all ingested documents +- **embedding_batches**: Number of embedding API calls made (for embedding-enabled sources) + --- ## Verification @@ -506,6 +680,42 @@ DELETE FROM rag_vec_chunks WHERE source_id=1; - Check network connectivity to embedding service - Increase `timeout_ms` if needed +### Too much / too little output + +- Use `--log-level=error` for production scripts (minimal output) +- Use `--log-level=info` for normal operation (default) +- Use `--log-level=debug` to see SQL queries and configuration values +- Use `--log-level=trace` for development and deep troubleshooting + +### Debugging SQL queries + +```bash +# Use --log-level=debug to see all SQL queries being executed +./rag_ingest ingest --log-level=debug -h 127.0.0.1 -P 6030 -u root -p root -D rag_db +# Output will include: +# - SELECT queries to rag_sources, rag_sync_state, rag_documents +# - INSERT statements for documents, chunks, FTS entries +# - Backend SELECT query being built and executed +``` + +### Checking configuration values + +```bash +# Use --log-level=debug to see parsed configuration +./rag_ingest ingest --log-level=debug -h 127.0.0.1 -P 6030 -u root -p root -D rag_db +# Output includes: +# - Chunking config: enabled=yes, chunk_size=4000, overlap=400 +# - Embedding config: enabled=yes, provider=openai, model=text-embedding-3-small +# - Watermark/resync values +``` + +### Performance issues + +- Use `--log-level=debug` to see embedding batch operations +- Check `embedding_batches` count in final summary +- Reduce `batch_size` in `embedding_json` if API timeouts occur +- Increase `timeout_ms` for slower embedding services + --- ## Architecture Notes diff --git a/RAG_POC/rag_ingest.cpp b/RAG_POC/rag_ingest.cpp index c63028317..fd64ae173 100644 --- a/RAG_POC/rag_ingest.cpp +++ b/RAG_POC/rag_ingest.cpp @@ -120,6 +120,9 @@ #include #include #include +#include +#include +#include #include #include @@ -131,6 +134,107 @@ #include "json.hpp" using json = nlohmann::json; +// =========================================================================== +// Logging Infrastructure +// =========================================================================== + +/** + * @brief Detailed logging system for rag_ingest with timestamp and log levels + * + * Log Levels: + * - ERROR: Critical errors that prevent operation + * - WARN: Warning messages for non-critical issues + * - INFO: Informational messages about operation progress + * - DEBUG: Detailed debugging information + * - TRACE: Very fine-grained tracing (function entry/exit, etc.) + */ +enum class LogLevel { + ERROR, + WARN, + INFO, + DEBUG, + TRACE +}; + +struct Logger { + LogLevel min_level = LogLevel::INFO; + bool use_colors = true; + bool show_timestamp = true; + bool show_level = true; + + // ANSI color codes + static const char* color_reset() { return "\033[0m"; } + static const char* color_red() { return "\033[31m"; } + static const char* color_yellow() { return "\033[33m"; } + static const char* color_green() { return "\033[32m"; } + static const char* color_cyan() { return "\033[36m"; } + static const char* color_gray() { return "\033[90m"; } + + static const char* level_string(LogLevel level) { + switch (level) { + case LogLevel::ERROR: return "ERROR"; + case LogLevel::WARN: return "WARN"; + case LogLevel::INFO: return "INFO"; + case LogLevel::DEBUG: return "DEBUG"; + case LogLevel::TRACE: return "TRACE"; + } + return "UNKNOWN"; + } + + static const char* level_color(LogLevel level) { + switch (level) { + case LogLevel::ERROR: return color_red(); + case LogLevel::WARN: return color_yellow(); + case LogLevel::INFO: return color_green(); + case LogLevel::DEBUG: return color_cyan(); + case LogLevel::TRACE: return color_gray(); + } + return color_reset(); + } + + bool should_log(LogLevel level) const { + return level <= min_level; + } + + void log(LogLevel level, const std::string& msg) { + if (!should_log(level)) return; + + std::ostream& out = (level == LogLevel::ERROR || level == LogLevel::WARN) ? std::cerr : std::cout; + + if (use_colors) out << level_color(level); + + if (show_timestamp) { + auto now = std::chrono::system_clock::now(); + auto time = std::chrono::system_clock::to_time_t(now); + char time_buf[64]; + struct tm timeinfo; + localtime_r(&time, &timeinfo); + std::strftime(time_buf, sizeof(time_buf), "%Y-%m-%d %H:%M:%S", &timeinfo); + out << "[" << time_buf << "] "; + } + + if (show_level) { + out << "[" << level_string(level) << "] "; + } + + out << msg; + + if (use_colors) out << color_reset(); + + out << "\n"; + out.flush(); + } + + void trace(const std::string& msg) { log(LogLevel::TRACE, msg); } + void debug(const std::string& msg) { log(LogLevel::DEBUG, msg); } + void info(const std::string& msg) { log(LogLevel::INFO, msg); } + void warn(const std::string& msg) { log(LogLevel::WARN, msg); } + void error(const std::string& msg) { log(LogLevel::ERROR, msg); } +}; + +// Global logger instance +static Logger g_logger; + // =========================================================================== // Utility Functions // =========================================================================== @@ -202,8 +306,15 @@ struct MySQLDB { * sqlite_master table. If not, logs an error and exits. */ void verify_sqlite_server() { + g_logger.info("Verifying SQLite Server connection..."); + // Try to query sqlite_master - this will only work on SQLite Server - if (mysql_query(conn, "SELECT name FROM sqlite_master LIMIT 1") != 0) { + const char* verify_sql = "SELECT name FROM sqlite_master LIMIT 1"; + g_logger.debug(std::string("Executing verification query: ") + verify_sql); + + if (mysql_query(conn, verify_sql) != 0) { + g_logger.error("SQLite Server verification failed"); + std::cerr << "\n" << "========================================\n" << "ERROR: Not connected to SQLite Server!\n" @@ -228,8 +339,13 @@ struct MySQLDB { // Free the result from the verification query MYSQL_RES* res = mysql_store_result(conn); if (res) { + my_ulonglong rows = mysql_num_rows(res); + g_logger.debug("SQLite Server verification: sqlite_master query returned " + + std::to_string(rows) + " row(s)"); mysql_free_result(res); } + + g_logger.info("SQLite Server verification successful"); } /** @@ -242,15 +358,31 @@ struct MySQLDB { */ void connect(const char* host, int port, const char* user, const char* pass, const char* db) { + g_logger.info("Connecting to SQLite Server (MySQL protocol gateway)..."); + g_logger.debug(std::string("Connection params: host=") + host + + ", port=" + std::to_string(port) + + ", user=" + user + + ", database=" + db); + conn = mysql_init(nullptr); - if (!conn) fatal("mysql_init failed"); + if (!conn) { + g_logger.error("mysql_init failed: out of memory"); + fatal("mysql_init failed"); + } + g_logger.trace("mysql_init successful"); mysql_options(conn, MYSQL_SET_CHARSET_NAME, "utf8mb4"); + g_logger.trace("Set charset to utf8mb4"); if (!mysql_real_connect(conn, host, user, pass, db, port, nullptr, 0)) { + g_logger.error(std::string("MySQL connect failed: ") + mysql_error(conn)); fatal(std::string("MySQL connect failed: ") + mysql_error(conn)); } + g_logger.info("Connected to SQLite Server successfully"); + g_logger.debug(std::string("Server info: ") + mysql_get_server_info(conn)); + g_logger.debug(std::string("Host info: ") + mysql_get_host_info(conn)); + // Verify we're connected to SQLite Server verify_sqlite_server(); } @@ -260,10 +392,14 @@ struct MySQLDB { * @param sql SQL statement to execute */ void execute(const char* sql) { + g_logger.trace(std::string("Executing SQL: ") + sql); if (mysql_query(conn, sql) != 0) { + g_logger.error(std::string("MySQL error: ") + mysql_error(conn)); + g_logger.error(std::string("Failed SQL: ") + sql); std::cerr << "MySQL error: " << mysql_error(conn) << "\nSQL: " << sql << "\n"; fatal("Query failed"); } + g_logger.trace("SQL executed successfully"); } /** @@ -272,9 +408,12 @@ struct MySQLDB { * @return true if successful, false otherwise */ bool try_execute(const char* sql) { + g_logger.trace(std::string("Trying SQL: ") + sql); if (mysql_query(conn, sql) != 0) { + g_logger.debug(std::string("SQL failed (expected): ") + mysql_error(conn)); return false; } + g_logger.trace("SQL executed successfully"); return true; } @@ -284,13 +423,18 @@ struct MySQLDB { * @return MYSQL_RES* Result set (caller must free with mysql_free_result) */ MYSQL_RES* query(const char* sql) { + g_logger.debug(std::string("Executing query: ") + sql); if (mysql_query(conn, sql) != 0) { + g_logger.error(std::string("MySQL query failed: ") + mysql_error(conn)); fatal(std::string("MySQL query failed: ") + mysql_error(conn) + "\nSQL: " + sql); } MYSQL_RES* res = mysql_store_result(conn); if (!res) { + g_logger.error(std::string("mysql_store_result failed: ") + mysql_error(conn)); fatal(std::string("mysql_store_result failed: ") + mysql_error(conn)); } + my_ulonglong rows = mysql_num_rows(res); + g_logger.debug(std::string("Query returned ") + std::to_string(rows) + " row(s)"); return res; } }; @@ -404,8 +548,12 @@ struct PendingEmbedding { // =========================================================================== static ChunkingConfig parse_chunking_json(const json& j) { + g_logger.trace("Parsing chunking_json configuration"); ChunkingConfig cfg; - if (!j.is_object()) return cfg; + if (!j.is_object()) { + g_logger.debug("chunking_json is not an object, using defaults"); + return cfg; + } if (j.contains("enabled")) cfg.enabled = j["enabled"].get(); if (j.contains("unit")) cfg.unit = j["unit"].get(); @@ -413,23 +561,46 @@ static ChunkingConfig parse_chunking_json(const json& j) { if (j.contains("overlap")) cfg.overlap = j["overlap"].get(); if (j.contains("min_chunk_size")) cfg.min_chunk_size = j["min_chunk_size"].get(); - if (cfg.chunk_size <= 0) cfg.chunk_size = 4000; - if (cfg.overlap < 0) cfg.overlap = 0; - if (cfg.overlap >= cfg.chunk_size) cfg.overlap = cfg.chunk_size / 4; - if (cfg.min_chunk_size < 0) cfg.min_chunk_size = 0; + // Validate and sanitize + if (cfg.chunk_size <= 0) { + g_logger.debug("chunk_size <= 0, using default 4000"); + cfg.chunk_size = 4000; + } + if (cfg.overlap < 0) { + g_logger.debug("overlap < 0, setting to 0"); + cfg.overlap = 0; + } + if (cfg.overlap >= cfg.chunk_size) { + g_logger.warn("overlap >= chunk_size, reducing to chunk_size/4"); + cfg.overlap = cfg.chunk_size / 4; + } + if (cfg.min_chunk_size < 0) { + g_logger.debug("min_chunk_size < 0, setting to 0"); + cfg.min_chunk_size = 0; + } if (cfg.unit != "chars") { - std::cerr << "WARN: chunking_json.unit=" << cfg.unit - << " not supported in v0. Falling back to chars.\n"; + g_logger.warn(std::string("chunking_json.unit=") + cfg.unit + + " not supported, falling back to 'chars'"); cfg.unit = "chars"; } + g_logger.debug(std::string("Chunking config: enabled=") + (cfg.enabled ? "yes" : "no") + + ", unit=" + cfg.unit + + ", chunk_size=" + std::to_string(cfg.chunk_size) + + ", overlap=" + std::to_string(cfg.overlap) + + ", min_chunk_size=" + std::to_string(cfg.min_chunk_size)); + return cfg; } static EmbeddingConfig parse_embedding_json(const json& j) { + g_logger.trace("Parsing embedding_json configuration"); EmbeddingConfig cfg; - if (!j.is_object()) return cfg; + if (!j.is_object()) { + g_logger.debug("embedding_json is not an object, using defaults"); + return cfg; + } if (j.contains("enabled")) cfg.enabled = j["enabled"].get(); if (j.contains("dim")) cfg.dim = j["dim"].get(); @@ -441,9 +612,27 @@ static EmbeddingConfig parse_embedding_json(const json& j) { if (j.contains("batch_size")) cfg.batch_size = j["batch_size"].get(); if (j.contains("timeout_ms")) cfg.timeout_ms = j["timeout_ms"].get(); - if (cfg.dim <= 0) cfg.dim = 1536; - if (cfg.batch_size <= 0) cfg.batch_size = 16; - if (cfg.timeout_ms <= 0) cfg.timeout_ms = 20000; + // Validate and sanitize + if (cfg.dim <= 0) { + g_logger.debug("dim <= 0, using default 1536"); + cfg.dim = 1536; + } + if (cfg.batch_size <= 0) { + g_logger.debug("batch_size <= 0, using default 16"); + cfg.batch_size = 16; + } + if (cfg.timeout_ms <= 0) { + g_logger.debug("timeout_ms <= 0, using default 20000"); + cfg.timeout_ms = 20000; + } + + g_logger.debug(std::string("Embedding config: enabled=") + (cfg.enabled ? "yes" : "no") + + ", provider=" + cfg.provider + + ", model=" + cfg.model + + ", dim=" + std::to_string(cfg.dim) + + ", batch_size=" + std::to_string(cfg.batch_size) + + ", timeout_ms=" + std::to_string(cfg.timeout_ms)); + return cfg; } @@ -554,20 +743,32 @@ static json build_metadata(const json& meta_spec, const RowMap& row) { // =========================================================================== static std::vector chunk_text_chars(const std::string& text, const ChunkingConfig& cfg) { + g_logger.trace(std::string("chunk_text_chars: text_size=") + std::to_string(text.size()) + + ", enabled=" + (cfg.enabled ? "yes" : "no") + + ", chunk_size=" + std::to_string(cfg.chunk_size)); + std::vector chunks; if (!cfg.enabled) { + g_logger.trace("Chunking disabled, using single chunk"); chunks.push_back(text); return chunks; } if ((int)text.size() <= cfg.chunk_size) { + g_logger.trace("Text size <= chunk_size, using single chunk"); chunks.push_back(text); return chunks; } int step = cfg.chunk_size - cfg.overlap; - if (step <= 0) step = cfg.chunk_size; + if (step <= 0) { + g_logger.debug("step <= 0, setting to chunk_size"); + step = cfg.chunk_size; + } + + g_logger.debug(std::string("Chunking with step=") + std::to_string(step) + + ", expected_chunks=" + std::to_string((text.size() + step - 1) / step)); for (int start = 0; start < (int)text.size(); start += step) { int end = start + cfg.chunk_size; @@ -576,15 +777,21 @@ static std::vector chunk_text_chars(const std::string& text, const if (len <= 0) break; if (len < cfg.min_chunk_size && !chunks.empty()) { + g_logger.trace(std::string("Final chunk too small (") + std::to_string(len) + + " < " + std::to_string(cfg.min_chunk_size) + "), appending to previous"); chunks.back() += text.substr(start, len); break; } chunks.push_back(text.substr(start, len)); + g_logger.trace(std::string("Created chunk ") + std::to_string(chunks.size()) + + ": start=" + std::to_string(start) + + ", len=" + std::to_string(len)); if (end == (int)text.size()) break; } + g_logger.debug(std::string("Created ") + std::to_string(chunks.size()) + " chunks"); return chunks; } @@ -593,8 +800,18 @@ static std::vector chunk_text_chars(const std::string& text, const // =========================================================================== static MYSQL* mysql_connect_or_die(const RagSource& s) { + g_logger.info(std::string("Connecting to backend MySQL: ") + s.host + ":" + std::to_string(s.port) + + ", db=" + s.db + ", user=" + s.user); + g_logger.debug(std::string("Backend connection params: host=") + s.host + + ", port=" + std::to_string(s.port) + + ", user=" + s.user + + ", db=" + s.db); + MYSQL* conn = mysql_init(nullptr); - if (!conn) fatal("mysql_init failed"); + if (!conn) { + g_logger.error("Backend mysql_init failed: out of memory"); + fatal("mysql_init failed"); + } mysql_options(conn, MYSQL_SET_CHARSET_NAME, "utf8mb4"); @@ -607,9 +824,13 @@ static MYSQL* mysql_connect_or_die(const RagSource& s) { nullptr, 0)) { std::string err = mysql_error(conn); + g_logger.error(std::string("Backend MySQL connect failed: ") + err); mysql_close(conn); fatal("MySQL connect failed: " + err); } + + g_logger.info("Connected to backend MySQL successfully"); + g_logger.debug(std::string("Backend server info: ") + mysql_get_server_info(conn)); return conn; } @@ -774,6 +995,7 @@ static void update_sync_state(MySQLDB& db, int source_id, const json& cursor_jso // =========================================================================== static bool doc_exists(MySQLDB& db, const std::string& doc_id) { + g_logger.trace(std::string("Checking if doc exists: ") + doc_id); std::string escaped_id = sql_escape_single_quotes(doc_id); std::ostringstream sql; sql << "SELECT 1 FROM rag_documents WHERE doc_id = '" << escaped_id << "' LIMIT 1"; @@ -782,6 +1004,7 @@ static bool doc_exists(MySQLDB& db, const std::string& doc_id) { my_ulonglong rows = mysql_num_rows(res); mysql_free_result(res); + g_logger.trace(std::string("doc_exists: ") + doc_id + " -> " + (rows > 0 ? "true" : "false")); return rows > 0; } @@ -793,6 +1016,10 @@ static void insert_doc(MySQLDB& db, const std::string& title, const std::string& body, const std::string& meta_json) { + g_logger.debug(std::string("Inserting document: ") + doc_id + + ", title_length=" + std::to_string(title.size()) + + ", body_length=" + std::to_string(body.size())); + std::string e_doc_id = sql_escape_single_quotes(doc_id); std::string e_source_name = sql_escape_single_quotes(source_name); std::string e_pk_json = sql_escape_single_quotes(pk_json); @@ -807,6 +1034,7 @@ static void insert_doc(MySQLDB& db, << e_pk_json << "', '" << e_title << "', '" << e_body << "', '" << e_meta << "')"; db.execute(sql.str().c_str()); + g_logger.trace("Document inserted successfully"); } static void insert_chunk(MySQLDB& db, @@ -817,6 +1045,10 @@ static void insert_chunk(MySQLDB& db, const std::string& title, const std::string& body, const std::string& meta_json) { + g_logger.trace(std::string("Inserting chunk: ") + chunk_id + + ", chunk_index=" + std::to_string(chunk_index) + + ", body_length=" + std::to_string(body.size())); + std::string e_chunk_id = sql_escape_single_quotes(chunk_id); std::string e_doc_id = sql_escape_single_quotes(doc_id); std::string e_title = sql_escape_single_quotes(title); @@ -836,6 +1068,7 @@ static void insert_fts(MySQLDB& db, const std::string& chunk_id, const std::string& title, const std::string& body) { + g_logger.trace(std::string("Inserting FTS entry: ") + chunk_id); std::string e_chunk_id = sql_escape_single_quotes(chunk_id); std::string e_title = sql_escape_single_quotes(title); std::string e_body = sql_escape_single_quotes(body); @@ -966,9 +1199,20 @@ struct OpenAIEmbeddingProvider : public EmbeddingProvider { : api_base(std::move(base)), api_key(std::move(key)), model(std::move(mdl)), timeout_ms(timeout) {} std::vector> embed(const std::vector& inputs, int dim) override { - if (api_base.empty()) throw std::runtime_error("embedding api_base is empty"); - if (api_key.empty()) throw std::runtime_error("embedding api_key is empty"); - if (model.empty()) throw std::runtime_error("embedding model is empty"); + g_logger.info(std::string("OpenAI embed: processing ") + std::to_string(inputs.size()) + " inputs"); + + if (api_base.empty()) { + g_logger.error("embedding api_base is empty"); + throw std::runtime_error("embedding api_base is empty"); + } + if (api_key.empty()) { + g_logger.error("embedding api_key is empty"); + throw std::runtime_error("embedding api_key is empty"); + } + if (model.empty()) { + g_logger.error("embedding model is empty"); + throw std::runtime_error("embedding model is empty"); + } json req; req["model"] = model; @@ -980,7 +1224,10 @@ struct OpenAIEmbeddingProvider : public EmbeddingProvider { if (!url.empty() && url.back() == '/') url.pop_back(); url += "/embeddings"; - std::cerr << " Calling OpenAI API: " << url << " (model=" << model << ", chunks=" << inputs.size() << ")\n"; + g_logger.debug(std::string("Calling OpenAI API: ") + url + + ", model=" + model + + ", inputs=" + std::to_string(inputs.size()) + + ", dim=" + std::to_string(dim)); CURL* curl = curl_easy_init(); if (!curl) throw std::runtime_error("curl_easy_init failed"); @@ -1007,32 +1254,62 @@ struct OpenAIEmbeddingProvider : public EmbeddingProvider { curl_slist_free_all(headers); curl_easy_cleanup(curl); - if (res != CURLE_OK) throw std::runtime_error(std::string("curl error: ") + curl_easy_strerror(res)); - if (status < 200 || status >= 300) throw std::runtime_error("embedding request failed with status " + std::to_string(status)); + if (res != CURLE_OK) { + g_logger.error(std::string("curl error: ") + curl_easy_strerror(res)); + throw std::runtime_error(std::string("curl error: ") + curl_easy_strerror(res)); + } + if (status < 200 || status >= 300) { + g_logger.error(std::string("embedding request failed with status ") + std::to_string(status)); + throw std::runtime_error("embedding request failed with status " + std::to_string(status)); + } + + g_logger.debug(std::string("HTTP response status: ") + std::to_string(status) + + ", body_size=" + std::to_string(buf.data.size())); json resp = json::parse(buf.data); - if (!resp.contains("data") || !resp["data"].is_array()) throw std::runtime_error("embedding response missing data array"); + if (!resp.contains("data") || !resp["data"].is_array()) { + g_logger.error("embedding response missing data array"); + throw std::runtime_error("embedding response missing data array"); + } std::vector> out; out.reserve(resp["data"].size()); for (const auto& item : resp["data"]) { - if (!item.contains("embedding") || !item["embedding"].is_array()) throw std::runtime_error("embedding item missing embedding array"); + if (!item.contains("embedding") || !item["embedding"].is_array()) { + g_logger.error("embedding item missing embedding array"); + throw std::runtime_error("embedding item missing embedding array"); + } std::vector vec; vec.reserve(item["embedding"].size()); for (const auto& v : item["embedding"]) vec.push_back(v.get()); - if ((int)vec.size() != dim) throw std::runtime_error("embedding dimension mismatch"); + if ((int)vec.size() != dim) { + g_logger.error(std::string("embedding dimension mismatch: got ") + + std::to_string(vec.size()) + ", expected " + std::to_string(dim)); + throw std::runtime_error("embedding dimension mismatch"); + } out.push_back(std::move(vec)); } - if (out.size() != inputs.size()) throw std::runtime_error("embedding response size mismatch"); + if (out.size() != inputs.size()) { + g_logger.error(std::string("embedding response size mismatch: got ") + + std::to_string(out.size()) + ", expected " + std::to_string(inputs.size())); + throw std::runtime_error("embedding response size mismatch"); + } + + g_logger.info(std::string("OpenAI embed completed: ") + std::to_string(out.size()) + " embeddings generated"); return out; } }; static std::unique_ptr build_embedding_provider(const EmbeddingConfig& cfg) { + g_logger.debug(std::string("Building embedding provider: ") + cfg.provider); if (cfg.provider == "openai") { + g_logger.debug(std::string("Using OpenAI provider: api_base=") + cfg.api_base + + ", model=" + cfg.model + + ", timeout_ms=" + std::to_string(cfg.timeout_ms)); return std::make_unique(cfg.api_base, cfg.api_key, cfg.model, cfg.timeout_ms); } + g_logger.debug("Using stub embedding provider"); return std::make_unique(); } @@ -1041,6 +1318,8 @@ static std::unique_ptr build_embedding_provider(const Embeddi // =========================================================================== static std::vector load_sources(MySQLDB& db) { + g_logger.info("Loading enabled sources from rag_sources..."); + std::vector out; const char* sql = @@ -1054,6 +1333,7 @@ static std::vector load_sources(MySQLDB& db) { MYSQL_FIELD* fields = mysql_fetch_fields(res); MYSQL_ROW row; + int source_count = 0; while ((row = mysql_fetch_row(res)) != nullptr) { RagSource s; s.source_id = atoi(row[0]); @@ -1075,29 +1355,42 @@ static std::vector load_sources(MySQLDB& db) { const char* chunk_j = row[13]; const char* emb_j = row[14]; + g_logger.debug(std::string("Loading source_id=") + std::to_string(s.source_id) + + ", name=" + s.name + + ", backend_type=" + s.backend_type + + ", table=" + s.db + "." + s.table_name); + try { s.doc_map_json = json::parse(doc_map ? doc_map : "{}"); s.chunking_json = json::parse(chunk_j ? chunk_j : "{}"); if (emb_j && std::strlen(emb_j) > 0) s.embedding_json = json::parse(emb_j); else s.embedding_json = json(); } catch (const std::exception& e) { + g_logger.error(std::string("Invalid JSON in rag_sources.source_id=") + + std::to_string(s.source_id) + ": " + e.what()); mysql_free_result(res); fatal("Invalid JSON in rag_sources.source_id=" + std::to_string(s.source_id) + ": " + e.what()); } if (!s.doc_map_json.is_object()) { + g_logger.error(std::string("doc_map_json must be a JSON object for source_id=") + + std::to_string(s.source_id)); mysql_free_result(res); fatal("doc_map_json must be a JSON object for source_id=" + std::to_string(s.source_id)); } if (!s.chunking_json.is_object()) { + g_logger.error(std::string("chunking_json must be a JSON object for source_id=") + + std::to_string(s.source_id)); mysql_free_result(res); fatal("chunking_json must be a JSON object for source_id=" + std::to_string(s.source_id)); } out.push_back(std::move(s)); + source_count++; } mysql_free_result(res); + g_logger.info(std::string("Loaded ") + std::to_string(source_count) + " enabled source(s)"); return out; } @@ -1230,24 +1523,28 @@ static size_t flush_embedding_batch(std::vector& pending, MySQLDB& db) { if (pending.empty()) return 0; - std::cerr << " Generating embeddings for batch of " << pending.size() << " chunks...\n"; + g_logger.info(std::string("Generating embeddings for batch of ") + std::to_string(pending.size()) + " chunks..."); + g_logger.trace("Building input texts for embedding batch..."); std::vector inputs; inputs.reserve(pending.size()); for (const auto& p : pending) { inputs.push_back(p.input_text); } + g_logger.debug("Calling embedder for batch..."); std::vector> embeddings = embedder->embed(inputs, ecfg.dim); + g_logger.debug("Storing embeddings to rag_vec_chunks..."); for (size_t i = 0; i < pending.size() && i < embeddings.size(); i++) { const auto& p = pending[i]; + g_logger.trace(std::string("Storing embedding for chunk_id=") + p.chunk_id); insert_vec(db, embeddings[i], p.chunk_id, p.doc_id, p.source_id); } size_t count = pending.size(); pending.clear(); - std::cerr << " Successfully stored " << count << " embeddings\n"; + g_logger.info(std::string("Successfully stored ") + std::to_string(count) + " embeddings"); return count; } @@ -1256,35 +1553,53 @@ static size_t flush_embedding_batch(std::vector& pending, // =========================================================================== static void ingest_source(MySQLDB& db, const RagSource& src) { - std::cerr << "Ingesting source_id=" << src.source_id - << " name=" << src.name - << " backend=" << src.backend_type - << " table=" << src.table_name << "\n"; + g_logger.info(std::string("=== Starting ingestion for source_id=") + std::to_string(src.source_id) + + ", name=" + src.name + + ", backend=" + src.backend_type + + ", table=" + src.table_name + " ==="); if (src.backend_type != "mysql") { - std::cerr << " Skipping: backend_type not supported in v0.\n"; + g_logger.warn(std::string("Skipping source ") + src.name + ": backend_type '" + src.backend_type + "' not supported"); return; } + g_logger.debug("Parsing chunking and embedding configurations..."); ChunkingConfig ccfg = parse_chunking_json(src.chunking_json); EmbeddingConfig ecfg = parse_embedding_json(src.embedding_json); + std::unique_ptr embedder; if (ecfg.enabled) { + g_logger.info("Embeddings enabled, building embedding provider..."); embedder = build_embedding_provider(ecfg); + } else { + g_logger.info("Embeddings disabled for this source"); } + g_logger.debug("Loading sync cursor (watermark)..."); json cursor_json = load_sync_cursor_json(db, src.source_id); SyncCursor cursor = parse_sync_cursor(cursor_json, src.pk_column); + if (cursor.has_value) { + g_logger.info(std::string("Resuming from watermark: column=") + cursor.column + + (cursor.numeric ? ", value=" + std::to_string(cursor.num_value) + : ", value=" + cursor.str_value)); + } else { + g_logger.info("No previous watermark found, starting from beginning"); + } + MYSQL* mdb = mysql_connect_or_die(src); + g_logger.debug("Building SELECT query with incremental filter..."); std::vector cols = collect_needed_columns(src, ecfg); if (!cursor.column.empty()) add_unique(cols, cursor.column); std::string extra_filter = build_incremental_filter(cursor); std::string sel = build_select_sql(src, cols, extra_filter); + g_logger.debug(std::string("Executing backend query:\n") + sel); + if (mysql_query(mdb, sel.c_str()) != 0) { std::string err = mysql_error(mdb); + g_logger.error(std::string("Backend MySQL query failed: ") + err); mysql_close(mdb); fatal("MySQL query failed: " + err + "\nSQL: " + sel); } @@ -1292,12 +1607,18 @@ static void ingest_source(MySQLDB& db, const RagSource& src) { MYSQL_RES* res = mysql_store_result(mdb); if (!res) { std::string err = mysql_error(mdb); + g_logger.error(std::string("Backend mysql_store_result failed: ") + err); mysql_close(mdb); fatal("mysql_store_result failed: " + err); } + my_ulonglong total_rows = mysql_num_rows(res); + g_logger.info(std::string("Backend query returned ") + std::to_string(total_rows) + " row(s) to process"); + std::uint64_t ingested_docs = 0; std::uint64_t skipped_docs = 0; + std::uint64_t total_chunks = 0; + std::uint64_t embedding_batches = 0; std::vector pending_embeddings; MYSQL_ROW r; @@ -1347,7 +1668,10 @@ static void ingest_source(MySQLDB& db, const RagSource& src) { BuiltDoc doc = build_document_from_row(src, row); + g_logger.trace(std::string("Processing doc_id=") + doc.doc_id); + if (doc_exists(db, doc.doc_id)) { + g_logger.trace(std::string("Skipping existing doc_id=") + doc.doc_id); skipped_docs++; continue; } @@ -1356,6 +1680,8 @@ static void ingest_source(MySQLDB& db, const RagSource& src) { doc.doc_id, doc.pk_json, doc.title, doc.body, doc.metadata_json); std::vector chunks = chunk_text_chars(doc.body, ccfg); + g_logger.debug(std::string("Created ") + std::to_string(chunks.size()) + " chunks for doc_id=" + doc.doc_id); + total_chunks += chunks.size(); for (size_t i = 0; i < chunks.size(); i++) { std::string chunk_id = doc.doc_id + "#" + std::to_string(i); @@ -1375,6 +1701,7 @@ static void ingest_source(MySQLDB& db, const RagSource& src) { pending_embeddings.push_back({chunk_id, doc.doc_id, src.source_id, emb_input}); if ((int)pending_embeddings.size() >= ecfg.batch_size) { + embedding_batches++; flush_embedding_batch(pending_embeddings, embedder.get(), ecfg, db); } } @@ -1382,32 +1709,42 @@ static void ingest_source(MySQLDB& db, const RagSource& src) { ingested_docs++; if (ingested_docs % 1000 == 0) { - std::cerr << " progress: ingested_docs=" << ingested_docs - << " skipped_docs=" << skipped_docs << "\n"; + g_logger.info(std::string("Progress: ingested_docs=") + std::to_string(ingested_docs) + + ", skipped_docs=" + std::to_string(skipped_docs) + + ", chunks=" + std::to_string(total_chunks)); } } if (ecfg.enabled && !pending_embeddings.empty()) { + embedding_batches++; + g_logger.debug("Flushing final pending embeddings batch..."); flush_embedding_batch(pending_embeddings, embedder.get(), ecfg, db); } mysql_free_result(res); mysql_close(mdb); + g_logger.info("Updating sync state with new watermark..."); if (!cursor_json.is_object()) cursor_json = json::object(); if (!cursor.column.empty()) cursor_json["column"] = cursor.column; if (max_set) { if (max_numeric) { + g_logger.debug(std::string("New watermark value (numeric): ") + std::to_string(max_num)); cursor_json["value"] = max_num; } else { + g_logger.debug(std::string("New watermark value (string): ") + max_str); cursor_json["value"] = max_str; } } update_sync_state(db, src.source_id, cursor_json); - std::cerr << "Done source " << src.name - << " ingested_docs=" << ingested_docs - << " skipped_docs=" << skipped_docs << "\n"; + g_logger.info(std::string("=== Source ingestion complete: ") + src.name + " ==="); + g_logger.info(std::string(" ingested_docs=") + std::to_string(ingested_docs) + + ", skipped_docs=" + std::to_string(skipped_docs) + + ", total_chunks=" + std::to_string(total_chunks)); + if (ecfg.enabled) { + g_logger.info(std::string(" embedding_batches=") + std::to_string(embedding_batches)); + } } // =========================================================================== @@ -1457,10 +1794,17 @@ static bool table_exists(MySQLDB& db, const std::string& table_name) { * @return true if schema was created, false if already exists */ static bool init_schema(MySQLDB& db, int vec_dim = 1536) { + g_logger.info(std::string("Initializing RAG schema (vec_dim=") + std::to_string(vec_dim) + ")..."); + // Check if schema is complete by checking for rag_sync_state table // (rag_sync_state is created last, so if it exists, schema is complete) bool schema_complete = table_exists(db, "rag_sync_state"); + if (schema_complete) { + g_logger.info("Schema already exists (rag_sync_state table found)"); + return false; + } + g_logger.debug("Creating rag_sources table..."); // Note: PRAGMA commands are SQLite-specific and not supported through MySQL protocol // The SQLite backend should have these configured already @@ -1489,7 +1833,9 @@ static bool init_schema(MySQLDB& db, int vec_dim = 1536) { db.execute("CREATE INDEX IF NOT EXISTS idx_rag_sources_enabled ON rag_sources(enabled)"); db.execute("CREATE INDEX IF NOT EXISTS idx_rag_sources_backend ON rag_sources(backend_type, backend_host, backend_port, backend_db, table_name)"); + g_logger.trace("rag_sources table created"); + g_logger.debug("Creating rag_documents table..."); // Create rag_documents table db.execute( "CREATE TABLE IF NOT EXISTS rag_documents (" @@ -1507,7 +1853,9 @@ static bool init_schema(MySQLDB& db, int vec_dim = 1536) { db.execute("CREATE INDEX IF NOT EXISTS idx_rag_documents_source_updated ON rag_documents(source_id, updated_at)"); db.execute("CREATE INDEX IF NOT EXISTS idx_rag_documents_source_deleted ON rag_documents(source_id, deleted)"); + g_logger.trace("rag_documents table created"); + g_logger.debug("Creating rag_chunks table..."); // Create rag_chunks table db.execute( "CREATE TABLE IF NOT EXISTS rag_chunks (" @@ -1526,7 +1874,9 @@ static bool init_schema(MySQLDB& db, int vec_dim = 1536) { db.execute("CREATE UNIQUE INDEX IF NOT EXISTS uq_rag_chunks_doc_idx ON rag_chunks(doc_id, chunk_index)"); db.execute("CREATE INDEX IF NOT EXISTS idx_rag_chunks_source_doc ON rag_chunks(source_id, doc_id)"); db.execute("CREATE INDEX IF NOT EXISTS idx_rag_chunks_deleted ON rag_chunks(deleted)"); + g_logger.trace("rag_chunks table created"); + g_logger.debug("Creating rag_fts_chunks FTS5 virtual table..."); // Create FTS5 virtual table db.execute( "CREATE VIRTUAL TABLE IF NOT EXISTS rag_fts_chunks " @@ -1537,9 +1887,11 @@ static bool init_schema(MySQLDB& db, int vec_dim = 1536) { " tokenize = 'unicode61'" ")" ); + g_logger.trace("rag_fts_chunks FTS5 table created"); // Create vec0 virtual table for embeddings // Note: This may fail if sqlite-vec extension is not loaded + g_logger.debug(std::string("Creating rag_vec_chunks vec0 virtual table (dim=") + std::to_string(vec_dim) + ")..."); std::ostringstream vec_sql; vec_sql << "CREATE VIRTUAL TABLE IF NOT EXISTS rag_vec_chunks " << "USING vec0(" @@ -1550,10 +1902,13 @@ static bool init_schema(MySQLDB& db, int vec_dim = 1536) { << " updated_at INTEGER" << ")"; if (!db.try_execute(vec_sql.str().c_str())) { - std::cerr << "Warning: vec0 table creation failed (sqlite-vec extension not available). Vector embeddings will be disabled.\n"; + g_logger.warn("vec0 table creation failed (sqlite-vec extension not available). Vector embeddings will be disabled."); + } else { + g_logger.trace("rag_vec_chunks vec0 table created"); } // Create convenience view + g_logger.debug("Creating rag_chunk_view convenience view..."); db.execute( "CREATE VIEW IF NOT EXISTS rag_chunk_view AS " "SELECT " @@ -1571,8 +1926,10 @@ static bool init_schema(MySQLDB& db, int vec_dim = 1536) { "JOIN rag_documents d ON d.doc_id = c.doc_id " "WHERE c.deleted = 0 AND d.deleted = 0" ); + g_logger.trace("rag_chunk_view created"); // Create sync state table + g_logger.debug("Creating rag_sync_state table..."); db.execute( "CREATE TABLE IF NOT EXISTS rag_sync_state (" " source_id INTEGER PRIMARY KEY REFERENCES rag_sources(source_id)," @@ -1582,7 +1939,9 @@ static bool init_schema(MySQLDB& db, int vec_dim = 1536) { " last_error TEXT" ")" ); + g_logger.trace("rag_sync_state table created"); + g_logger.info("RAG schema initialization complete"); return !schema_complete; // Return true if we created it, false if it was already complete } @@ -1607,8 +1966,32 @@ struct ConnParams { // Init-specific parameters int vec_dim = 1536; // Vector dimension for vec0 table + + // Logging parameters + std::string log_level = "info"; // Log level: error, warn, info, debug, trace }; +/** + * @brief Parse log level string to LogLevel enum + * @param level_str Log level string (case-insensitive: error, warn, info, debug, trace) + * @return Corresponding LogLevel enum value + */ +static LogLevel parse_log_level(const std::string& level_str) { + std::string lower = level_str; + for (char& c : lower) { + c = static_cast(std::tolower(static_cast(c))); + } + + if (lower == "error") return LogLevel::ERROR; + if (lower == "warn" || lower == "warning") return LogLevel::WARN; + if (lower == "info") return LogLevel::INFO; + if (lower == "debug") return LogLevel::DEBUG; + if (lower == "trace") return LogLevel::TRACE; + + // Default to INFO if unknown + return LogLevel::INFO; +} + static void print_usage(const char* prog_name) { std::cerr << "Usage:\n"; std::cerr << " Initialize schema:\n"; @@ -1628,6 +2011,9 @@ static void print_usage(const char* prog_name) { std::cerr << " -D, --database=name Database to use (required)\n"; std::cerr << " -?, --help Show this help message\n"; std::cerr << "\n"; + std::cerr << "Logging Options:\n"; + std::cerr << " --log-level=LEVEL Log level: error, warn, info, debug, trace (default: info)\n"; + std::cerr << "\n"; std::cerr << "Init Options:\n"; std::cerr << " --vec-dim=# Vector dimension for rag_vec_chunks table (default: 1536)\n"; std::cerr << "\n"; @@ -1646,16 +2032,17 @@ static void print_usage(const char* prog_name) { */ static std::string parse_args(int argc, char** argv, ConnParams& params) { static struct option long_options[] = { - {"host", required_argument, 0, 'h'}, - {"port", required_argument, 0, 'P'}, - {"user", required_argument, 0, 'u'}, - {"password", required_argument, 0, 'p'}, - {"database", required_argument, 0, 'D'}, - {"text", required_argument, 0, 't'}, - {"source-id",required_argument, 0, 's'}, - {"limit", required_argument, 0, 'l'}, - {"vec-dim", required_argument, 0, 1000}, // Using 1000 as short code - {"help", no_argument, 0, '?'}, + {"host", required_argument, 0, 'h'}, + {"port", required_argument, 0, 'P'}, + {"user", required_argument, 0, 'u'}, + {"password", required_argument, 0, 'p'}, + {"database", required_argument, 0, 'D'}, + {"text", required_argument, 0, 't'}, + {"source-id", required_argument, 0, 's'}, + {"limit", required_argument, 0, 'l'}, + {"vec-dim", required_argument, 0, 1000}, // Using 1000 as short code + {"log-level", required_argument, 0, 1001}, // Using 1001 as short code + {"help", no_argument, 0, '?'}, {0, 0, 0, 0} }; @@ -1713,6 +2100,9 @@ static std::string parse_args(int argc, char** argv, ConnParams& params) { return ""; } break; + case 1001: // --log-level + params.log_level = optarg; + break; case '?': default: return ""; @@ -1743,30 +2133,47 @@ int main(int argc, char** argv) { return 2; } + // Set log level from command line parameter + g_logger.min_level = parse_log_level(params.log_level); + + g_logger.info("=== RAG Ingestion Tool Starting ==="); + + g_logger.info(std::string("Command: ") + command); + g_logger.info(std::string("Log level: ") + params.log_level); + g_logger.debug(std::string("Connection params: host=") + params.host + + ", port=" + std::to_string(params.port) + + ", database=" + params.database); + // Initialize command if (command == "init") { + g_logger.info("Executing 'init' command..."); MySQLDB db; db.connect(params.host.c_str(), params.port, params.user.c_str(), params.pass.c_str(), params.database.c_str()); bool created = init_schema(db, params.vec_dim); if (created) { + g_logger.info("Schema created successfully"); std::cout << "Schema created successfully (vec_dim=" << params.vec_dim << ").\n"; } else { + g_logger.info("Schema already exists"); std::cout << "Schema already exists.\n"; } + g_logger.info("=== 'init' command complete ==="); return 0; } // Ingest command if (command == "ingest") { + g_logger.info("Executing 'ingest' command..."); MySQLDB db; db.connect(params.host.c_str(), params.port, params.user.c_str(), params.pass.c_str(), params.database.c_str()); // Check if schema exists before proceeding if (!table_exists(db, "rag_sources")) { + g_logger.error("RAG schema not found. Please run 'init' command first."); std::cerr << "Error: RAG schema not found. Please run 'init' command first:\n"; std::cerr << " " << argv[0] << " init -h " << params.host << " -P " << params.port << " -u " << params.user @@ -1774,42 +2181,72 @@ int main(int argc, char** argv) { return 1; } + g_logger.debug("Initializing libcurl..."); curl_global_init(CURL_GLOBAL_DEFAULT); - // Note: PRAGMA commands are SQLite-specific and not supported through MySQL protocol - // The SQLite backend should have these configured already - db.execute("BEGIN IMMEDIATE;"); + std::vector sources = load_sources(db); + if (sources.empty()) { + g_logger.warn("No enabled sources found in rag_sources"); + std::cerr << "No enabled sources found in rag_sources.\n"; + } - bool ok = true; - try { - std::vector sources = load_sources(db); - if (sources.empty()) { - std::cerr << "No enabled sources found in rag_sources.\n"; - } - for (size_t i = 0; i < sources.size(); i++) { + // Per-source transaction handling + int succeeded = 0; + int failed = 0; + + for (size_t i = 0; i < sources.size(); i++) { + g_logger.info(std::string("Processing source ") + std::to_string(i + 1) + + " of " + std::to_string(sources.size())); + + // Start transaction for this source + g_logger.debug("Starting transaction for source " + std::to_string(sources[i].source_id) + "..."); + db.execute("BEGIN IMMEDIATE;"); + + bool source_ok = true; + try { ingest_source(db, sources[i]); + } catch (const std::exception& e) { + g_logger.error(std::string("Exception during source ingestion: ") + e.what()); + std::cerr << "Exception: " << e.what() << "\n"; + source_ok = false; + } catch (...) { + g_logger.error("Unknown exception during source ingestion"); + std::cerr << "Unknown exception\n"; + source_ok = false; + } + + // Commit or rollback this source's transaction + if (source_ok) { + g_logger.info("Committing source " + std::to_string(sources[i].source_id) + "..."); + db.execute("COMMIT;"); + succeeded++; + } else { + g_logger.warn("Rolling back source " + std::to_string(sources[i].source_id) + " due to errors"); + db.execute("ROLLBACK;"); + failed++; } - } catch (const std::exception& e) { - std::cerr << "Exception: " << e.what() << "\n"; - ok = false; - } catch (...) { - std::cerr << "Unknown exception\n"; - ok = false; } - db.execute(ok ? "COMMIT;" : "ROLLBACK;"); + g_logger.debug("Cleaning up libcurl..."); curl_global_cleanup(); - return ok ? 0 : 1; + + g_logger.info(std::string("=== 'ingest' command complete ===") + + "\n Succeeded: " + std::to_string(succeeded) + + "\n Failed: " + std::to_string(failed)); + + return (failed > 0) ? 1 : 0; } // Query command if (command == "query") { + g_logger.info(std::string("Executing 'query' command: ") + params.query_text); MySQLDB db; db.connect(params.host.c_str(), params.port, params.user.c_str(), params.pass.c_str(), params.database.c_str()); // Check if schema exists if (!table_exists(db, "rag_sources")) { + g_logger.error("RAG schema not found. Please run 'init' command first."); std::cerr << "Error: RAG schema not found. Please run 'init' command first:\n"; std::cerr << " " << argv[0] << " init -h " << params.host << " -P " << params.port << " -u " << params.user @@ -1817,6 +2254,7 @@ int main(int argc, char** argv) { return 1; } + g_logger.debug("Initializing libcurl..."); curl_global_init(CURL_GLOBAL_DEFAULT); try { @@ -1825,12 +2263,14 @@ int main(int argc, char** argv) { // Filter by source_id if specified if (params.source_id >= 0) { + g_logger.debug(std::string("Filtering by source_id=") + std::to_string(params.source_id)); auto it = std::remove_if(sources.begin(), sources.end(), [params](const RagSource& s) { return s.source_id != params.source_id; }); sources.erase(it, sources.end()); } if (sources.empty()) { + g_logger.warn("No enabled sources found for query"); std::cerr << "No enabled sources found"; if (params.source_id >= 0) { std::cerr << " for source_id=" << params.source_id; @@ -1842,8 +2282,11 @@ int main(int argc, char** argv) { // Use the first source's embedding config RagSource& source = sources[0]; + g_logger.debug(std::string("Using source_id=") + std::to_string(source.source_id) + + " for embedding config"); if (source.embedding_json.empty()) { + g_logger.error("Embeddings not configured for source"); std::cerr << "Error: Embeddings not configured for source " << source.source_id << "\n"; curl_global_cleanup(); return 1; @@ -1851,26 +2294,32 @@ int main(int argc, char** argv) { EmbeddingConfig emb_cfg = parse_embedding_json(source.embedding_json); if (!emb_cfg.enabled) { + g_logger.error("Embeddings not enabled for source"); std::cerr << "Error: Embeddings not enabled for source " << source.source_id << "\n"; curl_global_cleanup(); return 1; } + g_logger.info(std::string("Generating embedding for query using: ") + emb_cfg.provider); std::cout << "Generating embedding for query using: " << emb_cfg.provider << "\n"; // Build embedding provider auto embedder = build_embedding_provider(emb_cfg); // Generate embedding for query + g_logger.debug("Generating query embedding..."); std::vector query_inputs = {params.query_text}; std::vector> query_embeddings = embedder->embed(query_inputs, emb_cfg.dim); if (query_embeddings.empty() || query_embeddings[0].empty()) { + g_logger.error("Failed to generate embedding for query"); std::cerr << "Error: Failed to generate embedding for query\n"; curl_global_cleanup(); return 1; } + g_logger.info("Query embedding generated successfully"); + // Convert embedding to hex string for vec0 MATCH std::string query_hex; query_hex.reserve(query_embeddings[0].size() * 8); @@ -1885,6 +2334,8 @@ int main(int argc, char** argv) { source_filter = "AND c.source_id = " + std::to_string(params.source_id); } + g_logger.debug(std::string("Building vector search query, limit=") + std::to_string(params.limit)); + // Use subquery with VALUES to provide the query embedding // This creates a temporary single-row result with the query embedding std::string search_sql = @@ -1900,6 +2351,7 @@ int main(int argc, char** argv) { "ORDER BY v.distance"; // Execute search + g_logger.info("Executing vector search query..."); MYSQL_RES* result = db.query(search_sql.c_str()); if (result) { MYSQL_ROW row; @@ -1917,6 +2369,7 @@ int main(int argc, char** argv) { } mysql_free_result(result); + g_logger.info(std::string("Vector search complete: ") + std::to_string(row_count) + " result(s)"); if (row_count == 0) { std::cout << "No results found.\n"; } else { @@ -1925,20 +2378,26 @@ int main(int argc, char** argv) { } } catch (const std::exception& e) { + g_logger.error(std::string("Exception during query: ") + e.what()); std::cerr << "Exception: " << e.what() << "\n"; curl_global_cleanup(); return 1; } catch (...) { + g_logger.error("Unknown exception during query"); std::cerr << "Unknown exception\n"; curl_global_cleanup(); return 1; } + g_logger.debug("Cleaning up libcurl..."); curl_global_cleanup(); + + g_logger.info("=== 'query' command complete ==="); return 0; } // Unknown command + g_logger.error(std::string("Unknown command: ") + command); print_usage(argv[0]); return 2; }