Third commit to buffer resultsets in Prepared Statements

* MySQL_ResultSet() stores a pointer to the PS
* MySQL_ResultSet::init_with_stmt() doesn't need anymore the pointer to the PS
  Furthermore, it isn't anymore an "init" but an end (we will rename it)
* Added MySQL_ResultSet::add_row(MYSQL_ROWS *rows) for PS only
* MySQL_Protocol::generate_pkt_row3() accepts an optional row length.
  If passed, it will know the length of the row without computing it
* MyRS initialized during ASYNC_STMT_EXECUTE_STORE_RESULT_START
* Implemented throttling during ASYNC_STMT_EXECUTE_STORE_RESULT_START
* MySQL_Connection::process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT() always
  generates a heartbeat. For now unnecessary, and we will better tune it later
* test_ps_async-t.cpp tunes mysql-threshold_resultset_size to trigger buffering

More testing is required
pull/3210/head
René Cannaò 5 years ago
parent 0171082448
commit 7921e130d2

@ -28,6 +28,7 @@ class MySQL_ResultSet {
MySQL_Protocol *myprot;
MYSQL *mysql;
MYSQL_RES *result;
MYSQL_STMT *stmt;
unsigned int num_fields;
unsigned long long num_rows;
unsigned long long resultset_size;
@ -35,8 +36,9 @@ class MySQL_ResultSet {
//PtrSizeArray *PSarrayOUT;
MySQL_ResultSet();
void init(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL *_my, MYSQL_STMT *_stmt=NULL);
void init_with_stmt(MYSQL_STMT *_stmt);
void init_with_stmt();
~MySQL_ResultSet();
unsigned int add_row(MYSQL_ROWS *rows);
unsigned int add_row(MYSQL_ROW row);
unsigned int add_row2(MYSQL_ROWS *row, unsigned char *offset);
void add_eof();
@ -100,7 +102,7 @@ class MySQL_Protocol {
// bool generate_pkt_field(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len, uint8_t sequence_id, char *schema, char *table, char *org_table, char *name, char *org_name, uint16_t charset, uint32_t column_length, uint8_t type, uint16_t flags, uint8_t decimals, bool field_list, uint64_t defvalue_length, char *defvalue);
bool generate_pkt_field(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, char *schema, char *table, char *org_table, char *name, char *org_name, uint16_t charset, uint32_t column_length, uint8_t type, uint16_t flags, uint8_t decimals, bool field_list, uint64_t defvalue_length, char *defvalue, MySQL_ResultSet *myrs=NULL);
bool generate_pkt_row(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt);
uint8_t generate_pkt_row3(MySQL_ResultSet *myrs, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt);
uint8_t generate_pkt_row3(MySQL_ResultSet *myrs, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt, unsigned long rl);
bool generate_pkt_initial_handshake(bool send, void **ptr, unsigned int *len, uint32_t *thread_id, bool deprecate_eof_active);
// bool generate_statistics_response(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len);
bool generate_statistics_response(bool send, void **ptr, unsigned int *len);

@ -188,7 +188,7 @@ class MySQL_Connection {
void stmt_execute_store_result_start();
void stmt_execute_store_result_cont(short event);
void process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT();
void process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT(unsigned long long& processed_bytes);
void async_free_result();
bool IsActiveTransaction(); /* {

@ -1015,15 +1015,21 @@ bool MySQL_Protocol::generate_pkt_row(bool send, void **ptr, unsigned int *len,
return true;
}
uint8_t MySQL_Protocol::generate_pkt_row3(MySQL_ResultSet *myrs, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt) {
uint8_t MySQL_Protocol::generate_pkt_row3(MySQL_ResultSet *myrs, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt, unsigned long rl) {
if ((*myds)->sess->mirror==true) {
return true;
}
int col=0;
unsigned int rowlen=0;
unsigned long rowlen=0;
uint8_t pkt_sid=sequence_id;
for (col=0; col<colnums; col++) {
rowlen+=( fieldstxt[col] ? fieldslen[col]+mysql_encode_length(fieldslen[col],NULL) : 1 );
if (rl == 0) {
// if rl == 0 , we are using text protocol (legacy) therefore we need to compute the size of the row
for (col=0; col<colnums; col++) {
rowlen+=( fieldstxt[col] ? fieldslen[col]+mysql_encode_length(fieldslen[col],NULL) : 1 );
}
} else {
// we already know the size of the row
rowlen=rl;
}
PtrSize_t pkt;
pkt.size=rowlen+sizeof(mysql_hdr);
@ -1045,16 +1051,20 @@ uint8_t MySQL_Protocol::generate_pkt_row3(MySQL_ResultSet *myrs, unsigned int *l
}
}
int l=sizeof(mysql_hdr);
for (col=0; col<colnums; col++) {
if (fieldstxt[col]) {
char length_prefix;
uint8_t length_len=mysql_encode_length(fieldslen[col], &length_prefix);
l+=write_encoded_length_and_string((unsigned char *)pkt.ptr+l,fieldslen[col],length_len, length_prefix, fieldstxt[col]);
} else {
char *_ptr=(char *)pkt.ptr;
_ptr[l]=0xfb;
l++;
if (rl == 0) {
for (col=0; col<colnums; col++) {
if (fieldstxt[col]) {
char length_prefix;
uint8_t length_len=mysql_encode_length(fieldslen[col], &length_prefix);
l+=write_encoded_length_and_string((unsigned char *)pkt.ptr+l,fieldslen[col],length_len, length_prefix, fieldstxt[col]);
} else {
char *_ptr=(char *)pkt.ptr;
_ptr[l]=0xfb;
l++;
}
}
} else {
memcpy((unsigned char *)pkt.ptr+l, fieldstxt, rl);
}
if (pkt.size < (0xFFFFFF+sizeof(mysql_hdr))) {
mysql_hdr myhdr;
@ -2367,6 +2377,7 @@ void MySQL_ResultSet::init(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL *_my,
resultset_completed=false;
myprot=_myprot;
mysql=_my;
stmt=_stmt;
if (buffer==NULL) {
//if (_stmt==NULL) { // we allocate this buffer only for not prepared statements
// removing the previous assumption. We allocate this buffer also for prepared statements
@ -2388,7 +2399,7 @@ void MySQL_ResultSet::init(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL *_my,
//}
//reset_pid=true;
result=_res;
//resultset_size=0;
resultset_size=0;
num_rows=0;
num_fields=mysql_field_count(mysql);
PtrSize_t pkt;
@ -2442,13 +2453,15 @@ void MySQL_ResultSet::init(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL *_my,
resultset_size += 9;
}
//}
if (_stmt) { // binary protocol , we also assume we have ALL the resultset
init_with_stmt(_stmt);
}
//if (_stmt) { // binary protocol , we also assume we have ALL the resultset
/// init_with_stmt(_stmt);
//}
}
void MySQL_ResultSet::init_with_stmt(MYSQL_STMT *_stmt) {
void MySQL_ResultSet::init_with_stmt() {
assert(stmt);
MYSQL_STMT *_stmt = stmt;
MySQL_Data_Stream * c_myds = *(myprot->myds);
buffer_to_PSarrayOut();
unsigned long long total_size=0;
@ -2576,11 +2589,28 @@ MySQL_ResultSet::~MySQL_ResultSet() {
//if (myds) myds->pkt_sid=sid-1;
}
// this function is used for binary protocol
// maybe later on can be adapted for text protocol too
unsigned int MySQL_ResultSet::add_row(MYSQL_ROWS *rows) {
unsigned int pkt_length=0;
MYSQL_ROW row = rows->data;
unsigned long row_length = rows->length;
// we call generate_pkt_row3 passing row_length
sid=myprot->generate_pkt_row3(this, &pkt_length, sid, 0, NULL, row, row_length);
sid++;
resultset_size+=pkt_length;
num_rows++;
return pkt_length;
}
// this function is used for text protocol
unsigned int MySQL_ResultSet::add_row(MYSQL_ROW row) {
unsigned long *lengths=mysql_fetch_lengths(result);
unsigned int pkt_length=0;
if (myprot) {
sid=myprot->generate_pkt_row3(this, &pkt_length, sid, num_fields, lengths, row);
// we call generate_pkt_row3 without passing row_length
sid=myprot->generate_pkt_row3(this, &pkt_length, sid, num_fields, lengths, row, 0);
} else {
unsigned int col=0;
for (col=0; col<num_fields; col++) {

@ -6103,6 +6103,13 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED
}
void MySQL_Session::MySQL_Stmt_Result_to_MySQL_wire(MYSQL_STMT *stmt, MySQL_Connection *myconn) {
MySQL_ResultSet *MyRS = NULL;
if (myconn) {
if (myconn->MyRS) {
MyRS = myconn->MyRS;
}
}
/*
MYSQL_RES *stmt_result=myconn->query.stmt_result;
if (stmt_result) {
MySQL_ResultSet *MyRS=new MySQL_ResultSet();
@ -6111,6 +6118,14 @@ void MySQL_Session::MySQL_Stmt_Result_to_MySQL_wire(MYSQL_STMT *stmt, MySQL_Conn
CurrentQuery.rows_sent = MyRS->num_rows;
//removed bool resultset_completed=MyRS->get_resultset(client_myds->PSarrayOUT);
delete MyRS;
*/
if (MyRS) {
assert(MyRS->result);
bool transfer_started=MyRS->transfer_started;
MyRS->init_with_stmt();
bool resultset_completed=MyRS->get_resultset(client_myds->PSarrayOUT);
CurrentQuery.rows_sent = MyRS->num_rows;
assert(resultset_completed); // the resultset should always be completed if MySQL_Result_to_MySQL_wire is called
} else {
MYSQL *mysql=stmt->mysql;
// no result set

@ -1161,6 +1161,30 @@ handler_again:
query.stmt_result=mysql_stmt_result_metadata(query.stmt);
if (query.stmt_result==NULL) {
NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END);
} else {
if (myds->sess->mirror==false) {
if (MyRS_reuse == NULL) {
MyRS = new MySQL_ResultSet();
MyRS->init(&myds->sess->client_myds->myprot, query.stmt_result, mysql, query.stmt);
} else {
MyRS = MyRS_reuse;
MyRS_reuse = NULL;
MyRS->init(&myds->sess->client_myds->myprot, query.stmt_result, mysql, query.stmt);
}
} else {
/*
// we do not support mirroring with prepared statements
if (MyRS_reuse == NULL) {
MyRS = new MySQL_ResultSet();
MyRS->init(NULL, mysql_result, mysql);
} else {
MyRS = MyRS_reuse;
MyRS_reuse = NULL;
MyRS->init(NULL, mysql_result, mysql);
}
*/
}
//async_fetch_row_start=false;
}
}
stmt_execute_store_result_start();
@ -1202,7 +1226,16 @@ handler_again:
r = r->next;
}
if (rows_read_inner > 1) {
process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT();
process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT(processed_bytes);
if (
(processed_bytes > (unsigned int)mysql_thread___threshold_resultset_size*8)
||
( mysql_thread___throttle_ratio_server_to_client && mysql_thread___throttle_max_bytes_per_second_to_client && (processed_bytes > (unsigned long long)mysql_thread___throttle_max_bytes_per_second_to_client/10*(unsigned long long)mysql_thread___throttle_ratio_server_to_client) )
) {
next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); // we temporarily pause
} else {
NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); // we continue looping
}
}
}
next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT);
@ -1566,7 +1599,7 @@ handler_again:
return async_state_machine;
}
void MySQL_Connection::process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT() {
void MySQL_Connection::process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT(unsigned long long& processed_bytes) {
// there is more than 1 row
unsigned long long total_size=0;
long long unsigned int irs = 0;
@ -1579,12 +1612,22 @@ void MySQL_Connection::process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT() {
}
total_size+=sizeof(mysql_hdr);
// TODO: here we need to copy the rows in the MyRS
// before that we need to prepare MyRS
// without the two above, the final result set will only have few rows (possible only one)
ir = ir->next;
//rows_read++;
//MYSQL_ROW mysql_row = ir->data;
//if (mysql_row) {
unsigned int br=MyRS->add_row(ir);
__sync_fetch_and_add(&parent->bytes_recv,br);
myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_recv]+=br;
myds->bytes_info.bytes_recv += br;
bytes_info.bytes_recv += br;
processed_bytes+=br; // issue #527 : this variable will store the amount of bytes processed during this event
//}
ir = ir->next;
}
// generate a heartbeat, always
if (myds && myds->sess && myds->sess->thread) {
unsigned long long curtime=monotonic_time();
myds->sess->thread->atomic_curtime=curtime;
}
// at this point, ir points to the last row
// next, we create a new MYSQL_ROWS that is a copy of the last row

@ -188,6 +188,8 @@ int restore_admin(MYSQL* mysqladmin) {
MYSQL_QUERY(mysqladmin, "load mysql query rules to runtime");
MYSQL_QUERY(mysqladmin, "load mysql servers from disk");
MYSQL_QUERY(mysqladmin, "load mysql servers to runtime");
MYSQL_QUERY(mysqladmin, "load mysql variables from disk");
MYSQL_QUERY(mysqladmin, "load mysql variables to runtime");
}
int main(int argc, char** argv) {
@ -239,6 +241,9 @@ int main(int argc, char** argv) {
MYSQL_QUERY(mysqladmin, "delete from mysql_servers where hostgroup_id=1");
MYSQL_QUERY(mysqladmin, "load mysql servers to runtime");
MYSQL_QUERY(mysqladmin, "set mysql-threshold_resultset_size=5000");
MYSQL_QUERY(mysqladmin, "load mysql variables to runtime");
MYSQL_QUERY(mysql, "drop database if exists test");
MYSQL_QUERY(mysql, "create database if not exists test");
MYSQL_QUERY(mysql, "DROP TABLE IF EXISTS test.sbtest1");

Loading…
Cancel
Save