Further development to support compression: Issue #219

Added functions:
MySQL_Data_Stream::generate_compressed_packet()
Extended changes in:
MySQL_Data_Stream::array2buffer()
MySQL_Data_Stream::buffer2array()

Further improvements:
Added global variables mysql-have_compress : issue #225
MySQL_Protocol::process_pkt_initial_handshake() support upper and lower capabilities
Added timestamp and timediff in proxy_debug_func() : issue #228
pull/248/head
René Cannaò 11 years ago
parent 94c38a41cd
commit c1678558c1

@ -45,6 +45,7 @@ class MySQL_Connection {
} options;
uint32_t status_flags;
unsigned long long last_time_used;
uint8_t compression_pkt_id;
MySrvC *parent;
// void * operator new(size_t);
// void operator delete(void *);

@ -39,6 +39,7 @@ class MySQL_Data_Stream
//mysql_data_buffer_t bufferOUT;
int array2buffer();
int buffer2array();
void generate_compressed_packet();
public:
void * operator new(size_t);
void operator delete(void *);

@ -282,6 +282,7 @@ class Standard_MySQL_Threads_Handler: public MySQL_Threads_Handler
char *server_version;
uint8_t default_charset;
bool servers_stats;
bool have_compress;
#ifdef DEBUG
bool session_debug;
#endif /* DEBUG */

@ -68,6 +68,7 @@ enum mysql_data_stream_status {
STATE_NOT_CONNECTED,
STATE_SERVER_HANDSHAKE,
STATE_CLIENT_HANDSHAKE,
STATE_CLIENT_AUTH_OK,
STATE_SSL_INIT,
STATE_SLEEP,
STATE_CLIENT_COM_QUERY,
@ -620,6 +621,7 @@ __thread char *mysql_thread___connect_timeout_server_error;
__thread uint16_t mysql_thread___server_capabilities;
__thread uint8_t mysql_thread___default_charset;
__thread int mysql_thread___poll_timeout;
__thread bool mysql_thread___have_compress;
__thread bool mysql_thread___servers_stats;
#ifdef DEBUG
__thread bool mysql_thread___session_debug;
@ -639,6 +641,7 @@ extern __thread char *mysql_thread___connect_timeout_server_error;
extern __thread uint16_t mysql_thread___server_capabilities;
extern __thread uint8_t mysql_thread___default_charset;
extern __thread int mysql_thread___poll_timeout;
extern __thread bool mysql_thread___have_compress;
extern __thread bool mysql_thread___servers_stats;
#ifdef DEBUG
extern __thread bool mysql_thread___session_debug;

@ -461,8 +461,10 @@ MySQL_Connection * MySrvConnList::get_random_MyConn() {
conn->parent=mysrvc;
//conn->options.charset=mysrvc->charset;
conn->options.server_capabilities=0;
conn->options.server_capabilities|=CLIENT_COMPRESS;
if (mysql_thread___have_compress==true && mysrvc->compression) {
conn->options.server_capabilities|=CLIENT_COMPRESS;
conn->options.compression_min_length=mysrvc->compression;
}
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port);
return conn;
}

@ -1124,6 +1124,19 @@ bool MySQL_Protocol::generate_pkt_handshake_response(bool send, void **ptr, unsi
myhdr.pkt_id=1;
uint32_t capabilities = CLIENT_LONG_PASSWORD | CLIENT_FOUND_ROWS | CLIENT_LONG_FLAG | CLIENT_CONNECT_WITH_DB | CLIENT_PROTOCOL_41 | CLIENT_TRANSACTIONS | CLIENT_SECURE_CONNECTION | CLIENT_MULTI_STATEMENTS | CLIENT_MULTI_RESULTS | CLIENT_PS_MULTI_RESULTS ;
// enable compression
assert(sess);
assert(sess->mybe);
assert(sess->mybe->server_myds);
MySQL_Connection *myconn=sess->mybe->server_myds->myconn;
assert(myconn);
if (myconn->options.compression_min_length) {
if (myconn->options.server_capabilities & CLIENT_COMPRESS) {
capabilities|=CLIENT_COMPRESS;
}
}
uint32_t max_allowed_packet=1*1024*1024;
assert(sess);
assert(sess->client_myds);
@ -1305,6 +1318,11 @@ bool MySQL_Protocol::generate_pkt_initial_handshake(bool send, void **ptr, unsig
memcpy(_ptr+l, (*myds)->myconn->scramble_buff+0, 8); l+=8;
_ptr[l]=0x00; l+=1; //0x00
if (mysql_thread___have_compress) {
mysql_thread___server_capabilities |= CLIENT_COMPRESS; // FIXME: shouldn't be here
//(*myds)->myconn->options.compression_min_length=50;
}
(*myds)->myconn->options.server_capabilities=mysql_thread___server_capabilities;
memcpy(_ptr+l,&mysql_thread___server_capabilities, sizeof(mysql_thread___server_capabilities)); l+=sizeof(mysql_thread___server_capabilities);
memcpy(_ptr+l,&mysql_thread___default_charset, sizeof(mysql_thread___default_charset)); l+=sizeof(mysql_thread___default_charset);
memcpy(_ptr+l,&server_status, sizeof(server_status)); l+=sizeof(server_status);
@ -1432,7 +1450,9 @@ bool MySQL_Protocol::process_pkt_initial_handshake(unsigned char *pkt, unsigned
goto exit_process_pkt_initial_handshake;
}
uint8_t protocol;
uint16_t capabilities;
uint16_t capabilities_lower;
uint16_t capabilities_upper;
uint32_t capabilities;
uint8_t charset;
//uint16_t status;
uint32_t thread_id;
@ -1449,12 +1469,15 @@ bool MySQL_Protocol::process_pkt_initial_handshake(unsigned char *pkt, unsigned
pkt += sizeof(uint32_t);
salt1 = pkt;
pkt += strlen((char *)salt1) + 1;
capabilities = CPY2(pkt);
capabilities_lower = CPY2(pkt);
pkt += sizeof(uint16_t);
charset = *(uint8_t *)pkt;
pkt += sizeof(uint8_t);
prot_status = CPY2(pkt);
pkt += 15; // 2 for status, 13 for zero-byte padding
pkt += sizeof(uint16_t);
capabilities_upper = CPY2(pkt);
pkt += sizeof(uint16_t);
pkt += 11;
salt2 = pkt;
// FIXME: the next two lines are here just to prevent this: warning: variable salt2 set but not used [-Wunused-but-set-variable]
@ -1462,6 +1485,8 @@ bool MySQL_Protocol::process_pkt_initial_handshake(unsigned char *pkt, unsigned
salt2++;
salt2 = pkt;
capabilities=capabilities_upper << 16;
capabilities+=capabilities_lower;
proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL,1,"Handshake <proto:%u ver:\"%s\" thd:%d cap:%d char:%d status:%d>\n", protocol, version, thread_id, capabilities, charset, prot_status);
// if(op.verbose) unmask_caps(caps);
@ -1560,8 +1585,16 @@ bool MySQL_Protocol::process_pkt_handshake_response(unsigned char *pkt, unsigned
(capabilities & CLIENT_SECURE_CONNECTION ? "new" : "old"), user, password, pass, db, max_pkt, capabilities, charset, ((*myds)->encrypted ? "yes" : "no"));
assert(sess);
assert(sess->client_myds);
assert(sess->client_myds->myconn);
sess->client_myds->myconn->set_charset(charset);
MySQL_Connection *myconn=sess->client_myds->myconn;
assert(myconn);
myconn->set_charset(charset);
// enable compression
if (capabilities & CLIENT_COMPRESS) {
if (myconn->options.server_capabilities & CLIENT_COMPRESS) {
myconn->options.compression_min_length=50;
//myconn->set_status_compression(true); // don't enable this here. It needs to be enabled after the OK is sent
}
}
#ifdef DEBUG
if (dump_pkt) { __dump_pkt(__func__,_ptr,len); }
#endif

@ -858,17 +858,29 @@ void MySQL_Session::handler___status_CONNECTING_SERVER___STATE_NOT_CONNECTED(Ptr
void MySQL_Session::handler___status_CONNECTING_SERVER___STATE_CLIENT_HANDSHAKE(PtrSize_t *pkt, bool *wrong_pass) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Statuses: CONNECTING_SERVER - STATE_CLIENT_HANDSHAKE\n");
if (mybe->server_myds->myprot.process_pkt_OK((unsigned char *)pkt->ptr,pkt->size)==true) {
MySQL_Data_Stream *myds=mybe->server_myds;
if (myds->myprot.process_pkt_OK((unsigned char *)pkt->ptr,pkt->size)==true) {
l_free(pkt->size,pkt->ptr);
mybe->server_myds->DSS=STATE_READY;
myds->DSS=STATE_READY;
//mybe->myconn=server_myds->myconn;
status=WAITING_SERVER_DATA;
unsigned int k;
PtrSize_t pkt2;
for (k=0; k<mybe->server_myds->PSarrayOUTpending->len;) {
mybe->server_myds->PSarrayOUTpending->remove_index(0,&pkt2);
mybe->server_myds->PSarrayOUT->add(pkt2.ptr, pkt2.size);
mybe->server_myds->DSS=STATE_QUERY_SENT_DS;
for (k=0; k<myds->PSarrayOUTpending->len;) {
myds->PSarrayOUTpending->remove_index(0,&pkt2);
myds->PSarrayOUT->add(pkt2.ptr, pkt2.size);
myds->DSS=STATE_QUERY_SENT_DS;
}
MySQL_Connection *myconn=myds->myconn;
// enable compression
if (myconn->options.server_capabilities & CLIENT_COMPRESS) {
if (myconn->options.compression_min_length) {
myconn->set_status_compression(true);
}
} else {
// explicitly disable compression
myconn->options.compression_min_length=0;
myconn->set_status_compression(false);
}
} else {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Wrong credentials for backend: disconnecting\n");
@ -906,7 +918,20 @@ void MySQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(
client_myds->myprot.generate_pkt_OK(true,NULL,NULL,2,0,0,0,0,NULL);
//server_myds->myconn->userinfo->set(client_myds->myconn->userinfo);
status=WAITING_CLIENT_DATA;
client_myds->DSS=STATE_SLEEP;
client_myds->DSS=STATE_CLIENT_AUTH_OK;
MySQL_Connection *myconn=client_myds->myconn;
/*
// enable compression
if (myconn->options.server_capabilities & CLIENT_COMPRESS) {
if (myconn->options.compression_min_length) {
myconn->set_status_compression(true);
}
} else {
//explicitly disable compression
myconn->options.compression_min_length=0;
myconn->set_status_compression(false);
}
*/
} else {
// use SSL
client_myds->DSS=STATE_SSL_INIT;
@ -1149,7 +1174,7 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___send_SET_NAMES_to_backend(
//mybe->server_myds->myconn->userinfo->set_schemaname(client_myds->myconn->userinfo->schemaname,strlen(client_myds->myconn->userinfo->schemaname));
//myprot_server.generate_COM_INIT_DB(true,NULL,NULL,userinfo_server.schemaname);
//mybe->server_myds->myprot.generate_COM_INIT_DB(true,NULL,NULL,mybe->server_myds->myconn->userinfo->schemaname);
mybe->server_myds->myprot.generate_COM_QUERY(true,NULL,NULL,"SET NAMES utf8");
mybe->server_myds->myprot.generate_COM_QUERY(true,NULL,NULL,(char *)"SET NAMES utf8");
mybe->server_myds->DSS=STATE_QUERY_SENT_DS;
status=CHANGING_SCHEMA;
}

@ -189,6 +189,7 @@ static char * mysql_thread_variables_names[]= {
(char *)"connect_timeout_server",
(char *)"connect_timeout_server_error",
(char *)"default_charset",
(char *)"have_compress",
(char *)"ping_interval_server",
(char *)"ping_timeout_server",
(char *)"default_schema",
@ -279,6 +280,7 @@ Standard_MySQL_Threads_Handler::Standard_MySQL_Threads_Handler() {
variables.server_version=strdup((char *)"5.1.30");
variables.server_capabilities=CLIENT_FOUND_ROWS | CLIENT_PROTOCOL_41 | CLIENT_IGNORE_SIGPIPE | CLIENT_TRANSACTIONS | CLIENT_SECURE_CONNECTION | CLIENT_CONNECT_WITH_DB | CLIENT_SSL;
variables.poll_timeout=2000;
variables.have_compress=true;
variables.servers_stats=true;
#ifdef DEBUG
variables.session_debug=true;
@ -326,6 +328,7 @@ int Standard_MySQL_Threads_Handler::get_variable_int(char *name) {
if (!strcasecmp(name,"connect_timeout_server")) return (int)variables.connect_timeout_server;
if (!strcasecmp(name,"ping_interval_server")) return (int)variables.ping_interval_server;
if (!strcasecmp(name,"ping_timeout_server")) return (int)variables.ping_timeout_server;
if (!strcmp(name,"have_compress")) return (int)variables.have_compress;
if (!strcmp(name,"servers_stats")) return (int)variables.servers_stats;
if (!strcmp(name,"poll_timeout")) return variables.poll_timeout;
if (!strcmp(name,"stacksize")) return ( stacksize ? stacksize : DEFAULT_STACK_SIZE);
@ -377,6 +380,9 @@ char * Standard_MySQL_Threads_Handler::get_variable(char *name) { // this is the
return strdup((variables.session_debug ? "true" : "false"));
}
#endif /* DEBUG */
if (!strcmp(name,"have_compress")) {
return strdup((variables.have_compress ? "true" : "false"));
}
if (!strcmp(name,"servers_stats")) {
return strdup((variables.servers_stats ? "true" : "false"));
}
@ -450,10 +456,10 @@ bool Standard_MySQL_Threads_Handler::set_variable(char *name, char *value) { //
return false;
}
}
if (!strcmp(name,"poll_timeout")) {
if (!strcmp(name,"server_capbilities")) {
int intv=atoi(value);
if (intv > 10 && intv < 20000) {
variables.poll_timeout=intv;
if (intv > 10 && intv <= 65535) {
variables.server_capabilities=intv;
return true;
} else {
return false;
@ -508,6 +514,17 @@ bool Standard_MySQL_Threads_Handler::set_variable(char *name, char *value) { //
return false;
}
#endif /* DEBUG */
if (!strcmp(name,"have_compress")) {
if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) {
variables.have_compress=true;
return true;
}
if (strcasecmp(value,"false")==0 || strcasecmp(value,"0")==0) {
variables.have_compress=false;
return true;
}
return false;
}
if (!strcmp(name,"servers_stats")) {
if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) {
variables.servers_stats=true;
@ -685,10 +702,16 @@ MySQL_Session * Standard_MySQL_Thread::create_new_session_and_client_data_stream
//sess->myprot_client.dump_pkt=true;
sess->client_myds->myprot.dump_pkt=true;
#endif
sess->client_myds->myconn=new MySQL_Connection(); // 20141011
sess->client_myds->myconn->last_time_used=curtime;
sess->client_myds->myconn->myds=sess->client_myds; // 20141011
sess->client_myds->myconn->fd=sess->client_myds->fd; // 20141011
sess->client_myds->myconn=new MySQL_Connection();
MySQL_Connection *myconn=sess->client_myds->myconn;
//myconn=new MySQL_Connection(); // 20141011
// if (mysql_thread___have_compress) {
// myconn->options.compression_min_length=50;
// myconn->options.server_capabilities|=CLIENT_COMPRESS;
// }
myconn->last_time_used=curtime;
myconn->myds=sess->client_myds; // 20141011
myconn->fd=sess->client_myds->fd; // 20141011
// FIXME: initializing both, later we will drop one
//sess->myprot_client.init(&sess->client_myds, sess->client_myds->myconn->userinfo, sess);
@ -1028,6 +1051,7 @@ void Standard_MySQL_Thread::refresh_variables() {
mysql_thread___server_capabilities=GloMTH->get_variable_uint16((char *)"server_capabilities");
mysql_thread___default_charset=GloMTH->get_variable_uint8((char *)"default_charset");
mysql_thread___poll_timeout=GloMTH->get_variable_int((char *)"poll_timeout");
mysql_thread___have_compress=(bool)GloMTH->get_variable_int((char *)"have_compress");
mysql_thread___servers_stats=(bool)GloMTH->get_variable_int((char *)"servers_stats");
#ifdef DEBUG
mysql_thread___session_debug=(bool)GloMTH->get_variable_int((char *)"session_debug");

@ -1,5 +1,5 @@
#include "proxysql.h"
#include "proxysql_atomic.h"
#include <cxxabi.h>
@ -12,6 +12,14 @@
//extern debug_level *gdbg_lvl;
//extern int gdbg;
static unsigned long long pretime=0;
static spinlock debug_spinlock;
static inline unsigned long long debug_monotonic_time() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000);
}
void crash_handler(int sig) {
#ifdef DEBUG
@ -42,6 +50,7 @@ void proxy_debug_func(enum debug_module module, int verbosity, const char *fmt,
#define DEBUG_MSG_MAXSIZE 1024
#ifdef DEBUG
void proxy_debug_func(enum debug_module module, int verbosity, int thr, const char *__file, int __line, const char *__func, const char *fmt, ...) {
assert(module<PROXY_DEBUG_UNKNOWN);
if (GloVars.global.gdbg_lvl[module].verbosity < verbosity) return;
@ -53,8 +62,11 @@ void proxy_debug_func(enum debug_module module, int verbosity, int thr, const ch
va_start(ap, fmt);
vsnprintf(debugbuff, DEBUG_MSG_MAXSIZE,fmt,ap);
va_end(ap);
spin_lock(&debug_spinlock);
unsigned long long curtime=debug_monotonic_time();
//fprintf(stderr, "%d:%s:%d:%s(): MOD#%d LVL#%d : %s" , thr, __file, __line, __func, module, verbosity, debugbuff);
sprintf(longdebugbuff, "%d:%s:%d:%s(): MOD#%d LVL#%d : %s" , thr, __file, __line, __func, module, verbosity, debugbuff);
sprintf(longdebugbuff, "%llu(%llu): %d:%s:%d:%s(): MOD#%d LVL#%d : %s" , curtime, curtime-pretime, thr, __file, __line, __func, module, verbosity, debugbuff);
pretime=curtime;
}
if (GloVars.global.gdbg_lvl[module].verbosity>=10) {
void *arr[20];
@ -86,6 +98,7 @@ void proxy_debug_func(enum debug_module module, int verbosity, int thr, const ch
// fprintf(stderr, "%s", longdebugbuff);
}
if (strlen(longdebugbuff)) fprintf(stderr, "%s", longdebugbuff);
spin_unlock(&debug_spinlock);
if (GloVars.global.foreground) {
return;
}
@ -129,8 +142,10 @@ void proxy_error_func(const char *fmt, ...) {
};
#ifdef DEBUG
void init_debug_struct() {
void init_debug_struct() {
int i;
spinlock_init(&debug_spinlock);
pretime=debug_monotonic_time();
GloVars.global.gdbg_lvl= (debug_level *) malloc(PROXY_DEBUG_UNKNOWN*sizeof(debug_level));
for (i=0;i<PROXY_DEBUG_UNKNOWN;i++) {
GloVars.global.gdbg_lvl[i].module=(enum debug_module)i;

@ -112,6 +112,7 @@ MySQL_Connection::MySQL_Connection() {
status_flags=0;
options.compression_min_length=0;
options.server_version=NULL;
compression_pkt_id=0;
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "Creating new MySQL_Connection %p\n", this);
};

@ -1,6 +1,6 @@
#include "proxysql.h"
#include "cpp.h"
#include <zlib.h>
#ifndef UNIX_PATH_MAX
#define UNIX_PATH_MAX 108
#endif
@ -380,20 +380,40 @@ int MySQL_Data_Stream::buffer2array() {
queueIN.pkt.size=0;
return ret;
}
if ((queueIN.pkt.size==0) && queue_data(queueIN)>=sizeof(mysql_hdr)) {
proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "Reading the header of a new packet\n");
memcpy(&queueIN.hdr,queue_r_ptr(queueIN),sizeof(mysql_hdr));
//Copy4B(&queueIN.hdr,queue_r_ptr(queueIN));
queue_r(queueIN,sizeof(mysql_hdr));
//proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "Allocating %d bytes for a new packet\n", myds->input.hdr.pkt_length+sizeof(mysql_hdr));
queueIN.pkt.size=queueIN.hdr.pkt_length+sizeof(mysql_hdr);
queueIN.pkt.ptr=l_alloc(queueIN.pkt.size);
//MEM_COPY_FWD((unsigned char *)queueIN.pkt.ptr, (unsigned char *)&queueIN.hdr, sizeof(mysql_hdr)); // immediately copy the header into the packet
memcpy(queueIN.pkt.ptr, &queueIN.hdr, sizeof(mysql_hdr)); // immediately copy the header into the packet
//Copy4B(queueIN.pkt.ptr,&queueIN.hdr);
queueIN.partial=sizeof(mysql_hdr);
ret+=sizeof(mysql_hdr);
/**/
if (myconn->get_status_compression()==true) {
if ((queueIN.pkt.size==0) && queue_data(queueIN)>=7) {
proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "Reading the header of a new compressed packet\n");
memcpy(&queueIN.hdr,queue_r_ptr(queueIN), sizeof(mysql_hdr));
queue_r(queueIN,sizeof(mysql_hdr));
queueIN.pkt.size=queueIN.hdr.pkt_length+sizeof(mysql_hdr)+3;
queueIN.pkt.ptr=l_alloc(queueIN.pkt.size);
memcpy(queueIN.pkt.ptr, &queueIN.hdr, sizeof(mysql_hdr)); // immediately copy the header into the packet
memcpy((unsigned char *)queueIN.pkt.ptr+sizeof(mysql_hdr), queue_r_ptr(queueIN), 3); // copy 3 bytes, the length of the uncompressed payload
queue_r(queueIN,3);
queueIN.partial=7;
mysql_hdr *_hdr;
_hdr=(mysql_hdr *)queueIN.pkt.ptr;
myconn->compression_pkt_id=_hdr->pkt_id;
ret+=7;
}
} else {
/**/
if ((queueIN.pkt.size==0) && queue_data(queueIN)>=sizeof(mysql_hdr)) {
proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "Reading the header of a new packet\n");
memcpy(&queueIN.hdr,queue_r_ptr(queueIN),sizeof(mysql_hdr));
queue_r(queueIN,sizeof(mysql_hdr));
queueIN.pkt.size=queueIN.hdr.pkt_length+sizeof(mysql_hdr);
queueIN.pkt.ptr=l_alloc(queueIN.pkt.size);
memcpy(queueIN.pkt.ptr, &queueIN.hdr, sizeof(mysql_hdr)); // immediately copy the header into the packet
queueIN.partial=sizeof(mysql_hdr);
ret+=sizeof(mysql_hdr);
// if (myconn->get_status_compression()==true) {
// mysql_hdr *_hdr;
// _hdr=(mysql_hdr *)queueIN.pkt.ptr;
// myconn->compression_pkt_id=_hdr->pkt_id;
// }
}
}
if ((queueIN.pkt.size>0) && queue_data(queueIN)) {
int b= ( queue_data(queueIN) > (queueIN.pkt.size - queueIN.partial) ? (queueIN.pkt.size - queueIN.partial) : queue_data(queueIN) );
@ -404,15 +424,109 @@ int MySQL_Data_Stream::buffer2array() {
ret+=b;
}
if ((queueIN.pkt.size>0) && (queueIN.pkt.size==queueIN.partial) ) {
PSarrayIN->add(queueIN.pkt.ptr,queueIN.pkt.size);
pkts_recv++;
queueIN.pkt.size=0;
queueIN.pkt.ptr=NULL;
}
if (myconn->get_status_compression()==true) {
Bytef *dest;
uLongf destLen;
proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "Copied the whole compressed packet\n");
unsigned int progress=0;
unsigned int datalength;
unsigned int payload_length=0;
unsigned char *u;
u=(unsigned char *)queueIN.pkt.ptr;
payload_length=*(u+6);
payload_length=payload_length*256+*(u+5);
payload_length=payload_length*256+*(u+4);
unsigned char *_ptr=(unsigned char *)queueIN.pkt.ptr+7;
if (payload_length) {
// the payload is compressed
destLen=payload_length;
dest=(Bytef *)l_alloc(destLen);
int rc=uncompress(dest, &destLen, _ptr, queueIN.pkt.size-7);
assert(rc==Z_OK);
datalength=payload_length;
// change _ptr to the new buffer
_ptr=dest;
} else {
// the payload is not compressed
datalength=queueIN.pkt.size-7;
}
while (progress<datalength) {
mysql_hdr _a;
memcpy(&_a,_ptr+progress,sizeof(mysql_hdr));
unsigned int size=_a.pkt_length+sizeof(mysql_hdr);
unsigned char *ptrP=(unsigned char *)l_alloc(size);
memcpy(ptrP,_ptr+progress,size);
progress+=size;
PSarrayIN->add(ptrP,size);
}
if (payload_length) {
l_free(destLen,dest);
}
l_free(queueIN.pkt.size,queueIN.pkt.ptr);
pkts_recv++;
queueIN.pkt.size=0;
queueIN.pkt.ptr=NULL;
} else {
PSarrayIN->add(queueIN.pkt.ptr,queueIN.pkt.size);
pkts_recv++;
queueIN.pkt.size=0;
queueIN.pkt.ptr=NULL;
}
}
return ret;
}
void MySQL_Data_Stream::generate_compressed_packet() {
#define MAX_COMPRESSED_PACKET_SIZE 10*1024*1024
unsigned int total_size=0;
unsigned int i=0;
PtrSize_t *p=NULL;
while (i<PSarrayOUT->len && total_size<MAX_COMPRESSED_PACKET_SIZE) {
p=PSarrayOUT->index(i);
total_size+=p->size;
i++;
}
if (i>=2) {
// we successfully read at least 2 packets
if (total_size>MAX_COMPRESSED_PACKET_SIZE) {
// total_size is too big, we remove the last packet read
total_size-=p->size;
}
}
uLong sourceLen=total_size;
Bytef *source=(Bytef *)l_alloc(total_size);
uLongf destLen=total_size*120/100+12;
Bytef *dest=(Bytef *)malloc(destLen);
i=0;
total_size=0;
while (total_size<sourceLen) {
//p=PSarrayOUT->index(i);
PtrSize_t p2;
PSarrayOUT->remove_index(0,&p2);
memcpy(source+total_size,p2.ptr,p2.size);
//i++;
total_size+=p2.size;
l_free(p2.size,p2.ptr);
}
//PSarrayOUT->remove_index_range(0,i);
int rc=compress(dest, &destLen, source, sourceLen);
assert(rc==Z_OK);
l_free(total_size, source);
queueOUT.pkt.size=destLen+7;
queueOUT.pkt.ptr=l_alloc(queueOUT.pkt.size);
mysql_hdr hdr;
hdr.pkt_length=destLen;
//hdr.pkt_id=++myconn->compression_pkt_id;
hdr.pkt_id=1;
memcpy((unsigned char *)queueOUT.pkt.ptr,&hdr,sizeof(mysql_hdr));
hdr.pkt_length=total_size;
memcpy((unsigned char *)queueOUT.pkt.ptr+4,&hdr,3);
memcpy((unsigned char *)queueOUT.pkt.ptr+7,dest,destLen);
free(dest);
}
int MySQL_Data_Stream::array2buffer() {
int ret=0;
@ -428,7 +542,26 @@ int MySQL_Data_Stream::array2buffer() {
l_free(queueOUT.pkt.size,queueOUT.pkt.ptr);
queueOUT.pkt.ptr=NULL;
}
PSarrayOUT->remove_index(0,&queueOUT.pkt);
if (myconn->get_status_compression()==true) {
proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "DataStream: %p -- Compression enabled\n", this);
generate_compressed_packet(); // it is copied directly into queueOUT.pkt
} else {
PSarrayOUT->remove_index(0,&queueOUT.pkt);
// this is a special case, needed because compression is enabled *after* the first OK
if (DSS==STATE_CLIENT_AUTH_OK) {
DSS=STATE_SLEEP;
// enable compression
if (myconn->options.server_capabilities & CLIENT_COMPRESS) {
if (myconn->options.compression_min_length) {
myconn->set_status_compression(true);
}
} else {
//explicitly disable compression
myconn->options.compression_min_length=0;
myconn->set_status_compression(false);
}
}
}
//memcpy(&queueOUT.pkt,PSarrayOUT->index(idx),sizeof(PtrSize_t));
#ifdef DEBUG
{ __dump_pkt(__func__,(unsigned char *)queueOUT.pkt.ptr,queueOUT.pkt.size); }
@ -548,6 +681,8 @@ int MySQL_Data_Stream::myds_connect(char *address, int connect_port, int *pendin
if (connect_port) {
rc=connect(s, (struct sockaddr *) &a, sizeof(a));
int arg_on=1;
setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char *) &arg_on, sizeof(int));
} else {
rc=connect(s, (struct sockaddr *) &u, len);
}

Loading…
Cancel
Save