From 02eb58db79969febc4e4c0d689b16e3c5ccd5b82 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Sun, 3 Aug 2025 17:39:28 +0500 Subject: [PATCH] Added SET statement tracking support --- include/PgSQL_Session.h | 5 +- lib/PgSQL_Logger.cpp | 4 +- lib/PgSQL_Query_Processor.cpp | 9 +- lib/PgSQL_Session.cpp | 171 +++++++++++++++------------------- 4 files changed, 85 insertions(+), 104 deletions(-) diff --git a/include/PgSQL_Session.h b/include/PgSQL_Session.h index d6e6f291b..fa3f7d42c 100644 --- a/include/PgSQL_Session.h +++ b/include/PgSQL_Session.h @@ -235,9 +235,9 @@ private: void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_PROCESS_KILL(PtrSize_t*); #endif - bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(PtrSize_t*, bool* lock_hostgroup, PgSQL_Extended_Query_Type stmt_type = PGSQL_EXTENDED_QUERY_TYPE_NOT_SET); - void handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED__get_connection(); + bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(PtrSize_t*, bool* lock_hostgroup, + PgSQL_Extended_Query_Type stmt_type = PGSQL_EXTENDED_QUERY_TYPE_NOT_SET); bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_PARSE(PtrSize_t& pkt); bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_DESCRIBE(PtrSize_t& pkt); bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_CLOSE(PtrSize_t& pkt); @@ -253,6 +253,7 @@ private: int handle_post_sync_bind_message(PgSQL_Bind_Message* bind_msg); int handle_post_sync_execute_message(PgSQL_Execute_Message* execute_msg); void handle_post_sync_error(PGSQL_ERROR_CODES errcode, const char* errmsg, bool fatal); + void handle_post_sync_locked_on_hostgroup_error(const char* query, int query_len); void reset_extended_query_frame(); diff --git a/lib/PgSQL_Logger.cpp b/lib/PgSQL_Logger.cpp index 59a7eae51..e5f2dc71d 100644 --- a/lib/PgSQL_Logger.cpp +++ b/lib/PgSQL_Logger.cpp @@ -20,7 +20,7 @@ using json = nlohmann::json; #else #define DEB "" #endif /* DEBUG */ -#define PROXYSQL_MYSQL_LOGGER_VERSION "2.5.0421" DEB +#define PROXYSQL_PGSQL_LOGGER_VERSION "2.5.0421" DEB extern PgSQL_Logger *GloPgSQL_Logger; @@ -1042,6 +1042,6 @@ unsigned int PgSQL_Logger::audit_find_next_id() { } void PgSQL_Logger::print_version() { - fprintf(stderr,"Standard ProxySQL MySQL Logger rev. %s -- %s -- %s\n", PROXYSQL_MYSQL_LOGGER_VERSION, __FILE__, __TIMESTAMP__); + fprintf(stderr,"Standard ProxySQL PgSQL Logger rev. %s -- %s -- %s\n", PROXYSQL_MYSQL_LOGGER_VERSION, __FILE__, __TIMESTAMP__); }; diff --git a/lib/PgSQL_Query_Processor.cpp b/lib/PgSQL_Query_Processor.cpp index 03136160d..33028cef8 100644 --- a/lib/PgSQL_Query_Processor.cpp +++ b/lib/PgSQL_Query_Processor.cpp @@ -274,7 +274,7 @@ PgSQL_Query_Processor_Output* PgSQL_Query_Processor::process_query(PgSQL_Session if (qi) { // NOTE: if ptr == NULL , we are calling process_mysql_query() on an STMT_EXECUTE or STMT_DESCRIBE if (ptr) { - qp = (SQP_par_t*)&qi->QueryParserArgs; + qp = &qi->QueryParserArgs; } else { qp = &stmt_exec_qp; qp->digest = qi->extended_query_info.stmt_info->digest; @@ -296,10 +296,9 @@ PgSQL_Query_Processor_Output* PgSQL_Query_Processor::process_query(PgSQL_Session } memcpy(query, ptr, len); query[len-1] = 0; - } - else { - //query = qi->stmt_info->query; - //len = qi->stmt_info->query_length; + } else { + query = qi->extended_query_info.stmt_info->query; + len = qi->extended_query_info.stmt_info->query_length; } Query_Processor::process_query(sess, ptr == NULL, query, len, ret, qp); diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index f51a2f707..8efeb8424 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -2341,7 +2341,7 @@ __get_pkts_from_client: if (qpo->max_lag_ms >= 0) { thread->status_variables.stvar[st_var_queries_with_max_lag_ms]++; } - rc_break = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt, &lock_hostgroup); + rc_break = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(&pkt, &lock_hostgroup); if (mirror == false && rc_break == false) { if (pgsql_thread___automatic_detect_sqli) { if (handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_detect_SQLi()) { @@ -2796,10 +2796,11 @@ int PgSQL_Session::RunQuery(PgSQL_Data_Stream* myds, PgSQL_Connection* myconn) { case PROCESSING_STMT_EXECUTE: assert(CurrentQuery.extended_query_info.stmt_backend_id); { - PgSQL_Extended_Query_Type type = (status == PROCESSING_STMT_DESCRIBE) ? PGSQL_EXTENDED_QUERY_TYPE_DESCRIBE : PGSQL_EXTENDED_QUERY_TYPE_EXECUTE; - const std::string& backend_stmt_name = std::string(PROXYSQL_PS_PREFIX) + std::to_string(CurrentQuery.extended_query_info.stmt_backend_id); - rc = myconn->async_query(myds->revents, (char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength, - backend_stmt_name.c_str(), type, &CurrentQuery.extended_query_info); + PgSQL_Extended_Query_Type type = + (status == PROCESSING_STMT_DESCRIBE) ? PGSQL_EXTENDED_QUERY_TYPE_DESCRIBE : PGSQL_EXTENDED_QUERY_TYPE_EXECUTE; + const std::string& backend_stmt_name = + std::string(PROXYSQL_PS_PREFIX) + std::to_string(CurrentQuery.extended_query_info.stmt_backend_id); + rc = myconn->async_query(myds->revents, nullptr, 0, backend_stmt_name.c_str(), type, &CurrentQuery.extended_query_info); // Handle edge case: Since libpq automatically sends a Sync after Execute, // the Bind message is no longer pending on the backend. We must reset @@ -3109,13 +3110,15 @@ handler_again: proxy_error("Session %p, status %d, CurrentQuery.stmt_info is NULL\n", this, status); assert(0); } - CurrentQuery.QueryLength = CurrentQuery.extended_query_info.stmt_info->query_length; - CurrentQuery.QueryPointer = (unsigned char*)CurrentQuery.extended_query_info.stmt_info->query; - // NOTE: Update 'first_comment' with the 'first_comment' from the retrieved - // 'stmt_info' from the found prepared statement. 'CurrentQuery' requires its - // own copy of 'first_comment' because it will later be free by 'QueryInfo::end'. - if (CurrentQuery.extended_query_info.stmt_info->first_comment) { - CurrentQuery.QueryParserArgs.first_comment = strdup(CurrentQuery.extended_query_info.stmt_info->first_comment); + if (status == PROCESSING_STMT_DESCRIBE) { + CurrentQuery.QueryLength = CurrentQuery.extended_query_info.stmt_info->query_length; + CurrentQuery.QueryPointer = (unsigned char*)CurrentQuery.extended_query_info.stmt_info->query; + // NOTE: Update 'first_comment' with the 'first_comment' from the retrieved + // 'stmt_info' from the found prepared statement. 'CurrentQuery' requires its + // own copy of 'first_comment' because it will later be free by 'QueryInfo::end'. + if (CurrentQuery.extended_query_info.stmt_info->first_comment) { + CurrentQuery.QueryParserArgs.first_comment = strdup(CurrentQuery.extended_query_info.stmt_info->first_comment); + } } if (CurrentQuery.extended_query_info.stmt_info->parse_param_types.empty() == false) { CurrentQuery.extended_query_info.parse_param_types = CurrentQuery.extended_query_info.stmt_info->parse_param_types; @@ -3893,7 +3896,7 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C } #endif -// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo +// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo void PgSQL_Session::handler_WCD_SS_MCQ_qpo_QueryRewrite(PtrSize_t* pkt) { // the query was rewritten l_free(pkt->size, pkt->ptr); // free old pkt @@ -3923,7 +3926,7 @@ void PgSQL_Session::handler_WCD_SS_MCQ_qpo_QueryRewrite(PtrSize_t* pkt) { } } -// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo +// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo void PgSQL_Session::handler_WCD_SS_MCQ_qpo_OK_msg(PtrSize_t* pkt) { client_myds->DSS = STATE_QUERY_SENT_NET; @@ -3934,7 +3937,7 @@ void PgSQL_Session::handler_WCD_SS_MCQ_qpo_OK_msg(PtrSize_t* pkt) { l_free(pkt->size, pkt->ptr); } -// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo +// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo void PgSQL_Session::handler_WCD_SS_MCQ_qpo_error_msg(PtrSize_t* pkt) { client_myds->DSS = STATE_QUERY_SENT_NET; client_myds->myprot.generate_error_packet(true, true, qpo->error_msg, @@ -3943,7 +3946,7 @@ void PgSQL_Session::handler_WCD_SS_MCQ_qpo_error_msg(PtrSize_t* pkt) { l_free(pkt->size, pkt->ptr); } -// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo +// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo void PgSQL_Session::handler_WCD_SS_MCQ_qpo_LargePacket(PtrSize_t* pkt) { // ER_NET_PACKET_TOO_LARGE client_myds->DSS = STATE_QUERY_SENT_NET; @@ -3953,7 +3956,7 @@ void PgSQL_Session::handler_WCD_SS_MCQ_qpo_LargePacket(PtrSize_t* pkt) { l_free(pkt->size, pkt->ptr); } -bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(PtrSize_t* pkt, bool* lock_hostgroup, PgSQL_Extended_Query_Type stmt_type) { +bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(PtrSize_t* pkt, bool* lock_hostgroup, PgSQL_Extended_Query_Type stmt_type) { /* lock_hostgroup: If this variable is set to true, this session will get lock to a @@ -3985,11 +3988,6 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C return true; } - if (stmt_type == PGSQL_EXTENDED_QUERY_TYPE_DESCRIBE || - stmt_type == PGSQL_EXTENDED_QUERY_TYPE_EXECUTE) { // for Describe and Execute we exit here - goto __exit_set_destination_hostgroup; - } - // Check if the session is not locked on a hostgroup and there are untracked option parameters if (locked_on_hostgroup < 0 && untracked_option_parameters.empty() == false) { if (client_myds && client_myds->addr.addr) { @@ -4007,6 +4005,11 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C return false; } + if (stmt_type == PGSQL_EXTENDED_QUERY_TYPE_PARSE || + stmt_type == PGSQL_EXTENDED_QUERY_TYPE_DESCRIBE) { // for Parse and Describe we exit here + goto __exit_set_destination_hostgroup; + } + // handle here #509, #815 and #816 if (CurrentQuery.QueryParserArgs.digest_text) { char* dig = CurrentQuery.QueryParserArgs.digest_text; @@ -4129,9 +4132,9 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C client_myds->DSS = STATE_QUERY_SENT_NET; client_myds->myprot.generate_error_packet(true, true, errmsg, PGSQL_ERROR_CODES::ERRCODE_INVALID_PARAMETER_VALUE, false, true); - client_myds->DSS = STATE_SLEEP; - status = WAITING_CLIENT_DATA; free(errmsg); + RequestEnd(NULL); + l_free(pkt->size, pkt->ptr); return true; } @@ -4603,8 +4606,7 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C if (mirror == false) { RequestEnd(NULL); - } - else { + } else { client_myds->DSS = STATE_SLEEP; status = WAITING_CLIENT_DATA; } @@ -4762,7 +4764,7 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C return true; } //} - if (qpo->cache_ttl > 0 && ((stmt_type & PGSQL_EXTENDED_QUERY_TYPE_PARSE) == 0)) { + if (qpo->cache_ttl > 0 && ((stmt_type & PGSQL_EXTENDED_QUERY_TYPE_EXECUTE) == 0)) { const std::shared_ptr pgsql_qc_entry = GloPgQC->get( client_myds->myconn->userinfo->hash, @@ -5855,6 +5857,25 @@ void PgSQL_Session::handle_post_sync_error(PGSQL_ERROR_CODES errcode, const char status = WAITING_CLIENT_DATA; } +void PgSQL_Session::handle_post_sync_locked_on_hostgroup_error(const char* query, int query_len) { + client_myds->DSS = STATE_QUERY_SENT_NET; + int l = query_len; + char* end = (char*)""; + if (l > 256) { + l = 253; + end = (char*)"..."; + } + std::string nqn = string(query, l); // truncate string to 253 characters + const char* err_msg = "Session trying to reach HG %d while locked on HG %d . Rejecting query: %s%s"; + char* buf = (char*)malloc(strlen(err_msg) + strlen(nqn.c_str()) + strlen(end) + 64); + sprintf(buf, err_msg, current_hostgroup, locked_on_hostgroup, nqn.c_str(), end); + client_myds->myprot.generate_error_packet(true, true, buf, PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION, + false, true); + free(buf); + thread->status_variables.stvar[st_var_hostgroup_locked_queries]++; + RequestEnd(NULL); +} + int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg) { PROXY_TRACE(); thread->status_variables.stvar[st_var_frontend_stmt_prepare]++; @@ -5899,7 +5920,7 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg auto parse_pkt = parse_msg->detach(); // detach the packet from the parse message // setting 'prepared' to prevent fetching results from the cache if the digest matches - bool handled_in_handler = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&parse_pkt, &lock_hostgroup, PGSQL_EXTENDED_QUERY_TYPE_PARSE); + bool handled_in_handler = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(&parse_pkt, &lock_hostgroup, PGSQL_EXTENDED_QUERY_TYPE_PARSE); if (handled_in_handler == true) // no need to release parse_pkt, it has been released in handler return 0; @@ -5913,22 +5934,8 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg } if (locked_on_hostgroup >= 0) { if (current_hostgroup != locked_on_hostgroup) { - client_myds->DSS = STATE_QUERY_SENT_NET; - int l = CurrentQuery.QueryLength; - char* end = (char*)""; - if (l > 256) { - l = 253; - end = (char*)"..."; - } - string nqn = string((char*)CurrentQuery.QueryPointer, l); - char* err_msg = (char*)"Session trying to reach HG %d while locked on HG %d . Rejecting query: %s%s"; - char* buf = (char*)malloc(strlen(err_msg) + strlen(nqn.c_str()) + strlen(end) + 64); - sprintf(buf, err_msg, current_hostgroup, locked_on_hostgroup, nqn.c_str(), end); - client_myds->myprot.generate_error_packet(true, true, buf, PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION, - false, true); - thread->status_variables.stvar[st_var_hostgroup_locked_queries]++; + handle_post_sync_locked_on_hostgroup_error((const char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength); RequestEnd(NULL); - free(buf); l_free(parse_pkt.size, parse_pkt.ptr); return 2; } @@ -5968,7 +5975,7 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg uint64_t hash = local_stmts->compute_hash( client_myds->myconn->userinfo->username, client_myds->myconn->userinfo->dbname, - (char*)CurrentQuery.QueryPointer, + (const char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength, CurrentQuery.extended_query_info.parse_param_types ); @@ -5979,21 +5986,14 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg if (stmt_info) { local_stmts->client_insert(stmt_info->statement_id, stmt_name); extended_query_info.stmt_global_id = stmt_info->statement_id; - + GloPgStmt->unlock(); client_myds->setDSS_STATE_QUERY_SENT_NET(); - char txn_state = NumActiveTransactions() > 0 ? 'T' : 'I'; bool send_ready_packet = extended_query_frame.empty(); - client_myds->myprot.generate_parse_completion_packet(true, send_ready_packet, txn_state); - LogQuery(nullptr); - - client_myds->DSS = STATE_SLEEP; - status = WAITING_CLIENT_DATA; - CurrentQuery.end_time = thread->curtime; - //CurrentQuery.end(); + //LogQuery(nullptr); + //CurrentQuery.end_time = thread->curtime; RequestEnd(NULL); - GloPgStmt->unlock(); l_free(parse_pkt.size, parse_pkt.ptr); return 0; } @@ -6043,9 +6043,11 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des } if (extended_query_frame.empty() == false) { - // peeking next message in the extended query frame + // Peeking next message in the extended query frame + // Assuming the client follows correct Bind/Describe/Execute order, + // we can skip this Describe message, as libpq's PQsendQueryPrepared already sends it. if (std::holds_alternative>(extended_query_frame.front())) { - return 0; // assuming client is sending Bind/Describe/Execute in the correct order, so libpq alreay sends describe message + return 0; } } @@ -6094,7 +6096,6 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des clock_gettime(CLOCK_THREAD_CPUTIME_ID, &begint); } qpo = GloPgQPro->process_query(this, nullptr, 0, &CurrentQuery); - assert(qpo); // GloPgQPro->process_mysql_query() should always return a qpo if (qpo->max_lag_ms >= 0) { @@ -6119,11 +6120,9 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des client_myds->myprot.generate_describe_completion_packet(true, send_ready_packet, stmt_info->stmt_metadata, extended_query_info.stmt_type, txn_state); stmt_info->unlock(); - LogQuery(NULL); - client_myds->DSS = STATE_SLEEP; - status = WAITING_CLIENT_DATA; - CurrentQuery.end_time = thread->curtime; - CurrentQuery.end(); + //LogQuery(NULL); + //CurrentQuery.end_time = thread->curtime; + RequestEnd(NULL); return 0; } stmt_info->unlock(); @@ -6131,7 +6130,7 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des auto describe_pkt = describe_msg->detach(); // detach the packet from the describe message // setting 'prepared' to prevent fetching results from the cache if the digest matches - bool handled_in_handler = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&describe_pkt, + bool handled_in_handler = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(&describe_pkt, &lock_hostgroup, PGSQL_EXTENDED_QUERY_TYPE_DESCRIBE); if (handled_in_handler == true) { // no need to free describe_pkt, it is already freed in the handler @@ -6147,22 +6146,9 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des } if (locked_on_hostgroup >= 0) { if (current_hostgroup != locked_on_hostgroup) { - client_myds->DSS = STATE_QUERY_SENT_NET; - int l = CurrentQuery.QueryLength; - char* end = (char*)""; - if (l > 256) { - l = 253; - end = (char*)"..."; - } - string nqn = string((char*)CurrentQuery.QueryPointer, l); - char* err_msg = (char*)"Session trying to reach HG %d while locked on HG %d . Rejecting query: %s%s"; - char* buf = (char*)malloc(strlen(err_msg) + strlen(nqn.c_str()) + strlen(end) + 64); - sprintf(buf, err_msg, current_hostgroup, locked_on_hostgroup, nqn.c_str(), end); - client_myds->myprot.generate_error_packet(true, true, buf, PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION, - false, true); - thread->status_variables.stvar[st_var_hostgroup_locked_queries]++; + handle_post_sync_locked_on_hostgroup_error(CurrentQuery.extended_query_info.stmt_info->query, + CurrentQuery.extended_query_info.stmt_info->query_length); RequestEnd(NULL); - free(buf); l_free(describe_pkt.size, describe_pkt.ptr); return 2; } @@ -6291,6 +6277,7 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, errmsg.c_str(), false); return 2; } + PgSQL_Extended_Query_Info& extended_query_info = CurrentQuery.extended_query_info; extended_query_info.stmt_client_portal_name = portal_name; extended_query_info.stmt_client_name = stmt_client_name; @@ -6305,9 +6292,16 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu clock_gettime(CLOCK_THREAD_CPUTIME_ID, &begint); } qpo = GloPgQPro->process_query(this, nullptr, 0, &CurrentQuery); - assert(qpo); // GloPgQPro->process_mysql_query() should always return a qpo + // required for SET statement parsing + CurrentQuery.QueryPointer = (unsigned char*)stmt_info->query; + CurrentQuery.QueryLength = stmt_info->query_length; + CurrentQuery.QueryParserArgs.digest = stmt_info->digest; + CurrentQuery.QueryParserArgs.digest_text = stmt_info->digest_text ? strdup(stmt_info->digest_text) : nullptr; + CurrentQuery.QueryParserArgs.first_comment = stmt_info->first_comment ? strdup(stmt_info->first_comment) : nullptr; + // + if (qpo->max_lag_ms >= 0) { thread->status_variables.stvar[st_var_queries_with_max_lag_ms]++; } @@ -6320,7 +6314,7 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu auto execute_pkt = execute_msg->detach(); // detach the packet from the describe message // setting 'prepared' to prevent fetching results from the cache if the digest matches - bool handled_in_handler = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&execute_pkt, &lock_hostgroup, PGSQL_EXTENDED_QUERY_TYPE_EXECUTE); + bool handled_in_handler = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(&execute_pkt, &lock_hostgroup, PGSQL_EXTENDED_QUERY_TYPE_EXECUTE); if (handled_in_handler == true) { // no need to free execute_pkt, it is already freed in the handler return 0; @@ -6335,22 +6329,9 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu } if (locked_on_hostgroup >= 0) { if (current_hostgroup != locked_on_hostgroup) { - client_myds->DSS = STATE_QUERY_SENT_NET; - int l = CurrentQuery.QueryLength; - char* end = (char*)""; - if (l > 256) { - l = 253; - end = (char*)"..."; - } - string nqn = string((char*)CurrentQuery.QueryPointer, l); - char* err_msg = (char*)"Session trying to reach HG %d while locked on HG %d . Rejecting query: %s%s"; - char* buf = (char*)malloc(strlen(err_msg) + strlen(nqn.c_str()) + strlen(end) + 64); - sprintf(buf, err_msg, current_hostgroup, locked_on_hostgroup, nqn.c_str(), end); - client_myds->myprot.generate_error_packet(true, true, buf, PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION, - false, true); - thread->status_variables.stvar[st_var_hostgroup_locked_queries]++; + handle_post_sync_locked_on_hostgroup_error(CurrentQuery.extended_query_info.stmt_info->query, + CurrentQuery.extended_query_info.stmt_info->query_length); RequestEnd(NULL); - free(buf); l_free(execute_pkt.size, execute_pkt.ptr); return 2; }