Merge pull request #4922 from sysown/v3.0-4861_fixes1

Further enhancement in prepared statement logging #4861
pull/4851/head
René Cannaò 1 year ago committed by GitHub
commit edc84ea14b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -533,6 +533,7 @@ class MySQL_Threads_Handler
int eventslog_buffer_max_query_length;
int eventslog_default_log;
int eventslog_format;
int eventslog_stmt_parameters;
char *auditlog_filename;
int auditlog_filesize;
// SSL related, proxy to server

@ -40,7 +40,8 @@ enum log_event_type {
PROXYSQL_SQLITE_AUTH_CLOSE,
PROXYSQL_SQLITE_AUTH_QUIT,
PROXYSQL_COM_STMT_EXECUTE,
PROXYSQL_COM_STMT_PREPARE
PROXYSQL_COM_STMT_PREPARE,
PROXYSQL_METADATA
};
enum cred_username_type { USERNAME_BACKEND, USERNAME_FRONTEND, USERNAME_NONE };
@ -1315,6 +1316,7 @@ __thread int mysql_thread___eventslog_table_memory_size;
__thread int mysql_thread___eventslog_buffer_max_query_length;
__thread int mysql_thread___eventslog_default_log;
__thread int mysql_thread___eventslog_format;
__thread int mysql_thread___eventslog_stmt_parameters;
/* variables used by audit log */
__thread char * mysql_thread___auditlog_filename;
@ -1619,6 +1621,7 @@ extern __thread int mysql_thread___eventslog_table_memory_size;
extern __thread int mysql_thread___eventslog_buffer_max_query_length;
extern __thread int mysql_thread___eventslog_default_log;
extern __thread int mysql_thread___eventslog_format;
extern __thread int mysql_thread___eventslog_stmt_parameters;
/* variables used by audit log */
extern __thread char * mysql_thread___auditlog_filename;

@ -64,6 +64,7 @@ struct BufferTypeInfo {
// Helper lambda to convert binary data to a hex string.
auto binaryToHex = [](const MYSQL_BIND* bind, unsigned long len, std::string &out) {
std::ostringstream oss;
oss << "0x";
const unsigned char* data = reinterpret_cast<const unsigned char*>(bind->buffer);
for (unsigned long i = 0; i < len; i++) {
oss << std::setw(2) << std::setfill('0') << std::hex << (int)data[i];
@ -109,7 +110,7 @@ static const std::unordered_map<unsigned int, BufferTypeInfo> bufferTypeInfoMap
},
{ MYSQL_TYPE_TIMESTAMP,
{ "TIMESTAMP", [](const MYSQL_BIND* bind, unsigned long len, std::string &out) {
binaryToHex(bind, len, out);
binaryToHex(bind, sizeof(MYSQL_TIME), out);
}}
},
{ MYSQL_TYPE_LONGLONG,
@ -125,17 +126,17 @@ static const std::unordered_map<unsigned int, BufferTypeInfo> bufferTypeInfoMap
},
{ MYSQL_TYPE_DATE,
{ "DATE", [](const MYSQL_BIND* bind, unsigned long len, std::string &out) {
binaryToHex(bind, len, out);
binaryToHex(bind, sizeof(MYSQL_TIME), out);
}}
},
{ MYSQL_TYPE_TIME,
{ "TIME", [](const MYSQL_BIND* bind, unsigned long len, std::string &out) {
binaryToHex(bind, len, out);
binaryToHex(bind, sizeof(MYSQL_TIME), out);
}}
},
{ MYSQL_TYPE_DATETIME,
{ "DATETIME", [](const MYSQL_BIND* bind, unsigned long len, std::string &out) {
binaryToHex(bind, len, out);
binaryToHex(bind, sizeof(MYSQL_TIME), out);
}}
},
{ MYSQL_TYPE_YEAR,
@ -216,12 +217,12 @@ static const std::unordered_map<unsigned int, BufferTypeInfo> bufferTypeInfoMap
},
{ MYSQL_TYPE_VAR_STRING,
{ "VAR_STRING", [](const MYSQL_BIND* bind, unsigned long len, std::string &out) {
out.assign(reinterpret_cast<char*>(bind->buffer), len);
binaryToHex(bind, len, out);
}}
},
{ MYSQL_TYPE_STRING,
{ "STRING", [](const MYSQL_BIND* bind, unsigned long len, std::string &out) {
out.assign(reinterpret_cast<char*>(bind->buffer), len);
binaryToHex(bind, len, out);
}}
},
{ MYSQL_TYPE_GEOMETRY,
@ -523,6 +524,11 @@ uint64_t MySQL_Event::write(std::fstream *f, MySQL_Session *sess) {
case PROXYSQL_SQLITE_AUTH_QUIT:
write_auth(f, sess);
break;
case PROXYSQL_METADATA:
if (mysql_thread___eventslog_format==1) { // format 1 , binary
total_bytes=write_query_format_1(f);
}
break;
default:
break;
}
@ -740,7 +746,7 @@ uint64_t MySQL_Event::write_query_format_1(std::fstream *f) {
// Validate Session and Statement Metadata:
// The code checks whether the session pointer and the current query's statement metadata (stmt_meta)
// are non-null to ensure that parameter details are available.
if (session != nullptr && session->CurrentQuery.stmt_meta != nullptr) {
if (mysql_thread___eventslog_stmt_parameters > 0 && session != nullptr && session->CurrentQuery.stmt_meta != nullptr) {
stmt_execute_metadata_t *meta = session->CurrentQuery.stmt_meta;
uint16_t num_params = meta->num_params;
// Add bytes for encoded parameter count.
@ -757,6 +763,17 @@ uint64_t MySQL_Event::write_query_format_1(std::fstream *f) {
const MYSQL_BIND *bind = meta->binds ? &meta->binds[i] : nullptr;
if (bind != nullptr && !(meta->is_nulls && meta->is_nulls[i])) {
unsigned long len = meta->lengths ? meta->lengths[i] : 0;
auto bt = bind->buffer_type;
switch (bt) {
case MYSQL_TYPE_TIMESTAMP:
case MYSQL_TYPE_DATE:
case MYSQL_TYPE_TIME:
case MYSQL_TYPE_DATETIME:
len = sizeof(MYSQL_TIME);
break;
default:
break;
}
// Use getValueForBind() to produce a string representation.
auto[valType, convVal] = getValueForBind(bind, len);
convertedValue = convVal;
@ -866,7 +883,7 @@ uint64_t MySQL_Event::write_query_format_1(std::fstream *f) {
// Validate Session and Statement Metadata:
// The code checks whether the session pointer and the current query's statement metadata (stmt_meta)
// are non-null to ensure that parameter details are available.
if (session != nullptr && session->CurrentQuery.stmt_meta != nullptr) {
if (mysql_thread___eventslog_stmt_parameters > 0 && session != nullptr && session->CurrentQuery.stmt_meta != nullptr) {
stmt_execute_metadata_t *meta = session->CurrentQuery.stmt_meta;
// Write the number of parameters.
// Writing the Encoded Parameter Count:
@ -914,6 +931,17 @@ uint64_t MySQL_Event::write_query_format_1(std::fstream *f) {
std::string convertedValue;
if (bind && bind->buffer && !(meta->is_nulls && meta->is_nulls[i])) {
unsigned long len = meta->lengths ? meta->lengths[i] : 0;
auto bt = bind->buffer_type;
switch (bt) {
case MYSQL_TYPE_TIMESTAMP:
case MYSQL_TYPE_DATE:
case MYSQL_TYPE_TIME:
case MYSQL_TYPE_DATETIME:
len = sizeof(MYSQL_TIME);
break;
default:
break;
}
auto[valType, convVal] = getValueForBind(bind, len);
convertedValue = convVal;
} else {
@ -1073,7 +1101,9 @@ uint64_t MySQL_Event::write_query_format_2_json(std::fstream *f) {
}
if (et == PROXYSQL_COM_STMT_EXECUTE) {
if (session != nullptr) {
extractStmtExecuteMetadataToJson(j);
if (mysql_thread___eventslog_stmt_parameters != 0) {
extractStmtExecuteMetadataToJson(j);
}
}
}
@ -1250,6 +1280,26 @@ void MySQL_Logger::events_open_log_unlocked() {
try {
events.logfile->open(filen , std::ios::out | std::ios::binary);
proxy_info("Starting new mysql event log file %s\n", filen);
if (mysql_thread___eventslog_format == 1) {
// create a new event, type PROXYSQL_METADATA, that writes the ProxySQL version as part of the payload
json j = {};
j["version"] = string(PROXYSQL_VERSION);
string msg = j.dump();
MySQL_Event metaEvent(
PROXYSQL_METADATA, // event type for metadata
0, // thread_id (0 for metadata events)
(char*)msg.c_str(), // using "metadata" as the username
(char*)"", // empty schemaname
0, // start_time (current time)
0, // end_time (current time)
0, // query_digest not used for metadata
(char *)"", // client field holds the version string
0, // length of version string
nullptr // no session associated
);
metaEvent.set_query((char *)"",0);
metaEvent.write(events.logfile, nullptr);
}
}
catch (const std::ofstream::failure&) {
proxy_error("Error creating new mysql event log file %s\n", filen);

@ -358,6 +358,7 @@ static char * mysql_thread_variables_names[]= {
(char *)"eventslog_buffer_max_query_length",
(char *)"eventslog_default_log",
(char *)"eventslog_format",
(char *)"eventslog_stmt_parameters",
(char *)"auditlog_filename",
(char *)"auditlog_filesize",
//(char *)"default_charset", // removed in 2.0.13 . Obsoleted previously using MySQL_Variables instead
@ -1083,6 +1084,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() {
variables.eventslog_buffer_max_query_length = 32*1024;
variables.eventslog_default_log=0;
variables.eventslog_format=1;
variables.eventslog_stmt_parameters=0;
variables.auditlog_filename=strdup((char *)"");
variables.auditlog_filesize=100*1024*1024;
//variables.server_capabilities=CLIENT_FOUND_ROWS | CLIENT_PROTOCOL_41 | CLIENT_IGNORE_SIGPIPE | CLIENT_TRANSACTIONS | CLIENT_SECURE_CONNECTION | CLIENT_CONNECT_WITH_DB;
@ -2285,6 +2287,7 @@ char ** MySQL_Threads_Handler::get_variables_list() {
VariablesPointers_int["eventslog_table_memory_size"] = make_tuple(&variables.eventslog_table_memory_size, 0, 8*1024*1024, false);
VariablesPointers_int["eventslog_buffer_max_query_length"] = make_tuple(&variables.eventslog_buffer_max_query_length, 128, 32*1024*1024, false);
VariablesPointers_int["eventslog_default_log"] = make_tuple(&variables.eventslog_default_log, 0, 1, false);
VariablesPointers_int["eventslog_stmt_parameters"] = make_tuple(&variables.eventslog_stmt_parameters, 0, 1, false);
// various
VariablesPointers_int["long_query_time"] = make_tuple(&variables.long_query_time, 0, 20*24*3600*1000, false);
VariablesPointers_int["max_allowed_packet"] = make_tuple(&variables.max_allowed_packet, 8192, 1024*1024*1024, false);
@ -4194,6 +4197,7 @@ void MySQL_Thread::refresh_variables() {
REFRESH_VARIABLE_INT(eventslog_buffer_max_query_length);
REFRESH_VARIABLE_INT(eventslog_default_log);
REFRESH_VARIABLE_INT(eventslog_format);
REFRESH_VARIABLE_INT(eventslog_stmt_parameters);
REFRESH_VARIABLE_CHAR(eventslog_filename);
REFRESH_VARIABLE_INT(auditlog_filesize);
REFRESH_VARIABLE_CHAR(auditlog_filename);

@ -902,6 +902,7 @@ int main(int argc, char** argv) {
}
MYSQL_QUERY(admin, set_log_mode.c_str());
MYSQL_QUERY(admin, "SET mysql-eventslog_default_log=1");
MYSQL_QUERY(admin, "SET mysql-eventslog_stmt_parameters=1");
MYSQL_QUERY(admin, "LOAD MYSQL VARIABLES TO RUNTIME");
MYSQL_QUERY(proxy, "DROP TABLE IF EXISTS test.prepared_log_test");
MYSQL_QUERY(proxy, create_table_query_ext().c_str());

@ -11,314 +11,331 @@ using json = nlohmann::json;
// this is copied from proxysql_structs.h
enum log_event_type {
PROXYSQL_COM_QUERY,
PROXYSQL_MYSQL_AUTH_OK,
PROXYSQL_MYSQL_AUTH_ERR,
PROXYSQL_MYSQL_AUTH_CLOSE,
PROXYSQL_MYSQL_AUTH_QUIT,
PROXYSQL_MYSQL_CHANGE_USER_OK,
PROXYSQL_MYSQL_CHANGE_USER_ERR,
PROXYSQL_MYSQL_INITDB,
PROXYSQL_ADMIN_AUTH_OK,
PROXYSQL_ADMIN_AUTH_ERR,
PROXYSQL_ADMIN_AUTH_CLOSE,
PROXYSQL_ADMIN_AUTH_QUIT,
PROXYSQL_SQLITE_AUTH_OK,
PROXYSQL_SQLITE_AUTH_ERR,
PROXYSQL_SQLITE_AUTH_CLOSE,
PROXYSQL_SQLITE_AUTH_QUIT,
PROXYSQL_COM_STMT_EXECUTE,
PROXYSQL_COM_STMT_PREPARE
PROXYSQL_COM_QUERY,
PROXYSQL_MYSQL_AUTH_OK,
PROXYSQL_MYSQL_AUTH_ERR,
PROXYSQL_MYSQL_AUTH_CLOSE,
PROXYSQL_MYSQL_AUTH_QUIT,
PROXYSQL_MYSQL_CHANGE_USER_OK,
PROXYSQL_MYSQL_CHANGE_USER_ERR,
PROXYSQL_MYSQL_INITDB,
PROXYSQL_ADMIN_AUTH_OK,
PROXYSQL_ADMIN_AUTH_ERR,
PROXYSQL_ADMIN_AUTH_CLOSE,
PROXYSQL_ADMIN_AUTH_QUIT,
PROXYSQL_SQLITE_AUTH_OK,
PROXYSQL_SQLITE_AUTH_ERR,
PROXYSQL_SQLITE_AUTH_CLOSE,
PROXYSQL_SQLITE_AUTH_QUIT,
PROXYSQL_COM_STMT_EXECUTE,
PROXYSQL_COM_STMT_PREPARE,
PROXYSQL_METADATA
};
// Decodes a MySQL length-encoded integer from the stream.
// Returns true if decoding was successful.
bool decodeInt(std::istream &in, uint64_t &val) {
unsigned char first;
if (!in.read(reinterpret_cast<char*>(&first), 1))
return false;
if (first < 251) {
val = first;
return true;
} else if (first == 0xfc) {
uint16_t tmp;
if (!in.read(reinterpret_cast<char*>(&tmp), sizeof(tmp)))
return false;
val = tmp;
return true;
} else if (first == 0xfd) {
uint32_t tmp = 0;
char buf[3] = {0};
if (!in.read(buf, 3))
return false;
tmp = (unsigned char)buf[0] |
((unsigned char)buf[1] << 8) |
((unsigned char)buf[2] << 16);
val = tmp;
return true;
} else if (first == 0xfe) {
uint64_t tmp;
if (!in.read(reinterpret_cast<char*>(&tmp), sizeof(tmp)))
return false;
val = tmp;
return true;
}
return false;
unsigned char first;
if (!in.read(reinterpret_cast<char*>(&first), 1))
return false;
if (first < 251) {
val = first;
return true;
} else if (first == 0xfc) {
uint16_t tmp;
if (!in.read(reinterpret_cast<char*>(&tmp), sizeof(tmp)))
return false;
val = tmp;
return true;
} else if (first == 0xfd) {
uint32_t tmp = 0;
char buf[3] = {0};
if (!in.read(buf, 3))
return false;
tmp = (unsigned char)buf[0] |
((unsigned char)buf[1] << 8) |
((unsigned char)buf[2] << 16);
val = tmp;
return true;
} else if (first == 0xfe) {
uint64_t tmp;
if (!in.read(reinterpret_cast<char*>(&tmp), sizeof(tmp)))
return false;
val = tmp;
return true;
}
return false;
}
// Reads a fixed-size value from the stream.
template <typename T>
bool readValue(std::istream &in, T &value) {
in.read(reinterpret_cast<char*>(&value), sizeof(T));
return in.good();
in.read(reinterpret_cast<char*>(&value), sizeof(T));
return in.good();
}
// Reads raw bytes into a string.
std::string readString(std::istream &in, uint64_t length) {
std::vector<char> buf(length);
if (!in.read(buf.data(), length))
return "";
return std::string(buf.begin(), buf.end());
std::vector<char> buf(length);
if (!in.read(buf.data(), length))
return "";
return std::string(buf.begin(), buf.end());
}
json parseEvent(std::istream &in, bool verbose = true) {
json j;
// Read total_bytes (first 8 bytes)
uint64_t total_bytes = 0;
if (!readValue(in, total_bytes))
throw std::runtime_error("Cannot read total_bytes");
// j["total_bytes"] = total_bytes; // optional
if (verbose == true) {
std::cout << "total_bytes: " << total_bytes << std::endl;
}
json j;
// Read total_bytes (first 8 bytes)
uint64_t total_bytes = 0;
if (!readValue(in, total_bytes))
throw std::runtime_error("Cannot read total_bytes");
// j["total_bytes"] = total_bytes; // optional
if (verbose == true) {
std::cout << "total_bytes: " << total_bytes << std::endl;
}
// Read event type (1 byte)
char etb;
enum log_event_type et;
std::string et_name = "";
// Read event type (1 byte)
char etb;
enum log_event_type et;
std::string et_name = "";
if (!readValue(in, etb))
throw std::runtime_error("Cannot read event type");
et = static_cast<log_event_type>(etb);
switch (et) {
case PROXYSQL_COM_STMT_EXECUTE:
et_name="COM_STMT_EXECUTE";
break;
case PROXYSQL_COM_STMT_PREPARE:
et_name="COM_STMT_PREPARE";
break;
default:
et_name="COM_QUERY";
break;
}
if (!readValue(in, etb))
throw std::runtime_error("Cannot read event type");
et = static_cast<log_event_type>(etb);
switch (et) {
case PROXYSQL_COM_STMT_EXECUTE:
et_name="COM_STMT_EXECUTE";
break;
case PROXYSQL_COM_STMT_PREPARE:
et_name="COM_STMT_PREPARE";
break;
case PROXYSQL_METADATA:
et_name="METADATA";
break;
default:
et_name="COM_QUERY";
break;
}
j["event_type"] = et_name;
if (verbose == true) {
std::cout << "event_type: " << static_cast<int>(et) << std::endl;
}
j["event_type"] = et_name;
if (verbose == true) {
std::cout << "event_type: " << static_cast<int>(et) << std::endl;
}
// Read thread_id
uint64_t thread_id = 0;
if (!decodeInt(in, thread_id))
throw std::runtime_error("Error decoding thread_id");
j["thread_id"] = thread_id;
if (verbose == true) {
std::cout << "thread_id: " << thread_id << std::endl;
}
// Read thread_id
uint64_t thread_id = 0;
if (!decodeInt(in, thread_id))
throw std::runtime_error("Error decoding thread_id");
if (et != PROXYSQL_METADATA) {
j["thread_id"] = thread_id;
if (verbose == true) {
std::cout << "thread_id: " << thread_id << std::endl;
}
}
// Username: first read its length then the raw string.
uint64_t username_len = 0;
if (!decodeInt(in, username_len))
throw std::runtime_error("Error decoding username length");
std::string username = readString(in, username_len);
j["username"] = username;
if (verbose == true) {
std::cout << "username: " << username << std::endl;
}
// Username: first read its length then the raw string.
uint64_t username_len = 0;
if (!decodeInt(in, username_len))
throw std::runtime_error("Error decoding username length");
std::string username = readString(in, username_len);
if (et != PROXYSQL_METADATA) {
j["username"] = username;
if (verbose == true) {
std::cout << "username: " << username << std::endl;
}
} else {
json metadata = json::parse(username);
j["metadata"] = metadata;
}
// Schemaname
uint64_t schemaname_len = 0;
if (!decodeInt(in, schemaname_len))
throw std::runtime_error("Error decoding schemaname length");
std::string schemaname = readString(in, schemaname_len);
j["schemaname"] = schemaname;
if (verbose == true) {
std::cout << "schemaname: " << schemaname << std::endl;
}
// Schemaname
uint64_t schemaname_len = 0;
if (!decodeInt(in, schemaname_len))
throw std::runtime_error("Error decoding schemaname length");
std::string schemaname = readString(in, schemaname_len);
if (et != PROXYSQL_METADATA) {
j["schemaname"] = schemaname;
if (verbose == true) {
std::cout << "schemaname: " << schemaname << std::endl;
}
}
// Client string
uint64_t client_len = 0;
if (!decodeInt(in, client_len))
throw std::runtime_error("Error decoding client length");
std::string client = readString(in, client_len);
if (et != PROXYSQL_METADATA) {
j["client"] = client;
if (verbose == true) {
std::cout << "client: " << client << std::endl;
}
}
// Host id (hid)
uint64_t hid = 0;
if (!decodeInt(in, hid))
throw std::runtime_error("Error decoding hid");
if (et != PROXYSQL_METADATA) {
j["hid"] = hid;
if (verbose == true) {
std::cout << "hid: " << hid << std::endl;
}
}
// If hid != UINT64_MAX then read server string.
if (hid != UINT64_MAX) {
uint64_t server_len = 0;
if (!decodeInt(in, server_len))
throw std::runtime_error("Error decoding server length");
std::string server = readString(in, server_len);
j["server"] = server;
if (verbose == true) {
std::cout << "server: " << server << std::endl;
}
}
// Client string
uint64_t client_len = 0;
if (!decodeInt(in, client_len))
throw std::runtime_error("Error decoding client length");
std::string client = readString(in, client_len);
j["client"] = client;
if (verbose == true) {
std::cout << "client: " << client << std::endl;
}
// Start time and End time
uint64_t start_time = 0, end_time = 0;
if (!decodeInt(in, start_time))
throw std::runtime_error("Error decoding start_time");
if (!decodeInt(in, end_time))
throw std::runtime_error("Error decoding end_time");
if (et != PROXYSQL_METADATA) {
j["start_time"] = start_time;
j["end_time"] = end_time;
if (verbose == true) {
std::cout << "start_time: " << start_time << ", end_time: " << end_time << std::endl;
}
}
// Client statement id (only for COM_STMT_PREPARE/EXECUTE events)
uint64_t client_stmt_id = 0;
if (et == PROXYSQL_COM_STMT_PREPARE || et == PROXYSQL_COM_STMT_EXECUTE) {
if (!decodeInt(in, client_stmt_id))
throw std::runtime_error("Error decoding client_stmt_id");
j["client_stmt_id"] = client_stmt_id;
if (verbose == true) {
std::cout << "client_stmt_id: " << client_stmt_id << std::endl;
}
}
// Host id (hid)
uint64_t hid = 0;
if (!decodeInt(in, hid))
throw std::runtime_error("Error decoding hid");
j["hid"] = hid;
if (verbose == true) {
std::cout << "hid: " << hid << std::endl;
}
// affected_rows, last_insert_id, rows_sent, query_digest
uint64_t affected_rows, last_insert_id, rows_sent, query_digest;
if (!decodeInt(in, affected_rows))
throw std::runtime_error("Error decoding affected_rows");
if (!decodeInt(in, last_insert_id))
throw std::runtime_error("Error decoding last_insert_id");
if (!decodeInt(in, rows_sent))
throw std::runtime_error("Error decoding rows_sent");
if (!decodeInt(in, query_digest))
throw std::runtime_error("Error decoding query_digest");
if (et != PROXYSQL_METADATA) {
j["affected_rows"] = affected_rows;
j["last_insert_id"] = last_insert_id;
j["rows_sent"] = rows_sent;
j["query_digest"] = query_digest;
if (verbose == true) {
std::cout << "affected_rows: " << affected_rows
<< ", last_insert_id: " << last_insert_id
<< ", rows_sent: " << rows_sent
<< ", query_digest: " << query_digest << std::endl;
}
}
// Query: first read its length then raw query bytes.
uint64_t query_len = 0;
if (!decodeInt(in, query_len))
throw std::runtime_error("Error decoding query length");
std::string query = readString(in, query_len);
if (et != PROXYSQL_METADATA) {
j["query"] = query;
if (verbose == true) {
std::cout << "query: " << query << std::endl;
}
}
// If the event is COM_STMT_EXECUTE then read parameter block.
if (et == PROXYSQL_COM_STMT_EXECUTE) {
// Read parameters count
uint64_t num_params;
if (!decodeInt(in, num_params))
throw std::runtime_error("Error decoding parameter count");
if (verbose == true) {
std::cout << "num_params: " << num_params << std::endl;
}
// If hid != UINT64_MAX then read server string.
if (hid != UINT64_MAX) {
uint64_t server_len = 0;
if (!decodeInt(in, server_len))
throw std::runtime_error("Error decoding server length");
std::string server = readString(in, server_len);
j["server"] = server;
if (verbose == true) {
std::cout << "server: " << server << std::endl;
}
}
json jparams = json::array();
// Calculate null bitmap size
size_t bitmap_size = (num_params + 7) / 8;
std::vector<unsigned char> null_bitmap(bitmap_size);
if (!in.read(reinterpret_cast<char*>(null_bitmap.data()), bitmap_size))
throw std::runtime_error("Error reading null bitmap");
if (verbose == true) {
std::cout << "null_bitmap size: " << bitmap_size << std::endl;
}
// Start time and End time
uint64_t start_time = 0, end_time = 0;
if (!decodeInt(in, start_time))
throw std::runtime_error("Error decoding start_time");
if (!decodeInt(in, end_time))
throw std::runtime_error("Error decoding end_time");
j["start_time"] = start_time;
j["end_time"] = end_time;
if (verbose == true) {
std::cout << "start_time: " << start_time << ", end_time: " << end_time << std::endl;
}
// Client statement id (only for COM_STMT_PREPARE/EXECUTE events)
uint64_t client_stmt_id = 0;
if (et == PROXYSQL_COM_STMT_PREPARE || et == PROXYSQL_COM_STMT_EXECUTE) {
if (!decodeInt(in, client_stmt_id))
throw std::runtime_error("Error decoding client_stmt_id");
j["client_stmt_id"] = client_stmt_id;
if (verbose == true) {
std::cout << "client_stmt_id: " << client_stmt_id << std::endl;
}
}
// affected_rows, last_insert_id, rows_sent, query_digest
uint64_t affected_rows, last_insert_id, rows_sent, query_digest;
if (!decodeInt(in, affected_rows))
throw std::runtime_error("Error decoding affected_rows");
if (!decodeInt(in, last_insert_id))
throw std::runtime_error("Error decoding last_insert_id");
if (!decodeInt(in, rows_sent))
throw std::runtime_error("Error decoding rows_sent");
if (!decodeInt(in, query_digest))
throw std::runtime_error("Error decoding query_digest");
j["affected_rows"] = affected_rows;
j["last_insert_id"] = last_insert_id;
j["rows_sent"] = rows_sent;
j["query_digest"] = query_digest;
if (verbose == true) {
std::cout << "affected_rows: " << affected_rows
<< ", last_insert_id: " << last_insert_id
<< ", rows_sent: " << rows_sent
<< ", query_digest: " << query_digest << std::endl;
}
// Query: first read its length then raw query bytes.
uint64_t query_len = 0;
if (!decodeInt(in, query_len))
throw std::runtime_error("Error decoding query length");
std::string query = readString(in, query_len);
j["query"] = query;
if (verbose == true) {
std::cout << "query: " << query << std::endl;
}
// If the event is COM_STMT_EXECUTE then read parameter block.
if (et == PROXYSQL_COM_STMT_EXECUTE) {
// Read parameters count
uint64_t num_params;
if (!decodeInt(in, num_params))
throw std::runtime_error("Error decoding parameter count");
if (verbose == true) {
std::cout << "num_params: " << num_params << std::endl;
}
json jparams = json::array();
// Calculate null bitmap size
size_t bitmap_size = (num_params + 7) / 8;
std::vector<unsigned char> null_bitmap(bitmap_size);
if (!in.read(reinterpret_cast<char*>(null_bitmap.data()), bitmap_size))
throw std::runtime_error("Error reading null bitmap");
if (verbose == true) {
std::cout << "null_bitmap size: " << bitmap_size << std::endl;
}
for (uint16_t i = 0; i < num_params; i++) {
json jparam;
// Read parameter type (2 bytes)
uint16_t param_type;
if (!readValue(in, param_type))
throw std::runtime_error("Error reading parameter type");
jparam["type"] = param_type;
if (verbose == true) {
std::cout << "param[" << i << "] type: " << param_type << std::endl;
}
// Check if parameter is NULL using null bitmap.
bool isNull = false;
if (i < num_params) {
isNull = (null_bitmap[i / 8] & (1 << (i % 8))) != 0;
}
if (isNull) {
jparam["value"] = nullptr;
if (verbose == true) {
std::cout << "param[" << i << "] is NULL" << std::endl;
}
} else {
// Read encoded length of parameter value
uint64_t param_value_len;
if (!decodeInt(in, param_value_len))
throw std::runtime_error("Error decoding param value length");
if (verbose == true) {
std::cout << "param[" << i << "] value length: " << param_value_len << std::endl;
}
// Read raw parameter value bytes.
std::string param_value = readString(in, param_value_len);
jparam["value"] = param_value;
if (verbose == true) {
std::cout << "param[" << i << "] value: " << param_value << std::endl;
}
}
jparams.push_back(jparam);
}
j["parameters"] = jparams;
}
return j;
for (uint16_t i = 0; i < num_params; i++) {
json jparam;
// Read parameter type (2 bytes)
uint16_t param_type;
if (!readValue(in, param_type))
throw std::runtime_error("Error reading parameter type");
jparam["type"] = param_type;
if (verbose == true) {
std::cout << "param[" << i << "] type: " << param_type << std::endl;
}
// Check if parameter is NULL using null bitmap.
bool isNull = false;
if (i < num_params) {
isNull = (null_bitmap[i / 8] & (1 << (i % 8))) != 0;
}
if (isNull) {
jparam["value"] = nullptr;
if (verbose == true) {
std::cout << "param[" << i << "] is NULL" << std::endl;
}
} else {
// Read encoded length of parameter value
uint64_t param_value_len;
if (!decodeInt(in, param_value_len))
throw std::runtime_error("Error decoding param value length");
if (verbose == true) {
std::cout << "param[" << i << "] value length: " << param_value_len << std::endl;
}
// Read raw parameter value bytes.
std::string param_value = readString(in, param_value_len);
jparam["value"] = param_value;
if (verbose == true) {
std::cout << "param[" << i << "] value: " << param_value << std::endl;
}
}
jparams.push_back(jparam);
}
j["parameters"] = jparams;
}
return j;
}
int main(int argc, char* argv[]) {
if (argc < 2) {
std::cerr << "Usage: " << argv[0] << " <binary_log_file>" << std::endl;
return 1;
}
std::ifstream infile(argv[1], std::ios::binary);
if (!infile.is_open()) {
std::cerr << "Failed to open file: " << argv[1] << std::endl;
return 1;
}
if (argc < 2) {
std::cerr << "Usage: " << argv[0] << " <binary_log_file>" << std::endl;
return 1;
}
std::ifstream infile(argv[1], std::ios::binary);
if (!infile.is_open()) {
std::cerr << "Failed to open file: " << argv[1] << std::endl;
return 1;
}
//const bool verbose = true;
const bool verbose = false;
json jEvents = json::array();
// Read events until end-of-file.
while (infile.peek() != EOF) {
try {
json jEvent = parseEvent(infile, verbose);
jEvents.push_back(jEvent);
} catch (const std::exception &ex) {
// Break on error or end of file.
break;
}
}
std::cout << jEvents.dump(4) << std::endl;
return 0;
//const bool verbose = true;
const bool verbose = false;
json jEvents = json::array();
// Read events until end-of-file.
while (infile.peek() != EOF) {
try {
json jEvent = parseEvent(infile, verbose);
jEvents.push_back(jEvent);
} catch (const std::exception &ex) {
// Break on error or end of file.
break;
}
}
std::cout << jEvents.dump(4) << std::endl;
return 0;
}

Loading…
Cancel
Save