From 2d569f3b95d5174ad97e571827f9bb0c81b5dcef Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Sun, 17 Aug 2025 14:55:33 +0500 Subject: [PATCH] Added support to terminate running queries on timeout --- include/PgSQL_Connection.h | 17 ++++ include/PgSQL_Session.h | 35 +++----- lib/PgSQL_Connection.cpp | 45 ++++++++++ lib/PgSQL_Session.cpp | 164 +++---------------------------------- 4 files changed, 85 insertions(+), 176 deletions(-) diff --git a/include/PgSQL_Connection.h b/include/PgSQL_Connection.h index 4141fd580..cae9d0d48 100644 --- a/include/PgSQL_Connection.h +++ b/include/PgSQL_Connection.h @@ -664,4 +664,21 @@ private: static std::map> parse_pq_error_message(const std::string& error_str); }; +class PgSQL_CancelQueryArgs { +public: + PGconn* conn; + PgSQL_Thread* mt; + char* username; + char* hostname; + unsigned int port; + int backend_pid; + unsigned int hid; + + PgSQL_CancelQueryArgs(PGconn* _conn, const char* user, const char* host, + unsigned int _port, unsigned int _hid, int _backend_pid, PgSQL_Thread* _mt); + ~PgSQL_CancelQueryArgs(); +}; + +void* PgSQL_cancel_query_thread(void* arg); + #endif /* __CLASS_PGSQL_CONNECTION_H */ diff --git a/include/PgSQL_Session.h b/include/PgSQL_Session.h index 376ab56f6..ed4dd246e 100644 --- a/include/PgSQL_Session.h +++ b/include/PgSQL_Session.h @@ -294,7 +294,16 @@ private: void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___create_mirror_session(); int handler_again___status_PINGING_SERVER(); int handler_again___status_RESETTING_CONNECTION(); - void handler_again___new_thread_to_kill_connection(); + + + /** + * @brief Initiates a new thread to kill current running query. + * + * The handler_again___new_thread_to_cancel_query() method creates a new thread to initiate + * the cancellation of the current running query. + * + */ + void handler_again___new_thread_to_cancel_query(); bool handler_again___verify_init_connect(); #if 0 @@ -554,31 +563,7 @@ public: void set_previous_status_mode3(bool allow_execute = true); }; -#define PgSQL_KILL_QUERY 1 -#define PgSQL_KILL_CONNECTION 2 - -class PgSQL_KillArgs { -public: - PgSQL_Thread* mt; - char* username; - char* password; - char* hostname; - unsigned int port; - int id; - int kill_type; - unsigned int hid; - int use_ssl; - - PgSQL_KillArgs(char* u, char* p, char* h, unsigned int P, unsigned int _hid, int i, int kt, int _use_ssl, PgSQL_Thread* _mt); - PgSQL_KillArgs(char* u, char* p, char* h, unsigned int P, unsigned int _hid, int i, int kt, int _use_ssl, PgSQL_Thread* _mt, char* ip); - ~PgSQL_KillArgs(); - const char* get_host_address() const; - -private: - char* ip_addr; -}; -void* PgSQL_kill_query_thread(void* arg); #endif /* __CLASS_PGSQL_SESSION_H */ #endif // CLASS_BASE_SESSION_H diff --git a/lib/PgSQL_Connection.cpp b/lib/PgSQL_Connection.cpp index 93259d04e..fb74a0a3b 100644 --- a/lib/PgSQL_Connection.cpp +++ b/lib/PgSQL_Connection.cpp @@ -2638,3 +2638,48 @@ void PgSQL_Connection::copy_startup_parameters_to_pgsql_variables(bool copy_only } } } + +PgSQL_CancelQueryArgs::PgSQL_CancelQueryArgs(PGconn* _conn, const char* user, const char* host, + unsigned int _port, unsigned int _hid, int _backend_pid, PgSQL_Thread* _mt) { + conn = _conn; + username = strdup(user); + hostname = strdup(host); + port = _port; + hid = _hid; + backend_pid = _backend_pid; + mt = _mt; +} + +PgSQL_CancelQueryArgs::~PgSQL_CancelQueryArgs() { + free(username); + free(hostname); +} + +void* PgSQL_cancel_query_thread(void* arg) { + assert(arg); + PgSQL_CancelQueryArgs* ka = static_cast(arg); + + PGcancel* cancel = PQgetCancel(ka->conn); + + if (!cancel) { + proxy_error("Failed to cancel query on %s:%d with backend PID %d\n", ka->hostname, ka->port, ka->backend_pid); + PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, ka->hid, ka->hostname, ka->port, 999); + goto __exit_cancel_query_thread; + } + + if (ka->mt) ka->mt->status_variables.stvar[st_var_killed_queries]++; + + char errbuf[256]; + if (!PQcancel(cancel, errbuf, sizeof(errbuf))) { + proxy_error("Failed to cancel query on %s:%d with backend PID %d: %s\n", ka->hostname, ka->port, + ka->backend_pid, errbuf); + PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, ka->hid, ka->hostname, ka->port, 999); + } else { + proxy_warning("Canceled query on %s:%d with backend PID %d successfully\n", ka->hostname, ka->port, ka->backend_pid); + } + +__exit_cancel_query_thread: + if (cancel) PQfreeCancel(cancel); + delete ka; + return NULL; +} diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index 86718839b..e8a89870e 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -153,135 +153,6 @@ bool PgSQL_Session_Regex::match(char *m) { } */ -PgSQL_KillArgs::PgSQL_KillArgs(char* u, char* p, char* h, unsigned int P, unsigned int _hid, int i, int kt, int _use_ssl, PgSQL_Thread* _mt) : - PgSQL_KillArgs(u, p, h, P, _hid, i, kt, _use_ssl, _mt, NULL) { - // resolving DNS if available in Cache - if (h && P) { - const std::string& ip = MySQL_Monitor::dns_lookup(h, false); - - if (ip.empty() == false) { - ip_addr = strdup(ip.c_str()); - } - } -} -PgSQL_KillArgs::PgSQL_KillArgs(char* u, char* p, char* h, unsigned int P, unsigned int _hid, int i, int kt, int _use_ssl, PgSQL_Thread* _mt, char* ip) { - username = strdup(u); - password = strdup(p); - hostname = strdup(h); - ip_addr = NULL; - if (ip) - ip_addr = strdup(ip); - port = P; - hid = _hid; - id = i; - kill_type = kt; - use_ssl = _use_ssl; - mt = _mt; -} - -PgSQL_KillArgs::~PgSQL_KillArgs() { - free(username); - free(password); - free(hostname); - if (ip_addr) - free(ip_addr); -} - -const char* PgSQL_KillArgs::get_host_address() const { - const char* host_address = hostname; - - if (ip_addr) - host_address = ip_addr; - - return host_address; -} - -void* PgSQL_kill_query_thread(void* arg) { - PgSQL_KillArgs* ka = (PgSQL_KillArgs*)arg; - std::unique_ptr mysql_thr(new MySQL_Thread()); - mysql_thr->curtime = monotonic_time(); - mysql_thr->refresh_variables(); - MYSQL* pgsql = mysql_init(NULL); - mysql_options4(pgsql, MYSQL_OPT_CONNECT_ATTR_ADD, "program_name", "proxysql_killer"); - mysql_options4(pgsql, MYSQL_OPT_CONNECT_ATTR_ADD, "_server_host", ka->hostname); - - if (ka->use_ssl && ka->port) { - mysql_ssl_set(pgsql, - pgsql_thread___ssl_p2s_key, - pgsql_thread___ssl_p2s_cert, - pgsql_thread___ssl_p2s_ca, - pgsql_thread___ssl_p2s_capath, - pgsql_thread___ssl_p2s_cipher); - mysql_options(pgsql, MYSQL_OPT_SSL_CRL, pgsql_thread___ssl_p2s_crl); - mysql_options(pgsql, MYSQL_OPT_SSL_CRLPATH, pgsql_thread___ssl_p2s_crlpath); - mysql_options(pgsql, MARIADB_OPT_SSL_KEYLOG_CALLBACK, (void*)proxysql_keylog_write_line_callback); - } - - if (!pgsql) { - goto __exit_kill_query_thread; - } - MYSQL* ret; - if (ka->port) { - switch (ka->kill_type) { - case KILL_QUERY: - proxy_warning("KILL QUERY %d on %s:%d\n", ka->id, ka->hostname, ka->port); - if (ka->mt) { - ka->mt->status_variables.stvar[st_var_killed_queries]++; - } - break; - case KILL_CONNECTION: - proxy_warning("KILL CONNECTION %d on %s:%d\n", ka->id, ka->hostname, ka->port); - if (ka->mt) { - ka->mt->status_variables.stvar[st_var_killed_connections]++; - } - break; - default: - break; - } - ret = mysql_real_connect(pgsql, ka->get_host_address(), ka->username, ka->password, NULL, ka->port, NULL, 0); - } - else { - switch (ka->kill_type) { - case KILL_QUERY: - proxy_warning("KILL QUERY %d on localhost\n", ka->id); - break; - case KILL_CONNECTION: - proxy_warning("KILL CONNECTION %d on localhost\n", ka->id); - break; - default: - break; - } - ret = mysql_real_connect(pgsql, "localhost", ka->username, ka->password, NULL, 0, ka->hostname, 0); - } - if (!ret) { - proxy_error("Failed to connect to server %s:%d to run KILL %s %d: Error: %s\n", ka->hostname, ka->port, (ka->kill_type == KILL_QUERY ? "QUERY" : "CONNECTION"), ka->id, mysql_error(pgsql)); - PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, ka->hid, ka->hostname, ka->port, mysql_errno(pgsql)); - goto __exit_kill_query_thread; - } - - MySQL_Monitor::update_dns_cache_from_mysql_conn(pgsql); - - char buf[100]; - switch (ka->kill_type) { - case KILL_QUERY: - sprintf(buf, "KILL QUERY %d", ka->id); - break; - case KILL_CONNECTION: - sprintf(buf, "KILL CONNECTION %d", ka->id); - break; - default: - sprintf(buf, "KILL %d", ka->id); - break; - } - // FIXME: these 2 calls are blocking, fortunately on their own thread - mysql_query(pgsql, buf); -__exit_kill_query_thread: - if (pgsql) - mysql_close(pgsql); - delete ka; - return NULL; -} - extern PgSQL_Query_Processor* GloPgQPro; extern PgSQL_Query_Cache *GloPgQC; extern ProxySQL_Admin* GloAdmin; @@ -1218,32 +1089,22 @@ int PgSQL_Session::handler_again___status_RESETTING_CONNECTION() { return 0; } - -void PgSQL_Session::handler_again___new_thread_to_kill_connection() { +void PgSQL_Session::handler_again___new_thread_to_cancel_query() { PgSQL_Data_Stream* myds = mybe->server_myds; - if (myds->myconn && false /*myds->myconn->pgsql*/) { // TODO: fix this + if (myds->myconn) { if (myds->killed_at == 0) { myds->wait_until = 0; myds->killed_at = thread->curtime; - //fprintf(stderr,"Expired: %llu, %llu\n", mybe->server_myds->wait_until, thread->curtime); - PgSQL_Connection_userinfo* ui = client_myds->myconn->userinfo; - char* auth_password = NULL; - if (ui->password) { - if (ui->password[0] == '*') { // we don't have the real password, let's pass sha1 - auth_password = ui->sha1_pass; - } - else { - auth_password = ui->password; - } - } - PgSQL_KillArgs* ka = new PgSQL_KillArgs(ui->username, auth_password, myds->myconn->parent->address, myds->myconn->parent->port, myds->myconn->parent->myhgc->hid, myds->myconn->get_backend_pid(), KILL_QUERY, myds->myconn->parent->use_ssl, thread, myds->myconn->connected_host_details.ip); + const PgSQL_Connection_userinfo* ui = client_myds->myconn->userinfo; + std::unique_ptr ka = std::make_unique((PGconn*)myds->myconn->get_pg_connection(), ui->username, + myds->myconn->parent->address, myds->myconn->parent->port, myds->myconn->parent->myhgc->hid, myds->myconn->get_backend_pid(), thread); pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); pthread_attr_setstacksize(&attr, 256 * 1024); pthread_t pt; - if (pthread_create(&pt, &attr, &PgSQL_kill_query_thread, ka) != 0) { + if (pthread_create(&pt, &attr, &PgSQL_cancel_query_thread, ka.release()) != 0) { // LCOV_EXCL_START proxy_error("Thread creation\n"); assert(0); @@ -2926,8 +2787,7 @@ handler_again: if (CurrentQuery.extended_query_info.stmt_info == NULL) { // text protocol query = std::string{ mybe->server_myds->myconn->query.ptr, mybe->server_myds->myconn->query.length }; - } - else { // prepared statement + } else { // prepared statement query = std::string{ CurrentQuery.extended_query_info.stmt_info->query, CurrentQuery.extended_query_info.stmt_info->query_length }; } @@ -2938,17 +2798,19 @@ handler_again: client_addr = client_myds->addr.addr ? client_myds->addr.addr : ""; client_port = client_myds->addr.port; } - + proxy_warning( - "Killing connection %s:%d because query '%s' from client '%s':%d timed out.\n", + "Terminating running query %s on connection %s:%d from client %s:%d because it timed out.\n", + query.c_str(), mybe->server_myds->myconn->parent->address, mybe->server_myds->myconn->parent->port, - query.c_str(), client_addr.c_str(), client_port ); } - handler_again___new_thread_to_kill_connection(); + // it calls handler_again___new_thread_to_cancel_query() to initiate the killing of the connection + // associated with the session that timed out. + handler_again___new_thread_to_cancel_query(); } if (mybe->server_myds->DSS == STATE_NOT_INITIALIZED) { // we don't have a backend yet