From 742ff05ecc795fd2fde4b37de6c4292f6ee833e8 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Mon, 2 Mar 2026 16:58:09 +0500 Subject: [PATCH] Fix PostgreSQL transaction state management for pipeline mode Move transaction state tracking from RequestEnd to connection handler to enable proper binary pipeline mode support: - Call handle_transaction_state() at ASYNC_QUERY_END/ASYNC_STMT_EXECUTE_END in PgSQL_Connection instead of RequestEnd - Add reset_state() calls in 7 locations to prevent stale state: * Session reset * Connection detach (finishQuery) * Error handlers (4 paths) * Base session housekeeping - Refactor destructor to use reset_state() Fixes transaction state tracking in pipeline mode where connections can be returned to pool mid-transaction. --- include/PgSQL_ExplicitTxnStateMgr.h | 4 +- include/PgSQL_Session.h | 3 + lib/Base_Session.cpp | 6 ++ lib/PgSQL_Connection.cpp | 59 +++++++-------- lib/PgSQL_ExplicitTxnStateMgr.cpp | 107 +++++++++++++--------------- lib/PgSQL_Protocol.cpp | 5 +- lib/PgSQL_Session.cpp | 41 ++++++++++- 7 files changed, 132 insertions(+), 93 deletions(-) diff --git a/include/PgSQL_ExplicitTxnStateMgr.h b/include/PgSQL_ExplicitTxnStateMgr.h index ce2ccd8df..18b6d22f3 100644 --- a/include/PgSQL_ExplicitTxnStateMgr.h +++ b/include/PgSQL_ExplicitTxnStateMgr.h @@ -59,7 +59,7 @@ public: PgSQL_TxnCmdParser() noexcept { tokens.reserve(16); } ~PgSQL_TxnCmdParser() noexcept = default; - TxnCmd parse(std::string_view input, bool in_transaction_mode) noexcept; + TxnCmd parse(std::string_view input) noexcept; private: std::vector tokens; @@ -102,6 +102,8 @@ public: bool handle_transaction(std::string_view input); int get_savepoint_count() const { return savepoint.size(); } + bool is_in_transaction() const { return !transaction_state.empty(); } + void reset_state(); void fill_internal_session(nlohmann::json& j); private: diff --git a/include/PgSQL_Session.h b/include/PgSQL_Session.h index b78b2af45..5c18238a2 100644 --- a/include/PgSQL_Session.h +++ b/include/PgSQL_Session.h @@ -430,6 +430,8 @@ private: void switch_fast_forward_to_normal_mode(); public: + void handle_transaction_state(); + inline bool is_extended_query_frame_empty() const { return extended_query_frame.empty(); } @@ -604,6 +606,7 @@ public: void generate_status_one_hostgroup(int hid, std::string& s); void set_previous_status_mode3(bool allow_execute = true); char* get_current_query(int max_length = -1); + bool is_in_transaction() const; private: int32_t extract_pid_from_param(const PgSQL_Param_Value& param, uint16_t format) const; diff --git a/lib/Base_Session.cpp b/lib/Base_Session.cpp index 73c4823c8..71f2a55ca 100644 --- a/lib/Base_Session.cpp +++ b/lib/Base_Session.cpp @@ -7,6 +7,7 @@ using json = nlohmann::json; #include "MySQL_PreparedStatement.h" #include "MySQL_Data_Stream.h" #include "PgSQL_Data_Stream.h" +#include "PgSQL_ExplicitTxnStateMgr.h" #define SELECT_DB_USER "select DATABASE(), USER() limit 1" #define SELECT_DB_USER_LEN 33 @@ -526,6 +527,11 @@ void Base_Session::housekeeping_before_pkts() { myds->return_MySQL_Connection_To_Pool(); } } else if constexpr (std::is_same_v) { + // Reset transaction state before returning connection to pool + if (static_cast(this)->transaction_state_manager) { + static_cast(this)->transaction_state_manager->reset_state(); + } + if (myds->myconn->is_pipeline_active() == true) { create_new_session_and_reset_connection(myds); } else { diff --git a/lib/PgSQL_Connection.cpp b/lib/PgSQL_Connection.cpp index dece0ac8a..304415536 100644 --- a/lib/PgSQL_Connection.cpp +++ b/lib/PgSQL_Connection.cpp @@ -798,6 +798,14 @@ handler_again: case ASYNC_STMT_DESCRIBE_END: case ASYNC_STMT_EXECUTE_END: PROXY_TRACE2(); + + if (is_error_present() == false && + (async_state_machine == ASYNC_QUERY_END || async_state_machine == ASYNC_STMT_EXECUTE_END)) { + if (myds->sess->locked_on_hostgroup == -1 && myds->sess->transaction_state_manager) { + myds->sess->handle_transaction_state(); + } + } + if (is_error_present()) { compute_unknown_transaction_status(); } else { @@ -1560,41 +1568,34 @@ int PgSQL_Connection::async_ping(short event) { } bool PgSQL_Connection::IsKnownActiveTransaction() { - bool in_txn = false; - if (pgsql_conn) { - // Get the transaction status - PGTransactionStatusType status = PQtransactionStatus(pgsql_conn); - if (status == PQTRANS_INTRANS || status == PQTRANS_INERROR) { - in_txn = true; - } + if (!pgsql_conn) return false; + + PGTransactionStatusType status = PQtransactionStatus(pgsql_conn); + if (status == PQTRANS_INTRANS || status == PQTRANS_INERROR) { + return true; } - return in_txn; -} -bool PgSQL_Connection::IsActiveTransaction() { - bool in_txn = false; - if (pgsql_conn) { + // In pipeline mode, libpq status may be stale because ReadyForQuery hasn't been processed yet + // Use the session's transaction state manager which tracks BEGIN/COMMIT/ROLLBACK via SQL parsing + if (PQpipelineStatus(pgsql_conn) == PQ_PIPELINE_ON && myds && myds->sess) { + return myds->sess->is_in_transaction(); + } - // Get the transaction status - PGTransactionStatusType status = PQtransactionStatus(pgsql_conn); + return false; +} - switch (status) { - case PQTRANS_INTRANS: - case PQTRANS_INERROR: - in_txn = true; - break; - case PQTRANS_UNKNOWN: - case PQTRANS_IDLE: - case PQTRANS_ACTIVE: - default: - in_txn = false; - } +bool PgSQL_Connection::IsActiveTransaction() { + // First check known state + if (IsKnownActiveTransaction()) { + return true; + } - if (in_txn == false && is_error_present() && unknown_transaction_status == true) { - in_txn = true; - } + // Check unknown transaction status flag + if (is_error_present() && unknown_transaction_status) { + return true; } - return in_txn; + + return false; } bool PgSQL_Connection::IsServerOffline() { diff --git a/lib/PgSQL_ExplicitTxnStateMgr.cpp b/lib/PgSQL_ExplicitTxnStateMgr.cpp index f8d023485..34a0bb1e9 100644 --- a/lib/PgSQL_ExplicitTxnStateMgr.cpp +++ b/lib/PgSQL_ExplicitTxnStateMgr.cpp @@ -11,11 +11,7 @@ PgSQL_ExplicitTxnStateMgr::PgSQL_ExplicitTxnStateMgr(PgSQL_Session* sess) : sess } PgSQL_ExplicitTxnStateMgr::~PgSQL_ExplicitTxnStateMgr() { - for (auto& tran_state : transaction_state) { - reset_variable_snapshot(tran_state); - } - transaction_state.clear(); - savepoint.clear(); + reset_state(); } void PgSQL_ExplicitTxnStateMgr::reset_variable_snapshot(PgSQL_Variable_Snapshot& var_snapshot) noexcept { @@ -300,7 +296,7 @@ void PgSQL_ExplicitTxnStateMgr::fill_internal_session(nlohmann::json& j) { } bool PgSQL_ExplicitTxnStateMgr::handle_transaction(std::string_view input) { - TxnCmd cmd = tx_parser.parse(input, (session->active_transactions > 0)); + TxnCmd cmd = tx_parser.parse(input); switch (cmd.type) { case TxnCmd::BEGIN: start_transaction(); @@ -327,7 +323,16 @@ bool PgSQL_ExplicitTxnStateMgr::handle_transaction(std::string_view input) { return true; } -TxnCmd PgSQL_TxnCmdParser::parse(std::string_view input, bool in_transaction_mode) noexcept { +void PgSQL_ExplicitTxnStateMgr::reset_state() { + + for (auto& tran_state : transaction_state) { + reset_variable_snapshot(tran_state); + } + transaction_state.clear(); + savepoint.clear(); +} + +TxnCmd PgSQL_TxnCmdParser::parse(std::string_view input) noexcept { TxnCmd cmd; if (input.empty()) return cmd; @@ -367,36 +372,26 @@ TxnCmd PgSQL_TxnCmdParser::parse(std::string_view input, bool in_transaction_mod // Check if this is a transaction command we care about TxnCmd::Type cmd_type = TxnCmd::UNKNOWN; - if (in_transaction_mode) { - if (iequals(first_word, "begin")) { - cmd.type = TxnCmd::BEGIN; - return cmd; - } - - if (iequals(first_word, "start")) { - cmd_type = TxnCmd::BEGIN; - } else if (iequals(first_word, "savepoint")) { - cmd_type = TxnCmd::SAVEPOINT; - } else if (iequals(first_word, "release")) { - cmd_type = TxnCmd::RELEASE; - } else if (iequals(first_word, "rollback")) { - cmd_type = TxnCmd::ROLLBACK; - } - } else { - - if (iequals(first_word, "commit") || iequals(first_word, "end")) { - cmd.type = TxnCmd::COMMIT; - return cmd; - } - - if (iequals(first_word, "abort")) { - cmd.type = TxnCmd::ROLLBACK; - return cmd; - } - - if (iequals(first_word, "rollback")) { - cmd_type = TxnCmd::ROLLBACK; - } + // Parse transaction commands regardless of current transaction state. + // This is required for pipeline mode where multiple commands are sent + // before ReadyForQuery packets return the actual transaction status. + if (iequals(first_word, "begin")) { + cmd.type = TxnCmd::BEGIN; + return cmd; + } else if (iequals(first_word, "start")) { + cmd_type = TxnCmd::BEGIN; + } else if (iequals(first_word, "savepoint")) { + cmd_type = TxnCmd::SAVEPOINT; + } else if (iequals(first_word, "release")) { + cmd_type = TxnCmd::RELEASE; + } else if (iequals(first_word, "rollback")) { + cmd_type = TxnCmd::ROLLBACK; + } else if (iequals(first_word, "commit") || iequals(first_word, "end")) { + cmd.type = TxnCmd::COMMIT; + return cmd; + } else if (iequals(first_word, "abort")) { + cmd.type = TxnCmd::ROLLBACK; + return cmd; } // If not a transaction command, return early @@ -443,28 +438,22 @@ TxnCmd PgSQL_TxnCmdParser::parse(std::string_view input, bool in_transaction_mod size_t pos = 0; - if (in_transaction_mode) { - - switch (cmd_type) { - case TxnCmd::BEGIN: - cmd = parse_start(pos); - break; - case TxnCmd::SAVEPOINT: - cmd = parse_savepoint(pos); - break; - case TxnCmd::RELEASE: - cmd = parse_release(pos); - break; - case TxnCmd::ROLLBACK: - cmd = parse_rollback(pos); - break; - default: - break; - } - } else { - if (cmd_type == TxnCmd::ROLLBACK) - cmd = parse_rollback(pos); - } + switch (cmd_type) { + case TxnCmd::BEGIN: + cmd = parse_start(pos); + break; + case TxnCmd::SAVEPOINT: + cmd = parse_savepoint(pos); + break; + case TxnCmd::RELEASE: + cmd = parse_release(pos); + break; + case TxnCmd::ROLLBACK: + cmd = parse_rollback(pos); + break; + default: + break; + } return cmd; } diff --git a/lib/PgSQL_Protocol.cpp b/lib/PgSQL_Protocol.cpp index e990be514..a7a642cae 100644 --- a/lib/PgSQL_Protocol.cpp +++ b/lib/PgSQL_Protocol.cpp @@ -1360,10 +1360,11 @@ void PgSQL_Protocol::generate_error_packet(bool send, bool ready, const char* ms assert(send == true || _ptr); if (send) { - // in case of fatal error we dont generate ready packets - ready = !fatal; + if (ready == fatal) + ready = !ready; } + PG_pkt pgpkt{}; if (ready) diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index f0d48c9a9..95e0953df 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -307,6 +307,10 @@ void PgSQL_Session::reset() { if (client_myds && client_myds->myconn) { client_myds->myconn->reset(); } + + if (transaction_state_manager) { + transaction_state_manager->reset_state(); + } extended_query_phase = EXTQ_PHASE_IDLE; } @@ -2296,7 +2300,11 @@ __implicit_sync: reset_extended_query_frame(); proxy_error("Not implemented yet. Message type:'%c'\n", c); client_myds->setDSS_STATE_QUERY_SENT_NET(); - client_myds->myprot.generate_error_packet(true, true, "Feature not supported", PGSQL_ERROR_CODES::ERRCODE_FEATURE_NOT_SUPPORTED, + + bool send_ready_packet = is_extended_query_ready_for_query() && c != 'H'; + //unsigned int nTrx = NumActiveTransactions(); + //const char txn_state = (nTrx ? 'T' : 'I'); + client_myds->myprot.generate_error_packet(true, send_ready_packet, "Feature not supported", PGSQL_ERROR_CODES::ERRCODE_FEATURE_NOT_SUPPORTED, false, true); l_free(pkt.size, pkt.ptr); client_myds->DSS = STATE_SLEEP; @@ -2382,6 +2390,9 @@ int PgSQL_Session::handler_ProcessingQueryError_CheckBackendConnectionStatus(PgS } } } + if (transaction_state_manager) { + transaction_state_manager->reset_state(); + } myds->destroy_MySQL_Connection_From_Pool(false); myds->fd = 0; if (retry_conn) { @@ -2440,6 +2451,9 @@ bool PgSQL_Session::handler_minus1_ClientLibraryError(PgSQL_Data_Stream* myds) { } } } + if (transaction_state_manager) { + transaction_state_manager->reset_state(); + } myds->destroy_MySQL_Connection_From_Pool(false); myds->fd = 0; if (retry_conn) { @@ -2498,6 +2512,9 @@ bool PgSQL_Session::handler_minus1_HandleErrorCodes(PgSQL_Data_Stream* myds, int proxy_warning("Retrying query.\n"); } } + if (transaction_state_manager) { + transaction_state_manager->reset_state(); + } myds->destroy_MySQL_Connection_From_Pool(false); myconn = myds->myconn; myds->fd = 0; @@ -2560,6 +2577,10 @@ void PgSQL_Session::handler_minus1_HandleBackendConnection(PgSQL_Data_Stream* my if (pgsql_thread___multiplexing && (myconn->reusable == true) && myconn->IsActiveTransaction() == false && myconn->MultiplexDisabled() == false) { myds->DSS = STATE_NOT_INITIALIZED; + // Reset transaction state before returning connection to pool + if (transaction_state_manager) { + transaction_state_manager->reset_state(); + } if (myconn->is_pipeline_active() == true) { create_new_session_and_reset_connection(myds); } else { @@ -5048,6 +5069,12 @@ void PgSQL_Session::LogQuery(PgSQL_Data_Stream* myds) { } } +void PgSQL_Session::handle_transaction_state() { + if (locked_on_hostgroup == -1) { + transaction_state_manager->handle_transaction(CurrentQuery.get_digest_text()); + } +} + void PgSQL_Session::RequestEnd(PgSQL_Data_Stream* myds, bool called_on_failure) { // check if multiplexing needs to be disabled @@ -5078,7 +5105,6 @@ void PgSQL_Session::RequestEnd(PgSQL_Data_Stream* myds, bool called_on_failure) // we do not maintain the transaction variable state if the session is locked on a hostgroup // or is a Fast Forward session. if (locked_on_hostgroup == -1) { - transaction_state_manager->handle_transaction(query_digest_text); savepoint_count = transaction_state_manager->get_savepoint_count(); } @@ -5507,6 +5533,12 @@ void PgSQL_Session::finishQuery(PgSQL_Data_Stream* myds, PgSQL_Connection* mycon myds->wait_until = 0; myconn->multiplex_delayed = false; } else { + // CONNECTION BEING DETACHED - Reset transaction state + // This handles: (1) normal pool return, (2) pipeline reset, (3) connection destroyed + // When connection detaches, any in-flight transaction state becomes invalid + if (transaction_state_manager) { + transaction_state_manager->reset_state(); + } myconn->multiplex_delayed = false; myds->wait_until = 0; myds->DSS = STATE_NOT_INITIALIZED; @@ -5628,6 +5660,10 @@ void PgSQL_Session::generate_status_one_hostgroup(int hid, std::string& s) { delete resultset; } +bool PgSQL_Session::is_in_transaction() const { + return transaction_state_manager && transaction_state_manager->is_in_transaction(); +} + /** * @brief Sets the previous status of the PgSQL session according to the current status, with an option to allow EXECUTE statements. * @@ -6484,6 +6520,7 @@ int PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_S unsigned int nTxn = NumActiveTransactions(); const char txn_state = (nTxn ? 'T' : 'I'); client_myds->myprot.generate_ready_for_query_packet(true, txn_state); + writeout(); client_myds->DSS = STATE_SLEEP; status = WAITING_CLIENT_DATA; extended_query_phase = EXTQ_PHASE_IDLE;