From 41ef582fc5854978025c1fd62ce11120ec24ecb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Jaramago=20Fern=C3=A1ndez?= Date: Tue, 15 Nov 2022 20:32:27 +0100 Subject: [PATCH] Add new field 'stmt_id' to eventslog for 'STMT_PREPARE|STMT_EXECUTE' events --- include/MySQL_Logger.hpp | 2 ++ include/MySQL_Session.h | 1 + lib/MySQL_Logger.cpp | 23 +++++++++++++++++++++++ lib/MySQL_Session.cpp | 8 +++++++- 4 files changed, 33 insertions(+), 1 deletion(-) diff --git a/include/MySQL_Logger.hpp b/include/MySQL_Logger.hpp index 358ab4ec3..f1487b38d 100644 --- a/include/MySQL_Logger.hpp +++ b/include/MySQL_Logger.hpp @@ -30,12 +30,14 @@ class MySQL_Event { bool have_rows_sent; uint64_t affected_rows; uint64_t rows_sent; + uint32_t client_stmt_id; public: MySQL_Event(log_event_type _et, uint32_t _thread_id, char * _username, char * _schemaname , uint64_t _start_time , uint64_t _end_time , uint64_t _query_digest, char *_client, size_t _client_len); uint64_t write(std::fstream *f, MySQL_Session *sess); uint64_t write_query_format_1(std::fstream *f); uint64_t write_query_format_2_json(std::fstream *f); void write_auth(std::fstream *f, MySQL_Session *sess); + void set_client_stmt_id(uint32_t client_stmt_id); void set_query(const char *ptr, int len); void set_server(int _hid, const char *ptr, int len); void set_extra_info(char *); diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index 1cec60202..487252803 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -51,6 +51,7 @@ class Query_Info { MYSQL_STMT *mysql_stmt; stmt_execute_metadata_t *stmt_meta; uint64_t stmt_global_id; + uint64_t stmt_client_id; MySQL_STMT_Global_info *stmt_info; int QueryLength; diff --git a/lib/MySQL_Logger.cpp b/lib/MySQL_Logger.cpp index dfe63da34..fcb1f55f3 100644 --- a/lib/MySQL_Logger.cpp +++ b/lib/MySQL_Logger.cpp @@ -58,6 +58,11 @@ MySQL_Event::MySQL_Event (log_event_type _et, uint32_t _thread_id, char * _usern affected_rows=0; have_rows_sent=false; rows_sent=0; + client_stmt_id=0; +} + +void MySQL_Event::set_client_stmt_id(uint32_t client_stmt_id) { + this->client_stmt_id = client_stmt_id; } void MySQL_Event::set_affected_rows(uint64_t ar) { @@ -257,6 +262,7 @@ uint64_t MySQL_Event::write_query_format_1(std::fstream *f) { total_bytes+=mysql_encode_length(start_time,NULL); total_bytes+=mysql_encode_length(end_time,NULL); + total_bytes+=mysql_encode_length(client_stmt_id,NULL); total_bytes+=mysql_encode_length(affected_rows,NULL); total_bytes+=mysql_encode_length(rows_sent,NULL); @@ -314,6 +320,12 @@ uint64_t MySQL_Event::write_query_format_1(std::fstream *f) { write_encoded_length(buf,end_time,len,buf[0]); f->write((char *)buf,len); + if (et == PROXYSQL_COM_STMT_PREPARE || et == PROXYSQL_COM_STMT_EXECUTE) { + len=mysql_encode_length(client_stmt_id,buf); + write_encoded_length(buf,client_stmt_id,len,buf[0]); + f->write((char *)buf,len); + } + len=mysql_encode_length(affected_rows,buf); write_encoded_length(buf,affected_rows,len,buf[0]); f->write((char *)buf,len); @@ -406,6 +418,10 @@ uint64_t MySQL_Event::write_query_format_2_json(std::fstream *f) { sprintf(digest_hex,"0x%016llX", (long long unsigned int)query_digest); j["digest"] = digest_hex; + if (et == PROXYSQL_COM_STMT_PREPARE || et == PROXYSQL_COM_STMT_EXECUTE) { + j["client_stmt_id"] = client_stmt_id; + } + // for performance reason, we are moving the write lock // right before the write to disk //GloMyLogger->wrlock(); @@ -683,11 +699,18 @@ void MySQL_Logger::log_request(MySQL_Session *sess, MySQL_Data_Stream *myds) { case PROCESSING_STMT_EXECUTE: c = (char *)sess->CurrentQuery.stmt_info->query; ql = sess->CurrentQuery.stmt_info->query_length; + me.set_client_stmt_id(sess->CurrentQuery.stmt_client_id); break; case PROCESSING_STMT_PREPARE: default: c = (char *)sess->CurrentQuery.QueryPointer; ql = sess->CurrentQuery.QueryLength; + // NOTE: This needs to be located in the 'default' case because otherwise will miss state + // 'WAITING_CLIENT_DATA'. This state is possible when the prepared statement is found in the + // global cache and due to that we immediately reply to the client and session doesn't reach + // 'PROCESSING_STMT_PREPARE' state. 'stmt_client_id' is expected to be '0' for anything that isn't + // a prepared statement, still, logging should rely 'log_event_type' instead of this value. + me.set_client_stmt_id(sess->CurrentQuery.stmt_client_id); break; } if (c) { diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index de3b5d7d9..6fafe8745 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -278,6 +278,7 @@ Query_Info::Query_Info() { rows_sent=0; start_time=0; end_time=0; + stmt_client_id=0; } Query_Info::~Query_Info() { @@ -309,6 +310,7 @@ void Query_Info::begin(unsigned char *_p, int len, bool mysql_header) { affected_rows=0; rows_sent=0; sess->gtid_hid=-1; + stmt_client_id=0; } void Query_Info::end() { @@ -547,6 +549,7 @@ MySQL_Session::MySQL_Session() { CurrentQuery.mysql_stmt=NULL; CurrentQuery.stmt_meta=NULL; CurrentQuery.stmt_global_id=0; + CurrentQuery.stmt_client_id=0; CurrentQuery.stmt_info=NULL; current_hostgroup=-1; @@ -3226,6 +3229,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C // for this reason, we do not need to prepare it again, and we can already reply to the client // we will now generate a unique stmt and send it to the client uint32_t new_stmt_id=client_myds->myconn->local_stmts->generate_new_client_stmt_id(stmt_info->statement_id); + CurrentQuery.stmt_client_id=new_stmt_id; client_myds->setDSS_STATE_QUERY_SENT_NET(); client_myds->myprot.generate_STMT_PREPARE_RESPONSE(client_myds->pkt_sid+1,stmt_info,new_stmt_id); LogQuery(NULL); @@ -3275,6 +3279,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C uint32_t client_stmt_id=0; uint64_t stmt_global_id=0; memcpy(&client_stmt_id,(char *)pkt.ptr+5,sizeof(uint32_t)); + CurrentQuery.stmt_client_id=client_stmt_id; stmt_global_id=client_myds->myconn->local_stmts->find_global_stmt_id_from_client(client_stmt_id); if (stmt_global_id == 0) { // FIXME: add error handling @@ -4088,7 +4093,7 @@ void MySQL_Session::SetQueryTimeout() { bool MySQL_Session::handler_rc0_PROCESSING_STMT_PREPARE(enum session_status& st, MySQL_Data_Stream *myds, bool& prepared_stmt_with_no_params) { thread->status_variables.stvar[st_var_backend_stmt_prepare]++; GloMyStmt->wrlock(); - uint32_t client_stmtid; + uint32_t client_stmtid=0; uint64_t global_stmtid; //bool is_new; MySQL_STMT_Global_info *stmt_info=NULL; @@ -4112,6 +4117,7 @@ bool MySQL_Session::handler_rc0_PROCESSING_STMT_PREPARE(enum session_status& st, if (previous_status.size() == 0) client_stmtid=client_myds->myconn->local_stmts->generate_new_client_stmt_id(global_stmtid); CurrentQuery.mysql_stmt=NULL; + CurrentQuery.stmt_client_id=client_stmtid; st=status; size_t sts=previous_status.size(); if (sts) {