From fe91e290d88c6ee31d68cae414c97a95293d32d4 Mon Sep 17 00:00:00 2001 From: Rene Cannao Date: Mon, 27 Apr 2026 05:31:30 +0000 Subject: [PATCH] test(mysqlx): add behavioural-validation and stress harnesses MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three companion harnesses for the post-merge confidence work tracked in issues #5677 (smoke + soak), #5678 (behavioural validation), and #5681 (stress test). These are NOT TAP unit tests — they require live infrastructure (a real MySQL 8.x and a running ProxySQL with the mysqlx plugin loaded). They post-date the merge window and exist to let an operator with a staging environment reproduce the validation. * test/scripts/mysqlx/README.md — setup recipe (ProxySQL admin config, MySQL backend bring-up via docker or dbdeployer, the invocation lines for both harnesses). * test/scripts/mysqlx/behavioral_validation.py — exercises two scenarios from issue #5678. Scenario 1 ("SIGTERM mid-traffic") opens N X-Protocol clients running steady SELECT loops, sends SIGTERM to proxysql, then verifies each client received a clean Mysqlx::Error frame with code 1053 ("Server is shutting down") rather than an unannounced TCP RST. Exercises MysqlxSession::shutdown_notify_client (commit 55e90d1a7). Scenario 2 ("LOAD MYSQLX ROUTES TO RUNTIME mid-traffic") opens clients on route 'r1', drops r1 from admin and reloads, then verifies in-flight sessions continue while a new connection to the removed route is refused — the documented remove_listener_for_route semantics. * test/scripts/mysqlx/stress.py — opens N concurrent X-Protocol clients (target N=1000), drives churn, captures throughput, error rate, RSS, fd count, thread count, and stats_mysqlx_routes over time. Pass criteria from issue #5681: 60-min run with no crash, no monotonic memory/fd growth, error rate < 0.1%. Writes a metrics CSV for plotting. 
Both Python scripts use mysql-connector-python's mysqlx module (X DevAPI bindings) for the data-plane connection and the classic mysql.connector module for admin-port operations. These are scaffolds — runnable but not yet exercised against real infrastructure. Confidence gain comes from someone running them on staging and posting results to the linked issues. --- test/scripts/mysqlx/README.md | 96 ++++++++ test/scripts/mysqlx/behavioral_validation.py | 229 ++++++++++++++++++ test/scripts/mysqlx/stress.py | 233 +++++++++++++++++++ 3 files changed, 558 insertions(+) create mode 100644 test/scripts/mysqlx/README.md create mode 100755 test/scripts/mysqlx/behavioral_validation.py create mode 100755 test/scripts/mysqlx/stress.py diff --git a/test/scripts/mysqlx/README.md b/test/scripts/mysqlx/README.md new file mode 100644 index 000000000..2a38f556a --- /dev/null +++ b/test/scripts/mysqlx/README.md @@ -0,0 +1,96 @@ +# mysqlx operational test harnesses + +Scripts for validating the mysqlx plugin against a running ProxySQL. +These are NOT TAP unit tests — they require live infrastructure (a real +MySQL 8.x backend reachable via X Protocol, a running ProxySQL with the +mysqlx plugin loaded). They exist for the post-merge confidence work +tracked in issues #5677, #5678, #5681. + +## Prerequisites + +```bash +# 1. ProxySQL built with PROXYSQLGENAI=1 and the mysqlx plugin .so +# available at plugins/mysqlx/ProxySQL_MySQLX_Plugin.so. +# 2. A MySQL 8.x backend with X Protocol enabled (port 33060), and a +# test user mapped through proxysql's mysqlx_users / mysqlx_routes. +# 3. mysql-connector-python (X DevAPI bindings): +pip install mysql-connector-python +``` + +## Scripts + +### `behavioral_validation.py` — issue #5678 + +Validates two operationally-important behaviours: + +1. 
**SIGTERM mid-traffic**: opens N concurrent X-Protocol clients + running steady queries; sends SIGTERM to ProxySQL; verifies each + client receives a clean `Mysqlx::Error` frame with code 1053 + ("Server is shutting down") instead of an unannounced TCP RST. + Exercises `MysqlxSession::shutdown_notify_client()` (commit + `55e90d1a7`). + +2. **`LOAD MYSQLX ROUTES TO RUNTIME` mid-traffic**: opens N clients on + route `r1`, then via the admin port deletes `r1` from + `mysqlx_routes` and reloads. Verifies in-flight sessions continue + serving queries to the now-removed route's original backend, while + a new connection to that route gets connection-refused. Exercises + the documented `remove_listener_for_route` semantics + (`mysqlx_listener_reconcile.cpp:107-132`). + +### `stress.py` — issue #5681 + +Opens N concurrent X-Protocol clients (configurable, target N=1000) +running query loops. Drives connection churn by periodically opening +new and closing old. Captures throughput, error rate, RSS, fd count, +thread count, and `stats_mysqlx_routes` rows over time. Pass criteria: +60-minute run with no crash, no monotonic growth, error rate < 0.1%. + +## Recipes + +### Smoke + soak (issue #5677) + +```bash +# 1. Stand up a backend MySQL 8.x. Either docker: +docker run -d --name=test_mysql -p 3306:3306 -p 33060:33060 \ + -e MYSQL_ROOT_PASSWORD=root mysql:8.4 + +# 2. Or use dbdeployer (see test/tap/groups/mysqlx-e2e/setup-infras.bash). + +# 3. 
Configure ProxySQL — add the user/route in the admin shell: +mysql -h 127.0.0.1 -P 6032 -u admin -padmin <<'SQL' +INSERT INTO mysqlx_users(username, default_route) VALUES('alice','r1'); +INSERT INTO mysqlx_backend_endpoints(hostname, mysql_port, mysqlx_port) + VALUES('127.0.0.1', 3306, 33060); +INSERT INTO mysql_servers(hostgroup_id, hostname, port) VALUES(10, '127.0.0.1', 3306); +INSERT INTO mysqlx_routes(name, bind, destination_hostgroup) + VALUES('r1', '0.0.0.0:6603', 10); +LOAD MYSQLX USERS TO RUNTIME; SAVE MYSQLX USERS TO DISK; +LOAD MYSQLX ROUTES TO RUNTIME; SAVE MYSQLX ROUTES TO DISK; +LOAD MYSQLX BACKEND ENDPOINTS TO RUNTIME; SAVE MYSQLX BACKEND ENDPOINTS TO DISK; +SQL + +# 4. Run the smoke first: +python3 test/scripts/mysqlx/behavioral_validation.py \ + --proxysql-host 127.0.0.1 --proxysql-port 6603 \ + --user alice --password whatever + +# 5. Then the soak (24-72h): +python3 test/scripts/mysqlx/stress.py \ + --proxysql-host 127.0.0.1 --proxysql-port 6603 \ + --user alice --password whatever \ + --concurrent 100 --duration 24h \ + --metrics-out /tmp/mysqlx_soak_metrics.csv +``` + +Capture `/proc/$(pidof proxysql)/status` (RSS, threads), +`ls /proc/$(pidof proxysql)/fd | wc -l` (fds), and +`SELECT * FROM stats_mysqlx_routes` periodically. The `--metrics-out` +flag writes a CSV that can be plotted to verify no monotonic growth. + +## Status + +These harnesses are scaffolds — runnable but not yet exercised against +real infrastructure. They post-date PR #5651's merge window. Confidence +gain comes from someone running them on staging and posting the +results back to the linked issues. diff --git a/test/scripts/mysqlx/behavioral_validation.py b/test/scripts/mysqlx/behavioral_validation.py new file mode 100755 index 000000000..c25a2b85c --- /dev/null +++ b/test/scripts/mysqlx/behavioral_validation.py @@ -0,0 +1,229 @@ +#!/usr/bin/env python3 +"""mysqlx operational behavioural validation. + +Exercises two specific behaviours from issue #5678: + +1. 
"""mysqlx operational behavioural validation.

Exercises two specific behaviours from issue #5678:

1. SIGTERM mid-traffic: open N X-Protocol clients running steady
   queries, send SIGTERM to proxysql, verify each client sees a clean
   Mysqlx::Error frame with code 1053 instead of a TCP RST.

2. LOAD MYSQLX ROUTES TO RUNTIME mid-traffic: open N clients on a
   route, drop the route from admin, reload, verify in-flight sessions
   continue while new connections get refused.

With --scenario all, the reload scenario runs first and re-inserts the
route it dropped, because the SIGTERM scenario terminates the proxysql
process and nothing can be validated against it afterwards.

Requires `mysql-connector-python` (`pip install mysql-connector-python`)
and a running ProxySQL with the mysqlx plugin loaded.

This is NOT a TAP unit test. It runs against live infrastructure. See
test/scripts/mysqlx/README.md for the full setup recipe.
"""

import argparse
import os
import signal
import subprocess
import sys
import threading
import time
from typing import List, Optional, Sequence, Tuple

try:
    import mysqlx
except ImportError:
    # Reported by require_connector() when the script actually runs, so
    # that importing this module (e.g. to test its pure helpers) does not
    # terminate the interpreter.
    mysqlx = None

# A client counts as having "survived" the reload only if it completed
# more than this many queries without recording an error.
MIN_SURVIVOR_QUERIES = 100

# Error code the SIGTERM scenario expects every client to observe.
SHUTDOWN_ERROR_CODE = 1053


def require_connector() -> None:
    """Exit with status 1 and an install hint if the connector is missing."""
    if mysqlx is None:
        sys.stderr.write(
            "ERROR: mysql-connector-python not installed. "
            "Run: pip install mysql-connector-python\n"
        )
        sys.exit(1)


def parse_args(argv: Optional[Sequence[str]] = None):
    """Parse command-line options.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes
            argparse read ``sys.argv[1:]``.

    Returns:
        The populated ``argparse.Namespace``.
    """
    p = argparse.ArgumentParser(description=__doc__)
    p.add_argument("--proxysql-host", default="127.0.0.1")
    p.add_argument("--proxysql-port", type=int, default=6603,
                   help="X-Protocol listener port on ProxySQL")
    p.add_argument("--admin-host", default="127.0.0.1")
    p.add_argument("--admin-port", type=int, default=6032,
                   help="ProxySQL admin port")
    p.add_argument("--admin-user", default="admin")
    p.add_argument("--admin-pass", default="admin")
    p.add_argument("--user", required=True, help="X-Protocol test user")
    p.add_argument("--password", required=True)
    p.add_argument("--clients", type=int, default=5,
                   help="Concurrent client count")
    p.add_argument("--proxysql-pid-file", default="/var/run/proxysql.pid",
                   help="Where to find the proxysql pid (for kill -TERM)")
    p.add_argument("--scenario", choices=["sigterm", "reload", "all"],
                   default="all")
    p.add_argument("--route-name", default="r1",
                   help="Route name to drop in the reload scenario")
    return p.parse_args(argv)


def open_session(args):
    """Open an X-Protocol session to the ProxySQL listener (TLS disabled)."""
    return mysqlx.get_session({
        "host": args.proxysql_host,
        "port": args.proxysql_port,
        "user": args.user,
        "password": args.password,
        "ssl-mode": "DISABLED",
    })


def steady_traffic_thread(args, stop_event, results: List[dict], idx: int):
    """Run a query loop until stop_event fires; record outcome.

    Exactly one record is appended to ``results`` when the loop ends,
    whether it ended because ``stop_event`` was set or because the
    session raised.
    """
    record = {"idx": idx, "queries": 0, "error": None,
              "error_class": None, "error_code": None}
    sess = None
    try:
        sess = open_session(args)
        while not stop_event.is_set():
            sess.sql("SELECT 1").execute().fetch_all()
            record["queries"] += 1
            time.sleep(0.01)
    except Exception as e:
        record["error"] = str(e)
        record["error_class"] = type(e).__name__
        # Original author's note (not verified here): the connector
        # raises the same exception class for a TCP RST and for a
        # received Mysqlx::Error frame; only the error code tells them
        # apart. 1053 = clean shutdown notify; anything else, including
        # a missing .errno attribute, is treated as a TCP RST.
        record["error_code"] = getattr(e, "errno", None)
    finally:
        try:
            if sess is not None:
                sess.close()
        except Exception:
            # The session is usually already broken here; the primary
            # error (if any) has been recorded above.
            pass
        results.append(record)


def start_clients(args, stop_event, results: List[dict]):
    """Start ``args.clients`` traffic threads and return them.

    The threads are daemons so that a client stuck in a blocking read
    cannot keep the interpreter alive after the scenario has finished.
    """
    threads = [
        threading.Thread(target=steady_traffic_thread,
                         args=(args, stop_event, results, i),
                         daemon=True)
        for i in range(args.clients)
    ]
    for t in threads:
        t.start()
    return threads


def join_all(threads, timeout: float) -> None:
    """Join every thread, sharing one overall deadline of ``timeout`` s."""
    deadline = time.monotonic() + timeout
    for t in threads:
        t.join(timeout=max(0.0, deadline - time.monotonic()))


def find_proxysql_pid(args) -> int:
    """Return the proxysql pid from the pid file, else from ``pidof``.

    Raises:
        RuntimeError: no pid file content and ``pidof`` found nothing
            (or is not installed).
        ValueError: the pid file or ``pidof`` output is not an integer.
    """
    if os.path.isfile(args.proxysql_pid_file):
        with open(args.proxysql_pid_file) as fh:
            text = fh.read().strip()
        if text:
            return int(text)
    try:
        out = subprocess.check_output(["pidof", "proxysql"]).decode().strip()
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        # check_output raises on pidof's non-zero exit status instead of
        # returning empty output, and on a missing pidof binary.
        raise RuntimeError("proxysql process not found") from e
    if not out:
        raise RuntimeError("proxysql process not found")
    return int(out.split()[0])


def summarize_outcomes(results: Sequence[dict]) -> Tuple[int, int, int]:
    """Count outcomes as ``(clean_1053, other_errors, no_error)``."""
    clean = sum(1 for r in results
                if r["error_code"] == SHUTDOWN_ERROR_CODE)
    errored = sum(1 for r in results
                  if r["error_code"] != SHUTDOWN_ERROR_CODE
                  and r["error"] is not None)
    return clean, errored, len(results) - clean - errored


def scenario_sigterm(args):
    """Scenario 1: SIGTERM proxysql and check every client sees code 1053.

    Returns:
        0 if every client recorded error code 1053, otherwise 1
        (including when proxysql could not be located or signalled).
    """
    print("=== Scenario 1: SIGTERM mid-traffic ===")
    stop = threading.Event()
    results: List[dict] = []
    threads = start_clients(args, stop, results)
    try:
        time.sleep(2)  # let clients establish steady traffic

        pid = find_proxysql_pid(args)
        print(f"Sending SIGTERM to proxysql (pid {pid})...")
        os.kill(pid, signal.SIGTERM)

        # Clients are expected to end on their own once the proxy goes
        # away; give them one shared window to do so.
        join_all(threads, 10)
    except (RuntimeError, ValueError, OSError) as e:
        print(f"FAIL: could not signal proxysql: {e}")
        return 1
    finally:
        # Always release the clients, then wait for their records, so an
        # error above cannot leave threads spinning and a client that
        # never saw an error is still recorded.
        stop.set()
        join_all(threads, 5)

    outcomes = list(results)
    print(f"Collected {len(outcomes)} client outcomes:")
    for r in outcomes:
        print(f"  client {r['idx']}: queries={r['queries']} "
              f"err={r['error_class']} code={r['error_code']}")
    clean_close, tcp_rst, other = summarize_outcomes(outcomes)

    print(f"\nResult: {clean_close} clean (1053), {tcp_rst} non-1053 errors, "
          f"{other} no-error")
    if clean_close == args.clients:
        print("PASS: every client received a clean shutdown notification")
        return 0
    print("FAIL: at least one client did not see Mysqlx::Error 1053")
    return 1


def sql_literal(value) -> str:
    """Render a Python value as an SQL literal.

    Strings are single-quoted with embedded quotes doubled.
    NOTE(review): quote doubling is used because the ProxySQL admin
    interface is understood to be SQLite-backed, where backslash is not
    an escape character — confirm against the admin documentation.
    """
    if value is None:
        return "NULL"
    if isinstance(value, bool):
        return str(int(value))
    if isinstance(value, (int, float)):
        return str(value)
    if isinstance(value, (bytes, bytearray)):
        value = bytes(value).decode()
    return "'" + str(value).replace("'", "''") + "'"


def route_insert_sql(columns: Sequence[str], row: Sequence) -> str:
    """Build the INSERT that re-creates one saved ``mysqlx_routes`` row."""
    cols = ", ".join(columns)
    vals = ", ".join(sql_literal(v) for v in row)
    return f"INSERT INTO mysqlx_routes({cols}) VALUES({vals})"


def admin_connect(args):
    """Connect to the ProxySQL admin port."""
    import mysql.connector  # admin port speaks classic protocol
    return mysql.connector.connect(
        host=args.admin_host, port=args.admin_port,
        user=args.admin_user, password=args.admin_pass,
        ssl_disabled=True,
    )


def drop_route(args):
    """Delete ``args.route_name`` from ``mysqlx_routes`` and reload.

    Returns:
        ``(columns, rows)`` as they were before deletion, so the caller
        can re-insert the route afterwards.
    """
    adm = admin_connect(args)
    try:
        cur = adm.cursor()
        name = sql_literal(args.route_name)
        cur.execute(f"SELECT * FROM mysqlx_routes WHERE name={name}")
        rows = cur.fetchall()
        columns = [d[0] for d in cur.description]
        cur.execute(f"DELETE FROM mysqlx_routes WHERE name={name}")
        cur.execute("LOAD MYSQLX ROUTES TO RUNTIME")
        adm.commit()
        return columns, rows
    finally:
        adm.close()


def restore_route(args, columns, rows) -> None:
    """Re-insert rows saved by ``drop_route`` and reload the routes."""
    adm = admin_connect(args)
    try:
        cur = adm.cursor()
        for row in rows:
            cur.execute(route_insert_sql(columns, row))
        cur.execute("LOAD MYSQLX ROUTES TO RUNTIME")
        adm.commit()
    finally:
        adm.close()


def wait_for_listener(args, timeout: float = 5.0) -> bool:
    """Poll until a new session can be opened; False if ``timeout`` passes."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            open_session(args).close()
            return True
        except Exception:
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.25)


def scenario_reload(args):
    """Scenario 2: drop the route mid-traffic and reload.

    Passes when every client kept running without error (more than
    ``MIN_SURVIVOR_QUERIES`` queries each) and a new connection attempt
    made after the reload failed. The dropped route is re-inserted
    afterwards (best effort) so later scenarios and re-runs still have a
    listener.

    Returns:
        0 on pass, 1 on failure.
    """
    print(f"=== Scenario 2: drop+reload route '{args.route_name}' "
          f"mid-traffic ===")
    stop = threading.Event()
    results: List[dict] = []
    threads = start_clients(args, stop, results)
    saved = None
    new_conn_refused = False
    try:
        time.sleep(2)

        print(f"Dropping route '{args.route_name}' from admin and reloading...")
        try:
            saved = drop_route(args)
        except Exception as e:
            print(f"Admin drop+reload failed: {e}")
            return 1

        # New connection to the route should be refused.
        print("Attempting new connection to dropped route...")
        try:
            s = open_session(args)
            s.close()
            print("  unexpected: new connection succeeded")
        except Exception as e:
            new_conn_refused = True
            print(f"  expected: new connection refused: {type(e).__name__}: {e}")

        # Existing clients should keep running for a few more seconds.
        time.sleep(5)
    finally:
        stop.set()
        join_all(threads, 5)
        if saved is not None and saved[1]:
            try:
                restore_route(args, *saved)
                if wait_for_listener(args):
                    print(f"Restored route '{args.route_name}'")
                else:
                    print(f"WARNING: route '{args.route_name}' re-inserted "
                          f"but the listener is not accepting connections")
            except Exception as e:
                print(f"WARNING: could not restore route "
                      f"'{args.route_name}': {e}")

    survivors = sum(1 for r in list(results)
                    if r["error"] is None
                    and r["queries"] > MIN_SURVIVOR_QUERIES)
    print(f"\nResult: {survivors}/{args.clients} clients survived the reload, "
          f"new connection refused: {new_conn_refused}")
    if survivors == args.clients and new_conn_refused:
        print("PASS")
        return 0
    print("FAIL")
    return 1


def planned_scenarios(choice: str) -> List[str]:
    """Return the scenarios to run for ``--scenario``, in execution order.

    "reload" always precedes "sigterm": the SIGTERM scenario kills
    proxysql, so anything scheduled after it would run against a dead
    process.
    """
    return [name for name in ("reload", "sigterm") if choice in (name, "all")]


def main():
    """Run the selected scenarios and exit with their OR-ed status."""
    require_connector()
    args = parse_args()
    runners = {"reload": scenario_reload, "sigterm": scenario_sigterm}
    rc = 0
    for name in planned_scenarios(args.scenario):
        rc |= runners[name](args)
    sys.exit(rc)


if __name__ == "__main__":
    main()
"""mysqlx X-Protocol stress harness.

Opens N concurrent X-Protocol clients running query loops against a
running ProxySQL with the mysqlx plugin loaded. Drives connection
churn periodically. Captures throughput, error rate, RSS, fd count,
thread count, and stats_mysqlx_routes over time.

Pass criteria for issue #5681: 60-minute run with no crash, no
monotonic memory/fd growth, error rate < 0.1%, throughput stable.
This script itself enforces only: proxysql still running, at least one
completed query, and error rate < 0.1%. Growth and throughput stability
are judged from the metrics CSV.

Requires `mysql-connector-python`. Not a TAP test; runs against live
infrastructure. See test/scripts/mysqlx/README.md for setup.
"""

import argparse
import csv
import os
import subprocess
import sys
import threading
import time
from typing import List, Optional, Sequence, Tuple

try:
    import mysqlx
    import mysql.connector
except ImportError:
    # Reported by require_connector() when the script actually runs, so
    # that importing this module (e.g. to test its pure helpers) does not
    # terminate the interpreter.
    mysqlx = None
    mysql = None

# Run fails when errors / queries reaches this ratio (0.1%).
MAX_ERROR_RATE = 0.001

_DURATION_UNITS = {"s": 1.0, "m": 60.0, "h": 3600.0}


def require_connector() -> None:
    """Exit with status 1 and an install hint if the connector is missing."""
    if mysqlx is None:
        sys.stderr.write(
            "ERROR: mysql-connector-python not installed. "
            "Run: pip install mysql-connector-python\n"
        )
        sys.exit(1)


def parse_args(argv: Optional[Sequence[str]] = None):
    """Parse command-line options.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes
            argparse read ``sys.argv[1:]``.
    """
    p = argparse.ArgumentParser(description=__doc__)
    p.add_argument("--proxysql-host", default="127.0.0.1")
    p.add_argument("--proxysql-port", type=int, default=6603)
    p.add_argument("--admin-host", default="127.0.0.1")
    p.add_argument("--admin-port", type=int, default=6032)
    p.add_argument("--admin-user", default="admin")
    p.add_argument("--admin-pass", default="admin")
    p.add_argument("--user", required=True)
    p.add_argument("--password", required=True)
    p.add_argument("--concurrent", type=int, default=100,
                   help="Concurrent client count (target 1000)")
    p.add_argument("--duration", default="60m",
                   help="Run duration: 60s / 30m / 24h")
    p.add_argument("--churn-interval", default="30s",
                   help="How often to recycle a fraction of clients")
    p.add_argument("--churn-fraction", type=float, default=0.1,
                   help="Fraction of clients to recycle per churn tick")
    p.add_argument("--metrics-out", default="/tmp/mysqlx_stress_metrics.csv")
    p.add_argument("--metrics-interval-sec", type=int, default=10)
    return p.parse_args(argv)


def parse_duration(s: str) -> float:
    """Convert ``"60s"``, ``"30m"``, ``"24h"`` or a bare number to seconds.

    Surrounding whitespace and the case of the unit suffix are ignored.

    Raises:
        ValueError: the text is not a number with an optional s/m/h
            suffix, or the value is negative.
    """
    text = s.strip().lower()
    scale = _DURATION_UNITS.get(text[-1:]) if text else None
    number = text if scale is None else text[:-1]
    try:
        seconds = float(number) * (1.0 if scale is None else scale)
    except ValueError:
        raise ValueError(
            f"invalid duration {s!r}: expected e.g. 60s, 30m, 24h"
        ) from None
    if seconds < 0:
        raise ValueError(f"invalid duration {s!r}: must not be negative")
    return seconds


class ClientWorker(threading.Thread):
    """One client: keeps a session open and runs ``SELECT 1`` in a loop.

    Counters are written only by the worker thread and read by the
    main thread for reporting:
      queries    - completed queries
      errors     - sessions that ended with an exception
      recycles   - sessions closed and reopened on a churn request
      last_error - text of the most recent exception, if any
    """

    def __init__(self, args, idx: int, stop_event: threading.Event):
        super().__init__(daemon=True)
        self.args = args
        self.idx = idx
        self.stop_event = stop_event
        self.queries = 0
        self.errors = 0
        self.recycles = 0
        self.last_error: Optional[str] = None
        self._recycle = threading.Event()

    def request_recycle(self) -> None:
        """Ask the worker to close its session and open a fresh one."""
        self._recycle.set()

    def _close_quietly(self, sess) -> None:
        """Close ``sess``; a failure is noted only if no error is recorded."""
        try:
            sess.close()
        except Exception as e:
            # After a query failure the session is usually already
            # broken; keep the earlier, more informative error.
            if self.last_error is None:
                self.last_error = f"{type(e).__name__}: {e}"

    def run(self):
        """Connect, query until stopped or recycled, reconnect; repeat."""
        while not self.stop_event.is_set():
            sess = None
            failed = False
            try:
                sess = mysqlx.get_session({
                    "host": self.args.proxysql_host,
                    "port": self.args.proxysql_port,
                    "user": self.args.user,
                    "password": self.args.password,
                    "ssl-mode": "DISABLED",
                })
                # A recycle requested while connecting is already
                # satisfied by this fresh session.
                self._recycle.clear()
                while not (self.stop_event.is_set()
                           or self._recycle.is_set()):
                    sess.sql("SELECT 1").execute().fetch_all()
                    self.queries += 1
                if self._recycle.is_set():
                    self.recycles += 1
            except Exception as e:
                failed = True
                self.errors += 1
                self.last_error = f"{type(e).__name__}: {e}"
            finally:
                if sess is not None:
                    self._close_quietly(sess)
            if failed:
                # back off briefly so a backend outage doesn't pin CPU;
                # waiting on the event keeps shutdown prompt
                self.stop_event.wait(0.5)


def find_proxysql_pid() -> Optional[int]:
    """Return the first pid reported by ``pidof proxysql``, or None."""
    try:
        out = subprocess.check_output(["pidof", "proxysql"]).decode().strip()
        return int(out.split()[0]) if out else None
    except (subprocess.CalledProcessError, OSError, ValueError):
        return None


def proc_status(pid: int) -> dict:
    """Read RSS (KiB), thread count and fd count of ``pid`` from /proc.

    Any value that cannot be read (process gone, no permission, no
    procfs) is reported as 0.
    """
    rss = 0
    threads = 0
    try:
        with open(f"/proc/{pid}/status") as fh:
            for line in fh:
                if line.startswith("VmRSS:"):
                    rss = int(line.split()[1])  # KiB
                elif line.startswith("Threads:"):
                    threads = int(line.split()[1])
    except OSError:
        # covers FileNotFoundError, PermissionError, ProcessLookupError
        pass
    fd_count = 0
    try:
        fd_count = len(os.listdir(f"/proc/{pid}/fd"))
    except OSError:
        pass
    return {"rss_kib": rss, "threads": threads, "fds": fd_count}


def process_alive(pid: int) -> bool:
    """Tell whether ``/proc/<pid>`` exists; True when there is no /proc."""
    if not os.path.isdir("/proc"):
        return True  # cannot tell without procfs; do not report a crash
    return os.path.isdir(f"/proc/{pid}")


def fetch_route_stats(args) -> List[dict]:
    """Return all ``stats_mysqlx_routes`` rows, or [] if the query fails.

    Best effort: a failure is reported on stderr and never raised.
    """
    try:
        adm = mysql.connector.connect(
            host=args.admin_host, port=args.admin_port,
            user=args.admin_user, password=args.admin_pass,
            ssl_disabled=True,
        )
        try:
            cur = adm.cursor(dictionary=True)
            cur.execute("SELECT * FROM stats_mysqlx_routes")
            return cur.fetchall()
        finally:
            adm.close()
    except Exception as e:
        sys.stderr.write(f"WARNING: could not read stats_mysqlx_routes: "
                         f"{type(e).__name__}: {e}\n")
        return []


def pick_churn_targets(workers: Sequence[ClientWorker],
                       fraction: float) -> List[ClientWorker]:
    """Choose the workers to recycle on one churn tick.

    At least one worker is chosen (when there are any). Workers with the
    most errors come first (recycling shows whether their errors were
    transient or persistent); ties go to the least-recycled worker.
    The input sequence is not modified.
    """
    count = max(1, int(len(workers) * fraction))
    ranked = sorted(workers, key=lambda w: (-w.errors, w.recycles))
    return ranked[:count]


def evaluate_run(total_queries: int, total_errors: int,
                 crashed: bool) -> Tuple[bool, str]:
    """Turn the final counters into ``(passed, message)``."""
    if crashed:
        return False, "FAIL: proxysql exited during the run"
    if total_queries == 0:
        # Nothing was measured; 0 errors out of 0 queries is not a pass.
        return False, "FAIL: no queries completed"
    error_rate = total_errors / total_queries
    if error_rate < MAX_ERROR_RATE:
        return True, "PASS: error rate under 0.1%"
    return False, f"FAIL: error rate {error_rate:.4%} >= 0.1%"


def main(argv: Optional[Sequence[str]] = None):
    """Run the stress test; return 0 on pass and 1 on failure.

    Every ``--metrics-interval-sec`` a sample is printed and appended to
    the metrics CSV. Every ``--churn-interval`` a fraction of the
    workers is asked to reopen its session. The loop ends when the
    duration elapses, on Ctrl-C, or when the proxysql pid found at
    start-up disappears.
    """
    require_connector()
    args = parse_args(argv)
    duration = parse_duration(args.duration)
    churn_interval = parse_duration(args.churn_interval)
    sample_every = max(1, args.metrics_interval_sec)

    stop = threading.Event()
    workers: List[ClientWorker] = [
        ClientWorker(args, i, stop) for i in range(args.concurrent)
    ]
    for w in workers:
        w.start()

    pid = find_proxysql_pid()
    print(f"Spawned {args.concurrent} workers; proxysql pid={pid}; "
          f"duration={duration:.0f}s; metrics → {args.metrics_out}")

    crashed = False
    start = time.monotonic()
    last_snapshot_queries = 0
    last_snapshot_t = start
    last_churn_t = start

    try:
        # newline="" is what the csv module requires of its output file.
        with open(args.metrics_out, "w", newline="") as metrics_fh:
            writer = csv.writer(metrics_fh)
            writer.writerow(["t_sec", "rss_kib", "threads", "fds",
                             "total_queries", "total_errors",
                             "queries_per_sec"])
            while not crashed:
                remaining = duration - (time.monotonic() - start)
                if remaining <= 0:
                    break
                # never sleep past the requested duration
                time.sleep(min(sample_every, remaining))
                now = time.monotonic()
                elapsed = now - start

                total_q = sum(w.queries for w in workers)
                total_e = sum(w.errors for w in workers)
                ps = proc_status(pid) if pid else {"rss_kib": 0,
                                                   "threads": 0, "fds": 0}
                qps = ((total_q - last_snapshot_queries) /
                       (now - last_snapshot_t)) if now > last_snapshot_t else 0
                last_snapshot_queries = total_q
                last_snapshot_t = now

                writer.writerow([f"{elapsed:.1f}", ps["rss_kib"],
                                 ps["threads"], ps["fds"], total_q, total_e,
                                 f"{qps:.0f}"])
                metrics_fh.flush()
                print(f"[t={elapsed:6.0f}s] queries={total_q} errors={total_e} "
                      f"qps={qps:.0f} rss={ps['rss_kib']}KiB "
                      f"threads={ps['threads']} fds={ps['fds']}")

                if pid and not process_alive(pid):
                    crashed = True
                    print(f"proxysql (pid {pid}) is no longer running")
                    continue

                # Churn: close + reopen ~churn_fraction of workers
                if now - last_churn_t >= churn_interval:
                    last_churn_t = now
                    targets = pick_churn_targets(workers, args.churn_fraction)
                    for w in targets:
                        w.request_recycle()
                    print(f"  (churn tick: {len(targets)} workers cycling)")
    except KeyboardInterrupt:
        print("interrupted")
    finally:
        print("Stopping workers...")
        stop.set()
        # One shared deadline: workers are daemons, so a straggler
        # cannot block interpreter exit.
        deadline = time.monotonic() + 10
        for w in workers:
            w.join(timeout=max(0.0, deadline - time.monotonic()))

    total_q = sum(w.queries for w in workers)
    total_e = sum(w.errors for w in workers)
    total_r = sum(w.recycles for w in workers)
    error_rate = total_e / max(1, total_q)
    print(f"\nFinal: total_queries={total_q} total_errors={total_e} "
          f"error_rate={error_rate:.4%}")
    print(f"Sessions recycled by churn: {total_r}")

    print("\nFinal stats_mysqlx_routes:")
    for row in fetch_route_stats(args):
        print(f"  {row}")

    passed, message = evaluate_run(total_q, total_e, crashed)
    print(message)
    return 0 if passed else 1


if __name__ == "__main__":
    sys.exit(main())