Handling mysql_select_db() and mysql_set_character_set() for backends using non blocking API

pull/317/head
René Cannaò 11 years ago
parent 10b5f6da6d
commit 271afbb02c

1
deps/Makefile vendored

@ -41,6 +41,7 @@ mariadb-client-library/mariadb_client/include/my_config.h:
cd mariadb-client-library && tar -zxf mariadb-connector-c-2.1.0-src.tar.gz
cd mariadb-client-library/mariadb_client && cmake .
cd mariadb-client-library/mariadb_client && patch libmariadb/libmariadb.c < ../libmariadb.c.patch
cd mariadb-client-library/mariadb_client && patch include/mysql.h < ../mysql.h.patch
cd mariadb-client-library/mariadb_client && make
# cd mariadb-client-library/mariadb_client/include && make my_config.h

@ -0,0 +1,9 @@
@@ -420,6 +420,8 @@
unsigned long clientflag);
void STDCALL mysql_close(MYSQL *sock);
int STDCALL mysql_select_db(MYSQL *mysql, const char *db);
+int STDCALL mysql_select_db_start(int *ret, MYSQL *mysql, const char *db);
+int STDCALL mysql_select_db_cont(int *ret, MYSQL *mysql, int ready_status);
int STDCALL mysql_query(MYSQL *mysql, const char *q);
int STDCALL mysql_send_query(MYSQL *mysql, const char *q,
unsigned long length);

@ -35,9 +35,9 @@ class Query_Info {
class MySQL_Session
{
private:
bool handler___status_CHANGING_SCHEMA(PtrSize_t *);
// bool handler___status_CHANGING_SCHEMA(PtrSize_t *);
bool handler___status_CHANGING_USER_SERVER(PtrSize_t *);
bool handler___status_CHANGING_CHARSET(PtrSize_t *);
// bool handler___status_CHANGING_CHARSET(PtrSize_t *);
void handler___status_WAITING_SERVER_DATA___STATE_QUERY_SENT(PtrSize_t *);
// void handler___status_WAITING_SERVER_DATA___STATE_PING_SENT(PtrSize_t *);
void handler___status_WAITING_SERVER_DATA___STATE_ROW(PtrSize_t *);

@ -82,6 +82,10 @@ class MySQL_Connection {
void connect_cont(short event);
void ping_start();
void ping_cont(short event);
void set_names_start();
void set_names_cont(short event);
void initdb_start();
void initdb_cont(short event);
MDB_ASYNC_ST handler(short event);
void next_event(MDB_ASYNC_ST new_st);
};

@ -21,7 +21,17 @@ enum MDB_ASYNC_ST { // MariaDB Async State Machine
ASYNC_PING_CONT,
ASYNC_PING_END,
ASYNC_PING_SUCCESSFUL,
ASYNC_PING_FAILED
ASYNC_PING_FAILED,
ASYNC_SET_NAMES_START,
ASYNC_SET_NAMES_CONT,
ASYNC_SET_NAMES_END,
ASYNC_SET_NAMES_SUCCESSFUL,
ASYNC_SET_NAMES_FAILED,
ASYNC_INITDB_START,
ASYNC_INITDB_CONT,
ASYNC_INITDB_END,
ASYNC_INITDB_SUCCESSFUL,
ASYNC_INITDB_FAILED
};
// list of possible debugging modules
@ -106,6 +116,8 @@ enum mysql_data_stream_status {
STATE_MARIADB_BEGIN, // dummy state
STATE_MARIADB_CONNECTING, // using MariaDB Client Library
STATE_MARIADB_PING,
STATE_MARIADB_SET_NAMES,
STATE_MARIADB_INITDB,
STATE_MARIADB_END, // dummy state
STATE_END

@ -1,6 +1,7 @@
#include "proxysql.h"
#include "cpp.h"
extern Query_Processor *GloQPro;
extern Query_Cache *GloQC;
extern ProxySQL_Admin *GloAdmin;
@ -503,6 +504,48 @@ __exit_DSS__STATE_NOT_INITIALIZED:
}
}
break;
case CHANGING_SCHEMA:
if (myds->revents) {
myconn->handler(myds->revents);
if (myconn->async_state_machine==ASYNC_INITDB_SUCCESSFUL) {
myds->DSS=STATE_READY;
status=WAITING_SERVER_DATA;
unsigned int k;
PtrSize_t pkt2;
for (k=0; k<mybe->server_myds->PSarrayOUTpending->len;) {
myds->PSarrayOUTpending->remove_index(0,&pkt2);
myds->PSarrayOUT->add(pkt2.ptr, pkt2.size);
myds->DSS=STATE_QUERY_SENT_DS;
}
}
if (myconn->async_state_machine==ASYNC_INITDB_FAILED) {
set_unhealthy();
myds->myconn->reusable=false;
return -1;
}
}
break;
case CHANGING_CHARSET:
if (myds->revents) {
myconn->handler(myds->revents);
if (myconn->async_state_machine==ASYNC_SET_NAMES_SUCCESSFUL) {
myds->DSS=STATE_READY;
status=WAITING_SERVER_DATA;
unsigned int k;
PtrSize_t pkt2;
for (k=0; k<mybe->server_myds->PSarrayOUTpending->len;) {
myds->PSarrayOUTpending->remove_index(0,&pkt2);
myds->PSarrayOUT->add(pkt2.ptr, pkt2.size);
myds->DSS=STATE_QUERY_SENT_DS;
}
}
if (myconn->async_state_machine==ASYNC_SET_NAMES_FAILED) {
set_unhealthy();
myds->myconn->reusable=false;
return -1;
}
}
break;
default:
assert(0);
break;
@ -543,11 +586,11 @@ __exit_DSS__STATE_NOT_INITIALIZED:
}
break;
case CHANGING_SCHEMA:
if (handler___status_CHANGING_SCHEMA(&pkt)==false) {
return -1;
}
break;
// case CHANGING_SCHEMA:
// if (handler___status_CHANGING_SCHEMA(&pkt)==false) {
// return -1;
// }
// break;
case CHANGING_USER_SERVER:
if (handler___status_CHANGING_USER_SERVER(&pkt)==false) {
@ -555,11 +598,11 @@ __exit_DSS__STATE_NOT_INITIALIZED:
}
break;
case CHANGING_CHARSET:
if (handler___status_CHANGING_CHARSET(&pkt)==false) {
return -1;
}
break;
// case CHANGING_CHARSET:
// if (handler___status_CHANGING_CHARSET(&pkt)==false) {
// return -1;
// }
// break;
case FAST_FORWARD:
client_myds->PSarrayOUT->add(pkt.ptr, pkt.size);
@ -651,34 +694,34 @@ __exit_DSS__STATE_NOT_INITIALIZED:
}
bool MySQL_Session::handler___status_CHANGING_SCHEMA(PtrSize_t *pkt) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Statuses: CHANGING_SCHEMA - UNKNWON\n");
if (mybe->server_myds->myprot.process_pkt_OK((unsigned char *)pkt->ptr,pkt->size)==true) {
l_free(pkt->size,pkt->ptr);
mybe->server_myds->DSS=STATE_READY;
//mybe->myconn=server_myds->myconn;
status=WAITING_SERVER_DATA;
unsigned int k;
PtrSize_t pkt2;
for (k=0; k<mybe->server_myds->PSarrayOUTpending->len;) {
mybe->server_myds->PSarrayOUTpending->remove_index(0,&pkt2);
mybe->server_myds->PSarrayOUT->add(pkt2.ptr, pkt2.size);
mybe->server_myds->DSS=STATE_QUERY_SENT_DS;
}
// set prepared statement processing
mybe->server_myds->myconn->processing_prepared_statement_prepare=client_myds->myconn->processing_prepared_statement_prepare;
return true;
} else {
l_free(pkt->size,pkt->ptr);
set_unhealthy();
//mybe->myconn=server_myds->myconn;
// if we reach here, server_myds->DSS should be STATE_QUERY_SENT , therefore the connection to the backend should be dropped anyway
// although we enforce this here
mybe->server_myds->myconn->reusable=false;
return false;
}
return false;
}
//bool MySQL_Session::handler___status_CHANGING_SCHEMA(PtrSize_t *pkt) {
// proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Statuses: CHANGING_SCHEMA - UNKNWON\n");
// if (mybe->server_myds->myprot.process_pkt_OK((unsigned char *)pkt->ptr,pkt->size)==true) {
// l_free(pkt->size,pkt->ptr);
// mybe->server_myds->DSS=STATE_READY;
// //mybe->myconn=server_myds->myconn;
// status=WAITING_SERVER_DATA;
// unsigned int k;
// PtrSize_t pkt2;
// for (k=0; k<mybe->server_myds->PSarrayOUTpending->len;) {
// mybe->server_myds->PSarrayOUTpending->remove_index(0,&pkt2);
// mybe->server_myds->PSarrayOUT->add(pkt2.ptr, pkt2.size);
// mybe->server_myds->DSS=STATE_QUERY_SENT_DS;
// }
// // set prepared statement processing
// mybe->server_myds->myconn->processing_prepared_statement_prepare=client_myds->myconn->processing_prepared_statement_prepare;
// return true;
// } else {
// l_free(pkt->size,pkt->ptr);
// set_unhealthy();
// //mybe->myconn=server_myds->myconn;
// // if we reach here, server_myds->DSS should be STATE_QUERY_SENT , therefore the connection to the backend should be dropped anyway
// // although we enforce this here
// mybe->server_myds->myconn->reusable=false;
// return false;
// }
// return false;
//}
bool MySQL_Session::handler___status_CHANGING_USER_SERVER(PtrSize_t *pkt) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Statuses: CHANGING_USER_SERVER - UNKNWON\n");
@ -709,34 +752,34 @@ bool MySQL_Session::handler___status_CHANGING_USER_SERVER(PtrSize_t *pkt) {
return false;
}
bool MySQL_Session::handler___status_CHANGING_CHARSET(PtrSize_t *pkt) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Statuses: CHANGING_CHARSET - UNKNWON\n");
if (mybe->server_myds->myprot.process_pkt_OK((unsigned char *)pkt->ptr,pkt->size)==true) {
l_free(pkt->size,pkt->ptr);
mybe->server_myds->DSS=STATE_READY;
//mybe->myconn=server_myds->myconn;
status=WAITING_SERVER_DATA;
unsigned int k;
PtrSize_t pkt2;
for (k=0; k<mybe->server_myds->PSarrayOUTpending->len;) {
mybe->server_myds->PSarrayOUTpending->remove_index(0,&pkt2);
mybe->server_myds->PSarrayOUT->add(pkt2.ptr, pkt2.size);
mybe->server_myds->DSS=STATE_QUERY_SENT_DS;
}
// set prepared statement processing
mybe->server_myds->myconn->processing_prepared_statement_prepare=client_myds->myconn->processing_prepared_statement_prepare;
return true;
} else {
l_free(pkt->size,pkt->ptr);
set_unhealthy();
//mybe->myconn=server_myds->myconn;
// if we reach here, server_myds->DSS should be STATE_QUERY_SENT , therefore the connection to the backend should be dropped anyway
// although we enforce this here
mybe->server_myds->myconn->reusable=false;
return false;
}
return false;
}
//bool MySQL_Session::handler___status_CHANGING_CHARSET(PtrSize_t *pkt) {
// proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Statuses: CHANGING_CHARSET - UNKNWON\n");
// if (mybe->server_myds->myprot.process_pkt_OK((unsigned char *)pkt->ptr,pkt->size)==true) {
// l_free(pkt->size,pkt->ptr);
// mybe->server_myds->DSS=STATE_READY;
// //mybe->myconn=server_myds->myconn;
// status=WAITING_SERVER_DATA;
// unsigned int k;
// PtrSize_t pkt2;
// for (k=0; k<mybe->server_myds->PSarrayOUTpending->len;) {
// mybe->server_myds->PSarrayOUTpending->remove_index(0,&pkt2);
// mybe->server_myds->PSarrayOUT->add(pkt2.ptr, pkt2.size);
// mybe->server_myds->DSS=STATE_QUERY_SENT_DS;
// }
// // set prepared statement processing
// mybe->server_myds->myconn->processing_prepared_statement_prepare=client_myds->myconn->processing_prepared_statement_prepare;
// return true;
// } else {
// l_free(pkt->size,pkt->ptr);
// set_unhealthy();
// //mybe->myconn=server_myds->myconn;
// // if we reach here, server_myds->DSS should be STATE_QUERY_SENT , therefore the connection to the backend should be dropped anyway
// // although we enforce this here
// mybe->server_myds->myconn->reusable=false;
// return false;
// }
// return false;
//}
//void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_PING_SENT(PtrSize_t *pkt) {
@ -1379,20 +1422,29 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED
void MySQL_Session::handler___client_DSS_QUERY_SENT___send_INIT_DB_to_backend() {
mybe->server_myds->move_from_OUT_to_OUTpending();
//userinfo_server.set_schemaname(userinfo_client.schemaname,strlen(userinfo_client.schemaname));
mybe->server_myds->myconn->userinfo->set_schemaname(client_myds->myconn->userinfo->schemaname,strlen(client_myds->myconn->userinfo->schemaname));
//myprot_server.generate_COM_INIT_DB(true,NULL,NULL,userinfo_server.schemaname);
mybe->server_myds->myprot.generate_COM_INIT_DB(true,NULL,NULL,mybe->server_myds->myconn->userinfo->schemaname);
mybe->server_myds->DSS=STATE_QUERY_SENT_DS;
status=CHANGING_SCHEMA;
mybe->server_myds->DSS=STATE_MARIADB_INITDB;
mybe->server_myds->myconn->async_state_machine=ASYNC_INITDB_START;
mybe->server_myds->myconn->handler(0);
// mybe->server_myds->move_from_OUT_to_OUTpending();
// //userinfo_server.set_schemaname(userinfo_client.schemaname,strlen(userinfo_client.schemaname));
// mybe->server_myds->myconn->userinfo->set_schemaname(client_myds->myconn->userinfo->schemaname,strlen(client_myds->myconn->userinfo->schemaname));
// //myprot_server.generate_COM_INIT_DB(true,NULL,NULL,userinfo_server.schemaname);
// mybe->server_myds->myprot.generate_COM_INIT_DB(true,NULL,NULL,mybe->server_myds->myconn->userinfo->schemaname);
// mybe->server_myds->DSS=STATE_QUERY_SENT_DS;
// status=CHANGING_SCHEMA;
}
void MySQL_Session::handler___client_DSS_QUERY_SENT___send_SET_NAMES_to_backend() {
mybe->server_myds->move_from_OUT_to_OUTpending();
mybe->server_myds->myconn->set_charset(client_myds->myconn->options.charset);
mybe->server_myds->myprot.generate_COM_QUERY(true,NULL,NULL,(char *)"SET NAMES utf8");
mybe->server_myds->DSS=STATE_QUERY_SENT_DS;
status=CHANGING_SCHEMA;
// mybe->server_myds->myprot.generate_COM_QUERY(true,NULL,NULL,(char *)"SET NAMES utf8");
// mybe->server_myds->DSS=STATE_QUERY_SENT_DS;
mybe->server_myds->DSS=STATE_MARIADB_SET_NAMES;
mybe->server_myds->myconn->async_state_machine=ASYNC_SET_NAMES_START;
mybe->server_myds->myconn->handler(0);
status=CHANGING_CHARSET;
}
void MySQL_Session::handler___client_DSS_QUERY_SENT___send_CHANGE_USER_to_backend() {

@ -2,6 +2,10 @@
#include "cpp.h"
#include "SpookyV2.h"
// Bug https://mariadb.atlassian.net/browse/CONC-136
//int STDCALL mysql_select_db_start(int *ret, MYSQL *mysql, const char *db);
//int STDCALL mysql_select_db_cont(int *ret, MYSQL *mysql, int ready_status);
/*
void * MySQL_Connection::operator new(size_t size) {
return l_alloc(size);
@ -239,6 +243,23 @@ void MySQL_Connection::ping_cont(short event) {
async_exit_status = mysql_ping_cont(&interr,mysql, mysql_status(event));
}
void MySQL_Connection::initdb_start() {
async_exit_status = mysql_select_db_start(&interr,mysql,userinfo->schemaname);
}
void MySQL_Connection::initdb_cont(short event) {
async_exit_status = mysql_select_db_cont(&interr,mysql, mysql_status(event));
}
// FIXME: UTF8 is hardcoded for now, needs to be dynamic
void MySQL_Connection::set_names_start() {
async_exit_status = mysql_set_character_set_start(&interr,mysql,"UTF8");
}
void MySQL_Connection::set_names_cont(short event) {
async_exit_status = mysql_set_character_set_cont(&interr,mysql, mysql_status(event));
}
#define NEXT_IMMEDIATE(new_st) do { async_state_machine = new_st; goto handler_again; } while (0)
MDB_ASYNC_ST MySQL_Connection::handler(short event) {
@ -304,6 +325,62 @@ handler_again:
break;
case ASYNC_PING_FAILED:
break;
case ASYNC_SET_NAMES_START:
set_names_start();
if (async_exit_status) {
next_event(ASYNC_SET_NAMES_CONT);
} else {
NEXT_IMMEDIATE(ASYNC_SET_NAMES_END);
}
break;
case ASYNC_SET_NAMES_CONT:
set_names_cont(event);
if (async_exit_status) {
next_event(ASYNC_SET_NAMES_CONT);
} else {
NEXT_IMMEDIATE(ASYNC_SET_NAMES_END);
}
break;
case ASYNC_SET_NAMES_END:
if (interr) {
NEXT_IMMEDIATE(ASYNC_SET_NAMES_FAILED);
} else {
NEXT_IMMEDIATE(ASYNC_SET_NAMES_SUCCESSFUL);
}
break;
case ASYNC_SET_NAMES_SUCCESSFUL:
break;
case ASYNC_SET_NAMES_FAILED:
fprintf(stderr,"%s\n",mysql_error(mysql));
break;
case ASYNC_INITDB_START:
initdb_start();
if (async_exit_status) {
next_event(ASYNC_INITDB_CONT);
} else {
NEXT_IMMEDIATE(ASYNC_INITDB_END);
}
break;
case ASYNC_INITDB_CONT:
initdb_cont(event);
if (async_exit_status) {
next_event(ASYNC_INITDB_CONT);
} else {
NEXT_IMMEDIATE(ASYNC_INITDB_END);
}
break;
case ASYNC_INITDB_END:
if (interr) {
NEXT_IMMEDIATE(ASYNC_INITDB_FAILED);
} else {
NEXT_IMMEDIATE(ASYNC_INITDB_SUCCESSFUL);
}
break;
case ASYNC_INITDB_SUCCESSFUL:
break;
case ASYNC_INITDB_FAILED:
fprintf(stderr,"%s\n",mysql_error(mysql));
break;
default:
assert(0); //we should never reach here
break;

Loading…
Cancel
Save