diff --git a/include/MySQL_PreparedStatement.h b/include/MySQL_PreparedStatement.h index a5f98582c..d2048b65d 100644 --- a/include/MySQL_PreparedStatement.h +++ b/include/MySQL_PreparedStatement.h @@ -51,7 +51,7 @@ class MySQL_STMT_Global_info { char *schemaname; char *query; unsigned int query_length; - unsigned int hostgroup_id; +// unsigned int hostgroup_id; int ref_count_client; int ref_count_server; uint64_t statement_id; @@ -59,14 +59,14 @@ class MySQL_STMT_Global_info { uint16_t num_params; uint16_t warning_count; MYSQL_FIELD **fields; - struct { - int cache_ttl; - int timeout; - int delay; - } properties; +// struct { +// int cache_ttl; +// int timeout; +// int delay; +// } properties; bool is_select_NOT_for_update; MYSQL_BIND **params; // seems unused (?) - MySQL_STMT_Global_info(uint64_t id, unsigned int h, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, uint64_t _h); + MySQL_STMT_Global_info(uint64_t id, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, uint64_t _h); void update_metadata(MYSQL_STMT *stmt); ~MySQL_STMT_Global_info(); }; @@ -217,7 +217,7 @@ class MySQL_STMTs_local_v14 { return is_client_; } void backend_insert(uint64_t global_statement_id, MYSQL_STMT *stmt); - uint64_t compute_hash(unsigned int hostgroup, char *user, char *schema, char *query, unsigned int query_length); + uint64_t compute_hash(char *user, char *schema, char *query, unsigned int query_length); unsigned int get_num_backend_stmts() { return backend_stmt_to_global_ids.size(); } uint32_t generate_new_client_stmt_id(uint64_t global_statement_id); uint64_t find_global_stmt_id_from_client(uint32_t client_stmt_id); @@ -259,7 +259,7 @@ class MySQL_STMT_Manager_v14 { void unlock() { pthread_rwlock_unlock(&rwlock_); } void ref_count_client(uint64_t _stmt, int _v, bool lock=true); void ref_count_server(uint64_t _stmt, int _v, bool lock=true); - MySQL_STMT_Global_info * add_prepared_statement(unsigned int h, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, int _cache_ttl, int _timeout, int _delay, bool lock=true); + MySQL_STMT_Global_info * add_prepared_statement(char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, bool lock=true); void get_metrics(uint64_t *c_unique, uint64_t *c_total, uint64_t *stmt_max_stmt_id, uint64_t *cached, uint64_t *s_unique, uint64_t *s_total); SQLite3_result * get_prepared_statements_global_infos(); }; diff --git a/lib/MySQL_PreparedStatement.cpp b/lib/MySQL_PreparedStatement.cpp index 14a906c31..1f68eea22 100644 --- a/lib/MySQL_PreparedStatement.cpp +++ b/lib/MySQL_PreparedStatement.cpp @@ -12,11 +12,10 @@ extern MySQL_STMT_Manager_v14 *GloMyStmt; //#endif -static uint64_t stmt_compute_hash(unsigned int hostgroup, char *user, +static uint64_t stmt_compute_hash(char *user, char *schema, char *query, unsigned int query_length) { int l = 0; - l += sizeof(hostgroup); l += strlen(user); l += strlen(schema); // two random seperators @@ -27,9 +26,6 @@ static uint64_t stmt_compute_hash(unsigned int hostgroup, char *user, l += query_length; char *buf = (char *)malloc(l); l = 0; - // write hostgroup - memcpy(buf, &hostgroup, sizeof(hostgroup)); - l += sizeof(hostgroup); // write user strcpy(buf + l, user); @@ -57,7 +53,7 @@ static uint64_t stmt_compute_hash(unsigned int hostgroup, char *user, } void MySQL_STMT_Global_info::compute_hash() { - hash = stmt_compute_hash(hostgroup_id, username, schemaname, query, + hash = stmt_compute_hash(username, schemaname, query, query_length); } @@ -134,13 +130,12 @@ void *StmtLongDataHandler::get(uint32_t _stmt_id, uint16_t _param_id, return NULL; } -MySQL_STMT_Global_info::MySQL_STMT_Global_info(uint64_t id, unsigned int h, +MySQL_STMT_Global_info::MySQL_STMT_Global_info(uint64_t id, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, uint64_t _h) { pthread_rwlock_init(&rwlock_, NULL); statement_id = id; - hostgroup_id = h; ref_count_client = 0; ref_count_server = 0; digest_text = NULL; @@ -248,9 +243,9 @@ MySQL_STMT_Global_info::MySQL_STMT_Global_info(uint64_t id, unsigned int h, __exit_MySQL_STMT_Global_info___search_select: // set default properties: - properties.cache_ttl = -1; - properties.timeout = -1; - properties.delay = -1; +// properties.cache_ttl = -1; +// properties.timeout = -1; +// properties.delay = -1; fields = NULL; if (num_columns) { @@ -545,11 +540,11 @@ void MySQL_STMTs_local_v14::backend_insert(uint64_t global_statement_id, MYSQL_S // GloMyStmt->ref_count_client(global_statement_id, 1); } -uint64_t MySQL_STMTs_local_v14::compute_hash(unsigned int hostgroup, char *user, +uint64_t MySQL_STMTs_local_v14::compute_hash(char *user, char *schema, char *query, unsigned int query_length) { uint64_t hash; - hash = stmt_compute_hash(hostgroup, user, schema, query, query_length); + hash = stmt_compute_hash(user, schema, query, query_length); return hash; } @@ -816,11 +811,11 @@ bool MySQL_STMTs_local_v14::client_close(uint32_t client_statement_id) { } MySQL_STMT_Global_info *MySQL_STMT_Manager_v14::add_prepared_statement( - unsigned int _h, char *u, char *s, char *q, unsigned int ql, - MYSQL_STMT *stmt, int _cache_ttl, int _timeout, int _delay, bool lock) { + char *u, char *s, char *q, unsigned int ql, + MYSQL_STMT *stmt, bool lock) { MySQL_STMT_Global_info *ret = NULL; uint64_t hash = stmt_compute_hash( - _h, u, s, q, ql); // this identifies the prepared statement + u, s, q, ql); // this identifies the prepared statement if (lock) { pthread_rwlock_wrlock(&rwlock_); } @@ -852,10 +847,7 @@ MySQL_STMT_Global_info *MySQL_STMT_Manager_v14::add_prepared_statement( //next_statement_id++; MySQL_STMT_Global_info *a = - new MySQL_STMT_Global_info(next_id, _h, u, s, q, ql, stmt, hash); - a->properties.cache_ttl = _cache_ttl; - a->properties.timeout = _timeout; - a->properties.delay = _delay; + new MySQL_STMT_Global_info(next_id, u, s, q, ql, stmt, hash); // insert it in both maps map_stmt_id_to_info.insert(std::make_pair(a->statement_id, a)); map_stmt_hash_to_info.insert(std::make_pair(a->hash, a)); @@ -936,16 +928,14 @@ void MySQL_STMT_Manager_v14::get_metrics(uint64_t *c_unique, uint64_t *c_total, class PS_global_stats { public: uint64_t statement_id; - unsigned int hid; char *username; char *schemaname; uint64_t digest; unsigned long long ref_count_client; unsigned long long ref_count_server; char *query; - PS_global_stats(uint64_t stmt_id, unsigned int h, char *s, char *u, uint64_t d, char *q, unsigned long long ref_c, unsigned long long ref_s) { + PS_global_stats(uint64_t stmt_id, char *s, char *u, uint64_t d, char *q, unsigned long long ref_c, unsigned long long ref_s) { statement_id = stmt_id; - hid=h; digest=d; query=strndup(q, mysql_thread___query_digests_max_digest_length); username=strdup(u); @@ -969,31 +959,29 @@ class PS_global_stats { } char **get_row() { char buf[128]; - char **pta=(char **)malloc(sizeof(char *)*8); + char **pta=(char **)malloc(sizeof(char *)*7); sprintf(buf,"%lu",statement_id); pta[0]=strdup(buf); - sprintf(buf,"%u",hid); - pta[1]=strdup(buf); assert(schemaname); - pta[2]=strdup(schemaname); + pta[1]=strdup(schemaname); assert(username); - pta[3]=strdup(username); + pta[2]=strdup(username); sprintf(buf,"0x%016llX", (long long unsigned int)digest); - pta[4]=strdup(buf); + pta[3]=strdup(buf); assert(query); - pta[5]=strdup(query); + pta[4]=strdup(query); sprintf(buf,"%llu",ref_count_client); - pta[6]=strdup(buf); + pta[5]=strdup(buf); sprintf(buf,"%llu",ref_count_server); - pta[7]=strdup(buf); + pta[6]=strdup(buf); return pta; } void free_row(char **pta) { int i; - for (i=0;i<8;i++) { + for (i=0;i<7;i++) { assert(pta[i]); free(pta[i]); } @@ -1004,10 +992,9 @@ class PS_global_stats { SQLite3_result * MySQL_STMT_Manager_v14::get_prepared_statements_global_infos() { proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping current prepared statements global info\n"); - SQLite3_result *result=new SQLite3_result(8); + SQLite3_result *result=new SQLite3_result(7); rdlock(); result->add_column_definition(SQLITE_TEXT,"stmt_id"); - result->add_column_definition(SQLITE_TEXT,"hid"); result->add_column_definition(SQLITE_TEXT,"schemaname"); result->add_column_definition(SQLITE_TEXT,"username"); result->add_column_definition(SQLITE_TEXT,"digest"); @@ -1017,7 +1004,7 @@ SQLite3_result * MySQL_STMT_Manager_v14::get_prepared_statements_global_infos() for (std::map::iterator it = map_stmt_id_to_info.begin(); it != map_stmt_id_to_info.end(); ++it) { MySQL_STMT_Global_info *a = it->second; - PS_global_stats * pgs = new PS_global_stats(a->statement_id, a->hostgroup_id, + PS_global_stats * pgs = new PS_global_stats(a->statement_id, a->schemaname, a->username, a->hash, a->query, a->ref_count_client, a->ref_count_server); diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index e8770540c..081e8cf43 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -3104,7 +3104,18 @@ __get_pkts_from_client: // because the offset will be identical CurrentQuery.begin((unsigned char *)pkt.ptr,pkt.size,true); + timespec begint; + timespec endt; + if (thread->variables.stats_time_query_processor) { + clock_gettime(CLOCK_THREAD_CPUTIME_ID,&begint); + } qpo=GloQPro->process_mysql_query(this,pkt.ptr,pkt.size,&CurrentQuery); + if (thread->variables.stats_time_query_processor) { + clock_gettime(CLOCK_THREAD_CPUTIME_ID,&endt); + thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] + + (endt.tv_sec*1000000000+endt.tv_nsec) - + (begint.tv_sec*1000000000+begint.tv_nsec); + } assert(qpo); // GloQPro->process_mysql_query() should always return a qpo rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt, &lock_hostgroup); if (rc_break==true) { @@ -3143,7 +3154,7 @@ __get_pkts_from_client: if (client_myds->myconn->local_stmts==NULL) { client_myds->myconn->local_stmts=new MySQL_STMTs_local_v14(true); } - uint64_t hash=client_myds->myconn->local_stmts->compute_hash(current_hostgroup,(char *)client_myds->myconn->userinfo->username,(char *)client_myds->myconn->userinfo->schemaname,(char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength); + uint64_t hash=client_myds->myconn->local_stmts->compute_hash((char *)client_myds->myconn->userinfo->username,(char *)client_myds->myconn->userinfo->schemaname,(char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength); MySQL_STMT_Global_info *stmt_info=NULL; // we first lock GloStmt GloMyStmt->wrlock(); @@ -3187,9 +3198,10 @@ __get_pkts_from_client: break; } else { // if we reach here, we are on MySQL module + bool rc_break=false; + bool lock_hostgroup = false; thread->status_variables.stvar[st_var_frontend_stmt_execute]++; thread->status_variables.stvar[st_var_queries]++; - //bool rc_break=false; uint32_t client_stmt_id=0; uint64_t stmt_global_id=0; @@ -3215,6 +3227,22 @@ __get_pkts_from_client: CurrentQuery.stmt_info=stmt_info; CurrentQuery.start_time=thread->curtime; + timespec begint; + timespec endt; + if (thread->variables.stats_time_query_processor) { + clock_gettime(CLOCK_THREAD_CPUTIME_ID,&begint); + } + qpo=GloQPro->process_mysql_query(this,pkt.ptr,pkt.size,&CurrentQuery); + if (qpo->max_lag_ms >= 0) { + thread->status_variables.stvar[st_var_queries_with_max_lag_ms]++; + } + if (thread->variables.stats_time_query_processor) { + clock_gettime(CLOCK_THREAD_CPUTIME_ID,&endt); + thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] + + (endt.tv_sec*1000000000+endt.tv_nsec) - + (begint.tv_sec*1000000000+begint.tv_nsec); + } + assert(qpo); // GloQPro->process_mysql_query() should always return a qpo // we now take the metadata associated with STMT_EXECUTE from MySQL_STMTs_meta bool stmt_meta_found=true; // let's be optimistic and we assume we will found it stmt_execute_metadata_t *stmt_meta=sess_STMTs_meta->find(stmt_global_id); @@ -3240,10 +3268,41 @@ __get_pkts_from_client: // else CurrentQuery.stmt_meta=stmt_meta; -// assert(qpo); // GloQPro->process_mysql_query() should always return a qpo - // NOTE: we do not call YET the follow function for STMT_EXECUTE - //rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt); - current_hostgroup=stmt_info->hostgroup_id; + //current_hostgroup=qpo->destination_hostgroup; + rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt, &lock_hostgroup, true); + if (rc_break==true) { + break; + } + if (mysql_thread___set_query_lock_on_hostgroup == 1) { // algorithm introduced in 2.0.6 + if (locked_on_hostgroup < 0) { + if (lock_hostgroup) { + // we are locking on hostgroup now + locked_on_hostgroup = current_hostgroup; + } + } + if (locked_on_hostgroup >= 0) { + if (current_hostgroup != locked_on_hostgroup) { + client_myds->DSS=STATE_QUERY_SENT_NET; + //int l = CurrentQuery.QueryLength; + int l = CurrentQuery.stmt_info->query_length; + char *end = (char *)""; + if (l>256) { + l=253; + end = (char *)"..."; + } + string nqn = string((char *)CurrentQuery.stmt_info->query,l); + char *err_msg = (char *)"Session trying to reach HG %d while locked on HG %d . Rejecting query: %s"; + char *buf = (char *)malloc(strlen(err_msg)+strlen(nqn.c_str())+strlen(end)+64); + sprintf(buf, err_msg, current_hostgroup, locked_on_hostgroup, nqn.c_str(), end); + client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,9005,(char *)"HY000",buf, true); + thread->status_variables.stvar[st_var_hostgroup_locked_queries]++; + RequestEnd(NULL); + free(buf); + l_free(pkt.size,pkt.ptr); + break; + } + } + } mybe=find_or_create_backend(current_hostgroup); status=PROCESSING_STMT_EXECUTE; mybe->server_myds->connect_retries_on_failure=mysql_thread___connect_retries_on_failure; @@ -3676,15 +3735,12 @@ handler_again: uint64_t global_stmtid; //bool is_new; MySQL_STMT_Global_info *stmt_info=NULL; - stmt_info=GloMyStmt->add_prepared_statement(current_hostgroup, + stmt_info=GloMyStmt->add_prepared_statement( (char *)client_myds->myconn->userinfo->username, (char *)client_myds->myconn->userinfo->schemaname, (char *)CurrentQuery.QueryPointer, CurrentQuery.QueryLength, CurrentQuery.mysql_stmt, - qpo->cache_ttl, - qpo->timeout, - qpo->delay, false); if (CurrentQuery.QueryParserArgs.digest_text) { if (stmt_info->digest_text==NULL) { diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index 7cd7cf44b..f668cc2dc 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -497,7 +497,7 @@ static int http_handler(void *cls, struct MHD_Connection *connection, const char #endif /* PROXYSQLCLICKHOUSE */ -#define ADMIN_SQLITE_TABLE_STATS_MYSQL_PREPARED_STATEMENTS_INFO "CREATE TABLE stats_mysql_prepared_statements_info (global_stmt_id INT NOT NULL, hostgroup INT NOT NULL , schemaname VARCHAR NOT NULL , username VARCHAR NOT NULL , digest VARCHAR NOT NULL , ref_count_client INT NOT NULL , ref_count_server INT NOT NULL , query VARCHAR NOT NULL)" +#define ADMIN_SQLITE_TABLE_STATS_MYSQL_PREPARED_STATEMENTS_INFO "CREATE TABLE stats_mysql_prepared_statements_info (global_stmt_id INT NOT NULL , schemaname VARCHAR NOT NULL , username VARCHAR NOT NULL , digest VARCHAR NOT NULL , ref_count_client INT NOT NULL , ref_count_server INT NOT NULL , query VARCHAR NOT NULL)" static char * admin_variables_names[]= { (char *)"admin_credentials", @@ -11436,8 +11436,8 @@ void ProxySQL_Admin::stats___mysql_prepared_statements_info() { char *query1=NULL; char *query32=NULL; statsdb->execute("DELETE FROM stats_mysql_prepared_statements_info"); - query1=(char *)"INSERT INTO stats_mysql_prepared_statements_info VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)"; - query32=(char *)"INSERT INTO stats_mysql_prepared_statements_info VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8), (?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16), (?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24), (?25, ?26, ?27, ?28, ?29, ?30, ?31, ?32), (?33, ?34, ?35, ?36, ?37, ?38, ?39, ?40), (?41, ?42, ?43, ?44, ?45, ?46, ?47, ?48), (?49, ?50, ?51, ?52, ?53, ?54, ?55, ?56), (?57, ?58, ?59, ?60, ?61, ?62, ?63, ?64), (?65, ?66, ?67, ?68, ?69, ?70, ?71, ?72), (?73, ?74, ?75, ?76, ?77, ?78, ?79, ?80), (?81, ?82, ?83, ?84, ?85, ?86, ?87, ?88), (?89, ?90, ?91, ?92, ?93, ?94, ?95, ?96), (?97, ?98, ?99, ?100, ?101, ?102, ?103, ?104), (?105, ?106, ?107, ?108, ?109, ?110, ?111, ?112), (?113, ?114, ?115, ?116, ?117, ?118, ?119, ?120), (?121, ?122, ?123, ?124, ?125, ?126, ?127, ?128), (?129, ?130, ?131, ?132, ?133, ?134, ?135, ?136), (?137, ?138, ?139, ?140, ?141, ?142, ?143, ?144), (?145, ?146, ?147, ?148, ?149, ?150, ?151, ?152), (?153, ?154, ?155, ?156, ?157, ?158, ?159, ?160), (?161, ?162, ?163, ?164, ?165, ?166, ?167, ?168), (?169, ?170, ?171, ?172, ?173, ?174, ?175, ?176), (?177, ?178, ?179, ?180, ?181, ?182, ?183, ?184), (?185, ?186, ?187, ?188, ?189, ?190, ?191, ?192), (?193, ?194, ?195, ?196, ?197, ?198, ?199, ?200), (?201, ?202, ?203, ?204, ?205, ?206, ?207, ?208), (?209, ?210, ?211, ?212, ?213, ?214, ?215, ?216), (?217, ?218, ?219, ?220, ?221, ?222, ?223, ?224), (?225, ?226, ?227, ?228, ?229, ?230, ?231, ?232), (?233, ?234, ?235, ?236, ?237, ?238, ?239, ?240), (?241, ?242, ?243, ?244, ?245, ?246, ?247, ?248), (?249, ?250, ?251, ?252, ?253, ?254, ?255, ?256)"; + query1=(char *)"INSERT INTO stats_mysql_prepared_statements_info VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)"; + query32=(char *)"INSERT INTO stats_mysql_prepared_statements_info VALUES (?1,?2,?3,?4,?5,?6,?7),(?8,?9,?10,?11,?12,?13,?14),(?15,?16,?17,?18,?19,?20,?21),(?22,?23,?24,?25,?26,?27,?28),(?29,?30,?31,?32,?33,?34,?35),(?36,?37,?38,?39,?40,?41,?42),(?43,?44,?45,?46,?47,?48,?49),(?50,?51,?52,?53,?54,?55,?56),(?57,?58,?59,?60,?61,?62,?63),(?64,?65,?66,?67,?68,?69,?70),(?71,?72,?73,?74,?75,?76,?77),(?78,?79,?80,?81,?82,?83,?84),(?85,?86,?87,?88,?89,?90,?91),(?92,?93,?94,?95,?96,?97,?98),(?99,?100,?101,?102,?103,?104,?105),(?106,?107,?108,?109,?110,?111,?112),(?113,?114,?115,?116,?117,?118,?119),(?120,?121,?122,?123,?124,?125,?126),(?127,?128,?129,?130,?131,?132,?133),(?134,?135,?136,?137,?138,?139,?140),(?141,?142,?143,?144,?145,?146,?147),(?148,?149,?150,?151,?152,?153,?154),(?155,?156,?157,?158,?159,?160,?161),(?162,?163,?164,?165,?166,?167,?168),(?169,?170,?171,?172,?173,?174,?175),(?176,?177,?178,?179,?180,?181,?182),(?183,?184,?185,?186,?187,?188,?189),(?190,?191,?192,?193,?194,?195,?196),(?197,?198,?199,?200,?201,?202,?203),(?204,?205,?206,?207,?208,?209,?210),(?211,?212,?213,?214,?215,?216,?217),(?218,?219,?220,?221,?222,?223,?224)"; //rc=sqlite3_prepare_v2(mydb3, query1, -1, &statement1, 0); rc = statsdb->prepare_v2(query1, &statement1); ASSERT_SQLITE_OK(rc, statsdb); @@ -11451,14 +11451,13 @@ void ProxySQL_Admin::stats___mysql_prepared_statements_info() { SQLite3_row *r1=*it; int idx=row_idx%32; if (row_idxfields[0])); ASSERT_SQLITE_OK(rc, statsdb); - rc=sqlite3_bind_int64(statement32, (idx*8)+2, atoll(r1->fields[1])); ASSERT_SQLITE_OK(rc, statsdb); - rc=sqlite3_bind_text(statement32, (idx*8)+3, r1->fields[2], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); - rc=sqlite3_bind_text(statement32, (idx*8)+4, r1->fields[3], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); - rc=sqlite3_bind_text(statement32, (idx*8)+5, r1->fields[4], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); - rc=sqlite3_bind_int64(statement32, (idx*8)+6, atoll(r1->fields[6])); ASSERT_SQLITE_OK(rc, statsdb); - rc=sqlite3_bind_int64(statement32, (idx*8)+7, atoll(r1->fields[7])); ASSERT_SQLITE_OK(rc, statsdb); - rc=sqlite3_bind_text(statement32, (idx*8)+8, r1->fields[5], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=sqlite3_bind_int64(statement32, (idx*7)+1, atoll(r1->fields[0])); ASSERT_SQLITE_OK(rc, statsdb); + rc=sqlite3_bind_text(statement32, (idx*7)+2, r1->fields[1], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=sqlite3_bind_text(statement32, (idx*7)+3, r1->fields[2], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=sqlite3_bind_text(statement32, (idx*7)+4, r1->fields[3], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=sqlite3_bind_int64(statement32, (idx*7)+5, atoll(r1->fields[5])); ASSERT_SQLITE_OK(rc, statsdb); + rc=sqlite3_bind_int64(statement32, (idx*7)+6, atoll(r1->fields[6])); ASSERT_SQLITE_OK(rc, statsdb); + rc=sqlite3_bind_text(statement32, (idx*7)+7, r1->fields[4], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); if (idx==31) { SAFE_SQLITE3_STEP2(statement32); rc=sqlite3_clear_bindings(statement32); ASSERT_SQLITE_OK(rc, statsdb); @@ -11466,13 +11465,12 @@ void ProxySQL_Admin::stats___mysql_prepared_statements_info() { } } else { // single row rc=sqlite3_bind_int64(statement1, 1, atoll(r1->fields[0])); ASSERT_SQLITE_OK(rc, statsdb); - rc=sqlite3_bind_int64(statement1, 2, atoll(r1->fields[1])); ASSERT_SQLITE_OK(rc, statsdb); + rc=sqlite3_bind_text(statement1, 2, r1->fields[1], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); rc=sqlite3_bind_text(statement1, 3, r1->fields[2], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); rc=sqlite3_bind_text(statement1, 4, r1->fields[3], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); - rc=sqlite3_bind_text(statement1, 5, r1->fields[4], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=sqlite3_bind_int64(statement1, 5, atoll(r1->fields[5])); ASSERT_SQLITE_OK(rc, statsdb); rc=sqlite3_bind_int64(statement1, 6, atoll(r1->fields[6])); ASSERT_SQLITE_OK(rc, statsdb); - rc=sqlite3_bind_int64(statement1, 7, atoll(r1->fields[7])); ASSERT_SQLITE_OK(rc, statsdb); - rc=sqlite3_bind_text(statement1, 8, r1->fields[5], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); + rc=sqlite3_bind_text(statement1, 7, r1->fields[4], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb); SAFE_SQLITE3_STEP2(statement1); rc=sqlite3_clear_bindings(statement1); ASSERT_SQLITE_OK(rc, statsdb); rc=sqlite3_reset(statement1); ASSERT_SQLITE_OK(rc, statsdb); diff --git a/lib/Query_Processor.cpp b/lib/Query_Processor.cpp index 4d8eee466..27776b860 100644 --- a/lib/Query_Processor.cpp +++ b/lib/Query_Processor.cpp @@ -1290,26 +1290,44 @@ SQLite3_result * Query_Processor::get_query_digests_reset() { } - Query_Processor_Output * Query_Processor::process_mysql_query(MySQL_Session *sess, void *ptr, unsigned int size, Query_Info *qi) { + // NOTE: if ptr == NULL , we are calling process_mysql_query() on an STMT_EXECUTE // to avoid unnecssary deallocation/allocation, we initialize qpo witout new allocation Query_Processor_Output *ret=sess->qpo; ret->init(); + + + SQP_par_t stmt_exec_qp; SQP_par_t *qp=NULL; if (qi) { - qp=(SQP_par_t *)&qi->QueryParserArgs; + // NOTE: if ptr == NULL , we are calling process_mysql_query() on an STMT_EXECUTE + if (ptr) { + qp=(SQP_par_t *)&qi->QueryParserArgs; + } else { + qp=&stmt_exec_qp; + qp->digest = qi->stmt_info->digest; + qp->digest_text = qi->stmt_info->digest_text; + qp->first_comment = NULL; + } } #define stackbuffer_size 128 char stackbuffer[stackbuffer_size]; - unsigned int len=size-sizeof(mysql_hdr)-1; + unsigned int len=0; char *query=NULL; - if (len < stackbuffer_size) { - query=stackbuffer; + // NOTE: if ptr == NULL , we are calling process_mysql_query() on an STMT_EXECUTE + if (ptr) { + len = size-sizeof(mysql_hdr)-1; + if (len < stackbuffer_size) { + query=stackbuffer; + } else { + query=(char *)l_alloc(len+1); + } + memcpy(query,(char *)ptr+sizeof(mysql_hdr)+1,len); + query[len]=0; } else { - query=(char *)l_alloc(len+1); + query = qi->stmt_info->query; + len = qi->stmt_info->query_length; } - memcpy(query,(char *)ptr+sizeof(mysql_hdr)+1,len); - query[len]=0; if (__sync_add_and_fetch(&version,0) > _thr_SQP_version) { // update local rules; proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Detected a changed in version. Global:%d , local:%d . Refreshing...\n", version, _thr_SQP_version); @@ -1524,112 +1542,113 @@ __internal_loop: flagIN=qr->flagOUT; set_flagOUT=true; //sess->query_info.flagOUT=flagIN; - } - if (qr->reconnect >= 0) { + } + if (qr->reconnect >= 0) { // Note: negative reconnect means this rule doesn't change - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set reconnect: %d. Query will%s be rexecuted if connection is lost\n", qr->rule_id, qr->reconnect, (qr->reconnect == 0 ? " NOT" : "" )); - ret->reconnect=qr->reconnect; - } - if (qr->timeout >= 0) { + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set reconnect: %d. Query will%s be rexecuted if connection is lost\n", qr->rule_id, qr->reconnect, (qr->reconnect == 0 ? " NOT" : "" )); + ret->reconnect=qr->reconnect; + } + if (qr->timeout >= 0) { // Note: negative timeout means this rule doesn't change - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set timeout: %d. Query will%s be interrupted if exceeding %dms\n", qr->rule_id, qr->timeout, (qr->timeout == 0 ? " NOT" : "" ) , qr->timeout); - ret->timeout=qr->timeout; - } - if (qr->retries >= 0) { + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set timeout: %d. Query will%s be interrupted if exceeding %dms\n", qr->rule_id, qr->timeout, (qr->timeout == 0 ? " NOT" : "" ) , qr->timeout); + ret->timeout=qr->timeout; + } + if (qr->retries >= 0) { // Note: negative retries means this rule doesn't change - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set retries: %d. Query will%s be re-executed %d times in case of failure\n", qr->rule_id, qr->retries); - ret->retries=qr->retries; - } - if (qr->delay >= 0) { + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set retries: %d. Query will%s be re-executed %d times in case of failure\n", qr->rule_id, qr->retries); + ret->retries=qr->retries; + } + if (qr->delay >= 0) { // Note: negative delay means this rule doesn't change - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set delay: %d. Session will%s be paused for %dms\n", qr->rule_id, qr->delay, (qr->delay == 0 ? " NOT" : "" ) , qr->delay); - ret->delay=qr->delay; - } - if (qr->next_query_flagIN >= 0) { + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set delay: %d. Session will%s be paused for %dms\n", qr->rule_id, qr->delay, (qr->delay == 0 ? " NOT" : "" ) , qr->delay); + ret->delay=qr->delay; + } + if (qr->next_query_flagIN >= 0) { // Note: Negative next_query_flagIN means this rule doesn't change the next query flagIN - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set next query flagIN: %d\n", qr->rule_id, qr->next_query_flagIN); - ret->next_query_flagIN=qr->next_query_flagIN; - } - if (qr->mirror_flagOUT >= 0) { + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set next query flagIN: %d\n", qr->rule_id, qr->next_query_flagIN); + ret->next_query_flagIN=qr->next_query_flagIN; + } + if (qr->mirror_flagOUT >= 0) { // Note: negative mirror_flagOUT means this rule doesn't change the mirror flagOUT - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set mirror flagOUT: %d\n", qr->rule_id, qr->mirror_flagOUT); - ret->mirror_flagOUT=qr->mirror_flagOUT; - } - if (qr->mirror_hostgroup >= 0) { + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set mirror flagOUT: %d\n", qr->rule_id, qr->mirror_flagOUT); + ret->mirror_flagOUT=qr->mirror_flagOUT; + } + if (qr->mirror_hostgroup >= 0) { // Note: negative mirror_hostgroup means this rule doesn't change the mirror - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set mirror hostgroup: %d. A new session will be created\n", qr->rule_id, qr->mirror_hostgroup); - ret->mirror_hostgroup=qr->mirror_hostgroup; - } - if (qr->error_msg) { - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set error_msg: %s\n", qr->rule_id, qr->error_msg); + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set mirror hostgroup: %d. A new session will be created\n", qr->rule_id, qr->mirror_hostgroup); + ret->mirror_hostgroup=qr->mirror_hostgroup; + } + if (qr->error_msg) { + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set error_msg: %s\n", qr->rule_id, qr->error_msg); //proxy_warning("User \"%s\" has issued query that has been filtered: %s \n " , sess->client_myds->myconn->userinfo->username, query); - ret->error_msg=strdup(qr->error_msg); - } - if (qr->OK_msg) { - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set error_msg: %s\n", qr->rule_id, qr->OK_msg); + ret->error_msg=strdup(qr->error_msg); + } + if (qr->OK_msg) { + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set error_msg: %s\n", qr->rule_id, qr->OK_msg); //proxy_warning("User \"%s\" has issued query that has been filtered: %s \n " , sess->client_myds->myconn->userinfo->username, query); - ret->OK_msg=strdup(qr->OK_msg); - } - if (qr->cache_ttl >= 0) { + ret->OK_msg=strdup(qr->OK_msg); + } + if (qr->cache_ttl >= 0) { // Note: negative TTL means this rule doesn't change - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set cache_ttl: %d. Query will%s hit the cache\n", qr->rule_id, qr->cache_ttl, (qr->cache_ttl == 0 ? " NOT" : "" )); - ret->cache_ttl=qr->cache_ttl; - } - if (qr->cache_empty_result >= 0) { + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set cache_ttl: %d. Query will%s hit the cache\n", qr->rule_id, qr->cache_ttl, (qr->cache_ttl == 0 ? " NOT" : "" )); + ret->cache_ttl=qr->cache_ttl; + } + if (qr->cache_empty_result >= 0) { // Note: negative value means this rule doesn't change - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set cache_empty_result: %d. Query with empty result will%s hit the cache\n", qr->rule_id, qr->cache_empty_result, (qr->cache_empty_result == 0 ? " NOT" : "" )); - ret->cache_empty_result=qr->cache_empty_result; - } - if (qr->cache_timeout >= 0) { + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set cache_empty_result: %d. Query with empty result will%s hit the cache\n", qr->rule_id, qr->cache_empty_result, (qr->cache_empty_result == 0 ? " NOT" : "" )); + ret->cache_empty_result=qr->cache_empty_result; + } + if (qr->cache_timeout >= 0) { // Note: negative value means this rule doesn't change - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set cache_timeout: %dms. Query will wait up resulset to be avaiable in query cache before running on backend\n", qr->rule_id, qr->cache_timeout); - ret->cache_timeout=qr->cache_timeout; - } - if (qr->sticky_conn >= 0) { + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set cache_timeout: %dms. Query will wait up resulset to be avaiable in query cache before running on backend\n", qr->rule_id, qr->cache_timeout); + ret->cache_timeout=qr->cache_timeout; + } + if (qr->sticky_conn >= 0) { // Note: negative sticky_conn means this rule doesn't change - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set sticky_conn: %d. Connection will%s stick\n", qr->rule_id, qr->sticky_conn, (qr->sticky_conn == 0 ? " NOT" : "" )); - ret->sticky_conn=qr->sticky_conn; - } - if (qr->multiplex >= 0) { + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set sticky_conn: %d. Connection will%s stick\n", qr->rule_id, qr->sticky_conn, (qr->sticky_conn == 0 ? " NOT" : "" )); + ret->sticky_conn=qr->sticky_conn; + } + if (qr->multiplex >= 0) { // Note: negative multiplex means this rule doesn't change - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set multiplex: %d. Connection will%s multiplex\n", qr->rule_id, qr->multiplex, (qr->multiplex == 0 ? " NOT" : "" )); - ret->multiplex=qr->multiplex; - } - if (qr->gtid_from_hostgroup >= 0) { + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set multiplex: %d. Connection will%s multiplex\n", qr->rule_id, qr->multiplex, (qr->multiplex == 0 ? " NOT" : "" )); + ret->multiplex=qr->multiplex; + } + if (qr->gtid_from_hostgroup >= 0) { // Note: negative gtid_from_hostgroup means this rule doesn't change the gtid_from_hostgroup - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set gtid from hostgroup: %d. A new session will be created\n", qr->rule_id, qr->gtid_from_hostgroup); - ret->gtid_from_hostgroup = qr->gtid_from_hostgroup; - } - if (qr->log >= 0) { + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set gtid from hostgroup: %d. A new session will be created\n", qr->rule_id, qr->gtid_from_hostgroup); + ret->gtid_from_hostgroup = qr->gtid_from_hostgroup; + } + if (qr->log >= 0) { // Note: negative log means this rule doesn't change - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set log: %d. Query will%s logged\n", qr->rule_id, qr->log, (qr->log == 0 ? " NOT" : "" )); - ret->log=qr->log; - } - if (qr->destination_hostgroup >= 0) { + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set log: %d. Query will%s logged\n", qr->rule_id, qr->log, (qr->log == 0 ? " NOT" : "" )); + ret->log=qr->log; + } + if (qr->destination_hostgroup >= 0) { // Note: negative hostgroup means this rule doesn't change - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set destination hostgroup: %d\n", qr->rule_id, qr->destination_hostgroup); - ret->destination_hostgroup=qr->destination_hostgroup; - } - - if (qr->replace_pattern) { - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d on match_pattern \"%s\" has a replace_pattern \"%s\" to apply\n", qr->rule_id, qr->match_pattern, qr->replace_pattern); - if (ret->new_query==NULL) ret->new_query=new std::string(query); - re2_t *re2p=(re2_t *)qr->regex_engine2; - if (re2p->re2) { - //RE2::Replace(ret->new_query,qr->match_pattern,qr->replace_pattern); - if ((qr->re_modifiers & QP_RE_MOD_GLOBAL) == QP_RE_MOD_GLOBAL) { - re2p->re2->GlobalReplace(ret->new_query,qr->match_pattern,qr->replace_pattern); - } else { - re2p->re2->Replace(ret->new_query,qr->match_pattern,qr->replace_pattern); - } - } else { - //re2p->re1->Replace(ret->new_query,qr->replace_pattern); - if ((qr->re_modifiers & QP_RE_MOD_GLOBAL) == QP_RE_MOD_GLOBAL) { - re2p->re1->GlobalReplace(qr->replace_pattern,ret->new_query); + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set destination hostgroup: %d\n", qr->rule_id, qr->destination_hostgroup); + ret->destination_hostgroup=qr->destination_hostgroup; + } + if (ptr) { // we aren't processing a STMT_EXECUTE + if (qr->replace_pattern) { + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d on match_pattern \"%s\" has a replace_pattern \"%s\" to apply\n", qr->rule_id, qr->match_pattern, qr->replace_pattern); + if (ret->new_query==NULL) ret->new_query=new std::string(query); + re2_t *re2p=(re2_t *)qr->regex_engine2; + if (re2p->re2) { + //RE2::Replace(ret->new_query,qr->match_pattern,qr->replace_pattern); + if ((qr->re_modifiers & QP_RE_MOD_GLOBAL) == QP_RE_MOD_GLOBAL) { + re2p->re2->GlobalReplace(ret->new_query,qr->match_pattern,qr->replace_pattern); + } else { + re2p->re2->Replace(ret->new_query,qr->match_pattern,qr->replace_pattern); + } } else { - re2p->re1->Replace(qr->replace_pattern,ret->new_query); + //re2p->re1->Replace(ret->new_query,qr->replace_pattern); + if ((qr->re_modifiers & QP_RE_MOD_GLOBAL) == QP_RE_MOD_GLOBAL) { + re2p->re1->GlobalReplace(qr->replace_pattern,ret->new_query); + } else { + re2p->re1->Replace(qr->replace_pattern,ret->new_query); + } } - } + } } if (qr->apply==true) {