Clean up on Standard_MySQL_Thread

Removed unnecessary code
To improve readability, added these functions:
- process_data_on_data_stream()
- process_all_sessions()
- myds_backend_set_failed_connect()
pull/190/head
René Cannaò 11 years ago
parent b84960da67
commit cc9e635abd

@ -336,7 +336,7 @@ class Standard_MySQL_Threads_Handler: public MySQL_Threads_Handler
}
}
if (!strcmp(name,"threads")) {
int intv=atoi(value);
unsigned int intv=atoi(value);
if ((num_threads==0 || num_threads==intv) && intv > 0 && intv < 128) {
num_threads=intv;
return true;
@ -550,11 +550,8 @@ virtual void run() {
int rc;
//int arg_on=1;
int loops=0; // FIXME: debug
unsigned long long oldtime=monotonic_time();
// unsigned long curtime=monotonic_time();
while (shutdown==0) {
@ -565,12 +562,6 @@ virtual void run() {
}
}
loops++;
if (loops>100) {
loops-=10;
}
proxy_debug(PROXY_DEBUG_NET,5,"%s\n", "Calling poll");
rc=poll(mypolls.fds,mypolls.len,mysql_thread___poll_timeout);
@ -592,8 +583,6 @@ virtual void run() {
}
//unsigned long long curtime=monotonic_time();
if (__sync_add_and_fetch(&__global_MySQL_Thread_Variables_version,0) > __thread_MySQL_Thread_Variables_version) {
refresh_variables();
}
@ -604,29 +593,25 @@ virtual void run() {
MySQL_Data_Stream *myds=mypolls.myds[n];
if (mypolls.fds[n].revents==0) {
switch(mypolls.myds[n]->myds_type) {
//MySQL_Data_Stream *myds=(MySQL_Data_Stream *)events[n].data.ptr;
case MYDS_BACKEND_NOT_CONNECTED:
// if (mypolls.fds[n].revents==0 && myds->myds_type==MYDS_BACKEND_NOT_CONNECTED) {
if (curtime>mypolls.last_recv[n]+10000000) {
fprintf(stderr, "connect() timeout %d curtime: %llu last_recv: %llu\n", __LINE__, curtime, mypolls.last_recv[n]);
myds->myds_type=MYDS_BACKEND_FAILED_CONNECT;
myds->sess->pause=curtime+10000000;
myds->sess->to_process=1;
}
myds_backend_set_failed_connect(myds,n);
break;
// if (mypolls.fds[n].revents==0 && myds->myds_type==MYDS_BACKEND_PAUSE_CONNECT) {
case MYDS_BACKEND_PAUSE_CONNECT:
case MYDS_BACKEND_PAUSE_CONNECT:
myds_backend_set_failed_connect(myds,n);
/*
if (curtime>mypolls.last_recv[n]+10000000) {
fprintf(stderr, "connect() timeout %d\n", __LINE__);
myds->myds_type=MYDS_BACKEND_FAILED_CONNECT;
myds->sess->pause=curtime+10000000;
myds->sess->to_process=1;
}
break;
*/
break;
default:
//if (mypolls.fds[n].revents==0 && ( mypolls.myds[n]->myds_type!=MYDS_BACKEND_NOT_CONNECTED && mypolls.myds[n]->myds_type!=MYDS_BACKEND_PAUSE_CONNECT ) ) continue;
continue;
@ -687,6 +672,15 @@ virtual void run() {
break;
}
// data on exiting connection
process_data_on_data_stream(myds, n);
}
}
// iterate through all sessions and process the session logic
process_all_sessions();
}
};
void process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned int n) {
mypolls.last_recv[n]=curtime;
myds->revents=mypolls.fds[n].revents;
myds->read_from_net();
@ -702,45 +696,51 @@ virtual void run() {
if (sess->client_myds==myds) sess->client_myds=NULL;
if (sess->server_myds==myds) sess->server_myds=NULL;
delete myds;
myds=NULL;
myds=NULL; // useless?
// FIXME
// if (sess->client_myds==NULL && sess->server_myds==NULL) {
// mysql_sessions->remove_fast(sess);
// delete sess;
// continue;
// }
}
// always move pkts from queue to evbuffer
// sess->writeout();
//if (myds) myds->write_to_net_poll();
}
}
// iterate through all sessions and process the session logic
for (n=0; n<mysql_sessions->len; n++) {
MySQL_Session *sess=(MySQL_Session *)mysql_sessions->index(n);
if (sess->healthy==0) {
unregister_session(n);
n--;
delete sess;
} else {
if (sess->to_process==1 || sess->pause<=curtime ) {
if (sess->pause <= curtime ) sess->pause=0;
if (sess->pause_until <= curtime) {
rc=sess->handler();
if (rc==-1) {
unregister_session(n);
n--;
delete sess;
}
}
}
void process_all_sessions() {
unsigned int n;
int rc;
for (n=0; n<mysql_sessions->len; n++) {
MySQL_Session *sess=(MySQL_Session *)mysql_sessions->index(n);
if (sess->healthy==0) {
unregister_session(n);
n--;
delete sess;
} else {
if (sess->to_process==1 || sess->pause<=curtime ) {
if (sess->pause <= curtime ) sess->pause=0;
if (sess->pause_until <= curtime) {
rc=sess->handler();
if (rc==-1) {
unregister_session(n);
n--;
delete sess;
}
}
}
}
}
}
void myds_backend_set_failed_connect(MySQL_Data_Stream *myds, unsigned int n) {
if (curtime>mypolls.last_recv[n]+10000000) {
proxy_error("connect() timeout . curtime: %llu , last_recv: %llu , failed after %lluus . fd: %d , myds_type: %s\n", curtime, mypolls.last_recv[n] , (curtime-mypolls.last_recv[n]) , myds->fd, (myds->myds_type==MYDS_BACKEND_PAUSE_CONNECT ? "MYDS_BACKEND_PAUSE_CONNECT" : "MYDS_BACKEND_NOT_CONNECTED" ) );
myds->myds_type=MYDS_BACKEND_FAILED_CONNECT;
myds->sess->pause=curtime+10000000;
myds->sess->to_process=1;
}
};
};
}
}; // end class Standard_MySQL_Thread
extern "C" MySQL_Threads_Handler * create_MySQL_Threads_Handler_func() {
return new Standard_MySQL_Threads_Handler();

Loading…
Cancel
Save