From cfb11144d6ec494f06c2cd28394fa657e87785c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Thu, 17 Dec 2020 12:41:31 +0100 Subject: [PATCH] Partial commit to buffer resultsets in Prepared Statements --- lib/mysql_connection.cpp | 211 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 210 insertions(+), 1 deletion(-) diff --git a/lib/mysql_connection.cpp b/lib/mysql_connection.cpp index 7046ccec5..9342d1c01 100644 --- a/lib/mysql_connection.cpp +++ b/lib/mysql_connection.cpp @@ -12,6 +12,100 @@ #include + +// some of the code that follows is from mariadb client library memory allocator +typedef int myf; // Type of MyFlags in my_funcs +#define MYF(v) (myf) (v) +#define MY_KEEP_PREALLOC 1 +#define MY_ALIGN(A,L) (((A) + (L) - 1) & ~((L) - 1)) +#define ALIGN_SIZE(A) MY_ALIGN((A),sizeof(double)) +void ma_free_root(MA_MEM_ROOT *root, myf MyFLAGS); +void *ma_alloc_root(MA_MEM_ROOT *mem_root, size_t Size); +#define MAX(a,b) (((a) > (b)) ? (a) : (b)) + +void * ma_alloc_root(MA_MEM_ROOT *mem_root, size_t Size) +{ + size_t get_size; + void * point; + MA_USED_MEM *next= 0; + MA_USED_MEM **prev; + + Size= ALIGN_SIZE(Size); + + if ((*(prev= &mem_root->free))) + { + if ((*prev)->left < Size && + mem_root->first_block_usage++ >= 16 && + (*prev)->left < 4096) + { + next= *prev; + *prev= next->next; + next->next= mem_root->used; + mem_root->used= next; + mem_root->first_block_usage= 0; + } + for (next= *prev; next && next->left < Size; next= next->next) + prev= &next->next; + } + if (! next) + { /* Time to alloc new block */ + get_size= MAX(Size+ALIGN_SIZE(sizeof(MA_USED_MEM)), + (mem_root->block_size & ~1) * ( (mem_root->block_num >> 2) < 4 ? 4 : (mem_root->block_num >> 2) ) ); + + if (!(next = (MA_USED_MEM*) malloc(get_size))) + { + if (mem_root->error_handler) + (*mem_root->error_handler)(); + return((void *) 0); /* purecov: inspected */ + } + mem_root->block_num++; + next->next= *prev; + next->size= get_size; + next->left= get_size-ALIGN_SIZE(sizeof(MA_USED_MEM)); + *prev=next; + } + point= (void *) ((char*) next+ (next->size-next->left)); + if ((next->left-= Size) < mem_root->min_malloc) + { /* Full block */ + *prev=next->next; /* Remove block from list */ + next->next=mem_root->used; + mem_root->used=next; + mem_root->first_block_usage= 0; + } + return(point); +} + + +void ma_free_root(MA_MEM_ROOT *root, myf MyFlags) +{ + MA_USED_MEM *next,*old; + + if (!root) + return; /* purecov: inspected */ + if (!(MyFlags & MY_KEEP_PREALLOC)) + root->pre_alloc=0; + + for ( next=root->used; next ;) + { + old=next; next= next->next ; + if (old != root->pre_alloc) + free(old); + } + for (next= root->free ; next ; ) + { + old=next; next= next->next ; + if (old != root->pre_alloc) + free(old); + } + root->used=root->free=0; + if (root->pre_alloc) + { + root->free=root->pre_alloc; + root->free->left=root->pre_alloc->size-ALIGN_SIZE(sizeof(MA_USED_MEM)); + root->free->next=0; + } +} + extern const MARIADB_CHARSET_INFO * proxysql_find_charset_nr(unsigned int nr); MARIADB_CHARSET_INFO * proxysql_find_charset_name(const char *name); @@ -1065,11 +1159,75 @@ handler_again: } break; case ASYNC_STMT_EXECUTE_STORE_RESULT_CONT: + { // this copied mostly from ASYNC_USE_RESULT_CONT + if (myds->sess && myds->sess->client_myds && myds->sess->mirror==false) { + unsigned int buffered_data=0; + buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN; + buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN; + if (buffered_data > (unsigned int)mysql_thread___threshold_resultset_size*8) { + next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); // we temporarily pause . See #1232 + break; + } + } + } stmt_execute_store_result_cont(event); + //if (async_fetch_row_start==false) { + // async_fetch_row_start=true; + //} if (async_exit_status) { next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); + //} else { + // NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END); + //} } else { - NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END); + // this copied mostly from ASYNC_USE_RESULT_CONT + //async_fetch_row_start=false; + unsigned int br=0; + MYSQL_ROWS *r=query.stmt->result.data; + int rows_read_inner = 0; + + if (r) { + rows_read_inner++; + while(rows_read_inner <= query.stmt->result.rows && r->next) { + // it is very important to check rows_read_inner FIRST + // because r->next could point to an invalid memory + rows_read_inner++; + r = r->next; + } + if (rows_read_inner > 1) { + } + } + + if (mysql_row) { + unsigned int br=MyRS->add_row(mysql_row); + __sync_fetch_and_add(&parent->bytes_recv,br); + myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_recv]+=br; + myds->bytes_info.bytes_recv += br; + bytes_info.bytes_recv += br; + processed_bytes+=br; // issue #527 : this variable will store the amount of bytes processed during this event + if ( + (processed_bytes > (unsigned int)mysql_thread___threshold_resultset_size*8) + || + ( mysql_thread___throttle_ratio_server_to_client && mysql_thread___throttle_max_bytes_per_second_to_client && (processed_bytes > (unsigned long long)mysql_thread___throttle_max_bytes_per_second_to_client/10*(unsigned long long)mysql_thread___throttle_ratio_server_to_client) ) + ) { + next_event(ASYNC_USE_RESULT_CONT); // we temporarily pause + } else { + NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); // we continue looping + } + } else { + if (mysql) { + int _myerrno=mysql_errno(mysql); + if (_myerrno) { + if (myds) { + MyRS->add_err(myds); + NEXT_IMMEDIATE(ASYNC_QUERY_END); + } + } + } + // we reach here if there was no error + MyRS->add_eof(); + NEXT_IMMEDIATE(ASYNC_QUERY_END); + } } break; case ASYNC_STMT_EXECUTE_END: @@ -1395,6 +1553,57 @@ handler_again: return async_state_machine; } +process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT + // there is more than 1 row + unsigned long long total_size=0; + int irs = 0; + MYSQL_ROWS *ir = query.stmt->result.data; + for (irs = 0; irs < query.stmt->result.rows -1 ; irs++) { + // while iterating the rows we also count the bytes + total_size+=ir->length; + if (ir->length > 0xFFFFFF) { + total_size+=(ir->length / 0xFFFFFF) * sizeof(mysql_hdr); + } + total_size+=sizeof(mysql_hdr); + + // TODO: here we need to copy the rows in the MyRS + // before that we need to prepare MyRS + // without the two above, the final result set will only have few rows (possible only one) + + ir = ir->next; + //rows_read++; + } + // at this point, ir points to the last row + // next, we create a new MYSQL_ROWS that is a copy of the last row + MYSQL_ROWS *lcopy = (MYSQL_ROWS *)malloc(sizeof(MYSQL_ROWS) + ir->length); + lcopy->length = ir->length; + lcopy->data= (MYSQL_ROW)(lcopy + 1); + memcpy((char *)lcopy->data, (char *)ir->data, ir->length); + // next we proceed to reset all the buffer + query.stmt->result.rows = 0; + ma_free_root(&query.stmt->result.alloc, MYF(MY_KEEP_PREALLOC)); + query.stmt->result.data= NULL; + query.stmt->result_cursor= NULL; + // we will now copy back the last row and make it the only row available + MYSQL_ROWS *current = (MYSQL_ROWS *)ma_alloc_root(&query.stmt->result.alloc, sizeof(MYSQL_ROWS) + lcopy->length); + current->data= (MYSQL_ROW)(current + 1); + MYSQL_ROWS **pprevious = &query.stmt->result.data; + //current->next = NULL; + *pprevious= current; + pprevious= ¤t->next; + memcpy((char *)current->data, (char *)lcopy->data, lcopy->length); + // we free the copy + free(lcopy); + // change the rows count to 1 + query.stmt->result.rows = 1; + // we should also configure the cursor, but because we scan it using our own + // algorithm, this is not needed + + // now we update bytes counter + __sync_fetch_and_add(&parent->bytes_recv,total_size); + myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_recv]+=total_size; + myds->bytes_info.bytes_recv += total_size; + bytes_info.bytes_recv += total_size; void MySQL_Connection::next_event(MDB_ASYNC_ST new_st) { #ifdef DEBUG