From 7e2f7828ae970f17a84207783fec998ba9b0caa8 Mon Sep 17 00:00:00 2001 From: Rene Cannao Date: Sat, 18 Apr 2026 08:44:35 +0000 Subject: [PATCH] fix(mysqlx): make sync transactions atomic on execute() failure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit sync_disk_to_memory() and copy_to_runtime() in mysqlx_plugin.cpp now check every admindb.execute() return value. On BEGIN / DELETE / INSERT / COMMIT failure the code issues a best-effort ROLLBACK, logs via services->log_message, and skips the current table. Both functions return false if any table's transaction failed, true if every replace succeeded. coderabbitai flagged this on #5642, #5643, and #5644 (outside-diff because the file appears in each PR's surrounding context). Without the return checks, a failed INSERT landing between a successful DELETE and the unconditional COMMIT silently wiped routes / users / endpoints / backend_endpoints — the transaction wrap advertised atomicity it did not deliver. The fix restores that guarantee. The shared BEGIN/DELETE/INSERT/COMMIT body is now a local helper replace_table_atomically() that takes a ProxySQL_PluginServices* so it can log via the existing plugin log_message callback; the two callers reduce to four-line loops over their table lists. mysqlx_start passes ctx.services through; its return values remain advisory (log-and-continue on a false return), intentionally — plugins shouldn't block proxysql startup on one table's sync failure. Upgrading to fail-fast is a future decision. Testing: added test_insert_failure_rolls_back in test/tap/tests/unit/mysqlx_robustness_unit-t.cpp. It seeds a dst table with a CHECK (id %% 2 = 0) constraint plus pre-existing rows, points src at odd ids, runs the atomic replace, and asserts (a) the function returns false, (b) dst still holds its original rows, (c) the row sum matches the pre-transaction state. plan(35) -> plan(39). All 39 assertions pass. Out of scope: the Makefile link-line fix from 82fe27f4b still applies; no further ripple. --- plugins/mysqlx/src/mysqlx_plugin.cpp | 106 +++++++++++++----- .../tests/unit/mysqlx_robustness_unit-t.cpp | 66 ++++++++++- 2 files changed, 139 insertions(+), 33 deletions(-) diff --git a/plugins/mysqlx/src/mysqlx_plugin.cpp b/plugins/mysqlx/src/mysqlx_plugin.cpp index 24463b6b1..9b89217a1 100644 --- a/plugins/mysqlx/src/mysqlx_plugin.cpp +++ b/plugins/mysqlx/src/mysqlx_plugin.cpp @@ -48,50 +48,100 @@ bool parse_bind_addr(const std::string& bind, std::string& host, int& port) { return true; } -bool sync_disk_to_memory(SQLite3DB& admindb) { +// Atomically replace `dest` with the rows from `source`. Every admindb.execute() +// return value is checked: on any failure we ROLLBACK (defensively, even after a +// failed BEGIN) and skip the table. Returns false if any step for this table +// failed. The transaction wrap is what makes this safe — without the return +// checks, a failed INSERT between a successful DELETE and an unconditional +// COMMIT would silently wipe the destination. +bool replace_table_atomically(SQLite3DB& admindb, + ProxySQL_PluginServices* services, + const char* dest, + const char* source) { + auto log_err = [services](const char* msg) { + if (services != nullptr && services->log_message != nullptr) { + services->log_message(3, msg); + } + }; + + if (!admindb.execute("BEGIN")) { + std::string m = "mysqlx sync: BEGIN failed for "; + m += dest; + log_err(m.c_str()); + return false; + } + + std::string q = "DELETE FROM "; + q += dest; + if (!admindb.execute(q.c_str())) { + std::string m = "mysqlx sync: DELETE failed for "; + m += dest; + log_err(m.c_str()); + admindb.execute("ROLLBACK"); + return false; + } + + q = "INSERT INTO "; + q += dest; + q += " SELECT * FROM "; + q += source; + if (!admindb.execute(q.c_str())) { + std::string m = "mysqlx sync: INSERT failed for "; + m += dest; + log_err(m.c_str()); + admindb.execute("ROLLBACK"); + return false; + } + + if (!admindb.execute("COMMIT")) { + std::string m = "mysqlx sync: COMMIT failed for "; + m += dest; + log_err(m.c_str()); + // Defensive — COMMIT may have partially succeeded; best-effort ROLLBACK. + admindb.execute("ROLLBACK"); + return false; + } + return true; +} + +bool sync_disk_to_memory(SQLite3DB& admindb, ProxySQL_PluginServices* services) { const char* tables[] = { "mysqlx_users", "mysqlx_routes", "mysqlx_backend_endpoints", "mysqlx_variables", }; + bool all_ok = true; for (const char* tbl : tables) { - admindb.execute("BEGIN"); - std::string q = "DELETE FROM main."; - q += tbl; - admindb.execute(q.c_str()); - - q = "INSERT INTO main."; - q += tbl; - q += " SELECT * FROM disk."; - q += tbl; - admindb.execute(q.c_str()); - admindb.execute("COMMIT"); + std::string dest = "main."; + dest += tbl; + std::string source = "disk."; + source += tbl; + if (!replace_table_atomically(admindb, services, dest.c_str(), source.c_str())) { + all_ok = false; + } } - return true; + return all_ok; } -bool copy_to_runtime(SQLite3DB& admindb) { +bool copy_to_runtime(SQLite3DB& admindb, ProxySQL_PluginServices* services) { const char* pairs[][2] = { {"mysqlx_users", "runtime_mysqlx_users"}, {"mysqlx_routes", "runtime_mysqlx_routes"}, {"mysqlx_backend_endpoints", "runtime_mysqlx_backend_endpoints"}, {"mysqlx_variables", "runtime_mysqlx_variables"}, }; + bool all_ok = true; for (const auto& p : pairs) { - admindb.execute("BEGIN"); - std::string q = "DELETE FROM main."; - q += p[1]; - admindb.execute(q.c_str()); - - q = "INSERT INTO main."; - q += p[1]; - q += " SELECT * FROM main."; - q += p[0]; - admindb.execute(q.c_str()); - admindb.execute("COMMIT"); + std::string dest = "main."; + dest += p[1]; + std::string source = "main."; + source += p[0]; + if (!replace_table_atomically(admindb, services, dest.c_str(), source.c_str())) { + all_ok = false; + } } - return true; + return all_ok; } bool mysqlx_start() { @@ -100,8 +150,8 @@ bool mysqlx_start() { if (ctx.services != nullptr && ctx.services->get_admindb != nullptr) { SQLite3DB* admindb = ctx.services->get_admindb(); if (admindb != nullptr) { - sync_disk_to_memory(*admindb); - copy_to_runtime(*admindb); + sync_disk_to_memory(*admindb, ctx.services); + copy_to_runtime(*admindb, ctx.services); std::string err; if (!ctx.config_store->load_from_runtime(*admindb, err)) { diff --git a/test/tap/tests/unit/mysqlx_robustness_unit-t.cpp b/test/tap/tests/unit/mysqlx_robustness_unit-t.cpp index a0e6f39f0..694413c10 100644 --- a/test/tap/tests/unit/mysqlx_robustness_unit-t.cpp +++ b/test/tap/tests/unit/mysqlx_robustness_unit-t.cpp @@ -572,20 +572,37 @@ static void test_forward_empty_frame() { // test to exercise the invariant that an empty source table overwrites a // populated destination — the skip (if count==0) that used to guard this // was a correctness bug that left stale rows in main.* across restarts. +// +// The signature/semantics match the production replace_table_atomically(): +// every execute() return is checked, a failure at any step triggers ROLLBACK +// and the function returns false. This preserves the destination's +// pre-transaction state when the INSERT fails — the atomicity guarantee the +// transaction wrap is supposed to deliver. static bool replace_table_contents(SQLite3DB& db, const char* dest_table, const char* source_table) { - db.execute("BEGIN"); + if (!db.execute("BEGIN")) { + return false; + } std::string q = "DELETE FROM "; q += dest_table; - db.execute(q.c_str()); + if (!db.execute(q.c_str())) { + db.execute("ROLLBACK"); + return false; + } q = "INSERT INTO "; q += dest_table; q += " SELECT * FROM "; q += source_table; - db.execute(q.c_str()); - db.execute("COMMIT"); + if (!db.execute(q.c_str())) { + db.execute("ROLLBACK"); + return false; + } + if (!db.execute("COMMIT")) { + db.execute("ROLLBACK"); + return false; + } return true; } @@ -616,8 +633,46 @@ static void test_empty_source_clears_stale_dest() { "empty source overwrites stale dest (dest is empty after replace)"); } +// Verifies that when INSERT fails mid-transaction, the DELETE is rolled back +// and the destination retains its pre-transaction contents. Without the +// execute()-return checks added by this follow-up, the unconditional COMMIT +// would persist the DELETE and silently wipe the destination. +static void test_insert_failure_rolls_back() { + SQLite3DB db; + db.open(const_cast(":memory:"), + SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE); + + // src has odd ids; dst has a CHECK that requires even ids. The INSERT + // will fail at the first odd value, leaving the transaction needing + // ROLLBACK to preserve the stale rows we seeded into dst. + db.execute("CREATE TABLE src (id INT PRIMARY KEY)"); + db.execute("CREATE TABLE dst (id INT PRIMARY KEY CHECK (id % 2 = 0))"); + + db.execute("INSERT INTO src (id) VALUES (1)"); + db.execute("INSERT INTO src (id) VALUES (3)"); + + db.execute("INSERT INTO dst (id) VALUES (10)"); + db.execute("INSERT INTO dst (id) VALUES (20)"); + + int dst_cnt_before = db.return_one_int("SELECT COUNT(*) FROM dst"); + ok(dst_cnt_before == 2, + "precondition: dst has 2 pre-existing rows"); + + bool ok_return = replace_table_contents(db, "dst", "src"); + ok(ok_return == false, + "replace_table_contents returns false when INSERT violates CHECK"); + + int dst_cnt_after = db.return_one_int("SELECT COUNT(*) FROM dst"); + ok(dst_cnt_after == 2, + "dst still has 2 rows after failed INSERT (DELETE was rolled back)"); + + int dst_sum = db.return_one_int("SELECT COALESCE(SUM(id), 0) FROM dst"); + ok(dst_sum == 30, + "dst retains original rows (sum=30), not the invalid odd ids from src"); +} + int main() { - plan(35); + plan(39); test_server_response_terminal_frame(); test_server_response_non_terminal_keeps_waiting(); @@ -633,6 +688,7 @@ int main() { test_client_disconnect_detected(); test_forward_empty_frame(); test_empty_source_clears_stale_dest(); + test_insert_failure_rolls_back(); return exit_status(); }