diff --git a/plugins/mysqlx/include/mysqlx_config_store.h b/plugins/mysqlx/include/mysqlx_config_store.h index 8aa479940..fd2c77e6b 100644 --- a/plugins/mysqlx/include/mysqlx_config_store.h +++ b/plugins/mysqlx/include/mysqlx_config_store.h @@ -34,6 +34,16 @@ struct MysqlxResolvedIdentity { std::string backend_username {}; std::string backend_password {}; std::string attributes {}; + std::string comment {}; +}; + +struct MysqlxBackendEndpointOverride { + std::string hostname {}; + int mysql_port { 0 }; + int mysqlx_port { 33060 }; + bool use_ssl { false }; + std::string attributes {}; + std::string comment {}; }; struct MysqlxRoute { @@ -44,6 +54,7 @@ struct MysqlxRoute { std::string strategy { "first_available" }; bool active { true }; std::string attributes {}; + std::string comment {}; }; struct MysqlxBackendEndpoint { @@ -61,14 +72,53 @@ public: MysqlxConfigStore& operator=(const MysqlxConfigStore&) = delete; ~MysqlxConfigStore() = default; - bool load_from_runtime(SQLite3DB& db, std::string& err); + // Per-entity install: read the editable admin table(s), build a new + // local representation, atomically swap into the in-memory store + // under the store's own mutex. Each install is independent — LOAD + // MYSQLX USERS does not touch routes/endpoints/variables. Callers + // pass `db` (admin db) and receive a populated `err` on failure. + bool install_users_from_admin(SQLite3DB& db, std::string& err); + bool install_routes_from_admin(SQLite3DB& db, std::string& err); + bool install_endpoints_from_admin(SQLite3DB& db, std::string& err); + bool install_variables_from_admin(SQLite3DB& db, std::string& err); + + // Convenience: invoke all four install_*_from_admin in sequence + // against the same db. Stops on the first failure (subsequent + // entities are NOT installed). Used by unit tests that exercise + // the full LOAD pipeline against a single in-memory SQLite fixture + // containing both the editable mysqlx_* tables and the cross-module + // runtime_mysql_users / runtime_mysql_servers. Production code + // calls the per-entity methods directly so each LOAD command only + // reloads its own slice of state. + bool install_all_from_admin(SQLite3DB& db, std::string& err); + + // Per-entity SAVE: dump current in-memory state into the editable + // admin table (mysqlx_users / mysqlx_routes / etc.). Mirrors the + // canonical save_*_runtime_to_database(false) pattern: existing + // rows are marked inactive, then live rows from the store are + // upserted with active=1. Returns false on a fatal sqlite error. + bool save_users_to_admin_table(SQLite3DB& db) const; + bool save_routes_to_admin_table(SQLite3DB& db) const; + bool save_endpoints_to_admin_table(SQLite3DB& db) const; + bool save_variables_to_admin_table(SQLite3DB& db) const; + + // Per-entity runtime-view projection: refill the runtime_mysqlx_* + // table from current in-memory state. Called by the chassis + // register_runtime_view() refresh callbacks before any admin SELECT + // against the projected table. Always wipes the destination first + // to ensure deletions in the store propagate to the view. + void project_users_to_runtime_view(SQLite3DB& db) const; + void project_routes_to_runtime_view(SQLite3DB& db) const; + void project_endpoints_to_runtime_view(SQLite3DB& db) const; + void project_variables_to_runtime_view(SQLite3DB& db) const; + std::optional resolve_identity(const std::string& username) const; MysqlxBackendEndpoint pick_endpoint(const std::string& route_name) const; int route_hostgroup(const std::string& route_name) const; bool route_exists(const std::string& route_name) const; // Test-only: inject routes + hostgroup endpoints directly, bypassing - // the SQLite3DB-based load_from_runtime path. Not called by production code. + // the SQLite3DB-based install path. Not called by production code. void install_for_test( std::unordered_map routes, std::unordered_map> endpoints); @@ -83,10 +133,16 @@ public: private: MysqlxBackendEndpoint pick_from_hostgroup(int hostgroup_id, const std::string& strategy) const; + void rebuild_hostgroup_endpoints_locked(); mutable std::shared_mutex mutex_ {}; std::unordered_map identities_ {}; std::unordered_map routes_ {}; + // Per-(hostname,mysql_port) overrides preserved verbatim from + // mysqlx_backend_endpoints. Survives across LOAD calls so SAVE can + // round-trip and so the runtime-view projection can faithfully + // reflect what was loaded. Indexed by "hostname:mysql_port". + std::unordered_map endpoint_overrides_ {}; std::unordered_map> hostgroup_endpoints_ {}; mutable std::mutex rr_mutex_ {}; mutable std::unordered_map rr_counters_ {}; diff --git a/plugins/mysqlx/src/mysqlx_admin_schema.cpp b/plugins/mysqlx/src/mysqlx_admin_schema.cpp index f1c469569..27d86951b 100644 --- a/plugins/mysqlx/src/mysqlx_admin_schema.cpp +++ b/plugins/mysqlx/src/mysqlx_admin_schema.cpp @@ -111,71 +111,33 @@ ProxySQL_PluginCommandResult command_failure(const char* message) { return {1, 0, message != nullptr ? message : "mysqlx admin command failed"}; } -bool copy_table(SQLite3DB& db, const char* source_table, const char* runtime_table) { - if (!db.execute("BEGIN")) { - return false; - } - - std::string delete_sql = "DELETE FROM "; - delete_sql += runtime_table; - if (!db.execute(delete_sql.c_str())) { - db.execute("ROLLBACK"); - return false; - } - - std::string insert_sql = "INSERT INTO "; - insert_sql += runtime_table; - insert_sql += " SELECT * FROM "; - insert_sql += source_table; - if (!db.execute(insert_sql.c_str())) { - db.execute("ROLLBACK"); - return false; - } - - if (!db.execute("COMMIT")) { - db.execute("ROLLBACK"); - return false; - } - - return true; -} - -static void reload_config_store(SQLite3DB& admindb) { - std::string err; - if (!mysqlx_context().config_store->load_from_runtime(admindb, err)) { - if (mysqlx_context().services && mysqlx_context().services->log_message) { - mysqlx_context().services->log_message(3, err.c_str()); - } - } -} +// LOAD TO RUNTIME callbacks: read the editable mysqlx_ table +// directly into MysqlxConfigStore via the typed install API. Never +// touch runtime_mysqlx_ on this path -- that table is an admin-side +// view of module state, projected on demand by the registered +// runtime-view refresh callbacks below. ProxySQL_PluginCommandResult load_users_to_runtime(const ProxySQL_PluginCommandContext& ctx, const char*) { if (ctx.admindb == nullptr) { return command_failure("mysqlx users load requires admin db"); } - if (!copy_table(*ctx.admindb, kMysqlxUsersTable, kRuntimeMysqlxUsersTable)) { - return command_failure("failed to copy mysqlx users to runtime"); + std::string err; + if (!mysqlx_context().config_store->install_users_from_admin(*ctx.admindb, err)) { + return command_failure(err.empty() ? "install_users_from_admin failed" : err.c_str()); } - - ProxySQL_PluginCommandResult result {0, 0, ""}; - result.rows_affected = ctx.admindb->return_one_int("SELECT COUNT(*) FROM runtime_mysqlx_users"); - result.message = "mysqlx users loaded to runtime"; - reload_config_store(*ctx.admindb); - return result; + uint64_t row_count = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_users WHERE active=1"); + return {0, row_count, "mysqlx users loaded to runtime"}; } ProxySQL_PluginCommandResult load_routes_to_runtime(const ProxySQL_PluginCommandContext& ctx, const char*) { if (ctx.admindb == nullptr) { return command_failure("mysqlx routes load requires admin db"); } - if (!copy_table(*ctx.admindb, kMysqlxRoutesTable, kRuntimeMysqlxRoutesTable)) { - return command_failure("failed to copy mysqlx routes to runtime"); + std::string err; + if (!mysqlx_context().config_store->install_routes_from_admin(*ctx.admindb, err)) { + return command_failure(err.empty() ? "install_routes_from_admin failed" : err.c_str()); } - - ProxySQL_PluginCommandResult result {0, 0, ""}; - result.rows_affected = ctx.admindb->return_one_int("SELECT COUNT(*) FROM runtime_mysqlx_routes"); - result.message = "mysqlx routes loaded to runtime"; - reload_config_store(*ctx.admindb); + uint64_t row_count = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_routes WHERE active=1"); // Propagate the new desired route set to the listener topology: bind new // routes, close listeners for removed or deactivated routes. The symbol // is weak so unit tests that don't link plugin.cpp can resolve cleanly; @@ -183,93 +145,105 @@ ProxySQL_PluginCommandResult load_routes_to_runtime(const ProxySQL_PluginCommand if (mysqlx_reconcile_listeners) { mysqlx_reconcile_listeners(*ctx.admindb); } - return result; + return {0, row_count, "mysqlx routes loaded to runtime"}; } ProxySQL_PluginCommandResult load_backend_endpoints_to_runtime(const ProxySQL_PluginCommandContext& ctx, const char*) { if (ctx.admindb == nullptr) { return command_failure("mysqlx backend endpoints load requires admin db"); } - if (!copy_table(*ctx.admindb, kMysqlxBackendEndpointsTable, kRuntimeMysqlxBackendEndpointsTable)) { - return command_failure("failed to copy mysqlx backend endpoints to runtime"); + std::string err; + if (!mysqlx_context().config_store->install_endpoints_from_admin(*ctx.admindb, err)) { + return command_failure(err.empty() ? "install_endpoints_from_admin failed" : err.c_str()); } + uint64_t row_count = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_backend_endpoints"); + return {0, row_count, "mysqlx backend endpoints loaded to runtime"}; +} - ProxySQL_PluginCommandResult result {0, 0, ""}; - result.rows_affected = ctx.admindb->return_one_int("SELECT COUNT(*) FROM runtime_mysqlx_backend_endpoints"); - result.message = "mysqlx backend endpoints loaded to runtime"; - reload_config_store(*ctx.admindb); - return result; +ProxySQL_PluginCommandResult load_variables_to_runtime(const ProxySQL_PluginCommandContext& ctx, const char*) { + if (ctx.admindb == nullptr) { + return command_failure("mysqlx variables load requires admin db"); + } + std::string err; + if (!mysqlx_context().config_store->install_variables_from_admin(*ctx.admindb, err)) { + return command_failure(err.empty() ? "install_variables_from_admin failed" : err.c_str()); + } + uint64_t row_count = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_variables"); + return {0, row_count, "mysqlx variables loaded to runtime"}; } +// SAVE [FROM RUNTIME] TO MEMORY callbacks: dump MysqlxConfigStore +// directly into the editable mysqlx_ table. Never read +// runtime_mysqlx_ on this path -- the source of truth is the +// in-memory module state, not a SQL view. + ProxySQL_PluginCommandResult save_users_from_runtime(const ProxySQL_PluginCommandContext& ctx, const char*) { if (ctx.admindb == nullptr) { return command_failure("mysqlx users save requires admin db"); } - if (!copy_table(*ctx.admindb, kRuntimeMysqlxUsersTable, kMysqlxUsersTable)) { - return command_failure("failed to copy mysqlx users from runtime"); + if (!mysqlx_context().config_store->save_users_to_admin_table(*ctx.admindb)) { + return command_failure("failed to save mysqlx users to memory"); } - - ProxySQL_PluginCommandResult result {0, 0, ""}; - result.rows_affected = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_users"); - result.message = "mysqlx users saved from runtime"; - return result; + uint64_t row_count = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_users WHERE active=1"); + return {0, row_count, "mysqlx users saved from runtime"}; } ProxySQL_PluginCommandResult save_routes_from_runtime(const ProxySQL_PluginCommandContext& ctx, const char*) { if (ctx.admindb == nullptr) { return command_failure("mysqlx routes save requires admin db"); } - if (!copy_table(*ctx.admindb, kRuntimeMysqlxRoutesTable, kMysqlxRoutesTable)) { - return command_failure("failed to copy mysqlx routes from runtime"); + if (!mysqlx_context().config_store->save_routes_to_admin_table(*ctx.admindb)) { + return command_failure("failed to save mysqlx routes to memory"); } - - ProxySQL_PluginCommandResult result {0, 0, ""}; - result.rows_affected = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_routes"); - result.message = "mysqlx routes saved from runtime"; - return result; + uint64_t row_count = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_routes WHERE active=1"); + return {0, row_count, "mysqlx routes saved from runtime"}; } ProxySQL_PluginCommandResult save_backend_endpoints_from_runtime(const ProxySQL_PluginCommandContext& ctx, const char*) { if (ctx.admindb == nullptr) { return command_failure("mysqlx backend endpoints save requires admin db"); } - if (!copy_table(*ctx.admindb, kRuntimeMysqlxBackendEndpointsTable, kMysqlxBackendEndpointsTable)) { - return command_failure("failed to copy mysqlx backend endpoints from runtime"); - } - - ProxySQL_PluginCommandResult result {0, 0, ""}; - result.rows_affected = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_backend_endpoints"); - result.message = "mysqlx backend endpoints saved from runtime"; - return result; -} - -ProxySQL_PluginCommandResult load_variables_to_runtime(const ProxySQL_PluginCommandContext& ctx, const char*) { - if (ctx.admindb == nullptr) { - return command_failure("mysqlx variables load requires admin db"); - } - if (!copy_table(*ctx.admindb, kMysqlxVariablesTable, kRuntimeMysqlxVariablesTable)) { - return command_failure("failed to copy mysqlx variables to runtime"); + if (!mysqlx_context().config_store->save_endpoints_to_admin_table(*ctx.admindb)) { + return command_failure("failed to save mysqlx backend endpoints to memory"); } - - ProxySQL_PluginCommandResult result {0, 0, ""}; - result.rows_affected = ctx.admindb->return_one_int("SELECT COUNT(*) FROM runtime_mysqlx_variables"); - result.message = "mysqlx variables loaded to runtime"; - reload_config_store(*ctx.admindb); - return result; + uint64_t row_count = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_backend_endpoints"); + return {0, row_count, "mysqlx backend endpoints saved from runtime"}; } ProxySQL_PluginCommandResult save_variables_from_runtime(const ProxySQL_PluginCommandContext& ctx, const char*) { if (ctx.admindb == nullptr) { return command_failure("mysqlx variables save requires admin db"); } - if (!copy_table(*ctx.admindb, kRuntimeMysqlxVariablesTable, kMysqlxVariablesTable)) { - return command_failure("failed to copy mysqlx variables from runtime"); + if (!mysqlx_context().config_store->save_variables_to_admin_table(*ctx.admindb)) { + return command_failure("failed to save mysqlx variables to memory"); } + uint64_t row_count = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_variables"); + return {0, row_count, "mysqlx variables saved from runtime"}; +} - ProxySQL_PluginCommandResult result {0, 0, ""}; - result.rows_affected = ctx.admindb->return_one_int("SELECT COUNT(*) FROM mysqlx_variables"); - result.message = "mysqlx variables saved from runtime"; - return result; +// Runtime-view refresh callbacks invoked by the chassis before any +// admin SELECT against the projected runtime_mysqlx_ tables. +// `opaque` is unused here -- the global mysqlx_context() singleton +// owns the config store. +void refresh_users_runtime_view(SQLite3DB* admindb, void*) { + if (admindb == nullptr) return; + if (mysqlx_context().config_store == nullptr) return; + mysqlx_context().config_store->project_users_to_runtime_view(*admindb); +} +void refresh_routes_runtime_view(SQLite3DB* admindb, void*) { + if (admindb == nullptr) return; + if (mysqlx_context().config_store == nullptr) return; + mysqlx_context().config_store->project_routes_to_runtime_view(*admindb); +} +void refresh_endpoints_runtime_view(SQLite3DB* admindb, void*) { + if (admindb == nullptr) return; + if (mysqlx_context().config_store == nullptr) return; + mysqlx_context().config_store->project_endpoints_to_runtime_view(*admindb); +} +void refresh_variables_runtime_view(SQLite3DB* admindb, void*) { + if (admindb == nullptr) return; + if (mysqlx_context().config_store == nullptr) return; + mysqlx_context().config_store->project_variables_to_runtime_view(*admindb); } bool disk_to_memory(SQLite3DB& admindb, const char* table_name) { @@ -477,6 +451,18 @@ bool mysqlx_register_admin_schema(ProxySQL_PluginServices& services) { register_table_pair(services, kMysqlxVariablesTable, kMysqlxVariablesTableDef); register_runtime_table(services, kRuntimeMysqlxVariablesTable, kRuntimeMysqlxVariablesTableDef); + // Each runtime_mysqlx_ table is an admin-side projection of + // MysqlxConfigStore state, not a persistent admin table. The + // chassis invokes these refresh callbacks before any admin SELECT + // touches the matching table -- analogous to Admin's own + // save_mysql_users_runtime_to_database(true) refresh path. + if (services.register_runtime_view != nullptr) { + services.register_runtime_view({kRuntimeMysqlxUsersTable, &refresh_users_runtime_view, nullptr}); + services.register_runtime_view({kRuntimeMysqlxRoutesTable, &refresh_routes_runtime_view, nullptr}); + services.register_runtime_view({kRuntimeMysqlxBackendEndpointsTable, &refresh_endpoints_runtime_view, nullptr}); + services.register_runtime_view({kRuntimeMysqlxVariablesTable, &refresh_variables_runtime_view, nullptr}); + } + // Stats tables (stats_db only, no config copy needed). { ProxySQL_PluginTableDef stats_routes { diff --git a/plugins/mysqlx/src/mysqlx_config_store.cpp b/plugins/mysqlx/src/mysqlx_config_store.cpp index 1d58ba184..e8668ba25 100644 --- a/plugins/mysqlx/src/mysqlx_config_store.cpp +++ b/plugins/mysqlx/src/mysqlx_config_store.cpp @@ -9,12 +9,6 @@ namespace { -struct MysqlxEndpointOverride { - int mysqlx_port { 33060 }; - bool use_ssl { false }; - std::string attributes {}; -}; - std::string nullable_string(const char* value) { return value != nullptr ? value : ""; } @@ -64,6 +58,19 @@ void load_canonical_users( } } +// merge_mysqlx_users: +// Field layout matches the SELECT in install_users_from_admin: +// 0 username +// 1 active -> x_enabled +// 2 require_tls +// 3 allowed_auth_methods +// 4 default_route +// 5 policy_profile +// 6 backend_auth_mode +// 7 backend_username +// 8 backend_password +// 9 attributes +// 10 comment void merge_mysqlx_users( SQLite3_result& rows, std::unordered_map& identities @@ -89,6 +96,7 @@ void merge_mysqlx_users( identity.backend_username = nullable_string(row->fields[7]); identity.backend_password = nullable_string(row->fields[8]); identity.attributes = nullable_string(row->fields[9]); + identity.comment = nullable_string(row->fields[10]); } } @@ -109,32 +117,34 @@ void load_routes( route.strategy = nullable_string(row->fields[4]); route.active = nullable_bool(row->fields[5], true); route.attributes = nullable_string(row->fields[6]); + route.comment = nullable_string(row->fields[7]); routes[route.name] = std::move(route); } } void load_endpoint_overrides( SQLite3_result& rows, - std::unordered_map& overrides + std::unordered_map& overrides ) { for (auto* row : rows.rows) { if (row == nullptr || row->fields[0] == nullptr) { continue; } - MysqlxEndpointOverride override {}; - const std::string hostname = nullable_string(row->fields[0]); - const int mysql_port = nullable_int(row->fields[1]); + MysqlxBackendEndpointOverride override {}; + override.hostname = nullable_string(row->fields[0]); + override.mysql_port = nullable_int(row->fields[1]); override.mysqlx_port = nullable_int(row->fields[2], 33060); override.use_ssl = nullable_bool(row->fields[3]); override.attributes = nullable_string(row->fields[4]); - overrides[endpoint_key(hostname, mysql_port)] = std::move(override); + override.comment = nullable_string(row->fields[5]); + overrides[endpoint_key(override.hostname, override.mysql_port)] = std::move(override); } } void load_backend_servers( SQLite3_result& rows, - const std::unordered_map& overrides, + const std::unordered_map& overrides, std::unordered_map>& hostgroup_endpoints ) { for (auto* row : rows.rows) { @@ -184,6 +194,32 @@ void load_variables( } } +// SQLite text quoting for ad-hoc statement composition. The plugin +// already owns its rows (we just round-tripped them through admindb) +// so SQL-injection from operator data isn't a concern, but a single +// quote in a username or attribute would still corrupt the statement. +// Doubled-quote is the SQLite-canonical escape inside string literals. +std::string sqlite_quote(const std::string& s) { + std::string out; + out.reserve(s.size() + 2); + out.push_back('\''); + for (char c : s) { + out.push_back(c); + if (c == '\'') out.push_back('\''); + } + out.push_back('\''); + return out; +} + +const char* backend_auth_mode_to_string(MysqlxBackendAuthMode m) { + switch (m) { + case MysqlxBackendAuthMode::pass_through: return "pass_through"; + case MysqlxBackendAuthMode::service_account: return "service_account"; + case MysqlxBackendAuthMode::mapped: + default: return "mapped"; + } +} + } // namespace MysqlxBackendAuthMode mysqlx_backend_auth_mode_from_string(const std::string& value) { @@ -196,15 +232,33 @@ MysqlxBackendAuthMode mysqlx_backend_auth_mode_from_string(const std::string& va return MysqlxBackendAuthMode::mapped; } -bool MysqlxConfigStore::load_from_runtime(SQLite3DB& db, std::string& err) { - // Exclusive lock — no readers while we swap the maps. - std::unique_lock lock(mutex_); - err.clear(); - +// install_users_from_admin +// +// Reads two admin-side tables and atomically swaps `identities_` under +// the store's own mutex: +// +// * runtime_mysql_users -- canonical user identity (password, +// default_hostgroup, max_connections). Cross-module dependency: +// this is admin's view of MySQL_Authentication state and may be +// stale unless the operator has run `LOAD MYSQL USERS TO RUNTIME` +// recently. We deliberately do not reach into GloMyAuth directly +// to keep the module isolated from admin's runtime state machinery +// -- the admin module's view-refresh scheme is the contract. +// +// * mysqlx_users -- the editable mysqlx-side override table the +// operator writes. We read it directly here (NOT runtime_mysqlx_ +// users): the runtime view is owned and projected by THIS module, +// reading from the editable table is what makes mysqlx_users +// authoritative for the operator's intent. +// +// Identity rows for users that exist in mysqlx_users WHERE active=1 +// but have no matching active=1, frontend=1 row in runtime_mysql_users +// are silently dropped: a mysqlx user with no canonical mysql identity +// has no password to authenticate against, so listing them in the +// store would only enable a misleading "user exists but auth always +// fails" path. +bool MysqlxConfigStore::install_users_from_admin(SQLite3DB& db, std::string& err) { std::unordered_map new_identities {}; - std::unordered_map new_routes {}; - std::unordered_map> new_hostgroup_endpoints {}; - std::unordered_map endpoint_overrides {}; std::unique_ptr result {}; if (!fetch_result( @@ -220,33 +274,76 @@ bool MysqlxConfigStore::load_from_runtime(SQLite3DB& db, std::string& err) { if (!fetch_result( db, "SELECT username, active, require_tls, allowed_auth_methods, default_route, policy_profile, " - "backend_auth_mode, backend_username, backend_password, attributes " - "FROM runtime_mysqlx_users", + "backend_auth_mode, backend_username, backend_password, attributes, comment " + "FROM mysqlx_users WHERE active=1", result, err)) { return false; } merge_mysqlx_users(*result, new_identities); + // Drop any canonical-only users (no mysqlx override row); they have + // no x_enabled flag so they wouldn't authenticate via X anyway. + for (auto it = new_identities.begin(); it != new_identities.end();) { + if (!it->second.x_enabled) { + it = new_identities.erase(it); + } else { + ++it; + } + } + + std::unique_lock lock(mutex_); + identities_.swap(new_identities); + return true; +} + +bool MysqlxConfigStore::install_routes_from_admin(SQLite3DB& db, std::string& err) { + std::unordered_map new_routes {}; + std::unique_ptr result {}; + if (!fetch_result( db, - "SELECT name, bind, destination_hostgroup, fallback_hostgroup, strategy, active, attributes " - "FROM runtime_mysqlx_routes WHERE active=1", + "SELECT name, bind, destination_hostgroup, fallback_hostgroup, strategy, active, attributes, comment " + "FROM mysqlx_routes WHERE active=1", result, err)) { return false; } load_routes(*result, new_routes); + std::unique_lock lock(mutex_); + routes_.swap(new_routes); + return true; +} + +// install_endpoints_from_admin +// +// The endpoint store is the resolved per-hostgroup view used at route +// dispatch time. It is rebuilt from two inputs: +// +// * runtime_mysql_servers -- canonical hostgroup -> (hostname, port, +// use_ssl) topology, cross-module dependency on MySQL_HostGroups_ +// Manager. +// * mysqlx_backend_endpoints -- per-(hostname,mysql_port) overrides +// the operator sets to expose mysqlx_port and force use_ssl. +// +// Both raw inputs are kept in `endpoint_overrides_` so a SAVE / view +// projection can faithfully round-trip; the resolved per-hostgroup +// view is what `pick_endpoint` ultimately uses. +bool MysqlxConfigStore::install_endpoints_from_admin(SQLite3DB& db, std::string& err) { + std::unordered_map new_overrides {}; + std::unordered_map> new_hostgroup_endpoints {}; + std::unique_ptr result {}; + if (!fetch_result( db, - "SELECT hostname, mysql_port, mysqlx_port, use_ssl, attributes " - "FROM runtime_mysqlx_backend_endpoints", + "SELECT hostname, mysql_port, mysqlx_port, use_ssl, attributes, comment " + "FROM mysqlx_backend_endpoints", result, err)) { return false; } - load_endpoint_overrides(*result, endpoint_overrides); + load_endpoint_overrides(*result, new_overrides); if (!fetch_result( db, @@ -257,25 +354,38 @@ bool MysqlxConfigStore::load_from_runtime(SQLite3DB& db, std::string& err) { err)) { return false; } - load_backend_servers(*result, endpoint_overrides, new_hostgroup_endpoints); + load_backend_servers(*result, new_overrides, new_hostgroup_endpoints); + + std::unique_lock lock(mutex_); + endpoint_overrides_.swap(new_overrides); + hostgroup_endpoints_.swap(new_hostgroup_endpoints); + return true; +} + +bool MysqlxConfigStore::install_all_from_admin(SQLite3DB& db, std::string& err) { + return install_users_from_admin(db, err) + && install_routes_from_admin(db, err) + && install_endpoints_from_admin(db, err) + && install_variables_from_admin(db, err); +} +bool MysqlxConfigStore::install_variables_from_admin(SQLite3DB& db, std::string& err) { int new_pool_size = thread_pool_size_; int new_connect_timeout = connect_timeout_; std::string new_tls_mode = tls_mode_; int new_max_cached = max_cached_connections_; + std::unique_ptr result {}; if (!fetch_result( db, - "SELECT variable_name, variable_value FROM runtime_mysqlx_variables", + "SELECT variable_name, variable_value FROM mysqlx_variables", result, err)) { return false; } load_variables(*result, new_pool_size, new_connect_timeout, new_tls_mode, new_max_cached); - identities_.swap(new_identities); - routes_.swap(new_routes); - hostgroup_endpoints_.swap(new_hostgroup_endpoints); + std::unique_lock lock(mutex_); thread_pool_size_ = new_pool_size; connect_timeout_ = new_connect_timeout; tls_mode_ = std::move(new_tls_mode); @@ -283,6 +393,228 @@ bool MysqlxConfigStore::load_from_runtime(SQLite3DB& db, std::string& err) { return true; } +// =========================================================================== +// SAVE_*_TO_ADMIN_TABLE: mirror the canonical save_mysql_users_runtime_to_ +// database(false) shape -- mark all existing rows inactive, then upsert the +// live store contents with active=1. Inactive rows in the editable table +// are preserved so the operator's "deactivate but don't delete" workflow +// still works. +// +// PROJECT_*_TO_RUNTIME_VIEW: mirror save_mysql_users_runtime_to_database +// (true) -- DELETE the projected runtime_, then INSERT the live +// store contents. Used by the chassis register_runtime_view() refresh +// callbacks before any admin SELECT against the projected table. +// =========================================================================== + +bool MysqlxConfigStore::save_users_to_admin_table(SQLite3DB& db) const { + std::shared_lock lock(mutex_); + if (!db.execute("BEGIN")) return false; + if (!db.execute("UPDATE mysqlx_users SET active=0")) { + db.execute("ROLLBACK"); + return false; + } + for (const auto& [username, identity] : identities_) { + std::string sql = "REPLACE INTO mysqlx_users " + "(username, active, require_tls, allowed_auth_methods, default_route, " + "policy_profile, backend_auth_mode, backend_username, backend_password, " + "attributes, comment) VALUES ("; + sql += sqlite_quote(identity.username) + ", 1, "; + sql += (identity.require_tls ? "1, " : "0, "); + sql += sqlite_quote(identity.allowed_auth_methods) + ", "; + sql += sqlite_quote(identity.default_route) + ", "; + sql += sqlite_quote(identity.policy_profile) + ", "; + sql += sqlite_quote(backend_auth_mode_to_string(identity.backend_auth_mode)) + ", "; + sql += sqlite_quote(identity.backend_username) + ", "; + sql += sqlite_quote(identity.backend_password) + ", "; + sql += sqlite_quote(identity.attributes) + ", "; + sql += sqlite_quote(identity.comment) + ")"; + if (!db.execute(sql.c_str())) { + db.execute("ROLLBACK"); + return false; + } + } + return db.execute("COMMIT"); +} + +bool MysqlxConfigStore::save_routes_to_admin_table(SQLite3DB& db) const { + std::shared_lock lock(mutex_); + if (!db.execute("BEGIN")) return false; + if (!db.execute("UPDATE mysqlx_routes SET active=0")) { + db.execute("ROLLBACK"); + return false; + } + for (const auto& [name, route] : routes_) { + std::string sql = "REPLACE INTO mysqlx_routes " + "(name, bind, destination_hostgroup, fallback_hostgroup, strategy, " + "active, attributes, comment) VALUES ("; + sql += sqlite_quote(route.name) + ", "; + sql += sqlite_quote(route.bind) + ", "; + sql += std::to_string(route.destination_hostgroup) + ", "; + if (route.fallback_hostgroup >= 0) { + sql += std::to_string(route.fallback_hostgroup) + ", "; + } else { + sql += "NULL, "; + } + sql += sqlite_quote(route.strategy) + ", 1, "; + sql += sqlite_quote(route.attributes) + ", "; + sql += sqlite_quote(route.comment) + ")"; + if (!db.execute(sql.c_str())) { + db.execute("ROLLBACK"); + return false; + } + } + return db.execute("COMMIT"); +} + +bool MysqlxConfigStore::save_endpoints_to_admin_table(SQLite3DB& db) const { + std::shared_lock lock(mutex_); + if (!db.execute("BEGIN")) return false; + if (!db.execute("DELETE FROM mysqlx_backend_endpoints")) { + db.execute("ROLLBACK"); + return false; + } + for (const auto& [key, ov] : endpoint_overrides_) { + std::string sql = "INSERT INTO mysqlx_backend_endpoints " + "(hostname, mysql_port, mysqlx_port, use_ssl, attributes, comment) VALUES ("; + sql += sqlite_quote(ov.hostname) + ", "; + sql += std::to_string(ov.mysql_port) + ", "; + sql += std::to_string(ov.mysqlx_port) + ", "; + sql += (ov.use_ssl ? "1, " : "0, "); + sql += sqlite_quote(ov.attributes) + ", "; + sql += sqlite_quote(ov.comment) + ")"; + if (!db.execute(sql.c_str())) { + db.execute("ROLLBACK"); + return false; + } + } + return db.execute("COMMIT"); +} + +bool MysqlxConfigStore::save_variables_to_admin_table(SQLite3DB& db) const { + std::shared_lock lock(mutex_); + if (!db.execute("BEGIN")) return false; + if (!db.execute("DELETE FROM mysqlx_variables")) { + db.execute("ROLLBACK"); + return false; + } + auto put = [&](const char* name, const std::string& value) -> bool { + std::string sql = "INSERT INTO mysqlx_variables (variable_name, variable_value) VALUES ("; + sql += sqlite_quote(name) + ", " + sqlite_quote(value) + ")"; + return db.execute(sql.c_str()); + }; + if (!put("mysqlx_thread_pool_size", std::to_string(thread_pool_size_))) { + db.execute("ROLLBACK"); return false; + } + if (!put("mysqlx_connect_timeout", std::to_string(connect_timeout_))) { + db.execute("ROLLBACK"); return false; + } + if (!put("mysqlx_tls_mode", tls_mode_)) { + db.execute("ROLLBACK"); return false; + } + if (!put("mysqlx_max_cached_connections_per_thread", std::to_string(max_cached_connections_))) { + db.execute("ROLLBACK"); return false; + } + return db.execute("COMMIT"); +} + +void MysqlxConfigStore::project_users_to_runtime_view(SQLite3DB& db) const { + std::shared_lock lock(mutex_); + if (!db.execute("BEGIN")) return; + if (!db.execute("DELETE FROM runtime_mysqlx_users")) { + db.execute("ROLLBACK"); return; + } + for (const auto& [username, identity] : identities_) { + std::string sql = "INSERT INTO runtime_mysqlx_users " + "(username, active, require_tls, allowed_auth_methods, default_route, " + "policy_profile, backend_auth_mode, backend_username, backend_password, " + "attributes, comment) VALUES ("; + sql += sqlite_quote(identity.username) + ", 1, "; + sql += (identity.require_tls ? "1, " : "0, "); + sql += sqlite_quote(identity.allowed_auth_methods) + ", "; + sql += sqlite_quote(identity.default_route) + ", "; + sql += sqlite_quote(identity.policy_profile) + ", "; + sql += sqlite_quote(backend_auth_mode_to_string(identity.backend_auth_mode)) + ", "; + sql += sqlite_quote(identity.backend_username) + ", "; + sql += sqlite_quote(identity.backend_password) + ", "; + sql += sqlite_quote(identity.attributes) + ", "; + sql += sqlite_quote(identity.comment) + ")"; + if (!db.execute(sql.c_str())) { + db.execute("ROLLBACK"); return; + } + } + db.execute("COMMIT"); +} + +void MysqlxConfigStore::project_routes_to_runtime_view(SQLite3DB& db) const { + std::shared_lock lock(mutex_); + if (!db.execute("BEGIN")) return; + if (!db.execute("DELETE FROM runtime_mysqlx_routes")) { + db.execute("ROLLBACK"); return; + } + for (const auto& [name, route] : routes_) { + std::string sql = "INSERT INTO runtime_mysqlx_routes " + "(name, bind, destination_hostgroup, fallback_hostgroup, strategy, " + "active, attributes, comment) VALUES ("; + sql += sqlite_quote(route.name) + ", "; + sql += sqlite_quote(route.bind) + ", "; + sql += std::to_string(route.destination_hostgroup) + ", "; + if (route.fallback_hostgroup >= 0) { + sql += std::to_string(route.fallback_hostgroup) + ", "; + } else { + sql += "NULL, "; + } + sql += sqlite_quote(route.strategy) + ", 1, "; + sql += sqlite_quote(route.attributes) + ", "; + sql += sqlite_quote(route.comment) + ")"; + if (!db.execute(sql.c_str())) { + db.execute("ROLLBACK"); return; + } + } + db.execute("COMMIT"); +} + +void MysqlxConfigStore::project_endpoints_to_runtime_view(SQLite3DB& db) const { + std::shared_lock lock(mutex_); + if (!db.execute("BEGIN")) return; + if (!db.execute("DELETE FROM runtime_mysqlx_backend_endpoints")) { + db.execute("ROLLBACK"); return; + } + for (const auto& [key, ov] : endpoint_overrides_) { + std::string sql = "INSERT INTO runtime_mysqlx_backend_endpoints " + "(hostname, mysql_port, mysqlx_port, use_ssl, attributes, comment) VALUES ("; + sql += sqlite_quote(ov.hostname) + ", "; + sql += std::to_string(ov.mysql_port) + ", "; + sql += std::to_string(ov.mysqlx_port) + ", "; + sql += (ov.use_ssl ? "1, " : "0, "); + sql += sqlite_quote(ov.attributes) + ", "; + sql += sqlite_quote(ov.comment) + ")"; + if (!db.execute(sql.c_str())) { + db.execute("ROLLBACK"); return; + } + } + db.execute("COMMIT"); +} + +void MysqlxConfigStore::project_variables_to_runtime_view(SQLite3DB& db) const { + std::shared_lock lock(mutex_); + if (!db.execute("BEGIN")) return; + if (!db.execute("DELETE FROM runtime_mysqlx_variables")) { + db.execute("ROLLBACK"); return; + } + auto put = [&](const char* name, const std::string& value) -> bool { + std::string sql = "INSERT INTO runtime_mysqlx_variables (variable_name, variable_value) VALUES ("; + sql += sqlite_quote(name) + ", " + sqlite_quote(value) + ")"; + return db.execute(sql.c_str()); + }; + if (!put("mysqlx_thread_pool_size", std::to_string(thread_pool_size_)) || + !put("mysqlx_connect_timeout", std::to_string(connect_timeout_)) || + !put("mysqlx_tls_mode", tls_mode_) || + !put("mysqlx_max_cached_connections_per_thread", std::to_string(max_cached_connections_))) { + db.execute("ROLLBACK"); return; + } + db.execute("COMMIT"); +} + std::optional MysqlxConfigStore::resolve_identity(const std::string& username) const { std::shared_lock lock(mutex_); const auto it = identities_.find(username); diff --git a/plugins/mysqlx/src/mysqlx_plugin.cpp b/plugins/mysqlx/src/mysqlx_plugin.cpp index ee0e410fc..8f1dbf25d 100644 --- a/plugins/mysqlx/src/mysqlx_plugin.cpp +++ b/plugins/mysqlx/src/mysqlx_plugin.cpp @@ -141,41 +141,41 @@ bool sync_disk_to_memory(SQLite3DB& admindb, ProxySQL_PluginServices* services) return all_ok; } -bool copy_to_runtime(SQLite3DB& admindb, ProxySQL_PluginServices* services) { - const char* pairs[][2] = { - {"mysqlx_users", "runtime_mysqlx_users"}, - {"mysqlx_routes", "runtime_mysqlx_routes"}, - {"mysqlx_backend_endpoints", "runtime_mysqlx_backend_endpoints"}, - {"mysqlx_variables", "runtime_mysqlx_variables"}, - }; - bool all_ok = true; - for (const auto& p : pairs) { - std::string dest = "main."; - dest += p[1]; - std::string source = "main."; - source += p[0]; - if (!replace_table_atomically(admindb, services, dest.c_str(), source.c_str())) { - all_ok = false; - } - } - return all_ok; -} - bool mysqlx_start() { MysqlxPluginContext& ctx = mysqlx_context(); if (ctx.services != nullptr && ctx.services->get_admindb != nullptr) { SQLite3DB* admindb = ctx.services->get_admindb(); if (admindb != nullptr) { + // Disk -> memory: keep the editable tables in sync with disk + // on startup, same as the canonical proxysql_admin path does + // for mysql_users / mysql_servers / etc. This is admin-tier + // persistence and is legitimate. sync_disk_to_memory(*admindb, ctx.services); - copy_to_runtime(*admindb, ctx.services); + // Memory -> module: the editable mysqlx_* tables now drive + // the in-memory store directly. Each install_*_from_admin + // SELECTs the editable table (and the relevant cross-module + // runtime_mysql_* projections), builds a new local map, and + // atomically swaps it into MysqlxConfigStore. We deliberately + // do NOT replicate this data into runtime_mysqlx_* admin + // tables -- those are projected on demand via the chassis + // register_runtime_view() refresh callbacks declared below. std::string err; - if (!ctx.config_store->load_from_runtime(*admindb, err)) { + auto report_err = [&](const char* what) { if (ctx.services->log_message != nullptr) { - ctx.services->log_message(3, err.c_str()); + std::string msg = "mysqlx: "; + msg += what; + msg += ": "; + msg += err; + ctx.services->log_message(3, msg.c_str()); } - } + err.clear(); + }; + if (!ctx.config_store->install_users_from_admin(*admindb, err)) report_err("install_users_from_admin failed"); + if (!ctx.config_store->install_routes_from_admin(*admindb, err)) report_err("install_routes_from_admin failed"); + if (!ctx.config_store->install_endpoints_from_admin(*admindb, err)) report_err("install_endpoints_from_admin failed"); + if (!ctx.config_store->install_variables_from_admin(*admindb, err)) report_err("install_variables_from_admin failed"); } }