diff --git a/test/tap/tests/test_ps_async-t.cpp b/test/tap/tests/test_ps_async-t.cpp index 649c58bd2..c4e685f60 100644 --- a/test/tap/tests/test_ps_async-t.cpp +++ b/test/tap/tests/test_ps_async-t.cpp @@ -158,11 +158,6 @@ static int wait_for_mysql(MYSQL *mysql, int status) { } } - -const int NUM_ROWS=100; -const std::vector NUM_ROWS_READ { 1000, 1, 2 }; -const int NLOOPS = NUM_ROWS_READ.size(); - int select_config_file(MYSQL* mysql, std::string& resultset) { if (mysql_query(mysql, "select config file")) { fprintf(stderr, "File %s, line %d, Error: %s\n", @@ -195,13 +190,182 @@ int restore_admin(MYSQL* mysqladmin) { return 0; } +/** + * @brief POC of the implementation for 'ps_buffering' for ProxySQL, this function encapsulates + * the core logic that is implemented in ProxySQL. + * + * @param query The query to be prepared and executed in the supplied 'MYSQL_STMT' that needs to + * be initialized. + * @param mysql A 'MYSQL*' connection handle, already oppened. + * @param stmt2a A 'MYSQL_STMT*' already initialized. + * + * @return The number of rows readed from the 'MYSQL_STMT' after this has been executed, and + * 'mysql_stmt_store_result_cont' has finished returned all the rows for the query. + */ +int mysql_stmt_store_result_cont_poc(const std::string query, MYSQL* mysql, MYSQL_STMT* stmt2a, bool check_ids) { + if (mysql_stmt_prepare(stmt2a, query.c_str(), query.size())) { + fprintf(stderr, "Query error %s\n", mysql_error(mysql)); + mysql_library_end(); + + return -1; + } + + if (mysql_stmt_execute(stmt2a)) + { + fprintf(stderr, " mysql_stmt_execute(), failed\n"); + ok(false, " %s\n", mysql_stmt_error(stmt2a)); + + return -1; + } + + MYSQL_RES* prepare_meta_result = mysql_stmt_result_metadata(stmt2a); + + int async_exit_status; + int interr = 0; + int cont_cnt = 0; + int rows_read = 0; + int prev_id3 = 0; + + async_exit_status = mysql_stmt_store_result_start(&interr, stmt2a); + while (async_exit_status) { + async_exit_status = wait_for_mysql(mysql, async_exit_status); + async_exit_status = mysql_stmt_store_result_cont(&interr, stmt2a, async_exit_status); + cont_cnt++; + MYSQL_ROWS *r=stmt2a->result.data; + int rows_read_inner = 0; + if (r) { + rows_read_inner++; + MYSQL_ROWS *pr = r; + while(rows_read_inner < stmt2a->result.rows) { + rows_read_inner++; + pr = r; + r = r->next; + } + diag("Rows in buffer after calling mysql_stmt_store_result_cont(): %d", rows_read_inner); + // we now clean up the whole storage. + // This is the real POC + if (rows_read_inner > 1) { + // there is more than 1 row + int irs = 0; + MYSQL_ROWS *ir=stmt2a->result.data; + // see https://dev.mysql.com/doc/internals/en/binary-protocol-resultset-row.html + // on why we have an offset of 3 + const int row_offset = 3; + for (irs = 0; irs < stmt2a->result.rows - 1; irs++) { + if (check_ids) { + int id1, id2, id3 = 0; + memcpy(&id1, (char *)ir->data+row_offset, sizeof(int)); + memcpy(&id2, (char *)ir->data+row_offset+sizeof(int), sizeof(int)); + memcpy(&id3, (char *)ir->data+row_offset+sizeof(int)*2, sizeof(int)); + + // NOTE: Uncomment to have further information on the rows being processed + // diag("Row: %d + %d = %d", id1, id2, id3); + + // We assert in case the ids doesn't match the expected one, ids should match this + // same pattern as it's the same performed in the 'SELECT'. + assert(id3==id1+id2); + + // Since 'ids' are created in a sequentially increasing fashion. We can ensure + // that no middle rows are missing by checking that in case 'id3' is bigger than + // the previous 'id3' (seq_rows_id). + // We ensure that no rows are eluded during counting by checking that all in case + // of 'id3' begin bigger than the previous 'id3' ('prev_id3'), it's bigger by + // exactly one, in case of being smaller, we reset the previous 'id3' since + // a new sequence of row fetching has started. + if (irs != 0 && id3 > prev_id3) { + assert(id3 == prev_id3 + 1); + } + // Update 'prev_id3' with current 'id3' + prev_id3 = id3; + } + + rows_read++; + if (irs <= stmt2a->result.rows - 2) { + ir = ir->next; + } + } + + // at this point, ir points to the last row + // next, we create a new MYSQL_ROWS that is a copy of the last row + MYSQL_ROWS *lcopy = (MYSQL_ROWS *)malloc(sizeof(MYSQL_ROWS) + ir->length); + lcopy->length = ir->length; + lcopy->data= (MYSQL_ROW)(lcopy + 1); + memcpy((char *)lcopy->data, (char *)ir->data, ir->length); + // next we proceed to reset all the buffer + ma_free_root(&stmt2a->result.alloc, MYF(MY_KEEP_PREALLOC)); + + // NOTE: Left for testing purposes + // ma_free_root(&stmt2a->result.alloc, MYF(0)); + + stmt2a->result.data= NULL; + stmt2a->result_cursor= NULL; + stmt2a->result.rows = 0; + + // we will now copy back the last row and make it the only row available + MYSQL_ROWS *current = (MYSQL_ROWS *)ma_alloc_root(&stmt2a->result.alloc, sizeof(MYSQL_ROWS) + lcopy->length); + current->data= (MYSQL_ROW)(current + 1); + stmt2a->result.data = current; + memcpy((char *)current->data, (char *)lcopy->data, lcopy->length); + // We update the length of the copied 'MYSQL_ROWS' data + current->length = lcopy->length; + + // we free the copy + free(lcopy); + // change the rows count to 1 + stmt2a->result.rows = 1; + // we should also configure the cursor, but because we scan it using our own + // algorithm, this is not needed + } + } + } + diag("mysql_stmt_store_result_cont called %d times", cont_cnt); + + if (prepare_meta_result) { + mysql_free_result(prepare_meta_result); + } + + return rows_read; +} + +/** + * @brief Count the number of rows present in the supplied 'MYSQL_STMT*' parameter, and + * returns the counted value. + * @param stmt The 'MYSQL_STMT*' which rows are going to be counted. + * @return The number of rows present in the 'MYSQL_STMT' parameter. + */ +int count_stmt_rows(MYSQL_STMT* stmt) { + int row_count2a=0; + int stmt2aRC = 0; + + // Count the rows from the resulting 'STMT' + { + MYSQL_ROWS *r=stmt->result.data; + int rows_left = stmt->result.rows; + int rows_read_inner = 0; + + if (r && rows_left) { + row_count2a++; + rows_read_inner++; + while(rows_read_inner < stmt->result.rows && r->next) { + rows_read_inner++; + r = r->next; + row_count2a++; + } + } + } + + return row_count2a; +} + +int NUM_ROWS = 100; + int main(int argc, char** argv) { CommandLine cl; if(cl.getEnv()) return exit_status(); - plan(1 + NLOOPS); + plan(3); diag("Testing PS async store result"); MYSQL* mysqladmin = mysql_init(NULL); @@ -220,7 +384,7 @@ int main(int argc, char** argv) { // configure the connection as not blocking diag("Setting mysql connection non blocking"); mysql_options(mysql, MYSQL_OPT_NONBLOCK, 0); - if (!mysql_real_connect(mysql, cl.host, cl.username, cl.password, NULL, cl.port, NULL, 0)) { + if (!mysql_real_connect(mysql, cl.host, cl.username, cl.password, NULL, 6033, NULL, 0)) { fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(mysql)); return exit_status(); @@ -286,141 +450,87 @@ int main(int argc, char** argv) { std::string query = ""; - for (int loops=0; loopsresult.data; - int rows_read_inner = 0; - if (r) { - //rows_read++; - rows_read_inner++; - MYSQL_ROWS *pr = r; - while(rows_read_inner < stmt2a->result.rows) { - // it is very important to check rows_read_inner FIRST - // because r->next could point to an invalid memory - rows_read_inner++; - pr = r; - r = r->next; - //rows_read++; - } - diag("Rows in buffer after calling mysql_stmt_store_result_cont(): %d", rows_read_inner); - // we now clean up the whole storage. - // This is the real POC - if (rows_read_inner > 1) { - // there is more than 1 row - int irs = 0; - MYSQL_ROWS *ir=stmt2a->result.data; - // see https://dev.mysql.com/doc/internals/en/binary-protocol-resultset-row.html - // on why we have an offset of 3 - const int row_offset = 3; - for (irs = 0; irs < stmt2a->result.rows -1 ; irs++) { - int id1, id2, id3; - memcpy(&id1, (char *)ir->data+row_offset, sizeof(int)); - memcpy(&id2, (char *)ir->data+row_offset+sizeof(int), sizeof(int)); - memcpy(&id3, (char *)ir->data+row_offset+sizeof(int)*2, sizeof(int)); - //diag("Row: %d + %d = %d", id1, id2, id3); - assert(id3==id1+id2); - rows_read++; - if (irs < stmt2a->result.rows - 2) { - ir = ir->next; - } - } - // at this point, ir points to the last row - // next, we create a new MYSQL_ROWS that is a copy of the last row - MYSQL_ROWS *lcopy = (MYSQL_ROWS *)malloc(sizeof(MYSQL_ROWS) + ir->length); - lcopy->length = ir->length; - lcopy->data= (MYSQL_ROW)(lcopy + 1); - memcpy((char *)lcopy->data, (char *)ir->data, ir->length); - // next we proceed to reset all the buffer - stmt2a->result.rows = 0; - ma_free_root(&stmt2a->result.alloc, MYF(MY_KEEP_PREALLOC)); - //ma_free_root(&stmt2a->result.alloc, MYF(0)); - stmt2a->result.data= NULL; - stmt2a->result_cursor= NULL; - // we will now copy back the last row and make it the only row available - MYSQL_ROWS *current = (MYSQL_ROWS *)ma_alloc_root(&stmt2a->result.alloc, sizeof(MYSQL_ROWS) + lcopy->length); - current->data= (MYSQL_ROW)(current + 1); - stmt2a->result.data = current; - memcpy((char *)current->data, (char *)lcopy->data, lcopy->length); - // we free the copy - free(lcopy); - // change the rows count to 1 - stmt2a->result.rows = 1; - // we should also configure the cursor, but because we scan it using our own - // algorithm, this is not needed - } - - } + return -1; } - diag("mysql_stmt_store_result_cont called %d times", cont_cnt); + // Second query: This query was specifically thought to trigger a segfault + // and invalid memory access found while testing and reported by valgrind. + // The principle is to perform multiple big queries followed by small queries. + // ************************************************************************ + NUM_ROWS_READ = 10; - int stmt2aRC = 0; - { - MYSQL_ROWS *r=stmt2a->result.data; - int rows_left = stmt2a->result.rows; - int rows_read_inner = 0; - if (r && rows_left) { - row_count2a++; - rows_read_inner++; - while(rows_read_inner < stmt2a->result.rows && r->next) { - rows_read_inner++; - r = r->next; - row_count2a++; - } - } - ok(row_count2a+rows_read==IT_NUM_ROWS_READ, "Fetched %d rows, expected %d. Details: %d rows processed while buffering, %d at the end", row_count2a+rows_read, IT_NUM_ROWS_READ , rows_read, row_count2a); + stmt2a = mysql_stmt_init(mysql); + if (!stmt2a) { + ok(false, " mysql_stmt_init(), out of memory\n"); + return restore_admin(mysqladmin); } - if (prepare_meta_result) { - mysql_free_result(prepare_meta_result); - } - - if (mysql_stmt_close(stmt2a)) - { + // Original query: For testing purposes + // ************************************************************************ + // query = "(SELECT id, k, REPEAT(c,100+20000) cc FROM test.sbtest1 LIMIT " + std::to_string(NUM_ROWS_READ) + ")"; + // query += "UNION (SELECT id, k, REPEAT(c,100) cc FROM test.sbtest1 LIMIT " + std::to_string(NUM_ROWS_READ) + ")"; + // ************************************************************************ + + // Small version of 'memory issues' query triggering valgrind errors + query = "(SELECT id, k, REPEAT(c,2000) cc FROM test.sbtest1 LIMIT " + std::to_string(NUM_ROWS_READ) + ")"; + query += "UNION (SELECT id, k, REPEAT(c,10) cc FROM test.sbtest1 LIMIT " + std::to_string(NUM_ROWS_READ) + ")"; + + rows_read = mysql_stmt_store_result_cont_poc(query, mysql, stmt2a, false); + row_count2a = count_stmt_rows(stmt2a); + + ok( + row_count2a + rows_read == NUM_ROWS_READ * 2, + "Fetched %d rows, expected %d. Details: %d rows processed while buffering, %d at the end", + row_count2a + rows_read, + NUM_ROWS_READ, + rows_read, + row_count2a + ); + + if (mysql_stmt_close(stmt2a)) { fprintf(stderr, " failed while closing the statement\n"); ok(false, " %s\n", mysql_error(mysql)); - return restore_admin(mysqladmin); - } + restore_admin(mysql); + + return -1; } restore_admin(mysqladmin);