Several code cleanup in MySQL_Thread

pull/3444/head
René Cannaò 5 years ago
parent ee3bf049d1
commit 57fd24c2d6

@ -216,14 +216,21 @@ class MySQL_Thread
~MySQL_Thread();
MySQL_Session * create_new_session_and_client_data_stream(int _fd);
bool init();
void run();
void run___get_multiple_idle_connections(int& num_idles);
void run___cleanup_mirror_queue();
void ProcessAllMyDS_BeforePoll();
void ProcessAllMyDS_AfterPoll();
void run();
void poll_listener_add(int sock);
void poll_listener_del(int sock);
void register_session(MySQL_Session*, bool up_start=true);
void unregister_session(int);
struct pollfd * get_pollfd(unsigned int i);
bool process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned int n);
void process_all_sessions();
void ProcessAllSessions_SortingSessions();
void ProcessAllSessions_CompletedMirrorSession(unsigned int& n, MySQL_Session *sess);
void ProcessAllSessions_MaintenanceLoop(MySQL_Session *sess, unsigned long long sess_time, unsigned int& total_active_transactions_);
void process_all_sessions();
void refresh_variables();
void register_session_connection_handler(MySQL_Session *_sess, bool _new=false);
void unregister_session_connection_handler(int idx, bool _new=false);

@ -4091,6 +4091,128 @@ void MySQL_Thread::unregister_session(int idx) {
}
// this function was inline in MySQL_Thread::run()
void MySQL_Thread::run___get_multiple_idle_connections(int& num_idles) {
int rc;
int i;
num_idles=MyHGM->get_multiple_idle_connections(-1, curtime-mysql_thread___ping_interval_server_msec*1000, my_idle_conns, SESSIONS_FOR_CONNECTIONS_HANDLER);
for (i=0; i<num_idles; i++) {
MySQL_Data_Stream *myds;
MySQL_Connection *mc=my_idle_conns[i];
MySQL_Session *sess=new MySQL_Session();
sess->mybe=sess->find_or_create_backend(mc->parent->myhgc->hid);
myds=sess->mybe->server_myds;
myds->attach_connection(mc);
myds->assign_fd_from_mysql_conn();
myds->myds_type=MYDS_BACKEND;
sess->to_process=1;
myds->wait_until=curtime+mysql_thread___ping_timeout_server*1000; // max_timeout
mc->last_time_used=curtime;
myds->myprot.init(&myds, myds->myconn->userinfo, NULL);
sess->status=PINGING_SERVER;
myds->DSS=STATE_MARIADB_PING;
register_session_connection_handler(sess,true);
int rc=sess->handler();
if (rc==-1) {
unsigned int sess_idx=mysql_sessions->len-1;
unregister_session(sess_idx);
delete sess;
}
}
processing_idles=true;
last_processing_idles=curtime;
}
// this function was inline in MySQL_Thread::run()
void MySQL_Thread::ProcessAllMyDS_BeforePoll() {
unsigned int n;
for (n = 0; n < mypolls.len; n++) {
MySQL_Data_Stream *myds=NULL;
myds=mypolls.myds[n];
mypolls.fds[n].revents=0;
if (myds) {
#ifdef IDLE_THREADS
if (GloVars.global.idle_threads) {
// here we try to move it to the maintenance thread
if (myds->myds_type==MYDS_FRONTEND && myds->sess) {
if (myds->DSS==STATE_SLEEP && myds->sess->status==WAITING_CLIENT_DATA) {
if (move_session_to_idle_mysql_sessions(myds, n)) {
n--; // compensate mypolls.remove_index_fast(n) and n++ of loop
continue;
}
}
}
}
#endif // IDLE_THREADS
if (unlikely(myds->wait_until)) {
tune_timeout_for_myds_needs_pause(myds);
}
if (myds->sess) {
if (unlikely(myds->sess->pause_until > 0)) {
tune_timeout_for_session_needs_pause(myds);
}
}
myds->revents=0;
if (myds->myds_type!=MYDS_LISTENER) {
configure_pollout(myds, n);
}
}
proxy_debug(PROXY_DEBUG_NET,1,"Poll for DataStream=%p will be called with FD=%d and events=%d\n", mypolls.myds[n], mypolls.fds[n].fd, mypolls.fds[n].events);
}
}
// this function was inline in MySQL_Thread::run()
void MySQL_Thread::ProcessAllMyDS_AfterPoll() {
int rc;
for (unsigned int n = 0; n < mypolls.len; n++) {
proxy_debug(PROXY_DEBUG_NET,3, "poll for fd %d events %d revents %d\n", mypolls.fds[n].fd , mypolls.fds[n].events, mypolls.fds[n].revents);
MySQL_Data_Stream *myds=mypolls.myds[n];
if (myds==NULL) {
read_one_byte_from_pipe(n);
continue;
}
if (mypolls.fds[n].revents==0) {
if (poll_timeout_bool) {
check_timing_out_session(n);
}
} else {
check_for_invalid_fd(n); // this is designed to assert in case of failure
switch(myds->myds_type) {
// Note: this logic that was here was removed completely because we added mariadb client library.
case MYDS_LISTENER:
// we got a new connection!
listener_handle_new_connection(myds,n);
continue;
break;
default:
break;
}
// data on exiting connection
bool rc=process_data_on_data_stream(myds, n);
if (rc==false) {
n--;
}
}
}
}
// this function was inline in MySQL_Thread::run()
void MySQL_Thread::run___cleanup_mirror_queue() {
unsigned int l = (unsigned int)mysql_thread___mirror_max_concurrency;
if (mirror_queue_mysql_sessions_cache->len > l) {
while (mirror_queue_mysql_sessions_cache->len > mirror_queue_mysql_sessions->len && mirror_queue_mysql_sessions_cache->len > l) {
MySQL_Session *newsess=(MySQL_Session *)mirror_queue_mysql_sessions_cache->remove_index_fast(0);
__sync_add_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1);
GloMTH->status_variables.p_gauge_array[p_th_gauge::mirror_concurrency]->Increment();
delete newsess;
}
}
}
// main loop
void MySQL_Thread::run() {
@ -4130,35 +4252,7 @@ void MySQL_Thread::run() {
processing_idles=false;
}
if (processing_idles==false && (last_processing_idles < curtime-mysql_thread___ping_interval_server_msec*1000) ) {
int i;
num_idles=MyHGM->get_multiple_idle_connections(-1, curtime-mysql_thread___ping_interval_server_msec*1000, my_idle_conns, SESSIONS_FOR_CONNECTIONS_HANDLER);
for (i=0; i<num_idles; i++) {
MySQL_Data_Stream *myds;
MySQL_Connection *mc=my_idle_conns[i];
MySQL_Session *sess=new MySQL_Session();
sess->mybe=sess->find_or_create_backend(mc->parent->myhgc->hid);
myds=sess->mybe->server_myds;
myds->attach_connection(mc);
myds->assign_fd_from_mysql_conn();
myds->myds_type=MYDS_BACKEND;
sess->to_process=1;
myds->wait_until=curtime+mysql_thread___ping_timeout_server*1000; // max_timeout
mc->last_time_used=curtime;
myds->myprot.init(&myds, myds->myconn->userinfo, NULL);
sess->status=PINGING_SERVER;
myds->DSS=STATE_MARIADB_PING;
register_session_connection_handler(sess,true);
int rc=sess->handler();
if (rc==-1) {
unsigned int sess_idx=mysql_sessions->len-1;
unregister_session(sess_idx);
delete sess;
}
}
processing_idles=true;
last_processing_idles=curtime;
run___get_multiple_idle_connections(num_idles);
}
#ifdef IDLE_THREADS
@ -4172,39 +4266,7 @@ __run_skip_1:
handle_mirror_queue_mysql_sessions();
for (n = 0; n < mypolls.len; n++) {
MySQL_Data_Stream *myds=NULL;
myds=mypolls.myds[n];
mypolls.fds[n].revents=0;
if (myds) {
#ifdef IDLE_THREADS
if (GloVars.global.idle_threads) {
// here we try to move it to the maintenance thread
if (myds->myds_type==MYDS_FRONTEND && myds->sess) {
if (myds->DSS==STATE_SLEEP && myds->sess->status==WAITING_CLIENT_DATA) {
if (move_session_to_idle_mysql_sessions(myds, n)) {
n--; // compensate mypolls.remove_index_fast(n) and n++ of loop
continue;
}
}
}
}
#endif // IDLE_THREADS
if (unlikely(myds->wait_until)) {
tune_timeout_for_myds_needs_pause(myds);
}
if (myds->sess) {
if (unlikely(myds->sess->pause_until > 0)) {
tune_timeout_for_session_needs_pause(myds);
}
}
myds->revents=0;
if (myds->myds_type!=MYDS_LISTENER) {
configure_pollout(myds, n);
}
}
proxy_debug(PROXY_DEBUG_NET,1,"Poll for DataStream=%p will be called with FD=%d and events=%d\n", mypolls.myds[n], mypolls.fds[n].fd, mypolls.fds[n].events);
}
ProcessAllMyDS_BeforePoll();
#ifdef IDLE_THREADS
if (GloVars.global.idle_threads) {
@ -4310,15 +4372,7 @@ __run_skip_1a:
if (maintenance_loop) {
// house keeping
unsigned int l = (unsigned int)mysql_thread___mirror_max_concurrency;
if (mirror_queue_mysql_sessions_cache->len > l) {
while (mirror_queue_mysql_sessions_cache->len > mirror_queue_mysql_sessions->len && mirror_queue_mysql_sessions_cache->len > l) {
MySQL_Session *newsess=(MySQL_Session *)mirror_queue_mysql_sessions_cache->remove_index_fast(0);
__sync_add_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1);
GloMTH->status_variables.p_gauge_array[p_th_gauge::mirror_concurrency]->Increment();
delete newsess;
}
}
run___cleanup_mirror_queue();
GloQPro->update_query_processor_stats();
}
@ -4375,37 +4429,7 @@ __run_skip_1a:
}
#endif // IDLE_THREADS
for (n = 0; n < mypolls.len; n++) {
proxy_debug(PROXY_DEBUG_NET,3, "poll for fd %d events %d revents %d\n", mypolls.fds[n].fd , mypolls.fds[n].events, mypolls.fds[n].revents);
MySQL_Data_Stream *myds=mypolls.myds[n];
if (myds==NULL) {
read_one_byte_from_pipe(n);
continue;
}
if (mypolls.fds[n].revents==0) {
if (poll_timeout_bool) {
check_timing_out_session(n);
}
} else {
check_for_invalid_fd(n); // this is designed to assert in case of failure
switch(myds->myds_type) {
// Note: this logic that was here was removed completely because we added mariadb client library.
case MYDS_LISTENER:
// we got a new connection!
listener_handle_new_connection(myds,n);
continue;
break;
default:
break;
}
// data on exiting connection
bool rc=process_data_on_data_stream(myds, n);
if (rc==false) {
n--;
}
}
}
ProcessAllMyDS_AfterPoll();
#ifdef IDLE_THREADS
__run_skip_2:
@ -4709,6 +4733,126 @@ bool MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned
}
// this function was inline in MySQL_Thread::process_all_sessions()
void MySQL_Thread::ProcessAllSessions_SortingSessions() {
unsigned int a=0;
for (unsigned int n=0; n<mysql_sessions->len; n++) {
MySQL_Session *sess=(MySQL_Session *)mysql_sessions->index(n);
if (sess->mybe && sess->mybe->server_myds) {
if (sess->mybe->server_myds->max_connect_time) {
MySQL_Session *sess2=(MySQL_Session *)mysql_sessions->index(a);
if (sess2->mybe && sess2->mybe->server_myds && sess2->mybe->server_myds->max_connect_time && sess2->mybe->server_myds->max_connect_time <= sess->mybe->server_myds->max_connect_time) {
// do nothing
} else {
void *p=mysql_sessions->pdata[a];
mysql_sessions->pdata[a]=mysql_sessions->pdata[n];
mysql_sessions->pdata[n]=p;
a++;
}
}
}
}
}
// this function was inline in MySQL_Thread::process_all_sessions()
void MySQL_Thread::ProcessAllSessions_CompletedMirrorSession(unsigned int& n, MySQL_Session *sess) {
unregister_session(n);
n--;
unsigned int l = (unsigned int)mysql_thread___mirror_max_concurrency;
if (mirror_queue_mysql_sessions->len*0.3 > l) l=mirror_queue_mysql_sessions->len*0.3;
if (mirror_queue_mysql_sessions_cache->len <= l) {
bool to_cache=true;
if (sess->mybe) {
if (sess->mybe->server_myds) {
to_cache=false;
}
}
if (to_cache) {
__sync_sub_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1);
GloMTH->status_variables.p_gauge_array[p_th_gauge::mirror_concurrency]->Decrement();
mirror_queue_mysql_sessions_cache->add(sess);
} else {
delete sess;
}
} else {
delete sess;
}
}
// this function was inline in MySQL_Thread::process_all_sessions()
void MySQL_Thread::ProcessAllSessions_MaintenanceLoop(MySQL_Session *sess, unsigned long long sess_time, unsigned int& total_active_transactions_) {
unsigned int numTrx=0;
sess->active_transactions=sess->NumActiveTransactions();
{
unsigned long long sess_active_transactions = sess->active_transactions;
sess->active_transactions=sess->NumActiveTransactions();
// in case we detected a new transaction just now
if (sess->active_transactions == 0) {
sess->transaction_started_at = 0;
} else {
if (sess_active_transactions == 0) {
sess->transaction_started_at = curtime;
}
}
}
total_active_transactions_ += sess->active_transactions;
sess->to_process=1;
if ( (sess_time/1000 > (unsigned long long)mysql_thread___max_transaction_idle_time) || (sess_time/1000 > (unsigned long long)mysql_thread___wait_timeout) ) {
//numTrx = sess->NumActiveTransactions();
numTrx = sess->active_transactions;
if (numTrx) {
// the session has idle transactions, kill it
if (sess_time/1000 > (unsigned long long)mysql_thread___max_transaction_idle_time) {
sess->killed=true;
if (sess->client_myds) {
proxy_warning("Killing client connection %s:%d because of (possible) transaction idle for %llums\n",sess->client_myds->addr.addr,sess->client_myds->addr.port, sess_time/1000);
}
}
} else {
// the session is idle, kill it
if (sess_time/1000 > (unsigned long long)mysql_thread___wait_timeout) {
sess->killed=true;
if (sess->client_myds) {
proxy_warning("Killing client connection %s:%d because inactive for %llums\n",sess->client_myds->addr.addr,sess->client_myds->addr.port, sess_time/1000);
}
}
}
} else {
if (sess->active_transactions > 0) {
// here is all the logic related to max_transaction_time
unsigned long long trx_started = sess->transaction_started_at;
if (trx_started > 0 && curtime > trx_started) {
unsigned long long trx_time = curtime - trx_started;
unsigned long long trx_time_ms = trx_time/1000;
if (trx_time_ms > (unsigned long long)mysql_thread___max_transaction_time) {
sess->killed=true;
if (sess->client_myds) {
proxy_warning("Killing client connection %s:%d because of (possible) transaction running for %llums\n",sess->client_myds->addr.addr,sess->client_myds->addr.port, trx_time_ms);
}
}
}
}
}
if (servers_table_version_current != servers_table_version_previous) { // bug fix for #1085
// Immediatelly kill all client connections using an OFFLINE node when session_fast_forward == true
if (sess->session_fast_forward) {
if (sess->HasOfflineBackends()) {
sess->killed=true;
proxy_warning("Killing client connection %s:%d due to 'session_fast_forward' and offline backends\n", sess->client_myds->addr.addr, sess->client_myds->addr.port);
}
} else {
// Search for connections that should be terminated, and simulate data in them
// the following 2 lines of code replace the previous 2 lines
// instead of killing the sessions, fails the backend connections
if (sess->SetEventInOfflineBackends()) {
sess->to_process=1;
}
}
}
}
void MySQL_Thread::process_all_sessions() {
unsigned int n;
unsigned int total_active_transactions_=0;
@ -4723,23 +4867,7 @@ void MySQL_Thread::process_all_sessions() {
}
#endif // IDLE_THREADS
if (sess_sort && mysql_sessions->len > 3) {
unsigned int a=0;
for (n=0; n<mysql_sessions->len; n++) {
MySQL_Session *sess=(MySQL_Session *)mysql_sessions->index(n);
if (sess->mybe && sess->mybe->server_myds) {
if (sess->mybe->server_myds->max_connect_time) {
MySQL_Session *sess2=(MySQL_Session *)mysql_sessions->index(a);
if (sess2->mybe && sess2->mybe->server_myds && sess2->mybe->server_myds->max_connect_time && sess2->mybe->server_myds->max_connect_time <= sess->mybe->server_myds->max_connect_time) {
// do nothing
} else {
void *p=mysql_sessions->pdata[a];
mysql_sessions->pdata[a]=mysql_sessions->pdata[n];
mysql_sessions->pdata[n]=p;
a++;
}
}
}
}
ProcessAllSessions_SortingSessions();
}
for (n=0; n<mysql_sessions->len; n++) {
MySQL_Session *sess=(MySQL_Session *)mysql_sessions->index(n);
@ -4750,27 +4878,7 @@ void MySQL_Thread::process_all_sessions() {
#endif
if (sess->mirror==true) { // this is a mirror session
if (sess->status==WAITING_CLIENT_DATA) { // the mirror session has completed
unregister_session(n);
n--;
unsigned int l = (unsigned int)mysql_thread___mirror_max_concurrency;
if (mirror_queue_mysql_sessions->len*0.3 > l) l=mirror_queue_mysql_sessions->len*0.3;
if (mirror_queue_mysql_sessions_cache->len <= l) {
bool to_cache=true;
if (sess->mybe) {
if (sess->mybe->server_myds) {
to_cache=false;
}
}
if (to_cache) {
__sync_sub_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1);
GloMTH->status_variables.p_gauge_array[p_th_gauge::mirror_concurrency]->Decrement();
mirror_queue_mysql_sessions_cache->add(sess);
} else {
delete sess;
}
} else {
delete sess;
}
ProcessAllSessions_CompletedMirrorSession(n, sess);
continue;
}
}
@ -4782,80 +4890,12 @@ void MySQL_Thread::process_all_sessions() {
}
}
if (maintenance_loop) {
unsigned int numTrx=0;
unsigned long long sess_time = sess->IdleTime();
#ifdef IDLE_THREADS
if (idle_maintenance_thread==false)
#endif // IDLE_THREADS
{
sess->active_transactions=sess->NumActiveTransactions();
{
unsigned long long sess_active_transactions = sess->active_transactions;
sess->active_transactions=sess->NumActiveTransactions();
// in case we detected a new transaction just now
if (sess->active_transactions == 0) {
sess->transaction_started_at = 0;
} else {
if (sess_active_transactions == 0) {
sess->transaction_started_at = curtime;
}
}
}
total_active_transactions_ += sess->active_transactions;
sess->to_process=1;
if ( (sess_time/1000 > (unsigned long long)mysql_thread___max_transaction_idle_time) || (sess_time/1000 > (unsigned long long)mysql_thread___wait_timeout) ) {
//numTrx = sess->NumActiveTransactions();
numTrx = sess->active_transactions;
if (numTrx) {
// the session has idle transactions, kill it
if (sess_time/1000 > (unsigned long long)mysql_thread___max_transaction_idle_time) {
sess->killed=true;
if (sess->client_myds) {
proxy_warning("Killing client connection %s:%d because of (possible) transaction idle for %llums\n",sess->client_myds->addr.addr,sess->client_myds->addr.port, sess_time/1000);
}
}
} else {
// the session is idle, kill it
if (sess_time/1000 > (unsigned long long)mysql_thread___wait_timeout) {
sess->killed=true;
if (sess->client_myds) {
proxy_warning("Killing client connection %s:%d because inactive for %llums\n",sess->client_myds->addr.addr,sess->client_myds->addr.port, sess_time/1000);
}
}
}
} else {
if (sess->active_transactions > 0) {
// here is all the logic related to max_transaction_time
unsigned long long trx_started = sess->transaction_started_at;
if (trx_started > 0 && curtime > trx_started) {
unsigned long long trx_time = curtime - trx_started;
unsigned long long trx_time_ms = trx_time/1000;
if (trx_time_ms > (unsigned long long)mysql_thread___max_transaction_time) {
sess->killed=true;
if (sess->client_myds) {
proxy_warning("Killing client connection %s:%d because of (possible) transaction running for %llums\n",sess->client_myds->addr.addr,sess->client_myds->addr.port, trx_time_ms);
}
}
}
}
}
if (servers_table_version_current != servers_table_version_previous) { // bug fix for #1085
// Immediatelly kill all client connections using an OFFLINE node when session_fast_forward == true
if (sess->session_fast_forward) {
if (sess->HasOfflineBackends()) {
sess->killed=true;
proxy_warning("Killing client connection %s:%d due to 'session_fast_forward' and offline backends\n", sess->client_myds->addr.addr, sess->client_myds->addr.port);
}
}
else {
// Search for connections that should be terminated, and simulate data in them
// the following 2 lines of code replace the previous 2 lines
// instead of killing the sessions, fails the backend connections
if (sess->SetEventInOfflineBackends()) {
sess->to_process=1;
}
}
}
ProcessAllSessions_MaintenanceLoop(sess, sess_time, total_active_transactions_);
}
#ifdef IDLE_THREADS
else

Loading…
Cancel
Save