Multiple changes into 'test_ps_async-t' test

1. Isolated POC for ps_buffer into function.
2. Improved documentation of functions old and introduced checks.
3. Added new query used to trigger a memory error during testing.
pull/3335/head
Javier Jaramago Fernández 5 years ago
parent d0a1866cf9
commit 4071096f38

@ -158,11 +158,6 @@ static int wait_for_mysql(MYSQL *mysql, int status) {
}
}
const int NUM_ROWS=100;
const std::vector<int> NUM_ROWS_READ { 1000, 1, 2 };
const int NLOOPS = NUM_ROWS_READ.size();
int select_config_file(MYSQL* mysql, std::string& resultset) {
if (mysql_query(mysql, "select config file")) {
fprintf(stderr, "File %s, line %d, Error: %s\n",
@ -195,13 +190,182 @@ int restore_admin(MYSQL* mysqladmin) {
return 0;
}
/**
* @brief POC of the implementation for 'ps_buffering' for ProxySQL, this function encapsulates
* the core logic that is implemented in ProxySQL.
*
* @param query The query to be prepared and executed in the supplied 'MYSQL_STMT' that needs to
* be initialized.
* @param mysql A 'MYSQL*' connection handle, already oppened.
* @param stmt2a A 'MYSQL_STMT*' already initialized.
*
* @return The number of rows readed from the 'MYSQL_STMT' after this has been executed, and
* 'mysql_stmt_store_result_cont' has finished returned all the rows for the query.
*/
int mysql_stmt_store_result_cont_poc(const std::string query, MYSQL* mysql, MYSQL_STMT* stmt2a, bool check_ids) {
if (mysql_stmt_prepare(stmt2a, query.c_str(), query.size())) {
fprintf(stderr, "Query error %s\n", mysql_error(mysql));
mysql_library_end();
return -1;
}
if (mysql_stmt_execute(stmt2a))
{
fprintf(stderr, " mysql_stmt_execute(), failed\n");
ok(false, " %s\n", mysql_stmt_error(stmt2a));
return -1;
}
MYSQL_RES* prepare_meta_result = mysql_stmt_result_metadata(stmt2a);
int async_exit_status;
int interr = 0;
int cont_cnt = 0;
int rows_read = 0;
int prev_id3 = 0;
async_exit_status = mysql_stmt_store_result_start(&interr, stmt2a);
while (async_exit_status) {
async_exit_status = wait_for_mysql(mysql, async_exit_status);
async_exit_status = mysql_stmt_store_result_cont(&interr, stmt2a, async_exit_status);
cont_cnt++;
MYSQL_ROWS *r=stmt2a->result.data;
int rows_read_inner = 0;
if (r) {
rows_read_inner++;
MYSQL_ROWS *pr = r;
while(rows_read_inner < stmt2a->result.rows) {
rows_read_inner++;
pr = r;
r = r->next;
}
diag("Rows in buffer after calling mysql_stmt_store_result_cont(): %d", rows_read_inner);
// we now clean up the whole storage.
// This is the real POC
if (rows_read_inner > 1) {
// there is more than 1 row
int irs = 0;
MYSQL_ROWS *ir=stmt2a->result.data;
// see https://dev.mysql.com/doc/internals/en/binary-protocol-resultset-row.html
// on why we have an offset of 3
const int row_offset = 3;
for (irs = 0; irs < stmt2a->result.rows - 1; irs++) {
if (check_ids) {
int id1, id2, id3 = 0;
memcpy(&id1, (char *)ir->data+row_offset, sizeof(int));
memcpy(&id2, (char *)ir->data+row_offset+sizeof(int), sizeof(int));
memcpy(&id3, (char *)ir->data+row_offset+sizeof(int)*2, sizeof(int));
// NOTE: Uncomment to have further information on the rows being processed
// diag("Row: %d + %d = %d", id1, id2, id3);
// We assert in case the ids doesn't match the expected one, ids should match this
// same pattern as it's the same performed in the 'SELECT'.
assert(id3==id1+id2);
// Since 'ids' are created in a sequentially increasing fashion. We can ensure
// that no middle rows are missing by checking that in case 'id3' is bigger than
// the previous 'id3' (seq_rows_id).
// We ensure that no rows are eluded during counting by checking that all in case
// of 'id3' begin bigger than the previous 'id3' ('prev_id3'), it's bigger by
// exactly one, in case of being smaller, we reset the previous 'id3' since
// a new sequence of row fetching has started.
if (irs != 0 && id3 > prev_id3) {
assert(id3 == prev_id3 + 1);
}
// Update 'prev_id3' with current 'id3'
prev_id3 = id3;
}
rows_read++;
if (irs <= stmt2a->result.rows - 2) {
ir = ir->next;
}
}
// at this point, ir points to the last row
// next, we create a new MYSQL_ROWS that is a copy of the last row
MYSQL_ROWS *lcopy = (MYSQL_ROWS *)malloc(sizeof(MYSQL_ROWS) + ir->length);
lcopy->length = ir->length;
lcopy->data= (MYSQL_ROW)(lcopy + 1);
memcpy((char *)lcopy->data, (char *)ir->data, ir->length);
// next we proceed to reset all the buffer
ma_free_root(&stmt2a->result.alloc, MYF(MY_KEEP_PREALLOC));
// NOTE: Left for testing purposes
// ma_free_root(&stmt2a->result.alloc, MYF(0));
stmt2a->result.data= NULL;
stmt2a->result_cursor= NULL;
stmt2a->result.rows = 0;
// we will now copy back the last row and make it the only row available
MYSQL_ROWS *current = (MYSQL_ROWS *)ma_alloc_root(&stmt2a->result.alloc, sizeof(MYSQL_ROWS) + lcopy->length);
current->data= (MYSQL_ROW)(current + 1);
stmt2a->result.data = current;
memcpy((char *)current->data, (char *)lcopy->data, lcopy->length);
// We update the length of the copied 'MYSQL_ROWS' data
current->length = lcopy->length;
// we free the copy
free(lcopy);
// change the rows count to 1
stmt2a->result.rows = 1;
// we should also configure the cursor, but because we scan it using our own
// algorithm, this is not needed
}
}
}
diag("mysql_stmt_store_result_cont called %d times", cont_cnt);
if (prepare_meta_result) {
mysql_free_result(prepare_meta_result);
}
return rows_read;
}
/**
* @brief Count the number of rows present in the supplied 'MYSQL_STMT*' parameter, and
* returns the counted value.
* @param stmt The 'MYSQL_STMT*' which rows are going to be counted.
* @return The number of rows present in the 'MYSQL_STMT' parameter.
*/
int count_stmt_rows(MYSQL_STMT* stmt) {
int row_count2a=0;
int stmt2aRC = 0;
// Count the rows from the resulting 'STMT'
{
MYSQL_ROWS *r=stmt->result.data;
int rows_left = stmt->result.rows;
int rows_read_inner = 0;
if (r && rows_left) {
row_count2a++;
rows_read_inner++;
while(rows_read_inner < stmt->result.rows && r->next) {
rows_read_inner++;
r = r->next;
row_count2a++;
}
}
}
return row_count2a;
}
int NUM_ROWS = 100;
int main(int argc, char** argv) {
CommandLine cl;
if(cl.getEnv())
return exit_status();
plan(1 + NLOOPS);
plan(3);
diag("Testing PS async store result");
MYSQL* mysqladmin = mysql_init(NULL);
@ -220,7 +384,7 @@ int main(int argc, char** argv) {
// configure the connection as not blocking
diag("Setting mysql connection non blocking");
mysql_options(mysql, MYSQL_OPT_NONBLOCK, 0);
if (!mysql_real_connect(mysql, cl.host, cl.username, cl.password, NULL, cl.port, NULL, 0)) {
if (!mysql_real_connect(mysql, cl.host, cl.username, cl.password, NULL, 6033, NULL, 0)) {
fprintf(stderr, "File %s, line %d, Error: %s\n",
__FILE__, __LINE__, mysql_error(mysql));
return exit_status();
@ -286,141 +450,87 @@ int main(int argc, char** argv) {
std::string query = "";
for (int loops=0; loops<NLOOPS; loops++) {
int NUM_ROWS_READ = 0;
MYSQL_STMT *stmt2a = nullptr;
int rows_read = 0;
int row_count2a = 0;
int IT_NUM_ROWS_READ = NUM_ROWS_READ[loops];
// Initial query, checking that the order is correct
// ************************************************************************
MYSQL_STMT *stmt2a = mysql_stmt_init(mysql);
if (!stmt2a)
{
NUM_ROWS_READ = 1000;
stmt2a = mysql_stmt_init(mysql);
if (!stmt2a) {
ok(false, " mysql_stmt_init(), out of memory\n");
return restore_admin(mysqladmin);
}
// NOTE: the first 2 columns we select are 3 ids, so we can later print and verify
query = "SELECT t1.id id1, t2.id id2, t1.id+t2.id id3, t1.k k1, t1.c c1, t1.pad pad1, t2.k k2, t2.c c2, t2.pad pad2 FROM test.sbtest1 t1 JOIN test.sbtest1 t2 ORDER BY t1.id, t2.id LIMIT " + std::to_string(IT_NUM_ROWS_READ);
query = "SELECT t1.id id1, t2.id id2, t1.id+t2.id id3, t1.k k1, t1.c c1, t1.pad pad1, t2.k k2, t2.c c2, t2.pad pad2 FROM test.sbtest1 t1 JOIN test.sbtest1 t2 ORDER BY t1.id, t2.id LIMIT " + std::to_string(NUM_ROWS_READ);
//query = "SELECT t1.id id1, t2.id id2, t1.id+t2.id id3 FROM test.sbtest1 t1 JOIN test.sbtest1 t2 LIMIT " + std::to_string(IT_NUM_ROWS_READ);
//query = "SELECT t1.id id1, t2.id id2, t1.id+t2.id id3 FROM test.sbtest1 t1 JOIN test.sbtest1 t2 ORDER BY t1.id, t2.id LIMIT " + std::to_string(IT_NUM_ROWS_READ);
if (mysql_stmt_prepare(stmt2a,query.c_str(), query.size())) {
fprintf(stderr, "Query error %s\n", mysql_error(mysql));
mysql_close(mysql);
mysql_library_end();
return restore_admin(mysqladmin);
}
rows_read = mysql_stmt_store_result_cont_poc(query, mysql, stmt2a, true);
row_count2a = count_stmt_rows(stmt2a);
if (mysql_stmt_execute(stmt2a))
{
fprintf(stderr, " mysql_stmt_execute(), failed\n");
ok(false, " %s\n", mysql_stmt_error(stmt2a));
return restore_admin(mysqladmin);
}
ok(
row_count2a + rows_read == NUM_ROWS_READ,
"Fetched %d rows, expected %d. Details: %d rows processed while buffering, %d at the end",
row_count2a + rows_read,
NUM_ROWS_READ,
rows_read,
row_count2a
);
int row_count2a=0;
MYSQL_RES * prepare_meta_result = mysql_stmt_result_metadata(stmt2a);
if (mysql_stmt_close(stmt2a)) {
fprintf(stderr, " failed while closing the statement\n");
ok(false, " %s\n", mysql_error(mysql));
restore_admin(mysql);
int async_exit_status;
int interr;
int cont_cnt = 0;
int rows_read = 0;
async_exit_status = mysql_stmt_store_result_start(&interr, stmt2a);
while (async_exit_status) {
async_exit_status = wait_for_mysql(mysql, async_exit_status);
async_exit_status = mysql_stmt_store_result_cont(&interr, stmt2a, async_exit_status);
cont_cnt++;
MYSQL_ROWS *r=stmt2a->result.data;
int rows_read_inner = 0;
if (r) {
//rows_read++;
rows_read_inner++;
MYSQL_ROWS *pr = r;
while(rows_read_inner < stmt2a->result.rows) {
// it is very important to check rows_read_inner FIRST
// because r->next could point to an invalid memory
rows_read_inner++;
pr = r;
r = r->next;
//rows_read++;
}
diag("Rows in buffer after calling mysql_stmt_store_result_cont(): %d", rows_read_inner);
// we now clean up the whole storage.
// This is the real POC
if (rows_read_inner > 1) {
// there is more than 1 row
int irs = 0;
MYSQL_ROWS *ir=stmt2a->result.data;
// see https://dev.mysql.com/doc/internals/en/binary-protocol-resultset-row.html
// on why we have an offset of 3
const int row_offset = 3;
for (irs = 0; irs < stmt2a->result.rows -1 ; irs++) {
int id1, id2, id3;
memcpy(&id1, (char *)ir->data+row_offset, sizeof(int));
memcpy(&id2, (char *)ir->data+row_offset+sizeof(int), sizeof(int));
memcpy(&id3, (char *)ir->data+row_offset+sizeof(int)*2, sizeof(int));
//diag("Row: %d + %d = %d", id1, id2, id3);
assert(id3==id1+id2);
rows_read++;
if (irs < stmt2a->result.rows - 2) {
ir = ir->next;
}
}
// at this point, ir points to the last row
// next, we create a new MYSQL_ROWS that is a copy of the last row
MYSQL_ROWS *lcopy = (MYSQL_ROWS *)malloc(sizeof(MYSQL_ROWS) + ir->length);
lcopy->length = ir->length;
lcopy->data= (MYSQL_ROW)(lcopy + 1);
memcpy((char *)lcopy->data, (char *)ir->data, ir->length);
// next we proceed to reset all the buffer
stmt2a->result.rows = 0;
ma_free_root(&stmt2a->result.alloc, MYF(MY_KEEP_PREALLOC));
//ma_free_root(&stmt2a->result.alloc, MYF(0));
stmt2a->result.data= NULL;
stmt2a->result_cursor= NULL;
// we will now copy back the last row and make it the only row available
MYSQL_ROWS *current = (MYSQL_ROWS *)ma_alloc_root(&stmt2a->result.alloc, sizeof(MYSQL_ROWS) + lcopy->length);
current->data= (MYSQL_ROW)(current + 1);
stmt2a->result.data = current;
memcpy((char *)current->data, (char *)lcopy->data, lcopy->length);
// we free the copy
free(lcopy);
// change the rows count to 1
stmt2a->result.rows = 1;
// we should also configure the cursor, but because we scan it using our own
// algorithm, this is not needed
}
}
return -1;
}
diag("mysql_stmt_store_result_cont called %d times", cont_cnt);
// Second query: This query was specifically thought to trigger a segfault
// and invalid memory access found while testing and reported by valgrind.
// The principle is to perform multiple big queries followed by small queries.
// ************************************************************************
NUM_ROWS_READ = 10;
int stmt2aRC = 0;
{
MYSQL_ROWS *r=stmt2a->result.data;
int rows_left = stmt2a->result.rows;
int rows_read_inner = 0;
if (r && rows_left) {
row_count2a++;
rows_read_inner++;
while(rows_read_inner < stmt2a->result.rows && r->next) {
rows_read_inner++;
r = r->next;
row_count2a++;
}
}
ok(row_count2a+rows_read==IT_NUM_ROWS_READ, "Fetched %d rows, expected %d. Details: %d rows processed while buffering, %d at the end", row_count2a+rows_read, IT_NUM_ROWS_READ , rows_read, row_count2a);
stmt2a = mysql_stmt_init(mysql);
if (!stmt2a) {
ok(false, " mysql_stmt_init(), out of memory\n");
return restore_admin(mysqladmin);
}
if (prepare_meta_result) {
mysql_free_result(prepare_meta_result);
}
if (mysql_stmt_close(stmt2a))
{
// Original query: For testing purposes
// ************************************************************************
// query = "(SELECT id, k, REPEAT(c,100+20000) cc FROM test.sbtest1 LIMIT " + std::to_string(NUM_ROWS_READ) + ")";
// query += "UNION (SELECT id, k, REPEAT(c,100) cc FROM test.sbtest1 LIMIT " + std::to_string(NUM_ROWS_READ) + ")";
// ************************************************************************
// Small version of 'memory issues' query triggering valgrind errors
query = "(SELECT id, k, REPEAT(c,2000) cc FROM test.sbtest1 LIMIT " + std::to_string(NUM_ROWS_READ) + ")";
query += "UNION (SELECT id, k, REPEAT(c,10) cc FROM test.sbtest1 LIMIT " + std::to_string(NUM_ROWS_READ) + ")";
rows_read = mysql_stmt_store_result_cont_poc(query, mysql, stmt2a, false);
row_count2a = count_stmt_rows(stmt2a);
ok(
row_count2a + rows_read == NUM_ROWS_READ * 2,
"Fetched %d rows, expected %d. Details: %d rows processed while buffering, %d at the end",
row_count2a + rows_read,
NUM_ROWS_READ,
rows_read,
row_count2a
);
if (mysql_stmt_close(stmt2a)) {
fprintf(stderr, " failed while closing the statement\n");
ok(false, " %s\n", mysql_error(mysql));
return restore_admin(mysqladmin);
}
restore_admin(mysql);
return -1;
}
restore_admin(mysqladmin);

Loading…
Cancel
Save