diff --git a/include/PgSQL_Connection.h b/include/PgSQL_Connection.h index 809e1460a..49e17baef 100644 --- a/include/PgSQL_Connection.h +++ b/include/PgSQL_Connection.h @@ -317,8 +317,40 @@ public: */ void stmt_execute_cont(short event); - void reset_session_start(); - void reset_session_cont(short event); + /** + * @brief Initiates the asynchronous reset of the PostgreSQL connection. + * + * Starts the internal state machine that resets connection to a clean, + * reusable state so it can safely re-enter the multiplexing pool. + * + */ + void reset_session_start(); + + /** + * @brief Continues the asynchronous reset of the PostgreSQL session. + * + * This method advances the state machine initiated by reset_session_start() + * to asynchronously reset the backend connection to a clean, reusable state. + * + * @param event The event flag indicating the current I/O event. + */ + void reset_session_cont(short event); + + /** + * @brief Start a resynchronization attempt for the current backend connection. + * + * Send protocol-level Sync (or otherwise trigger the backend to reach + * ReadyForQuery) and transition the connection into the resynchronizing state. + * + */ + void resync_start(); + + /** + * @brief Continue a previously started resynchronization in response to an event. + * + * @param event The event flag indicating the current I/O event. + */ + void resync_cont(short event); int async_connect(short event); int async_query(short event, const char* stmt, unsigned long length, const char* backend_stmt_name = nullptr, @@ -326,6 +358,7 @@ public: int async_ping(short event); int async_reset_session(short event); int async_send_simple_command(short event, char* stmt, unsigned long length); // no result set expected + int async_perform_resync(short event); void next_event(PG_ASYNC_ST new_st); bool is_connected() const; @@ -436,6 +469,7 @@ public: void reset_error() { reset_error_info(error_info, false); } bool reset_session_in_txn = false; + bool reset_session_in_pipeline = false; PGresult* get_result(); void next_multi_statement_result(PGresult* result); @@ -473,9 +507,8 @@ public: const char* get_pg_transaction_status_str(); unsigned int get_memory_usage() const; char get_transaction_status_char(); - - inline - int get_backend_pid() { return (pgsql_conn) ? get_pg_backend_pid() : -1; } + inline int get_backend_pid() { return (pgsql_conn) ? get_pg_backend_pid() : -1; } + bool is_pipeline_active() { return (PQpipelineStatus(pgsql_conn) != PQ_PIPELINE_OFF); } static int char_to_encoding(const char* name) { return pg_char_to_encoding(name); @@ -602,6 +635,8 @@ public: bool processing_multi_statement; bool multiplex_delayed; bool is_client_connection; // true if this is a client connection, false if it is a server connection + bool exit_pipeline_mode; // true if it is safe to exit pipeline mode + bool resync_failed; // true if the last resync attempt failed PgSQL_STMTs_local_v14* local_stmts; PgSQL_SrvC *parent; @@ -623,7 +658,8 @@ private: ASYNC_ST fetch_result_end_st = ASYNC_QUERY_END; inline void set_fetch_result_end_state(ASYNC_ST st) { assert(st == ASYNC_QUERY_END || st == ASYNC_STMT_EXECUTE_END || - st == ASYNC_STMT_DESCRIBE_END || st == ASYNC_STMT_PREPARE_END); + st == ASYNC_STMT_DESCRIBE_END || st == ASYNC_STMT_PREPARE_END || + st == ASYNC_RESYNC_END); fetch_result_end_st = st; } // Handles the COPY OUT response from the server. diff --git a/include/PgSQL_Session.h b/include/PgSQL_Session.h index d2ca616ea..643945c6c 100644 --- a/include/PgSQL_Session.h +++ b/include/PgSQL_Session.h @@ -139,7 +139,9 @@ using Parse_Param_Types = std::vector; // Vector of parameter types fo enum PgSQL_Extended_Query_Flags : uint8_t { PGSQL_EXTENDED_QUERY_FLAG_NONE = 0x00, - PGSQL_EXTENDED_QUERY_FLAG_DESCRIBE_PORTAL = 0x01 + PGSQL_EXTENDED_QUERY_FLAG_DESCRIBE_PORTAL = 0x01, + PGSQL_EXTENDED_QUERY_FLAG_SYNC = 0x02, + PGSQL_EXTENDED_QUERY_FLAG_IMPLICIT_PREPARE = 0x04, }; struct PgSQL_Extended_Query_Info { @@ -318,7 +320,7 @@ private: void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___create_mirror_session(); int handler_again___status_PINGING_SERVER(); int handler_again___status_RESETTING_CONNECTION(); - + int handler_again___status_RESYNCHRONIZING_CONNECTION(); /** * @brief Initiates a new thread to kill current running query. diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index f2fb4118b..5eb3fc03f 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -116,9 +116,6 @@ enum ASYNC_ST { // MariaDB Async State Machine ASYNC_STMT_EXECUTE_STORE_RESULT_START, ASYNC_STMT_EXECUTE_STORE_RESULT_CONT, ASYNC_STMT_EXECUTE_END, - ASYNC_CLOSE_START, - ASYNC_CLOSE_CONT, - ASYNC_CLOSE_END, ASYNC_RESET_SESSION_START, ASYNC_RESET_SESSION_CONT, ASYNC_RESET_SESSION_END, @@ -128,6 +125,9 @@ enum ASYNC_ST { // MariaDB Async State Machine ASYNC_STMT_DESCRIBE_START, ASYNC_STMT_DESCRIBE_CONT, ASYNC_STMT_DESCRIBE_END, + ASYNC_RESYNC_START, + ASYNC_RESYNC_CONT, + ASYNC_RESYNC_END, ASYNC_IDLE }; @@ -309,6 +309,7 @@ enum session_status { SETTING_NEXT_ISOLATION_LEVEL, SETTING_NEXT_TRANSACTION_READ, PROCESSING_EXTENDED_QUERY_SYNC, + RESYNCHRONIZING_CONNECTION, session_status___NONE // special marker }; diff --git a/lib/Base_Session.cpp b/lib/Base_Session.cpp index 4163d1c87..55b1250b3 100644 --- a/lib/Base_Session.cpp +++ b/lib/Base_Session.cpp @@ -513,7 +513,11 @@ void Base_Session::housekeeping_before_pkts() { myds->return_MySQL_Connection_To_Pool(); } } else if constexpr (std::is_same_v) { - myds->return_MySQL_Connection_To_Pool(); + if (myds->myconn->is_pipeline_active() == true) { + create_new_session_and_reset_connection(myds); + } else { + myds->return_MySQL_Connection_To_Pool(); + } } else { assert(0); } diff --git a/lib/PgSQL_Connection.cpp b/lib/PgSQL_Connection.cpp index fcccccc97..ee25f7b23 100644 --- a/lib/PgSQL_Connection.cpp +++ b/lib/PgSQL_Connection.cpp @@ -190,6 +190,8 @@ PgSQL_Connection::PgSQL_Connection(bool is_client_conn) { new_result = true; is_copy_out = false; + exit_pipeline_mode = false; + resync_failed = false; reset_error(); memset(&connected_host_details, 0, sizeof(connected_host_details)); } @@ -522,6 +524,18 @@ handler_again: set_error(PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION, "Unable to process 'COPY' command", true); NEXT_IMMEDIATE(fetch_result_end_st); break; + case PGRES_PIPELINE_SYNC: + // backend connection is in Ready for Query state, we can now safely exit pipeline mode + exit_pipeline_mode = true; + NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); + break; + case PGRES_PIPELINE_ABORTED: + // received an extended query immediately after an error was triggered by a previous query (before sync). + // In ProxySQL this should never happen, since the extended query frame is reset after an error. + // However, it may rarely occur if an error is raised during the "describe portal" phase (while executing). + // In that case, we continue until PGRES_PIPELINE_SYNC (Ready for Query state) is received, then safely exit pipeline mode. NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); + NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); + break; case PGRES_BAD_RESPONSE: case PGRES_NONFATAL_ERROR: case PGRES_FATAL_ERROR: @@ -555,7 +569,6 @@ handler_again: error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_DATA_ERROR) { proxy_error("Error: %s, Multi-Statement: %d\n", get_error_code_with_message().c_str(), processing_multi_statement); } - //} NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); } @@ -635,7 +648,11 @@ handler_again: assert(0); } - if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR)) == 0) { + // if we arrive here via async_perform_resync, the connection is in "Ready for Query" state, + // but query_result will be empty. In this case, we check exit_pipeline_mode; if it is true, + // it indicates a non-error scenario and we skip this check. + if (exit_pipeline_mode == false && + (query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR)) == 0) { // if we reach here we assume that error_info is already set in previous call if (!is_error_present()) assert(0); // we might have missed setting error_info in previous call @@ -643,22 +660,32 @@ handler_again: query_result->add_error(NULL); } - // Normally, ReadyForQuery is not sent immediately if we are in extended query mode - // and there are pending messages in the queue, as it will be sent once the entire - // extended query frame has been processed. - // - // Edge case: if a message fails with an error while the queue still contains pending - // messages, the queue will be cleared later in the session. In this situation, - // ReadyForQuery would never be sent because the pending messages are discarded. - // - // Fix: if the result indicates an error, explicitly send ReadyForQuery immediately. - // The extended query frame will still be reset later in the session. - if (fetch_result_end_st != ASYNC_QUERY_END && - !myds->sess->is_extended_query_ready_for_query() && - ((query_result->get_result_packet_type() & PGSQL_QUERY_RESULT_ERROR) == 0)) { - // Skip sending ReadyForQuery if there are still extended query messages pending in the queue - NEXT_IMMEDIATE(fetch_result_end_st); + if (fetch_result_end_st != ASYNC_QUERY_END) { + bool has_error = (query_result->get_result_packet_type() & PGSQL_QUERY_RESULT_ERROR) != 0; + + // Normally, ReadyForQuery is not sent immediately if we are in extended query mode + // and there are pending messages in the queue, as it will be sent once the entire + // extended query frame has been processed. + // + // Edge case: if a message fails with an error while the queue still contains pending + // messages, the queue will be cleared later in the session. In this situation, + // ReadyForQuery would never be sent because the pending messages are discarded. + // + // Fix: if the result indicates an error, explicitly send ReadyForQuery immediately. + // The extended query frame will still be reset later in the session. + if (!myds->sess->is_extended_query_ready_for_query() && !has_error) { + // Skip sending ReadyForQuery if there are still extended query messages pending in the queue + NEXT_IMMEDIATE(fetch_result_end_st); + } + + // An error has occurred while executing extended query sequence, + // and connection is not in 'Ready for Query' state, i.e., unsynchronized. + // To recover, we must resync by sending a SYNC to the backend connection. + if (!exit_pipeline_mode && has_error) { + NEXT_IMMEDIATE(ASYNC_RESYNC_START); + } } + // finally add ready for query packet query_result->add_ready_status(PQtransactionStatus(pgsql_conn)); update_bytes_recv(6); @@ -666,72 +693,7 @@ handler_again: NEXT_IMMEDIATE(fetch_result_end_st); } break; - case ASYNC_QUERY_END: - PROXY_TRACE2(); - if (is_error_present()) { - compute_unknown_transaction_status(); - } else { - unknown_transaction_status = false; - } - PQsetNoticeReceiver(pgsql_conn, &PgSQL_Connection::unhandled_notice_cb, this); - // should be NULL - assert(!pgsql_result); - assert(!is_copy_out); - break; - case ASYNC_RESET_SESSION_START: - reset_session_start(); - update_bytes_sent((reset_session_in_txn == false ? (sizeof("DISCARD ALL") + 5) : (sizeof("ROLLBACK") + 5))); - if (async_exit_status) { - next_event(ASYNC_RESET_SESSION_CONT); - } else { - if (is_error_present()) { - NEXT_IMMEDIATE(ASYNC_RESET_SESSION_END); - } - NEXT_IMMEDIATE(ASYNC_RESET_SESSION_CONT); - } - break; - case ASYNC_RESET_SESSION_CONT: - { - if (event) { - reset_session_cont(event); - } - if (async_exit_status) { - if (myds->wait_until != 0 && myds->sess->thread->curtime >= myds->wait_until) { - NEXT_IMMEDIATE(ASYNC_RESET_SESSION_TIMEOUT); - } - next_event(ASYNC_RESET_SESSION_CONT); - break; - } - if (is_error_present()) { - NEXT_IMMEDIATE(ASYNC_RESET_SESSION_END); - } - PGresult* result = get_result(); - if (result) { - if (PQresultStatus(result) != PGRES_COMMAND_OK) { - set_error_from_result(result, PGSQL_ERROR_FIELD_ALL); - assert(is_error_present()); - } - PQclear(result); - NEXT_IMMEDIATE(ASYNC_RESET_SESSION_CONT); - } - if (reset_session_in_txn) { - //assert(IsKnownActiveTransaction() == false); - reset_session_in_txn = false; - NEXT_IMMEDIATE(ASYNC_RESET_SESSION_START); - } - NEXT_IMMEDIATE(ASYNC_RESET_SESSION_END); - } - break; - case ASYNC_RESET_SESSION_END: - if (is_error_present()) { - NEXT_IMMEDIATE(ASYNC_RESET_SESSION_FAILED); - } - NEXT_IMMEDIATE(ASYNC_RESET_SESSION_SUCCESSFUL); - break; - case ASYNC_RESET_SESSION_FAILED: - case ASYNC_RESET_SESSION_SUCCESSFUL: - case ASYNC_RESET_SESSION_TIMEOUT: - break; + case ASYNC_STMT_PREPARE_START: stmt_prepare_start(); __sync_fetch_and_add(&parent->queries_sent, 1); @@ -757,20 +719,6 @@ handler_again: NEXT_IMMEDIATE(ASYNC_USE_RESULT_START); } break; - case ASYNC_STMT_PREPARE_END: - PROXY_TRACE2(); - if (is_error_present()) { - compute_unknown_transaction_status(); - proxy_error("Failed to prepare statement: %s\n", get_error_code_with_message().c_str()); - } else { - unknown_transaction_status = false; - } - - PQsetNoticeReceiver(pgsql_conn, &PgSQL_Connection::unhandled_notice_cb, this); - - assert(!pgsql_result); - assert(!is_copy_out); - break; case ASYNC_STMT_DESCRIBE_START: stmt_describe_start(); @@ -794,20 +742,6 @@ handler_again: NEXT_IMMEDIATE(ASYNC_USE_RESULT_START); } break; - case ASYNC_STMT_DESCRIBE_END: - PROXY_TRACE2(); - if (is_error_present()) { - compute_unknown_transaction_status(); - proxy_error("Failed to describe prepared statement: %s\n", get_error_code_with_message().c_str()); - } else { - unknown_transaction_status = false; - } - - PQsetNoticeReceiver(pgsql_conn, &PgSQL_Connection::unhandled_notice_cb, this); - - assert(!pgsql_result); - assert(!is_copy_out); - break; case ASYNC_STMT_EXECUTE_START: stmt_execute_start(); @@ -832,6 +766,17 @@ handler_again: NEXT_IMMEDIATE(ASYNC_USE_RESULT_START); } break; + + case ASYNC_RESYNC_END: + // if we reach here, it means that the connection is now synchronized + if (resync_failed) { + // if resync failed + set_error(PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION, "Failed to synchronize connection", false); + } + // fall through + case ASYNC_QUERY_END: + case ASYNC_STMT_PREPARE_END: + case ASYNC_STMT_DESCRIBE_END: case ASYNC_STMT_EXECUTE_END: PROXY_TRACE2(); if (is_error_present()) { @@ -839,11 +784,131 @@ handler_again: } else { unknown_transaction_status = false; } + PQsetNoticeReceiver(pgsql_conn, &PgSQL_Connection::unhandled_notice_cb, this); + + // we check exit_pipeline_mode to ensure it is safe to exit pipeline mode + if (exit_pipeline_mode && + PQpipelineStatus(pgsql_conn) == PQ_PIPELINE_ON) { + if (PQexitPipelineMode(pgsql_conn) == 0) { + set_error_from_PQerrorMessage(); + proxy_error("Failed to exit pipeline mode. %s\n", get_error_code_with_message().c_str()); + } + exit_pipeline_mode = false; + } // should be NULL assert(!pgsql_result); assert(!is_copy_out); break; + + case ASYNC_RESYNC_START: + if (PQpipelineStatus(pgsql_conn) == PQ_PIPELINE_OFF) { + proxy_warning("Resync not required — connection already synchronized.\n"); + NEXT_IMMEDIATE(ASYNC_RESYNC_END); + } + resync_start(); + if (async_exit_status) { + next_event(ASYNC_RESYNC_CONT); + } else { + NEXT_IMMEDIATE(ASYNC_RESYNC_END); + } + break; + case ASYNC_RESYNC_CONT: + if (event) { + resync_cont(event); + } + if (async_exit_status) { + if (myds->wait_until != 0 && myds->sess->thread->curtime >= myds->wait_until) { + proxy_error("Timeout waiting for pipeline sync to complete.\n"); + resync_failed = true; + NEXT_IMMEDIATE(ASYNC_RESYNC_END); + } + next_event(ASYNC_RESYNC_CONT); + break; + } else { + if (resync_failed == true) { + NEXT_IMMEDIATE(ASYNC_RESYNC_END); + } + if (query_result && query_result->result_packet_type != PGSQL_QUERY_RESULT_NO_DATA) { + // we have already have some result set, so we just continue + NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); + } else { + set_fetch_result_end_state(ASYNC_RESYNC_END); + NEXT_IMMEDIATE(ASYNC_USE_RESULT_START); + } + } + break; + + case ASYNC_RESET_SESSION_START: + reset_session_start(); + if (reset_session_in_pipeline) { + update_bytes_sent(5); + } + else { + update_bytes_sent((reset_session_in_txn == false ? (sizeof("DISCARD ALL") + 5) : (sizeof("ROLLBACK") + 5))); + } + if (async_exit_status) { + next_event(ASYNC_RESET_SESSION_CONT); + } + else { + if (is_error_present()) { + NEXT_IMMEDIATE(ASYNC_RESET_SESSION_END); + } + NEXT_IMMEDIATE(ASYNC_RESET_SESSION_CONT); + } + break; + case ASYNC_RESET_SESSION_CONT: + { + if (event) { + reset_session_cont(event); + } + if (async_exit_status) { + if (myds->wait_until != 0 && myds->sess->thread->curtime >= myds->wait_until) { + NEXT_IMMEDIATE(ASYNC_RESET_SESSION_TIMEOUT); + } + next_event(ASYNC_RESET_SESSION_CONT); + break; + } + if (is_error_present()) { + NEXT_IMMEDIATE(ASYNC_RESET_SESSION_END); + } + PGresult* result = get_result(); + if (result) { + if (PQresultStatus(result) != PGRES_COMMAND_OK && + PQresultStatus(result) != PGRES_PIPELINE_SYNC) { + set_error_from_result(result, PGSQL_ERROR_FIELD_ALL); + assert(is_error_present()); + } + PQclear(result); + NEXT_IMMEDIATE(ASYNC_RESET_SESSION_CONT); + } + if (reset_session_in_pipeline) { + if (PQexitPipelineMode(pgsql_conn) == 0) { + set_error_from_PQerrorMessage(); + proxy_error("Failed to exit pipeline mode. %s\n", get_error_code_with_message().c_str()); + NEXT_IMMEDIATE(ASYNC_RESET_SESSION_END); + } + reset_session_in_pipeline = false; + NEXT_IMMEDIATE(ASYNC_RESET_SESSION_START); + } + if (reset_session_in_txn) { + reset_session_in_txn = false; + NEXT_IMMEDIATE(ASYNC_RESET_SESSION_START); + } + NEXT_IMMEDIATE(ASYNC_RESET_SESSION_END); + } + break; + case ASYNC_RESET_SESSION_END: + if (is_error_present()) { + NEXT_IMMEDIATE(ASYNC_RESET_SESSION_FAILED); + } + NEXT_IMMEDIATE(ASYNC_RESET_SESSION_SUCCESSFUL); + break; + case ASYNC_RESET_SESSION_FAILED: + case ASYNC_RESET_SESSION_SUCCESSFUL: + case ASYNC_RESET_SESSION_TIMEOUT: + break; + default: // not implemented yet assert(0); @@ -1107,6 +1172,12 @@ void PgSQL_Connection::fetch_result_cont(short event) { if (PQisBusy(pgsql_conn) == 0) { result_type = 1; pgsql_result = PQgetResult(pgsql_conn); + + if (!pgsql_result && + query.extended_query_info && + (query.extended_query_info->flags & PGSQL_EXTENDED_QUERY_FLAG_SYNC) != 0) { + pgsql_result = PQgetResult(pgsql_conn); + } return; } break; @@ -1141,10 +1212,15 @@ void PgSQL_Connection::fetch_result_cont(short event) { } result_type = 1; pgsql_result = PQgetResult(pgsql_conn); + + if (!pgsql_result && + query.extended_query_info && + (query.extended_query_info->flags & PGSQL_EXTENDED_QUERY_FLAG_SYNC) != 0) { + pgsql_result = PQgetResult(pgsql_conn); + } } void PgSQL_Connection::flush() { - reset_error(); int res = PQflush(pgsql_conn); if (res > 0) { @@ -1333,7 +1409,8 @@ int PgSQL_Connection::async_query(short event, const char* stmt, unsigned long l if (async_state_machine == ASYNC_QUERY_END || async_state_machine == ASYNC_STMT_EXECUTE_END || async_state_machine == ASYNC_STMT_DESCRIBE_END || - async_state_machine == ASYNC_STMT_PREPARE_END) { + async_state_machine == ASYNC_STMT_PREPARE_END || + async_state_machine == ASYNC_RESYNC_END) { PROXY_TRACE2(); compute_unknown_transaction_status(); if (is_error_present()) { @@ -1580,15 +1657,42 @@ void PgSQL_Connection::stmt_prepare_start() { processing_multi_statement = false; async_exit_status = PG_EVENT_NONE; + if (PQpipelineStatus(pgsql_conn) == PQ_PIPELINE_OFF) { + if (PQenterPipelineMode(pgsql_conn) == 0) { + set_error_from_PQerrorMessage(); + proxy_error("Failed to enter pipeline mode. %s\n", get_error_code_with_message().c_str()); + return; + } + } + PQsetNoticeReceiver(pgsql_conn, &PgSQL_Connection::notice_handler_cb, this); - const Parse_Param_Types& parse_param_types = query.extended_query_info->parse_param_types; + const PgSQL_Extended_Query_Info* extended_query_info = query.extended_query_info; + const Parse_Param_Types& parse_param_types = extended_query_info->parse_param_types; if (PQsendPrepare(pgsql_conn, query.backend_stmt_name, query.ptr, parse_param_types.size(), parse_param_types.data()) == 0) { set_error_from_PQerrorMessage(); proxy_error("Failed to send prepare. %s\n", get_error_code_with_message().c_str()); return; } + + // Send a Flush if this is not the last extended query message in the sequence/frame (or is an implicit prepared); + // otherwise, send a SYNC. + if ((extended_query_info->flags & PGSQL_EXTENDED_QUERY_FLAG_IMPLICIT_PREPARE) != 0 || + (extended_query_info->flags & PGSQL_EXTENDED_QUERY_FLAG_SYNC) == 0) { + if (PQsendFlushRequest(pgsql_conn) == 0) { + set_error_from_PQerrorMessage(); + proxy_error("Failed to send flush request. %s\n", get_error_code_with_message().c_str()); + return; + } + } else { + // FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher + if (PQpipelineSync(pgsql_conn) == 0) { + set_error_from_PQerrorMessage(); + proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str()); + return; + } + } flush(); } @@ -1607,6 +1711,14 @@ void PgSQL_Connection::stmt_describe_start() { processing_multi_statement = false; async_exit_status = PG_EVENT_NONE; + if (PQpipelineStatus(pgsql_conn) == PQ_PIPELINE_OFF) { + if (PQenterPipelineMode(pgsql_conn) == 0) { + set_error_from_PQerrorMessage(); + proxy_error("Failed to enter pipeline mode. %s\n", get_error_code_with_message().c_str()); + return; + } + } + PQsetNoticeReceiver(pgsql_conn, &PgSQL_Connection::notice_handler_cb, this); const PgSQL_Extended_Query_Info* extended_query_info = query.extended_query_info; @@ -1631,6 +1743,23 @@ void PgSQL_Connection::stmt_describe_start() { proxy_error("Failed to send describe message. %s\n", get_error_code_with_message().c_str()); return; } + + // Send a Flush if this is not the last extended query message in the sequence/frame; + // otherwise, send a SYNC. + if ((extended_query_info->flags & PGSQL_EXTENDED_QUERY_FLAG_SYNC) == 0) { + if (PQsendFlushRequest(pgsql_conn) == 0) { + set_error_from_PQerrorMessage(); + proxy_error("Failed to send flush request. %s\n", get_error_code_with_message().c_str()); + return; + } + } else { + // FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher + if (PQpipelineSync(pgsql_conn) == 0) { + set_error_from_PQerrorMessage(); + proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str()); + return; + } + } flush(); } @@ -1643,14 +1772,58 @@ void PgSQL_Connection::stmt_describe_cont(short event) { } } +void PgSQL_Connection::resync_start() { + PROXY_TRACE(); + async_exit_status = PG_EVENT_NONE; + + PQsetNoticeReceiver(pgsql_conn, &PgSQL_Connection::notice_handler_cb, this); + + // FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher + if (PQpipelineSync(pgsql_conn) == 0) { + proxy_error("Failed to send pipeline sync.\n"); + resync_failed = true; + return; + } + async_exit_status = PG_EVENT_WRITE; +} + +void PgSQL_Connection::resync_cont(short event) { + PROXY_TRACE(); + proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6, "event=%d\n", event); + async_exit_status = PG_EVENT_NONE; + if (event & POLLOUT) { + int res = PQflush(pgsql_conn); + + if (res > 0) { + async_exit_status = PG_EVENT_WRITE; + } else if (res == 0) { + async_exit_status = PG_EVENT_READ; + } else { + proxy_error("Failed to flush data to backend.\n"); + async_exit_status = PG_EVENT_NONE; + resync_failed = true; + } + } +} + void PgSQL_Connection::stmt_execute_start() { PROXY_TRACE(); reset_error(); processing_multi_statement = false; async_exit_status = PG_EVENT_NONE; + + if (PQpipelineStatus(pgsql_conn) == PQ_PIPELINE_OFF) { + if (PQenterPipelineMode(pgsql_conn) == 0) { + set_error_from_PQerrorMessage(); + proxy_error("Failed to enter pipeline mode. %s\n", get_error_code_with_message().c_str()); + return; + } + } + PQsetNoticeReceiver(pgsql_conn, &PgSQL_Connection::notice_handler_cb, this); - const PgSQL_Bind_Message* bind_msg = query.extended_query_info->bind_msg; + const PgSQL_Extended_Query_Info* extended_query_info = query.extended_query_info; + const PgSQL_Bind_Message* bind_msg = extended_query_info->bind_msg; assert(bind_msg); // should never be null const PgSQL_Bind_Data& bind_data = bind_msg->data(); // will always have valid data @@ -1719,6 +1892,23 @@ void PgSQL_Connection::stmt_execute_start() { proxy_error("Failed to send execute prepared statement. %s\n", get_error_code_with_message().c_str()); return; } + + // Send a Flush if this is not the last extended query message in the sequence/frame; + // otherwise, send a SYNC. + if ((extended_query_info->flags & PGSQL_EXTENDED_QUERY_FLAG_SYNC) == 0) { + if (PQsendFlushRequest(pgsql_conn) == 0) { + set_error_from_PQerrorMessage(); + proxy_error("Failed to send flush request. %s\n", get_error_code_with_message().c_str()); + return; + } + } else { + // FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher + if (PQpipelineSync(pgsql_conn) == 0) { + set_error_from_PQerrorMessage(); + proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str()); + return; + } + } flush(); } @@ -1736,11 +1926,22 @@ void PgSQL_Connection::reset_session_start() { assert(pgsql_conn); reset_error(); async_exit_status = PG_EVENT_NONE; - reset_session_in_txn = IsKnownActiveTransaction(); - if (PQsendQuery(pgsql_conn, (reset_session_in_txn == false ? "DISCARD ALL" : "ROLLBACK")) == 0) { - set_error_from_PQerrorMessage(); - proxy_error("Failed to send query. %s\n", get_error_code_with_message().c_str()); - return; + + reset_session_in_pipeline = is_pipeline_active(); + if (reset_session_in_pipeline) { + // FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher + if (PQpipelineSync(pgsql_conn) == 0) { + set_error_from_PQerrorMessage(); + proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str()); + return; + } + } else { + reset_session_in_txn = IsKnownActiveTransaction(); + if (PQsendQuery(pgsql_conn, (reset_session_in_txn == false ? "DISCARD ALL" : "ROLLBACK")) == 0) { + set_error_from_PQerrorMessage(); + proxy_error("Failed to send query. %s\n", get_error_code_with_message().c_str()); + return; + } } flush(); } @@ -2199,6 +2400,57 @@ int PgSQL_Connection::async_send_simple_command(short event, char* stmt, unsigne return 1; } +int PgSQL_Connection::async_perform_resync(short event) { + PROXY_TRACE(); + PROXY_TRACE2(); + assert(pgsql_conn); + + server_status = parent->status; // we copy it here to avoid race condition. The caller will see this + if (IsServerOffline()) + return -1; + + switch (async_state_machine) { + case ASYNC_RESYNC_END: + processing_multi_statement = false; + break; + case ASYNC_IDLE: + if (myds && myds->sess) { + if (myds->sess->active_transactions == 0) { + myds->sess->active_transactions = 1; + myds->sess->transaction_started_at = myds->sess->thread->curtime; + } + } + async_state_machine = ASYNC_RESYNC_START; + default: + handler(event); + break; + } + if (async_state_machine == ASYNC_RESYNC_END) { + if (myds && myds->sess) { + if (myds->sess->active_transactions != 0) { + myds->sess->active_transactions = 0; + myds->sess->transaction_started_at = 0; + } + } + // We just needed to know if the query was successful, not. + // We discard the result. + if (query_result) { + assert(!query_result_reuse); + query_result->clear(); + query_result_reuse = query_result; + query_result = NULL; + } + compute_unknown_transaction_status(); + if (resync_failed) { + return -1; + } else { + async_state_machine = ASYNC_IDLE; + return 0; + } + } + return 1; +} + unsigned int PgSQL_Connection::reorder_dynamic_variables_idx() { dynamic_variables_idx.clear(); // note that we are inserting the index already ordered @@ -2280,13 +2532,18 @@ void PgSQL_Connection::reset() { options.init_connect_sent = false; } auto_increment_delay_token = 0; + exit_pipeline_mode = false; + resync_failed = false; +#ifdef DEBUG + if (pgsql_conn) + assert(PQpipelineStatus(pgsql_conn) == PQ_PIPELINE_OFF); +#endif } void PgSQL_Connection::set_status(bool set, uint32_t status_flag) { if (set) { this->status_flags |= status_flag; - } - else { + } else { this->status_flags &= ~status_flag; } } diff --git a/lib/PgSQL_Data_Stream.cpp b/lib/PgSQL_Data_Stream.cpp index 653503486..a215953f9 100644 --- a/lib/PgSQL_Data_Stream.cpp +++ b/lib/PgSQL_Data_Stream.cpp @@ -1201,7 +1201,8 @@ bool PgSQL_Data_Stream::data_in_rbio() { void PgSQL_Data_Stream::reset_connection() { if (myconn) { if (pgsql_thread___multiplexing && (DSS == STATE_MARIADB_GENERIC || DSS == STATE_READY) && myconn->reusable == true && - myconn->IsActiveTransaction() == false && myconn->MultiplexDisabled() == false && myconn->async_state_machine == ASYNC_IDLE) { + myconn->IsActiveTransaction() == false && myconn->MultiplexDisabled() == false && myconn->async_state_machine == ASYNC_IDLE && + myconn->is_pipeline_active() == false) { myconn->last_time_used = sess->thread->curtime; return_MySQL_Connection_To_Pool(); } else { diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index 9af9b2534..b1dba60cd 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -998,7 +998,8 @@ int PgSQL_Session::handler_again___status_PINGING_SERVER() { myconn->compute_unknown_transaction_status(); //if (pgsql_thread___multiplexing && (myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) { // due to issue #2096 we disable the global check on pgsql_thread___multiplexing - if ((myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { + if ((myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false && + myds->myconn->is_pipeline_active() == false) { myds->return_MySQL_Connection_To_Pool(); } else { myds->destroy_MySQL_Connection_From_Pool(true); @@ -1102,6 +1103,49 @@ int PgSQL_Session::handler_again___status_RESETTING_CONNECTION() { return 0; } +int PgSQL_Session::handler_again___status_RESYNCHRONIZING_CONNECTION() { + assert(mybe->server_myds->myconn); + PgSQL_Data_Stream* myds = mybe->server_myds; + PgSQL_Connection* myconn = myds->myconn; + if (myds->mypolls == NULL) { + thread->mypolls.add(POLLIN | POLLOUT, myds->fd, myds, thread->curtime); + } + myds->DSS = STATE_MARIADB_QUERY; + int rc = myconn->async_perform_resync(myds->revents); + if (rc == 0) { + myconn->async_state_machine = ASYNC_IDLE; + myds->DSS = STATE_MARIADB_GENERIC; + RequestEnd(myds, false); // we close the session gracefully + finishQuery(myds, myds->myconn, false); + return 0; + } else { + if (rc == -1) { + const char* code = PgSQL_Error_Helper::get_error_code(PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION);; + const char* msg = "Failed to synchronize connection"; + + if (myconn->is_error_present() == true) { + code = myconn->get_error_code_str(); + msg = myconn->get_error_message().c_str(); + } + proxy_error("Detected an error during Resynchronization on (%d,%s,%d) , FD (Conn:%d , MyDS:%d) : %s , %s\n", + myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myds->fd, myds->myconn->fd, code, msg); + + PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, 9999); + + myds->destroy_MySQL_Connection_From_Pool(false); + myds->fd = 0; + RequestEnd(myds, true); + return -1; + } + + // rc==1 , nothing to do for now + if (myds->mypolls == NULL) { + thread->mypolls.add(POLLIN | POLLOUT, myds->fd, myds, thread->curtime); + } + } + return 0; +} + void PgSQL_Session::handler_again___new_thread_to_cancel_query() { PgSQL_Data_Stream* myds = mybe->server_myds; if (myds->myconn) { @@ -1215,7 +1259,8 @@ bool PgSQL_Session::handler_again___status_SETTING_INIT_CONNECT(int* _rc) { detected_broken_connection(__FILE__, __LINE__, __func__, "while setting INIT CONNECT", myconn); //if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) { if (rc != -2) { // see PMC-10003 - if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { + if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false && + myds->myconn->is_pipeline_active() == false) { retry_conn = true; } } @@ -1337,7 +1382,8 @@ bool PgSQL_Session::handler_again___status_SETTING_GENERIC_VARIABLE(int* _rc, co bool retry_conn = false; // client error, serious detected_broken_connection(__FILE__, __LINE__, __func__, "while setting ", myconn); - if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { + if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false && + myds->myconn->is_pipeline_active() == false) { retry_conn = true; } myds->destroy_MySQL_Connection_From_Pool(false); @@ -1644,7 +1690,8 @@ bool PgSQL_Session::handler_again___status_RESETTING_CONNECTION(int* _rc) { bool retry_conn = false; // client error, serious detected_broken_connection(__FILE__, __LINE__, __func__, "during Resetting Connection", myconn); - if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { + if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false && + myds->myconn->is_pipeline_active() == false) { retry_conn = true; } myds->destroy_MySQL_Connection_From_Pool(false); @@ -1670,7 +1717,8 @@ bool PgSQL_Session::handler_again___status_RESETTING_CONNECTION(int* _rc) { bool retry_conn = false; proxy_error("Timeout during Resetting Connection on %s , %d\n", myconn->parent->address, myconn->parent->port); PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, ER_PROXYSQL_CHANGE_USER_TIMEOUT); - if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { + if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false && + myds->myconn->is_pipeline_active() == false) { retry_conn = true; } myds->destroy_MySQL_Connection_From_Pool(false); @@ -2434,7 +2482,8 @@ int PgSQL_Session::handler_ProcessingQueryError_CheckBackendConnectionStatus(PgS // Retry the query if retries are allowed and conditions permit if (myds->query_retries_on_failure > 0) { myds->query_retries_on_failure--; - if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { + if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false && + myds->myconn->is_pipeline_active() == false) { if (myds->myconn->query_result && myds->myconn->query_result->is_transfer_started()) { // transfer to frontend has started, we cannot retry } else { @@ -2485,7 +2534,8 @@ bool PgSQL_Session::handler_minus1_ClientLibraryError(PgSQL_Data_Stream* myds) { detected_broken_connection(__FILE__, __LINE__, __func__, "running query", myconn, true); if (myds->query_retries_on_failure > 0) { myds->query_retries_on_failure--; - if ((myconn->reusable == true) && myconn->IsActiveTransaction() == false && myconn->MultiplexDisabled() == false) { + if ((myconn->reusable == true) && myconn->IsActiveTransaction() == false && myconn->MultiplexDisabled() == false && + myconn->is_pipeline_active() == false) { if (myconn->query_result && myconn->query_result->is_transfer_started()) { // transfer to frontend has started, we cannot retry } else { @@ -2552,7 +2602,8 @@ bool PgSQL_Session::handler_minus1_HandleErrorCodes(PgSQL_Data_Stream* myds, int myconn->parent->connect_error(9999); if (myds->query_retries_on_failure > 0) { myds->query_retries_on_failure--; - if ((myconn->reusable == true) && myconn->IsActiveTransaction() == false && myconn->MultiplexDisabled() == false) { + if ((myconn->reusable == true) && myconn->IsActiveTransaction() == false && myconn->MultiplexDisabled() == false && + myconn->is_pipeline_active() == false) { retry_conn = true; proxy_warning("Retrying query.\n"); } @@ -2615,9 +2666,14 @@ void PgSQL_Session::handler_minus1_HandleBackendConnection(PgSQL_Data_Stream* my PgSQL_Connection* myconn = myds->myconn; if (myconn) { myconn->reduce_auto_increment_delay_token(); - if (pgsql_thread___multiplexing && (myconn->reusable == true) && myconn->IsActiveTransaction() == false && myconn->MultiplexDisabled() == false) { + if (pgsql_thread___multiplexing && (myconn->reusable == true) && myconn->IsActiveTransaction() == false && + myconn->MultiplexDisabled() == false) { myds->DSS = STATE_NOT_INITIALIZED; - myds->return_MySQL_Connection_To_Pool(); + if (myconn->is_pipeline_active() == true) { + create_new_session_and_reset_connection(myds); + } else { + myds->return_MySQL_Connection_To_Pool(); + } } else { myconn->async_state_machine = ASYNC_IDLE; myds->DSS = STATE_MARIADB_GENERIC; @@ -2739,7 +2795,15 @@ __handler_again_get_pkts_from_client: handler_again: switch (status) { - // FIXME: move it to bottom + case RESYNCHRONIZING_CONNECTION: + { + int rc = handler_again___status_RESYNCHRONIZING_CONNECTION(); + if (rc == -1) { // if the sync fails, we destroy the session + handler_ret = -1; + return handler_ret; + } + } + break; case PROCESSING_EXTENDED_QUERY_SYNC: { int rc = handler___status_PROCESSING_EXTENDED_QUERY_SYNC(); @@ -2765,6 +2829,10 @@ handler_again: #ifdef DEBUG assert(dbg_extended_query_backend_conn == myds->myconn); #endif + if (myds->myconn->is_pipeline_active() == true) { + NEXT_IMMEDIATE(RESYNCHRONIZING_CONNECTION); + } + // Return to pool if connection is reusable finishQuery(myds, myds->myconn, false); } @@ -2857,7 +2925,7 @@ handler_again: (killed == true) // session was killed by admin ) { // we only log in case on timing out here. Logging for 'killed' is done in the places that hold that contextual information. - if (mybe->server_myds->myconn && (mybe->server_myds->myconn->async_state_machine != ASYNC_IDLE) && mybe->server_myds->wait_until && (thread->curtime >= mybe->server_myds->wait_until)) { + if (killed == false) { std::string query{}; if (CurrentQuery.extended_query_info.stmt_info == NULL) { // text protocol @@ -2978,6 +3046,7 @@ handler_again: PROXY_TRACE(); assert(0); } + CurrentQuery.extended_query_info.flags |= PGSQL_EXTENDED_QUERY_FLAG_IMPLICIT_PREPARE; previous_status.push(status); NEXT_IMMEDIATE(PROCESSING_STMT_PREPARE); } @@ -5285,7 +5354,11 @@ void PgSQL_Session::finishQuery(PgSQL_Data_Stream* myds, PgSQL_Connection* mycon myconn->multiplex_delayed = false; myds->wait_until = 0; myds->DSS = STATE_NOT_INITIALIZED; - myds->return_MySQL_Connection_To_Pool(); + if (myconn->is_pipeline_active() == true) { + create_new_session_and_reset_connection(myds); + } else { + myds->return_MySQL_Connection_To_Pool(); + } } if (transaction_persistent == true) { transaction_persistent_hostgroup = -1; @@ -5628,7 +5701,7 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg this, client_myds, previous_hostgroup); } - if (pgsql_thread___set_query_lock_on_hostgroup == 1) { + if (pgsql_thread___set_query_lock_on_hostgroup == 1) { if (locked_on_hostgroup < 0) { if (lock_hostgroup) { // we are locking on hostgroup now @@ -5701,10 +5774,13 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg } GloPgStmt->unlock(); + if (extended_query_frame.empty() == true) { + extended_query_info.flags |= PGSQL_EXTENDED_QUERY_FLAG_SYNC; + } + // Fallback: forward to backend mybe = find_or_create_backend(current_hostgroup); status = PROCESSING_STMT_PREPARE; - mybe->server_myds->connect_retries_on_failure = pgsql_thread___connect_retries_on_failure; mybe->server_myds->wait_until = 0; mybe->server_myds->killed_at = 0; @@ -5851,6 +5927,10 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des } } + if (extended_query_frame.empty() == true) { + extended_query_info.flags |= PGSQL_EXTENDED_QUERY_FLAG_SYNC; + } + mybe = find_or_create_backend(current_hostgroup); status = PROCESSING_STMT_DESCRIBE; mybe->server_myds->connect_retries_on_failure = pgsql_thread___connect_retries_on_failure; @@ -6122,6 +6202,11 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu } } } + + if (extended_query_frame.empty() == true) { + extended_query_info.flags |= PGSQL_EXTENDED_QUERY_FLAG_SYNC; + } + mybe = find_or_create_backend(current_hostgroup); status = PROCESSING_STMT_EXECUTE; mybe->server_myds->connect_retries_on_failure = pgsql_thread___connect_retries_on_failure; @@ -6361,6 +6446,7 @@ bool PgSQL_Session::handler___rc0_PROCESSING_STMT_PREPARE(enum session_status& s st = status; if (previous_status.empty() == false) { + CurrentQuery.extended_query_info.flags &= ~PGSQL_EXTENDED_QUERY_FLAG_IMPLICIT_PREPARE; myds->myconn->async_state_machine = ASYNC_IDLE; myds->DSS = STATE_MARIADB_GENERIC; st = previous_status.top();