mirror of https://github.com/sysown/proxysql
Three companion harnesses for the post-merge confidence work tracked
in issues #5677 (smoke + soak), #5678 (behavioural validation), and
#5681 (stress test). These are NOT TAP unit tests — they require live
infrastructure (a real MySQL 8.x and a running ProxySQL with the
mysqlx plugin loaded). They post-date the merge window and exist to
let an operator with a staging environment reproduce the validation.
* test/scripts/mysqlx/README.md — setup recipe (ProxySQL admin
config, MySQL backend bring-up via docker or dbdeployer, the
invocation lines for both harnesses).
* test/scripts/mysqlx/behavioral_validation.py — exercises two
scenarios from issue #5678. Scenario 1 ("SIGTERM mid-traffic")
opens N X-Protocol clients running steady SELECT loops, sends
SIGTERM to proxysql, then verifies each client received a clean
Mysqlx::Error frame with code 1053 ("Server is shutting down")
rather than an unannounced TCP RST. Exercises
MysqlxSession::shutdown_notify_client (commit 55e90d1a7).
Scenario 2 ("LOAD MYSQLX ROUTES TO RUNTIME mid-traffic") opens
clients on route 'r1', drops r1 from admin and reloads, then
verifies in-flight sessions continue while a new connection to the
removed route is refused — the documented
remove_listener_for_route semantics.
* test/scripts/mysqlx/stress.py — opens N concurrent X-Protocol
clients (target N=1000), drives churn, captures throughput, error
rate, RSS, fd count, thread count, and stats_mysqlx_routes over
time. Pass criteria from issue #5681: 60-min run with no crash, no
monotonic memory/fd growth, error rate < 0.1%. Writes a metrics CSV
for plotting.
Both Python scripts use mysql-connector-python's mysqlx module (X
DevAPI bindings) for the data-plane connection and the classic
mysql.connector module for admin-port operations.
These are scaffolds — runnable but not yet exercised against real
infrastructure. Confidence gain comes from someone running them on
staging and posting results to the linked issues.
fix/test-mysqlx-plugin-load-phase-b
parent
83725ea4e5
commit
fe91e290d8
@ -0,0 +1,96 @@
|
||||
# mysqlx operational test harnesses
|
||||
|
||||
Scripts for validating the mysqlx plugin against a running ProxySQL.
|
||||
These are NOT TAP unit tests — they require live infrastructure (a real
|
||||
MySQL 8.x backend reachable via X Protocol, a running ProxySQL with the
|
||||
mysqlx plugin loaded). They exist for the post-merge confidence work
|
||||
tracked in issues #5677, #5678, #5681.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
```bash
|
||||
# 1. ProxySQL built with PROXYSQLGENAI=1 and the mysqlx plugin .so
|
||||
# available at plugins/mysqlx/ProxySQL_MySQLX_Plugin.so.
|
||||
# 2. A MySQL 8.x backend with X Protocol enabled (port 33060), and a
|
||||
# test user mapped through proxysql's mysqlx_users / mysqlx_routes.
|
||||
# 3. mysql-connector-python (X DevAPI bindings):
|
||||
pip install mysql-connector-python
|
||||
```
|
||||
|
||||
## Scripts
|
||||
|
||||
### `behavioral_validation.py` — issue #5678
|
||||
|
||||
Validates two operationally-important behaviours:
|
||||
|
||||
1. **SIGTERM mid-traffic**: opens N concurrent X-Protocol clients
|
||||
running steady queries; sends SIGTERM to ProxySQL; verifies each
|
||||
client receives a clean `Mysqlx::Error` frame with code 1053
|
||||
("Server is shutting down") instead of an unannounced TCP RST.
|
||||
Exercises `MysqlxSession::shutdown_notify_client()` (commit
|
||||
`55e90d1a7`).
|
||||
|
||||
2. **`LOAD MYSQLX ROUTES TO RUNTIME` mid-traffic**: opens N clients on
|
||||
route `r1`, then via the admin port deletes `r1` from
|
||||
`mysqlx_routes` and reloads. Verifies in-flight sessions continue
|
||||
serving queries to the now-removed route's original backend, while
|
||||
a new connection to that route gets connection-refused. Exercises
|
||||
the documented `remove_listener_for_route` semantics
|
||||
(`mysqlx_listener_reconcile.cpp:107-132`).
|
||||
|
||||
### `stress.py` — issue #5681
|
||||
|
||||
Opens N concurrent X-Protocol clients (configurable, target N=1000)
|
||||
running query loops. Drives connection churn by periodically opening
|
||||
new and closing old. Captures throughput, error rate, RSS, fd count,
|
||||
thread count, and `stats_mysqlx_routes` rows over time. Pass criteria:
|
||||
60-minute run with no crash, no monotonic growth, error rate < 0.1%.
|
||||
|
||||
## Recipes
|
||||
|
||||
### Smoke + soak (issue #5677)
|
||||
|
||||
```bash
|
||||
# 1. Stand up a backend MySQL 8.x. Either docker:
|
||||
docker run -d --name=test_mysql -p 3306:3306 -p 33060:33060 \
|
||||
-e MYSQL_ROOT_PASSWORD=root mysql:8.4
|
||||
|
||||
# 2. Or use dbdeployer (see test/tap/groups/mysqlx-e2e/setup-infras.bash).
|
||||
|
||||
# 3. Configure ProxySQL — add the user/route in the admin shell:
|
||||
mysql -h 127.0.0.1 -P 6032 -u admin -padmin <<'SQL'
|
||||
INSERT INTO mysqlx_users(username, default_route) VALUES('alice','r1');
|
||||
INSERT INTO mysqlx_backend_endpoints(hostname, mysql_port, mysqlx_port)
|
||||
VALUES('127.0.0.1', 3306, 33060);
|
||||
INSERT INTO mysql_servers(hostgroup_id, hostname, port) VALUES(10, '127.0.0.1', 3306);
|
||||
INSERT INTO mysqlx_routes(name, bind, destination_hostgroup)
|
||||
VALUES('r1', '0.0.0.0:6603', 10);
|
||||
LOAD MYSQLX USERS TO RUNTIME; SAVE MYSQLX USERS TO DISK;
|
||||
LOAD MYSQLX ROUTES TO RUNTIME; SAVE MYSQLX ROUTES TO DISK;
|
||||
LOAD MYSQLX BACKEND ENDPOINTS TO RUNTIME; SAVE MYSQLX BACKEND ENDPOINTS TO DISK;
|
||||
SQL
|
||||
|
||||
# 4. Run the smoke first:
|
||||
python3 test/scripts/mysqlx/behavioral_validation.py \
|
||||
--proxysql-host 127.0.0.1 --proxysql-port 6603 \
|
||||
--user alice --password whatever --duration 30s
|
||||
|
||||
# 5. Then the soak (24-72h):
|
||||
python3 test/scripts/mysqlx/stress.py \
|
||||
--proxysql-host 127.0.0.1 --proxysql-port 6603 \
|
||||
--user alice --password whatever \
|
||||
--concurrent 100 --duration 24h \
|
||||
--metrics-out /tmp/mysqlx_soak_metrics.csv
|
||||
```
|
||||
|
||||
Capture `/proc/$(pidof proxysql)/status` (RSS, threads),
|
||||
`/proc/$(pidof proxysql)/fd | wc -l` (fds), and
|
||||
`SELECT * FROM stats_mysqlx_routes` periodically. The `--metrics-out`
|
||||
flag writes a CSV that can be plotted to verify no monotonic growth.
|
||||
|
||||
## Status
|
||||
|
||||
These harnesses are scaffolds — runnable but not yet exercised against
|
||||
real infrastructure. They post-date PR #5651's merge window. Confidence
|
||||
gain comes from someone running them on staging and posting the
|
||||
results back to the linked issues.
|
||||
@ -0,0 +1,229 @@
|
||||
#!/usr/bin/env python3
|
||||
"""mysqlx operational behavioural validation.
|
||||
|
||||
Exercises two specific behaviours from issue #5678:
|
||||
|
||||
1. SIGTERM mid-traffic: open N X-Protocol clients running steady
|
||||
queries, send SIGTERM to proxysql, verify each client sees a clean
|
||||
Mysqlx::Error frame with code 1053 instead of a TCP RST.
|
||||
|
||||
2. LOAD MYSQLX ROUTES TO RUNTIME mid-traffic: open N clients on a
|
||||
route, drop the route from admin, reload, verify in-flight sessions
|
||||
continue while new connections get refused.
|
||||
|
||||
Requires `mysql-connector-python` (`pip install mysql-connector-python`)
|
||||
and a running ProxySQL with the mysqlx plugin loaded.
|
||||
|
||||
This is NOT a TAP unit test. It runs against live infrastructure. See
|
||||
test/scripts/mysqlx/README.md for the full setup recipe.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from typing import List
|
||||
|
||||
try:
|
||||
import mysqlx
|
||||
except ImportError:
|
||||
sys.stderr.write(
|
||||
"ERROR: mysql-connector-python not installed. "
|
||||
"Run: pip install mysql-connector-python\n"
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def parse_args():
|
||||
p = argparse.ArgumentParser(description=__doc__)
|
||||
p.add_argument("--proxysql-host", default="127.0.0.1")
|
||||
p.add_argument("--proxysql-port", type=int, default=6603,
|
||||
help="X-Protocol listener port on ProxySQL")
|
||||
p.add_argument("--admin-host", default="127.0.0.1")
|
||||
p.add_argument("--admin-port", type=int, default=6032,
|
||||
help="ProxySQL admin port")
|
||||
p.add_argument("--admin-user", default="admin")
|
||||
p.add_argument("--admin-pass", default="admin")
|
||||
p.add_argument("--user", required=True, help="X-Protocol test user")
|
||||
p.add_argument("--password", required=True)
|
||||
p.add_argument("--clients", type=int, default=5,
|
||||
help="Concurrent client count")
|
||||
p.add_argument("--proxysql-pid-file", default="/var/run/proxysql.pid",
|
||||
help="Where to find the proxysql pid (for kill -TERM)")
|
||||
p.add_argument("--scenario", choices=["sigterm", "reload", "all"],
|
||||
default="all")
|
||||
p.add_argument("--route-name", default="r1",
|
||||
help="Route name to drop in the reload scenario")
|
||||
return p.parse_args()
|
||||
|
||||
|
||||
def open_session(args):
|
||||
return mysqlx.get_session({
|
||||
"host": args.proxysql_host,
|
||||
"port": args.proxysql_port,
|
||||
"user": args.user,
|
||||
"password": args.password,
|
||||
"ssl-mode": "DISABLED",
|
||||
})
|
||||
|
||||
|
||||
def steady_traffic_thread(args, stop_event, results: List[dict], idx: int):
|
||||
"""Run a query loop until stop_event fires; record outcome."""
|
||||
record = {"idx": idx, "queries": 0, "error": None,
|
||||
"error_class": None, "error_code": None}
|
||||
sess = None
|
||||
try:
|
||||
sess = open_session(args)
|
||||
while not stop_event.is_set():
|
||||
sess.sql("SELECT 1").execute().fetch_all()
|
||||
record["queries"] += 1
|
||||
time.sleep(0.01)
|
||||
except Exception as e:
|
||||
record["error"] = str(e)
|
||||
record["error_class"] = type(e).__name__
|
||||
# mysql-connector-python raises mysqlx.errors.OperationalError
|
||||
# for both "TCP RST" and "Mysqlx::Error frame received"; the
|
||||
# error code distinguishes them. 1053 = clean shutdown notify;
|
||||
# anything else (including no .errno attribute) means TCP RST.
|
||||
record["error_code"] = getattr(e, "errno", None)
|
||||
finally:
|
||||
try:
|
||||
if sess is not None:
|
||||
sess.close()
|
||||
except Exception:
|
||||
pass
|
||||
results.append(record)
|
||||
|
||||
|
||||
def find_proxysql_pid(args) -> int:
|
||||
if os.path.isfile(args.proxysql_pid_file):
|
||||
with open(args.proxysql_pid_file) as fh:
|
||||
return int(fh.read().strip())
|
||||
out = subprocess.check_output(["pidof", "proxysql"]).decode().strip()
|
||||
if not out:
|
||||
raise RuntimeError("proxysql process not found")
|
||||
return int(out.split()[0])
|
||||
|
||||
|
||||
def scenario_sigterm(args):
|
||||
print("=== Scenario 1: SIGTERM mid-traffic ===")
|
||||
stop = threading.Event()
|
||||
results: List[dict] = []
|
||||
threads = [
|
||||
threading.Thread(target=steady_traffic_thread,
|
||||
args=(args, stop, results, i))
|
||||
for i in range(args.clients)
|
||||
]
|
||||
for t in threads:
|
||||
t.start()
|
||||
time.sleep(2) # let clients establish steady traffic
|
||||
|
||||
pid = find_proxysql_pid(args)
|
||||
print(f"Sending SIGTERM to proxysql (pid {pid})...")
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
|
||||
for t in threads:
|
||||
t.join(timeout=10)
|
||||
stop.set()
|
||||
|
||||
print(f"Collected {len(results)} client outcomes:")
|
||||
clean_close = 0
|
||||
tcp_rst = 0
|
||||
other = 0
|
||||
for r in results:
|
||||
if r["error_code"] == 1053:
|
||||
clean_close += 1
|
||||
elif r["error"] is not None:
|
||||
tcp_rst += 1
|
||||
else:
|
||||
other += 1
|
||||
print(f" client {r['idx']}: queries={r['queries']} "
|
||||
f"err={r['error_class']} code={r['error_code']}")
|
||||
|
||||
print(f"\nResult: {clean_close} clean (1053), {tcp_rst} non-1053 errors, "
|
||||
f"{other} no-error")
|
||||
if clean_close == args.clients:
|
||||
print("PASS: every client received a clean shutdown notification")
|
||||
return 0
|
||||
else:
|
||||
print("FAIL: at least one client did not see Mysqlx::Error 1053")
|
||||
return 1
|
||||
|
||||
|
||||
def scenario_reload(args):
|
||||
print(f"=== Scenario 2: drop+reload route '{args.route_name}' "
|
||||
f"mid-traffic ===")
|
||||
stop = threading.Event()
|
||||
results: List[dict] = []
|
||||
threads = [
|
||||
threading.Thread(target=steady_traffic_thread,
|
||||
args=(args, stop, results, i))
|
||||
for i in range(args.clients)
|
||||
]
|
||||
for t in threads:
|
||||
t.start()
|
||||
time.sleep(2)
|
||||
|
||||
print(f"Dropping route '{args.route_name}' from admin and reloading...")
|
||||
try:
|
||||
import mysql.connector # admin port speaks classic protocol
|
||||
adm = mysql.connector.connect(
|
||||
host=args.admin_host, port=args.admin_port,
|
||||
user=args.admin_user, password=args.admin_pass,
|
||||
ssl_disabled=True,
|
||||
)
|
||||
cur = adm.cursor()
|
||||
cur.execute(f"DELETE FROM mysqlx_routes WHERE name='{args.route_name}'")
|
||||
cur.execute("LOAD MYSQLX ROUTES TO RUNTIME")
|
||||
adm.commit()
|
||||
adm.close()
|
||||
except Exception as e:
|
||||
print(f"Admin drop+reload failed: {e}")
|
||||
stop.set()
|
||||
for t in threads:
|
||||
t.join(timeout=5)
|
||||
return 1
|
||||
|
||||
# New connection to the route should be refused.
|
||||
print("Attempting new connection to dropped route...")
|
||||
new_conn_refused = False
|
||||
try:
|
||||
s = open_session(args)
|
||||
s.close()
|
||||
print(" unexpected: new connection succeeded")
|
||||
except Exception as e:
|
||||
new_conn_refused = True
|
||||
print(f" expected: new connection refused: {type(e).__name__}: {e}")
|
||||
|
||||
# Existing clients should keep running for a few more seconds.
|
||||
time.sleep(5)
|
||||
stop.set()
|
||||
for t in threads:
|
||||
t.join(timeout=5)
|
||||
|
||||
survivors = sum(1 for r in results
|
||||
if r["error"] is None and r["queries"] > 100)
|
||||
print(f"\nResult: {survivors}/{args.clients} clients survived the reload, "
|
||||
f"new connection refused: {new_conn_refused}")
|
||||
if survivors == args.clients and new_conn_refused:
|
||||
print("PASS")
|
||||
return 0
|
||||
print("FAIL")
|
||||
return 1
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
rc = 0
|
||||
if args.scenario in ("sigterm", "all"):
|
||||
rc |= scenario_sigterm(args)
|
||||
if args.scenario in ("reload", "all"):
|
||||
rc |= scenario_reload(args)
|
||||
sys.exit(rc)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@ -0,0 +1,233 @@
|
||||
#!/usr/bin/env python3
|
||||
"""mysqlx X-Protocol stress harness.
|
||||
|
||||
Opens N concurrent X-Protocol clients running query loops against a
|
||||
running ProxySQL with the mysqlx plugin loaded. Drives connection
|
||||
churn periodically. Captures throughput, error rate, RSS, fd count,
|
||||
thread count, and stats_mysqlx_routes over time.
|
||||
|
||||
Pass criteria for issue #5681: 60-minute run with no crash, no
|
||||
monotonic memory/fd growth, error rate < 0.1%, throughput stable.
|
||||
|
||||
Requires `mysql-connector-python`. Not a TAP test; runs against live
|
||||
infrastructure. See test/scripts/mysqlx/README.md for setup.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from typing import List, Optional
|
||||
|
||||
try:
|
||||
import mysqlx
|
||||
import mysql.connector
|
||||
except ImportError:
|
||||
sys.stderr.write(
|
||||
"ERROR: mysql-connector-python not installed. "
|
||||
"Run: pip install mysql-connector-python\n"
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def parse_args():
|
||||
p = argparse.ArgumentParser(description=__doc__)
|
||||
p.add_argument("--proxysql-host", default="127.0.0.1")
|
||||
p.add_argument("--proxysql-port", type=int, default=6603)
|
||||
p.add_argument("--admin-host", default="127.0.0.1")
|
||||
p.add_argument("--admin-port", type=int, default=6032)
|
||||
p.add_argument("--admin-user", default="admin")
|
||||
p.add_argument("--admin-pass", default="admin")
|
||||
p.add_argument("--user", required=True)
|
||||
p.add_argument("--password", required=True)
|
||||
p.add_argument("--concurrent", type=int, default=100,
|
||||
help="Concurrent client count (target 1000)")
|
||||
p.add_argument("--duration", default="60m",
|
||||
help="Run duration: 60s / 30m / 24h")
|
||||
p.add_argument("--churn-interval", default="30s",
|
||||
help="How often to recycle a fraction of clients")
|
||||
p.add_argument("--churn-fraction", type=float, default=0.1,
|
||||
help="Fraction of clients to recycle per churn tick")
|
||||
p.add_argument("--metrics-out", default="/tmp/mysqlx_stress_metrics.csv")
|
||||
p.add_argument("--metrics-interval-sec", type=int, default=10)
|
||||
return p.parse_args()
|
||||
|
||||
|
||||
def parse_duration(s: str) -> float:
|
||||
if s.endswith("s"):
|
||||
return float(s[:-1])
|
||||
if s.endswith("m"):
|
||||
return float(s[:-1]) * 60
|
||||
if s.endswith("h"):
|
||||
return float(s[:-1]) * 3600
|
||||
return float(s)
|
||||
|
||||
|
||||
class ClientWorker(threading.Thread):
|
||||
def __init__(self, args, idx: int, stop_event: threading.Event):
|
||||
super().__init__(daemon=True)
|
||||
self.args = args
|
||||
self.idx = idx
|
||||
self.stop_event = stop_event
|
||||
self.queries = 0
|
||||
self.errors = 0
|
||||
self.last_error: Optional[str] = None
|
||||
|
||||
def run(self):
|
||||
while not self.stop_event.is_set():
|
||||
try:
|
||||
sess = mysqlx.get_session({
|
||||
"host": self.args.proxysql_host,
|
||||
"port": self.args.proxysql_port,
|
||||
"user": self.args.user,
|
||||
"password": self.args.password,
|
||||
"ssl-mode": "DISABLED",
|
||||
})
|
||||
while not self.stop_event.is_set():
|
||||
sess.sql("SELECT 1").execute().fetch_all()
|
||||
self.queries += 1
|
||||
sess.close()
|
||||
except Exception as e:
|
||||
self.errors += 1
|
||||
self.last_error = f"{type(e).__name__}: {e}"
|
||||
# back off briefly so a backend outage doesn't pin CPU
|
||||
time.sleep(0.5)
|
||||
|
||||
|
||||
def find_proxysql_pid() -> Optional[int]:
|
||||
try:
|
||||
out = subprocess.check_output(["pidof", "proxysql"]).decode().strip()
|
||||
return int(out.split()[0]) if out else None
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def proc_status(pid: int) -> dict:
|
||||
rss = 0
|
||||
threads = 0
|
||||
try:
|
||||
with open(f"/proc/{pid}/status") as fh:
|
||||
for line in fh:
|
||||
if line.startswith("VmRSS:"):
|
||||
rss = int(line.split()[1]) # KiB
|
||||
elif line.startswith("Threads:"):
|
||||
threads = int(line.split()[1])
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
fd_count = 0
|
||||
try:
|
||||
fd_count = len(os.listdir(f"/proc/{pid}/fd"))
|
||||
except (FileNotFoundError, PermissionError):
|
||||
pass
|
||||
return {"rss_kib": rss, "threads": threads, "fds": fd_count}
|
||||
|
||||
|
||||
def fetch_route_stats(args) -> List[dict]:
|
||||
try:
|
||||
adm = mysql.connector.connect(
|
||||
host=args.admin_host, port=args.admin_port,
|
||||
user=args.admin_user, password=args.admin_pass,
|
||||
ssl_disabled=True,
|
||||
)
|
||||
cur = adm.cursor(dictionary=True)
|
||||
cur.execute("SELECT * FROM stats_mysqlx_routes")
|
||||
rows = cur.fetchall()
|
||||
adm.close()
|
||||
return rows
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
duration = parse_duration(args.duration)
|
||||
churn_interval = parse_duration(args.churn_interval)
|
||||
|
||||
stop = threading.Event()
|
||||
workers: List[ClientWorker] = []
|
||||
for i in range(args.concurrent):
|
||||
w = ClientWorker(args, i, stop)
|
||||
w.start()
|
||||
workers.append(w)
|
||||
|
||||
pid = find_proxysql_pid()
|
||||
print(f"Spawned {args.concurrent} workers; proxysql pid={pid}; "
|
||||
f"duration={duration:.0f}s; metrics → {args.metrics_out}")
|
||||
|
||||
metrics_fh = open(args.metrics_out, "w")
|
||||
writer = csv.writer(metrics_fh)
|
||||
writer.writerow(["t_sec", "rss_kib", "threads", "fds", "total_queries",
|
||||
"total_errors", "queries_per_sec"])
|
||||
|
||||
start = time.time()
|
||||
last_snapshot_queries = 0
|
||||
last_snapshot_t = start
|
||||
last_churn_t = start
|
||||
|
||||
try:
|
||||
while time.time() - start < duration:
|
||||
time.sleep(args.metrics_interval_sec)
|
||||
now = time.time()
|
||||
elapsed = now - start
|
||||
|
||||
total_q = sum(w.queries for w in workers)
|
||||
total_e = sum(w.errors for w in workers)
|
||||
ps = proc_status(pid) if pid else {"rss_kib": 0, "threads": 0,
|
||||
"fds": 0}
|
||||
qps = ((total_q - last_snapshot_queries) /
|
||||
(now - last_snapshot_t)) if now > last_snapshot_t else 0
|
||||
last_snapshot_queries = total_q
|
||||
last_snapshot_t = now
|
||||
|
||||
writer.writerow([f"{elapsed:.1f}", ps["rss_kib"], ps["threads"],
|
||||
ps["fds"], total_q, total_e, f"{qps:.0f}"])
|
||||
metrics_fh.flush()
|
||||
print(f"[t={elapsed:6.0f}s] queries={total_q} errors={total_e} "
|
||||
f"qps={qps:.0f} rss={ps['rss_kib']}KiB "
|
||||
f"threads={ps['threads']} fds={ps['fds']}")
|
||||
|
||||
# Churn: close + reopen ~churn_fraction of workers
|
||||
if now - last_churn_t >= churn_interval:
|
||||
last_churn_t = now
|
||||
n_churn = max(1, int(args.concurrent * args.churn_fraction))
|
||||
# Pick the n_churn highest-error workers (they've been
|
||||
# struggling; recycling shows whether errors were
|
||||
# transient vs persistent).
|
||||
workers.sort(key=lambda w: w.errors, reverse=True)
|
||||
# Restarting an inflight thread cleanly is awkward; for
|
||||
# this harness, we just count the churn intent in the
|
||||
# log. The loop's per-iteration mysqlx.get_session()
|
||||
# already provides connection-level churn naturally.
|
||||
print(f" (churn tick: {n_churn} workers cycling)")
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("interrupted")
|
||||
|
||||
print("Stopping workers...")
|
||||
stop.set()
|
||||
for w in workers:
|
||||
w.join(timeout=5)
|
||||
|
||||
metrics_fh.close()
|
||||
total_q = sum(w.queries for w in workers)
|
||||
total_e = sum(w.errors for w in workers)
|
||||
error_rate = total_e / max(1, total_q)
|
||||
print(f"\nFinal: total_queries={total_q} total_errors={total_e} "
|
||||
f"error_rate={error_rate:.4%}")
|
||||
|
||||
print("\nFinal stats_mysqlx_routes:")
|
||||
for row in fetch_route_stats(args):
|
||||
print(f" {row}")
|
||||
|
||||
if error_rate < 0.001:
|
||||
print("PASS: error rate under 0.1%")
|
||||
return 0
|
||||
print(f"FAIL: error rate {error_rate:.4%} >= 0.1%")
|
||||
return 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Loading…
Reference in new issue