diff --git a/include/MySQL_PreparedStatement.h b/include/MySQL_PreparedStatement.h new file mode 100644 index 000000000..600fecb57 --- /dev/null +++ b/include/MySQL_PreparedStatement.h @@ -0,0 +1,116 @@ +#ifndef CLASS_MYSQL_PREPARED_STATEMENT_H +#define CLASS_MYSQL_PREPARED_STATEMENT_H + +#include "proxysql.h" +#include "cpp.h" + +/* +One of the main challenge in handling prepared statement (PS) is that a single +PS could be executed on multiple backends, and on each backend it could have a +different stmt_id. +For this reason ProxySQL returns to the client a stmt_id generated by the proxy +itself, and internally maps client's stmt_id with the backend stmt_id. + +The implementation in ProxySQL is, simplified, the follow: +* when a client sends a MYSQL_COM_STMT_PREPARE, ProxySQL executes it to one of + the backend +* the backend returns a stmt_id. This stmt_id is NOT returned to the client. The + stmt_id returned from the backend is stored in MySQL_STMTs_local(), and + MySQL_STMTs_local() is responsible for mapping the connection's MYSQL_STMT + and a global_stmt_id +* the global_stmt_id is the stmt_id returned to the client +* the global_stmt_id is used to locate the relevant MySQL_STMT_Global_info() in + MySQL_STMT_Manager() +* MySQL_STMT_Global_info() stores all metadata associated with a PS +* MySQL_STMT_Manager() is responsible for storing all MySQL_STMT_Global_info() + in global structures accessible and shareble by all threads. + +To summarie the most important classes: +* MySQL_STMT_Global_info() stores all metadata associated with a PS +* MySQL_STMT_Manager() stores all the MySQL_STMT_Global_info(), indexes using + a global_stmt_id that iis the stmt_id generated by ProxySQL and returned to + the client +* MySQL_STMTs_local() associate PS located in a backend connection to a + global_stmt_id +*/ + + +// class MySQL_STMTs_local associates a global statement ID with a local statement ID for a specific connection +class MySQL_STMTs_local { + private: + unsigned int num_entries; + std::map m; + public: + MySQL_STMTs_local() { + num_entries=0; + } + ~MySQL_STMTs_local(); + // we declare it here to be inline + void insert(uint32_t global_statement_id, MYSQL_STMT *stmt) { + std::pair::iterator,bool> ret; + ret=m.insert(std::make_pair(global_statement_id, stmt)); + if (ret.second==true) { + num_entries++; + } + } + // we declare it here to be inline + MYSQL_STMT * find(uint32_t global_statement_id) { + auto s=m.find(global_statement_id); + if (s!=m.end()) { // found + return s->second; + } + return NULL; // not found + } + bool erase(uint32_t global_statement_id); + uint64_t compute_hash(unsigned int hostgroup, char *user, char *schema, char *query, unsigned int query_length); +}; + + + +// class MySQL_STMT_Global_info represents information about a MySQL Prepared Statement +// it is an internal representation of prepared statement +// it include all metadata associated with it +class MySQL_STMT_Global_info { + private: + void compute_hash(); + public: + uint64_t hash; + char *username; + char *schemaname; + char *query; + unsigned int query_length; + unsigned int hostgroup_id; + int ref_count; + uint32_t statement_id; + uint16_t num_columns; + uint16_t num_params; + uint16_t warning_count; + MYSQL_FIELD **fields; + struct { + int cache_ttl; + int timeout; + int delay; + } properties; + //MYSQL_BIND **params; // seems unused + MySQL_STMT_Global_info(uint32_t id, unsigned int h, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, uint64_t _h); + ~MySQL_STMT_Global_info(); +}; + +class MySQL_STMT_Manager { + private: + uint32_t next_statement_id; + rwlock_t rwlock; + std::map m; // map using statement id + std::map h; // map using hashes + public: + MySQL_STMT_Manager(); + ~MySQL_STMT_Manager(); + int ref_count(uint32_t statement_id, int cnt, bool lock=true); + MySQL_STMT_Global_info * add_prepared_statement(unsigned int h, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, bool lock=true); + MySQL_STMT_Global_info * add_prepared_statement(unsigned int h, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, int _cache_ttl, int _timeout, int _delay, bool lock=true); + MySQL_STMT_Global_info * find_prepared_statement_by_stmt_id(uint32_t id, bool lock=true); + MySQL_STMT_Global_info * find_prepared_statement_by_hash(uint64_t hash, bool lock=true); + uint32_t total_prepared_statements() { return next_statement_id-1; } +}; + +#endif /* CLASS_MYSQL_PREPARED_STATEMENT_H */ diff --git a/include/MySQL_Protocol.h b/include/MySQL_Protocol.h index 000298ba5..117344d32 100644 --- a/include/MySQL_Protocol.h +++ b/include/MySQL_Protocol.h @@ -6,6 +6,31 @@ #define RESULTSET_BUFLEN 16300 +class stmt_execute_metadata_t { + public: + uint32_t stmt_id; + uint8_t flags; + uint16_t num_params; + MYSQL_BIND *binds; + my_bool *is_nulls; + unsigned long *lengths; + void *pkt; + stmt_execute_metadata_t() { + binds=NULL; + is_nulls=NULL; + lengths=NULL; + pkt=NULL; + } + ~stmt_execute_metadata_t() { + if (binds) + free(binds); + if (is_nulls) + free(is_nulls); + if (lengths) + free(lengths); + } +}; + class MySQL_ResultSet { private: public: @@ -20,9 +45,10 @@ class MySQL_ResultSet { unsigned int num_rows; unsigned long long resultset_size; PtrSizeArray *PSarrayOUT; - MySQL_ResultSet(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL *_my); + MySQL_ResultSet(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL *_my, MYSQL_STMT *_stmt=NULL); ~MySQL_ResultSet(); unsigned int add_row(MYSQL_ROW row); + unsigned int add_row2(MYSQL_ROWS *row, unsigned char *offset); void add_eof(); bool get_resultset(PtrSizeArray *PSarrayFinal); unsigned char *buffer; @@ -100,5 +126,10 @@ class MySQL_Protocol { bool process_pkt_COM_QUERY(unsigned char *pkt, unsigned int len); bool process_pkt_COM_CHANGE_USER(unsigned char *pkt, unsigned int len); void * Query_String_to_packet(uint8_t sid, std::string *s, unsigned int *l); + + // prepared statements + bool generate_STMT_PREPARE_RESPONSE(uint8_t sequence_id, MySQL_STMT_Global_info *stmt_info); + + stmt_execute_metadata_t * get_binds_from_pkt(void *ptr, unsigned int size, uint16_t num_params); }; #endif /* __CLASS_MYSQL_PROTOCOL_H */ diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index 5a0df3db9..a2f69a11c 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -23,6 +23,10 @@ class Query_Info { unsigned long long start_time; unsigned long long end_time; + MYSQL_STMT *mysql_stmt; + stmt_execute_metadata_t *stmt_meta; + uint32_t stmt_global_id; + int QueryLength; enum MYSQL_COM_QUERY_command MyComQueryCmd; @@ -60,7 +64,7 @@ class MySQL_Session void handler___status_WAITING_SERVER_DATA___STATE_READING_COM_STMT_PREPARE_RESPONSE(PtrSize_t *); void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_SET_OPTION(PtrSize_t *); void handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STATISTICS(PtrSize_t *); - bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(PtrSize_t *); + bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(PtrSize_t *, bool ps=false); void handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED__get_connection(); @@ -71,6 +75,8 @@ class MySQL_Session // void return_MySQL_Connection_To_Poll(MySQL_Data_Stream *); +// MySQL_STMT_Manager *Session_STMT_Manager; + public: void * operator new(size_t); void operator delete(void *); @@ -149,6 +155,7 @@ class MySQL_Session void SQLite3_to_MySQL(SQLite3_result *, char *, int , MySQL_Protocol *); void MySQL_Result_to_MySQL_wire(MYSQL *mysql, MySQL_ResultSet *MyRS); + void MySQL_Stmt_Result_to_MySQL_wire(MYSQL_STMT *stmt); unsigned int NumActiveTransactions(); int FindOneActiveTransaction(); unsigned long long IdleTime(); diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index d196e0eb4..c0b0c8cbe 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -161,6 +161,9 @@ class MySQL_Thread // in this way, there is no need for atomic operation and there is no cache miss // when it is needed a total, all threads are checked struct { + unsigned long long stmt_prepare; + unsigned long long stmt_execute; + unsigned long long stmt_close; unsigned long long queries; unsigned long long queries_slow; unsigned long long queries_backends_bytes_sent; @@ -289,6 +292,7 @@ class MySQL_Threads_Handler bool have_compress; bool client_found_rows; bool multiplexing; +// bool stmt_multiplexing; bool enforce_autocommit_on_reads; int max_allowed_packet; int max_transaction_time; @@ -351,6 +355,9 @@ class MySQL_Threads_Handler SQLite3_result * SQL3_Processlist(); SQLite3_result * SQL3_GlobalStatus(); bool kill_session(uint32_t _thread_session_id); + unsigned long long get_total_stmt_prepare(); + unsigned long long get_total_stmt_execute(); + unsigned long long get_total_stmt_close(); unsigned long long get_total_queries(); unsigned long long get_slow_queries(); unsigned long long get_queries_backends_bytes_recv(); diff --git a/include/cpp.h b/include/cpp.h index 041b526cd..0b865a8dc 100644 --- a/include/cpp.h +++ b/include/cpp.h @@ -18,6 +18,7 @@ #include "proxysql_admin.h" #include "MySQL_HostGroups_Manager.h" #include "MySQL_Logger.hpp" +#include "MySQL_PreparedStatement.h" #undef swap #undef min #undef max diff --git a/include/mysql_connection.h b/include/mysql_connection.h index 4034c8073..9298de327 100644 --- a/include/mysql_connection.h +++ b/include/mysql_connection.h @@ -49,12 +49,15 @@ class MySQL_Connection { struct { unsigned long length; char *ptr; + MYSQL_STMT *stmt; + stmt_execute_metadata_t *stmt_meta; } query; char scramble_buff[40]; unsigned long long creation_time; unsigned long long last_time_used; unsigned long long timeout; int fd; + MySQL_STMTs_local *local_stmts; // local view of prepared statements MYSQL *mysql; MYSQL *ret_mysql; MYSQL_RES *mysql_result; @@ -125,9 +128,18 @@ class MySQL_Connection { int async_select_db(short event); int async_set_autocommit(short event, bool); int async_set_names(short event, uint8_t nr); - int async_query(short event, char *stmt, unsigned long length); int async_send_simple_command(short event, char *stmt, unsigned long length); // no result set expected + int async_query(short event, char *stmt, unsigned long length, MYSQL_STMT **_stmt=NULL, stmt_execute_metadata_t *_stmt_meta=NULL); int async_ping(short event); + + void stmt_prepare_start(); + void stmt_prepare_cont(short event); + void stmt_execute_start(); + void stmt_execute_cont(short event); + void stmt_execute_store_result_start(); + void stmt_execute_store_result_cont(short event); + + void async_free_result(); bool IsActiveTransaction(); bool IsAutoCommit(); diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index d9197893e..8030168ee 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -61,6 +61,16 @@ enum MDB_ASYNC_ST { // MariaDB Async State Machine ASYNC_INITDB_END, ASYNC_INITDB_SUCCESSFUL, ASYNC_INITDB_FAILED, + ASYNC_STMT_PREPARE_START, + ASYNC_STMT_PREPARE_CONT, + ASYNC_STMT_PREPARE_END, + ASYNC_STMT_PREPARE_SUCCESSFUL, + ASYNC_STMT_PREPARE_FAILED, + ASYNC_STMT_EXECUTE_START, + ASYNC_STMT_EXECUTE_CONT, + ASYNC_STMT_EXECUTE_STORE_RESULT_START, + ASYNC_STMT_EXECUTE_STORE_RESULT_CONT, + ASYNC_STMT_EXECUTE_END, ASYNC_IDLE }; @@ -121,6 +131,8 @@ enum session_status { CHANGING_USER_SERVER, SETTING_INIT_CONNECT, FAST_FORWARD, + PROCESSING_STMT_PREPARE, + PROCESSING_STMT_EXECUTE, NONE }; @@ -252,7 +264,7 @@ typedef struct _proxysql_mysql_thread_t proxysql_mysql_thread_t; typedef struct { char * table_name; char * table_def; } table_def_t; typedef struct __SQP_query_parser_t SQP_par_t; //typedef struct _mysql_server_t mysql_server_t; - +//typedef struct _stmt_execute_metadata_t stmt_execute_metadata_t; #endif /* PROXYSQL_TYPEDEFS */ //#ifdef __cplusplus @@ -282,6 +294,7 @@ class ProxySQL_ConfigFile; class Query_Info; //class MySQL_Server; class SQLite3_result; +class stmt_execute_metadata_t; //class MySQL_Servers; //class MySQL_Hostgroup_Entry; //class MySQL_Hostgroup; @@ -546,7 +559,17 @@ struct _mysql_session_t { }; - +/* +struct _stmt_execute_metadata_t { + uint32_t stmt_id; + uint8_t flags; + uint16_t num_params; + MYSQL_BIND *binds; + my_bool *is_nulls; + unsigned long *lengths; + void *pkt; +}; +*/ #endif /* PROXYSQL_STRUCTS */ @@ -730,6 +753,7 @@ __thread int mysql_thread___poll_timeout_on_failure; __thread bool mysql_thread___have_compress; __thread bool mysql_thread___client_found_rows; __thread bool mysql_thread___multiplexing; +// __thread bool mysql_thread___stmt_multiplexing; __thread bool mysql_thread___enforce_autocommit_on_reads; __thread bool mysql_thread___servers_stats; __thread bool mysql_thread___commands_stats; @@ -805,6 +829,7 @@ extern __thread int mysql_thread___poll_timeout_on_failure; extern __thread bool mysql_thread___have_compress; extern __thread bool mysql_thread___client_found_rows; extern __thread bool mysql_thread___multiplexing; +// extern __thread bool mysql_thread___stmt_multiplexing; extern __thread bool mysql_thread___enforce_autocommit_on_reads; extern __thread bool mysql_thread___servers_stats; extern __thread bool mysql_thread___commands_stats; diff --git a/lib/Makefile b/lib/Makefile index 2b04a94cb..e5625fd6a 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -47,7 +47,7 @@ default: libproxysql.a _OBJ = c_tokenizer.o OBJ = $(patsubst %,$(ODIR)/%,$(_OBJ)) -_OBJ_CPP = ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo SpookyV2.oo MySQL_Authentication.oo gen_utils.oo simple_kv.oo sqlite3db.oo global_variables.oo mysql_connection.oo MySQL_HostGroups_Manager.oo mysql_data_stream.oo MySQL_Thread.oo MySQL_Session.oo MySQL_Protocol.oo mysql_backend.oo Query_Processor.oo ProxySQL_Admin.oo MySQL_Monitor.oo MySQL_Logger.oo thread.oo +_OBJ_CPP = ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo SpookyV2.oo MySQL_Authentication.oo gen_utils.oo simple_kv.oo sqlite3db.oo global_variables.oo mysql_connection.oo MySQL_HostGroups_Manager.oo mysql_data_stream.oo MySQL_Thread.oo MySQL_Session.oo MySQL_Protocol.oo mysql_backend.oo Query_Processor.oo ProxySQL_Admin.oo MySQL_Monitor.oo MySQL_Logger.oo thread.oo MySQL_PreparedStatement.oo OBJ_CPP = $(patsubst %,$(ODIR)/%,$(_OBJ_CPP)) %.ko: %.cpp diff --git a/lib/MySQL_PreparedStatement.cpp b/lib/MySQL_PreparedStatement.cpp new file mode 100644 index 000000000..b6ca95937 --- /dev/null +++ b/lib/MySQL_PreparedStatement.cpp @@ -0,0 +1,281 @@ +#include "proxysql.h" +#include "cpp.h" + +#include "SpookyV2.h" + +extern MySQL_STMT_Manager *GloMyStmt; + + + + +static uint64_t stmt_compute_hash(unsigned int hostgroup, char *user, char *schema, char *query, unsigned int query_length) { + int l=0; + l+=sizeof(hostgroup); + l+=strlen(user); + l+=strlen(schema); +// two random seperators +#define _COMPUTE_HASH_DEL1_ "-ujhtgf76y576574fhYTRDFwdt-" +#define _COMPUTE_HASH_DEL2_ "-8k7jrhtrgJHRgrefgreRFewg6-" + l+=strlen(_COMPUTE_HASH_DEL1_); + l+=strlen(_COMPUTE_HASH_DEL2_); + l+=query_length; + char *buf=(char *)malloc(l); + l=0; + // write hostgroup + memcpy(buf,&hostgroup,sizeof(hostgroup)); + l+=sizeof(hostgroup); + + // write user + strcpy(buf+l,user); + l+=strlen(user); + + // write delimiter1 + strcpy(buf+l,_COMPUTE_HASH_DEL1_); + l+=strlen(_COMPUTE_HASH_DEL1_); + + // write schema + strcpy(buf+l,schema); + l+=strlen(schema); + + // write delimiter2 + strcpy(buf+l,_COMPUTE_HASH_DEL2_); + l+=strlen(_COMPUTE_HASH_DEL2_); + + // write query + memcpy(buf+l,query,query_length); + l+=query_length; + + uint64_t hash=SpookyHash::Hash64(buf,l,0); + free(buf); + return hash; +} + + +void MySQL_STMT_Global_info::compute_hash() { + hash=stmt_compute_hash(hostgroup_id, username, schemaname, query, query_length); +} + +uint64_t MySQL_STMTs_local::compute_hash(unsigned int hostgroup, char *user, char *schema, char *query, unsigned int query_length){ + uint64_t hash; + hash=stmt_compute_hash(hostgroup, user, schema, query, query_length); + return hash; +} + +MySQL_STMTs_local::~MySQL_STMTs_local() { + // Note: we do not free the prepared statements because we assume that + // if we call this destructor the connection is being destroyed anyway + for (std::map::iterator it=m.begin(); it!=m.end(); ++it) { + uint32_t stmt_id=it->first; + GloMyStmt->ref_count(stmt_id,-1); + } + m.erase(m.begin(),m.end()); +} + +bool MySQL_STMTs_local::erase(uint32_t global_statement_id) { + auto s=m.find(global_statement_id); + if (s!=m.end()) { // found + if (num_entries>1000) { + MYSQL_STMT *stmt=s->second; + mysql_stmt_close(stmt); + m.erase(s); + num_entries--; + return true; // we truly removed the prepared statement + } + } + return false; // we don't really remove the prepared statement +} + +MySQL_STMT_Manager::MySQL_STMT_Manager() { + spinlock_rwlock_init(&rwlock); + next_statement_id=1; // we initialize this as 1 because we 0 is not allowed +} + +MySQL_STMT_Manager::~MySQL_STMT_Manager() { + for (std::map::iterator it=m.begin(); it!=m.end(); ++it) { + MySQL_STMT_Global_info *a=it->second; + delete a; + } + m.erase(m.begin(),m.end()); + // we do not loop in h because all the MySQL_STMT_Global_info() were already deleted + h.erase(h.begin(),h.end()); +} + +int MySQL_STMT_Manager::ref_count(uint32_t statement_id, int cnt, bool lock) { + int ret=-1; + if (lock) { + spin_wrlock(&rwlock); + } + auto s = m.find(statement_id); + if (s!=m.end()) { + MySQL_STMT_Global_info *a=s->second; + a->ref_count+=cnt; + ret=a->ref_count; + } + if (lock) { + spin_wrunlock(&rwlock); + } + return ret; +} + +MySQL_STMT_Global_info * MySQL_STMT_Manager::add_prepared_statement(unsigned int _h, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, bool lock) { + return add_prepared_statement(_h, u, s, q, ql, stmt, -1, -1, -1, lock); +} + +MySQL_STMT_Global_info * MySQL_STMT_Manager::add_prepared_statement(unsigned int _h, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, int _cache_ttl, int _timeout, int _delay, bool lock) { + MySQL_STMT_Global_info *ret=NULL; + uint64_t hash=stmt_compute_hash(_h, u, s, q, ql); // this identifies the prepared statement + if (lock) { + spin_wrlock(&rwlock); + } + // try to find the statement + auto f = h.find(hash); + if (f!=h.end()) { + // found it! + //MySQL_STMT_Global_info *a=f->second; + //ret=a->statement_id; + ret=f->second; + } else { + // we need to create a new one + MySQL_STMT_Global_info *a=new MySQL_STMT_Global_info(next_statement_id,_h,u,s,q,ql,stmt,hash); + a->properties.cache_ttl=_cache_ttl; + a->properties.timeout=_timeout; + a->properties.delay=_delay; + // insert it in both maps + m.insert(std::make_pair(a->statement_id, a)); + h.insert(std::make_pair(a->hash, a)); + //ret=a->statement_id; + ret=a; + next_statement_id++; // increment it + } + + if (lock) { + spin_wrunlock(&rwlock); + } + return ret; +} + + +MySQL_STMT_Global_info * MySQL_STMT_Manager::find_prepared_statement_by_stmt_id(uint32_t id, bool lock) { + MySQL_STMT_Global_info *ret=NULL; // assume we do not find it + if (lock) { + spin_wrlock(&rwlock); + } + + auto s=m.find(id); + if (s!=m.end()) { + ret=s->second; + } + + if (lock) { + spin_wrunlock(&rwlock); + } + return ret; +} + +MySQL_STMT_Global_info * MySQL_STMT_Manager::find_prepared_statement_by_hash(uint64_t hash, bool lock) { + MySQL_STMT_Global_info *ret=NULL; // assume we do not find it + if (lock) { + spin_wrlock(&rwlock); + } + + auto s=h.find(hash); + if (s!=h.end()) { + ret=s->second; + } + + if (lock) { + spin_wrunlock(&rwlock); + } + return ret; +} + +MySQL_STMT_Global_info::MySQL_STMT_Global_info(uint32_t id, unsigned int h, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, uint64_t _h) { + statement_id=id; + hostgroup_id=h; + ref_count=0; + username=strdup(u); + schemaname=strdup(s); + query=(char *)malloc(ql); + memcpy(query,q,ql); + query_length=ql; + num_params=stmt->param_count; + num_columns=stmt->field_count; + warning_count=stmt->upsert_status.warning_count; + if (_h) { + hash=_h; + } else { + compute_hash(); + } + + // set default properties: + properties.cache_ttl=-1; + properties.timeout=-1; + properties.delay=-1; + + fields=NULL; + if (num_columns) { + fields=(MYSQL_FIELD **)malloc(num_columns*sizeof(MYSQL_FIELD *)); + uint16_t i; + for (i=0;ifields[i]); + MYSQL_FIELD *fd=fields[i]; + // first copy all fields + memcpy(fd,fs,sizeof(MYSQL_FIELD)); + // then duplicate strings + fd->name = ( fs->name ? strdup(fs->name) : NULL ); + fd->org_name = ( fs->org_name ? strdup(fs->org_name) : NULL ); + fd->table = ( fs->table ? strdup(fs->table) : NULL ); + fd->org_table = ( fs->org_table ? strdup(fs->org_table) : NULL ); + fd->db = ( fs->db ? strdup(fs->db) : NULL ); + fd->catalog = ( fs->catalog ? strdup(fs->catalog) : NULL ); + fd->def = ( fs->def ? strdup(fs->def) : NULL ); + } + } +/* + params=NULL; + if(num_params) { + params=(MYSQL_BIND **)malloc(num_columns*sizeof(MYSQL_BIND *)); + uint16_t i; + for (i=0;iparams[i]); + MYSQL_BIND *pd=params[i]; + // copy all params + memcpy(pd,ps,sizeof(MYSQL_BIND)); + } + } +*/ +} + +MySQL_STMT_Global_info::~MySQL_STMT_Global_info() { + free(username); + free(schemaname); + free(query); + if (num_columns) { + uint16_t i; + for (i=0;iname) { free(f->name); f->name=NULL; } + if (f->org_name) { free(f->org_name); f->org_name=NULL; } + if (f->table) { free(f->table); f->table=NULL; } + if (f->org_table) { free(f->org_table); f->org_table=NULL; } + if (f->db) { free(f->db); f->db=NULL; } + if (f->catalog) { free(f->catalog); f->catalog=NULL; } + if (f->def) { free(f->def); f->def=NULL; } + free(fields[i]); + } + free(fields); + fields=NULL; + } +/* + if (num_params) { + uint16_t i; + for (i=0;iDSS=STATE_EOF2; break; default: - assert(0); + //assert(0); + break; } } if (len) { *len=size; } @@ -769,6 +770,51 @@ bool MySQL_Protocol::generate_pkt_field(bool send, void **ptr, unsigned int *len } +// FIXME FIXME function not completed yet! +// see https://dev.mysql.com/doc/internals/en/com-stmt-prepare-response.html +bool MySQL_Protocol::generate_STMT_PREPARE_RESPONSE(uint8_t sequence_id, MySQL_STMT_Global_info *stmt_info) { + uint8_t sid=sequence_id; + uint16_t i; + char *okpack=(char *)malloc(16); // first packet + mysql_hdr hdr; + hdr.pkt_id=sid; + hdr.pkt_length=12; + memcpy(okpack,&hdr,sizeof(mysql_hdr)); // copy header + okpack[4]=0; + okpack[13]=0; + memcpy(okpack+5,&stmt_info->statement_id,sizeof(uint32_t)); + memcpy(okpack+9,&stmt_info->num_columns,sizeof(uint16_t)); + memcpy(okpack+11,&stmt_info->num_params,sizeof(uint16_t)); + memcpy(okpack+14,&stmt_info->warning_count,sizeof(uint16_t)); + (*myds)->PSarrayOUT->add((void *)okpack,16); + sid++; + if (stmt_info->num_params) { + for (i=0; inum_params; i++) { + generate_pkt_field(true,NULL,NULL,sid, + (char *)"", (char *)"", (char *)"", (char *)"", (char *)"", + 0,0,0,0,0,false,0,NULL); + sid++; + } + generate_pkt_EOF(true,NULL,NULL,sid,0,SERVER_STATUS_AUTOCOMMIT); // FIXME : for now we pass a very broken flag + sid++; + } + if (stmt_info->num_columns) { + for (i=0; inum_columns; i++) { + MYSQL_FIELD *fd=stmt_info->fields[i]; + //bool MySQL_Protocol::generate_pkt_field(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, char *schema, char *table, char *org_table, char *name, char *org_name, uint16_t charset, uint32_t column_length, uint8_t type, uint16_t flags, uint8_t decimals, bool field_list, uint64_t defvalue_length, char *defvalue) { + generate_pkt_field(true,NULL,NULL,sid, + fd->db, + fd->table, fd->org_table, + fd->name, fd->org_name, + fd->charsetnr, 0, fd->type, fd->flags, fd->decimals, false,0,NULL); + sid++; + } + generate_pkt_EOF(true,NULL,NULL,sid,0,SERVER_STATUS_AUTOCOMMIT); // FIXME : for now we pass a very broken flag + sid++; + } + return true; +} + bool MySQL_Protocol::generate_pkt_row(bool send, void **ptr, unsigned int *len, uint8_t sequence_id, int colnums, unsigned long *fieldslen, char **fieldstxt) { int col=0; int rowlen=0; @@ -1425,7 +1471,146 @@ void * MySQL_Protocol::Query_String_to_packet(uint8_t sid, std::string *s, unsig return pkt; } -MySQL_ResultSet::MySQL_ResultSet(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL *_my) { +// See https://dev.mysql.com/doc/internals/en/com-stmt-execute.html for reference +stmt_execute_metadata_t * MySQL_Protocol::get_binds_from_pkt(void *ptr, unsigned int size, uint16_t num_params) { + stmt_execute_metadata_t *ret=NULL; //return NULL in case of failure + if (size<14) { + // some error! + return ret; + } + //ret=(stmt_execute_metadata_t *)malloc(sizeof(stmt_execute_metadata_t)); + ret= new stmt_execute_metadata_t(); + char *p=(char *)ptr+5; + memcpy(&ret->stmt_id,p,4); p+=4; // stmt-id + memcpy(&ret->flags,p,1); p+=1; // flags + p+=4; // iteration-count + ret->num_params=num_params; +// ret->binds=NULL; +// ret->is_nulls=NULL; +// ret->lengths=NULL; + ret->pkt=ptr; + if (num_params) { + uint16_t i; + size_t null_bitmap_length=(num_params+7)/8; + if (size < (14+1+null_bitmap_length)) { + // some data missing? + delete ret; + return NULL; + } + uint8_t new_params_bound_flag; + memcpy(&new_params_bound_flag,p+null_bitmap_length,1); + if (new_params_bound_flag!=1) { + // something wrong + delete ret; + return NULL; + } + uint8_t *null_bitmap=(uint8_t *)malloc(null_bitmap_length); + memcpy(null_bitmap,p,null_bitmap_length); + p+=null_bitmap_length; + p+=1; // new_params_bound_flag + MYSQL_BIND *binds=(MYSQL_BIND *)malloc(sizeof(MYSQL_BIND)*num_params); + my_bool *is_nulls=(my_bool *)malloc(sizeof(my_bool)*num_params); + unsigned long *lengths=(unsigned long *)malloc(sizeof(unsigned long)*num_params); + ret->binds=binds; + ret->is_nulls=is_nulls; + ret->lengths=lengths; + for (i=0;i> idx; + is_nulls[i]=is_null; + binds[i].is_null=&is_nulls[i]; + // set buffer_type and is_unsigned + //enum enum_field_types buffer_type=MYSQL_TYPE_DECIMAL; // set a random default + uint16_t buffer_type=0; + memcpy(&buffer_type,p,2); + binds[i].is_unsigned=0; + if (buffer_type >= 32768) { // is_unsigned bit + buffer_type-=32768; + binds[i].is_unsigned=1; + } + binds[i].buffer_type=(enum enum_field_types)buffer_type; + p+=2; + + // set length + lengths[i]=0; +// unsigned long l=0; +// uint8_t ll=mysql_decode_length((unsigned char *)p,&l); +// lengths[i]=l; +// p+=ll; + binds[i].length=&lengths[i]; + + } + for (i=0;iadd(pkt.ptr,pkt.size); resultset_size+=pkt.size; + if (_stmt) { // binary protocol , we also assume we have ALL the resultset + //MYSQL_RES * prepare_meta_result = mysql_stmt_result_metadata(_stmt); +// int column_count=mysql_num_fields(prepare_meta_result); +// MYSQL_BIND *binds=(MYSQL_BIND *)malloc(sizeof(MYSQL_BIND)*column_count); +// mysql_stmt_bind_result(_stmt, binds); + fprintf(stdout, "Fetching results ...\n"); +// int row_count=0; +// while (!mysql_stmt_fetch(_stmt)) { +// row_count++; +// fprintf(stdout, " row %d\n", row_count); +// } +// free (binds); + unsigned long long total_size=0; + MYSQL_ROWS *r=_stmt->result.data; + total_size+=r->length; + if (r->length > 0xFFFFFF) { + total_size+=(r->length / 0xFFFFFF) * sizeof(mysql_hdr); + } + total_size+=sizeof(mysql_hdr); + while(r->next) { + r=r->next; + total_size+=r->length; + if (r->length > 0xFFFFFF) { + total_size+=(r->length / 0xFFFFFF) * sizeof(mysql_hdr); + } + total_size+=sizeof(mysql_hdr); + } + PtrSize_t pkt; + pkt.size=total_size; + pkt.ptr=malloc(pkt.size); + total_size=0; + r=_stmt->result.data; + add_row2(r,(unsigned char *)pkt.ptr); + total_size+=r->length; + if (r->length > 0xFFFFFF) { + total_size+=(r->length / 0xFFFFFF) * sizeof(mysql_hdr); + } + total_size+=sizeof(mysql_hdr); + while(r->next) { + r=r->next; + add_row2(r,(unsigned char *)pkt.ptr+total_size); + total_size+=r->length; + if (r->length > 0xFFFFFF) { + total_size+=(r->length / 0xFFFFFF) * sizeof(mysql_hdr); + } + total_size+=sizeof(mysql_hdr); + } + PSarrayOUT->add(pkt.ptr,pkt.size); + resultset_size+=pkt.size; + add_eof(); + } } @@ -1507,6 +1743,50 @@ unsigned int MySQL_ResultSet::add_row(MYSQL_ROW row) { return pkt_length; } +// add_row2 is perhaps a faster implementation of add_row() +// still experimentatl +// so far, used only for prepared statements +// it assumes that the MYSQL_ROW is an format ready to be sent to the client +unsigned int MySQL_ResultSet::add_row2(MYSQL_ROWS *row, unsigned char *offset) { + unsigned long length=row->length; + + uint8_t pkt_sid=sid; + if (length < (0xFFFFFF+sizeof(mysql_hdr))) { + mysql_hdr myhdr; + myhdr.pkt_length=length; + myhdr.pkt_id=pkt_sid; + memcpy(offset, &myhdr, sizeof(mysql_hdr)); + memcpy(offset+sizeof(mysql_hdr), row->data, row->length); + pkt_sid++; + } else { + unsigned int left=length; + unsigned int copied=0; + while (left>=0xFFFFFF) { + mysql_hdr myhdr; + myhdr.pkt_length=0xFFFFFF; + myhdr.pkt_id=pkt_sid; + pkt_sid++; + memcpy(offset, &myhdr, sizeof(mysql_hdr)); + offset+=sizeof(mysql_hdr); + memcpy(offset, row->data+copied, myhdr.pkt_length); + offset+=0xFFFFFF; + // we are writing a large packet (over 16MB), we assume we are always outside the buffer + copied+=0xFFFFFF; + left-=0xFFFFFF; + } + mysql_hdr myhdr; + myhdr.pkt_length=left; + myhdr.pkt_id=pkt_sid; + pkt_sid++; + memcpy(offset, &myhdr, sizeof(mysql_hdr)); + offset+=sizeof(mysql_hdr); + memcpy(offset, row->data+copied, myhdr.pkt_length); + // we are writing a large packet (over 16MB), we assume we are always outside the buffer + } + sid=pkt_sid; + return length; +} + void MySQL_ResultSet::add_eof() { PtrSize_t pkt; if (myprot) { diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 5914be64b..fd8643d8d 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -15,6 +15,7 @@ extern const CHARSET_INFO * proxysql_find_charset_name(const char * const name); extern MySQL_Authentication *GloMyAuth; extern ProxySQL_Admin *GloAdmin; extern MySQL_Logger *GloMyLogger; +extern MySQL_STMT_Manager *GloMyStmt; class KillArgs { public: @@ -92,6 +93,8 @@ void Query_Info::begin(unsigned char *_p, int len, bool mysql_header) { MyComQueryCmd=MYSQL_COM_QUERY___NONE; QueryPointer=NULL; QueryLength=0; + mysql_stmt=NULL; + stmt_meta=NULL; //QueryParserArgs=NULL; QueryParserArgs.digest_text=NULL; QueryParserArgs.first_comment=NULL; @@ -110,6 +113,7 @@ void Query_Info::end() { if ((end_time-start_time) > (unsigned int)mysql_thread___long_query_time*1000) { __sync_add_and_fetch(&sess->thread->status_variables.queries_slow,1); } + assert(mysql_stmt==NULL); } void Query_Info::init(unsigned char *_p, int len, bool mysql_header) { @@ -189,6 +193,7 @@ MySQL_Session::MySQL_Session() { thread_session_id=0; pause_until=0; qpo=NULL; +// Session_STMT_Manager=NULL; start_time=0; command_counters=new StatCounters(15,10,false); healthy=1; @@ -250,6 +255,9 @@ MySQL_Session::~MySQL_Session() { if (admin==false && connections_handler==false && mirror==false) { __sync_fetch_and_sub(&MyHGM->status.client_connections,1); } +// if (Session_STMT_Manager) { +// delete Session_STMT_Manager; +// } } @@ -837,8 +845,6 @@ __get_pkts_from_client: case _MYSQL_COM_CHANGE_USER: handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_CHANGE_USER(&pkt, &wrong_pass); break; - case _MYSQL_COM_STMT_PREPARE: - case _MYSQL_COM_STMT_EXECUTE: case _MYSQL_COM_STMT_CLOSE: l_free(pkt.size,pkt.ptr); client_myds->setDSS_STATE_QUERY_SENT_NET(); @@ -846,6 +852,124 @@ __get_pkts_from_client: client_myds->DSS=STATE_SLEEP; status=WAITING_CLIENT_DATA; break; + case _MYSQL_COM_STMT_PREPARE: + if (admin==true) { // admin module will not support prepared statement!! + l_free(pkt.size,pkt.ptr); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Command not supported"); + client_myds->DSS=STATE_SLEEP; + status=WAITING_CLIENT_DATA; + break; + } else { + thread->status_variables.stmt_prepare++; + thread->status_variables.queries++; + // if we reach here, we are not on admin + bool rc_break=false; + + // Note: CurrentQuery sees the query as sent by the client. + // shortly after, the packets it used to contain the query will be deallocated + // Note2 : we call the next function as if it was _MYSQL_COM_QUERY + // because the offset will be identical + CurrentQuery.begin((unsigned char *)pkt.ptr,pkt.size,true); + + qpo=GloQPro->process_mysql_query(this,pkt.ptr,pkt.size,&CurrentQuery); + assert(qpo); // GloQPro->process_mysql_query() should always return a qpo + rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt); + if (rc_break==true) { + break; + } +// if (Session_STMT_Manager==NULL) { +// Session_STMT_Manager = new MySQL_STMT_Manager(); +// } + if (client_myds->myconn->local_stmts==NULL) { + client_myds->myconn->local_stmts=new MySQL_STMTs_local(); + } + uint64_t hash=client_myds->myconn->local_stmts->compute_hash(current_hostgroup,(char *)client_myds->myconn->userinfo->username,(char *)client_myds->myconn->userinfo->schemaname,(char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength); + MySQL_STMT_Global_info *stmt_info=NULL; +// if (mysql_thread___stmt_multiplexing) { + stmt_info=GloMyStmt->find_prepared_statement_by_hash(hash); +// } else { +// stmt_info=Session_STMT_Manager->find_prepared_statement_by_hash(hash); +// } + if (stmt_info) { + l_free(pkt.size,pkt.ptr); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + client_myds->myprot.generate_STMT_PREPARE_RESPONSE(client_myds->pkt_sid+1,stmt_info); + client_myds->DSS=STATE_SLEEP; + status=WAITING_CLIENT_DATA; + break; + } else { + mybe=find_or_create_backend(current_hostgroup); + status=PROCESSING_STMT_PREPARE; + mybe->server_myds->connect_retries_on_failure=mysql_thread___connect_retries_on_failure; + mybe->server_myds->wait_until=0; + pause_until=0; + mybe->server_myds->killed_at=0; + //mybe->server_myds->mysql_real_query.init(&pkt); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + } + } + break; + case _MYSQL_COM_STMT_EXECUTE: + if (admin==true) { // admin module will not support prepared statement!! + l_free(pkt.size,pkt.ptr); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Command not supported"); + client_myds->DSS=STATE_SLEEP; + status=WAITING_CLIENT_DATA; + break; + } else { + // if we reach here, we are not on admin + thread->status_variables.stmt_execute++; + thread->status_variables.queries++; + bool rc_break=false; + + uint32_t stmt_global_id=0; + memcpy(&stmt_global_id,(char *)pkt.ptr+5,sizeof(uint32_t)); + CurrentQuery.stmt_global_id=stmt_global_id; + // now we get the statement information + MySQL_STMT_Global_info *stmt_info=NULL; +// if (mysql_thread___stmt_multiplexing) { + stmt_info=GloMyStmt->find_prepared_statement_by_stmt_id(stmt_global_id); +// } else { +// stmt_info=Session_STMT_Manager->find_prepared_statement_by_stmt_id(stmt_global_id); +// } + if (stmt_info==NULL) { + // we couldn't find it + l_free(pkt.size,pkt.ptr); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Prepared statement doesn't exist"); + client_myds->DSS=STATE_SLEEP; + status=WAITING_CLIENT_DATA; + break; + } + stmt_execute_metadata_t *stmt_meta=client_myds->myprot.get_binds_from_pkt(pkt.ptr,pkt.size,stmt_info->num_params); + if (stmt_meta==NULL) { + l_free(pkt.size,pkt.ptr); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"#28000",(char *)"Error in prepared statement execution"); + client_myds->DSS=STATE_SLEEP; + status=WAITING_CLIENT_DATA; + break; + } + // else + +// CurrentQuery.begin((unsigned char *)stmt_info->query, stmt_info->query_length,false); + CurrentQuery.stmt_meta=stmt_meta; +// qpo=GloQPro->process_mysql_query(this,pkt.ptr,pkt.size,&CurrentQuery); // FIXME: not sure we need to call this +// assert(qpo); // GloQPro->process_mysql_query() should always return a qpo + // NOTE: we do not call YET the follow function for STMT_EXECUTE + //rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt); + current_hostgroup=stmt_info->hostgroup_id; + mybe=find_or_create_backend(current_hostgroup); + status=PROCESSING_STMT_EXECUTE; + mybe->server_myds->connect_retries_on_failure=0; + mybe->server_myds->wait_until=0; + mybe->server_myds->killed_at=0; + //mybe->server_myds->mysql_real_query.init(&pkt); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + } + break; // case _MYSQL_COM_STMT_PREPARE: // handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_PREPARE(&pkt); // break; @@ -977,6 +1101,8 @@ handler_again: } break; + case PROCESSING_STMT_PREPARE: + case PROCESSING_STMT_EXECUTE: case PROCESSING_QUERY: //fprintf(stderr,"PROCESSING_QUERY\n"); if (pause_until > thread->curtime) { @@ -1013,7 +1139,20 @@ handler_again: } if (mybe->server_myds->DSS==STATE_NOT_INITIALIZED) { // we don't have a backend yet - previous_status.push(PROCESSING_QUERY); + switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility + case PROCESSING_QUERY: + previous_status.push(PROCESSING_QUERY); + break; + case PROCESSING_STMT_PREPARE: + previous_status.push(PROCESSING_STMT_PREPARE); + break; + case PROCESSING_STMT_EXECUTE: + previous_status.push(PROCESSING_STMT_EXECUTE); + break; + default: + assert(0); + break; + } NEXT_IMMEDIATE(CONNECTING_SERVER); } else { MySQL_Data_Stream *myds=mybe->server_myds; @@ -1028,11 +1167,39 @@ handler_again: if (default_hostgroup>=0) { if (client_myds->myconn->userinfo->hash!=mybe->server_myds->myconn->userinfo->hash) { if (strcmp(client_myds->myconn->userinfo->username,myds->myconn->userinfo->username)) { - previous_status.push(PROCESSING_QUERY); + //previous_status.push(PROCESSING_QUERY); + switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility + case PROCESSING_QUERY: + previous_status.push(PROCESSING_QUERY); + break; + case PROCESSING_STMT_PREPARE: + previous_status.push(PROCESSING_STMT_PREPARE); + break; + case PROCESSING_STMT_EXECUTE: + previous_status.push(PROCESSING_STMT_EXECUTE); + break; + default: + assert(0); + break; + } NEXT_IMMEDIATE(CHANGING_USER_SERVER); } if (strcmp(client_myds->myconn->userinfo->schemaname,myds->myconn->userinfo->schemaname)) { - previous_status.push(PROCESSING_QUERY); + //previous_status.push(PROCESSING_QUERY); + switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility + case PROCESSING_QUERY: + previous_status.push(PROCESSING_QUERY); + break; + case PROCESSING_STMT_PREPARE: + previous_status.push(PROCESSING_STMT_PREPARE); + break; + case PROCESSING_STMT_EXECUTE: + previous_status.push(PROCESSING_STMT_EXECUTE); + break; + default: + assert(0); + break; + } NEXT_IMMEDIATE(CHANGING_SCHEMA); } } @@ -1048,7 +1215,21 @@ handler_again: } } if (client_myds->myconn->options.charset != mybe->server_myds->myconn->mysql->charset->nr) { - previous_status.push(PROCESSING_QUERY); + //previous_status.push(PROCESSING_QUERY); + switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility + case PROCESSING_QUERY: + previous_status.push(PROCESSING_QUERY); + break; + case PROCESSING_STMT_PREPARE: + previous_status.push(PROCESSING_STMT_PREPARE); + break; + case PROCESSING_STMT_EXECUTE: + previous_status.push(PROCESSING_STMT_EXECUTE); + break; + default: + assert(0); + break; + } NEXT_IMMEDIATE(CHANGING_CHARSET); } if (autocommit != mybe->server_myds->myconn->IsAutoCommit()) { @@ -1057,17 +1238,65 @@ handler_again: // enforce_autocommit_on_reads is disabled // we need to check if it is a SELECT not FOR UPDATE if (CurrentQuery.is_select_NOT_for_update()==false) { - previous_status.push(PROCESSING_QUERY); + //previous_status.push(PROCESSING_QUERY); + switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility + case PROCESSING_QUERY: + previous_status.push(PROCESSING_QUERY); + break; + case PROCESSING_STMT_PREPARE: + previous_status.push(PROCESSING_STMT_PREPARE); + break; + case PROCESSING_STMT_EXECUTE: + previous_status.push(PROCESSING_STMT_EXECUTE); + break; + default: + assert(0); + break; + } NEXT_IMMEDIATE(CHANGING_AUTOCOMMIT); } } else { // in every other cases, enforce autocommit - previous_status.push(PROCESSING_QUERY); + //previous_status.push(PROCESSING_QUERY); + switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility + case PROCESSING_QUERY: + previous_status.push(PROCESSING_QUERY); + break; + case PROCESSING_STMT_PREPARE: + previous_status.push(PROCESSING_STMT_PREPARE); + break; + case PROCESSING_STMT_EXECUTE: + previous_status.push(PROCESSING_STMT_EXECUTE); + break; + default: + assert(0); + break; + } NEXT_IMMEDIATE(CHANGING_AUTOCOMMIT); } } + if (status==PROCESSING_STMT_EXECUTE) { + CurrentQuery.mysql_stmt=myconn->local_stmts->find(CurrentQuery.stmt_global_id); + if (CurrentQuery.mysql_stmt==NULL) { + // the conection we too doesn't have the prepared statements prepared + // we try to create it now + MySQL_STMT_Global_info *stmt_info=NULL; + stmt_info=GloMyStmt->find_prepared_statement_by_stmt_id(CurrentQuery.stmt_global_id); + CurrentQuery.QueryLength=stmt_info->query_length; + CurrentQuery.QueryPointer=(unsigned char *)stmt_info->query; + previous_status.push(PROCESSING_STMT_EXECUTE); + NEXT_IMMEDIATE(PROCESSING_STMT_PREPARE); + } + } } } +// FIXME : this part of the code is commented just for reference. It seems it has been removed in v1.2, as copied above +// //status=PROCESSING_QUERY; +// mybe->server_myds->max_connect_time=0; +// // we insert it in mypolls only if not already there +// if (myds->mypolls==NULL) { +// thread->mypolls.add(POLLIN|POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime); +// } if (myconn->async_state_machine==ASYNC_IDLE) { mybe->server_myds->wait_until=0; @@ -1084,15 +1313,54 @@ handler_again: } } } + int rc; timespec begint; clock_gettime(CLOCK_THREAD_CPUTIME_ID,&begint); - int rc=myconn->async_query(myds->revents, myds->mysql_real_query.QueryPtr,myds->mysql_real_query.QuerySize); + switch (status) { + case PROCESSING_QUERY: + rc=myconn->async_query(myds->revents, myds->mysql_real_query.QueryPtr,myds->mysql_real_query.QuerySize); + break; + case PROCESSING_STMT_PREPARE: + //rc=myconn->async_query(myds->revents, myds->mysql_real_query.QueryPtr,myds->mysql_real_query.QuerySize,&CurrentQuery.mysql_stmt); + rc=myconn->async_query(myds->revents, (char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength,&CurrentQuery.mysql_stmt); + break; + case PROCESSING_STMT_EXECUTE: + // PROCESSING_STMT_EXECUTE FIXME + { + //MySQL_STMT_Global_info *stmt_info=NULL; +/* + if (mysql_thread___stmt_multiplexing) { + stmt_info=GloMyStmt->find_prepared_statement_by_stmt_id(stmt_global_id); + } else { + stmt_info=Session_STMT_Manager->find_prepared_statement_by_stmt_id(stmt_global_id); + } +*/ +/* + CurrentQuery.mysql_stmt=myconn->local_stmts->find(CurrentQuery.stmt_global_id); + if (CurrentQuery.mysql_stmt==NULL) { + // the conection we too doesn't have the prepared statements prepared + // we try to create it now + MySQL_STMT_Global_info *stmt_info=NULL; + stmt_info=GloMyStmt->find_prepared_statement_by_stmt_id(CurrentQuery.stmt_global_id); + CurrentQuery.QueryLength=stmt_info->query_length; + CurrentQuery.QueryPointer=(unsigned char *)stmt_info->query; + previous_status.push(PROCESSING_STMT_EXECUTE); + NEXT_IMMEDIATE(PROCESSING_STMT_PREPARE); + } +*/ + //rc=myconn->async_query(myds->revents, myds->mysql_real_query.QueryPtr,myds->mysql_real_query.QuerySize,&CurrentQuery.mysql_stmt, CurrentQuery.stmt_meta); + rc=myconn->async_query(myds->revents, (char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength,&CurrentQuery.mysql_stmt, CurrentQuery.stmt_meta); + } + break; + default: + assert(0); + break; + } timespec endt; clock_gettime(CLOCK_THREAD_CPUTIME_ID,&endt); thread->status_variables.backend_query_time=thread->status_variables.backend_query_time + (endt.tv_sec*1000000000+endt.tv_nsec) - (begint.tv_sec*1000000000+begint.tv_nsec); - // if (myconn->async_state_machine==ASYNC_QUERY_END) { if (rc==0) { // FIXME: deprecate old MySQL_Result_to_MySQL_wire , not completed yet @@ -1108,7 +1376,60 @@ handler_again: if (myconn->mysql->insert_id) { last_insert_id=myconn->mysql->insert_id; } - MySQL_Result_to_MySQL_wire(myconn->mysql, myconn->MyRS); + + switch (status) { + case PROCESSING_QUERY: + MySQL_Result_to_MySQL_wire(myconn->mysql, myconn->MyRS); + break; + case PROCESSING_STMT_PREPARE: + { + uint32_t stmid; + MySQL_STMT_Global_info *stmt_info=NULL; +// if (mysql_thread___stmt_multiplexing) { + stmt_info=GloMyStmt->add_prepared_statement(current_hostgroup, + (char *)client_myds->myconn->userinfo->username, + (char *)client_myds->myconn->userinfo->schemaname, + (char *)CurrentQuery.QueryPointer, + CurrentQuery.QueryLength, + CurrentQuery.mysql_stmt, + qpo->cache_ttl, + qpo->timeout, + qpo->delay, + true); + stmid=stmt_info->statement_id; + //uint64_t hash=client_myds->myconn->local_stmts->compute_hash(current_hostgroup,(char *)client_myds->myconn->userinfo->username,(char *)client_myds->myconn->userinfo->schemaname,(char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength); +/* + } else { + //stmt_info=Session_STMT_Manager->find_prepared_statement_by_hash(hash); + stmt_info=Session_STMT_Manager->add_prepared_statement(current_hostgroup, + (char *)client_myds->myconn->userinfo->username, + (char *)client_myds->myconn->userinfo->schemaname, + (char *)CurrentQuery.QueryPointer, + CurrentQuery.QueryLength, + CurrentQuery.mysql_stmt, + qpo->cache_ttl, + qpo->timeout, + qpo->delay, + false); + stmid=stmt_info->statement_id; +*/ +// } + myds->myconn->local_stmts->insert(stmid,CurrentQuery.mysql_stmt); + client_myds->myprot.generate_STMT_PREPARE_RESPONSE(client_myds->pkt_sid+1,stmt_info); + } + CurrentQuery.mysql_stmt=NULL; + break; + case PROCESSING_STMT_EXECUTE: + { + MySQL_Stmt_Result_to_MySQL_wire(CurrentQuery.mysql_stmt); + } + CurrentQuery.mysql_stmt=NULL; + break; + default: + assert(0); + break; + } + // GloQPro->delete_QP_out(qpo); // qpo=NULL; // myconn->async_free_result(); @@ -1159,7 +1480,18 @@ handler_again: myds->fd=0; if (retry_conn) { myds->DSS=STATE_NOT_INITIALIZED; - previous_status.push(PROCESSING_QUERY); + //previous_status.push(PROCESSING_QUERY); + switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility + case PROCESSING_QUERY: + previous_status.push(PROCESSING_QUERY); + break; + case PROCESSING_STMT_PREPARE: + previous_status.push(PROCESSING_STMT_PREPARE); + break; + default: + assert(0); + break; + } NEXT_IMMEDIATE(CONNECTING_SERVER); } return -1; @@ -1185,7 +1517,18 @@ handler_again: myds->fd=0; if (retry_conn) { myds->DSS=STATE_NOT_INITIALIZED; - previous_status.push(PROCESSING_QUERY); + //previous_status.push(PROCESSING_QUERY); + switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility + case PROCESSING_QUERY: + previous_status.push(PROCESSING_QUERY); + break; + case PROCESSING_STMT_PREPARE: + previous_status.push(PROCESSING_STMT_PREPARE); + break; + default: + assert(0); + break; + } NEXT_IMMEDIATE(CONNECTING_SERVER); } return -1; @@ -1215,7 +1558,18 @@ handler_again: myds->fd=0; if (retry_conn) { myds->DSS=STATE_NOT_INITIALIZED; - previous_status.push(PROCESSING_QUERY); + //previous_status.push(PROCESSING_QUERY); + switch(status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility + case PROCESSING_QUERY: + previous_status.push(PROCESSING_QUERY); + break; + case PROCESSING_STMT_PREPARE: + previous_status.push(PROCESSING_STMT_PREPARE); + break; + default: + assert(0); + break; + } NEXT_IMMEDIATE(CONNECTING_SERVER); } return -1; @@ -1227,7 +1581,24 @@ handler_again: break; // continue normally } - MySQL_Result_to_MySQL_wire(myconn->mysql, myconn->MyRS); + switch (status) { + case PROCESSING_QUERY: + MySQL_Result_to_MySQL_wire(myconn->mysql, myconn->MyRS); + break; + case PROCESSING_STMT_PREPARE: + //MySQL_Result_to_MySQL_wire(myconn->mysql, myconn->MyRS, true); + { + char sqlstate[10]; + sprintf(sqlstate,"#%s",mysql_sqlstate(myconn->mysql)); + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,mysql_errno(myconn->mysql),sqlstate,(char *)mysql_stmt_error(myconn->query.stmt)); + client_myds->pkt_sid++; + } + break; + default: + assert(0); + break; + } +// MySQL_Result_to_MySQL_wire(myconn->mysql, myconn->MyRS); // CurrentQuery.end(); // GloQPro->delete_QP_out(qpo); // qpo=NULL; @@ -1980,7 +2351,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C -bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(PtrSize_t *pkt) { +bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(PtrSize_t *pkt, bool prepared) { if (qpo->new_query) { // the query was rewritten l_free(pkt->size,pkt->ptr); // free old pkt @@ -2027,6 +2398,11 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C RequestEnd(NULL); return true; } + + if (prepared) { // for prepared statement we exit here + goto __exit_set_destination_hostgroup; + } + if (mirror==true) { // for mirror session we exit here current_hostgroup=qpo->destination_hostgroup; return false; @@ -2061,6 +2437,9 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C return true; } } + +__exit_set_destination_hostgroup: + if ( qpo->destination_hostgroup >= 0 ) { if (transaction_persistent_hostgroup == -1) { current_hostgroup=qpo->destination_hostgroup; @@ -2164,6 +2543,14 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED } } +void MySQL_Session::MySQL_Stmt_Result_to_MySQL_wire(MYSQL_STMT *stmt) { + MYSQL_RES *stmt_result=mysql_stmt_result_metadata(stmt); + if (stmt_result) { + MySQL_ResultSet *MyRS=new MySQL_ResultSet(&client_myds->myprot, stmt_result, stmt->mysql, stmt); + bool resultset_completed=MyRS->get_resultset(client_myds->PSarrayOUT); + } +} + void MySQL_Session::MySQL_Result_to_MySQL_wire(MYSQL *mysql, MySQL_ResultSet *MyRS) { if (MyRS) { assert(MyRS->result); diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 832ae6869..c081176d9 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -182,6 +182,7 @@ static char * mysql_thread_variables_names[]= { (char *)"max_allowed_packet", (char *)"max_transaction_time", (char *)"multiplexing", +// (char *)"stmt_multiplexing", (char *)"enforce_autocommit_on_reads", (char *)"threshold_query_length", (char *)"threshold_resultset_size", @@ -283,6 +284,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.client_found_rows=true; variables.commands_stats=true; variables.multiplexing=true; +// variables.stmt_multiplexing=false; variables.enforce_autocommit_on_reads=false; variables.query_digests=true; variables.sessions_sort=true; @@ -477,6 +479,7 @@ int MySQL_Threads_Handler::get_variable_int(char *name) { if (!strcasecmp(name,"have_compress")) return (int)variables.have_compress; if (!strcasecmp(name,"client_found_rows")) return (int)variables.client_found_rows; if (!strcasecmp(name,"multiplexing")) return (int)variables.multiplexing; +// if (!strcasecmp(name,"stmt_multiplexing")) return (int)variables.stmt_multiplexing; if (!strcasecmp(name,"enforce_autocommit_on_reads")) return (int)variables.enforce_autocommit_on_reads; if (!strcasecmp(name,"commands_stats")) return (int)variables.commands_stats; if (!strcasecmp(name,"query_digests")) return (int)variables.query_digests; @@ -725,6 +728,9 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f if (!strcasecmp(name,"multiplexing")) { return strdup((variables.multiplexing ? "true" : "false")); } +// if (!strcasecmp(name,"stmt_multiplexing")) { +// return strdup((variables.stmt_multiplexing ? "true" : "false")); +// } if (!strcasecmp(name,"enforce_autocommit_on_reads")) { return strdup((variables.enforce_autocommit_on_reads ? "true" : "false")); } @@ -1306,6 +1312,21 @@ bool MySQL_Threads_Handler::set_variable(char *name, char *value) { // this is t } return false; } +// if (!strcasecmp(name,"stmt_multiplexing")) { +/* + // FIXME : for now, stmt_multiplexing cannot be enabled + // there is no code to handle it + if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) { + variables.stmt_multiplexing=true; + return true; + } +*/ +// if (strcasecmp(value,"false")==0 || strcasecmp(value,"0")==0) { +// variables.stmt_multiplexing=false; +// return true; +// } +// return false; +// } if (!strcasecmp(name,"enforce_autocommit_on_reads")) { if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) { variables.enforce_autocommit_on_reads=true; @@ -2061,6 +2082,7 @@ void MySQL_Thread::refresh_variables() { mysql_thread___have_compress=(bool)GloMTH->get_variable_int((char *)"have_compress"); mysql_thread___client_found_rows=(bool)GloMTH->get_variable_int((char *)"client_found_rows"); mysql_thread___multiplexing=(bool)GloMTH->get_variable_int((char *)"multiplexing"); +// mysql_thread___stmt_multiplexing=(bool)GloMTH->get_variable_int((char *)"stmt_multiplexing"); mysql_thread___enforce_autocommit_on_reads=(bool)GloMTH->get_variable_int((char *)"enforce_autocommit_on_reads"); mysql_thread___commands_stats=(bool)GloMTH->get_variable_int((char *)"commands_stats"); mysql_thread___query_digests=(bool)GloMTH->get_variable_int((char *)"query_digests"); @@ -2098,6 +2120,9 @@ MySQL_Thread::MySQL_Thread() { last_maintenance_time=0; maintenance_loop=true; + status_variables.stmt_prepare=0; + status_variables.stmt_execute=0; + status_variables.stmt_close=0; status_variables.queries=0; status_variables.queries_slow=0; status_variables.queries_backends_bytes_sent=0; @@ -2302,6 +2327,24 @@ SQLite3_result * MySQL_Threads_Handler::SQL3_GlobalStatus() { pta[1]=buf; result->add_row(pta); } + { // stmt prepare + pta[0]=(char *)"Com_stmt_prepare"; + sprintf(buf,"%llu",get_total_stmt_prepare()); + pta[1]=buf; + result->add_row(pta); + } + { // stmt execute + pta[0]=(char *)"Com_stmt_execute"; + sprintf(buf,"%llu",get_total_stmt_execute()); + pta[1]=buf; + result->add_row(pta); + } + { // stmt prepare + pta[0]=(char *)"Com_stmt_close"; + sprintf(buf,"%llu",get_total_stmt_close()); + pta[1]=buf; + result->add_row(pta); + } { // Queries pta[0]=(char *)"Questions"; sprintf(buf,"%llu",get_total_queries()); @@ -2554,6 +2597,45 @@ __exit_kill_session: return ret; } +unsigned long long MySQL_Threads_Handler::get_total_stmt_prepare() { + unsigned long long q=0; + unsigned int i; + for (i=0;istatus_variables.stmt_prepare,0); + } + } + return q; +} + +unsigned long long MySQL_Threads_Handler::get_total_stmt_execute() { + unsigned long long q=0; + unsigned int i; + for (i=0;istatus_variables.stmt_execute,0); + } + } + return q; +} + +unsigned long long MySQL_Threads_Handler::get_total_stmt_close() { + unsigned long long q=0; + unsigned int i; + for (i=0;istatus_variables.stmt_close,0); + } + } + return q; +} + unsigned long long MySQL_Threads_Handler::get_total_queries() { unsigned long long q=0; unsigned int i; diff --git a/lib/mysql_connection.cpp b/lib/mysql_connection.cpp index a4aaecda7..8661dcc05 100644 --- a/lib/mysql_connection.cpp +++ b/lib/mysql_connection.cpp @@ -171,11 +171,14 @@ MySQL_Connection::MySQL_Connection() { mysql_result=NULL; query.ptr=NULL; query.length=0; + query.stmt=NULL; + query.stmt_meta=NULL; largest_query_length=0; MyRS=NULL; creation_time=0; processing_multi_statement=false; proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "Creating new MySQL_Connection %p\n", this); + local_stmts=new MySQL_STMTs_local(); }; MySQL_Connection::~MySQL_Connection() { @@ -206,6 +209,13 @@ MySQL_Connection::~MySQL_Connection() { if (MyRS) { delete MyRS; } + if (local_stmts) { + delete local_stmts; + } + if (query.stmt) { + mysql_stmt_close(query.stmt); + query.stmt=NULL; + } }; bool MySQL_Connection::set_autocommit(bool _ac) { @@ -430,6 +440,10 @@ void MySQL_Connection::set_query(char *stmt, unsigned long length) { if (length > largest_query_length) { largest_query_length=length; } + if (query.stmt) { + mysql_stmt_close(query.stmt); + query.stmt=NULL; + } //query.ptr=(char *)malloc(length); //memcpy(query.ptr,stmt,length); } @@ -444,6 +458,40 @@ void MySQL_Connection::real_query_cont(short event) { async_exit_status = mysql_real_query_cont(&interr ,mysql , mysql_status(event, true)); } +void MySQL_Connection::stmt_prepare_start() { + PROXY_TRACE(); + query.stmt=mysql_stmt_init(mysql); + my_bool my_arg=true; + mysql_stmt_attr_set(query.stmt, STMT_ATTR_UPDATE_MAX_LENGTH, &my_arg); + async_exit_status = mysql_stmt_prepare_start(&interr , query.stmt, query.ptr, query.length); +} + +void MySQL_Connection::stmt_prepare_cont(short event) { + proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event); + async_exit_status = mysql_stmt_prepare_cont(&interr , query.stmt , mysql_status(event, true)); +} + +void MySQL_Connection::stmt_execute_start() { + PROXY_TRACE(); + mysql_stmt_bind_param(query.stmt, query.stmt_meta->binds); // FIXME : add error handling + async_exit_status = mysql_stmt_execute_start(&interr , query.stmt); +} + +void MySQL_Connection::stmt_execute_cont(short event) { + proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event); + async_exit_status = mysql_stmt_execute_cont(&interr , query.stmt , mysql_status(event, true)); +} + +void MySQL_Connection::stmt_execute_store_result_start() { + PROXY_TRACE(); + async_exit_status = mysql_stmt_store_result_start(&interr, query.stmt); +} + +void MySQL_Connection::stmt_execute_store_result_cont(short event) { + proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event); + async_exit_status = mysql_stmt_store_result_cont(&interr , query.stmt , mysql_status(event, true)); +} + void MySQL_Connection::store_result_start() { PROXY_TRACE(); async_exit_status = mysql_store_result_start(&mysql_result, mysql); @@ -622,6 +670,108 @@ handler_again: } break; + case ASYNC_STMT_PREPARE_START: + stmt_prepare_start(); + __sync_fetch_and_add(&parent->queries_sent,1); + __sync_fetch_and_add(&parent->bytes_sent,query.length); + myds->sess->thread->status_variables.queries_backends_bytes_sent+=query.length; + if (async_exit_status) { + next_event(ASYNC_STMT_PREPARE_CONT); + } else { + NEXT_IMMEDIATE(ASYNC_STMT_PREPARE_END); + } + break; + case ASYNC_STMT_PREPARE_CONT: + stmt_prepare_cont(event); + if (async_exit_status) { + next_event(ASYNC_STMT_PREPARE_CONT); + } else { + NEXT_IMMEDIATE(ASYNC_STMT_PREPARE_END); + } + break; + + case ASYNC_STMT_PREPARE_END: + if (interr) { + NEXT_IMMEDIATE(ASYNC_STMT_PREPARE_FAILED); + } else { + NEXT_IMMEDIATE(ASYNC_STMT_PREPARE_SUCCESSFUL); + } + break; + case ASYNC_STMT_PREPARE_SUCCESSFUL: + break; + case ASYNC_STMT_PREPARE_FAILED: + break; + + case ASYNC_STMT_EXECUTE_START: + stmt_execute_start(); + __sync_fetch_and_add(&parent->queries_sent,1); +// __sync_fetch_and_add(&parent->bytes_sent,query.length); +// myds->sess->thread->status_variables.queries_backends_bytes_sent+=query.length; + if (async_exit_status) { + next_event(ASYNC_STMT_EXECUTE_CONT); + } else { + NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_STORE_RESULT_START); + } + break; + case ASYNC_STMT_EXECUTE_CONT: + stmt_execute_cont(event); + if (async_exit_status) { + next_event(ASYNC_STMT_EXECUTE_CONT); + } else { + NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_STORE_RESULT_START); + } + break; + + case ASYNC_STMT_EXECUTE_STORE_RESULT_START: + if (mysql_stmt_errno(query.stmt)) { + NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END); + } + { + MYSQL_RES *stmt_result=mysql_stmt_result_metadata(query.stmt); + if (stmt_result==NULL) { + NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END); + } + } + stmt_execute_store_result_start(); + if (async_exit_status) { + next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); + } else { + NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END); + } + break; + case ASYNC_STMT_EXECUTE_STORE_RESULT_CONT: + stmt_execute_store_result_cont(event); + if (async_exit_status) { + next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); + } else { + NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END); + } + break; + case ASYNC_STMT_EXECUTE_END: + { +/* + int row_count= 0; + fprintf(stdout, "Fetching results ...\n"); + while (!mysql_stmt_fetch(query.stmt)) + { + row_count++; + fprintf(stdout, " row %d\n", row_count); + } +*/ + } +/* + if (interr) { + NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_FAILED); + } else { + NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_SUCCESSFUL); + } +*/ + break; +// case ASYNC_STMT_EXECUTE_SUCCESSFUL: +// break; +// case ASYNC_STMT_EXECUTE_FAILED: +// break; + case ASYNC_NEXT_RESULT_START: async_exit_status = mysql_next_result_start(&interr, mysql); if (async_exit_status) { @@ -897,7 +1047,7 @@ int MySQL_Connection::async_connect(short event) { // 0 when the query is completed // 1 when the query is not completed // the calling function should check mysql error in mysql struct -int MySQL_Connection::async_query(short event, char *stmt, unsigned long length) { +int MySQL_Connection::async_query(short event, char *stmt, unsigned long length, MYSQL_STMT **_stmt, stmt_execute_metadata_t *stmt_meta) { PROXY_TRACE(); assert(mysql); assert(ret_mysql); @@ -914,8 +1064,20 @@ int MySQL_Connection::async_query(short event, char *stmt, unsigned long length) return 0; break; case ASYNC_IDLE: - set_query(stmt,length); + if (stmt_meta==NULL) + set_query(stmt,length); async_state_machine=ASYNC_QUERY_START; + if (_stmt) { + query.stmt=*_stmt; + if (stmt_meta==NULL) { + async_state_machine=ASYNC_STMT_PREPARE_START; + } else { + if (query.stmt_meta==NULL) { + query.stmt_meta=stmt_meta; + } + async_state_machine=ASYNC_STMT_EXECUTE_START; + } + } default: handler(event); break; @@ -928,6 +1090,24 @@ int MySQL_Connection::async_query(short event, char *stmt, unsigned long length) return 0; } } + if (async_state_machine==ASYNC_STMT_EXECUTE_END) { + async_state_machine=ASYNC_QUERY_END; + if (mysql_stmt_errno(query.stmt)) { + return -1; + } else { + return 0; + } + } + if (async_state_machine==ASYNC_STMT_PREPARE_SUCCESSFUL || async_state_machine==ASYNC_STMT_PREPARE_FAILED) { + if (async_state_machine==ASYNC_STMT_PREPARE_FAILED) { + //mysql_stmt_close(query.stmt); + //query.stmt=NULL; + return -1; + } else { + *_stmt=query.stmt; + return 0; + } + } if (async_state_machine==ASYNC_NEXT_RESULT_START) { // if we reached this point it measn we are processing a multi-statement // and we need to exit to give control to MySQL_Session diff --git a/src/main.cpp b/src/main.cpp index 774c5990d..37dd8699e 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -142,6 +142,8 @@ Query_Processor *GloQPro; ProxySQL_Admin *GloAdmin; MySQL_Threads_Handler *GloMTH; +MySQL_STMT_Manager *GloMyStmt; + MySQL_Monitor *GloMyMon; std::thread *MyMon_thread; @@ -236,9 +238,11 @@ void ProxySQL_Main_init_main_modules() { GloMyAuth=NULL; GloMyMon=NULL; GloMyLogger=NULL; + GloMyStmt=NULL; MyHGM=new MySQL_HostGroups_Manager(); GloMTH=new MySQL_Threads_Handler(); GloMyLogger = new MySQL_Logger(); + GloMyStmt=new MySQL_STMT_Manager(); } @@ -372,6 +376,10 @@ void ProxySQL_Main_shutdown_all_modules() { std::cerr << "GloMyLogger shutdown in "; #endif } + if (GloMyStmt) { + delete GloMyStmt; + GloMyStmt=NULL; + } { cpu_timer t; diff --git a/test/PrepStmt/.gitignore b/test/PrepStmt/.gitignore new file mode 100644 index 000000000..c76b1d9d9 --- /dev/null +++ b/test/PrepStmt/.gitignore @@ -0,0 +1,7 @@ +client1 +client2 +client3 +client4 +client5 +client6 +client7 diff --git a/test/PrepStmt/Makefile b/test/PrepStmt/Makefile new file mode 100644 index 000000000..1e48b9919 --- /dev/null +++ b/test/PrepStmt/Makefile @@ -0,0 +1,85 @@ + + + + +DEPS_PATH=../../deps + +MARIADB_PATH=$(DEPS_PATH)/mariadb-client-library/mariadb_client +MARIADB_IDIR=$(MARIADB_PATH)/include +MARIADB_LDIR=$(MARIADB_PATH)/libmariadb + + +DAEMONPATH=$(DEPS_PATH)/libdaemon/libdaemon +DAEMONPATH_IDIR=$(DAEMONPATH) +DAEMONPATH_LDIR=$(DAEMONPATH)/libdaemon/.libs + +JEMALLOC_PATH=$(DEPS_PATH)/jemalloc/jemalloc +JEMALLOC_IDIR=$(JEMALLOC_PATH)/include/jemalloc +JEMALLOC_LDIR=$(JEMALLOC_PATH)/lib + +LIBCONFIG_PATH=$(DEPS_PATH)/libconfig/libconfig-1.4.9 +LIBCONFIG_IDIR=-I$(LIBCONFIG_PATH)/lib +LIBCONFIG_LDIR=-L$(LIBCONFIG_PATH)/lib/.libs + +LIBEVENT_PATH=$(DEPS_PATH)/libevent/libevent +LIBEVENT_IDIR=$(LIBEVENT_PATH)/include +LIBEVENT_LDIR=$(LIBEVENT_PATH)/.libs + +RE2_PATH=$(DEPS_PATH)/re2/re2 +RE2_IDIR=$(RE2_PATH) + +SQLITE3_DIR=$(DEPS_PATH)/sqlite3/sqlite3 + +IDIR=../../include +LDIR=../../lib +IDIRS=-I$(IDIR) -I$(JEMALLOC_IDIR) -I$(MARIADB_IDIR) $(LIBCONFIG_IDIR) -I$(DAEMONPATH_IDIR) -I$(SQLITE3_DIR) +LDIRS=-L$(LDIR) -L$(JEMALLOC_LDIR) $(LIBCONFIG_LDIR) -L$(RE2_PATH)/obj -L$(LIBEVENT_LDIR) -L$(MARIADB_LDIR) -L$(DAEMONPATH_LDIR) + +DEBUG=-DDEBUG +MYCPPFLAGS=-std=c++11 $(IDIRS) $(OPTZ) $(DEBUG) -ggdb +LDFLAGS+= +MYLIBS=-Wl,--export-dynamic -Wl,-Bstatic -lconfig -lproxysql -ldaemon -ljemalloc -lconfig++ -lre2 -levent -lmariadbclient -Wl,-Bdynamic -lpthread -lm -lz -lrt -lcrypto -lssl $(EXTRALINK) + +UNAME_S := $(shell uname -s) +ifeq ($(UNAME_S),Linux) + MYLIBS+= -ldl +endif +ifeq ($(UNAME_S),FreeBSD) + MYLIBS+= -lexecinfo +endif + +LIBPROXYSQLAR=$(LDIR)/libproxysql.a + +.PHONY: default +default: client1 + +client1: client1.cpp + $(CXX) -o $@ $@.cpp $(LIBPROXYSQLAR) $(MYCPPFLAGS) $(CPPFLAGS) $(LDIRS) $(LIBS) $(LDFLAGS) $(MYLIBS) + +client2: client2.cpp + $(CXX) -o $@ $@.cpp $(LIBPROXYSQLAR) $(MYCPPFLAGS) $(CPPFLAGS) $(LDIRS) $(LIBS) $(LDFLAGS) $(MYLIBS) + +client3: client3.cpp + $(CXX) -o $@ $@.cpp $(LIBPROXYSQLAR) $(MYCPPFLAGS) $(CPPFLAGS) $(LDIRS) $(LIBS) $(LDFLAGS) $(MYLIBS) + +client4: client4.cpp + $(CXX) -o $@ $@.cpp $(LIBPROXYSQLAR) $(MYCPPFLAGS) $(CPPFLAGS) $(LDIRS) $(LIBS) $(LDFLAGS) $(MYLIBS) + +client5: client5.cpp + $(CXX) -o $@ $@.cpp $(LIBPROXYSQLAR) $(MYCPPFLAGS) $(CPPFLAGS) $(LDIRS) $(LIBS) $(LDFLAGS) $(MYLIBS) + +client6: client6.cpp + $(CXX) -o $@ $@.cpp $(LIBPROXYSQLAR) $(MYCPPFLAGS) $(CPPFLAGS) $(LDIRS) $(LIBS) $(LDFLAGS) $(MYLIBS) + +client7: client7.cpp + $(CXX) -o $@ $@.cpp $(LIBPROXYSQLAR) $(MYCPPFLAGS) $(CPPFLAGS) $(LDIRS) $(LIBS) $(LDFLAGS) $(MYLIBS) + +client8: client8.cpp + $(CXX) -o $@ $@.cpp $(LIBPROXYSQLAR) $(MYCPPFLAGS) $(CPPFLAGS) $(LDIRS) $(LIBS) $(LDFLAGS) $(MYLIBS) + + +default: $(EXECUTABLE) + +clean: + rm -f *~ core $(default) + diff --git a/test/PrepStmt/client1.cpp b/test/PrepStmt/client1.cpp new file mode 100644 index 000000000..b1b197eaf --- /dev/null +++ b/test/PrepStmt/client1.cpp @@ -0,0 +1,75 @@ +#include "proxysql.h" +#include "cpp.h" + +#define QUERY1 "SELECT ? + ? + ?" +MYSQL *mysql; +MYSQL_STMT *stmt; +uint32_t statement_id; +uint16_t num_params; +uint16_t num_columns; +uint16_t warning_count; + +int run_stmt(MYSQL_STMT *stmt, int int_data) { + MYSQL_BIND bind[3]; + MYSQL_RES *prepare_meta_result; + bind[0].buffer_type= MYSQL_TYPE_LONG; + bind[0].buffer= (char *)&int_data; + bind[0].is_null= 0; + bind[0].length= 0; + bind[1].buffer_type= MYSQL_TYPE_LONG; + bind[1].buffer= (char *)&int_data; + bind[1].is_null= 0; + bind[1].length= 0; + bind[2].buffer_type= MYSQL_TYPE_LONG; + bind[2].buffer= (char *)&int_data; + bind[2].is_null= 0; + bind[2].length= 0; + if (mysql_stmt_bind_param(stmt, bind)) { + fprintf(stderr, " mysql_stmt_bind_param() failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + prepare_meta_result = mysql_stmt_result_metadata(stmt); // FIXME: no error check + if (mysql_stmt_execute(stmt)) { + fprintf(stderr, " mysql_stmt_execute(), 1 failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } +// memset(bind, 0, sizeof(bind)); + if (mysql_stmt_store_result(stmt)) { + fprintf(stderr, " mysql_stmt_store_result() failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + mysql_free_result(prepare_meta_result); + return 0; +} + + +int main() { + std::mt19937 mt_rand(time(0)); + mysql = mysql_init(NULL); + if (!mysql_real_connect(mysql,"127.0.0.1","msandbox","msandbox","test",6033,NULL,0)) { + //if (!mysql_real_connect(mysql,"127.0.0.1","root","","test",3306,NULL,0)) { + fprintf(stderr, "Failed to connect to database: Error: %s\n", mysql_error(mysql)); + exit(EXIT_FAILURE); + } + stmt = mysql_stmt_init(mysql); + if (!stmt) { + fprintf(stderr, " mysql_stmt_init(), out of memory\n"); + exit(EXIT_FAILURE); + } + if (mysql_stmt_prepare(stmt, QUERY1, strlen(QUERY1))) { + fprintf(stderr, " mysql_stmt_prepare(), failed: %d, %s\n" , mysql_errno(mysql), mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } +// param_count= mysql_stmt_param_count(stmt); +// fprintf(stdout, " total parameters in Query1 : %d\n", param_count); + statement_id=stmt->stmt_id; + num_params=stmt->param_count; + num_columns=stmt->field_count; + warning_count=stmt->upsert_status.warning_count; + fprintf(stdout, "statement_id=%d , columns=%d , params=%d , warnings=%d\n", statement_id, num_columns, num_params, warning_count); + run_stmt(stmt,(uint32_t)mt_rand()); + return 0; +} diff --git a/test/PrepStmt/client2.cpp b/test/PrepStmt/client2.cpp new file mode 100644 index 000000000..c43187ae8 --- /dev/null +++ b/test/PrepStmt/client2.cpp @@ -0,0 +1,178 @@ +#include "proxysql.h" +#include "cpp.h" + +#include + +#define QUERY1 "SELECT ?" +#define NUMPREP 100000 +#define NUMPRO 1000 +//#define NUMPREP 160 +//#define NUMPRO 4 +#define LOOPS 10 +#define USER "root" +#define SCHEMA "test" +MYSQL *mysql; +MYSQL_STMT **stmt; +uint32_t statement_id; +uint16_t num_params; +uint16_t num_columns; +uint16_t warning_count; + +MySQL_STMT_Manager *GloMyStmt; + +struct cpu_timer +{ + ~cpu_timer() + { + auto end = std::clock() ; + std::cout << double( end - begin ) / CLOCKS_PER_SEC << " secs.\n" ; + }; + + const std::clock_t begin = std::clock() ; +}; + + +int run_stmt(MYSQL_STMT *stmt, int int_data) { + MYSQL_BIND bind[1]; + MYSQL_RES *prepare_meta_result; + bind[0].buffer_type= MYSQL_TYPE_LONG; + bind[0].buffer= (char *)&int_data; + bind[0].is_null= 0; + bind[0].length= 0; + if (mysql_stmt_bind_param(stmt, bind)) { + fprintf(stderr, " mysql_stmt_bind_param() failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + prepare_meta_result = mysql_stmt_result_metadata(stmt); // FIXME: no error check + if (mysql_stmt_execute(stmt)) { + fprintf(stderr, " mysql_stmt_execute(), 1 failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } +// memset(bind, 0, sizeof(bind)); + if (mysql_stmt_store_result(stmt)) { + fprintf(stderr, " mysql_stmt_store_result() failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + mysql_free_result(prepare_meta_result); + return 0; +} + + +int main() { + std::mt19937 mt_rand(time(0)); + GloMyStmt=new MySQL_STMT_Manager(); + MySQL_STMTs_local *local_stmts=new MySQL_STMTs_local(); + mysql = mysql_init(NULL); + char buff[128]; + unsigned int bl=0; + if (!mysql_real_connect(mysql,"127.0.0.1",USER,"",SCHEMA,3306,NULL,0)) { + fprintf(stderr, "Failed to connect to database: Error: %s\n", mysql_error(mysql)); + exit(EXIT_FAILURE); + } + int i; + stmt=(MYSQL_STMT **)malloc(sizeof(MYSQL_STMT*)*NUMPREP); + { + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + if (a==NULL) { + if (mysql_stmt_prepare(stmt[i], buff, bl)) { + fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt[i])); + exit(EXIT_FAILURE); + } + uint32_t stmid=GloMyStmt->add_prepared_statement(0,(char *)USER,(char *)SCHEMA,buff,bl,stmt[i]); + if (NUMPRO < 32) + fprintf(stdout, "SERVER_statement_id=%lu , PROXY_statement_id=%u\n", stmt[i]->stmt_id, stmid); + local_stmts->insert(stmid,stmt[i]); + } + } + fprintf(stdout, "Prepared statements: %u client, %u proxy/server. ", NUMPREP, GloMyStmt->total_prepared_statements()); + fprintf(stdout, "Created in: "); + } + { + unsigned int founds=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + //MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + //if (a) founds++; + } + fprintf(stdout, "Computed %u random strings in: ", i); + } + { + unsigned int founds=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + //MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + //if (a) founds++; + } + fprintf(stdout, "Computed %u hashes in: ", i); + } + { + unsigned int founds=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + if (a) founds++; + } + fprintf(stdout, "Found %u prepared statements searching by hash in: ", founds); + } + { + unsigned int founds=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + if (a) { + // we have a prepared statement, we can run it + founds++; + MYSQL_STMT *stm=local_stmts->find(a->statement_id); + run_stmt(stm,(uint32_t)mt_rand()); + } + } + fprintf(stdout, "Executed %u prepared statements in: ", founds); + } + + { + // for comparison, we run also queries in TEXT protocol + cpu_timer t; + for (i=0; i +#include + +#define QUERY1 "SELECT ?" +#define NUMPREP 100000 +#define NUMPRO 10000 +//#define NUMPREP 160 +//#define NUMPRO 4 +#define LOOPS 1 +#define USER "root" +#define SCHEMA "test" + +#define NTHREADS 4 + +MySQL_STMT_Manager *GloMyStmt; + +typedef struct _thread_data_t { + std::thread *thread; + MYSQL *mysql; + MYSQL_STMT **stmt; +} thread_data_t; + + +thread_data_t **GloThrData; + +struct cpu_timer +{ + ~cpu_timer() + { + auto end = std::clock() ; + std::cout << double( end - begin ) / CLOCKS_PER_SEC << " secs.\n" ; + }; + + const std::clock_t begin = std::clock() ; +}; + + +int run_stmt(MYSQL_STMT *stmt, int int_data) { + MYSQL_BIND bind[1]; + MYSQL_RES *prepare_meta_result; + bind[0].buffer_type= MYSQL_TYPE_LONG; + bind[0].buffer= (char *)&int_data; + bind[0].is_null= 0; + bind[0].length= 0; + if (mysql_stmt_bind_param(stmt, bind)) { + fprintf(stderr, " mysql_stmt_bind_param() failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + prepare_meta_result = mysql_stmt_result_metadata(stmt); // FIXME: no error check + if (mysql_stmt_execute(stmt)) { + fprintf(stderr, " mysql_stmt_execute(), 1 failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } +// memset(bind, 0, sizeof(bind)); + if (mysql_stmt_store_result(stmt)) { + fprintf(stderr, " mysql_stmt_store_result() failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + mysql_free_result(prepare_meta_result); + return 0; +} + + +void * mysql_thread(int tid) { + std::mt19937 mt_rand(time(0)); + + thread_data_t *THD; + THD=GloThrData[tid]; + + MYSQL *mysql; + MYSQL_STMT **stmt; + + MySQL_STMTs_local *local_stmts=new MySQL_STMTs_local(); + THD->mysql = mysql_init(NULL); + mysql=THD->mysql; + + char buff[128]; + unsigned int bl=0; + if (!mysql_real_connect(mysql,"127.0.0.1",USER,"",SCHEMA,3306,NULL,0)) { + fprintf(stderr, "Failed to connect to database: Error: %s\n", mysql_error(mysql)); + exit(EXIT_FAILURE); + } + int i; + stmt=(MYSQL_STMT **)malloc(sizeof(MYSQL_STMT*)*NUMPREP); + + { + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + if (a==NULL) { // no prepared statement was found in global + if (mysql_stmt_prepare(stmt[i], buff, bl)) { // the prepared statement is created + fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt[i])); + exit(EXIT_FAILURE); + } + uint32_t stmid=GloMyStmt->add_prepared_statement(0,(char *)USER,(char *)SCHEMA,buff,bl,stmt[i]); + if (NUMPRO < 32) + fprintf(stdout, "SERVER_statement_id=%lu , PROXY_statement_id=%u\n", stmt[i]->stmt_id, stmid); + local_stmts->insert(stmid,stmt[i]); + } + } + fprintf(stdout, "Prepared statements: %u client, %u proxy/server. ", NUMPREP, GloMyStmt->total_prepared_statements()); + fprintf(stdout, "Created in: "); + } + { + unsigned int founds=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + //MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + //if (a) founds++; + } + fprintf(stdout, "Computed %u random strings in: ", i); + } + { + unsigned int founds=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + //MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + //if (a) founds++; + } + fprintf(stdout, "Computed %u hashes in: ", i); + } + { + unsigned int founds=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + if (a) founds++; + } + fprintf(stdout, "Found %u prepared statements searching by hash in: ", founds); + } + { + unsigned int founds=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + if (a) { + // we have a prepared statement, we can run it + MYSQL_STMT *stm=local_stmts->find(a->statement_id); + if (stm) { + run_stmt(stm,(uint32_t)mt_rand()); + founds++; + } + } + } + fprintf(stdout, "Executed %u prepared statements in: ", founds); + } + + { + // for comparison, we run also queries in TEXT protocol + cpu_timer t; + for (i=0; ithread = new std::thread(&mysql_thread,i); + } + for (i=0; ithread->join(); + } + return 0; +} diff --git a/test/PrepStmt/client4.cpp b/test/PrepStmt/client4.cpp new file mode 100644 index 000000000..a272656b5 --- /dev/null +++ b/test/PrepStmt/client4.cpp @@ -0,0 +1,190 @@ +#include "proxysql.h" +#include "cpp.h" + +#include + +#define QUERY1 "SELECT ?" +#define NUMPREP 100000 +#define NUMPRO 1000 +//#define NUMPREP 160 +//#define NUMPRO 4 +#define LOOPS 10 +#define USER "root" +#define SCHEMA "test" + +#define NTHREADS 4 + + +typedef struct _thread_data_t { + MYSQL *mysql; + MYSQL_STMT **stmt; +} thread_data_t; + +uint32_t statement_id; +uint16_t num_params; +uint16_t num_columns; +uint16_t warning_count; + +MySQL_STMT_Manager *GloMyStmt; + +struct cpu_timer +{ + ~cpu_timer() + { + auto end = std::clock() ; + std::cout << double( end - begin ) / CLOCKS_PER_SEC << " secs.\n" ; + }; + + const std::clock_t begin = std::clock() ; +}; + + +int run_stmt(MYSQL_STMT *stmt, int int_data) { + MYSQL_BIND bind[1]; + MYSQL_RES *prepare_meta_result; + bind[0].buffer_type= MYSQL_TYPE_LONG; + bind[0].buffer= (char *)&int_data; + bind[0].is_null= 0; + bind[0].length= 0; + if (mysql_stmt_bind_param(stmt, bind)) { + fprintf(stderr, " mysql_stmt_bind_param() failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + prepare_meta_result = mysql_stmt_result_metadata(stmt); // FIXME: no error check + if (mysql_stmt_execute(stmt)) { + fprintf(stderr, " mysql_stmt_execute(), 1 failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } +// memset(bind, 0, sizeof(bind)); + if (mysql_stmt_store_result(stmt)) { + fprintf(stderr, " mysql_stmt_store_result() failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + mysql_free_result(prepare_meta_result); + return 0; +} + + +void * mysql_thread() { + std::mt19937 mt_rand(time(0)); + + + MySQL_STMTs_local *local_stmts=new MySQL_STMTs_local(); + mysql = mysql_init(NULL); + char buff[128]; + unsigned int bl=0; + if (!mysql_real_connect(mysql,"127.0.0.1",USER,"",SCHEMA,3306,NULL,0)) { + fprintf(stderr, "Failed to connect to database: Error: %s\n", mysql_error(mysql)); + exit(EXIT_FAILURE); + } + int i; + stmt=(MYSQL_STMT **)malloc(sizeof(MYSQL_STMT*)*NUMPREP); + { + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + if (a==NULL) { + if (mysql_stmt_prepare(stmt[i], buff, bl)) { + fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt[i])); + exit(EXIT_FAILURE); + } + uint32_t stmid=GloMyStmt->add_prepared_statement(0,(char *)USER,(char *)SCHEMA,buff,bl,stmt[i]); + if (NUMPRO < 32) + fprintf(stdout, "SERVER_statement_id=%lu , PROXY_statement_id=%u\n", stmt[i]->stmt_id, stmid); + local_stmts->insert(stmid,stmt[i]); + } + } + fprintf(stdout, "Prepared statements: %u client, %u proxy/server. ", NUMPREP, GloMyStmt->total_prepared_statements()); + fprintf(stdout, "Created in: "); + } + { + unsigned int founds=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + //MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + //if (a) founds++; + } + fprintf(stdout, "Computed %u random strings in: ", i); + } + { + unsigned int founds=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + //MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + //if (a) founds++; + } + fprintf(stdout, "Computed %u hashes in: ", i); + } + { + unsigned int founds=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + if (a) founds++; + } + fprintf(stdout, "Found %u prepared statements searching by hash in: ", founds); + } + { + unsigned int founds=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + if (a) { + // we have a prepared statement, we can run it + founds++; + MYSQL_STMT *stm=local_stmts->find(a->statement_id); + run_stmt(stm,(uint32_t)mt_rand()); + } + } + fprintf(stdout, "Executed %u prepared statements in: ", founds); + } + + { + // for comparison, we run also queries in TEXT protocol + cpu_timer t; + for (i=0; i +#include + +#define QUERY1 "SELECT ?" +#define NUMPREP 100000 +#define NUMPRO 20000 +//#define NUMPREP 160 +//#define NUMPRO 4 +#define LOOPS 1 +#define USER "root" +#define SCHEMA "test" + +#define NTHREADS 4 + +MySQL_STMT_Manager *GloMyStmt; + +typedef struct _thread_data_t { + std::thread *thread; + MYSQL *mysql; + MYSQL_STMT **stmt; +} thread_data_t; + + +thread_data_t **GloThrData; + +struct cpu_timer +{ + ~cpu_timer() + { + auto end = std::clock() ; + std::cout << double( end - begin ) / CLOCKS_PER_SEC << " secs.\n" ; + }; + + const std::clock_t begin = std::clock() ; +}; + + +int run_stmt(MYSQL_STMT *stmt, int int_data) { + MYSQL_BIND bind[1]; + MYSQL_RES *prepare_meta_result; + bind[0].buffer_type= MYSQL_TYPE_LONG; + bind[0].buffer= (char *)&int_data; + bind[0].is_null= 0; + bind[0].length= 0; + if (mysql_stmt_bind_param(stmt, bind)) { + fprintf(stderr, " mysql_stmt_bind_param() failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + prepare_meta_result = mysql_stmt_result_metadata(stmt); // FIXME: no error check + if (mysql_stmt_execute(stmt)) { + fprintf(stderr, " mysql_stmt_execute(), 1 failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } +// memset(bind, 0, sizeof(bind)); + if (mysql_stmt_store_result(stmt)) { + fprintf(stderr, " mysql_stmt_store_result() failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + mysql_free_result(prepare_meta_result); + return 0; +} + + +void * mysql_thread(int tid) { + std::mt19937 mt_rand(time(0)); + + thread_data_t *THD; + THD=GloThrData[tid]; + + // in this version, each mysql thread has just ONE connection + // for now we use blocking API + MYSQL *mysql; + + //MYSQL_STMT **stmt; + + // we intialize the local mapping : MySQL_STMTs_local() + MySQL_STMTs_local *local_stmts=new MySQL_STMTs_local(); + + // we initialize a MYSQL structure + THD->mysql = mysql_init(NULL); + mysql=THD->mysql; + + char buff[128]; + unsigned int bl=0; + + // we establish a connection to the database + if (!mysql_real_connect(mysql,"127.0.0.1",USER,"",SCHEMA,3306,NULL,0)) { + fprintf(stderr, "Failed to connect to database: Error: %s\n", mysql_error(mysql)); + exit(EXIT_FAILURE); + } + int i; + + // array of (MYSQL_STMT *) ; we don't use it in this version + //stmt=(MYSQL_STMT **)malloc(sizeof(MYSQL_STMT*)*NUMPREP); + + MYSQL_STMT *stmt; + { + cpu_timer t; + // in this loop we create only some the prepared statements + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + if (a==NULL) { // no prepared statement was found in global + stmt = mysql_stmt_init(mysql); + if (!stmt) { + fprintf(stderr, " mysql_stmt_init(), out of memory\n"); + exit(EXIT_FAILURE); + } + if (mysql_stmt_prepare(stmt, buff, bl)) { // the prepared statement is created + fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + MySQL_STMT_Global_info *stmt_info=NULL; + stmt_info=GloMyStmt->add_prepared_statement(0,(char *)USER,(char *)SCHEMA,buff,bl,stmt); + uint32_t stmid=stmt_info->statement_id; + if (NUMPRO < 32) + fprintf(stdout, "SERVER_statement_id=%lu , PROXY_statement_id=%u\n", stmt->stmt_id, stmid); + local_stmts->insert(stmid,stmt); + } + } + fprintf(stdout, "Prepared statements: %u client, %u proxy/server. ", i, GloMyStmt->total_prepared_statements()); + fprintf(stdout, "Created in: "); + } + { + unsigned int founds=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + //MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + //if (a) founds++; + } + fprintf(stdout, "Computed %u random strings in: ", i); + } + { + unsigned int founds=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + //MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + //if (a) founds++; + } + fprintf(stdout, "Computed %u hashes in: ", i); + } + { + unsigned int founds=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + if (a) founds++; + } + fprintf(stdout, "Found %u prepared statements searching by hash in: ", founds); + } + + { + unsigned int founds=0; + unsigned int created=0; + unsigned int executed=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + if (a) { + // we have a prepared statement, we can run it + MYSQL_STMT *stm=local_stmts->find(a->statement_id); + if (stm) { // the statement exists in local + run_stmt(stm,(uint32_t)mt_rand()); + founds++; + executed++; + local_stmts->erase(a->statement_id); + } else { // the statement doesn't exist locally + stmt = mysql_stmt_init(mysql); + if (!stmt) { + fprintf(stderr, " mysql_stmt_init(), out of memory\n"); + exit(EXIT_FAILURE); + } + if (mysql_stmt_prepare(stmt, buff, bl)) { // the prepared statement is created + fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + local_stmts->insert(a->statement_id,stmt); + run_stmt(stmt,(uint32_t)mt_rand()); + created++; + executed++; + local_stmts->erase(a->statement_id); + } + } else { // no prepared statement was found in global + stmt = mysql_stmt_init(mysql); + if (!stmt) { + fprintf(stderr, " mysql_stmt_init(), out of memory\n"); + exit(EXIT_FAILURE); + } + if (mysql_stmt_prepare(stmt, buff, bl)) { // the prepared statement is created + fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + MySQL_STMT_Global_info *stmt_info=NULL; + stmt_info=GloMyStmt->add_prepared_statement(0,(char *)USER,(char *)SCHEMA,buff,bl,stmt); + uint32_t stmid=stmt_info->statement_id; + if (NUMPRO < 32) + fprintf(stdout, "SERVER_statement_id=%lu , PROXY_statement_id=%u\n", stmt->stmt_id, stmid); + local_stmts->insert(stmid,stmt); + run_stmt(stmt,(uint32_t)mt_rand()); + created++; + executed++; + local_stmts->erase(stmid); + } + } + fprintf(stdout, "Found %u , created %u and executed %u prepared statements in: ", founds, created, executed); + } +/* + { + // for comparison, we run also queries in TEXT protocol + cpu_timer t; + for (i=0; ithread = new std::thread(&mysql_thread,i); + } + // wait for the threads to complete + for (i=0; ithread->join(); + } + return 0; +} diff --git a/test/PrepStmt/client6.cpp b/test/PrepStmt/client6.cpp new file mode 100644 index 000000000..f2e2b12bc --- /dev/null +++ b/test/PrepStmt/client6.cpp @@ -0,0 +1,357 @@ +#define PROXYSQL_EXTERN +#include "proxysql.h" +#include "cpp.h" + +#include +#include + +#define QUERY1 "SELECT ?" +#define NUMPREP 100000 +#define NUMPRO 20000 +//#define NUMPREP 160 +//#define NUMPRO 4 +#define LOOPS 1 +#define USER "root" +#define SCHEMA "test" + +#define NTHREADS 4 + +MySQL_STMT_Manager *GloMyStmt; +Query_Cache *GloQC; +MySQL_Authentication *GloMyAuth; +Query_Processor *GloQPro; +ProxySQL_Admin *GloAdmin; +MySQL_Threads_Handler *GloMTH; +MySQL_Monitor *GloMyMon; +std::thread *MyMon_thread; + +MySQL_Logger *GloMyLogger; + + +typedef struct _thread_data_t { + std::thread *thread; + MYSQL *mysql; + MYSQL_STMT **stmt; +} thread_data_t; + + +thread_data_t **GloThrData; + +struct cpu_timer +{ + ~cpu_timer() + { + auto end = std::clock() ; + std::cout << double( end - begin ) / CLOCKS_PER_SEC << " secs.\n" ; + }; + + const std::clock_t begin = std::clock() ; +}; + + +int run_stmt(MYSQL_STMT *stmt, int int_data) { + MYSQL_BIND bind[1]; + MYSQL_RES *prepare_meta_result; + bind[0].buffer_type= MYSQL_TYPE_LONG; + bind[0].buffer= (char *)&int_data; + bind[0].is_null= 0; + bind[0].length= 0; + if (mysql_stmt_bind_param(stmt, bind)) { + fprintf(stderr, " mysql_stmt_bind_param() failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + prepare_meta_result = mysql_stmt_result_metadata(stmt); // FIXME: no error check + if (mysql_stmt_execute(stmt)) { + fprintf(stderr, " mysql_stmt_execute(), 1 failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } +// memset(bind, 0, sizeof(bind)); + if (mysql_stmt_store_result(stmt)) { + fprintf(stderr, " mysql_stmt_store_result() failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + mysql_free_result(prepare_meta_result); + return 0; +} + + +void * mysql_thread(int tid) { + std::mt19937 mt_rand(time(0)); + + + thread_data_t *THD; + THD=GloThrData[tid]; + + MySQL_Thread *worker = new MySQL_Thread(); + worker->init(); + + char buff[128]; + unsigned int bl=0; + + MySrvC *mysrvc=new MySrvC((char *)"127.0.0.1", 3306, 100, MYSQL_SERVER_STATUS_ONLINE, 100, 0, 0, 0, 10000); + + { + int i; + MySQL_Session **SESS=(MySQL_Session **)malloc(16*sizeof(MySQL_Session *)); + + for (i=0; i<16; i++) { + SESS[i]=new MySQL_Session(); + MySQL_Session *sess=SESS[i]; + sess->mirror==true; + sess->client_myds=NULL; + sess->client_myds = new MySQL_Data_Stream(); + sess->client_myds->DSS=STATE_SLEEP; + sess->client_myds->sess=sess; + sess->client_myds->myds_type=MYDS_FRONTEND; + sess->client_myds->PSarrayIN= new PtrSizeArray(); + sess->client_myds->PSarrayOUT= new PtrSizeArray(); + worker->register_session(sess); + sess->current_hostgroup=0; + sess->default_hostgroup=0; + sess->mybe=sess->find_or_create_backend(sess->current_hostgroup); + MySQL_Connection *myconn=new MySQL_Connection(); + sess->mybe->server_myds->attach_connection(myconn); + myconn->userinfo->set((char *)"root",(char *)"",(char *)"information_schema"); + myconn->local_stmts = new MySQL_STMTs_local(); + //myconn->mysql=mysql_init(NULL); + myconn->parent=mysrvc; + /* + if (!mysql_real_connect(myconn->mysql,"127.0.0.1",USER,"",SCHEMA,3306,NULL,0)) { + fprintf(stderr, "Failed to connect to database: Error: %s\n", mysql_error(myconn->mysql)); + exit(EXIT_FAILURE); + }*/ + myconn->handler(0); + if (myconn->mysql==NULL) { + myconn->handler(0); + } + } + for (i=0; i<16; i++) { + MySQL_Session *sess=SESS[i]; + sprintf(buff+5,"SELECT %u + ?",(uint32_t)mt_rand()%NUMPRO); + bl=strlen(buff+5); + mysql_hdr hdr; + hdr.pkt_id=0; + hdr.pkt_length=bl+1; + memcpy(buff,&hdr,sizeof(mysql_hdr)); + buff[4]=0x16; + PtrSize_t pkt; + pkt.size=hdr.pkt_length+sizeof(mysql_hdr); + pkt.ptr=malloc(pkt.size); + memcpy(pkt.ptr,buff,pkt.size); + sess->client_myds->PSarrayIN->add(pkt.ptr,pkt.size); + sess->status=WAITING_CLIENT_DATA;; + sess->handler(); + } + } + +/* + // in this version, each mysql thread has just ONE connection + // for now we use blocking API + MYSQL *mysql; + + //MYSQL_STMT **stmt; + + // we intialize the local mapping : MySQL_STMTs_local() + MySQL_STMTs_local *local_stmts=new MySQL_STMTs_local(); + + // we initialize a MYSQL structure + THD->mysql = mysql_init(NULL); + mysql=THD->mysql; + + char buff[128]; + unsigned int bl=0; + + // we establish a connection to the database + if (!mysql_real_connect(mysql,"127.0.0.1",USER,"",SCHEMA,3306,NULL,0)) { + fprintf(stderr, "Failed to connect to database: Error: %s\n", mysql_error(mysql)); + exit(EXIT_FAILURE); + } + int i; + + // array of (MYSQL_STMT *) ; we don't use it in this version + //stmt=(MYSQL_STMT **)malloc(sizeof(MYSQL_STMT*)*NUMPREP); + + MYSQL_STMT *stmt; + { + cpu_timer t; + // in this loop we create only some the prepared statements + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + if (a==NULL) { // no prepared statement was found in global + stmt = mysql_stmt_init(mysql); + if (!stmt) { + fprintf(stderr, " mysql_stmt_init(), out of memory\n"); + exit(EXIT_FAILURE); + } + if (mysql_stmt_prepare(stmt, buff, bl)) { // the prepared statement is created + fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + uint32_t stmid=GloMyStmt->add_prepared_statement(0,(char *)USER,(char *)SCHEMA,buff,bl,stmt); + if (NUMPRO < 32) + fprintf(stdout, "SERVER_statement_id=%lu , PROXY_statement_id=%u\n", stmt->stmt_id, stmid); + local_stmts->insert(stmid,stmt); + } + } + fprintf(stdout, "Prepared statements: %u client, %u proxy/server. ", i, GloMyStmt->total_prepared_statements()); + fprintf(stdout, "Created in: "); + } + { + unsigned int founds=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + //MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + //if (a) founds++; + } + fprintf(stdout, "Computed %u random strings in: ", i); + } + { + unsigned int founds=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + //MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + //if (a) founds++; + } + fprintf(stdout, "Computed %u hashes in: ", i); + } + { + unsigned int founds=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + if (a) founds++; + } + fprintf(stdout, "Found %u prepared statements searching by hash in: ", founds); + } + + { + unsigned int founds=0; + unsigned int created=0; + unsigned int executed=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + if (a) { + // we have a prepared statement, we can run it + MYSQL_STMT *stm=local_stmts->find(a->statement_id); + if (stm) { // the statement exists in local + run_stmt(stm,(uint32_t)mt_rand()); + founds++; + executed++; + local_stmts->erase(a->statement_id); + } else { // the statement doesn't exist locally + stmt = mysql_stmt_init(mysql); + if (!stmt) { + fprintf(stderr, " mysql_stmt_init(), out of memory\n"); + exit(EXIT_FAILURE); + } + if (mysql_stmt_prepare(stmt, buff, bl)) { // the prepared statement is created + fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + local_stmts->insert(a->statement_id,stmt); + run_stmt(stmt,(uint32_t)mt_rand()); + created++; + executed++; + local_stmts->erase(a->statement_id); + } + } else { // no prepared statement was found in global + stmt = mysql_stmt_init(mysql); + if (!stmt) { + fprintf(stderr, " mysql_stmt_init(), out of memory\n"); + exit(EXIT_FAILURE); + } + if (mysql_stmt_prepare(stmt, buff, bl)) { // the prepared statement is created + fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + uint32_t stmid=GloMyStmt->add_prepared_statement(0,(char *)USER,(char *)SCHEMA,buff,bl,stmt); + if (NUMPRO < 32) + fprintf(stdout, "SERVER_statement_id=%lu , PROXY_statement_id=%u\n", stmt->stmt_id, stmid); + local_stmts->insert(stmid,stmt); + run_stmt(stmt,(uint32_t)mt_rand()); + created++; + executed++; + local_stmts->erase(stmid); + } + } + fprintf(stdout, "Found %u , created %u and executed %u prepared statements in: ", founds, created, executed); + } +*/ +/* + { + // for comparison, we run also queries in TEXT protocol + cpu_timer t; + for (i=0; iinit(); + + // create a new MySQL_STMT_Manager() + GloMyStmt=new MySQL_STMT_Manager(); + GloThrData = (thread_data_t **)malloc(sizeof(thread_data_t *)*NTHREADS); + + // starts N threads + int i; + for (i=0; ithread = new std::thread(&mysql_thread,i); + } + while (glovars.shutdown==0) { + sleep(1); // FIXME: TERRIBLE UGLY + } + // wait for the threads to complete + for (i=0; ithread->join(); + } + return 0; +} diff --git a/test/PrepStmt/client7.cpp b/test/PrepStmt/client7.cpp new file mode 100644 index 000000000..470669a73 --- /dev/null +++ b/test/PrepStmt/client7.cpp @@ -0,0 +1,369 @@ +/* +this is a modified version of client5.cpp +that supports async calls +*/ + + + +#include "proxysql.h" +#include "cpp.h" + +#include +#include + +#define QUERY1 "SELECT ?" +#define NUMPREP 100000 +#define NUMPRO 20000 +//#define NUMPREP 160 +//#define NUMPRO 4 +#define LOOPS 1 +#define USER "root" +#define SCHEMA "test" + +#define NTHREADS 4 + +static int wait_for_mysql(MYSQL *mysql, int status) { + struct pollfd pfd; + int timeout, res; + + pfd.fd = mysql_get_socket(mysql); + pfd.events = + (status & MYSQL_WAIT_READ ? POLLIN : 0) | + (status & MYSQL_WAIT_WRITE ? POLLOUT : 0) | + (status & MYSQL_WAIT_EXCEPT ? POLLPRI : 0); +// if (status & MYSQL_WAIT_TIMEOUT) +// timeout = 1000*mysql_get_timeout_value(mysql); +// else + timeout = -1; + res = poll(&pfd, 1, timeout); + if (res == 0) + return MYSQL_WAIT_TIMEOUT; + else if (res < 0) + return MYSQL_WAIT_TIMEOUT; + else { + int status = 0; + if (pfd.revents & POLLIN) status |= MYSQL_WAIT_READ; + if (pfd.revents & POLLOUT) status |= MYSQL_WAIT_WRITE; + if (pfd.revents & POLLPRI) status |= MYSQL_WAIT_EXCEPT; + return status; + } +} + + + + + +MySQL_STMT_Manager *GloMyStmt; + +typedef struct _thread_data_t { + std::thread *thread; + MYSQL *mysql; + MYSQL_STMT **stmt; +} thread_data_t; + + +thread_data_t **GloThrData; + +struct cpu_timer +{ + ~cpu_timer() + { + auto end = std::clock() ; + std::cout << double( end - begin ) / CLOCKS_PER_SEC << " secs.\n" ; + }; + + const std::clock_t begin = std::clock() ; +}; + + +int run_stmt(MYSQL *mysql, MYSQL_STMT *stmt, int int_data) { + int async_exit_status=0; + int ret_int; + MYSQL_BIND bind[1]; + MYSQL_RES *prepare_meta_result; + bind[0].buffer_type= MYSQL_TYPE_LONG; + bind[0].buffer= (char *)&int_data; + bind[0].is_null= 0; + bind[0].length= 0; + if (mysql_stmt_bind_param(stmt, bind)) { + fprintf(stderr, " mysql_stmt_bind_param() failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + prepare_meta_result = mysql_stmt_result_metadata(stmt); // FIXME: no error check +/* blocking + if (mysql_stmt_execute(stmt)) { + fprintf(stderr, " mysql_stmt_execute(), 1 failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + if (mysql_stmt_store_result(stmt)) { + fprintf(stderr, " mysql_stmt_store_result() failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } +*/ + async_exit_status=mysql_stmt_execute_start(&ret_int, stmt); + while (async_exit_status) { + async_exit_status=wait_for_mysql(mysql, async_exit_status); + async_exit_status=mysql_stmt_execute_cont(&ret_int, stmt, async_exit_status); + } + if (ret_int) { + fprintf(stderr, " mysql_stmt_execute(), 1 failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + async_exit_status=mysql_stmt_store_result_start(&ret_int, stmt); + while (async_exit_status) { + async_exit_status=wait_for_mysql(mysql, async_exit_status); + async_exit_status=mysql_stmt_store_result_cont(&ret_int, stmt, async_exit_status); + } + if (ret_int) { + fprintf(stderr, " mysql_stmt_store_result() failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + mysql_free_result(prepare_meta_result); + return 0; +} + + +void * mysql_thread(int tid) { + std::mt19937 mt_rand(time(0)); + + thread_data_t *THD; + THD=GloThrData[tid]; + + // in this version, each mysql thread has just ONE connection + // for now we use blocking API + MYSQL *mysql; + + //MYSQL_STMT **stmt; + + // we intialize the local mapping : MySQL_STMTs_local() + MySQL_STMTs_local *local_stmts=new MySQL_STMTs_local(); + + // we initialize a MYSQL structure + THD->mysql = mysql_init(NULL); + mysql=THD->mysql; + + char buff[128]; + unsigned int bl=0; + + // we establish a connection to the database + if (!mysql_real_connect(mysql,"127.0.0.1",USER,"",SCHEMA,3306,NULL,0)) { + fprintf(stderr, "Failed to connect to database: Error: %s\n", mysql_error(mysql)); + exit(EXIT_FAILURE); + } + int i; + //set not blocking + mysql_options(mysql, MYSQL_OPT_NONBLOCK, 0); + int async_exit_status=0; + int ret_int; + // array of (MYSQL_STMT *) ; we don't use it in this version + //stmt=(MYSQL_STMT **)malloc(sizeof(MYSQL_STMT*)*NUMPREP); + + MYSQL_STMT *stmt; + { + cpu_timer t; + // in this loop we create only some the prepared statements + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + if (a==NULL) { // no prepared statement was found in global + stmt = mysql_stmt_init(mysql); + if (!stmt) { + fprintf(stderr, " mysql_stmt_init(), out of memory\n"); + exit(EXIT_FAILURE); + } +/* blocking call + if (mysql_stmt_prepare(stmt, buff, bl)) { // the prepared statement is created + fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } +*/ + async_exit_status=mysql_stmt_prepare_start(&ret_int, stmt, buff, bl); + while (async_exit_status) { + async_exit_status=wait_for_mysql(mysql, async_exit_status); + async_exit_status=mysql_stmt_prepare_cont(&ret_int, stmt, async_exit_status); + } + if (ret_int) { + fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + + MySQL_STMT_Global_info *stmt_info=NULL; + stmt_info=GloMyStmt->add_prepared_statement(0,(char *)USER,(char *)SCHEMA,buff,bl,stmt); + uint32_t stmid=stmt_info->statement_id; + if (NUMPRO < 32) + fprintf(stdout, "SERVER_statement_id=%lu , PROXY_statement_id=%u\n", stmt->stmt_id, stmid); + local_stmts->insert(stmid,stmt); + } + } + fprintf(stdout, "Prepared statements: %u client, %u proxy/server. ", i, GloMyStmt->total_prepared_statements()); + fprintf(stdout, "Created in: "); + } + { + unsigned int founds=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + //MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + //if (a) founds++; + } + fprintf(stdout, "Computed %u random strings in: ", i); + } + { + unsigned int founds=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + //MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + //if (a) founds++; + } + fprintf(stdout, "Computed %u hashes in: ", i); + } + { + unsigned int founds=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + if (a) founds++; + } + fprintf(stdout, "Found %u prepared statements searching by hash in: ", founds); + } + + { + unsigned int founds=0; + unsigned int created=0; + unsigned int executed=0; + cpu_timer t; + for (i=0; icompute_hash(0,(char *)USER,(char *)SCHEMA,buff,bl); + MySQL_STMT_Global_info *a=GloMyStmt->find_prepared_statement_by_hash(hash); + if (a) { + // we have a prepared statement, we can run it + MYSQL_STMT *stm=local_stmts->find(a->statement_id); + if (stm) { // the statement exists in local + run_stmt(mysql, stm,(uint32_t)mt_rand()); + founds++; + executed++; + local_stmts->erase(a->statement_id); + } else { // the statement doesn't exist locally + stmt = mysql_stmt_init(mysql); + if (!stmt) { + fprintf(stderr, " mysql_stmt_init(), out of memory\n"); + exit(EXIT_FAILURE); + } +/* blocking + if (mysql_stmt_prepare(stmt, buff, bl)) { // the prepared statement is created + fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } +*/ + async_exit_status=mysql_stmt_prepare_start(&ret_int, stmt, buff, bl); + while (async_exit_status) { + async_exit_status=wait_for_mysql(mysql, async_exit_status); + async_exit_status=mysql_stmt_prepare_cont(&ret_int, stmt, async_exit_status); + } + if (ret_int) { + fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + local_stmts->insert(a->statement_id,stmt); + run_stmt(mysql, stmt,(uint32_t)mt_rand()); + created++; + executed++; + local_stmts->erase(a->statement_id); + } + } else { // no prepared statement was found in global + stmt = mysql_stmt_init(mysql); + if (!stmt) { + fprintf(stderr, " mysql_stmt_init(), out of memory\n"); + exit(EXIT_FAILURE); + } +/* blocking + if (mysql_stmt_prepare(stmt, buff, bl)) { // the prepared statement is created + fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } +*/ + async_exit_status=mysql_stmt_prepare_start(&ret_int, stmt, buff, bl); + while (async_exit_status) { + async_exit_status=wait_for_mysql(mysql, async_exit_status); + async_exit_status=mysql_stmt_prepare_cont(&ret_int, stmt, async_exit_status); + } + if (ret_int) { + fprintf(stderr, " mysql_stmt_prepare(), failed: %s\n" , mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + MySQL_STMT_Global_info *stmt_info=NULL; + stmt_info=GloMyStmt->add_prepared_statement(0,(char *)USER,(char *)SCHEMA,buff,bl,stmt); + uint32_t stmid=stmt_info->statement_id; + if (NUMPRO < 32) + fprintf(stdout, "SERVER_statement_id=%lu , PROXY_statement_id=%u\n", stmt->stmt_id, stmid); + local_stmts->insert(stmid,stmt); + run_stmt(mysql, stmt,(uint32_t)mt_rand()); + created++; + executed++; + local_stmts->erase(stmid); + } + } + fprintf(stdout, "Found %u , created %u and executed %u prepared statements in: ", founds, created, executed); + } +/* + { + // for comparison, we run also queries in TEXT protocol + cpu_timer t; + for (i=0; ithread = new std::thread(&mysql_thread,i); + } + // wait for the threads to complete + for (i=0; ithread->join(); + } + return 0; +} diff --git a/test/PrepStmt/client8 b/test/PrepStmt/client8 new file mode 100755 index 000000000..a7b324b89 Binary files /dev/null and b/test/PrepStmt/client8 differ diff --git a/test/PrepStmt/client8.cpp b/test/PrepStmt/client8.cpp new file mode 100644 index 000000000..07e1efa3c --- /dev/null +++ b/test/PrepStmt/client8.cpp @@ -0,0 +1,79 @@ +#include "proxysql.h" +#include "cpp.h" + +// dummy copy of client1.cpp + +#define QUERY1 "SELECT * from sbtest limit 10" +MYSQL *mysql; +MYSQL_STMT *stmt; +uint32_t statement_id; +uint16_t num_params; +uint16_t num_columns; +uint16_t warning_count; + +int run_stmt(MYSQL_STMT *stmt, int int_data) { + MYSQL_BIND bind[3]; + MYSQL_RES *prepare_meta_result; + bind[0].buffer_type= MYSQL_TYPE_LONG; + bind[0].buffer= (char *)&int_data; + bind[0].is_null= 0; + bind[0].length= 0; + bind[1].buffer_type= MYSQL_TYPE_LONG; + bind[1].buffer= (char *)&int_data; + bind[1].is_null= 0; + bind[1].length= 0; + bind[2].buffer_type= MYSQL_TYPE_LONG; + bind[2].buffer= (char *)&int_data; + bind[2].is_null= 0; + bind[2].length= 0; +/* + if (mysql_stmt_bind_param(stmt, bind)) { + fprintf(stderr, " mysql_stmt_bind_param() failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } +*/ + prepare_meta_result = mysql_stmt_result_metadata(stmt); // FIXME: no error check + if (mysql_stmt_execute(stmt)) { + fprintf(stderr, " mysql_stmt_execute(), 1 failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } +// memset(bind, 0, sizeof(bind)); + if (mysql_stmt_store_result(stmt)) { + fprintf(stderr, " mysql_stmt_store_result() failed\n"); + fprintf(stderr, " %s\n", mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } + mysql_free_result(prepare_meta_result); + return 0; +} + + +int main() { + std::mt19937 mt_rand(time(0)); + mysql = mysql_init(NULL); + if (!mysql_real_connect(mysql,"127.0.0.1","msandbox","msandbox","sbtest",6033,NULL,0)) { + //if (!mysql_real_connect(mysql,"127.0.0.1","root","","test",3306,NULL,0)) { + fprintf(stderr, "Failed to connect to database: Error: %s\n", mysql_error(mysql)); + exit(EXIT_FAILURE); + } + stmt = mysql_stmt_init(mysql); + if (!stmt) { + fprintf(stderr, " mysql_stmt_init(), out of memory\n"); + exit(EXIT_FAILURE); + } + if (mysql_stmt_prepare(stmt, QUERY1, strlen(QUERY1))) { + fprintf(stderr, " mysql_stmt_prepare(), failed: %d, %s\n" , mysql_errno(mysql), mysql_stmt_error(stmt)); + exit(EXIT_FAILURE); + } +// param_count= mysql_stmt_param_count(stmt); +// fprintf(stdout, " total parameters in Query1 : %d\n", param_count); + statement_id=stmt->stmt_id; + num_params=stmt->param_count; + num_columns=stmt->field_count; + warning_count=stmt->upsert_status.warning_count; + fprintf(stdout, "statement_id=%d , columns=%d , params=%d , warnings=%d\n", statement_id, num_columns, num_params, warning_count); + run_stmt(stmt,(uint32_t)mt_rand()); + return 0; +}