diff --git a/.gitignore b/.gitignore index eb49d6b35..637d65778 100644 --- a/.gitignore +++ b/.gitignore @@ -177,6 +177,12 @@ test/tap/tests/test_cluster_sync_config/test_cluster_sync.cnf .aider* GEMINI.md +# generated MySQL parser artifacts +lib/MySQL_Lexer.yy.c +lib/MySQL_Parser.output +lib/MySQL_Parser.tab.c +lib/MySQL_Parser.tab.h + # Database discovery output files discovery_*.md database_discovery_report.md diff --git a/doc/internal/pgsql_advanced_query_logging_architecture.md b/doc/internal/pgsql_advanced_query_logging_architecture.md new file mode 100644 index 000000000..712e4caf5 --- /dev/null +++ b/doc/internal/pgsql_advanced_query_logging_architecture.md @@ -0,0 +1,210 @@ +# PostgreSQL Advanced Query Logging Architecture + +## Document Status +- Status: Implemented (as-built) +- Scope: PostgreSQL advanced events logging parity with MySQL, including buffer, SQLite sinks, dump commands, scheduler sync, and metrics +- Branch: `v3.0_pgsql_advanced_logging` + +## 1. Objective and Delivered Outcome +The PostgreSQL logging pipeline now supports advanced query events logging with the same operational model already used by MySQL: +- query event capture at request completion +- in-memory circular buffering +- manual dump from buffer to `stats` and/or `stats_history` +- optional periodic auto-dump to disk +- logger metrics in `stats_pgsql_global` and Prometheus + +This implementation is additive. Existing PostgreSQL file-based events log and audit log behavior remains available. + +## 2. As-Built Runtime Architecture + +### 2.1 Capture path +1. PostgreSQL sessions call query logging at request completion. +2. `PgSQL_Logger::log_request()` builds a `PgSQL_Event` from session/backend state. +3. If file logging is enabled, the event is written to events log file. +4. If `pgsql-eventslog_buffer_history_size > 0`, the event is deep-copied and inserted into the PostgreSQL events circular buffer. + +Implemented in: +- `lib/PgSQL_Logger.cpp` +- `include/PgSQL_Logger.hpp` + +### 2.2 Buffering model +A dedicated `PgSQL_Logger_CircularBuffer` provides: +- thread-safe insertion/drain using mutex +- bounded size via runtime variable +- event drop accounting when the buffer is full or resized smaller + +Runtime resizing is wired in PostgreSQL thread variable refresh: +- `PgSQL_Thread::refresh_variables()` applies `eventslog_buffer_history_size` changes to the live circular buffer. + +Implemented in: +- `include/PgSQL_Logger.hpp` +- `lib/PgSQL_Logger.cpp` +- `lib/PgSQL_Thread.cpp` + +### 2.3 Drain and persistence pipeline +`PgSQL_Logger::processEvents(SQLite3DB* statsdb, SQLite3DB* statsdb_disk)` drains the buffer and persists events to: +- memory table: `stats_pgsql_query_events` when `statsdb != nullptr` +- disk table: `history_pgsql_query_events` when `statsdb_disk != nullptr` + +Behavior: +- batched SQLite inserts with prepared statements +- in-memory retention bound by `pgsql-eventslog_table_memory_size` +- query digest serialized as hex text (`0x...`), matching MySQL table style +- `sqlstate` and textual `error` persisted for failed queries + +Implemented in: +- `lib/PgSQL_Logger.cpp` + +## 3. 
Data Model + +### 3.1 Memory table +`stats.stats_pgsql_query_events` + +Columns: +- `id` +- `thread_id` +- `username` +- `database` +- `start_time` +- `end_time` +- `query_digest` +- `query` +- `server` +- `client` +- `event_type` +- `hid` +- `extra_info` +- `affected_rows` +- `rows_sent` +- `client_stmt_name` +- `sqlstate` +- `error` + +Implemented in: +- `include/ProxySQL_Admin_Tables_Definitions.h` +- `lib/Admin_Bootstrap.cpp` + +### 3.2 Disk history table +`stats_history.history_pgsql_query_events` + +Columns match `stats_pgsql_query_events`. + +Indexes: +- `idx_history_pgsql_query_events_start_time` on `start_time` +- `idx_history_pgsql_query_events_query_digest` on `query_digest` + +Implemented in: +- `include/ProxySQL_Statistics.hpp` +- `lib/ProxySQL_Statistics.cpp` + +## 4. Admin Interface and Control Surface + +### 4.1 Dump commands +PostgreSQL-specific dump commands are now available: +- `DUMP PGSQL EVENTSLOG FROM BUFFER TO MEMORY` +- `DUMP PGSQL EVENTSLOG FROM BUFFER TO DISK` +- `DUMP PGSQL EVENTSLOG FROM BUFFER TO BOTH` + +Command handling executes `GloPgSQL_Logger->processEvents(...)` with the selected sink targets. + +These commands are exposed by the shared Admin module and are available from both Admin protocol endpoints: +- MySQL protocol on port `6032` +- PostgreSQL protocol on port `6132` + +Implemented in: +- `lib/Admin_Handler.cpp` + +### 4.2 Runtime/config variables +PostgreSQL thread variables used by advanced logging: +- `pgsql-eventslog_buffer_history_size` +- `pgsql-eventslog_table_memory_size` +- `pgsql-eventslog_buffer_max_query_length` + +Admin scheduling variable: +- `admin-stats_pgsql_eventslog_sync_buffer_to_disk` + +Implemented in: +- `include/PgSQL_Thread.h` +- `include/proxysql_structs.h` +- `include/proxysql_admin.h` +- `lib/ProxySQL_Admin.cpp` + +## 5. Scheduler Integration + +### 5.1 PostgreSQL auto-dump to disk +Admin main loop now periodically flushes PostgreSQL buffered events to `history_pgsql_query_events` when: +- `stats_pgsql_eventslog_sync_buffer_to_disk > 0` +- timer interval is elapsed + +### 5.2 MySQL symmetry fix +The same scheduler loop now also invokes MySQL buffered events dump to disk based on `stats_mysql_eventslog_sync_buffer_to_disk`, ensuring symmetric behavior across both protocols. + +Implemented in: +- `lib/ProxySQL_Admin.cpp` + +## 6. Metrics Architecture + +### 6.1 Logger internal metrics +PostgreSQL logger tracks: +- memory/disk copy counts +- total copy time (memory/disk) +- get-all-events calls/time/count +- total events copied (memory/disk) +- circular buffer added/dropped totals +- circular buffer current size + +Implemented in: +- `include/PgSQL_Logger.hpp` +- `lib/PgSQL_Logger.cpp` + +### 6.2 Stats table exposure +Metrics are exported to `stats_pgsql_global` with `PgSQL_Logger_` prefix. + +Implemented in: +- `lib/ProxySQL_Admin_Stats.cpp` + +### 6.3 Prometheus exposure +Prometheus metric family `proxysql_pgsql_logger_*` is exposed through the serial metrics updater path. + +Implemented in: +- `lib/PgSQL_Logger.cpp` +- `lib/ProxySQL_Admin.cpp` + +## 7. Compatibility and Semantics +- Existing `DUMP EVENTSLOG ...` remains MySQL behavior for compatibility. +- New PostgreSQL syntax is explicit: `DUMP PGSQL EVENTSLOG ...`. +- PostgreSQL event error fields use `sqlstate + error` (textual message). +- PostgreSQL event table uses `database` column naming. + +## 8. 
Code Touchpoint Summary +- Logger core: + - `include/PgSQL_Logger.hpp` + - `lib/PgSQL_Logger.cpp` +- Thread/runtime integration: + - `lib/PgSQL_Thread.cpp` +- Admin commands and scheduler: + - `lib/Admin_Handler.cpp` + - `lib/ProxySQL_Admin.cpp` +- Table definitions/bootstrap: + - `include/ProxySQL_Admin_Tables_Definitions.h` + - `include/ProxySQL_Statistics.hpp` + - `lib/Admin_Bootstrap.cpp` + - `lib/ProxySQL_Statistics.cpp` +- Stats/Prometheus metrics: + - `lib/ProxySQL_Admin_Stats.cpp` + +## 9. Validation and Acceptance Mapping +Implemented acceptance validation via TAP test: +- `test/tap/tests/pgsql_query_logging_memory-t.cpp` +- `test/tap/tests/pgsql_query_logging_autodump-t.cpp` + +Coverage: +- table schema shape validation (`stats_pgsql_query_events`, `history_pgsql_query_events`) +- buffer dump command execution +- success/error event accounting in memory and history tables +- `sqlstate` capture for representative PostgreSQL errors +- non-empty textual `error` capture for error rows +- scheduler-driven periodic dump to `history_pgsql_query_events` via `admin-stats_pgsql_eventslog_sync_buffer_to_disk` + +TAP group registration: +- `test/tap/groups/groups.json` (`pgsql_query_logging_memory-t`, `pgsql_query_logging_autodump-t`) diff --git a/include/PgSQL_Logger.hpp b/include/PgSQL_Logger.hpp index c3af60f86..12752ea0a 100644 --- a/include/PgSQL_Logger.hpp +++ b/include/PgSQL_Logger.hpp @@ -2,9 +2,59 @@ #define __CLASS_PGSQL_LOGGER_H #include "proxysql.h" #include "cpp.h" +#include +#include +#include +#include +#include +#include +#include #define PROXYSQL_LOGGER_PTHREAD_MUTEX +/** + * @brief Counter metric indices for PostgreSQL advanced query logging. + */ +struct p_pl_counter { + enum metric { + memory_copy_count = 0, + disk_copy_count, + get_all_events_calls_count, + get_all_events_events_count, + total_memory_copy_time_us, + total_disk_copy_time_us, + total_get_all_events_time_us, + total_events_copied_to_memory, + total_events_copied_to_disk, + circular_buffer_events_added_count, + circular_buffer_events_dropped_count, + __size + }; +}; + +/** + * @brief Gauge metric indices for PostgreSQL advanced query logging. + */ +struct p_pl_gauge { + enum metric { + circular_buffer_events_size, + __size + }; +}; + +/** + * @brief Metric map tuple indexes used by Prometheus helper initializers. + */ +struct pl_metrics_map_idx { + enum index { + counters = 0, + gauges + }; +}; + +/** + * @brief PostgreSQL event types captured by advanced logging. + */ enum class PGSQL_LOG_EVENT_TYPE { SIMPLE_QUERY, AUTH_OK, @@ -25,6 +75,9 @@ enum class PGSQL_LOG_EVENT_TYPE { STMT_PREPARE }; +/** + * @brief Query or connection event payload written to PostgreSQL logs/sinks. + */ class PgSQL_Event { private: uint32_t thread_id; @@ -53,19 +106,126 @@ class PgSQL_Event { uint64_t affected_rows; uint64_t rows_sent; + char* sqlstate; + char* errmsg; + bool free_on_delete; + bool free_error_on_delete; public: + /** + * @brief Builds an event object using session/query context. + */ PgSQL_Event(PGSQL_LOG_EVENT_TYPE _et, uint32_t _thread_id, char * _username, char * _schemaname , uint64_t _start_time , uint64_t _end_time , uint64_t _query_digest, char *_client, size_t _client_len); + /** + * @brief Deep-copy constructor used by circular buffer insertion. + */ + PgSQL_Event(const PgSQL_Event& other); + /** + * @brief Copy assignment is disabled to prevent shallow pointer ownership bugs. 
+ */ + PgSQL_Event& operator=(const PgSQL_Event&) = delete; + /** + * @brief Frees event-owned allocations for deep-copied instances. + */ + ~PgSQL_Event(); + /** + * @brief Serializes this event into the configured events/audit file format. + * @return Number of bytes written for binary format, 0 for JSON format. + */ uint64_t write(std::fstream *f, PgSQL_Session *sess); + /** + * @brief Writes binary format payload for query events. + */ uint64_t write_query_format_1(std::fstream *f); + /** + * @brief Writes JSON format payload for query events. + */ uint64_t write_query_format_2_json(std::fstream *f); + /** + * @brief Writes JSON payload for authentication/audit events. + */ void write_auth(std::fstream *f, PgSQL_Session *sess); + /** + * @brief Sets client prepared statement name associated with this event. + */ void set_client_stmt_name(char* client_stmt_name); + /** + * @brief Sets event query text and effective query length. + */ void set_query(const char *ptr, int len); + /** + * @brief Sets backend endpoint and hostgroup information. + */ void set_server(int _hid, const char *ptr, int len); + /** + * @brief Sets event extra info text payload. + */ void set_extra_info(char *); + /** + * @brief Sets affected rows flag/value for this event. + */ void set_affected_rows(uint64_t ar); + /** + * @brief Sets rows-sent flag/value for this event. + */ void set_rows_sent(uint64_t rs); + /** + * @brief Sets SQLSTATE and textual error information associated with this event. + * @param _sqlstate SQLSTATE code. + * @param _errmsg Error message string. + */ + void set_error(const char* _sqlstate, const char* _errmsg); + friend class PgSQL_Logger; +}; + +/** + * @brief Thread-safe circular buffer for advanced PostgreSQL query events logging. + */ +class PgSQL_Logger_CircularBuffer { +private: + std::deque<PgSQL_Event*> event_buffer; + std::mutex mutex; + std::atomic<unsigned long long> eventsAddedCount; + std::atomic<unsigned long long> eventsDroppedCount; + +public: + std::atomic<size_t> buffer_size; + /** + * @brief Creates a bounded circular buffer with the supplied capacity. + */ + explicit PgSQL_Logger_CircularBuffer(size_t size); + /** + * @brief Destroys the circular buffer and frees retained events. + */ + ~PgSQL_Logger_CircularBuffer(); + /** + * @brief Inserts one event, evicting older entries when full. + */ + void insert(PgSQL_Event* event); + /** + * @brief Drains all buffered events into @p events and clears the buffer. + */ + void get_all_events(std::vector<PgSQL_Event*>& events); + /** + * @brief Returns current number of retained events. + */ + size_t size(); + /** + * @brief Returns configured buffer capacity. + */ + size_t getBufferSize() const; + /** + * @brief Updates buffer capacity and drops oldest events if oversized. + */ + void setBufferSize(size_t newSize); + /** + * @brief Returns total number of inserted events. + */ + unsigned long long getEventsAddedCount() const { return eventsAddedCount; } + /** + * @brief Returns total number of dropped/evicted events. + */ + unsigned long long getEventsDroppedCount() const { return eventsDroppedCount; } }; class PgSQL_Logger { @@ -86,6 +246,27 @@ class PgSQL_Logger { unsigned int max_log_file_size; std::fstream *logfile; } audit; + + /** + * @brief Accumulated runtime metrics for PostgreSQL advanced events logging.
+ */ + struct EventLogMetrics { + std::atomic<unsigned long long> memoryCopyCount; + std::atomic<unsigned long long> diskCopyCount; + std::atomic<unsigned long long> getAllEventsCallsCount; + std::atomic<unsigned long long> getAllEventsEventsCount; + std::atomic<unsigned long long> totalMemoryCopyTimeMicros; + std::atomic<unsigned long long> totalDiskCopyTimeMicros; + std::atomic<unsigned long long> totalGetAllEventsTimeMicros; + std::atomic<unsigned long long> totalEventsCopiedToMemory; + std::atomic<unsigned long long> totalEventsCopiedToDisk; + } metrics; + + struct { + std::array<prometheus::Counter*, p_pl_counter::__size> p_counter_array {}; + std::array<prometheus::Gauge*, p_pl_gauge::__size> p_gauge_array {}; + } prom_metrics; + #ifdef PROXYSQL_LOGGER_PTHREAD_MUTEX pthread_mutex_t wmutex; #else @@ -98,7 +279,13 @@ class PgSQL_Logger { unsigned int events_find_next_id(); unsigned int audit_find_next_id(); public: + /** + * @brief Constructs PostgreSQL logger state, circular buffer, and metrics. + */ PgSQL_Logger(); + /** + * @brief Destroys logger-owned resources and buffered events. + */ ~PgSQL_Logger(); void print_version(); void flush_log(); @@ -113,6 +300,28 @@ class PgSQL_Logger { void flush(); void wrlock(); void wrunlock(); + PgSQL_Logger_CircularBuffer* PgLogCB; + + /** + * @brief Inserts PostgreSQL events into a SQLite table using bulk prepared statements. + */ + void insertPgSQLEventsIntoDb(SQLite3DB* db, const std::string& tableName, size_t numEvents, std::vector<PgSQL_Event*>::const_iterator begin); + + /** + * @brief Drains buffered PostgreSQL events and writes them to memory and/or disk stats tables. + * @return Number of drained events. + */ + int processEvents(SQLite3DB* statsdb, SQLite3DB* statsdb_disk); + + /** + * @brief Returns all PostgreSQL logger metrics for stats table export. + */ + std::unordered_map<std::string, unsigned long long> getAllMetrics() const; + + /** + * @brief Prometheus serial exposer update hook for PostgreSQL logger metrics. + */ + void p_update_metrics(); }; #endif /* __CLASS_PGSQL_LOGGER_H */ diff --git a/include/PgSQL_Thread.h b/include/PgSQL_Thread.h index bb874463f..5a4b174f3 100644 --- a/include/PgSQL_Thread.h +++ b/include/PgSQL_Thread.h @@ -1026,6 +1026,12 @@ public: int poll_timeout_on_failure; char* eventslog_filename; int eventslog_filesize; + /** @brief Circular buffer size for PostgreSQL advanced events logging. */ + int eventslog_buffer_history_size; + /** @brief Maximum rows retained in stats_pgsql_query_events in-memory table. */ + int eventslog_table_memory_size; + /** @brief Maximum query length copied into PostgreSQL eventslog circular buffer. */ + int eventslog_buffer_max_query_length; int eventslog_default_log; int eventslog_format; char* auditlog_filename; diff --git a/include/ProxySQL_Admin_Tables_Definitions.h b/include/ProxySQL_Admin_Tables_Definitions.h index 7bcd941d0..7214f3dcd 100644 --- a/include/ProxySQL_Admin_Tables_Definitions.h +++ b/include/ProxySQL_Admin_Tables_Definitions.h @@ -340,6 +340,10 @@ #define STATS_SQLITE_TABLE_PGSQL_CLIENT_HOST_CACHE "CREATE TABLE stats_pgsql_client_host_cache (client_address VARCHAR NOT NULL , error_count INT NOT NULL , last_updated BIGINT NOT NULL)" #define STATS_SQLITE_TABLE_PGSQL_CLIENT_HOST_CACHE_RESET "CREATE TABLE stats_pgsql_client_host_cache_reset (client_address VARCHAR NOT NULL , error_count INT NOT NULL , last_updated BIGINT NOT NULL)" #define STATS_SQLITE_TABLE_PGSQL_QUERY_RULES "CREATE TABLE stats_pgsql_query_rules (rule_id INTEGER PRIMARY KEY , hits INT NOT NULL)" +/** + * @brief In-memory PostgreSQL query events table used by advanced events logging.
+ */ +#define STATS_SQLITE_TABLE_PGSQL_QUERY_EVENTS "CREATE TABLE stats_pgsql_query_events (id INTEGER PRIMARY KEY AUTOINCREMENT , thread_id INTEGER , username TEXT , database TEXT , start_time INTEGER , end_time INTEGER , query_digest TEXT , query TEXT , server TEXT , client TEXT , event_type INTEGER , hid INTEGER , extra_info TEXT , affected_rows INTEGER , rows_sent INTEGER , client_stmt_name TEXT , sqlstate TEXT , error TEXT)" #define STATS_SQLITE_TABLE_PGSQL_COMMANDS_COUNTERS "CREATE TABLE stats_pgsql_commands_counters (Command VARCHAR NOT NULL PRIMARY KEY , Total_Time_us INT NOT NULL , Total_cnt INT NOT NULL , cnt_100us INT NOT NULL , cnt_500us INT NOT NULL , cnt_1ms INT NOT NULL , cnt_5ms INT NOT NULL , cnt_10ms INT NOT NULL , cnt_50ms INT NOT NULL , cnt_100ms INT NOT NULL , cnt_500ms INT NOT NULL , cnt_1s INT NOT NULL , cnt_5s INT NOT NULL , cnt_10s INT NOT NULL , cnt_INFs)" #define STATS_SQLITE_TABLE_PGSQL_QUERY_DIGEST "CREATE TABLE stats_pgsql_query_digest (hostgroup INT , database VARCHAR NOT NULL , username VARCHAR NOT NULL , client_address VARCHAR NOT NULL , digest VARCHAR NOT NULL , digest_text VARCHAR NOT NULL , count_star INTEGER NOT NULL , first_seen INTEGER NOT NULL , last_seen INTEGER NOT NULL , sum_time INTEGER NOT NULL , min_time INTEGER NOT NULL , max_time INTEGER NOT NULL , sum_rows_affected INTEGER NOT NULL , sum_rows_sent INTEGER NOT NULL , PRIMARY KEY(hostgroup, database, username, client_address, digest))" #define STATS_SQLITE_TABLE_PGSQL_QUERY_DIGEST_RESET "CREATE TABLE stats_pgsql_query_digest_reset (hostgroup INT , database VARCHAR NOT NULL , username VARCHAR NOT NULL , client_address VARCHAR NOT NULL , digest VARCHAR NOT NULL , digest_text VARCHAR NOT NULL , count_star INTEGER NOT NULL , first_seen INTEGER NOT NULL , last_seen INTEGER NOT NULL , sum_time INTEGER NOT NULL , min_time INTEGER NOT NULL , max_time INTEGER NOT NULL , sum_rows_affected INTEGER NOT NULL , sum_rows_sent INTEGER NOT NULL , PRIMARY KEY(hostgroup, database, username, client_address, digest))" diff --git a/include/ProxySQL_Statistics.hpp b/include/ProxySQL_Statistics.hpp index 7e1c1ad49..7279ee4b2 100644 --- a/include/ProxySQL_Statistics.hpp +++ b/include/ProxySQL_Statistics.hpp @@ -93,6 +93,10 @@ #define STATSDB_SQLITE_TABLE_HISTORY_MYSQL_QUERY_EVENTS "CREATE TABLE history_mysql_query_events (id INTEGER PRIMARY KEY AUTOINCREMENT , thread_id INTEGER , username TEXT , schemaname TEXT , start_time INTEGER , end_time INTEGER , query_digest TEXT , query TEXT , server TEXT , client TEXT , event_type INTEGER , hid INTEGER , extra_info TEXT , affected_rows INTEGER , last_insert_id INTEGER , rows_sent INTEGER , client_stmt_id INTEGER , gtid TEXT , errno INT , error TEXT)" +/** + * @brief On-disk PostgreSQL query events table used by advanced events logging. 
+ */ +#define STATSDB_SQLITE_TABLE_HISTORY_PGSQL_QUERY_EVENTS "CREATE TABLE history_pgsql_query_events (id INTEGER PRIMARY KEY AUTOINCREMENT , thread_id INTEGER , username TEXT , database TEXT , start_time INTEGER , end_time INTEGER , query_digest TEXT , query TEXT , server TEXT , client TEXT , event_type INTEGER , hid INTEGER , extra_info TEXT , affected_rows INTEGER , rows_sent INTEGER , client_stmt_name TEXT , sqlstate TEXT , error TEXT)" // Generic time-series metrics table #define STATSDB_SQLITE_TABLE_TSDB_METRICS \ @@ -118,6 +122,7 @@ class ProxySQL_Statistics { unsigned long long next_timer_mysql_query_digest_to_disk; unsigned long long next_timer_system_cpu; unsigned long long last_timer_mysql_dump_eventslog_to_disk = 0; + unsigned long long last_timer_pgsql_dump_eventslog_to_disk = 0; #ifndef NOJEM unsigned long long next_timer_system_memory; #endif @@ -141,6 +146,8 @@ class ProxySQL_Statistics { int stats_system_cpu; int stats_mysql_query_digest_to_disk; int stats_mysql_eventslog_sync_buffer_to_disk; + /** @brief Periodic disk sync interval (seconds) for PostgreSQL eventslog buffer. */ + int stats_pgsql_eventslog_sync_buffer_to_disk; #ifndef NOJEM int stats_system_memory; #endif @@ -175,6 +182,12 @@ class ProxySQL_Statistics { * The dump interval is retrieved from the ProxySQL configuration. If the dump interval is 0, no dumping is performed. */ bool MySQL_Logger_dump_eventslog_timetoget(unsigned long long currentTimeMicros); + /** + * @brief Checks if it's time to dump PostgreSQL eventslog buffer to disk. + * @param currentTimeMicros The current time in microseconds. + * @return True when periodic PostgreSQL events dump should run. + */ + bool PgSQL_Logger_dump_eventslog_timetoget(unsigned long long currentTimeMicros); #ifndef NOJEM bool system_memory_timetoget(unsigned long long); diff --git a/include/proxysql_admin.h b/include/proxysql_admin.h index 40879cc6e..dd1800d80 100644 --- a/include/proxysql_admin.h +++ b/include/proxysql_admin.h @@ -326,6 +326,8 @@ class ProxySQL_Admin { int stats_mysql_query_cache; int stats_mysql_query_digest_to_disk; int stats_mysql_eventslog_sync_buffer_to_disk; + /** @brief Periodic disk sync interval (seconds) for PostgreSQL eventslog buffer. 
*/ + int stats_pgsql_eventslog_sync_buffer_to_disk; int stats_system_cpu; int stats_system_memory; bool restapi_enabled; diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 0efb79576..9ed1b4fea 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -1174,6 +1174,9 @@ __thread char* pgsql_thread___auditlog_filename; __thread int pgsql_thread___auditlog_filesize; __thread char* pgsql_thread___eventslog_filename; __thread int pgsql_thread___eventslog_filesize; +__thread int pgsql_thread___eventslog_buffer_history_size; +__thread int pgsql_thread___eventslog_table_memory_size; +__thread int pgsql_thread___eventslog_buffer_max_query_length; __thread int pgsql_thread___eventslog_default_log; __thread int pgsql_thread___eventslog_format; __thread char* pgsql_thread___firewall_whitelist_errormsg; @@ -1478,6 +1481,9 @@ extern __thread char* pgsql_thread___auditlog_filename; extern __thread int pgsql_thread___auditlog_filesize; extern __thread char* pgsql_thread___eventslog_filename; extern __thread int pgsql_thread___eventslog_filesize; +extern __thread int pgsql_thread___eventslog_buffer_history_size; +extern __thread int pgsql_thread___eventslog_table_memory_size; +extern __thread int pgsql_thread___eventslog_buffer_max_query_length; extern __thread int pgsql_thread___eventslog_default_log; extern __thread int pgsql_thread___eventslog_format; extern __thread char* pgsql_thread___firewall_whitelist_errormsg; diff --git a/lib/Admin_Bootstrap.cpp b/lib/Admin_Bootstrap.cpp index d67ad2192..abb9e91a6 100644 --- a/lib/Admin_Bootstrap.cpp +++ b/lib/Admin_Bootstrap.cpp @@ -910,6 +910,7 @@ bool ProxySQL_Admin::init(const bootstrap_info_t& bootstrap_info) { insert_into_tables_defs(tables_defs_stats,"stats_pgsql_query_digest", STATS_SQLITE_TABLE_PGSQL_QUERY_DIGEST); insert_into_tables_defs(tables_defs_stats,"stats_pgsql_query_digest_reset", STATS_SQLITE_TABLE_PGSQL_QUERY_DIGEST_RESET); insert_into_tables_defs(tables_defs_stats,"stats_pgsql_prepared_statements_info", STATS_SQLITE_TABLE_PGSQL_PREPARED_STATEMENTS_INFO); + insert_into_tables_defs(tables_defs_stats,"stats_pgsql_query_events", STATS_SQLITE_TABLE_PGSQL_QUERY_EVENTS); // ProxySQL Cluster insert_into_tables_defs(tables_defs_admin,"proxysql_servers", ADMIN_SQLITE_TABLE_PROXYSQL_SERVERS); diff --git a/lib/Admin_Handler.cpp b/lib/Admin_Handler.cpp index 086a05b9a..1b39d8f14 100644 --- a/lib/Admin_Handler.cpp +++ b/lib/Admin_Handler.cpp @@ -3243,6 +3243,37 @@ void admin_session_handler(S* sess, void *_pa, PtrSize_t *pkt) { goto __run_query; } + if (!strncasecmp("DUMP PGSQL EVENTSLOG ", query_no_space, strlen("DUMP PGSQL EVENTSLOG "))) { + int num_rows = 0; + proxy_debug(PROXY_DEBUG_ADMIN, 4, "Received command DUMP PGSQL EVENTSLOG: %s\n", query_no_space); + proxy_info("Received command DUMP PGSQL EVENTSLOG: %s\n", query_no_space); + + std::map> commandMap = { + {"DUMP PGSQL EVENTSLOG FROM BUFFER TO MEMORY", {SPA->statsdb, nullptr}}, + {"DUMP PGSQL EVENTSLOG FROM BUFFER TO DISK", {nullptr, SPA->statsdb_disk}}, + {"DUMP PGSQL EVENTSLOG FROM BUFFER TO BOTH", {SPA->statsdb, SPA->statsdb_disk}} + }; + + string s = string(query_no_space); + auto it = commandMap.find(s); + if (it != commandMap.end()) { + if (GloPgSQL_Logger == nullptr) { + proxy_warning("PgSQL logger not initialized for command: %s\n", query_no_space); + const string err_msg = "PgSQL logger not initialized"; + SPA->send_error_msg_to_client(sess, const_cast(err_msg.c_str())); + } else { + num_rows = GloPgSQL_Logger->processEvents(it->second.first, 
it->second.second); + SPA->send_ok_msg_to_client(sess, NULL, num_rows, query_no_space); + } + } else { + proxy_warning("Received invalid command DUMP PGSQL EVENTSLOG: %s\n", query_no_space); + const string err_msg = "Invalid DUMP PGSQL EVENTSLOG command"; + SPA->send_error_msg_to_client(sess, const_cast(err_msg.c_str())); + } + run_query = false; + goto __run_query; + } + // handle special queries from Cluster // for bug #1188 , ProxySQL Admin needs to know the exact query diff --git a/lib/Makefile b/lib/Makefile index b59bd9095..e440b32b8 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -121,7 +121,8 @@ $(ODIR): clean: - rm -rf *.pid $(ODIR)/*.oo $(ODIR)/*.o $(ODIR)/*.gcno $(ODIR)/*.gcda *.ko *.so *~ core libproxysql.a $(ODIR) + rm -rf *.pid $(ODIR)/*.oo $(ODIR)/*.o $(ODIR)/*.gcno $(ODIR)/*.gcda *.ko *.so *~ core libproxysql.a $(ODIR) \ + MySQL_Lexer.yy.c MySQL_Parser.output MySQL_Parser.tab.c MySQL_Parser.tab.h ## self note diff --git a/lib/PgSQL_Logger.cpp b/lib/PgSQL_Logger.cpp index e279059c4..7a549105d 100644 --- a/lib/PgSQL_Logger.cpp +++ b/lib/PgSQL_Logger.cpp @@ -3,6 +3,7 @@ using json = nlohmann::json; #define PROXYJSON #include +#include #include "proxysql.h" #include "cpp.h" @@ -23,6 +24,110 @@ using json = nlohmann::json; #define PROXYSQL_PGSQL_LOGGER_VERSION "2.5.0421" DEB extern PgSQL_Logger *GloPgSQL_Logger; +extern PgSQL_Threads_Handler* GloPTH; + +using metric_name = std::string; +using metric_help = std::string; +using metric_tags = std::map; + +using pl_counter_tuple = + std::tuple< + p_pl_counter::metric, + metric_name, + metric_help, + metric_tags + >; + +using pl_gauge_tuple = + std::tuple< + p_pl_gauge::metric, + metric_name, + metric_help, + metric_tags + >; + +using pl_counter_vector = std::vector; +using pl_gauge_vector = std::vector; + +const std::tuple +pl_metrics_map = std::make_tuple( + pl_counter_vector { + std::make_tuple( + p_pl_counter::memory_copy_count, + "proxysql_pgsql_logger_copy_total", + "Number of times events were copied to the in-memory/on-disk databases.", + metric_tags { { "target", "memory" } } + ), + std::make_tuple( + p_pl_counter::disk_copy_count, + "proxysql_pgsql_logger_copy_total", + "Number of times events were copied to the in-memory/on-disk databases.", + metric_tags { { "target", "disk" } } + ), + std::make_tuple( + p_pl_counter::get_all_events_calls_count, + "proxysql_pgsql_logger_get_all_events_calls_total", + "Number of times the 'get_all_events' method was called.", + metric_tags {} + ), + std::make_tuple( + p_pl_counter::get_all_events_events_count, + "proxysql_pgsql_logger_get_all_events_events_total", + "Number of events retrieved by the `get_all_events` method.", + metric_tags {} + ), + std::make_tuple( + p_pl_counter::total_memory_copy_time_us, + "proxysql_pgsql_logger_copy_seconds_total", + "Total time spent copying events to the in-memory/on-disk databases.", + metric_tags { { "target", "memory" } } + ), + std::make_tuple( + p_pl_counter::total_disk_copy_time_us, + "proxysql_pgsql_logger_copy_seconds_total", + "Total time spent copying events to the in-memory/on-disk databases.", + metric_tags { { "target", "disk" } } + ), + std::make_tuple( + p_pl_counter::total_get_all_events_time_us, + "proxysql_pgsql_logger_get_all_events_seconds_total", + "Total time spent in `get_all_events` method.", + metric_tags {} + ), + std::make_tuple( + p_pl_counter::total_events_copied_to_memory, + "proxysql_pgsql_logger_events_copied_total", + "Total number of events copied to the in-memory/on-disk databases.", + metric_tags { { "target", 
"memory" } } + ), + std::make_tuple( + p_pl_counter::total_events_copied_to_disk, + "proxysql_pgsql_logger_events_copied_total", + "Total number of events copied to the in-memory/on-disk databases.", + metric_tags { { "target", "disk" } } + ), + std::make_tuple( + p_pl_counter::circular_buffer_events_added_count, + "proxysql_pgsql_logger_circular_buffer_events_total", + "The total number of events added/dropped to/from the circular buffer.", + metric_tags { { "type", "added" } } + ), + std::make_tuple( + p_pl_counter::circular_buffer_events_dropped_count, + "proxysql_pgsql_logger_circular_buffer_events_total", + "The total number of events added/dropped to/from the circular buffer.", + metric_tags { { "type", "dropped" } } + ), + }, + pl_gauge_vector { + std::make_tuple( + p_pl_gauge::circular_buffer_events_size, + "proxysql_pgsql_logger_circular_buffer_events", + "Number of events currently present in the circular buffer.", + metric_tags {} + ), + } +); static uint8_t encode_length(uint64_t len, unsigned char *hd) { if (len < 251) return 1; @@ -43,24 +148,146 @@ static inline int write_encoded_length(unsigned char *p, uint64_t val, uint8_t l return len; } +/** + * @brief Returns runtime max query length for PostgreSQL events buffer copies. + * + * @details The PGSQL events dump commands can run from either Admin protocol + * endpoint (MySQL/6032 or PostgreSQL/6132). To keep behavior identical across + * both, this helper reads the canonical runtime value from `GloPTH->variables` + * when available, instead of relying on protocol-thread TLS state. + */ +static size_t get_pgsql_eventslog_buffer_max_query_length_runtime() { + if (GloPTH != nullptr) { + return static_cast(GloPTH->variables.eventslog_buffer_max_query_length); + } + return static_cast(pgsql_thread___eventslog_buffer_max_query_length); +} + +/** + * @brief Returns runtime max in-memory rows for `stats_pgsql_query_events`. + * + * @details See `get_pgsql_eventslog_buffer_max_query_length_runtime()` for + * rationale on using `GloPTH->variables` rather than protocol-thread TLS. 
+ */ +static size_t get_pgsql_eventslog_table_memory_size_runtime() { + if (GloPTH != nullptr) { + return static_cast(GloPTH->variables.eventslog_table_memory_size); + } + return static_cast(pgsql_thread___eventslog_table_memory_size); +} + PgSQL_Event::PgSQL_Event (PGSQL_LOG_EVENT_TYPE _et, uint32_t _thread_id, char * _username, char * _schemaname , uint64_t _start_time , uint64_t _end_time , uint64_t _query_digest, char *_client, size_t _client_len) { thread_id=_thread_id; username=_username; schemaname=_schemaname; + username_len=0; + schemaname_len=0; + client_stmt_name_len=0; start_time=_start_time; end_time=_end_time; query_digest=_query_digest; + query_ptr = NULL; + query_len = 0; client=_client; client_len=_client_len; et=_et; hid=UINT64_MAX; server=NULL; + server_len=0; extra_info = NULL; have_affected_rows=false; affected_rows=0; have_rows_sent=false; rows_sent=0; client_stmt_name=NULL; + sqlstate = NULL; + errmsg = NULL; + free_on_delete = false; // do not own pointers by default + free_error_on_delete = false; // only error fields can be independently owned + memset(buf, 0, sizeof(buf)); +} + +PgSQL_Event::PgSQL_Event(const PgSQL_Event& other) { + memcpy(this, &other, sizeof(PgSQL_Event)); + + if (other.username != nullptr) { + username = strdup(other.username); + } + if (other.schemaname != nullptr) { + schemaname = strdup(other.schemaname); + } + if (other.query_ptr != nullptr) { + size_t maxQueryLen = get_pgsql_eventslog_buffer_max_query_length_runtime(); + size_t lenToCopy = std::min(other.query_len, maxQueryLen); + query_ptr = (char*)malloc(lenToCopy + 1); + memcpy(query_ptr, other.query_ptr, lenToCopy); + query_ptr[lenToCopy] = '\0'; + query_len = lenToCopy; + } + if (other.server != nullptr) { + server = (char*)malloc(server_len + 1); + memcpy(server, other.server, server_len); + server[server_len] = '\0'; + } + if (other.client != nullptr) { + client = (char*)malloc(client_len + 1); + memcpy(client, other.client, client_len); + client[client_len] = '\0'; + } + if (other.extra_info != nullptr) { + extra_info = strdup(other.extra_info); + } + if (other.client_stmt_name != nullptr) { + client_stmt_name = strdup(other.client_stmt_name); + } + if (other.sqlstate != nullptr) { + sqlstate = strdup(other.sqlstate); + } + if (other.errmsg != nullptr) { + errmsg = strdup(other.errmsg); + } + + free_on_delete = true; + free_error_on_delete = false; +} + +PgSQL_Event::~PgSQL_Event() { + if (free_on_delete == true) { + if (username != nullptr) { + free(username); username = nullptr; + } + if (schemaname != nullptr) { + free(schemaname); schemaname = nullptr; + } + if (query_ptr != nullptr) { + free(query_ptr); query_ptr = nullptr; + } + if (server != nullptr) { + free(server); server = nullptr; + } + if (client != nullptr) { + free(client); client = nullptr; + } + if (extra_info != nullptr) { + free(extra_info); extra_info = nullptr; + } + if (client_stmt_name != nullptr) { + free(client_stmt_name); client_stmt_name = nullptr; + } + if (sqlstate != nullptr) { + free(sqlstate); sqlstate = nullptr; + } + if (errmsg != nullptr) { + free(errmsg); errmsg = nullptr; + } + } else if (free_error_on_delete == true) { + if (sqlstate != nullptr) { + free(sqlstate); sqlstate = nullptr; + } + if (errmsg != nullptr) { + free(errmsg); errmsg = nullptr; + } + } } void PgSQL_Event::set_client_stmt_name(char* client_stmt_name) { @@ -98,6 +325,22 @@ void PgSQL_Event::set_server(int _hid, const char *ptr, int len) { hid=_hid; } +void PgSQL_Event::set_error(const char* _sqlstate, const char* _errmsg) { 
+ if (free_on_delete == true || free_error_on_delete == true) { + if (sqlstate != nullptr) { + free(sqlstate); + sqlstate = nullptr; + } + if (errmsg != nullptr) { + free(errmsg); + errmsg = nullptr; + } + } + sqlstate = (_sqlstate != nullptr) ? strdup(_sqlstate) : nullptr; + errmsg = (_errmsg != nullptr) ? strdup(_errmsg) : nullptr; + free_error_on_delete = true; +} + uint64_t PgSQL_Event::write(std::fstream *f, PgSQL_Session *sess) { uint64_t total_bytes=0; switch (et) { @@ -406,6 +649,12 @@ uint64_t PgSQL_Event::write_query_format_2_json(std::fstream *f) { if (have_rows_sent == true) { j["rows_sent"] = rows_sent; } + if (sqlstate != nullptr) { + j["sqlstate"] = sqlstate; + } + if (errmsg != nullptr) { + j["error"] = errmsg; + } j["query"] = string(query_ptr, query_len); j["starttime_timestamp_us"] = start_time; { @@ -451,7 +700,7 @@ uint64_t PgSQL_Event::write_query_format_2_json(std::fstream *f) { extern PgSQL_Query_Processor* GloPgQPro; -PgSQL_Logger::PgSQL_Logger() { +PgSQL_Logger::PgSQL_Logger() : metrics{0, 0, 0, 0, 0, 0, 0, 0, 0} { events.enabled=false; events.base_filename=NULL; events.datadir=NULL; @@ -471,6 +720,14 @@ PgSQL_Logger::PgSQL_Logger() { audit.logfile=NULL; audit.log_file_id=0; audit.max_log_file_size=100*1024*1024; + PgLogCB = new PgSQL_Logger_CircularBuffer(0); + + init_prometheus_counter_array( + pl_metrics_map, this->prom_metrics.p_counter_array + ); + init_prometheus_gauge_array( + pl_metrics_map, this->prom_metrics.p_gauge_array + ); }; PgSQL_Logger::~PgSQL_Logger() { @@ -482,6 +739,7 @@ PgSQL_Logger::~PgSQL_Logger() { free(audit.datadir); } free(audit.base_filename); + delete PgLogCB; }; void PgSQL_Logger::wrlock() { @@ -658,8 +916,11 @@ void PgSQL_Logger::audit_set_datadir(char *s) { }; void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) { - if (events.enabled==false) return; - if (events.logfile==NULL) return; + int elmhs = pgsql_thread___eventslog_buffer_history_size; + if (elmhs == 0) { + if (events.enabled == false) return; + if (events.logfile == NULL) return; + } // 'PgSQL_Session::client_myds' could be NULL in case of 'RequestEnd' being called over a freshly created session // due to a failed 'CONNECTION_RESET'. Because this scenario isn't a client request, we just return. 
if (sess->client_myds==NULL || sess->client_myds->myconn== NULL) return; @@ -758,6 +1019,9 @@ void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) { me.set_affected_rows(sess->CurrentQuery.affected_rows); } me.set_rows_sent(sess->CurrentQuery.rows_sent); + if (myds && myds->myconn && myds->myconn->is_error_present()) { + me.set_error(myds->myconn->get_error_code_str(), myds->myconn->get_error_message().c_str()); + } int sl=0; char *sa=(char *)""; // default @@ -782,17 +1046,20 @@ void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) { // right before the write to disk //wrlock(); - //add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump - GloPgSQL_Logger->wrlock(); - - me.write(events.logfile, sess); - - - unsigned long curpos=events.logfile->tellp(); - if (curpos > events.max_log_file_size) { - events_flush_log_unlocked(); + if ((events.enabled == true) && (events.logfile != nullptr)) { + //add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump + GloPgSQL_Logger->wrlock(); + me.write(events.logfile, sess); + unsigned long curpos=events.logfile->tellp(); + if (curpos > events.max_log_file_size) { + events_flush_log_unlocked(); + } + wrunlock(); + } + if (PgLogCB->buffer_size.load(std::memory_order_relaxed) != 0) { + PgSQL_Event* me2 = new PgSQL_Event(me); + PgLogCB->insert(me2); } - wrunlock(); if (cl && sess->client_myds->addr.port) { free(ca); @@ -1057,3 +1324,228 @@ void PgSQL_Logger::print_version() { fprintf(stderr,"Standard ProxySQL PgSQL Logger rev. %s -- %s -- %s\n", PROXYSQL_PGSQL_LOGGER_VERSION, __FILE__, __TIMESTAMP__); } +PgSQL_Logger_CircularBuffer::PgSQL_Logger_CircularBuffer(size_t size) : + event_buffer(), + eventsAddedCount(0), eventsDroppedCount(0), + buffer_size(size) {} + +PgSQL_Logger_CircularBuffer::~PgSQL_Logger_CircularBuffer() { + std::lock_guard<std::mutex> lock(mutex); + for (PgSQL_Event* event : event_buffer) { + delete event; + } +} + +void PgSQL_Logger_CircularBuffer::insert(PgSQL_Event* event) { + std::lock_guard<std::mutex> lock(mutex); + eventsAddedCount++; + const size_t current_buffer_size = buffer_size.load(std::memory_order_relaxed); + if (current_buffer_size == 0) { + delete event; + eventsDroppedCount++; + return; + } + while (event_buffer.size() >= current_buffer_size) { + delete event_buffer.front(); + event_buffer.pop_front(); + eventsDroppedCount++; + } + event_buffer.push_back(event); +} + +void PgSQL_Logger_CircularBuffer::get_all_events(std::vector<PgSQL_Event*>& events) { + std::lock_guard<std::mutex> lock(mutex); + events.reserve(event_buffer.size()); + events.insert(events.end(), event_buffer.begin(), event_buffer.end()); + event_buffer.clear(); +} + +size_t PgSQL_Logger_CircularBuffer::size() { + std::lock_guard<std::mutex> lock(mutex); + return event_buffer.size(); +} + +size_t PgSQL_Logger_CircularBuffer::getBufferSize() const { + return buffer_size.load(std::memory_order_relaxed); +} + +void PgSQL_Logger_CircularBuffer::setBufferSize(size_t newSize) { + std::lock_guard<std::mutex> lock(mutex); + buffer_size.store(newSize, std::memory_order_relaxed); + while (event_buffer.size() > newSize) { + delete event_buffer.front(); + event_buffer.pop_front(); + eventsDroppedCount++; + } +} + +void PgSQL_Logger::insertPgSQLEventsIntoDb(SQLite3DB* db, const std::string& tableName, size_t numEvents, std::vector<PgSQL_Event*>::const_iterator begin) { + int rc = 0; + const int numcols = 17; + + std::string coldefs = "(thread_id, username, 
database, start_time, end_time, query_digest, query, server, client, event_type, hid, extra_info, affected_rows, rows_sent, client_stmt_name, sqlstate, error)"; + std::string query1s = "INSERT INTO " + tableName + coldefs + " VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)"; + std::string query32s = "INSERT INTO " + tableName + coldefs + " VALUES " + generate_multi_rows_query(32, numcols); + + auto [rc1, statement1_unique] = db->prepare_v2(query1s.c_str()); + ASSERT_SQLITE_OK(rc1, db); + auto [rc2, statement32_unique] = db->prepare_v2(query32s.c_str()); + ASSERT_SQLITE_OK(rc2, db); + sqlite3_stmt* statement1 = statement1_unique.get(); + sqlite3_stmt* statement32 = statement32_unique.get(); + + auto bind_text_or_null = [&](sqlite3_stmt* stmt, int idx, const char* v) { + if (v) { + rc = (*proxy_sqlite3_bind_text)(stmt, idx, v, -1, SQLITE_TRANSIENT); + } else { + rc = (*proxy_sqlite3_bind_null)(stmt, idx); + } + ASSERT_SQLITE_OK(rc, db); + }; + + char digest_hex_str[20]; + db->execute("BEGIN"); + + int row_idx = 0; + int max_bulk_row_idx = (numEvents / 32) * 32; + for (std::vector::const_iterator it = begin; it != begin + numEvents; ++it) { + PgSQL_Event* event = *it; + int idx = row_idx % 32; + sqlite3_stmt* stmt = (row_idx < max_bulk_row_idx ? statement32 : statement1); + int base = (row_idx < max_bulk_row_idx ? idx * numcols : 0); + + rc = (*proxy_sqlite3_bind_int)(stmt, base + 1, event->thread_id); ASSERT_SQLITE_OK(rc, db); + bind_text_or_null(stmt, base + 2, event->username); + bind_text_or_null(stmt, base + 3, event->schemaname); + rc = (*proxy_sqlite3_bind_int64)(stmt, base + 4, event->start_time); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(stmt, base + 5, event->end_time); ASSERT_SQLITE_OK(rc, db); + sprintf(digest_hex_str, "0x%016llX", (long long unsigned int)event->query_digest); + bind_text_or_null(stmt, base + 6, digest_hex_str); + bind_text_or_null(stmt, base + 7, event->query_ptr); + bind_text_or_null(stmt, base + 8, event->server); + bind_text_or_null(stmt, base + 9, event->client); + rc = (*proxy_sqlite3_bind_int)(stmt, base + 10, (int)event->et); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(stmt, base + 11, event->hid); ASSERT_SQLITE_OK(rc, db); + bind_text_or_null(stmt, base + 12, event->extra_info); + rc = (*proxy_sqlite3_bind_int64)(stmt, base + 13, event->affected_rows); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(stmt, base + 14, event->rows_sent); ASSERT_SQLITE_OK(rc, db); + bind_text_or_null(stmt, base + 15, event->client_stmt_name); + bind_text_or_null(stmt, base + 16, event->sqlstate); + bind_text_or_null(stmt, base + 17, event->errmsg); + + if (row_idx < max_bulk_row_idx) { + if (idx == 31) { + SAFE_SQLITE3_STEP2(statement32); + rc = (*proxy_sqlite3_clear_bindings)(statement32); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_reset)(statement32); ASSERT_SQLITE_OK(rc, db); + } + } else { + SAFE_SQLITE3_STEP2(statement1); + rc = (*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, db); + } + + row_idx++; + } + + db->execute("COMMIT"); +} + +int PgSQL_Logger::processEvents(SQLite3DB* statsdb, SQLite3DB* statsdb_disk) { + unsigned long long startTimeMicros = monotonic_time(); + std::vector events = {}; + PgLogCB->get_all_events(events); + metrics.getAllEventsCallsCount++; + if (events.empty()) return 0; + + unsigned long long afterGetAllEventsTimeMicros = monotonic_time(); + 
metrics.getAllEventsEventsCount += events.size(); + metrics.totalGetAllEventsTimeMicros += (afterGetAllEventsTimeMicros - startTimeMicros); + + if (statsdb_disk != nullptr) { + unsigned long long diskStartTimeMicros = monotonic_time(); + insertPgSQLEventsIntoDb(statsdb_disk, "history_pgsql_query_events", events.size(), events.begin()); + unsigned long long diskEndTimeMicros = monotonic_time(); + metrics.diskCopyCount++; + metrics.totalDiskCopyTimeMicros += (diskEndTimeMicros - diskStartTimeMicros); + metrics.totalEventsCopiedToDisk += events.size(); + } + + if (statsdb != nullptr) { + unsigned long long memoryStartTimeMicros = monotonic_time(); + size_t maxInMemorySize = get_pgsql_eventslog_table_memory_size_runtime(); + size_t numEventsToInsert = std::min(events.size(), maxInMemorySize); + + if (events.size() >= maxInMemorySize) { + statsdb->execute("DELETE FROM stats_pgsql_query_events"); + } else { + int current_rows_i = statsdb->return_one_int((char*)"SELECT COUNT(*) FROM stats_pgsql_query_events"); + size_t current_rows = (current_rows_i > 0) ? static_cast(current_rows_i) : 0; + size_t rows_to_keep = maxInMemorySize - events.size(); + if (current_rows > rows_to_keep) { + size_t rows_to_delete = (current_rows - rows_to_keep); + std::string query = "SELECT MAX(id) FROM (SELECT id FROM stats_pgsql_query_events ORDER BY id LIMIT " + std::to_string(rows_to_delete) + ")"; + int maxIdToDelete = statsdb->return_one_int(query.c_str()); + std::string delete_stmt = "DELETE FROM stats_pgsql_query_events WHERE id <= " + std::to_string(maxIdToDelete); + statsdb->execute(delete_stmt.c_str()); + } + } + if (numEventsToInsert > 0) { + auto begin_it = events.begin(); + if (events.size() > numEventsToInsert) { + begin_it = events.end() - numEventsToInsert; + } + insertPgSQLEventsIntoDb(statsdb, "stats_pgsql_query_events", numEventsToInsert, begin_it); + } + unsigned long long memoryEndTimeMicros = monotonic_time(); + metrics.memoryCopyCount++; + metrics.totalMemoryCopyTimeMicros += (memoryEndTimeMicros - memoryStartTimeMicros); + metrics.totalEventsCopiedToMemory += numEventsToInsert; + } + + for (PgSQL_Event* event : events) { + delete event; + } + + return events.size(); +} + +std::unordered_map PgSQL_Logger::getAllMetrics() const { + std::unordered_map allMetrics; + + allMetrics["memoryCopyCount"] = metrics.memoryCopyCount; + allMetrics["diskCopyCount"] = metrics.diskCopyCount; + allMetrics["getAllEventsCallsCount"] = metrics.getAllEventsCallsCount; + allMetrics["getAllEventsEventsCount"] = metrics.getAllEventsEventsCount; + allMetrics["totalMemoryCopyTimeMicros"] = metrics.totalMemoryCopyTimeMicros; + allMetrics["totalDiskCopyTimeMicros"] = metrics.totalDiskCopyTimeMicros; + allMetrics["totalGetAllEventsTimeMicros"] = metrics.totalGetAllEventsTimeMicros; + allMetrics["totalEventsCopiedToMemory"] = metrics.totalEventsCopiedToMemory; + allMetrics["totalEventsCopiedToDisk"] = metrics.totalEventsCopiedToDisk; + allMetrics["circularBufferEventsAddedCount"] = PgLogCB->getEventsAddedCount(); + allMetrics["circularBufferEventsDroppedCount"] = PgLogCB->getEventsDroppedCount(); + allMetrics["circularBufferEventsSize"] = PgLogCB->size(); + + return allMetrics; +} + +void PgSQL_Logger::p_update_metrics() { + using pl_c = p_pl_counter; + const auto& counters { this->prom_metrics.p_counter_array }; + + p_update_counter(counters[pl_c::memory_copy_count], metrics.memoryCopyCount); + p_update_counter(counters[pl_c::disk_copy_count], metrics.diskCopyCount); + 
p_update_counter(counters[pl_c::get_all_events_calls_count], metrics.getAllEventsCallsCount); + p_update_counter(counters[pl_c::get_all_events_events_count], metrics.getAllEventsEventsCount); + p_update_counter(counters[pl_c::total_memory_copy_time_us], metrics.totalMemoryCopyTimeMicros / (1000.0 * 1000)); + p_update_counter(counters[pl_c::total_disk_copy_time_us], metrics.totalDiskCopyTimeMicros / (1000.0 * 1000)); + p_update_counter(counters[pl_c::total_get_all_events_time_us], metrics.totalGetAllEventsTimeMicros / (1000.0 * 1000)); + p_update_counter(counters[pl_c::total_events_copied_to_memory], metrics.totalEventsCopiedToMemory); + p_update_counter(counters[pl_c::total_events_copied_to_disk], metrics.totalEventsCopiedToDisk); + p_update_counter(counters[pl_c::circular_buffer_events_added_count], PgLogCB->getEventsAddedCount()); + p_update_counter(counters[pl_c::circular_buffer_events_dropped_count], PgLogCB->getEventsDroppedCount()); + + using pl_g = p_pl_gauge; + const auto& gauges { this->prom_metrics.p_gauge_array }; + gauges[pl_g::circular_buffer_events_size]->Set(PgLogCB->size()); +} diff --git a/lib/PgSQL_Thread.cpp b/lib/PgSQL_Thread.cpp index c3aa74136..383ef3015 100644 --- a/lib/PgSQL_Thread.cpp +++ b/lib/PgSQL_Thread.cpp @@ -291,6 +291,9 @@ static char* pgsql_thread_variables_names[] = { (char*)"connect_timeout_server_max", (char*)"eventslog_filename", (char*)"eventslog_filesize", + (char*)"eventslog_buffer_history_size", + (char*)"eventslog_table_memory_size", + (char*)"eventslog_buffer_max_query_length", (char*)"eventslog_default_log", (char*)"eventslog_format", (char*)"auditlog_filename", @@ -1129,6 +1132,9 @@ PgSQL_Threads_Handler::PgSQL_Threads_Handler() { variables.interfaces = strdup((char*)""); variables.eventslog_filename = strdup((char*)""); // proxysql-mysql-eventslog is recommended variables.eventslog_filesize = 100 * 1024 * 1024; + variables.eventslog_buffer_history_size = 0; + variables.eventslog_table_memory_size = 10000; + variables.eventslog_buffer_max_query_length = 32 * 1024; variables.eventslog_default_log = 0; variables.eventslog_format = 1; variables.auditlog_filename = strdup((char*)""); @@ -2240,6 +2246,9 @@ char** PgSQL_Threads_Handler::get_variables_list() { // logs VariablesPointers_int["auditlog_filesize"] = make_tuple(&variables.auditlog_filesize, 1024 * 1024, 1 * 1024 * 1024 * 1024, false); VariablesPointers_int["eventslog_filesize"] = make_tuple(&variables.eventslog_filesize, 1024 * 1024, 1 * 1024 * 1024 * 1024, false); + VariablesPointers_int["eventslog_buffer_history_size"] = make_tuple(&variables.eventslog_buffer_history_size, 0, 8 * 1024 * 1024, false); + VariablesPointers_int["eventslog_table_memory_size"] = make_tuple(&variables.eventslog_table_memory_size, 0, 8 * 1024 * 1024, false); + VariablesPointers_int["eventslog_buffer_max_query_length"] = make_tuple(&variables.eventslog_buffer_max_query_length, 128, 32 * 1024 * 1024, false); VariablesPointers_int["eventslog_default_log"] = make_tuple(&variables.eventslog_default_log, 0, 1, false); // various VariablesPointers_int["long_query_time"] = make_tuple(&variables.long_query_time, 0, 20 * 24 * 3600 * 1000, false); @@ -4011,6 +4020,12 @@ void PgSQL_Thread::refresh_variables() { if (pgsql_thread___eventslog_filename) free(pgsql_thread___eventslog_filename); pgsql_thread___eventslog_filesize = GloPTH->get_variable_int((char*)"eventslog_filesize"); + pgsql_thread___eventslog_buffer_history_size = GloPTH->get_variable_int((char*)"eventslog_buffer_history_size"); + if (GloPgSQL_Logger && 
GloPgSQL_Logger->PgLogCB->getBufferSize() != static_cast(pgsql_thread___eventslog_buffer_history_size)) { + GloPgSQL_Logger->PgLogCB->setBufferSize(pgsql_thread___eventslog_buffer_history_size); + } + pgsql_thread___eventslog_table_memory_size = GloPTH->get_variable_int((char*)"eventslog_table_memory_size"); + pgsql_thread___eventslog_buffer_max_query_length = GloPTH->get_variable_int((char*)"eventslog_buffer_max_query_length"); pgsql_thread___eventslog_default_log = GloPTH->get_variable_int((char*)"eventslog_default_log"); pgsql_thread___eventslog_format = GloPTH->get_variable_int((char*)"eventslog_format"); pgsql_thread___eventslog_filename = GloPTH->get_variable_string((char*)"eventslog_filename"); diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index 64b462da1..a0f2e73ed 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -377,6 +377,8 @@ static char * admin_variables_names[]= { (char *)"stats_mysql_connection_pool", (char *)"stats_mysql_query_cache", (char *)"stats_mysql_query_digest_to_disk", + (char *)"stats_mysql_eventslog_sync_buffer_to_disk", + (char *)"stats_pgsql_eventslog_sync_buffer_to_disk", (char *)"stats_system_cpu", (char *)"stats_system_memory", (char *)"mysql_ifaces", @@ -2048,6 +2050,8 @@ void ProxySQL_Admin::vacuum_stats(bool is_admin) { "stats_mysql_query_digest_reset", "stats_pgsql_query_digest", "stats_pgsql_query_digest_reset", + "stats_mysql_query_events", + "stats_pgsql_query_events", "stats_mysql_query_rules", "stats_pgsql_query_rules", "stats_mysql_users", @@ -2514,6 +2518,26 @@ __end_while_pool: curtime2 = curtime2/1000; proxy_info("Automatically saved stats_mysql_query_digest to disk: %llums to write %d entries\n", curtime2-curtime1, r1); } + if (GloProxyStats->MySQL_Logger_dump_eventslog_timetoget(curtime)) { + if (GloMyLogger != nullptr) { + unsigned long long curtime1 = monotonic_time(); + int r1 = GloMyLogger->processEvents(nullptr, SPA->statsdb_disk); + unsigned long long curtime2 = monotonic_time(); + curtime1 = curtime1 / 1000; + curtime2 = curtime2 / 1000; + proxy_info("Automatically dumped MySQL eventslog buffer to disk: %llums to write %d entries\n", curtime2 - curtime1, r1); + } + } + if (GloProxyStats->PgSQL_Logger_dump_eventslog_timetoget(curtime)) { + if (GloPgSQL_Logger != nullptr) { + unsigned long long curtime1 = monotonic_time(); + int r1 = GloPgSQL_Logger->processEvents(nullptr, SPA->statsdb_disk); + unsigned long long curtime2 = monotonic_time(); + curtime1 = curtime1 / 1000; + curtime2 = curtime2 / 1000; + proxy_info("Automatically dumped PgSQL eventslog buffer to disk: %llums to write %d entries\n", curtime2 - curtime1, r1); + } + } if (GloProxyStats->system_cpu_timetoget(curtime)) { GloProxyStats->system_cpu_sets(); } @@ -2704,6 +2728,10 @@ void update_modules_metrics() { if (GloMyLogger) { GloMyLogger->p_update_metrics(); } + // Update PostgreSQL logger metrics + if (GloPgSQL_Logger) { + GloPgSQL_Logger->p_update_metrics(); + } // Update admin metrics GloAdmin->p_update_metrics(); } @@ -2822,6 +2850,7 @@ ProxySQL_Admin::ProxySQL_Admin() : variables.stats_mysql_query_cache = 60; variables.stats_mysql_query_digest_to_disk = 0; variables.stats_mysql_eventslog_sync_buffer_to_disk = 0; + variables.stats_pgsql_eventslog_sync_buffer_to_disk = 0; variables.stats_system_cpu = 60; variables.stats_system_memory = 60; GloProxyStats->variables.stats_mysql_connection_pool = 60; @@ -2829,6 +2858,7 @@ ProxySQL_Admin::ProxySQL_Admin() : GloProxyStats->variables.stats_mysql_query_cache = 60; 
GloProxyStats->variables.stats_mysql_query_digest_to_disk = 0; GloProxyStats->variables.stats_mysql_eventslog_sync_buffer_to_disk = 0; + GloProxyStats->variables.stats_pgsql_eventslog_sync_buffer_to_disk = 0; GloProxyStats->variables.stats_system_cpu = 60; #ifndef NOJEM GloProxyStats->variables.stats_system_memory = 60; @@ -3756,6 +3786,10 @@ char * ProxySQL_Admin::get_variable(char *name) { snprintf(intbuf, sizeof(intbuf),"%d",variables.stats_mysql_eventslog_sync_buffer_to_disk); return strdup(intbuf); } + if (!strcasecmp(name,"stats_pgsql_eventslog_sync_buffer_to_disk")) { + snprintf(intbuf, sizeof(intbuf),"%d",variables.stats_pgsql_eventslog_sync_buffer_to_disk); + return strdup(intbuf); + } if (!strcasecmp(name,"stats_system_cpu")) { snprintf(intbuf, sizeof(intbuf),"%d",variables.stats_system_cpu); return strdup(intbuf); @@ -4098,6 +4132,16 @@ bool ProxySQL_Admin::set_variable(char *name, char *value, bool lock) { // this return false; } } + if (!strcasecmp(name,"stats_pgsql_eventslog_sync_buffer_to_disk")) { + int intv=atoi(value); + if (intv >= 0 && intv <= 24*3600) { + variables.stats_pgsql_eventslog_sync_buffer_to_disk=intv; + GloProxyStats->variables.stats_pgsql_eventslog_sync_buffer_to_disk=intv; + return true; + } else { + return false; + } + } if (!strcasecmp(name,"stats_system_cpu")) { int intv=atoi(value); if (intv >= 0 && intv <= 600) { diff --git a/lib/ProxySQL_Admin_Stats.cpp b/lib/ProxySQL_Admin_Stats.cpp index a3de82061..5e369094e 100644 --- a/lib/ProxySQL_Admin_Stats.cpp +++ b/lib/ProxySQL_Admin_Stats.cpp @@ -19,6 +19,7 @@ #include "MySQL_Query_Processor.h" #include "PgSQL_Query_Processor.h" #include "MySQL_Logger.hpp" +#include "PgSQL_Logger.hpp" #ifdef PROXYSQLGENAI #include "MCP_Thread.h" #include "Query_Tool_Handler.h" @@ -52,6 +53,7 @@ extern PgSQL_Query_Processor* GloPgQPro; extern ProxySQL_Cluster *GloProxyCluster; extern ProxySQL_Statistics *GloProxyStats; extern MySQL_Logger *GloMyLogger; +extern PgSQL_Logger *GloPgSQL_Logger; void ProxySQL_Admin::p_update_metrics() { // Update proxysql_uptime @@ -783,6 +785,22 @@ void ProxySQL_Admin::stats___pgsql_global() { statsdb->execute(query); free(query); } + + if (GloPgSQL_Logger != nullptr) { + const string prefix = "PgSQL_Logger_"; + const string q_row_insert { "INSERT INTO stats_pgsql_global VALUES (?1, ?2)" }; + int rc = 0; + stmt_unique_ptr u_row_stmt { nullptr }; + std::tie(rc, u_row_stmt) = statsdb->prepare_v2(q_row_insert.c_str()); + ASSERT_SQLITE_OK(rc, statsdb); + sqlite3_stmt* const row_stmt { u_row_stmt.get() }; + std::unordered_map metrics = GloPgSQL_Logger->getAllMetrics(); + for (std::unordered_map::iterator it = metrics.begin(); it != metrics.end(); it++) { + string var_name = prefix + it->first; + sqlite3_global_stats_row_step(statsdb, row_stmt, var_name.c_str(), it->second); + } + } + statsdb->execute("COMMIT"); } diff --git a/lib/ProxySQL_Statistics.cpp b/lib/ProxySQL_Statistics.cpp index 5244fa1be..bc2f86411 100644 --- a/lib/ProxySQL_Statistics.cpp +++ b/lib/ProxySQL_Statistics.cpp @@ -278,6 +278,7 @@ void ProxySQL_Statistics::init() { insert_into_tables_defs(tables_defs_statsdb_disk,"history_mysql_query_events", STATSDB_SQLITE_TABLE_HISTORY_MYSQL_QUERY_EVENTS); + insert_into_tables_defs(tables_defs_statsdb_disk,"history_pgsql_query_events", STATSDB_SQLITE_TABLE_HISTORY_PGSQL_QUERY_EVENTS); // TSDB tables insert_into_tables_defs(tables_defs_statsdb_disk,"tsdb_metrics", STATSDB_SQLITE_TABLE_TSDB_METRICS); @@ -304,6 +305,9 @@ void ProxySQL_Statistics::init() { statsdb_disk->execute("CREATE INDEX IF 
NOT EXISTS idx_history_mysql_query_events_start_time ON history_mysql_query_events(start_time)"); statsdb_disk->execute("CREATE INDEX IF NOT EXISTS idx_history_mysql_query_events_query_digest ON history_mysql_query_events(query_digest)"); + statsdb_disk->execute("CREATE INDEX IF NOT EXISTS idx_history_pgsql_query_events_start_time ON history_pgsql_query_events(start_time)"); + statsdb_disk->execute("CREATE INDEX IF NOT EXISTS idx_history_pgsql_query_events_query_digest ON history_pgsql_query_events(query_digest)"); + statsdb_disk->execute("CREATE INDEX IF NOT EXISTS idx_tsdb_metrics_metric_time ON tsdb_metrics(metric_name, timestamp)"); statsdb_disk->execute("CREATE INDEX IF NOT EXISTS idx_tsdb_metrics_hour_metric_bucket ON tsdb_metrics_hour(metric_name, bucket)"); statsdb_disk->execute("CREATE INDEX IF NOT EXISTS idx_tsdb_backend_health_time ON tsdb_backend_health(timestamp)"); @@ -418,6 +422,18 @@ bool ProxySQL_Statistics::MySQL_Logger_dump_eventslog_timetoget(unsigned long lo return false; } +bool ProxySQL_Statistics::PgSQL_Logger_dump_eventslog_timetoget(unsigned long long currentTimeMicros) { + if (variables.stats_pgsql_eventslog_sync_buffer_to_disk) { // only proceed if not zero + unsigned long long t = variables.stats_pgsql_eventslog_sync_buffer_to_disk; // originally in seconds + t = t * 1000 * 1000; + if (currentTimeMicros > last_timer_pgsql_dump_eventslog_to_disk + t) { + last_timer_pgsql_dump_eventslog_to_disk = currentTimeMicros; + return true; + } + } + return false; +} + bool ProxySQL_Statistics::MySQL_Threads_Handler_timetoget(unsigned long long curtime) { unsigned int i = (unsigned int)variables.stats_mysql_connections; if (i) { diff --git a/test/tap/groups/groups.json b/test/tap/groups/groups.json index f4e6be2f0..e0ad91d54 100644 --- a/test/tap/groups/groups.json +++ b/test/tap/groups/groups.json @@ -48,6 +48,8 @@ "pgsql-notice_test-t" : [ "default-g1","mysql-auto_increment_delay_multiplex=0-g1","mysql-multiplexing=false-g1","mysql-query_digests=0-g1","mysql-query_digests_keep_comment=1-g1" ], "pgsql-query_cache_soft_ttl_pct-t" : [ "default-g1","mysql-auto_increment_delay_multiplex=0-g1","mysql-multiplexing=false-g1","mysql-query_digests=0-g1","mysql-query_digests_keep_comment=1-g1" ], "pgsql-query_cache_test-t" : [ "default-g1","mysql-auto_increment_delay_multiplex=0-g1","mysql-multiplexing=false-g1","mysql-query_digests=0-g1","mysql-query_digests_keep_comment=1-g1" ], + "pgsql_query_logging_autodump-t" : [ "default-g1","mysql-auto_increment_delay_multiplex=0-g1","mysql-multiplexing=false-g1","mysql-query_digests=0-g1","mysql-query_digests_keep_comment=1-g1" ], + "pgsql_query_logging_memory-t" : [ "default-g1","mysql-auto_increment_delay_multiplex=0-g1","mysql-multiplexing=false-g1","mysql-query_digests=0-g1","mysql-query_digests_keep_comment=1-g1" ], "pgsql-reg_test_4707_threshold_resultset_size-t" : [ "default-g1","mysql-auto_increment_delay_multiplex=0-g1","mysql-multiplexing=false-g1","mysql-query_digests=0-g1","mysql-query_digests_keep_comment=1-g1" ], "pgsql-reg_test_4716_single_semicolon-t" : [ "default-g1","mysql-auto_increment_delay_multiplex=0-g1","mysql-multiplexing=false-g1","mysql-query_digests=0-g1","mysql-query_digests_keep_comment=1-g1" ], "pgsql-reg_test_4867_query_rules-t" : [ "default-g1","mysql-auto_increment_delay_multiplex=0-g1","mysql-multiplexing=false-g1","mysql-query_digests=0-g1","mysql-query_digests_keep_comment=1-g1" ], @@ -247,6 +249,7 @@ "pgsql-query_digests_stages_test-t": [ "default-g4", "mysql-auto_increment_delay_multiplex=0-g4", 
"mysql-multiplexing=false-g4", "mysql-query_digests=0-g4", "mysql-query_digests_keep_comment=1-g4" ], "pgsql_admin_metacmds-t": [ "default-g4", "mysql-auto_increment_delay_multiplex=0-g4", "mysql-multiplexing=false-g4", "mysql-query_digests=0-g4", "mysql-query_digests_keep_comment=1-g4" ], "pgsql-monitor_ssl_connections_test-t": [ "default-g4", "mysql-auto_increment_delay_multiplex=0-g4", "mysql-multiplexing=false-g4", "mysql-query_digests=0-g4", "mysql-query_digests_keep_comment=1-g4" ], + "pgsql_query_logging_autodump-t": [ "default-g4" ], "pgsql-parameterized_kill_queries_test-t": [ "default-g4", "mysql-auto_increment_delay_multiplex=0-g4", "mysql-multiplexing=false-g4", "mysql-query_digests=0-g4", "mysql-query_digests_keep_comment=1-g4" ], "pgsql-reg_test_5284_frontend_ssl_enforcement-t": [ "default-g4", "mysql-auto_increment_delay_multiplex=0-g4", "mysql-multiplexing=false-g4", "mysql-query_digests=0-g4", "mysql-query_digests_keep_comment=1-g4" ], "pgsql-reg_test_5273_bind_parameter_format-t": [ "default-g4", "mysql-auto_increment_delay_multiplex=0-g4", "mysql-multiplexing=false-g4", "mysql-query_digests=0-g4", "mysql-query_digests_keep_comment=1-g4" ], diff --git a/test/tap/tests/pgsql_query_logging_autodump-t.cpp b/test/tap/tests/pgsql_query_logging_autodump-t.cpp new file mode 100644 index 000000000..77f60a019 --- /dev/null +++ b/test/tap/tests/pgsql_query_logging_autodump-t.cpp @@ -0,0 +1,194 @@ +/** + * @file pgsql_query_logging_autodump-t.cpp + * @brief TAP test for PostgreSQL eventslog automatic buffer-to-disk sync. + */ + +#include +#include +#include +#include + +#include "libpq-fe.h" + +#include "command_line.h" +#include "tap.h" +#include "utils.h" + +using PGConnPtr = std::unique_ptr; +using std::string; + +/** + * @brief Creates a PostgreSQL connection from a libpq connection string. + */ +PGConnPtr create_connection(const std::string& conn_info) { + PGconn* conn = PQconnectdb(conn_info.c_str()); + if (conn == nullptr || PQstatus(conn) != CONNECTION_OK) { + if (conn) { + diag("Connection failed: %s", PQerrorMessage(conn)); + PQfinish(conn); + } else { + diag("Connection failed: PQconnectdb returned nullptr"); + } + return PGConnPtr(nullptr, &PQfinish); + } + return PGConnPtr(conn, &PQfinish); +} + +/** + * @brief Executes a query and expects command-ok or tuples-ok. + */ +bool exec_ok(PGconn* conn, const std::string& query) { + PGresult* res = PQexec(conn, query.c_str()); + if (res == nullptr) { + diag("Query failed (null result): %s", query.c_str()); + return false; + } + ExecStatusType status = PQresultStatus(res); + bool ok_status = (status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK); + if (!ok_status) { + diag("Query failed: %s", query.c_str()); + diag("Error: %s", PQresultErrorMessage(res)); + } + PQclear(res); + return ok_status; +} + +/** + * @brief Executes scalar query returning one integer result. + */ +bool query_one_int(PGconn* conn, const std::string& query, long long& value) { + PGresult* res = PQexec(conn, query.c_str()); + if (res == nullptr) { + diag("Scalar query failed (null result): %s", query.c_str()); + return false; + } + if (PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) != 1 || PQnfields(res) != 1) { + diag("Scalar query returned unexpected shape: %s", query.c_str()); + diag("Error: %s", PQresultErrorMessage(res)); + PQclear(res); + return false; + } + value = atoll(PQgetvalue(res, 0, 0)); + PQclear(res); + return true; +} + +/** + * @brief Waits until row count in history table has increased by at least delta. 
+ */ +bool wait_for_history_delta(PGconn* admin_conn, long long baseline, long long delta, int timeout_seconds) { + for (int i = 0; i < timeout_seconds; ++i) { + long long count = 0; + if (!query_one_int(admin_conn, "SELECT COUNT(*) FROM history_pgsql_query_events", count)) { + return false; + } + if (count - baseline >= delta) { + return true; + } + sleep(1); + } + return false; +} + +int main() { + CommandLine cl; + if (cl.getEnv()) { + diag("Failed to get the required environmental variables."); + return -1; + } + + plan(8); + + std::stringstream admin_ss; + admin_ss << "host=" << cl.pgsql_admin_host + << " port=" << cl.pgsql_admin_port + << " user=" << cl.admin_username + << " password=" << cl.admin_password + << " dbname=postgres"; + PGConnPtr admin_conn = create_connection(admin_ss.str()); + ok(admin_conn != nullptr, "Connected to PostgreSQL admin interface"); + if (!admin_conn) { + return exit_status(); + } + + std::stringstream backend_ss; + backend_ss << "host=" << cl.pgsql_host + << " port=" << cl.pgsql_port + << " user=" << cl.pgsql_username + << " password=" << cl.pgsql_password; + PGConnPtr backend_conn = create_connection(backend_ss.str()); + ok(backend_conn != nullptr, "Connected to PostgreSQL frontend interface"); + if (!backend_conn) { + return exit_status(); + } + + bool setup_ok = true; + setup_ok = setup_ok && exec_ok(admin_conn.get(), "SET pgsql-eventslog_buffer_history_size=1000000"); + setup_ok = setup_ok && exec_ok(admin_conn.get(), "SET pgsql-eventslog_default_log=1"); + setup_ok = setup_ok && exec_ok(admin_conn.get(), "SET admin-stats_pgsql_eventslog_sync_buffer_to_disk=1"); + setup_ok = setup_ok && exec_ok(admin_conn.get(), "LOAD PGSQL VARIABLES TO RUNTIME"); + setup_ok = setup_ok && exec_ok(admin_conn.get(), "LOAD ADMIN VARIABLES TO RUNTIME"); + setup_ok = setup_ok && exec_ok(admin_conn.get(), "DUMP PGSQL EVENTSLOG FROM BUFFER TO DISK"); + setup_ok = setup_ok && exec_ok(admin_conn.get(), "DELETE FROM history_pgsql_query_events"); + ok(setup_ok, "Configured PGSQL eventslog buffer and auto-dump scheduler"); + if (!setup_ok) { + return exit_status(); + } + + long long baseline = 0; + bool baseline_ok = query_one_int(admin_conn.get(), "SELECT COUNT(*) FROM history_pgsql_query_events", baseline); + ok(baseline_ok, "Collected baseline row count from history_pgsql_query_events"); + if (!baseline_ok) { + return exit_status(); + } + + const int num_queries = 30; + bool run_queries_ok = true; + for (int i = 0; i < num_queries; ++i) { + PGresult* res = PQexec(backend_conn.get(), "SELECT 1"); + if (res == nullptr || PQresultStatus(res) != PGRES_TUPLES_OK) { + run_queries_ok = false; + if (res) { + diag("Query failed during workload generation: %s", PQresultErrorMessage(res)); + } else { + diag("Query failed during workload generation: null result"); + } + if (res) PQclear(res); + break; + } + PQclear(res); + } + ok(run_queries_ok, "Generated PostgreSQL query workload"); + if (!run_queries_ok) { + return exit_status(); + } + + bool auto_dump_ok = wait_for_history_delta(admin_conn.get(), baseline, num_queries, 20); + ok(auto_dump_ok, "Automatic scheduler dumped buffered PGSQL events to disk"); + + long long success_rows = -1; + bool success_rows_ok = query_one_int( + admin_conn.get(), + "SELECT COUNT(*) FROM history_pgsql_query_events WHERE sqlstate IS NULL", + success_rows + ); + if (!success_rows_ok || success_rows < num_queries) { + diag( + "Expected >= %d successful rows, got %lld (query_ok=%s)", + num_queries, + success_rows, + success_rows_ok ? 
"true" : "false" + ); + } + ok(success_rows_ok && success_rows >= num_queries, "History table includes expected successful rows"); + + bool cleanup_ok = true; + cleanup_ok = cleanup_ok && exec_ok(admin_conn.get(), "SET admin-stats_pgsql_eventslog_sync_buffer_to_disk=0"); + cleanup_ok = cleanup_ok && exec_ok(admin_conn.get(), "SET pgsql-eventslog_default_log=0"); + cleanup_ok = cleanup_ok && exec_ok(admin_conn.get(), "SET pgsql-eventslog_buffer_history_size=0"); + cleanup_ok = cleanup_ok && exec_ok(admin_conn.get(), "LOAD ADMIN VARIABLES TO RUNTIME"); + cleanup_ok = cleanup_ok && exec_ok(admin_conn.get(), "LOAD PGSQL VARIABLES TO RUNTIME"); + ok(cleanup_ok, "Cleanup completed and auto-dump scheduler disabled"); + + return exit_status(); +} diff --git a/test/tap/tests/pgsql_query_logging_memory-t.cpp b/test/tap/tests/pgsql_query_logging_memory-t.cpp new file mode 100644 index 000000000..f491f12dc --- /dev/null +++ b/test/tap/tests/pgsql_query_logging_memory-t.cpp @@ -0,0 +1,320 @@ +/** + * @file pgsql_query_logging_memory-t.cpp + * @brief TAP test for PostgreSQL advanced query logging in memory and history tables. + */ + +#include +#include +#include +#include +#include + +#include "libpq-fe.h" + +#include "command_line.h" +#include "tap.h" +#include "utils.h" + +using PGConnPtr = std::unique_ptr; +using std::string; + +/** + * @brief Opens a PostgreSQL connection using the supplied connection parameters. + */ +PGConnPtr create_connection(const std::string& conn_info) { + PGconn* conn = PQconnectdb(conn_info.c_str()); + if (!conn || PQstatus(conn) != CONNECTION_OK) { + if (conn) { + diag("Connection failed: %s", PQerrorMessage(conn)); + PQfinish(conn); + } else { + diag("Connection failed: PQconnectdb returned nullptr"); + } + return PGConnPtr(nullptr, &PQfinish); + } + return PGConnPtr(conn, &PQfinish); +} + +/** + * @brief Executes a statement and expects either command-ok or tuples-ok result. + */ +bool exec_ok(PGconn* conn, const std::string& query) { + PGresult* res = PQexec(conn, query.c_str()); + if (res == nullptr) { + diag("Query failed (null result): %s", query.c_str()); + return false; + } + ExecStatusType st = PQresultStatus(res); + bool ok_status = (st == PGRES_COMMAND_OK || st == PGRES_TUPLES_OK); + if (!ok_status) { + diag("Query failed: %s", query.c_str()); + diag("Error: %s", PQresultErrorMessage(res)); + } + PQclear(res); + return ok_status; +} + +/** + * @brief Executes a scalar query returning one integer value. + */ +bool query_one_int(PGconn* conn, const std::string& query, long long& value) { + PGresult* res = PQexec(conn, query.c_str()); + if (res == nullptr) { + diag("Scalar query returned null result: %s", query.c_str()); + return false; + } + if (PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) != 1 || PQnfields(res) != 1) { + diag("Scalar query returned unexpected shape: %s", query.c_str()); + diag("Error: %s", PQresultErrorMessage(res)); + PQclear(res); + return false; + } + value = atoll(PQgetvalue(res, 0, 0)); + PQclear(res); + return true; +} + +/** + * @brief Validates field names for a query result. 
+ */
+bool check_columns(PGconn* conn, const std::string& query, const std::vector<std::string>& expected_columns) {
+    PGresult* res = PQexec(conn, query.c_str());
+    if (res == nullptr) {
+        diag("Column check query returned null result: %s", query.c_str());
+        return false;
+    }
+    if (PQresultStatus(res) != PGRES_TUPLES_OK) {
+        diag("Column check query failed: %s", query.c_str());
+        diag("Error: %s", PQresultErrorMessage(res));
+        PQclear(res);
+        return false;
+    }
+
+    bool same_count = (PQnfields(res) == static_cast<int>(expected_columns.size()));
+    if (!same_count) {
+        diag("Column count mismatch for query: %s", query.c_str());
+        diag("Expected: %zu, got: %d", expected_columns.size(), PQnfields(res));
+        PQclear(res);
+        return false;
+    }
+
+    for (int i = 0; i < PQnfields(res); ++i) {
+        const char* actual = PQfname(res, i);
+        if (actual == nullptr || expected_columns[i] != actual) {
+            diag("Column mismatch at position %d for query: %s", i, query.c_str());
+            diag("Expected: %s, got: %s", expected_columns[i].c_str(), (actual ? actual : ""));
+            PQclear(res);
+            return false;
+        }
+    }
+
+    PQclear(res);
+    return true;
+}
+
+/**
+ * @brief Reads SQLSTATE counts from the target events table.
+ */
+bool get_sqlstate_counts(PGconn* conn, const std::string& table_name, std::map<std::string, int>& counts) {
+    const std::string query =
+        "SELECT COALESCE(sqlstate, ''), COUNT(*) "
+        "FROM " + table_name + " "
+        "GROUP BY COALESCE(sqlstate, '') "
+        "ORDER BY COALESCE(sqlstate, '')";
+
+    PGresult* res = PQexec(conn, query.c_str());
+    if (res == nullptr) {
+        diag("SQLSTATE count query returned null result: %s", table_name.c_str());
+        return false;
+    }
+    if (PQresultStatus(res) != PGRES_TUPLES_OK) {
+        diag("SQLSTATE count query failed for table %s: %s", table_name.c_str(), PQresultErrorMessage(res));
+        PQclear(res);
+        return false;
+    }
+
+    counts.clear();
+    for (int i = 0; i < PQntuples(res); ++i) {
+        const std::string sqlstate = PQgetvalue(res, i, 0);
+        const int count = atoi(PQgetvalue(res, i, 1));
+        counts[sqlstate] = count;
+    }
+
+    PQclear(res);
+    return true;
+}
+
+int main() {
+    CommandLine cl;
+    if (cl.getEnv()) {
+        diag("Failed to get the required environmental variables.");
+        return -1;
+    }
+
+    const unsigned int num_selects = 200;
+    const std::vector<std::string> expected_columns = {
+        "id", "thread_id", "username", "database", "start_time", "end_time", "query_digest",
+        "query", "server", "client", "event_type", "hid", "extra_info", "affected_rows",
+        "rows_sent", "client_stmt_name", "sqlstate", "error"
+    };
+
+    unsigned int p = 2;        // table column checks
+    p += num_selects / 10;     // successful SELECT checks
+    p += 3;                    // error checks
+    p += 10;                   // row accounting + SQLSTATE checks
+    plan(p);
+
+    std::stringstream admin_ss;
+    admin_ss << "host=" << cl.pgsql_admin_host
+             << " port=" << cl.pgsql_admin_port
+             << " user=" << cl.admin_username
+             << " password=" << cl.admin_password
+             << " dbname=postgres";
+    PGConnPtr admin_conn = create_connection(admin_ss.str());
+    if (!admin_conn) {
+        return -1;
+    }
+
+    ok(
+        check_columns(admin_conn.get(), "SELECT * FROM stats_pgsql_query_events LIMIT 0", expected_columns),
+        "stats_pgsql_query_events columns match expectation"
+    );
+    ok(
+        check_columns(admin_conn.get(), "SELECT * FROM history_pgsql_query_events LIMIT 0", expected_columns),
+        "history_pgsql_query_events columns match expectation"
+    );
+
+    if (!exec_ok(admin_conn.get(), "SET pgsql-eventslog_buffer_history_size=1000000")) return EXIT_FAILURE;
+    if (!exec_ok(admin_conn.get(), "SET pgsql-eventslog_default_log=1")) return EXIT_FAILURE;
+    if
(!exec_ok(admin_conn.get(), "LOAD PGSQL VARIABLES TO RUNTIME")) return EXIT_FAILURE; + if (!exec_ok(admin_conn.get(), "DUMP PGSQL EVENTSLOG FROM BUFFER TO BOTH")) return EXIT_FAILURE; + if (!exec_ok(admin_conn.get(), "DELETE FROM stats_pgsql_query_events")) return EXIT_FAILURE; + if (!exec_ok(admin_conn.get(), "DELETE FROM history_pgsql_query_events")) return EXIT_FAILURE; + + std::stringstream proxy_ss; + proxy_ss << "host=" << cl.pgsql_host + << " port=" << cl.pgsql_port + << " user=" << cl.pgsql_username + << " password=" << cl.pgsql_password; + PGConnPtr proxy_conn = create_connection(proxy_ss.str()); + if (!proxy_conn) { + return -1; + } + + for (unsigned int i = 0; i < num_selects; ++i) { + PGresult* res = PQexec(proxy_conn.get(), "SELECT 1"); + bool q_ok = (res != nullptr && PQresultStatus(res) == PGRES_TUPLES_OK); + if (!q_ok) { + diag("SELECT 1 failed at iteration %u: %s", i + 1, (res ? PQresultErrorMessage(res) : "null result")); + if (res) PQclear(res); + return EXIT_FAILURE; + } + PQclear(res); + if ((i + 1) % 10 == 0) { + ok(1, "SELECT 1 query successful (iteration %u)", i + 1); + } + } + + { + PGresult* res = PQexec(proxy_conn.get(), "SELEEEEECT 1"); + const char* sqlstate = (res ? PQresultErrorField(res, PG_DIAG_SQLSTATE) : nullptr); + ok(res && PQresultStatus(res) == PGRES_FATAL_ERROR && sqlstate && std::string(sqlstate) == "42601", + "Syntax error captured with SQLSTATE 42601"); + if (res) PQclear(res); + } + + { + PGresult* res = PQexec(proxy_conn.get(), "SELECT * FROM pgsql_non_existing_table_advanced_logging_test"); + const char* sqlstate = (res ? PQresultErrorField(res, PG_DIAG_SQLSTATE) : nullptr); + ok(res && PQresultStatus(res) == PGRES_FATAL_ERROR && sqlstate && std::string(sqlstate) == "42P01", + "Undefined table error captured with SQLSTATE 42P01"); + if (res) PQclear(res); + } + + { + PGresult* res = PQexec(proxy_conn.get(), "SELECT 1/0"); + const char* sqlstate = (res ? 
PQresultErrorField(res, PG_DIAG_SQLSTATE) : nullptr); + ok(res && PQresultStatus(res) == PGRES_FATAL_ERROR && sqlstate && std::string(sqlstate) == "22012", + "Division by zero error captured with SQLSTATE 22012"); + if (res) PQclear(res); + } + + if (!exec_ok(admin_conn.get(), "DUMP PGSQL EVENTSLOG FROM BUFFER TO BOTH")) return EXIT_FAILURE; + + const long long expected_total = static_cast(num_selects) + 3; + long long history_total = -1; + long long stats_total = -1; + long long history_success = -1; + long long stats_success = -1; + long long history_error_msg_missing = -1; + long long stats_error_msg_missing = -1; + + ok( + query_one_int(admin_conn.get(), "SELECT COUNT(*) FROM history_pgsql_query_events", history_total) && + history_total == expected_total, + "history_pgsql_query_events row count matches expectation" + ); + ok( + query_one_int(admin_conn.get(), "SELECT COUNT(*) FROM stats_pgsql_query_events", stats_total) && + stats_total == expected_total, + "stats_pgsql_query_events row count matches expectation" + ); + ok( + query_one_int(admin_conn.get(), "SELECT COUNT(*) FROM history_pgsql_query_events WHERE sqlstate IS NULL", history_success) && + history_success == static_cast(num_selects), + "history_pgsql_query_events success row count matches expectation" + ); + ok( + query_one_int(admin_conn.get(), "SELECT COUNT(*) FROM stats_pgsql_query_events WHERE sqlstate IS NULL", stats_success) && + stats_success == static_cast(num_selects), + "stats_pgsql_query_events success row count matches expectation" + ); + + std::map expected_sqlstate_counts = { + {"", static_cast(num_selects)}, + {"22012", 1}, + {"42601", 1}, + {"42P01", 1} + }; + std::map history_sqlstate_counts; + std::map stats_sqlstate_counts; + + ok( + get_sqlstate_counts(admin_conn.get(), "history_pgsql_query_events", history_sqlstate_counts) && + history_sqlstate_counts == expected_sqlstate_counts, + "history_pgsql_query_events SQLSTATE distribution matches expectation" + ); + ok( + get_sqlstate_counts(admin_conn.get(), "stats_pgsql_query_events", stats_sqlstate_counts) && + stats_sqlstate_counts == expected_sqlstate_counts, + "stats_pgsql_query_events SQLSTATE distribution matches expectation" + ); + + ok( + query_one_int( + admin_conn.get(), + "SELECT COUNT(*) FROM history_pgsql_query_events WHERE sqlstate IS NOT NULL AND (error IS NULL OR error='')", + history_error_msg_missing + ) && history_error_msg_missing == 0, + "history_pgsql_query_events has non-empty error messages for error rows" + ); + ok( + query_one_int( + admin_conn.get(), + "SELECT COUNT(*) FROM stats_pgsql_query_events WHERE sqlstate IS NOT NULL AND (error IS NULL OR error='')", + stats_error_msg_missing + ) && stats_error_msg_missing == 0, + "stats_pgsql_query_events has non-empty error messages for error rows" + ); + + ok( + exec_ok(admin_conn.get(), "DUMP PGSQL EVENTSLOG FROM BUFFER TO MEMORY"), + "DUMP PGSQL EVENTSLOG FROM BUFFER TO MEMORY succeeds" + ); + ok( + exec_ok(admin_conn.get(), "DUMP PGSQL EVENTSLOG FROM BUFFER TO DISK"), + "DUMP PGSQL EVENTSLOG FROM BUFFER TO DISK succeeds" + ); + + return exit_status(); +}
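+
+// Illustrative admin-side walk-through of the flow exercised above, shown only as a
+// usage sketch (not executed by this test; assumes default admin credentials):
+//
+//   SET pgsql-eventslog_buffer_history_size=1000000;
+//   SET pgsql-eventslog_default_log=1;
+//   LOAD PGSQL VARIABLES TO RUNTIME;
+//   -- run traffic through the PostgreSQL frontend, then:
+//   DUMP PGSQL EVENTSLOG FROM BUFFER TO BOTH;
+//   SELECT username, database, sqlstate, error FROM stats_pgsql_query_events LIMIT 10;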