From 0b34782931e155049c74a8ca4a2e4230542f3787 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Thu, 10 Dec 2020 20:17:58 +0100 Subject: [PATCH] Adding more TAP test for prepared statements --- test/tap/tests/test_ps_async-t.cpp | 427 ++++++++++++++++++++++++++ test/tap/tests/test_ps_no_store-t.cpp | 226 ++++++++++++++ 2 files changed, 653 insertions(+) create mode 100644 test/tap/tests/test_ps_async-t.cpp create mode 100644 test/tap/tests/test_ps_no_store-t.cpp diff --git a/test/tap/tests/test_ps_async-t.cpp b/test/tap/tests/test_ps_async-t.cpp new file mode 100644 index 000000000..a51d01925 --- /dev/null +++ b/test/tap/tests/test_ps_async-t.cpp @@ -0,0 +1,427 @@ +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +#include "tap.h" +#include "command_line.h" +#include "utils.h" + +#include +#include +#include + +typedef int myf; // Type of MyFlags in my_funcs +#define MYF(v) (myf) (v) +#define MY_KEEP_PREALLOC 1 +#define MY_ALIGN(A,L) (((A) + (L) - 1) & ~((L) - 1)) +#define ALIGN_SIZE(A) MY_ALIGN((A),sizeof(double)) +void ma_free_root(MA_MEM_ROOT *root, myf MyFLAGS); +void *ma_alloc_root(MA_MEM_ROOT *mem_root, size_t Size); +//void ma_free_root(MA_MEM_ROOT *, int); +//#include "ma_global.h" +//#include "ma_sys.h" +#define MAX(a,b) (((a) > (b)) ? (a) : (b)) + + +void * ma_alloc_root(MA_MEM_ROOT *mem_root, size_t Size) +{ +#if defined(HAVE_purify) && defined(EXTRA_DEBUG) + MA_USED_MEM *next; + Size+=ALIGN_SIZE(sizeof(MA_USED_MEM)); + + if (!(next = (MA_USED_MEM*) malloc(Size))) + { + if (mem_root->error_handler) + (*mem_root->error_handler)(); + return((void *) 0); /* purecov: inspected */ + } + next->next=mem_root->used; + mem_root->used=next; + return (void *) (((char*) next)+ALIGN_SIZE(sizeof(MA_USED_MEM))); +#else + size_t get_size; + void * point; + MA_USED_MEM *next= 0; + MA_USED_MEM **prev; + + Size= ALIGN_SIZE(Size); + + if ((*(prev= &mem_root->free))) + { + if ((*prev)->left < Size && + mem_root->first_block_usage++ >= 16 && + (*prev)->left < 4096) + { + next= *prev; + *prev= next->next; + next->next= mem_root->used; + mem_root->used= next; + mem_root->first_block_usage= 0; + } + for (next= *prev; next && next->left < Size; next= next->next) + prev= &next->next; + } + if (! next) + { /* Time to alloc new block */ + get_size= MAX(Size+ALIGN_SIZE(sizeof(MA_USED_MEM)), + (mem_root->block_size & ~1) * ( (mem_root->block_num >> 2) < 4 ? 4 : (mem_root->block_num >> 2) ) ); + + if (!(next = (MA_USED_MEM*) malloc(get_size))) + { + if (mem_root->error_handler) + (*mem_root->error_handler)(); + return((void *) 0); /* purecov: inspected */ + } + mem_root->block_num++; + next->next= *prev; + next->size= get_size; + next->left= get_size-ALIGN_SIZE(sizeof(MA_USED_MEM)); + *prev=next; + } + point= (void *) ((char*) next+ (next->size-next->left)); + if ((next->left-= Size) < mem_root->min_malloc) + { /* Full block */ + *prev=next->next; /* Remove block from list */ + next->next=mem_root->used; + mem_root->used=next; + mem_root->first_block_usage= 0; + } + return(point); +#endif +} + + +void ma_free_root(MA_MEM_ROOT *root, myf MyFlags) +{ + MA_USED_MEM *next,*old; + + if (!root) + return; /* purecov: inspected */ + if (!(MyFlags & MY_KEEP_PREALLOC)) + root->pre_alloc=0; + + for ( next=root->used; next ;) + { + old=next; next= next->next ; + if (old != root->pre_alloc) + free(old); + } + for (next= root->free ; next ; ) + { + old=next; next= next->next ; + if (old != root->pre_alloc) + free(old); + } + root->used=root->free=0; + if (root->pre_alloc) + { + root->free=root->pre_alloc; + root->free->left=root->pre_alloc->size-ALIGN_SIZE(sizeof(MA_USED_MEM)); + root->free->next=0; + } +} + + +/* Helper function to do the waiting for events on the socket. */ +static int wait_for_mysql(MYSQL *mysql, int status) { + struct pollfd pfd; + int timeout, res; + + pfd.fd = mysql_get_socket(mysql); + pfd.events = + (status & MYSQL_WAIT_READ ? POLLIN : 0) | + (status & MYSQL_WAIT_WRITE ? POLLOUT : 0) | + (status & MYSQL_WAIT_EXCEPT ? POLLPRI : 0); + if (status & MYSQL_WAIT_TIMEOUT) + timeout = 1000*mysql_get_timeout_value(mysql); + else + timeout = -1; + res = poll(&pfd, 1, timeout); + if (res == 0) + return MYSQL_WAIT_TIMEOUT; + else if (res < 0) + return MYSQL_WAIT_TIMEOUT; + else { + int status = 0; + if (pfd.revents & POLLIN) status |= MYSQL_WAIT_READ; + if (pfd.revents & POLLOUT) status |= MYSQL_WAIT_WRITE; + if (pfd.revents & POLLPRI) status |= MYSQL_WAIT_EXCEPT; + return status; + } +} + + +const int NUM_ROWS=100; +const int NUM_ROWS_READ=1000; +const int NLOOPS=1; + +int select_config_file(MYSQL* mysql, std::string& resultset) { + if (mysql_query(mysql, "select config file")) { + fprintf(stderr, "File %s, line %d, Error: %s\n", + __FILE__, __LINE__, mysql_error(mysql)); + return exit_status(); + } + + MYSQL_RES *result; + MYSQL_ROW row; + result = mysql_store_result(mysql); + if (result) { + row = mysql_fetch_row(result); + resultset = row[0]; + mysql_free_result(result); + } else { + fprintf(stderr, "error\n"); + } + +} + +int restore_admin(MYSQL* mysqladmin) { + MYSQL_QUERY(mysqladmin, "load mysql query rules from disk"); + MYSQL_QUERY(mysqladmin, "load mysql query rules to runtime"); + MYSQL_QUERY(mysqladmin, "load mysql servers from disk"); + MYSQL_QUERY(mysqladmin, "load mysql servers to runtime"); +} + +int main(int argc, char** argv) { + CommandLine cl; + + if(cl.getEnv()) + return exit_status(); + + plan(2*NLOOPS); + diag("Testing PS async store result"); + + MYSQL* mysqladmin = mysql_init(NULL); + if (!mysqladmin) + return exit_status(); + + if (!mysql_real_connect(mysqladmin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) { + fprintf(stderr, "File %s, line %d, Error: %s\n", + __FILE__, __LINE__, mysql_error(mysqladmin)); + return exit_status(); + } + + MYSQL* mysql = mysql_init(NULL); + if (!mysql) + return exit_status(); + // configure the connection as not blocking + diag("Setting mysql connection non blocking"); + mysql_options(mysql, MYSQL_OPT_NONBLOCK, 0); + if (!mysql_real_connect(mysql, cl.host, cl.username, cl.password, NULL, cl.port, NULL, 0)) { + fprintf(stderr, "File %s, line %d, Error: %s\n", + __FILE__, __LINE__, mysql_error(mysql)); + return exit_status(); + } + + // we drastically reduce the receive buffer to make sure that + // mysql_stmt_store_result_[start|continue] doesn't complete + // in a single call + int s = mysql_get_socket(mysql); + int rcvbuf = 10240; + + diag("Setting mysql connection RCVBUF to %d bytes", rcvbuf); + if(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)) < 0) { + fprintf(stderr, "Failed to call setsockopt\n"); + return exit_status(); + } + + MYSQL_QUERY(mysqladmin, "delete from mysql_query_rules"); + MYSQL_QUERY(mysqladmin, "load mysql query rules to runtime"); + + MYSQL_QUERY(mysqladmin, "delete from mysql_servers where hostgroup_id=1"); + MYSQL_QUERY(mysqladmin, "load mysql servers to runtime"); + + MYSQL_QUERY(mysql, "drop database if exists test"); + MYSQL_QUERY(mysql, "create database if not exists test"); + MYSQL_QUERY(mysql, "DROP TABLE IF EXISTS test.sbtest1"); + MYSQL_QUERY(mysql, "CREATE TABLE if not exists test.sbtest1 (`id` int(10) unsigned NOT NULL AUTO_INCREMENT, `k` int(10) unsigned NOT NULL DEFAULT '0', `c` char(120) NOT NULL DEFAULT '', `pad` char(60) NOT NULL DEFAULT '', PRIMARY KEY (`id`), KEY `k_1` (`k`))"); + + std::random_device rd; + std::mt19937 mt(rd()); + std::uniform_int_distribution dist(0.0, 9.0); + + std::stringstream q; + q << "INSERT INTO test.sbtest1 (k, c, pad) values "; + bool put_comma = false; + for (int i=0; iresult.data; + int rows_read_inner = 0; + if (r) { + //rows_read++; + rows_read_inner++; + MYSQL_ROWS *pr = r; + while(rows_read_inner <= stmt2a->result.rows && r->next) { + // it is very important to check rows_read_inner FIRST + // because r->next could point to an invalid memory + rows_read_inner++; + pr = r; + r = r->next; + //rows_read++; + } + diag("Rows in buffer after calling mysql_stmt_store_result_cont(): %d", rows_read_inner); + // we now clean up the whole storage. + // This is the real POC + if (rows_read_inner > 1) { + // there is more than 1 row + int irs = 0; + MYSQL_ROWS *ir=stmt2a->result.data; + // see https://dev.mysql.com/doc/internals/en/binary-protocol-resultset-row.html + // on why we have an offset of 3 + const int row_offset = 3; + for (irs = 0; irs < stmt2a->result.rows -1 ; irs++) { + int id1, id2, id3; + memcpy(&id1, (char *)ir->data+row_offset, sizeof(int)); + memcpy(&id2, (char *)ir->data+row_offset+sizeof(int), sizeof(int)); + memcpy(&id3, (char *)ir->data+row_offset+sizeof(int)*2, sizeof(int)); + //diag("Row: %d + %d = %d", id1, id2, id3); + assert(id3==id1+id2); + ir = ir->next; + rows_read++; + } + // at this point, ir points to the last row + // next, we create a new MYSQL_ROWS that is a copy of the last row + MYSQL_ROWS *lcopy = (MYSQL_ROWS *)malloc(sizeof(MYSQL_ROWS) + ir->length); + lcopy->length = ir->length; + lcopy->data= (MYSQL_ROW)(lcopy + 1); + memcpy((char *)lcopy->data, (char *)ir->data, ir->length); + // next we proceed to reset all the buffer + stmt2a->result.rows = 0; + ma_free_root(&stmt2a->result.alloc, MYF(MY_KEEP_PREALLOC)); + //ma_free_root(&stmt2a->result.alloc, MYF(0)); + stmt2a->result.data= NULL; + stmt2a->result_cursor= NULL; + // we will now copy back the last row and make it the only row available + MYSQL_ROWS *current = (MYSQL_ROWS *)ma_alloc_root(&stmt2a->result.alloc, sizeof(MYSQL_ROWS) + lcopy->length); + current->data= (MYSQL_ROW)(current + 1); + MYSQL_ROWS **pprevious = &stmt2a->result.data; + //current->next = NULL; + *pprevious= current; + pprevious= ¤t->next; + memcpy((char *)current->data, (char *)lcopy->data, lcopy->length); + // we free the copy + free(lcopy); + // change the rows count to 1 + stmt2a->result.rows = 1; + // we should also configure the cursor, but because we scan it using our own + // algorithm, this is not needed + } + + } + } + diag("mysql_stmt_store_result_cont called %d times", cont_cnt); + + + int stmt2aRC = 0; + { + MYSQL_ROWS *r=stmt2a->result.data; + int rows_left = stmt2a->result.rows; + int rows_read_inner = 0; + if (r && rows_left) { + row_count2a++; + rows_read_inner++; + while(rows_read_inner < stmt2a->result.rows && r->next) { + rows_read_inner++; + r = r->next; + row_count2a++; + } + } + ok(row_count2a+rows_read==NUM_ROWS_READ, "Fetched %d rows, expected %d. Details: %d rows processed while buffering, %d at the end", row_count2a+rows_read, NUM_ROWS_READ , rows_read, row_count2a); + } + + if (prepare_meta_result) { + mysql_free_result(prepare_meta_result); + } + + if (mysql_stmt_close(stmt2a)) + { + fprintf(stderr, " failed while closing the statement\n"); + ok(false, " %s\n", mysql_error(mysql)); + restore_admin(mysqladmin); + return exit_status(); + } + } + + restore_admin(mysqladmin); + + mysql_close(mysql); + mysql_library_end(); + + return exit_status(); +} + diff --git a/test/tap/tests/test_ps_no_store-t.cpp b/test/tap/tests/test_ps_no_store-t.cpp new file mode 100644 index 000000000..5b0ad9165 --- /dev/null +++ b/test/tap/tests/test_ps_no_store-t.cpp @@ -0,0 +1,226 @@ +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +#include "tap.h" +#include "command_line.h" +#include "utils.h" + +const int NUM_ROWS=5; + +int select_config_file(MYSQL* mysql, std::string& resultset) { + if (mysql_query(mysql, "select config file")) { + fprintf(stderr, "File %s, line %d, Error: %s\n", + __FILE__, __LINE__, mysql_error(mysql)); + return exit_status(); + } + + MYSQL_RES *result; + MYSQL_ROW row; + result = mysql_store_result(mysql); + if (result) { + row = mysql_fetch_row(result); + resultset = row[0]; + mysql_free_result(result); + } else { + fprintf(stderr, "error\n"); + } + +} + +int restore_admin(MYSQL* mysqladmin) { + MYSQL_QUERY(mysqladmin, "load mysql query rules from disk"); + MYSQL_QUERY(mysqladmin, "load mysql query rules to runtime"); + MYSQL_QUERY(mysqladmin, "load mysql servers from disk"); + MYSQL_QUERY(mysqladmin, "load mysql servers to runtime"); +} + +int main(int argc, char** argv) { + CommandLine cl; + + if(cl.getEnv()) + return exit_status(); + + plan(4); + diag("Testing PS large resultset"); + + MYSQL* mysqladmin = mysql_init(NULL); + if (!mysqladmin) + return exit_status(); + + if (!mysql_real_connect(mysqladmin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) { + fprintf(stderr, "File %s, line %d, Error: %s\n", + __FILE__, __LINE__, mysql_error(mysqladmin)); + return exit_status(); + } + + MYSQL* mysql = mysql_init(NULL); + if (!mysql) + return exit_status(); + + if (!mysql_real_connect(mysql, cl.host, cl.username, cl.password, NULL, cl.port, NULL, 0)) { + fprintf(stderr, "File %s, line %d, Error: %s\n", + __FILE__, __LINE__, mysql_error(mysql)); + return exit_status(); + } + + MYSQL_QUERY(mysqladmin, "delete from mysql_query_rules"); + MYSQL_QUERY(mysqladmin, "load mysql query rules to runtime"); + + MYSQL_QUERY(mysqladmin, "delete from mysql_servers where hostgroup_id=1"); + MYSQL_QUERY(mysqladmin, "load mysql servers to runtime"); + + MYSQL_QUERY(mysql, "drop database if exists test"); + MYSQL_QUERY(mysql, "create database if not exists test"); + MYSQL_QUERY(mysql, "create table if not exists test.t (i int)"); + MYSQL_QUERY(mysql, "CREATE TABLE if not exists test.sbtest1 (`id` int(10) unsigned NOT NULL AUTO_INCREMENT, `k` int(10) unsigned NOT NULL DEFAULT '0', `c` char(120) NOT NULL DEFAULT '', `pad` char(60) NOT NULL DEFAULT '', PRIMARY KEY (`id`), KEY `k_1` (`k`))"); + + std::random_device rd; + std::mt19937 mt(rd()); + std::uniform_int_distribution dist(0.0, 9.0); + + std::stringstream q; + q << "INSERT INTO test.sbtest1 (k, c, pad) values "; + bool put_comma = false; + for (int i=0; i