From 209d78439768698fd3e3ef37abcdbbb54a40cd3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Mon, 9 Feb 2015 00:47:47 +0000 Subject: [PATCH] Drafting auto-reconnect Issue #195 and relative subtickets: - issue #191 - issue #192 - issue #194 --- include/proxysql_structs.h | 3 +- lib/MySQL_Protocol.cpp | 6 +- lib/MySQL_Session.cpp | 99 +++++++++++++++++++++++++-------- lib/Standard_MySQL_Thread.cpp | 49 ++++++++++++++-- lib/Standard_ProxySQL_Admin.cpp | 6 +- lib/mysql_data_stream.cpp | 12 +++- 6 files changed, 139 insertions(+), 36 deletions(-) diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index ca35dc28e..9983fb70c 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -71,7 +71,8 @@ enum mysql_data_stream_status { STATE_SLEEP, STATE_CLIENT_COM_QUERY, STATE_READY, - STATE_QUERY_SENT, + STATE_QUERY_SENT_DS, + STATE_QUERY_SENT_NET, STATE_COLUMN_COUNT, STATE_COLUMN_DEFINITION, STATE_ROW, diff --git a/lib/MySQL_Protocol.cpp b/lib/MySQL_Protocol.cpp index 93acca6a0..29e520f6d 100644 --- a/lib/MySQL_Protocol.cpp +++ b/lib/MySQL_Protocol.cpp @@ -785,7 +785,8 @@ bool MySQL_Protocol::generate_pkt_ERR(bool send, void **ptr, unsigned int *len, (*myds)->PSarrayOUT->add((void *)_ptr,size); switch ((*myds)->DSS) { case STATE_CLIENT_HANDSHAKE: - case STATE_QUERY_SENT: + case STATE_QUERY_SENT_DS: + case STATE_QUERY_SENT_NET: (*myds)->DSS=STATE_ERR; break; default: @@ -839,7 +840,8 @@ bool MySQL_Protocol::generate_pkt_OK(bool send, void **ptr, unsigned int *len, u (*myds)->PSarrayOUT->add((void *)_ptr,size); switch ((*myds)->DSS) { case STATE_CLIENT_HANDSHAKE: - case STATE_QUERY_SENT: + case STATE_QUERY_SENT_DS: + case STATE_QUERY_SENT_NET: (*myds)->DSS=STATE_OK; break; default: diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index cb473b6d3..8d1f84e0d 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -180,12 +180,24 @@ void MySQL_Session::reset_all_backends() { void MySQL_Session::writeout() { if (client_myds) client_myds->array2buffer_full(); - if (server_myds && server_myds->myds_type==MYDS_BACKEND) server_myds->array2buffer_full(); - + if (server_myds && server_myds->myds_type==MYDS_BACKEND) { + if (admin==false) { + if (server_myds->net_failure==false) { + if (server_myds->poll_fds_idx>-1 && (server_myds->mypolls->fds[server_myds->poll_fds_idx].revents & POLLOUT)) { + server_myds->array2buffer_full(); + } + } else { + server_myds->move_from_OUT_to_OUTpending(); + } + } else { + server_myds->array2buffer_full(); + } + } // FIXME: experimental //if (client_myds) client_myds->set_pollout(); //if (server_myds) server_myds->set_pollout(); if (client_myds) client_myds->write_to_net_poll(); + //if (server_myds && server_myds->net_failure==false) server_myds->write_to_net_poll(); if (server_myds) server_myds->write_to_net_poll(); proxy_debug(PROXY_DEBUG_NET,1,"Thread=%p, Session=%p -- Writeout Session %p\n" , this->thread, this, this); } @@ -287,7 +299,7 @@ int MySQL_Session::handler() { server_myds=mybe->server_myds; } server_myds->PSarrayOUT->add(pkt.ptr, pkt.size); - client_myds->DSS=STATE_QUERY_SENT; + client_myds->DSS=STATE_QUERY_SENT_NET; } else { // this is processed by the admin module admin_func(this, GloAdmin, &pkt); @@ -334,12 +346,13 @@ int MySQL_Session::handler() { +__get_a_backend: - if (client_myds->DSS==STATE_QUERY_SENT) { + if (client_myds->DSS==STATE_QUERY_SENT_NET) { // the client has completely sent the query, now we should handle it server side // if (server_myds->DSS==STATE_NOT_INITIALIZED) { - proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, client_myds->DSS==STATE_QUERY_SENT , server_myds==STATE_NOT_INITIALIZED\n", this); + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, client_myds->DSS==STATE_QUERY_SENT_NET , server_myds==STATE_NOT_INITIALIZED\n", this); // DSS is STATE_NOT_INITIALIZED. It means we are not connected to any server // try to connect pending_connect=1; @@ -368,7 +381,7 @@ int MySQL_Session::handler() { //} else { TRY #1 } // TRY #1 if (server_myds->myds_type==MYDS_BACKEND && server_myds->DSS==STATE_READY) { - proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, client_myds->DSS==STATE_QUERY_SENT , server_myds==STATE_READY , server_myds->myds_type==MYDS_BACKEND\n", this); + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, client_myds->DSS==STATE_QUERY_SENT_NET , server_myds==STATE_READY , server_myds->myds_type==MYDS_BACKEND\n", this); //if (strcmp(userinfo_client.schemaname,userinfo_server.schemaname)==0) { if ( (client_myds->myconn->userinfo->hash!=mybe->myconn->userinfo->hash) @@ -389,7 +402,7 @@ int MySQL_Session::handler() { } } else { //server_myds->PSarrayOUT->add(pkt.ptr, pkt.size); - server_myds->DSS=STATE_QUERY_SENT; + server_myds->DSS=STATE_QUERY_SENT_DS; status=WAITING_SERVER_DATA; } } @@ -424,7 +437,7 @@ __exit_DSS__STATE_NOT_INITIALIZED: case WAITING_SERVER_DATA: switch (server_myds->DSS) { - case STATE_QUERY_SENT: + case STATE_QUERY_SENT_NET: handler___status_WAITING_SERVER_DATA___STATE_QUERY_SENT(&pkt); break; @@ -461,6 +474,48 @@ __exit_DSS__STATE_NOT_INITIALIZED: } writeout(); + + if ( + server_myds + && + server_myds->DSS==STATE_QUERY_SENT_DS + && + server_myds->PSarrayOUT->len==0 + && + server_myds->net_failure==false + && + server_myds->available_data_out()==false + ) { + server_myds->DSS=STATE_QUERY_SENT_NET; + } + if (server_myds) { + if (server_myds->net_failure) { + if (( server_myds->DSS==STATE_READY || server_myds->DSS==STATE_QUERY_SENT_DS ) && server_myds->myds_type==MYDS_BACKEND) { + server_myds->myconn=NULL; + server_myds->DSS=STATE_NOT_INITIALIZED; + server_myds->move_from_OUT_to_OUTpending(); + if (mybe->myconn) { + MyHGM->destroy_MyConn_from_pool(mybe->myconn); + mybe->myconn=NULL; + } + if (server_fd) { + shutdown(server_myds->fd,SHUT_RDWR); + close(server_myds->fd); + server_myds->fd=0; + thread->mypolls.remove_index_fast(server_myds->poll_fds_idx); + server_fd=0; + } + server_myds->net_failure=false; + server_myds->active=1; + goto __get_a_backend; + } else { + healthy=0; + } + } + } + + //writeout(); + /* if ( // FIXME: this implementation is horrible (server_myds ? server_myds->PSarrayIN->len==0 : 1 ) && @@ -489,7 +544,7 @@ bool MySQL_Session::handler___status_CHANGING_SCHEMA(PtrSize_t *pkt) { for (k=0; kPSarrayOUTpending->len;) { server_myds->PSarrayOUTpending->remove_index(0,&pkt2); server_myds->PSarrayOUT->add(pkt2.ptr, pkt2.size); - server_myds->DSS=STATE_QUERY_SENT; + server_myds->DSS=STATE_QUERY_SENT_DS; } return true; } else { @@ -516,7 +571,7 @@ bool MySQL_Session::handler___status_CHANGING_USER_SERVER(PtrSize_t *pkt) { for (k=0; kPSarrayOUTpending->len;) { server_myds->PSarrayOUTpending->remove_index(0,&pkt2); server_myds->PSarrayOUT->add(pkt2.ptr, pkt2.size); - server_myds->DSS=STATE_QUERY_SENT; + server_myds->DSS=STATE_QUERY_SENT_DS; } return true; } else { @@ -646,13 +701,13 @@ void MySQL_Session::handler___status_CONNECTING_SERVER___STATE_CLIENT_HANDSHAKE( for (k=0; kPSarrayOUTpending->len;) { server_myds->PSarrayOUTpending->remove_index(0,&pkt2); server_myds->PSarrayOUT->add(pkt2.ptr, pkt2.size); - server_myds->DSS=STATE_QUERY_SENT; + server_myds->DSS=STATE_QUERY_SENT_DS; } } else { proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Wrong credentials for backend: disconnecting\n"); l_free(pkt->size,pkt->ptr); *wrong_pass=true; - client_myds->DSS=STATE_QUERY_SENT; + client_myds->DSS=STATE_QUERY_SENT_NET; char *_s=(char *)malloc(strlen(client_myds->myconn->userinfo->username)+100); sprintf(_s,"Access denied for user '%s' (using password: %s)", client_myds->myconn->userinfo->username, (client_myds->myconn->userinfo->password ? "YES" : "NO")); myprot_client.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000", _s); @@ -701,7 +756,7 @@ void MySQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE( proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Wrong credentials for frontend: disconnecting\n"); *wrong_pass=true; // FIXME: this should become close connection - client_myds->DSS=STATE_QUERY_SENT; + client_myds->DSS=STATE_QUERY_SENT_NET; char *_s=(char *)malloc(strlen(client_myds->myconn->userinfo->username)+100); sprintf(_s,"Access denied for user '%s' (using password: %s)", client_myds->myconn->userinfo->username, (client_myds->myconn->userinfo->password ? "YES" : "NO")); myprot_client.generate_pkt_ERR(true,NULL,NULL,2,1045,(char *)"#28000", _s); @@ -735,7 +790,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C v=*((char *)pkt->ptr+3); proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got COM_SET_OPTION packet , value %d\n", v); // FIXME: ProxySQL doesn't support yet CLIENT_MULTI_STATEMENTS - client_myds->DSS=STATE_QUERY_SENT; + client_myds->DSS=STATE_QUERY_SENT_NET; if (v==1) { myprot_client.generate_pkt_EOF(true,NULL,NULL,1,0,0); } else { @@ -748,7 +803,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_PING(PtrSize_t *pkt) { proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got COM_PING packet\n"); l_free(pkt->size,pkt->ptr); - client_myds->DSS=STATE_QUERY_SENT; + client_myds->DSS=STATE_QUERY_SENT_NET; myprot_client.generate_pkt_OK(true,NULL,NULL,1,0,0,2,0,NULL); client_myds->DSS=STATE_SLEEP; } @@ -757,12 +812,12 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C if (admin==false) { /* FIXME: temporary */ l_free(pkt->size,pkt->ptr); - client_myds->DSS=STATE_QUERY_SENT; + client_myds->DSS=STATE_QUERY_SENT_NET; myprot_client.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Command not supported"); client_myds->DSS=STATE_SLEEP; } else { l_free(pkt->size,pkt->ptr); - client_myds->DSS=STATE_QUERY_SENT; + client_myds->DSS=STATE_QUERY_SENT_NET; myprot_client.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Command not supported"); client_myds->DSS=STATE_SLEEP; } @@ -774,12 +829,12 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C if (admin==false) { client_myds->myconn->userinfo->set_schemaname((char *)pkt->ptr+sizeof(mysql_hdr)+1,pkt->size-sizeof(mysql_hdr)-1); l_free(pkt->size,pkt->ptr); - client_myds->DSS=STATE_QUERY_SENT; + client_myds->DSS=STATE_QUERY_SENT_NET; myprot_client.generate_pkt_OK(true,NULL,NULL,1,0,0,2,0,NULL); client_myds->DSS=STATE_SLEEP; } else { l_free(pkt->size,pkt->ptr); - client_myds->DSS=STATE_QUERY_SENT; + client_myds->DSS=STATE_QUERY_SENT_NET; myprot_client.generate_pkt_OK(true,NULL,NULL,1,0,0,2,0,NULL); client_myds->DSS=STATE_SLEEP; } @@ -818,7 +873,7 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STATISTICS(PtrSize_t *pkt) { proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got COM_STATISTICS packet\n"); l_free(pkt->size,pkt->ptr); - client_myds->DSS=STATE_QUERY_SENT; + client_myds->DSS=STATE_QUERY_SENT_NET; myprot_client.generate_statistics_response(true,NULL,NULL); client_myds->DSS=STATE_SLEEP; } @@ -859,7 +914,7 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___send_INIT_DB_to_backend() mybe->myconn->userinfo->set_schemaname(client_myds->myconn->userinfo->schemaname,strlen(client_myds->myconn->userinfo->schemaname)); //myprot_server.generate_COM_INIT_DB(true,NULL,NULL,userinfo_server.schemaname); myprot_server.generate_COM_INIT_DB(true,NULL,NULL,mybe->myconn->userinfo->schemaname); - server_myds->DSS=STATE_QUERY_SENT; + server_myds->DSS=STATE_QUERY_SENT_DS; status=CHANGING_SCHEMA; } @@ -870,7 +925,7 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___send_CHANGE_USER_to_backen mybe->myconn->userinfo->set(client_myds->myconn->userinfo); //myprot_server.generate_COM_INIT_DB(true,NULL,NULL,userinfo_server.schemaname); myprot_server.generate_COM_CHANGE_USER(true,NULL,NULL); - server_myds->DSS=STATE_QUERY_SENT; + server_myds->DSS=STATE_QUERY_SENT_DS; status=CHANGING_USER_SERVER; } diff --git a/lib/Standard_MySQL_Thread.cpp b/lib/Standard_MySQL_Thread.cpp index 62a8d8925..0ed778c14 100644 --- a/lib/Standard_MySQL_Thread.cpp +++ b/lib/Standard_MySQL_Thread.cpp @@ -654,18 +654,56 @@ void process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned int n) { myds->revents=mypolls.fds[n].revents; myds->read_from_net(); myds->read_pkts(); + + if ( (mypolls.fds[n].events & POLLOUT) + && + ( (mypolls.fds[n].revents & POLLERR) || (mypolls.fds[n].revents & POLLHUP) ) + ) { + myds->net_failure=true; + } + myds->check_data_flow(); myds->sess->to_process=1; + + + if (myds->active==FALSE) { + if (myds->sess->client_myds==myds) { + proxy_debug(PROXY_DEBUG_NET,1, "Session=%p, DataStream=%p -- Deleting FD %d\n", myds->sess, myds, myds->fd); + myds->sess->healthy=0; + } + } + +/* if (myds->active==FALSE) { mypolls.remove_index_fast(n); proxy_debug(PROXY_DEBUG_NET,1, "Session=%p, DataStream=%p -- Deleting FD %d\n", myds->sess, myds, myds->fd); - myds->shut_hard(); + //myds->shut_hard(); MySQL_Session *sess=myds->sess; + if ( + (sess->server_myds==myds) + && + (myds->myds_type==MYDS_BACKEND) + && + (myds->DSS==STATE_READY) + ) { + if (sess->mybe->myconn) { + MyHGM->destroy_MyConn_from_pool(sess->mybe->myconn); + sess->mybe->myconn=NULL; + } + // This is a failed backend, let's try to save the session + return; + } + sess->healthy=0; - if (sess->client_myds==myds) sess->client_myds=NULL; - if (sess->server_myds==myds) sess->server_myds=NULL; - delete myds; - myds=NULL; // useless? + if (sess->client_myds==myds) { + sess->client_myds=NULL; + delete myds; + } + if (sess->server_myds==myds) { + sess->server_myds=NULL; + } + //delete myds; + //myds=NULL; // useless? // FIXME // if (sess->client_myds==NULL && sess->server_myds==NULL) { // mysql_sessions->remove_fast(sess); @@ -673,6 +711,7 @@ void process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned int n) { // continue; // } } +*/ } void process_all_sessions() { diff --git a/lib/Standard_ProxySQL_Admin.cpp b/lib/Standard_ProxySQL_Admin.cpp index d8c7e9f3a..dc69f6351 100644 --- a/lib/Standard_ProxySQL_Admin.cpp +++ b/lib/Standard_ProxySQL_Admin.cpp @@ -2273,7 +2273,7 @@ void Standard_ProxySQL_Admin::__refresh_users() { void Standard_ProxySQL_Admin::send_MySQL_OK(MySQL_Protocol *myprot, char *msg) { assert(myprot); MySQL_Data_Stream *myds=myprot->get_myds(); - myds->DSS=STATE_QUERY_SENT; + myds->DSS=STATE_QUERY_SENT_DS; myprot->generate_pkt_OK(true,NULL,NULL,1,0,0,2,0,msg); myds->DSS=STATE_SLEEP; } @@ -2281,7 +2281,7 @@ void Standard_ProxySQL_Admin::send_MySQL_OK(MySQL_Protocol *myprot, char *msg) { void Standard_ProxySQL_Admin::send_MySQL_ERR(MySQL_Protocol *myprot, char *msg) { assert(myprot); MySQL_Data_Stream *myds=myprot->get_myds(); - myds->DSS=STATE_QUERY_SENT; + myds->DSS=STATE_QUERY_SENT_DS; myprot->generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",msg); myds->DSS=STATE_SLEEP; } @@ -2289,7 +2289,7 @@ void Standard_ProxySQL_Admin::send_MySQL_ERR(MySQL_Protocol *myprot, char *msg) void Standard_ProxySQL_Admin::SQLite3_to_MySQL(SQLite3_result *result, char *error, int affected_rows, MySQL_Protocol *myprot) { assert(myprot); MySQL_Data_Stream *myds=myprot->get_myds(); - myds->DSS=STATE_QUERY_SENT; + myds->DSS=STATE_QUERY_SENT_DS; int sid=1; if (result) { // sess->myprot_client.generate_pkt_OK(true,NULL,NULL,1,0,0,0,0,NULL); diff --git a/lib/mysql_data_stream.cpp b/lib/mysql_data_stream.cpp index f86f03ea0..c8d8eceaa 100644 --- a/lib/mysql_data_stream.cpp +++ b/lib/mysql_data_stream.cpp @@ -335,9 +335,15 @@ int MySQL_Data_Stream::write_to_net_poll() { int rc=0; if (active==FALSE) return rc; proxy_debug(PROXY_DEBUG_NET,1,"Session=%p, DataStream=%p --\n", sess, this); - if (queue_data(queueOUT) && poll_fds_idx>-1 && (mypolls->fds[poll_fds_idx].revents & POLLOUT)) { - //if (queue_data(queueOUT)) { //FIXME - rc=write_to_net(); + if (queue_data(queueOUT)) { + if ((sess->admin==false)) { + if (poll_fds_idx>-1 && (mypolls->fds[poll_fds_idx].revents & POLLOUT)) { + if (net_failure==false) + rc=write_to_net(); + } + } else { + rc=write_to_net(); + } } if (fd>0 && sess->admin==false) set_pollout(); return rc;