From c1678558c18b33eea21da6083bec7b786cf988fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Mon, 9 Mar 2015 23:31:19 +0000 Subject: [PATCH] Further development to support compression: Issue #219 Added functions: MySQL_Data_Stream::generate_compressed_packet() Extended changes in: MySQL_Data_Stream::array2buffer() MySQL_Data_Stream::buffer2array() Further improvements: Added global variables mysql-have_compress : issue #225 MySQL_Protocol::process_pkt_initial_handshake() support upper and lower capabilities Added timestamp and timediff in proxy_debug_func() : issue #228 --- include/mysql_connection.h | 1 + include/mysql_data_stream.h | 1 + include/mysql_thread.h | 1 + include/proxysql_structs.h | 3 + lib/MySQL_HostGroups_Manager.cpp | 6 +- lib/MySQL_Protocol.cpp | 43 +++++++- lib/MySQL_Session.cpp | 41 +++++-- lib/Standard_MySQL_Thread.cpp | 38 +++++-- lib/debug.cpp | 21 +++- lib/mysql_connection.cpp | 1 + lib/mysql_data_stream.cpp | 177 +++++++++++++++++++++++++++---- 11 files changed, 287 insertions(+), 46 deletions(-) diff --git a/include/mysql_connection.h b/include/mysql_connection.h index db579c6cd..0cf8b01d8 100644 --- a/include/mysql_connection.h +++ b/include/mysql_connection.h @@ -45,6 +45,7 @@ class MySQL_Connection { } options; uint32_t status_flags; unsigned long long last_time_used; + uint8_t compression_pkt_id; MySrvC *parent; // void * operator new(size_t); // void operator delete(void *); diff --git a/include/mysql_data_stream.h b/include/mysql_data_stream.h index 3781eecb8..ea8cd1be3 100644 --- a/include/mysql_data_stream.h +++ b/include/mysql_data_stream.h @@ -39,6 +39,7 @@ class MySQL_Data_Stream //mysql_data_buffer_t bufferOUT; int array2buffer(); int buffer2array(); + void generate_compressed_packet(); public: void * operator new(size_t); void operator delete(void *); diff --git a/include/mysql_thread.h b/include/mysql_thread.h index e85ec6e53..f72c7c279 100644 --- a/include/mysql_thread.h +++ b/include/mysql_thread.h @@ -282,6 +282,7 @@ class Standard_MySQL_Threads_Handler: public MySQL_Threads_Handler char *server_version; uint8_t default_charset; bool servers_stats; + bool have_compress; #ifdef DEBUG bool session_debug; #endif /* DEBUG */ diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index bb947e4d8..22fe8c78b 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -68,6 +68,7 @@ enum mysql_data_stream_status { STATE_NOT_CONNECTED, STATE_SERVER_HANDSHAKE, STATE_CLIENT_HANDSHAKE, + STATE_CLIENT_AUTH_OK, STATE_SSL_INIT, STATE_SLEEP, STATE_CLIENT_COM_QUERY, @@ -620,6 +621,7 @@ __thread char *mysql_thread___connect_timeout_server_error; __thread uint16_t mysql_thread___server_capabilities; __thread uint8_t mysql_thread___default_charset; __thread int mysql_thread___poll_timeout; +__thread bool mysql_thread___have_compress; __thread bool mysql_thread___servers_stats; #ifdef DEBUG __thread bool mysql_thread___session_debug; @@ -639,6 +641,7 @@ extern __thread char *mysql_thread___connect_timeout_server_error; extern __thread uint16_t mysql_thread___server_capabilities; extern __thread uint8_t mysql_thread___default_charset; extern __thread int mysql_thread___poll_timeout; +extern __thread bool mysql_thread___have_compress; extern __thread bool mysql_thread___servers_stats; #ifdef DEBUG extern __thread bool mysql_thread___session_debug; diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 1412e7901..4d2588d73 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -461,8 +461,10 @@ MySQL_Connection * MySrvConnList::get_random_MyConn() { conn->parent=mysrvc; //conn->options.charset=mysrvc->charset; conn->options.server_capabilities=0; - conn->options.server_capabilities|=CLIENT_COMPRESS; - + if (mysql_thread___have_compress==true && mysrvc->compression) { + conn->options.server_capabilities|=CLIENT_COMPRESS; + conn->options.compression_min_length=mysrvc->compression; + } proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port); return conn; } diff --git a/lib/MySQL_Protocol.cpp b/lib/MySQL_Protocol.cpp index 5749a9d36..92d3db3b4 100644 --- a/lib/MySQL_Protocol.cpp +++ b/lib/MySQL_Protocol.cpp @@ -1124,6 +1124,19 @@ bool MySQL_Protocol::generate_pkt_handshake_response(bool send, void **ptr, unsi myhdr.pkt_id=1; uint32_t capabilities = CLIENT_LONG_PASSWORD | CLIENT_FOUND_ROWS | CLIENT_LONG_FLAG | CLIENT_CONNECT_WITH_DB | CLIENT_PROTOCOL_41 | CLIENT_TRANSACTIONS | CLIENT_SECURE_CONNECTION | CLIENT_MULTI_STATEMENTS | CLIENT_MULTI_RESULTS | CLIENT_PS_MULTI_RESULTS ; + + // enable compression + assert(sess); + assert(sess->mybe); + assert(sess->mybe->server_myds); + MySQL_Connection *myconn=sess->mybe->server_myds->myconn; + assert(myconn); + if (myconn->options.compression_min_length) { + if (myconn->options.server_capabilities & CLIENT_COMPRESS) { + capabilities|=CLIENT_COMPRESS; + } + } + uint32_t max_allowed_packet=1*1024*1024; assert(sess); assert(sess->client_myds); @@ -1305,6 +1318,11 @@ bool MySQL_Protocol::generate_pkt_initial_handshake(bool send, void **ptr, unsig memcpy(_ptr+l, (*myds)->myconn->scramble_buff+0, 8); l+=8; _ptr[l]=0x00; l+=1; //0x00 + if (mysql_thread___have_compress) { + mysql_thread___server_capabilities |= CLIENT_COMPRESS; // FIXME: shouldn't be here + //(*myds)->myconn->options.compression_min_length=50; + } + (*myds)->myconn->options.server_capabilities=mysql_thread___server_capabilities; memcpy(_ptr+l,&mysql_thread___server_capabilities, sizeof(mysql_thread___server_capabilities)); l+=sizeof(mysql_thread___server_capabilities); memcpy(_ptr+l,&mysql_thread___default_charset, sizeof(mysql_thread___default_charset)); l+=sizeof(mysql_thread___default_charset); memcpy(_ptr+l,&server_status, sizeof(server_status)); l+=sizeof(server_status); @@ -1432,7 +1450,9 @@ bool MySQL_Protocol::process_pkt_initial_handshake(unsigned char *pkt, unsigned goto exit_process_pkt_initial_handshake; } uint8_t protocol; - uint16_t capabilities; + uint16_t capabilities_lower; + uint16_t capabilities_upper; + uint32_t capabilities; uint8_t charset; //uint16_t status; uint32_t thread_id; @@ -1449,12 +1469,15 @@ bool MySQL_Protocol::process_pkt_initial_handshake(unsigned char *pkt, unsigned pkt += sizeof(uint32_t); salt1 = pkt; pkt += strlen((char *)salt1) + 1; - capabilities = CPY2(pkt); + capabilities_lower = CPY2(pkt); pkt += sizeof(uint16_t); charset = *(uint8_t *)pkt; pkt += sizeof(uint8_t); prot_status = CPY2(pkt); - pkt += 15; // 2 for status, 13 for zero-byte padding + pkt += sizeof(uint16_t); + capabilities_upper = CPY2(pkt); + pkt += sizeof(uint16_t); + pkt += 11; salt2 = pkt; // FIXME: the next two lines are here just to prevent this: warning: variable ‘salt2’ set but not used [-Wunused-but-set-variable] @@ -1462,6 +1485,8 @@ bool MySQL_Protocol::process_pkt_initial_handshake(unsigned char *pkt, unsigned salt2++; salt2 = pkt; + capabilities=capabilities_upper << 16; + capabilities+=capabilities_lower; proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL,1,"Handshake \n", protocol, version, thread_id, capabilities, charset, prot_status); // if(op.verbose) unmask_caps(caps); @@ -1560,8 +1585,16 @@ bool MySQL_Protocol::process_pkt_handshake_response(unsigned char *pkt, unsigned (capabilities & CLIENT_SECURE_CONNECTION ? "new" : "old"), user, password, pass, db, max_pkt, capabilities, charset, ((*myds)->encrypted ? "yes" : "no")); assert(sess); assert(sess->client_myds); - assert(sess->client_myds->myconn); - sess->client_myds->myconn->set_charset(charset); + MySQL_Connection *myconn=sess->client_myds->myconn; + assert(myconn); + myconn->set_charset(charset); + // enable compression + if (capabilities & CLIENT_COMPRESS) { + if (myconn->options.server_capabilities & CLIENT_COMPRESS) { + myconn->options.compression_min_length=50; + //myconn->set_status_compression(true); // don't enable this here. It needs to be enabled after the OK is sent + } + } #ifdef DEBUG if (dump_pkt) { __dump_pkt(__func__,_ptr,len); } #endif diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 33ecce47a..eaaf825bf 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -858,17 +858,29 @@ void MySQL_Session::handler___status_CONNECTING_SERVER___STATE_NOT_CONNECTED(Ptr void MySQL_Session::handler___status_CONNECTING_SERVER___STATE_CLIENT_HANDSHAKE(PtrSize_t *pkt, bool *wrong_pass) { proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Statuses: CONNECTING_SERVER - STATE_CLIENT_HANDSHAKE\n"); - if (mybe->server_myds->myprot.process_pkt_OK((unsigned char *)pkt->ptr,pkt->size)==true) { + MySQL_Data_Stream *myds=mybe->server_myds; + if (myds->myprot.process_pkt_OK((unsigned char *)pkt->ptr,pkt->size)==true) { l_free(pkt->size,pkt->ptr); - mybe->server_myds->DSS=STATE_READY; + myds->DSS=STATE_READY; //mybe->myconn=server_myds->myconn; status=WAITING_SERVER_DATA; unsigned int k; PtrSize_t pkt2; - for (k=0; kserver_myds->PSarrayOUTpending->len;) { - mybe->server_myds->PSarrayOUTpending->remove_index(0,&pkt2); - mybe->server_myds->PSarrayOUT->add(pkt2.ptr, pkt2.size); - mybe->server_myds->DSS=STATE_QUERY_SENT_DS; + for (k=0; kPSarrayOUTpending->len;) { + myds->PSarrayOUTpending->remove_index(0,&pkt2); + myds->PSarrayOUT->add(pkt2.ptr, pkt2.size); + myds->DSS=STATE_QUERY_SENT_DS; + } + MySQL_Connection *myconn=myds->myconn; + // enable compression + if (myconn->options.server_capabilities & CLIENT_COMPRESS) { + if (myconn->options.compression_min_length) { + myconn->set_status_compression(true); + } + } else { + // explicitly disable compression + myconn->options.compression_min_length=0; + myconn->set_status_compression(false); } } else { proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Wrong credentials for backend: disconnecting\n"); @@ -906,7 +918,20 @@ void MySQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE( client_myds->myprot.generate_pkt_OK(true,NULL,NULL,2,0,0,0,0,NULL); //server_myds->myconn->userinfo->set(client_myds->myconn->userinfo); status=WAITING_CLIENT_DATA; - client_myds->DSS=STATE_SLEEP; + client_myds->DSS=STATE_CLIENT_AUTH_OK; + MySQL_Connection *myconn=client_myds->myconn; +/* + // enable compression + if (myconn->options.server_capabilities & CLIENT_COMPRESS) { + if (myconn->options.compression_min_length) { + myconn->set_status_compression(true); + } + } else { + //explicitly disable compression + myconn->options.compression_min_length=0; + myconn->set_status_compression(false); + } +*/ } else { // use SSL client_myds->DSS=STATE_SSL_INIT; @@ -1149,7 +1174,7 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___send_SET_NAMES_to_backend( //mybe->server_myds->myconn->userinfo->set_schemaname(client_myds->myconn->userinfo->schemaname,strlen(client_myds->myconn->userinfo->schemaname)); //myprot_server.generate_COM_INIT_DB(true,NULL,NULL,userinfo_server.schemaname); //mybe->server_myds->myprot.generate_COM_INIT_DB(true,NULL,NULL,mybe->server_myds->myconn->userinfo->schemaname); - mybe->server_myds->myprot.generate_COM_QUERY(true,NULL,NULL,"SET NAMES utf8"); + mybe->server_myds->myprot.generate_COM_QUERY(true,NULL,NULL,(char *)"SET NAMES utf8"); mybe->server_myds->DSS=STATE_QUERY_SENT_DS; status=CHANGING_SCHEMA; } diff --git a/lib/Standard_MySQL_Thread.cpp b/lib/Standard_MySQL_Thread.cpp index 69dabbc1d..cb0158a55 100644 --- a/lib/Standard_MySQL_Thread.cpp +++ b/lib/Standard_MySQL_Thread.cpp @@ -189,6 +189,7 @@ static char * mysql_thread_variables_names[]= { (char *)"connect_timeout_server", (char *)"connect_timeout_server_error", (char *)"default_charset", + (char *)"have_compress", (char *)"ping_interval_server", (char *)"ping_timeout_server", (char *)"default_schema", @@ -279,6 +280,7 @@ Standard_MySQL_Threads_Handler::Standard_MySQL_Threads_Handler() { variables.server_version=strdup((char *)"5.1.30"); variables.server_capabilities=CLIENT_FOUND_ROWS | CLIENT_PROTOCOL_41 | CLIENT_IGNORE_SIGPIPE | CLIENT_TRANSACTIONS | CLIENT_SECURE_CONNECTION | CLIENT_CONNECT_WITH_DB | CLIENT_SSL; variables.poll_timeout=2000; + variables.have_compress=true; variables.servers_stats=true; #ifdef DEBUG variables.session_debug=true; @@ -326,6 +328,7 @@ int Standard_MySQL_Threads_Handler::get_variable_int(char *name) { if (!strcasecmp(name,"connect_timeout_server")) return (int)variables.connect_timeout_server; if (!strcasecmp(name,"ping_interval_server")) return (int)variables.ping_interval_server; if (!strcasecmp(name,"ping_timeout_server")) return (int)variables.ping_timeout_server; + if (!strcmp(name,"have_compress")) return (int)variables.have_compress; if (!strcmp(name,"servers_stats")) return (int)variables.servers_stats; if (!strcmp(name,"poll_timeout")) return variables.poll_timeout; if (!strcmp(name,"stacksize")) return ( stacksize ? stacksize : DEFAULT_STACK_SIZE); @@ -377,6 +380,9 @@ char * Standard_MySQL_Threads_Handler::get_variable(char *name) { // this is the return strdup((variables.session_debug ? "true" : "false")); } #endif /* DEBUG */ + if (!strcmp(name,"have_compress")) { + return strdup((variables.have_compress ? "true" : "false")); + } if (!strcmp(name,"servers_stats")) { return strdup((variables.servers_stats ? "true" : "false")); } @@ -450,10 +456,10 @@ bool Standard_MySQL_Threads_Handler::set_variable(char *name, char *value) { // return false; } } - if (!strcmp(name,"poll_timeout")) { + if (!strcmp(name,"server_capbilities")) { int intv=atoi(value); - if (intv > 10 && intv < 20000) { - variables.poll_timeout=intv; + if (intv > 10 && intv <= 65535) { + variables.server_capabilities=intv; return true; } else { return false; @@ -508,6 +514,17 @@ bool Standard_MySQL_Threads_Handler::set_variable(char *name, char *value) { // return false; } #endif /* DEBUG */ + if (!strcmp(name,"have_compress")) { + if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) { + variables.have_compress=true; + return true; + } + if (strcasecmp(value,"false")==0 || strcasecmp(value,"0")==0) { + variables.have_compress=false; + return true; + } + return false; + } if (!strcmp(name,"servers_stats")) { if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) { variables.servers_stats=true; @@ -685,10 +702,16 @@ MySQL_Session * Standard_MySQL_Thread::create_new_session_and_client_data_stream //sess->myprot_client.dump_pkt=true; sess->client_myds->myprot.dump_pkt=true; #endif - sess->client_myds->myconn=new MySQL_Connection(); // 20141011 - sess->client_myds->myconn->last_time_used=curtime; - sess->client_myds->myconn->myds=sess->client_myds; // 20141011 - sess->client_myds->myconn->fd=sess->client_myds->fd; // 20141011 + sess->client_myds->myconn=new MySQL_Connection(); + MySQL_Connection *myconn=sess->client_myds->myconn; + //myconn=new MySQL_Connection(); // 20141011 +// if (mysql_thread___have_compress) { +// myconn->options.compression_min_length=50; +// myconn->options.server_capabilities|=CLIENT_COMPRESS; +// } + myconn->last_time_used=curtime; + myconn->myds=sess->client_myds; // 20141011 + myconn->fd=sess->client_myds->fd; // 20141011 // FIXME: initializing both, later we will drop one //sess->myprot_client.init(&sess->client_myds, sess->client_myds->myconn->userinfo, sess); @@ -1028,6 +1051,7 @@ void Standard_MySQL_Thread::refresh_variables() { mysql_thread___server_capabilities=GloMTH->get_variable_uint16((char *)"server_capabilities"); mysql_thread___default_charset=GloMTH->get_variable_uint8((char *)"default_charset"); mysql_thread___poll_timeout=GloMTH->get_variable_int((char *)"poll_timeout"); + mysql_thread___have_compress=(bool)GloMTH->get_variable_int((char *)"have_compress"); mysql_thread___servers_stats=(bool)GloMTH->get_variable_int((char *)"servers_stats"); #ifdef DEBUG mysql_thread___session_debug=(bool)GloMTH->get_variable_int((char *)"session_debug"); diff --git a/lib/debug.cpp b/lib/debug.cpp index c3e854439..8feaf696e 100644 --- a/lib/debug.cpp +++ b/lib/debug.cpp @@ -1,5 +1,5 @@ #include "proxysql.h" - +#include "proxysql_atomic.h" #include @@ -12,6 +12,14 @@ //extern debug_level *gdbg_lvl; //extern int gdbg; +static unsigned long long pretime=0; +static spinlock debug_spinlock; + +static inline unsigned long long debug_monotonic_time() { + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000); +} void crash_handler(int sig) { #ifdef DEBUG @@ -42,6 +50,7 @@ void proxy_debug_func(enum debug_module module, int verbosity, const char *fmt, #define DEBUG_MSG_MAXSIZE 1024 #ifdef DEBUG + void proxy_debug_func(enum debug_module module, int verbosity, int thr, const char *__file, int __line, const char *__func, const char *fmt, ...) { assert(module=10) { void *arr[20]; @@ -86,6 +98,7 @@ void proxy_debug_func(enum debug_module module, int verbosity, int thr, const ch // fprintf(stderr, "%s", longdebugbuff); } if (strlen(longdebugbuff)) fprintf(stderr, "%s", longdebugbuff); + spin_unlock(&debug_spinlock); if (GloVars.global.foreground) { return; } @@ -129,8 +142,10 @@ void proxy_error_func(const char *fmt, ...) { }; #ifdef DEBUG -void init_debug_struct() { +void init_debug_struct() { int i; + spinlock_init(&debug_spinlock); + pretime=debug_monotonic_time(); GloVars.global.gdbg_lvl= (debug_level *) malloc(PROXY_DEBUG_UNKNOWN*sizeof(debug_level)); for (i=0;i #ifndef UNIX_PATH_MAX #define UNIX_PATH_MAX 108 #endif @@ -380,20 +380,40 @@ int MySQL_Data_Stream::buffer2array() { queueIN.pkt.size=0; return ret; } - if ((queueIN.pkt.size==0) && queue_data(queueIN)>=sizeof(mysql_hdr)) { - proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "Reading the header of a new packet\n"); - memcpy(&queueIN.hdr,queue_r_ptr(queueIN),sizeof(mysql_hdr)); - //Copy4B(&queueIN.hdr,queue_r_ptr(queueIN)); - queue_r(queueIN,sizeof(mysql_hdr)); - //proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "Allocating %d bytes for a new packet\n", myds->input.hdr.pkt_length+sizeof(mysql_hdr)); - queueIN.pkt.size=queueIN.hdr.pkt_length+sizeof(mysql_hdr); - queueIN.pkt.ptr=l_alloc(queueIN.pkt.size); - - //MEM_COPY_FWD((unsigned char *)queueIN.pkt.ptr, (unsigned char *)&queueIN.hdr, sizeof(mysql_hdr)); // immediately copy the header into the packet - memcpy(queueIN.pkt.ptr, &queueIN.hdr, sizeof(mysql_hdr)); // immediately copy the header into the packet - //Copy4B(queueIN.pkt.ptr,&queueIN.hdr); - queueIN.partial=sizeof(mysql_hdr); - ret+=sizeof(mysql_hdr); +/**/ + if (myconn->get_status_compression()==true) { + if ((queueIN.pkt.size==0) && queue_data(queueIN)>=7) { + proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "Reading the header of a new compressed packet\n"); + memcpy(&queueIN.hdr,queue_r_ptr(queueIN), sizeof(mysql_hdr)); + queue_r(queueIN,sizeof(mysql_hdr)); + queueIN.pkt.size=queueIN.hdr.pkt_length+sizeof(mysql_hdr)+3; + queueIN.pkt.ptr=l_alloc(queueIN.pkt.size); + memcpy(queueIN.pkt.ptr, &queueIN.hdr, sizeof(mysql_hdr)); // immediately copy the header into the packet + memcpy((unsigned char *)queueIN.pkt.ptr+sizeof(mysql_hdr), queue_r_ptr(queueIN), 3); // copy 3 bytes, the length of the uncompressed payload + queue_r(queueIN,3); + queueIN.partial=7; + mysql_hdr *_hdr; + _hdr=(mysql_hdr *)queueIN.pkt.ptr; + myconn->compression_pkt_id=_hdr->pkt_id; + ret+=7; + } + } else { +/**/ + if ((queueIN.pkt.size==0) && queue_data(queueIN)>=sizeof(mysql_hdr)) { + proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "Reading the header of a new packet\n"); + memcpy(&queueIN.hdr,queue_r_ptr(queueIN),sizeof(mysql_hdr)); + queue_r(queueIN,sizeof(mysql_hdr)); + queueIN.pkt.size=queueIN.hdr.pkt_length+sizeof(mysql_hdr); + queueIN.pkt.ptr=l_alloc(queueIN.pkt.size); + memcpy(queueIN.pkt.ptr, &queueIN.hdr, sizeof(mysql_hdr)); // immediately copy the header into the packet + queueIN.partial=sizeof(mysql_hdr); + ret+=sizeof(mysql_hdr); +// if (myconn->get_status_compression()==true) { +// mysql_hdr *_hdr; +// _hdr=(mysql_hdr *)queueIN.pkt.ptr; +// myconn->compression_pkt_id=_hdr->pkt_id; +// } + } } if ((queueIN.pkt.size>0) && queue_data(queueIN)) { int b= ( queue_data(queueIN) > (queueIN.pkt.size - queueIN.partial) ? (queueIN.pkt.size - queueIN.partial) : queue_data(queueIN) ); @@ -404,15 +424,109 @@ int MySQL_Data_Stream::buffer2array() { ret+=b; } if ((queueIN.pkt.size>0) && (queueIN.pkt.size==queueIN.partial) ) { - PSarrayIN->add(queueIN.pkt.ptr,queueIN.pkt.size); - pkts_recv++; - queueIN.pkt.size=0; - queueIN.pkt.ptr=NULL; - } + if (myconn->get_status_compression()==true) { + Bytef *dest; + uLongf destLen; + proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "Copied the whole compressed packet\n"); + unsigned int progress=0; + unsigned int datalength; + unsigned int payload_length=0; + unsigned char *u; + u=(unsigned char *)queueIN.pkt.ptr; + payload_length=*(u+6); + payload_length=payload_length*256+*(u+5); + payload_length=payload_length*256+*(u+4); + unsigned char *_ptr=(unsigned char *)queueIN.pkt.ptr+7; + + if (payload_length) { + // the payload is compressed + destLen=payload_length; + dest=(Bytef *)l_alloc(destLen); + int rc=uncompress(dest, &destLen, _ptr, queueIN.pkt.size-7); + assert(rc==Z_OK); + datalength=payload_length; + // change _ptr to the new buffer + _ptr=dest; + } else { + // the payload is not compressed + datalength=queueIN.pkt.size-7; + } + while (progressadd(ptrP,size); + } + if (payload_length) { + l_free(destLen,dest); + } + l_free(queueIN.pkt.size,queueIN.pkt.ptr); + pkts_recv++; + queueIN.pkt.size=0; + queueIN.pkt.ptr=NULL; + } else { + PSarrayIN->add(queueIN.pkt.ptr,queueIN.pkt.size); + pkts_recv++; + queueIN.pkt.size=0; + queueIN.pkt.ptr=NULL; + } + } return ret; } +void MySQL_Data_Stream::generate_compressed_packet() { +#define MAX_COMPRESSED_PACKET_SIZE 10*1024*1024 + unsigned int total_size=0; + unsigned int i=0; + PtrSize_t *p=NULL; + while (ilen && total_sizeindex(i); + total_size+=p->size; + i++; + } + if (i>=2) { + // we successfully read at least 2 packets + if (total_size>MAX_COMPRESSED_PACKET_SIZE) { + // total_size is too big, we remove the last packet read + total_size-=p->size; + } + } + uLong sourceLen=total_size; + Bytef *source=(Bytef *)l_alloc(total_size); + uLongf destLen=total_size*120/100+12; + Bytef *dest=(Bytef *)malloc(destLen); + i=0; + total_size=0; + while (total_sizeindex(i); + PtrSize_t p2; + PSarrayOUT->remove_index(0,&p2); + memcpy(source+total_size,p2.ptr,p2.size); + //i++; + total_size+=p2.size; + l_free(p2.size,p2.ptr); + } + //PSarrayOUT->remove_index_range(0,i); + int rc=compress(dest, &destLen, source, sourceLen); + assert(rc==Z_OK); + l_free(total_size, source); + queueOUT.pkt.size=destLen+7; + queueOUT.pkt.ptr=l_alloc(queueOUT.pkt.size); + mysql_hdr hdr; + hdr.pkt_length=destLen; + //hdr.pkt_id=++myconn->compression_pkt_id; + hdr.pkt_id=1; + memcpy((unsigned char *)queueOUT.pkt.ptr,&hdr,sizeof(mysql_hdr)); + hdr.pkt_length=total_size; + memcpy((unsigned char *)queueOUT.pkt.ptr+4,&hdr,3); + memcpy((unsigned char *)queueOUT.pkt.ptr+7,dest,destLen); + free(dest); +} + int MySQL_Data_Stream::array2buffer() { int ret=0; @@ -428,7 +542,26 @@ int MySQL_Data_Stream::array2buffer() { l_free(queueOUT.pkt.size,queueOUT.pkt.ptr); queueOUT.pkt.ptr=NULL; } - PSarrayOUT->remove_index(0,&queueOUT.pkt); + if (myconn->get_status_compression()==true) { + proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "DataStream: %p -- Compression enabled\n", this); + generate_compressed_packet(); // it is copied directly into queueOUT.pkt + } else { + PSarrayOUT->remove_index(0,&queueOUT.pkt); + // this is a special case, needed because compression is enabled *after* the first OK + if (DSS==STATE_CLIENT_AUTH_OK) { + DSS=STATE_SLEEP; + // enable compression + if (myconn->options.server_capabilities & CLIENT_COMPRESS) { + if (myconn->options.compression_min_length) { + myconn->set_status_compression(true); + } + } else { + //explicitly disable compression + myconn->options.compression_min_length=0; + myconn->set_status_compression(false); + } + } + } //memcpy(&queueOUT.pkt,PSarrayOUT->index(idx),sizeof(PtrSize_t)); #ifdef DEBUG { __dump_pkt(__func__,(unsigned char *)queueOUT.pkt.ptr,queueOUT.pkt.size); } @@ -548,6 +681,8 @@ int MySQL_Data_Stream::myds_connect(char *address, int connect_port, int *pendin if (connect_port) { rc=connect(s, (struct sockaddr *) &a, sizeof(a)); + int arg_on=1; + setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char *) &arg_on, sizeof(int)); } else { rc=connect(s, (struct sockaddr *) &u, len); }