First commit to split handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo

pull/3110/head
René Cannaò 6 years ago
parent 3ab0054af9
commit 8fb875aec1

@ -153,6 +153,12 @@ class MySQL_Session
void handler___status_WAITING_CLIENT_DATA___default();
void handler___status_NONE_or_default(PtrSize_t& pkt);
void handler_WCD_SS_MCQ_qpo_QueryRewrite(PtrSize_t *pkt);
void handler_WCD_SS_MCQ_qpo_OK_msg(PtrSize_t *pkt);
void handler_WCD_SS_MCQ_qpo_error_msg(PtrSize_t *pkt);
void handler_WCD_SS_MCQ_qpo_LargePacket(PtrSize_t *pkt);
int handler_WCD_SS_MCQ_qpo_Parse_SQL_LOG_BIN(PtrSize_t *pkt, bool *lock_hostgroup, unsigned int nTrx, string& nq);
public:
bool handler_again___status_SETTING_GENERIC_VARIABLE(int *_rc, const char *var_name, const char *var_value, bool no_quote=false, bool set_transaction=false);
bool handler_again___status_SETTING_SQL_LOG_BIN(int *);

@ -4907,6 +4907,143 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
}
}
// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo
void MySQL_Session::handler_WCD_SS_MCQ_qpo_QueryRewrite(PtrSize_t *pkt) {
// the query was rewritten
l_free(pkt->size,pkt->ptr); // free old pkt
// allocate new pkt
timespec begint;
if (thread->variables.stats_time_query_processor) {
clock_gettime(CLOCK_THREAD_CPUTIME_ID,&begint);
}
pkt->size=sizeof(mysql_hdr)+1+qpo->new_query->length();
pkt->ptr=l_alloc(pkt->size);
mysql_hdr hdr;
hdr.pkt_id=0;
hdr.pkt_length=pkt->size-sizeof(mysql_hdr);
memcpy((unsigned char *)pkt->ptr, &hdr, sizeof(mysql_hdr)); // copy header
unsigned char *c=(unsigned char *)pkt->ptr+sizeof(mysql_hdr);
*c=(unsigned char)_MYSQL_COM_QUERY; // set command type
memcpy((unsigned char *)pkt->ptr+sizeof(mysql_hdr)+1,qpo->new_query->data(),qpo->new_query->length()); // copy query
CurrentQuery.query_parser_free();
CurrentQuery.begin((unsigned char *)pkt->ptr,pkt->size,true);
delete qpo->new_query;
timespec endt;
if (thread->variables.stats_time_query_processor) {
clock_gettime(CLOCK_THREAD_CPUTIME_ID,&endt);
thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] +
(endt.tv_sec*1000000000+endt.tv_nsec) -
(begint.tv_sec*1000000000+begint.tv_nsec);
}
}
// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo
void MySQL_Session::handler_WCD_SS_MCQ_qpo_OK_msg(PtrSize_t *pkt) {
gtid_hid = -1;
client_myds->DSS=STATE_QUERY_SENT_NET;
unsigned int nTrx=NumActiveTransactions();
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
if (autocommit) setStatus |= SERVER_STATUS_AUTOCOMMIT;
client_myds->myprot.generate_pkt_OK(true,NULL,NULL,client_myds->pkt_sid+1,0,0,setStatus,0,qpo->OK_msg);
RequestEnd(NULL);
l_free(pkt->size,pkt->ptr);
}
// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo
void MySQL_Session::handler_WCD_SS_MCQ_qpo_error_msg(PtrSize_t *pkt) {
client_myds->DSS=STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,1148,(char *)"42000",qpo->error_msg);
RequestEnd(NULL);
l_free(pkt->size,pkt->ptr);
}
// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo
void MySQL_Session::handler_WCD_SS_MCQ_qpo_LargePacket(PtrSize_t *pkt) {
// ER_NET_PACKET_TOO_LARGE
client_myds->DSS=STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,1153,(char *)"08S01",(char *)"Got a packet bigger than 'max_allowed_packet' bytes", true);
RequestEnd(NULL);
l_free(pkt->size,pkt->ptr);
}
// this function as inline in handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo
// returned values:
// 0 : no action
// 1 : return false
// 2 : return true
int MySQL_Session::handler_WCD_SS_MCQ_qpo_Parse_SQL_LOG_BIN(PtrSize_t *pkt, bool *lock_hostgroup, unsigned int nTrx, string& nq) {
re2::RE2::Options *opt2=new re2::RE2::Options(RE2::Quiet);
opt2->set_case_sensitive(false);
char *pattern=(char *)"(?: *)SET *(?:|SESSION +|@@|@@session.)SQL_LOG_BIN *(?:|:)= *(\\d+) *(?:(|;|-- .*|#.*))$";
re2::RE2 *re=new RE2(pattern, *opt2);
int i;
int rc=RE2::PartialMatch(nq, *re, &i);
delete re;
delete opt2;
if (rc && ( i==0 || i==1) ) {
//fprintf(stderr,"sql_log_bin=%d\n", i);
if (i == 1) {
if (!mysql_variables.client_set_value(this, SQL_LOG_BIN, "1"))
return 1;
}
else if (i == 0) {
if (!mysql_variables.client_set_value(this, SQL_LOG_BIN, "0"))
return 1;
}
#ifdef DEBUG
proxy_info("Setting SQL_LOG_BIN to %d\n", i);
#endif
#ifdef DEBUG
{
string nqn = string((char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength);
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Setting SQL_LOG_BIN to %d for query: %s\n", i, nqn.c_str());
}
#endif
// we recompute command_type instead of taking it from the calling function
unsigned char command_type=*((unsigned char *)pkt->ptr+sizeof(mysql_hdr));
if (command_type == _MYSQL_COM_QUERY) {
client_myds->DSS=STATE_QUERY_SENT_NET;
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
if (autocommit) setStatus |= SERVER_STATUS_AUTOCOMMIT;
client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,setStatus,0,NULL);
client_myds->DSS=STATE_SLEEP;
status=WAITING_CLIENT_DATA;
RequestEnd(NULL);
l_free(pkt->size,pkt->ptr);
return 2;
}
} else {
int kq = 0;
kq = strncmp((const char *)CurrentQuery.QueryPointer, (const char *)"SET @@SESSION.SQL_LOG_BIN = @MYSQLDUMP_TEMP_LOG_BIN;" , CurrentQuery.QueryLength);
#ifdef DEBUG
{
string nqn = string((char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength);
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Setting SQL_LOG_BIN to %d for query: %s\n", i, nqn.c_str());
}
#endif
if (kq == 0) {
client_myds->DSS=STATE_QUERY_SENT_NET;
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
if (autocommit) setStatus |= SERVER_STATUS_AUTOCOMMIT;
client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,setStatus,0,NULL);
client_myds->DSS=STATE_SLEEP;
status=WAITING_CLIENT_DATA;
RequestEnd(NULL);
l_free(pkt->size,pkt->ptr);
return 2;
} else {
string nqn = string((char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength);
proxy_error("Unable to parse query. If correct, report it as a bug: %s\n", nqn.c_str());
unable_to_parse_set_statement(lock_hostgroup);
return 1;
}
}
return 0;
}
bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(PtrSize_t *pkt, bool *lock_hostgroup, bool prepared) {
/*
lock_hostgroup:
@ -4921,60 +5058,21 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
bool exit_after_SetParse = true;
unsigned char command_type=*((unsigned char *)pkt->ptr+sizeof(mysql_hdr));
if (qpo->new_query) {
// the query was rewritten
l_free(pkt->size,pkt->ptr); // free old pkt
// allocate new pkt
timespec begint;
if (thread->variables.stats_time_query_processor) {
clock_gettime(CLOCK_THREAD_CPUTIME_ID,&begint);
}
pkt->size=sizeof(mysql_hdr)+1+qpo->new_query->length();
pkt->ptr=l_alloc(pkt->size);
mysql_hdr hdr;
hdr.pkt_id=0;
hdr.pkt_length=pkt->size-sizeof(mysql_hdr);
memcpy((unsigned char *)pkt->ptr, &hdr, sizeof(mysql_hdr)); // copy header
unsigned char *c=(unsigned char *)pkt->ptr+sizeof(mysql_hdr);
*c=(unsigned char)_MYSQL_COM_QUERY; // set command type
memcpy((unsigned char *)pkt->ptr+sizeof(mysql_hdr)+1,qpo->new_query->data(),qpo->new_query->length()); // copy query
CurrentQuery.query_parser_free();
CurrentQuery.begin((unsigned char *)pkt->ptr,pkt->size,true);
delete qpo->new_query;
timespec endt;
if (thread->variables.stats_time_query_processor) {
clock_gettime(CLOCK_THREAD_CPUTIME_ID,&endt);
thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] +
(endt.tv_sec*1000000000+endt.tv_nsec) -
(begint.tv_sec*1000000000+begint.tv_nsec);
}
handler_WCD_SS_MCQ_qpo_QueryRewrite(pkt);
}
if (pkt->size > (unsigned int) mysql_thread___max_allowed_packet) {
// ER_NET_PACKET_TOO_LARGE
client_myds->DSS=STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,1153,(char *)"08S01",(char *)"Got a packet bigger than 'max_allowed_packet' bytes", true);
RequestEnd(NULL);
l_free(pkt->size,pkt->ptr);
handler_WCD_SS_MCQ_qpo_LargePacket(pkt);
return true;
}
if (qpo->OK_msg) {
gtid_hid = -1;
client_myds->DSS=STATE_QUERY_SENT_NET;
unsigned int nTrx=NumActiveTransactions();
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
if (autocommit) setStatus |= SERVER_STATUS_AUTOCOMMIT;
client_myds->myprot.generate_pkt_OK(true,NULL,NULL,client_myds->pkt_sid+1,0,0,setStatus,0,qpo->OK_msg);
RequestEnd(NULL);
l_free(pkt->size,pkt->ptr);
handler_WCD_SS_MCQ_qpo_OK_msg(pkt);
return true;
}
if (qpo->error_msg) {
client_myds->DSS=STATE_QUERY_SENT_NET;
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,1148,(char *)"42000",qpo->error_msg);
RequestEnd(NULL);
l_free(pkt->size,pkt->ptr);
handler_WCD_SS_MCQ_qpo_error_msg(pkt);
return true;
}
@ -5005,71 +5103,10 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
RE2::GlobalReplace(&nq,(char *)"^/\\*!\\d\\d\\d\\d\\d SET(.*)\\*/",(char *)"SET\\1");
RE2::GlobalReplace(&nq,(char *)"(?U)/\\*.*\\*/",(char *)"");
if (match_regexes && match_regexes[0]->match(dig)) {
re2::RE2::Options *opt2=new re2::RE2::Options(RE2::Quiet);
opt2->set_case_sensitive(false);
char *pattern=(char *)"(?: *)SET *(?:|SESSION +|@@|@@session.)SQL_LOG_BIN *(?:|:)= *(\\d+) *(?:(|;|-- .*|#.*))$";
re2::RE2 *re=new RE2(pattern, *opt2);
int i;
rc=RE2::PartialMatch(nq, *re, &i);
delete re;
delete opt2;
if (rc && ( i==0 || i==1) ) {
//fprintf(stderr,"sql_log_bin=%d\n", i);
if (i == 1) {
if (!mysql_variables.client_set_value(this, SQL_LOG_BIN, "1"))
return false;
}
else if (i == 0) {
if (!mysql_variables.client_set_value(this, SQL_LOG_BIN, "0"))
return false;
}
#ifdef DEBUG
proxy_info("Setting SQL_LOG_BIN to %d\n", i);
#endif
#ifdef DEBUG
{
string nqn = string((char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength);
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Setting SQL_LOG_BIN to %d for query: %s\n", i, nqn.c_str());
}
#endif
if (command_type == _MYSQL_COM_QUERY) {
client_myds->DSS=STATE_QUERY_SENT_NET;
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
if (autocommit) setStatus |= SERVER_STATUS_AUTOCOMMIT;
client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,setStatus,0,NULL);
client_myds->DSS=STATE_SLEEP;
status=WAITING_CLIENT_DATA;
RequestEnd(NULL);
l_free(pkt->size,pkt->ptr);
return true;
}
} else {
int kq = 0;
kq = strncmp((const char *)CurrentQuery.QueryPointer, (const char *)"SET @@SESSION.SQL_LOG_BIN = @MYSQLDUMP_TEMP_LOG_BIN;" , CurrentQuery.QueryLength);
#ifdef DEBUG
{
string nqn = string((char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength);
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Setting SQL_LOG_BIN to %d for query: %s\n", i, nqn.c_str());
}
#endif
if (kq == 0) {
client_myds->DSS=STATE_QUERY_SENT_NET;
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
if (autocommit) setStatus |= SERVER_STATUS_AUTOCOMMIT;
client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,setStatus,0,NULL);
client_myds->DSS=STATE_SLEEP;
status=WAITING_CLIENT_DATA;
RequestEnd(NULL);
l_free(pkt->size,pkt->ptr);
return true;
} else {
string nqn = string((char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength);
proxy_error("Unable to parse query. If correct, report it as a bug: %s\n", nqn.c_str());
unable_to_parse_set_statement(lock_hostgroup);
return false;
}
}
int rc = handler_WCD_SS_MCQ_qpo_Parse_SQL_LOG_BIN(pkt, lock_hostgroup, nTrx, nq);
if (rc == 1) return false;
if (rc == 2) return true;
// if rc == 0 , continue as normal
}
if (
(

Loading…
Cancel
Save