logger: harden buffered logging flush/rotation semantics

This commit addresses three correctness regressions introduced by
thread-local log buffering in PR #5364 (query logging performance):

1) stale logfile pointer/UAF race during concurrent rotate/close
2) stale logfile-open state after close
3) non-global flush behavior in admin/format-switch paths

What was fixed

- Make `flush_and_rotate()` consume the current logfile pointer under lock
  - Signature changed from `std::fstream*` to `std::fstream*&`
  - Prevents dereferencing a stale stream pointer captured before lock
    acquisition while another thread rotates/closes the file
  - Updated declaration/definition and all call sites

- Add explicit synchronization for cross-thread buffer draining
  - Added `LogBufferThreadContext::buffer_lock`
  - Any path that appends or flushes a thread buffer now locks this mutex
  - Guarantees force-flush from admin/config paths cannot race with
    worker-thread appends on the same context

- Restore global forced flush semantics where required
  - Extended `MySQL_Logger::flush` and `PgSQL_Logger::flush` to
    `flush(bool force = false)`
  - `force=false`: preserves existing low-overhead worker-loop behavior
    (per-thread timeout-based flush)
  - `force=true`: snapshots all known thread contexts and drains both
    events/audit buffers regardless of timeout
  - `flush_log()` now calls `flush(true)` before file rotation, so admin
    flush and format-switch operations no longer miss pending thread buffers

- Avoid unintended rotation during forced draining
  - In `force=true` path, flush uses `rotate_fn=nullptr`
  - Drains buffered payload into the current file first
  - `flush_log()` then performs one controlled rotate/open step

- Fix logfile state tracking after close
  - `events_close_log_unlocked()` and `audit_close_log_unlocked()` now set
    `logfile_open=false` when the stream is closed
  - Prevents write paths from treating a closed stream as open

- Remove per-thread-context dependency during metadata header write
  - `events_open_log_unlocked()` now uses a local `LogBuffer` for metadata
    emission in format=1 instead of reusing a thread context buffer
  - Keeps open/rotate path independent from worker context lifecycle

- Keep callers consistent and non-duplicative
  - Since `flush_log()` now force-drains internally, removed redundant
    explicit `flush()` calls from:
    - MySQL/PgSQL `eventslog_format` switch handlers
    - `ProxySQL_Admin::flush_logs`

Behavioral outcome

- No stale stream pointer use when close/rotate interleaves with flush
- No false-positive logfile-open state after close
- `FLUSH LOGS` and eventslog format switch now drain all known thread
  buffers before rotating, preventing dropped/misplaced buffered records

Validation

- Built modified objects directly
- Ran full debug build with GENAI enabled:

  make clean && export PROXYSQLGENAI=1 && make debug -j24

  Build completed successfully.
pull/5389/head
Rene Cannao 2 months ago
parent 568fe1524e
commit caf324f911

@ -551,7 +551,7 @@ public:
/**
* @brief Flushes the log files.
*/
void flush();
void flush(bool force = false);
/**
* @brief Acquires a write lock.

@ -126,7 +126,7 @@ class PgSQL_Logger {
void audit_set_base_filename();
void log_request(PgSQL_Session *, PgSQL_Data_Stream *);
void log_audit_entry(PGSQL_LOG_EVENT_TYPE, PgSQL_Session *, PgSQL_Data_Stream *, char *e = NULL);
void flush();
void flush(bool force = false);
void wrlock();
void wrunlock();
};

@ -146,6 +146,7 @@ private:
std::uniform_real_distribution<double> dist; ///< Uniform distribution [0.0, 1.0)
public:
std::mutex buffer_lock; ///< Protects cross-thread flush operations on thread-local buffers.
LogBuffer events; ///< Event log buffer and timestamp
LogBuffer audit; ///< Audit log buffer and timestamp
@ -198,7 +199,7 @@ LogBufferThreadContext* GetLogBufferThreadContext(std::unordered_map<pthread_t,
*/
bool flush_and_rotate(
LogBuffer& buffer,
std::fstream* logfile,
std::fstream*& logfile,
unsigned int& current_log_size,
unsigned int max_log_file_size,
std::function<void()> lock_fn,

@ -1220,22 +1220,23 @@ MySQL_Logger::~MySQL_Logger() {
std::lock_guard<std::mutex> lock(log_thread_contexts_lock);
for (const auto& kv : log_thread_contexts) {
LogBufferThreadContext* log_ctx = kv.second.get();
if (!log_ctx->events.empty()) {
flush_and_rotate(log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
nullptr,
std::lock_guard<std::mutex> ctx_lock(log_ctx->buffer_lock);
if (!log_ctx->events.empty()) {
flush_and_rotate(log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
nullptr,
0
);
}
if (!log_ctx->audit.empty()) {
flush_and_rotate(log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
nullptr,
);
}
if (!log_ctx->audit.empty()) {
flush_and_rotate(log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
nullptr,
0
);
}
);
}
}
}
if (events.datadir) {
@ -1267,6 +1268,7 @@ void MySQL_Logger::wrunlock() {
void MySQL_Logger::flush_log() {
if (audit.enabled==false && events.enabled==false) return;
flush(true);
wrlock();
events_flush_log_unlocked();
audit_flush_log_unlocked();
@ -1295,6 +1297,7 @@ void MySQL_Logger::events_close_log_unlocked() {
events.logfile->close();
delete events.logfile;
events.logfile=NULL;
set_events_logfile_open(false);
}
}
@ -1304,6 +1307,7 @@ void MySQL_Logger::audit_close_log_unlocked() {
audit.logfile->close();
delete audit.logfile;
audit.logfile=NULL;
set_audit_logfile_open(false);
}
}
@ -1343,7 +1347,7 @@ void MySQL_Logger::events_open_log_unlocked() {
proxy_info("Starting new mysql event log file %s\n", filen);
if (mysql_thread___eventslog_format == 1) {
// create a new event, type PROXYSQL_METADATA, that writes the ProxySQL version as part of the payload
LogBufferThreadContext *log_ctx = get_log_thread_context();
LogBuffer metadata_buf;
json j = {};
j["version"] = string(PROXYSQL_VERSION);
string msg = j.dump();
@ -1360,10 +1364,9 @@ void MySQL_Logger::events_open_log_unlocked() {
nullptr // no session associated
);
metaEvent.set_query((char *)"",0);
metaEvent.write(&log_ctx->events, nullptr);
log_ctx->events.flush_to_file(events.logfile);
events.current_log_size += log_ctx->events.size();
log_ctx->events.reset(monotonic_time());
metaEvent.write(&metadata_buf, nullptr);
metadata_buf.flush_to_file(events.logfile);
events.current_log_size += metadata_buf.size();
}
}
catch (const std::ofstream::failure&) {
@ -1607,17 +1610,18 @@ void MySQL_Logger::log_request(MySQL_Session *sess, MySQL_Data_Stream *myds, con
//wrlock();
if (is_events_logfile_open()) {
me.write(&log_ctx->events, sess);
if (log_ctx->events.size() > static_cast<size_t>(mysql_thread___eventslog_flush_size)) {
//add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump
flush_and_rotate(log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
[this]() { events_flush_log_unlocked(); },
monotonic_time()
);
}
}
std::lock_guard<std::mutex> ctx_lock(log_ctx->buffer_lock);
me.write(&log_ctx->events, sess);
if (log_ctx->events.size() > static_cast<size_t>(mysql_thread___eventslog_flush_size)) {
//add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump
flush_and_rotate(log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
[this]() { events_flush_log_unlocked(); },
monotonic_time()
);
}
}
if (MyLogCB->buffer_size != 0) {
MySQL_Event *me2 = new MySQL_Event(me);
@ -1769,17 +1773,18 @@ void MySQL_Logger::log_audit_entry(log_event_type _et, MySQL_Session *sess, MySQ
//wrlock();
if (is_audit_logfile_open()) {
me.write(&log_ctx->audit, sess);
if (log_ctx->audit.size() > static_cast<size_t>(mysql_thread___auditlog_flush_size)) {
//add a mutex lock in a multithreaded environment, avoid to get a null pointer of audit.logfile that leads to the program coredump
flush_and_rotate(log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
[this]() { audit_flush_log_unlocked(); },
monotonic_time()
);
}
}
std::lock_guard<std::mutex> ctx_lock(log_ctx->buffer_lock);
me.write(&log_ctx->audit, sess);
if (log_ctx->audit.size() > static_cast<size_t>(mysql_thread___auditlog_flush_size)) {
//add a mutex lock in a multithreaded environment, avoid to get a null pointer of audit.logfile that leads to the program coredump
flush_and_rotate(log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
[this]() { audit_flush_log_unlocked(); },
monotonic_time()
);
}
}
if (cl && sess->client_myds->addr.port) {
free(ca);
@ -1789,43 +1794,86 @@ void MySQL_Logger::log_audit_entry(log_event_type _et, MySQL_Session *sess, MySQ
}
}
void MySQL_Logger::flush() {
LogBufferThreadContext* log_ctx = get_log_thread_context();
const uint64_t current_time = monotonic_time();
// eventslog
if (is_events_logfile_open()) {
if (log_ctx->events.size() > 0 &&
(current_time - log_ctx->events.get_last_flush_time()) > static_cast<uint64_t>(mysql_thread___eventslog_flush_timeout) * 1000) {
flush_and_rotate(
log_ctx->events,
events.logfile,
events.current_log_size,
events.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
[this]() { events_flush_log_unlocked(); },
current_time
);
}
}
void MySQL_Logger::flush(bool force) {
const uint64_t current_time = monotonic_time();
if (force) {
std::vector<LogBufferThreadContext*> contexts;
{
std::lock_guard<std::mutex> lock(log_thread_contexts_lock);
contexts.reserve(log_thread_contexts.size());
for (const auto& kv : log_thread_contexts) {
contexts.push_back(kv.second.get());
}
}
for (LogBufferThreadContext* ctx : contexts) {
std::lock_guard<std::mutex> ctx_lock(ctx->buffer_lock);
if (is_events_logfile_open() && !ctx->events.empty()) {
flush_and_rotate(
ctx->events,
events.logfile,
events.current_log_size,
events.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
nullptr,
current_time
);
}
if (is_audit_logfile_open() && !ctx->audit.empty()) {
flush_and_rotate(
ctx->audit,
audit.logfile,
audit.current_log_size,
audit.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
nullptr,
current_time
);
}
}
return;
}
LogBufferThreadContext* log_ctx = get_log_thread_context();
std::lock_guard<std::mutex> ctx_lock(log_ctx->buffer_lock);
// auditlogs
if (is_audit_logfile_open()) {
if (log_ctx->audit.size() > 0 &&
// eventslog
if (is_events_logfile_open()) {
if (log_ctx->events.size() > 0 &&
(current_time - log_ctx->events.get_last_flush_time()) > static_cast<uint64_t>(mysql_thread___eventslog_flush_timeout) * 1000) {
flush_and_rotate(
log_ctx->events,
events.logfile,
events.current_log_size,
events.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
[this]() { events_flush_log_unlocked(); },
current_time
);
}
}
// auditlogs
if (is_audit_logfile_open()) {
if (log_ctx->audit.size() > 0 &&
(current_time - log_ctx->audit.get_last_flush_time()) > static_cast<uint64_t>(mysql_thread___auditlog_flush_timeout) * 1000) {
flush_and_rotate(
flush_and_rotate(
log_ctx->audit,
audit.logfile,
audit.current_log_size,
audit.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
[this]() { audit_flush_log_unlocked(); },
current_time
);
[this]() { wrlock(); },
[this]() { wrunlock(); },
[this]() { audit_flush_log_unlocked(); },
current_time
);
}
}
}
}
unsigned int MySQL_Logger::events_find_next_id() {

@ -2042,8 +2042,6 @@ bool MySQL_Threads_Handler::set_variable(char *name, const char *value) { // thi
// if we are switching format, we need to switch file too
if (GloMyLogger) {
proxy_info("Switching query logging format from %d to %d\n", variables.eventslog_format , intv);
// write existing logs (if any) to file before switching the file
GloMyLogger->flush();
GloMyLogger->flush_log();
}
variables.eventslog_format=intv;

@ -14,6 +14,7 @@ using json = nlohmann::json;
#include <dirent.h>
#include <libgen.h>
#include <vector>
#ifdef DEBUG
@ -494,22 +495,23 @@ PgSQL_Logger::~PgSQL_Logger() {
std::lock_guard<std::mutex> lock(log_thread_contexts_lock);
for (const auto& kv : log_thread_contexts) {
LogBufferThreadContext* log_ctx = kv.second.get();
if (!log_ctx->events.empty()) {
flush_and_rotate(log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
nullptr,
std::lock_guard<std::mutex> ctx_lock(log_ctx->buffer_lock);
if (!log_ctx->events.empty()) {
flush_and_rotate(log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
nullptr,
0
);
}
if (!log_ctx->audit.empty()) {
flush_and_rotate(log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
nullptr,
);
}
if (!log_ctx->audit.empty()) {
flush_and_rotate(log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
nullptr,
0
);
}
);
}
}
}
@ -541,6 +543,7 @@ void PgSQL_Logger::wrunlock() {
void PgSQL_Logger::flush_log() {
if (audit.enabled==false && events.enabled==false) return;
flush(true);
wrlock();
events_flush_log_unlocked();
audit_flush_log_unlocked();
@ -569,6 +572,7 @@ void PgSQL_Logger::events_close_log_unlocked() {
events.logfile->close();
delete events.logfile;
events.logfile=NULL;
set_events_logfile_open(false);
}
}
@ -578,6 +582,7 @@ void PgSQL_Logger::audit_close_log_unlocked() {
audit.logfile->close();
delete audit.logfile;
audit.logfile=NULL;
set_audit_logfile_open(false);
}
}
@ -853,17 +858,18 @@ void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) {
//wrlock();
if (is_events_logfile_open()) {
me.write(&log_ctx->events, sess);
if (log_ctx->events.size() > static_cast<size_t>(pgsql_thread___eventslog_flush_size)) {
//add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump
flush_and_rotate(log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
[this]() { events_flush_log_unlocked(); },
monotonic_time()
);
}
}
std::lock_guard<std::mutex> ctx_lock(log_ctx->buffer_lock);
me.write(&log_ctx->events, sess);
if (log_ctx->events.size() > static_cast<size_t>(pgsql_thread___eventslog_flush_size)) {
//add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump
flush_and_rotate(log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
[this]() { events_flush_log_unlocked(); },
monotonic_time()
);
}
}
if (cl && sess->client_myds->addr.port) {
free(ca);
@ -1006,16 +1012,17 @@ void PgSQL_Logger::log_audit_entry(PGSQL_LOG_EVENT_TYPE _et, PgSQL_Session *sess
//wrlock();
if (is_audit_logfile_open()) {
me.write(&log_ctx->audit, sess);
if (log_ctx->audit.size() > static_cast<size_t>(pgsql_thread___auditlog_flush_size)) {
//add a mutex lock in a multithreaded environment, avoid to get a null pointer of audit.logfile that leads to the program coredump
flush_and_rotate(log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
[this]() { audit_flush_log_unlocked(); },
monotonic_time());
}
}
std::lock_guard<std::mutex> ctx_lock(log_ctx->buffer_lock);
me.write(&log_ctx->audit, sess);
if (log_ctx->audit.size() > static_cast<size_t>(pgsql_thread___auditlog_flush_size)) {
//add a mutex lock in a multithreaded environment, avoid to get a null pointer of audit.logfile that leads to the program coredump
flush_and_rotate(log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
[this]() { audit_flush_log_unlocked(); },
monotonic_time());
}
}
if (cl && sess->client_myds->addr.port) {
free(ca);
@ -1025,43 +1032,85 @@ void PgSQL_Logger::log_audit_entry(PGSQL_LOG_EVENT_TYPE _et, PgSQL_Session *sess
}
}
void PgSQL_Logger::flush() {
void PgSQL_Logger::flush(bool force) {
const uint64_t current_time = monotonic_time();
if (force) {
std::vector<LogBufferThreadContext*> contexts;
{
std::lock_guard<std::mutex> lock(log_thread_contexts_lock);
contexts.reserve(log_thread_contexts.size());
for (const auto& kv : log_thread_contexts) {
contexts.push_back(kv.second.get());
}
}
for (LogBufferThreadContext* ctx : contexts) {
std::lock_guard<std::mutex> ctx_lock(ctx->buffer_lock);
if (is_events_logfile_open() && !ctx->events.empty()) {
flush_and_rotate(
ctx->events,
events.logfile,
events.current_log_size,
events.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
nullptr,
current_time
);
}
if (is_audit_logfile_open() && !ctx->audit.empty()) {
flush_and_rotate(
ctx->audit,
audit.logfile,
audit.current_log_size,
audit.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
nullptr,
current_time
);
}
}
return;
}
LogBufferThreadContext* log_ctx = get_log_thread_context();
const uint64_t current_time = monotonic_time();
std::lock_guard<std::mutex> ctx_lock(log_ctx->buffer_lock);
// eventslog
if (is_events_logfile_open()) {
if (log_ctx->events.size() > 0 &&
// eventslog
if (is_events_logfile_open()) {
if (log_ctx->events.size() > 0 &&
(current_time - log_ctx->events.get_last_flush_time()) > static_cast<uint64_t>(pgsql_thread___eventslog_flush_timeout) * 1000) {
flush_and_rotate(
flush_and_rotate(
log_ctx->events,
events.logfile,
events.current_log_size,
events.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
[this]() { events_flush_log_unlocked(); },
current_time
);
[this]() { wrlock(); },
[this]() { wrunlock(); },
[this]() { events_flush_log_unlocked(); },
current_time
);
}
}
}
// auditlogs
if (is_audit_logfile_open()) {
if (log_ctx->audit.size() > 0 &&
// auditlogs
if (is_audit_logfile_open()) {
if (log_ctx->audit.size() > 0 &&
(current_time - log_ctx->audit.get_last_flush_time()) > static_cast<uint64_t>(pgsql_thread___auditlog_flush_timeout) * 1000) {
flush_and_rotate(
flush_and_rotate(
log_ctx->audit,
audit.logfile,
audit.current_log_size,
audit.max_log_file_size,
[this]() { wrlock(); },
[this]() { wrunlock(); },
[this]() { audit_flush_log_unlocked(); },
current_time
);
[this]() { wrlock(); },
[this]() { wrunlock(); },
[this]() { audit_flush_log_unlocked(); },
current_time
);
}
}
}
}
unsigned int PgSQL_Logger::events_find_next_id() {

@ -1764,8 +1764,6 @@ bool PgSQL_Threads_Handler::set_variable(char* name, const char* value) { // thi
// if we are switching format, we need to switch file too
if (GloPgSQL_Logger) {
proxy_info("Switching query logging format from %d to %d\n", variables.eventslog_format, intv);
// write existing logs (if any) to file before switching the file
GloPgSQL_Logger->flush();
GloPgSQL_Logger->flush_log();
}
variables.eventslog_format = intv;

@ -1095,13 +1095,9 @@ void flush_logs_handler() {
void ProxySQL_Admin::flush_logs() {
if (GloMyLogger) {
// flush any buffered logs before flushing log file
GloMyLogger->flush();
GloMyLogger->flush_log();
}
if (GloPgSQL_Logger) {
// flush any buffered logs before flushing log file
GloPgSQL_Logger->flush();
GloPgSQL_Logger->flush_log();
}
this->flush_error_log();

@ -79,7 +79,7 @@ void LogBuffer::flush_to_file(std::fstream* logfile) {
bool flush_and_rotate(
LogBuffer& buffer,
std::fstream* logfile,
std::fstream*& logfile,
unsigned int& current_log_size,
unsigned int max_log_file_size,
std::function<void()> lock_fn,

Loading…
Cancel
Save