#include "PgSQL_HostGroups_Manager.h"
|
|
#include "PgSQL_Monitor.hpp"
|
|
#include "PgSQL_Thread.h"
|
|
|
|
#include "gen_utils.h"
|
|
|
|
#include <pthread.h>
|
|
#include <poll.h>
|
|
|
|
#include <cassert>
|
|
#include <cstdlib>
|
|
#include <functional>
|
|
#include <memory>
|
|
#include <queue>
|
|
#include <stdint.h>
|
|
#include <utility>
|
|
#include <vector>
|
|
#include <list>
|
|
|
|
using std::function;
|
|
using std::unique_ptr;
|
|
using std::vector;
|
|
using std::list;
|
|
|
|
extern PgSQL_Monitor* GloPgMon;
|
|
extern PgSQL_Threads_Handler* GloPTH;
|
|
|
|
/**
 * @brief Used for performing the PING operation.
 * @details Direct use of 'libpq' ping facilities isn't possible (they create new conns).
 */
const char PING_QUERY[] { "" };
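// NOTE: An empty query is the cheapest round-trip: the server answers with an
// 'EmptyQueryResponse', surfaced by libpq as 'PGRES_EMPTY_QUERY' (handled in
// 'handle_async_check_cont'), so no resultset needs to be processed.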

/**
 * @brief Used to detect if server is a replica in 'hot_standby'.
 * @details If the server is not in this mode, it's assumed to be a primary.
 */
const char READ_ONLY_QUERY[] { "SELECT pg_is_in_recovery()" };

template <typename T>
void append(std::vector<T>& dest, std::vector<T>&& src) {
	dest.insert(dest.end(),
		std::make_move_iterator(src.begin()),
		std::make_move_iterator(src.end())
	);
}

/**
 * @brief Only responsive servers are eligible for monitoring actions.
 * @details A server is deemed non-responsive after 'ping_max_failures' consecutive ping errors.
 */
const char RESP_SERVERS_QUERY_T[] {
	"SELECT 1 FROM ("
		"SELECT hostname,port,ping_error FROM pgsql_server_ping_log"
		" WHERE hostname='%s' AND port=%d"
		" ORDER BY time_start_us DESC LIMIT %d"
	") a WHERE"
		" ping_error IS NOT NULL"
		" AND ping_error NOT LIKE '%%password authentication failed for user%%'"
	" GROUP BY hostname,port HAVING COUNT(*)=%d"
};
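// Example of the formatted query (hypothetical values addr='10.0.0.1',
// port=5432, max_fails=3):
//
//   SELECT 1 FROM (
//       SELECT hostname,port,ping_error FROM pgsql_server_ping_log
//        WHERE hostname='10.0.0.1' AND port=5432
//        ORDER BY time_start_us DESC LIMIT 3
//   ) a WHERE ping_error IS NOT NULL
//       AND ping_error NOT LIKE '%password authentication failed for user%'
//   GROUP BY hostname,port HAVING COUNT(*)=3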

/**
 * @brief Checks if a server is responsive (suitable for other monitoring ops).
 * @param db The monitor DB against which to perform the query.
 * @param addr The server address.
 * @param port The server port.
 * @param max_fails Maximum number of failures to consider the server non-suitable.
 * @return True if the server is suitable, false otherwise.
 */
bool server_responds_to_ping(SQLite3DB& db, const char* addr, int port, int max_fails) {
	cfmt_t q_fmt { cstr_format(RESP_SERVERS_QUERY_T, addr, port, max_fails, max_fails) };

	char* err { nullptr };
	unique_ptr<SQLite3_result> result { db.execute_statement(q_fmt.str.c_str(), &err) };

	if (err || result == nullptr) {
		proxy_error(
			"Internal error querying 'pgsql_server_ping_log'. Aborting query=%s error='%s'\n",
			q_fmt.str.c_str(), err
		);
		free(err);
		assert(0);
		return false; // unreachable while asserts are enabled
	} else {
		// No matching rows means the failure threshold hasn't been reached
		return !result->rows_count;
	}
}

/**
 * @brief Helper function for building the tables for the monitoring DB.
 * @param db The monitor DB in which to create the tables.
 * @param tables_defs The definitions of the tables to be created.
 */
void check_and_build_standard_tables(SQLite3DB& db, const vector<table_def_t>& tables_defs) {
	db.execute("PRAGMA foreign_keys = OFF");

	for (const auto& def : tables_defs) {
		db.check_and_build_table(def.table_name, def.table_def);
	}

	db.execute("PRAGMA foreign_keys = ON");
}

PgSQL_Monitor::PgSQL_Monitor() {
	int rc = monitordb.open(
		const_cast<char*>("file:mem_monitordb?mode=memory&cache=shared"),
		SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX
	);
	assert(rc == 0 && "Failed to open 'monitordb' for PgSQL Monitor");

	rc = monitor_internal_db.open(
		const_cast<char*>("file:mem_monitor_internal_db?mode=memory&cache=shared"),
		SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX
	);
	assert(rc == 0 && "Failed to open 'internal_monitordb' for PgSQL Monitor");

	rc = monitordb.execute(
		"ATTACH DATABASE 'file:mem_monitor_internal_db?mode=memory&cache=shared' AS 'monitor_internal'"
	);
	assert(rc == 1 && "Failed to attach 'monitor_internal' for PgSQL Monitor");

	check_and_build_standard_tables(this->monitordb, this->tables_defs_monitor);
	check_and_build_standard_tables(this->monitor_internal_db, this->tables_defs_monitor_internal);

	// Explicit index creation
	monitordb.execute("CREATE INDEX IF NOT EXISTS idx_connect_log_time_start ON pgsql_server_connect_log (time_start_us)");
	monitordb.execute("CREATE INDEX IF NOT EXISTS idx_ping_log_time_start ON pgsql_server_ping_log (time_start_us)");
	monitordb.execute("CREATE INDEX IF NOT EXISTS idx_ping_2 ON pgsql_server_ping_log (hostname, port, time_start_us)");
}

/**
 * @brief Initializes the structures related with a PgSQL_Thread.
 * @details It doesn't initialize a real thread, just the structures associated with it.
 * @return The created and initialized 'PgSQL_Thread'.
 */
unique_ptr<PgSQL_Thread> init_pgsql_thread_struct() {
	unique_ptr<PgSQL_Thread> pgsql_thr { new PgSQL_Thread() };
	pgsql_thr->curtime = monotonic_time();
	pgsql_thr->refresh_variables();

	return pgsql_thr;
}

// Helper function for binding text
void sqlite_bind_text(sqlite3_stmt* stmt, int index, const char* text) {
	int rc = (*proxy_sqlite3_bind_text)(stmt, index, text, -1, SQLITE_TRANSIENT);
	ASSERT_SQLITE3_OK(rc, (*proxy_sqlite3_db_handle)(stmt));
}

// Helper function for binding integers
void sqlite_bind_int(sqlite3_stmt* stmt, int index, int value) {
	int rc = (*proxy_sqlite3_bind_int)(stmt, index, value);
	ASSERT_SQLITE3_OK(rc, (*proxy_sqlite3_db_handle)(stmt));
}

// Helper function for binding 64-bit integers
void sqlite_bind_int64(sqlite3_stmt* stmt, int index, long long value) {
	int rc = (*proxy_sqlite3_bind_int64)(stmt, index, value);
	ASSERT_SQLITE3_OK(rc, (*proxy_sqlite3_db_handle)(stmt));
}

// Helper function for binding NULL
void sqlite_bind_null(sqlite3_stmt* stmt, int index) {
	int rc = (*proxy_sqlite3_bind_null)(stmt, index);
	ASSERT_SQLITE3_OK(rc, (*proxy_sqlite3_db_handle)(stmt));
}

// Helper function for executing a statement; retries while the DB is busy/locked
int sqlite_execute_statement(sqlite3_stmt* stmt) {
	int rc = 0;

	do {
		rc = (*proxy_sqlite3_step)(stmt);
		if (rc == SQLITE_LOCKED || rc == SQLITE_BUSY) {
			usleep(100);
		}
	} while (rc == SQLITE_LOCKED || rc == SQLITE_BUSY);

	return rc;
}

// Helper function for clearing bindings
void sqlite_clear_bindings(sqlite3_stmt* stmt) {
	int rc = (*proxy_sqlite3_clear_bindings)(stmt);
	ASSERT_SQLITE3_OK(rc, (*proxy_sqlite3_db_handle)(stmt));
}

// Helper function for resetting a statement
void sqlite_reset_statement(sqlite3_stmt* stmt) {
	int rc = (*proxy_sqlite3_reset)(stmt);
	ASSERT_SQLITE3_OK(rc, (*proxy_sqlite3_db_handle)(stmt));
}

// Helper function for finalizing a statement
void sqlite_finalize_statement(sqlite3_stmt* stmt) {
	(*proxy_sqlite3_finalize)(stmt);
}

// Fetches the resultset of an executed statement, then clears and resets it
unique_ptr<SQLite3_result> sqlite_fetch_and_clear(sqlite3_stmt* stmt) {
	unique_ptr<SQLite3_result> result { new SQLite3_result(stmt) };

	sqlite_clear_bindings(stmt);
	sqlite_reset_statement(stmt);

	return result;
}
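// Typical usage of the helpers above (a sketch; 'db', 'addr' and 'port' are
// illustrative):
//
//   sqlite3_stmt* stmt = nullptr;
//   int rc = db->prepare_v2("SELECT 1 FROM t WHERE hostname=? AND port=?", &stmt);
//   ASSERT_SQLITE_OK(rc, db);
//   sqlite_bind_text(stmt, 1, addr);
//   sqlite_bind_int(stmt, 2, port);
//   unique_ptr<SQLite3_result> rs { sqlite_fetch_and_clear(stmt) };
//   sqlite_finalize_statement(stmt);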

void update_monitor_pgsql_servers(SQLite3_result* rs, SQLite3DB* db) {
	std::lock_guard<std::mutex> monitor_db_guard { GloPgMon->pgsql_srvs_mutex };

	if (rs != nullptr) {
		db->execute("DELETE FROM monitor_internal.pgsql_servers");

		sqlite3_stmt* stmt1 = nullptr;
		int rc = db->prepare_v2(
			"INSERT INTO monitor_internal.pgsql_servers VALUES (?, ?, ?, ?)", &stmt1
		);
		ASSERT_SQLITE_OK(rc, db);

		sqlite3_stmt* stmt32 = nullptr;
		rc = db->prepare_v2(
			("INSERT INTO monitor_internal.pgsql_servers VALUES " +
				generate_multi_rows_query(32, 4)).c_str(),
			&stmt32
		);
		ASSERT_SQLITE_OK(rc, db);

		// Iterate through rows
		int row_idx = 0;
		int max_bulk_row_idx = (rs->rows_count / 32) * 32;
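		// Rows are inserted in batches of 32 through 'stmt32'; the remainder
		// ('rows_count % 32') falls through to single-row inserts via 'stmt1'.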
		for (const auto& r1 : rs->rows) {
			int idx = row_idx % 32;

			if (row_idx < max_bulk_row_idx) { // Bulk insert
				sqlite_bind_text(stmt32, (idx * 4) + 1, r1->fields[0]);
				sqlite_bind_int64(stmt32, (idx * 4) + 2, std::atoll(r1->fields[1]));
				sqlite_bind_int64(stmt32, (idx * 4) + 3, std::atoll(r1->fields[2]));
				sqlite_bind_int64(stmt32, (idx * 4) + 4, std::atoll(r1->fields[3]));

				if (idx == 31) {
					sqlite_execute_statement(stmt32);
					sqlite_clear_bindings(stmt32);
					sqlite_reset_statement(stmt32);
				}
			} else { // Single row insert
				sqlite_bind_text(stmt1, 1, r1->fields[0]);
				sqlite_bind_int64(stmt1, 2, std::atoll(r1->fields[1]));
				sqlite_bind_int64(stmt1, 3, std::atoll(r1->fields[2]));
				sqlite_bind_int64(stmt1, 4, std::atoll(r1->fields[3]));

				sqlite_execute_statement(stmt1);
				sqlite_clear_bindings(stmt1);
				sqlite_reset_statement(stmt1);
			}

			row_idx++;
		}

		// Finalize statements
		sqlite_finalize_statement(stmt1);
		sqlite_finalize_statement(stmt32);
	}
}

enum class task_type_t { ping, connect, readonly };

struct mon_srv_t {
	string addr;
	uint16_t port;
	bool ssl;
	struct ssl_opts_t {
		string ssl_p2s_key;
		string ssl_p2s_cert;
		string ssl_p2s_ca;
		string ssl_p2s_crl;
		string ssl_p2s_crlpath;
	} ssl_opt;
};

struct mon_user_t {
	string user;
	string pass;
	string dbname;
};

struct ping_params_t {
	int32_t interval;
	double interval_window;
	int32_t timeout;
	int32_t max_failures;
};

struct readonly_res_t {
	int32_t val;
};

struct ping_conf_t {
	unique_ptr<SQLite3_result> srvs_info;
	ping_params_t params;
};

struct connect_params_t {
	int32_t interval;
	double interval_window;
	int32_t timeout;
	int32_t ping_max_failures;
	int32_t ping_interval;
};

struct connect_conf_t {
	unique_ptr<SQLite3_result> srvs_info;
	connect_params_t params;
};

struct readonly_params_t {
	int32_t interval;
	double interval_window;
	int32_t timeout;
	int32_t max_timeout_count;
	int32_t ping_max_failures;
	int32_t ping_interval;
	bool writer_is_also_reader;
};

struct readonly_conf_t {
	unique_ptr<SQLite3_result> srvs_info;
	readonly_params_t params;
};

struct tasks_conf_t {
	ping_conf_t ping;
	connect_conf_t connect;
	readonly_conf_t readonly;
	mon_user_t user_info;
};

unique_ptr<SQLite3_result> fetch_mon_srvs_conf(PgSQL_Monitor* mon, const char query[]) {
	char* err = nullptr;
	unique_ptr<SQLite3_result> srvs { mon->monitordb.execute_statement(query, &err) };

	if (err) {
		proxy_error("SQLite3 error. Shutting down msg=%s\n", err);
		free(err);
		assert(0);
	}

	return srvs;
}

unique_ptr<SQLite3_result> fetch_hgm_srvs_conf(PgSQL_HostGroups_Manager* hgm, const char query[]) {
	char* err = nullptr;
	unique_ptr<SQLite3_result> srvs { hgm->execute_query(const_cast<char*>(query), &err) };

	if (err) {
		proxy_error("SQLite3 error. Shutting down msg=%s\n", err);
		free(err);
		assert(0);
	}

	return srvs;
}

vector<mon_srv_t> ext_srvs(const unique_ptr<SQLite3_result>& srvs_info) {
	vector<mon_srv_t> srvs {};
	srvs.reserve(srvs_info->rows.size());

	for (const auto& row : srvs_info->rows) {
		srvs.push_back({
			string { row->fields[0] },
			static_cast<uint16_t>(std::atoi(row->fields[1])),
			static_cast<bool>(std::atoi(row->fields[2])),
			mon_srv_t::ssl_opts_t {
				string { pgsql_thread___ssl_p2s_key ? pgsql_thread___ssl_p2s_key : "" },
				string { pgsql_thread___ssl_p2s_cert ? pgsql_thread___ssl_p2s_cert : "" },
				string { pgsql_thread___ssl_p2s_ca ? pgsql_thread___ssl_p2s_ca : "" },
				string { pgsql_thread___ssl_p2s_crl ? pgsql_thread___ssl_p2s_crl : "" },
				string { pgsql_thread___ssl_p2s_crlpath ? pgsql_thread___ssl_p2s_crlpath : "" }
			}
		});
	}

	return srvs;
}

/**
 * @brief Fetches updated config to be used in the current monitoring interval.
 * @param mon Pointer to 'PgSQL_Monitor' module instance.
 * @param hgm Pointer to 'PgSQL_HostGroups_Manager' module instance.
 * @return Updated config to be used for interval tasks.
 */
tasks_conf_t fetch_updated_conf(PgSQL_Monitor* mon, PgSQL_HostGroups_Manager* hgm) {
	// Update the 'monitor_internal.pgsql_servers' servers info.
	{
		try {
			std::lock_guard<std::mutex> pgsql_srvs_guard(hgm->pgsql_servers_to_monitor_mutex);
			update_monitor_pgsql_servers(hgm->pgsql_servers_to_monitor, &mon->monitordb);
		} catch (const std::exception& e) {
			proxy_error("Exception e=%s\n", e.what());
		}
	}

	unique_ptr<SQLite3_result> ping_srvrs { fetch_mon_srvs_conf(mon,
		"SELECT hostname, port, MAX(use_ssl) use_ssl FROM monitor_internal.pgsql_servers"
			" GROUP BY hostname, port ORDER BY RANDOM()"
	)};

	unique_ptr<SQLite3_result> connect_srvrs { fetch_mon_srvs_conf(mon,
		"SELECT hostname, port, MAX(use_ssl) use_ssl FROM monitor_internal.pgsql_servers"
			" GROUP BY hostname, port ORDER BY RANDOM()"
	)};

	unique_ptr<SQLite3_result> readonly_srvs { fetch_hgm_srvs_conf(hgm,
		"SELECT hostname, port, MAX(use_ssl) use_ssl, check_type, reader_hostgroup"
			" FROM pgsql_servers JOIN pgsql_replication_hostgroups"
			" ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup"
			" WHERE status NOT IN (2,3) GROUP BY hostname, port ORDER BY RANDOM()"
	)};
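	// NOTE: In the conversions below, config intervals/timeouts appear to be
	// expressed in 'ms' and turned into 'us' ('* 1000'), while the
	// '*_interval_window' variables are percentages mapped into [0,1]
	// fractions ('/ 100.0').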

	return tasks_conf_t {
		ping_conf_t {
			std::move(ping_srvrs),
			ping_params_t {
				pgsql_thread___monitor_ping_interval * 1000,
				pgsql_thread___monitor_ping_interval_window / 100.0,
				pgsql_thread___monitor_ping_timeout * 1000,
				pgsql_thread___monitor_ping_max_failures
			}
		},
		connect_conf_t {
			std::move(connect_srvrs),
			connect_params_t {
				pgsql_thread___monitor_connect_interval * 1000,
				pgsql_thread___monitor_connect_interval_window / 100.0,
				pgsql_thread___monitor_connect_timeout * 1000,
				// TODO: Revisit this logic; For now identical to previous
				// - Used for server responsiveness
				pgsql_thread___monitor_ping_max_failures,
				// - Used for connection cleanup
				pgsql_thread___monitor_ping_interval * 1000
			}
		},
		readonly_conf_t {
			std::move(readonly_srvs),
			readonly_params_t {
				pgsql_thread___monitor_read_only_interval * 1000,
				pgsql_thread___monitor_read_only_interval_window / 100.0,
				pgsql_thread___monitor_read_only_timeout * 1000,
				pgsql_thread___monitor_read_only_max_timeout_count,
				pgsql_thread___monitor_ping_max_failures,
				pgsql_thread___monitor_ping_interval * 1000,
				pgsql_thread___monitor_writer_is_also_reader
			}
		},
		mon_user_t {
			pgsql_thread___monitor_username,
			pgsql_thread___monitor_password,
			pgsql_thread___monitor_dbname
		}
	};
}

using op_params_t = std::unique_ptr<void, std::function<void(void*)>>;
using op_result_t = std::unique_ptr<void, std::function<void(void*)>>;
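// Type-erased owning pointers: each task type keeps its concrete params/result
// struct behind a 'void*' paired with a deleter that knows the real type (see
// 'create_simple_tasks' and 'handle_async_check_cont').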

struct op_st_t {
	// :: info
	mon_srv_t srv_info;
	mon_user_t user_info;
	op_params_t op_params;
	// :: state
	uint64_t exec_time { 0 };
	op_result_t op_result;
};

struct task_st_t {
	// :: info
	task_type_t type;
	uint64_t sched_intv;
	// :: state
	uint64_t start { 0 };
	uint64_t end { 0 };
	op_st_t op_st;
};

struct task_inf_t {
	task_type_t type;
	op_st_t op_st;
};

struct state_t {
	pgsql_conn_t conn;
	task_st_t task;
};

enum class task_status_t { success, failure };

// Duplicates 'input', flattening linefeeds into spaces and collapsing the
// whitespace run that follows them; used to keep multi-line libpq error
// messages on a single log line.
mf_unique_ptr<char> strdup_no_lf(const char* input) {
	if (input == nullptr) return nullptr;

	size_t len = std::strlen(input);
	char* res = static_cast<char*>(malloc(len + 1));
	memset(res, 0, len + 1);

	bool in_lf = false;
	size_t res_pos = 0;

	for (size_t i = 0; i < len; i++) {
		if (input[i] == '\n') {
			if (i < len - 1) {
				res[res_pos] = ' ';
				res_pos++;
			}
			in_lf = true;
		} else if (in_lf && (input[i] == ' ' || input[i] == '\t')) {
			if (input[i - 1] == '\n' && (input[i] == ' ' || input[i] == '\t')) {
				res[res_pos] = ' ';
				res_pos++;
			} else {
				continue;
			}
		} else {
			in_lf = false;
			res[res_pos] = input[i];
			res_pos++;
		}
	}

	res[res_pos] = '\0';

	return mf_unique_ptr<char>(res);
}

void set_failed_st(state_t& st, ASYNC_ST new_st, mf_unique_ptr<char> err) {
	st.conn.state = new_st;
	st.conn.err = std::move(err);
	st.task.end = monotonic_time();
}

void set_finish_st(state_t& st, ASYNC_ST new_st, op_result_t res = {}) {
	st.conn.state = new_st;
	st.task.op_st.op_result = std::move(res);
	st.task.end = monotonic_time();
}

short handle_async_check_cont(state_t& st, short _) {
	pgsql_conn_t& pgconn { st.conn };

	// Single command queries; 'PQisBusy' and 'PQconsumeInput' not required
	PGresult* res { PQgetResult(pgconn.conn) };

	// Wait for the result asynchronously
	if (res == NULL) {
		if (st.task.type == task_type_t::ping) {
			set_finish_st(st, ASYNC_PING_END);
		} else {
			set_finish_st(st, ASYNC_QUERY_END);
		}
	} else {
		// Check for errors in the query execution
		ExecStatusType status = PQresultStatus(res);

		if (status == PGRES_EMPTY_QUERY) {
			set_finish_st(st, ASYNC_PING_END);
			// Cleanup of resultset required for conn reuse
			PQclear(PQgetResult(pgconn.conn));
		} else if (status == PGRES_TUPLES_OK) {
			int row_count = PQntuples(res);

			if (row_count > 0) {
				const char* value_str { PQgetvalue(res, 0, 0) };
				bool value { strcmp(value_str, "t") == 0 };

				set_finish_st(st, ASYNC_QUERY_END,
					op_result_t {
						new readonly_res_t { value },
						[] (void* v) { delete static_cast<readonly_res_t*>(v); }
					}
				);
			} else {
				const mon_srv_t& srv { st.task.op_st.srv_info };
				const char err_t[] { "Invalid number of rows '%d'" };
				char err_b[sizeof(err_t) + 12] = { 0 };

				cstr_format(err_b, err_t, row_count);
				proxy_error(
					"Monitor readonly failed addr='%s:%d' status=%d error='%s'\n",
					srv.addr.c_str(), srv.port, status, err_b
				);
				set_failed_st(st, ASYNC_QUERY_FAILED, mf_unique_ptr<char>(strdup(err_b)));
			}

			// Cleanup of resultset required for conn reuse
			PQclear(PQgetResult(pgconn.conn));
		} else if (status != PGRES_COMMAND_OK) {
			const mon_srv_t& srv { st.task.op_st.srv_info };
			auto err { strdup_no_lf(PQerrorMessage(pgconn.conn)) };

			if (st.task.type == task_type_t::ping) {
				proxy_error(
					"Monitor ping failed addr='%s:%d' status=%d error='%s'\n",
					srv.addr.c_str(), srv.port, status, err.get()
				);
				set_failed_st(st, ASYNC_PING_FAILED, std::move(err));
			} else if (st.task.type == task_type_t::readonly) {
				proxy_error(
					"Monitor readonly failed addr='%s:%d' status=%d error='%s'\n",
					srv.addr.c_str(), srv.port, status, err.get()
				);
				set_failed_st(st, ASYNC_QUERY_FAILED, std::move(err));
			} else {
				assert(0 && "Invalid task type");
			}
		}
	}

	// Clear always; we assume no resultset on ping
	PQclear(res);

	return POLLIN;
}

pair<short,bool> handle_async_connect_cont(state_t& st, short revent) {
	pgsql_conn_t& pgconn { st.conn };

	short req_events { 0 };
	bool proc_again { false };

	// NOTE: The SCRAM-SHA-256 handshake may introduce an observable delay (CPU intensive).
	PostgresPollingStatusType poll_res { PQconnectPoll(pgconn.conn) };
	pgconn.fd = PQsocket(pgconn.conn);

	switch (poll_res) {
		case PGRES_POLLING_WRITING:
			req_events |= POLLOUT;
			break;
		case PGRES_POLLING_ACTIVE:
		case PGRES_POLLING_READING:
			req_events |= POLLIN;
			break;
		case PGRES_POLLING_OK:
			pgconn.state = ASYNC_ST::ASYNC_CONNECT_END;

			// connection successful, update SSL stats
			if (PQsslInUse(pgconn.conn)) {
				__sync_fetch_and_add(&GloPgMon->ssl_connections_OK, 1);
			} else {
				__sync_fetch_and_add(&GloPgMon->non_ssl_connections_OK, 1);
			}

			if (st.task.type == task_type_t::connect) {
				st.task.end = monotonic_time();
			} else if (st.task.type == task_type_t::ping) {
				proc_again = true;
			} else if (st.task.type == task_type_t::readonly) {
				proc_again = true;
			} else {
				assert(0 && "Non-implemented task-type");
			}
			break;
		case PGRES_POLLING_FAILED: {
			// During connection phase use `PQerrorMessage`
			const mon_srv_t& srv { st.task.op_st.srv_info };
			auto err { strdup_no_lf(PQerrorMessage(pgconn.conn)) };

			proxy_error(
				"Monitor connect failed addr='%s:%d' error='%s'\n",
				srv.addr.c_str(), srv.port, err.get()
			);
			set_failed_st(st, ASYNC_CONNECT_FAILED, std::move(err));
			break;
		}
	}

	return { req_events, proc_again };
}

short handle_async_connect_end(state_t& st, short _) {
	pgsql_conn_t& pgconn { st.conn };

	short req_events { 0 };
	const char* QUERY { st.task.type == task_type_t::ping ? PING_QUERY : READ_ONLY_QUERY };

	int rc = PQsendQuery(pgconn.conn, QUERY);
	if (rc == 0) {
		const mon_srv_t& srv { st.task.op_st.srv_info };
		auto err { strdup_no_lf(PQerrorMessage(pgconn.conn)) };

		if (st.task.type == task_type_t::ping) {
			proxy_error(
				"Monitor ping start failed addr='%s:%d' error='%s'\n",
				srv.addr.c_str(), srv.port, err.get()
			);
			set_failed_st(st, ASYNC_PING_FAILED, std::move(err));
		} else if (st.task.type == task_type_t::readonly) {
			proxy_error(
				"Monitor readonly start failed addr='%s:%d' error='%s'\n",
				srv.addr.c_str(), srv.port, err.get()
			);
			set_failed_st(st, ASYNC_QUERY_FAILED, std::move(err));
		} else {
			assert(0 && "Invalid task type");
		}
	} else {
		int res = PQflush(pgconn.conn);

		if (res < 0) {
			const mon_srv_t& srv { st.task.op_st.srv_info };
			auto err { strdup_no_lf(PQerrorMessage(pgconn.conn)) };

			if (st.task.type == task_type_t::ping) {
				proxy_error(
					"Monitor ping start failed addr='%s:%d' error='%s'\n",
					srv.addr.c_str(), srv.port, err.get()
				);
				set_failed_st(st, ASYNC_PING_FAILED, std::move(err));
			} else if (st.task.type == task_type_t::readonly) {
				proxy_error(
					"Monitor readonly start failed addr='%s:%d' error='%s'\n",
					srv.addr.c_str(), srv.port, err.get()
				);
				set_failed_st(st, ASYNC_QUERY_FAILED, std::move(err));
			} else {
				assert(0 && "Invalid task type");
			}
		} else {
			req_events |= res > 0 ? POLLOUT : POLLIN;

			if (st.task.type == task_type_t::ping) {
				pgconn.state = ASYNC_ST::ASYNC_PING_CONT;
			} else if (st.task.type == task_type_t::readonly) {
				pgconn.state = ASYNC_ST::ASYNC_QUERY_CONT;
			} else {
				assert(0 && "Invalid task type");
			}
		}
	}

	return req_events;
}
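// Connection state machine driven by 'handle_pg_event' (a sketch of the
// transitions implemented above and below):
//
//   ASYNC_CONNECT_CONT --PQconnectPoll--> ASYNC_CONNECT_END | ASYNC_CONNECT_FAILED
//   ASYNC_CONNECT_END  --PQsendQuery----> ASYNC_PING_CONT   | ASYNC_QUERY_CONT
//   ASYNC_*_CONT       --PQgetResult----> ASYNC_*_END       | ASYNC_*_FAILED
//
// One transition is attempted per poll() wakeup; 'next_immediate' loops when a
// state completes without requiring further I/O.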

short handle_pg_event(state_t& st, short event) {
	pgsql_conn_t& pgconn { st.conn };
	short req_events = 0;

#ifdef DEBUG
	const char* host { PQhostaddr(pgconn.conn) };
	const char* port { PQport(pgconn.conn) };

	proxy_debug(PROXY_DEBUG_MONITOR, 5,
		"Handling event for conn fd=%d addr='%s:%s' event=%d state=%d\n",
		pgconn.fd, host, port, event, st.conn.state
	);
#endif

next_immediate:

	switch (pgconn.state) {
		case ASYNC_ST::ASYNC_CONNECT_FAILED: {
			// Conn creation failed; no socket acquired
			break;
		}
		case ASYNC_ST::ASYNC_CONNECT_CONT: {
			auto [events, proc_again] = handle_async_connect_cont(st, event);
			req_events = events;

			if (proc_again) {
				goto next_immediate;
			}
			break;
		}
		case ASYNC_ST::ASYNC_CONNECT_END: {
			req_events = handle_async_connect_end(st, event);
			break;
		}
		case ASYNC_ST::ASYNC_QUERY_CONT:
		case ASYNC_ST::ASYNC_PING_CONT: {
			req_events = handle_async_check_cont(st, event);
			break;
		}
		case ASYNC_ST::ASYNC_PING_END: {
			pgconn.state = ASYNC_ST::ASYNC_CONNECT_END;
			break;
		}
		case ASYNC_ST::ASYNC_QUERY_END: {
			pgconn.state = ASYNC_ST::ASYNC_CONNECT_END;
			break;
		}
		default: {
			// Should not be reached
			assert(0 && "State matching should be exhaustive");
			break;
		}
	}

	return req_events;
}

struct conn_pool_t {
	unordered_map<string, list<pgsql_conn_t>> conn_map;
	std::mutex mutex;
};

conn_pool_t mon_conn_pool {};

pair<bool,pgsql_conn_t> get_conn(
	conn_pool_t& conn_pool, const mon_srv_t& srv_info, uint64_t intv
) {
	bool found { false };
	pgsql_conn_t found_conn {};
	vector<pgsql_conn_t> expired_conns {};

	{
		std::lock_guard<std::mutex> lock(conn_pool.mutex);

		const string key { srv_info.addr + ":" + std::to_string(srv_info.port) };
		auto it = conn_pool.conn_map.find(key);

		if (it != conn_pool.conn_map.end()) {
			list<pgsql_conn_t>& conn_list = it->second;
			auto now = monotonic_time();

			for (auto conn_it = conn_list.begin(); conn_it != conn_list.end();) {
				// TODO: Tune this value; keeping alive too many conns per-host
				// - Connect always create new connections
				// - Low connect intervals guarantee to keep up to N conns per host
				if (now - conn_it->last_used > 3 * intv) {
					expired_conns.emplace_back(std::move(*conn_it));
					conn_it = conn_list.erase(conn_it);
				} else {
					++conn_it;
				}
			}

			if (!conn_list.empty()) {
				found = true;
				found_conn = std::move(conn_list.front());

				conn_list.pop_front();
			}
		}
	}

	for (pgsql_conn_t& conn : expired_conns) {
		PQfinish(conn.conn);
	}

	return pair<bool,pgsql_conn_t>(found, std::move(found_conn));
}

void put_conn(conn_pool_t& conn_pool, const mon_srv_t& srv_info, pgsql_conn_t conn) {
	std::lock_guard<std::mutex> lock(conn_pool.mutex);

	const string key { srv_info.addr + ":" + std::to_string(srv_info.port) };
	conn_pool.conn_map[key].emplace_front(std::move(conn));
}
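// Pool entries are keyed by "addr:port". Connections idle for longer than 3x
// the relevant check interval are expired inside 'get_conn'; the expired conns
// are closed ('PQfinish') outside the pool lock so slow teardowns don't block
// other workers.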

uint64_t get_connpool_cleanup_intv(task_st_t& task) {
	uint64_t res = 0;

	if (task.type == task_type_t::connect) {
		connect_params_t* params {
			static_cast<connect_params_t*>(task.op_st.op_params.get())
		};

		res = params->ping_interval;
	} else if (task.type == task_type_t::ping) {
		ping_params_t* params {
			static_cast<ping_params_t*>(task.op_st.op_params.get())
		};

		res = params->interval;
	} else if (task.type == task_type_t::readonly) {
		readonly_params_t* params {
			static_cast<readonly_params_t*>(task.op_st.op_params.get())
		};

		res = params->ping_interval;
	} else {
		assert(0 && "Non-implemented task-type");
	}

	return res;
}

pair<bool,pgsql_conn_t> get_task_conn(conn_pool_t& conn_pool, task_st_t& task_st) {
	if (task_st.type == task_type_t::connect) {
		return pair<bool,pgsql_conn_t> { false, pgsql_conn_t {} };
	} else {
		const mon_srv_t& mon_srv { task_st.op_st.srv_info };
		uint64_t cleanup_intv { get_connpool_cleanup_intv(task_st) };

		return get_conn(conn_pool, mon_srv, cleanup_intv);
	}
}

static void append_conninfo_param(std::ostringstream& conninfo, const std::string& key, const std::string& val) {
	if (val.empty()) return;

	std::string escaped_val;
	escaped_val.reserve(val.length() * 2); // Reserve maximum possible size

	for (char c : val) {
		if (c == '\'' || c == '\\') {
			escaped_val.push_back('\\');
		}
		escaped_val.push_back(c);
	}

	conninfo << key << "='" << escaped_val << "' ";
}

string build_conn_str(const task_st_t& task_st) {
	const mon_srv_t& srv_info { task_st.op_st.srv_info };
	const mon_user_t& user_info { task_st.op_st.user_info };

	std::ostringstream conninfo;
	append_conninfo_param(conninfo, "user", user_info.user);     // username
	append_conninfo_param(conninfo, "password", user_info.pass); // password
	append_conninfo_param(conninfo, "dbname", user_info.dbname); // dbname
	append_conninfo_param(conninfo, "host", srv_info.addr);      // backend address
	conninfo << "port=" << srv_info.port << " ";                 // backend port
	conninfo << "application_name=ProxySQL-Monitor ";            // application name
	if (srv_info.ssl) {
		conninfo << "sslmode='require' "; // SSL required
		append_conninfo_param(conninfo, "sslkey", srv_info.ssl_opt.ssl_p2s_key);
		append_conninfo_param(conninfo, "sslcert", srv_info.ssl_opt.ssl_p2s_cert);
		append_conninfo_param(conninfo, "sslrootcert", srv_info.ssl_opt.ssl_p2s_ca);
		append_conninfo_param(conninfo, "sslcrl", srv_info.ssl_opt.ssl_p2s_crl);
		append_conninfo_param(conninfo, "sslcrldir", srv_info.ssl_opt.ssl_p2s_crlpath);
	} else {
		conninfo << "sslmode='disable' "; // not supporting SSL
	}
	return conninfo.str();
}
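// Example of a resulting conninfo string (hypothetical values, non-SSL):
//
//   user='monitor' password='monitor' dbname='postgres' host='10.0.0.1'
//   port=5432 application_name=ProxySQL-Monitor sslmode='disable'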

pgsql_conn_t create_new_conn(task_st_t& task_st) {
	pgsql_conn_t pgconn {};

	// Initialize connection parameters
	const string conn_str { build_conn_str(task_st) };
	pgconn.conn = PQconnectStart(conn_str.c_str());

	if (pgconn.conn == NULL || PQstatus(pgconn.conn) == CONNECTION_BAD) {
		const mon_srv_t& srv { task_st.op_st.srv_info };

		if (pgconn.conn) {
			auto error { strdup_no_lf(PQerrorMessage(pgconn.conn)) };
			proxy_error(
				"Monitor connect failed addr='%s:%d' error='%s'\n",
				srv.addr.c_str(), srv.port, error.get()
			);

			pgconn.err = std::move(error);
			task_st.end = monotonic_time();
		} else {
			mf_unique_ptr<char> error { strdup("Out of memory") };
			proxy_error(
				"Monitor connect failed addr='%s:%d' error='%s'\n",
				srv.addr.c_str(), srv.port, "Out of memory"
			);

			pgconn.err = std::move(error);
			task_st.end = monotonic_time();
		}
	} else {
		if (PQsetnonblocking(pgconn.conn, 1) != 0) {
			auto error { strdup_no_lf(PQerrorMessage(pgconn.conn)) };
			proxy_error("Failed to set non-blocking mode error='%s'\n", error.get());

			pgconn.err = std::move(error);
			task_st.end = monotonic_time();
		} else {
			pgconn.state = ASYNC_ST::ASYNC_CONNECT_CONT;
			pgconn.fd = PQsocket(pgconn.conn);
		}
	}

	return pgconn;
}

#ifdef DEBUG
uint64_t count_pool_conns(conn_pool_t& pool) {
	std::lock_guard<std::mutex> lock(pool.mutex);
	uint64_t count = 0;

	for (const auto& [key, connections] : pool.conn_map) {
		count += connections.size();
	}

	return count;
}
#endif

pgsql_conn_t create_conn(task_st_t& task_st) {
	// Count the task as already started (conn acquisition)
	task_st.start = monotonic_time();
	// Fetch a conn from the 'conn_pool' if the task type allows it
	pair<bool,pgsql_conn_t> pool_res { get_task_conn(mon_conn_pool, task_st) };

#ifdef DEBUG
	const mon_srv_t& srv { task_st.op_st.srv_info };
	uint64_t pool_conn_count { count_pool_conns(mon_conn_pool) };

	proxy_debug(PROXY_DEBUG_MONITOR, 5,
		"Fetched conn from pool task_type=%d fd=%d addr='%s:%d' pool_conn_count=%lu\n",
		int(task_st.type), pool_res.second.fd, srv.addr.c_str(), srv.port, pool_conn_count
	);
#endif

	if (pool_res.first) {
		return std::move(pool_res.second);
	} else {
		return create_new_conn(task_st);
	}
}

// Previous tasks results
struct tasks_stats_t {
	uint64_t start;
	uint64_t end;
	uint64_t count;
};

// Compute the required number of threads for the current interval
uint32_t required_worker_threads(
	tasks_stats_t prev,
	uint64_t worker_threads,
	uint64_t new_tasks_intv,
	uint64_t new_tasks_count
) {
	uint64_t req_worker_threads = worker_threads;

	double prev_intv_rate = double(prev.count) / (prev.end - prev.start);
	double est_intv_proc_tasks = new_tasks_intv * prev_intv_rate;

	if (est_intv_proc_tasks < new_tasks_count && prev.count != 0) {
		// Estimate of number of tasks consumed per worker
		double tasks_per_worker = double(prev.count) / worker_threads;
		req_worker_threads = ceil(new_tasks_count / tasks_per_worker);
	}

	return req_worker_threads;
}
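// Worked example (hypothetical numbers): the previous interval processed 100
// tasks in 2,000,000us with 4 workers -> rate = 0.00005 tasks/us. For a new
// interval of 1,000,000us the estimate is 50 processed tasks; if 160 tasks are
// now scheduled, tasks_per_worker = 100 / 4 = 25 and the function requests
// ceil(160 / 25) = 7 workers.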

struct tasks_intvs_t {
	uint64_t next_ping_at;
	uint64_t next_connect_at;
	uint64_t next_readonly_at;
};

struct task_poll_t {
	std::vector<struct pollfd> fds {};
	std::vector<state_t> tasks {};
	size_t size = 0;
};

// Adds a task to the poll; previously allocated slots are reused when possible
void add_task(
	task_poll_t& task_poll, short int events, state_t&& task
) {
	if (task_poll.size < task_poll.fds.size()) {
		task_poll.fds[task_poll.size] = pollfd { task.conn.fd, events, 0 };
	} else {
		task_poll.fds.emplace_back(pollfd { task.conn.fd, events, 0 });
	}
	if (task_poll.size < task_poll.tasks.size()) {
		task_poll.tasks[task_poll.size] = std::move(task);
	} else {
		task_poll.tasks.emplace_back(std::move(task));
	}

	task_poll.size++;
}

// Removes a task in O(1) by swapping it with the last one; order isn't preserved
void rm_task_fast(task_poll_t& task_poll, size_t idx) {
	if (idx >= task_poll.size) {
		proxy_error("Received invalid task index idx=%lu\n", idx);
		assert(0);
	}

	task_poll.fds[idx] = task_poll.fds[task_poll.size - 1];
	task_poll.tasks[idx] = std::move(task_poll.tasks[task_poll.size - 1]);
	task_poll.size--;
}

struct task_queue_t {
	int comm_fd[2];
	std::queue<task_st_t> queue {};
	std::mutex mutex {};

	task_queue_t() {
		int rc = pipe(comm_fd);
		assert(rc == 0 && "Failed to create pipe for Monitor worker thread");
	}
};

struct task_res_t {
	task_status_t status;
	task_st_t task;
};

struct result_queue_t {
	std::queue<task_res_t> queue {};
	std::mutex mutex {};
};
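// Scheduler <-> worker communication (see 'schedule_tasks'/'worker_thread'):
// the scheduler pushes tasks into 'task_queue_t::queue' under its mutex, then
// writes one byte to the pipe so the worker's poll() wakes up; a byte value of
// 1 requests worker shutdown. Results flow back through 'result_queue_t'.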

tasks_stats_t compute_intv_stats(result_queue_t& results) {
	std::lock_guard<std::mutex> lock_queue { results.mutex };

	tasks_stats_t stats {};

	if (results.queue.size() != 0) {
		stats = tasks_stats_t {
			results.queue.front().task.start,
			results.queue.back().task.end,
			results.queue.size()
		};
	} else {
		stats = tasks_stats_t { 0, 0, 0 };
	}

	results.queue = {};

	return stats;
}

template <typename conf_t, typename params_t>
vector<task_st_t> create_simple_tasks(
	uint64_t curtime, const mon_user_t user, const conf_t& conf, task_type_t type
) {
	vector<task_st_t> tasks {};
	const vector<mon_srv_t> srvs_info { ext_srvs(conf.srvs_info) };

	for (const auto& srv_info : srvs_info) {
		auto op_dtor { [] (void* v) { delete static_cast<params_t*>(v); } };
		op_params_t op_params { new params_t { conf.params }, op_dtor };
		op_st_t op_st { srv_info, user, std::move(op_params) };

		tasks.push_back(task_st_t { type, curtime, curtime, 0, std::move(op_st) });
	}

	return tasks;
}

using worker_queue_t = pair<task_queue_t,result_queue_t>;
using worker_thread_t = pair<pthread_t, unique_ptr<worker_queue_t>>;

std::pair<int, pthread_t> create_thread(size_t stack_size, void*(*routine)(void*), void* args) {
	pthread_attr_t attr;
	int result = pthread_attr_init(&attr);
	assert(result == 0 && "Failed to initialize thread attributes.");

	result = pthread_attr_setstacksize(&attr, stack_size);
	assert(result == 0 && "Invalid stack size provided for thread creation.");

	pthread_t pthread;
	result = pthread_create(&pthread, &attr, routine, args);
	pthread_attr_destroy(&attr);

	if (result != 0) {
		return std::make_pair(result, pthread_t {});
	} else {
		return std::make_pair(result, pthread_t { pthread });
	}
}

void write_signal(int fd, uint8_t val) {
	uint8_t s { val };

	for (;;) {
		int rc = write(fd, &s, 1);

		if (rc >= 0) {
			break;
		} else if (errno == EINTR || errno == EAGAIN) {
			continue;
		} else {
			proxy_error(
				"Failed to signal Monitor workers. Aborting rc=%d errno=%d\n", rc, errno
			);
			assert(0);
		}
	}
}

uint8_t read_signal(int fd) {
	uint8_t s { 0 };

	for (;;) {
		int rc = read(fd, &s, 1);

		if (rc >= 0) {
			break;
		} else if (errno == EINTR || errno == EAGAIN) {
			continue;
		} else {
			proxy_error(
				"Failed to read scheduler signal. Aborting rc=%d errno=%d\n", rc, errno
			);
			assert(0);
		}
	}

	return s;
}

/**
 * @brief Add the supplied tasks to the worker threads queues.
 * @details Scheduling to avoid network bursts is config dependent. Task distribution is
 *   even between workers with the exception of the last thread, which receives the
 *   remainder; i.e. for A tasks and B workers it gets ⌊A/B⌋ + (A mod B) elements,
 *   at worst (B - 1) extra.
 *
 * @param workers Workers threads for even task distribution.
 * @param tasks The tasks to be moved to the worker queues.
 */
void schedule_tasks(vector<worker_thread_t>& workers, vector<task_st_t>&& tasks) {
	size_t tasks_per_thread { tasks.size() / workers.size() };
	size_t task_idx = 0;

	for (size_t i = 0; i < workers.size(); i++) {
		task_queue_t& task_queue { workers[i].second->first };
		std::lock_guard<std::mutex> lock_queue { task_queue.mutex };

		if (i == workers.size() - 1) {
			for (size_t j = task_idx; j < tasks.size(); j++) {
				task_queue.queue.push(std::move(tasks[j]));
			}
		} else {
			for (uint64_t t = 0; t < tasks_per_thread; t++, task_idx++) {
				task_queue.queue.push(std::move(tasks[task_idx]));
			}
		}
	}

	// Signal all threads to process queues
	for (size_t i = 0; i < workers.size(); i++) {
		task_queue_t& task_queue { workers[i].second->first };
		write_signal(task_queue.comm_fd[1], 0);
	}
}

pair<uint64_t,uint64_t> compute_task_rate(
	uint64_t workers, uint64_t tasks, uint64_t intv_us, double intv_pct
) {
	uint64_t intv_pct_us { uint64_t(ceil(intv_us * intv_pct)) };
	double tasks_per_worker { ceil(tasks / double(workers)) };
	uint64_t delay_per_bat { uint64_t(floor(intv_pct_us / tasks_per_worker)) };

	return { workers, delay_per_bat };
}

uint64_t compute_sched_sleep(uint64_t curtime, uint64_t closest_intv, uint64_t next_batch_wait) {
	const uint64_t next_intv_diff { closest_intv < curtime ? 0 : closest_intv - curtime };
	const uint64_t max_wait_us { std::min({ next_batch_wait, next_intv_diff }) };

	return max_wait_us;
}

struct task_batch_t {
	// :: info
	task_type_t type;
	uint64_t batch_sz;
	int32_t intv_us;
	double intv_window;
	// :: state
	uint64_t next_sched;
	vector<task_st_t> tasks;
};

vector<task_st_t> get_from_batch(task_batch_t& batch, uint64_t tasks) {
	vector<task_st_t> new_bat {};

	if (batch.tasks.size()) {
		uint64_t batch_size { tasks > batch.tasks.size() ? batch.tasks.size() : tasks };

		new_bat.insert(
			new_bat.end(),
			std::make_move_iterator(batch.tasks.begin()),
			std::make_move_iterator(batch.tasks.begin() + batch_size)
		);
		batch.tasks.erase(batch.tasks.begin(), batch.tasks.begin() + batch_size);
	}

	return new_bat;
}

bool is_task_success(pgsql_conn_t& c, task_st_t& st) {
	return
		((c.state != ASYNC_ST::ASYNC_CONNECT_FAILED && c.state != ASYNC_CONNECT_TIMEOUT)
			|| (c.state != ASYNC_ST::ASYNC_PING_FAILED && c.state != ASYNC_PING_TIMEOUT)
			|| (c.state != ASYNC_ST::ASYNC_QUERY_FAILED && c.state != ASYNC_QUERY_TIMEOUT))
		&& ((c.state == ASYNC_ST::ASYNC_CONNECT_END && st.type == task_type_t::connect)
			|| (c.state == ASYNC_ST::ASYNC_PING_END && st.type == task_type_t::ping)
			|| (c.state == ASYNC_ST::ASYNC_QUERY_END && st.type == task_type_t::readonly));
}

bool is_task_finish(pgsql_conn_t& c, task_st_t& st) {
	return
		((c.state == ASYNC_ST::ASYNC_CONNECT_FAILED || c.state == ASYNC_ST::ASYNC_CONNECT_TIMEOUT)
			|| (c.state == ASYNC_ST::ASYNC_PING_FAILED || c.state == ASYNC_ST::ASYNC_PING_TIMEOUT)
			|| (c.state == ASYNC_ST::ASYNC_QUERY_FAILED || c.state == ASYNC_ST::ASYNC_QUERY_TIMEOUT))
		|| (c.state == ASYNC_ST::ASYNC_CONNECT_END && st.type == task_type_t::connect)
		|| (c.state == ASYNC_ST::ASYNC_PING_END && st.type == task_type_t::ping)
		|| (c.state == ASYNC_ST::ASYNC_QUERY_END && st.type == task_type_t::readonly);
}

void update_connect_table(SQLite3DB* db, state_t& state) {
	sqlite3_stmt* stmt = nullptr;
	int rc = db->prepare_v2(
		"INSERT OR REPLACE INTO pgsql_server_connect_log VALUES (?1 , ?2 , ?3 , ?4 , ?5)", &stmt
	);
	ASSERT_SQLITE_OK(rc, db);

	uint64_t op_dur_us { state.task.end - state.task.start };

	sqlite_bind_text(stmt, 1, state.task.op_st.srv_info.addr.c_str());
	sqlite_bind_int(stmt, 2, state.task.op_st.srv_info.port);

	uint64_t time_start_us = realtime_time() - op_dur_us;
	sqlite_bind_int64(stmt, 3, time_start_us);

	uint64_t succ_time_us { is_task_success(state.conn, state.task) ? op_dur_us : 0 };
	sqlite_bind_int64(stmt, 4, succ_time_us);
	sqlite_bind_text(stmt, 5, state.conn.err.get());

	SAFE_SQLITE3_STEP2(stmt);

	sqlite_clear_bindings(stmt);
	sqlite_reset_statement(stmt);
	sqlite_finalize_statement(stmt);

	if (state.conn.err) {
		const mon_srv_t& srv { state.task.op_st.srv_info };
		char* srv_addr { const_cast<char*>(srv.addr.c_str()) };
		int err_code { 0 };

		if (state.conn.state != ASYNC_ST::ASYNC_CONNECT_TIMEOUT) {
			err_code = 9100 + state.conn.state;
		} else {
			err_code = ER_PROXYSQL_CONNECT_TIMEOUT;
		}

		PgHGM->p_update_pgsql_error_counter(
			p_pgsql_error_type::proxysql, 0, srv_addr, srv.port, err_code
		);
		__sync_fetch_and_add(&GloPgMon->connect_check_ERR, 1);
	} else {
		__sync_fetch_and_add(&GloPgMon->connect_check_OK, 1);
	}
}
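// NOTE: 'update_ping_table' and 'update_readonly_table' below follow the same
// pattern: the log start time is reconstructed as wall-clock time minus the
// measured duration, success time is logged as 0 on failure, and non-timeout
// failures are mapped to error code '9100 + conn state'.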

void update_ping_table(SQLite3DB* db, state_t& state) {
	sqlite3_stmt* stmt = nullptr;
	int rc = db->prepare_v2(
		"INSERT OR REPLACE INTO pgsql_server_ping_log VALUES (?1, ?2, ?3, ?4, ?5)", &stmt
	);
	ASSERT_SQLITE_OK(rc, db);

	uint64_t op_dur_us { state.task.end - state.task.start };

	sqlite_bind_text(stmt, 1, state.task.op_st.srv_info.addr.c_str());
	sqlite_bind_int(stmt, 2, state.task.op_st.srv_info.port);

	uint64_t time_start_us { realtime_time() - op_dur_us };
	sqlite_bind_int64(stmt, 3, time_start_us);
	uint64_t succ_time_us { is_task_success(state.conn, state.task) ? op_dur_us : 0 };
	sqlite_bind_int64(stmt, 4, succ_time_us);

	sqlite_bind_text(stmt, 5, state.conn.err.get());

	SAFE_SQLITE3_STEP2(stmt);

	sqlite_clear_bindings(stmt);
	sqlite_reset_statement(stmt);
	sqlite_finalize_statement(stmt);

	if (state.conn.err) {
		const mon_srv_t& srv { state.task.op_st.srv_info };
		char* srv_addr { const_cast<char*>(srv.addr.c_str()) };
		int err_code { 0 };

		if (state.conn.state != ASYNC_ST::ASYNC_PING_TIMEOUT) {
			err_code = 9100 + state.conn.state;
		} else {
			err_code = ER_PROXYSQL_PING_TIMEOUT;
		}

		PgHGM->p_update_pgsql_error_counter(
			p_pgsql_error_type::proxysql, 0, srv_addr, srv.port, err_code
		);
		__sync_fetch_and_add(&GloPgMon->ping_check_ERR, 1);
	} else {
		__sync_fetch_and_add(&GloPgMon->ping_check_OK, 1);
	}
}

void update_readonly_table(SQLite3DB* db, state_t& state) {
	readonly_res_t* op_result {
		static_cast<readonly_res_t*>(state.task.op_st.op_result.get())
	};

	sqlite3_stmt* stmt = nullptr;
	int rc = db->prepare_v2(
		"INSERT OR REPLACE INTO pgsql_server_read_only_log VALUES (?1, ?2, ?3, ?4, ?5, ?6)", &stmt
	);
	ASSERT_SQLITE_OK(rc, db);

	uint64_t op_dur_us { state.task.end - state.task.start };

	sqlite_bind_text(stmt, 1, state.task.op_st.srv_info.addr.c_str());
	sqlite_bind_int(stmt, 2, state.task.op_st.srv_info.port);

	uint64_t time_start_us { realtime_time() - op_dur_us };
	sqlite_bind_int64(stmt, 3, time_start_us);

	uint64_t succ_time_us { is_task_success(state.conn, state.task) ? op_dur_us : 0 };
	sqlite_bind_int64(stmt, 4, succ_time_us);

	if (op_result) {
		sqlite_bind_int64(stmt, 5, op_result->val);
	} else {
		sqlite_bind_null(stmt, 5);
	}

	sqlite_bind_text(stmt, 6, state.conn.err.get());

	SAFE_SQLITE3_STEP2(stmt);

	sqlite_clear_bindings(stmt);
	sqlite_reset_statement(stmt);
	sqlite_finalize_statement(stmt);

	if (state.conn.err) {
		const mon_srv_t& srv { state.task.op_st.srv_info };
		char* srv_addr { const_cast<char*>(srv.addr.c_str()) };
		int err_code { 0 };

		if (state.conn.state != ASYNC_ST::ASYNC_QUERY_TIMEOUT) {
			err_code = 9100 + state.conn.state;
		} else {
			err_code = ER_PROXYSQL_READONLY_TIMEOUT;
		}

		PgHGM->p_update_pgsql_error_counter(
			p_pgsql_error_type::proxysql, 0, srv_addr, srv.port, err_code
		);
		__sync_fetch_and_add(&GloPgMon->readonly_check_ERR, 1);
	} else {
		__sync_fetch_and_add(&GloPgMon->readonly_check_OK, 1);
	}
}

const char CHECK_HOST_ERR_LIMIT_QUERY[] {
	"SELECT 1"
	" FROM"
	" ("
		" SELECT hostname, port, ping_error"
		" FROM pgsql_server_ping_log"
		" WHERE hostname = ? AND port = ?"
		" ORDER BY time_start_us DESC"
		" LIMIT ?"
	" ) a"
	" WHERE"
		" ping_error IS NOT NULL"
		" AND ping_error NOT LIKE '%password authentication failed for user%'"
	" GROUP BY"
		" hostname, port"
	" HAVING"
		" COUNT(*) = ?"
};

thread_local sqlite3_stmt* CHECK_HOST_ERR_LIMIT_STMT { nullptr };

void shunn_non_resp_srv(SQLite3DB* db, state_t& state) {
	ping_params_t* params { static_cast<ping_params_t*>(state.task.op_st.op_params.get()) };

	const mon_srv_t& srv { state.task.op_st.srv_info };
	char* addr { const_cast<char*>(srv.addr.c_str()) };
	int port { srv.port };
	int32_t max_fails { params->max_failures };

	if (CHECK_HOST_ERR_LIMIT_STMT == nullptr) {
		int rc = db->prepare_v2(CHECK_HOST_ERR_LIMIT_QUERY, &CHECK_HOST_ERR_LIMIT_STMT);
		ASSERT_SQLITE_OK(rc, db);
	}

	sqlite_bind_text(CHECK_HOST_ERR_LIMIT_STMT, 1, addr);
	sqlite_bind_int(CHECK_HOST_ERR_LIMIT_STMT, 2, port);
	sqlite_bind_int(CHECK_HOST_ERR_LIMIT_STMT, 3, max_fails);
	sqlite_bind_int(CHECK_HOST_ERR_LIMIT_STMT, 4, max_fails);

	unique_ptr<SQLite3_result> limit_set { sqlite_fetch_and_clear(CHECK_HOST_ERR_LIMIT_STMT) };

	if (limit_set && limit_set->rows_count) {
		bool shunned { PgHGM->shun_and_killall(addr, port) };
		if (shunned) {
			proxy_error(
				"Server %s:%d missed %d heartbeats, shunning it and killing all the connections."
				" Disabling other checks until the node comes back online.\n",
				addr, port, max_fails
			);
		}
	}
}

const char HOST_FETCH_UPD_LATENCY_QUERY[] {
	"SELECT"
	" hostname, port, COALESCE(CAST(AVG(ping_success_time_us) AS INTEGER), 10000)"
	" FROM"
	" ("
		" SELECT hostname, port, ping_success_time_us, ping_error"
		" FROM pgsql_server_ping_log"
		" WHERE hostname = ? AND port = ?"
		" ORDER BY time_start_us DESC"
		" LIMIT 3"
	" ) a"
	" WHERE ping_error IS NULL"
	" GROUP BY hostname, port"
};

thread_local sqlite3_stmt* FETCH_HOST_LATENCY_STMT { nullptr };

void update_srv_latency(SQLite3DB* db, state_t& state) {
	const mon_srv_t& srv { state.task.op_st.srv_info };
	char* addr { const_cast<char*>(srv.addr.c_str()) };
	int port { srv.port };

	if (FETCH_HOST_LATENCY_STMT == nullptr) {
		int rc = db->prepare_v2(HOST_FETCH_UPD_LATENCY_QUERY, &FETCH_HOST_LATENCY_STMT);
		ASSERT_SQLITE_OK(rc, db);
	}

	sqlite_bind_text(FETCH_HOST_LATENCY_STMT, 1, addr);
	sqlite_bind_int(FETCH_HOST_LATENCY_STMT, 2, port);

	unique_ptr<SQLite3_result> resultset { sqlite_fetch_and_clear(FETCH_HOST_LATENCY_STMT) };

	if (resultset && resultset->rows_count) {
		for (const SQLite3_row* srv : resultset->rows) {
			char* cur_latency { srv->fields[2] };
			PgHGM->set_server_current_latency_us(addr, port, atoi(cur_latency));
		}
	}
}

void perf_ping_actions(SQLite3DB* db, state_t& state) {
	// Update table entries
	update_ping_table(db, state);

	// TODO: Checks could be redesigned so the checks themselves are cheap operations.
	// Actions could remain expensive, as they should be the exception, not the norm.
	/////////////////////////////////////////////////////////////////////////////////////

	// Shun all problematic hosts
	shunn_non_resp_srv(db, state);

	// Update 'current_latency_ms'
	update_srv_latency(db, state);
	/////////////////////////////////////////////////////////////////////////////////////
}

const char READONLY_HOSTS_QUERY_T[] {
	"SELECT 1 FROM ("
		" SELECT hostname, port, read_only, error FROM pgsql_server_read_only_log"
		" WHERE hostname = '%s' AND port = '%d'"
		" ORDER BY time_start_us DESC"
		" LIMIT %d"
	") a WHERE"
		" read_only IS NULL AND error LIKE '%%Operation timed out%%'"
	" GROUP BY"
		" hostname, port"
	" HAVING"
		" COUNT(*) = %d"
};

void perf_readonly_actions(SQLite3DB* db, state_t& state) {
	// Update table entries
	update_readonly_table(db, state);

	// Perform the readonly actions
	{
		const op_st_t& op_st { state.task.op_st };
		const mon_srv_t& srv { state.task.op_st.srv_info };
		readonly_params_t* params { static_cast<readonly_params_t*>(op_st.op_params.get()) };

		cfmt_t q_fmt {
			cstr_format(
				READONLY_HOSTS_QUERY_T,
				srv.addr.c_str(),
				srv.port,
				params->max_timeout_count,
				params->max_timeout_count
			)
		};

		if (is_task_success(state.conn, state.task)) {
			readonly_res_t* op_result { static_cast<readonly_res_t*>(op_st.op_result.get()) };
			PgHGM->read_only_action_v2(
				{{ srv.addr, srv.port, op_result->val }}, params->writer_is_also_reader
			);
		} else {
			char* err { nullptr };
			unique_ptr<SQLite3_result> resultset { db->execute_statement(q_fmt.str.c_str(), &err) };

			if (!err && resultset && resultset->rows_count) {
				proxy_error(
					"Server %s:%d missed %d read_only checks. Assuming read_only=1\n",
					srv.addr.c_str(), srv.port, params->max_timeout_count
				);
				PgHGM->read_only_action_v2(
					{{ srv.addr, srv.port, 1 }}, params->writer_is_also_reader
				);
			} else if (err) {
				proxy_error(
					"Internal query error. Aborting query=%s error='%s'\n", q_fmt.str.c_str(), err
				);
				free(err);
				assert(0);
			}
		}
	}
}

uint64_t get_task_timeout(task_st_t& task) {
	uint64_t task_to = 0;

	if (task.type == task_type_t::connect) {
		connect_params_t* params {
			static_cast<connect_params_t*>(task.op_st.op_params.get())
		};

		task_to = params->timeout;
	} else if (task.type == task_type_t::ping) {
		ping_params_t* params {
			static_cast<ping_params_t*>(task.op_st.op_params.get())
		};

		task_to = params->timeout;
	} else if (task.type == task_type_t::readonly) {
		readonly_params_t* params {
			static_cast<readonly_params_t*>(task.op_st.op_params.get())
		};

		task_to = params->timeout;
	} else {
		assert(0 && "Non-implemented task-type");
	}

	return task_to;
}

uint64_t get_task_max_ping_fails(task_st_t& task) {
	uint64_t max_fails { 0 };

	if (task.type == task_type_t::connect) {
		connect_params_t* params {
			static_cast<connect_params_t*>(task.op_st.op_params.get())
		};

		max_fails = params->ping_max_failures;
	} else if (task.type == task_type_t::ping) {
		ping_params_t* params {
			static_cast<ping_params_t*>(task.op_st.op_params.get())
		};

		max_fails = params->max_failures;
	} else if (task.type == task_type_t::readonly) {
		readonly_params_t* params {
			static_cast<readonly_params_t*>(task.op_st.op_params.get())
		};

		max_fails = params->ping_max_failures;
	} else {
		assert(0 && "Non-implemented task-type");
	}

	return max_fails;
}

void proc_task_state(state_t& state, uint64_t task_start) {
	pgsql_conn_t& pg_conn { state.conn };
	state.task.op_st.exec_time += monotonic_time() - task_start;

	if (state.task.type == task_type_t::connect) {
		if (monotonic_time() - state.task.start > get_task_timeout(state.task)) {
			// TODO: Unified state processing
			pg_conn.state = ASYNC_ST::ASYNC_CONNECT_TIMEOUT;
			pg_conn.err = mf_unique_ptr<char>(strdup("Operation timed out"));
			state.task.end = monotonic_time();

			// TODO: proxy_error + metrics update
			update_connect_table(&GloPgMon->monitordb, state);
		} else if (is_task_finish(state.conn, state.task)) {
			update_connect_table(&GloPgMon->monitordb, state);
		}
	} else if (state.task.type == task_type_t::ping) {
		if (monotonic_time() - state.task.start > get_task_timeout(state.task)) {
			// TODO: Unified state processing
			pg_conn.state = ASYNC_ST::ASYNC_PING_TIMEOUT;
			pg_conn.err = mf_unique_ptr<char>(strdup("Operation timed out"));
			state.task.end = monotonic_time();

			// TODO: proxy_error + metrics update
			perf_ping_actions(&GloPgMon->monitordb, state);
		} else if (is_task_finish(state.conn, state.task)) {
			perf_ping_actions(&GloPgMon->monitordb, state);
		}
	} else if (state.task.type == task_type_t::readonly) {
		if (monotonic_time() - state.task.start > get_task_timeout(state.task)) {
			// TODO: Unified state processing
			pg_conn.state = ASYNC_ST::ASYNC_QUERY_TIMEOUT;
			pg_conn.err = mf_unique_ptr<char>(strdup("Operation timed out"));
			state.task.end = monotonic_time();

			// TODO: proxy_error + metrics update
			perf_readonly_actions(&GloPgMon->monitordb, state);
		} else if (is_task_finish(state.conn, state.task)) {
			perf_readonly_actions(&GloPgMon->monitordb, state);
		}
	} else {
		assert(0 && "Non-implemented task-type");
	}
}

void add_scheduler_comm_task(const task_queue_t& tasks_queue, task_poll_t& task_poll) {
	state_t dummy_state {
		pgsql_conn_t {
			nullptr,
			tasks_queue.comm_fd[0],
			0,
			ASYNC_ST::ASYNC_CONNECT_FAILED,
			{}
		},
		task_st_t {}
	};

	add_task(task_poll, POLLIN, std::move(dummy_state));
}

// Upper bound for a worker's poll() wait; keeps timeout detection latency
// bounded (~500ms) even when there is no socket activity.
const uint64_t MAX_CHECK_DELAY_US { 500000 };

void* worker_thread(void* args) {
	pair<task_queue_t, result_queue_t>* queues {
		static_cast<pair<task_queue_t, result_queue_t>*>(args)
	};
	task_queue_t& tasks_queue { queues->first };

	queue<task_st_t> next_tasks {};
	task_poll_t task_poll {};

	bool recv_stop_signal = 0;
	uint64_t prev_it_time = 0;

	// Insert dummy task for scheduler comms
	add_scheduler_comm_task(tasks_queue, task_poll);
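	// Slot 0 of 'task_poll' now holds the scheduler pipe; real tasks start at
	// index 1, which is why the processing loops below iterate from 'i = 1'.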

	while (recv_stop_signal == false) {
		// Process wakeup signal from scheduler
		if (task_poll.fds[0].revents & POLLIN) {
			recv_stop_signal = read_signal(task_poll.fds[0].fd);

			if (recv_stop_signal == 1) {
				proxy_info("Received exit signal. Stopping worker thread=%ld\n", pthread_self());
				continue;
			}
		}

		// Fetch the next tasks from the queue
		{
			std::lock_guard<std::mutex> tasks_mutex { tasks_queue.mutex };
#ifdef DEBUG
			if (tasks_queue.queue.size()) {
				proxy_debug(PROXY_DEBUG_MONITOR, 5,
					"Fetching tasks from queue size=%lu\n", tasks_queue.queue.size()
				);
			}
#endif
			while (tasks_queue.queue.size()) {
				next_tasks.push(std::move(tasks_queue.queue.front()));
				tasks_queue.queue.pop();
			}
		}

		// Start processing the new tasks; create/fetch conns
		while (next_tasks.size()) {
			task_st_t task { std::move(next_tasks.front()) };
			next_tasks.pop();

			if (task.type != task_type_t::ping) {
				// Check if server is responsive; if not, only ping tasks are processed
				const mon_srv_t& srv { task.op_st.srv_info };
				uint64_t max_fails { get_task_max_ping_fails(task) };

				bool resp_srv {
					server_responds_to_ping(
						GloPgMon->monitordb, srv.addr.c_str(), srv.port, max_fails
					)
				};

				if (resp_srv == false) {
					proxy_debug(PROXY_DEBUG_MONITOR, 6,
						"Skipping unresponsive server addr='%s:%d'\n",
						srv.addr.c_str(), srv.port
					);
					continue;
				}
			}

			// Acquire new conn, update task on failure
			uint64_t t1 { monotonic_time() };
			pgsql_conn_t conn { create_conn(task) };
			task.op_st.exec_time += monotonic_time() - t1;

			state_t init_st { std::move(conn), std::move(task) };

#ifdef DEBUG
			const mon_srv_t& srv { init_st.task.op_st.srv_info };
			proxy_debug(PROXY_DEBUG_MONITOR, 6,
				"Adding new task to poll fd=%d type=%d addr='%s:%d'\n",
				conn.fd, int(init_st.task.type), srv.addr.c_str(), srv.port
			);
#endif

			add_task(task_poll, POLLOUT, std::move(init_st));
		}
|
|
|
|
		uint64_t next_timeout_at = ULONG_LONG_MAX;
		uint64_t tasks_start = monotonic_time();

		// Continue processing tasks; next async operation
		for (size_t i = 1; i < task_poll.size; i++) {
			uint64_t task_start = monotonic_time();

#ifdef DEBUG
			pollfd& pfd { task_poll.fds[i] };
			state_t& task_st { task_poll.tasks[i] };
			proxy_debug(PROXY_DEBUG_MONITOR, 5,
				"Processing task fd=%d revents=%d type=%d state=%d\n",
				pfd.fd, pfd.revents, int(task_st.task.type), task_st.conn.state
			);
#endif

			// Filtering is possible here for the task
			if (task_poll.fds[i].revents) {
				task_poll.fds[i].events = handle_pg_event(
					task_poll.tasks[i], task_poll.fds[i].revents
				);
			}

			// Reference invalidated by 'rm_task_fast'.
			pgsql_conn_t& conn { task_poll.tasks[i].conn };

			// TODO: Dump all relevant task state and changes due to 'pg_event'
			proxy_debug(PROXY_DEBUG_MONITOR, 5,
				"Updating task state fd=%d conn_st=%d\n", conn.fd, conn.state
			);

			// Process task status; update final state if finished
			proc_task_state(task_poll.tasks[i], task_start);

			// TODO: Dump all relevant task state
			proxy_debug(PROXY_DEBUG_MONITOR, 5,
				"Updated task state fd=%d conn_st=%d\n", conn.fd, conn.state
			);

			// Failed/finished task; reuse conn / cleanup resources
			if (is_task_finish(conn, task_poll.tasks[i].task)) {
				// TODO: Dump all relevant task state
				proxy_debug(PROXY_DEBUG_MONITOR, 5,
					"Finished task fd=%d conn_st=%d\n", conn.fd, conn.state
				);

				if (is_task_success(task_poll.tasks[i].conn, task_poll.tasks[i].task)) {
					const mon_srv_t& srv { task_poll.tasks[i].task.op_st.srv_info };

					// TODO: Better unified design to update state
					task_poll.tasks[i].conn.state = ASYNC_ST::ASYNC_CONNECT_END;
					task_poll.tasks[i].conn.last_used = task_poll.tasks[i].task.start;

					put_conn(mon_conn_pool, srv, std::move(task_poll.tasks[i].conn));

					proxy_debug(PROXY_DEBUG_MONITOR, 5,
						"Succeeded task; conn returned to pool fd=%d conn_st=%d\n",
						conn.fd, conn.state
					);
				} else {
					PQfinish(task_poll.tasks[i].conn.conn);
					proxy_debug(PROXY_DEBUG_MONITOR, 5,
						"Failed task; conn killed fd=%d conn_st=%d\n", conn.fd, conn.state
					);
				}

				// Remove from poll; after conn cleanup
				rm_task_fast(task_poll, i);
			} else {
				uint64_t task_to = get_task_timeout(task_poll.tasks[i].task);
				uint64_t task_due_to = task_poll.tasks[i].task.start + task_to;
				next_timeout_at = next_timeout_at > task_due_to ? task_due_to : next_timeout_at;
			}
		}

		const uint64_t tasks_end { monotonic_time() };
		prev_it_time = tasks_end - tasks_start;

		// Guard against underflow when the closest timeout has already expired
		uint64_t to_timeout_us { next_timeout_at > tasks_end ? next_timeout_at - tasks_end : 0 };
		uint64_t poll_timeout_us {
			to_timeout_us > MAX_CHECK_DELAY_US ? MAX_CHECK_DELAY_US : to_timeout_us
		};

		proxy_debug(PROXY_DEBUG_MONITOR, 5,
			"Waiting for poll fds_len=%lu poll_to=%lu\n", task_poll.size, poll_timeout_us
		);

		int rc = poll(task_poll.fds.data(), task_poll.size, poll_timeout_us/1000);
		uint64_t poll_waited = monotonic_time() - tasks_end;

		for (size_t i = 1; i < task_poll.size; i++) {
			if (!task_poll.fds[i].revents) {
				task_poll.tasks[i].task.op_st.exec_time += prev_it_time;
			}

			task_poll.tasks[i].task.op_st.exec_time += poll_waited;
		}

		proxy_debug(PROXY_DEBUG_MONITOR, 5,
			"Woke up from poll fds_len=%lu\n", task_poll.size
		);

		if (rc == -1 && errno == EINTR)
			continue;
		if (rc == -1) {
			proxy_error("Call to 'poll' failed. Aborting rc=%d errno=%d\n", rc, errno);
			assert(0);
		}
	}

	sqlite_finalize_statement(CHECK_HOST_ERR_LIMIT_STMT);
	sqlite_finalize_statement(FETCH_HOST_LATENCY_STMT);

	return NULL;
}

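/**
 * @brief Deletes entries older than the monitor history from a monitor log table.
 * @details May bump 'pgsql_thread___monitor_history' ('ms') so the retained
 *  window covers at least 'max_failures + 1' monitoring intervals ('us').
 * @param db The monitor DB against which to perform the maintenance.
 * @param query Parametrized DELETE query; '?1' is bound to the oldest allowed 'time_start_us'.
 * @param params Ping parameters used to size the minimum history window.
 */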
void maint_monitor_table(SQLite3DB* db, const char query[], const ping_params_t& params) {
	sqlite3_stmt* stmt { nullptr };
	int rc = db->prepare_v2(query, &stmt);
	ASSERT_SQLITE_OK(rc, db);

	// Ensure the history window ('ms') covers at least 'max_failures + 1'
	// monitoring intervals ('us'), unless the interval exceeds one hour.
	if (pgsql_thread___monitor_history < (params.interval * (params.max_failures + 1)) / 1000) {
		if (static_cast<uint64_t>(params.interval) < uint64_t(3600000) * 1000) {
			pgsql_thread___monitor_history = (params.interval * (params.max_failures + 1)) / 1000;
		}
	}

	uint64_t max_history_age { realtime_time() - uint64_t(pgsql_thread___monitor_history)*1000 };
	sqlite_bind_int64(stmt, 1, max_history_age);
	SAFE_SQLITE3_STEP2(stmt);

	sqlite_clear_bindings(stmt);
	sqlite_reset_statement(stmt);
	sqlite_finalize_statement(stmt);
}

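// Log-table maintenance queries; '?1' is bound to the oldest allowed 'time_start_us'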
const char MAINT_PING_LOG_QUERY[] {
	"DELETE FROM pgsql_server_ping_log WHERE time_start_us < ?1"
};

const char MAINT_CONNECT_LOG_QUERY[] {
	"DELETE FROM pgsql_server_connect_log WHERE time_start_us < ?1"
};

const char MAINT_READONLY_LOG_QUERY[] {
	"DELETE FROM pgsql_server_read_only_log WHERE time_start_us < ?1"
};

/**
 * @brief Performs the required maintenance in the monitor log tables.
 * @param tasks_conf The updated tasks config for the interval.
 * @param next_intvs Timestamps of each operation next interval.
 * @param intv_start Timestamp of current interval start.
 */
void maint_mon_tables(
	const tasks_conf_t& tasks_conf, const tasks_intvs_t& next_intvs, uint64_t intv_start
) {
	// NOTE: The 'ping' params size the history window for all three log tables.
	if (next_intvs.next_ping_at <= intv_start) {
		proxy_debug(PROXY_DEBUG_MONITOR, 5,
			"Performed PING table maintenance intv_start=%lu\n", intv_start
		);
		maint_monitor_table(
			&GloPgMon->monitordb, MAINT_PING_LOG_QUERY, tasks_conf.ping.params
		);
	}

	if (next_intvs.next_connect_at <= intv_start) {
		proxy_debug(PROXY_DEBUG_MONITOR, 5,
			"Performed CONNECT table maintenance intv_start=%lu\n", intv_start
		);
		maint_monitor_table(
			&GloPgMon->monitordb, MAINT_CONNECT_LOG_QUERY, tasks_conf.ping.params
		);
	}

	if (next_intvs.next_readonly_at <= intv_start) {
		proxy_debug(PROXY_DEBUG_MONITOR, 5,
			"Performed READONLY table maintenance intv_start=%lu\n", intv_start
		);
		maint_monitor_table(
			&GloPgMon->monitordb, MAINT_READONLY_LOG_QUERY, tasks_conf.ping.params
		);
	}
}

/**
 * @brief Builds the tasks batches for the current interval.
 * @param tasks_conf The updated tasks config for the interval.
 * @param next_intvs Timestamps of each operation next interval.
 * @param intv_start Timestamp of current interval start.
 * @return The new tasks batches to be queued for the worker threads.
 */
vector<task_batch_t> build_intv_batches(
	const tasks_conf_t& tasks_conf, const tasks_intvs_t& next_intvs, uint64_t intv_start
) {
	vector<task_batch_t> intv_tasks {};

	if (next_intvs.next_ping_at <= intv_start && tasks_conf.ping.srvs_info->rows_count) {
		intv_tasks.push_back({
			task_type_t::ping,
			uint64_t(tasks_conf.ping.srvs_info->rows_count),
			tasks_conf.ping.params.interval,
			tasks_conf.ping.params.interval_window,
			intv_start,
			create_simple_tasks<ping_conf_t,ping_params_t>(
				intv_start, tasks_conf.user_info, tasks_conf.ping, task_type_t::ping
			)
		});
		proxy_debug(PROXY_DEBUG_MONITOR, 5,
			"Created PING tasks tasks=%lu intv_start=%lu\n",
			intv_tasks.back().tasks.size(), intv_start
		);
	}

	if (next_intvs.next_connect_at <= intv_start && tasks_conf.connect.srvs_info->rows_count) {
		intv_tasks.push_back({
			task_type_t::connect,
			uint64_t(tasks_conf.connect.srvs_info->rows_count),
			tasks_conf.connect.params.interval,
			tasks_conf.connect.params.interval_window,
			intv_start,
			create_simple_tasks<connect_conf_t,connect_params_t>(
				intv_start, tasks_conf.user_info, tasks_conf.connect, task_type_t::connect
			)
		});
		proxy_debug(PROXY_DEBUG_MONITOR, 5,
			"Created CONNECT tasks tasks=%lu intv_start=%lu\n",
			intv_tasks.back().tasks.size(), intv_start
		);
	}

	if (next_intvs.next_readonly_at <= intv_start && tasks_conf.readonly.srvs_info->rows_count) {
		intv_tasks.push_back({
			task_type_t::readonly,
			uint64_t(tasks_conf.readonly.srvs_info->rows_count),
			tasks_conf.readonly.params.interval,
			tasks_conf.readonly.params.interval_window,
			intv_start,
			create_simple_tasks<readonly_conf_t,readonly_params_t>(
				intv_start, tasks_conf.user_info, tasks_conf.readonly, task_type_t::readonly
			)
		});
		proxy_debug(PROXY_DEBUG_MONITOR, 5,
			"Created READONLY tasks tasks=%lu intv_start=%lu\n",
			intv_tasks.back().tasks.size(), intv_start
		);
	}

	return intv_tasks;
}

/**
 * @brief Computes new tasks intervals using current ones and interval start.
 * @param conf The updated tasks config for the interval.
 * @param next_intvs Timestamps of each operation next interval.
 * @param intv_start Timestamp of current interval start.
 * @return The new next intervals for the tasks.
 */
tasks_intvs_t compute_next_intvs(
	const tasks_conf_t& conf, const tasks_intvs_t& next_intvs, uint64_t intv_start
) {
	tasks_intvs_t upd_intvs { next_intvs };

	// NOTE: An interval of '0' disables the operation ('ULONG_MAX'). Checking
	// the interval in the outer condition would make that branch unreachable.
	if (next_intvs.next_ping_at <= intv_start) {
		if (conf.ping.params.interval != 0) {
			upd_intvs.next_ping_at = intv_start + conf.ping.params.interval;
		} else {
			upd_intvs.next_ping_at = ULONG_MAX;
		}
	}
	if (next_intvs.next_connect_at <= intv_start) {
		if (conf.connect.params.interval != 0) {
			upd_intvs.next_connect_at = intv_start + conf.connect.params.interval;
		} else {
			upd_intvs.next_connect_at = ULONG_MAX;
		}
	}
	if (next_intvs.next_readonly_at <= intv_start) {
		if (conf.readonly.params.interval != 0) {
			upd_intvs.next_readonly_at = intv_start + conf.readonly.params.interval;
		} else {
			upd_intvs.next_readonly_at = ULONG_MAX;
		}
	}

	return upd_intvs;
}

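/**
 * @brief Entry point for the PgSQL monitor scheduler thread.
 * @details Main loop: when the closest operation interval is due, refreshes
 *  monitor variables, performs log-table maintenance, and builds the tasks
 *  batches for the interval. Pending batches are drained at the rate given
 *  by 'compute_task_rate' and distributed evenly among the worker threads;
 *  sleeps are bounded by 'MAX_CHECK_DELAY_US'. On shutdown, workers are
 *  signaled and joined, and the global conn pool is cleaned up.
 * @return NULL on thread exit.
 */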
void* PgSQL_monitor_scheduler_thread() {
	proxy_info("Started Monitor scheduler thread for PgSQL servers\n");

	// Quick exit during shutdown/restart
	if (!GloPTH) { return NULL; }

	// Initial Monitor thread variables version
	unsigned int PgSQL_Thread__variables_version = GloPTH->get_global_version();
	// PgSQL thread structure used for variable refreshing
	unique_ptr<PgSQL_Thread> pgsql_thread { init_pgsql_thread_struct() };

	task_queue_t conn_tasks {};
	result_queue_t conn_results {};

	uint32_t worker_threads_count = pgsql_thread___monitor_threads;
	vector<worker_thread_t> workers {};

	// TODO: Threads are right now fixed on startup.
	for (uint32_t i = 0; i < worker_threads_count; i++) {
		unique_ptr<worker_queue_t> worker_queue { new worker_queue_t {} };
		auto [err, th] { create_thread(2048 * 1024, worker_thread, worker_queue.get()) };
		assert(err == 0 && "Thread creation failed");

		workers.emplace_back(worker_thread_t { std::move(th), std::move(worker_queue) });
	}

	uint64_t cur_intv_start = 0;
	tasks_intvs_t next_intvs {};
	vector<task_batch_t> tasks_batches {};

	while (GloPgMon->shutdown == false && pgsql_thread___monitor_enabled == true) {
		cur_intv_start = monotonic_time();

		uint64_t closest_intv {
			std::min({
				next_intvs.next_ping_at,
				next_intvs.next_connect_at,
				next_intvs.next_readonly_at
			})
		};

		if (cur_intv_start >= closest_intv) {
			proxy_debug(PROXY_DEBUG_MONITOR, 5,
				"Scheduling interval time=%lu delta=%lu ping=%lu connect=%lu readonly=%lu\n",
				cur_intv_start,
				cur_intv_start - closest_intv,
				next_intvs.next_ping_at,
				next_intvs.next_connect_at,
				next_intvs.next_readonly_at
			);

			// Quick exit during shutdown/restart
			if (!GloPTH) { return NULL; }

			// Check variable version changes; refresh if needed
			unsigned int glover = GloPTH->get_global_version();
			if (PgSQL_Thread__variables_version < glover) {
				PgSQL_Thread__variables_version = glover;
				pgsql_thread->refresh_variables();
			}

			// Fetch config for next task scheduling
			tasks_conf_t tasks_conf { fetch_updated_conf(GloPgMon, PgHGM) };

			// Perform table maintenance
			maint_mon_tables(tasks_conf, next_intvs, cur_intv_start);

			// Create the tasks from config for this interval
			vector<task_batch_t> next_batches {
				build_intv_batches(tasks_conf, next_intvs, cur_intv_start)
			};

			if (next_batches.size()) {
				append(tasks_batches, std::move(next_batches));
			}

			// Compute the next intervals for the tasks
			next_intvs = compute_next_intvs(tasks_conf, next_intvs, cur_intv_start);
		}

		uint64_t batches_max_wait { ULONG_MAX };

		for (task_batch_t& batch : tasks_batches) {
			if (batch.next_sched > cur_intv_start) {
				uint64_t wait { batch.next_sched - cur_intv_start };

				// Track the soonest pending batch; 'batches_max_wait' bounds the sleep
				if (wait < batches_max_wait) {
					batches_max_wait = wait;
				}
				continue;
			}

			const auto [rate, wait] = compute_task_rate(
				workers.size(), batch.batch_sz, batch.intv_us, batch.intv_window
			);

			proxy_debug(PROXY_DEBUG_MONITOR, 5,
				"Scheduling tasks batch type=%d workers=%lu rate=%lu wait=%lu\n",
				int(batch.type), workers.size(), rate, wait
			);

			// Schedule tasks between the worker threads; simple even distribution
			vector<task_st_t> tasks { get_from_batch(batch, rate) };
			schedule_tasks(workers, std::move(tasks));

			// Only set if there are tasks remaining
			if (wait < batches_max_wait && batch.tasks.size() != 0) {
				batches_max_wait = wait;
			}

			batch.next_sched = cur_intv_start + wait;
		}

		// Remove finished batches
		tasks_batches.erase(
			std::remove_if(tasks_batches.begin(), tasks_batches.end(),
				[] (const task_batch_t& batch) -> bool {
					return batch.tasks.empty();
				}
			),
			tasks_batches.end()
		);

		{
			const uint64_t curtime { monotonic_time() };
			uint64_t upd_closest_intv {
				std::min({
					next_intvs.next_ping_at,
					next_intvs.next_connect_at,
					next_intvs.next_readonly_at
				})
			};
			const uint64_t next_intv_diff { upd_closest_intv < curtime ? 0 : upd_closest_intv - curtime };
			const uint64_t sched_wait_us { std::min({ batches_max_wait, next_intv_diff }) };

			usleep(sched_wait_us > MAX_CHECK_DELAY_US ? MAX_CHECK_DELAY_US : sched_wait_us);
		}
	}

proxy_info("Exiting PgSQL_Monitor scheduling thread\n");
|
|
|
|
// Wakeup workers for shutdown
|
|
{
|
|
for (worker_thread_t& worker : workers) {
|
|
write_signal(worker.second->first.comm_fd[1], 1);
|
|
}
|
|
|
|
// Give some time for a clean exit
|
|
usleep(500 * 1000);
|
|
|
|
// Force the exit on the remaining threads
|
|
for (worker_thread_t& worker : workers) {
|
|
pthread_cancel(worker.first);
|
|
}
|
|
|
|
// Wait for the threads to actually exit
|
|
for (worker_thread_t& worker : workers) {
|
|
pthread_join(worker.first, NULL);
|
|
}
|
|
|
|
// Cleanup the global connection pool; no mutex, threads joined
|
|
for (auto& entry : mon_conn_pool.conn_map) {
|
|
for (auto& conn : entry.second) {
|
|
PQfinish(conn.conn);
|
|
}
|
|
}
|
|
mon_conn_pool.conn_map.clear();
|
|
}
|
|
|
|
return nullptr;
|
|
}
|