Renamed pending_query to extended_query_frame

Improved extended query frame handling
pull/5044/head
Rahim Kanji 10 months ago
parent f350102f72
commit d5fc1fefff

@ -187,7 +187,7 @@ public:
PgSQL_Session* sess;
unsigned char* QueryPointer;
char* stmt_client_name;
const char* stmt_client_name;
PgSQL_STMT_Global_info* stmt_info;
PgSQL_Bind_Message* bind_msg;
SQP_par_t QueryParserArgs;
@ -221,7 +221,7 @@ private:
using PktType = std::variant<std::unique_ptr<PgSQL_Parse_Message>,std::unique_ptr<PgSQL_Describe_Message>,
std::unique_ptr<PgSQL_Close_Message>, std::unique_ptr<PgSQL_Bind_Message>, std::unique_ptr<PgSQL_Execute_Message>>;
std::queue<PktType> pending_packets;
std::queue<PktType> extended_query_frame;
std::unique_ptr<PgSQL_Bind_Message> bind_to_execute;
//int handler_ret;
@ -276,9 +276,10 @@ private:
bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_CLOSE(PtrSize_t& pkt);
bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_BIND(PtrSize_t& pkt);
bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_EXECUTE(PtrSize_t& pkt);
int handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_SYNC(PtrSize_t& pkt);
int handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_SYNC();
bool handler___rc0_PROCESSING_STMT_PREPARE(enum session_status& st, PgSQL_Data_Stream* myds, bool& prepared_stmt_with_no_params);
void handler___rc0_PROCESSING_STMT_DESCRIBE_PREPARE(PgSQL_Data_Stream* myds, bool& prepared_stmt_with_no_params);
int handler___status_PROCESSING_EXTENDED_QUERY_SYNC();
int handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg);
int handle_post_sync_describe_message(PgSQL_Describe_Message* describe_msg);
int handle_post_sync_close_message(PgSQL_Close_Message* close_msg);

@ -309,10 +309,7 @@ enum session_status {
SHOW_WARNINGS,
SETTING_NEXT_ISOLATION_LEVEL,
SETTING_NEXT_TRANSACTION_READ,
//PGSQL_PROCESSING_PARSE,
//PGSQL_PROCESSING_BIND,
//PGSQL_PROCESSING_DESCRIBE,
//PGSQL_PROCESSING_EXECUTE,
PROCESSING_EXTENDED_QUERY_SYNC,
session_status___NONE // special marker
};

@ -722,7 +722,7 @@ void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) {
case PROCESSING_STMT_EXECUTE:
c = (char *)sess->CurrentQuery.stmt_info->query;
ql = sess->CurrentQuery.stmt_info->query_length;
me.set_client_stmt_name(sess->CurrentQuery.stmt_client_name);
me.set_client_stmt_name((char*)sess->CurrentQuery.stmt_client_name);
break;
case PROCESSING_STMT_PREPARE:
default:
@ -733,7 +733,7 @@ void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) {
// global cache and due to that we immediately reply to the client and session doesn't reach
// 'PROCESSING_STMT_PREPARE' state. 'stmt_client_id' is expected to be '0' for anything that isn't
// a prepared statement, still, logging should rely 'log_event_type' instead of this value.
me.set_client_stmt_name(sess->CurrentQuery.stmt_client_name);
me.set_client_stmt_name((char*)sess->CurrentQuery.stmt_client_name);
break;
}
if (c) {

@ -8,6 +8,7 @@ using json = nlohmann::json;
#include "proxysql.h"
#include "cpp.h"
#include "Command_Counter.h"
#include "PgSQL_PreparedStatement.h"
#include "PgSQL_Query_Processor.h"
extern PgSQL_Threads_Handler* GloPTH;
@ -271,14 +272,14 @@ PgSQL_Query_Processor_Output* PgSQL_Query_Processor::process_query(PgSQL_Session
SQP_par_t stmt_exec_qp;
SQP_par_t* qp = NULL;
if (qi) {
// NOTE: if ptr == NULL , we are calling process_mysql_query() on an STMT_EXECUTE
// NOTE: if ptr == NULL , we are calling process_mysql_query() on an STMT_EXECUTE or STMT_DESCRIBE
if (ptr) {
qp = (SQP_par_t*)&qi->QueryParserArgs;
} else {
qp = &stmt_exec_qp;
//qp->digest = qi->stmt_info->digest;
//qp->digest_text = qi->stmt_info->digest_text;
//qp->first_comment = qi->stmt_info->first_comment;
qp->digest = qi->stmt_info->digest;
qp->digest_text = qi->stmt_info->digest_text;
qp->first_comment = qi->stmt_info->first_comment;
}
}
#define stackbuffer_size 128

@ -310,10 +310,7 @@ PgSQL_Query_Info::PgSQL_Query_Info() {
PgSQL_Query_Info::~PgSQL_Query_Info() {
GloPgQPro->query_parser_free(&QueryParserArgs);
stmt_info=NULL;
if (stmt_client_name) {
free(stmt_client_name);
stmt_client_name = NULL;
}
stmt_client_name = NULL;
if (bind_msg) {
delete bind_msg;
bind_msg = NULL;
@ -350,10 +347,7 @@ void PgSQL_Query_Info::end() {
stmt_info = NULL;
stmt_backend_id = 0;
stmt_global_id = 0;
if (stmt_client_name) {
free(stmt_client_name);
stmt_client_name = NULL;
}
stmt_client_name = NULL;
if (bind_msg) {
delete bind_msg;
bind_msg = NULL;
@ -373,10 +367,7 @@ void PgSQL_Query_Info::init(unsigned char *_p, int len, bool header) {
stmt_backend_id = 0;
stmt_global_id = 0;
stmt_info = NULL;
if (stmt_client_name) {
free(stmt_client_name);
stmt_client_name = NULL;
}
stmt_client_name = NULL;
if (bind_msg) {
delete bind_msg;
bind_msg = NULL;
@ -2247,8 +2238,7 @@ __get_pkts_from_client:
else {
if (locked_on_hostgroup == -1) {
current_hostgroup = default_hostgroup;
}
else {
} else {
current_hostgroup = locked_on_hostgroup;
}
}
@ -2310,7 +2300,7 @@ __get_pkts_from_client:
l_free(pkt.size, pkt.ptr);
handler_ret = -1;
return handler_ret;
} else if (c == 'P' || c == 'B' || c == 'D' || c == 'E') {
} else if (c == 'P' || c == 'B' || c == 'C' || c == 'D' || c == 'E') {
l_free(pkt.size, pkt.ptr);
continue;
} else {
@ -2505,8 +2495,13 @@ __get_pkts_from_client:
break;
case 'S':
{
__run_sync_again:
int rc = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_SYNC(pkt);
// we do not need sync packet anymore
l_free(pkt.size, pkt.ptr);
pkt.ptr = NULL;
pkt.size = 0;
__run_sync_again:
int rc = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_SYNC();
if (rc == -1) {
handler_ret = -1;
@ -2514,14 +2509,9 @@ __get_pkts_from_client:
}
if (rc == 0) {
if (pending_packets.empty() == false) {
if (extended_query_frame.empty() == false) {
writeout();
goto __run_sync_again;
} else {
// we do not need this packet anymore
l_free(pkt.size, pkt.ptr);
pkt.ptr = NULL;
pkt.size = 0;
}
}
}
@ -2761,9 +2751,11 @@ void PgSQL_Session::handler_minus1_GenerateErrorMessage(PgSQL_Data_Stream* myds,
}
switch (status) {
case PROCESSING_STMT_EXECUTE:
case PROCESSING_QUERY:
PgSQL_Result_to_PgSQL_wire(myconn, myds);
break;
case PROCESSING_STMT_DESCRIBE:
case PROCESSING_STMT_PREPARE:
client_myds->myprot.generate_error_packet(true, true, myconn->get_error_message().c_str(), myconn->get_error_code(), false);
if (previous_status.size()) {
@ -2774,8 +2766,6 @@ void PgSQL_Session::handler_minus1_GenerateErrorMessage(PgSQL_Data_Stream* myds,
wrong_pass = true;
}
break;
case PROCESSING_STMT_DESCRIBE:
case PROCESSING_STMT_EXECUTE:
default:
// LCOV_EXCL_START
assert(0);
@ -2902,6 +2892,23 @@ int PgSQL_Session::handler() {
handler_again:
switch (status) {
// FIXME: move it to bottom
case PROCESSING_EXTENDED_QUERY_SYNC:
{
int rc = handler___status_PROCESSING_EXTENDED_QUERY_SYNC();
if (rc == -1) {
handler_ret = -1;
return handler_ret;
}
if (rc == 0 && extended_query_frame.empty() == false) {
writeout();
NEXT_IMMEDIATE(PROCESSING_EXTENDED_QUERY_SYNC);
} else {
goto handler_again;
}
}
break;
case WAITING_CLIENT_DATA:
// housekeeping
handler___status_WAITING_CLIENT_DATA();
@ -3148,6 +3155,7 @@ handler_again:
}
switch (status) {
case PROCESSING_STMT_EXECUTE:
case PROCESSING_QUERY:
PgSQL_Result_to_PgSQL_wire(myconn, myconn->myds);
break;
@ -3162,9 +3170,10 @@ handler_again:
case PROCESSING_STMT_DESCRIBE:
handler___rc0_PROCESSING_STMT_DESCRIBE_PREPARE(myds, prepared_stmt_with_no_params);
break;
case PROCESSING_STMT_EXECUTE:
PgSQL_Result_to_PgSQL_wire(myconn, myconn->myds);
break;
// Handled in PROCESSING_QUERY
//case PROCESSING_STMT_EXECUTE:
// PgSQL_Result_to_PgSQL_wire(myconn, myconn->myds);
// break;
default:
// LCOV_EXCL_START
assert(0);
@ -3246,19 +3255,21 @@ handler_again:
}
}
}
// FIXME: Temporary workaround. Update the logic below when pipeline mode is implemented
if (rc != 1 && pkt.size && pkt.ptr && ((char*)pkt.ptr)[0] == 'S') { // it's a sync packet
// sent sync packet again to client queue, to execute sync in next iteration to handle remaining pending packets
if (pending_packets.empty() == false) {
writeout();
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_SYNC(pkt);
goto handler_again;
// if query is not in pending state
if (rc != 1) {
if (rc == 0) {
// check if there are messages remaining in extended_query_frame, if yes, process them
if (extended_query_frame.empty() == false) {
writeout();
NEXT_IMMEDIATE(PROCESSING_EXTENDED_QUERY_SYNC);
}
} else {
// we do not need this packet anymore
l_free(pkt.size, pkt.ptr);
pkt.ptr = NULL;
pkt.size = 0;
// incase of error, we discard all pending messages
bind_to_execute.reset(nullptr);
while (extended_query_frame.empty() == false) {
extended_query_frame.pop();
}
// don't change any status
}
}
@ -3266,10 +3277,6 @@ handler_again:
}
break;
case PROCESSING_EXTENDED_QUERY_SYNC:
assert(0); //no handled yet
break;
case SETTING_ISOLATION_LEVEL:
case SETTING_TRANSACTION_READ:
case SETTING_VARIABLE:
@ -5346,7 +5353,6 @@ void PgSQL_Session::RequestEnd(PgSQL_Data_Stream* myds, const unsigned int myerr
previous_hostgroup = current_hostgroup;
}
// this function tries to report all the memory statistics related to the sessions
void PgSQL_Session::Memory_Stats() {
if (thread == NULL)
@ -5839,7 +5845,7 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg
CurrentQuery.begin((unsigned char*)parse_msg->query_string, strlen(parse_msg->query_string) + 1, false);
//FIXME: replace strdup with s_strdup
CurrentQuery.stmt_client_name = strdup(parse_msg->stmt_name ? parse_msg->stmt_name : "");;
CurrentQuery.stmt_client_name = parse_msg->stmt_name;
timespec begint;
timespec endt;
@ -5854,8 +5860,12 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg
(begint.tv_sec * 1000000000 + begint.tv_nsec);
}
assert(qpo); // GloPgQPro->process_mysql_query() should always return a qpo
// make sure parse packet is not used anymore
auto parse_pkt = parse_msg->detach(); // detach the packet from the parse message
// setting 'prepared' to prevent fetching results from the cache if the digest matches
rc_break = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt, &lock_hostgroup, PGSQL_EXTENDED_QUERY_TYPE_PARSE);
rc_break = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&parse_pkt, &lock_hostgroup, PGSQL_EXTENDED_QUERY_TYPE_PARSE);
if (rc_break == true)
return 0;
@ -5884,69 +5894,82 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg
thread->status_variables.stvar[st_var_hostgroup_locked_queries]++;
RequestEnd(NULL);
free(buf);
l_free(parse_pkt.size, parse_pkt.ptr);
return 2;
}
}
}
//mybe = find_or_create_backend(current_hostgroup);
PgSQL_STMTs_local_v14* local_stmts = client_myds->myconn->local_stmts;
std::string stmt_name = (char*)CurrentQuery.stmt_client_name; // create a string
//mybe = find_or_create_backend(current_hostgroup);
// if the same statement name is used, we drop it
//FIXME: Revisit this logic
if (auto search = local_stmts->stmt_name_to_global_ids.find(stmt_name);
search != local_stmts->stmt_name_to_global_ids.end()) {
uint64_t client_global_id = search->second;
auto range = local_stmts->global_id_to_stmt_names.equal_range(client_global_id);
assert(range.first != range.second);
for (auto it = range.first; it != range.second; ++it) {
if (it->second == stmt_name) {
local_stmts->global_id_to_stmt_names.erase(it);
PgSQL_STMTs_local_v14* local_stmts = client_myds->myconn->local_stmts;
std::string stmt_name(CurrentQuery.stmt_client_name);
if (auto it = local_stmts->stmt_name_to_global_ids.find(stmt_name);
it != local_stmts->stmt_name_to_global_ids.end()) {
uint64_t global_id = it->second;
auto range = local_stmts->global_id_to_stmt_names.equal_range(global_id);
for (auto iter = range.first; iter != range.second; ++iter) {
if (iter->second == stmt_name) {
local_stmts->global_id_to_stmt_names.erase(iter);
break;
}
}
local_stmts->stmt_name_to_global_ids.erase(search);
client_myds->myconn->local_stmts->client_close(stmt_name);
local_stmts->stmt_name_to_global_ids.erase(it);
local_stmts->client_close(stmt_name);
}
uint64_t hash = client_myds->myconn->local_stmts->compute_hash(
(char*)client_myds->myconn->userinfo->username,
(char*)client_myds->myconn->userinfo->dbname,
// Hash the query
uint64_t hash = local_stmts->compute_hash(
client_myds->myconn->userinfo->username,
client_myds->myconn->userinfo->dbname,
(char*)CurrentQuery.QueryPointer,
CurrentQuery.QueryLength
);
PgSQL_STMT_Global_info* stmt_info = NULL;
// we first lock GloStmt
// Check global statement cache
GloPgStmt->wrlock();
stmt_info = GloPgStmt->find_prepared_statement_by_hash(hash, false);
PgSQL_STMT_Global_info* stmt_info = GloPgStmt->find_prepared_statement_by_hash(hash, false);
if (stmt_info) {
local_stmts->client_insert(stmt_info->statement_id, stmt_name);
CurrentQuery.stmt_global_id = stmt_info->statement_id;
client_myds->setDSS_STATE_QUERY_SENT_NET();
bool send_ready_packet = pending_packets.empty();
unsigned int nTxn = NumActiveTransactions();
const char txn_state = (nTxn ? 'T' : 'I');
char txn_state = NumActiveTransactions() > 0 ? 'T' : 'I';
bool send_ready_packet = extended_query_frame.empty();
client_myds->myprot.generate_parse_completion_packet(true, send_ready_packet, txn_state);
LogQuery(NULL);
LogQuery(nullptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
CurrentQuery.end_time = thread->curtime;
CurrentQuery.end();
//CurrentQuery.end();
RequestEnd(NULL);
GloPgStmt->unlock();
l_free(parse_pkt.size, parse_pkt.ptr);
return 0;
}
}
GloPgStmt->unlock();
// Fallback: forward to backend
mybe = find_or_create_backend(current_hostgroup);
status = PROCESSING_STMT_PREPARE;
mybe->server_myds->connect_retries_on_failure = pgsql_thread___connect_retries_on_failure;
mybe->server_myds->wait_until = 0;
pause_until = 0;
mybe->server_myds->killed_at = 0;
mybe->server_myds->kill_type = 0;
auto parse_pkt = parse_msg->detach(); // detach the packet from the parse message
mybe->server_myds->pgsql_real_query.init(&parse_pkt); // mem leak fix
mybe->server_myds->pgsql_real_query.init(&parse_pkt); // Transfer packet ownership
mybe->server_myds->statuses.questions++;
client_myds->setDSS_STATE_QUERY_SENT_NET();
return 1;
}
@ -5960,7 +5983,7 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des
//CurrentQuery.begin(nullptr, 0, false);
//FIXME: replace strdup with s_strdup
const char* stmt_client_name = strdup(describe_msg->stmt_name ? describe_msg->stmt_name : "");
const char* stmt_client_name = describe_msg->stmt_name ? describe_msg->stmt_name : "";
uint64_t stmt_global_id = client_myds->myconn->local_stmts->find_global_id_from_stmt_name(stmt_client_name);
if (stmt_global_id == 0) {
@ -5969,7 +5992,7 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des
client_myds->myprot.generate_error_packet(true, true, err_msg.c_str(), PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, false, true);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
free((void*)stmt_client_name);
//free((void*)stmt_client_name);
return 2;
}
@ -5983,7 +6006,7 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des
client_myds->myprot.generate_error_packet(true, true, err_msg.c_str(), PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, false, true);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
free((void*)stmt_client_name);
//free((void*)stmt_client_name);
return 2;
}
CurrentQuery.stmt_client_name = (char*)stmt_client_name;
@ -6012,7 +6035,7 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des
if (stmt_info->stmt_metadata) {
// we have the metadata, so we can send it to the client
client_myds->setDSS_STATE_QUERY_SENT_NET();
bool send_ready_packet = pending_packets.empty();
bool send_ready_packet = extended_query_frame.empty();
unsigned int nTxn = NumActiveTransactions();
const char txn_state = (nTxn ? 'T' : 'I');
client_myds->myprot.generate_describe_completion_packet(true, send_ready_packet, stmt_info->stmt_metadata, txn_state);
@ -6086,7 +6109,7 @@ int PgSQL_Session::handle_post_sync_close_message(PgSQL_Close_Message* close_msg
client_myds->setDSS_STATE_QUERY_SENT_NET();
unsigned int nTxn = NumActiveTransactions();
char txn_state = (nTxn ? 'T' : 'I');
bool send_ready = pending_packets.empty();
bool send_ready = extended_query_frame.empty();
client_myds->myprot.generate_close_completion_packet(true, send_ready, txn_state);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
@ -6114,7 +6137,7 @@ int PgSQL_Session::handle_post_sync_bind_message(PgSQL_Bind_Message* bind_msg) {
client_myds->setDSS_STATE_QUERY_SENT_NET();
unsigned int nTxn = NumActiveTransactions();
char txn_state = (nTxn ? 'T' : 'I');
bool send_ready = pending_packets.empty();
bool send_ready = extended_query_frame.empty();
client_myds->myprot.generate_bind_completion_packet(true, send_ready, txn_state);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
@ -6140,7 +6163,7 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu
return 2;
}
//FIXME: replace strdup with s_strdup
const char* stmt_client_name = strdup(bind_to_execute->stmt_name ? bind_to_execute->stmt_name : "");
const char* stmt_client_name = bind_to_execute->stmt_name;
uint64_t stmt_global_id = client_myds->myconn->local_stmts->find_global_id_from_stmt_name(stmt_client_name);
if (stmt_global_id == 0) {
client_myds->setDSS_STATE_QUERY_SENT_NET();
@ -6148,7 +6171,6 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu
client_myds->myprot.generate_error_packet(true, true, err_msg.c_str(), PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, false, true);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
free((void*)stmt_client_name);
return 2;
}
@ -6161,7 +6183,6 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu
client_myds->myprot.generate_error_packet(true, true, err_msg.c_str(), PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, false, true);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
free((void*)stmt_client_name);
return 2;
}
@ -6237,7 +6258,8 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu
return 1;
}
int PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_SYNC(PtrSize_t& pkt) {
int PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_SYNC() {
if (session_type != PROXYSQL_SESSION_PGSQL) { // only PgSQL module supports prepared statement!!
client_myds->setDSS_STATE_QUERY_SENT_NET();
client_myds->myprot.generate_error_packet(true, false, "Prepared statements not supported", PGSQL_ERROR_CODES::ERRCODE_FEATURE_NOT_SUPPORTED,
@ -6247,16 +6269,23 @@ int PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_S
return 0;
}
if (pending_packets.empty()) {
if (extended_query_frame.empty()) {
client_myds->setDSS_STATE_QUERY_SENT_NET();
unsigned int nTxn = NumActiveTransactions();
const char txn_state = (nTxn ? 'T' : 'I');
client_myds->myprot.generate_ready_for_query_packet(true, txn_state);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
return 0;
}
return handler___status_PROCESSING_EXTENDED_QUERY_SYNC();
}
int PgSQL_Session::handler___status_PROCESSING_EXTENDED_QUERY_SYNC() {
// we have pending packets, so we will process them now
auto packet = std::move(pending_packets.front()); // get the packet from the queue
pending_packets.pop(); // remove the packet from the queue
auto packet = std::move(extended_query_frame.front()); // get the packet from the queue
extended_query_frame.pop(); // remove the packet from the queue
int rc = -1;
@ -6280,13 +6309,13 @@ int PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_S
if (rc == 2) {
// incase of error, we discard all pending messages
bind_to_execute.reset(nullptr);
while (pending_packets.empty() == false) {
pending_packets.pop();
while (extended_query_frame.empty() == false) {
extended_query_frame.pop();
}
rc = 0;
}
return rc;
return rc;
}
bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_PARSE(PtrSize_t& pkt) {
@ -6310,7 +6339,7 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_P
writeout();
return false;
}
pending_packets.push(std::move(parse_msg)); // we will process it later, after sync packet
extended_query_frame.push(std::move(parse_msg)); // we will process it later, after sync packet
return true;
}
@ -6335,7 +6364,7 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_D
writeout();
return false;
}
pending_packets.push(std::move(describe_msg)); // we will process it later, after sync packet
extended_query_frame.push(std::move(describe_msg)); // we will process it later, after sync packet
return true;
}
@ -6359,7 +6388,7 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_C
writeout();
return false;
}
pending_packets.push(std::move(close_msg)); // we will process it later, after sync packet
extended_query_frame.push(std::move(close_msg)); // we will process it later, after sync packet
return true;
}
@ -6383,7 +6412,7 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_B
writeout();
return false;
}
pending_packets.push(std::move(bind_msg)); // we will process it later, after sync packet
extended_query_frame.push(std::move(bind_msg)); // we will process it later, after sync packet
return true;
}
@ -6408,7 +6437,7 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_E
writeout();
return false;
}
pending_packets.push(std::move(execute_msg)); // we will process it later, after sync packet
extended_query_frame.push(std::move(execute_msg)); // we will process it later, after sync packet
return true;
}
@ -6456,7 +6485,7 @@ bool PgSQL_Session::handler___rc0_PROCESSING_STMT_PREPARE(enum session_status& s
return true;
}
bool send_ready_packet = pending_packets.empty();
bool send_ready_packet = extended_query_frame.empty();
char txn_state = myds->myconn->get_transaction_status_char();
client_myds->myprot.generate_parse_completion_packet(true, send_ready_packet, txn_state);
//if (stmt_info->num_params == 0) {
@ -6470,7 +6499,7 @@ bool PgSQL_Session::handler___rc0_PROCESSING_STMT_PREPARE(enum session_status& s
void PgSQL_Session::handler___rc0_PROCESSING_STMT_DESCRIBE_PREPARE(PgSQL_Data_Stream* myds, bool& prepared_stmt_with_no_params) {
//thread->status_variables.stvar[st_var_backend_stmt_describe]++;
assert(CurrentQuery.stmt_info);
bool send_ready_packet = pending_packets.empty();
bool send_ready_packet = extended_query_frame.empty();
char txn_state = myds->myconn->get_transaction_status_char();
GloPgStmt->wrlock();

Loading…
Cancel
Save