Implementation of processEvents() and DUMP EVENTSLOG commands

MySQL_Logger::processEvents() is responsible for dumping mysql query events
from the circular buffer into SQLite tables.
DUMP EVENTSLOG commands trigger a call to processEvents():
* DUMP EVENTSLOG FROM BUFFER TO MEMORY: copies events to the in-memory SQLite table
* DUMP EVENTSLOG FROM BUFFER TO DISK: copies events to the on-disk SQLite table
* DUMP EVENTSLOG FROM BUFFER TO BOTH: copies events to both in-memory and on-disk
       SQLite tables simultaneously
v2.x-logging_mem
René Cannaò 2 years ago
parent d7a9254bbd
commit 23e3244cc5

@ -146,6 +146,7 @@ class MySQL_Logger {
unsigned int events_find_next_id();
unsigned int audit_find_next_id();
public:
int eventslog_table_memory_size;
MySQL_Logger();
~MySQL_Logger();
void print_version();
@ -162,7 +163,8 @@ class MySQL_Logger {
void wrlock();
void wrunlock();
MySQL_Logger_CircularBuffer * MyLogCB;
void insertMysqlEventsIntoDb(SQLite3DB * db, const std::vector<MySQL_Event*>& events);
void insertMysqlEventsIntoDb(SQLite3DB * db, const std::string& tableName, size_t numEvents, std::vector<MySQL_Event*>::const_iterator begin);
int processEvents(SQLite3DB * statsdb , SQLite3DB * statsdb_disk);
};

@ -1180,16 +1180,18 @@ void MySQL_Logger_CircularBuffer::setBufferSize(size_t newSize) {
}
void MySQL_Logger::insertMysqlEventsIntoDb(SQLite3DB * db, const std::vector<MySQL_Event*>& events){
void MySQL_Logger::insertMysqlEventsIntoDb(SQLite3DB * db, const std::string& tableName, size_t numEvents, std::vector<MySQL_Event*>::const_iterator begin){
int rc = 0;
sqlite3_stmt *statement1=NULL;
sqlite3_stmt *statement32=NULL;
char *query1=NULL;
char *query32=NULL;
std::string query32s = "";
const int numcols = 17;
query1=(char *)"INSERT INTO stats_mysql_query_events VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)";
query32s = "INSERT INTO stats_mysql_query_events VALUES " + generate_multi_rows_query(32, numcols);
std::string query1s = "";
std::string query32s = "";
query1s = "INSERT INTO " + tableName + " VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)";
query32s = "INSERT INTO " + tableName + " VALUES " + generate_multi_rows_query(32, numcols);
query1 = (char *)query1s.c_str();
query32 = (char *)query32s.c_str();
rc = db->prepare_v2(query1, &statement1);
ASSERT_SQLITE_OK(rc, db);
@ -1201,9 +1203,9 @@ void MySQL_Logger::insertMysqlEventsIntoDb(SQLite3DB * db, const std::vector<MyS
db->execute("BEGIN");
int row_idx=0;
int max_bulk_row_idx=events.size()/32;
int max_bulk_row_idx=numEvents/32;
max_bulk_row_idx=max_bulk_row_idx*32;
for (std::vector<MySQL_Event *>::const_iterator it = events.begin() ; it != events.end(); ++it) {
for (std::vector<MySQL_Event *>::const_iterator it = begin ; it != begin + numEvents; ++it) {
MySQL_Event *event = *it;
int idx=row_idx%32;
if (row_idx<max_bulk_row_idx) { // bulk
@ -1261,3 +1263,47 @@ void MySQL_Logger::insertMysqlEventsIntoDb(SQLite3DB * db, const std::vector<MyS
(*proxy_sqlite3_finalize)(statement32);
db->execute("COMMIT");
}
int MySQL_Logger::processEvents(SQLite3DB * statsdb , SQLite3DB * statsdb_disk) {
std::vector<MySQL_Event*> events = {};
MyLogCB->get_all_events(events);
if (events.empty()) return 0;
if (statsdb_disk != nullptr) {
// Write to on-disk database first
insertMysqlEventsIntoDb(statsdb_disk, "history_mysql_query_events", events.size(), events.begin());
}
if (statsdb != nullptr) {
size_t maxInMemorySize = eventslog_table_memory_size;
size_t numEventsToInsert = std::min(events.size(), maxInMemorySize);
if (events.size() >= maxInMemorySize) {
// delete everything from stats_mysql_query_events
statsdb->execute("DELETE FROM stats_mysql_query_events");
} else {
// make enough room in stats_mysql_query_events
int current_rows = statsdb->return_one_int((char *)"SELECT COUNT(*) FROM stats_mysql_query_events");
int rows_to_keep = maxInMemorySize - events.size();
if (current_rows > rows_to_keep) {
int rows_to_delete = (current_rows - rows_to_keep);
string delete_stmt = "DELETE FROM stats_mysql_query_events ORDER BY id LIMIT " + to_string(rows_to_delete);
statsdb->execute(delete_stmt.c_str());
}
}
// Pass iterators to avoid copying
insertMysqlEventsIntoDb(statsdb, "stats_mysql_query_events", numEventsToInsert, events.begin());
}
// cleanup of all events
for (MySQL_Event* event : events) {
delete event;
}
size_t ret = events.size();
return ret;
}

@ -3784,6 +3784,32 @@ void admin_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *pkt) {
}
}
if (!strncasecmp("DUMP EVENTSLOG ", query_no_space, strlen("DUMP EVENTSLOG "))) {
int num_rows = 0;
proxy_debug(PROXY_DEBUG_ADMIN, 4, "Received command DUMP EVENTSLOG: %s\n", query_no_space);
proxy_info("Received command DUMP EVENTSLOG: %s\n", query_no_space);
// Use a map for better efficiency and readability
std::map<std::string, std::pair<SQLite3DB*, SQLite3DB*>> commandMap = {
{"DUMP EVENTSLOG FROM BUFFER TO MEMORY", {SPA->statsdb, nullptr}},
{"DUMP EVENTSLOG FROM BUFFER TO DISK", {nullptr, SPA->statsdb_disk}},
{"DUMP EVENTSLOG FROM BUFFER TO BOTH", {SPA->statsdb, SPA->statsdb_disk}}
};
auto it = commandMap.find(query_no_space);
if (it != commandMap.end()) {
num_rows = GloMyLogger->processEvents(it->second.first, it->second.second);
SPA->send_MySQL_OK(&sess->client_myds->myprot, NULL, num_rows);
} else {
proxy_warning("Received invalid command DUMP EVENTSLOG: %s\n", query_no_space);
SPA->send_MySQL_ERR(&sess->client_myds->myprot, (char *)"Invalid DUMP EVENTSLOG command");
}
run_query = false;
goto __run_query;
}
// handle special queries from Cluster
// for bug #1188 , ProxySQL Admin needs to know the exact query

Loading…
Cancel
Save