Added support to terminate running queries on timeout

pull/5044/head
Rahim Kanji 8 months ago
parent 58eac30ff3
commit 2d569f3b95

@ -664,4 +664,21 @@ private:
static std::map<std::string, std::vector<std::string>> parse_pq_error_message(const std::string& error_str);
};
class PgSQL_CancelQueryArgs {
public:
PGconn* conn;
PgSQL_Thread* mt;
char* username;
char* hostname;
unsigned int port;
int backend_pid;
unsigned int hid;
PgSQL_CancelQueryArgs(PGconn* _conn, const char* user, const char* host,
unsigned int _port, unsigned int _hid, int _backend_pid, PgSQL_Thread* _mt);
~PgSQL_CancelQueryArgs();
};
void* PgSQL_cancel_query_thread(void* arg);
#endif /* __CLASS_PGSQL_CONNECTION_H */

@ -294,7 +294,16 @@ private:
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___create_mirror_session();
int handler_again___status_PINGING_SERVER();
int handler_again___status_RESETTING_CONNECTION();
void handler_again___new_thread_to_kill_connection();
/**
* @brief Initiates a new thread to kill current running query.
*
* The handler_again___new_thread_to_cancel_query() method creates a new thread to initiate
* the cancellation of the current running query.
*
*/
void handler_again___new_thread_to_cancel_query();
bool handler_again___verify_init_connect();
#if 0
@ -554,31 +563,7 @@ public:
void set_previous_status_mode3(bool allow_execute = true);
};
#define PgSQL_KILL_QUERY 1
#define PgSQL_KILL_CONNECTION 2
class PgSQL_KillArgs {
public:
PgSQL_Thread* mt;
char* username;
char* password;
char* hostname;
unsigned int port;
int id;
int kill_type;
unsigned int hid;
int use_ssl;
PgSQL_KillArgs(char* u, char* p, char* h, unsigned int P, unsigned int _hid, int i, int kt, int _use_ssl, PgSQL_Thread* _mt);
PgSQL_KillArgs(char* u, char* p, char* h, unsigned int P, unsigned int _hid, int i, int kt, int _use_ssl, PgSQL_Thread* _mt, char* ip);
~PgSQL_KillArgs();
const char* get_host_address() const;
private:
char* ip_addr;
};
void* PgSQL_kill_query_thread(void* arg);
#endif /* __CLASS_PGSQL_SESSION_H */
#endif // CLASS_BASE_SESSION_H

@ -2638,3 +2638,48 @@ void PgSQL_Connection::copy_startup_parameters_to_pgsql_variables(bool copy_only
}
}
}
PgSQL_CancelQueryArgs::PgSQL_CancelQueryArgs(PGconn* _conn, const char* user, const char* host,
unsigned int _port, unsigned int _hid, int _backend_pid, PgSQL_Thread* _mt) {
conn = _conn;
username = strdup(user);
hostname = strdup(host);
port = _port;
hid = _hid;
backend_pid = _backend_pid;
mt = _mt;
}
PgSQL_CancelQueryArgs::~PgSQL_CancelQueryArgs() {
free(username);
free(hostname);
}
void* PgSQL_cancel_query_thread(void* arg) {
assert(arg);
PgSQL_CancelQueryArgs* ka = static_cast<PgSQL_CancelQueryArgs*>(arg);
PGcancel* cancel = PQgetCancel(ka->conn);
if (!cancel) {
proxy_error("Failed to cancel query on %s:%d with backend PID %d\n", ka->hostname, ka->port, ka->backend_pid);
PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, ka->hid, ka->hostname, ka->port, 999);
goto __exit_cancel_query_thread;
}
if (ka->mt) ka->mt->status_variables.stvar[st_var_killed_queries]++;
char errbuf[256];
if (!PQcancel(cancel, errbuf, sizeof(errbuf))) {
proxy_error("Failed to cancel query on %s:%d with backend PID %d: %s\n", ka->hostname, ka->port,
ka->backend_pid, errbuf);
PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, ka->hid, ka->hostname, ka->port, 999);
} else {
proxy_warning("Canceled query on %s:%d with backend PID %d successfully\n", ka->hostname, ka->port, ka->backend_pid);
}
__exit_cancel_query_thread:
if (cancel) PQfreeCancel(cancel);
delete ka;
return NULL;
}

@ -153,135 +153,6 @@ bool PgSQL_Session_Regex::match(char *m) {
}
*/
PgSQL_KillArgs::PgSQL_KillArgs(char* u, char* p, char* h, unsigned int P, unsigned int _hid, int i, int kt, int _use_ssl, PgSQL_Thread* _mt) :
PgSQL_KillArgs(u, p, h, P, _hid, i, kt, _use_ssl, _mt, NULL) {
// resolving DNS if available in Cache
if (h && P) {
const std::string& ip = MySQL_Monitor::dns_lookup(h, false);
if (ip.empty() == false) {
ip_addr = strdup(ip.c_str());
}
}
}
PgSQL_KillArgs::PgSQL_KillArgs(char* u, char* p, char* h, unsigned int P, unsigned int _hid, int i, int kt, int _use_ssl, PgSQL_Thread* _mt, char* ip) {
username = strdup(u);
password = strdup(p);
hostname = strdup(h);
ip_addr = NULL;
if (ip)
ip_addr = strdup(ip);
port = P;
hid = _hid;
id = i;
kill_type = kt;
use_ssl = _use_ssl;
mt = _mt;
}
PgSQL_KillArgs::~PgSQL_KillArgs() {
free(username);
free(password);
free(hostname);
if (ip_addr)
free(ip_addr);
}
const char* PgSQL_KillArgs::get_host_address() const {
const char* host_address = hostname;
if (ip_addr)
host_address = ip_addr;
return host_address;
}
void* PgSQL_kill_query_thread(void* arg) {
PgSQL_KillArgs* ka = (PgSQL_KillArgs*)arg;
std::unique_ptr<MySQL_Thread> mysql_thr(new MySQL_Thread());
mysql_thr->curtime = monotonic_time();
mysql_thr->refresh_variables();
MYSQL* pgsql = mysql_init(NULL);
mysql_options4(pgsql, MYSQL_OPT_CONNECT_ATTR_ADD, "program_name", "proxysql_killer");
mysql_options4(pgsql, MYSQL_OPT_CONNECT_ATTR_ADD, "_server_host", ka->hostname);
if (ka->use_ssl && ka->port) {
mysql_ssl_set(pgsql,
pgsql_thread___ssl_p2s_key,
pgsql_thread___ssl_p2s_cert,
pgsql_thread___ssl_p2s_ca,
pgsql_thread___ssl_p2s_capath,
pgsql_thread___ssl_p2s_cipher);
mysql_options(pgsql, MYSQL_OPT_SSL_CRL, pgsql_thread___ssl_p2s_crl);
mysql_options(pgsql, MYSQL_OPT_SSL_CRLPATH, pgsql_thread___ssl_p2s_crlpath);
mysql_options(pgsql, MARIADB_OPT_SSL_KEYLOG_CALLBACK, (void*)proxysql_keylog_write_line_callback);
}
if (!pgsql) {
goto __exit_kill_query_thread;
}
MYSQL* ret;
if (ka->port) {
switch (ka->kill_type) {
case KILL_QUERY:
proxy_warning("KILL QUERY %d on %s:%d\n", ka->id, ka->hostname, ka->port);
if (ka->mt) {
ka->mt->status_variables.stvar[st_var_killed_queries]++;
}
break;
case KILL_CONNECTION:
proxy_warning("KILL CONNECTION %d on %s:%d\n", ka->id, ka->hostname, ka->port);
if (ka->mt) {
ka->mt->status_variables.stvar[st_var_killed_connections]++;
}
break;
default:
break;
}
ret = mysql_real_connect(pgsql, ka->get_host_address(), ka->username, ka->password, NULL, ka->port, NULL, 0);
}
else {
switch (ka->kill_type) {
case KILL_QUERY:
proxy_warning("KILL QUERY %d on localhost\n", ka->id);
break;
case KILL_CONNECTION:
proxy_warning("KILL CONNECTION %d on localhost\n", ka->id);
break;
default:
break;
}
ret = mysql_real_connect(pgsql, "localhost", ka->username, ka->password, NULL, 0, ka->hostname, 0);
}
if (!ret) {
proxy_error("Failed to connect to server %s:%d to run KILL %s %d: Error: %s\n", ka->hostname, ka->port, (ka->kill_type == KILL_QUERY ? "QUERY" : "CONNECTION"), ka->id, mysql_error(pgsql));
PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, ka->hid, ka->hostname, ka->port, mysql_errno(pgsql));
goto __exit_kill_query_thread;
}
MySQL_Monitor::update_dns_cache_from_mysql_conn(pgsql);
char buf[100];
switch (ka->kill_type) {
case KILL_QUERY:
sprintf(buf, "KILL QUERY %d", ka->id);
break;
case KILL_CONNECTION:
sprintf(buf, "KILL CONNECTION %d", ka->id);
break;
default:
sprintf(buf, "KILL %d", ka->id);
break;
}
// FIXME: these 2 calls are blocking, fortunately on their own thread
mysql_query(pgsql, buf);
__exit_kill_query_thread:
if (pgsql)
mysql_close(pgsql);
delete ka;
return NULL;
}
extern PgSQL_Query_Processor* GloPgQPro;
extern PgSQL_Query_Cache *GloPgQC;
extern ProxySQL_Admin* GloAdmin;
@ -1218,32 +1089,22 @@ int PgSQL_Session::handler_again___status_RESETTING_CONNECTION() {
return 0;
}
void PgSQL_Session::handler_again___new_thread_to_kill_connection() {
void PgSQL_Session::handler_again___new_thread_to_cancel_query() {
PgSQL_Data_Stream* myds = mybe->server_myds;
if (myds->myconn && false /*myds->myconn->pgsql*/) { // TODO: fix this
if (myds->myconn) {
if (myds->killed_at == 0) {
myds->wait_until = 0;
myds->killed_at = thread->curtime;
//fprintf(stderr,"Expired: %llu, %llu\n", mybe->server_myds->wait_until, thread->curtime);
PgSQL_Connection_userinfo* ui = client_myds->myconn->userinfo;
char* auth_password = NULL;
if (ui->password) {
if (ui->password[0] == '*') { // we don't have the real password, let's pass sha1
auth_password = ui->sha1_pass;
}
else {
auth_password = ui->password;
}
}
PgSQL_KillArgs* ka = new PgSQL_KillArgs(ui->username, auth_password, myds->myconn->parent->address, myds->myconn->parent->port, myds->myconn->parent->myhgc->hid, myds->myconn->get_backend_pid(), KILL_QUERY, myds->myconn->parent->use_ssl, thread, myds->myconn->connected_host_details.ip);
const PgSQL_Connection_userinfo* ui = client_myds->myconn->userinfo;
std::unique_ptr<PgSQL_CancelQueryArgs> ka = std::make_unique<PgSQL_CancelQueryArgs>((PGconn*)myds->myconn->get_pg_connection(), ui->username,
myds->myconn->parent->address, myds->myconn->parent->port, myds->myconn->parent->myhgc->hid, myds->myconn->get_backend_pid(), thread);
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_attr_setstacksize(&attr, 256 * 1024);
pthread_t pt;
if (pthread_create(&pt, &attr, &PgSQL_kill_query_thread, ka) != 0) {
if (pthread_create(&pt, &attr, &PgSQL_cancel_query_thread, ka.release()) != 0) {
// LCOV_EXCL_START
proxy_error("Thread creation\n");
assert(0);
@ -2926,8 +2787,7 @@ handler_again:
if (CurrentQuery.extended_query_info.stmt_info == NULL) { // text protocol
query = std::string{ mybe->server_myds->myconn->query.ptr, mybe->server_myds->myconn->query.length };
}
else { // prepared statement
} else { // prepared statement
query = std::string{ CurrentQuery.extended_query_info.stmt_info->query, CurrentQuery.extended_query_info.stmt_info->query_length };
}
@ -2938,17 +2798,19 @@ handler_again:
client_addr = client_myds->addr.addr ? client_myds->addr.addr : "";
client_port = client_myds->addr.port;
}
proxy_warning(
"Killing connection %s:%d because query '%s' from client '%s':%d timed out.\n",
"Terminating running query %s on connection %s:%d from client %s:%d because it timed out.\n",
query.c_str(),
mybe->server_myds->myconn->parent->address,
mybe->server_myds->myconn->parent->port,
query.c_str(),
client_addr.c_str(),
client_port
);
}
handler_again___new_thread_to_kill_connection();
// it calls handler_again___new_thread_to_cancel_query() to initiate the killing of the connection
// associated with the session that timed out.
handler_again___new_thread_to_cancel_query();
}
if (mybe->server_myds->DSS == STATE_NOT_INITIALIZED) {
// we don't have a backend yet

Loading…
Cancel
Save