diff --git a/plugins/mysqlx/Makefile b/plugins/mysqlx/Makefile index 90495bba3..36374abbb 100644 --- a/plugins/mysqlx/Makefile +++ b/plugins/mysqlx/Makefile @@ -18,7 +18,8 @@ CXXFLAGS := $(STDCPP) -fPIC $(OPTZ) $(WGCOV) $(WASAN) SRCS := $(PLUGIN_DIR)/src/mysqlx_plugin.cpp \ $(PLUGIN_DIR)/src/mysqlx_admin_schema.cpp \ - $(PLUGIN_DIR)/src/mysqlx_config_store.cpp + $(PLUGIN_DIR)/src/mysqlx_config_store.cpp \ + $(PLUGIN_DIR)/src/mysqlx_worker.cpp HEADERS := $(wildcard $(PLUGIN_DIR)/include/*.h) \ $(PROXYSQL_PATH)/include/ProxySQL_Plugin.h OBJS := $(patsubst $(PLUGIN_DIR)/src/%.cpp,$(ODIR)/%.o,$(SRCS)) diff --git a/plugins/mysqlx/include/mysqlx_plugin.h b/plugins/mysqlx/include/mysqlx_plugin.h index 09eff3e3a..dfdeeb14a 100644 --- a/plugins/mysqlx/include/mysqlx_plugin.h +++ b/plugins/mysqlx/include/mysqlx_plugin.h @@ -4,6 +4,7 @@ #include "ProxySQL_Plugin.h" #include "mysqlx_admin_schema.h" #include "mysqlx_config_store.h" +#include "mysqlx_worker.h" #include diff --git a/plugins/mysqlx/include/mysqlx_worker.h b/plugins/mysqlx/include/mysqlx_worker.h new file mode 100644 index 000000000..a257fc2de --- /dev/null +++ b/plugins/mysqlx/include/mysqlx_worker.h @@ -0,0 +1,48 @@ +#ifndef PROXYSQL_MYSQLX_WORKER_H +#define PROXYSQL_MYSQLX_WORKER_H + +#include +#include +#include +#include +#include +#include + +struct MysqlxListenerHandle { + int fd { -1 }; + std::string bind_address {}; + uint16_t port { 0 }; + std::string route_name {}; +}; + +class MysqlxWorker { +public: + explicit MysqlxWorker(uint32_t worker_id); + ~MysqlxWorker(); + + MysqlxWorker(const MysqlxWorker&) = delete; + MysqlxWorker& operator=(const MysqlxWorker&) = delete; + + void start(); + void stop(); + bool is_running() const; + void enqueue_client_fd(int fd); + +private: + void run(); + + uint32_t worker_id_; + std::atomic running_ { false }; + std::thread thread_ {}; + std::mutex queue_mutex_ {}; + std::vector pending_fds_ {}; +}; + +// Listener management — called from plugin start/stop +bool mysqlx_start_listeners_from_runtime_routes(class SQLite3DB& admindb); +void mysqlx_stop_listeners(); + +// Returns the number of active listeners (for testing) +size_t mysqlx_listener_count(); + +#endif /* PROXYSQL_MYSQLX_WORKER_H */ diff --git a/plugins/mysqlx/src/mysqlx_plugin.cpp b/plugins/mysqlx/src/mysqlx_plugin.cpp index 0489bf5c8..06f0e2416 100644 --- a/plugins/mysqlx/src/mysqlx_plugin.cpp +++ b/plugins/mysqlx/src/mysqlx_plugin.cpp @@ -16,11 +16,21 @@ bool mysqlx_init(ProxySQL_PluginServices* services) { bool mysqlx_start() { MysqlxPluginContext& ctx = mysqlx_context(); + + // Open listener sockets for active routes if an admin DB is available. + if (ctx.services != nullptr && ctx.services->get_admindb != nullptr) { + SQLite3DB* admindb = ctx.services->get_admindb(); + if (admindb != nullptr) { + mysqlx_start_listeners_from_runtime_routes(*admindb); + } + } + ctx.started = true; return true; } bool mysqlx_stop() { + mysqlx_stop_listeners(); MysqlxPluginContext& ctx = mysqlx_context(); ctx.started = false; return true; diff --git a/plugins/mysqlx/src/mysqlx_worker.cpp b/plugins/mysqlx/src/mysqlx_worker.cpp new file mode 100644 index 000000000..5a0d22e10 --- /dev/null +++ b/plugins/mysqlx/src/mysqlx_worker.cpp @@ -0,0 +1,264 @@ +#include "mysqlx_worker.h" + +#include "sqlite3db.h" + +#include +#include +#include +#include +#include +#include + +namespace { + +// Global listener state, managed by mysqlx_start/stop_listeners. +std::vector g_listeners {}; +std::vector> g_workers {}; +std::atomic g_accept_running { false }; +std::thread g_accept_thread {}; +uint32_t g_rr_index { 0 }; + +bool parse_bind(const std::string& bind, std::string& host, uint16_t& port) { + auto pos = bind.rfind(':'); + if (pos == std::string::npos || pos == 0 || pos == bind.size() - 1) { + return false; + } + + host = bind.substr(0, pos); + int p = std::atoi(bind.substr(pos + 1).c_str()); + if (p <= 0 || p > 65535) { + return false; + } + + port = static_cast(p); + return true; +} + +int create_listener_socket(const std::string& host, uint16_t port) { + int fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) { + return -1; + } + + int opt = 1; + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); + + sockaddr_in addr {}; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + + if (inet_pton(AF_INET, host.c_str(), &addr.sin_addr) != 1) { + close(fd); + return -1; + } + + if (bind(fd, reinterpret_cast(&addr), sizeof(addr)) < 0) { + close(fd); + return -1; + } + + if (listen(fd, 128) < 0) { + close(fd); + return -1; + } + + return fd; +} + +void accept_loop() { + while (g_accept_running.load()) { + // Simple poll across all listeners with a timeout so we can + // check g_accept_running periodically. + fd_set rfds; + FD_ZERO(&rfds); + int maxfd = -1; + + for (const auto& listener : g_listeners) { + if (listener.fd >= 0) { + FD_SET(listener.fd, &rfds); + if (listener.fd > maxfd) { + maxfd = listener.fd; + } + } + } + + if (maxfd < 0) { + // No valid listeners; sleep briefly and retry. + struct timespec ts { 0, 100000000 }; // 100ms + nanosleep(&ts, nullptr); + continue; + } + + struct timeval tv { 0, 200000 }; // 200ms timeout + int ready = select(maxfd + 1, &rfds, nullptr, nullptr, &tv); + if (ready <= 0) { + continue; + } + + for (const auto& listener : g_listeners) { + if (listener.fd >= 0 && FD_ISSET(listener.fd, &rfds)) { + sockaddr_in client_addr {}; + socklen_t client_len = sizeof(client_addr); + int client_fd = accept(listener.fd, reinterpret_cast(&client_addr), &client_len); + if (client_fd < 0) { + continue; + } + + // Round-robin dispatch to workers. + if (!g_workers.empty()) { + uint32_t idx = g_rr_index++ % static_cast(g_workers.size()); + g_workers[idx]->enqueue_client_fd(client_fd); + } else { + close(client_fd); + } + } + } + } +} + +} // namespace + +// --------------------------------------------------------------------------- +// MysqlxWorker +// --------------------------------------------------------------------------- + +MysqlxWorker::MysqlxWorker(uint32_t worker_id) : worker_id_(worker_id) {} + +MysqlxWorker::~MysqlxWorker() { + stop(); +} + +void MysqlxWorker::start() { + running_.store(true); + thread_ = std::thread(&MysqlxWorker::run, this); +} + +void MysqlxWorker::stop() { + running_.store(false); + if (thread_.joinable()) { + thread_.join(); + } +} + +bool MysqlxWorker::is_running() const { + return running_.load(); +} + +void MysqlxWorker::enqueue_client_fd(int fd) { + std::lock_guard lock(queue_mutex_); + pending_fds_.push_back(fd); +} + +void MysqlxWorker::run() { + while (running_.load()) { + std::vector fds {}; + { + std::lock_guard lock(queue_mutex_); + fds.swap(pending_fds_); + } + + for (int fd : fds) { + // Phase 1 MVP: accept connection, then close it. + // Tasks 7-8 will add X Protocol handshake and backend relay here. + close(fd); + } + + if (fds.empty()) { + struct timespec ts { 0, 50000000 }; // 50ms + nanosleep(&ts, nullptr); + } + } +} + +// --------------------------------------------------------------------------- +// Listener management +// --------------------------------------------------------------------------- + +bool mysqlx_start_listeners_from_runtime_routes(SQLite3DB& admindb) { + // Read active routes from runtime_mysqlx_routes. + char* error = nullptr; + std::unique_ptr result( + admindb.execute_statement( + "SELECT name, bind FROM runtime_mysqlx_routes WHERE active=1", + &error + ) + ); + if (error != nullptr) { + free(error); + return false; + } + if (!result || result->rows.empty()) { + // No active routes — not an error, just nothing to listen on. + return true; + } + + // Open listeners. + for (auto* row : result->rows) { + if (row == nullptr || row->fields[0] == nullptr || row->fields[1] == nullptr) { + continue; + } + + std::string route_name = row->fields[0]; + std::string bind_str = row->fields[1]; + std::string host {}; + uint16_t port = 0; + + if (!parse_bind(bind_str, host, port)) { + continue; + } + + int fd = create_listener_socket(host, port); + if (fd < 0) { + continue; + } + + MysqlxListenerHandle handle {}; + handle.fd = fd; + handle.bind_address = host; + handle.port = port; + handle.route_name = route_name; + g_listeners.push_back(std::move(handle)); + } + + if (g_listeners.empty()) { + return true; + } + + // Start one worker thread (MVP; scale later). + g_workers.push_back(std::make_unique(0)); + g_workers.back()->start(); + + // Start the accept thread. + g_accept_running.store(true); + g_accept_thread = std::thread(accept_loop); + + return true; +} + +void mysqlx_stop_listeners() { + // Signal the accept loop to stop. + g_accept_running.store(false); + if (g_accept_thread.joinable()) { + g_accept_thread.join(); + } + + // Stop all workers. + for (auto& worker : g_workers) { + worker->stop(); + } + g_workers.clear(); + + // Close all listener sockets. + for (auto& listener : g_listeners) { + if (listener.fd >= 0) { + close(listener.fd); + listener.fd = -1; + } + } + g_listeners.clear(); + g_rr_index = 0; +} + +size_t mysqlx_listener_count() { + return g_listeners.size(); +} diff --git a/test/tap/tests/test_mysqlx_listener_smoke-t.cpp b/test/tap/tests/test_mysqlx_listener_smoke-t.cpp new file mode 100644 index 000000000..bd573c754 --- /dev/null +++ b/test/tap/tests/test_mysqlx_listener_smoke-t.cpp @@ -0,0 +1,103 @@ +/** + * test_mysqlx_listener_smoke-t.cpp + * + * Verify that the mysqlx plugin opens a TCP listener when started with + * an active route, and that a plain TCP connection succeeds. + * + * This is a hybrid unit test: it links against libproxysql.a and loads + * the mysqlx plugin .so without a running ProxySQL daemon. + */ + +#include +#include +#include +#include + +#include "ProxySQL_PluginManager.h" +#include "sqlite3db.h" +#include "tap.h" + +#ifndef PROXYSQL_MYSQLX_PLUGIN_PATH +#error "PROXYSQL_MYSQLX_PLUGIN_PATH must be defined at compile time" +#endif + +// Pick a high ephemeral port unlikely to collide. +static const uint16_t TEST_MYSQLX_PORT = 16603; + +int main() { + plan(8); + + // Step 1: Load the plugin. + ProxySQL_PluginManager mgr; + std::string err {}; + + ok(mgr.load(PROXYSQL_MYSQLX_PLUGIN_PATH, err), + "load mysqlx plugin succeeds: %s", err.c_str()); + + // Step 2: Create in-memory admin DB and register plugin schema. + SQLite3DB admindb; + int dbrc = admindb.open(const_cast(":memory:"), SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE); + ok(dbrc == 0, "admin sqlite opens"); + + // Wire the admin DB into plugin services so init and start can use it. + ok(mgr.init_all(err), "init_all registers schema: %s", err.c_str()); + + // Step 3: Create the runtime_mysqlx_routes table and seed a route. + admindb.execute( + "CREATE TABLE IF NOT EXISTS runtime_mysqlx_routes (" + " name VARCHAR NOT NULL PRIMARY KEY," + " bind VARCHAR NOT NULL," + " destination_hostgroup INT NOT NULL," + " fallback_hostgroup INT," + " strategy VARCHAR NOT NULL DEFAULT 'first_available'," + " active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1," + " attributes VARCHAR NOT NULL DEFAULT ''," + " comment VARCHAR NOT NULL DEFAULT ''" + " )" + ); + + char insert_sql[256]; + snprintf(insert_sql, sizeof(insert_sql), + "INSERT INTO runtime_mysqlx_routes (name, bind, destination_hostgroup, strategy, active) " + "VALUES ('smoke_route', '127.0.0.1:%u', 0, 'first_available', 1)", + TEST_MYSQLX_PORT); + ok(admindb.execute(insert_sql), "seed route row"); + + // Step 4: Start listeners from the runtime routes table. + // We call the worker API directly rather than through plugin start, + // because plugin start needs the service wiring which is complex + // in a unit test. This tests the listener code path directly. + { + // Include the worker header via the plugin header. + extern bool mysqlx_start_listeners_from_runtime_routes(SQLite3DB& db); + extern size_t mysqlx_listener_count(); + + ok(mysqlx_start_listeners_from_runtime_routes(admindb), + "start_listeners_from_runtime_routes succeeds"); + ok(mysqlx_listener_count() == 1, "one listener opened"); + } + + // Step 5: TCP connect to the listener port. + // Give the accept thread a moment to start. + usleep(100000); // 100ms + + int fd = socket(AF_INET, SOCK_STREAM, 0); + ok(fd >= 0, "client socket created"); + + sockaddr_in addr {}; + addr.sin_family = AF_INET; + addr.sin_port = htons(TEST_MYSQLX_PORT); + inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr); + + int rc = connect(fd, reinterpret_cast(&addr), sizeof(addr)); + ok(rc == 0, "TCP connect to mysqlx listener on port %u succeeds", TEST_MYSQLX_PORT); + close(fd); + + // Step 6: Clean up. + { + extern void mysqlx_stop_listeners(); + mysqlx_stop_listeners(); + } + + return exit_status(); +} diff --git a/test/tap/tests/unit/Makefile b/test/tap/tests/unit/Makefile index a1fd0f090..65216da6e 100644 --- a/test/tap/tests/unit/Makefile +++ b/test/tap/tests/unit/Makefile @@ -320,7 +320,8 @@ UNIT_TESTS := smoke_test-t query_cache_unit-t query_processor_unit-t \ plugin_registry_unit-t \ test_mysqlx_plugin_load-t \ mysqlx_config_store_unit-t \ - test_mysqlx_admin_tables-t + test_mysqlx_admin_tables-t \ + test_mysqlx_listener_smoke-t .PHONY: all all: $(UNIT_TESTS) @@ -368,6 +369,13 @@ mysqlx_config_store_unit-t: mysqlx_config_store_unit-t.cpp $(PROXYSQL_PATH)/plug $(LIBPROXYSQLAR_FULL) $(STATIC_LIBS) $(MYLIBS) \ $(ALLOW_MULTI_DEF) -o $@ +test_mysqlx_listener_smoke-t: ../test_mysqlx_listener_smoke-t.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_worker.cpp $(TEST_HELPERS_OBJ) $(LIBPROXYSQLAR) mysqlx_plugin_build + $(CXX) ../test_mysqlx_listener_smoke-t.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_worker.cpp $(TEST_HELPERS_OBJ) \ + -DPROXYSQL_MYSQLX_PLUGIN_PATH=\"$(MYSQLX_PLUGIN_SO)\" \ + -I$(PROXYSQL_PATH)/plugins/mysqlx/include \ + $(IDIRS) $(LDIRS) $(OPT) $(LIBPROXYSQLAR_FULL) $(STATIC_LIBS) \ + $(MYLIBS) -ldl -lpthread $(ALLOW_MULTI_DEF) -o $@ + test_mysqlx_admin_tables-t: ../test_mysqlx_admin_tables-t.cpp $(ODIR)/tap.o $(ODIR)/test_globals.o $(ODIR)/test_init.o $(LIBPROXYSQLAR) mysqlx_plugin_build $(CXX) ../test_mysqlx_admin_tables-t.cpp $(ODIR)/tap.o $(ODIR)/test_globals.o $(ODIR)/test_init.o \ -DPROXYSQL_MYSQLX_PLUGIN_PATH=\"$(MYSQLX_PLUGIN_SO)\" \