diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index dc3b9f17a..b5eb5008a 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -119,7 +119,7 @@ class MySQL_Session bool schema_locked; bool transaction_persistent; bool session_fast_forward; - + bool started_sending_data_to_client; // this status variable tracks if some result set was sent to the client, of if proysql is still buffering everything MySQL_Session(); // MySQL_Session(int); ~MySQL_Session(); diff --git a/include/mysql_connection.h b/include/mysql_connection.h index bbbb00e0e..6c247fa26 100644 --- a/include/mysql_connection.h +++ b/include/mysql_connection.h @@ -74,7 +74,7 @@ class MySQL_Connection { bool has_prepared_statement; bool processing_prepared_statement_prepare; bool processing_prepared_statement_execute; - + bool processing_multi_statement; MySQL_Connection(); ~MySQL_Connection(); // int assign_mshge(unsigned int); diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 8ece2ff48..914871716 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -42,6 +42,9 @@ enum MDB_ASYNC_ST { // MariaDB Async State Machine ASYNC_QUERY_START, ASYNC_QUERY_CONT, ASYNC_QUERY_END, + ASYNC_NEXT_RESULT_START, + ASYNC_NEXT_RESULT_CONT, + ASYNC_NEXT_RESULT_END, ASYNC_STORE_RESULT_START, ASYNC_STORE_RESULT_CONT, ASYNC_USE_RESULT_START, diff --git a/lib/MySQL_Protocol.cpp b/lib/MySQL_Protocol.cpp index 7fc18f441..cf40acdac 100644 --- a/lib/MySQL_Protocol.cpp +++ b/lib/MySQL_Protocol.cpp @@ -502,6 +502,8 @@ bool MySQL_Protocol::generate_pkt_ERR(bool send, void **ptr, unsigned int *len, case STATE_QUERY_SENT_NET: (*myds)->DSS=STATE_ERR; break; + case STATE_OK: + break; default: assert(0); } @@ -564,6 +566,8 @@ bool MySQL_Protocol::generate_pkt_OK(bool send, void **ptr, unsigned int *len, u case STATE_QUERY_SENT_NET: (*myds)->DSS=STATE_OK; break; + case STATE_OK: + break; default: assert(0); } @@ -1333,6 +1337,7 @@ MySQL_ResultSet::~MySQL_ResultSet() { free(buffer); buffer=NULL; } + myds->pkt_sid=sid-1; } unsigned int MySQL_ResultSet::add_row(MYSQL_ROW row) { diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 2527da533..f2ad0332a 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -203,6 +203,7 @@ MySQL_Session::MySQL_Session() { default_schema=NULL; schema_locked=false; session_fast_forward=false; + started_sending_data_to_client=false; admin_func=NULL; //client_fd=0; //server_fd=0; @@ -1145,10 +1146,32 @@ handler_again: } } } else { - // rc==1 , query is still running - // start sending to frontend if mysql_thread___threshold_resultset_size is reached - if (myconn->MyRS && myconn->MyRS->result && myconn->MyRS->resultset_size > (unsigned int) mysql_thread___threshold_resultset_size) { - myconn->MyRS->get_resultset(client_myds->PSarrayOUT); + switch (rc) { + // rc==1 , query is still running + // start sending to frontend if mysql_thread___threshold_resultset_size is reached + case 1: + if (myconn->MyRS && myconn->MyRS->result && myconn->MyRS->resultset_size > (unsigned int) mysql_thread___threshold_resultset_size) { + myconn->MyRS->get_resultset(client_myds->PSarrayOUT); + } + break; + // rc==2 : a multi-resultset (or multi statement) was detected, and the current statement is completed + case 2: + MySQL_Result_to_MySQL_wire(myconn->mysql, myconn->MyRS); + if (myconn->MyRS) { // we also need to clear MyRS, so that the next staement will recreate it if needed + delete myconn->MyRS; + myconn->MyRS=NULL; + } + NEXT_IMMEDIATE(PROCESSING_QUERY); + break; + // rc==3 , a multi statement query is still running + // start sending to frontend if mysql_thread___threshold_resultset_size is reached + case 3: + if (myconn->MyRS && myconn->MyRS->result && myconn->MyRS->resultset_size > (unsigned int) mysql_thread___threshold_resultset_size) { + myconn->MyRS->get_resultset(client_myds->PSarrayOUT); + } + break; + default: + break; } } } @@ -1999,12 +2022,16 @@ void MySQL_Session::MySQL_Result_to_MySQL_wire(MYSQL *mysql, MySQL_ResultSet *My unsigned int nTrx=NumActiveTransactions(); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT; + if (mysql->server_status & SERVER_MORE_RESULTS_EXIST) + setStatus += SERVER_MORE_RESULTS_EXIST; client_myds->myprot.generate_pkt_OK(true,NULL,NULL,client_myds->pkt_sid+1,num_rows,mysql->insert_id,mysql->server_status|setStatus,mysql->warning_count,mysql->info); + client_myds->pkt_sid++; } else { // error char sqlstate[10]; sprintf(sqlstate,"#%s",mysql_sqlstate(mysql)); client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,mysql_errno(mysql),sqlstate,mysql_error(mysql)); + client_myds->pkt_sid++; } } } @@ -2132,4 +2159,5 @@ void MySQL_Session::RequestEnd(MySQL_Data_Stream *myds) { client_myds->DSS=STATE_SLEEP; // finalize the query CurrentQuery.end(); + started_sending_data_to_client=false; } diff --git a/lib/mysql_connection.cpp b/lib/mysql_connection.cpp index 0929cce08..0a292574e 100644 --- a/lib/mysql_connection.cpp +++ b/lib/mysql_connection.cpp @@ -160,6 +160,7 @@ MySQL_Connection::MySQL_Connection() { largest_query_length=0; MyRS=NULL; creation_time=0; + processing_multi_statement=false; proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "Creating new MySQL_Connection %p\n", this); }; @@ -312,6 +313,7 @@ void MySQL_Connection::connect_start() { client_flags += CLIENT_FOUND_ROWS; if (parent->compression) client_flags += CLIENT_COMPRESS; + client_flags += CLIENT_MULTI_STATEMENTS; // FIXME: add global variable if (parent->port) { async_exit_status=mysql_real_connect_start(&ret_mysql, mysql, parent->address, userinfo->username, userinfo->password, userinfo->schemaname, parent->port, NULL, client_flags); } else { @@ -561,6 +563,36 @@ handler_again: #endif } break; + + case ASYNC_NEXT_RESULT_START: + async_exit_status = mysql_next_result_start(&interr, mysql); + if (async_exit_status) { + next_event(ASYNC_NEXT_RESULT_CONT); + } else { +#ifdef PROXYSQL_USE_RESULT + NEXT_IMMEDIATE(ASYNC_USE_RESULT_START); +#else + NEXT_IMMEDIATE(ASYNC_STORE_RESULT_START); +#endif + } + break; + + case ASYNC_NEXT_RESULT_CONT: + async_exit_status = mysql_next_result_cont(&interr, mysql, mysql_status(event, true)); + if (async_exit_status) { + next_event(ASYNC_NEXT_RESULT_CONT); + } else { +#ifdef PROXYSQL_USE_RESULT + NEXT_IMMEDIATE(ASYNC_USE_RESULT_START); +#else + NEXT_IMMEDIATE(ASYNC_STORE_RESULT_START); +#endif + } + break; + + case ASYNC_NEXT_RESULT_END: + break; + case ASYNC_STORE_RESULT_START: if (mysql_errno(mysql)) { NEXT_IMMEDIATE(ASYNC_QUERY_END); @@ -620,6 +652,14 @@ handler_again: } break; case ASYNC_QUERY_END: + if (mysql_result) { + mysql_free_result(mysql_result); + mysql_result=NULL; + } + //if (mysql_next_result(mysql)==0) { + if (mysql->server_status & SERVER_MORE_RESULTS_EXIST) { + async_state_machine=ASYNC_NEXT_RESULT_START; + } break; case ASYNC_SET_AUTOCOMMIT_START: set_autocommit_start(); @@ -807,6 +847,7 @@ int MySQL_Connection::async_query(short event, char *stmt, unsigned long length) } switch (async_state_machine) { case ASYNC_QUERY_END: + processing_multi_statement=false; // no matter if we are processing a multi statement or not, we reached the end return 0; break; case ASYNC_IDLE: @@ -824,6 +865,16 @@ int MySQL_Connection::async_query(short event, char *stmt, unsigned long length) return 0; } } + if (async_state_machine==ASYNC_NEXT_RESULT_START) { + // if we reached this point it measn we are processing a multi-statement + // and we need to exit to give control to MySQL_Session + processing_multi_statement=true; + return 2; + } + if (processing_multi_statement==true) { + // we are in the middle of processing a multi-statement + return 3; + } return 1; }