Refactored Watchdog and integrated PostgreSQL thread monitoring into its loop.

pull/5044/head
Rahim Kanji 10 months ago
parent 28931cd00d
commit e4c0f5ccd8

@ -1148,8 +1148,8 @@ void PgSQL_Data_Stream::return_MySQL_Connection_To_Pool() {
unsigned long long intv = pgsql_thread___connection_max_age_ms;
intv *= 1000;
if (
((intv) && (mc->last_time_used > mc->creation_time + intv)) ||
(mc->local_stmts->get_num_backend_stmts() > (unsigned int)GloPTH->variables.max_stmts_per_connection)
(((intv) && (mc->last_time_used > mc->creation_time + intv)) ||
(mc->local_stmts->get_num_backend_stmts() > (unsigned int)GloPTH->variables.max_stmts_per_connection))
&&
// NOTE: If the current session is in 'PINGING_SERVER' status, there is
// no need to reset the session. The destruction and creation of a new

@ -2315,6 +2315,168 @@ int print_jemalloc_conf() {
}
#endif
// Database type configuration.
// One enumerator per backend family monitored by the watchdog. The values are
// consecutive starting at zero, so the trailing ENTITY_TYPE_COUNT enumerator
// equals the number of real entries and can be used as an array size.
enum WatchDogEntityType {
    ENTITY_TYPE_MYSQL = 0,   // MySQL worker threads
    ENTITY_TYPE_POSTGRESQL,  // PostgreSQL worker threads
    ENTITY_TYPE_COUNT        // sentinel, keep last: number of entity types
};
// Per-backend bookkeeping used by the watchdog loop; the loop keeps one entry
// per WatchDogEntityType. The struct is brace-initialized positionally, so the
// member order below must not be changed.
struct WatchdogGroup {
// Backend family; selects the handler class that thread_handler is cast to.
WatchDogEntityType type;
// Human-readable backend name inserted into the watchdog log messages.
const char* name;
// Type-erased thread handler; a null pointer makes the watchdog skip this group.
void* thread_handler; // GloMTH or GloPTH
// Number of consecutive watchdog checks in which at least one thread of this
// group missed its heartbeat; reset to 0 by a check with no misses.
unsigned int missed_heartbeats;
// monotonic_time() value recorded when this group was last checked, or when
// the watchdog last resynchronized after detecting that its own loop stalled.
unsigned long long prev_time;
};
/**
 * Count the workers whose heartbeat timestamp is older than `poll_timeout`.
 *
 * @tparam ThreadType  Element type of the array. It must expose a `worker`
 *                     pointer whose pointee has an `atomic_curtime` timestamp.
 * @param threads      Array of thread slots; when null, 0 is returned.
 * @param num_threads  Number of slots in `threads`.
 * @param curtime      Current time, in the same unit as `atomic_curtime`.
 * @param poll_timeout Maximum tolerated heartbeat age, in that same unit.
 * @return Number of slots with a non-null worker for which
 *         `curtime > worker->atomic_curtime + poll_timeout` holds.
 */
template <typename ThreadType>
unsigned int check_heartbeats(ThreadType* threads, unsigned int num_threads,
                              unsigned long long curtime, unsigned long long poll_timeout) {
    if (threads == nullptr) {
        return 0;
    }

    unsigned int stale = 0;
    ThreadType* const last = threads + num_threads;
    for (ThreadType* slot = threads; slot != last; ++slot) {
        if (!slot->worker) {
            continue; // slot has no worker attached, nothing to check
        }
        // Latest point in time at which the last recorded heartbeat is still acceptable.
        const unsigned long long deadline = slot->worker->atomic_curtime + poll_timeout;
        if (curtime > deadline) {
            ++stale;
        }
    }
    return stale;
}
void watchdog_main_loop() {
#if 0
{
// the following commented code is here only to manually test handleProcessRestart()
// DO NOT ENABLE
//proxy_info("Service is up\n");
//sleep(2);
//assert(0);
}
#endif
WatchdogGroup groups[ENTITY_TYPE_COUNT] = {
{ ENTITY_TYPE_MYSQL, "MySQL", GloMTH, 0, monotonic_time() },
{ ENTITY_TYPE_POSTGRESQL, "PostgreSQL", GloPTH, 0, monotonic_time() }
};
unsigned int inner_loops = 0;
unsigned long long time_next_version_check = 0;
unsigned long long loop_prev_time = monotonic_time();
while (glovars.shutdown == 0) {
std::this_thread::sleep_for(std::chrono::microseconds(200000));
if (disable_watchdog) continue;
const unsigned long long curtime = monotonic_time();
// Version check handling
if (GloVars.global.version_check && curtime > time_next_version_check) {
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_t thr;
if (pthread_create(&thr, &attr, main_check_latest_version_thread, NULL) != 0) {
perror("Thread creation");
exit(EXIT_FAILURE);
}
if (time_next_version_check == 0)
time_next_version_check = curtime;
time_next_version_check += (24ULL * 3600 * 1000000); // 24h in microseconds
}
inner_loops++;
if (curtime >= ((inner_loops * 300000) + loop_prev_time)) {
//proxy_info("Watchdog: main loop is blocked for more than 300ms, resetting prev_time\n");
// if this happens, it means that this very simple loop is blocked
// probably we are running under gdb
loop_prev_time = curtime;
for (auto& group : groups) {
group.prev_time = curtime;
}
inner_loops = 0;
continue;
}
// Process all
for (auto& group : groups) {
if (!group.thread_handler) continue;
// Get poll timeout from thread handler
unsigned int poll_timeout_ms = 0;
switch (group.type) {
case ENTITY_TYPE_MYSQL:
poll_timeout_ms = static_cast<MySQL_Threads_Handler*>(group.thread_handler)->variables.poll_timeout;
break;
case ENTITY_TYPE_POSTGRESQL:
poll_timeout_ms = static_cast<PgSQL_Threads_Handler*>(group.thread_handler)->variables.poll_timeout;
break;
default:
assert(0);
}
const unsigned long long poll_timeout = (static_cast<unsigned long long>(poll_timeout_ms) + 1000) * 1000;
if (curtime < group.prev_time + poll_timeout) continue;
loop_prev_time = curtime;
group.prev_time = curtime;
inner_loops = 0;
// Check thread heartbeats
unsigned int threads_missing_heartbeat = 0;
switch (group.type) {
case ENTITY_TYPE_MYSQL:
{
MySQL_Threads_Handler* mth = static_cast<MySQL_Threads_Handler*>(group.thread_handler);
threads_missing_heartbeat += check_heartbeats(mth->mysql_threads, mth->num_threads, curtime, poll_timeout);
#ifdef IDLE_THREADS
if (GloVars.global.idle_threads) {
threads_missing_heartbeat += check_heartbeats(mth->mysql_threads_idles, mth->num_threads, curtime, poll_timeout);
}
#endif
break;
}
case ENTITY_TYPE_POSTGRESQL:
{
PgSQL_Threads_Handler* pth = static_cast<PgSQL_Threads_Handler*>(group.thread_handler);
threads_missing_heartbeat += check_heartbeats(pth->pgsql_threads, pth->num_threads, curtime, poll_timeout);
#ifdef IDLE_THREADS
if (GloVars.global.idle_threads) {
threads_missing_heartbeat += check_heartbeats(pth->pgsql_threads_idles, pth->num_threads, curtime, poll_timeout);
}
#endif
break;
}
default:
assert(0);
}
// Handle heartbeat results
if (threads_missing_heartbeat > 0) {
proxy_error("Watchdog: %u %s threads missed a heartbeat\n",
threads_missing_heartbeat, group.name);
group.missed_heartbeats++;
if (group.missed_heartbeats >= static_cast<unsigned int>(GloVars.restart_on_missing_heartbeats)) {
#ifndef RUNNING_ON_VALGRIND
if (GloVars.restart_on_missing_heartbeats) {
proxy_error("Watchdog: reached %u missed %s thread heartbeats. Aborting!\n",
group.missed_heartbeats, group.name);
proxy_error("Watchdog: see details at https://github.com/sysown/proxysql/wiki/Watchdog\n");
assert(0);
}
#else
proxy_error("Watchdog: reached %u missed %s thread heartbeats. Not aborting under Valgrind\n",
group.missed_heartbeats, group.name);
#endif
}
} else {
group.missed_heartbeats = 0;
}
}
}
}
int main(int argc, const char * argv[]) {
if (check_openssl_version() == false) {
@ -2789,107 +2951,7 @@ __start_label:
proxy_info("For support visit: https://proxysql.com/services/support/\n");
proxy_info("For consultancy visit: https://proxysql.com/services/consulting/\n");
{
#if 0
{
// the following commented code is here only to manually test handleProcessRestart()
// DO NOT ENABLE
//proxy_info("Service is up\n");
//sleep(2);
//assert(0);
}
#endif
unsigned int missed_heartbeats = 0;
unsigned long long previous_time = monotonic_time();
unsigned int inner_loops = 0;
unsigned long long time_next_version_check = 0;
while (glovars.shutdown==0) {
usleep(200000);
if (disable_watchdog) {
continue;
}
unsigned long long curtime = monotonic_time();
if (GloVars.global.version_check) {
if (curtime > time_next_version_check) {
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_t thr;
if (pthread_create(&thr, &attr, main_check_latest_version_thread, NULL) !=0 ) {
perror("Thread creation");
exit(EXIT_FAILURE);
}
if (time_next_version_check == 0)
time_next_version_check = curtime;
unsigned long long inter = 24*3600*1000;
inter *= 1000;
time_next_version_check += inter;
}
}
inner_loops++;
if (curtime >= inner_loops*300000 + previous_time ) {
// if this happens, it means that this very simple loop is blocked
// probably we are running under gdb
previous_time = curtime;
inner_loops = 0;
continue;
}
if (GloMTH) {
unsigned long long atomic_curtime = 0;
unsigned long long poll_timeout = (unsigned int)GloMTH->variables.poll_timeout;
unsigned int threads_missing_heartbeat = 0;
poll_timeout += 1000; // add 1 second (rounding up)
poll_timeout *= 1000; // convert to us
if (curtime < previous_time + poll_timeout) {
continue;
}
previous_time = curtime;
inner_loops = 0;
unsigned int i;
if (GloMTH->mysql_threads) {
for (i=0; i<GloMTH->num_threads; i++) {
if (GloMTH->mysql_threads[i].worker) {
atomic_curtime = GloMTH->mysql_threads[i].worker->atomic_curtime;
if (curtime > atomic_curtime + poll_timeout) {
threads_missing_heartbeat++;
}
}
}
}
#ifdef IDLE_THREADS
if (GloVars.global.idle_threads) {
if (GloMTH->mysql_threads) {
for (i=0; i<GloMTH->num_threads; i++) {
if (GloMTH->mysql_threads_idles[i].worker) {
atomic_curtime = GloMTH->mysql_threads_idles[i].worker->atomic_curtime;
if (curtime > atomic_curtime + poll_timeout) {
threads_missing_heartbeat++;
}
}
}
}
}
#endif
if (threads_missing_heartbeat) {
proxy_error("Watchdog: %u threads missed a heartbeat\n", threads_missing_heartbeat);
missed_heartbeats++;
if (missed_heartbeats >= (unsigned int)GloVars.restart_on_missing_heartbeats) {
#ifdef RUNNING_ON_VALGRIND
proxy_error("Watchdog: reached %u missed heartbeats. Not aborting because running under Valgrind\n", missed_heartbeats);
#else
if (GloVars.restart_on_missing_heartbeats) {
proxy_error("Watchdog: reached %u missed heartbeats. Aborting!\n", missed_heartbeats);
proxy_error("Watchdog: see details at https://github.com/sysown/proxysql/wiki/Watchdog\n");
assert(0);
}
#endif
}
} else {
missed_heartbeats = 0;
}
}
}
}
watchdog_main_loop();
__shutdown:

Loading…
Cancel
Save