Merge branch 'T88' into beta7

evhttp
René Cannaò 10 years ago
commit 01cf99f16d

@ -23,6 +23,9 @@ class MySQL_ResultSet {
unsigned int add_row(MYSQL_ROW row);
void add_eof();
bool get_resultset(PtrSizeArray *PSarrayFinal);
unsigned char *buffer;
unsigned int buffer_used;
void buffer_to_PSarrayOut();
};
@ -79,6 +82,7 @@ class MySQL_Protocol {
bool generate_pkt_field(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, char *schema, char *table, char *org_table, char *name, char *org_name, uint16_t charset, uint32_t column_length, uint8_t type, uint16_t flags, uint8_t decimals, bool field_list, uint64_t defvalue_length, char *defvalue);
bool generate_pkt_row(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt);
uint8_t generate_pkt_row2(PtrSizeArray *PSarrayOut, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt);
uint8_t generate_pkt_row3(MySQL_ResultSet *myrs, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt);
// bool generate_pkt_initial_handshake(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len);
bool generate_pkt_initial_handshake(bool send, void **ptr, unsigned int *len, uint32_t *thread_id);
// bool generate_statistics_response(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len);

@ -8,6 +8,9 @@ extern MySQL_Threads_Handler *GloMTH;
#undef max_allowed_packet
#endif
#define ISSUE486
#define RESULTSET_BUFLEN 16300
#ifdef DEBUG
static void __dump_pkt(const char *func, unsigned char *_ptr, unsigned int len) {
@ -781,6 +784,91 @@ uint8_t MySQL_Protocol::generate_pkt_row2(PtrSizeArray *PSarrayOut, unsigned int
return pkt_sid;
}
uint8_t MySQL_Protocol::generate_pkt_row3(MySQL_ResultSet *myrs, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt) {
int col=0;
unsigned int rowlen=0;
uint8_t pkt_sid=sequence_id;
for (col=0; col<colnums; col++) {
rowlen+=( fieldstxt[col] ? fieldslen[col]+mysql_encode_length(fieldslen[col],NULL) : 1 );
}
PtrSize_t pkt;
pkt.size=rowlen+sizeof(mysql_hdr);
if ( pkt.size<=(RESULTSET_BUFLEN-myrs->buffer_used) ) {
// there is space in the buffer, add the data to it
pkt.ptr = myrs->buffer + myrs->buffer_used;
myrs->buffer_used += pkt.size;
} else {
// there is no space in the buffer, we flush the buffer and recreate it
myrs->buffer_to_PSarrayOut();
// now we can check again if there is space in the buffer
if ( pkt.size<=(RESULTSET_BUFLEN-myrs->buffer_used) ) {
// there is space in the NEW buffer, add the data to it
pkt.ptr = myrs->buffer + myrs->buffer_used;
myrs->buffer_used += pkt.size;
} else {
// a new buffer is not enough to store the new row
pkt.ptr=l_alloc(pkt.size);
}
}
int l=sizeof(mysql_hdr);
for (col=0; col<colnums; col++) {
if (fieldstxt[col]) {
char length_prefix;
uint8_t length_len=mysql_encode_length(fieldslen[col], &length_prefix);
l+=write_encoded_length_and_string((unsigned char *)pkt.ptr+l,fieldslen[col],length_len, length_prefix, fieldstxt[col]);
} else {
char *_ptr=(char *)pkt.ptr;
_ptr[l]=0xfb;
l++;
}
}
if (pkt.size < (0xFFFFFF+sizeof(mysql_hdr))) {
mysql_hdr myhdr;
myhdr.pkt_id=pkt_sid;
myhdr.pkt_length=rowlen;
memcpy(pkt.ptr, &myhdr, sizeof(mysql_hdr));
if (pkt.ptr >= myrs->buffer && pkt.ptr < myrs->buffer+RESULTSET_BUFLEN) {
// we are writing within the buffer, do not add to PSarrayOUT
} else {
// we are writing outside the buffer, add to PSarrayOUT
myrs->PSarrayOUT->add(pkt.ptr,pkt.size);
}
} else {
unsigned int left=pkt.size;
unsigned int copied=0;
while (left>=(0xFFFFFF+sizeof(mysql_hdr))) {
PtrSize_t pkt2;
pkt2.size=0xFFFFFF+sizeof(mysql_hdr);
pkt2.ptr=l_alloc(pkt2.size);
memcpy((char *)pkt2.ptr+sizeof(mysql_hdr), (char *)pkt.ptr+sizeof(mysql_hdr)+copied, 0xFFFFFF);
mysql_hdr myhdr;
myhdr.pkt_id=pkt_sid;
pkt_sid++;
myhdr.pkt_length=0xFFFFFF;
memcpy(pkt2.ptr, &myhdr, sizeof(mysql_hdr));
// we are writing a large packet (over 16MB), we assume we are always outside the buffer
myrs->PSarrayOUT->add(pkt2.ptr,pkt2.size);
copied+=0xFFFFFF;
left-=0xFFFFFF;
}
PtrSize_t pkt2;
pkt2.size=left;
pkt2.ptr=l_alloc(pkt2.size);
memcpy((char *)pkt2.ptr+sizeof(mysql_hdr), (char *)pkt.ptr+sizeof(mysql_hdr)+copied, left-sizeof(mysql_hdr));
mysql_hdr myhdr;
myhdr.pkt_id=pkt_sid;
myhdr.pkt_length=left-sizeof(mysql_hdr);
memcpy(pkt2.ptr, &myhdr, sizeof(mysql_hdr));
// we are writing a large packet (over 16MB), we assume we are always outside the buffer
myrs->PSarrayOUT->add(pkt2.ptr,pkt2.size);
}
if (len) { *len=pkt.size+(pkt_sid-sequence_id)*sizeof(mysql_hdr); }
if (pkt.size >= (0xFFFFFF+sizeof(mysql_hdr))) {
l_free(pkt.size,pkt.ptr);
}
return pkt_sid;
}
bool MySQL_Protocol::generate_pkt_auth_switch_request(bool send, void **ptr, unsigned int *len) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 7, "Generating auth switch request pkt\n");
mysql_hdr myhdr;
@ -1226,6 +1314,8 @@ MySQL_ResultSet::MySQL_ResultSet(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL
resultset_completed=false;
myprot=_myprot;
mysql=_my;
buffer=(unsigned char *)malloc(RESULTSET_BUFLEN);
buffer_used=0;
myds=myprot->get_myds();
sid=myds->pkt_sid+1;
PSarrayOUT = new PtrSizeArray();
@ -1269,12 +1359,20 @@ MySQL_ResultSet::~MySQL_ResultSet() {
}
delete PSarrayOUT;
}
if (buffer) {
free(buffer);
buffer=NULL;
}
}
unsigned int MySQL_ResultSet::add_row(MYSQL_ROW row) {
unsigned long *lengths=mysql_fetch_lengths(result);
unsigned int pkt_length;
#ifdef ISSUE486
sid=myprot->generate_pkt_row3(this, &pkt_length, sid, num_fields, lengths, row);
#else
sid=myprot->generate_pkt_row2(PSarrayOUT, &pkt_length, sid, num_fields, lengths, row);
#endif /* ISSUE486 */
sid++;
resultset_size+=pkt_length;
num_rows++;
@ -1283,6 +1381,7 @@ unsigned int MySQL_ResultSet::add_row(MYSQL_ROW row) {
void MySQL_ResultSet::add_eof() {
PtrSize_t pkt;
buffer_to_PSarrayOut();
unsigned int nTrx=myds->sess->NumActiveTransactions();
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
if (myds->sess->autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT;
@ -1300,3 +1399,11 @@ bool MySQL_ResultSet::get_resultset(PtrSizeArray *PSarrayFinal) {
PSarrayOUT->remove_index(PSarrayOUT->len-1,NULL);
return resultset_completed;
}
void MySQL_ResultSet::buffer_to_PSarrayOut() {
if (buffer_used==0)
return; // exit immediately if the buffer is empty
PSarrayOUT->add(buffer,buffer_used);
buffer=(unsigned char *)malloc(RESULTSET_BUFLEN);
buffer_used=0;
}

@ -289,7 +289,9 @@ int MySQL_Threads_Handler::listener_add(const char *iface) {
unsigned int i;
for (i=0;i<num_threads;i++) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker;
while(!__sync_bool_compare_and_swap(&thr->mypolls.pending_listener_add,0,rc));
while(!__sync_bool_compare_and_swap(&thr->mypolls.pending_listener_add,0,rc)) {
usleep(100); // pause a bit
}
/*
while(!__sync_bool_compare_and_swap(&thr->mypolls.pending_listener_change,0,1)) { cpu_relax_pa(); }
while(__sync_fetch_and_add(&thr->mypolls.pending_listener_change,0)==1) { cpu_relax_pa(); }

Loading…
Cancel
Save