Refactored and Optimized Query Cache with PgSQL Support

* Added Query Cache Support for PgSQL:

	* Introduced new variables for PgSQL Query Cache management:
		* pgsql-query_cache_size_MB
		* pgsql-query_cache_soft_ttl_pct
		* pgsql-query_cache_stores_empty_result
		* pgsql-query_cache_handle_warnings (not yet utilized)
	* Overwriting Transaction Status:
		* Sending current transaction state to the client when a result is
 returned from the query cache.
		* This currently does not differentiate between normal and error
transaction states—both are treated as active transaction states.

* Cache Lifetime Management:
	Replaced manual reference counting with std::shared_ptr to efficiently manage the lifecycle of cache entries,
ensuring proper memory deallocation and reducing complexity.

* Unified Purging Logic:
	Refactored the query cache purging logic to use a single thread for both
MySQL and PgSQL cache entries.

* Code Cleanup:
	Removed unnecessary methods and data members to streamline the codebase
and improve maintainability.

* Performance Optimization:
	Improved memory management, reducing memory footprint and enhancing
performance.
pull/4703/head
Rahim Kanji 2 years ago
parent 391e4867fe
commit 720441af20

@ -0,0 +1,26 @@
#ifndef __CLASS_MYSQL_QUERY_CACHE_H
#define __CLASS_MYSQL_QUERY_CACHE_H
#include "proxysql.h"
#include "cpp.h"
#include "query_cache.hpp"
typedef struct _MySQL_QC_entry : public QC_entry_t {
uint32_t column_eof_pkt_offset;
uint32_t row_eof_pkt_offset;
uint32_t ok_pkt_offset;
} MySQL_QC_entry_t;
class MySQL_Query_Cache : public Query_Cache<MySQL_Query_Cache> {
public:
MySQL_Query_Cache() = default;
~MySQL_Query_Cache() = default;
bool set(uint64_t user_hash, const unsigned char* kp, uint32_t kl, unsigned char* vp, uint32_t vl,
uint64_t create_ms, uint64_t curtime_ms, uint64_t expire_ms, bool deprecate_eof_active);
unsigned char* get(uint64_t user_hash, const unsigned char* kp, const uint32_t kl, uint32_t* lv,
uint64_t curtime_ms, uint64_t cache_ttl, bool deprecate_eof_active);
//void* purgeHash_thread(void*);
};
#endif /* __CLASS_MYSQL_QUERY_CACHE_H */

@ -108,8 +108,8 @@ public:
FixedSizeQueue data_packets_history_IN;
FixedSizeQueue data_packets_history_OUT;
//PtrSizeArray *PSarrayOUTpending;
PtrSizeArray* resultset;
unsigned int resultset_length;
//PtrSizeArray* resultset;
//unsigned int resultset_length;
ProxySQL_Poll<PgSQL_Data_Stream>* mypolls;
//int listener;
@ -201,8 +201,9 @@ public:
void check_data_flow();
int assign_fd_from_mysql_conn();
unsigned char* resultset2buffer(bool);
void buffer2resultset(unsigned char*, unsigned int);
static unsigned char* copy_array_to_buffer(PtrSizeArray* resultset, size_t resultset_length, bool del);
static void copy_buffer_to_resultset(PtrSizeArray* resultset, unsigned char* ptr, uint64_t size,
char current_transaction_state);
// safe way to attach a PgSQL Connection
void attach_connection(PgSQL_Connection* mc) {

@ -0,0 +1,22 @@
#ifndef __CLASS_PGSQL_QUERY_CACHE_H
#define __CLASS_PGSQL_QUERY_CACHE_H
#include "proxysql.h"
#include "cpp.h"
#include "query_cache.hpp"
typedef struct _PgSQL_QC_entry : public QC_entry_t {} PgSQL_QC_entry_t;
class PgSQL_Query_Cache : public Query_Cache<PgSQL_Query_Cache> {
public:
PgSQL_Query_Cache() = default;
~PgSQL_Query_Cache() = default;
bool set(uint64_t user_hash, const unsigned char* kp, uint32_t kl, unsigned char* vp, uint32_t vl,
uint64_t create_ms, uint64_t curtime_ms, uint64_t expire_ms);
const std::shared_ptr<PgSQL_QC_entry_t> get(uint64_t user_hash, const unsigned char* kp, const uint32_t kl,
uint64_t curtime_ms, uint64_t cache_ttl);
//void* purgeHash_thread(void*);
};
#endif /* __CLASS_PGSQL_QUERY_CACHE_H */

@ -10,7 +10,7 @@
#include "PgSQL_Backend.h"
#include "ProxySQL_Poll.h"
//#include "MySQL_Data_Stream.h"
#include "query_cache.hpp"
//#include "MySQL_Query_Cache.h"
#include "mysql_connection.h"
#include "sqlite3db.h"
//#include "StatCounters.h"

@ -706,7 +706,8 @@ class SimpleKV;
class AdvancedKV;
template <class T>
class ProxySQL_Poll;
class Query_Cache;
class MySQL_Query_Cache;
class PgSQL_Query_Cache;
class MySQL_Authentication;
class MySQL_Connection;
class PgSQL_Connection;
@ -1095,6 +1096,10 @@ __thread int pgsql_thread___monitor_threads;
__thread char* pgsql_thread___monitor_username;
__thread char* pgsql_thread___monitor_password;
// PgSQL Query Cache
__thread int pgsql_thread___query_cache_size_MB;
__thread int pgsql_thread___query_cache_soft_ttl_pct;
__thread int pgsql_thread___query_cache_handle_warnings;
//---------------------------
__thread char *mysql_thread___default_schema;
@ -1382,6 +1387,10 @@ extern __thread int pgsql_thread___monitor_threads;
extern __thread char* pgsql_thread___monitor_username;
extern __thread char* pgsql_thread___monitor_password;
// PgSQL Query Cache
extern __thread int pgsql_thread___query_cache_size_MB;
extern __thread int pgsql_thread___query_cache_soft_ttl_pct;
extern __thread int pgsql_thread___query_cache_handle_warnings;
//---------------------------
extern __thread char *mysql_thread___default_schema;

@ -1,9 +1,9 @@
#ifndef __CLASS_QUERY_CACHE_H
#define __CLASS_QUERY_CACHE_H
#include "proxysql.h"
#include "cpp.h"
#include <tuple>
#include "prometheus/counter.h"
#include "prometheus/gauge.h"
#define EXPIRE_DROPIT 0
#define SHARED_QUERY_CACHE_HASH_TABLES 32
@ -13,30 +13,6 @@
#define DEFAULT_purge_threshold_pct_min 3
#define DEFAULT_purge_threshold_pct_max 90
#include "prometheus/counter.h"
#include "prometheus/gauge.h"
class KV_BtreeArray;
typedef struct __QC_entry_t QC_entry_t;
struct __QC_entry_t {
uint64_t key; // primary key
char *value; // pointer to value
KV_BtreeArray *kv; // pointer to the KV_BtreeArray where the entry is stored
QC_entry_t *self; // pointer to itself
uint32_t klen; // length of the key : FIXME: not sure if still relevant
uint32_t length; // length of the value
unsigned long long create_ms; // when the entry was created, monotonic, millisecond granularity
unsigned long long expire_ms; // when the entry will expire, monotonic , millisecond granularity
unsigned long long access_ms; // when the entry was read last , monotonic , millisecond granularity
bool refreshing; // true when a client will hit the backend to refresh the entry
uint32_t column_eof_pkt_offset = 0;
uint32_t row_eof_pkt_offset = 0;
uint32_t ok_pkt_offset = 0;
uint32_t ref_count; // reference counter
};
struct p_qc_counter {
enum metric {
query_cache_count_get = 0,
@ -65,34 +41,67 @@ struct qc_metrics_map_idx {
};
class KV_BtreeArray;
class MySQL_Query_Cache;
class PgSQL_Query_Cache;
struct _MySQL_QC_entry;
struct _PgSQL_QC_entry;
typedef struct _MySQL_QC_entry MySQL_QC_entry_t;
typedef struct _PgSQL_QC_entry PgSQL_QC_entry_t;
typedef struct _QC_entry {
uint64_t key; // primary key
unsigned char *value; // pointer to value
uint32_t length; // length of the value
uint32_t klen; // length of the key : FIXME: not sure if still relevant
uint64_t create_ms; // when the entry was created, monotonic, millisecond granularity
uint64_t expire_ms; // when the entry will expire, monotonic , millisecond granularity
uint64_t access_ms; // when the entry was read last , monotonic , millisecond granularity
bool refreshing; // true when a client will hit the backend to refresh the entry
KV_BtreeArray* kv; // pointer to the KV_BtreeArray where the entry is stored (used for troubleshooting)
//struct _QC_entry* self; // pointer to itself
} QC_entry_t;
template <typename QC_DERIVED>
class Query_Cache {
private:
KV_BtreeArray * KVs[SHARED_QUERY_CACHE_HASH_TABLES];
uint64_t get_data_size_total();
unsigned int current_used_memory_pct();
struct {
std::array<prometheus::Counter*, p_qc_counter::__size> p_counter_array {};
std::array<prometheus::Gauge*, p_qc_gauge::__size> p_gauge_array {};
} metrics;
public:
static_assert(std::is_same_v<QC_DERIVED,MySQL_Query_Cache> || std::is_same_v<QC_DERIVED,PgSQL_Query_Cache>,
"Invalid QC_DERIVED Query Cache type");
using TypeQCEntry = typename std::conditional<std::is_same_v<QC_DERIVED, MySQL_Query_Cache>,
MySQL_QC_entry_t, PgSQL_QC_entry_t>::type;
public:
static bool shutting_down;
static pthread_t purge_thread_id;
constexpr static unsigned int purge_loop_time = DEFAULT_purge_loop_time;
void print_version();
uint64_t flush();
void p_update_metrics();
void * purgeHash_thread(void *);
int size;
int shutdown;
unsigned long long QCnow_ms;
pthread_t purge_thread_id;
unsigned int purge_loop_time;
unsigned int purge_total_time;
unsigned int purge_threshold_pct_min;
unsigned int purge_threshold_pct_max;
uint64_t max_memory_size;
SQLite3_result* SQL3_getStats();
void purgeHash(uint64_t max_memory_size);
protected:
Query_Cache();
~Query_Cache();
void print_version();
bool set(uint64_t user_hash, const unsigned char *kp, uint32_t kl, unsigned char *vp, uint32_t vl, unsigned long long create_ms, unsigned long long curtime_ms, unsigned long long expire_ms, bool deprecate_eof_active);
unsigned char * get(uint64_t , const unsigned char *, const uint32_t, uint32_t *, unsigned long long, unsigned long long, bool deprecate_eof_active);
uint64_t flush();
SQLite3_result * SQL3_getStats();
bool set(QC_entry_t* entry, uint64_t user_hash, const unsigned char *kp, uint32_t kl, unsigned char *vp,
uint32_t vl, uint64_t create_ms, uint64_t curtime_ms, uint64_t expire_ms);
std::shared_ptr<QC_entry_t> get(uint64_t user_hash, const unsigned char* kp, const uint32_t kl,
uint64_t curtime_ms, uint64_t cache_ttl);
constexpr static unsigned int purge_total_time = DEFAULT_purge_total_time;
constexpr static unsigned int purge_threshold_pct_min = DEFAULT_purge_threshold_pct_min;
constexpr static unsigned int purge_threshold_pct_max = DEFAULT_purge_threshold_pct_max;
//uint64_t max_memory_size;
private:
KV_BtreeArray* KVs[SHARED_QUERY_CACHE_HASH_TABLES];
uint64_t get_data_size_total();
unsigned int current_used_memory_pct(uint64_t max_memory_size);
void purgeHash(uint64_t QCnow_ms, unsigned int curr_pct);
struct {
std::array<prometheus::Counter*, p_qc_counter::__size> p_counter_array{};
std::array<prometheus::Gauge*, p_qc_gauge::__size> p_gauge_array{};
} metrics;
};
#endif /* __CLASS_QUERY_CACHE_H */
#endif /* __CLASS_QUERY_CACHE_H */

@ -148,7 +148,7 @@ struct cpu_timer
extern int admin_load_main_;
extern bool admin_nostart_;
extern Query_Cache *GloQC;
//extern MySQL_Query_Cache *GloMyQC;
extern MySQL_Authentication *GloMyAuth;
extern PgSQL_Authentication *GloPgAuth;
extern MySQL_LDAP_Authentication *GloMyLdapAuth;

@ -125,7 +125,7 @@ extern char * proxysql_version;
#include "proxysql_find_charset.h"
extern Query_Cache *GloQC;
//extern MySQL_Query_Cache *GloMyQC;
extern MySQL_Authentication *GloMyAuth;
extern PgSQL_Authentication *GloPgAuth;
extern MySQL_LDAP_Authentication *GloMyLdapAuth;

@ -77,6 +77,8 @@ using json = nlohmann::json;
#include <uuid/uuid.h>
#include "PgSQL_Protocol.h"
#include "MySQL_Query_Cache.h"
#include "PgSQL_Query_Cache.h"
//#include "usual/time.h"
using std::string;
@ -135,7 +137,8 @@ extern bool admin_proxysql_pgsql_paused;
extern int admin_old_wait_timeout;
extern Query_Cache *GloQC;
extern MySQL_Query_Cache *GloMyQC;
extern PgSQL_Query_Cache* GloPgQC;
extern MySQL_Authentication *GloMyAuth;
extern PgSQL_Authentication *GloPgAuth;
extern MySQL_LDAP_Authentication *GloMyLdapAuth;
@ -653,8 +656,31 @@ bool admin_handler_command_proxysql(char *query_no_space, unsigned int query_no_
if (query_no_space_length==strlen("PROXYSQL FLUSH QUERY CACHE") && !strncasecmp("PROXYSQL FLUSH QUERY CACHE",query_no_space, query_no_space_length)) {
proxy_info("Received PROXYSQL FLUSH QUERY CACHE command\n");
ProxySQL_Admin *SPA=(ProxySQL_Admin *)pa;
if (GloQC) {
GloQC->flush();
if (GloMyQC) {
GloMyQC->flush();
}
//if (GloPgQC) {
// GloPgQC->flush();
//}
SPA->send_ok_msg_to_client(sess, NULL, 0, query_no_space);
return false;
}
if (query_no_space_length == strlen("PROXYSQL FLUSH MYSQL QUERY CACHE") && !strncasecmp("PROXYSQL FLUSH MYSQL QUERY CACHE", query_no_space, query_no_space_length)) {
proxy_info("Received PROXYSQL FLUSH MYSQL QUERY CACHE command\n");
ProxySQL_Admin* SPA = (ProxySQL_Admin*)pa;
if (GloMyQC) {
GloMyQC->flush();
}
SPA->send_ok_msg_to_client(sess, NULL, 0, query_no_space);
return false;
}
if (query_no_space_length == strlen("PROXYSQL FLUSH PGSQL QUERY CACHE") && !strncasecmp("PROXYSQL FLUSH PGSQL QUERY CACHE", query_no_space, query_no_space_length)) {
proxy_info("Received PROXYSQL FLUSH PGSQL QUERY CACHE command\n");
ProxySQL_Admin* SPA = (ProxySQL_Admin*)pa;
if (GloPgQC) {
GloPgQC->flush();
}
SPA->send_ok_msg_to_client(sess, NULL, 0, query_no_space);
return false;

@ -379,7 +379,7 @@ bool Base_Thread::set_backend_to_be_skipped_if_frontend_is_slow(DS * myds, unsig
if constexpr (std::is_same_v<T, PgSQL_Thread>) {
unsigned int buffered_data = 0;
buffered_data = myds->sess->client_myds->PSarrayOUT->len * PGSQL_RESULTSET_BUFLEN;
buffered_data += myds->sess->client_myds->resultset->len * PGSQL_RESULTSET_BUFLEN;
//buffered_data += myds->sess->client_myds->resultset->len * PGSQL_RESULTSET_BUFLEN;
if (buffered_data > (unsigned int)pgsql_thread___threshold_resultset_size * 4) {
thr->mypolls.fds[n].events = 0;
return true;

@ -441,7 +441,7 @@ static char *s_strdup(char *s) {
static int __ClickHouse_Server_refresh_interval=1000;
extern Query_Cache *GloQC;
extern MySQL_Query_Cache *GloMyQC;
extern ClickHouse_Authentication *GloClickHouseAuth;
extern ProxySQL_Admin *GloAdmin;
extern MySQL_Query_Processor* GloMyQPro;

@ -147,7 +147,8 @@ _OBJ_CXX := ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo
Base_Session.oo Base_Thread.oo \
proxy_protocol_info.oo \
proxysql_find_charset.oo ProxySQL_Poll.oo \
PgSQL_Protocol.oo PgSQL_Thread.oo PgSQL_Data_Stream.oo PgSQL_Session.oo PgSQL_Variables.oo PgSQL_HostGroups_Manager.oo PgSQL_Connection.oo PgSQL_Backend.oo PgSQL_Logger.oo PgSQL_Authentication.oo PgSQL_Error_Helper.oo PgSQL_Monitor.oo
PgSQL_Protocol.oo PgSQL_Thread.oo PgSQL_Data_Stream.oo PgSQL_Session.oo PgSQL_Variables.oo PgSQL_HostGroups_Manager.oo PgSQL_Connection.oo PgSQL_Backend.oo PgSQL_Logger.oo PgSQL_Authentication.oo PgSQL_Error_Helper.oo \
MySQL_Query_Cache.oo PgSQL_Query_Cache.oo PgSQL_Monitor.oo
OBJ_CXX := $(patsubst %,$(ODIR)/%,$(_OBJ_CXX))
HEADERS := ../include/*.h ../include/*.hpp

@ -0,0 +1,316 @@
#include "proxysql.h"
#include "cpp.h"
#include "MySQL_Protocol.h"
#include "MySQL_Query_Cache.h"
extern MySQL_Threads_Handler* GloMTH;
const int eof_to_ok_dif = static_cast<const int>(-(sizeof(mysql_hdr) + 5) + 2);
const int ok_to_eof_dif = static_cast<const int>(+(sizeof(mysql_hdr) + 5) - 2);
/**
* @brief Converts a 'EOF_Packet' to holded inside a 'QC_entry_t' into a 'OK_Packet'.
* Warning: This function assumes that the supplied 'QC_entry_t' holds a valid
* 'EOF_Packet'.
*
* @param entry The 'QC_entry_t' holding a 'OK_Packet' to be converted into
* a 'EOF_Packet'.
* @return The converted packet.
*/
unsigned char* eof_to_ok_packet(const MySQL_QC_entry_t* entry) {
unsigned char* result = (unsigned char*)malloc(entry->length + eof_to_ok_dif);
unsigned char* vp = result;
unsigned char* it = entry->value;
// Copy until the first EOF
memcpy(vp, entry->value, entry->column_eof_pkt_offset);
it += entry->column_eof_pkt_offset;
vp += entry->column_eof_pkt_offset;
// Skip the first EOF after columns def
mysql_hdr hdr;
memcpy(&hdr, it, sizeof(mysql_hdr));
it += sizeof(mysql_hdr) + hdr.pkt_length;
// Copy all the rows
uint64_t u_entry_val = reinterpret_cast<uint64_t>(entry->value);
uint64_t u_it_pos = reinterpret_cast<uint64_t>(it);
uint64_t rows_length = (u_entry_val + entry->row_eof_pkt_offset) - u_it_pos;
memcpy(vp, it, rows_length);
vp += rows_length;
it += rows_length;
// Replace final EOF in favor of OK packet
// =======================================
// Copy the mysql header
memcpy(&hdr, it, sizeof(mysql_hdr));
hdr.pkt_length = 7;
memcpy(vp, &hdr, sizeof(mysql_hdr));
vp += sizeof(mysql_hdr);
it += sizeof(mysql_hdr);
// OK packet header
*vp = 0xfe;
vp++;
it++;
// Initialize affected_rows and last_insert_id to zero
memset(vp, 0, 2);
vp += 2;
// Extract warning flags and status from 'EOF_packet'
unsigned char* eof_packet = entry->value + entry->row_eof_pkt_offset;
eof_packet += sizeof(mysql_hdr);
// Skip the '0xFE EOF packet header'
eof_packet += 1;
uint16_t warnings;
memcpy(&warnings, eof_packet, sizeof(uint16_t));
eof_packet += 2;
uint16_t status_flags;
memcpy(&status_flags, eof_packet, sizeof(uint16_t));
// Copy warnings an status flags
memcpy(vp, &status_flags, sizeof(uint16_t));
vp += 2;
memcpy(vp, &warnings, sizeof(uint16_t));
// =======================================
// Decrement ids after the first EOF
unsigned char* dp = result + entry->column_eof_pkt_offset;
mysql_hdr decrement_hdr;
for (;;) {
memcpy(&decrement_hdr, dp, sizeof(mysql_hdr));
decrement_hdr.pkt_id--;
memcpy(dp, &decrement_hdr, sizeof(mysql_hdr));
dp += sizeof(mysql_hdr) + decrement_hdr.pkt_length;
if (dp >= vp)
break;
}
return result;
}
/**
* @brief Converts a 'OK_Packet' holded inside 'QC_entry_t' into a 'EOF_Packet'.
* Warning: This function assumes that the supplied 'QC_entry_t' holds a valid
* 'OK_Packet'.
*
* @param entry The 'QC_entry_t' holding a 'EOF_Packet' to be converted into
* a 'OK_Packet'.
* @return The converted packet.
*/
unsigned char* ok_to_eof_packet(const MySQL_QC_entry_t* entry) {
unsigned char* result = (unsigned char*)malloc(entry->length + ok_to_eof_dif);
unsigned char* vp = result;
unsigned char* it = entry->value;
// Extract warning flags and status from 'OK_packet'
unsigned char* ok_packet = it + entry->ok_pkt_offset;
mysql_hdr ok_hdr;
memcpy(&ok_hdr, ok_packet, sizeof(mysql_hdr));
ok_packet += sizeof(mysql_hdr);
// Skip the 'OK packet header', 'affected_rows' and 'last_insert_id'
ok_packet += 3;
uint16_t status_flags;
memcpy(&status_flags, ok_packet, sizeof(uint16_t));
ok_packet += 2;
uint16_t warnings;
memcpy(&warnings, ok_packet, sizeof(uint16_t));
// Find the spot in which the first EOF needs to be placed
it += sizeof(mysql_hdr);
uint64_t c_count = 0;
int c_count_len = mysql_decode_length(reinterpret_cast<unsigned char*>(it), &c_count);
it += c_count_len;
mysql_hdr column_hdr;
for (uint64_t i = 0; i < c_count; i++) {
memcpy(&column_hdr, it, sizeof(mysql_hdr));
it += sizeof(mysql_hdr) + column_hdr.pkt_length;
}
// Location for 'column_eof'
uint64_t column_eof_offset =
reinterpret_cast<unsigned char*>(it) -
reinterpret_cast<unsigned char*>(entry->value);
memcpy(vp, entry->value, column_eof_offset);
vp += column_eof_offset;
// Write 'column_eof_packet' header
column_hdr.pkt_id = column_hdr.pkt_id + 1;
column_hdr.pkt_length = 5;
memcpy(vp, &column_hdr, sizeof(mysql_hdr));
vp += sizeof(mysql_hdr);
// Write 'column_eof_packet' contents
*vp = 0xfe;
vp++;
memcpy(vp, &warnings, sizeof(uint16_t));
vp += 2;
memcpy(vp, &status_flags, sizeof(uint16_t));
vp += 2;
// Find the OK packet
for (;;) {
mysql_hdr hdr;
memcpy(&hdr, it, sizeof(mysql_hdr));
unsigned char* payload =
reinterpret_cast<unsigned char*>(it) +
sizeof(mysql_hdr);
if (hdr.pkt_length < 9 && *payload == 0xfe) {
mysql_hdr ok_hdr;
ok_hdr.pkt_id = hdr.pkt_id + 1;
ok_hdr.pkt_length = 5;
memcpy(vp, &ok_hdr, sizeof(mysql_hdr));
vp += sizeof(mysql_hdr);
*vp = 0xfe;
vp++;
memcpy(vp, &warnings, sizeof(uint16_t));
vp += 2;
memcpy(vp, &status_flags, sizeof(uint16_t));
break;
}
else {
// Increment the package id by one due to 'column_eof_packet'
hdr.pkt_id += 1;
memcpy(vp, &hdr, sizeof(mysql_hdr));
vp += sizeof(mysql_hdr);
it += sizeof(mysql_hdr);
memcpy(vp, it, hdr.pkt_length);
vp += hdr.pkt_length;
it += hdr.pkt_length;
}
}
return result;
}
bool MySQL_Query_Cache::set(uint64_t user_hash, const unsigned char* kp, uint32_t kl, unsigned char* vp,
uint32_t vl, uint64_t create_ms, uint64_t curtime_ms, uint64_t expire_ms, bool deprecate_eof_active) {
MySQL_QC_entry_t* entry = (MySQL_QC_entry_t*)malloc(sizeof(MySQL_QC_entry_t));
entry->column_eof_pkt_offset = 0;
entry->row_eof_pkt_offset = 0;
entry->ok_pkt_offset = 0;
// Find the first EOF location
unsigned char* it = vp;
it += sizeof(mysql_hdr);
uint64_t c_count = 0;
int c_count_len = mysql_decode_length(const_cast<unsigned char*>(it), &c_count);
it += c_count_len;
for (uint64_t i = 0; i < c_count; i++) {
mysql_hdr hdr;
memcpy(&hdr, it, sizeof(mysql_hdr));
it += sizeof(mysql_hdr) + hdr.pkt_length;
}
if (deprecate_eof_active == false) {
// Store EOF position and jump to rows
entry->column_eof_pkt_offset = it - vp;
mysql_hdr hdr;
memcpy(&hdr, it, sizeof(mysql_hdr));
it += sizeof(mysql_hdr) + hdr.pkt_length;
}
// Find the second EOF location or the OK packet
for (;;) {
mysql_hdr hdr;
memcpy(&hdr, it, sizeof(mysql_hdr));
unsigned char* payload = it + sizeof(mysql_hdr);
if (hdr.pkt_length < 9 && *payload == 0xfe) {
if (deprecate_eof_active) {
entry->ok_pkt_offset = it - vp;
// Reset the warning flags to zero before storing resultset in the cache
// Reason: When a warning flag is set, it may prompt the client to invoke "SHOW WARNINGS" or "SHOW COUNT(*) FROM WARNINGS".
// However, when retrieving data from the cache, it's possible that there are no warnings present
// that might be associated with previous interactions.
unsigned char* payload_temp = payload + 1;
// skip affected_rows
payload_temp += mysql_decode_length(payload_temp, nullptr);
// skip last_insert_id
payload_temp += mysql_decode_length(payload_temp, nullptr);
// skip stats_flags
payload_temp += sizeof(uint16_t);
uint16_t warnings = 0;
memcpy(payload_temp, &warnings, sizeof(uint16_t));
}
else {
entry->row_eof_pkt_offset = it - vp;
// Reset the warning flags to zero before storing resultset in the cache
// Reason: When a warning flag is set, it may prompt the client to invoke "SHOW WARNINGS" or "SHOW COUNT(*) FROM WARNINGS".
// However, when retrieving data from the cache, it's possible that there are no warnings present
// that might be associated with previous interactions.
uint16_t warnings = 0;
memcpy((payload + 1), &warnings, sizeof(uint16_t));
}
break;
}
else {
it += sizeof(mysql_hdr) + hdr.pkt_length;
}
}
return Query_Cache::set(entry, user_hash, kp, kl, vp, vl, create_ms, curtime_ms, expire_ms);
}
unsigned char* MySQL_Query_Cache::get(uint64_t user_hash, const unsigned char* kp, const uint32_t kl, uint32_t* lv,
uint64_t curtime_ms, uint64_t cache_ttl, bool deprecate_eof_active) {
unsigned char* result = NULL;
std::shared_ptr<MySQL_QC_entry_t> entry_shared = std::static_pointer_cast<MySQL_QC_entry_t>(
Query_Cache::get(user_hash, kp, kl, curtime_ms, cache_ttl)
);
if (entry_shared) {
if (deprecate_eof_active && entry_shared->column_eof_pkt_offset) {
result = eof_to_ok_packet(entry_shared.get());
*lv = entry_shared->length + eof_to_ok_dif;
}
else if (!deprecate_eof_active && entry_shared->ok_pkt_offset) {
result = ok_to_eof_packet(entry_shared.get());
*lv = entry_shared->length + ok_to_eof_dif;
}
else {
result = (unsigned char*)malloc(entry_shared->length);
memcpy(result, entry_shared->value, entry_shared->length);
*lv = entry_shared->length;
}
//__sync_fetch_and_sub(&entry->ref_count, 1);
}
return result;
}
/*void* MySQL_Query_Cache::purgeHash_thread(void*) {
unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version;
MySQL_Thread* mysql_thr = new MySQL_Thread();
MySQL_Monitor__thread_MySQL_Thread_Variables_version = GloMTH->get_global_version();
set_thread_name("MyQCPurge");
mysql_thr->refresh_variables();
max_memory_size = static_cast<uint64_t>(mysql_thread___query_cache_size_MB*1024ULL*1024ULL);
while (shutting_down == false) {
usleep(purge_loop_time);
unsigned int glover = GloMTH->get_global_version();
if (GloMTH) {
if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover) {
MySQL_Monitor__thread_MySQL_Thread_Variables_version = glover;
mysql_thr->refresh_variables();
max_memory_size = static_cast<uint64_t>(mysql_thread___query_cache_size_MB*1024ULL*1024ULL);
}
}
const unsigned int curr_pct = current_used_memory_pct();
if (curr_pct < purge_threshold_pct_min) continue;
Query_Cache::purgeHash((monotonic_time()/1000ULL), curr_pct);
}
delete mysql_thr;
return NULL;
}*/

@ -22,7 +22,7 @@ using json = nlohmann::json;
#include "SQLite3_Server.h"
#include "MySQL_Variables.h"
#include "ProxySQL_Cluster.hpp"
#include "MySQL_Query_Cache.h"
#include "libinjection.h"
#include "libinjection_sqli.h"
@ -333,7 +333,7 @@ __exit_kill_query_thread:
}
extern MySQL_Query_Processor* GloMyQPro;
extern Query_Cache *GloQC;
extern MySQL_Query_Cache *GloMyQC;
extern ProxySQL_Admin *GloAdmin;
extern MySQL_Threads_Handler *GloMTH;
@ -6810,7 +6810,7 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
if (qpo->cache_ttl>0 && ((prepare_stmt_type & ps_type_prepare_stmt) == 0)) {
bool deprecate_eof_active = client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF;
uint32_t resbuf=0;
unsigned char *aa=GloQC->get(
unsigned char *aa= GloMyQC->get(
client_myds->myconn->userinfo->hash,
(const unsigned char *)CurrentQuery.QueryPointer ,
CurrentQuery.QueryLength ,
@ -7254,18 +7254,18 @@ void MySQL_Session::MySQL_Result_to_MySQL_wire(MYSQL *mysql, MySQL_ResultSet *My
unsigned char *aa=client_myds->resultset2buffer(false);
while (client_myds->resultset->len) client_myds->resultset->remove_index(client_myds->resultset->len-1,NULL);
bool deprecate_eof_active = client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF;
GloQC->set(
GloMyQC->set(
client_myds->myconn->userinfo->hash ,
(const unsigned char *)CurrentQuery.QueryPointer,
CurrentQuery.QueryPointer,
CurrentQuery.QueryLength,
aa ,
aa , // Query Cache now have the ownership, no need to free it
client_myds->resultset_length ,
thread->curtime/1000 ,
thread->curtime/1000 ,
thread->curtime/1000 + qpo->cache_ttl,
deprecate_eof_active
);
l_free(client_myds->resultset_length,aa);
//l_free(client_myds->resultset_length,aa);
client_myds->resultset_length=0;
}
}
@ -7491,7 +7491,7 @@ void MySQL_Session::Memory_Stats() {
unsigned long long internal=0;
internal+=sizeof(MySQL_Session);
if (qpo)
internal+=sizeof(Query_Processor_Output);
internal+=sizeof(MySQL_Query_Processor_Output);
if (client_myds) {
internal+=sizeof(MySQL_Data_Stream);
if (client_myds->queueIN.buffer)

@ -1759,7 +1759,7 @@ handler_again:
myds->sess->status != SHOW_WARNINGS*/) { // see issue#4072
unsigned int buffered_data = 0;
buffered_data = myds->sess->client_myds->PSarrayOUT->len * PGSQL_RESULTSET_BUFLEN;
buffered_data += myds->sess->client_myds->resultset->len * PGSQL_RESULTSET_BUFLEN;
//buffered_data += myds->sess->client_myds->resultset->len * PGSQL_RESULTSET_BUFLEN;
if (buffered_data > (unsigned int)pgsql_thread___threshold_resultset_size * 8) {
next_event(ASYNC_USE_RESULT_CONT); // we temporarily pause . See #1232
break;

@ -292,13 +292,13 @@ PgSQL_Data_Stream::PgSQL_Data_Stream() {
kill_type = 0;
connect_tries = 0;
poll_fds_idx = -1;
resultset_length = 0;
//resultset_length = 0;
revents = 0;
PSarrayIN = NULL;
PSarrayOUT = NULL;
resultset = NULL;
//resultset = NULL;
queue_init(queueIN, QUEUE_T_DEFAULT_SIZE);
queue_init(queueOUT, QUEUE_T_DEFAULT_SIZE);
mybe = NULL;
@ -374,13 +374,13 @@ PgSQL_Data_Stream::~PgSQL_Data_Stream() {
}
delete PSarrayOUT;
}
if (resultset) {
/*if (resultset) {
while (resultset->len) {
resultset->remove_index_fast(0, &pkt);
l_free(pkt.size, pkt.ptr);
}
delete resultset;
}
}*/
if (mypolls) mypolls->remove_index_fast(poll_fds_idx);
@ -438,7 +438,7 @@ void PgSQL_Data_Stream::init() {
if (PSarrayIN == NULL) PSarrayIN = new PtrSizeArray();
if (PSarrayOUT == NULL) PSarrayOUT = new PtrSizeArray();
// if (PSarrayOUTpending==NULL) PSarrayOUTpending= new PtrSizeArray();
if (resultset == NULL) resultset = new PtrSizeArray();
//if (resultset == NULL) resultset = new PtrSizeArray();
if (unlikely(GloVars.global.data_packets_history_size)) {
data_packets_history_IN.set_max_size(GloVars.global.data_packets_history_size);
@ -1174,7 +1174,8 @@ __exit_array2buffer:
return ret;
}
unsigned char* PgSQL_Data_Stream::resultset2buffer(bool del) {
unsigned char* PgSQL_Data_Stream::copy_array_to_buffer(PtrSizeArray* resultset, size_t resultset_length,
bool del) {
unsigned int i;
unsigned int l = 0;
unsigned char* mybuff = (unsigned char*)l_alloc(resultset_length);
@ -1188,52 +1189,75 @@ unsigned char* PgSQL_Data_Stream::resultset2buffer(bool del) {
return mybuff;
};
void PgSQL_Data_Stream::buffer2resultset(unsigned char* ptr, unsigned int size) {
unsigned char* __ptr = ptr;
mysql_hdr hdr;
unsigned int l;
void* buff = NULL;
unsigned int bl;
unsigned int bf;
while (__ptr < ptr + size) {
memcpy(&hdr, __ptr, sizeof(mysql_hdr));
l = hdr.pkt_length + sizeof(mysql_hdr); // amount of space we need
if (buff) {
if (bf < l) {
// we ran out of space
resultset->add(buff, bl - bf);
buff = NULL;
static inline uint32_t get_uint32(const unsigned char* ptr) {
return (static_cast<uint32_t>(ptr[0]) << 24) |
(static_cast<uint32_t>(ptr[1]) << 16) |
(static_cast<uint32_t>(ptr[2]) << 8) |
static_cast<uint32_t>(ptr[3]);
}
void PgSQL_Data_Stream::copy_buffer_to_resultset(PtrSizeArray* resultset, unsigned char* ptr, uint64_t size,
char current_transaction_state) {
unsigned char* current_ptr = ptr;
unsigned char* buffer = NULL;
uint32_t data_len;
uint32_t buffer_len;
uint32_t remaining_space;
while (current_ptr < ptr + size) {
uint8_t packet_type = *current_ptr; // read packet type (unused in the code)
// Read 4-byte length
/*unsigned int a = current_ptr[read_pos++];
unsigned int b = current_ptr[read_pos++];
unsigned int c = current_ptr[read_pos++];
unsigned int d = current_ptr[read_pos++];
const uint32_t packet_length = (a << 24) | (b << 16) | (c << 8) | d;
*/
data_len = get_uint32(current_ptr + 1 /*skip type*/) + 1; // total space needed, including type
// Handle buffer space
if (buffer) {
if (remaining_space < data_len) {
// Insufficient space, add buffer to resultset
resultset->add(buffer, buffer_len - remaining_space);
buffer = NULL;
}
}
if (buff == NULL) {
if (__ptr + RESULTSET_BUFLEN_DS_1M <= ptr + size) {
bl = RESULTSET_BUFLEN_DS_1M;
// Allocate new buffer if needed
if (buffer == NULL) {
// Set buffer size depending on available space
if (current_ptr + RESULTSET_BUFLEN_DS_1M <= ptr + size) {
buffer_len = RESULTSET_BUFLEN_DS_1M;
}
else {
bl = RESULTSET_BUFLEN_DS_16K;
buffer_len = RESULTSET_BUFLEN_DS_16K;
}
if (l > bl) {
bl = l; // make sure there is the space to copy a packet
// Ensure buffer is large enough for the current packet
if (data_len > buffer_len) {
buffer_len = data_len;
}
buff = malloc(bl);
bf = bl;
buffer = (unsigned char*)malloc(buffer_len);
remaining_space = buffer_len;
}
// Copy data to buffer
memcpy((unsigned char*)buffer + (buffer_len - remaining_space), current_ptr, data_len);
remaining_space -= data_len;
current_ptr += data_len;
if (packet_type == 'Z') {
// if packet is a 'Z' packet (Ready Packet), we need to overwrite trasaction state
*(buffer + (buffer_len - remaining_space) - 1) = current_transaction_state;
}
memcpy((char*)buff + (bl - bf), __ptr, l);
bf -= l;
__ptr += l;
/*
l=hdr.pkt_length+sizeof(mysql_hdr);
pkt=l_alloc(l);
memcpy(pkt,__ptr,l);
resultset->add(pkt,l);
__ptr+=l;
*/
}
if (buff) {
// last buffer to add
resultset->add(buff, bl - bf);
// Add the last buffer to the resultset, if any
if (buffer) {
resultset->add(buffer, buffer_len - remaining_space);
}
};
}
int PgSQL_Data_Stream::array2buffer_full() {
int rc = 0;

@ -0,0 +1,48 @@
#include "proxysql.h"
#include "cpp.h"
#include "PgSQL_Query_Cache.h"
extern PgSQL_Threads_Handler* GloPTH;
bool PgSQL_Query_Cache::set(uint64_t user_hash, const unsigned char* kp, uint32_t kl, unsigned char* vp,
uint32_t vl, uint64_t create_ms, uint64_t curtime_ms, uint64_t expire_ms) {
PgSQL_QC_entry_t* entry = (PgSQL_QC_entry_t*)malloc(sizeof(PgSQL_QC_entry_t));
return Query_Cache::set(entry, user_hash, kp, kl, vp, vl, create_ms, curtime_ms, expire_ms);
}
const std::shared_ptr<PgSQL_QC_entry_t> PgSQL_Query_Cache::get(uint64_t user_hash, const unsigned char* kp,
const uint32_t kl, uint64_t curtime_ms, uint64_t cache_ttl) {
const std::shared_ptr<PgSQL_QC_entry_t> entry_shared = std::static_pointer_cast<PgSQL_QC_entry_t>(
Query_Cache::get(user_hash, kp, kl, curtime_ms, cache_ttl)
);
return entry_shared;
}
/*
void* PgSQL_Query_Cache::purgeHash_thread(void*) {
unsigned int PgSQL_Monitor__thread_PgSQL_Thread_Variables_version;
PgSQL_Thread* pgsql_thr = new PgSQL_Thread();
PgSQL_Monitor__thread_PgSQL_Thread_Variables_version = GloPTH->get_global_version();
set_thread_name("PgQCPurge");
pgsql_thr->refresh_variables();
max_memory_size = static_cast<uint64_t>(pgsql_thread___query_cache_size_MB*1024ULL*1024ULL);
while (shutting_down == false) {
usleep(purge_loop_time);
unsigned int glover = GloPTH->get_global_version();
if (GloPTH) {
if (PgSQL_Monitor__thread_PgSQL_Thread_Variables_version < glover) {
PgSQL_Monitor__thread_PgSQL_Thread_Variables_version = glover;
pgsql_thr->refresh_variables();
max_memory_size = static_cast<uint64_t>(pgsql_thread___query_cache_size_MB*1024ULL*1024ULL);
}
}
const unsigned int curr_pct = current_used_memory_pct();
if (curr_pct < purge_threshold_pct_min) continue;
Query_Cache::purgeHash((monotonic_time()/1000ULL), curr_pct);
}
delete pgsql_thr;
return NULL;
}*/

@ -23,7 +23,7 @@ using json = nlohmann::json;
#include "SQLite3_Server.h"
#include "MySQL_Variables.h"
#include "ProxySQL_Cluster.hpp"
#include "PgSQL_Query_Cache.h"
#include "libinjection.h"
#include "libinjection_sqli.h"
@ -313,7 +313,7 @@ __exit_kill_query_thread:
}
extern PgSQL_Query_Processor* GloPgQPro;
extern Query_Cache* GloQC;
extern PgSQL_Query_Cache *GloPgQC;
extern ProxySQL_Admin* GloAdmin;
extern PgSQL_Threads_Handler* GloPTH;
@ -5831,24 +5831,21 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
return true;
}
//}
/* Query Cache is not supported for PgSQL
if (qpo->cache_ttl > 0 && ((prepare_stmt_type & PgSQL_ps_type_prepare_stmt) == 0)) {
bool deprecate_eof_active = client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF;
uint32_t resbuf = 0;
unsigned char* aa = GloQC->get(
const std::shared_ptr<PgSQL_QC_entry_t> pgsql_qc_entry = GloPgQC->get(
client_myds->myconn->userinfo->hash,
(const unsigned char*)CurrentQuery.QueryPointer,
CurrentQuery.QueryLength,
&resbuf,
thread->curtime / 1000,
qpo->cache_ttl,
deprecate_eof_active
qpo->cache_ttl
);
if (aa) {
client_myds->buffer2resultset(aa, resbuf);
free(aa);
client_myds->PSarrayOUT->copy_add(client_myds->resultset, 0, client_myds->resultset->len);
while (client_myds->resultset->len) client_myds->resultset->remove_index(client_myds->resultset->len - 1, NULL);
if (pgsql_qc_entry) {
// FIXME: Add Error Transaction state detection
unsigned int nTrx = NumActiveTransactions();
PgSQL_Data_Stream::copy_buffer_to_resultset(client_myds->PSarrayOUT,
pgsql_qc_entry->value, pgsql_qc_entry->length, (nTrx ? 'T' : 'I'));
//client_myds->PSarrayOUT->copy_add(resultset, 0, resultset->len);
if (transaction_persistent_hostgroup == -1) {
// not active, we can change it
current_hostgroup = -1;
@ -5857,7 +5854,7 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
l_free(pkt->size, pkt->ptr);
return true;
}
}*/
}
__exit_set_destination_hostgroup:
@ -6232,48 +6229,46 @@ void PgSQL_Session::PgSQL_Result_to_PgSQL_wire(PgSQL_Connection* _conn, PgSQL_Da
bool transfer_started = query_result->is_transfer_started();
// if there is an error, it will be false so results are not cached
bool is_tuple = query_result->get_result_packet_type() == (PGSQL_QUERY_RESULT_TUPLE | PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_READY);
CurrentQuery.rows_sent = query_result->get_num_rows();
const uint64_t num_rows = query_result->get_num_rows();
const uint64_t resultset_size = query_result->get_resultset_size();
const auto _affected_rows = query_result->get_affected_rows();
if (_affected_rows != -1) {
CurrentQuery.affected_rows = _affected_rows;
CurrentQuery.have_affected_rows = true;
}
CurrentQuery.rows_sent = num_rows;
bool resultset_completed = query_result->get_resultset(client_myds->PSarrayOUT);
if (_conn->processing_multi_statement == false)
assert(resultset_completed); // the resultset should always be completed if PgSQL_Result_to_PgSQL_wire is called
if (transfer_started == false) { // we have all the resultset when PgSQL_Result_to_PgSQL_wire was called
if (qpo && qpo->cache_ttl > 0 && is_tuple == true) { // the resultset should be cached
/*if (mysql_errno(pgsql) == 0 &&
(mysql_warning_count(pgsql) == 0 ||
mysql_thread___query_cache_handle_warnings == 1)) { // no errors
if (_conn->is_error_present() == false &&
(/* check warnings count here*/ true ||
pgsql_thread___query_cache_handle_warnings == 1)) { // no errors
if (
(qpo->cache_empty_result == 1)
|| (
(qpo->cache_empty_result == -1)
&&
(thread->variables.query_cache_stores_empty_result || query_result->num_rows)
(qpo->cache_empty_result == 1) ||
(
(qpo->cache_empty_result == -1) &&
(thread->variables.query_cache_stores_empty_result || num_rows)
)
) {
client_myds->resultset->copy_add(client_myds->PSarrayOUT, 0, client_myds->PSarrayOUT->len);
client_myds->resultset_length = query_result->resultset_size;
unsigned char* aa = client_myds->resultset2buffer(false);
while (client_myds->resultset->len) client_myds->resultset->remove_index(client_myds->resultset->len - 1, NULL);
bool deprecate_eof_active = client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF;
GloQC->set(
// Query Cache will have the ownership to buff. No need to free it here
unsigned char* buff = PgSQL_Data_Stream::copy_array_to_buffer(client_myds->PSarrayOUT,
resultset_size, false);
GloPgQC->set(
client_myds->myconn->userinfo->hash,
(const unsigned char*)CurrentQuery.QueryPointer,
CurrentQuery.QueryPointer,
CurrentQuery.QueryLength,
aa,
client_myds->resultset_length,
buff,
resultset_size,
thread->curtime / 1000,
thread->curtime / 1000,
thread->curtime / 1000 + qpo->cache_ttl,
deprecate_eof_active
thread->curtime / 1000 + qpo->cache_ttl
);
l_free(client_myds->resultset_length, aa);
client_myds->resultset_length = 0;
}
}*/
}
}
}
} else { // if query result is empty, means there was an error before query result was generated
@ -6534,7 +6529,7 @@ void PgSQL_Session::Memory_Stats() {
unsigned long long internal = 0;
internal += sizeof(PgSQL_Session);
if (qpo)
internal += sizeof(Query_Processor_Output);
internal += sizeof(PgSQL_Query_Processor_Output);
if (client_myds) {
internal += sizeof(PgSQL_Data_Stream);
if (client_myds->queueIN.buffer)
@ -6552,7 +6547,7 @@ void PgSQL_Session::Memory_Stats() {
internal += client_myds->PSarrayOUT->total_size();
} else {
internal += client_myds->PSarrayOUT->total_size(PGSQL_RESULTSET_BUFLEN);
internal += client_myds->resultset->total_size(PGSQL_RESULTSET_BUFLEN);
//internal += client_myds->resultset->total_size(PGSQL_RESULTSET_BUFLEN);
}
}
}

@ -3789,9 +3789,9 @@ void PgSQL_Thread::refresh_variables() {
pgsql_thread___query_processor_iterations = GloPTH->get_variable_int((char*)"query_processor_iterations");
pgsql_thread___query_processor_regex = GloPTH->get_variable_int((char*)"query_processor_regex");
mysql_thread___query_cache_size_MB = GloPTH->get_variable_int((char*)"query_cache_size_MB");
mysql_thread___query_cache_soft_ttl_pct = GloPTH->get_variable_int((char*)"query_cache_soft_ttl_pct");
mysql_thread___query_cache_handle_warnings = GloPTH->get_variable_int((char*)"query_cache_handle_warnings");
pgsql_thread___query_cache_size_MB = GloPTH->get_variable_int((char*)"query_cache_size_MB");
pgsql_thread___query_cache_soft_ttl_pct = GloPTH->get_variable_int((char*)"query_cache_soft_ttl_pct");
pgsql_thread___query_cache_handle_warnings = GloPTH->get_variable_int((char*)"query_cache_handle_warnings");
/*
mysql_thread___max_stmts_per_connection = GloPTH->get_variable_int((char*)"max_stmts_per_connection");
mysql_thread___max_stmts_cache = GloPTH->get_variable_int((char*)"max_stmts_cache");

@ -77,6 +77,8 @@ using json = nlohmann::json;
#include <uuid/uuid.h>
#include "PgSQL_Protocol.h"
#include "MySQL_Query_Cache.h"
#include "PgSQL_Query_Cache.h"
//#include "usual/time.h"
using std::string;
@ -307,7 +309,8 @@ bool admin_proxysql_mysql_paused = false;
bool admin_proxysql_pgsql_paused = false;
int admin_old_wait_timeout;
extern Query_Cache *GloQC;
extern MySQL_Query_Cache *GloMyQC;
extern PgSQL_Query_Cache* GloPgQC;
extern MySQL_Authentication *GloMyAuth;
extern PgSQL_Authentication *GloPgAuth;
extern MySQL_LDAP_Authentication *GloMyLdapAuth;
@ -2328,8 +2331,8 @@ __end_while_pool:
}
}
if (GloProxyStats->MySQL_Query_Cache_timetoget(curtime)) {
if (GloQC) {
SQLite3_result * resultset=GloQC->SQL3_getStats();
if (GloMyQC) {
SQLite3_result * resultset=GloMyQC->SQL3_getStats();
if (resultset) {
GloProxyStats->MySQL_Query_Cache_sets(resultset);
delete resultset;
@ -2502,9 +2505,13 @@ void update_modules_metrics() {
if (GloMyMon) {
GloMyMon->p_update_metrics();
}
// Update query_cache metrics
if (GloQC) {
GloQC->p_update_metrics();
// Update mysql query_cache metrics
if (GloMyQC) {
GloMyQC->p_update_metrics();
}
// Update pgsql query_cache metrics
if (GloPgQC) {
GloPgQC->p_update_metrics();
}
// Update cluster metrics
if (GloProxyCluster) {

@ -12,7 +12,8 @@
#include "MySQL_LDAP_Authentication.hpp"
#include "MySQL_PreparedStatement.h"
#include "ProxySQL_Cluster.hpp"
#include "MySQL_Query_Cache.h"
#include "PgSQL_Query_Cache.h"
#include "MySQL_Query_Processor.h"
#include "PgSQL_Query_Processor.h"
@ -31,7 +32,8 @@ extern bool admin_proxysql_pgsql_paused;
extern MySQL_Authentication *GloMyAuth;
extern PgSQL_Authentication* GloPgAuth;
extern MySQL_LDAP_Authentication *GloMyLdapAuth;
extern Query_Cache *GloQC;
extern MySQL_Query_Cache *GloMyQC;
extern PgSQL_Query_Cache* GloPgQC;
extern ProxySQL_Admin *GloAdmin;
extern MySQL_Threads_Handler *GloMTH;
extern PgSQL_Threads_Handler* GloPTH;
@ -531,7 +533,7 @@ void ProxySQL_Admin::stats___mysql_global() {
free(query);
}
if (GloQC && (resultset=GloQC->SQL3_getStats())) {
if (GloMyQC && (resultset= GloMyQC->SQL3_getStats())) {
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
int arg_len=0;
@ -691,7 +693,7 @@ void ProxySQL_Admin::stats___pgsql_global() {
free(query);
}*/
if (GloQC && (resultset = GloQC->SQL3_getStats())) {
if (GloPgQC && (resultset = GloPgQC->SQL3_getStats())) {
for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) {
SQLite3_row* r = *it;
int arg_len = 0;

@ -1,12 +1,17 @@
#include "prometheus/counter.h"
#include "btree_map.h"
#include "proxysql.h"
#include "cpp.h"
#include "query_cache.hpp"
#include "proxysql_atomic.h"
//#include "SpookyV2.h"
#include "prometheus/counter.h"
#include "prometheus_helpers.h"
#include "MySQL_Protocol.h"
#include "query_cache.hpp"
#include "MySQL_Query_Cache.h"
#include "PgSQL_Query_Cache.h"
#ifdef DEBUG
#define DEB "_DEBUG"
#else
#define DEB ""
#endif /* DEBUG */
#define QUERY_CACHE_VERSION "2.0.0385" DEB
#define THR_UPDATE_CNT(__a, __b, __c, __d) \
do {\
@ -24,140 +29,155 @@
} \
} while(0)
#define DEFAULT_SQC_size 4*1024*1024
#ifdef DEBUG
#define DEB "_DEBUG"
#else
#define DEB ""
#endif /* DEBUG */
#define QUERY_CACHE_VERSION "1.2.0905" DEB
#define PROXYSQL_QC_PTHREAD_MUTEX
extern MySQL_Threads_Handler *GloMTH;
typedef btree::btree_map<uint64_t, QC_entry_t *> BtMap_cache;
#define GET_THREAD_VARIABLE(VARIABLE_NAME) \
({((std::is_same_v<QC_DERIVED,MySQL_Query_Cache>) ? mysql_thread___##VARIABLE_NAME : pgsql_thread___##VARIABLE_NAME) ;})
__thread uint64_t __thr_cntSet = 0;
__thread uint64_t __thr_cntGet = 0;
__thread uint64_t __thr_cntGetOK = 0;
__thread uint64_t __thr_dataIN = 0;
__thread uint64_t __thr_dataOUT = 0;
__thread uint64_t __thr_num_entries = 0;
__thread uint64_t __thr_num_deleted = 0;
__thread uint64_t __thr_size_values = 0;
static uint64_t Glo_cntSet = 0;
static uint64_t Glo_cntGet = 0;
static uint64_t Glo_cntGetOK = 0;
static uint64_t Glo_num_entries = 0;
static uint64_t Glo_dataIN = 0;
static uint64_t Glo_dataOUT = 0;
static uint64_t Glo_cntPurge = 0;
static uint64_t Glo_size_values = 0;
static uint64_t Glo_total_freed_memory = 0;
template<typename QC_DERIVED>
bool Query_Cache<QC_DERIVED>::shutting_down = false;
template<typename QC_DERIVED>
pthread_t Query_Cache<QC_DERIVED>::purge_thread_id;
class KV_BtreeArray {
private:
#ifdef PROXYSQL_QC_PTHREAD_MUTEX
pthread_rwlock_t lock;
#else
rwlock_t lock;
#endif
BtMap_cache bt_map;
PtrArray *ptrArray;
uint64_t purgeChunkSize;
uint64_t purgeIdx;
bool __insert(uint64_t, void *);
uint64_t freeable_memory;
public:
uint64_t tottopurge;
KV_BtreeArray();
public:
KV_BtreeArray(unsigned int entry_size);
~KV_BtreeArray();
uint64_t get_data_size();
void purge_some(unsigned long long, bool);
int cnt();
std::weak_ptr<QC_entry_t> lookup(uint64_t key);
bool replace(uint64_t key, QC_entry_t *entry);
QC_entry_t *lookup(uint64_t key);
void empty();
};
void clear(bool release_entries = false);
void purge_some(uint64_t QCnow_ms, bool aggressive);
uint64_t get_data_size() const;
int count() const;
__thread uint64_t __thr_cntSet=0;
__thread uint64_t __thr_cntGet=0;
__thread uint64_t __thr_cntGetOK=0;
__thread uint64_t __thr_dataIN=0;
__thread uint64_t __thr_dataOUT=0;
__thread uint64_t __thr_num_entries=0;
__thread uint64_t __thr_num_deleted=0;
__thread uint64_t __thr_size_values=0;
//__thread uint64_t __thr_freeable_memory=0;
private:
pthread_rwlock_t lock;
std::vector<std::shared_ptr<QC_entry_t>> entries;
using BtMap_cache = btree::btree_map<uint64_t,std::weak_ptr<QC_entry_t>>;
BtMap_cache bt_map;
const unsigned int qc_entry_size;
#define DEFAULT_SQC_size 4*1024*1024
void rdlock();
void wrlock();
void unlock();
void add_to_entries(const std::shared_ptr<QC_entry_t>& entry);
void remove_from_entries_by_index(size_t index);
};
void free_QC_Entry(QC_entry_t* entry) {
if (entry) {
free(entry->value);
free(entry);
}
}
static uint64_t Glo_cntSet=0;
static uint64_t Glo_cntGet=0;
static uint64_t Glo_cntGetOK=0;
static uint64_t Glo_num_entries=0;
static uint64_t Glo_dataIN=0;
static uint64_t Glo_dataOUT=0;
static uint64_t Glo_cntPurge=0;
static uint64_t Glo_size_values=0;
static uint64_t Glo_total_freed_memory;
KV_BtreeArray::KV_BtreeArray() {
freeable_memory=0;
tottopurge=0;
#ifdef PROXYSQL_QC_PTHREAD_MUTEX
KV_BtreeArray::KV_BtreeArray(unsigned int entry_size) : qc_entry_size(entry_size) {
pthread_rwlock_init(&lock, NULL);
#else
spinlock_rwlock_init(&lock);
#endif
ptrArray = new PtrArray;
};
KV_BtreeArray::~KV_BtreeArray() {
proxy_debug(PROXY_DEBUG_QUERY_CACHE, 3, "Size of KVBtreeArray:%d , ptrArray:%u\n", cnt() , ptrArray->len);
empty();
QC_entry_t *qce=NULL;
while (ptrArray->len) {
qce=(QC_entry_t *)ptrArray->remove_index_fast(0);
free(qce->value);
free(qce);
}
delete ptrArray;
proxy_debug(PROXY_DEBUG_QUERY_CACHE, 3, "Size of KVBtreeArray:%d , entries:%lu\n", count(), entries.size());
clear(true);
pthread_rwlock_destroy(&lock);
};
inline void KV_BtreeArray::rdlock() { pthread_rwlock_rdlock(&lock); }
inline void KV_BtreeArray::wrlock() { pthread_rwlock_wrlock(&lock); }
inline void KV_BtreeArray::unlock() { pthread_rwlock_unlock(&lock); }
void KV_BtreeArray::add_to_entries(const std::shared_ptr<QC_entry_t>& entry) {
if (entries.capacity() <= (entries.size() + 1)) {
const unsigned int new_size = l_near_pow_2(entries.size() + 1);
entries.reserve(new_size);
}
entries.push_back(entry);
}
void KV_BtreeArray::remove_from_entries_by_index(size_t index) {
if (index >= entries.size()) {
return;
}
if (index != entries.size() - 1) {
std::swap(entries[index], entries.back());
}
entries.pop_back();
if ((entries.size() > MIN_ARRAY_LEN) && (entries.capacity() > entries.size() * MIN_ARRAY_DELETE_RATIO)) {
entries.shrink_to_fit();
}
}
uint64_t KV_BtreeArray::get_data_size() {
uint64_t r = __sync_fetch_and_add(&Glo_num_entries,0) * (sizeof(QC_entry_t)+sizeof(QC_entry_t *)*2+sizeof(uint64_t)*2); // + __sync_fetch_and_add(&Glo_size_values,0) ;
return r;
uint64_t KV_BtreeArray::get_data_size() const {
uint64_t data_size = __sync_fetch_and_add(&Glo_num_entries,0) * (qc_entry_size+sizeof(QC_entry_t*)*2+sizeof(uint64_t)*2); // + __sync_fetch_and_add(&Glo_size_values,0) ;
return data_size;
};
void KV_BtreeArray::purge_some(unsigned long long QCnow_ms, bool aggressive) {
uint64_t ret=0, i, _size=0;
QC_entry_t *qce;
unsigned long long access_ms_min=0;
unsigned long long access_ms_max=0;
#ifdef PROXYSQL_QC_PTHREAD_MUTEX
pthread_rwlock_rdlock(&lock);
#else
spin_rdlock(&lock);
#endif
for (i=0; i<ptrArray->len;i++) {
qce=(QC_entry_t *)ptrArray->index(i);
void KV_BtreeArray::purge_some(uint64_t QCnow_ms, bool aggressive) {
uint64_t ret = 0;
uint64_t freeable_memory = 0;
uint64_t access_ms_min = std::numeric_limits<uint64_t>::max();
uint64_t access_ms_max = 0;
rdlock();
for (const std::shared_ptr<QC_entry_t>& entry_shared : entries) {
if (aggressive) { // we have been asked to do aggressive purging
if (access_ms_min==0) {
access_ms_min = qce->access_ms;
access_ms_min = std::min(access_ms_min, entry_shared->access_ms);
access_ms_max = std::max(access_ms_max, entry_shared->access_ms);
/* if (access_ms_min == 0) {
access_ms_min = entry_shared->access_ms;
} else {
if (access_ms_min > qce->access_ms) {
access_ms_min = qce->access_ms;
if (access_ms_min > entry_shared->access_ms) {
access_ms_min = entry_shared->access_ms;
}
}
if (access_ms_max==0) {
access_ms_max = qce->access_ms;
access_ms_max = entry_shared->access_ms;
} else {
if (access_ms_max < qce->access_ms) {
access_ms_max = qce->access_ms;
if (access_ms_max < entry_shared->access_ms) {
access_ms_max = entry_shared->access_ms;
}
}
}*/
} else { // no aggresssive purging , legacy algorithm
if (qce->expire_ms==EXPIRE_DROPIT || qce->expire_ms<QCnow_ms) {
if (entry_shared->expire_ms == EXPIRE_DROPIT || entry_shared->expire_ms < QCnow_ms) {
ret++;
_size+=qce->length;
freeable_memory += entry_shared->length;
}
}
}
freeable_memory=_size;
#ifdef PROXYSQL_QC_PTHREAD_MUTEX
pthread_rwlock_unlock(&lock);
#else
spin_rdunlock(&lock);
#endif
//freeable_memory=_size;
unlock();
bool cond_freeable_memory=false;
if (aggressive==false) {
uint64_t total_freeable_memory=0;
total_freeable_memory=freeable_memory + ret * (sizeof(QC_entry_t)+sizeof(QC_entry_t *)*2+sizeof(uint64_t)*2);
total_freeable_memory=freeable_memory + ret * (qc_entry_size+sizeof(QC_entry_t*)*2+sizeof(uint64_t)*2);
if ( total_freeable_memory > get_data_size()*0.01 ) {
cond_freeable_memory=true; // there is memory that can be freed
}
@ -166,50 +186,46 @@ void KV_BtreeArray::purge_some(unsigned long long QCnow_ms, bool aggressive) {
if ( aggressive || cond_freeable_memory ) {
uint64_t removed_entries=0;
uint64_t freed_memory=0;
unsigned long long access_ms_lower_mark=0;
uint64_t access_ms_lower_mark=0;
if (aggressive) {
access_ms_lower_mark=access_ms_min+(access_ms_max-access_ms_min)*0.1; // hardcoded for now. Remove the entries with access time in the 10% range closest to access_ms_min
access_ms_lower_mark = access_ms_min + (access_ms_max-access_ms_min) * 0.1; // hardcoded for now. Remove the entries with access time in the 10% range closest to access_ms_min
}
#ifdef PROXYSQL_QC_PTHREAD_MUTEX
pthread_rwlock_wrlock(&lock);
#else
spin_wrlock(&lock);
#endif
for (i=0; i<ptrArray->len;i++) {
qce=(QC_entry_t *)ptrArray->index(i);
wrlock();
for (size_t i = 0; i < entries.size();) {
const std::shared_ptr<QC_entry_t>& entry_shared = entries[i];
bool drop_entry=false;
if (__sync_fetch_and_add(&qce->ref_count,0)<=1) { // currently not in use
if (qce->expire_ms==EXPIRE_DROPIT || qce->expire_ms<QCnow_ms) { //legacy algorithm
//if (__sync_fetch_and_add(&qce->ref_count,0)<=1) { // currently not in use
if (entry_shared.use_count() <= 1) { // we check this to avoid releasing entries that are still in use
if (entry_shared->expire_ms == EXPIRE_DROPIT || entry_shared->expire_ms < QCnow_ms) { //legacy algorithm
drop_entry=true;
}
if (aggressive) { // we have been asked to do aggressive purging
if (drop_entry==false) { // if the entry is already marked to be dropped, no further check
if (qce->access_ms < access_ms_lower_mark) {
if (entry_shared->access_ms < access_ms_lower_mark) {
drop_entry=true;
}
}
}
}
if (drop_entry) {
qce=(QC_entry_t *)ptrArray->remove_index_fast(i);
btree::btree_map<uint64_t, QC_entry_t *>::iterator lookup;
lookup = bt_map.find(qce->key);
if (lookup != bt_map.end()) {
const uint32_t length = entry_shared->length;
btree::btree_map<uint64_t,std::weak_ptr<QC_entry_t>>::iterator lookup;
lookup = bt_map.find(entry_shared->key);
if (lookup != bt_map.end()) {
bt_map.erase(lookup);
}
i--;
freed_memory+=qce->length;
remove_from_entries_by_index(i);
freed_memory+=length;
removed_entries++;
free(qce->value);
free(qce);
continue;
}
i++;
}
#ifdef PROXYSQL_QC_PTHREAD_MUTEX
pthread_rwlock_unlock(&lock);
#else
spin_wrunlock(&lock);
#endif
unlock();
THR_DECREASE_CNT(__thr_num_deleted,Glo_num_entries,removed_entries,1);
if (removed_entries) {
__sync_fetch_and_add(&Glo_total_freed_memory,freed_memory);
@ -219,82 +235,69 @@ void KV_BtreeArray::purge_some(unsigned long long QCnow_ms, bool aggressive) {
}
};
int KV_BtreeArray::cnt() {
inline int KV_BtreeArray::count() const {
return bt_map.size();
};
bool KV_BtreeArray::replace(uint64_t key, QC_entry_t *entry) {
#ifdef PROXYSQL_QC_PTHREAD_MUTEX
pthread_rwlock_wrlock(&lock);
#else
spin_wrlock(&lock);
#endif
std::shared_ptr<QC_entry_t> entry_shared(entry, &free_QC_Entry);
wrlock();
THR_UPDATE_CNT(__thr_cntSet,Glo_cntSet,1,1);
THR_UPDATE_CNT(__thr_size_values,Glo_size_values,entry->length,1);
THR_UPDATE_CNT(__thr_dataIN,Glo_dataIN,entry->length,1);
THR_UPDATE_CNT(__thr_num_entries,Glo_num_entries,1,1);
entry->ref_count=1;
ptrArray->add(entry);
btree::btree_map<uint64_t, QC_entry_t *>::iterator lookup;
lookup = bt_map.find(key);
if (lookup != bt_map.end()) {
lookup->second->expire_ms=EXPIRE_DROPIT;
__sync_fetch_and_sub(&lookup->second->ref_count,1);
add_to_entries(entry_shared);
btree::btree_map<uint64_t,std::weak_ptr<QC_entry_t>>::iterator lookup;
lookup = bt_map.find(key);
if (lookup != bt_map.end()) {
if (std::shared_ptr<QC_entry_t> found_entry_shared = lookup->second.lock()) {
found_entry_shared->expire_ms = EXPIRE_DROPIT;
}
bt_map.erase(lookup);
}
bt_map.insert(std::make_pair(key,entry));
#ifdef PROXYSQL_QC_PTHREAD_MUTEX
pthread_rwlock_unlock(&lock);
#else
spin_wrunlock(&lock);
#endif
bt_map.insert({key,entry_shared});
#ifdef DEBUG
assert(entry_shared.use_count() == 2); // it should be 2, one for entry_shared object and one for object in entries vector
#endif /* DEBUG */
unlock();
return true;
}
QC_entry_t * KV_BtreeArray::lookup(uint64_t key) {
QC_entry_t *entry=NULL;
#ifdef PROXYSQL_QC_PTHREAD_MUTEX
pthread_rwlock_rdlock(&lock);
#else
spin_rdlock(&lock);
#endif
std::weak_ptr<QC_entry_t> KV_BtreeArray::lookup(uint64_t key) {
std::weak_ptr<QC_entry_t> entry_ptr;
rdlock();
THR_UPDATE_CNT(__thr_cntGet,Glo_cntGet,1,1);
btree::btree_map<uint64_t, QC_entry_t *>::iterator lookup;
lookup = bt_map.find(key);
if (lookup != bt_map.end()) {
entry=lookup->second;
__sync_fetch_and_add(&entry->ref_count,1);
}
#ifdef PROXYSQL_QC_PTHREAD_MUTEX
pthread_rwlock_unlock(&lock);
#else
spin_rdunlock(&lock);
#endif
return entry;
btree::btree_map<uint64_t,std::weak_ptr<QC_entry_t>>::iterator lookup;
lookup = bt_map.find(key);
if (lookup != bt_map.end()) {
entry_ptr = lookup->second;
//__sync_fetch_and_add(&entry->ref_count,1);
}
unlock();
return entry_ptr;
};
void KV_BtreeArray::empty() {
#ifdef PROXYSQL_QC_PTHREAD_MUTEX
pthread_rwlock_wrlock(&lock);
#else
spin_wrlock(&lock);
#endif
btree::btree_map<uint64_t, QC_entry_t *>::iterator lookup;
void KV_BtreeArray::clear(bool release_entries) {
wrlock();
btree::btree_map<uint64_t,std::weak_ptr<QC_entry_t>>::iterator lookup;
while (bt_map.size()) {
lookup = bt_map.begin();
if ( lookup != bt_map.end() ) {
lookup->second->expire_ms=EXPIRE_DROPIT;
if (std::shared_ptr<QC_entry_t> found_entry_shared = lookup->second.lock()) {
found_entry_shared->expire_ms = EXPIRE_DROPIT;
}
bt_map.erase(lookup);
}
}
#ifdef PROXYSQL_QC_PTHREAD_MUTEX
pthread_rwlock_unlock(&lock);
#else
spin_wrunlock(&lock);
#endif
};
if (release_entries)
entries.clear();
unlock();
}
using metric_name = std::string;
using metric_help = std::string;
@ -398,17 +401,18 @@ qc_metrics_map = std::make_tuple(
}
);
uint64_t Query_Cache::get_data_size_total() {
uint64_t r=0;
int i;
for (i=0; i<SHARED_QUERY_CACHE_HASH_TABLES; i++) {
r+=KVs[i]->get_data_size();
template <typename QC_DERIVED>
uint64_t Query_Cache<QC_DERIVED>::get_data_size_total() {
uint64_t total_size = 0;
for (int i=0; i<SHARED_QUERY_CACHE_HASH_TABLES; i++) {
total_size += KVs[i]->get_data_size();
}
r += __sync_fetch_and_add(&Glo_size_values,0);
return r;
total_size += __sync_fetch_and_add(&Glo_size_values,0);
return total_size;
};
unsigned int Query_Cache::current_used_memory_pct() {
template <typename QC_DERIVED>
unsigned int Query_Cache<QC_DERIVED>::current_used_memory_pct(uint64_t max_memory_size) {
uint64_t cur_size=get_data_size_total();
float pctf = (float) cur_size*100/max_memory_size;
if (pctf > 100) return 100;
@ -416,7 +420,8 @@ unsigned int Query_Cache::current_used_memory_pct() {
return pct;
}
Query_Cache::Query_Cache() {
template <typename QC_DERIVED>
Query_Cache<QC_DERIVED>::Query_Cache() {
#ifdef DEBUG
if (glovars.has_debug==false) {
#else
@ -426,23 +431,22 @@ Query_Cache::Query_Cache() {
exit(EXIT_FAILURE);
}
for (int i=0; i<SHARED_QUERY_CACHE_HASH_TABLES; i++) {
KVs[i]=new KV_BtreeArray();
KVs[i]=new KV_BtreeArray(sizeof(TypeQCEntry));
}
QCnow_ms=monotonic_time()/1000;
size=SHARED_QUERY_CACHE_HASH_TABLES;
shutdown=0;
purge_loop_time=DEFAULT_purge_loop_time;
purge_total_time=DEFAULT_purge_total_time;
purge_threshold_pct_min=DEFAULT_purge_threshold_pct_min;
purge_threshold_pct_max=DEFAULT_purge_threshold_pct_max;
max_memory_size=DEFAULT_SQC_size;
//shutting_down = 0;
//purge_loop_time=DEFAULT_purge_loop_time;
//purge_total_time=DEFAULT_purge_total_time;
//purge_threshold_pct_min=DEFAULT_purge_threshold_pct_min;
//purge_threshold_pct_max=DEFAULT_purge_threshold_pct_max;
//max_memory_size=DEFAULT_SQC_size;
// Initialize prometheus metrics
init_prometheus_counter_array<qc_metrics_map_idx, p_qc_counter>(qc_metrics_map, this->metrics.p_counter_array);
init_prometheus_gauge_array<qc_metrics_map_idx, p_qc_gauge>(qc_metrics_map, this->metrics.p_gauge_array);
};
void Query_Cache::p_update_metrics() {
template <typename QC_DERIVED>
void Query_Cache<QC_DERIVED>::p_update_metrics() {
this->metrics.p_gauge_array[p_qc_gauge::query_cache_memory_bytes]->Set(get_data_size_total());
p_update_counter(this->metrics.p_counter_array[p_qc_counter::query_cache_count_get], Glo_cntGet - Glo_cntGetOK);
p_update_counter(this->metrics.p_counter_array[p_qc_counter::query_cache_count_get_ok], Glo_cntGetOK);
@ -453,209 +457,33 @@ void Query_Cache::p_update_metrics() {
p_update_counter(this->metrics.p_counter_array[p_qc_counter::query_cache_entries], Glo_num_entries);
}
void Query_Cache::print_version() {
template <typename QC_DERIVED>
void Query_Cache<QC_DERIVED>::print_version() {
fprintf(stderr,"In memory Standard Query Cache (SQC) rev. %s -- %s -- %s\n", QUERY_CACHE_VERSION, __FILE__, __TIMESTAMP__);
};
Query_Cache::~Query_Cache() {
unsigned int i;
for (i=0; i<SHARED_QUERY_CACHE_HASH_TABLES; i++) {
template <typename QC_DERIVED>
Query_Cache<QC_DERIVED>::~Query_Cache() {
for (unsigned int i=0; i<SHARED_QUERY_CACHE_HASH_TABLES; i++) {
delete KVs[i];
}
};
const int eof_to_ok_dif = static_cast<const int>(- (sizeof(mysql_hdr) + 5) + 2);
const int ok_to_eof_dif = static_cast<const int>(+ (sizeof(mysql_hdr) + 5) - 2);
/**
* @brief Converts a 'EOF_Packet' to holded inside a 'QC_entry_t' into a 'OK_Packet'.
* Warning: This function assumes that the supplied 'QC_entry_t' holds a valid
* 'EOF_Packet'.
*
* @param entry The 'QC_entry_t' holding a 'OK_Packet' to be converted into
* a 'EOF_Packet'.
* @return The converted packet.
*/
unsigned char* eof_to_ok_packet(QC_entry_t* entry) {
unsigned char* result = (unsigned char*)malloc(entry->length + eof_to_ok_dif);
unsigned char* vp = result;
char* it = entry->value;
// Copy until the first EOF
memcpy(vp, entry->value, entry->column_eof_pkt_offset);
it += entry->column_eof_pkt_offset;
vp += entry->column_eof_pkt_offset;
// Skip the first EOF after columns def
mysql_hdr hdr;
memcpy(&hdr, it, sizeof(mysql_hdr));
it += sizeof(mysql_hdr) + hdr.pkt_length;
// Copy all the rows
uint64_t u_entry_val = reinterpret_cast<uint64_t>(entry->value);
uint64_t u_it_pos = reinterpret_cast<uint64_t>(it);
uint64_t rows_length = (u_entry_val + entry->row_eof_pkt_offset) - u_it_pos;
memcpy(vp, it, rows_length);
vp += rows_length;
it += rows_length;
// Replace final EOF in favor of OK packet
// =======================================
// Copy the mysql header
memcpy(&hdr, it, sizeof(mysql_hdr));
hdr.pkt_length = 7;
memcpy(vp, &hdr, sizeof(mysql_hdr));
vp += sizeof(mysql_hdr);
it += sizeof(mysql_hdr);
// OK packet header
*vp = 0xfe;
vp++;
it++;
// Initialize affected_rows and last_insert_id to zero
memset(vp, 0, 2);
vp += 2;
// Extract warning flags and status from 'EOF_packet'
char* eof_packet = entry->value + entry->row_eof_pkt_offset;
eof_packet += sizeof(mysql_hdr);
// Skip the '0xFE EOF packet header'
eof_packet += 1;
uint16_t warnings;
memcpy(&warnings, eof_packet, sizeof(uint16_t));
eof_packet += 2;
uint16_t status_flags;
memcpy(&status_flags, eof_packet, sizeof(uint16_t));
// Copy warnings an status flags
memcpy(vp, &status_flags, sizeof(uint16_t));
vp += 2;
memcpy(vp, &warnings, sizeof(uint16_t));
// =======================================
// Decrement ids after the first EOF
unsigned char* dp = result + entry->column_eof_pkt_offset;
mysql_hdr decrement_hdr;
for (;;) {
memcpy(&decrement_hdr, dp, sizeof(mysql_hdr));
decrement_hdr.pkt_id--;
memcpy(dp, &decrement_hdr, sizeof(mysql_hdr));
dp += sizeof(mysql_hdr) + decrement_hdr.pkt_length;
if (dp >= vp)
break;
}
return result;
}
/**
* @brief Converts a 'OK_Packet' holded inside 'QC_entry_t' into a 'EOF_Packet'.
* Warning: This function assumes that the supplied 'QC_entry_t' holds a valid
* 'OK_Packet'.
*
* @param entry The 'QC_entry_t' holding a 'EOF_Packet' to be converted into
* a 'OK_Packet'.
* @return The converted packet.
*/
unsigned char* ok_to_eof_packet(QC_entry_t* entry) {
unsigned char* result = (unsigned char*)malloc(entry->length + ok_to_eof_dif);
unsigned char* vp = result;
char* it = entry->value;
// Extract warning flags and status from 'OK_packet'
char* ok_packet = it + entry->ok_pkt_offset;
mysql_hdr ok_hdr;
memcpy(&ok_hdr, ok_packet, sizeof(mysql_hdr));
ok_packet += sizeof(mysql_hdr);
// Skip the 'OK packet header', 'affected_rows' and 'last_insert_id'
ok_packet += 3;
uint16_t status_flags;
memcpy(&status_flags, ok_packet, sizeof(uint16_t));
ok_packet += 2;
uint16_t warnings;
memcpy(&warnings, ok_packet, sizeof(uint16_t));
// Find the spot in which the first EOF needs to be placed
it += sizeof(mysql_hdr);
uint64_t c_count = 0;
int c_count_len = mysql_decode_length(reinterpret_cast<unsigned char*>(it), &c_count);
it += c_count_len;
mysql_hdr column_hdr;
for (uint64_t i = 0; i < c_count; i++) {
memcpy(&column_hdr, it ,sizeof(mysql_hdr));
it += sizeof(mysql_hdr) + column_hdr.pkt_length;
}
// Location for 'column_eof'
uint64_t column_eof_offset =
reinterpret_cast<unsigned char*>(it) -
reinterpret_cast<unsigned char*>(entry->value);
memcpy(vp, entry->value, column_eof_offset);
vp += column_eof_offset;
// Write 'column_eof_packet' header
column_hdr.pkt_id = column_hdr.pkt_id + 1;
column_hdr.pkt_length = 5;
memcpy(vp, &column_hdr, sizeof(mysql_hdr));
vp += sizeof(mysql_hdr);
// Write 'column_eof_packet' contents
*vp = 0xfe;
vp++;
memcpy(vp, &warnings, sizeof(uint16_t));
vp += 2;
memcpy(vp, &status_flags, sizeof(uint16_t));
vp += 2;
// Find the OK packet
for (;;) {
mysql_hdr hdr;
memcpy(&hdr, it ,sizeof(mysql_hdr));
unsigned char* payload =
reinterpret_cast<unsigned char*>(it) +
sizeof(mysql_hdr);
if (hdr.pkt_length < 9 && *payload == 0xfe) {
mysql_hdr ok_hdr;
ok_hdr.pkt_id = hdr.pkt_id + 1;
ok_hdr.pkt_length = 5;
memcpy(vp, &ok_hdr, sizeof(mysql_hdr));
vp += sizeof(mysql_hdr);
*vp = 0xfe;
vp++;
memcpy(vp, &warnings, sizeof(uint16_t));
vp += 2;
memcpy(vp, &status_flags, sizeof(uint16_t));
break;
} else {
// Increment the package id by one due to 'column_eof_packet'
hdr.pkt_id += 1;
memcpy(vp, &hdr, sizeof(mysql_hdr));
vp += sizeof(mysql_hdr);
it += sizeof(mysql_hdr);
memcpy(vp, it, hdr.pkt_length);
vp += hdr.pkt_length;
it += hdr.pkt_length;
}
}
return result;
}
unsigned char * Query_Cache::get(uint64_t user_hash, const unsigned char *kp, const uint32_t kl, uint32_t *lv, unsigned long long curtime_ms, unsigned long long cache_ttl, bool deprecate_eof_active) {
unsigned char *result=NULL;
template <typename QC_DERIVED>
std::shared_ptr<QC_entry_t> Query_Cache<QC_DERIVED>::get(uint64_t user_hash, const unsigned char *kp,
const uint32_t kl, uint64_t curtime_ms, uint64_t cache_ttl) {
uint64_t hk=SpookyHash::Hash64(kp, kl, user_hash);
unsigned char i=hk%SHARED_QUERY_CACHE_HASH_TABLES;
uint8_t i=hk%SHARED_QUERY_CACHE_HASH_TABLES;
QC_entry_t *entry=KVs[i]->lookup(hk);
std::shared_ptr<QC_entry_t> entry_shared = KVs[i]->lookup(hk).lock();
if (entry!=NULL) {
unsigned long long t=curtime_ms;
if (entry->expire_ms > t && entry->create_ms + cache_ttl > t) {
if (entry_shared) {
uint64_t t = curtime_ms;
if (entry_shared->expire_ms > t && entry_shared->create_ms + cache_ttl > t) {
if (
mysql_thread___query_cache_soft_ttl_pct && !entry->refreshing &&
entry->create_ms + cache_ttl * mysql_thread___query_cache_soft_ttl_pct / 100 <= t
GET_THREAD_VARIABLE(query_cache_soft_ttl_pct) && !entry_shared->refreshing &&
entry_shared->create_ms + cache_ttl * GET_THREAD_VARIABLE(query_cache_soft_ttl_pct) / 100 <= t
) {
// If the Query Cache entry reach the soft_ttl but do not reach
// the cache_ttl, the next query hit the backend and refresh
@ -663,165 +491,61 @@ unsigned char * Query_Cache::get(uint64_t user_hash, const unsigned char *kp, co
// refreshing is in process, other queries keep using the "old"
// Query Cache entry.
// soft_ttl_pct with value 0 and 100 disables the functionality.
entry->refreshing = true;
entry_shared->refreshing = true;
} else {
THR_UPDATE_CNT(__thr_cntGetOK,Glo_cntGetOK,1,1);
THR_UPDATE_CNT(__thr_dataOUT,Glo_dataOUT,entry->length,1);
if (deprecate_eof_active && entry->column_eof_pkt_offset) {
result = eof_to_ok_packet(entry);
*lv = entry->length + eof_to_ok_dif;
} else if (!deprecate_eof_active && entry->ok_pkt_offset){
result = ok_to_eof_packet(entry);
*lv = entry->length + ok_to_eof_dif;
} else {
result = (unsigned char *)malloc(entry->length);
memcpy(result, entry->value, entry->length);
*lv = entry->length;
}
if (t > entry->access_ms) entry->access_ms=t;
THR_UPDATE_CNT(__thr_dataOUT,Glo_dataOUT, entry_shared->length,1);
if (t > entry_shared->access_ms) entry_shared->access_ms=t;
return entry_shared;
}
}
__sync_fetch_and_sub(&entry->ref_count,1);
}
return result;
return std::shared_ptr<QC_entry_t>(nullptr);
}
bool Query_Cache::set(uint64_t user_hash, const unsigned char *kp, uint32_t kl, unsigned char *vp, uint32_t vl, unsigned long long create_ms, unsigned long long curtime_ms, unsigned long long expire_ms, bool deprecate_eof_active) {
QC_entry_t *entry = (QC_entry_t *)malloc(sizeof(QC_entry_t));
template <typename QC_DERIVED>
bool Query_Cache<QC_DERIVED>::set(QC_entry_t* entry, uint64_t user_hash, const unsigned char *kp, uint32_t kl,
unsigned char *vp, uint32_t vl, uint64_t create_ms, uint64_t curtime_ms, uint64_t expire_ms) {
entry->klen=kl;
entry->length=vl;
entry->ref_count=0;
entry->column_eof_pkt_offset=0;
entry->row_eof_pkt_offset=0;
entry->ok_pkt_offset=0;
entry->refreshing=false;
// Find the first EOF location
unsigned char* it = vp;
it += sizeof(mysql_hdr);
uint64_t c_count = 0;
int c_count_len = mysql_decode_length(const_cast<unsigned char*>(it), &c_count);
it += c_count_len;
for (uint64_t i = 0; i < c_count; i++) {
mysql_hdr hdr;
memcpy(&hdr, it ,sizeof(mysql_hdr));
it += sizeof(mysql_hdr) + hdr.pkt_length;
}
if (deprecate_eof_active == false) {
// Store EOF position and jump to rows
entry->column_eof_pkt_offset = it - vp;
mysql_hdr hdr;
memcpy(&hdr, it, sizeof(mysql_hdr));
it += sizeof(mysql_hdr) + hdr.pkt_length;
}
// Find the second EOF location or the OK packet
for (;;) {
mysql_hdr hdr;
memcpy(&hdr, it ,sizeof(mysql_hdr));
unsigned char* payload = it + sizeof(mysql_hdr);
if (hdr.pkt_length < 9 && *payload == 0xfe) {
if (deprecate_eof_active) {
entry->ok_pkt_offset = it - vp;
// Reset the warning flags to zero before storing resultset in the cache
// Reason: When a warning flag is set, it may prompt the client to invoke "SHOW WARNINGS" or "SHOW COUNT(*) FROM WARNINGS".
// However, when retrieving data from the cache, it's possible that there are no warnings present
// that might be associated with previous interactions.
unsigned char* payload_temp = payload+1;
// skip affected_rows
payload_temp += mysql_decode_length(payload_temp, nullptr);
// skip last_insert_id
payload_temp += mysql_decode_length(payload_temp, nullptr);
// skip stats_flags
payload_temp += sizeof(uint16_t);
uint16_t warnings = 0;
memcpy(payload_temp, &warnings, sizeof(uint16_t));
} else {
entry->row_eof_pkt_offset = it - vp;
// Reset the warning flags to zero before storing resultset in the cache
// Reason: When a warning flag is set, it may prompt the client to invoke "SHOW WARNINGS" or "SHOW COUNT(*) FROM WARNINGS".
// However, when retrieving data from the cache, it's possible that there are no warnings present
// that might be associated with previous interactions.
uint16_t warnings = 0;
memcpy((payload + 1), &warnings, sizeof(uint16_t));
}
break;
} else {
it += sizeof(mysql_hdr) + hdr.pkt_length;
}
}
entry->value=(char *)malloc(vl);
memcpy(entry->value,vp,vl);
entry->self=entry;
// entry->value = (unsigned char*)malloc(vl);
// memcpy(entry->value, vp, vl);
entry->value = vp; // no need to allocate new memory and copy value
//entry->self=entry;
entry->create_ms=create_ms;
entry->access_ms=curtime_ms;
entry->expire_ms=expire_ms;
uint64_t hk=SpookyHash::Hash64(kp, kl, user_hash);
unsigned char i=hk%SHARED_QUERY_CACHE_HASH_TABLES;
uint8_t i=hk%SHARED_QUERY_CACHE_HASH_TABLES;
entry->key=hk;
entry->kv=KVs[i];
KVs[i]->replace(hk, entry);
return true;
}
uint64_t Query_Cache::flush() {
int i;
template <typename QC_DERIVED>
uint64_t Query_Cache<QC_DERIVED>::flush() {
uint64_t total_count=0;
for (i=0; i<SHARED_QUERY_CACHE_HASH_TABLES; i++) {
total_count+=KVs[i]->cnt();
KVs[i]->empty();
for (int i=0; i<SHARED_QUERY_CACHE_HASH_TABLES; i++) {
total_count+=KVs[i]->count();
KVs[i]->clear();
}
return total_count;
};
void * Query_Cache::purgeHash_thread(void *) {
unsigned int i;
unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version;
MySQL_Thread * mysql_thr = new MySQL_Thread();
MySQL_Monitor__thread_MySQL_Thread_Variables_version=GloMTH->get_global_version();
set_thread_name("QueryCachePurge");
mysql_thr->refresh_variables();
max_memory_size = (uint64_t) mysql_thread___query_cache_size_MB*1024*1024;
while (shutdown==0) {
usleep(purge_loop_time);
unsigned long long t=monotonic_time()/1000;
QCnow_ms=t;
unsigned int glover=GloMTH->get_global_version();
if (GloMTH) {
if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) {
MySQL_Monitor__thread_MySQL_Thread_Variables_version=glover;
mysql_thr->refresh_variables();
max_memory_size = (uint64_t) mysql_thread___query_cache_size_MB*1024*1024;
}
}
unsigned int curr_pct=current_used_memory_pct();
if (curr_pct < purge_threshold_pct_min ) continue;
for (i=0; i<SHARED_QUERY_CACHE_HASH_TABLES; i++) {
KVs[i]->purge_some(QCnow_ms, (curr_pct > purge_threshold_pct_max));
}
template <typename QC_DERIVED>
void Query_Cache<QC_DERIVED>::purgeHash(uint64_t QCnow_ms, unsigned int curr_pct) {
for (int i = 0; i < SHARED_QUERY_CACHE_HASH_TABLES; i++) {
KVs[i]->purge_some(QCnow_ms, (curr_pct > purge_threshold_pct_max));
}
delete mysql_thr;
return NULL;
};
}
SQLite3_result * Query_Cache::SQL3_getStats() {
const int colnum=2;
template <typename QC_DERIVED>
SQLite3_result* Query_Cache<QC_DERIVED>::SQL3_getStats() {
constexpr int colnum =2;
char buf[256];
char **pta=(char **)malloc(sizeof(char *)*colnum);
//Get_Memory_Stats();
SQLite3_result *result=new SQLite3_result(colnum);
result->add_column_definition(SQLITE_TEXT,"Variable_Name");
result->add_column_definition(SQLITE_TEXT,"Variable_Value");
@ -877,3 +601,16 @@ SQLite3_result * Query_Cache::SQL3_getStats() {
free(pta);
return result;
}
template <typename QC_DERIVED>
void Query_Cache<QC_DERIVED>::purgeHash(uint64_t max_memory_size) {
const unsigned int curr_pct = current_used_memory_pct(max_memory_size);
if (curr_pct < purge_threshold_pct_min) return;
purgeHash((monotonic_time() / 1000ULL), curr_pct);
}
template
class Query_Cache<MySQL_Query_Cache>;
template
class Query_Cache<PgSQL_Query_Cache>;

@ -99,7 +99,7 @@ static char *s_strdup(char *s) {
static int __SQLite3_Server_refresh_interval=1000;
extern Query_Cache *GloQC;
extern MySQL_Query_Cache *GloMyQC;
extern MySQL_Authentication *GloMyAuth;
extern ProxySQL_Admin *GloAdmin;
extern MySQL_Query_Processor* GloMyQPro;

@ -31,6 +31,8 @@ using json = nlohmann::json;
#include "MySQL_Authentication.hpp"
#include "PgSQL_Authentication.h"
#include "MySQL_LDAP_Authentication.hpp"
#include "MySQL_Query_Cache.h"
#include "PgSQL_Query_Cache.h"
#include "proxysql_restapi.h"
#include "Web_Interface.hpp"
#include "proxysql_utils.h"
@ -444,7 +446,8 @@ int listen_fd;
int socket_fd;
Query_Cache *GloQC;
MySQL_Query_Cache *GloMyQC;
PgSQL_Query_Cache* GloPgQC;
MySQL_Authentication *GloMyAuth;
PgSQL_Authentication* GloPgAuth;
MySQL_LDAP_Authentication *GloMyLdapAuth;
@ -607,11 +610,55 @@ void* pgsql_worker_thread_func_idles(void* arg) {
}
#endif // IDLE_THREADS
void * mysql_shared_query_cache_funct(void *arg) {
GloQC->purgeHash_thread(NULL);
void* unified_query_cache_purge_thread(void *arg) {
set_thread_name("QCPurgeThread");
MySQL_Thread* mysql_thr = new MySQL_Thread();
unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version;
MySQL_Monitor__thread_MySQL_Thread_Variables_version = GloMTH->get_global_version();
mysql_thr->refresh_variables();
uint64_t mysql_max_memory_size = static_cast<uint64_t>(mysql_thread___query_cache_size_MB * 1024ULL * 1024ULL);
PgSQL_Thread* pgsql_thr = new PgSQL_Thread();
unsigned int PgSQL_Monitor__thread_PgSQL_Thread_Variables_version;
PgSQL_Monitor__thread_PgSQL_Thread_Variables_version = GloPTH->get_global_version();
pgsql_thr->refresh_variables();
uint64_t pgsql_max_memory_size = static_cast<uint64_t>(pgsql_thread___query_cache_size_MB * 1024ULL * 1024ULL);
// Both MySQL and PgSQL query caches use the same shutting_down value
while (GloMyQC->shutting_down == false && GloPgQC->shutting_down == false) {
// Both MySQL and PgSQL query caches share the same purge_loop_time value.
// Therefore, using either purge_loop_time will have no impact on the behavior.
usleep(GloMyQC->purge_loop_time);
const unsigned int mysql_glover = GloMTH->get_global_version();
if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < mysql_glover) {
MySQL_Monitor__thread_MySQL_Thread_Variables_version = mysql_glover;
mysql_thr->refresh_variables();
mysql_max_memory_size = static_cast<uint64_t>(mysql_thread___query_cache_size_MB * 1024ULL * 1024ULL);
}
GloMyQC->purgeHash(mysql_max_memory_size);
const unsigned int pgsql_glover = GloPTH->get_global_version();
if (PgSQL_Monitor__thread_PgSQL_Thread_Variables_version < pgsql_glover) {
PgSQL_Monitor__thread_PgSQL_Thread_Variables_version = pgsql_glover;
pgsql_thr->refresh_variables();
pgsql_max_memory_size = static_cast<uint64_t>(pgsql_thread___query_cache_size_MB * 1024ULL * 1024ULL);
}
GloPgQC->purgeHash(pgsql_max_memory_size);
}
delete mysql_thr;
delete pgsql_thr;
return NULL;
}
/*void* pgsql_shared_query_cache_funct(void* arg) {
GloPgQC->purgeHash_thread(NULL);
return NULL;
}*/
void ProxySQL_Main_process_global_variables(int argc, const char **argv) {
GloVars.errorlog = NULL;
@ -815,7 +862,8 @@ void ProxySQL_Main_process_global_variables(int argc, const char **argv) {
}
void ProxySQL_Main_init_main_modules() {
GloQC=NULL;
GloMyQC=NULL;
GloPgQC=NULL;
GloMyQPro=NULL;
GloMTH=NULL;
GloMyAuth=NULL;
@ -946,9 +994,15 @@ void ProxySQL_Main_init_PgSQL_Threads_Handler_module() {
}
void ProxySQL_Main_init_Query_Cache_module() {
GloQC = new Query_Cache();
GloQC->print_version();
pthread_create(&GloQC->purge_thread_id, NULL, mysql_shared_query_cache_funct , NULL);
GloMyQC = new MySQL_Query_Cache();
GloMyQC->print_version();
GloPgQC = new PgSQL_Query_Cache();
GloPgQC->print_version();
pthread_t purge_thread_id;
pthread_create(&purge_thread_id, NULL, unified_query_cache_purge_thread, NULL);
GloMyQC->purge_thread_id = purge_thread_id;
GloPgQC->purge_thread_id = purge_thread_id;
}
void ProxySQL_Main_init_MySQL_Monitor_module() {
@ -1003,10 +1057,12 @@ void ProxySQL_Main_join_all_threads() {
std::cerr << "GloPTH joined in ";
#endif
}
if (GloQC) {
GloQC->shutdown=1;
if (GloMyQC) {
GloMyQC->shutting_down=true;
}
if (GloPgQC) {
GloPgQC->shutting_down=true;
}
if (GloMyMon) {
GloMyMon->shutdown=true;
}
@ -1032,14 +1088,33 @@ void ProxySQL_Main_join_all_threads() {
std::cerr << "GloPgMon joined in ";
#endif
}
// join GloQC thread
if (GloQC) {
/* Unified QC Purge Thread for both MySQL and PgSQL query cache
// join GloMyQC thread
if (GloMyQC) {
cpu_timer t;
pthread_join(GloQC->purge_thread_id, NULL);
pthread_join(GloMyQC->purge_thread_id, NULL);
#ifdef DEBUG
std::cerr << "GloQC joined in ";
std::cerr << "GloMyQC joined in ";
#endif
}
// join GloPgQC thread
if (GloPgQC) {
cpu_timer t;
pthread_join(GloPgQC->purge_thread_id, NULL);
#ifdef DEBUG
std::cerr << "GloPgQC joined in ";
#endif
}*/
if (GloMyQC || GloPgQC) {
cpu_timer t;
// The purge_thread_id is shared by both MySQL and PgSQL.
// use either one to join the thread.
pthread_join(GloMyQC->purge_thread_id, NULL);
#ifdef DEBUG
std::cerr << "GloMyQC and GloPgQC joined in ";
#endif
}
#ifdef DEBUG
std::cerr << "All threads joined in ";
#endif
@ -1062,12 +1137,20 @@ void ProxySQL_Main_shutdown_all_modules() {
std::cerr << "GloPgMon shutdown in ";
#endif
}
if (GloQC) {
if (GloMyQC) {
cpu_timer t;
delete GloMyQC;
GloMyQC=NULL;
#ifdef DEBUG
std::cerr << "GloMyQC shutdown in ";
#endif
}
if (GloPgQC) {
cpu_timer t;
delete GloQC;
GloQC=NULL;
delete GloPgQC;
GloPgQC=NULL;
#ifdef DEBUG
std::cerr << "GloQC shutdown in ";
std::cerr << "GloPgQC shutdown in ";
#endif
}
if (GloMyQPro) {

@ -73,7 +73,7 @@ static bool testTimeoutSequence[] = {true, false, true, false, true, false, true
static int testIndex = 7;
static int testLag = 10;
extern Query_Cache *GloQC;
extern MySQL_Query_Cache *GloMyQC;
extern MySQL_Authentication *GloMyAuth;
extern ProxySQL_Admin *GloAdmin;
extern MySQL_Query_Processor* GloQPro;

Loading…
Cancel
Save