|
|
|
|
@ -3,6 +3,7 @@ using json = nlohmann::json;
|
|
|
|
|
#define PROXYJSON
|
|
|
|
|
|
|
|
|
|
#include <fstream>
|
|
|
|
|
#include <map>
|
|
|
|
|
#include "proxysql.h"
|
|
|
|
|
#include "cpp.h"
|
|
|
|
|
|
|
|
|
|
@ -24,6 +25,109 @@ using json = nlohmann::json;
|
|
|
|
|
|
|
|
|
|
extern PgSQL_Logger *GloPgSQL_Logger;
|
|
|
|
|
|
|
|
|
|
using metric_name = std::string;
|
|
|
|
|
using metric_help = std::string;
|
|
|
|
|
using metric_tags = std::map<std::string, std::string>;
|
|
|
|
|
|
|
|
|
|
using pl_counter_tuple =
|
|
|
|
|
std::tuple<
|
|
|
|
|
p_pl_counter::metric,
|
|
|
|
|
metric_name,
|
|
|
|
|
metric_help,
|
|
|
|
|
metric_tags
|
|
|
|
|
>;
|
|
|
|
|
|
|
|
|
|
using pl_gauge_tuple =
|
|
|
|
|
std::tuple<
|
|
|
|
|
p_pl_gauge::metric,
|
|
|
|
|
metric_name,
|
|
|
|
|
metric_help,
|
|
|
|
|
metric_tags
|
|
|
|
|
>;
|
|
|
|
|
|
|
|
|
|
using pl_counter_vector = std::vector<pl_counter_tuple>;
|
|
|
|
|
using pl_gauge_vector = std::vector<pl_gauge_tuple>;
|
|
|
|
|
|
|
|
|
|
const std::tuple<pl_counter_vector, pl_gauge_vector>
|
|
|
|
|
pl_metrics_map = std::make_tuple(
|
|
|
|
|
pl_counter_vector {
|
|
|
|
|
std::make_tuple(
|
|
|
|
|
p_pl_counter::memory_copy_count,
|
|
|
|
|
"proxysql_pgsql_logger_copy_total",
|
|
|
|
|
"Number of times events were copied to the in-memory/on-disk databases.",
|
|
|
|
|
metric_tags { { "target", "memory" } }
|
|
|
|
|
),
|
|
|
|
|
std::make_tuple(
|
|
|
|
|
p_pl_counter::disk_copy_count,
|
|
|
|
|
"proxysql_pgsql_logger_copy_total",
|
|
|
|
|
"Number of times events were copied to the in-memory/on-disk databases.",
|
|
|
|
|
metric_tags { { "target", "disk" } }
|
|
|
|
|
),
|
|
|
|
|
std::make_tuple(
|
|
|
|
|
p_pl_counter::get_all_events_calls_count,
|
|
|
|
|
"proxysql_pgsql_logger_get_all_events_calls_total",
|
|
|
|
|
"Number of times the 'get_all_events' method was called.",
|
|
|
|
|
metric_tags {}
|
|
|
|
|
),
|
|
|
|
|
std::make_tuple(
|
|
|
|
|
p_pl_counter::get_all_events_events_count,
|
|
|
|
|
"proxysql_pgsql_logger_get_all_events_events_total",
|
|
|
|
|
"Number of events retrieved by the `get_all_events` method.",
|
|
|
|
|
metric_tags {}
|
|
|
|
|
),
|
|
|
|
|
std::make_tuple(
|
|
|
|
|
p_pl_counter::total_memory_copy_time_us,
|
|
|
|
|
"proxysql_pgsql_logger_copy_seconds_total",
|
|
|
|
|
"Total time spent copying events to the in-memory/on-disk databases.",
|
|
|
|
|
metric_tags { { "target", "memory" } }
|
|
|
|
|
),
|
|
|
|
|
std::make_tuple(
|
|
|
|
|
p_pl_counter::total_disk_copy_time_us,
|
|
|
|
|
"proxysql_pgsql_logger_copy_seconds_total",
|
|
|
|
|
"Total time spent copying events to the in-memory/on-disk databases.",
|
|
|
|
|
metric_tags { { "target", "disk" } }
|
|
|
|
|
),
|
|
|
|
|
std::make_tuple(
|
|
|
|
|
p_pl_counter::total_get_all_events_time_us,
|
|
|
|
|
"proxysql_pgsql_logger_get_all_events_seconds_total",
|
|
|
|
|
"Total time spent in `get_all_events` method.",
|
|
|
|
|
metric_tags {}
|
|
|
|
|
),
|
|
|
|
|
std::make_tuple(
|
|
|
|
|
p_pl_counter::total_events_copied_to_memory,
|
|
|
|
|
"proxysql_pgsql_logger_events_copied_total",
|
|
|
|
|
"Total number of events copied to the in-memory/on-disk databases.",
|
|
|
|
|
metric_tags { { "target", "memory" } }
|
|
|
|
|
),
|
|
|
|
|
std::make_tuple(
|
|
|
|
|
p_pl_counter::total_events_copied_to_disk,
|
|
|
|
|
"proxysql_pgsql_logger_events_copied_total",
|
|
|
|
|
"Total number of events copied to the in-memory/on-disk databases.",
|
|
|
|
|
metric_tags { { "target", "disk" } }
|
|
|
|
|
),
|
|
|
|
|
std::make_tuple(
|
|
|
|
|
p_pl_counter::circular_buffer_events_added_count,
|
|
|
|
|
"proxysql_pgsql_logger_circular_buffer_events_total",
|
|
|
|
|
"The total number of events added/dropped to/from the circular buffer.",
|
|
|
|
|
metric_tags { { "type", "added" } }
|
|
|
|
|
),
|
|
|
|
|
std::make_tuple(
|
|
|
|
|
p_pl_counter::circular_buffer_events_dropped_count,
|
|
|
|
|
"proxysql_pgsql_logger_circular_buffer_events_total",
|
|
|
|
|
"The total number of events added/dropped to/from the circular buffer.",
|
|
|
|
|
metric_tags { { "type", "dropped" } }
|
|
|
|
|
),
|
|
|
|
|
},
|
|
|
|
|
pl_gauge_vector {
|
|
|
|
|
std::make_tuple(
|
|
|
|
|
p_pl_gauge::circular_buffer_events_size,
|
|
|
|
|
"proxysql_pgsql_logger_circular_buffer_events",
|
|
|
|
|
"Number of events currently present in the circular buffer.",
|
|
|
|
|
metric_tags {}
|
|
|
|
|
),
|
|
|
|
|
}
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
static uint8_t encode_length(uint64_t len, unsigned char *hd) {
|
|
|
|
|
if (len < 251) return 1;
|
|
|
|
|
if (len < 65536) { if (hd) { *hd=0xfc; }; return 3; }
|
|
|
|
|
@ -47,20 +151,105 @@ PgSQL_Event::PgSQL_Event (PGSQL_LOG_EVENT_TYPE _et, uint32_t _thread_id, char *
|
|
|
|
|
thread_id=_thread_id;
|
|
|
|
|
username=_username;
|
|
|
|
|
schemaname=_schemaname;
|
|
|
|
|
username_len=0;
|
|
|
|
|
schemaname_len=0;
|
|
|
|
|
client_stmt_name_len=0;
|
|
|
|
|
start_time=_start_time;
|
|
|
|
|
end_time=_end_time;
|
|
|
|
|
query_digest=_query_digest;
|
|
|
|
|
query_ptr = NULL;
|
|
|
|
|
query_len = 0;
|
|
|
|
|
client=_client;
|
|
|
|
|
client_len=_client_len;
|
|
|
|
|
et=_et;
|
|
|
|
|
hid=UINT64_MAX;
|
|
|
|
|
server=NULL;
|
|
|
|
|
server_len=0;
|
|
|
|
|
extra_info = NULL;
|
|
|
|
|
have_affected_rows=false;
|
|
|
|
|
affected_rows=0;
|
|
|
|
|
have_rows_sent=false;
|
|
|
|
|
rows_sent=0;
|
|
|
|
|
client_stmt_name=NULL;
|
|
|
|
|
sqlstate = NULL;
|
|
|
|
|
errmsg = NULL;
|
|
|
|
|
free_on_delete = false; // do not own pointers by default
|
|
|
|
|
memset(buf, 0, sizeof(buf));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
PgSQL_Event::PgSQL_Event(const PgSQL_Event& other) {
|
|
|
|
|
memcpy(this, &other, sizeof(PgSQL_Event));
|
|
|
|
|
|
|
|
|
|
if (other.username != nullptr) {
|
|
|
|
|
username = strdup(other.username);
|
|
|
|
|
}
|
|
|
|
|
if (other.schemaname != nullptr) {
|
|
|
|
|
schemaname = strdup(other.schemaname);
|
|
|
|
|
}
|
|
|
|
|
if (other.query_ptr != nullptr) {
|
|
|
|
|
size_t maxQueryLen = pgsql_thread___eventslog_buffer_max_query_length;
|
|
|
|
|
size_t lenToCopy = std::min(other.query_len, maxQueryLen);
|
|
|
|
|
query_ptr = (char*)malloc(lenToCopy + 1);
|
|
|
|
|
memcpy(query_ptr, other.query_ptr, lenToCopy);
|
|
|
|
|
query_ptr[lenToCopy] = '\0';
|
|
|
|
|
query_len = lenToCopy;
|
|
|
|
|
}
|
|
|
|
|
if (other.server != nullptr) {
|
|
|
|
|
server = (char*)malloc(server_len + 1);
|
|
|
|
|
memcpy(server, other.server, server_len);
|
|
|
|
|
server[server_len] = '\0';
|
|
|
|
|
}
|
|
|
|
|
if (other.client != nullptr) {
|
|
|
|
|
client = (char*)malloc(client_len + 1);
|
|
|
|
|
memcpy(client, other.client, client_len);
|
|
|
|
|
client[client_len] = '\0';
|
|
|
|
|
}
|
|
|
|
|
if (other.extra_info != nullptr) {
|
|
|
|
|
extra_info = strdup(other.extra_info);
|
|
|
|
|
}
|
|
|
|
|
if (other.client_stmt_name != nullptr) {
|
|
|
|
|
client_stmt_name = strdup(other.client_stmt_name);
|
|
|
|
|
}
|
|
|
|
|
if (other.sqlstate != nullptr) {
|
|
|
|
|
sqlstate = strdup(other.sqlstate);
|
|
|
|
|
}
|
|
|
|
|
if (other.errmsg != nullptr) {
|
|
|
|
|
errmsg = strdup(other.errmsg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
free_on_delete = true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
PgSQL_Event::~PgSQL_Event() {
|
|
|
|
|
if (free_on_delete == true) {
|
|
|
|
|
if (username != nullptr) {
|
|
|
|
|
free(username); username = nullptr;
|
|
|
|
|
}
|
|
|
|
|
if (schemaname != nullptr) {
|
|
|
|
|
free(schemaname); schemaname = nullptr;
|
|
|
|
|
}
|
|
|
|
|
if (query_ptr != nullptr) {
|
|
|
|
|
free(query_ptr); query_ptr = nullptr;
|
|
|
|
|
}
|
|
|
|
|
if (server != nullptr) {
|
|
|
|
|
free(server); server = nullptr;
|
|
|
|
|
}
|
|
|
|
|
if (client != nullptr) {
|
|
|
|
|
free(client); client = nullptr;
|
|
|
|
|
}
|
|
|
|
|
if (extra_info != nullptr) {
|
|
|
|
|
free(extra_info); extra_info = nullptr;
|
|
|
|
|
}
|
|
|
|
|
if (client_stmt_name != nullptr) {
|
|
|
|
|
free(client_stmt_name); client_stmt_name = nullptr;
|
|
|
|
|
}
|
|
|
|
|
if (sqlstate != nullptr) {
|
|
|
|
|
free(sqlstate); sqlstate = nullptr;
|
|
|
|
|
}
|
|
|
|
|
if (errmsg != nullptr) {
|
|
|
|
|
free(errmsg); errmsg = nullptr;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void PgSQL_Event::set_client_stmt_name(char* client_stmt_name) {
|
|
|
|
|
@ -98,6 +287,11 @@ void PgSQL_Event::set_server(int _hid, const char *ptr, int len) {
|
|
|
|
|
hid=_hid;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void PgSQL_Event::set_error(const char* _sqlstate, const char* _errmsg) {
|
|
|
|
|
sqlstate = const_cast<char*>(_sqlstate);
|
|
|
|
|
errmsg = const_cast<char*>(_errmsg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
uint64_t PgSQL_Event::write(std::fstream *f, PgSQL_Session *sess) {
|
|
|
|
|
uint64_t total_bytes=0;
|
|
|
|
|
switch (et) {
|
|
|
|
|
@ -406,6 +600,12 @@ uint64_t PgSQL_Event::write_query_format_2_json(std::fstream *f) {
|
|
|
|
|
if (have_rows_sent == true) {
|
|
|
|
|
j["rows_sent"] = rows_sent;
|
|
|
|
|
}
|
|
|
|
|
if (sqlstate != nullptr) {
|
|
|
|
|
j["sqlstate"] = sqlstate;
|
|
|
|
|
}
|
|
|
|
|
if (errmsg != nullptr) {
|
|
|
|
|
j["error"] = errmsg;
|
|
|
|
|
}
|
|
|
|
|
j["query"] = string(query_ptr, query_len);
|
|
|
|
|
j["starttime_timestamp_us"] = start_time;
|
|
|
|
|
{
|
|
|
|
|
@ -451,7 +651,7 @@ uint64_t PgSQL_Event::write_query_format_2_json(std::fstream *f) {
|
|
|
|
|
|
|
|
|
|
extern PgSQL_Query_Processor* GloPgQPro;
|
|
|
|
|
|
|
|
|
|
PgSQL_Logger::PgSQL_Logger() {
|
|
|
|
|
PgSQL_Logger::PgSQL_Logger() : metrics{0, 0, 0, 0, 0, 0, 0, 0, 0} {
|
|
|
|
|
events.enabled=false;
|
|
|
|
|
events.base_filename=NULL;
|
|
|
|
|
events.datadir=NULL;
|
|
|
|
|
@ -471,6 +671,14 @@ PgSQL_Logger::PgSQL_Logger() {
|
|
|
|
|
audit.logfile=NULL;
|
|
|
|
|
audit.log_file_id=0;
|
|
|
|
|
audit.max_log_file_size=100*1024*1024;
|
|
|
|
|
PgLogCB = new PgSQL_Logger_CircularBuffer(0);
|
|
|
|
|
|
|
|
|
|
init_prometheus_counter_array<pl_metrics_map_idx, p_pl_counter>(
|
|
|
|
|
pl_metrics_map, this->prom_metrics.p_counter_array
|
|
|
|
|
);
|
|
|
|
|
init_prometheus_gauge_array<pl_metrics_map_idx, p_pl_gauge>(
|
|
|
|
|
pl_metrics_map, this->prom_metrics.p_gauge_array
|
|
|
|
|
);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
PgSQL_Logger::~PgSQL_Logger() {
|
|
|
|
|
@ -482,6 +690,7 @@ PgSQL_Logger::~PgSQL_Logger() {
|
|
|
|
|
free(audit.datadir);
|
|
|
|
|
}
|
|
|
|
|
free(audit.base_filename);
|
|
|
|
|
delete PgLogCB;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
void PgSQL_Logger::wrlock() {
|
|
|
|
|
@ -658,8 +867,11 @@ void PgSQL_Logger::audit_set_datadir(char *s) {
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) {
|
|
|
|
|
if (events.enabled==false) return;
|
|
|
|
|
if (events.logfile==NULL) return;
|
|
|
|
|
int elmhs = pgsql_thread___eventslog_buffer_history_size;
|
|
|
|
|
if (elmhs == 0) {
|
|
|
|
|
if (events.enabled == false) return;
|
|
|
|
|
if (events.logfile == NULL) return;
|
|
|
|
|
}
|
|
|
|
|
// 'PgSQL_Session::client_myds' could be NULL in case of 'RequestEnd' being called over a freshly created session
|
|
|
|
|
// due to a failed 'CONNECTION_RESET'. Because this scenario isn't a client request, we just return.
|
|
|
|
|
if (sess->client_myds==NULL || sess->client_myds->myconn== NULL) return;
|
|
|
|
|
@ -758,6 +970,9 @@ void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) {
|
|
|
|
|
me.set_affected_rows(sess->CurrentQuery.affected_rows);
|
|
|
|
|
}
|
|
|
|
|
me.set_rows_sent(sess->CurrentQuery.rows_sent);
|
|
|
|
|
if (myds && myds->myconn && myds->myconn->is_error_present()) {
|
|
|
|
|
me.set_error(myds->myconn->get_error_code_str(), myds->myconn->get_error_message().c_str());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int sl=0;
|
|
|
|
|
char *sa=(char *)""; // default
|
|
|
|
|
@ -782,17 +997,20 @@ void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) {
|
|
|
|
|
// right before the write to disk
|
|
|
|
|
//wrlock();
|
|
|
|
|
|
|
|
|
|
//add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump
|
|
|
|
|
GloPgSQL_Logger->wrlock();
|
|
|
|
|
|
|
|
|
|
me.write(events.logfile, sess);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
unsigned long curpos=events.logfile->tellp();
|
|
|
|
|
if (curpos > events.max_log_file_size) {
|
|
|
|
|
events_flush_log_unlocked();
|
|
|
|
|
if ((events.enabled == true) && (events.logfile != nullptr)) {
|
|
|
|
|
//add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump
|
|
|
|
|
GloPgSQL_Logger->wrlock();
|
|
|
|
|
me.write(events.logfile, sess);
|
|
|
|
|
unsigned long curpos=events.logfile->tellp();
|
|
|
|
|
if (curpos > events.max_log_file_size) {
|
|
|
|
|
events_flush_log_unlocked();
|
|
|
|
|
}
|
|
|
|
|
wrunlock();
|
|
|
|
|
}
|
|
|
|
|
if (PgLogCB->buffer_size != 0) {
|
|
|
|
|
PgSQL_Event* me2 = new PgSQL_Event(me);
|
|
|
|
|
PgLogCB->insert(me2);
|
|
|
|
|
}
|
|
|
|
|
wrunlock();
|
|
|
|
|
|
|
|
|
|
if (cl && sess->client_myds->addr.port) {
|
|
|
|
|
free(ca);
|
|
|
|
|
@ -1057,3 +1275,220 @@ void PgSQL_Logger::print_version() {
|
|
|
|
|
fprintf(stderr,"Standard ProxySQL PgSQL Logger rev. %s -- %s -- %s\n", PROXYSQL_PGSQL_LOGGER_VERSION, __FILE__, __TIMESTAMP__);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
PgSQL_Logger_CircularBuffer::PgSQL_Logger_CircularBuffer(size_t size) :
|
|
|
|
|
event_buffer(),
|
|
|
|
|
eventsAddedCount(0), eventsDroppedCount(0),
|
|
|
|
|
buffer_size(size) {}
|
|
|
|
|
|
|
|
|
|
PgSQL_Logger_CircularBuffer::~PgSQL_Logger_CircularBuffer() {
|
|
|
|
|
std::lock_guard<std::mutex> lock(mutex);
|
|
|
|
|
for (PgSQL_Event* event : event_buffer) {
|
|
|
|
|
delete event;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void PgSQL_Logger_CircularBuffer::insert(PgSQL_Event* event) {
|
|
|
|
|
std::lock_guard<std::mutex> lock(mutex);
|
|
|
|
|
eventsAddedCount++;
|
|
|
|
|
if (buffer_size == 0) {
|
|
|
|
|
delete event;
|
|
|
|
|
eventsDroppedCount++;
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
while (event_buffer.size() >= buffer_size) {
|
|
|
|
|
delete event_buffer.front();
|
|
|
|
|
event_buffer.pop_front();
|
|
|
|
|
eventsDroppedCount++;
|
|
|
|
|
}
|
|
|
|
|
event_buffer.push_back(event);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void PgSQL_Logger_CircularBuffer::get_all_events(std::vector<PgSQL_Event*>& events) {
|
|
|
|
|
std::lock_guard<std::mutex> lock(mutex);
|
|
|
|
|
events.reserve(event_buffer.size());
|
|
|
|
|
events.insert(events.end(), event_buffer.begin(), event_buffer.end());
|
|
|
|
|
event_buffer.clear();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
size_t PgSQL_Logger_CircularBuffer::size() {
|
|
|
|
|
std::lock_guard<std::mutex> lock(mutex);
|
|
|
|
|
return event_buffer.size();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
size_t PgSQL_Logger_CircularBuffer::getBufferSize() const {
|
|
|
|
|
return buffer_size;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void PgSQL_Logger_CircularBuffer::setBufferSize(size_t newSize) {
|
|
|
|
|
std::lock_guard<std::mutex> lock(mutex);
|
|
|
|
|
buffer_size = newSize;
|
|
|
|
|
while (event_buffer.size() > buffer_size) {
|
|
|
|
|
delete event_buffer.front();
|
|
|
|
|
event_buffer.pop_front();
|
|
|
|
|
eventsDroppedCount++;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void PgSQL_Logger::insertPgSQLEventsIntoDb(SQLite3DB* db, const std::string& tableName, size_t numEvents, std::vector<PgSQL_Event*>::const_iterator begin) {
|
|
|
|
|
int rc = 0;
|
|
|
|
|
const int numcols = 17;
|
|
|
|
|
|
|
|
|
|
std::string coldefs = "(thread_id, username, database, start_time, end_time, query_digest, query, server, client, event_type, hid, extra_info, affected_rows, rows_sent, client_stmt_name, sqlstate, error)";
|
|
|
|
|
std::string query1s = "INSERT INTO " + tableName + coldefs + " VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)";
|
|
|
|
|
std::string query32s = "INSERT INTO " + tableName + coldefs + " VALUES " + generate_multi_rows_query(32, numcols);
|
|
|
|
|
|
|
|
|
|
auto [rc1, statement1_unique] = db->prepare_v2(query1s.c_str());
|
|
|
|
|
ASSERT_SQLITE_OK(rc1, db);
|
|
|
|
|
auto [rc2, statement32_unique] = db->prepare_v2(query32s.c_str());
|
|
|
|
|
ASSERT_SQLITE_OK(rc2, db);
|
|
|
|
|
sqlite3_stmt* statement1 = statement1_unique.get();
|
|
|
|
|
sqlite3_stmt* statement32 = statement32_unique.get();
|
|
|
|
|
|
|
|
|
|
auto bind_text_or_null = [&](sqlite3_stmt* stmt, int idx, const char* v) {
|
|
|
|
|
if (v) {
|
|
|
|
|
rc = (*proxy_sqlite3_bind_text)(stmt, idx, v, -1, SQLITE_TRANSIENT);
|
|
|
|
|
} else {
|
|
|
|
|
rc = (*proxy_sqlite3_bind_null)(stmt, idx);
|
|
|
|
|
}
|
|
|
|
|
ASSERT_SQLITE_OK(rc, db);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
char digest_hex_str[20];
|
|
|
|
|
db->execute("BEGIN");
|
|
|
|
|
|
|
|
|
|
int row_idx = 0;
|
|
|
|
|
int max_bulk_row_idx = (numEvents / 32) * 32;
|
|
|
|
|
for (std::vector<PgSQL_Event*>::const_iterator it = begin; it != begin + numEvents; ++it) {
|
|
|
|
|
PgSQL_Event* event = *it;
|
|
|
|
|
int idx = row_idx % 32;
|
|
|
|
|
sqlite3_stmt* stmt = (row_idx < max_bulk_row_idx ? statement32 : statement1);
|
|
|
|
|
int base = (row_idx < max_bulk_row_idx ? idx * numcols : 0);
|
|
|
|
|
|
|
|
|
|
rc = (*proxy_sqlite3_bind_int)(stmt, base + 1, event->thread_id); ASSERT_SQLITE_OK(rc, db);
|
|
|
|
|
bind_text_or_null(stmt, base + 2, event->username);
|
|
|
|
|
bind_text_or_null(stmt, base + 3, event->schemaname);
|
|
|
|
|
rc = (*proxy_sqlite3_bind_int64)(stmt, base + 4, event->start_time); ASSERT_SQLITE_OK(rc, db);
|
|
|
|
|
rc = (*proxy_sqlite3_bind_int64)(stmt, base + 5, event->end_time); ASSERT_SQLITE_OK(rc, db);
|
|
|
|
|
sprintf(digest_hex_str, "0x%016llX", (long long unsigned int)event->query_digest);
|
|
|
|
|
bind_text_or_null(stmt, base + 6, digest_hex_str);
|
|
|
|
|
bind_text_or_null(stmt, base + 7, event->query_ptr);
|
|
|
|
|
bind_text_or_null(stmt, base + 8, event->server);
|
|
|
|
|
bind_text_or_null(stmt, base + 9, event->client);
|
|
|
|
|
rc = (*proxy_sqlite3_bind_int)(stmt, base + 10, (int)event->et); ASSERT_SQLITE_OK(rc, db);
|
|
|
|
|
rc = (*proxy_sqlite3_bind_int64)(stmt, base + 11, event->hid); ASSERT_SQLITE_OK(rc, db);
|
|
|
|
|
bind_text_or_null(stmt, base + 12, event->extra_info);
|
|
|
|
|
rc = (*proxy_sqlite3_bind_int64)(stmt, base + 13, event->affected_rows); ASSERT_SQLITE_OK(rc, db);
|
|
|
|
|
rc = (*proxy_sqlite3_bind_int64)(stmt, base + 14, event->rows_sent); ASSERT_SQLITE_OK(rc, db);
|
|
|
|
|
bind_text_or_null(stmt, base + 15, event->client_stmt_name);
|
|
|
|
|
bind_text_or_null(stmt, base + 16, event->sqlstate);
|
|
|
|
|
bind_text_or_null(stmt, base + 17, event->errmsg);
|
|
|
|
|
|
|
|
|
|
if (row_idx < max_bulk_row_idx) {
|
|
|
|
|
if (idx == 31) {
|
|
|
|
|
SAFE_SQLITE3_STEP2(statement32);
|
|
|
|
|
rc = (*proxy_sqlite3_clear_bindings)(statement32); ASSERT_SQLITE_OK(rc, db);
|
|
|
|
|
rc = (*proxy_sqlite3_reset)(statement32); ASSERT_SQLITE_OK(rc, db);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
SAFE_SQLITE3_STEP2(statement1);
|
|
|
|
|
rc = (*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, db);
|
|
|
|
|
rc = (*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, db);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
row_idx++;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
db->execute("COMMIT");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int PgSQL_Logger::processEvents(SQLite3DB* statsdb, SQLite3DB* statsdb_disk) {
|
|
|
|
|
unsigned long long startTimeMicros = monotonic_time();
|
|
|
|
|
std::vector<PgSQL_Event*> events = {};
|
|
|
|
|
PgLogCB->get_all_events(events);
|
|
|
|
|
metrics.getAllEventsCallsCount++;
|
|
|
|
|
if (events.empty()) return 0;
|
|
|
|
|
|
|
|
|
|
unsigned long long afterGetAllEventsTimeMicros = monotonic_time();
|
|
|
|
|
metrics.getAllEventsEventsCount += events.size();
|
|
|
|
|
metrics.totalGetAllEventsTimeMicros += (afterGetAllEventsTimeMicros - startTimeMicros);
|
|
|
|
|
|
|
|
|
|
if (statsdb_disk != nullptr) {
|
|
|
|
|
unsigned long long diskStartTimeMicros = monotonic_time();
|
|
|
|
|
insertPgSQLEventsIntoDb(statsdb_disk, "history_pgsql_query_events", events.size(), events.begin());
|
|
|
|
|
unsigned long long diskEndTimeMicros = monotonic_time();
|
|
|
|
|
metrics.diskCopyCount++;
|
|
|
|
|
metrics.totalDiskCopyTimeMicros += (diskEndTimeMicros - diskStartTimeMicros);
|
|
|
|
|
metrics.totalEventsCopiedToDisk += events.size();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (statsdb != nullptr) {
|
|
|
|
|
unsigned long long memoryStartTimeMicros = monotonic_time();
|
|
|
|
|
size_t maxInMemorySize = pgsql_thread___eventslog_table_memory_size;
|
|
|
|
|
size_t numEventsToInsert = std::min(events.size(), maxInMemorySize);
|
|
|
|
|
|
|
|
|
|
if (events.size() >= maxInMemorySize) {
|
|
|
|
|
statsdb->execute("DELETE FROM stats_pgsql_query_events");
|
|
|
|
|
} else {
|
|
|
|
|
int current_rows = statsdb->return_one_int((char*)"SELECT COUNT(*) FROM stats_pgsql_query_events");
|
|
|
|
|
int rows_to_keep = maxInMemorySize - events.size();
|
|
|
|
|
if (current_rows > rows_to_keep) {
|
|
|
|
|
int rows_to_delete = (current_rows - rows_to_keep);
|
|
|
|
|
std::string query = "SELECT MAX(id) FROM (SELECT id FROM stats_pgsql_query_events ORDER BY id LIMIT " + std::to_string(rows_to_delete) + ")";
|
|
|
|
|
int maxIdToDelete = statsdb->return_one_int(query.c_str());
|
|
|
|
|
std::string delete_stmt = "DELETE FROM stats_pgsql_query_events WHERE id <= " + std::to_string(maxIdToDelete);
|
|
|
|
|
statsdb->execute(delete_stmt.c_str());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
insertPgSQLEventsIntoDb(statsdb, "stats_pgsql_query_events", numEventsToInsert, events.begin());
|
|
|
|
|
unsigned long long memoryEndTimeMicros = monotonic_time();
|
|
|
|
|
metrics.memoryCopyCount++;
|
|
|
|
|
metrics.totalMemoryCopyTimeMicros += (memoryEndTimeMicros - memoryStartTimeMicros);
|
|
|
|
|
metrics.totalEventsCopiedToMemory += numEventsToInsert;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (PgSQL_Event* event : events) {
|
|
|
|
|
delete event;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return events.size();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
std::unordered_map<std::string, unsigned long long> PgSQL_Logger::getAllMetrics() const {
|
|
|
|
|
std::unordered_map<std::string, unsigned long long> allMetrics;
|
|
|
|
|
|
|
|
|
|
allMetrics["memoryCopyCount"] = metrics.memoryCopyCount;
|
|
|
|
|
allMetrics["diskCopyCount"] = metrics.diskCopyCount;
|
|
|
|
|
allMetrics["getAllEventsCallsCount"] = metrics.getAllEventsCallsCount;
|
|
|
|
|
allMetrics["getAllEventsEventsCount"] = metrics.getAllEventsEventsCount;
|
|
|
|
|
allMetrics["totalMemoryCopyTimeMicros"] = metrics.totalMemoryCopyTimeMicros;
|
|
|
|
|
allMetrics["totalDiskCopyTimeMicros"] = metrics.totalDiskCopyTimeMicros;
|
|
|
|
|
allMetrics["totalGetAllEventsTimeMicros"] = metrics.totalGetAllEventsTimeMicros;
|
|
|
|
|
allMetrics["totalEventsCopiedToMemory"] = metrics.totalEventsCopiedToMemory;
|
|
|
|
|
allMetrics["totalEventsCopiedToDisk"] = metrics.totalEventsCopiedToDisk;
|
|
|
|
|
allMetrics["circularBufferEventsAddedCount"] = PgLogCB->getEventsAddedCount();
|
|
|
|
|
allMetrics["circularBufferEventsDroppedCount"] = PgLogCB->getEventsDroppedCount();
|
|
|
|
|
allMetrics["circularBufferEventsSize"] = PgLogCB->size();
|
|
|
|
|
|
|
|
|
|
return allMetrics;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void PgSQL_Logger::p_update_metrics() {
|
|
|
|
|
using pl_c = p_pl_counter;
|
|
|
|
|
const auto& counters { this->prom_metrics.p_counter_array };
|
|
|
|
|
|
|
|
|
|
p_update_counter(counters[pl_c::memory_copy_count], metrics.memoryCopyCount);
|
|
|
|
|
p_update_counter(counters[pl_c::disk_copy_count], metrics.diskCopyCount);
|
|
|
|
|
p_update_counter(counters[pl_c::get_all_events_calls_count], metrics.getAllEventsCallsCount);
|
|
|
|
|
p_update_counter(counters[pl_c::get_all_events_events_count], metrics.getAllEventsEventsCount);
|
|
|
|
|
p_update_counter(counters[pl_c::total_memory_copy_time_us], metrics.totalMemoryCopyTimeMicros / (1000.0 * 1000));
|
|
|
|
|
p_update_counter(counters[pl_c::total_disk_copy_time_us], metrics.totalDiskCopyTimeMicros / (1000.0 * 1000));
|
|
|
|
|
p_update_counter(counters[pl_c::total_get_all_events_time_us], metrics.totalGetAllEventsTimeMicros / (1000.0 * 1000));
|
|
|
|
|
p_update_counter(counters[pl_c::total_events_copied_to_memory], metrics.totalEventsCopiedToMemory);
|
|
|
|
|
p_update_counter(counters[pl_c::total_events_copied_to_disk], metrics.totalEventsCopiedToDisk);
|
|
|
|
|
p_update_counter(counters[pl_c::circular_buffer_events_added_count], PgLogCB->getEventsAddedCount());
|
|
|
|
|
p_update_counter(counters[pl_c::circular_buffer_events_dropped_count], PgLogCB->getEventsDroppedCount());
|
|
|
|
|
|
|
|
|
|
using pl_g = p_pl_gauge;
|
|
|
|
|
const auto& gauges { this->prom_metrics.p_gauge_array };
|
|
|
|
|
gauges[pl_g::circular_buffer_events_size]->Set(PgLogCB->size());
|
|
|
|
|
}
|
|
|
|
|
|