diff --git a/include/MySQL_PreparedStatement.h b/include/MySQL_PreparedStatement.h index a1f7c7657..fad830bc2 100644 --- a/include/MySQL_PreparedStatement.h +++ b/include/MySQL_PreparedStatement.h @@ -37,25 +37,32 @@ To summarie the most important classes: // class MySQL_STMT_Global_info represents information about a MySQL Prepared Statement // it is an internal representation of prepared statement // it include all metadata associated with it + +#define PROXYSQL_STMT_V14 + class MySQL_STMT_Global_info { private: void compute_hash(); - public: + public: uint64_t digest; MYSQL_COM_QUERY_command MyComQueryCmd; char * digest_text; - uint64_t hash; - char *username; - char *schemaname; - char *query; - unsigned int query_length; + uint64_t hash; + char *username; + char *schemaname; + char *query; + unsigned int query_length; unsigned int hostgroup_id; int ref_count_client; int ref_count_server; - uint32_t statement_id; - uint16_t num_columns; - uint16_t num_params; - uint16_t warning_count; +#ifndef PROXYSQL_STMT_V14 + uint32_t statement_id; +#else + uint64_t statement_id; +#endif + uint16_t num_columns; + uint16_t num_params; + uint16_t warning_count; MYSQL_FIELD **fields; struct { int cache_ttl; @@ -64,7 +71,11 @@ class MySQL_STMT_Global_info { } properties; bool is_select_NOT_for_update; MYSQL_BIND **params; // seems unused (?) +#ifndef PROXYSQL_STMT_V14 MySQL_STMT_Global_info(uint32_t id, unsigned int h, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, uint64_t _h); +#else + MySQL_STMT_Global_info(uint64_t id, unsigned int h, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, uint64_t _h); +#endif ~MySQL_STMT_Global_info(); }; @@ -162,12 +173,14 @@ class MySQL_STMTs_meta { } }; +#ifndef PROXYSQL_STMT_V14 // class MySQL_STMTs_local associates a global statement ID with a local statement ID for a specific connection class MySQL_STMTs_local { private: unsigned int num_entries; bool is_client; + std::map client_stmt_to_global_id; std::map m; public: MySQL_Session *sess; @@ -191,6 +204,8 @@ class MySQL_STMTs_local { } return NULL; // not found } + uint32_t generate_new_stmt_id(uint32_t global_statement_id); + uint32_t find_original_id(uint32_t client_stmt_id); bool exists(uint32_t global_statement_id) { auto s=m.find(global_statement_id); if (s!=m.end()) { // found @@ -203,8 +218,6 @@ class MySQL_STMTs_local { unsigned int get_num_entries() { return num_entries; } }; - - class MySQL_STMT_Manager { private: uint32_t next_statement_id; @@ -215,6 +228,9 @@ class MySQL_STMT_Manager { public: MySQL_STMT_Manager(); ~MySQL_STMT_Manager(); + uint32_t generate_new_stmt_id() { + return __sync_add_and_fetch(&next_statement_id,1); + } int ref_count(uint32_t statement_id, int cnt, bool lock, bool is_client); MySQL_STMT_Global_info * add_prepared_statement(bool *is_new, unsigned int h, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, bool lock=true); MySQL_STMT_Global_info * add_prepared_statement(bool *is_new, unsigned int h, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, int _cache_ttl, int _timeout, int _delay, bool lock=true); @@ -224,4 +240,73 @@ class MySQL_STMT_Manager { void active_prepared_statements(uint32_t *unique, uint32_t *total); }; +#else // PROXYSQL_STMT_V14 +class MySQL_STMTs_local_v14 { + private: + bool is_client_; + // this map associate client_stmt_id to global_stmt_id : this is used only for client connections + std::map client_stmt_to_global_ids; + // this multimap associate global_stmt_id to client_stmt_id : this is used only for client connections + std::multimap global_stmt_to_client_ids; + + // this map associate backend_stmt_id to global_stmt_id : this is used only for backend connections + std::map backend_stmt_to_global_ids; + // this map associate global_stmt_id to backend_stmt_id : this is used only for backend connections + std::map global_stmt_to_backend_ids; + + std::map global_stmt_to_backend_stmt; + + std::stack free_client_ids; + uint32_t local_max_stmt_id; + public: + MySQL_Session *sess; + MySQL_STMTs_local_v14(bool _ic) { + local_max_stmt_id = 0; + sess = NULL; + is_client_ = _ic; + } + void set_is_client(MySQL_Session *_s) { + sess=_s; + is_client_ = true; + } + ~MySQL_STMTs_local_v14(); + bool is_client() { + return is_client_; + } + void backend_insert(uint64_t global_statement_id, MYSQL_STMT *stmt); + uint64_t compute_hash(unsigned int hostgroup, char *user, char *schema, char *query, unsigned int query_length); + unsigned int get_num_backend_stmts() { return backend_stmt_to_global_ids.size(); } + uint32_t generate_new_client_stmt_id(uint64_t global_statement_id); + uint64_t find_global_stmt_id_from_client(uint32_t client_stmt_id); + bool client_close(uint32_t client_statement_id); + MYSQL_STMT * find_backend_stmt_by_global_id(uint32_t global_statement_id) { + auto s=global_stmt_to_backend_stmt.find(global_statement_id); + if (s!=global_stmt_to_backend_stmt.end()) { // found + return s->second; + } + return NULL; // not found + } +}; + +class MySQL_STMT_Manager_v14 { + private: + uint64_t next_statement_id; + pthread_rwlock_t rwlock_; + std::map map_stmt_id_to_info; // map using statement id + std::map map_stmt_hash_to_info; // map using hashes + public: + MySQL_STMT_Manager_v14(); + ~MySQL_STMT_Manager_v14(); + MySQL_STMT_Global_info * find_prepared_statement_by_hash(uint64_t hash, bool lock=true); + MySQL_STMT_Global_info * find_prepared_statement_by_stmt_id(uint64_t id, bool lock=true); + void rdlock() { pthread_rwlock_rdlock(&rwlock_); } + void wrlock() { pthread_rwlock_wrlock(&rwlock_); } + void unlock() { pthread_rwlock_unlock(&rwlock_); } + void ref_count_client(uint64_t _stmt, int _v, bool lock=true); + void ref_count_server(uint64_t _stmt, int _v, bool lock=true); + MySQL_STMT_Global_info * add_prepared_statement(unsigned int h, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, int _cache_ttl, int _timeout, int _delay, bool lock=true); + void get_metrics(uint64_t *c_unique, uint64_t *c_total, uint64_t *stmt_max_stmt_id, uint64_t *cached, uint64_t *s_unique, uint64_t *s_total); +}; +#endif // PROXYSQL_STMT_V14 + #endif /* CLASS_MYSQL_PREPARED_STATEMENT_H */ diff --git a/include/MySQL_Protocol.h b/include/MySQL_Protocol.h index 4f459850c..3d92e7eb9 100644 --- a/include/MySQL_Protocol.h +++ b/include/MySQL_Protocol.h @@ -101,7 +101,7 @@ class MySQL_Protocol { void * Query_String_to_packet(uint8_t sid, std::string *s, unsigned int *l); // prepared statements - bool generate_STMT_PREPARE_RESPONSE(uint8_t sequence_id, MySQL_STMT_Global_info *stmt_info); + bool generate_STMT_PREPARE_RESPONSE(uint8_t sequence_id, MySQL_STMT_Global_info *stmt_info, uint32_t _stmt_id=0); stmt_execute_metadata_t * get_binds_from_pkt(void *ptr, unsigned int size, MySQL_STMT_Global_info *stmt_info, stmt_execute_metadata_t **stmt_meta); }; diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index 1350ec5b7..9f4f5ed49 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -27,7 +27,11 @@ class Query_Info { MYSQL_STMT *mysql_stmt; stmt_execute_metadata_t *stmt_meta; +#ifndef PROXYSQL_STMT_V14 uint32_t stmt_global_id; +#else + uint64_t stmt_global_id; +#endif MySQL_STMT_Global_info *stmt_info; int QueryLength; diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index a6ed700d4..2aaa2971a 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -193,9 +193,12 @@ class MySQL_Thread // in this way, there is no need for atomic operation and there is no cache miss // when it is needed a total, all threads are checked struct { - unsigned long long stmt_prepare; - unsigned long long stmt_execute; - unsigned long long stmt_close; + unsigned long long backend_stmt_prepare; + unsigned long long backend_stmt_execute; + unsigned long long backend_stmt_close; + unsigned long long frontend_stmt_prepare; + unsigned long long frontend_stmt_execute; + unsigned long long frontend_stmt_close; unsigned long long queries; unsigned long long queries_slow; unsigned long long queries_backends_bytes_sent; @@ -411,9 +414,12 @@ class MySQL_Threads_Handler SQLite3_result * SQL3_GlobalStatus(); bool kill_session(uint32_t _thread_session_id); unsigned long long get_total_mirror_queue(); - unsigned long long get_total_stmt_prepare(); - unsigned long long get_total_stmt_execute(); - unsigned long long get_total_stmt_close(); + unsigned long long get_total_backend_stmt_prepare(); + unsigned long long get_total_backend_stmt_execute(); + unsigned long long get_total_backend_stmt_close(); + unsigned long long get_total_frontend_stmt_prepare(); + unsigned long long get_total_frontend_stmt_execute(); + unsigned long long get_total_frontend_stmt_close(); unsigned long long get_total_queries(); unsigned long long get_slow_queries(); unsigned long long get_queries_backends_bytes_recv(); diff --git a/include/mysql_connection.h b/include/mysql_connection.h index d740945a7..56d2db4bd 100644 --- a/include/mysql_connection.h +++ b/include/mysql_connection.h @@ -65,7 +65,11 @@ class MySQL_Connection { unsigned long long last_time_used; unsigned long long timeout; int fd; +#ifndef PROXYSQL_STMT_V14 MySQL_STMTs_local *local_stmts; // local view of prepared statements +#else + MySQL_STMTs_local_v14 *local_stmts; // local view of prepared statements +#endif MYSQL *mysql; MYSQL *ret_mysql; MYSQL_RES *mysql_result; diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 4ef1d34e6..5a961cd93 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -1004,7 +1004,11 @@ void MySQL_HostGroups_Manager::push_MyConn_to_pool(MySQL_Connection *c, bool _lo } if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE) { if (c->async_state_machine==ASYNC_IDLE) { +#ifndef PROXYSQL_STMT_V14 if (c->local_stmts->get_num_entries() > (unsigned int)GloMTH->variables.max_stmts_per_connection) { +#else + if (c->local_stmts->get_num_backend_stmts() > (unsigned int)GloMTH->variables.max_stmts_per_connection) { +#endif proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Destroying MySQL_Connection %p, server %s:%d with status %d because has too many prepared statements\n", c, mysrvc->address, mysrvc->port, mysrvc->status); delete c; } else { diff --git a/lib/MySQL_PreparedStatement.cpp b/lib/MySQL_PreparedStatement.cpp index 307b5f288..5c377b162 100644 --- a/lib/MySQL_PreparedStatement.cpp +++ b/lib/MySQL_PreparedStatement.cpp @@ -3,10 +3,14 @@ #include "SpookyV2.h" +#ifndef PROXYSQL_STMT_V14 extern MySQL_STMT_Manager *GloMyStmt; - static uint32_t add_prepared_statement_calls = 0; static uint32_t find_prepared_statement_by_hash_calls = 0; +#else +extern MySQL_STMT_Manager_v14 *GloMyStmt; +#endif + static uint64_t stmt_compute_hash(unsigned int hostgroup, char *user, char *schema, char *query, @@ -57,6 +61,223 @@ void MySQL_STMT_Global_info::compute_hash() { query_length); } +StmtLongDataHandler::StmtLongDataHandler() { long_datas = new PtrArray(); } + +StmtLongDataHandler::~StmtLongDataHandler() { + while (long_datas->len) { + stmt_long_data_t *sld = + (stmt_long_data_t *)long_datas->remove_index_fast(0); + free(sld->data); + free(sld); + } + delete long_datas; +} + +bool StmtLongDataHandler::add(uint32_t _stmt_id, uint16_t _param_id, + void *_data, unsigned long _size) { + stmt_long_data_t *sld = NULL; + unsigned int i; + for (i = 0; i < long_datas->len; i++) { + sld = (stmt_long_data_t *)long_datas->index(i); + if (sld->stmt_id == _stmt_id && sld->param_id == _param_id) { + // we found it! + unsigned long _new_size = sld->size + _size; + sld->data = realloc(sld->data, _new_size); + memcpy((unsigned char *)sld->data + sld->size, _data, _size); + sld->size = _new_size; + return true; + } + } + // if we reached here, we didn't find it + sld = (stmt_long_data_t *)malloc(sizeof(stmt_long_data_t)); + sld->stmt_id = _stmt_id; + sld->param_id = _param_id; + sld->size = _size; + sld->data = malloc(_size); + memcpy(sld->data, _data, _size); + long_datas->add(sld); + return false; // a new entry was created +} + +unsigned int StmtLongDataHandler::reset(uint32_t _stmt_id) { + unsigned int cnt = 0; + int i; + stmt_long_data_t *sld = NULL; + for (i = 0; i < (int)long_datas->len; + i++) { // we treat it as an int, so we can go to -1 + sld = (stmt_long_data_t *)long_datas->index(i); + if (sld->stmt_id == _stmt_id) { + sld = (stmt_long_data_t *)long_datas->remove_index_fast(i); + free(sld->data); + free(sld); + i--; + cnt++; + } + } + return cnt; +} + +void *StmtLongDataHandler::get(uint32_t _stmt_id, uint16_t _param_id, + unsigned long **_size) { + stmt_long_data_t *sld = NULL; + unsigned int i; + for (i = 0; i < long_datas->len; i++) { + sld = (stmt_long_data_t *)long_datas->index(i); + if (sld->stmt_id == _stmt_id && sld->param_id == _param_id) { + // we found it! + *_size = &sld->size; + return sld->data; + } + } + return NULL; +} + +#ifndef PROXYSQL_STMT_V14 +MySQL_STMT_Global_info::MySQL_STMT_Global_info(uint32_t id, unsigned int h, +#else +MySQL_STMT_Global_info::MySQL_STMT_Global_info(uint64_t id, unsigned int h, +#endif + char *u, char *s, char *q, + unsigned int ql, + MYSQL_STMT *stmt, uint64_t _h) { + statement_id = id; + hostgroup_id = h; + ref_count_client = 0; + ref_count_server = 0; + digest_text = NULL; + username = strdup(u); + schemaname = strdup(s); + query = (char *)malloc(ql + 1); + memcpy(query, q, ql); + query[ql] = '\0'; // add NULL byte + query_length = ql; + num_params = stmt->param_count; + num_columns = stmt->field_count; + warning_count = stmt->upsert_status.warning_count; + if (_h) { + hash = _h; + } else { + compute_hash(); + } + + is_select_NOT_for_update = false; + { // see bug #899 . Most of the code is borrowed from + // Query_Info::is_select_NOT_for_update() + if (ql >= 7) { + if (strncasecmp(q, (char *)"SELECT ", 7) == 0) { // is a SELECT + is_select_NOT_for_update = true; + if (ql >= 17) { + char *p = (char *)q; + p += ql - 11; + if (strncasecmp(p, " FOR UPDATE", 11) == + 0) { // is a SELECT FOR UPDATE + is_select_NOT_for_update = false; + } + } + } + } + } + + // set default properties: + properties.cache_ttl = -1; + properties.timeout = -1; + properties.delay = -1; + + fields = NULL; + if (num_columns) { + fields = (MYSQL_FIELD **)malloc(num_columns * sizeof(MYSQL_FIELD *)); + uint16_t i; + for (i = 0; i < num_columns; i++) { + fields[i] = (MYSQL_FIELD *)malloc(sizeof(MYSQL_FIELD)); + MYSQL_FIELD *fs = &(stmt->fields[i]); + MYSQL_FIELD *fd = fields[i]; + // first copy all fields + memcpy(fd, fs, sizeof(MYSQL_FIELD)); + // then duplicate strings + fd->name = (fs->name ? strdup(fs->name) : NULL); + fd->org_name = (fs->org_name ? strdup(fs->org_name) : NULL); + fd->table = (fs->table ? strdup(fs->table) : NULL); + fd->org_table = (fs->org_table ? strdup(fs->org_table) : NULL); + fd->db = (fs->db ? strdup(fs->db) : NULL); + fd->catalog = (fs->catalog ? strdup(fs->catalog) : NULL); + fd->def = (fs->def ? strdup(fs->def) : NULL); + } + } + + params = NULL; + if (num_params == 2) { + PROXY_TRACE(); + } + if (num_params) { + params = (MYSQL_BIND **)malloc(num_params * sizeof(MYSQL_BIND *)); + uint16_t i; + for (i = 0; i < num_params; i++) { + params[i] = (MYSQL_BIND *)malloc(sizeof(MYSQL_BIND)); + // MYSQL_BIND *ps=&(stmt->params[i]); + // MYSQL_BIND *pd=params[i]; + // copy all params + // memcpy(pd,ps,sizeof(MYSQL_BIND)); + memset(params[i], 0, sizeof(MYSQL_BIND)); + } + } +} + +MySQL_STMT_Global_info::~MySQL_STMT_Global_info() { + free(username); + free(schemaname); + free(query); + if (num_columns) { + uint16_t i; + for (i = 0; i < num_columns; i++) { + MYSQL_FIELD *f = fields[i]; + if (f->name) { + free(f->name); + f->name = NULL; + } + if (f->org_name) { + free(f->org_name); + f->org_name = NULL; + } + if (f->table) { + free(f->table); + f->table = NULL; + } + if (f->org_table) { + free(f->org_table); + f->org_table = NULL; + } + if (f->db) { + free(f->db); + f->db = NULL; + } + if (f->catalog) { + free(f->catalog); + f->catalog = NULL; + } + if (f->def) { + free(f->def); + f->def = NULL; + } + free(fields[i]); + } + free(fields); + fields = NULL; + } + + if (num_params) { + uint16_t i; + for (i = 0; i < num_params; i++) { + free(params[i]); + } + free(params); + params = NULL; + } + if (digest_text) { + free(digest_text); + digest_text = NULL; + } +} +#ifndef PROXYSQL_STMT_V14 uint64_t MySQL_STMTs_local::compute_hash(unsigned int hostgroup, char *user, char *schema, char *query, unsigned int query_length) { @@ -89,6 +310,23 @@ MySQL_STMTs_local::~MySQL_STMTs_local() { m.erase(m.begin(), m.end()); } +uint32_t MySQL_STMTs_local::generate_new_stmt_id(uint32_t global_statement_id) { + uint32_t new_id; + new_id = GloMyStmt->generate_new_stmt_id(); + client_stmt_to_global_id.insert(std::make_pair(global_statement_id, new_id)); + return new_id; +} + + +uint32_t MySQL_STMTs_local::find_original_id(uint32_t client_stmt_id) { + auto s = client_stmt_to_global_id.find(client_stmt_id); + if (s != client_stmt_to_global_id.end()) { + uint32_t ret=s->second; + return ret; + } + return 0; +} + bool MySQL_STMTs_local::erase(uint32_t global_statement_id) { auto s = m.find(global_statement_id); if (s != m.end()) { // found @@ -257,8 +495,9 @@ MySQL_STMT_Global_info *MySQL_STMT_Manager::add_prepared_statement( next_id = free_stmt_ids.top(); free_stmt_ids.pop(); } else { - next_id = next_statement_id; - next_statement_id++; + // next_id = next_statement_id; + // next_statement_id++; + __sync_fetch_and_add(&next_statement_id, 1); } MySQL_STMT_Global_info *a = new MySQL_STMT_Global_info(next_id, _h, u, s, q, ql, stmt, hash); @@ -327,215 +566,291 @@ MySQL_STMT_Global_info *MySQL_STMT_Manager::find_prepared_statement_by_hash( return ret; } -MySQL_STMT_Global_info::MySQL_STMT_Global_info(uint32_t id, unsigned int h, - char *u, char *s, char *q, - unsigned int ql, - MYSQL_STMT *stmt, uint64_t _h) { - statement_id = id; - hostgroup_id = h; - ref_count_client = 0; - ref_count_server = 0; - digest_text = NULL; - username = strdup(u); - schemaname = strdup(s); - query = (char *)malloc(ql + 1); - memcpy(query, q, ql); - query[ql] = '\0'; // add NULL byte - query_length = ql; - num_params = stmt->param_count; - num_columns = stmt->field_count; - warning_count = stmt->upsert_status.warning_count; - if (_h) { - hash = _h; - } else { - compute_hash(); - } - is_select_NOT_for_update = false; - { // see bug #899 . Most of the code is borrowed from - // Query_Info::is_select_NOT_for_update() - if (ql >= 7) { - if (strncasecmp(q, (char *)"SELECT ", 7) == 0) { // is a SELECT - is_select_NOT_for_update = true; - if (ql >= 17) { - char *p = (char *)q; - p += ql - 11; - if (strncasecmp(p, " FOR UPDATE", 11) == - 0) { // is a SELECT FOR UPDATE - is_select_NOT_for_update = false; - } - } + +#else // PROXYSQL_STMT_V14 +extern MySQL_STMT_Manager_v14 *GloMyStmt; + +void MySQL_STMTs_local_v14::backend_insert(uint64_t global_statement_id, MYSQL_STMT *stmt) { + std::pair::iterator, bool> ret; + ret = global_stmt_to_backend_stmt.insert(std::make_pair(global_statement_id, stmt)); + global_stmt_to_backend_ids.insert(std::make_pair(global_statement_id,stmt->stmt_id)); + backend_stmt_to_global_ids.insert(std::make_pair(stmt->stmt_id,global_statement_id)); + // note: backend_insert() is always called after add_prepared_statement() + // for this reason, we will the ref count increase in add_prepared_statement() + // GloMyStmt->ref_count_client(global_statement_id, 1); +} + +uint64_t MySQL_STMTs_local_v14::compute_hash(unsigned int hostgroup, char *user, + char *schema, char *query, + unsigned int query_length) { + uint64_t hash; + hash = stmt_compute_hash(hostgroup, user, schema, query, query_length); + return hash; +} + +MySQL_STMT_Manager_v14::MySQL_STMT_Manager_v14() { + pthread_rwlock_init(&rwlock_, NULL); + next_statement_id = + 1; // we initialize this as 1 because we 0 is not allowed +} + +MySQL_STMT_Manager_v14::~MySQL_STMT_Manager_v14() { +} + +void MySQL_STMT_Manager_v14::ref_count_client(uint64_t _stmt_id ,int _v, bool lock) { + if (lock) + pthread_rwlock_wrlock(&rwlock_); + auto s = map_stmt_id_to_info.find(_stmt_id); + if (s != map_stmt_id_to_info.end()) { + MySQL_STMT_Global_info *stmt_info = s->second; + stmt_info->ref_count_client += _v; + } + if (lock) + pthread_rwlock_unlock(&rwlock_); +} + +void MySQL_STMT_Manager_v14::ref_count_server(uint64_t _stmt_id ,int _v, bool lock) { + if (lock) + pthread_rwlock_wrlock(&rwlock_); + auto s = map_stmt_id_to_info.find(_stmt_id); + if (s != map_stmt_id_to_info.end()) { + MySQL_STMT_Global_info *stmt_info = s->second; + stmt_info->ref_count_server += _v; + } + if (lock) + pthread_rwlock_unlock(&rwlock_); +} + +MySQL_STMTs_local_v14::~MySQL_STMTs_local_v14() { + // Note: we do not free the prepared statements because we assume that + // if we call this destructor the connection is being destroyed anyway + + if (is_client_) { + for (std::map::iterator it = client_stmt_to_global_ids.begin(); + it != client_stmt_to_global_ids.end(); ++it) { + uint64_t global_stmt_id = it->second; + GloMyStmt->ref_count_client(global_stmt_id, -1); + } + } else { + for (std::map::iterator it = global_stmt_to_backend_stmt.begin(); + it != global_stmt_to_backend_stmt.end(); ++it) { + uint64_t global_stmt_id = it->first; + MYSQL_STMT *stmt = it->second; + if (stmt->mysql) { + stmt->mysql->stmts = + list_delete(stmt->mysql->stmts, &stmt->list); + } + stmt->mysql = NULL; + mysql_stmt_close(stmt); + GloMyStmt->ref_count_server(global_stmt_id, -1); + } + } +/* + for (std::map::iterator it = m.begin(); + it != m.end(); ++it) { + uint32_t stmt_id = it->first; + MYSQL_STMT *stmt = it->second; + if (stmt) { // is a server + if (stmt->mysql) { + stmt->mysql->stmts = + list_delete(stmt->mysql->stmts, &stmt->list); } + // we do a hack here: we pretend there is no server associate + // the connection will be dropped anyway immediately after + stmt->mysql = NULL; + mysql_stmt_close(stmt); + GloMyStmt->ref_count(stmt_id, -1, true, false); + } else { // is a client + GloMyStmt->ref_count(stmt_id, -1, true, true); } } + m.erase(m.begin(), m.end()); +*/ +} - // set default properties: - properties.cache_ttl = -1; - properties.timeout = -1; - properties.delay = -1; - fields = NULL; - if (num_columns) { - fields = (MYSQL_FIELD **)malloc(num_columns * sizeof(MYSQL_FIELD *)); - uint16_t i; - for (i = 0; i < num_columns; i++) { - fields[i] = (MYSQL_FIELD *)malloc(sizeof(MYSQL_FIELD)); - MYSQL_FIELD *fs = &(stmt->fields[i]); - MYSQL_FIELD *fd = fields[i]; - // first copy all fields - memcpy(fd, fs, sizeof(MYSQL_FIELD)); - // then duplicate strings - fd->name = (fs->name ? strdup(fs->name) : NULL); - fd->org_name = (fs->org_name ? strdup(fs->org_name) : NULL); - fd->table = (fs->table ? strdup(fs->table) : NULL); - fd->org_table = (fs->org_table ? strdup(fs->org_table) : NULL); - fd->db = (fs->db ? strdup(fs->db) : NULL); - fd->catalog = (fs->catalog ? strdup(fs->catalog) : NULL); - fd->def = (fs->def ? strdup(fs->def) : NULL); - } +MySQL_STMT_Global_info *MySQL_STMT_Manager_v14::find_prepared_statement_by_hash( + uint64_t hash, bool lock) { + MySQL_STMT_Global_info *ret = NULL; // assume we do not find it + if (lock) { + pthread_rwlock_wrlock(&rwlock_); } - params = NULL; - if (num_params == 2) { - PROXY_TRACE(); + auto s = map_stmt_hash_to_info.find(hash); + if (s != map_stmt_hash_to_info.end()) { + ret = s->second; + //__sync_fetch_and_add(&ret->ref_count_client,1); // increase reference + //count +// __sync_fetch_and_add(&find_prepared_statement_by_hash_calls, 1); +// __sync_fetch_and_add(&ret->ref_count_client, 1); } - if (num_params) { - params = (MYSQL_BIND **)malloc(num_params * sizeof(MYSQL_BIND *)); - uint16_t i; - for (i = 0; i < num_params; i++) { - params[i] = (MYSQL_BIND *)malloc(sizeof(MYSQL_BIND)); - // MYSQL_BIND *ps=&(stmt->params[i]); - // MYSQL_BIND *pd=params[i]; - // copy all params - // memcpy(pd,ps,sizeof(MYSQL_BIND)); - memset(params[i], 0, sizeof(MYSQL_BIND)); - } + + if (lock) { + pthread_rwlock_unlock(&rwlock_); } + return ret; } -MySQL_STMT_Global_info::~MySQL_STMT_Global_info() { - free(username); - free(schemaname); - free(query); - if (num_columns) { - uint16_t i; - for (i = 0; i < num_columns; i++) { - MYSQL_FIELD *f = fields[i]; - if (f->name) { - free(f->name); - f->name = NULL; - } - if (f->org_name) { - free(f->org_name); - f->org_name = NULL; - } - if (f->table) { - free(f->table); - f->table = NULL; - } - if (f->org_table) { - free(f->org_table); - f->org_table = NULL; - } - if (f->db) { - free(f->db); - f->db = NULL; - } - if (f->catalog) { - free(f->catalog); - f->catalog = NULL; - } - if (f->def) { - free(f->def); - f->def = NULL; - } - free(fields[i]); - } - free(fields); - fields = NULL; +MySQL_STMT_Global_info *MySQL_STMT_Manager_v14::find_prepared_statement_by_stmt_id( + uint64_t id, bool lock) { + MySQL_STMT_Global_info *ret = NULL; // assume we do not find it + if (lock) { + pthread_rwlock_wrlock(&rwlock_); } - if (num_params) { - uint16_t i; - for (i = 0; i < num_params; i++) { - free(params[i]); - } - free(params); - params = NULL; + auto s = map_stmt_id_to_info.find(id); + if (s != map_stmt_id_to_info.end()) { + ret = s->second; } - if (digest_text) { - free(digest_text); - digest_text = NULL; + + if (lock) { + pthread_rwlock_unlock(&rwlock_); } + return ret; } -StmtLongDataHandler::StmtLongDataHandler() { long_datas = new PtrArray(); } +uint32_t MySQL_STMTs_local_v14::generate_new_client_stmt_id(uint64_t global_statement_id) { + uint32_t ret=0; + if (free_client_ids.size()) { + ret=free_client_ids.top(); + free_client_ids.pop(); + } else { + local_max_stmt_id+=1; + ret=local_max_stmt_id; + } + assert(ret); + client_stmt_to_global_ids.insert(std::make_pair(ret,global_statement_id)); + global_stmt_to_client_ids.insert(std::make_pair(global_statement_id,ret)); + GloMyStmt->ref_count_client(global_statement_id, 1, false); // do not lock! + return ret; +} -StmtLongDataHandler::~StmtLongDataHandler() { - while (long_datas->len) { - stmt_long_data_t *sld = - (stmt_long_data_t *)long_datas->remove_index_fast(0); - free(sld->data); - free(sld); +uint64_t MySQL_STMTs_local_v14::find_global_stmt_id_from_client(uint32_t client_stmt_id) { + uint64_t ret=0; + auto s = client_stmt_to_global_ids.find(client_stmt_id); + if (s != client_stmt_to_global_ids.end()) { + ret = s->second; } - delete long_datas; + return ret; } -bool StmtLongDataHandler::add(uint32_t _stmt_id, uint16_t _param_id, - void *_data, unsigned long _size) { - stmt_long_data_t *sld = NULL; - unsigned int i; - for (i = 0; i < long_datas->len; i++) { - sld = (stmt_long_data_t *)long_datas->index(i); - if (sld->stmt_id == _stmt_id && sld->param_id == _param_id) { - // we found it! - unsigned long _new_size = sld->size + _size; - sld->data = realloc(sld->data, _new_size); - memcpy((unsigned char *)sld->data + sld->size, _data, _size); - sld->size = _new_size; - return true; +bool MySQL_STMTs_local_v14::client_close(uint32_t client_statement_id) { + auto s = client_stmt_to_global_ids.find(client_statement_id); + if (s != client_stmt_to_global_ids.end()) { // found + uint64_t global_stmt_id = s->second; + client_stmt_to_global_ids.erase(s); + GloMyStmt->ref_count_client(global_stmt_id, -1); + auto s2 = global_stmt_to_client_ids.find(global_stmt_id); + std::pair::iterator, std::multimap::iterator> ret; + ret = global_stmt_to_client_ids.equal_range(global_stmt_id); + for (std::multimap::iterator it=ret.first; it!=ret.second; ++it) { + if (it->second==client_statement_id) { + free_client_ids.push(client_statement_id); + global_stmt_to_client_ids.erase(it); + break; + } } + return true; } - // if we reached here, we didn't find it - sld = (stmt_long_data_t *)malloc(sizeof(stmt_long_data_t)); - sld->stmt_id = _stmt_id; - sld->param_id = _param_id; - sld->size = _size; - sld->data = malloc(_size); - memcpy(sld->data, _data, _size); - long_datas->add(sld); - return false; // a new entry was created + return false; // we don't really remove the prepared statement } -unsigned int StmtLongDataHandler::reset(uint32_t _stmt_id) { - unsigned int cnt = 0; - int i; - stmt_long_data_t *sld = NULL; - for (i = 0; i < (int)long_datas->len; - i++) { // we treat it as an int, so we can go to -1 - sld = (stmt_long_data_t *)long_datas->index(i); - if (sld->stmt_id == _stmt_id) { - sld = (stmt_long_data_t *)long_datas->remove_index_fast(i); - free(sld->data); - free(sld); - i--; - cnt++; +MySQL_STMT_Global_info *MySQL_STMT_Manager_v14::add_prepared_statement( + unsigned int _h, char *u, char *s, char *q, unsigned int ql, + MYSQL_STMT *stmt, int _cache_ttl, int _timeout, int _delay, bool lock) { + MySQL_STMT_Global_info *ret = NULL; + uint64_t hash = stmt_compute_hash( + _h, u, s, q, ql); // this identifies the prepared statement + if (lock) { + pthread_rwlock_wrlock(&rwlock_); + } + // try to find the statement + auto f = map_stmt_hash_to_info.find(hash); + if (f != map_stmt_hash_to_info.end()) { + // found it! + // MySQL_STMT_Global_info *a=f->second; + // ret=a->statement_id; + ret = f->second; + //*is_new = false; + } else { + // FIXME: add a stack here too!!! + // we need to create a new one +/* + bool free_id_avail = false; + free_id_avail = free_stmt_ids.size(); + + uint32_t next_id = 0; + if (free_id_avail) { + next_id = free_stmt_ids.top(); + free_stmt_ids.pop(); + } else { + // next_id = next_statement_id; + // next_statement_id++; + __sync_fetch_and_add(&next_statement_id, 1); } +*/ + next_statement_id++; + MySQL_STMT_Global_info *a = + new MySQL_STMT_Global_info(next_statement_id, _h, u, s, q, ql, stmt, hash); + a->properties.cache_ttl = _cache_ttl; + a->properties.timeout = _timeout; + a->properties.delay = _delay; + // insert it in both maps + map_stmt_id_to_info.insert(std::make_pair(a->statement_id, a)); + map_stmt_hash_to_info.insert(std::make_pair(a->hash, a)); + // ret=a->statement_id; + ret = a; + // next_statement_id++; // increment it + //__sync_fetch_and_add(&ret->ref_count_client,1); // increase reference + //count +// __sync_fetch_and_add(&ret->ref_count_client, +// 1); // increase reference count +// *is_new = true; } - return cnt; + ret->ref_count_server++; +// __sync_fetch_and_add(&add_prepared_statement_calls, 1); +// __sync_fetch_and_add(&ret->ref_count_server, +// 1); // increase reference count + if (lock) { + pthread_rwlock_unlock(&rwlock_); + } + return ret; } -void *StmtLongDataHandler::get(uint32_t _stmt_id, uint16_t _param_id, - unsigned long **_size) { - stmt_long_data_t *sld = NULL; - unsigned int i; - for (i = 0; i < long_datas->len; i++) { - sld = (stmt_long_data_t *)long_datas->index(i); - if (sld->stmt_id == _stmt_id && sld->param_id == _param_id) { - // we found it! - *_size = &sld->size; - return sld->data; +void MySQL_STMT_Manager_v14::get_metrics(uint64_t *c_unique, uint64_t *c_total, + uint64_t *stmt_max_stmt_id, uint64_t *cached, + uint64_t *s_unique, uint64_t *s_total) { + uint64_t c_u = 0; + uint64_t c_t = 0; + uint64_t m = 0; + uint64_t c = 0; + uint64_t s_u = 0; + uint64_t s_t = 0; + pthread_rwlock_wrlock(&rwlock_); + for (std::map::iterator it = map_stmt_id_to_info.begin(); + it != map_stmt_id_to_info.end(); ++it) { + MySQL_STMT_Global_info *a = it->second; + c++; + if (a->ref_count_client) { + c_u++; + c_t += a->ref_count_client; + } + if (a->ref_count_server) { + s_u++; + s_t += a->ref_count_server; + } + if (it->first > m) { + m = it->first; } } - return NULL; + pthread_rwlock_unlock(&rwlock_); + *c_unique = c_u; + *c_total = c_t; + *stmt_max_stmt_id = m; + *cached = c; + *s_unique = s_u; + *s_total = s_t; } + +#endif // PROXYSQL_STMT_V14 diff --git a/lib/MySQL_Protocol.cpp b/lib/MySQL_Protocol.cpp index cc5fb967c..18b18c026 100644 --- a/lib/MySQL_Protocol.cpp +++ b/lib/MySQL_Protocol.cpp @@ -695,7 +695,7 @@ bool MySQL_Protocol::generate_pkt_field(bool send, void **ptr, unsigned int *len // FIXME FIXME function not completed yet! // see https://dev.mysql.com/doc/internals/en/com-stmt-prepare-response.html -bool MySQL_Protocol::generate_STMT_PREPARE_RESPONSE(uint8_t sequence_id, MySQL_STMT_Global_info *stmt_info) { +bool MySQL_Protocol::generate_STMT_PREPARE_RESPONSE(uint8_t sequence_id, MySQL_STMT_Global_info *stmt_info, uint32_t _stmt_id) { uint8_t sid=sequence_id; uint16_t i; char *okpack=(char *)malloc(16); // first packet @@ -705,7 +705,11 @@ bool MySQL_Protocol::generate_STMT_PREPARE_RESPONSE(uint8_t sequence_id, MySQL_S memcpy(okpack,&hdr,sizeof(mysql_hdr)); // copy header okpack[4]=0; okpack[13]=0; - memcpy(okpack+5,&stmt_info->statement_id,sizeof(uint32_t)); + if (_stmt_id) { + memcpy(okpack+5,&_stmt_id,sizeof(uint32_t)); + } else { + memcpy(okpack+5,&stmt_info->statement_id,sizeof(uint32_t)); + } memcpy(okpack+9,&stmt_info->num_columns,sizeof(uint16_t)); memcpy(okpack+11,&stmt_info->num_params,sizeof(uint16_t)); memcpy(okpack+14,&stmt_info->warning_count,sizeof(uint16_t)); diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index d5c109f3d..6b40ddfe4 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -18,7 +18,11 @@ extern const CHARSET_INFO * proxysql_find_charset_name(const char * const name); extern MySQL_Authentication *GloMyAuth; extern ProxySQL_Admin *GloAdmin; extern MySQL_Logger *GloMyLogger; +#ifndef PROXYSQL_STMT_V14 extern MySQL_STMT_Manager *GloMyStmt; +#else +extern MySQL_STMT_Manager_v14 *GloMyStmt; +#endif Session_Regex::Session_Regex(char *p) { s=strdup(p); @@ -1580,7 +1584,11 @@ bool MySQL_Session::handler_again___status_CHANGING_USER_SERVER(int *_rc) { } // we recreate local_stmts : see issue #752 delete myconn->local_stmts; - myconn->local_stmts=new MySQL_STMTs_local(false); +#ifndef PROXYSQL_STMT_V14 + myconn->local_stmts=new MySQL_STMTs_local(false); // false by default, it is a backend +#else + myconn->local_stmts=new MySQL_STMTs_local_v14(false); // false by default, it is a backend +#endif int rc=myconn->async_change_user(myds->revents); if (rc==0) { __sync_fetch_and_add(&MyHGM->status.backend_change_user, 1); @@ -1957,16 +1965,25 @@ __get_pkts_from_client: break; case _MYSQL_COM_STMT_CLOSE: { +#ifndef PROXYSQL_STMT_V14 uint32_t stmt_global_id=0; memcpy(&stmt_global_id,(char *)pkt.ptr+5,sizeof(uint32_t)); // FIXME: no input validation SLDH->reset(stmt_global_id); sess_STMTs_meta->erase(stmt_global_id); client_myds->myconn->local_stmts->erase(stmt_global_id); +#else + uint32_t client_global_id=0; + memcpy(&client_global_id,(char *)pkt.ptr+5,sizeof(uint32_t)); + // FIXME: no input validation + SLDH->reset(client_global_id); + sess_STMTs_meta->erase(client_global_id); + client_myds->myconn->local_stmts->client_close(client_global_id); +#endif } l_free(pkt.size,pkt.ptr); // FIXME: this is not complete. Counters should be decreased - thread->status_variables.stmt_close++; + thread->status_variables.frontend_stmt_close++; thread->status_variables.queries++; client_myds->DSS=STATE_SLEEP; status=WAITING_CLIENT_DATA; @@ -1993,7 +2010,7 @@ __get_pkts_from_client: status=WAITING_CLIENT_DATA; break; } else { - thread->status_variables.stmt_prepare++; + thread->status_variables.frontend_stmt_prepare++; thread->status_variables.queries++; // if we reach here, we are not on admin bool rc_break=false; @@ -2011,10 +2028,15 @@ __get_pkts_from_client: break; } if (client_myds->myconn->local_stmts==NULL) { +#ifndef PROXYSQL_STMT_V14 client_myds->myconn->local_stmts=new MySQL_STMTs_local(true); +#else + client_myds->myconn->local_stmts=new MySQL_STMTs_local_v14(true); +#endif } uint64_t hash=client_myds->myconn->local_stmts->compute_hash(current_hostgroup,(char *)client_myds->myconn->userinfo->username,(char *)client_myds->myconn->userinfo->schemaname,(char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength); MySQL_STMT_Global_info *stmt_info=NULL; +#ifndef PROXYSQL_STMT_V14 stmt_info=GloMyStmt->find_prepared_statement_by_hash(hash); // find_prepared_statement_by_hash() always increase ref_count_client if (stmt_info) { // FIXME: there is a very interesting race condition here @@ -2047,6 +2069,35 @@ __get_pkts_from_client: mybe->server_myds->mysql_real_query.init(&pkt); // fix memory leak for PREPARE in prepared statements #796 client_myds->setDSS_STATE_QUERY_SENT_NET(); } +#else // PROXYSQL_STMT_V14 + // we first lock GloStmt + GloMyStmt->wrlock(); + stmt_info=GloMyStmt->find_prepared_statement_by_hash(hash,false); + if (stmt_info) { + // the prepared statement exists in GloMyStmt + // for this reason, we do not need to prepare it again, and we can already reply to the client + // we will now generate a unique stmt and send it to the client + uint32_t new_stmt_id=client_myds->myconn->local_stmts->generate_new_client_stmt_id(stmt_info->statement_id); + l_free(pkt.size,pkt.ptr); + client_myds->setDSS_STATE_QUERY_SENT_NET(); + client_myds->myprot.generate_STMT_PREPARE_RESPONSE(client_myds->pkt_sid+1,stmt_info,new_stmt_id); + client_myds->DSS=STATE_SLEEP; + status=WAITING_CLIENT_DATA; + CurrentQuery.end_time=thread->curtime; + CurrentQuery.end(); + } else { + mybe=find_or_create_backend(current_hostgroup); + status=PROCESSING_STMT_PREPARE; + mybe->server_myds->connect_retries_on_failure=mysql_thread___connect_retries_on_failure; + mybe->server_myds->wait_until=0; + pause_until=0; + mybe->server_myds->killed_at=0; + mybe->server_myds->mysql_real_query.init(&pkt); // fix memory leak for PREPARE in prepared statements #796 + client_myds->setDSS_STATE_QUERY_SENT_NET(); + } + GloMyStmt->unlock(); + break; // make sure to not break before unlocking GloMyStmt +#endif // PROXYSQL_STMT_V14 } break; case _MYSQL_COM_STMT_EXECUTE: @@ -2059,16 +2110,31 @@ __get_pkts_from_client: break; } else { // if we reach here, we are not on admin - thread->status_variables.stmt_execute++; + thread->status_variables.frontend_stmt_execute++; thread->status_variables.queries++; //bool rc_break=false; +#ifndef PROXYSQL_STMT_V14 uint32_t stmt_global_id=0; memcpy(&stmt_global_id,(char *)pkt.ptr+5,sizeof(uint32_t)); CurrentQuery.stmt_global_id=stmt_global_id; // now we get the statement information MySQL_STMT_Global_info *stmt_info=NULL; stmt_info=GloMyStmt->find_prepared_statement_by_stmt_id(stmt_global_id); +#else + uint32_t client_stmt_id=0; + uint64_t stmt_global_id=0; + memcpy(&client_stmt_id,(char *)pkt.ptr+5,sizeof(uint32_t)); + stmt_global_id=client_myds->myconn->local_stmts->find_global_stmt_id_from_client(client_stmt_id); + if (stmt_global_id == 0) { + // FIXME: add error handling + assert(0); + } + CurrentQuery.stmt_global_id=stmt_global_id; + // now we get the statement information + MySQL_STMT_Global_info *stmt_info=NULL; + stmt_info=GloMyStmt->find_prepared_statement_by_stmt_id(stmt_global_id); +#endif if (stmt_info==NULL) { // we couldn't find it l_free(pkt.size,pkt.ptr); @@ -2329,7 +2395,11 @@ handler_again: goto handler_again; } if (status==PROCESSING_STMT_EXECUTE) { +#ifndef PROXYSQL_STMT_V14 CurrentQuery.mysql_stmt=myconn->local_stmts->find(CurrentQuery.stmt_global_id); +#else + CurrentQuery.mysql_stmt=myconn->local_stmts->find_backend_stmt_by_global_id(CurrentQuery.stmt_global_id); +#endif if (CurrentQuery.mysql_stmt==NULL) { MySQL_STMT_Global_info *stmt_info=NULL; // the conection we too doesn't have the prepared statements prepared @@ -2404,9 +2474,16 @@ handler_again: break; case PROCESSING_STMT_PREPARE: { + thread->status_variables.backend_stmt_prepare++; +#ifndef PROXYSQL_STMT_V14 uint32_t stmid; +#else + uint32_t client_stmtid; + uint64_t global_stmtid; +#endif bool is_new; MySQL_STMT_Global_info *stmt_info=NULL; +#ifndef PROXYSQL_STMT_V14 stmt_info=GloMyStmt->add_prepared_statement(&is_new, current_hostgroup, (char *)client_myds->myconn->userinfo->username, (char *)client_myds->myconn->userinfo->schemaname, @@ -2417,6 +2494,18 @@ handler_again: qpo->timeout, qpo->delay, true); +#else + stmt_info=GloMyStmt->add_prepared_statement(current_hostgroup, + (char *)client_myds->myconn->userinfo->username, + (char *)client_myds->myconn->userinfo->schemaname, + (char *)CurrentQuery.QueryPointer, + CurrentQuery.QueryLength, + CurrentQuery.mysql_stmt, + qpo->cache_ttl, + qpo->timeout, + qpo->delay, + true); +#endif if (CurrentQuery.QueryParserArgs.digest_text) { if (stmt_info->digest_text==NULL) { stmt_info->digest_text=strdup(CurrentQuery.QueryParserArgs.digest_text); @@ -2424,8 +2513,14 @@ handler_again: stmt_info->MyComQueryCmd=CurrentQuery.MyComQueryCmd; // copy MyComQueryCmd } } +#ifndef PROXYSQL_STMT_V14 stmid=stmt_info->statement_id; myds->myconn->local_stmts->insert(stmid,CurrentQuery.mysql_stmt); +#else + global_stmtid=stmt_info->statement_id; + myds->myconn->local_stmts->backend_insert(global_stmtid,CurrentQuery.mysql_stmt); + client_stmtid=client_myds->myconn->local_stmts->generate_new_client_stmt_id(global_stmtid); +#endif CurrentQuery.mysql_stmt=NULL; enum session_status st=status; size_t sts=previous_status.size(); @@ -2436,14 +2531,19 @@ handler_again: previous_status.pop(); NEXT_IMMEDIATE(st); } else { +#ifndef PROXYSQL_STMT_V14 client_myds->myprot.generate_STMT_PREPARE_RESPONSE(client_myds->pkt_sid+1,stmt_info); client_myds->myconn->local_stmts->insert(stmt_info->statement_id,NULL); if (is_new) __sync_fetch_and_sub(&stmt_info->ref_count_client,1); +#else + client_myds->myprot.generate_STMT_PREPARE_RESPONSE(client_myds->pkt_sid+1,stmt_info,client_stmtid); +#endif } } break; case PROCESSING_STMT_EXECUTE: { + thread->status_variables.backend_stmt_execute++; MySQL_Stmt_Result_to_MySQL_wire(CurrentQuery.mysql_stmt, myds->myconn); if (CurrentQuery.stmt_meta) if (CurrentQuery.stmt_meta->pkt) { diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 2cb7ba80a..27f7f796b 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -3062,9 +3062,12 @@ MySQL_Thread::MySQL_Thread() { last_maintenance_time=0; maintenance_loop=true; - status_variables.stmt_prepare=0; - status_variables.stmt_execute=0; - status_variables.stmt_close=0; + status_variables.backend_stmt_prepare=0; + status_variables.backend_stmt_execute=0; + status_variables.backend_stmt_close=0; + status_variables.frontend_stmt_prepare=0; + status_variables.frontend_stmt_execute=0; + status_variables.frontend_stmt_close=0; status_variables.queries=0; status_variables.queries_slow=0; status_variables.queries_backends_bytes_sent=0; @@ -3348,20 +3351,38 @@ SQLite3_result * MySQL_Threads_Handler::SQL3_GlobalStatus() { result->add_row(pta); } { // stmt prepare - pta[0]=(char *)"Com_stmt_prepare"; - sprintf(buf,"%llu",get_total_stmt_prepare()); + pta[0]=(char *)"Com_backend_stmt_prepare"; + sprintf(buf,"%llu",get_total_backend_stmt_prepare()); pta[1]=buf; result->add_row(pta); } { // stmt execute - pta[0]=(char *)"Com_stmt_execute"; - sprintf(buf,"%llu",get_total_stmt_execute()); + pta[0]=(char *)"Com_backend_stmt_execute"; + sprintf(buf,"%llu",get_total_backend_stmt_execute()); pta[1]=buf; result->add_row(pta); } { // stmt prepare - pta[0]=(char *)"Com_stmt_close"; - sprintf(buf,"%llu",get_total_stmt_close()); + pta[0]=(char *)"Com_backend_stmt_close"; + sprintf(buf,"%llu",get_total_backend_stmt_close()); + pta[1]=buf; + result->add_row(pta); + } + { // stmt prepare + pta[0]=(char *)"Com_frontend_stmt_prepare"; + sprintf(buf,"%llu",get_total_frontend_stmt_prepare()); + pta[1]=buf; + result->add_row(pta); + } + { // stmt execute + pta[0]=(char *)"Com_frontend_stmt_execute"; + sprintf(buf,"%llu",get_total_frontend_stmt_execute()); + pta[1]=buf; + result->add_row(pta); + } + { // stmt prepare + pta[0]=(char *)"Com_frontend_stmt_close"; + sprintf(buf,"%llu",get_total_frontend_stmt_close()); pta[1]=buf; result->add_row(pta); } @@ -3824,40 +3845,79 @@ unsigned long long MySQL_Threads_Handler::get_total_mirror_queue() { return q; } -unsigned long long MySQL_Threads_Handler::get_total_stmt_prepare() { +unsigned long long MySQL_Threads_Handler::get_total_backend_stmt_prepare() { + unsigned long long q=0; + unsigned int i; + for (i=0;istatus_variables.backend_stmt_prepare,0); + } + } + return q; +} + +unsigned long long MySQL_Threads_Handler::get_total_backend_stmt_execute() { + unsigned long long q=0; + unsigned int i; + for (i=0;istatus_variables.backend_stmt_execute,0); + } + } + return q; +} + +unsigned long long MySQL_Threads_Handler::get_total_backend_stmt_close() { + unsigned long long q=0; + unsigned int i; + for (i=0;istatus_variables.backend_stmt_close,0); + } + } + return q; +} + +unsigned long long MySQL_Threads_Handler::get_total_frontend_stmt_prepare() { unsigned long long q=0; unsigned int i; for (i=0;istatus_variables.stmt_prepare,0); + q+=__sync_fetch_and_add(&thr->status_variables.frontend_stmt_prepare,0); } } return q; } -unsigned long long MySQL_Threads_Handler::get_total_stmt_execute() { +unsigned long long MySQL_Threads_Handler::get_total_frontend_stmt_execute() { unsigned long long q=0; unsigned int i; for (i=0;istatus_variables.stmt_execute,0); + q+=__sync_fetch_and_add(&thr->status_variables.frontend_stmt_execute,0); } } return q; } -unsigned long long MySQL_Threads_Handler::get_total_stmt_close() { +unsigned long long MySQL_Threads_Handler::get_total_frontend_stmt_close() { unsigned long long q=0; unsigned int i; for (i=0;istatus_variables.stmt_close,0); + q+=__sync_fetch_and_add(&thr->status_variables.frontend_stmt_close,0); } } return q; diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index 59da9a3e2..77c5512be 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -134,7 +134,11 @@ extern ProxySQL_Admin *GloAdmin; extern Query_Processor *GloQPro; extern MySQL_Threads_Handler *GloMTH; extern MySQL_Logger *GloMyLogger; +#ifndef PROXYSQL_STMT_V14 extern MySQL_STMT_Manager *GloMyStmt; +#else +extern MySQL_STMT_Manager_v14 *GloMyStmt; +#endif extern MySQL_Monitor *GloMyMon; #define PANIC(msg) { perror(msg); exit(EXIT_FAILURE); } @@ -3323,6 +3327,7 @@ void ProxySQL_Admin::stats___mysql_global() { statsdb->execute(query); free(query); +#ifndef PROXYSQL_STMT_V14 if (GloMyStmt) { uint32_t stmt_active_unique=0; uint32_t stmt_active_total=0; @@ -3346,6 +3351,53 @@ void ProxySQL_Admin::stats___mysql_global() { statsdb->execute(query); free(query); } +#else + if (GloMyStmt) { + uint64_t stmt_client_active_unique = 0; + uint64_t stmt_client_active_total = 0; + uint64_t stmt_max_stmt_id = 0; + uint64_t stmt_cached = 0; + uint64_t stmt_server_active_unique = 0; + uint64_t stmt_server_active_total = 0; + GloMyStmt->get_metrics(&stmt_client_active_unique,&stmt_client_active_total,&stmt_max_stmt_id,&stmt_cached,&stmt_server_active_unique,&stmt_server_active_total); + vn=(char *)"Stmt_Client_Active_Total"; + sprintf(bu,"%lu",stmt_client_active_total); + query=(char *)malloc(strlen(a)+strlen(vn)+strlen(bu)+16); + sprintf(query,a,vn,bu); + statsdb->execute(query); + free(query); + vn=(char *)"Stmt_Client_Active_Unique"; + sprintf(bu,"%lu",stmt_client_active_unique); + query=(char *)malloc(strlen(a)+strlen(vn)+strlen(bu)+16); + sprintf(query,a,vn,bu); + statsdb->execute(query); + free(query); + vn=(char *)"Stmt_Server_Active_Total"; + sprintf(bu,"%lu",stmt_server_active_total); + query=(char *)malloc(strlen(a)+strlen(vn)+strlen(bu)+16); + sprintf(query,a,vn,bu); + statsdb->execute(query); + free(query); + vn=(char *)"Stmt_Server_Active_Unique"; + sprintf(bu,"%lu",stmt_server_active_unique); + query=(char *)malloc(strlen(a)+strlen(vn)+strlen(bu)+16); + sprintf(query,a,vn,bu); + statsdb->execute(query); + free(query); + vn=(char *)"Stmt_Max_Stmt_id"; + sprintf(bu,"%lu",stmt_max_stmt_id); + query=(char *)malloc(strlen(a)+strlen(vn)+strlen(bu)+16); + sprintf(query,a,vn,bu); + statsdb->execute(query); + free(query); + vn=(char *)"Stmt_Cached"; + sprintf(bu,"%lu",stmt_cached); + query=(char *)malloc(strlen(a)+strlen(vn)+strlen(bu)+16); + sprintf(query,a,vn,bu); + statsdb->execute(query); + free(query); + } +#endif resultset=GloQC->SQL3_getStats(); if (resultset) { diff --git a/lib/mysql_connection.cpp b/lib/mysql_connection.cpp index a23aadc75..c1b972f20 100644 --- a/lib/mysql_connection.cpp +++ b/lib/mysql_connection.cpp @@ -181,7 +181,11 @@ MySQL_Connection::MySQL_Connection() { creation_time=0; processing_multi_statement=false; proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "Creating new MySQL_Connection %p\n", this); +#ifndef PROXYSQL_STMT_V14 local_stmts=new MySQL_STMTs_local(false); // false by default, it is a backend +#else + local_stmts=new MySQL_STMTs_local_v14(false); // false by default, it is a backend +#endif }; MySQL_Connection::~MySQL_Connection() { @@ -1628,5 +1632,9 @@ void MySQL_Connection::reset() { reusable=true; options.last_set_autocommit=-1; // never sent delete local_stmts; +#ifndef PROXYSQL_STMT_V14 local_stmts=new MySQL_STMTs_local(false); +#else + local_stmts=new MySQL_STMTs_local_v14(false); +#endif } diff --git a/src/main.cpp b/src/main.cpp index 76f842690..6026d8f1d 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -148,7 +148,11 @@ Query_Processor *GloQPro; ProxySQL_Admin *GloAdmin; MySQL_Threads_Handler *GloMTH; +#ifndef PROXYSQL_STMT_V14 MySQL_STMT_Manager *GloMyStmt; +#else +MySQL_STMT_Manager_v14 *GloMyStmt; +#endif MySQL_Monitor *GloMyMon; std::thread *MyMon_thread; @@ -272,7 +276,11 @@ void ProxySQL_Main_init_main_modules() { MyHGM=new MySQL_HostGroups_Manager(); GloMTH=new MySQL_Threads_Handler(); GloMyLogger = new MySQL_Logger(); +#ifndef PROXYSQL_STMT_V14 GloMyStmt=new MySQL_STMT_Manager(); +#else + GloMyStmt=new MySQL_STMT_Manager_v14(); +#endif }