Inject implicit Sync if simple query arrives before extended query cycle is completed

When a simple query arrives while extended query messages are pending,
we now:
- inject an implicit Sync,
- process all extended query messages,
- then execute the simple query,
- and send ReadyForQuery only after the simple query completes.
pull/5078/head
Rahim Kanji 8 months ago
parent be2ae45950
commit 9a5fa148c8

@ -13,7 +13,6 @@
#include "PgSQL_Variables.h"
#include "PgSQL_Variables_Validator.h"
class PgSQL_Query_Result;
class PgSQL_ExplicitTxnStateMgr;
class PgSQL_Parse_Message;
@ -193,10 +192,18 @@ private:
class PgSQL_Session : public Base_Session<PgSQL_Session, PgSQL_Data_Stream, PgSQL_Backend, PgSQL_Thread> {
private:
typedef enum ExtendedQueryPhase {
EXTQ_PHASE_IDLE = 0, // No extended query activity
EXTQ_PHASE_BUILDING, // Collecting extended query messages (Parse/Bind/etc.)
EXTQ_PHASE_EXECUTING_SYNC_CLIENT, // Executing after client-initiated Sync
EXTQ_PHASE_EXECUTING_SYNC_IMPLICIT // Executing after implicit Sync (injected)
} ExtendedQueryPhase;
using PktType = std::variant<std::unique_ptr<PgSQL_Parse_Message>,std::unique_ptr<PgSQL_Describe_Message>,
std::unique_ptr<PgSQL_Close_Message>, std::unique_ptr<PgSQL_Bind_Message>, std::unique_ptr<PgSQL_Execute_Message>>;
bool extended_query_exec_qp = false;
ExtendedQueryPhase extended_query_phase{ EXTQ_PHASE_IDLE };
std::queue<PktType> extended_query_frame;
std::unique_ptr<const PgSQL_Bind_Message> bind_waiting_for_execute;
@ -407,6 +414,11 @@ public:
return extended_query_frame.empty();
}
inline bool is_extended_query_ready_for_query() const {
return extended_query_frame.empty() &&
extended_query_phase != EXTQ_PHASE_EXECUTING_SYNC_IMPLICIT;
}
bool handler_again___status_SETTING_GENERIC_VARIABLE(int* _rc, const char* var_name, const char* var_value, bool no_quote = false, bool set_transaction = false);
#if 0
bool handler_again___status_SETTING_SQL_LOG_BIN(int*);

@ -658,7 +658,7 @@ handler_again:
// Fix: if the result indicates an error, explicitly send ReadyForQuery immediately.
// The extended query flag will still be reset later in the session.
if (fetch_result_end_st == ASYNC_STMT_EXECUTE_END &&
!myds->sess->is_extended_query_frame_empty() &&
!myds->sess->is_extended_query_ready_for_query() &&
((query_result->get_result_packet_type() & PGSQL_QUERY_RESULT_ERROR) == 0)) {
// Skip sending ReadyForQuery if there are still extended query messages pending in the queue
NEXT_IMMEDIATE(fetch_result_end_st);

@ -1926,7 +1926,6 @@ int PgSQL_Session::get_pkts_from_client(bool& wrong_pass, PtrSize_t& pkt) {
int handler_ret = 0;
unsigned char c;
//__get_pkts_from_client:
// implement a more complex logic to run even in case of mirror
// if client_myds , this is a regular client
@ -1946,6 +1945,35 @@ int PgSQL_Session::get_pkts_from_client(bool& wrong_pass, PtrSize_t& pkt) {
// active at a time, with no client-side changes required.
switch (status) {
case WAITING_CLIENT_DATA:
if (extended_query_frame.empty() == false) {
// peeking message type of the next packet
if (const PtrSize_t* nxt_pkt = client_myds->PSarrayIN->index(0); nxt_pkt->size > 0) {
char msg_type = *static_cast<char*>(nxt_pkt->ptr);
// As noted in https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY,
// "The simple Query message is approximately equivalent to the series Parse, Bind, portal Describe,
// Execute, Close, Sync". This means a simple query implicitly includes a Sync at the end.
//
// To handle this, if there are pending extended query messages in the frame, we first send an
// implicit Sync, process all those extended query messages, and then execute the simple query.
//
// Note: in this case, ReadyForQuery will not be sent immediately after the extended query frame
// is processed, but only after the simple query completes.
if (msg_type == 'Q') {
proxy_warning("Simple query received from client '%s:%d' before extended query cycle completed. Issuing implicit Sync.\n",
client_myds->addr.addr ? client_myds->addr.addr : "", client_myds->addr.port);
PG_pkt pgpkt(5);
pgpkt.put_char('S');
pgpkt.put_uint32(4);
auto [ptr, size] = pgpkt.detach();
pkt.ptr = ptr;
pkt.size = size;
extended_query_phase = EXTQ_PHASE_EXECUTING_SYNC_IMPLICIT;
goto __implicit_sync;
}
}
}
break; // allowed states
case CONNECTING_CLIENT:
break; // allowed states
default:
@ -1960,6 +1988,9 @@ int PgSQL_Session::get_pkts_from_client(bool& wrong_pass, PtrSize_t& pkt) {
if (mirror == false) {
client_myds->PSarrayIN->remove_index(0, &pkt);
}
__implicit_sync:
switch (status) {
case CONNECTING_CLIENT:
@ -2245,30 +2276,35 @@ int PgSQL_Session::get_pkts_from_client(bool& wrong_pass, PtrSize_t& pkt) {
break;
// Extended Query Handling
case 'P':
extended_query_phase = EXTQ_PHASE_BUILDING;
if (handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_PARSE(pkt) == false) {
handler_ret = -1;
return handler_ret;
}
break;
case 'D':
extended_query_phase = EXTQ_PHASE_BUILDING;
if (handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_DESCRIBE(pkt) == false) {
handler_ret = -1;
return handler_ret;
}
break;
case 'C':
extended_query_phase = EXTQ_PHASE_BUILDING;
if (handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_CLOSE(pkt) == false) {
handler_ret = -1;
return handler_ret;
}
break;
case 'B':
extended_query_phase = EXTQ_PHASE_BUILDING;
if (handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_BIND(pkt) == false) {
handler_ret = -1;
return handler_ret;
}
break;
case 'E':
extended_query_phase = EXTQ_PHASE_BUILDING;
if (handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_EXECUTE(pkt) == false) {
handler_ret = -1;
return handler_ret;
@ -2276,6 +2312,9 @@ int PgSQL_Session::get_pkts_from_client(bool& wrong_pass, PtrSize_t& pkt) {
break;
case 'S':
{
if (extended_query_phase != EXTQ_PHASE_EXECUTING_SYNC_IMPLICIT) {
extended_query_phase = EXTQ_PHASE_EXECUTING_SYNC_CLIENT;
}
#ifdef DEBUG
dbg_extended_query_backend_conn = nullptr;
#endif
@ -2945,7 +2984,8 @@ handler_again:
proxy_error("'%s' command is not supported in Extended Query protocol mode. Use Simple Query mode to run this command\n",
query_to_match ? query_to_match : "");
client_myds->setDSS_STATE_QUERY_SENT_NET();
client_myds->myprot.generate_error_packet(true, true, "Feature not supported", PGSQL_ERROR_CODES::ERRCODE_FEATURE_NOT_SUPPORTED,
bool send_ready_packet = is_extended_query_ready_for_query();
client_myds->myprot.generate_error_packet(true, send_ready_packet, "Feature not supported", PGSQL_ERROR_CODES::ERRCODE_FEATURE_NOT_SUPPORTED,
false, true);
RequestEnd(myds, true);
finishQuery(myds, myconn, false);
@ -5604,7 +5644,7 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg
GloPgStmt->unlock();
client_myds->setDSS_STATE_QUERY_SENT_NET();
char txn_state = NumActiveTransactions() > 0 ? 'T' : 'I';
bool send_ready_packet = extended_query_frame.empty();
bool send_ready_packet = is_extended_query_ready_for_query();
client_myds->myprot.generate_parse_completion_packet(true, send_ready_packet, txn_state);
//LogQuery(nullptr);
//CurrentQuery.end_time = thread->curtime;
@ -5735,7 +5775,7 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des
if (stmt_info->stmt_metadata) {
// we have the metadata, so we can send it to the client
client_myds->setDSS_STATE_QUERY_SENT_NET();
bool send_ready_packet = extended_query_frame.empty();
bool send_ready_packet = is_extended_query_ready_for_query();
unsigned int nTxn = NumActiveTransactions();
const char txn_state = (nTxn ? 'T' : 'I');
client_myds->myprot.generate_describe_completion_packet(true, send_ready_packet, stmt_info->stmt_metadata,
@ -5823,8 +5863,8 @@ int PgSQL_Session::handle_post_sync_close_message(PgSQL_Close_Message* close_msg
client_myds->setDSS_STATE_QUERY_SENT_NET();
unsigned int nTxn = NumActiveTransactions();
char txn_state = (nTxn ? 'T' : 'I');
bool send_ready = extended_query_frame.empty();
client_myds->myprot.generate_close_completion_packet(true, send_ready, txn_state);
bool send_ready_packet = is_extended_query_ready_for_query();
client_myds->myprot.generate_close_completion_packet(true, send_ready_packet, txn_state);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
return 0;
@ -5927,8 +5967,8 @@ int PgSQL_Session::handle_post_sync_bind_message(PgSQL_Bind_Message* bind_msg) {
client_myds->setDSS_STATE_QUERY_SENT_NET();
unsigned int nTxn = NumActiveTransactions();
char txn_state = (nTxn ? 'T' : 'I');
bool send_ready = extended_query_frame.empty();
client_myds->myprot.generate_bind_completion_packet(true, send_ready, txn_state);
bool send_ready_packet = is_extended_query_ready_for_query();
client_myds->myprot.generate_bind_completion_packet(true, send_ready_packet, txn_state);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
return 0;
@ -6076,6 +6116,7 @@ void PgSQL_Session::reset_extended_query_frame() {
extended_query_frame.pop();
}
bind_waiting_for_execute.reset(nullptr);
extended_query_phase = EXTQ_PHASE_IDLE;
}
int PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_SYNC() {
@ -6092,7 +6133,7 @@ int PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_S
proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Session=%p client_myds=%p. Processing '%lu' pending messages in extended query frame\n",
this, client_myds, extended_query_frame.size());
if (extended_query_frame.empty()) {
if (extended_query_frame.empty() == true) {
client_myds->setDSS_STATE_QUERY_SENT_NET();
unsigned int nTxn = NumActiveTransactions();
const char txn_state = (nTxn ? 'T' : 'I');
@ -6305,7 +6346,7 @@ bool PgSQL_Session::handler___rc0_PROCESSING_STMT_PREPARE(enum session_status& s
assert(extended_query_info.stmt_client_name);
client_myds->myconn->local_stmts->client_insert(global_stmtid, extended_query_info.stmt_client_name);
bool send_ready_packet = extended_query_frame.empty();
bool send_ready_packet = is_extended_query_ready_for_query();
char txn_state = myds->myconn->get_transaction_status_char();
client_myds->myprot.generate_parse_completion_packet(true, send_ready_packet, txn_state);
LogQuery(myds);
@ -6317,7 +6358,7 @@ void PgSQL_Session::handler___rc0_PROCESSING_STMT_DESCRIBE_PREPARE(PgSQL_Data_St
//thread->status_variables.stvar[st_var_backend_stmt_describe]++;
const PgSQL_Extended_Query_Info& extended_query_info = CurrentQuery.extended_query_info;
assert(extended_query_info.stmt_info);
bool send_ready_packet = extended_query_frame.empty();
bool send_ready_packet = is_extended_query_ready_for_query();
char txn_state = myds->myconn->get_transaction_status_char();
if (extended_query_info.stmt_type == 'S') {

Loading…
Cancel
Save