Add Query Rewrite support for Extended Query Protocol

pull/5044/head
Rahim Kanji 9 months ago
parent dd1d231d51
commit 0bb5747291

@ -1091,7 +1091,7 @@ int PgSQL_Session::handler_again___status_RESETTING_CONNECTION() {
void PgSQL_Session::handler_again___new_thread_to_cancel_query() {
PgSQL_Data_Stream* myds = mybe->server_myds;
if (myds->myconn) {
if (myds->myconn) {
if (myds->killed_at == 0) {
myds->wait_until = 0;
myds->killed_at = thread->curtime;
@ -3674,32 +3674,116 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo
void PgSQL_Session::handler_WCD_SS_MCQ_qpo_QueryRewrite(PtrSize_t* pkt) {
// the query was rewritten
l_free(pkt->size, pkt->ptr); // free old pkt
// allocate new pkt
timespec begint;
if (thread->variables.stats_time_query_processor) {
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &begint);
if (pkt->size == 0)
return; // nothing to rewrite
const char msg_type = *((char*)pkt->ptr);
bool stats_enabled = thread->variables.stats_time_query_processor;
auto start_timer = [&]() -> timespec {
timespec t{};
if (stats_enabled)
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &t);
return t;
};
auto stop_timer = [&](const timespec& begin) {
if (!stats_enabled) return;
timespec end{};
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &end);
thread->status_variables.stvar[st_var_query_processor_time] +=
(end.tv_sec * 1000000000 + end.tv_nsec) -
(begin.tv_sec * 1000000000 + begin.tv_nsec);
};
if (msg_type == 'Q') {
// Free old packet before building new one
l_free(pkt->size, pkt->ptr);
timespec begint = start_timer();
PG_pkt pgpkt(1 + 4 + qpo->new_query->length() + 1);
pgpkt.put_char('Q');
pgpkt.put_uint32(4 + qpo->new_query->length() + 1);
pgpkt.put_bytes(qpo->new_query->data(), qpo->new_query->length());
pgpkt.put_char('\0');
auto buff = pgpkt.detach();
pkt->ptr = buff.first;
pkt->size = buff.second;
CurrentQuery.query_parser_free();
CurrentQuery.begin(reinterpret_cast<unsigned char*>(pkt->ptr), pkt->size, true);
delete qpo->new_query;
stop_timer(begint);
return;
}
PG_pkt pgpkt(1 + 4 + qpo->new_query->length() + 1);
pgpkt.put_char('Q');
pgpkt.put_uint32(4 + qpo->new_query->length() + 1);
pgpkt.put_bytes(qpo->new_query->data(), qpo->new_query->length());
pgpkt.put_char('\0');
auto buff = pgpkt.detach();
pkt->ptr = buff.first;
pkt->size = buff.second;
CurrentQuery.query_parser_free();
CurrentQuery.begin((unsigned char*)pkt->ptr, pkt->size, true);
delete qpo->new_query;
timespec endt;
if (thread->variables.stats_time_query_processor) {
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &endt);
thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] +
(endt.tv_sec * 1000000000 + endt.tv_nsec) -
(begint.tv_sec * 1000000000 + begint.tv_nsec);
if (msg_type == 'P') {
timespec begint = start_timer();
// Parse the original packet before rewriting
PgSQL_Parse_Message orig_parse_msg;
if (!orig_parse_msg.parse(*pkt)) {
assert(0); // should never happen
}
const auto& orig_data = orig_parse_msg.data();
unsigned int new_query_size = qpo->new_query->length();
unsigned int old_query_size = strlen(orig_data.query_string);
unsigned int new_pkt_size = orig_parse_msg.get_raw_pkt().size + (new_query_size - old_query_size);
PG_pkt pgpkt(new_pkt_size);
pgpkt.put_char('P');
pgpkt.put_uint32(new_pkt_size - 1);
pgpkt.put_string(orig_data.stmt_name);
pgpkt.put_string(qpo->new_query->c_str());
pgpkt.put_uint16(orig_data.num_param_types);
if (orig_data.num_param_types) {
pgpkt.put_bytes(orig_data.param_types_start_ptr,
orig_data.num_param_types * sizeof(uint32_t));
}
auto buff = pgpkt.detach();
pkt->ptr = buff.first;
pkt->size = buff.second;
delete qpo->new_query;
// Parse the new rewritten packet
PgSQL_Parse_Message new_parse_msg;
if (!new_parse_msg.parse(*pkt)) {
assert(0); // should never happen
}
const auto& new_data = new_parse_msg.data();
CurrentQuery.query_parser_free();
CurrentQuery.begin((unsigned char*)new_data.query_string,
strlen(new_data.query_string) + 1, false);
CurrentQuery.extended_query_info.stmt_client_name = new_data.stmt_name;
if (new_data.num_param_types > 0) {
Parse_Param_Types parse_param_type(new_data.num_param_types);
auto param_type_reader = new_parse_msg.get_param_types_reader();
for (uint16_t i = 0; i < new_data.num_param_types; ++i) {
if (!param_type_reader.next(&parse_param_type[i])) {
proxy_error("Failed to read result format at index %u\n", i);
assert(0);
}
}
CurrentQuery.extended_query_info.parse_param_types = std::move(parse_param_type);
}
// parse() takes ownership of the packet, so pkt is replaced here
*pkt = new_parse_msg.detach();
stop_timer(begint);
return;
}
// Should never happen
assert(0);
}
// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo

Loading…
Cancel
Save