|
|
|
|
@ -2798,6 +2798,250 @@ bool MySQL_Session::handler_again___status_CHANGING_AUTOCOMMIT(int *_rc) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// this function was inline inside MySQL_Session::get_pkts_from_client
|
|
|
|
|
// where:
|
|
|
|
|
// status = WAITING_CLIENT_DATA
|
|
|
|
|
// client_myds->DSS = STATE_SLEEP
|
|
|
|
|
// enum_mysql_command = _MYSQL_COM_STMT_PREPARE
|
|
|
|
|
//
|
|
|
|
|
// all break were replaced with a return
|
|
|
|
|
void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_PREPARE(PtrSize_t& pkt) {
|
|
|
|
|
if (session_type != PROXYSQL_SESSION_MYSQL) { // only MySQL module supports prepared statement!!
|
|
|
|
|
l_free(pkt.size,pkt.ptr);
|
|
|
|
|
client_myds->setDSS_STATE_QUERY_SENT_NET();
|
|
|
|
|
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"28000",(char *)"Command not supported");
|
|
|
|
|
client_myds->DSS=STATE_SLEEP;
|
|
|
|
|
status=WAITING_CLIENT_DATA;
|
|
|
|
|
return;
|
|
|
|
|
} else {
|
|
|
|
|
thread->status_variables.stvar[st_var_frontend_stmt_prepare]++;
|
|
|
|
|
thread->status_variables.stvar[st_var_queries]++;
|
|
|
|
|
// if we reach here, we are not on MySQL module
|
|
|
|
|
bool rc_break=false;
|
|
|
|
|
bool lock_hostgroup = false;
|
|
|
|
|
|
|
|
|
|
// Note: CurrentQuery sees the query as sent by the client.
|
|
|
|
|
// shortly after, the packets it used to contain the query will be deallocated
|
|
|
|
|
// Note2 : we call the next function as if it was _MYSQL_COM_QUERY
|
|
|
|
|
// because the offset will be identical
|
|
|
|
|
CurrentQuery.begin((unsigned char *)pkt.ptr,pkt.size,true);
|
|
|
|
|
|
|
|
|
|
timespec begint;
|
|
|
|
|
timespec endt;
|
|
|
|
|
if (thread->variables.stats_time_query_processor) {
|
|
|
|
|
clock_gettime(CLOCK_THREAD_CPUTIME_ID,&begint);
|
|
|
|
|
}
|
|
|
|
|
qpo=GloQPro->process_mysql_query(this,pkt.ptr,pkt.size,&CurrentQuery);
|
|
|
|
|
if (thread->variables.stats_time_query_processor) {
|
|
|
|
|
clock_gettime(CLOCK_THREAD_CPUTIME_ID,&endt);
|
|
|
|
|
thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] +
|
|
|
|
|
(endt.tv_sec*1000000000+endt.tv_nsec) -
|
|
|
|
|
(begint.tv_sec*1000000000+begint.tv_nsec);
|
|
|
|
|
}
|
|
|
|
|
assert(qpo); // GloQPro->process_mysql_query() should always return a qpo
|
|
|
|
|
rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt, &lock_hostgroup);
|
|
|
|
|
if (rc_break==true) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (mysql_thread___set_query_lock_on_hostgroup == 1) { // algorithm introduced in 2.0.6
|
|
|
|
|
if (locked_on_hostgroup < 0) {
|
|
|
|
|
if (lock_hostgroup) {
|
|
|
|
|
// we are locking on hostgroup now
|
|
|
|
|
locked_on_hostgroup = current_hostgroup;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (locked_on_hostgroup >= 0) {
|
|
|
|
|
if (current_hostgroup != locked_on_hostgroup) {
|
|
|
|
|
client_myds->DSS=STATE_QUERY_SENT_NET;
|
|
|
|
|
int l = CurrentQuery.QueryLength;
|
|
|
|
|
char *end = (char *)"";
|
|
|
|
|
if (l>256) {
|
|
|
|
|
l=253;
|
|
|
|
|
end = (char *)"...";
|
|
|
|
|
}
|
|
|
|
|
string nqn = string((char *)CurrentQuery.QueryPointer,l);
|
|
|
|
|
char *err_msg = (char *)"Session trying to reach HG %d while locked on HG %d . Rejecting query: %s";
|
|
|
|
|
char *buf = (char *)malloc(strlen(err_msg)+strlen(nqn.c_str())+strlen(end)+64);
|
|
|
|
|
sprintf(buf, err_msg, current_hostgroup, locked_on_hostgroup, nqn.c_str(), end);
|
|
|
|
|
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,9005,(char *)"HY000",buf, true);
|
|
|
|
|
thread->status_variables.stvar[st_var_hostgroup_locked_queries]++;
|
|
|
|
|
RequestEnd(NULL);
|
|
|
|
|
free(buf);
|
|
|
|
|
l_free(pkt.size,pkt.ptr);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
mybe=find_or_create_backend(current_hostgroup);
|
|
|
|
|
if (client_myds->myconn->local_stmts==NULL) {
|
|
|
|
|
client_myds->myconn->local_stmts=new MySQL_STMTs_local_v14(true);
|
|
|
|
|
}
|
|
|
|
|
uint64_t hash=client_myds->myconn->local_stmts->compute_hash((char *)client_myds->myconn->userinfo->username,(char *)client_myds->myconn->userinfo->schemaname,(char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength);
|
|
|
|
|
MySQL_STMT_Global_info *stmt_info=NULL;
|
|
|
|
|
// we first lock GloStmt
|
|
|
|
|
GloMyStmt->wrlock();
|
|
|
|
|
stmt_info=GloMyStmt->find_prepared_statement_by_hash(hash,false);
|
|
|
|
|
if (stmt_info) {
|
|
|
|
|
// the prepared statement exists in GloMyStmt
|
|
|
|
|
// for this reason, we do not need to prepare it again, and we can already reply to the client
|
|
|
|
|
// we will now generate a unique stmt and send it to the client
|
|
|
|
|
uint32_t new_stmt_id=client_myds->myconn->local_stmts->generate_new_client_stmt_id(stmt_info->statement_id);
|
|
|
|
|
client_myds->setDSS_STATE_QUERY_SENT_NET();
|
|
|
|
|
client_myds->myprot.generate_STMT_PREPARE_RESPONSE(client_myds->pkt_sid+1,stmt_info,new_stmt_id);
|
|
|
|
|
LogQuery(NULL);
|
|
|
|
|
l_free(pkt.size,pkt.ptr);
|
|
|
|
|
client_myds->DSS=STATE_SLEEP;
|
|
|
|
|
status=WAITING_CLIENT_DATA;
|
|
|
|
|
CurrentQuery.end_time=thread->curtime;
|
|
|
|
|
CurrentQuery.end();
|
|
|
|
|
} else {
|
|
|
|
|
mybe=find_or_create_backend(current_hostgroup);
|
|
|
|
|
status=PROCESSING_STMT_PREPARE;
|
|
|
|
|
mybe->server_myds->connect_retries_on_failure=mysql_thread___connect_retries_on_failure;
|
|
|
|
|
mybe->server_myds->wait_until=0;
|
|
|
|
|
pause_until=0;
|
|
|
|
|
mybe->server_myds->killed_at=0;
|
|
|
|
|
mybe->server_myds->kill_type=0;
|
|
|
|
|
mybe->server_myds->mysql_real_query.init(&pkt); // fix memory leak for PREPARE in prepared statements #796
|
|
|
|
|
mybe->server_myds->statuses.questions++;
|
|
|
|
|
client_myds->setDSS_STATE_QUERY_SENT_NET();
|
|
|
|
|
}
|
|
|
|
|
GloMyStmt->unlock();
|
|
|
|
|
return; // make sure to not return before unlocking GloMyStmt
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// this function was inline inside MySQL_Session::get_pkts_from_client
|
|
|
|
|
// where:
|
|
|
|
|
// status = WAITING_CLIENT_DATA
|
|
|
|
|
// client_myds->DSS = STATE_SLEEP
|
|
|
|
|
// enum_mysql_command = _MYSQL_COM_STMT_EXECUTE
|
|
|
|
|
//
|
|
|
|
|
// all break were replaced with a return
|
|
|
|
|
void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_EXECUTE(PtrSize_t& pkt) {
|
|
|
|
|
if (session_type != PROXYSQL_SESSION_MYSQL) { // only MySQL module supports prepared statement!!
|
|
|
|
|
l_free(pkt.size,pkt.ptr);
|
|
|
|
|
client_myds->setDSS_STATE_QUERY_SENT_NET();
|
|
|
|
|
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"28000",(char *)"Command not supported");
|
|
|
|
|
client_myds->DSS=STATE_SLEEP;
|
|
|
|
|
status=WAITING_CLIENT_DATA;
|
|
|
|
|
return;
|
|
|
|
|
} else {
|
|
|
|
|
// if we reach here, we are on MySQL module
|
|
|
|
|
bool rc_break=false;
|
|
|
|
|
bool lock_hostgroup = false;
|
|
|
|
|
thread->status_variables.stvar[st_var_frontend_stmt_execute]++;
|
|
|
|
|
thread->status_variables.stvar[st_var_queries]++;
|
|
|
|
|
uint32_t client_stmt_id=0;
|
|
|
|
|
uint64_t stmt_global_id=0;
|
|
|
|
|
memcpy(&client_stmt_id,(char *)pkt.ptr+5,sizeof(uint32_t));
|
|
|
|
|
stmt_global_id=client_myds->myconn->local_stmts->find_global_stmt_id_from_client(client_stmt_id);
|
|
|
|
|
if (stmt_global_id == 0) {
|
|
|
|
|
// FIXME: add error handling
|
|
|
|
|
assert(0);
|
|
|
|
|
}
|
|
|
|
|
CurrentQuery.stmt_global_id=stmt_global_id;
|
|
|
|
|
// now we get the statement information
|
|
|
|
|
MySQL_STMT_Global_info *stmt_info=NULL;
|
|
|
|
|
stmt_info=GloMyStmt->find_prepared_statement_by_stmt_id(stmt_global_id);
|
|
|
|
|
if (stmt_info==NULL) {
|
|
|
|
|
// we couldn't find it
|
|
|
|
|
l_free(pkt.size,pkt.ptr);
|
|
|
|
|
client_myds->setDSS_STATE_QUERY_SENT_NET();
|
|
|
|
|
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"28000",(char *)"Prepared statement doesn't exist", true);
|
|
|
|
|
client_myds->DSS=STATE_SLEEP;
|
|
|
|
|
status=WAITING_CLIENT_DATA;
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
CurrentQuery.stmt_info=stmt_info;
|
|
|
|
|
CurrentQuery.start_time=thread->curtime;
|
|
|
|
|
|
|
|
|
|
timespec begint;
|
|
|
|
|
timespec endt;
|
|
|
|
|
if (thread->variables.stats_time_query_processor) {
|
|
|
|
|
clock_gettime(CLOCK_THREAD_CPUTIME_ID,&begint);
|
|
|
|
|
}
|
|
|
|
|
qpo=GloQPro->process_mysql_query(this,pkt.ptr,pkt.size,&CurrentQuery);
|
|
|
|
|
if (qpo->max_lag_ms >= 0) {
|
|
|
|
|
thread->status_variables.stvar[st_var_queries_with_max_lag_ms]++;
|
|
|
|
|
}
|
|
|
|
|
if (thread->variables.stats_time_query_processor) {
|
|
|
|
|
clock_gettime(CLOCK_THREAD_CPUTIME_ID,&endt);
|
|
|
|
|
thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] +
|
|
|
|
|
(endt.tv_sec*1000000000+endt.tv_nsec) -
|
|
|
|
|
(begint.tv_sec*1000000000+begint.tv_nsec);
|
|
|
|
|
}
|
|
|
|
|
assert(qpo); // GloQPro->process_mysql_query() should always return a qpo
|
|
|
|
|
// we now take the metadata associated with STMT_EXECUTE from MySQL_STMTs_meta
|
|
|
|
|
bool stmt_meta_found=true; // let's be optimistic and we assume we will found it
|
|
|
|
|
stmt_execute_metadata_t *stmt_meta=sess_STMTs_meta->find(stmt_global_id);
|
|
|
|
|
if (stmt_meta==NULL) { // we couldn't find any metadata
|
|
|
|
|
stmt_meta_found=false;
|
|
|
|
|
}
|
|
|
|
|
stmt_meta=client_myds->myprot.get_binds_from_pkt(pkt.ptr,pkt.size,stmt_info, &stmt_meta);
|
|
|
|
|
if (stmt_meta==NULL) {
|
|
|
|
|
l_free(pkt.size,pkt.ptr);
|
|
|
|
|
client_myds->setDSS_STATE_QUERY_SENT_NET();
|
|
|
|
|
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"28000",(char *)"Error in prepared statement execution", true);
|
|
|
|
|
client_myds->DSS=STATE_SLEEP;
|
|
|
|
|
status=WAITING_CLIENT_DATA;
|
|
|
|
|
//__sync_fetch_and_sub(&stmt_info->ref_count,1); // decrease reference count
|
|
|
|
|
stmt_info=NULL;
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (stmt_meta_found==false) {
|
|
|
|
|
// previously we didn't find any metadata
|
|
|
|
|
// but as we reached here, stmt_meta is not null and we save the metadata
|
|
|
|
|
sess_STMTs_meta->insert(stmt_global_id,stmt_meta);
|
|
|
|
|
}
|
|
|
|
|
// else
|
|
|
|
|
|
|
|
|
|
CurrentQuery.stmt_meta=stmt_meta;
|
|
|
|
|
//current_hostgroup=qpo->destination_hostgroup;
|
|
|
|
|
rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt, &lock_hostgroup, true);
|
|
|
|
|
if (rc_break==true) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (mysql_thread___set_query_lock_on_hostgroup == 1) { // algorithm introduced in 2.0.6
|
|
|
|
|
if (locked_on_hostgroup < 0) {
|
|
|
|
|
if (lock_hostgroup) {
|
|
|
|
|
// we are locking on hostgroup now
|
|
|
|
|
locked_on_hostgroup = current_hostgroup;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (locked_on_hostgroup >= 0) {
|
|
|
|
|
if (current_hostgroup != locked_on_hostgroup) {
|
|
|
|
|
client_myds->DSS=STATE_QUERY_SENT_NET;
|
|
|
|
|
//int l = CurrentQuery.QueryLength;
|
|
|
|
|
int l = CurrentQuery.stmt_info->query_length;
|
|
|
|
|
char *end = (char *)"";
|
|
|
|
|
if (l>256) {
|
|
|
|
|
l=253;
|
|
|
|
|
end = (char *)"...";
|
|
|
|
|
}
|
|
|
|
|
string nqn = string((char *)CurrentQuery.stmt_info->query,l);
|
|
|
|
|
char *err_msg = (char *)"Session trying to reach HG %d while locked on HG %d . Rejecting query: %s";
|
|
|
|
|
char *buf = (char *)malloc(strlen(err_msg)+strlen(nqn.c_str())+strlen(end)+64);
|
|
|
|
|
sprintf(buf, err_msg, current_hostgroup, locked_on_hostgroup, nqn.c_str(), end);
|
|
|
|
|
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,9005,(char *)"HY000",buf, true);
|
|
|
|
|
thread->status_variables.stvar[st_var_hostgroup_locked_queries]++;
|
|
|
|
|
RequestEnd(NULL);
|
|
|
|
|
free(buf);
|
|
|
|
|
l_free(pkt.size,pkt.ptr);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
mybe=find_or_create_backend(current_hostgroup);
|
|
|
|
|
status=PROCESSING_STMT_EXECUTE;
|
|
|
|
|
mybe->server_myds->connect_retries_on_failure=mysql_thread___connect_retries_on_failure;
|
|
|
|
|
mybe->server_myds->wait_until=0;
|
|
|
|
|
mybe->server_myds->killed_at=0;
|
|
|
|
|
mybe->server_myds->kill_type=0;
|
|
|
|
|
client_myds->setDSS_STATE_QUERY_SENT_NET();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int MySQL_Session::get_pkts_from_client(bool& wrong_pass, PtrSize_t& pkt) {
|
|
|
|
|
int handler_ret = 0;
|
|
|
|
|
@ -3139,232 +3383,10 @@ __get_pkts_from_client:
|
|
|
|
|
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_SEND_LONG_DATA(pkt);
|
|
|
|
|
break;
|
|
|
|
|
case _MYSQL_COM_STMT_PREPARE:
|
|
|
|
|
if (session_type != PROXYSQL_SESSION_MYSQL) { // only MySQL module supports prepared statement!!
|
|
|
|
|
l_free(pkt.size,pkt.ptr);
|
|
|
|
|
client_myds->setDSS_STATE_QUERY_SENT_NET();
|
|
|
|
|
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"28000",(char *)"Command not supported");
|
|
|
|
|
client_myds->DSS=STATE_SLEEP;
|
|
|
|
|
status=WAITING_CLIENT_DATA;
|
|
|
|
|
break;
|
|
|
|
|
} else {
|
|
|
|
|
thread->status_variables.stvar[st_var_frontend_stmt_prepare]++;
|
|
|
|
|
thread->status_variables.stvar[st_var_queries]++;
|
|
|
|
|
// if we reach here, we are not on MySQL module
|
|
|
|
|
bool rc_break=false;
|
|
|
|
|
bool lock_hostgroup = false;
|
|
|
|
|
|
|
|
|
|
// Note: CurrentQuery sees the query as sent by the client.
|
|
|
|
|
// shortly after, the packets it used to contain the query will be deallocated
|
|
|
|
|
// Note2 : we call the next function as if it was _MYSQL_COM_QUERY
|
|
|
|
|
// because the offset will be identical
|
|
|
|
|
CurrentQuery.begin((unsigned char *)pkt.ptr,pkt.size,true);
|
|
|
|
|
|
|
|
|
|
timespec begint;
|
|
|
|
|
timespec endt;
|
|
|
|
|
if (thread->variables.stats_time_query_processor) {
|
|
|
|
|
clock_gettime(CLOCK_THREAD_CPUTIME_ID,&begint);
|
|
|
|
|
}
|
|
|
|
|
qpo=GloQPro->process_mysql_query(this,pkt.ptr,pkt.size,&CurrentQuery);
|
|
|
|
|
if (thread->variables.stats_time_query_processor) {
|
|
|
|
|
clock_gettime(CLOCK_THREAD_CPUTIME_ID,&endt);
|
|
|
|
|
thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] +
|
|
|
|
|
(endt.tv_sec*1000000000+endt.tv_nsec) -
|
|
|
|
|
(begint.tv_sec*1000000000+begint.tv_nsec);
|
|
|
|
|
}
|
|
|
|
|
assert(qpo); // GloQPro->process_mysql_query() should always return a qpo
|
|
|
|
|
rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt, &lock_hostgroup);
|
|
|
|
|
if (rc_break==true) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
if (mysql_thread___set_query_lock_on_hostgroup == 1) { // algorithm introduced in 2.0.6
|
|
|
|
|
if (locked_on_hostgroup < 0) {
|
|
|
|
|
if (lock_hostgroup) {
|
|
|
|
|
// we are locking on hostgroup now
|
|
|
|
|
locked_on_hostgroup = current_hostgroup;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (locked_on_hostgroup >= 0) {
|
|
|
|
|
if (current_hostgroup != locked_on_hostgroup) {
|
|
|
|
|
client_myds->DSS=STATE_QUERY_SENT_NET;
|
|
|
|
|
int l = CurrentQuery.QueryLength;
|
|
|
|
|
char *end = (char *)"";
|
|
|
|
|
if (l>256) {
|
|
|
|
|
l=253;
|
|
|
|
|
end = (char *)"...";
|
|
|
|
|
}
|
|
|
|
|
string nqn = string((char *)CurrentQuery.QueryPointer,l);
|
|
|
|
|
char *err_msg = (char *)"Session trying to reach HG %d while locked on HG %d . Rejecting query: %s";
|
|
|
|
|
char *buf = (char *)malloc(strlen(err_msg)+strlen(nqn.c_str())+strlen(end)+64);
|
|
|
|
|
sprintf(buf, err_msg, current_hostgroup, locked_on_hostgroup, nqn.c_str(), end);
|
|
|
|
|
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,9005,(char *)"HY000",buf, true);
|
|
|
|
|
thread->status_variables.stvar[st_var_hostgroup_locked_queries]++;
|
|
|
|
|
RequestEnd(NULL);
|
|
|
|
|
free(buf);
|
|
|
|
|
l_free(pkt.size,pkt.ptr);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
mybe=find_or_create_backend(current_hostgroup);
|
|
|
|
|
if (client_myds->myconn->local_stmts==NULL) {
|
|
|
|
|
client_myds->myconn->local_stmts=new MySQL_STMTs_local_v14(true);
|
|
|
|
|
}
|
|
|
|
|
uint64_t hash=client_myds->myconn->local_stmts->compute_hash((char *)client_myds->myconn->userinfo->username,(char *)client_myds->myconn->userinfo->schemaname,(char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength);
|
|
|
|
|
MySQL_STMT_Global_info *stmt_info=NULL;
|
|
|
|
|
// we first lock GloStmt
|
|
|
|
|
GloMyStmt->wrlock();
|
|
|
|
|
stmt_info=GloMyStmt->find_prepared_statement_by_hash(hash,false);
|
|
|
|
|
if (stmt_info) {
|
|
|
|
|
// the prepared statement exists in GloMyStmt
|
|
|
|
|
// for this reason, we do not need to prepare it again, and we can already reply to the client
|
|
|
|
|
// we will now generate a unique stmt and send it to the client
|
|
|
|
|
uint32_t new_stmt_id=client_myds->myconn->local_stmts->generate_new_client_stmt_id(stmt_info->statement_id);
|
|
|
|
|
client_myds->setDSS_STATE_QUERY_SENT_NET();
|
|
|
|
|
client_myds->myprot.generate_STMT_PREPARE_RESPONSE(client_myds->pkt_sid+1,stmt_info,new_stmt_id);
|
|
|
|
|
LogQuery(NULL);
|
|
|
|
|
l_free(pkt.size,pkt.ptr);
|
|
|
|
|
client_myds->DSS=STATE_SLEEP;
|
|
|
|
|
status=WAITING_CLIENT_DATA;
|
|
|
|
|
CurrentQuery.end_time=thread->curtime;
|
|
|
|
|
CurrentQuery.end();
|
|
|
|
|
} else {
|
|
|
|
|
mybe=find_or_create_backend(current_hostgroup);
|
|
|
|
|
status=PROCESSING_STMT_PREPARE;
|
|
|
|
|
mybe->server_myds->connect_retries_on_failure=mysql_thread___connect_retries_on_failure;
|
|
|
|
|
mybe->server_myds->wait_until=0;
|
|
|
|
|
pause_until=0;
|
|
|
|
|
mybe->server_myds->killed_at=0;
|
|
|
|
|
mybe->server_myds->kill_type=0;
|
|
|
|
|
mybe->server_myds->mysql_real_query.init(&pkt); // fix memory leak for PREPARE in prepared statements #796
|
|
|
|
|
mybe->server_myds->statuses.questions++;
|
|
|
|
|
client_myds->setDSS_STATE_QUERY_SENT_NET();
|
|
|
|
|
}
|
|
|
|
|
GloMyStmt->unlock();
|
|
|
|
|
break; // make sure to not break before unlocking GloMyStmt
|
|
|
|
|
}
|
|
|
|
|
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_PREPARE(pkt);
|
|
|
|
|
break;
|
|
|
|
|
case _MYSQL_COM_STMT_EXECUTE:
|
|
|
|
|
if (session_type != PROXYSQL_SESSION_MYSQL) { // only MySQL module supports prepared statement!!
|
|
|
|
|
l_free(pkt.size,pkt.ptr);
|
|
|
|
|
client_myds->setDSS_STATE_QUERY_SENT_NET();
|
|
|
|
|
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"28000",(char *)"Command not supported");
|
|
|
|
|
client_myds->DSS=STATE_SLEEP;
|
|
|
|
|
status=WAITING_CLIENT_DATA;
|
|
|
|
|
break;
|
|
|
|
|
} else {
|
|
|
|
|
// if we reach here, we are on MySQL module
|
|
|
|
|
bool rc_break=false;
|
|
|
|
|
bool lock_hostgroup = false;
|
|
|
|
|
thread->status_variables.stvar[st_var_frontend_stmt_execute]++;
|
|
|
|
|
thread->status_variables.stvar[st_var_queries]++;
|
|
|
|
|
uint32_t client_stmt_id=0;
|
|
|
|
|
uint64_t stmt_global_id=0;
|
|
|
|
|
memcpy(&client_stmt_id,(char *)pkt.ptr+5,sizeof(uint32_t));
|
|
|
|
|
stmt_global_id=client_myds->myconn->local_stmts->find_global_stmt_id_from_client(client_stmt_id);
|
|
|
|
|
if (stmt_global_id == 0) {
|
|
|
|
|
// FIXME: add error handling
|
|
|
|
|
assert(0);
|
|
|
|
|
}
|
|
|
|
|
CurrentQuery.stmt_global_id=stmt_global_id;
|
|
|
|
|
// now we get the statement information
|
|
|
|
|
MySQL_STMT_Global_info *stmt_info=NULL;
|
|
|
|
|
stmt_info=GloMyStmt->find_prepared_statement_by_stmt_id(stmt_global_id);
|
|
|
|
|
if (stmt_info==NULL) {
|
|
|
|
|
// we couldn't find it
|
|
|
|
|
l_free(pkt.size,pkt.ptr);
|
|
|
|
|
client_myds->setDSS_STATE_QUERY_SENT_NET();
|
|
|
|
|
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"28000",(char *)"Prepared statement doesn't exist", true);
|
|
|
|
|
client_myds->DSS=STATE_SLEEP;
|
|
|
|
|
status=WAITING_CLIENT_DATA;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
CurrentQuery.stmt_info=stmt_info;
|
|
|
|
|
CurrentQuery.start_time=thread->curtime;
|
|
|
|
|
|
|
|
|
|
timespec begint;
|
|
|
|
|
timespec endt;
|
|
|
|
|
if (thread->variables.stats_time_query_processor) {
|
|
|
|
|
clock_gettime(CLOCK_THREAD_CPUTIME_ID,&begint);
|
|
|
|
|
}
|
|
|
|
|
qpo=GloQPro->process_mysql_query(this,pkt.ptr,pkt.size,&CurrentQuery);
|
|
|
|
|
if (qpo->max_lag_ms >= 0) {
|
|
|
|
|
thread->status_variables.stvar[st_var_queries_with_max_lag_ms]++;
|
|
|
|
|
}
|
|
|
|
|
if (thread->variables.stats_time_query_processor) {
|
|
|
|
|
clock_gettime(CLOCK_THREAD_CPUTIME_ID,&endt);
|
|
|
|
|
thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] +
|
|
|
|
|
(endt.tv_sec*1000000000+endt.tv_nsec) -
|
|
|
|
|
(begint.tv_sec*1000000000+begint.tv_nsec);
|
|
|
|
|
}
|
|
|
|
|
assert(qpo); // GloQPro->process_mysql_query() should always return a qpo
|
|
|
|
|
// we now take the metadata associated with STMT_EXECUTE from MySQL_STMTs_meta
|
|
|
|
|
bool stmt_meta_found=true; // let's be optimistic and we assume we will found it
|
|
|
|
|
stmt_execute_metadata_t *stmt_meta=sess_STMTs_meta->find(stmt_global_id);
|
|
|
|
|
if (stmt_meta==NULL) { // we couldn't find any metadata
|
|
|
|
|
stmt_meta_found=false;
|
|
|
|
|
}
|
|
|
|
|
stmt_meta=client_myds->myprot.get_binds_from_pkt(pkt.ptr,pkt.size,stmt_info, &stmt_meta);
|
|
|
|
|
if (stmt_meta==NULL) {
|
|
|
|
|
l_free(pkt.size,pkt.ptr);
|
|
|
|
|
client_myds->setDSS_STATE_QUERY_SENT_NET();
|
|
|
|
|
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"28000",(char *)"Error in prepared statement execution", true);
|
|
|
|
|
client_myds->DSS=STATE_SLEEP;
|
|
|
|
|
status=WAITING_CLIENT_DATA;
|
|
|
|
|
//__sync_fetch_and_sub(&stmt_info->ref_count,1); // decrease reference count
|
|
|
|
|
stmt_info=NULL;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
if (stmt_meta_found==false) {
|
|
|
|
|
// previously we didn't find any metadata
|
|
|
|
|
// but as we reached here, stmt_meta is not null and we save the metadata
|
|
|
|
|
sess_STMTs_meta->insert(stmt_global_id,stmt_meta);
|
|
|
|
|
}
|
|
|
|
|
// else
|
|
|
|
|
|
|
|
|
|
CurrentQuery.stmt_meta=stmt_meta;
|
|
|
|
|
//current_hostgroup=qpo->destination_hostgroup;
|
|
|
|
|
rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt, &lock_hostgroup, true);
|
|
|
|
|
if (rc_break==true) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
if (mysql_thread___set_query_lock_on_hostgroup == 1) { // algorithm introduced in 2.0.6
|
|
|
|
|
if (locked_on_hostgroup < 0) {
|
|
|
|
|
if (lock_hostgroup) {
|
|
|
|
|
// we are locking on hostgroup now
|
|
|
|
|
locked_on_hostgroup = current_hostgroup;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (locked_on_hostgroup >= 0) {
|
|
|
|
|
if (current_hostgroup != locked_on_hostgroup) {
|
|
|
|
|
client_myds->DSS=STATE_QUERY_SENT_NET;
|
|
|
|
|
//int l = CurrentQuery.QueryLength;
|
|
|
|
|
int l = CurrentQuery.stmt_info->query_length;
|
|
|
|
|
char *end = (char *)"";
|
|
|
|
|
if (l>256) {
|
|
|
|
|
l=253;
|
|
|
|
|
end = (char *)"...";
|
|
|
|
|
}
|
|
|
|
|
string nqn = string((char *)CurrentQuery.stmt_info->query,l);
|
|
|
|
|
char *err_msg = (char *)"Session trying to reach HG %d while locked on HG %d . Rejecting query: %s";
|
|
|
|
|
char *buf = (char *)malloc(strlen(err_msg)+strlen(nqn.c_str())+strlen(end)+64);
|
|
|
|
|
sprintf(buf, err_msg, current_hostgroup, locked_on_hostgroup, nqn.c_str(), end);
|
|
|
|
|
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,9005,(char *)"HY000",buf, true);
|
|
|
|
|
thread->status_variables.stvar[st_var_hostgroup_locked_queries]++;
|
|
|
|
|
RequestEnd(NULL);
|
|
|
|
|
free(buf);
|
|
|
|
|
l_free(pkt.size,pkt.ptr);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
mybe=find_or_create_backend(current_hostgroup);
|
|
|
|
|
status=PROCESSING_STMT_EXECUTE;
|
|
|
|
|
mybe->server_myds->connect_retries_on_failure=mysql_thread___connect_retries_on_failure;
|
|
|
|
|
mybe->server_myds->wait_until=0;
|
|
|
|
|
mybe->server_myds->killed_at=0;
|
|
|
|
|
mybe->server_myds->kill_type=0;
|
|
|
|
|
client_myds->setDSS_STATE_QUERY_SENT_NET();
|
|
|
|
|
}
|
|
|
|
|
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_EXECUTE(pkt);
|
|
|
|
|
break;
|
|
|
|
|
case _MYSQL_COM_QUIT:
|
|
|
|
|
proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got COM_QUIT packet\n");
|
|
|
|
|
@ -3477,6 +3499,7 @@ __get_pkts_from_client:
|
|
|
|
|
}
|
|
|
|
|
return handler_ret;
|
|
|
|
|
}
|
|
|
|
|
// end of MySQL_Session::get_pkts_from_client()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// this function returns:
|
|
|
|
|
|