Clean up on Standard_MySQL_Thread

To improve readability, added these functions:
- myds_backend_pause_connect()
- myds_backend_first_packet_after_connect()
- listener_handle_new_connection()
pull/190/head
René Cannaò 11 years ago
parent cc9e635abd
commit 3e8929d9c8

@ -597,74 +597,43 @@ virtual void run() {
MySQL_Data_Stream *myds=mypolls.myds[n];
if (mypolls.fds[n].revents==0) {
switch(mypolls.myds[n]->myds_type) {
switch(myds->myds_type) {
case MYDS_BACKEND_NOT_CONNECTED:
myds_backend_set_failed_connect(myds,n);
break;
case MYDS_BACKEND_PAUSE_CONNECT:
myds_backend_set_failed_connect(myds,n);
/*
if (curtime>mypolls.last_recv[n]+10000000) {
fprintf(stderr, "connect() timeout %d\n", __LINE__);
myds->myds_type=MYDS_BACKEND_FAILED_CONNECT;
myds->sess->pause=curtime+10000000;
myds->sess->to_process=1;
}
*/
break;
default:
//if (mypolls.fds[n].revents==0 && ( mypolls.myds[n]->myds_type!=MYDS_BACKEND_NOT_CONNECTED && mypolls.myds[n]->myds_type!=MYDS_BACKEND_PAUSE_CONNECT ) ) continue;
continue;
break;
default:
//if (mypolls.fds[n].revents==0 && ( mypolls.myds[n]->myds_type!=MYDS_BACKEND_NOT_CONNECTED && mypolls.myds[n]->myds_type!=MYDS_BACKEND_PAUSE_CONNECT ) ) continue;
continue;
break;
}
} else {
int c;
switch(mypolls.myds[n]->myds_type) {
switch(myds->myds_type) {
case MYDS_BACKEND_NOT_CONNECTED:
// if (myds->myds_type==MYDS_BACKEND_NOT_CONNECTED && mypolls.fds[n].revents) {
if ( (mypolls.fds[n].revents & POLLERR) || (mypolls.fds[n].revents & POLLHUP) ) {
fprintf(stderr, "connect() timeout %d\n", __LINE__);
myds->myds_type=MYDS_BACKEND_PAUSE_CONNECT;
myds->sess->pause=curtime+10000000;
myds->sess->to_process=1;
// error on connect
myds_backend_pause_connect(myds);
continue;
}
if (mypolls.fds[n].revents & POLLOUT) {
// connect from a not blocking socket
int optval;
socklen_t optlen=sizeof(optval);
getsockopt(myds->fd, SOL_SOCKET, SO_ERROR, &optval, &optlen);
if (optval==0) {
mypolls.last_recv[n]=curtime;
myds->myds_type=MYDS_BACKEND;
myds->sess->pause=0;
} else {
fprintf(stderr,"Connect() error\n");
myds->myds_type=MYDS_BACKEND_PAUSE_CONNECT;
myds->sess->pause=curtime+10000000;
}
// first data on connect from a not blocking socket
myds_backend_first_packet_after_connect(myds, n);
}
break;
case MYDS_LISTENER:
// if (myds->myds_type==MYDS_LISTENER && mypolls.fds[n].revents) {
// we got a new connection!
c=accept(myds->fd, NULL, NULL);
if (c>-1) { // accept() succeeded
// create a new client connection
mypolls.fds[n].revents=0;
MySQL_Session *sess=create_new_session_and_client_data_stream(c);
//sess->myprot_client.generate_pkt_initial_handshake(sess->client_myds,true,NULL,NULL);
sess->myprot_client.generate_pkt_initial_handshake(true,NULL,NULL);
ioctl_FIONBIO(sess->client_fd, 1);
mypolls.add(POLLIN|POLLOUT, sess->client_fd, sess->client_myds, curtime);
proxy_debug(PROXY_DEBUG_NET,1,"Session=%p -- Adding client FD %d\n", sess, sess->client_fd);
}
// if we arrive here, accept() failed
listener_handle_new_connection(myds,n);
continue;
break;
case MYDS_FRONTEND:
// if (myds->myds_type==MYDS_FRONTEND && mypolls.fds[n].revents) {
// detected an error on backend
if ( (mypolls.fds[n].revents & POLLERR) || (mypolls.fds[n].revents & POLLHUP) ) {
// FIXME: try to handle it in a more graceful way
myds->sess->healthy=0;
}
break;
@ -740,6 +709,48 @@ void myds_backend_set_failed_connect(MySQL_Data_Stream *myds, unsigned int n) {
}
}
void myds_backend_pause_connect(MySQL_Data_Stream *myds) {
proxy_error("connect() error on fd %d . Pausing ...\n", myds->fd);
myds->myds_type=MYDS_BACKEND_PAUSE_CONNECT;
myds->sess->pause=curtime+10000000;
myds->sess->to_process=1;
}
void myds_backend_first_packet_after_connect(MySQL_Data_Stream *myds, unsigned int n) {
int optval;
socklen_t optlen=sizeof(optval);
getsockopt(myds->fd, SOL_SOCKET, SO_ERROR, &optval, &optlen);
if (optval==0) {
mypolls.last_recv[n]=curtime;
myds->myds_type=MYDS_BACKEND;
myds->sess->pause=0;
} else {
fprintf(stderr,"Connect() error\n");
myds->myds_type=MYDS_BACKEND_PAUSE_CONNECT;
myds->sess->pause=curtime+10000000;
}
}
void listener_handle_new_connection(MySQL_Data_Stream *myds, unsigned int n) {
int c;
c=accept(myds->fd, NULL, NULL);
if (c>-1) { // accept() succeeded
// create a new client connection
mypolls.fds[n].revents=0;
MySQL_Session *sess=create_new_session_and_client_data_stream(c);
//sess->myprot_client.generate_pkt_initial_handshake(sess->client_myds,true,NULL,NULL);
sess->myprot_client.generate_pkt_initial_handshake(true,NULL,NULL);
ioctl_FIONBIO(sess->client_fd, 1);
mypolls.add(POLLIN|POLLOUT, sess->client_fd, sess->client_myds, curtime);
proxy_debug(PROXY_DEBUG_NET,1,"Session=%p -- Adding client FD %d\n", sess, sess->client_fd);
} else {
// if we arrive here, accept() failed
// because multiple threads try to handle the same incoming connection, this is OK
}
}
}; // end class Standard_MySQL_Thread
extern "C" MySQL_Threads_Handler * create_MySQL_Threads_Handler_func() {

Loading…
Cancel
Save