Basic implementation of prepared statements (issue #200)

pull/248/head
René Cannaò 11 years ago
parent efca45c7f6
commit 4b8f5784db

@ -5,6 +5,18 @@
#include "cpp.h"
class MySQL_Prepared_Stmt_info {
public:
uint32_t statement_id;
uint16_t num_columns;
uint16_t num_params;
uint16_t warning_count;
uint16_t pending_num_columns;
uint16_t pending_num_params;
MySQL_Prepared_Stmt_info(unsigned char *, unsigned int);
};
class MySQL_Protocol {
private:
MySQL_Data_Stream **myds;
@ -14,6 +26,7 @@ class MySQL_Protocol {
#ifdef DEBUG
bool dump_pkt;
#endif
MySQL_Prepared_Stmt_info *current_PreStmt;
uint16_t prot_status;
MySQL_Data_Stream *get_myds() { return *myds; }
void init(MySQL_Data_Stream **, MySQL_Connection_userinfo *, MySQL_Session *);

@ -54,6 +54,9 @@ class MySQL_Connection {
//MYSQL myconn;
//MySQL_Hostgroup_Entry *mshge;
bool reusable;
bool has_prepared_statement;
bool processing_prepared_statement_prepare;
bool processing_prepared_statement_execute;
MySQL_Connection();
~MySQL_Connection();
// int assign_mshge(unsigned int);

@ -49,6 +49,10 @@ class MySQL_Session
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_FIELD_LIST(PtrSize_t *);
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_INIT_DB(PtrSize_t *);
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_PING(PtrSize_t *);
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_PREPARE(PtrSize_t *);
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_EXECUTE(PtrSize_t *);
void handler___status_WAITING_SERVER_DATA___STATE_READING_COM_STMT_PREPARE_RESPONSE(PtrSize_t *);
#ifdef DEBUG
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_debug(PtrSize_t *);
#endif /* DEBUG */

@ -84,6 +84,7 @@ enum mysql_data_stream_status {
STATE_OK,
STATE_ERR,
STATE_READING_COM_STMT_PREPARE_RESPONSE,
STATE_END
/*

@ -435,10 +435,29 @@ int pkt_handshake_server(unsigned char *pkt, unsigned int length, MySQL_Protocol
}
MySQL_Prepared_Stmt_info::MySQL_Prepared_Stmt_info(unsigned char *pkt, unsigned int length) {
pkt += 5;
statement_id = CPY4(pkt);
pkt += sizeof(uint32_t);
num_columns = CPY2(pkt);
pkt += sizeof(uint16_t);
num_params = CPY2(pkt);
pkt += sizeof(uint16_t);
pkt++; // reserved_1
warning_count = CPY2(pkt);
fprintf(stderr,"Generating prepared statement with id=%d, cols=%d, params=%d, warns=%d\n", statement_id, num_columns, num_params, warning_count);
pending_num_columns=num_columns;
pending_num_params=num_params;
}
void MySQL_Protocol::init(MySQL_Data_Stream **__myds, MySQL_Connection_userinfo *__userinfo, MySQL_Session *__sess) {
myds=__myds;
userinfo=__userinfo;
sess=__sess;
current_PreStmt=NULL;
}
int MySQL_Protocol::pkt_handshake_client(unsigned char *pkt, unsigned int length) {

@ -365,6 +365,15 @@ int MySQL_Session::handler() {
l_free(pkt.size,pkt.ptr);
}
break;
case _MYSQL_COM_STMT_PREPARE:
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_PREPARE(&pkt);
break;
case _MYSQL_COM_STMT_EXECUTE:
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_EXECUTE(&pkt);
break;
case _MYSQL_COM_STMT_CLOSE:
mybe->server_myds->PSarrayOUT->add(pkt.ptr, pkt.size);
break;
case _MYSQL_COM_QUIT:
proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got COM_QUIT packet\n");
l_free(pkt.size,pkt.ptr);
@ -432,7 +441,10 @@ __get_a_backend:
}
//mybe->server_myds->myprot.init(&mybe->server_myds, mybe->myconn->userinfo, this);
mybe->server_myds->myprot.init(&mybe->server_myds, mybe->server_myds->myconn->userinfo, this);
if (client_myds->myconn->has_prepared_statement==true) {
mybe->server_myds->myconn->has_prepared_statement=true;
mybe->server_myds->myconn->reusable=false;
}
// FIXME : handle missing connection from connection pool
// FIXME : perhaps is a goto __exit_DSS__STATE_NOT_INITIALIZED after setting time wait
@ -470,6 +482,10 @@ __get_a_backend:
} else {
//server_myds->PSarrayOUT->add(pkt.ptr, pkt.size);
mybe->server_myds->DSS=STATE_QUERY_SENT_DS;
// if (client_myds->myconn->processing_prepared_statement) {
mybe->server_myds->myconn->processing_prepared_statement_prepare=client_myds->myconn->processing_prepared_statement_prepare;
mybe->server_myds->myconn->processing_prepared_statement_execute=client_myds->myconn->processing_prepared_statement_execute;
// }
status=WAITING_SERVER_DATA;
}
}
@ -521,6 +537,10 @@ __exit_DSS__STATE_NOT_INITIALIZED:
handler___status_WAITING_SERVER_DATA___STATE_EOF1(&pkt);
break;
case STATE_READING_COM_STMT_PREPARE_RESPONSE:
handler___status_WAITING_SERVER_DATA___STATE_READING_COM_STMT_PREPARE_RESPONSE(&pkt);
break;
default:
assert(0);
}
@ -641,6 +661,8 @@ bool MySQL_Session::handler___status_CHANGING_SCHEMA(PtrSize_t *pkt) {
mybe->server_myds->PSarrayOUT->add(pkt2.ptr, pkt2.size);
mybe->server_myds->DSS=STATE_QUERY_SENT_DS;
}
// set prepared statement processing
mybe->server_myds->myconn->processing_prepared_statement_prepare=client_myds->myconn->processing_prepared_statement_prepare;
return true;
} else {
l_free(pkt->size,pkt->ptr);
@ -668,6 +690,8 @@ bool MySQL_Session::handler___status_CHANGING_USER_SERVER(PtrSize_t *pkt) {
mybe->server_myds->PSarrayOUT->add(pkt2.ptr, pkt2.size);
mybe->server_myds->DSS=STATE_QUERY_SENT_DS;
}
// set prepared statement processing
mybe->server_myds->myconn->processing_prepared_statement_prepare=client_myds->myconn->processing_prepared_statement_prepare;
return true;
} else {
l_free(pkt->size,pkt->ptr);
@ -695,6 +719,8 @@ bool MySQL_Session::handler___status_CHANGING_CHARSET(PtrSize_t *pkt) {
mybe->server_myds->PSarrayOUT->add(pkt2.ptr, pkt2.size);
mybe->server_myds->DSS=STATE_QUERY_SENT_DS;
}
// set prepared statement processing
mybe->server_myds->myconn->processing_prepared_statement_prepare=client_myds->myconn->processing_prepared_statement_prepare;
return true;
} else {
l_free(pkt->size,pkt->ptr);
@ -739,6 +765,7 @@ void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_QUERY_SENT(PtrS
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Statuses: WAITING_SERVER_DATA - STATE_QUERY_SENT\n");
unsigned char c;
c=*((unsigned char *)pkt->ptr+sizeof(mysql_hdr));
if (mybe->server_myds->myconn->processing_prepared_statement_prepare==false && mybe->server_myds->myconn->processing_prepared_statement_execute==false) {
if (c==0 || c==0xff) {
mybe->server_myds->DSS=STATE_READY;
/* multi-plexing attempt */
@ -771,6 +798,97 @@ void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_QUERY_SENT(PtrS
}
mybe->server_myds->DSS=STATE_ROW; // FIXME: this is catch all for now
}
} else {
// mybe->server_myds->myconn->processing_prepared_statement_prepare==true
if (mybe->server_myds->myconn->processing_prepared_statement_prepare==true) {
switch (c) {
case 0xff:
// ERR packet , send it to client
mybe->server_myds->DSS=STATE_READY;
mybe->server_myds->myconn->processing_prepared_statement_prepare=false;
client_myds->myconn->processing_prepared_statement_prepare=false;
status=WAITING_CLIENT_DATA;
client_myds->DSS=STATE_SLEEP;
client_myds->PSarrayOUT->add(pkt->ptr, pkt->size);
break;
case 0x00:
if (mybe->server_myds->myprot.current_PreStmt) delete mybe->server_myds->myprot.current_PreStmt;
mybe->server_myds->myprot.current_PreStmt=new MySQL_Prepared_Stmt_info((unsigned char *)pkt->ptr, pkt->size);
if (mybe->server_myds->myprot.current_PreStmt->num_columns+mybe->server_myds->myprot.current_PreStmt->num_params) {
mybe->server_myds->DSS=STATE_READING_COM_STMT_PREPARE_RESPONSE;
} else {
mybe->server_myds->DSS=STATE_READY;
mybe->server_myds->myconn->processing_prepared_statement_prepare=false;
client_myds->myconn->processing_prepared_statement_prepare=false;
status=WAITING_CLIENT_DATA;
client_myds->DSS=STATE_SLEEP;
}
client_myds->PSarrayOUT->add(pkt->ptr, pkt->size);
break;
default:
assert(0);
break;
}
} else {
// mybe->server_myds->myconn->processing_prepared_statement_execute==true
switch (c) {
case 0x00:
// OK packet , send it to client
case 0xff:
// ERR packet , send it to client
mybe->server_myds->DSS=STATE_READY;
mybe->server_myds->myconn->processing_prepared_statement_execute=false;
client_myds->myconn->processing_prepared_statement_execute=false;
status=WAITING_CLIENT_DATA;
client_myds->DSS=STATE_SLEEP;
break;
/*
mybe->server_myds->myprot.current_PreStmt=new MySQL_Prepared_Stmt_info((unsigned char *)pkt->ptr, pkt->size);
if (mybe->server_myds->myprot.current_PreStmt->num_columns+mybe->server_myds->myprot.current_PreStmt->num_params) {
mybe->server_myds->DSS=STATE_READING_COM_STMT_PREPARE_RESPONSE;
} else {
status=WAITING_CLIENT_DATA;
client_myds->DSS=STATE_SLEEP;
}
client_myds->PSarrayOUT->add(pkt->ptr, pkt->size);
break;
*/
default:
mybe->server_myds->DSS=STATE_ROW; // FIXME: this is catch all for now
//assert(0);
break;
}
// always send to client
client_myds->PSarrayOUT->add(pkt->ptr, pkt->size);
}
}
}
void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_READING_COM_STMT_PREPARE_RESPONSE(PtrSize_t *pkt) {
unsigned char c;
c=*((unsigned char *)pkt->ptr+sizeof(mysql_hdr));
fprintf(stderr,"%d %d\n", mybe->server_myds->myprot.current_PreStmt->pending_num_params, mybe->server_myds->myprot.current_PreStmt->pending_num_columns);
if (c==0xfe && pkt->size < 13) {
if (mybe->server_myds->myprot.current_PreStmt->pending_num_params+mybe->server_myds->myprot.current_PreStmt->pending_num_columns) {
mybe->server_myds->DSS=STATE_EOF1;
} else {
mybe->server_myds->myconn->processing_prepared_statement_prepare=false;
client_myds->myconn->processing_prepared_statement_prepare=false;
mybe->server_myds->DSS=STATE_READY;
status=WAITING_CLIENT_DATA;
client_myds->DSS=STATE_SLEEP;
}
} else {
if (mybe->server_myds->myprot.current_PreStmt->pending_num_params) {
--mybe->server_myds->myprot.current_PreStmt->pending_num_params;
} else {
if (mybe->server_myds->myprot.current_PreStmt->pending_num_columns) {
--mybe->server_myds->myprot.current_PreStmt->pending_num_columns;
}
}
}
client_myds->PSarrayOUT->add(pkt->ptr, pkt->size);
}
void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_ROW(PtrSize_t *pkt) {
@ -791,6 +909,8 @@ void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_ROW(PtrSize_t *
void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_EOF1(PtrSize_t *pkt) {
unsigned char c;
c=*((unsigned char *)pkt->ptr+sizeof(mysql_hdr));
if (mybe->server_myds->myconn->processing_prepared_statement_prepare==false && mybe->server_myds->myconn->processing_prepared_statement_execute==false)
{
if (qpo && qpo->cache_ttl>0) {
mybe->server_myds->resultset->add(pkt->ptr, pkt->size);
mybe->server_myds->resultset_length+=pkt->size;
@ -835,6 +955,46 @@ void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_EOF1(PtrSize_t
CurrentQuery.end_time=thread->curtime;
CurrentQuery.query_parser_update_counters();
}
} else {
if (mybe->server_myds->myconn->processing_prepared_statement_prepare==true) {
fprintf(stderr,"EOF: %d %d\n", mybe->server_myds->myprot.current_PreStmt->pending_num_params, mybe->server_myds->myprot.current_PreStmt->pending_num_columns);
if (mybe->server_myds->myprot.current_PreStmt->pending_num_params+mybe->server_myds->myprot.current_PreStmt->pending_num_columns) {
if (mybe->server_myds->myprot.current_PreStmt->pending_num_params) {
--mybe->server_myds->myprot.current_PreStmt->pending_num_params;
} else {
if (mybe->server_myds->myprot.current_PreStmt->pending_num_columns) {
--mybe->server_myds->myprot.current_PreStmt->pending_num_columns;
}
}
if (mybe->server_myds->myprot.current_PreStmt->pending_num_params+mybe->server_myds->myprot.current_PreStmt->pending_num_columns) {
mybe->server_myds->DSS=STATE_READING_COM_STMT_PREPARE_RESPONSE;
// } else {
// mybe->server_myds->myconn->processing_prepared_statement_prepare=false;
// client_myds->myconn->processing_prepared_statement_prepare=false;
// mybe->server_myds->DSS=STATE_READY;
// status=WAITING_CLIENT_DATA;
// client_myds->DSS=STATE_SLEEP;
}
} else {
mybe->server_myds->myconn->processing_prepared_statement_prepare=false;
client_myds->myconn->processing_prepared_statement_prepare=false;
mybe->server_myds->DSS=STATE_READY;
status=WAITING_CLIENT_DATA;
client_myds->DSS=STATE_SLEEP;
}
client_myds->PSarrayOUT->add(pkt->ptr, pkt->size);
} else {
//mybe->server_myds->myconn->processing_prepared_statement_execute==true
if ((c==0xfe && pkt->size < 13) || c==0xff) {
mybe->server_myds->myconn->processing_prepared_statement_execute=false;
client_myds->myconn->processing_prepared_statement_execute=false;
mybe->server_myds->DSS=STATE_READY;
status=WAITING_CLIENT_DATA;
client_myds->DSS=STATE_SLEEP;
}
client_myds->PSarrayOUT->add(pkt->ptr, pkt->size);
}
}
}
@ -886,6 +1046,8 @@ void MySQL_Session::handler___status_CONNECTING_SERVER___STATE_CLIENT_HANDSHAKE(
myconn->options.compression_min_length=0;
myconn->set_status_compression(false);
}
// set prepared statement processing
mybe->server_myds->myconn->processing_prepared_statement_prepare=client_myds->myconn->processing_prepared_statement_prepare;
} else {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Wrong credentials for backend: disconnecting\n");
l_free(pkt->size,pkt->ptr);
@ -923,7 +1085,7 @@ void MySQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(
//server_myds->myconn->userinfo->set(client_myds->myconn->userinfo);
status=WAITING_CLIENT_DATA;
client_myds->DSS=STATE_CLIENT_AUTH_OK;
MySQL_Connection *myconn=client_myds->myconn;
//MySQL_Connection *myconn=client_myds->myconn;
/*
// enable compression
if (myconn->options.server_capabilities & CLIENT_COMPRESS) {
@ -1019,6 +1181,36 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
}
}
void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_PREPARE(PtrSize_t *pkt) {
if (admin==false) {
client_myds->myconn->has_prepared_statement=true;
client_myds->myconn->processing_prepared_statement_prepare=true;
mybe=find_or_create_backend(default_hostgroup);
mybe->server_myds->PSarrayOUT->add(pkt->ptr, pkt->size);
client_myds->setDSS_STATE_QUERY_SENT_NET();
} else {
l_free(pkt->size,pkt->ptr);
client_myds->setDSS_STATE_QUERY_SENT_NET();
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Command not supported");
client_myds->DSS=STATE_SLEEP;
}
}
void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_EXECUTE(PtrSize_t *pkt) {
if (admin==false) {
//client_myds->myconn->has_prepared_statement_execute=true;
client_myds->myconn->processing_prepared_statement_execute=true;
mybe=find_or_create_backend(default_hostgroup);
mybe->server_myds->PSarrayOUT->add(pkt->ptr, pkt->size);
client_myds->setDSS_STATE_QUERY_SENT_NET();
} else {
l_free(pkt->size,pkt->ptr);
client_myds->setDSS_STATE_QUERY_SENT_NET();
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Command not supported");
client_myds->DSS=STATE_SLEEP;
}
}
#ifdef DEBUG
void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_debug(PtrSize_t *pkt) {
proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got ProxySQL dbg packet\n");
@ -1033,7 +1225,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
char *query_no_space=(char *)l_alloc(query_length);
memcpy(query_no_space,query,query_length);
unsigned int query_no_space_length=remove_spaces(query_no_space);
/*unsigned int query_no_space_length=*/remove_spaces(query_no_space);
if (!strcasecmp(query_no_space,"DBG THREAD STATUS")) {
result = thread->SQL3_Thread_status(this);

@ -106,6 +106,9 @@ MySQL_Connection::MySQL_Connection() {
myds=NULL;
inserted_into_pool=0;
reusable=false;
has_prepared_statement=false;
processing_prepared_statement_prepare=false;
processing_prepared_statement_execute=false;
parent=NULL;
userinfo=new MySQL_Connection_userinfo();
fd=-1;

Loading…
Cancel
Save