From 6c44e9d96fed581f4fbe1de8493e05901bcb950a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Thu, 16 Jul 2015 20:08:32 +0000 Subject: [PATCH] Further attempt to use MariaDB Client Library --- include/MySQL_Data_Stream.h | 5 + include/MySQL_Protocol.h | 2 +- include/MySQL_Session.h | 1 + include/mysql_connection.h | 10 + include/proxysql_structs.h | 7 + lib/MySQL_Protocol.cpp | 2 +- lib/MySQL_Session.cpp | 121 ++- lib/MySQL_Session.cpp_old | 1677 +++++++++++++++++++++++++++++++++++ lib/mysql_connection.cpp | 63 ++ lib/mysql_data_stream.cpp | 6 +- src/proxysql.cfg | 2 +- 11 files changed, 1891 insertions(+), 5 deletions(-) create mode 100644 lib/MySQL_Session.cpp_old diff --git a/include/MySQL_Data_Stream.h b/include/MySQL_Data_Stream.h index 5eaff6be0..bbe761a27 100644 --- a/include/MySQL_Data_Stream.h +++ b/include/MySQL_Data_Stream.h @@ -84,6 +84,11 @@ class MySQL_Data_Stream bool net_failure; + struct { + char *ptr; + unsigned int size; + } mysql_real_query; + MySQL_Data_Stream(); ~MySQL_Data_Stream(); diff --git a/include/MySQL_Protocol.h b/include/MySQL_Protocol.h index 08166de11..2c3019cde 100644 --- a/include/MySQL_Protocol.h +++ b/include/MySQL_Protocol.h @@ -65,7 +65,7 @@ class MySQL_Protocol { bool generate_pkt_column_count(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, uint64_t count); // bool generate_pkt_field(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len, uint8_t sequence_id, char *schema, char *table, char *org_table, char *name, char *org_name, uint16_t charset, uint32_t column_length, uint8_t type, uint16_t flags, uint8_t decimals, bool field_list, uint64_t defvalue_length, char *defvalue); bool generate_pkt_field(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, char *schema, char *table, char *org_table, char *name, char *org_name, uint16_t charset, uint32_t column_length, uint8_t type, uint16_t flags, uint8_t decimals, bool field_list, uint64_t defvalue_length, char *defvalue); - bool generate_pkt_row(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, int colnums, int *fieldslen, char **fieldstxt); + bool generate_pkt_row(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt); // bool generate_pkt_initial_handshake(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len); bool generate_pkt_initial_handshake(bool send, void **ptr, unsigned int *len); // bool generate_pkt_handshake_response(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len); diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index ed0318dad..b66ffee24 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -129,6 +129,7 @@ class MySQL_Session MySQL_Backend * find_or_create_backend(int, MySQL_Data_Stream *_myds=NULL); void SQLite3_to_MySQL(SQLite3_result *, char *, int , MySQL_Protocol *); + void MySQL_Result_to_MySQL_wire(MYSQL *mysql, MYSQL_RES *result, MySQL_Protocol *myprot); SQLite3_result * SQL3_Session_status(); void reset_all_backends(); diff --git a/include/mysql_connection.h b/include/mysql_connection.h index 3731f276c..e7211945e 100644 --- a/include/mysql_connection.h +++ b/include/mysql_connection.h @@ -41,6 +41,11 @@ class MySQL_Connection { MDB_ASYNC_ST async_state_machine; // Async state machine MYSQL *mysql; MYSQL *ret_mysql; + MYSQL_RES *mysql_result; + struct { + char *ptr; + unsigned long length; + } query; struct { uint32_t max_allowed_pkt; uint32_t server_capabilities; @@ -84,8 +89,13 @@ class MySQL_Connection { void ping_cont(short event); void set_names_start(); void set_names_cont(short event); + void real_query_start(); + void real_query_cont(short event); + void store_result_start(); + void store_result_cont(short event); void initdb_start(); void initdb_cont(short event); + void set_query(char *stmt, unsigned long length); MDB_ASYNC_ST handler(short event); void next_event(MDB_ASYNC_ST new_st); }; diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 7246c2579..65f442f14 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -27,6 +27,11 @@ enum MDB_ASYNC_ST { // MariaDB Async State Machine ASYNC_SET_NAMES_END, ASYNC_SET_NAMES_SUCCESSFUL, ASYNC_SET_NAMES_FAILED, + ASYNC_QUERY_START, + ASYNC_QUERY_CONT, + ASYNC_QUERY_END, + ASYNC_STORE_RESULT_START, + ASYNC_STORE_RESULT_CONT, ASYNC_INITDB_START, ASYNC_INITDB_CONT, ASYNC_INITDB_END, @@ -82,6 +87,7 @@ enum session_status { PINGING_SERVER, WAITING_CLIENT_DATA, WAITING_SERVER_DATA, + PROCESSING_QUERY, CHANGING_SCHEMA, CHANGING_CHARSET, CHANGING_USER_CLIENT, @@ -118,6 +124,7 @@ enum mysql_data_stream_status { STATE_MARIADB_PING, STATE_MARIADB_SET_NAMES, STATE_MARIADB_INITDB, + STATE_MARIADB_QUERY, STATE_MARIADB_END, // dummy state STATE_END diff --git a/lib/MySQL_Protocol.cpp b/lib/MySQL_Protocol.cpp index 7fc0a5bd0..fd27aedc9 100644 --- a/lib/MySQL_Protocol.cpp +++ b/lib/MySQL_Protocol.cpp @@ -1102,7 +1102,7 @@ bool MySQL_Protocol::generate_pkt_field(bool send, void **ptr, unsigned int *len } -bool MySQL_Protocol::generate_pkt_row(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, int colnums, int *fieldslen, char **fieldstxt) { +bool MySQL_Protocol::generate_pkt_row(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt) { int col=0; int rowlen=0; for (col=0; colserver_myds) { // server_myds=mybe->server_myds; //} + + +#ifdef EXPMARIA + proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Received query to be processed with MariaDB Client library\n"); + mybe->server_myds->mysql_real_query.size=pkt.size-5; + mybe->server_myds->mysql_real_query.ptr=(char *)malloc(pkt.size-5); + memcpy(mybe->server_myds->mysql_real_query.ptr,(char *)pkt.ptr+5,pkt.size-5); + l_free(pkt.size,pkt.ptr); +#else mybe->server_myds->PSarrayOUT->add(pkt.ptr, pkt.size); +#endif /* EXPMARIA */ + client_myds->setDSS_STATE_QUERY_SENT_NET(); } else { // this is processed by the admin module @@ -365,6 +377,7 @@ __get_a_backend: if (status!=FAST_FORWARD && client_myds->DSS==STATE_QUERY_SENT_NET) { // the client has completely sent the query, now we should handle it server side // + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, client_myds->DSS==STATE_QUERY_SENT_NET\n", this); if (mybe && mybe->server_myds->DSS==STATE_NOT_INITIALIZED) { proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, client_myds->DSS==STATE_QUERY_SENT_NET , server_myds==STATE_NOT_INITIALIZED\n", this); // DSS is STATE_NOT_INITIALIZED. It means we are not connected to any server @@ -423,17 +436,30 @@ __get_a_backend: handler___client_DSS_QUERY_SENT___send_INIT_DB_to_backend(); } } else { +#ifndef EXPMARIA if (client_myds->myconn->options.charset!=mybe->server_myds->myconn->options.charset /* FIXME: this was for debugging only */ /*|| rand()%3==0 */) { handler___client_DSS_QUERY_SENT___send_SET_NAMES_to_backend(); } else { +#endif //server_myds->PSarrayOUT->add(pkt.ptr, pkt.size); +#ifdef EXPMARIA + MySQL_Data_Stream *myds=mybe->server_myds; + myds->DSS=STATE_MARIADB_QUERY; + status=PROCESSING_QUERY; + myds->myconn->async_state_machine=ASYNC_QUERY_START; + myds->myconn->set_query(myds->mysql_real_query.ptr,myds->mysql_real_query.size); + myds->myconn->handler(0); +#else mybe->server_myds->DSS=STATE_QUERY_SENT_DS; // if (client_myds->myconn->processing_prepared_statement) { mybe->server_myds->myconn->processing_prepared_statement_prepare=client_myds->myconn->processing_prepared_statement_prepare; mybe->server_myds->myconn->processing_prepared_statement_execute=client_myds->myconn->processing_prepared_statement_execute; // } status=WAITING_SERVER_DATA; +#endif /* EXPMARIA */ +#ifndef EXPMARIA } +#endif } } // } TRY #1 @@ -485,6 +511,33 @@ __exit_DSS__STATE_NOT_INITIALIZED: // } } break; + case PROCESSING_QUERY: + if (myds->revents) { + myconn->handler(myds->revents); + if (myconn->async_state_machine==ASYNC_QUERY_END) { + +// status=WAITING_SERVER_DATA; +// myds->DSS=STATE_READY; + /* multi-plexing attempt */ + if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) { + myds->myconn->last_time_used=thread->curtime; + MyHGM->push_MyConn_to_pool(myds->myconn); + //MyHGM->destroy_MyConn_from_pool(mybe->server_myds->myconn); + //mybe->server_myds->myconn=NULL; + myds->detach_connection(); + myds->unplug_backend(); + } + /* multi-plexing attempt */ + //status=NONE; + MySQL_Result_to_MySQL_wire(myconn->mysql,myconn->mysql_result,&client_myds->myprot); + mysql_free_result(myconn->mysql_result); + myconn->mysql_result=NULL; + myds->DSS=STATE_NOT_INITIALIZED; + status=WAITING_CLIENT_DATA; + client_myds->DSS=STATE_SLEEP; + } + } + break; case PINGING_SERVER: if (myds->revents) { myconn->handler(myds->revents); @@ -529,8 +582,16 @@ __exit_DSS__STATE_NOT_INITIALIZED: if (myds->revents) { myconn->handler(myds->revents); if (myconn->async_state_machine==ASYNC_SET_NAMES_SUCCESSFUL) { +#ifdef EXPMARIA + myds->DSS=STATE_MARIADB_QUERY; + status=PROCESSING_QUERY; + myds->myconn->async_state_machine=ASYNC_QUERY_START; + myds->myconn->set_query(myds->mysql_real_query.ptr,myds->mysql_real_query.size); + myds->myconn->handler(0); +#else myds->DSS=STATE_READY; status=WAITING_SERVER_DATA; +#endif /* EXPMARIA */ unsigned int k; PtrSize_t pkt2; for (k=0; kserver_myds->PSarrayOUTpending->len;) { @@ -1456,6 +1517,64 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___send_CHANGE_USER_to_backen } +void MySQL_Session::MySQL_Result_to_MySQL_wire(MYSQL *mysql, MYSQL_RES *result, MySQL_Protocol *myprot) { + assert(myprot); + MySQL_Data_Stream *myds=myprot->get_myds(); + myds->DSS=STATE_QUERY_SENT_DS; + int sid=1; + unsigned int num_fields=mysql_field_count(mysql); + unsigned int num_rows; + if (result) { + // we have a result set, this should be a SELECT statement with result + assert(result->current_field==0); + myprot->generate_pkt_column_count(true,NULL,NULL,sid,num_fields); sid++; + for (unsigned int i=0; igenerate_pkt_field(true,NULL,NULL,sid,field->db,field->table,field->org_table,field->name,field->org_name,field->charsetnr,field->length,field->type,field->flags,field->decimals,false,0,NULL); + sid++; + } + myds->DSS=STATE_COLUMN_DEFINITION; + num_rows=mysql_num_rows(result); + myprot->generate_pkt_EOF(true,NULL,NULL,sid,0,0); sid++; + //char **p=(char **)malloc(sizeof(char*)*num_fields); + //int *l=(int *)malloc(sizeof(int*)*num_fields); + //p[0]="column test"; + for (unsigned int r=0; rrows[r]->sizes[i]; +// p[i]=result->rows[r]->fields[i]; +// } + myprot->generate_pkt_row(true,NULL,NULL,sid,num_fields,lengths,row); sid++; + } + myds->DSS=STATE_ROW; + myprot->generate_pkt_EOF(true,NULL,NULL,sid,0,2); sid++; + myds->DSS=STATE_SLEEP; + //free(l); + //free(p); + } else { // no result set + if (num_fields) { + num_rows = mysql_affected_rows(mysql); + myprot->generate_pkt_OK(true,NULL,NULL,sid,num_rows,mysql->insert_id,0,mysql->warning_count,mysql->info); + } else { + // error + char sqlstate[10]; + sprintf(sqlstate,"#%s",mysql_sqlstate(mysql)); + myprot->generate_pkt_ERR(true,NULL,NULL,sid,mysql_errno(mysql),sqlstate,mysql_error(mysql)); + } +// if (error) { +// // there was an error +// myprot->generate_pkt_ERR(true,NULL,NULL,sid,1045,(char *)"#28000",error); +// } else { +// // no error, DML succeeded +// myprot->generate_pkt_OK(true,NULL,NULL,sid,affected_rows,0,0,0,NULL); +// } +// myds->DSS=STATE_SLEEP; + } +} + void MySQL_Session::SQLite3_to_MySQL(SQLite3_result *result, char *error, int affected_rows, MySQL_Protocol *myprot) { assert(myprot); MySQL_Data_Stream *myds=myprot->get_myds(); @@ -1472,7 +1591,7 @@ void MySQL_Session::SQLite3_to_MySQL(SQLite3_result *result, char *error, int af myprot->generate_pkt_EOF(true,NULL,NULL,sid,0,0); sid++; char **p=(char **)malloc(sizeof(char*)*result->columns); - int *l=(int *)malloc(sizeof(int*)*result->columns); + unsigned long *l=(unsigned long *)malloc(sizeof(unsigned long *)*result->columns); //p[0]="column test"; for (int r=0; rrows_count; r++) { for (int i=0; icolumns; i++) { diff --git a/lib/MySQL_Session.cpp_old b/lib/MySQL_Session.cpp_old new file mode 100644 index 000000000..62419899d --- /dev/null +++ b/lib/MySQL_Session.cpp_old @@ -0,0 +1,1677 @@ +#include "proxysql.h" +#include "cpp.h" + +//#define EXPMARIA + +extern Query_Processor *GloQPro; +extern Query_Cache *GloQC; +extern ProxySQL_Admin *GloAdmin; +extern MySQL_Threads_Handler *GloMTH; + +Query_Info::Query_Info() { + MyComQueryCmd=MYSQL_COM_QUERY___NONE; + QueryPointer=NULL; + QueryLength=0; + QueryParserArgs=NULL; +} + +Query_Info::~Query_Info() { + if (QueryParserArgs) { + GloQPro->query_parser_free(QueryParserArgs); + } + if (QueryPointer) { + l_free(QueryLength+1,QueryPointer); + } +} + +void Query_Info::init(unsigned char *_p, int len, bool mysql_header) { + QueryLength=(mysql_header ? len-5 : len); + QueryPointer=(unsigned char *)l_alloc(QueryLength+1); + memcpy(QueryPointer,(mysql_header ? _p+5 : _p),QueryLength); + QueryPointer[QueryLength]=0; + //QueryPointer=(mysql_header ? _p+5 : _p); + QueryParserArgs=NULL; + MyComQueryCmd=MYSQL_COM_QUERY_UNKNOWN; +} + +void Query_Info::query_parser_init() { + QueryParserArgs=GloQPro->query_parser_init((char *)QueryPointer,QueryLength,0); +} + +enum MYSQL_COM_QUERY_command Query_Info::query_parser_command_type() { + MyComQueryCmd=GloQPro->query_parser_command_type(QueryParserArgs); + return MyComQueryCmd; +} + +void Query_Info::query_parser_free() { + GloQPro->query_parser_free(QueryParserArgs); + QueryParserArgs=NULL; +} + +unsigned long long Query_Info::query_parser_update_counters() { + if (MyComQueryCmd==MYSQL_COM_QUERY___NONE) return 0; + unsigned long long ret=GloQPro->query_parser_update_counters(MyComQueryCmd, end_time-start_time); + MyComQueryCmd=MYSQL_COM_QUERY___NONE; + l_free(QueryLength+1,QueryPointer); + QueryPointer=NULL; + QueryLength=0; + return ret; +} + + +void * MySQL_Session::operator new(size_t size) { + return l_alloc(size); +} + +void MySQL_Session::operator delete(void *ptr) { + l_free(sizeof(MySQL_Session),ptr); +} + + +MySQL_Session::MySQL_Session() { + pause=0; + pause_until=0; + status=NONE; + qpo=NULL; + command_counters=new StatCounters(15,10,false); + healthy=1; + admin=false; + connections_handler=false; + stats=false; + default_schema=NULL; + schema_locked=false; + session_fast_forward=false; + admin_func=NULL; + //client_fd=0; + //server_fd=0; + client_myds=NULL; + //server_myds=NULL; + to_process=0; + mybe=NULL; + mybes= new (true) PtrArray(4,true); + + current_hostgroup=-1; + default_hostgroup=-1; + transaction_persistent=false; + active_transactions=0; +} + +MySQL_Session::~MySQL_Session() { + if (client_myds) { + delete client_myds; + } + //if (server_myds) { + // delete server_myds; + //} + reset_all_backends(); + delete mybes; + if (default_schema) { + int s=strlen(default_schema); + l_free(s+1,default_schema); + } + proxy_debug(PROXY_DEBUG_NET,1,"Thread=%p, Session=%p -- Shutdown Session %p\n" , this->thread, this, this); + delete command_counters; +} + + +// scan the pointer array of mysql backends (mybes) looking for a backend for the specified hostgroup_id +MySQL_Backend * MySQL_Session::find_backend(int hostgroup_id) { + MySQL_Backend *_mybe; + unsigned int i; + for (i=0; i < mybes->len; i++) { + _mybe=(MySQL_Backend *)mybes->index(i); + if (_mybe->hostgroup_id==hostgroup_id) { + return _mybe; + } + } + return NULL; // NULL = backend not found +}; + + +MySQL_Backend * MySQL_Session::create_backend(int hostgroup_id, MySQL_Data_Stream *_myds) { + MySQL_Backend *_mybe=new MySQL_Backend(); + proxy_debug(PROXY_DEBUG_NET,4,"HID=%d, _myds=%p, _mybe=%p\n" , hostgroup_id, _myds, _mybe); + _mybe->hostgroup_id=hostgroup_id; + if (_myds) { + _mybe->server_myds=_myds; + } else { + _mybe->server_myds = new MySQL_Data_Stream(); + //_mybe->server_myds->myconn = new MySQL_Connection(); + _mybe->server_myds->DSS=STATE_NOT_INITIALIZED; + _mybe->server_myds->init(MYDS_BACKEND_NOT_CONNECTED, this, 0); + } + mybes->add(_mybe); + return _mybe; +}; + +MySQL_Backend * MySQL_Session::find_or_create_backend(int hostgroup_id, MySQL_Data_Stream *_myds) { + MySQL_Backend *_mybe=find_backend(hostgroup_id); + proxy_debug(PROXY_DEBUG_NET,4,"HID=%d, _myds=%p, _mybe=%p\n" , hostgroup_id, _myds, _mybe); + return ( _mybe ? _mybe : create_backend(hostgroup_id, _myds) ); +}; + +void MySQL_Session::reset_all_backends() { + MySQL_Backend *mybe; + while(mybes->len) { + mybe=(MySQL_Backend *)mybes->remove_index_fast(0); + mybe->reset(); + delete mybe; + } +}; + +void MySQL_Session::writeout() { + if (client_myds) client_myds->array2buffer_full(); + if (mybe && mybe->server_myds && mybe->server_myds->myds_type==MYDS_BACKEND) { + if (admin==false) { + if (mybe->server_myds->net_failure==false) { + //if (mybe->server_myds->poll_fds_idx>-1 && (mybe->server_myds->mypolls->fds[mybe->server_myds->poll_fds_idx].revents & POLLOUT)) { + if (mybe->server_myds->poll_fds_idx>-1) { // NOTE: attempt to force writes + mybe->server_myds->array2buffer_full(); + } + } else { + mybe->server_myds->move_from_OUT_to_OUTpending(); + } + } else { + mybe->server_myds->array2buffer_full(); + } + } + // FIXME: experimental + //if (client_myds) client_myds->set_pollout(); + //if (server_myds) server_myds->set_pollout(); + if (client_myds) client_myds->write_to_net_poll(); + //if (server_myds && server_myds->net_failure==false) server_myds->write_to_net_poll(); + if (mybe) { + if (mybe->server_myds) mybe->server_myds->write_to_net_poll(); + } + proxy_debug(PROXY_DEBUG_NET,1,"Thread=%p, Session=%p -- Writeout Session %p\n" , this->thread, this, this); +} + + +int MySQL_Session::handler() { + bool wrong_pass=false; + if (to_process==0) return 0; // this should be redundant if the called does the same check + proxy_debug(PROXY_DEBUG_NET,1,"Thread=%p, Session=%p -- Processing session %p\n" , this->thread, this, this); + PtrSize_t pkt; + unsigned int j; + unsigned char c; + + if (session_fast_forward==false) { + if (client_myds==NULL) { + // if we are here, probably we are trying to ping backends + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Processing session %p without client_myds\n", this); + assert(mybe); + assert(mybe->server_myds); +// if (mybe->server_myds->DSS==STATE_PING_SENT_NET) { +// assert(mybe->server_myds->myconn); +// proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Processing session %p without client_myds . server_myds=%p , myconn=%p , fd=%d , timeout=%llu , curtime=%llu\n", this, mybe->server_myds , mybe->server_myds->myconn, mybe->server_myds->myconn->fd , mybe->server_myds->timeout , thread->curtime); +// if (mybe->server_myds->timeout < thread->curtime) { +// MyHGM->destroy_MyConn_from_pool(mybe->server_myds->myconn); +// mybe->server_myds->myconn=NULL; +// mybe->server_myds->fd=-1; +// thread->mypolls.remove_index_fast(mybe->server_myds->poll_fds_idx); +// return -1; +// } +// } + goto __exit_DSS__STATE_NOT_INITIALIZED; + } + } + for (j=0; jPSarrayIN->len;) { + client_myds->PSarrayIN->remove_index(0,&pkt); + //prot.parse_mysql_pkt(&pkt,client_myds); + + switch (status) { +/* + case CHANGING_USER_CLIENT: + switch (client_myds->DSS) { + case STATE_CLIENT_HANDSHAKE: + handler___status_CHANGING_USER_CLIENT___STATE_CLIENT_HANDSHAKE(&pkt, &wrong_pass); + break; + default: + assert(0); + } + break; +*/ + case CONNECTING_CLIENT: + switch (client_myds->DSS) { + case STATE_SERVER_HANDSHAKE: + handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(&pkt, &wrong_pass); + break; + case STATE_SSL_INIT: + handler___status_CONNECTING_CLIENT___STATE_SSL_INIT(&pkt); + break; + default: + assert(0); // FIXME: this should become close connection + } + break; + + case WAITING_CLIENT_DATA: + switch (client_myds->DSS) { + case STATE_SLEEP: + command_counters->incr(thread->curtime/1000000); + current_hostgroup=default_hostgroup; + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Statuses: WAITING_CLIENT_DATA - STATE_SLEEP\n"); + //unsigned char c; + c=*((unsigned char *)pkt.ptr+sizeof(mysql_hdr)); + switch ((enum_mysql_command)c) { + case _MYSQL_COM_QUERY: +#ifdef DEBUG + if (mysql_thread___session_debug) { + if ((pkt.size>9) && strncasecmp("dbg ",(const char *)pkt.ptr+sizeof(mysql_hdr)+1,4)==0) { + if (mysql_thread___commands_stats==true) { + CurrentQuery.init((unsigned char *)pkt.ptr,pkt.size,true); + CurrentQuery.start_time=thread->curtime; + CurrentQuery.query_parser_init(); + CurrentQuery.query_parser_command_type(); + CurrentQuery.query_parser_free(); + } + handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_debug(&pkt); + if (mysql_thread___commands_stats==true) { + CurrentQuery.end_time=thread->curtime; + CurrentQuery.query_parser_update_counters(); + } + break; + } + } +#endif /* DEBUG */ + if (admin==false) { + if (session_fast_forward==false) { + if (mysql_thread___commands_stats==true) { + CurrentQuery.init((unsigned char *)pkt.ptr,pkt.size,true); + CurrentQuery.start_time=thread->curtime; + CurrentQuery.query_parser_init(); + CurrentQuery.query_parser_command_type(); + CurrentQuery.query_parser_free(); + //client_myds->myprot.process_pkt_COM_QUERY((unsigned char *)pkt.ptr,pkt.size); + } + } + qpo=GloQPro->process_mysql_query(this,pkt.ptr,pkt.size,false); + if (qpo) { + bool rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt); + if (rc_break==true) { break; } + } + mybe=find_or_create_backend(current_hostgroup); + //if (server_myds!=mybe->server_myds) { + // server_myds=mybe->server_myds; + //} + + +#ifdef EXPMARIA + mybe->server_myds->mysql_real_query.size=pkt.size-5; + mybe->server_myds->mysql_real_query.ptr=(char *)malloc(pkt.size-5); + memcpy(mybe->server_myds->mysql_real_query.ptr,(char *)pkt.ptr+5,pkt.size-5); + l_free(pkt.size,pkt.ptr); +#else + mybe->server_myds->PSarrayOUT->add(pkt.ptr, pkt.size); +#endif /* EXPMARIA */ + + client_myds->setDSS_STATE_QUERY_SENT_NET(); + } else { + // this is processed by the admin module + admin_func(this, GloAdmin, &pkt); + l_free(pkt.size,pkt.ptr); + } + break; + case _MYSQL_COM_CHANGE_USER: + handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_CHANGE_USER(&pkt, &wrong_pass); + break; + case _MYSQL_COM_STMT_PREPARE: + handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_PREPARE(&pkt); + break; + case _MYSQL_COM_STMT_EXECUTE: + handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_EXECUTE(&pkt); + break; + case _MYSQL_COM_STMT_CLOSE: + mybe->server_myds->PSarrayOUT->add(pkt.ptr, pkt.size); + break; + case _MYSQL_COM_QUIT: + proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got COM_QUIT packet\n"); + l_free(pkt.size,pkt.ptr); + return -1; + break; + case _MYSQL_COM_PING: + handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_PING(&pkt); + break; + case _MYSQL_COM_SET_OPTION: + handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_SET_OPTION(&pkt); + break; + case _MYSQL_COM_STATISTICS: + handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STATISTICS(&pkt); + break; + case _MYSQL_COM_INIT_DB: + handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_INIT_DB(&pkt); + break; + case _MYSQL_COM_FIELD_LIST: + handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_FIELD_LIST(&pkt); + break; + default: + assert(0); + break; + } + break; + default: + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Statuses: WAITING_CLIENT_DATA - STATE_UNKNOWN\n"); + assert(0); // FIXME: this should become close connection + } + + break; + case FAST_FORWARD: + mybe->server_myds->PSarrayOUT->add(pkt.ptr, pkt.size); + break; + case NONE: + default: + assert(0); + break; + } + } + + + +__get_a_backend: + + if (client_myds==NULL) { + goto __exit_DSS__STATE_NOT_INITIALIZED; + } + + //if ((client_myds->DSS==STATE_QUERY_SENT_NET && session_fast_forward==false) || session_fast_forward==true) { + if (status!=FAST_FORWARD && client_myds->DSS==STATE_QUERY_SENT_NET) { + // the client has completely sent the query, now we should handle it server side + // + if (mybe && mybe->server_myds->DSS==STATE_NOT_INITIALIZED) { + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, client_myds->DSS==STATE_QUERY_SENT_NET , server_myds==STATE_NOT_INITIALIZED\n", this); + // DSS is STATE_NOT_INITIALIZED. It means we are not connected to any server + // try to connect + pending_connect=1; + unsigned long long curtime=monotonic_time(); + + // if DSS==STATE_NOT_INITIALIZED , we expect few pointers to be NULL . If it is not null, we have a bug + //assert(server_myds->myconn==NULL); + assert(mybe->server_myds->myconn==NULL); + + handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED__get_connection(); + if (mybe->server_myds->myconn==NULL) { + pause_until=thread->curtime+100*1000; + goto __exit_DSS__STATE_NOT_INITIALIZED; + } + //mybe->server_myds->myprot.init(&mybe->server_myds, mybe->myconn->userinfo, this); + mybe->server_myds->myprot.init(&mybe->server_myds, mybe->server_myds->myconn->userinfo, this); + if (client_myds->myconn->has_prepared_statement==true) { + mybe->server_myds->myconn->has_prepared_statement=true; + mybe->server_myds->myconn->reusable=false; + } + // FIXME : handle missing connection from connection pool + // FIXME : perhaps is a goto __exit_DSS__STATE_NOT_INITIALIZED after setting time wait + + thread->mypolls.add(POLLIN|POLLOUT, mybe->server_myds->fd, mybe->server_myds, curtime); + + if (mybe->server_myds->DSS!=STATE_READY) { + mybe->server_myds->move_from_OUT_to_OUTpending(); + } + // END OF if (server_myds->DSS==STATE_NOT_INITIALIZED) + //} else { TRY #1 + } // TRY #1 + if (session_fast_forward==true && mybe && mybe->server_myds && mybe->server_myds->myconn) { + mybe->server_myds->myconn->reusable=false; + } + if (session_fast_forward==false) { + if (mybe && mybe->server_myds->myds_type==MYDS_BACKEND && mybe->server_myds->DSS==STATE_READY) { + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, client_myds->DSS==STATE_QUERY_SENT_NET , server_myds==STATE_READY , server_myds->myds_type==MYDS_BACKEND\n", this); + //if (strcmp(userinfo_client.schemaname,userinfo_server.schemaname)==0) { + if ( + (client_myds->myconn->userinfo->hash!=mybe->server_myds->myconn->userinfo->hash) +/* + (mybe->myconn->userinfo->schemaname==NULL) + || + strcmp(client_myds->myconn->userinfo->schemaname,mybe->myconn->userinfo->schemaname) + || + strcmp(client_myds->myconn->userinfo->username,mybe->myconn->userinfo->username) +*/ + ) { + if (strcmp(client_myds->myconn->userinfo->username,mybe->server_myds->myconn->userinfo->username)) { + // username don't match, we must change user + handler___client_DSS_QUERY_SENT___send_CHANGE_USER_to_backend(); + } else { + // we should chek that schema is different, but here we assume that if we reach here user is identical, but schema is not + handler___client_DSS_QUERY_SENT___send_INIT_DB_to_backend(); + } + } else { + if (client_myds->myconn->options.charset!=mybe->server_myds->myconn->options.charset /* FIXME: this was for debugging only */ /*|| rand()%3==0 */) { + handler___client_DSS_QUERY_SENT___send_SET_NAMES_to_backend(); + } else { + //server_myds->PSarrayOUT->add(pkt.ptr, pkt.size); +#ifdef EXPMARIA + mybe->server_myds->DSS=STATE_MARIADB_QUERY; + status=PROCESSING_QUERY; + mybe->server_myds->myconn->async_state_machine=ASYNC_QUERY_START; + mybe->server_myds->myconn->set_query(mybe->server_myds->mysql_real_query.ptr,mybe->server_myds->mysql_real_query.size); + mybe->server_myds->myconn->handler(0); +#else + mybe->server_myds->DSS=STATE_QUERY_SENT_DS; +// if (client_myds->myconn->processing_prepared_statement) { + mybe->server_myds->myconn->processing_prepared_statement_prepare=client_myds->myconn->processing_prepared_statement_prepare; + mybe->server_myds->myconn->processing_prepared_statement_execute=client_myds->myconn->processing_prepared_statement_execute; +// } + status=WAITING_SERVER_DATA; +#endif /* EXPMARIA */ + } + } + } + // } TRY #1 + } + } + +__exit_DSS__STATE_NOT_INITIALIZED: + + + if (mybe && mybe->server_myds) { + if (mybe->server_myds->DSS > STATE_MARIADB_BEGIN && mybe->server_myds->DSS < STATE_MARIADB_END) { + MySQL_Data_Stream *myds=mybe->server_myds; + MySQL_Connection *myconn=mybe->server_myds->myconn; + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, status=%d, server_myds->DSS==%d , revents==%d , async_state_machine=%d\n", this, status, mybe->server_myds->DSS, myds->revents, myconn->async_state_machine); +// int ms_status = 0; + switch (status) { + case CONNECTING_SERVER: +// if (myds->revents & POLLIN) ms_status |= MYSQL_WAIT_READ; +// if (myds->revents & POLLOUT) ms_status |= MYSQL_WAIT_WRITE; +// if (myds->revents & POLLPRI) ms_status |= MYSQL_WAIT_EXCEPT; +// if (ms_status) { + if (myds->revents) { + myconn->handler(myds->revents); +// myconn->async_exit_status = mysql_real_connect_cont(&myconn->ret_mysql, myconn->mysql, ms_status); +// if (myconn->async_exit_status==0) { + if (myconn->ret_mysql) { + if (myconn->async_state_machine==ASYNC_CONNECT_SUCCESSFUL) { + myds->myds_type=MYDS_BACKEND; + myds->DSS=STATE_READY; + //mybe->myconn=server_myds->myconn; + status=WAITING_SERVER_DATA; + unsigned int k; + PtrSize_t pkt2; + for (k=0; kPSarrayOUTpending->len;) { + myds->PSarrayOUTpending->remove_index(0,&pkt2); + myds->PSarrayOUT->add(pkt2.ptr, pkt2.size); + myds->DSS=STATE_QUERY_SENT_DS; + } + } else { + assert(0); + } + //assert(0); + } else { + //assert(0); + if (myconn->async_state_machine!=ASYNC_CONNECT_CONT) { + wrong_pass=true; + } + } +// } + } + break; + case PROCESSING_QUERY: + if (myds->revents) { + myconn->handler(myds->revents); + if (myconn->async_state_machine==ASYNC_QUERY_END) { + +// status=WAITING_SERVER_DATA; +// myds->DSS=STATE_READY; + /* multi-plexing attempt */ + if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) { + myds->myconn->last_time_used=thread->curtime; + MyHGM->push_MyConn_to_pool(myds->myconn); + //MyHGM->destroy_MyConn_from_pool(mybe->server_myds->myconn); + //mybe->server_myds->myconn=NULL; + myds->detach_connection(); + myds->unplug_backend(); + } + /* multi-plexing attempt */ + //status=NONE; + MySQL_Result_to_MySQL_wire(myconn->mysql,myconn->mysql_result,&client_myds->myprot); + + myds->DSS=STATE_READY; + status=WAITING_CLIENT_DATA; + client_myds->DSS=STATE_SLEEP; + } + } + break; + case PINGING_SERVER: + if (myds->revents) { + myconn->handler(myds->revents); + if (myconn->async_state_machine==ASYNC_PING_SUCCESSFUL) { + myds->DSS=STATE_READY; + /* multi-plexing attempt */ + if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) { + myds->myconn->last_time_used=thread->curtime; + MyHGM->push_MyConn_to_pool(myds->myconn); + //MyHGM->destroy_MyConn_from_pool(mybe->server_myds->myconn); + //mybe->server_myds->myconn=NULL; + myds->detach_connection(); + myds->unplug_backend(); + } + /* multi-plexing attempt */ + status=NONE; + } + } + break; + case CHANGING_SCHEMA: + if (myds->revents) { + myconn->handler(myds->revents); + if (myconn->async_state_machine==ASYNC_INITDB_SUCCESSFUL) { + myds->DSS=STATE_READY; + status=WAITING_SERVER_DATA; + unsigned int k; + PtrSize_t pkt2; + for (k=0; kserver_myds->PSarrayOUTpending->len;) { + myds->PSarrayOUTpending->remove_index(0,&pkt2); + myds->PSarrayOUT->add(pkt2.ptr, pkt2.size); + myds->DSS=STATE_QUERY_SENT_DS; + } + } + if (myconn->async_state_machine==ASYNC_INITDB_FAILED) { + set_unhealthy(); + myds->myconn->reusable=false; + return -1; + } + } + break; + case CHANGING_CHARSET: + if (myds->revents) { + myconn->handler(myds->revents); + if (myconn->async_state_machine==ASYNC_SET_NAMES_SUCCESSFUL) { + myds->DSS=STATE_READY; + status=WAITING_SERVER_DATA; + unsigned int k; + PtrSize_t pkt2; + for (k=0; kserver_myds->PSarrayOUTpending->len;) { + myds->PSarrayOUTpending->remove_index(0,&pkt2); + myds->PSarrayOUT->add(pkt2.ptr, pkt2.size); + myds->DSS=STATE_QUERY_SENT_DS; + } + } + if (myconn->async_state_machine==ASYNC_SET_NAMES_FAILED) { + set_unhealthy(); + myds->myconn->reusable=false; + return -1; + } + } + break; + default: + assert(0); + break; + } + } else { + + +// } + + for (j=0; jserver_myds->PSarrayIN->len;) { + mybe->server_myds->PSarrayIN->remove_index(0,&pkt); + + switch (status) { + case WAITING_SERVER_DATA: + switch (mybe->server_myds->DSS) { +// case STATE_PING_SENT_NET: +// handler___status_WAITING_SERVER_DATA___STATE_PING_SENT(&pkt); +// break; + + case STATE_QUERY_SENT_NET: + handler___status_WAITING_SERVER_DATA___STATE_QUERY_SENT(&pkt); + break; + + case STATE_ROW: + handler___status_WAITING_SERVER_DATA___STATE_ROW(&pkt); + break; + + case STATE_EOF1: + handler___status_WAITING_SERVER_DATA___STATE_EOF1(&pkt); + break; + + case STATE_READING_COM_STMT_PREPARE_RESPONSE: + handler___status_WAITING_SERVER_DATA___STATE_READING_COM_STMT_PREPARE_RESPONSE(&pkt); + break; + + default: + assert(0); + } + break; + +// case CHANGING_SCHEMA: +// if (handler___status_CHANGING_SCHEMA(&pkt)==false) { +// return -1; +// } +// break; + + case CHANGING_USER_SERVER: + if (handler___status_CHANGING_USER_SERVER(&pkt)==false) { + return -1; + } + break; + +// case CHANGING_CHARSET: +// if (handler___status_CHANGING_CHARSET(&pkt)==false) { +// return -1; +// } +// break; + + case FAST_FORWARD: + client_myds->PSarrayOUT->add(pkt.ptr, pkt.size); + break; + + default: + assert(0); + break; + } + + } + } + } + writeout(); + + // FIXME: see bug #211 + if ( + mybe + && + mybe->server_myds + && + mybe->server_myds->DSS==STATE_QUERY_SENT_DS + && + mybe->server_myds->PSarrayOUT->len==0 + && + mybe->server_myds->PSarrayOUTpending->len==0 + && + mybe->server_myds->net_failure==false + && + mybe->server_myds->available_data_out()==false + ) { + if (connections_handler) { + //fprintf(stderr,"time=%llu\n",monotonic_time()); + //mybe->server_myds->timeout=thread->curtime+100; + //mybe->server_myds->DSS=STATE_PING_SENT_NET; + } else { + mybe->server_myds->setDSS_STATE_QUERY_SENT_NET(); + } + } + if (mybe && mybe->server_myds) { + if (mybe->server_myds->net_failure) { + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess:%p , MYDS:%p , myds_type=%d, DSS=%d , myconn:%p\n" , this, mybe->server_myds , mybe->server_myds->myds_type , mybe->server_myds->DSS, mybe->server_myds->myconn); + if (( mybe->server_myds->DSS==STATE_READY || mybe->server_myds->DSS==STATE_QUERY_SENT_DS ) && mybe->server_myds->myds_type==MYDS_BACKEND) { + //mybe->server_myds->myconn=NULL; + mybe->server_myds->detach_connection(); + mybe->server_myds->DSS=STATE_NOT_INITIALIZED; + mybe->server_myds->move_from_OUT_to_OUTpending(); + if (mybe->server_myds->myconn) { + MyHGM->destroy_MyConn_from_pool(mybe->server_myds->myconn); + //mybe->server_myds->myconn=NULL; + mybe->server_myds->detach_connection(); + } + if (mybe->server_myds->fd) { + mybe->server_myds->shut_hard(); +// shutdown(mybe->server_myds->fd,SHUT_RDWR); +// close(mybe->server_myds->fd); + mybe->server_myds->fd=0; + thread->mypolls.remove_index_fast(mybe->server_myds->poll_fds_idx); + //server_fd=0; + } + mybe->server_myds->clean_net_failure(); + mybe->server_myds->active=1; + goto __get_a_backend; + } else { + set_unhealthy(); + } + } + } + + //writeout(); + +/* + if ( // FIXME: this implementation is horrible + (server_myds ? server_myds->PSarrayIN->len==0 : 1 ) && + (server_myds ? server_myds->PSarrayOUT->len==0 : 1 ) && + (client_myds ? client_myds->PSarrayIN->len==0 : 1 ) && + (client_myds ? client_myds->PSarrayOUT->len==0 : 1 ) + ) + { + to_process=0; + } +*/ + if (wrong_pass==true) { + client_myds->array2buffer_full(); + client_myds->write_to_net(); + return -1; + } + return 0; +} + + +//bool MySQL_Session::handler___status_CHANGING_SCHEMA(PtrSize_t *pkt) { +// proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Statuses: CHANGING_SCHEMA - UNKNWON\n"); +// if (mybe->server_myds->myprot.process_pkt_OK((unsigned char *)pkt->ptr,pkt->size)==true) { +// l_free(pkt->size,pkt->ptr); +// mybe->server_myds->DSS=STATE_READY; +// //mybe->myconn=server_myds->myconn; +// status=WAITING_SERVER_DATA; +// unsigned int k; +// PtrSize_t pkt2; +// for (k=0; kserver_myds->PSarrayOUTpending->len;) { +// mybe->server_myds->PSarrayOUTpending->remove_index(0,&pkt2); +// mybe->server_myds->PSarrayOUT->add(pkt2.ptr, pkt2.size); +// mybe->server_myds->DSS=STATE_QUERY_SENT_DS; +// } +// // set prepared statement processing +// mybe->server_myds->myconn->processing_prepared_statement_prepare=client_myds->myconn->processing_prepared_statement_prepare; +// return true; +// } else { +// l_free(pkt->size,pkt->ptr); +// set_unhealthy(); +// //mybe->myconn=server_myds->myconn; +// // if we reach here, server_myds->DSS should be STATE_QUERY_SENT , therefore the connection to the backend should be dropped anyway +// // although we enforce this here +// mybe->server_myds->myconn->reusable=false; +// return false; +// } +// return false; +//} + +bool MySQL_Session::handler___status_CHANGING_USER_SERVER(PtrSize_t *pkt) { + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Statuses: CHANGING_USER_SERVER - UNKNWON\n"); + if (mybe->server_myds->myprot.process_pkt_OK((unsigned char *)pkt->ptr,pkt->size)==true) { + l_free(pkt->size,pkt->ptr); + mybe->server_myds->DSS=STATE_READY; + //mybe->myconn=server_myds->myconn; + status=WAITING_SERVER_DATA; + unsigned int k; + PtrSize_t pkt2; + for (k=0; kserver_myds->PSarrayOUTpending->len;) { + mybe->server_myds->PSarrayOUTpending->remove_index(0,&pkt2); + mybe->server_myds->PSarrayOUT->add(pkt2.ptr, pkt2.size); + mybe->server_myds->DSS=STATE_QUERY_SENT_DS; + } + // set prepared statement processing + mybe->server_myds->myconn->processing_prepared_statement_prepare=client_myds->myconn->processing_prepared_statement_prepare; + return true; + } else { + l_free(pkt->size,pkt->ptr); + set_unhealthy(); + //mybe->myconn=server_myds->myconn; + // if we reach here, server_myds->DSS should be STATE_QUERY_SENT , therefore the connection to the backend should be dropped anyway + // although we enforce this here + mybe->server_myds->myconn->reusable=false; + return false; + } + return false; +} + +//bool MySQL_Session::handler___status_CHANGING_CHARSET(PtrSize_t *pkt) { +// proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Statuses: CHANGING_CHARSET - UNKNWON\n"); +// if (mybe->server_myds->myprot.process_pkt_OK((unsigned char *)pkt->ptr,pkt->size)==true) { +// l_free(pkt->size,pkt->ptr); +// mybe->server_myds->DSS=STATE_READY; +// //mybe->myconn=server_myds->myconn; +// status=WAITING_SERVER_DATA; +// unsigned int k; +// PtrSize_t pkt2; +// for (k=0; kserver_myds->PSarrayOUTpending->len;) { +// mybe->server_myds->PSarrayOUTpending->remove_index(0,&pkt2); +// mybe->server_myds->PSarrayOUT->add(pkt2.ptr, pkt2.size); +// mybe->server_myds->DSS=STATE_QUERY_SENT_DS; +// } +// // set prepared statement processing +// mybe->server_myds->myconn->processing_prepared_statement_prepare=client_myds->myconn->processing_prepared_statement_prepare; +// return true; +// } else { +// l_free(pkt->size,pkt->ptr); +// set_unhealthy(); +// //mybe->myconn=server_myds->myconn; +// // if we reach here, server_myds->DSS should be STATE_QUERY_SENT , therefore the connection to the backend should be dropped anyway +// // although we enforce this here +// mybe->server_myds->myconn->reusable=false; +// return false; +// } +// return false; +//} + + +//void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_PING_SENT(PtrSize_t *pkt) { +// proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Statuses: WAITING_SERVER_DATA - STATE_PING_SENT\n"); +// unsigned char c; +// c=*((unsigned char *)pkt->ptr+sizeof(mysql_hdr)); +// if (c==0 || c==0xff) { +// mybe->server_myds->DSS=STATE_READY; +// /* multi-plexing attempt */ +// if (c==0) { +// mybe->server_myds->myprot.process_pkt_OK((unsigned char *)pkt->ptr,pkt->size); +// if ((mybe->server_myds->myconn->reusable==true) && ((mybe->server_myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) { +// mybe->server_myds->myconn->last_time_used=thread->curtime; +// MyHGM->push_MyConn_to_pool(mybe->server_myds->myconn); +// //MyHGM->destroy_MyConn_from_pool(mybe->server_myds->myconn); +// //mybe->server_myds->myconn=NULL; +// mybe->server_myds->detach_connection(); +// mybe->server_myds->unplug_backend(); +// } +// } +// /* multi-plexing attempt */ +// status=NONE; +// } +// l_free(pkt->size,pkt->ptr); +//} + +void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_QUERY_SENT(PtrSize_t *pkt) { + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Statuses: WAITING_SERVER_DATA - STATE_QUERY_SENT\n"); + unsigned char c; + c=*((unsigned char *)pkt->ptr+sizeof(mysql_hdr)); + if (mybe->server_myds->myconn->processing_prepared_statement_prepare==false && mybe->server_myds->myconn->processing_prepared_statement_execute==false) { + if (c==0 || c==0xff) { + mybe->server_myds->DSS=STATE_READY; + /* multi-plexing attempt */ + if (c==0) { + mybe->server_myds->myprot.process_pkt_OK((unsigned char *)pkt->ptr,pkt->size); + if ((mybe->server_myds->myconn->reusable==true) && ((mybe->server_myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) { + mybe->server_myds->myconn->last_time_used=thread->curtime; + MyHGM->push_MyConn_to_pool(mybe->server_myds->myconn); + //MyHGM->destroy_MyConn_from_pool(mybe->server_myds->myconn); + mybe->server_myds->myconn=NULL; + mybe->server_myds->unplug_backend(); + } + } + /* multi-plexing attempt */ + status=WAITING_CLIENT_DATA; + client_myds->DSS=STATE_SLEEP; + client_myds->PSarrayOUT->add(pkt->ptr, pkt->size); + if (mysql_thread___commands_stats==true) { + CurrentQuery.end_time=thread->curtime; + CurrentQuery.query_parser_update_counters(); + } + } else { + // this should be a result set + if (qpo && qpo->cache_ttl>0) { + mybe->server_myds->resultset->add(pkt->ptr, pkt->size); + mybe->server_myds->resultset_length+=pkt->size; + } else { + client_myds->PSarrayOUT->add(pkt->ptr, pkt->size); + } + mybe->server_myds->DSS=STATE_ROW; // FIXME: this is catch all for now + } + } else { + // mybe->server_myds->myconn->processing_prepared_statement_prepare==true + if (mybe->server_myds->myconn->processing_prepared_statement_prepare==true) { + switch (c) { + case 0xff: + // ERR packet , send it to client + mybe->server_myds->DSS=STATE_READY; + mybe->server_myds->myconn->processing_prepared_statement_prepare=false; + client_myds->myconn->processing_prepared_statement_prepare=false; + status=WAITING_CLIENT_DATA; + client_myds->DSS=STATE_SLEEP; + client_myds->PSarrayOUT->add(pkt->ptr, pkt->size); + break; + case 0x00: + if (mybe->server_myds->myprot.current_PreStmt) delete mybe->server_myds->myprot.current_PreStmt; + mybe->server_myds->myprot.current_PreStmt=new MySQL_Prepared_Stmt_info((unsigned char *)pkt->ptr, pkt->size); + if (mybe->server_myds->myprot.current_PreStmt->num_columns+mybe->server_myds->myprot.current_PreStmt->num_params) { + mybe->server_myds->DSS=STATE_READING_COM_STMT_PREPARE_RESPONSE; + } else { + mybe->server_myds->DSS=STATE_READY; + mybe->server_myds->myconn->processing_prepared_statement_prepare=false; + client_myds->myconn->processing_prepared_statement_prepare=false; + status=WAITING_CLIENT_DATA; + client_myds->DSS=STATE_SLEEP; + } + client_myds->PSarrayOUT->add(pkt->ptr, pkt->size); + break; + default: + assert(0); + break; + } + } else { + // mybe->server_myds->myconn->processing_prepared_statement_execute==true + switch (c) { + case 0x00: + // OK packet , send it to client + case 0xff: + // ERR packet , send it to client + mybe->server_myds->DSS=STATE_READY; + mybe->server_myds->myconn->processing_prepared_statement_execute=false; + client_myds->myconn->processing_prepared_statement_execute=false; + status=WAITING_CLIENT_DATA; + client_myds->DSS=STATE_SLEEP; + break; + default: + mybe->server_myds->DSS=STATE_ROW; // FIXME: this is catch all for now + //assert(0); + break; + } + // always send to client + client_myds->PSarrayOUT->add(pkt->ptr, pkt->size); + } + } +} + +void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_READING_COM_STMT_PREPARE_RESPONSE(PtrSize_t *pkt) { + unsigned char c; + c=*((unsigned char *)pkt->ptr+sizeof(mysql_hdr)); + + //fprintf(stderr,"%d %d\n", mybe->server_myds->myprot.current_PreStmt->pending_num_params, mybe->server_myds->myprot.current_PreStmt->pending_num_columns); + if (c==0xfe && pkt->size < 13) { + if (mybe->server_myds->myprot.current_PreStmt->pending_num_params+mybe->server_myds->myprot.current_PreStmt->pending_num_columns) { + mybe->server_myds->DSS=STATE_EOF1; + } else { + mybe->server_myds->myconn->processing_prepared_statement_prepare=false; + client_myds->myconn->processing_prepared_statement_prepare=false; + mybe->server_myds->DSS=STATE_READY; + status=WAITING_CLIENT_DATA; + client_myds->DSS=STATE_SLEEP; + } + } else { + if (mybe->server_myds->myprot.current_PreStmt->pending_num_params) { + --mybe->server_myds->myprot.current_PreStmt->pending_num_params; + } else { + if (mybe->server_myds->myprot.current_PreStmt->pending_num_columns) { + --mybe->server_myds->myprot.current_PreStmt->pending_num_columns; + } + } + } + client_myds->PSarrayOUT->add(pkt->ptr, pkt->size); +} + +void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_ROW(PtrSize_t *pkt) { + unsigned char c; + c=*((unsigned char *)pkt->ptr+sizeof(mysql_hdr)); + if (c==0xfe && pkt->size < 13) { + mybe->server_myds->DSS=STATE_EOF1; + } + if (qpo && qpo->cache_ttl>0) { + mybe->server_myds->resultset->add(pkt->ptr, pkt->size); + mybe->server_myds->resultset_length+=pkt->size; + } else { + client_myds->PSarrayOUT->add(pkt->ptr, pkt->size); + } +} + + +void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_EOF1(PtrSize_t *pkt) { + unsigned char c; + c=*((unsigned char *)pkt->ptr+sizeof(mysql_hdr)); + if (mybe->server_myds->myconn->processing_prepared_statement_prepare==false && mybe->server_myds->myconn->processing_prepared_statement_execute==false) +{ + if (qpo && qpo->cache_ttl>0) { + mybe->server_myds->resultset->add(pkt->ptr, pkt->size); + mybe->server_myds->resultset_length+=pkt->size; + } else { + client_myds->PSarrayOUT->add(pkt->ptr, pkt->size); + } + if ((c==0xfe && pkt->size < 13) || c==0xff) { + mybe->server_myds->DSS=STATE_READY; + status=WAITING_CLIENT_DATA; + client_myds->DSS=STATE_SLEEP; + + + /* multi-plexing attempt */ + if (c==0xfe) { + mybe->server_myds->myprot.process_pkt_EOF((unsigned char *)pkt->ptr,pkt->size); + //fprintf(stderr,"hid=%d status=%d\n", mybe->hostgroup_id, server_myds->myprot.prot_status); + if ((mybe->server_myds->myconn->reusable==true) && ((mybe->server_myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) { + mybe->server_myds->myconn->last_time_used=thread->curtime; + MyHGM->push_MyConn_to_pool(mybe->server_myds->myconn); + //MyHGM->destroy_MyConn_from_pool(mybe->server_myds->myconn);; + mybe->server_myds->myconn=NULL; + mybe->server_myds->unplug_backend(); + } + } + /* multi-plexing attempt */ + + if (qpo) { + if (qpo->cache_ttl>0) { // Fixed bug #145 + client_myds->PSarrayOUT->copy_add(mybe->server_myds->resultset,0,mybe->server_myds->resultset->len); + unsigned char *aa=mybe->server_myds->resultset2buffer(false); + while (mybe->server_myds->resultset->len) mybe->server_myds->resultset->remove_index(mybe->server_myds->resultset->len-1,NULL); + GloQC->set((unsigned char *)client_myds->query_SQL,strlen((char *)client_myds->query_SQL)+1,aa,mybe->server_myds->resultset_length,30); + l_free(mybe->server_myds->resultset_length,aa); + mybe->server_myds->resultset_length=0; + l_free(strlen((char *)client_myds->query_SQL)+1,client_myds->query_SQL); + } + GloQPro->delete_QP_out(qpo); + qpo=NULL; + } + if (mysql_thread___commands_stats==true) { + CurrentQuery.end_time=thread->curtime; + CurrentQuery.query_parser_update_counters(); + } + } +} else { + if (mybe->server_myds->myconn->processing_prepared_statement_prepare==true) { +// fprintf(stderr,"EOF: %d %d\n", mybe->server_myds->myprot.current_PreStmt->pending_num_params, mybe->server_myds->myprot.current_PreStmt->pending_num_columns); + if (mybe->server_myds->myprot.current_PreStmt->pending_num_params+mybe->server_myds->myprot.current_PreStmt->pending_num_columns) { + if (mybe->server_myds->myprot.current_PreStmt->pending_num_params) { + --mybe->server_myds->myprot.current_PreStmt->pending_num_params; + } else { + if (mybe->server_myds->myprot.current_PreStmt->pending_num_columns) { + --mybe->server_myds->myprot.current_PreStmt->pending_num_columns; + } + } + if (mybe->server_myds->myprot.current_PreStmt->pending_num_params+mybe->server_myds->myprot.current_PreStmt->pending_num_columns) { + mybe->server_myds->DSS=STATE_READING_COM_STMT_PREPARE_RESPONSE; + } + } else { + mybe->server_myds->myconn->processing_prepared_statement_prepare=false; + client_myds->myconn->processing_prepared_statement_prepare=false; + mybe->server_myds->DSS=STATE_READY; + status=WAITING_CLIENT_DATA; + client_myds->DSS=STATE_SLEEP; + } + client_myds->PSarrayOUT->add(pkt->ptr, pkt->size); + } else { + //mybe->server_myds->myconn->processing_prepared_statement_execute==true + if ((c==0xfe && pkt->size < 13) || c==0xff) { + mybe->server_myds->myconn->processing_prepared_statement_execute=false; + client_myds->myconn->processing_prepared_statement_execute=false; + mybe->server_myds->DSS=STATE_READY; + status=WAITING_CLIENT_DATA; + client_myds->DSS=STATE_SLEEP; + } + client_myds->PSarrayOUT->add(pkt->ptr, pkt->size); + } +} +} + + +void MySQL_Session::handler___status_CHANGING_USER_CLIENT___STATE_CLIENT_HANDSHAKE(PtrSize_t *pkt, bool *wrong_pass) { + // FIXME: no support for SSL yet + if ( + client_myds->myprot.process_pkt_auth_swich_response((unsigned char *)pkt->ptr,pkt->size)==true + ) { + l_free(pkt->size,pkt->ptr); + client_myds->myprot.generate_pkt_OK(true,NULL,NULL,2,0,0,0,0,NULL); + status=WAITING_CLIENT_DATA; + client_myds->DSS=STATE_SLEEP; + } else { + l_free(pkt->size,pkt->ptr); + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Wrong credentials for frontend: disconnecting\n"); + *wrong_pass=true; + // FIXME: this should become close connection + client_myds->setDSS_STATE_QUERY_SENT_NET(); + char *_s=(char *)malloc(strlen(client_myds->myconn->userinfo->username)+100); + sprintf(_s,"Access denied for user '%s' (using password: %s)", client_myds->myconn->userinfo->username, (client_myds->myconn->userinfo->password ? "YES" : "NO")); + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,2,1045,(char *)"#28000", _s); + free(_s); + } +} + +void MySQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(PtrSize_t *pkt, bool *wrong_pass) { + if ( + (client_myds->myprot.process_pkt_handshake_response((unsigned char *)pkt->ptr,pkt->size)==true) + && + ( (default_hostgroup<0 && admin==true) || (default_hostgroup>=0 && admin==false) ) + ) { + if (default_hostgroup<0 && admin==true) { + if (default_hostgroup==STATS_HOSTGROUP) { + stats=true; + } + } + l_free(pkt->size,pkt->ptr); + if (client_myds->encrypted==false) { + if (client_myds->myconn->userinfo->schemaname==NULL) { + client_myds->myconn->userinfo->set_schemaname(mysql_thread___default_schema,strlen(mysql_thread___default_schema)); + } + client_myds->myprot.generate_pkt_OK(true,NULL,NULL,2,0,0,0,0,NULL); + //server_myds->myconn->userinfo->set(client_myds->myconn->userinfo); + status=WAITING_CLIENT_DATA; + client_myds->DSS=STATE_CLIENT_AUTH_OK; + //MySQL_Connection *myconn=client_myds->myconn; +/* + // enable compression + if (myconn->options.server_capabilities & CLIENT_COMPRESS) { + if (myconn->options.compression_min_length) { + myconn->set_status_compression(true); + } + } else { + //explicitly disable compression + myconn->options.compression_min_length=0; + myconn->set_status_compression(false); + } +*/ + } else { + // use SSL + client_myds->DSS=STATE_SSL_INIT; + client_myds->ssl=SSL_new(GloVars.global.ssl_ctx); + SSL_set_fd(client_myds->ssl, client_myds->fd); + ioctl_FIONBIO(client_myds->fd,0); + if (SSL_accept(client_myds->ssl)==-1) { + ERR_print_errors_fp(stderr); + } + ioctl_FIONBIO(client_myds->fd,1); + } + } else { + l_free(pkt->size,pkt->ptr); + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Wrong credentials for frontend: disconnecting\n"); + *wrong_pass=true; + // FIXME: this should become close connection + client_myds->setDSS_STATE_QUERY_SENT_NET(); + char *_s=(char *)malloc(strlen(client_myds->myconn->userinfo->username)+100); + sprintf(_s,"Access denied for user '%s' (using password: %s)", client_myds->myconn->userinfo->username, (client_myds->myconn->userinfo->password ? "YES" : "NO")); + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,2,1045,(char *)"#28000", _s); + free(_s); + client_myds->DSS=STATE_SLEEP; + //return -1; + } +} + +void MySQL_Session::handler___status_CONNECTING_CLIENT___STATE_SSL_INIT(PtrSize_t *pkt) { + if (client_myds->myprot.process_pkt_handshake_response((unsigned char *)pkt->ptr,pkt->size)==true) { + l_free(pkt->size,pkt->ptr); + client_myds->myprot.generate_pkt_OK(true,NULL,NULL,3,0,0,0,0,NULL); + mybe->server_myds->myconn->userinfo->set(client_myds->myconn->userinfo); + status=WAITING_CLIENT_DATA; + client_myds->DSS=STATE_SLEEP; + } else { + l_free(pkt->size,pkt->ptr); + // FIXME: this should become close connection + perror("Hitting a not implemented feature: https://github.com/sysown/proxysql-0.2/issues/124"); + assert(0); + } +} + + + + + +void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_SET_OPTION(PtrSize_t *pkt) { + char v; + v=*((char *)pkt->ptr+3); + proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got COM_SET_OPTION packet , value %d\n", v); + // FIXME: ProxySQL doesn't support yet CLIENT_MULTI_STATEMENTS + client_myds->setDSS_STATE_QUERY_SENT_NET(); + if (v==1) { + client_myds->myprot.generate_pkt_EOF(true,NULL,NULL,1,0,0); + } else { + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)""); + } + client_myds->DSS=STATE_SLEEP; + l_free(pkt->size,pkt->ptr); +} + +void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_PING(PtrSize_t *pkt) { + proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got COM_PING packet\n"); + l_free(pkt->size,pkt->ptr); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,2,0,NULL); + client_myds->DSS=STATE_SLEEP; +} + +void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_FIELD_LIST(PtrSize_t *pkt) { + if (admin==false) { + /* FIXME: temporary */ + l_free(pkt->size,pkt->ptr); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Command not supported"); + client_myds->DSS=STATE_SLEEP; + } else { + l_free(pkt->size,pkt->ptr); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Command not supported"); + client_myds->DSS=STATE_SLEEP; + } +} + +void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_PREPARE(PtrSize_t *pkt) { + if (admin==false) { + client_myds->myconn->has_prepared_statement=true; + client_myds->myconn->processing_prepared_statement_prepare=true; + mybe=find_or_create_backend(default_hostgroup); + mybe->server_myds->PSarrayOUT->add(pkt->ptr, pkt->size); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + } else { + l_free(pkt->size,pkt->ptr); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Command not supported"); + client_myds->DSS=STATE_SLEEP; + } +} + +void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_EXECUTE(PtrSize_t *pkt) { + if (admin==false) { + //client_myds->myconn->has_prepared_statement_execute=true; + client_myds->myconn->processing_prepared_statement_execute=true; + mybe=find_or_create_backend(default_hostgroup); + mybe->server_myds->PSarrayOUT->add(pkt->ptr, pkt->size); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + } else { + l_free(pkt->size,pkt->ptr); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Command not supported"); + client_myds->DSS=STATE_SLEEP; + } +} + +#ifdef DEBUG +void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_debug(PtrSize_t *pkt) { + proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got ProxySQL dbg packet\n"); + //SQLite3_result * result = SQL3_Session_status(); + SQLite3_result * result=NULL; + char *query=NULL; + unsigned int query_length=pkt->size-sizeof(mysql_hdr); + query=(char *)l_alloc(query_length); + memcpy(query,(char *)pkt->ptr+sizeof(mysql_hdr)+1,query_length-1); + query[query_length-1]=0; + + char *query_no_space=(char *)l_alloc(query_length); + memcpy(query_no_space,query,query_length); + + /*unsigned int query_no_space_length=*/remove_spaces(query_no_space); + + if (!strcasecmp(query_no_space,"DBG THREAD STATUS")) { + result = thread->SQL3_Thread_status(this); + goto __exit_from_debug; + } + if (!strcasecmp(query_no_space,"DBG THREADS STATUS")) { + result = GloMTH->SQL3_Threads_status(this); + goto __exit_from_debug; + } + if (!strcasecmp(query_no_space,"DBG SESSION STATUS")) { + result = SQL3_Session_status(); + goto __exit_from_debug; + } + + + + +__exit_from_debug: + l_free(query_length,query); + l_free(query_length,query_no_space); + l_free(pkt->size,pkt->ptr); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + if (result) { + // SQLite3_result * result = thread->SQL3_Thread_status(this); + SQLite3_to_MySQL(result,NULL,0,&client_myds->myprot); + delete result; + } else { + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Command not supported"); + } + client_myds->DSS=STATE_SLEEP; +} +#endif /* DEBUG */ + + +void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_INIT_DB(PtrSize_t *pkt) { + proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got COM_INIT_DB packet\n"); + if (admin==false) { + client_myds->myconn->userinfo->set_schemaname((char *)pkt->ptr+sizeof(mysql_hdr)+1,pkt->size-sizeof(mysql_hdr)-1); + l_free(pkt->size,pkt->ptr); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,2,0,NULL); + client_myds->DSS=STATE_SLEEP; + } else { + l_free(pkt->size,pkt->ptr); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,2,0,NULL); + client_myds->DSS=STATE_SLEEP; + } +} + + + +bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(PtrSize_t *pkt) { + if (qpo->new_query) { + // the query was rewritten + l_free(pkt->size,pkt->ptr); // free old pkt + // allocate new pkt + pkt->size=sizeof(mysql_hdr)+1+qpo->new_query->length(); + pkt->ptr=l_alloc(pkt->size); + mysql_hdr hdr; + hdr.pkt_id=0; + hdr.pkt_length=pkt->size-sizeof(mysql_hdr); + memcpy((unsigned char *)pkt->ptr, &hdr, sizeof(mysql_hdr)); // copy header + unsigned char *c=(unsigned char *)pkt->ptr+sizeof(mysql_hdr); + *c=(unsigned char)_MYSQL_COM_QUERY; // set command type + memcpy((unsigned char *)pkt->ptr+sizeof(mysql_hdr)+1,qpo->new_query->data(),qpo->new_query->length()); // copy query + delete qpo->new_query; + } + if (qpo->cache_ttl>0) { + client_myds->query_SQL=(unsigned char *)l_alloc(pkt->size-sizeof(mysql_hdr)); + memcpy(client_myds->query_SQL,(unsigned char *)pkt->ptr+sizeof(mysql_hdr)+1,pkt->size-sizeof(mysql_hdr)-1); + client_myds->query_SQL[pkt->size-sizeof(mysql_hdr)-1]=0; + uint32_t resbuf=0; + unsigned char *aa=GloQC->get(client_myds->query_SQL,&resbuf); + if (aa) { + l_free(pkt->size,pkt->ptr); + l_free(strlen((char *)client_myds->query_SQL)+1,client_myds->query_SQL); + client_myds->buffer2resultset(aa,resbuf); + free(aa); + client_myds->PSarrayOUT->copy_add(client_myds->resultset,0,client_myds->resultset->len); + while (client_myds->resultset->len) client_myds->resultset->remove_index(client_myds->resultset->len-1,NULL); + status=WAITING_CLIENT_DATA; + client_myds->DSS=STATE_SLEEP; + if (mysql_thread___commands_stats==true) { + CurrentQuery.end_time=thread->curtime; + CurrentQuery.query_parser_update_counters(); + } + GloQPro->delete_QP_out(qpo); + qpo=NULL; + return true; + } + } + if ( qpo->destination_hostgroup >= 0 ) current_hostgroup=qpo->destination_hostgroup; + return false; +} + + +void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STATISTICS(PtrSize_t *pkt) { + proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got COM_STATISTICS packet\n"); + l_free(pkt->size,pkt->ptr); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + client_myds->myprot.generate_statistics_response(true,NULL,NULL); + client_myds->DSS=STATE_SLEEP; +} + +void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_CHANGE_USER(PtrSize_t *pkt, bool *wrong_pass) { + proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got COM_CHANGE_USER packet\n"); + if (admin==false) { + if (client_myds->myprot.process_pkt_COM_CHANGE_USER((unsigned char *)pkt->ptr, pkt->size)==true) { + l_free(pkt->size,pkt->ptr); + //client_myds->myprot.generate_pkt_auth_switch_request(true,NULL,NULL); + //client_myds->DSS=STATE_CLIENT_HANDSHAKE; + //status=CHANGING_USER_CLIENT; + client_myds->myprot.generate_pkt_OK(true,NULL,NULL,1,0,0,0,0,NULL); + client_myds->DSS=STATE_SLEEP; + status=WAITING_CLIENT_DATA; + *wrong_pass=false; + } else { + l_free(pkt->size,pkt->ptr); + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Wrong credentials for frontend: disconnecting\n"); + *wrong_pass=true; + // FIXME: this should become close connection + client_myds->setDSS_STATE_QUERY_SENT_NET(); + char *_s=(char *)malloc(strlen(client_myds->myconn->userinfo->username)+100); + sprintf(_s,"Access denied for user '%s' (using password: %s)", client_myds->myconn->userinfo->username, (client_myds->myconn->userinfo->password ? "YES" : "NO")); + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,2,1045,(char *)"#28000", _s); + free(_s); + } + } else { + //FIXME: send an error message saying "not supported" or disconnect + l_free(pkt->size,pkt->ptr); + } +} + +void MySQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED__get_connection() { + // Get a MySQL Connection + +// if (rand()%3==0) { + MySQL_Connection *mc=MyHGM->get_MyConn_from_pool(mybe->hostgroup_id); + if (mc) { + mybe->server_myds->attach_connection(mc); + } +// } + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p -- server_myds=%p -- MySQL_Connection %p\n", this, mybe->server_myds, mybe->server_myds->myconn); + if (mybe->server_myds->myconn==NULL) { + // we couldn't get a connection for whatever reason, ex: no backends, or too busy + if (thread->mypolls.poll_timeout==0) { // tune poll timeout + if (thread->mypolls.poll_timeout > mysql_thread___poll_timeout_on_failure) { + thread->mypolls.poll_timeout = mysql_thread___poll_timeout_on_failure; + } + } + return; + } + if (mybe->server_myds->myconn->fd==-1) { + // we didn't get a valid connection, we need to create one + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p -- MySQL Connection has no FD\n", this); + int __fd; + MySQL_Connection *myconn=mybe->server_myds->myconn; +// myconn->mysql=mysql_init(NULL); +// assert(myconn->mysql); +// mysql_options(myconn->mysql, MYSQL_OPT_NONBLOCK, 0); + myconn->userinfo->set(client_myds->myconn->userinfo); + // FIXME: set client_flags + //mybe->server_myds->myconn->connect_start(); + //mybe->server_myds->fd=myconn->fd; + + //myconn->connect_start(); + myconn->handler(0); +/* + if (myconn->parent->port) { + myconn->async_exit_status=mysql_real_connect_start(&myconn->ret_mysql,myconn->mysql, myconn->parent->address, myconn->userinfo->username, myconn->userinfo->password, myconn->userinfo->schemaname, myconn->parent->port, NULL, 0); + } else { + myconn->async_exit_status=mysql_real_connect_start(&myconn->ret_mysql,myconn->mysql, "localhost", myconn->userinfo->username, myconn->userinfo->password, myconn->userinfo->schemaname, myconn->parent->port, myconn->parent->address, 0); + } + myconn->fd=mysql_get_socket(myconn->mysql); + if (myconn->async_exit_status) { +// myconn->async_state_machine=1; + } else { +// myconn->async_state_machine=2; + } +*/ + mybe->server_myds->fd=myconn->fd; + mybe->server_myds->DSS=STATE_MARIADB_CONNECTING; + status=CONNECTING_SERVER; + mybe->server_myds->myconn->reusable=true; + +/* + __fd=mybe->server_myds->myds_connect(mybe->server_myds->myconn->parent->address, mybe->server_myds->myconn->parent->port, &pending_connect); + + if (__fd==-1) { + MyHGM->destroy_MyConn_from_pool(mybe->server_myds->myconn); + mybe->server_myds->myconn=NULL; + return; + } + + mybe->server_myds->init((pending_connect==1 ? MYDS_BACKEND_NOT_CONNECTED : MYDS_BACKEND), this, __fd); + mybe->server_myds->myconn->reusable=true; + mybe->server_myds->myconn->fd=mybe->server_myds->fd; + status=CONNECTING_SERVER; + mybe->server_myds->DSS=STATE_NOT_CONNECTED; +*/ + } else { + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p -- MySQL Connection found = %p\n", this, mybe->server_myds->myconn); + mybe->server_myds->assign_fd_from_mysql_conn(); + mybe->server_myds->myds_type=MYDS_BACKEND; + mybe->server_myds->DSS=STATE_READY; + if (session_fast_forward==true) { + status=FAST_FORWARD; + } + } +} + +void MySQL_Session::handler___client_DSS_QUERY_SENT___send_INIT_DB_to_backend() { + mybe->server_myds->move_from_OUT_to_OUTpending(); + mybe->server_myds->myconn->userinfo->set_schemaname(client_myds->myconn->userinfo->schemaname,strlen(client_myds->myconn->userinfo->schemaname)); + status=CHANGING_SCHEMA; + mybe->server_myds->DSS=STATE_MARIADB_INITDB; + mybe->server_myds->myconn->async_state_machine=ASYNC_INITDB_START; + mybe->server_myds->myconn->handler(0); +// mybe->server_myds->move_from_OUT_to_OUTpending(); +// //userinfo_server.set_schemaname(userinfo_client.schemaname,strlen(userinfo_client.schemaname)); +// mybe->server_myds->myconn->userinfo->set_schemaname(client_myds->myconn->userinfo->schemaname,strlen(client_myds->myconn->userinfo->schemaname)); +// //myprot_server.generate_COM_INIT_DB(true,NULL,NULL,userinfo_server.schemaname); +// mybe->server_myds->myprot.generate_COM_INIT_DB(true,NULL,NULL,mybe->server_myds->myconn->userinfo->schemaname); +// mybe->server_myds->DSS=STATE_QUERY_SENT_DS; +// status=CHANGING_SCHEMA; +} + +void MySQL_Session::handler___client_DSS_QUERY_SENT___send_SET_NAMES_to_backend() { + mybe->server_myds->move_from_OUT_to_OUTpending(); + mybe->server_myds->myconn->set_charset(client_myds->myconn->options.charset); +// mybe->server_myds->myprot.generate_COM_QUERY(true,NULL,NULL,(char *)"SET NAMES utf8"); +// mybe->server_myds->DSS=STATE_QUERY_SENT_DS; + mybe->server_myds->DSS=STATE_MARIADB_SET_NAMES; + mybe->server_myds->myconn->async_state_machine=ASYNC_SET_NAMES_START; + mybe->server_myds->myconn->handler(0); + status=CHANGING_CHARSET; +} + +void MySQL_Session::handler___client_DSS_QUERY_SENT___send_CHANGE_USER_to_backend() { + mybe->server_myds->move_from_OUT_to_OUTpending(); + mybe->server_myds->myconn->userinfo->set(client_myds->myconn->userinfo); + mybe->server_myds->myprot.generate_COM_CHANGE_USER(true,NULL,NULL); + mybe->server_myds->DSS=STATE_QUERY_SENT_DS; + status=CHANGING_USER_SERVER; +} + + +void MySQL_Session::MySQL_Result_to_MySQL_wire(MYSQL *mysql, MYSQL_RES *result, MySQL_Protocol *myprot) { + assert(myprot); + MySQL_Data_Stream *myds=myprot->get_myds(); + myds->DSS=STATE_QUERY_SENT_DS; + int sid=1; + unsigned int num_fields=mysql_field_count(mysql); + unsigned int num_rows; + if (result) { + // we have a result set, this should be a SELECT statement with result + myprot->generate_pkt_column_count(true,NULL,NULL,sid,num_fields); sid++; + for (unsigned int i=0; igenerate_pkt_field(true,NULL,NULL,sid,field->db,field->table,field->org_table,field->name,field->org_name,field->charsetnr,field->length,field->type,field->flags,field->decimals,false,0,NULL); + sid++; + } + myds->DSS=STATE_COLUMN_DEFINITION; + num_rows=mysql_num_rows(result); + myprot->generate_pkt_EOF(true,NULL,NULL,sid,0,0); sid++; + //char **p=(char **)malloc(sizeof(char*)*num_fields); + //int *l=(int *)malloc(sizeof(int*)*num_fields); + //p[0]="column test"; + for (unsigned int r=0; rrows[r]->sizes[i]; +// p[i]=result->rows[r]->fields[i]; +// } + myprot->generate_pkt_row(true,NULL,NULL,sid,num_fields,lengths,row); sid++; + } + myds->DSS=STATE_ROW; + myprot->generate_pkt_EOF(true,NULL,NULL,sid,0,2); sid++; + myds->DSS=STATE_SLEEP; + //free(l); + //free(p); + } else { // no result set + if (num_fields) { + num_rows = mysql_affected_rows(mysql); + myprot->generate_pkt_OK(true,NULL,NULL,sid,num_rows,mysql->insert_id,0,mysql->warning_count,mysql->info); + } else { + // error + char sqlstate[10]; + sprintf(sqlstate,"#%s",mysql_sqlstate(mysql)); + myprot->generate_pkt_ERR(true,NULL,NULL,sid,mysql_errno(mysql),sqlstate,mysql_error(mysql)); + } +// if (error) { +// // there was an error +// myprot->generate_pkt_ERR(true,NULL,NULL,sid,1045,(char *)"#28000",error); +// } else { +// // no error, DML succeeded +// myprot->generate_pkt_OK(true,NULL,NULL,sid,affected_rows,0,0,0,NULL); +// } +// myds->DSS=STATE_SLEEP; + } +} + +void MySQL_Session::SQLite3_to_MySQL(SQLite3_result *result, char *error, int affected_rows, MySQL_Protocol *myprot) { + assert(myprot); + MySQL_Data_Stream *myds=myprot->get_myds(); + myds->DSS=STATE_QUERY_SENT_DS; + int sid=1; + if (result) { +// sess->myprot_client.generate_pkt_OK(true,NULL,NULL,1,0,0,0,0,NULL); + myprot->generate_pkt_column_count(true,NULL,NULL,sid,result->columns); sid++; + for (int i=0; icolumns; i++) { + myprot->generate_pkt_field(true,NULL,NULL,sid,(char *)"",(char *)"",(char *)"",result->column_definition[i]->name,(char *)"",33,15,MYSQL_TYPE_VAR_STRING,1,0x1f,false,0,NULL); + sid++; + } + myds->DSS=STATE_COLUMN_DEFINITION; + + myprot->generate_pkt_EOF(true,NULL,NULL,sid,0,0); sid++; + char **p=(char **)malloc(sizeof(char*)*result->columns); + unsigned long *l=(unsigned long *)malloc(sizeof(unsigned long *)*result->columns); + //p[0]="column test"; + for (int r=0; rrows_count; r++) { + for (int i=0; icolumns; i++) { + l[i]=result->rows[r]->sizes[i]; + p[i]=result->rows[r]->fields[i]; + } + myprot->generate_pkt_row(true,NULL,NULL,sid,result->columns,l,p); sid++; + } + myds->DSS=STATE_ROW; + myprot->generate_pkt_EOF(true,NULL,NULL,sid,0,2); sid++; + myds->DSS=STATE_SLEEP; + free(l); + free(p); + + } else { // no result set + if (error) { + // there was an error + myprot->generate_pkt_ERR(true,NULL,NULL,sid,1045,(char *)"#28000",error); + } else { + // no error, DML succeeded + myprot->generate_pkt_OK(true,NULL,NULL,sid,affected_rows,0,0,0,NULL); + } + myds->DSS=STATE_SLEEP; + } +} + +SQLite3_result * MySQL_Session::SQL3_Session_status() { + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 4, "Dumping MySQL Session status\n"); + SQLite3_result *result=new SQLite3_result(4); + result->add_column_definition(SQLITE_TEXT,"ThreadID"); + result->add_column_definition(SQLITE_TEXT,"Thread_ptr"); + result->add_column_definition(SQLITE_TEXT,"Session_ptr"); + result->add_column_definition(SQLITE_TEXT,"Status"); + + char buf[1024]; + + char **pta=(char **)malloc(sizeof(char *)*4); + long long int thread_id=syscall(SYS_gettid); + itostr(pta[0],thread_id); + pta[1]=(char *)malloc(32); + sprintf(pta[1],"%p",this->thread); + pta[2]=(char *)malloc(32); + sprintf(pta[2],"%p",this); + + std::string status_str; + status_str.reserve(10000); + status_str = "\n"; + status_str+= "============\n"; + status_str+= "MySQL Thread\n"; + status_str+= "============\n"; + status_str+= "ThreadID: "; + status_str.append(pta[0]); + status_str+= "\n"; + + status_str+="\ndefault_schema : "; status_str.append(mysql_thread___default_schema); + status_str+="\nserver_version : "; status_str.append(mysql_thread___server_version); + sprintf(buf,"\ncapabilities : %d\npoll_timeout : %d\n", mysql_thread___server_capabilities, mysql_thread___poll_timeout); + status_str.append(buf); + status_str+= "\n"; + + sprintf(buf, "Proxy_Polls: %p , len: %d , loops: %lu\n", &thread->mypolls, thread->mypolls.len, thread->mypolls.loops); + status_str.append(buf); + for (unsigned int i=0; i < thread->mypolls.len; i++) { + MySQL_Data_Stream *_myds=thread->mypolls.myds[i]; + sprintf(buf, "myds[%d]: %p = { fd=%d , events=%d , revents=%d } , type=%d , dss=%d , sess=%p , conn=%p\n", i, _myds , thread->mypolls.fds[i].fd , thread->mypolls.fds[i].events , thread->mypolls.fds[i].revents , _myds->myds_type , _myds->DSS , _myds->sess , _myds->myconn); + status_str.append(buf); + } + status_str+= "\n"; + + sprintf(buf, "MySQL Sessions: %p, len: %d\n", thread->mysql_sessions, thread->mysql_sessions->len); + status_str.append(buf); + for (unsigned int i=0; i < thread->mysql_sessions->len; i++) { + MySQL_Session *s=(MySQL_Session *)thread->mysql_sessions->pdata[i]; + MySQL_Connection_userinfo *ui=s->client_myds->myconn->userinfo; + sprintf(buf, "session[%d] = %p :\n\tuserinfo={%s,%s} , status=%d , myds={%p,%p} , HG={d:%d,c:%d}\n\tLast query= ", i, s, ui->username, ui->schemaname, s->status, s->client_myds, s->mybe->server_myds, s->default_hostgroup, s->current_hostgroup); + status_str.append(buf); + if (mysql_thread___commands_stats==true) { + if (s->CurrentQuery.QueryLength && s->CurrentQuery.MyComQueryCmd!=MYSQL_COM_QUERY___NONE) { + status_str.append((char *)s->CurrentQuery.QueryPointer); + } + } + status_str+= "\n"; + } + + + + + pta[3]=(char *)status_str.c_str(); + result->add_row(pta); + for (int i=0; i<3; i++) + free(pta[i]); + free(pta); + return result; +} + + +void MySQL_Session::set_unhealthy() { + proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess:%p\n", this); + healthy=0; +} diff --git a/lib/mysql_connection.cpp b/lib/mysql_connection.cpp index e0b37a900..1b4ada257 100644 --- a/lib/mysql_connection.cpp +++ b/lib/mysql_connection.cpp @@ -260,6 +260,27 @@ void MySQL_Connection::set_names_cont(short event) { async_exit_status = mysql_set_character_set_cont(&interr,mysql, mysql_status(event)); } +void MySQL_Connection::set_query(char *stmt, unsigned long length) { + query.ptr=stmt; + query.length=length; +} + +void MySQL_Connection::real_query_start() { + async_exit_status = mysql_real_query_start(&interr , mysql, query.ptr, query.length); +} + +void MySQL_Connection::real_query_cont(short event) { + async_exit_status = mysql_real_query_cont(&interr ,mysql , mysql_status(event)); +} + +void MySQL_Connection::store_result_start() { + async_exit_status = mysql_store_result_start(&mysql_result, mysql); +} + +void MySQL_Connection::store_result_cont(short event) { + async_exit_status = mysql_store_result_cont(&mysql_result , mysql , mysql_status(event)); +} + #define NEXT_IMMEDIATE(new_st) do { async_state_machine = new_st; goto handler_again; } while (0) MDB_ASYNC_ST MySQL_Connection::handler(short event) { @@ -325,6 +346,48 @@ handler_again: break; case ASYNC_PING_FAILED: break; + case ASYNC_QUERY_START: + real_query_start(); + if (async_exit_status) { + next_event(ASYNC_QUERY_CONT); + } else { + NEXT_IMMEDIATE(ASYNC_STORE_RESULT_START); + } + break; + case ASYNC_QUERY_CONT: + real_query_cont(event); + if (async_exit_status) { + next_event(ASYNC_QUERY_CONT); + } else { + NEXT_IMMEDIATE(ASYNC_STORE_RESULT_START); + } + break; + case ASYNC_STORE_RESULT_START: + if (mysql_errno(mysql)) { + NEXT_IMMEDIATE(ASYNC_QUERY_END); + } + store_result_start(); + if (async_exit_status) { + next_event(ASYNC_STORE_RESULT_CONT); + } else { + NEXT_IMMEDIATE(ASYNC_QUERY_END); + } + break; + case ASYNC_STORE_RESULT_CONT: + store_result_cont(event); + if (async_exit_status) { + next_event(ASYNC_STORE_RESULT_CONT); + } else { + NEXT_IMMEDIATE(ASYNC_QUERY_END); + } + break; + case ASYNC_QUERY_END: +// if (interr) {i +// NEXT_IMMEDIATE(ASYNC_PING_FAILED); +// } else { +// NEXT_IMMEDIATE(ASYNC_PING_SUCCESSFUL); +// } + break; case ASYNC_SET_NAMES_START: set_names_start(); if (async_exit_status) { diff --git a/lib/mysql_data_stream.cpp b/lib/mysql_data_stream.cpp index 48bf14db4..70c0f51aa 100644 --- a/lib/mysql_data_stream.cpp +++ b/lib/mysql_data_stream.cpp @@ -95,6 +95,9 @@ MySQL_Data_Stream::MySQL_Data_Stream() { pkts_recv=0; pkts_sent=0; + mysql_real_query.ptr=NULL; + mysql_real_query.size=0; + timeout=0; connect_tries=0; poll_fds_idx=-1; @@ -255,7 +258,8 @@ int MySQL_Data_Stream::read_from_net() { //proxy_error("read %d bytes from fd %d into a buffer of %d bytes free\n", r, fd, s); if (r < 1) { if (encrypted==false) { - if (r==0 || (r==-1 && errno != EINTR && errno != EAGAIN)) { + int myds_errno=errno; + if (r==0 || (r==-1 && myds_errno != EINTR && myds_errno != EAGAIN)) { shut_soft(); } } else { diff --git a/src/proxysql.cfg b/src/proxysql.cfg index 41dc533c4..a27c2c32d 100644 --- a/src/proxysql.cfg +++ b/src/proxysql.cfg @@ -18,7 +18,7 @@ admin_variables= mysql_variables= { - threads=2 + threads=1 //threads=32 have_compress=true poll_timeout=2000