Implemented PROCESSLIST with Prepared Statements

pull/739/head
René Cannaò 10 years ago
parent 5125067643
commit e0c7ca209a

@ -133,6 +133,8 @@ class MySQL_STMT_Global_info {
private:
void compute_hash();
public:
uint64_t digest;
char * digest_text;
uint64_t hash;
char *username;
char *schemaname;

@ -26,6 +26,7 @@ class Query_Info {
MYSQL_STMT *mysql_stmt;
stmt_execute_metadata_t *stmt_meta;
uint32_t stmt_global_id;
MySQL_STMT_Global_info *stmt_info;
int QueryLength;
enum MYSQL_COM_QUERY_command MyComQueryCmd;

@ -261,7 +261,7 @@ class Query_Processor {
char * get_digest_text(SQP_par_t *qp);
uint64_t get_digest(SQP_par_t *qp);
void update_query_digest(SQP_par_t *qp, int hid, MySQL_Connection_userinfo *ui, unsigned long long t, unsigned long long n);
void update_query_digest(SQP_par_t *qp, int hid, MySQL_Connection_userinfo *ui, unsigned long long t, unsigned long long n, MySQL_STMT_Global_info *_stmt_info);
unsigned long long query_parser_update_counters(MySQL_Session *sess, enum MYSQL_COM_QUERY_command c, SQP_par_t *qp, unsigned long long t);

@ -164,6 +164,7 @@ MySQL_STMT_Global_info * MySQL_STMT_Manager::find_prepared_statement_by_stmt_id(
auto s=m.find(id);
if (s!=m.end()) {
ret=s->second;
__sync_fetch_and_add(&ret->ref_count,1); // increase reference count
}
if (lock) {
@ -181,6 +182,7 @@ MySQL_STMT_Global_info * MySQL_STMT_Manager::find_prepared_statement_by_hash(uin
auto s=h.find(hash);
if (s!=h.end()) {
ret=s->second;
__sync_fetch_and_add(&ret->ref_count,1); // increase reference count
}
if (lock) {
@ -193,10 +195,12 @@ MySQL_STMT_Global_info::MySQL_STMT_Global_info(uint32_t id, unsigned int h, char
statement_id=id;
hostgroup_id=h;
ref_count=0;
digest_text=NULL;
username=strdup(u);
schemaname=strdup(s);
query=(char *)malloc(ql);
query=(char *)malloc(ql+1);
memcpy(query,q,ql);
query[ql]='\0'; // add NULL byte
query_length=ql;
num_params=stmt->param_count;
num_columns=stmt->field_count;
@ -281,5 +285,8 @@ MySQL_STMT_Global_info::~MySQL_STMT_Global_info() {
free(params);
params=NULL;
}
if (digest_text) {
free(digest_text);
digest_text=NULL;
}
}

@ -78,6 +78,7 @@ Query_Info::Query_Info() {
QueryLength=0;
QueryParserArgs.digest_text=NULL;
QueryParserArgs.first_comment=NULL;
stmt_info=NULL;
}
Query_Info::~Query_Info() {
@ -87,6 +88,10 @@ Query_Info::~Query_Info() {
if (QueryPointer) {
//l_free(QueryLength+1,QueryPointer);
}
if (stmt_info) {
__sync_fetch_and_sub(&stmt_info->ref_count,1); // decrease reference count
stmt_info=NULL;
}
}
void Query_Info::begin(unsigned char *_p, int len, bool mysql_header) {
@ -114,6 +119,10 @@ void Query_Info::end() {
__sync_add_and_fetch(&sess->thread->status_variables.queries_slow,1);
}
assert(mysql_stmt==NULL);
if (stmt_info) {
__sync_fetch_and_sub(&stmt_info->ref_count,1); // decrease reference count
stmt_info=NULL;
}
}
void Query_Info::init(unsigned char *_p, int len, bool mysql_header) {
@ -1508,6 +1517,7 @@ __get_pkts_from_client:
l_free(pkt.size,pkt.ptr);
client_myds->setDSS_STATE_QUERY_SENT_NET();
client_myds->myprot.generate_STMT_PREPARE_RESPONSE(client_myds->pkt_sid+1,stmt_info);
__sync_fetch_and_sub(&stmt_info->ref_count,1); // decrease reference count
client_myds->DSS=STATE_SLEEP;
status=WAITING_CLIENT_DATA;
break;
@ -1556,6 +1566,7 @@ __get_pkts_from_client:
status=WAITING_CLIENT_DATA;
break;
}
CurrentQuery.stmt_info=stmt_info;
//stmt_execute_metadata_t *stmt_meta=client_myds->myprot.get_binds_from_pkt(pkt.ptr,pkt.size,stmt_info->num_params);
// we now take the metadata associated with STMT_EXECUTE from MySQL_STMTs_meta
@ -1571,6 +1582,8 @@ __get_pkts_from_client:
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Error in prepared statement execution");
client_myds->DSS=STATE_SLEEP;
status=WAITING_CLIENT_DATA;
__sync_fetch_and_sub(&stmt_info->ref_count,1); // decrease reference count
stmt_info=NULL;
break;
}
if (stmt_meta_found==false) {
@ -1873,6 +1886,10 @@ handler_again:
qpo->timeout,
qpo->delay,
true);
stmt_info->digest=CurrentQuery.QueryParserArgs.digest; // copy digest
if (CurrentQuery.QueryParserArgs.digest_text) {
stmt_info->digest_text=strdup(CurrentQuery.QueryParserArgs.digest_text);
}
stmid=stmt_info->statement_id;
//uint64_t hash=client_myds->myconn->local_stmts->compute_hash(current_hostgroup,(char *)client_myds->myconn->userinfo->username,(char *)client_myds->myconn->userinfo->schemaname,(char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength);
/*

@ -2504,12 +2504,23 @@ SQLite3_result * MySQL_Threads_Handler::SQL3_Processlist() {
pta[9]=strdup(buf);
sprintf(buf,"%d", mc->parent->port);
pta[10]=strdup(buf);
if (mc->query.length) {
pta[13]=(char *)malloc(mc->query.length+1);
strncpy(pta[13],mc->query.ptr,mc->query.length);
pta[13][mc->query.length]='\0';
} else {
pta[13]=NULL;
if (sess->CurrentQuery.stmt_info==NULL) { // text protocol
if (mc->query.length) {
pta[13]=(char *)malloc(mc->query.length+1);
strncpy(pta[13],mc->query.ptr,mc->query.length);
pta[13][mc->query.length]='\0';
} else {
pta[13]=NULL;
}
} else { // prepared statement
MySQL_STMT_Global_info *si=sess->CurrentQuery.stmt_info;
if (si->query_length) {
pta[13]=(char *)malloc(si->query_length+1);
strncpy(pta[13],si->query,si->query_length);
pta[13][si->query_length]='\0';
} else {
pta[13]=NULL;
}
}
} else {
pta[7]=NULL;
@ -2538,6 +2549,9 @@ SQLite3_result * MySQL_Threads_Handler::SQL3_Processlist() {
case CHANGING_SCHEMA:
pta[11]=strdup("InitDB");
break;
case PROCESSING_STMT_EXECUTE:
pta[11]=strdup("Execute");
break;
default:
sprintf(buf,"%d", sess->status);
pta[11]=strdup(buf);

@ -913,7 +913,7 @@ unsigned long long Query_Processor::query_parser_update_counters(MySQL_Session *
unsigned long long ret=_thr_commands_counters[c]->add_time(t);
if (qp->digest_text) {
if (sess->CurrentQuery.stmt_info==NULL && qp->digest_text) {
// this code is executed only if digest_text is not NULL , that means mysql_thread___query_digests was true when the query started
uint64_t hash2;
SpookyHash myhash;
@ -930,13 +930,32 @@ unsigned long long Query_Processor::query_parser_update_counters(MySQL_Session *
myhash.Update(ui->schemaname,strlen(ui->schemaname));
myhash.Update(&sess->current_hostgroup,sizeof(sess->default_hostgroup));
myhash.Final(&qp->digest_total,&hash2);
update_query_digest(qp, sess->current_hostgroup, ui, t, sess->thread->curtime, NULL);
}
if (sess->CurrentQuery.stmt_info && sess->CurrentQuery.stmt_info->digest_text) {
uint64_t hash2;
SpookyHash myhash;
myhash.Init(19,3);
assert(sess);
assert(sess->client_myds);
assert(sess->client_myds->myconn);
assert(sess->client_myds->myconn->userinfo);
MySQL_Connection_userinfo *ui=sess->client_myds->myconn->userinfo;
assert(ui->username);
assert(ui->schemaname);
MySQL_STMT_Global_info *stmt_info=sess->CurrentQuery.stmt_info;
myhash.Update(ui->username,strlen(ui->username));
myhash.Update(&stmt_info->digest,sizeof(qp->digest));
myhash.Update(ui->schemaname,strlen(ui->schemaname));
myhash.Update(&sess->current_hostgroup,sizeof(sess->default_hostgroup));
myhash.Final(&qp->digest_total,&hash2);
//delete myhash;
update_query_digest(qp, sess->current_hostgroup, ui, t, sess->thread->curtime);
update_query_digest(qp, sess->current_hostgroup, ui, t, sess->thread->curtime, stmt_info);
}
return ret;
}
void Query_Processor::update_query_digest(SQP_par_t *qp, int hid, MySQL_Connection_userinfo *ui, unsigned long long t, unsigned long long n) {
void Query_Processor::update_query_digest(SQP_par_t *qp, int hid, MySQL_Connection_userinfo *ui, unsigned long long t, unsigned long long n, MySQL_STMT_Global_info *_stmt_info) {
spin_wrlock(&digest_rwlock);
QP_query_digest_stats *qds;
@ -948,7 +967,11 @@ void Query_Processor::update_query_digest(SQP_par_t *qp, int hid, MySQL_Connecti
qds=(QP_query_digest_stats *)it->second;
qds->add_time(t,n);
} else {
qds=new QP_query_digest_stats(ui->username, ui->schemaname, qp->digest, qp->digest_text, hid);
if (_stmt_info==NULL) {
qds=new QP_query_digest_stats(ui->username, ui->schemaname, qp->digest, qp->digest_text, hid);
} else {
qds=new QP_query_digest_stats(ui->username, ui->schemaname, _stmt_info->digest, _stmt_info->digest_text, hid);
}
qds->add_time(t,n);
digest_umap.insert(std::make_pair(qp->digest_total,(void *)qds));
}

Loading…
Cancel
Save