From acbd7d0df3723373144b2ac4ae68ec41dedc409e Mon Sep 17 00:00:00 2001 From: Rene Cannao Date: Wed, 18 Feb 2026 09:38:39 +0000 Subject: [PATCH] Implement PostgreSQL eventslog circular buffer and sink pipeline --- include/PgSQL_Logger.hpp | 116 ++++++++++ lib/PgSQL_Logger.cpp | 461 +++++++++++++++++++++++++++++++++++++-- lib/PgSQL_Thread.cpp | 3 + 3 files changed, 567 insertions(+), 13 deletions(-) diff --git a/include/PgSQL_Logger.hpp b/include/PgSQL_Logger.hpp index c3af60f86..456c9d19e 100644 --- a/include/PgSQL_Logger.hpp +++ b/include/PgSQL_Logger.hpp @@ -2,9 +2,47 @@ #define __CLASS_PGSQL_LOGGER_H #include "proxysql.h" #include "cpp.h" +#include +#include +#include +#include +#include +#include +#include #define PROXYSQL_LOGGER_PTHREAD_MUTEX +struct p_pl_counter { + enum metric { + memory_copy_count = 0, + disk_copy_count, + get_all_events_calls_count, + get_all_events_events_count, + total_memory_copy_time_us, + total_disk_copy_time_us, + total_get_all_events_time_us, + total_events_copied_to_memory, + total_events_copied_to_disk, + circular_buffer_events_added_count, + circular_buffer_events_dropped_count, + __size + }; +}; + +struct p_pl_gauge { + enum metric { + circular_buffer_events_size, + __size + }; +}; + +struct pl_metrics_map_idx { + enum index { + counters = 0, + gauges + }; +}; + enum class PGSQL_LOG_EVENT_TYPE { SIMPLE_QUERY, AUTH_OK, @@ -53,9 +91,14 @@ class PgSQL_Event { uint64_t affected_rows; uint64_t rows_sent; + char* sqlstate; + char* errmsg; + bool free_on_delete; public: PgSQL_Event(PGSQL_LOG_EVENT_TYPE _et, uint32_t _thread_id, char * _username, char * _schemaname , uint64_t _start_time , uint64_t _end_time , uint64_t _query_digest, char *_client, size_t _client_len); + PgSQL_Event(const PgSQL_Event& other); + ~PgSQL_Event(); uint64_t write(std::fstream *f, PgSQL_Session *sess); uint64_t write_query_format_1(std::fstream *f); uint64_t write_query_format_2_json(std::fstream *f); @@ -66,6 +109,36 @@ class PgSQL_Event { void set_extra_info(char *); void set_affected_rows(uint64_t ar); void set_rows_sent(uint64_t rs); + /** + * @brief Sets SQLSTATE and textual error information associated with this event. + * @param _sqlstate SQLSTATE code. + * @param _errmsg Error message string. + */ + void set_error(const char* _sqlstate, const char* _errmsg); + friend class PgSQL_Logger; +}; + +/** + * @brief Thread-safe circular buffer for advanced PostgreSQL query events logging. + */ +class PgSQL_Logger_CircularBuffer { +private: + std::deque event_buffer; + std::mutex mutex; + std::atomic eventsAddedCount; + std::atomic eventsDroppedCount; + +public: + std::atomic buffer_size; + explicit PgSQL_Logger_CircularBuffer(size_t size); + ~PgSQL_Logger_CircularBuffer(); + void insert(PgSQL_Event* event); + void get_all_events(std::vector& events); + size_t size(); + size_t getBufferSize() const; + void setBufferSize(size_t newSize); + unsigned long long getEventsAddedCount() const { return eventsAddedCount; } + unsigned long long getEventsDroppedCount() const { return eventsDroppedCount; } }; class PgSQL_Logger { @@ -86,6 +159,27 @@ class PgSQL_Logger { unsigned int max_log_file_size; std::fstream *logfile; } audit; + + /** + * @brief Accumulated runtime metrics for PostgreSQL advanced events logging. + */ + struct EventLogMetrics { + std::atomic memoryCopyCount; + std::atomic diskCopyCount; + std::atomic getAllEventsCallsCount; + std::atomic getAllEventsEventsCount; + std::atomic totalMemoryCopyTimeMicros; + std::atomic totalDiskCopyTimeMicros; + std::atomic totalGetAllEventsTimeMicros; + std::atomic totalEventsCopiedToMemory; + std::atomic totalEventsCopiedToDisk; + } metrics; + + struct { + std::array p_counter_array {}; + std::array p_gauge_array {}; + } prom_metrics; + #ifdef PROXYSQL_LOGGER_PTHREAD_MUTEX pthread_mutex_t wmutex; #else @@ -113,6 +207,28 @@ class PgSQL_Logger { void flush(); void wrlock(); void wrunlock(); + PgSQL_Logger_CircularBuffer* PgLogCB; + + /** + * @brief Inserts PostgreSQL events into a SQLite table using bulk prepared statements. + */ + void insertPgSQLEventsIntoDb(SQLite3DB* db, const std::string& tableName, size_t numEvents, std::vector::const_iterator begin); + + /** + * @brief Drains buffered PostgreSQL events and writes them to memory and/or disk stats tables. + * @return Number of drained events. + */ + int processEvents(SQLite3DB* statsdb, SQLite3DB* statsdb_disk); + + /** + * @brief Returns all PostgreSQL logger metrics for stats table export. + */ + std::unordered_map getAllMetrics() const; + + /** + * @brief Prometheus serial exposer update hook for PostgreSQL logger metrics. + */ + void p_update_metrics(); }; #endif /* __CLASS_PGSQL_LOGGER_H */ diff --git a/lib/PgSQL_Logger.cpp b/lib/PgSQL_Logger.cpp index e279059c4..e6d53b60b 100644 --- a/lib/PgSQL_Logger.cpp +++ b/lib/PgSQL_Logger.cpp @@ -3,6 +3,7 @@ using json = nlohmann::json; #define PROXYJSON #include +#include #include "proxysql.h" #include "cpp.h" @@ -24,6 +25,109 @@ using json = nlohmann::json; extern PgSQL_Logger *GloPgSQL_Logger; +using metric_name = std::string; +using metric_help = std::string; +using metric_tags = std::map; + +using pl_counter_tuple = + std::tuple< + p_pl_counter::metric, + metric_name, + metric_help, + metric_tags + >; + +using pl_gauge_tuple = + std::tuple< + p_pl_gauge::metric, + metric_name, + metric_help, + metric_tags + >; + +using pl_counter_vector = std::vector; +using pl_gauge_vector = std::vector; + +const std::tuple +pl_metrics_map = std::make_tuple( + pl_counter_vector { + std::make_tuple( + p_pl_counter::memory_copy_count, + "proxysql_pgsql_logger_copy_total", + "Number of times events were copied to the in-memory/on-disk databases.", + metric_tags { { "target", "memory" } } + ), + std::make_tuple( + p_pl_counter::disk_copy_count, + "proxysql_pgsql_logger_copy_total", + "Number of times events were copied to the in-memory/on-disk databases.", + metric_tags { { "target", "disk" } } + ), + std::make_tuple( + p_pl_counter::get_all_events_calls_count, + "proxysql_pgsql_logger_get_all_events_calls_total", + "Number of times the 'get_all_events' method was called.", + metric_tags {} + ), + std::make_tuple( + p_pl_counter::get_all_events_events_count, + "proxysql_pgsql_logger_get_all_events_events_total", + "Number of events retrieved by the `get_all_events` method.", + metric_tags {} + ), + std::make_tuple( + p_pl_counter::total_memory_copy_time_us, + "proxysql_pgsql_logger_copy_seconds_total", + "Total time spent copying events to the in-memory/on-disk databases.", + metric_tags { { "target", "memory" } } + ), + std::make_tuple( + p_pl_counter::total_disk_copy_time_us, + "proxysql_pgsql_logger_copy_seconds_total", + "Total time spent copying events to the in-memory/on-disk databases.", + metric_tags { { "target", "disk" } } + ), + std::make_tuple( + p_pl_counter::total_get_all_events_time_us, + "proxysql_pgsql_logger_get_all_events_seconds_total", + "Total time spent in `get_all_events` method.", + metric_tags {} + ), + std::make_tuple( + p_pl_counter::total_events_copied_to_memory, + "proxysql_pgsql_logger_events_copied_total", + "Total number of events copied to the in-memory/on-disk databases.", + metric_tags { { "target", "memory" } } + ), + std::make_tuple( + p_pl_counter::total_events_copied_to_disk, + "proxysql_pgsql_logger_events_copied_total", + "Total number of events copied to the in-memory/on-disk databases.", + metric_tags { { "target", "disk" } } + ), + std::make_tuple( + p_pl_counter::circular_buffer_events_added_count, + "proxysql_pgsql_logger_circular_buffer_events_total", + "The total number of events added/dropped to/from the circular buffer.", + metric_tags { { "type", "added" } } + ), + std::make_tuple( + p_pl_counter::circular_buffer_events_dropped_count, + "proxysql_pgsql_logger_circular_buffer_events_total", + "The total number of events added/dropped to/from the circular buffer.", + metric_tags { { "type", "dropped" } } + ), + }, + pl_gauge_vector { + std::make_tuple( + p_pl_gauge::circular_buffer_events_size, + "proxysql_pgsql_logger_circular_buffer_events", + "Number of events currently present in the circular buffer.", + metric_tags {} + ), + } +); + static uint8_t encode_length(uint64_t len, unsigned char *hd) { if (len < 251) return 1; if (len < 65536) { if (hd) { *hd=0xfc; }; return 3; } @@ -47,20 +151,105 @@ PgSQL_Event::PgSQL_Event (PGSQL_LOG_EVENT_TYPE _et, uint32_t _thread_id, char * thread_id=_thread_id; username=_username; schemaname=_schemaname; + username_len=0; + schemaname_len=0; + client_stmt_name_len=0; start_time=_start_time; end_time=_end_time; query_digest=_query_digest; + query_ptr = NULL; + query_len = 0; client=_client; client_len=_client_len; et=_et; hid=UINT64_MAX; server=NULL; + server_len=0; extra_info = NULL; have_affected_rows=false; affected_rows=0; have_rows_sent=false; rows_sent=0; client_stmt_name=NULL; + sqlstate = NULL; + errmsg = NULL; + free_on_delete = false; // do not own pointers by default + memset(buf, 0, sizeof(buf)); +} + +PgSQL_Event::PgSQL_Event(const PgSQL_Event& other) { + memcpy(this, &other, sizeof(PgSQL_Event)); + + if (other.username != nullptr) { + username = strdup(other.username); + } + if (other.schemaname != nullptr) { + schemaname = strdup(other.schemaname); + } + if (other.query_ptr != nullptr) { + size_t maxQueryLen = pgsql_thread___eventslog_buffer_max_query_length; + size_t lenToCopy = std::min(other.query_len, maxQueryLen); + query_ptr = (char*)malloc(lenToCopy + 1); + memcpy(query_ptr, other.query_ptr, lenToCopy); + query_ptr[lenToCopy] = '\0'; + query_len = lenToCopy; + } + if (other.server != nullptr) { + server = (char*)malloc(server_len + 1); + memcpy(server, other.server, server_len); + server[server_len] = '\0'; + } + if (other.client != nullptr) { + client = (char*)malloc(client_len + 1); + memcpy(client, other.client, client_len); + client[client_len] = '\0'; + } + if (other.extra_info != nullptr) { + extra_info = strdup(other.extra_info); + } + if (other.client_stmt_name != nullptr) { + client_stmt_name = strdup(other.client_stmt_name); + } + if (other.sqlstate != nullptr) { + sqlstate = strdup(other.sqlstate); + } + if (other.errmsg != nullptr) { + errmsg = strdup(other.errmsg); + } + + free_on_delete = true; +} + +PgSQL_Event::~PgSQL_Event() { + if (free_on_delete == true) { + if (username != nullptr) { + free(username); username = nullptr; + } + if (schemaname != nullptr) { + free(schemaname); schemaname = nullptr; + } + if (query_ptr != nullptr) { + free(query_ptr); query_ptr = nullptr; + } + if (server != nullptr) { + free(server); server = nullptr; + } + if (client != nullptr) { + free(client); client = nullptr; + } + if (extra_info != nullptr) { + free(extra_info); extra_info = nullptr; + } + if (client_stmt_name != nullptr) { + free(client_stmt_name); client_stmt_name = nullptr; + } + if (sqlstate != nullptr) { + free(sqlstate); sqlstate = nullptr; + } + if (errmsg != nullptr) { + free(errmsg); errmsg = nullptr; + } + } } void PgSQL_Event::set_client_stmt_name(char* client_stmt_name) { @@ -98,6 +287,11 @@ void PgSQL_Event::set_server(int _hid, const char *ptr, int len) { hid=_hid; } +void PgSQL_Event::set_error(const char* _sqlstate, const char* _errmsg) { + sqlstate = const_cast(_sqlstate); + errmsg = const_cast(_errmsg); +} + uint64_t PgSQL_Event::write(std::fstream *f, PgSQL_Session *sess) { uint64_t total_bytes=0; switch (et) { @@ -406,6 +600,12 @@ uint64_t PgSQL_Event::write_query_format_2_json(std::fstream *f) { if (have_rows_sent == true) { j["rows_sent"] = rows_sent; } + if (sqlstate != nullptr) { + j["sqlstate"] = sqlstate; + } + if (errmsg != nullptr) { + j["error"] = errmsg; + } j["query"] = string(query_ptr, query_len); j["starttime_timestamp_us"] = start_time; { @@ -451,7 +651,7 @@ uint64_t PgSQL_Event::write_query_format_2_json(std::fstream *f) { extern PgSQL_Query_Processor* GloPgQPro; -PgSQL_Logger::PgSQL_Logger() { +PgSQL_Logger::PgSQL_Logger() : metrics{0, 0, 0, 0, 0, 0, 0, 0, 0} { events.enabled=false; events.base_filename=NULL; events.datadir=NULL; @@ -471,6 +671,14 @@ PgSQL_Logger::PgSQL_Logger() { audit.logfile=NULL; audit.log_file_id=0; audit.max_log_file_size=100*1024*1024; + PgLogCB = new PgSQL_Logger_CircularBuffer(0); + + init_prometheus_counter_array( + pl_metrics_map, this->prom_metrics.p_counter_array + ); + init_prometheus_gauge_array( + pl_metrics_map, this->prom_metrics.p_gauge_array + ); }; PgSQL_Logger::~PgSQL_Logger() { @@ -482,6 +690,7 @@ PgSQL_Logger::~PgSQL_Logger() { free(audit.datadir); } free(audit.base_filename); + delete PgLogCB; }; void PgSQL_Logger::wrlock() { @@ -658,8 +867,11 @@ void PgSQL_Logger::audit_set_datadir(char *s) { }; void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) { - if (events.enabled==false) return; - if (events.logfile==NULL) return; + int elmhs = pgsql_thread___eventslog_buffer_history_size; + if (elmhs == 0) { + if (events.enabled == false) return; + if (events.logfile == NULL) return; + } // 'PgSQL_Session::client_myds' could be NULL in case of 'RequestEnd' being called over a freshly created session // due to a failed 'CONNECTION_RESET'. Because this scenario isn't a client request, we just return. if (sess->client_myds==NULL || sess->client_myds->myconn== NULL) return; @@ -758,6 +970,9 @@ void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) { me.set_affected_rows(sess->CurrentQuery.affected_rows); } me.set_rows_sent(sess->CurrentQuery.rows_sent); + if (myds && myds->myconn && myds->myconn->is_error_present()) { + me.set_error(myds->myconn->get_error_code_str(), myds->myconn->get_error_message().c_str()); + } int sl=0; char *sa=(char *)""; // default @@ -782,17 +997,20 @@ void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) { // right before the write to disk //wrlock(); - //add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump - GloPgSQL_Logger->wrlock(); - - me.write(events.logfile, sess); - - - unsigned long curpos=events.logfile->tellp(); - if (curpos > events.max_log_file_size) { - events_flush_log_unlocked(); + if ((events.enabled == true) && (events.logfile != nullptr)) { + //add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump + GloPgSQL_Logger->wrlock(); + me.write(events.logfile, sess); + unsigned long curpos=events.logfile->tellp(); + if (curpos > events.max_log_file_size) { + events_flush_log_unlocked(); + } + wrunlock(); + } + if (PgLogCB->buffer_size != 0) { + PgSQL_Event* me2 = new PgSQL_Event(me); + PgLogCB->insert(me2); } - wrunlock(); if (cl && sess->client_myds->addr.port) { free(ca); @@ -1057,3 +1275,220 @@ void PgSQL_Logger::print_version() { fprintf(stderr,"Standard ProxySQL PgSQL Logger rev. %s -- %s -- %s\n", PROXYSQL_PGSQL_LOGGER_VERSION, __FILE__, __TIMESTAMP__); } +PgSQL_Logger_CircularBuffer::PgSQL_Logger_CircularBuffer(size_t size) : + event_buffer(), + eventsAddedCount(0), eventsDroppedCount(0), + buffer_size(size) {} + +PgSQL_Logger_CircularBuffer::~PgSQL_Logger_CircularBuffer() { + std::lock_guard lock(mutex); + for (PgSQL_Event* event : event_buffer) { + delete event; + } +} + +void PgSQL_Logger_CircularBuffer::insert(PgSQL_Event* event) { + std::lock_guard lock(mutex); + eventsAddedCount++; + if (buffer_size == 0) { + delete event; + eventsDroppedCount++; + return; + } + while (event_buffer.size() >= buffer_size) { + delete event_buffer.front(); + event_buffer.pop_front(); + eventsDroppedCount++; + } + event_buffer.push_back(event); +} + +void PgSQL_Logger_CircularBuffer::get_all_events(std::vector& events) { + std::lock_guard lock(mutex); + events.reserve(event_buffer.size()); + events.insert(events.end(), event_buffer.begin(), event_buffer.end()); + event_buffer.clear(); +} + +size_t PgSQL_Logger_CircularBuffer::size() { + std::lock_guard lock(mutex); + return event_buffer.size(); +} + +size_t PgSQL_Logger_CircularBuffer::getBufferSize() const { + return buffer_size; +} + +void PgSQL_Logger_CircularBuffer::setBufferSize(size_t newSize) { + std::lock_guard lock(mutex); + buffer_size = newSize; + while (event_buffer.size() > buffer_size) { + delete event_buffer.front(); + event_buffer.pop_front(); + eventsDroppedCount++; + } +} + +void PgSQL_Logger::insertPgSQLEventsIntoDb(SQLite3DB* db, const std::string& tableName, size_t numEvents, std::vector::const_iterator begin) { + int rc = 0; + const int numcols = 17; + + std::string coldefs = "(thread_id, username, database, start_time, end_time, query_digest, query, server, client, event_type, hid, extra_info, affected_rows, rows_sent, client_stmt_name, sqlstate, error)"; + std::string query1s = "INSERT INTO " + tableName + coldefs + " VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)"; + std::string query32s = "INSERT INTO " + tableName + coldefs + " VALUES " + generate_multi_rows_query(32, numcols); + + auto [rc1, statement1_unique] = db->prepare_v2(query1s.c_str()); + ASSERT_SQLITE_OK(rc1, db); + auto [rc2, statement32_unique] = db->prepare_v2(query32s.c_str()); + ASSERT_SQLITE_OK(rc2, db); + sqlite3_stmt* statement1 = statement1_unique.get(); + sqlite3_stmt* statement32 = statement32_unique.get(); + + auto bind_text_or_null = [&](sqlite3_stmt* stmt, int idx, const char* v) { + if (v) { + rc = (*proxy_sqlite3_bind_text)(stmt, idx, v, -1, SQLITE_TRANSIENT); + } else { + rc = (*proxy_sqlite3_bind_null)(stmt, idx); + } + ASSERT_SQLITE_OK(rc, db); + }; + + char digest_hex_str[20]; + db->execute("BEGIN"); + + int row_idx = 0; + int max_bulk_row_idx = (numEvents / 32) * 32; + for (std::vector::const_iterator it = begin; it != begin + numEvents; ++it) { + PgSQL_Event* event = *it; + int idx = row_idx % 32; + sqlite3_stmt* stmt = (row_idx < max_bulk_row_idx ? statement32 : statement1); + int base = (row_idx < max_bulk_row_idx ? idx * numcols : 0); + + rc = (*proxy_sqlite3_bind_int)(stmt, base + 1, event->thread_id); ASSERT_SQLITE_OK(rc, db); + bind_text_or_null(stmt, base + 2, event->username); + bind_text_or_null(stmt, base + 3, event->schemaname); + rc = (*proxy_sqlite3_bind_int64)(stmt, base + 4, event->start_time); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(stmt, base + 5, event->end_time); ASSERT_SQLITE_OK(rc, db); + sprintf(digest_hex_str, "0x%016llX", (long long unsigned int)event->query_digest); + bind_text_or_null(stmt, base + 6, digest_hex_str); + bind_text_or_null(stmt, base + 7, event->query_ptr); + bind_text_or_null(stmt, base + 8, event->server); + bind_text_or_null(stmt, base + 9, event->client); + rc = (*proxy_sqlite3_bind_int)(stmt, base + 10, (int)event->et); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(stmt, base + 11, event->hid); ASSERT_SQLITE_OK(rc, db); + bind_text_or_null(stmt, base + 12, event->extra_info); + rc = (*proxy_sqlite3_bind_int64)(stmt, base + 13, event->affected_rows); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(stmt, base + 14, event->rows_sent); ASSERT_SQLITE_OK(rc, db); + bind_text_or_null(stmt, base + 15, event->client_stmt_name); + bind_text_or_null(stmt, base + 16, event->sqlstate); + bind_text_or_null(stmt, base + 17, event->errmsg); + + if (row_idx < max_bulk_row_idx) { + if (idx == 31) { + SAFE_SQLITE3_STEP2(statement32); + rc = (*proxy_sqlite3_clear_bindings)(statement32); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_reset)(statement32); ASSERT_SQLITE_OK(rc, db); + } + } else { + SAFE_SQLITE3_STEP2(statement1); + rc = (*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, db); + } + + row_idx++; + } + + db->execute("COMMIT"); +} + +int PgSQL_Logger::processEvents(SQLite3DB* statsdb, SQLite3DB* statsdb_disk) { + unsigned long long startTimeMicros = monotonic_time(); + std::vector events = {}; + PgLogCB->get_all_events(events); + metrics.getAllEventsCallsCount++; + if (events.empty()) return 0; + + unsigned long long afterGetAllEventsTimeMicros = monotonic_time(); + metrics.getAllEventsEventsCount += events.size(); + metrics.totalGetAllEventsTimeMicros += (afterGetAllEventsTimeMicros - startTimeMicros); + + if (statsdb_disk != nullptr) { + unsigned long long diskStartTimeMicros = monotonic_time(); + insertPgSQLEventsIntoDb(statsdb_disk, "history_pgsql_query_events", events.size(), events.begin()); + unsigned long long diskEndTimeMicros = monotonic_time(); + metrics.diskCopyCount++; + metrics.totalDiskCopyTimeMicros += (diskEndTimeMicros - diskStartTimeMicros); + metrics.totalEventsCopiedToDisk += events.size(); + } + + if (statsdb != nullptr) { + unsigned long long memoryStartTimeMicros = monotonic_time(); + size_t maxInMemorySize = pgsql_thread___eventslog_table_memory_size; + size_t numEventsToInsert = std::min(events.size(), maxInMemorySize); + + if (events.size() >= maxInMemorySize) { + statsdb->execute("DELETE FROM stats_pgsql_query_events"); + } else { + int current_rows = statsdb->return_one_int((char*)"SELECT COUNT(*) FROM stats_pgsql_query_events"); + int rows_to_keep = maxInMemorySize - events.size(); + if (current_rows > rows_to_keep) { + int rows_to_delete = (current_rows - rows_to_keep); + std::string query = "SELECT MAX(id) FROM (SELECT id FROM stats_pgsql_query_events ORDER BY id LIMIT " + std::to_string(rows_to_delete) + ")"; + int maxIdToDelete = statsdb->return_one_int(query.c_str()); + std::string delete_stmt = "DELETE FROM stats_pgsql_query_events WHERE id <= " + std::to_string(maxIdToDelete); + statsdb->execute(delete_stmt.c_str()); + } + } + insertPgSQLEventsIntoDb(statsdb, "stats_pgsql_query_events", numEventsToInsert, events.begin()); + unsigned long long memoryEndTimeMicros = monotonic_time(); + metrics.memoryCopyCount++; + metrics.totalMemoryCopyTimeMicros += (memoryEndTimeMicros - memoryStartTimeMicros); + metrics.totalEventsCopiedToMemory += numEventsToInsert; + } + + for (PgSQL_Event* event : events) { + delete event; + } + + return events.size(); +} + +std::unordered_map PgSQL_Logger::getAllMetrics() const { + std::unordered_map allMetrics; + + allMetrics["memoryCopyCount"] = metrics.memoryCopyCount; + allMetrics["diskCopyCount"] = metrics.diskCopyCount; + allMetrics["getAllEventsCallsCount"] = metrics.getAllEventsCallsCount; + allMetrics["getAllEventsEventsCount"] = metrics.getAllEventsEventsCount; + allMetrics["totalMemoryCopyTimeMicros"] = metrics.totalMemoryCopyTimeMicros; + allMetrics["totalDiskCopyTimeMicros"] = metrics.totalDiskCopyTimeMicros; + allMetrics["totalGetAllEventsTimeMicros"] = metrics.totalGetAllEventsTimeMicros; + allMetrics["totalEventsCopiedToMemory"] = metrics.totalEventsCopiedToMemory; + allMetrics["totalEventsCopiedToDisk"] = metrics.totalEventsCopiedToDisk; + allMetrics["circularBufferEventsAddedCount"] = PgLogCB->getEventsAddedCount(); + allMetrics["circularBufferEventsDroppedCount"] = PgLogCB->getEventsDroppedCount(); + allMetrics["circularBufferEventsSize"] = PgLogCB->size(); + + return allMetrics; +} + +void PgSQL_Logger::p_update_metrics() { + using pl_c = p_pl_counter; + const auto& counters { this->prom_metrics.p_counter_array }; + + p_update_counter(counters[pl_c::memory_copy_count], metrics.memoryCopyCount); + p_update_counter(counters[pl_c::disk_copy_count], metrics.diskCopyCount); + p_update_counter(counters[pl_c::get_all_events_calls_count], metrics.getAllEventsCallsCount); + p_update_counter(counters[pl_c::get_all_events_events_count], metrics.getAllEventsEventsCount); + p_update_counter(counters[pl_c::total_memory_copy_time_us], metrics.totalMemoryCopyTimeMicros / (1000.0 * 1000)); + p_update_counter(counters[pl_c::total_disk_copy_time_us], metrics.totalDiskCopyTimeMicros / (1000.0 * 1000)); + p_update_counter(counters[pl_c::total_get_all_events_time_us], metrics.totalGetAllEventsTimeMicros / (1000.0 * 1000)); + p_update_counter(counters[pl_c::total_events_copied_to_memory], metrics.totalEventsCopiedToMemory); + p_update_counter(counters[pl_c::total_events_copied_to_disk], metrics.totalEventsCopiedToDisk); + p_update_counter(counters[pl_c::circular_buffer_events_added_count], PgLogCB->getEventsAddedCount()); + p_update_counter(counters[pl_c::circular_buffer_events_dropped_count], PgLogCB->getEventsDroppedCount()); + + using pl_g = p_pl_gauge; + const auto& gauges { this->prom_metrics.p_gauge_array }; + gauges[pl_g::circular_buffer_events_size]->Set(PgLogCB->size()); +} diff --git a/lib/PgSQL_Thread.cpp b/lib/PgSQL_Thread.cpp index c5a415bf3..383ef3015 100644 --- a/lib/PgSQL_Thread.cpp +++ b/lib/PgSQL_Thread.cpp @@ -4021,6 +4021,9 @@ void PgSQL_Thread::refresh_variables() { if (pgsql_thread___eventslog_filename) free(pgsql_thread___eventslog_filename); pgsql_thread___eventslog_filesize = GloPTH->get_variable_int((char*)"eventslog_filesize"); pgsql_thread___eventslog_buffer_history_size = GloPTH->get_variable_int((char*)"eventslog_buffer_history_size"); + if (GloPgSQL_Logger && GloPgSQL_Logger->PgLogCB->getBufferSize() != static_cast(pgsql_thread___eventslog_buffer_history_size)) { + GloPgSQL_Logger->PgLogCB->setBufferSize(pgsql_thread___eventslog_buffer_history_size); + } pgsql_thread___eventslog_table_memory_size = GloPTH->get_variable_int((char*)"eventslog_table_memory_size"); pgsql_thread___eventslog_buffer_max_query_length = GloPTH->get_variable_int((char*)"eventslog_buffer_max_query_length"); pgsql_thread___eventslog_default_log = GloPTH->get_variable_int((char*)"eventslog_default_log");