Added SET statement tracking support

v3.0_extended_query_protocol
Rahim Kanji 7 months ago
parent ce94a7080a
commit 02eb58db79

@ -235,9 +235,9 @@ private:
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_PROCESS_KILL(PtrSize_t*);
#endif
bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(PtrSize_t*, bool* lock_hostgroup, PgSQL_Extended_Query_Type stmt_type = PGSQL_EXTENDED_QUERY_TYPE_NOT_SET);
void handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED__get_connection();
bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(PtrSize_t*, bool* lock_hostgroup,
PgSQL_Extended_Query_Type stmt_type = PGSQL_EXTENDED_QUERY_TYPE_NOT_SET);
bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_PARSE(PtrSize_t& pkt);
bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_DESCRIBE(PtrSize_t& pkt);
bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_CLOSE(PtrSize_t& pkt);
@ -253,6 +253,7 @@ private:
int handle_post_sync_bind_message(PgSQL_Bind_Message* bind_msg);
int handle_post_sync_execute_message(PgSQL_Execute_Message* execute_msg);
void handle_post_sync_error(PGSQL_ERROR_CODES errcode, const char* errmsg, bool fatal);
void handle_post_sync_locked_on_hostgroup_error(const char* query, int query_len);
void reset_extended_query_frame();

@ -20,7 +20,7 @@ using json = nlohmann::json;
#else
#define DEB ""
#endif /* DEBUG */
#define PROXYSQL_MYSQL_LOGGER_VERSION "2.5.0421" DEB
#define PROXYSQL_PGSQL_LOGGER_VERSION "2.5.0421" DEB
extern PgSQL_Logger *GloPgSQL_Logger;
@ -1042,6 +1042,6 @@ unsigned int PgSQL_Logger::audit_find_next_id() {
}
void PgSQL_Logger::print_version() {
fprintf(stderr,"Standard ProxySQL MySQL Logger rev. %s -- %s -- %s\n", PROXYSQL_MYSQL_LOGGER_VERSION, __FILE__, __TIMESTAMP__);
fprintf(stderr,"Standard ProxySQL PgSQL Logger rev. %s -- %s -- %s\n", PROXYSQL_MYSQL_LOGGER_VERSION, __FILE__, __TIMESTAMP__);
};

@ -274,7 +274,7 @@ PgSQL_Query_Processor_Output* PgSQL_Query_Processor::process_query(PgSQL_Session
if (qi) {
// NOTE: if ptr == NULL , we are calling process_mysql_query() on an STMT_EXECUTE or STMT_DESCRIBE
if (ptr) {
qp = (SQP_par_t*)&qi->QueryParserArgs;
qp = &qi->QueryParserArgs;
} else {
qp = &stmt_exec_qp;
qp->digest = qi->extended_query_info.stmt_info->digest;
@ -296,10 +296,9 @@ PgSQL_Query_Processor_Output* PgSQL_Query_Processor::process_query(PgSQL_Session
}
memcpy(query, ptr, len);
query[len-1] = 0;
}
else {
//query = qi->stmt_info->query;
//len = qi->stmt_info->query_length;
} else {
query = qi->extended_query_info.stmt_info->query;
len = qi->extended_query_info.stmt_info->query_length;
}
Query_Processor::process_query(sess, ptr == NULL, query, len, ret, qp);

@ -2341,7 +2341,7 @@ __get_pkts_from_client:
if (qpo->max_lag_ms >= 0) {
thread->status_variables.stvar[st_var_queries_with_max_lag_ms]++;
}
rc_break = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt, &lock_hostgroup);
rc_break = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(&pkt, &lock_hostgroup);
if (mirror == false && rc_break == false) {
if (pgsql_thread___automatic_detect_sqli) {
if (handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_detect_SQLi()) {
@ -2796,10 +2796,11 @@ int PgSQL_Session::RunQuery(PgSQL_Data_Stream* myds, PgSQL_Connection* myconn) {
case PROCESSING_STMT_EXECUTE:
assert(CurrentQuery.extended_query_info.stmt_backend_id);
{
PgSQL_Extended_Query_Type type = (status == PROCESSING_STMT_DESCRIBE) ? PGSQL_EXTENDED_QUERY_TYPE_DESCRIBE : PGSQL_EXTENDED_QUERY_TYPE_EXECUTE;
const std::string& backend_stmt_name = std::string(PROXYSQL_PS_PREFIX) + std::to_string(CurrentQuery.extended_query_info.stmt_backend_id);
rc = myconn->async_query(myds->revents, (char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength,
backend_stmt_name.c_str(), type, &CurrentQuery.extended_query_info);
PgSQL_Extended_Query_Type type =
(status == PROCESSING_STMT_DESCRIBE) ? PGSQL_EXTENDED_QUERY_TYPE_DESCRIBE : PGSQL_EXTENDED_QUERY_TYPE_EXECUTE;
const std::string& backend_stmt_name =
std::string(PROXYSQL_PS_PREFIX) + std::to_string(CurrentQuery.extended_query_info.stmt_backend_id);
rc = myconn->async_query(myds->revents, nullptr, 0, backend_stmt_name.c_str(), type, &CurrentQuery.extended_query_info);
// Handle edge case: Since libpq automatically sends a Sync after Execute,
// the Bind message is no longer pending on the backend. We must reset
@ -3109,13 +3110,15 @@ handler_again:
proxy_error("Session %p, status %d, CurrentQuery.stmt_info is NULL\n", this, status);
assert(0);
}
CurrentQuery.QueryLength = CurrentQuery.extended_query_info.stmt_info->query_length;
CurrentQuery.QueryPointer = (unsigned char*)CurrentQuery.extended_query_info.stmt_info->query;
// NOTE: Update 'first_comment' with the 'first_comment' from the retrieved
// 'stmt_info' from the found prepared statement. 'CurrentQuery' requires its
// own copy of 'first_comment' because it will later be free by 'QueryInfo::end'.
if (CurrentQuery.extended_query_info.stmt_info->first_comment) {
CurrentQuery.QueryParserArgs.first_comment = strdup(CurrentQuery.extended_query_info.stmt_info->first_comment);
if (status == PROCESSING_STMT_DESCRIBE) {
CurrentQuery.QueryLength = CurrentQuery.extended_query_info.stmt_info->query_length;
CurrentQuery.QueryPointer = (unsigned char*)CurrentQuery.extended_query_info.stmt_info->query;
// NOTE: Update 'first_comment' with the 'first_comment' from the retrieved
// 'stmt_info' from the found prepared statement. 'CurrentQuery' requires its
// own copy of 'first_comment' because it will later be free by 'QueryInfo::end'.
if (CurrentQuery.extended_query_info.stmt_info->first_comment) {
CurrentQuery.QueryParserArgs.first_comment = strdup(CurrentQuery.extended_query_info.stmt_info->first_comment);
}
}
if (CurrentQuery.extended_query_info.stmt_info->parse_param_types.empty() == false) {
CurrentQuery.extended_query_info.parse_param_types = CurrentQuery.extended_query_info.stmt_info->parse_param_types;
@ -3893,7 +3896,7 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
}
#endif
// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo
// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo
void PgSQL_Session::handler_WCD_SS_MCQ_qpo_QueryRewrite(PtrSize_t* pkt) {
// the query was rewritten
l_free(pkt->size, pkt->ptr); // free old pkt
@ -3923,7 +3926,7 @@ void PgSQL_Session::handler_WCD_SS_MCQ_qpo_QueryRewrite(PtrSize_t* pkt) {
}
}
// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo
// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo
void PgSQL_Session::handler_WCD_SS_MCQ_qpo_OK_msg(PtrSize_t* pkt) {
client_myds->DSS = STATE_QUERY_SENT_NET;
@ -3934,7 +3937,7 @@ void PgSQL_Session::handler_WCD_SS_MCQ_qpo_OK_msg(PtrSize_t* pkt) {
l_free(pkt->size, pkt->ptr);
}
// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo
// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo
void PgSQL_Session::handler_WCD_SS_MCQ_qpo_error_msg(PtrSize_t* pkt) {
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_error_packet(true, true, qpo->error_msg,
@ -3943,7 +3946,7 @@ void PgSQL_Session::handler_WCD_SS_MCQ_qpo_error_msg(PtrSize_t* pkt) {
l_free(pkt->size, pkt->ptr);
}
// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo
// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo
void PgSQL_Session::handler_WCD_SS_MCQ_qpo_LargePacket(PtrSize_t* pkt) {
// ER_NET_PACKET_TOO_LARGE
client_myds->DSS = STATE_QUERY_SENT_NET;
@ -3953,7 +3956,7 @@ void PgSQL_Session::handler_WCD_SS_MCQ_qpo_LargePacket(PtrSize_t* pkt) {
l_free(pkt->size, pkt->ptr);
}
bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(PtrSize_t* pkt, bool* lock_hostgroup, PgSQL_Extended_Query_Type stmt_type) {
bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(PtrSize_t* pkt, bool* lock_hostgroup, PgSQL_Extended_Query_Type stmt_type) {
/*
lock_hostgroup:
If this variable is set to true, this session will get lock to a
@ -3985,11 +3988,6 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
return true;
}
if (stmt_type == PGSQL_EXTENDED_QUERY_TYPE_DESCRIBE ||
stmt_type == PGSQL_EXTENDED_QUERY_TYPE_EXECUTE) { // for Describe and Execute we exit here
goto __exit_set_destination_hostgroup;
}
// Check if the session is not locked on a hostgroup and there are untracked option parameters
if (locked_on_hostgroup < 0 && untracked_option_parameters.empty() == false) {
if (client_myds && client_myds->addr.addr) {
@ -4007,6 +4005,11 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
return false;
}
if (stmt_type == PGSQL_EXTENDED_QUERY_TYPE_PARSE ||
stmt_type == PGSQL_EXTENDED_QUERY_TYPE_DESCRIBE) { // for Parse and Describe we exit here
goto __exit_set_destination_hostgroup;
}
// handle here #509, #815 and #816
if (CurrentQuery.QueryParserArgs.digest_text) {
char* dig = CurrentQuery.QueryParserArgs.digest_text;
@ -4129,9 +4132,9 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
client_myds->DSS = STATE_QUERY_SENT_NET;
client_myds->myprot.generate_error_packet(true, true, errmsg,
PGSQL_ERROR_CODES::ERRCODE_INVALID_PARAMETER_VALUE, false, true);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
free(errmsg);
RequestEnd(NULL);
l_free(pkt->size, pkt->ptr);
return true;
}
@ -4603,8 +4606,7 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
if (mirror == false) {
RequestEnd(NULL);
}
else {
} else {
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
}
@ -4762,7 +4764,7 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
return true;
}
//}
if (qpo->cache_ttl > 0 && ((stmt_type & PGSQL_EXTENDED_QUERY_TYPE_PARSE) == 0)) {
if (qpo->cache_ttl > 0 && ((stmt_type & PGSQL_EXTENDED_QUERY_TYPE_EXECUTE) == 0)) {
const std::shared_ptr<PgSQL_QC_entry_t> pgsql_qc_entry = GloPgQC->get(
client_myds->myconn->userinfo->hash,
@ -5855,6 +5857,25 @@ void PgSQL_Session::handle_post_sync_error(PGSQL_ERROR_CODES errcode, const char
status = WAITING_CLIENT_DATA;
}
void PgSQL_Session::handle_post_sync_locked_on_hostgroup_error(const char* query, int query_len) {
client_myds->DSS = STATE_QUERY_SENT_NET;
int l = query_len;
char* end = (char*)"";
if (l > 256) {
l = 253;
end = (char*)"...";
}
std::string nqn = string(query, l); // truncate string to 253 characters
const char* err_msg = "Session trying to reach HG %d while locked on HG %d . Rejecting query: %s%s";
char* buf = (char*)malloc(strlen(err_msg) + strlen(nqn.c_str()) + strlen(end) + 64);
sprintf(buf, err_msg, current_hostgroup, locked_on_hostgroup, nqn.c_str(), end);
client_myds->myprot.generate_error_packet(true, true, buf, PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION,
false, true);
free(buf);
thread->status_variables.stvar[st_var_hostgroup_locked_queries]++;
RequestEnd(NULL);
}
int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg) {
PROXY_TRACE();
thread->status_variables.stvar[st_var_frontend_stmt_prepare]++;
@ -5899,7 +5920,7 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg
auto parse_pkt = parse_msg->detach(); // detach the packet from the parse message
// setting 'prepared' to prevent fetching results from the cache if the digest matches
bool handled_in_handler = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&parse_pkt, &lock_hostgroup, PGSQL_EXTENDED_QUERY_TYPE_PARSE);
bool handled_in_handler = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(&parse_pkt, &lock_hostgroup, PGSQL_EXTENDED_QUERY_TYPE_PARSE);
if (handled_in_handler == true)
// no need to release parse_pkt, it has been released in handler
return 0;
@ -5913,22 +5934,8 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg
}
if (locked_on_hostgroup >= 0) {
if (current_hostgroup != locked_on_hostgroup) {
client_myds->DSS = STATE_QUERY_SENT_NET;
int l = CurrentQuery.QueryLength;
char* end = (char*)"";
if (l > 256) {
l = 253;
end = (char*)"...";
}
string nqn = string((char*)CurrentQuery.QueryPointer, l);
char* err_msg = (char*)"Session trying to reach HG %d while locked on HG %d . Rejecting query: %s%s";
char* buf = (char*)malloc(strlen(err_msg) + strlen(nqn.c_str()) + strlen(end) + 64);
sprintf(buf, err_msg, current_hostgroup, locked_on_hostgroup, nqn.c_str(), end);
client_myds->myprot.generate_error_packet(true, true, buf, PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION,
false, true);
thread->status_variables.stvar[st_var_hostgroup_locked_queries]++;
handle_post_sync_locked_on_hostgroup_error((const char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength);
RequestEnd(NULL);
free(buf);
l_free(parse_pkt.size, parse_pkt.ptr);
return 2;
}
@ -5968,7 +5975,7 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg
uint64_t hash = local_stmts->compute_hash(
client_myds->myconn->userinfo->username,
client_myds->myconn->userinfo->dbname,
(char*)CurrentQuery.QueryPointer,
(const char*)CurrentQuery.QueryPointer,
CurrentQuery.QueryLength,
CurrentQuery.extended_query_info.parse_param_types
);
@ -5979,21 +5986,14 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg
if (stmt_info) {
local_stmts->client_insert(stmt_info->statement_id, stmt_name);
extended_query_info.stmt_global_id = stmt_info->statement_id;
GloPgStmt->unlock();
client_myds->setDSS_STATE_QUERY_SENT_NET();
char txn_state = NumActiveTransactions() > 0 ? 'T' : 'I';
bool send_ready_packet = extended_query_frame.empty();
client_myds->myprot.generate_parse_completion_packet(true, send_ready_packet, txn_state);
LogQuery(nullptr);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
CurrentQuery.end_time = thread->curtime;
//CurrentQuery.end();
//LogQuery(nullptr);
//CurrentQuery.end_time = thread->curtime;
RequestEnd(NULL);
GloPgStmt->unlock();
l_free(parse_pkt.size, parse_pkt.ptr);
return 0;
}
@ -6043,9 +6043,11 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des
}
if (extended_query_frame.empty() == false) {
// peeking next message in the extended query frame
// Peeking next message in the extended query frame
// Assuming the client follows correct Bind/Describe/Execute order,
// we can skip this Describe message, as libpq's PQsendQueryPrepared already sends it.
if (std::holds_alternative<std::unique_ptr<PgSQL_Execute_Message>>(extended_query_frame.front())) {
return 0; // assuming client is sending Bind/Describe/Execute in the correct order, so libpq alreay sends describe message
return 0;
}
}
@ -6094,7 +6096,6 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &begint);
}
qpo = GloPgQPro->process_query(this, nullptr, 0, &CurrentQuery);
assert(qpo); // GloPgQPro->process_mysql_query() should always return a qpo
if (qpo->max_lag_ms >= 0) {
@ -6119,11 +6120,9 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des
client_myds->myprot.generate_describe_completion_packet(true, send_ready_packet, stmt_info->stmt_metadata,
extended_query_info.stmt_type, txn_state);
stmt_info->unlock();
LogQuery(NULL);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
CurrentQuery.end_time = thread->curtime;
CurrentQuery.end();
//LogQuery(NULL);
//CurrentQuery.end_time = thread->curtime;
RequestEnd(NULL);
return 0;
}
stmt_info->unlock();
@ -6131,7 +6130,7 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des
auto describe_pkt = describe_msg->detach(); // detach the packet from the describe message
// setting 'prepared' to prevent fetching results from the cache if the digest matches
bool handled_in_handler = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&describe_pkt,
bool handled_in_handler = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(&describe_pkt,
&lock_hostgroup, PGSQL_EXTENDED_QUERY_TYPE_DESCRIBE);
if (handled_in_handler == true) {
// no need to free describe_pkt, it is already freed in the handler
@ -6147,22 +6146,9 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des
}
if (locked_on_hostgroup >= 0) {
if (current_hostgroup != locked_on_hostgroup) {
client_myds->DSS = STATE_QUERY_SENT_NET;
int l = CurrentQuery.QueryLength;
char* end = (char*)"";
if (l > 256) {
l = 253;
end = (char*)"...";
}
string nqn = string((char*)CurrentQuery.QueryPointer, l);
char* err_msg = (char*)"Session trying to reach HG %d while locked on HG %d . Rejecting query: %s%s";
char* buf = (char*)malloc(strlen(err_msg) + strlen(nqn.c_str()) + strlen(end) + 64);
sprintf(buf, err_msg, current_hostgroup, locked_on_hostgroup, nqn.c_str(), end);
client_myds->myprot.generate_error_packet(true, true, buf, PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION,
false, true);
thread->status_variables.stvar[st_var_hostgroup_locked_queries]++;
handle_post_sync_locked_on_hostgroup_error(CurrentQuery.extended_query_info.stmt_info->query,
CurrentQuery.extended_query_info.stmt_info->query_length);
RequestEnd(NULL);
free(buf);
l_free(describe_pkt.size, describe_pkt.ptr);
return 2;
}
@ -6291,6 +6277,7 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu
handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, errmsg.c_str(), false);
return 2;
}
PgSQL_Extended_Query_Info& extended_query_info = CurrentQuery.extended_query_info;
extended_query_info.stmt_client_portal_name = portal_name;
extended_query_info.stmt_client_name = stmt_client_name;
@ -6305,9 +6292,16 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &begint);
}
qpo = GloPgQPro->process_query(this, nullptr, 0, &CurrentQuery);
assert(qpo); // GloPgQPro->process_mysql_query() should always return a qpo
// required for SET statement parsing
CurrentQuery.QueryPointer = (unsigned char*)stmt_info->query;
CurrentQuery.QueryLength = stmt_info->query_length;
CurrentQuery.QueryParserArgs.digest = stmt_info->digest;
CurrentQuery.QueryParserArgs.digest_text = stmt_info->digest_text ? strdup(stmt_info->digest_text) : nullptr;
CurrentQuery.QueryParserArgs.first_comment = stmt_info->first_comment ? strdup(stmt_info->first_comment) : nullptr;
//
if (qpo->max_lag_ms >= 0) {
thread->status_variables.stvar[st_var_queries_with_max_lag_ms]++;
}
@ -6320,7 +6314,7 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu
auto execute_pkt = execute_msg->detach(); // detach the packet from the describe message
// setting 'prepared' to prevent fetching results from the cache if the digest matches
bool handled_in_handler = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&execute_pkt, &lock_hostgroup, PGSQL_EXTENDED_QUERY_TYPE_EXECUTE);
bool handled_in_handler = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(&execute_pkt, &lock_hostgroup, PGSQL_EXTENDED_QUERY_TYPE_EXECUTE);
if (handled_in_handler == true) {
// no need to free execute_pkt, it is already freed in the handler
return 0;
@ -6335,22 +6329,9 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu
}
if (locked_on_hostgroup >= 0) {
if (current_hostgroup != locked_on_hostgroup) {
client_myds->DSS = STATE_QUERY_SENT_NET;
int l = CurrentQuery.QueryLength;
char* end = (char*)"";
if (l > 256) {
l = 253;
end = (char*)"...";
}
string nqn = string((char*)CurrentQuery.QueryPointer, l);
char* err_msg = (char*)"Session trying to reach HG %d while locked on HG %d . Rejecting query: %s%s";
char* buf = (char*)malloc(strlen(err_msg) + strlen(nqn.c_str()) + strlen(end) + 64);
sprintf(buf, err_msg, current_hostgroup, locked_on_hostgroup, nqn.c_str(), end);
client_myds->myprot.generate_error_packet(true, true, buf, PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION,
false, true);
thread->status_variables.stvar[st_var_hostgroup_locked_queries]++;
handle_post_sync_locked_on_hostgroup_error(CurrentQuery.extended_query_info.stmt_info->query,
CurrentQuery.extended_query_info.stmt_info->query_length);
RequestEnd(NULL);
free(buf);
l_free(execute_pkt.size, execute_pkt.ptr);
return 2;
}

Loading…
Cancel
Save