First draft on mirroring feature

Add new variable MySQL_Session::mirror (false by default)
Add new struct MySQL_Session::mirrorPkt (false by default)

These functions return immediately if this is a mirror session:
* MySQL_Protocol::generate_pkt_EOF()
* MySQL_Protocol::generate_pkt_ERR()
* MySQL_Protocol::generate_pkt_OK()
* MySQL_Protocol::generate_pkt_column_count()
* MySQL_Protocol::generate_pkt_field()
* MySQL_Protocol::generate_pkt_row3()

MySQL_ResultSet::MySQL_ResultSet() exits almost immediately if this is a mirror session

Most of the code of these functions is not executed if this is a mirror session:
* MySQL_ResultSet::add_eof()
* MySQL_ResultSet::get_resultset()

In MySQL_Session::writeout() :
* `client_myds->write_to_net_poll()` is called only if this is not a mirror session

In MySQL_Session::handler():
* if query is a `SELECT` (hardcoded for now) :
** create a new session and mark it as `mirror=true`
** duplicate the query and send it to the new session
* if `mirror==true` :
** pretend to read a new query from c`lient_myds`
** proceed normally (with the only exception that no data can be sent to a client)

In MySQL_Thread::process_all_sessions() :
* if `mirror==true` and `status==WAITING_CLIENT_DATA` it means the session is "completed" so it get destroyed

Limitations:
* it currently doesn't support large packets
* doesn't support different charset
* doesn't support transactions
pull/525/head
René Cannaò 10 years ago
parent baabb55273
commit c101fad268

@ -85,6 +85,8 @@ class MySQL_Session
int user_max_connections;
bool client_authenticated;
bool connections_handler;
bool mirror;
PtrSize_t mirrorPkt;
bool stats;
void (*admin_func) (MySQL_Session *arg, ProxySQL_Admin *, PtrSize_t *pkt);
// int client_fd;

@ -438,6 +438,9 @@ bool MySQL_Protocol::generate_statistics_response(bool send, void **ptr, unsigne
//bool MySQL_Protocol::generate_pkt_EOF(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len, uint8_t sequence_id, uint16_t warnings, uint16_t status) {
bool MySQL_Protocol::generate_pkt_EOF(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, uint16_t warnings, uint16_t status) {
if ((*myds)->sess->mirror==true) {
return true;
}
mysql_hdr myhdr;
myhdr.pkt_id=sequence_id;
myhdr.pkt_length=5;
@ -473,6 +476,9 @@ bool MySQL_Protocol::generate_pkt_EOF(bool send, void **ptr, unsigned int *len,
//bool MySQL_Protocol::generate_pkt_ERR(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len, uint8_t sequence_id, uint16_t error_code, char *sql_state, char *sql_message) {
bool MySQL_Protocol::generate_pkt_ERR(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, uint16_t error_code, char *sql_state, char *sql_message) {
if ((*myds)->sess->mirror==true) {
return true;
}
mysql_hdr myhdr;
uint32_t sql_message_len=( sql_message ? strlen(sql_message) : 0 );
myhdr.pkt_id=sequence_id;
@ -511,7 +517,9 @@ bool MySQL_Protocol::generate_pkt_ERR(bool send, void **ptr, unsigned int *len,
//bool MySQL_Protocol::generate_pkt_OK(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len, uint8_t sequence_id, unsigned int affected_rows, unsigned int last_insert_id, uint16_t status, uint16_t warnings, char *msg) {
bool MySQL_Protocol::generate_pkt_OK(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, unsigned int affected_rows, uint64_t last_insert_id, uint16_t status, uint16_t warnings, char *msg) {
if ((*myds)->sess->mirror==true) {
return true;
}
char affected_rows_prefix;
uint8_t affected_rows_len=mysql_encode_length(affected_rows, &affected_rows_prefix);
char last_insert_id_prefix;
@ -570,6 +578,9 @@ bool MySQL_Protocol::generate_pkt_OK(bool send, void **ptr, unsigned int *len, u
//bool MySQL_Protocol::generate_pkt_column_count(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len, uint8_t sequence_id, uint64_t count) {
bool MySQL_Protocol::generate_pkt_column_count(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, uint64_t count) {
if ((*myds)->sess->mirror==true) {
return true;
}
char count_prefix=0;
uint8_t count_len=mysql_encode_length(count, &count_prefix);
@ -606,6 +617,9 @@ bool MySQL_Protocol::generate_pkt_column_count(bool send, void **ptr, unsigned i
//bool MySQL_Protocol::generate_pkt_field(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len, uint8_t sequence_id, char *schema, char *table, char *org_table, char *name, char *org_name, uint16_t charset, uint32_t column_length, uint8_t type, uint16_t flags, uint8_t decimals, bool field_list, uint64_t defvalue_length, char *defvalue) {
bool MySQL_Protocol::generate_pkt_field(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, char *schema, char *table, char *org_table, char *name, char *org_name, uint16_t charset, uint32_t column_length, uint8_t type, uint16_t flags, uint8_t decimals, bool field_list, uint64_t defvalue_length, char *defvalue) {
if ((*myds)->sess->mirror==true) {
return true;
}
char *def=(char *)"def";
uint32_t def_strlen=strlen(def);
char def_prefix;
@ -722,6 +736,9 @@ bool MySQL_Protocol::generate_pkt_row(bool send, void **ptr, unsigned int *len,
}
uint8_t MySQL_Protocol::generate_pkt_row3(MySQL_ResultSet *myrs, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt) {
if ((*myds)->sess->mirror==true) {
return true;
}
int col=0;
unsigned int rowlen=0;
uint8_t pkt_sid=sequence_id;
@ -1261,9 +1278,14 @@ MySQL_ResultSet::MySQL_ResultSet(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL
mysql=_my;
buffer=(unsigned char *)malloc(RESULTSET_BUFLEN);
buffer_used=0;
myds=myprot->get_myds();
sid=myds->pkt_sid+1;
PSarrayOUT = new PtrSizeArray();
myds=NULL;
sid=0;
PSarrayOUT = NULL;
if (myprot) { // if myprot = NULL , this is a mirror
myds=myprot->get_myds();
sid=myds->pkt_sid+1;
PSarrayOUT = new PtrSizeArray();
}
result=_res;
resultset_size=0;
num_rows=0;
@ -1271,6 +1293,9 @@ MySQL_ResultSet::MySQL_ResultSet(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL
PtrSize_t pkt;
// immediately generate the first set of packets
// columns count
if (myprot==NULL) {
return; // this is a mirror
}
myprot->generate_pkt_column_count(false,&pkt.ptr,&pkt.size,sid,num_fields);
sid++;
PSarrayOUT->add(pkt.ptr,pkt.size);
@ -1312,8 +1337,15 @@ MySQL_ResultSet::~MySQL_ResultSet() {
unsigned int MySQL_ResultSet::add_row(MYSQL_ROW row) {
unsigned long *lengths=mysql_fetch_lengths(result);
unsigned int pkt_length;
sid=myprot->generate_pkt_row3(this, &pkt_length, sid, num_fields, lengths, row);
unsigned int pkt_length=0;
if (myprot) {
sid=myprot->generate_pkt_row3(this, &pkt_length, sid, num_fields, lengths, row);
} else {
unsigned int col=0;
for (col=0; col<num_fields; col++) {
pkt_length+=( row[col] ? lengths[col]+mysql_encode_length(lengths[col],NULL) : 1 );
}
}
sid++;
resultset_size+=pkt_length;
num_rows++;
@ -1322,22 +1354,26 @@ unsigned int MySQL_ResultSet::add_row(MYSQL_ROW row) {
void MySQL_ResultSet::add_eof() {
PtrSize_t pkt;
buffer_to_PSarrayOut();
unsigned int nTrx=myds->sess->NumActiveTransactions();
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
if (myds->sess->autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT;
myprot->generate_pkt_EOF(false,&pkt.ptr,&pkt.size,sid,0,mysql->server_status|setStatus);
PSarrayOUT->add(pkt.ptr,pkt.size);
sid++;
resultset_size+=pkt.size;
if (myprot) {
buffer_to_PSarrayOut();
unsigned int nTrx=myds->sess->NumActiveTransactions();
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
if (myds->sess->autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT;
myprot->generate_pkt_EOF(false,&pkt.ptr,&pkt.size,sid,0,mysql->server_status|setStatus);
PSarrayOUT->add(pkt.ptr,pkt.size);
sid++;
resultset_size+=pkt.size;
}
resultset_completed=true;
}
bool MySQL_ResultSet::get_resultset(PtrSizeArray *PSarrayFinal) {
transfer_started=true;
PSarrayFinal->copy_add(PSarrayOUT,0,PSarrayOUT->len);
while (PSarrayOUT->len)
PSarrayOUT->remove_index(PSarrayOUT->len-1,NULL);
if (myprot) {
PSarrayFinal->copy_add(PSarrayOUT,0,PSarrayOUT->len);
while (PSarrayOUT->len)
PSarrayOUT->remove_index(PSarrayOUT->len-1,NULL);
}
return resultset_completed;
}

@ -208,6 +208,9 @@ MySQL_Session::MySQL_Session() {
//server_myds=NULL;
to_process=0;
mybe=NULL;
mirror=false;
mirrorPkt.ptr=NULL;
mirrorPkt.size=0;
mybes= new PtrArray(4);
set_status(NONE);
@ -238,7 +241,7 @@ MySQL_Session::~MySQL_Session() {
}
proxy_debug(PROXY_DEBUG_NET,1,"Thread=%p, Session=%p -- Shutdown Session %p\n" , this->thread, this, this);
delete command_counters;
if (admin==false && connections_handler==false) {
if (admin==false && connections_handler==false && mirror==false) {
__sync_fetch_and_sub(&MyHGM->status.client_connections,1);
}
}
@ -308,7 +311,11 @@ void MySQL_Session::writeout() {
// FIXME: experimental
//if (client_myds) client_myds->set_pollout();
//if (server_myds) server_myds->set_pollout();
if (client_myds) client_myds->write_to_net_poll();
if (client_myds) {
if (mirror==false) {
client_myds->write_to_net_poll();
}
}
//if (server_myds && server_myds->net_failure==false) server_myds->write_to_net_poll();
if (mybe) {
if (mybe->server_myds) mybe->server_myds->write_to_net_poll();
@ -549,14 +556,33 @@ int MySQL_Session::handler() {
assert(mybe);
assert(mybe->server_myds);
goto handler_again;
//goto __exit_DSS__STATE_NOT_INITIALIZED;
} else {
if (mirror==true) {
if (mirrorPkt.ptr) { // this is the first time we call handler()
pkt.ptr=mirrorPkt.ptr;
pkt.size=mirrorPkt.size;
mirrorPkt.ptr=NULL; // this will prevent the copy to happen again
} else {
if (status==WAITING_CLIENT_DATA) {
// we are being called a second time with WAITING_CLIENT_DATA
return 0;
}
}
}
}
}
__get_pkts_from_client:
for (j=0; j<client_myds->PSarrayIN->len;) {
client_myds->PSarrayIN->remove_index(0,&pkt);
//for (j=0; j<client_myds->PSarrayIN->len;) {
// implement a more complex logic to run even in case of mirror
// if client_myds , this is a regular client
// if client_myds == NULL , it is a mirror
// process mirror only status==WAITING_CLIENT_DATA
for (j=0; j< ( client_myds->PSarrayIN ? client_myds->PSarrayIN->len : 0) || (mirror==true && status==WAITING_CLIENT_DATA) ;) {
if (mirror==false) {
client_myds->PSarrayIN->remove_index(0,&pkt);
}
//prot.parse_mysql_pkt(&pkt,client_myds);
switch (status) {
@ -576,9 +602,10 @@ __get_pkts_from_client:
break;
case WAITING_CLIENT_DATA:
// this is handled only for real traffic, not mirror
if (pkt.size==(0xFFFFFF+sizeof(mysql_hdr))) {
// we are handling a multi-packet
switch (client_myds->DSS) {
switch (client_myds->DSS) { // real traffic only
case STATE_SLEEP:
client_myds->DSS=STATE_SLEEP_MULTI_PACKET;
break;
@ -618,7 +645,7 @@ __get_pkts_from_client:
}
if (client_myds->DSS!=STATE_SLEEP) // if DSS==STATE_SLEEP , we continue
break;
case STATE_SLEEP:
case STATE_SLEEP: // only this section can be executed ALSO by mirror
command_counters->incr(thread->curtime/1000000);
if (transaction_persistent_hostgroup==-1) {
current_hostgroup=default_hostgroup;
@ -648,6 +675,33 @@ __get_pkts_from_client:
rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt);
if (rc_break==true) { break; }
if (mirror==false) {
if (pkt.size < 1000000 && CurrentQuery.is_select_NOT_for_update()==true) {
// this is a prototype for creating a mirror, only for SELECT
MySQL_Session *newsess=new MySQL_Session();
newsess->client_myds = new MySQL_Data_Stream();
newsess->client_myds->DSS=STATE_SLEEP;
newsess->client_myds->sess=newsess;
newsess->client_myds->myds_type=MYDS_FRONTEND;
newsess->client_myds->PSarrayOUT= new PtrSizeArray();;
thread->register_session(newsess);
newsess->status=WAITING_CLIENT_DATA;
MySQL_Connection *myconn=new MySQL_Connection;
myconn->userinfo->set(client_myds->myconn->userinfo);
newsess->client_myds->attach_connection(myconn);
newsess->client_myds->myprot.init(&newsess->client_myds, newsess->client_myds->myconn->userinfo, newsess);
newsess->to_process=1;
newsess->default_hostgroup=default_hostgroup;
newsess->default_schema=strdup(default_schema);
newsess->mirror=true;
newsess->mirrorPkt.size=pkt.size;
newsess->mirrorPkt.ptr=l_alloc(newsess->mirrorPkt.size);
memcpy(newsess->mirrorPkt.ptr,pkt.ptr,pkt.size);
newsess->handler(); // execute immediately
newsess->to_process=0;
}
}
if (autocommit_on_hostgroup>=0) {
}
mybe=find_or_create_backend(current_hostgroup);
@ -861,6 +915,7 @@ handler_again:
NEXT_IMMEDIATE(CHANGING_SCHEMA);
}
}
if (mirror==false) { // do not care about autocommit and charset if mirror
if (client_myds->myconn->options.charset != mybe->server_myds->myconn->mysql->charset->nr) {
previous_status.push(PROCESSING_QUERY);
NEXT_IMMEDIATE(CHANGING_CHARSET);
@ -880,6 +935,7 @@ handler_again:
NEXT_IMMEDIATE(CHANGING_AUTOCOMMIT);
}
}
}
}
status=PROCESSING_QUERY;
mybe->server_myds->max_connect_time=0;

@ -1690,6 +1690,14 @@ void MySQL_Thread::process_all_sessions() {
}
for (n=0; n<mysql_sessions->len; n++) {
MySQL_Session *sess=(MySQL_Session *)mysql_sessions->index(n);
if (sess->mirror==true) { // this is a mirror session
if (sess->status==WAITING_CLIENT_DATA) { // the mirror session has completed
unregister_session(n);
n--;
delete sess;
continue;
}
}
if (maintenance_loop) {
unsigned int numTrx=0;
unsigned long long sess_time = sess->IdleTime();

@ -584,7 +584,11 @@ handler_again:
if (mysql_result==NULL) {
NEXT_IMMEDIATE(ASYNC_QUERY_END);
} else {
MyRS=new MySQL_ResultSet(&myds->sess->client_myds->myprot, mysql_result, mysql);
if (myds->sess->mirror==false) {
MyRS=new MySQL_ResultSet(&myds->sess->client_myds->myprot, mysql_result, mysql);
} else {
MyRS=new MySQL_ResultSet(NULL, mysql_result, mysql);
}
async_fetch_row_start=false;
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT);
}

@ -676,6 +676,12 @@ int MySQL_Data_Stream::array2buffer() {
int ret=0;
unsigned int idx=0;
bool cont=true;
if (sess) {
if (sess->mirror==true) { // if this is a mirror session, just empty it
idx=PSarrayOUT->len;
goto __exit_array2buffer;
}
}
while (cont) {
VALGRIND_DISABLE_ERROR_REPORTING;
if (queue_available(queueOUT)==0) {

Loading…
Cancel
Save