From b906c08992ca81ad09bcae64d11bd9f575aea3da Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Tue, 26 Nov 2024 11:57:31 +0500 Subject: [PATCH] Added dynamic fast forward support --- include/PgSQL_Session.h | 23 +++++++++ lib/PgSQL_Session.cpp | 102 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+) diff --git a/include/PgSQL_Session.h b/include/PgSQL_Session.h index 11089dfd7..c7578ed13 100644 --- a/include/PgSQL_Session.h +++ b/include/PgSQL_Session.h @@ -256,6 +256,29 @@ private: void handler_WCD_SS_MCQ_qpo_error_msg(PtrSize_t* pkt); void handler_WCD_SS_MCQ_qpo_LargePacket(PtrSize_t* pkt); + /** + * @brief Switches session from normal mode to fast forward mode. + * + * This method transitions the session to fast forward mode based on session type. + * (Currently only supports SESSION_FORWARD_TYPE_TEMPORARY and extended types) + * + * @param pkt Used solely to push the packet back to client_myds PSarrayIN, + * allowing it to be forwarded to the backend via the fast forward session + * @param command Command that causes the session to switch to fast forward mode. + * @param session_type SESSION_FORWARD_TYPE indicating the type of session. + * + * @return void. + */ + void switch_normal_to_fast_forward_mode(PtrSize_t& pkt, std::string_view command, SESSION_FORWARD_TYPE session_type); + + /** + * @brief Switches session from fast forward mode to normal mode. + * + * This method is used to revert session from fast forward mode back to normal mode. + * + */ + void switch_fast_forward_to_normal_mode(); + public: bool handler_again___status_SETTING_GENERIC_VARIABLE(int* _rc, const char* var_name, const char* var_value, bool no_quote = false, bool set_transaction = false); #if 0 diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index e9c5ee484..bb6373ee9 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -6934,3 +6934,105 @@ void PgSQL_Session::set_previous_status_mode3(bool allow_execute) { // LCOV_EXCL_STOP } } + +void PgSQL_Session::switch_normal_to_fast_forward_mode(PtrSize_t& pkt, std::string_view command, SESSION_FORWARD_TYPE session_type) { + + if (session_fast_forward || session_type == SESSION_FORWARD_TYPE_PERMANENT) return; + + // we use a switch to write the command in the info message + std::string client_info; + // we add the client details in the info message + if (client_myds && client_myds->addr.addr) { + client_info += " from client " + std::string(client_myds->addr.addr) + ":" + std::to_string(client_myds->addr.port); + } + proxy_info("Received command '%s'%s. Switching to Fast Forward mode (Session Type:0x%02X)\n", + command.data(), client_info.c_str(), session_type); + session_fast_forward = session_type; + + if (client_myds->PSarrayIN->len) { + proxy_error("UNEXPECTED PACKET FROM CLIENT -- PLEASE REPORT A BUG\n"); + assert(0); + } + client_myds->PSarrayIN->add(pkt.ptr, pkt.size); + + // current_hostgroup should already be set to the correct hostgroup + mybe = find_or_create_backend(current_hostgroup); // set a backend + mybe->server_myds->reinit_queues(); // reinitialize the queues in the myds . By default, they are not active + // We reinitialize the 'wait_until' since this session shouldn't wait for processing as + // we are now transitioning to 'FAST_FORWARD'. + mybe->server_myds->wait_until = 0; + if (mybe->server_myds->DSS == STATE_NOT_INITIALIZED) { + // NOTE: This section is entirely borrowed from 'STATE_SLEEP' for 'session_fast_forward'. + // Check comments there for extra information. + // ============================================================================= + if (mybe->server_myds->max_connect_time == 0) { + uint64_t connect_timeout = + pgsql_thread___connect_timeout_server < pgsql_thread___connect_timeout_server_max ? + pgsql_thread___connect_timeout_server_max : pgsql_thread___connect_timeout_server; + mybe->server_myds->max_connect_time = thread->curtime + connect_timeout * 1000; + } + mybe->server_myds->connect_retries_on_failure = pgsql_thread___connect_retries_on_failure; + CurrentQuery.start_time = thread->curtime; + // ============================================================================= + + // we don't have a connection + previous_status.push(FAST_FORWARD); // next status will be FAST_FORWARD + set_status(CONNECTING_SERVER); // now we need a connection + } else { + // In case of having a connection, we need to make user to reset the state machine + // for current server 'PgSQL_Data_Stream' + mybe->server_myds->DSS = STATE_READY; + // myds needs to have encrypted value set correctly + + PgSQL_Data_Stream* myds = mybe->server_myds; + PgSQL_Connection* myconn = myds->myconn; + assert(myconn != NULL); + + // if backend connection uses SSL we will set + // encrypted = true and we will start using the SSL structure + // directly from PGconn SSL structure. + if (myconn->is_connected() && myconn->get_pg_ssl_in_use()) { + SSL* ssl_obj = myconn->get_pg_ssl_object(); + if (ssl_obj != NULL) { + myds->encrypted = true; + myds->ssl = ssl_obj; + myds->rbio_ssl = BIO_new(BIO_s_mem()); + myds->wbio_ssl = BIO_new(BIO_s_mem()); + SSL_set_bio(myds->ssl, myds->rbio_ssl, myds->wbio_ssl); + } else { + // it means that ProxySQL tried to use SSL to connect to the backend + // but the backend didn't support SSL + } + } + set_status(FAST_FORWARD); // we can set status to FAST_FORWARD + } +} + +void PgSQL_Session::switch_fast_forward_to_normal_mode() { + if (session_fast_forward == SESSION_FORWARD_TYPE_NONE) return; + + // only handle temporary session ff + if (session_fast_forward & SESSION_FORWARD_TYPE_TEMPORARY) { + // we use a switch to write the command in the info message + std::string client_info; + // we add the client details in the info message + if (client_myds && client_myds->addr.addr) { + client_info += " for client " + std::string(client_myds->addr.addr) + ":" + std::to_string(client_myds->addr.port); + } + + proxy_info("Switching back to Normal mode (Session Type:0x%02X)%s\n", + session_fast_forward, client_info.c_str()); + session_fast_forward = SESSION_FORWARD_TYPE_NONE; + PgSQL_Data_Stream* myds = mybe->server_myds; + PgSQL_Connection* myconn = myds->myconn; + if (myds->encrypted == true) { + myds->encrypted = false; + myds->ssl = NULL; + } + RequestEnd(myds); + finishQuery(myds, myconn, false); + } else { + // cannot switch Permanent Fast Forward to Normal + assert(0); + } +}