Partial commit to buffer resultsets in Prepared Statements

pull/3210/head
René Cannaò 5 years ago
parent ba7b3aa1ed
commit cfb11144d6

@ -12,6 +12,100 @@
#include <atomic>
// some of the code that follows is from mariadb client library memory allocator
typedef int myf; // Type of MyFlags in my_funcs
#define MYF(v) (myf) (v)
#define MY_KEEP_PREALLOC 1
#define MY_ALIGN(A,L) (((A) + (L) - 1) & ~((L) - 1))
#define ALIGN_SIZE(A) MY_ALIGN((A),sizeof(double))
void ma_free_root(MA_MEM_ROOT *root, myf MyFLAGS);
void *ma_alloc_root(MA_MEM_ROOT *mem_root, size_t Size);
#define MAX(a,b) (((a) > (b)) ? (a) : (b))
void * ma_alloc_root(MA_MEM_ROOT *mem_root, size_t Size)
{
size_t get_size;
void * point;
MA_USED_MEM *next= 0;
MA_USED_MEM **prev;
Size= ALIGN_SIZE(Size);
if ((*(prev= &mem_root->free)))
{
if ((*prev)->left < Size &&
mem_root->first_block_usage++ >= 16 &&
(*prev)->left < 4096)
{
next= *prev;
*prev= next->next;
next->next= mem_root->used;
mem_root->used= next;
mem_root->first_block_usage= 0;
}
for (next= *prev; next && next->left < Size; next= next->next)
prev= &next->next;
}
if (! next)
{ /* Time to alloc new block */
get_size= MAX(Size+ALIGN_SIZE(sizeof(MA_USED_MEM)),
(mem_root->block_size & ~1) * ( (mem_root->block_num >> 2) < 4 ? 4 : (mem_root->block_num >> 2) ) );
if (!(next = (MA_USED_MEM*) malloc(get_size)))
{
if (mem_root->error_handler)
(*mem_root->error_handler)();
return((void *) 0); /* purecov: inspected */
}
mem_root->block_num++;
next->next= *prev;
next->size= get_size;
next->left= get_size-ALIGN_SIZE(sizeof(MA_USED_MEM));
*prev=next;
}
point= (void *) ((char*) next+ (next->size-next->left));
if ((next->left-= Size) < mem_root->min_malloc)
{ /* Full block */
*prev=next->next; /* Remove block from list */
next->next=mem_root->used;
mem_root->used=next;
mem_root->first_block_usage= 0;
}
return(point);
}
void ma_free_root(MA_MEM_ROOT *root, myf MyFlags)
{
MA_USED_MEM *next,*old;
if (!root)
return; /* purecov: inspected */
if (!(MyFlags & MY_KEEP_PREALLOC))
root->pre_alloc=0;
for ( next=root->used; next ;)
{
old=next; next= next->next ;
if (old != root->pre_alloc)
free(old);
}
for (next= root->free ; next ; )
{
old=next; next= next->next ;
if (old != root->pre_alloc)
free(old);
}
root->used=root->free=0;
if (root->pre_alloc)
{
root->free=root->pre_alloc;
root->free->left=root->pre_alloc->size-ALIGN_SIZE(sizeof(MA_USED_MEM));
root->free->next=0;
}
}
extern const MARIADB_CHARSET_INFO * proxysql_find_charset_nr(unsigned int nr);
MARIADB_CHARSET_INFO * proxysql_find_charset_name(const char *name);
@ -1065,11 +1159,75 @@ handler_again:
}
break;
case ASYNC_STMT_EXECUTE_STORE_RESULT_CONT:
{ // this copied mostly from ASYNC_USE_RESULT_CONT
if (myds->sess && myds->sess->client_myds && myds->sess->mirror==false) {
unsigned int buffered_data=0;
buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN;
buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN;
if (buffered_data > (unsigned int)mysql_thread___threshold_resultset_size*8) {
next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); // we temporarily pause . See #1232
break;
}
}
}
stmt_execute_store_result_cont(event);
//if (async_fetch_row_start==false) {
// async_fetch_row_start=true;
//}
if (async_exit_status) {
next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT);
//} else {
// NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END);
//}
} else {
NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END);
// this copied mostly from ASYNC_USE_RESULT_CONT
//async_fetch_row_start=false;
unsigned int br=0;
MYSQL_ROWS *r=query.stmt->result.data;
int rows_read_inner = 0;
if (r) {
rows_read_inner++;
while(rows_read_inner <= query.stmt->result.rows && r->next) {
// it is very important to check rows_read_inner FIRST
// because r->next could point to an invalid memory
rows_read_inner++;
r = r->next;
}
if (rows_read_inner > 1) {
}
}
if (mysql_row) {
unsigned int br=MyRS->add_row(mysql_row);
__sync_fetch_and_add(&parent->bytes_recv,br);
myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_recv]+=br;
myds->bytes_info.bytes_recv += br;
bytes_info.bytes_recv += br;
processed_bytes+=br; // issue #527 : this variable will store the amount of bytes processed during this event
if (
(processed_bytes > (unsigned int)mysql_thread___threshold_resultset_size*8)
||
( mysql_thread___throttle_ratio_server_to_client && mysql_thread___throttle_max_bytes_per_second_to_client && (processed_bytes > (unsigned long long)mysql_thread___throttle_max_bytes_per_second_to_client/10*(unsigned long long)mysql_thread___throttle_ratio_server_to_client) )
) {
next_event(ASYNC_USE_RESULT_CONT); // we temporarily pause
} else {
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); // we continue looping
}
} else {
if (mysql) {
int _myerrno=mysql_errno(mysql);
if (_myerrno) {
if (myds) {
MyRS->add_err(myds);
NEXT_IMMEDIATE(ASYNC_QUERY_END);
}
}
}
// we reach here if there was no error
MyRS->add_eof();
NEXT_IMMEDIATE(ASYNC_QUERY_END);
}
}
break;
case ASYNC_STMT_EXECUTE_END:
@ -1395,6 +1553,57 @@ handler_again:
return async_state_machine;
}
process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT
// there is more than 1 row
unsigned long long total_size=0;
int irs = 0;
MYSQL_ROWS *ir = query.stmt->result.data;
for (irs = 0; irs < query.stmt->result.rows -1 ; irs++) {
// while iterating the rows we also count the bytes
total_size+=ir->length;
if (ir->length > 0xFFFFFF) {
total_size+=(ir->length / 0xFFFFFF) * sizeof(mysql_hdr);
}
total_size+=sizeof(mysql_hdr);
// TODO: here we need to copy the rows in the MyRS
// before that we need to prepare MyRS
// without the two above, the final result set will only have few rows (possible only one)
ir = ir->next;
//rows_read++;
}
// at this point, ir points to the last row
// next, we create a new MYSQL_ROWS that is a copy of the last row
MYSQL_ROWS *lcopy = (MYSQL_ROWS *)malloc(sizeof(MYSQL_ROWS) + ir->length);
lcopy->length = ir->length;
lcopy->data= (MYSQL_ROW)(lcopy + 1);
memcpy((char *)lcopy->data, (char *)ir->data, ir->length);
// next we proceed to reset all the buffer
query.stmt->result.rows = 0;
ma_free_root(&query.stmt->result.alloc, MYF(MY_KEEP_PREALLOC));
query.stmt->result.data= NULL;
query.stmt->result_cursor= NULL;
// we will now copy back the last row and make it the only row available
MYSQL_ROWS *current = (MYSQL_ROWS *)ma_alloc_root(&query.stmt->result.alloc, sizeof(MYSQL_ROWS) + lcopy->length);
current->data= (MYSQL_ROW)(current + 1);
MYSQL_ROWS **pprevious = &query.stmt->result.data;
//current->next = NULL;
*pprevious= current;
pprevious= &current->next;
memcpy((char *)current->data, (char *)lcopy->data, lcopy->length);
// we free the copy
free(lcopy);
// change the rows count to 1
query.stmt->result.rows = 1;
// we should also configure the cursor, but because we scan it using our own
// algorithm, this is not needed
// now we update bytes counter
__sync_fetch_and_add(&parent->bytes_recv,total_size);
myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_recv]+=total_size;
myds->bytes_info.bytes_recv += total_size;
bytes_info.bytes_recv += total_size;
void MySQL_Connection::next_event(MDB_ASYNC_ST new_st) {
#ifdef DEBUG

Loading…
Cancel
Save