chore(mysqlx): retire dormant MysqlxWorker path and its smoke test

Deletes the mysqlx worker implementation and the TAP smoke test that
was its only caller. Also removes the corresponding entries from the
plugin Makefile, the unit-tests Makefile, and the CI groups manifest.

Files removed:
- plugins/mysqlx/src/mysqlx_worker.cpp
- plugins/mysqlx/include/mysqlx_worker.h
- test/tap/tests/test_mysqlx_listener_smoke-t.cpp

References cleaned:
- plugins/mysqlx/Makefile: dropped mysqlx_worker.cpp from SRCS
- test/tap/tests/unit/Makefile: dropped test_mysqlx_listener_smoke-t
  from UNIT_TESTS and deleted its build rule
- test/tap/groups/groups.json: removed the smoke-test entry from the
  unit-tests-g1 group

Why: the worker was an earlier parallel implementation of the mysqlx
session path (separate accept thread + worker queue + per-listener
route tracking + identity.default_route -> pick_endpoint). It never
reached any call site in the production proxysql binary. The only
consumers of its public API were the smoke test and the worker code
itself. The route-identity fix in PR #5641 used the worker's design
as a reference and implemented the fix in the active session path
(Mysqlx_Thread + MysqlxSession + MysqlxConfigStore::pick_endpoint).
With that landed, the dormant code is pure tech debt — reviewers
repeatedly mistook its logic for the active path.

Why retire the smoke test at the same time: the test exercised
mysqlx_start_listeners_from_runtime_routes() which was the worker's
only entry function. With the worker gone, the smoke test cannot
link; keeping it would produce a broken CI target.

Out of scope: no changes to the active session path, no changes to
the MysqlxConfigStore or admin-schema tables, no changes to any
other tests. Pure deletion + Makefile/groups cleanup.
chore/retire-dead-mysqlx-worker
Rene Cannao 1 month ago
parent 923cbfeadc
commit 98aee7db21

@ -31,7 +31,6 @@ CXXFLAGS := $(STDCPP) -fPIC $(OPTZ) $(WGCOV) $(WASAN) -pthread
SRCS := $(PLUGIN_DIR)/src/mysqlx_plugin.cpp \
$(PLUGIN_DIR)/src/mysqlx_admin_schema.cpp \
$(PLUGIN_DIR)/src/mysqlx_config_store.cpp \
$(PLUGIN_DIR)/src/mysqlx_worker.cpp \
$(PLUGIN_DIR)/src/mysqlx_protocol.cpp \
$(PLUGIN_DIR)/src/mysqlx_frontend_session.cpp \
$(PLUGIN_DIR)/src/mysqlx_backend_session.cpp \

@ -1,48 +0,0 @@
#ifndef PROXYSQL_MYSQLX_WORKER_H
#define PROXYSQL_MYSQLX_WORKER_H
#include <atomic>
#include <cstdint>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
struct MysqlxListenerHandle {
int fd { -1 };
std::string bind_address {};
uint16_t port { 0 };
std::string route_name {};
};
class MysqlxWorker {
public:
explicit MysqlxWorker(uint32_t worker_id);
~MysqlxWorker();
MysqlxWorker(const MysqlxWorker&) = delete;
MysqlxWorker& operator=(const MysqlxWorker&) = delete;
void start();
void stop();
bool is_running() const;
void enqueue_client_fd(int fd);
private:
void run();
uint32_t worker_id_;
std::atomic<bool> running_ { false };
std::thread thread_ {};
std::mutex queue_mutex_ {};
std::vector<int> pending_fds_ {};
};
// Listener management — called from plugin start/stop
bool mysqlx_start_listeners_from_runtime_routes(class SQLite3DB& admindb);
void mysqlx_stop_listeners();
// Returns the number of active listeners (for testing)
size_t mysqlx_listener_count();
#endif /* PROXYSQL_MYSQLX_WORKER_H */

@ -1,306 +0,0 @@
#include "mysqlx_worker.h"
#include "mysqlx_backend_session.h"
#include "mysqlx_frontend_session.h"
#include "mysqlx_plugin.h"
#include "mysqlx_stats.h"
#include "sqlite3db.h"
#include <arpa/inet.h>
#include <cstdlib>
#include <cerrno>
#include <cstring>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>
namespace {
// Global listener state, managed by mysqlx_start/stop_listeners.
std::vector<MysqlxListenerHandle> g_listeners {};
std::vector<std::unique_ptr<MysqlxWorker>> g_workers {};
std::atomic<bool> g_accept_running { false };
std::thread g_accept_thread {};
std::atomic<uint32_t> g_rr_index { 0 };
bool parse_bind(const std::string& bind, std::string& host, uint16_t& port) {
if (!bind.empty() && bind[0] == '[') {
auto closing = bind.find(']');
if (closing != std::string::npos) {
host = bind.substr(1, closing - 1);
port = 33060;
if (closing + 1 < bind.size() && bind[closing + 1] == ':') {
port = static_cast<uint16_t>(std::atoi(bind.substr(closing + 2).c_str()));
}
return true;
}
}
auto pos = bind.rfind(':');
if (pos == std::string::npos || pos == 0 || pos == bind.size() - 1) {
return false;
}
host = bind.substr(0, pos);
int p = std::atoi(bind.substr(pos + 1).c_str());
if (p <= 0 || p > 65535) {
return false;
}
port = static_cast<uint16_t>(p);
return true;
}
int create_listener_socket(const std::string& host, uint16_t port) {
std::string port_str = std::to_string(port);
struct addrinfo hints {}, *result = nullptr;
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
if (getaddrinfo(host.empty() ? nullptr : host.c_str(), port_str.c_str(), &hints, &result) != 0) {
return -1;
}
int fd = -1;
for (struct addrinfo* rp = result; rp != nullptr; rp = rp->ai_next) {
fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (fd < 0) continue;
int opt = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
int nodelay = 1;
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay));
if (bind(fd, rp->ai_addr, rp->ai_addrlen) == 0 && listen(fd, 128) == 0) {
break;
}
close(fd);
fd = -1;
}
freeaddrinfo(result);
return fd;
}
void accept_loop() {
while (g_accept_running.load()) {
// Build pollfd array from listeners.
std::vector<struct pollfd> pfds {};
for (const auto& listener : g_listeners) {
if (listener.fd >= 0) {
pfds.push_back({ listener.fd, POLLIN, 0 });
}
}
if (pfds.empty()) {
struct timespec ts { 0, 100000000 }; // 100ms
nanosleep(&ts, nullptr);
continue;
}
int ready = poll(pfds.data(), static_cast<nfds_t>(pfds.size()), 200 /*ms*/);
if (ready <= 0) {
continue;
}
for (const auto& pfd : pfds) {
if (pfd.revents & POLLIN) {
int client_fd = accept(pfd.fd, nullptr, nullptr);
if (client_fd < 0) {
continue;
}
int nodelay = 1;
setsockopt(client_fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay));
if (!g_workers.empty()) {
uint32_t idx = g_rr_index.fetch_add(1, std::memory_order_relaxed)
% static_cast<uint32_t>(g_workers.size());
g_workers[idx]->enqueue_client_fd(client_fd);
} else {
close(client_fd);
}
}
}
}
}
} // namespace
// ---------------------------------------------------------------------------
// MysqlxWorker
// ---------------------------------------------------------------------------
MysqlxWorker::MysqlxWorker(uint32_t worker_id) : worker_id_(worker_id) {}
MysqlxWorker::~MysqlxWorker() {
stop();
}
void MysqlxWorker::start() {
running_.store(true);
thread_ = std::thread(&MysqlxWorker::run, this);
}
void MysqlxWorker::stop() {
running_.store(false);
if (thread_.joinable()) {
thread_.join();
}
}
bool MysqlxWorker::is_running() const {
return running_.load();
}
void MysqlxWorker::enqueue_client_fd(int fd) {
std::lock_guard<std::mutex> lock(queue_mutex_);
pending_fds_.push_back(fd);
}
void MysqlxWorker::run() {
while (running_.load()) {
std::vector<int> fds {};
{
std::lock_guard<std::mutex> lock(queue_mutex_);
fds.swap(pending_fds_);
}
for (int fd : fds) {
MysqlxFrontendSession session(fd);
MysqlxPluginContext& ctx = mysqlx_context();
if (ctx.config_store && session.run_handshake_and_auth(*ctx.config_store)) {
// Capture topology generation at session bind time.
// A future GR observer only needs to call bump_topology_generation()
// to trigger re-evaluation on the next session or reconnect.
uint64_t bound_generation = ctx.config_store->topology_generation();
(void)bound_generation; // Phase 2 seam: compare on reconnect/reroute.
const MysqlxResolvedIdentity& identity = session.identity();
MysqlxBackendEndpoint endpoint = ctx.config_store->pick_endpoint(identity.default_route);
if (endpoint.hostname.empty()) {
mysqlx_send_error(fd, 4000, "No backend endpoint available for route");
int hg = ctx.config_store->route_hostgroup(identity.default_route);
mysqlx_stats().record_conn_err(identity.default_route, hg);
} else {
MysqlxBackendSession backend;
std::string err {};
int hg = ctx.config_store->route_hostgroup(identity.default_route);
if (backend.connect(identity, endpoint, err)) {
mysqlx_stats().record_conn_ok(identity.default_route, hg);
mysqlx_stats().record_conn_used(identity.default_route, hg);
backend.relay(fd);
} else {
mysqlx_send_error(fd, 4001, "Backend connection failed: " + err);
mysqlx_stats().record_conn_err(identity.default_route, hg);
}
}
}
close(fd);
}
if (fds.empty()) {
struct timespec ts { 0, 50000000 }; // 50ms
nanosleep(&ts, nullptr);
}
}
}
// ---------------------------------------------------------------------------
// Listener management
// ---------------------------------------------------------------------------
bool mysqlx_start_listeners_from_runtime_routes(SQLite3DB& admindb) {
// Read active routes from runtime_mysqlx_routes.
char* error = nullptr;
std::unique_ptr<SQLite3_result> result(
admindb.execute_statement(
"SELECT name, bind FROM runtime_mysqlx_routes WHERE active=1",
&error
)
);
if (error != nullptr) {
free(error);
return false;
}
if (!result || result->rows.empty()) {
// No active routes — not an error, just nothing to listen on.
return true;
}
// Open listeners.
for (auto* row : result->rows) {
if (row == nullptr || row->fields[0] == nullptr || row->fields[1] == nullptr) {
continue;
}
std::string route_name = row->fields[0];
std::string bind_str = row->fields[1];
std::string host {};
uint16_t port = 0;
if (!parse_bind(bind_str, host, port)) {
continue;
}
int fd = create_listener_socket(host, port);
if (fd < 0) {
continue;
}
MysqlxListenerHandle handle {};
handle.fd = fd;
handle.bind_address = host;
handle.port = port;
handle.route_name = route_name;
g_listeners.push_back(std::move(handle));
}
if (g_listeners.empty()) {
return true;
}
// Start one worker thread (MVP; scale later).
g_workers.push_back(std::make_unique<MysqlxWorker>(0));
g_workers.back()->start();
// Start the accept thread.
g_accept_running.store(true);
g_accept_thread = std::thread(accept_loop);
return true;
}
void mysqlx_stop_listeners() {
// Signal the accept loop to stop.
g_accept_running.store(false);
if (g_accept_thread.joinable()) {
g_accept_thread.join();
}
// Stop all workers.
for (auto& worker : g_workers) {
worker->stop();
}
g_workers.clear();
// Close all listener sockets.
for (auto& listener : g_listeners) {
if (listener.fd >= 0) {
close(listener.fd);
listener.fd = -1;
}
}
g_listeners.clear();
g_rr_index = 0;
}
size_t mysqlx_listener_count() {
return g_listeners.size();
}

@ -341,7 +341,6 @@
"test_mysql_connect_retries-t" : [ "legacy-g3","mysql84-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ],
"test_mysql_connect_retries_delay-t" : [ "legacy-g3","mysql84-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ],
"test_mysqlx_admin_tables-t" : [ "unit-tests-g1" ],
"test_mysqlx_listener_smoke-t" : [ "unit-tests-g1" ],
"test_mysqlx_plugin_load-t" : [ "unit-tests-g1" ],
"test_mysql_hostgroup_attributes-1-t" : [ "legacy-g3","mysql84-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ],
"test_mysql_query_digests_stages-t" : [ "legacy-g3","mysql84-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ],

@ -1,186 +0,0 @@
/**
* test_mysqlx_listener_smoke-t.cpp
*
* Verify that the mysqlx plugin opens TCP listeners when started with
* active routes, and that plain TCP connections succeed.
*
* This is a hybrid unit test: it links against libproxysql.a and loads
* the mysqlx plugin .so without a running ProxySQL daemon.
*
* Stub implementations are provided for session/config/stats symbols
* referenced by MysqlxWorker::run() so that only mysqlx_worker.cpp
* needs to be compiled directly.
*/
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include "ProxySQL_PluginManager.h"
#include "sqlite3db.h"
#include "tap.h"
#include "mysqlx_config_store.h"
#include "mysqlx_frontend_session.h"
#include "mysqlx_backend_session.h"
#include "mysqlx_plugin.h"
#include "mysqlx_stats.h"
#include "mysqlx_protocol.h"
#ifndef PROXYSQL_MYSQLX_PLUGIN_PATH
#error "PROXYSQL_MYSQLX_PLUGIN_PATH must be defined at compile time"
#endif
static const uint16_t TEST_MYSQLX_PORT = 16603;
static const uint16_t TEST_MYSQLX_PORT_A = 46101;
static const uint16_t TEST_MYSQLX_PORT_B = 46102;
extern bool mysqlx_start_listeners_from_runtime_routes(SQLite3DB& db);
extern size_t mysqlx_listener_count();
extern void mysqlx_stop_listeners();
static bool seed_route(SQLite3DB& db, const char* name, const char* bind, int hg) {
char sql[256];
snprintf(sql, sizeof(sql),
"INSERT INTO runtime_mysqlx_routes (name, bind, destination_hostgroup, strategy, active) "
"VALUES ('%s', '%s', %d, 'first_available', 1)",
name, bind, hg);
return db.execute(sql);
}
// ---------------------------------------------------------------------------
// Stub implementations for symbols referenced by MysqlxWorker::run().
// The smoke test only verifies listener lifecycle and TCP connectivity;
// it does not exercise the X Protocol session path.
// ---------------------------------------------------------------------------
MysqlxFrontendSession::MysqlxFrontendSession(int fd) : client_fd_(fd) {}
MysqlxFrontendSession::~MysqlxFrontendSession() { if (client_fd_ >= 0) close(client_fd_); }
bool MysqlxFrontendSession::run_handshake_and_auth(MysqlxConfigStore&) { return false; }
MysqlxBackendSession::MysqlxBackendSession() = default;
MysqlxBackendSession::~MysqlxBackendSession() = default;
bool MysqlxBackendSession::connect(const MysqlxResolvedIdentity&, const MysqlxBackendEndpoint&, std::string&) { return false; }
bool MysqlxBackendSession::relay(int) { return false; }
static MysqlxPluginContext g_stub_context {};
MysqlxPluginContext& mysqlx_context() { return g_stub_context; }
static MysqlxStatsStore g_stub_stats;
MysqlxStatsStore& mysqlx_stats() { return g_stub_stats; }
void MysqlxStatsStore::record_conn_ok(const std::string&) {}
void MysqlxStatsStore::record_conn_err(const std::string&) {}
uint64_t MysqlxConfigStore::topology_generation() const { return 0; }
MysqlxBackendEndpoint MysqlxConfigStore::pick_endpoint(const std::string&) const { return {}; }
bool mysqlx_send_error(int, uint16_t, const std::string&, const std::string&) { return false; }
int main() {
plan(15);
ProxySQL_PluginManager mgr;
std::string err {};
ok(mgr.load(PROXYSQL_MYSQLX_PLUGIN_PATH, err),
"load mysqlx plugin succeeds: %s", err.c_str());
SQLite3DB admindb;
int dbrc = admindb.open(const_cast<char*>(":memory:"), SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE);
ok(dbrc == 0, "admin sqlite opens");
ok(mgr.init_all(err), "init_all registers schema: %s", err.c_str());
admindb.execute(
"CREATE TABLE IF NOT EXISTS runtime_mysqlx_routes ("
" name VARCHAR NOT NULL PRIMARY KEY,"
" bind VARCHAR NOT NULL,"
" destination_hostgroup INT NOT NULL,"
" fallback_hostgroup INT,"
" strategy VARCHAR NOT NULL DEFAULT 'first_available',"
" active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1,"
" attributes VARCHAR NOT NULL DEFAULT '',"
" comment VARCHAR NOT NULL DEFAULT ''"
" )"
);
// ok 4: No listeners before any start_listeners call.
ok(mysqlx_listener_count() == 0, "listener count is 0 before any listeners start");
// ok 5: Start with empty routes table.
{
bool rc = mysqlx_start_listeners_from_runtime_routes(admindb);
ok(rc == true && mysqlx_listener_count() == 0,
"start with empty routes table: succeeds, count stays 0");
}
mysqlx_stop_listeners();
// ok 6-10: Existing single-route test.
ok(seed_route(admindb, "smoke_route", "127.0.0.1:16603", 0),
"seed route row");
ok(mysqlx_start_listeners_from_runtime_routes(admindb),
"start_listeners_from_runtime_routes succeeds");
ok(mysqlx_listener_count() == 1, "one listener opened");
usleep(100000);
int fd = socket(AF_INET, SOCK_STREAM, 0);
ok(fd >= 0, "client socket created");
sockaddr_in addr {};
addr.sin_family = AF_INET;
addr.sin_port = htons(TEST_MYSQLX_PORT);
inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
int rc = connect(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
ok(rc == 0, "TCP connect to mysqlx listener on port %u succeeds", TEST_MYSQLX_PORT);
close(fd);
mysqlx_stop_listeners();
// ok 11: Count drops to 0 after stop.
ok(mysqlx_listener_count() == 0, "count drops to 0 after stop");
// ok 12: Re-start with two routes on different ports.
admindb.execute("DELETE FROM runtime_mysqlx_routes");
seed_route(admindb, "route_a", "127.0.0.1:46101", 1);
seed_route(admindb, "route_b", "127.0.0.1:46102", 2);
mysqlx_start_listeners_from_runtime_routes(admindb);
usleep(100000);
ok(mysqlx_listener_count() == 2, "re-start: two listeners on ports %u and %u",
TEST_MYSQLX_PORT_A, TEST_MYSQLX_PORT_B);
// ok 13: TCP connect to first listener.
{
int fd2 = socket(AF_INET, SOCK_STREAM, 0);
sockaddr_in a {};
a.sin_family = AF_INET;
a.sin_port = htons(TEST_MYSQLX_PORT_A);
inet_pton(AF_INET, "127.0.0.1", &a.sin_addr);
int r = connect(fd2, reinterpret_cast<sockaddr*>(&a), sizeof(a));
ok(r == 0, "TCP connect to listener on port %u succeeds", TEST_MYSQLX_PORT_A);
close(fd2);
}
// ok 14: TCP connect to second listener.
{
int fd3 = socket(AF_INET, SOCK_STREAM, 0);
sockaddr_in b {};
b.sin_family = AF_INET;
b.sin_port = htons(TEST_MYSQLX_PORT_B);
inet_pton(AF_INET, "127.0.0.1", &b.sin_addr);
int r = connect(fd3, reinterpret_cast<sockaddr*>(&b), sizeof(b));
ok(r == 0, "TCP connect to listener on port %u succeeds", TEST_MYSQLX_PORT_B);
close(fd3);
}
// ok 15: Stop when already stopped.
mysqlx_stop_listeners();
mysqlx_stop_listeners();
ok(mysqlx_listener_count() == 0, "stop when no listeners active: no crash, count stays 0");
return exit_status();
}

@ -326,7 +326,6 @@ UNIT_TESTS := smoke_test-t query_cache_unit-t query_processor_unit-t \
test_mysqlx_plugin_load-t \
mysqlx_config_store_unit-t \
test_mysqlx_admin_tables-t \
test_mysqlx_listener_smoke-t \
mysqlx_protocol_unit-t \
mysqlx_protocol_socket_unit-t \
mysqlx_route_store_unit-t \
@ -461,13 +460,6 @@ mysqlx_protocol_socket_unit-t: mysqlx_protocol_socket_unit-t.cpp $(PROXYSQL_PATH
$(IDIRS) $(LDIRS) $(OPT) $(LIBPROXYSQLAR_FULL) $(STATIC_LIBS) \
$(MYLIBS) -lprotobuf -lssl -lcrypto $(ALLOW_MULTI_DEF) -o $@
test_mysqlx_listener_smoke-t: ../test_mysqlx_listener_smoke-t.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_worker.cpp $(TEST_HELPERS_OBJ) $(LIBPROXYSQLAR) mysqlx_plugin_build
$(CXX) ../test_mysqlx_listener_smoke-t.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_worker.cpp $(TEST_HELPERS_OBJ) \
-DPROXYSQL_MYSQLX_PLUGIN_PATH=\"$(MYSQLX_PLUGIN_SO)\" \
-I$(PROXYSQL_PATH)/plugins/mysqlx/include \
$(IDIRS) $(LDIRS) $(OPT) $(LIBPROXYSQLAR_FULL) $(STATIC_LIBS) \
$(MYLIBS) -ldl -lpthread $(ALLOW_MULTI_DEF) -o $@
test_mysqlx_admin_tables-t: ../test_mysqlx_admin_tables-t.cpp $(ODIR)/tap.o $(ODIR)/test_globals.o $(ODIR)/test_init.o $(LIBPROXYSQLAR) mysqlx_plugin_build
$(CXX) ../test_mysqlx_admin_tables-t.cpp $(ODIR)/tap.o $(ODIR)/test_globals.o $(ODIR)/test_init.o \
-DPROXYSQL_MYSQLX_PLUGIN_PATH=\"$(MYSQLX_PLUGIN_SO)\" \

Loading…
Cancel
Save