From e4c0f5ccd89bf593f29bf9cf3cd6def1593e6763 Mon Sep 17 00:00:00 2001
From: Rahim Kanji
Date: Wed, 23 Jul 2025 12:02:00 +0500
Subject: [PATCH] Refactored Watchdog and integrated PostgreSQL thread monitoring into its loop.

---
 lib/PgSQL_Data_Stream.cpp |   4 +-
 src/main.cpp              | 278 +++++++++++++++++++++++--------------
 2 files changed, 179 insertions(+), 103 deletions(-)

diff --git a/lib/PgSQL_Data_Stream.cpp b/lib/PgSQL_Data_Stream.cpp
index 679a98d18..2fd59ac5d 100644
--- a/lib/PgSQL_Data_Stream.cpp
+++ b/lib/PgSQL_Data_Stream.cpp
@@ -1148,8 +1148,8 @@ void PgSQL_Data_Stream::return_MySQL_Connection_To_Pool() {
 	unsigned long long intv = pgsql_thread___connection_max_age_ms;
 	intv *= 1000;
 	if (
-		((intv) && (mc->last_time_used > mc->creation_time + intv)) ||
-		(mc->local_stmts->get_num_backend_stmts() > (unsigned int)GloPTH->variables.max_stmts_per_connection)
+		(((intv) && (mc->last_time_used > mc->creation_time + intv)) ||
+		(mc->local_stmts->get_num_backend_stmts() > (unsigned int)GloPTH->variables.max_stmts_per_connection))
 		&&
 		// NOTE: If the current session if in 'PINGING_SERVER' status, there is
 		// no need to reset the session. The destruction and creation of a new
diff --git a/src/main.cpp b/src/main.cpp
index 1fc66f81f..b44ca03ed 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -2315,6 +2315,182 @@ int print_jemalloc_conf() {
 }
 #endif
 
+// Kinds of worker-thread groups the watchdog monitors
+enum WatchDogEntityType {
+	ENTITY_TYPE_MYSQL,
+	ENTITY_TYPE_POSTGRESQL,
+	ENTITY_TYPE_COUNT // number of entity types; sizes the groups array
+};
+
+// Per-group watchdog bookkeeping
+struct WatchdogGroup {
+	WatchDogEntityType type;
+	const char* name; // label used in log messages
+	void* thread_handler; // GloMTH or GloPTH
+	unsigned int missed_heartbeats; // consecutive checks with at least one stale thread
+	unsigned long long prev_time; // monotonic_time() of the last heartbeat check
+};
+
+// Counts the workers in `threads` whose heartbeat is stale, i.e. whose
+// `atomic_curtime` is more than `poll_timeout` behind `curtime` (both values
+// are compared in the same unit as monotonic_time()). Slots without a worker
+// are skipped and a null `threads` array yields 0.
+template <typename ThreadType>
+unsigned int check_heartbeats(ThreadType* threads, unsigned int num_threads,
+	unsigned long long curtime, unsigned long long poll_timeout) {
+	unsigned int missing = 0;
+	if (!threads) return missing;
+
+	for (unsigned int i = 0; i < num_threads; ++i) {
+		if (threads[i].worker &&
+			curtime > threads[i].worker->atomic_curtime + poll_timeout) {
+			missing++;
+		}
+	}
+	return missing;
+}
+
+// Watchdog loop executed by main() once the service is up. Wakes up every
+// 200ms until glovars.shutdown is set and, unless disable_watchdog is on:
+//  - starts the detached version-check thread every 24h when enabled;
+//  - for each group with a thread handler, once per (poll_timeout + 1s),
+//    counts workers with a stale heartbeat;
+//  - asserts after restart_on_missing_heartbeats consecutive failed checks
+//    (never when that variable is 0 or RUNNING_ON_VALGRIND is defined).
+void watchdog_main_loop() {
+#if 0
+	{
+		// the following commented code is here only to manually test handleProcessRestart()
+		// DO NOT ENABLE
+		//proxy_info("Service is up\n");
+		//sleep(2);
+		//assert(0);
+	}
+#endif
+
+	WatchdogGroup groups[ENTITY_TYPE_COUNT] = {
+		{ ENTITY_TYPE_MYSQL, "MySQL", GloMTH, 0, monotonic_time() },
+		{ ENTITY_TYPE_POSTGRESQL, "PostgreSQL", GloPTH, 0, monotonic_time() }
+	};
+
+	unsigned int inner_loops = 0;
+	unsigned long long time_next_version_check = 0;
+	unsigned long long loop_prev_time = monotonic_time();
+	while (glovars.shutdown == 0) {
+		std::this_thread::sleep_for(std::chrono::microseconds(200000));
+
+		if (disable_watchdog) continue;
+
+		const unsigned long long curtime = monotonic_time();
+
+		// Version check handling
+		if (GloVars.global.version_check && curtime > time_next_version_check) {
+			pthread_attr_t attr;
+			pthread_attr_init(&attr);
+			pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+			pthread_t thr;
+			if (pthread_create(&thr, &attr, main_check_latest_version_thread, NULL) != 0) {
+				perror("Thread creation");
+				exit(EXIT_FAILURE);
+			}
+			// the attributes are only needed by pthread_create(): release them
+			pthread_attr_destroy(&attr);
+			if (time_next_version_check == 0)
+				time_next_version_check = curtime;
+			time_next_version_check += (24ULL * 3600 * 1000000); // 24h in microseconds
+		}
+
+		inner_loops++;
+		if (curtime >= ((inner_loops * 300000ULL) + loop_prev_time)) { // 64-bit product, cannot wrap
+			//proxy_info("Watchdog: main loop is blocked for more than 300ms, resetting prev_time\n");
+			// if this happens, it means that this very simple loop is blocked
+			// probably we are running under gdb
+			loop_prev_time = curtime;
+			for (auto& group : groups) {
+				group.prev_time = curtime;
+			}
+			inner_loops = 0;
+			continue;
+		}
+
+		// Process all monitored thread groups
+		for (auto& group : groups) {
+			if (!group.thread_handler) continue;
+
+			// Get poll timeout from thread handler
+			unsigned int poll_timeout_ms = 0;
+			switch (group.type) {
+			case ENTITY_TYPE_MYSQL:
+				poll_timeout_ms = static_cast<MySQL_Threads_Handler*>(group.thread_handler)->variables.poll_timeout;
+				break;
+			case ENTITY_TYPE_POSTGRESQL:
+				poll_timeout_ms = static_cast<PgSQL_Threads_Handler*>(group.thread_handler)->variables.poll_timeout;
+				break;
+			default:
+				assert(0);
+			}
+
+			const unsigned long long poll_timeout = (static_cast<unsigned long long>(poll_timeout_ms) + 1000) * 1000;
+
+			if (curtime < group.prev_time + poll_timeout) continue;
+			loop_prev_time = curtime;
+			group.prev_time = curtime;
+			inner_loops = 0;
+
+			// Check thread heartbeats
+			unsigned int threads_missing_heartbeat = 0;
+			switch (group.type) {
+			case ENTITY_TYPE_MYSQL:
+			{
+				MySQL_Threads_Handler* mth = static_cast<MySQL_Threads_Handler*>(group.thread_handler);
+				threads_missing_heartbeat += check_heartbeats(mth->mysql_threads, mth->num_threads, curtime, poll_timeout);
+#ifdef IDLE_THREADS
+				if (GloVars.global.idle_threads) {
+					threads_missing_heartbeat += check_heartbeats(mth->mysql_threads_idles, mth->num_threads, curtime, poll_timeout);
+				}
+#endif
+				break;
+			}
+			case ENTITY_TYPE_POSTGRESQL:
+			{
+				PgSQL_Threads_Handler* pth = static_cast<PgSQL_Threads_Handler*>(group.thread_handler);
+				threads_missing_heartbeat += check_heartbeats(pth->pgsql_threads, pth->num_threads, curtime, poll_timeout);
+#ifdef IDLE_THREADS
+				if (GloVars.global.idle_threads) {
+					threads_missing_heartbeat += check_heartbeats(pth->pgsql_threads_idles, pth->num_threads, curtime, poll_timeout);
+				}
+#endif
+				break;
+			}
+			default:
+				assert(0);
+			}
+			// Handle heartbeat results
+			if (threads_missing_heartbeat > 0) {
+				proxy_error("Watchdog: %u %s threads missed a heartbeat\n",
+					threads_missing_heartbeat, group.name);
+				group.missed_heartbeats++;
+
+				if (group.missed_heartbeats >= static_cast<unsigned int>(GloVars.restart_on_missing_heartbeats)) {
+#ifndef RUNNING_ON_VALGRIND
+					if (GloVars.restart_on_missing_heartbeats) {
+						proxy_error("Watchdog: reached %u missed %s thread heartbeats. Aborting!\n",
+							group.missed_heartbeats, group.name);
+						proxy_error("Watchdog: see details at https://github.com/sysown/proxysql/wiki/Watchdog\n");
+						assert(0);
+					}
+#else
+					proxy_error("Watchdog: reached %u missed %s thread heartbeats. Not aborting under Valgrind\n",
+						group.missed_heartbeats, group.name);
+#endif
+				}
+			} else {
+				group.missed_heartbeats = 0;
+			}
+		}
+	}
+}
+
 int main(int argc, const char * argv[]) {
 
 	if (check_openssl_version() == false) {
@@ -2789,107 +2965,7 @@ __start_label:
 	proxy_info("For support visit: https://proxysql.com/services/support/\n");
 	proxy_info("For consultancy visit: https://proxysql.com/services/consulting/\n");
 
-	{
-#if 0
-		{
-			// the following commented code is here only to manually test handleProcessRestart()
-			// DO NOT ENABLE
-			//proxy_info("Service is up\n");
-			//sleep(2);
-			//assert(0);
-		}
-#endif
-		unsigned int missed_heartbeats = 0;
-		unsigned long long previous_time = monotonic_time();
-		unsigned int inner_loops = 0;
-		unsigned long long time_next_version_check = 0;
-		while (glovars.shutdown==0) {
-			usleep(200000);
-			if (disable_watchdog) {
-				continue;
-			}
-			unsigned long long curtime = monotonic_time();
-			if (GloVars.global.version_check) {
-				if (curtime > time_next_version_check) {
-					pthread_attr_t attr;
-					pthread_attr_init(&attr);
-					pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-					pthread_t thr;
-					if (pthread_create(&thr, &attr, main_check_latest_version_thread, NULL) !=0 ) {
-						perror("Thread creation");
-						exit(EXIT_FAILURE);
-					}
-					if (time_next_version_check == 0)
-						time_next_version_check = curtime;
-					unsigned long long inter = 24*3600*1000;
-					inter *= 1000;
-					time_next_version_check += inter;
-				}
-			}
-			inner_loops++;
-			if (curtime >= inner_loops*300000 + previous_time ) {
-				// if this happens, it means that this very simple loop is blocked
-				// probably we are running under gdb
-				previous_time = curtime;
-				inner_loops = 0;
-				continue;
-			}
-			if (GloMTH) {
-				unsigned long long atomic_curtime = 0;
-				unsigned long long poll_timeout = (unsigned int)GloMTH->variables.poll_timeout;
-				unsigned int threads_missing_heartbeat = 0;
-				poll_timeout += 1000; // add 1 second (rounding up)
-				poll_timeout *= 1000; // convert to us
-				if (curtime < previous_time + poll_timeout) {
-					continue;
-				}
-				previous_time = curtime;
-				inner_loops = 0;
-				unsigned int i;
-				if (GloMTH->mysql_threads) {
-					for (i=0; i<GloMTH->num_threads; i++) {
-						if (GloMTH->mysql_threads[i].worker) {
-							atomic_curtime = GloMTH->mysql_threads[i].worker->atomic_curtime;
-							if (curtime > atomic_curtime + poll_timeout) {
-								threads_missing_heartbeat++;
-							}
-						}
-					}
-				}
-#ifdef IDLE_THREADS
-				if (GloVars.global.idle_threads) {
-					if (GloMTH->mysql_threads) {
-						for (i=0; i<GloMTH->num_threads; i++) {
-							if (GloMTH->mysql_threads_idles[i].worker) {
-								atomic_curtime = GloMTH->mysql_threads_idles[i].worker->atomic_curtime;
-								if (curtime > atomic_curtime + poll_timeout) {
-									threads_missing_heartbeat++;
-								}
-							}
-						}
-					}
-				}
-#endif
-				if (threads_missing_heartbeat) {
-					proxy_error("Watchdog: %u threads missed a heartbeat\n", threads_missing_heartbeat);
-					missed_heartbeats++;
-					if (missed_heartbeats >= (unsigned int)GloVars.restart_on_missing_heartbeats) {
-#ifdef RUNNING_ON_VALGRIND
-						proxy_error("Watchdog: reached %u missed heartbeats. Not aborting because running under Valgrind\n", missed_heartbeats);
-#else
-						if (GloVars.restart_on_missing_heartbeats) {
-							proxy_error("Watchdog: reached %u missed heartbeats. Aborting!\n", missed_heartbeats);
-							proxy_error("Watchdog: see details at https://github.com/sysown/proxysql/wiki/Watchdog\n");
-							assert(0);
-						}
-#endif
-					}
-				} else {
-					missed_heartbeats = 0;
-				}
-			}
-		}
-	}
+	watchdog_main_loop();
 
 __shutdown:
 