Further attempt to use MariaDB Client Library

pull/317/head
René Cannaò 11 years ago
parent f725182050
commit 6c44e9d96f

@ -84,6 +84,11 @@ class MySQL_Data_Stream
bool net_failure;
struct {
char *ptr;
unsigned int size;
} mysql_real_query;
MySQL_Data_Stream();
~MySQL_Data_Stream();

@ -65,7 +65,7 @@ class MySQL_Protocol {
bool generate_pkt_column_count(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, uint64_t count);
// bool generate_pkt_field(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len, uint8_t sequence_id, char *schema, char *table, char *org_table, char *name, char *org_name, uint16_t charset, uint32_t column_length, uint8_t type, uint16_t flags, uint8_t decimals, bool field_list, uint64_t defvalue_length, char *defvalue);
bool generate_pkt_field(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, char *schema, char *table, char *org_table, char *name, char *org_name, uint16_t charset, uint32_t column_length, uint8_t type, uint16_t flags, uint8_t decimals, bool field_list, uint64_t defvalue_length, char *defvalue);
bool generate_pkt_row(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, int colnums, int *fieldslen, char **fieldstxt);
bool generate_pkt_row(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt);
// bool generate_pkt_initial_handshake(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len);
bool generate_pkt_initial_handshake(bool send, void **ptr, unsigned int *len);
// bool generate_pkt_handshake_response(MySQL_Data_Stream *myds, bool send, void **ptr, unsigned int *len);

@ -129,6 +129,7 @@ class MySQL_Session
MySQL_Backend * find_or_create_backend(int, MySQL_Data_Stream *_myds=NULL);
void SQLite3_to_MySQL(SQLite3_result *, char *, int , MySQL_Protocol *);
void MySQL_Result_to_MySQL_wire(MYSQL *mysql, MYSQL_RES *result, MySQL_Protocol *myprot);
SQLite3_result * SQL3_Session_status();
void reset_all_backends();

@ -41,6 +41,11 @@ class MySQL_Connection {
MDB_ASYNC_ST async_state_machine; // Async state machine
MYSQL *mysql;
MYSQL *ret_mysql;
MYSQL_RES *mysql_result;
struct {
char *ptr;
unsigned long length;
} query;
struct {
uint32_t max_allowed_pkt;
uint32_t server_capabilities;
@ -84,8 +89,13 @@ class MySQL_Connection {
void ping_cont(short event);
void set_names_start();
void set_names_cont(short event);
void real_query_start();
void real_query_cont(short event);
void store_result_start();
void store_result_cont(short event);
void initdb_start();
void initdb_cont(short event);
void set_query(char *stmt, unsigned long length);
MDB_ASYNC_ST handler(short event);
void next_event(MDB_ASYNC_ST new_st);
};

@ -27,6 +27,11 @@ enum MDB_ASYNC_ST { // MariaDB Async State Machine
ASYNC_SET_NAMES_END,
ASYNC_SET_NAMES_SUCCESSFUL,
ASYNC_SET_NAMES_FAILED,
ASYNC_QUERY_START,
ASYNC_QUERY_CONT,
ASYNC_QUERY_END,
ASYNC_STORE_RESULT_START,
ASYNC_STORE_RESULT_CONT,
ASYNC_INITDB_START,
ASYNC_INITDB_CONT,
ASYNC_INITDB_END,
@ -82,6 +87,7 @@ enum session_status {
PINGING_SERVER,
WAITING_CLIENT_DATA,
WAITING_SERVER_DATA,
PROCESSING_QUERY,
CHANGING_SCHEMA,
CHANGING_CHARSET,
CHANGING_USER_CLIENT,
@ -118,6 +124,7 @@ enum mysql_data_stream_status {
STATE_MARIADB_PING,
STATE_MARIADB_SET_NAMES,
STATE_MARIADB_INITDB,
STATE_MARIADB_QUERY,
STATE_MARIADB_END, // dummy state
STATE_END

@ -1102,7 +1102,7 @@ bool MySQL_Protocol::generate_pkt_field(bool send, void **ptr, unsigned int *len
}
bool MySQL_Protocol::generate_pkt_row(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, int colnums, int *fieldslen, char **fieldstxt) {
bool MySQL_Protocol::generate_pkt_row(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt) {
int col=0;
int rowlen=0;
for (col=0; col<colnums; col++) {

@ -1,6 +1,7 @@
#include "proxysql.h"
#include "cpp.h"
#define EXPMARIA
extern Query_Processor *GloQPro;
extern Query_Cache *GloQC;
@ -292,7 +293,18 @@ int MySQL_Session::handler() {
//if (server_myds!=mybe->server_myds) {
// server_myds=mybe->server_myds;
//}
#ifdef EXPMARIA
proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Received query to be processed with MariaDB Client library\n");
mybe->server_myds->mysql_real_query.size=pkt.size-5;
mybe->server_myds->mysql_real_query.ptr=(char *)malloc(pkt.size-5);
memcpy(mybe->server_myds->mysql_real_query.ptr,(char *)pkt.ptr+5,pkt.size-5);
l_free(pkt.size,pkt.ptr);
#else
mybe->server_myds->PSarrayOUT->add(pkt.ptr, pkt.size);
#endif /* EXPMARIA */
client_myds->setDSS_STATE_QUERY_SENT_NET();
} else {
// this is processed by the admin module
@ -365,6 +377,7 @@ __get_a_backend:
if (status!=FAST_FORWARD && client_myds->DSS==STATE_QUERY_SENT_NET) {
// the client has completely sent the query, now we should handle it server side
//
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, client_myds->DSS==STATE_QUERY_SENT_NET\n", this);
if (mybe && mybe->server_myds->DSS==STATE_NOT_INITIALIZED) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, client_myds->DSS==STATE_QUERY_SENT_NET , server_myds==STATE_NOT_INITIALIZED\n", this);
// DSS is STATE_NOT_INITIALIZED. It means we are not connected to any server
@ -423,17 +436,30 @@ __get_a_backend:
handler___client_DSS_QUERY_SENT___send_INIT_DB_to_backend();
}
} else {
#ifndef EXPMARIA
if (client_myds->myconn->options.charset!=mybe->server_myds->myconn->options.charset /* FIXME: this was for debugging only */ /*|| rand()%3==0 */) {
handler___client_DSS_QUERY_SENT___send_SET_NAMES_to_backend();
} else {
#endif
//server_myds->PSarrayOUT->add(pkt.ptr, pkt.size);
#ifdef EXPMARIA
MySQL_Data_Stream *myds=mybe->server_myds;
myds->DSS=STATE_MARIADB_QUERY;
status=PROCESSING_QUERY;
myds->myconn->async_state_machine=ASYNC_QUERY_START;
myds->myconn->set_query(myds->mysql_real_query.ptr,myds->mysql_real_query.size);
myds->myconn->handler(0);
#else
mybe->server_myds->DSS=STATE_QUERY_SENT_DS;
// if (client_myds->myconn->processing_prepared_statement) {
mybe->server_myds->myconn->processing_prepared_statement_prepare=client_myds->myconn->processing_prepared_statement_prepare;
mybe->server_myds->myconn->processing_prepared_statement_execute=client_myds->myconn->processing_prepared_statement_execute;
// }
status=WAITING_SERVER_DATA;
#endif /* EXPMARIA */
#ifndef EXPMARIA
}
#endif
}
}
// } TRY #1
@ -485,6 +511,33 @@ __exit_DSS__STATE_NOT_INITIALIZED:
// }
}
break;
case PROCESSING_QUERY:
if (myds->revents) {
myconn->handler(myds->revents);
if (myconn->async_state_machine==ASYNC_QUERY_END) {
// status=WAITING_SERVER_DATA;
// myds->DSS=STATE_READY;
/* multi-plexing attempt */
if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) {
myds->myconn->last_time_used=thread->curtime;
MyHGM->push_MyConn_to_pool(myds->myconn);
//MyHGM->destroy_MyConn_from_pool(mybe->server_myds->myconn);
//mybe->server_myds->myconn=NULL;
myds->detach_connection();
myds->unplug_backend();
}
/* multi-plexing attempt */
//status=NONE;
MySQL_Result_to_MySQL_wire(myconn->mysql,myconn->mysql_result,&client_myds->myprot);
mysql_free_result(myconn->mysql_result);
myconn->mysql_result=NULL;
myds->DSS=STATE_NOT_INITIALIZED;
status=WAITING_CLIENT_DATA;
client_myds->DSS=STATE_SLEEP;
}
}
break;
case PINGING_SERVER:
if (myds->revents) {
myconn->handler(myds->revents);
@ -529,8 +582,16 @@ __exit_DSS__STATE_NOT_INITIALIZED:
if (myds->revents) {
myconn->handler(myds->revents);
if (myconn->async_state_machine==ASYNC_SET_NAMES_SUCCESSFUL) {
#ifdef EXPMARIA
myds->DSS=STATE_MARIADB_QUERY;
status=PROCESSING_QUERY;
myds->myconn->async_state_machine=ASYNC_QUERY_START;
myds->myconn->set_query(myds->mysql_real_query.ptr,myds->mysql_real_query.size);
myds->myconn->handler(0);
#else
myds->DSS=STATE_READY;
status=WAITING_SERVER_DATA;
#endif /* EXPMARIA */
unsigned int k;
PtrSize_t pkt2;
for (k=0; k<mybe->server_myds->PSarrayOUTpending->len;) {
@ -1456,6 +1517,64 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___send_CHANGE_USER_to_backen
}
void MySQL_Session::MySQL_Result_to_MySQL_wire(MYSQL *mysql, MYSQL_RES *result, MySQL_Protocol *myprot) {
assert(myprot);
MySQL_Data_Stream *myds=myprot->get_myds();
myds->DSS=STATE_QUERY_SENT_DS;
int sid=1;
unsigned int num_fields=mysql_field_count(mysql);
unsigned int num_rows;
if (result) {
// we have a result set, this should be a SELECT statement with result
assert(result->current_field==0);
myprot->generate_pkt_column_count(true,NULL,NULL,sid,num_fields); sid++;
for (unsigned int i=0; i<num_fields; i++) {
MYSQL_FIELD *field=mysql_fetch_field(result);
myprot->generate_pkt_field(true,NULL,NULL,sid,field->db,field->table,field->org_table,field->name,field->org_name,field->charsetnr,field->length,field->type,field->flags,field->decimals,false,0,NULL);
sid++;
}
myds->DSS=STATE_COLUMN_DEFINITION;
num_rows=mysql_num_rows(result);
myprot->generate_pkt_EOF(true,NULL,NULL,sid,0,0); sid++;
//char **p=(char **)malloc(sizeof(char*)*num_fields);
//int *l=(int *)malloc(sizeof(int*)*num_fields);
//p[0]="column test";
for (unsigned int r=0; r<num_rows; r++) {
MYSQL_ROW row=mysql_fetch_row(result);
unsigned long *lengths=mysql_fetch_lengths(result);
//
// for (int i=0; i<num_fields; i++) {
// l[i]=result->rows[r]->sizes[i];
// p[i]=result->rows[r]->fields[i];
// }
myprot->generate_pkt_row(true,NULL,NULL,sid,num_fields,lengths,row); sid++;
}
myds->DSS=STATE_ROW;
myprot->generate_pkt_EOF(true,NULL,NULL,sid,0,2); sid++;
myds->DSS=STATE_SLEEP;
//free(l);
//free(p);
} else { // no result set
if (num_fields) {
num_rows = mysql_affected_rows(mysql);
myprot->generate_pkt_OK(true,NULL,NULL,sid,num_rows,mysql->insert_id,0,mysql->warning_count,mysql->info);
} else {
// error
char sqlstate[10];
sprintf(sqlstate,"#%s",mysql_sqlstate(mysql));
myprot->generate_pkt_ERR(true,NULL,NULL,sid,mysql_errno(mysql),sqlstate,mysql_error(mysql));
}
// if (error) {
// // there was an error
// myprot->generate_pkt_ERR(true,NULL,NULL,sid,1045,(char *)"#28000",error);
// } else {
// // no error, DML succeeded
// myprot->generate_pkt_OK(true,NULL,NULL,sid,affected_rows,0,0,0,NULL);
// }
// myds->DSS=STATE_SLEEP;
}
}
void MySQL_Session::SQLite3_to_MySQL(SQLite3_result *result, char *error, int affected_rows, MySQL_Protocol *myprot) {
assert(myprot);
MySQL_Data_Stream *myds=myprot->get_myds();
@ -1472,7 +1591,7 @@ void MySQL_Session::SQLite3_to_MySQL(SQLite3_result *result, char *error, int af
myprot->generate_pkt_EOF(true,NULL,NULL,sid,0,0); sid++;
char **p=(char **)malloc(sizeof(char*)*result->columns);
int *l=(int *)malloc(sizeof(int*)*result->columns);
unsigned long *l=(unsigned long *)malloc(sizeof(unsigned long *)*result->columns);
//p[0]="column test";
for (int r=0; r<result->rows_count; r++) {
for (int i=0; i<result->columns; i++) {

File diff suppressed because it is too large Load Diff

@ -260,6 +260,27 @@ void MySQL_Connection::set_names_cont(short event) {
async_exit_status = mysql_set_character_set_cont(&interr,mysql, mysql_status(event));
}
void MySQL_Connection::set_query(char *stmt, unsigned long length) {
query.ptr=stmt;
query.length=length;
}
void MySQL_Connection::real_query_start() {
async_exit_status = mysql_real_query_start(&interr , mysql, query.ptr, query.length);
}
void MySQL_Connection::real_query_cont(short event) {
async_exit_status = mysql_real_query_cont(&interr ,mysql , mysql_status(event));
}
void MySQL_Connection::store_result_start() {
async_exit_status = mysql_store_result_start(&mysql_result, mysql);
}
void MySQL_Connection::store_result_cont(short event) {
async_exit_status = mysql_store_result_cont(&mysql_result , mysql , mysql_status(event));
}
#define NEXT_IMMEDIATE(new_st) do { async_state_machine = new_st; goto handler_again; } while (0)
MDB_ASYNC_ST MySQL_Connection::handler(short event) {
@ -325,6 +346,48 @@ handler_again:
break;
case ASYNC_PING_FAILED:
break;
case ASYNC_QUERY_START:
real_query_start();
if (async_exit_status) {
next_event(ASYNC_QUERY_CONT);
} else {
NEXT_IMMEDIATE(ASYNC_STORE_RESULT_START);
}
break;
case ASYNC_QUERY_CONT:
real_query_cont(event);
if (async_exit_status) {
next_event(ASYNC_QUERY_CONT);
} else {
NEXT_IMMEDIATE(ASYNC_STORE_RESULT_START);
}
break;
case ASYNC_STORE_RESULT_START:
if (mysql_errno(mysql)) {
NEXT_IMMEDIATE(ASYNC_QUERY_END);
}
store_result_start();
if (async_exit_status) {
next_event(ASYNC_STORE_RESULT_CONT);
} else {
NEXT_IMMEDIATE(ASYNC_QUERY_END);
}
break;
case ASYNC_STORE_RESULT_CONT:
store_result_cont(event);
if (async_exit_status) {
next_event(ASYNC_STORE_RESULT_CONT);
} else {
NEXT_IMMEDIATE(ASYNC_QUERY_END);
}
break;
case ASYNC_QUERY_END:
// if (interr) {i
// NEXT_IMMEDIATE(ASYNC_PING_FAILED);
// } else {
// NEXT_IMMEDIATE(ASYNC_PING_SUCCESSFUL);
// }
break;
case ASYNC_SET_NAMES_START:
set_names_start();
if (async_exit_status) {

@ -95,6 +95,9 @@ MySQL_Data_Stream::MySQL_Data_Stream() {
pkts_recv=0;
pkts_sent=0;
mysql_real_query.ptr=NULL;
mysql_real_query.size=0;
timeout=0;
connect_tries=0;
poll_fds_idx=-1;
@ -255,7 +258,8 @@ int MySQL_Data_Stream::read_from_net() {
//proxy_error("read %d bytes from fd %d into a buffer of %d bytes free\n", r, fd, s);
if (r < 1) {
if (encrypted==false) {
if (r==0 || (r==-1 && errno != EINTR && errno != EAGAIN)) {
int myds_errno=errno;
if (r==0 || (r==-1 && myds_errno != EINTR && myds_errno != EAGAIN)) {
shut_soft();
}
} else {

@ -18,7 +18,7 @@ admin_variables=
mysql_variables=
{
threads=2
threads=1
//threads=32
have_compress=true
poll_timeout=2000

Loading…
Cancel
Save