Connects to backends is performed via MariaDB Client Library

pull/317/head
René Cannaò 11 years ago
parent ba16850d2d
commit ff5c62830c

@ -103,6 +103,10 @@ class MySQL_Data_Stream
void set_net_failure();
void setDSS_STATE_QUERY_SENT_NET();
void setDSS(enum mysql_data_stream_status dss) {
DSS=dss;
}
int read_pkts();
int write_pkts();
@ -117,5 +121,20 @@ class MySQL_Data_Stream
void move_from_OUT_to_OUTpending();
unsigned char * resultset2buffer(bool);
void buffer2resultset(unsigned char *, unsigned int);
// safe way to attach a MySQL Connection
void attach_connection(MySQL_Connection *mc) {
myconn=mc;
mc->myds=this;
}
// safe way to detach a MySQL Connection
void detach_connection() {
assert(myconn==NULL);
myconn->myds=NULL;
myconn=NULL;
}
};
#endif /* __CLASS_MYSQL_DATA_STREAM_H */

@ -42,8 +42,8 @@ class MySQL_Session
void handler___status_WAITING_SERVER_DATA___STATE_PING_SENT(PtrSize_t *);
void handler___status_WAITING_SERVER_DATA___STATE_ROW(PtrSize_t *);
void handler___status_WAITING_SERVER_DATA___STATE_EOF1(PtrSize_t *);
void handler___status_CONNECTING_SERVER___STATE_NOT_CONNECTED(PtrSize_t *);
void handler___status_CONNECTING_SERVER___STATE_CLIENT_HANDSHAKE(PtrSize_t *, bool *);
//void handler___status_CONNECTING_SERVER___STATE_NOT_CONNECTED(PtrSize_t *);
//void handler___status_CONNECTING_SERVER___STATE_CLIENT_HANDSHAKE(PtrSize_t *, bool *);
void handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(PtrSize_t *, bool *);
void handler___status_CHANGING_USER_CLIENT___STATE_CLIENT_HANDSHAKE(PtrSize_t *, bool *);

@ -177,9 +177,9 @@ class MySQL_Thread
void process_all_sessions_connections_handler();
void register_session_connection_handler(MySQL_Session *_sess);
void unregister_session_connection_handler(int idx);
void myds_backend_set_failed_connect(MySQL_Data_Stream *myds, unsigned int n);
void myds_backend_pause_connect(MySQL_Data_Stream *myds);
void myds_backend_first_packet_after_connect(MySQL_Data_Stream *myds, unsigned int n);
//void myds_backend_set_failed_connect(MySQL_Data_Stream *myds, unsigned int n);
//void myds_backend_pause_connect(MySQL_Data_Stream *myds);
//void myds_backend_first_packet_after_connect(MySQL_Data_Stream *myds, unsigned int n);
void listener_handle_new_connection(MySQL_Data_Stream *myds, unsigned int n);
SQLite3_result * SQL3_Thread_status(MySQL_Session *sess);
};

@ -34,6 +34,9 @@ class MySQL_Connection {
public:
int fd;
char scramble_buff[40];
int async_status;
MYSQL *mysql;
MYSQL *ret_mysql;
struct {
uint32_t max_allowed_pkt;
uint32_t server_capabilities;
@ -71,5 +74,6 @@ class MySQL_Connection {
bool get_status_compression();
bool get_status_prepared_statement();
bool get_status_user_variable();
void connect_start();
};
#endif /* __CLASS_MYSQL_CONNECTION_H */

@ -89,6 +89,13 @@ enum mysql_data_stream_status {
STATE_READING_COM_STMT_PREPARE_RESPONSE,
STATE_MARIADB_BEGIN, // dummy state
STATE_MARIADB_CONNECTING, // using MariaDB Client Library
STATE_MARIADB_PING_START,
STATE_MARIADB_PING_CONT,
STATE_MARIADB_PING_FAILURE,
STATE_MARIADB_END, // dummy state
STATE_END
/*
STATE_ONE_STRING,

@ -443,27 +443,63 @@ __get_a_backend:
__exit_DSS__STATE_NOT_INITIALIZED:
if (mybe && mybe->server_myds) {
if (mybe->server_myds->DSS > STATE_MARIADB_BEGIN && mybe->server_myds->DSS < STATE_MARIADB_END) {
MySQL_Data_Stream *myds=mybe->server_myds;
MySQL_Connection *myconn=mybe->server_myds->myconn;
int ms_status = 0;
switch (status) {
case CONNECTING_SERVER:
if (myds->revents & POLLIN) ms_status |= MYSQL_WAIT_READ;
if (myds->revents & POLLOUT) ms_status |= MYSQL_WAIT_WRITE;
if (myds->revents & POLLPRI) ms_status |= MYSQL_WAIT_EXCEPT;
if (ms_status) {
myconn->async_status = mysql_real_connect_cont(&myconn->ret_mysql, myconn->mysql, ms_status);
if (myconn->async_status==0) {
if (myconn->ret_mysql) {
myds->myds_type=MYDS_BACKEND;
myds->DSS=STATE_READY;
//mybe->myconn=server_myds->myconn;
status=WAITING_SERVER_DATA;
unsigned int k;
PtrSize_t pkt2;
for (k=0; k<myds->PSarrayOUTpending->len;) {
myds->PSarrayOUTpending->remove_index(0,&pkt2);
myds->PSarrayOUT->add(pkt2.ptr, pkt2.size);
myds->DSS=STATE_QUERY_SENT_DS;
}
//assert(0);
}
}
}
break;
default:
assert(0);
break;
}
} else {
// }
if (mybe && mybe->server_myds) {
for (j=0; j<mybe->server_myds->PSarrayIN->len;) {
mybe->server_myds->PSarrayIN->remove_index(0,&pkt);
switch (status) {
case CONNECTING_SERVER:
switch (mybe->server_myds->DSS) {
case STATE_NOT_CONNECTED:
handler___status_CONNECTING_SERVER___STATE_NOT_CONNECTED(&pkt);
break;
case STATE_CLIENT_HANDSHAKE:
handler___status_CONNECTING_SERVER___STATE_CLIENT_HANDSHAKE(&pkt, &wrong_pass);
break;
default:
assert(0);
}
break;
// case CONNECTING_SERVER:
//
// switch (mybe->server_myds->DSS) {
// case STATE_NOT_CONNECTED:
// handler___status_CONNECTING_SERVER___STATE_NOT_CONNECTED(&pkt);
// break;
// case STATE_CLIENT_HANDSHAKE:
// handler___status_CONNECTING_SERVER___STATE_CLIENT_HANDSHAKE(&pkt, &wrong_pass);
// break;
// default:
// assert(0);
//
// }
// break;
case WAITING_SERVER_DATA:
switch (mybe->server_myds->DSS) {
@ -521,7 +557,7 @@ __exit_DSS__STATE_NOT_INITIALIZED:
}
}
}
writeout();
// FIXME: see bug #211
@ -552,12 +588,14 @@ __exit_DSS__STATE_NOT_INITIALIZED:
if (mybe->server_myds->net_failure) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess:%p , MYDS:%p , myds_type=%d, DSS=%d , myconn:%p\n" , this, mybe->server_myds , mybe->server_myds->myds_type , mybe->server_myds->DSS, mybe->server_myds->myconn);
if (( mybe->server_myds->DSS==STATE_READY || mybe->server_myds->DSS==STATE_QUERY_SENT_DS ) && mybe->server_myds->myds_type==MYDS_BACKEND) {
mybe->server_myds->myconn=NULL;
//mybe->server_myds->myconn=NULL;
mybe->server_myds->detach_connection();
mybe->server_myds->DSS=STATE_NOT_INITIALIZED;
mybe->server_myds->move_from_OUT_to_OUTpending();
if (mybe->server_myds->myconn) {
MyHGM->destroy_MyConn_from_pool(mybe->server_myds->myconn);
mybe->server_myds->myconn=NULL;
//mybe->server_myds->myconn=NULL;
mybe->server_myds->detach_connection();
}
if (mybe->server_myds->fd) {
mybe->server_myds->shut_hard();
@ -699,7 +737,8 @@ void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_PING_SENT(PtrSi
mybe->server_myds->myconn->last_time_used=thread->curtime;
MyHGM->push_MyConn_to_pool(mybe->server_myds->myconn);
//MyHGM->destroy_MyConn_from_pool(mybe->server_myds->myconn);
mybe->server_myds->myconn=NULL;
//mybe->server_myds->myconn=NULL;
mybe->server_myds->detach_connection();
mybe->server_myds->unplug_backend();
}
}
@ -927,7 +966,7 @@ void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_EOF1(PtrSize_t
}
}
/*
void MySQL_Session::handler___status_CONNECTING_SERVER___STATE_NOT_CONNECTED(PtrSize_t *pkt) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Statuses: CONNECTING_SERVER - STATE_NOT_CONNECTED\n");
if (mybe->server_myds->myprot.process_pkt_initial_handshake((unsigned char *)pkt->ptr,pkt->size)==true) {
@ -996,7 +1035,7 @@ void MySQL_Session::handler___status_CONNECTING_SERVER___STATE_CLIENT_HANDSHAKE(
mybe->server_myds->myconn->reusable=false;
}
}
*/
void MySQL_Session::handler___status_CHANGING_USER_CLIENT___STATE_CLIENT_HANDSHAKE(PtrSize_t *pkt, bool *wrong_pass) {
@ -1320,7 +1359,10 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED
// Get a MySQL Connection
// if (rand()%3==0) {
mybe->server_myds->myconn=MyHGM->get_MyConn_from_pool(mybe->hostgroup_id);
MySQL_Connection *mc=MyHGM->get_MyConn_from_pool(mybe->hostgroup_id);
if (mc) {
mybe->server_myds->attach_connection(mc);
}
// }
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p -- server_myds=%p -- MySQL_Connection %p\n", this, mybe->server_myds, mybe->server_myds->myconn);
if (mybe->server_myds->myconn==NULL) {
@ -1336,6 +1378,27 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED
// we didn't get a valid connection, we need to create one
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p -- MySQL Connection has no FD\n", this);
int __fd;
MySQL_Connection *myconn=mybe->server_myds->myconn;
myconn->mysql=mysql_init(NULL);
assert(myconn->mysql);
mysql_options(myconn->mysql, MYSQL_OPT_NONBLOCK, 0);
myconn->userinfo->set(client_myds->myconn->userinfo);
// FIXME: set client_flags
//mybe->server_myds->myconn->connect_start();
//mybe->server_myds->fd=myconn->fd;
if (myconn->parent->port) {
myconn->async_status=mysql_real_connect_start(&myconn->ret_mysql,myconn->mysql, myconn->parent->address, myconn->userinfo->username, myconn->userinfo->password, myconn->userinfo->schemaname, myconn->parent->port, NULL, 0);
} else {
myconn->async_status=mysql_real_connect_start(&myconn->ret_mysql,myconn->mysql, "localhost", myconn->userinfo->username, myconn->userinfo->password, myconn->userinfo->schemaname, myconn->parent->port, myconn->parent->address, 0);
}
myconn->fd=mysql_get_socket(myconn->mysql);
mybe->server_myds->fd=myconn->fd;
mybe->server_myds->DSS=STATE_MARIADB_CONNECTING;
status=CONNECTING_SERVER;
/*
__fd=mybe->server_myds->myds_connect(mybe->server_myds->myconn->parent->address, mybe->server_myds->myconn->parent->port, &pending_connect);
if (__fd==-1) {
@ -1349,6 +1412,7 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED
mybe->server_myds->myconn->fd=mybe->server_myds->fd;
status=CONNECTING_SERVER;
mybe->server_myds->DSS=STATE_NOT_CONNECTED;
*/
} else {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p -- MySQL Connection found = %p\n", this, mybe->server_myds->myconn);
mybe->server_myds->assign_fd_from_mysql_conn();

@ -835,8 +835,10 @@ MySQL_Session * MySQL_Thread::create_new_session_and_client_data_stream(int _fd)
//sess->myprot_client.dump_pkt=true;
sess->client_myds->myprot.dump_pkt=true;
#endif
sess->client_myds->myconn=new MySQL_Connection();
MySQL_Connection *myconn=sess->client_myds->myconn;
//sess->client_myds->myconn=new MySQL_Connection();
//MySQL_Connection *myconn=sess->client_myds->myconn;
MySQL_Connection *myconn=new MySQL_Connection;
sess->client_myds->attach_connection(myconn);
//myconn=new MySQL_Connection(); // 20141011
// if (mysql_thread___have_compress) {
// myconn->options.compression_min_length=50;
@ -987,7 +989,14 @@ void MySQL_Thread::run() {
for (n = 0; n < mypolls.len; n++) {
mypolls.fds[n].revents=0;
if (mypolls.myds[n] && mypolls.myds[n]->myds_type!=MYDS_LISTENER && mypolls.myds[n]->myds_type!=MYDS_BACKEND_PAUSE_CONNECT) {
mypolls.myds[n]->set_pollout();
// mypolls.myds[n]->set_pollout();
if (mypolls.myds[n]->DSS > STATE_MARIADB_BEGIN && mypolls.myds[n]->DSS < STATE_MARIADB_END) {
mypolls.fds[n].events = POLLIN;
if (mypolls.myds[n]->myconn->async_status & MYSQL_WAIT_WRITE)
mypolls.fds[n].events |= POLLOUT;
} else {
mypolls.myds[n]->set_pollout();
}
}
}
@ -1060,6 +1069,8 @@ void MySQL_Thread::run() {
}
if (mypolls.fds[n].revents==0) {
/*
// FIXME: this logic was removed completely because we added mariadb client library. Yet, we need to implement a way to manage connection timeout
switch(myds->myds_type) {
case MYDS_BACKEND_NOT_CONNECTED:
myds_backend_set_failed_connect(myds,n);
@ -1072,11 +1083,13 @@ void MySQL_Thread::run() {
continue;
break;
}
*/
} else {
// check if the FD is valid
assert(mypolls.fds[n].revents!=POLLNVAL);
switch(myds->myds_type) {
/*
// FIXME: this logic was removed completely because we added mariadb client library. Yet, we need to implement a way to manage connection timeout
case MYDS_BACKEND_NOT_CONNECTED:
// if (myds->myds_type==MYDS_BACKEND_NOT_CONNECTED && mypolls.fds[n].revents) {
if ( (mypolls.fds[n].revents & POLLERR) || (mypolls.fds[n].revents & POLLHUP) ) {
@ -1089,6 +1102,7 @@ void MySQL_Thread::run() {
myds_backend_first_packet_after_connect(myds, n);
}
break;
*/
case MYDS_LISTENER:
// we got a new connection!
listener_handle_new_connection(myds,n);
@ -1117,9 +1131,11 @@ void MySQL_Thread::run() {
void MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned int n) {
mypolls.last_recv[n]=curtime;
myds->revents=mypolls.fds[n].revents;
myds->read_from_net();
myds->read_pkts();
if (mypolls.myds[n]->DSS < STATE_MARIADB_BEGIN || mypolls.myds[n]->DSS > STATE_MARIADB_END) {
// only if we aren't using MariaDB Client Library
myds->read_from_net();
myds->read_pkts();
}
if ( (mypolls.fds[n].events & POLLOUT)
&&
( (mypolls.fds[n].revents & POLLERR) || (mypolls.fds[n].revents & POLLHUP) )
@ -1302,7 +1318,7 @@ void MySQL_Thread::unregister_session_connection_handler(int idx) {
mysql_sessions_connections_handler->remove_index_fast(idx);
}
/*
void MySQL_Thread::myds_backend_set_failed_connect(MySQL_Data_Stream *myds, unsigned int n) {
if (curtime>mypolls.last_recv[n]+10000000) {
proxy_error("connect() timeout . curtime: %llu , last_recv: %llu , failed after %lluus . fd: %d , myds_type: %s\n", curtime, mypolls.last_recv[n] , (curtime-mypolls.last_recv[n]) , myds->fd, (myds->myds_type==MYDS_BACKEND_PAUSE_CONNECT ? "MYDS_BACKEND_PAUSE_CONNECT" : "MYDS_BACKEND_NOT_CONNECTED" ) );
@ -1333,6 +1349,7 @@ void MySQL_Thread::myds_backend_first_packet_after_connect(MySQL_Data_Stream *my
myds->sess->pause=curtime+10000000;
}
}
*/
void MySQL_Thread::listener_handle_new_connection(MySQL_Data_Stream *myds, unsigned int n) {
int c;

@ -101,6 +101,8 @@ bool MySQL_Connection_userinfo::set_schemaname(char *_new, int l) {
MySQL_Connection::MySQL_Connection() {
//memset(&myconn,0,sizeof(MYSQL));
mysql=NULL;
ret_mysql=NULL;
myds=NULL;
inserted_into_pool=0;
reusable=false;
@ -124,7 +126,11 @@ MySQL_Connection::~MySQL_Connection() {
delete userinfo;
userinfo=NULL;
}
if (myds) {
if (mysql) {
mysql_close(mysql);
mysql=NULL;
}
if (myds) { // FIXME: with the use of mysql client library , this part should be gone
myds->shut_hard();
} else {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "MySQL_Connection %p , fd:%d\n", this, fd);
@ -192,3 +198,14 @@ bool MySQL_Connection::get_status_user_variable() {
bool MySQL_Connection::get_status_prepared_statement() {
return status_flags & STATUS_MYSQL_CONNECTION_PREPARED_STATEMENT;
}
// non blocking API
void MySQL_Connection::connect_start() {
if (parent->port) {
async_status=mysql_real_connect_start(&ret_mysql, mysql, parent->address, userinfo->username, userinfo->password, userinfo->schemaname, parent->port, NULL, 0);
} else {
async_status=mysql_real_connect_start(&ret_mysql, mysql, "localhost", userinfo->username, userinfo->password, userinfo->schemaname, parent->port, parent->address, 0);
}
fd=mysql_get_socket(mysql);
}

@ -18,7 +18,7 @@ admin_variables=
mysql_variables=
{
threads=1
threads=4
//threads=32
have_compress=true
poll_timeout=2000

Loading…
Cancel
Save