diff --git a/include/MySQL_Data_Stream.h b/include/MySQL_Data_Stream.h index ea8cd1be3..0e4eedf6f 100644 --- a/include/MySQL_Data_Stream.h +++ b/include/MySQL_Data_Stream.h @@ -103,6 +103,10 @@ class MySQL_Data_Stream void set_net_failure(); void setDSS_STATE_QUERY_SENT_NET(); + void setDSS(enum mysql_data_stream_status dss) { + DSS=dss; + } + int read_pkts(); int write_pkts(); @@ -117,5 +121,20 @@ class MySQL_Data_Stream void move_from_OUT_to_OUTpending(); unsigned char * resultset2buffer(bool); void buffer2resultset(unsigned char *, unsigned int); + + // safe way to attach a MySQL Connection + void attach_connection(MySQL_Connection *mc) { + myconn=mc; + mc->myds=this; + } + + // safe way to detach a MySQL Connection + void detach_connection() { + assert(myconn==NULL); + myconn->myds=NULL; + myconn=NULL; + } + + }; #endif /* __CLASS_MYSQL_DATA_STREAM_H */ diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index 630c11940..f929b6713 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -42,8 +42,8 @@ class MySQL_Session void handler___status_WAITING_SERVER_DATA___STATE_PING_SENT(PtrSize_t *); void handler___status_WAITING_SERVER_DATA___STATE_ROW(PtrSize_t *); void handler___status_WAITING_SERVER_DATA___STATE_EOF1(PtrSize_t *); - void handler___status_CONNECTING_SERVER___STATE_NOT_CONNECTED(PtrSize_t *); - void handler___status_CONNECTING_SERVER___STATE_CLIENT_HANDSHAKE(PtrSize_t *, bool *); + //void handler___status_CONNECTING_SERVER___STATE_NOT_CONNECTED(PtrSize_t *); + //void handler___status_CONNECTING_SERVER___STATE_CLIENT_HANDSHAKE(PtrSize_t *, bool *); void handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(PtrSize_t *, bool *); void handler___status_CHANGING_USER_CLIENT___STATE_CLIENT_HANDSHAKE(PtrSize_t *, bool *); diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 1988d2efd..331be5ae2 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -177,9 +177,9 @@ class MySQL_Thread void process_all_sessions_connections_handler(); void register_session_connection_handler(MySQL_Session *_sess); void unregister_session_connection_handler(int idx); - void myds_backend_set_failed_connect(MySQL_Data_Stream *myds, unsigned int n); - void myds_backend_pause_connect(MySQL_Data_Stream *myds); - void myds_backend_first_packet_after_connect(MySQL_Data_Stream *myds, unsigned int n); + //void myds_backend_set_failed_connect(MySQL_Data_Stream *myds, unsigned int n); + //void myds_backend_pause_connect(MySQL_Data_Stream *myds); + //void myds_backend_first_packet_after_connect(MySQL_Data_Stream *myds, unsigned int n); void listener_handle_new_connection(MySQL_Data_Stream *myds, unsigned int n); SQLite3_result * SQL3_Thread_status(MySQL_Session *sess); }; diff --git a/include/mysql_connection.h b/include/mysql_connection.h index bb0aeabe9..ac6f54e8e 100644 --- a/include/mysql_connection.h +++ b/include/mysql_connection.h @@ -34,6 +34,9 @@ class MySQL_Connection { public: int fd; char scramble_buff[40]; + int async_status; + MYSQL *mysql; + MYSQL *ret_mysql; struct { uint32_t max_allowed_pkt; uint32_t server_capabilities; @@ -71,5 +74,6 @@ class MySQL_Connection { bool get_status_compression(); bool get_status_prepared_statement(); bool get_status_user_variable(); + void connect_start(); }; #endif /* __CLASS_MYSQL_CONNECTION_H */ diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index a2b67a671..c834a614a 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -89,6 +89,13 @@ enum mysql_data_stream_status { STATE_READING_COM_STMT_PREPARE_RESPONSE, + STATE_MARIADB_BEGIN, // dummy state + STATE_MARIADB_CONNECTING, // using MariaDB Client Library + STATE_MARIADB_PING_START, + STATE_MARIADB_PING_CONT, + STATE_MARIADB_PING_FAILURE, + STATE_MARIADB_END, // dummy state + STATE_END /* STATE_ONE_STRING, diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 2eaeed042..ba499df25 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -443,27 +443,63 @@ __get_a_backend: __exit_DSS__STATE_NOT_INITIALIZED: + if (mybe && mybe->server_myds) { + if (mybe->server_myds->DSS > STATE_MARIADB_BEGIN && mybe->server_myds->DSS < STATE_MARIADB_END) { + MySQL_Data_Stream *myds=mybe->server_myds; + MySQL_Connection *myconn=mybe->server_myds->myconn; + int ms_status = 0; + switch (status) { + case CONNECTING_SERVER: + if (myds->revents & POLLIN) ms_status |= MYSQL_WAIT_READ; + if (myds->revents & POLLOUT) ms_status |= MYSQL_WAIT_WRITE; + if (myds->revents & POLLPRI) ms_status |= MYSQL_WAIT_EXCEPT; + if (ms_status) { + myconn->async_status = mysql_real_connect_cont(&myconn->ret_mysql, myconn->mysql, ms_status); + if (myconn->async_status==0) { + if (myconn->ret_mysql) { + myds->myds_type=MYDS_BACKEND; + myds->DSS=STATE_READY; + //mybe->myconn=server_myds->myconn; + status=WAITING_SERVER_DATA; + unsigned int k; + PtrSize_t pkt2; + for (k=0; kPSarrayOUTpending->len;) { + myds->PSarrayOUTpending->remove_index(0,&pkt2); + myds->PSarrayOUT->add(pkt2.ptr, pkt2.size); + myds->DSS=STATE_QUERY_SENT_DS; + } + //assert(0); + } + } + } + break; + default: + assert(0); + break; + } + } else { + + // } - if (mybe && mybe->server_myds) { for (j=0; jserver_myds->PSarrayIN->len;) { mybe->server_myds->PSarrayIN->remove_index(0,&pkt); switch (status) { - case CONNECTING_SERVER: - - switch (mybe->server_myds->DSS) { - case STATE_NOT_CONNECTED: - handler___status_CONNECTING_SERVER___STATE_NOT_CONNECTED(&pkt); - break; - case STATE_CLIENT_HANDSHAKE: - handler___status_CONNECTING_SERVER___STATE_CLIENT_HANDSHAKE(&pkt, &wrong_pass); - break; - default: - assert(0); - - } - break; +// case CONNECTING_SERVER: +// +// switch (mybe->server_myds->DSS) { +// case STATE_NOT_CONNECTED: +// handler___status_CONNECTING_SERVER___STATE_NOT_CONNECTED(&pkt); +// break; +// case STATE_CLIENT_HANDSHAKE: +// handler___status_CONNECTING_SERVER___STATE_CLIENT_HANDSHAKE(&pkt, &wrong_pass); +// break; +// default: +// assert(0); +// +// } +// break; case WAITING_SERVER_DATA: switch (mybe->server_myds->DSS) { @@ -521,7 +557,7 @@ __exit_DSS__STATE_NOT_INITIALIZED: } } - + } writeout(); // FIXME: see bug #211 @@ -552,12 +588,14 @@ __exit_DSS__STATE_NOT_INITIALIZED: if (mybe->server_myds->net_failure) { proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess:%p , MYDS:%p , myds_type=%d, DSS=%d , myconn:%p\n" , this, mybe->server_myds , mybe->server_myds->myds_type , mybe->server_myds->DSS, mybe->server_myds->myconn); if (( mybe->server_myds->DSS==STATE_READY || mybe->server_myds->DSS==STATE_QUERY_SENT_DS ) && mybe->server_myds->myds_type==MYDS_BACKEND) { - mybe->server_myds->myconn=NULL; + //mybe->server_myds->myconn=NULL; + mybe->server_myds->detach_connection(); mybe->server_myds->DSS=STATE_NOT_INITIALIZED; mybe->server_myds->move_from_OUT_to_OUTpending(); if (mybe->server_myds->myconn) { MyHGM->destroy_MyConn_from_pool(mybe->server_myds->myconn); - mybe->server_myds->myconn=NULL; + //mybe->server_myds->myconn=NULL; + mybe->server_myds->detach_connection(); } if (mybe->server_myds->fd) { mybe->server_myds->shut_hard(); @@ -699,7 +737,8 @@ void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_PING_SENT(PtrSi mybe->server_myds->myconn->last_time_used=thread->curtime; MyHGM->push_MyConn_to_pool(mybe->server_myds->myconn); //MyHGM->destroy_MyConn_from_pool(mybe->server_myds->myconn); - mybe->server_myds->myconn=NULL; + //mybe->server_myds->myconn=NULL; + mybe->server_myds->detach_connection(); mybe->server_myds->unplug_backend(); } } @@ -927,7 +966,7 @@ void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_EOF1(PtrSize_t } } - +/* void MySQL_Session::handler___status_CONNECTING_SERVER___STATE_NOT_CONNECTED(PtrSize_t *pkt) { proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Statuses: CONNECTING_SERVER - STATE_NOT_CONNECTED\n"); if (mybe->server_myds->myprot.process_pkt_initial_handshake((unsigned char *)pkt->ptr,pkt->size)==true) { @@ -996,7 +1035,7 @@ void MySQL_Session::handler___status_CONNECTING_SERVER___STATE_CLIENT_HANDSHAKE( mybe->server_myds->myconn->reusable=false; } } - +*/ void MySQL_Session::handler___status_CHANGING_USER_CLIENT___STATE_CLIENT_HANDSHAKE(PtrSize_t *pkt, bool *wrong_pass) { @@ -1320,7 +1359,10 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED // Get a MySQL Connection // if (rand()%3==0) { - mybe->server_myds->myconn=MyHGM->get_MyConn_from_pool(mybe->hostgroup_id); + MySQL_Connection *mc=MyHGM->get_MyConn_from_pool(mybe->hostgroup_id); + if (mc) { + mybe->server_myds->attach_connection(mc); + } // } proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p -- server_myds=%p -- MySQL_Connection %p\n", this, mybe->server_myds, mybe->server_myds->myconn); if (mybe->server_myds->myconn==NULL) { @@ -1336,6 +1378,27 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED // we didn't get a valid connection, we need to create one proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p -- MySQL Connection has no FD\n", this); int __fd; + MySQL_Connection *myconn=mybe->server_myds->myconn; + myconn->mysql=mysql_init(NULL); + assert(myconn->mysql); + mysql_options(myconn->mysql, MYSQL_OPT_NONBLOCK, 0); + myconn->userinfo->set(client_myds->myconn->userinfo); + // FIXME: set client_flags + //mybe->server_myds->myconn->connect_start(); + //mybe->server_myds->fd=myconn->fd; + + + if (myconn->parent->port) { + myconn->async_status=mysql_real_connect_start(&myconn->ret_mysql,myconn->mysql, myconn->parent->address, myconn->userinfo->username, myconn->userinfo->password, myconn->userinfo->schemaname, myconn->parent->port, NULL, 0); + } else { + myconn->async_status=mysql_real_connect_start(&myconn->ret_mysql,myconn->mysql, "localhost", myconn->userinfo->username, myconn->userinfo->password, myconn->userinfo->schemaname, myconn->parent->port, myconn->parent->address, 0); + } + myconn->fd=mysql_get_socket(myconn->mysql); + mybe->server_myds->fd=myconn->fd; + mybe->server_myds->DSS=STATE_MARIADB_CONNECTING; + status=CONNECTING_SERVER; + +/* __fd=mybe->server_myds->myds_connect(mybe->server_myds->myconn->parent->address, mybe->server_myds->myconn->parent->port, &pending_connect); if (__fd==-1) { @@ -1349,6 +1412,7 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED mybe->server_myds->myconn->fd=mybe->server_myds->fd; status=CONNECTING_SERVER; mybe->server_myds->DSS=STATE_NOT_CONNECTED; +*/ } else { proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p -- MySQL Connection found = %p\n", this, mybe->server_myds->myconn); mybe->server_myds->assign_fd_from_mysql_conn(); diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 8733edb5e..c6c93f051 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -835,8 +835,10 @@ MySQL_Session * MySQL_Thread::create_new_session_and_client_data_stream(int _fd) //sess->myprot_client.dump_pkt=true; sess->client_myds->myprot.dump_pkt=true; #endif - sess->client_myds->myconn=new MySQL_Connection(); - MySQL_Connection *myconn=sess->client_myds->myconn; + //sess->client_myds->myconn=new MySQL_Connection(); + //MySQL_Connection *myconn=sess->client_myds->myconn; + MySQL_Connection *myconn=new MySQL_Connection; + sess->client_myds->attach_connection(myconn); //myconn=new MySQL_Connection(); // 20141011 // if (mysql_thread___have_compress) { // myconn->options.compression_min_length=50; @@ -987,7 +989,14 @@ void MySQL_Thread::run() { for (n = 0; n < mypolls.len; n++) { mypolls.fds[n].revents=0; if (mypolls.myds[n] && mypolls.myds[n]->myds_type!=MYDS_LISTENER && mypolls.myds[n]->myds_type!=MYDS_BACKEND_PAUSE_CONNECT) { - mypolls.myds[n]->set_pollout(); +// mypolls.myds[n]->set_pollout(); + if (mypolls.myds[n]->DSS > STATE_MARIADB_BEGIN && mypolls.myds[n]->DSS < STATE_MARIADB_END) { + mypolls.fds[n].events = POLLIN; + if (mypolls.myds[n]->myconn->async_status & MYSQL_WAIT_WRITE) + mypolls.fds[n].events |= POLLOUT; + } else { + mypolls.myds[n]->set_pollout(); + } } } @@ -1060,6 +1069,8 @@ void MySQL_Thread::run() { } if (mypolls.fds[n].revents==0) { +/* + // FIXME: this logic was removed completely because we added mariadb client library. Yet, we need to implement a way to manage connection timeout switch(myds->myds_type) { case MYDS_BACKEND_NOT_CONNECTED: myds_backend_set_failed_connect(myds,n); @@ -1072,11 +1083,13 @@ void MySQL_Thread::run() { continue; break; } - +*/ } else { // check if the FD is valid assert(mypolls.fds[n].revents!=POLLNVAL); switch(myds->myds_type) { +/* + // FIXME: this logic was removed completely because we added mariadb client library. Yet, we need to implement a way to manage connection timeout case MYDS_BACKEND_NOT_CONNECTED: // if (myds->myds_type==MYDS_BACKEND_NOT_CONNECTED && mypolls.fds[n].revents) { if ( (mypolls.fds[n].revents & POLLERR) || (mypolls.fds[n].revents & POLLHUP) ) { @@ -1089,6 +1102,7 @@ void MySQL_Thread::run() { myds_backend_first_packet_after_connect(myds, n); } break; +*/ case MYDS_LISTENER: // we got a new connection! listener_handle_new_connection(myds,n); @@ -1117,9 +1131,11 @@ void MySQL_Thread::run() { void MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned int n) { mypolls.last_recv[n]=curtime; myds->revents=mypolls.fds[n].revents; - myds->read_from_net(); - myds->read_pkts(); - + if (mypolls.myds[n]->DSS < STATE_MARIADB_BEGIN || mypolls.myds[n]->DSS > STATE_MARIADB_END) { + // only if we aren't using MariaDB Client Library + myds->read_from_net(); + myds->read_pkts(); + } if ( (mypolls.fds[n].events & POLLOUT) && ( (mypolls.fds[n].revents & POLLERR) || (mypolls.fds[n].revents & POLLHUP) ) @@ -1302,7 +1318,7 @@ void MySQL_Thread::unregister_session_connection_handler(int idx) { mysql_sessions_connections_handler->remove_index_fast(idx); } - +/* void MySQL_Thread::myds_backend_set_failed_connect(MySQL_Data_Stream *myds, unsigned int n) { if (curtime>mypolls.last_recv[n]+10000000) { proxy_error("connect() timeout . curtime: %llu , last_recv: %llu , failed after %lluus . fd: %d , myds_type: %s\n", curtime, mypolls.last_recv[n] , (curtime-mypolls.last_recv[n]) , myds->fd, (myds->myds_type==MYDS_BACKEND_PAUSE_CONNECT ? "MYDS_BACKEND_PAUSE_CONNECT" : "MYDS_BACKEND_NOT_CONNECTED" ) ); @@ -1333,6 +1349,7 @@ void MySQL_Thread::myds_backend_first_packet_after_connect(MySQL_Data_Stream *my myds->sess->pause=curtime+10000000; } } +*/ void MySQL_Thread::listener_handle_new_connection(MySQL_Data_Stream *myds, unsigned int n) { int c; diff --git a/lib/mysql_connection.cpp b/lib/mysql_connection.cpp index b2afe7411..1b4cbcb63 100644 --- a/lib/mysql_connection.cpp +++ b/lib/mysql_connection.cpp @@ -101,6 +101,8 @@ bool MySQL_Connection_userinfo::set_schemaname(char *_new, int l) { MySQL_Connection::MySQL_Connection() { //memset(&myconn,0,sizeof(MYSQL)); + mysql=NULL; + ret_mysql=NULL; myds=NULL; inserted_into_pool=0; reusable=false; @@ -124,7 +126,11 @@ MySQL_Connection::~MySQL_Connection() { delete userinfo; userinfo=NULL; } - if (myds) { + if (mysql) { + mysql_close(mysql); + mysql=NULL; + } + if (myds) { // FIXME: with the use of mysql client library , this part should be gone myds->shut_hard(); } else { proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "MySQL_Connection %p , fd:%d\n", this, fd); @@ -192,3 +198,14 @@ bool MySQL_Connection::get_status_user_variable() { bool MySQL_Connection::get_status_prepared_statement() { return status_flags & STATUS_MYSQL_CONNECTION_PREPARED_STATEMENT; } + +// non blocking API +void MySQL_Connection::connect_start() { + if (parent->port) { + async_status=mysql_real_connect_start(&ret_mysql, mysql, parent->address, userinfo->username, userinfo->password, userinfo->schemaname, parent->port, NULL, 0); + } else { + async_status=mysql_real_connect_start(&ret_mysql, mysql, "localhost", userinfo->username, userinfo->password, userinfo->schemaname, parent->port, parent->address, 0); + } + fd=mysql_get_socket(mysql); +} + diff --git a/src/proxysql.cfg b/src/proxysql.cfg index 080440418..0c3e05a1a 100644 --- a/src/proxysql.cfg +++ b/src/proxysql.cfg @@ -18,7 +18,7 @@ admin_variables= mysql_variables= { - threads=1 + threads=4 //threads=32 have_compress=true poll_timeout=2000