diff --git a/plugins/mysqlx/Makefile b/plugins/mysqlx/Makefile index 5a95f044e..a9140caeb 100644 --- a/plugins/mysqlx/Makefile +++ b/plugins/mysqlx/Makefile @@ -31,7 +31,6 @@ CXXFLAGS := $(STDCPP) -fPIC $(OPTZ) $(WGCOV) $(WASAN) -pthread SRCS := $(PLUGIN_DIR)/src/mysqlx_plugin.cpp \ $(PLUGIN_DIR)/src/mysqlx_admin_schema.cpp \ $(PLUGIN_DIR)/src/mysqlx_config_store.cpp \ - $(PLUGIN_DIR)/src/mysqlx_worker.cpp \ $(PLUGIN_DIR)/src/mysqlx_protocol.cpp \ $(PLUGIN_DIR)/src/mysqlx_frontend_session.cpp \ $(PLUGIN_DIR)/src/mysqlx_backend_session.cpp \ diff --git a/plugins/mysqlx/include/mysqlx_worker.h b/plugins/mysqlx/include/mysqlx_worker.h deleted file mode 100644 index a257fc2de..000000000 --- a/plugins/mysqlx/include/mysqlx_worker.h +++ /dev/null @@ -1,48 +0,0 @@ -#ifndef PROXYSQL_MYSQLX_WORKER_H -#define PROXYSQL_MYSQLX_WORKER_H - -#include -#include -#include -#include -#include -#include - -struct MysqlxListenerHandle { - int fd { -1 }; - std::string bind_address {}; - uint16_t port { 0 }; - std::string route_name {}; -}; - -class MysqlxWorker { -public: - explicit MysqlxWorker(uint32_t worker_id); - ~MysqlxWorker(); - - MysqlxWorker(const MysqlxWorker&) = delete; - MysqlxWorker& operator=(const MysqlxWorker&) = delete; - - void start(); - void stop(); - bool is_running() const; - void enqueue_client_fd(int fd); - -private: - void run(); - - uint32_t worker_id_; - std::atomic running_ { false }; - std::thread thread_ {}; - std::mutex queue_mutex_ {}; - std::vector pending_fds_ {}; -}; - -// Listener management — called from plugin start/stop -bool mysqlx_start_listeners_from_runtime_routes(class SQLite3DB& admindb); -void mysqlx_stop_listeners(); - -// Returns the number of active listeners (for testing) -size_t mysqlx_listener_count(); - -#endif /* PROXYSQL_MYSQLX_WORKER_H */ diff --git a/plugins/mysqlx/src/mysqlx_worker.cpp b/plugins/mysqlx/src/mysqlx_worker.cpp deleted file mode 100644 index 080707319..000000000 --- a/plugins/mysqlx/src/mysqlx_worker.cpp +++ /dev/null @@ -1,306 +0,0 @@ -#include "mysqlx_worker.h" - -#include "mysqlx_backend_session.h" -#include "mysqlx_frontend_session.h" -#include "mysqlx_plugin.h" -#include "mysqlx_stats.h" -#include "sqlite3db.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace { - -// Global listener state, managed by mysqlx_start/stop_listeners. -std::vector g_listeners {}; -std::vector> g_workers {}; -std::atomic g_accept_running { false }; -std::thread g_accept_thread {}; -std::atomic g_rr_index { 0 }; - -bool parse_bind(const std::string& bind, std::string& host, uint16_t& port) { - if (!bind.empty() && bind[0] == '[') { - auto closing = bind.find(']'); - if (closing != std::string::npos) { - host = bind.substr(1, closing - 1); - port = 33060; - if (closing + 1 < bind.size() && bind[closing + 1] == ':') { - port = static_cast(std::atoi(bind.substr(closing + 2).c_str())); - } - return true; - } - } - auto pos = bind.rfind(':'); - if (pos == std::string::npos || pos == 0 || pos == bind.size() - 1) { - return false; - } - - host = bind.substr(0, pos); - int p = std::atoi(bind.substr(pos + 1).c_str()); - if (p <= 0 || p > 65535) { - return false; - } - - port = static_cast(p); - return true; -} - -int create_listener_socket(const std::string& host, uint16_t port) { - std::string port_str = std::to_string(port); - struct addrinfo hints {}, *result = nullptr; - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_PASSIVE; - - if (getaddrinfo(host.empty() ? nullptr : host.c_str(), port_str.c_str(), &hints, &result) != 0) { - return -1; - } - - int fd = -1; - for (struct addrinfo* rp = result; rp != nullptr; rp = rp->ai_next) { - fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); - if (fd < 0) continue; - - int opt = 1; - setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); - - int nodelay = 1; - setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay)); - - if (bind(fd, rp->ai_addr, rp->ai_addrlen) == 0 && listen(fd, 128) == 0) { - break; - } - close(fd); - fd = -1; - } - freeaddrinfo(result); - - return fd; -} - -void accept_loop() { - while (g_accept_running.load()) { - // Build pollfd array from listeners. - std::vector pfds {}; - for (const auto& listener : g_listeners) { - if (listener.fd >= 0) { - pfds.push_back({ listener.fd, POLLIN, 0 }); - } - } - - if (pfds.empty()) { - struct timespec ts { 0, 100000000 }; // 100ms - nanosleep(&ts, nullptr); - continue; - } - - int ready = poll(pfds.data(), static_cast(pfds.size()), 200 /*ms*/); - if (ready <= 0) { - continue; - } - - for (const auto& pfd : pfds) { - if (pfd.revents & POLLIN) { - int client_fd = accept(pfd.fd, nullptr, nullptr); - if (client_fd < 0) { - continue; - } - - int nodelay = 1; - setsockopt(client_fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay)); - - if (!g_workers.empty()) { - uint32_t idx = g_rr_index.fetch_add(1, std::memory_order_relaxed) - % static_cast(g_workers.size()); - g_workers[idx]->enqueue_client_fd(client_fd); - } else { - close(client_fd); - } - } - } - } -} - -} // namespace - -// --------------------------------------------------------------------------- -// MysqlxWorker -// --------------------------------------------------------------------------- - -MysqlxWorker::MysqlxWorker(uint32_t worker_id) : worker_id_(worker_id) {} - -MysqlxWorker::~MysqlxWorker() { - stop(); -} - -void MysqlxWorker::start() { - running_.store(true); - thread_ = std::thread(&MysqlxWorker::run, this); -} - -void MysqlxWorker::stop() { - running_.store(false); - if (thread_.joinable()) { - thread_.join(); - } -} - -bool MysqlxWorker::is_running() const { - return running_.load(); -} - -void MysqlxWorker::enqueue_client_fd(int fd) { - std::lock_guard lock(queue_mutex_); - pending_fds_.push_back(fd); -} - -void MysqlxWorker::run() { - while (running_.load()) { - std::vector fds {}; - { - std::lock_guard lock(queue_mutex_); - fds.swap(pending_fds_); - } - - for (int fd : fds) { - MysqlxFrontendSession session(fd); - MysqlxPluginContext& ctx = mysqlx_context(); - - if (ctx.config_store && session.run_handshake_and_auth(*ctx.config_store)) { - // Capture topology generation at session bind time. - // A future GR observer only needs to call bump_topology_generation() - // to trigger re-evaluation on the next session or reconnect. - uint64_t bound_generation = ctx.config_store->topology_generation(); - (void)bound_generation; // Phase 2 seam: compare on reconnect/reroute. - - const MysqlxResolvedIdentity& identity = session.identity(); - MysqlxBackendEndpoint endpoint = ctx.config_store->pick_endpoint(identity.default_route); - - if (endpoint.hostname.empty()) { - mysqlx_send_error(fd, 4000, "No backend endpoint available for route"); - int hg = ctx.config_store->route_hostgroup(identity.default_route); - mysqlx_stats().record_conn_err(identity.default_route, hg); - } else { - MysqlxBackendSession backend; - std::string err {}; - int hg = ctx.config_store->route_hostgroup(identity.default_route); - if (backend.connect(identity, endpoint, err)) { - mysqlx_stats().record_conn_ok(identity.default_route, hg); - mysqlx_stats().record_conn_used(identity.default_route, hg); - backend.relay(fd); - } else { - mysqlx_send_error(fd, 4001, "Backend connection failed: " + err); - mysqlx_stats().record_conn_err(identity.default_route, hg); - } - } - } - close(fd); - } - - if (fds.empty()) { - struct timespec ts { 0, 50000000 }; // 50ms - nanosleep(&ts, nullptr); - } - } -} - -// --------------------------------------------------------------------------- -// Listener management -// --------------------------------------------------------------------------- - -bool mysqlx_start_listeners_from_runtime_routes(SQLite3DB& admindb) { - // Read active routes from runtime_mysqlx_routes. - char* error = nullptr; - std::unique_ptr result( - admindb.execute_statement( - "SELECT name, bind FROM runtime_mysqlx_routes WHERE active=1", - &error - ) - ); - if (error != nullptr) { - free(error); - return false; - } - if (!result || result->rows.empty()) { - // No active routes — not an error, just nothing to listen on. - return true; - } - - // Open listeners. - for (auto* row : result->rows) { - if (row == nullptr || row->fields[0] == nullptr || row->fields[1] == nullptr) { - continue; - } - - std::string route_name = row->fields[0]; - std::string bind_str = row->fields[1]; - std::string host {}; - uint16_t port = 0; - - if (!parse_bind(bind_str, host, port)) { - continue; - } - - int fd = create_listener_socket(host, port); - if (fd < 0) { - continue; - } - - MysqlxListenerHandle handle {}; - handle.fd = fd; - handle.bind_address = host; - handle.port = port; - handle.route_name = route_name; - g_listeners.push_back(std::move(handle)); - } - - if (g_listeners.empty()) { - return true; - } - - // Start one worker thread (MVP; scale later). - g_workers.push_back(std::make_unique(0)); - g_workers.back()->start(); - - // Start the accept thread. - g_accept_running.store(true); - g_accept_thread = std::thread(accept_loop); - - return true; -} - -void mysqlx_stop_listeners() { - // Signal the accept loop to stop. - g_accept_running.store(false); - if (g_accept_thread.joinable()) { - g_accept_thread.join(); - } - - // Stop all workers. - for (auto& worker : g_workers) { - worker->stop(); - } - g_workers.clear(); - - // Close all listener sockets. - for (auto& listener : g_listeners) { - if (listener.fd >= 0) { - close(listener.fd); - listener.fd = -1; - } - } - g_listeners.clear(); - g_rr_index = 0; -} - -size_t mysqlx_listener_count() { - return g_listeners.size(); -} diff --git a/test/tap/groups/groups.json b/test/tap/groups/groups.json index 8befa5be2..2dca0db64 100644 --- a/test/tap/groups/groups.json +++ b/test/tap/groups/groups.json @@ -341,7 +341,6 @@ "test_mysql_connect_retries-t" : [ "legacy-g3","mysql84-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ], "test_mysql_connect_retries_delay-t" : [ "legacy-g3","mysql84-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ], "test_mysqlx_admin_tables-t" : [ "unit-tests-g1" ], - "test_mysqlx_listener_smoke-t" : [ "unit-tests-g1" ], "test_mysqlx_plugin_load-t" : [ "unit-tests-g1" ], "test_mysql_hostgroup_attributes-1-t" : [ "legacy-g3","mysql84-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ], "test_mysql_query_digests_stages-t" : [ "legacy-g3","mysql84-g3","mysql-auto_increment_delay_multiplex=0-g3","mysql-multiplexing=false-g3","mysql-query_digests=0-g3","mysql-query_digests_keep_comment=1-g3" ], diff --git a/test/tap/tests/test_mysqlx_listener_smoke-t.cpp b/test/tap/tests/test_mysqlx_listener_smoke-t.cpp deleted file mode 100644 index b4d344bbd..000000000 --- a/test/tap/tests/test_mysqlx_listener_smoke-t.cpp +++ /dev/null @@ -1,186 +0,0 @@ -/** - * test_mysqlx_listener_smoke-t.cpp - * - * Verify that the mysqlx plugin opens TCP listeners when started with - * active routes, and that plain TCP connections succeed. - * - * This is a hybrid unit test: it links against libproxysql.a and loads - * the mysqlx plugin .so without a running ProxySQL daemon. - * - * Stub implementations are provided for session/config/stats symbols - * referenced by MysqlxWorker::run() so that only mysqlx_worker.cpp - * needs to be compiled directly. - */ - -#include -#include -#include -#include - -#include "ProxySQL_PluginManager.h" -#include "sqlite3db.h" -#include "tap.h" - -#include "mysqlx_config_store.h" -#include "mysqlx_frontend_session.h" -#include "mysqlx_backend_session.h" -#include "mysqlx_plugin.h" -#include "mysqlx_stats.h" -#include "mysqlx_protocol.h" - -#ifndef PROXYSQL_MYSQLX_PLUGIN_PATH -#error "PROXYSQL_MYSQLX_PLUGIN_PATH must be defined at compile time" -#endif - -static const uint16_t TEST_MYSQLX_PORT = 16603; -static const uint16_t TEST_MYSQLX_PORT_A = 46101; -static const uint16_t TEST_MYSQLX_PORT_B = 46102; - -extern bool mysqlx_start_listeners_from_runtime_routes(SQLite3DB& db); -extern size_t mysqlx_listener_count(); -extern void mysqlx_stop_listeners(); - -static bool seed_route(SQLite3DB& db, const char* name, const char* bind, int hg) { - char sql[256]; - snprintf(sql, sizeof(sql), - "INSERT INTO runtime_mysqlx_routes (name, bind, destination_hostgroup, strategy, active) " - "VALUES ('%s', '%s', %d, 'first_available', 1)", - name, bind, hg); - return db.execute(sql); -} - -// --------------------------------------------------------------------------- -// Stub implementations for symbols referenced by MysqlxWorker::run(). -// The smoke test only verifies listener lifecycle and TCP connectivity; -// it does not exercise the X Protocol session path. -// --------------------------------------------------------------------------- - -MysqlxFrontendSession::MysqlxFrontendSession(int fd) : client_fd_(fd) {} -MysqlxFrontendSession::~MysqlxFrontendSession() { if (client_fd_ >= 0) close(client_fd_); } -bool MysqlxFrontendSession::run_handshake_and_auth(MysqlxConfigStore&) { return false; } - -MysqlxBackendSession::MysqlxBackendSession() = default; -MysqlxBackendSession::~MysqlxBackendSession() = default; -bool MysqlxBackendSession::connect(const MysqlxResolvedIdentity&, const MysqlxBackendEndpoint&, std::string&) { return false; } -bool MysqlxBackendSession::relay(int) { return false; } - -static MysqlxPluginContext g_stub_context {}; -MysqlxPluginContext& mysqlx_context() { return g_stub_context; } - -static MysqlxStatsStore g_stub_stats; -MysqlxStatsStore& mysqlx_stats() { return g_stub_stats; } -void MysqlxStatsStore::record_conn_ok(const std::string&) {} -void MysqlxStatsStore::record_conn_err(const std::string&) {} - -uint64_t MysqlxConfigStore::topology_generation() const { return 0; } -MysqlxBackendEndpoint MysqlxConfigStore::pick_endpoint(const std::string&) const { return {}; } - -bool mysqlx_send_error(int, uint16_t, const std::string&, const std::string&) { return false; } - -int main() { - plan(15); - - ProxySQL_PluginManager mgr; - std::string err {}; - - ok(mgr.load(PROXYSQL_MYSQLX_PLUGIN_PATH, err), - "load mysqlx plugin succeeds: %s", err.c_str()); - - SQLite3DB admindb; - int dbrc = admindb.open(const_cast(":memory:"), SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE); - ok(dbrc == 0, "admin sqlite opens"); - - ok(mgr.init_all(err), "init_all registers schema: %s", err.c_str()); - - admindb.execute( - "CREATE TABLE IF NOT EXISTS runtime_mysqlx_routes (" - " name VARCHAR NOT NULL PRIMARY KEY," - " bind VARCHAR NOT NULL," - " destination_hostgroup INT NOT NULL," - " fallback_hostgroup INT," - " strategy VARCHAR NOT NULL DEFAULT 'first_available'," - " active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1," - " attributes VARCHAR NOT NULL DEFAULT ''," - " comment VARCHAR NOT NULL DEFAULT ''" - " )" - ); - - // ok 4: No listeners before any start_listeners call. - ok(mysqlx_listener_count() == 0, "listener count is 0 before any listeners start"); - - // ok 5: Start with empty routes table. - { - bool rc = mysqlx_start_listeners_from_runtime_routes(admindb); - ok(rc == true && mysqlx_listener_count() == 0, - "start with empty routes table: succeeds, count stays 0"); - } - mysqlx_stop_listeners(); - - // ok 6-10: Existing single-route test. - ok(seed_route(admindb, "smoke_route", "127.0.0.1:16603", 0), - "seed route row"); - - ok(mysqlx_start_listeners_from_runtime_routes(admindb), - "start_listeners_from_runtime_routes succeeds"); - ok(mysqlx_listener_count() == 1, "one listener opened"); - - usleep(100000); - - int fd = socket(AF_INET, SOCK_STREAM, 0); - ok(fd >= 0, "client socket created"); - - sockaddr_in addr {}; - addr.sin_family = AF_INET; - addr.sin_port = htons(TEST_MYSQLX_PORT); - inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr); - - int rc = connect(fd, reinterpret_cast(&addr), sizeof(addr)); - ok(rc == 0, "TCP connect to mysqlx listener on port %u succeeds", TEST_MYSQLX_PORT); - close(fd); - - mysqlx_stop_listeners(); - - // ok 11: Count drops to 0 after stop. - ok(mysqlx_listener_count() == 0, "count drops to 0 after stop"); - - // ok 12: Re-start with two routes on different ports. - admindb.execute("DELETE FROM runtime_mysqlx_routes"); - seed_route(admindb, "route_a", "127.0.0.1:46101", 1); - seed_route(admindb, "route_b", "127.0.0.1:46102", 2); - - mysqlx_start_listeners_from_runtime_routes(admindb); - usleep(100000); - ok(mysqlx_listener_count() == 2, "re-start: two listeners on ports %u and %u", - TEST_MYSQLX_PORT_A, TEST_MYSQLX_PORT_B); - - // ok 13: TCP connect to first listener. - { - int fd2 = socket(AF_INET, SOCK_STREAM, 0); - sockaddr_in a {}; - a.sin_family = AF_INET; - a.sin_port = htons(TEST_MYSQLX_PORT_A); - inet_pton(AF_INET, "127.0.0.1", &a.sin_addr); - int r = connect(fd2, reinterpret_cast(&a), sizeof(a)); - ok(r == 0, "TCP connect to listener on port %u succeeds", TEST_MYSQLX_PORT_A); - close(fd2); - } - - // ok 14: TCP connect to second listener. - { - int fd3 = socket(AF_INET, SOCK_STREAM, 0); - sockaddr_in b {}; - b.sin_family = AF_INET; - b.sin_port = htons(TEST_MYSQLX_PORT_B); - inet_pton(AF_INET, "127.0.0.1", &b.sin_addr); - int r = connect(fd3, reinterpret_cast(&b), sizeof(b)); - ok(r == 0, "TCP connect to listener on port %u succeeds", TEST_MYSQLX_PORT_B); - close(fd3); - } - - // ok 15: Stop when already stopped. - mysqlx_stop_listeners(); - mysqlx_stop_listeners(); - ok(mysqlx_listener_count() == 0, "stop when no listeners active: no crash, count stays 0"); - - return exit_status(); -} diff --git a/test/tap/tests/unit/Makefile b/test/tap/tests/unit/Makefile index 2b0dab331..974efce90 100644 --- a/test/tap/tests/unit/Makefile +++ b/test/tap/tests/unit/Makefile @@ -326,7 +326,6 @@ UNIT_TESTS := smoke_test-t query_cache_unit-t query_processor_unit-t \ test_mysqlx_plugin_load-t \ mysqlx_config_store_unit-t \ test_mysqlx_admin_tables-t \ - test_mysqlx_listener_smoke-t \ mysqlx_protocol_unit-t \ mysqlx_protocol_socket_unit-t \ mysqlx_route_store_unit-t \ @@ -461,13 +460,6 @@ mysqlx_protocol_socket_unit-t: mysqlx_protocol_socket_unit-t.cpp $(PROXYSQL_PATH $(IDIRS) $(LDIRS) $(OPT) $(LIBPROXYSQLAR_FULL) $(STATIC_LIBS) \ $(MYLIBS) -lprotobuf -lssl -lcrypto $(ALLOW_MULTI_DEF) -o $@ -test_mysqlx_listener_smoke-t: ../test_mysqlx_listener_smoke-t.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_worker.cpp $(TEST_HELPERS_OBJ) $(LIBPROXYSQLAR) mysqlx_plugin_build - $(CXX) ../test_mysqlx_listener_smoke-t.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_worker.cpp $(TEST_HELPERS_OBJ) \ - -DPROXYSQL_MYSQLX_PLUGIN_PATH=\"$(MYSQLX_PLUGIN_SO)\" \ - -I$(PROXYSQL_PATH)/plugins/mysqlx/include \ - $(IDIRS) $(LDIRS) $(OPT) $(LIBPROXYSQLAR_FULL) $(STATIC_LIBS) \ - $(MYLIBS) -ldl -lpthread $(ALLOW_MULTI_DEF) -o $@ - test_mysqlx_admin_tables-t: ../test_mysqlx_admin_tables-t.cpp $(ODIR)/tap.o $(ODIR)/test_globals.o $(ODIR)/test_init.o $(LIBPROXYSQLAR) mysqlx_plugin_build $(CXX) ../test_mysqlx_admin_tables-t.cpp $(ODIR)/tap.o $(ODIR)/test_globals.o $(ODIR)/test_init.o \ -DPROXYSQL_MYSQLX_PLUGIN_PATH=\"$(MYSQLX_PLUGIN_SO)\" \