Merge branch 'v1.4.1-1034' into v1.4.4-bandwidth

pull/1243/head
René Cannaò 9 years ago
commit 44cc349a09

@ -73,6 +73,7 @@ class MySQL_Data_Stream
PtrSize_t multi_pkt;
unsigned long long pause_until;
unsigned long long wait_until;
unsigned long long killed_at;
unsigned long long max_connect_time;
@ -135,6 +136,7 @@ class MySQL_Data_Stream
int write_to_net();
int write_to_net_poll();
bool available_data_out();
void remove_pollout();
void set_pollout();
void mysql_free();

@ -372,6 +372,8 @@ class MySQL_Threads_Handler
int query_digests_max_digest_length;
int query_digests_max_query_length;
int wait_timeout;
int throttle_max_bytes_per_second_to_client;
int throttle_ratio_server_to_client;
int max_connections;
int max_stmts_per_connection;
int max_stmts_cache;

@ -594,6 +594,8 @@ __thread int mysql_thread___max_transaction_time;
__thread int mysql_thread___threshold_query_length;
__thread int mysql_thread___threshold_resultset_size;
__thread int mysql_thread___wait_timeout;
__thread int mysql_thread___throttle_max_bytes_per_second_to_client;
__thread int mysql_thread___throttle_ratio_server_to_client;
__thread int mysql_thread___max_connections;
__thread int mysql_thread___max_stmts_per_connection;
__thread int mysql_thread___max_stmts_cache;
@ -694,6 +696,8 @@ extern __thread int mysql_thread___max_transaction_time;
extern __thread int mysql_thread___threshold_query_length;
extern __thread int mysql_thread___threshold_resultset_size;
extern __thread int mysql_thread___wait_timeout;
extern __thread int mysql_thread___throttle_max_bytes_per_second_to_client;
extern __thread int mysql_thread___throttle_ratio_server_to_client;
extern __thread int mysql_thread___max_connections;
extern __thread int mysql_thread___max_stmts_per_connection;
extern __thread int mysql_thread___max_stmts_cache;

@ -411,6 +411,11 @@ void MySQL_Session::reset_all_backends() {
};
void MySQL_Session::writeout() {
int tps = 10; // throttling per second , by default every 100ms
int total_written = 0;
unsigned long long last_sent_=0;
int mwpl = mysql_thread___throttle_max_bytes_per_second_to_client; // max writes per call
mwpl = mwpl/tps;
if (client_myds) client_myds->array2buffer_full();
if (mybe && mybe->server_myds && mybe->server_myds->myds_type==MYDS_BACKEND) {
if (session_type==PROXYSQL_SESSION_MYSQL) {
@ -423,14 +428,18 @@ void MySQL_Session::writeout() {
mybe->server_myds->array2buffer_full();
}
}
if (client_myds) {
if (client_myds && thread->curtime >= client_myds->pause_until) {
if (mirror==false) {
bool runloop=false;
if (client_myds->mypolls) {
last_sent_ = client_myds->mypolls->last_sent[client_myds->poll_fds_idx];
}
int retbytes=client_myds->write_to_net_poll();
total_written+=retbytes;
if (retbytes==QUEUE_T_DEFAULT_SIZE) { // optimization to solve memory bloat
runloop=true;
}
while (runloop) {
while (runloop && total_written < mwpl) {
runloop=false; // the default
client_myds->array2buffer_full();
struct pollfd fds;
@ -441,6 +450,7 @@ void MySQL_Session::writeout() {
if (retpoll>0) {
if (fds.revents==POLLOUT) {
retbytes=client_myds->write_to_net_poll();
total_written+=retbytes;
if (retbytes==QUEUE_T_DEFAULT_SIZE) { // optimization to solve memory bloat
runloop=true;
}
@ -449,6 +459,36 @@ void MySQL_Session::writeout() {
}
}
}
// flow control
if (total_written > 0) {
if (total_written > mwpl) {
unsigned long long add_ = 1000000/tps + 1000000/tps*((unsigned long long)total_written - (unsigned long long)mwpl)/mwpl;
pause_until = thread->curtime + add_;
client_myds->remove_pollout();
client_myds->pause_until = thread->curtime + add_;
} else {
if (total_written >= QUEUE_T_DEFAULT_SIZE) {
unsigned long long time_diff = thread->curtime - last_sent_;
if (time_diff == 0) { // sending data really too fast!
unsigned long long add_ = 1000000/tps + 1000000/tps*((unsigned long long)total_written - (unsigned long long)mwpl)/mwpl;
pause_until = thread->curtime + add_;
client_myds->remove_pollout();
client_myds->pause_until = thread->curtime + add_;
} else {
float current_Bps = (float)total_written*1000*1000/time_diff;
if (current_Bps > mysql_thread___throttle_max_bytes_per_second_to_client) {
unsigned long long add_ = 1000000/tps;
pause_until = thread->curtime + add_;
assert(pause_until > thread->curtime);
client_myds->remove_pollout();
client_myds->pause_until = thread->curtime + add_;
}
}
}
}
}
if (mybe) {
if (mybe->server_myds) mybe->server_myds->write_to_net_poll();
}

@ -255,6 +255,8 @@ static char * mysql_thread_variables_names[]= {
(char *)"query_digests_max_digest_length",
(char *)"query_digests_max_query_length",
(char *)"wait_timeout",
(char *)"throttle_max_bytes_per_second_to_client",
(char *)"throttle_ratio_server_to_client",
(char *)"max_connections",
(char *)"max_stmts_per_connection",
(char *)"max_stmts_cache",
@ -362,6 +364,8 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() {
variables.query_digests_max_digest_length=2*1024;
variables.query_digests_max_query_length=65000; // legacy default
variables.wait_timeout=8*3600*1000;
variables.throttle_max_bytes_per_second_to_client=2147483647;
variables.throttle_ratio_server_to_client=0;
variables.max_connections=10*1000;
variables.max_stmts_per_connection=20;
variables.max_stmts_cache=10000;
@ -617,6 +621,8 @@ int MySQL_Threads_Handler::get_variable_int(char *name) {
if (!strcasecmp(name,"query_digests_max_digest_length")) return (int)variables.query_digests_max_digest_length;
if (!strcasecmp(name,"query_digests_max_query_length")) return (int)variables.query_digests_max_query_length;
if (!strcasecmp(name,"wait_timeout")) return (int)variables.wait_timeout;
if (!strcasecmp(name,"throttle_max_bytes_per_second_to_client")) return (int)variables.throttle_max_bytes_per_second_to_client;
if (!strcasecmp(name,"throttle_ratio_server_to_client")) return (int)variables.throttle_ratio_server_to_client;
if (!strcasecmp(name,"max_connections")) return (int)variables.max_connections;
if (!strcasecmp(name,"max_stmts_per_connection")) return (int)variables.max_stmts_per_connection;
if (!strcasecmp(name,"max_stmts_cache")) return (int)variables.max_stmts_cache;
@ -894,6 +900,14 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f
sprintf(intbuf,"%d",variables.wait_timeout);
return strdup(intbuf);
}
if (!strcasecmp(name,"throttle_max_bytes_per_second_to_client")) {
sprintf(intbuf,"%d",variables.throttle_max_bytes_per_second_to_client);
return strdup(intbuf);
}
if (!strcasecmp(name,"throttle_ratio_server_to_client")) {
sprintf(intbuf,"%d",variables.throttle_ratio_server_to_client);
return strdup(intbuf);
}
if (!strcasecmp(name,"max_connections")) {
sprintf(intbuf,"%d",variables.max_connections);
return strdup(intbuf);
@ -1321,6 +1335,24 @@ bool MySQL_Threads_Handler::set_variable(char *name, char *value) { // this is t
}
}
#endif // IDLE_THREADS
if (!strcasecmp(name,"throttle_max_bytes_per_second_to_client")) {
int intv=atoi(value);
if (intv >= 1024 && intv <= 2147483647) {
variables.throttle_max_bytes_per_second_to_client=intv;
return true;
} else {
return false;
}
}
if (!strcasecmp(name,"throttle_ratio_server_to_client")) {
int intv=atoi(value);
if (intv >= 0 && intv <= 100) {
variables.throttle_ratio_server_to_client=intv;
return true;
} else {
return false;
}
}
if (!strcasecmp(name,"max_connections")) {
int intv=atoi(value);
if (intv >= 1 && intv <= 1000*1000) {
@ -2477,6 +2509,17 @@ __mysql_thread_exit_add_mirror:
mypolls.myds[n]->set_pollout();
}
}
if (myds && myds->sess->pause_until > curtime) {
if (myds->myds_type==MYDS_FRONTEND) {
mypolls.myds[n]->remove_pollout();
}
if (myds->myds_type==MYDS_BACKEND) {
if (mysql_thread___throttle_ratio_server_to_client) {
mypolls.fds[n].events = 0;
}
}
}
}
proxy_debug(PROXY_DEBUG_NET,1,"Poll for DataStream=%p will be called with FD=%d and events=%d\n", mypolls.myds[n], mypolls.fds[n].fd, mypolls.fds[n].events);
}
@ -3092,6 +3135,8 @@ void MySQL_Thread::refresh_variables() {
mysql_thread___query_digests_max_digest_length=GloMTH->get_variable_int((char *)"query_digests_max_digest_length");
mysql_thread___query_digests_max_query_length=GloMTH->get_variable_int((char *)"query_digests_max_query_length");
mysql_thread___wait_timeout=GloMTH->get_variable_int((char *)"wait_timeout");
mysql_thread___throttle_max_bytes_per_second_to_client=GloMTH->get_variable_int((char *)"throttle_max_bytes_per_second_to_client");
mysql_thread___throttle_ratio_server_to_client=GloMTH->get_variable_int((char *)"throttle_ratio_server_to_client");
mysql_thread___max_connections=GloMTH->get_variable_int((char *)"max_connections");
mysql_thread___max_stmts_per_connection=GloMTH->get_variable_int((char *)"max_stmts_per_connection");
mysql_thread___max_stmts_cache=GloMTH->get_variable_int((char *)"max_stmts_cache");

@ -561,7 +561,7 @@ void MySQL_Connection::set_is_client() {
#define NEXT_IMMEDIATE(new_st) do { async_state_machine = new_st; goto handler_again; } while (0)
MDB_ASYNC_ST MySQL_Connection::handler(short event) {
unsigned int processed_bytes=0; // issue #527 : this variable will store the amount of bytes processed during this event
unsigned long long processed_bytes=0; // issue #527 : this variable will store the amount of bytes processed during this event
if (mysql==NULL) {
// it is the first time handler() is being called
async_state_machine=ASYNC_CONNECT_START;
@ -922,7 +922,11 @@ handler_again:
__sync_fetch_and_add(&parent->bytes_recv,br);
myds->sess->thread->status_variables.queries_backends_bytes_recv+=br;
processed_bytes+=br; // issue #527 : this variable will store the amount of bytes processed during this event
if (processed_bytes > (unsigned int)mysql_thread___threshold_resultset_size*8) {
if (
(processed_bytes > (unsigned int)mysql_thread___threshold_resultset_size*8)
||
( mysql_thread___throttle_ratio_server_to_client && (processed_bytes > (unsigned long long)mysql_thread___throttle_max_bytes_per_second_to_client/10*(unsigned long long)mysql_thread___throttle_ratio_server_to_client) )
) {
next_event(ASYNC_USE_RESULT_CONT); // we temporarily pause
} else {
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); // we continue looping

@ -103,6 +103,7 @@ MySQL_Data_Stream::MySQL_Data_Stream() {
connect_retries_on_failure=0;
max_connect_time=0;
wait_until=0;
pause_until=0;
connect_tries=0;
poll_fds_idx=-1;
resultset_length=0;
@ -352,6 +353,12 @@ bool MySQL_Data_Stream::available_data_out() {
return false;
}
void MySQL_Data_Stream::remove_pollout() {
struct pollfd *_pollfd;
_pollfd=&mypolls->fds[poll_fds_idx];
_pollfd->events = 0;
}
void MySQL_Data_Stream::set_pollout() {
struct pollfd *_pollfd;
_pollfd=&mypolls->fds[poll_fds_idx];

Loading…
Cancel
Save