mirror of https://github.com/sysown/proxysql
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
192 lines
6.2 KiB
192 lines
6.2 KiB
/**
|
|
* @file binlog_rpl.h
|
|
* @brief Runs binlog replication sessions via MARIADB_RPL with COM_REGISTER_SLAVE.
|
|
* @details Uses the MariaDB client replication API to exercise binlog streaming
|
|
* functionality. Setting rpl->host before mariadb_rpl_open() triggers
|
|
* COM_REGISTER_SLAVE registration.
|
|
*
|
|
* The helper runs two replication sessions. Each session:
|
|
* 1. Opens a fresh connection to ProxySQL
|
|
* 2. Issues a lightweight pre-replication query
|
|
* 3. Configures replication session variables (checksum, heartbeat)
|
|
* 4. Initializes MARIADB_RPL with rpl->host set (triggers COM_REGISTER_SLAVE)
|
|
* 5. Opens replication and fetches events to verify the stream is active
|
|
* 6. Closes the replication session cleanly
|
|
*
|
|
* Returns EXIT_SUCCESS only if both sessions succeed.
|
|
*/
|
|
|
|
#ifndef BINLOG_RPL_H
|
|
#define BINLOG_RPL_H
|
|
|
|
#include <cstdlib>
|
|
#include <cstdio>
|
|
#include <cstring>
|
|
#include <unistd.h>
|
|
|
|
#include "mysql.h"
|
|
#include "mariadb_rpl.h"
|
|
#include "command_line.h"
|
|
#include "tap.h"
|
|
#include "utils.h"
|
|
|
|
// Number of heartbeats to wait per replication session to confirm stream is active
|
|
#define BINLOG_RPL_NHB 3
|
|
|
|
/**
|
|
* @brief Run a replication session against ProxySQL.
|
|
* @details Opens a connection, issues a pre-query, configures replication
|
|
* variables, then uses MARIADB_RPL with rpl->host set to send
|
|
* COM_REGISTER_SLAVE. Fetches events until BINLOG_RPL_NHB heartbeats
|
|
* are received, verifying the binlog stream is active.
|
|
*
|
|
* @param cl CommandLine with connection parameters.
|
|
* @param session_id A label for diagnostic output (e.g. 1 or 2).
|
|
* @param server_id Unique server_id for the replication registration.
|
|
* @return EXIT_SUCCESS if the session completes successfully, EXIT_FAILURE otherwise.
|
|
*/
|
|
static int run_replication_session(const CommandLine& cl, int session_id, int server_id) {
|
|
int rc = EXIT_FAILURE;
|
|
MYSQL* mysql = NULL;
|
|
MYSQL_RES* res = NULL;
|
|
MARIADB_RPL* rpl = NULL;
|
|
MARIADB_RPL_EVENT* event = NULL;
|
|
int num_heartbeats = 0;
|
|
int num_events = 0;
|
|
|
|
diag("Session %d: Connecting to ProxySQL at %s:%d as %s",
|
|
session_id, cl.host, cl.port, cl.username);
|
|
|
|
mysql = mysql_init(NULL);
|
|
if (!mysql) {
|
|
diag("Session %d: mysql_init failed", session_id);
|
|
goto cleanup;
|
|
}
|
|
|
|
// Force CLIENT_DEPRECATE_EOF to match backend capabilities for fast-forward
|
|
mysql->options.client_flag |= CLIENT_DEPRECATE_EOF;
|
|
|
|
if (!mysql_real_connect(mysql, cl.host, cl.username, cl.password, NULL, cl.port, NULL, 0)) {
|
|
diag("Session %d: Connection failed: %s", session_id, mysql_error(mysql));
|
|
goto cleanup;
|
|
}
|
|
|
|
// Issue a pre-replication query that disables multiplexing. This ensures the
|
|
// backend connection stays pinned to the session so that when COM_REGISTER_SLAVE
|
|
// triggers fast_forward, the existing backend is reused instead of requiring a
|
|
// new connection.
|
|
if (run_q(mysql, "SELECT @@hostname")) {
|
|
goto cleanup;
|
|
}
|
|
res = mysql_store_result(mysql);
|
|
mysql_free_result(res);
|
|
|
|
// Configure replication session variables
|
|
if (run_q(mysql, "SET @master_binlog_checksum = 'NONE'")) {
|
|
goto cleanup;
|
|
}
|
|
if (run_q(mysql, "SET @master_heartbeat_period = 2000000000")) {
|
|
goto cleanup;
|
|
}
|
|
|
|
// Initialize replication handle
|
|
rpl = mariadb_rpl_init(mysql);
|
|
if (!rpl) {
|
|
diag("Session %d: mariadb_rpl_init failed", session_id);
|
|
goto cleanup;
|
|
}
|
|
|
|
rpl->server_id = server_id;
|
|
rpl->start_position = 4;
|
|
rpl->flags = MARIADB_RPL_BINLOG_SEND_ANNOTATE_ROWS;
|
|
|
|
// KEY: Setting rpl->host causes mariadb_rpl_open() to send COM_REGISTER_SLAVE
|
|
// before proceeding with the binlog dump. This is the core mechanism under test.
|
|
rpl->host = strdup("127.0.0.1");
|
|
rpl->port = cl.port;
|
|
|
|
diag("Session %d: Opening replication with server_id=%d (COM_REGISTER_SLAVE will be sent)",
|
|
session_id, server_id);
|
|
|
|
if (mariadb_rpl_open(rpl)) {
|
|
diag("Session %d: mariadb_rpl_open failed: [%d] %s",
|
|
session_id, mysql_errno(rpl->mysql), mysql_error(rpl->mysql));
|
|
goto cleanup;
|
|
}
|
|
|
|
// Fetch events until we receive enough heartbeats to confirm the stream is active
|
|
diag("Session %d: Fetching binlog events, waiting for %d heartbeats...",
|
|
session_id, BINLOG_RPL_NHB);
|
|
|
|
while (num_heartbeats < BINLOG_RPL_NHB &&
|
|
(event = mariadb_rpl_fetch(rpl, event))) {
|
|
num_events++;
|
|
if (event->event_type == HEARTBEAT_LOG_EVENT_V2 ||
|
|
event->event_type == HEARTBEAT_LOG_EVENT) {
|
|
num_heartbeats++;
|
|
diag("Session %d: Heartbeat %d/%d received (total events: %d)",
|
|
session_id, num_heartbeats, BINLOG_RPL_NHB, num_events);
|
|
}
|
|
}
|
|
|
|
if (num_heartbeats < BINLOG_RPL_NHB && event == NULL) {
|
|
diag("Session %d: Replication stream ended prematurely: %s",
|
|
session_id, mysql_error(mysql));
|
|
}
|
|
|
|
if (num_heartbeats < BINLOG_RPL_NHB) {
|
|
diag("Session %d: FAILED - received only %d/%d heartbeats",
|
|
session_id, num_heartbeats, BINLOG_RPL_NHB);
|
|
goto cleanup;
|
|
}
|
|
|
|
diag("Session %d: SUCCESS - received %d heartbeats across %d events",
|
|
session_id, num_heartbeats, num_events);
|
|
rc = EXIT_SUCCESS;
|
|
|
|
cleanup:
|
|
mariadb_free_rpl_event(event);
|
|
if (rpl) {
|
|
mariadb_rpl_close(rpl);
|
|
}
|
|
if (mysql) {
|
|
mysql_close(mysql);
|
|
}
|
|
return rc;
|
|
}
|
|
|
|
/**
|
|
* @brief Run two binlog replication sessions via MARIADB_RPL.
|
|
* @details Executes two independent replication sessions, each sending
|
|
* COM_REGISTER_SLAVE. Used by TAP tests:
|
|
* - test_com_register_slave_enables_fast_forward-t: verifies COM_REGISTER_SLAVE
|
|
* enables fast-forward mode and binlog streaming works
|
|
* - test_binlog_reader_uses_previous_hostgroup-t: verifies fast-forward
|
|
* connections use the hostgroup from prior COM_QUERY
|
|
*
|
|
* @param cl CommandLine with connection parameters.
|
|
* @return EXIT_SUCCESS if both sessions succeed, EXIT_FAILURE otherwise.
|
|
*/
|
|
static int run_binlog_rpl(const CommandLine& cl) {
|
|
diag("=== Binlog RPL test: Starting ===");
|
|
diag("Running two replication sessions to exercise COM_REGISTER_SLAVE");
|
|
|
|
// Session 1
|
|
int rc = run_replication_session(cl, 1, 100001);
|
|
if (rc != EXIT_SUCCESS) {
|
|
diag("=== Binlog RPL test: FAILED (session 1) ===");
|
|
return EXIT_FAILURE;
|
|
}
|
|
|
|
// Session 2
|
|
rc = run_replication_session(cl, 2, 100002);
|
|
if (rc != EXIT_SUCCESS) {
|
|
diag("=== Binlog RPL test: FAILED (session 2) ===");
|
|
return EXIT_FAILURE;
|
|
}
|
|
|
|
diag("=== Binlog RPL test: SUCCESS (both sessions completed) ===");
|
|
return EXIT_SUCCESS;
|
|
}
|
|
|
|
#endif // BINLOG_RPL_H
|