diff --git a/include/MySQL_Data_Stream.h b/include/MySQL_Data_Stream.h index 2db6c363f..aef35683f 100644 --- a/include/MySQL_Data_Stream.h +++ b/include/MySQL_Data_Stream.h @@ -73,6 +73,7 @@ class MySQL_Data_Stream PtrSize_t multi_pkt; + unsigned long long pause_until; unsigned long long wait_until; unsigned long long killed_at; unsigned long long max_connect_time; @@ -135,6 +136,7 @@ class MySQL_Data_Stream int write_to_net(); int write_to_net_poll(); bool available_data_out(); + void remove_pollout(); void set_pollout(); void mysql_free(); diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 743db8392..92a9d6ce4 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -372,6 +372,8 @@ class MySQL_Threads_Handler int query_digests_max_digest_length; int query_digests_max_query_length; int wait_timeout; + int throttle_max_bytes_per_second_to_client; + int throttle_ratio_server_to_client; int max_connections; int max_stmts_per_connection; int max_stmts_cache; diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 8dab9dc1b..efdaf25de 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -594,6 +594,8 @@ __thread int mysql_thread___max_transaction_time; __thread int mysql_thread___threshold_query_length; __thread int mysql_thread___threshold_resultset_size; __thread int mysql_thread___wait_timeout; +__thread int mysql_thread___throttle_max_bytes_per_second_to_client; +__thread int mysql_thread___throttle_ratio_server_to_client; __thread int mysql_thread___max_connections; __thread int mysql_thread___max_stmts_per_connection; __thread int mysql_thread___max_stmts_cache; @@ -694,6 +696,8 @@ extern __thread int mysql_thread___max_transaction_time; extern __thread int mysql_thread___threshold_query_length; extern __thread int mysql_thread___threshold_resultset_size; extern __thread int mysql_thread___wait_timeout; +extern __thread int mysql_thread___throttle_max_bytes_per_second_to_client; +extern __thread int mysql_thread___throttle_ratio_server_to_client; extern __thread int mysql_thread___max_connections; extern __thread int mysql_thread___max_stmts_per_connection; extern __thread int mysql_thread___max_stmts_cache; diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 9cd5b538d..d588efd43 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -411,6 +411,11 @@ void MySQL_Session::reset_all_backends() { }; void MySQL_Session::writeout() { + int tps = 10; // throttling per second , by default every 100ms + int total_written = 0; + unsigned long long last_sent_=0; + int mwpl = mysql_thread___throttle_max_bytes_per_second_to_client; // max writes per call + mwpl = mwpl/tps; if (client_myds) client_myds->array2buffer_full(); if (mybe && mybe->server_myds && mybe->server_myds->myds_type==MYDS_BACKEND) { if (session_type==PROXYSQL_SESSION_MYSQL) { @@ -423,14 +428,18 @@ void MySQL_Session::writeout() { mybe->server_myds->array2buffer_full(); } } - if (client_myds) { + if (client_myds && thread->curtime >= client_myds->pause_until) { if (mirror==false) { bool runloop=false; + if (client_myds->mypolls) { + last_sent_ = client_myds->mypolls->last_sent[client_myds->poll_fds_idx]; + } int retbytes=client_myds->write_to_net_poll(); + total_written+=retbytes; if (retbytes==QUEUE_T_DEFAULT_SIZE) { // optimization to solve memory bloat runloop=true; } - while (runloop) { + while (runloop && total_written < mwpl) { runloop=false; // the default client_myds->array2buffer_full(); struct pollfd fds; @@ -441,6 +450,7 @@ void MySQL_Session::writeout() { if (retpoll>0) { if (fds.revents==POLLOUT) { retbytes=client_myds->write_to_net_poll(); + total_written+=retbytes; if (retbytes==QUEUE_T_DEFAULT_SIZE) { // optimization to solve memory bloat runloop=true; } @@ -449,6 +459,36 @@ void MySQL_Session::writeout() { } } } + + // flow control + if (total_written > 0) { + if (total_written > mwpl) { + unsigned long long add_ = 1000000/tps + 1000000/tps*((unsigned long long)total_written - (unsigned long long)mwpl)/mwpl; + pause_until = thread->curtime + add_; + client_myds->remove_pollout(); + client_myds->pause_until = thread->curtime + add_; + } else { + if (total_written >= QUEUE_T_DEFAULT_SIZE) { + unsigned long long time_diff = thread->curtime - last_sent_; + if (time_diff == 0) { // sending data really too fast! + unsigned long long add_ = 1000000/tps + 1000000/tps*((unsigned long long)total_written - (unsigned long long)mwpl)/mwpl; + pause_until = thread->curtime + add_; + client_myds->remove_pollout(); + client_myds->pause_until = thread->curtime + add_; + } else { + float current_Bps = (float)total_written*1000*1000/time_diff; + if (current_Bps > mysql_thread___throttle_max_bytes_per_second_to_client) { + unsigned long long add_ = 1000000/tps; + pause_until = thread->curtime + add_; + assert(pause_until > thread->curtime); + client_myds->remove_pollout(); + client_myds->pause_until = thread->curtime + add_; + } + } + } + } + } + if (mybe) { if (mybe->server_myds) mybe->server_myds->write_to_net_poll(); } diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index a4df4f85e..8aa1b51a0 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -255,6 +255,8 @@ static char * mysql_thread_variables_names[]= { (char *)"query_digests_max_digest_length", (char *)"query_digests_max_query_length", (char *)"wait_timeout", + (char *)"throttle_max_bytes_per_second_to_client", + (char *)"throttle_ratio_server_to_client", (char *)"max_connections", (char *)"max_stmts_per_connection", (char *)"max_stmts_cache", @@ -362,6 +364,8 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.query_digests_max_digest_length=2*1024; variables.query_digests_max_query_length=65000; // legacy default variables.wait_timeout=8*3600*1000; + variables.throttle_max_bytes_per_second_to_client=2147483647; + variables.throttle_ratio_server_to_client=0; variables.max_connections=10*1000; variables.max_stmts_per_connection=20; variables.max_stmts_cache=10000; @@ -617,6 +621,8 @@ int MySQL_Threads_Handler::get_variable_int(char *name) { if (!strcasecmp(name,"query_digests_max_digest_length")) return (int)variables.query_digests_max_digest_length; if (!strcasecmp(name,"query_digests_max_query_length")) return (int)variables.query_digests_max_query_length; if (!strcasecmp(name,"wait_timeout")) return (int)variables.wait_timeout; + if (!strcasecmp(name,"throttle_max_bytes_per_second_to_client")) return (int)variables.throttle_max_bytes_per_second_to_client; + if (!strcasecmp(name,"throttle_ratio_server_to_client")) return (int)variables.throttle_ratio_server_to_client; if (!strcasecmp(name,"max_connections")) return (int)variables.max_connections; if (!strcasecmp(name,"max_stmts_per_connection")) return (int)variables.max_stmts_per_connection; if (!strcasecmp(name,"max_stmts_cache")) return (int)variables.max_stmts_cache; @@ -894,6 +900,14 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f sprintf(intbuf,"%d",variables.wait_timeout); return strdup(intbuf); } + if (!strcasecmp(name,"throttle_max_bytes_per_second_to_client")) { + sprintf(intbuf,"%d",variables.throttle_max_bytes_per_second_to_client); + return strdup(intbuf); + } + if (!strcasecmp(name,"throttle_ratio_server_to_client")) { + sprintf(intbuf,"%d",variables.throttle_ratio_server_to_client); + return strdup(intbuf); + } if (!strcasecmp(name,"max_connections")) { sprintf(intbuf,"%d",variables.max_connections); return strdup(intbuf); @@ -1321,6 +1335,24 @@ bool MySQL_Threads_Handler::set_variable(char *name, char *value) { // this is t } } #endif // IDLE_THREADS + if (!strcasecmp(name,"throttle_max_bytes_per_second_to_client")) { + int intv=atoi(value); + if (intv >= 1024 && intv <= 2147483647) { + variables.throttle_max_bytes_per_second_to_client=intv; + return true; + } else { + return false; + } + } + if (!strcasecmp(name,"throttle_ratio_server_to_client")) { + int intv=atoi(value); + if (intv >= 0 && intv <= 100) { + variables.throttle_ratio_server_to_client=intv; + return true; + } else { + return false; + } + } if (!strcasecmp(name,"max_connections")) { int intv=atoi(value); if (intv >= 1 && intv <= 1000*1000) { @@ -2477,6 +2509,17 @@ __mysql_thread_exit_add_mirror: mypolls.myds[n]->set_pollout(); } } + if (myds && myds->sess->pause_until > curtime) { + if (myds->myds_type==MYDS_FRONTEND) { + mypolls.myds[n]->remove_pollout(); + } + if (myds->myds_type==MYDS_BACKEND) { + if (mysql_thread___throttle_ratio_server_to_client) { + mypolls.fds[n].events = 0; + } + } + } + } proxy_debug(PROXY_DEBUG_NET,1,"Poll for DataStream=%p will be called with FD=%d and events=%d\n", mypolls.myds[n], mypolls.fds[n].fd, mypolls.fds[n].events); } @@ -3092,6 +3135,8 @@ void MySQL_Thread::refresh_variables() { mysql_thread___query_digests_max_digest_length=GloMTH->get_variable_int((char *)"query_digests_max_digest_length"); mysql_thread___query_digests_max_query_length=GloMTH->get_variable_int((char *)"query_digests_max_query_length"); mysql_thread___wait_timeout=GloMTH->get_variable_int((char *)"wait_timeout"); + mysql_thread___throttle_max_bytes_per_second_to_client=GloMTH->get_variable_int((char *)"throttle_max_bytes_per_second_to_client"); + mysql_thread___throttle_ratio_server_to_client=GloMTH->get_variable_int((char *)"throttle_ratio_server_to_client"); mysql_thread___max_connections=GloMTH->get_variable_int((char *)"max_connections"); mysql_thread___max_stmts_per_connection=GloMTH->get_variable_int((char *)"max_stmts_per_connection"); mysql_thread___max_stmts_cache=GloMTH->get_variable_int((char *)"max_stmts_cache"); diff --git a/lib/mysql_connection.cpp b/lib/mysql_connection.cpp index 85e5b71bd..faf1840db 100644 --- a/lib/mysql_connection.cpp +++ b/lib/mysql_connection.cpp @@ -561,7 +561,7 @@ void MySQL_Connection::set_is_client() { #define NEXT_IMMEDIATE(new_st) do { async_state_machine = new_st; goto handler_again; } while (0) MDB_ASYNC_ST MySQL_Connection::handler(short event) { - unsigned int processed_bytes=0; // issue #527 : this variable will store the amount of bytes processed during this event + unsigned long long processed_bytes=0; // issue #527 : this variable will store the amount of bytes processed during this event if (mysql==NULL) { // it is the first time handler() is being called async_state_machine=ASYNC_CONNECT_START; @@ -922,7 +922,11 @@ handler_again: __sync_fetch_and_add(&parent->bytes_recv,br); myds->sess->thread->status_variables.queries_backends_bytes_recv+=br; processed_bytes+=br; // issue #527 : this variable will store the amount of bytes processed during this event - if (processed_bytes > (unsigned int)mysql_thread___threshold_resultset_size*8) { + if ( + (processed_bytes > (unsigned int)mysql_thread___threshold_resultset_size*8) + || + ( mysql_thread___throttle_ratio_server_to_client && (processed_bytes > (unsigned long long)mysql_thread___throttle_max_bytes_per_second_to_client/10*(unsigned long long)mysql_thread___throttle_ratio_server_to_client) ) + ) { next_event(ASYNC_USE_RESULT_CONT); // we temporarily pause } else { NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); // we continue looping diff --git a/lib/mysql_data_stream.cpp b/lib/mysql_data_stream.cpp index 24429f0a4..0d6d8830f 100644 --- a/lib/mysql_data_stream.cpp +++ b/lib/mysql_data_stream.cpp @@ -103,6 +103,7 @@ MySQL_Data_Stream::MySQL_Data_Stream() { connect_retries_on_failure=0; max_connect_time=0; wait_until=0; + pause_until=0; connect_tries=0; poll_fds_idx=-1; resultset_length=0; @@ -352,6 +353,12 @@ bool MySQL_Data_Stream::available_data_out() { return false; } +void MySQL_Data_Stream::remove_pollout() { + struct pollfd *_pollfd; + _pollfd=&mypolls->fds[poll_fds_idx]; + _pollfd->events = 0; +} + void MySQL_Data_Stream::set_pollout() { struct pollfd *_pollfd; _pollfd=&mypolls->fds[poll_fds_idx];