fix(mysqlx): Admin/module separation for runtime config tables

Previously, plugin-chassis stored authoritative mysqlx runtime state
inside admin-db tables (runtime_mysqlx_users / _routes / _backend_
endpoints / _variables). LOAD/SAVE commands shuffled rows between
mysqlx_<X> and runtime_mysqlx_<X> via plain INSERT ... SELECT, and
MysqlxConfigStore::load_from_runtime read the runtime_<X> tables back
out into its in-memory map. The data lived in three places (editable
mysqlx_<X>, persistent runtime_mysqlx_<X>, in-memory store) with no
detection of skew between them. Issue #5687.

The canonical pattern (mysql_users / GloMyAuth / runtime_mysql_users
in lib/ProxySQL_Admin.cpp::__refresh_users / save_mysql_users_runtime
_to_database) keeps Admin and the module strictly separated:

  Admin            owns the editable configuration table and
                   provides a runtime_<X> view of module state
  Module           owns runtime state in its own data structures
  runtime_<X>      is rebuilt on demand from module state, not
                   persistent storage

This commit restructures the mysqlx plugin to match.

# MysqlxConfigStore: per-entity install / save / project triplets

Replaces the monolithic load_from_runtime() with three independent
operations per entity (users / routes / endpoints / variables):

  install_<X>_from_admin(db, err)
      LOAD <X> TO RUNTIME path. SELECT the editable mysqlx_<X>
      table (and the cross-module runtime_mysql_users /
      runtime_mysql_servers projections where applicable), build a
      new local representation, atomically swap into the in-memory
      store under the store's mutex.

  save_<X>_to_admin_table(db)
      SAVE <X> [FROM RUNTIME] TO MEMORY path. Mirror save_mysql_
      users_runtime_to_database(false): mark all rows in mysqlx_<X>
      inactive, then upsert the live store contents with active=1.
      Inactive rows the operator deactivated but didn't delete are
      preserved.

  project_<X>_to_runtime_view(db)
      Runtime-view refresh path invoked by the chassis before any
      admin SELECT touches runtime_mysqlx_<X>. Mirror save_mysql_
      users_runtime_to_database(true): DELETE the projected table,
      then INSERT live store contents.

install_all_from_admin() is a convenience wrapper that runs all four
in sequence; production code calls the per-entity methods so each
LOAD command only touches its own slice of state, and unit tests
have a single entry point that exercises the whole pipeline.

# MysqlxConfigStore data-model additions

  - MysqlxResolvedIdentity gains `comment` (preserved through round-
    trip; the canonical mysql_users path also preserves comments).
  - MysqlxRoute gains `comment` for the same reason.
  - New public MysqlxBackendEndpointOverride struct (replaces the
    file-local MysqlxEndpointOverride that used to be in the .cpp).
  - New endpoint_overrides_ map: per-(hostname,mysql_port) overrides
    preserved verbatim across LOAD calls so SAVE can round-trip and
    so the runtime-view projection can faithfully reflect what was
    loaded. Previously these overrides were dropped after being
    folded into hostgroup_endpoints_.

# Plugin: register_runtime_view + rewritten LOAD/SAVE callbacks

In mysqlx_admin_schema.cpp:
  - Removes copy_table() and reload_config_store(); they were the
    INSERT...SELECT shovel between editable and runtime tables that
    encoded the architectural mistake.
  - Each load_<X>_to_runtime callback now calls install_<X>_from_
    admin(*ctx.admindb, err) and never touches runtime_mysqlx_<X>.
  - Each save_<X>_from_runtime callback now calls save_<X>_to_admin_
    table(*ctx.admindb) and never reads runtime_mysqlx_<X>.
  - rows_affected on LOAD now reports the active row count in the
    editable table (the source); on SAVE it reports the row count
    in the editable table after the dump (the destination).
  - Adds four refresh_<X>_runtime_view free functions and registers
    them via services.register_runtime_view() during schema
    registration. The chassis invokes these before any admin SELECT
    against the projected table.

In mysqlx_plugin.cpp::mysqlx_start():
  - Drops copy_to_runtime() entirely (the old "copy editable
    mysqlx_<X> to runtime_mysqlx_<X> at startup" step that no longer
    has a purpose).
  - Replaces the single load_from_runtime call with the four
    install_<X>_from_admin calls so a failure in one entity is
    surfaced individually in the log.
  - Keeps sync_disk_to_memory() unchanged. Disk-tier persistence is
    legitimate admin behaviour; only the runtime-tier copy was wrong.

# Net effect at the SQL level

  - INSERT INTO runtime_mysqlx_<X> ... no longer happens on any LOAD
    or SAVE. The only writes to runtime_mysqlx_<X> are the on-demand
    projections, triggered by the chassis pre-SELECT refresh path.
  - SELECT FROM runtime_mysqlx_<X> always reflects current
    MysqlxConfigStore state, even if the operator never ran LOAD
    since the last edit to mysqlx_<X>.
  - mysqlx_<X> remains the editable table the operator writes to.

The bug-stay-out-of-runtime contract documented in
include/ProxySQL_Plugin.h (the rewritten doc block) now matches
what this plugin actually does.
fix/mysqlx-runtime-views-separation
Rene Cannao 4 weeks ago
parent f42c3ee1ab
commit 9da7300afe

@ -34,6 +34,16 @@ struct MysqlxResolvedIdentity {
std::string backend_username {};
std::string backend_password {};
std::string attributes {};
std::string comment {};
};
struct MysqlxBackendEndpointOverride {
std::string hostname {};
int mysql_port { 0 };
int mysqlx_port { 33060 };
bool use_ssl { false };
std::string attributes {};
std::string comment {};
};
struct MysqlxRoute {
@ -44,6 +54,7 @@ struct MysqlxRoute {
std::string strategy { "first_available" };
bool active { true };
std::string attributes {};
std::string comment {};
};
struct MysqlxBackendEndpoint {
@ -61,14 +72,53 @@ public:
MysqlxConfigStore& operator=(const MysqlxConfigStore&) = delete;
~MysqlxConfigStore() = default;
bool load_from_runtime(SQLite3DB& db, std::string& err);
// Per-entity install: read the editable admin table(s), build a new
// local representation, atomically swap into the in-memory store
// under the store's own mutex. Each install is independent — LOAD
// MYSQLX USERS does not touch routes/endpoints/variables. Callers
// pass `db` (admin db) and receive a populated `err` on failure.
bool install_users_from_admin(SQLite3DB& db, std::string& err);
bool install_routes_from_admin(SQLite3DB& db, std::string& err);
bool install_endpoints_from_admin(SQLite3DB& db, std::string& err);
bool install_variables_from_admin(SQLite3DB& db, std::string& err);
// Convenience: invoke all four install_*_from_admin in sequence
// against the same db. Stops on the first failure (subsequent
// entities are NOT installed). Used by unit tests that exercise
// the full LOAD pipeline against a single in-memory SQLite fixture
// containing both the editable mysqlx_* tables and the cross-module
// runtime_mysql_users / runtime_mysql_servers. Production code
// calls the per-entity methods directly so each LOAD command only
// reloads its own slice of state.
bool install_all_from_admin(SQLite3DB& db, std::string& err);
// Per-entity SAVE: dump current in-memory state into the editable
// admin table (mysqlx_users / mysqlx_routes / etc.). Mirrors the
// canonical save_*_runtime_to_database(false) pattern: existing
// rows are marked inactive, then live rows from the store are
// upserted with active=1. Returns false on a fatal sqlite error.
bool save_users_to_admin_table(SQLite3DB& db) const;
bool save_routes_to_admin_table(SQLite3DB& db) const;
bool save_endpoints_to_admin_table(SQLite3DB& db) const;
bool save_variables_to_admin_table(SQLite3DB& db) const;
// Per-entity runtime-view projection: refill the runtime_mysqlx_*
// table from current in-memory state. Called by the chassis
// register_runtime_view() refresh callbacks before any admin SELECT
// against the projected table. Always wipes the destination first
// to ensure deletions in the store propagate to the view.
void project_users_to_runtime_view(SQLite3DB& db) const;
void project_routes_to_runtime_view(SQLite3DB& db) const;
void project_endpoints_to_runtime_view(SQLite3DB& db) const;
void project_variables_to_runtime_view(SQLite3DB& db) const;
std::optional<MysqlxResolvedIdentity> resolve_identity(const std::string& username) const;
MysqlxBackendEndpoint pick_endpoint(const std::string& route_name) const;
int route_hostgroup(const std::string& route_name) const;
bool route_exists(const std::string& route_name) const;
// Test-only: inject routes + hostgroup endpoints directly, bypassing
// the SQLite3DB-based load_from_runtime path. Not called by production code.
// the SQLite3DB-based install path. Not called by production code.
void install_for_test(
std::unordered_map<std::string, MysqlxRoute> routes,
std::unordered_map<int, std::vector<MysqlxBackendEndpoint>> endpoints);
@ -83,10 +133,16 @@ public:
private:
MysqlxBackendEndpoint pick_from_hostgroup(int hostgroup_id, const std::string& strategy) const;
void rebuild_hostgroup_endpoints_locked();
mutable std::shared_mutex mutex_ {};
std::unordered_map<std::string, MysqlxResolvedIdentity> identities_ {};
std::unordered_map<std::string, MysqlxRoute> routes_ {};
// Per-(hostname,mysql_port) overrides preserved verbatim from
// mysqlx_backend_endpoints. Survives across LOAD calls so SAVE can
// round-trip and so the runtime-view projection can faithfully
// reflect what was loaded. Indexed by "hostname:mysql_port".
std::unordered_map<std::string, MysqlxBackendEndpointOverride> endpoint_overrides_ {};
std::unordered_map<int, std::vector<MysqlxBackendEndpoint>> hostgroup_endpoints_ {};
mutable std::mutex rr_mutex_ {};
mutable std::unordered_map<int, uint32_t> rr_counters_ {};

@ -111,71 +111,33 @@ ProxySQL_PluginCommandResult command_failure(const char* message) {
return {1, 0, message != nullptr ? message : "mysqlx admin command failed"};
}
bool copy_table(SQLite3DB& db, const char* source_table, const char* runtime_table) {
if (!db.execute("BEGIN")) {
return false;
}
std::string delete_sql = "DELETE FROM ";
delete_sql += runtime_table;
if (!db.execute(delete_sql.c_str())) {
db.execute("ROLLBACK");
return false;
}
std::string insert_sql = "INSERT INTO ";
insert_sql += runtime_table;
insert_sql += " SELECT * FROM ";
insert_sql += source_table;
if (!db.execute(insert_sql.c_str())) {
db.execute("ROLLBACK");
return false;
}
if (!db.execute("COMMIT")) {
db.execute("ROLLBACK");
return false;
}
return true;
}
static void reload_config_store(SQLite3DB& admindb) {
std::string err;
if (!mysqlx_context().config_store->load_from_runtime(admindb, err)) {
if (mysqlx_context().services && mysqlx_context().services->log_message) {
mysqlx_context().services->log_message(3, err.c_str());
}
}
}
// LOAD <X> TO RUNTIME callbacks: read the editable mysqlx_<X> table
// directly into MysqlxConfigStore via the typed install API. Never
// touch runtime_mysqlx_<X> on this path -- that table is an admin-side
// view of module state, projected on demand by the registered
// runtime-view refresh callbacks below.
ProxySQL_PluginCommandResult load_users_to_runtime(const ProxySQL_PluginCommandContext& ctx, const char*) {
if (ctx.admindb == nullptr) {
return command_failure("mysqlx users load requires admin db");
}
if (!copy_table(*ctx.admindb, kMysqlxUsersTable, kRuntimeMysqlxUsersTable)) {
return command_failure("failed to copy mysqlx users to runtime");
std::string err;
if (!mysqlx_context().config_store->install_users_from_admin(*ctx.admindb, err)) {
return command_failure(err.empty() ? "install_users_from_admin failed" : err.c_str());
}
ProxySQL_PluginCommandResult result {0, 0, ""};
result.rows_affected = ctx.admindb->return_one_int("SELECT COUNT(*) FROM runtime_mysqlx_users");
result.message = "mysqlx users loaded to runtime";
reload_config_store(*ctx.admindb);
return result;
uint64_t row_count = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_users WHERE active=1");
return {0, row_count, "mysqlx users loaded to runtime"};
}
ProxySQL_PluginCommandResult load_routes_to_runtime(const ProxySQL_PluginCommandContext& ctx, const char*) {
if (ctx.admindb == nullptr) {
return command_failure("mysqlx routes load requires admin db");
}
if (!copy_table(*ctx.admindb, kMysqlxRoutesTable, kRuntimeMysqlxRoutesTable)) {
return command_failure("failed to copy mysqlx routes to runtime");
std::string err;
if (!mysqlx_context().config_store->install_routes_from_admin(*ctx.admindb, err)) {
return command_failure(err.empty() ? "install_routes_from_admin failed" : err.c_str());
}
ProxySQL_PluginCommandResult result {0, 0, ""};
result.rows_affected = ctx.admindb->return_one_int("SELECT COUNT(*) FROM runtime_mysqlx_routes");
result.message = "mysqlx routes loaded to runtime";
reload_config_store(*ctx.admindb);
uint64_t row_count = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_routes WHERE active=1");
// Propagate the new desired route set to the listener topology: bind new
// routes, close listeners for removed or deactivated routes. The symbol
// is weak so unit tests that don't link plugin.cpp can resolve cleanly;
@ -183,93 +145,105 @@ ProxySQL_PluginCommandResult load_routes_to_runtime(const ProxySQL_PluginCommand
if (mysqlx_reconcile_listeners) {
mysqlx_reconcile_listeners(*ctx.admindb);
}
return result;
return {0, row_count, "mysqlx routes loaded to runtime"};
}
ProxySQL_PluginCommandResult load_backend_endpoints_to_runtime(const ProxySQL_PluginCommandContext& ctx, const char*) {
if (ctx.admindb == nullptr) {
return command_failure("mysqlx backend endpoints load requires admin db");
}
if (!copy_table(*ctx.admindb, kMysqlxBackendEndpointsTable, kRuntimeMysqlxBackendEndpointsTable)) {
return command_failure("failed to copy mysqlx backend endpoints to runtime");
std::string err;
if (!mysqlx_context().config_store->install_endpoints_from_admin(*ctx.admindb, err)) {
return command_failure(err.empty() ? "install_endpoints_from_admin failed" : err.c_str());
}
uint64_t row_count = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_backend_endpoints");
return {0, row_count, "mysqlx backend endpoints loaded to runtime"};
}
ProxySQL_PluginCommandResult result {0, 0, ""};
result.rows_affected = ctx.admindb->return_one_int("SELECT COUNT(*) FROM runtime_mysqlx_backend_endpoints");
result.message = "mysqlx backend endpoints loaded to runtime";
reload_config_store(*ctx.admindb);
return result;
ProxySQL_PluginCommandResult load_variables_to_runtime(const ProxySQL_PluginCommandContext& ctx, const char*) {
if (ctx.admindb == nullptr) {
return command_failure("mysqlx variables load requires admin db");
}
std::string err;
if (!mysqlx_context().config_store->install_variables_from_admin(*ctx.admindb, err)) {
return command_failure(err.empty() ? "install_variables_from_admin failed" : err.c_str());
}
uint64_t row_count = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_variables");
return {0, row_count, "mysqlx variables loaded to runtime"};
}
// SAVE <X> [FROM RUNTIME] TO MEMORY callbacks: dump MysqlxConfigStore
// directly into the editable mysqlx_<X> table. Never read
// runtime_mysqlx_<X> on this path -- the source of truth is the
// in-memory module state, not a SQL view.
ProxySQL_PluginCommandResult save_users_from_runtime(const ProxySQL_PluginCommandContext& ctx, const char*) {
if (ctx.admindb == nullptr) {
return command_failure("mysqlx users save requires admin db");
}
if (!copy_table(*ctx.admindb, kRuntimeMysqlxUsersTable, kMysqlxUsersTable)) {
return command_failure("failed to copy mysqlx users from runtime");
if (!mysqlx_context().config_store->save_users_to_admin_table(*ctx.admindb)) {
return command_failure("failed to save mysqlx users to memory");
}
ProxySQL_PluginCommandResult result {0, 0, ""};
result.rows_affected = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_users");
result.message = "mysqlx users saved from runtime";
return result;
uint64_t row_count = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_users WHERE active=1");
return {0, row_count, "mysqlx users saved from runtime"};
}
ProxySQL_PluginCommandResult save_routes_from_runtime(const ProxySQL_PluginCommandContext& ctx, const char*) {
if (ctx.admindb == nullptr) {
return command_failure("mysqlx routes save requires admin db");
}
if (!copy_table(*ctx.admindb, kRuntimeMysqlxRoutesTable, kMysqlxRoutesTable)) {
return command_failure("failed to copy mysqlx routes from runtime");
if (!mysqlx_context().config_store->save_routes_to_admin_table(*ctx.admindb)) {
return command_failure("failed to save mysqlx routes to memory");
}
ProxySQL_PluginCommandResult result {0, 0, ""};
result.rows_affected = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_routes");
result.message = "mysqlx routes saved from runtime";
return result;
uint64_t row_count = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_routes WHERE active=1");
return {0, row_count, "mysqlx routes saved from runtime"};
}
ProxySQL_PluginCommandResult save_backend_endpoints_from_runtime(const ProxySQL_PluginCommandContext& ctx, const char*) {
if (ctx.admindb == nullptr) {
return command_failure("mysqlx backend endpoints save requires admin db");
}
if (!copy_table(*ctx.admindb, kRuntimeMysqlxBackendEndpointsTable, kMysqlxBackendEndpointsTable)) {
return command_failure("failed to copy mysqlx backend endpoints from runtime");
}
ProxySQL_PluginCommandResult result {0, 0, ""};
result.rows_affected = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_backend_endpoints");
result.message = "mysqlx backend endpoints saved from runtime";
return result;
}
ProxySQL_PluginCommandResult load_variables_to_runtime(const ProxySQL_PluginCommandContext& ctx, const char*) {
if (ctx.admindb == nullptr) {
return command_failure("mysqlx variables load requires admin db");
}
if (!copy_table(*ctx.admindb, kMysqlxVariablesTable, kRuntimeMysqlxVariablesTable)) {
return command_failure("failed to copy mysqlx variables to runtime");
if (!mysqlx_context().config_store->save_endpoints_to_admin_table(*ctx.admindb)) {
return command_failure("failed to save mysqlx backend endpoints to memory");
}
ProxySQL_PluginCommandResult result {0, 0, ""};
result.rows_affected = ctx.admindb->return_one_int("SELECT COUNT(*) FROM runtime_mysqlx_variables");
result.message = "mysqlx variables loaded to runtime";
reload_config_store(*ctx.admindb);
return result;
uint64_t row_count = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_backend_endpoints");
return {0, row_count, "mysqlx backend endpoints saved from runtime"};
}
ProxySQL_PluginCommandResult save_variables_from_runtime(const ProxySQL_PluginCommandContext& ctx, const char*) {
if (ctx.admindb == nullptr) {
return command_failure("mysqlx variables save requires admin db");
}
if (!copy_table(*ctx.admindb, kRuntimeMysqlxVariablesTable, kMysqlxVariablesTable)) {
return command_failure("failed to copy mysqlx variables from runtime");
if (!mysqlx_context().config_store->save_variables_to_admin_table(*ctx.admindb)) {
return command_failure("failed to save mysqlx variables to memory");
}
uint64_t row_count = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_variables");
return {0, row_count, "mysqlx variables saved from runtime"};
}
ProxySQL_PluginCommandResult result {0, 0, ""};
result.rows_affected = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_variables");
result.message = "mysqlx variables saved from runtime";
return result;
// Runtime-view refresh callbacks invoked by the chassis before any
// admin SELECT against the projected runtime_mysqlx_<X> tables.
// `opaque` is unused here -- the global mysqlx_context() singleton
// owns the config store.
void refresh_users_runtime_view(SQLite3DB* admindb, void*) {
if (admindb == nullptr) return;
if (mysqlx_context().config_store == nullptr) return;
mysqlx_context().config_store->project_users_to_runtime_view(*admindb);
}
void refresh_routes_runtime_view(SQLite3DB* admindb, void*) {
if (admindb == nullptr) return;
if (mysqlx_context().config_store == nullptr) return;
mysqlx_context().config_store->project_routes_to_runtime_view(*admindb);
}
void refresh_endpoints_runtime_view(SQLite3DB* admindb, void*) {
if (admindb == nullptr) return;
if (mysqlx_context().config_store == nullptr) return;
mysqlx_context().config_store->project_endpoints_to_runtime_view(*admindb);
}
void refresh_variables_runtime_view(SQLite3DB* admindb, void*) {
if (admindb == nullptr) return;
if (mysqlx_context().config_store == nullptr) return;
mysqlx_context().config_store->project_variables_to_runtime_view(*admindb);
}
bool disk_to_memory(SQLite3DB& admindb, const char* table_name) {
@ -477,6 +451,18 @@ bool mysqlx_register_admin_schema(ProxySQL_PluginServices& services) {
register_table_pair(services, kMysqlxVariablesTable, kMysqlxVariablesTableDef);
register_runtime_table(services, kRuntimeMysqlxVariablesTable, kRuntimeMysqlxVariablesTableDef);
// Each runtime_mysqlx_<X> table is an admin-side projection of
// MysqlxConfigStore state, not a persistent admin table. The
// chassis invokes these refresh callbacks before any admin SELECT
// touches the matching table -- analogous to Admin's own
// save_mysql_users_runtime_to_database(true) refresh path.
if (services.register_runtime_view != nullptr) {
services.register_runtime_view({kRuntimeMysqlxUsersTable, &refresh_users_runtime_view, nullptr});
services.register_runtime_view({kRuntimeMysqlxRoutesTable, &refresh_routes_runtime_view, nullptr});
services.register_runtime_view({kRuntimeMysqlxBackendEndpointsTable, &refresh_endpoints_runtime_view, nullptr});
services.register_runtime_view({kRuntimeMysqlxVariablesTable, &refresh_variables_runtime_view, nullptr});
}
// Stats tables (stats_db only, no config copy needed).
{
ProxySQL_PluginTableDef stats_routes {

@ -9,12 +9,6 @@
namespace {
struct MysqlxEndpointOverride {
int mysqlx_port { 33060 };
bool use_ssl { false };
std::string attributes {};
};
std::string nullable_string(const char* value) {
return value != nullptr ? value : "";
}
@ -64,6 +58,19 @@ void load_canonical_users(
}
}
// merge_mysqlx_users:
// Field layout matches the SELECT in install_users_from_admin:
// 0 username
// 1 active -> x_enabled
// 2 require_tls
// 3 allowed_auth_methods
// 4 default_route
// 5 policy_profile
// 6 backend_auth_mode
// 7 backend_username
// 8 backend_password
// 9 attributes
// 10 comment
void merge_mysqlx_users(
SQLite3_result& rows,
std::unordered_map<std::string, MysqlxResolvedIdentity>& identities
@ -89,6 +96,7 @@ void merge_mysqlx_users(
identity.backend_username = nullable_string(row->fields[7]);
identity.backend_password = nullable_string(row->fields[8]);
identity.attributes = nullable_string(row->fields[9]);
identity.comment = nullable_string(row->fields[10]);
}
}
@ -109,32 +117,34 @@ void load_routes(
route.strategy = nullable_string(row->fields[4]);
route.active = nullable_bool(row->fields[5], true);
route.attributes = nullable_string(row->fields[6]);
route.comment = nullable_string(row->fields[7]);
routes[route.name] = std::move(route);
}
}
void load_endpoint_overrides(
SQLite3_result& rows,
std::unordered_map<std::string, MysqlxEndpointOverride>& overrides
std::unordered_map<std::string, MysqlxBackendEndpointOverride>& overrides
) {
for (auto* row : rows.rows) {
if (row == nullptr || row->fields[0] == nullptr) {
continue;
}
MysqlxEndpointOverride override {};
const std::string hostname = nullable_string(row->fields[0]);
const int mysql_port = nullable_int(row->fields[1]);
MysqlxBackendEndpointOverride override {};
override.hostname = nullable_string(row->fields[0]);
override.mysql_port = nullable_int(row->fields[1]);
override.mysqlx_port = nullable_int(row->fields[2], 33060);
override.use_ssl = nullable_bool(row->fields[3]);
override.attributes = nullable_string(row->fields[4]);
overrides[endpoint_key(hostname, mysql_port)] = std::move(override);
override.comment = nullable_string(row->fields[5]);
overrides[endpoint_key(override.hostname, override.mysql_port)] = std::move(override);
}
}
void load_backend_servers(
SQLite3_result& rows,
const std::unordered_map<std::string, MysqlxEndpointOverride>& overrides,
const std::unordered_map<std::string, MysqlxBackendEndpointOverride>& overrides,
std::unordered_map<int, std::vector<MysqlxBackendEndpoint>>& hostgroup_endpoints
) {
for (auto* row : rows.rows) {
@ -184,6 +194,32 @@ void load_variables(
}
}
// SQLite text quoting for ad-hoc statement composition. The plugin
// already owns its rows (we just round-tripped them through admindb)
// so SQL-injection from operator data isn't a concern, but a single
// quote in a username or attribute would still corrupt the statement.
// Doubled-quote is the SQLite-canonical escape inside string literals.
std::string sqlite_quote(const std::string& s) {
std::string out;
out.reserve(s.size() + 2);
out.push_back('\'');
for (char c : s) {
out.push_back(c);
if (c == '\'') out.push_back('\'');
}
out.push_back('\'');
return out;
}
const char* backend_auth_mode_to_string(MysqlxBackendAuthMode m) {
switch (m) {
case MysqlxBackendAuthMode::pass_through: return "pass_through";
case MysqlxBackendAuthMode::service_account: return "service_account";
case MysqlxBackendAuthMode::mapped:
default: return "mapped";
}
}
} // namespace
MysqlxBackendAuthMode mysqlx_backend_auth_mode_from_string(const std::string& value) {
@ -196,15 +232,33 @@ MysqlxBackendAuthMode mysqlx_backend_auth_mode_from_string(const std::string& va
return MysqlxBackendAuthMode::mapped;
}
bool MysqlxConfigStore::load_from_runtime(SQLite3DB& db, std::string& err) {
// Exclusive lock — no readers while we swap the maps.
std::unique_lock<std::shared_mutex> lock(mutex_);
err.clear();
// install_users_from_admin
//
// Reads two admin-side tables and atomically swaps `identities_` under
// the store's own mutex:
//
// * runtime_mysql_users -- canonical user identity (password,
// default_hostgroup, max_connections). Cross-module dependency:
// this is admin's view of MySQL_Authentication state and may be
// stale unless the operator has run `LOAD MYSQL USERS TO RUNTIME`
// recently. We deliberately do not reach into GloMyAuth directly
// to keep the module isolated from admin's runtime state machinery
// -- the admin module's view-refresh scheme is the contract.
//
// * mysqlx_users -- the editable mysqlx-side override table the
// operator writes. We read it directly here (NOT runtime_mysqlx_
// users): the runtime view is owned and projected by THIS module,
// reading from the editable table is what makes mysqlx_users
// authoritative for the operator's intent.
//
// Identity rows for users that exist in mysqlx_users WHERE active=1
// but have no matching active=1, frontend=1 row in runtime_mysql_users
// are silently dropped: a mysqlx user with no canonical mysql identity
// has no password to authenticate against, so listing them in the
// store would only enable a misleading "user exists but auth always
// fails" path.
bool MysqlxConfigStore::install_users_from_admin(SQLite3DB& db, std::string& err) {
std::unordered_map<std::string, MysqlxResolvedIdentity> new_identities {};
std::unordered_map<std::string, MysqlxRoute> new_routes {};
std::unordered_map<int, std::vector<MysqlxBackendEndpoint>> new_hostgroup_endpoints {};
std::unordered_map<std::string, MysqlxEndpointOverride> endpoint_overrides {};
std::unique_ptr<SQLite3_result> result {};
if (!fetch_result(
@ -220,33 +274,76 @@ bool MysqlxConfigStore::load_from_runtime(SQLite3DB& db, std::string& err) {
if (!fetch_result(
db,
"SELECT username, active, require_tls, allowed_auth_methods, default_route, policy_profile, "
"backend_auth_mode, backend_username, backend_password, attributes "
"FROM runtime_mysqlx_users",
"backend_auth_mode, backend_username, backend_password, attributes, comment "
"FROM mysqlx_users WHERE active=1",
result,
err)) {
return false;
}
merge_mysqlx_users(*result, new_identities);
// Drop any canonical-only users (no mysqlx override row); they have
// no x_enabled flag so they wouldn't authenticate via X anyway.
for (auto it = new_identities.begin(); it != new_identities.end();) {
if (!it->second.x_enabled) {
it = new_identities.erase(it);
} else {
++it;
}
}
std::unique_lock<std::shared_mutex> lock(mutex_);
identities_.swap(new_identities);
return true;
}
bool MysqlxConfigStore::install_routes_from_admin(SQLite3DB& db, std::string& err) {
std::unordered_map<std::string, MysqlxRoute> new_routes {};
std::unique_ptr<SQLite3_result> result {};
if (!fetch_result(
db,
"SELECT name, bind, destination_hostgroup, fallback_hostgroup, strategy, active, attributes "
"FROM runtime_mysqlx_routes WHERE active=1",
"SELECT name, bind, destination_hostgroup, fallback_hostgroup, strategy, active, attributes, comment "
"FROM mysqlx_routes WHERE active=1",
result,
err)) {
return false;
}
load_routes(*result, new_routes);
std::unique_lock<std::shared_mutex> lock(mutex_);
routes_.swap(new_routes);
return true;
}
// install_endpoints_from_admin
//
// The endpoint store is the resolved per-hostgroup view used at route
// dispatch time. It is rebuilt from two inputs:
//
// * runtime_mysql_servers -- canonical hostgroup -> (hostname, port,
// use_ssl) topology, cross-module dependency on MySQL_HostGroups_
// Manager.
// * mysqlx_backend_endpoints -- per-(hostname,mysql_port) overrides
// the operator sets to expose mysqlx_port and force use_ssl.
//
// Both raw inputs are kept in `endpoint_overrides_` so a SAVE / view
// projection can faithfully round-trip; the resolved per-hostgroup
// view is what `pick_endpoint` ultimately uses.
bool MysqlxConfigStore::install_endpoints_from_admin(SQLite3DB& db, std::string& err) {
std::unordered_map<std::string, MysqlxBackendEndpointOverride> new_overrides {};
std::unordered_map<int, std::vector<MysqlxBackendEndpoint>> new_hostgroup_endpoints {};
std::unique_ptr<SQLite3_result> result {};
if (!fetch_result(
db,
"SELECT hostname, mysql_port, mysqlx_port, use_ssl, attributes "
"FROM runtime_mysqlx_backend_endpoints",
"SELECT hostname, mysql_port, mysqlx_port, use_ssl, attributes, comment "
"FROM mysqlx_backend_endpoints",
result,
err)) {
return false;
}
load_endpoint_overrides(*result, endpoint_overrides);
load_endpoint_overrides(*result, new_overrides);
if (!fetch_result(
db,
@ -257,25 +354,38 @@ bool MysqlxConfigStore::load_from_runtime(SQLite3DB& db, std::string& err) {
err)) {
return false;
}
load_backend_servers(*result, endpoint_overrides, new_hostgroup_endpoints);
load_backend_servers(*result, new_overrides, new_hostgroup_endpoints);
std::unique_lock<std::shared_mutex> lock(mutex_);
endpoint_overrides_.swap(new_overrides);
hostgroup_endpoints_.swap(new_hostgroup_endpoints);
return true;
}
bool MysqlxConfigStore::install_all_from_admin(SQLite3DB& db, std::string& err) {
return install_users_from_admin(db, err)
&& install_routes_from_admin(db, err)
&& install_endpoints_from_admin(db, err)
&& install_variables_from_admin(db, err);
}
bool MysqlxConfigStore::install_variables_from_admin(SQLite3DB& db, std::string& err) {
int new_pool_size = thread_pool_size_;
int new_connect_timeout = connect_timeout_;
std::string new_tls_mode = tls_mode_;
int new_max_cached = max_cached_connections_;
std::unique_ptr<SQLite3_result> result {};
if (!fetch_result(
db,
"SELECT variable_name, variable_value FROM runtime_mysqlx_variables",
"SELECT variable_name, variable_value FROM mysqlx_variables",
result,
err)) {
return false;
}
load_variables(*result, new_pool_size, new_connect_timeout, new_tls_mode, new_max_cached);
identities_.swap(new_identities);
routes_.swap(new_routes);
hostgroup_endpoints_.swap(new_hostgroup_endpoints);
std::unique_lock<std::shared_mutex> lock(mutex_);
thread_pool_size_ = new_pool_size;
connect_timeout_ = new_connect_timeout;
tls_mode_ = std::move(new_tls_mode);
@ -283,6 +393,228 @@ bool MysqlxConfigStore::load_from_runtime(SQLite3DB& db, std::string& err) {
return true;
}
// ===========================================================================
// SAVE_*_TO_ADMIN_TABLE: mirror the canonical save_mysql_users_runtime_to_
// database(false) shape -- mark all existing rows inactive, then upsert the
// live store contents with active=1. Inactive rows in the editable table
// are preserved so the operator's "deactivate but don't delete" workflow
// still works.
//
// PROJECT_*_TO_RUNTIME_VIEW: mirror save_mysql_users_runtime_to_database
// (true) -- DELETE the projected runtime_<table>, then INSERT the live
// store contents. Used by the chassis register_runtime_view() refresh
// callbacks before any admin SELECT against the projected table.
// ===========================================================================
bool MysqlxConfigStore::save_users_to_admin_table(SQLite3DB& db) const {
std::shared_lock<std::shared_mutex> lock(mutex_);
if (!db.execute("BEGIN")) return false;
if (!db.execute("UPDATE mysqlx_users SET active=0")) {
db.execute("ROLLBACK");
return false;
}
for (const auto& [username, identity] : identities_) {
std::string sql = "REPLACE INTO mysqlx_users "
"(username, active, require_tls, allowed_auth_methods, default_route, "
"policy_profile, backend_auth_mode, backend_username, backend_password, "
"attributes, comment) VALUES (";
sql += sqlite_quote(identity.username) + ", 1, ";
sql += (identity.require_tls ? "1, " : "0, ");
sql += sqlite_quote(identity.allowed_auth_methods) + ", ";
sql += sqlite_quote(identity.default_route) + ", ";
sql += sqlite_quote(identity.policy_profile) + ", ";
sql += sqlite_quote(backend_auth_mode_to_string(identity.backend_auth_mode)) + ", ";
sql += sqlite_quote(identity.backend_username) + ", ";
sql += sqlite_quote(identity.backend_password) + ", ";
sql += sqlite_quote(identity.attributes) + ", ";
sql += sqlite_quote(identity.comment) + ")";
if (!db.execute(sql.c_str())) {
db.execute("ROLLBACK");
return false;
}
}
return db.execute("COMMIT");
}
bool MysqlxConfigStore::save_routes_to_admin_table(SQLite3DB& db) const {
std::shared_lock<std::shared_mutex> lock(mutex_);
if (!db.execute("BEGIN")) return false;
if (!db.execute("UPDATE mysqlx_routes SET active=0")) {
db.execute("ROLLBACK");
return false;
}
for (const auto& [name, route] : routes_) {
std::string sql = "REPLACE INTO mysqlx_routes "
"(name, bind, destination_hostgroup, fallback_hostgroup, strategy, "
"active, attributes, comment) VALUES (";
sql += sqlite_quote(route.name) + ", ";
sql += sqlite_quote(route.bind) + ", ";
sql += std::to_string(route.destination_hostgroup) + ", ";
if (route.fallback_hostgroup >= 0) {
sql += std::to_string(route.fallback_hostgroup) + ", ";
} else {
sql += "NULL, ";
}
sql += sqlite_quote(route.strategy) + ", 1, ";
sql += sqlite_quote(route.attributes) + ", ";
sql += sqlite_quote(route.comment) + ")";
if (!db.execute(sql.c_str())) {
db.execute("ROLLBACK");
return false;
}
}
return db.execute("COMMIT");
}
bool MysqlxConfigStore::save_endpoints_to_admin_table(SQLite3DB& db) const {
std::shared_lock<std::shared_mutex> lock(mutex_);
if (!db.execute("BEGIN")) return false;
if (!db.execute("DELETE FROM mysqlx_backend_endpoints")) {
db.execute("ROLLBACK");
return false;
}
for (const auto& [key, ov] : endpoint_overrides_) {
std::string sql = "INSERT INTO mysqlx_backend_endpoints "
"(hostname, mysql_port, mysqlx_port, use_ssl, attributes, comment) VALUES (";
sql += sqlite_quote(ov.hostname) + ", ";
sql += std::to_string(ov.mysql_port) + ", ";
sql += std::to_string(ov.mysqlx_port) + ", ";
sql += (ov.use_ssl ? "1, " : "0, ");
sql += sqlite_quote(ov.attributes) + ", ";
sql += sqlite_quote(ov.comment) + ")";
if (!db.execute(sql.c_str())) {
db.execute("ROLLBACK");
return false;
}
}
return db.execute("COMMIT");
}
bool MysqlxConfigStore::save_variables_to_admin_table(SQLite3DB& db) const {
std::shared_lock<std::shared_mutex> lock(mutex_);
if (!db.execute("BEGIN")) return false;
if (!db.execute("DELETE FROM mysqlx_variables")) {
db.execute("ROLLBACK");
return false;
}
auto put = [&](const char* name, const std::string& value) -> bool {
std::string sql = "INSERT INTO mysqlx_variables (variable_name, variable_value) VALUES (";
sql += sqlite_quote(name) + ", " + sqlite_quote(value) + ")";
return db.execute(sql.c_str());
};
if (!put("mysqlx_thread_pool_size", std::to_string(thread_pool_size_))) {
db.execute("ROLLBACK"); return false;
}
if (!put("mysqlx_connect_timeout", std::to_string(connect_timeout_))) {
db.execute("ROLLBACK"); return false;
}
if (!put("mysqlx_tls_mode", tls_mode_)) {
db.execute("ROLLBACK"); return false;
}
if (!put("mysqlx_max_cached_connections_per_thread", std::to_string(max_cached_connections_))) {
db.execute("ROLLBACK"); return false;
}
return db.execute("COMMIT");
}
void MysqlxConfigStore::project_users_to_runtime_view(SQLite3DB& db) const {
std::shared_lock<std::shared_mutex> lock(mutex_);
if (!db.execute("BEGIN")) return;
if (!db.execute("DELETE FROM runtime_mysqlx_users")) {
db.execute("ROLLBACK"); return;
}
for (const auto& [username, identity] : identities_) {
std::string sql = "INSERT INTO runtime_mysqlx_users "
"(username, active, require_tls, allowed_auth_methods, default_route, "
"policy_profile, backend_auth_mode, backend_username, backend_password, "
"attributes, comment) VALUES (";
sql += sqlite_quote(identity.username) + ", 1, ";
sql += (identity.require_tls ? "1, " : "0, ");
sql += sqlite_quote(identity.allowed_auth_methods) + ", ";
sql += sqlite_quote(identity.default_route) + ", ";
sql += sqlite_quote(identity.policy_profile) + ", ";
sql += sqlite_quote(backend_auth_mode_to_string(identity.backend_auth_mode)) + ", ";
sql += sqlite_quote(identity.backend_username) + ", ";
sql += sqlite_quote(identity.backend_password) + ", ";
sql += sqlite_quote(identity.attributes) + ", ";
sql += sqlite_quote(identity.comment) + ")";
if (!db.execute(sql.c_str())) {
db.execute("ROLLBACK"); return;
}
}
db.execute("COMMIT");
}
void MysqlxConfigStore::project_routes_to_runtime_view(SQLite3DB& db) const {
std::shared_lock<std::shared_mutex> lock(mutex_);
if (!db.execute("BEGIN")) return;
if (!db.execute("DELETE FROM runtime_mysqlx_routes")) {
db.execute("ROLLBACK"); return;
}
for (const auto& [name, route] : routes_) {
std::string sql = "INSERT INTO runtime_mysqlx_routes "
"(name, bind, destination_hostgroup, fallback_hostgroup, strategy, "
"active, attributes, comment) VALUES (";
sql += sqlite_quote(route.name) + ", ";
sql += sqlite_quote(route.bind) + ", ";
sql += std::to_string(route.destination_hostgroup) + ", ";
if (route.fallback_hostgroup >= 0) {
sql += std::to_string(route.fallback_hostgroup) + ", ";
} else {
sql += "NULL, ";
}
sql += sqlite_quote(route.strategy) + ", 1, ";
sql += sqlite_quote(route.attributes) + ", ";
sql += sqlite_quote(route.comment) + ")";
if (!db.execute(sql.c_str())) {
db.execute("ROLLBACK"); return;
}
}
db.execute("COMMIT");
}
void MysqlxConfigStore::project_endpoints_to_runtime_view(SQLite3DB& db) const {
std::shared_lock<std::shared_mutex> lock(mutex_);
if (!db.execute("BEGIN")) return;
if (!db.execute("DELETE FROM runtime_mysqlx_backend_endpoints")) {
db.execute("ROLLBACK"); return;
}
for (const auto& [key, ov] : endpoint_overrides_) {
std::string sql = "INSERT INTO runtime_mysqlx_backend_endpoints "
"(hostname, mysql_port, mysqlx_port, use_ssl, attributes, comment) VALUES (";
sql += sqlite_quote(ov.hostname) + ", ";
sql += std::to_string(ov.mysql_port) + ", ";
sql += std::to_string(ov.mysqlx_port) + ", ";
sql += (ov.use_ssl ? "1, " : "0, ");
sql += sqlite_quote(ov.attributes) + ", ";
sql += sqlite_quote(ov.comment) + ")";
if (!db.execute(sql.c_str())) {
db.execute("ROLLBACK"); return;
}
}
db.execute("COMMIT");
}
void MysqlxConfigStore::project_variables_to_runtime_view(SQLite3DB& db) const {
std::shared_lock<std::shared_mutex> lock(mutex_);
if (!db.execute("BEGIN")) return;
if (!db.execute("DELETE FROM runtime_mysqlx_variables")) {
db.execute("ROLLBACK"); return;
}
auto put = [&](const char* name, const std::string& value) -> bool {
std::string sql = "INSERT INTO runtime_mysqlx_variables (variable_name, variable_value) VALUES (";
sql += sqlite_quote(name) + ", " + sqlite_quote(value) + ")";
return db.execute(sql.c_str());
};
if (!put("mysqlx_thread_pool_size", std::to_string(thread_pool_size_)) ||
!put("mysqlx_connect_timeout", std::to_string(connect_timeout_)) ||
!put("mysqlx_tls_mode", tls_mode_) ||
!put("mysqlx_max_cached_connections_per_thread", std::to_string(max_cached_connections_))) {
db.execute("ROLLBACK"); return;
}
db.execute("COMMIT");
}
std::optional<MysqlxResolvedIdentity> MysqlxConfigStore::resolve_identity(const std::string& username) const {
std::shared_lock<std::shared_mutex> lock(mutex_);
const auto it = identities_.find(username);

@ -141,41 +141,41 @@ bool sync_disk_to_memory(SQLite3DB& admindb, ProxySQL_PluginServices* services)
return all_ok;
}
bool copy_to_runtime(SQLite3DB& admindb, ProxySQL_PluginServices* services) {
const char* pairs[][2] = {
{"mysqlx_users", "runtime_mysqlx_users"},
{"mysqlx_routes", "runtime_mysqlx_routes"},
{"mysqlx_backend_endpoints", "runtime_mysqlx_backend_endpoints"},
{"mysqlx_variables", "runtime_mysqlx_variables"},
};
bool all_ok = true;
for (const auto& p : pairs) {
std::string dest = "main.";
dest += p[1];
std::string source = "main.";
source += p[0];
if (!replace_table_atomically(admindb, services, dest.c_str(), source.c_str())) {
all_ok = false;
}
}
return all_ok;
}
bool mysqlx_start() {
MysqlxPluginContext& ctx = mysqlx_context();
if (ctx.services != nullptr && ctx.services->get_admindb != nullptr) {
SQLite3DB* admindb = ctx.services->get_admindb();
if (admindb != nullptr) {
// Disk -> memory: keep the editable tables in sync with disk
// on startup, same as the canonical proxysql_admin path does
// for mysql_users / mysql_servers / etc. This is admin-tier
// persistence and is legitimate.
sync_disk_to_memory(*admindb, ctx.services);
copy_to_runtime(*admindb, ctx.services);
// Memory -> module: the editable mysqlx_* tables now drive
// the in-memory store directly. Each install_*_from_admin
// SELECTs the editable table (and the relevant cross-module
// runtime_mysql_* projections), builds a new local map, and
// atomically swaps it into MysqlxConfigStore. We deliberately
// do NOT replicate this data into runtime_mysqlx_* admin
// tables -- those are projected on demand via the chassis
// register_runtime_view() refresh callbacks declared below.
std::string err;
if (!ctx.config_store->load_from_runtime(*admindb, err)) {
auto report_err = [&](const char* what) {
if (ctx.services->log_message != nullptr) {
ctx.services->log_message(3, err.c_str());
std::string msg = "mysqlx: ";
msg += what;
msg += ": ";
msg += err;
ctx.services->log_message(3, msg.c_str());
}
}
err.clear();
};
if (!ctx.config_store->install_users_from_admin(*admindb, err)) report_err("install_users_from_admin failed");
if (!ctx.config_store->install_routes_from_admin(*admindb, err)) report_err("install_routes_from_admin failed");
if (!ctx.config_store->install_endpoints_from_admin(*admindb, err)) report_err("install_endpoints_from_admin failed");
if (!ctx.config_store->install_variables_from_admin(*admindb, err)) report_err("install_variables_from_admin failed");
}
}

Loading…
Cancel
Save