diff --git a/plugins/mysqlx/include/mysqlx_session.h b/plugins/mysqlx/include/mysqlx_session.h index da551adc9..b8eacfbe9 100644 --- a/plugins/mysqlx/include/mysqlx_session.h +++ b/plugins/mysqlx/include/mysqlx_session.h @@ -299,6 +299,17 @@ private: uint64_t start_time_; uint64_t last_active_time_; MysqlxResponseState response_state_; + // Sub-state flag for the response_state_ matrix: gates whether + // RESULTSET_ROW frames are currently allowed in states where the + // X-Protocol requires ColumnMetaData to precede any Row. Set when + // COLUMN_META_DATA is forwarded from the backend in + // handler_waiting_server_msg(); cleared on each transition into a + // state that begins a new column-metadata sequence (STMT_EXECUTE, + // CRUD, PREPARE_EXECUTE, CURSOR_OPEN), at terminal-frame flush, and + // at init() / reset(). NOT cleared on entry to CURSOR_FETCH — per + // the X-Protocol spec, ColumnMetaData is sent at Cursor::Open and + // not re-sent at Cursor::Fetch, so the flag must carry across. + bool seen_column_metadata_; MysqlxTlsMode tls_mode_; // Compression negotiation state. compression_algo_ is set by @@ -375,6 +386,16 @@ public: const std::vector& client_write_buffer_for_test() const { return client_ds_.write_buffer_raw(); } +#ifdef MYSQLX_TEST_BUILD + // Test-only observers for the per-message response state machine, + // gated behind MYSQLX_TEST_BUILD because they expose private state + // the production code never reads back. Used by the validation + // tests in mysqlx_message_dispatch_unit-t to assert state + // transitions after dispatching synthetic backend frames. + MysqlxResponseState response_state_for_test() const { return response_state_; } + bool seen_column_metadata_for_test() const { return seen_column_metadata_; } + void set_response_state_for_test(MysqlxResponseState s) { response_state_ = s; } +#endif /* MYSQLX_TEST_BUILD */ }; #endif diff --git a/plugins/mysqlx/src/mysqlx_session.cpp b/plugins/mysqlx/src/mysqlx_session.cpp index 374fc4034..072116174 100644 --- a/plugins/mysqlx/src/mysqlx_session.cpp +++ b/plugins/mysqlx/src/mysqlx_session.cpp @@ -99,6 +99,7 @@ MysqlxSession::MysqlxSession() , start_time_(0) , last_active_time_(0) , response_state_(RESP_IDLE) + , seen_column_metadata_(false) , tls_mode_(TLS_OFF) , compression_algo_(MYSQLX_COMPR_NONE) , compression_combine_mixed_messages_(false) @@ -152,6 +153,8 @@ void MysqlxSession::init(int fd, Mysqlx_Thread* thread_ptr) { compression_combine_mixed_messages_ = false; compression_max_combine_messages_ = 0; reset_compression_state(); + response_state_ = RESP_IDLE; + seen_column_metadata_ = false; } void MysqlxSession::reset() { @@ -174,6 +177,8 @@ void MysqlxSession::reset() { compression_max_combine_messages_ = 0; reset_compression_state(); pre_auth_cap_msgs_ = 0; + response_state_ = RESP_IDLE; + seen_column_metadata_ = false; } int MysqlxSession::handler() { @@ -827,6 +832,7 @@ int MysqlxSession::dispatch_client_message(uint8_t msg_type) { return 0; case Mysqlx::ClientMessages_Type_SQL_STMT_EXECUTE: response_state_ = RESP_WAITING_STMT_EXECUTE; + seen_column_metadata_ = false; forward_to_backend(); return 0; case Mysqlx::ClientMessages_Type_CRUD_FIND: case Mysqlx::ClientMessages_Type_CRUD_INSERT: @@ -836,6 +842,7 @@ int MysqlxSession::dispatch_client_message(uint8_t msg_type) { case Mysqlx::ClientMessages_Type_CRUD_MODIFY_VIEW: case Mysqlx::ClientMessages_Type_CRUD_DROP_VIEW: response_state_ = RESP_WAITING_CRUD; + seen_column_metadata_ = false; forward_to_backend(); return 0; case Mysqlx::ClientMessages_Type_PREPARE_PREPARE: if (backend_conn_) backend_conn_->set_has_prepared_statement(true); @@ -843,14 +850,19 @@ int MysqlxSession::dispatch_client_message(uint8_t msg_type) { forward_to_backend(); return 0; case Mysqlx::ClientMessages_Type_PREPARE_EXECUTE: response_state_ = RESP_WAITING_PREPARE_EXECUTE; + seen_column_metadata_ = false; forward_to_backend(); return 0; case Mysqlx::ClientMessages_Type_PREPARE_DEALLOCATE: response_state_ = RESP_WAITING_PREPARE_DEALLOCATE; forward_to_backend(); return 0; case Mysqlx::ClientMessages_Type_CURSOR_OPEN: response_state_ = RESP_WAITING_CURSOR_OPEN; + seen_column_metadata_ = false; forward_to_backend(); return 0; case Mysqlx::ClientMessages_Type_CURSOR_FETCH: + // NOT cleared — Cursor::Fetch reuses ColumnMetaData from + // the preceding Cursor::Open, so the sub-state must carry + // across. response_state_ = RESP_WAITING_CURSOR_FETCH; forward_to_backend(); return 0; case Mysqlx::ClientMessages_Type_CURSOR_CLOSE: @@ -920,23 +932,78 @@ void MysqlxSession::forward_to_backend() { status_ = WAITING_SERVER_XMSG; } -// Permissive in this commit: returns true for every msg_type the previous -// code path implicitly accepted (which was "everything", since the old -// validation loop never rejected a frame — it only checked terminality -// and forwarded otherwise). The explicit allowed-frame contract is -// tightened in a follow-up commit; here we only split the predicate so -// the validation hook has a separate seam from the terminality probe. +// Per-state allowed-frame contract for backend frames. Returns true iff +// a backend message of msg_type is acceptable to forward in the current +// response_state_. Disallowed frames are rejected by the validation +// hook in handler_waiting_server_msg() with X-Protocol Error 4006. // -// NOTICE and ERROR are universal in every state. The remaining frames -// are matched against a per-state allow-set; states with no explicit -// case (RESP_IDLE today) currently fall through to the permissive -// default. The next commit narrows this; today the function returns -// true so behaviour is byte-for-byte identical to the prior path. -bool MysqlxSession::is_frame_allowed(uint8_t /*msg_type*/) const { - // Permissive pass-through for the refactor commit; behaviour is - // preserved byte-for-byte. Subsequent commits replace this body - // with the per-state allowed-frame matrix. - return true; +// Universal frames (allowed in every state): +// - NOTICE (non-terminal status messages, may be interleaved freely) +// - ERROR (terminates the response sequence with a per-message error) +// +// Per-state extras (in addition to the terminal frames already enumerated +// in is_terminal_frame): COLUMN_META_DATA, RESULTSET_ROW, and the +// non-terminal FETCH_DONE_MORE_* boundaries. RESULTSET_ROW is only +// allowed once seen_column_metadata_ has been set by a preceding +// COLUMN_META_DATA frame in the same response, except in CURSOR_FETCH +// where the metadata was sent at Cursor::Open and is already in scope. +// +// RESP_IDLE accepts no frames at all — any backend frame in that state +// is unsolicited and indicates a protocol-confused backend. +bool MysqlxSession::is_frame_allowed(uint8_t msg_type) const { + if (msg_type == Mysqlx::ServerMessages_Type_NOTICE) return true; + if (msg_type == Mysqlx::ServerMessages_Type_ERROR) return true; + if (is_terminal_frame(msg_type)) return true; + + switch (response_state_) { + case RESP_WAITING_STMT_EXECUTE: + case RESP_WAITING_CRUD: + case RESP_WAITING_PREPARE_EXECUTE: + case RESP_WAITING_CURSOR_OPEN: + // Resultset shape: ColumnMetaData, then zero or more Row, + // then non-terminal FetchDoneMore* boundaries between + // resultsets / out-params before the actual terminator. + if (msg_type == Mysqlx::ServerMessages_Type_RESULTSET_COLUMN_META_DATA) { + return true; + } + if (msg_type == Mysqlx::ServerMessages_Type_RESULTSET_ROW) { + // Row is only legal after metadata has been forwarded + // in this response. Without this guard a hostile + // backend could spray rows the client cannot parse. + return seen_column_metadata_; + } + if (msg_type == Mysqlx::ServerMessages_Type_RESULTSET_FETCH_DONE_MORE_RESULTSETS) { + return true; + } + // FETCH_DONE_MORE_OUT_PARAMS only flows through stored-proc + // shapes; STMT_EXECUTE and PREPARE_EXECUTE both can produce + // it. Allow on those two; CRUD doesn't have out-params but + // the allow is harmless (still requires a real terminator + // to advance the state machine). + if (msg_type == Mysqlx::ServerMessages_Type_RESULTSET_FETCH_DONE_MORE_OUT_PARAMS) { + return response_state_ == RESP_WAITING_STMT_EXECUTE || + response_state_ == RESP_WAITING_PREPARE_EXECUTE; + } + return false; + case RESP_WAITING_CURSOR_FETCH: + // Cursor::Fetch only carries Row frames (metadata was at + // Cursor::Open). FETCH_DONE / FETCH_SUSPENDED are terminal + // and already accepted via is_terminal_frame above. + return msg_type == Mysqlx::ServerMessages_Type_RESULTSET_ROW; + case RESP_WAITING_PREPARE_PREPARE: + case RESP_WAITING_PREPARE_DEALLOCATE: + case RESP_WAITING_CURSOR_CLOSE: + case RESP_WAITING_EXPECT: + case RESP_WAITING_SESS_RESET: + // These responses carry only their terminal Mysqlx.Ok + // (already handled above) plus universal NOTICE/ERROR. + return false; + case RESP_IDLE: + // No outstanding response — any backend frame here is + // unsolicited. + return false; + } + return false; } bool MysqlxSession::is_terminal_frame(uint8_t msg_type) const { @@ -1034,7 +1101,40 @@ void MysqlxSession::handler_waiting_server_msg() { const auto& frame = server_ds().front_frame(); uint8_t msg_type = frame[4]; + // Per-message response state machine: drop and reject any + // backend frame that is not in the allowed set for the + // current response_state_. This guards against a buggy or + // hostile backend pushing an out-of-shape frame the client + // would otherwise have to parse (and potentially desync on). + // The canonical case: a Row before its ColumnMetaData. We + // emit X-Protocol Error 4006 fatal, mark the backend + // non-reusable so it gets evicted (not pooled) by + // return_backend_to_pool, and transition the session to + // X_SESSION_CLOSING. The disallowed frame is dropped, not + // forwarded — so the bytes_recv counter (charged below in + // the happy path) is naturally not incremented for it. + if (!is_frame_allowed(msg_type)) { + server_ds().pop_frame(); + send_error(4006, + "Backend sent an unexpected message in the current response state; closing session.", + /*fatal=*/true); + if (backend_conn_) backend_conn_->set_reusable(false); + return_backend_to_pool(); + healthy = false; + status_ = X_SESSION_CLOSING; + client_ds_.write_to_net(); + return; + } + forward_frame_to_client(msg_type, frame); + // Track that the backend has shipped ColumnMetaData in this + // response so subsequent Row frames pass the gating check + // in is_frame_allowed. Set after the forward (the forward + // itself can fail TLS write, etc., but we don't unwind state + // on partial-write — the next iteration drives the data plane). + if (msg_type == Mysqlx::ServerMessages_Type_RESULTSET_COLUMN_META_DATA) { + seen_column_metadata_ = true; + } // Account the X-Protocol payload bytes the proxy is forwarding // from the backend to the client (size minus the 5-byte frame // header; 0-payload OK/EOF frames contribute 0). Charged to the @@ -1061,6 +1161,14 @@ void MysqlxSession::handler_waiting_server_msg() { // bounds how long any single batch can grow. flush_compression_batch(); response_state_ = RESP_IDLE; + // Clear the column-metadata sub-state on every response + // boundary. CURSOR_FETCH does not need the flag carried + // across (its allowed-frame set in is_frame_allowed accepts + // RESULTSET_ROW unconditionally because ColumnMetaData was + // sent at Cursor::Open); STMT_EXECUTE / CRUD / + // PREPARE_EXECUTE / CURSOR_OPEN all explicitly clear the + // flag at dispatch time. + seen_column_metadata_ = false; client_ds_.write_to_net(); return_backend_to_pool(); last_active_time_ = monotonic_time_ms(); diff --git a/test/tap/tests/unit/mysqlx_message_dispatch_unit-t.cpp b/test/tap/tests/unit/mysqlx_message_dispatch_unit-t.cpp index fe5f0c496..a0b25edea 100644 --- a/test/tap/tests/unit/mysqlx_message_dispatch_unit-t.cpp +++ b/test/tap/tests/unit/mysqlx_message_dispatch_unit-t.cpp @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -628,6 +629,304 @@ static void test_return_backend_on_session_close() { close(fds[1]); } +// ------------------------------------------------------------------ +// Per-message response-state validation tests (#5694). +// +// Each test drives an authenticated session into a specific +// response_state_, hands the session a fake backend over a socketpair, +// writes one or more synthetic server frames into the backend half of +// the pair, then calls handler() and inspects what was forwarded to +// the client and whether the session was rejected. The fixture mirrors +// check_dispatch_routes_to_backend's setup but uses the test-only +// set_response_state_for_test() to avoid having to drive a full client +// message round-trip per scenario. +// ------------------------------------------------------------------ + +// Helper to send a server-shaped X-Protocol frame. Wire format is +// direction-agnostic (5-byte header: little-endian length + msg type), +// so this is a renamed alias of write_x_frame for reader clarity at +// the call site. +static inline void write_server_frame(int fd, uint8_t msg_type, + const uint8_t* payload, size_t payload_len) { + write_x_frame(fd, msg_type, payload, payload_len); +} + +// Returns true iff the next frame on `fd` has the given server msg +// type. Drains the frame; non-blocking via the socketpair's default. +static bool client_received_frame(int fd, uint8_t expected_msg_type) { + uint8_t buf[4096]; + usleep(5000); + ssize_t r = read_x_frame(fd, buf, sizeof(buf)); + return (r > 4 && buf[4] == expected_msg_type); +} + +// Drains and discards any frames pending on `fd`. Used to clear the +// auth round-trip's leftover bytes before a test starts asserting on +// what the validation hook produced. fd is made non-blocking so the +// drain returns instead of waiting on an empty socket. +static void drain_frames(int fd) { + int flags = fcntl(fd, F_GETFL, 0); + fcntl(fd, F_SETFL, flags | O_NONBLOCK); + uint8_t buf[4096]; + usleep(5000); + while (true) { + ssize_t r = read(fd, buf, sizeof(buf)); + if (r <= 0) break; + } + fcntl(fd, F_SETFL, flags); +} + +// Common setup: build an authenticated session backed by a fake idle +// backend over a socketpair, drain the auth-OK frame from the client +// side, parking the session in WAITING_SERVER_XMSG with the requested +// response_state_ so the next handler() call enters the validation +// loop. Returns the four fds via out-params; caller closes. +static void setup_session_for_validation(MysqlxSession& sess, + int client_fds[2], + int backend_fds[2], + MysqlxResponseState rs) { + socketpair(AF_UNIX, SOCK_STREAM, 0, client_fds); + socketpair(AF_UNIX, SOCK_STREAM, 0, backend_fds); + + setup_authenticated_session(client_fds, sess); + drain_frames(client_fds[1]); + + MysqlxConnection* conn = new MysqlxConnection(); + conn->set_fd(backend_fds[0]); + conn->set_state(MysqlxConnection::IDLE); + conn->set_reusable(true); + sess.backend_conn() = conn; + sess.server_ds().init(XDS_BACKEND, backend_fds[0]); + + sess.set_status(MysqlxSession::WAITING_SERVER_XMSG); + sess.set_response_state_for_test(rs); + sess.to_process = true; +} + +// Test 1: STMT_EXECUTE → ROW (without preceding ColumnMetaData) → +// reject. Canonical hostile-backend case in the issue. +static void test_validation_stmt_execute_row_without_metadata() { + diag(">>> %s", __func__); + int client_fds[2], backend_fds[2]; + MysqlxSession sess; + setup_session_for_validation(sess, client_fds, backend_fds, + RESP_WAITING_STMT_EXECUTE); + + write_server_frame(backend_fds[1], + Mysqlx::ServerMessages_Type_RESULTSET_ROW, nullptr, 0); + sess.handler(); + + ok(client_received_frame(client_fds[1], Mysqlx::ServerMessages_Type_ERROR), + "validation rejects ROW-without-metadata with X-Protocol Error"); + ok(!sess.is_healthy(), + "session marked unhealthy after rejected backend frame"); + ok(sess.get_status() == MysqlxSession::X_SESSION_CLOSING, + "session transitions to X_SESSION_CLOSING on validation reject"); + + detach_session_fds(sess); + close(client_fds[0]); close(client_fds[1]); + close(backend_fds[0]); close(backend_fds[1]); +} + +// Test 2: STMT_EXECUTE → ColumnMetaData → ROW → SQL_STMT_EXECUTE_OK → +// happy-path forward. Validates the gating doesn't reject legitimate +// well-ordered traffic. +static void test_validation_stmt_execute_metadata_then_row() { + diag(">>> %s", __func__); + int client_fds[2], backend_fds[2]; + MysqlxSession sess; + setup_session_for_validation(sess, client_fds, backend_fds, + RESP_WAITING_STMT_EXECUTE); + + write_server_frame(backend_fds[1], + Mysqlx::ServerMessages_Type_RESULTSET_COLUMN_META_DATA, nullptr, 0); + write_server_frame(backend_fds[1], + Mysqlx::ServerMessages_Type_RESULTSET_ROW, nullptr, 0); + write_server_frame(backend_fds[1], + Mysqlx::ServerMessages_Type_SQL_STMT_EXECUTE_OK, nullptr, 0); + sess.handler(); + + ok(sess.is_healthy(), + "happy-path STMT_EXECUTE response is not rejected"); + ok(sess.response_state_for_test() == RESP_IDLE, + "response_state_ resets to IDLE after terminal"); + ok(!sess.seen_column_metadata_for_test(), + "seen_column_metadata_ cleared at terminal-frame flush"); + + detach_session_fds(sess); + close(client_fds[0]); close(client_fds[1]); + close(backend_fds[0]); close(backend_fds[1]); +} + +// Test 3: CURSOR_OPEN → ColumnMetaData → ROW → FETCH_SUSPENDED → +// terminal, but seen_column_metadata_ is preserved per X-Protocol +// (Cursor::Fetch reuses the metadata from Cursor::Open). +static void test_validation_cursor_open_fetch_suspended() { + diag(">>> %s", __func__); + int client_fds[2], backend_fds[2]; + MysqlxSession sess; + setup_session_for_validation(sess, client_fds, backend_fds, + RESP_WAITING_CURSOR_OPEN); + + write_server_frame(backend_fds[1], + Mysqlx::ServerMessages_Type_RESULTSET_COLUMN_META_DATA, nullptr, 0); + write_server_frame(backend_fds[1], + Mysqlx::ServerMessages_Type_RESULTSET_ROW, nullptr, 0); + write_server_frame(backend_fds[1], + Mysqlx::ServerMessages_Type_RESULTSET_FETCH_SUSPENDED, nullptr, 0); + sess.handler(); + + ok(sess.is_healthy(), + "CursorOpen with FETCH_SUSPENDED is not rejected"); + ok(sess.response_state_for_test() == RESP_IDLE, + "response_state_ resets after FETCH_SUSPENDED terminal"); + // Documented invariant: the column-metadata flag is cleared at + // terminal-frame flush regardless of which response shape ended. + // Cursor::Fetch's allowed-set unconditionally accepts ROW (the + // metadata was sent at Cursor::Open and carries on the wire to + // the client; the proxy's flag does not need to track this). + ok(!sess.seen_column_metadata_for_test(), + "seen_column_metadata_ cleared at FETCH_SUSPENDED terminal " + "(Cursor::Fetch's allow-set does not consult the flag)"); + + detach_session_fds(sess); + close(client_fds[0]); close(client_fds[1]); + close(backend_fds[0]); close(backend_fds[1]); +} + +// Test 4: CURSOR_OPEN → ColumnMetaData → ROW → FETCH_DONE → terminal +// with the flag cleared. Mirrors test 3 for the fully-drained cursor +// path. +static void test_validation_cursor_open_fetch_done() { + diag(">>> %s", __func__); + int client_fds[2], backend_fds[2]; + MysqlxSession sess; + setup_session_for_validation(sess, client_fds, backend_fds, + RESP_WAITING_CURSOR_OPEN); + + write_server_frame(backend_fds[1], + Mysqlx::ServerMessages_Type_RESULTSET_COLUMN_META_DATA, nullptr, 0); + write_server_frame(backend_fds[1], + Mysqlx::ServerMessages_Type_RESULTSET_FETCH_DONE, nullptr, 0); + sess.handler(); + + ok(sess.is_healthy(), "CursorOpen with FETCH_DONE is not rejected"); + ok(sess.response_state_for_test() == RESP_IDLE, + "response_state_ resets after FETCH_DONE terminal"); + + detach_session_fds(sess); + close(client_fds[0]); close(client_fds[1]); + close(backend_fds[0]); close(backend_fds[1]); +} + +// Test 5: PREPARE_PREPARE → SQL_STMT_EXECUTE_OK → reject (only Mysqlx.Ok +// is the valid terminal for Prepare::Prepare). +static void test_validation_prepare_prepare_rejects_stmt_execute_ok() { + diag(">>> %s", __func__); + int client_fds[2], backend_fds[2]; + MysqlxSession sess; + setup_session_for_validation(sess, client_fds, backend_fds, + RESP_WAITING_PREPARE_PREPARE); + + write_server_frame(backend_fds[1], + Mysqlx::ServerMessages_Type_SQL_STMT_EXECUTE_OK, nullptr, 0); + sess.handler(); + + ok(client_received_frame(client_fds[1], Mysqlx::ServerMessages_Type_ERROR), + "PREPARE_PREPARE rejects SQL_STMT_EXECUTE_OK with X-Protocol Error"); + ok(!sess.is_healthy(), + "session marked unhealthy after PREPARE_PREPARE rejection"); + ok(sess.get_status() == MysqlxSession::X_SESSION_CLOSING, + "session transitions to X_SESSION_CLOSING on PREPARE_PREPARE reject"); + + detach_session_fds(sess); + close(client_fds[0]); close(client_fds[1]); + close(backend_fds[0]); close(backend_fds[1]); +} + +// Test 6: PREPARE_PREPARE → OK → terminal. Happy-path counterpart +// to test 5. +static void test_validation_prepare_prepare_accepts_ok() { + diag(">>> %s", __func__); + int client_fds[2], backend_fds[2]; + MysqlxSession sess; + setup_session_for_validation(sess, client_fds, backend_fds, + RESP_WAITING_PREPARE_PREPARE); + + write_server_frame(backend_fds[1], + Mysqlx::ServerMessages_Type_OK, nullptr, 0); + sess.handler(); + + ok(sess.is_healthy(), "PREPARE_PREPARE happy-path OK is not rejected"); + ok(sess.response_state_for_test() == RESP_IDLE, + "response_state_ resets after PREPARE_PREPARE OK terminal"); + + detach_session_fds(sess); + close(client_fds[0]); close(client_fds[1]); + close(backend_fds[0]); close(backend_fds[1]); +} + +// Test 7: STMT_EXECUTE → ColumnMetaData → NOTICE → ROW → +// SQL_STMT_EXECUTE_OK. NOTICE is non-terminal in every state and must +// not consume the terminal slot or trigger rejection. +static void test_validation_notice_mid_result() { + diag(">>> %s", __func__); + int client_fds[2], backend_fds[2]; + MysqlxSession sess; + setup_session_for_validation(sess, client_fds, backend_fds, + RESP_WAITING_STMT_EXECUTE); + + write_server_frame(backend_fds[1], + Mysqlx::ServerMessages_Type_RESULTSET_COLUMN_META_DATA, nullptr, 0); + write_server_frame(backend_fds[1], + Mysqlx::ServerMessages_Type_NOTICE, nullptr, 0); + write_server_frame(backend_fds[1], + Mysqlx::ServerMessages_Type_RESULTSET_ROW, nullptr, 0); + write_server_frame(backend_fds[1], + Mysqlx::ServerMessages_Type_SQL_STMT_EXECUTE_OK, nullptr, 0); + sess.handler(); + + ok(sess.is_healthy(), + "NOTICE mid-result is not rejected"); + ok(sess.response_state_for_test() == RESP_IDLE, + "STMT_EXECUTE_OK terminates response after intermixed NOTICE"); + + detach_session_fds(sess); + close(client_fds[0]); close(client_fds[1]); + close(backend_fds[0]); close(backend_fds[1]); +} + +// Test 8: CURSOR_FETCH → ROW (with no preceding ColumnMetaData in this +// response sequence) → forwarded, NOT rejected. Validates the +// per-state-pair carve-out — Cursor::Fetch's allowed-set accepts ROW +// unconditionally because the metadata was sent at Cursor::Open time. +static void test_validation_cursor_fetch_row_without_metadata_allowed() { + diag(">>> %s", __func__); + int client_fds[2], backend_fds[2]; + MysqlxSession sess; + setup_session_for_validation(sess, client_fds, backend_fds, + RESP_WAITING_CURSOR_FETCH); + + // Note: seen_column_metadata_ is *false* here — we deliberately + // did NOT pre-set it. The point is to prove CURSOR_FETCH does not + // consult the flag. + write_server_frame(backend_fds[1], + Mysqlx::ServerMessages_Type_RESULTSET_ROW, nullptr, 0); + write_server_frame(backend_fds[1], + Mysqlx::ServerMessages_Type_RESULTSET_FETCH_DONE, nullptr, 0); + sess.handler(); + + ok(sess.is_healthy(), + "CURSOR_FETCH accepts ROW without per-response metadata " + "(metadata carried over from Cursor::Open in the X-Protocol)"); + ok(sess.response_state_for_test() == RESP_IDLE, + "FETCH_DONE terminates the CURSOR_FETCH response"); + + detach_session_fds(sess); + close(client_fds[0]); close(client_fds[1]); + close(backend_fds[0]); close(backend_fds[1]); +} + int main() { setvbuf(stdout, nullptr, _IOLBF, 0); // Plan count rationale (per assertion source): @@ -677,5 +976,15 @@ int main() { test_forward_to_backend_with_socketpair(); test_return_backend_on_session_close(); + // Per-message response-state validation (#5694). + test_validation_stmt_execute_row_without_metadata(); + test_validation_stmt_execute_metadata_then_row(); + test_validation_cursor_open_fetch_suspended(); + test_validation_cursor_open_fetch_done(); + test_validation_prepare_prepare_rejects_stmt_execute_ok(); + test_validation_prepare_prepare_accepts_ok(); + test_validation_notice_mid_result(); + test_validation_cursor_fetch_row_without_metadata_allowed(); + return exit_status(); }