From 4b8f5784dbd35a272f65e5a26df814fe0db68f86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Sat, 21 Mar 2015 11:29:40 +0000 Subject: [PATCH] Basic implementation of prepared statements (issue #200) --- include/MySQL_Protocol.h | 13 +++ include/mysql_connection.h | 3 + include/mysql_session.h | 4 + include/proxysql_structs.h | 1 + lib/MySQL_Protocol.cpp | 19 ++++ lib/MySQL_Session.cpp | 198 ++++++++++++++++++++++++++++++++++++- lib/mysql_connection.cpp | 3 + 7 files changed, 238 insertions(+), 3 deletions(-) diff --git a/include/MySQL_Protocol.h b/include/MySQL_Protocol.h index 6b231e01c..c8ccfa332 100644 --- a/include/MySQL_Protocol.h +++ b/include/MySQL_Protocol.h @@ -5,6 +5,18 @@ #include "cpp.h" +class MySQL_Prepared_Stmt_info { + public: + uint32_t statement_id; + uint16_t num_columns; + uint16_t num_params; + uint16_t warning_count; + uint16_t pending_num_columns; + uint16_t pending_num_params; + MySQL_Prepared_Stmt_info(unsigned char *, unsigned int); +}; + + class MySQL_Protocol { private: MySQL_Data_Stream **myds; @@ -14,6 +26,7 @@ class MySQL_Protocol { #ifdef DEBUG bool dump_pkt; #endif + MySQL_Prepared_Stmt_info *current_PreStmt; uint16_t prot_status; MySQL_Data_Stream *get_myds() { return *myds; } void init(MySQL_Data_Stream **, MySQL_Connection_userinfo *, MySQL_Session *); diff --git a/include/mysql_connection.h b/include/mysql_connection.h index 0cf8b01d8..c92f27e8a 100644 --- a/include/mysql_connection.h +++ b/include/mysql_connection.h @@ -54,6 +54,9 @@ class MySQL_Connection { //MYSQL myconn; //MySQL_Hostgroup_Entry *mshge; bool reusable; + bool has_prepared_statement; + bool processing_prepared_statement_prepare; + bool processing_prepared_statement_execute; MySQL_Connection(); ~MySQL_Connection(); // int assign_mshge(unsigned int); diff --git a/include/mysql_session.h b/include/mysql_session.h index 247863e01..b4a479abc 100644 --- a/include/mysql_session.h +++ b/include/mysql_session.h @@ -49,6 +49,10 @@ class MySQL_Session void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_FIELD_LIST(PtrSize_t *); void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_INIT_DB(PtrSize_t *); void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_PING(PtrSize_t *); + + void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_PREPARE(PtrSize_t *); + void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_EXECUTE(PtrSize_t *); + void handler___status_WAITING_SERVER_DATA___STATE_READING_COM_STMT_PREPARE_RESPONSE(PtrSize_t *); #ifdef DEBUG void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_debug(PtrSize_t *); #endif /* DEBUG */ diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 22fe8c78b..3ac0e2acf 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -84,6 +84,7 @@ enum mysql_data_stream_status { STATE_OK, STATE_ERR, + STATE_READING_COM_STMT_PREPARE_RESPONSE, STATE_END /* diff --git a/lib/MySQL_Protocol.cpp b/lib/MySQL_Protocol.cpp index 92d3db3b4..a102d62fa 100644 --- a/lib/MySQL_Protocol.cpp +++ b/lib/MySQL_Protocol.cpp @@ -435,10 +435,29 @@ int pkt_handshake_server(unsigned char *pkt, unsigned int length, MySQL_Protocol } + +MySQL_Prepared_Stmt_info::MySQL_Prepared_Stmt_info(unsigned char *pkt, unsigned int length) { + pkt += 5; + statement_id = CPY4(pkt); + pkt += sizeof(uint32_t); + num_columns = CPY2(pkt); + pkt += sizeof(uint16_t); + num_params = CPY2(pkt); + pkt += sizeof(uint16_t); + pkt++; // reserved_1 + warning_count = CPY2(pkt); + fprintf(stderr,"Generating prepared statement with id=%d, cols=%d, params=%d, warns=%d\n", statement_id, num_columns, num_params, warning_count); + pending_num_columns=num_columns; + pending_num_params=num_params; +} + + + void MySQL_Protocol::init(MySQL_Data_Stream **__myds, MySQL_Connection_userinfo *__userinfo, MySQL_Session *__sess) { myds=__myds; userinfo=__userinfo; sess=__sess; + current_PreStmt=NULL; } int MySQL_Protocol::pkt_handshake_client(unsigned char *pkt, unsigned int length) { diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 5310b7cd3..5acbf8416 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -365,6 +365,15 @@ int MySQL_Session::handler() { l_free(pkt.size,pkt.ptr); } break; + case _MYSQL_COM_STMT_PREPARE: + handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_PREPARE(&pkt); + break; + case _MYSQL_COM_STMT_EXECUTE: + handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_EXECUTE(&pkt); + break; + case _MYSQL_COM_STMT_CLOSE: + mybe->server_myds->PSarrayOUT->add(pkt.ptr, pkt.size); + break; case _MYSQL_COM_QUIT: proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got COM_QUIT packet\n"); l_free(pkt.size,pkt.ptr); @@ -432,7 +441,10 @@ __get_a_backend: } //mybe->server_myds->myprot.init(&mybe->server_myds, mybe->myconn->userinfo, this); mybe->server_myds->myprot.init(&mybe->server_myds, mybe->server_myds->myconn->userinfo, this); - + if (client_myds->myconn->has_prepared_statement==true) { + mybe->server_myds->myconn->has_prepared_statement=true; + mybe->server_myds->myconn->reusable=false; + } // FIXME : handle missing connection from connection pool // FIXME : perhaps is a goto __exit_DSS__STATE_NOT_INITIALIZED after setting time wait @@ -470,6 +482,10 @@ __get_a_backend: } else { //server_myds->PSarrayOUT->add(pkt.ptr, pkt.size); mybe->server_myds->DSS=STATE_QUERY_SENT_DS; +// if (client_myds->myconn->processing_prepared_statement) { + mybe->server_myds->myconn->processing_prepared_statement_prepare=client_myds->myconn->processing_prepared_statement_prepare; + mybe->server_myds->myconn->processing_prepared_statement_execute=client_myds->myconn->processing_prepared_statement_execute; +// } status=WAITING_SERVER_DATA; } } @@ -521,6 +537,10 @@ __exit_DSS__STATE_NOT_INITIALIZED: handler___status_WAITING_SERVER_DATA___STATE_EOF1(&pkt); break; + case STATE_READING_COM_STMT_PREPARE_RESPONSE: + handler___status_WAITING_SERVER_DATA___STATE_READING_COM_STMT_PREPARE_RESPONSE(&pkt); + break; + default: assert(0); } @@ -641,6 +661,8 @@ bool MySQL_Session::handler___status_CHANGING_SCHEMA(PtrSize_t *pkt) { mybe->server_myds->PSarrayOUT->add(pkt2.ptr, pkt2.size); mybe->server_myds->DSS=STATE_QUERY_SENT_DS; } + // set prepared statement processing + mybe->server_myds->myconn->processing_prepared_statement_prepare=client_myds->myconn->processing_prepared_statement_prepare; return true; } else { l_free(pkt->size,pkt->ptr); @@ -668,6 +690,8 @@ bool MySQL_Session::handler___status_CHANGING_USER_SERVER(PtrSize_t *pkt) { mybe->server_myds->PSarrayOUT->add(pkt2.ptr, pkt2.size); mybe->server_myds->DSS=STATE_QUERY_SENT_DS; } + // set prepared statement processing + mybe->server_myds->myconn->processing_prepared_statement_prepare=client_myds->myconn->processing_prepared_statement_prepare; return true; } else { l_free(pkt->size,pkt->ptr); @@ -695,6 +719,8 @@ bool MySQL_Session::handler___status_CHANGING_CHARSET(PtrSize_t *pkt) { mybe->server_myds->PSarrayOUT->add(pkt2.ptr, pkt2.size); mybe->server_myds->DSS=STATE_QUERY_SENT_DS; } + // set prepared statement processing + mybe->server_myds->myconn->processing_prepared_statement_prepare=client_myds->myconn->processing_prepared_statement_prepare; return true; } else { l_free(pkt->size,pkt->ptr); @@ -739,6 +765,7 @@ void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_QUERY_SENT(PtrS proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Statuses: WAITING_SERVER_DATA - STATE_QUERY_SENT\n"); unsigned char c; c=*((unsigned char *)pkt->ptr+sizeof(mysql_hdr)); + if (mybe->server_myds->myconn->processing_prepared_statement_prepare==false && mybe->server_myds->myconn->processing_prepared_statement_execute==false) { if (c==0 || c==0xff) { mybe->server_myds->DSS=STATE_READY; /* multi-plexing attempt */ @@ -771,6 +798,97 @@ void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_QUERY_SENT(PtrS } mybe->server_myds->DSS=STATE_ROW; // FIXME: this is catch all for now } + } else { + // mybe->server_myds->myconn->processing_prepared_statement_prepare==true + if (mybe->server_myds->myconn->processing_prepared_statement_prepare==true) { + switch (c) { + case 0xff: + // ERR packet , send it to client + mybe->server_myds->DSS=STATE_READY; + mybe->server_myds->myconn->processing_prepared_statement_prepare=false; + client_myds->myconn->processing_prepared_statement_prepare=false; + status=WAITING_CLIENT_DATA; + client_myds->DSS=STATE_SLEEP; + client_myds->PSarrayOUT->add(pkt->ptr, pkt->size); + break; + case 0x00: + if (mybe->server_myds->myprot.current_PreStmt) delete mybe->server_myds->myprot.current_PreStmt; + mybe->server_myds->myprot.current_PreStmt=new MySQL_Prepared_Stmt_info((unsigned char *)pkt->ptr, pkt->size); + if (mybe->server_myds->myprot.current_PreStmt->num_columns+mybe->server_myds->myprot.current_PreStmt->num_params) { + mybe->server_myds->DSS=STATE_READING_COM_STMT_PREPARE_RESPONSE; + } else { + mybe->server_myds->DSS=STATE_READY; + mybe->server_myds->myconn->processing_prepared_statement_prepare=false; + client_myds->myconn->processing_prepared_statement_prepare=false; + status=WAITING_CLIENT_DATA; + client_myds->DSS=STATE_SLEEP; + } + client_myds->PSarrayOUT->add(pkt->ptr, pkt->size); + break; + default: + assert(0); + break; + } + } else { + // mybe->server_myds->myconn->processing_prepared_statement_execute==true + switch (c) { + case 0x00: + // OK packet , send it to client + case 0xff: + // ERR packet , send it to client + mybe->server_myds->DSS=STATE_READY; + mybe->server_myds->myconn->processing_prepared_statement_execute=false; + client_myds->myconn->processing_prepared_statement_execute=false; + status=WAITING_CLIENT_DATA; + client_myds->DSS=STATE_SLEEP; + break; +/* + mybe->server_myds->myprot.current_PreStmt=new MySQL_Prepared_Stmt_info((unsigned char *)pkt->ptr, pkt->size); + if (mybe->server_myds->myprot.current_PreStmt->num_columns+mybe->server_myds->myprot.current_PreStmt->num_params) { + mybe->server_myds->DSS=STATE_READING_COM_STMT_PREPARE_RESPONSE; + } else { + status=WAITING_CLIENT_DATA; + client_myds->DSS=STATE_SLEEP; + } + client_myds->PSarrayOUT->add(pkt->ptr, pkt->size); + break; +*/ + default: + mybe->server_myds->DSS=STATE_ROW; // FIXME: this is catch all for now + //assert(0); + break; + } + // always send to client + client_myds->PSarrayOUT->add(pkt->ptr, pkt->size); + } + } +} + +void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_READING_COM_STMT_PREPARE_RESPONSE(PtrSize_t *pkt) { + unsigned char c; + c=*((unsigned char *)pkt->ptr+sizeof(mysql_hdr)); + + fprintf(stderr,"%d %d\n", mybe->server_myds->myprot.current_PreStmt->pending_num_params, mybe->server_myds->myprot.current_PreStmt->pending_num_columns); + if (c==0xfe && pkt->size < 13) { + if (mybe->server_myds->myprot.current_PreStmt->pending_num_params+mybe->server_myds->myprot.current_PreStmt->pending_num_columns) { + mybe->server_myds->DSS=STATE_EOF1; + } else { + mybe->server_myds->myconn->processing_prepared_statement_prepare=false; + client_myds->myconn->processing_prepared_statement_prepare=false; + mybe->server_myds->DSS=STATE_READY; + status=WAITING_CLIENT_DATA; + client_myds->DSS=STATE_SLEEP; + } + } else { + if (mybe->server_myds->myprot.current_PreStmt->pending_num_params) { + --mybe->server_myds->myprot.current_PreStmt->pending_num_params; + } else { + if (mybe->server_myds->myprot.current_PreStmt->pending_num_columns) { + --mybe->server_myds->myprot.current_PreStmt->pending_num_columns; + } + } + } + client_myds->PSarrayOUT->add(pkt->ptr, pkt->size); } void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_ROW(PtrSize_t *pkt) { @@ -791,6 +909,8 @@ void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_ROW(PtrSize_t * void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_EOF1(PtrSize_t *pkt) { unsigned char c; c=*((unsigned char *)pkt->ptr+sizeof(mysql_hdr)); + if (mybe->server_myds->myconn->processing_prepared_statement_prepare==false && mybe->server_myds->myconn->processing_prepared_statement_execute==false) +{ if (qpo && qpo->cache_ttl>0) { mybe->server_myds->resultset->add(pkt->ptr, pkt->size); mybe->server_myds->resultset_length+=pkt->size; @@ -835,6 +955,46 @@ void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_EOF1(PtrSize_t CurrentQuery.end_time=thread->curtime; CurrentQuery.query_parser_update_counters(); } +} else { + if (mybe->server_myds->myconn->processing_prepared_statement_prepare==true) { + fprintf(stderr,"EOF: %d %d\n", mybe->server_myds->myprot.current_PreStmt->pending_num_params, mybe->server_myds->myprot.current_PreStmt->pending_num_columns); + if (mybe->server_myds->myprot.current_PreStmt->pending_num_params+mybe->server_myds->myprot.current_PreStmt->pending_num_columns) { + if (mybe->server_myds->myprot.current_PreStmt->pending_num_params) { + --mybe->server_myds->myprot.current_PreStmt->pending_num_params; + } else { + if (mybe->server_myds->myprot.current_PreStmt->pending_num_columns) { + --mybe->server_myds->myprot.current_PreStmt->pending_num_columns; + } + } + if (mybe->server_myds->myprot.current_PreStmt->pending_num_params+mybe->server_myds->myprot.current_PreStmt->pending_num_columns) { + mybe->server_myds->DSS=STATE_READING_COM_STMT_PREPARE_RESPONSE; +// } else { +// mybe->server_myds->myconn->processing_prepared_statement_prepare=false; +// client_myds->myconn->processing_prepared_statement_prepare=false; +// mybe->server_myds->DSS=STATE_READY; +// status=WAITING_CLIENT_DATA; +// client_myds->DSS=STATE_SLEEP; + } + } else { + mybe->server_myds->myconn->processing_prepared_statement_prepare=false; + client_myds->myconn->processing_prepared_statement_prepare=false; + mybe->server_myds->DSS=STATE_READY; + status=WAITING_CLIENT_DATA; + client_myds->DSS=STATE_SLEEP; + } + client_myds->PSarrayOUT->add(pkt->ptr, pkt->size); + } else { + //mybe->server_myds->myconn->processing_prepared_statement_execute==true + if ((c==0xfe && pkt->size < 13) || c==0xff) { + mybe->server_myds->myconn->processing_prepared_statement_execute=false; + client_myds->myconn->processing_prepared_statement_execute=false; + mybe->server_myds->DSS=STATE_READY; + status=WAITING_CLIENT_DATA; + client_myds->DSS=STATE_SLEEP; + } + client_myds->PSarrayOUT->add(pkt->ptr, pkt->size); + } +} } @@ -886,6 +1046,8 @@ void MySQL_Session::handler___status_CONNECTING_SERVER___STATE_CLIENT_HANDSHAKE( myconn->options.compression_min_length=0; myconn->set_status_compression(false); } + // set prepared statement processing + mybe->server_myds->myconn->processing_prepared_statement_prepare=client_myds->myconn->processing_prepared_statement_prepare; } else { proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Wrong credentials for backend: disconnecting\n"); l_free(pkt->size,pkt->ptr); @@ -923,7 +1085,7 @@ void MySQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE( //server_myds->myconn->userinfo->set(client_myds->myconn->userinfo); status=WAITING_CLIENT_DATA; client_myds->DSS=STATE_CLIENT_AUTH_OK; - MySQL_Connection *myconn=client_myds->myconn; + //MySQL_Connection *myconn=client_myds->myconn; /* // enable compression if (myconn->options.server_capabilities & CLIENT_COMPRESS) { @@ -1019,6 +1181,36 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C } } +void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_PREPARE(PtrSize_t *pkt) { + if (admin==false) { + client_myds->myconn->has_prepared_statement=true; + client_myds->myconn->processing_prepared_statement_prepare=true; + mybe=find_or_create_backend(default_hostgroup); + mybe->server_myds->PSarrayOUT->add(pkt->ptr, pkt->size); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + } else { + l_free(pkt->size,pkt->ptr); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Command not supported"); + client_myds->DSS=STATE_SLEEP; + } +} + +void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_EXECUTE(PtrSize_t *pkt) { + if (admin==false) { + //client_myds->myconn->has_prepared_statement_execute=true; + client_myds->myconn->processing_prepared_statement_execute=true; + mybe=find_or_create_backend(default_hostgroup); + mybe->server_myds->PSarrayOUT->add(pkt->ptr, pkt->size); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + } else { + l_free(pkt->size,pkt->ptr); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Command not supported"); + client_myds->DSS=STATE_SLEEP; + } +} + #ifdef DEBUG void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_debug(PtrSize_t *pkt) { proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got ProxySQL dbg packet\n"); @@ -1033,7 +1225,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C char *query_no_space=(char *)l_alloc(query_length); memcpy(query_no_space,query,query_length); - unsigned int query_no_space_length=remove_spaces(query_no_space); + /*unsigned int query_no_space_length=*/remove_spaces(query_no_space); if (!strcasecmp(query_no_space,"DBG THREAD STATUS")) { result = thread->SQL3_Thread_status(this); diff --git a/lib/mysql_connection.cpp b/lib/mysql_connection.cpp index 039ace019..3b60761a3 100644 --- a/lib/mysql_connection.cpp +++ b/lib/mysql_connection.cpp @@ -106,6 +106,9 @@ MySQL_Connection::MySQL_Connection() { myds=NULL; inserted_into_pool=0; reusable=false; + has_prepared_statement=false; + processing_prepared_statement_prepare=false; + processing_prepared_statement_execute=false; parent=NULL; userinfo=new MySQL_Connection_userinfo(); fd=-1;