Support for KILL command #860

Both KILL QUERY and KILL CONNECTION work

The only security check enforced is that the user sending the KILL
is the same user of the connection/query being killed.
pull/1779/head
René Cannaò 8 years ago
parent 2e381bbeb9
commit a058d5cc9b

@ -128,6 +128,8 @@ class MySQL_Data_Stream
short revents;
char kill_type;
bool encrypted;
bool net_failure;

@ -218,6 +218,7 @@ class MySQL_Session
void writeout();
void Memory_Stats();
void create_new_session_and_reset_connection(MySQL_Data_Stream *_myds);
bool handle_command_query_kill(PtrSize_t *);
};
#define KILL_QUERY 1

@ -35,6 +35,17 @@ typedef struct __attribute__((aligned(CACHE_LINE_SIZE))) _conn_exchange_t {
} conn_exchange_t;
#endif // IDLE_THREADS
typedef struct _thr_id_username_t {
uint32_t id;
char *username;
} thr_id_usr;
typedef struct _kill_queue_t {
pthread_mutex_t m;
std::vector<thr_id_usr *> conn_ids;
std::vector<thr_id_usr *> query_ids;
} kill_queue_t;
class ProxySQL_Poll {
private:
@ -198,6 +209,7 @@ class MySQL_Thread
int pipefd[2];
int shutdown;
kill_queue_t kq;
bool epoll_thread;
bool poll_timeout_bool;
@ -265,6 +277,8 @@ class MySQL_Thread
MySQL_Connection * get_MyConn_local(unsigned int, MySQL_Session *sess, char *gtid_uuid, uint64_t gtid_trxid);
void push_MyConn_local(MySQL_Connection *);
void return_local_connections();
void Scan_Sessions_to_Kill(PtrArray *mysess);
void Scan_Sessions_to_Kill_All();
};
@ -501,6 +515,7 @@ class MySQL_Threads_Handler
return MLM->find_iface_from_fd(fd);
}
void Get_Memory_Stats();
void kill_connection_or_query(uint32_t _thread_session_id, bool query, char *username);
};

@ -33,6 +33,7 @@ void tokenizer( tokenizer_t *, const char* s, const char* delimiters, int emptie
const char* free_tokenizer( tokenizer_t* tokenizer );
const char* tokenize( tokenizer_t* tokenizer );
char * mysql_query_digest_and_first_comment(char *s , int len , char **first_comment, char *buf);
char * mysql_query_strip_comments(char *s , int len);
void c_split_2(const char *in, const char *del, char **out1, char **out2);
#ifdef __cplusplus
}

@ -2020,7 +2020,11 @@ void MySQL_ResultSet::add_err(MySQL_Data_Stream *_myds) {
char sqlstate[10];
sprintf(sqlstate,"%s",mysql_sqlstate(_mysql));
if (_myds && _myds->killed_at) { // see case #750
myprot->generate_pkt_ERR(false,&pkt.ptr,&pkt.size,sid,1907,sqlstate,(char *)"Query execution was interrupted, query_timeout exceeded");
if (_myds->kill_type == 0) {
myprot->generate_pkt_ERR(false,&pkt.ptr,&pkt.size,sid,1907,sqlstate,(char *)"Query execution was interrupted, query_timeout exceeded");
} else {
myprot->generate_pkt_ERR(false,&pkt.ptr,&pkt.size,sid,1317,sqlstate,(char *)"Query execution was interrupted");
}
} else {
myprot->generate_pkt_ERR(false,&pkt.ptr,&pkt.size,sid,mysql_errno(_mysql),sqlstate,mysql_error(_mysql));
}

@ -2289,6 +2289,7 @@ __get_pkts_from_client:
proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Received query to be processed with MariaDB Client library\n");
mybe->server_myds->killed_at=0;
mybe->server_myds->kill_type=0;
mybe->server_myds->mysql_real_query.init(&pkt);
client_myds->setDSS_STATE_QUERY_SENT_NET();
} else {
@ -2418,6 +2419,7 @@ __get_pkts_from_client:
mybe->server_myds->wait_until=0;
pause_until=0;
mybe->server_myds->killed_at=0;
mybe->server_myds->kill_type=0;
mybe->server_myds->mysql_real_query.init(&pkt); // fix memory leak for PREPARE in prepared statements #796
client_myds->setDSS_STATE_QUERY_SENT_NET();
}
@ -2497,6 +2499,7 @@ __get_pkts_from_client:
mybe->server_myds->connect_retries_on_failure=mysql_thread___connect_retries_on_failure;
mybe->server_myds->wait_until=0;
mybe->server_myds->killed_at=0;
mybe->server_myds->kill_type=0;
client_myds->setDSS_STATE_QUERY_SENT_NET();
}
break;
@ -4211,6 +4214,12 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
}
}
// handle command KILL #860
if (prepared == false) {
if (handle_command_query_kill(pkt)) {
return true;
}
}
if (qpo->cache_ttl>0) {
uint32_t resbuf=0;
unsigned char *aa=GloQC->get(
@ -4493,7 +4502,11 @@ void MySQL_Session::MySQL_Result_to_MySQL_wire(MYSQL *mysql, MySQL_ResultSet *My
char sqlstate[10];
sprintf(sqlstate,"%s",mysql_sqlstate(mysql));
if (_myds && _myds->killed_at) { // see case #750
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,1907,sqlstate,(char *)"Query execution was interrupted, query_timeout exceeded");
if (_myds->kill_type == 0) {
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,1907,sqlstate,(char *)"Query execution was interrupted, query_timeout exceeded");
} else {
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,1317,sqlstate,(char *)"Query execution was interrupted");
}
} else {
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,mysql_errno(mysql),sqlstate,mysql_error(mysql));
}
@ -4741,3 +4754,59 @@ void MySQL_Session::create_new_session_and_reset_connection(MySQL_Data_Stream *_
delete new_sess;
}
}
bool MySQL_Session::handle_command_query_kill(PtrSize_t *pkt) {
unsigned char command_type=*((unsigned char *)pkt->ptr+sizeof(mysql_hdr));
if (CurrentQuery.QueryParserArgs.digest_text) {
if (command_type == _MYSQL_COM_QUERY) {
if (client_myds && client_myds->myconn) {
MySQL_Connection *mc = client_myds->myconn;
if (mc->userinfo && mc->userinfo->username) {
if (CurrentQuery.MyComQueryCmd == MYSQL_COM_QUERY_KILL) {
char *qu = mysql_query_strip_comments((char *)pkt->ptr+1+sizeof(mysql_hdr), pkt->size-1-sizeof(mysql_hdr));
string nq=string(qu,strlen(qu));
re2::RE2::Options *opt2=new re2::RE2::Options(RE2::Quiet);
opt2->set_case_sensitive(false);
char *pattern=(char *)"^KILL\\s+(CONNECTION |QUERY |)\\s*(\\d+)\\s*$";
re2::RE2 *re=new RE2(pattern, *opt2);
int id=0;
string tk;
int rc;
rc=RE2::FullMatch(nq, *re, &tk, &id);
delete re;
delete opt2;
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 2, "filtered query= \"%s\"\n", qu);
free(qu);
if (id) {
int tki = -1;
if (tk.c_str()) {
if ((strlen(tk.c_str())==0) || (strcasecmp(tk.c_str(),"CONNECTION ")==0)) {
tki = 0;
} else {
if (strcasecmp(tk.c_str(),"QUERY ")==0) {
tki = 1;
}
}
}
if (tki >= 0) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 2, "Killing %s %d\n", (tki == 0 ? "CONNECTION" : "QUERY") , id);
GloMTH->kill_connection_or_query( id, (tki == 0 ? false : true ), mc->userinfo->username);
client_myds->DSS=STATE_QUERY_SENT_NET;
unsigned int nTrx=NumActiveTransactions();
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
if (autocommit) setStatus= SERVER_STATUS_AUTOCOMMIT;
client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,setStatus,0,NULL);
client_myds->DSS=STATE_SLEEP;
status=WAITING_CLIENT_DATA;
l_free(pkt->size,pkt->ptr);
RequestEnd(NULL);
return true;
}
}
}
}
}
}
}
return false;
}

@ -2582,6 +2582,8 @@ bool MySQL_Thread::init() {
assert(resume_mysql_sessions);
#endif // IDLE_THREADS
pthread_mutex_init(&kq.m,NULL);
shutdown=0;
my_idle_conns=(MySQL_Connection **)malloc(sizeof(MySQL_Connection *)*SESSIONS_FOR_CONNECTIONS_HANDLER);
memset(my_idle_conns,0,sizeof(MySQL_Connection *)*SESSIONS_FOR_CONNECTIONS_HANDLER);
@ -2965,6 +2967,7 @@ __run_skip_1a:
curtime=monotonic_time();
atomic_curtime=curtime;
poll_timeout_bool=false;
if (
#ifdef IDLE_THREADS
@ -2988,6 +2991,13 @@ __run_skip_1a:
maintenance_loop=false;
}
pthread_mutex_lock(&kq.m);
if (kq.conn_ids.size() + kq.query_ids.size()) {
Scan_Sessions_to_Kill_All();
maintenance_loop=true;
}
pthread_mutex_unlock(&kq.m);
// update polls statistics
mypolls.loops++;
mypolls.loop_counters->incr(curtime/1000000);
@ -4562,6 +4572,42 @@ void MySQL_Threads_Handler::signal_all_threads(unsigned char _c) {
#endif // IDLE_THREADS
}
void MySQL_Threads_Handler::kill_connection_or_query(uint32_t _thread_session_id, bool query, char *username) {
unsigned int i;
for (i=0;i<num_threads;i++) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker;
thr_id_usr *tu = (thr_id_usr *)malloc(sizeof(thr_id_usr));
tu->id = _thread_session_id;
tu->username = strdup(username);
pthread_mutex_lock(&thr->kq.m);
if (query) {
thr->kq.query_ids.push_back(tu);
} else {
thr->kq.conn_ids.push_back(tu);
}
pthread_mutex_unlock(&thr->kq.m);
}
#ifdef IDLE_THREADS
if (GloVars.global.idle_threads) {
for (i=0;i<num_threads;i++) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads_idles[i].worker;
thr_id_usr *tu = (thr_id_usr *)malloc(sizeof(thr_id_usr));
tu->id = _thread_session_id;
tu->username = strdup(username);
pthread_mutex_lock(&thr->kq.m);
if (query) {
thr->kq.query_ids.push_back(tu);
} else {
thr->kq.conn_ids.push_back(tu);
}
pthread_mutex_unlock(&thr->kq.m);
}
}
#endif
signal_all_threads(0);
}
bool MySQL_Threads_Handler::kill_session(uint32_t _thread_session_id) {
bool ret=false;
unsigned int i;
@ -5186,3 +5232,83 @@ unsigned long long MySQL_Threads_Handler::get_killed_queries() {
}
return q;
}
void MySQL_Thread::Scan_Sessions_to_Kill_All() {
if (kq.conn_ids.size() + kq.query_ids.size()) {
Scan_Sessions_to_Kill(mysql_sessions);
}
#ifdef IDLE_THREADS
if (GloVars.global.idle_threads) {
if (kq.conn_ids.size() + kq.query_ids.size()) {
Scan_Sessions_to_Kill(idle_mysql_sessions);
}
if (kq.conn_ids.size() + kq.query_ids.size()) {
Scan_Sessions_to_Kill(resume_mysql_sessions);
}
if (kq.conn_ids.size() + kq.query_ids.size()) {
pthread_mutex_lock(&myexchange.mutex_idles);
Scan_Sessions_to_Kill(myexchange.idle_mysql_sessions);
pthread_mutex_unlock(&myexchange.mutex_idles);
}
if (kq.conn_ids.size() + kq.query_ids.size()) {
pthread_mutex_lock(&myexchange.mutex_resumes);
Scan_Sessions_to_Kill(myexchange.resume_mysql_sessions);
pthread_mutex_unlock(&myexchange.mutex_resumes);
}
}
#endif
for (std::vector<thr_id_usr *>::iterator it=kq.conn_ids.begin(); it!=kq.conn_ids.end(); ++it) {
thr_id_usr *t = *it;
free(t->username);
free(t);
}
for (std::vector<thr_id_usr *>::iterator it=kq.query_ids.begin(); it!=kq.query_ids.end(); ++it) {
thr_id_usr *t = *it;
free(t->username);
free(t);
}
kq.conn_ids.clear();
kq.query_ids.clear();
}
void MySQL_Thread::Scan_Sessions_to_Kill(PtrArray *mysess) {
for (unsigned int n=0; n<mysess->len && ( kq.conn_ids.size() + kq.query_ids.size() ) ; n++) {
MySQL_Session *_sess=(MySQL_Session *)mysess->index(n);
bool cont=true;
for (std::vector<thr_id_usr *>::iterator it=kq.conn_ids.begin(); cont && it!=kq.conn_ids.end(); ++it) {
thr_id_usr *t = *it;
if (t->id == _sess->thread_session_id) {
if (_sess->client_myds) {
if (strcmp(t->username,_sess->client_myds->myconn->userinfo->username)==0) {
_sess->killed=true;
}
}
cont=false;
free(t->username);
free(t);
kq.conn_ids.erase(it);
}
}
for (std::vector<thr_id_usr *>::iterator it=kq.query_ids.begin(); cont && it!=kq.query_ids.end(); ++it) {
thr_id_usr *t = *it;
if (t->id == _sess->thread_session_id) {
proxy_info("Killing query %d\n", t->id);
if (_sess->client_myds) {
if (strcmp(t->username,_sess->client_myds->myconn->userinfo->username)==0) {
if (_sess->mybe) {
if (_sess->mybe->server_myds) {
_sess->mybe->server_myds->wait_until=curtime;
_sess->mybe->server_myds->kill_type=1;
}
}
}
}
cont=false;
free(t->username);
free(t);
kq.query_ids.erase(it);
}
}
}
}

@ -477,3 +477,128 @@ char *mysql_query_digest_and_first_comment(char *s, int _len, char **first_comme
// process query stats
return r;
}
char *mysql_query_strip_comments(char *s, int _len) {
int i = 0;
int len = _len;
char *r = (char *) malloc(len + SIZECHAR);
char *p_r = r;
char *p_r_t = r;
char prev_char = 0;
char qutr_char = 0;
char flag = 0;
char fns=0;
bool lowercase=0;
lowercase=mysql_thread___query_digests_lowercase;
while(i < len)
{
// =================================================
// START - read token char and set flag what's going on.
// =================================================
if(flag == 0)
{
// store current position
p_r_t = p_r;
// comment type 1 - start with '/*'
if(prev_char == '/' && *s == '*')
{
flag = 1;
}
// comment type 2 - start with '#'
else if(*s == '#')
{
flag = 2;
}
// comment type 3 - start with '--'
else if(prev_char == '-' && *s == '-' && ((*(s+1)==' ') || (*(s+1)=='\n') || (*(s+1)=='\r') || (*(s+1)=='\t') ))
{
flag = 3;
}
// not above case - remove duplicated space char
else
{
flag = 0;
if (fns==0 && is_space_char(*s)) {
s++;
i++;
continue;
}
if (fns==0) fns=1;
if(is_space_char(prev_char) && is_space_char(*s)){
prev_char = ' ';
*p_r = ' ';
s++;
i++;
continue;
}
}
}
// =================================================
// PROCESS and FINISH - do something on each case
// =================================================
else
{
// --------
// comment
// --------
if(
// comment type 1 - /* .. */
(flag == 1 && prev_char == '*' && *s == '/') ||
// comment type 2 - # ... \n
(flag == 2 && (*s == '\n' || *s == '\r' || (i == len - 1) ))
||
// comment type 3 - -- ... \n
(flag == 3 && (*s == '\n' || *s == '\r' || (i == len -1) ))
)
{
p_r = p_r_t;
if (flag == 1 || (i == len -1)) {
p_r -= SIZECHAR;
}
prev_char = ' ';
flag = 0;
s++;
i++;
continue;
}
}
// =================================================
// COPY CHAR
// =================================================
// convert every space char to ' '
if (lowercase==0) {
*p_r++ = !is_space_char(*s) ? *s : ' ';
} else {
*p_r++ = !is_space_char(*s) ? (tolower(*s)) : ' ';
}
prev_char = *s++;
i++;
}
// remove a trailing space
if (p_r>r) {
char *e=p_r;
e--;
if (*e==' ') {
*e=0;
}
}
*p_r = 0;
return r;
}

@ -179,6 +179,7 @@ MySQL_Data_Stream::MySQL_Data_Stream() {
max_connect_time=0;
wait_until=0;
pause_until=0;
kill_type=0;
connect_tries=0;
poll_fds_idx=-1;
resultset_length=0;

Loading…
Cancel
Save