|
|
|
|
@ -997,99 +997,6 @@ int PgSQL_Data_Stream::read_pkts() {
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void PgSQL_Data_Stream::generate_compressed_packet() {
|
|
|
|
|
#define MAX_COMPRESSED_PACKET_SIZE 10*1024*1024
|
|
|
|
|
unsigned int total_size = 0;
|
|
|
|
|
unsigned int i = 0;
|
|
|
|
|
PtrSize_t* p = NULL;
|
|
|
|
|
while (i < PSarrayOUT->len && total_size < MAX_COMPRESSED_PACKET_SIZE) {
|
|
|
|
|
p = PSarrayOUT->index(i);
|
|
|
|
|
total_size += p->size;
|
|
|
|
|
i++;
|
|
|
|
|
}
|
|
|
|
|
if (i >= 2) {
|
|
|
|
|
// we successfully read at least 2 packets
|
|
|
|
|
if (total_size > MAX_COMPRESSED_PACKET_SIZE) {
|
|
|
|
|
// total_size is too big, we remove the last packet read
|
|
|
|
|
total_size -= p->size;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (total_size <= MAX_COMPRESSED_PACKET_SIZE) {
|
|
|
|
|
// this worked in the past . it applies for small packets
|
|
|
|
|
uLong sourceLen = total_size;
|
|
|
|
|
Bytef* source = (Bytef*)l_alloc(total_size);
|
|
|
|
|
uLongf destLen = total_size * 120 / 100 + 12;
|
|
|
|
|
Bytef* dest = (Bytef*)malloc(destLen);
|
|
|
|
|
i = 0;
|
|
|
|
|
total_size = 0;
|
|
|
|
|
while (total_size < sourceLen) {
|
|
|
|
|
PtrSize_t p2;
|
|
|
|
|
PSarrayOUT->remove_index(0, &p2);
|
|
|
|
|
memcpy(source + total_size, p2.ptr, p2.size);
|
|
|
|
|
total_size += p2.size;
|
|
|
|
|
l_free(p2.size, p2.ptr);
|
|
|
|
|
}
|
|
|
|
|
int rc = compress(dest, &destLen, source, sourceLen);
|
|
|
|
|
assert(rc == Z_OK);
|
|
|
|
|
l_free(total_size, source);
|
|
|
|
|
queueOUT.pkt.size = destLen + 7;
|
|
|
|
|
queueOUT.pkt.ptr = l_alloc(queueOUT.pkt.size);
|
|
|
|
|
mysql_hdr hdr;
|
|
|
|
|
hdr.pkt_length = destLen;
|
|
|
|
|
hdr.pkt_id = ++myconn->compression_pkt_id;
|
|
|
|
|
memcpy((unsigned char*)queueOUT.pkt.ptr, &hdr, sizeof(mysql_hdr));
|
|
|
|
|
hdr.pkt_length = total_size;
|
|
|
|
|
memcpy((unsigned char*)queueOUT.pkt.ptr + 4, &hdr, 3);
|
|
|
|
|
memcpy((unsigned char*)queueOUT.pkt.ptr + 7, dest, destLen);
|
|
|
|
|
free(dest);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
// if we reach here, it means we have one single packet larger than MAX_COMPRESSED_PACKET_SIZE
|
|
|
|
|
PtrSize_t p2;
|
|
|
|
|
PSarrayOUT->remove_index(0, &p2);
|
|
|
|
|
|
|
|
|
|
unsigned int len1 = MAX_COMPRESSED_PACKET_SIZE / 2;
|
|
|
|
|
unsigned int len2 = p2.size - len1;
|
|
|
|
|
uLongf destLen1;
|
|
|
|
|
uLongf destLen2;
|
|
|
|
|
Bytef* dest1;
|
|
|
|
|
Bytef* dest2;
|
|
|
|
|
int rc;
|
|
|
|
|
|
|
|
|
|
mysql_hdr hdr;
|
|
|
|
|
|
|
|
|
|
destLen1 = len1 * 120 / 100 + 12;
|
|
|
|
|
dest1 = (Bytef*)malloc(destLen1 + 7);
|
|
|
|
|
destLen2 = len2 * 120 / 100 + 12;
|
|
|
|
|
dest2 = (Bytef*)malloc(destLen2 + 7);
|
|
|
|
|
rc = compress(dest1 + 7, &destLen1, (const unsigned char*)p2.ptr, len1);
|
|
|
|
|
assert(rc == Z_OK);
|
|
|
|
|
rc = compress(dest2 + 7, &destLen2, (const unsigned char*)p2.ptr + len1, len2);
|
|
|
|
|
assert(rc == Z_OK);
|
|
|
|
|
|
|
|
|
|
hdr.pkt_length = destLen1;
|
|
|
|
|
hdr.pkt_id = ++myconn->compression_pkt_id;
|
|
|
|
|
memcpy(dest1, &hdr, sizeof(mysql_hdr));
|
|
|
|
|
hdr.pkt_length = len1;
|
|
|
|
|
memcpy((char*)dest1 + sizeof(mysql_hdr), &hdr, 3);
|
|
|
|
|
|
|
|
|
|
hdr.pkt_length = destLen2;
|
|
|
|
|
hdr.pkt_id = ++myconn->compression_pkt_id;
|
|
|
|
|
memcpy(dest2, &hdr, sizeof(mysql_hdr));
|
|
|
|
|
hdr.pkt_length = len2;
|
|
|
|
|
memcpy((char*)dest2 + sizeof(mysql_hdr), &hdr, 3);
|
|
|
|
|
|
|
|
|
|
queueOUT.pkt.size = destLen1 + destLen2 + 7 + 7;
|
|
|
|
|
queueOUT.pkt.ptr = l_alloc(queueOUT.pkt.size);
|
|
|
|
|
memcpy((char*)queueOUT.pkt.ptr, dest1, destLen1 + 7);
|
|
|
|
|
memcpy((char*)queueOUT.pkt.ptr + destLen1 + 7, dest2, destLen2 + 7);
|
|
|
|
|
free(dest1);
|
|
|
|
|
free(dest2);
|
|
|
|
|
l_free(p2.size, p2.ptr);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int PgSQL_Data_Stream::array2buffer() {
|
|
|
|
|
int ret = 0;
|
|
|
|
|
unsigned int idx = 0;
|
|
|
|
|
@ -1113,32 +1020,18 @@ int PgSQL_Data_Stream::array2buffer() {
|
|
|
|
|
add_to_data_packet_history_without_alloc(data_packets_history_OUT, queueOUT.pkt.ptr, queueOUT.pkt.size);
|
|
|
|
|
queueOUT.pkt.ptr = NULL;
|
|
|
|
|
}
|
|
|
|
|
//VALGRIND_ENABLE_ERROR_REPORTING;
|
|
|
|
|
if (myconn->get_status(STATUS_MYSQL_CONNECTION_COMPRESSION) == true) {
|
|
|
|
|
proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "Session=%p . DataStream: %p -- Compression enabled\n", sess, this);
|
|
|
|
|
generate_compressed_packet(); // it is copied directly into queueOUT.pkt
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
//VALGRIND_DISABLE_ERROR_REPORTING;
|
|
|
|
|
memcpy(&queueOUT.pkt, PSarrayOUT->index(idx), sizeof(PtrSize_t));
|
|
|
|
|
idx++;
|
|
|
|
|
//VALGRIND_ENABLE_ERROR_REPORTING;
|
|
|
|
|
// this is a special case, needed because compression is enabled *after* the first OK
|
|
|
|
|
if (DSS == STATE_CLIENT_AUTH_OK) {
|
|
|
|
|
DSS = STATE_SLEEP;
|
|
|
|
|
// enable compression
|
|
|
|
|
if (myconn->options.server_capabilities & CLIENT_COMPRESS) {
|
|
|
|
|
if (myconn->options.compression_min_length) {
|
|
|
|
|
myconn->set_status(true, STATUS_MYSQL_CONNECTION_COMPRESSION);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
//explicitly disable compression
|
|
|
|
|
myconn->options.compression_min_length = 0;
|
|
|
|
|
myconn->set_status(false, STATUS_MYSQL_CONNECTION_COMPRESSION);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
memcpy(&queueOUT.pkt, PSarrayOUT->index(idx), sizeof(PtrSize_t));
|
|
|
|
|
idx++;
|
|
|
|
|
|
|
|
|
|
if (DSS == STATE_CLIENT_AUTH_OK) {
|
|
|
|
|
DSS = STATE_SLEEP;
|
|
|
|
|
|
|
|
|
|
//explicitly disable compression
|
|
|
|
|
myconn->options.compression_min_length = 0;
|
|
|
|
|
myconn->set_status(false, STATUS_MYSQL_CONNECTION_COMPRESSION);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef DEBUG
|
|
|
|
|
{ __dump_pkt(__func__, (unsigned char*)queueOUT.pkt.ptr, queueOUT.pkt.size); }
|
|
|
|
|
#endif
|
|
|
|
|
|