diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index e8a89870e..f44303e8b 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -1091,7 +1091,7 @@ int PgSQL_Session::handler_again___status_RESETTING_CONNECTION() { void PgSQL_Session::handler_again___new_thread_to_cancel_query() { PgSQL_Data_Stream* myds = mybe->server_myds; - if (myds->myconn) { + if (myds->myconn) { if (myds->killed_at == 0) { myds->wait_until = 0; myds->killed_at = thread->curtime; @@ -3674,32 +3674,116 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C // this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo void PgSQL_Session::handler_WCD_SS_MCQ_qpo_QueryRewrite(PtrSize_t* pkt) { - // the query was rewritten - l_free(pkt->size, pkt->ptr); // free old pkt - // allocate new pkt - timespec begint; - if (thread->variables.stats_time_query_processor) { - clock_gettime(CLOCK_THREAD_CPUTIME_ID, &begint); + if (pkt->size == 0) + return; // nothing to rewrite + + const char msg_type = *((char*)pkt->ptr); + bool stats_enabled = thread->variables.stats_time_query_processor; + + auto start_timer = [&]() -> timespec { + timespec t{}; + if (stats_enabled) + clock_gettime(CLOCK_THREAD_CPUTIME_ID, &t); + return t; + }; + + auto stop_timer = [&](const timespec& begin) { + if (!stats_enabled) return; + timespec end{}; + clock_gettime(CLOCK_THREAD_CPUTIME_ID, &end); + thread->status_variables.stvar[st_var_query_processor_time] += + (end.tv_sec * 1000000000 + end.tv_nsec) - + (begin.tv_sec * 1000000000 + begin.tv_nsec); + }; + + if (msg_type == 'Q') { + // Free old packet before building new one + l_free(pkt->size, pkt->ptr); + + timespec begint = start_timer(); + + PG_pkt pgpkt(1 + 4 + qpo->new_query->length() + 1); + pgpkt.put_char('Q'); + pgpkt.put_uint32(4 + qpo->new_query->length() + 1); + pgpkt.put_bytes(qpo->new_query->data(), qpo->new_query->length()); + pgpkt.put_char('\0'); + + auto buff = pgpkt.detach(); + pkt->ptr = buff.first; + pkt->size = buff.second; + + CurrentQuery.query_parser_free(); + CurrentQuery.begin(reinterpret_cast(pkt->ptr), pkt->size, true); + + delete qpo->new_query; + stop_timer(begint); + return; } - PG_pkt pgpkt(1 + 4 + qpo->new_query->length() + 1); - pgpkt.put_char('Q'); - pgpkt.put_uint32(4 + qpo->new_query->length() + 1); - pgpkt.put_bytes(qpo->new_query->data(), qpo->new_query->length()); - pgpkt.put_char('\0'); - auto buff = pgpkt.detach(); - pkt->ptr = buff.first; - pkt->size = buff.second; - CurrentQuery.query_parser_free(); - CurrentQuery.begin((unsigned char*)pkt->ptr, pkt->size, true); - delete qpo->new_query; - timespec endt; - if (thread->variables.stats_time_query_processor) { - clock_gettime(CLOCK_THREAD_CPUTIME_ID, &endt); - thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] + - (endt.tv_sec * 1000000000 + endt.tv_nsec) - - (begint.tv_sec * 1000000000 + begint.tv_nsec); + if (msg_type == 'P') { + timespec begint = start_timer(); + + // Parse the original packet before rewriting + PgSQL_Parse_Message orig_parse_msg; + if (!orig_parse_msg.parse(*pkt)) { + assert(0); // should never happen + } + const auto& orig_data = orig_parse_msg.data(); + + unsigned int new_query_size = qpo->new_query->length(); + unsigned int old_query_size = strlen(orig_data.query_string); + unsigned int new_pkt_size = orig_parse_msg.get_raw_pkt().size + (new_query_size - old_query_size); + + PG_pkt pgpkt(new_pkt_size); + pgpkt.put_char('P'); + pgpkt.put_uint32(new_pkt_size - 1); + pgpkt.put_string(orig_data.stmt_name); + pgpkt.put_string(qpo->new_query->c_str()); + pgpkt.put_uint16(orig_data.num_param_types); + if (orig_data.num_param_types) { + pgpkt.put_bytes(orig_data.param_types_start_ptr, + orig_data.num_param_types * sizeof(uint32_t)); + } + + auto buff = pgpkt.detach(); + pkt->ptr = buff.first; + pkt->size = buff.second; + + delete qpo->new_query; + + // Parse the new rewritten packet + PgSQL_Parse_Message new_parse_msg; + if (!new_parse_msg.parse(*pkt)) { + assert(0); // should never happen + } + const auto& new_data = new_parse_msg.data(); + + CurrentQuery.query_parser_free(); + CurrentQuery.begin((unsigned char*)new_data.query_string, + strlen(new_data.query_string) + 1, false); + CurrentQuery.extended_query_info.stmt_client_name = new_data.stmt_name; + + if (new_data.num_param_types > 0) { + Parse_Param_Types parse_param_type(new_data.num_param_types); + auto param_type_reader = new_parse_msg.get_param_types_reader(); + for (uint16_t i = 0; i < new_data.num_param_types; ++i) { + if (!param_type_reader.next(&parse_param_type[i])) { + proxy_error("Failed to read result format at index %u\n", i); + assert(0); + } + } + CurrentQuery.extended_query_info.parse_param_types = std::move(parse_param_type); + } + + // parse() takes ownership of the packet, so pkt is replaced here + *pkt = new_parse_msg.detach(); + + stop_timer(begint); + return; } + + // Should never happen + assert(0); } // this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo