diff --git a/include/MySQL_Protocol.h b/include/MySQL_Protocol.h index 9bff838b7..f02d9ae39 100644 --- a/include/MySQL_Protocol.h +++ b/include/MySQL_Protocol.h @@ -23,6 +23,9 @@ class MySQL_ResultSet { unsigned int add_row(MYSQL_ROW row); void add_eof(); bool get_resultset(PtrSizeArray *PSarrayFinal); + unsigned char *buffer; + unsigned int buffer_used; + void buffer_to_PSarrayOut(); }; @@ -79,6 +82,7 @@ class MySQL_Protocol { bool generate_pkt_field(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, char *schema, char *table, char *org_table, char *name, char *org_name, uint16_t charset, uint32_t column_length, uint8_t type, uint16_t flags, uint8_t decimals, bool field_list, uint64_t defvalue_length, char *defvalue); bool generate_pkt_row(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt); uint8_t generate_pkt_row2(PtrSizeArray *PSarrayOut, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt); + uint8_t generate_pkt_row3(MySQL_ResultSet *myrs, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt); // bool generate_pkt_initial_handshake(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len); bool generate_pkt_initial_handshake(bool send, void **ptr, unsigned int *len, uint32_t *thread_id); // bool generate_statistics_response(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len); diff --git a/lib/MySQL_Protocol.cpp b/lib/MySQL_Protocol.cpp index d23d3877a..a2f47c94a 100644 --- a/lib/MySQL_Protocol.cpp +++ b/lib/MySQL_Protocol.cpp @@ -8,6 +8,9 @@ extern MySQL_Threads_Handler *GloMTH; #undef max_allowed_packet #endif +#define ISSUE486 +#define RESULTSET_BUFLEN 16300 + #ifdef DEBUG static void __dump_pkt(const char *func, unsigned char *_ptr, unsigned int len) { @@ -781,6 +784,91 @@ uint8_t MySQL_Protocol::generate_pkt_row2(PtrSizeArray *PSarrayOut, unsigned int return pkt_sid; } +uint8_t MySQL_Protocol::generate_pkt_row3(MySQL_ResultSet *myrs, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt) { + int col=0; + unsigned int rowlen=0; + uint8_t pkt_sid=sequence_id; + for (col=0; colbuffer_used) ) { + // there is space in the buffer, add the data to it + pkt.ptr = myrs->buffer + myrs->buffer_used; + myrs->buffer_used += pkt.size; + } else { + // there is no space in the buffer, we flush the buffer and recreate it + myrs->buffer_to_PSarrayOut(); + // now we can check again if there is space in the buffer + if ( pkt.size<=(RESULTSET_BUFLEN-myrs->buffer_used) ) { + // there is space in the NEW buffer, add the data to it + pkt.ptr = myrs->buffer + myrs->buffer_used; + myrs->buffer_used += pkt.size; + } else { + // a new buffer is not enough to store the new row + pkt.ptr=l_alloc(pkt.size); + } + } + int l=sizeof(mysql_hdr); + for (col=0; col= myrs->buffer && pkt.ptr < myrs->buffer+RESULTSET_BUFLEN) { + // we are writing within the buffer, do not add to PSarrayOUT + } else { + // we are writing outside the buffer, add to PSarrayOUT + myrs->PSarrayOUT->add(pkt.ptr,pkt.size); + } + } else { + unsigned int left=pkt.size; + unsigned int copied=0; + while (left>=(0xFFFFFF+sizeof(mysql_hdr))) { + PtrSize_t pkt2; + pkt2.size=0xFFFFFF+sizeof(mysql_hdr); + pkt2.ptr=l_alloc(pkt2.size); + memcpy((char *)pkt2.ptr+sizeof(mysql_hdr), (char *)pkt.ptr+sizeof(mysql_hdr)+copied, 0xFFFFFF); + mysql_hdr myhdr; + myhdr.pkt_id=pkt_sid; + pkt_sid++; + myhdr.pkt_length=0xFFFFFF; + memcpy(pkt2.ptr, &myhdr, sizeof(mysql_hdr)); + // we are writing a large packet (over 16MB), we assume we are always outside the buffer + myrs->PSarrayOUT->add(pkt2.ptr,pkt2.size); + copied+=0xFFFFFF; + left-=0xFFFFFF; + } + PtrSize_t pkt2; + pkt2.size=left; + pkt2.ptr=l_alloc(pkt2.size); + memcpy((char *)pkt2.ptr+sizeof(mysql_hdr), (char *)pkt.ptr+sizeof(mysql_hdr)+copied, left-sizeof(mysql_hdr)); + mysql_hdr myhdr; + myhdr.pkt_id=pkt_sid; + myhdr.pkt_length=left-sizeof(mysql_hdr); + memcpy(pkt2.ptr, &myhdr, sizeof(mysql_hdr)); + // we are writing a large packet (over 16MB), we assume we are always outside the buffer + myrs->PSarrayOUT->add(pkt2.ptr,pkt2.size); + } + if (len) { *len=pkt.size+(pkt_sid-sequence_id)*sizeof(mysql_hdr); } + if (pkt.size >= (0xFFFFFF+sizeof(mysql_hdr))) { + l_free(pkt.size,pkt.ptr); + } + return pkt_sid; +} + bool MySQL_Protocol::generate_pkt_auth_switch_request(bool send, void **ptr, unsigned int *len) { proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 7, "Generating auth switch request pkt\n"); mysql_hdr myhdr; @@ -1226,6 +1314,8 @@ MySQL_ResultSet::MySQL_ResultSet(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL resultset_completed=false; myprot=_myprot; mysql=_my; + buffer=(unsigned char *)malloc(RESULTSET_BUFLEN); + buffer_used=0; myds=myprot->get_myds(); sid=myds->pkt_sid+1; PSarrayOUT = new PtrSizeArray(); @@ -1269,12 +1359,20 @@ MySQL_ResultSet::~MySQL_ResultSet() { } delete PSarrayOUT; } + if (buffer) { + free(buffer); + buffer=NULL; + } } unsigned int MySQL_ResultSet::add_row(MYSQL_ROW row) { unsigned long *lengths=mysql_fetch_lengths(result); unsigned int pkt_length; +#ifdef ISSUE486 + sid=myprot->generate_pkt_row3(this, &pkt_length, sid, num_fields, lengths, row); +#else sid=myprot->generate_pkt_row2(PSarrayOUT, &pkt_length, sid, num_fields, lengths, row); +#endif /* ISSUE486 */ sid++; resultset_size+=pkt_length; num_rows++; @@ -1283,6 +1381,7 @@ unsigned int MySQL_ResultSet::add_row(MYSQL_ROW row) { void MySQL_ResultSet::add_eof() { PtrSize_t pkt; + buffer_to_PSarrayOut(); unsigned int nTrx=myds->sess->NumActiveTransactions(); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); if (myds->sess->autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT; @@ -1300,3 +1399,11 @@ bool MySQL_ResultSet::get_resultset(PtrSizeArray *PSarrayFinal) { PSarrayOUT->remove_index(PSarrayOUT->len-1,NULL); return resultset_completed; } + +void MySQL_ResultSet::buffer_to_PSarrayOut() { + if (buffer_used==0) + return; // exit immediately if the buffer is empty + PSarrayOUT->add(buffer,buffer_used); + buffer=(unsigned char *)malloc(RESULTSET_BUFLEN); + buffer_used=0; +} diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 29f5e7713..befdc56ec 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -289,7 +289,9 @@ int MySQL_Threads_Handler::listener_add(const char *iface) { unsigned int i; for (i=0;imypolls.pending_listener_add,0,rc)); + while(!__sync_bool_compare_and_swap(&thr->mypolls.pending_listener_add,0,rc)) { + usleep(100); // pause a bit + } /* while(!__sync_bool_compare_and_swap(&thr->mypolls.pending_listener_change,0,1)) { cpu_relax_pa(); } while(__sync_fetch_and_add(&thr->mypolls.pending_listener_change,0)==1) { cpu_relax_pa(); }