You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
proxysql/test/tap/tests/unit/mysqlx_robustness_unit-t.cpp

1544 lines
52 KiB

#include "mysqlx_session.h"
#include "mysqlx_protocol.h"
#include "mysqlx_thread.h"
#include "mysqlx_plugin.h"
#include "sqlite3db.h"
#include "mysqlx_config_store.h"
#include "mysqlx_stats.h"
#include "tap.h"
#include "test_globals.h"
#include "test_init.h"
#include "mysqlx.pb.h"
#include "mysqlx_connection.pb.h"
#include "mysqlx_session.pb.h"
#include "mysqlx_datatypes.pb.h"
#include "mysqlx_resultset.pb.h"
#include "mysqlx_sql.pb.h"
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <fcntl.h>
#include <map>
#include <memory>
#include <mutex>
#include <poll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <vector>
#include <thread>
#include <atomic>
static void write_x_frame(int fd, uint8_t msg_type, const uint8_t* payload, size_t payload_len) {
uint32_t size = static_cast<uint32_t>(payload_len) + 1;
uint8_t header[5];
header[0] = size & 0xFF;
header[1] = (size >> 8) & 0xFF;
header[2] = (size >> 16) & 0xFF;
header[3] = (size >> 24) & 0xFF;
header[4] = msg_type;
write(fd, header, 5);
if (payload_len > 0) {
write(fd, payload, payload_len);
}
}
static ssize_t read_x_frame(int fd, uint8_t* buf, size_t buf_size, int timeout_ms = 200) {
struct pollfd pfd;
pfd.fd = fd;
pfd.events = POLLIN;
int pret = poll(&pfd, 1, timeout_ms);
if (pret <= 0) return -1;
uint8_t header[5];
ssize_t r;
do {
r = read(fd, header, 5);
} while (r < 0 && errno == EINTR);
if (r != 5) return -1;
uint32_t payload_size = header[0] | (header[1] << 8) | (header[2] << 16) | (header[3] << 24);
uint8_t msg_type = header[4];
if (5 + payload_size > buf_size) return -1;
buf[0] = header[0]; buf[1] = header[1]; buf[2] = header[2]; buf[3] = header[3]; buf[4] = msg_type;
if (payload_size > 1) {
pret = poll(&pfd, 1, timeout_ms);
if (pret <= 0) return -1;
do {
r = read(fd, buf + 5, payload_size - 1);
} while (r < 0 && errno == EINTR);
if (r != static_cast<ssize_t>(payload_size - 1)) return -1;
}
return 4 + payload_size;
}
// Shared "default" config store and thread used by setup_authenticated_session.
// After the Task 4 wiring, a session that wants to reach WAITING_CLIENT_XMSG
// via the real auth handler needs: (a) an identity_lookup that returns an
// identity with a non-empty default_route, and (b) a config store reachable
// via the session's thread_ptr_ that knows about that route and has at least
// one endpoint in its hostgroup. Without these two ingredients,
// resolve_backend_target() will fail (4000/4001/4002) and the session will
// transition to X_SESSION_CLOSING instead of WAITING_CLIENT_XMSG.
//
// The helpers below lazily construct a process-global thread+store pair,
// configured once for the "default_test_route" happy-path fixture. Tests
// that want a different routing outcome should not call
// setup_authenticated_session; they should drive the session state
// machine directly (see test_routing_unknown_user for the pattern).
static MysqlxConfigStore& default_test_config_store() {
static MysqlxConfigStore store;
static bool initialized = false;
if (!initialized) {
std::unordered_map<std::string, MysqlxRoute> routes;
MysqlxRoute r {};
r.name = "default_test_route";
r.destination_hostgroup = 10;
r.strategy = "first_available";
routes.emplace("default_test_route", r);
std::unordered_map<int, std::vector<MysqlxBackendEndpoint>> endpoints;
MysqlxBackendEndpoint ep {};
ep.hostname = "127.0.0.1";
ep.mysql_port = 3306;
ep.mysqlx_port = 33060;
endpoints[10].push_back(ep);
store.install_for_test(std::move(routes), std::move(endpoints));
initialized = true;
}
return store;
}
static Mysqlx_Thread& default_test_thread() {
static Mysqlx_Thread thr;
static bool initialized = false;
if (!initialized) {
thr.init(0);
thr.set_config_store(&default_test_config_store());
initialized = true;
}
return thr;
}
static void setup_authenticated_session(int fds[2], MysqlxSession& sess) {
// Drive the real MYSQL41 handshake so the session's own fields
// (username_, schema_, identity_, status_) land where the auth path
// would leave them. The hardened auth flow refuses to authenticate
// without an identity_lookup_ installed, so wire one that returns a
// matching identity and compute a proper scramble against the
// identity's password.
sess.init(fds[0], &default_test_thread());
const std::string TEST_PASSWORD = "testpass";
sess.set_identity_lookup([TEST_PASSWORD](const std::string& u)
-> std::optional<MysqlxResolvedIdentity> {
MysqlxResolvedIdentity id {};
id.username = u;
id.x_enabled = true;
id.password = TEST_PASSWORD;
id.allowed_auth_methods = "MYSQL41";
id.default_route = "default_test_route";
return id;
});
sess.to_process = true;
write_x_frame(fds[1], Mysqlx::ClientMessages_Type_CON_CAPABILITIES_GET, nullptr, 0);
sess.handler();
usleep(5000);
{ uint8_t buf[4096]; read_x_frame(fds[1], buf, sizeof(buf)); }
sess.to_process = true;
Mysqlx::Session::AuthenticateStart auth_start;
auth_start.set_mech_name("MYSQL41");
auth_start.set_auth_data(std::string("\0\0testuser", 11));
std::string serialized;
auth_start.SerializeToString(&serialized);
write_x_frame(fds[1], Mysqlx::ClientMessages_Type_SESS_AUTHENTICATE_START,
reinterpret_cast<const uint8_t*>(serialized.data()), serialized.size());
sess.handler();
std::vector<uint8_t> challenge;
usleep(5000);
{
uint8_t buf[4096];
ssize_t r = read_x_frame(fds[1], buf, sizeof(buf));
if (r > 5 && buf[4] == Mysqlx::ServerMessages_Type_SESS_AUTHENTICATE_CONTINUE) {
Mysqlx::Session::AuthenticateContinue server_cont;
if (server_cont.ParseFromArray(buf + 5, static_cast<int>(r - 5))) {
const std::string& cd = server_cont.auth_data();
challenge.assign(cd.begin(), cd.end());
}
}
}
auto scramble_bytes = mysqlx_mysql41_scramble(challenge, TEST_PASSWORD);
std::string scramble_hex = "*";
static const char hex[] = "0123456789ABCDEF";
for (uint8_t b : scramble_bytes) {
scramble_hex.push_back(hex[(b >> 4) & 0xF]);
scramble_hex.push_back(hex[b & 0xF]);
}
sess.to_process = true;
Mysqlx::Session::AuthenticateContinue cont;
cont.set_auth_data(scramble_hex);
cont.SerializeToString(&serialized);
write_x_frame(fds[1], Mysqlx::ClientMessages_Type_SESS_AUTHENTICATE_CONTINUE,
reinterpret_cast<const uint8_t*>(serialized.data()), serialized.size());
sess.handler();
usleep(5000);
{ uint8_t buf[4096]; read_x_frame(fds[1], buf, sizeof(buf)); }
}
static void detach_session_fds(MysqlxSession& sess) {
sess.client_ds().init(XDS_FRONTEND, -1);
if (sess.backend_conn()) {
sess.backend_conn()->set_fd(-1);
}
}
static void test_server_response_terminal_frame() {
diag(">>> %s", __func__);
int client_fds[2], backend_fds[2];
socketpair(AF_UNIX, SOCK_STREAM, 0, client_fds);
socketpair(AF_UNIX, SOCK_STREAM, 0, backend_fds);
MysqlxSession sess;
setup_authenticated_session(client_fds, sess);
MysqlxConnection* conn = new MysqlxConnection();
conn->set_fd(backend_fds[0]);
conn->set_state(MysqlxConnection::IDLE);
conn->set_reusable(true);
sess.backend_conn() = conn;
sess.server_ds().init(XDS_BACKEND, backend_fds[0]);
sess.set_status(MysqlxSession::WAITING_SERVER_XMSG);
// SQL_STMT_EXECUTE_OK is only a terminal frame under the
// RESP_WAITING_STMT_EXECUTE / RESP_WAITING_PREPARE_EXECUTE
// response_state — see is_terminal_frame in mysqlx_session.cpp.
// The default RESP_IDLE rejects every frame, so the test must
// position the session in the response state matching the
// terminator it's about to send.
sess.set_response_state_for_test(RESP_WAITING_STMT_EXECUTE);
Mysqlx::Sql::StmtExecuteOk exec_ok;
std::string ok_s;
exec_ok.SerializeToString(&ok_s);
write_x_frame(backend_fds[1], Mysqlx::ServerMessages_Type_SQL_STMT_EXECUTE_OK,
reinterpret_cast<const uint8_t*>(ok_s.data()), ok_s.size());
sess.to_process = true;
sess.handler();
ok(sess.get_status() == MysqlxSession::WAITING_CLIENT_XMSG,
"terminal frame (SQL_STMT_EXECUTE_OK) returns session to WAITING_CLIENT_XMSG");
ok(sess.backend_conn() == nullptr, "backend returned to pool after terminal frame");
uint8_t buf[4096];
usleep(5000);
ssize_t r = read_x_frame(client_fds[1], buf, sizeof(buf));
ok(r > 0, "client received forwarded frame");
if (r > 0) {
ok(buf[4] == Mysqlx::ServerMessages_Type_SQL_STMT_EXECUTE_OK,
"forwarded frame is SQL_STMT_EXECUTE_OK");
} else {
ok(false, "forwarded frame is SQL_STMT_EXECUTE_OK");
}
detach_session_fds(sess);
close(client_fds[0]); close(client_fds[1]);
close(backend_fds[0]); close(backend_fds[1]);
}
static void test_server_response_non_terminal_keeps_waiting() {
diag(">>> %s", __func__);
int client_fds[2], backend_fds[2];
socketpair(AF_UNIX, SOCK_STREAM, 0, client_fds);
socketpair(AF_UNIX, SOCK_STREAM, 0, backend_fds);
MysqlxSession sess;
setup_authenticated_session(client_fds, sess);
MysqlxConnection* conn = new MysqlxConnection();
conn->set_fd(backend_fds[0]);
conn->set_state(MysqlxConnection::IDLE);
conn->set_reusable(true);
sess.backend_conn() = conn;
sess.server_ds().init(XDS_BACKEND, backend_fds[0]);
sess.set_status(MysqlxSession::WAITING_SERVER_XMSG);
// ColumnMetaData is only allowed under one of the resultset-
// producing response states (STMT_EXECUTE / CRUD / PREPARE_EXECUTE
// / CURSOR_OPEN). Pin to RESP_WAITING_STMT_EXECUTE — the
// is_frame_allowed contract is identical for ColumnMetaData
// across all four.
sess.set_response_state_for_test(RESP_WAITING_STMT_EXECUTE);
Mysqlx::Resultset::ColumnMetaData col;
col.set_type(Mysqlx::Resultset::ColumnMetaData_FieldType_SINT);
std::string col_s;
col.SerializeToString(&col_s);
write_x_frame(backend_fds[1], Mysqlx::ServerMessages_Type_RESULTSET_COLUMN_META_DATA,
reinterpret_cast<const uint8_t*>(col_s.data()), col_s.size());
sess.to_process = true;
sess.handler();
ok(sess.get_status() == MysqlxSession::WAITING_SERVER_XMSG,
"non-terminal frame (ColumnMetaData) keeps session in WAITING_SERVER_XMSG");
ok(sess.backend_conn() != nullptr, "backend NOT returned to pool after non-terminal frame");
uint8_t buf[4096];
usleep(5000);
ssize_t r = read_x_frame(client_fds[1], buf, sizeof(buf));
ok(r > 0, "client received forwarded non-terminal frame");
if (r > 0) {
ok(buf[4] == Mysqlx::ServerMessages_Type_RESULTSET_COLUMN_META_DATA,
"forwarded frame is ColumnMetaData");
} else {
ok(false, "forwarded frame is ColumnMetaData");
}
detach_session_fds(sess);
close(backend_fds[0]); close(backend_fds[1]);
close(client_fds[0]);
char drain[4096];
while (read(client_fds[1], drain, sizeof(drain)) > 0);
close(client_fds[1]);
}
static void test_server_response_multi_frame_pipeline() {
diag(">>> %s", __func__);
int client_fds[2], backend_fds[2];
socketpair(AF_UNIX, SOCK_STREAM, 0, client_fds);
socketpair(AF_UNIX, SOCK_STREAM, 0, backend_fds);
MysqlxSession sess;
setup_authenticated_session(client_fds, sess);
MysqlxConnection* conn = new MysqlxConnection();
conn->set_fd(backend_fds[0]);
conn->set_state(MysqlxConnection::IDLE);
conn->set_reusable(true);
sess.backend_conn() = conn;
sess.server_ds().init(XDS_BACKEND, backend_fds[0]);
sess.set_status(MysqlxSession::WAITING_SERVER_XMSG);
// Pipeline: ColumnMetaData → Row → SQL_STMT_EXECUTE_OK. All
// three are accepted under RESP_WAITING_STMT_EXECUTE; Row's
// allow-list is gated on seen_column_metadata_ which the
// handler flips when it forwards the metadata frame.
sess.set_response_state_for_test(RESP_WAITING_STMT_EXECUTE);
uint8_t dummy_payload[] = {0x01, 0x02, 0x03};
write_x_frame(backend_fds[1], Mysqlx::ServerMessages_Type_RESULTSET_COLUMN_META_DATA,
dummy_payload, sizeof(dummy_payload));
write_x_frame(backend_fds[1], Mysqlx::ServerMessages_Type_RESULTSET_ROW,
dummy_payload, sizeof(dummy_payload));
Mysqlx::Sql::StmtExecuteOk exec_ok;
std::string ok_s;
exec_ok.SerializeToString(&ok_s);
write_x_frame(backend_fds[1], Mysqlx::ServerMessages_Type_SQL_STMT_EXECUTE_OK,
reinterpret_cast<const uint8_t*>(ok_s.data()), ok_s.size());
sess.to_process = true;
sess.handler();
ok(sess.get_status() == MysqlxSession::WAITING_CLIENT_XMSG,
"multi-frame pipeline: session returns to WAITING_CLIENT_XMSG");
ok(sess.backend_conn() == nullptr, "backend returned to pool after multi-frame response");
uint8_t buf[4096];
usleep(5000);
int frame_count = 0;
for (int i = 0; i < 10; i++) {
ssize_t r = read_x_frame(client_fds[1], buf, sizeof(buf));
if (r <= 0) break;
frame_count++;
}
ok(frame_count == 3, "client received all 3 frames (ColumnMetaData + Row + StmtExecuteOk)");
detach_session_fds(sess);
close(client_fds[0]); close(client_fds[1]);
close(backend_fds[0]); close(backend_fds[1]);
}
static void test_backend_disconnect_during_query() {
diag(">>> %s", __func__);
int client_fds[2], backend_fds[2];
socketpair(AF_UNIX, SOCK_STREAM, 0, client_fds);
socketpair(AF_UNIX, SOCK_STREAM, 0, backend_fds);
MysqlxSession sess;
setup_authenticated_session(client_fds, sess);
MysqlxConnection* conn = new MysqlxConnection();
conn->set_fd(backend_fds[0]);
conn->set_state(MysqlxConnection::IDLE);
conn->set_reusable(true);
sess.backend_conn() = conn;
sess.server_ds().init(XDS_BACKEND, backend_fds[0]);
sess.set_status(MysqlxSession::WAITING_SERVER_XMSG);
close(backend_fds[1]);
sess.to_process = true;
sess.handler();
ok(!sess.is_healthy(), "session unhealthy after backend disconnect");
ok(sess.backend_conn() == nullptr, "backend returned to pool after disconnect");
uint8_t buf[4096];
usleep(5000);
ssize_t r = read_x_frame(client_fds[1], buf, sizeof(buf));
ok(r > 0, "client received error response");
if (r > 0) {
ok(buf[4] == Mysqlx::ServerMessages_Type_ERROR, "response is ERROR after backend disconnect");
} else {
ok(false, "response is ERROR after backend disconnect");
}
detach_session_fds(sess);
close(client_fds[0]); close(client_fds[1]);
}
static void test_backend_fd_negative_no_crash() {
diag(">>> %s", __func__);
int client_fds[2];
socketpair(AF_UNIX, SOCK_STREAM, 0, client_fds);
MysqlxSession sess;
setup_authenticated_session(client_fds, sess);
MysqlxConnection* conn = new MysqlxConnection();
conn->set_fd(-1);
conn->set_state(MysqlxConnection::IDLE);
sess.backend_conn() = conn;
sess.server_ds().init(XDS_BACKEND, -1);
sess.set_status(MysqlxSession::WAITING_SERVER_XMSG);
sess.to_process = true;
sess.handler();
ok(sess.get_status() == MysqlxSession::WAITING_CLIENT_XMSG,
"session returns to WAITING_CLIENT_XMSG when server fd < 0");
ok(sess.backend_conn() == nullptr, "backend returned to pool when server fd < 0");
detach_session_fds(sess);
close(client_fds[0]); close(client_fds[1]);
}
static void test_mysql41_no_credential_lookup_rejects() {
diag(">>> %s", __func__);
int fds[2];
socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
MysqlxSession sess;
// Attach to the shared test thread+store and pre-seed a resolvable
// identity. The asserted invariant here is that without an
// identity_lookup set, credential verification in
// handler_auth_challenge_response is skipped; a valid default_route
// is still required to reach WAITING_CLIENT_XMSG (Task 4 wiring),
// so we inject one directly.
sess.init(fds[0], &default_test_thread());
MysqlxResolvedIdentity id {};
id.username = "testuser";
id.x_enabled = true;
id.default_route = "default_test_route";
sess.inject_identity_for_test(id);
sess.to_process = true;
Mysqlx::Session::AuthenticateStart auth_start;
auth_start.set_mech_name("MYSQL41");
auth_start.set_auth_data(std::string("\0\0testuser", 11));
std::string serialized;
auth_start.SerializeToString(&serialized);
write_x_frame(fds[1], Mysqlx::ClientMessages_Type_SESS_AUTHENTICATE_START,
reinterpret_cast<const uint8_t*>(serialized.data()), serialized.size());
sess.handler();
uint8_t buf[4096];
usleep(5000);
read_x_frame(fds[1], buf, sizeof(buf));
Mysqlx::Session::AuthenticateContinue cont;
cont.set_auth_data("*AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA");
cont.SerializeToString(&serialized);
write_x_frame(fds[1], Mysqlx::ClientMessages_Type_SESS_AUTHENTICATE_CONTINUE,
reinterpret_cast<const uint8_t*>(serialized.data()), serialized.size());
sess.to_process = true;
sess.handler();
ok(!sess.is_healthy(), "without credential_lookup, MYSQL41 auth must be rejected (no open-proxy fallback)");
ok(sess.get_status() != MysqlxSession::WAITING_CLIENT_XMSG,
"session not in WAITING_CLIENT_XMSG after auth rejection");
detach_session_fds(sess);
close(fds[0]); close(fds[1]);
}
static void test_auth_start_empty_payload() {
diag(">>> %s", __func__);
int fds[2];
socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
MysqlxSession sess;
sess.init(fds[0], nullptr);
sess.to_process = true;
write_x_frame(fds[1], Mysqlx::ClientMessages_Type_SESS_AUTHENTICATE_START, nullptr, 0);
sess.handler();
ok(!sess.is_healthy(), "unhealthy after empty AuthenticateStart payload");
uint8_t buf[4096];
usleep(5000);
ssize_t r = read_x_frame(fds[1], buf, sizeof(buf));
ok(r > 0, "got error for empty payload");
if (r > 0) {
ok(buf[4] == Mysqlx::ServerMessages_Type_ERROR, "response is ERROR");
} else {
ok(false, "response is ERROR");
}
detach_session_fds(sess);
close(fds[0]); close(fds[1]);
}
static void test_auth_start_malformed_protobuf() {
diag(">>> %s", __func__);
int fds[2];
socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
MysqlxSession sess;
sess.init(fds[0], nullptr);
sess.to_process = true;
uint8_t garbage[] = {0xFF, 0xFE, 0xFD, 0xFC, 0xFB};
write_x_frame(fds[1], Mysqlx::ClientMessages_Type_SESS_AUTHENTICATE_START, garbage, sizeof(garbage));
sess.handler();
ok(!sess.is_healthy(), "unhealthy after malformed AuthenticateStart protobuf");
uint8_t buf[4096];
usleep(5000);
ssize_t r = read_x_frame(fds[1], buf, sizeof(buf));
ok(r > 0, "got error for malformed protobuf");
if (r > 0) {
ok(buf[4] == Mysqlx::ServerMessages_Type_ERROR, "response is ERROR for malformed protobuf");
} else {
ok(false, "response is ERROR for malformed protobuf");
}
detach_session_fds(sess);
close(fds[0]); close(fds[1]);
}
static void test_auth_challenge_wrong_msg_type() {
diag(">>> %s", __func__);
int fds[2];
socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
MysqlxSession sess;
sess.init(fds[0], nullptr);
sess.to_process = true;
Mysqlx::Session::AuthenticateStart auth_start;
auth_start.set_mech_name("MYSQL41");
std::string serialized;
auth_start.SerializeToString(&serialized);
write_x_frame(fds[1], Mysqlx::ClientMessages_Type_SESS_AUTHENTICATE_START,
reinterpret_cast<const uint8_t*>(serialized.data()), serialized.size());
sess.handler();
usleep(5000);
uint8_t buf[4096];
read_x_frame(fds[1], buf, sizeof(buf));
write_x_frame(fds[1], Mysqlx::ClientMessages_Type_SQL_STMT_EXECUTE, nullptr, 0);
sess.to_process = true;
sess.handler();
ok(!sess.is_healthy(), "unhealthy after wrong message type during auth challenge");
detach_session_fds(sess);
close(fds[0]); close(fds[1]);
}
static void test_return_backend_no_thread() {
diag(">>> %s", __func__);
int fds[2], backend_fds[2];
socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
socketpair(AF_UNIX, SOCK_STREAM, 0, backend_fds);
MysqlxSession sess;
setup_authenticated_session(fds, sess);
MysqlxConnection* conn = new MysqlxConnection();
conn->set_fd(backend_fds[0]);
conn->set_state(MysqlxConnection::IDLE);
conn->set_reusable(true);
sess.backend_conn() = conn;
sess.server_ds().init(XDS_BACKEND, backend_fds[0]);
sess.set_status(MysqlxSession::WAITING_SERVER_XMSG);
sess.backend_conn()->set_fd(-1);
close(backend_fds[1]);
close(backend_fds[0]);
sess.to_process = true;
sess.handler();
ok(sess.backend_conn() == nullptr, "backend_conn_ nulled after disconnect with no thread");
detach_session_fds(sess);
close(fds[0]); close(fds[1]);
}
static void test_connection_limit_config() {
diag(">>> %s", __func__);
Mysqlx_Thread thread;
thread.set_max_sessions(1);
ok(thread.get_max_sessions() == 1, "max_sessions set to 1");
thread.set_max_sessions(100);
ok(thread.get_max_sessions() == 100, "max_sessions updated to 100");
}
static void test_client_disconnect_detected() {
diag(">>> %s", __func__);
int fds[2];
socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
MysqlxSession sess;
sess.init(fds[0], nullptr);
sess.to_process = true;
close(fds[1]);
int rc = sess.handler();
ok(!sess.is_healthy(), "session unhealthy after client disconnect");
ok(rc == -1, "handler returns -1 on client disconnect");
detach_session_fds(sess);
close(fds[0]);
}
static void test_check_connect_bad_fd() {
diag(">>> %s", __func__);
// Bug fix (PR #5641 review): check_connect() must report a hard error
// when poll() sees POLLNVAL on a closed/invalid fd, instead of waiting
// out the connect timeout. Create an fd, close it so it's invalid, then
// hand it to the connection and call check_connect().
int s = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
ok(s >= 0, "socket() created for bad-fd test");
close(s);
MysqlxConnection conn;
conn.set_fd(s);
conn.set_state(MysqlxConnection::CONNECTING);
conn.set_connect_timeout(10000);
int rc = conn.check_connect();
ok(rc == -1, "check_connect() returns -1 on invalid fd");
ok(conn.get_state() == MysqlxConnection::ERROR_STATE,
"check_connect() transitions to ERROR_STATE on invalid fd");
conn.set_fd(-1); // prevent dtor from double-closing
}
static void test_check_connect_success_path() {
diag(">>> %s", __func__);
// Positive path: an already-connected socketpair endpoint reports POLLOUT
// ready immediately, SO_ERROR is 0, so check_connect() must transition to
// AUTHENTICATING and return 0.
int fds[2];
int sp = socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
ok(sp == 0, "socketpair() created for success-path test");
// Make non-blocking to mimic start_connect() fd flags.
int flags = fcntl(fds[0], F_GETFL, 0);
fcntl(fds[0], F_SETFL, flags | O_NONBLOCK);
MysqlxConnection conn;
conn.set_fd(fds[0]);
conn.set_state(MysqlxConnection::CONNECTING);
conn.set_connect_timeout(10000);
int rc = conn.check_connect();
ok(rc == 0, "check_connect() returns 0 when socket is writable and SO_ERROR==0");
ok(conn.get_state() == MysqlxConnection::AUTHENTICATING,
"check_connect() transitions to AUTHENTICATING on successful connect");
conn.set_fd(-1); // detach before manual close
close(fds[0]);
close(fds[1]);
}
// Regression guard: after backend auth completes, the data-plane reads/writes
// must route through backend_conn_->backend_ds(), not a session-owned stream
// re-wrapped around the raw backend fd. The TLS handshake (when backend TLS
// is required) runs against backend_conn_->backend_ds() and the resulting
// SSL* lives on that stream; rewrapping the fd would discard it and cause
// cleartext I/O on a TLS-expecting socket.
//
// We can't cheaply drive a full TLS handshake inside a unit test, so we
// instead assert the structural invariant: MysqlxSession::server_ds() is an
// alias for MysqlxConnection::backend_ds() whenever a backend is attached,
// and the session's placeholder stream is never initialized post-auth.
static void test_server_ds_aliases_backend_conn_backend_ds() {
diag(">>> %s", __func__);
int client_fds[2], backend_fds[2];
socketpair(AF_UNIX, SOCK_STREAM, 0, client_fds);
socketpair(AF_UNIX, SOCK_STREAM, 0, backend_fds);
MysqlxSession sess;
setup_authenticated_session(client_fds, sess);
// With no backend attached yet, server_ds() returns the session's
// placeholder: fd == -1 and status never set to something data-plane
// readers would treat as live.
ok(sess.backend_conn() == nullptr,
"pre-attach: no backend connection is associated with the session");
ok(sess.server_ds().get_fd() == -1,
"pre-attach: server_ds() placeholder has fd == -1");
// Attach a backend and initialize its backend_ds() (as the real auth
// path does via MysqlxConnection::init_backend_ds()).
MysqlxConnection* conn = new MysqlxConnection();
conn->set_fd(backend_fds[0]);
conn->set_state(MysqlxConnection::IDLE);
conn->set_reusable(true);
conn->init_backend_ds(backend_fds[0]);
sess.backend_conn() = conn;
// The critical invariant: server_ds() now aliases backend_conn_->backend_ds().
// If this ever regresses to wrapping the raw fd in a fresh stream, the
// TLS state living on backend_conn_->backend_ds_ would be bypassed.
ok(&sess.server_ds() == &sess.backend_conn()->backend_ds(),
"post-attach: server_ds() aliases backend_conn_->backend_ds() (TLS state preserved)");
ok(sess.server_ds().get_fd() == backend_fds[0],
"post-attach: server_ds().get_fd() reflects the backend fd");
// return_backend_to_pool (via session close) must not tear down the
// TLS-aware stream on the cached connection. Driving this path via the
// session handler while the backend connection is null would invoke
// return_backend_to_pool(); here we simply verify that after clearing
// the backend pointer the placeholder is once again what server_ds()
// returns and its fd is still -1.
delete conn;
sess.backend_conn() = nullptr;
ok(sess.server_ds().get_fd() == -1,
"post-detach: server_ds() placeholder has fd == -1 (never re-initialized)");
detach_session_fds(sess);
close(client_fds[0]); close(client_fds[1]);
close(backend_fds[0]); close(backend_fds[1]);
}
static void test_forward_empty_frame() {
diag(">>> %s", __func__);
int client_fds[2], backend_fds[2];
socketpair(AF_UNIX, SOCK_STREAM, 0, client_fds);
socketpair(AF_UNIX, SOCK_STREAM, 0, backend_fds);
MysqlxSession sess;
setup_authenticated_session(client_fds, sess);
MysqlxConnection* conn = new MysqlxConnection();
conn->set_fd(backend_fds[0]);
conn->set_state(MysqlxConnection::IDLE);
conn->set_reusable(true);
sess.backend_conn() = conn;
sess.server_ds().init(XDS_BACKEND, backend_fds[0]);
sess.to_process = true;
Mysqlx::Sql::StmtExecute stmt;
stmt.set_stmt("SELECT 1");
std::string stmt_s;
stmt.SerializeToString(&stmt_s);
write_x_frame(client_fds[1], Mysqlx::ClientMessages_Type_SQL_STMT_EXECUTE,
reinterpret_cast<const uint8_t*>(stmt_s.data()), stmt_s.size());
sess.set_status(MysqlxSession::WAITING_CLIENT_XMSG);
sess.handler();
uint8_t buf[4096];
usleep(5000);
ssize_t r = read_x_frame(backend_fds[1], buf, sizeof(buf));
ok(r > 0, "frame forwarded to backend");
if (r > 0) {
ok(buf[4] == Mysqlx::ClientMessages_Type_SQL_STMT_EXECUTE,
"forwarded message type is SQL_STMT_EXECUTE");
} else {
ok(false, "forwarded message type is SQL_STMT_EXECUTE");
}
detach_session_fds(sess);
close(client_fds[0]); close(client_fds[1]);
close(backend_fds[0]); close(backend_fds[1]);
}
// Mirrors the atomic replace sequence used by sync_disk_to_memory() /
// copy_to_runtime() in plugins/mysqlx/src/mysqlx_plugin.cpp. Kept in the
// test to exercise the invariant that an empty source table overwrites a
// populated destination — the skip (if count==0) that used to guard this
// was a correctness bug that left stale rows in main.* across restarts.
//
// The signature/semantics match the production replace_table_atomically():
// every execute() return is checked, a failure at any step triggers ROLLBACK
// and the function returns false. This preserves the destination's
// pre-transaction state when the INSERT fails — the atomicity guarantee the
// transaction wrap is supposed to deliver.
static bool replace_table_contents(SQLite3DB& db,
const char* dest_table,
const char* source_table) {
if (!db.execute("BEGIN")) {
return false;
}
std::string q = "DELETE FROM ";
q += dest_table;
if (!db.execute(q.c_str())) {
db.execute("ROLLBACK");
return false;
}
q = "INSERT INTO ";
q += dest_table;
q += " SELECT * FROM ";
q += source_table;
if (!db.execute(q.c_str())) {
db.execute("ROLLBACK");
return false;
}
if (!db.execute("COMMIT")) {
db.execute("ROLLBACK");
return false;
}
return true;
}
static void test_empty_source_clears_stale_dest() {
diag(">>> %s", __func__);
SQLite3DB db;
db.open(const_cast<char*>(":memory:"),
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE);
db.execute("CREATE TABLE src (id INT PRIMARY KEY, name VARCHAR)");
db.execute("CREATE TABLE dst (id INT PRIMARY KEY, name VARCHAR)");
// Dest starts with stale rows; source is empty. This is exactly the
// scenario the old `if (count == 0) continue;` mishandled — after
// restart, sync_disk_to_memory would see disk count==0, skip, and
// leave the stale rows in main.*.
db.execute("INSERT INTO dst (id, name) VALUES (1, 'stale_a')");
db.execute("INSERT INTO dst (id, name) VALUES (2, 'stale_b')");
int src_cnt = db.return_one_int("SELECT COUNT(*) FROM src");
int dst_cnt_before = db.return_one_int("SELECT COUNT(*) FROM dst");
ok(src_cnt == 0 && dst_cnt_before == 2,
"precondition: empty source, 2 stale rows in dest");
replace_table_contents(db, "dst", "src");
int dst_cnt_after = db.return_one_int("SELECT COUNT(*) FROM dst");
ok(dst_cnt_after == 0,
"empty source overwrites stale dest (dest is empty after replace)");
}
// Verifies that when INSERT fails mid-transaction, the DELETE is rolled back
// and the destination retains its pre-transaction contents. Without the
// execute()-return checks added by this follow-up, the unconditional COMMIT
// would persist the DELETE and silently wipe the destination.
static void test_insert_failure_rolls_back() {
diag(">>> %s", __func__);
SQLite3DB db;
db.open(const_cast<char*>(":memory:"),
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE);
// src has odd ids; dst has a CHECK that requires even ids. The INSERT
// will fail at the first odd value, leaving the transaction needing
// ROLLBACK to preserve the stale rows we seeded into dst.
db.execute("CREATE TABLE src (id INT PRIMARY KEY)");
db.execute("CREATE TABLE dst (id INT PRIMARY KEY CHECK (id % 2 = 0))");
db.execute("INSERT INTO src (id) VALUES (1)");
db.execute("INSERT INTO src (id) VALUES (3)");
db.execute("INSERT INTO dst (id) VALUES (10)");
db.execute("INSERT INTO dst (id) VALUES (20)");
int dst_cnt_before = db.return_one_int("SELECT COUNT(*) FROM dst");
ok(dst_cnt_before == 2,
"precondition: dst has 2 pre-existing rows");
bool ok_return = replace_table_contents(db, "dst", "src");
ok(ok_return == false,
"replace_table_contents returns false when INSERT violates CHECK");
int dst_cnt_after = db.return_one_int("SELECT COUNT(*) FROM dst");
ok(dst_cnt_after == 2,
"dst still has 2 rows after failed INSERT (DELETE was rolled back)");
int dst_sum = db.return_one_int("SELECT COALESCE(SUM(id), 0) FROM dst");
ok(dst_sum == 30,
"dst retains original rows (sum=30), not the invalid odd ids from src");
}
static void test_listener_route_tracking() {
diag(">>> %s", __func__);
// Construct two threads; associate a named route listener with each and
// verify that `remove_listener_for_route` tears down only the target
// listener and is idempotent when called a second time for the same name.
Mysqlx_Thread thr0;
Mysqlx_Thread thr1;
thr0.init(0);
thr1.init(1);
int rc_a = thr0.add_listener("127.0.0.1", 0, "A");
int rc_b = thr1.add_listener("127.0.0.1", 0, "B");
ok(rc_a == 0 && thr0.get_listener_count() == 1,
"thread 0 has 1 listener for route A");
ok(rc_b == 0 && thr1.get_listener_count() == 1,
"thread 1 has 1 listener for route B");
bool removed_a = thr0.remove_listener_for_route("A");
ok(removed_a && thr0.get_listener_count() == 0,
"remove_listener_for_route('A') removed thread 0's only listener");
bool removed_again = thr0.remove_listener_for_route("A");
ok(!removed_again,
"second remove_listener_for_route('A') returns false (idempotent)");
thr1.remove_listeners();
}
static void test_listener_reconciliation() {
diag(">>> %s", __func__);
// Populate a fresh MysqlxConfigStore with one active route and verify
// that `mysqlx_reconcile_listeners_impl` binds exactly one listener
// across the two-thread pool and records the ownership mapping.
// (The reconciler now reads its desired route set from the store, not
// from runtime_mysqlx_routes — which is an admin-side projection of the
// store, only populated on demand.)
MysqlxConfigStore store;
{
std::unordered_map<std::string, MysqlxRoute> routes;
MysqlxRoute r {};
r.name = "route_rec";
r.bind = "127.0.0.1:0";
r.destination_hostgroup = 0;
r.active = true;
routes.emplace("route_rec", r);
store.install_for_test(std::move(routes), {});
}
std::vector<std::unique_ptr<Mysqlx_Thread>> threads;
for (int i = 0; i < 2; i++) {
auto t = std::make_unique<Mysqlx_Thread>();
t->init(i);
threads.push_back(std::move(t));
}
std::map<std::string, int> route_to_thread;
std::mutex route_map_mutex;
int next_rr_index = 0;
mysqlx_reconcile_listeners_impl(
store, threads, route_to_thread, route_map_mutex, next_rr_index
);
int total = threads[0]->get_listener_count() + threads[1]->get_listener_count();
ok(total == 1 && route_to_thread.count("route_rec") == 1,
"reconcile bound exactly one listener for route_rec and recorded mapping");
for (auto& t : threads) {
t->remove_listeners();
}
}
// Ask the kernel for a free TCP port by binding to :0, reading back the
// assignment, and closing. Leaves a brief TIME_WAIT window but the listeners
// we build use SO_REUSEADDR so reuse is safe.
static int pick_free_port() {
int s = socket(AF_INET, SOCK_STREAM, 0);
if (s < 0) return -1;
int opt = 1;
setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.sin_port = 0;
if (bind(s, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
close(s);
return -1;
}
socklen_t len = sizeof(addr);
if (getsockname(s, (struct sockaddr*)&addr, &len) < 0) {
close(s);
return -1;
}
int port = ntohs(addr.sin_port);
close(s);
return port;
}
static void test_listener_reconciliation_bind_change() {
diag(">>> %s", __func__);
// Verify that mutating a route's `bind` in MysqlxConfigStore and re-running
// the reconciler closes the old listener and opens a new one at the new
// port — i.e. the reconciler does NOT silently keep the stale listener
// alive just because the route name is unchanged.
int port1 = pick_free_port();
int port2 = pick_free_port();
ok(port1 > 0 && port2 > 0 && port1 != port2,
"picked two distinct free ports for bind-change reconcile test");
auto install_route_with_bind = [](MysqlxConfigStore& store, int port) {
std::unordered_map<std::string, MysqlxRoute> routes;
MysqlxRoute r {};
r.name = "reads";
r.bind = std::string("127.0.0.1:") + std::to_string(port);
r.destination_hostgroup = 0;
r.active = true;
routes.emplace("reads", r);
store.install_for_test(std::move(routes), {});
};
MysqlxConfigStore store_v1;
install_route_with_bind(store_v1, port1);
std::vector<std::unique_ptr<Mysqlx_Thread>> threads;
for (int i = 0; i < 2; i++) {
auto t = std::make_unique<Mysqlx_Thread>();
t->init(i);
threads.push_back(std::move(t));
}
std::map<std::string, int> route_to_thread;
std::mutex route_map_mutex;
int next_rr_index = 0;
mysqlx_reconcile_listeners_impl(
store_v1, threads, route_to_thread, route_map_mutex, next_rr_index
);
int total1 = threads[0]->get_listener_count() + threads[1]->get_listener_count();
ok(total1 == 1 && route_to_thread.count("reads") == 1,
"initial reconcile: one listener for 'reads' at port1");
// Re-install with the new bind and re-reconcile.
MysqlxConfigStore store_v2;
install_route_with_bind(store_v2, port2);
mysqlx_reconcile_listeners_impl(
store_v2, threads, route_to_thread, route_map_mutex, next_rr_index
);
int total2 = threads[0]->get_listener_count() + threads[1]->get_listener_count();
ok(total2 == 1,
"bind change reconcile: still exactly one listener (old closed, new opened)");
// Confirm the current listener is bound to port2, not port1.
int owner_tidx = route_to_thread.count("reads") ? route_to_thread["reads"] : -1;
std::string addr = (owner_tidx >= 0)
? threads[owner_tidx]->get_listener_addr_for_route("reads")
: std::string();
char expected[64];
snprintf(expected, sizeof(expected), "127.0.0.1:%d", port2);
ok(addr == expected,
"bind change reconcile: listener now bound to the new port");
for (auto& t : threads) {
t->remove_listeners();
}
}
static void test_route_exists_predicate() {
diag(">>> %s", __func__);
// Empty store: any lookup returns false.
{
MysqlxConfigStore store;
ok(store.route_exists("nope") == false,
"route_exists returns false for unknown route on empty store");
}
// Populated store: reports true for configured routes and false for
// unconfigured names, letting callers distinguish "unknown route" from
// "route exists but has no backend".
{
MysqlxConfigStore store;
std::unordered_map<std::string, MysqlxRoute> routes;
MysqlxRoute r {};
r.name = "reads";
r.destination_hostgroup = 20;
routes.emplace("reads", r);
std::unordered_map<int, std::vector<MysqlxBackendEndpoint>> endpoints;
store.install_for_test(std::move(routes), std::move(endpoints));
ok(store.route_exists("reads") == true,
"route_exists returns true for configured route");
ok(store.route_exists("writes") == false,
"route_exists returns false for route name not in store");
}
}
// --- resolve_backend_target() tests ---
// Shared fixture: populate a config store with a single route "reads" that
// points at hostgroup 20, which in turn has one endpoint on 127.0.0.1:33060.
static void install_reads_route_with_endpoint(MysqlxConfigStore& store) {
std::unordered_map<std::string, MysqlxRoute> routes;
MysqlxRoute r {};
r.name = "reads";
r.destination_hostgroup = 20;
r.strategy = "first_available";
routes.emplace("reads", r);
std::unordered_map<int, std::vector<MysqlxBackendEndpoint>> endpoints;
MysqlxBackendEndpoint ep {};
ep.hostname = "127.0.0.1";
ep.mysql_port = 3306;
ep.mysqlx_port = 33060;
endpoints[20].push_back(ep);
store.install_for_test(std::move(routes), std::move(endpoints));
}
// Route "reads" -> hostgroup 20, but hostgroup 20 has no endpoints. Used
// to exercise the no-backend (4002) path distinctly from unknown-route.
static void install_reads_route_without_endpoint(MysqlxConfigStore& store) {
std::unordered_map<std::string, MysqlxRoute> routes;
MysqlxRoute r {};
r.name = "reads";
r.destination_hostgroup = 20;
r.strategy = "first_available";
routes.emplace("reads", r);
std::unordered_map<int, std::vector<MysqlxBackendEndpoint>> endpoints;
store.install_for_test(std::move(routes), std::move(endpoints));
}
static void test_routing_happy_path() {
diag(">>> %s", __func__);
MysqlxConfigStore store;
install_reads_route_with_endpoint(store);
Mysqlx_Thread thr;
thr.init(0);
thr.set_config_store(&store);
MysqlxSession sess;
sess.init(-1, &thr);
MysqlxResolvedIdentity id {};
id.username = "testuser";
id.x_enabled = true;
id.default_route = "reads";
sess.inject_identity_for_test(id);
int rc = sess.resolve_backend_target_for_test();
ok(rc == 0, "resolve_backend_target returns 0 on happy path");
ok(sess.target_hostgroup_for_test() == 20 &&
sess.target_address_for_test() == "127.0.0.1" &&
sess.target_port_for_test() == 33060,
"target fields populated: hostgroup=20, address=127.0.0.1, port=33060");
}
static void test_routing_no_default_route() {
diag(">>> %s", __func__);
MysqlxConfigStore store; // intentionally empty
Mysqlx_Thread thr;
thr.init(0);
thr.set_config_store(&store);
MysqlxSession sess;
sess.init(-1, &thr);
MysqlxResolvedIdentity id {};
id.username = "u";
id.x_enabled = true;
id.default_route = ""; // no default route assigned
sess.inject_identity_for_test(id);
int rc = sess.resolve_backend_target_for_test();
ok(rc == 4000, "resolve returns 4000 for empty default_route");
}
static void test_routing_unknown_route() {
diag(">>> %s", __func__);
MysqlxConfigStore store; // no "nope" route installed
Mysqlx_Thread thr;
thr.init(0);
thr.set_config_store(&store);
MysqlxSession sess;
sess.init(-1, &thr);
MysqlxResolvedIdentity id {};
id.username = "u";
id.x_enabled = true;
id.default_route = "nope";
sess.inject_identity_for_test(id);
int rc = sess.resolve_backend_target_for_test();
ok(rc == 4001, "resolve returns 4001 for unknown route");
}
static void test_routing_no_backend() {
diag(">>> %s", __func__);
MysqlxConfigStore store;
install_reads_route_without_endpoint(store);
Mysqlx_Thread thr;
thr.init(0);
thr.set_config_store(&store);
MysqlxSession sess;
sess.init(-1, &thr);
MysqlxResolvedIdentity id {};
id.username = "u";
id.x_enabled = true;
id.default_route = "reads";
sess.inject_identity_for_test(id);
int rc = sess.resolve_backend_target_for_test();
ok(rc == 4002, "resolve returns 4002 when route has no endpoints");
}
// For each failure mode, record_conn_err() must fire with a specific
// (route_name, hostgroup) tuple. Covering all three in one test keeps the
// assertion count aligned with the plan while still exercising each
// (route, hg) contract.
static void test_routing_stats_on_failure() {
diag(">>> %s", __func__);
// Empty default_route -> ("", 0)
{
MysqlxConfigStore store;
Mysqlx_Thread thr;
thr.init(0);
thr.set_config_store(&store);
MysqlxSession sess;
sess.init(-1, &thr);
MysqlxResolvedIdentity id {};
id.x_enabled = true;
id.default_route = "";
sess.inject_identity_for_test(id);
mysqlx_stats().reset_for_test();
sess.resolve_backend_target_for_test();
auto a = mysqlx_stats().get_last_conn_err_for_test();
// Unknown route -> ("nope", 0)
MysqlxConfigStore store2;
Mysqlx_Thread thr2;
thr2.init(0);
thr2.set_config_store(&store2);
MysqlxSession sess2;
sess2.init(-1, &thr2);
MysqlxResolvedIdentity id2 {};
id2.x_enabled = true;
id2.default_route = "nope";
sess2.inject_identity_for_test(id2);
mysqlx_stats().reset_for_test();
sess2.resolve_backend_target_for_test();
auto b = mysqlx_stats().get_last_conn_err_for_test();
// No backend -> ("reads", 20)
MysqlxConfigStore store3;
install_reads_route_without_endpoint(store3);
Mysqlx_Thread thr3;
thr3.init(0);
thr3.set_config_store(&store3);
MysqlxSession sess3;
sess3.init(-1, &thr3);
MysqlxResolvedIdentity id3 {};
id3.x_enabled = true;
id3.default_route = "reads";
sess3.inject_identity_for_test(id3);
mysqlx_stats().reset_for_test();
sess3.resolve_backend_target_for_test();
auto c = mysqlx_stats().get_last_conn_err_for_test();
bool all_match =
a.has_value() && a->first == "" && a->second == 0 &&
b.has_value() && b->first == "nope" && b->second == 0 &&
c.has_value() && c->first == "reads" && c->second == 20;
ok(all_match,
"record_conn_err tuples: ('',0) for no-default; ('nope',0) for "
"unknown; ('reads',20) for no-backend");
}
}
// Drive the auth-start PLAIN path with an identity lookup that always
// returns nullopt. Expected: session emits Error 1045 and goes unhealthy
// without ever transitioning to WAITING_CLIENT_XMSG or sending Ok.
static void test_routing_unknown_user() {
diag(">>> %s", __func__);
int fds[2];
socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
MysqlxSession sess;
sess.init(fds[0], nullptr);
sess.set_identity_lookup([](const std::string&)
-> std::optional<MysqlxResolvedIdentity> { return std::nullopt; });
// PLAIN requires TLS at the protocol layer; the test covers only the
// lookup-returns-nullopt branch in MYSQL41 challenge response.
sess.to_process = true;
Mysqlx::Session::AuthenticateStart auth_start;
auth_start.set_mech_name("MYSQL41");
auth_start.set_auth_data(std::string("\0\0ghost", 8));
std::string serialized;
auth_start.SerializeToString(&serialized);
write_x_frame(fds[1], Mysqlx::ClientMessages_Type_SESS_AUTHENTICATE_START,
reinterpret_cast<const uint8_t*>(serialized.data()), serialized.size());
sess.handler();
usleep(5000);
uint8_t buf[4096];
read_x_frame(fds[1], buf, sizeof(buf));
Mysqlx::Session::AuthenticateContinue cont;
cont.set_auth_data("*0000000000000000000000000000000000000000");
cont.SerializeToString(&serialized);
write_x_frame(fds[1], Mysqlx::ClientMessages_Type_SESS_AUTHENTICATE_CONTINUE,
reinterpret_cast<const uint8_t*>(serialized.data()), serialized.size());
sess.to_process = true;
sess.handler();
ok(!sess.is_healthy() &&
sess.get_status() != MysqlxSession::WAITING_CLIENT_XMSG,
"unknown user: session unhealthy and did not reach WAITING_CLIENT_XMSG");
detach_session_fds(sess);
close(fds[0]); close(fds[1]);
}
// Integration test for the Task 4 wiring: PLAIN auth succeeds credential-
// wise (correct password) but the resolved identity carries an empty
// default_route. Expected outcome after resolve_backend_target() runs
// pre-Ok:
// - session transitions to X_SESSION_CLOSING (not WAITING_CLIENT_XMSG).
// - is_healthy() is false.
// - the client sees an X-Protocol ERROR frame on the wire, NOT an Ok.
// Prior to this wiring, sessions reached WAITING_CLIENT_XMSG with
// target_address_ == "" and target_port_ == 0, then attempted to connect
// to ""; port 0 on the first client query. This test locks that door.
static void test_plain_auth_empty_default_route_closes_session() {
diag(">>> %s", __func__);
int fds[2];
socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
// Dedicated store/thread for this test so we don't pollute the shared
// default fixture. The store is intentionally *not* populated with
// "default_test_route" here; what matters is that the identity_lookup
// returns an identity whose default_route is empty, which short-circuits
// resolve_backend_target() with 4000 before any route lookup runs.
MysqlxConfigStore store;
Mysqlx_Thread thr;
thr.init(0);
thr.set_config_store(&store);
MysqlxSession sess;
sess.init(fds[0], &thr);
// PLAIN auth requires TLS at the protocol layer; mark the client data
// stream as encrypted so handle_auth_plain() proceeds past the
// "PLAIN authentication requires TLS" gate.
sess.client_ds().set_encrypted(true);
// Identity lookup returns an enabled user with a KNOWN password and
// an EMPTY default_route. Credential verification must succeed so the
// auth path reaches resolve_backend_target(), which then fails with 4000.
sess.set_identity_lookup([](const std::string& username)
-> std::optional<MysqlxResolvedIdentity> {
MysqlxResolvedIdentity id {};
id.username = username;
id.x_enabled = true;
id.password = "secret123"; // cleartext; derive_stored_hash SHA1s it.
id.default_route = ""; // the point of this test
return id;
});
// Send AuthenticateStart with PLAIN mechanism and correct password.
// PLAIN auth_data is \0 <authzid> \0 <user> \0 <password> ; we use
// an empty authzid, "testuser", then "secret123".
sess.to_process = true;
Mysqlx::Session::AuthenticateStart auth_start;
auth_start.set_mech_name("PLAIN");
std::string plain_payload;
plain_payload.push_back('\0');
plain_payload.append("testuser");
plain_payload.push_back('\0');
plain_payload.append("secret123");
auth_start.set_auth_data(plain_payload);
std::string serialized;
auth_start.SerializeToString(&serialized);
write_x_frame(fds[1], Mysqlx::ClientMessages_Type_SESS_AUTHENTICATE_START,
reinterpret_cast<const uint8_t*>(serialized.data()), serialized.size());
sess.handler();
ok(sess.get_status() == MysqlxSession::X_SESSION_CLOSING,
"PLAIN auth + empty default_route: session in X_SESSION_CLOSING");
ok(!sess.is_healthy(),
"PLAIN auth + empty default_route: session marked unhealthy");
usleep(5000);
uint8_t buf[4096];
ssize_t r = read_x_frame(fds[1], buf, sizeof(buf));
ok(r > 5 && buf[4] == Mysqlx::ServerMessages_Type_ERROR,
"PLAIN auth + empty default_route: client receives ERROR frame (not Ok)");
detach_session_fds(sess);
close(fds[0]); close(fds[1]);
}
// Exercise the session's backend-connection setup site in
// handler_connecting_server and assert that the value passed to
// set_backend_user() tracks the resolved identity rather than only the
// frontend username. Pre-fix (PR #5641), the site hardcoded
// backend_conn_->set_backend_user(username_.c_str()) while pairing it
// with identity_->backend_password — which, for mysqlx_users rows whose
// backend_auth_mode is `service_account` and whose backend_username
// differs from the frontend username, paired userA's password with
// userB's name and broke backend auth.
//
// Strategy: drive the session far enough that username_ and identity_
// are both populated (setup_authenticated_session), then pre-seed
// backend_conn_ in AUTHENTICATING + BACKEND_AUTH_NOT_STARTED state
// attached to a valid socketpair fd. Flipping status_ to
// CONNECTING_SERVER and firing handler() lands us at the set_backend_user
// call site without invoking the real TCP connect. The socketpair is
// nonblocking (MysqlxDataStream::init sets O_NONBLOCK) so
// step_auth_capabilities_get() writes the cap-get frame and returns;
// the subsequent try_read_one_frame() sees nothing and the handler
// bails out cleanly with backend_user_ observable via the test accessor.
static void test_backend_user_uses_backend_username_when_set() {
diag(">>> %s", __func__);
int client_fds[2], backend_fds[2];
socketpair(AF_UNIX, SOCK_STREAM, 0, client_fds);
socketpair(AF_UNIX, SOCK_STREAM, 0, backend_fds);
MysqlxSession sess;
setup_authenticated_session(client_fds, sess);
// Replace identity_ so it carries a DISTINCT backend_username, mimicking
// a service_account-mode mysqlx_users row.
MysqlxResolvedIdentity id {};
id.username = "front_user";
id.x_enabled = true;
id.default_route = "default_test_route";
id.backend_auth_mode = MysqlxBackendAuthMode::service_account;
id.backend_username = "svc_account_42";
id.backend_password = "svc_pw";
sess.inject_identity_for_test(id);
MysqlxConnection* conn = new MysqlxConnection();
conn->set_fd(backend_fds[0]);
conn->set_state(MysqlxConnection::AUTHENTICATING);
sess.backend_conn() = conn;
sess.set_status(MysqlxSession::CONNECTING_SERVER);
sess.to_process = true;
sess.handler();
ok(std::string(conn->get_backend_user_for_test()) == "svc_account_42",
"service_account mode: set_backend_user uses identity_->backend_username");
detach_session_fds(sess);
close(client_fds[0]); close(client_fds[1]);
close(backend_fds[0]); close(backend_fds[1]);
}
// Mirror test for the mapped-mode (default) path: identity has an EMPTY
// backend_username, so the fallback must reuse the frontend username_.
// setup_authenticated_session drives MYSQL41 auth with "testuser" as the
// frontend username, so that is what we expect to land in backend_user_.
static void test_backend_user_falls_back_to_frontend_username() {
diag(">>> %s", __func__);
int client_fds[2], backend_fds[2];
socketpair(AF_UNIX, SOCK_STREAM, 0, client_fds);
socketpair(AF_UNIX, SOCK_STREAM, 0, backend_fds);
MysqlxSession sess;
setup_authenticated_session(client_fds, sess);
// Re-inject identity with EMPTY backend_username: mapped-mode default.
MysqlxResolvedIdentity id {};
id.username = "testuser";
id.x_enabled = true;
id.default_route = "default_test_route";
id.backend_auth_mode = MysqlxBackendAuthMode::mapped;
id.backend_username = ""; // the point of this test
sess.inject_identity_for_test(id);
MysqlxConnection* conn = new MysqlxConnection();
conn->set_fd(backend_fds[0]);
conn->set_state(MysqlxConnection::AUTHENTICATING);
sess.backend_conn() = conn;
sess.set_status(MysqlxSession::CONNECTING_SERVER);
sess.to_process = true;
sess.handler();
ok(std::string(conn->get_backend_user_for_test()) == "testuser",
"mapped mode (empty backend_username): set_backend_user falls back to frontend username_");
detach_session_fds(sess);
close(client_fds[0]); close(client_fds[1]);
close(backend_fds[0]); close(backend_fds[1]);
}
int main() {
setvbuf(stdout, nullptr, _IOLBF, 0);
plan(74);
diag("=== mysqlx_robustness_unit-t starting ===");
test_server_response_terminal_frame();
test_server_response_non_terminal_keeps_waiting();
test_server_response_multi_frame_pipeline();
test_backend_disconnect_during_query();
test_backend_fd_negative_no_crash();
test_mysql41_no_credential_lookup_rejects();
test_auth_start_empty_payload();
test_auth_start_malformed_protobuf();
test_auth_challenge_wrong_msg_type();
test_return_backend_no_thread();
test_connection_limit_config();
test_client_disconnect_detected();
test_check_connect_bad_fd();
test_check_connect_success_path();
test_server_ds_aliases_backend_conn_backend_ds();
test_forward_empty_frame();
test_empty_source_clears_stale_dest();
test_insert_failure_rolls_back();
test_listener_route_tracking();
test_listener_reconciliation();
test_listener_reconciliation_bind_change();
test_route_exists_predicate();
test_routing_happy_path();
test_routing_no_default_route();
test_routing_unknown_route();
test_routing_no_backend();
test_routing_stats_on_failure();
test_routing_unknown_user();
test_plain_auth_empty_default_route_closes_session();
test_backend_user_uses_backend_username_when_set();
test_backend_user_falls_back_to_frontend_username();
return exit_status();
}