diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index 5bd0acbeb..49826ca8b 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -153,6 +153,12 @@ class MySQL_Session void handler___status_WAITING_CLIENT_DATA___default(); void handler___status_NONE_or_default(PtrSize_t& pkt); + void handler_WCD_SS_MCQ_qpo_QueryRewrite(PtrSize_t *pkt); + void handler_WCD_SS_MCQ_qpo_OK_msg(PtrSize_t *pkt); + void handler_WCD_SS_MCQ_qpo_error_msg(PtrSize_t *pkt); + void handler_WCD_SS_MCQ_qpo_LargePacket(PtrSize_t *pkt); + int handler_WCD_SS_MCQ_qpo_Parse_SQL_LOG_BIN(PtrSize_t *pkt, bool *lock_hostgroup, unsigned int nTrx, string& nq); + public: bool handler_again___status_SETTING_GENERIC_VARIABLE(int *_rc, const char *var_name, const char *var_value, bool no_quote=false, bool set_transaction=false); bool handler_again___status_SETTING_SQL_LOG_BIN(int *); diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index fa037671e..7b14a8c89 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -4907,6 +4907,143 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C } } + +// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo +void MySQL_Session::handler_WCD_SS_MCQ_qpo_QueryRewrite(PtrSize_t *pkt) { + // the query was rewritten + l_free(pkt->size,pkt->ptr); // free old pkt + // allocate new pkt + timespec begint; + if (thread->variables.stats_time_query_processor) { + clock_gettime(CLOCK_THREAD_CPUTIME_ID,&begint); + } + pkt->size=sizeof(mysql_hdr)+1+qpo->new_query->length(); + pkt->ptr=l_alloc(pkt->size); + mysql_hdr hdr; + hdr.pkt_id=0; + hdr.pkt_length=pkt->size-sizeof(mysql_hdr); + memcpy((unsigned char *)pkt->ptr, &hdr, sizeof(mysql_hdr)); // copy header + unsigned char *c=(unsigned char *)pkt->ptr+sizeof(mysql_hdr); + *c=(unsigned char)_MYSQL_COM_QUERY; // set command type + memcpy((unsigned char *)pkt->ptr+sizeof(mysql_hdr)+1,qpo->new_query->data(),qpo->new_query->length()); // copy query + CurrentQuery.query_parser_free(); + CurrentQuery.begin((unsigned char *)pkt->ptr,pkt->size,true); + delete qpo->new_query; + timespec endt; + if (thread->variables.stats_time_query_processor) { + clock_gettime(CLOCK_THREAD_CPUTIME_ID,&endt); + thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] + + (endt.tv_sec*1000000000+endt.tv_nsec) - + (begint.tv_sec*1000000000+begint.tv_nsec); + } +} + +// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo +void MySQL_Session::handler_WCD_SS_MCQ_qpo_OK_msg(PtrSize_t *pkt) { + gtid_hid = -1; + client_myds->DSS=STATE_QUERY_SENT_NET; + unsigned int nTrx=NumActiveTransactions(); + uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); + if (autocommit) setStatus |= SERVER_STATUS_AUTOCOMMIT; + client_myds->myprot.generate_pkt_OK(true,NULL,NULL,client_myds->pkt_sid+1,0,0,setStatus,0,qpo->OK_msg); + RequestEnd(NULL); + l_free(pkt->size,pkt->ptr); +} + +// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo +void MySQL_Session::handler_WCD_SS_MCQ_qpo_error_msg(PtrSize_t *pkt) { + client_myds->DSS=STATE_QUERY_SENT_NET; + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,1148,(char *)"42000",qpo->error_msg); + RequestEnd(NULL); + l_free(pkt->size,pkt->ptr); +} + +// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo +void MySQL_Session::handler_WCD_SS_MCQ_qpo_LargePacket(PtrSize_t *pkt) { + // ER_NET_PACKET_TOO_LARGE + client_myds->DSS=STATE_QUERY_SENT_NET; + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,1153,(char *)"08S01",(char *)"Got a packet bigger than 'max_allowed_packet' bytes", true); + RequestEnd(NULL); + l_free(pkt->size,pkt->ptr); +} + + +// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo +// returned values: +// 0 : no action +// 1 : return false +// 2 : return true +int MySQL_Session::handler_WCD_SS_MCQ_qpo_Parse_SQL_LOG_BIN(PtrSize_t *pkt, bool *lock_hostgroup, unsigned int nTrx, string& nq) { + re2::RE2::Options *opt2=new re2::RE2::Options(RE2::Quiet); + opt2->set_case_sensitive(false); + char *pattern=(char *)"(?: *)SET *(?:|SESSION +|@@|@@session.)SQL_LOG_BIN *(?:|:)= *(\\d+) *(?:(|;|-- .*|#.*))$"; + re2::RE2 *re=new RE2(pattern, *opt2); + int i; + int rc=RE2::PartialMatch(nq, *re, &i); + delete re; + delete opt2; + if (rc && ( i==0 || i==1) ) { + //fprintf(stderr,"sql_log_bin=%d\n", i); + if (i == 1) { + if (!mysql_variables.client_set_value(this, SQL_LOG_BIN, "1")) + return 1; + } + else if (i == 0) { + if (!mysql_variables.client_set_value(this, SQL_LOG_BIN, "0")) + return 1; + } + +#ifdef DEBUG + proxy_info("Setting SQL_LOG_BIN to %d\n", i); +#endif +#ifdef DEBUG + { + string nqn = string((char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength); + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Setting SQL_LOG_BIN to %d for query: %s\n", i, nqn.c_str()); + } +#endif + // we recompute command_type instead of taking it from the calling function + unsigned char command_type=*((unsigned char *)pkt->ptr+sizeof(mysql_hdr)); + if (command_type == _MYSQL_COM_QUERY) { + client_myds->DSS=STATE_QUERY_SENT_NET; + uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); + if (autocommit) setStatus |= SERVER_STATUS_AUTOCOMMIT; + client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,setStatus,0,NULL); + client_myds->DSS=STATE_SLEEP; + status=WAITING_CLIENT_DATA; + RequestEnd(NULL); + l_free(pkt->size,pkt->ptr); + return 2; + } + } else { + int kq = 0; + kq = strncmp((const char *)CurrentQuery.QueryPointer, (const char *)"SET @@SESSION.SQL_LOG_BIN = @MYSQLDUMP_TEMP_LOG_BIN;" , CurrentQuery.QueryLength); +#ifdef DEBUG + { + string nqn = string((char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength); + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Setting SQL_LOG_BIN to %d for query: %s\n", i, nqn.c_str()); + } +#endif + if (kq == 0) { + client_myds->DSS=STATE_QUERY_SENT_NET; + uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); + if (autocommit) setStatus |= SERVER_STATUS_AUTOCOMMIT; + client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,setStatus,0,NULL); + client_myds->DSS=STATE_SLEEP; + status=WAITING_CLIENT_DATA; + RequestEnd(NULL); + l_free(pkt->size,pkt->ptr); + return 2; + } else { + string nqn = string((char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength); + proxy_error("Unable to parse query. If correct, report it as a bug: %s\n", nqn.c_str()); + unable_to_parse_set_statement(lock_hostgroup); + return 1; + } + } + return 0; +} + bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(PtrSize_t *pkt, bool *lock_hostgroup, bool prepared) { /* lock_hostgroup: @@ -4921,60 +5058,21 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C bool exit_after_SetParse = true; unsigned char command_type=*((unsigned char *)pkt->ptr+sizeof(mysql_hdr)); if (qpo->new_query) { - // the query was rewritten - l_free(pkt->size,pkt->ptr); // free old pkt - // allocate new pkt - timespec begint; - if (thread->variables.stats_time_query_processor) { - clock_gettime(CLOCK_THREAD_CPUTIME_ID,&begint); - } - pkt->size=sizeof(mysql_hdr)+1+qpo->new_query->length(); - pkt->ptr=l_alloc(pkt->size); - mysql_hdr hdr; - hdr.pkt_id=0; - hdr.pkt_length=pkt->size-sizeof(mysql_hdr); - memcpy((unsigned char *)pkt->ptr, &hdr, sizeof(mysql_hdr)); // copy header - unsigned char *c=(unsigned char *)pkt->ptr+sizeof(mysql_hdr); - *c=(unsigned char)_MYSQL_COM_QUERY; // set command type - memcpy((unsigned char *)pkt->ptr+sizeof(mysql_hdr)+1,qpo->new_query->data(),qpo->new_query->length()); // copy query - CurrentQuery.query_parser_free(); - CurrentQuery.begin((unsigned char *)pkt->ptr,pkt->size,true); - delete qpo->new_query; - timespec endt; - if (thread->variables.stats_time_query_processor) { - clock_gettime(CLOCK_THREAD_CPUTIME_ID,&endt); - thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] + - (endt.tv_sec*1000000000+endt.tv_nsec) - - (begint.tv_sec*1000000000+begint.tv_nsec); - } + handler_WCD_SS_MCQ_qpo_QueryRewrite(pkt); } if (pkt->size > (unsigned int) mysql_thread___max_allowed_packet) { - // ER_NET_PACKET_TOO_LARGE - client_myds->DSS=STATE_QUERY_SENT_NET; - client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,1153,(char *)"08S01",(char *)"Got a packet bigger than 'max_allowed_packet' bytes", true); - RequestEnd(NULL); - l_free(pkt->size,pkt->ptr); + handler_WCD_SS_MCQ_qpo_LargePacket(pkt); return true; } if (qpo->OK_msg) { - gtid_hid = -1; - client_myds->DSS=STATE_QUERY_SENT_NET; - unsigned int nTrx=NumActiveTransactions(); - uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); - if (autocommit) setStatus |= SERVER_STATUS_AUTOCOMMIT; - client_myds->myprot.generate_pkt_OK(true,NULL,NULL,client_myds->pkt_sid+1,0,0,setStatus,0,qpo->OK_msg); - RequestEnd(NULL); - l_free(pkt->size,pkt->ptr); + handler_WCD_SS_MCQ_qpo_OK_msg(pkt); return true; } if (qpo->error_msg) { - client_myds->DSS=STATE_QUERY_SENT_NET; - client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,1148,(char *)"42000",qpo->error_msg); - RequestEnd(NULL); - l_free(pkt->size,pkt->ptr); + handler_WCD_SS_MCQ_qpo_error_msg(pkt); return true; } @@ -5005,71 +5103,10 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C RE2::GlobalReplace(&nq,(char *)"^/\\*!\\d\\d\\d\\d\\d SET(.*)\\*/",(char *)"SET\\1"); RE2::GlobalReplace(&nq,(char *)"(?U)/\\*.*\\*/",(char *)""); if (match_regexes && match_regexes[0]->match(dig)) { - re2::RE2::Options *opt2=new re2::RE2::Options(RE2::Quiet); - opt2->set_case_sensitive(false); - char *pattern=(char *)"(?: *)SET *(?:|SESSION +|@@|@@session.)SQL_LOG_BIN *(?:|:)= *(\\d+) *(?:(|;|-- .*|#.*))$"; - re2::RE2 *re=new RE2(pattern, *opt2); - int i; - rc=RE2::PartialMatch(nq, *re, &i); - delete re; - delete opt2; - if (rc && ( i==0 || i==1) ) { - //fprintf(stderr,"sql_log_bin=%d\n", i); - if (i == 1) { - if (!mysql_variables.client_set_value(this, SQL_LOG_BIN, "1")) - return false; - } - else if (i == 0) { - if (!mysql_variables.client_set_value(this, SQL_LOG_BIN, "0")) - return false; - } - -#ifdef DEBUG - proxy_info("Setting SQL_LOG_BIN to %d\n", i); -#endif -#ifdef DEBUG - { - string nqn = string((char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength); - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Setting SQL_LOG_BIN to %d for query: %s\n", i, nqn.c_str()); - } -#endif - if (command_type == _MYSQL_COM_QUERY) { - client_myds->DSS=STATE_QUERY_SENT_NET; - uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); - if (autocommit) setStatus |= SERVER_STATUS_AUTOCOMMIT; - client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,setStatus,0,NULL); - client_myds->DSS=STATE_SLEEP; - status=WAITING_CLIENT_DATA; - RequestEnd(NULL); - l_free(pkt->size,pkt->ptr); - return true; - } - } else { - int kq = 0; - kq = strncmp((const char *)CurrentQuery.QueryPointer, (const char *)"SET @@SESSION.SQL_LOG_BIN = @MYSQLDUMP_TEMP_LOG_BIN;" , CurrentQuery.QueryLength); -#ifdef DEBUG - { - string nqn = string((char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength); - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Setting SQL_LOG_BIN to %d for query: %s\n", i, nqn.c_str()); - } -#endif - if (kq == 0) { - client_myds->DSS=STATE_QUERY_SENT_NET; - uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 ); - if (autocommit) setStatus |= SERVER_STATUS_AUTOCOMMIT; - client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,setStatus,0,NULL); - client_myds->DSS=STATE_SLEEP; - status=WAITING_CLIENT_DATA; - RequestEnd(NULL); - l_free(pkt->size,pkt->ptr); - return true; - } else { - string nqn = string((char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength); - proxy_error("Unable to parse query. If correct, report it as a bug: %s\n", nqn.c_str()); - unable_to_parse_set_statement(lock_hostgroup); - return false; - } - } + int rc = handler_WCD_SS_MCQ_qpo_Parse_SQL_LOG_BIN(pkt, lock_hostgroup, nTrx, nq); + if (rc == 1) return false; + if (rc == 2) return true; + // if rc == 0 , continue as normal } if ( (