First implementation of insertMysqlEventsIntoDb()

MySQL_Logger::insertMysqlEventsIntoDb() populate table `stats_mysql_query_events`
using mysql query events in the circular buffer
v2.x-logging_mem
René Cannaò 2 years ago
parent 754f1e52f2
commit efb95ca87e

@ -6,6 +6,8 @@
#define PROXYSQL_LOGGER_PTHREAD_MUTEX
class MySQL_Logger;
class MySQL_Event {
private:
uint32_t thread_id;
@ -51,6 +53,7 @@ class MySQL_Event {
void set_affected_rows(uint64_t ar, uint64_t lid);
void set_rows_sent(uint64_t rs);
void set_gtid(MySQL_Session *sess);
friend MySQL_Logger;
};
/**
@ -159,6 +162,7 @@ class MySQL_Logger {
void wrlock();
void wrunlock();
MySQL_Logger_CircularBuffer * MyLogCB;
void insertMysqlEventsIntoDb(SQLite3DB * db, const std::vector<MySQL_Event*>& events);
};

@ -1145,7 +1145,7 @@ void MySQL_Logger::print_version() {
fprintf(stderr,"Standard ProxySQL MySQL Logger rev. %s -- %s -- %s\n", PROXYSQL_MYSQL_LOGGER_VERSION, __FILE__, __TIMESTAMP__);
};
MySQL_Logger_CircularBuffer::MySQL_Logger_CircularBuffer(size_t size) : buffer_size(size), event_buffer(size) {}
MySQL_Logger_CircularBuffer::MySQL_Logger_CircularBuffer(size_t size) : event_buffer(size), buffer_size(size) {}
MySQL_Logger_CircularBuffer::~MySQL_Logger_CircularBuffer() {
std::lock_guard<std::mutex> lock(mutex);
@ -1178,3 +1178,86 @@ void MySQL_Logger_CircularBuffer::setBufferSize(size_t newSize) {
std::lock_guard<std::mutex> lock(mutex);
buffer_size = newSize;
}
void MySQL_Logger::insertMysqlEventsIntoDb(SQLite3DB * db, const std::vector<MySQL_Event*>& events){
int rc = 0;
sqlite3_stmt *statement1=NULL;
sqlite3_stmt *statement32=NULL;
char *query1=NULL;
char *query32=NULL;
std::string query32s = "";
const int numcols = 17;
query1=(char *)"INSERT INTO stats_mysql_query_events VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)";
query32s = "INSERT INTO stats_mysql_query_events VALUES " + generate_multi_rows_query(32, numcols);
query32 = (char *)query32s.c_str();
rc = db->prepare_v2(query1, &statement1);
ASSERT_SQLITE_OK(rc, db);
rc = db->prepare_v2(query32, &statement32);
ASSERT_SQLITE_OK(rc, db);
char digest_hex_str[20]; // 2+sizeof(unsigned long long)*2+2
db->execute("BEGIN");
int row_idx=0;
int max_bulk_row_idx=events.size()/32;
max_bulk_row_idx=max_bulk_row_idx*32;
for (std::vector<MySQL_Event *>::const_iterator it = events.begin() ; it != events.end(); ++it) {
MySQL_Event *event = *it;
int idx=row_idx%32;
if (row_idx<max_bulk_row_idx) { // bulk
//Bind parameters. Handle potential errors in binding.
rc = (*proxy_sqlite3_bind_int)(statement32, (idx*numcols)+1, event->thread_id); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+2, event->username, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+3, event->schemaname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx*numcols)+4, event->start_time); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx*numcols)+5, event->end_time); ASSERT_SQLITE_OK(rc, db);
sprintf(digest_hex_str, "0x%016llX", (long long unsigned int)event->query_digest);
rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+6, digest_hex_str, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+7, event->query_ptr, event->query_len, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+8, event->server, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+9, event->client, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_int)(statement32, (idx*numcols)+10, (int)event->et); ASSERT_SQLITE_OK(rc, db); // Assuming event_type is an enum mapped to integers
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx*numcols)+11, event->hid); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+12, event->extra_info, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx*numcols)+13, event->affected_rows); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx*numcols)+14, event->last_insert_id); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx*numcols)+15, event->rows_sent); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_int)(statement32, (idx*numcols)+16, event->client_stmt_id); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+17, event->gtid, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db);
if (idx==31) {
SAFE_SQLITE3_STEP2(statement32);
rc=(*proxy_sqlite3_clear_bindings)(statement32); ASSERT_SQLITE_OK(rc, db);
rc=(*proxy_sqlite3_reset)(statement32); ASSERT_SQLITE_OK(rc, db);
}
} else { // single row
//Bind parameters. Handle potential errors in binding.
rc = (*proxy_sqlite3_bind_int)(statement1, 1, event->thread_id); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_text)(statement1, 2, event->username, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_text)(statement1, 3, event->schemaname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_int64)(statement1, 4, event->start_time); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_int64)(statement1, 5, event->end_time); ASSERT_SQLITE_OK(rc, db);
sprintf(digest_hex_str, "0x%016llX", (long long unsigned int)event->query_digest);
rc = (*proxy_sqlite3_bind_text)(statement1, 6, digest_hex_str, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_text)(statement1, 7, event->query_ptr, event->query_len, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_text)(statement1, 8, event->server, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_text)(statement1, 9, event->client, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_int)(statement1, 10, (int)event->et); ASSERT_SQLITE_OK(rc, db); // Assuming event_type is an enum mapped to integers
rc = (*proxy_sqlite3_bind_int64)(statement1, 11, event->hid); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_text)(statement1, 12, event->extra_info, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_int64)(statement1, 13, event->affected_rows); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_int64)(statement1, 14, event->last_insert_id); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_int64)(statement1, 15, event->rows_sent); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_int)(statement1, 16, event->client_stmt_id); ASSERT_SQLITE_OK(rc, db);
rc = (*proxy_sqlite3_bind_text)(statement1, 17, event->gtid, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db);
SAFE_SQLITE3_STEP2(statement1);
rc=(*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, db);
rc=(*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, db);
}
row_idx++;
}
(*proxy_sqlite3_finalize)(statement1);
(*proxy_sqlite3_finalize)(statement32);
db->execute("COMMIT");
}

Loading…
Cancel
Save