Added support to convert EOF packets into OK packets for 'Query_Cache' entries

pull/3123/head
Javier Jaramago Fernández 6 years ago
parent a1e230029e
commit 79e3d7a1b7

@ -90,7 +90,7 @@ class Query_Cache {
void print_version();
bool set(uint64_t user_hash, const unsigned char *kp, uint32_t kl, unsigned char *vp, uint32_t vl, unsigned long long create_ms, unsigned long long curtime_ms, unsigned long long expire_ms, bool deprecate_eof_active);
bool set(uint64_t , const unsigned char *, uint32_t, unsigned char *, uint32_t, unsigned long long, unsigned long long, unsigned long long);
unsigned char * get(uint64_t , const unsigned char *, const uint32_t, uint32_t *, unsigned long long, unsigned long long);
unsigned char * get(uint64_t , const unsigned char *, const uint32_t, uint32_t *, unsigned long long, unsigned long long, bool deprecate_eof_active);
uint64_t flush();
SQLite3_result * SQL3_getStats();
};

@ -5714,6 +5714,7 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
}
}
if (qpo->cache_ttl>0) {
bool deprecate_eof_active = client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF;
uint32_t resbuf=0;
unsigned char *aa=GloQC->get(
client_myds->myconn->userinfo->hash,
@ -5721,7 +5722,8 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
CurrentQuery.QueryLength ,
&resbuf ,
thread->curtime/1000 ,
qpo->cache_ttl
qpo->cache_ttl,
deprecate_eof_active
);
if (aa) {
client_myds->buffer2resultset(aa,resbuf);

@ -464,7 +464,159 @@ Query_Cache::~Query_Cache() {
}
};
unsigned char * Query_Cache::get(uint64_t user_hash, const unsigned char *kp, const uint32_t kl, uint32_t *lv, unsigned long long curtime_ms, unsigned long long cache_ttl) {
const int eof_to_ok_dif = static_cast<const int>(- (sizeof(mysql_hdr) + 5) + 2);
const int ok_to_eof_dif = static_cast<const int>(+ (sizeof(mysql_hdr) + 5) - 2);
/**
* @brief Converts a 'EOF_Packet' to holded inside a 'QC_entry_t' into a 'OK_Packet'.
* Warning: This function assumes that the supplied 'QC_entry_t' holds a valid
* 'EOF_Packet'.
*
* @param entry The 'QC_entry_t' holding a 'OK_Packet' to be converted into
* a 'EOF_Packet'.
* @return The converted packet.
*/
unsigned char* eof_to_ok_packet(QC_entry_t* entry) {
unsigned char* result = (unsigned char*)malloc(entry->length + eof_to_ok_dif);
unsigned char* vp = result;
char* it = entry->value;
// Copy until the first EOF
memcpy(vp, entry->value, entry->column_eof_pkt_offset);
it += entry->column_eof_pkt_offset;
vp += entry->column_eof_pkt_offset;
// Skip the first EOF after columns def
mysql_hdr hdr;
memcpy(&hdr, it, sizeof(mysql_hdr));
it += sizeof(mysql_hdr) + hdr.pkt_length;
// Copy all the rows
uint64_t u_entry_val = reinterpret_cast<uint64_t>(entry->value);
uint64_t u_it_pos = reinterpret_cast<uint64_t>(it);
uint64_t rows_length = (u_entry_val + entry->row_eof_pkt_offset) - u_it_pos;
memcpy(vp, it, rows_length);
vp += rows_length;
it += rows_length;
// Replace final EOF in favor of OK packet
// =======================================
// Copy the mysql header
memcpy(&hdr, it, sizeof(mysql_hdr));
hdr.pkt_length = 7;
memcpy(vp, &hdr, sizeof(mysql_hdr));
vp += sizeof(mysql_hdr);
it += sizeof(mysql_hdr);
// OK packet header
*vp = 0xfe;
vp++;
it++;
// Initialize affected_rows and last_insert_id to zero
memset(vp, 0, 2);
vp += 2;
// Copy the warning an status flags
memcpy(vp, it, 4);
// =======================================
return result;
}
/**
* @brief Converts a 'OK_Packet' holded inside 'QC_entry_t' into a 'EOF_Packet'.
* Warning: This function assumes that the supplied 'QC_entry_t' holds a valid
* 'OK_Packet'.
*
* @param entry The 'QC_entry_t' holding a 'EOF_Packet' to be converted into
* a 'OK_Packet'.
* @return The converted packet.
*/
unsigned char* ok_to_eof_packet(QC_entry_t* entry) {
unsigned char* result = (unsigned char*)malloc(entry->length + ok_to_eof_dif);
unsigned char* vp = result;
char* it = entry->value;
// Extract warning flags and status from 'OK_packet'
char* ok_packet = it + entry->ok_pkt_offset;
mysql_hdr ok_hdr;
memcpy(&ok_hdr, ok_packet, sizeof(mysql_hdr));
ok_packet += sizeof(mysql_hdr);
// Skipt the 'affected_rows' and 'last_insert_id'
ok_packet += 2;
uint16_t status_flags = *reinterpret_cast<uint16_t*>(ok_packet);
ok_packet += 2;
uint16_t warnings = *reinterpret_cast<uint16_t*>(ok_packet);
// Find the spot in which the first EOF needs to be placed
it += sizeof(mysql_hdr);
uint64_t c_count = 0;
int c_count_len = mysql_decode_length(reinterpret_cast<unsigned char*>(it), &c_count);
it += c_count_len;
mysql_hdr column_hdr;
for (uint64_t i = 0; i < c_count; i++) {
memcpy(&column_hdr, it ,sizeof(mysql_hdr));
it += sizeof(mysql_hdr) + column_hdr.pkt_length;
}
// Location for 'column_eof'
uint64_t column_eof_offset =
reinterpret_cast<unsigned char*>(it) -
reinterpret_cast<unsigned char*>(entry->value);
memcpy(vp, entry->value, column_eof_offset);
vp += column_eof_offset;
// Write 'column_eof_packet' header
column_hdr.pkt_id = column_hdr.pkt_id + 1;
column_hdr.pkt_length = 5;
memcpy(vp, &column_hdr, sizeof(mysql_hdr));
vp += sizeof(mysql_hdr);
// Write 'column_eof_packet' contents
*vp = 0xfe;
vp++;
*reinterpret_cast<uint16_t*>(vp) = warnings;
vp += 2;
*reinterpret_cast<uint16_t*>(vp) = status_flags;
vp += 2;
// Find the OK packet
for (;;) {
mysql_hdr hdr;
memcpy(&hdr, it ,sizeof(mysql_hdr));
unsigned char* payload =
reinterpret_cast<unsigned char*>(it) +
sizeof(mysql_hdr);
if (hdr.pkt_length < 9 && *payload == 0xfe) {
mysql_hdr ok_hdr;
ok_hdr.pkt_id = hdr.pkt_id + 1;
ok_hdr.pkt_length = 5;
memcpy(vp, &ok_hdr, sizeof(mysql_hdr));
vp += sizeof(mysql_hdr);
*vp = 0xfe;
vp++;
*reinterpret_cast<uint16_t*>(vp) = warnings;
vp += 2;
*reinterpret_cast<uint16_t*>(vp) = status_flags;
break;
} else {
// Increment the package id by one due to 'column_eof_packet'
hdr.pkt_id += 1;
memcpy(vp, &hdr, sizeof(mysql_hdr));
vp += sizeof(mysql_hdr);
it += sizeof(mysql_hdr);
memcpy(vp, it, hdr.pkt_length);
vp += hdr.pkt_length;
it += hdr.pkt_length;
}
}
return result;
}
unsigned char * Query_Cache::get(uint64_t user_hash, const unsigned char *kp, const uint32_t kl, uint32_t *lv, unsigned long long curtime_ms, unsigned long long cache_ttl, bool deprecate_eof_active) {
unsigned char *result=NULL;
uint64_t hk=SpookyHash::Hash64(kp, kl, user_hash);
@ -477,9 +629,19 @@ unsigned char * Query_Cache::get(uint64_t user_hash, const unsigned char *kp, co
if (entry->expire_ms > t && entry->create_ms + cache_ttl > t) {
THR_UPDATE_CNT(__thr_cntGetOK,Glo_cntGetOK,1,1);
THR_UPDATE_CNT(__thr_dataOUT,Glo_dataOUT,entry->length,1);
result=(unsigned char *)malloc(entry->length);
memcpy(result,entry->value,entry->length);
*lv=entry->length;
if (deprecate_eof_active && entry->column_eof_pkt_offset) {
result = eof_to_ok_packet(entry);
*lv = entry->length + eof_to_ok_dif;
} else if (!deprecate_eof_active && entry->ok_pkt_offset){
result = ok_to_eof_packet(entry);
*lv = entry->length + ok_to_eof_dif;
} else {
result = (unsigned char *)malloc(entry->length);
memcpy(result, entry->value, entry->length);
*lv = entry->length;
}
if (t > entry->access_ms) entry->access_ms=t;
}
__sync_fetch_and_sub(&entry->ref_count,1);

Loading…
Cancel
Save