Handle large packets from client

pull/317/head
René Cannaò 11 years ago
parent f5a9ca278e
commit 9ca3bdf7e6

@ -90,6 +90,9 @@ class MySQL_Data_Stream
bool net_failure;
PtrSize_t multi_pkt;
uint8_t pkt_sid;
struct {
char *ptr;
unsigned int size;

@ -112,6 +112,7 @@ enum mysql_data_stream_status {
STATE_CLIENT_AUTH_OK,
STATE_SSL_INIT,
STATE_SLEEP,
STATE_SLEEP_MULTI_PACKET,
STATE_CLIENT_COM_QUERY,
STATE_READY,
STATE_QUERY_SENT_DS,

@ -274,6 +274,7 @@ int MySQL_Session::handler() {
}
}
__get_pkts_from_client:
for (j=0; j<client_myds->PSarrayIN->len;) {
client_myds->PSarrayIN->remove_index(0,&pkt);
@ -304,7 +305,47 @@ int MySQL_Session::handler() {
break;
case WAITING_CLIENT_DATA:
if (pkt.size==(0xFFFFFF+sizeof(mysql_hdr))) {
// we are handling a multi-packet
switch (client_myds->DSS) {
case STATE_SLEEP:
client_myds->DSS=STATE_SLEEP_MULTI_PACKET;
break;
case STATE_SLEEP_MULTI_PACKET:
break;
default:
assert(0);
break;
}
}
switch (client_myds->DSS) {
case STATE_SLEEP_MULTI_PACKET:
if (client_myds->multi_pkt.ptr==NULL) {
// not initialized yet
client_myds->multi_pkt.ptr=pkt.ptr;
client_myds->multi_pkt.size=pkt.size;
} else {
PtrSize_t tmp_pkt;
tmp_pkt.ptr=client_myds->multi_pkt.ptr;
tmp_pkt.size=client_myds->multi_pkt.size;
client_myds->multi_pkt.size = pkt.size + tmp_pkt.size-sizeof(mysql_hdr);
client_myds->multi_pkt.ptr = l_alloc(client_myds->multi_pkt.size);
memcpy(client_myds->multi_pkt.ptr, tmp_pkt.ptr, tmp_pkt.size);
memcpy((char *)client_myds->multi_pkt.ptr + tmp_pkt.size , (char *)pkt.ptr+sizeof(mysql_hdr) , pkt.size-sizeof(mysql_hdr)); // the header is not copied
l_free(tmp_pkt.size , tmp_pkt.ptr);
}
if (pkt.size==(0xFFFFFF+sizeof(mysql_hdr))) { // there are more packets
goto __get_pkts_from_client;
} else {
// no more packets, move everything back to pkt and proceed
pkt.ptr=client_myds->multi_pkt.ptr;
pkt.size=client_myds->multi_pkt.size;
client_myds->multi_pkt.size=0;
client_myds->multi_pkt.ptr=NULL;
client_myds->DSS=STATE_SLEEP;
}
if (client_myds->DSS!=STATE_SLEEP) // if DSS==STATE_SLEEP , we continue
break;
case STATE_SLEEP:
command_counters->incr(thread->curtime/1000000);
if (transaction_persistent_hostgroup==-1) {
@ -889,7 +930,7 @@ handler_again:
switch (rc) {
case 0:
myds->myds_type=MYDS_BACKEND;
myds->DSS=STATE_READY;
myds->DSS=STATE_MARIADB_GENERIC;
status=WAITING_CLIENT_DATA;
st=previous_status.top();
previous_status.pop();
@ -2006,7 +2047,7 @@ void MySQL_Session::MySQL_Result_to_MySQL_wire(MYSQL *mysql, MYSQL_RES *result,
assert(myprot);
MySQL_Data_Stream *myds=myprot->get_myds();
myds->DSS=STATE_QUERY_SENT_DS;
int sid=1;
uint8_t sid=client_myds->pkt_sid+1;
unsigned int num_fields=mysql_field_count(mysql);
unsigned int num_rows;
unsigned int pkt_length=0;

@ -126,6 +126,8 @@ MySQL_Data_Stream::MySQL_Data_Stream() {
ssl=NULL;
net_failure=false;
// ssl_ctx=NULL;
multi_pkt.ptr=NULL;
multi_pkt.size=0;
}
@ -193,6 +195,11 @@ MySQL_Data_Stream::~MySQL_Data_Stream() {
if (ssl) SSL_free(ssl);
// if (ssl_ctx) SSL_CTX_free(ssl_ctx);
}
if (multi_pkt.ptr) {
l_free(multi_pkt.size,multi_pkt.ptr);
multi_pkt.ptr=NULL;
multi_pkt.size=0;
}
}
@ -429,6 +436,7 @@ int MySQL_Data_Stream::buffer2array() {
if ((queueIN.pkt.size==0) && queue_data(queueIN)>=sizeof(mysql_hdr)) {
proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "Reading the header of a new packet\n");
memcpy(&queueIN.hdr,queue_r_ptr(queueIN),sizeof(mysql_hdr));
pkt_sid=queueIN.hdr.pkt_id;
queue_r(queueIN,sizeof(mysql_hdr));
queueIN.pkt.size=queueIN.hdr.pkt_length+sizeof(mysql_hdr);
queueIN.pkt.ptr=l_alloc(queueIN.pkt.size);

Loading…
Cancel
Save