Preserve implicit transactions with pipeline + FLUSH #5118

Previously, each extended-query block was terminated with a SYNC,
which caused implicit transactions to commit prematurely. As a result,
earlier write operations (INSERT/UPDATE/DELETE) could not be rolled
back if a later statement in the same sequence failed.

This change switches to libpq pipeline mode and replaces intermediate
SYNC messages with FLUSH, ensuring that all client query frames execute
as part of the same implicit transaction. A final SYNC is still issued
to resynchronize the connection and make it safe for reuse in the pool.
pull/5119/head
Rahim Kanji 7 months ago
parent ff50bb4ec1
commit 23a764eaac

@ -317,8 +317,40 @@ public:
*/
void stmt_execute_cont(short event);
void reset_session_start();
void reset_session_cont(short event);
/**
* @brief Initiates the asynchronous reset of the PostgreSQL connection.
*
* Starts the internal state machine that resets connection to a clean,
* reusable state so it can safely re-enter the multiplexing pool.
*
*/
void reset_session_start();
/**
* @brief Continues the asynchronous reset of the PostgreSQL session.
*
* This method advances the state machine initiated by reset_session_start()
* to asynchronously reset the backend connection to a clean, reusable state.
*
* @param event The event flag indicating the current I/O event.
*/
void reset_session_cont(short event);
/**
* @brief Start a resynchronization attempt for the current backend connection.
*
* Send protocol-level Sync (or otherwise trigger the backend to reach
* ReadyForQuery) and transition the connection into the resynchronizing state.
*
*/
void resync_start();
/**
* @brief Continue a previously started resynchronization in response to an event.
*
* @param event The event flag indicating the current I/O event.
*/
void resync_cont(short event);
int async_connect(short event);
int async_query(short event, const char* stmt, unsigned long length, const char* backend_stmt_name = nullptr,
@ -326,6 +358,7 @@ public:
int async_ping(short event);
int async_reset_session(short event);
int async_send_simple_command(short event, char* stmt, unsigned long length); // no result set expected
int async_perform_resync(short event);
void next_event(PG_ASYNC_ST new_st);
bool is_connected() const;
@ -436,6 +469,7 @@ public:
void reset_error() { reset_error_info(error_info, false); }
bool reset_session_in_txn = false;
bool reset_session_in_pipeline = false;
PGresult* get_result();
void next_multi_statement_result(PGresult* result);
@ -473,9 +507,8 @@ public:
const char* get_pg_transaction_status_str();
unsigned int get_memory_usage() const;
char get_transaction_status_char();
inline
int get_backend_pid() { return (pgsql_conn) ? get_pg_backend_pid() : -1; }
inline int get_backend_pid() { return (pgsql_conn) ? get_pg_backend_pid() : -1; }
bool is_pipeline_active() { return (PQpipelineStatus(pgsql_conn) != PQ_PIPELINE_OFF); }
static int char_to_encoding(const char* name) {
return pg_char_to_encoding(name);
@ -602,6 +635,8 @@ public:
bool processing_multi_statement;
bool multiplex_delayed;
bool is_client_connection; // true if this is a client connection, false if it is a server connection
bool exit_pipeline_mode; // true if it is safe to exit pipeline mode
bool resync_failed; // true if the last resync attempt failed
PgSQL_STMTs_local_v14* local_stmts;
PgSQL_SrvC *parent;
@ -623,7 +658,8 @@ private:
ASYNC_ST fetch_result_end_st = ASYNC_QUERY_END;
inline void set_fetch_result_end_state(ASYNC_ST st) {
assert(st == ASYNC_QUERY_END || st == ASYNC_STMT_EXECUTE_END ||
st == ASYNC_STMT_DESCRIBE_END || st == ASYNC_STMT_PREPARE_END);
st == ASYNC_STMT_DESCRIBE_END || st == ASYNC_STMT_PREPARE_END ||
st == ASYNC_RESYNC_END);
fetch_result_end_st = st;
}
// Handles the COPY OUT response from the server.

@ -139,7 +139,9 @@ using Parse_Param_Types = std::vector<uint32_t>; // Vector of parameter types fo
enum PgSQL_Extended_Query_Flags : uint8_t {
PGSQL_EXTENDED_QUERY_FLAG_NONE = 0x00,
PGSQL_EXTENDED_QUERY_FLAG_DESCRIBE_PORTAL = 0x01
PGSQL_EXTENDED_QUERY_FLAG_DESCRIBE_PORTAL = 0x01,
PGSQL_EXTENDED_QUERY_FLAG_SYNC = 0x02,
PGSQL_EXTENDED_QUERY_FLAG_IMPLICIT_PREPARE = 0x04,
};
struct PgSQL_Extended_Query_Info {
@ -318,7 +320,7 @@ private:
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___create_mirror_session();
int handler_again___status_PINGING_SERVER();
int handler_again___status_RESETTING_CONNECTION();
int handler_again___status_RESYNCHRONIZING_CONNECTION();
/**
* @brief Initiates a new thread to kill current running query.

@ -116,9 +116,6 @@ enum ASYNC_ST { // MariaDB Async State Machine
ASYNC_STMT_EXECUTE_STORE_RESULT_START,
ASYNC_STMT_EXECUTE_STORE_RESULT_CONT,
ASYNC_STMT_EXECUTE_END,
ASYNC_CLOSE_START,
ASYNC_CLOSE_CONT,
ASYNC_CLOSE_END,
ASYNC_RESET_SESSION_START,
ASYNC_RESET_SESSION_CONT,
ASYNC_RESET_SESSION_END,
@ -128,6 +125,9 @@ enum ASYNC_ST { // MariaDB Async State Machine
ASYNC_STMT_DESCRIBE_START,
ASYNC_STMT_DESCRIBE_CONT,
ASYNC_STMT_DESCRIBE_END,
ASYNC_RESYNC_START,
ASYNC_RESYNC_CONT,
ASYNC_RESYNC_END,
ASYNC_IDLE
};
@ -309,6 +309,7 @@ enum session_status {
SETTING_NEXT_ISOLATION_LEVEL,
SETTING_NEXT_TRANSACTION_READ,
PROCESSING_EXTENDED_QUERY_SYNC,
RESYNCHRONIZING_CONNECTION,
session_status___NONE // special marker
};

@ -513,7 +513,11 @@ void Base_Session<S,DS,B,T>::housekeeping_before_pkts() {
myds->return_MySQL_Connection_To_Pool();
}
} else if constexpr (std::is_same_v<S, PgSQL_Session>) {
myds->return_MySQL_Connection_To_Pool();
if (myds->myconn->is_pipeline_active() == true) {
create_new_session_and_reset_connection(myds);
} else {
myds->return_MySQL_Connection_To_Pool();
}
} else {
assert(0);
}

@ -190,6 +190,8 @@ PgSQL_Connection::PgSQL_Connection(bool is_client_conn) {
new_result = true;
is_copy_out = false;
exit_pipeline_mode = false;
resync_failed = false;
reset_error();
memset(&connected_host_details, 0, sizeof(connected_host_details));
}
@ -522,6 +524,18 @@ handler_again:
set_error(PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION, "Unable to process 'COPY' command", true);
NEXT_IMMEDIATE(fetch_result_end_st);
break;
case PGRES_PIPELINE_SYNC:
// backend connection is in Ready for Query state, we can now safely exit pipeline mode
exit_pipeline_mode = true;
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT);
break;
case PGRES_PIPELINE_ABORTED:
// received an extended query immediately after an error was triggered by a previous query (before sync).
// In ProxySQL this should never happen, since the extended query frame is reset after an error.
// However, it may rarely occur if an error is raised during the "describe portal" phase (while executing).
// In that case, we continue until PGRES_PIPELINE_SYNC (Ready for Query state) is received, then safely exit pipeline mode. NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT);
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT);
break;
case PGRES_BAD_RESPONSE:
case PGRES_NONFATAL_ERROR:
case PGRES_FATAL_ERROR:
@ -555,7 +569,6 @@ handler_again:
error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_DATA_ERROR) {
proxy_error("Error: %s, Multi-Statement: %d\n", get_error_code_with_message().c_str(), processing_multi_statement);
}
//}
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT);
}
@ -635,7 +648,11 @@ handler_again:
assert(0);
}
if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR)) == 0) {
// if we arrive here via async_perform_resync, the connection is in "Ready for Query" state,
// but query_result will be empty. In this case, we check exit_pipeline_mode; if it is true,
// it indicates a non-error scenario and we skip this check.
if (exit_pipeline_mode == false &&
(query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR)) == 0) {
// if we reach here we assume that error_info is already set in previous call
if (!is_error_present())
assert(0); // we might have missed setting error_info in previous call
@ -643,22 +660,32 @@ handler_again:
query_result->add_error(NULL);
}
// Normally, ReadyForQuery is not sent immediately if we are in extended query mode
// and there are pending messages in the queue, as it will be sent once the entire
// extended query frame has been processed.
//
// Edge case: if a message fails with an error while the queue still contains pending
// messages, the queue will be cleared later in the session. In this situation,
// ReadyForQuery would never be sent because the pending messages are discarded.
//
// Fix: if the result indicates an error, explicitly send ReadyForQuery immediately.
// The extended query frame will still be reset later in the session.
if (fetch_result_end_st != ASYNC_QUERY_END &&
!myds->sess->is_extended_query_ready_for_query() &&
((query_result->get_result_packet_type() & PGSQL_QUERY_RESULT_ERROR) == 0)) {
// Skip sending ReadyForQuery if there are still extended query messages pending in the queue
NEXT_IMMEDIATE(fetch_result_end_st);
if (fetch_result_end_st != ASYNC_QUERY_END) {
bool has_error = (query_result->get_result_packet_type() & PGSQL_QUERY_RESULT_ERROR) != 0;
// Normally, ReadyForQuery is not sent immediately if we are in extended query mode
// and there are pending messages in the queue, as it will be sent once the entire
// extended query frame has been processed.
//
// Edge case: if a message fails with an error while the queue still contains pending
// messages, the queue will be cleared later in the session. In this situation,
// ReadyForQuery would never be sent because the pending messages are discarded.
//
// Fix: if the result indicates an error, explicitly send ReadyForQuery immediately.
// The extended query frame will still be reset later in the session.
if (!myds->sess->is_extended_query_ready_for_query() && !has_error) {
// Skip sending ReadyForQuery if there are still extended query messages pending in the queue
NEXT_IMMEDIATE(fetch_result_end_st);
}
// An error has occurred while executing extended query sequence,
// and connection is not in 'Ready for Query' state, i.e., unsynchronized.
// To recover, we must resync by sending a SYNC to the backend connection.
if (!exit_pipeline_mode && has_error) {
NEXT_IMMEDIATE(ASYNC_RESYNC_START);
}
}
// finally add ready for query packet
query_result->add_ready_status(PQtransactionStatus(pgsql_conn));
update_bytes_recv(6);
@ -666,72 +693,7 @@ handler_again:
NEXT_IMMEDIATE(fetch_result_end_st);
}
break;
case ASYNC_QUERY_END:
PROXY_TRACE2();
if (is_error_present()) {
compute_unknown_transaction_status();
} else {
unknown_transaction_status = false;
}
PQsetNoticeReceiver(pgsql_conn, &PgSQL_Connection::unhandled_notice_cb, this);
// should be NULL
assert(!pgsql_result);
assert(!is_copy_out);
break;
case ASYNC_RESET_SESSION_START:
reset_session_start();
update_bytes_sent((reset_session_in_txn == false ? (sizeof("DISCARD ALL") + 5) : (sizeof("ROLLBACK") + 5)));
if (async_exit_status) {
next_event(ASYNC_RESET_SESSION_CONT);
} else {
if (is_error_present()) {
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_END);
}
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_CONT);
}
break;
case ASYNC_RESET_SESSION_CONT:
{
if (event) {
reset_session_cont(event);
}
if (async_exit_status) {
if (myds->wait_until != 0 && myds->sess->thread->curtime >= myds->wait_until) {
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_TIMEOUT);
}
next_event(ASYNC_RESET_SESSION_CONT);
break;
}
if (is_error_present()) {
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_END);
}
PGresult* result = get_result();
if (result) {
if (PQresultStatus(result) != PGRES_COMMAND_OK) {
set_error_from_result(result, PGSQL_ERROR_FIELD_ALL);
assert(is_error_present());
}
PQclear(result);
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_CONT);
}
if (reset_session_in_txn) {
//assert(IsKnownActiveTransaction() == false);
reset_session_in_txn = false;
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_START);
}
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_END);
}
break;
case ASYNC_RESET_SESSION_END:
if (is_error_present()) {
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_FAILED);
}
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_SUCCESSFUL);
break;
case ASYNC_RESET_SESSION_FAILED:
case ASYNC_RESET_SESSION_SUCCESSFUL:
case ASYNC_RESET_SESSION_TIMEOUT:
break;
case ASYNC_STMT_PREPARE_START:
stmt_prepare_start();
__sync_fetch_and_add(&parent->queries_sent, 1);
@ -757,20 +719,6 @@ handler_again:
NEXT_IMMEDIATE(ASYNC_USE_RESULT_START);
}
break;
case ASYNC_STMT_PREPARE_END:
PROXY_TRACE2();
if (is_error_present()) {
compute_unknown_transaction_status();
proxy_error("Failed to prepare statement: %s\n", get_error_code_with_message().c_str());
} else {
unknown_transaction_status = false;
}
PQsetNoticeReceiver(pgsql_conn, &PgSQL_Connection::unhandled_notice_cb, this);
assert(!pgsql_result);
assert(!is_copy_out);
break;
case ASYNC_STMT_DESCRIBE_START:
stmt_describe_start();
@ -794,20 +742,6 @@ handler_again:
NEXT_IMMEDIATE(ASYNC_USE_RESULT_START);
}
break;
case ASYNC_STMT_DESCRIBE_END:
PROXY_TRACE2();
if (is_error_present()) {
compute_unknown_transaction_status();
proxy_error("Failed to describe prepared statement: %s\n", get_error_code_with_message().c_str());
} else {
unknown_transaction_status = false;
}
PQsetNoticeReceiver(pgsql_conn, &PgSQL_Connection::unhandled_notice_cb, this);
assert(!pgsql_result);
assert(!is_copy_out);
break;
case ASYNC_STMT_EXECUTE_START:
stmt_execute_start();
@ -832,6 +766,17 @@ handler_again:
NEXT_IMMEDIATE(ASYNC_USE_RESULT_START);
}
break;
case ASYNC_RESYNC_END:
// if we reach here, it means that the connection is now synchronized
if (resync_failed) {
// if resync failed
set_error(PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION, "Failed to synchronize connection", false);
}
// fall through
case ASYNC_QUERY_END:
case ASYNC_STMT_PREPARE_END:
case ASYNC_STMT_DESCRIBE_END:
case ASYNC_STMT_EXECUTE_END:
PROXY_TRACE2();
if (is_error_present()) {
@ -839,11 +784,131 @@ handler_again:
} else {
unknown_transaction_status = false;
}
PQsetNoticeReceiver(pgsql_conn, &PgSQL_Connection::unhandled_notice_cb, this);
// we check exit_pipeline_mode to ensure it is safe to exit pipeline mode
if (exit_pipeline_mode &&
PQpipelineStatus(pgsql_conn) == PQ_PIPELINE_ON) {
if (PQexitPipelineMode(pgsql_conn) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to exit pipeline mode. %s\n", get_error_code_with_message().c_str());
}
exit_pipeline_mode = false;
}
// should be NULL
assert(!pgsql_result);
assert(!is_copy_out);
break;
case ASYNC_RESYNC_START:
if (PQpipelineStatus(pgsql_conn) == PQ_PIPELINE_OFF) {
proxy_warning("Resync not required — connection already synchronized.\n");
NEXT_IMMEDIATE(ASYNC_RESYNC_END);
}
resync_start();
if (async_exit_status) {
next_event(ASYNC_RESYNC_CONT);
} else {
NEXT_IMMEDIATE(ASYNC_RESYNC_END);
}
break;
case ASYNC_RESYNC_CONT:
if (event) {
resync_cont(event);
}
if (async_exit_status) {
if (myds->wait_until != 0 && myds->sess->thread->curtime >= myds->wait_until) {
proxy_error("Timeout waiting for pipeline sync to complete.\n");
resync_failed = true;
NEXT_IMMEDIATE(ASYNC_RESYNC_END);
}
next_event(ASYNC_RESYNC_CONT);
break;
} else {
if (resync_failed == true) {
NEXT_IMMEDIATE(ASYNC_RESYNC_END);
}
if (query_result && query_result->result_packet_type != PGSQL_QUERY_RESULT_NO_DATA) {
// we have already have some result set, so we just continue
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT);
} else {
set_fetch_result_end_state(ASYNC_RESYNC_END);
NEXT_IMMEDIATE(ASYNC_USE_RESULT_START);
}
}
break;
case ASYNC_RESET_SESSION_START:
reset_session_start();
if (reset_session_in_pipeline) {
update_bytes_sent(5);
}
else {
update_bytes_sent((reset_session_in_txn == false ? (sizeof("DISCARD ALL") + 5) : (sizeof("ROLLBACK") + 5)));
}
if (async_exit_status) {
next_event(ASYNC_RESET_SESSION_CONT);
}
else {
if (is_error_present()) {
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_END);
}
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_CONT);
}
break;
case ASYNC_RESET_SESSION_CONT:
{
if (event) {
reset_session_cont(event);
}
if (async_exit_status) {
if (myds->wait_until != 0 && myds->sess->thread->curtime >= myds->wait_until) {
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_TIMEOUT);
}
next_event(ASYNC_RESET_SESSION_CONT);
break;
}
if (is_error_present()) {
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_END);
}
PGresult* result = get_result();
if (result) {
if (PQresultStatus(result) != PGRES_COMMAND_OK &&
PQresultStatus(result) != PGRES_PIPELINE_SYNC) {
set_error_from_result(result, PGSQL_ERROR_FIELD_ALL);
assert(is_error_present());
}
PQclear(result);
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_CONT);
}
if (reset_session_in_pipeline) {
if (PQexitPipelineMode(pgsql_conn) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to exit pipeline mode. %s\n", get_error_code_with_message().c_str());
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_END);
}
reset_session_in_pipeline = false;
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_START);
}
if (reset_session_in_txn) {
reset_session_in_txn = false;
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_START);
}
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_END);
}
break;
case ASYNC_RESET_SESSION_END:
if (is_error_present()) {
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_FAILED);
}
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_SUCCESSFUL);
break;
case ASYNC_RESET_SESSION_FAILED:
case ASYNC_RESET_SESSION_SUCCESSFUL:
case ASYNC_RESET_SESSION_TIMEOUT:
break;
default:
// not implemented yet
assert(0);
@ -1107,6 +1172,12 @@ void PgSQL_Connection::fetch_result_cont(short event) {
if (PQisBusy(pgsql_conn) == 0) {
result_type = 1;
pgsql_result = PQgetResult(pgsql_conn);
if (!pgsql_result &&
query.extended_query_info &&
(query.extended_query_info->flags & PGSQL_EXTENDED_QUERY_FLAG_SYNC) != 0) {
pgsql_result = PQgetResult(pgsql_conn);
}
return;
}
break;
@ -1141,10 +1212,15 @@ void PgSQL_Connection::fetch_result_cont(short event) {
}
result_type = 1;
pgsql_result = PQgetResult(pgsql_conn);
if (!pgsql_result &&
query.extended_query_info &&
(query.extended_query_info->flags & PGSQL_EXTENDED_QUERY_FLAG_SYNC) != 0) {
pgsql_result = PQgetResult(pgsql_conn);
}
}
void PgSQL_Connection::flush() {
reset_error();
int res = PQflush(pgsql_conn);
if (res > 0) {
@ -1333,7 +1409,8 @@ int PgSQL_Connection::async_query(short event, const char* stmt, unsigned long l
if (async_state_machine == ASYNC_QUERY_END ||
async_state_machine == ASYNC_STMT_EXECUTE_END ||
async_state_machine == ASYNC_STMT_DESCRIBE_END ||
async_state_machine == ASYNC_STMT_PREPARE_END) {
async_state_machine == ASYNC_STMT_PREPARE_END ||
async_state_machine == ASYNC_RESYNC_END) {
PROXY_TRACE2();
compute_unknown_transaction_status();
if (is_error_present()) {
@ -1580,15 +1657,42 @@ void PgSQL_Connection::stmt_prepare_start() {
processing_multi_statement = false;
async_exit_status = PG_EVENT_NONE;
if (PQpipelineStatus(pgsql_conn) == PQ_PIPELINE_OFF) {
if (PQenterPipelineMode(pgsql_conn) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to enter pipeline mode. %s\n", get_error_code_with_message().c_str());
return;
}
}
PQsetNoticeReceiver(pgsql_conn, &PgSQL_Connection::notice_handler_cb, this);
const Parse_Param_Types& parse_param_types = query.extended_query_info->parse_param_types;
const PgSQL_Extended_Query_Info* extended_query_info = query.extended_query_info;
const Parse_Param_Types& parse_param_types = extended_query_info->parse_param_types;
if (PQsendPrepare(pgsql_conn, query.backend_stmt_name, query.ptr, parse_param_types.size(), parse_param_types.data()) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to send prepare. %s\n", get_error_code_with_message().c_str());
return;
}
// Send a Flush if this is not the last extended query message in the sequence/frame (or is an implicit prepared);
// otherwise, send a SYNC.
if ((extended_query_info->flags & PGSQL_EXTENDED_QUERY_FLAG_IMPLICIT_PREPARE) != 0 ||
(extended_query_info->flags & PGSQL_EXTENDED_QUERY_FLAG_SYNC) == 0) {
if (PQsendFlushRequest(pgsql_conn) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to send flush request. %s\n", get_error_code_with_message().c_str());
return;
}
} else {
// FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher
if (PQpipelineSync(pgsql_conn) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str());
return;
}
}
flush();
}
@ -1607,6 +1711,14 @@ void PgSQL_Connection::stmt_describe_start() {
processing_multi_statement = false;
async_exit_status = PG_EVENT_NONE;
if (PQpipelineStatus(pgsql_conn) == PQ_PIPELINE_OFF) {
if (PQenterPipelineMode(pgsql_conn) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to enter pipeline mode. %s\n", get_error_code_with_message().c_str());
return;
}
}
PQsetNoticeReceiver(pgsql_conn, &PgSQL_Connection::notice_handler_cb, this);
const PgSQL_Extended_Query_Info* extended_query_info = query.extended_query_info;
@ -1631,6 +1743,23 @@ void PgSQL_Connection::stmt_describe_start() {
proxy_error("Failed to send describe message. %s\n", get_error_code_with_message().c_str());
return;
}
// Send a Flush if this is not the last extended query message in the sequence/frame;
// otherwise, send a SYNC.
if ((extended_query_info->flags & PGSQL_EXTENDED_QUERY_FLAG_SYNC) == 0) {
if (PQsendFlushRequest(pgsql_conn) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to send flush request. %s\n", get_error_code_with_message().c_str());
return;
}
} else {
// FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher
if (PQpipelineSync(pgsql_conn) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str());
return;
}
}
flush();
}
@ -1643,14 +1772,58 @@ void PgSQL_Connection::stmt_describe_cont(short event) {
}
}
void PgSQL_Connection::resync_start() {
PROXY_TRACE();
async_exit_status = PG_EVENT_NONE;
PQsetNoticeReceiver(pgsql_conn, &PgSQL_Connection::notice_handler_cb, this);
// FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher
if (PQpipelineSync(pgsql_conn) == 0) {
proxy_error("Failed to send pipeline sync.\n");
resync_failed = true;
return;
}
async_exit_status = PG_EVENT_WRITE;
}
void PgSQL_Connection::resync_cont(short event) {
PROXY_TRACE();
proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6, "event=%d\n", event);
async_exit_status = PG_EVENT_NONE;
if (event & POLLOUT) {
int res = PQflush(pgsql_conn);
if (res > 0) {
async_exit_status = PG_EVENT_WRITE;
} else if (res == 0) {
async_exit_status = PG_EVENT_READ;
} else {
proxy_error("Failed to flush data to backend.\n");
async_exit_status = PG_EVENT_NONE;
resync_failed = true;
}
}
}
void PgSQL_Connection::stmt_execute_start() {
PROXY_TRACE();
reset_error();
processing_multi_statement = false;
async_exit_status = PG_EVENT_NONE;
if (PQpipelineStatus(pgsql_conn) == PQ_PIPELINE_OFF) {
if (PQenterPipelineMode(pgsql_conn) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to enter pipeline mode. %s\n", get_error_code_with_message().c_str());
return;
}
}
PQsetNoticeReceiver(pgsql_conn, &PgSQL_Connection::notice_handler_cb, this);
const PgSQL_Bind_Message* bind_msg = query.extended_query_info->bind_msg;
const PgSQL_Extended_Query_Info* extended_query_info = query.extended_query_info;
const PgSQL_Bind_Message* bind_msg = extended_query_info->bind_msg;
assert(bind_msg); // should never be null
const PgSQL_Bind_Data& bind_data = bind_msg->data(); // will always have valid data
@ -1719,6 +1892,23 @@ void PgSQL_Connection::stmt_execute_start() {
proxy_error("Failed to send execute prepared statement. %s\n", get_error_code_with_message().c_str());
return;
}
// Send a Flush if this is not the last extended query message in the sequence/frame;
// otherwise, send a SYNC.
if ((extended_query_info->flags & PGSQL_EXTENDED_QUERY_FLAG_SYNC) == 0) {
if (PQsendFlushRequest(pgsql_conn) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to send flush request. %s\n", get_error_code_with_message().c_str());
return;
}
} else {
// FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher
if (PQpipelineSync(pgsql_conn) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str());
return;
}
}
flush();
}
@ -1736,11 +1926,22 @@ void PgSQL_Connection::reset_session_start() {
assert(pgsql_conn);
reset_error();
async_exit_status = PG_EVENT_NONE;
reset_session_in_txn = IsKnownActiveTransaction();
if (PQsendQuery(pgsql_conn, (reset_session_in_txn == false ? "DISCARD ALL" : "ROLLBACK")) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to send query. %s\n", get_error_code_with_message().c_str());
return;
reset_session_in_pipeline = is_pipeline_active();
if (reset_session_in_pipeline) {
// FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher
if (PQpipelineSync(pgsql_conn) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str());
return;
}
} else {
reset_session_in_txn = IsKnownActiveTransaction();
if (PQsendQuery(pgsql_conn, (reset_session_in_txn == false ? "DISCARD ALL" : "ROLLBACK")) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to send query. %s\n", get_error_code_with_message().c_str());
return;
}
}
flush();
}
@ -2199,6 +2400,57 @@ int PgSQL_Connection::async_send_simple_command(short event, char* stmt, unsigne
return 1;
}
int PgSQL_Connection::async_perform_resync(short event) {
PROXY_TRACE();
PROXY_TRACE2();
assert(pgsql_conn);
server_status = parent->status; // we copy it here to avoid race condition. The caller will see this
if (IsServerOffline())
return -1;
switch (async_state_machine) {
case ASYNC_RESYNC_END:
processing_multi_statement = false;
break;
case ASYNC_IDLE:
if (myds && myds->sess) {
if (myds->sess->active_transactions == 0) {
myds->sess->active_transactions = 1;
myds->sess->transaction_started_at = myds->sess->thread->curtime;
}
}
async_state_machine = ASYNC_RESYNC_START;
default:
handler(event);
break;
}
if (async_state_machine == ASYNC_RESYNC_END) {
if (myds && myds->sess) {
if (myds->sess->active_transactions != 0) {
myds->sess->active_transactions = 0;
myds->sess->transaction_started_at = 0;
}
}
// We just needed to know if the query was successful, not.
// We discard the result.
if (query_result) {
assert(!query_result_reuse);
query_result->clear();
query_result_reuse = query_result;
query_result = NULL;
}
compute_unknown_transaction_status();
if (resync_failed) {
return -1;
} else {
async_state_machine = ASYNC_IDLE;
return 0;
}
}
return 1;
}
unsigned int PgSQL_Connection::reorder_dynamic_variables_idx() {
dynamic_variables_idx.clear();
// note that we are inserting the index already ordered
@ -2280,13 +2532,18 @@ void PgSQL_Connection::reset() {
options.init_connect_sent = false;
}
auto_increment_delay_token = 0;
exit_pipeline_mode = false;
resync_failed = false;
#ifdef DEBUG
if (pgsql_conn)
assert(PQpipelineStatus(pgsql_conn) == PQ_PIPELINE_OFF);
#endif
}
void PgSQL_Connection::set_status(bool set, uint32_t status_flag) {
if (set) {
this->status_flags |= status_flag;
}
else {
} else {
this->status_flags &= ~status_flag;
}
}

@ -1201,7 +1201,8 @@ bool PgSQL_Data_Stream::data_in_rbio() {
void PgSQL_Data_Stream::reset_connection() {
if (myconn) {
if (pgsql_thread___multiplexing && (DSS == STATE_MARIADB_GENERIC || DSS == STATE_READY) && myconn->reusable == true &&
myconn->IsActiveTransaction() == false && myconn->MultiplexDisabled() == false && myconn->async_state_machine == ASYNC_IDLE) {
myconn->IsActiveTransaction() == false && myconn->MultiplexDisabled() == false && myconn->async_state_machine == ASYNC_IDLE &&
myconn->is_pipeline_active() == false) {
myconn->last_time_used = sess->thread->curtime;
return_MySQL_Connection_To_Pool();
} else {

@ -998,7 +998,8 @@ int PgSQL_Session::handler_again___status_PINGING_SERVER() {
myconn->compute_unknown_transaction_status();
//if (pgsql_thread___multiplexing && (myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) {
// due to issue #2096 we disable the global check on pgsql_thread___multiplexing
if ((myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
if ((myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false &&
myds->myconn->is_pipeline_active() == false) {
myds->return_MySQL_Connection_To_Pool();
} else {
myds->destroy_MySQL_Connection_From_Pool(true);
@ -1102,6 +1103,49 @@ int PgSQL_Session::handler_again___status_RESETTING_CONNECTION() {
return 0;
}
int PgSQL_Session::handler_again___status_RESYNCHRONIZING_CONNECTION() {
assert(mybe->server_myds->myconn);
PgSQL_Data_Stream* myds = mybe->server_myds;
PgSQL_Connection* myconn = myds->myconn;
if (myds->mypolls == NULL) {
thread->mypolls.add(POLLIN | POLLOUT, myds->fd, myds, thread->curtime);
}
myds->DSS = STATE_MARIADB_QUERY;
int rc = myconn->async_perform_resync(myds->revents);
if (rc == 0) {
myconn->async_state_machine = ASYNC_IDLE;
myds->DSS = STATE_MARIADB_GENERIC;
RequestEnd(myds, false); // we close the session gracefully
finishQuery(myds, myds->myconn, false);
return 0;
} else {
if (rc == -1) {
const char* code = PgSQL_Error_Helper::get_error_code(PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION);;
const char* msg = "Failed to synchronize connection";
if (myconn->is_error_present() == true) {
code = myconn->get_error_code_str();
msg = myconn->get_error_message().c_str();
}
proxy_error("Detected an error during Resynchronization on (%d,%s,%d) , FD (Conn:%d , MyDS:%d) : %s , %s\n",
myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myds->fd, myds->myconn->fd, code, msg);
PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, 9999);
myds->destroy_MySQL_Connection_From_Pool(false);
myds->fd = 0;
RequestEnd(myds, true);
return -1;
}
// rc==1 , nothing to do for now
if (myds->mypolls == NULL) {
thread->mypolls.add(POLLIN | POLLOUT, myds->fd, myds, thread->curtime);
}
}
return 0;
}
void PgSQL_Session::handler_again___new_thread_to_cancel_query() {
PgSQL_Data_Stream* myds = mybe->server_myds;
if (myds->myconn) {
@ -1215,7 +1259,8 @@ bool PgSQL_Session::handler_again___status_SETTING_INIT_CONNECT(int* _rc) {
detected_broken_connection(__FILE__, __LINE__, __func__, "while setting INIT CONNECT", myconn);
//if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) {
if (rc != -2) { // see PMC-10003
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false &&
myds->myconn->is_pipeline_active() == false) {
retry_conn = true;
}
}
@ -1337,7 +1382,8 @@ bool PgSQL_Session::handler_again___status_SETTING_GENERIC_VARIABLE(int* _rc, co
bool retry_conn = false;
// client error, serious
detected_broken_connection(__FILE__, __LINE__, __func__, "while setting ", myconn);
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false &&
myds->myconn->is_pipeline_active() == false) {
retry_conn = true;
}
myds->destroy_MySQL_Connection_From_Pool(false);
@ -1644,7 +1690,8 @@ bool PgSQL_Session::handler_again___status_RESETTING_CONNECTION(int* _rc) {
bool retry_conn = false;
// client error, serious
detected_broken_connection(__FILE__, __LINE__, __func__, "during Resetting Connection", myconn);
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false &&
myds->myconn->is_pipeline_active() == false) {
retry_conn = true;
}
myds->destroy_MySQL_Connection_From_Pool(false);
@ -1670,7 +1717,8 @@ bool PgSQL_Session::handler_again___status_RESETTING_CONNECTION(int* _rc) {
bool retry_conn = false;
proxy_error("Timeout during Resetting Connection on %s , %d\n", myconn->parent->address, myconn->parent->port);
PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, ER_PROXYSQL_CHANGE_USER_TIMEOUT);
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false &&
myds->myconn->is_pipeline_active() == false) {
retry_conn = true;
}
myds->destroy_MySQL_Connection_From_Pool(false);
@ -2434,7 +2482,8 @@ int PgSQL_Session::handler_ProcessingQueryError_CheckBackendConnectionStatus(PgS
// Retry the query if retries are allowed and conditions permit
if (myds->query_retries_on_failure > 0) {
myds->query_retries_on_failure--;
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false &&
myds->myconn->is_pipeline_active() == false) {
if (myds->myconn->query_result && myds->myconn->query_result->is_transfer_started()) {
// transfer to frontend has started, we cannot retry
} else {
@ -2485,7 +2534,8 @@ bool PgSQL_Session::handler_minus1_ClientLibraryError(PgSQL_Data_Stream* myds) {
detected_broken_connection(__FILE__, __LINE__, __func__, "running query", myconn, true);
if (myds->query_retries_on_failure > 0) {
myds->query_retries_on_failure--;
if ((myconn->reusable == true) && myconn->IsActiveTransaction() == false && myconn->MultiplexDisabled() == false) {
if ((myconn->reusable == true) && myconn->IsActiveTransaction() == false && myconn->MultiplexDisabled() == false &&
myconn->is_pipeline_active() == false) {
if (myconn->query_result && myconn->query_result->is_transfer_started()) {
// transfer to frontend has started, we cannot retry
} else {
@ -2552,7 +2602,8 @@ bool PgSQL_Session::handler_minus1_HandleErrorCodes(PgSQL_Data_Stream* myds, int
myconn->parent->connect_error(9999);
if (myds->query_retries_on_failure > 0) {
myds->query_retries_on_failure--;
if ((myconn->reusable == true) && myconn->IsActiveTransaction() == false && myconn->MultiplexDisabled() == false) {
if ((myconn->reusable == true) && myconn->IsActiveTransaction() == false && myconn->MultiplexDisabled() == false &&
myconn->is_pipeline_active() == false) {
retry_conn = true;
proxy_warning("Retrying query.\n");
}
@ -2615,9 +2666,14 @@ void PgSQL_Session::handler_minus1_HandleBackendConnection(PgSQL_Data_Stream* my
PgSQL_Connection* myconn = myds->myconn;
if (myconn) {
myconn->reduce_auto_increment_delay_token();
if (pgsql_thread___multiplexing && (myconn->reusable == true) && myconn->IsActiveTransaction() == false && myconn->MultiplexDisabled() == false) {
if (pgsql_thread___multiplexing && (myconn->reusable == true) && myconn->IsActiveTransaction() == false &&
myconn->MultiplexDisabled() == false) {
myds->DSS = STATE_NOT_INITIALIZED;
myds->return_MySQL_Connection_To_Pool();
if (myconn->is_pipeline_active() == true) {
create_new_session_and_reset_connection(myds);
} else {
myds->return_MySQL_Connection_To_Pool();
}
} else {
myconn->async_state_machine = ASYNC_IDLE;
myds->DSS = STATE_MARIADB_GENERIC;
@ -2739,7 +2795,15 @@ __handler_again_get_pkts_from_client:
handler_again:
switch (status) {
// FIXME: move it to bottom
case RESYNCHRONIZING_CONNECTION:
{
int rc = handler_again___status_RESYNCHRONIZING_CONNECTION();
if (rc == -1) { // if the sync fails, we destroy the session
handler_ret = -1;
return handler_ret;
}
}
break;
case PROCESSING_EXTENDED_QUERY_SYNC:
{
int rc = handler___status_PROCESSING_EXTENDED_QUERY_SYNC();
@ -2765,6 +2829,10 @@ handler_again:
#ifdef DEBUG
assert(dbg_extended_query_backend_conn == myds->myconn);
#endif
if (myds->myconn->is_pipeline_active() == true) {
NEXT_IMMEDIATE(RESYNCHRONIZING_CONNECTION);
}
// Return to pool if connection is reusable
finishQuery(myds, myds->myconn, false);
}
@ -2857,7 +2925,7 @@ handler_again:
(killed == true) // session was killed by admin
) {
// we only log in case on timing out here. Logging for 'killed' is done in the places that hold that contextual information.
if (mybe->server_myds->myconn && (mybe->server_myds->myconn->async_state_machine != ASYNC_IDLE) && mybe->server_myds->wait_until && (thread->curtime >= mybe->server_myds->wait_until)) {
if (killed == false) {
std::string query{};
if (CurrentQuery.extended_query_info.stmt_info == NULL) { // text protocol
@ -2978,6 +3046,7 @@ handler_again:
PROXY_TRACE();
assert(0);
}
CurrentQuery.extended_query_info.flags |= PGSQL_EXTENDED_QUERY_FLAG_IMPLICIT_PREPARE;
previous_status.push(status);
NEXT_IMMEDIATE(PROCESSING_STMT_PREPARE);
}
@ -5285,7 +5354,11 @@ void PgSQL_Session::finishQuery(PgSQL_Data_Stream* myds, PgSQL_Connection* mycon
myconn->multiplex_delayed = false;
myds->wait_until = 0;
myds->DSS = STATE_NOT_INITIALIZED;
myds->return_MySQL_Connection_To_Pool();
if (myconn->is_pipeline_active() == true) {
create_new_session_and_reset_connection(myds);
} else {
myds->return_MySQL_Connection_To_Pool();
}
}
if (transaction_persistent == true) {
transaction_persistent_hostgroup = -1;
@ -5628,7 +5701,7 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg
this, client_myds, previous_hostgroup);
}
if (pgsql_thread___set_query_lock_on_hostgroup == 1) {
if (pgsql_thread___set_query_lock_on_hostgroup == 1) {
if (locked_on_hostgroup < 0) {
if (lock_hostgroup) {
// we are locking on hostgroup now
@ -5701,10 +5774,13 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg
}
GloPgStmt->unlock();
if (extended_query_frame.empty() == true) {
extended_query_info.flags |= PGSQL_EXTENDED_QUERY_FLAG_SYNC;
}
// Fallback: forward to backend
mybe = find_or_create_backend(current_hostgroup);
status = PROCESSING_STMT_PREPARE;
mybe->server_myds->connect_retries_on_failure = pgsql_thread___connect_retries_on_failure;
mybe->server_myds->wait_until = 0;
mybe->server_myds->killed_at = 0;
@ -5851,6 +5927,10 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des
}
}
if (extended_query_frame.empty() == true) {
extended_query_info.flags |= PGSQL_EXTENDED_QUERY_FLAG_SYNC;
}
mybe = find_or_create_backend(current_hostgroup);
status = PROCESSING_STMT_DESCRIBE;
mybe->server_myds->connect_retries_on_failure = pgsql_thread___connect_retries_on_failure;
@ -6122,6 +6202,11 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu
}
}
}
if (extended_query_frame.empty() == true) {
extended_query_info.flags |= PGSQL_EXTENDED_QUERY_FLAG_SYNC;
}
mybe = find_or_create_backend(current_hostgroup);
status = PROCESSING_STMT_EXECUTE;
mybe->server_myds->connect_retries_on_failure = pgsql_thread___connect_retries_on_failure;
@ -6361,6 +6446,7 @@ bool PgSQL_Session::handler___rc0_PROCESSING_STMT_PREPARE(enum session_status& s
st = status;
if (previous_status.empty() == false) {
CurrentQuery.extended_query_info.flags &= ~PGSQL_EXTENDED_QUERY_FLAG_IMPLICIT_PREPARE;
myds->myconn->async_state_machine = ASYNC_IDLE;
myds->DSS = STATE_MARIADB_GENERIC;
st = previous_status.top();

Loading…
Cancel
Save