diff --git a/lib/Standard_MySQL_Thread.cpp b/lib/Standard_MySQL_Thread.cpp index 5e1ff76e9..cc7d9ac3d 100644 --- a/lib/Standard_MySQL_Thread.cpp +++ b/lib/Standard_MySQL_Thread.cpp @@ -336,7 +336,7 @@ class Standard_MySQL_Threads_Handler: public MySQL_Threads_Handler } } if (!strcmp(name,"threads")) { - int intv=atoi(value); + unsigned int intv=atoi(value); if ((num_threads==0 || num_threads==intv) && intv > 0 && intv < 128) { num_threads=intv; return true; @@ -550,11 +550,8 @@ virtual void run() { int rc; //int arg_on=1; - int loops=0; // FIXME: debug - unsigned long long oldtime=monotonic_time(); -// unsigned long curtime=monotonic_time(); while (shutdown==0) { @@ -565,12 +562,6 @@ virtual void run() { } } - loops++; - if (loops>100) { - loops-=10; - } - - proxy_debug(PROXY_DEBUG_NET,5,"%s\n", "Calling poll"); rc=poll(mypolls.fds,mypolls.len,mysql_thread___poll_timeout); @@ -592,8 +583,6 @@ virtual void run() { } - //unsigned long long curtime=monotonic_time(); - if (__sync_add_and_fetch(&__global_MySQL_Thread_Variables_version,0) > __thread_MySQL_Thread_Variables_version) { refresh_variables(); } @@ -604,29 +593,25 @@ virtual void run() { + MySQL_Data_Stream *myds=mypolls.myds[n]; if (mypolls.fds[n].revents==0) { switch(mypolls.myds[n]->myds_type) { - //MySQL_Data_Stream *myds=(MySQL_Data_Stream *)events[n].data.ptr; case MYDS_BACKEND_NOT_CONNECTED: -// if (mypolls.fds[n].revents==0 && myds->myds_type==MYDS_BACKEND_NOT_CONNECTED) { - if (curtime>mypolls.last_recv[n]+10000000) { - fprintf(stderr, "connect() timeout %d curtime: %llu last_recv: %llu\n", __LINE__, curtime, mypolls.last_recv[n]); - myds->myds_type=MYDS_BACKEND_FAILED_CONNECT; - myds->sess->pause=curtime+10000000; - myds->sess->to_process=1; - } + myds_backend_set_failed_connect(myds,n); break; -// if (mypolls.fds[n].revents==0 && myds->myds_type==MYDS_BACKEND_PAUSE_CONNECT) { - case MYDS_BACKEND_PAUSE_CONNECT: + case MYDS_BACKEND_PAUSE_CONNECT: + myds_backend_set_failed_connect(myds,n); +/* if (curtime>mypolls.last_recv[n]+10000000) { fprintf(stderr, "connect() timeout %d\n", __LINE__); myds->myds_type=MYDS_BACKEND_FAILED_CONNECT; myds->sess->pause=curtime+10000000; myds->sess->to_process=1; } - break; +*/ + break; default: //if (mypolls.fds[n].revents==0 && ( mypolls.myds[n]->myds_type!=MYDS_BACKEND_NOT_CONNECTED && mypolls.myds[n]->myds_type!=MYDS_BACKEND_PAUSE_CONNECT ) ) continue; continue; @@ -687,6 +672,15 @@ virtual void run() { break; } // data on exiting connection + process_data_on_data_stream(myds, n); + } + } + // iterate through all sessions and process the session logic + process_all_sessions(); + } +}; + +void process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned int n) { mypolls.last_recv[n]=curtime; myds->revents=mypolls.fds[n].revents; myds->read_from_net(); @@ -702,45 +696,51 @@ virtual void run() { if (sess->client_myds==myds) sess->client_myds=NULL; if (sess->server_myds==myds) sess->server_myds=NULL; delete myds; - myds=NULL; + myds=NULL; // useless? // FIXME // if (sess->client_myds==NULL && sess->server_myds==NULL) { // mysql_sessions->remove_fast(sess); // delete sess; // continue; // } - } - // always move pkts from queue to evbuffer - // sess->writeout(); - //if (myds) myds->write_to_net_poll(); - } - } - // iterate through all sessions and process the session logic - for (n=0; nlen; n++) { - MySQL_Session *sess=(MySQL_Session *)mysql_sessions->index(n); - if (sess->healthy==0) { - unregister_session(n); - n--; - delete sess; - } else { - if (sess->to_process==1 || sess->pause<=curtime ) { - if (sess->pause <= curtime ) sess->pause=0; - if (sess->pause_until <= curtime) { - rc=sess->handler(); - if (rc==-1) { - unregister_session(n); - n--; - delete sess; - } + } +} + +void process_all_sessions() { + unsigned int n; + int rc; + for (n=0; nlen; n++) { + MySQL_Session *sess=(MySQL_Session *)mysql_sessions->index(n); + if (sess->healthy==0) { + unregister_session(n); + n--; + delete sess; + } else { + if (sess->to_process==1 || sess->pause<=curtime ) { + if (sess->pause <= curtime ) sess->pause=0; + if (sess->pause_until <= curtime) { + rc=sess->handler(); + if (rc==-1) { + unregister_session(n); + n--; + delete sess; } } } } + } +} +void myds_backend_set_failed_connect(MySQL_Data_Stream *myds, unsigned int n) { + if (curtime>mypolls.last_recv[n]+10000000) { + proxy_error("connect() timeout . curtime: %llu , last_recv: %llu , failed after %lluus . fd: %d , myds_type: %s\n", curtime, mypolls.last_recv[n] , (curtime-mypolls.last_recv[n]) , myds->fd, (myds->myds_type==MYDS_BACKEND_PAUSE_CONNECT ? "MYDS_BACKEND_PAUSE_CONNECT" : "MYDS_BACKEND_NOT_CONNECTED" ) ); + myds->myds_type=MYDS_BACKEND_FAILED_CONNECT; + myds->sess->pause=curtime+10000000; + myds->sess->to_process=1; } -}; -}; +} +}; // end class Standard_MySQL_Thread extern "C" MySQL_Threads_Handler * create_MySQL_Threads_Handler_func() { return new Standard_MySQL_Threads_Handler();