Merge branch 'PrepStmtExec' into 1.3.0-alpha

Conflicts:
	include/MySQL_Protocol.h
	include/mysql_connection.h
	lib/Makefile
	lib/MySQL_Session.cpp
pull/739/head
René Cannaò 10 years ago
commit febd61bc9e

@ -0,0 +1,116 @@
#ifndef CLASS_MYSQL_PREPARED_STATEMENT_H
#define CLASS_MYSQL_PREPARED_STATEMENT_H
#include "proxysql.h"
#include "cpp.h"
/*
One of the main challenge in handling prepared statement (PS) is that a single
PS could be executed on multiple backends, and on each backend it could have a
different stmt_id.
For this reason ProxySQL returns to the client a stmt_id generated by the proxy
itself, and internally maps client's stmt_id with the backend stmt_id.
The implementation in ProxySQL is, simplified, the follow:
* when a client sends a MYSQL_COM_STMT_PREPARE, ProxySQL executes it to one of
the backend
* the backend returns a stmt_id. This stmt_id is NOT returned to the client. The
stmt_id returned from the backend is stored in MySQL_STMTs_local(), and
MySQL_STMTs_local() is responsible for mapping the connection's MYSQL_STMT
and a global_stmt_id
* the global_stmt_id is the stmt_id returned to the client
* the global_stmt_id is used to locate the relevant MySQL_STMT_Global_info() in
MySQL_STMT_Manager()
* MySQL_STMT_Global_info() stores all metadata associated with a PS
* MySQL_STMT_Manager() is responsible for storing all MySQL_STMT_Global_info()
in global structures accessible and shareble by all threads.
To summarie the most important classes:
* MySQL_STMT_Global_info() stores all metadata associated with a PS
* MySQL_STMT_Manager() stores all the MySQL_STMT_Global_info(), indexes using
a global_stmt_id that iis the stmt_id generated by ProxySQL and returned to
the client
* MySQL_STMTs_local() associate PS located in a backend connection to a
global_stmt_id
*/
// class MySQL_STMTs_local associates a global statement ID with a local statement ID for a specific connection
class MySQL_STMTs_local {
private:
unsigned int num_entries;
std::map<uint32_t, MYSQL_STMT *> m;
public:
MySQL_STMTs_local() {
num_entries=0;
}
~MySQL_STMTs_local();
// we declare it here to be inline
void insert(uint32_t global_statement_id, MYSQL_STMT *stmt) {
std::pair<std::map<uint32_t, MYSQL_STMT *>::iterator,bool> ret;
ret=m.insert(std::make_pair(global_statement_id, stmt));
if (ret.second==true) {
num_entries++;
}
}
// we declare it here to be inline
MYSQL_STMT * find(uint32_t global_statement_id) {
auto s=m.find(global_statement_id);
if (s!=m.end()) { // found
return s->second;
}
return NULL; // not found
}
bool erase(uint32_t global_statement_id);
uint64_t compute_hash(unsigned int hostgroup, char *user, char *schema, char *query, unsigned int query_length);
};
// class MySQL_STMT_Global_info represents information about a MySQL Prepared Statement
// it is an internal representation of prepared statement
// it include all metadata associated with it
class MySQL_STMT_Global_info {
private:
void compute_hash();
public:
uint64_t hash;
char *username;
char *schemaname;
char *query;
unsigned int query_length;
unsigned int hostgroup_id;
int ref_count;
uint32_t statement_id;
uint16_t num_columns;
uint16_t num_params;
uint16_t warning_count;
MYSQL_FIELD **fields;
struct {
int cache_ttl;
int timeout;
int delay;
} properties;
//MYSQL_BIND **params; // seems unused
MySQL_STMT_Global_info(uint32_t id, unsigned int h, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, uint64_t _h);
~MySQL_STMT_Global_info();
};
class MySQL_STMT_Manager {
private:
uint32_t next_statement_id;
rwlock_t rwlock;
std::map<uint32_t, MySQL_STMT_Global_info *> m; // map using statement id
std::map<uint64_t, MySQL_STMT_Global_info *> h; // map using hashes
public:
MySQL_STMT_Manager();
~MySQL_STMT_Manager();
int ref_count(uint32_t statement_id, int cnt, bool lock=true);
MySQL_STMT_Global_info * add_prepared_statement(unsigned int h, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, bool lock=true);
MySQL_STMT_Global_info * add_prepared_statement(unsigned int h, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, int _cache_ttl, int _timeout, int _delay, bool lock=true);
MySQL_STMT_Global_info * find_prepared_statement_by_stmt_id(uint32_t id, bool lock=true);
MySQL_STMT_Global_info * find_prepared_statement_by_hash(uint64_t hash, bool lock=true);
uint32_t total_prepared_statements() { return next_statement_id-1; }
};
#endif /* CLASS_MYSQL_PREPARED_STATEMENT_H */

@ -6,6 +6,31 @@
#define RESULTSET_BUFLEN 16300
class stmt_execute_metadata_t {
public:
uint32_t stmt_id;
uint8_t flags;
uint16_t num_params;
MYSQL_BIND *binds;
my_bool *is_nulls;
unsigned long *lengths;
void *pkt;
stmt_execute_metadata_t() {
binds=NULL;
is_nulls=NULL;
lengths=NULL;
pkt=NULL;
}
~stmt_execute_metadata_t() {
if (binds)
free(binds);
if (is_nulls)
free(is_nulls);
if (lengths)
free(lengths);
}
};
class MySQL_ResultSet {
private:
public:
@ -20,9 +45,10 @@ class MySQL_ResultSet {
unsigned int num_rows;
unsigned long long resultset_size;
PtrSizeArray *PSarrayOUT;
MySQL_ResultSet(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL *_my);
MySQL_ResultSet(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL *_my, MYSQL_STMT *_stmt=NULL);
~MySQL_ResultSet();
unsigned int add_row(MYSQL_ROW row);
unsigned int add_row2(MYSQL_ROWS *row, unsigned char *offset);
void add_eof();
bool get_resultset(PtrSizeArray *PSarrayFinal);
unsigned char *buffer;
@ -100,5 +126,10 @@ class MySQL_Protocol {
bool process_pkt_COM_QUERY(unsigned char *pkt, unsigned int len);
bool process_pkt_COM_CHANGE_USER(unsigned char *pkt, unsigned int len);
void * Query_String_to_packet(uint8_t sid, std::string *s, unsigned int *l);
// prepared statements
bool generate_STMT_PREPARE_RESPONSE(uint8_t sequence_id, MySQL_STMT_Global_info *stmt_info);
stmt_execute_metadata_t * get_binds_from_pkt(void *ptr, unsigned int size, uint16_t num_params);
};
#endif /* __CLASS_MYSQL_PROTOCOL_H */

@ -23,6 +23,10 @@ class Query_Info {
unsigned long long start_time;
unsigned long long end_time;
MYSQL_STMT *mysql_stmt;
stmt_execute_metadata_t *stmt_meta;
uint32_t stmt_global_id;
int QueryLength;
enum MYSQL_COM_QUERY_command MyComQueryCmd;
@ -60,7 +64,7 @@ class MySQL_Session
void handler___status_WAITING_SERVER_DATA___STATE_READING_COM_STMT_PREPARE_RESPONSE(PtrSize_t *);
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_SET_OPTION(PtrSize_t *);
void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STATISTICS(PtrSize_t *);
bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(PtrSize_t *);
bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(PtrSize_t *, bool ps=false);
void handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED__get_connection();
@ -71,6 +75,8 @@ class MySQL_Session
// void return_MySQL_Connection_To_Poll(MySQL_Data_Stream *);
// MySQL_STMT_Manager *Session_STMT_Manager;
public:
void * operator new(size_t);
void operator delete(void *);
@ -149,6 +155,7 @@ class MySQL_Session
void SQLite3_to_MySQL(SQLite3_result *, char *, int , MySQL_Protocol *);
void MySQL_Result_to_MySQL_wire(MYSQL *mysql, MySQL_ResultSet *MyRS);
void MySQL_Stmt_Result_to_MySQL_wire(MYSQL_STMT *stmt);
unsigned int NumActiveTransactions();
int FindOneActiveTransaction();
unsigned long long IdleTime();

@ -161,6 +161,9 @@ class MySQL_Thread
// in this way, there is no need for atomic operation and there is no cache miss
// when it is needed a total, all threads are checked
struct {
unsigned long long stmt_prepare;
unsigned long long stmt_execute;
unsigned long long stmt_close;
unsigned long long queries;
unsigned long long queries_slow;
unsigned long long queries_backends_bytes_sent;
@ -289,6 +292,7 @@ class MySQL_Threads_Handler
bool have_compress;
bool client_found_rows;
bool multiplexing;
// bool stmt_multiplexing;
bool enforce_autocommit_on_reads;
int max_allowed_packet;
int max_transaction_time;
@ -351,6 +355,9 @@ class MySQL_Threads_Handler
SQLite3_result * SQL3_Processlist();
SQLite3_result * SQL3_GlobalStatus();
bool kill_session(uint32_t _thread_session_id);
unsigned long long get_total_stmt_prepare();
unsigned long long get_total_stmt_execute();
unsigned long long get_total_stmt_close();
unsigned long long get_total_queries();
unsigned long long get_slow_queries();
unsigned long long get_queries_backends_bytes_recv();

@ -18,6 +18,7 @@
#include "proxysql_admin.h"
#include "MySQL_HostGroups_Manager.h"
#include "MySQL_Logger.hpp"
#include "MySQL_PreparedStatement.h"
#undef swap
#undef min
#undef max

@ -49,12 +49,15 @@ class MySQL_Connection {
struct {
unsigned long length;
char *ptr;
MYSQL_STMT *stmt;
stmt_execute_metadata_t *stmt_meta;
} query;
char scramble_buff[40];
unsigned long long creation_time;
unsigned long long last_time_used;
unsigned long long timeout;
int fd;
MySQL_STMTs_local *local_stmts; // local view of prepared statements
MYSQL *mysql;
MYSQL *ret_mysql;
MYSQL_RES *mysql_result;
@ -125,9 +128,18 @@ class MySQL_Connection {
int async_select_db(short event);
int async_set_autocommit(short event, bool);
int async_set_names(short event, uint8_t nr);
int async_query(short event, char *stmt, unsigned long length);
int async_send_simple_command(short event, char *stmt, unsigned long length); // no result set expected
int async_query(short event, char *stmt, unsigned long length, MYSQL_STMT **_stmt=NULL, stmt_execute_metadata_t *_stmt_meta=NULL);
int async_ping(short event);
void stmt_prepare_start();
void stmt_prepare_cont(short event);
void stmt_execute_start();
void stmt_execute_cont(short event);
void stmt_execute_store_result_start();
void stmt_execute_store_result_cont(short event);
void async_free_result();
bool IsActiveTransaction();
bool IsAutoCommit();

@ -61,6 +61,16 @@ enum MDB_ASYNC_ST { // MariaDB Async State Machine
ASYNC_INITDB_END,
ASYNC_INITDB_SUCCESSFUL,
ASYNC_INITDB_FAILED,
ASYNC_STMT_PREPARE_START,
ASYNC_STMT_PREPARE_CONT,
ASYNC_STMT_PREPARE_END,
ASYNC_STMT_PREPARE_SUCCESSFUL,
ASYNC_STMT_PREPARE_FAILED,
ASYNC_STMT_EXECUTE_START,
ASYNC_STMT_EXECUTE_CONT,
ASYNC_STMT_EXECUTE_STORE_RESULT_START,
ASYNC_STMT_EXECUTE_STORE_RESULT_CONT,
ASYNC_STMT_EXECUTE_END,
ASYNC_IDLE
};
@ -121,6 +131,8 @@ enum session_status {
CHANGING_USER_SERVER,
SETTING_INIT_CONNECT,
FAST_FORWARD,
PROCESSING_STMT_PREPARE,
PROCESSING_STMT_EXECUTE,
NONE
};
@ -252,7 +264,7 @@ typedef struct _proxysql_mysql_thread_t proxysql_mysql_thread_t;
typedef struct { char * table_name; char * table_def; } table_def_t;
typedef struct __SQP_query_parser_t SQP_par_t;
//typedef struct _mysql_server_t mysql_server_t;
//typedef struct _stmt_execute_metadata_t stmt_execute_metadata_t;
#endif /* PROXYSQL_TYPEDEFS */
//#ifdef __cplusplus
@ -282,6 +294,7 @@ class ProxySQL_ConfigFile;
class Query_Info;
//class MySQL_Server;
class SQLite3_result;
class stmt_execute_metadata_t;
//class MySQL_Servers;
//class MySQL_Hostgroup_Entry;
//class MySQL_Hostgroup;
@ -546,7 +559,17 @@ struct _mysql_session_t {
};
/*
struct _stmt_execute_metadata_t {
uint32_t stmt_id;
uint8_t flags;
uint16_t num_params;
MYSQL_BIND *binds;
my_bool *is_nulls;
unsigned long *lengths;
void *pkt;
};
*/
#endif /* PROXYSQL_STRUCTS */
@ -730,6 +753,7 @@ __thread int mysql_thread___poll_timeout_on_failure;
__thread bool mysql_thread___have_compress;
__thread bool mysql_thread___client_found_rows;
__thread bool mysql_thread___multiplexing;
// __thread bool mysql_thread___stmt_multiplexing;
__thread bool mysql_thread___enforce_autocommit_on_reads;
__thread bool mysql_thread___servers_stats;
__thread bool mysql_thread___commands_stats;
@ -805,6 +829,7 @@ extern __thread int mysql_thread___poll_timeout_on_failure;
extern __thread bool mysql_thread___have_compress;
extern __thread bool mysql_thread___client_found_rows;
extern __thread bool mysql_thread___multiplexing;
// extern __thread bool mysql_thread___stmt_multiplexing;
extern __thread bool mysql_thread___enforce_autocommit_on_reads;
extern __thread bool mysql_thread___servers_stats;
extern __thread bool mysql_thread___commands_stats;

@ -47,7 +47,7 @@ default: libproxysql.a
_OBJ = c_tokenizer.o
OBJ = $(patsubst %,$(ODIR)/%,$(_OBJ))
_OBJ_CPP = ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo SpookyV2.oo MySQL_Authentication.oo gen_utils.oo simple_kv.oo sqlite3db.oo global_variables.oo mysql_connection.oo MySQL_HostGroups_Manager.oo mysql_data_stream.oo MySQL_Thread.oo MySQL_Session.oo MySQL_Protocol.oo mysql_backend.oo Query_Processor.oo ProxySQL_Admin.oo MySQL_Monitor.oo MySQL_Logger.oo thread.oo
_OBJ_CPP = ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo SpookyV2.oo MySQL_Authentication.oo gen_utils.oo simple_kv.oo sqlite3db.oo global_variables.oo mysql_connection.oo MySQL_HostGroups_Manager.oo mysql_data_stream.oo MySQL_Thread.oo MySQL_Session.oo MySQL_Protocol.oo mysql_backend.oo Query_Processor.oo ProxySQL_Admin.oo MySQL_Monitor.oo MySQL_Logger.oo thread.oo MySQL_PreparedStatement.oo
OBJ_CPP = $(patsubst %,$(ODIR)/%,$(_OBJ_CPP))
%.ko: %.cpp

@ -0,0 +1,281 @@
#include "proxysql.h"
#include "cpp.h"
#include "SpookyV2.h"
extern MySQL_STMT_Manager *GloMyStmt;
static uint64_t stmt_compute_hash(unsigned int hostgroup, char *user, char *schema, char *query, unsigned int query_length) {
int l=0;
l+=sizeof(hostgroup);
l+=strlen(user);
l+=strlen(schema);
// two random seperators
#define _COMPUTE_HASH_DEL1_ "-ujhtgf76y576574fhYTRDFwdt-"
#define _COMPUTE_HASH_DEL2_ "-8k7jrhtrgJHRgrefgreRFewg6-"
l+=strlen(_COMPUTE_HASH_DEL1_);
l+=strlen(_COMPUTE_HASH_DEL2_);
l+=query_length;
char *buf=(char *)malloc(l);
l=0;
// write hostgroup
memcpy(buf,&hostgroup,sizeof(hostgroup));
l+=sizeof(hostgroup);
// write user
strcpy(buf+l,user);
l+=strlen(user);
// write delimiter1
strcpy(buf+l,_COMPUTE_HASH_DEL1_);
l+=strlen(_COMPUTE_HASH_DEL1_);
// write schema
strcpy(buf+l,schema);
l+=strlen(schema);
// write delimiter2
strcpy(buf+l,_COMPUTE_HASH_DEL2_);
l+=strlen(_COMPUTE_HASH_DEL2_);
// write query
memcpy(buf+l,query,query_length);
l+=query_length;
uint64_t hash=SpookyHash::Hash64(buf,l,0);
free(buf);
return hash;
}
void MySQL_STMT_Global_info::compute_hash() {
hash=stmt_compute_hash(hostgroup_id, username, schemaname, query, query_length);
}
uint64_t MySQL_STMTs_local::compute_hash(unsigned int hostgroup, char *user, char *schema, char *query, unsigned int query_length){
uint64_t hash;
hash=stmt_compute_hash(hostgroup, user, schema, query, query_length);
return hash;
}
MySQL_STMTs_local::~MySQL_STMTs_local() {
// Note: we do not free the prepared statements because we assume that
// if we call this destructor the connection is being destroyed anyway
for (std::map<uint32_t, MYSQL_STMT *>::iterator it=m.begin(); it!=m.end(); ++it) {
uint32_t stmt_id=it->first;
GloMyStmt->ref_count(stmt_id,-1);
}
m.erase(m.begin(),m.end());
}
bool MySQL_STMTs_local::erase(uint32_t global_statement_id) {
auto s=m.find(global_statement_id);
if (s!=m.end()) { // found
if (num_entries>1000) {
MYSQL_STMT *stmt=s->second;
mysql_stmt_close(stmt);
m.erase(s);
num_entries--;
return true; // we truly removed the prepared statement
}
}
return false; // we don't really remove the prepared statement
}
MySQL_STMT_Manager::MySQL_STMT_Manager() {
spinlock_rwlock_init(&rwlock);
next_statement_id=1; // we initialize this as 1 because we 0 is not allowed
}
MySQL_STMT_Manager::~MySQL_STMT_Manager() {
for (std::map<uint32_t, MySQL_STMT_Global_info *>::iterator it=m.begin(); it!=m.end(); ++it) {
MySQL_STMT_Global_info *a=it->second;
delete a;
}
m.erase(m.begin(),m.end());
// we do not loop in h because all the MySQL_STMT_Global_info() were already deleted
h.erase(h.begin(),h.end());
}
int MySQL_STMT_Manager::ref_count(uint32_t statement_id, int cnt, bool lock) {
int ret=-1;
if (lock) {
spin_wrlock(&rwlock);
}
auto s = m.find(statement_id);
if (s!=m.end()) {
MySQL_STMT_Global_info *a=s->second;
a->ref_count+=cnt;
ret=a->ref_count;
}
if (lock) {
spin_wrunlock(&rwlock);
}
return ret;
}
MySQL_STMT_Global_info * MySQL_STMT_Manager::add_prepared_statement(unsigned int _h, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, bool lock) {
return add_prepared_statement(_h, u, s, q, ql, stmt, -1, -1, -1, lock);
}
MySQL_STMT_Global_info * MySQL_STMT_Manager::add_prepared_statement(unsigned int _h, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, int _cache_ttl, int _timeout, int _delay, bool lock) {
MySQL_STMT_Global_info *ret=NULL;
uint64_t hash=stmt_compute_hash(_h, u, s, q, ql); // this identifies the prepared statement
if (lock) {
spin_wrlock(&rwlock);
}
// try to find the statement
auto f = h.find(hash);
if (f!=h.end()) {
// found it!
//MySQL_STMT_Global_info *a=f->second;
//ret=a->statement_id;
ret=f->second;
} else {
// we need to create a new one
MySQL_STMT_Global_info *a=new MySQL_STMT_Global_info(next_statement_id,_h,u,s,q,ql,stmt,hash);
a->properties.cache_ttl=_cache_ttl;
a->properties.timeout=_timeout;
a->properties.delay=_delay;
// insert it in both maps
m.insert(std::make_pair(a->statement_id, a));
h.insert(std::make_pair(a->hash, a));
//ret=a->statement_id;
ret=a;
next_statement_id++; // increment it
}
if (lock) {
spin_wrunlock(&rwlock);
}
return ret;
}
MySQL_STMT_Global_info * MySQL_STMT_Manager::find_prepared_statement_by_stmt_id(uint32_t id, bool lock) {
MySQL_STMT_Global_info *ret=NULL; // assume we do not find it
if (lock) {
spin_wrlock(&rwlock);
}
auto s=m.find(id);
if (s!=m.end()) {
ret=s->second;
}
if (lock) {
spin_wrunlock(&rwlock);
}
return ret;
}
MySQL_STMT_Global_info * MySQL_STMT_Manager::find_prepared_statement_by_hash(uint64_t hash, bool lock) {
MySQL_STMT_Global_info *ret=NULL; // assume we do not find it
if (lock) {
spin_wrlock(&rwlock);
}
auto s=h.find(hash);
if (s!=h.end()) {
ret=s->second;
}
if (lock) {
spin_wrunlock(&rwlock);
}
return ret;
}
MySQL_STMT_Global_info::MySQL_STMT_Global_info(uint32_t id, unsigned int h, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, uint64_t _h) {
statement_id=id;
hostgroup_id=h;
ref_count=0;
username=strdup(u);
schemaname=strdup(s);
query=(char *)malloc(ql);
memcpy(query,q,ql);
query_length=ql;
num_params=stmt->param_count;
num_columns=stmt->field_count;
warning_count=stmt->upsert_status.warning_count;
if (_h) {
hash=_h;
} else {
compute_hash();
}
// set default properties:
properties.cache_ttl=-1;
properties.timeout=-1;
properties.delay=-1;
fields=NULL;
if (num_columns) {
fields=(MYSQL_FIELD **)malloc(num_columns*sizeof(MYSQL_FIELD *));
uint16_t i;
for (i=0;i<num_columns;i++) {
fields[i]=(MYSQL_FIELD *)malloc(sizeof(MYSQL_FIELD));
MYSQL_FIELD *fs=&(stmt->fields[i]);
MYSQL_FIELD *fd=fields[i];
// first copy all fields
memcpy(fd,fs,sizeof(MYSQL_FIELD));
// then duplicate strings
fd->name = ( fs->name ? strdup(fs->name) : NULL );
fd->org_name = ( fs->org_name ? strdup(fs->org_name) : NULL );
fd->table = ( fs->table ? strdup(fs->table) : NULL );
fd->org_table = ( fs->org_table ? strdup(fs->org_table) : NULL );
fd->db = ( fs->db ? strdup(fs->db) : NULL );
fd->catalog = ( fs->catalog ? strdup(fs->catalog) : NULL );
fd->def = ( fs->def ? strdup(fs->def) : NULL );
}
}
/*
params=NULL;
if(num_params) {
params=(MYSQL_BIND **)malloc(num_columns*sizeof(MYSQL_BIND *));
uint16_t i;
for (i=0;i<num_params;i++) {
params[i]=(MYSQL_BIND *)malloc(sizeof(MYSQL_BIND));
MYSQL_BIND *ps=&(stmt->params[i]);
MYSQL_BIND *pd=params[i];
// copy all params
memcpy(pd,ps,sizeof(MYSQL_BIND));
}
}
*/
}
MySQL_STMT_Global_info::~MySQL_STMT_Global_info() {
free(username);
free(schemaname);
free(query);
if (num_columns) {
uint16_t i;
for (i=0;i<num_columns;i++) {
MYSQL_FIELD *f=fields[i];
if (f->name) { free(f->name); f->name=NULL; }
if (f->org_name) { free(f->org_name); f->org_name=NULL; }
if (f->table) { free(f->table); f->table=NULL; }
if (f->org_table) { free(f->org_table); f->org_table=NULL; }
if (f->db) { free(f->db); f->db=NULL; }
if (f->catalog) { free(f->catalog); f->catalog=NULL; }
if (f->def) { free(f->def); f->def=NULL; }
free(fields[i]);
}
free(fields);
fields=NULL;
}
/*
if (num_params) {
uint16_t i;
for (i=0;i<num_params;i++) {
free(params[i]);
}
free(params);
params=NULL;
}
*/
}

@ -526,7 +526,8 @@ bool MySQL_Protocol::generate_pkt_EOF(bool send, void **ptr, unsigned int *len,
(*myds)->DSS=STATE_EOF2;
break;
default:
assert(0);
//assert(0);
break;
}
}
if (len) { *len=size; }
@ -769,6 +770,51 @@ bool MySQL_Protocol::generate_pkt_field(bool send, void **ptr, unsigned int *len
}
// FIXME FIXME function not completed yet!
// see https://dev.mysql.com/doc/internals/en/com-stmt-prepare-response.html
bool MySQL_Protocol::generate_STMT_PREPARE_RESPONSE(uint8_t sequence_id, MySQL_STMT_Global_info *stmt_info) {
uint8_t sid=sequence_id;
uint16_t i;
char *okpack=(char *)malloc(16); // first packet
mysql_hdr hdr;
hdr.pkt_id=sid;
hdr.pkt_length=12;
memcpy(okpack,&hdr,sizeof(mysql_hdr)); // copy header
okpack[4]=0;
okpack[13]=0;
memcpy(okpack+5,&stmt_info->statement_id,sizeof(uint32_t));
memcpy(okpack+9,&stmt_info->num_columns,sizeof(uint16_t));
memcpy(okpack+11,&stmt_info->num_params,sizeof(uint16_t));
memcpy(okpack+14,&stmt_info->warning_count,sizeof(uint16_t));
(*myds)->PSarrayOUT->add((void *)okpack,16);
sid++;
if (stmt_info->num_params) {
for (i=0; i<stmt_info->num_params; i++) {
generate_pkt_field(true,NULL,NULL,sid,
(char *)"", (char *)"", (char *)"", (char *)"", (char *)"",
0,0,0,0,0,false,0,NULL);
sid++;
}
generate_pkt_EOF(true,NULL,NULL,sid,0,SERVER_STATUS_AUTOCOMMIT); // FIXME : for now we pass a very broken flag
sid++;
}
if (stmt_info->num_columns) {
for (i=0; i<stmt_info->num_columns; i++) {
MYSQL_FIELD *fd=stmt_info->fields[i];
//bool MySQL_Protocol::generate_pkt_field(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, char *schema, char *table, char *org_table, char *name, char *org_name, uint16_t charset, uint32_t column_length, uint8_t type, uint16_t flags, uint8_t decimals, bool field_list, uint64_t defvalue_length, char *defvalue) {
generate_pkt_field(true,NULL,NULL,sid,
fd->db,
fd->table, fd->org_table,
fd->name, fd->org_name,
fd->charsetnr, 0, fd->type, fd->flags, fd->decimals, false,0,NULL);
sid++;
}
generate_pkt_EOF(true,NULL,NULL,sid,0,SERVER_STATUS_AUTOCOMMIT); // FIXME : for now we pass a very broken flag
sid++;
}
return true;
}
bool MySQL_Protocol::generate_pkt_row(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt) {
int col=0;
int rowlen=0;
@ -1425,7 +1471,146 @@ void * MySQL_Protocol::Query_String_to_packet(uint8_t sid, std::string *s, unsig
return pkt;
}
MySQL_ResultSet::MySQL_ResultSet(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL *_my) {
// See https://dev.mysql.com/doc/internals/en/com-stmt-execute.html for reference
stmt_execute_metadata_t * MySQL_Protocol::get_binds_from_pkt(void *ptr, unsigned int size, uint16_t num_params) {
stmt_execute_metadata_t *ret=NULL; //return NULL in case of failure
if (size<14) {
// some error!
return ret;
}
//ret=(stmt_execute_metadata_t *)malloc(sizeof(stmt_execute_metadata_t));
ret= new stmt_execute_metadata_t();
char *p=(char *)ptr+5;
memcpy(&ret->stmt_id,p,4); p+=4; // stmt-id
memcpy(&ret->flags,p,1); p+=1; // flags
p+=4; // iteration-count
ret->num_params=num_params;
// ret->binds=NULL;
// ret->is_nulls=NULL;
// ret->lengths=NULL;
ret->pkt=ptr;
if (num_params) {
uint16_t i;
size_t null_bitmap_length=(num_params+7)/8;
if (size < (14+1+null_bitmap_length)) {
// some data missing?
delete ret;
return NULL;
}
uint8_t new_params_bound_flag;
memcpy(&new_params_bound_flag,p+null_bitmap_length,1);
if (new_params_bound_flag!=1) {
// something wrong
delete ret;
return NULL;
}
uint8_t *null_bitmap=(uint8_t *)malloc(null_bitmap_length);
memcpy(null_bitmap,p,null_bitmap_length);
p+=null_bitmap_length;
p+=1; // new_params_bound_flag
MYSQL_BIND *binds=(MYSQL_BIND *)malloc(sizeof(MYSQL_BIND)*num_params);
my_bool *is_nulls=(my_bool *)malloc(sizeof(my_bool)*num_params);
unsigned long *lengths=(unsigned long *)malloc(sizeof(unsigned long)*num_params);
ret->binds=binds;
ret->is_nulls=is_nulls;
ret->lengths=lengths;
for (i=0;i<num_params;i++) {
// set null
uint8_t null_byte=null_bitmap[i/8];
uint8_t idx=i%8;
my_bool is_null = (null_byte & ( 1 << idx )) >> idx;
is_nulls[i]=is_null;
binds[i].is_null=&is_nulls[i];
// set buffer_type and is_unsigned
//enum enum_field_types buffer_type=MYSQL_TYPE_DECIMAL; // set a random default
uint16_t buffer_type=0;
memcpy(&buffer_type,p,2);
binds[i].is_unsigned=0;
if (buffer_type >= 32768) { // is_unsigned bit
buffer_type-=32768;
binds[i].is_unsigned=1;
}
binds[i].buffer_type=(enum enum_field_types)buffer_type;
p+=2;
// set length
lengths[i]=0;
// unsigned long l=0;
// uint8_t ll=mysql_decode_length((unsigned char *)p,&l);
// lengths[i]=l;
// p+=ll;
binds[i].length=&lengths[i];
}
for (i=0;i<num_params;i++) {
if (is_nulls[i]==true) {
continue;
}
enum enum_field_types buffer_type=binds[i].buffer_type;
switch (buffer_type) {
case MYSQL_TYPE_TINY:
binds[i].buffer=p;
p+=1;
break;
case MYSQL_TYPE_SHORT:
case MYSQL_TYPE_YEAR:
binds[i].buffer=p;
p+=2;
break;
case MYSQL_TYPE_FLOAT:
case MYSQL_TYPE_LONG:
case MYSQL_TYPE_INT24:
binds[i].buffer=p;
p+=4;
break;
case MYSQL_TYPE_DOUBLE:
case MYSQL_TYPE_LONGLONG:
binds[i].buffer=p;
p+=8;
break;
case MYSQL_TYPE_TIME:
case MYSQL_TYPE_DATE:
case MYSQL_TYPE_TIMESTAMP:
case MYSQL_TYPE_DATETIME:
{
binds[i].buffer=malloc(sizeof(MYSQL_TIME)); // NOTE: remember to free() this
memset(binds[i].buffer,0,sizeof(MYSQL_TIME));
uint8_t l;
memcpy(&l,p,1);
p++;
memcpy(binds[i].buffer,p,l);
p+=l;
}
break;
case MYSQL_TYPE_TINY_BLOB:
case MYSQL_TYPE_MEDIUM_BLOB:
case MYSQL_TYPE_LONG_BLOB:
case MYSQL_TYPE_BLOB:
case MYSQL_TYPE_VARCHAR:
case MYSQL_TYPE_VAR_STRING:
case MYSQL_TYPE_STRING:
case MYSQL_TYPE_DECIMAL:
case MYSQL_TYPE_NEWDECIMAL:
{
uint8_t l=0;
uint64_t len;
l=mysql_decode_length((unsigned char *)p, &len);
p+=l;
binds[i].buffer=p;
p+=len;
lengths[i]=len;
}
break;
default:
assert(0);
break;
}
}
}
return ret;
}
MySQL_ResultSet::MySQL_ResultSet(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL *_my, MYSQL_STMT *_stmt) {
transfer_started=false;
resultset_completed=false;
myprot=_myprot;
@ -1470,6 +1655,57 @@ MySQL_ResultSet::MySQL_ResultSet(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL
sid++;
PSarrayOUT->add(pkt.ptr,pkt.size);
resultset_size+=pkt.size;
if (_stmt) { // binary protocol , we also assume we have ALL the resultset
//MYSQL_RES * prepare_meta_result = mysql_stmt_result_metadata(_stmt);
// int column_count=mysql_num_fields(prepare_meta_result);
// MYSQL_BIND *binds=(MYSQL_BIND *)malloc(sizeof(MYSQL_BIND)*column_count);
// mysql_stmt_bind_result(_stmt, binds);
fprintf(stdout, "Fetching results ...\n");
// int row_count=0;
// while (!mysql_stmt_fetch(_stmt)) {
// row_count++;
// fprintf(stdout, " row %d\n", row_count);
// }
// free (binds);
unsigned long long total_size=0;
MYSQL_ROWS *r=_stmt->result.data;
total_size+=r->length;
if (r->length > 0xFFFFFF) {
total_size+=(r->length / 0xFFFFFF) * sizeof(mysql_hdr);
}
total_size+=sizeof(mysql_hdr);
while(r->next) {
r=r->next;
total_size+=r->length;
if (r->length > 0xFFFFFF) {
total_size+=(r->length / 0xFFFFFF) * sizeof(mysql_hdr);
}
total_size+=sizeof(mysql_hdr);
}
PtrSize_t pkt;
pkt.size=total_size;
pkt.ptr=malloc(pkt.size);
total_size=0;
r=_stmt->result.data;
add_row2(r,(unsigned char *)pkt.ptr);
total_size+=r->length;
if (r->length > 0xFFFFFF) {
total_size+=(r->length / 0xFFFFFF) * sizeof(mysql_hdr);
}
total_size+=sizeof(mysql_hdr);
while(r->next) {
r=r->next;
add_row2(r,(unsigned char *)pkt.ptr+total_size);
total_size+=r->length;
if (r->length > 0xFFFFFF) {
total_size+=(r->length / 0xFFFFFF) * sizeof(mysql_hdr);
}
total_size+=sizeof(mysql_hdr);
}
PSarrayOUT->add(pkt.ptr,pkt.size);
resultset_size+=pkt.size;
add_eof();
}
}
@ -1507,6 +1743,50 @@ unsigned int MySQL_ResultSet::add_row(MYSQL_ROW row) {
return pkt_length;
}
// add_row2 is perhaps a faster implementation of add_row()
// still experimentatl
// so far, used only for prepared statements
// it assumes that the MYSQL_ROW is an format ready to be sent to the client
unsigned int MySQL_ResultSet::add_row2(MYSQL_ROWS *row, unsigned char *offset) {
unsigned long length=row->length;
uint8_t pkt_sid=sid;
if (length < (0xFFFFFF+sizeof(mysql_hdr))) {
mysql_hdr myhdr;
myhdr.pkt_length=length;
myhdr.pkt_id=pkt_sid;
memcpy(offset, &myhdr, sizeof(mysql_hdr));
memcpy(offset+sizeof(mysql_hdr), row->data, row->length);
pkt_sid++;
} else {
unsigned int left=length;
unsigned int copied=0;
while (left>=0xFFFFFF) {
mysql_hdr myhdr;
myhdr.pkt_length=0xFFFFFF;
myhdr.pkt_id=pkt_sid;
pkt_sid++;
memcpy(offset, &myhdr, sizeof(mysql_hdr));
offset+=sizeof(mysql_hdr);
memcpy(offset, row->data+copied, myhdr.pkt_length);
offset+=0xFFFFFF;
// we are writing a large packet (over 16MB), we assume we are always outside the buffer
copied+=0xFFFFFF;
left-=0xFFFFFF;
}
mysql_hdr myhdr;
myhdr.pkt_length=left;
myhdr.pkt_id=pkt_sid;
pkt_sid++;
memcpy(offset, &myhdr, sizeof(mysql_hdr));
offset+=sizeof(mysql_hdr);
memcpy(offset, row->data+copied, myhdr.pkt_length);
// we are writing a large packet (over 16MB), we assume we are always outside the buffer
}
sid=pkt_sid;
return length;
}
void MySQL_ResultSet::add_eof() {
PtrSize_t pkt;
if (myprot) {

@ -15,6 +15,7 @@ extern const CHARSET_INFO * proxysql_find_charset_name(const char * const name);
extern MySQL_Authentication *GloMyAuth;
extern ProxySQL_Admin *GloAdmin;
extern MySQL_Logger *GloMyLogger;
extern MySQL_STMT_Manager *GloMyStmt;
class KillArgs {
public:
@ -92,6 +93,8 @@ void Query_Info::begin(unsigned char *_p, int len, bool mysql_header) {
MyComQueryCmd=MYSQL_COM_QUERY___NONE;
QueryPointer=NULL;
QueryLength=0;
mysql_stmt=NULL;
stmt_meta=NULL;
//QueryParserArgs=NULL;
QueryParserArgs.digest_text=NULL;
QueryParserArgs.first_comment=NULL;
@ -110,6 +113,7 @@ void Query_Info::end() {
if ((end_time-start_time) > (unsigned int)mysql_thread___long_query_time*1000) {
__sync_add_and_fetch(&sess->thread->status_variables.queries_slow,1);
}
assert(mysql_stmt==NULL);
}
void Query_Info::init(unsigned char *_p, int len, bool mysql_header) {
@ -189,6 +193,7 @@ MySQL_Session::MySQL_Session() {
thread_session_id=0;
pause_until=0;
qpo=NULL;
// Session_STMT_Manager=NULL;
start_time=0;
command_counters=new StatCounters(15,10,false);
healthy=1;
@ -250,6 +255,9 @@ MySQL_Session::~MySQL_Session() {
if (admin==false && connections_handler==false && mirror==false) {
__sync_fetch_and_sub(&MyHGM->status.client_connections,1);
}
// if (Session_STMT_Manager) {
// delete Session_STMT_Manager;
// }
}
@ -837,8 +845,6 @@ __get_pkts_from_client:
case _MYSQL_COM_CHANGE_USER:
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_CHANGE_USER(&pkt, &wrong_pass);
break;
case _MYSQL_COM_STMT_PREPARE:
case _MYSQL_COM_STMT_EXECUTE:
case _MYSQL_COM_STMT_CLOSE:
l_free(pkt.size,pkt.ptr);
client_myds->setDSS_STATE_QUERY_SENT_NET();
@ -846,6 +852,124 @@ __get_pkts_from_client:
client_myds->DSS=STATE_SLEEP;
status=WAITING_CLIENT_DATA;
break;
case _MYSQL_COM_STMT_PREPARE:
if (admin==true) { // admin module will not support prepared statement!!
l_free(pkt.size,pkt.ptr);
client_myds->setDSS_STATE_QUERY_SENT_NET();
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Command not supported");
client_myds->DSS=STATE_SLEEP;
status=WAITING_CLIENT_DATA;
break;
} else {
thread->status_variables.stmt_prepare++;
thread->status_variables.queries++;
// if we reach here, we are not on admin
bool rc_break=false;
// Note: CurrentQuery sees the query as sent by the client.
// shortly after, the packets it used to contain the query will be deallocated
// Note2 : we call the next function as if it was _MYSQL_COM_QUERY
// because the offset will be identical
CurrentQuery.begin((unsigned char *)pkt.ptr,pkt.size,true);
qpo=GloQPro->process_mysql_query(this,pkt.ptr,pkt.size,&CurrentQuery);
assert(qpo); // GloQPro->process_mysql_query() should always return a qpo
rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt);
if (rc_break==true) {
break;
}
// if (Session_STMT_Manager==NULL) {
// Session_STMT_Manager = new MySQL_STMT_Manager();
// }
if (client_myds->myconn->local_stmts==NULL) {
client_myds->myconn->local_stmts=new MySQL_STMTs_local();
}
uint64_t hash=client_myds->myconn->local_stmts->compute_hash(current_hostgroup,(char *)client_myds->myconn->userinfo->username,(char *)client_myds->myconn->userinfo->schemaname,(char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength);
MySQL_STMT_Global_info *stmt_info=NULL;
// if (mysql_thread___stmt_multiplexing) {
stmt_info=GloMyStmt->find_prepared_statement_by_hash(hash);
// } else {
// stmt_info=Session_STMT_Manager->find_prepared_statement_by_hash(hash);
// }
if (stmt_info) {
l_free(pkt.size,pkt.ptr);
client_myds->setDSS_STATE_QUERY_SENT_NET();
client_myds->myprot.generate_STMT_PREPARE_RESPONSE(client_myds->pkt_sid+1,stmt_info);
client_myds->DSS=STATE_SLEEP;
status=WAITING_CLIENT_DATA;
break;
} else {
mybe=find_or_create_backend(current_hostgroup);
status=PROCESSING_STMT_PREPARE;
mybe->server_myds->connect_retries_on_failure=mysql_thread___connect_retries_on_failure;
mybe->server_myds->wait_until=0;
pause_until=0;
mybe->server_myds->killed_at=0;
//mybe->server_myds->mysql_real_query.init(&pkt);
client_myds->setDSS_STATE_QUERY_SENT_NET();
}
}
break;
case _MYSQL_COM_STMT_EXECUTE:
if (admin==true) { // admin module will not support prepared statement!!
l_free(pkt.size,pkt.ptr);
client_myds->setDSS_STATE_QUERY_SENT_NET();
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Command not supported");
client_myds->DSS=STATE_SLEEP;
status=WAITING_CLIENT_DATA;
break;
} else {
// if we reach here, we are not on admin
thread->status_variables.stmt_execute++;
thread->status_variables.queries++;
bool rc_break=false;
uint32_t stmt_global_id=0;
memcpy(&stmt_global_id,(char *)pkt.ptr+5,sizeof(uint32_t));
CurrentQuery.stmt_global_id=stmt_global_id;
// now we get the statement information
MySQL_STMT_Global_info *stmt_info=NULL;
// if (mysql_thread___stmt_multiplexing) {
stmt_info=GloMyStmt->find_prepared_statement_by_stmt_id(stmt_global_id);
// } else {
// stmt_info=Session_STMT_Manager->find_prepared_statement_by_stmt_id(stmt_global_id);
// }
if (stmt_info==NULL) {
// we couldn't find it
l_free(pkt.size,pkt.ptr);
client_myds->setDSS_STATE_QUERY_SENT_NET();
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Prepared statement doesn't exist");
client_myds->DSS=STATE_SLEEP;
status=WAITING_CLIENT_DATA;
break;
}
stmt_execute_metadata_t *stmt_meta=client_myds->myprot.get_binds_from_pkt(pkt.ptr,pkt.size,stmt_info->num_params);
if (stmt_meta==NULL) {
l_free(pkt.size,pkt.ptr);
client_myds->setDSS_STATE_QUERY_SENT_NET();
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Error in prepared statement execution");
client_myds->DSS=STATE_SLEEP;
status=WAITING_CLIENT_DATA;
break;
}
// else
// CurrentQuery.begin((unsigned char *)stmt_info->query, stmt_info->query_length,false);
CurrentQuery.stmt_meta=stmt_meta;
// qpo=GloQPro->process_mysql_query(this,pkt.ptr,pkt.size,&CurrentQuery); // FIXME: not sure we need to call this
// assert(qpo); // GloQPro->process_mysql_query() should always return a qpo
// NOTE: we do not call YET the follow function for STMT_EXECUTE
//rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt);
current_hostgroup=stmt_info->hostgroup_id;
mybe=find_or_create_backend(current_hostgroup);
status=PROCESSING_STMT_EXECUTE;
mybe->server_myds->connect_retries_on_failure=0;
mybe->server_myds->wait_until=0;
mybe->server_myds->killed_at=0;
//mybe->server_myds->mysql_real_query.init(&pkt);
client_myds->setDSS_STATE_QUERY_SENT_NET();
}
break;
// case _MYSQL_COM_STMT_PREPARE:
// handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_PREPARE(&pkt);
// break;
@ -977,6 +1101,8 @@ handler_again:
}
break;
case PROCESSING_STMT_PREPARE:
case PROCESSING_STMT_EXECUTE:
case PROCESSING_QUERY:
//fprintf(stderr,"PROCESSING_QUERY\n");
if (pause_until > thread->curtime) {
@ -1013,7 +1139,20 @@ handler_again:
}
if (mybe->server_myds->DSS==STATE_NOT_INITIALIZED) {
// we don't have a backend yet
previous_status.push(PROCESSING_QUERY);
switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
assert(0);
break;
}
NEXT_IMMEDIATE(CONNECTING_SERVER);
} else {
MySQL_Data_Stream *myds=mybe->server_myds;
@ -1028,11 +1167,39 @@ handler_again:
if (default_hostgroup>=0) {
if (client_myds->myconn->userinfo->hash!=mybe->server_myds->myconn->userinfo->hash) {
if (strcmp(client_myds->myconn->userinfo->username,myds->myconn->userinfo->username)) {
previous_status.push(PROCESSING_QUERY);
//previous_status.push(PROCESSING_QUERY);
switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
assert(0);
break;
}
NEXT_IMMEDIATE(CHANGING_USER_SERVER);
}
if (strcmp(client_myds->myconn->userinfo->schemaname,myds->myconn->userinfo->schemaname)) {
previous_status.push(PROCESSING_QUERY);
//previous_status.push(PROCESSING_QUERY);
switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
assert(0);
break;
}
NEXT_IMMEDIATE(CHANGING_SCHEMA);
}
}
@ -1048,7 +1215,21 @@ handler_again:
}
}
if (client_myds->myconn->options.charset != mybe->server_myds->myconn->mysql->charset->nr) {
previous_status.push(PROCESSING_QUERY);
//previous_status.push(PROCESSING_QUERY);
switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
assert(0);
break;
}
NEXT_IMMEDIATE(CHANGING_CHARSET);
}
if (autocommit != mybe->server_myds->myconn->IsAutoCommit()) {
@ -1057,17 +1238,65 @@ handler_again:
// enforce_autocommit_on_reads is disabled
// we need to check if it is a SELECT not FOR UPDATE
if (CurrentQuery.is_select_NOT_for_update()==false) {
previous_status.push(PROCESSING_QUERY);
//previous_status.push(PROCESSING_QUERY);
switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
assert(0);
break;
}
NEXT_IMMEDIATE(CHANGING_AUTOCOMMIT);
}
} else {
// in every other cases, enforce autocommit
previous_status.push(PROCESSING_QUERY);
//previous_status.push(PROCESSING_QUERY);
switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
assert(0);
break;
}
NEXT_IMMEDIATE(CHANGING_AUTOCOMMIT);
}
}
if (status==PROCESSING_STMT_EXECUTE) {
CurrentQuery.mysql_stmt=myconn->local_stmts->find(CurrentQuery.stmt_global_id);
if (CurrentQuery.mysql_stmt==NULL) {
// the conection we too doesn't have the prepared statements prepared
// we try to create it now
MySQL_STMT_Global_info *stmt_info=NULL;
stmt_info=GloMyStmt->find_prepared_statement_by_stmt_id(CurrentQuery.stmt_global_id);
CurrentQuery.QueryLength=stmt_info->query_length;
CurrentQuery.QueryPointer=(unsigned char *)stmt_info->query;
previous_status.push(PROCESSING_STMT_EXECUTE);
NEXT_IMMEDIATE(PROCESSING_STMT_PREPARE);
}
}
}
}
// FIXME : this part of the code is commented just for reference. It seems it has been removed in v1.2, as copied above
// //status=PROCESSING_QUERY;
// mybe->server_myds->max_connect_time=0;
// // we insert it in mypolls only if not already there
// if (myds->mypolls==NULL) {
// thread->mypolls.add(POLLIN|POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime);
// }
if (myconn->async_state_machine==ASYNC_IDLE) {
mybe->server_myds->wait_until=0;
@ -1084,15 +1313,54 @@ handler_again:
}
}
}
int rc;
timespec begint;
clock_gettime(CLOCK_THREAD_CPUTIME_ID,&begint);
int rc=myconn->async_query(myds->revents, myds->mysql_real_query.QueryPtr,myds->mysql_real_query.QuerySize);
switch (status) {
case PROCESSING_QUERY:
rc=myconn->async_query(myds->revents, myds->mysql_real_query.QueryPtr,myds->mysql_real_query.QuerySize);
break;
case PROCESSING_STMT_PREPARE:
//rc=myconn->async_query(myds->revents, myds->mysql_real_query.QueryPtr,myds->mysql_real_query.QuerySize,&CurrentQuery.mysql_stmt);
rc=myconn->async_query(myds->revents, (char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength,&CurrentQuery.mysql_stmt);
break;
case PROCESSING_STMT_EXECUTE:
// PROCESSING_STMT_EXECUTE FIXME
{
//MySQL_STMT_Global_info *stmt_info=NULL;
/*
if (mysql_thread___stmt_multiplexing) {
stmt_info=GloMyStmt->find_prepared_statement_by_stmt_id(stmt_global_id);
} else {
stmt_info=Session_STMT_Manager->find_prepared_statement_by_stmt_id(stmt_global_id);
}
*/
/*
CurrentQuery.mysql_stmt=myconn->local_stmts->find(CurrentQuery.stmt_global_id);
if (CurrentQuery.mysql_stmt==NULL) {
// the conection we too doesn't have the prepared statements prepared
// we try to create it now
MySQL_STMT_Global_info *stmt_info=NULL;
stmt_info=GloMyStmt->find_prepared_statement_by_stmt_id(CurrentQuery.stmt_global_id);
CurrentQuery.QueryLength=stmt_info->query_length;
CurrentQuery.QueryPointer=(unsigned char *)stmt_info->query;
previous_status.push(PROCESSING_STMT_EXECUTE);
NEXT_IMMEDIATE(PROCESSING_STMT_PREPARE);
}
*/
//rc=myconn->async_query(myds->revents, myds->mysql_real_query.QueryPtr,myds->mysql_real_query.QuerySize,&CurrentQuery.mysql_stmt, CurrentQuery.stmt_meta);
rc=myconn->async_query(myds->revents, (char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength,&CurrentQuery.mysql_stmt, CurrentQuery.stmt_meta);
}
break;
default:
assert(0);
break;
}
timespec endt;
clock_gettime(CLOCK_THREAD_CPUTIME_ID,&endt);
thread->status_variables.backend_query_time=thread->status_variables.backend_query_time +
(endt.tv_sec*1000000000+endt.tv_nsec) -
(begint.tv_sec*1000000000+begint.tv_nsec);
// if (myconn->async_state_machine==ASYNC_QUERY_END) {
if (rc==0) {
// FIXME: deprecate old MySQL_Result_to_MySQL_wire , not completed yet
@ -1108,7 +1376,60 @@ handler_again:
if (myconn->mysql->insert_id) {
last_insert_id=myconn->mysql->insert_id;
}
MySQL_Result_to_MySQL_wire(myconn->mysql, myconn->MyRS);
switch (status) {
case PROCESSING_QUERY:
MySQL_Result_to_MySQL_wire(myconn->mysql, myconn->MyRS);
break;
case PROCESSING_STMT_PREPARE:
{
uint32_t stmid;
MySQL_STMT_Global_info *stmt_info=NULL;
// if (mysql_thread___stmt_multiplexing) {
stmt_info=GloMyStmt->add_prepared_statement(current_hostgroup,
(char *)client_myds->myconn->userinfo->username,
(char *)client_myds->myconn->userinfo->schemaname,
(char *)CurrentQuery.QueryPointer,
CurrentQuery.QueryLength,
CurrentQuery.mysql_stmt,
qpo->cache_ttl,
qpo->timeout,
qpo->delay,
true);
stmid=stmt_info->statement_id;
//uint64_t hash=client_myds->myconn->local_stmts->compute_hash(current_hostgroup,(char *)client_myds->myconn->userinfo->username,(char *)client_myds->myconn->userinfo->schemaname,(char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength);
/*
} else {
//stmt_info=Session_STMT_Manager->find_prepared_statement_by_hash(hash);
stmt_info=Session_STMT_Manager->add_prepared_statement(current_hostgroup,
(char *)client_myds->myconn->userinfo->username,
(char *)client_myds->myconn->userinfo->schemaname,
(char *)CurrentQuery.QueryPointer,
CurrentQuery.QueryLength,
CurrentQuery.mysql_stmt,
qpo->cache_ttl,
qpo->timeout,
qpo->delay,
false);
stmid=stmt_info->statement_id;
*/
// }
myds->myconn->local_stmts->insert(stmid,CurrentQuery.mysql_stmt);
client_myds->myprot.generate_STMT_PREPARE_RESPONSE(client_myds->pkt_sid+1,stmt_info);
}
CurrentQuery.mysql_stmt=NULL;
break;
case PROCESSING_STMT_EXECUTE:
{
MySQL_Stmt_Result_to_MySQL_wire(CurrentQuery.mysql_stmt);
}
CurrentQuery.mysql_stmt=NULL;
break;
default:
assert(0);
break;
}
// GloQPro->delete_QP_out(qpo);
// qpo=NULL;
// myconn->async_free_result();
@ -1159,7 +1480,18 @@ handler_again:
myds->fd=0;
if (retry_conn) {
myds->DSS=STATE_NOT_INITIALIZED;
previous_status.push(PROCESSING_QUERY);
//previous_status.push(PROCESSING_QUERY);
switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
default:
assert(0);
break;
}
NEXT_IMMEDIATE(CONNECTING_SERVER);
}
return -1;
@ -1185,7 +1517,18 @@ handler_again:
myds->fd=0;
if (retry_conn) {
myds->DSS=STATE_NOT_INITIALIZED;
previous_status.push(PROCESSING_QUERY);
//previous_status.push(PROCESSING_QUERY);
switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
default:
assert(0);
break;
}
NEXT_IMMEDIATE(CONNECTING_SERVER);
}
return -1;
@ -1215,7 +1558,18 @@ handler_again:
myds->fd=0;
if (retry_conn) {
myds->DSS=STATE_NOT_INITIALIZED;
previous_status.push(PROCESSING_QUERY);
//previous_status.push(PROCESSING_QUERY);
switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
default:
assert(0);
break;
}
NEXT_IMMEDIATE(CONNECTING_SERVER);
}
return -1;
@ -1227,7 +1581,24 @@ handler_again:
break; // continue normally
}
MySQL_Result_to_MySQL_wire(myconn->mysql, myconn->MyRS);
switch (status) {
case PROCESSING_QUERY:
MySQL_Result_to_MySQL_wire(myconn->mysql, myconn->MyRS);
break;
case PROCESSING_STMT_PREPARE:
//MySQL_Result_to_MySQL_wire(myconn->mysql, myconn->MyRS, true);
{
char sqlstate[10];
sprintf(sqlstate,"#%s",mysql_sqlstate(myconn->mysql));
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,mysql_errno(myconn->mysql),sqlstate,(char *)mysql_stmt_error(myconn->query.stmt));
client_myds->pkt_sid++;
}
break;
default:
assert(0);
break;
}
// MySQL_Result_to_MySQL_wire(myconn->mysql, myconn->MyRS);
// CurrentQuery.end();
// GloQPro->delete_QP_out(qpo);
// qpo=NULL;
@ -1980,7 +2351,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(PtrSize_t *pkt) {
bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(PtrSize_t *pkt, bool prepared) {
if (qpo->new_query) {
// the query was rewritten
l_free(pkt->size,pkt->ptr); // free old pkt
@ -2027,6 +2398,11 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
RequestEnd(NULL);
return true;
}
if (prepared) { // for prepared statement we exit here
goto __exit_set_destination_hostgroup;
}
if (mirror==true) { // for mirror session we exit here
current_hostgroup=qpo->destination_hostgroup;
return false;
@ -2061,6 +2437,9 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
return true;
}
}
__exit_set_destination_hostgroup:
if ( qpo->destination_hostgroup >= 0 ) {
if (transaction_persistent_hostgroup == -1) {
current_hostgroup=qpo->destination_hostgroup;
@ -2164,6 +2543,14 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED
}
}
void MySQL_Session::MySQL_Stmt_Result_to_MySQL_wire(MYSQL_STMT *stmt) {
MYSQL_RES *stmt_result=mysql_stmt_result_metadata(stmt);
if (stmt_result) {
MySQL_ResultSet *MyRS=new MySQL_ResultSet(&client_myds->myprot, stmt_result, stmt->mysql, stmt);
bool resultset_completed=MyRS->get_resultset(client_myds->PSarrayOUT);
}
}
void MySQL_Session::MySQL_Result_to_MySQL_wire(MYSQL *mysql, MySQL_ResultSet *MyRS) {
if (MyRS) {
assert(MyRS->result);

@ -182,6 +182,7 @@ static char * mysql_thread_variables_names[]= {
(char *)"max_allowed_packet",
(char *)"max_transaction_time",
(char *)"multiplexing",
// (char *)"stmt_multiplexing",
(char *)"enforce_autocommit_on_reads",
(char *)"threshold_query_length",
(char *)"threshold_resultset_size",
@ -283,6 +284,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() {
variables.client_found_rows=true;
variables.commands_stats=true;
variables.multiplexing=true;
// variables.stmt_multiplexing=false;
variables.enforce_autocommit_on_reads=false;
variables.query_digests=true;
variables.sessions_sort=true;
@ -477,6 +479,7 @@ int MySQL_Threads_Handler::get_variable_int(char *name) {
if (!strcasecmp(name,"have_compress")) return (int)variables.have_compress;
if (!strcasecmp(name,"client_found_rows")) return (int)variables.client_found_rows;
if (!strcasecmp(name,"multiplexing")) return (int)variables.multiplexing;
// if (!strcasecmp(name,"stmt_multiplexing")) return (int)variables.stmt_multiplexing;
if (!strcasecmp(name,"enforce_autocommit_on_reads")) return (int)variables.enforce_autocommit_on_reads;
if (!strcasecmp(name,"commands_stats")) return (int)variables.commands_stats;
if (!strcasecmp(name,"query_digests")) return (int)variables.query_digests;
@ -725,6 +728,9 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f
if (!strcasecmp(name,"multiplexing")) {
return strdup((variables.multiplexing ? "true" : "false"));
}
// if (!strcasecmp(name,"stmt_multiplexing")) {
// return strdup((variables.stmt_multiplexing ? "true" : "false"));
// }
if (!strcasecmp(name,"enforce_autocommit_on_reads")) {
return strdup((variables.enforce_autocommit_on_reads ? "true" : "false"));
}
@ -1306,6 +1312,21 @@ bool MySQL_Threads_Handler::set_variable(char *name, char *value) { // this is t
}
return false;
}
// if (!strcasecmp(name,"stmt_multiplexing")) {
/*
// FIXME : for now, stmt_multiplexing cannot be enabled
// there is no code to handle it
if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) {
variables.stmt_multiplexing=true;
return true;
}
*/
// if (strcasecmp(value,"false")==0 || strcasecmp(value,"0")==0) {
// variables.stmt_multiplexing=false;
// return true;
// }
// return false;
// }
if (!strcasecmp(name,"enforce_autocommit_on_reads")) {
if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) {
variables.enforce_autocommit_on_reads=true;
@ -2061,6 +2082,7 @@ void MySQL_Thread::refresh_variables() {
mysql_thread___have_compress=(bool)GloMTH->get_variable_int((char *)"have_compress");
mysql_thread___client_found_rows=(bool)GloMTH->get_variable_int((char *)"client_found_rows");
mysql_thread___multiplexing=(bool)GloMTH->get_variable_int((char *)"multiplexing");
// mysql_thread___stmt_multiplexing=(bool)GloMTH->get_variable_int((char *)"stmt_multiplexing");
mysql_thread___enforce_autocommit_on_reads=(bool)GloMTH->get_variable_int((char *)"enforce_autocommit_on_reads");
mysql_thread___commands_stats=(bool)GloMTH->get_variable_int((char *)"commands_stats");
mysql_thread___query_digests=(bool)GloMTH->get_variable_int((char *)"query_digests");
@ -2098,6 +2120,9 @@ MySQL_Thread::MySQL_Thread() {
last_maintenance_time=0;
maintenance_loop=true;
status_variables.stmt_prepare=0;
status_variables.stmt_execute=0;
status_variables.stmt_close=0;
status_variables.queries=0;
status_variables.queries_slow=0;
status_variables.queries_backends_bytes_sent=0;
@ -2302,6 +2327,24 @@ SQLite3_result * MySQL_Threads_Handler::SQL3_GlobalStatus() {
pta[1]=buf;
result->add_row(pta);
}
{ // stmt prepare
pta[0]=(char *)"Com_stmt_prepare";
sprintf(buf,"%llu",get_total_stmt_prepare());
pta[1]=buf;
result->add_row(pta);
}
{ // stmt execute
pta[0]=(char *)"Com_stmt_execute";
sprintf(buf,"%llu",get_total_stmt_execute());
pta[1]=buf;
result->add_row(pta);
}
{ // stmt prepare
pta[0]=(char *)"Com_stmt_close";
sprintf(buf,"%llu",get_total_stmt_close());
pta[1]=buf;
result->add_row(pta);
}
{ // Queries
pta[0]=(char *)"Questions";
sprintf(buf,"%llu",get_total_queries());
@ -2554,6 +2597,45 @@ __exit_kill_session:
return ret;
}
unsigned long long MySQL_Threads_Handler::get_total_stmt_prepare() {
unsigned long long q=0;
unsigned int i;
for (i=0;i<num_threads;i++) {
if (mysql_threads) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker;
if (thr)
q+=__sync_fetch_and_add(&thr->status_variables.stmt_prepare,0);
}
}
return q;
}
unsigned long long MySQL_Threads_Handler::get_total_stmt_execute() {
unsigned long long q=0;
unsigned int i;
for (i=0;i<num_threads;i++) {
if (mysql_threads) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker;
if (thr)
q+=__sync_fetch_and_add(&thr->status_variables.stmt_execute,0);
}
}
return q;
}
unsigned long long MySQL_Threads_Handler::get_total_stmt_close() {
unsigned long long q=0;
unsigned int i;
for (i=0;i<num_threads;i++) {
if (mysql_threads) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker;
if (thr)
q+=__sync_fetch_and_add(&thr->status_variables.stmt_close,0);
}
}
return q;
}
unsigned long long MySQL_Threads_Handler::get_total_queries() {
unsigned long long q=0;
unsigned int i;

@ -171,11 +171,14 @@ MySQL_Connection::MySQL_Connection() {
mysql_result=NULL;
query.ptr=NULL;
query.length=0;
query.stmt=NULL;
query.stmt_meta=NULL;
largest_query_length=0;
MyRS=NULL;
creation_time=0;
processing_multi_statement=false;
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "Creating new MySQL_Connection %p\n", this);
local_stmts=new MySQL_STMTs_local();
};
MySQL_Connection::~MySQL_Connection() {
@ -206,6 +209,13 @@ MySQL_Connection::~MySQL_Connection() {
if (MyRS) {
delete MyRS;
}
if (local_stmts) {
delete local_stmts;
}
if (query.stmt) {
mysql_stmt_close(query.stmt);
query.stmt=NULL;
}
};
bool MySQL_Connection::set_autocommit(bool _ac) {
@ -430,6 +440,10 @@ void MySQL_Connection::set_query(char *stmt, unsigned long length) {
if (length > largest_query_length) {
largest_query_length=length;
}
if (query.stmt) {
mysql_stmt_close(query.stmt);
query.stmt=NULL;
}
//query.ptr=(char *)malloc(length);
//memcpy(query.ptr,stmt,length);
}
@ -444,6 +458,40 @@ void MySQL_Connection::real_query_cont(short event) {
async_exit_status = mysql_real_query_cont(&interr ,mysql , mysql_status(event, true));
}
void MySQL_Connection::stmt_prepare_start() {
PROXY_TRACE();
query.stmt=mysql_stmt_init(mysql);
my_bool my_arg=true;
mysql_stmt_attr_set(query.stmt, STMT_ATTR_UPDATE_MAX_LENGTH, &my_arg);
async_exit_status = mysql_stmt_prepare_start(&interr , query.stmt, query.ptr, query.length);
}
void MySQL_Connection::stmt_prepare_cont(short event) {
proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event);
async_exit_status = mysql_stmt_prepare_cont(&interr , query.stmt , mysql_status(event, true));
}
void MySQL_Connection::stmt_execute_start() {
PROXY_TRACE();
mysql_stmt_bind_param(query.stmt, query.stmt_meta->binds); // FIXME : add error handling
async_exit_status = mysql_stmt_execute_start(&interr , query.stmt);
}
void MySQL_Connection::stmt_execute_cont(short event) {
proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event);
async_exit_status = mysql_stmt_execute_cont(&interr , query.stmt , mysql_status(event, true));
}
void MySQL_Connection::stmt_execute_store_result_start() {
PROXY_TRACE();
async_exit_status = mysql_stmt_store_result_start(&interr, query.stmt);
}
void MySQL_Connection::stmt_execute_store_result_cont(short event) {
proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event);
async_exit_status = mysql_stmt_store_result_cont(&interr , query.stmt , mysql_status(event, true));
}
void MySQL_Connection::store_result_start() {
PROXY_TRACE();
async_exit_status = mysql_store_result_start(&mysql_result, mysql);
@ -622,6 +670,108 @@ handler_again:
}
break;
case ASYNC_STMT_PREPARE_START:
stmt_prepare_start();
__sync_fetch_and_add(&parent->queries_sent,1);
__sync_fetch_and_add(&parent->bytes_sent,query.length);
myds->sess->thread->status_variables.queries_backends_bytes_sent+=query.length;
if (async_exit_status) {
next_event(ASYNC_STMT_PREPARE_CONT);
} else {
NEXT_IMMEDIATE(ASYNC_STMT_PREPARE_END);
}
break;
case ASYNC_STMT_PREPARE_CONT:
stmt_prepare_cont(event);
if (async_exit_status) {
next_event(ASYNC_STMT_PREPARE_CONT);
} else {
NEXT_IMMEDIATE(ASYNC_STMT_PREPARE_END);
}
break;
case ASYNC_STMT_PREPARE_END:
if (interr) {
NEXT_IMMEDIATE(ASYNC_STMT_PREPARE_FAILED);
} else {
NEXT_IMMEDIATE(ASYNC_STMT_PREPARE_SUCCESSFUL);
}
break;
case ASYNC_STMT_PREPARE_SUCCESSFUL:
break;
case ASYNC_STMT_PREPARE_FAILED:
break;
case ASYNC_STMT_EXECUTE_START:
stmt_execute_start();
__sync_fetch_and_add(&parent->queries_sent,1);
// __sync_fetch_and_add(&parent->bytes_sent,query.length);
// myds->sess->thread->status_variables.queries_backends_bytes_sent+=query.length;
if (async_exit_status) {
next_event(ASYNC_STMT_EXECUTE_CONT);
} else {
NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_STORE_RESULT_START);
}
break;
case ASYNC_STMT_EXECUTE_CONT:
stmt_execute_cont(event);
if (async_exit_status) {
next_event(ASYNC_STMT_EXECUTE_CONT);
} else {
NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_STORE_RESULT_START);
}
break;
case ASYNC_STMT_EXECUTE_STORE_RESULT_START:
if (mysql_stmt_errno(query.stmt)) {
NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END);
}
{
MYSQL_RES *stmt_result=mysql_stmt_result_metadata(query.stmt);
if (stmt_result==NULL) {
NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END);
}
}
stmt_execute_store_result_start();
if (async_exit_status) {
next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT);
} else {
NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END);
}
break;
case ASYNC_STMT_EXECUTE_STORE_RESULT_CONT:
stmt_execute_store_result_cont(event);
if (async_exit_status) {
next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT);
} else {
NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END);
}
break;
case ASYNC_STMT_EXECUTE_END:
{
/*
int row_count= 0;
fprintf(stdout, "Fetching results ...\n");
while (!mysql_stmt_fetch(query.stmt))
{
row_count++;
fprintf(stdout, " row %d\n", row_count);
}
*/
}
/*
if (interr) {
NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_FAILED);
} else {
NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_SUCCESSFUL);
}
*/
break;
// case ASYNC_STMT_EXECUTE_SUCCESSFUL:
// break;
// case ASYNC_STMT_EXECUTE_FAILED:
// break;
case ASYNC_NEXT_RESULT_START:
async_exit_status = mysql_next_result_start(&interr, mysql);
if (async_exit_status) {
@ -897,7 +1047,7 @@ int MySQL_Connection::async_connect(short event) {
// 0 when the query is completed
// 1 when the query is not completed
// the calling function should check mysql error in mysql struct
int MySQL_Connection::async_query(short event, char *stmt, unsigned long length) {
int MySQL_Connection::async_query(short event, char *stmt, unsigned long length, MYSQL_STMT **_stmt, stmt_execute_metadata_t *stmt_meta) {
PROXY_TRACE();
assert(mysql);
assert(ret_mysql);
@ -914,8 +1064,20 @@ int MySQL_Connection::async_query(short event, char *stmt, unsigned long length)
return 0;
break;
case ASYNC_IDLE:
set_query(stmt,length);
if (stmt_meta==NULL)
set_query(stmt,length);
async_state_machine=ASYNC_QUERY_START;
if (_stmt) {
query.stmt=*_stmt;
if (stmt_meta==NULL) {
async_state_machine=ASYNC_STMT_PREPARE_START;
} else {
if (query.stmt_meta==NULL) {
query.stmt_meta=stmt_meta;
}
async_state_machine=ASYNC_STMT_EXECUTE_START;
}
}
default:
handler(event);
break;
@ -928,6 +1090,24 @@ int MySQL_Connection::async_query(short event, char *stmt, unsigned long length)
return 0;
}
}
if (async_state_machine==ASYNC_STMT_EXECUTE_END) {
async_state_machine=ASYNC_QUERY_END;
if (mysql_stmt_errno(query.stmt)) {
return -1;
} else {
return 0;
}
}
if (async_state_machine==ASYNC_STMT_PREPARE_SUCCESSFUL || async_state_machine==ASYNC_STMT_PREPARE_FAILED) {
if (async_state_machine==ASYNC_STMT_PREPARE_FAILED) {
//mysql_stmt_close(query.stmt);
//query.stmt=NULL;
return -1;
} else {
*_stmt=query.stmt;
return 0;
}
}
if (async_state_machine==ASYNC_NEXT_RESULT_START) {
// if we reached this point it measn we are processing a multi-statement
// and we need to exit to give control to MySQL_Session

@ -142,6 +142,8 @@ Query_Processor *GloQPro;
ProxySQL_Admin *GloAdmin;
MySQL_Threads_Handler *GloMTH;
MySQL_STMT_Manager *GloMyStmt;
MySQL_Monitor *GloMyMon;
std::thread *MyMon_thread;
@ -236,9 +238,11 @@ void ProxySQL_Main_init_main_modules() {
GloMyAuth=NULL;
GloMyMon=NULL;
GloMyLogger=NULL;
GloMyStmt=NULL;
MyHGM=new MySQL_HostGroups_Manager();
GloMTH=new MySQL_Threads_Handler();
GloMyLogger = new MySQL_Logger();
GloMyStmt=new MySQL_STMT_Manager();
}
@ -372,6 +376,10 @@ void ProxySQL_Main_shutdown_all_modules() {
std::cerr << "GloMyLogger shutdown in ";
#endif
}
if (GloMyStmt) {
delete GloMyStmt;
GloMyStmt=NULL;
}
{
cpu_timer t;

@ -0,0 +1,7 @@
client1
client2
client3
client4
client5
client6
client7

@ -0,0 +1,85 @@
DEPS_PATH=../../deps
MARIADB_PATH=$(DEPS_PATH)/mariadb-client-library/mariadb_client
MARIADB_IDIR=$(MARIADB_PATH)/include
MARIADB_LDIR=$(MARIADB_PATH)/libmariadb
DAEMONPATH=$(DEPS_PATH)/libdaemon/libdaemon
DAEMONPATH_IDIR=$(DAEMONPATH)
DAEMONPATH_LDIR=$(DAEMONPATH)/libdaemon/.libs
JEMALLOC_PATH=$(DEPS_PATH)/jemalloc/jemalloc
JEMALLOC_IDIR=$(JEMALLOC_PATH)/include/jemalloc
JEMALLOC_LDIR=$(JEMALLOC_PATH)/lib
LIBCONFIG_PATH=$(DEPS_PATH)/libconfig/libconfig-1.4.9
LIBCONFIG_IDIR=-I$(LIBCONFIG_PATH)/lib
LIBCONFIG_LDIR=-L$(LIBCONFIG_PATH)/lib/.libs
LIBEVENT_PATH=$(DEPS_PATH)/libevent/libevent
LIBEVENT_IDIR=$(LIBEVENT_PATH)/include
LIBEVENT_LDIR=$(LIBEVENT_PATH)/.libs
RE2_PATH=$(DEPS_PATH)/re2/re2
RE2_IDIR=$(RE2_PATH)
SQLITE3_DIR=$(DEPS_PATH)/sqlite3/sqlite3
IDIR=../../include
LDIR=../../lib
IDIRS=-I$(IDIR) -I$(JEMALLOC_IDIR) -I$(MARIADB_IDIR) $(LIBCONFIG_IDIR) -I$(DAEMONPATH_IDIR) -I$(SQLITE3_DIR)
LDIRS=-L$(LDIR) -L$(JEMALLOC_LDIR) $(LIBCONFIG_LDIR) -L$(RE2_PATH)/obj -L$(LIBEVENT_LDIR) -L$(MARIADB_LDIR) -L$(DAEMONPATH_LDIR)
DEBUG=-DDEBUG
MYCPPFLAGS=-std=c++11 $(IDIRS) $(OPTZ) $(DEBUG) -ggdb
LDFLAGS+=
MYLIBS=-Wl,--export-dynamic -Wl,-Bstatic -lconfig -lproxysql -ldaemon -ljemalloc -lconfig++ -lre2 -levent -lmariadbclient -Wl,-Bdynamic -lpthread -lm -lz -lrt -lcrypto -lssl $(EXTRALINK)
UNAME_S := $(shell uname -s)
ifeq ($(UNAME_S),Linux)
MYLIBS+= -ldl
endif
ifeq ($(UNAME_S),FreeBSD)
MYLIBS+= -lexecinfo
endif
LIBPROXYSQLAR=$(LDIR)/libproxysql.a
.PHONY: default
default: client1
client1: client1.cpp
$(CXX) -o $@ $@.cpp $(LIBPROXYSQLAR) $(MYCPPFLAGS) $(CPPFLAGS) $(LDIRS) $(LIBS) $(LDFLAGS) $(MYLIBS)
client2: client2.cpp
$(CXX) -o $@ $@.cpp $(LIBPROXYSQLAR) $(MYCPPFLAGS) $(CPPFLAGS) $(LDIRS) $(LIBS) $(LDFLAGS) $(MYLIBS)
client3: client3.cpp
$(CXX) -o $@ $@.cpp $(LIBPROXYSQLAR) $(MYCPPFLAGS) $(CPPFLAGS) $(LDIRS) $(LIBS) $(LDFLAGS) $(MYLIBS)
client4: client4.cpp
$(CXX) -o $@ $@.cpp $(LIBPROXYSQLAR) $(MYCPPFLAGS) $(CPPFLAGS) $(LDIRS) $(LIBS) $(LDFLAGS) $(MYLIBS)
client5: client5.cpp
$(CXX) -o $@ $@.cpp $(LIBPROXYSQLAR) $(MYCPPFLAGS) $(CPPFLAGS) $(LDIRS) $(LIBS) $(LDFLAGS) $(MYLIBS)
client6: client6.cpp
$(CXX) -o $@ $@.cpp $(LIBPROXYSQLAR) $(MYCPPFLAGS) $(CPPFLAGS) $(LDIRS) $(LIBS) $(LDFLAGS) $(MYLIBS)
client7: client7.cpp
$(CXX) -o $@ $@.cpp $(LIBPROXYSQLAR) $(MYCPPFLAGS) $(CPPFLAGS) $(LDIRS) $(LIBS) $(LDFLAGS) $(MYLIBS)
client8: client8.cpp
$(CXX) -o $@ $@.cpp $(LIBPROXYSQLAR) $(MYCPPFLAGS) $(CPPFLAGS) $(LDIRS) $(LIBS) $(LDFLAGS) $(MYLIBS)
default: $(EXECUTABLE)
clean:
rm -f *~ core $(default)

@ -0,0 +1,75 @@
#include "proxysql.h"
#include "cpp.h"
#define QUERY1 "SELECT ? + ? + ?"
MYSQL *mysql;
MYSQL_STMT *stmt;
uint32_t statement_id;
uint16_t num_params;
uint16_t num_columns;
uint16_t warning_count;
int run_stmt(MYSQL_STMT *stmt, int int_data) {
MYSQL_BIND bind[3];
MYSQL_RES *prepare_meta_result;
bind[0].buffer_type= MYSQL_TYPE_LONG;
bind[0].buffer= (char *)&int_data;
bind[0].is_null= 0;
bind[0].length= 0;
bind[1].buffer_type= MYSQL_TYPE_LONG;
bind[1].buffer= (char *)&int_data;
bind[1].is_null= 0;
bind[1].length= 0;
bind[2].buffer_type= MYSQL_TYPE_LONG;
bind[2].buffer= (char *)&int_data;
bind[2].is_null= 0;
bind[2].length= 0;
if (mysql_stmt_bind_param(stmt, bind)) {
fprintf(stderr, " mysql_stmt_bind_param() failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
prepare_meta_result = mysql_stmt_result_metadata(stmt); // FIXME: no error check
if (mysql_stmt_execute(stmt)) {
fprintf(stderr, " mysql_stmt_execute(), 1 failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
// memset(bind, 0, sizeof(bind));
if (mysql_stmt_store_result(stmt)) {
fprintf(stderr, " mysql_stmt_store_result() failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
mysql_free_result(prepare_meta_result);
return 0;
}
int main() {
std::mt19937 mt_rand(time(0));
mysql = mysql_init(NULL);
if (!mysql_real_connect(mysql,"127.0.0.1","msandbox","msandbox","test",6033,NULL,0)) {
//if (!mysql_real_connect(mysql,"127.0.0.1","root","","test",3306,NULL,0)) {
fprintf(stderr, "Failed to connect to database: Error: %s\n", mysql_error(mysql));
exit(EXIT_FAILURE);
}
stmt = mysql_stmt_init(mysql);
if (!stmt) {
fprintf(stderr, " mysql_stmt_init(), out of memory\n");
exit(EXIT_FAILURE);
}
if (mysql_stmt_prepare(stmt, QUERY1, strlen(QUERY1))) {
fprintf(stderr, " mysql_stmt_prepare(), failed: %d, %s\n" , mysql_errno(mysql), mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
// param_count= mysql_stmt_param_count(stmt);
// fprintf(stdout, " total parameters in Query1 : %d\n", param_count);
statement_id=stmt->stmt_id;
num_params=stmt->param_count;
num_columns=stmt->field_count;
warning_count=stmt->upsert_status.warning_count;
fprintf(stdout, "statement_id=%d , columns=%d , params=%d , warnings=%d\n", statement_id, num_columns, num_params, warning_count);
run_stmt(stmt,(uint32_t)mt_rand());
return 0;
}

@ -0,0 +1,178 @@
#include "proxysql.h"
#include "cpp.h"
#include <ctime>
#define QUERY1 "SELECT ?"
#define NUMPREP 100000
#define NUMPRO 1000
//#define NUMPREP 160
//#define NUMPRO 4
#define LOOPS 10
#define USER "root"
#define SCHEMA "test"
MYSQL *mysql;
MYSQL_STMT **stmt;
uint32_t statement_id;
uint16_t num_params;
uint16_t num_columns;
uint16_t warning_count;
MySQL_STMT_Manager *GloMyStmt;
struct cpu_timer
{
~cpu_timer()
{
auto end = std::clock() ;
std::cout << double( end - begin ) / CLOCKS_PER_SEC << " secs.\n" ;
};
const std::clock_t begin = std::clock() ;
};
int run_stmt(MYSQL_STMT *stmt, int int_data) {
MYSQL_BIND bind[1];
MYSQL_RES *prepare_meta_result;
bind[0].buffer_type= MYSQL_TYPE_LONG;
bind[0].buffer= (char *)&int_data;
bind[0].is_null= 0;
bind[0].length= 0;
if (mysql_stmt_bind_param(stmt, bind)) {
fprintf(stderr, " mysql_stmt_bind_param() failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
prepare_meta_result = mysql_stmt_result_metadata(stmt); // FIXME: no error check
if (mysql_stmt_execute(stmt)) {
fprintf(stderr, " mysql_stmt_execute(), 1 failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
// memset(bind, 0, sizeof(bind));
if (mysql_stmt_store_result(stmt)) {
fprintf(stderr, " mysql_stmt_store_result() failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
mysql_free_result(prepare_meta_result);
return 0;
}
int main() {
std::mt19937 mt_rand(time(0));
GloMyStmt=new MySQL_STMT_Manager();
MySQL_STMTs_local *local_stmts=new MySQL_STMTs_local();
mysql = mysql_init(NULL);
char buff[128];
unsigned int bl=0;
if (!mysql_real_connect(mysql,"127.0.0.1",USER,"",SCHEMA,3306,NULL,0)) {
fprintf(stderr, "Failed to connect to database: Error: %s\n", mysql_error(mysql));
exit(EXIT_FAILURE);
}
int i;
stmt=(MYSQL_STMT **)malloc(sizeof(MYSQL_STMT*)*NUMPREP);
{
cpu_timer t;
for (i=0; i<NUMPREP; i++) {
stmt[i] = mysql_stmt_init(mysql);
if (!stmt[i]) {
fprintf(stderr, " mysql_stmt_init(), out of memory\n");
exit(EXIT_FAILURE);
}
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
if (a==NULL) {
if (mysql_stmt_prepare(stmt[i], buff, bl)) {
fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt[i]));
exit(EXIT_FAILURE);
}
uint32_t stmid=GloMyStmt->add_prepared_statement(0,(char *)USER,(char *)SCHEMA,buff,bl,stmt[i]);
if (NUMPRO < 32)
fprintf(stdout, "SERVER_statement_id=%lu , PROXY_statement_id=%u\n", stmt[i]->stmt_id, stmid);
local_stmts->insert(stmid,stmt[i]);
}
}
fprintf(stdout, "Prepared statements: %u client, %u proxy/server. ", NUMPREP, GloMyStmt->total_prepared_statements());
fprintf(stdout, "Created in: ");
}
{
unsigned int founds=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
//uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
//MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
//if (a) founds++;
}
fprintf(stdout, "Computed %u random strings in: ", i);
}
{
unsigned int founds=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
//MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
//if (a) founds++;
}
fprintf(stdout, "Computed %u hashes in: ", i);
}
{
unsigned int founds=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
if (a) founds++;
}
fprintf(stdout, "Found %u prepared statements searching by hash in: ", founds);
}
{
unsigned int founds=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
if (a) {
// we have a prepared statement, we can run it
founds++;
MYSQL_STMT *stm=local_stmts->find(a->statement_id);
run_stmt(stm,(uint32_t)mt_rand());
}
}
fprintf(stdout, "Executed %u prepared statements in: ", founds);
}
{
// for comparison, we run also queries in TEXT protocol
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + %u",i,(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
int rc=mysql_real_query(mysql,buff,bl);
if (rc) {
fprintf(stderr, " mysql_real_query(), failed: %s\n" , mysql_error(mysql));
exit(EXIT_FAILURE);
}
MYSQL_RES *res=mysql_store_result(mysql);
if (res==NULL) {
fprintf(stderr, " mysql_store_result(), failed: %s\n" , mysql_error(mysql));
exit(EXIT_FAILURE);
}
mysql_free_result(res);
}
fprintf(stdout, "Executed %u queries in: ", i);
}
return 0;
}

@ -0,0 +1,211 @@
#include "proxysql.h"
#include "cpp.h"
#include <ctime>
#include <thread>
#define QUERY1 "SELECT ?"
#define NUMPREP 100000
#define NUMPRO 10000
//#define NUMPREP 160
//#define NUMPRO 4
#define LOOPS 1
#define USER "root"
#define SCHEMA "test"
#define NTHREADS 4
MySQL_STMT_Manager *GloMyStmt;
typedef struct _thread_data_t {
std::thread *thread;
MYSQL *mysql;
MYSQL_STMT **stmt;
} thread_data_t;
thread_data_t **GloThrData;
struct cpu_timer
{
~cpu_timer()
{
auto end = std::clock() ;
std::cout << double( end - begin ) / CLOCKS_PER_SEC << " secs.\n" ;
};
const std::clock_t begin = std::clock() ;
};
int run_stmt(MYSQL_STMT *stmt, int int_data) {
MYSQL_BIND bind[1];
MYSQL_RES *prepare_meta_result;
bind[0].buffer_type= MYSQL_TYPE_LONG;
bind[0].buffer= (char *)&int_data;
bind[0].is_null= 0;
bind[0].length= 0;
if (mysql_stmt_bind_param(stmt, bind)) {
fprintf(stderr, " mysql_stmt_bind_param() failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
prepare_meta_result = mysql_stmt_result_metadata(stmt); // FIXME: no error check
if (mysql_stmt_execute(stmt)) {
fprintf(stderr, " mysql_stmt_execute(), 1 failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
// memset(bind, 0, sizeof(bind));
if (mysql_stmt_store_result(stmt)) {
fprintf(stderr, " mysql_stmt_store_result() failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
mysql_free_result(prepare_meta_result);
return 0;
}
void * mysql_thread(int tid) {
std::mt19937 mt_rand(time(0));
thread_data_t *THD;
THD=GloThrData[tid];
MYSQL *mysql;
MYSQL_STMT **stmt;
MySQL_STMTs_local *local_stmts=new MySQL_STMTs_local();
THD->mysql = mysql_init(NULL);
mysql=THD->mysql;
char buff[128];
unsigned int bl=0;
if (!mysql_real_connect(mysql,"127.0.0.1",USER,"",SCHEMA,3306,NULL,0)) {
fprintf(stderr, "Failed to connect to database: Error: %s\n", mysql_error(mysql));
exit(EXIT_FAILURE);
}
int i;
stmt=(MYSQL_STMT **)malloc(sizeof(MYSQL_STMT*)*NUMPREP);
{
cpu_timer t;
for (i=0; i<NUMPREP; i++) {
stmt[i] = mysql_stmt_init(mysql);
if (!stmt[i]) {
fprintf(stderr, " mysql_stmt_init(), out of memory\n");
exit(EXIT_FAILURE);
}
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
if (a==NULL) { // no prepared statement was found in global
if (mysql_stmt_prepare(stmt[i], buff, bl)) { // the prepared statement is created
fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt[i]));
exit(EXIT_FAILURE);
}
uint32_t stmid=GloMyStmt->add_prepared_statement(0,(char *)USER,(char *)SCHEMA,buff,bl,stmt[i]);
if (NUMPRO < 32)
fprintf(stdout, "SERVER_statement_id=%lu , PROXY_statement_id=%u\n", stmt[i]->stmt_id, stmid);
local_stmts->insert(stmid,stmt[i]);
}
}
fprintf(stdout, "Prepared statements: %u client, %u proxy/server. ", NUMPREP, GloMyStmt->total_prepared_statements());
fprintf(stdout, "Created in: ");
}
{
unsigned int founds=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
//uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
//MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
//if (a) founds++;
}
fprintf(stdout, "Computed %u random strings in: ", i);
}
{
unsigned int founds=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
//MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
//if (a) founds++;
}
fprintf(stdout, "Computed %u hashes in: ", i);
}
{
unsigned int founds=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
if (a) founds++;
}
fprintf(stdout, "Found %u prepared statements searching by hash in: ", founds);
}
{
unsigned int founds=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
if (a) {
// we have a prepared statement, we can run it
MYSQL_STMT *stm=local_stmts->find(a->statement_id);
if (stm) {
run_stmt(stm,(uint32_t)mt_rand());
founds++;
}
}
}
fprintf(stdout, "Executed %u prepared statements in: ", founds);
}
{
// for comparison, we run also queries in TEXT protocol
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + %u",i,(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
int rc=mysql_real_query(mysql,buff,bl);
if (rc) {
fprintf(stderr, " mysql_real_query(), failed: %s\n" , mysql_error(mysql));
exit(EXIT_FAILURE);
}
MYSQL_RES *res=mysql_store_result(mysql);
if (res==NULL) {
fprintf(stderr, " mysql_store_result(), failed: %s\n" , mysql_error(mysql));
exit(EXIT_FAILURE);
}
mysql_free_result(res);
}
fprintf(stdout, "Executed %u queries in: ", i);
}
return 0;
}
int main() {
mysql_library_init(0,NULL,NULL);
GloMyStmt=new MySQL_STMT_Manager();
GloThrData = (thread_data_t **)malloc(sizeof(thread_data_t *)*NTHREADS);
int i;
for (i=0; i<NTHREADS; i++) {
GloThrData[i]=(thread_data_t *)malloc(sizeof(thread_data_t));
GloThrData[i]->thread = new std::thread(&mysql_thread,i);
}
for (i=0; i<NTHREADS; i++) {
GloThrData[i]->thread->join();
}
return 0;
}

@ -0,0 +1,190 @@
#include "proxysql.h"
#include "cpp.h"
#include <ctime>
#define QUERY1 "SELECT ?"
#define NUMPREP 100000
#define NUMPRO 1000
//#define NUMPREP 160
//#define NUMPRO 4
#define LOOPS 10
#define USER "root"
#define SCHEMA "test"
#define NTHREADS 4
typedef struct _thread_data_t {
MYSQL *mysql;
MYSQL_STMT **stmt;
} thread_data_t;
uint32_t statement_id;
uint16_t num_params;
uint16_t num_columns;
uint16_t warning_count;
MySQL_STMT_Manager *GloMyStmt;
struct cpu_timer
{
~cpu_timer()
{
auto end = std::clock() ;
std::cout << double( end - begin ) / CLOCKS_PER_SEC << " secs.\n" ;
};
const std::clock_t begin = std::clock() ;
};
int run_stmt(MYSQL_STMT *stmt, int int_data) {
MYSQL_BIND bind[1];
MYSQL_RES *prepare_meta_result;
bind[0].buffer_type= MYSQL_TYPE_LONG;
bind[0].buffer= (char *)&int_data;
bind[0].is_null= 0;
bind[0].length= 0;
if (mysql_stmt_bind_param(stmt, bind)) {
fprintf(stderr, " mysql_stmt_bind_param() failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
prepare_meta_result = mysql_stmt_result_metadata(stmt); // FIXME: no error check
if (mysql_stmt_execute(stmt)) {
fprintf(stderr, " mysql_stmt_execute(), 1 failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
// memset(bind, 0, sizeof(bind));
if (mysql_stmt_store_result(stmt)) {
fprintf(stderr, " mysql_stmt_store_result() failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
mysql_free_result(prepare_meta_result);
return 0;
}
void * mysql_thread() {
std::mt19937 mt_rand(time(0));
MySQL_STMTs_local *local_stmts=new MySQL_STMTs_local();
mysql = mysql_init(NULL);
char buff[128];
unsigned int bl=0;
if (!mysql_real_connect(mysql,"127.0.0.1",USER,"",SCHEMA,3306,NULL,0)) {
fprintf(stderr, "Failed to connect to database: Error: %s\n", mysql_error(mysql));
exit(EXIT_FAILURE);
}
int i;
stmt=(MYSQL_STMT **)malloc(sizeof(MYSQL_STMT*)*NUMPREP);
{
cpu_timer t;
for (i=0; i<NUMPREP; i++) {
stmt[i] = mysql_stmt_init(mysql);
if (!stmt[i]) {
fprintf(stderr, " mysql_stmt_init(), out of memory\n");
exit(EXIT_FAILURE);
}
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
if (a==NULL) {
if (mysql_stmt_prepare(stmt[i], buff, bl)) {
fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt[i]));
exit(EXIT_FAILURE);
}
uint32_t stmid=GloMyStmt->add_prepared_statement(0,(char *)USER,(char *)SCHEMA,buff,bl,stmt[i]);
if (NUMPRO < 32)
fprintf(stdout, "SERVER_statement_id=%lu , PROXY_statement_id=%u\n", stmt[i]->stmt_id, stmid);
local_stmts->insert(stmid,stmt[i]);
}
}
fprintf(stdout, "Prepared statements: %u client, %u proxy/server. ", NUMPREP, GloMyStmt->total_prepared_statements());
fprintf(stdout, "Created in: ");
}
{
unsigned int founds=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
//uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
//MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
//if (a) founds++;
}
fprintf(stdout, "Computed %u random strings in: ", i);
}
{
unsigned int founds=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
//MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
//if (a) founds++;
}
fprintf(stdout, "Computed %u hashes in: ", i);
}
{
unsigned int founds=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
if (a) founds++;
}
fprintf(stdout, "Found %u prepared statements searching by hash in: ", founds);
}
{
unsigned int founds=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
if (a) {
// we have a prepared statement, we can run it
founds++;
MYSQL_STMT *stm=local_stmts->find(a->statement_id);
run_stmt(stm,(uint32_t)mt_rand());
}
}
fprintf(stdout, "Executed %u prepared statements in: ", founds);
}
{
// for comparison, we run also queries in TEXT protocol
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + %u",i,(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
int rc=mysql_real_query(mysql,buff,bl);
if (rc) {
fprintf(stderr, " mysql_real_query(), failed: %s\n" , mysql_error(mysql));
exit(EXIT_FAILURE);
}
MYSQL_RES *res=mysql_store_result(mysql);
if (res==NULL) {
fprintf(stderr, " mysql_store_result(), failed: %s\n" , mysql_error(mysql));
exit(EXIT_FAILURE);
}
mysql_free_result(res);
}
fprintf(stdout, "Executed %u queries in: ", i);
}
return 0;
}
int main() {
GloMyStmt=new MySQL_STMT_Manager();
}

@ -0,0 +1,271 @@
#include "proxysql.h"
#include "cpp.h"
#include <ctime>
#include <thread>
#define QUERY1 "SELECT ?"
#define NUMPREP 100000
#define NUMPRO 20000
//#define NUMPREP 160
//#define NUMPRO 4
#define LOOPS 1
#define USER "root"
#define SCHEMA "test"
#define NTHREADS 4
MySQL_STMT_Manager *GloMyStmt;
typedef struct _thread_data_t {
std::thread *thread;
MYSQL *mysql;
MYSQL_STMT **stmt;
} thread_data_t;
thread_data_t **GloThrData;
struct cpu_timer
{
~cpu_timer()
{
auto end = std::clock() ;
std::cout << double( end - begin ) / CLOCKS_PER_SEC << " secs.\n" ;
};
const std::clock_t begin = std::clock() ;
};
int run_stmt(MYSQL_STMT *stmt, int int_data) {
MYSQL_BIND bind[1];
MYSQL_RES *prepare_meta_result;
bind[0].buffer_type= MYSQL_TYPE_LONG;
bind[0].buffer= (char *)&int_data;
bind[0].is_null= 0;
bind[0].length= 0;
if (mysql_stmt_bind_param(stmt, bind)) {
fprintf(stderr, " mysql_stmt_bind_param() failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
prepare_meta_result = mysql_stmt_result_metadata(stmt); // FIXME: no error check
if (mysql_stmt_execute(stmt)) {
fprintf(stderr, " mysql_stmt_execute(), 1 failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
// memset(bind, 0, sizeof(bind));
if (mysql_stmt_store_result(stmt)) {
fprintf(stderr, " mysql_stmt_store_result() failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
mysql_free_result(prepare_meta_result);
return 0;
}
void * mysql_thread(int tid) {
std::mt19937 mt_rand(time(0));
thread_data_t *THD;
THD=GloThrData[tid];
// in this version, each mysql thread has just ONE connection
// for now we use blocking API
MYSQL *mysql;
//MYSQL_STMT **stmt;
// we intialize the local mapping : MySQL_STMTs_local()
MySQL_STMTs_local *local_stmts=new MySQL_STMTs_local();
// we initialize a MYSQL structure
THD->mysql = mysql_init(NULL);
mysql=THD->mysql;
char buff[128];
unsigned int bl=0;
// we establish a connection to the database
if (!mysql_real_connect(mysql,"127.0.0.1",USER,"",SCHEMA,3306,NULL,0)) {
fprintf(stderr, "Failed to connect to database: Error: %s\n", mysql_error(mysql));
exit(EXIT_FAILURE);
}
int i;
// array of (MYSQL_STMT *) ; we don't use it in this version
//stmt=(MYSQL_STMT **)malloc(sizeof(MYSQL_STMT*)*NUMPREP);
MYSQL_STMT *stmt;
{
cpu_timer t;
// in this loop we create only some the prepared statements
for (i=0; i<NUMPREP/100; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
if (a==NULL) { // no prepared statement was found in global
stmt = mysql_stmt_init(mysql);
if (!stmt) {
fprintf(stderr, " mysql_stmt_init(), out of memory\n");
exit(EXIT_FAILURE);
}
if (mysql_stmt_prepare(stmt, buff, bl)) { // the prepared statement is created
fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
MySQL_STMT_Global_info *stmt_info=NULL;
stmt_info=GloMyStmt->add_prepared_statement(0,(char *)USER,(char *)SCHEMA,buff,bl,stmt);
uint32_t stmid=stmt_info->statement_id;
if (NUMPRO < 32)
fprintf(stdout, "SERVER_statement_id=%lu , PROXY_statement_id=%u\n", stmt->stmt_id, stmid);
local_stmts->insert(stmid,stmt);
}
}
fprintf(stdout, "Prepared statements: %u client, %u proxy/server. ", i, GloMyStmt->total_prepared_statements());
fprintf(stdout, "Created in: ");
}
{
unsigned int founds=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
//uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
//MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
//if (a) founds++;
}
fprintf(stdout, "Computed %u random strings in: ", i);
}
{
unsigned int founds=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
//MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
//if (a) founds++;
}
fprintf(stdout, "Computed %u hashes in: ", i);
}
{
unsigned int founds=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
if (a) founds++;
}
fprintf(stdout, "Found %u prepared statements searching by hash in: ", founds);
}
{
unsigned int founds=0;
unsigned int created=0;
unsigned int executed=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
if (a) {
// we have a prepared statement, we can run it
MYSQL_STMT *stm=local_stmts->find(a->statement_id);
if (stm) { // the statement exists in local
run_stmt(stm,(uint32_t)mt_rand());
founds++;
executed++;
local_stmts->erase(a->statement_id);
} else { // the statement doesn't exist locally
stmt = mysql_stmt_init(mysql);
if (!stmt) {
fprintf(stderr, " mysql_stmt_init(), out of memory\n");
exit(EXIT_FAILURE);
}
if (mysql_stmt_prepare(stmt, buff, bl)) { // the prepared statement is created
fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
local_stmts->insert(a->statement_id,stmt);
run_stmt(stmt,(uint32_t)mt_rand());
created++;
executed++;
local_stmts->erase(a->statement_id);
}
} else { // no prepared statement was found in global
stmt = mysql_stmt_init(mysql);
if (!stmt) {
fprintf(stderr, " mysql_stmt_init(), out of memory\n");
exit(EXIT_FAILURE);
}
if (mysql_stmt_prepare(stmt, buff, bl)) { // the prepared statement is created
fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
MySQL_STMT_Global_info *stmt_info=NULL;
stmt_info=GloMyStmt->add_prepared_statement(0,(char *)USER,(char *)SCHEMA,buff,bl,stmt);
uint32_t stmid=stmt_info->statement_id;
if (NUMPRO < 32)
fprintf(stdout, "SERVER_statement_id=%lu , PROXY_statement_id=%u\n", stmt->stmt_id, stmid);
local_stmts->insert(stmid,stmt);
run_stmt(stmt,(uint32_t)mt_rand());
created++;
executed++;
local_stmts->erase(stmid);
}
}
fprintf(stdout, "Found %u , created %u and executed %u prepared statements in: ", founds, created, executed);
}
/*
{
// for comparison, we run also queries in TEXT protocol
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + %u",i,(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
int rc=mysql_real_query(mysql,buff,bl);
if (rc) {
fprintf(stderr, " mysql_real_query(), failed: %s\n" , mysql_error(mysql));
exit(EXIT_FAILURE);
}
MYSQL_RES *res=mysql_store_result(mysql);
if (res==NULL) {
fprintf(stderr, " mysql_store_result(), failed: %s\n" , mysql_error(mysql));
exit(EXIT_FAILURE);
}
mysql_free_result(res);
}
fprintf(stdout, "Executed %u queries in: ", i);
}
return 0;
*/
}
int main() {
// initialize mysql
mysql_library_init(0,NULL,NULL);
// create a new MySQL_STMT_Manager()
GloMyStmt=new MySQL_STMT_Manager();
GloThrData = (thread_data_t **)malloc(sizeof(thread_data_t *)*NTHREADS);
// starts N threads
int i;
for (i=0; i<NTHREADS; i++) {
GloThrData[i]=(thread_data_t *)malloc(sizeof(thread_data_t));
GloThrData[i]->thread = new std::thread(&mysql_thread,i);
}
// wait for the threads to complete
for (i=0; i<NTHREADS; i++) {
GloThrData[i]->thread->join();
}
return 0;
}

@ -0,0 +1,357 @@
#define PROXYSQL_EXTERN
#include "proxysql.h"
#include "cpp.h"
#include <ctime>
#include <thread>
#define QUERY1 "SELECT ?"
#define NUMPREP 100000
#define NUMPRO 20000
//#define NUMPREP 160
//#define NUMPRO 4
#define LOOPS 1
#define USER "root"
#define SCHEMA "test"
#define NTHREADS 4
MySQL_STMT_Manager *GloMyStmt;
Query_Cache *GloQC;
MySQL_Authentication *GloMyAuth;
Query_Processor *GloQPro;
ProxySQL_Admin *GloAdmin;
MySQL_Threads_Handler *GloMTH;
MySQL_Monitor *GloMyMon;
std::thread *MyMon_thread;
MySQL_Logger *GloMyLogger;
typedef struct _thread_data_t {
std::thread *thread;
MYSQL *mysql;
MYSQL_STMT **stmt;
} thread_data_t;
thread_data_t **GloThrData;
struct cpu_timer
{
~cpu_timer()
{
auto end = std::clock() ;
std::cout << double( end - begin ) / CLOCKS_PER_SEC << " secs.\n" ;
};
const std::clock_t begin = std::clock() ;
};
int run_stmt(MYSQL_STMT *stmt, int int_data) {
MYSQL_BIND bind[1];
MYSQL_RES *prepare_meta_result;
bind[0].buffer_type= MYSQL_TYPE_LONG;
bind[0].buffer= (char *)&int_data;
bind[0].is_null= 0;
bind[0].length= 0;
if (mysql_stmt_bind_param(stmt, bind)) {
fprintf(stderr, " mysql_stmt_bind_param() failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
prepare_meta_result = mysql_stmt_result_metadata(stmt); // FIXME: no error check
if (mysql_stmt_execute(stmt)) {
fprintf(stderr, " mysql_stmt_execute(), 1 failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
// memset(bind, 0, sizeof(bind));
if (mysql_stmt_store_result(stmt)) {
fprintf(stderr, " mysql_stmt_store_result() failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
mysql_free_result(prepare_meta_result);
return 0;
}
void * mysql_thread(int tid) {
std::mt19937 mt_rand(time(0));
thread_data_t *THD;
THD=GloThrData[tid];
MySQL_Thread *worker = new MySQL_Thread();
worker->init();
char buff[128];
unsigned int bl=0;
MySrvC *mysrvc=new MySrvC((char *)"127.0.0.1", 3306, 100, MYSQL_SERVER_STATUS_ONLINE, 100, 0, 0, 0, 10000);
{
int i;
MySQL_Session **SESS=(MySQL_Session **)malloc(16*sizeof(MySQL_Session *));
for (i=0; i<16; i++) {
SESS[i]=new MySQL_Session();
MySQL_Session *sess=SESS[i];
sess->mirror==true;
sess->client_myds=NULL;
sess->client_myds = new MySQL_Data_Stream();
sess->client_myds->DSS=STATE_SLEEP;
sess->client_myds->sess=sess;
sess->client_myds->myds_type=MYDS_FRONTEND;
sess->client_myds->PSarrayIN= new PtrSizeArray();
sess->client_myds->PSarrayOUT= new PtrSizeArray();
worker->register_session(sess);
sess->current_hostgroup=0;
sess->default_hostgroup=0;
sess->mybe=sess->find_or_create_backend(sess->current_hostgroup);
MySQL_Connection *myconn=new MySQL_Connection();
sess->mybe->server_myds->attach_connection(myconn);
myconn->userinfo->set((char *)"root",(char *)"",(char *)"information_schema");
myconn->local_stmts = new MySQL_STMTs_local();
//myconn->mysql=mysql_init(NULL);
myconn->parent=mysrvc;
/*
if (!mysql_real_connect(myconn->mysql,"127.0.0.1",USER,"",SCHEMA,3306,NULL,0)) {
fprintf(stderr, "Failed to connect to database: Error: %s\n", mysql_error(myconn->mysql));
exit(EXIT_FAILURE);
}*/
myconn->handler(0);
if (myconn->mysql==NULL) {
myconn->handler(0);
}
}
for (i=0; i<16; i++) {
MySQL_Session *sess=SESS[i];
sprintf(buff+5,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff+5);
mysql_hdr hdr;
hdr.pkt_id=0;
hdr.pkt_length=bl+1;
memcpy(buff,&hdr,sizeof(mysql_hdr));
buff[4]=0x16;
PtrSize_t pkt;
pkt.size=hdr.pkt_length+sizeof(mysql_hdr);
pkt.ptr=malloc(pkt.size);
memcpy(pkt.ptr,buff,pkt.size);
sess->client_myds->PSarrayIN->add(pkt.ptr,pkt.size);
sess->status=WAITING_CLIENT_DATA;;
sess->handler();
}
}
/*
// in this version, each mysql thread has just ONE connection
// for now we use blocking API
MYSQL *mysql;
//MYSQL_STMT **stmt;
// we intialize the local mapping : MySQL_STMTs_local()
MySQL_STMTs_local *local_stmts=new MySQL_STMTs_local();
// we initialize a MYSQL structure
THD->mysql = mysql_init(NULL);
mysql=THD->mysql;
char buff[128];
unsigned int bl=0;
// we establish a connection to the database
if (!mysql_real_connect(mysql,"127.0.0.1",USER,"",SCHEMA,3306,NULL,0)) {
fprintf(stderr, "Failed to connect to database: Error: %s\n", mysql_error(mysql));
exit(EXIT_FAILURE);
}
int i;
// array of (MYSQL_STMT *) ; we don't use it in this version
//stmt=(MYSQL_STMT **)malloc(sizeof(MYSQL_STMT*)*NUMPREP);
MYSQL_STMT *stmt;
{
cpu_timer t;
// in this loop we create only some the prepared statements
for (i=0; i<NUMPREP/100; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
if (a==NULL) { // no prepared statement was found in global
stmt = mysql_stmt_init(mysql);
if (!stmt) {
fprintf(stderr, " mysql_stmt_init(), out of memory\n");
exit(EXIT_FAILURE);
}
if (mysql_stmt_prepare(stmt, buff, bl)) { // the prepared statement is created
fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
uint32_t stmid=GloMyStmt->add_prepared_statement(0,(char *)USER,(char *)SCHEMA,buff,bl,stmt);
if (NUMPRO < 32)
fprintf(stdout, "SERVER_statement_id=%lu , PROXY_statement_id=%u\n", stmt->stmt_id, stmid);
local_stmts->insert(stmid,stmt);
}
}
fprintf(stdout, "Prepared statements: %u client, %u proxy/server. ", i, GloMyStmt->total_prepared_statements());
fprintf(stdout, "Created in: ");
}
{
unsigned int founds=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
//uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
//MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
//if (a) founds++;
}
fprintf(stdout, "Computed %u random strings in: ", i);
}
{
unsigned int founds=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
//MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
//if (a) founds++;
}
fprintf(stdout, "Computed %u hashes in: ", i);
}
{
unsigned int founds=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
if (a) founds++;
}
fprintf(stdout, "Found %u prepared statements searching by hash in: ", founds);
}
{
unsigned int founds=0;
unsigned int created=0;
unsigned int executed=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
if (a) {
// we have a prepared statement, we can run it
MYSQL_STMT *stm=local_stmts->find(a->statement_id);
if (stm) { // the statement exists in local
run_stmt(stm,(uint32_t)mt_rand());
founds++;
executed++;
local_stmts->erase(a->statement_id);
} else { // the statement doesn't exist locally
stmt = mysql_stmt_init(mysql);
if (!stmt) {
fprintf(stderr, " mysql_stmt_init(), out of memory\n");
exit(EXIT_FAILURE);
}
if (mysql_stmt_prepare(stmt, buff, bl)) { // the prepared statement is created
fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
local_stmts->insert(a->statement_id,stmt);
run_stmt(stmt,(uint32_t)mt_rand());
created++;
executed++;
local_stmts->erase(a->statement_id);
}
} else { // no prepared statement was found in global
stmt = mysql_stmt_init(mysql);
if (!stmt) {
fprintf(stderr, " mysql_stmt_init(), out of memory\n");
exit(EXIT_FAILURE);
}
if (mysql_stmt_prepare(stmt, buff, bl)) { // the prepared statement is created
fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
uint32_t stmid=GloMyStmt->add_prepared_statement(0,(char *)USER,(char *)SCHEMA,buff,bl,stmt);
if (NUMPRO < 32)
fprintf(stdout, "SERVER_statement_id=%lu , PROXY_statement_id=%u\n", stmt->stmt_id, stmid);
local_stmts->insert(stmid,stmt);
run_stmt(stmt,(uint32_t)mt_rand());
created++;
executed++;
local_stmts->erase(stmid);
}
}
fprintf(stdout, "Found %u , created %u and executed %u prepared statements in: ", founds, created, executed);
}
*/
/*
{
// for comparison, we run also queries in TEXT protocol
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + %u",i,(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
int rc=mysql_real_query(mysql,buff,bl);
if (rc) {
fprintf(stderr, " mysql_real_query(), failed: %s\n" , mysql_error(mysql));
exit(EXIT_FAILURE);
}
MYSQL_RES *res=mysql_store_result(mysql);
if (res==NULL) {
fprintf(stderr, " mysql_store_result(), failed: %s\n" , mysql_error(mysql));
exit(EXIT_FAILURE);
}
mysql_free_result(res);
}
fprintf(stdout, "Executed %u queries in: ", i);
}
return 0;
*/
}
int main() {
// initialize mysql
mysql_library_init(0,NULL,NULL);
sqlite3_config(SQLITE_CONFIG_URI, 1);
MyHGM=new MySQL_HostGroups_Manager();
GloMTH=new MySQL_Threads_Handler();
GloMyLogger = new MySQL_Logger();
GloVars.datadir=(char *)".";
GloVars.admindb=(char *)"proxysql.db";
GloAdmin = new ProxySQL_Admin();
GloAdmin->init();
// create a new MySQL_STMT_Manager()
GloMyStmt=new MySQL_STMT_Manager();
GloThrData = (thread_data_t **)malloc(sizeof(thread_data_t *)*NTHREADS);
// starts N threads
int i;
for (i=0; i<NTHREADS; i++) {
GloThrData[i]=(thread_data_t *)malloc(sizeof(thread_data_t));
GloThrData[i]->thread = new std::thread(&mysql_thread,i);
}
while (glovars.shutdown==0) {
sleep(1); // FIXME: TERRIBLE UGLY
}
// wait for the threads to complete
for (i=0; i<NTHREADS; i++) {
GloThrData[i]->thread->join();
}
return 0;
}

@ -0,0 +1,369 @@
/*
this is a modified version of client5.cpp
that supports async calls
*/
#include "proxysql.h"
#include "cpp.h"
#include <ctime>
#include <thread>
#define QUERY1 "SELECT ?"
#define NUMPREP 100000
#define NUMPRO 20000
//#define NUMPREP 160
//#define NUMPRO 4
#define LOOPS 1
#define USER "root"
#define SCHEMA "test"
#define NTHREADS 4
static int wait_for_mysql(MYSQL *mysql, int status) {
struct pollfd pfd;
int timeout, res;
pfd.fd = mysql_get_socket(mysql);
pfd.events =
(status & MYSQL_WAIT_READ ? POLLIN : 0) |
(status & MYSQL_WAIT_WRITE ? POLLOUT : 0) |
(status & MYSQL_WAIT_EXCEPT ? POLLPRI : 0);
// if (status & MYSQL_WAIT_TIMEOUT)
// timeout = 1000*mysql_get_timeout_value(mysql);
// else
timeout = -1;
res = poll(&pfd, 1, timeout);
if (res == 0)
return MYSQL_WAIT_TIMEOUT;
else if (res < 0)
return MYSQL_WAIT_TIMEOUT;
else {
int status = 0;
if (pfd.revents & POLLIN) status |= MYSQL_WAIT_READ;
if (pfd.revents & POLLOUT) status |= MYSQL_WAIT_WRITE;
if (pfd.revents & POLLPRI) status |= MYSQL_WAIT_EXCEPT;
return status;
}
}
MySQL_STMT_Manager *GloMyStmt;
typedef struct _thread_data_t {
std::thread *thread;
MYSQL *mysql;
MYSQL_STMT **stmt;
} thread_data_t;
thread_data_t **GloThrData;
struct cpu_timer
{
~cpu_timer()
{
auto end = std::clock() ;
std::cout << double( end - begin ) / CLOCKS_PER_SEC << " secs.\n" ;
};
const std::clock_t begin = std::clock() ;
};
int run_stmt(MYSQL *mysql, MYSQL_STMT *stmt, int int_data) {
int async_exit_status=0;
int ret_int;
MYSQL_BIND bind[1];
MYSQL_RES *prepare_meta_result;
bind[0].buffer_type= MYSQL_TYPE_LONG;
bind[0].buffer= (char *)&int_data;
bind[0].is_null= 0;
bind[0].length= 0;
if (mysql_stmt_bind_param(stmt, bind)) {
fprintf(stderr, " mysql_stmt_bind_param() failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
prepare_meta_result = mysql_stmt_result_metadata(stmt); // FIXME: no error check
/* blocking
if (mysql_stmt_execute(stmt)) {
fprintf(stderr, " mysql_stmt_execute(), 1 failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
if (mysql_stmt_store_result(stmt)) {
fprintf(stderr, " mysql_stmt_store_result() failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
*/
async_exit_status=mysql_stmt_execute_start(&ret_int, stmt);
while (async_exit_status) {
async_exit_status=wait_for_mysql(mysql, async_exit_status);
async_exit_status=mysql_stmt_execute_cont(&ret_int, stmt, async_exit_status);
}
if (ret_int) {
fprintf(stderr, " mysql_stmt_execute(), 1 failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
async_exit_status=mysql_stmt_store_result_start(&ret_int, stmt);
while (async_exit_status) {
async_exit_status=wait_for_mysql(mysql, async_exit_status);
async_exit_status=mysql_stmt_store_result_cont(&ret_int, stmt, async_exit_status);
}
if (ret_int) {
fprintf(stderr, " mysql_stmt_store_result() failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
mysql_free_result(prepare_meta_result);
return 0;
}
void * mysql_thread(int tid) {
std::mt19937 mt_rand(time(0));
thread_data_t *THD;
THD=GloThrData[tid];
// in this version, each mysql thread has just ONE connection
// for now we use blocking API
MYSQL *mysql;
//MYSQL_STMT **stmt;
// we intialize the local mapping : MySQL_STMTs_local()
MySQL_STMTs_local *local_stmts=new MySQL_STMTs_local();
// we initialize a MYSQL structure
THD->mysql = mysql_init(NULL);
mysql=THD->mysql;
char buff[128];
unsigned int bl=0;
// we establish a connection to the database
if (!mysql_real_connect(mysql,"127.0.0.1",USER,"",SCHEMA,3306,NULL,0)) {
fprintf(stderr, "Failed to connect to database: Error: %s\n", mysql_error(mysql));
exit(EXIT_FAILURE);
}
int i;
//set not blocking
mysql_options(mysql, MYSQL_OPT_NONBLOCK, 0);
int async_exit_status=0;
int ret_int;
// array of (MYSQL_STMT *) ; we don't use it in this version
//stmt=(MYSQL_STMT **)malloc(sizeof(MYSQL_STMT*)*NUMPREP);
MYSQL_STMT *stmt;
{
cpu_timer t;
// in this loop we create only some the prepared statements
for (i=0; i<NUMPREP/100; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
if (a==NULL) { // no prepared statement was found in global
stmt = mysql_stmt_init(mysql);
if (!stmt) {
fprintf(stderr, " mysql_stmt_init(), out of memory\n");
exit(EXIT_FAILURE);
}
/* blocking call
if (mysql_stmt_prepare(stmt, buff, bl)) { // the prepared statement is created
fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
*/
async_exit_status=mysql_stmt_prepare_start(&ret_int, stmt, buff, bl);
while (async_exit_status) {
async_exit_status=wait_for_mysql(mysql, async_exit_status);
async_exit_status=mysql_stmt_prepare_cont(&ret_int, stmt, async_exit_status);
}
if (ret_int) {
fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
MySQL_STMT_Global_info *stmt_info=NULL;
stmt_info=GloMyStmt->add_prepared_statement(0,(char *)USER,(char *)SCHEMA,buff,bl,stmt);
uint32_t stmid=stmt_info->statement_id;
if (NUMPRO < 32)
fprintf(stdout, "SERVER_statement_id=%lu , PROXY_statement_id=%u\n", stmt->stmt_id, stmid);
local_stmts->insert(stmid,stmt);
}
}
fprintf(stdout, "Prepared statements: %u client, %u proxy/server. ", i, GloMyStmt->total_prepared_statements());
fprintf(stdout, "Created in: ");
}
{
unsigned int founds=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
//uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
//MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
//if (a) founds++;
}
fprintf(stdout, "Computed %u random strings in: ", i);
}
{
unsigned int founds=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
//MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
//if (a) founds++;
}
fprintf(stdout, "Computed %u hashes in: ", i);
}
{
unsigned int founds=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
if (a) founds++;
}
fprintf(stdout, "Found %u prepared statements searching by hash in: ", founds);
}
{
unsigned int founds=0;
unsigned int created=0;
unsigned int executed=0;
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
uint64_t hash=local_stmts->compute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl);
MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash);
if (a) {
// we have a prepared statement, we can run it
MYSQL_STMT *stm=local_stmts->find(a->statement_id);
if (stm) { // the statement exists in local
run_stmt(mysql, stm,(uint32_t)mt_rand());
founds++;
executed++;
local_stmts->erase(a->statement_id);
} else { // the statement doesn't exist locally
stmt = mysql_stmt_init(mysql);
if (!stmt) {
fprintf(stderr, " mysql_stmt_init(), out of memory\n");
exit(EXIT_FAILURE);
}
/* blocking
if (mysql_stmt_prepare(stmt, buff, bl)) { // the prepared statement is created
fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
*/
async_exit_status=mysql_stmt_prepare_start(&ret_int, stmt, buff, bl);
while (async_exit_status) {
async_exit_status=wait_for_mysql(mysql, async_exit_status);
async_exit_status=mysql_stmt_prepare_cont(&ret_int, stmt, async_exit_status);
}
if (ret_int) {
fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
local_stmts->insert(a->statement_id,stmt);
run_stmt(mysql, stmt,(uint32_t)mt_rand());
created++;
executed++;
local_stmts->erase(a->statement_id);
}
} else { // no prepared statement was found in global
stmt = mysql_stmt_init(mysql);
if (!stmt) {
fprintf(stderr, " mysql_stmt_init(), out of memory\n");
exit(EXIT_FAILURE);
}
/* blocking
if (mysql_stmt_prepare(stmt, buff, bl)) { // the prepared statement is created
fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
*/
async_exit_status=mysql_stmt_prepare_start(&ret_int, stmt, buff, bl);
while (async_exit_status) {
async_exit_status=wait_for_mysql(mysql, async_exit_status);
async_exit_status=mysql_stmt_prepare_cont(&ret_int, stmt, async_exit_status);
}
if (ret_int) {
fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
MySQL_STMT_Global_info *stmt_info=NULL;
stmt_info=GloMyStmt->add_prepared_statement(0,(char *)USER,(char *)SCHEMA,buff,bl,stmt);
uint32_t stmid=stmt_info->statement_id;
if (NUMPRO < 32)
fprintf(stdout, "SERVER_statement_id=%lu , PROXY_statement_id=%u\n", stmt->stmt_id, stmid);
local_stmts->insert(stmid,stmt);
run_stmt(mysql, stmt,(uint32_t)mt_rand());
created++;
executed++;
local_stmts->erase(stmid);
}
}
fprintf(stdout, "Found %u , created %u and executed %u prepared statements in: ", founds, created, executed);
}
/*
{
// for comparison, we run also queries in TEXT protocol
cpu_timer t;
for (i=0; i<NUMPREP*LOOPS; i++) {
sprintf(buff,"SELECT %u + %u",i,(uint32_t)mt_rand()%NUMPRO);
bl=strlen(buff);
int rc=mysql_real_query(mysql,buff,bl);
if (rc) {
fprintf(stderr, " mysql_real_query(), failed: %s\n" , mysql_error(mysql));
exit(EXIT_FAILURE);
}
MYSQL_RES *res=mysql_store_result(mysql);
if (res==NULL) {
fprintf(stderr, " mysql_store_result(), failed: %s\n" , mysql_error(mysql));
exit(EXIT_FAILURE);
}
mysql_free_result(res);
}
fprintf(stdout, "Executed %u queries in: ", i);
}
return 0;
*/
}
int main() {
// initialize mysql
mysql_library_init(0,NULL,NULL);
// create a new MySQL_STMT_Manager()
GloMyStmt=new MySQL_STMT_Manager();
GloThrData = (thread_data_t **)malloc(sizeof(thread_data_t *)*NTHREADS);
// starts N threads
int i;
for (i=0; i<NTHREADS; i++) {
GloThrData[i]=(thread_data_t *)malloc(sizeof(thread_data_t));
GloThrData[i]->thread = new std::thread(&mysql_thread,i);
}
// wait for the threads to complete
for (i=0; i<NTHREADS; i++) {
GloThrData[i]->thread->join();
}
return 0;
}

Binary file not shown.

@ -0,0 +1,79 @@
#include "proxysql.h"
#include "cpp.h"
// dummy copy of client1.cpp
#define QUERY1 "SELECT * from sbtest limit 10"
MYSQL *mysql;
MYSQL_STMT *stmt;
uint32_t statement_id;
uint16_t num_params;
uint16_t num_columns;
uint16_t warning_count;
int run_stmt(MYSQL_STMT *stmt, int int_data) {
MYSQL_BIND bind[3];
MYSQL_RES *prepare_meta_result;
bind[0].buffer_type= MYSQL_TYPE_LONG;
bind[0].buffer= (char *)&int_data;
bind[0].is_null= 0;
bind[0].length= 0;
bind[1].buffer_type= MYSQL_TYPE_LONG;
bind[1].buffer= (char *)&int_data;
bind[1].is_null= 0;
bind[1].length= 0;
bind[2].buffer_type= MYSQL_TYPE_LONG;
bind[2].buffer= (char *)&int_data;
bind[2].is_null= 0;
bind[2].length= 0;
/*
if (mysql_stmt_bind_param(stmt, bind)) {
fprintf(stderr, " mysql_stmt_bind_param() failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
*/
prepare_meta_result = mysql_stmt_result_metadata(stmt); // FIXME: no error check
if (mysql_stmt_execute(stmt)) {
fprintf(stderr, " mysql_stmt_execute(), 1 failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
// memset(bind, 0, sizeof(bind));
if (mysql_stmt_store_result(stmt)) {
fprintf(stderr, " mysql_stmt_store_result() failed\n");
fprintf(stderr, " %s\n", mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
mysql_free_result(prepare_meta_result);
return 0;
}
int main() {
std::mt19937 mt_rand(time(0));
mysql = mysql_init(NULL);
if (!mysql_real_connect(mysql,"127.0.0.1","msandbox","msandbox","sbtest",6033,NULL,0)) {
//if (!mysql_real_connect(mysql,"127.0.0.1","root","","test",3306,NULL,0)) {
fprintf(stderr, "Failed to connect to database: Error: %s\n", mysql_error(mysql));
exit(EXIT_FAILURE);
}
stmt = mysql_stmt_init(mysql);
if (!stmt) {
fprintf(stderr, " mysql_stmt_init(), out of memory\n");
exit(EXIT_FAILURE);
}
if (mysql_stmt_prepare(stmt, QUERY1, strlen(QUERY1))) {
fprintf(stderr, " mysql_stmt_prepare(), failed: %d, %s\n" , mysql_errno(mysql), mysql_stmt_error(stmt));
exit(EXIT_FAILURE);
}
// param_count= mysql_stmt_param_count(stmt);
// fprintf(stdout, " total parameters in Query1 : %d\n", param_count);
statement_id=stmt->stmt_id;
num_params=stmt->param_count;
num_columns=stmt->field_count;
warning_count=stmt->upsert_status.warning_count;
fprintf(stdout, "statement_id=%d , columns=%d , params=%d , warnings=%d\n", statement_id, num_columns, num_params, warning_count);
run_stmt(stmt,(uint32_t)mt_rand());
return 0;
}
Loading…
Cancel
Save