fix(mysqlx): sync empty source tables to overwrite stale rows

What: removed `if (cnt == 0) continue;` in `sync_disk_to_memory` and
`copy_to_runtime` in `plugins/mysqlx/src/mysqlx_plugin.cpp`. Also removed
the now-dead `SELECT COUNT(*)` + result unpacking that those skip
conditions depended on.

Why: the original skip was meant as an optimization for "nothing to
copy" but it was a correctness bug. If a user emptied a mysqlx table in
`main` and saved to disk, on next restart `sync_disk_to_memory` would
see disk count == 0, skip the replace, and leave stale rows in `main.*`.
Same gap for main -> runtime. The BEGIN/DELETE/INSERT/COMMIT sequence is
already atomic and correctly no-ops when the source is empty (DELETE
dest, INSERT zero rows).

Testing: added a unit test at
`test/tap/tests/unit/mysqlx_robustness_unit-t.cpp` exercising the
"empty source overwrites stale dest" invariant. Also added
`mysqlx_config_store.cpp` to the `mysqlx_robustness_unit-t` link line so
the test binary resolves `MysqlxConfigStore::resolve_identity` (a
latent build gap uncovered while validating the fix). Assertion count
33 -> 35.

Out of scope: no refactor of the inline atomic-replace sequence (its
atomicity is pre-existing). No changes to the admin-schema `copy_table`
path, which was already unconditional.
fix/mysqlx-stale-row-sync
Rene Cannao 1 month ago
parent 923cbfeadc
commit 82fe27f4b0

@ -56,22 +56,8 @@ bool sync_disk_to_memory(SQLite3DB& admindb) {
"mysqlx_variables",
};
for (const char* tbl : tables) {
char* err = nullptr;
std::string q = "SELECT COUNT(*) FROM disk.";
q += tbl;
std::unique_ptr<SQLite3_result> disk_res(admindb.execute_statement(q.c_str(), &err));
std::unique_ptr<char, void(*)(void*)> err_guard(err, &free);
if (err) continue;
if (!disk_res || disk_res->rows.empty() || !disk_res->rows[0] || !disk_res->rows[0]->fields[0]) {
continue;
}
int disk_cnt = atoi(disk_res->rows[0]->fields[0]);
disk_res.reset();
err_guard.reset();
if (disk_cnt == 0) continue;
admindb.execute("BEGIN");
q = "DELETE FROM main.";
std::string q = "DELETE FROM main.";
q += tbl;
admindb.execute(q.c_str());
@ -93,22 +79,8 @@ bool copy_to_runtime(SQLite3DB& admindb) {
{"mysqlx_variables", "runtime_mysqlx_variables"},
};
for (const auto& p : pairs) {
char* err = nullptr;
std::string q = "SELECT COUNT(*) FROM main.";
q += p[0];
std::unique_ptr<SQLite3_result> res(admindb.execute_statement(q.c_str(), &err));
std::unique_ptr<char, void(*)(void*)> err_guard(err, &free);
if (err) continue;
if (!res || res->rows.empty() || !res->rows[0] || !res->rows[0]->fields[0]) {
continue;
}
int cnt = atoi(res->rows[0]->fields[0]);
res.reset();
err_guard.reset();
if (cnt == 0) continue;
admindb.execute("BEGIN");
q = "DELETE FROM main.";
std::string q = "DELETE FROM main.";
q += p[1];
admindb.execute(q.c_str());

@ -524,8 +524,8 @@ mysqlx_credential_verify_unit-t: mysqlx_credential_verify_unit-t.cpp $(PROXYSQL_
$(IDIRS) $(LDIRS) $(OPT) $(LIBPROXYSQLAR_FULL) $(STATIC_LIBS) \
$(MYLIBS) -lprotobuf -lssl -lcrypto $(ALLOW_MULTI_DEF) -o $@
mysqlx_robustness_unit-t: mysqlx_robustness_unit-t.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_session.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_data_stream.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_connection.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_thread.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_protocol.cpp $(MYSQLX_PROTO_OBJS) $(TEST_HELPERS_OBJ) $(LIBPROXYSQLAR)
$(CXX) $< $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_session.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_data_stream.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_connection.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_thread.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_protocol.cpp $(MYSQLX_PROTO_OBJS) $(TEST_HELPERS_OBJ) \
mysqlx_robustness_unit-t: mysqlx_robustness_unit-t.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_session.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_data_stream.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_connection.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_thread.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_protocol.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_config_store.cpp $(MYSQLX_PROTO_OBJS) $(TEST_HELPERS_OBJ) $(LIBPROXYSQLAR)
$(CXX) $< $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_session.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_data_stream.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_connection.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_thread.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_protocol.cpp $(PROXYSQL_PATH)/plugins/mysqlx/src/mysqlx_config_store.cpp $(MYSQLX_PROTO_OBJS) $(TEST_HELPERS_OBJ) \
-I$(PROXYSQL_PATH)/plugins/mysqlx/include -I$(MYSQLX_PROTO_DIR) \
$(IDIRS) $(LDIRS) $(OPT) $(LIBPROXYSQLAR_FULL) $(STATIC_LIBS) \
$(MYLIBS) -lprotobuf -lssl -lcrypto $(ALLOW_MULTI_DEF) -lpthread -o $@

@ -1,6 +1,7 @@
#include "mysqlx_session.h"
#include "mysqlx_protocol.h"
#include "mysqlx_thread.h"
#include "sqlite3db.h"
#include "tap.h"
#include "test_globals.h"
#include "test_init.h"
@ -566,8 +567,57 @@ static void test_forward_empty_frame() {
close(backend_fds[0]); close(backend_fds[1]);
}
// Mirrors the atomic replace sequence used by sync_disk_to_memory() /
// copy_to_runtime() in plugins/mysqlx/src/mysqlx_plugin.cpp. Kept in the
// test to exercise the invariant that an empty source table overwrites a
// populated destination — the skip (if count==0) that used to guard this
// was a correctness bug that left stale rows in main.* across restarts.
static bool replace_table_contents(SQLite3DB& db,
const char* dest_table,
const char* source_table) {
db.execute("BEGIN");
std::string q = "DELETE FROM ";
q += dest_table;
db.execute(q.c_str());
q = "INSERT INTO ";
q += dest_table;
q += " SELECT * FROM ";
q += source_table;
db.execute(q.c_str());
db.execute("COMMIT");
return true;
}
static void test_empty_source_clears_stale_dest() {
SQLite3DB db;
db.open(const_cast<char*>(":memory:"),
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE);
db.execute("CREATE TABLE src (id INT PRIMARY KEY, name VARCHAR)");
db.execute("CREATE TABLE dst (id INT PRIMARY KEY, name VARCHAR)");
// Dest starts with stale rows; source is empty. This is exactly the
// scenario the old `if (count == 0) continue;` mishandled — after
// restart, sync_disk_to_memory would see disk count==0, skip, and
// leave the stale rows in main.*.
db.execute("INSERT INTO dst (id, name) VALUES (1, 'stale_a')");
db.execute("INSERT INTO dst (id, name) VALUES (2, 'stale_b')");
int src_cnt = db.return_one_int("SELECT COUNT(*) FROM src");
int dst_cnt_before = db.return_one_int("SELECT COUNT(*) FROM dst");
ok(src_cnt == 0 && dst_cnt_before == 2,
"precondition: empty source, 2 stale rows in dest");
replace_table_contents(db, "dst", "src");
int dst_cnt_after = db.return_one_int("SELECT COUNT(*) FROM dst");
ok(dst_cnt_after == 0,
"empty source overwrites stale dest (dest is empty after replace)");
}
int main() {
plan(33);
plan(35);
test_server_response_terminal_frame();
test_server_response_non_terminal_keeps_waiting();
@ -582,6 +632,7 @@ int main() {
test_connection_limit_config();
test_client_disconnect_detected();
test_forward_empty_frame();
test_empty_source_clears_stale_dest();
return exit_status();
}

Loading…
Cancel
Save