diff --git a/include/MySQL_Logger.hpp b/include/MySQL_Logger.hpp index 5c27b5b21..f9aae93b5 100644 --- a/include/MySQL_Logger.hpp +++ b/include/MySQL_Logger.hpp @@ -6,6 +6,8 @@ #define PROXYSQL_LOGGER_PTHREAD_MUTEX +class MySQL_Logger; + class MySQL_Event { private: uint32_t thread_id; @@ -51,6 +53,7 @@ class MySQL_Event { void set_affected_rows(uint64_t ar, uint64_t lid); void set_rows_sent(uint64_t rs); void set_gtid(MySQL_Session *sess); + friend MySQL_Logger; }; /** @@ -159,6 +162,7 @@ class MySQL_Logger { void wrlock(); void wrunlock(); MySQL_Logger_CircularBuffer * MyLogCB; + void insertMysqlEventsIntoDb(SQLite3DB * db, const std::vector& events); }; diff --git a/lib/MySQL_Logger.cpp b/lib/MySQL_Logger.cpp index f8a999df6..e04564f98 100644 --- a/lib/MySQL_Logger.cpp +++ b/lib/MySQL_Logger.cpp @@ -1145,7 +1145,7 @@ void MySQL_Logger::print_version() { fprintf(stderr,"Standard ProxySQL MySQL Logger rev. %s -- %s -- %s\n", PROXYSQL_MYSQL_LOGGER_VERSION, __FILE__, __TIMESTAMP__); }; -MySQL_Logger_CircularBuffer::MySQL_Logger_CircularBuffer(size_t size) : buffer_size(size), event_buffer(size) {} +MySQL_Logger_CircularBuffer::MySQL_Logger_CircularBuffer(size_t size) : event_buffer(size), buffer_size(size) {} MySQL_Logger_CircularBuffer::~MySQL_Logger_CircularBuffer() { std::lock_guard lock(mutex); @@ -1178,3 +1178,86 @@ void MySQL_Logger_CircularBuffer::setBufferSize(size_t newSize) { std::lock_guard lock(mutex); buffer_size = newSize; } + + +void MySQL_Logger::insertMysqlEventsIntoDb(SQLite3DB * db, const std::vector& events){ + int rc = 0; + sqlite3_stmt *statement1=NULL; + sqlite3_stmt *statement32=NULL; + char *query1=NULL; + char *query32=NULL; + std::string query32s = ""; + const int numcols = 17; + query1=(char *)"INSERT INTO stats_mysql_query_events VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)"; + query32s = "INSERT INTO stats_mysql_query_events VALUES " + generate_multi_rows_query(32, numcols); + query32 = (char *)query32s.c_str(); + rc = db->prepare_v2(query1, &statement1); + ASSERT_SQLITE_OK(rc, db); + rc = db->prepare_v2(query32, &statement32); + ASSERT_SQLITE_OK(rc, db); + + char digest_hex_str[20]; // 2+sizeof(unsigned long long)*2+2 + + db->execute("BEGIN"); + + int row_idx=0; + int max_bulk_row_idx=events.size()/32; + max_bulk_row_idx=max_bulk_row_idx*32; + for (std::vector::const_iterator it = events.begin() ; it != events.end(); ++it) { + MySQL_Event *event = *it; + int idx=row_idx%32; + if (row_idxthread_id); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+2, event->username, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+3, event->schemaname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement32, (idx*numcols)+4, event->start_time); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement32, (idx*numcols)+5, event->end_time); ASSERT_SQLITE_OK(rc, db); + sprintf(digest_hex_str, "0x%016llX", (long long unsigned int)event->query_digest); + rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+6, digest_hex_str, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+7, event->query_ptr, event->query_len, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+8, event->server, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+9, event->client, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int)(statement32, (idx*numcols)+10, (int)event->et); ASSERT_SQLITE_OK(rc, db); // Assuming event_type is an enum mapped to integers + rc = (*proxy_sqlite3_bind_int64)(statement32, (idx*numcols)+11, event->hid); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+12, event->extra_info, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement32, (idx*numcols)+13, event->affected_rows); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement32, (idx*numcols)+14, event->last_insert_id); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement32, (idx*numcols)+15, event->rows_sent); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int)(statement32, (idx*numcols)+16, event->client_stmt_id); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement32, (idx*numcols)+17, event->gtid, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + if (idx==31) { + SAFE_SQLITE3_STEP2(statement32); + rc=(*proxy_sqlite3_clear_bindings)(statement32); ASSERT_SQLITE_OK(rc, db); + rc=(*proxy_sqlite3_reset)(statement32); ASSERT_SQLITE_OK(rc, db); + } + } else { // single row + //Bind parameters. Handle potential errors in binding. + rc = (*proxy_sqlite3_bind_int)(statement1, 1, event->thread_id); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 2, event->username, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 3, event->schemaname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 4, event->start_time); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 5, event->end_time); ASSERT_SQLITE_OK(rc, db); + sprintf(digest_hex_str, "0x%016llX", (long long unsigned int)event->query_digest); + rc = (*proxy_sqlite3_bind_text)(statement1, 6, digest_hex_str, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 7, event->query_ptr, event->query_len, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 8, event->server, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 9, event->client, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int)(statement1, 10, (int)event->et); ASSERT_SQLITE_OK(rc, db); // Assuming event_type is an enum mapped to integers + rc = (*proxy_sqlite3_bind_int64)(statement1, 11, event->hid); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 12, event->extra_info, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 13, event->affected_rows); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 14, event->last_insert_id); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 15, event->rows_sent); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int)(statement1, 16, event->client_stmt_id); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 17, event->gtid, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + SAFE_SQLITE3_STEP2(statement1); + rc=(*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, db); + rc=(*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, db); + } + row_idx++; + } + (*proxy_sqlite3_finalize)(statement1); + (*proxy_sqlite3_finalize)(statement32); + db->execute("COMMIT"); +}