diff --git a/include/MySQL_Protocol.h b/include/MySQL_Protocol.h index 8ea0be882..32364c4c8 100644 --- a/include/MySQL_Protocol.h +++ b/include/MySQL_Protocol.h @@ -28,6 +28,7 @@ class MySQL_ResultSet { MySQL_Protocol *myprot; MYSQL *mysql; MYSQL_RES *result; + MYSQL_STMT *stmt; unsigned int num_fields; unsigned long long num_rows; unsigned long long resultset_size; @@ -35,8 +36,9 @@ class MySQL_ResultSet { //PtrSizeArray *PSarrayOUT; MySQL_ResultSet(); void init(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL *_my, MYSQL_STMT *_stmt=NULL); - void init_with_stmt(MYSQL_STMT *_stmt); + void init_with_stmt(); ~MySQL_ResultSet(); + unsigned int add_row(MYSQL_ROWS *rows); unsigned int add_row(MYSQL_ROW row); unsigned int add_row2(MYSQL_ROWS *row, unsigned char *offset); void add_eof(); @@ -100,7 +102,7 @@ class MySQL_Protocol { // bool generate_pkt_field(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len, uint8_t sequence_id, char *schema, char *table, char *org_table, char *name, char *org_name, uint16_t charset, uint32_t column_length, uint8_t type, uint16_t flags, uint8_t decimals, bool field_list, uint64_t defvalue_length, char *defvalue); bool generate_pkt_field(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, char *schema, char *table, char *org_table, char *name, char *org_name, uint16_t charset, uint32_t column_length, uint8_t type, uint16_t flags, uint8_t decimals, bool field_list, uint64_t defvalue_length, char *defvalue, MySQL_ResultSet *myrs=NULL); bool generate_pkt_row(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt); - uint8_t generate_pkt_row3(MySQL_ResultSet *myrs, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt); + uint8_t generate_pkt_row3(MySQL_ResultSet *myrs, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt, unsigned long rl); bool generate_pkt_initial_handshake(bool send, void **ptr, unsigned int *len, uint32_t *thread_id, bool deprecate_eof_active); // bool generate_statistics_response(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len); bool generate_statistics_response(bool send, void **ptr, unsigned int *len); diff --git a/include/mysql_connection.h b/include/mysql_connection.h index cab230285..12c05670d 100644 --- a/include/mysql_connection.h +++ b/include/mysql_connection.h @@ -188,7 +188,7 @@ class MySQL_Connection { void stmt_execute_store_result_start(); void stmt_execute_store_result_cont(short event); - void process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT(); + void process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT(unsigned long long& processed_bytes); void async_free_result(); bool IsActiveTransaction(); /* { diff --git a/lib/MySQL_Protocol.cpp b/lib/MySQL_Protocol.cpp index fcc4ed0fe..b2b485e09 100644 --- a/lib/MySQL_Protocol.cpp +++ b/lib/MySQL_Protocol.cpp @@ -1015,15 +1015,21 @@ bool MySQL_Protocol::generate_pkt_row(bool send, void **ptr, unsigned int *len, return true; } -uint8_t MySQL_Protocol::generate_pkt_row3(MySQL_ResultSet *myrs, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt) { +uint8_t MySQL_Protocol::generate_pkt_row3(MySQL_ResultSet *myrs, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt, unsigned long rl) { if ((*myds)->sess->mirror==true) { return true; } int col=0; - unsigned int rowlen=0; + unsigned long rowlen=0; uint8_t pkt_sid=sequence_id; - for (col=0; colmyds); buffer_to_PSarrayOut(); unsigned long long total_size=0; @@ -2576,11 +2589,28 @@ MySQL_ResultSet::~MySQL_ResultSet() { //if (myds) myds->pkt_sid=sid-1; } +// this function is used for binary protocol +// maybe later on can be adapted for text protocol too +unsigned int MySQL_ResultSet::add_row(MYSQL_ROWS *rows) { + unsigned int pkt_length=0; + MYSQL_ROW row = rows->data; + unsigned long row_length = rows->length; + // we call generate_pkt_row3 passing row_length + sid=myprot->generate_pkt_row3(this, &pkt_length, sid, 0, NULL, row, row_length); + sid++; + resultset_size+=pkt_length; + num_rows++; + return pkt_length; +} + + +// this function is used for text protocol unsigned int MySQL_ResultSet::add_row(MYSQL_ROW row) { unsigned long *lengths=mysql_fetch_lengths(result); unsigned int pkt_length=0; if (myprot) { - sid=myprot->generate_pkt_row3(this, &pkt_length, sid, num_fields, lengths, row); + // we call generate_pkt_row3 without passing row_length + sid=myprot->generate_pkt_row3(this, &pkt_length, sid, num_fields, lengths, row, 0); } else { unsigned int col=0; for (col=0; colMyRS) { + MyRS = myconn->MyRS; + } + } +/* MYSQL_RES *stmt_result=myconn->query.stmt_result; if (stmt_result) { MySQL_ResultSet *MyRS=new MySQL_ResultSet(); @@ -6111,6 +6118,14 @@ void MySQL_Session::MySQL_Stmt_Result_to_MySQL_wire(MYSQL_STMT *stmt, MySQL_Conn CurrentQuery.rows_sent = MyRS->num_rows; //removed bool resultset_completed=MyRS->get_resultset(client_myds->PSarrayOUT); delete MyRS; +*/ + if (MyRS) { + assert(MyRS->result); + bool transfer_started=MyRS->transfer_started; + MyRS->init_with_stmt(); + bool resultset_completed=MyRS->get_resultset(client_myds->PSarrayOUT); + CurrentQuery.rows_sent = MyRS->num_rows; + assert(resultset_completed); // the resultset should always be completed if MySQL_Result_to_MySQL_wire is called } else { MYSQL *mysql=stmt->mysql; // no result set diff --git a/lib/mysql_connection.cpp b/lib/mysql_connection.cpp index 340a5c442..7cd7f906a 100644 --- a/lib/mysql_connection.cpp +++ b/lib/mysql_connection.cpp @@ -1161,6 +1161,30 @@ handler_again: query.stmt_result=mysql_stmt_result_metadata(query.stmt); if (query.stmt_result==NULL) { NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END); + } else { + if (myds->sess->mirror==false) { + if (MyRS_reuse == NULL) { + MyRS = new MySQL_ResultSet(); + MyRS->init(&myds->sess->client_myds->myprot, query.stmt_result, mysql, query.stmt); + } else { + MyRS = MyRS_reuse; + MyRS_reuse = NULL; + MyRS->init(&myds->sess->client_myds->myprot, query.stmt_result, mysql, query.stmt); + } + } else { +/* + // we do not support mirroring with prepared statements + if (MyRS_reuse == NULL) { + MyRS = new MySQL_ResultSet(); + MyRS->init(NULL, mysql_result, mysql); + } else { + MyRS = MyRS_reuse; + MyRS_reuse = NULL; + MyRS->init(NULL, mysql_result, mysql); + } +*/ + } + //async_fetch_row_start=false; } } stmt_execute_store_result_start(); @@ -1202,7 +1226,16 @@ handler_again: r = r->next; } if (rows_read_inner > 1) { - process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT(); + process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT(processed_bytes); + if ( + (processed_bytes > (unsigned int)mysql_thread___threshold_resultset_size*8) + || + ( mysql_thread___throttle_ratio_server_to_client && mysql_thread___throttle_max_bytes_per_second_to_client && (processed_bytes > (unsigned long long)mysql_thread___throttle_max_bytes_per_second_to_client/10*(unsigned long long)mysql_thread___throttle_ratio_server_to_client) ) + ) { + next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); // we temporarily pause + } else { + NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); // we continue looping + } } } next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); @@ -1566,7 +1599,7 @@ handler_again: return async_state_machine; } -void MySQL_Connection::process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT() { +void MySQL_Connection::process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT(unsigned long long& processed_bytes) { // there is more than 1 row unsigned long long total_size=0; long long unsigned int irs = 0; @@ -1579,12 +1612,22 @@ void MySQL_Connection::process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT() { } total_size+=sizeof(mysql_hdr); - // TODO: here we need to copy the rows in the MyRS - // before that we need to prepare MyRS - // without the two above, the final result set will only have few rows (possible only one) - - ir = ir->next; //rows_read++; + //MYSQL_ROW mysql_row = ir->data; + //if (mysql_row) { + unsigned int br=MyRS->add_row(ir); + __sync_fetch_and_add(&parent->bytes_recv,br); + myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_recv]+=br; + myds->bytes_info.bytes_recv += br; + bytes_info.bytes_recv += br; + processed_bytes+=br; // issue #527 : this variable will store the amount of bytes processed during this event + //} + ir = ir->next; + } + // generate a heartbeat, always + if (myds && myds->sess && myds->sess->thread) { + unsigned long long curtime=monotonic_time(); + myds->sess->thread->atomic_curtime=curtime; } // at this point, ir points to the last row // next, we create a new MYSQL_ROWS that is a copy of the last row diff --git a/test/tap/tests/test_ps_async-t.cpp b/test/tap/tests/test_ps_async-t.cpp index a51d01925..038b8acdc 100644 --- a/test/tap/tests/test_ps_async-t.cpp +++ b/test/tap/tests/test_ps_async-t.cpp @@ -188,6 +188,8 @@ int restore_admin(MYSQL* mysqladmin) { MYSQL_QUERY(mysqladmin, "load mysql query rules to runtime"); MYSQL_QUERY(mysqladmin, "load mysql servers from disk"); MYSQL_QUERY(mysqladmin, "load mysql servers to runtime"); + MYSQL_QUERY(mysqladmin, "load mysql variables from disk"); + MYSQL_QUERY(mysqladmin, "load mysql variables to runtime"); } int main(int argc, char** argv) { @@ -239,6 +241,9 @@ int main(int argc, char** argv) { MYSQL_QUERY(mysqladmin, "delete from mysql_servers where hostgroup_id=1"); MYSQL_QUERY(mysqladmin, "load mysql servers to runtime"); + MYSQL_QUERY(mysqladmin, "set mysql-threshold_resultset_size=5000"); + MYSQL_QUERY(mysqladmin, "load mysql variables to runtime"); + MYSQL_QUERY(mysql, "drop database if exists test"); MYSQL_QUERY(mysql, "create database if not exists test"); MYSQL_QUERY(mysql, "DROP TABLE IF EXISTS test.sbtest1");