fix(plugin-chassis,mysqlx): chassis read-path scaling, graceful shutdown, hardening

Six independent items from the independent review of PR #5651, batched
together because each one alone is small.

1) lib/ProxySQL_PluginManager.cpp: replace g_active_plugin_manager_mutex
   with a std::shared_mutex. Readers (dispatch_admin_command,
   dispatch_query_hook, resolve_alias_to_canonical) take a shared lock
   so multiple worker threads can be inside plugin callbacks
   concurrently; writers (publish/unpublish in load/stop) take the
   unique lock.

   The previous std::mutex serialized every plugin-callback dispatch on
   one mutex. Once a plugin actually wires a query hook into
   MySQL_Thread / PgSQL_Thread, every concurrent client query on every
   worker would have queued behind that mutex — silently negating
   ProxySQL's per-worker parallelism. The lock-free
   proxysql_has_configured_plugin_query_hook() didn't help, since the
   actual dispatch still took the lock. Switching to shared_mutex on
   the read path lets dispatch scale linearly.

2) plugins/mysqlx/include/mysqlx_session.h +
   plugins/mysqlx/src/mysqlx_session.cpp +
   plugins/mysqlx/src/mysqlx_thread.cpp: add
   MysqlxSession::shutdown_notify_client() and call it from
   Mysqlx_Thread::run() on the way out of the worker loop.

   Previously Mysqlx_Thread::stop() flipped running_=false and joined.
   The destructor then deleted sessions, closing their fds. Connected
   clients saw an unannounced TCP RST mid-response and a torn TLS
   record. Now the worker, on its way out, walks sessions_ and for each
   live session: enqueues a Mysqlx::Error frame (code 1053, "Server is
   shutting down", FATAL severity); flushes one write_to_net pass;
   if TLS is up, calls SSL_set_quiet_shutdown(1) + SSL_shutdown so the
   peer's TLS stack sees a proper close_notify rather than a torn
   record. Best-effort throughout — never blocks on unresponsive peers
   because the process is exiting.

3) plugins/mysqlx/include/mysqlx_session.h +
   plugins/mysqlx/src/mysqlx_session.cpp: remove the dead
   TLS_PASSTHROUGH enum value and the two corresponding branches.

   handler_tls_accept_init's first three lines and the
   `tls_mode_ != TLS_PASSTHROUGH` predicate in handler_connecting_server
   only ran when set_tls_mode(TLS_PASSTHROUGH) had been called, which
   never happened in production — the `mysqlx_tls_mode` config column
   is never plumbed into a session. Worse, the PASSTHROUGH branch did
   not actually implement an opaque pipe (it just skipped TLS
   termination and resumed clear-text X-Protocol parsing, which would
   desync any real client). Drop the value rather than carry a
   misleading enum that suggests a feature exists. Future passthrough
   work should reintroduce a properly-wired implementation.

4) plugins/mysqlx/src/mysqlx_connection.cpp: check the return value of
   inet_pton in start_connect; fail fast on anything that isn't a
   valid IPv4 dotted-quad.

   Previously the return was discarded. inet_pton on a hostname (or
   IPv6 literal, or empty string) silently left sin_addr at 0.0.0.0 —
   producing a connect to 0.0.0.0/INADDR_ANY rather than the intended
   target. Real footgun because mysqlx_backend_endpoints.hostname
   accepts arbitrary strings. Now: fail with ERROR_STATE so the
   misconfig surfaces instead of routing traffic to the wrong target.
   Hostname resolution is still the upstream pipeline's job;
   start_connect deliberately stays narrow.

5) plugins/mysqlx/src/mysqlx_data_stream.cpp: move
   do_ssl_handshake's 64 KiB scratch buffer from the stack to a
   thread_local static.

   ASan-instrumented builds and large-thread-pool configs can run with
   thread stacks tight enough that a stack-allocated 64 KiB local
   straddles the limit. Each Mysqlx_Thread owns its own thread_local
   instance so the buffer is not shared between threads.

6) plugins/mysqlx/src/mysqlx_thread.cpp: document the listener-removal
   semantics on remove_listener_for_route.

   Document that already-accepted sessions on a listener that's being
   removed continue running against their existing target_hostgroup_ /
   target_address_ / target_port_ until they finish or hit idle
   timeout. That matches surrounding MySQL behaviour (DROP TABLE
   doesn't cancel in-flight queries; ALTER doesn't kick off open
   prepared statements). Future change can call shutdown_notify_client
   on matching sessions if active disconnection becomes desirable.

NOT changed: the agent-flagged "compression overshoot" issue at
plugins/mysqlx/src/mysqlx_session.cpp:1283. The zstd-stream loop
already caps the resize at `cap` on line 1369 (`if (new_sz > cap)
new_sz = cap;`) before the resize, so decompressed never grows past
the cap; there is no overshoot. Verified by tracing the loop. Skipped.

Verified locally:
- plugin .so builds clean with PROXYSQL40=1 and the implied tier flags.
- libproxysql.a and src/proxysql build clean.
- plugin chassis tests (plugin_lifecycle_unit-t,
  plugin_dispatch_unit-t, plugin_manager_unit-t,
  plugin_query_hook_unit-t) build and pass with the new shared_mutex
  read path.  plugin_manager_unit-t shows the same 2 pre-existing
  destructor-related failures it had before this commit (verified by
  stash + rebuild).
- mysqlx_robustness_unit-t passes 74/74.
- mysqlx_session_unit-t has the same pre-existing failures at 33-34.
pull/5700/head
Rene Cannao 2 months ago
parent df7e335e23
commit 55e90d1a76

@ -12,6 +12,7 @@
#include <cstring>
#include <dlfcn.h>
#include <mutex>
#include <shared_mutex>
#include <strings.h>
#include "proxysql.h"
@ -28,7 +29,15 @@ namespace {
std::atomic<ProxySQL_PluginManager*> g_active_plugin_manager { nullptr };
ProxySQL_PluginManager* g_registry_target = nullptr;
std::mutex g_active_plugin_manager_mutex {};
// Guards swaps of g_active_plugin_manager. Readers (dispatch_admin_command,
// dispatch_query_hook, resolve_alias_to_canonical) take a shared lock, so
// many worker threads can be running through plugin callbacks at the same
// time without serializing on a single std::mutex. Writers — load/init/stop
// paths that publish or unpublish the manager pointer — take the unique
// lock. This change is what keeps query-hook dispatch from collapsing the
// per-worker MySQL_Thread / PgSQL_Thread parallelism onto one mutex once a
// plugin actually wires a hook into the hot path.
std::shared_mutex g_active_plugin_manager_mutex {};
// Serializes load/init/stop operations. Held for the duration of a plugin
// lifecycle transition so two reload paths cannot race on g_registry_target /
// g_registry_registration_*. Distinct from g_active_plugin_manager_mutex,
@ -827,7 +836,10 @@ bool proxysql_dispatch_configured_plugin_admin_command(
const std::string& sql,
ProxySQL_PluginCommandResult& result
) {
std::lock_guard<std::mutex> lock(g_active_plugin_manager_mutex);
// Reader: shared lock so concurrent admin sessions can dispatch
// plugin commands in parallel. The unique-lock writers (publish /
// unpublish in load_/stop_configured_plugins) still serialize swaps.
std::shared_lock<std::shared_mutex> lock(g_active_plugin_manager_mutex);
if (g_active_plugin_manager.load() == nullptr) {
return false;
}
@ -841,7 +853,7 @@ std::string proxysql_resolve_configured_plugin_admin_alias(const std::string& sq
// in the manager, and the caller typically releases the lock before
// dispatching. A borrowed c_str() would dangle if the manager is swapped
// out on reload between resolve and dispatch. Copy out under the lock.
std::lock_guard<std::mutex> lock(g_active_plugin_manager_mutex);
std::shared_lock<std::shared_mutex> lock(g_active_plugin_manager_mutex);
ProxySQL_PluginManager* mgr = g_active_plugin_manager.load();
if (mgr == nullptr) {
return {};
@ -854,7 +866,11 @@ bool proxysql_dispatch_configured_plugin_query_hook(
const ProxySQL_PluginQueryHookPayload& payload,
ProxySQL_PluginQueryHookResult& result
) {
std::lock_guard<std::mutex> lock(g_active_plugin_manager_mutex);
// Reader: shared lock so query-hook dispatch on the data-plane hot
// path scales across MySQL_Thread / PgSQL_Thread workers instead of
// serializing on a single std::mutex. This is the change that lets a
// plugin wire a query hook without collapsing per-worker parallelism.
std::shared_lock<std::shared_mutex> lock(g_active_plugin_manager_mutex);
ProxySQL_PluginManager* mgr = g_active_plugin_manager.load();
if (mgr == nullptr) {
return false;
@ -894,7 +910,7 @@ bool proxysql_load_configured_plugins(
std::lock_guard<std::mutex> lifecycle_lock(g_plugin_lifecycle_mutex);
err.clear();
{
std::lock_guard<std::mutex> lock(g_active_plugin_manager_mutex);
std::unique_lock<std::shared_mutex> lock(g_active_plugin_manager_mutex);
g_active_plugin_manager.store(nullptr, std::memory_order_release);
}
manager.reset();
@ -939,7 +955,7 @@ bool proxysql_load_configured_plugins(
// proxysql_init_configured_plugins() returns will race plain writes
// against plain reads.
{
std::lock_guard<std::mutex> lock(g_active_plugin_manager_mutex);
std::unique_lock<std::shared_mutex> lock(g_active_plugin_manager_mutex);
manager = std::move(next_manager);
g_active_plugin_manager.store(manager.get(), std::memory_order_release);
}
@ -999,7 +1015,7 @@ bool proxysql_stop_configured_plugins(
std::lock_guard<std::mutex> lifecycle_lock(g_plugin_lifecycle_mutex);
err.clear();
{
std::lock_guard<std::mutex> lock(g_active_plugin_manager_mutex);
std::unique_lock<std::shared_mutex> lock(g_active_plugin_manager_mutex);
g_active_plugin_manager.store(nullptr, std::memory_order_release);
}
if (!manager) {

@ -35,10 +35,19 @@ enum MysqlxResponseState {
RESP_WAITING_SESS_RESET
};
// Per-session TLS posture. Today the runtime only distinguishes
// "TLS termination is allowed" (TLS_TERMINATE) from "this session is
// not configured for TLS at all" (TLS_OFF). The earlier draft also
// carried a TLS_PASSTHROUGH value with two corresponding branches in
// session.cpp that pretended to enable an end-to-end pipe; that path
// was never actually wired up (no call to set_tls_mode() ever set
// PASSTHROUGH, and the dead branches did not implement an opaque
// pipe — they just skipped TLS termination and resumed clear-text
// X-Protocol parsing, which would have desynced any real client).
// Removing the value rather than leaving a misleading enum.
enum MysqlxTlsMode {
TLS_OFF = 0,
TLS_TERMINATE,
TLS_PASSTHROUGH
TLS_TERMINATE
};
// X Protocol compression algorithm negotiated via Mysqlx.Connection.Capabilities.
@ -111,6 +120,16 @@ public:
uint64_t get_last_active_time() const { return last_active_time_; }
void set_last_active_time(uint64_t t) { last_active_time_ = t; }
// Best-effort graceful shutdown notification: send a Mysqlx::Error
// with code 1053 (server shutting down) to the client and drain any
// pending writes (and run SSL_shutdown if a TLS session is up). Used
// by Mysqlx_Thread::run() on the way out so connected clients see a
// clean X-Protocol error frame instead of an unannounced TCP RST or
// a half-finished TLS record. Safe to call regardless of session
// status_; idempotent in the sense that subsequent calls just no-op
// against an already-drained data stream.
void shutdown_notify_client();
// --- Test-only accessors ---
// These exist so unit tests can drive resolve_backend_target() in
// isolation from the full auth state machine. They are not called by

@ -62,7 +62,21 @@ int MysqlxConnection::start_connect(const char* host, int port) {
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
inet_pton(AF_INET, host, &addr.sin_addr);
// Reject anything that isn't an IPv4 dotted-quad. Previously the
// return value was discarded and inet_pton on a hostname (or empty
// string, or "::1", or anything else) silently left sin_addr at
// 0.0.0.0, producing a connect to localhost-or-INADDR_ANY-equivalent
// — a real footgun because mysqlx_backend_endpoints.hostname accepts
// arbitrary strings. Hostnames are not supported here; the upstream
// path is expected to have already resolved them. Failing fast with
// ERROR_STATE surfaces the misconfig instead of silently routing
// traffic to the wrong target.
if (inet_pton(AF_INET, host ? host : "", &addr.sin_addr) != 1) {
close(fd_);
fd_ = -1;
state_ = ERROR_STATE;
return -1;
}
int rc = ::connect(fd_, (struct sockaddr*)&addr, sizeof(addr));
if (rc == 0) { state_ = AUTHENTICATING; return 0; }
if (errno == EINPROGRESS) { state_ = CONNECTING; return 1; }

@ -201,7 +201,14 @@ bool MysqlxDataStream::do_ssl_handshake() {
ssl_handshake_done_ = true;
encrypted_ = true;
queue_encrypted_output();
uint8_t plain[65536];
// 64 KiB scratch buffer for the immediate post-handshake drain.
// thread_local static avoids putting it on the worker stack —
// some thread-stack budgets (e.g. ASan-instrumented builds, or
// thread_pool_size >> default thread stack rlimit) would put a
// stack-allocated 64 KiB local on the wrong side of the limit.
// Each Mysqlx_Thread owns its own thread_local instance so the
// buffer is not shared between threads.
static thread_local uint8_t plain[65536];
int dec;
while ((dec = SSL_read(ssl_, plain, sizeof(plain))) > 0) {
feed_bytes(plain, static_cast<size_t>(dec));

@ -916,12 +916,6 @@ void MysqlxSession::handler_session_closing() {
}
void MysqlxSession::handler_tls_accept_init() {
if (tls_mode_ == TLS_PASSTHROUGH) {
status_ = CONNECTING_CLIENT;
to_process = true;
return;
}
if (!client_ds_.ssl_init_done()) {
SSL_CTX* ctx = thread_ptr_ ? thread_ptr_->get_ssl_ctx() : nullptr;
if (!ctx) {
@ -1095,7 +1089,7 @@ void MysqlxSession::handler_connecting_server() {
backend_conn_->set_backend_user(backend_user.c_str());
backend_conn_->set_backend_schema(schema_.c_str());
if (client_ds_.is_encrypted() && tls_mode_ != TLS_PASSTHROUGH) {
if (client_ds_.is_encrypted()) {
if (thread_ptr_ && thread_ptr_->get_ssl_ctx()) {
backend_conn_->set_backend_tls_required(true);
backend_conn_->set_ssl_ctx(thread_ptr_->get_ssl_ctx());
@ -1146,6 +1140,34 @@ void MysqlxSession::return_backend_to_pool() {
// so no reset is needed.
}
void MysqlxSession::shutdown_notify_client() {
// Best-effort: enqueue a fatal X-Protocol error frame, then push the
// queued bytes to the wire. Skip if the client write side is already
// gone (fd <0) or if the session is already closing. Both branches
// are reachable in normal lifecycle and we don't want shutdown to
// hang on a half-closed peer.
if (client_ds_.get_fd() < 0) return;
if (status_ == X_SESSION_CLOSED || status_ == X_SESSION_CLOSING) {
return;
}
send_error(1053, "Server is shutting down", /*fatal=*/true);
// Drain the queued frame to the wire. Best-effort: a single
// write_to_net() pass — if the client is unresponsive we don't want
// shutdown to block.
client_ds_.write_to_net();
// If TLS is active, ask OpenSSL to send close_notify so the peer's
// TLS stack sees a clean shutdown rather than a torn-down record.
// SSL_set_quiet_shutdown(1) suppresses the bidirectional handshake
// — appropriate during process exit when waiting for the peer's
// close_notify is undesirable.
if (SSL* ssl = client_ds_.get_ssl()) {
SSL_set_quiet_shutdown(ssl, 1);
SSL_shutdown(ssl);
}
status_ = X_SESSION_CLOSING;
healthy = false;
}
void MysqlxSession::send_error(int code, const char* msg, bool fatal) {
Mysqlx::Error err;
err.set_code(code);

@ -114,6 +114,19 @@ void Mysqlx_Thread::run() {
if (rc > 0) process_ready_fds(nfds);
process_all_sessions();
}
// Shutdown drain. The destructor will close fds and free sessions
// shortly after we return. Before that, give every still-connected
// client a clean X-Protocol Error frame (code 1053, "Server is
// shutting down") plus a TLS close_notify so peers see a graceful
// teardown rather than an unannounced TCP RST or a torn TLS record.
// Best-effort: a single non-blocking write pass per session and a
// quiet-shutdown SSL_shutdown, since process exit is imminent and
// we cannot block on unresponsive peers.
std::lock_guard<std::mutex> lock(sessions_mutex_);
for (auto* sess : sessions_) {
if (sess) sess->shutdown_notify_client();
}
}
void Mysqlx_Thread::rebuild_poll_set() {
@ -311,6 +324,19 @@ int Mysqlx_Thread::add_listener(const char* bind_addr, int port, const char* rou
return 0;
}
// Removes the listener fd associated with `route_name`. Sessions that
// were already accepted on this listener and are mid-flight are NOT
// disturbed — they continue against their existing target_hostgroup_ /
// target_address_ / target_port_ until they finish their current
// transaction or hit idle timeout. This matches the surrounding
// MySQL-protocol behaviour (DROP TABLE doesn't cancel in-flight
// queries; an ALTER doesn't kick off open prepared statements). The
// route name is consulted at backend-target resolution time which only
// happens during auth, so an authenticated session does not depend on
// the route name still being live. If a future use case requires
// active disconnection of in-flight sessions on route removal, walk
// sessions_ here, match identity_->default_route, and call
// shutdown_notify_client() on each.
bool Mysqlx_Thread::remove_listener_for_route(const char* route_name) {
if (route_name == nullptr) return false;
std::lock_guard<std::mutex> lock(listener_mutex_);

Loading…
Cancel
Save