Merge pull request #4680 from sysown/v3.0-postgres_monitor_poc

Initial POC for PostgreSQL monitoring support
pull/4687/head v3.0.0-alpha
Javier Jaramago Fernández 2 years ago committed by GitHub
commit 42bb0cf52a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -585,7 +585,7 @@ class PgSQL_HostGroups_Manager : public Base_HostGroups_Manager<PgSQL_HGC> {
/**
* @brief Mutex used to guard 'pgsql_servers_to_monitor' resulset.
*/
std::mutex pgsql_servers_to_monitor_mutex;
std::mutex pgsql_servers_to_monitor_mutex {};
/**
* @brief Resulset containing the latest 'pgsql_servers' present in 'mydb'.
* @details This resulset should be updated via 'update_table_pgsql_servers_for_monitor' each time actions

@ -0,0 +1,71 @@
#ifndef __PGSQL_MONITOR_H
#define __PGSQL_MONITOR_H
#include "libpq-fe.h"
#include "sqlite3db.h"
#include "proxysql_structs.h"
#include <cassert>
#include <mutex>
#include <vector>
#define MONITOR_SQLITE_TABLE_PGSQL_SERVER_CONNECT_LOG "CREATE TABLE pgsql_server_connect_log (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start_us INT NOT NULL DEFAULT 0 , connect_success_time_us INT DEFAULT 0 , connect_error VARCHAR , PRIMARY KEY (hostname, port, time_start_us))"
#define MONITOR_SQLITE_TABLE_PGSQL_SERVER_PING_LOG "CREATE TABLE pgsql_server_ping_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start_us INT NOT NULL DEFAULT 0 , ping_success_time_us INT DEFAULT 0 , ping_error VARCHAR , PRIMARY KEY (hostname, port, time_start_us))"
#define MONITOR_SQLITE_TABLE_PGSQL_SERVERS "CREATE TABLE pgsql_servers (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , status INT CHECK (status IN (0, 1, 2, 3, 4)) NOT NULL DEFAULT 0 , use_ssl INT CHECK (use_ssl IN(0,1)) NOT NULL DEFAULT 0 , PRIMARY KEY (hostname, port) )"
#define MONITOR_SQLITE_TABLE_PROXYSQL_SERVERS "CREATE TABLE proxysql_servers (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 6032 , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 0 , comment VARCHAR NOT NULL DEFAULT '' , PRIMARY KEY (hostname, port) )"
struct PgSQL_Monitor {
// @brief Flags if monitoring threads should be shutdown.
bool shutdown = false;
// @brief Mutex to hold to update `monitor_internal.pgsql_servers`
std::mutex pgsql_srvs_mutex {};
// @brief Mutex to hold to update/read `pgsql_servers` to monitor
std::mutex pgsql_srvs_to_monitor_mutex {};
// @brief Used to access monitor database
SQLite3DB monitordb {};
// @brief Used to access internal monitor database
SQLite3DB monitor_internal_db {};
// Internal counters for metrics
///////////////////////////////////////////////////////////////////////////
uint64_t connect_check_ERR { 0 };
uint64_t connect_check_OK { 0 };
uint64_t ping_check_ERR { 0 };
uint64_t ping_check_OK { 0 };
///////////////////////////////////////////////////////////////////////////
std::vector<table_def_t> tables_defs_monitor {
{
const_cast<char*>("pgsql_server_connect_log"),
const_cast<char*>(MONITOR_SQLITE_TABLE_PGSQL_SERVER_CONNECT_LOG)
},
{
const_cast<char*>("pgsql_server_ping_log"),
const_cast<char*>(MONITOR_SQLITE_TABLE_PGSQL_SERVER_PING_LOG)
}
};
std::vector<table_def_t> tables_defs_monitor_internal {
{
const_cast<char*>("pgsql_servers"),
const_cast<char*>(MONITOR_SQLITE_TABLE_PGSQL_SERVERS)
}
};
PgSQL_Monitor();
};
struct pgsql_conn_t {
PGconn* conn { nullptr };
int fd { 0 };
uint64_t last_used { 0 };
ASYNC_ST state { ASYNC_ST::ASYNC_CONNECT_FAILED };
mf_unique_ptr<char> err {};
};
void* PgSQL_monitor_scheduler_thread();
#endif

@ -6,7 +6,6 @@
#include "proxysql.h"
#include "Base_Thread.h"
#include "cpp.h"
#include "ProxySQL_Poll.h"
#include "PgSQL_Variables.h"
#ifdef IDLE_THREADS
@ -825,6 +824,7 @@ public:
//! Read only check timeout. Unit: 'ms'.
int monitor_replication_lag_timeout;
int monitor_replication_lag_count;
/* TODO: Remove
int monitor_groupreplication_healthcheck_interval;
int monitor_groupreplication_healthcheck_timeout;
int monitor_groupreplication_healthcheck_max_timeout_count;
@ -836,9 +836,13 @@ public:
int monitor_query_interval;
int monitor_query_timeout;
int monitor_slave_lag_when_null;
*/
int monitor_threads;
/* TODO: Remove
int monitor_threads_min;
int monitor_threads_max;
int monitor_threads_queue_maxsize;
*/
int monitor_local_dns_cache_ttl;
int monitor_local_dns_cache_refresh_interval;
int monitor_local_dns_resolver_queue_maxsize;

@ -2,7 +2,6 @@
#define PGSQL_VARIABLES_H
#include "proxysql.h"
#include "cpp.h"
#include <cstdint>
#include <vector>

@ -15,6 +15,7 @@
#include "sqlite3db.h"
//#include "StatCounters.h"
#include "MySQL_Monitor.hpp"
#include "PgSQL_Monitor.hpp"
//#include "MySQL_Protocol.h"
//#include "MySQL_Authentication.hpp"
//#include "MySQL_LDAP_Authentication.hpp"

@ -1,14 +1,3 @@
/*
#ifdef DEBUG
#ifndef DEBUG_EXTERN
#define DEBUG_EXTERN
extern debug_level *gdbg_lvl;
extern int gdbg;
#endif
#endif
*/
#ifndef __PROXYSQL_DEBUG_H
#define __PROXYSQL_DEBUG_H
@ -46,7 +35,6 @@ class Timer {
#ifdef DEBUG
#define PROXY_TRACE() { proxy_debug(PROXY_DEBUG_GENERIC,10,"TRACE\n"); }
//#define PROXY_TRACE2() { proxy_info("TRACE\n"); }
#define PROXY_TRACE2()
#else
#define PROXY_TRACE()
@ -64,7 +52,6 @@ class Timer {
} \
} while (0)
#elif defined(__linux__)
//#ifdef SYS_gettid
#define proxy_debug(module, verbosity, fmt, ...) \
do { if (GloVars.global.gdbg) { \
proxy_debug_func(module, verbosity, syscall(SYS_gettid), __FILE__, __LINE__, __func__ , fmt, ## __VA_ARGS__); \
@ -76,9 +63,6 @@ class Timer {
#define proxy_debug(module, verbosity, fmt, ...)
#endif /* DEBUG */
/*
#ifdef DEBUG
*/
#define proxy_error(fmt, ...) \
do { \
time_t __timer; \
@ -111,23 +95,7 @@ class Timer {
strftime(__buffer, 25, "%Y-%m-%d %H:%M:%S", &__tm_info); \
proxy_error_func(0, "%s %s:%d:%s(): [ERROR] " fmt, __buffer, fi, li, fu , ## __VA_ARGS__); \
} while(0)
/*
#else
#define proxy_error(fmt, ...) \
do { \
time_t __timer; \
char __buffer[25]; \
struct tm *__tm_info; \
time(&__timer); \
__tm_info = localtime(&__timer); \
strftime(__buffer, 25, "%Y-%m-%d %H:%M:%S", __tm_info); \
proxy_error_func("%s [ERROR] " fmt , __buffer , ## __VA_ARGS__); \
} while(0)
#endif
*/
/*
#ifdef DEBUG
*/
#define proxy_warning(fmt, ...) \
do { \
time_t __timer; \
@ -150,20 +118,6 @@ class Timer {
proxy_error_func(ecode, "%s %s:%d:%s(): [WARNING] " fmt, __buffer, __FILE__, __LINE__, __func__ , ## __VA_ARGS__); \
} while(0)
/*
#else
#define proxy_warning(fmt, ...) \
do { \
time_t __timer; \
char __buffer[25]; \
struct tm *__tm_info; \
time(&__timer); \
__tm_info = localtime(&__timer); \
strftime(__buffer, 25, "%Y-%m-%d %H:%M:%S", __tm_info); \
proxy_error_func("%s [WARNING] " fmt , __buffer , ## __VA_ARGS__); \
} while(0)
#endif
*/
#ifdef DEBUG
#define proxy_info(fmt, ...) \
do { \
@ -211,13 +165,26 @@ class Timer {
#endif
#ifdef DEBUG
//void *debug_logger();
#endif
#define NULL_DB_MSG "The pointer to sqlite3 database is NULL. Cannot get error message."
#define ASSERT_SQLITE_OK(rc, db) \
do { \
if (rc!=SQLITE_OK) { \
proxy_error("SQLite3 error with return code %d. Error message: %s. Shutting down.\n", rc, db?(*proxy_sqlite3_errmsg)(db->get_db()):"The pointer to sqlite3 database is null. Cannot get error message."); \
proxy_error( \
"SQLite3 error. Shutting down rc=%d msg='%s'\n", \
rc, db ? (*proxy_sqlite3_errmsg)(db->get_db()) : NULL_DB_MSG); \
assert(0); \
} \
} while(0)
#define ASSERT_SQLITE3_OK(rc, db) \
do { \
if (rc!=SQLITE_OK) { \
proxy_error( \
"SQLite3 error. Shutting down rc=%d msg='%s'\n", \
rc, db ? (*proxy_sqlite3_errmsg)(db) : NULL_DB_MSG); \
assert(0); \
} \
} while(0)
@ -243,7 +210,7 @@ SQLite3_result* proxysql_get_message_stats(bool reset=false);
*/
void proxysql_init_debug_prometheus_metrics();
class SQLite3DB;
/**
* @brief Set or unset if Admin has debugdb_disk fully initialized
*/
@ -251,4 +218,4 @@ void proxysql_set_admin_debugdb_disk(SQLite3DB *_db);
void proxysql_set_admin_debug_output(unsigned int _do);
#endif
#endif // DEBUG

@ -647,6 +647,7 @@ enum PROXYSQL_MYSQL_ERR {
ER_PROXYSQL_AWS_HEALTH_CHECK_CONN_TIMEOUT = 9017,
ER_PROXYSQL_AWS_HEALTH_CHECK_TIMEOUT = 9018,
ER_PROXYSQL_SRV_NULL_REPLICATION_LAG = 9019,
ER_PROXYSQL_CONNECT_TIMEOUT = 9020,
};
enum proxysql_session_type {
@ -1079,6 +1080,21 @@ __thread char* pgsql_thread___firewall_whitelist_errormsg;
__thread bool pgsql_thread___firewall_whitelist_enabled;
__thread int pgsql_thread___query_processor_iterations;
__thread int pgsql_thread___query_processor_regex;
__thread bool pgsql_thread___monitor_enabled;
__thread int pgsql_thread___monitor_history;
__thread int pgsql_thread___monitor_connect_interval;
__thread int pgsql_thread___monitor_connect_timeout;
__thread int pgsql_thread___monitor_ping_interval;
__thread int pgsql_thread___monitor_ping_max_failures;
__thread int pgsql_thread___monitor_ping_timeout;
__thread int pgsql_thread___monitor_read_only_interval;
__thread int pgsql_thread___monitor_read_only_timeout;
__thread int pgsql_thread___monitor_read_only_max_timeout_count;
__thread int pgsql_thread___monitor_threads;
__thread char* pgsql_thread___monitor_username;
__thread char* pgsql_thread___monitor_password;
//---------------------------
__thread char *mysql_thread___default_schema;
@ -1351,6 +1367,21 @@ extern __thread char* pgsql_thread___firewall_whitelist_errormsg;
extern __thread bool pgsql_thread___firewall_whitelist_enabled;
extern __thread int pgsql_thread___query_processor_iterations;
extern __thread int pgsql_thread___query_processor_regex;
extern __thread bool pgsql_thread___monitor_enabled;
extern __thread int pgsql_thread___monitor_history;
extern __thread int pgsql_thread___monitor_connect_interval;
extern __thread int pgsql_thread___monitor_connect_timeout;
extern __thread int pgsql_thread___monitor_ping_interval;
extern __thread int pgsql_thread___monitor_ping_max_failures;
extern __thread int pgsql_thread___monitor_ping_timeout;
extern __thread int pgsql_thread___monitor_read_only_interval;
extern __thread int pgsql_thread___monitor_read_only_timeout;
extern __thread int pgsql_thread___monitor_read_only_max_timeout_count;
extern __thread int pgsql_thread___monitor_threads;
extern __thread char* pgsql_thread___monitor_username;
extern __thread char* pgsql_thread___monitor_password;
//---------------------------
extern __thread char *mysql_thread___default_schema;

@ -53,6 +53,7 @@ template void Base_HostGroups_Manager<PgSQL_HGC>::wrlock();
template void Base_HostGroups_Manager<PgSQL_HGC>::wrunlock();
template SQLite3_result * Base_HostGroups_Manager<MyHGC>::execute_query(char*, char**);
template SQLite3_result * Base_HostGroups_Manager<PgSQL_HGC>::execute_query(char*, char**);
#if 0
#define SAFE_SQLITE3_STEP(_stmt) do {\

@ -147,7 +147,7 @@ _OBJ_CXX := ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo
Base_Session.oo Base_Thread.oo \
proxy_protocol_info.oo \
proxysql_find_charset.oo ProxySQL_Poll.oo \
PgSQL_Protocol.oo PgSQL_Thread.oo PgSQL_Data_Stream.oo PgSQL_Session.oo PgSQL_Variables.oo PgSQL_HostGroups_Manager.oo PgSQL_Connection.oo PgSQL_Backend.oo PgSQL_Logger.oo PgSQL_Authentication.oo PgSQL_Error_Helper.oo
PgSQL_Protocol.oo PgSQL_Thread.oo PgSQL_Data_Stream.oo PgSQL_Session.oo PgSQL_Variables.oo PgSQL_HostGroups_Manager.oo PgSQL_Connection.oo PgSQL_Backend.oo PgSQL_Logger.oo PgSQL_Authentication.oo PgSQL_Error_Helper.oo PgSQL_Monitor.oo
OBJ_CXX := $(patsubst %,$(ODIR)/%,$(_OBJ_CXX))
HEADERS := ../include/*.h ../include/*.hpp

File diff suppressed because it is too large Load Diff

@ -45,6 +45,7 @@ static PgSQL_Session* sess_stopat;
extern PgSQL_Query_Processor* GloPgQPro;
extern PgSQL_Threads_Handler* GloPTH;
extern MySQL_Monitor* GloMyMon;
extern PgSQL_Monitor* GloPgMon;
extern PgSQL_Logger* GloPgSQL_Logger;
typedef struct mythr_st_vars {
@ -314,6 +315,7 @@ static char* pgsql_thread_variables_names[] = {
(char*)"monitor_ping_interval",
(char*)"monitor_ping_max_failures",
(char*)"monitor_ping_timeout",
/*
(char*)"monitor_aws_rds_topology_discovery_interval",
(char*)"monitor_read_only_interval",
(char*)"monitor_read_only_timeout",
@ -330,12 +332,17 @@ static char* pgsql_thread_variables_names[] = {
(char*)"monitor_galera_healthcheck_interval",
(char*)"monitor_galera_healthcheck_timeout",
(char*)"monitor_galera_healthcheck_max_timeout_count",
*/
(char*)"monitor_username",
(char*)"monitor_password",
/*
(char*)"monitor_replication_lag_use_percona_heartbeat",
(char*)"monitor_query_interval",
(char*)"monitor_query_timeout",
(char*)"monitor_slave_lag_when_null",
*/
(char*)"monitor_threads",
/*
(char*)"monitor_threads_min",
(char*)"monitor_threads_max",
(char*)"monitor_threads_queue_maxsize",
@ -344,6 +351,7 @@ static char* pgsql_thread_variables_names[] = {
(char*)"monitor_local_dns_resolver_queue_maxsize",
(char*)"monitor_wait_timeout",
(char*)"monitor_writer_is_also_reader",
*/
(char*)"max_allowed_packet",
(char*)"tcp_keepalive_time",
(char*)"use_tcp_keepalive",
@ -936,6 +944,7 @@ PgSQL_Threads_Handler::PgSQL_Threads_Handler() {
variables.monitor_replication_lag_interval = 10000;
variables.monitor_replication_lag_timeout = 1000;
variables.monitor_replication_lag_count = 1;
/* TODO: Remove
variables.monitor_groupreplication_healthcheck_interval = 5000;
variables.monitor_groupreplication_healthcheck_timeout = 800;
variables.monitor_groupreplication_healthcheck_max_timeout_count = 3;
@ -950,12 +959,16 @@ PgSQL_Threads_Handler::PgSQL_Threads_Handler() {
variables.monitor_threads_min = 8;
variables.monitor_threads_max = 128;
variables.monitor_threads_queue_maxsize = 128;
*/
variables.monitor_threads = 2;
variables.monitor_local_dns_cache_ttl = 300000;
variables.monitor_local_dns_cache_refresh_interval = 60000;
variables.monitor_local_dns_resolver_queue_maxsize = 128;
variables.monitor_username = strdup((char*)"monitor");
variables.monitor_password = strdup((char*)"monitor");
/* TODO: Remove
variables.monitor_replication_lag_use_percona_heartbeat = strdup((char*)"");
*/
variables.monitor_wait_timeout = true;
variables.monitor_writer_is_also_reader = true;
variables.max_allowed_packet = 64 * 1024 * 1024;
@ -1168,7 +1181,9 @@ char* PgSQL_Threads_Handler::get_variable_string(char* name) {
if (!strncmp(name, "monitor_", 8)) {
if (!strcmp(name, "monitor_username")) return strdup(variables.monitor_username);
if (!strcmp(name, "monitor_password")) return strdup(variables.monitor_password);
/*
if (!strcmp(name, "monitor_replication_lag_use_percona_heartbeat")) return strdup(variables.monitor_replication_lag_use_percona_heartbeat);
*/
}
if (!strncmp(name, "ssl_", 4)) {
if (!strcmp(name, "ssl_p2s_ca")) {
@ -1489,7 +1504,9 @@ char* PgSQL_Threads_Handler::get_variable(char* name) { // this is the public fu
if (!strncasecmp(name, "monitor_", 8)) {
if (!strcasecmp(name, "monitor_username")) return strdup(variables.monitor_username);
if (!strcasecmp(name, "monitor_password")) return strdup(variables.monitor_password);
/*
if (!strcasecmp(name, "monitor_replication_lag_use_percona_heartbeat")) return strdup(variables.monitor_replication_lag_use_percona_heartbeat);
*/
}
if (!strcasecmp(name, "threads")) {
sprintf(intbuf, "%d", (num_threads ? num_threads : DEFAULT_NUM_THREADS));
@ -2068,11 +2085,13 @@ char** PgSQL_Threads_Handler::get_variables_list() {
VariablesPointers_int["monitor_ping_timeout"] = make_tuple(&variables.monitor_ping_timeout, 100, 600 * 1000, false);
VariablesPointers_int["monitor_ping_max_failures"] = make_tuple(&variables.monitor_ping_max_failures, 1, 1000 * 1000, false);
/*
VariablesPointers_int["monitor_aws_rds_topology_discovery_interval"] = make_tuple(&variables.monitor_aws_rds_topology_discovery_interval, 1, 100000, false);
*/
VariablesPointers_int["monitor_read_only_interval"] = make_tuple(&variables.monitor_read_only_interval, 100, 7 * 24 * 3600 * 1000, false);
VariablesPointers_int["monitor_read_only_timeout"] = make_tuple(&variables.monitor_read_only_timeout, 100, 600 * 1000, false);
VariablesPointers_int["monitor_read_only_max_timeout_count"] = make_tuple(&variables.monitor_read_only_max_timeout_count, 1, 1000 * 1000, false);
/*
VariablesPointers_int["monitor_replication_lag_interval"] = make_tuple(&variables.monitor_replication_lag_interval, 100, 7 * 24 * 3600 * 1000, false);
VariablesPointers_int["monitor_replication_lag_timeout"] = make_tuple(&variables.monitor_replication_lag_timeout, 100, 600 * 1000, false);
VariablesPointers_int["monitor_replication_lag_count"] = make_tuple(&variables.monitor_replication_lag_count, 1, 10, false);
@ -2089,13 +2108,17 @@ char** PgSQL_Threads_Handler::get_variables_list() {
VariablesPointers_int["monitor_query_interval"] = make_tuple(&variables.monitor_query_interval, 100, 7 * 24 * 3600 * 1000, false);
VariablesPointers_int["monitor_query_timeout"] = make_tuple(&variables.monitor_query_timeout, 100, 600 * 1000, false);
*/
VariablesPointers_int["monitor_threads"] = make_tuple(&variables.monitor_threads, 2, 256, false);
/*
VariablesPointers_int["monitor_threads_min"] = make_tuple(&variables.monitor_threads_min, 2, 256, false);
VariablesPointers_int["monitor_threads_max"] = make_tuple(&variables.monitor_threads_max, 4, 1024, false);
VariablesPointers_int["monitor_slave_lag_when_null"] = make_tuple(&variables.monitor_slave_lag_when_null, 0, 604800, false);
VariablesPointers_int["monitor_threads_queue_maxsize"] = make_tuple(&variables.monitor_threads_queue_maxsize, 16, 1024, false);
VariablesPointers_int["monitor_threads_queue_maxsize"] = make_tuple(&variables.monitor_threads_queue_maxsize, 16, 1024, false);
*/
VariablesPointers_int["monitor_local_dns_cache_ttl"] = make_tuple(&variables.monitor_local_dns_cache_ttl, 0, 7 * 24 * 3600 * 1000, false);
VariablesPointers_int["monitor_local_dns_cache_refresh_interval"] = make_tuple(&variables.monitor_local_dns_cache_refresh_interval, 0, 7 * 24 * 3600 * 1000, false);
VariablesPointers_int["monitor_local_dns_resolver_queue_maxsize"] = make_tuple(&variables.monitor_local_dns_resolver_queue_maxsize, 16, 1024, false);
@ -3783,16 +3806,25 @@ void PgSQL_Thread::refresh_variables() {
mysql_thread___monitor_wait_timeout = (bool)GloPTH->get_variable_int((char*)"monitor_wait_timeout");
mysql_thread___monitor_writer_is_also_reader = (bool)GloPTH->get_variable_int((char*)"monitor_writer_is_also_reader");
mysql_thread___monitor_enabled = (bool)GloPTH->get_variable_int((char*)"monitor_enabled");
mysql_thread___monitor_history = GloPTH->get_variable_int((char*)"monitor_history");
mysql_thread___monitor_connect_interval = GloPTH->get_variable_int((char*)"monitor_connect_interval");
mysql_thread___monitor_connect_timeout = GloPTH->get_variable_int((char*)"monitor_connect_timeout");
mysql_thread___monitor_ping_interval = GloPTH->get_variable_int((char*)"monitor_ping_interval");
mysql_thread___monitor_ping_max_failures = GloPTH->get_variable_int((char*)"monitor_ping_max_failures");
mysql_thread___monitor_ping_timeout = GloPTH->get_variable_int((char*)"monitor_ping_timeout");
*/
pgsql_thread___monitor_enabled = (bool)GloPTH->get_variable_int((char*)"monitor_enabled");
pgsql_thread___monitor_history = GloPTH->get_variable_int((char*)"monitor_history");
pgsql_thread___monitor_connect_interval = GloPTH->get_variable_int((char*)"monitor_connect_interval");
pgsql_thread___monitor_connect_timeout = GloPTH->get_variable_int((char*)"monitor_connect_timeout");
pgsql_thread___monitor_ping_interval = GloPTH->get_variable_int((char*)"monitor_ping_interval");
pgsql_thread___monitor_ping_max_failures = GloPTH->get_variable_int((char*)"monitor_ping_max_failures");
pgsql_thread___monitor_ping_timeout = GloPTH->get_variable_int((char*)"monitor_ping_timeout");
pgsql_thread___monitor_read_only_interval = GloPTH->get_variable_int((char*)"monitor_read_only_interval");
pgsql_thread___monitor_read_only_timeout = GloPTH->get_variable_int((char*)"monitor_read_only_timeout");
pgsql_thread___monitor_threads = GloPTH->get_variable_int((char*)"monitor_threads");
if (pgsql_thread___monitor_username) free(pgsql_thread___monitor_username);
pgsql_thread___monitor_username = GloPTH->get_variable_string((char*)"monitor_username");
if (pgsql_thread___monitor_password) free(pgsql_thread___monitor_password);
pgsql_thread___monitor_password = GloPTH->get_variable_string((char*)"monitor_password");
/*
mysql_thread___monitor_aws_rds_topology_discovery_interval = GloPTH->get_variable_int((char *)"monitor_aws_rds_topology_discovery_interval");
mysql_thread___monitor_read_only_interval = GloPTH->get_variable_int((char*)"monitor_read_only_interval");
mysql_thread___monitor_read_only_timeout = GloPTH->get_variable_int((char*)"monitor_read_only_timeout");
mysql_thread___monitor_read_only_max_timeout_count = GloPTH->get_variable_int((char*)"monitor_read_only_max_timeout_count");
mysql_thread___monitor_replication_lag_group_by_host = (bool)GloPTH->get_variable_int((char*)"monitor_replication_lag_group_by_host");
mysql_thread___monitor_replication_lag_interval = GloPTH->get_variable_int((char*)"monitor_replication_lag_interval");
@ -4377,30 +4409,32 @@ SQLite3_result* PgSQL_Threads_Handler::SQL3_GlobalStatus(bool _memory) {
pta[1] = buf;
result->add_row(pta);
}
*/
{
pta[0] = (char*)"MySQL_Monitor_connect_check_OK";
sprintf(buf, "%llu", GloMyMon->connect_check_OK);
pta[0] = (char*)"PgSQL_Monitor_connect_check_OK";
sprintf(buf, "%lu", GloPgMon->connect_check_OK);
pta[1] = buf;
result->add_row(pta);
}
{
pta[0] = (char*)"MySQL_Monitor_connect_check_ERR";
sprintf(buf, "%llu", GloMyMon->connect_check_ERR);
pta[0] = (char*)"PgSQL_Monitor_connect_check_ERR";
sprintf(buf, "%lu", GloPgMon->connect_check_ERR);
pta[1] = buf;
result->add_row(pta);
}
{
pta[0] = (char*)"MySQL_Monitor_ping_check_OK";
sprintf(buf, "%llu", GloMyMon->ping_check_OK);
pta[0] = (char*)"PgSQL_Monitor_ping_check_OK";
sprintf(buf, "%lu", GloPgMon->ping_check_OK);
pta[1] = buf;
result->add_row(pta);
}
{
pta[0] = (char*)"MySQL_Monitor_ping_check_ERR";
sprintf(buf, "%llu", GloMyMon->ping_check_ERR);
pta[0] = (char*)"PgSQL_Monitor_ping_check_ERR";
sprintf(buf, "%lu", GloPgMon->ping_check_ERR);
pta[1] = buf;
result->add_row(pta);
}
/*
{
pta[0] = (char*)"MySQL_Monitor_read_only_check_OK";
sprintf(buf, "%llu", GloMyMon->read_only_check_OK);

@ -175,4 +175,4 @@ int ProxySQL_Poll<T>::find_index(int fd) {
}
template class ProxySQL_Poll<PgSQL_Data_Stream>;
template class ProxySQL_Poll<MySQL_Data_Stream>;
template class ProxySQL_Poll<MySQL_Data_Stream>;

@ -15,37 +15,16 @@ using std::string;
using std::unordered_map;
#ifdef DEBUG
#ifdef DEBUG_EXTERN
#undef DEBUG_EXTERN
#endif /* DEBUG_EXTERN */
#endif /* DEBUG */
#ifndef CLOCK_MONOTONIC
#define CLOCK_MONOTONIC SYSTEM_CLOCK
#endif // CLOCK_MONOTONIC
#ifdef DEBUG
__thread unsigned long long pretime=0;
static pthread_mutex_t debug_mutex;
static pthread_rwlock_t filters_rwlock;
static SQLite3DB * debugdb_disk = NULL;
sqlite3_stmt *statement1=NULL;
static unsigned int debug_output = 1;
#endif /* DEBUG */
/*
static inline unsigned long long debug_monotonic_time() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000);
}
*/
#define DEBUG_MSG_MAXSIZE 1024
#ifdef DEBUG
/**
* @brief Contains all filters related to debug.
* @details The convention for key value is `filename:line:function`. This key structure also applies also
@ -58,7 +37,6 @@ static inline unsigned long long debug_monotonic_time() {
std::set<std::string>* debug_filters = nullptr;
static bool filter_debug_entry(const char *__file, int __line, const char *__func) {
//pthread_mutex_lock(&debug_mutex);
pthread_rwlock_rdlock(&filters_rwlock);
bool to_filter = false;
if (debug_filters && debug_filters->size()) { // if the set is empty we aren't performing any filter, so we won't search
@ -99,7 +77,6 @@ static bool filter_debug_entry(const char *__file, int __line, const char *__fun
}
}
}
//pthread_mutex_unlock(&debug_mutex);
pthread_rwlock_unlock(&filters_rwlock);
return to_filter;
}
@ -107,19 +84,16 @@ static bool filter_debug_entry(const char *__file, int __line, const char *__fun
// we use this function to sent the filters to Admin
// we hold here the lock on filters_rwlock
void proxy_debug_get_filters(std::set<std::string>& f) {
//pthread_mutex_lock(&debug_mutex);
pthread_rwlock_rdlock(&filters_rwlock);
if (debug_filters) {
f = *debug_filters;
}
pthread_rwlock_unlock(&filters_rwlock);
//pthread_mutex_unlock(&debug_mutex);
}
// we use this function to get the filters from Admin
// we hold here the lock on filters_rwlock
void proxy_debug_load_filters(std::set<std::string>& f) {
//pthread_mutex_lock(&debug_mutex);
pthread_rwlock_wrlock(&filters_rwlock);
if (debug_filters) {
debug_filters->erase(debug_filters->begin(), debug_filters->end());
@ -128,11 +102,19 @@ void proxy_debug_load_filters(std::set<std::string>& f) {
debug_filters = new std::set<std::string>(f);
}
pthread_rwlock_unlock(&filters_rwlock);
//pthread_mutex_unlock(&debug_mutex);
}
// REMINDER: This function should always save/restore 'errno', otherwise it could influence error handling.
void proxy_debug_func(enum debug_module module, int verbosity, int thr, const char *__file, int __line, const char *__func, const char *fmt, ...) {
void proxy_debug_func(
enum debug_module module,
int verbosity,
int thr,
const char *__file,
int __line,
const char *__func,
const char *fmt,
...
) {
int saved_errno = errno;
assert(module<PROXY_DEBUG_UNKNOWN);
if (pretime == 0) { // never initialized
@ -147,17 +129,22 @@ void proxy_debug_func(enum debug_module module, int verbosity, int thr, const ch
errno = saved_errno;
return;
}
char origdebugbuff[DEBUG_MSG_MAXSIZE];
char debugbuff[DEBUG_MSG_MAXSIZE];
char longdebugbuff[DEBUG_MSG_MAXSIZE*8];
char longdebugbuff2[DEBUG_MSG_MAXSIZE*8];
longdebugbuff[0]=0;
longdebugbuff2[0]=0;
unsigned long long curtime=realtime_time();
bool write_to_disk = false;
if (debugdb_disk != NULL && (debug_output == 2 || debug_output == 3)) {
write_to_disk = true;
}
if (
GloVars.global.foreground
||
@ -167,7 +154,6 @@ void proxy_debug_func(enum debug_module module, int verbosity, int thr, const ch
va_start(ap, fmt);
vsnprintf(origdebugbuff, DEBUG_MSG_MAXSIZE,fmt,ap);
va_end(ap);
//fprintf(stderr, "%d:%s:%d:%s(): MOD#%d LVL#%d : %s" , thr, __file, __line, __func, module, verbosity, debugbuff);
sprintf(longdebugbuff, "%llu(%llu): %d:%s:%d:%s(): MOD#%d#%s LVL#%d : %s" , curtime, curtime-pretime, thr, __file, __line, __func, module, GloVars.global.gdbg_lvl[module].name, verbosity, origdebugbuff);
}
#ifdef __GLIBC__
@ -176,14 +162,12 @@ void proxy_debug_func(enum debug_module module, int verbosity, int thr, const ch
char **strings;
int s;
s = backtrace(arr, 20);
//backtrace_symbols_fd(arr, s, STDERR_FILENO);
strings=backtrace_symbols(arr,s);
if (strings == NULL) {
perror("backtrace_symbols");
exit(EXIT_FAILURE);
}
for (int i=0; i<s; i++) {
//printf("%s\n", strings[i]);
debugbuff[0]=0;
sscanf(strings[i], "%*[^(](%100[^+]", debugbuff);
int status;
@ -194,11 +178,7 @@ void proxy_debug_func(enum debug_module module, int verbosity, int thr, const ch
strcat(longdebugbuff2,debugbuff);
}
}
//printf("\n");
//strcat(longdebugbuff2,"\n");
free(strings);
// } else {
// fprintf(stderr, "%s", longdebugbuff);
}
#endif
pthread_mutex_lock(&debug_mutex);
@ -518,4 +498,5 @@ void proxysql_set_admin_debugdb_disk(SQLite3DB * _db) {
void proxysql_set_admin_debug_output(unsigned int _do) {
debug_output = _do;
}
#endif /* DEBUG */

@ -70,8 +70,6 @@ void SetParser::set_query(const std::string& nq) {
#define SESSION_P1 "(?:|SESSION +|@@|@@session.|@@local.)"
#define VAR_P1 "`?(@\\w+|\\w+)`?"
//#define VAR_VALUE "((?:[\\w/\\d:\\+\\-]|,)+)"
//#define VAR_VALUE "((?:CONCAT\\((?:(REPLACE|CONCAT)\\()+@@sql_mode,(?:(?:'|\\w|,| |\"|\\))+(?:\\)))|(?:[@\\w/\\d:\\+\\-]|,)+|(?:)))"
// added (?:[\\w]+=(?:on|off)|,)+ for optimizer_switch
#define VAR_VALUE_P1_1 "(?:\\()*(?:SELECT)*(?: )*(?:CONCAT\\()*(?:(?:(?: )*REPLACE|IFNULL|CONCAT)\\()+(?: )*(?:NULL|@OLD_SQL_MODE|@@SQL_MODE),(?:(?:'|\\w|,| |\"|\\))+(?:\\))*)(?:\\))"
@ -150,17 +148,6 @@ VALGRIND_ENABLE_ERROR_REPORTING;
return result;
}
/*
#define VAR_VALUE_P1_1 "(?:\\()*(?:SELECT)*(?: )*(?:CONCAT\\()*(?:(?:(?: )*REPLACE|IFNULL|CONCAT)\\()+(?: )*(?:NULL|@OLD_SQL_MODE|@@SQL_MODE),(?:(?:'|\\w|,| |\"|\\))+(?:\\))*)(?:\\))"
#define VAR_VALUE_P1_2 "|(?:NULL)"
#define VAR_VALUE_P1_3 "|(?:[\\w]+=(?:on|off)|,)+"
#define VAR_VALUE_P1_4 "|(?:[@\\w/\\d:\\+\\-]|,)+"
#define VAR_VALUE_P1_5 "|(?:(?:'{1}|\"{1})(?:)(?:'{1}|\"{1}))"
#define VAR_VALUE_P1_6 "|(?: )+"
#define VAR_VALUE_P1 "(" VAR_VALUE_P1_1 VAR_VALUE_P1_2 VAR_VALUE_P1_3 VAR_VALUE_P1_4 VAR_VALUE_P1_5 VAR_VALUE_P1_6 ")"
*/
void SetParser::generateRE_parse1v2() {
vector<string> quote_symbol = {"\"", "'", "`"};
vector<string> var_patterns = {};
@ -198,19 +185,9 @@ void SetParser::generateRE_parse1v2() {
string vp = "NULL"; // NULL
var_patterns.push_back(vp);
//vp = "\\w+"; // single word
//var_patterns.push_back(vp);
{
string vp0 = "(?:\\w|\\d)+"; // single word with letters and digits , for example utf8mb4 and latin1
//var_patterns.push_back(vp);
/*
string vp1 = "(?:" + vp0 + "(?:," + vp0 + ")*)"; // multiple words (letters and digits) separated by commas WITHOUT any spaces between words . Used also for sql_mode , example: ONLY_FULL_GROUP_BY,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO
//var_patterns.push_back(vp1); // do NOT add without quote
for (auto it = quote_symbol.begin(); it != quote_symbol.end(); it++) {
string s = *it + vp1 + *it;
var_patterns.push_back(s); // add with quote
}
*/
string vp2 = "(?:" + vp0 + "(?:-" + vp0 + ")*)"; // multiple words (letters and digits) separated by dash, WITHOUT any spaces between words . Used also for transaction isolation
var_patterns.push_back(vp2);
for (auto it = quote_symbol.begin(); it != quote_symbol.end(); it++) {
@ -218,12 +195,6 @@ void SetParser::generateRE_parse1v2() {
var_patterns.push_back(s); // add with quote
}
}
//vp = "(?:\\w|\\d)+(?:-|\\w|\\d+)*"; // multiple words (letters and digits) separated by dash, WITHOUT any spaces between words . Used ialso for transaction isolation
//var_patterns.push_back(vp);
// for (auto it = quote_symbol.begin(); it != quote_symbol.end(); it++) {
// string s = *it + vp + *it;
// var_patterns.push_back(s); // add with quote
// }
vp = "\\w+(?:,\\w+)+"; // multiple words separated by commas, WITHOUT any spaces between words
// NOTE: we do not use multiple words without quotes
@ -268,7 +239,6 @@ void SetParser::generateRE_parse1v2() {
vp = "(?:| *(?:\\+|\\-) *)\\d+(?:|\\.\\d+)"; // a signed or unsigned integer or decimal , N7 = merge of N3 and N6
var_patterns.push_back(vp);
{
// time_zone in numeric format:
// - +/- sign
@ -297,8 +267,6 @@ void SetParser::generateRE_parse1v2() {
var_patterns.push_back(s); // add with quote
}
string var_value = "(";
for (auto it = var_patterns.begin(); it != var_patterns.end(); it++) {
string s = "(?:" + *it + ")";
@ -317,9 +285,6 @@ void SetParser::generateRE_parse1v2() {
parse1v2_opt2->set_case_sensitive(false);
parse1v2_opt2->set_longest_match(false);
string var_1_0 = "(?:@\\w+|\\w+)"; // @name|name
string var_1 = "(" + var_1_0 + "|`" + var_1_0 + "`)"; // var_1_0|`var_1_0`
var_1 = SESSION_P1 + var_1;
@ -328,9 +293,6 @@ void SetParser::generateRE_parse1v2() {
string name_value = "(";
for (auto it = quote_symbol.begin(); it != quote_symbol.end(); it++) {
string s = "(?:" + *it + charset_name + *it + ")";
//auto it2 = it;
//it2++;
//if (it2 != quote_symbol.end())
s += "|";
name_value += s;
}
@ -344,22 +306,7 @@ void SetParser::generateRE_parse1v2() {
}
#endif
#ifdef PARSERDEBUG
// delete opt2;
// return result;
#endif
/*
#define QUOTES "(?:'|\"|`)?"
#define SPACES " *"
#define NAMES "(NAMES)"
#define NAME_VALUE "((?:\\w|\\d)+)"
*/
//const std::string pattern="(?:" NAMES SPACES QUOTES NAME_VALUE QUOTES "(?: +COLLATE +" QUOTES NAME_VALUE QUOTES "|)" "|" SESSION_P1 VAR_P1 SPACES "(?:|:)=" SPACES QUOTES VAR_VALUE_P1 QUOTES ") *,? *";
const std::string pattern="(?:" NAMES SPACES + name_value + "(?: +COLLATE +" + name_value + "|)" "|" + var_1 + SPACES "(?:|:)=" SPACES + var_value + ") *,? *";
//const std::string pattern=var_1 + SPACES "(?:|:)=" SPACES + var_value;
#ifdef DEBUG
VALGRIND_DISABLE_ERROR_REPORTING;
#endif // DEBUG
@ -368,7 +315,6 @@ VALGRIND_DISABLE_ERROR_REPORTING;
cout << pattern << endl;
}
#endif
//re2::RE2 re(pattern, *opt2);
parse1v2_pattern = pattern;
parse1v2_re = new re2::RE2(parse1v2_pattern, *parse1v2_opt2);
parse1v2_init = true;
@ -460,16 +406,8 @@ std::map<std::string,std::vector<std::string>> SetParser::parse2() {
std::map<std::string,std::vector<std::string>> result;
// regex used:
// SET(?: +)(|SESSION +)TRANSACTION(?: +)(?:(?:(ISOLATION(?: +)LEVEL)(?: +)(REPEATABLE(?: +)READ|READ(?: +)COMMITTED|READ(?: +)UNCOMMITTED|SERIALIZABLE))|(?:(READ)(?: +)(WRITE|ONLY)))
/*
#define SESSION_P2 "(|SESSION)"
#define VAR_P2 "(ISOLATION LEVEL|READ)"
//#define VAR_VALUE "((?:[\\w/\\d:\\+\\-]|,)+)"
//#define VAR_VALUE "((?:CONCAT\\((?:(REPLACE|CONCAT)\\()+@@sql_mode,(?:(?:'|\\w|,| |\"|\\))+(?:\\)))|(?:[@\\w/\\d:\\+\\-]|,)+|(?:)))"
#define VAR_VALUE_P2 "(((?:CONCAT\\()*(?:((?: )*REPLACE|IFNULL|CONCAT)\\()+(?: )*(?:NULL|@OLD_SQL_MODE|@@sql_mode),(?:(?:'|\\w|,| |\"|\\))+(?:\\))*)|(?:[@\\w/\\d:\\+\\-]|,)+|(?:)))"
*/
//const std::string pattern="(?:" NAMES SPACES QUOTES NAME_VALUE QUOTES "(?: +COLLATE +" QUOTES NAME_VALUE QUOTES "|)" "|" SESSION_P1 VAR_P1 SPACES "(?:|:)=" SPACES QUOTES VAR_VALUE_P1 QUOTES ") *,? *";
// Regex used:
// SET(?: +)(|SESSION +)TRANSACTION(?: +)(?:(?:(ISOLATION(?: +)LEVEL)(?: +)(REPEATABLE(?: +)READ|READ(?: +)COMMITTED|READ(?: +)UNCOMMITTED|SERIALIZABLE))|(?:(READ)(?: +)(WRITE|ONLY)))
const std::string pattern="(|SESSION) *TRANSACTION(?: +)(?:(?:(ISOLATION(?: +)LEVEL)(?: +)(REPEATABLE(?: +)READ|READ(?: +)COMMITTED|READ(?: +)UNCOMMITTED|SERIALIZABLE))|(?:(READ)(?: +)(WRITE|ONLY)))";
re2::RE2 re(pattern, *opt2);
std::string var;

@ -34,6 +34,7 @@ using json = nlohmann::json;
#include "proxysql_restapi.h"
#include "Web_Interface.hpp"
#include "proxysql_utils.h"
#include "PgSQL_Monitor.hpp"
#include "libdaemon/dfork.h"
#include "libdaemon/dsignal.h"
@ -82,6 +83,7 @@ void * __mysql_ldap_auth;
volatile create_Web_Interface_t * create_Web_Interface = NULL;
void * __web_interface;
std::thread* pgsql_monitor_thread = nullptr;
extern int ProxySQL_create_or_load_TLS(bool bootstrap, std::string& msg);
@ -458,6 +460,7 @@ Web_Interface *GloWebInterface;
MySQL_STMT_Manager_v14 *GloMyStmt;
MySQL_Monitor *GloMyMon;
PgSQL_Monitor *GloPgMon;
std::thread *MyMon_thread = NULL;
MySQL_Logger *GloMyLogger;
@ -1007,7 +1010,9 @@ void ProxySQL_Main_join_all_threads() {
if (GloMyMon) {
GloMyMon->shutdown=true;
}
if (GloPgMon) {
GloPgMon->shutdown=true;
}
// join GloMyMon thread
if (GloMyMon && MyMon_thread) {
cpu_timer t;
@ -1018,7 +1023,15 @@ void ProxySQL_Main_join_all_threads() {
std::cerr << "GloMyMon joined in ";
#endif
}
if (GloPgMon && pgsql_monitor_thread) {
cpu_timer t;
pgsql_monitor_thread->join();
delete pgsql_monitor_thread;
pgsql_monitor_thread = NULL;
#ifdef DEBUG
std::cerr << "GloPgMon joined in ";
#endif
}
// join GloQC thread
if (GloQC) {
cpu_timer t;
@ -1041,7 +1054,14 @@ void ProxySQL_Main_shutdown_all_modules() {
std::cerr << "GloMyMon shutdown in ";
#endif
}
if (GloPgMon) {
cpu_timer t;
delete GloPgMon;
GloPgMon=NULL;
#ifdef DEBUG
std::cerr << "GloPgMon shutdown in ";
#endif
}
if (GloQC) {
cpu_timer t;
delete GloQC;
@ -1302,7 +1322,6 @@ void ProxySQL_Main_init_phase2___not_started(const bootstrap_info_t& boostrap_in
}
}
void ProxySQL_Main_init_phase3___start_all() {
{
@ -1323,6 +1342,7 @@ void ProxySQL_Main_init_phase3___start_all() {
}
// Initialized monitor, no matter if it will be started or not
GloMyMon = new MySQL_Monitor();
GloPgMon = new PgSQL_Monitor();
// load all mysql servers to GloHGH
{
cpu_timer t;
@ -1427,6 +1447,8 @@ void ProxySQL_Main_init_phase3___start_all() {
// Load the config not previously loaded for these modules
GloAdmin->load_http_server();
GloAdmin->load_restapi_server();
pgsql_monitor_thread = new std::thread(&PgSQL_monitor_scheduler_thread);
}

Loading…
Cancel
Save