From 720441af2041d06c95f067b64f31c648ed6fb503 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Wed, 9 Oct 2024 10:58:30 +0500 Subject: [PATCH] Refactored and Optimized Query Cache with PgSQL Support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Added Query Cache Support for PgSQL: * Introduced new variables for PgSQL Query Cache management: * pgsql-query_cache_size_MB * pgsql-query_cache_soft_ttl_pct * pgsql-query_cache_stores_empty_result * pgsql-query_cache_handle_warnings (not yet utilized) * Overwriting Transaction Status: * Sending current transaction state to the client when a result is returned from the query cache. * This currently does not differentiate between normal and error transaction states—both are treated as active transaction states. * Cache Lifetime Management: Replaced manual reference counting with std::shared_ptr to efficiently manage the lifecycle of cache entries, ensuring proper memory deallocation and reducing complexity. * Unified Purging Logic: Refactored the query cache purging logic to use a single thread for both MySQL and PgSQL cache entries. * Code Cleanup: Removed unnecessary methods and data members to streamline the codebase and improve maintainability. * Performance Optimization: Improved memory management, reducing memory footprint and enhancing performance. --- include/MySQL_Query_Cache.h | 26 ++ include/PgSQL_Data_Stream.h | 9 +- include/PgSQL_Query_Cache.h | 22 + include/cpp.h | 2 +- include/proxysql_structs.h | 11 +- include/query_cache.hpp | 111 +++-- lib/Admin_Bootstrap.cpp | 2 +- lib/Admin_FlushVariables.cpp | 2 +- lib/Admin_Handler.cpp | 32 +- lib/Base_Thread.cpp | 2 +- lib/ClickHouse_Server.cpp | 2 +- lib/Makefile | 3 +- lib/MySQL_Query_Cache.cpp | 316 +++++++++++++ lib/MySQL_Session.cpp | 16 +- lib/PgSQL_Connection.cpp | 2 +- lib/PgSQL_Data_Stream.cpp | 110 +++-- lib/PgSQL_Query_Cache.cpp | 48 ++ lib/PgSQL_Session.cpp | 75 ++- lib/PgSQL_Thread.cpp | 6 +- lib/ProxySQL_Admin.cpp | 19 +- lib/ProxySQL_Admin_Stats.cpp | 10 +- lib/Query_Cache.cpp | 791 +++++++++++--------------------- src/SQLite3_Server.cpp | 2 +- src/main.cpp | 119 ++++- test/tap/tap/SQLite3_Server.cpp | 2 +- 25 files changed, 1023 insertions(+), 717 deletions(-) create mode 100644 include/MySQL_Query_Cache.h create mode 100644 include/PgSQL_Query_Cache.h create mode 100644 lib/MySQL_Query_Cache.cpp create mode 100644 lib/PgSQL_Query_Cache.cpp diff --git a/include/MySQL_Query_Cache.h b/include/MySQL_Query_Cache.h new file mode 100644 index 000000000..6bd822fc2 --- /dev/null +++ b/include/MySQL_Query_Cache.h @@ -0,0 +1,26 @@ +#ifndef __CLASS_MYSQL_QUERY_CACHE_H +#define __CLASS_MYSQL_QUERY_CACHE_H + +#include "proxysql.h" +#include "cpp.h" +#include "query_cache.hpp" + +typedef struct _MySQL_QC_entry : public QC_entry_t { + uint32_t column_eof_pkt_offset; + uint32_t row_eof_pkt_offset; + uint32_t ok_pkt_offset; +} MySQL_QC_entry_t; + +class MySQL_Query_Cache : public Query_Cache { +public: + MySQL_Query_Cache() = default; + ~MySQL_Query_Cache() = default; + + bool set(uint64_t user_hash, const unsigned char* kp, uint32_t kl, unsigned char* vp, uint32_t vl, + uint64_t create_ms, uint64_t curtime_ms, uint64_t expire_ms, bool deprecate_eof_active); + unsigned char* get(uint64_t user_hash, const unsigned char* kp, const uint32_t kl, uint32_t* lv, + uint64_t curtime_ms, uint64_t cache_ttl, bool deprecate_eof_active); + //void* purgeHash_thread(void*); +}; + +#endif /* __CLASS_MYSQL_QUERY_CACHE_H */ diff --git a/include/PgSQL_Data_Stream.h b/include/PgSQL_Data_Stream.h index 5b7dbc454..f13e9caba 100644 --- a/include/PgSQL_Data_Stream.h +++ b/include/PgSQL_Data_Stream.h @@ -108,8 +108,8 @@ public: FixedSizeQueue data_packets_history_IN; FixedSizeQueue data_packets_history_OUT; //PtrSizeArray *PSarrayOUTpending; - PtrSizeArray* resultset; - unsigned int resultset_length; + //PtrSizeArray* resultset; + //unsigned int resultset_length; ProxySQL_Poll* mypolls; //int listener; @@ -201,8 +201,9 @@ public: void check_data_flow(); int assign_fd_from_mysql_conn(); - unsigned char* resultset2buffer(bool); - void buffer2resultset(unsigned char*, unsigned int); + static unsigned char* copy_array_to_buffer(PtrSizeArray* resultset, size_t resultset_length, bool del); + static void copy_buffer_to_resultset(PtrSizeArray* resultset, unsigned char* ptr, uint64_t size, + char current_transaction_state); // safe way to attach a PgSQL Connection void attach_connection(PgSQL_Connection* mc) { diff --git a/include/PgSQL_Query_Cache.h b/include/PgSQL_Query_Cache.h new file mode 100644 index 000000000..2b695f28b --- /dev/null +++ b/include/PgSQL_Query_Cache.h @@ -0,0 +1,22 @@ +#ifndef __CLASS_PGSQL_QUERY_CACHE_H +#define __CLASS_PGSQL_QUERY_CACHE_H + +#include "proxysql.h" +#include "cpp.h" +#include "query_cache.hpp" + +typedef struct _PgSQL_QC_entry : public QC_entry_t {} PgSQL_QC_entry_t; + +class PgSQL_Query_Cache : public Query_Cache { +public: + PgSQL_Query_Cache() = default; + ~PgSQL_Query_Cache() = default; + + bool set(uint64_t user_hash, const unsigned char* kp, uint32_t kl, unsigned char* vp, uint32_t vl, + uint64_t create_ms, uint64_t curtime_ms, uint64_t expire_ms); + const std::shared_ptr get(uint64_t user_hash, const unsigned char* kp, const uint32_t kl, + uint64_t curtime_ms, uint64_t cache_ttl); + //void* purgeHash_thread(void*); +}; + +#endif /* __CLASS_PGSQL_QUERY_CACHE_H */ diff --git a/include/cpp.h b/include/cpp.h index 12f965997..e50670056 100644 --- a/include/cpp.h +++ b/include/cpp.h @@ -10,7 +10,7 @@ #include "PgSQL_Backend.h" #include "ProxySQL_Poll.h" //#include "MySQL_Data_Stream.h" -#include "query_cache.hpp" +//#include "MySQL_Query_Cache.h" #include "mysql_connection.h" #include "sqlite3db.h" //#include "StatCounters.h" diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 47b149081..a5ac7109f 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -706,7 +706,8 @@ class SimpleKV; class AdvancedKV; template class ProxySQL_Poll; -class Query_Cache; +class MySQL_Query_Cache; +class PgSQL_Query_Cache; class MySQL_Authentication; class MySQL_Connection; class PgSQL_Connection; @@ -1095,6 +1096,10 @@ __thread int pgsql_thread___monitor_threads; __thread char* pgsql_thread___monitor_username; __thread char* pgsql_thread___monitor_password; +// PgSQL Query Cache +__thread int pgsql_thread___query_cache_size_MB; +__thread int pgsql_thread___query_cache_soft_ttl_pct; +__thread int pgsql_thread___query_cache_handle_warnings; //--------------------------- __thread char *mysql_thread___default_schema; @@ -1382,6 +1387,10 @@ extern __thread int pgsql_thread___monitor_threads; extern __thread char* pgsql_thread___monitor_username; extern __thread char* pgsql_thread___monitor_password; +// PgSQL Query Cache +extern __thread int pgsql_thread___query_cache_size_MB; +extern __thread int pgsql_thread___query_cache_soft_ttl_pct; +extern __thread int pgsql_thread___query_cache_handle_warnings; //--------------------------- extern __thread char *mysql_thread___default_schema; diff --git a/include/query_cache.hpp b/include/query_cache.hpp index ee621d363..1659ceb9c 100644 --- a/include/query_cache.hpp +++ b/include/query_cache.hpp @@ -1,9 +1,9 @@ #ifndef __CLASS_QUERY_CACHE_H #define __CLASS_QUERY_CACHE_H - #include "proxysql.h" #include "cpp.h" -#include +#include "prometheus/counter.h" +#include "prometheus/gauge.h" #define EXPIRE_DROPIT 0 #define SHARED_QUERY_CACHE_HASH_TABLES 32 @@ -13,30 +13,6 @@ #define DEFAULT_purge_threshold_pct_min 3 #define DEFAULT_purge_threshold_pct_max 90 -#include "prometheus/counter.h" -#include "prometheus/gauge.h" - -class KV_BtreeArray; - -typedef struct __QC_entry_t QC_entry_t; - -struct __QC_entry_t { - uint64_t key; // primary key - char *value; // pointer to value - KV_BtreeArray *kv; // pointer to the KV_BtreeArray where the entry is stored - QC_entry_t *self; // pointer to itself - uint32_t klen; // length of the key : FIXME: not sure if still relevant - uint32_t length; // length of the value - unsigned long long create_ms; // when the entry was created, monotonic, millisecond granularity - unsigned long long expire_ms; // when the entry will expire, monotonic , millisecond granularity - unsigned long long access_ms; // when the entry was read last , monotonic , millisecond granularity - bool refreshing; // true when a client will hit the backend to refresh the entry - uint32_t column_eof_pkt_offset = 0; - uint32_t row_eof_pkt_offset = 0; - uint32_t ok_pkt_offset = 0; - uint32_t ref_count; // reference counter -}; - struct p_qc_counter { enum metric { query_cache_count_get = 0, @@ -65,34 +41,67 @@ struct qc_metrics_map_idx { }; class KV_BtreeArray; +class MySQL_Query_Cache; +class PgSQL_Query_Cache; +struct _MySQL_QC_entry; +struct _PgSQL_QC_entry; +typedef struct _MySQL_QC_entry MySQL_QC_entry_t; +typedef struct _PgSQL_QC_entry PgSQL_QC_entry_t; + +typedef struct _QC_entry { + uint64_t key; // primary key + unsigned char *value; // pointer to value + uint32_t length; // length of the value + uint32_t klen; // length of the key : FIXME: not sure if still relevant + uint64_t create_ms; // when the entry was created, monotonic, millisecond granularity + uint64_t expire_ms; // when the entry will expire, monotonic , millisecond granularity + uint64_t access_ms; // when the entry was read last , monotonic , millisecond granularity + bool refreshing; // true when a client will hit the backend to refresh the entry + KV_BtreeArray* kv; // pointer to the KV_BtreeArray where the entry is stored (used for troubleshooting) + //struct _QC_entry* self; // pointer to itself +} QC_entry_t; + +template class Query_Cache { - private: - KV_BtreeArray * KVs[SHARED_QUERY_CACHE_HASH_TABLES]; - uint64_t get_data_size_total(); - unsigned int current_used_memory_pct(); - struct { - std::array p_counter_array {}; - std::array p_gauge_array {}; - } metrics; - public: + static_assert(std::is_same_v || std::is_same_v, + "Invalid QC_DERIVED Query Cache type"); + using TypeQCEntry = typename std::conditional, + MySQL_QC_entry_t, PgSQL_QC_entry_t>::type; +public: + static bool shutting_down; + static pthread_t purge_thread_id; + constexpr static unsigned int purge_loop_time = DEFAULT_purge_loop_time; + + void print_version(); + uint64_t flush(); void p_update_metrics(); - void * purgeHash_thread(void *); - int size; - int shutdown; - unsigned long long QCnow_ms; - pthread_t purge_thread_id; - unsigned int purge_loop_time; - unsigned int purge_total_time; - unsigned int purge_threshold_pct_min; - unsigned int purge_threshold_pct_max; - uint64_t max_memory_size; + SQLite3_result* SQL3_getStats(); + void purgeHash(uint64_t max_memory_size); + +protected: Query_Cache(); ~Query_Cache(); - void print_version(); - bool set(uint64_t user_hash, const unsigned char *kp, uint32_t kl, unsigned char *vp, uint32_t vl, unsigned long long create_ms, unsigned long long curtime_ms, unsigned long long expire_ms, bool deprecate_eof_active); - unsigned char * get(uint64_t , const unsigned char *, const uint32_t, uint32_t *, unsigned long long, unsigned long long, bool deprecate_eof_active); - uint64_t flush(); - SQLite3_result * SQL3_getStats(); + + bool set(QC_entry_t* entry, uint64_t user_hash, const unsigned char *kp, uint32_t kl, unsigned char *vp, + uint32_t vl, uint64_t create_ms, uint64_t curtime_ms, uint64_t expire_ms); + std::shared_ptr get(uint64_t user_hash, const unsigned char* kp, const uint32_t kl, + uint64_t curtime_ms, uint64_t cache_ttl); + + constexpr static unsigned int purge_total_time = DEFAULT_purge_total_time; + constexpr static unsigned int purge_threshold_pct_min = DEFAULT_purge_threshold_pct_min; + constexpr static unsigned int purge_threshold_pct_max = DEFAULT_purge_threshold_pct_max; + //uint64_t max_memory_size; + +private: + KV_BtreeArray* KVs[SHARED_QUERY_CACHE_HASH_TABLES]; + uint64_t get_data_size_total(); + unsigned int current_used_memory_pct(uint64_t max_memory_size); + void purgeHash(uint64_t QCnow_ms, unsigned int curr_pct); + + struct { + std::array p_counter_array{}; + std::array p_gauge_array{}; + } metrics; }; -#endif /* __CLASS_QUERY_CACHE_H */ +#endif /* __CLASS_QUERY_CACHE_H */ diff --git a/lib/Admin_Bootstrap.cpp b/lib/Admin_Bootstrap.cpp index 6bd80ffe5..8d46dfaf2 100644 --- a/lib/Admin_Bootstrap.cpp +++ b/lib/Admin_Bootstrap.cpp @@ -148,7 +148,7 @@ struct cpu_timer extern int admin_load_main_; extern bool admin_nostart_; -extern Query_Cache *GloQC; +//extern MySQL_Query_Cache *GloMyQC; extern MySQL_Authentication *GloMyAuth; extern PgSQL_Authentication *GloPgAuth; extern MySQL_LDAP_Authentication *GloMyLdapAuth; diff --git a/lib/Admin_FlushVariables.cpp b/lib/Admin_FlushVariables.cpp index 445e98c1c..c89292190 100644 --- a/lib/Admin_FlushVariables.cpp +++ b/lib/Admin_FlushVariables.cpp @@ -125,7 +125,7 @@ extern char * proxysql_version; #include "proxysql_find_charset.h" -extern Query_Cache *GloQC; +//extern MySQL_Query_Cache *GloMyQC; extern MySQL_Authentication *GloMyAuth; extern PgSQL_Authentication *GloPgAuth; extern MySQL_LDAP_Authentication *GloMyLdapAuth; diff --git a/lib/Admin_Handler.cpp b/lib/Admin_Handler.cpp index 0210d9183..0e7017797 100644 --- a/lib/Admin_Handler.cpp +++ b/lib/Admin_Handler.cpp @@ -77,6 +77,8 @@ using json = nlohmann::json; #include #include "PgSQL_Protocol.h" +#include "MySQL_Query_Cache.h" +#include "PgSQL_Query_Cache.h" //#include "usual/time.h" using std::string; @@ -135,7 +137,8 @@ extern bool admin_proxysql_pgsql_paused; extern int admin_old_wait_timeout; -extern Query_Cache *GloQC; +extern MySQL_Query_Cache *GloMyQC; +extern PgSQL_Query_Cache* GloPgQC; extern MySQL_Authentication *GloMyAuth; extern PgSQL_Authentication *GloPgAuth; extern MySQL_LDAP_Authentication *GloMyLdapAuth; @@ -653,8 +656,31 @@ bool admin_handler_command_proxysql(char *query_no_space, unsigned int query_no_ if (query_no_space_length==strlen("PROXYSQL FLUSH QUERY CACHE") && !strncasecmp("PROXYSQL FLUSH QUERY CACHE",query_no_space, query_no_space_length)) { proxy_info("Received PROXYSQL FLUSH QUERY CACHE command\n"); ProxySQL_Admin *SPA=(ProxySQL_Admin *)pa; - if (GloQC) { - GloQC->flush(); + if (GloMyQC) { + GloMyQC->flush(); + } + //if (GloPgQC) { + // GloPgQC->flush(); + //} + SPA->send_ok_msg_to_client(sess, NULL, 0, query_no_space); + return false; + } + + if (query_no_space_length == strlen("PROXYSQL FLUSH MYSQL QUERY CACHE") && !strncasecmp("PROXYSQL FLUSH MYSQL QUERY CACHE", query_no_space, query_no_space_length)) { + proxy_info("Received PROXYSQL FLUSH MYSQL QUERY CACHE command\n"); + ProxySQL_Admin* SPA = (ProxySQL_Admin*)pa; + if (GloMyQC) { + GloMyQC->flush(); + } + SPA->send_ok_msg_to_client(sess, NULL, 0, query_no_space); + return false; + } + + if (query_no_space_length == strlen("PROXYSQL FLUSH PGSQL QUERY CACHE") && !strncasecmp("PROXYSQL FLUSH PGSQL QUERY CACHE", query_no_space, query_no_space_length)) { + proxy_info("Received PROXYSQL FLUSH PGSQL QUERY CACHE command\n"); + ProxySQL_Admin* SPA = (ProxySQL_Admin*)pa; + if (GloPgQC) { + GloPgQC->flush(); } SPA->send_ok_msg_to_client(sess, NULL, 0, query_no_space); return false; diff --git a/lib/Base_Thread.cpp b/lib/Base_Thread.cpp index 50cbb0c18..a1e2ebcc8 100644 --- a/lib/Base_Thread.cpp +++ b/lib/Base_Thread.cpp @@ -379,7 +379,7 @@ bool Base_Thread::set_backend_to_be_skipped_if_frontend_is_slow(DS * myds, unsig if constexpr (std::is_same_v) { unsigned int buffered_data = 0; buffered_data = myds->sess->client_myds->PSarrayOUT->len * PGSQL_RESULTSET_BUFLEN; - buffered_data += myds->sess->client_myds->resultset->len * PGSQL_RESULTSET_BUFLEN; + //buffered_data += myds->sess->client_myds->resultset->len * PGSQL_RESULTSET_BUFLEN; if (buffered_data > (unsigned int)pgsql_thread___threshold_resultset_size * 4) { thr->mypolls.fds[n].events = 0; return true; diff --git a/lib/ClickHouse_Server.cpp b/lib/ClickHouse_Server.cpp index 28ef34171..ae1fcfe31 100644 --- a/lib/ClickHouse_Server.cpp +++ b/lib/ClickHouse_Server.cpp @@ -441,7 +441,7 @@ static char *s_strdup(char *s) { static int __ClickHouse_Server_refresh_interval=1000; -extern Query_Cache *GloQC; +extern MySQL_Query_Cache *GloMyQC; extern ClickHouse_Authentication *GloClickHouseAuth; extern ProxySQL_Admin *GloAdmin; extern MySQL_Query_Processor* GloMyQPro; diff --git a/lib/Makefile b/lib/Makefile index 8585f4f40..504b766ee 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -147,7 +147,8 @@ _OBJ_CXX := ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo Base_Session.oo Base_Thread.oo \ proxy_protocol_info.oo \ proxysql_find_charset.oo ProxySQL_Poll.oo \ - PgSQL_Protocol.oo PgSQL_Thread.oo PgSQL_Data_Stream.oo PgSQL_Session.oo PgSQL_Variables.oo PgSQL_HostGroups_Manager.oo PgSQL_Connection.oo PgSQL_Backend.oo PgSQL_Logger.oo PgSQL_Authentication.oo PgSQL_Error_Helper.oo PgSQL_Monitor.oo + PgSQL_Protocol.oo PgSQL_Thread.oo PgSQL_Data_Stream.oo PgSQL_Session.oo PgSQL_Variables.oo PgSQL_HostGroups_Manager.oo PgSQL_Connection.oo PgSQL_Backend.oo PgSQL_Logger.oo PgSQL_Authentication.oo PgSQL_Error_Helper.oo \ + MySQL_Query_Cache.oo PgSQL_Query_Cache.oo PgSQL_Monitor.oo OBJ_CXX := $(patsubst %,$(ODIR)/%,$(_OBJ_CXX)) HEADERS := ../include/*.h ../include/*.hpp diff --git a/lib/MySQL_Query_Cache.cpp b/lib/MySQL_Query_Cache.cpp new file mode 100644 index 000000000..94334e130 --- /dev/null +++ b/lib/MySQL_Query_Cache.cpp @@ -0,0 +1,316 @@ +#include "proxysql.h" +#include "cpp.h" +#include "MySQL_Protocol.h" +#include "MySQL_Query_Cache.h" + +extern MySQL_Threads_Handler* GloMTH; + +const int eof_to_ok_dif = static_cast(-(sizeof(mysql_hdr) + 5) + 2); +const int ok_to_eof_dif = static_cast(+(sizeof(mysql_hdr) + 5) - 2); + +/** + * @brief Converts a 'EOF_Packet' to holded inside a 'QC_entry_t' into a 'OK_Packet'. + * Warning: This function assumes that the supplied 'QC_entry_t' holds a valid + * 'EOF_Packet'. + * + * @param entry The 'QC_entry_t' holding a 'OK_Packet' to be converted into + * a 'EOF_Packet'. + * @return The converted packet. + */ +unsigned char* eof_to_ok_packet(const MySQL_QC_entry_t* entry) { + unsigned char* result = (unsigned char*)malloc(entry->length + eof_to_ok_dif); + unsigned char* vp = result; + unsigned char* it = entry->value; + + // Copy until the first EOF + memcpy(vp, entry->value, entry->column_eof_pkt_offset); + it += entry->column_eof_pkt_offset; + vp += entry->column_eof_pkt_offset; + + // Skip the first EOF after columns def + mysql_hdr hdr; + memcpy(&hdr, it, sizeof(mysql_hdr)); + it += sizeof(mysql_hdr) + hdr.pkt_length; + + // Copy all the rows + uint64_t u_entry_val = reinterpret_cast(entry->value); + uint64_t u_it_pos = reinterpret_cast(it); + uint64_t rows_length = (u_entry_val + entry->row_eof_pkt_offset) - u_it_pos; + memcpy(vp, it, rows_length); + vp += rows_length; + it += rows_length; + + // Replace final EOF in favor of OK packet + // ======================================= + // Copy the mysql header + memcpy(&hdr, it, sizeof(mysql_hdr)); + hdr.pkt_length = 7; + memcpy(vp, &hdr, sizeof(mysql_hdr)); + vp += sizeof(mysql_hdr); + it += sizeof(mysql_hdr); + + // OK packet header + *vp = 0xfe; + vp++; + it++; + // Initialize affected_rows and last_insert_id to zero + memset(vp, 0, 2); + vp += 2; + // Extract warning flags and status from 'EOF_packet' + unsigned char* eof_packet = entry->value + entry->row_eof_pkt_offset; + eof_packet += sizeof(mysql_hdr); + // Skip the '0xFE EOF packet header' + eof_packet += 1; + uint16_t warnings; + memcpy(&warnings, eof_packet, sizeof(uint16_t)); + eof_packet += 2; + uint16_t status_flags; + memcpy(&status_flags, eof_packet, sizeof(uint16_t)); + // Copy warnings an status flags + memcpy(vp, &status_flags, sizeof(uint16_t)); + vp += 2; + memcpy(vp, &warnings, sizeof(uint16_t)); + // ======================================= + + // Decrement ids after the first EOF + unsigned char* dp = result + entry->column_eof_pkt_offset; + mysql_hdr decrement_hdr; + for (;;) { + memcpy(&decrement_hdr, dp, sizeof(mysql_hdr)); + decrement_hdr.pkt_id--; + memcpy(dp, &decrement_hdr, sizeof(mysql_hdr)); + dp += sizeof(mysql_hdr) + decrement_hdr.pkt_length; + if (dp >= vp) + break; + } + + return result; +} + +/** + * @brief Converts a 'OK_Packet' holded inside 'QC_entry_t' into a 'EOF_Packet'. + * Warning: This function assumes that the supplied 'QC_entry_t' holds a valid + * 'OK_Packet'. + * + * @param entry The 'QC_entry_t' holding a 'EOF_Packet' to be converted into + * a 'OK_Packet'. + * @return The converted packet. + */ +unsigned char* ok_to_eof_packet(const MySQL_QC_entry_t* entry) { + unsigned char* result = (unsigned char*)malloc(entry->length + ok_to_eof_dif); + unsigned char* vp = result; + unsigned char* it = entry->value; + + // Extract warning flags and status from 'OK_packet' + unsigned char* ok_packet = it + entry->ok_pkt_offset; + mysql_hdr ok_hdr; + memcpy(&ok_hdr, ok_packet, sizeof(mysql_hdr)); + ok_packet += sizeof(mysql_hdr); + // Skip the 'OK packet header', 'affected_rows' and 'last_insert_id' + ok_packet += 3; + uint16_t status_flags; + memcpy(&status_flags, ok_packet, sizeof(uint16_t)); + ok_packet += 2; + uint16_t warnings; + memcpy(&warnings, ok_packet, sizeof(uint16_t)); + + // Find the spot in which the first EOF needs to be placed + it += sizeof(mysql_hdr); + uint64_t c_count = 0; + int c_count_len = mysql_decode_length(reinterpret_cast(it), &c_count); + it += c_count_len; + + mysql_hdr column_hdr; + for (uint64_t i = 0; i < c_count; i++) { + memcpy(&column_hdr, it, sizeof(mysql_hdr)); + it += sizeof(mysql_hdr) + column_hdr.pkt_length; + } + + // Location for 'column_eof' + uint64_t column_eof_offset = + reinterpret_cast(it) - + reinterpret_cast(entry->value); + memcpy(vp, entry->value, column_eof_offset); + vp += column_eof_offset; + + // Write 'column_eof_packet' header + column_hdr.pkt_id = column_hdr.pkt_id + 1; + column_hdr.pkt_length = 5; + memcpy(vp, &column_hdr, sizeof(mysql_hdr)); + vp += sizeof(mysql_hdr); + + // Write 'column_eof_packet' contents + *vp = 0xfe; + vp++; + memcpy(vp, &warnings, sizeof(uint16_t)); + vp += 2; + memcpy(vp, &status_flags, sizeof(uint16_t)); + vp += 2; + + // Find the OK packet + for (;;) { + mysql_hdr hdr; + memcpy(&hdr, it, sizeof(mysql_hdr)); + unsigned char* payload = + reinterpret_cast(it) + + sizeof(mysql_hdr); + + if (hdr.pkt_length < 9 && *payload == 0xfe) { + mysql_hdr ok_hdr; + ok_hdr.pkt_id = hdr.pkt_id + 1; + ok_hdr.pkt_length = 5; + memcpy(vp, &ok_hdr, sizeof(mysql_hdr)); + vp += sizeof(mysql_hdr); + + *vp = 0xfe; + vp++; + memcpy(vp, &warnings, sizeof(uint16_t)); + vp += 2; + memcpy(vp, &status_flags, sizeof(uint16_t)); + break; + } + else { + // Increment the package id by one due to 'column_eof_packet' + hdr.pkt_id += 1; + memcpy(vp, &hdr, sizeof(mysql_hdr)); + vp += sizeof(mysql_hdr); + it += sizeof(mysql_hdr); + memcpy(vp, it, hdr.pkt_length); + vp += hdr.pkt_length; + it += hdr.pkt_length; + } + } + + return result; +} + +bool MySQL_Query_Cache::set(uint64_t user_hash, const unsigned char* kp, uint32_t kl, unsigned char* vp, + uint32_t vl, uint64_t create_ms, uint64_t curtime_ms, uint64_t expire_ms, bool deprecate_eof_active) { + MySQL_QC_entry_t* entry = (MySQL_QC_entry_t*)malloc(sizeof(MySQL_QC_entry_t)); + + entry->column_eof_pkt_offset = 0; + entry->row_eof_pkt_offset = 0; + entry->ok_pkt_offset = 0; + + // Find the first EOF location + unsigned char* it = vp; + it += sizeof(mysql_hdr); + uint64_t c_count = 0; + int c_count_len = mysql_decode_length(const_cast(it), &c_count); + it += c_count_len; + + for (uint64_t i = 0; i < c_count; i++) { + mysql_hdr hdr; + memcpy(&hdr, it, sizeof(mysql_hdr)); + it += sizeof(mysql_hdr) + hdr.pkt_length; + } + + if (deprecate_eof_active == false) { + // Store EOF position and jump to rows + entry->column_eof_pkt_offset = it - vp; + mysql_hdr hdr; + memcpy(&hdr, it, sizeof(mysql_hdr)); + it += sizeof(mysql_hdr) + hdr.pkt_length; + } + + // Find the second EOF location or the OK packet + for (;;) { + mysql_hdr hdr; + memcpy(&hdr, it, sizeof(mysql_hdr)); + unsigned char* payload = it + sizeof(mysql_hdr); + + if (hdr.pkt_length < 9 && *payload == 0xfe) { + if (deprecate_eof_active) { + entry->ok_pkt_offset = it - vp; + + // Reset the warning flags to zero before storing resultset in the cache + // Reason: When a warning flag is set, it may prompt the client to invoke "SHOW WARNINGS" or "SHOW COUNT(*) FROM WARNINGS". + // However, when retrieving data from the cache, it's possible that there are no warnings present + // that might be associated with previous interactions. + unsigned char* payload_temp = payload + 1; + + // skip affected_rows + payload_temp += mysql_decode_length(payload_temp, nullptr); + + // skip last_insert_id + payload_temp += mysql_decode_length(payload_temp, nullptr); + + // skip stats_flags + payload_temp += sizeof(uint16_t); + + uint16_t warnings = 0; + memcpy(payload_temp, &warnings, sizeof(uint16_t)); + + } + else { + entry->row_eof_pkt_offset = it - vp; + + // Reset the warning flags to zero before storing resultset in the cache + // Reason: When a warning flag is set, it may prompt the client to invoke "SHOW WARNINGS" or "SHOW COUNT(*) FROM WARNINGS". + // However, when retrieving data from the cache, it's possible that there are no warnings present + // that might be associated with previous interactions. + uint16_t warnings = 0; + memcpy((payload + 1), &warnings, sizeof(uint16_t)); + } + break; + } + else { + it += sizeof(mysql_hdr) + hdr.pkt_length; + } + } + + return Query_Cache::set(entry, user_hash, kp, kl, vp, vl, create_ms, curtime_ms, expire_ms); +} + +unsigned char* MySQL_Query_Cache::get(uint64_t user_hash, const unsigned char* kp, const uint32_t kl, uint32_t* lv, + uint64_t curtime_ms, uint64_t cache_ttl, bool deprecate_eof_active) { + unsigned char* result = NULL; + + std::shared_ptr entry_shared = std::static_pointer_cast( + Query_Cache::get(user_hash, kp, kl, curtime_ms, cache_ttl) + ); + + if (entry_shared) { + if (deprecate_eof_active && entry_shared->column_eof_pkt_offset) { + result = eof_to_ok_packet(entry_shared.get()); + *lv = entry_shared->length + eof_to_ok_dif; + } + else if (!deprecate_eof_active && entry_shared->ok_pkt_offset) { + result = ok_to_eof_packet(entry_shared.get()); + *lv = entry_shared->length + ok_to_eof_dif; + } + else { + result = (unsigned char*)malloc(entry_shared->length); + memcpy(result, entry_shared->value, entry_shared->length); + *lv = entry_shared->length; + } + //__sync_fetch_and_sub(&entry->ref_count, 1); + } + return result; +} + +/*void* MySQL_Query_Cache::purgeHash_thread(void*) { + + unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version; + MySQL_Thread* mysql_thr = new MySQL_Thread(); + MySQL_Monitor__thread_MySQL_Thread_Variables_version = GloMTH->get_global_version(); + set_thread_name("MyQCPurge"); + mysql_thr->refresh_variables(); + max_memory_size = static_cast(mysql_thread___query_cache_size_MB*1024ULL*1024ULL); + while (shutting_down == false) { + usleep(purge_loop_time); + unsigned int glover = GloMTH->get_global_version(); + if (GloMTH) { + if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover) { + MySQL_Monitor__thread_MySQL_Thread_Variables_version = glover; + mysql_thr->refresh_variables(); + max_memory_size = static_cast(mysql_thread___query_cache_size_MB*1024ULL*1024ULL); + } + } + const unsigned int curr_pct = current_used_memory_pct(); + if (curr_pct < purge_threshold_pct_min) continue; + Query_Cache::purgeHash((monotonic_time()/1000ULL), curr_pct); + } + delete mysql_thr; + return NULL; +}*/ diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 490746306..a38a09065 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -22,7 +22,7 @@ using json = nlohmann::json; #include "SQLite3_Server.h" #include "MySQL_Variables.h" #include "ProxySQL_Cluster.hpp" - +#include "MySQL_Query_Cache.h" #include "libinjection.h" #include "libinjection_sqli.h" @@ -333,7 +333,7 @@ __exit_kill_query_thread: } extern MySQL_Query_Processor* GloMyQPro; -extern Query_Cache *GloQC; +extern MySQL_Query_Cache *GloMyQC; extern ProxySQL_Admin *GloAdmin; extern MySQL_Threads_Handler *GloMTH; @@ -6810,7 +6810,7 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C if (qpo->cache_ttl>0 && ((prepare_stmt_type & ps_type_prepare_stmt) == 0)) { bool deprecate_eof_active = client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF; uint32_t resbuf=0; - unsigned char *aa=GloQC->get( + unsigned char *aa= GloMyQC->get( client_myds->myconn->userinfo->hash, (const unsigned char *)CurrentQuery.QueryPointer , CurrentQuery.QueryLength , @@ -7254,18 +7254,18 @@ void MySQL_Session::MySQL_Result_to_MySQL_wire(MYSQL *mysql, MySQL_ResultSet *My unsigned char *aa=client_myds->resultset2buffer(false); while (client_myds->resultset->len) client_myds->resultset->remove_index(client_myds->resultset->len-1,NULL); bool deprecate_eof_active = client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF; - GloQC->set( + GloMyQC->set( client_myds->myconn->userinfo->hash , - (const unsigned char *)CurrentQuery.QueryPointer, + CurrentQuery.QueryPointer, CurrentQuery.QueryLength, - aa , + aa , // Query Cache now have the ownership, no need to free it client_myds->resultset_length , thread->curtime/1000 , thread->curtime/1000 , thread->curtime/1000 + qpo->cache_ttl, deprecate_eof_active ); - l_free(client_myds->resultset_length,aa); + //l_free(client_myds->resultset_length,aa); client_myds->resultset_length=0; } } @@ -7491,7 +7491,7 @@ void MySQL_Session::Memory_Stats() { unsigned long long internal=0; internal+=sizeof(MySQL_Session); if (qpo) - internal+=sizeof(Query_Processor_Output); + internal+=sizeof(MySQL_Query_Processor_Output); if (client_myds) { internal+=sizeof(MySQL_Data_Stream); if (client_myds->queueIN.buffer) diff --git a/lib/PgSQL_Connection.cpp b/lib/PgSQL_Connection.cpp index fa665e312..6fefbf3ef 100644 --- a/lib/PgSQL_Connection.cpp +++ b/lib/PgSQL_Connection.cpp @@ -1759,7 +1759,7 @@ handler_again: myds->sess->status != SHOW_WARNINGS*/) { // see issue#4072 unsigned int buffered_data = 0; buffered_data = myds->sess->client_myds->PSarrayOUT->len * PGSQL_RESULTSET_BUFLEN; - buffered_data += myds->sess->client_myds->resultset->len * PGSQL_RESULTSET_BUFLEN; + //buffered_data += myds->sess->client_myds->resultset->len * PGSQL_RESULTSET_BUFLEN; if (buffered_data > (unsigned int)pgsql_thread___threshold_resultset_size * 8) { next_event(ASYNC_USE_RESULT_CONT); // we temporarily pause . See #1232 break; diff --git a/lib/PgSQL_Data_Stream.cpp b/lib/PgSQL_Data_Stream.cpp index f7fb1c16b..755d9de9c 100644 --- a/lib/PgSQL_Data_Stream.cpp +++ b/lib/PgSQL_Data_Stream.cpp @@ -292,13 +292,13 @@ PgSQL_Data_Stream::PgSQL_Data_Stream() { kill_type = 0; connect_tries = 0; poll_fds_idx = -1; - resultset_length = 0; + //resultset_length = 0; revents = 0; PSarrayIN = NULL; PSarrayOUT = NULL; - resultset = NULL; + //resultset = NULL; queue_init(queueIN, QUEUE_T_DEFAULT_SIZE); queue_init(queueOUT, QUEUE_T_DEFAULT_SIZE); mybe = NULL; @@ -374,13 +374,13 @@ PgSQL_Data_Stream::~PgSQL_Data_Stream() { } delete PSarrayOUT; } - if (resultset) { + /*if (resultset) { while (resultset->len) { resultset->remove_index_fast(0, &pkt); l_free(pkt.size, pkt.ptr); } delete resultset; - } + }*/ if (mypolls) mypolls->remove_index_fast(poll_fds_idx); @@ -438,7 +438,7 @@ void PgSQL_Data_Stream::init() { if (PSarrayIN == NULL) PSarrayIN = new PtrSizeArray(); if (PSarrayOUT == NULL) PSarrayOUT = new PtrSizeArray(); // if (PSarrayOUTpending==NULL) PSarrayOUTpending= new PtrSizeArray(); - if (resultset == NULL) resultset = new PtrSizeArray(); + //if (resultset == NULL) resultset = new PtrSizeArray(); if (unlikely(GloVars.global.data_packets_history_size)) { data_packets_history_IN.set_max_size(GloVars.global.data_packets_history_size); @@ -1174,7 +1174,8 @@ __exit_array2buffer: return ret; } -unsigned char* PgSQL_Data_Stream::resultset2buffer(bool del) { +unsigned char* PgSQL_Data_Stream::copy_array_to_buffer(PtrSizeArray* resultset, size_t resultset_length, + bool del) { unsigned int i; unsigned int l = 0; unsigned char* mybuff = (unsigned char*)l_alloc(resultset_length); @@ -1188,52 +1189,75 @@ unsigned char* PgSQL_Data_Stream::resultset2buffer(bool del) { return mybuff; }; -void PgSQL_Data_Stream::buffer2resultset(unsigned char* ptr, unsigned int size) { - unsigned char* __ptr = ptr; - mysql_hdr hdr; - unsigned int l; - void* buff = NULL; - unsigned int bl; - unsigned int bf; - while (__ptr < ptr + size) { - memcpy(&hdr, __ptr, sizeof(mysql_hdr)); - l = hdr.pkt_length + sizeof(mysql_hdr); // amount of space we need - if (buff) { - if (bf < l) { - // we ran out of space - resultset->add(buff, bl - bf); - buff = NULL; +static inline uint32_t get_uint32(const unsigned char* ptr) { + return (static_cast(ptr[0]) << 24) | + (static_cast(ptr[1]) << 16) | + (static_cast(ptr[2]) << 8) | + static_cast(ptr[3]); +} + +void PgSQL_Data_Stream::copy_buffer_to_resultset(PtrSizeArray* resultset, unsigned char* ptr, uint64_t size, + char current_transaction_state) { + unsigned char* current_ptr = ptr; + unsigned char* buffer = NULL; + uint32_t data_len; + uint32_t buffer_len; + uint32_t remaining_space; + + while (current_ptr < ptr + size) { + uint8_t packet_type = *current_ptr; // read packet type (unused in the code) + // Read 4-byte length + /*unsigned int a = current_ptr[read_pos++]; + unsigned int b = current_ptr[read_pos++]; + unsigned int c = current_ptr[read_pos++]; + unsigned int d = current_ptr[read_pos++]; + const uint32_t packet_length = (a << 24) | (b << 16) | (c << 8) | d; + */ + data_len = get_uint32(current_ptr + 1 /*skip type*/) + 1; // total space needed, including type + + // Handle buffer space + if (buffer) { + if (remaining_space < data_len) { + // Insufficient space, add buffer to resultset + resultset->add(buffer, buffer_len - remaining_space); + buffer = NULL; } } - if (buff == NULL) { - if (__ptr + RESULTSET_BUFLEN_DS_1M <= ptr + size) { - bl = RESULTSET_BUFLEN_DS_1M; + + // Allocate new buffer if needed + if (buffer == NULL) { + // Set buffer size depending on available space + if (current_ptr + RESULTSET_BUFLEN_DS_1M <= ptr + size) { + buffer_len = RESULTSET_BUFLEN_DS_1M; } else { - bl = RESULTSET_BUFLEN_DS_16K; + buffer_len = RESULTSET_BUFLEN_DS_16K; } - if (l > bl) { - bl = l; // make sure there is the space to copy a packet + + // Ensure buffer is large enough for the current packet + if (data_len > buffer_len) { + buffer_len = data_len; } - buff = malloc(bl); - bf = bl; + + buffer = (unsigned char*)malloc(buffer_len); + remaining_space = buffer_len; + } + + // Copy data to buffer + memcpy((unsigned char*)buffer + (buffer_len - remaining_space), current_ptr, data_len); + remaining_space -= data_len; + current_ptr += data_len; + + if (packet_type == 'Z') { + // if packet is a 'Z' packet (Ready Packet), we need to overwrite trasaction state + *(buffer + (buffer_len - remaining_space) - 1) = current_transaction_state; } - memcpy((char*)buff + (bl - bf), __ptr, l); - bf -= l; - __ptr += l; - /* - l=hdr.pkt_length+sizeof(mysql_hdr); - pkt=l_alloc(l); - memcpy(pkt,__ptr,l); - resultset->add(pkt,l); - __ptr+=l; - */ } - if (buff) { - // last buffer to add - resultset->add(buff, bl - bf); + // Add the last buffer to the resultset, if any + if (buffer) { + resultset->add(buffer, buffer_len - remaining_space); } -}; +} int PgSQL_Data_Stream::array2buffer_full() { int rc = 0; diff --git a/lib/PgSQL_Query_Cache.cpp b/lib/PgSQL_Query_Cache.cpp new file mode 100644 index 000000000..8d62d3650 --- /dev/null +++ b/lib/PgSQL_Query_Cache.cpp @@ -0,0 +1,48 @@ +#include "proxysql.h" +#include "cpp.h" +#include "PgSQL_Query_Cache.h" + +extern PgSQL_Threads_Handler* GloPTH; + +bool PgSQL_Query_Cache::set(uint64_t user_hash, const unsigned char* kp, uint32_t kl, unsigned char* vp, + uint32_t vl, uint64_t create_ms, uint64_t curtime_ms, uint64_t expire_ms) { + + PgSQL_QC_entry_t* entry = (PgSQL_QC_entry_t*)malloc(sizeof(PgSQL_QC_entry_t)); + return Query_Cache::set(entry, user_hash, kp, kl, vp, vl, create_ms, curtime_ms, expire_ms); +} + +const std::shared_ptr PgSQL_Query_Cache::get(uint64_t user_hash, const unsigned char* kp, + const uint32_t kl, uint64_t curtime_ms, uint64_t cache_ttl) { + + const std::shared_ptr entry_shared = std::static_pointer_cast( + Query_Cache::get(user_hash, kp, kl, curtime_ms, cache_ttl) + ); + return entry_shared; +} + +/* +void* PgSQL_Query_Cache::purgeHash_thread(void*) { + + unsigned int PgSQL_Monitor__thread_PgSQL_Thread_Variables_version; + PgSQL_Thread* pgsql_thr = new PgSQL_Thread(); + PgSQL_Monitor__thread_PgSQL_Thread_Variables_version = GloPTH->get_global_version(); + set_thread_name("PgQCPurge"); + pgsql_thr->refresh_variables(); + max_memory_size = static_cast(pgsql_thread___query_cache_size_MB*1024ULL*1024ULL); + while (shutting_down == false) { + usleep(purge_loop_time); + unsigned int glover = GloPTH->get_global_version(); + if (GloPTH) { + if (PgSQL_Monitor__thread_PgSQL_Thread_Variables_version < glover) { + PgSQL_Monitor__thread_PgSQL_Thread_Variables_version = glover; + pgsql_thr->refresh_variables(); + max_memory_size = static_cast(pgsql_thread___query_cache_size_MB*1024ULL*1024ULL); + } + } + const unsigned int curr_pct = current_used_memory_pct(); + if (curr_pct < purge_threshold_pct_min) continue; + Query_Cache::purgeHash((monotonic_time()/1000ULL), curr_pct); + } + delete pgsql_thr; + return NULL; +}*/ diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index c454982d1..4072a4472 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -23,7 +23,7 @@ using json = nlohmann::json; #include "SQLite3_Server.h" #include "MySQL_Variables.h" #include "ProxySQL_Cluster.hpp" - +#include "PgSQL_Query_Cache.h" #include "libinjection.h" #include "libinjection_sqli.h" @@ -313,7 +313,7 @@ __exit_kill_query_thread: } extern PgSQL_Query_Processor* GloPgQPro; -extern Query_Cache* GloQC; +extern PgSQL_Query_Cache *GloPgQC; extern ProxySQL_Admin* GloAdmin; extern PgSQL_Threads_Handler* GloPTH; @@ -5831,24 +5831,21 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C return true; } //} - /* Query Cache is not supported for PgSQL if (qpo->cache_ttl > 0 && ((prepare_stmt_type & PgSQL_ps_type_prepare_stmt) == 0)) { - bool deprecate_eof_active = client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF; - uint32_t resbuf = 0; - unsigned char* aa = GloQC->get( + + const std::shared_ptr pgsql_qc_entry = GloPgQC->get( client_myds->myconn->userinfo->hash, (const unsigned char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength, - &resbuf, thread->curtime / 1000, - qpo->cache_ttl, - deprecate_eof_active + qpo->cache_ttl ); - if (aa) { - client_myds->buffer2resultset(aa, resbuf); - free(aa); - client_myds->PSarrayOUT->copy_add(client_myds->resultset, 0, client_myds->resultset->len); - while (client_myds->resultset->len) client_myds->resultset->remove_index(client_myds->resultset->len - 1, NULL); + if (pgsql_qc_entry) { + // FIXME: Add Error Transaction state detection + unsigned int nTrx = NumActiveTransactions(); + PgSQL_Data_Stream::copy_buffer_to_resultset(client_myds->PSarrayOUT, + pgsql_qc_entry->value, pgsql_qc_entry->length, (nTrx ? 'T' : 'I')); + //client_myds->PSarrayOUT->copy_add(resultset, 0, resultset->len); if (transaction_persistent_hostgroup == -1) { // not active, we can change it current_hostgroup = -1; @@ -5857,7 +5854,7 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C l_free(pkt->size, pkt->ptr); return true; } - }*/ + } __exit_set_destination_hostgroup: @@ -6232,48 +6229,46 @@ void PgSQL_Session::PgSQL_Result_to_PgSQL_wire(PgSQL_Connection* _conn, PgSQL_Da bool transfer_started = query_result->is_transfer_started(); // if there is an error, it will be false so results are not cached bool is_tuple = query_result->get_result_packet_type() == (PGSQL_QUERY_RESULT_TUPLE | PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_READY); - CurrentQuery.rows_sent = query_result->get_num_rows(); + const uint64_t num_rows = query_result->get_num_rows(); + const uint64_t resultset_size = query_result->get_resultset_size(); const auto _affected_rows = query_result->get_affected_rows(); if (_affected_rows != -1) { CurrentQuery.affected_rows = _affected_rows; CurrentQuery.have_affected_rows = true; } + CurrentQuery.rows_sent = num_rows; bool resultset_completed = query_result->get_resultset(client_myds->PSarrayOUT); if (_conn->processing_multi_statement == false) assert(resultset_completed); // the resultset should always be completed if PgSQL_Result_to_PgSQL_wire is called if (transfer_started == false) { // we have all the resultset when PgSQL_Result_to_PgSQL_wire was called if (qpo && qpo->cache_ttl > 0 && is_tuple == true) { // the resultset should be cached - /*if (mysql_errno(pgsql) == 0 && - (mysql_warning_count(pgsql) == 0 || - mysql_thread___query_cache_handle_warnings == 1)) { // no errors + + if (_conn->is_error_present() == false && + (/* check warnings count here*/ true || + pgsql_thread___query_cache_handle_warnings == 1)) { // no errors + if ( - (qpo->cache_empty_result == 1) - || ( - (qpo->cache_empty_result == -1) - && - (thread->variables.query_cache_stores_empty_result || query_result->num_rows) + (qpo->cache_empty_result == 1) || + ( + (qpo->cache_empty_result == -1) && + (thread->variables.query_cache_stores_empty_result || num_rows) ) ) { - client_myds->resultset->copy_add(client_myds->PSarrayOUT, 0, client_myds->PSarrayOUT->len); - client_myds->resultset_length = query_result->resultset_size; - unsigned char* aa = client_myds->resultset2buffer(false); - while (client_myds->resultset->len) client_myds->resultset->remove_index(client_myds->resultset->len - 1, NULL); - bool deprecate_eof_active = client_myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF; - GloQC->set( + // Query Cache will have the ownership to buff. No need to free it here + unsigned char* buff = PgSQL_Data_Stream::copy_array_to_buffer(client_myds->PSarrayOUT, + resultset_size, false); + GloPgQC->set( client_myds->myconn->userinfo->hash, - (const unsigned char*)CurrentQuery.QueryPointer, + CurrentQuery.QueryPointer, CurrentQuery.QueryLength, - aa, - client_myds->resultset_length, + buff, + resultset_size, thread->curtime / 1000, thread->curtime / 1000, - thread->curtime / 1000 + qpo->cache_ttl, - deprecate_eof_active + thread->curtime / 1000 + qpo->cache_ttl ); - l_free(client_myds->resultset_length, aa); - client_myds->resultset_length = 0; } - }*/ + } } } } else { // if query result is empty, means there was an error before query result was generated @@ -6534,7 +6529,7 @@ void PgSQL_Session::Memory_Stats() { unsigned long long internal = 0; internal += sizeof(PgSQL_Session); if (qpo) - internal += sizeof(Query_Processor_Output); + internal += sizeof(PgSQL_Query_Processor_Output); if (client_myds) { internal += sizeof(PgSQL_Data_Stream); if (client_myds->queueIN.buffer) @@ -6552,7 +6547,7 @@ void PgSQL_Session::Memory_Stats() { internal += client_myds->PSarrayOUT->total_size(); } else { internal += client_myds->PSarrayOUT->total_size(PGSQL_RESULTSET_BUFLEN); - internal += client_myds->resultset->total_size(PGSQL_RESULTSET_BUFLEN); + //internal += client_myds->resultset->total_size(PGSQL_RESULTSET_BUFLEN); } } } diff --git a/lib/PgSQL_Thread.cpp b/lib/PgSQL_Thread.cpp index 3931eba3a..82533ad64 100644 --- a/lib/PgSQL_Thread.cpp +++ b/lib/PgSQL_Thread.cpp @@ -3789,9 +3789,9 @@ void PgSQL_Thread::refresh_variables() { pgsql_thread___query_processor_iterations = GloPTH->get_variable_int((char*)"query_processor_iterations"); pgsql_thread___query_processor_regex = GloPTH->get_variable_int((char*)"query_processor_regex"); - mysql_thread___query_cache_size_MB = GloPTH->get_variable_int((char*)"query_cache_size_MB"); - mysql_thread___query_cache_soft_ttl_pct = GloPTH->get_variable_int((char*)"query_cache_soft_ttl_pct"); - mysql_thread___query_cache_handle_warnings = GloPTH->get_variable_int((char*)"query_cache_handle_warnings"); + pgsql_thread___query_cache_size_MB = GloPTH->get_variable_int((char*)"query_cache_size_MB"); + pgsql_thread___query_cache_soft_ttl_pct = GloPTH->get_variable_int((char*)"query_cache_soft_ttl_pct"); + pgsql_thread___query_cache_handle_warnings = GloPTH->get_variable_int((char*)"query_cache_handle_warnings"); /* mysql_thread___max_stmts_per_connection = GloPTH->get_variable_int((char*)"max_stmts_per_connection"); mysql_thread___max_stmts_cache = GloPTH->get_variable_int((char*)"max_stmts_cache"); diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index b4d010d00..2a9b7324d 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -77,6 +77,8 @@ using json = nlohmann::json; #include #include "PgSQL_Protocol.h" +#include "MySQL_Query_Cache.h" +#include "PgSQL_Query_Cache.h" //#include "usual/time.h" using std::string; @@ -307,7 +309,8 @@ bool admin_proxysql_mysql_paused = false; bool admin_proxysql_pgsql_paused = false; int admin_old_wait_timeout; -extern Query_Cache *GloQC; +extern MySQL_Query_Cache *GloMyQC; +extern PgSQL_Query_Cache* GloPgQC; extern MySQL_Authentication *GloMyAuth; extern PgSQL_Authentication *GloPgAuth; extern MySQL_LDAP_Authentication *GloMyLdapAuth; @@ -2328,8 +2331,8 @@ __end_while_pool: } } if (GloProxyStats->MySQL_Query_Cache_timetoget(curtime)) { - if (GloQC) { - SQLite3_result * resultset=GloQC->SQL3_getStats(); + if (GloMyQC) { + SQLite3_result * resultset=GloMyQC->SQL3_getStats(); if (resultset) { GloProxyStats->MySQL_Query_Cache_sets(resultset); delete resultset; @@ -2502,9 +2505,13 @@ void update_modules_metrics() { if (GloMyMon) { GloMyMon->p_update_metrics(); } - // Update query_cache metrics - if (GloQC) { - GloQC->p_update_metrics(); + // Update mysql query_cache metrics + if (GloMyQC) { + GloMyQC->p_update_metrics(); + } + // Update pgsql query_cache metrics + if (GloPgQC) { + GloPgQC->p_update_metrics(); } // Update cluster metrics if (GloProxyCluster) { diff --git a/lib/ProxySQL_Admin_Stats.cpp b/lib/ProxySQL_Admin_Stats.cpp index 16510160f..6de28f52f 100644 --- a/lib/ProxySQL_Admin_Stats.cpp +++ b/lib/ProxySQL_Admin_Stats.cpp @@ -12,7 +12,8 @@ #include "MySQL_LDAP_Authentication.hpp" #include "MySQL_PreparedStatement.h" #include "ProxySQL_Cluster.hpp" - +#include "MySQL_Query_Cache.h" +#include "PgSQL_Query_Cache.h" #include "MySQL_Query_Processor.h" #include "PgSQL_Query_Processor.h" @@ -31,7 +32,8 @@ extern bool admin_proxysql_pgsql_paused; extern MySQL_Authentication *GloMyAuth; extern PgSQL_Authentication* GloPgAuth; extern MySQL_LDAP_Authentication *GloMyLdapAuth; -extern Query_Cache *GloQC; +extern MySQL_Query_Cache *GloMyQC; +extern PgSQL_Query_Cache* GloPgQC; extern ProxySQL_Admin *GloAdmin; extern MySQL_Threads_Handler *GloMTH; extern PgSQL_Threads_Handler* GloPTH; @@ -531,7 +533,7 @@ void ProxySQL_Admin::stats___mysql_global() { free(query); } - if (GloQC && (resultset=GloQC->SQL3_getStats())) { + if (GloMyQC && (resultset= GloMyQC->SQL3_getStats())) { for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { SQLite3_row *r=*it; int arg_len=0; @@ -691,7 +693,7 @@ void ProxySQL_Admin::stats___pgsql_global() { free(query); }*/ - if (GloQC && (resultset = GloQC->SQL3_getStats())) { + if (GloPgQC && (resultset = GloPgQC->SQL3_getStats())) { for (std::vector::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) { SQLite3_row* r = *it; int arg_len = 0; diff --git a/lib/Query_Cache.cpp b/lib/Query_Cache.cpp index 002031499..cc4ef3571 100644 --- a/lib/Query_Cache.cpp +++ b/lib/Query_Cache.cpp @@ -1,12 +1,17 @@ -#include "prometheus/counter.h" #include "btree_map.h" -#include "proxysql.h" -#include "cpp.h" -#include "query_cache.hpp" #include "proxysql_atomic.h" -//#include "SpookyV2.h" +#include "prometheus/counter.h" #include "prometheus_helpers.h" -#include "MySQL_Protocol.h" +#include "query_cache.hpp" +#include "MySQL_Query_Cache.h" +#include "PgSQL_Query_Cache.h" + +#ifdef DEBUG +#define DEB "_DEBUG" +#else +#define DEB "" +#endif /* DEBUG */ +#define QUERY_CACHE_VERSION "2.0.0385" DEB #define THR_UPDATE_CNT(__a, __b, __c, __d) \ do {\ @@ -24,140 +29,155 @@ } \ } while(0) +#define DEFAULT_SQC_size 4*1024*1024 -#ifdef DEBUG -#define DEB "_DEBUG" -#else -#define DEB "" -#endif /* DEBUG */ -#define QUERY_CACHE_VERSION "1.2.0905" DEB -#define PROXYSQL_QC_PTHREAD_MUTEX - -extern MySQL_Threads_Handler *GloMTH; - -typedef btree::btree_map BtMap_cache; +#define GET_THREAD_VARIABLE(VARIABLE_NAME) \ +({((std::is_same_v) ? mysql_thread___##VARIABLE_NAME : pgsql_thread___##VARIABLE_NAME) ;}) + +__thread uint64_t __thr_cntSet = 0; +__thread uint64_t __thr_cntGet = 0; +__thread uint64_t __thr_cntGetOK = 0; +__thread uint64_t __thr_dataIN = 0; +__thread uint64_t __thr_dataOUT = 0; +__thread uint64_t __thr_num_entries = 0; +__thread uint64_t __thr_num_deleted = 0; +__thread uint64_t __thr_size_values = 0; + +static uint64_t Glo_cntSet = 0; +static uint64_t Glo_cntGet = 0; +static uint64_t Glo_cntGetOK = 0; +static uint64_t Glo_num_entries = 0; +static uint64_t Glo_dataIN = 0; +static uint64_t Glo_dataOUT = 0; +static uint64_t Glo_cntPurge = 0; +static uint64_t Glo_size_values = 0; +static uint64_t Glo_total_freed_memory = 0; + +template +bool Query_Cache::shutting_down = false; + +template +pthread_t Query_Cache::purge_thread_id; class KV_BtreeArray { - private: -#ifdef PROXYSQL_QC_PTHREAD_MUTEX - pthread_rwlock_t lock; -#else - rwlock_t lock; -#endif - BtMap_cache bt_map; - PtrArray *ptrArray; - uint64_t purgeChunkSize; - uint64_t purgeIdx; - bool __insert(uint64_t, void *); - uint64_t freeable_memory; - public: - uint64_t tottopurge; - KV_BtreeArray(); +public: + KV_BtreeArray(unsigned int entry_size); ~KV_BtreeArray(); - uint64_t get_data_size(); - void purge_some(unsigned long long, bool); - int cnt(); + std::weak_ptr lookup(uint64_t key); bool replace(uint64_t key, QC_entry_t *entry); - QC_entry_t *lookup(uint64_t key); - void empty(); -}; + void clear(bool release_entries = false); + void purge_some(uint64_t QCnow_ms, bool aggressive); + uint64_t get_data_size() const; + int count() const; -__thread uint64_t __thr_cntSet=0; -__thread uint64_t __thr_cntGet=0; -__thread uint64_t __thr_cntGetOK=0; -__thread uint64_t __thr_dataIN=0; -__thread uint64_t __thr_dataOUT=0; -__thread uint64_t __thr_num_entries=0; -__thread uint64_t __thr_num_deleted=0; -__thread uint64_t __thr_size_values=0; -//__thread uint64_t __thr_freeable_memory=0; +private: + pthread_rwlock_t lock; + std::vector> entries; + using BtMap_cache = btree::btree_map>; + BtMap_cache bt_map; + const unsigned int qc_entry_size; -#define DEFAULT_SQC_size 4*1024*1024 + void rdlock(); + void wrlock(); + void unlock(); + void add_to_entries(const std::shared_ptr& entry); + void remove_from_entries_by_index(size_t index); +}; +void free_QC_Entry(QC_entry_t* entry) { + if (entry) { + free(entry->value); + free(entry); + } +} -static uint64_t Glo_cntSet=0; -static uint64_t Glo_cntGet=0; -static uint64_t Glo_cntGetOK=0; -static uint64_t Glo_num_entries=0; -static uint64_t Glo_dataIN=0; -static uint64_t Glo_dataOUT=0; -static uint64_t Glo_cntPurge=0; -static uint64_t Glo_size_values=0; -static uint64_t Glo_total_freed_memory; - -KV_BtreeArray::KV_BtreeArray() { - freeable_memory=0; - tottopurge=0; -#ifdef PROXYSQL_QC_PTHREAD_MUTEX +KV_BtreeArray::KV_BtreeArray(unsigned int entry_size) : qc_entry_size(entry_size) { pthread_rwlock_init(&lock, NULL); -#else - spinlock_rwlock_init(&lock); -#endif - ptrArray = new PtrArray; }; KV_BtreeArray::~KV_BtreeArray() { - proxy_debug(PROXY_DEBUG_QUERY_CACHE, 3, "Size of KVBtreeArray:%d , ptrArray:%u\n", cnt() , ptrArray->len); - empty(); - QC_entry_t *qce=NULL; - while (ptrArray->len) { - qce=(QC_entry_t *)ptrArray->remove_index_fast(0); - free(qce->value); - free(qce); - } - delete ptrArray; + proxy_debug(PROXY_DEBUG_QUERY_CACHE, 3, "Size of KVBtreeArray:%d , entries:%lu\n", count(), entries.size()); + clear(true); + pthread_rwlock_destroy(&lock); }; +inline void KV_BtreeArray::rdlock() { pthread_rwlock_rdlock(&lock); } +inline void KV_BtreeArray::wrlock() { pthread_rwlock_wrlock(&lock); } +inline void KV_BtreeArray::unlock() { pthread_rwlock_unlock(&lock); } + +void KV_BtreeArray::add_to_entries(const std::shared_ptr& entry) { + if (entries.capacity() <= (entries.size() + 1)) { + const unsigned int new_size = l_near_pow_2(entries.size() + 1); + entries.reserve(new_size); + } + entries.push_back(entry); +} + +void KV_BtreeArray::remove_from_entries_by_index(size_t index) { + if (index >= entries.size()) { + return; + } + + if (index != entries.size() - 1) { + std::swap(entries[index], entries.back()); + } + + entries.pop_back(); + + if ((entries.size() > MIN_ARRAY_LEN) && (entries.capacity() > entries.size() * MIN_ARRAY_DELETE_RATIO)) { + entries.shrink_to_fit(); + } +} -uint64_t KV_BtreeArray::get_data_size() { - uint64_t r = __sync_fetch_and_add(&Glo_num_entries,0) * (sizeof(QC_entry_t)+sizeof(QC_entry_t *)*2+sizeof(uint64_t)*2); // + __sync_fetch_and_add(&Glo_size_values,0) ; - return r; +uint64_t KV_BtreeArray::get_data_size() const { + uint64_t data_size = __sync_fetch_and_add(&Glo_num_entries,0) * (qc_entry_size+sizeof(QC_entry_t*)*2+sizeof(uint64_t)*2); // + __sync_fetch_and_add(&Glo_size_values,0) ; + return data_size; }; -void KV_BtreeArray::purge_some(unsigned long long QCnow_ms, bool aggressive) { - uint64_t ret=0, i, _size=0; - QC_entry_t *qce; - unsigned long long access_ms_min=0; - unsigned long long access_ms_max=0; -#ifdef PROXYSQL_QC_PTHREAD_MUTEX - pthread_rwlock_rdlock(&lock); -#else - spin_rdlock(&lock); -#endif - for (i=0; ilen;i++) { - qce=(QC_entry_t *)ptrArray->index(i); +void KV_BtreeArray::purge_some(uint64_t QCnow_ms, bool aggressive) { + uint64_t ret = 0; + uint64_t freeable_memory = 0; + uint64_t access_ms_min = std::numeric_limits::max(); + uint64_t access_ms_max = 0; + + rdlock(); + + for (const std::shared_ptr& entry_shared : entries) { + if (aggressive) { // we have been asked to do aggressive purging - if (access_ms_min==0) { - access_ms_min = qce->access_ms; + + access_ms_min = std::min(access_ms_min, entry_shared->access_ms); + access_ms_max = std::max(access_ms_max, entry_shared->access_ms); + + /* if (access_ms_min == 0) { + access_ms_min = entry_shared->access_ms; } else { - if (access_ms_min > qce->access_ms) { - access_ms_min = qce->access_ms; + if (access_ms_min > entry_shared->access_ms) { + access_ms_min = entry_shared->access_ms; } } if (access_ms_max==0) { - access_ms_max = qce->access_ms; + access_ms_max = entry_shared->access_ms; } else { - if (access_ms_max < qce->access_ms) { - access_ms_max = qce->access_ms; + if (access_ms_max < entry_shared->access_ms) { + access_ms_max = entry_shared->access_ms; } - } + }*/ } else { // no aggresssive purging , legacy algorithm - if (qce->expire_ms==EXPIRE_DROPIT || qce->expire_msexpire_ms == EXPIRE_DROPIT || entry_shared->expire_ms < QCnow_ms) { ret++; - _size+=qce->length; + freeable_memory += entry_shared->length; } } } - freeable_memory=_size; -#ifdef PROXYSQL_QC_PTHREAD_MUTEX - pthread_rwlock_unlock(&lock); -#else - spin_rdunlock(&lock); -#endif + //freeable_memory=_size; + + unlock(); + bool cond_freeable_memory=false; if (aggressive==false) { uint64_t total_freeable_memory=0; - total_freeable_memory=freeable_memory + ret * (sizeof(QC_entry_t)+sizeof(QC_entry_t *)*2+sizeof(uint64_t)*2); + total_freeable_memory=freeable_memory + ret * (qc_entry_size+sizeof(QC_entry_t*)*2+sizeof(uint64_t)*2); if ( total_freeable_memory > get_data_size()*0.01 ) { cond_freeable_memory=true; // there is memory that can be freed } @@ -166,50 +186,46 @@ void KV_BtreeArray::purge_some(unsigned long long QCnow_ms, bool aggressive) { if ( aggressive || cond_freeable_memory ) { uint64_t removed_entries=0; uint64_t freed_memory=0; - unsigned long long access_ms_lower_mark=0; + uint64_t access_ms_lower_mark=0; if (aggressive) { - access_ms_lower_mark=access_ms_min+(access_ms_max-access_ms_min)*0.1; // hardcoded for now. Remove the entries with access time in the 10% range closest to access_ms_min + access_ms_lower_mark = access_ms_min + (access_ms_max-access_ms_min) * 0.1; // hardcoded for now. Remove the entries with access time in the 10% range closest to access_ms_min } -#ifdef PROXYSQL_QC_PTHREAD_MUTEX - pthread_rwlock_wrlock(&lock); -#else - spin_wrlock(&lock); -#endif - for (i=0; ilen;i++) { - qce=(QC_entry_t *)ptrArray->index(i); + + wrlock(); + + for (size_t i = 0; i < entries.size();) { + const std::shared_ptr& entry_shared = entries[i]; bool drop_entry=false; - if (__sync_fetch_and_add(&qce->ref_count,0)<=1) { // currently not in use - if (qce->expire_ms==EXPIRE_DROPIT || qce->expire_msref_count,0)<=1) { // currently not in use + if (entry_shared.use_count() <= 1) { // we check this to avoid releasing entries that are still in use + if (entry_shared->expire_ms == EXPIRE_DROPIT || entry_shared->expire_ms < QCnow_ms) { //legacy algorithm drop_entry=true; } if (aggressive) { // we have been asked to do aggressive purging if (drop_entry==false) { // if the entry is already marked to be dropped, no further check - if (qce->access_ms < access_ms_lower_mark) { + if (entry_shared->access_ms < access_ms_lower_mark) { drop_entry=true; } } } } if (drop_entry) { - qce=(QC_entry_t *)ptrArray->remove_index_fast(i); - - btree::btree_map::iterator lookup; - lookup = bt_map.find(qce->key); - if (lookup != bt_map.end()) { + const uint32_t length = entry_shared->length; + btree::btree_map>::iterator lookup; + lookup = bt_map.find(entry_shared->key); + if (lookup != bt_map.end()) { bt_map.erase(lookup); } - i--; - freed_memory+=qce->length; + remove_from_entries_by_index(i); + freed_memory+=length; removed_entries++; - free(qce->value); - free(qce); + continue; } + i++; } -#ifdef PROXYSQL_QC_PTHREAD_MUTEX - pthread_rwlock_unlock(&lock); -#else - spin_wrunlock(&lock); -#endif + + unlock(); + THR_DECREASE_CNT(__thr_num_deleted,Glo_num_entries,removed_entries,1); if (removed_entries) { __sync_fetch_and_add(&Glo_total_freed_memory,freed_memory); @@ -219,82 +235,69 @@ void KV_BtreeArray::purge_some(unsigned long long QCnow_ms, bool aggressive) { } }; -int KV_BtreeArray::cnt() { +inline int KV_BtreeArray::count() const { return bt_map.size(); }; bool KV_BtreeArray::replace(uint64_t key, QC_entry_t *entry) { -#ifdef PROXYSQL_QC_PTHREAD_MUTEX - pthread_rwlock_wrlock(&lock); -#else - spin_wrlock(&lock); -#endif + + std::shared_ptr entry_shared(entry, &free_QC_Entry); + wrlock(); THR_UPDATE_CNT(__thr_cntSet,Glo_cntSet,1,1); THR_UPDATE_CNT(__thr_size_values,Glo_size_values,entry->length,1); THR_UPDATE_CNT(__thr_dataIN,Glo_dataIN,entry->length,1); THR_UPDATE_CNT(__thr_num_entries,Glo_num_entries,1,1); - entry->ref_count=1; - ptrArray->add(entry); - btree::btree_map::iterator lookup; - lookup = bt_map.find(key); - if (lookup != bt_map.end()) { - lookup->second->expire_ms=EXPIRE_DROPIT; - __sync_fetch_and_sub(&lookup->second->ref_count,1); + add_to_entries(entry_shared); + btree::btree_map>::iterator lookup; + lookup = bt_map.find(key); + if (lookup != bt_map.end()) { + if (std::shared_ptr found_entry_shared = lookup->second.lock()) { + found_entry_shared->expire_ms = EXPIRE_DROPIT; + } bt_map.erase(lookup); } - bt_map.insert(std::make_pair(key,entry)); -#ifdef PROXYSQL_QC_PTHREAD_MUTEX - pthread_rwlock_unlock(&lock); -#else - spin_wrunlock(&lock); -#endif + bt_map.insert({key,entry_shared}); + +#ifdef DEBUG + assert(entry_shared.use_count() == 2); // it should be 2, one for entry_shared object and one for object in entries vector +#endif /* DEBUG */ + unlock(); return true; } -QC_entry_t * KV_BtreeArray::lookup(uint64_t key) { - QC_entry_t *entry=NULL; -#ifdef PROXYSQL_QC_PTHREAD_MUTEX - pthread_rwlock_rdlock(&lock); -#else - spin_rdlock(&lock); -#endif +std::weak_ptr KV_BtreeArray::lookup(uint64_t key) { + std::weak_ptr entry_ptr; + rdlock(); THR_UPDATE_CNT(__thr_cntGet,Glo_cntGet,1,1); - btree::btree_map::iterator lookup; - lookup = bt_map.find(key); - if (lookup != bt_map.end()) { - entry=lookup->second; - __sync_fetch_and_add(&entry->ref_count,1); - } -#ifdef PROXYSQL_QC_PTHREAD_MUTEX - pthread_rwlock_unlock(&lock); -#else - spin_rdunlock(&lock); -#endif - return entry; + btree::btree_map>::iterator lookup; + lookup = bt_map.find(key); + if (lookup != bt_map.end()) { + entry_ptr = lookup->second; + //__sync_fetch_and_add(&entry->ref_count,1); + } + unlock(); + return entry_ptr; }; -void KV_BtreeArray::empty() { -#ifdef PROXYSQL_QC_PTHREAD_MUTEX - pthread_rwlock_wrlock(&lock); -#else - spin_wrlock(&lock); -#endif - btree::btree_map::iterator lookup; +void KV_BtreeArray::clear(bool release_entries) { + wrlock(); + btree::btree_map>::iterator lookup; while (bt_map.size()) { lookup = bt_map.begin(); if ( lookup != bt_map.end() ) { - lookup->second->expire_ms=EXPIRE_DROPIT; + if (std::shared_ptr found_entry_shared = lookup->second.lock()) { + found_entry_shared->expire_ms = EXPIRE_DROPIT; + } bt_map.erase(lookup); } } -#ifdef PROXYSQL_QC_PTHREAD_MUTEX - pthread_rwlock_unlock(&lock); -#else - spin_wrunlock(&lock); -#endif -}; + if (release_entries) + entries.clear(); + + unlock(); +} using metric_name = std::string; using metric_help = std::string; @@ -398,17 +401,18 @@ qc_metrics_map = std::make_tuple( } ); -uint64_t Query_Cache::get_data_size_total() { - uint64_t r=0; - int i; - for (i=0; iget_data_size(); +template +uint64_t Query_Cache::get_data_size_total() { + uint64_t total_size = 0; + for (int i=0; iget_data_size(); } - r += __sync_fetch_and_add(&Glo_size_values,0); - return r; + total_size += __sync_fetch_and_add(&Glo_size_values,0); + return total_size; }; -unsigned int Query_Cache::current_used_memory_pct() { +template +unsigned int Query_Cache::current_used_memory_pct(uint64_t max_memory_size) { uint64_t cur_size=get_data_size_total(); float pctf = (float) cur_size*100/max_memory_size; if (pctf > 100) return 100; @@ -416,7 +420,8 @@ unsigned int Query_Cache::current_used_memory_pct() { return pct; } -Query_Cache::Query_Cache() { +template +Query_Cache::Query_Cache() { #ifdef DEBUG if (glovars.has_debug==false) { #else @@ -426,23 +431,22 @@ Query_Cache::Query_Cache() { exit(EXIT_FAILURE); } for (int i=0; i(qc_metrics_map, this->metrics.p_counter_array); init_prometheus_gauge_array(qc_metrics_map, this->metrics.p_gauge_array); }; -void Query_Cache::p_update_metrics() { +template +void Query_Cache::p_update_metrics() { this->metrics.p_gauge_array[p_qc_gauge::query_cache_memory_bytes]->Set(get_data_size_total()); p_update_counter(this->metrics.p_counter_array[p_qc_counter::query_cache_count_get], Glo_cntGet - Glo_cntGetOK); p_update_counter(this->metrics.p_counter_array[p_qc_counter::query_cache_count_get_ok], Glo_cntGetOK); @@ -453,209 +457,33 @@ void Query_Cache::p_update_metrics() { p_update_counter(this->metrics.p_counter_array[p_qc_counter::query_cache_entries], Glo_num_entries); } -void Query_Cache::print_version() { +template +void Query_Cache::print_version() { fprintf(stderr,"In memory Standard Query Cache (SQC) rev. %s -- %s -- %s\n", QUERY_CACHE_VERSION, __FILE__, __TIMESTAMP__); }; -Query_Cache::~Query_Cache() { - unsigned int i; - for (i=0; i +Query_Cache::~Query_Cache() { + for (unsigned int i=0; i(- (sizeof(mysql_hdr) + 5) + 2); -const int ok_to_eof_dif = static_cast(+ (sizeof(mysql_hdr) + 5) - 2); - -/** - * @brief Converts a 'EOF_Packet' to holded inside a 'QC_entry_t' into a 'OK_Packet'. - * Warning: This function assumes that the supplied 'QC_entry_t' holds a valid - * 'EOF_Packet'. - * - * @param entry The 'QC_entry_t' holding a 'OK_Packet' to be converted into - * a 'EOF_Packet'. - * @return The converted packet. - */ -unsigned char* eof_to_ok_packet(QC_entry_t* entry) { - unsigned char* result = (unsigned char*)malloc(entry->length + eof_to_ok_dif); - unsigned char* vp = result; - char* it = entry->value; - - // Copy until the first EOF - memcpy(vp, entry->value, entry->column_eof_pkt_offset); - it += entry->column_eof_pkt_offset; - vp += entry->column_eof_pkt_offset; - - // Skip the first EOF after columns def - mysql_hdr hdr; - memcpy(&hdr, it, sizeof(mysql_hdr)); - it += sizeof(mysql_hdr) + hdr.pkt_length; - - // Copy all the rows - uint64_t u_entry_val = reinterpret_cast(entry->value); - uint64_t u_it_pos = reinterpret_cast(it); - uint64_t rows_length = (u_entry_val + entry->row_eof_pkt_offset) - u_it_pos; - memcpy(vp, it, rows_length); - vp += rows_length; - it += rows_length; - - // Replace final EOF in favor of OK packet - // ======================================= - // Copy the mysql header - memcpy(&hdr, it, sizeof(mysql_hdr)); - hdr.pkt_length = 7; - memcpy(vp, &hdr, sizeof(mysql_hdr)); - vp += sizeof(mysql_hdr); - it += sizeof(mysql_hdr); - - // OK packet header - *vp = 0xfe; - vp++; - it++; - // Initialize affected_rows and last_insert_id to zero - memset(vp, 0, 2); - vp += 2; - // Extract warning flags and status from 'EOF_packet' - char* eof_packet = entry->value + entry->row_eof_pkt_offset; - eof_packet += sizeof(mysql_hdr); - // Skip the '0xFE EOF packet header' - eof_packet += 1; - uint16_t warnings; - memcpy(&warnings, eof_packet, sizeof(uint16_t)); - eof_packet += 2; - uint16_t status_flags; - memcpy(&status_flags, eof_packet, sizeof(uint16_t)); - // Copy warnings an status flags - memcpy(vp, &status_flags, sizeof(uint16_t)); - vp += 2; - memcpy(vp, &warnings, sizeof(uint16_t)); - // ======================================= - - // Decrement ids after the first EOF - unsigned char* dp = result + entry->column_eof_pkt_offset; - mysql_hdr decrement_hdr; - for (;;) { - memcpy(&decrement_hdr, dp, sizeof(mysql_hdr)); - decrement_hdr.pkt_id--; - memcpy(dp, &decrement_hdr, sizeof(mysql_hdr)); - dp += sizeof(mysql_hdr) + decrement_hdr.pkt_length; - if (dp >= vp) - break; - } - - return result; -} - -/** - * @brief Converts a 'OK_Packet' holded inside 'QC_entry_t' into a 'EOF_Packet'. - * Warning: This function assumes that the supplied 'QC_entry_t' holds a valid - * 'OK_Packet'. - * - * @param entry The 'QC_entry_t' holding a 'EOF_Packet' to be converted into - * a 'OK_Packet'. - * @return The converted packet. - */ -unsigned char* ok_to_eof_packet(QC_entry_t* entry) { - unsigned char* result = (unsigned char*)malloc(entry->length + ok_to_eof_dif); - unsigned char* vp = result; - char* it = entry->value; - - // Extract warning flags and status from 'OK_packet' - char* ok_packet = it + entry->ok_pkt_offset; - mysql_hdr ok_hdr; - memcpy(&ok_hdr, ok_packet, sizeof(mysql_hdr)); - ok_packet += sizeof(mysql_hdr); - // Skip the 'OK packet header', 'affected_rows' and 'last_insert_id' - ok_packet += 3; - uint16_t status_flags; - memcpy(&status_flags, ok_packet, sizeof(uint16_t)); - ok_packet += 2; - uint16_t warnings; - memcpy(&warnings, ok_packet, sizeof(uint16_t)); - - // Find the spot in which the first EOF needs to be placed - it += sizeof(mysql_hdr); - uint64_t c_count = 0; - int c_count_len = mysql_decode_length(reinterpret_cast(it), &c_count); - it += c_count_len; - - mysql_hdr column_hdr; - for (uint64_t i = 0; i < c_count; i++) { - memcpy(&column_hdr, it ,sizeof(mysql_hdr)); - it += sizeof(mysql_hdr) + column_hdr.pkt_length; - } - - // Location for 'column_eof' - uint64_t column_eof_offset = - reinterpret_cast(it) - - reinterpret_cast(entry->value); - memcpy(vp, entry->value, column_eof_offset); - vp += column_eof_offset; - - // Write 'column_eof_packet' header - column_hdr.pkt_id = column_hdr.pkt_id + 1; - column_hdr.pkt_length = 5; - memcpy(vp, &column_hdr, sizeof(mysql_hdr)); - vp += sizeof(mysql_hdr); - - // Write 'column_eof_packet' contents - *vp = 0xfe; - vp++; - memcpy(vp, &warnings, sizeof(uint16_t)); - vp += 2; - memcpy(vp, &status_flags, sizeof(uint16_t)); - vp += 2; - - // Find the OK packet - for (;;) { - mysql_hdr hdr; - memcpy(&hdr, it ,sizeof(mysql_hdr)); - unsigned char* payload = - reinterpret_cast(it) + - sizeof(mysql_hdr); - - if (hdr.pkt_length < 9 && *payload == 0xfe) { - mysql_hdr ok_hdr; - ok_hdr.pkt_id = hdr.pkt_id + 1; - ok_hdr.pkt_length = 5; - memcpy(vp, &ok_hdr, sizeof(mysql_hdr)); - vp += sizeof(mysql_hdr); - - *vp = 0xfe; - vp++; - memcpy(vp, &warnings, sizeof(uint16_t)); - vp += 2; - memcpy(vp, &status_flags, sizeof(uint16_t)); - break; - } else { - // Increment the package id by one due to 'column_eof_packet' - hdr.pkt_id += 1; - memcpy(vp, &hdr, sizeof(mysql_hdr)); - vp += sizeof(mysql_hdr); - it += sizeof(mysql_hdr); - memcpy(vp, it, hdr.pkt_length); - vp += hdr.pkt_length; - it += hdr.pkt_length; - } - } - - return result; -} - -unsigned char * Query_Cache::get(uint64_t user_hash, const unsigned char *kp, const uint32_t kl, uint32_t *lv, unsigned long long curtime_ms, unsigned long long cache_ttl, bool deprecate_eof_active) { - unsigned char *result=NULL; - +template +std::shared_ptr Query_Cache::get(uint64_t user_hash, const unsigned char *kp, + const uint32_t kl, uint64_t curtime_ms, uint64_t cache_ttl) { + uint64_t hk=SpookyHash::Hash64(kp, kl, user_hash); - unsigned char i=hk%SHARED_QUERY_CACHE_HASH_TABLES; + uint8_t i=hk%SHARED_QUERY_CACHE_HASH_TABLES; - QC_entry_t *entry=KVs[i]->lookup(hk); + std::shared_ptr entry_shared = KVs[i]->lookup(hk).lock(); - if (entry!=NULL) { - unsigned long long t=curtime_ms; - if (entry->expire_ms > t && entry->create_ms + cache_ttl > t) { + if (entry_shared) { + uint64_t t = curtime_ms; + if (entry_shared->expire_ms > t && entry_shared->create_ms + cache_ttl > t) { if ( - mysql_thread___query_cache_soft_ttl_pct && !entry->refreshing && - entry->create_ms + cache_ttl * mysql_thread___query_cache_soft_ttl_pct / 100 <= t + GET_THREAD_VARIABLE(query_cache_soft_ttl_pct) && !entry_shared->refreshing && + entry_shared->create_ms + cache_ttl * GET_THREAD_VARIABLE(query_cache_soft_ttl_pct) / 100 <= t ) { // If the Query Cache entry reach the soft_ttl but do not reach // the cache_ttl, the next query hit the backend and refresh @@ -663,165 +491,61 @@ unsigned char * Query_Cache::get(uint64_t user_hash, const unsigned char *kp, co // refreshing is in process, other queries keep using the "old" // Query Cache entry. // soft_ttl_pct with value 0 and 100 disables the functionality. - entry->refreshing = true; + entry_shared->refreshing = true; } else { THR_UPDATE_CNT(__thr_cntGetOK,Glo_cntGetOK,1,1); - THR_UPDATE_CNT(__thr_dataOUT,Glo_dataOUT,entry->length,1); - - if (deprecate_eof_active && entry->column_eof_pkt_offset) { - result = eof_to_ok_packet(entry); - *lv = entry->length + eof_to_ok_dif; - } else if (!deprecate_eof_active && entry->ok_pkt_offset){ - result = ok_to_eof_packet(entry); - *lv = entry->length + ok_to_eof_dif; - } else { - result = (unsigned char *)malloc(entry->length); - memcpy(result, entry->value, entry->length); - *lv = entry->length; - } - - if (t > entry->access_ms) entry->access_ms=t; + THR_UPDATE_CNT(__thr_dataOUT,Glo_dataOUT, entry_shared->length,1); + if (t > entry_shared->access_ms) entry_shared->access_ms=t; + return entry_shared; } } - __sync_fetch_and_sub(&entry->ref_count,1); } - return result; + return std::shared_ptr(nullptr); } -bool Query_Cache::set(uint64_t user_hash, const unsigned char *kp, uint32_t kl, unsigned char *vp, uint32_t vl, unsigned long long create_ms, unsigned long long curtime_ms, unsigned long long expire_ms, bool deprecate_eof_active) { - QC_entry_t *entry = (QC_entry_t *)malloc(sizeof(QC_entry_t)); +template +bool Query_Cache::set(QC_entry_t* entry, uint64_t user_hash, const unsigned char *kp, uint32_t kl, + unsigned char *vp, uint32_t vl, uint64_t create_ms, uint64_t curtime_ms, uint64_t expire_ms) { entry->klen=kl; entry->length=vl; - entry->ref_count=0; - entry->column_eof_pkt_offset=0; - entry->row_eof_pkt_offset=0; - entry->ok_pkt_offset=0; entry->refreshing=false; - - // Find the first EOF location - unsigned char* it = vp; - it += sizeof(mysql_hdr); - uint64_t c_count = 0; - int c_count_len = mysql_decode_length(const_cast(it), &c_count); - it += c_count_len; - - for (uint64_t i = 0; i < c_count; i++) { - mysql_hdr hdr; - memcpy(&hdr, it ,sizeof(mysql_hdr)); - it += sizeof(mysql_hdr) + hdr.pkt_length; - } - - if (deprecate_eof_active == false) { - // Store EOF position and jump to rows - entry->column_eof_pkt_offset = it - vp; - mysql_hdr hdr; - memcpy(&hdr, it, sizeof(mysql_hdr)); - it += sizeof(mysql_hdr) + hdr.pkt_length; - } - - // Find the second EOF location or the OK packet - for (;;) { - mysql_hdr hdr; - memcpy(&hdr, it ,sizeof(mysql_hdr)); - unsigned char* payload = it + sizeof(mysql_hdr); - - if (hdr.pkt_length < 9 && *payload == 0xfe) { - if (deprecate_eof_active) { - entry->ok_pkt_offset = it - vp; - - // Reset the warning flags to zero before storing resultset in the cache - // Reason: When a warning flag is set, it may prompt the client to invoke "SHOW WARNINGS" or "SHOW COUNT(*) FROM WARNINGS". - // However, when retrieving data from the cache, it's possible that there are no warnings present - // that might be associated with previous interactions. - unsigned char* payload_temp = payload+1; - - // skip affected_rows - payload_temp += mysql_decode_length(payload_temp, nullptr); - - // skip last_insert_id - payload_temp += mysql_decode_length(payload_temp, nullptr); - - // skip stats_flags - payload_temp += sizeof(uint16_t); - - uint16_t warnings = 0; - memcpy(payload_temp, &warnings, sizeof(uint16_t)); - - } else { - entry->row_eof_pkt_offset = it - vp; - - // Reset the warning flags to zero before storing resultset in the cache - // Reason: When a warning flag is set, it may prompt the client to invoke "SHOW WARNINGS" or "SHOW COUNT(*) FROM WARNINGS". - // However, when retrieving data from the cache, it's possible that there are no warnings present - // that might be associated with previous interactions. - uint16_t warnings = 0; - memcpy((payload + 1), &warnings, sizeof(uint16_t)); - } - break; - } else { - it += sizeof(mysql_hdr) + hdr.pkt_length; - } - } - - entry->value=(char *)malloc(vl); - memcpy(entry->value,vp,vl); - entry->self=entry; + // entry->value = (unsigned char*)malloc(vl); + // memcpy(entry->value, vp, vl); + entry->value = vp; // no need to allocate new memory and copy value + //entry->self=entry; entry->create_ms=create_ms; entry->access_ms=curtime_ms; entry->expire_ms=expire_ms; uint64_t hk=SpookyHash::Hash64(kp, kl, user_hash); - unsigned char i=hk%SHARED_QUERY_CACHE_HASH_TABLES; + uint8_t i=hk%SHARED_QUERY_CACHE_HASH_TABLES; entry->key=hk; + entry->kv=KVs[i]; KVs[i]->replace(hk, entry); - return true; } -uint64_t Query_Cache::flush() { - int i; +template +uint64_t Query_Cache::flush() { uint64_t total_count=0; - for (i=0; icnt(); - KVs[i]->empty(); + for (int i=0; icount(); + KVs[i]->clear(); } return total_count; }; -void * Query_Cache::purgeHash_thread(void *) { - unsigned int i; - unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version; - MySQL_Thread * mysql_thr = new MySQL_Thread(); - MySQL_Monitor__thread_MySQL_Thread_Variables_version=GloMTH->get_global_version(); - set_thread_name("QueryCachePurge"); - mysql_thr->refresh_variables(); - max_memory_size = (uint64_t) mysql_thread___query_cache_size_MB*1024*1024; - while (shutdown==0) { - usleep(purge_loop_time); - unsigned long long t=monotonic_time()/1000; - QCnow_ms=t; - unsigned int glover=GloMTH->get_global_version(); - if (GloMTH) { - if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) { - MySQL_Monitor__thread_MySQL_Thread_Variables_version=glover; - mysql_thr->refresh_variables(); - max_memory_size = (uint64_t) mysql_thread___query_cache_size_MB*1024*1024; - } - } - unsigned int curr_pct=current_used_memory_pct(); - if (curr_pct < purge_threshold_pct_min ) continue; - for (i=0; ipurge_some(QCnow_ms, (curr_pct > purge_threshold_pct_max)); - } +template +void Query_Cache::purgeHash(uint64_t QCnow_ms, unsigned int curr_pct) { + for (int i = 0; i < SHARED_QUERY_CACHE_HASH_TABLES; i++) { + KVs[i]->purge_some(QCnow_ms, (curr_pct > purge_threshold_pct_max)); } - delete mysql_thr; - return NULL; -}; +} -SQLite3_result * Query_Cache::SQL3_getStats() { - const int colnum=2; +template +SQLite3_result* Query_Cache::SQL3_getStats() { + constexpr int colnum =2; char buf[256]; char **pta=(char **)malloc(sizeof(char *)*colnum); - //Get_Memory_Stats(); SQLite3_result *result=new SQLite3_result(colnum); result->add_column_definition(SQLITE_TEXT,"Variable_Name"); result->add_column_definition(SQLITE_TEXT,"Variable_Value"); @@ -877,3 +601,16 @@ SQLite3_result * Query_Cache::SQL3_getStats() { free(pta); return result; } + +template +void Query_Cache::purgeHash(uint64_t max_memory_size) { + const unsigned int curr_pct = current_used_memory_pct(max_memory_size); + if (curr_pct < purge_threshold_pct_min) return; + purgeHash((monotonic_time() / 1000ULL), curr_pct); +} + +template +class Query_Cache; + +template +class Query_Cache; diff --git a/src/SQLite3_Server.cpp b/src/SQLite3_Server.cpp index a310961ca..da0949520 100644 --- a/src/SQLite3_Server.cpp +++ b/src/SQLite3_Server.cpp @@ -99,7 +99,7 @@ static char *s_strdup(char *s) { static int __SQLite3_Server_refresh_interval=1000; -extern Query_Cache *GloQC; +extern MySQL_Query_Cache *GloMyQC; extern MySQL_Authentication *GloMyAuth; extern ProxySQL_Admin *GloAdmin; extern MySQL_Query_Processor* GloMyQPro; diff --git a/src/main.cpp b/src/main.cpp index 8cb5f854b..e715e573e 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -31,6 +31,8 @@ using json = nlohmann::json; #include "MySQL_Authentication.hpp" #include "PgSQL_Authentication.h" #include "MySQL_LDAP_Authentication.hpp" +#include "MySQL_Query_Cache.h" +#include "PgSQL_Query_Cache.h" #include "proxysql_restapi.h" #include "Web_Interface.hpp" #include "proxysql_utils.h" @@ -444,7 +446,8 @@ int listen_fd; int socket_fd; -Query_Cache *GloQC; +MySQL_Query_Cache *GloMyQC; +PgSQL_Query_Cache* GloPgQC; MySQL_Authentication *GloMyAuth; PgSQL_Authentication* GloPgAuth; MySQL_LDAP_Authentication *GloMyLdapAuth; @@ -607,11 +610,55 @@ void* pgsql_worker_thread_func_idles(void* arg) { } #endif // IDLE_THREADS -void * mysql_shared_query_cache_funct(void *arg) { - GloQC->purgeHash_thread(NULL); +void* unified_query_cache_purge_thread(void *arg) { + + set_thread_name("QCPurgeThread"); + + MySQL_Thread* mysql_thr = new MySQL_Thread(); + unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version; + MySQL_Monitor__thread_MySQL_Thread_Variables_version = GloMTH->get_global_version(); + mysql_thr->refresh_variables(); + uint64_t mysql_max_memory_size = static_cast(mysql_thread___query_cache_size_MB * 1024ULL * 1024ULL); + + PgSQL_Thread* pgsql_thr = new PgSQL_Thread(); + unsigned int PgSQL_Monitor__thread_PgSQL_Thread_Variables_version; + PgSQL_Monitor__thread_PgSQL_Thread_Variables_version = GloPTH->get_global_version(); + pgsql_thr->refresh_variables(); + uint64_t pgsql_max_memory_size = static_cast(pgsql_thread___query_cache_size_MB * 1024ULL * 1024ULL); + + // Both MySQL and PgSQL query caches use the same shutting_down value + while (GloMyQC->shutting_down == false && GloPgQC->shutting_down == false) { + + // Both MySQL and PgSQL query caches share the same purge_loop_time value. + // Therefore, using either purge_loop_time will have no impact on the behavior. + usleep(GloMyQC->purge_loop_time); + + const unsigned int mysql_glover = GloMTH->get_global_version(); + if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < mysql_glover) { + MySQL_Monitor__thread_MySQL_Thread_Variables_version = mysql_glover; + mysql_thr->refresh_variables(); + mysql_max_memory_size = static_cast(mysql_thread___query_cache_size_MB * 1024ULL * 1024ULL); + } + GloMyQC->purgeHash(mysql_max_memory_size); + + const unsigned int pgsql_glover = GloPTH->get_global_version(); + if (PgSQL_Monitor__thread_PgSQL_Thread_Variables_version < pgsql_glover) { + PgSQL_Monitor__thread_PgSQL_Thread_Variables_version = pgsql_glover; + pgsql_thr->refresh_variables(); + pgsql_max_memory_size = static_cast(pgsql_thread___query_cache_size_MB * 1024ULL * 1024ULL); + } + GloPgQC->purgeHash(pgsql_max_memory_size); + } + + delete mysql_thr; + delete pgsql_thr; return NULL; } +/*void* pgsql_shared_query_cache_funct(void* arg) { + GloPgQC->purgeHash_thread(NULL); + return NULL; +}*/ void ProxySQL_Main_process_global_variables(int argc, const char **argv) { GloVars.errorlog = NULL; @@ -815,7 +862,8 @@ void ProxySQL_Main_process_global_variables(int argc, const char **argv) { } void ProxySQL_Main_init_main_modules() { - GloQC=NULL; + GloMyQC=NULL; + GloPgQC=NULL; GloMyQPro=NULL; GloMTH=NULL; GloMyAuth=NULL; @@ -946,9 +994,15 @@ void ProxySQL_Main_init_PgSQL_Threads_Handler_module() { } void ProxySQL_Main_init_Query_Cache_module() { - GloQC = new Query_Cache(); - GloQC->print_version(); - pthread_create(&GloQC->purge_thread_id, NULL, mysql_shared_query_cache_funct , NULL); + GloMyQC = new MySQL_Query_Cache(); + GloMyQC->print_version(); + GloPgQC = new PgSQL_Query_Cache(); + GloPgQC->print_version(); + + pthread_t purge_thread_id; + pthread_create(&purge_thread_id, NULL, unified_query_cache_purge_thread, NULL); + GloMyQC->purge_thread_id = purge_thread_id; + GloPgQC->purge_thread_id = purge_thread_id; } void ProxySQL_Main_init_MySQL_Monitor_module() { @@ -1003,10 +1057,12 @@ void ProxySQL_Main_join_all_threads() { std::cerr << "GloPTH joined in "; #endif } - if (GloQC) { - GloQC->shutdown=1; + if (GloMyQC) { + GloMyQC->shutting_down=true; + } + if (GloPgQC) { + GloPgQC->shutting_down=true; } - if (GloMyMon) { GloMyMon->shutdown=true; } @@ -1032,14 +1088,33 @@ void ProxySQL_Main_join_all_threads() { std::cerr << "GloPgMon joined in "; #endif } - // join GloQC thread - if (GloQC) { + /* Unified QC Purge Thread for both MySQL and PgSQL query cache + // join GloMyQC thread + if (GloMyQC) { cpu_timer t; - pthread_join(GloQC->purge_thread_id, NULL); + pthread_join(GloMyQC->purge_thread_id, NULL); #ifdef DEBUG - std::cerr << "GloQC joined in "; + std::cerr << "GloMyQC joined in "; #endif } + // join GloPgQC thread + if (GloPgQC) { + cpu_timer t; + pthread_join(GloPgQC->purge_thread_id, NULL); +#ifdef DEBUG + std::cerr << "GloPgQC joined in "; +#endif + }*/ + if (GloMyQC || GloPgQC) { + cpu_timer t; + // The purge_thread_id is shared by both MySQL and PgSQL. + // use either one to join the thread. + pthread_join(GloMyQC->purge_thread_id, NULL); +#ifdef DEBUG + std::cerr << "GloMyQC and GloPgQC joined in "; +#endif + } + #ifdef DEBUG std::cerr << "All threads joined in "; #endif @@ -1062,12 +1137,20 @@ void ProxySQL_Main_shutdown_all_modules() { std::cerr << "GloPgMon shutdown in "; #endif } - if (GloQC) { + if (GloMyQC) { + cpu_timer t; + delete GloMyQC; + GloMyQC=NULL; +#ifdef DEBUG + std::cerr << "GloMyQC shutdown in "; +#endif + } + if (GloPgQC) { cpu_timer t; - delete GloQC; - GloQC=NULL; + delete GloPgQC; + GloPgQC=NULL; #ifdef DEBUG - std::cerr << "GloQC shutdown in "; + std::cerr << "GloPgQC shutdown in "; #endif } if (GloMyQPro) { diff --git a/test/tap/tap/SQLite3_Server.cpp b/test/tap/tap/SQLite3_Server.cpp index 127432ead..29f7c7f72 100644 --- a/test/tap/tap/SQLite3_Server.cpp +++ b/test/tap/tap/SQLite3_Server.cpp @@ -73,7 +73,7 @@ static bool testTimeoutSequence[] = {true, false, true, false, true, false, true static int testIndex = 7; static int testLag = 10; -extern Query_Cache *GloQC; +extern MySQL_Query_Cache *GloMyQC; extern MySQL_Authentication *GloMyAuth; extern ProxySQL_Admin *GloAdmin; extern MySQL_Query_Processor* GloQPro;