Fix PostgreSQL transaction state management for pipeline mode

Move transaction state tracking from RequestEnd to connection handler to enable proper binary pipeline mode support:

- Call handle_transaction_state() at ASYNC_QUERY_END/ASYNC_STMT_EXECUTE_END in PgSQL_Connection instead of RequestEnd
- Add reset_state() calls in 7 locations to prevent stale state:
  * Session reset
  * Connection detach (finishQuery)
  * Error handlers (4 paths)
  * Base session housekeeping
- Refactor destructor to use reset_state()

Fixes transaction state tracking in pipeline mode where connections can be returned to pool mid-transaction.
pull/5431/head
Rahim Kanji 2 months ago
parent d0ad4b666d
commit 742ff05ecc

@ -59,7 +59,7 @@ public:
PgSQL_TxnCmdParser() noexcept { tokens.reserve(16); }
~PgSQL_TxnCmdParser() noexcept = default;
TxnCmd parse(std::string_view input, bool in_transaction_mode) noexcept;
TxnCmd parse(std::string_view input) noexcept;
private:
std::vector<std::string_view> tokens;
@ -102,6 +102,8 @@ public:
bool handle_transaction(std::string_view input);
int get_savepoint_count() const { return savepoint.size(); }
bool is_in_transaction() const { return !transaction_state.empty(); }
void reset_state();
void fill_internal_session(nlohmann::json& j);
private:

@ -430,6 +430,8 @@ private:
void switch_fast_forward_to_normal_mode();
public:
void handle_transaction_state();
inline bool is_extended_query_frame_empty() const {
return extended_query_frame.empty();
}
@ -604,6 +606,7 @@ public:
void generate_status_one_hostgroup(int hid, std::string& s);
void set_previous_status_mode3(bool allow_execute = true);
char* get_current_query(int max_length = -1);
bool is_in_transaction() const;
private:
int32_t extract_pid_from_param(const PgSQL_Param_Value& param, uint16_t format) const;

@ -7,6 +7,7 @@ using json = nlohmann::json;
#include "MySQL_PreparedStatement.h"
#include "MySQL_Data_Stream.h"
#include "PgSQL_Data_Stream.h"
#include "PgSQL_ExplicitTxnStateMgr.h"
#define SELECT_DB_USER "select DATABASE(), USER() limit 1"
#define SELECT_DB_USER_LEN 33
@ -526,6 +527,11 @@ void Base_Session<S,DS,B,T>::housekeeping_before_pkts() {
myds->return_MySQL_Connection_To_Pool();
}
} else if constexpr (std::is_same_v<S, PgSQL_Session>) {
// Reset transaction state before returning connection to pool
if (static_cast<PgSQL_Session*>(this)->transaction_state_manager) {
static_cast<PgSQL_Session*>(this)->transaction_state_manager->reset_state();
}
if (myds->myconn->is_pipeline_active() == true) {
create_new_session_and_reset_connection(myds);
} else {

@ -798,6 +798,14 @@ handler_again:
case ASYNC_STMT_DESCRIBE_END:
case ASYNC_STMT_EXECUTE_END:
PROXY_TRACE2();
if (is_error_present() == false &&
(async_state_machine == ASYNC_QUERY_END || async_state_machine == ASYNC_STMT_EXECUTE_END)) {
if (myds->sess->locked_on_hostgroup == -1 && myds->sess->transaction_state_manager) {
myds->sess->handle_transaction_state();
}
}
if (is_error_present()) {
compute_unknown_transaction_status();
} else {
@ -1560,41 +1568,34 @@ int PgSQL_Connection::async_ping(short event) {
}
bool PgSQL_Connection::IsKnownActiveTransaction() {
bool in_txn = false;
if (pgsql_conn) {
// Get the transaction status
PGTransactionStatusType status = PQtransactionStatus(pgsql_conn);
if (status == PQTRANS_INTRANS || status == PQTRANS_INERROR) {
in_txn = true;
}
if (!pgsql_conn) return false;
PGTransactionStatusType status = PQtransactionStatus(pgsql_conn);
if (status == PQTRANS_INTRANS || status == PQTRANS_INERROR) {
return true;
}
return in_txn;
}
bool PgSQL_Connection::IsActiveTransaction() {
bool in_txn = false;
if (pgsql_conn) {
// In pipeline mode, libpq status may be stale because ReadyForQuery hasn't been processed yet
// Use the session's transaction state manager which tracks BEGIN/COMMIT/ROLLBACK via SQL parsing
if (PQpipelineStatus(pgsql_conn) == PQ_PIPELINE_ON && myds && myds->sess) {
return myds->sess->is_in_transaction();
}
// Get the transaction status
PGTransactionStatusType status = PQtransactionStatus(pgsql_conn);
return false;
}
switch (status) {
case PQTRANS_INTRANS:
case PQTRANS_INERROR:
in_txn = true;
break;
case PQTRANS_UNKNOWN:
case PQTRANS_IDLE:
case PQTRANS_ACTIVE:
default:
in_txn = false;
}
bool PgSQL_Connection::IsActiveTransaction() {
// First check known state
if (IsKnownActiveTransaction()) {
return true;
}
if (in_txn == false && is_error_present() && unknown_transaction_status == true) {
in_txn = true;
}
// Check unknown transaction status flag
if (is_error_present() && unknown_transaction_status) {
return true;
}
return in_txn;
return false;
}
bool PgSQL_Connection::IsServerOffline() {

@ -11,11 +11,7 @@ PgSQL_ExplicitTxnStateMgr::PgSQL_ExplicitTxnStateMgr(PgSQL_Session* sess) : sess
}
PgSQL_ExplicitTxnStateMgr::~PgSQL_ExplicitTxnStateMgr() {
for (auto& tran_state : transaction_state) {
reset_variable_snapshot(tran_state);
}
transaction_state.clear();
savepoint.clear();
reset_state();
}
void PgSQL_ExplicitTxnStateMgr::reset_variable_snapshot(PgSQL_Variable_Snapshot& var_snapshot) noexcept {
@ -300,7 +296,7 @@ void PgSQL_ExplicitTxnStateMgr::fill_internal_session(nlohmann::json& j) {
}
bool PgSQL_ExplicitTxnStateMgr::handle_transaction(std::string_view input) {
TxnCmd cmd = tx_parser.parse(input, (session->active_transactions > 0));
TxnCmd cmd = tx_parser.parse(input);
switch (cmd.type) {
case TxnCmd::BEGIN:
start_transaction();
@ -327,7 +323,16 @@ bool PgSQL_ExplicitTxnStateMgr::handle_transaction(std::string_view input) {
return true;
}
TxnCmd PgSQL_TxnCmdParser::parse(std::string_view input, bool in_transaction_mode) noexcept {
void PgSQL_ExplicitTxnStateMgr::reset_state() {
for (auto& tran_state : transaction_state) {
reset_variable_snapshot(tran_state);
}
transaction_state.clear();
savepoint.clear();
}
TxnCmd PgSQL_TxnCmdParser::parse(std::string_view input) noexcept {
TxnCmd cmd;
if (input.empty()) return cmd;
@ -367,36 +372,26 @@ TxnCmd PgSQL_TxnCmdParser::parse(std::string_view input, bool in_transaction_mod
// Check if this is a transaction command we care about
TxnCmd::Type cmd_type = TxnCmd::UNKNOWN;
if (in_transaction_mode) {
if (iequals(first_word, "begin")) {
cmd.type = TxnCmd::BEGIN;
return cmd;
}
if (iequals(first_word, "start")) {
cmd_type = TxnCmd::BEGIN;
} else if (iequals(first_word, "savepoint")) {
cmd_type = TxnCmd::SAVEPOINT;
} else if (iequals(first_word, "release")) {
cmd_type = TxnCmd::RELEASE;
} else if (iequals(first_word, "rollback")) {
cmd_type = TxnCmd::ROLLBACK;
}
} else {
if (iequals(first_word, "commit") || iequals(first_word, "end")) {
cmd.type = TxnCmd::COMMIT;
return cmd;
}
if (iequals(first_word, "abort")) {
cmd.type = TxnCmd::ROLLBACK;
return cmd;
}
if (iequals(first_word, "rollback")) {
cmd_type = TxnCmd::ROLLBACK;
}
// Parse transaction commands regardless of current transaction state.
// This is required for pipeline mode where multiple commands are sent
// before ReadyForQuery packets return the actual transaction status.
if (iequals(first_word, "begin")) {
cmd.type = TxnCmd::BEGIN;
return cmd;
} else if (iequals(first_word, "start")) {
cmd_type = TxnCmd::BEGIN;
} else if (iequals(first_word, "savepoint")) {
cmd_type = TxnCmd::SAVEPOINT;
} else if (iequals(first_word, "release")) {
cmd_type = TxnCmd::RELEASE;
} else if (iequals(first_word, "rollback")) {
cmd_type = TxnCmd::ROLLBACK;
} else if (iequals(first_word, "commit") || iequals(first_word, "end")) {
cmd.type = TxnCmd::COMMIT;
return cmd;
} else if (iequals(first_word, "abort")) {
cmd.type = TxnCmd::ROLLBACK;
return cmd;
}
// If not a transaction command, return early
@ -443,28 +438,22 @@ TxnCmd PgSQL_TxnCmdParser::parse(std::string_view input, bool in_transaction_mod
size_t pos = 0;
if (in_transaction_mode) {
switch (cmd_type) {
case TxnCmd::BEGIN:
cmd = parse_start(pos);
break;
case TxnCmd::SAVEPOINT:
cmd = parse_savepoint(pos);
break;
case TxnCmd::RELEASE:
cmd = parse_release(pos);
break;
case TxnCmd::ROLLBACK:
cmd = parse_rollback(pos);
break;
default:
break;
}
} else {
if (cmd_type == TxnCmd::ROLLBACK)
cmd = parse_rollback(pos);
}
switch (cmd_type) {
case TxnCmd::BEGIN:
cmd = parse_start(pos);
break;
case TxnCmd::SAVEPOINT:
cmd = parse_savepoint(pos);
break;
case TxnCmd::RELEASE:
cmd = parse_release(pos);
break;
case TxnCmd::ROLLBACK:
cmd = parse_rollback(pos);
break;
default:
break;
}
return cmd;
}

@ -1360,10 +1360,11 @@ void PgSQL_Protocol::generate_error_packet(bool send, bool ready, const char* ms
assert(send == true || _ptr);
if (send) {
// in case of fatal error we dont generate ready packets
ready = !fatal;
if (ready == fatal)
ready = !ready;
}
PG_pkt pgpkt{};
if (ready)

@ -307,6 +307,10 @@ void PgSQL_Session::reset() {
if (client_myds && client_myds->myconn) {
client_myds->myconn->reset();
}
if (transaction_state_manager) {
transaction_state_manager->reset_state();
}
extended_query_phase = EXTQ_PHASE_IDLE;
}
@ -2296,7 +2300,11 @@ __implicit_sync:
reset_extended_query_frame();
proxy_error("Not implemented yet. Message type:'%c'\n", c);
client_myds->setDSS_STATE_QUERY_SENT_NET();
client_myds->myprot.generate_error_packet(true, true, "Feature not supported", PGSQL_ERROR_CODES::ERRCODE_FEATURE_NOT_SUPPORTED,
bool send_ready_packet = is_extended_query_ready_for_query() && c != 'H';
//unsigned int nTrx = NumActiveTransactions();
//const char txn_state = (nTrx ? 'T' : 'I');
client_myds->myprot.generate_error_packet(true, send_ready_packet, "Feature not supported", PGSQL_ERROR_CODES::ERRCODE_FEATURE_NOT_SUPPORTED,
false, true);
l_free(pkt.size, pkt.ptr);
client_myds->DSS = STATE_SLEEP;
@ -2382,6 +2390,9 @@ int PgSQL_Session::handler_ProcessingQueryError_CheckBackendConnectionStatus(PgS
}
}
}
if (transaction_state_manager) {
transaction_state_manager->reset_state();
}
myds->destroy_MySQL_Connection_From_Pool(false);
myds->fd = 0;
if (retry_conn) {
@ -2440,6 +2451,9 @@ bool PgSQL_Session::handler_minus1_ClientLibraryError(PgSQL_Data_Stream* myds) {
}
}
}
if (transaction_state_manager) {
transaction_state_manager->reset_state();
}
myds->destroy_MySQL_Connection_From_Pool(false);
myds->fd = 0;
if (retry_conn) {
@ -2498,6 +2512,9 @@ bool PgSQL_Session::handler_minus1_HandleErrorCodes(PgSQL_Data_Stream* myds, int
proxy_warning("Retrying query.\n");
}
}
if (transaction_state_manager) {
transaction_state_manager->reset_state();
}
myds->destroy_MySQL_Connection_From_Pool(false);
myconn = myds->myconn;
myds->fd = 0;
@ -2560,6 +2577,10 @@ void PgSQL_Session::handler_minus1_HandleBackendConnection(PgSQL_Data_Stream* my
if (pgsql_thread___multiplexing && (myconn->reusable == true) && myconn->IsActiveTransaction() == false &&
myconn->MultiplexDisabled() == false) {
myds->DSS = STATE_NOT_INITIALIZED;
// Reset transaction state before returning connection to pool
if (transaction_state_manager) {
transaction_state_manager->reset_state();
}
if (myconn->is_pipeline_active() == true) {
create_new_session_and_reset_connection(myds);
} else {
@ -5048,6 +5069,12 @@ void PgSQL_Session::LogQuery(PgSQL_Data_Stream* myds) {
}
}
void PgSQL_Session::handle_transaction_state() {
if (locked_on_hostgroup == -1) {
transaction_state_manager->handle_transaction(CurrentQuery.get_digest_text());
}
}
void PgSQL_Session::RequestEnd(PgSQL_Data_Stream* myds, bool called_on_failure) {
// check if multiplexing needs to be disabled
@ -5078,7 +5105,6 @@ void PgSQL_Session::RequestEnd(PgSQL_Data_Stream* myds, bool called_on_failure)
// we do not maintain the transaction variable state if the session is locked on a hostgroup
// or is a Fast Forward session.
if (locked_on_hostgroup == -1) {
transaction_state_manager->handle_transaction(query_digest_text);
savepoint_count = transaction_state_manager->get_savepoint_count();
}
@ -5507,6 +5533,12 @@ void PgSQL_Session::finishQuery(PgSQL_Data_Stream* myds, PgSQL_Connection* mycon
myds->wait_until = 0;
myconn->multiplex_delayed = false;
} else {
// CONNECTION BEING DETACHED - Reset transaction state
// This handles: (1) normal pool return, (2) pipeline reset, (3) connection destroyed
// When connection detaches, any in-flight transaction state becomes invalid
if (transaction_state_manager) {
transaction_state_manager->reset_state();
}
myconn->multiplex_delayed = false;
myds->wait_until = 0;
myds->DSS = STATE_NOT_INITIALIZED;
@ -5628,6 +5660,10 @@ void PgSQL_Session::generate_status_one_hostgroup(int hid, std::string& s) {
delete resultset;
}
bool PgSQL_Session::is_in_transaction() const {
return transaction_state_manager && transaction_state_manager->is_in_transaction();
}
/**
* @brief Sets the previous status of the PgSQL session according to the current status, with an option to allow EXECUTE statements.
*
@ -6484,6 +6520,7 @@ int PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_S
unsigned int nTxn = NumActiveTransactions();
const char txn_state = (nTxn ? 'T' : 'I');
client_myds->myprot.generate_ready_for_query_packet(true, txn_state);
writeout();
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
extended_query_phase = EXTQ_PHASE_IDLE;

Loading…
Cancel
Save