diff --git a/include/PgSQL_Session.h b/include/PgSQL_Session.h index 53e36628f..e5bcc6914 100644 --- a/include/PgSQL_Session.h +++ b/include/PgSQL_Session.h @@ -387,6 +387,18 @@ private: int handler_ProcessingQueryError_CheckBackendConnectionStatus(PgSQL_Data_Stream* myds); void SetQueryTimeout(); bool handler_minus1_ClientLibraryError(PgSQL_Data_Stream* myds); + // Synthesize ErrorResponse(25P02) + NoticeResponse(backend text, no 57P01) + + // ReadyForQuery('E') to the client, destroy the backend pool connection, set + // tx_poisoned=true. Returns true if poison was applied; false means the + // caller must fall back to the current terminate-the-session flow (e.g. + // admin var off, result transfer already started, or a preflight failed). + bool handler_minus1_PoisonTransaction(PgSQL_Data_Stream* myds); + // While tx_poisoned, classify a 'Q' packet and either clear the poison + // and synthesize a ROLLBACK response (for ROLLBACK / COMMIT / ABORT / + // ROLLBACK TO SAVEPOINT) or reject with ERROR 25P02 (anything else). + // Returns true if the packet was handled here. Increments the + // pgsql_tx_poisoned_{recovered,rejected_statements}_total counters. + bool handler_poisoned_simple_query(PtrSize_t* pkt); void handler_minus1_LogErrorDuringQuery(PgSQL_Connection* myconn); bool handler_minus1_HandleErrorCodes(PgSQL_Data_Stream* myds, int& handler_ret); void handler_minus1_GenerateErrorMessage(PgSQL_Data_Stream* myds, bool& wrong_pass); @@ -463,6 +475,20 @@ public: bool describe_mode{ false }; char describe_table_name[256]{ 0 }; + // When a backend connection breaks mid-transaction AND the admin var + // pgsql-preserve_client_on_broken_backend_in_tx is on, we synthesize an + // ERROR 25P02 (current transaction is aborted) + ReadyForQuery('E') to + // the client, destroy the backend pool connection, and set this flag + // true instead of tearing down the client session. While this is true, + // the query intake path short-circuits before query rules: + // ROLLBACK / ROLLBACK TO SAVEPOINT / ABORT -> synthesize + // CommandComplete('ROLLBACK') + ReadyForQuery('I'), clear flag. + // COMMIT -> same ROLLBACK response + NoticeResponse carrying the + // "there is no transaction in progress" warning, clear flag. + // anything else (including RELEASE SAVEPOINT) -> reply ERROR 25P02 + // + ReadyForQuery('E'), stay poisoned. + bool tx_poisoned{ false }; + #ifdef DEBUG PgSQL_Connection* dbg_extended_query_backend_conn = nullptr; #endif diff --git a/include/PgSQL_Thread.h b/include/PgSQL_Thread.h index 3f89222ef..b389afda0 100644 --- a/include/PgSQL_Thread.h +++ b/include/PgSQL_Thread.h @@ -243,6 +243,12 @@ public: struct { unsigned long long stvar[PG_st_var_END]; unsigned int active_transactions; + // tx-poisoned feature counters. Each PgSQL thread maintains its own + // (lock-free) and PgSQL_Threads_Handler aggregates across threads for + // stats_pgsql_global exposure. See preserve_client_on_broken_backend_in_tx. + unsigned long long tx_poisoned_total; + unsigned long long tx_poisoned_recovered_total; + unsigned long long tx_poisoned_rejected_statements_total; } status_variables; struct { @@ -976,6 +982,7 @@ public: bool have_ssl; bool multiplexing; // bool stmt_multiplexing; + bool preserve_client_on_broken_backend_in_tx; bool log_unhealthy_connections; bool enforce_autocommit_on_reads; bool autocommit_false_not_reusable; @@ -1627,6 +1634,13 @@ public: */ unsigned int get_active_transations(); + // Aggregated tx-poisoned counters across all PgSQL threads. These back the + // pgsql_tx_poisoned_total / pgsql_tx_poisoned_recovered_total / + // pgsql_tx_poisoned_rejected_statements_total rows in stats_pgsql_global. + unsigned long long get_tx_poisoned_total(); + unsigned long long get_tx_poisoned_recovered_total(); + unsigned long long get_tx_poisoned_rejected_statements_total(); + #ifdef IDLE_THREADS /** * @brief Retrieves the number of non-idle client connections across all threads. diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 5c6765032..63df47892 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -1107,6 +1107,7 @@ __thread int pgsql_thread___client_host_error_counts; __thread int pgsql_thread___connect_retries_on_failure; __thread int pgsql_thread___connect_retries_delay; __thread bool pgsql_thread___multiplexing; +__thread bool pgsql_thread___preserve_client_on_broken_backend_in_tx; __thread int pgsql_thread___connection_delay_multiplex_ms; __thread int pgsql_thread___connection_max_age_ms; __thread int pgsql_thread___connect_timeout_client; @@ -1443,6 +1444,7 @@ extern __thread int pgsql_thread___client_host_error_counts; extern __thread int pgsql_thread___connect_retries_on_failure; extern __thread int pgsql_thread___connect_retries_delay; extern __thread bool pgsql_thread___multiplexing; +extern __thread bool pgsql_thread___preserve_client_on_broken_backend_in_tx; extern __thread int pgsql_thread___connection_delay_multiplex_ms; extern __thread int pgsql_thread___connection_max_age_ms; extern __thread int pgsql_thread___connect_timeout_client; diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index d3de11d8b..c101e5fcb 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -318,6 +318,9 @@ void PgSQL_Session::reset() { transaction_state_manager->reset_state(); } extended_query_phase = EXTQ_PHASE_IDLE; + // Clear any poisoned-transaction state — if the session is being reset we're + // past the scope of the poison. + tx_poisoned = false; #ifdef PROXYSQLFFTO ffto_bypassed = false; if (m_ffto) { @@ -384,6 +387,113 @@ PgSQL_Session::~PgSQL_Session() { delete transaction_state_manager; } +// Called from handler_special_queries when tx_poisoned == true. Classifies the +// incoming simple-query packet and either recovers the session (ROLLBACK / +// COMMIT / ABORT family) or rejects with ERROR 25P02 (anything else, including +// RELEASE SAVEPOINT per Postgres native behavior). Returns true in all cases +// — the caller drops the packet and loops back to wait for the next client +// message. +// +// Response shape: +// * Recovery (ROLLBACK / ROLLBACK TO SAVEPOINT / ABORT / COMMIT): +// CommandComplete('ROLLBACK') + ReadyForQuery('I'). For COMMIT also a +// preceding NoticeResponse with "there is no transaction in progress" +// — matches Postgres native behavior for COMMIT inside an aborted tx. +// * Rejection (anything else, incl. RELEASE SAVEPOINT): +// ErrorResponse(25P02) + ReadyForQuery('E'), tx_poisoned stays true. +bool PgSQL_Session::handler_poisoned_simple_query(PtrSize_t* pkt) { + if (pkt->size <= 5) { + // malformed / empty — behave like rejection, keep poisoned. + thread->status_variables.tx_poisoned_rejected_statements_total++; + PG_pkt pgpkt{}; + pgpkt.set_multi_pkt_mode(true); + pgpkt.write_generic('E', "cscscscsc", + 'S', "ERROR", 'V', "ERROR", + 'C', PgSQL_Error_Helper::get_error_code(PGSQL_ERROR_CODES::ERRCODE_IN_FAILED_SQL_TRANSACTION), + 'M', "current transaction is aborted, commands ignored until end of transaction block", + 0); + pgpkt.write_ReadyForQuery('E'); + pgpkt.set_multi_pkt_mode(false); + auto buff = pgpkt.detach(); + client_myds->PSarrayOUT->add((void*)buff.first, buff.second); + client_myds->DSS = STATE_SLEEP; + l_free(pkt->size, pkt->ptr); + if (mirror == false) RequestEnd(NULL, false); + return true; + } + + const char* sql = (char*)pkt->ptr + 5; + unsigned int sql_len = pkt->size - 5; + + // Skip leading whitespace. Postgres accepts leading whitespace in a SimpleQuery + // command buffer; so should our classifier. + while (sql_len > 0 && (*sql == ' ' || *sql == '\t' || *sql == '\n' || *sql == '\r')) { + sql++; + sql_len--; + } + + enum RecoveryKind { KIND_REJECT, KIND_ROLLBACK, KIND_COMMIT }; + RecoveryKind kind = KIND_REJECT; + + // Case-insensitive prefix match, followed by a word boundary so "ROLLBACKET" + // doesn't get classified as ROLLBACK. + auto matches_kw = [&](const char* kw, unsigned int klen) -> bool { + if (sql_len < klen) return false; + if (strncasecmp(sql, kw, klen) != 0) return false; + if (sql_len == klen) return true; + char ch = sql[klen]; + return !(ch == '_' || (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z') || (ch >= '0' && ch <= '9')); + }; + + // Order matters: ROLLBACK (covers ROLLBACK and ROLLBACK TO SAVEPOINT) before + // anything else. ABORT is a Postgres synonym for ROLLBACK. COMMIT and END are + // handled as COMMIT (both are Postgres COMMIT aliases; END rolls back inside + // an aborted tx same as COMMIT does). + if (matches_kw("ROLLBACK", 8) || matches_kw("ABORT", 5)) { + kind = KIND_ROLLBACK; + } else if (matches_kw("COMMIT", 6) || matches_kw("END", 3)) { + kind = KIND_COMMIT; + } + // RELEASE SAVEPOINT and anything else: KIND_REJECT. Matches Postgres: only + // ROLLBACK / COMMIT / ABORT / SAVEPOINT-to-rollback end an aborted tx. + + PG_pkt pgpkt{}; + pgpkt.set_multi_pkt_mode(true); + + if (kind == KIND_REJECT) { + thread->status_variables.tx_poisoned_rejected_statements_total++; + pgpkt.write_generic('E', "cscscscsc", + 'S', "ERROR", 'V', "ERROR", + 'C', PgSQL_Error_Helper::get_error_code(PGSQL_ERROR_CODES::ERRCODE_IN_FAILED_SQL_TRANSACTION), + 'M', "current transaction is aborted, commands ignored until end of transaction block", + 0); + pgpkt.write_ReadyForQuery('E'); + } else { + // Recovery. Clear the poison and issue a ROLLBACK-shaped response. + thread->status_variables.tx_poisoned_recovered_total++; + if (kind == KIND_COMMIT) { + // Match Postgres native: COMMIT inside an aborted tx emits a WARNING + // notice and rolls back. + pgpkt.write_generic('N', "cscscscsc", + 'S', "WARNING", 'V', "WARNING", + 'C', PgSQL_Error_Helper::get_error_code(PGSQL_ERROR_CODES::ERRCODE_SUCCESSFUL_COMPLETION), + 'M', "there is no transaction in progress", + 0); + } + pgpkt.write_CommandComplete("ROLLBACK"); + pgpkt.write_ReadyForQuery('I'); + tx_poisoned = false; + } + + pgpkt.set_multi_pkt_mode(false); + auto buff = pgpkt.detach(); + client_myds->PSarrayOUT->add((void*)buff.first, buff.second); + client_myds->DSS = STATE_SLEEP; + l_free(pkt->size, pkt->ptr); + if (mirror == false) RequestEnd(NULL, false); + return true; +} + bool PgSQL_Session::handler_CommitRollback(PtrSize_t* pkt) { if (pkt->size <= 5) { return false; } char c = ((char*)pkt->ptr)[5]; @@ -628,6 +738,13 @@ void PgSQL_Session::generate_proxysql_internal_session_json(json& j) { bool PgSQL_Session::handler_special_queries(PtrSize_t* pkt, bool* lock_hostgroup) { + // Earliest gate: if the session is poisoned (backend died mid-tx and we + // kept the client session alive), classify-and-respond here, BEFORE query + // rules, digests, routing, mirror, SQLi detection, etc. + if (tx_poisoned) { + return handler_poisoned_simple_query(pkt); + } + if ((pkt->size >= 7 + 5) && (strncasecmp("LISTEN ", (const char*)pkt->ptr + 5, 7) == 0)) { client_myds->DSS = STATE_QUERY_SENT_NET; proxy_warning("LISTEN command is not supported\n"); @@ -2059,6 +2176,39 @@ __implicit_sync: } } else { char command = c = *((unsigned char*)pkt.ptr); + // Poisoned-session gate for Extended Query messages. + // Simple Query ('Q') falls through to handler_special_queries, + // which dispatches to handler_poisoned_simple_query. Extended + // Query (P/B/D/C/E/S) is rejected here in V1: P/B/D/C/E are + // swallowed silently, and S emits ErrorResponse(25P02) + + // ReadyForQuery('E'). Client must issue a Simple-Query ROLLBACK + // to recover. QUIT ('X') is always honored. + if (tx_poisoned && command != 'Q' && command != 'X') { + if (command == 'P' || command == 'B' || command == 'D' || + command == 'C' || command == 'E') { + thread->status_variables.tx_poisoned_rejected_statements_total++; + l_free(pkt.size, pkt.ptr); + continue; + } + if (command == 'S') { + thread->status_variables.tx_poisoned_rejected_statements_total++; + PG_pkt pgpkt{}; + pgpkt.set_multi_pkt_mode(true); + pgpkt.write_generic('E', "cscscscsc", + 'S', "ERROR", 'V', "ERROR", + 'C', PgSQL_Error_Helper::get_error_code(PGSQL_ERROR_CODES::ERRCODE_IN_FAILED_SQL_TRANSACTION), + 'M', "current transaction is aborted, commands ignored until end of transaction block (extended-query path; issue ROLLBACK via simple query to recover)", + 0); + pgpkt.write_ReadyForQuery('E'); + pgpkt.set_multi_pkt_mode(false); + auto buff = pgpkt.detach(); + client_myds->PSarrayOUT->add((void*)buff.first, buff.second); + client_myds->DSS = STATE_SLEEP; + l_free(pkt.size, pkt.ptr); + if (mirror == false) RequestEnd(NULL, false); + continue; + } + } switch (command) { case 'Q': { @@ -2442,10 +2592,77 @@ void PgSQL_Session::SetQueryTimeout() { } } +// Synthesize an aborted-transaction ErrorResponse + NoticeResponse + +// ReadyForQuery('E') to the client so the application can react with ROLLBACK +// without having to reconnect. Called only from handler_minus1_ClientLibraryError +// when we are certain: (a) admin variable is on, (b) the session was in an +// explicit transaction, (c) the result-set transfer has not already started to +// the client (synthesizing 25P02 mid-stream would corrupt the protocol), and +// (d) the client data stream is valid. +// +// On success: the three messages are queued on client_myds->PSarrayOUT, +// tx_poisoned is set to true, and the pgsql_tx_poisoned_total counter is +// incremented. Returns true. The backend connection is NOT destroyed here — +// that stays the caller's job. +// +// On failure (preflight not satisfied): returns false, no side effects. +bool PgSQL_Session::handler_minus1_PoisonTransaction(PgSQL_Data_Stream* myds) { + if (pgsql_thread___preserve_client_on_broken_backend_in_tx == false) return false; + PgSQL_Connection* myconn = myds ? myds->myconn : nullptr; + if (myconn == nullptr) return false; + if (myconn->IsActiveTransaction() == false) return false; + if (myconn->query_result && myconn->query_result->is_transfer_started()) { + // streaming-result fallback: too late to cleanly synthesize 25P02. + return false; + } + if (client_myds == nullptr || client_myds->PSarrayOUT == nullptr) return false; + + const char* backend_err_msg = myconn->get_error_message().c_str(); + const bool has_backend_err_msg = (backend_err_msg != nullptr && backend_err_msg[0] != '\0'); + + PG_pkt pgpkt{}; + pgpkt.set_multi_pkt_mode(true); + // ErrorResponse at severity=ERROR, SQLSTATE=25P02 (in_failed_sql_transaction). + // Not FATAL — libpq treats FATAL as connection-loss and drops the socket. + pgpkt.write_generic('E', "cscscscsc", + 'S', "ERROR", + 'V', "ERROR", + 'C', PgSQL_Error_Helper::get_error_code(PGSQL_ERROR_CODES::ERRCODE_IN_FAILED_SQL_TRANSACTION), + 'M', "current transaction is aborted, commands ignored until end of transaction block", + 0); + // NoticeResponse carrying the backend's original message text as context, but + // NOT its SQLSTATE (e.g. 57P01) — per design, the only SQLSTATE surfaced to + // the client is the synthesized 25P02. + if (has_backend_err_msg) { + pgpkt.write_generic('N', "cscscscsc", + 'S', "WARNING", + 'V', "WARNING", + 'C', PgSQL_Error_Helper::get_error_code(PGSQL_ERROR_CODES::ERRCODE_SUCCESSFUL_COMPLETION), + 'M', backend_err_msg, + 0); + } + // ReadyForQuery with txn_state='E' signals the client it is in an aborted + // transaction — libpq will report PQTRANS_INERROR and accept only + // ROLLBACK/COMMIT/ABORT. + pgpkt.write_ReadyForQuery('E'); + pgpkt.set_multi_pkt_mode(false); + auto buff = pgpkt.detach(); + client_myds->PSarrayOUT->add((void*)buff.first, buff.second); + client_myds->DSS = STATE_SLEEP; + + tx_poisoned = true; + thread->status_variables.tx_poisoned_total++; + proxy_warning("Backend connection broken mid-transaction; poisoning client session. " + "Client must issue ROLLBACK (or COMMIT, which will roll back) to recover.\n"); + return true; +} + // this function used to be inline. // now it returns: // true: NEXT_IMMEDIATE(CONNECTING_SERVER) needs to be called -// false: continue +// false: continue (caller must additionally check tx_poisoned — if set, the +// session is to stay open for the client to issue ROLLBACK; otherwise +// the caller should terminate the session with handler_ret=-1) bool PgSQL_Session::handler_minus1_ClientLibraryError(PgSQL_Data_Stream* myds) { PgSQL_Connection* myconn = myds->myconn; bool retry_conn = false; @@ -2469,6 +2686,14 @@ bool PgSQL_Session::handler_minus1_ClientLibraryError(PgSQL_Data_Stream* myds) { } } } + // If we're in an explicit transaction and retry was refused (per the + // unknown_transaction_status guard), try to poison the client session + // instead of terminating it. On success, tx_poisoned is set and the + // caller will see return=false + tx_poisoned=true and keep the session + // open for the client to issue ROLLBACK. + if (retry_conn == false) { + (void)handler_minus1_PoisonTransaction(myds); + } if (transaction_state_manager) { transaction_state_manager->reset_state(); } @@ -3212,6 +3437,13 @@ handler_again: if (myconn->is_connection_in_reusable_state() == false) { if (handler_minus1_ClientLibraryError(myds)) { NEXT_IMMEDIATE(CONNECTING_SERVER); + } else if (tx_poisoned) { + // Backend died mid-transaction and preserve_client_on_broken_backend_in_tx + // is on: the client already received the synthesized ERROR 25P02 + + // ReadyForQuery('E'). Keep the session open so the client can issue + // ROLLBACK; wrap up this query and fall through to the end of the + // processing loop. + RequestEnd(myds, true); } else { handler_ret = -1; return handler_ret; diff --git a/lib/PgSQL_Thread.cpp b/lib/PgSQL_Thread.cpp index 91f6c98d4..9c8c27903 100644 --- a/lib/PgSQL_Thread.cpp +++ b/lib/PgSQL_Thread.cpp @@ -376,6 +376,7 @@ static char* pgsql_thread_variables_names[] = { (char*)"max_transaction_idle_time", (char*)"max_transaction_time", (char*)"multiplexing", + (char*)"preserve_client_on_broken_backend_in_tx", (char*)"log_unhealthy_connections", (char*)"enforce_autocommit_on_reads", (char*)"autocommit_false_not_reusable", @@ -1173,6 +1174,7 @@ PgSQL_Threads_Handler::PgSQL_Threads_Handler() { variables.have_ssl = true; // changed in 2.6.0 , was false by default for performance reason variables.commands_stats = true; variables.multiplexing = true; + variables.preserve_client_on_broken_backend_in_tx = true; variables.log_unhealthy_connections = true; variables.enforce_autocommit_on_reads = false; variables.autocommit_false_not_reusable = false; @@ -2198,6 +2200,7 @@ char** PgSQL_Threads_Handler::get_variables_list() { VariablesPointers_bool["monitor_wait_timeout"] = make_tuple(&variables.monitor_wait_timeout, false); VariablesPointers_bool["monitor_writer_is_also_reader"] = make_tuple(&variables.monitor_writer_is_also_reader, false); VariablesPointers_bool["multiplexing"] = make_tuple(&variables.multiplexing, false); + VariablesPointers_bool["preserve_client_on_broken_backend_in_tx"] = make_tuple(&variables.preserve_client_on_broken_backend_in_tx, false); VariablesPointers_bool["query_cache_stores_empty_result"] = make_tuple(&variables.query_cache_stores_empty_result, false); VariablesPointers_bool["query_digests"] = make_tuple(&variables.query_digests, false); VariablesPointers_bool["query_digests_lowercase"] = make_tuple(&variables.query_digests_lowercase, false); @@ -3965,6 +3968,7 @@ void PgSQL_Thread::refresh_variables() { pgsql_thread___connect_retries_on_failure = GloPTH->get_variable_int((char*)"connect_retries_on_failure"); pgsql_thread___connect_retries_delay = GloPTH->get_variable_int((char*)"connect_retries_delay"); pgsql_thread___multiplexing = (bool)GloPTH->get_variable_int((char*)"multiplexing"); + pgsql_thread___preserve_client_on_broken_backend_in_tx = (bool)GloPTH->get_variable_int((char*)"preserve_client_on_broken_backend_in_tx"); pgsql_thread___connection_delay_multiplex_ms = GloPTH->get_variable_int((char*)"connection_delay_multiplex_ms"); pgsql_thread___connection_max_age_ms = GloPTH->get_variable_int((char*)"connection_max_age_ms"); pgsql_thread___connect_timeout_client = GloPTH->get_variable_int((char*)"connect_timeout_client"); @@ -4241,6 +4245,9 @@ PgSQL_Thread::PgSQL_Thread() { servers_table_version_current = 0; status_variables.active_transactions = 0; + status_variables.tx_poisoned_total = 0; + status_variables.tx_poisoned_recovered_total = 0; + status_variables.tx_poisoned_rejected_statements_total = 0; for (unsigned int i = 0; i < PG_st_var_END; i++) { status_variables.stvar[i] = 0; @@ -4417,6 +4424,27 @@ SQLite3_result* PgSQL_Threads_Handler::SQL3_GlobalStatus(bool _memory) { pta[1] = buf; result->add_row(pta); } + { // Transactions poisoned by a mid-tx backend death. See + // pgsql-preserve_client_on_broken_backend_in_tx. + pta[0] = (char*)"pgsql_tx_poisoned_total"; + sprintf(buf, "%llu", get_tx_poisoned_total()); + pta[1] = buf; + result->add_row(pta); + } + { // Poisoned sessions recovered via client ROLLBACK / COMMIT / ABORT. + pta[0] = (char*)"pgsql_tx_poisoned_recovered_total"; + sprintf(buf, "%llu", get_tx_poisoned_recovered_total()); + pta[1] = buf; + result->add_row(pta); + } + { // Client statements rejected with ERROR 25P02 while the session was + // in the poisoned state (includes extended-query P/B/D/C/E/S while + // poisoned, and any non-recovery simple-query statement). + pta[0] = (char*)"pgsql_tx_poisoned_rejected_statements_total"; + sprintf(buf, "%llu", get_tx_poisoned_rejected_statements_total()); + pta[1] = buf; + result->add_row(pta); + } { // Connections created pta[0] = (char*)"Client_Connections_aborted"; sprintf(buf, "%lu", PgHGM->status.client_connections_aborted); @@ -5550,6 +5578,28 @@ unsigned int PgSQL_Threads_Handler::get_active_transations() { return q; } +// Tx-poisoned counter aggregators. These mirror get_active_transations() but +// sum unsigned long long counters. Used when rendering stats_pgsql_global. +#define DEFINE_PG_TXPOISON_GETTER(_name) \ + unsigned long long PgSQL_Threads_Handler::get_##_name() { \ + if ((__sync_fetch_and_add(&status_variables.threads_initialized, 0) == 0) \ + || this->shutdown_) return 0; \ + unsigned long long total = 0; \ + for (unsigned int i = 0; i < num_threads; i++) { \ + if (!pgsql_threads) break; \ + PgSQL_Thread* thr = (PgSQL_Thread*)pgsql_threads[i].worker; \ + if (thr) \ + total += __sync_fetch_and_add(&thr->status_variables._name, 0); \ + } \ + return total; \ + } + +DEFINE_PG_TXPOISON_GETTER(tx_poisoned_total) +DEFINE_PG_TXPOISON_GETTER(tx_poisoned_recovered_total) +DEFINE_PG_TXPOISON_GETTER(tx_poisoned_rejected_statements_total) + +#undef DEFINE_PG_TXPOISON_GETTER + #ifdef IDLE_THREADS unsigned int PgSQL_Threads_Handler::get_non_idle_client_connections() { if ((__sync_fetch_and_add(&status_variables.threads_initialized, 0) == 0) || this->shutdown_) return 0;