Track and handle autocommit (#438)

- MySQL_Session::handler_special_queries() processes set autocommit from clients
- to the client is always returned the autocommit value set in the session
- set autocommit is either just acknowledged to the client, or forwarded to the backend based on the status of transactions
- unless there are query rules to handle SET AUTOCOMMIT , the statement is forwarded to the first found active transaction: this assumes there is just one transactions. Although multiple transactions are possible, handling multiple transactions at the same time seems a dangerous adventure
pull/447/head
René Cannaò 11 years ago
parent b420501d85
commit c3e6fda7a4

@ -87,6 +87,7 @@ class MySQL_Session
Query_Processor_Output *qpo; Query_Processor_Output *qpo;
StatCounters *command_counters; StatCounters *command_counters;
int healthy; int healthy;
bool autocommit;
bool killed; bool killed;
bool admin; bool admin;
bool max_connections_reached; bool max_connections_reached;
@ -101,6 +102,7 @@ class MySQL_Session
int current_hostgroup; int current_hostgroup;
int default_hostgroup; int default_hostgroup;
int active_transactions; int active_transactions;
int autocommit_on_hostgroup;
char * default_schema; char * default_schema;
bool schema_locked; bool schema_locked;
bool transaction_persistent; bool transaction_persistent;
@ -159,6 +161,7 @@ class MySQL_Session
void SQLite3_to_MySQL(SQLite3_result *, char *, int , MySQL_Protocol *); void SQLite3_to_MySQL(SQLite3_result *, char *, int , MySQL_Protocol *);
void MySQL_Result_to_MySQL_wire(MYSQL *mysql, MySQL_ResultSet *MyRS); void MySQL_Result_to_MySQL_wire(MYSQL *mysql, MySQL_ResultSet *MyRS);
unsigned int NumActiveTransactions(); unsigned int NumActiveTransactions();
int FindOneActiveTransaction();
unsigned long long IdleTime(); unsigned long long IdleTime();
void reset_all_backends(); void reset_all_backends();

@ -123,6 +123,7 @@ class MySQL_Connection {
int async_ping(short event); int async_ping(short event);
void async_free_result(); void async_free_result();
bool IsActiveTransaction(); bool IsActiveTransaction();
bool IsAutoCommit();
bool MultiplexDisabled(); bool MultiplexDisabled();
void ProcessQueryAndSetStatusFlags(char *query_digest_text); void ProcessQueryAndSetStatusFlags(char *query_digest_text);
}; };

@ -1925,6 +1925,7 @@ MySQL_ResultSet::MySQL_ResultSet(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL
// first EOF // first EOF
unsigned int nTrx=myds->sess->NumActiveTransactions(); unsigned int nTrx=myds->sess->NumActiveTransactions();
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
if (myds->sess->autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT;
myprot->generate_pkt_EOF(false,&pkt.ptr,&pkt.size,sid,0,mysql->server_status|setStatus); myprot->generate_pkt_EOF(false,&pkt.ptr,&pkt.size,sid,0,mysql->server_status|setStatus);
sid++; sid++;
PSarrayOUT->add(pkt.ptr,pkt.size); PSarrayOUT->add(pkt.ptr,pkt.size);
@ -1958,6 +1959,7 @@ void MySQL_ResultSet::add_eof() {
PtrSize_t pkt; PtrSize_t pkt;
unsigned int nTrx=myds->sess->NumActiveTransactions(); unsigned int nTrx=myds->sess->NumActiveTransactions();
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
if (myds->sess->autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT;
myprot->generate_pkt_EOF(false,&pkt.ptr,&pkt.size,sid,0,mysql->server_status|setStatus); myprot->generate_pkt_EOF(false,&pkt.ptr,&pkt.size,sid,0,mysql->server_status|setStatus);
PSarrayOUT->add(pkt.ptr,pkt.size); PSarrayOUT->add(pkt.ptr,pkt.size);
sid++; sid++;

@ -164,6 +164,8 @@ MySQL_Session::MySQL_Session() {
qpo=NULL; qpo=NULL;
command_counters=new StatCounters(15,10,false); command_counters=new StatCounters(15,10,false);
healthy=1; healthy=1;
autocommit=true;
autocommit_on_hostgroup=-1;
killed=false; killed=false;
admin=false; admin=false;
connections_handler=false; connections_handler=false;
@ -294,6 +296,7 @@ bool MySQL_Session::handler_special_queries(PtrSize_t *pkt) {
sprintf(buf,"%u",last_insert_id); sprintf(buf,"%u",last_insert_id);
unsigned int nTrx=NumActiveTransactions(); unsigned int nTrx=NumActiveTransactions();
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT;
MySQL_Data_Stream *myds=client_myds; MySQL_Data_Stream *myds=client_myds;
MySQL_Protocol *myprot=&client_myds->myprot; MySQL_Protocol *myprot=&client_myds->myprot;
myds->DSS=STATE_QUERY_SENT_DS; myds->DSS=STATE_QUERY_SENT_DS;
@ -301,19 +304,20 @@ bool MySQL_Session::handler_special_queries(PtrSize_t *pkt) {
myprot->generate_pkt_column_count(true,NULL,NULL,sid,1); sid++; myprot->generate_pkt_column_count(true,NULL,NULL,sid,1); sid++;
myprot->generate_pkt_field(true,NULL,NULL,sid,(char *)"",(char *)"",(char *)"",(char *)"LAST_INSERT_ID()",(char *)"",33,15,MYSQL_TYPE_VAR_STRING,1,0x1f,false,0,NULL); sid++; myprot->generate_pkt_field(true,NULL,NULL,sid,(char *)"",(char *)"",(char *)"",(char *)"LAST_INSERT_ID()",(char *)"",33,15,MYSQL_TYPE_VAR_STRING,1,0x1f,false,0,NULL); sid++;
myds->DSS=STATE_COLUMN_DEFINITION; myds->DSS=STATE_COLUMN_DEFINITION;
myprot->generate_pkt_EOF(true,NULL,NULL,sid,0, 2 | setStatus); sid++; myprot->generate_pkt_EOF(true,NULL,NULL,sid,0, setStatus); sid++;
char **p=(char **)malloc(sizeof(char*)*1); char **p=(char **)malloc(sizeof(char*)*1);
unsigned long *l=(unsigned long *)malloc(sizeof(unsigned long *)*1); unsigned long *l=(unsigned long *)malloc(sizeof(unsigned long *)*1);
l[0]=strlen(buf);; l[0]=strlen(buf);;
p[0]=buf; p[0]=buf;
myprot->generate_pkt_row(true,NULL,NULL,sid,1,l,p); sid++; myprot->generate_pkt_row(true,NULL,NULL,sid,1,l,p); sid++;
myds->DSS=STATE_ROW; myds->DSS=STATE_ROW;
myprot->generate_pkt_EOF(true,NULL,NULL,sid,0, 2 | setStatus); sid++; myprot->generate_pkt_EOF(true,NULL,NULL,sid,0, setStatus); sid++;
myds->DSS=STATE_SLEEP; myds->DSS=STATE_SLEEP;
l_free(pkt->size,pkt->ptr); l_free(pkt->size,pkt->ptr);
return true; return true;
} }
if (pkt->size==SELECT_VERSION_COMMENT_LEN+5 && strncmp((char *)SELECT_VERSION_COMMENT,(char *)pkt->ptr+5,pkt->size-5)==0) { if (pkt->size==SELECT_VERSION_COMMENT_LEN+5 && strncmp((char *)SELECT_VERSION_COMMENT,(char *)pkt->ptr+5,pkt->size-5)==0) {
// FIXME: this doesn't return AUTOCOMMIT or IN_TRANS
PtrSize_t pkt_2; PtrSize_t pkt_2;
pkt_2.size=PROXYSQL_VERSION_COMMENT_LEN; pkt_2.size=PROXYSQL_VERSION_COMMENT_LEN;
pkt_2.ptr=l_alloc(pkt_2.size); pkt_2.ptr=l_alloc(pkt_2.size);
@ -325,6 +329,7 @@ bool MySQL_Session::handler_special_queries(PtrSize_t *pkt) {
return true; return true;
} }
if (pkt->size==strlen((char *)"select USER()")+5 && strncmp((char *)"select USER()",(char *)pkt->ptr+5,pkt->size-5)==0) { if (pkt->size==strlen((char *)"select USER()")+5 && strncmp((char *)"select USER()",(char *)pkt->ptr+5,pkt->size-5)==0) {
// FIXME: this doesn't return AUTOCOMMIT or IN_TRANS
char *query1=(char *)"SELECT \"%s\" AS 'USER()'"; char *query1=(char *)"SELECT \"%s\" AS 'USER()'";
char *query2=(char *)malloc(strlen(query1)+strlen(client_myds->myconn->userinfo->username)+10); char *query2=(char *)malloc(strlen(query1)+strlen(client_myds->myconn->userinfo->username)+10);
sprintf(query2,query1,client_myds->myconn->userinfo->username); sprintf(query2,query1,client_myds->myconn->userinfo->username);
@ -353,7 +358,8 @@ bool MySQL_Session::handler_special_queries(PtrSize_t *pkt) {
client_myds->myconn->set_charset(c->nr); client_myds->myconn->set_charset(c->nr);
unsigned int nTrx=NumActiveTransactions(); unsigned int nTrx=NumActiveTransactions();
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,0|setStatus,0,NULL); if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT;
client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,setStatus,0,NULL);
} }
client_myds->DSS=STATE_SLEEP; client_myds->DSS=STATE_SLEEP;
status=WAITING_CLIENT_DATA; status=WAITING_CLIENT_DATA;
@ -361,6 +367,83 @@ bool MySQL_Session::handler_special_queries(PtrSize_t *pkt) {
free(name); free(name);
return true; return true;
} }
size_t sal=strlen("set autocommit");
if ( pkt->size > 7+sal) {
if (strncasecmp((char *)"set autocommit",(char *)pkt->ptr+5,sal)==0) {
unsigned int i;
bool eq=false;
int fd=-1; // first digit
for (i=5+sal;i<pkt->size;i++) {
char c=((char *)pkt->ptr)[i];
if (c!='0' && c!='1' && c!=' ' && c!='=') return false; // found a not valid char
if (eq==false) {
if (c!=' ' && c!='=') return false; // found a not valid char
if (c=='=') eq=true;
} else {
if (c!='0' && c!='1' && c!=' ') return false; // found a not valid char
if (fd==-1) {
if (c=='0' || c=='1') { // found first digit
if (c=='0')
fd=0;
else
fd=1;
}
} else {
if (c=='0' || c=='1') { // found second digit
return false;
}
}
}
}
if (fd >= 0) { // we can set autocommit
// we immeditately process the number of transactions
unsigned int nTrx=NumActiveTransactions();
if (fd==1 && autocommit==true) {
// nothing to do, return OK
goto __ret_autocommit_OK;
}
if (fd==1 && autocommit==false) {
if (nTrx) {
// there is an active transaction, we need to forward it
// because this can potentially close the transaction
autocommit=true;
autocommit_on_hostgroup=FindOneActiveTransaction();
return false;
} else {
// as there is no active transaction, we do no need to forward it
// just change internal state
autocommit=false;
goto __ret_autocommit_OK;
}
}
if (fd==0) {
autocommit=false; // we set it, no matter if already set or not
if (nTrx) {
// there is an active transaction, we need to forward it
// because this can potentially close the transaction
autocommit_on_hostgroup=FindOneActiveTransaction();
return false;
} else {
// as there is no active transaction, we do no need to forward it
// just return OK
goto __ret_autocommit_OK;
}
}
__ret_autocommit_OK:
client_myds->DSS=STATE_QUERY_SENT_NET;
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT;
client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,setStatus,0,NULL);
client_myds->DSS=STATE_SLEEP;
status=WAITING_CLIENT_DATA;
l_free(pkt->size,pkt->ptr);
return true;
}
}
}
return false; return false;
} }
@ -473,10 +556,13 @@ __get_pkts_from_client:
RequestEnd(NULL); RequestEnd(NULL);
break; break;
} }
qpo=GloQPro->process_mysql_query(this,pkt.ptr,pkt.size,&CurrentQuery); qpo=GloQPro->process_mysql_query(this,pkt.ptr,pkt.size,&CurrentQuery);
if (qpo) { assert(qpo); // GloQPro->process_mysql_query() should always return a qpo
rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt); rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt);
if (rc_break==true) { break; } if (rc_break==true) { break; }
if (autocommit_on_hostgroup>=0) {
} }
mybe=find_or_create_backend(current_hostgroup); mybe=find_or_create_backend(current_hostgroup);
status=PROCESSING_QUERY; status=PROCESSING_QUERY;
@ -1390,6 +1476,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
if (v==1) { if (v==1) {
unsigned int nTrx=NumActiveTransactions(); unsigned int nTrx=NumActiveTransactions();
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT;
client_myds->myprot.generate_pkt_EOF(true,NULL,NULL,1,0, setStatus ); client_myds->myprot.generate_pkt_EOF(true,NULL,NULL,1,0, setStatus );
} else { } else {
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)""); client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"");
@ -1404,7 +1491,8 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
client_myds->setDSS_STATE_QUERY_SENT_NET(); client_myds->setDSS_STATE_QUERY_SENT_NET();
unsigned int nTrx=NumActiveTransactions(); unsigned int nTrx=NumActiveTransactions();
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,2|setStatus,0,NULL); if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT;
client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,setStatus,0,NULL);
client_myds->DSS=STATE_SLEEP; client_myds->DSS=STATE_SLEEP;
} }
@ -1462,14 +1550,16 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
client_myds->setDSS_STATE_QUERY_SENT_NET(); client_myds->setDSS_STATE_QUERY_SENT_NET();
unsigned int nTrx=NumActiveTransactions(); unsigned int nTrx=NumActiveTransactions();
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,2,0|setStatus,NULL); if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT;
client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,2,setStatus,NULL);
client_myds->DSS=STATE_SLEEP; client_myds->DSS=STATE_SLEEP;
} else { } else {
l_free(pkt->size,pkt->ptr); l_free(pkt->size,pkt->ptr);
client_myds->setDSS_STATE_QUERY_SENT_NET(); client_myds->setDSS_STATE_QUERY_SENT_NET();
unsigned int nTrx=NumActiveTransactions(); unsigned int nTrx=NumActiveTransactions();
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,2|setStatus,0,NULL); if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT;
client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,setStatus,0,NULL);
client_myds->DSS=STATE_SLEEP; client_myds->DSS=STATE_SLEEP;
} }
} }
@ -1531,6 +1621,16 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
current_hostgroup=qpo->destination_hostgroup; current_hostgroup=qpo->destination_hostgroup;
} }
} }
if (autocommit_on_hostgroup >= 0) {
// the query is a "set autocommit=0"
// we set current_hostgroup=autocommit_on_hostgroup if possible
if (transaction_persistent_hostgroup == -1) {
if (qpo->destination_hostgroup==-1) {
current_hostgroup=autocommit_on_hostgroup;
}
}
autocommit_on_hostgroup=-1; // at the end, always reset autocommit_on_hostgroup to -1
}
return false; return false;
} }
@ -1669,6 +1769,7 @@ void MySQL_Session::MySQL_Result_to_MySQL_wire(MYSQL *mysql, MySQL_ResultSet *My
unsigned int num_rows = mysql_affected_rows(mysql); unsigned int num_rows = mysql_affected_rows(mysql);
unsigned int nTrx=NumActiveTransactions(); unsigned int nTrx=NumActiveTransactions();
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT;
client_myds->myprot.generate_pkt_OK(true,NULL,NULL,client_myds->pkt_sid+1,num_rows,mysql->insert_id,mysql->server_status|setStatus,mysql->warning_count,mysql->info); client_myds->myprot.generate_pkt_OK(true,NULL,NULL,client_myds->pkt_sid+1,num_rows,mysql->insert_id,mysql->server_status|setStatus,mysql->warning_count,mysql->info);
} else { } else {
// error // error
@ -1695,7 +1796,8 @@ void MySQL_Session::SQLite3_to_MySQL(SQLite3_result *result, char *error, int af
unsigned int nTrx=NumActiveTransactions(); unsigned int nTrx=NumActiveTransactions();
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
myprot->generate_pkt_EOF(true,NULL,NULL,sid,0,2 | setStatus ); sid++; if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT;
myprot->generate_pkt_EOF(true,NULL,NULL,sid,0, setStatus ); sid++;
char **p=(char **)malloc(sizeof(char*)*result->columns); char **p=(char **)malloc(sizeof(char*)*result->columns);
unsigned long *l=(unsigned long *)malloc(sizeof(unsigned long *)*result->columns); unsigned long *l=(unsigned long *)malloc(sizeof(unsigned long *)*result->columns);
//p[0]="column test"; //p[0]="column test";
@ -1720,7 +1822,8 @@ void MySQL_Session::SQLite3_to_MySQL(SQLite3_result *result, char *error, int af
// no error, DML succeeded // no error, DML succeeded
unsigned int nTrx=NumActiveTransactions(); unsigned int nTrx=NumActiveTransactions();
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
myprot->generate_pkt_OK(true,NULL,NULL,sid,affected_rows,0,0|setStatus,0,NULL); if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT;
myprot->generate_pkt_OK(true,NULL,NULL,sid,affected_rows,0,setStatus,0,NULL);
} }
myds->DSS=STATE_SLEEP; myds->DSS=STATE_SLEEP;
} }
@ -1747,6 +1850,21 @@ unsigned int MySQL_Session::NumActiveTransactions() {
return ret; return ret;
} }
int MySQL_Session::FindOneActiveTransaction() {
int ret=-1;
if (mybes==0) return ret;
MySQL_Backend *_mybe;
unsigned int i;
for (i=0; i < mybes->len; i++) {
_mybe=(MySQL_Backend *)mybes->index(i);
if (_mybe->server_myds)
if (_mybe->server_myds->myconn)
if (_mybe->server_myds->myconn->IsActiveTransaction())
return (int)_mybe->server_myds->myconn->parent->myhgc->hid;
}
return ret;
}
unsigned long long MySQL_Session::IdleTime() { unsigned long long MySQL_Session::IdleTime() {
if (client_myds==0) return 0; if (client_myds==0) return 0;
if (status!=WAITING_CLIENT_DATA) return 0; if (status!=WAITING_CLIENT_DATA) return 0;

@ -922,6 +922,14 @@ bool MySQL_Connection::IsActiveTransaction() {
return ret; return ret;
} }
bool MySQL_Connection::IsAutoCommit() {
bool ret=false;
if (mysql) {
ret = (mysql->server_status & SERVER_STATUS_AUTOCOMMIT);
}
return ret;
}
bool MySQL_Connection::MultiplexDisabled() { bool MySQL_Connection::MultiplexDisabled() {
// status_flags stores information about the status of the connection // status_flags stores information about the status of the connection
// can be used to determine if multiplexing can be enabled or not // can be used to determine if multiplexing can be enabled or not

Loading…
Cancel
Save