Merge pull request #2723 from sysown/v2.0.11-QP_stmt_2

Do not cache routing metadata for COM_STMT_EXECUTE
pull/2749/head
René Cannaò 6 years ago committed by GitHub
commit 9acfaa211c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -51,7 +51,7 @@ class MySQL_STMT_Global_info {
char *schemaname;
char *query;
unsigned int query_length;
unsigned int hostgroup_id;
// unsigned int hostgroup_id;
int ref_count_client;
int ref_count_server;
uint64_t statement_id;
@ -59,14 +59,14 @@ class MySQL_STMT_Global_info {
uint16_t num_params;
uint16_t warning_count;
MYSQL_FIELD **fields;
struct {
int cache_ttl;
int timeout;
int delay;
} properties;
// struct {
// int cache_ttl;
// int timeout;
// int delay;
// } properties;
bool is_select_NOT_for_update;
MYSQL_BIND **params; // seems unused (?)
MySQL_STMT_Global_info(uint64_t id, unsigned int h, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, uint64_t _h);
MySQL_STMT_Global_info(uint64_t id, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, uint64_t _h);
void update_metadata(MYSQL_STMT *stmt);
~MySQL_STMT_Global_info();
};
@ -217,7 +217,7 @@ class MySQL_STMTs_local_v14 {
return is_client_;
}
void backend_insert(uint64_t global_statement_id, MYSQL_STMT *stmt);
uint64_t compute_hash(unsigned int hostgroup, char *user, char *schema, char *query, unsigned int query_length);
uint64_t compute_hash(char *user, char *schema, char *query, unsigned int query_length);
unsigned int get_num_backend_stmts() { return backend_stmt_to_global_ids.size(); }
uint32_t generate_new_client_stmt_id(uint64_t global_statement_id);
uint64_t find_global_stmt_id_from_client(uint32_t client_stmt_id);
@ -259,7 +259,7 @@ class MySQL_STMT_Manager_v14 {
void unlock() { pthread_rwlock_unlock(&rwlock_); }
void ref_count_client(uint64_t _stmt, int _v, bool lock=true);
void ref_count_server(uint64_t _stmt, int _v, bool lock=true);
MySQL_STMT_Global_info * add_prepared_statement(unsigned int h, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, int _cache_ttl, int _timeout, int _delay, bool lock=true);
MySQL_STMT_Global_info * add_prepared_statement(char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, bool lock=true);
void get_metrics(uint64_t *c_unique, uint64_t *c_total, uint64_t *stmt_max_stmt_id, uint64_t *cached, uint64_t *s_unique, uint64_t *s_total);
SQLite3_result * get_prepared_statements_global_infos();
};

@ -12,11 +12,10 @@
extern MySQL_STMT_Manager_v14 *GloMyStmt;
//#endif
static uint64_t stmt_compute_hash(unsigned int hostgroup, char *user,
static uint64_t stmt_compute_hash(char *user,
char *schema, char *query,
unsigned int query_length) {
int l = 0;
l += sizeof(hostgroup);
l += strlen(user);
l += strlen(schema);
// two random seperators
@ -27,9 +26,6 @@ static uint64_t stmt_compute_hash(unsigned int hostgroup, char *user,
l += query_length;
char *buf = (char *)malloc(l);
l = 0;
// write hostgroup
memcpy(buf, &hostgroup, sizeof(hostgroup));
l += sizeof(hostgroup);
// write user
strcpy(buf + l, user);
@ -57,7 +53,7 @@ static uint64_t stmt_compute_hash(unsigned int hostgroup, char *user,
}
void MySQL_STMT_Global_info::compute_hash() {
hash = stmt_compute_hash(hostgroup_id, username, schemaname, query,
hash = stmt_compute_hash(username, schemaname, query,
query_length);
}
@ -134,13 +130,12 @@ void *StmtLongDataHandler::get(uint32_t _stmt_id, uint16_t _param_id,
return NULL;
}
MySQL_STMT_Global_info::MySQL_STMT_Global_info(uint64_t id, unsigned int h,
MySQL_STMT_Global_info::MySQL_STMT_Global_info(uint64_t id,
char *u, char *s, char *q,
unsigned int ql,
MYSQL_STMT *stmt, uint64_t _h) {
pthread_rwlock_init(&rwlock_, NULL);
statement_id = id;
hostgroup_id = h;
ref_count_client = 0;
ref_count_server = 0;
digest_text = NULL;
@ -248,9 +243,9 @@ MySQL_STMT_Global_info::MySQL_STMT_Global_info(uint64_t id, unsigned int h,
__exit_MySQL_STMT_Global_info___search_select:
// set default properties:
properties.cache_ttl = -1;
properties.timeout = -1;
properties.delay = -1;
// properties.cache_ttl = -1;
// properties.timeout = -1;
// properties.delay = -1;
fields = NULL;
if (num_columns) {
@ -545,11 +540,11 @@ void MySQL_STMTs_local_v14::backend_insert(uint64_t global_statement_id, MYSQL_S
// GloMyStmt->ref_count_client(global_statement_id, 1);
}
uint64_t MySQL_STMTs_local_v14::compute_hash(unsigned int hostgroup, char *user,
uint64_t MySQL_STMTs_local_v14::compute_hash(char *user,
char *schema, char *query,
unsigned int query_length) {
uint64_t hash;
hash = stmt_compute_hash(hostgroup, user, schema, query, query_length);
hash = stmt_compute_hash(user, schema, query, query_length);
return hash;
}
@ -816,11 +811,11 @@ bool MySQL_STMTs_local_v14::client_close(uint32_t client_statement_id) {
}
MySQL_STMT_Global_info *MySQL_STMT_Manager_v14::add_prepared_statement(
unsigned int _h, char *u, char *s, char *q, unsigned int ql,
MYSQL_STMT *stmt, int _cache_ttl, int _timeout, int _delay, bool lock) {
char *u, char *s, char *q, unsigned int ql,
MYSQL_STMT *stmt, bool lock) {
MySQL_STMT_Global_info *ret = NULL;
uint64_t hash = stmt_compute_hash(
_h, u, s, q, ql); // this identifies the prepared statement
u, s, q, ql); // this identifies the prepared statement
if (lock) {
pthread_rwlock_wrlock(&rwlock_);
}
@ -852,10 +847,7 @@ MySQL_STMT_Global_info *MySQL_STMT_Manager_v14::add_prepared_statement(
//next_statement_id++;
MySQL_STMT_Global_info *a =
new MySQL_STMT_Global_info(next_id, _h, u, s, q, ql, stmt, hash);
a->properties.cache_ttl = _cache_ttl;
a->properties.timeout = _timeout;
a->properties.delay = _delay;
new MySQL_STMT_Global_info(next_id, u, s, q, ql, stmt, hash);
// insert it in both maps
map_stmt_id_to_info.insert(std::make_pair(a->statement_id, a));
map_stmt_hash_to_info.insert(std::make_pair(a->hash, a));
@ -936,16 +928,14 @@ void MySQL_STMT_Manager_v14::get_metrics(uint64_t *c_unique, uint64_t *c_total,
class PS_global_stats {
public:
uint64_t statement_id;
unsigned int hid;
char *username;
char *schemaname;
uint64_t digest;
unsigned long long ref_count_client;
unsigned long long ref_count_server;
char *query;
PS_global_stats(uint64_t stmt_id, unsigned int h, char *s, char *u, uint64_t d, char *q, unsigned long long ref_c, unsigned long long ref_s) {
PS_global_stats(uint64_t stmt_id, char *s, char *u, uint64_t d, char *q, unsigned long long ref_c, unsigned long long ref_s) {
statement_id = stmt_id;
hid=h;
digest=d;
query=strndup(q, mysql_thread___query_digests_max_digest_length);
username=strdup(u);
@ -969,31 +959,29 @@ class PS_global_stats {
}
char **get_row() {
char buf[128];
char **pta=(char **)malloc(sizeof(char *)*8);
char **pta=(char **)malloc(sizeof(char *)*7);
sprintf(buf,"%lu",statement_id);
pta[0]=strdup(buf);
sprintf(buf,"%u",hid);
pta[1]=strdup(buf);
assert(schemaname);
pta[2]=strdup(schemaname);
pta[1]=strdup(schemaname);
assert(username);
pta[3]=strdup(username);
pta[2]=strdup(username);
sprintf(buf,"0x%016llX", (long long unsigned int)digest);
pta[4]=strdup(buf);
pta[3]=strdup(buf);
assert(query);
pta[5]=strdup(query);
pta[4]=strdup(query);
sprintf(buf,"%llu",ref_count_client);
pta[6]=strdup(buf);
pta[5]=strdup(buf);
sprintf(buf,"%llu",ref_count_server);
pta[7]=strdup(buf);
pta[6]=strdup(buf);
return pta;
}
void free_row(char **pta) {
int i;
for (i=0;i<8;i++) {
for (i=0;i<7;i++) {
assert(pta[i]);
free(pta[i]);
}
@ -1004,10 +992,9 @@ class PS_global_stats {
SQLite3_result * MySQL_STMT_Manager_v14::get_prepared_statements_global_infos() {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping current prepared statements global info\n");
SQLite3_result *result=new SQLite3_result(8);
SQLite3_result *result=new SQLite3_result(7);
rdlock();
result->add_column_definition(SQLITE_TEXT,"stmt_id");
result->add_column_definition(SQLITE_TEXT,"hid");
result->add_column_definition(SQLITE_TEXT,"schemaname");
result->add_column_definition(SQLITE_TEXT,"username");
result->add_column_definition(SQLITE_TEXT,"digest");
@ -1017,7 +1004,7 @@ SQLite3_result * MySQL_STMT_Manager_v14::get_prepared_statements_global_infos()
for (std::map<uint64_t, MySQL_STMT_Global_info *>::iterator it = map_stmt_id_to_info.begin();
it != map_stmt_id_to_info.end(); ++it) {
MySQL_STMT_Global_info *a = it->second;
PS_global_stats * pgs = new PS_global_stats(a->statement_id, a->hostgroup_id,
PS_global_stats * pgs = new PS_global_stats(a->statement_id,
a->schemaname, a->username,
a->hash, a->query,
a->ref_count_client, a->ref_count_server);

@ -3104,7 +3104,18 @@ __get_pkts_from_client:
// because the offset will be identical
CurrentQuery.begin((unsigned char *)pkt.ptr,pkt.size,true);
timespec begint;
timespec endt;
if (thread->variables.stats_time_query_processor) {
clock_gettime(CLOCK_THREAD_CPUTIME_ID,&begint);
}
qpo=GloQPro->process_mysql_query(this,pkt.ptr,pkt.size,&CurrentQuery);
if (thread->variables.stats_time_query_processor) {
clock_gettime(CLOCK_THREAD_CPUTIME_ID,&endt);
thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] +
(endt.tv_sec*1000000000+endt.tv_nsec) -
(begint.tv_sec*1000000000+begint.tv_nsec);
}
assert(qpo); // GloQPro->process_mysql_query() should always return a qpo
rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt, &lock_hostgroup);
if (rc_break==true) {
@ -3143,7 +3154,7 @@ __get_pkts_from_client:
if (client_myds->myconn->local_stmts==NULL) {
client_myds->myconn->local_stmts=new MySQL_STMTs_local_v14(true);
}
uint64_t hash=client_myds->myconn->local_stmts->compute_hash(current_hostgroup,(char *)client_myds->myconn->userinfo->username,(char *)client_myds->myconn->userinfo->schemaname,(char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength);
uint64_t hash=client_myds->myconn->local_stmts->compute_hash((char *)client_myds->myconn->userinfo->username,(char *)client_myds->myconn->userinfo->schemaname,(char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength);
MySQL_STMT_Global_info *stmt_info=NULL;
// we first lock GloStmt
GloMyStmt->wrlock();
@ -3187,9 +3198,10 @@ __get_pkts_from_client:
break;
} else {
// if we reach here, we are on MySQL module
bool rc_break=false;
bool lock_hostgroup = false;
thread->status_variables.stvar[st_var_frontend_stmt_execute]++;
thread->status_variables.stvar[st_var_queries]++;
//bool rc_break=false;
uint32_t client_stmt_id=0;
uint64_t stmt_global_id=0;
@ -3215,6 +3227,22 @@ __get_pkts_from_client:
CurrentQuery.stmt_info=stmt_info;
CurrentQuery.start_time=thread->curtime;
timespec begint;
timespec endt;
if (thread->variables.stats_time_query_processor) {
clock_gettime(CLOCK_THREAD_CPUTIME_ID,&begint);
}
qpo=GloQPro->process_mysql_query(this,pkt.ptr,pkt.size,&CurrentQuery);
if (qpo->max_lag_ms >= 0) {
thread->status_variables.stvar[st_var_queries_with_max_lag_ms]++;
}
if (thread->variables.stats_time_query_processor) {
clock_gettime(CLOCK_THREAD_CPUTIME_ID,&endt);
thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] +
(endt.tv_sec*1000000000+endt.tv_nsec) -
(begint.tv_sec*1000000000+begint.tv_nsec);
}
assert(qpo); // GloQPro->process_mysql_query() should always return a qpo
// we now take the metadata associated with STMT_EXECUTE from MySQL_STMTs_meta
bool stmt_meta_found=true; // let's be optimistic and we assume we will found it
stmt_execute_metadata_t *stmt_meta=sess_STMTs_meta->find(stmt_global_id);
@ -3240,10 +3268,41 @@ __get_pkts_from_client:
// else
CurrentQuery.stmt_meta=stmt_meta;
// assert(qpo); // GloQPro->process_mysql_query() should always return a qpo
// NOTE: we do not call YET the follow function for STMT_EXECUTE
//rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt);
current_hostgroup=stmt_info->hostgroup_id;
//current_hostgroup=qpo->destination_hostgroup;
rc_break=handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt, &lock_hostgroup, true);
if (rc_break==true) {
break;
}
if (mysql_thread___set_query_lock_on_hostgroup == 1) { // algorithm introduced in 2.0.6
if (locked_on_hostgroup < 0) {
if (lock_hostgroup) {
// we are locking on hostgroup now
locked_on_hostgroup = current_hostgroup;
}
}
if (locked_on_hostgroup >= 0) {
if (current_hostgroup != locked_on_hostgroup) {
client_myds->DSS=STATE_QUERY_SENT_NET;
//int l = CurrentQuery.QueryLength;
int l = CurrentQuery.stmt_info->query_length;
char *end = (char *)"";
if (l>256) {
l=253;
end = (char *)"...";
}
string nqn = string((char *)CurrentQuery.stmt_info->query,l);
char *err_msg = (char *)"Session trying to reach HG %d while locked on HG %d . Rejecting query: %s";
char *buf = (char *)malloc(strlen(err_msg)+strlen(nqn.c_str())+strlen(end)+64);
sprintf(buf, err_msg, current_hostgroup, locked_on_hostgroup, nqn.c_str(), end);
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,client_myds->pkt_sid+1,9005,(char *)"HY000",buf, true);
thread->status_variables.stvar[st_var_hostgroup_locked_queries]++;
RequestEnd(NULL);
free(buf);
l_free(pkt.size,pkt.ptr);
break;
}
}
}
mybe=find_or_create_backend(current_hostgroup);
status=PROCESSING_STMT_EXECUTE;
mybe->server_myds->connect_retries_on_failure=mysql_thread___connect_retries_on_failure;
@ -3676,15 +3735,12 @@ handler_again:
uint64_t global_stmtid;
//bool is_new;
MySQL_STMT_Global_info *stmt_info=NULL;
stmt_info=GloMyStmt->add_prepared_statement(current_hostgroup,
stmt_info=GloMyStmt->add_prepared_statement(
(char *)client_myds->myconn->userinfo->username,
(char *)client_myds->myconn->userinfo->schemaname,
(char *)CurrentQuery.QueryPointer,
CurrentQuery.QueryLength,
CurrentQuery.mysql_stmt,
qpo->cache_ttl,
qpo->timeout,
qpo->delay,
false);
if (CurrentQuery.QueryParserArgs.digest_text) {
if (stmt_info->digest_text==NULL) {

@ -497,7 +497,7 @@ static int http_handler(void *cls, struct MHD_Connection *connection, const char
#endif /* PROXYSQLCLICKHOUSE */
#define ADMIN_SQLITE_TABLE_STATS_MYSQL_PREPARED_STATEMENTS_INFO "CREATE TABLE stats_mysql_prepared_statements_info (global_stmt_id INT NOT NULL, hostgroup INT NOT NULL , schemaname VARCHAR NOT NULL , username VARCHAR NOT NULL , digest VARCHAR NOT NULL , ref_count_client INT NOT NULL , ref_count_server INT NOT NULL , query VARCHAR NOT NULL)"
#define ADMIN_SQLITE_TABLE_STATS_MYSQL_PREPARED_STATEMENTS_INFO "CREATE TABLE stats_mysql_prepared_statements_info (global_stmt_id INT NOT NULL , schemaname VARCHAR NOT NULL , username VARCHAR NOT NULL , digest VARCHAR NOT NULL , ref_count_client INT NOT NULL , ref_count_server INT NOT NULL , query VARCHAR NOT NULL)"
static char * admin_variables_names[]= {
(char *)"admin_credentials",
@ -11436,8 +11436,8 @@ void ProxySQL_Admin::stats___mysql_prepared_statements_info() {
char *query1=NULL;
char *query32=NULL;
statsdb->execute("DELETE FROM stats_mysql_prepared_statements_info");
query1=(char *)"INSERT INTO stats_mysql_prepared_statements_info VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)";
query32=(char *)"INSERT INTO stats_mysql_prepared_statements_info VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8), (?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16), (?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24), (?25, ?26, ?27, ?28, ?29, ?30, ?31, ?32), (?33, ?34, ?35, ?36, ?37, ?38, ?39, ?40), (?41, ?42, ?43, ?44, ?45, ?46, ?47, ?48), (?49, ?50, ?51, ?52, ?53, ?54, ?55, ?56), (?57, ?58, ?59, ?60, ?61, ?62, ?63, ?64), (?65, ?66, ?67, ?68, ?69, ?70, ?71, ?72), (?73, ?74, ?75, ?76, ?77, ?78, ?79, ?80), (?81, ?82, ?83, ?84, ?85, ?86, ?87, ?88), (?89, ?90, ?91, ?92, ?93, ?94, ?95, ?96), (?97, ?98, ?99, ?100, ?101, ?102, ?103, ?104), (?105, ?106, ?107, ?108, ?109, ?110, ?111, ?112), (?113, ?114, ?115, ?116, ?117, ?118, ?119, ?120), (?121, ?122, ?123, ?124, ?125, ?126, ?127, ?128), (?129, ?130, ?131, ?132, ?133, ?134, ?135, ?136), (?137, ?138, ?139, ?140, ?141, ?142, ?143, ?144), (?145, ?146, ?147, ?148, ?149, ?150, ?151, ?152), (?153, ?154, ?155, ?156, ?157, ?158, ?159, ?160), (?161, ?162, ?163, ?164, ?165, ?166, ?167, ?168), (?169, ?170, ?171, ?172, ?173, ?174, ?175, ?176), (?177, ?178, ?179, ?180, ?181, ?182, ?183, ?184), (?185, ?186, ?187, ?188, ?189, ?190, ?191, ?192), (?193, ?194, ?195, ?196, ?197, ?198, ?199, ?200), (?201, ?202, ?203, ?204, ?205, ?206, ?207, ?208), (?209, ?210, ?211, ?212, ?213, ?214, ?215, ?216), (?217, ?218, ?219, ?220, ?221, ?222, ?223, ?224), (?225, ?226, ?227, ?228, ?229, ?230, ?231, ?232), (?233, ?234, ?235, ?236, ?237, ?238, ?239, ?240), (?241, ?242, ?243, ?244, ?245, ?246, ?247, ?248), (?249, ?250, ?251, ?252, ?253, ?254, ?255, ?256)";
query1=(char *)"INSERT INTO stats_mysql_prepared_statements_info VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)";
query32=(char *)"INSERT INTO stats_mysql_prepared_statements_info VALUES (?1,?2,?3,?4,?5,?6,?7),(?8,?9,?10,?11,?12,?13,?14),(?15,?16,?17,?18,?19,?20,?21),(?22,?23,?24,?25,?26,?27,?28),(?29,?30,?31,?32,?33,?34,?35),(?36,?37,?38,?39,?40,?41,?42),(?43,?44,?45,?46,?47,?48,?49),(?50,?51,?52,?53,?54,?55,?56),(?57,?58,?59,?60,?61,?62,?63),(?64,?65,?66,?67,?68,?69,?70),(?71,?72,?73,?74,?75,?76,?77),(?78,?79,?80,?81,?82,?83,?84),(?85,?86,?87,?88,?89,?90,?91),(?92,?93,?94,?95,?96,?97,?98),(?99,?100,?101,?102,?103,?104,?105),(?106,?107,?108,?109,?110,?111,?112),(?113,?114,?115,?116,?117,?118,?119),(?120,?121,?122,?123,?124,?125,?126),(?127,?128,?129,?130,?131,?132,?133),(?134,?135,?136,?137,?138,?139,?140),(?141,?142,?143,?144,?145,?146,?147),(?148,?149,?150,?151,?152,?153,?154),(?155,?156,?157,?158,?159,?160,?161),(?162,?163,?164,?165,?166,?167,?168),(?169,?170,?171,?172,?173,?174,?175),(?176,?177,?178,?179,?180,?181,?182),(?183,?184,?185,?186,?187,?188,?189),(?190,?191,?192,?193,?194,?195,?196),(?197,?198,?199,?200,?201,?202,?203),(?204,?205,?206,?207,?208,?209,?210),(?211,?212,?213,?214,?215,?216,?217),(?218,?219,?220,?221,?222,?223,?224)";
//rc=sqlite3_prepare_v2(mydb3, query1, -1, &statement1, 0);
rc = statsdb->prepare_v2(query1, &statement1);
ASSERT_SQLITE_OK(rc, statsdb);
@ -11451,14 +11451,13 @@ void ProxySQL_Admin::stats___mysql_prepared_statements_info() {
SQLite3_row *r1=*it;
int idx=row_idx%32;
if (row_idx<max_bulk_row_idx) { // bulk
rc=sqlite3_bind_int64(statement32, (idx*8)+1, atoll(r1->fields[0])); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_int64(statement32, (idx*8)+2, atoll(r1->fields[1])); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_text(statement32, (idx*8)+3, r1->fields[2], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_text(statement32, (idx*8)+4, r1->fields[3], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_text(statement32, (idx*8)+5, r1->fields[4], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_int64(statement32, (idx*8)+6, atoll(r1->fields[6])); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_int64(statement32, (idx*8)+7, atoll(r1->fields[7])); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_text(statement32, (idx*8)+8, r1->fields[5], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_int64(statement32, (idx*7)+1, atoll(r1->fields[0])); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_text(statement32, (idx*7)+2, r1->fields[1], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_text(statement32, (idx*7)+3, r1->fields[2], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_text(statement32, (idx*7)+4, r1->fields[3], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_int64(statement32, (idx*7)+5, atoll(r1->fields[5])); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_int64(statement32, (idx*7)+6, atoll(r1->fields[6])); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_text(statement32, (idx*7)+7, r1->fields[4], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
if (idx==31) {
SAFE_SQLITE3_STEP2(statement32);
rc=sqlite3_clear_bindings(statement32); ASSERT_SQLITE_OK(rc, statsdb);
@ -11466,13 +11465,12 @@ void ProxySQL_Admin::stats___mysql_prepared_statements_info() {
}
} else { // single row
rc=sqlite3_bind_int64(statement1, 1, atoll(r1->fields[0])); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_int64(statement1, 2, atoll(r1->fields[1])); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_text(statement1, 2, r1->fields[1], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_text(statement1, 3, r1->fields[2], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_text(statement1, 4, r1->fields[3], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_text(statement1, 5, r1->fields[4], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_int64(statement1, 5, atoll(r1->fields[5])); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_int64(statement1, 6, atoll(r1->fields[6])); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_int64(statement1, 7, atoll(r1->fields[7])); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_text(statement1, 8, r1->fields[5], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_bind_text(statement1, 7, r1->fields[4], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, statsdb);
SAFE_SQLITE3_STEP2(statement1);
rc=sqlite3_clear_bindings(statement1); ASSERT_SQLITE_OK(rc, statsdb);
rc=sqlite3_reset(statement1); ASSERT_SQLITE_OK(rc, statsdb);

@ -1290,26 +1290,44 @@ SQLite3_result * Query_Processor::get_query_digests_reset() {
}
Query_Processor_Output * Query_Processor::process_mysql_query(MySQL_Session *sess, void *ptr, unsigned int size, Query_Info *qi) {
// NOTE: if ptr == NULL , we are calling process_mysql_query() on an STMT_EXECUTE
// to avoid unnecssary deallocation/allocation, we initialize qpo witout new allocation
Query_Processor_Output *ret=sess->qpo;
ret->init();
SQP_par_t stmt_exec_qp;
SQP_par_t *qp=NULL;
if (qi) {
qp=(SQP_par_t *)&qi->QueryParserArgs;
// NOTE: if ptr == NULL , we are calling process_mysql_query() on an STMT_EXECUTE
if (ptr) {
qp=(SQP_par_t *)&qi->QueryParserArgs;
} else {
qp=&stmt_exec_qp;
qp->digest = qi->stmt_info->digest;
qp->digest_text = qi->stmt_info->digest_text;
qp->first_comment = NULL;
}
}
#define stackbuffer_size 128
char stackbuffer[stackbuffer_size];
unsigned int len=size-sizeof(mysql_hdr)-1;
unsigned int len=0;
char *query=NULL;
if (len < stackbuffer_size) {
query=stackbuffer;
// NOTE: if ptr == NULL , we are calling process_mysql_query() on an STMT_EXECUTE
if (ptr) {
len = size-sizeof(mysql_hdr)-1;
if (len < stackbuffer_size) {
query=stackbuffer;
} else {
query=(char *)l_alloc(len+1);
}
memcpy(query,(char *)ptr+sizeof(mysql_hdr)+1,len);
query[len]=0;
} else {
query=(char *)l_alloc(len+1);
query = qi->stmt_info->query;
len = qi->stmt_info->query_length;
}
memcpy(query,(char *)ptr+sizeof(mysql_hdr)+1,len);
query[len]=0;
if (__sync_add_and_fetch(&version,0) > _thr_SQP_version) {
// update local rules;
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Detected a changed in version. Global:%d , local:%d . Refreshing...\n", version, _thr_SQP_version);
@ -1524,112 +1542,113 @@ __internal_loop:
flagIN=qr->flagOUT;
set_flagOUT=true;
//sess->query_info.flagOUT=flagIN;
}
if (qr->reconnect >= 0) {
}
if (qr->reconnect >= 0) {
// Note: negative reconnect means this rule doesn't change
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set reconnect: %d. Query will%s be rexecuted if connection is lost\n", qr->rule_id, qr->reconnect, (qr->reconnect == 0 ? " NOT" : "" ));
ret->reconnect=qr->reconnect;
}
if (qr->timeout >= 0) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set reconnect: %d. Query will%s be rexecuted if connection is lost\n", qr->rule_id, qr->reconnect, (qr->reconnect == 0 ? " NOT" : "" ));
ret->reconnect=qr->reconnect;
}
if (qr->timeout >= 0) {
// Note: negative timeout means this rule doesn't change
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set timeout: %d. Query will%s be interrupted if exceeding %dms\n", qr->rule_id, qr->timeout, (qr->timeout == 0 ? " NOT" : "" ) , qr->timeout);
ret->timeout=qr->timeout;
}
if (qr->retries >= 0) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set timeout: %d. Query will%s be interrupted if exceeding %dms\n", qr->rule_id, qr->timeout, (qr->timeout == 0 ? " NOT" : "" ) , qr->timeout);
ret->timeout=qr->timeout;
}
if (qr->retries >= 0) {
// Note: negative retries means this rule doesn't change
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set retries: %d. Query will%s be re-executed %d times in case of failure\n", qr->rule_id, qr->retries);
ret->retries=qr->retries;
}
if (qr->delay >= 0) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set retries: %d. Query will%s be re-executed %d times in case of failure\n", qr->rule_id, qr->retries);
ret->retries=qr->retries;
}
if (qr->delay >= 0) {
// Note: negative delay means this rule doesn't change
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set delay: %d. Session will%s be paused for %dms\n", qr->rule_id, qr->delay, (qr->delay == 0 ? " NOT" : "" ) , qr->delay);
ret->delay=qr->delay;
}
if (qr->next_query_flagIN >= 0) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set delay: %d. Session will%s be paused for %dms\n", qr->rule_id, qr->delay, (qr->delay == 0 ? " NOT" : "" ) , qr->delay);
ret->delay=qr->delay;
}
if (qr->next_query_flagIN >= 0) {
// Note: Negative next_query_flagIN means this rule doesn't change the next query flagIN
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set next query flagIN: %d\n", qr->rule_id, qr->next_query_flagIN);
ret->next_query_flagIN=qr->next_query_flagIN;
}
if (qr->mirror_flagOUT >= 0) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set next query flagIN: %d\n", qr->rule_id, qr->next_query_flagIN);
ret->next_query_flagIN=qr->next_query_flagIN;
}
if (qr->mirror_flagOUT >= 0) {
// Note: negative mirror_flagOUT means this rule doesn't change the mirror flagOUT
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set mirror flagOUT: %d\n", qr->rule_id, qr->mirror_flagOUT);
ret->mirror_flagOUT=qr->mirror_flagOUT;
}
if (qr->mirror_hostgroup >= 0) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set mirror flagOUT: %d\n", qr->rule_id, qr->mirror_flagOUT);
ret->mirror_flagOUT=qr->mirror_flagOUT;
}
if (qr->mirror_hostgroup >= 0) {
// Note: negative mirror_hostgroup means this rule doesn't change the mirror
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set mirror hostgroup: %d. A new session will be created\n", qr->rule_id, qr->mirror_hostgroup);
ret->mirror_hostgroup=qr->mirror_hostgroup;
}
if (qr->error_msg) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set error_msg: %s\n", qr->rule_id, qr->error_msg);
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set mirror hostgroup: %d. A new session will be created\n", qr->rule_id, qr->mirror_hostgroup);
ret->mirror_hostgroup=qr->mirror_hostgroup;
}
if (qr->error_msg) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set error_msg: %s\n", qr->rule_id, qr->error_msg);
//proxy_warning("User \"%s\" has issued query that has been filtered: %s \n " , sess->client_myds->myconn->userinfo->username, query);
ret->error_msg=strdup(qr->error_msg);
}
if (qr->OK_msg) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set error_msg: %s\n", qr->rule_id, qr->OK_msg);
ret->error_msg=strdup(qr->error_msg);
}
if (qr->OK_msg) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set error_msg: %s\n", qr->rule_id, qr->OK_msg);
//proxy_warning("User \"%s\" has issued query that has been filtered: %s \n " , sess->client_myds->myconn->userinfo->username, query);
ret->OK_msg=strdup(qr->OK_msg);
}
if (qr->cache_ttl >= 0) {
ret->OK_msg=strdup(qr->OK_msg);
}
if (qr->cache_ttl >= 0) {
// Note: negative TTL means this rule doesn't change
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set cache_ttl: %d. Query will%s hit the cache\n", qr->rule_id, qr->cache_ttl, (qr->cache_ttl == 0 ? " NOT" : "" ));
ret->cache_ttl=qr->cache_ttl;
}
if (qr->cache_empty_result >= 0) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set cache_ttl: %d. Query will%s hit the cache\n", qr->rule_id, qr->cache_ttl, (qr->cache_ttl == 0 ? " NOT" : "" ));
ret->cache_ttl=qr->cache_ttl;
}
if (qr->cache_empty_result >= 0) {
// Note: negative value means this rule doesn't change
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set cache_empty_result: %d. Query with empty result will%s hit the cache\n", qr->rule_id, qr->cache_empty_result, (qr->cache_empty_result == 0 ? " NOT" : "" ));
ret->cache_empty_result=qr->cache_empty_result;
}
if (qr->cache_timeout >= 0) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set cache_empty_result: %d. Query with empty result will%s hit the cache\n", qr->rule_id, qr->cache_empty_result, (qr->cache_empty_result == 0 ? " NOT" : "" ));
ret->cache_empty_result=qr->cache_empty_result;
}
if (qr->cache_timeout >= 0) {
// Note: negative value means this rule doesn't change
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set cache_timeout: %dms. Query will wait up resulset to be avaiable in query cache before running on backend\n", qr->rule_id, qr->cache_timeout);
ret->cache_timeout=qr->cache_timeout;
}
if (qr->sticky_conn >= 0) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set cache_timeout: %dms. Query will wait up resulset to be avaiable in query cache before running on backend\n", qr->rule_id, qr->cache_timeout);
ret->cache_timeout=qr->cache_timeout;
}
if (qr->sticky_conn >= 0) {
// Note: negative sticky_conn means this rule doesn't change
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set sticky_conn: %d. Connection will%s stick\n", qr->rule_id, qr->sticky_conn, (qr->sticky_conn == 0 ? " NOT" : "" ));
ret->sticky_conn=qr->sticky_conn;
}
if (qr->multiplex >= 0) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set sticky_conn: %d. Connection will%s stick\n", qr->rule_id, qr->sticky_conn, (qr->sticky_conn == 0 ? " NOT" : "" ));
ret->sticky_conn=qr->sticky_conn;
}
if (qr->multiplex >= 0) {
// Note: negative multiplex means this rule doesn't change
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set multiplex: %d. Connection will%s multiplex\n", qr->rule_id, qr->multiplex, (qr->multiplex == 0 ? " NOT" : "" ));
ret->multiplex=qr->multiplex;
}
if (qr->gtid_from_hostgroup >= 0) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set multiplex: %d. Connection will%s multiplex\n", qr->rule_id, qr->multiplex, (qr->multiplex == 0 ? " NOT" : "" ));
ret->multiplex=qr->multiplex;
}
if (qr->gtid_from_hostgroup >= 0) {
// Note: negative gtid_from_hostgroup means this rule doesn't change the gtid_from_hostgroup
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set gtid from hostgroup: %d. A new session will be created\n", qr->rule_id, qr->gtid_from_hostgroup);
ret->gtid_from_hostgroup = qr->gtid_from_hostgroup;
}
if (qr->log >= 0) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set gtid from hostgroup: %d. A new session will be created\n", qr->rule_id, qr->gtid_from_hostgroup);
ret->gtid_from_hostgroup = qr->gtid_from_hostgroup;
}
if (qr->log >= 0) {
// Note: negative log means this rule doesn't change
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set log: %d. Query will%s logged\n", qr->rule_id, qr->log, (qr->log == 0 ? " NOT" : "" ));
ret->log=qr->log;
}
if (qr->destination_hostgroup >= 0) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set log: %d. Query will%s logged\n", qr->rule_id, qr->log, (qr->log == 0 ? " NOT" : "" ));
ret->log=qr->log;
}
if (qr->destination_hostgroup >= 0) {
// Note: negative hostgroup means this rule doesn't change
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set destination hostgroup: %d\n", qr->rule_id, qr->destination_hostgroup);
ret->destination_hostgroup=qr->destination_hostgroup;
}
if (qr->replace_pattern) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d on match_pattern \"%s\" has a replace_pattern \"%s\" to apply\n", qr->rule_id, qr->match_pattern, qr->replace_pattern);
if (ret->new_query==NULL) ret->new_query=new std::string(query);
re2_t *re2p=(re2_t *)qr->regex_engine2;
if (re2p->re2) {
//RE2::Replace(ret->new_query,qr->match_pattern,qr->replace_pattern);
if ((qr->re_modifiers & QP_RE_MOD_GLOBAL) == QP_RE_MOD_GLOBAL) {
re2p->re2->GlobalReplace(ret->new_query,qr->match_pattern,qr->replace_pattern);
} else {
re2p->re2->Replace(ret->new_query,qr->match_pattern,qr->replace_pattern);
}
} else {
//re2p->re1->Replace(ret->new_query,qr->replace_pattern);
if ((qr->re_modifiers & QP_RE_MOD_GLOBAL) == QP_RE_MOD_GLOBAL) {
re2p->re1->GlobalReplace(qr->replace_pattern,ret->new_query);
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has set destination hostgroup: %d\n", qr->rule_id, qr->destination_hostgroup);
ret->destination_hostgroup=qr->destination_hostgroup;
}
if (ptr) { // we aren't processing a STMT_EXECUTE
if (qr->replace_pattern) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d on match_pattern \"%s\" has a replace_pattern \"%s\" to apply\n", qr->rule_id, qr->match_pattern, qr->replace_pattern);
if (ret->new_query==NULL) ret->new_query=new std::string(query);
re2_t *re2p=(re2_t *)qr->regex_engine2;
if (re2p->re2) {
//RE2::Replace(ret->new_query,qr->match_pattern,qr->replace_pattern);
if ((qr->re_modifiers & QP_RE_MOD_GLOBAL) == QP_RE_MOD_GLOBAL) {
re2p->re2->GlobalReplace(ret->new_query,qr->match_pattern,qr->replace_pattern);
} else {
re2p->re2->Replace(ret->new_query,qr->match_pattern,qr->replace_pattern);
}
} else {
re2p->re1->Replace(qr->replace_pattern,ret->new_query);
//re2p->re1->Replace(ret->new_query,qr->replace_pattern);
if ((qr->re_modifiers & QP_RE_MOD_GLOBAL) == QP_RE_MOD_GLOBAL) {
re2p->re1->GlobalReplace(qr->replace_pattern,ret->new_query);
} else {
re2p->re1->Replace(qr->replace_pattern,ret->new_query);
}
}
}
}
}
if (qr->apply==true) {

Loading…
Cancel
Save