Merge pull request #4764 from sysown/v3.0_compression

Add MySQL compression level variable and change default to compression level 3 (v3.0)
pull/4809/head
René Cannaò 1 year ago committed by GitHub
commit 643b62ddfb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -470,6 +470,7 @@ class MySQL_Threads_Handler
bool parse_failure_logs_digest;
bool default_reconnect;
bool have_compress;
int protocol_compression_level;
bool have_ssl;
bool multiplexing;
// bool stmt_multiplexing;

@ -1168,6 +1168,7 @@ __thread int mysql_thread___poll_timeout;
__thread int mysql_thread___poll_timeout_on_failure;
__thread bool mysql_thread___connection_warming;
__thread bool mysql_thread___have_compress;
__thread int mysql_thread___protocol_compression_level;
__thread bool mysql_thread___have_ssl;
__thread bool mysql_thread___multiplexing;
__thread bool mysql_thread___log_unhealthy_connections;
@ -1464,6 +1465,7 @@ extern __thread int mysql_thread___poll_timeout;
extern __thread int mysql_thread___poll_timeout_on_failure;
extern __thread bool mysql_thread___connection_warming;
extern __thread bool mysql_thread___have_compress;
extern __thread int mysql_thread___protocol_compression_level;
extern __thread bool mysql_thread___have_ssl;
extern __thread bool mysql_thread___multiplexing;
extern __thread bool mysql_thread___log_unhealthy_connections;

@ -504,6 +504,7 @@ static char * mysql_thread_variables_names[]= {
(char *)"handle_warnings",
(char *)"evaluate_replication_lag_on_servers_load",
(char *)"proxy_protocol_networks",
(char *)"protocol_compression_level",
NULL
};
@ -1137,6 +1138,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() {
variables.enable_load_data_local_infile=false;
variables.log_mysql_warnings_enabled=false;
variables.data_packets_history_size=0;
variables.protocol_compression_level=3;
// status variables
status_variables.mirror_sessions_current=0;
__global_MySQL_Thread_Variables_version=1;
@ -2268,6 +2270,7 @@ char ** MySQL_Threads_Handler::get_variables_list() {
VariablesPointers_int["client_host_error_counts"] = make_tuple(&variables.client_host_error_counts, 0, 1024*1024, false);
VariablesPointers_int["handle_warnings"] = make_tuple(&variables.handle_warnings, 0, 1, false);
VariablesPointers_int["evaluate_replication_lag_on_servers_load"] = make_tuple(&variables.evaluate_replication_lag_on_servers_load, 0, 1, false);
VariablesPointers_int["protocol_compression_level"] = make_tuple(&variables.protocol_compression_level, -1, 9, false);
// logs
VariablesPointers_int["auditlog_filesize"] = make_tuple(&variables.auditlog_filesize, 1024*1024, 1*1024*1024*1024, false);
@ -4178,6 +4181,7 @@ void MySQL_Thread::refresh_variables() {
REFRESH_VARIABLE_INT(poll_timeout);
REFRESH_VARIABLE_INT(poll_timeout_on_failure);
REFRESH_VARIABLE_BOOL(have_compress);
REFRESH_VARIABLE_INT(protocol_compression_level);
REFRESH_VARIABLE_BOOL(have_ssl);
REFRESH_VARIABLE_BOOL(multiplexing);
REFRESH_VARIABLE_BOOL(log_unhealthy_connections);
@ -4262,6 +4266,8 @@ MySQL_Thread::MySQL_Thread() {
mysql_thread___ssl_p2s_crl=NULL;
mysql_thread___ssl_p2s_crlpath=NULL;
mysql_thread___protocol_compression_level=3;
last_maintenance_time=0;
last_move_to_idle_thread_time=0;
maintenance_loop=true;

@ -1234,7 +1234,7 @@ void MySQL_Data_Stream::generate_compressed_packet() {
total_size+=p2.size;
l_free(p2.size,p2.ptr);
}
int rc=compress(dest, &destLen, source, sourceLen);
int rc=compress2(dest, &destLen, source, sourceLen, mysql_thread___protocol_compression_level);
assert(rc==Z_OK);
l_free(total_size, source);
queueOUT.pkt.size=destLen+7;
@ -1266,9 +1266,9 @@ void MySQL_Data_Stream::generate_compressed_packet() {
dest1=(Bytef *)malloc(destLen1+7);
destLen2=len2*120/100+12;
dest2=(Bytef *)malloc(destLen2+7);
rc=compress(dest1+7, &destLen1, (const unsigned char *)p2.ptr, len1);
rc=compress2(dest1+7, &destLen1, (const unsigned char *)p2.ptr, len1, mysql_thread___protocol_compression_level);
assert(rc==Z_OK);
rc=compress(dest2+7, &destLen2, (const unsigned char *)p2.ptr+len1, len2);
rc=compress2(dest2+7, &destLen2, (const unsigned char *)p2.ptr+len1, len2, mysql_thread___protocol_compression_level);
assert(rc==Z_OK);
hdr.pkt_length=destLen1;

@ -9,13 +9,6 @@ inline int fastrand() {
return (g_seed>>16)&0x7FFF;
}
inline unsigned long long monotonic_time() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000);
}
#define NSRV 24
#define NLOOP 10000000

@ -180,5 +180,6 @@
"eof_fast_forward-t" : [ "default", "mysql-auto_increment_delay_multiplex=0", "mysql-multiplexing=false", "mysql-query_digests=0", "mysql-query_digests_keep_comment=1" ],
"eof_mixed_flags_queries-t" : [ "default", "mysql-auto_increment_delay_multiplex=0", "mysql-multiplexing=false", "mysql-query_digests=0", "mysql-query_digests_keep_comment=1" ],
"eof_packet_mixed_queries-t" : [ "default", "mysql-auto_increment_delay_multiplex=0", "mysql-multiplexing=false", "mysql-query_digests=0", "mysql-query_digests_keep_comment=1" ],
"ok_packet_mixed_queries-t" : [ "default", "mysql-auto_increment_delay_multiplex=0", "mysql-multiplexing=false", "mysql-query_digests=0", "mysql-query_digests_keep_comment=1" ]
"ok_packet_mixed_queries-t" : [ "default", "mysql-auto_increment_delay_multiplex=0", "mysql-multiplexing=false", "mysql-query_digests=0", "mysql-query_digests_keep_comment=1" ],
"mysql-protocol_compression_level-t" : [ "default", "mysql-auto_increment_delay_multiplex=0", "mysql-multiplexing=false", "mysql-query_digests=0", "mysql-query_digests_keep_comment=1" ]
}

@ -399,6 +399,12 @@ int create_table_test_sbtest1(int num_rows, MYSQL *mysql) {
return add_more_rows_test_sbtest1(num_rows, mysql);
}
unsigned long long monotonic_time() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000);
}
int create_table_test_sqlite_sbtest1(int num_rows, MYSQL *mysql) {
MYSQL_QUERY(mysql, "DROP TABLE IF EXISTS sbtest1");
MYSQL_QUERY(mysql, "CREATE TABLE IF NOT EXISTS sbtest1 (id INTEGER PRIMARY KEY AUTOINCREMENT, `k` int(10) NOT NULL DEFAULT '0', `c` char(120) NOT NULL DEFAULT '', `pad` char(60) NOT NULL DEFAULT '')");

@ -144,6 +144,8 @@ int create_table_test_sbtest1(int num_rows, MYSQL *mysql);
int create_table_test_sqlite_sbtest1(int num_rows, MYSQL *mysql); // as above, but for SQLite3 server
int add_more_rows_test_sbtest1(int num_rows, MYSQL *mysql, bool sqlite=false);
unsigned long long monotonic_time();
using mysql_res_row = std::vector<std::string>;
/**

@ -36,12 +36,6 @@ inline int fastrand() {
return (g_seed>>16)&0x7FFF;
}
inline unsigned long long monotonic_time() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000);
}
#define NTHREADS 5
#define NCONNS 6
#define NPREP 15000

@ -17,12 +17,6 @@ It uses 2 valid init_connect, and 2 invalid ones that trigger PMC-10003.
It also sets a value that causes a syntax error
*/
inline unsigned long long monotonic_time() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000);
}
int main(int argc, char** argv) {
CommandLine cl;

@ -21,12 +21,6 @@ mysql-init_connect
We configure both hostgroup 0 and 1
*/
inline unsigned long long monotonic_time() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000);
}
int main(int argc, char** argv) {
CommandLine cl;

@ -11,12 +11,6 @@
#include "command_line.h"
#include "utils.h"
inline unsigned long long monotonic_time() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000);
}
std::string queries[5] = {
"SELECT LAST_INSERT_ID() LIMIT 1",

@ -0,0 +1,255 @@
#include <cstdlib>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <string>
#include <sstream>
#include <fstream>
#include "mysql.h"
#include "tap.h"
#include "command_line.h"
#include "utils.h"
unsigned long calculate_query_execution_time(MYSQL* mysql, const std::string& query) {
MYSQL_RES *res;
MYSQL_ROW row;
unsigned long long begin = monotonic_time();
unsigned long long row_count = 0;
unsigned long ret_query = 0;
ret_query = mysql_query(mysql, query.c_str());
if (ret_query != 0) {
fprintf(stderr, "Failed to execute query: Error: %s\n", mysql_error(mysql));
return -1;
}
res = mysql_use_result(mysql);
unsigned long long num_rows = mysql_num_rows(res);
unsigned int num_fields = mysql_num_fields(res);
while ((row = mysql_fetch_row(res))) {
for (unsigned int i = 1; i < num_fields; i++) {
char *field = row[i];
}
row_count++;
}
mysql_free_result(res);
unsigned long long end = monotonic_time();
fprintf(stderr, "Row count: %lld\n", row_count);
return (end - begin);
}
MYSQL* initilize_mysql_connection(char* host, char* username, char* password, int port, bool compression) {
MYSQL* mysql = mysql_init(NULL);
if (!mysql)
return nullptr;
fprintf(stderr, "MySQL connection details: %s %s %d\n", username, password, port);
if (!mysql_real_connect(mysql, host, username, password, NULL, port, NULL, 0)) {
fprintf(stderr, "Failed to connect to database: Error: %s\n",
mysql_error(mysql));
return nullptr;
}
if (compression) {
if (mysql_options(mysql, MYSQL_OPT_COMPRESS, nullptr) != 0) {
fprintf(stderr, "Failed to set mysql compression option: Error: %s\n",
mysql_error(mysql));
return nullptr;
}
}
return mysql;
}
int main(int argc, char** argv) {
CommandLine cl;
std::string query = "SELECT t1.id id1, t1.k k1, t1.c c1, t1.pad pad1, t2.id id2, t2.k k2, t2.c c2, t2.pad pad2 FROM test.sbtest1 t1 JOIN test.sbtest1 t2 LIMIT 90000000";
unsigned long time_proxy = 0;
unsigned long time_proxy_compressed = 0;
unsigned long diff = 0;
unsigned long time_mysql_compressed = 0;
unsigned long time_mysql_without_compressed = 0;
std::string compression_level = {""};
int32_t ret = 0;
MYSQL* proxysql = nullptr;
MYSQL* proxysql_compression = nullptr;
MYSQL* proxysql_admin = nullptr;
MYSQL* mysql_compression = nullptr;
MYSQL* mysql = nullptr;
float performance_diff = 0;
if(cl.getEnv())
return exit_status();
plan(8);
// ProxySQL connection without compression
proxysql = initilize_mysql_connection(cl.host, cl.username, cl.password, cl.port, false);
if (!proxysql) {
goto cleanup;
}
// ProxySQL connection with compression
proxysql_compression = initilize_mysql_connection(cl.host, cl.username, cl.password, cl.port, true);
if (!proxysql_compression) {
goto cleanup;
}
// MySQL connection with compression
mysql_compression = initilize_mysql_connection(cl.host, cl.username, cl.password, cl.mysql_port, true);
if (!mysql_compression) {
goto cleanup;
}
// MySQL connection without compression
mysql = initilize_mysql_connection(cl.host, cl.username, cl.password, cl.mysql_port, false);
if (!mysql) {
goto cleanup;
}
// ProxySQL admin connection
proxysql_admin = initilize_mysql_connection(cl.host, cl.admin_username, cl.admin_password, cl.admin_port, false);
if (!proxysql_admin) {
goto cleanup;
}
// Change default query rules to avoid replication issues; This test only requires the default hostgroup
MYSQL_QUERY(proxysql_admin, "UPDATE mysql_query_rules SET active=0");
MYSQL_QUERY(proxysql_admin, "LOAD MYSQL QUERY RULES TO RUNTIME");
MYSQL_QUERY(proxysql, "CREATE DATABASE IF NOT EXISTS test");
MYSQL_QUERY(proxysql, "DROP TABLE IF EXISTS test.sbtest1");
mysql_query(proxysql, "CREATE TABLE IF NOT EXISTS test.sbtest1 (id INT UNSIGNED NOT NULL AUTO_INCREMENT, k INT UNSIGNED NOT NULL DEFAULT 0, c CHAR(120) NOT NULL DEFAULT '', pad CHAR(60) NOT NULL DEFAULT '', PRIMARY KEY (id), KEY k_1 (k));");
MYSQL_QUERY(proxysql, "USE test");
for (int i = 0; i < 100; i++) {
MYSQL_QUERY(proxysql, "INSERT INTO sbtest1 (k, c, pad) SELECT FLOOR(RAND() * 10000), REPEAT('a', 120), REPEAT('b', 60) FROM information_schema.tables LIMIT 1000;");
}
time_proxy = calculate_query_execution_time(proxysql, query);
diag("Time taken for query with proxysql without compression: %ld", time_proxy);
if (time_proxy == -1) {
goto cleanup;
}
time_proxy_compressed = calculate_query_execution_time(proxysql_compression, query);
diag("Time taken for query with proxysql with compression: %ld", time_proxy_compressed);
if (time_proxy_compressed == -1) {
goto cleanup;
}
diff = abs(time_proxy - time_proxy_compressed);
performance_diff = (float)(diff * 100) / time_proxy;
ok((performance_diff < 10), "proxysql with compression performed well compared to without compression. Performance difference: %f percentage", performance_diff);
time_mysql_compressed = calculate_query_execution_time(mysql_compression, query);
diag("Time taken for query with mysql with compression: %ld", time_mysql_compressed);
if (time_mysql_compressed == -1) {
goto cleanup;
}
time_mysql_without_compressed = calculate_query_execution_time(mysql, query);
diag("Time taken for query with mysql without compression: %ld", time_mysql_without_compressed);
if (time_mysql_without_compressed == -1) {
goto cleanup;
}
diff = abs(time_mysql_without_compressed - time_mysql_compressed);
performance_diff = (float)(diff * 100) / time_mysql_without_compressed;
ok((performance_diff < 10), "MySQL with compression performed well compared to without compression. Performance difference: %f percentage", performance_diff);
ret = get_variable_value(proxysql_admin, "mysql-protocol_compression_level", compression_level, true);
if (ret == EXIT_SUCCESS) {
ok(compression_level == "3", "Run-time default compression level is correct: %s", compression_level.c_str());
}
else {
diag("Failed to get the default compression level.");
goto cleanup;
}
ret = get_variable_value(proxysql_admin, "mysql-protocol_compression_level", compression_level);
if (ret == EXIT_SUCCESS) {
ok(compression_level == "3", "Default compression level is correct: %s", compression_level.c_str());
}
else {
diag("Failed to get the default compression level.");
goto cleanup;
}
set_admin_global_variable(proxysql_admin, "mysql-protocol_compression_level", "8");
if (mysql_query(proxysql_admin, "load mysql variables to runtime")) {
diag("Failed to load mysql variables to runtime.");
goto cleanup;
}
ret = get_variable_value(proxysql_admin, "mysql-protocol_compression_level", compression_level, true);
if (ret == EXIT_SUCCESS) {
ok(compression_level == "8", "Run-time Compression level is set correctly: %s", compression_level.c_str());
}
else {
diag("Failed to set the Compression level is set correctly:");
goto cleanup;
}
ret = get_variable_value(proxysql_admin, "mysql-protocol_compression_level", compression_level);
if (ret == EXIT_SUCCESS) {
ok(compression_level == "8", "Compression level is set correctly: %s", compression_level.c_str());
}
else {
diag("Failed to set the Compression level is set correctly:");
goto cleanup;
}
time_proxy_compressed = calculate_query_execution_time(proxysql_compression, query);
diag("Time taken for query with proxysql with compression level 8: %ld", time_proxy_compressed);
if (time_proxy_compressed == -1) {
goto cleanup;
}
set_admin_global_variable(proxysql_admin, "mysql-protocol_compression_level", "3");
if (mysql_query(proxysql_admin, "load mysql variables to runtime")) {
diag("Failed to load mysql variables to runtime.");
goto cleanup;
}
ret = get_variable_value(proxysql_admin, "mysql-protocol_compression_level", compression_level, true);
if (ret == EXIT_SUCCESS) {
ok(compression_level == "3", "Run-time Compression level set correctly: %s", compression_level.c_str());
}
else {
diag("Failed to set the Compression level set correctly:");
goto cleanup;
}
ret = get_variable_value(proxysql_admin, "mysql-protocol_compression_level", compression_level);
if (ret == EXIT_SUCCESS) {
ok(compression_level == "3", "Compression level set correctly: %s", compression_level.c_str());
}
else {
diag("Failed to set the Compression level set correctly:");
goto cleanup;
}
cleanup:
// Recover default query rules
if (proxysql_admin) {
MYSQL_QUERY(proxysql_admin, "UPDATE mysql_query_rules SET active=1");
MYSQL_QUERY(proxysql_admin, "LOAD MYSQL QUERY RULES TO RUNTIME");
}
if (proxysql)
mysql_close(proxysql);
if (proxysql_compression)
mysql_close(proxysql_compression);
if (mysql_compression)
mysql_close(mysql_compression);
if (mysql)
mysql_close(mysql);
if (proxysql_admin)
mysql_close(proxysql_admin);
return exit_status();
}

@ -19,12 +19,6 @@ This TAP test:
- creates new connections
*/
inline unsigned long long monotonic_time() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000);
}
int main(int argc, char** argv) {
CommandLine cl;

@ -222,6 +222,7 @@ mysql_variables =
use_tcp_keepalive="mysql"
verbose_query_error="mysql"
wait_timeout="mysql"
compression_level="mysql"
}
mysql_users:
(

@ -35,12 +35,6 @@ inline int fastrand() {
return (g_seed>>16)&0x7FFF;
}
inline unsigned long long monotonic_time() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000);
}
void gen_random_str(char *s, const int len) {
g_seed = monotonic_time() ^ getpid() ^ pthread_self();
static const char alphanum[] =

@ -19,14 +19,6 @@
#include "utils.h"
#include "command_line.h"
unsigned long long monotonic_time() {
struct timespec ts;
//clock_gettime(CLOCK_MONOTONIC_COARSE, &ts); // this is faster, but not precise
clock_gettime(CLOCK_MONOTONIC, &ts);
return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000);
}
struct cpu_timer
{
cpu_timer() {

@ -115,13 +115,6 @@ int readTestCasesJSON(const std::string& fileName) {
return 1;
}
unsigned long long monotonic_time() {
struct timespec ts;
//clock_gettime(CLOCK_MONOTONIC_COARSE, &ts); // this is faster, but not precise
clock_gettime(CLOCK_MONOTONIC, &ts);
return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000);
}
struct cpu_timer
{
cpu_timer() {

@ -109,13 +109,6 @@ int readTestCasesJSON(const std::string& fileName) {
return 1;
}
unsigned long long monotonic_time() {
struct timespec ts;
//clock_gettime(CLOCK_MONOTONIC_COARSE, &ts); // this is faster, but not precise
clock_gettime(CLOCK_MONOTONIC, &ts);
return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000);
}
struct cpu_timer
{
cpu_timer() {

@ -34,12 +34,6 @@ inline int fastrand() {
return (g_seed>>16)&0x7FFF;
}
inline unsigned long long monotonic_time() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000);
}
void gen_random_str(char *s, const int len) {
g_seed = monotonic_time() ^ getpid() ^ pthread_self();
static const char alphanum[] =

@ -11,12 +11,6 @@
#include "command_line.h"
#include "utils.h"
inline unsigned long long monotonic_time() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000);
}
using std::string;
int main(int argc, char** argv) {

@ -24,12 +24,6 @@ const int ST = 5;
#include "modules_server_test.h"
inline unsigned long long monotonic_time() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000);
}
int benchmark_query_rules_fast_routing(CommandLine& cl, MYSQL* proxysql_admin, MYSQL* proxysql_mysql) {
std::string s;
std::pair<std::string, int> host_port {};

@ -11,13 +11,6 @@
#include "command_line.h"
#include "utils.h"
inline unsigned long long monotonic_time() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000);
}
int main(int argc, char** argv) {
CommandLine cl;

Loading…
Cancel
Save