Drafting auto-reconnect

Issue #195 and relative subtickets:
- issue #191
- issue #192
- issue #194
pull/201/head^2
René Cannaò 11 years ago
parent b46fa7c5f2
commit 209d784397

@ -71,7 +71,8 @@ enum mysql_data_stream_status {
STATE_SLEEP,
STATE_CLIENT_COM_QUERY,
STATE_READY,
STATE_QUERY_SENT,
STATE_QUERY_SENT_DS,
STATE_QUERY_SENT_NET,
STATE_COLUMN_COUNT,
STATE_COLUMN_DEFINITION,
STATE_ROW,

@ -785,7 +785,8 @@ bool MySQL_Protocol::generate_pkt_ERR(bool send, void **ptr, unsigned int *len,
(*myds)->PSarrayOUT->add((void *)_ptr,size);
switch ((*myds)->DSS) {
case STATE_CLIENT_HANDSHAKE:
case STATE_QUERY_SENT:
case STATE_QUERY_SENT_DS:
case STATE_QUERY_SENT_NET:
(*myds)->DSS=STATE_ERR;
break;
default:
@ -839,7 +840,8 @@ bool MySQL_Protocol::generate_pkt_OK(bool send, void **ptr, unsigned int *len, u
(*myds)->PSarrayOUT->add((void *)_ptr,size);
switch ((*myds)->DSS) {
case STATE_CLIENT_HANDSHAKE:
case STATE_QUERY_SENT:
case STATE_QUERY_SENT_DS:
case STATE_QUERY_SENT_NET:
(*myds)->DSS=STATE_OK;
break;
default:

@ -180,12 +180,24 @@ void MySQL_Session::reset_all_backends() {
void MySQL_Session::writeout() {
if (client_myds) client_myds->array2buffer_full();
if (server_myds && server_myds->myds_type==MYDS_BACKEND) server_myds->array2buffer_full();
if (server_myds && server_myds->myds_type==MYDS_BACKEND) {
if (admin==false) {
if (server_myds->net_failure==false) {
if (server_myds->poll_fds_idx>-1 && (server_myds->mypolls->fds[server_myds->poll_fds_idx].revents & POLLOUT)) {
server_myds->array2buffer_full();
}
} else {
server_myds->move_from_OUT_to_OUTpending();
}
} else {
server_myds->array2buffer_full();
}
}
// FIXME: experimental
//if (client_myds) client_myds->set_pollout();
//if (server_myds) server_myds->set_pollout();
if (client_myds) client_myds->write_to_net_poll();
//if (server_myds && server_myds->net_failure==false) server_myds->write_to_net_poll();
if (server_myds) server_myds->write_to_net_poll();
proxy_debug(PROXY_DEBUG_NET,1,"Thread=%p, Session=%p -- Writeout Session %p\n" , this->thread, this, this);
}
@ -287,7 +299,7 @@ int MySQL_Session::handler() {
server_myds=mybe->server_myds;
}
server_myds->PSarrayOUT->add(pkt.ptr, pkt.size);
client_myds->DSS=STATE_QUERY_SENT;
client_myds->DSS=STATE_QUERY_SENT_NET;
} else {
// this is processed by the admin module
admin_func(this, GloAdmin, &pkt);
@ -334,12 +346,13 @@ int MySQL_Session::handler() {
__get_a_backend:
if (client_myds->DSS==STATE_QUERY_SENT) {
if (client_myds->DSS==STATE_QUERY_SENT_NET) {
// the client has completely sent the query, now we should handle it server side
//
if (server_myds->DSS==STATE_NOT_INITIALIZED) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, client_myds->DSS==STATE_QUERY_SENT , server_myds==STATE_NOT_INITIALIZED\n", this);
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, client_myds->DSS==STATE_QUERY_SENT_NET , server_myds==STATE_NOT_INITIALIZED\n", this);
// DSS is STATE_NOT_INITIALIZED. It means we are not connected to any server
// try to connect
pending_connect=1;
@ -368,7 +381,7 @@ int MySQL_Session::handler() {
//} else { TRY #1
} // TRY #1
if (server_myds->myds_type==MYDS_BACKEND && server_myds->DSS==STATE_READY) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, client_myds->DSS==STATE_QUERY_SENT , server_myds==STATE_READY , server_myds->myds_type==MYDS_BACKEND\n", this);
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, client_myds->DSS==STATE_QUERY_SENT_NET , server_myds==STATE_READY , server_myds->myds_type==MYDS_BACKEND\n", this);
//if (strcmp(userinfo_client.schemaname,userinfo_server.schemaname)==0) {
if (
(client_myds->myconn->userinfo->hash!=mybe->myconn->userinfo->hash)
@ -389,7 +402,7 @@ int MySQL_Session::handler() {
}
} else {
//server_myds->PSarrayOUT->add(pkt.ptr, pkt.size);
server_myds->DSS=STATE_QUERY_SENT;
server_myds->DSS=STATE_QUERY_SENT_DS;
status=WAITING_SERVER_DATA;
}
}
@ -424,7 +437,7 @@ __exit_DSS__STATE_NOT_INITIALIZED:
case WAITING_SERVER_DATA:
switch (server_myds->DSS) {
case STATE_QUERY_SENT:
case STATE_QUERY_SENT_NET:
handler___status_WAITING_SERVER_DATA___STATE_QUERY_SENT(&pkt);
break;
@ -461,6 +474,48 @@ __exit_DSS__STATE_NOT_INITIALIZED:
}
writeout();
if (
server_myds
&&
server_myds->DSS==STATE_QUERY_SENT_DS
&&
server_myds->PSarrayOUT->len==0
&&
server_myds->net_failure==false
&&
server_myds->available_data_out()==false
) {
server_myds->DSS=STATE_QUERY_SENT_NET;
}
if (server_myds) {
if (server_myds->net_failure) {
if (( server_myds->DSS==STATE_READY || server_myds->DSS==STATE_QUERY_SENT_DS ) && server_myds->myds_type==MYDS_BACKEND) {
server_myds->myconn=NULL;
server_myds->DSS=STATE_NOT_INITIALIZED;
server_myds->move_from_OUT_to_OUTpending();
if (mybe->myconn) {
MyHGM->destroy_MyConn_from_pool(mybe->myconn);
mybe->myconn=NULL;
}
if (server_fd) {
shutdown(server_myds->fd,SHUT_RDWR);
close(server_myds->fd);
server_myds->fd=0;
thread->mypolls.remove_index_fast(server_myds->poll_fds_idx);
server_fd=0;
}
server_myds->net_failure=false;
server_myds->active=1;
goto __get_a_backend;
} else {
healthy=0;
}
}
}
//writeout();
/*
if ( // FIXME: this implementation is horrible
(server_myds ? server_myds->PSarrayIN->len==0 : 1 ) &&
@ -489,7 +544,7 @@ bool MySQL_Session::handler___status_CHANGING_SCHEMA(PtrSize_t *pkt) {
for (k=0; k<server_myds->PSarrayOUTpending->len;) {
server_myds->PSarrayOUTpending->remove_index(0,&pkt2);
server_myds->PSarrayOUT->add(pkt2.ptr, pkt2.size);
server_myds->DSS=STATE_QUERY_SENT;
server_myds->DSS=STATE_QUERY_SENT_DS;
}
return true;
} else {
@ -516,7 +571,7 @@ bool MySQL_Session::handler___status_CHANGING_USER_SERVER(PtrSize_t *pkt) {
for (k=0; k<server_myds->PSarrayOUTpending->len;) {
server_myds->PSarrayOUTpending->remove_index(0,&pkt2);
server_myds->PSarrayOUT->add(pkt2.ptr, pkt2.size);
server_myds->DSS=STATE_QUERY_SENT;
server_myds->DSS=STATE_QUERY_SENT_DS;
}
return true;
} else {
@ -646,13 +701,13 @@ void MySQL_Session::handler___status_CONNECTING_SERVER___STATE_CLIENT_HANDSHAKE(
for (k=0; k<server_myds->PSarrayOUTpending->len;) {
server_myds->PSarrayOUTpending->remove_index(0,&pkt2);
server_myds->PSarrayOUT->add(pkt2.ptr, pkt2.size);
server_myds->DSS=STATE_QUERY_SENT;
server_myds->DSS=STATE_QUERY_SENT_DS;
}
} else {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Wrong credentials for backend: disconnecting\n");
l_free(pkt->size,pkt->ptr);
*wrong_pass=true;
client_myds->DSS=STATE_QUERY_SENT;
client_myds->DSS=STATE_QUERY_SENT_NET;
char *_s=(char *)malloc(strlen(client_myds->myconn->userinfo->username)+100);
sprintf(_s,"Access denied for user '%s' (using password: %s)", client_myds->myconn->userinfo->username, (client_myds->myconn->userinfo->password ? "YES" : "NO"));
myprot_client.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000", _s);
@ -701,7 +756,7 @@ void MySQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Wrong credentials for frontend: disconnecting\n");
*wrong_pass=true;
// FIXME: this should become close connection
client_myds->DSS=STATE_QUERY_SENT;
client_myds->DSS=STATE_QUERY_SENT_NET;
char *_s=(char *)malloc(strlen(client_myds->myconn->userinfo->username)+100);
sprintf(_s,"Access denied for user '%s' (using password: %s)", client_myds->myconn->userinfo->username, (client_myds->myconn->userinfo->password ? "YES" : "NO"));
myprot_client.generate_pkt_ERR(true,NULL,NULL,2,1045,(char *)"#28000", _s);
@ -735,7 +790,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
v=*((char *)pkt->ptr+3);
proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got COM_SET_OPTION packet , value %d\n", v);
// FIXME: ProxySQL doesn't support yet CLIENT_MULTI_STATEMENTS
client_myds->DSS=STATE_QUERY_SENT;
client_myds->DSS=STATE_QUERY_SENT_NET;
if (v==1) {
myprot_client.generate_pkt_EOF(true,NULL,NULL,1,0,0);
} else {
@ -748,7 +803,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_PING(PtrSize_t *pkt) {
proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got COM_PING packet\n");
l_free(pkt->size,pkt->ptr);
client_myds->DSS=STATE_QUERY_SENT;
client_myds->DSS=STATE_QUERY_SENT_NET;
myprot_client.generate_pkt_OK(true,NULL,NULL,1,0,0,2,0,NULL);
client_myds->DSS=STATE_SLEEP;
}
@ -757,12 +812,12 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
if (admin==false) {
/* FIXME: temporary */
l_free(pkt->size,pkt->ptr);
client_myds->DSS=STATE_QUERY_SENT;
client_myds->DSS=STATE_QUERY_SENT_NET;
myprot_client.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Command not supported");
client_myds->DSS=STATE_SLEEP;
} else {
l_free(pkt->size,pkt->ptr);
client_myds->DSS=STATE_QUERY_SENT;
client_myds->DSS=STATE_QUERY_SENT_NET;
myprot_client.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Command not supported");
client_myds->DSS=STATE_SLEEP;
}
@ -774,12 +829,12 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
if (admin==false) {
client_myds->myconn->userinfo->set_schemaname((char *)pkt->ptr+sizeof(mysql_hdr)+1,pkt->size-sizeof(mysql_hdr)-1);
l_free(pkt->size,pkt->ptr);
client_myds->DSS=STATE_QUERY_SENT;
client_myds->DSS=STATE_QUERY_SENT_NET;
myprot_client.generate_pkt_OK(true,NULL,NULL,1,0,0,2,0,NULL);
client_myds->DSS=STATE_SLEEP;
} else {
l_free(pkt->size,pkt->ptr);
client_myds->DSS=STATE_QUERY_SENT;
client_myds->DSS=STATE_QUERY_SENT_NET;
myprot_client.generate_pkt_OK(true,NULL,NULL,1,0,0,2,0,NULL);
client_myds->DSS=STATE_SLEEP;
}
@ -818,7 +873,7 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STATISTICS(PtrSize_t *pkt) {
proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got COM_STATISTICS packet\n");
l_free(pkt->size,pkt->ptr);
client_myds->DSS=STATE_QUERY_SENT;
client_myds->DSS=STATE_QUERY_SENT_NET;
myprot_client.generate_statistics_response(true,NULL,NULL);
client_myds->DSS=STATE_SLEEP;
}
@ -859,7 +914,7 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___send_INIT_DB_to_backend()
mybe->myconn->userinfo->set_schemaname(client_myds->myconn->userinfo->schemaname,strlen(client_myds->myconn->userinfo->schemaname));
//myprot_server.generate_COM_INIT_DB(true,NULL,NULL,userinfo_server.schemaname);
myprot_server.generate_COM_INIT_DB(true,NULL,NULL,mybe->myconn->userinfo->schemaname);
server_myds->DSS=STATE_QUERY_SENT;
server_myds->DSS=STATE_QUERY_SENT_DS;
status=CHANGING_SCHEMA;
}
@ -870,7 +925,7 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___send_CHANGE_USER_to_backen
mybe->myconn->userinfo->set(client_myds->myconn->userinfo);
//myprot_server.generate_COM_INIT_DB(true,NULL,NULL,userinfo_server.schemaname);
myprot_server.generate_COM_CHANGE_USER(true,NULL,NULL);
server_myds->DSS=STATE_QUERY_SENT;
server_myds->DSS=STATE_QUERY_SENT_DS;
status=CHANGING_USER_SERVER;
}

@ -654,18 +654,56 @@ void process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned int n) {
myds->revents=mypolls.fds[n].revents;
myds->read_from_net();
myds->read_pkts();
if ( (mypolls.fds[n].events & POLLOUT)
&&
( (mypolls.fds[n].revents & POLLERR) || (mypolls.fds[n].revents & POLLHUP) )
) {
myds->net_failure=true;
}
myds->check_data_flow();
myds->sess->to_process=1;
if (myds->active==FALSE) {
if (myds->sess->client_myds==myds) {
proxy_debug(PROXY_DEBUG_NET,1, "Session=%p, DataStream=%p -- Deleting FD %d\n", myds->sess, myds, myds->fd);
myds->sess->healthy=0;
}
}
/*
if (myds->active==FALSE) {
mypolls.remove_index_fast(n);
proxy_debug(PROXY_DEBUG_NET,1, "Session=%p, DataStream=%p -- Deleting FD %d\n", myds->sess, myds, myds->fd);
myds->shut_hard();
//myds->shut_hard();
MySQL_Session *sess=myds->sess;
if (
(sess->server_myds==myds)
&&
(myds->myds_type==MYDS_BACKEND)
&&
(myds->DSS==STATE_READY)
) {
if (sess->mybe->myconn) {
MyHGM->destroy_MyConn_from_pool(sess->mybe->myconn);
sess->mybe->myconn=NULL;
}
// This is a failed backend, let's try to save the session
return;
}
sess->healthy=0;
if (sess->client_myds==myds) sess->client_myds=NULL;
if (sess->server_myds==myds) sess->server_myds=NULL;
delete myds;
myds=NULL; // useless?
if (sess->client_myds==myds) {
sess->client_myds=NULL;
delete myds;
}
if (sess->server_myds==myds) {
sess->server_myds=NULL;
}
//delete myds;
//myds=NULL; // useless?
// FIXME
// if (sess->client_myds==NULL && sess->server_myds==NULL) {
// mysql_sessions->remove_fast(sess);
@ -673,6 +711,7 @@ void process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned int n) {
// continue;
// }
}
*/
}
void process_all_sessions() {

@ -2273,7 +2273,7 @@ void Standard_ProxySQL_Admin::__refresh_users() {
void Standard_ProxySQL_Admin::send_MySQL_OK(MySQL_Protocol *myprot, char *msg) {
assert(myprot);
MySQL_Data_Stream *myds=myprot->get_myds();
myds->DSS=STATE_QUERY_SENT;
myds->DSS=STATE_QUERY_SENT_DS;
myprot->generate_pkt_OK(true,NULL,NULL,1,0,0,2,0,msg);
myds->DSS=STATE_SLEEP;
}
@ -2281,7 +2281,7 @@ void Standard_ProxySQL_Admin::send_MySQL_OK(MySQL_Protocol *myprot, char *msg) {
void Standard_ProxySQL_Admin::send_MySQL_ERR(MySQL_Protocol *myprot, char *msg) {
assert(myprot);
MySQL_Data_Stream *myds=myprot->get_myds();
myds->DSS=STATE_QUERY_SENT;
myds->DSS=STATE_QUERY_SENT_DS;
myprot->generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",msg);
myds->DSS=STATE_SLEEP;
}
@ -2289,7 +2289,7 @@ void Standard_ProxySQL_Admin::send_MySQL_ERR(MySQL_Protocol *myprot, char *msg)
void Standard_ProxySQL_Admin::SQLite3_to_MySQL(SQLite3_result *result, char *error, int affected_rows, MySQL_Protocol *myprot) {
assert(myprot);
MySQL_Data_Stream *myds=myprot->get_myds();
myds->DSS=STATE_QUERY_SENT;
myds->DSS=STATE_QUERY_SENT_DS;
int sid=1;
if (result) {
// sess->myprot_client.generate_pkt_OK(true,NULL,NULL,1,0,0,0,0,NULL);

@ -335,9 +335,15 @@ int MySQL_Data_Stream::write_to_net_poll() {
int rc=0;
if (active==FALSE) return rc;
proxy_debug(PROXY_DEBUG_NET,1,"Session=%p, DataStream=%p --\n", sess, this);
if (queue_data(queueOUT) && poll_fds_idx>-1 && (mypolls->fds[poll_fds_idx].revents & POLLOUT)) {
//if (queue_data(queueOUT)) { //FIXME
rc=write_to_net();
if (queue_data(queueOUT)) {
if ((sess->admin==false)) {
if (poll_fds_idx>-1 && (mypolls->fds[poll_fds_idx].revents & POLLOUT)) {
if (net_failure==false)
rc=write_to_net();
}
} else {
rc=write_to_net();
}
}
if (fd>0 && sess->admin==false) set_pollout();
return rc;

Loading…
Cancel
Save