Merge pull request #5458 from sysown/copilot/support-zstd-compression

Add MySQL protocol ZSTD compression support
pull/5465/head
René Cannaò 1 month ago committed by GitHub
commit 540fa174b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -109,7 +109,9 @@ class MyProt_tmp_auth_vars {
uint32_t capabilities = 0;
uint32_t max_pkt;
uint32_t pass_len;
uint8_t zstd_compression_level = 0;
bool use_ssl = false;
bool use_zstd_compression = false;
enum proxysql_session_type session_type;
};

@ -716,7 +716,7 @@ class MySQL_Threads_Handler
~MySQL_Threads_Handler();
char *get_variable_string(char *name);
uint16_t get_variable_uint16(char *name);
uint32_t get_variable_uint32(char *name);
int get_variable_int(const char *name);
void print_version();
void init(unsigned int num=0, size_t stack=0);

@ -83,6 +83,7 @@ class MySQL_Connection {
uint32_t server_capabilities;
uint32_t client_flag;
unsigned int compression_min_length;
uint8_t zstd_compression_level;
char *init_connect;
bool init_connect_sent;
char * session_track_gtids;
@ -93,6 +94,7 @@ class MySQL_Connection {
uint8_t protocol_version;
int8_t last_set_autocommit;
bool autocommit;
bool compression_zstd;
bool no_backslash_escapes;
} options;

@ -54,6 +54,18 @@
#include "mariadb_com.h"
#include "proxysql_mem.h"
#if !defined(CLIENT_ZSTD_COMPRESSION_ALGORITHM) && defined(CLIENT_ZSTD_COMPRESSION)
#define CLIENT_ZSTD_COMPRESSION_ALGORITHM CLIENT_ZSTD_COMPRESSION
#endif
#if !defined(CLIENT_ZSTD_COMPRESSION_ALGORITHM)
#define CLIENT_ZSTD_COMPRESSION_ALGORITHM 0
#endif
#if !defined(CLIENT_ZSTD_COMPRESSION)
#define CLIENT_ZSTD_COMPRESSION CLIENT_ZSTD_COMPRESSION_ALGORITHM
#endif
#include "proxysql_structs.h"
#include "proxysql_debug.h"
#include "proxysql_macros.h"

@ -15,6 +15,7 @@ using json = nlohmann::json;
#include "MySQL_Variables.h"
#include <sstream>
#include <zstd.h>
//#include <ma_global.h>
@ -1072,8 +1073,10 @@ bool MySQL_Protocol::generate_pkt_initial_handshake(bool send, void **ptr, unsig
_ptr[l]=0x00; l+=1; //0x00
if (mysql_thread___have_compress) {
mysql_thread___server_capabilities |= CLIENT_COMPRESS;
mysql_thread___server_capabilities |= CLIENT_ZSTD_COMPRESSION_ALGORITHM;
} else {
mysql_thread___server_capabilities &= ~CLIENT_COMPRESS;
mysql_thread___server_capabilities &= ~CLIENT_ZSTD_COMPRESSION_ALGORITHM;
}
if (mysql_thread___have_ssl==true || mysql_thread___default_authentication_plugin_int==2) {
// we enable SSL for client connections for either of these 2 conditions:
@ -1090,8 +1093,14 @@ bool MySQL_Protocol::generate_pkt_initial_handshake(bool send, void **ptr, unsig
} else {
mysql_thread___server_capabilities &= ~CLIENT_DEPRECATE_EOF;
}
(*myds)->myconn->options.server_capabilities=mysql_thread___server_capabilities;
memcpy(_ptr+l,&mysql_thread___server_capabilities, sizeof(mysql_thread___server_capabilities)/2); l+=sizeof(mysql_thread___server_capabilities)/2;
uint32_t server_capabilities = mysql_thread___server_capabilities;
if (deprecate_eof_active && mysql_thread___enable_client_deprecate_eof) {
server_capabilities |= CLIENT_DEPRECATE_EOF;
} else {
server_capabilities &= ~CLIENT_DEPRECATE_EOF;
}
(*myds)->myconn->options.server_capabilities=server_capabilities;
memcpy(_ptr+l,&server_capabilities, sizeof(server_capabilities)/2); l+=sizeof(server_capabilities)/2;
const MARIADB_CHARSET_INFO *ci = NULL;
ci = proxysql_find_charset_collate(mysql_thread___default_variables[SQL_COLLATION_CONNECTION]);
if (!ci) {
@ -1104,18 +1113,8 @@ bool MySQL_Protocol::generate_pkt_initial_handshake(bool send, void **ptr, unsig
uint8_t uint8_charset = ci->nr & 255;
memcpy(_ptr+l,&uint8_charset, sizeof(uint8_charset)); l+=sizeof(uint8_charset);
memcpy(_ptr+l,&server_status, sizeof(server_status)); l+=sizeof(server_status);
uint32_t extended_capabilities = CLIENT_MULTI_RESULTS | CLIENT_MULTI_STATEMENTS | CLIENT_PS_MULTI_RESULTS |
CLIENT_PLUGIN_AUTH | CLIENT_SESSION_TRACKING | CLIENT_REMEMBER_OPTIONS;
// we conditionally reply the client specifying in 'server_capabilities' that
// 'CLIENT_DEPRECATE_EOF' is available if explicitly enabled by 'mysql-enable_client_deprecate_eof'
// variable. This is the first step of ensuring that client connections doesn't
// enable 'CLIENT_DEPRECATE_EOF' unless explicitly stated by 'mysql-enable_client_deprecate_eof'.
// Second step occurs during client handshake response (process_pkt_handshake_response).
if (deprecate_eof_active && mysql_thread___enable_client_deprecate_eof) {
extended_capabilities |= CLIENT_DEPRECATE_EOF;
}
// Copy the 'capability_flags_2'
uint16_t upper_word = static_cast<uint16_t>(extended_capabilities >> 16);
// Copy the upper 16 capability bits from the effective server capability mask.
uint16_t upper_word = static_cast<uint16_t>(server_capabilities >> 16);
memcpy(_ptr+l, static_cast<void*>(&upper_word), sizeof(upper_word)); l += sizeof(upper_word);
// Copy the 'auth_plugin_data_len'. Hardcoded due to 'CLIENT_PLUGIN_AUTH' always enabled and reported
// as 'mysql_native_password'.
@ -1682,9 +1681,50 @@ bool MySQL_Protocol::PPHR_2(unsigned char *pkt, unsigned int len, bool& ret, MyP
vars1.pass_len--; // remove the extra 0 if present
}
}
if (vars1._ptr+len > pkt) {
unsigned char *extra_pkt = pkt;
if (vars1._ptr+len > extra_pkt) {
if (vars1.capabilities & CLIENT_PLUGIN_AUTH) {
vars1.auth_plugin = pkt;
unsigned char *packet_end = vars1._ptr + len;
if (extra_pkt >= packet_end) {
ret = false;
proxy_debug(PROXY_DEBUG_MYSQL_AUTH, 5, "Session=%p , DS=%p , user='%s' . malformed auth plugin offset in handshake response\n", (*myds), (*myds)->sess, vars1.user);
return false;
}
const size_t extra_len = packet_end - extra_pkt;
const size_t auth_plugin_len = strnlen(reinterpret_cast<const char*>(extra_pkt), extra_len);
if (auth_plugin_len == extra_len) {
ret = false;
proxy_debug(PROXY_DEBUG_MYSQL_AUTH, 5, "Session=%p , DS=%p , user='%s' . malformed auth plugin in handshake response\n", (*myds), (*myds)->sess, vars1.user);
return false;
}
vars1.auth_plugin = extra_pkt;
extra_pkt += auth_plugin_len + 1;
}
const unsigned char* packet_end = vars1._ptr + len;
const bool has_zstd_level = vars1.capabilities & CLIENT_ZSTD_COMPRESSION_ALGORITHM;
const unsigned char* connect_attrs_end = packet_end - (has_zstd_level ? 1 : 0);
if ((vars1.capabilities & CLIENT_CONNECT_ATTRS) && extra_pkt < connect_attrs_end) {
uint64_t attrs_len = 0;
const int attrs_len_enc = mysql_decode_length_ll(extra_pkt, &attrs_len);
if (
attrs_len_enc <= 0
||
static_cast<uint64_t>(connect_attrs_end - extra_pkt) < static_cast<uint64_t>(attrs_len_enc) + attrs_len
) {
ret = false;
proxy_debug(PROXY_DEBUG_MYSQL_AUTH, 5, "Session=%p , DS=%p , user='%s' . malformed connect attrs in handshake response\n", (*myds), (*myds)->sess, vars1.user);
return false;
}
extra_pkt += attrs_len_enc + attrs_len;
}
if (has_zstd_level) {
if (packet_end <= extra_pkt) {
ret = false;
proxy_debug(PROXY_DEBUG_MYSQL_AUTH, 5, "Session=%p , DS=%p , user='%s' . missing zstd compression level in handshake response\n", (*myds), (*myds)->sess, vars1.user);
return false;
}
vars1.use_zstd_compression = true;
vars1.zstd_compression_level = *extra_pkt;
}
}
return true;
@ -2161,12 +2201,32 @@ void MySQL_Protocol::PPHR_SetConnAttrs(MyProt_tmp_auth_vars& vars1, account_deta
mysql_variables.client_set_value(sess, SQL_CHARACTER_SET_CONNECTION, ss.str().c_str());
mysql_variables.client_set_value(sess, SQL_COLLATION_CONNECTION, ss.str().c_str());
// enable compression
if (vars1.capabilities & CLIENT_COMPRESS) {
if (myconn->options.server_capabilities & CLIENT_COMPRESS) {
myconn->options.compression_min_length=50;
//myconn->set_status_compression(true); // don't enable this here. It needs to be enabled after the OK is sent
}
// Honor an explicit zstd negotiation from the client before falling back to legacy zlib compression.
const bool use_zstd_compression =
vars1.use_zstd_compression
&&
(vars1.capabilities & CLIENT_ZSTD_COMPRESSION_ALGORITHM)
&&
(myconn->options.server_capabilities & CLIENT_ZSTD_COMPRESSION_ALGORITHM);
const bool use_zlib_compression =
!use_zstd_compression
&&
(vars1.capabilities & CLIENT_COMPRESS)
&&
(myconn->options.server_capabilities & CLIENT_COMPRESS);
const uint8_t zstd_compression_level =
(vars1.zstd_compression_level > 0 && vars1.zstd_compression_level <= ZSTD_maxCLevel())
? vars1.zstd_compression_level
: static_cast<uint8_t>(std::min<int>(ZSTD_maxCLevel(), std::max<int>(1, mysql_thread___protocol_compression_level)));
myconn->options.compression_zstd = false;
myconn->options.zstd_compression_level = 0;
if (use_zlib_compression || use_zstd_compression) {
myconn->options.compression_min_length=50;
myconn->options.compression_zstd = use_zstd_compression;
myconn->options.zstd_compression_level = use_zstd_compression ? zstd_compression_level : 0;
//myconn->set_status_compression(true); // don't enable this here. It needs to be enabled after the OK is sent
}
if (attr1.use_ssl==true) {
(*myds)->sess->use_ssl = true;

@ -1678,7 +1678,7 @@ char * MySQL_Threads_Handler::get_variable_string(char *name) {
// LCOV_EXCL_STOP
}
uint16_t MySQL_Threads_Handler::get_variable_uint16(char *name) {
uint32_t MySQL_Threads_Handler::get_variable_uint32(char *name) {
if (!strcasecmp(name,"server_capabilities")) return variables.server_capabilities;
// LCOV_EXCL_START
proxy_error("Not existing variable: %s\n", name); assert(0);
@ -2434,11 +2434,13 @@ bool MySQL_Threads_Handler::set_variable(char *name, const char *value) { // thi
if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) {
variables.have_compress=true;
variables.server_capabilities |= CLIENT_COMPRESS;
variables.server_capabilities |= CLIENT_ZSTD_COMPRESSION_ALGORITHM;
return true;
}
if (strcasecmp(value,"false")==0 || strcasecmp(value,"0")==0) {
variables.have_compress=false;
variables.server_capabilities &= ~CLIENT_COMPRESS;
variables.server_capabilities &= ~CLIENT_ZSTD_COMPRESSION_ALGORITHM;
return true;
}
return false;
@ -4704,7 +4706,7 @@ void MySQL_Thread::refresh_variables() {
REFRESH_VARIABLE_CHAR(proxy_protocol_networks);
REFRESH_VARIABLE_CHAR(default_authentication_plugin);
mysql_thread___default_authentication_plugin_int = GloMTH->variables.default_authentication_plugin_int;
mysql_thread___server_capabilities=GloMTH->get_variable_uint16((char *)"server_capabilities");
mysql_thread___server_capabilities=GloMTH->get_variable_uint32((char *)"server_capabilities");
REFRESH_VARIABLE_INT(handle_unknown_charset);
REFRESH_VARIABLE_INT(poll_timeout);
REFRESH_VARIABLE_INT(poll_timeout_on_failure);

@ -444,9 +444,11 @@ MySQL_Connection::MySQL_Connection() {
options.client_flag = 0;
options.compression_min_length=0;
options.zstd_compression_level=0;
options.server_version=NULL;
options.last_set_autocommit=-1; // -1 = never set
options.autocommit=true;
options.compression_zstd=false;
options.no_backslash_escapes=false;
options.init_connect=NULL;
options.init_connect_sent=false;
@ -863,8 +865,10 @@ void MySQL_Connection::connect_start_SetCharset() {
void MySQL_Connection::connect_start_SetClientFlag(unsigned long& client_flags) {
client_flags = 0;
if (parent->compression)
if (parent->compression) {
client_flags |= CLIENT_COMPRESS;
client_flags |= CLIENT_ZSTD_COMPRESSION_ALGORITHM;
}
if (myds) {
if (myds->sess) {
@ -911,9 +915,13 @@ void MySQL_Connection::connect_start_SetClientFlag(unsigned long& client_flags)
// In case of 'fast_forward', we only enable compression if both, client and backend matches. Otherwise,
// we honor the behavior of a regular connection of when a connection doesn't agree on using compression
// during handshake, and we fallback to an uncompressed connection.
client_flags &= ~(CLIENT_COMPRESS); // we disable it by default
if (c->options.client_flag & CLIENT_COMPRESS) {
if (c->options.server_capabilities & CLIENT_COMPRESS) {
client_flags &= ~(CLIENT_COMPRESS | CLIENT_ZSTD_COMPRESSION_ALGORITHM); // we disable it by default
if (c->options.compression_min_length > 0) {
if (c->options.compression_zstd) {
if (c->options.server_capabilities & CLIENT_ZSTD_COMPRESSION_ALGORITHM) {
client_flags |= CLIENT_ZSTD_COMPRESSION_ALGORITHM;
}
} else if (c->options.server_capabilities & CLIENT_COMPRESS) {
client_flags |= CLIENT_COMPRESS;
}
}
@ -999,7 +1007,7 @@ void MySQL_Connection::connect_start() {
char* host_ip = connect_start_DNS_lookup();
async_exit_status=mysql_real_connect_start(&ret_mysql, mysql, host_ip, userinfo->username, auth_password, userinfo->schemaname, parent->port, NULL, client_flags);
} else {
client_flags &= ~(CLIENT_COMPRESS); // disabling compression for connections made via Unix socket
client_flags &= ~(CLIENT_COMPRESS | CLIENT_ZSTD_COMPRESSION_ALGORITHM); // disabling compression for connections made via Unix socket
async_exit_status=mysql_real_connect_start(&ret_mysql, mysql, "localhost", userinfo->username, auth_password, userinfo->schemaname, parent->port, parent->address, client_flags);
}
fd=mysql_get_socket(mysql);

@ -5,6 +5,7 @@ using json = nlohmann::json;
#include "proxysql.h"
#include "cpp.h"
#include <zlib.h>
#include <zstd.h>
#ifndef UNIX_PATH_MAX
#define UNIX_PATH_MAX 108
#endif
@ -19,6 +20,68 @@ using json = nlohmann::json;
extern MySQL_Threads_Handler *GloMTH;
static inline bool use_zstd_compression(const MySQL_Connection* myconn) {
return myconn && myconn->options.compression_zstd;
}
static int get_zstd_compression_level(const MySQL_Connection* myconn) {
const int zstd_level = myconn ? myconn->options.zstd_compression_level : 0;
if (zstd_level > 0 && zstd_level <= ZSTD_maxCLevel()) {
return zstd_level;
}
if (mysql_thread___protocol_compression_level > 0 && mysql_thread___protocol_compression_level <= ZSTD_maxCLevel()) {
return mysql_thread___protocol_compression_level;
}
return ZSTD_CLEVEL_DEFAULT;
}
static bool decompress_mysql_payload(
const MySQL_Connection* myconn, Bytef* dest, uLongf destLen, const unsigned char* source, size_t sourceLen
) {
if (use_zstd_compression(myconn)) {
const size_t rc = ZSTD_decompress(dest, destLen, source, sourceLen);
return !ZSTD_isError(rc) && rc == destLen;
}
const int rc = uncompress(dest, &destLen, source, sourceLen);
return rc == Z_OK;
}
static bool fallback_to_uncompressed_mysql_payload(
Bytef* dest, uLongf destLen, unsigned int& datalength, const unsigned char* source, size_t sourceLen,
const unsigned char* packet
) {
if (sourceLen > destLen || sourceLen < 3) {
return false;
}
memcpy(dest, source, sourceLen);
datalength = sourceLen;
return packet[9] == 0 && packet[8] == 0 && packet[7] == sourceLen;
}
static bool compress_mysql_payload(
const MySQL_Connection* myconn, Bytef* dest, size_t& destLen, const unsigned char* source, size_t sourceLen
) {
if (use_zstd_compression(myconn)) {
const size_t rc = ZSTD_compress(dest, destLen, source, sourceLen, get_zstd_compression_level(myconn));
if (ZSTD_isError(rc)) {
return false;
}
destLen = rc;
return true;
}
uLongf zlib_dest_len = destLen;
const int rc = compress2(dest, &zlib_dest_len, source, sourceLen, mysql_thread___protocol_compression_level);
if (rc != Z_OK) {
return false;
}
destLen = zlib_dest_len;
return true;
}
#ifdef DEBUG
static void __dump_pkt(const char *func, unsigned char *_ptr, unsigned int len) {
@ -1300,36 +1363,19 @@ int MySQL_Data_Stream::buffer2array() {
destLen=payload_length;
//dest=(Bytef *)l_alloc(destLen);
dest=(Bytef *)malloc(destLen);
int rc=uncompress(dest, &destLen, _ptr, queueIN.pkt.size-7);
if (rc!=Z_OK) {
const bool decompressed = decompress_mysql_payload(myconn, dest, destLen, _ptr, queueIN.pkt.size-7);
if (!decompressed) {
// for some reason, uncompress failed
// accoding to debugging on #1410 , it seems some library may send uncompress data claiming it is compressed
// we try to assume it is not compressed, and we do some sanity check
memcpy(dest, _ptr, queueIN.pkt.size-7);
datalength=queueIN.pkt.size-7;
// some sanity check now
unsigned char _u;
bool sanity_check = false;
_u = *(u+9);
// 2nd and 3rd bytes are 0
if (_u == 0) {
_u = *(u+8);
if (_u == 0) {
_u = *(u+7);
// 1st byte = size - 7
unsigned int _size = _u ;
if (queueIN.pkt.size-7 == _size) {
sanity_check = true;
}
}
}
if (sanity_check == false) {
if (!fallback_to_uncompressed_mysql_payload(dest, destLen, datalength, _ptr, queueIN.pkt.size-7, u)) {
proxy_error("Unable to uncompress a compressed packet\n");
shut_soft();
return ret;
}
} else {
datalength=payload_length;
}
datalength=payload_length;
// change _ptr to the new buffer
_ptr=dest;
} else {
@ -1417,7 +1463,7 @@ void MySQL_Data_Stream::generate_compressed_packet() {
// this worked in the past . it applies for small packets
uLong sourceLen=total_size;
Bytef *source=(Bytef *)l_alloc(total_size);
uLongf destLen=total_size*120/100+12;
size_t destLen=use_zstd_compression(myconn) ? ZSTD_compressBound(total_size) : total_size*120/100+12;
Bytef *dest=(Bytef *)malloc(destLen);
i=0;
total_size=0;
@ -1428,8 +1474,8 @@ void MySQL_Data_Stream::generate_compressed_packet() {
total_size+=p2.size;
l_free(p2.size,p2.ptr);
}
int rc=compress2(dest, &destLen, source, sourceLen, mysql_thread___protocol_compression_level);
assert(rc==Z_OK);
const bool compressed = compress_mysql_payload(myconn, dest, destLen, source, sourceLen);
assert(compressed);
l_free(total_size, source);
queueOUT.pkt.size=destLen+7;
queueOUT.pkt.ptr=l_alloc(queueOUT.pkt.size);
@ -1448,22 +1494,21 @@ void MySQL_Data_Stream::generate_compressed_packet() {
unsigned int len1=MAX_COMPRESSED_PACKET_SIZE/2;
unsigned int len2=p2.size-len1;
uLongf destLen1;
uLongf destLen2;
size_t destLen1;
size_t destLen2;
Bytef *dest1;
Bytef *dest2;
int rc;
mysql_hdr hdr;
destLen1=len1*120/100+12;
destLen1=use_zstd_compression(myconn) ? ZSTD_compressBound(len1) : len1*120/100+12;
dest1=(Bytef *)malloc(destLen1+7);
destLen2=len2*120/100+12;
destLen2=use_zstd_compression(myconn) ? ZSTD_compressBound(len2) : len2*120/100+12;
dest2=(Bytef *)malloc(destLen2+7);
rc=compress2(dest1+7, &destLen1, (const unsigned char *)p2.ptr, len1, mysql_thread___protocol_compression_level);
assert(rc==Z_OK);
rc=compress2(dest2+7, &destLen2, (const unsigned char *)p2.ptr+len1, len2, mysql_thread___protocol_compression_level);
assert(rc==Z_OK);
const bool compressed1 = compress_mysql_payload(myconn, dest1+7, destLen1, (const unsigned char *)p2.ptr, len1);
assert(compressed1);
const bool compressed2 = compress_mysql_payload(myconn, dest2+7, destLen2, (const unsigned char *)p2.ptr+len1, len2);
assert(compressed2);
hdr.pkt_length=destLen1;
hdr.pkt_id=++myconn->compression_pkt_id;
@ -1532,13 +1577,15 @@ int MySQL_Data_Stream::array2buffer() {
if (DSS==STATE_CLIENT_AUTH_OK && idx == PSarrayOUT->len) {
DSS=STATE_SLEEP;
// enable compression
if (myconn->options.server_capabilities & CLIENT_COMPRESS) {
if (myconn->options.server_capabilities & (CLIENT_COMPRESS | CLIENT_ZSTD_COMPRESSION_ALGORITHM)) {
if (myconn->options.compression_min_length) {
myconn->set_status(true, STATUS_MYSQL_CONNECTION_COMPRESSION);
}
} else {
//explicitly disable compression
myconn->options.compression_min_length=0;
myconn->options.compression_zstd=false;
myconn->options.zstd_compression_level=0;
myconn->set_status(false, STATUS_MYSQL_CONNECTION_COMPRESSION);
}
}

@ -126,15 +126,15 @@ ifeq ($(UNAME_S),Linux)
STATICMYLIBS += -lcoredumper
endif
MYLIBS := -Wl,--export-dynamic $(STATICMYLIBS) -Wl,-Bdynamic -lgnutls -lpthread -lssl -lcrypto -lm -lz -lrt -lprometheus-cpp-pull -lprometheus-cpp-core -luuid $(EXTRALINK)
MYLIBS := -Wl,--export-dynamic $(STATICMYLIBS) -Wl,-Bdynamic -lgnutls -lpthread -lssl -lcrypto -lm -lz -lzstd -lrt -lprometheus-cpp-pull -lprometheus-cpp-core -luuid $(EXTRALINK)
ifeq ($(DISTRO),almalinux)
ifeq ($(CENTOSVER),8)
MYLIBS := -Wl,--export-dynamic $(STATICMYLIBS) -Wl,-Bdynamic -lgnutls -lpthread $(LIB_SSL_PATH) $(LIB_CRYPTO_PATH) -lm -lz -lrt -lprometheus-cpp-pull -lprometheus-cpp-core -luuid $(EXTRALINK)
MYLIBS := -Wl,--export-dynamic $(STATICMYLIBS) -Wl,-Bdynamic -lgnutls -lpthread $(LIB_SSL_PATH) $(LIB_CRYPTO_PATH) -lm -lz -lzstd -lrt -lprometheus-cpp-pull -lprometheus-cpp-core -luuid $(EXTRALINK)
endif
endif
ifeq ($(UNAME_S),Darwin)
MYLIBS :=-lre2 -lmariadbclient -lssl -lcrypto -lpthread -lm -lz -liconv -lgnutls -lprometheus-cpp-pull -lprometheus-cpp-core -luuid
MYLIBS :=-lre2 -lmariadbclient -lssl -lcrypto -lpthread -lm -lz -lzstd -liconv -lgnutls -lprometheus-cpp-pull -lprometheus-cpp-core -luuid
else
CURL_DIR := $(DEPS_PATH)/curl/curl
IDIRS += -L$(CURL_DIR)/include
@ -212,4 +212,3 @@ default: $(EXECUTABLE)
clean:
rm -rf *.pid $(ODIR)/*.o $(ODIR)/*.gcno $(ODIR)/*.gcda *~ core perf.data* heaptrack.proxysql.* $(EXECUTABLE) $(EXECUTABLE).sha1 $(ODIR)

@ -9,6 +9,15 @@ export WORKSPACE="${REPO_ROOT}"
# Default INFRA_ID if not provided
export INFRA_ID="${INFRA_ID:-dev-$USER}"
export INFRA="${INFRA:-${INFRA_TYPE}}"
expand_infra_list() {
local list_path="$1"
while IFS= read -r infra_name; do
[ -n "${infra_name}" ] || continue
eval "printf '%s\n' \"${infra_name}\""
done < "${list_path}"
}
if [ -z "${TAP_GROUP}" ]; then
echo "ERROR: TAP_GROUP is not set."
@ -27,7 +36,7 @@ fi
INFRAS=""
if [ -n "${LST_PATH}" ]; then
INFRAS=$(cat "${LST_PATH}")
INFRAS=$(expand_infra_list "${LST_PATH}")
echo ">>> Found infrastructure requirements for group '${TAP_GROUP}' in '${LST_PATH}'"
else
if [ -n "${INFRA_TYPE}" ]; then

@ -9,6 +9,15 @@ export WORKSPACE="${REPO_ROOT}"
# Default INFRA_ID if not provided
export INFRA_ID="${INFRA_ID:-dev-$USER}"
export INFRA="${INFRA:-${INFRA_TYPE}}"
expand_infra_list() {
local list_path="$1"
while IFS= read -r infra_name; do
[ -n "${infra_name}" ] || continue
eval "printf '%s\n' \"${infra_name}\""
done < "${list_path}"
}
# 1. Determine Required Infras
INFRAS_TO_CHECK=""
@ -16,9 +25,9 @@ BASE_GROUP="${TAP_GROUP%%-g[0-9]*}" # Strip -g1, -g2 etc.
if [ -n "${TAP_GROUP}" ]; then
if [ -f "${WORKSPACE}/test/tap/groups/${TAP_GROUP}/infras.lst" ]; then
INFRAS_TO_CHECK=$(cat "${WORKSPACE}/test/tap/groups/${TAP_GROUP}/infras.lst")
INFRAS_TO_CHECK=$(expand_infra_list "${WORKSPACE}/test/tap/groups/${TAP_GROUP}/infras.lst")
elif [ -f "${WORKSPACE}/test/tap/groups/${BASE_GROUP}/infras.lst" ]; then
INFRAS_TO_CHECK=$(cat "${WORKSPACE}/test/tap/groups/${BASE_GROUP}/infras.lst")
INFRAS_TO_CHECK=$(expand_infra_list "${WORKSPACE}/test/tap/groups/${BASE_GROUP}/infras.lst")
fi
fi

@ -40,7 +40,7 @@ relay_log_recovery=ON
#source_info_repository=TABLE
read_only=0
log-error=/var/log/mysql/error.log
log-error=/var/lib/mysql/error.log
#log_warnings=2
#ssl-ca=/var/lib/mysql/ca.pem

@ -40,7 +40,7 @@ relay_log_recovery=ON
#source_info_repository=TABLE
read_only=1
log-error=/var/log/mysql/error.log
log-error=/var/lib/mysql/error.log
#log_warnings=2
#ssl-ca=/var/lib/mysql/ca.pem

@ -40,7 +40,7 @@ relay_log_recovery=ON
#source_info_repository=TABLE
read_only=1
log-error=/var/log/mysql/error.log
log-error=/var/lib/mysql/error.log
#log_warnings=2
#ssl-ca=/var/lib/mysql/ca.pem

@ -92,11 +92,12 @@ for RAW_PATH in ${MOUNTED_PATHS}; do
fi
done
# 3. Inject dynamic variables into Orchestrator configs
# 3. Inject dynamic values into Orchestrator configs
if [ -d "./conf/orchestrator" ]; then
echo "Patching Orchestrator configurations..."
find ./conf/orchestrator -name "orchestrator.conf.json" -exec sed -i "s/\"MySQLTopologyPassword\": \".*\"/\"MySQLTopologyPassword\": \"${ROOT_PASSWORD}\"/g" {} +
find ./conf/orchestrator -name "orchestrator.conf.json" -exec sed -i "s/\${INFRA}/${INFRA}/g" {} +
echo "Injecting ROOT_PASSWORD and INFRA into Orchestrator configurations..."
find ./conf/orchestrator -name "orchestrator.conf.json" -exec sed -i \
-e "s/\"MySQLTopologyPassword\": \".*\"/\"MySQLTopologyPassword\": \"${ROOT_PASSWORD}\"/g" \
-e "s/\${INFRA}/${INFRA}/g" {} +
fi
# 4. TRANSIENT SSL SETUP (Avoiding repo permission changes)

@ -64,7 +64,7 @@ OBJ := $(PROXYSQL_PATH)/src/obj/proxysql_global.o $(PROXYSQL_PATH)/src/obj/main.
MYLIBS_DYNAMIC_PART := -Wl,--export-dynamic -Wl,-Bdynamic -lgnutls -lcpp_dotenv -lcurl -lssl -lcrypto -luuid
MYLIBS_STATIC_PART := -Wl,-Bstatic -lconfig -lproxysql -ldaemon -lconfig++ -lre2 -lpcrecpp -lpcre -lmariadbclient -lhttpserver -lmicrohttpd -linjection -lev -lprometheus-cpp-pull -lprometheus-cpp-core
MYLIBS_PG_PART := -Wl,-Bstatic -lpq -lpgcommon -lpgport
MYLIBS_LAST_PART := -Wl,-Bdynamic -lpthread -lm -lz -lrt -ldl $(EXTRALINK)
MYLIBS_LAST_PART := -Wl,-Bdynamic -lpthread -lm -lz -lzstd -lrt -ldl $(EXTRALINK)
MYLIBS := -Wl,-Bdynamic -ltap $(MYLIBS_DYNAMIC_PART) $(MYLIBS_STATIC_PART) $(MYLIBS_PG_PART) $(MYLIBS_LAST_PART)
#MYLIBS_PG := $(MYLIBS_DYNAMIC_PART) $(MYLIBS_STATIC_PART) $(MYLIBS_PG_PART) $(MYLIBS_LAST_PART)
#MYLIBS := -Wl,--export-dynamic -Wl,-Bdynamic -lssl -lcrypto -lgnutls -ltap -lcpp_dotenv -Wl,-Bstatic -lconfig -lproxysql -ldaemon -lconfig++ -lre2 -lpcrecpp -lpcre -lmariadbclient -lhttpserver -lmicrohttpd -linjection -lev -lprometheus-cpp-pull -lprometheus-cpp-core -luuid -Wl,-Bdynamic -lpthread -lm -lz -lrt -ldl $(EXTRALINK)
@ -93,7 +93,7 @@ default: all
CUSTOMARGS := -I$(TAP_IDIR) -I$(CURL_IDIR) -I$(SQLITE3_IDIR) -I$(PROXYSQL_IDIR) -I$(JSON_IDIR) -I$(RE2_IDIR) -I$(SSL_IDIR)
CUSTOMARGS += -L$(TAP_LDIR) -L$(CURL_LDIR) -L$(RE2_LDIR) -L$(SSL_LDIR) -L$(POSTGRESQL_LDIR)
CUSTOMARGS += -Wl,-Bdynamic -ltap -lcpp_dotenv -lcurl -lssl -lcrypto -lre2 -lpthread -lz -ldl -lpq $(LWGCOV)
CUSTOMARGS += -Wl,-Bdynamic -ltap -lcpp_dotenv -lcurl -lssl -lcrypto -lre2 -lpthread -lz -lzstd -ldl -lpq $(LWGCOV)
.PHONY: all
all: tests

@ -24,9 +24,18 @@ int main(int argc, char** argv) {
return -1;
}
plan(1);
const std::string mysql_client = "mysql";
std::string help_output {};
const int help_res = execvp(mysql_client, { "mysql", "--help" }, help_output);
const bool mysql_supports_zstd =
help_res == 0
&&
help_output.find("compression-algorithms") != std::string::npos
&&
help_output.find("zstd-compression-level") != std::string::npos;
plan(mysql_supports_zstd ? 2 : 1);
const std::string name = std::string("-u") + cl.username;
const std::string pass = std::string("-p") + cl.password;
const std::string tg_port = std::string("-P") + std::to_string(cl.port);
@ -37,5 +46,17 @@ int main(int argc, char** argv) {
int query_res = execvp(mysql_client, cargs, result);
ok(query_res == 0 && result != "", "Compressed query should be executed correctly.");
if (mysql_supports_zstd) {
const std::vector<const char*> zstd_args = {
"mysql", name.c_str(), pass.c_str(), "-h", cl.host, tg_port.c_str(),
"--compression-algorithms=zstd", "--zstd-compression-level=3", "-e", "select 1"
};
result.clear();
query_res = execvp(mysql_client, zstd_args, result);
ok(query_res == 0 && result != "", "ZSTD compressed query should be executed correctly.");
} else {
diag("Skipping ZSTD CLI coverage because the local mysql client does not expose zstd compression options.");
}
return exit_status();
}

Loading…
Cancel
Save