diff --git a/plugins/mysqlx/src/mysqlx_plugin.cpp b/plugins/mysqlx/src/mysqlx_plugin.cpp index 24463b6b1..9b89217a1 100644 --- a/plugins/mysqlx/src/mysqlx_plugin.cpp +++ b/plugins/mysqlx/src/mysqlx_plugin.cpp @@ -48,50 +48,100 @@ bool parse_bind_addr(const std::string& bind, std::string& host, int& port) { return true; } -bool sync_disk_to_memory(SQLite3DB& admindb) { +// Atomically replace `dest` with the rows from `source`. Every admindb.execute() +// return value is checked: on any failure we ROLLBACK (defensively, even after a +// failed BEGIN) and skip the table. Returns false if any step for this table +// failed. The transaction wrap is what makes this safe — without the return +// checks, a failed INSERT between a successful DELETE and an unconditional +// COMMIT would silently wipe the destination. +bool replace_table_atomically(SQLite3DB& admindb, + ProxySQL_PluginServices* services, + const char* dest, + const char* source) { + auto log_err = [services](const char* msg) { + if (services != nullptr && services->log_message != nullptr) { + services->log_message(3, msg); + } + }; + + if (!admindb.execute("BEGIN")) { + std::string m = "mysqlx sync: BEGIN failed for "; + m += dest; + log_err(m.c_str()); + return false; + } + + std::string q = "DELETE FROM "; + q += dest; + if (!admindb.execute(q.c_str())) { + std::string m = "mysqlx sync: DELETE failed for "; + m += dest; + log_err(m.c_str()); + admindb.execute("ROLLBACK"); + return false; + } + + q = "INSERT INTO "; + q += dest; + q += " SELECT * FROM "; + q += source; + if (!admindb.execute(q.c_str())) { + std::string m = "mysqlx sync: INSERT failed for "; + m += dest; + log_err(m.c_str()); + admindb.execute("ROLLBACK"); + return false; + } + + if (!admindb.execute("COMMIT")) { + std::string m = "mysqlx sync: COMMIT failed for "; + m += dest; + log_err(m.c_str()); + // Defensive — COMMIT may have partially succeeded; best-effort ROLLBACK. + admindb.execute("ROLLBACK"); + return false; + } + return true; +} + +bool sync_disk_to_memory(SQLite3DB& admindb, ProxySQL_PluginServices* services) { const char* tables[] = { "mysqlx_users", "mysqlx_routes", "mysqlx_backend_endpoints", "mysqlx_variables", }; + bool all_ok = true; for (const char* tbl : tables) { - admindb.execute("BEGIN"); - std::string q = "DELETE FROM main."; - q += tbl; - admindb.execute(q.c_str()); - - q = "INSERT INTO main."; - q += tbl; - q += " SELECT * FROM disk."; - q += tbl; - admindb.execute(q.c_str()); - admindb.execute("COMMIT"); + std::string dest = "main."; + dest += tbl; + std::string source = "disk."; + source += tbl; + if (!replace_table_atomically(admindb, services, dest.c_str(), source.c_str())) { + all_ok = false; + } } - return true; + return all_ok; } -bool copy_to_runtime(SQLite3DB& admindb) { +bool copy_to_runtime(SQLite3DB& admindb, ProxySQL_PluginServices* services) { const char* pairs[][2] = { {"mysqlx_users", "runtime_mysqlx_users"}, {"mysqlx_routes", "runtime_mysqlx_routes"}, {"mysqlx_backend_endpoints", "runtime_mysqlx_backend_endpoints"}, {"mysqlx_variables", "runtime_mysqlx_variables"}, }; + bool all_ok = true; for (const auto& p : pairs) { - admindb.execute("BEGIN"); - std::string q = "DELETE FROM main."; - q += p[1]; - admindb.execute(q.c_str()); - - q = "INSERT INTO main."; - q += p[1]; - q += " SELECT * FROM main."; - q += p[0]; - admindb.execute(q.c_str()); - admindb.execute("COMMIT"); + std::string dest = "main."; + dest += p[1]; + std::string source = "main."; + source += p[0]; + if (!replace_table_atomically(admindb, services, dest.c_str(), source.c_str())) { + all_ok = false; + } } - return true; + return all_ok; } bool mysqlx_start() { @@ -100,8 +150,8 @@ bool mysqlx_start() { if (ctx.services != nullptr && ctx.services->get_admindb != nullptr) { SQLite3DB* admindb = ctx.services->get_admindb(); if (admindb != nullptr) { - sync_disk_to_memory(*admindb); - copy_to_runtime(*admindb); + sync_disk_to_memory(*admindb, ctx.services); + copy_to_runtime(*admindb, ctx.services); std::string err; if (!ctx.config_store->load_from_runtime(*admindb, err)) { diff --git a/test/tap/tests/unit/mysqlx_robustness_unit-t.cpp b/test/tap/tests/unit/mysqlx_robustness_unit-t.cpp index a0e6f39f0..694413c10 100644 --- a/test/tap/tests/unit/mysqlx_robustness_unit-t.cpp +++ b/test/tap/tests/unit/mysqlx_robustness_unit-t.cpp @@ -572,20 +572,37 @@ static void test_forward_empty_frame() { // test to exercise the invariant that an empty source table overwrites a // populated destination — the skip (if count==0) that used to guard this // was a correctness bug that left stale rows in main.* across restarts. +// +// The signature/semantics match the production replace_table_atomically(): +// every execute() return is checked, a failure at any step triggers ROLLBACK +// and the function returns false. This preserves the destination's +// pre-transaction state when the INSERT fails — the atomicity guarantee the +// transaction wrap is supposed to deliver. static bool replace_table_contents(SQLite3DB& db, const char* dest_table, const char* source_table) { - db.execute("BEGIN"); + if (!db.execute("BEGIN")) { + return false; + } std::string q = "DELETE FROM "; q += dest_table; - db.execute(q.c_str()); + if (!db.execute(q.c_str())) { + db.execute("ROLLBACK"); + return false; + } q = "INSERT INTO "; q += dest_table; q += " SELECT * FROM "; q += source_table; - db.execute(q.c_str()); - db.execute("COMMIT"); + if (!db.execute(q.c_str())) { + db.execute("ROLLBACK"); + return false; + } + if (!db.execute("COMMIT")) { + db.execute("ROLLBACK"); + return false; + } return true; } @@ -616,8 +633,46 @@ static void test_empty_source_clears_stale_dest() { "empty source overwrites stale dest (dest is empty after replace)"); } +// Verifies that when INSERT fails mid-transaction, the DELETE is rolled back +// and the destination retains its pre-transaction contents. Without the +// execute()-return checks added by this follow-up, the unconditional COMMIT +// would persist the DELETE and silently wipe the destination. +static void test_insert_failure_rolls_back() { + SQLite3DB db; + db.open(const_cast(":memory:"), + SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE); + + // src has odd ids; dst has a CHECK that requires even ids. The INSERT + // will fail at the first odd value, leaving the transaction needing + // ROLLBACK to preserve the stale rows we seeded into dst. + db.execute("CREATE TABLE src (id INT PRIMARY KEY)"); + db.execute("CREATE TABLE dst (id INT PRIMARY KEY CHECK (id % 2 = 0))"); + + db.execute("INSERT INTO src (id) VALUES (1)"); + db.execute("INSERT INTO src (id) VALUES (3)"); + + db.execute("INSERT INTO dst (id) VALUES (10)"); + db.execute("INSERT INTO dst (id) VALUES (20)"); + + int dst_cnt_before = db.return_one_int("SELECT COUNT(*) FROM dst"); + ok(dst_cnt_before == 2, + "precondition: dst has 2 pre-existing rows"); + + bool ok_return = replace_table_contents(db, "dst", "src"); + ok(ok_return == false, + "replace_table_contents returns false when INSERT violates CHECK"); + + int dst_cnt_after = db.return_one_int("SELECT COUNT(*) FROM dst"); + ok(dst_cnt_after == 2, + "dst still has 2 rows after failed INSERT (DELETE was rolled back)"); + + int dst_sum = db.return_one_int("SELECT COALESCE(SUM(id), 0) FROM dst"); + ok(dst_sum == 30, + "dst retains original rows (sum=30), not the invalid odd ids from src"); +} + int main() { - plan(35); + plan(39); test_server_response_terminal_frame(); test_server_response_non_terminal_keeps_waiting(); @@ -633,6 +688,7 @@ int main() { test_client_disconnect_detected(); test_forward_empty_frame(); test_empty_source_clears_stale_dest(); + test_insert_failure_rolls_back(); return exit_status(); }