diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index 6f1dd26d9..f2c2e47c3 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -76,6 +76,15 @@ class MySQL_Session void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___create_mirror_session(); int handler_again___status_PINGING_SERVER(); void handler_again___new_thread_to_kill_connection(); + + bool handler_again___verify_backend_charset(); + bool handler_again___verify_init_connect(); + bool handler_again___verify_backend_autocommit(); + bool handler_again___verify_backend_user_schema(); + bool handler_again___status_SETTING_INIT_CONNECT(int *); + bool handler_again___status_CHANGING_SCHEMA(int *); + bool handler_again___status_CONNECTING_SERVER(int *); + // void return_MySQL_Connection_To_Poll(MySQL_Data_Stream *); diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index fff79aca4..c64cb2076 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -705,7 +705,377 @@ void MySQL_Session::handler_again___new_thread_to_kill_connection() { } } +// NEXT_IMMEDIATE is a legacy macro used inside handler() to immediately jump +// to handler_again #define NEXT_IMMEDIATE(new_st) do { set_status(new_st); goto handler_again; } while (0) +// NEXT_IMMEDIATE_NEW is a new macro to use *outside* handler(). +// handler() should check the return code of the function it calls, and if +// true should jump to handler_again +#define NEXT_IMMEDIATE_NEW(new_st) do { set_status(new_st); return true; } while (0) + +bool MySQL_Session::handler_again___verify_backend_charset() { + if (client_myds->myconn->options.charset != mybe->server_myds->myconn->mysql->charset->nr) { + //previous_status.push(PROCESSING_QUERY); + switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility + case PROCESSING_QUERY: + previous_status.push(PROCESSING_QUERY); + break; + case PROCESSING_STMT_PREPARE: + previous_status.push(PROCESSING_STMT_PREPARE); + break; + case PROCESSING_STMT_EXECUTE: + previous_status.push(PROCESSING_STMT_EXECUTE); + break; + default: + assert(0); + break; + } + NEXT_IMMEDIATE_NEW(CHANGING_CHARSET); + } + return false; +} + + +bool MySQL_Session::handler_again___verify_init_connect() { + if (mybe->server_myds->myconn->options.init_connect_sent==false) { + // we needs to set it to true + mybe->server_myds->myconn->options.init_connect_sent=true; + if (mysql_thread___init_connect) { + // we send init connect queries only if set + mybe->server_myds->myconn->options.init_connect=strdup(mysql_thread___init_connect); + previous_status.push(PROCESSING_QUERY); + NEXT_IMMEDIATE_NEW(SETTING_INIT_CONNECT); + } + } + return false; +} + +bool MySQL_Session::handler_again___verify_backend_autocommit() { + if (autocommit != mybe->server_myds->myconn->IsAutoCommit()) { + // see case #485 + if (mysql_thread___enforce_autocommit_on_reads == false) { + // enforce_autocommit_on_reads is disabled + // we need to check if it is a SELECT not FOR UPDATE + if (CurrentQuery.is_select_NOT_for_update()==false) { + //previous_status.push(PROCESSING_QUERY); + switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility + case PROCESSING_QUERY: + previous_status.push(PROCESSING_QUERY); + break; + case PROCESSING_STMT_PREPARE: + previous_status.push(PROCESSING_STMT_PREPARE); + break; + case PROCESSING_STMT_EXECUTE: + previous_status.push(PROCESSING_STMT_EXECUTE); + break; + default: + assert(0); + break; + } + NEXT_IMMEDIATE_NEW(CHANGING_AUTOCOMMIT); + } + } else { + // in every other cases, enforce autocommit + //previous_status.push(PROCESSING_QUERY); + switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility + case PROCESSING_QUERY: + previous_status.push(PROCESSING_QUERY); + break; + case PROCESSING_STMT_PREPARE: + previous_status.push(PROCESSING_STMT_PREPARE); + break; + case PROCESSING_STMT_EXECUTE: + previous_status.push(PROCESSING_STMT_EXECUTE); + break; + default: + assert(0); + break; + } + NEXT_IMMEDIATE_NEW(CHANGING_AUTOCOMMIT); + } + } + return false; +} + +bool MySQL_Session::handler_again___verify_backend_user_schema() { + MySQL_Data_Stream *myds=mybe->server_myds; + if (client_myds->myconn->userinfo->hash!=mybe->server_myds->myconn->userinfo->hash) { + if (strcmp(client_myds->myconn->userinfo->username,myds->myconn->userinfo->username)) { + //previous_status.push(PROCESSING_QUERY); + switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility + case PROCESSING_QUERY: + previous_status.push(PROCESSING_QUERY); + break; + case PROCESSING_STMT_PREPARE: + previous_status.push(PROCESSING_STMT_PREPARE); + break; + case PROCESSING_STMT_EXECUTE: + previous_status.push(PROCESSING_STMT_EXECUTE); + break; + default: + assert(0); + break; + } + NEXT_IMMEDIATE_NEW(CHANGING_USER_SERVER); + } + if (strcmp(client_myds->myconn->userinfo->schemaname,myds->myconn->userinfo->schemaname)) { + //previous_status.push(PROCESSING_QUERY); + switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility + case PROCESSING_QUERY: + previous_status.push(PROCESSING_QUERY); + break; + case PROCESSING_STMT_PREPARE: + previous_status.push(PROCESSING_STMT_PREPARE); + break; + case PROCESSING_STMT_EXECUTE: + previous_status.push(PROCESSING_STMT_EXECUTE); + break; + default: + assert(0); + break; + } + NEXT_IMMEDIATE_NEW(CHANGING_SCHEMA); + } + } + return false; +} + +bool MySQL_Session::handler_again___status_SETTING_INIT_CONNECT(int *_rc) { + bool ret=false; + assert(mybe->server_myds->myconn); + MySQL_Data_Stream *myds=mybe->server_myds; + MySQL_Connection *myconn=myds->myconn; + myds->DSS=STATE_MARIADB_QUERY; + enum session_status st=status; + if (myds->mypolls==NULL) { + thread->mypolls.add(POLLIN|POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime); + } + int rc=myconn->async_send_simple_command(myds->revents,myconn->options.init_connect,strlen(myconn->options.init_connect)); + if (rc==0) { + myds->revents|=POLLOUT; // we also set again POLLOUT to send a query immediately! + st=previous_status.top(); + previous_status.pop(); + NEXT_IMMEDIATE_NEW(st); + } else { + if (rc==-1) { + // the command failed + int myerr=mysql_errno(myconn->mysql); + if (myerr > 2000) { + bool retry_conn=false; + // client error, serious + proxy_error("Detected a broken connection while setting INIT CONNECT on %s , %d : %d, %s\n", myconn->parent->address, myconn->parent->port, myerr, mysql_error(myconn->mysql)); + //if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) { + if ((myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) { + retry_conn=true; + } + myds->destroy_MySQL_Connection_From_Pool(false); + myds->fd=0; + if (retry_conn) { + myds->DSS=STATE_NOT_INITIALIZED; + //previous_status.push(PROCESSING_QUERY); + NEXT_IMMEDIATE_NEW(CONNECTING_SERVER); + } + *_rc=-1; // an error happened, we should destroy the Session + return ret; + } else { + proxy_warning("Error while setting INIT CONNECT: %d, %s\n", myerr, mysql_error(myconn->mysql)); + // we won't go back to PROCESSING_QUERY + st=previous_status.top(); + previous_status.pop(); + char sqlstate[10]; + sprintf(sqlstate,"#%s",mysql_sqlstate(myconn->mysql)); + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,mysql_errno(myconn->mysql),sqlstate,mysql_error(myconn->mysql)); + myds->destroy_MySQL_Connection_From_Pool(true); + myds->fd=0; + status=WAITING_CLIENT_DATA; + client_myds->DSS=STATE_SLEEP; + } + } else { + // rc==1 , nothing to do for now + } + } + return ret; +} + + +bool MySQL_Session::handler_again___status_CHANGING_SCHEMA(int *_rc) { + bool ret=false; + //fprintf(stderr,"CHANGING_SCHEMA\n"); + assert(mybe->server_myds->myconn); + MySQL_Data_Stream *myds=mybe->server_myds; + MySQL_Connection *myconn=myds->myconn; + myds->DSS=STATE_MARIADB_QUERY; + enum session_status st=status; + if (myds->mypolls==NULL) { + thread->mypolls.add(POLLIN|POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime); + } + int rc=myconn->async_select_db(myds->revents); + if (rc==0) { + myds->myconn->userinfo->set(client_myds->myconn->userinfo); + st=previous_status.top(); + previous_status.pop(); + NEXT_IMMEDIATE_NEW(st); + } else { + if (rc==-1) { + // the command failed + int myerr=mysql_errno(myconn->mysql); + if (myerr > 2000) { + bool retry_conn=false; + // client error, serious + proxy_error("Detected a broken connection during INIT_DB on %s , %d : %d, %s\n", myconn->parent->address, myconn->parent->port, myerr, mysql_error(myconn->mysql)); + //if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) { + if ((myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) { + retry_conn=true; + } + myds->destroy_MySQL_Connection_From_Pool(false); + myds->fd=0; + if (retry_conn) { + myds->DSS=STATE_NOT_INITIALIZED; + //previous_status.push(PROCESSING_QUERY); + NEXT_IMMEDIATE_NEW(CONNECTING_SERVER); + } + *_rc=-1; // an error happened, we should destroy the Session + return ret; + } else { + proxy_warning("Error during INIT_DB: %d, %s\n", myerr, mysql_error(myconn->mysql)); + // we won't go back to PROCESSING_QUERY + st=previous_status.top(); + previous_status.pop(); + char sqlstate[10]; + sprintf(sqlstate,"#%s",mysql_sqlstate(myconn->mysql)); + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,mysql_errno(myconn->mysql),sqlstate,mysql_error(myconn->mysql)); +// CurrentQuery.end(); +// myds->free_mysql_real_query(); + myds->destroy_MySQL_Connection_From_Pool(true); + myds->fd=0; +// status=WAITING_CLIENT_DATA; +// client_myds->DSS=STATE_SLEEP; + RequestEnd(myds); + } + } else { + // rc==1 , nothing to do for now + } + } + return false; +} + + +bool MySQL_Session::handler_again___status_CONNECTING_SERVER(int *_rc) { + //fprintf(stderr,"CONNECTING_SERVER\n"); + if (mybe->server_myds->max_connect_time) { + if (thread->curtime >= mybe->server_myds->max_connect_time) { + char buf[256]; + sprintf(buf,"Max connect timeout reached while reaching hostgroup %d after %llums", current_hostgroup, (thread->curtime - CurrentQuery.start_time)/1000 ); + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",buf); +// CurrentQuery.end(); +// mybe->server_myds->free_mysql_real_query(); +// client_myds->DSS=STATE_SLEEP; + RequestEnd(mybe->server_myds); + //enum session_status st; + while (previous_status.size()) { + previous_status.top(); + previous_status.pop(); + } + if (mybe->server_myds->myconn) { + //mybe->server_myds->destroy_MySQL_Connection(); + mybe->server_myds->destroy_MySQL_Connection_From_Pool(false); + } + mybe->server_myds->max_connect_time=0; + NEXT_IMMEDIATE_NEW(WAITING_CLIENT_DATA); + } + } + if (mybe->server_myds->myconn==NULL) { + handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED__get_connection(); + } + if (mybe->server_myds->myconn==NULL) { + pause_until=thread->curtime+mysql_thread___connect_retries_delay*1000; + //goto __exit_DSS__STATE_NOT_INITIALIZED; + *_rc=1; + return false; + } else { + MySQL_Data_Stream *myds=mybe->server_myds; + MySQL_Connection *myconn=myds->myconn; + int rc; + if (default_hostgroup<0) { + // we are connected to a Admin module backend + // we pretend to set a user variable to disable multiplexing + myconn->set_status_user_variable(true); + } + enum session_status st=status; + if (mybe->server_myds->myconn->async_state_machine==ASYNC_IDLE) { + st=previous_status.top(); + previous_status.pop(); + NEXT_IMMEDIATE_NEW(st); + assert(0); + } + assert(st==status); + unsigned long long curtime=monotonic_time(); + //mybe->server_myds->myprot.init(&mybe->server_myds, mybe->server_myds->myconn->userinfo, this); +/* */ + assert(myconn->async_state_machine!=ASYNC_IDLE); + rc=myconn->async_connect(myds->revents); + if (myds->mypolls==NULL) { + // connection yet not in mypolls + myds->assign_fd_from_mysql_conn(); + thread->mypolls.add(POLLIN|POLLOUT, mybe->server_myds->fd, mybe->server_myds, curtime); + } + switch (rc) { + case 0: + myds->myds_type=MYDS_BACKEND; + myds->DSS=STATE_MARIADB_GENERIC; + status=WAITING_CLIENT_DATA; + st=previous_status.top(); + previous_status.pop(); + myds->wait_until=0; + if (session_fast_forward==true) { + // we have a successful connection and session_fast_forward enabled + // set DSS=STATE_SLEEP or it will believe it have to use MARIADB client library + myds->DSS=STATE_SLEEP; + } + NEXT_IMMEDIATE_NEW(st); + break; + case -1: + case -2: + // FIXME: experimental + //wrong_pass=true; + if (myds->connect_retries_on_failure >0 ) { + myds->connect_retries_on_failure--; + //myds->destroy_MySQL_Connection(); + myds->destroy_MySQL_Connection_From_Pool(false); + NEXT_IMMEDIATE_NEW(CONNECTING_SERVER); + } else { + int myerr=mysql_errno(myconn->mysql); + if (myerr) { + char sqlstate[10]; + sprintf(sqlstate,"#%s",mysql_sqlstate(myconn->mysql)); + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,mysql_errno(myconn->mysql),sqlstate,mysql_error(myconn->mysql)); + } else { + char buf[256]; + sprintf(buf,"Max connect failure while reaching hostgroup %d", current_hostgroup); + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",buf); + } +// CurrentQuery.end(); +// myds->free_mysql_real_query(); +// client_myds->DSS=STATE_SLEEP; + RequestEnd(myds); + while (previous_status.size()) { + st=previous_status.top(); + previous_status.pop(); + } + //myds->destroy_MySQL_Connection(); + myds->destroy_MySQL_Connection_From_Pool( myerr ? true : false ); + myds->max_connect_time=0; + NEXT_IMMEDIATE_NEW(WAITING_CLIENT_DATA); + } + break; + case 1: // continue on next loop + default: + break; + } + } + return false; +} + int MySQL_Session::handler() { bool wrong_pass=false; @@ -1180,116 +1550,19 @@ handler_again: thread->mypolls.add(POLLIN|POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime); } if (default_hostgroup>=0) { - if (client_myds->myconn->userinfo->hash!=mybe->server_myds->myconn->userinfo->hash) { - if (strcmp(client_myds->myconn->userinfo->username,myds->myconn->userinfo->username)) { - //previous_status.push(PROCESSING_QUERY); - switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility - case PROCESSING_QUERY: - previous_status.push(PROCESSING_QUERY); - break; - case PROCESSING_STMT_PREPARE: - previous_status.push(PROCESSING_STMT_PREPARE); - break; - case PROCESSING_STMT_EXECUTE: - previous_status.push(PROCESSING_STMT_EXECUTE); - break; - default: - assert(0); - break; - } - NEXT_IMMEDIATE(CHANGING_USER_SERVER); - } - if (strcmp(client_myds->myconn->userinfo->schemaname,myds->myconn->userinfo->schemaname)) { - //previous_status.push(PROCESSING_QUERY); - switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility - case PROCESSING_QUERY: - previous_status.push(PROCESSING_QUERY); - break; - case PROCESSING_STMT_PREPARE: - previous_status.push(PROCESSING_STMT_PREPARE); - break; - case PROCESSING_STMT_EXECUTE: - previous_status.push(PROCESSING_STMT_EXECUTE); - break; - default: - assert(0); - break; - } - NEXT_IMMEDIATE(CHANGING_SCHEMA); - } + if (handler_again___verify_backend_user_schema()) { + goto handler_again; } if (mirror==false) { // do not care about autocommit and charset if mirror - if (mybe->server_myds->myconn->options.init_connect_sent==false) { - // we needs to set it to true - mybe->server_myds->myconn->options.init_connect_sent=true; - if (mysql_thread___init_connect) { - // we send init connect queries only if set - mybe->server_myds->myconn->options.init_connect=strdup(mysql_thread___init_connect); - previous_status.push(PROCESSING_QUERY); - NEXT_IMMEDIATE(SETTING_INIT_CONNECT); - } + if (handler_again___verify_init_connect()) { + goto handler_again; } - if (client_myds->myconn->options.charset != mybe->server_myds->myconn->mysql->charset->nr) { - //previous_status.push(PROCESSING_QUERY); - switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility - case PROCESSING_QUERY: - previous_status.push(PROCESSING_QUERY); - break; - case PROCESSING_STMT_PREPARE: - previous_status.push(PROCESSING_STMT_PREPARE); - break; - case PROCESSING_STMT_EXECUTE: - previous_status.push(PROCESSING_STMT_EXECUTE); - break; - default: - assert(0); - break; + if (handler_again___verify_backend_charset()) { + goto handler_again; } - NEXT_IMMEDIATE(CHANGING_CHARSET); - } - if (autocommit != mybe->server_myds->myconn->IsAutoCommit()) { - // see case #485 - if (mysql_thread___enforce_autocommit_on_reads == false) { - // enforce_autocommit_on_reads is disabled - // we need to check if it is a SELECT not FOR UPDATE - if (CurrentQuery.is_select_NOT_for_update()==false) { - //previous_status.push(PROCESSING_QUERY); - switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility - case PROCESSING_QUERY: - previous_status.push(PROCESSING_QUERY); - break; - case PROCESSING_STMT_PREPARE: - previous_status.push(PROCESSING_STMT_PREPARE); - break; - case PROCESSING_STMT_EXECUTE: - previous_status.push(PROCESSING_STMT_EXECUTE); - break; - default: - assert(0); - break; - } - NEXT_IMMEDIATE(CHANGING_AUTOCOMMIT); - } - } else { - // in every other cases, enforce autocommit - //previous_status.push(PROCESSING_QUERY); - switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility - case PROCESSING_QUERY: - previous_status.push(PROCESSING_QUERY); - break; - case PROCESSING_STMT_PREPARE: - previous_status.push(PROCESSING_STMT_PREPARE); - break; - case PROCESSING_STMT_EXECUTE: - previous_status.push(PROCESSING_STMT_EXECUTE); - break; - default: - assert(0); - break; - } - NEXT_IMMEDIATE(CHANGING_AUTOCOMMIT); + if (handler_again___verify_backend_autocommit()) { + goto handler_again; } - } if (status==PROCESSING_STMT_EXECUTE) { CurrentQuery.mysql_stmt=myconn->local_stmts->find(CurrentQuery.stmt_global_id); if (CurrentQuery.mysql_stmt==NULL) { @@ -1839,230 +2112,40 @@ handler_again: break; case SETTING_INIT_CONNECT: - assert(mybe->server_myds->myconn); { - MySQL_Data_Stream *myds=mybe->server_myds; - MySQL_Connection *myconn=myds->myconn; - myds->DSS=STATE_MARIADB_QUERY; - enum session_status st=status; - if (myds->mypolls==NULL) { - thread->mypolls.add(POLLIN|POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime); - } - int rc=myconn->async_send_simple_command(myds->revents,myconn->options.init_connect,strlen(myconn->options.init_connect)); - if (rc==0) { - myds->revents|=POLLOUT; // we also set again POLLOUT to send a query immediately! - st=previous_status.top(); - previous_status.pop(); - NEXT_IMMEDIATE(st); + int rc=0; + if (handler_again___status_SETTING_INIT_CONNECT(&rc)) { + goto handler_again; // we changed status } else { - if (rc==-1) { - // the command failed - int myerr=mysql_errno(myconn->mysql); - if (myerr > 2000) { - bool retry_conn=false; - // client error, serious - proxy_error("Detected a broken connection while setting INIT CONNECT on %s , %d : %d, %s\n", myconn->parent->address, myconn->parent->port, myerr, mysql_error(myconn->mysql)); - //if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) { - if ((myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) { - retry_conn=true; - } - myds->destroy_MySQL_Connection_From_Pool(false); - myds->fd=0; - if (retry_conn) { - myds->DSS=STATE_NOT_INITIALIZED; - //previous_status.push(PROCESSING_QUERY); - NEXT_IMMEDIATE(CONNECTING_SERVER); - } - return -1; - } else { - proxy_warning("Error while setting INIT CONNECT: %d, %s\n", myerr, mysql_error(myconn->mysql)); - // we won't go back to PROCESSING_QUERY - st=previous_status.top(); - previous_status.pop(); - char sqlstate[10]; - sprintf(sqlstate,"#%s",mysql_sqlstate(myconn->mysql)); - client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,mysql_errno(myconn->mysql),sqlstate,mysql_error(myconn->mysql)); - myds->destroy_MySQL_Connection_From_Pool(true); - myds->fd=0; - status=WAITING_CLIENT_DATA; - client_myds->DSS=STATE_SLEEP; - } - } else { - // rc==1 , nothing to do for now + if (rc==-1) { // we have an error we can't handle + return -1; } } } break; case CHANGING_SCHEMA: - //fprintf(stderr,"CHANGING_SCHEMA\n"); - assert(mybe->server_myds->myconn); { - MySQL_Data_Stream *myds=mybe->server_myds; - MySQL_Connection *myconn=myds->myconn; - myds->DSS=STATE_MARIADB_QUERY; - enum session_status st=status; - if (myds->mypolls==NULL) { - thread->mypolls.add(POLLIN|POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime); - } - int rc=myconn->async_select_db(myds->revents); - if (rc==0) { - myds->myconn->userinfo->set(client_myds->myconn->userinfo); - st=previous_status.top(); - previous_status.pop(); - NEXT_IMMEDIATE(st); + int rc=0; + if (handler_again___status_CHANGING_SCHEMA(&rc)) { + goto handler_again; // we changed status } else { - if (rc==-1) { - // the command failed - int myerr=mysql_errno(myconn->mysql); - if (myerr > 2000) { - bool retry_conn=false; - // client error, serious - proxy_error("Detected a broken connection during INIT_DB on %s , %d : %d, %s\n", myconn->parent->address, myconn->parent->port, myerr, mysql_error(myconn->mysql)); - //if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) { - if ((myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) { - retry_conn=true; - } - myds->destroy_MySQL_Connection_From_Pool(false); - myds->fd=0; - if (retry_conn) { - myds->DSS=STATE_NOT_INITIALIZED; - //previous_status.push(PROCESSING_QUERY); - NEXT_IMMEDIATE(CONNECTING_SERVER); - } - return -1; - } else { - proxy_warning("Error during INIT_DB: %d, %s\n", myerr, mysql_error(myconn->mysql)); - // we won't go back to PROCESSING_QUERY - st=previous_status.top(); - previous_status.pop(); - char sqlstate[10]; - sprintf(sqlstate,"#%s",mysql_sqlstate(myconn->mysql)); - client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,mysql_errno(myconn->mysql),sqlstate,mysql_error(myconn->mysql)); -// CurrentQuery.end(); -// myds->free_mysql_real_query(); - myds->destroy_MySQL_Connection_From_Pool(true); - myds->fd=0; -// status=WAITING_CLIENT_DATA; -// client_myds->DSS=STATE_SLEEP; - RequestEnd(myds); - } - } else { - // rc==1 , nothing to do for now + if (rc==-1) { // we have an error we can't handle + return -1; } } } break; case CONNECTING_SERVER: - //fprintf(stderr,"CONNECTING_SERVER\n"); - if (mybe->server_myds->max_connect_time) { - if (thread->curtime >= mybe->server_myds->max_connect_time) { - char buf[256]; - sprintf(buf,"Max connect timeout reached while reaching hostgroup %d after %llums", current_hostgroup, (thread->curtime - CurrentQuery.start_time)/1000 ); - client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",buf); -// CurrentQuery.end(); -// mybe->server_myds->free_mysql_real_query(); -// client_myds->DSS=STATE_SLEEP; - RequestEnd(mybe->server_myds); - //enum session_status st; - while (previous_status.size()) { - previous_status.top(); - previous_status.pop(); - } - if (mybe->server_myds->myconn) { - //mybe->server_myds->destroy_MySQL_Connection(); - mybe->server_myds->destroy_MySQL_Connection_From_Pool(false); + { + int rc=0; + if (handler_again___status_CONNECTING_SERVER(&rc)) { + goto handler_again; // we changed status + } else { + if (rc==1) { //handler_again___status_CONNECTING_SERVER returns 1 + goto __exit_DSS__STATE_NOT_INITIALIZED; } - mybe->server_myds->max_connect_time=0; - NEXT_IMMEDIATE(WAITING_CLIENT_DATA); - } - } - if (mybe->server_myds->myconn==NULL) { - handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED__get_connection(); - } - if (mybe->server_myds->myconn==NULL) { - pause_until=thread->curtime+mysql_thread___connect_retries_delay*1000; - goto __exit_DSS__STATE_NOT_INITIALIZED; - } else { - MySQL_Data_Stream *myds=mybe->server_myds; - MySQL_Connection *myconn=myds->myconn; - int rc; - if (default_hostgroup<0) { - // we are connected to a Admin module backend - // we pretend to set a user variable to disable multiplexing - myconn->set_status_user_variable(true); - } - enum session_status st=status; - if (mybe->server_myds->myconn->async_state_machine==ASYNC_IDLE) { - st=previous_status.top(); - previous_status.pop(); - NEXT_IMMEDIATE(st); - assert(0); - } - assert(st==status); - unsigned long long curtime=monotonic_time(); - //mybe->server_myds->myprot.init(&mybe->server_myds, mybe->server_myds->myconn->userinfo, this); -/* */ - assert(myconn->async_state_machine!=ASYNC_IDLE); - rc=myconn->async_connect(myds->revents); - if (myds->mypolls==NULL) { - // connection yet not in mypolls - myds->assign_fd_from_mysql_conn(); - thread->mypolls.add(POLLIN|POLLOUT, mybe->server_myds->fd, mybe->server_myds, curtime); - } - switch (rc) { - case 0: - myds->myds_type=MYDS_BACKEND; - myds->DSS=STATE_MARIADB_GENERIC; - status=WAITING_CLIENT_DATA; - st=previous_status.top(); - previous_status.pop(); - myds->wait_until=0; - if (session_fast_forward==true) { - // we have a successful connection and session_fast_forward enabled - // set DSS=STATE_SLEEP or it will believe it have to use MARIADB client library - myds->DSS=STATE_SLEEP; - } - NEXT_IMMEDIATE(st); - break; - case -1: - case -2: - // FIXME: experimental - //wrong_pass=true; - if (myds->connect_retries_on_failure >0 ) { - myds->connect_retries_on_failure--; - //myds->destroy_MySQL_Connection(); - myds->destroy_MySQL_Connection_From_Pool(false); - NEXT_IMMEDIATE(CONNECTING_SERVER); - } else { - int myerr=mysql_errno(myconn->mysql); - if (myerr) { - char sqlstate[10]; - sprintf(sqlstate,"#%s",mysql_sqlstate(myconn->mysql)); - client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,mysql_errno(myconn->mysql),sqlstate,mysql_error(myconn->mysql)); - } else { - char buf[256]; - sprintf(buf,"Max connect failure while reaching hostgroup %d", current_hostgroup); - client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",buf); - } -// CurrentQuery.end(); -// myds->free_mysql_real_query(); -// client_myds->DSS=STATE_SLEEP; - RequestEnd(myds); - while (previous_status.size()) { - st=previous_status.top(); - previous_status.pop(); - } - //myds->destroy_MySQL_Connection(); - myds->destroy_MySQL_Connection_From_Pool( myerr ? true : false ); - myds->max_connect_time=0; - NEXT_IMMEDIATE(WAITING_CLIENT_DATA); - } - break; - case 1: // continue on next loop - default: - break; } } break;