diff --git a/include/MySQL_Data_Stream.h b/include/MySQL_Data_Stream.h index f7fecf0af..2fa068c0c 100644 --- a/include/MySQL_Data_Stream.h +++ b/include/MySQL_Data_Stream.h @@ -128,6 +128,8 @@ class MySQL_Data_Stream short revents; + char kill_type; + bool encrypted; bool net_failure; diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index 22191e4e8..19a359a95 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -218,6 +218,7 @@ class MySQL_Session void writeout(); void Memory_Stats(); void create_new_session_and_reset_connection(MySQL_Data_Stream *_myds); + bool handle_command_query_kill(PtrSize_t *); }; #define KILL_QUERY 1 diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index bc64fe870..3ad5ba195 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -35,6 +35,17 @@ typedef struct __attribute__((aligned(CACHE_LINE_SIZE))) _conn_exchange_t { } conn_exchange_t; #endif // IDLE_THREADS +typedef struct _thr_id_username_t { + uint32_t id; + char *username; +} thr_id_usr; + +typedef struct _kill_queue_t { + pthread_mutex_t m; + std::vector conn_ids; + std::vector query_ids; +} kill_queue_t; + class ProxySQL_Poll { private: @@ -198,6 +209,7 @@ class MySQL_Thread int pipefd[2]; int shutdown; + kill_queue_t kq; bool epoll_thread; bool poll_timeout_bool; @@ -265,6 +277,8 @@ class MySQL_Thread MySQL_Connection * get_MyConn_local(unsigned int, MySQL_Session *sess, char *gtid_uuid, uint64_t gtid_trxid); void push_MyConn_local(MySQL_Connection *); void return_local_connections(); + void Scan_Sessions_to_Kill(PtrArray *mysess); + void Scan_Sessions_to_Kill_All(); }; @@ -501,6 +515,7 @@ class MySQL_Threads_Handler return MLM->find_iface_from_fd(fd); } void Get_Memory_Stats(); + void kill_connection_or_query(uint32_t _thread_session_id, bool query, char *username); }; diff --git a/include/c_tokenizer.h b/include/c_tokenizer.h index b16f0c7e0..40c15e15c 100644 --- a/include/c_tokenizer.h +++ b/include/c_tokenizer.h @@ -33,6 +33,7 @@ void tokenizer( tokenizer_t *, const char* s, const char* delimiters, int emptie const char* free_tokenizer( tokenizer_t* tokenizer ); const char* tokenize( tokenizer_t* tokenizer ); char * mysql_query_digest_and_first_comment(char *s , int len , char **first_comment, char *buf); +char * mysql_query_strip_comments(char *s , int len); void c_split_2(const char *in, const char *del, char **out1, char **out2); #ifdef __cplusplus } diff --git a/lib/MySQL_Protocol.cpp b/lib/MySQL_Protocol.cpp index 5a721cb0d..52fbf1e54 100644 --- a/lib/MySQL_Protocol.cpp +++ b/lib/MySQL_Protocol.cpp @@ -2020,7 +2020,11 @@ void MySQL_ResultSet::add_err(MySQL_Data_Stream *_myds) { char sqlstate[10]; sprintf(sqlstate,"%s",mysql_sqlstate(_mysql)); if (_myds && _myds->killed_at) { // see case #750 - myprot->generate_pkt_ERR(false,&pkt.ptr,&pkt.size,sid,1907,sqlstate,(char *)"Query execution was interrupted, query_timeout exceeded"); + if (_myds->kill_type == 0) { + myprot->generate_pkt_ERR(false,&pkt.ptr,&pkt.size,sid,1907,sqlstate,(char *)"Query execution was interrupted, query_timeout exceeded"); + } else { + myprot->generate_pkt_ERR(false,&pkt.ptr,&pkt.size,sid,1317,sqlstate,(char *)"Query execution was interrupted"); + } } else { myprot->generate_pkt_ERR(false,&pkt.ptr,&pkt.size,sid,mysql_errno(_mysql),sqlstate,mysql_error(_mysql)); } diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 86d0ec64a..30acc0edf 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -2289,6 +2289,7 @@ __get_pkts_from_client: proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Received query to be processed with MariaDB Client library\n"); mybe->server_myds->killed_at=0; + mybe->server_myds->kill_type=0; mybe->server_myds->mysql_real_query.init(&pkt); client_myds->setDSS_STATE_QUERY_SENT_NET(); } else { @@ -2418,6 +2419,7 @@ __get_pkts_from_client: mybe->server_myds->wait_until=0; pause_until=0; mybe->server_myds->killed_at=0; + mybe->server_myds->kill_type=0; mybe->server_myds->mysql_real_query.init(&pkt); // fix memory leak for PREPARE in prepared statements #796 client_myds->setDSS_STATE_QUERY_SENT_NET(); } @@ -2497,6 +2499,7 @@ __get_pkts_from_client: mybe->server_myds->connect_retries_on_failure=mysql_thread___connect_retries_on_failure; mybe->server_myds->wait_until=0; mybe->server_myds->killed_at=0; + mybe->server_myds->kill_type=0; client_myds->setDSS_STATE_QUERY_SENT_NET(); } break; @@ -4211,6 +4214,12 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C } } + // handle command KILL #860 + if (prepared == false) { + if (handle_command_query_kill(pkt)) { + return true; + } + } if (qpo->cache_ttl>0) { uint32_t resbuf=0; unsigned char *aa=GloQC->get( @@ -4493,7 +4502,11 @@ void MySQL_Session::MySQL_Result_to_MySQL_wire(MYSQL *mysql, MySQL_ResultSet *My char sqlstate[10]; sprintf(sqlstate,"%s",mysql_sqlstate(mysql)); if (_myds && _myds->killed_at) { // see case #750 - client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,1907,sqlstate,(char *)"Query execution was interrupted, query_timeout exceeded"); + if (_myds->kill_type == 0) { + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,1907,sqlstate,(char *)"Query execution was interrupted, query_timeout exceeded"); + } else { + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,1317,sqlstate,(char *)"Query execution was interrupted"); + } } else { client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,mysql_errno(mysql),sqlstate,mysql_error(mysql)); } @@ -4741,3 +4754,59 @@ void MySQL_Session::create_new_session_and_reset_connection(MySQL_Data_Stream *_ delete new_sess; } } + +bool MySQL_Session::handle_command_query_kill(PtrSize_t *pkt) { + unsigned char command_type=*((unsigned char *)pkt->ptr+sizeof(mysql_hdr)); + if (CurrentQuery.QueryParserArgs.digest_text) { + if (command_type == _MYSQL_COM_QUERY) { + if (client_myds && client_myds->myconn) { + MySQL_Connection *mc = client_myds->myconn; + if (mc->userinfo && mc->userinfo->username) { + if (CurrentQuery.MyComQueryCmd == MYSQL_COM_QUERY_KILL) { + char *qu = mysql_query_strip_comments((char *)pkt->ptr+1+sizeof(mysql_hdr), pkt->size-1-sizeof(mysql_hdr)); + string nq=string(qu,strlen(qu)); + re2::RE2::Options *opt2=new re2::RE2::Options(RE2::Quiet); + opt2->set_case_sensitive(false); + char *pattern=(char *)"^KILL\\s+(CONNECTION |QUERY |)\\s*(\\d+)\\s*$"; + re2::RE2 *re=new RE2(pattern, *opt2); + int id=0; + string tk; + int rc; + rc=RE2::FullMatch(nq, *re, &tk, &id); + delete re; + delete opt2; + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 2, "filtered query= \"%s\"\n", qu); + free(qu); + if (id) { + int tki = -1; + if (tk.c_str()) { + if ((strlen(tk.c_str())==0) || (strcasecmp(tk.c_str(),"CONNECTION ")==0)) { + tki = 0; + } else { + if (strcasecmp(tk.c_str(),"QUERY ")==0) { + tki = 1; + } + } + } + if (tki >= 0) { + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 2, "Killing %s %d\n", (tki == 0 ? "CONNECTION" : "QUERY") , id); + GloMTH->kill_connection_or_query( id, (tki == 0 ? false : true ), mc->userinfo->username); + client_myds->DSS=STATE_QUERY_SENT_NET; + unsigned int nTrx=NumActiveTransactions(); + uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); + if (autocommit) setStatus= SERVER_STATUS_AUTOCOMMIT; + client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,setStatus,0,NULL); + client_myds->DSS=STATE_SLEEP; + status=WAITING_CLIENT_DATA; + l_free(pkt->size,pkt->ptr); + RequestEnd(NULL); + return true; + } + } + } + } + } + } + } + return false; +} diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index ed8421b54..32452aad7 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -2582,6 +2582,8 @@ bool MySQL_Thread::init() { assert(resume_mysql_sessions); #endif // IDLE_THREADS + pthread_mutex_init(&kq.m,NULL); + shutdown=0; my_idle_conns=(MySQL_Connection **)malloc(sizeof(MySQL_Connection *)*SESSIONS_FOR_CONNECTIONS_HANDLER); memset(my_idle_conns,0,sizeof(MySQL_Connection *)*SESSIONS_FOR_CONNECTIONS_HANDLER); @@ -2965,6 +2967,7 @@ __run_skip_1a: curtime=monotonic_time(); atomic_curtime=curtime; + poll_timeout_bool=false; if ( #ifdef IDLE_THREADS @@ -2988,6 +2991,13 @@ __run_skip_1a: maintenance_loop=false; } + pthread_mutex_lock(&kq.m); + if (kq.conn_ids.size() + kq.query_ids.size()) { + Scan_Sessions_to_Kill_All(); + maintenance_loop=true; + } + pthread_mutex_unlock(&kq.m); + // update polls statistics mypolls.loops++; mypolls.loop_counters->incr(curtime/1000000); @@ -4562,6 +4572,42 @@ void MySQL_Threads_Handler::signal_all_threads(unsigned char _c) { #endif // IDLE_THREADS } +void MySQL_Threads_Handler::kill_connection_or_query(uint32_t _thread_session_id, bool query, char *username) { + unsigned int i; + for (i=0;iid = _thread_session_id; + tu->username = strdup(username); + pthread_mutex_lock(&thr->kq.m); + if (query) { + thr->kq.query_ids.push_back(tu); + } else { + thr->kq.conn_ids.push_back(tu); + } + pthread_mutex_unlock(&thr->kq.m); + + } +#ifdef IDLE_THREADS + if (GloVars.global.idle_threads) { + for (i=0;iid = _thread_session_id; + tu->username = strdup(username); + pthread_mutex_lock(&thr->kq.m); + if (query) { + thr->kq.query_ids.push_back(tu); + } else { + thr->kq.conn_ids.push_back(tu); + } + pthread_mutex_unlock(&thr->kq.m); + } + } +#endif + signal_all_threads(0); +} + bool MySQL_Threads_Handler::kill_session(uint32_t _thread_session_id) { bool ret=false; unsigned int i; @@ -5186,3 +5232,83 @@ unsigned long long MySQL_Threads_Handler::get_killed_queries() { } return q; } + + +void MySQL_Thread::Scan_Sessions_to_Kill_All() { + if (kq.conn_ids.size() + kq.query_ids.size()) { + Scan_Sessions_to_Kill(mysql_sessions); + } +#ifdef IDLE_THREADS + if (GloVars.global.idle_threads) { + if (kq.conn_ids.size() + kq.query_ids.size()) { + Scan_Sessions_to_Kill(idle_mysql_sessions); + } + if (kq.conn_ids.size() + kq.query_ids.size()) { + Scan_Sessions_to_Kill(resume_mysql_sessions); + } + if (kq.conn_ids.size() + kq.query_ids.size()) { + pthread_mutex_lock(&myexchange.mutex_idles); + Scan_Sessions_to_Kill(myexchange.idle_mysql_sessions); + pthread_mutex_unlock(&myexchange.mutex_idles); + } + if (kq.conn_ids.size() + kq.query_ids.size()) { + pthread_mutex_lock(&myexchange.mutex_resumes); + Scan_Sessions_to_Kill(myexchange.resume_mysql_sessions); + pthread_mutex_unlock(&myexchange.mutex_resumes); + } + } +#endif + for (std::vector::iterator it=kq.conn_ids.begin(); it!=kq.conn_ids.end(); ++it) { + thr_id_usr *t = *it; + free(t->username); + free(t); + } + for (std::vector::iterator it=kq.query_ids.begin(); it!=kq.query_ids.end(); ++it) { + thr_id_usr *t = *it; + free(t->username); + free(t); + } + kq.conn_ids.clear(); + kq.query_ids.clear(); +} + +void MySQL_Thread::Scan_Sessions_to_Kill(PtrArray *mysess) { + for (unsigned int n=0; nlen && ( kq.conn_ids.size() + kq.query_ids.size() ) ; n++) { + MySQL_Session *_sess=(MySQL_Session *)mysess->index(n); + bool cont=true; + for (std::vector::iterator it=kq.conn_ids.begin(); cont && it!=kq.conn_ids.end(); ++it) { + thr_id_usr *t = *it; + if (t->id == _sess->thread_session_id) { + if (_sess->client_myds) { + if (strcmp(t->username,_sess->client_myds->myconn->userinfo->username)==0) { + _sess->killed=true; + } + } + cont=false; + free(t->username); + free(t); + kq.conn_ids.erase(it); + } + } + for (std::vector::iterator it=kq.query_ids.begin(); cont && it!=kq.query_ids.end(); ++it) { + thr_id_usr *t = *it; + if (t->id == _sess->thread_session_id) { + proxy_info("Killing query %d\n", t->id); + if (_sess->client_myds) { + if (strcmp(t->username,_sess->client_myds->myconn->userinfo->username)==0) { + if (_sess->mybe) { + if (_sess->mybe->server_myds) { + _sess->mybe->server_myds->wait_until=curtime; + _sess->mybe->server_myds->kill_type=1; + } + } + } + } + cont=false; + free(t->username); + free(t); + kq.query_ids.erase(it); + } + } + } +} diff --git a/lib/c_tokenizer.c b/lib/c_tokenizer.c index 5a9c4b651..caa52fff2 100644 --- a/lib/c_tokenizer.c +++ b/lib/c_tokenizer.c @@ -477,3 +477,128 @@ char *mysql_query_digest_and_first_comment(char *s, int _len, char **first_comme // process query stats return r; } + + +char *mysql_query_strip_comments(char *s, int _len) { + int i = 0; + int len = _len; + char *r = (char *) malloc(len + SIZECHAR); + char *p_r = r; + char *p_r_t = r; + + char prev_char = 0; + char qutr_char = 0; + + char flag = 0; + + char fns=0; + + bool lowercase=0; + lowercase=mysql_thread___query_digests_lowercase; + + while(i < len) + { + // ================================================= + // START - read token char and set flag what's going on. + // ================================================= + if(flag == 0) + { + // store current position + p_r_t = p_r; + + // comment type 1 - start with '/*' + if(prev_char == '/' && *s == '*') + { + flag = 1; + } + + // comment type 2 - start with '#' + else if(*s == '#') + { + flag = 2; + } + + // comment type 3 - start with '--' + else if(prev_char == '-' && *s == '-' && ((*(s+1)==' ') || (*(s+1)=='\n') || (*(s+1)=='\r') || (*(s+1)=='\t') )) + { + flag = 3; + } + // not above case - remove duplicated space char + else + { + flag = 0; + if (fns==0 && is_space_char(*s)) { + s++; + i++; + continue; + } + if (fns==0) fns=1; + if(is_space_char(prev_char) && is_space_char(*s)){ + prev_char = ' '; + *p_r = ' '; + s++; + i++; + continue; + } + } + } + + // ================================================= + // PROCESS and FINISH - do something on each case + // ================================================= + else + { + // -------- + // comment + // -------- + if( + // comment type 1 - /* .. */ + (flag == 1 && prev_char == '*' && *s == '/') || + + // comment type 2 - # ... \n + (flag == 2 && (*s == '\n' || *s == '\r' || (i == len - 1) )) + || + // comment type 3 - -- ... \n + (flag == 3 && (*s == '\n' || *s == '\r' || (i == len -1) )) + ) + { + p_r = p_r_t; + if (flag == 1 || (i == len -1)) { + p_r -= SIZECHAR; + } + prev_char = ' '; + flag = 0; + s++; + i++; + continue; + } + } + + // ================================================= + // COPY CHAR + // ================================================= + // convert every space char to ' ' + if (lowercase==0) { + *p_r++ = !is_space_char(*s) ? *s : ' '; + } else { + *p_r++ = !is_space_char(*s) ? (tolower(*s)) : ' '; + } + prev_char = *s++; + + i++; + } + + // remove a trailing space + if (p_r>r) { + char *e=p_r; + e--; + if (*e==' ') { + *e=0; + } + } + + *p_r = 0; + + return r; +} + diff --git a/lib/mysql_data_stream.cpp b/lib/mysql_data_stream.cpp index c48a54ca6..42bf28fcf 100644 --- a/lib/mysql_data_stream.cpp +++ b/lib/mysql_data_stream.cpp @@ -179,6 +179,7 @@ MySQL_Data_Stream::MySQL_Data_Stream() { max_connect_time=0; wait_until=0; pause_until=0; + kill_type=0; connect_tries=0; poll_fds_idx=-1; resultset_length=0;