Improving mirroring performance

The commit introduces sessions caching, drastically reducing memory overhead in case of very high QPS
v1.4.0
René Cannaò 9 years ago
parent 5a1870098a
commit 13cade1f77

@ -183,5 +183,6 @@ class MySQL_Data_Stream
}
void free_mysql_real_query();
void reinit_queues();
void destroy_queues();
};
#endif /* __CLASS_MYSQL_DATA_STREAM_H */

@ -175,6 +175,7 @@ class MySQL_Thread
unsigned long long last_maintenance_time;
PtrArray *mysql_sessions;
PtrArray *mirror_queue_mysql_sessions;
PtrArray *mirror_queue_mysql_sessions_cache;
#ifdef IDLE_THREADS
PtrArray *idle_mysql_sessions;
PtrArray *resume_mysql_sessions;

@ -687,22 +687,29 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
// int i=0;
// for (i=0;i<100;i++) {
MySQL_Session *newsess=new MySQL_Session();
newsess->client_myds = new MySQL_Data_Stream();
newsess->client_myds->DSS=STATE_SLEEP;
newsess->client_myds->sess=newsess;
newsess->client_myds->fd=0;
newsess->client_myds->myds_type=MYDS_FRONTEND;
newsess->client_myds->PSarrayOUT= new PtrSizeArray();
newsess->thread_session_id=__sync_fetch_and_add(&glovars.thread_id,1);
if (newsess->thread_session_id==0) {
MySQL_Session *newsess=NULL;
if (thread->mirror_queue_mysql_sessions_cache->len==0) {
newsess=new MySQL_Session();
newsess->client_myds = new MySQL_Data_Stream();
newsess->client_myds->DSS=STATE_SLEEP;
newsess->client_myds->sess=newsess;
newsess->client_myds->fd=0;
newsess->client_myds->myds_type=MYDS_FRONTEND;
newsess->client_myds->PSarrayOUT= new PtrSizeArray();
newsess->thread_session_id=__sync_fetch_and_add(&glovars.thread_id,1);
if (newsess->thread_session_id==0) {
newsess->thread_session_id=__sync_fetch_and_add(&glovars.thread_id,1);
}
newsess->status=WAITING_CLIENT_DATA;
MySQL_Connection *myconn=new MySQL_Connection;
newsess->client_myds->attach_connection(myconn);
newsess->client_myds->myprot.init(&newsess->client_myds, newsess->client_myds->myconn->userinfo, newsess);
newsess->mirror=true;
newsess->client_myds->destroy_queues();
} else {
newsess=(MySQL_Session *)thread->mirror_queue_mysql_sessions_cache->remove_index_fast(0);
}
newsess->status=WAITING_CLIENT_DATA;
MySQL_Connection *myconn=new MySQL_Connection;
myconn->userinfo->set(client_myds->myconn->userinfo);
newsess->client_myds->attach_connection(myconn);
newsess->client_myds->myprot.init(&newsess->client_myds, newsess->client_myds->myconn->userinfo, newsess);
newsess->client_myds->myconn->userinfo->set(client_myds->myconn->userinfo);
newsess->to_process=1;
newsess->default_hostgroup=default_hostgroup;
if (qpo->mirror_hostgroup>= 0) {
@ -711,11 +718,17 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
newsess->mirror_hostgroup=default_hostgroup; // copy the default
}
newsess->mirror_flagOUT=qpo->mirror_flagOUT; // in the new session we copy the mirror flagOUT
newsess->default_schema=strdup(default_schema);
if (newsess->default_schema==NULL) {
newsess->default_schema=strdup(default_schema);
} else {
if (strcmp(newsess->default_schema,default_schema)) {
free(newsess->default_schema);
newsess->default_schema=strdup(default_schema);
}
}
newsess->mirrorPkt.size=pktH->size;
newsess->mirrorPkt.ptr=l_alloc(newsess->mirrorPkt.size);
memcpy(newsess->mirrorPkt.ptr,pktH->ptr,pktH->size);
newsess->mirror=true;
if (thread->mirror_queue_mysql_sessions->len==0) {
// there are no sessions in the queue, we try to execute immediately
@ -730,7 +743,14 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
//newsess->to_process=0;
if (newsess->status==WAITING_CLIENT_DATA) { // the mirror session has completed
thread->unregister_session(thread->mysql_sessions->len-1);
delete newsess;
int l=mysql_thread___mirror_max_concurrency;
if (thread->mirror_queue_mysql_sessions->len*0.3 > l) l=thread->mirror_queue_mysql_sessions->len*0.3;
if (thread->mirror_queue_mysql_sessions_cache->len <= l) {
__sync_sub_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1);
thread->mirror_queue_mysql_sessions_cache->add(newsess);
} else {
delete newsess;
}
}
}
} else {
@ -2138,6 +2158,7 @@ __get_pkts_from_client:
default:
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Statuses: WAITING_CLIENT_DATA - STATE_UNKNOWN\n");
{
if (mirror==false) {
char buf[INET6_ADDRSTRLEN];
switch (client_myds->client_addr->sa_family) {
case AF_INET: {
@ -2154,7 +2175,8 @@ __get_pkts_from_client:
sprintf(buf, "localhost");
break;
}
proxy_error("Unexpected packet from client %s . Session_status: %d , client_status: %d Disconnecting it\n", buf, status, client_myds->status);
proxy_error("Unexpected packet from client %s . Session_status: %d , client_status: %d Disconnecting it\n", buf, status, client_myds->status);
}
}
return -1;
break;

@ -1942,6 +1942,15 @@ MySQL_Thread::~MySQL_Thread() {
mirror_queue_mysql_sessions=NULL;
}
if (mirror_queue_mysql_sessions_cache) {
while(mirror_queue_mysql_sessions_cache->len) {
MySQL_Session *sess=(MySQL_Session *)mirror_queue_mysql_sessions_cache->remove_index_fast(0);
delete sess;
}
delete mirror_queue_mysql_sessions_cache;
mirror_queue_mysql_sessions_cache=NULL;
}
#ifdef IDLE_THREADS
if (GloVars.global.idle_threads) {
if (idle_mysql_sessions) {
@ -2043,6 +2052,7 @@ bool MySQL_Thread::init() {
int i;
mysql_sessions = new PtrArray();
mirror_queue_mysql_sessions = new PtrArray();
mirror_queue_mysql_sessions_cache = new PtrArray();
cached_connections = new PtrArray();
assert(mysql_sessions);
@ -2216,7 +2226,14 @@ __run_skip_1:
newsess->handler(); // execute immediately
if (newsess->status==WAITING_CLIENT_DATA) { // the mirror session has completed
unregister_session(mysql_sessions->len-1);
delete newsess;
int l=mysql_thread___mirror_max_concurrency;
if (mirror_queue_mysql_sessions->len*0.3 > l) l=mirror_queue_mysql_sessions->len*0.3;
if (mirror_queue_mysql_sessions_cache->len <= l) {
__sync_sub_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1);
mirror_queue_mysql_sessions_cache->add(newsess);
} else {
delete newsess;
}
}
//newsess->to_process=0;
}
@ -2417,6 +2434,15 @@ __run_skip_1a:
mypolls.loop_counters->incr(curtime/1000000);
if (maintenance_loop) {
// house keeping
int l=mysql_thread___mirror_max_concurrency;
if (mirror_queue_mysql_sessions_cache->len > l) {
while (mirror_queue_mysql_sessions_cache->len > mirror_queue_mysql_sessions->len && mirror_queue_mysql_sessions_cache->len > l) {
MySQL_Session *newsess=(MySQL_Session *)mirror_queue_mysql_sessions_cache->remove_index_fast(0);
__sync_add_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1);
delete newsess;
}
}
GloQPro->update_query_processor_stats();
}
if (rc == -1 && errno == EINTR)
@ -2770,7 +2796,14 @@ void MySQL_Thread::process_all_sessions() {
if (sess->status==WAITING_CLIENT_DATA) { // the mirror session has completed
unregister_session(n);
n--;
delete sess;
int l=mysql_thread___mirror_max_concurrency;
if (mirror_queue_mysql_sessions->len*0.3 > l) l=mirror_queue_mysql_sessions->len*0.3;
if (mirror_queue_mysql_sessions_cache->len <= l) {
__sync_sub_and_fetch(&GloMTH->status_variables.mirror_sessions_current,1);
mirror_queue_mysql_sessions_cache->add(sess);
} else {
delete sess;
}
continue;
}
}

@ -77,12 +77,24 @@ uint64_t MySQL_Connection_userinfo::compute_hash() {
void MySQL_Connection_userinfo::set(char *u, char *p, char *s, char *sh1) {
if (u) {
if (username) free(username);
username=strdup(u);
if (username) {
if (strcmp(u,username)) {
free(username);
username=strdup(u);
}
} else {
username=strdup(u);
}
}
if (p) {
if (password) free(password);
password=strdup(p);
if (password) {
if (strcmp(p,password)) {
free(password);
password=strdup(p);
}
} else {
password=strdup(p);
}
}
if (s) {
if (schemaname) free(schemaname);

@ -876,3 +876,8 @@ void MySQL_Data_Stream::free_mysql_real_query() {
mysql_real_query.end();
}
}
void MySQL_Data_Stream::destroy_queues() {
queue_destroy(queueIN);
queue_destroy(queueOUT);
}

Loading…
Cancel
Save