Unstable changes to support prepared statements

So far, single thread sysbench is working.
Multi-threaded and resume from broken connections is still not working.
It means it still needs a lot of changes.
pull/739/head
René Cannaò 10 years ago
parent fd7fa08cf5
commit a68e5721eb

@ -91,7 +91,7 @@ class MySQL_STMT_Global_info {
int timeout;
int delay;
} properties;
//MYSQL_BIND **params; // seems unused
MYSQL_BIND **params; // seems unused (?)
MySQL_STMT_Global_info(uint32_t id, unsigned int h, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, uint64_t _h);
~MySQL_STMT_Global_info();
};

@ -130,6 +130,7 @@ class MySQL_Protocol {
// prepared statements
bool generate_STMT_PREPARE_RESPONSE(uint8_t sequence_id, MySQL_STMT_Global_info *stmt_info);
stmt_execute_metadata_t * get_binds_from_pkt(void *ptr, unsigned int size, uint16_t num_params);
//stmt_execute_metadata_t * get_binds_from_pkt(void *ptr, unsigned int size, uint16_t num_params);
stmt_execute_metadata_t * get_binds_from_pkt(void *ptr, unsigned int size, MySQL_STMT_Global_info *stmt_info);
};
#endif /* __CLASS_MYSQL_PROTOCOL_H */

@ -50,6 +50,7 @@ class MySQL_Connection {
unsigned long length;
char *ptr;
MYSQL_STMT *stmt;
MYSQL_RES *stmt_result;
stmt_execute_metadata_t *stmt_meta;
} query;
char scramble_buff[40];

@ -232,20 +232,24 @@ MySQL_STMT_Global_info::MySQL_STMT_Global_info(uint32_t id, unsigned int h, char
fd->def = ( fs->def ? strdup(fs->def) : NULL );
}
}
/*
params=NULL;
if (num_params==2) {
PROXY_TRACE();
}
if(num_params) {
params=(MYSQL_BIND **)malloc(num_columns*sizeof(MYSQL_BIND *));
params=(MYSQL_BIND **)malloc(num_params*sizeof(MYSQL_BIND *));
uint16_t i;
for (i=0;i<num_params;i++) {
params[i]=(MYSQL_BIND *)malloc(sizeof(MYSQL_BIND));
MYSQL_BIND *ps=&(stmt->params[i]);
MYSQL_BIND *pd=params[i];
//MYSQL_BIND *ps=&(stmt->params[i]);
//MYSQL_BIND *pd=params[i];
// copy all params
memcpy(pd,ps,sizeof(MYSQL_BIND));
//memcpy(pd,ps,sizeof(MYSQL_BIND));
memset(params[i],0,sizeof(MYSQL_BIND));
}
}
*/
}
MySQL_STMT_Global_info::~MySQL_STMT_Global_info() {
@ -268,7 +272,7 @@ MySQL_STMT_Global_info::~MySQL_STMT_Global_info() {
free(fields);
fields=NULL;
}
/*
if (num_params) {
uint16_t i;
for (i=0;i<num_params;i++) {
@ -277,5 +281,5 @@ MySQL_STMT_Global_info::~MySQL_STMT_Global_info() {
free(params);
params=NULL;
}
*/
}

@ -792,7 +792,7 @@ bool MySQL_Protocol::generate_STMT_PREPARE_RESPONSE(uint8_t sequence_id, MySQL_S
for (i=0; i<stmt_info->num_params; i++) {
generate_pkt_field(true,NULL,NULL,sid,
(char *)"", (char *)"", (char *)"", (char *)"?", (char *)"",
63,0,0,0,0,false,0,NULL); // NOTE: charset is 63 = binary !
63,0,253,128,0,false,0,NULL); // NOTE: charset is 63 = binary !
sid++;
}
generate_pkt_EOF(true,NULL,NULL,sid,0,SERVER_STATUS_AUTOCOMMIT); // FIXME : for now we pass a very broken flag
@ -1472,12 +1472,16 @@ void * MySQL_Protocol::Query_String_to_packet(uint8_t sid, std::string *s, unsig
}
// See https://dev.mysql.com/doc/internals/en/com-stmt-execute.html for reference
stmt_execute_metadata_t * MySQL_Protocol::get_binds_from_pkt(void *ptr, unsigned int size, uint16_t num_params) {
stmt_execute_metadata_t * MySQL_Protocol::get_binds_from_pkt(void *ptr, unsigned int size, MySQL_STMT_Global_info *stmt_info) {
stmt_execute_metadata_t *ret=NULL; //return NULL in case of failure
if (size<14) {
// some error!
return ret;
}
uint16_t num_params=stmt_info->num_params;
if (num_params==2) {
PROXY_TRACE();
}
//ret=(stmt_execute_metadata_t *)malloc(sizeof(stmt_execute_metadata_t));
ret= new stmt_execute_metadata_t();
char *p=(char *)ptr+5;
@ -1489,6 +1493,7 @@ stmt_execute_metadata_t * MySQL_Protocol::get_binds_from_pkt(void *ptr, unsigned
// ret->is_nulls=NULL;
// ret->lengths=NULL;
ret->pkt=ptr;
uint8_t new_params_bound_flag;
if (num_params) {
uint16_t i;
size_t null_bitmap_length=(num_params+7)/8;
@ -1497,22 +1502,23 @@ stmt_execute_metadata_t * MySQL_Protocol::get_binds_from_pkt(void *ptr, unsigned
delete ret;
return NULL;
}
uint8_t new_params_bound_flag;
memcpy(&new_params_bound_flag,p+null_bitmap_length,1);
if (new_params_bound_flag!=1) {
if (new_params_bound_flag==1) {
// something wrong
delete ret;
return NULL;
// delete ret;
// return NULL;
// the client is sending us the params type. We ignore it
//p+=(2*num_params);
}
uint8_t *null_bitmap=(uint8_t *)malloc(null_bitmap_length);
memcpy(null_bitmap,p,null_bitmap_length);
p+=null_bitmap_length;
p+=1; // new_params_bound_flag
MYSQL_BIND *binds=(MYSQL_BIND *)malloc(sizeof(MYSQL_BIND)*num_params);
my_bool *is_nulls=(my_bool *)malloc(sizeof(my_bool)*num_params);
unsigned long *lengths=(unsigned long *)malloc(sizeof(unsigned long)*num_params);
ret->binds=binds;
my_bool *is_nulls=(my_bool *)malloc(sizeof(my_bool)*num_params);
ret->is_nulls=is_nulls;
unsigned long *lengths=(unsigned long *)malloc(sizeof(unsigned long)*num_params);
ret->lengths=lengths;
for (i=0;i<num_params;i++) {
// set null
@ -1521,6 +1527,9 @@ stmt_execute_metadata_t * MySQL_Protocol::get_binds_from_pkt(void *ptr, unsigned
my_bool is_null = (null_byte & ( 1 << idx )) >> idx;
is_nulls[i]=is_null;
binds[i].is_null=&is_nulls[i];
}
if (new_params_bound_flag) {
for (i=0;i<num_params;i++) {
// set buffer_type and is_unsigned
//enum enum_field_types buffer_type=MYSQL_TYPE_DECIMAL; // set a random default
uint16_t buffer_type=0;
@ -1540,7 +1549,14 @@ stmt_execute_metadata_t * MySQL_Protocol::get_binds_from_pkt(void *ptr, unsigned
// lengths[i]=l;
// p+=ll;
binds[i].length=&lengths[i];
stmt_info->params[i]->buffer_type=binds[i].buffer_type;
}
} else {
for (i=0;i<num_params;i++) {
binds[i].buffer_type=stmt_info->params[i]->buffer_type;
lengths[i]=0;
binds[i].length=&lengths[i];
}
}
for (i=0;i<num_params;i++) {
if (is_nulls[i]==true) {
@ -1607,6 +1623,18 @@ stmt_execute_metadata_t * MySQL_Protocol::get_binds_from_pkt(void *ptr, unsigned
}
}
}
/*
#ifdef DEBUG
// debug
fprintf(stderr,"STMT_EXEC: %d\n",ret->stmt_id);
if (num_params==2) {
PROXY_TRACE();
}
for (int i=0;i<num_params;i++) {
fprintf(stderr," Param %d, is_null=%d, type=%d\n", i, *(ret->binds[i].is_null), ret->binds[i].buffer_type);
}
#endif
*/
return ret;
}
@ -1660,7 +1688,7 @@ MySQL_ResultSet::MySQL_ResultSet(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL
// int column_count=mysql_num_fields(prepare_meta_result);
// MYSQL_BIND *binds=(MYSQL_BIND *)malloc(sizeof(MYSQL_BIND)*column_count);
// mysql_stmt_bind_result(_stmt, binds);
fprintf(stdout, "Fetching results ...\n");
//fprintf(stdout, "Fetching results ...\n");
// int row_count=0;
// while (!mysql_stmt_fetch(_stmt)) {
// row_count++;
@ -1669,6 +1697,7 @@ MySQL_ResultSet::MySQL_ResultSet(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL
// free (binds);
unsigned long long total_size=0;
MYSQL_ROWS *r=_stmt->result.data;
if (r) {
total_size+=r->length;
if (r->length > 0xFFFFFF) {
total_size+=(r->length / 0xFFFFFF) * sizeof(mysql_hdr);
@ -1704,6 +1733,7 @@ MySQL_ResultSet::MySQL_ResultSet(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL
}
PSarrayOUT->add(pkt.ptr,pkt.size);
resultset_size+=pkt.size;
}
add_eof();
}
}

@ -1455,8 +1455,9 @@ __get_pkts_from_client:
break;
case _MYSQL_COM_STMT_CLOSE:
l_free(pkt.size,pkt.ptr);
client_myds->setDSS_STATE_QUERY_SENT_NET();
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Command not supported");
// FIXME: this is not complete. Counters should be decreased
// client_myds->setDSS_STATE_QUERY_SENT_NET();
// client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Command not supported");
client_myds->DSS=STATE_SLEEP;
status=WAITING_CLIENT_DATA;
break;
@ -1551,7 +1552,8 @@ __get_pkts_from_client:
status=WAITING_CLIENT_DATA;
break;
}
stmt_execute_metadata_t *stmt_meta=client_myds->myprot.get_binds_from_pkt(pkt.ptr,pkt.size,stmt_info->num_params);
//stmt_execute_metadata_t *stmt_meta=client_myds->myprot.get_binds_from_pkt(pkt.ptr,pkt.size,stmt_info->num_params);
stmt_execute_metadata_t *stmt_meta=client_myds->myprot.get_binds_from_pkt(pkt.ptr,pkt.size,stmt_info);
if (stmt_meta==NULL) {
l_free(pkt.size,pkt.ptr);
client_myds->setDSS_STATE_QUERY_SENT_NET();
@ -1874,9 +1876,17 @@ handler_again:
*/
// }
myds->myconn->local_stmts->insert(stmid,CurrentQuery.mysql_stmt);
client_myds->myprot.generate_STMT_PREPARE_RESPONSE(client_myds->pkt_sid+1,stmt_info);
CurrentQuery.mysql_stmt=NULL;
enum session_status st=status;
size_t sts=previous_status.size();
if (sts) {
st=previous_status.top();
previous_status.pop();
NEXT_IMMEDIATE(st);
} else {
client_myds->myprot.generate_STMT_PREPARE_RESPONSE(client_myds->pkt_sid+1,stmt_info);
}
}
CurrentQuery.mysql_stmt=NULL;
break;
case PROCESSING_STMT_EXECUTE:
{
@ -2054,6 +2064,15 @@ handler_again:
client_myds->pkt_sid++;
}
break;
case PROCESSING_STMT_EXECUTE:
//MySQL_Result_to_MySQL_wire(myconn->mysql, myconn->MyRS, true);
{
char sqlstate[10];
sprintf(sqlstate,"#%s",mysql_sqlstate(myconn->mysql));
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,mysql_errno(myconn->mysql),sqlstate,(char *)mysql_stmt_error(myconn->query.stmt));
client_myds->pkt_sid++;
}
break;
default:
assert(0);
break;
@ -2670,6 +2689,26 @@ void MySQL_Session::MySQL_Stmt_Result_to_MySQL_wire(MYSQL_STMT *stmt) {
if (stmt_result) {
MySQL_ResultSet *MyRS=new MySQL_ResultSet(&client_myds->myprot, stmt_result, stmt->mysql, stmt);
bool resultset_completed=MyRS->get_resultset(client_myds->PSarrayOUT);
} else {
MYSQL *mysql=stmt->mysql;
// no result set
int myerrno=mysql_stmt_errno(stmt);
if (myerrno==0) {
unsigned int num_rows = mysql_affected_rows(stmt->mysql);
unsigned int nTrx=NumActiveTransactions();
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0 );
if (autocommit) setStatus += SERVER_STATUS_AUTOCOMMIT;
if (mysql->server_status & SERVER_MORE_RESULTS_EXIST)
setStatus += SERVER_MORE_RESULTS_EXIST;
client_myds->myprot.generate_pkt_OK(true,NULL,NULL,client_myds->pkt_sid+1,num_rows,mysql->insert_id,mysql->server_status|setStatus,mysql->warning_count,mysql->info);
client_myds->pkt_sid++;
} else {
// error
char sqlstate[10];
sprintf(sqlstate,"#%s",mysql_sqlstate(mysql));
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,mysql_errno(mysql),sqlstate,mysql_error(mysql));
client_myds->pkt_sid++;
}
}
}

@ -173,6 +173,7 @@ MySQL_Connection::MySQL_Connection() {
query.length=0;
query.stmt=NULL;
query.stmt_meta=NULL;
query.stmt_result=NULL;
largest_query_length=0;
MyRS=NULL;
creation_time=0;
@ -727,8 +728,8 @@ handler_again:
NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END);
}
{
MYSQL_RES *stmt_result=mysql_stmt_result_metadata(query.stmt);
if (stmt_result==NULL) {
query.stmt_result=mysql_stmt_result_metadata(query.stmt);
if (query.stmt_result==NULL) {
NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END);
}
}
@ -1091,6 +1092,7 @@ int MySQL_Connection::async_query(short event, char *stmt, unsigned long length,
}
}
if (async_state_machine==ASYNC_STMT_EXECUTE_END) {
query.stmt_meta=NULL;
async_state_machine=ASYNC_QUERY_END;
if (mysql_stmt_errno(query.stmt)) {
return -1;
@ -1099,6 +1101,7 @@ int MySQL_Connection::async_query(short event, char *stmt, unsigned long length,
}
}
if (async_state_machine==ASYNC_STMT_PREPARE_SUCCESSFUL || async_state_machine==ASYNC_STMT_PREPARE_FAILED) {
query.stmt_meta=NULL;
if (async_state_machine==ASYNC_STMT_PREPARE_FAILED) {
//mysql_stmt_close(query.stmt);
//query.stmt=NULL;
@ -1320,6 +1323,10 @@ void MySQL_Connection::async_free_result() {
query.ptr=NULL;
query.length=0;
}
if (query.stmt_result) {
mysql_free_result(query.stmt_result);
query.stmt_result=NULL;
}
if (mysql_result) {
mysql_free_result(mysql_result);
mysql_result=NULL;

Loading…
Cancel
Save