From 979b3a81f4dc639016ddb00a67e2869f8421896a Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Wed, 30 Apr 2025 14:58:51 +0500 Subject: [PATCH] Added PgSQL_ExplicitTxnStateMgr in session Dump transaction state in proxysql internal session output --- include/PgSQL_Session.h | 2 ++ lib/PgSQL_Session.cpp | 38 ++++++++++++++++++++++---------------- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/include/PgSQL_Session.h b/include/PgSQL_Session.h index 96975f608..4b2ada944 100644 --- a/include/PgSQL_Session.h +++ b/include/PgSQL_Session.h @@ -15,6 +15,7 @@ class PgSQL_Query_Result; +class PgSQL_ExplicitTxnStateMgr; //#include "../deps/json/json.hpp" //using json = nlohmann::json; @@ -389,6 +390,7 @@ public: PgSQL_Data_Stream* client_myds; #endif // 0 PgSQL_Data_Stream* server_myds; + PgSQL_ExplicitTxnStateMgr* transaction_state_manager; #if 0 /* * @brief Store the hostgroups that hold connections that have been flagged as 'expired' by the diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index 5426a49ce..21767a99a 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -25,7 +25,7 @@ using json = nlohmann::json; #include "ProxySQL_Cluster.hpp" #include "PgSQL_Query_Cache.h" #include "PgSQL_Variables_Validator.h" - +#include "PgSQL_ExplicitTxnStateMgr.h" #include "libinjection.h" #include "libinjection_sqli.h" @@ -579,6 +579,7 @@ PgSQL_Session::PgSQL_Session() { last_HG_affected_rows = -1; // #1421 : advanced support for LAST_INSERT_ID() proxysql_node_address = NULL; use_ldap_auth = false; + transaction_state_manager = new PgSQL_ExplicitTxnStateMgr(this); } void PgSQL_Session::reset() { @@ -678,6 +679,8 @@ PgSQL_Session::~PgSQL_Session() { for (int i = 0; i < PGSQL_NAME_LAST_HIGH_WM; i++) { reset_default_session_variable((enum pgsql_variable_name)i); } + if (transaction_state_manager) + delete transaction_state_manager; } bool PgSQL_Session::handler_CommitRollback(PtrSize_t* pkt) { @@ -777,6 +780,9 @@ void PgSQL_Session::generate_proxysql_internal_session_json(json& j) { j["qpo"]["max_lag_ms"] = qpo->max_lag_ms; j["user_attributes"] = (user_attributes ? user_attributes : ""); j["transaction_persistent"] = transaction_persistent; + + transaction_state_manager->fill_internal_session(j["transaction_state"]); + if (client_myds != NULL) { // only if client_myds is defined j["client"]["stream"]["pkts_recv"] = client_myds->pkts_recv; j["client"]["stream"]["pkts_sent"] = client_myds->pkts_sent; @@ -1820,7 +1826,7 @@ bool PgSQL_Session::handler_again___status_CONNECTING_SERVER(int* _rc) { NEXT_IMMEDIATE_NEW(CONNECTING_SERVER); } else { - __exit_handler_again___status_CONNECTING_SERVER_with_err: +__exit_handler_again___status_CONNECTING_SERVER_with_err: bool is_error_present = myconn->is_error_present(); if (is_error_present) { client_myds->myprot.generate_error_packet(true, true, myconn->error_info.message.c_str(), @@ -5463,26 +5469,26 @@ void PgSQL_Session::RequestEnd(PgSQL_Data_Stream* myds, const unsigned int myerr if (status != PROCESSING_STMT_EXECUTE) { qdt = CurrentQuery.get_digest_text(); - } - else { + } else { qdt = CurrentQuery.stmt_info->digest_text; } + // is savepoint currently present in transaction. + int savepoint_count = -1; // haven't checked yet + + // we do not maintain the transaction variable state if the session is locked on a hostgroup + // or is a Fast Forward session. + if (locked_on_hostgroup == -1 && session_fast_forward == SESSION_FORWARD_TYPE_NONE) { + transaction_state_manager->handle_transaction(qdt); + savepoint_count = transaction_state_manager->get_savepoint_count(); + } + if (qdt && myds && myds->myconn) { - myds->myconn->ProcessQueryAndSetStatusFlags(qdt); + myds->myconn->ProcessQueryAndSetStatusFlags(qdt, savepoint_count); } - switch (status) { - /*case PROCESSING_STMT_EXECUTE: - case PROCESSING_STMT_PREPARE: - // if a prepared statement is executed, LogQuery was already called - break; - */ - default: - if (session_fast_forward == SESSION_FORWARD_TYPE_NONE) { - LogQuery(myds); - } - break; + if (session_fast_forward == SESSION_FORWARD_TYPE_NONE) { + LogQuery(myds); } GloPgQPro->delete_QP_out(qpo);