feat: add mysqlx listener sockets and worker threads

Plugin start now opens TCP listeners for each active route in
runtime_mysqlx_routes.  An accept thread dispatches incoming
connections to worker threads round-robin.  Phase 1 workers
accept-then-close; Tasks 7-8 will add X Protocol handshake
and backend relay.
pull/5642/head
Rene Cannao 3 months ago
parent 0b11bce37e
commit 5b5bbfbcaf

@ -18,7 +18,8 @@ CXXFLAGS := $(STDCPP) -fPIC $(OPTZ) $(WGCOV) $(WASAN)
SRCS := $(PLUGIN_DIR)/src/mysqlx_plugin.cpp \
$(PLUGIN_DIR)/src/mysqlx_admin_schema.cpp \
$(PLUGIN_DIR)/src/mysqlx_config_store.cpp
$(PLUGIN_DIR)/src/mysqlx_config_store.cpp \
$(PLUGIN_DIR)/src/mysqlx_worker.cpp
HEADERS := $(wildcard $(PLUGIN_DIR)/include/*.h) \
$(PROXYSQL_PATH)/include/ProxySQL_Plugin.h
OBJS := $(patsubst $(PLUGIN_DIR)/src/%.cpp,$(ODIR)/%.o,$(SRCS))

@ -4,6 +4,7 @@
#include "ProxySQL_Plugin.h"
#include "mysqlx_admin_schema.h"
#include "mysqlx_config_store.h"
#include "mysqlx_worker.h"
#include <memory>

@ -0,0 +1,48 @@
#ifndef PROXYSQL_MYSQLX_WORKER_H
#define PROXYSQL_MYSQLX_WORKER_H
#include <atomic>
#include <cstdint>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
struct MysqlxListenerHandle {
int fd { -1 };
std::string bind_address {};
uint16_t port { 0 };
std::string route_name {};
};
class MysqlxWorker {
public:
explicit MysqlxWorker(uint32_t worker_id);
~MysqlxWorker();
MysqlxWorker(const MysqlxWorker&) = delete;
MysqlxWorker& operator=(const MysqlxWorker&) = delete;
void start();
void stop();
bool is_running() const;
void enqueue_client_fd(int fd);
private:
void run();
uint32_t worker_id_;
std::atomic<bool> running_ { false };
std::thread thread_ {};
std::mutex queue_mutex_ {};
std::vector<int> pending_fds_ {};
};
// Listener management — called from plugin start/stop
bool mysqlx_start_listeners_from_runtime_routes(class SQLite3DB& admindb);
void mysqlx_stop_listeners();
// Returns the number of active listeners (for testing)
size_t mysqlx_listener_count();
#endif /* PROXYSQL_MYSQLX_WORKER_H */

@ -16,11 +16,21 @@ bool mysqlx_init(ProxySQL_PluginServices* services) {
bool mysqlx_start() {
MysqlxPluginContext& ctx = mysqlx_context();
// Open listener sockets for active routes if an admin DB is available.
if (ctx.services != nullptr && ctx.services->get_admindb != nullptr) {
SQLite3DB* admindb = ctx.services->get_admindb();
if (admindb != nullptr) {
mysqlx_start_listeners_from_runtime_routes(*admindb);
}
}
ctx.started = true;
return true;
}
bool mysqlx_stop() {
mysqlx_stop_listeners();
MysqlxPluginContext& ctx = mysqlx_context();
ctx.started = false;
return true;

@ -0,0 +1,264 @@
#include "mysqlx_worker.h"
#include "sqlite3db.h"
#include <arpa/inet.h>
#include <cerrno>
#include <cstring>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
namespace {
// Global listener state, managed by mysqlx_start/stop_listeners.
std::vector<MysqlxListenerHandle> g_listeners {};
std::vector<std::unique_ptr<MysqlxWorker>> g_workers {};
std::atomic<bool> g_accept_running { false };
std::thread g_accept_thread {};
uint32_t g_rr_index { 0 };
bool parse_bind(const std::string& bind, std::string& host, uint16_t& port) {
auto pos = bind.rfind(':');
if (pos == std::string::npos || pos == 0 || pos == bind.size() - 1) {
return false;
}
host = bind.substr(0, pos);
int p = std::atoi(bind.substr(pos + 1).c_str());
if (p <= 0 || p > 65535) {
return false;
}
port = static_cast<uint16_t>(p);
return true;
}
int create_listener_socket(const std::string& host, uint16_t port) {
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
return -1;
}
int opt = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
sockaddr_in addr {};
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
if (inet_pton(AF_INET, host.c_str(), &addr.sin_addr) != 1) {
close(fd);
return -1;
}
if (bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0) {
close(fd);
return -1;
}
if (listen(fd, 128) < 0) {
close(fd);
return -1;
}
return fd;
}
void accept_loop() {
while (g_accept_running.load()) {
// Simple poll across all listeners with a timeout so we can
// check g_accept_running periodically.
fd_set rfds;
FD_ZERO(&rfds);
int maxfd = -1;
for (const auto& listener : g_listeners) {
if (listener.fd >= 0) {
FD_SET(listener.fd, &rfds);
if (listener.fd > maxfd) {
maxfd = listener.fd;
}
}
}
if (maxfd < 0) {
// No valid listeners; sleep briefly and retry.
struct timespec ts { 0, 100000000 }; // 100ms
nanosleep(&ts, nullptr);
continue;
}
struct timeval tv { 0, 200000 }; // 200ms timeout
int ready = select(maxfd + 1, &rfds, nullptr, nullptr, &tv);
if (ready <= 0) {
continue;
}
for (const auto& listener : g_listeners) {
if (listener.fd >= 0 && FD_ISSET(listener.fd, &rfds)) {
sockaddr_in client_addr {};
socklen_t client_len = sizeof(client_addr);
int client_fd = accept(listener.fd, reinterpret_cast<sockaddr*>(&client_addr), &client_len);
if (client_fd < 0) {
continue;
}
// Round-robin dispatch to workers.
if (!g_workers.empty()) {
uint32_t idx = g_rr_index++ % static_cast<uint32_t>(g_workers.size());
g_workers[idx]->enqueue_client_fd(client_fd);
} else {
close(client_fd);
}
}
}
}
}
} // namespace
// ---------------------------------------------------------------------------
// MysqlxWorker
// ---------------------------------------------------------------------------
MysqlxWorker::MysqlxWorker(uint32_t worker_id) : worker_id_(worker_id) {}
MysqlxWorker::~MysqlxWorker() {
stop();
}
void MysqlxWorker::start() {
running_.store(true);
thread_ = std::thread(&MysqlxWorker::run, this);
}
void MysqlxWorker::stop() {
running_.store(false);
if (thread_.joinable()) {
thread_.join();
}
}
bool MysqlxWorker::is_running() const {
return running_.load();
}
void MysqlxWorker::enqueue_client_fd(int fd) {
std::lock_guard<std::mutex> lock(queue_mutex_);
pending_fds_.push_back(fd);
}
void MysqlxWorker::run() {
while (running_.load()) {
std::vector<int> fds {};
{
std::lock_guard<std::mutex> lock(queue_mutex_);
fds.swap(pending_fds_);
}
for (int fd : fds) {
// Phase 1 MVP: accept connection, then close it.
// Tasks 7-8 will add X Protocol handshake and backend relay here.
close(fd);
}
if (fds.empty()) {
struct timespec ts { 0, 50000000 }; // 50ms
nanosleep(&ts, nullptr);
}
}
}
// ---------------------------------------------------------------------------
// Listener management
// ---------------------------------------------------------------------------
bool mysqlx_start_listeners_from_runtime_routes(SQLite3DB& admindb) {
// Read active routes from runtime_mysqlx_routes.
char* error = nullptr;
std::unique_ptr<SQLite3_result> result(
admindb.execute_statement(
"SELECT name, bind FROM runtime_mysqlx_routes WHERE active=1",
&error
)
);
if (error != nullptr) {
free(error);
return false;
}
if (!result || result->rows.empty()) {
// No active routes — not an error, just nothing to listen on.
return true;
}
// Open listeners.
for (auto* row : result->rows) {
if (row == nullptr || row->fields[0] == nullptr || row->fields[1] == nullptr) {
continue;
}
std::string route_name = row->fields[0];
std::string bind_str = row->fields[1];
std::string host {};
uint16_t port = 0;
if (!parse_bind(bind_str, host, port)) {
continue;
}
int fd = create_listener_socket(host, port);
if (fd < 0) {
continue;
}
MysqlxListenerHandle handle {};
handle.fd = fd;
handle.bind_address = host;
handle.port = port;
handle.route_name = route_name;
g_listeners.push_back(std::move(handle));
}
if (g_listeners.empty()) {
return true;
}
// Start one worker thread (MVP; scale later).
g_workers.push_back(std::make_unique<MysqlxWorker>(0));
g_workers.back()->start();
// Start the accept thread.
g_accept_running.store(true);
g_accept_thread = std::thread(accept_loop);
return true;
}
void mysqlx_stop_listeners() {
// Signal the accept loop to stop.
g_accept_running.store(false);
if (g_accept_thread.joinable()) {
g_accept_thread.join();
}
// Stop all workers.
for (auto& worker : g_workers) {
worker->stop();
}
g_workers.clear();
// Close all listener sockets.
for (auto& listener : g_listeners) {
if (listener.fd >= 0) {
close(listener.fd);
listener.fd = -1;
}
}
g_listeners.clear();
g_rr_index = 0;
}
size_t mysqlx_listener_count() {
return g_listeners.size();
}

@ -0,0 +1,103 @@
/**
* test_mysqlx_listener_smoke-t.cpp
*
* Verify that the mysqlx plugin opens a TCP listener when started with
* an active route, and that a plain TCP connection succeeds.
*
* This is a hybrid unit test: it links against libproxysql.a and loads
* the mysqlx plugin .so without a running ProxySQL daemon.
*/
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include "ProxySQL_PluginManager.h"
#include "sqlite3db.h"
#include "tap.h"
#ifndef PROXYSQL_MYSQLX_PLUGIN_PATH
#error "PROXYSQL_MYSQLX_PLUGIN_PATH must be defined at compile time"
#endif
// Pick a high ephemeral port unlikely to collide.
static const uint16_t TEST_MYSQLX_PORT = 16603;
int main() {
plan(8);
// Step 1: Load the plugin.
ProxySQL_PluginManager mgr;
std::string err {};
ok(mgr.load(PROXYSQL_MYSQLX_PLUGIN_PATH, err),
"load mysqlx plugin succeeds: %s", err.c_str());
// Step 2: Create in-memory admin DB and register plugin schema.
SQLite3DB admindb;
int dbrc = admindb.open(const_cast<char*>(":memory:"), SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE);
ok(dbrc == 0, "admin sqlite opens");
// Wire the admin DB into plugin services so init and start can use it.
ok(mgr.init_all(err), "init_all registers schema: %s", err.c_str());
// Step 3: Create the runtime_mysqlx_routes table and seed a route.
admindb.execute(
"CREATE TABLE IF NOT EXISTS runtime_mysqlx_routes ("
" name VARCHAR NOT NULL PRIMARY KEY,"
" bind VARCHAR NOT NULL,"
" destination_hostgroup INT NOT NULL,"
" fallback_hostgroup INT,"
" strategy VARCHAR NOT NULL DEFAULT 'first_available',"
" active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1,"
" attributes VARCHAR NOT NULL DEFAULT '',"
" comment VARCHAR NOT NULL DEFAULT ''"
" )"
);
char insert_sql[256];
snprintf(insert_sql, sizeof(insert_sql),
"INSERT INTO runtime_mysqlx_routes (name, bind, destination_hostgroup, strategy, active) "
"VALUES ('smoke_route', '127.0.0.1:%u', 0, 'first_available', 1)",
TEST_MYSQLX_PORT);
ok(admindb.execute(insert_sql), "seed route row");
// Step 4: Start listeners from the runtime routes table.
// We call the worker API directly rather than through plugin start,
// because plugin start needs the service wiring which is complex
// in a unit test. This tests the listener code path directly.
{
// Include the worker header via the plugin header.
extern bool mysqlx_start_listeners_from_runtime_routes(SQLite3DB& db);
extern size_t mysqlx_listener_count();
ok(mysqlx_start_listeners_from_runtime_routes(admindb),
"start_listeners_from_runtime_routes succeeds");
ok(mysqlx_listener_count() == 1, "one listener opened");
}
// Step 5: TCP connect to the listener port.
// Give the accept thread a moment to start.
usleep(100000); // 100ms
int fd = socket(AF_INET, SOCK_STREAM, 0);
ok(fd >= 0, "client socket created");
sockaddr_in addr {};
addr.sin_family = AF_INET;
addr.sin_port = htons(TEST_MYSQLX_PORT);
inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
int rc = connect(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
ok(rc == 0, "TCP connect to mysqlx listener on port %u succeeds", TEST_MYSQLX_PORT);
close(fd);
// Step 6: Clean up.
{
extern void mysqlx_stop_listeners();
mysqlx_stop_listeners();
}
return exit_status();
}

@ -320,7 +320,8 @@ UNIT_TESTS := smoke_test-t query_cache_unit-t query_processor_unit-t \
plugin_registry_unit-t \
test_mysqlx_plugin_load-t \
mysqlx_config_store_unit-t \
test_mysqlx_admin_tables-t
test_mysqlx_admin_tables-t \
test_mysqlx_listener_smoke-t
.PHONY: all
all: $(UNIT_TESTS)
@ -368,6 +369,13 @@ mysqlx_config_store_unit-t: mysqlx_config_store_unit-t.cpp $(PROXYSQL_PATH)/plug
$(LIBPROXYSQLAR_FULL) $(STATIC_LIBS) $(MYLIBS) \
$(ALLOW_MULTI_DEF) -o $@
test_mysqlx_listener_smoke-t: ../test_mysqlx_listener_smoke-t.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_worker.cpp $(TEST_HELPERS_OBJ) $(LIBPROXYSQLAR) mysqlx_plugin_build
$(CXX) ../test_mysqlx_listener_smoke-t.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_worker.cpp $(TEST_HELPERS_OBJ) \
-DPROXYSQL_MYSQLX_PLUGIN_PATH=\"$(MYSQLX_PLUGIN_SO)\" \
-I$(PROXYSQL_PATH)/plugins/mysqlx/include \
$(IDIRS) $(LDIRS) $(OPT) $(LIBPROXYSQLAR_FULL) $(STATIC_LIBS) \
$(MYLIBS) -ldl -lpthread $(ALLOW_MULTI_DEF) -o $@
test_mysqlx_admin_tables-t: ../test_mysqlx_admin_tables-t.cpp $(ODIR)/tap.o $(ODIR)/test_globals.o $(ODIR)/test_init.o $(LIBPROXYSQLAR) mysqlx_plugin_build
$(CXX) ../test_mysqlx_admin_tables-t.cpp $(ODIR)/tap.o $(ODIR)/test_globals.o $(ODIR)/test_init.o \
-DPROXYSQL_MYSQLX_PLUGIN_PATH=\"$(MYSQLX_PLUGIN_SO)\" \

Loading…
Cancel
Save