diff --git a/include/MySQL_Logger.hpp b/include/MySQL_Logger.hpp index 1d151321d..6169cefba 100644 --- a/include/MySQL_Logger.hpp +++ b/include/MySQL_Logger.hpp @@ -551,7 +551,7 @@ public: /** * @brief Flushes the log files. */ - void flush(); + void flush(bool force = false); /** * @brief Acquires a write lock. diff --git a/include/PgSQL_Logger.hpp b/include/PgSQL_Logger.hpp index 18b3ec423..05842f8b4 100644 --- a/include/PgSQL_Logger.hpp +++ b/include/PgSQL_Logger.hpp @@ -126,7 +126,7 @@ class PgSQL_Logger { void audit_set_base_filename(); void log_request(PgSQL_Session *, PgSQL_Data_Stream *); void log_audit_entry(PGSQL_LOG_EVENT_TYPE, PgSQL_Session *, PgSQL_Data_Stream *, char *e = NULL); - void flush(); + void flush(bool force = false); void wrlock(); void wrunlock(); }; diff --git a/include/log_utils.h b/include/log_utils.h index 29475deb6..7d3e8e4dc 100644 --- a/include/log_utils.h +++ b/include/log_utils.h @@ -146,6 +146,7 @@ private: std::uniform_real_distribution dist; ///< Uniform distribution [0.0, 1.0) public: + std::mutex buffer_lock; ///< Protects cross-thread flush operations on thread-local buffers. LogBuffer events; ///< Event log buffer and timestamp LogBuffer audit; ///< Audit log buffer and timestamp @@ -198,7 +199,7 @@ LogBufferThreadContext* GetLogBufferThreadContext(std::unordered_map lock_fn, diff --git a/lib/MySQL_Logger.cpp b/lib/MySQL_Logger.cpp index 27a8eafaa..d71182610 100644 --- a/lib/MySQL_Logger.cpp +++ b/lib/MySQL_Logger.cpp @@ -1220,22 +1220,23 @@ MySQL_Logger::~MySQL_Logger() { std::lock_guard lock(log_thread_contexts_lock); for (const auto& kv : log_thread_contexts) { LogBufferThreadContext* log_ctx = kv.second.get(); - if (!log_ctx->events.empty()) { - flush_and_rotate(log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size, - [this]() { wrlock(); }, - [this]() { wrunlock(); }, - nullptr, + std::lock_guard ctx_lock(log_ctx->buffer_lock); + if (!log_ctx->events.empty()) { + flush_and_rotate(log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + nullptr, 0 - ); - } - if (!log_ctx->audit.empty()) { - flush_and_rotate(log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size, - [this]() { wrlock(); }, - [this]() { wrunlock(); }, - nullptr, + ); + } + if (!log_ctx->audit.empty()) { + flush_and_rotate(log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + nullptr, 0 - ); - } + ); + } } } if (events.datadir) { @@ -1267,6 +1268,7 @@ void MySQL_Logger::wrunlock() { void MySQL_Logger::flush_log() { if (audit.enabled==false && events.enabled==false) return; + flush(true); wrlock(); events_flush_log_unlocked(); audit_flush_log_unlocked(); @@ -1295,6 +1297,7 @@ void MySQL_Logger::events_close_log_unlocked() { events.logfile->close(); delete events.logfile; events.logfile=NULL; + set_events_logfile_open(false); } } @@ -1304,6 +1307,7 @@ void MySQL_Logger::audit_close_log_unlocked() { audit.logfile->close(); delete audit.logfile; audit.logfile=NULL; + set_audit_logfile_open(false); } } @@ -1343,7 +1347,7 @@ void MySQL_Logger::events_open_log_unlocked() { proxy_info("Starting new mysql event log file %s\n", filen); if (mysql_thread___eventslog_format == 1) { // create a new event, type PROXYSQL_METADATA, that writes the ProxySQL version as part of the payload - LogBufferThreadContext *log_ctx = get_log_thread_context(); + LogBuffer metadata_buf; json j = {}; j["version"] = string(PROXYSQL_VERSION); string msg = j.dump(); @@ -1360,10 +1364,9 @@ void MySQL_Logger::events_open_log_unlocked() { nullptr // no session associated ); metaEvent.set_query((char *)"",0); - metaEvent.write(&log_ctx->events, nullptr); - log_ctx->events.flush_to_file(events.logfile); - events.current_log_size += log_ctx->events.size(); - log_ctx->events.reset(monotonic_time()); + metaEvent.write(&metadata_buf, nullptr); + metadata_buf.flush_to_file(events.logfile); + events.current_log_size += metadata_buf.size(); } } catch (const std::ofstream::failure&) { @@ -1607,17 +1610,18 @@ void MySQL_Logger::log_request(MySQL_Session *sess, MySQL_Data_Stream *myds, con //wrlock(); if (is_events_logfile_open()) { - me.write(&log_ctx->events, sess); - if (log_ctx->events.size() > static_cast(mysql_thread___eventslog_flush_size)) { - //add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump - flush_and_rotate(log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size, - [this]() { wrlock(); }, - [this]() { wrunlock(); }, - [this]() { events_flush_log_unlocked(); }, - monotonic_time() - ); - } - } + std::lock_guard ctx_lock(log_ctx->buffer_lock); + me.write(&log_ctx->events, sess); + if (log_ctx->events.size() > static_cast(mysql_thread___eventslog_flush_size)) { + //add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump + flush_and_rotate(log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + [this]() { events_flush_log_unlocked(); }, + monotonic_time() + ); + } + } if (MyLogCB->buffer_size != 0) { MySQL_Event *me2 = new MySQL_Event(me); @@ -1769,17 +1773,18 @@ void MySQL_Logger::log_audit_entry(log_event_type _et, MySQL_Session *sess, MySQ //wrlock(); if (is_audit_logfile_open()) { - me.write(&log_ctx->audit, sess); - if (log_ctx->audit.size() > static_cast(mysql_thread___auditlog_flush_size)) { - //add a mutex lock in a multithreaded environment, avoid to get a null pointer of audit.logfile that leads to the program coredump - flush_and_rotate(log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size, - [this]() { wrlock(); }, - [this]() { wrunlock(); }, - [this]() { audit_flush_log_unlocked(); }, - monotonic_time() - ); - } - } + std::lock_guard ctx_lock(log_ctx->buffer_lock); + me.write(&log_ctx->audit, sess); + if (log_ctx->audit.size() > static_cast(mysql_thread___auditlog_flush_size)) { + //add a mutex lock in a multithreaded environment, avoid to get a null pointer of audit.logfile that leads to the program coredump + flush_and_rotate(log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + [this]() { audit_flush_log_unlocked(); }, + monotonic_time() + ); + } + } if (cl && sess->client_myds->addr.port) { free(ca); @@ -1789,43 +1794,86 @@ void MySQL_Logger::log_audit_entry(log_event_type _et, MySQL_Session *sess, MySQ } } -void MySQL_Logger::flush() { - LogBufferThreadContext* log_ctx = get_log_thread_context(); - const uint64_t current_time = monotonic_time(); - - // eventslog - if (is_events_logfile_open()) { - if (log_ctx->events.size() > 0 && - (current_time - log_ctx->events.get_last_flush_time()) > static_cast(mysql_thread___eventslog_flush_timeout) * 1000) { - flush_and_rotate( - log_ctx->events, - events.logfile, - events.current_log_size, - events.max_log_file_size, - [this]() { wrlock(); }, - [this]() { wrunlock(); }, - [this]() { events_flush_log_unlocked(); }, - current_time - ); - } - } +void MySQL_Logger::flush(bool force) { + const uint64_t current_time = monotonic_time(); + + if (force) { + std::vector contexts; + { + std::lock_guard lock(log_thread_contexts_lock); + contexts.reserve(log_thread_contexts.size()); + for (const auto& kv : log_thread_contexts) { + contexts.push_back(kv.second.get()); + } + } + + for (LogBufferThreadContext* ctx : contexts) { + std::lock_guard ctx_lock(ctx->buffer_lock); + if (is_events_logfile_open() && !ctx->events.empty()) { + flush_and_rotate( + ctx->events, + events.logfile, + events.current_log_size, + events.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + nullptr, + current_time + ); + } + if (is_audit_logfile_open() && !ctx->audit.empty()) { + flush_and_rotate( + ctx->audit, + audit.logfile, + audit.current_log_size, + audit.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + nullptr, + current_time + ); + } + } + return; + } + + LogBufferThreadContext* log_ctx = get_log_thread_context(); + + std::lock_guard ctx_lock(log_ctx->buffer_lock); - // auditlogs - if (is_audit_logfile_open()) { - if (log_ctx->audit.size() > 0 && + // eventslog + if (is_events_logfile_open()) { + if (log_ctx->events.size() > 0 && + (current_time - log_ctx->events.get_last_flush_time()) > static_cast(mysql_thread___eventslog_flush_timeout) * 1000) { + flush_and_rotate( + log_ctx->events, + events.logfile, + events.current_log_size, + events.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + [this]() { events_flush_log_unlocked(); }, + current_time + ); + } + } + + // auditlogs + if (is_audit_logfile_open()) { + if (log_ctx->audit.size() > 0 && (current_time - log_ctx->audit.get_last_flush_time()) > static_cast(mysql_thread___auditlog_flush_timeout) * 1000) { - flush_and_rotate( + flush_and_rotate( log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size, - [this]() { wrlock(); }, - [this]() { wrunlock(); }, - [this]() { audit_flush_log_unlocked(); }, - current_time - ); + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + [this]() { audit_flush_log_unlocked(); }, + current_time + ); } - } + } } unsigned int MySQL_Logger::events_find_next_id() { diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index bb150b377..4c7c8a0b1 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -2042,8 +2042,6 @@ bool MySQL_Threads_Handler::set_variable(char *name, const char *value) { // thi // if we are switching format, we need to switch file too if (GloMyLogger) { proxy_info("Switching query logging format from %d to %d\n", variables.eventslog_format , intv); - // write existing logs (if any) to file before switching the file - GloMyLogger->flush(); GloMyLogger->flush_log(); } variables.eventslog_format=intv; diff --git a/lib/PgSQL_Logger.cpp b/lib/PgSQL_Logger.cpp index 3668d7455..1ebca2edd 100644 --- a/lib/PgSQL_Logger.cpp +++ b/lib/PgSQL_Logger.cpp @@ -14,6 +14,7 @@ using json = nlohmann::json; #include #include +#include #ifdef DEBUG @@ -494,22 +495,23 @@ PgSQL_Logger::~PgSQL_Logger() { std::lock_guard lock(log_thread_contexts_lock); for (const auto& kv : log_thread_contexts) { LogBufferThreadContext* log_ctx = kv.second.get(); - if (!log_ctx->events.empty()) { - flush_and_rotate(log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size, - [this]() { wrlock(); }, - [this]() { wrunlock(); }, - nullptr, + std::lock_guard ctx_lock(log_ctx->buffer_lock); + if (!log_ctx->events.empty()) { + flush_and_rotate(log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + nullptr, 0 - ); - } - if (!log_ctx->audit.empty()) { - flush_and_rotate(log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size, - [this]() { wrlock(); }, - [this]() { wrunlock(); }, - nullptr, + ); + } + if (!log_ctx->audit.empty()) { + flush_and_rotate(log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + nullptr, 0 - ); - } + ); + } } } @@ -541,6 +543,7 @@ void PgSQL_Logger::wrunlock() { void PgSQL_Logger::flush_log() { if (audit.enabled==false && events.enabled==false) return; + flush(true); wrlock(); events_flush_log_unlocked(); audit_flush_log_unlocked(); @@ -569,6 +572,7 @@ void PgSQL_Logger::events_close_log_unlocked() { events.logfile->close(); delete events.logfile; events.logfile=NULL; + set_events_logfile_open(false); } } @@ -578,6 +582,7 @@ void PgSQL_Logger::audit_close_log_unlocked() { audit.logfile->close(); delete audit.logfile; audit.logfile=NULL; + set_audit_logfile_open(false); } } @@ -853,17 +858,18 @@ void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) { //wrlock(); if (is_events_logfile_open()) { - me.write(&log_ctx->events, sess); - if (log_ctx->events.size() > static_cast(pgsql_thread___eventslog_flush_size)) { - //add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump - flush_and_rotate(log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size, - [this]() { wrlock(); }, - [this]() { wrunlock(); }, - [this]() { events_flush_log_unlocked(); }, - monotonic_time() - ); - } - } + std::lock_guard ctx_lock(log_ctx->buffer_lock); + me.write(&log_ctx->events, sess); + if (log_ctx->events.size() > static_cast(pgsql_thread___eventslog_flush_size)) { + //add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump + flush_and_rotate(log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + [this]() { events_flush_log_unlocked(); }, + monotonic_time() + ); + } + } if (cl && sess->client_myds->addr.port) { free(ca); @@ -1006,16 +1012,17 @@ void PgSQL_Logger::log_audit_entry(PGSQL_LOG_EVENT_TYPE _et, PgSQL_Session *sess //wrlock(); if (is_audit_logfile_open()) { - me.write(&log_ctx->audit, sess); - if (log_ctx->audit.size() > static_cast(pgsql_thread___auditlog_flush_size)) { - //add a mutex lock in a multithreaded environment, avoid to get a null pointer of audit.logfile that leads to the program coredump - flush_and_rotate(log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size, - [this]() { wrlock(); }, - [this]() { wrunlock(); }, - [this]() { audit_flush_log_unlocked(); }, - monotonic_time()); - } - } + std::lock_guard ctx_lock(log_ctx->buffer_lock); + me.write(&log_ctx->audit, sess); + if (log_ctx->audit.size() > static_cast(pgsql_thread___auditlog_flush_size)) { + //add a mutex lock in a multithreaded environment, avoid to get a null pointer of audit.logfile that leads to the program coredump + flush_and_rotate(log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + [this]() { audit_flush_log_unlocked(); }, + monotonic_time()); + } + } if (cl && sess->client_myds->addr.port) { free(ca); @@ -1025,43 +1032,85 @@ void PgSQL_Logger::log_audit_entry(PGSQL_LOG_EVENT_TYPE _et, PgSQL_Session *sess } } -void PgSQL_Logger::flush() { +void PgSQL_Logger::flush(bool force) { + const uint64_t current_time = monotonic_time(); + + if (force) { + std::vector contexts; + { + std::lock_guard lock(log_thread_contexts_lock); + contexts.reserve(log_thread_contexts.size()); + for (const auto& kv : log_thread_contexts) { + contexts.push_back(kv.second.get()); + } + } + + for (LogBufferThreadContext* ctx : contexts) { + std::lock_guard ctx_lock(ctx->buffer_lock); + if (is_events_logfile_open() && !ctx->events.empty()) { + flush_and_rotate( + ctx->events, + events.logfile, + events.current_log_size, + events.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + nullptr, + current_time + ); + } + if (is_audit_logfile_open() && !ctx->audit.empty()) { + flush_and_rotate( + ctx->audit, + audit.logfile, + audit.current_log_size, + audit.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + nullptr, + current_time + ); + } + } + return; + } + LogBufferThreadContext* log_ctx = get_log_thread_context(); - const uint64_t current_time = monotonic_time(); + std::lock_guard ctx_lock(log_ctx->buffer_lock); - // eventslog - if (is_events_logfile_open()) { - if (log_ctx->events.size() > 0 && + // eventslog + if (is_events_logfile_open()) { + if (log_ctx->events.size() > 0 && (current_time - log_ctx->events.get_last_flush_time()) > static_cast(pgsql_thread___eventslog_flush_timeout) * 1000) { - flush_and_rotate( + flush_and_rotate( log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size, - [this]() { wrlock(); }, - [this]() { wrunlock(); }, - [this]() { events_flush_log_unlocked(); }, - current_time - ); + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + [this]() { events_flush_log_unlocked(); }, + current_time + ); } - } + } - // auditlogs - if (is_audit_logfile_open()) { - if (log_ctx->audit.size() > 0 && + // auditlogs + if (is_audit_logfile_open()) { + if (log_ctx->audit.size() > 0 && (current_time - log_ctx->audit.get_last_flush_time()) > static_cast(pgsql_thread___auditlog_flush_timeout) * 1000) { - flush_and_rotate( + flush_and_rotate( log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size, - [this]() { wrlock(); }, - [this]() { wrunlock(); }, - [this]() { audit_flush_log_unlocked(); }, - current_time - ); + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + [this]() { audit_flush_log_unlocked(); }, + current_time + ); } - } + } } unsigned int PgSQL_Logger::events_find_next_id() { diff --git a/lib/PgSQL_Thread.cpp b/lib/PgSQL_Thread.cpp index cd63cd2ce..29b0c8d53 100644 --- a/lib/PgSQL_Thread.cpp +++ b/lib/PgSQL_Thread.cpp @@ -1764,8 +1764,6 @@ bool PgSQL_Threads_Handler::set_variable(char* name, const char* value) { // thi // if we are switching format, we need to switch file too if (GloPgSQL_Logger) { proxy_info("Switching query logging format from %d to %d\n", variables.eventslog_format, intv); - // write existing logs (if any) to file before switching the file - GloPgSQL_Logger->flush(); GloPgSQL_Logger->flush_log(); } variables.eventslog_format = intv; diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index 01fa45217..4543a5359 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -1095,13 +1095,9 @@ void flush_logs_handler() { void ProxySQL_Admin::flush_logs() { if (GloMyLogger) { - // flush any buffered logs before flushing log file - GloMyLogger->flush(); GloMyLogger->flush_log(); } if (GloPgSQL_Logger) { - // flush any buffered logs before flushing log file - GloPgSQL_Logger->flush(); GloPgSQL_Logger->flush_log(); } this->flush_error_log(); diff --git a/lib/log_utils.cpp b/lib/log_utils.cpp index 319fd84b0..ce1d4bd97 100644 --- a/lib/log_utils.cpp +++ b/lib/log_utils.cpp @@ -79,7 +79,7 @@ void LogBuffer::flush_to_file(std::fstream* logfile) { bool flush_and_rotate( LogBuffer& buffer, - std::fstream* logfile, + std::fstream*& logfile, unsigned int& current_log_size, unsigned int max_log_file_size, std::function lock_fn,