Simplifying MySQL_Session::Handler() , 1

Added functions:
* void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___create_mirror_session()
* int handler_again___status_PINGING_SERVER();
* void handler_again___new_thread_to_kill_connection();
pull/739/head
René Cannaò 10 years ago
parent 03354f708e
commit da8d6d8cf3

@ -72,11 +72,18 @@ class MySQL_Session
bool handler_CommitRollback(PtrSize_t *);
bool handler_SetAutocommit(PtrSize_t *);
void RequestEnd(MySQL_Data_Stream *);
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___create_mirror_session();
int handler_again___status_PINGING_SERVER();
void handler_again___new_thread_to_kill_connection();
// void return_MySQL_Connection_To_Poll(MySQL_Data_Stream *);
// MySQL_STMT_Manager *Session_STMT_Manager;
//this pointer is always initialized inside handler().
// it is an attempt to start simplifying the complexing of handler()
PtrSize_t *pktH;
public:
void * operator new(size_t);
void operator delete(void *);

@ -608,6 +608,103 @@ bool MySQL_Session::handler_special_queries(PtrSize_t *pkt) {
return false;
}
void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___create_mirror_session() {
if (pktH->size < 15*1024*1024 && (qpo->mirror_hostgroup >= 0 || qpo->mirror_flagOUT >= 0)) {
MySQL_Session *newsess=new MySQL_Session();
newsess->client_myds = new MySQL_Data_Stream();
newsess->client_myds->DSS=STATE_SLEEP;
newsess->client_myds->sess=newsess;
newsess->client_myds->myds_type=MYDS_FRONTEND;
newsess->client_myds->PSarrayOUT= new PtrSizeArray();
newsess->thread_session_id=__sync_fetch_and_add(&glovars.thread_id,1);
thread->register_session(newsess);
newsess->status=WAITING_CLIENT_DATA;
MySQL_Connection *myconn=new MySQL_Connection;
myconn->userinfo->set(client_myds->myconn->userinfo);
newsess->client_myds->attach_connection(myconn);
newsess->client_myds->myprot.init(&newsess->client_myds, newsess->client_myds->myconn->userinfo, newsess);
newsess->to_process=1;
if (qpo->mirror_hostgroup>= 0) {
newsess->mirror_hostgroup=qpo->mirror_hostgroup; // in the new session we copy the mirror hostgroup
} else {
newsess->mirror_hostgroup=default_hostgroup; // copy the default
}
newsess->mirror_flagOUT=qpo->mirror_flagOUT; // in the new session we copy the mirror flagOUT
newsess->default_schema=strdup(default_schema);
newsess->mirror=true;
newsess->mirrorPkt.size=pktH->size;
newsess->mirrorPkt.ptr=l_alloc(newsess->mirrorPkt.size);
memcpy(newsess->mirrorPkt.ptr,pktH->ptr,pktH->size);
newsess->handler(); // execute immediately
newsess->to_process=0;
}
}
int MySQL_Session::handler_again___status_PINGING_SERVER() {
assert(mybe->server_myds->myconn);
MySQL_Data_Stream *myds=mybe->server_myds;
MySQL_Connection *myconn=myds->myconn;
int rc=myconn->async_ping(myds->revents);
// if (myds->mypolls==NULL) {
// thread->mypolls.add(POLLIN|POLLOUT, myds->fd, myds, thread->curtime);
// }
if (rc==0) {
myconn->async_state_machine=ASYNC_IDLE;
//if ((myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) {
if (mysql_thread___multiplexing && (myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) {
myds->return_MySQL_Connection_To_Pool();
} else {
myds->destroy_MySQL_Connection_From_Pool(true);
}
delete mybe->server_myds;
mybe->server_myds=NULL;
set_status(NONE);
return -1;
} else {
if (rc==-1 || rc==-2) {
if (rc==-2) {
proxy_error("Ping timeout during ping on %s , %d\n", myconn->parent->address, myconn->parent->port);
} else { // rc==-1
int myerr=mysql_errno(myconn->mysql);
proxy_error("Detected a broken connection during ping on (%d,%s,%d) , FD (Conn:%d , MyDS:%d) : %d, %s\n", myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myds->fd, myds->myconn->fd, myerr, mysql_error(myconn->mysql));
}
myds->destroy_MySQL_Connection_From_Pool(false);
myds->fd=0;
delete mybe->server_myds;
mybe->server_myds=NULL;
//thread->mypolls.remove_index_fast(myds->poll_fds_idx);
return -1;
} else {
// rc==1 , nothing to do for now
// tring to fix bug
if (myds->mypolls==NULL) {
thread->mypolls.add(POLLIN|POLLOUT, myds->fd, myds, thread->curtime);
}
// tring to fix bug
}
}
return 0;
}
void MySQL_Session::handler_again___new_thread_to_kill_connection() {
MySQL_Data_Stream *myds=mybe->server_myds;
if (myds->myconn && myds->myconn->mysql) {
if (myds->killed_at==0) {
myds->wait_until=0;
myds->killed_at=thread->curtime;
//fprintf(stderr,"Expired: %llu, %llu\n", mybe->server_myds->wait_until, thread->curtime);
MySQL_Connection_userinfo *ui=client_myds->myconn->userinfo;
KillArgs *ka = new KillArgs(ui->username, ui->password, myds->myconn->parent->address, myds->myconn->parent->port, myds->myconn->mysql->thread_id);
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_attr_setstacksize (&attr, 256*1024);
pthread_t pt;
pthread_create(&pt, &attr, &kill_query_thread, ka);
}
}
}
#define NEXT_IMMEDIATE(new_st) do { set_status(new_st); goto handler_again; } while (0)
int MySQL_Session::handler() {
@ -615,6 +712,7 @@ int MySQL_Session::handler() {
if (to_process==0) return 0; // this should be redundant if the called does the same check
proxy_debug(PROXY_DEBUG_NET,1,"Thread=%p, Session=%p -- Processing session %p\n" , this->thread, this, this);
PtrSize_t pkt;
pktH=&pkt;
unsigned int j;
unsigned char c;
@ -774,35 +872,7 @@ __get_pkts_from_client:
// default_hostgroup=mirror_hostgroup;
//}
if (mirror==false) {
if (pkt.size < 15*1024*1024 && (qpo->mirror_hostgroup >= 0 || qpo->mirror_flagOUT >= 0)) {
MySQL_Session *newsess=new MySQL_Session();
newsess->client_myds = new MySQL_Data_Stream();
newsess->client_myds->DSS=STATE_SLEEP;
newsess->client_myds->sess=newsess;
newsess->client_myds->myds_type=MYDS_FRONTEND;
newsess->client_myds->PSarrayOUT= new PtrSizeArray();
newsess->thread_session_id=__sync_fetch_and_add(&glovars.thread_id,1);
thread->register_session(newsess);
newsess->status=WAITING_CLIENT_DATA;
MySQL_Connection *myconn=new MySQL_Connection;
myconn->userinfo->set(client_myds->myconn->userinfo);
newsess->client_myds->attach_connection(myconn);
newsess->client_myds->myprot.init(&newsess->client_myds, newsess->client_myds->myconn->userinfo, newsess);
newsess->to_process=1;
if (qpo->mirror_hostgroup>= 0) {
newsess->mirror_hostgroup=qpo->mirror_hostgroup; // in the new session we copy the mirror hostgroup
} else {
newsess->mirror_hostgroup=default_hostgroup; // copy the default
}
newsess->mirror_flagOUT=qpo->mirror_flagOUT; // in the new session we copy the mirror flagOUT
newsess->default_schema=strdup(default_schema);
newsess->mirror=true;
newsess->mirrorPkt.size=pkt.size;
newsess->mirrorPkt.ptr=l_alloc(newsess->mirrorPkt.size);
memcpy(newsess->mirrorPkt.ptr,pkt.ptr,pkt.size);
newsess->handler(); // execute immediately
newsess->to_process=0;
}
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___create_mirror_session();
}
if (autocommit_on_hostgroup>=0) {
@ -1054,50 +1124,10 @@ handler_again:
// FIXME: to implement
break;
case PINGING_SERVER:
assert(mybe->server_myds->myconn);
{
MySQL_Data_Stream *myds=mybe->server_myds;
MySQL_Connection *myconn=myds->myconn;
int rc=myconn->async_ping(myds->revents);
// if (myds->mypolls==NULL) {
// thread->mypolls.add(POLLIN|POLLOUT, myds->fd, myds, thread->curtime);
// }
if (rc==0) {
myconn->async_state_machine=ASYNC_IDLE;
//if ((myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) {
if (mysql_thread___multiplexing && (myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) {
myds->return_MySQL_Connection_To_Pool();
} else {
myds->destroy_MySQL_Connection_From_Pool(true);
}
delete mybe->server_myds;
mybe->server_myds=NULL;
set_status(NONE);
{
int rc=handler_again___status_PINGING_SERVER();
if (rc==-1) // if the ping fails, we destroy the session
return -1;
} else {
if (rc==-1 || rc==-2) {
if (rc==-2) {
proxy_error("Ping timeout during ping on %s , %d\n", myconn->parent->address, myconn->parent->port);
} else { // rc==-1
int myerr=mysql_errno(myconn->mysql);
proxy_error("Detected a broken connection during ping on (%d,%s,%d) , FD (Conn:%d , MyDS:%d) : %d, %s\n", myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myds->fd, myds->myconn->fd, myerr, mysql_error(myconn->mysql));
}
myds->destroy_MySQL_Connection_From_Pool(false);
myds->fd=0;
delete mybe->server_myds;
mybe->server_myds=NULL;
//thread->mypolls.remove_index_fast(myds->poll_fds_idx);
return -1;
} else {
// rc==1 , nothing to do for now
// tring to fix bug
if (myds->mypolls==NULL) {
thread->mypolls.add(POLLIN|POLLOUT, myds->fd, myds, thread->curtime);
}
// tring to fix bug
}
}
}
break;
@ -1120,22 +1150,7 @@ handler_again:
||
(killed==true) // session was killed by admin
) {
MySQL_Data_Stream *myds=mybe->server_myds;
if (myds->myconn && myds->myconn->mysql) {
if (myds->killed_at==0) {
myds->wait_until=0;
myds->killed_at=thread->curtime;
//fprintf(stderr,"Expired: %llu, %llu\n", mybe->server_myds->wait_until, thread->curtime);
MySQL_Connection_userinfo *ui=client_myds->myconn->userinfo;
KillArgs *ka = new KillArgs(ui->username, ui->password, myds->myconn->parent->address, myds->myconn->parent->port, myds->myconn->mysql->thread_id);
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_attr_setstacksize (&attr, 256*1024);
pthread_t pt;
pthread_create(&pt, &attr, &kill_query_thread, ka);
}
}
handler_again___new_thread_to_kill_connection();
}
if (mybe->server_myds->DSS==STATE_NOT_INITIALIZED) {
// we don't have a backend yet

Loading…
Cancel
Save