Merge pull request #3453 from sysown/v2.2.0-3427

Closes #3427: Read/Write Split query rules do not work while using sysbench to evaluate perfromance
v2.2.0 v2.2.0
René Cannaò 5 years ago committed by GitHub
commit e14accd78b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -59,6 +59,7 @@ class MySQL_STMT_Global_info {
uint16_t num_params;
uint16_t warning_count;
MYSQL_FIELD **fields;
char* first_comment;
// struct {
// int cache_ttl;
// int timeout;
@ -66,7 +67,7 @@ class MySQL_STMT_Global_info {
// } properties;
bool is_select_NOT_for_update;
MYSQL_BIND **params; // seems unused (?)
MySQL_STMT_Global_info(uint64_t id, char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, uint64_t _h);
MySQL_STMT_Global_info(uint64_t id, char *u, char *s, char *q, unsigned int ql, char *fc, MYSQL_STMT *stmt, uint64_t _h);
void update_metadata(MYSQL_STMT *stmt);
~MySQL_STMT_Global_info();
};
@ -259,7 +260,7 @@ class MySQL_STMT_Manager_v14 {
void unlock() { pthread_rwlock_unlock(&rwlock_); }
void ref_count_client(uint64_t _stmt, int _v, bool lock=true);
void ref_count_server(uint64_t _stmt, int _v, bool lock=true);
MySQL_STMT_Global_info * add_prepared_statement(char *u, char *s, char *q, unsigned int ql, MYSQL_STMT *stmt, bool lock=true);
MySQL_STMT_Global_info * add_prepared_statement(char *u, char *s, char *q, unsigned int ql, char *fc, MYSQL_STMT *stmt, bool lock=true);
void get_metrics(uint64_t *c_unique, uint64_t *c_total, uint64_t *stmt_max_stmt_id, uint64_t *cached, uint64_t *s_unique, uint64_t *s_total);
SQLite3_result * get_prepared_statements_global_infos();
};

@ -133,6 +133,7 @@ void *StmtLongDataHandler::get(uint32_t _stmt_id, uint16_t _param_id,
MySQL_STMT_Global_info::MySQL_STMT_Global_info(uint64_t id,
char *u, char *s, char *q,
unsigned int ql,
char *fc,
MYSQL_STMT *stmt, uint64_t _h) {
pthread_rwlock_init(&rwlock_, NULL);
statement_id = id;
@ -145,6 +146,11 @@ MySQL_STMT_Global_info::MySQL_STMT_Global_info(uint64_t id,
memcpy(query, q, ql);
query[ql] = '\0'; // add NULL byte
query_length = ql;
if (fc) {
first_comment = strdup(fc);
} else {
first_comment = NULL;
}
MyComQueryCmd = MYSQL_COM_QUERY__UNINITIALIZED;
num_params = stmt->param_count;
num_columns = stmt->field_count;
@ -476,6 +482,9 @@ MySQL_STMT_Global_info::~MySQL_STMT_Global_info() {
free(username);
free(schemaname);
free(query);
if (first_comment) {
free(first_comment);
}
if (num_columns) {
uint16_t i;
for (i = 0; i < num_columns; i++) {
@ -812,10 +821,10 @@ bool MySQL_STMTs_local_v14::client_close(uint32_t client_statement_id) {
MySQL_STMT_Global_info *MySQL_STMT_Manager_v14::add_prepared_statement(
char *u, char *s, char *q, unsigned int ql,
MYSQL_STMT *stmt, bool lock) {
char *fc, MYSQL_STMT *stmt, bool lock) {
MySQL_STMT_Global_info *ret = NULL;
uint64_t hash = stmt_compute_hash(
u, s, q, ql); // this identifies the prepared statement
u, s, q, ql); // this identifies the prepared statement
if (lock) {
pthread_rwlock_wrlock(&rwlock_);
}
@ -847,7 +856,7 @@ MySQL_STMT_Global_info *MySQL_STMT_Manager_v14::add_prepared_statement(
//next_statement_id++;
MySQL_STMT_Global_info *a =
new MySQL_STMT_Global_info(next_id, u, s, q, ql, stmt, hash);
new MySQL_STMT_Global_info(next_id, u, s, q, ql, fc, stmt, hash);
// insert it in both maps
map_stmt_id_to_info.insert(std::make_pair(a->statement_id, a));
map_stmt_hash_to_info.insert(std::make_pair(a->hash, a));

@ -3052,7 +3052,12 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
if (client_myds->myconn->local_stmts==NULL) {
client_myds->myconn->local_stmts=new MySQL_STMTs_local_v14(true);
}
uint64_t hash=client_myds->myconn->local_stmts->compute_hash((char *)client_myds->myconn->userinfo->username,(char *)client_myds->myconn->userinfo->schemaname,(char *)CurrentQuery.QueryPointer,CurrentQuery.QueryLength);
uint64_t hash=client_myds->myconn->local_stmts->compute_hash(
(char *)client_myds->myconn->userinfo->username,
(char *)client_myds->myconn->userinfo->schemaname,
(char *)CurrentQuery.QueryPointer,
CurrentQuery.QueryLength
);
MySQL_STMT_Global_info *stmt_info=NULL;
// we first lock GloStmt
GloMyStmt->wrlock();
@ -3137,7 +3142,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
if (thread->variables.stats_time_query_processor) {
clock_gettime(CLOCK_THREAD_CPUTIME_ID,&begint);
}
qpo=GloQPro->process_mysql_query(this,pkt.ptr,pkt.size,&CurrentQuery);
qpo=GloQPro->process_mysql_query(this,NULL,0,&CurrentQuery);
if (qpo->max_lag_ms >= 0) {
thread->status_variables.stvar[st_var_queries_with_max_lag_ms]++;
}
@ -3858,6 +3863,7 @@ bool MySQL_Session::handler_rc0_PROCESSING_STMT_PREPARE(enum session_status& st,
(char *)client_myds->myconn->userinfo->schemaname,
(char *)CurrentQuery.QueryPointer,
CurrentQuery.QueryLength,
CurrentQuery.QueryParserArgs.first_comment,
CurrentQuery.mysql_stmt,
false);
if (CurrentQuery.QueryParserArgs.digest_text) {
@ -4449,6 +4455,12 @@ handler_again:
stmt_info=GloMyStmt->find_prepared_statement_by_stmt_id(CurrentQuery.stmt_global_id);
CurrentQuery.QueryLength=stmt_info->query_length;
CurrentQuery.QueryPointer=(unsigned char *)stmt_info->query;
// NOTE: Update 'first_comment' with the the from the retrieved
// 'stmt_info' from the found prepared statement. 'CurrentQuery' requires its
// own copy of 'first_comment' because it will later be free by 'QueryInfo::end'.
if (stmt_info->first_comment) {
CurrentQuery.QueryParserArgs.first_comment=strdup(stmt_info->first_comment);
}
previous_status.push(PROCESSING_STMT_EXECUTE);
NEXT_IMMEDIATE(PROCESSING_STMT_PREPARE);
if (CurrentQuery.stmt_global_id!=stmt_info->statement_id) {

@ -1319,7 +1319,7 @@ Query_Processor_Output * Query_Processor::process_mysql_query(MySQL_Session *ses
qp=&stmt_exec_qp;
qp->digest = qi->stmt_info->digest;
qp->digest_text = qi->stmt_info->digest_text;
qp->first_comment = NULL;
qp->first_comment = qi->stmt_info->first_comment;
}
}
#define stackbuffer_size 128
@ -1710,7 +1710,9 @@ __exit_process_mysql_query:
if (len < stackbuffer_size) {
// query is in the stack
} else {
l_free(len+1,query);
if (ptr) {
l_free(len+1,query);
}
}
if (sess->mirror==false) { // we process comments only on original queries, not on mirrors
if (qp && qp->first_comment) {

@ -0,0 +1 @@
reg_test_3427-stmt_first_comment1-t.cpp

@ -0,0 +1,273 @@
/**
* @file reg_test_3427-stmt_first_comment-t.cpp
* @brief This test is a regression test for exercising all code related to
* 'first_comment' changes added in PR #3453.
* @details Testing revealed that the fix introduced for proper routing of
* prepared statements with query rules has invalid interaction with query
* annotation 'hostgroup' feature.
* For solving the issue, 'first_comment' was made part of 'MySQL_STMT_Global_info'.
* This test aims to exercise all the parts of ProxySQL affected by this change.
*
* Procedure:
* =========
*
* The test creates a number of prepared statements and execute them, until passing
* the limit of prepared statements allowed per connection. After the connection
* has been reset by ProxySQL because of the limit exceeding, it tries to execute
* the same prepared statements again. This way those prepared statements wont be
* available in the connection and will need to be fetched by ProxySQL for the
* reset connection.
*/
#include <iostream>
#include <chrono>
#include <ctime>
#include <cstring>
#include <unistd.h>
#include <time.h>
#include <vector>
#include <string>
#include <stdio.h>
#include <mysql.h>
#include <mysql/mysqld_error.h>
#include "proxysql_utils.h"
#include "tap.h"
#include "command_line.h"
#include "utils.h"
#include "errno.h"
/**
* @brief String size of the columns created for the testing table.
*/
const int STRING_SIZE=32;
/**
* @brief Number of max stmt per connection to be configured for
* ProxySQL.
*/
const uint32_t MAX_STMT_NUM_QUERIES = 20;
/**
* @brief Number of queries to RESET the connection being target,
* it's simply: MAX_STMT_NUM_QUERIES + 1
*/
const uint32_t RESET_CONNECTION_QUERIES = 2*MAX_STMT_NUM_QUERIES;
/**
* @brief Id for the current writer hostgroup.
*/
const uint32_t WRITER_HOSTGROUP_ID = 0;
int main(int argc, char** argv) {
int res = EXIT_SUCCESS;
CommandLine cl;
if (cl.getEnv()) {
diag("Failed to get the required environmental variables.");
return -1;
}
bool param = false;
{
// we parse argv[0] to see if filename includes "param"
std::string str = std::string(argv[0]);
std::size_t found = str.find("param");
if (found!=std::string::npos) {
param = true;
}
}
MYSQL_STMT* stmt = nullptr;
MYSQL* proxysql_mysql = mysql_init(NULL);
MYSQL* proxysql_admin = mysql_init(NULL);
if (!mysql_real_connect(proxysql_mysql, cl.host, cl.username, cl.password, NULL, cl.port, NULL, 0)) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql_mysql));
return -1;
}
if (!mysql_real_connect(proxysql_admin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql_admin));
return -1;
}
stmt = mysql_stmt_init(proxysql_mysql);
if (!stmt) {
diag("mysql_stmt_init(), out of memory");
res = EXIT_FAILURE;
goto exit;
}
// Insert data in the table to be queried
// *************************************************************************
MYSQL_QUERY(proxysql_mysql, "CREATE DATABASE IF NOT EXISTS test");
MYSQL_QUERY(proxysql_mysql, "DROP TABLE IF EXISTS test.reg_test_3427");
MYSQL_QUERY(
proxysql_mysql,
"CREATE TABLE IF NOT EXISTS test.reg_test_3427"
" (id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, `c1` BIGINT, `c2` varchar(32))"
);
MYSQL_QUERY(proxysql_mysql, "INSERT INTO test.reg_test_3427(c1, c2) VALUES (100, 'abcde')");
mysql_close(proxysql_mysql);
// Initialize the connection again
proxysql_mysql = mysql_init(NULL);
if (!mysql_real_connect(proxysql_mysql, cl.host, cl.username, cl.password, NULL, cl.port, NULL, 0)) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql_mysql));
return -1;
}
// *************************************************************************
{
// Set the number of maximum connections for servers in the writer hostgroup
std::string t_update_mysql_servers {
"UPDATE mysql_servers SET max_connections=1 WHERE hostgroup_id=%d"
};
std::string update_mysql_queries {};
string_format(t_update_mysql_servers, update_mysql_queries, WRITER_HOSTGROUP_ID);
MYSQL_QUERY(proxysql_admin, update_mysql_queries.c_str());
MYSQL_QUERY(proxysql_admin, "LOAD MYSQL SERVERS TO RUNTIME");
// Set the number of maximum prepared statements per connection
std::string t_max_stmt_query {
"SET mysql-max_stmts_per_connection=%d"
};
std::string max_stmt_query {};
string_format(t_max_stmt_query, max_stmt_query, MAX_STMT_NUM_QUERIES);
MYSQL_QUERY(proxysql_admin, max_stmt_query.c_str());
MYSQL_QUERY(proxysql_admin, "LOAD MYSQL VARIABLES TO RUNTIME");
uint32_t query_id = 0;
for (uint32_t i = 0; i < RESET_CONNECTION_QUERIES; i++) {
if (i <= MAX_STMT_NUM_QUERIES) {
query_id = i;
} else if (i == MAX_STMT_NUM_QUERIES + 1) {
query_id = 0;
} else {
query_id += 1;
}
// create unique stmt
std::string query_t {};
if (param) {
query_t = "SELECT /*+ ;hostgroup=0;%d */ * FROM test.reg_test_3427 WHERE id IN (?)";
} else {
query_t = "SELECT /*+ ;hostgroup=0;%d */ * FROM test.reg_test_3427";
}
std::string query {};
string_format(query_t, query, query_id);
if (mysql_stmt_prepare(stmt, query.c_str(), strlen(query.c_str()))) {
diag("mysql_stmt_prepare at line %d failed: %s", __LINE__ , mysql_error(proxysql_mysql));
mysql_close(proxysql_mysql);
res = EXIT_FAILURE;
goto exit;
}
if (param) {
MYSQL_BIND bind_params;
int64_t data_param = 1;
memset(&bind_params, 0, sizeof(MYSQL_BIND));
bind_params.buffer_type = MYSQL_TYPE_LONGLONG;
bind_params.buffer = (char *)&data_param;
bind_params.buffer_length = sizeof(int64_t);
if (mysql_stmt_bind_param(stmt, &bind_params)) {
diag(
"mysql_stmt_bind_result at line %d failed: %s", __LINE__ ,
mysql_stmt_error(stmt)
);
res = EXIT_FAILURE;
goto exit;
}
}
if (mysql_stmt_execute(stmt)) {
diag(
"mysql_stmt_execute at line %d failed: %s", __LINE__ ,
mysql_stmt_error(stmt)
);
res = EXIT_FAILURE;
goto exit;
}
MYSQL_BIND bind[3];
int data_id;
int64_t data_c1;
char data_c2[STRING_SIZE];
char is_null[3];
long unsigned int length[3];
char error[3];
memset(bind, 0, sizeof(bind));
bind[0].buffer_type = MYSQL_TYPE_LONG;
bind[0].buffer = (char *)&data_id;
bind[0].buffer_length = sizeof(int);
bind[0].is_null = &is_null[0];
bind[0].length = &length[0];
bind[1].buffer_type = MYSQL_TYPE_LONGLONG;
bind[1].buffer = (char *)&data_c1;
bind[1].buffer_length = sizeof(int64_t);
bind[1].is_null = &is_null[1];
bind[1].length = &length[1];
bind[2].buffer_type = MYSQL_TYPE_STRING;
bind[2].buffer = (char *)&data_c2;
bind[2].buffer_length = STRING_SIZE;
bind[2].is_null = &is_null[2];
bind[2].length = &length[2];
bind[2].error = &error[2];
if (mysql_stmt_bind_result(stmt, bind)) {
diag(
"mysql_stmt_bind_result at line %d failed: %s", __LINE__,
mysql_stmt_error(stmt)
);
res = EXIT_FAILURE;
goto exit;
}
if (mysql_stmt_fetch(stmt) == 1) {
diag(
"mysql_stmt_fetch at line %d failed: %s", __LINE__,
mysql_stmt_error(stmt)
);
res = EXIT_FAILURE;
goto exit;
}
bool data_match_expected =
(data_id == static_cast<int64_t>(1)) &&
(data_c1 == static_cast<int64_t>(100)) &&
(strcmp(data_c2, "abcde") == 0);
if (data_match_expected == false) {
diag(
"Prepared statement SELECT result didn't matched expected -"
" Exp=(id:1, c1:100, c2:'abcde'), Act=(id:%d, c1:%ld, c2:'%s')",
data_id,
data_c1,
data_c2
);
res = EXIT_FAILURE;
goto exit;
}
}
}
exit:
if (stmt) { mysql_stmt_close(stmt); }
mysql_close(proxysql_mysql);
mysql_close(proxysql_admin);
return exit_status();
}

@ -0,0 +1 @@
reg_test_3427-stmt_first_comment1-t.cpp

@ -0,0 +1 @@
reg_test_3427-stmt_first_comment1-t.cpp

@ -0,0 +1,594 @@
/**
* @file test_query_rules_routing-t.cpp
* @brief This test is an initial version for testing query routing to
* different hostgroups through 'query rules'. It aims to check that
* arbitrary query rules are properly matched and queries are executed in
* the target hostgroups for both 'text protocol' and 'prepared statements'.
*/
#include <algorithm>
#include <cstring>
#include <cmath>
#include <chrono>
#include <climits>
#include <numeric>
#include <memory>
#include <string>
#include <stdio.h>
#include <vector>
#include <tuple>
#include <unistd.h>
#include <mysql.h>
#include <mysql/mysqld_error.h>
#include "command_line.h"
#include "proxysql_utils.h"
#include "tap.h"
#include "utils.h"
int g_seed = 0;
inline int fastrand() {
g_seed = (214013*g_seed+2531011);
return (g_seed>>16)&0x7FFF;
}
inline unsigned long long monotonic_time() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000);
}
void gen_random_str(char *s, const int len) {
g_seed = monotonic_time() ^ getpid() ^ pthread_self();
static const char alphanum[] =
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
for (int i = 0; i < len; ++i) {
s[i] = alphanum[fastrand() % (sizeof(alphanum) - 1)];
}
s[len] = 0;
}
/**
* @brief For now a query rules test for destination hostgroup is going
* to consist into:
*
* - A set of rules to apply.
* - A set of queries to exercise those rules.
* - The destination hostgroup in which the queries are supposed to end.
*/
using dst_hostgroup_test =
std::pair<std::vector<std::string>, std::vector<std::pair<std::string, int>>>;
/**
* @brief All supplied queries should be unique, to know that two queries
* are going to be executed in the backend when a prepared statement
* is executed: <PREPARE + EXECUTE>
*/
std::vector<dst_hostgroup_test> dst_hostgroup_tests {
{
{
"INSERT INTO mysql_query_rules (rule_id,active,match_digest,destination_hostgroup,apply)"
" VALUES (1,1,'^SELECT.*FOR UPDATE',0,1)",
"INSERT INTO mysql_query_rules (rule_id,active,match_digest,destination_hostgroup,apply)"
" VALUES (2,1,'^SELECT',1,1)"
},
{
{
"SELECT /*+ ;%s */ 1",
1
},
{
"SELECT /*+ ;%s */ c FROM test.reg_test_3427_0 WHERE id=1",
1
},
{
"SELECT /*+ ;%s */ c FROM test.reg_test_3427_0 WHERE id BETWEEN 1 AND 20",
1
},
{
"SELECT /*+ ;%s */ SUM(k) c FROM test.reg_test_3427_0 WHERE id BETWEEN 1 AND 10",
1
},
{
"INSERT /*+ ;%s */ INTO test.reg_test_3427_0 (k) VALUES (2)",
0
},
{
"UPDATE /*+ ;%s */ test.reg_test_3427_0 SET pad=\"random\" WHERE id=2",
0
},
{
"SELECT DISTINCT /*+ ;%s */ c FROM test.reg_test_3427_0 WHERE id BETWEEN 1 AND 10 ORDER BY c",
1
}
}
},
{
{
"INSERT INTO mysql_query_rules (rule_id,active,match_digest,destination_hostgroup,apply)"
" VALUES (1,1,'^SELECT.*FROM test.reg_test_3427_0 .*',1,1)",
"INSERT INTO mysql_query_rules (rule_id,active,match_digest,destination_hostgroup,apply)"
" VALUES (2,1,'^SELECT.*FROM test.reg_test_3427_1 .*',0,1)",
},
{
{
"UPDATE /*+ ;%s */ test.reg_test_3427_0 SET pad=\"random\" WHERE id=2",
0
},
{
"SELECT DISTINCT /*+ ;%s */ c FROM test.reg_test_3427_0 WHERE id BETWEEN 1 AND 10 ORDER BY c",
1
},
{
"SELECT /*+ ;%s */ c FROM test.reg_test_3427_1 WHERE id BETWEEN 1 AND 10 ORDER BY c",
0
},
{
"INSERT /*+ ;%s */ INTO test.reg_test_3427_0 (k) VALUES (2)",
0
},
{
"SELECT DISTINCT /*+ ;hostgroup=0;%s */ c FROM test.reg_test_3427_0 WHERE id BETWEEN 1 AND 10 ORDER BY c",
0
},
}
},
{
{
"INSERT INTO mysql_query_rules (rule_id,active,match_digest,destination_hostgroup,apply)"
" VALUES (1,1,'^SELECT.*FOR UPDATE',0,1)",
"INSERT INTO mysql_query_rules (rule_id,active,match_digest,destination_hostgroup,apply)"
" VALUES (2,1,'^SELECT',1,1)"
},
{
{
"UPDATE /*+ ;%s */ test.reg_test_3427_0 SET pad=\"random\" WHERE id=2",
0
},
{
"SELECT /*+ ;hostgroup=0;%s */ c FROM test.reg_test_3427_0 WHERE id=1",
0
},
{
"SELECT /*+ ;hostgroup=0;%s */ c FROM test.reg_test_3427_0 WHERE id BETWEEN 1 AND 20",
0
},
{
"SELECT /*+ ;hostgroup=0;%s */ SUM(k) c FROM test.reg_test_3427_0 WHERE id BETWEEN 1 AND 10",
0
},
{
"SELECT /*+ ;%s */ c FROM test.reg_test_3427_0 WHERE id=1",
1
},
{
"SELECT /*+ ;%s */ c FROM test.reg_test_3427_0 WHERE id BETWEEN 1 AND 20",
1
},
{
"SELECT /*+ ;%s */ SUM(k) c FROM test.reg_test_3427_0 WHERE id BETWEEN 1 AND 10",
1
}
}
}
};
/**
* @brief Get the current query count for a specific hostgroup.
*
* @param proxysql_admin A already opened MYSQL connection to ProxySQL admin
* interface.
* @param hostgroup_id The 'hostgroup_id' from which get the query count.
*
* @return The number of queries that have been executed in that hostgroup id.
*/
int get_hostgroup_query_count(MYSQL* proxysql_admin, const int hostgroup_id) {
if (proxysql_admin == NULL) {
return EXIT_FAILURE;
}
int query_count = -1;
std::string t_query {
"SELECT SUM(Queries) FROM stats.stats_mysql_connection_pool WHERE hostgroup=%d"
};
std::string query {};
string_format(t_query, query, hostgroup_id);
MYSQL_QUERY(proxysql_admin, query.c_str());
MYSQL_RES* sum_res = mysql_store_result(proxysql_admin);
MYSQL_ROW row = mysql_fetch_row(sum_res);
if (row[0]) {
query_count = atoi(row[0]);
}
mysql_free_result(sum_res);
return query_count;
}
/**
* @brief Simple function that performs a text protocol query and discards the result.
*
* @param proxysql A already opened MYSQL connection to ProxySQL.
* @param query The query to be executed.
*
* @return The error code of executing the query.
*/
int perform_text_procotol_query(MYSQL* proxysql, const std::string& query) {
int rc = mysql_query(proxysql, query.c_str());
if (rc == 0) {
MYSQL_RES* query_res = mysql_store_result(proxysql);
if (query_res) {
mysql_free_result(query_res);
}
}
return rc;
}
/**
* @brief Simple function that performs a stmt query and discards the result.
*
* @param proxysql A already opened MYSQL connection to ProxySQL.
* @param query The query to be executed.
*
* @return The error code of executing the query.
*/
int perform_stmt_query(MYSQL* proxysql, const std::string& query) {
int rc = EXIT_FAILURE;
MYSQL_STMT* stmt = mysql_stmt_init(proxysql);
if (stmt == NULL) { return EXIT_FAILURE; }
rc = mysql_stmt_prepare(stmt, query.c_str(), strlen(query.c_str()));
if (rc) { return EXIT_FAILURE; }
rc = mysql_stmt_execute(stmt);
if (rc) { return EXIT_FAILURE; }
rc = mysql_stmt_close(stmt);
if (rc) { return EXIT_FAILURE; }
return rc;
}
/**
* @brief Simple helper function for creating a 'sysbench'
* alike testing table.
*
* @param proxysql A already opened MYSQL connection to ProxySQL.
*
* @return EXIT_FAILURE in case of failure or EXIT_SUCCESS otherwise.
*/
int create_testing_tables(MYSQL* proxysql, uint32_t num_tables) {
if (proxysql == NULL) { return EXIT_FAILURE; }
MYSQL_QUERY(proxysql, "CREATE DATABASE IF NOT EXISTS test");
for (uint32_t i = 0; i < num_tables; i++) {
std::string t_drop_table_query {
"DROP TABLE IF EXISTS test.reg_test_3427_%d"
};
std::string t_create_table_query {
"CREATE TABLE IF NOT EXISTS test.reg_test_3427_%d ("
" id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,"
" `k` int(11) NOT NULL DEFAULT '0',"
" `c` char(120) NOT NULL DEFAULT '',"
" `pad` char(60) NOT NULL DEFAULT '',"
" KEY `k_1` (`k`)"
")"
};
std::string t_insert_trivial_val {
"INSERT INTO test.reg_test_3427_%d (k, c, pad) VALUES (3427, 'foo', 'bar')"
};
// Format the queries
std::string drop_table_query {};
string_format(t_drop_table_query, drop_table_query, i);
std::string create_table_query {};
string_format(t_create_table_query, create_table_query, i);
std::string insert_trivial_val {};
string_format(t_insert_trivial_val, insert_trivial_val, i);
// Perform the queries
MYSQL_QUERY(proxysql, drop_table_query.c_str());
MYSQL_QUERY(proxysql, create_table_query.c_str());
// Insert trivial value, we are only interesting in routing
MYSQL_QUERY(proxysql, insert_trivial_val.c_str());
}
return EXIT_SUCCESS;
}
const double COLISSION_PROB = 1e-8;
/**
* @brief Helper function to wait for replication to complete,
* performs a simple supplied queried until it succeed or the
* timeout expires.
*
* @param proxysql A already opened MYSQL connection to ProxySQL.
* @param proxysql_admin A already opened MYSQL connection to ProxySQL Admin interface.
* @param check The query to perform until timeout expires.
* @param timeout The timeout in seconds to retry the query.
* @param reader_hostgroup The current 'reader hostgroup' for which
* servers replication needs to be waited.
*
* @return EXIT_SUCCESS in case of success, EXIT_FAILURE
* otherwise.
*/
int wait_for_replication(
MYSQL* proxysql,
MYSQL* proxysql_admin,
const std::string& check,
uint32_t timeout,
uint32_t read_hostgroup
) {
if (proxysql == NULL) { return EXIT_FAILURE; }
const std::string t_count_reader_hg_servers {
"SELECT COUNT(*) FROM mysql_servers WHERE hostgroup_id=%d"
};
std::string count_reader_hg_servers {};
size_t size =
snprintf(
nullptr, 0, t_count_reader_hg_servers.c_str(), read_hostgroup
) + 1;
{
std::unique_ptr<char[]> buf(new char[size]);
snprintf(buf.get(), size, t_count_reader_hg_servers.c_str(), read_hostgroup);
count_reader_hg_servers = std::string(buf.get(), buf.get() + size - 1);
}
MYSQL_QUERY(proxysql_admin, count_reader_hg_servers.c_str());
MYSQL_RES* hg_count_res = mysql_store_result(proxysql_admin);
MYSQL_ROW row = mysql_fetch_row(hg_count_res);
uint32_t srv_count = strtoul(row[0], NULL, 10);
mysql_free_result(hg_count_res);
if (srv_count > UINT_MAX) {
return EXIT_FAILURE;
}
int waited = 0;
int queries = 0;
int result = EXIT_FAILURE;
if (srv_count != 0) {
int retries =
ceil(
log10(COLISSION_PROB) /
log10(static_cast<long double>(1)/srv_count)
);
auto start = std::chrono::system_clock::now();
std::chrono::duration<double> elapsed {};
while (elapsed.count() < timeout && queries < retries) {
int rc = mysql_query(proxysql, check.c_str());
if (rc == EXIT_SUCCESS) {
MYSQL_RES* st_res = mysql_store_result(proxysql);
if (st_res) {
mysql_free_result(st_res);
}
queries += 1;
continue;
} else {
queries = 0;
waited += 1;
sleep(1);
}
auto it_end = std::chrono::system_clock::now();
elapsed = it_end - start;
}
if (queries == retries) {
result = EXIT_SUCCESS;
}
} else {
result = EXIT_SUCCESS;
}
return result;
}
int main(int argc, char** argv) {
CommandLine cl;
if (cl.getEnv()) {
diag("Failed to get the required environmental variables.");
return -1;
}
plan(dst_hostgroup_tests.size());
MYSQL* proxysql_admin = mysql_init(NULL);
MYSQL* proxysql_text = mysql_init(NULL);
MYSQL* proxysql_stmt = mysql_init(NULL);
if (!mysql_real_connect(proxysql_text, cl.host, cl.username, cl.password, NULL, cl.port, NULL, 0)) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql_text));
return -1;
}
if (!mysql_real_connect(proxysql_stmt, cl.host, cl.username, cl.password, NULL, cl.port, NULL, 0)) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql_stmt));
return -1;
}
if (!mysql_real_connect(proxysql_admin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql_admin));
return -1;
}
// Disable 'auto_increment_delay_multiplex' for avoiding unintentionally
// disabling multiplexing due to inserts.
MYSQL_QUERY(proxysql_admin, "SET mysql-auto_increment_delay_multiplex=0");
MYSQL_QUERY(proxysql_admin, "LOAD MYSQL VARIABLES TO RUNTIME");
// Create the testing table
int c_table_res = create_testing_tables(proxysql_text, 2);
if (c_table_res) { return EXIT_FAILURE; }
int rep_err = wait_for_replication(
proxysql_text,
proxysql_admin,
"SELECT c FROM test.reg_test_3427_0 WHERE id=1",
10,
1
);
if (rep_err) {
fprintf(stderr,
"File %s, line %d, Error: %s\n",
__FILE__, __LINE__, "Waiting for replication failed."
);
return EXIT_FAILURE;
}
for (const auto& dst_hostgroup_test : dst_hostgroup_tests) {
const auto& query_rules = dst_hostgroup_test.first;
const auto& queries_hids = dst_hostgroup_test.second;
// First prepare the query rules
// ********************************************************************
MYSQL_QUERY(proxysql_admin, "DELETE FROM mysql_query_rules");
for (const auto& query_rule : query_rules) {
MYSQL_QUERY(proxysql_admin, query_rule.c_str());
}
MYSQL_QUERY(proxysql_admin, "LOAD MYSQL QUERY RULES TO RUNTIME");
// ********************************************************************
// Secondly execute the queries and check the hostgroup
// ********************************************************************
bool queries_properly_routed = true;
std::vector<std::string> text_queries_failed_to_route {};
std::vector<std::string> stmt_queries_failed_to_route {};
for (const auto& query_hid : queries_hids) {
// Create an unique query
std::string query {};
std::string rnd_str(static_cast<std::size_t>(20), '\0');
gen_random_str(&rnd_str[0], 20);
string_format(query_hid.first, query, rnd_str.c_str());
// First execute the query for text protocol
// ********************************************************************
// Get the current hosgtroup queries
int cur_hid_queries = get_hostgroup_query_count(proxysql_admin, query_hid.second);
// Perform the query in a text protocol connection
int text_prot_res = perform_text_procotol_query(proxysql_text, query);
if (text_prot_res) {
diag(
"Executing 'text_protocol' query: '%s' failed with err code: '%d'",
query.c_str(),
text_prot_res
);
return EXIT_FAILURE;
}
// Get the new hosgtroup queries
int new_hid_queries = get_hostgroup_query_count(proxysql_admin, query_hid.second);
if (new_hid_queries - cur_hid_queries != 1) {
queries_properly_routed = false;
text_queries_failed_to_route.push_back(query);
}
// Secondly execute the query for binary protocol
// ********************************************************************
// Get the current hosgtroup queries
cur_hid_queries = get_hostgroup_query_count(proxysql_admin, query_hid.second);
// Perform the query in a stmt protocol connection
int stmt_res = perform_stmt_query(proxysql_stmt, query);
if (stmt_res) {
diag(
"Executing 'stmt' query: '%s' failed with err code: '%d', err: '%s'",
query.c_str(),
stmt_res,
mysql_error(proxysql_stmt)
);
return EXIT_FAILURE;
}
// Get the new hosgtroup queries
new_hid_queries = get_hostgroup_query_count(proxysql_admin, query_hid.second);
if (new_hid_queries - cur_hid_queries != 2) {
queries_properly_routed = false;
stmt_queries_failed_to_route.push_back(query);
}
}
if (queries_properly_routed == false) {
std::string str_query_rules =
std::accumulate(
query_rules.begin(),
query_rules.end(),
std::string {},
[](const std::string& a, const std::string& b) -> std::string {
return a + (a.length() > 0 ? "\n" : "") + b;
}
);
std::string str_text_queries =
std::accumulate(
text_queries_failed_to_route.begin(),
text_queries_failed_to_route.end(),
std::string {},
[](const std::string& a, const std::string& b) -> std::string {
return a + (a.length() > 0 ? "\n" : "") + b;
}
);
std::string str_stmt_queries =
std::accumulate(
stmt_queries_failed_to_route.begin(),
stmt_queries_failed_to_route.end(),
std::string {},
[](const std::string& a, const std::string& b) -> std::string {
return a + (a.length() > 0 ? "\n" : "") + b;
}
);
diag(
"Test with rules:\n\n%s\n\nFailed to route the following text queries:\n\n%s\n",
str_query_rules.c_str(),
str_text_queries.c_str()
);
diag(
"Test with rules:\n\n%s\n\nFailed to route the following stmt queries:\n\n%s\n",
str_query_rules.c_str(),
str_stmt_queries.c_str()
);
}
ok(queries_properly_routed, "Queries for test were properly routed to the target hostgroups");
}
mysql_close(proxysql_admin);
mysql_close(proxysql_stmt);
mysql_close(proxysql_text);
return exit_status();
}
Loading…
Cancel
Save