Added query logging support for PostgreSQL

v3.0_postgres_query_logging_issue_5123
Rahim Kanji 5 months ago
parent 62b8103bae
commit a89291c9a9

@ -5,6 +5,26 @@
#define PROXYSQL_LOGGER_PTHREAD_MUTEX
enum class PGSQL_LOG_EVENT_TYPE {
SIMPLE_QUERY,
AUTH_OK,
AUTH_ERR,
AUTH_CLOSE,
AUTH_QUIT,
INITDB,
ADMIN_AUTH_OK,
ADMIN_AUTH_ERR,
ADMIN_AUTH_CLOSE,
ADMIN_AUTH_QUIT,
SQLITE_AUTH_OK,
SQLITE_AUTH_ERR,
SQLITE_AUTH_CLOSE,
SQLITE_AUTH_QUIT,
STMT_EXECUTE,
STMT_DESCRIBE,
STMT_PREPARE
};
class PgSQL_Event {
private:
uint32_t thread_id;
@ -24,7 +44,7 @@ class PgSQL_Event {
size_t client_len;
//uint64_t total_length;
unsigned char buf[10];
enum log_event_type et;
PGSQL_LOG_EVENT_TYPE et;
uint64_t hid;
char *extra_info;
char *client_stmt_name;
@ -35,7 +55,7 @@ class PgSQL_Event {
uint64_t rows_sent;
public:
PgSQL_Event(log_event_type _et, uint32_t _thread_id, char * _username, char * _schemaname , uint64_t _start_time , uint64_t _end_time , uint64_t _query_digest, char *_client, size_t _client_len);
PgSQL_Event(PGSQL_LOG_EVENT_TYPE _et, uint32_t _thread_id, char * _username, char * _schemaname , uint64_t _start_time , uint64_t _end_time , uint64_t _query_digest, char *_client, size_t _client_len);
uint64_t write(std::fstream *f, PgSQL_Session *sess);
uint64_t write_query_format_1(std::fstream *f);
uint64_t write_query_format_2_json(std::fstream *f);
@ -89,7 +109,7 @@ class PgSQL_Logger {
void audit_set_datadir(char *);
void audit_set_base_filename();
void log_request(PgSQL_Session *, PgSQL_Data_Stream *);
void log_audit_entry(log_event_type, PgSQL_Session *, PgSQL_Data_Stream *, char *e = NULL);
void log_audit_entry(PGSQL_LOG_EVENT_TYPE, PgSQL_Session *, PgSQL_Data_Stream *, char *e = NULL);
void flush();
void wrlock();
void wrunlock();

@ -24,7 +24,7 @@ using json = nlohmann::json;
extern PgSQL_Logger *GloPgSQL_Logger;
static uint8_t mysql_encode_length(uint64_t len, unsigned char *hd) {
static uint8_t encode_length(uint64_t len, unsigned char *hd) {
if (len < 251) return 1;
if (len < 65536) { if (hd) { *hd=0xfc; }; return 3; }
if (len < 16777216) { if (hd) { *hd=0xfd; }; return 4; }
@ -43,7 +43,7 @@ static inline int write_encoded_length(unsigned char *p, uint64_t val, uint8_t l
return len;
}
PgSQL_Event::PgSQL_Event (log_event_type _et, uint32_t _thread_id, char * _username, char * _schemaname , uint64_t _start_time , uint64_t _end_time , uint64_t _query_digest, char *_client, size_t _client_len) {
PgSQL_Event::PgSQL_Event (PGSQL_LOG_EVENT_TYPE _et, uint32_t _thread_id, char * _username, char * _schemaname , uint64_t _start_time , uint64_t _end_time , uint64_t _query_digest, char *_client, size_t _client_len) {
thread_id=_thread_id;
username=_username;
schemaname=_schemaname;
@ -85,6 +85,10 @@ void PgSQL_Event::set_extra_info(char *_err) {
void PgSQL_Event::set_query(const char *ptr, int len) {
query_ptr=(char *)ptr;
// Adjust length: input length includes the null terminator
if (len > 0) {
len--; // exclude '\0'
}
query_len=len;
}
@ -97,28 +101,29 @@ void PgSQL_Event::set_server(int _hid, const char *ptr, int len) {
uint64_t PgSQL_Event::write(std::fstream *f, PgSQL_Session *sess) {
uint64_t total_bytes=0;
switch (et) {
case PROXYSQL_COM_QUERY:
case PROXYSQL_COM_STMT_EXECUTE:
case PROXYSQL_COM_STMT_PREPARE:
case PGSQL_LOG_EVENT_TYPE::SIMPLE_QUERY:
case PGSQL_LOG_EVENT_TYPE::STMT_EXECUTE:
case PGSQL_LOG_EVENT_TYPE::STMT_PREPARE:
case PGSQL_LOG_EVENT_TYPE::STMT_DESCRIBE:
if (pgsql_thread___eventslog_format==1) { // format 1 , binary
total_bytes=write_query_format_1(f);
} else { // format 2 , json
total_bytes=write_query_format_2_json(f);
}
break;
case PROXYSQL_MYSQL_AUTH_OK:
case PROXYSQL_MYSQL_AUTH_ERR:
case PROXYSQL_MYSQL_AUTH_CLOSE:
case PROXYSQL_MYSQL_AUTH_QUIT:
case PROXYSQL_MYSQL_INITDB:
case PROXYSQL_ADMIN_AUTH_OK:
case PROXYSQL_ADMIN_AUTH_ERR:
case PROXYSQL_ADMIN_AUTH_CLOSE:
case PROXYSQL_ADMIN_AUTH_QUIT:
case PROXYSQL_SQLITE_AUTH_OK:
case PROXYSQL_SQLITE_AUTH_ERR:
case PROXYSQL_SQLITE_AUTH_CLOSE:
case PROXYSQL_SQLITE_AUTH_QUIT:
case PGSQL_LOG_EVENT_TYPE::AUTH_OK:
case PGSQL_LOG_EVENT_TYPE::AUTH_ERR:
case PGSQL_LOG_EVENT_TYPE::AUTH_CLOSE:
case PGSQL_LOG_EVENT_TYPE::AUTH_QUIT:
case PGSQL_LOG_EVENT_TYPE::INITDB:
case PGSQL_LOG_EVENT_TYPE::ADMIN_AUTH_OK:
case PGSQL_LOG_EVENT_TYPE::ADMIN_AUTH_ERR:
case PGSQL_LOG_EVENT_TYPE::ADMIN_AUTH_CLOSE:
case PGSQL_LOG_EVENT_TYPE::ADMIN_AUTH_QUIT:
case PGSQL_LOG_EVENT_TYPE::SQLITE_AUTH_OK:
case PGSQL_LOG_EVENT_TYPE::SQLITE_AUTH_ERR:
case PGSQL_LOG_EVENT_TYPE::SQLITE_AUTH_CLOSE:
case PGSQL_LOG_EVENT_TYPE::SQLITE_AUTH_QUIT:
write_auth(f, sess);
break;
default:
@ -163,52 +168,52 @@ void PgSQL_Event::write_auth(std::fstream *f, PgSQL_Session *sess) {
j["extra_info"] = extra_info;
}
switch (et) {
case PROXYSQL_MYSQL_AUTH_OK:
j["event"]="MySQL_Client_Connect_OK";
case PGSQL_LOG_EVENT_TYPE::AUTH_OK:
j["event"]="PgSQL_Client_Connect_OK";
break;
case PROXYSQL_MYSQL_AUTH_ERR:
j["event"]="MySQL_Client_Connect_ERR";
case PGSQL_LOG_EVENT_TYPE::AUTH_ERR:
j["event"]="PgSQL_Client_Connect_ERR";
break;
case PROXYSQL_MYSQL_AUTH_CLOSE:
j["event"]="MySQL_Client_Close";
case PGSQL_LOG_EVENT_TYPE::AUTH_CLOSE:
j["event"]="PGSQL_Client_Close";
break;
case PROXYSQL_MYSQL_AUTH_QUIT:
j["event"]="MySQL_Client_Quit";
case PGSQL_LOG_EVENT_TYPE::AUTH_QUIT:
j["event"]="PGSQL_Client_Quit";
break;
case PROXYSQL_MYSQL_INITDB:
j["event"]="MySQL_Client_Init_DB";
case PGSQL_LOG_EVENT_TYPE::INITDB:
j["event"]="PGSQL_Client_Init_DB";
break;
case PROXYSQL_ADMIN_AUTH_OK:
j["event"]="Admin_Connect_OK";
case PGSQL_LOG_EVENT_TYPE::ADMIN_AUTH_OK:
j["event"]="PGSQL_Admin_Connect_OK";
break;
case PROXYSQL_ADMIN_AUTH_ERR:
j["event"]="Admin_Connect_ERR";
case PGSQL_LOG_EVENT_TYPE::ADMIN_AUTH_ERR:
j["event"]="PGSQL_Admin_Connect_ERR";
break;
case PROXYSQL_ADMIN_AUTH_CLOSE:
j["event"]="Admin_Close";
case PGSQL_LOG_EVENT_TYPE::ADMIN_AUTH_CLOSE:
j["event"]="PGSQL_Admin_Close";
break;
case PROXYSQL_ADMIN_AUTH_QUIT:
j["event"]="Admin_Quit";
case PGSQL_LOG_EVENT_TYPE::ADMIN_AUTH_QUIT:
j["event"]="PGSQL_Admin_Quit";
break;
case PROXYSQL_SQLITE_AUTH_OK:
j["event"]="SQLite3_Connect_OK";
case PGSQL_LOG_EVENT_TYPE::SQLITE_AUTH_OK:
j["event"]="PGSQL_SQLite3_Connect_OK";
break;
case PROXYSQL_SQLITE_AUTH_ERR:
j["event"]="SQLite3_Connect_ERR";
case PGSQL_LOG_EVENT_TYPE::SQLITE_AUTH_ERR:
j["event"]="PGSQL_SQLite3_Connect_ERR";
break;
case PROXYSQL_SQLITE_AUTH_CLOSE:
j["event"]="SQLite3_Close";
case PGSQL_LOG_EVENT_TYPE::SQLITE_AUTH_CLOSE:
j["event"]="PGSQL_SQLite3_Close";
break;
case PROXYSQL_SQLITE_AUTH_QUIT:
j["event"]="SQLite3_Quit";
case PGSQL_LOG_EVENT_TYPE::SQLITE_AUTH_QUIT:
j["event"]="PGSQL_SQLite3_Quit";
break;
default:
break;
}
switch (et) {
case PROXYSQL_MYSQL_AUTH_CLOSE:
case PROXYSQL_ADMIN_AUTH_CLOSE:
case PROXYSQL_SQLITE_AUTH_CLOSE:
case PGSQL_LOG_EVENT_TYPE::AUTH_CLOSE:
case PGSQL_LOG_EVENT_TYPE::ADMIN_AUTH_CLOSE:
case PGSQL_LOG_EVENT_TYPE::SQLITE_AUTH_CLOSE:
{
uint64_t curtime_real=realtime_time();
uint64_t curtime_mono=sess->thread->curtime;
@ -251,29 +256,29 @@ void PgSQL_Event::write_auth(std::fstream *f, PgSQL_Session *sess) {
uint64_t PgSQL_Event::write_query_format_1(std::fstream *f) {
uint64_t total_bytes=0;
total_bytes+=1; // et
total_bytes+=mysql_encode_length(thread_id, NULL);
total_bytes+=encode_length(thread_id, NULL);
username_len=strlen(username);
total_bytes+=mysql_encode_length(username_len,NULL)+username_len;
total_bytes+=encode_length(username_len,NULL)+username_len;
schemaname_len=strlen(schemaname);
total_bytes+=mysql_encode_length(schemaname_len,NULL)+schemaname_len;
total_bytes+=encode_length(schemaname_len,NULL)+schemaname_len;
total_bytes+=mysql_encode_length(client_len,NULL)+client_len;
total_bytes+=encode_length(client_len,NULL)+client_len;
total_bytes+=mysql_encode_length(hid, NULL);
total_bytes+=encode_length(hid, NULL);
if (hid!=UINT64_MAX) {
total_bytes+=mysql_encode_length(server_len,NULL)+server_len;
total_bytes+=encode_length(server_len,NULL)+server_len;
}
total_bytes+=mysql_encode_length(start_time,NULL);
total_bytes+=mysql_encode_length(end_time,NULL);
client_stmt_name_len=strlen(client_stmt_name);
total_bytes+=mysql_encode_length(client_stmt_name_len,NULL)+client_stmt_name_len;
total_bytes+=mysql_encode_length(affected_rows,NULL);
total_bytes+=mysql_encode_length(rows_sent,NULL);
total_bytes+=encode_length(start_time,NULL);
total_bytes+=encode_length(end_time,NULL);
client_stmt_name_len=client_stmt_name ? strlen(client_stmt_name) : 0;
total_bytes+=encode_length(client_stmt_name_len,NULL)+client_stmt_name_len;
total_bytes+=encode_length(affected_rows,NULL);
total_bytes+=encode_length(rows_sent,NULL);
total_bytes+=mysql_encode_length(query_digest,NULL);
total_bytes+=encode_length(query_digest,NULL);
total_bytes+=mysql_encode_length(query_len,NULL)+query_len;
total_bytes+=encode_length(query_len,NULL)+query_len;
// for performance reason, we are moving the write lock
// right before the write to disk
@ -287,64 +292,64 @@ uint64_t PgSQL_Event::write_query_format_1(std::fstream *f) {
f->write((char *)&et,1);
len=mysql_encode_length(thread_id,buf);
len=encode_length(thread_id,buf);
write_encoded_length(buf,thread_id,len,buf[0]);
f->write((char *)buf,len);
len=mysql_encode_length(username_len,buf);
len=encode_length(username_len,buf);
write_encoded_length(buf,username_len,len,buf[0]);
f->write((char *)buf,len);
f->write(username,username_len);
len=mysql_encode_length(schemaname_len,buf);
len=encode_length(schemaname_len,buf);
write_encoded_length(buf,schemaname_len,len,buf[0]);
f->write((char *)buf,len);
f->write(schemaname,schemaname_len);
len=mysql_encode_length(client_len,buf);
len=encode_length(client_len,buf);
write_encoded_length(buf,client_len,len,buf[0]);
f->write((char *)buf,len);
f->write(client,client_len);
len=mysql_encode_length(hid,buf);
len=encode_length(hid,buf);
write_encoded_length(buf,hid,len,buf[0]);
f->write((char *)buf,len);
if (hid!=UINT64_MAX) {
len=mysql_encode_length(server_len,buf);
len=encode_length(server_len,buf);
write_encoded_length(buf,server_len,len,buf[0]);
f->write((char *)buf,len);
f->write(server,server_len);
}
len=mysql_encode_length(start_time,buf);
len=encode_length(start_time,buf);
write_encoded_length(buf,start_time,len,buf[0]);
f->write((char *)buf,len);
len=mysql_encode_length(end_time,buf);
len=encode_length(end_time,buf);
write_encoded_length(buf,end_time,len,buf[0]);
f->write((char *)buf,len);
if (et == PROXYSQL_COM_STMT_PREPARE || et == PROXYSQL_COM_STMT_EXECUTE) {
len = mysql_encode_length(client_stmt_name_len, buf);
if (et == PGSQL_LOG_EVENT_TYPE::STMT_PREPARE || et == PGSQL_LOG_EVENT_TYPE::STMT_EXECUTE || et == PGSQL_LOG_EVENT_TYPE::STMT_DESCRIBE) {
len = encode_length(client_stmt_name_len, buf);
write_encoded_length(buf, client_stmt_name_len, len, buf[0]);
f->write((char*)buf, len);
f->write(client_stmt_name, client_stmt_name_len);
}
len=mysql_encode_length(affected_rows,buf);
len=encode_length(affected_rows,buf);
write_encoded_length(buf,affected_rows,len,buf[0]);
f->write((char *)buf,len);
len=mysql_encode_length(rows_sent,buf);
len=encode_length(rows_sent,buf);
write_encoded_length(buf,rows_sent,len,buf[0]);
f->write((char *)buf,len);
len=mysql_encode_length(query_digest,buf);
len=encode_length(query_digest,buf);
write_encoded_length(buf,query_digest,len,buf[0]);
f->write((char *)buf,len);
len=mysql_encode_length(query_len,buf);
len=encode_length(query_len,buf);
write_encoded_length(buf,query_len,len,buf[0]);
f->write((char *)buf,len);
if (query_len) {
@ -364,30 +369,27 @@ uint64_t PgSQL_Event::write_query_format_2_json(std::fstream *f) {
}
j["thread_id"] = thread_id;
switch (et) {
case PROXYSQL_COM_STMT_EXECUTE:
j["event"]="COM_STMT_EXECUTE";
case PGSQL_LOG_EVENT_TYPE::STMT_EXECUTE:
j["event"]="PGSQL_STMT_EXECUTE";
break;
case PGSQL_LOG_EVENT_TYPE::STMT_PREPARE:
j["event"]="PGSQL_STMT_PREPARE";
break;
case PROXYSQL_COM_STMT_PREPARE:
j["event"]="COM_STMT_PREPARE";
case PGSQL_LOG_EVENT_TYPE::STMT_DESCRIBE:
j["event"]="PGSQL_STMT_DESCRIBE";
break;
default:
j["event"]="COM_QUERY";
j["event"]="PGSQL_SIMPLE_QUERY";
break;
}
if (username) {
j["username"] = username;
//} else {
// j["username"] = "";
}
if (schemaname) {
j["schemaname"] = schemaname;
//} else {
// j["schemaname"] = "";
}
if (client) {
j["client"] = client;
//} else {
// j["client"] = "";
}
if (hid!=UINT64_MAX) {
if (server) {
@ -404,7 +406,7 @@ uint64_t PgSQL_Event::write_query_format_2_json(std::fstream *f) {
if (have_rows_sent == true) {
j["rows_sent"] = rows_sent;
}
j["query"] = string(query_ptr,query_len);
j["query"] = string(query_ptr, query_len);
j["starttime_timestamp_us"] = start_time;
{
time_t timer=start_time/1000/1000;
@ -432,8 +434,10 @@ uint64_t PgSQL_Event::write_query_format_2_json(std::fstream *f) {
sprintf(digest_hex,"0x%016llX", (long long unsigned int)query_digest);
j["digest"] = digest_hex;
if (et == PROXYSQL_COM_STMT_PREPARE || et == PROXYSQL_COM_STMT_EXECUTE) {
j["client_stmt_name"] = client_stmt_name;
if (et == PGSQL_LOG_EVENT_TYPE::STMT_PREPARE || et == PGSQL_LOG_EVENT_TYPE::STMT_EXECUTE || et == PGSQL_LOG_EVENT_TYPE::STMT_DESCRIBE) {
if (client_stmt_name) {
j["client_stmt_name"] = client_stmt_name;
}
}
// for performance reason, we are moving the write lock
@ -554,10 +558,10 @@ void PgSQL_Logger::events_open_log_unlocked() {
events.logfile->exceptions ( std::ofstream::failbit | std::ofstream::badbit );
try {
events.logfile->open(filen , std::ios::out | std::ios::binary);
proxy_info("Starting new mysql event log file %s\n", filen);
proxy_info("Starting new pgsql event log file %s\n", filen);
}
catch (const std::ofstream::failure&) {
proxy_error("Error creating new mysql event log file %s\n", filen);
proxy_error("Error creating new pgsql event log file %s\n", filen);
delete events.logfile;
events.logfile=NULL;
}
@ -583,10 +587,10 @@ void PgSQL_Logger::audit_open_log_unlocked() {
audit.logfile->exceptions ( std::ofstream::failbit | std::ofstream::badbit );
try {
audit.logfile->open(filen , std::ios::out | std::ios::binary);
proxy_info("Starting new audit log file %s\n", filen);
proxy_info("Starting new pgsql audit log file %s\n", filen);
}
catch (const std::ofstream::failure&) {
proxy_error("Error creating new audit log file %s\n", filen);
proxy_error("Error creating new pgsql audit log file %s\n", filen);
delete audit.logfile;
audit.logfile=NULL;
}
@ -675,26 +679,32 @@ void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) {
sprintf(ca,"%s:%d",sess->client_myds->addr.addr,sess->client_myds->addr.port);
}
cl=strlen(ca);
enum log_event_type let = PROXYSQL_COM_QUERY; // default
PGSQL_LOG_EVENT_TYPE let = PGSQL_LOG_EVENT_TYPE::SIMPLE_QUERY; // default
switch (sess->status) {
case PROCESSING_STMT_EXECUTE:
let = PROXYSQL_COM_STMT_EXECUTE;
let = PGSQL_LOG_EVENT_TYPE::STMT_EXECUTE;
break;
case PROCESSING_STMT_PREPARE:
let = PROXYSQL_COM_STMT_PREPARE;
let = PGSQL_LOG_EVENT_TYPE::STMT_PREPARE;
break;
case PROCESSING_STMT_DESCRIBE:
let = PGSQL_LOG_EVENT_TYPE::STMT_DESCRIBE;
break;
case WAITING_CLIENT_DATA:
{
unsigned char c=*((unsigned char *)sess->pkt.ptr+sizeof(mysql_hdr));
switch ((enum_mysql_command)c) {
case _MYSQL_COM_STMT_PREPARE:
// proxysql is responding to COM_STMT_PREPARE without
/*unsigned char cmd = *(static_cast<unsigned char*>(sess->pkt.ptr));
switch (cmd) {
case 'P':
// proxysql is responding to PARSE without
// preparing on any backend
let = PROXYSQL_COM_STMT_PREPARE;
let = PGSQL_LOG_EVENT_TYPE::STMT_PREPARE;
break;
default:
break;
}
}*/
if (sess->pkt.ptr == nullptr)
let = PGSQL_LOG_EVENT_TYPE::STMT_PREPARE;
}
break;
default:
@ -703,7 +713,7 @@ void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) {
uint64_t query_digest = 0;
if (sess->status != PROCESSING_STMT_EXECUTE) {
if (sess->status != PROCESSING_STMT_EXECUTE && sess->status != PROCESSING_STMT_DESCRIBE) {
query_digest = GloPgQPro->get_digest(&sess->CurrentQuery.QueryParserArgs);
} else {
query_digest = sess->CurrentQuery.extended_query_info.stmt_info->digest;
@ -719,8 +729,9 @@ void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) {
char *c = NULL;
int ql = 0;
switch (sess->status) {
case PROCESSING_STMT_DESCRIBE:
case PROCESSING_STMT_EXECUTE:
c = (char *)sess->CurrentQuery.extended_query_info.stmt_info->query;
c = sess->CurrentQuery.extended_query_info.stmt_info->query;
ql = sess->CurrentQuery.extended_query_info.stmt_info->query_length;
me.set_client_stmt_name((char*)sess->CurrentQuery.extended_query_info.stmt_client_name);
break;
@ -732,7 +743,7 @@ void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) {
// 'WAITING_CLIENT_DATA'. This state is possible when the prepared statement is found in the
// global cache and due to that we immediately reply to the client and session doesn't reach
// 'PROCESSING_STMT_PREPARE' state. 'stmt_client_id' is expected to be '0' for anything that isn't
// a prepared statement, still, logging should rely 'log_event_type' instead of this value.
// a prepared statement, still, logging should rely 'PGSQL_LOG_EVENT_TYPE' instead of this value.
me.set_client_stmt_name((char*)sess->CurrentQuery.extended_query_info.stmt_client_name);
break;
}
@ -790,7 +801,7 @@ void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) {
}
}
void PgSQL_Logger::log_audit_entry(log_event_type _et, PgSQL_Session *sess, PgSQL_Data_Stream *myds, char *xi) {
void PgSQL_Logger::log_audit_entry(PGSQL_LOG_EVENT_TYPE _et, PgSQL_Session *sess, PgSQL_Data_Stream *myds, char *xi) {
if (audit.enabled==false) return;
if (audit.logfile==NULL) return;
@ -808,50 +819,50 @@ void PgSQL_Logger::log_audit_entry(log_event_type _et, PgSQL_Session *sess, PgSQ
if (sess) {
// to reduce complexing in the calling function, we do some changes here
switch (_et) {
case PROXYSQL_MYSQL_AUTH_OK:
case PGSQL_LOG_EVENT_TYPE::AUTH_OK:
switch (sess->session_type) {
case PROXYSQL_SESSION_ADMIN:
case PROXYSQL_SESSION_STATS:
_et = PROXYSQL_ADMIN_AUTH_OK;
_et = PGSQL_LOG_EVENT_TYPE::ADMIN_AUTH_OK;
break;
case PROXYSQL_SESSION_SQLITE:
_et = PROXYSQL_SQLITE_AUTH_OK;
_et = PGSQL_LOG_EVENT_TYPE::SQLITE_AUTH_OK;
default:
break;
}
break;
case PROXYSQL_MYSQL_AUTH_ERR:
case PGSQL_LOG_EVENT_TYPE::AUTH_ERR:
switch (sess->session_type) {
case PROXYSQL_SESSION_ADMIN:
case PROXYSQL_SESSION_STATS:
_et = PROXYSQL_ADMIN_AUTH_ERR;
_et = PGSQL_LOG_EVENT_TYPE::ADMIN_AUTH_ERR;
break;
case PROXYSQL_SESSION_SQLITE:
_et = PROXYSQL_SQLITE_AUTH_ERR;
_et = PGSQL_LOG_EVENT_TYPE::SQLITE_AUTH_ERR;
default:
break;
}
break;
case PROXYSQL_MYSQL_AUTH_QUIT:
case PGSQL_LOG_EVENT_TYPE::AUTH_QUIT:
switch (sess->session_type) {
case PROXYSQL_SESSION_ADMIN:
case PROXYSQL_SESSION_STATS:
_et = PROXYSQL_ADMIN_AUTH_QUIT;
_et = PGSQL_LOG_EVENT_TYPE::ADMIN_AUTH_QUIT;
break;
case PROXYSQL_SESSION_SQLITE:
_et = PROXYSQL_SQLITE_AUTH_QUIT;
_et = PGSQL_LOG_EVENT_TYPE::SQLITE_AUTH_QUIT;
default:
break;
}
break;
case PROXYSQL_MYSQL_AUTH_CLOSE:
case PGSQL_LOG_EVENT_TYPE::AUTH_CLOSE:
switch (sess->session_type) {
case PROXYSQL_SESSION_ADMIN:
case PROXYSQL_SESSION_STATS:
_et = PROXYSQL_ADMIN_AUTH_CLOSE;
_et = PGSQL_LOG_EVENT_TYPE::ADMIN_AUTH_CLOSE;
break;
case PROXYSQL_SESSION_SQLITE:
_et = PROXYSQL_SQLITE_AUTH_CLOSE;
_et = PGSQL_LOG_EVENT_TYPE::SQLITE_AUTH_CLOSE;
default:
break;
}

@ -1893,7 +1893,7 @@ void PgSQL_Session::handler___status_NONE_or_default(PtrSize_t& pkt) {
if (pkt.size == 5 && cmd == 'X') {
if (GloPgSQL_Logger) {
GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_AUTH_QUIT, this, NULL);
GloPgSQL_Logger->log_audit_entry(PGSQL_LOG_EVENT_TYPE::AUTH_QUIT, this, NULL);
}
proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got QUIT packet\n");
if (thread) {
@ -2073,7 +2073,7 @@ __implicit_sync:
unsigned char command = *(static_cast<unsigned char*>(pkt.ptr));
if (command == 'X') {
proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got QUIT packet\n");
if (GloPgSQL_Logger) { GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_AUTH_QUIT, this, NULL); }
if (GloPgSQL_Logger) { GloPgSQL_Logger->log_audit_entry(PGSQL_LOG_EVENT_TYPE::AUTH_QUIT, this, NULL); }
l_free(pkt.size, pkt.ptr);
handler_ret = -1;
return handler_ret;
@ -2118,7 +2118,7 @@ __implicit_sync:
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___not_mysql(pkt);
} else if (c == 'X') {
//proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got COM_QUIT packet\n");
//if (GloPgSQL_Logger) { GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_AUTH_QUIT, this, NULL); }
//if (GloPgSQL_Logger) { GloPgSQL_Logger->log_audit_entry(PGSQL_LOG_EVENT_TYPE::AUTH_QUIT, this, NULL); }
l_free(pkt.size, pkt.ptr);
handler_ret = -1;
return handler_ret;
@ -2284,7 +2284,7 @@ __implicit_sync:
break;
case 'X':
proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got QUIT packet\n");
if (GloPgSQL_Logger) { GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_AUTH_QUIT, this, NULL); }
if (GloPgSQL_Logger) { GloPgSQL_Logger->log_audit_entry(PGSQL_LOG_EVENT_TYPE::AUTH_QUIT, this, NULL); }
l_free(pkt.size, pkt.ptr);
handler_ret = -1;
return handler_ret;
@ -2572,6 +2572,7 @@ bool PgSQL_Session::handler_minus1_HandleErrorCodes(PgSQL_Data_Stream* myds, int
proxy_warning("Error OUT_OF_MEMORY during query on (%d,%s,%d,%d): %s\n", myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myconn->get_backend_pid(), myconn->get_error_code_with_message().c_str());
break;
default:
break; // continue normally
}
return false;
@ -2589,6 +2590,7 @@ void PgSQL_Session::handler_minus1_GenerateErrorMessage(PgSQL_Data_Stream* myds,
switch (status) {
case PROCESSING_STMT_PREPARE:
GloPgSQL_Logger->log_audit_entry(PGSQL_LOG_EVENT_TYPE::AUTH_CLOSE, this, NULL);
if (previous_status.size()) {
// an STMT_PREPARE failed
// we have a previous status, probably STMT_DESCRIBE or STMT_EXECUTE,
@ -2596,9 +2598,13 @@ void PgSQL_Session::handler_minus1_GenerateErrorMessage(PgSQL_Data_Stream* myds,
// for this reason we exit immediately
wrong_pass = true;
}
// fall through
PgSQL_Result_to_PgSQL_wire(myconn, myds);
break;
case PROCESSING_STMT_DESCRIBE:
case PROCESSING_STMT_EXECUTE:
PgSQL_Result_to_PgSQL_wire(myconn, myds);
LogQuery(myds);
break;
case PROCESSING_QUERY:
PgSQL_Result_to_PgSQL_wire(myconn, myds);
break;
@ -2656,15 +2662,6 @@ int PgSQL_Session::RunQuery(PgSQL_Data_Stream* myds, PgSQL_Connection* myconn) {
const std::string& backend_stmt_name =
std::string(PROXYSQL_PS_PREFIX) + std::to_string(CurrentQuery.extended_query_info.stmt_backend_id);
rc = myconn->async_query(myds->revents, nullptr, 0, backend_stmt_name.c_str(), type, &CurrentQuery.extended_query_info);
// Handle edge case: Since libpq automatically sends a Sync after Execute,
// the Bind message is no longer pending on the backend. We must reset
// bind_waiting_for_execute in case the client sends a sequence like
// Bind/Describe/Execute/Describe/Sync, so that a subsequent Describe Portal
// does not incorrectly assume a pending Bind.
if (rc == 0 && type == PGSQL_EXTENDED_QUERY_TYPE_EXECUTE) {
bind_waiting_for_execute.reset(nullptr);
}
}
break;
/* case PROCESSING_STMT_EXECUTE:
@ -3060,9 +3057,12 @@ handler_again:
NEXT_IMMEDIATE(st);
}
}
// fall through
// fall through
case PROCESSING_STMT_DESCRIBE:
case PROCESSING_STMT_EXECUTE:
PgSQL_Result_to_PgSQL_wire(myconn, myconn->myds);
LogQuery(myds);
break;
case PROCESSING_QUERY:
PgSQL_Result_to_PgSQL_wire(myconn, myconn->myds);
break;
@ -3080,6 +3080,15 @@ handler_again:
// LCOV_EXCL_STOP
}
// Handle edge case: Since libpq automatically sends a Sync after Execute,
// the Bind message is no longer pending on the backend. We must reset
// bind_waiting_for_execute in case the client sends a sequence like
// Bind/Describe/Execute/Describe/Sync, so that a subsequent Describe Portal
// does not incorrectly assume a pending Bind.
if (status == PROCESSING_STMT_EXECUTE) {
bind_waiting_for_execute.reset(nullptr);
}
// if we are in extended query mode, we need to check if we have a pending extended query messages
bool has_pending_messages = false;
if (processing_extended_query) {
@ -3442,7 +3451,7 @@ void PgSQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(
client_myds->myprot.generate_error_packet(true, false, "Too many connections", PGSQL_ERROR_CODES::ERRCODE_TOO_MANY_CONNECTIONS,
true, true);
proxy_warning("pgsql-max_connections reached. Returning 'Too many connections'\n");
GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_AUTH_ERR, this, NULL, (char*)"pgsql-max_connections reached");
GloPgSQL_Logger->log_audit_entry(PGSQL_LOG_EVENT_TYPE::AUTH_ERR, this, NULL, (char*)"pgsql-max_connections reached");
__sync_fetch_and_add(&PgHGM->status.access_denied_max_connections, 1);
}
else { // see issue #794
@ -3451,7 +3460,7 @@ void PgSQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(
char* a = (char*)"User '%s' has exceeded the 'max_user_connections' resource (current value: %d)";
char* b = (char*)malloc(strlen(a) + strlen(client_myds->myconn->userinfo->username) + 16);
sprintf(b, a, client_myds->myconn->userinfo->username, used_users);
GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_AUTH_ERR, this, NULL, b);
GloPgSQL_Logger->log_audit_entry(PGSQL_LOG_EVENT_TYPE::AUTH_ERR, this, NULL, b);
client_myds->myprot.generate_error_packet(true, false, b, PGSQL_ERROR_CODES::ERRCODE_TOO_MANY_CONNECTIONS,
true, true);
proxy_warning("User '%s' has exceeded the 'max_user_connections' resource (current value: %d)\n", client_myds->myconn->userinfo->username, used_users);
@ -3514,7 +3523,7 @@ void PgSQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(
// we are good!
client_myds->myprot.welcome_client();
handshake_err = false;
GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_AUTH_OK, this, NULL);
GloPgSQL_Logger->log_audit_entry(PGSQL_LOG_EVENT_TYPE::AUTH_OK, this, NULL);
status = WAITING_CLIENT_DATA;
client_myds->DSS = STATE_CLIENT_AUTH_OK;
}
@ -3522,7 +3531,7 @@ void PgSQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(
char* a = (char*)"User '%s' can only connect locally";
char* b = (char*)malloc(strlen(a) + strlen(client_myds->myconn->userinfo->username));
sprintf(b, a, client_myds->myconn->userinfo->username);
GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_AUTH_ERR, this, NULL, b);
GloPgSQL_Logger->log_audit_entry(PGSQL_LOG_EVENT_TYPE::AUTH_ERR, this, NULL, b);
client_myds->myprot.generate_error_packet(true, false, b, PGSQL_ERROR_CODES::ERRCODE_SQLSERVER_REJECTED_ESTABLISHMENT_OF_SQLCONNECTION,
true, true);
free(b);
@ -3545,7 +3554,7 @@ void PgSQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(
}
if (use_ssl == true && is_encrypted == false) {
*wrong_pass = true;
GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_AUTH_ERR, this, NULL);
GloPgSQL_Logger->log_audit_entry(PGSQL_LOG_EVENT_TYPE::AUTH_ERR, this, NULL);
char* _a = (char*)"ProxySQL Error: Access denied for user '%s' (using password: %s). SSL is required";
char* _s = (char*)malloc(strlen(_a) + strlen(client_myds->myconn->userinfo->username) + 32);
@ -3562,7 +3571,7 @@ void PgSQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(
// we are good!
//client_myds->myprot.generate_pkt_OK(true,NULL,NULL, (is_encrypted ? 3 : 2), 0,0,0,0,NULL,false);
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 8, "Session=%p , DS=%p . STATE_CLIENT_AUTH_OK\n", this, client_myds);
GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_AUTH_OK, this, NULL);
GloPgSQL_Logger->log_audit_entry(PGSQL_LOG_EVENT_TYPE::AUTH_OK, this, NULL);
client_myds->myprot.welcome_client();
handshake_err = false;
status = WAITING_CLIENT_DATA;
@ -3634,7 +3643,7 @@ void PgSQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(
if (client_addr) {
free(client_addr);
}
GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_AUTH_ERR, this, NULL);
GloPgSQL_Logger->log_audit_entry(PGSQL_LOG_EVENT_TYPE::AUTH_ERR, this, NULL);
__sync_add_and_fetch(&PgHGM->status.client_connections_aborted, 1);
client_myds->DSS = STATE_SLEEP;
}
@ -3724,7 +3733,7 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0);
if (autocommit) setStatus |= SERVER_STATUS_AUTOCOMMIT;
client_myds->myprot.generate_pkt_OK(true, NULL, NULL, 1, 0, 0, setStatus, 0, NULL);
GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_INITDB, this, NULL);
GloPgSQL_Logger->log_audit_entry(PGSQL_LOG_EVENT_TYPE::INITDB, this, NULL);
client_myds->DSS = STATE_SLEEP;
}
else {
@ -3769,7 +3778,7 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0);
if (autocommit) setStatus |= SERVER_STATUS_AUTOCOMMIT;
client_myds->myprot.generate_pkt_OK(true, NULL, NULL, 1, 0, 0, setStatus, 0, NULL);
GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_INITDB, this, NULL);
GloPgSQL_Logger->log_audit_entry(PGSQL_LOG_EVENT_TYPE::INITDB, this, NULL);
client_myds->DSS = STATE_SLEEP;
}
else {
@ -5014,8 +5023,7 @@ void PgSQL_Session::LogQuery(PgSQL_Data_Stream* myds) {
if (qpo) {
if (qpo->log == 1) {
GloPgSQL_Logger->log_request(this, myds); // we send for logging only if logging is enabled for this query
}
else {
} else {
if (qpo->log == -1) {
if (pgsql_thread___eventslog_default_log == 1) {
GloPgSQL_Logger->log_request(this, myds); // we send for logging only if enabled by default
@ -5063,7 +5071,16 @@ void PgSQL_Session::RequestEnd(PgSQL_Data_Stream* myds, bool called_on_failure)
}
}
LogQuery(myds);
switch (status)
{
case PROCESSING_STMT_PREPARE:
case PROCESSING_STMT_DESCRIBE:
case PROCESSING_STMT_EXECUTE:
// already logged
break;
default:
LogQuery(myds);
}
__cleanup:
@ -6373,8 +6390,6 @@ bool PgSQL_Session::handler___rc0_PROCESSING_STMT_PREPARE(enum session_status& s
// like 'PROCESSING_STMT_EXECUTE'.
assert(extended_query_info.stmt_client_name);
client_myds->myconn->local_stmts->client_insert(global_stmtid, extended_query_info.stmt_client_name);
LogQuery(myds);
GloPgStmt->unlock();
return false;

@ -2646,7 +2646,7 @@ PgSQL_Thread::~PgSQL_Thread() {
if (sess->session_type == PROXYSQL_SESSION_ADMIN || sess->session_type == PROXYSQL_SESSION_STATS) {
char _buf[1024];
sprintf(_buf, "%s:%d:%s()", __FILE__, __LINE__, __func__);
if (GloPgSQL_Logger) { GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_AUTH_CLOSE, sess, NULL, _buf); }
if (GloPgSQL_Logger) { GloPgSQL_Logger->log_audit_entry(PGSQL_LOG_EVENT_TYPE::AUTH_CLOSE, sess, NULL, _buf); }
}
delete sess;
}
@ -3690,7 +3690,7 @@ void PgSQL_Thread::process_all_sessions() {
}
}
sprintf(_buf, "%s:%d:%s()", __FILE__, __LINE__, __func__);
GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_AUTH_CLOSE, sess, NULL, _buf);
GloPgSQL_Logger->log_audit_entry(PGSQL_LOG_EVENT_TYPE::AUTH_CLOSE, sess, NULL, _buf);
unregister_session(n);
n--;
delete sess;
@ -3705,7 +3705,7 @@ void PgSQL_Thread::process_all_sessions() {
if (sess->client_myds && sess->killed)
proxy_warning("Closing killed client connection %s:%d\n", sess->client_myds->addr.addr, sess->client_myds->addr.port);
sprintf(_buf, "%s:%d:%s()", __FILE__, __LINE__, __func__);
GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_AUTH_CLOSE, sess, NULL, _buf);
GloPgSQL_Logger->log_audit_entry(PGSQL_LOG_EVENT_TYPE::AUTH_CLOSE, sess, NULL, _buf);
unregister_session(n);
n--;
delete sess;
@ -3720,7 +3720,7 @@ void PgSQL_Thread::process_all_sessions() {
if (sess->client_myds)
proxy_warning("Closing killed client connection %s:%d\n", sess->client_myds->addr.addr, sess->client_myds->addr.port);
sprintf(_buf, "%s:%d:%s()", __FILE__, __LINE__, __func__);
GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_AUTH_CLOSE, sess, NULL, _buf);
GloPgSQL_Logger->log_audit_entry(PGSQL_LOG_EVENT_TYPE::AUTH_CLOSE, sess, NULL, _buf);
unregister_session(n);
n--;
delete sess;

Loading…
Cancel
Save