diff --git a/include/PgSQL_Session.h b/include/PgSQL_Session.h index 19c064293..0fa77db3e 100644 --- a/include/PgSQL_Session.h +++ b/include/PgSQL_Session.h @@ -187,7 +187,7 @@ public: PgSQL_Session* sess; unsigned char* QueryPointer; - char* stmt_client_name; + const char* stmt_client_name; PgSQL_STMT_Global_info* stmt_info; PgSQL_Bind_Message* bind_msg; SQP_par_t QueryParserArgs; @@ -221,7 +221,7 @@ private: using PktType = std::variant,std::unique_ptr, std::unique_ptr, std::unique_ptr, std::unique_ptr>; - std::queue pending_packets; + std::queue extended_query_frame; std::unique_ptr bind_to_execute; //int handler_ret; @@ -276,9 +276,10 @@ private: bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_CLOSE(PtrSize_t& pkt); bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_BIND(PtrSize_t& pkt); bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_EXECUTE(PtrSize_t& pkt); - int handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_SYNC(PtrSize_t& pkt); + int handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_SYNC(); bool handler___rc0_PROCESSING_STMT_PREPARE(enum session_status& st, PgSQL_Data_Stream* myds, bool& prepared_stmt_with_no_params); void handler___rc0_PROCESSING_STMT_DESCRIBE_PREPARE(PgSQL_Data_Stream* myds, bool& prepared_stmt_with_no_params); + int handler___status_PROCESSING_EXTENDED_QUERY_SYNC(); int handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg); int handle_post_sync_describe_message(PgSQL_Describe_Message* describe_msg); int handle_post_sync_close_message(PgSQL_Close_Message* close_msg); diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index b4fd6fa12..d15dc8ff7 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -309,10 +309,7 @@ enum session_status { SHOW_WARNINGS, SETTING_NEXT_ISOLATION_LEVEL, SETTING_NEXT_TRANSACTION_READ, - //PGSQL_PROCESSING_PARSE, - //PGSQL_PROCESSING_BIND, - //PGSQL_PROCESSING_DESCRIBE, - //PGSQL_PROCESSING_EXECUTE, + PROCESSING_EXTENDED_QUERY_SYNC, session_status___NONE // special marker }; diff --git a/lib/PgSQL_Logger.cpp b/lib/PgSQL_Logger.cpp index 2aecca79f..8d3d577e4 100644 --- a/lib/PgSQL_Logger.cpp +++ b/lib/PgSQL_Logger.cpp @@ -722,7 +722,7 @@ void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) { case PROCESSING_STMT_EXECUTE: c = (char *)sess->CurrentQuery.stmt_info->query; ql = sess->CurrentQuery.stmt_info->query_length; - me.set_client_stmt_name(sess->CurrentQuery.stmt_client_name); + me.set_client_stmt_name((char*)sess->CurrentQuery.stmt_client_name); break; case PROCESSING_STMT_PREPARE: default: @@ -733,7 +733,7 @@ void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) { // global cache and due to that we immediately reply to the client and session doesn't reach // 'PROCESSING_STMT_PREPARE' state. 'stmt_client_id' is expected to be '0' for anything that isn't // a prepared statement, still, logging should rely 'log_event_type' instead of this value. - me.set_client_stmt_name(sess->CurrentQuery.stmt_client_name); + me.set_client_stmt_name((char*)sess->CurrentQuery.stmt_client_name); break; } if (c) { diff --git a/lib/PgSQL_Query_Processor.cpp b/lib/PgSQL_Query_Processor.cpp index 246bca18f..b77d93a78 100644 --- a/lib/PgSQL_Query_Processor.cpp +++ b/lib/PgSQL_Query_Processor.cpp @@ -8,6 +8,7 @@ using json = nlohmann::json; #include "proxysql.h" #include "cpp.h" #include "Command_Counter.h" +#include "PgSQL_PreparedStatement.h" #include "PgSQL_Query_Processor.h" extern PgSQL_Threads_Handler* GloPTH; @@ -271,14 +272,14 @@ PgSQL_Query_Processor_Output* PgSQL_Query_Processor::process_query(PgSQL_Session SQP_par_t stmt_exec_qp; SQP_par_t* qp = NULL; if (qi) { - // NOTE: if ptr == NULL , we are calling process_mysql_query() on an STMT_EXECUTE + // NOTE: if ptr == NULL , we are calling process_mysql_query() on an STMT_EXECUTE or STMT_DESCRIBE if (ptr) { qp = (SQP_par_t*)&qi->QueryParserArgs; } else { qp = &stmt_exec_qp; - //qp->digest = qi->stmt_info->digest; - //qp->digest_text = qi->stmt_info->digest_text; - //qp->first_comment = qi->stmt_info->first_comment; + qp->digest = qi->stmt_info->digest; + qp->digest_text = qi->stmt_info->digest_text; + qp->first_comment = qi->stmt_info->first_comment; } } #define stackbuffer_size 128 diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index 2bd48656d..802be948e 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -310,10 +310,7 @@ PgSQL_Query_Info::PgSQL_Query_Info() { PgSQL_Query_Info::~PgSQL_Query_Info() { GloPgQPro->query_parser_free(&QueryParserArgs); stmt_info=NULL; - if (stmt_client_name) { - free(stmt_client_name); - stmt_client_name = NULL; - } + stmt_client_name = NULL; if (bind_msg) { delete bind_msg; bind_msg = NULL; @@ -350,10 +347,7 @@ void PgSQL_Query_Info::end() { stmt_info = NULL; stmt_backend_id = 0; stmt_global_id = 0; - if (stmt_client_name) { - free(stmt_client_name); - stmt_client_name = NULL; - } + stmt_client_name = NULL; if (bind_msg) { delete bind_msg; bind_msg = NULL; @@ -373,10 +367,7 @@ void PgSQL_Query_Info::init(unsigned char *_p, int len, bool header) { stmt_backend_id = 0; stmt_global_id = 0; stmt_info = NULL; - if (stmt_client_name) { - free(stmt_client_name); - stmt_client_name = NULL; - } + stmt_client_name = NULL; if (bind_msg) { delete bind_msg; bind_msg = NULL; @@ -2247,8 +2238,7 @@ __get_pkts_from_client: else { if (locked_on_hostgroup == -1) { current_hostgroup = default_hostgroup; - } - else { + } else { current_hostgroup = locked_on_hostgroup; } } @@ -2310,7 +2300,7 @@ __get_pkts_from_client: l_free(pkt.size, pkt.ptr); handler_ret = -1; return handler_ret; - } else if (c == 'P' || c == 'B' || c == 'D' || c == 'E') { + } else if (c == 'P' || c == 'B' || c == 'C' || c == 'D' || c == 'E') { l_free(pkt.size, pkt.ptr); continue; } else { @@ -2505,8 +2495,13 @@ __get_pkts_from_client: break; case 'S': { - __run_sync_again: - int rc = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_SYNC(pkt); + // we do not need sync packet anymore + l_free(pkt.size, pkt.ptr); + pkt.ptr = NULL; + pkt.size = 0; + + __run_sync_again: + int rc = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_SYNC(); if (rc == -1) { handler_ret = -1; @@ -2514,14 +2509,9 @@ __get_pkts_from_client: } if (rc == 0) { - if (pending_packets.empty() == false) { + if (extended_query_frame.empty() == false) { writeout(); goto __run_sync_again; - } else { - // we do not need this packet anymore - l_free(pkt.size, pkt.ptr); - pkt.ptr = NULL; - pkt.size = 0; } } } @@ -2761,9 +2751,11 @@ void PgSQL_Session::handler_minus1_GenerateErrorMessage(PgSQL_Data_Stream* myds, } switch (status) { + case PROCESSING_STMT_EXECUTE: case PROCESSING_QUERY: PgSQL_Result_to_PgSQL_wire(myconn, myds); break; + case PROCESSING_STMT_DESCRIBE: case PROCESSING_STMT_PREPARE: client_myds->myprot.generate_error_packet(true, true, myconn->get_error_message().c_str(), myconn->get_error_code(), false); if (previous_status.size()) { @@ -2774,8 +2766,6 @@ void PgSQL_Session::handler_minus1_GenerateErrorMessage(PgSQL_Data_Stream* myds, wrong_pass = true; } break; - case PROCESSING_STMT_DESCRIBE: - case PROCESSING_STMT_EXECUTE: default: // LCOV_EXCL_START assert(0); @@ -2902,6 +2892,23 @@ int PgSQL_Session::handler() { handler_again: switch (status) { + // FIXME: move it to bottom + case PROCESSING_EXTENDED_QUERY_SYNC: + { + int rc = handler___status_PROCESSING_EXTENDED_QUERY_SYNC(); + if (rc == -1) { + handler_ret = -1; + return handler_ret; + } + + if (rc == 0 && extended_query_frame.empty() == false) { + writeout(); + NEXT_IMMEDIATE(PROCESSING_EXTENDED_QUERY_SYNC); + } else { + goto handler_again; + } + } + break; case WAITING_CLIENT_DATA: // housekeeping handler___status_WAITING_CLIENT_DATA(); @@ -3148,6 +3155,7 @@ handler_again: } switch (status) { + case PROCESSING_STMT_EXECUTE: case PROCESSING_QUERY: PgSQL_Result_to_PgSQL_wire(myconn, myconn->myds); break; @@ -3162,9 +3170,10 @@ handler_again: case PROCESSING_STMT_DESCRIBE: handler___rc0_PROCESSING_STMT_DESCRIBE_PREPARE(myds, prepared_stmt_with_no_params); break; - case PROCESSING_STMT_EXECUTE: - PgSQL_Result_to_PgSQL_wire(myconn, myconn->myds); - break; + // Handled in PROCESSING_QUERY + //case PROCESSING_STMT_EXECUTE: + // PgSQL_Result_to_PgSQL_wire(myconn, myconn->myds); + // break; default: // LCOV_EXCL_START assert(0); @@ -3246,19 +3255,21 @@ handler_again: } } } - - // FIXME: Temporary workaround. Update the logic below when pipeline mode is implemented - if (rc != 1 && pkt.size && pkt.ptr && ((char*)pkt.ptr)[0] == 'S') { // it's a sync packet - // sent sync packet again to client queue, to execute sync in next iteration to handle remaining pending packets - if (pending_packets.empty() == false) { - writeout(); - handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_SYNC(pkt); - goto handler_again; + // if query is not in pending state + if (rc != 1) { + if (rc == 0) { + // check if there are messages remaining in extended_query_frame, if yes, process them + if (extended_query_frame.empty() == false) { + writeout(); + NEXT_IMMEDIATE(PROCESSING_EXTENDED_QUERY_SYNC); + } } else { - // we do not need this packet anymore - l_free(pkt.size, pkt.ptr); - pkt.ptr = NULL; - pkt.size = 0; + // incase of error, we discard all pending messages + bind_to_execute.reset(nullptr); + while (extended_query_frame.empty() == false) { + extended_query_frame.pop(); + } + // don't change any status } } @@ -3266,10 +3277,6 @@ handler_again: } break; - case PROCESSING_EXTENDED_QUERY_SYNC: - assert(0); //no handled yet - break; - case SETTING_ISOLATION_LEVEL: case SETTING_TRANSACTION_READ: case SETTING_VARIABLE: @@ -5346,7 +5353,6 @@ void PgSQL_Session::RequestEnd(PgSQL_Data_Stream* myds, const unsigned int myerr previous_hostgroup = current_hostgroup; } - // this function tries to report all the memory statistics related to the sessions void PgSQL_Session::Memory_Stats() { if (thread == NULL) @@ -5839,7 +5845,7 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg CurrentQuery.begin((unsigned char*)parse_msg->query_string, strlen(parse_msg->query_string) + 1, false); //FIXME: replace strdup with s_strdup - CurrentQuery.stmt_client_name = strdup(parse_msg->stmt_name ? parse_msg->stmt_name : "");; + CurrentQuery.stmt_client_name = parse_msg->stmt_name; timespec begint; timespec endt; @@ -5854,8 +5860,12 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg (begint.tv_sec * 1000000000 + begint.tv_nsec); } assert(qpo); // GloPgQPro->process_mysql_query() should always return a qpo + + // make sure parse packet is not used anymore + auto parse_pkt = parse_msg->detach(); // detach the packet from the parse message + // setting 'prepared' to prevent fetching results from the cache if the digest matches - rc_break = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt, &lock_hostgroup, PGSQL_EXTENDED_QUERY_TYPE_PARSE); + rc_break = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&parse_pkt, &lock_hostgroup, PGSQL_EXTENDED_QUERY_TYPE_PARSE); if (rc_break == true) return 0; @@ -5884,69 +5894,82 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg thread->status_variables.stvar[st_var_hostgroup_locked_queries]++; RequestEnd(NULL); free(buf); + l_free(parse_pkt.size, parse_pkt.ptr); return 2; } } } - //mybe = find_or_create_backend(current_hostgroup); - PgSQL_STMTs_local_v14* local_stmts = client_myds->myconn->local_stmts; - std::string stmt_name = (char*)CurrentQuery.stmt_client_name; // create a string + //mybe = find_or_create_backend(current_hostgroup); // if the same statement name is used, we drop it //FIXME: Revisit this logic - if (auto search = local_stmts->stmt_name_to_global_ids.find(stmt_name); - search != local_stmts->stmt_name_to_global_ids.end()) { - uint64_t client_global_id = search->second; - auto range = local_stmts->global_id_to_stmt_names.equal_range(client_global_id); - assert(range.first != range.second); - for (auto it = range.first; it != range.second; ++it) { - if (it->second == stmt_name) { - local_stmts->global_id_to_stmt_names.erase(it); + PgSQL_STMTs_local_v14* local_stmts = client_myds->myconn->local_stmts; + std::string stmt_name(CurrentQuery.stmt_client_name); + + if (auto it = local_stmts->stmt_name_to_global_ids.find(stmt_name); + it != local_stmts->stmt_name_to_global_ids.end()) { + + uint64_t global_id = it->second; + auto range = local_stmts->global_id_to_stmt_names.equal_range(global_id); + + for (auto iter = range.first; iter != range.second; ++iter) { + if (iter->second == stmt_name) { + local_stmts->global_id_to_stmt_names.erase(iter); break; } } - local_stmts->stmt_name_to_global_ids.erase(search); - client_myds->myconn->local_stmts->client_close(stmt_name); + + local_stmts->stmt_name_to_global_ids.erase(it); + local_stmts->client_close(stmt_name); } - uint64_t hash = client_myds->myconn->local_stmts->compute_hash( - (char*)client_myds->myconn->userinfo->username, - (char*)client_myds->myconn->userinfo->dbname, + + // Hash the query + uint64_t hash = local_stmts->compute_hash( + client_myds->myconn->userinfo->username, + client_myds->myconn->userinfo->dbname, (char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength ); - PgSQL_STMT_Global_info* stmt_info = NULL; - // we first lock GloStmt + + // Check global statement cache GloPgStmt->wrlock(); - stmt_info = GloPgStmt->find_prepared_statement_by_hash(hash, false); + PgSQL_STMT_Global_info* stmt_info = GloPgStmt->find_prepared_statement_by_hash(hash, false); if (stmt_info) { local_stmts->client_insert(stmt_info->statement_id, stmt_name); CurrentQuery.stmt_global_id = stmt_info->statement_id; + client_myds->setDSS_STATE_QUERY_SENT_NET(); - bool send_ready_packet = pending_packets.empty(); - unsigned int nTxn = NumActiveTransactions(); - const char txn_state = (nTxn ? 'T' : 'I'); + + char txn_state = NumActiveTransactions() > 0 ? 'T' : 'I'; + bool send_ready_packet = extended_query_frame.empty(); + client_myds->myprot.generate_parse_completion_packet(true, send_ready_packet, txn_state); - LogQuery(NULL); + LogQuery(nullptr); + client_myds->DSS = STATE_SLEEP; status = WAITING_CLIENT_DATA; CurrentQuery.end_time = thread->curtime; - CurrentQuery.end(); + //CurrentQuery.end(); + RequestEnd(NULL); GloPgStmt->unlock(); + l_free(parse_pkt.size, parse_pkt.ptr); return 0; - } + } GloPgStmt->unlock(); + // Fallback: forward to backend mybe = find_or_create_backend(current_hostgroup); status = PROCESSING_STMT_PREPARE; + mybe->server_myds->connect_retries_on_failure = pgsql_thread___connect_retries_on_failure; mybe->server_myds->wait_until = 0; - pause_until = 0; mybe->server_myds->killed_at = 0; mybe->server_myds->kill_type = 0; - auto parse_pkt = parse_msg->detach(); // detach the packet from the parse message - mybe->server_myds->pgsql_real_query.init(&parse_pkt); // mem leak fix + + mybe->server_myds->pgsql_real_query.init(&parse_pkt); // Transfer packet ownership mybe->server_myds->statuses.questions++; + client_myds->setDSS_STATE_QUERY_SENT_NET(); return 1; } @@ -5960,7 +5983,7 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des //CurrentQuery.begin(nullptr, 0, false); //FIXME: replace strdup with s_strdup - const char* stmt_client_name = strdup(describe_msg->stmt_name ? describe_msg->stmt_name : ""); + const char* stmt_client_name = describe_msg->stmt_name ? describe_msg->stmt_name : ""; uint64_t stmt_global_id = client_myds->myconn->local_stmts->find_global_id_from_stmt_name(stmt_client_name); if (stmt_global_id == 0) { @@ -5969,7 +5992,7 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des client_myds->myprot.generate_error_packet(true, true, err_msg.c_str(), PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, false, true); client_myds->DSS = STATE_SLEEP; status = WAITING_CLIENT_DATA; - free((void*)stmt_client_name); + //free((void*)stmt_client_name); return 2; } @@ -5983,7 +6006,7 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des client_myds->myprot.generate_error_packet(true, true, err_msg.c_str(), PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, false, true); client_myds->DSS = STATE_SLEEP; status = WAITING_CLIENT_DATA; - free((void*)stmt_client_name); + //free((void*)stmt_client_name); return 2; } CurrentQuery.stmt_client_name = (char*)stmt_client_name; @@ -6012,7 +6035,7 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des if (stmt_info->stmt_metadata) { // we have the metadata, so we can send it to the client client_myds->setDSS_STATE_QUERY_SENT_NET(); - bool send_ready_packet = pending_packets.empty(); + bool send_ready_packet = extended_query_frame.empty(); unsigned int nTxn = NumActiveTransactions(); const char txn_state = (nTxn ? 'T' : 'I'); client_myds->myprot.generate_describe_completion_packet(true, send_ready_packet, stmt_info->stmt_metadata, txn_state); @@ -6086,7 +6109,7 @@ int PgSQL_Session::handle_post_sync_close_message(PgSQL_Close_Message* close_msg client_myds->setDSS_STATE_QUERY_SENT_NET(); unsigned int nTxn = NumActiveTransactions(); char txn_state = (nTxn ? 'T' : 'I'); - bool send_ready = pending_packets.empty(); + bool send_ready = extended_query_frame.empty(); client_myds->myprot.generate_close_completion_packet(true, send_ready, txn_state); client_myds->DSS = STATE_SLEEP; status = WAITING_CLIENT_DATA; @@ -6114,7 +6137,7 @@ int PgSQL_Session::handle_post_sync_bind_message(PgSQL_Bind_Message* bind_msg) { client_myds->setDSS_STATE_QUERY_SENT_NET(); unsigned int nTxn = NumActiveTransactions(); char txn_state = (nTxn ? 'T' : 'I'); - bool send_ready = pending_packets.empty(); + bool send_ready = extended_query_frame.empty(); client_myds->myprot.generate_bind_completion_packet(true, send_ready, txn_state); client_myds->DSS = STATE_SLEEP; status = WAITING_CLIENT_DATA; @@ -6140,7 +6163,7 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu return 2; } //FIXME: replace strdup with s_strdup - const char* stmt_client_name = strdup(bind_to_execute->stmt_name ? bind_to_execute->stmt_name : ""); + const char* stmt_client_name = bind_to_execute->stmt_name; uint64_t stmt_global_id = client_myds->myconn->local_stmts->find_global_id_from_stmt_name(stmt_client_name); if (stmt_global_id == 0) { client_myds->setDSS_STATE_QUERY_SENT_NET(); @@ -6148,7 +6171,6 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu client_myds->myprot.generate_error_packet(true, true, err_msg.c_str(), PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, false, true); client_myds->DSS = STATE_SLEEP; status = WAITING_CLIENT_DATA; - free((void*)stmt_client_name); return 2; } @@ -6161,7 +6183,6 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu client_myds->myprot.generate_error_packet(true, true, err_msg.c_str(), PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, false, true); client_myds->DSS = STATE_SLEEP; status = WAITING_CLIENT_DATA; - free((void*)stmt_client_name); return 2; } @@ -6237,7 +6258,8 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu return 1; } -int PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_SYNC(PtrSize_t& pkt) { +int PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_SYNC() { + if (session_type != PROXYSQL_SESSION_PGSQL) { // only PgSQL module supports prepared statement!! client_myds->setDSS_STATE_QUERY_SENT_NET(); client_myds->myprot.generate_error_packet(true, false, "Prepared statements not supported", PGSQL_ERROR_CODES::ERRCODE_FEATURE_NOT_SUPPORTED, @@ -6247,16 +6269,23 @@ int PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_S return 0; } - if (pending_packets.empty()) { + if (extended_query_frame.empty()) { + client_myds->setDSS_STATE_QUERY_SENT_NET(); unsigned int nTxn = NumActiveTransactions(); const char txn_state = (nTxn ? 'T' : 'I'); client_myds->myprot.generate_ready_for_query_packet(true, txn_state); + client_myds->DSS = STATE_SLEEP; + status = WAITING_CLIENT_DATA; return 0; } + return handler___status_PROCESSING_EXTENDED_QUERY_SYNC(); +} + +int PgSQL_Session::handler___status_PROCESSING_EXTENDED_QUERY_SYNC() { // we have pending packets, so we will process them now - auto packet = std::move(pending_packets.front()); // get the packet from the queue - pending_packets.pop(); // remove the packet from the queue + auto packet = std::move(extended_query_frame.front()); // get the packet from the queue + extended_query_frame.pop(); // remove the packet from the queue int rc = -1; @@ -6280,13 +6309,13 @@ int PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_S if (rc == 2) { // incase of error, we discard all pending messages bind_to_execute.reset(nullptr); - while (pending_packets.empty() == false) { - pending_packets.pop(); + while (extended_query_frame.empty() == false) { + extended_query_frame.pop(); } rc = 0; } - return rc; + return rc; } bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_PARSE(PtrSize_t& pkt) { @@ -6310,7 +6339,7 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_P writeout(); return false; } - pending_packets.push(std::move(parse_msg)); // we will process it later, after sync packet + extended_query_frame.push(std::move(parse_msg)); // we will process it later, after sync packet return true; } @@ -6335,7 +6364,7 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_D writeout(); return false; } - pending_packets.push(std::move(describe_msg)); // we will process it later, after sync packet + extended_query_frame.push(std::move(describe_msg)); // we will process it later, after sync packet return true; } @@ -6359,7 +6388,7 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_C writeout(); return false; } - pending_packets.push(std::move(close_msg)); // we will process it later, after sync packet + extended_query_frame.push(std::move(close_msg)); // we will process it later, after sync packet return true; } @@ -6383,7 +6412,7 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_B writeout(); return false; } - pending_packets.push(std::move(bind_msg)); // we will process it later, after sync packet + extended_query_frame.push(std::move(bind_msg)); // we will process it later, after sync packet return true; } @@ -6408,7 +6437,7 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_E writeout(); return false; } - pending_packets.push(std::move(execute_msg)); // we will process it later, after sync packet + extended_query_frame.push(std::move(execute_msg)); // we will process it later, after sync packet return true; } @@ -6456,7 +6485,7 @@ bool PgSQL_Session::handler___rc0_PROCESSING_STMT_PREPARE(enum session_status& s return true; } - bool send_ready_packet = pending_packets.empty(); + bool send_ready_packet = extended_query_frame.empty(); char txn_state = myds->myconn->get_transaction_status_char(); client_myds->myprot.generate_parse_completion_packet(true, send_ready_packet, txn_state); //if (stmt_info->num_params == 0) { @@ -6470,7 +6499,7 @@ bool PgSQL_Session::handler___rc0_PROCESSING_STMT_PREPARE(enum session_status& s void PgSQL_Session::handler___rc0_PROCESSING_STMT_DESCRIBE_PREPARE(PgSQL_Data_Stream* myds, bool& prepared_stmt_with_no_params) { //thread->status_variables.stvar[st_var_backend_stmt_describe]++; assert(CurrentQuery.stmt_info); - bool send_ready_packet = pending_packets.empty(); + bool send_ready_packet = extended_query_frame.empty(); char txn_state = myds->myconn->get_transaction_status_char(); GloPgStmt->wrlock();