Simplifying MySQL_Session::Handler() , 2

Added functions:
* handler_again___verify_backend_charset();
* handler_again___verify_init_connect();
* handler_again___verify_backend_autocommit();
* handler_again___verify_backend_user_schema();
* handler_again___status_SETTING_INIT_CONNECT(int *);
* handler_again___status_CHANGING_SCHEMA(int *);
* handler_again___status_CONNECTING_SERVER(int *);
pull/739/head
René Cannaò 10 years ago
parent da8d6d8cf3
commit 8f3c0e1b2f

@ -76,6 +76,15 @@ class MySQL_Session
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___create_mirror_session();
int handler_again___status_PINGING_SERVER();
void handler_again___new_thread_to_kill_connection();
bool handler_again___verify_backend_charset();
bool handler_again___verify_init_connect();
bool handler_again___verify_backend_autocommit();
bool handler_again___verify_backend_user_schema();
bool handler_again___status_SETTING_INIT_CONNECT(int *);
bool handler_again___status_CHANGING_SCHEMA(int *);
bool handler_again___status_CONNECTING_SERVER(int *);
// void return_MySQL_Connection_To_Poll(MySQL_Data_Stream *);

@ -705,7 +705,377 @@ void MySQL_Session::handler_again___new_thread_to_kill_connection() {
}
}
// NEXT_IMMEDIATE is a legacy macro used inside handler() to immediately jump
// to handler_again
#define NEXT_IMMEDIATE(new_st) do { set_status(new_st); goto handler_again; } while (0)
// NEXT_IMMEDIATE_NEW is a new macro to use *outside* handler().
// handler() should check the return code of the function it calls, and if
// true should jump to handler_again
#define NEXT_IMMEDIATE_NEW(new_st) do { set_status(new_st); return true; } while (0)
bool MySQL_Session::handler_again___verify_backend_charset() {
if (client_myds->myconn->options.charset != mybe->server_myds->myconn->mysql->charset->nr) {
//previous_status.push(PROCESSING_QUERY);
switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
assert(0);
break;
}
NEXT_IMMEDIATE_NEW(CHANGING_CHARSET);
}
return false;
}
bool MySQL_Session::handler_again___verify_init_connect() {
if (mybe->server_myds->myconn->options.init_connect_sent==false) {
// we needs to set it to true
mybe->server_myds->myconn->options.init_connect_sent=true;
if (mysql_thread___init_connect) {
// we send init connect queries only if set
mybe->server_myds->myconn->options.init_connect=strdup(mysql_thread___init_connect);
previous_status.push(PROCESSING_QUERY);
NEXT_IMMEDIATE_NEW(SETTING_INIT_CONNECT);
}
}
return false;
}
bool MySQL_Session::handler_again___verify_backend_autocommit() {
if (autocommit != mybe->server_myds->myconn->IsAutoCommit()) {
// see case #485
if (mysql_thread___enforce_autocommit_on_reads == false) {
// enforce_autocommit_on_reads is disabled
// we need to check if it is a SELECT not FOR UPDATE
if (CurrentQuery.is_select_NOT_for_update()==false) {
//previous_status.push(PROCESSING_QUERY);
switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
assert(0);
break;
}
NEXT_IMMEDIATE_NEW(CHANGING_AUTOCOMMIT);
}
} else {
// in every other cases, enforce autocommit
//previous_status.push(PROCESSING_QUERY);
switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
assert(0);
break;
}
NEXT_IMMEDIATE_NEW(CHANGING_AUTOCOMMIT);
}
}
return false;
}
bool MySQL_Session::handler_again___verify_backend_user_schema() {
MySQL_Data_Stream *myds=mybe->server_myds;
if (client_myds->myconn->userinfo->hash!=mybe->server_myds->myconn->userinfo->hash) {
if (strcmp(client_myds->myconn->userinfo->username,myds->myconn->userinfo->username)) {
//previous_status.push(PROCESSING_QUERY);
switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
assert(0);
break;
}
NEXT_IMMEDIATE_NEW(CHANGING_USER_SERVER);
}
if (strcmp(client_myds->myconn->userinfo->schemaname,myds->myconn->userinfo->schemaname)) {
//previous_status.push(PROCESSING_QUERY);
switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
assert(0);
break;
}
NEXT_IMMEDIATE_NEW(CHANGING_SCHEMA);
}
}
return false;
}
bool MySQL_Session::handler_again___status_SETTING_INIT_CONNECT(int *_rc) {
bool ret=false;
assert(mybe->server_myds->myconn);
MySQL_Data_Stream *myds=mybe->server_myds;
MySQL_Connection *myconn=myds->myconn;
myds->DSS=STATE_MARIADB_QUERY;
enum session_status st=status;
if (myds->mypolls==NULL) {
thread->mypolls.add(POLLIN|POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime);
}
int rc=myconn->async_send_simple_command(myds->revents,myconn->options.init_connect,strlen(myconn->options.init_connect));
if (rc==0) {
myds->revents|=POLLOUT; // we also set again POLLOUT to send a query immediately!
st=previous_status.top();
previous_status.pop();
NEXT_IMMEDIATE_NEW(st);
} else {
if (rc==-1) {
// the command failed
int myerr=mysql_errno(myconn->mysql);
if (myerr > 2000) {
bool retry_conn=false;
// client error, serious
proxy_error("Detected a broken connection while setting INIT CONNECT on %s , %d : %d, %s\n", myconn->parent->address, myconn->parent->port, myerr, mysql_error(myconn->mysql));
//if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) {
if ((myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) {
retry_conn=true;
}
myds->destroy_MySQL_Connection_From_Pool(false);
myds->fd=0;
if (retry_conn) {
myds->DSS=STATE_NOT_INITIALIZED;
//previous_status.push(PROCESSING_QUERY);
NEXT_IMMEDIATE_NEW(CONNECTING_SERVER);
}
*_rc=-1; // an error happened, we should destroy the Session
return ret;
} else {
proxy_warning("Error while setting INIT CONNECT: %d, %s\n", myerr, mysql_error(myconn->mysql));
// we won't go back to PROCESSING_QUERY
st=previous_status.top();
previous_status.pop();
char sqlstate[10];
sprintf(sqlstate,"#%s",mysql_sqlstate(myconn->mysql));
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,mysql_errno(myconn->mysql),sqlstate,mysql_error(myconn->mysql));
myds->destroy_MySQL_Connection_From_Pool(true);
myds->fd=0;
status=WAITING_CLIENT_DATA;
client_myds->DSS=STATE_SLEEP;
}
} else {
// rc==1 , nothing to do for now
}
}
return ret;
}
bool MySQL_Session::handler_again___status_CHANGING_SCHEMA(int *_rc) {
bool ret=false;
//fprintf(stderr,"CHANGING_SCHEMA\n");
assert(mybe->server_myds->myconn);
MySQL_Data_Stream *myds=mybe->server_myds;
MySQL_Connection *myconn=myds->myconn;
myds->DSS=STATE_MARIADB_QUERY;
enum session_status st=status;
if (myds->mypolls==NULL) {
thread->mypolls.add(POLLIN|POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime);
}
int rc=myconn->async_select_db(myds->revents);
if (rc==0) {
myds->myconn->userinfo->set(client_myds->myconn->userinfo);
st=previous_status.top();
previous_status.pop();
NEXT_IMMEDIATE_NEW(st);
} else {
if (rc==-1) {
// the command failed
int myerr=mysql_errno(myconn->mysql);
if (myerr > 2000) {
bool retry_conn=false;
// client error, serious
proxy_error("Detected a broken connection during INIT_DB on %s , %d : %d, %s\n", myconn->parent->address, myconn->parent->port, myerr, mysql_error(myconn->mysql));
//if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) {
if ((myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) {
retry_conn=true;
}
myds->destroy_MySQL_Connection_From_Pool(false);
myds->fd=0;
if (retry_conn) {
myds->DSS=STATE_NOT_INITIALIZED;
//previous_status.push(PROCESSING_QUERY);
NEXT_IMMEDIATE_NEW(CONNECTING_SERVER);
}
*_rc=-1; // an error happened, we should destroy the Session
return ret;
} else {
proxy_warning("Error during INIT_DB: %d, %s\n", myerr, mysql_error(myconn->mysql));
// we won't go back to PROCESSING_QUERY
st=previous_status.top();
previous_status.pop();
char sqlstate[10];
sprintf(sqlstate,"#%s",mysql_sqlstate(myconn->mysql));
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,mysql_errno(myconn->mysql),sqlstate,mysql_error(myconn->mysql));
// CurrentQuery.end();
// myds->free_mysql_real_query();
myds->destroy_MySQL_Connection_From_Pool(true);
myds->fd=0;
// status=WAITING_CLIENT_DATA;
// client_myds->DSS=STATE_SLEEP;
RequestEnd(myds);
}
} else {
// rc==1 , nothing to do for now
}
}
return false;
}
bool MySQL_Session::handler_again___status_CONNECTING_SERVER(int *_rc) {
//fprintf(stderr,"CONNECTING_SERVER\n");
if (mybe->server_myds->max_connect_time) {
if (thread->curtime >= mybe->server_myds->max_connect_time) {
char buf[256];
sprintf(buf,"Max connect timeout reached while reaching hostgroup %d after %llums", current_hostgroup, (thread->curtime - CurrentQuery.start_time)/1000 );
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",buf);
// CurrentQuery.end();
// mybe->server_myds->free_mysql_real_query();
// client_myds->DSS=STATE_SLEEP;
RequestEnd(mybe->server_myds);
//enum session_status st;
while (previous_status.size()) {
previous_status.top();
previous_status.pop();
}
if (mybe->server_myds->myconn) {
//mybe->server_myds->destroy_MySQL_Connection();
mybe->server_myds->destroy_MySQL_Connection_From_Pool(false);
}
mybe->server_myds->max_connect_time=0;
NEXT_IMMEDIATE_NEW(WAITING_CLIENT_DATA);
}
}
if (mybe->server_myds->myconn==NULL) {
handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED__get_connection();
}
if (mybe->server_myds->myconn==NULL) {
pause_until=thread->curtime+mysql_thread___connect_retries_delay*1000;
//goto __exit_DSS__STATE_NOT_INITIALIZED;
*_rc=1;
return false;
} else {
MySQL_Data_Stream *myds=mybe->server_myds;
MySQL_Connection *myconn=myds->myconn;
int rc;
if (default_hostgroup<0) {
// we are connected to a Admin module backend
// we pretend to set a user variable to disable multiplexing
myconn->set_status_user_variable(true);
}
enum session_status st=status;
if (mybe->server_myds->myconn->async_state_machine==ASYNC_IDLE) {
st=previous_status.top();
previous_status.pop();
NEXT_IMMEDIATE_NEW(st);
assert(0);
}
assert(st==status);
unsigned long long curtime=monotonic_time();
//mybe->server_myds->myprot.init(&mybe->server_myds, mybe->server_myds->myconn->userinfo, this);
/* */
assert(myconn->async_state_machine!=ASYNC_IDLE);
rc=myconn->async_connect(myds->revents);
if (myds->mypolls==NULL) {
// connection yet not in mypolls
myds->assign_fd_from_mysql_conn();
thread->mypolls.add(POLLIN|POLLOUT, mybe->server_myds->fd, mybe->server_myds, curtime);
}
switch (rc) {
case 0:
myds->myds_type=MYDS_BACKEND;
myds->DSS=STATE_MARIADB_GENERIC;
status=WAITING_CLIENT_DATA;
st=previous_status.top();
previous_status.pop();
myds->wait_until=0;
if (session_fast_forward==true) {
// we have a successful connection and session_fast_forward enabled
// set DSS=STATE_SLEEP or it will believe it have to use MARIADB client library
myds->DSS=STATE_SLEEP;
}
NEXT_IMMEDIATE_NEW(st);
break;
case -1:
case -2:
// FIXME: experimental
//wrong_pass=true;
if (myds->connect_retries_on_failure >0 ) {
myds->connect_retries_on_failure--;
//myds->destroy_MySQL_Connection();
myds->destroy_MySQL_Connection_From_Pool(false);
NEXT_IMMEDIATE_NEW(CONNECTING_SERVER);
} else {
int myerr=mysql_errno(myconn->mysql);
if (myerr) {
char sqlstate[10];
sprintf(sqlstate,"#%s",mysql_sqlstate(myconn->mysql));
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,mysql_errno(myconn->mysql),sqlstate,mysql_error(myconn->mysql));
} else {
char buf[256];
sprintf(buf,"Max connect failure while reaching hostgroup %d", current_hostgroup);
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",buf);
}
// CurrentQuery.end();
// myds->free_mysql_real_query();
// client_myds->DSS=STATE_SLEEP;
RequestEnd(myds);
while (previous_status.size()) {
st=previous_status.top();
previous_status.pop();
}
//myds->destroy_MySQL_Connection();
myds->destroy_MySQL_Connection_From_Pool( myerr ? true : false );
myds->max_connect_time=0;
NEXT_IMMEDIATE_NEW(WAITING_CLIENT_DATA);
}
break;
case 1: // continue on next loop
default:
break;
}
}
return false;
}
int MySQL_Session::handler() {
bool wrong_pass=false;
@ -1180,116 +1550,19 @@ handler_again:
thread->mypolls.add(POLLIN|POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime);
}
if (default_hostgroup>=0) {
if (client_myds->myconn->userinfo->hash!=mybe->server_myds->myconn->userinfo->hash) {
if (strcmp(client_myds->myconn->userinfo->username,myds->myconn->userinfo->username)) {
//previous_status.push(PROCESSING_QUERY);
switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
assert(0);
break;
}
NEXT_IMMEDIATE(CHANGING_USER_SERVER);
}
if (strcmp(client_myds->myconn->userinfo->schemaname,myds->myconn->userinfo->schemaname)) {
//previous_status.push(PROCESSING_QUERY);
switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
assert(0);
break;
}
NEXT_IMMEDIATE(CHANGING_SCHEMA);
}
if (handler_again___verify_backend_user_schema()) {
goto handler_again;
}
if (mirror==false) { // do not care about autocommit and charset if mirror
if (mybe->server_myds->myconn->options.init_connect_sent==false) {
// we needs to set it to true
mybe->server_myds->myconn->options.init_connect_sent=true;
if (mysql_thread___init_connect) {
// we send init connect queries only if set
mybe->server_myds->myconn->options.init_connect=strdup(mysql_thread___init_connect);
previous_status.push(PROCESSING_QUERY);
NEXT_IMMEDIATE(SETTING_INIT_CONNECT);
}
if (handler_again___verify_init_connect()) {
goto handler_again;
}
if (client_myds->myconn->options.charset != mybe->server_myds->myconn->mysql->charset->nr) {
//previous_status.push(PROCESSING_QUERY);
switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
assert(0);
break;
if (handler_again___verify_backend_charset()) {
goto handler_again;
}
NEXT_IMMEDIATE(CHANGING_CHARSET);
}
if (autocommit != mybe->server_myds->myconn->IsAutoCommit()) {
// see case #485
if (mysql_thread___enforce_autocommit_on_reads == false) {
// enforce_autocommit_on_reads is disabled
// we need to check if it is a SELECT not FOR UPDATE
if (CurrentQuery.is_select_NOT_for_update()==false) {
//previous_status.push(PROCESSING_QUERY);
switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
assert(0);
break;
}
NEXT_IMMEDIATE(CHANGING_AUTOCOMMIT);
}
} else {
// in every other cases, enforce autocommit
//previous_status.push(PROCESSING_QUERY);
switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
assert(0);
break;
}
NEXT_IMMEDIATE(CHANGING_AUTOCOMMIT);
if (handler_again___verify_backend_autocommit()) {
goto handler_again;
}
}
if (status==PROCESSING_STMT_EXECUTE) {
CurrentQuery.mysql_stmt=myconn->local_stmts->find(CurrentQuery.stmt_global_id);
if (CurrentQuery.mysql_stmt==NULL) {
@ -1839,230 +2112,40 @@ handler_again:
break;
case SETTING_INIT_CONNECT:
assert(mybe->server_myds->myconn);
{
MySQL_Data_Stream *myds=mybe->server_myds;
MySQL_Connection *myconn=myds->myconn;
myds->DSS=STATE_MARIADB_QUERY;
enum session_status st=status;
if (myds->mypolls==NULL) {
thread->mypolls.add(POLLIN|POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime);
}
int rc=myconn->async_send_simple_command(myds->revents,myconn->options.init_connect,strlen(myconn->options.init_connect));
if (rc==0) {
myds->revents|=POLLOUT; // we also set again POLLOUT to send a query immediately!
st=previous_status.top();
previous_status.pop();
NEXT_IMMEDIATE(st);
int rc=0;
if (handler_again___status_SETTING_INIT_CONNECT(&rc)) {
goto handler_again; // we changed status
} else {
if (rc==-1) {
// the command failed
int myerr=mysql_errno(myconn->mysql);
if (myerr > 2000) {
bool retry_conn=false;
// client error, serious
proxy_error("Detected a broken connection while setting INIT CONNECT on %s , %d : %d, %s\n", myconn->parent->address, myconn->parent->port, myerr, mysql_error(myconn->mysql));
//if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) {
if ((myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) {
retry_conn=true;
}
myds->destroy_MySQL_Connection_From_Pool(false);
myds->fd=0;
if (retry_conn) {
myds->DSS=STATE_NOT_INITIALIZED;
//previous_status.push(PROCESSING_QUERY);
NEXT_IMMEDIATE(CONNECTING_SERVER);
}
return -1;
} else {
proxy_warning("Error while setting INIT CONNECT: %d, %s\n", myerr, mysql_error(myconn->mysql));
// we won't go back to PROCESSING_QUERY
st=previous_status.top();
previous_status.pop();
char sqlstate[10];
sprintf(sqlstate,"#%s",mysql_sqlstate(myconn->mysql));
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,mysql_errno(myconn->mysql),sqlstate,mysql_error(myconn->mysql));
myds->destroy_MySQL_Connection_From_Pool(true);
myds->fd=0;
status=WAITING_CLIENT_DATA;
client_myds->DSS=STATE_SLEEP;
}
} else {
// rc==1 , nothing to do for now
if (rc==-1) { // we have an error we can't handle
return -1;
}
}
}
break;
case CHANGING_SCHEMA:
//fprintf(stderr,"CHANGING_SCHEMA\n");
assert(mybe->server_myds->myconn);
{
MySQL_Data_Stream *myds=mybe->server_myds;
MySQL_Connection *myconn=myds->myconn;
myds->DSS=STATE_MARIADB_QUERY;
enum session_status st=status;
if (myds->mypolls==NULL) {
thread->mypolls.add(POLLIN|POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime);
}
int rc=myconn->async_select_db(myds->revents);
if (rc==0) {
myds->myconn->userinfo->set(client_myds->myconn->userinfo);
st=previous_status.top();
previous_status.pop();
NEXT_IMMEDIATE(st);
int rc=0;
if (handler_again___status_CHANGING_SCHEMA(&rc)) {
goto handler_again; // we changed status
} else {
if (rc==-1) {
// the command failed
int myerr=mysql_errno(myconn->mysql);
if (myerr > 2000) {
bool retry_conn=false;
// client error, serious
proxy_error("Detected a broken connection during INIT_DB on %s , %d : %d, %s\n", myconn->parent->address, myconn->parent->port, myerr, mysql_error(myconn->mysql));
//if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) {
if ((myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) {
retry_conn=true;
}
myds->destroy_MySQL_Connection_From_Pool(false);
myds->fd=0;
if (retry_conn) {
myds->DSS=STATE_NOT_INITIALIZED;
//previous_status.push(PROCESSING_QUERY);
NEXT_IMMEDIATE(CONNECTING_SERVER);
}
return -1;
} else {
proxy_warning("Error during INIT_DB: %d, %s\n", myerr, mysql_error(myconn->mysql));
// we won't go back to PROCESSING_QUERY
st=previous_status.top();
previous_status.pop();
char sqlstate[10];
sprintf(sqlstate,"#%s",mysql_sqlstate(myconn->mysql));
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,mysql_errno(myconn->mysql),sqlstate,mysql_error(myconn->mysql));
// CurrentQuery.end();
// myds->free_mysql_real_query();
myds->destroy_MySQL_Connection_From_Pool(true);
myds->fd=0;
// status=WAITING_CLIENT_DATA;
// client_myds->DSS=STATE_SLEEP;
RequestEnd(myds);
}
} else {
// rc==1 , nothing to do for now
if (rc==-1) { // we have an error we can't handle
return -1;
}
}
}
break;
case CONNECTING_SERVER:
//fprintf(stderr,"CONNECTING_SERVER\n");
if (mybe->server_myds->max_connect_time) {
if (thread->curtime >= mybe->server_myds->max_connect_time) {
char buf[256];
sprintf(buf,"Max connect timeout reached while reaching hostgroup %d after %llums", current_hostgroup, (thread->curtime - CurrentQuery.start_time)/1000 );
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",buf);
// CurrentQuery.end();
// mybe->server_myds->free_mysql_real_query();
// client_myds->DSS=STATE_SLEEP;
RequestEnd(mybe->server_myds);
//enum session_status st;
while (previous_status.size()) {
previous_status.top();
previous_status.pop();
}
if (mybe->server_myds->myconn) {
//mybe->server_myds->destroy_MySQL_Connection();
mybe->server_myds->destroy_MySQL_Connection_From_Pool(false);
{
int rc=0;
if (handler_again___status_CONNECTING_SERVER(&rc)) {
goto handler_again; // we changed status
} else {
if (rc==1) { //handler_again___status_CONNECTING_SERVER returns 1
goto __exit_DSS__STATE_NOT_INITIALIZED;
}
mybe->server_myds->max_connect_time=0;
NEXT_IMMEDIATE(WAITING_CLIENT_DATA);
}
}
if (mybe->server_myds->myconn==NULL) {
handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED__get_connection();
}
if (mybe->server_myds->myconn==NULL) {
pause_until=thread->curtime+mysql_thread___connect_retries_delay*1000;
goto __exit_DSS__STATE_NOT_INITIALIZED;
} else {
MySQL_Data_Stream *myds=mybe->server_myds;
MySQL_Connection *myconn=myds->myconn;
int rc;
if (default_hostgroup<0) {
// we are connected to a Admin module backend
// we pretend to set a user variable to disable multiplexing
myconn->set_status_user_variable(true);
}
enum session_status st=status;
if (mybe->server_myds->myconn->async_state_machine==ASYNC_IDLE) {
st=previous_status.top();
previous_status.pop();
NEXT_IMMEDIATE(st);
assert(0);
}
assert(st==status);
unsigned long long curtime=monotonic_time();
//mybe->server_myds->myprot.init(&mybe->server_myds, mybe->server_myds->myconn->userinfo, this);
/* */
assert(myconn->async_state_machine!=ASYNC_IDLE);
rc=myconn->async_connect(myds->revents);
if (myds->mypolls==NULL) {
// connection yet not in mypolls
myds->assign_fd_from_mysql_conn();
thread->mypolls.add(POLLIN|POLLOUT, mybe->server_myds->fd, mybe->server_myds, curtime);
}
switch (rc) {
case 0:
myds->myds_type=MYDS_BACKEND;
myds->DSS=STATE_MARIADB_GENERIC;
status=WAITING_CLIENT_DATA;
st=previous_status.top();
previous_status.pop();
myds->wait_until=0;
if (session_fast_forward==true) {
// we have a successful connection and session_fast_forward enabled
// set DSS=STATE_SLEEP or it will believe it have to use MARIADB client library
myds->DSS=STATE_SLEEP;
}
NEXT_IMMEDIATE(st);
break;
case -1:
case -2:
// FIXME: experimental
//wrong_pass=true;
if (myds->connect_retries_on_failure >0 ) {
myds->connect_retries_on_failure--;
//myds->destroy_MySQL_Connection();
myds->destroy_MySQL_Connection_From_Pool(false);
NEXT_IMMEDIATE(CONNECTING_SERVER);
} else {
int myerr=mysql_errno(myconn->mysql);
if (myerr) {
char sqlstate[10];
sprintf(sqlstate,"#%s",mysql_sqlstate(myconn->mysql));
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,mysql_errno(myconn->mysql),sqlstate,mysql_error(myconn->mysql));
} else {
char buf[256];
sprintf(buf,"Max connect failure while reaching hostgroup %d", current_hostgroup);
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",buf);
}
// CurrentQuery.end();
// myds->free_mysql_real_query();
// client_myds->DSS=STATE_SLEEP;
RequestEnd(myds);
while (previous_status.size()) {
st=previous_status.top();
previous_status.pop();
}
//myds->destroy_MySQL_Connection();
myds->destroy_MySQL_Connection_From_Pool( myerr ? true : false );
myds->max_connect_time=0;
NEXT_IMMEDIATE(WAITING_CLIENT_DATA);
}
break;
case 1: // continue on next loop
default:
break;
}
}
break;

Loading…
Cancel
Save