fix(mysqlx): make sync transactions atomic on execute() failure

sync_disk_to_memory() and copy_to_runtime() in mysqlx_plugin.cpp now check
every admindb.execute() return value. On BEGIN / DELETE / INSERT / COMMIT
failure the code issues a best-effort ROLLBACK, logs via
services->log_message, and skips the current table. Both functions return
false if any table's transaction failed, true if every replace succeeded.

coderabbitai flagged this on #5642, #5643, and #5644 (outside-diff because
the file appears in each PR's surrounding context). Without the return
checks, a failed INSERT landing between a successful DELETE and the
unconditional COMMIT silently wiped routes / users / endpoints /
backend_endpoints — the transaction wrap advertised atomicity it did not
deliver. The fix restores that guarantee.

The shared BEGIN/DELETE/INSERT/COMMIT body is now a local helper
replace_table_atomically() that takes a ProxySQL_PluginServices* so it
can log via the existing plugin log_message callback; the two callers
reduce to four-line loops over their table lists. mysqlx_start passes
ctx.services through; its return values remain advisory (log-and-continue
on a false return), intentionally — plugins shouldn't block proxysql
startup on one table's sync failure. Upgrading to fail-fast is a future
decision.

Testing: added test_insert_failure_rolls_back in
test/tap/tests/unit/mysqlx_robustness_unit-t.cpp. It seeds a dst table
with a CHECK (id %% 2 = 0) constraint plus pre-existing rows, points src
at odd ids, runs the atomic replace, and asserts (a) the function
returns false, (b) dst still holds its original rows, (c) the row sum
matches the pre-transaction state. plan(35) -> plan(39). All 39
assertions pass.

Out of scope: the Makefile link-line fix from 82fe27f4b still applies;
no further ripple.
fix/mysqlx-stale-row-sync
Rene Cannao 1 month ago
parent 82fe27f4b0
commit 7e2f7828ae

@ -48,50 +48,100 @@ bool parse_bind_addr(const std::string& bind, std::string& host, int& port) {
return true;
}
bool sync_disk_to_memory(SQLite3DB& admindb) {
// Atomically replace `dest` with the rows from `source`. Every admindb.execute()
// return value is checked: on any failure we ROLLBACK (defensively, even after a
// failed BEGIN) and skip the table. Returns false if any step for this table
// failed. The transaction wrap is what makes this safe — without the return
// checks, a failed INSERT between a successful DELETE and an unconditional
// COMMIT would silently wipe the destination.
bool replace_table_atomically(SQLite3DB& admindb,
ProxySQL_PluginServices* services,
const char* dest,
const char* source) {
auto log_err = [services](const char* msg) {
if (services != nullptr && services->log_message != nullptr) {
services->log_message(3, msg);
}
};
if (!admindb.execute("BEGIN")) {
std::string m = "mysqlx sync: BEGIN failed for ";
m += dest;
log_err(m.c_str());
return false;
}
std::string q = "DELETE FROM ";
q += dest;
if (!admindb.execute(q.c_str())) {
std::string m = "mysqlx sync: DELETE failed for ";
m += dest;
log_err(m.c_str());
admindb.execute("ROLLBACK");
return false;
}
q = "INSERT INTO ";
q += dest;
q += " SELECT * FROM ";
q += source;
if (!admindb.execute(q.c_str())) {
std::string m = "mysqlx sync: INSERT failed for ";
m += dest;
log_err(m.c_str());
admindb.execute("ROLLBACK");
return false;
}
if (!admindb.execute("COMMIT")) {
std::string m = "mysqlx sync: COMMIT failed for ";
m += dest;
log_err(m.c_str());
// Defensive — COMMIT may have partially succeeded; best-effort ROLLBACK.
admindb.execute("ROLLBACK");
return false;
}
return true;
}
bool sync_disk_to_memory(SQLite3DB& admindb, ProxySQL_PluginServices* services) {
const char* tables[] = {
"mysqlx_users",
"mysqlx_routes",
"mysqlx_backend_endpoints",
"mysqlx_variables",
};
bool all_ok = true;
for (const char* tbl : tables) {
admindb.execute("BEGIN");
std::string q = "DELETE FROM main.";
q += tbl;
admindb.execute(q.c_str());
q = "INSERT INTO main.";
q += tbl;
q += " SELECT * FROM disk.";
q += tbl;
admindb.execute(q.c_str());
admindb.execute("COMMIT");
std::string dest = "main.";
dest += tbl;
std::string source = "disk.";
source += tbl;
if (!replace_table_atomically(admindb, services, dest.c_str(), source.c_str())) {
all_ok = false;
}
}
return true;
return all_ok;
}
bool copy_to_runtime(SQLite3DB& admindb) {
bool copy_to_runtime(SQLite3DB& admindb, ProxySQL_PluginServices* services) {
const char* pairs[][2] = {
{"mysqlx_users", "runtime_mysqlx_users"},
{"mysqlx_routes", "runtime_mysqlx_routes"},
{"mysqlx_backend_endpoints", "runtime_mysqlx_backend_endpoints"},
{"mysqlx_variables", "runtime_mysqlx_variables"},
};
bool all_ok = true;
for (const auto& p : pairs) {
admindb.execute("BEGIN");
std::string q = "DELETE FROM main.";
q += p[1];
admindb.execute(q.c_str());
q = "INSERT INTO main.";
q += p[1];
q += " SELECT * FROM main.";
q += p[0];
admindb.execute(q.c_str());
admindb.execute("COMMIT");
std::string dest = "main.";
dest += p[1];
std::string source = "main.";
source += p[0];
if (!replace_table_atomically(admindb, services, dest.c_str(), source.c_str())) {
all_ok = false;
}
}
return true;
return all_ok;
}
bool mysqlx_start() {
@ -100,8 +150,8 @@ bool mysqlx_start() {
if (ctx.services != nullptr && ctx.services->get_admindb != nullptr) {
SQLite3DB* admindb = ctx.services->get_admindb();
if (admindb != nullptr) {
sync_disk_to_memory(*admindb);
copy_to_runtime(*admindb);
sync_disk_to_memory(*admindb, ctx.services);
copy_to_runtime(*admindb, ctx.services);
std::string err;
if (!ctx.config_store->load_from_runtime(*admindb, err)) {

@ -572,20 +572,37 @@ static void test_forward_empty_frame() {
// test to exercise the invariant that an empty source table overwrites a
// populated destination — the skip (if count==0) that used to guard this
// was a correctness bug that left stale rows in main.* across restarts.
//
// The signature/semantics match the production replace_table_atomically():
// every execute() return is checked, a failure at any step triggers ROLLBACK
// and the function returns false. This preserves the destination's
// pre-transaction state when the INSERT fails — the atomicity guarantee the
// transaction wrap is supposed to deliver.
static bool replace_table_contents(SQLite3DB& db,
const char* dest_table,
const char* source_table) {
db.execute("BEGIN");
if (!db.execute("BEGIN")) {
return false;
}
std::string q = "DELETE FROM ";
q += dest_table;
db.execute(q.c_str());
if (!db.execute(q.c_str())) {
db.execute("ROLLBACK");
return false;
}
q = "INSERT INTO ";
q += dest_table;
q += " SELECT * FROM ";
q += source_table;
db.execute(q.c_str());
db.execute("COMMIT");
if (!db.execute(q.c_str())) {
db.execute("ROLLBACK");
return false;
}
if (!db.execute("COMMIT")) {
db.execute("ROLLBACK");
return false;
}
return true;
}
@ -616,8 +633,46 @@ static void test_empty_source_clears_stale_dest() {
"empty source overwrites stale dest (dest is empty after replace)");
}
// Verifies that when INSERT fails mid-transaction, the DELETE is rolled back
// and the destination retains its pre-transaction contents. Without the
// execute()-return checks added by this follow-up, the unconditional COMMIT
// would persist the DELETE and silently wipe the destination.
static void test_insert_failure_rolls_back() {
SQLite3DB db;
db.open(const_cast<char*>(":memory:"),
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE);
// src has odd ids; dst has a CHECK that requires even ids. The INSERT
// will fail at the first odd value, leaving the transaction needing
// ROLLBACK to preserve the stale rows we seeded into dst.
db.execute("CREATE TABLE src (id INT PRIMARY KEY)");
db.execute("CREATE TABLE dst (id INT PRIMARY KEY CHECK (id % 2 = 0))");
db.execute("INSERT INTO src (id) VALUES (1)");
db.execute("INSERT INTO src (id) VALUES (3)");
db.execute("INSERT INTO dst (id) VALUES (10)");
db.execute("INSERT INTO dst (id) VALUES (20)");
int dst_cnt_before = db.return_one_int("SELECT COUNT(*) FROM dst");
ok(dst_cnt_before == 2,
"precondition: dst has 2 pre-existing rows");
bool ok_return = replace_table_contents(db, "dst", "src");
ok(ok_return == false,
"replace_table_contents returns false when INSERT violates CHECK");
int dst_cnt_after = db.return_one_int("SELECT COUNT(*) FROM dst");
ok(dst_cnt_after == 2,
"dst still has 2 rows after failed INSERT (DELETE was rolled back)");
int dst_sum = db.return_one_int("SELECT COALESCE(SUM(id), 0) FROM dst");
ok(dst_sum == 30,
"dst retains original rows (sum=30), not the invalid odd ids from src");
}
int main() {
plan(35);
plan(39);
test_server_response_terminal_frame();
test_server_response_non_terminal_keeps_waiting();
@ -633,6 +688,7 @@ int main() {
test_client_disconnect_detected();
test_forward_empty_frame();
test_empty_source_clears_stale_dest();
test_insert_failure_rolls_back();
return exit_status();
}

Loading…
Cancel
Save