diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index 1ce64ef8c..b163f55a9 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -87,6 +87,7 @@ class MySQL_Session Query_Processor_Output *qpo; StatCounters *command_counters; int healthy; + bool autocommit; bool killed; bool admin; bool max_connections_reached; @@ -101,6 +102,7 @@ class MySQL_Session int current_hostgroup; int default_hostgroup; int active_transactions; + int autocommit_on_hostgroup; char * default_schema; bool schema_locked; bool transaction_persistent; @@ -159,6 +161,7 @@ class MySQL_Session void SQLite3_to_MySQL(SQLite3_result *, char *, int , MySQL_Protocol *); void MySQL_Result_to_MySQL_wire(MYSQL *mysql, MySQL_ResultSet *MyRS); unsigned int NumActiveTransactions(); + int FindOneActiveTransaction(); unsigned long long IdleTime(); void reset_all_backends(); diff --git a/include/mysql_connection.h b/include/mysql_connection.h index 143995b1e..09fbbd4a1 100644 --- a/include/mysql_connection.h +++ b/include/mysql_connection.h @@ -123,6 +123,7 @@ class MySQL_Connection { int async_ping(short event); void async_free_result(); bool IsActiveTransaction(); + bool IsAutoCommit(); bool MultiplexDisabled(); void ProcessQueryAndSetStatusFlags(char *query_digest_text); }; diff --git a/lib/MySQL_Protocol.cpp b/lib/MySQL_Protocol.cpp index a41d3d80e..3a5c5b77d 100644 --- a/lib/MySQL_Protocol.cpp +++ b/lib/MySQL_Protocol.cpp @@ -1925,6 +1925,7 @@ MySQL_ResultSet::MySQL_ResultSet(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL // first EOF unsigned int nTrx=myds->sess->NumActiveTransactions(); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); + if (myds->sess->autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT; myprot->generate_pkt_EOF(false,&pkt.ptr,&pkt.size,sid,0,mysql->server_status|setStatus); sid++; PSarrayOUT->add(pkt.ptr,pkt.size); @@ -1958,6 +1959,7 @@ void MySQL_ResultSet::add_eof() { PtrSize_t pkt; unsigned int nTrx=myds->sess->NumActiveTransactions(); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); + if (myds->sess->autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT; myprot->generate_pkt_EOF(false,&pkt.ptr,&pkt.size,sid,0,mysql->server_status|setStatus); PSarrayOUT->add(pkt.ptr,pkt.size); sid++; diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index a2b3e8daa..cef89d201 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -164,6 +164,8 @@ MySQL_Session::MySQL_Session() { qpo=NULL; command_counters=new StatCounters(15,10,false); healthy=1; + autocommit=true; + autocommit_on_hostgroup=-1; killed=false; admin=false; connections_handler=false; @@ -294,6 +296,7 @@ bool MySQL_Session::handler_special_queries(PtrSize_t *pkt) { sprintf(buf,"%u",last_insert_id); unsigned int nTrx=NumActiveTransactions(); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); + if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT; MySQL_Data_Stream *myds=client_myds; MySQL_Protocol *myprot=&client_myds->myprot; myds->DSS=STATE_QUERY_SENT_DS; @@ -301,19 +304,20 @@ bool MySQL_Session::handler_special_queries(PtrSize_t *pkt) { myprot->generate_pkt_column_count(true,NULL,NULL,sid,1); sid++; myprot->generate_pkt_field(true,NULL,NULL,sid,(char *)"",(char *)"",(char *)"",(char *)"LAST_INSERT_ID()",(char *)"",33,15,MYSQL_TYPE_VAR_STRING,1,0x1f,false,0,NULL); sid++; myds->DSS=STATE_COLUMN_DEFINITION; - myprot->generate_pkt_EOF(true,NULL,NULL,sid,0, 2 | setStatus); sid++; + myprot->generate_pkt_EOF(true,NULL,NULL,sid,0, setStatus); sid++; char **p=(char **)malloc(sizeof(char*)*1); unsigned long *l=(unsigned long *)malloc(sizeof(unsigned long *)*1); l[0]=strlen(buf);; p[0]=buf; myprot->generate_pkt_row(true,NULL,NULL,sid,1,l,p); sid++; myds->DSS=STATE_ROW; - myprot->generate_pkt_EOF(true,NULL,NULL,sid,0, 2 | setStatus); sid++; + myprot->generate_pkt_EOF(true,NULL,NULL,sid,0, setStatus); sid++; myds->DSS=STATE_SLEEP; l_free(pkt->size,pkt->ptr); return true; } if (pkt->size==SELECT_VERSION_COMMENT_LEN+5 && strncmp((char *)SELECT_VERSION_COMMENT,(char *)pkt->ptr+5,pkt->size-5)==0) { + // FIXME: this doesn't return AUTOCOMMIT or IN_TRANS PtrSize_t pkt_2; pkt_2.size=PROXYSQL_VERSION_COMMENT_LEN; pkt_2.ptr=l_alloc(pkt_2.size); @@ -325,6 +329,7 @@ bool MySQL_Session::handler_special_queries(PtrSize_t *pkt) { return true; } if (pkt->size==strlen((char *)"select USER()")+5 && strncmp((char *)"select USER()",(char *)pkt->ptr+5,pkt->size-5)==0) { + // FIXME: this doesn't return AUTOCOMMIT or IN_TRANS char *query1=(char *)"SELECT \"%s\" AS 'USER()'"; char *query2=(char *)malloc(strlen(query1)+strlen(client_myds->myconn->userinfo->username)+10); sprintf(query2,query1,client_myds->myconn->userinfo->username); @@ -353,7 +358,8 @@ bool MySQL_Session::handler_special_queries(PtrSize_t *pkt) { client_myds->myconn->set_charset(c->nr); unsigned int nTrx=NumActiveTransactions(); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); - client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,0|setStatus,0,NULL); + if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT; + client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,setStatus,0,NULL); } client_myds->DSS=STATE_SLEEP; status=WAITING_CLIENT_DATA; @@ -361,6 +367,83 @@ bool MySQL_Session::handler_special_queries(PtrSize_t *pkt) { free(name); return true; } + + size_t sal=strlen("set autocommit"); + if ( pkt->size > 7+sal) { + if (strncasecmp((char *)"set autocommit",(char *)pkt->ptr+5,sal)==0) { + unsigned int i; + bool eq=false; + int fd=-1; // first digit + for (i=5+sal;isize;i++) { + char c=((char *)pkt->ptr)[i]; + if (c!='0' && c!='1' && c!=' ' && c!='=') return false; // found a not valid char + if (eq==false) { + if (c!=' ' && c!='=') return false; // found a not valid char + if (c=='=') eq=true; + } else { + if (c!='0' && c!='1' && c!=' ') return false; // found a not valid char + if (fd==-1) { + if (c=='0' || c=='1') { // found first digit + if (c=='0') + fd=0; + else + fd=1; + } + } else { + if (c=='0' || c=='1') { // found second digit + return false; + } + } + } + } + if (fd >= 0) { // we can set autocommit + // we immeditately process the number of transactions + unsigned int nTrx=NumActiveTransactions(); + if (fd==1 && autocommit==true) { + // nothing to do, return OK + goto __ret_autocommit_OK; + } + if (fd==1 && autocommit==false) { + if (nTrx) { + // there is an active transaction, we need to forward it + // because this can potentially close the transaction + autocommit=true; + autocommit_on_hostgroup=FindOneActiveTransaction(); + return false; + } else { + // as there is no active transaction, we do no need to forward it + // just change internal state + autocommit=false; + goto __ret_autocommit_OK; + } + } + + if (fd==0) { + autocommit=false; // we set it, no matter if already set or not + if (nTrx) { + // there is an active transaction, we need to forward it + // because this can potentially close the transaction + autocommit_on_hostgroup=FindOneActiveTransaction(); + return false; + } else { + // as there is no active transaction, we do no need to forward it + // just return OK + goto __ret_autocommit_OK; + } + } +__ret_autocommit_OK: + client_myds->DSS=STATE_QUERY_SENT_NET; + uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); + if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT; + client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,setStatus,0,NULL); + client_myds->DSS=STATE_SLEEP; + status=WAITING_CLIENT_DATA; + l_free(pkt->size,pkt->ptr); + return true; + } + } + } + return false; } @@ -473,10 +556,13 @@ __get_pkts_from_client: RequestEnd(NULL); break; } + qpo=GloQPro->process_mysql_query(this,pkt.ptr,pkt.size,&CurrentQuery); - if (qpo) { - rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt); - if (rc_break==true) { break; } + assert(qpo); // GloQPro->process_mysql_query() should always return a qpo + rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt); + if (rc_break==true) { break; } + + if (autocommit_on_hostgroup>=0) { } mybe=find_or_create_backend(current_hostgroup); status=PROCESSING_QUERY; @@ -1390,6 +1476,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C if (v==1) { unsigned int nTrx=NumActiveTransactions(); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); + if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT; client_myds->myprot.generate_pkt_EOF(true,NULL,NULL,1,0, setStatus ); } else { client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)""); @@ -1404,7 +1491,8 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C client_myds->setDSS_STATE_QUERY_SENT_NET(); unsigned int nTrx=NumActiveTransactions(); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); - client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,2|setStatus,0,NULL); + if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT; + client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,setStatus,0,NULL); client_myds->DSS=STATE_SLEEP; } @@ -1462,14 +1550,16 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C client_myds->setDSS_STATE_QUERY_SENT_NET(); unsigned int nTrx=NumActiveTransactions(); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); - client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,2,0|setStatus,NULL); + if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT; + client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,2,setStatus,NULL); client_myds->DSS=STATE_SLEEP; } else { l_free(pkt->size,pkt->ptr); client_myds->setDSS_STATE_QUERY_SENT_NET(); unsigned int nTrx=NumActiveTransactions(); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); - client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,2|setStatus,0,NULL); + if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT; + client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,setStatus,0,NULL); client_myds->DSS=STATE_SLEEP; } } @@ -1531,6 +1621,16 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C current_hostgroup=qpo->destination_hostgroup; } } + if (autocommit_on_hostgroup >= 0) { + // the query is a "set autocommit=0" + // we set current_hostgroup=autocommit_on_hostgroup if possible + if (transaction_persistent_hostgroup == -1) { + if (qpo->destination_hostgroup==-1) { + current_hostgroup=autocommit_on_hostgroup; + } + } + autocommit_on_hostgroup=-1; // at the end, always reset autocommit_on_hostgroup to -1 + } return false; } @@ -1669,6 +1769,7 @@ void MySQL_Session::MySQL_Result_to_MySQL_wire(MYSQL *mysql, MySQL_ResultSet *My unsigned int num_rows = mysql_affected_rows(mysql); unsigned int nTrx=NumActiveTransactions(); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); + if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT; client_myds->myprot.generate_pkt_OK(true,NULL,NULL,client_myds->pkt_sid+1,num_rows,mysql->insert_id,mysql->server_status|setStatus,mysql->warning_count,mysql->info); } else { // error @@ -1695,7 +1796,8 @@ void MySQL_Session::SQLite3_to_MySQL(SQLite3_result *result, char *error, int af unsigned int nTrx=NumActiveTransactions(); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); - myprot->generate_pkt_EOF(true,NULL,NULL,sid,0,2 | setStatus ); sid++; + if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT; + myprot->generate_pkt_EOF(true,NULL,NULL,sid,0, setStatus ); sid++; char **p=(char **)malloc(sizeof(char*)*result->columns); unsigned long *l=(unsigned long *)malloc(sizeof(unsigned long *)*result->columns); //p[0]="column test"; @@ -1720,7 +1822,8 @@ void MySQL_Session::SQLite3_to_MySQL(SQLite3_result *result, char *error, int af // no error, DML succeeded unsigned int nTrx=NumActiveTransactions(); uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); - myprot->generate_pkt_OK(true,NULL,NULL,sid,affected_rows,0,0|setStatus,0,NULL); + if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT; + myprot->generate_pkt_OK(true,NULL,NULL,sid,affected_rows,0,setStatus,0,NULL); } myds->DSS=STATE_SLEEP; } @@ -1747,6 +1850,21 @@ unsigned int MySQL_Session::NumActiveTransactions() { return ret; } +int MySQL_Session::FindOneActiveTransaction() { + int ret=-1; + if (mybes==0) return ret; + MySQL_Backend *_mybe; + unsigned int i; + for (i=0; i < mybes->len; i++) { + _mybe=(MySQL_Backend *)mybes->index(i); + if (_mybe->server_myds) + if (_mybe->server_myds->myconn) + if (_mybe->server_myds->myconn->IsActiveTransaction()) + return (int)_mybe->server_myds->myconn->parent->myhgc->hid; + } + return ret; +} + unsigned long long MySQL_Session::IdleTime() { if (client_myds==0) return 0; if (status!=WAITING_CLIENT_DATA) return 0; diff --git a/lib/mysql_connection.cpp b/lib/mysql_connection.cpp index 6d1d6478a..33a511284 100644 --- a/lib/mysql_connection.cpp +++ b/lib/mysql_connection.cpp @@ -922,6 +922,14 @@ bool MySQL_Connection::IsActiveTransaction() { return ret; } +bool MySQL_Connection::IsAutoCommit() { + bool ret=false; + if (mysql) { + ret = (mysql->server_status & SERVER_STATUS_AUTOCOMMIT); + } + return ret; +} + bool MySQL_Connection::MultiplexDisabled() { // status_flags stores information about the status of the connection // can be used to determine if multiplexing can be enabled or not