diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index a2f69a11c..6f1dd26d9 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -72,11 +72,18 @@ class MySQL_Session bool handler_CommitRollback(PtrSize_t *); bool handler_SetAutocommit(PtrSize_t *); void RequestEnd(MySQL_Data_Stream *); + + void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___create_mirror_session(); + int handler_again___status_PINGING_SERVER(); + void handler_again___new_thread_to_kill_connection(); // void return_MySQL_Connection_To_Poll(MySQL_Data_Stream *); // MySQL_STMT_Manager *Session_STMT_Manager; + //this pointer is always initialized inside handler(). + // it is an attempt to start simplifying the complexing of handler() + PtrSize_t *pktH; public: void * operator new(size_t); void operator delete(void *); diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index fd8643d8d..fff79aca4 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -608,6 +608,103 @@ bool MySQL_Session::handler_special_queries(PtrSize_t *pkt) { return false; } +void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___create_mirror_session() { + if (pktH->size < 15*1024*1024 && (qpo->mirror_hostgroup >= 0 || qpo->mirror_flagOUT >= 0)) { + MySQL_Session *newsess=new MySQL_Session(); + newsess->client_myds = new MySQL_Data_Stream(); + newsess->client_myds->DSS=STATE_SLEEP; + newsess->client_myds->sess=newsess; + newsess->client_myds->myds_type=MYDS_FRONTEND; + newsess->client_myds->PSarrayOUT= new PtrSizeArray(); + newsess->thread_session_id=__sync_fetch_and_add(&glovars.thread_id,1); + thread->register_session(newsess); + newsess->status=WAITING_CLIENT_DATA; + MySQL_Connection *myconn=new MySQL_Connection; + myconn->userinfo->set(client_myds->myconn->userinfo); + newsess->client_myds->attach_connection(myconn); + newsess->client_myds->myprot.init(&newsess->client_myds, newsess->client_myds->myconn->userinfo, newsess); + newsess->to_process=1; + if (qpo->mirror_hostgroup>= 0) { + newsess->mirror_hostgroup=qpo->mirror_hostgroup; // in the new session we copy the mirror hostgroup + } else { + newsess->mirror_hostgroup=default_hostgroup; // copy the default + } + newsess->mirror_flagOUT=qpo->mirror_flagOUT; // in the new session we copy the mirror flagOUT + newsess->default_schema=strdup(default_schema); + newsess->mirror=true; + newsess->mirrorPkt.size=pktH->size; + newsess->mirrorPkt.ptr=l_alloc(newsess->mirrorPkt.size); + memcpy(newsess->mirrorPkt.ptr,pktH->ptr,pktH->size); + newsess->handler(); // execute immediately + newsess->to_process=0; + } +} + +int MySQL_Session::handler_again___status_PINGING_SERVER() { + assert(mybe->server_myds->myconn); + MySQL_Data_Stream *myds=mybe->server_myds; + MySQL_Connection *myconn=myds->myconn; + int rc=myconn->async_ping(myds->revents); +// if (myds->mypolls==NULL) { +// thread->mypolls.add(POLLIN|POLLOUT, myds->fd, myds, thread->curtime); +// } + if (rc==0) { + myconn->async_state_machine=ASYNC_IDLE; + //if ((myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) { + if (mysql_thread___multiplexing && (myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) { + myds->return_MySQL_Connection_To_Pool(); + } else { + myds->destroy_MySQL_Connection_From_Pool(true); + } + delete mybe->server_myds; + mybe->server_myds=NULL; + set_status(NONE); + return -1; + } else { + if (rc==-1 || rc==-2) { + if (rc==-2) { + proxy_error("Ping timeout during ping on %s , %d\n", myconn->parent->address, myconn->parent->port); + } else { // rc==-1 + int myerr=mysql_errno(myconn->mysql); + proxy_error("Detected a broken connection during ping on (%d,%s,%d) , FD (Conn:%d , MyDS:%d) : %d, %s\n", myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myds->fd, myds->myconn->fd, myerr, mysql_error(myconn->mysql)); + } + myds->destroy_MySQL_Connection_From_Pool(false); + myds->fd=0; + delete mybe->server_myds; + mybe->server_myds=NULL; + //thread->mypolls.remove_index_fast(myds->poll_fds_idx); + return -1; + } else { + // rc==1 , nothing to do for now +// tring to fix bug + if (myds->mypolls==NULL) { + thread->mypolls.add(POLLIN|POLLOUT, myds->fd, myds, thread->curtime); + } +// tring to fix bug + } + } + return 0; +} + +void MySQL_Session::handler_again___new_thread_to_kill_connection() { + MySQL_Data_Stream *myds=mybe->server_myds; + if (myds->myconn && myds->myconn->mysql) { + if (myds->killed_at==0) { + myds->wait_until=0; + myds->killed_at=thread->curtime; + //fprintf(stderr,"Expired: %llu, %llu\n", mybe->server_myds->wait_until, thread->curtime); + MySQL_Connection_userinfo *ui=client_myds->myconn->userinfo; + KillArgs *ka = new KillArgs(ui->username, ui->password, myds->myconn->parent->address, myds->myconn->parent->port, myds->myconn->mysql->thread_id); + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + pthread_attr_setstacksize (&attr, 256*1024); + pthread_t pt; + pthread_create(&pt, &attr, &kill_query_thread, ka); + } + } +} + #define NEXT_IMMEDIATE(new_st) do { set_status(new_st); goto handler_again; } while (0) int MySQL_Session::handler() { @@ -615,6 +712,7 @@ int MySQL_Session::handler() { if (to_process==0) return 0; // this should be redundant if the called does the same check proxy_debug(PROXY_DEBUG_NET,1,"Thread=%p, Session=%p -- Processing session %p\n" , this->thread, this, this); PtrSize_t pkt; + pktH=&pkt; unsigned int j; unsigned char c; @@ -774,35 +872,7 @@ __get_pkts_from_client: // default_hostgroup=mirror_hostgroup; //} if (mirror==false) { - if (pkt.size < 15*1024*1024 && (qpo->mirror_hostgroup >= 0 || qpo->mirror_flagOUT >= 0)) { - MySQL_Session *newsess=new MySQL_Session(); - newsess->client_myds = new MySQL_Data_Stream(); - newsess->client_myds->DSS=STATE_SLEEP; - newsess->client_myds->sess=newsess; - newsess->client_myds->myds_type=MYDS_FRONTEND; - newsess->client_myds->PSarrayOUT= new PtrSizeArray(); - newsess->thread_session_id=__sync_fetch_and_add(&glovars.thread_id,1); - thread->register_session(newsess); - newsess->status=WAITING_CLIENT_DATA; - MySQL_Connection *myconn=new MySQL_Connection; - myconn->userinfo->set(client_myds->myconn->userinfo); - newsess->client_myds->attach_connection(myconn); - newsess->client_myds->myprot.init(&newsess->client_myds, newsess->client_myds->myconn->userinfo, newsess); - newsess->to_process=1; - if (qpo->mirror_hostgroup>= 0) { - newsess->mirror_hostgroup=qpo->mirror_hostgroup; // in the new session we copy the mirror hostgroup - } else { - newsess->mirror_hostgroup=default_hostgroup; // copy the default - } - newsess->mirror_flagOUT=qpo->mirror_flagOUT; // in the new session we copy the mirror flagOUT - newsess->default_schema=strdup(default_schema); - newsess->mirror=true; - newsess->mirrorPkt.size=pkt.size; - newsess->mirrorPkt.ptr=l_alloc(newsess->mirrorPkt.size); - memcpy(newsess->mirrorPkt.ptr,pkt.ptr,pkt.size); - newsess->handler(); // execute immediately - newsess->to_process=0; - } + handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___create_mirror_session(); } if (autocommit_on_hostgroup>=0) { @@ -1054,50 +1124,10 @@ handler_again: // FIXME: to implement break; case PINGING_SERVER: - - assert(mybe->server_myds->myconn); - { - MySQL_Data_Stream *myds=mybe->server_myds; - MySQL_Connection *myconn=myds->myconn; - int rc=myconn->async_ping(myds->revents); -// if (myds->mypolls==NULL) { -// thread->mypolls.add(POLLIN|POLLOUT, myds->fd, myds, thread->curtime); -// } - if (rc==0) { - myconn->async_state_machine=ASYNC_IDLE; - //if ((myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) { - if (mysql_thread___multiplexing && (myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) { - myds->return_MySQL_Connection_To_Pool(); - } else { - myds->destroy_MySQL_Connection_From_Pool(true); - } - delete mybe->server_myds; - mybe->server_myds=NULL; - set_status(NONE); + { + int rc=handler_again___status_PINGING_SERVER(); + if (rc==-1) // if the ping fails, we destroy the session return -1; - } else { - if (rc==-1 || rc==-2) { - if (rc==-2) { - proxy_error("Ping timeout during ping on %s , %d\n", myconn->parent->address, myconn->parent->port); - } else { // rc==-1 - int myerr=mysql_errno(myconn->mysql); - proxy_error("Detected a broken connection during ping on (%d,%s,%d) , FD (Conn:%d , MyDS:%d) : %d, %s\n", myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myds->fd, myds->myconn->fd, myerr, mysql_error(myconn->mysql)); - } - myds->destroy_MySQL_Connection_From_Pool(false); - myds->fd=0; - delete mybe->server_myds; - mybe->server_myds=NULL; - //thread->mypolls.remove_index_fast(myds->poll_fds_idx); - return -1; - } else { - // rc==1 , nothing to do for now -// tring to fix bug - if (myds->mypolls==NULL) { - thread->mypolls.add(POLLIN|POLLOUT, myds->fd, myds, thread->curtime); - } -// tring to fix bug - } - } } break; @@ -1120,22 +1150,7 @@ handler_again: || (killed==true) // session was killed by admin ) { - MySQL_Data_Stream *myds=mybe->server_myds; - if (myds->myconn && myds->myconn->mysql) { - if (myds->killed_at==0) { - myds->wait_until=0; - myds->killed_at=thread->curtime; - //fprintf(stderr,"Expired: %llu, %llu\n", mybe->server_myds->wait_until, thread->curtime); - MySQL_Connection_userinfo *ui=client_myds->myconn->userinfo; - KillArgs *ka = new KillArgs(ui->username, ui->password, myds->myconn->parent->address, myds->myconn->parent->port, myds->myconn->mysql->thread_id); - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - pthread_attr_setstacksize (&attr, 256*1024); - pthread_t pt; - pthread_create(&pt, &attr, &kill_query_thread, ka); - } - } + handler_again___new_thread_to_kill_connection(); } if (mybe->server_myds->DSS==STATE_NOT_INITIALIZED) { // we don't have a backend yet