diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index f4323f902..1470a28c6 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -85,6 +85,8 @@ class MySQL_Session int user_max_connections; bool client_authenticated; bool connections_handler; + bool mirror; + PtrSize_t mirrorPkt; bool stats; void (*admin_func) (MySQL_Session *arg, ProxySQL_Admin *, PtrSize_t *pkt); // int client_fd; diff --git a/lib/MySQL_Protocol.cpp b/lib/MySQL_Protocol.cpp index 047581baf..0cb2e47e3 100644 --- a/lib/MySQL_Protocol.cpp +++ b/lib/MySQL_Protocol.cpp @@ -438,6 +438,9 @@ bool MySQL_Protocol::generate_statistics_response(bool send, void **ptr, unsigne //bool MySQL_Protocol::generate_pkt_EOF(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len, uint8_t sequence_id, uint16_t warnings, uint16_t status) { bool MySQL_Protocol::generate_pkt_EOF(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, uint16_t warnings, uint16_t status) { + if ((*myds)->sess->mirror==true) { + return true; + } mysql_hdr myhdr; myhdr.pkt_id=sequence_id; myhdr.pkt_length=5; @@ -473,6 +476,9 @@ bool MySQL_Protocol::generate_pkt_EOF(bool send, void **ptr, unsigned int *len, //bool MySQL_Protocol::generate_pkt_ERR(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len, uint8_t sequence_id, uint16_t error_code, char *sql_state, char *sql_message) { bool MySQL_Protocol::generate_pkt_ERR(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, uint16_t error_code, char *sql_state, char *sql_message) { + if ((*myds)->sess->mirror==true) { + return true; + } mysql_hdr myhdr; uint32_t sql_message_len=( sql_message ? strlen(sql_message) : 0 ); myhdr.pkt_id=sequence_id; @@ -511,7 +517,9 @@ bool MySQL_Protocol::generate_pkt_ERR(bool send, void **ptr, unsigned int *len, //bool MySQL_Protocol::generate_pkt_OK(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len, uint8_t sequence_id, unsigned int affected_rows, unsigned int last_insert_id, uint16_t status, uint16_t warnings, char *msg) { bool MySQL_Protocol::generate_pkt_OK(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, unsigned int affected_rows, uint64_t last_insert_id, uint16_t status, uint16_t warnings, char *msg) { - + if ((*myds)->sess->mirror==true) { + return true; + } char affected_rows_prefix; uint8_t affected_rows_len=mysql_encode_length(affected_rows, &affected_rows_prefix); char last_insert_id_prefix; @@ -570,6 +578,9 @@ bool MySQL_Protocol::generate_pkt_OK(bool send, void **ptr, unsigned int *len, u //bool MySQL_Protocol::generate_pkt_column_count(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len, uint8_t sequence_id, uint64_t count) { bool MySQL_Protocol::generate_pkt_column_count(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, uint64_t count) { + if ((*myds)->sess->mirror==true) { + return true; + } char count_prefix=0; uint8_t count_len=mysql_encode_length(count, &count_prefix); @@ -606,6 +617,9 @@ bool MySQL_Protocol::generate_pkt_column_count(bool send, void **ptr, unsigned i //bool MySQL_Protocol::generate_pkt_field(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len, uint8_t sequence_id, char *schema, char *table, char *org_table, char *name, char *org_name, uint16_t charset, uint32_t column_length, uint8_t type, uint16_t flags, uint8_t decimals, bool field_list, uint64_t defvalue_length, char *defvalue) { bool MySQL_Protocol::generate_pkt_field(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, char *schema, char *table, char *org_table, char *name, char *org_name, uint16_t charset, uint32_t column_length, uint8_t type, uint16_t flags, uint8_t decimals, bool field_list, uint64_t defvalue_length, char *defvalue) { + if ((*myds)->sess->mirror==true) { + return true; + } char *def=(char *)"def"; uint32_t def_strlen=strlen(def); char def_prefix; @@ -722,6 +736,9 @@ bool MySQL_Protocol::generate_pkt_row(bool send, void **ptr, unsigned int *len, } uint8_t MySQL_Protocol::generate_pkt_row3(MySQL_ResultSet *myrs, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt) { + if ((*myds)->sess->mirror==true) { + return true; + } int col=0; unsigned int rowlen=0; uint8_t pkt_sid=sequence_id; @@ -1261,9 +1278,14 @@ MySQL_ResultSet::MySQL_ResultSet(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL mysql=_my; buffer=(unsigned char *)malloc(RESULTSET_BUFLEN); buffer_used=0; - myds=myprot->get_myds(); - sid=myds->pkt_sid+1; - PSarrayOUT = new PtrSizeArray(); + myds=NULL; + sid=0; + PSarrayOUT = NULL; + if (myprot) { // if myprot = NULL , this is a mirror + myds=myprot->get_myds(); + sid=myds->pkt_sid+1; + PSarrayOUT = new PtrSizeArray(); + } result=_res; resultset_size=0; num_rows=0; @@ -1271,6 +1293,9 @@ MySQL_ResultSet::MySQL_ResultSet(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL PtrSize_t pkt; // immediately generate the first set of packets // columns count + if (myprot==NULL) { + return; // this is a mirror + } myprot->generate_pkt_column_count(false,&pkt.ptr,&pkt.size,sid,num_fields); sid++; PSarrayOUT->add(pkt.ptr,pkt.size); @@ -1312,8 +1337,15 @@ MySQL_ResultSet::~MySQL_ResultSet() { unsigned int MySQL_ResultSet::add_row(MYSQL_ROW row) { unsigned long *lengths=mysql_fetch_lengths(result); - unsigned int pkt_length; - sid=myprot->generate_pkt_row3(this, &pkt_length, sid, num_fields, lengths, row); + unsigned int pkt_length=0; + if (myprot) { + sid=myprot->generate_pkt_row3(this, &pkt_length, sid, num_fields, lengths, row); + } else { + unsigned int col=0; + for (col=0; colsess->NumActiveTransactions(); - uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); - if (myds->sess->autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT; - myprot->generate_pkt_EOF(false,&pkt.ptr,&pkt.size,sid,0,mysql->server_status|setStatus); - PSarrayOUT->add(pkt.ptr,pkt.size); - sid++; - resultset_size+=pkt.size; + if (myprot) { + buffer_to_PSarrayOut(); + unsigned int nTrx=myds->sess->NumActiveTransactions(); + uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); + if (myds->sess->autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT; + myprot->generate_pkt_EOF(false,&pkt.ptr,&pkt.size,sid,0,mysql->server_status|setStatus); + PSarrayOUT->add(pkt.ptr,pkt.size); + sid++; + resultset_size+=pkt.size; + } resultset_completed=true; } bool MySQL_ResultSet::get_resultset(PtrSizeArray *PSarrayFinal) { transfer_started=true; - PSarrayFinal->copy_add(PSarrayOUT,0,PSarrayOUT->len); - while (PSarrayOUT->len) - PSarrayOUT->remove_index(PSarrayOUT->len-1,NULL); + if (myprot) { + PSarrayFinal->copy_add(PSarrayOUT,0,PSarrayOUT->len); + while (PSarrayOUT->len) + PSarrayOUT->remove_index(PSarrayOUT->len-1,NULL); + } return resultset_completed; } diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 46caed0e0..3c3a51e86 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -208,6 +208,9 @@ MySQL_Session::MySQL_Session() { //server_myds=NULL; to_process=0; mybe=NULL; + mirror=false; + mirrorPkt.ptr=NULL; + mirrorPkt.size=0; mybes= new PtrArray(4); set_status(NONE); @@ -238,7 +241,7 @@ MySQL_Session::~MySQL_Session() { } proxy_debug(PROXY_DEBUG_NET,1,"Thread=%p, Session=%p -- Shutdown Session %p\n" , this->thread, this, this); delete command_counters; - if (admin==false && connections_handler==false) { + if (admin==false && connections_handler==false && mirror==false) { __sync_fetch_and_sub(&MyHGM->status.client_connections,1); } } @@ -308,7 +311,11 @@ void MySQL_Session::writeout() { // FIXME: experimental //if (client_myds) client_myds->set_pollout(); //if (server_myds) server_myds->set_pollout(); - if (client_myds) client_myds->write_to_net_poll(); + if (client_myds) { + if (mirror==false) { + client_myds->write_to_net_poll(); + } + } //if (server_myds && server_myds->net_failure==false) server_myds->write_to_net_poll(); if (mybe) { if (mybe->server_myds) mybe->server_myds->write_to_net_poll(); @@ -549,14 +556,33 @@ int MySQL_Session::handler() { assert(mybe); assert(mybe->server_myds); goto handler_again; - //goto __exit_DSS__STATE_NOT_INITIALIZED; + } else { + if (mirror==true) { + if (mirrorPkt.ptr) { // this is the first time we call handler() + pkt.ptr=mirrorPkt.ptr; + pkt.size=mirrorPkt.size; + mirrorPkt.ptr=NULL; // this will prevent the copy to happen again + } else { + if (status==WAITING_CLIENT_DATA) { + // we are being called a second time with WAITING_CLIENT_DATA + return 0; + } + } + } } } __get_pkts_from_client: - for (j=0; jPSarrayIN->len;) { - client_myds->PSarrayIN->remove_index(0,&pkt); + //for (j=0; jPSarrayIN->len;) { + // implement a more complex logic to run even in case of mirror + // if client_myds , this is a regular client + // if client_myds == NULL , it is a mirror + // process mirror only status==WAITING_CLIENT_DATA + for (j=0; j< ( client_myds->PSarrayIN ? client_myds->PSarrayIN->len : 0) || (mirror==true && status==WAITING_CLIENT_DATA) ;) { + if (mirror==false) { + client_myds->PSarrayIN->remove_index(0,&pkt); + } //prot.parse_mysql_pkt(&pkt,client_myds); switch (status) { @@ -576,9 +602,10 @@ __get_pkts_from_client: break; case WAITING_CLIENT_DATA: + // this is handled only for real traffic, not mirror if (pkt.size==(0xFFFFFF+sizeof(mysql_hdr))) { // we are handling a multi-packet - switch (client_myds->DSS) { + switch (client_myds->DSS) { // real traffic only case STATE_SLEEP: client_myds->DSS=STATE_SLEEP_MULTI_PACKET; break; @@ -618,7 +645,7 @@ __get_pkts_from_client: } if (client_myds->DSS!=STATE_SLEEP) // if DSS==STATE_SLEEP , we continue break; - case STATE_SLEEP: + case STATE_SLEEP: // only this section can be executed ALSO by mirror command_counters->incr(thread->curtime/1000000); if (transaction_persistent_hostgroup==-1) { current_hostgroup=default_hostgroup; @@ -648,6 +675,33 @@ __get_pkts_from_client: rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt); if (rc_break==true) { break; } + if (mirror==false) { + if (pkt.size < 1000000 && CurrentQuery.is_select_NOT_for_update()==true) { + // this is a prototype for creating a mirror, only for SELECT + MySQL_Session *newsess=new MySQL_Session(); + newsess->client_myds = new MySQL_Data_Stream(); + newsess->client_myds->DSS=STATE_SLEEP; + newsess->client_myds->sess=newsess; + newsess->client_myds->myds_type=MYDS_FRONTEND; + newsess->client_myds->PSarrayOUT= new PtrSizeArray();; + thread->register_session(newsess); + newsess->status=WAITING_CLIENT_DATA; + MySQL_Connection *myconn=new MySQL_Connection; + myconn->userinfo->set(client_myds->myconn->userinfo); + newsess->client_myds->attach_connection(myconn); + newsess->client_myds->myprot.init(&newsess->client_myds, newsess->client_myds->myconn->userinfo, newsess); + newsess->to_process=1; + newsess->default_hostgroup=default_hostgroup; + newsess->default_schema=strdup(default_schema); + newsess->mirror=true; + newsess->mirrorPkt.size=pkt.size; + newsess->mirrorPkt.ptr=l_alloc(newsess->mirrorPkt.size); + memcpy(newsess->mirrorPkt.ptr,pkt.ptr,pkt.size); + newsess->handler(); // execute immediately + newsess->to_process=0; + } + } + if (autocommit_on_hostgroup>=0) { } mybe=find_or_create_backend(current_hostgroup); @@ -861,6 +915,7 @@ handler_again: NEXT_IMMEDIATE(CHANGING_SCHEMA); } } + if (mirror==false) { // do not care about autocommit and charset if mirror if (client_myds->myconn->options.charset != mybe->server_myds->myconn->mysql->charset->nr) { previous_status.push(PROCESSING_QUERY); NEXT_IMMEDIATE(CHANGING_CHARSET); @@ -880,6 +935,7 @@ handler_again: NEXT_IMMEDIATE(CHANGING_AUTOCOMMIT); } } + } } status=PROCESSING_QUERY; mybe->server_myds->max_connect_time=0; diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index b6874ebf0..7eae3835b 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -1690,6 +1690,14 @@ void MySQL_Thread::process_all_sessions() { } for (n=0; nlen; n++) { MySQL_Session *sess=(MySQL_Session *)mysql_sessions->index(n); + if (sess->mirror==true) { // this is a mirror session + if (sess->status==WAITING_CLIENT_DATA) { // the mirror session has completed + unregister_session(n); + n--; + delete sess; + continue; + } + } if (maintenance_loop) { unsigned int numTrx=0; unsigned long long sess_time = sess->IdleTime(); diff --git a/lib/mysql_connection.cpp b/lib/mysql_connection.cpp index b24278244..f02cf3824 100644 --- a/lib/mysql_connection.cpp +++ b/lib/mysql_connection.cpp @@ -584,7 +584,11 @@ handler_again: if (mysql_result==NULL) { NEXT_IMMEDIATE(ASYNC_QUERY_END); } else { - MyRS=new MySQL_ResultSet(&myds->sess->client_myds->myprot, mysql_result, mysql); + if (myds->sess->mirror==false) { + MyRS=new MySQL_ResultSet(&myds->sess->client_myds->myprot, mysql_result, mysql); + } else { + MyRS=new MySQL_ResultSet(NULL, mysql_result, mysql); + } async_fetch_row_start=false; NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); } diff --git a/lib/mysql_data_stream.cpp b/lib/mysql_data_stream.cpp index bb2e167b2..c47849007 100644 --- a/lib/mysql_data_stream.cpp +++ b/lib/mysql_data_stream.cpp @@ -676,6 +676,12 @@ int MySQL_Data_Stream::array2buffer() { int ret=0; unsigned int idx=0; bool cont=true; + if (sess) { + if (sess->mirror==true) { // if this is a mirror session, just empty it + idx=PSarrayOUT->len; + goto __exit_array2buffer; + } + } while (cont) { VALGRIND_DISABLE_ERROR_REPORTING; if (queue_available(queueOUT)==0) {