feat: support plugin-owned admin tables and commands

mysqlx-plugin-impl
Rene Cannao 1 month ago
parent 80fa6bee24
commit cd15afdd1e

@ -43,12 +43,18 @@ using proxysql_plugin_register_command_cb =
using proxysql_plugin_snapshot_cb =
SQLite3_result *(*)();
using proxysql_plugin_db_handle_cb =
SQLite3DB *(*)();
using proxysql_plugin_log_message_cb =
void (*)(int, const char *);
struct ProxySQL_PluginServices {
proxysql_plugin_register_table_cb register_table;
proxysql_plugin_register_command_cb register_command;
proxysql_plugin_db_handle_cb get_admindb;
proxysql_plugin_db_handle_cb get_configdb;
proxysql_plugin_db_handle_cb get_statsdb;
proxysql_plugin_snapshot_cb get_mysql_users_snapshot;
proxysql_plugin_snapshot_cb get_mysql_servers_snapshot;
proxysql_plugin_snapshot_cb get_mysql_group_replication_hostgroups_snapshot;

@ -20,6 +20,14 @@ public:
bool init_all(std::string &err);
bool start_all(std::string &err);
bool stop_all();
const std::vector<ProxySQL_PluginTableDef>& tables(ProxySQL_PluginDBKind kind) const;
bool dispatch_admin_command(const ProxySQL_PluginCommandContext& ctx, const std::string& sql, ProxySQL_PluginCommandResult& result) const;
void register_table_for_test(const ProxySQL_PluginTableDef& def);
bool register_command_for_test(const std::string& sql);
bool has_command_for_test(const std::string& sql) const;
void register_table(const ProxySQL_PluginTableDef& def);
bool register_command(const char* sql, proxysql_plugin_admin_command_cb cb);
size_t size() const;
@ -32,10 +40,20 @@ private:
bool stopped{false};
};
struct registered_command_t {
std::string sql {};
proxysql_plugin_admin_command_cb cb { nullptr };
};
std::vector<plugin_handle_t> plugins_;
ProxySQL_PluginServices services_;
std::vector<ProxySQL_PluginTableDef> tables_admin_;
std::vector<ProxySQL_PluginTableDef> tables_config_;
std::vector<ProxySQL_PluginTableDef> tables_stats_;
std::vector<registered_command_t> commands_;
};
ProxySQL_PluginManager* proxysql_get_plugin_manager();
bool proxysql_load_configured_plugins(
std::unique_ptr<ProxySQL_PluginManager>& manager,
const std::vector<std::string>& plugin_modules,

@ -16,6 +16,8 @@
#include "ProxySQL_RESTAPI_Server.hpp"
#include "ProxySQL_Plugin.h"
#include "proxysql_typedefs.h"
#include "query_digest_topk.h"
@ -681,7 +683,9 @@ class ProxySQL_Admin {
void send_ok_msg_to_client(S* sess, const char *msg, int rows, const char* query);
template <typename S>
void send_error_msg_to_client(S* sess, const char *msg, uint16_t mysql_err_code=1045);
#ifdef DEBUG
template <typename S>
bool dispatch_plugin_admin_command(S* sess, const char *sql);
#ifdef DEBUG
// these two following functions used to just call and return one function each
// this approach was replaced when we introduced debug filters
//int load_debug_to_runtime() { return flush_debug_levels_database_to_runtime(admindb); }

@ -38,6 +38,7 @@ using json = nlohmann::json;
#include "PgSQL_Authentication.h"
#include "MySQL_LDAP_Authentication.hpp"
#include "MySQL_PreparedStatement.h"
#include "ProxySQL_PluginManager.h"
#include "ProxySQL_Cluster.hpp"
#include "ProxySQL_Statistics.hpp"
#include "MySQL_Logger.hpp"
@ -927,15 +928,27 @@ bool ProxySQL_Admin::init(const bootstrap_info_t& bootstrap_info) {
insert_into_tables_defs(tables_defs_stats,"stats_proxysql_servers_clients_status", STATS_SQLITE_TABLE_PROXYSQL_SERVERS_CLIENTS_STATUS);
insert_into_tables_defs(tables_defs_stats,"stats_proxysql_message_metrics", STATS_SQLITE_TABLE_PROXYSQL_MESSAGE_METRICS);
insert_into_tables_defs(tables_defs_stats,"stats_proxysql_message_metrics_reset", STATS_SQLITE_TABLE_PROXYSQL_MESSAGE_METRICS_RESET);
#ifdef PROXYSQLGENAI
insert_into_tables_defs(tables_defs_stats,"stats_mcp_query_tools_counters", STATS_SQLITE_TABLE_MCP_QUERY_TOOLS_COUNTERS);
insert_into_tables_defs(tables_defs_stats,"stats_mcp_query_tools_counters_reset", STATS_SQLITE_TABLE_MCP_QUERY_TOOLS_COUNTERS_RESET);
#ifdef PROXYSQLGENAI
insert_into_tables_defs(tables_defs_stats,"stats_mcp_query_tools_counters", STATS_SQLITE_TABLE_MCP_QUERY_TOOLS_COUNTERS);
insert_into_tables_defs(tables_defs_stats,"stats_mcp_query_tools_counters_reset", STATS_SQLITE_TABLE_MCP_QUERY_TOOLS_COUNTERS_RESET);
// MCP query digest stats
insert_into_tables_defs(tables_defs_stats,"stats_mcp_query_digest", STATS_SQLITE_TABLE_MCP_QUERY_DIGEST);
insert_into_tables_defs(tables_defs_stats,"stats_mcp_query_digest_reset", STATS_SQLITE_TABLE_MCP_QUERY_DIGEST_RESET);
insert_into_tables_defs(tables_defs_stats,"stats_mcp_query_rules", STATS_SQLITE_TABLE_MCP_QUERY_RULES); // Reuse same schema for stats
#endif /* PROXYSQLGENAI */
insert_into_tables_defs(tables_defs_stats,"stats_mcp_query_digest_reset", STATS_SQLITE_TABLE_MCP_QUERY_DIGEST_RESET);
insert_into_tables_defs(tables_defs_stats,"stats_mcp_query_rules", STATS_SQLITE_TABLE_MCP_QUERY_RULES); // Reuse same schema for stats
#endif /* PROXYSQLGENAI */
if (ProxySQL_PluginManager* plugin_manager = proxysql_get_plugin_manager()) {
for (const auto& def : plugin_manager->tables(ProxySQL_PluginDBKind::admin_db)) {
insert_into_tables_defs(tables_defs_admin, def.table_name, def.table_def);
}
for (const auto& def : plugin_manager->tables(ProxySQL_PluginDBKind::config_db)) {
insert_into_tables_defs(tables_defs_config, def.table_name, def.table_def);
}
for (const auto& def : plugin_manager->tables(ProxySQL_PluginDBKind::stats_db)) {
insert_into_tables_defs(tables_defs_stats, def.table_name, def.table_def);
}
}
// init ldap here
init_ldap();

@ -5296,6 +5296,13 @@ __end_show_commands:
run_query=false;
}
}
if (run_query && sess->session_type == PROXYSQL_SESSION_ADMIN) {
ProxySQL_Admin *SPA=(ProxySQL_Admin *)pa;
if (SPA->dispatch_plugin_admin_command(sess, query_no_space)) {
run_query=false;
goto __run_query;
}
}
__run_query:
if (sess->proxysql_node_address && (__sync_fetch_and_add(&glovars.shutdown,0)==0)) {

@ -18,6 +18,7 @@ using json = nlohmann::json;
#include "MySQL_HostGroups_Manager.h"
#include "PgSQL_HostGroups_Manager.h"
#include "ProxySQL_PluginManager.h"
#include "mysql.h"
#include "proxysql_admin.h"
#include "Discovery_Schema.h"
@ -6221,12 +6222,35 @@ void ProxySQL_Admin::send_error_msg_to_client(S* sess, const char *msg, uint16_t
}
}
template <typename S>
bool ProxySQL_Admin::dispatch_plugin_admin_command(S* sess, const char* sql) {
ProxySQL_PluginManager* plugin_manager = proxysql_get_plugin_manager();
if (plugin_manager == nullptr) {
return false;
}
ProxySQL_PluginCommandContext ctx { admindb, configdb, statsdb };
ProxySQL_PluginCommandResult result { 0, 0, "" };
if (!plugin_manager->dispatch_admin_command(ctx, sql, result)) {
return false;
}
if (result.error_code == 0) {
send_ok_msg_to_client(sess, result.message.empty() ? NULL : result.message.c_str(), static_cast<int>(result.rows_affected), sql);
} else {
send_error_msg_to_client(sess, result.message.c_str(), static_cast<uint16_t>(result.error_code));
}
return true;
}
// Explicit template instantiations for send_ok_msg_to_client and send_error_msg_to_client
// These must come after the template definitions above
template void ProxySQL_Admin::send_ok_msg_to_client<MySQL_Session>(MySQL_Session*, char const*, int, char const*);
template void ProxySQL_Admin::send_ok_msg_to_client<PgSQL_Session>(PgSQL_Session*, char const*, int, char const*);
template void ProxySQL_Admin::send_error_msg_to_client<MySQL_Session>(MySQL_Session*, char const*, unsigned short);
template void ProxySQL_Admin::send_error_msg_to_client<PgSQL_Session>(PgSQL_Session*, char const*, unsigned short);
template bool ProxySQL_Admin::dispatch_plugin_admin_command<MySQL_Session>(MySQL_Session*, const char*);
template bool ProxySQL_Admin::dispatch_plugin_admin_command<PgSQL_Session>(PgSQL_Session*, const char*);
template <enum SERVER_TYPE pt>
void ProxySQL_Admin::__delete_inactive_users(enum cred_username_type usertype) {

@ -2,9 +2,15 @@
#include <cstring>
#include <dlfcn.h>
#include <strings.h>
#include "proxysql.h"
namespace {
ProxySQL_PluginManager* g_active_plugin_manager = nullptr;
ProxySQL_PluginManager* g_registry_target = nullptr;
std::string format_dl_error(const char *prefix) {
const char *dl_err = dlerror();
if (dl_err == nullptr) {
@ -20,14 +26,69 @@ std::string plugin_name(const ProxySQL_PluginDescriptor *descriptor) {
return descriptor->name;
}
void register_table_service(const ProxySQL_PluginTableDef& def) {
if (g_registry_target != nullptr) {
g_registry_target->register_table(def);
}
}
void register_command_service(const char* sql, proxysql_plugin_admin_command_cb cb) {
if (g_registry_target != nullptr) {
g_registry_target->register_command(sql, cb);
}
}
SQLite3DB* get_admindb_service() {
return nullptr;
}
SQLite3DB* get_configdb_service() {
return nullptr;
}
SQLite3DB* get_statsdb_service() {
return nullptr;
}
void log_message_service(int level, const char* message) {
if (message == nullptr) {
return;
}
switch (level) {
case 3:
proxy_error("%s\n", message);
break;
case 4:
proxy_warning("%s\n", message);
break;
default:
proxy_info("%s\n", message);
break;
}
}
bool sql_equals_ci(const std::string& lhs, const std::string& rhs) {
return strcasecmp(lhs.c_str(), rhs.c_str()) == 0;
}
} // namespace
ProxySQL_PluginManager::ProxySQL_PluginManager() {
std::memset(&services_, 0, sizeof(services_));
services_.register_table = &register_table_service;
services_.register_command = &register_command_service;
services_.get_admindb = &get_admindb_service;
services_.get_configdb = &get_configdb_service;
services_.get_statsdb = &get_statsdb_service;
services_.log_message = &log_message_service;
}
ProxySQL_PluginManager::~ProxySQL_PluginManager() {
stop_all();
if (g_active_plugin_manager == this) {
g_active_plugin_manager = nullptr;
}
for (auto it = plugins_.rbegin(); it != plugins_.rend(); ++it) {
if (it->handle != nullptr) {
dlclose(it->handle);
@ -86,10 +147,13 @@ bool ProxySQL_PluginManager::init_all(std::string &err) {
plugin.initialized = true;
continue;
}
g_registry_target = this;
if (!plugin.descriptor->init(&services_)) {
g_registry_target = nullptr;
err = "plugin init failed: " + plugin_name(plugin.descriptor);
return false;
}
g_registry_target = nullptr;
plugin.initialized = true;
}
@ -144,6 +208,83 @@ size_t ProxySQL_PluginManager::size() const {
return plugins_.size();
}
const std::vector<ProxySQL_PluginTableDef>& ProxySQL_PluginManager::tables(ProxySQL_PluginDBKind kind) const {
switch (kind) {
case ProxySQL_PluginDBKind::admin_db:
return tables_admin_;
case ProxySQL_PluginDBKind::config_db:
return tables_config_;
case ProxySQL_PluginDBKind::stats_db:
default:
return tables_stats_;
}
}
bool ProxySQL_PluginManager::dispatch_admin_command(const ProxySQL_PluginCommandContext& ctx, const std::string& sql, ProxySQL_PluginCommandResult& result) const {
for (const auto& command : commands_) {
if (!sql_equals_ci(command.sql, sql)) {
continue;
}
if (command.cb == nullptr) {
return false;
}
result = command.cb(ctx, sql.c_str());
return true;
}
return false;
}
void ProxySQL_PluginManager::register_table_for_test(const ProxySQL_PluginTableDef& def) {
register_table(def);
}
bool ProxySQL_PluginManager::register_command_for_test(const std::string& sql) {
return register_command(sql.c_str(), nullptr);
}
bool ProxySQL_PluginManager::has_command_for_test(const std::string& sql) const {
for (const auto& command : commands_) {
if (sql_equals_ci(command.sql, sql)) {
return true;
}
}
return false;
}
void ProxySQL_PluginManager::register_table(const ProxySQL_PluginTableDef& def) {
switch (def.db_kind) {
case ProxySQL_PluginDBKind::admin_db:
tables_admin_.push_back(def);
break;
case ProxySQL_PluginDBKind::config_db:
tables_config_.push_back(def);
break;
case ProxySQL_PluginDBKind::stats_db:
tables_stats_.push_back(def);
break;
}
}
bool ProxySQL_PluginManager::register_command(const char* sql, proxysql_plugin_admin_command_cb cb) {
if (sql == nullptr || *sql == '\0') {
return false;
}
for (const auto& command : commands_) {
if (strcasecmp(command.sql.c_str(), sql) == 0) {
return false;
}
}
commands_.push_back({sql, cb});
return true;
}
ProxySQL_PluginManager* proxysql_get_plugin_manager() {
return g_active_plugin_manager;
}
bool proxysql_load_configured_plugins(
std::unique_ptr<ProxySQL_PluginManager>& manager,
const std::vector<std::string>& plugin_modules,
@ -151,6 +292,7 @@ bool proxysql_load_configured_plugins(
) {
err.clear();
manager.reset();
g_active_plugin_manager = nullptr;
if (plugin_modules.empty()) {
return true;
@ -169,6 +311,7 @@ bool proxysql_load_configured_plugins(
}
manager = std::move(next_manager);
g_active_plugin_manager = manager.get();
return true;
}
@ -190,6 +333,7 @@ bool proxysql_stop_configured_plugins(
) {
err.clear();
if (!manager) {
g_active_plugin_manager = nullptr;
return true;
}
@ -199,5 +343,6 @@ bool proxysql_stop_configured_plugins(
}
manager.reset();
g_active_plugin_manager = nullptr;
return true;
}

@ -308,7 +308,8 @@ UNIT_TESTS := smoke_test-t query_cache_unit-t query_processor_unit-t \
genai_mysql_catalog_unit-t \
admin_disk_upgrade_unit-t \
glovars_unit-t \
plugin_manager_unit-t
plugin_manager_unit-t \
plugin_registry_unit-t
.PHONY: all
all: $(UNIT_TESTS)

@ -0,0 +1,22 @@
#include "ProxySQL_PluginManager.h"
#include "ProxySQL_Plugin.h"
#include "tap.h"
int main() {
plan(4);
ProxySQL_PluginManager mgr;
ProxySQL_PluginTableDef def {
ProxySQL_PluginDBKind::admin_db,
"mysqlx_users",
"CREATE TABLE mysqlx_users (username VARCHAR NOT NULL PRIMARY KEY)"
};
mgr.register_table_for_test(def);
ok(mgr.tables(ProxySQL_PluginDBKind::admin_db).size() == static_cast<size_t>(1), "plugin admin table is stored");
ok(mgr.tables(ProxySQL_PluginDBKind::config_db).size() == static_cast<size_t>(0), "config tables start empty");
ok(mgr.register_command_for_test("LOAD MYSQLX USERS TO RUNTIME"), "command registration succeeds");
ok(mgr.has_command_for_test("LOAD MYSQLX USERS TO RUNTIME"), "registered command is discoverable");
return exit_status();
}
Loading…
Cancel
Save