From 3e8929d9c8d33d90d3bb696fc86f0d6ae061597b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Sat, 7 Feb 2015 08:28:39 +0000 Subject: [PATCH] Clean up on Standard_MySQL_Thread To improve readability, added these functions: - myds_backend_pause_connect() - myds_backend_first_packet_after_connect() - listener_handle_new_connection() --- lib/Standard_MySQL_Thread.cpp | 103 +++++++++++++++++++--------------- 1 file changed, 57 insertions(+), 46 deletions(-) diff --git a/lib/Standard_MySQL_Thread.cpp b/lib/Standard_MySQL_Thread.cpp index cc7d9ac3d..62a8d8925 100644 --- a/lib/Standard_MySQL_Thread.cpp +++ b/lib/Standard_MySQL_Thread.cpp @@ -597,74 +597,43 @@ virtual void run() { MySQL_Data_Stream *myds=mypolls.myds[n]; if (mypolls.fds[n].revents==0) { - switch(mypolls.myds[n]->myds_type) { + switch(myds->myds_type) { case MYDS_BACKEND_NOT_CONNECTED: myds_backend_set_failed_connect(myds,n); break; case MYDS_BACKEND_PAUSE_CONNECT: myds_backend_set_failed_connect(myds,n); -/* - if (curtime>mypolls.last_recv[n]+10000000) { - fprintf(stderr, "connect() timeout %d\n", __LINE__); - myds->myds_type=MYDS_BACKEND_FAILED_CONNECT; - myds->sess->pause=curtime+10000000; - myds->sess->to_process=1; - } -*/ break; - default: - //if (mypolls.fds[n].revents==0 && ( mypolls.myds[n]->myds_type!=MYDS_BACKEND_NOT_CONNECTED && mypolls.myds[n]->myds_type!=MYDS_BACKEND_PAUSE_CONNECT ) ) continue; - continue; - break; + default: + //if (mypolls.fds[n].revents==0 && ( mypolls.myds[n]->myds_type!=MYDS_BACKEND_NOT_CONNECTED && mypolls.myds[n]->myds_type!=MYDS_BACKEND_PAUSE_CONNECT ) ) continue; + continue; + break; } + } else { - int c; - switch(mypolls.myds[n]->myds_type) { + + switch(myds->myds_type) { case MYDS_BACKEND_NOT_CONNECTED: // if (myds->myds_type==MYDS_BACKEND_NOT_CONNECTED && mypolls.fds[n].revents) { if ( (mypolls.fds[n].revents & POLLERR) || (mypolls.fds[n].revents & POLLHUP) ) { - fprintf(stderr, "connect() timeout %d\n", __LINE__); - myds->myds_type=MYDS_BACKEND_PAUSE_CONNECT; - myds->sess->pause=curtime+10000000; - myds->sess->to_process=1; + // error on connect + myds_backend_pause_connect(myds); continue; } if (mypolls.fds[n].revents & POLLOUT) { - // connect from a not blocking socket - int optval; - socklen_t optlen=sizeof(optval); - getsockopt(myds->fd, SOL_SOCKET, SO_ERROR, &optval, &optlen); - if (optval==0) { - mypolls.last_recv[n]=curtime; - myds->myds_type=MYDS_BACKEND; - myds->sess->pause=0; - } else { - fprintf(stderr,"Connect() error\n"); - myds->myds_type=MYDS_BACKEND_PAUSE_CONNECT; - myds->sess->pause=curtime+10000000; - } + // first data on connect from a not blocking socket + myds_backend_first_packet_after_connect(myds, n); } break; case MYDS_LISTENER: -// if (myds->myds_type==MYDS_LISTENER && mypolls.fds[n].revents) { // we got a new connection! - c=accept(myds->fd, NULL, NULL); - if (c>-1) { // accept() succeeded - // create a new client connection - mypolls.fds[n].revents=0; - MySQL_Session *sess=create_new_session_and_client_data_stream(c); - //sess->myprot_client.generate_pkt_initial_handshake(sess->client_myds,true,NULL,NULL); - sess->myprot_client.generate_pkt_initial_handshake(true,NULL,NULL); - ioctl_FIONBIO(sess->client_fd, 1); - mypolls.add(POLLIN|POLLOUT, sess->client_fd, sess->client_myds, curtime); - proxy_debug(PROXY_DEBUG_NET,1,"Session=%p -- Adding client FD %d\n", sess, sess->client_fd); - } - // if we arrive here, accept() failed + listener_handle_new_connection(myds,n); continue; break; case MYDS_FRONTEND: -// if (myds->myds_type==MYDS_FRONTEND && mypolls.fds[n].revents) { + // detected an error on backend if ( (mypolls.fds[n].revents & POLLERR) || (mypolls.fds[n].revents & POLLHUP) ) { + // FIXME: try to handle it in a more graceful way myds->sess->healthy=0; } break; @@ -740,6 +709,48 @@ void myds_backend_set_failed_connect(MySQL_Data_Stream *myds, unsigned int n) { } } +void myds_backend_pause_connect(MySQL_Data_Stream *myds) { + proxy_error("connect() error on fd %d . Pausing ...\n", myds->fd); + myds->myds_type=MYDS_BACKEND_PAUSE_CONNECT; + myds->sess->pause=curtime+10000000; + myds->sess->to_process=1; +} + +void myds_backend_first_packet_after_connect(MySQL_Data_Stream *myds, unsigned int n) { + int optval; + socklen_t optlen=sizeof(optval); + getsockopt(myds->fd, SOL_SOCKET, SO_ERROR, &optval, &optlen); + if (optval==0) { + mypolls.last_recv[n]=curtime; + myds->myds_type=MYDS_BACKEND; + myds->sess->pause=0; + } else { + fprintf(stderr,"Connect() error\n"); + myds->myds_type=MYDS_BACKEND_PAUSE_CONNECT; + myds->sess->pause=curtime+10000000; + } +} + + +void listener_handle_new_connection(MySQL_Data_Stream *myds, unsigned int n) { + int c; + c=accept(myds->fd, NULL, NULL); + if (c>-1) { // accept() succeeded + // create a new client connection + mypolls.fds[n].revents=0; + MySQL_Session *sess=create_new_session_and_client_data_stream(c); + //sess->myprot_client.generate_pkt_initial_handshake(sess->client_myds,true,NULL,NULL); + sess->myprot_client.generate_pkt_initial_handshake(true,NULL,NULL); + ioctl_FIONBIO(sess->client_fd, 1); + mypolls.add(POLLIN|POLLOUT, sess->client_fd, sess->client_myds, curtime); + proxy_debug(PROXY_DEBUG_NET,1,"Session=%p -- Adding client FD %d\n", sess, sess->client_fd); + } else { + // if we arrive here, accept() failed + // because multiple threads try to handle the same incoming connection, this is OK + } +} + + }; // end class Standard_MySQL_Thread extern "C" MySQL_Threads_Handler * create_MySQL_Threads_Handler_func() {