From e0c7ca209a942bfd5ca58f9f94f32fb2b50ae8a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Fri, 19 Aug 2016 23:35:04 +0000 Subject: [PATCH] Implemented PROCESSLIST with Prepared Statements --- include/MySQL_PreparedStatement.h | 2 ++ include/MySQL_Session.h | 1 + include/query_processor.h | 2 +- lib/MySQL_PreparedStatement.cpp | 11 +++++++++-- lib/MySQL_Session.cpp | 17 +++++++++++++++++ lib/MySQL_Thread.cpp | 26 ++++++++++++++++++++------ lib/Query_Processor.cpp | 31 +++++++++++++++++++++++++++---- 7 files changed, 77 insertions(+), 13 deletions(-) diff --git a/include/MySQL_PreparedStatement.h b/include/MySQL_PreparedStatement.h index cd5da0f0e..5303ded62 100644 --- a/include/MySQL_PreparedStatement.h +++ b/include/MySQL_PreparedStatement.h @@ -133,6 +133,8 @@ class MySQL_STMT_Global_info { private: void compute_hash(); public: + uint64_t digest; + char * digest_text; uint64_t hash; char *username; char *schemaname; diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index f95c56727..e438bdd83 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -26,6 +26,7 @@ class Query_Info { MYSQL_STMT *mysql_stmt; stmt_execute_metadata_t *stmt_meta; uint32_t stmt_global_id; + MySQL_STMT_Global_info *stmt_info; int QueryLength; enum MYSQL_COM_QUERY_command MyComQueryCmd; diff --git a/include/query_processor.h b/include/query_processor.h index 5906abedd..561b9b8e7 100644 --- a/include/query_processor.h +++ b/include/query_processor.h @@ -261,7 +261,7 @@ class Query_Processor { char * get_digest_text(SQP_par_t *qp); uint64_t get_digest(SQP_par_t *qp); - void update_query_digest(SQP_par_t *qp, int hid, MySQL_Connection_userinfo *ui, unsigned long long t, unsigned long long n); + void update_query_digest(SQP_par_t *qp, int hid, MySQL_Connection_userinfo *ui, unsigned long long t, unsigned long long n, MySQL_STMT_Global_info *_stmt_info); unsigned long long query_parser_update_counters(MySQL_Session *sess, enum MYSQL_COM_QUERY_command c, SQP_par_t *qp, unsigned long long t); diff --git a/lib/MySQL_PreparedStatement.cpp b/lib/MySQL_PreparedStatement.cpp index 779e13652..749bf5fd2 100644 --- a/lib/MySQL_PreparedStatement.cpp +++ b/lib/MySQL_PreparedStatement.cpp @@ -164,6 +164,7 @@ MySQL_STMT_Global_info * MySQL_STMT_Manager::find_prepared_statement_by_stmt_id( auto s=m.find(id); if (s!=m.end()) { ret=s->second; + __sync_fetch_and_add(&ret->ref_count,1); // increase reference count } if (lock) { @@ -181,6 +182,7 @@ MySQL_STMT_Global_info * MySQL_STMT_Manager::find_prepared_statement_by_hash(uin auto s=h.find(hash); if (s!=h.end()) { ret=s->second; + __sync_fetch_and_add(&ret->ref_count,1); // increase reference count } if (lock) { @@ -193,10 +195,12 @@ MySQL_STMT_Global_info::MySQL_STMT_Global_info(uint32_t id, unsigned int h, char statement_id=id; hostgroup_id=h; ref_count=0; + digest_text=NULL; username=strdup(u); schemaname=strdup(s); - query=(char *)malloc(ql); + query=(char *)malloc(ql+1); memcpy(query,q,ql); + query[ql]='\0'; // add NULL byte query_length=ql; num_params=stmt->param_count; num_columns=stmt->field_count; @@ -281,5 +285,8 @@ MySQL_STMT_Global_info::~MySQL_STMT_Global_info() { free(params); params=NULL; } - + if (digest_text) { + free(digest_text); + digest_text=NULL; + } } diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 931254f27..e38f7faff 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -78,6 +78,7 @@ Query_Info::Query_Info() { QueryLength=0; QueryParserArgs.digest_text=NULL; QueryParserArgs.first_comment=NULL; + stmt_info=NULL; } Query_Info::~Query_Info() { @@ -87,6 +88,10 @@ Query_Info::~Query_Info() { if (QueryPointer) { //l_free(QueryLength+1,QueryPointer); } + if (stmt_info) { + __sync_fetch_and_sub(&stmt_info->ref_count,1); // decrease reference count + stmt_info=NULL; + } } void Query_Info::begin(unsigned char *_p, int len, bool mysql_header) { @@ -114,6 +119,10 @@ void Query_Info::end() { __sync_add_and_fetch(&sess->thread->status_variables.queries_slow,1); } assert(mysql_stmt==NULL); + if (stmt_info) { + __sync_fetch_and_sub(&stmt_info->ref_count,1); // decrease reference count + stmt_info=NULL; + } } void Query_Info::init(unsigned char *_p, int len, bool mysql_header) { @@ -1508,6 +1517,7 @@ __get_pkts_from_client: l_free(pkt.size,pkt.ptr); client_myds->setDSS_STATE_QUERY_SENT_NET(); client_myds->myprot.generate_STMT_PREPARE_RESPONSE(client_myds->pkt_sid+1,stmt_info); + __sync_fetch_and_sub(&stmt_info->ref_count,1); // decrease reference count client_myds->DSS=STATE_SLEEP; status=WAITING_CLIENT_DATA; break; @@ -1556,6 +1566,7 @@ __get_pkts_from_client: status=WAITING_CLIENT_DATA; break; } + CurrentQuery.stmt_info=stmt_info; //stmt_execute_metadata_t *stmt_meta=client_myds->myprot.get_binds_from_pkt(pkt.ptr,pkt.size,stmt_info->num_params); // we now take the metadata associated with STMT_EXECUTE from MySQL_STMTs_meta @@ -1571,6 +1582,8 @@ __get_pkts_from_client: client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Error in prepared statement execution"); client_myds->DSS=STATE_SLEEP; status=WAITING_CLIENT_DATA; + __sync_fetch_and_sub(&stmt_info->ref_count,1); // decrease reference count + stmt_info=NULL; break; } if (stmt_meta_found==false) { @@ -1873,6 +1886,10 @@ handler_again: qpo->timeout, qpo->delay, true); + stmt_info->digest=CurrentQuery.QueryParserArgs.digest; // copy digest + if (CurrentQuery.QueryParserArgs.digest_text) { + stmt_info->digest_text=strdup(CurrentQuery.QueryParserArgs.digest_text); + } stmid=stmt_info->statement_id; //uint64_t hash=client_myds->myconn->local_stmts->compute_hash(current_hostgroup,(char *)client_myds->myconn->userinfo->username,(char *)client_myds->myconn->userinfo->schemaname,(char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength); /* diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 47416efc5..f50c02819 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -2504,12 +2504,23 @@ SQLite3_result * MySQL_Threads_Handler::SQL3_Processlist() { pta[9]=strdup(buf); sprintf(buf,"%d", mc->parent->port); pta[10]=strdup(buf); - if (mc->query.length) { - pta[13]=(char *)malloc(mc->query.length+1); - strncpy(pta[13],mc->query.ptr,mc->query.length); - pta[13][mc->query.length]='\0'; - } else { - pta[13]=NULL; + if (sess->CurrentQuery.stmt_info==NULL) { // text protocol + if (mc->query.length) { + pta[13]=(char *)malloc(mc->query.length+1); + strncpy(pta[13],mc->query.ptr,mc->query.length); + pta[13][mc->query.length]='\0'; + } else { + pta[13]=NULL; + } + } else { // prepared statement + MySQL_STMT_Global_info *si=sess->CurrentQuery.stmt_info; + if (si->query_length) { + pta[13]=(char *)malloc(si->query_length+1); + strncpy(pta[13],si->query,si->query_length); + pta[13][si->query_length]='\0'; + } else { + pta[13]=NULL; + } } } else { pta[7]=NULL; @@ -2538,6 +2549,9 @@ SQLite3_result * MySQL_Threads_Handler::SQL3_Processlist() { case CHANGING_SCHEMA: pta[11]=strdup("InitDB"); break; + case PROCESSING_STMT_EXECUTE: + pta[11]=strdup("Execute"); + break; default: sprintf(buf,"%d", sess->status); pta[11]=strdup(buf); diff --git a/lib/Query_Processor.cpp b/lib/Query_Processor.cpp index bb62522c3..997a67558 100644 --- a/lib/Query_Processor.cpp +++ b/lib/Query_Processor.cpp @@ -913,7 +913,7 @@ unsigned long long Query_Processor::query_parser_update_counters(MySQL_Session * unsigned long long ret=_thr_commands_counters[c]->add_time(t); - if (qp->digest_text) { + if (sess->CurrentQuery.stmt_info==NULL && qp->digest_text) { // this code is executed only if digest_text is not NULL , that means mysql_thread___query_digests was true when the query started uint64_t hash2; SpookyHash myhash; @@ -930,13 +930,32 @@ unsigned long long Query_Processor::query_parser_update_counters(MySQL_Session * myhash.Update(ui->schemaname,strlen(ui->schemaname)); myhash.Update(&sess->current_hostgroup,sizeof(sess->default_hostgroup)); myhash.Final(&qp->digest_total,&hash2); + update_query_digest(qp, sess->current_hostgroup, ui, t, sess->thread->curtime, NULL); + } + if (sess->CurrentQuery.stmt_info && sess->CurrentQuery.stmt_info->digest_text) { + uint64_t hash2; + SpookyHash myhash; + myhash.Init(19,3); + assert(sess); + assert(sess->client_myds); + assert(sess->client_myds->myconn); + assert(sess->client_myds->myconn->userinfo); + MySQL_Connection_userinfo *ui=sess->client_myds->myconn->userinfo; + assert(ui->username); + assert(ui->schemaname); + MySQL_STMT_Global_info *stmt_info=sess->CurrentQuery.stmt_info; + myhash.Update(ui->username,strlen(ui->username)); + myhash.Update(&stmt_info->digest,sizeof(qp->digest)); + myhash.Update(ui->schemaname,strlen(ui->schemaname)); + myhash.Update(&sess->current_hostgroup,sizeof(sess->default_hostgroup)); + myhash.Final(&qp->digest_total,&hash2); //delete myhash; - update_query_digest(qp, sess->current_hostgroup, ui, t, sess->thread->curtime); + update_query_digest(qp, sess->current_hostgroup, ui, t, sess->thread->curtime, stmt_info); } return ret; } -void Query_Processor::update_query_digest(SQP_par_t *qp, int hid, MySQL_Connection_userinfo *ui, unsigned long long t, unsigned long long n) { +void Query_Processor::update_query_digest(SQP_par_t *qp, int hid, MySQL_Connection_userinfo *ui, unsigned long long t, unsigned long long n, MySQL_STMT_Global_info *_stmt_info) { spin_wrlock(&digest_rwlock); QP_query_digest_stats *qds; @@ -948,7 +967,11 @@ void Query_Processor::update_query_digest(SQP_par_t *qp, int hid, MySQL_Connecti qds=(QP_query_digest_stats *)it->second; qds->add_time(t,n); } else { - qds=new QP_query_digest_stats(ui->username, ui->schemaname, qp->digest, qp->digest_text, hid); + if (_stmt_info==NULL) { + qds=new QP_query_digest_stats(ui->username, ui->schemaname, qp->digest, qp->digest_text, hid); + } else { + qds=new QP_query_digest_stats(ui->username, ui->schemaname, _stmt_info->digest, _stmt_info->digest_text, hid); + } qds->add_time(t,n); digest_umap.insert(std::make_pair(qp->digest_total,(void *)qds)); }