From c437c08f813717e8fbfa2be7b386c33d9f6eee01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Sat, 10 Jun 2017 21:55:42 +0200 Subject: [PATCH 1/4] Implementation of global per-query throttle #1034 --- include/MySQL_Data_Stream.h | 1 + include/MySQL_Thread.h | 1 + include/proxysql_structs.h | 2 ++ lib/MySQL_Session.cpp | 12 +++++++++++- lib/MySQL_Thread.cpp | 26 ++++++++++++++++++++++++++ lib/mysql_connection.cpp | 2 +- lib/mysql_data_stream.cpp | 6 ++++++ 7 files changed, 48 insertions(+), 2 deletions(-) diff --git a/include/MySQL_Data_Stream.h b/include/MySQL_Data_Stream.h index 2db6c363f..1f07926e9 100644 --- a/include/MySQL_Data_Stream.h +++ b/include/MySQL_Data_Stream.h @@ -135,6 +135,7 @@ class MySQL_Data_Stream int write_to_net(); int write_to_net_poll(); bool available_data_out(); + void remove_pollout(); void set_pollout(); void mysql_free(); diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 3daef4e30..ce6a91ad9 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -359,6 +359,7 @@ class MySQL_Threads_Handler int query_digests_max_digest_length; int query_digests_max_query_length; int wait_timeout; + int throttle_max_bytes_per_second_to_client; int max_connections; int max_stmts_per_connection; int max_stmts_cache; diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 2bc3e8300..7438064b9 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -592,6 +592,7 @@ __thread int mysql_thread___max_transaction_time; __thread int mysql_thread___threshold_query_length; __thread int mysql_thread___threshold_resultset_size; __thread int mysql_thread___wait_timeout; +__thread int mysql_thread___throttle_max_bytes_per_second_to_client; __thread int mysql_thread___max_connections; __thread int mysql_thread___max_stmts_per_connection; __thread int mysql_thread___max_stmts_cache; @@ -690,6 +691,7 @@ extern __thread int mysql_thread___max_transaction_time; extern __thread int mysql_thread___threshold_query_length; extern __thread int mysql_thread___threshold_resultset_size; extern __thread int mysql_thread___wait_timeout; +extern __thread int mysql_thread___throttle_max_bytes_per_second_to_client; extern __thread int mysql_thread___max_connections; extern __thread int mysql_thread___max_stmts_per_connection; extern __thread int mysql_thread___max_stmts_cache; diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index a6fbb7bd6..2fd72fc03 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -386,6 +386,10 @@ void MySQL_Session::reset_all_backends() { }; void MySQL_Session::writeout() { + int tps = 10; // throttling per second , by default every 100ms + int total_written = 0; + int mwpl = mysql_thread___throttle_max_bytes_per_second_to_client; // max writes per call + mwpl = mwpl/tps; if (client_myds) client_myds->array2buffer_full(); if (mybe && mybe->server_myds && mybe->server_myds->myds_type==MYDS_BACKEND) { if (admin==false) { @@ -402,10 +406,11 @@ void MySQL_Session::writeout() { if (mirror==false) { bool runloop=false; int retbytes=client_myds->write_to_net_poll(); + total_written+=retbytes; if (retbytes==QUEUE_T_DEFAULT_SIZE) { // optimization to solve memory bloat runloop=true; } - while (runloop) { + while (runloop && total_written < mwpl) { runloop=false; // the default client_myds->array2buffer_full(); struct pollfd fds; @@ -416,6 +421,7 @@ void MySQL_Session::writeout() { if (retpoll>0) { if (fds.revents==POLLOUT) { retbytes=client_myds->write_to_net_poll(); + total_written+=retbytes; if (retbytes==QUEUE_T_DEFAULT_SIZE) { // optimization to solve memory bloat runloop=true; } @@ -424,6 +430,10 @@ void MySQL_Session::writeout() { } } } + if (total_written > mwpl) { + pause_until = thread->curtime + 1000000/tps + 1000000/tps*(total_written - mwpl)/mwpl; + client_myds->remove_pollout(); + } if (mybe) { if (mybe->server_myds) mybe->server_myds->write_to_net_poll(); } diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 8b783d624..4392bed9f 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -242,6 +242,7 @@ static char * mysql_thread_variables_names[]= { (char *)"query_digests_max_digest_length", (char *)"query_digests_max_query_length", (char *)"wait_timeout", + (char *)"throttle_max_bytes_per_second_to_client", (char *)"max_connections", (char *)"max_stmts_per_connection", (char *)"max_stmts_cache", @@ -345,6 +346,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.query_digests_max_digest_length=2*1024; variables.query_digests_max_query_length=65000; // legacy default variables.wait_timeout=8*3600*1000; + variables.throttle_max_bytes_per_second_to_client=2147483647; variables.max_connections=10*1000; variables.max_stmts_per_connection=20; variables.max_stmts_cache=10000; @@ -595,6 +597,7 @@ int MySQL_Threads_Handler::get_variable_int(char *name) { if (!strcasecmp(name,"query_digests_max_digest_length")) return (int)variables.query_digests_max_digest_length; if (!strcasecmp(name,"query_digests_max_query_length")) return (int)variables.query_digests_max_query_length; if (!strcasecmp(name,"wait_timeout")) return (int)variables.wait_timeout; + if (!strcasecmp(name,"throttle_max_bytes_per_second_to_client")) return (int)variables.throttle_max_bytes_per_second_to_client; if (!strcasecmp(name,"max_connections")) return (int)variables.max_connections; if (!strcasecmp(name,"max_stmts_per_connection")) return (int)variables.max_stmts_per_connection; if (!strcasecmp(name,"max_stmts_cache")) return (int)variables.max_stmts_cache; @@ -860,6 +863,10 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f sprintf(intbuf,"%d",variables.wait_timeout); return strdup(intbuf); } + if (!strcasecmp(name,"throttle_max_bytes_per_second_to_client")) { + sprintf(intbuf,"%d",variables.throttle_max_bytes_per_second_to_client); + return strdup(intbuf); + } if (!strcasecmp(name,"max_connections")) { sprintf(intbuf,"%d",variables.max_connections); return strdup(intbuf); @@ -1269,6 +1276,15 @@ bool MySQL_Threads_Handler::set_variable(char *name, char *value) { // this is t } } #endif // IDLE_THREADS + if (!strcasecmp(name,"throttle_max_bytes_per_second_to_client")) { + int intv=atoi(value); + if (intv >= 1024 && intv <= 2147483647) { + variables.throttle_max_bytes_per_second_to_client=intv; + return true; + } else { + return false; + } + } if (!strcasecmp(name,"max_connections")) { int intv=atoi(value); if (intv >= 1 && intv <= 1000*1000) { @@ -2358,6 +2374,15 @@ __mysql_thread_exit_add_mirror: mypolls.myds[n]->set_pollout(); } } + if (myds && myds->sess->pause_until > curtime) { + if (myds->myds_type==MYDS_FRONTEND) { + mypolls.myds[n]->remove_pollout(); + } + if (myds->myds_type==MYDS_BACKEND) { + mypolls.fds[n].events = 0; + } + } + } proxy_debug(PROXY_DEBUG_NET,1,"Poll for DataStream=%p will be called with FD=%d and events=%d\n", mypolls.myds[n], mypolls.fds[n].fd, mypolls.fds[n].events); } @@ -2962,6 +2987,7 @@ void MySQL_Thread::refresh_variables() { mysql_thread___query_digests_max_digest_length=GloMTH->get_variable_int((char *)"query_digests_max_digest_length"); mysql_thread___query_digests_max_query_length=GloMTH->get_variable_int((char *)"query_digests_max_query_length"); mysql_thread___wait_timeout=GloMTH->get_variable_int((char *)"wait_timeout"); + mysql_thread___throttle_max_bytes_per_second_to_client=GloMTH->get_variable_int((char *)"throttle_max_bytes_per_second_to_client"); mysql_thread___max_connections=GloMTH->get_variable_int((char *)"max_connections"); mysql_thread___max_stmts_per_connection=GloMTH->get_variable_int((char *)"max_stmts_per_connection"); mysql_thread___max_stmts_cache=GloMTH->get_variable_int((char *)"max_stmts_cache"); diff --git a/lib/mysql_connection.cpp b/lib/mysql_connection.cpp index 34175bf82..2290eed7f 100644 --- a/lib/mysql_connection.cpp +++ b/lib/mysql_connection.cpp @@ -922,7 +922,7 @@ handler_again: __sync_fetch_and_add(&parent->bytes_recv,br); myds->sess->thread->status_variables.queries_backends_bytes_recv+=br; processed_bytes+=br; // issue #527 : this variable will store the amount of bytes processed during this event - if (processed_bytes > (unsigned int)mysql_thread___threshold_resultset_size*8) { + if ((processed_bytes > (unsigned int)mysql_thread___threshold_resultset_size*4) || (processed_bytes > (unsigned int)mysql_thread___throttle_max_bytes_per_second_to_client/10)) { next_event(ASYNC_USE_RESULT_CONT); // we temporarily pause } else { NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); // we continue looping diff --git a/lib/mysql_data_stream.cpp b/lib/mysql_data_stream.cpp index 8069f34c6..8088d3965 100644 --- a/lib/mysql_data_stream.cpp +++ b/lib/mysql_data_stream.cpp @@ -350,6 +350,12 @@ bool MySQL_Data_Stream::available_data_out() { return false; } +void MySQL_Data_Stream::remove_pollout() { + struct pollfd *_pollfd; + _pollfd=&mypolls->fds[poll_fds_idx]; + _pollfd->events = 0; +} + void MySQL_Data_Stream::set_pollout() { struct pollfd *_pollfd; _pollfd=&mypolls->fds[poll_fds_idx]; From f614b59de526d801b61047d96eeb97e5b3a9a6aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Sat, 10 Jun 2017 16:52:05 -0500 Subject: [PATCH 2/4] Improving throttling #1034 --- include/MySQL_Data_Stream.h | 1 + lib/MySQL_Session.cpp | 7 +++++-- lib/mysql_data_stream.cpp | 1 + 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/include/MySQL_Data_Stream.h b/include/MySQL_Data_Stream.h index 1f07926e9..aef35683f 100644 --- a/include/MySQL_Data_Stream.h +++ b/include/MySQL_Data_Stream.h @@ -73,6 +73,7 @@ class MySQL_Data_Stream PtrSize_t multi_pkt; + unsigned long long pause_until; unsigned long long wait_until; unsigned long long killed_at; unsigned long long max_connect_time; diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 2fd72fc03..e19548434 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -402,7 +402,7 @@ void MySQL_Session::writeout() { mybe->server_myds->array2buffer_full(); } } - if (client_myds) { + if (client_myds && thread->curtime >= client_myds->pause_until) { if (mirror==false) { bool runloop=false; int retbytes=client_myds->write_to_net_poll(); @@ -431,8 +431,11 @@ void MySQL_Session::writeout() { } } if (total_written > mwpl) { - pause_until = thread->curtime + 1000000/tps + 1000000/tps*(total_written - mwpl)/mwpl; + unsigned long long add_ = 1000000/tps + 1000000/tps*((unsigned long long)total_written - (unsigned long long)mwpl)/mwpl; + pause_until = thread->curtime + add_; + assert(pause_until > thread->curtime); client_myds->remove_pollout(); + client_myds->pause_until=pause_until; } if (mybe) { if (mybe->server_myds) mybe->server_myds->write_to_net_poll(); diff --git a/lib/mysql_data_stream.cpp b/lib/mysql_data_stream.cpp index 8088d3965..2f6bcd37e 100644 --- a/lib/mysql_data_stream.cpp +++ b/lib/mysql_data_stream.cpp @@ -103,6 +103,7 @@ MySQL_Data_Stream::MySQL_Data_Stream() { connect_retries_on_failure=0; max_connect_time=0; wait_until=0; + pause_until=0; connect_tries=0; poll_fds_idx=-1; resultset_length=0; From 321cd5be67acea6965bbad3d25106204f71a1595 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Sun, 11 Jun 2017 00:57:02 +0200 Subject: [PATCH 3/4] Better control of backend throttle #1034 --- include/MySQL_Thread.h | 1 + include/proxysql_structs.h | 2 ++ lib/MySQL_Thread.cpp | 21 ++++++++++++++++++++- lib/mysql_connection.cpp | 8 ++++++-- 4 files changed, 29 insertions(+), 3 deletions(-) diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index ce6a91ad9..815b93be8 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -360,6 +360,7 @@ class MySQL_Threads_Handler int query_digests_max_query_length; int wait_timeout; int throttle_max_bytes_per_second_to_client; + int throttle_ratio_server_to_client; int max_connections; int max_stmts_per_connection; int max_stmts_cache; diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 7438064b9..602de670a 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -593,6 +593,7 @@ __thread int mysql_thread___threshold_query_length; __thread int mysql_thread___threshold_resultset_size; __thread int mysql_thread___wait_timeout; __thread int mysql_thread___throttle_max_bytes_per_second_to_client; +__thread int mysql_thread___throttle_ratio_server_to_client; __thread int mysql_thread___max_connections; __thread int mysql_thread___max_stmts_per_connection; __thread int mysql_thread___max_stmts_cache; @@ -692,6 +693,7 @@ extern __thread int mysql_thread___threshold_query_length; extern __thread int mysql_thread___threshold_resultset_size; extern __thread int mysql_thread___wait_timeout; extern __thread int mysql_thread___throttle_max_bytes_per_second_to_client; +extern __thread int mysql_thread___throttle_ratio_server_to_client; extern __thread int mysql_thread___max_connections; extern __thread int mysql_thread___max_stmts_per_connection; extern __thread int mysql_thread___max_stmts_cache; diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 4392bed9f..a69efb29b 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -243,6 +243,7 @@ static char * mysql_thread_variables_names[]= { (char *)"query_digests_max_query_length", (char *)"wait_timeout", (char *)"throttle_max_bytes_per_second_to_client", + (char *)"throttle_ratio_server_to_client", (char *)"max_connections", (char *)"max_stmts_per_connection", (char *)"max_stmts_cache", @@ -347,6 +348,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.query_digests_max_query_length=65000; // legacy default variables.wait_timeout=8*3600*1000; variables.throttle_max_bytes_per_second_to_client=2147483647; + variables.throttle_ratio_server_to_client=0; variables.max_connections=10*1000; variables.max_stmts_per_connection=20; variables.max_stmts_cache=10000; @@ -598,6 +600,7 @@ int MySQL_Threads_Handler::get_variable_int(char *name) { if (!strcasecmp(name,"query_digests_max_query_length")) return (int)variables.query_digests_max_query_length; if (!strcasecmp(name,"wait_timeout")) return (int)variables.wait_timeout; if (!strcasecmp(name,"throttle_max_bytes_per_second_to_client")) return (int)variables.throttle_max_bytes_per_second_to_client; + if (!strcasecmp(name,"throttle_ratio_server_to_client")) return (int)variables.throttle_ratio_server_to_client; if (!strcasecmp(name,"max_connections")) return (int)variables.max_connections; if (!strcasecmp(name,"max_stmts_per_connection")) return (int)variables.max_stmts_per_connection; if (!strcasecmp(name,"max_stmts_cache")) return (int)variables.max_stmts_cache; @@ -867,6 +870,10 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f sprintf(intbuf,"%d",variables.throttle_max_bytes_per_second_to_client); return strdup(intbuf); } + if (!strcasecmp(name,"throttle_ratio_server_to_client")) { + sprintf(intbuf,"%d",variables.throttle_ratio_server_to_client); + return strdup(intbuf); + } if (!strcasecmp(name,"max_connections")) { sprintf(intbuf,"%d",variables.max_connections); return strdup(intbuf); @@ -1285,6 +1292,15 @@ bool MySQL_Threads_Handler::set_variable(char *name, char *value) { // this is t return false; } } + if (!strcasecmp(name,"throttle_ratio_server_to_client")) { + int intv=atoi(value); + if (intv >= 0 && intv <= 100) { + variables.throttle_ratio_server_to_client=intv; + return true; + } else { + return false; + } + } if (!strcasecmp(name,"max_connections")) { int intv=atoi(value); if (intv >= 1 && intv <= 1000*1000) { @@ -2379,7 +2395,9 @@ __mysql_thread_exit_add_mirror: mypolls.myds[n]->remove_pollout(); } if (myds->myds_type==MYDS_BACKEND) { - mypolls.fds[n].events = 0; + if (mysql_thread___throttle_ratio_server_to_client) { + mypolls.fds[n].events = 0; + } } } @@ -2988,6 +3006,7 @@ void MySQL_Thread::refresh_variables() { mysql_thread___query_digests_max_query_length=GloMTH->get_variable_int((char *)"query_digests_max_query_length"); mysql_thread___wait_timeout=GloMTH->get_variable_int((char *)"wait_timeout"); mysql_thread___throttle_max_bytes_per_second_to_client=GloMTH->get_variable_int((char *)"throttle_max_bytes_per_second_to_client"); + mysql_thread___throttle_ratio_server_to_client=GloMTH->get_variable_int((char *)"throttle_ratio_server_to_client"); mysql_thread___max_connections=GloMTH->get_variable_int((char *)"max_connections"); mysql_thread___max_stmts_per_connection=GloMTH->get_variable_int((char *)"max_stmts_per_connection"); mysql_thread___max_stmts_cache=GloMTH->get_variable_int((char *)"max_stmts_cache"); diff --git a/lib/mysql_connection.cpp b/lib/mysql_connection.cpp index 2290eed7f..5341a8c6e 100644 --- a/lib/mysql_connection.cpp +++ b/lib/mysql_connection.cpp @@ -561,7 +561,7 @@ void MySQL_Connection::set_is_client() { #define NEXT_IMMEDIATE(new_st) do { async_state_machine = new_st; goto handler_again; } while (0) MDB_ASYNC_ST MySQL_Connection::handler(short event) { - unsigned int processed_bytes=0; // issue #527 : this variable will store the amount of bytes processed during this event + unsigned long long processed_bytes=0; // issue #527 : this variable will store the amount of bytes processed during this event if (mysql==NULL) { // it is the first time handler() is being called async_state_machine=ASYNC_CONNECT_START; @@ -922,7 +922,11 @@ handler_again: __sync_fetch_and_add(&parent->bytes_recv,br); myds->sess->thread->status_variables.queries_backends_bytes_recv+=br; processed_bytes+=br; // issue #527 : this variable will store the amount of bytes processed during this event - if ((processed_bytes > (unsigned int)mysql_thread___threshold_resultset_size*4) || (processed_bytes > (unsigned int)mysql_thread___throttle_max_bytes_per_second_to_client/10)) { + if ( + (processed_bytes > (unsigned int)mysql_thread___threshold_resultset_size*8) + || + ( mysql_thread___throttle_ratio_server_to_client && (processed_bytes > (unsigned long long)mysql_thread___throttle_max_bytes_per_second_to_client/10*(unsigned long long)mysql_thread___throttle_ratio_server_to_client) ) + ) { next_event(ASYNC_USE_RESULT_CONT); // we temporarily pause } else { NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); // we continue looping From 455026bffdbb56670e277d70ff1338b80bd54ef7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Mon, 19 Jun 2017 01:58:56 +0200 Subject: [PATCH 4/4] Improving data flow control for #1034 --- lib/MySQL_Session.cpp | 39 +++++++++++++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index e19548434..e2a84991b 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -388,6 +388,7 @@ void MySQL_Session::reset_all_backends() { void MySQL_Session::writeout() { int tps = 10; // throttling per second , by default every 100ms int total_written = 0; + unsigned long long last_sent_=0; int mwpl = mysql_thread___throttle_max_bytes_per_second_to_client; // max writes per call mwpl = mwpl/tps; if (client_myds) client_myds->array2buffer_full(); @@ -405,6 +406,9 @@ void MySQL_Session::writeout() { if (client_myds && thread->curtime >= client_myds->pause_until) { if (mirror==false) { bool runloop=false; + if (client_myds->mypolls) { + last_sent_ = client_myds->mypolls->last_sent[client_myds->poll_fds_idx]; + } int retbytes=client_myds->write_to_net_poll(); total_written+=retbytes; if (retbytes==QUEUE_T_DEFAULT_SIZE) { // optimization to solve memory bloat @@ -430,13 +434,36 @@ void MySQL_Session::writeout() { } } } - if (total_written > mwpl) { - unsigned long long add_ = 1000000/tps + 1000000/tps*((unsigned long long)total_written - (unsigned long long)mwpl)/mwpl; - pause_until = thread->curtime + add_; - assert(pause_until > thread->curtime); - client_myds->remove_pollout(); - client_myds->pause_until=pause_until; + + // flow control + if (total_written > 0) { + if (total_written > mwpl) { + unsigned long long add_ = 1000000/tps + 1000000/tps*((unsigned long long)total_written - (unsigned long long)mwpl)/mwpl; + pause_until = thread->curtime + add_; + client_myds->remove_pollout(); + client_myds->pause_until = thread->curtime + add_; + } else { + if (total_written >= QUEUE_T_DEFAULT_SIZE) { + unsigned long long time_diff = thread->curtime - last_sent_; + if (time_diff == 0) { // sending data really too fast! + unsigned long long add_ = 1000000/tps + 1000000/tps*((unsigned long long)total_written - (unsigned long long)mwpl)/mwpl; + pause_until = thread->curtime + add_; + client_myds->remove_pollout(); + client_myds->pause_until = thread->curtime + add_; + } else { + float current_Bps = (float)total_written*1000*1000/time_diff; + if (current_Bps > mysql_thread___throttle_max_bytes_per_second_to_client) { + unsigned long long add_ = 1000000/tps; + pause_until = thread->curtime + add_; + assert(pause_until > thread->curtime); + client_myds->remove_pollout(); + client_myds->pause_until = thread->curtime + add_; + } + } + } + } } + if (mybe) { if (mybe->server_myds) mybe->server_myds->write_to_net_poll(); }