test: strengthen pgsql cluster sync TAP follow-up

fix/postgresql-cluster-sync_2
Rene Cannao 2 months ago
parent 5c7e616f9b
commit bf9fc81f48

@ -12,11 +12,13 @@ IDIRS := -I$(PROXYSQL_IDIR) \
-I${CURL_IDIR} \
-I${SQLITE3_IDIR} \
-I$(DOTENV_IDIR) \
-I$(POSTGRESQL_IDIR) \
-I$(RE2_IDIR)
LIBPROXYSQLAR := $(PROXYSQL_LDIR)/libproxysql.a
AR ?= ar
OPT := $(STDCPP) -O2 -ggdb -Wl,--no-as-needed $(WASAN)
@ -26,21 +28,21 @@ OPT := $(STDCPP) -O2 -ggdb -Wl,--no-as-needed $(WASAN)
# being used inside ProxySQL linked 'SQLite3', which is also used by `libtap.so`.
LWGCOV :=
ifeq ($(WITHGCOV),1)
LWGCOV := -lgcov
LWGCOV := -lgcov --coverage
endif
### main targets
.PHONY: default
.PHONY: default debug
default: all
.PHONY: all
.PHONY: all debug
all: libtap_mariadb.a libtap_mysql57.a libtap_mysql8.a \
libtap.so libcpp_dotenv.so libre2.so
libtap.a libtap.so libcpp_dotenv.so libre2.so
debug: OPT := $(STDCPP) -O0 -DDEBUG -ggdb -Wl,--no-as-needed $(WASAN)
debug: libtap_mariadb.a libtap_mysql57.a libtap_mysql8.a libtap.so
debug: libtap_mariadb.a libtap_mysql57.a libtap_mysql8.a libtap.a libtap.so
### helper targets
@ -62,20 +64,20 @@ tap.o: tap.cpp cpp-dotenv/static/cpp-dotenv/libcpp_dotenv.a libcurl.so -lssl -lc
mcp_client.o: mcp_client.cpp mcp_client.h libcurl.so
$(CXX) -fPIC -c mcp_client.cpp $(IDIRS) $(OPT)
libtap_mariadb.a: Makefile tap.o command_line.o utils_mariadb.o mcp_client.o cpp-dotenv/static/cpp-dotenv/libcpp_dotenv.a
rm -f $@
ar rcs libtap_mariadb.a tap.o command_line.o utils_mariadb.o mcp_client.o $(SQLITE3_LDIR)/sqlite3.o $(PROXYSQL_LDIR)/obj/sha256crypt.oo
libtap_mariadb.a: tap.o command_line.o utils_mariadb.o noise_utils_mariadb.o mcp_client.o cpp-dotenv/static/cpp-dotenv/libcpp_dotenv.a
$(AR) rcs libtap_mariadb.a tap.o command_line.o utils_mariadb.o noise_utils_mariadb.o mcp_client.o $(SQLITE3_LDIR)/sqlite3.o $(PROXYSQL_LDIR)/obj/sha256crypt.oo
libtap_mysql57.a: Makefile tap.o command_line.o utils_mysql57.o mcp_client.o cpp-dotenv/static/cpp-dotenv/libcpp_dotenv.a
rm -f $@
ar rcs libtap_mysql57.a tap.o command_line.o utils_mysql57.o mcp_client.o $(SQLITE3_LDIR)/sqlite3.o $(PROXYSQL_LDIR)/obj/sha256crypt.oo
libtap_mysql57.a: tap.o command_line.o utils_mysql57.o noise_utils_mysql57.o mcp_client.o cpp-dotenv/static/cpp-dotenv/libcpp_dotenv.a
$(AR) rcs libtap_mysql57.a tap.o command_line.o utils_mysql57.o noise_utils_mysql57.o mcp_client.o $(SQLITE3_LDIR)/sqlite3.o $(PROXYSQL_LDIR)/obj/sha256crypt.oo
libtap_mysql8.a: Makefile tap.o command_line.o utils_mysql8.o mcp_client.o cpp-dotenv/static/cpp-dotenv/libcpp_dotenv.a
rm -f $@
ar rcs libtap_mysql8.a tap.o command_line.o utils_mysql8.o mcp_client.o $(SQLITE3_LDIR)/sqlite3.o $(PROXYSQL_LDIR)/obj/sha256crypt.oo
libtap_mysql8.a: tap.o command_line.o utils_mysql8.o noise_utils_mysql8.o mcp_client.o cpp-dotenv/static/cpp-dotenv/libcpp_dotenv.a
$(AR) rcs libtap_mysql8.a tap.o command_line.o utils_mysql8.o noise_utils_mysql8.o mcp_client.o $(SQLITE3_LDIR)/sqlite3.o $(PROXYSQL_LDIR)/obj/sha256crypt.oo
libtap.a: libtap_mariadb.a
cp libtap_mariadb.a libtap.a
libtap.so: libtap_mariadb.a cpp-dotenv/dynamic/cpp-dotenv/libcpp_dotenv.so libre2.so
$(CXX) -shared -o libtap.so -Wl,--whole-archive libtap_mariadb.a -Wl,--no-whole-archive $(LWGCOV)
$(CXX) -shared -o libtap.so -Wl,--whole-archive libtap_mariadb.a -Wl,--no-whole-archive -L$(POSTGRESQL_PATH)/interfaces/libpq -lpq -L$(RE2_LDIR)/so -lre2 -Wl,-rpath,$(POSTGRESQL_PATH)/interfaces/libpq -Wl,-rpath,$(RE2_LDIR)/so $(LWGCOV)
### tap deps targets
@ -95,7 +97,7 @@ cpp-dotenv/static/cpp-dotenv/libcpp_dotenv.a:
cd cpp-dotenv/static/cpp-dotenv && patch src/dotenv.cpp < ../../dotenv.cpp.patch
cd cpp-dotenv/static/cpp-dotenv && patch include/dotenv.h < ../../dotenv.h.patch
cd cpp-dotenv/static/cpp-dotenv && patch -p0 < ../../nm_clang_fix.patch
cd cpp-dotenv/static/cpp-dotenv && cmake . -DBUILD_TESTING=OFF -DBUILD_SHARED_LIBS=OFF -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_BUILD_TYPE=Debug
cd cpp-dotenv/static/cpp-dotenv && cmake . -DBUILD_TESTING=OFF -DBUILD_SHARED_LIBS=OFF -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_BUILD_TYPE=Debug -DCMAKE_POLICY_VERSION_MINIMUM=3.5
cd cpp-dotenv/static/cpp-dotenv && CC=${CC} CXX=${CXX} ${MAKE}
cpp-dotenv/dynamic/cpp-dotenv/libcpp_dotenv.so:
@ -104,7 +106,7 @@ cpp-dotenv/dynamic/cpp-dotenv/libcpp_dotenv.so:
cd cpp-dotenv/dynamic/cpp-dotenv && patch src/dotenv.cpp < ../../dotenv.cpp.patch
cd cpp-dotenv/dynamic/cpp-dotenv && patch include/dotenv.h < ../../dotenv.h.patch
cd cpp-dotenv/dynamic/cpp-dotenv && patch -p0 < ../../nm_clang_fix.patch
cd cpp-dotenv/dynamic/cpp-dotenv && cmake . -DBUILD_TESTING=OFF -DBUILD_SHARED_LIBS=ON -DCMAKE_BUILD_RPATH="../tap:../../tap" -DCMAKE_BUILD_TYPE=Debug
cd cpp-dotenv/dynamic/cpp-dotenv && cmake . -DBUILD_TESTING=OFF -DBUILD_SHARED_LIBS=ON -DCMAKE_BUILD_RPATH="../tap:../../tap" -DCMAKE_BUILD_TYPE=Debug -DCMAKE_POLICY_VERSION_MINIMUM=3.5
cd cpp-dotenv/dynamic/cpp-dotenv && CC=${CC} CXX=${CXX} ${MAKE}
@ -113,8 +115,9 @@ cpp-dotenv/dynamic/cpp-dotenv/libcpp_dotenv.so:
.SILENT: clean_utils
.PHONY: clean_utils
clean_utils:
find . -name 'utils_*.*' -delete || true
find . -name 'libtap_*.*' -delete || true
find . -name 'utils_*.o' -delete || true
find . -name 'noise_utils_*.o' -delete || true
find . -name 'libtap_*.a' -delete || true
find . -name 'libtap.so' -delete || true
.SILENT: clean
@ -129,3 +132,20 @@ cleanall: clean
# Remove cpp-dotenv source directories (213MB)
cd cpp-dotenv/static && rm -rf cpp-dotenv-*/ || true
cd cpp-dotenv/dynamic && rm -rf cpp-dotenv-*/ || true
# Keep the v3.0 archive recipes intact so future merges stay clean, but
# preserve this branch's stale-archive workaround in separate helper rules.
LIBTAP_ARCHIVES := libtap_mariadb.a libtap_mysql57.a libtap_mysql8.a
$(LIBTAP_ARCHIVES): preclean-libtap-archives | Makefile
.PHONY: preclean-libtap-archives
preclean-libtap-archives:
rm -f $(LIBTAP_ARCHIVES)
ifeq ($(wildcard noise_utils.cpp noise_utils.h),)
NOISE_UTILS_STUBS := noise_utils_mariadb.o noise_utils_mysql57.o noise_utils_mysql8.o
$(NOISE_UTILS_STUBS):
$(CXX) -x c++ -fPIC -c /dev/null $(IDIRS) $(OPT) -o $@
endif

@ -1,34 +1,27 @@
/**
* @file test_cluster_sync_pgsql-t.cpp
* @brief Checks that ProxySQL PostgreSQL tables are properly syncing between cluster instances.
* @details Based on test_cluster_sync_mysql_servers-t.cpp, this test checks PostgreSQL cluster sync:
* @details This test checks PostgreSQL cluster sync for:
* - 'pgsql_servers_v2' sync between cluster nodes
* - 'pgsql_users' sync between cluster nodes
* - 'pgsql_query_rules' sync between cluster nodes
* - PostgreSQL modules checksums appear in runtime_checksums_values
* - Sync operation can be controlled via '%_diffs_before_sync' variables
* - Basic PostgreSQL admin tables and cluster variables are accessible
*
* Test Cluster Isolation:
* ----------------------
* For guaranteeing that this test doesn't invalidate the configuration of a running ProxySQL cluster and
* that after the test, the previous valid configuration is restored, the following actions are performed:
*
* 1. The Core nodes from the current cluster configuration are backup.
* 2. Primary (currently tested instance) is removed from the Core nodes.
* 3. A sync wait until all core nodes have performed the removal of primary is executed.
* 4. Now Primary is isolated from the previous cluster, tests can proceed. Primary is setup to hold itself
* in its 'proxysql_servers' as well as the target spawned replica.
* 5. After the tests recover the primary configuration and add it back to the Core nodes from Cluster:
* - Recover the previous 'pgsql_servers_v2' from disk, and load them to runtime, discarding any previous
* config performed during the test.
* - Insert the primary back into a Core node from cluster and wait for all nodes to sync including it.
* - Insert into the primary the previous backup Core nodes from Cluster and load to runtime.
* Optional replica validation:
* ----------------------------
* When 'TAP_PGSQL_SYNC_REPLICA_PORT' is set, the test temporarily backs up and restores
* modified PostgreSQL admin tables on the primary, then verifies that runtime state is
* replicated to the target replica. If the corresponding '*_save_to_disk' variable is enabled,
* the test also verifies persistence into the replica disk tables.
*/
#include <unistd.h>
#include <pthread.h>
#include <cstdint>
#include <stdio.h>
#include <stdlib.h>
#include <strings.h>
#include <time.h>
#include <atomic>
@ -37,6 +30,7 @@
#include <thread>
#include <iostream>
#include <functional>
#include <tuple>
#include <utility>
#include "libconfig.h"
@ -55,65 +49,183 @@
using std::vector;
using std::string;
const uint32_t SYNC_TIMEOUT = 10;
using pgsql_server_tuple = std::tuple<int, string, int, string, int, int, int, int, int, int, string>;
bool parse_bool_value(const string& value) {
return value == "1" || strcasecmp(value.c_str(), "true") == 0;
}
int get_admin_bool_value(MYSQL* admin, const string& variable_name, bool& value) {
string variable_value {};
const int rc = get_variable_value(admin, variable_name, variable_value);
if (rc != EXIT_SUCCESS) {
return rc;
}
value = parse_bool_value(variable_value);
return EXIT_SUCCESS;
}
int backup_admin_table(MYSQL* admin, const string& table_name, const string& backup_table_name) {
string drop_query {};
string create_query {};
string_format("DROP TABLE IF EXISTS %s", drop_query, backup_table_name.c_str());
if (mysql_query_t(admin, drop_query)) {
return EXIT_FAILURE;
}
string_format(
"CREATE TABLE %s AS SELECT * FROM %s",
create_query,
backup_table_name.c_str(),
table_name.c_str()
);
if (mysql_query_t(admin, create_query)) {
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
int restore_admin_table(
MYSQL* admin, const string& table_name, const string& backup_table_name, const string& load_query = ""
) {
string delete_query {};
string restore_query {};
string drop_query {};
int rc = EXIT_SUCCESS;
string_format("DELETE FROM %s", delete_query, table_name.c_str());
if (mysql_query_t(admin, delete_query)) {
rc = EXIT_FAILURE;
goto cleanup;
}
string_format(
"INSERT INTO %s SELECT * FROM %s",
restore_query,
table_name.c_str(),
backup_table_name.c_str()
);
if (mysql_query_t(admin, restore_query)) {
rc = EXIT_FAILURE;
goto cleanup;
}
if (!load_query.empty() && mysql_query_t(admin, load_query)) {
rc = EXIT_FAILURE;
}
cleanup:
string_format("DROP TABLE IF EXISTS %s", drop_query, backup_table_name.c_str());
if (mysql_query_t(admin, drop_query)) {
rc = EXIT_FAILURE;
}
return rc;
}
int fetch_single_count(MYSQL* admin, const string& query, int& count) {
if (mysql_query_t(admin, query)) {
return EXIT_FAILURE;
}
MYSQL_RES* result = mysql_store_result(admin);
if (!result) {
diag("Failed to store result from query: %s", query.c_str());
return EXIT_FAILURE;
}
MYSQL_ROW row = mysql_fetch_row(result);
if (!row || !row[0]) {
diag("Failed to fetch count row from query: %s", query.c_str());
mysql_free_result(result);
return EXIT_FAILURE;
}
count = atoi(row[0]);
mysql_free_result(result);
return EXIT_SUCCESS;
}
int wait_for_expected_count(MYSQL* admin, const string& query, int expected_count, const string& label) {
for (uint32_t waited = 0; waited < SYNC_TIMEOUT; ++waited) {
int count = 0;
if (fetch_single_count(admin, query, count) != EXIT_SUCCESS) {
return EXIT_FAILURE;
}
if (count == expected_count) {
return EXIT_SUCCESS;
}
sleep(1);
}
diag("Timed out waiting for %s using query: %s", label.c_str(), query.c_str());
return EXIT_FAILURE;
}
int check_pgsql_servers_v2_sync(
const CommandLine& cl, MYSQL* proxy_admin, MYSQL* r_proxy_admin,
const vector<std::tuple<int, string, int, string, int, int, int, int, int, int, string>>& insert_pgsql_servers_values
MYSQL* proxy_admin, MYSQL* replica_admin, bool save_to_disk,
const vector<pgsql_server_tuple>& insert_pgsql_servers_values
) {
// Configure 'pgsql_servers_v2' and check sync with NULL comments
const string backup_table_name { "pgsql_servers_v2_sync_test_backup_5297" };
const char* t_insert_pgsql_servers =
"INSERT INTO pgsql_servers_v2 ("
" hostgroup_id, hostname, port, status, weight, compression, max_connections,"
" max_replication_lag, use_ssl, max_latency_ms, comment"
") VALUES (%d, '%s', %d, '%s', %d, %d, %d, %d, %d, %d, '%s')";
std::vector<std::string> insert_pgsql_servers_queries {};
vector<string> insert_pgsql_servers_queries {};
int rc = EXIT_FAILURE;
for (auto const& values : insert_pgsql_servers_values) {
std::string insert_pgsql_servers_query = "";
for (const auto& values : insert_pgsql_servers_values) {
string insert_pgsql_servers_query {};
string_format(
t_insert_pgsql_servers,
insert_pgsql_servers_query,
std::get<0>(values), // hostgroup_id
std::get<1>(values).c_str(), // hostname
std::get<2>(values), // port
std::get<3>(values).c_str(), // status
std::get<4>(values), // weight
std::get<5>(values), // compression
std::get<6>(values), // max_connections
std::get<7>(values), // max_replication_lag
std::get<8>(values), // use_ssl
std::get<9>(values), // max_latency_ms
std::get<10>(values).c_str() // comment
std::get<0>(values),
std::get<1>(values).c_str(),
std::get<2>(values),
std::get<3>(values).c_str(),
std::get<4>(values),
std::get<5>(values),
std::get<6>(values),
std::get<7>(values),
std::get<8>(values),
std::get<9>(values),
std::get<10>(values).c_str()
);
insert_pgsql_servers_queries.push_back(insert_pgsql_servers_query);
}
// Backup current table
MYSQL_QUERY(proxy_admin, "CREATE TABLE pgsql_servers_v2_sync_test AS SELECT * FROM pgsql_servers_v2");
MYSQL_QUERY(proxy_admin, "DELETE FROM pgsql_servers_v2");
// Insert test data into primary
for (auto const& query : insert_pgsql_servers_queries) {
MYSQL_QUERY(proxy_admin, query.c_str());
if (backup_admin_table(proxy_admin, "pgsql_servers_v2", backup_table_name) != EXIT_SUCCESS) {
return EXIT_FAILURE;
}
if (mysql_query_t(proxy_admin, "DELETE FROM pgsql_servers_v2")) {
goto cleanup;
}
// Load to runtime and verify sync
MYSQL_QUERY(proxy_admin, "LOAD PGSQL SERVERS TO RUNTIME");
// Wait for sync
sleep(5);
for (const auto& query : insert_pgsql_servers_queries) {
if (mysql_query_t(proxy_admin, query)) {
goto cleanup;
}
}
if (mysql_query_t(proxy_admin, "LOAD PGSQL SERVERS TO RUNTIME")) {
goto cleanup;
}
// Check if data was synced to replica
for (auto const& values : insert_pgsql_servers_values) {
const char* t_select_pgsql_servers_inserted_entries =
"SELECT COUNT(*) FROM pgsql_servers_v2 WHERE hostgroup_id=%d AND hostname='%s'"
for (const auto& values : insert_pgsql_servers_values) {
const char* t_runtime_pgsql_servers_query =
"SELECT COUNT(*) FROM runtime_pgsql_servers WHERE hostgroup_id=%d AND hostname='%s'"
" AND port=%d AND status='%s' AND weight=%d AND"
" compression=%d AND max_connections=%d AND max_replication_lag=%d"
" AND use_ssl=%d AND max_latency_ms=%d AND comment='%s'";
std::string select_pgsql_servers_query = "";
string runtime_pgsql_servers_query {};
string_format(
t_select_pgsql_servers_inserted_entries,
select_pgsql_servers_query,
t_runtime_pgsql_servers_query,
runtime_pgsql_servers_query,
std::get<0>(values),
std::get<1>(values).c_str(),
std::get<2>(values),
@ -126,43 +238,197 @@ int check_pgsql_servers_v2_sync(
std::get<9>(values),
std::get<10>(values).c_str()
);
// Check on replica
MYSQL_RES* result = NULL;
MYSQL_QUERY(r_proxy_admin, select_pgsql_servers_query.c_str());
result = mysql_store_result(r_proxy_admin);
if (!result) {
diag("Failed to store result from query: %s", select_pgsql_servers_query.c_str());
return EXIT_FAILURE;
if (wait_for_expected_count(replica_admin, runtime_pgsql_servers_query, 1, "runtime_pgsql_servers sync") != EXIT_SUCCESS) {
goto cleanup;
}
if (mysql_num_rows(result) == 0) {
diag("No results returned from query: %s", select_pgsql_servers_query.c_str());
mysql_free_result(result);
return EXIT_FAILURE;
if (save_to_disk) {
const char* t_disk_pgsql_servers_query =
"SELECT COUNT(*) FROM pgsql_servers_v2 WHERE hostgroup_id=%d AND hostname='%s'"
" AND port=%d AND status='%s' AND weight=%d AND"
" compression=%d AND max_connections=%d AND max_replication_lag=%d"
" AND use_ssl=%d AND max_latency_ms=%d AND comment='%s'";
string disk_pgsql_servers_query {};
string_format(
t_disk_pgsql_servers_query,
disk_pgsql_servers_query,
std::get<0>(values),
std::get<1>(values).c_str(),
std::get<2>(values),
std::get<3>(values).c_str(),
std::get<4>(values),
std::get<5>(values),
std::get<6>(values),
std::get<7>(values),
std::get<8>(values),
std::get<9>(values),
std::get<10>(values).c_str()
);
if (wait_for_expected_count(replica_admin, disk_pgsql_servers_query, 1, "pgsql_servers_v2 disk sync") != EXIT_SUCCESS) {
goto cleanup;
}
}
MYSQL_ROW row = mysql_fetch_row(result);
if (!row) {
diag("Failed to fetch row from result");
mysql_free_result(result);
return EXIT_FAILURE;
}
rc = EXIT_SUCCESS;
cleanup:
if (restore_admin_table(proxy_admin, "pgsql_servers_v2", backup_table_name, "LOAD PGSQL SERVERS TO RUNTIME") != EXIT_SUCCESS) {
return EXIT_FAILURE;
}
return rc;
}
int check_pgsql_users_sync(MYSQL* proxy_admin, MYSQL* replica_admin, bool save_to_disk) {
const string backup_table_name { "pgsql_users_sync_test_backup_5297" };
const string username { "cluster_sync_pgsql_user_5297" };
const string password { "cluster_sync_pgsql_pass_5297" };
const string attributes { "" };
const string comment { "cluster_sync_pgsql_user_5297" };
const int default_hostgroup = 801;
const int max_connections = 33;
int rc = EXIT_FAILURE;
if (backup_admin_table(proxy_admin, "pgsql_users", backup_table_name) != EXIT_SUCCESS) {
return EXIT_FAILURE;
}
if (mysql_query_t(proxy_admin, "DELETE FROM pgsql_users")) {
goto cleanup;
}
string insert_user_query {};
string_format(
"INSERT INTO pgsql_users (username, password, active, use_ssl, default_hostgroup, transaction_persistent, fast_forward, backend, frontend, max_connections, attributes, comment) "
"VALUES ('%s', '%s', 1, 0, %d, 1, 0, 0, 1, %d, '%s', '%s')",
insert_user_query,
username.c_str(),
password.c_str(),
default_hostgroup,
max_connections,
attributes.c_str(),
comment.c_str()
);
if (mysql_query_t(proxy_admin, insert_user_query)) {
goto cleanup;
}
if (mysql_query_t(proxy_admin, "LOAD PGSQL USERS TO RUNTIME")) {
goto cleanup;
}
string runtime_user_query {};
string_format(
"SELECT COUNT(*) FROM runtime_pgsql_users WHERE username='%s' AND password='%s' AND active=1 AND use_ssl=0 AND default_hostgroup=%d "
"AND transaction_persistent=1 AND fast_forward=0 AND backend=0 AND frontend=1 AND max_connections=%d "
"AND attributes='%s' AND comment='%s'",
runtime_user_query,
username.c_str(),
password.c_str(),
default_hostgroup,
max_connections,
attributes.c_str(),
comment.c_str()
);
if (wait_for_expected_count(replica_admin, runtime_user_query, 1, "runtime_pgsql_users sync") != EXIT_SUCCESS) {
goto cleanup;
}
if (save_to_disk) {
string disk_user_query {};
string_format(
"SELECT COUNT(*) FROM pgsql_users WHERE username='%s' AND password='%s' AND active=1 AND use_ssl=0 AND default_hostgroup=%d "
"AND transaction_persistent=1 AND fast_forward=0 AND backend=0 AND frontend=1 AND max_connections=%d "
"AND attributes='%s' AND comment='%s'",
disk_user_query,
username.c_str(),
password.c_str(),
default_hostgroup,
max_connections,
attributes.c_str(),
comment.c_str()
);
if (wait_for_expected_count(replica_admin, disk_user_query, 1, "pgsql_users disk sync") != EXIT_SUCCESS) {
goto cleanup;
}
int count = atoi(row[0]);
mysql_free_result(result);
}
if (count != 1) {
diag("PostgreSQL server sync failed for hostgroup %d, hostname %s",
std::get<0>(values), std::get<1>(values).c_str());
return EXIT_FAILURE;
rc = EXIT_SUCCESS;
cleanup:
if (restore_admin_table(proxy_admin, "pgsql_users", backup_table_name, "LOAD PGSQL USERS TO RUNTIME") != EXIT_SUCCESS) {
return EXIT_FAILURE;
}
return rc;
}
int check_pgsql_query_rules_sync(MYSQL* proxy_admin, MYSQL* replica_admin, bool save_to_disk) {
const string backup_table_name { "pgsql_query_rules_sync_test_backup_5297" };
const int rule_id = 98001;
const int destination_hostgroup = 801;
const string match_pattern { "^SELECT 42$" };
const string comment { "cluster_sync_pgsql_rule_5297" };
int rc = EXIT_FAILURE;
if (backup_admin_table(proxy_admin, "pgsql_query_rules", backup_table_name) != EXIT_SUCCESS) {
return EXIT_FAILURE;
}
if (mysql_query_t(proxy_admin, "DELETE FROM pgsql_query_rules")) {
goto cleanup;
}
string insert_rule_query {};
string_format(
"INSERT INTO pgsql_query_rules (rule_id, active, match_pattern, destination_hostgroup, apply, comment) "
"VALUES (%d, 1, '%s', %d, 1, '%s')",
insert_rule_query,
rule_id,
match_pattern.c_str(),
destination_hostgroup,
comment.c_str()
);
if (mysql_query_t(proxy_admin, insert_rule_query)) {
goto cleanup;
}
if (mysql_query_t(proxy_admin, "LOAD PGSQL QUERY RULES TO RUNTIME")) {
goto cleanup;
}
string runtime_query_rules_query {};
string_format(
"SELECT COUNT(*) FROM runtime_pgsql_query_rules WHERE rule_id=%d AND active=1 AND match_pattern='%s' "
"AND destination_hostgroup=%d AND apply=1 AND comment='%s'",
runtime_query_rules_query,
rule_id,
match_pattern.c_str(),
destination_hostgroup,
comment.c_str()
);
if (wait_for_expected_count(replica_admin, runtime_query_rules_query, 1, "runtime_pgsql_query_rules sync") != EXIT_SUCCESS) {
goto cleanup;
}
if (save_to_disk) {
string disk_query_rules_query {};
string_format(
"SELECT COUNT(*) FROM pgsql_query_rules WHERE rule_id=%d AND active=1 AND match_pattern='%s' "
"AND destination_hostgroup=%d AND apply=1 AND comment='%s'",
disk_query_rules_query,
rule_id,
match_pattern.c_str(),
destination_hostgroup,
comment.c_str()
);
if (wait_for_expected_count(replica_admin, disk_query_rules_query, 1, "pgsql_query_rules disk sync") != EXIT_SUCCESS) {
goto cleanup;
}
}
// Restore original data
MYSQL_QUERY(proxy_admin, "DELETE FROM pgsql_servers_v2");
MYSQL_QUERY(proxy_admin, "INSERT INTO pgsql_servers_v2 SELECT * FROM pgsql_servers_v2_sync_test");
MYSQL_QUERY(proxy_admin, "DROP TABLE pgsql_servers_v2_sync_test");
MYSQL_QUERY(proxy_admin, "LOAD PGSQL SERVERS TO RUNTIME");
rc = EXIT_SUCCESS;
return EXIT_SUCCESS;
cleanup:
if (restore_admin_table(proxy_admin, "pgsql_query_rules", backup_table_name, "LOAD PGSQL QUERY RULES TO RUNTIME") != EXIT_SUCCESS) {
return EXIT_FAILURE;
}
return rc;
}
int check_pgsql_checksums_in_runtime_table(MYSQL* admin) {
@ -218,7 +484,7 @@ int main(int argc, char** argv) {
return EXIT_FAILURE;
}
plan(11);
plan(13);
// Connect to admin interfaces
MYSQL* proxysql_admin = mysql_init(NULL);
@ -232,9 +498,6 @@ int main(int argc, char** argv) {
return exit_status();
}
// For this test, we'll just verify that PostgreSQL checksums are present
// In a full cluster test, we would connect to a replica and verify sync
// Check each PostgreSQL checksum individually
const char* pgsql_checksums[] = {
"pgsql_query_rules",
@ -311,15 +574,21 @@ int main(int argc, char** argv) {
}
{
bool sync_ok = true;
std::string sync_msg = "PostgreSQL servers_v2 sync check skipped (set TAP_PGSQL_SYNC_REPLICA_PORT to enable)";
bool servers_save_to_disk = false;
bool users_save_to_disk = false;
bool query_rules_save_to_disk = false;
const char* replica_port_env = getenv("TAP_PGSQL_SYNC_REPLICA_PORT");
if (replica_port_env && strlen(replica_port_env) > 0) {
if (!replica_port_env || strlen(replica_port_env) == 0) {
ok(true, "PostgreSQL servers_v2 sync check skipped (set TAP_PGSQL_SYNC_REPLICA_PORT to enable)");
ok(true, "PostgreSQL users sync check skipped (set TAP_PGSQL_SYNC_REPLICA_PORT to enable)");
ok(true, "PostgreSQL query rules sync check skipped (set TAP_PGSQL_SYNC_REPLICA_PORT to enable)");
} else {
MYSQL* replica_admin = mysql_init(NULL);
if (!replica_admin) {
sync_ok = false;
sync_msg = "Failed to initialize replica admin connection";
ok(false, "Failed to initialize replica admin connection for PostgreSQL servers_v2 sync check");
ok(false, "Failed to initialize replica admin connection for PostgreSQL users sync check");
ok(false, "Failed to initialize replica admin connection for PostgreSQL query rules sync check");
} else if (!mysql_real_connect(
replica_admin,
cl.host,
@ -329,24 +598,71 @@ int main(int argc, char** argv) {
static_cast<unsigned int>(atoi(replica_port_env)),
NULL,
0
)) {
sync_ok = false;
sync_msg = "Failed to connect to replica admin for PostgreSQL sync check";
} else {
const vector<std::tuple<int, string, int, string, int, int, int, int, int, int, string>> pgsql_servers_values {
{ 801, "127.0.0.1", 15432, "ONLINE", 1, 0, 200, 0, 0, 1000, "cluster_sync_pgsql_test" }
};
const int sync_res = check_pgsql_servers_v2_sync(cl, proxysql_admin, replica_admin, pgsql_servers_values);
sync_ok = (sync_res == EXIT_SUCCESS);
sync_msg = sync_ok ? "PostgreSQL servers_v2 synced to replica" : "PostgreSQL servers_v2 sync to replica failed";
)) {
ok(false, "Failed to connect to replica admin for PostgreSQL servers_v2 sync check");
ok(false, "Failed to connect to replica admin for PostgreSQL users sync check");
ok(false, "Failed to connect to replica admin for PostgreSQL query rules sync check");
} else {
const int servers_save_to_disk_rc = get_admin_bool_value(
proxysql_admin, "admin-cluster_pgsql_servers_save_to_disk", servers_save_to_disk
);
if (servers_save_to_disk_rc != EXIT_SUCCESS) {
diag("Failed to retrieve admin-cluster_pgsql_servers_save_to_disk");
}
const int users_save_to_disk_rc = get_admin_bool_value(
proxysql_admin, "admin-cluster_pgsql_users_save_to_disk", users_save_to_disk
);
if (users_save_to_disk_rc != EXIT_SUCCESS) {
diag("Failed to retrieve admin-cluster_pgsql_users_save_to_disk");
}
const int query_rules_save_to_disk_rc = get_admin_bool_value(
proxysql_admin, "admin-cluster_pgsql_query_rules_save_to_disk", query_rules_save_to_disk
);
if (query_rules_save_to_disk_rc != EXIT_SUCCESS) {
diag("Failed to retrieve admin-cluster_pgsql_query_rules_save_to_disk");
}
const vector<pgsql_server_tuple> pgsql_servers_values {
{ 801, "127.0.0.1", 15432, "ONLINE", 1, 0, 200, 0, 0, 1000, "cluster_sync_pgsql_test_5297" }
};
const int servers_sync_res = (servers_save_to_disk_rc == EXIT_SUCCESS)
? check_pgsql_servers_v2_sync(
proxysql_admin, replica_admin, servers_save_to_disk, pgsql_servers_values
)
: EXIT_FAILURE;
ok(
servers_sync_res == EXIT_SUCCESS,
"PostgreSQL servers_v2 synced to replica%s",
(servers_save_to_disk ? " and disk persisted" : "")
);
const int users_sync_res = (users_save_to_disk_rc == EXIT_SUCCESS)
? check_pgsql_users_sync(
proxysql_admin, replica_admin, users_save_to_disk
)
: EXIT_FAILURE;
ok(
users_sync_res == EXIT_SUCCESS,
"PostgreSQL users synced to replica%s",
(users_save_to_disk ? " and disk persisted" : "")
);
const int query_rules_sync_res = (query_rules_save_to_disk_rc == EXIT_SUCCESS)
? check_pgsql_query_rules_sync(
proxysql_admin, replica_admin, query_rules_save_to_disk
)
: EXIT_FAILURE;
ok(
query_rules_sync_res == EXIT_SUCCESS,
"PostgreSQL query rules synced to replica%s",
(query_rules_save_to_disk ? " and disk persisted" : "")
);
}
if (replica_admin) {
mysql_close(replica_admin);
}
}
ok(sync_ok, "%s", sync_msg.c_str());
}
mysql_close(proxysql_admin);

Loading…
Cancel
Save