From c0f99c0e15218cea78cac6a9e5c1306c76f90748 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Tue, 25 Nov 2025 01:54:35 +0500 Subject: [PATCH] Refactor: Improved Prepared-Statement Cache Design (Lock-Free Hot Path) #5211 Concurrency and Memory Management * Lock-Free Ref Counting: Replaced global mutex-protected integer reference counts with `std::atomic` within `PgSQL_STMT_Global_info`, eliminating lock contention during statement referencing. * Modern Ownership: Adopted std::shared_ptr for global and local storage, providing automatic, thread-safe memory and lifecycle management. * Memory Optimization: Removed redundant auxiliary maps `global_id_to_stmt_names` and `map_stmt_id_to_info` from local and global statement managers respectively, reducing overall memory overhead. * Optimized Purging: Statement removal logic was simplified for efficiently identifying and cleaning up unused statements. Hot Path Performance (`BIND`, `DESCRIBE`, `EXECUTE`) * Bypassed Global Lookups: Local session maps now store the `shared_ptr` directly, removing the need to acquire the global lock and search the global map during hot path operations. * Direct Refcount Manipulation: Refcount modification functions now operate directly on the passed statement object, eliminating the overhead of searching the global map to find the object pointer based on statement id. Safety and Protocol Logic (`PARSE`) * Efficient Statement Reuse: Implemented a **local fast path** check for the unnamed statement (`""`), allowing immediate reuse of an identical query (same hash) upon re-parse, which bypasses global processing and locks. Cleanup * Cleaned up and class rename `PgSQL_STMT_Manager_v14` -> `PgSQL_STMT_Manager`. --- include/PgSQL_Connection.h | 4 +- include/PgSQL_PreparedStatement.h | 340 ++++++++++++--- include/PgSQL_Session.h | 2 +- lib/PgSQL_Connection.cpp | 4 +- lib/PgSQL_PreparedStatement.cpp | 675 ++++++++++++++++-------------- lib/PgSQL_Session.cpp | 207 ++++----- lib/ProxySQL_Admin_Stats.cpp | 2 +- src/main.cpp | 4 +- 8 files changed, 751 insertions(+), 487 deletions(-) diff --git a/include/PgSQL_Connection.h b/include/PgSQL_Connection.h index 5effde3d0..8f221f382 100644 --- a/include/PgSQL_Connection.h +++ b/include/PgSQL_Connection.h @@ -12,7 +12,7 @@ class PgSQL_SrvC; class PgSQL_Query_Result; -class PgSQL_STMTs_local_v14; +class PgSQL_STMT_Local; //class PgSQL_Describe_Prepared_Info; class PgSQL_Bind_Info; //#define STATUS_PGSQL_CONNECTION_SEQUENCE 0x00000001 @@ -639,7 +639,7 @@ public: bool exit_pipeline_mode; // true if it is safe to exit pipeline mode bool resync_failed; // true if the last resync attempt failed - PgSQL_STMTs_local_v14* local_stmts; + PgSQL_STMT_Local* local_stmts; PgSQL_SrvC *parent; PgSQL_Connection_userinfo* userinfo; PgSQL_Data_Stream* myds; diff --git a/include/PgSQL_PreparedStatement.h b/include/PgSQL_PreparedStatement.h index 37ec4cd7d..344373ecb 100644 --- a/include/PgSQL_PreparedStatement.h +++ b/include/PgSQL_PreparedStatement.h @@ -4,27 +4,30 @@ #include "proxysql.h" #include "cpp.h" +static constexpr uint16_t PGSQL_MAX_THREADS = 255; + // class PgSQL_STMT_Global_info represents information about a PgSQL Prepared Statement // it is an internal representation of prepared statement // it include all metadata associated with it class PgSQL_STMT_Global_info { public: - uint64_t digest; - PGSQL_QUERY_command PgQueryCmd; + char* query; char* digest_text; - uint64_t hash; - char *username; - char *dbname; - char *query; + char* first_comment; unsigned int query_length; - int ref_count_client; - int ref_count_server; uint64_t statement_id; - char* first_comment; - uint64_t total_mem_usage; + mutable std::atomic ref_count_client; + mutable std::atomic ref_count_server; Parse_Param_Types parse_param_types;// array of parameter types, used for prepared statements - - PgSQL_STMT_Global_info(uint64_t id, char* u, char* d, char* q, unsigned int ql, char* fc, Parse_Param_Types&& ppt, uint64_t _h); + char* username; + char* dbname; + uint64_t digest; + uint64_t hash; + uint64_t total_mem_usage; + PGSQL_QUERY_command PgQueryCmd; + + PgSQL_STMT_Global_info(uint64_t id, const char* _user, const char* _database, const char* _query, unsigned int _query_len, + Parse_Param_Types&& _ppt, const char* _first_comment, uint64_t _h); ~PgSQL_STMT_Global_info(); void calculate_mem_usage(); @@ -32,85 +35,284 @@ private: void compute_hash(); }; -class PgSQL_STMTs_local_v14 { +// class PgSQL_STMT_Local represents prepared statements local to a session/connection +class PgSQL_STMT_Local { public: - // this map associate client_stmt_id to global_stmt_id : this is used only for client connections - std::map stmt_name_to_global_ids; - // this multimap associate global_stmt_id to client_stmt_id : this is used only for client connections - std::multimap global_id_to_stmt_names; + explicit PgSQL_STMT_Local(bool _ic) : sess(nullptr), local_max_stmt_id(0), is_client_(_ic) { } + ~PgSQL_STMT_Local(); - // this map associate backend_stmt_id to global_stmt_id : this is used only for backend connections - std::map backend_stmt_to_global_ids; - // this map associate global_stmt_id to backend_stmt_id : this is used only for backend connections - std::map global_stmt_to_backend_ids; + inline void set_is_client(PgSQL_Session *_s) { sess=_s; is_client_ = true; } + inline bool is_client() const { return is_client_; } + inline unsigned int get_num_backend_stmts() const { return backend_stmt_to_global_info.size(); } + + /** + * Map client statement name to a global prepared statement. + * + * - If client statement (local_stmt_info_ptr) already references a different global stmt: decrement old refcount, + * increment new, and replace pointer. + * - If client statement (local_stmt_info_ptr) references the same stmt: no-op. + * - Otherwise: insert into stmt_name_to_global_info and increment client refcount. + * + * Parameters: + * stmt_info Global prepared statement (shared_ptr, must be valid). + * client_stmt_name Statement name from client scope. + * local_stmt_info_ptr Optional existing local pointer to update instead of map insert. + */ + void client_insert(std::shared_ptr& stmt_info, const std::string& client_stmt_name, + std::shared_ptr* local_stmt_info_ptr); - PgSQL_Session *sess; - PgSQL_STMTs_local_v14(bool _ic) : sess(NULL), is_client_(_ic) { } - ~PgSQL_STMTs_local_v14(); + /** + * @brief Find statement info by client statement name in stmt_name_to_global_info. + * + * Performs a map lookup using the provided client statement name and returns the + * associated PgSQL_STMT_Global_info pointer, or nullptr if not present. + */ + const PgSQL_STMT_Global_info* find_stmt_info_from_stmt_name(const std::string& client_stmt_name) const; - inline - void set_is_client(PgSQL_Session *_s) { - sess=_s; - is_client_ = true; - } - - inline - bool is_client() { return is_client_; } - inline - unsigned int get_num_backend_stmts() { return backend_stmt_to_global_ids.size(); } + /** + * Close a client-side prepared statement mapping by its name. + * + * - If the name exists: decrement the global statement's client refcount, + * remove the mapping, return true. + * - If not found: do nothing, return false. + * + * @param client_stmt_name Client statement identifier. + * @return true if a mapping was removed, false otherwise. + */ + bool client_close(const std::string& client_stmt_name); - void backend_insert(uint64_t global_stmt_id, uint32_t backend_stmt_id); - void client_insert(uint64_t global_stmt_id, const std::string& client_stmt_name); - uint64_t compute_hash(const char *user, const char *database, const char *query, unsigned int query_length, - const Parse_Param_Types& param_types); - uint32_t generate_new_backend_stmt_id(); - uint64_t find_global_id_from_stmt_name(const std::string& client_stmt_name); - uint32_t find_backend_stmt_id_from_global_id(uint64_t global_id); - bool client_close(const std::string& stmt_name); + /** + * Close all client-side prepared statement mappings. + * + * Decrements the client refcount for each associated global statement + * and clears the stmt_name_to_global_info map. + */ void client_close_all(); + /** + * Generate a new backend statement ID. + * + * @return A backend statement id. + */ + uint32_t generate_new_backend_stmt_id(); + + /** + * @brief Register a backend prepared statement mapping. + * + * Adds bidirectional associations: + * - backend_stmt_id -> global statement info + * - global statement_id -> backend_stmt_id + */ + void backend_insert(std::shared_ptr& stmt_info, uint32_t backend_stmt_id); + + /** + * @brief Find backend statement ID from global statement ID. + * + * Looks up the backend statement ID associated with the given global statement ID. + * + * @param global_id The global statement ID to look up. + * @return The associated backend statement ID, or 0 if not found. + */ + uint32_t find_backend_stmt_id_from_global_id(uint64_t global_id) const; + + /** + * @brief Compute a unique hash for a prepared statement. + * + * Combines user, database, query, and parameter types into a hash value. + * + * @param user The username. + * @param database The database name. + * @param query The SQL query string. + * @param query_length The length of the query string. + * @param param_types The parameter types for the prepared statement. + * @return A computed hash value representing the prepared statement. + */ + static uint64_t compute_hash(const char* user, const char* database, const char* query, + unsigned int query_length, const Parse_Param_Types& param_types); + private: - bool is_client_; + // this map associate client_stmt_id to global_stmt_info : this is used only for client connections + std::map> stmt_name_to_global_info; + + // this map associate backend_stmt_id to global_stmt_info : this is used only for backend connections + std::map> backend_stmt_to_global_info; + + // this map associate global_stmt_id to backend_stmt_id : this is used only for backend connections + std::map global_stmt_to_backend_ids; + + PgSQL_Session* sess = nullptr; + + // stack of free backend statement ids std::stack free_backend_ids; + + // maximum assigned statement id in this session uint32_t local_max_stmt_id = 0; -}; + // is client session ? + bool is_client_; + + friend class PgSQL_Session; +}; -class PgSQL_STMT_Manager_v14 { +// class PgSQL_STMT_Manager manages global prepared statements across all sessions +class PgSQL_STMT_Manager { public: - PgSQL_STMT_Manager_v14(); - ~PgSQL_STMT_Manager_v14(); - PgSQL_STMT_Global_info* find_prepared_statement_by_hash(uint64_t hash, bool lock=true); - PgSQL_STMT_Global_info* find_prepared_statement_by_stmt_id(uint64_t id, bool lock=true); - inline void rdlock() { pthread_rwlock_rdlock(&rwlock_); } - inline void wrlock() { pthread_rwlock_wrlock(&rwlock_); } - inline void unlock() { pthread_rwlock_unlock(&rwlock_); } - void ref_count_client(uint64_t _stmt, int _v, bool lock=true) noexcept; - void ref_count_server(uint64_t _stmt, int _v, bool lock=true) noexcept; - PgSQL_STMT_Global_info* add_prepared_statement(char *user, char *database, char *query, unsigned int query_len, - char *fc, Parse_Param_Types&& ppt, bool lock=true); + PgSQL_STMT_Manager(); + ~PgSQL_STMT_Manager(); + + /** + * @brief Lookup a prepared statement by its 64-bit hash. + * + * @param hash Unique hash computed from user, db, query, and parameter types. + * @param lock If true acquires/release internal read lock; set false only if caller already holds it. + * @return Shared pointer to global statement info or nullptr if not found. + */ + std::shared_ptr find_prepared_statement_by_hash(uint64_t hash, bool lock=true); + + /** + * @brief Retrieve existing or create a new global prepared statement. + * + * Computes a hash from user, database, query and parameter types. If an entry with that hash + * exists it is returned. Otherwise a new PgSQL_STMT_Global_info is allocated, assigned a + * statement_id (preferring reused IDs from free_stmt_ids), optional digest metadata copied, + * inserted into map_stmt_hash_to_info, and zero-refcount tracking counters updated. + * Always increments the server refcount for the returned statement. + * + * Concurrency: Acquires/release write lock when lock == true. + * + * @param user Client username (null-terminated). + * @param database Database/schema name. + * @param query SQL text. + * @param query_len Length of query. + * @param ppt Vector of parameter type OIDs (moved if new entry is created). + * @param first_comment First comment prefix (optional, may be null). + * @param digest_text Normalized digest text (optional, may be null). + * @param digest 64-bit digest value (used only if digest_text provided). + * @param PgQueryCmd Parsed command classification. + * @param lock If true, method manages locking internally. + * @return shared_ptr to the global prepared statement (never nullptr). + */ + std::shared_ptr add_prepared_statement(const char* user, const char* database, const char* query, + unsigned int query_len, Parse_Param_Types&& ppt, const char* first_comment, const char* digest_text, uint64_t digest, + PGSQL_QUERY_command PgQueryCmd, bool lock = true); + + /** + * @brief Adjust client refcount for a prepared statement and update per-thread stats. + * + * - Applies delta _v (positive or negative) atomically. + * - Maintains zero-refcount tracking counters on transitions to/from zero. + * - Triggers opportunistic purge heuristic after modification. + * + * Thread-safety: uses atomic operations; no external lock required. + * + * @param stmt_info Non-null pointer to global PS metadata. + * @param _v Delta to apply (typically +1 or -1). + */ + void ref_count_client(const PgSQL_STMT_Global_info* stmt_info, int _v) noexcept; + + /** + * @brief Adjust server refcount for a prepared statement and update per-thread stats. + * + * - Applies delta _v (positive or negative) atomically. + * - Maintains zero-refcount tracking counters on transitions to/from zero. + * + * Thread-safety: uses atomic operations; no external lock required. + * + * @param stmt_info Non-null pointer to global PS metadata. + * @param _v Delta to apply (typically +1 or -1). + */ + void ref_count_server(const PgSQL_STMT_Global_info* stmt_info, int _v) noexcept; + + /** + * @brief Retrieve global prepared statement metrics. + * + * Outputs: + * - c_unique: Number of unique prepared statements with client refcount > 0. + * - c_total: Total number of client references across all prepared statements. + * - stmt_max_stmt_id: Maximum assigned statement ID. + * - cached: Total number of unique prepared statements in the global cache. + * - s_unique: Number of unique prepared statements with server refcount > 0. + * - s_total: Total number of server references across all prepared statements. + */ void get_metrics(uint64_t *c_unique, uint64_t *c_total, uint64_t *stmt_max_stmt_id, uint64_t *cached, uint64_t *s_unique, uint64_t *s_total); - SQLite3_result* get_prepared_statements_global_infos(); + + /** + * @brief Retrieve memory usage statistics for prepared statements. + * + * Outputs: + * - prep_stmt_metadata_mem_usage: Total memory used for prepared statement metadata. + * - prep_stmt_backend_mem_usage: Total memory used for backend prepared statement allocations. + */ void get_memory_usage(uint64_t& prep_stmt_metadata_mem_usage, uint64_t& prep_stmt_backend_mem_usage); + /** + * @brief Build and return a snapshot of all global prepared statements. + * + * @return SQLite3_result* containing one row per prepared statement (must be freed by caller). + */ + SQLite3_result* get_prepared_statements_global_infos(); + private: + struct Statistics { + struct Totals { + uint64_t c_total; + uint64_t s_total; + } total; + uint64_t c_unique; + uint64_t s_unique; + uint64_t cached; + }; + + // map from statement hash to global prepared statement info + std::map> map_stmt_hash_to_info; // map using hashes + + // next statement ID to assign if no free IDs are available uint64_t next_statement_id; - uint64_t num_stmt_with_ref_client_count_zero; - uint64_t num_stmt_with_ref_server_count_zero; + + // counters to track number of statements with zero refcounts + std::atomic num_stmt_with_ref_client_count_zero; + + // counters to track number of statements with zero refcounts + std::atomic num_stmt_with_ref_server_count_zero; + + // read-write lock to protect map_stmt_hash_to_info and free_stmt_ids pthread_rwlock_t rwlock_; - std::map map_stmt_id_to_info; // map using statement id - std::map map_stmt_hash_to_info; // map using hashes + + // stack of freed statement IDs for reuses std::stack free_stmt_ids; - struct { - uint64_t c_unique; - uint64_t c_total; - uint64_t stmt_max_stmt_id; - uint64_t cached; - uint64_t s_unique; - uint64_t s_total; - } statuses; + + // last time we purged unused statements time_t last_purge_time; + + // to make lock free status counting per thread, we use thread local storage + inline static thread_local int thd_idx = -1; + std::atomic next_status_idx{}; + + // FIXME: should be equal to number of worker threads configured in ProxySQL + std::array stats_total{}; + + // get per thread statistics totals + inline + Statistics::Totals& get_current_thread_statistics_totals() noexcept { + if (thd_idx == -1) { + thd_idx = next_status_idx.fetch_add(1, std::memory_order_relaxed); + } + return stats_total[thd_idx]; + } + + // read-write lock wrappers + inline void rdlock() noexcept { pthread_rwlock_rdlock(&rwlock_); } + inline void wrlock() noexcept { pthread_rwlock_wrlock(&rwlock_); } + inline void unlock() noexcept { pthread_rwlock_unlock(&rwlock_); } + + /** + * @brief Opportunistically purge unused prepared statements from the global cache. + * + * Concurrency: + * - Fast lock-free pre-check avoids unnecessary write lock. + */ + void ref_count_client___purge_stmts_if_needed() noexcept; }; #endif /* CLASS_PGSQL_PREPARED_STATEMENT_H */ diff --git a/include/PgSQL_Session.h b/include/PgSQL_Session.h index 8c391455a..55b156429 100644 --- a/include/PgSQL_Session.h +++ b/include/PgSQL_Session.h @@ -149,7 +149,7 @@ struct PgSQL_Extended_Query_Info { const char* stmt_client_name; const char* stmt_client_portal_name; const PgSQL_Bind_Message* bind_msg; - PgSQL_STMT_Global_info* stmt_info; + const PgSQL_STMT_Global_info* stmt_info; uint64_t stmt_global_id; uint32_t stmt_backend_id; uint8_t stmt_type; diff --git a/lib/PgSQL_Connection.cpp b/lib/PgSQL_Connection.cpp index a77da0f74..0a6d0e50d 100644 --- a/lib/PgSQL_Connection.cpp +++ b/lib/PgSQL_Connection.cpp @@ -179,7 +179,7 @@ PgSQL_Connection::PgSQL_Connection(bool is_client_conn) { options.init_connect = NULL; options.init_connect_sent = false; userinfo = new PgSQL_Connection_userinfo(); - local_stmts = new PgSQL_STMTs_local_v14(false); // false by default, it is a backend + local_stmts = new PgSQL_STMT_Local(false); // false by default, it is a backend //for (int i = 0; i < PGSQL_NAME_LAST_HIGH_WM; i++) { // variables[i].value = NULL; @@ -2499,7 +2499,7 @@ void PgSQL_Connection::reset() { reusable = true; creation_time = monotonic_time(); delete local_stmts; - local_stmts = new PgSQL_STMTs_local_v14(false); + local_stmts = new PgSQL_STMT_Local(false); // reset all variables for (int i = 0; i < PGSQL_NAME_LAST_HIGH_WM; i++) { diff --git a/lib/PgSQL_PreparedStatement.cpp b/lib/PgSQL_PreparedStatement.cpp index 2d6d7118b..e0621b1ae 100644 --- a/lib/PgSQL_PreparedStatement.cpp +++ b/lib/PgSQL_PreparedStatement.cpp @@ -9,7 +9,7 @@ #include "PgSQL_PreparedStatement.h" #include "PgSQL_Protocol.h" -extern PgSQL_STMT_Manager_v14 *GloPgStmt; +extern PgSQL_STMT_Manager *GloPgStmt; const int PS_GLOBAL_STATUS_FIELD_NUM = 8; @@ -63,25 +63,22 @@ void PgSQL_STMT_Global_info::compute_hash() { query_length, parse_param_types); } -PgSQL_STMT_Global_info::PgSQL_STMT_Global_info(uint64_t id, - char *u, char *d, char *q, - unsigned int ql, - char *fc, - Parse_Param_Types&& ppt, - uint64_t _h) { +PgSQL_STMT_Global_info::PgSQL_STMT_Global_info(uint64_t id, const char *_user, const char *_database, const char *_query, + unsigned int _query_len, Parse_Param_Types&& _ppt, + const char *_first_comment, uint64_t _h) { total_mem_usage = 0; statement_id = id; ref_count_client = 0; ref_count_server = 0; digest_text = nullptr; - username = strdup(u); - dbname = strdup(d); - query = (char *)malloc(ql + 1); - memcpy(query, q, ql); - query[ql] = '\0'; // add NULL byte - query_length = ql; - first_comment = fc ? strdup(fc) : nullptr; - parse_param_types = std::move(ppt); + username = strdup(_user); + dbname = strdup(_database); + query_length = _query_len; + query = (char *)malloc(query_length + 1); + memcpy(query, _query, query_length); + query[query_length] = '\0'; // add NULL byte + first_comment = _first_comment ? strdup(_first_comment) : nullptr; + parse_param_types = std::move(_ppt); PgQueryCmd = PGSQL_QUERY__UNINITIALIZED; if (_h) { @@ -104,8 +101,7 @@ PgSQL_STMT_Global_info::~PgSQL_STMT_Global_info() { } void PgSQL_STMT_Global_info::calculate_mem_usage() { - total_mem_usage = sizeof(PgSQL_STMT_Global_info) + - query_length + 1; + total_mem_usage = sizeof(PgSQL_STMT_Global_info) + query_length + 1; // NOSONAR: strlen is safe here if (username) total_mem_usage += strlen(username) + 1; // NOSONAR @@ -114,182 +110,221 @@ void PgSQL_STMT_Global_info::calculate_mem_usage() { if (digest_text) total_mem_usage += strlen(digest_text) + 1; // NOSONAR } -void PgSQL_STMTs_local_v14::backend_insert(uint64_t global_stmt_id, uint32_t backend_stmt_id) { - global_stmt_to_backend_ids.insert(std::make_pair(global_stmt_id, backend_stmt_id)); - backend_stmt_to_global_ids.insert(std::make_pair(backend_stmt_id,global_stmt_id)); +PgSQL_STMT_Local::~PgSQL_STMT_Local() { + // Note: we do not free the prepared statements because we assume that + // if we call this destructor the connection is being destroyed anyway + if (is_client_) { + for (auto it = stmt_name_to_global_info.begin(); it != stmt_name_to_global_info.end(); ++it) { + auto* stmt_info = it->second.get(); + GloPgStmt->ref_count_client(stmt_info, -1); + } + stmt_name_to_global_info.clear(); + } + else { + for (auto it = backend_stmt_to_global_info.begin(); it != backend_stmt_to_global_info.end(); ++it) { + auto* stmt_info = it->second.get(); + GloPgStmt->ref_count_server(stmt_info, -1); + } + backend_stmt_to_global_info.clear(); + } } -void PgSQL_STMTs_local_v14::client_insert(uint64_t global_stmt_id, const std::string& client_stmt_name) { - // validate that client_stmt_name is not empty and global_stmt_id is a valid id - [[maybe_unused]] auto [it, inserted] = stmt_name_to_global_ids.try_emplace(client_stmt_name, global_stmt_id); - assert(inserted && "client_stmt_name already exists in stmt_name_to_global_ids"); // Should not happen, as we expect unique client_stmt_name -#ifdef DEBUG - auto range = global_id_to_stmt_names.equal_range(global_stmt_id); - for (auto it = range.first; it != range.second; ++it) { - assert(it->second != client_stmt_name && "client_stmt_name is already mapped to global_stmt_id in global_id_to_stmt_names"); // Should not happen, as we expect unique client_stmt_name per global_stmt_id +void PgSQL_STMT_Local::client_insert(std::shared_ptr& stmt_info, + const std::string& client_stmt_name, std::shared_ptr* local_stmt_info_ptr) { + assert(stmt_info); + + if (local_stmt_info_ptr && (*local_stmt_info_ptr)) { + auto& local_stmt_info = *local_stmt_info_ptr; + if (local_stmt_info->statement_id == stmt_info->statement_id) + return; // no change + + // Adjust refcounts: decrement old, increment new + GloPgStmt->ref_count_client(local_stmt_info.get(), -1); + local_stmt_info.reset(); + + GloPgStmt->ref_count_client(stmt_info.get(), 1); + // Update existing entry to new global stmt info one + local_stmt_info = stmt_info; + return; } -#endif - global_id_to_stmt_names.emplace(global_stmt_id, client_stmt_name); - GloPgStmt->ref_count_client(global_stmt_id, 1, false); // do not lock! + + // New statement name — just insert + stmt_name_to_global_info.emplace(client_stmt_name, stmt_info); + GloPgStmt->ref_count_client(stmt_info.get(), 1); +} + +const PgSQL_STMT_Global_info* PgSQL_STMT_Local::find_stmt_info_from_stmt_name(const std::string& client_stmt_name) const { + const PgSQL_STMT_Global_info* ret = nullptr; + if (auto s = stmt_name_to_global_info.find(client_stmt_name); s != stmt_name_to_global_info.end()) { + ret = s->second.get(); + } + return ret; +} + +bool PgSQL_STMT_Local::client_close(const std::string& client_stmt_name) { + if (auto s = stmt_name_to_global_info.find(client_stmt_name); s != stmt_name_to_global_info.end()) { // found + const PgSQL_STMT_Global_info* stmt_info = s->second.get(); + GloPgStmt->ref_count_client(stmt_info, -1); + stmt_name_to_global_info.erase(s); + return true; + } + return false; // we don't really remove the prepared statement +} + +void PgSQL_STMT_Local::client_close_all() { + for (auto& [_, global_stmt_info] : stmt_name_to_global_info) { + GloPgStmt->ref_count_client(global_stmt_info.get(), -1); + } + stmt_name_to_global_info.clear(); } -uint64_t PgSQL_STMTs_local_v14::compute_hash(const char *user, +uint32_t PgSQL_STMT_Local::generate_new_backend_stmt_id() { + assert(is_client_ == false); + if (free_backend_ids.empty() == false) { + uint32_t backend_stmt_id = free_backend_ids.top(); + free_backend_ids.pop(); + return backend_stmt_id; + } + local_max_stmt_id++; + return local_max_stmt_id; +} + +void PgSQL_STMT_Local::backend_insert(std::shared_ptr& stmt_info, uint32_t backend_stmt_id) { + backend_stmt_to_global_info.insert(std::make_pair(backend_stmt_id, stmt_info)); + global_stmt_to_backend_ids.insert(std::make_pair(stmt_info->statement_id, backend_stmt_id)); +} + +uint32_t PgSQL_STMT_Local::find_backend_stmt_id_from_global_id(uint64_t global_id) const { + if (auto s = global_stmt_to_backend_ids.find(global_id); s != global_stmt_to_backend_ids.end()) { + return s->second; + } + return 0; // not found +} + +uint64_t PgSQL_STMT_Local::compute_hash(const char *user, const char *database, const char *query, unsigned int query_length, const Parse_Param_Types& param_types) { uint64_t hash = stmt_compute_hash(user, database, query, query_length, param_types); return hash; } -PgSQL_STMT_Manager_v14::PgSQL_STMT_Manager_v14() { +PgSQL_STMT_Manager::PgSQL_STMT_Manager() { last_purge_time = time(NULL); pthread_rwlock_init(&rwlock_, NULL); next_statement_id = 1; // we initialize this as 1 because we 0 is not allowed num_stmt_with_ref_client_count_zero = 0; num_stmt_with_ref_server_count_zero = 0; - statuses.c_unique = 0; - statuses.c_total = 0; - statuses.stmt_max_stmt_id = 0; - statuses.cached = 0; - statuses.s_unique = 0; - statuses.s_total = 0; + next_status_idx = 0; } -PgSQL_STMT_Manager_v14::~PgSQL_STMT_Manager_v14() { - for (auto it = map_stmt_id_to_info.begin(); it != map_stmt_id_to_info.end(); ++it) { - PgSQL_STMT_Global_info * a = it->second; - delete a; - } +PgSQL_STMT_Manager::~PgSQL_STMT_Manager() { + wrlock(); + map_stmt_hash_to_info.clear(); + unlock(); } -void PgSQL_STMT_Manager_v14::ref_count_client(uint64_t _stmt_id ,int _v, bool lock) noexcept { - if (lock) - wrlock(); - - if (auto s = map_stmt_id_to_info.find(_stmt_id); s != map_stmt_id_to_info.end()) { - statuses.c_total += _v; - PgSQL_STMT_Global_info *stmt_info = s->second; - if (stmt_info->ref_count_client == 0 && _v == 1) { - __sync_sub_and_fetch(&num_stmt_with_ref_client_count_zero,1); - } else { - if (stmt_info->ref_count_client == 1 && _v == -1) { - __sync_add_and_fetch(&num_stmt_with_ref_client_count_zero,1); - } - } - stmt_info->ref_count_client += _v; - time_t ct = time(NULL); - uint64_t num_client_count_zero = __sync_add_and_fetch(&num_stmt_with_ref_client_count_zero, 0); - uint64_t num_server_count_zero = __sync_add_and_fetch(&num_stmt_with_ref_server_count_zero, 0); - - size_t map_size = map_stmt_id_to_info.size(); - if ( - (ct > last_purge_time+1) && - (map_size > (unsigned)pgsql_thread___max_stmts_cache) && - (num_client_count_zero > map_size/10) && - (num_server_count_zero > map_size/10) - ) { // purge only if there is at least 10% gain - last_purge_time = ct; - int max_purge = map_size ; - std::vector torem; - torem.reserve(max_purge); - - for (auto it = map_stmt_id_to_info.begin(); it != map_stmt_id_to_info.end(); ++it) { - if (torem.size() >= std::min(static_cast(max_purge), - static_cast(num_client_count_zero))) { - break; - } - PgSQL_STMT_Global_info *a = it->second; - if ((__sync_add_and_fetch(&a->ref_count_client, 0) == 0) && - (a->ref_count_server == 0) ) // this to avoid that IDs are incorrectly reused - { - uint64_t hash = a->hash; - map_stmt_hash_to_info.erase(hash); - __sync_sub_and_fetch(&num_stmt_with_ref_client_count_zero,1); - torem.emplace_back(it->first); - } - } - while (!torem.empty()) { - uint64_t id = torem.back(); - torem.pop_back(); - auto s3 = map_stmt_id_to_info.find(id); - PgSQL_STMT_Global_info *a = s3->second; - if (a->ref_count_server == 0) { - __sync_sub_and_fetch(&num_stmt_with_ref_server_count_zero,1); - free_stmt_ids.push(id); - } - map_stmt_id_to_info.erase(s3); - statuses.s_total -= a->ref_count_server; - delete a; - } - } +void PgSQL_STMT_Manager::ref_count_client___purge_stmts_if_needed() noexcept { + + /* Heuristic trigger(checked twice : before and after acquiring write lock) : + * 1. At least 1 second since last_purge_time. + * 2. map_stmt_hash_to_info.size() > pgsql_thread___max_stmts_cache. + * 3. >= 10% of entries have client refcount == 0. + * 4. >= 10% of entries have server refcount == 0. + */ + + time_t ct = time(NULL); + if (ct <= last_purge_time + 1) + return; // too soon, skip + + // --- Light pre-check without lock --- + size_t map_size = map_stmt_hash_to_info.size(); + uint64_t num_client_zero = num_stmt_with_ref_client_count_zero.load(std::memory_order_relaxed); + uint64_t num_server_zero = num_stmt_with_ref_server_count_zero.load(std::memory_order_relaxed); + + if (map_size <= (unsigned)pgsql_thread___max_stmts_cache || + num_client_zero <= map_size / 10 || + num_server_zero <= map_size / 10) { + // Heuristic says no purge needed + return; } - if (lock) - unlock(); -} -void PgSQL_STMT_Manager_v14::ref_count_server(uint64_t _stmt_id ,int _v, bool lock) noexcept { - if (lock) - wrlock(); - std::map::iterator s; - s = map_stmt_id_to_info.find(_stmt_id); - if (s != map_stmt_id_to_info.end()) { - statuses.s_total += _v; - PgSQL_STMT_Global_info *stmt_info = s->second; - if (stmt_info->ref_count_server == 0 && _v == 1) { - __sync_sub_and_fetch(&num_stmt_with_ref_server_count_zero,1); - } else { - if (stmt_info->ref_count_server == 1 && _v == -1) { - __sync_add_and_fetch(&num_stmt_with_ref_server_count_zero,1); - } - } - stmt_info->ref_count_server += _v; - } - if (lock) + // --- Now we know we might purge, take write lock --- + wrlock(); + + // Double-check under exclusive lock (authoritative) + ct = time(NULL); + if (ct <= last_purge_time + 1) { unlock(); -} + return; + } -PgSQL_STMTs_local_v14::~PgSQL_STMTs_local_v14() { - // Note: we do not free the prepared statements because we assume that - // if we call this destructor the connection is being destroyed anyway + map_size = map_stmt_hash_to_info.size(); + num_client_zero = num_stmt_with_ref_client_count_zero.load(std::memory_order_relaxed); + num_server_zero = num_stmt_with_ref_server_count_zero.load(std::memory_order_relaxed); - if (is_client_) { - for (auto it = stmt_name_to_global_ids.begin(); - it != stmt_name_to_global_ids.end(); ++it) { - uint64_t global_stmt_id = it->second; - GloPgStmt->ref_count_client(global_stmt_id, -1); - } - } else { - for (auto it = backend_stmt_to_global_ids.begin(); - it != backend_stmt_to_global_ids.end(); ++it) { - uint64_t global_stmt_id = it->second; - GloPgStmt->ref_count_server(global_stmt_id, -1); - } + if (map_size <= (unsigned)pgsql_thread___max_stmts_cache || + num_client_zero <= map_size / 10 || + num_server_zero <= map_size / 10) { + last_purge_time = ct; + unlock(); + return; } -} + // --- Actual purge happens here under wrlock() --- + last_purge_time = ct; -PgSQL_STMT_Global_info *PgSQL_STMT_Manager_v14::find_prepared_statement_by_hash(uint64_t hash, bool lock) { - PgSQL_STMT_Global_info *ret = nullptr; // assume we do not find it - if (lock) { - rdlock(); - } - - if (auto s = map_stmt_hash_to_info.find(hash); s != map_stmt_hash_to_info.end()) { - ret = s->second; - } + //auto& stat_totals = get_current_thread_statistics_totals(); - if (lock) { - unlock(); + // Determine how many entries we are allowed to remove + size_t remaining_removals = std::min(map_size, static_cast(num_client_zero)); + + /* Purge logic : + * - Iterate statements while remaining_removals > 0. + * - Candidate removal only when shared_ptr use_count() == 1 (only the map holds it). + * - Return its statement_id to free_stmt_ids stack. + * - Decrement zero-refcount counters. + */ + for (auto it = map_stmt_hash_to_info.begin(); it != map_stmt_hash_to_info.end() && remaining_removals > 0; ) { + + auto& global_stmt_info = it->second; + + // use_count() == 1 indicates that only map_stmt_hash_to_info holds a reference, + // meaning there are no other references (from client or server) to this prepared statement. + // So we can safely remove this entry. + if (global_stmt_info.use_count() == 1) { + + // ref_count_client and ref_count_server should both be 0 in this case + assert(global_stmt_info->ref_count_client.load(std::memory_order_relaxed) == 0); + assert(global_stmt_info->ref_count_server.load(std::memory_order_relaxed) == 0); + + // Atomic counters + num_stmt_with_ref_client_count_zero.fetch_sub(1, std::memory_order_relaxed); + num_stmt_with_ref_server_count_zero.fetch_sub(1, std::memory_order_relaxed); + + // Free ID + free_stmt_ids.push(global_stmt_info->statement_id); + + // Update totals + //stat_totals.s_total -= global_stmt_info->ref_count_server.load(std::memory_order_relaxed); + + // Safe erase from map while iterating + it = map_stmt_hash_to_info.erase(it); + remaining_removals--; + } else { + ++it; + } } - return ret; + + unlock(); } -PgSQL_STMT_Global_info* PgSQL_STMT_Manager_v14::find_prepared_statement_by_stmt_id( - uint64_t id, bool lock) { - PgSQL_STMT_Global_info*ret = nullptr; // assume we do not find it +std::shared_ptr PgSQL_STMT_Manager::find_prepared_statement_by_hash(uint64_t hash, bool lock) { + std::shared_ptr ret = nullptr; // assume we do not find it + if (lock) { rdlock(); } - if (auto s = map_stmt_id_to_info.find(id); s != map_stmt_id_to_info.end()) { + if (auto s = map_stmt_hash_to_info.find(hash); s != map_stmt_hash_to_info.end()) { ret = s->second; } @@ -299,64 +334,13 @@ PgSQL_STMT_Global_info* PgSQL_STMT_Manager_v14::find_prepared_statement_by_stmt_ return ret; } -uint32_t PgSQL_STMTs_local_v14::generate_new_backend_stmt_id() { - assert(is_client_ == false); - if (free_backend_ids.empty() == false) { - uint32_t backend_stmt_id = free_backend_ids.top(); - free_backend_ids.pop(); - return backend_stmt_id; - } - local_max_stmt_id++; - return local_max_stmt_id; -} - -uint64_t PgSQL_STMTs_local_v14::find_global_id_from_stmt_name(const std::string& client_stmt_name) { - uint64_t ret=0; - if (auto s = stmt_name_to_global_ids.find(client_stmt_name); s != stmt_name_to_global_ids.end()) { - ret = s->second; - } - return ret; -} - -uint32_t PgSQL_STMTs_local_v14::find_backend_stmt_id_from_global_id(uint64_t global_id) { - if (auto s = global_stmt_to_backend_ids.find(global_id); s != global_stmt_to_backend_ids.end()) { - return s->second; - } - return 0; // not found -} - -bool PgSQL_STMTs_local_v14::client_close(const std::string& stmt_name) { - if (auto s = stmt_name_to_global_ids.find(stmt_name); s != stmt_name_to_global_ids.end()) { // found - uint64_t global_stmt_id = s->second; - stmt_name_to_global_ids.erase(s); - GloPgStmt->ref_count_client(global_stmt_id, -1); - std::pair::iterator, std::multimap::iterator> ret; - ret = global_id_to_stmt_names.equal_range(global_stmt_id); - for (std::multimap::iterator it=ret.first; it!=ret.second; ++it) { - if (it->second == stmt_name) { - global_id_to_stmt_names.erase(it); - break; - } - } - return true; - } - return false; // we don't really remove the prepared statement -} +std::shared_ptr PgSQL_STMT_Manager::add_prepared_statement(const char* user, const char* database, const char* query, + unsigned int query_len, Parse_Param_Types&& ppt, const char* first_comment, const char* digest_text, uint64_t digest, + PGSQL_QUERY_command PgQueryCmd, bool lock) { + std::shared_ptr ret = nullptr; -void PgSQL_STMTs_local_v14::client_close_all() { - for (auto [_, global_stmt_id] : stmt_name_to_global_ids) { - GloPgStmt->ref_count_client(global_stmt_id, -1); - } - stmt_name_to_global_ids.clear(); - global_id_to_stmt_names.clear(); -} + uint64_t hash = stmt_compute_hash(user, database, query, query_len, ppt); // this identifies the prepared statement -PgSQL_STMT_Global_info* PgSQL_STMT_Manager_v14::add_prepared_statement( - char *u, char *d, char *q, unsigned int ql, - char *fc, Parse_Param_Types&& ppt, bool lock) { - PgSQL_STMT_Global_info *ret = nullptr; - uint64_t hash = stmt_compute_hash( - u, d, q, ql, ppt); // this identifies the prepared statement if (lock) { wrlock(); } @@ -374,105 +358,178 @@ PgSQL_STMT_Global_info* PgSQL_STMT_Manager_v14::add_prepared_statement( next_statement_id++; } - auto stmt_info = std::make_unique(next_id, u, d, q, ql, fc, std::move(ppt), hash); - // insert it in both maps - map_stmt_id_to_info.insert(std::make_pair(stmt_info->statement_id, stmt_info.get())); - map_stmt_hash_to_info.insert(std::make_pair(stmt_info->hash, stmt_info.get())); - ret = stmt_info.release(); - __sync_add_and_fetch(&num_stmt_with_ref_client_count_zero,1); - __sync_add_and_fetch(&num_stmt_with_ref_server_count_zero,1); - } - if (ret->ref_count_server == 0) { - __sync_sub_and_fetch(&num_stmt_with_ref_server_count_zero,1); + auto stmt_info = std::make_shared(next_id, user, database, query, query_len, std::move(ppt), first_comment, hash); + + if (digest_text) { + stmt_info->digest_text = strdup(digest_text); + stmt_info->digest = digest; // copy digest + stmt_info->PgQueryCmd = PgQueryCmd; // copy PgComQueryCmd + stmt_info->calculate_mem_usage(); + } + + ret = std::move(stmt_info); + + //map_stmt_hash_to_info[ret->hash] = ret; + map_stmt_hash_to_info.emplace(ret->hash, ret); + + num_stmt_with_ref_client_count_zero.fetch_add(1, std::memory_order_relaxed); + num_stmt_with_ref_server_count_zero.fetch_add(1, std::memory_order_relaxed); } - ret->ref_count_server++; - statuses.s_total++; + + // Server refcount increment logic + ref_count_server(ret.get(), 1); + if (lock) { unlock(); } return ret; } +void PgSQL_STMT_Manager::ref_count_client(const PgSQL_STMT_Global_info* stmt_info, int _v) noexcept { + assert(stmt_info); -void PgSQL_STMT_Manager_v14::get_memory_usage(uint64_t& prep_stmt_metadata_mem_usage, uint64_t& prep_stmt_backend_mem_usage) { - prep_stmt_backend_mem_usage = 0; - prep_stmt_metadata_mem_usage = sizeof(PgSQL_STMT_Manager_v14); - rdlock(); - prep_stmt_metadata_mem_usage += map_stmt_id_to_info.size() * (sizeof(uint64_t) + sizeof(PgSQL_STMT_Global_info*)); - prep_stmt_metadata_mem_usage += map_stmt_hash_to_info.size() * (sizeof(uint64_t) + sizeof(PgSQL_STMT_Global_info*)); - prep_stmt_metadata_mem_usage += free_stmt_ids.size() * (sizeof(uint64_t)); - for (const auto&[key, value] : map_stmt_id_to_info) { - const PgSQL_STMT_Global_info* stmt_global_info = value; - prep_stmt_metadata_mem_usage += stmt_global_info->total_mem_usage; - prep_stmt_metadata_mem_usage += stmt_global_info->ref_count_server * 16; // ~16 bytes of memory utilized by global_stmt_id and stmt_id mappings - prep_stmt_metadata_mem_usage += stmt_global_info->ref_count_client * 40; // ~40 bytes of memory utilized by client_stmt_name and global_stmt_id mappings; + auto& stat_totals = get_current_thread_statistics_totals(); - // backend - prep_stmt_backend_mem_usage += stmt_global_info->ref_count_server; // FIXME: add backend memory usage + stat_totals.c_total += _v; + + if (_v == 1) { + // increment: relaxed is fine for performance + int prev = stmt_info->ref_count_client.fetch_add(1, std::memory_order_relaxed); + // if prev was 0 -> we transitioned 0 -> 1: one fewer zero-count entry + if (prev == 0) { + num_stmt_with_ref_client_count_zero.fetch_sub(1, std::memory_order_relaxed); + } + } + else if (_v == -1) { + // decrement: use acq_rel to synchronize-with potential deleter + int prev = stmt_info->ref_count_client.fetch_sub(1, std::memory_order_acq_rel); + // prev is the value before subtraction + if (prev == 1) { + // we just transitioned to zero + num_stmt_with_ref_client_count_zero.fetch_add(1, std::memory_order_relaxed); + } + } + else { + // support other increments/decrements (if needed) + int prev = stmt_info->ref_count_client.fetch_add(_v, + (_v > 0) ? std::memory_order_relaxed : std::memory_order_acq_rel); + if (_v > 0 && prev == 0) num_stmt_with_ref_client_count_zero.fetch_sub(1, std::memory_order_relaxed); + if (_v < 0 && prev + _v == 0) num_stmt_with_ref_client_count_zero.fetch_add(1, std::memory_order_relaxed); + } + + ref_count_client___purge_stmts_if_needed(); +} + +void PgSQL_STMT_Manager::ref_count_server(const PgSQL_STMT_Global_info* stmt_info, int _v) noexcept { + assert(stmt_info); + + auto& stat_totals = get_current_thread_statistics_totals(); + + stat_totals.s_total += _v; + + if (_v == 1) { + int prev = stmt_info->ref_count_server.fetch_add(1, std::memory_order_relaxed); + if (prev == 0) { + num_stmt_with_ref_server_count_zero.fetch_sub(1, std::memory_order_relaxed); + } + } + else if (_v == -1) { + int prev = stmt_info->ref_count_server.fetch_sub(1, std::memory_order_acq_rel); + if (prev == 1) { + num_stmt_with_ref_server_count_zero.fetch_add(1, std::memory_order_relaxed); + } + } + else { + int prev = stmt_info->ref_count_server.fetch_add(_v, + (_v > 0) ? std::memory_order_relaxed : std::memory_order_acq_rel); + if (_v > 0 && prev == 0) num_stmt_with_ref_server_count_zero.fetch_sub(1, std::memory_order_relaxed); + if (_v < 0 && prev + _v == 0) num_stmt_with_ref_server_count_zero.fetch_add(1, std::memory_order_relaxed); } - unlock(); } -void PgSQL_STMT_Manager_v14::get_metrics(uint64_t *c_unique, uint64_t *c_total, - uint64_t *stmt_max_stmt_id, uint64_t *cached, - uint64_t *s_unique, uint64_t *s_total) { +void PgSQL_STMT_Manager::get_metrics(uint64_t* c_unique, uint64_t* c_total, uint64_t* stmt_max_stmt_id, uint64_t* cached, + uint64_t* s_unique, uint64_t* s_total) { + #ifdef DEBUG uint64_t c_u = 0; uint64_t c_t = 0; - uint64_t m = 0; + //uint64_t m = 0; uint64_t c = 0; uint64_t s_u = 0; uint64_t s_t = 0; #endif - wrlock(); - statuses.cached = map_stmt_id_to_info.size(); - statuses.c_unique = statuses.cached - num_stmt_with_ref_client_count_zero; - statuses.s_unique = statuses.cached - num_stmt_with_ref_server_count_zero; + Statistics stats{}; + + for (int i = 0; i < next_status_idx.load(std::memory_order_relaxed); ++i) { + stats.total.c_total += stats_total[i].c_total; + stats.total.s_total += stats_total[i].s_total; + } + + stats.cached = map_stmt_hash_to_info.size(); + stats.c_unique = stats.cached - num_stmt_with_ref_client_count_zero.load(std::memory_order_relaxed); + stats.s_unique = stats.cached - num_stmt_with_ref_server_count_zero.load(std::memory_order_relaxed); #ifdef DEBUG - for (std::map::iterator it = map_stmt_id_to_info.begin(); - it != map_stmt_id_to_info.end(); ++it) { - const PgSQL_STMT_Global_info *a = it->second; + rdlock(); + for (const auto& [_, value] : map_stmt_hash_to_info) { + const PgSQL_STMT_Global_info* a = value.get(); c++; - if (a->ref_count_client) { + if (a->ref_count_client.load(std::memory_order_relaxed)) { c_u++; - c_t += a->ref_count_client; + c_t += a->ref_count_client.load(std::memory_order_relaxed); } - if (a->ref_count_server) { + if (a->ref_count_server.load(std::memory_order_relaxed)) { s_u++; - s_t += a->ref_count_server; - } - if (it->first > m) { - m = it->first; + s_t += a->ref_count_server.load(std::memory_order_relaxed); } + //if (it->first > m) { + // m = it->first; + //} } - assert (c_u == statuses.c_unique); - assert (c_t == statuses.c_total); - assert (c == statuses.cached); - assert (s_t == statuses.s_total); - assert (s_u == statuses.s_unique); - *stmt_max_stmt_id = m; -#endif - *stmt_max_stmt_id = next_statement_id; // this is max stmt_id, no matter if in used or not - *c_unique = statuses.c_unique; - *c_total = statuses.c_total; - *cached = statuses.cached; - *s_total = statuses.s_total; - *s_unique = statuses.s_unique; unlock(); + assert(c_u == stats.c_unique); + assert(c_t == stats.total.c_total); + assert(c == stats.cached); + assert(s_t == stats.total.s_total); + assert(s_u == stats.s_unique); + //*stmt_max_stmt_id = m; +#endif + * stmt_max_stmt_id = next_statement_id; // this is max stmt_id, no matter if in used or not + *c_unique = stats.c_unique; + *c_total = stats.total.c_total; + *cached = stats.cached; + *s_total = stats.total.s_total; + *s_unique = stats.s_unique; } +void PgSQL_STMT_Manager::get_memory_usage(uint64_t& prep_stmt_metadata_mem_usage, uint64_t& prep_stmt_backend_mem_usage) { + prep_stmt_backend_mem_usage = 0; + prep_stmt_metadata_mem_usage = sizeof(PgSQL_STMT_Manager); + rdlock(); + prep_stmt_metadata_mem_usage += map_stmt_hash_to_info.size() * (sizeof(uint64_t) + sizeof(PgSQL_STMT_Global_info*)); + prep_stmt_metadata_mem_usage += free_stmt_ids.size() * (sizeof(uint64_t)); + for (const auto& [_, value] : map_stmt_hash_to_info) { + const PgSQL_STMT_Global_info* stmt_global_info = value.get(); + prep_stmt_metadata_mem_usage += stmt_global_info->total_mem_usage; + prep_stmt_metadata_mem_usage += stmt_global_info->ref_count_server * ((sizeof(uint64_t) * 2) + sizeof(std::shared_ptr)); + prep_stmt_metadata_mem_usage += stmt_global_info->ref_count_client * (sizeof(std::string) + 16 + sizeof(std::shared_ptr)); + + // backend + prep_stmt_backend_mem_usage += stmt_global_info->ref_count_server; // FIXME: add backend memory usage + } + unlock(); +} class PgSQL_PS_global_stats { - public: +public: uint64_t statement_id; - char *username; - char *dbname; + char* username; + char* dbname; uint64_t digest; unsigned long long ref_count_client; unsigned long long ref_count_server; - char *query; + char* query; int num_param_types; - PgSQL_PS_global_stats(uint64_t stmt_id, const char *d, const char *u, uint64_t dig, const char *q, + PgSQL_PS_global_stats(uint64_t stmt_id, const char* d, const char* u, uint64_t dig, const char* q, unsigned long long ref_c, unsigned long long ref_s, int params) { statement_id = stmt_id; digest = dig; @@ -484,38 +541,38 @@ class PgSQL_PS_global_stats { num_param_types = params; } ~PgSQL_PS_global_stats() { - if (query) + if (query) free(query); if (username) free(username); if (dbname) free(dbname); } - char **get_row() { + char** get_row() { char buf[128]; - char **pta=(char **)malloc(sizeof(char *)*PS_GLOBAL_STATUS_FIELD_NUM); - snprintf(buf,sizeof(buf),"%lu",statement_id); - pta[0]=strdup(buf); + char** pta = (char**)malloc(sizeof(char*) * PS_GLOBAL_STATUS_FIELD_NUM); + snprintf(buf, sizeof(buf), "%lu", statement_id); + pta[0] = strdup(buf); assert(dbname); - pta[1]=strdup(dbname); + pta[1] = strdup(dbname); assert(username); - pta[2]=strdup(username); - snprintf(buf,sizeof(buf),"0x%016llX", (long long unsigned int)digest); - pta[3]=strdup(buf); + pta[2] = strdup(username); + snprintf(buf, sizeof(buf), "0x%016llX", (long long unsigned int)digest); + pta[3] = strdup(buf); assert(query); - pta[4]=strdup(query); - snprintf(buf,sizeof(buf),"%llu",ref_count_client); - pta[5]=strdup(buf); - snprintf(buf,sizeof(buf),"%llu",ref_count_server); - pta[6]=strdup(buf); - snprintf(buf,sizeof(buf),"%d",num_param_types); - pta[7]=strdup(buf); + pta[4] = strdup(query); + snprintf(buf, sizeof(buf), "%llu", ref_count_client); + pta[5] = strdup(buf); + snprintf(buf, sizeof(buf), "%llu", ref_count_server); + pta[6] = strdup(buf); + snprintf(buf, sizeof(buf), "%d", num_param_types); + pta[7] = strdup(buf); return pta; } - void free_row(char **pta) { + void free_row(char** pta) { int i; - for (i=0;i(PS_GLOBAL_STATUS_FIELD_NUM); - result->add_column_definition(SQLITE_TEXT,"stmt_id"); - result->add_column_definition(SQLITE_TEXT,"database"); - result->add_column_definition(SQLITE_TEXT,"username"); - result->add_column_definition(SQLITE_TEXT,"digest"); - result->add_column_definition(SQLITE_TEXT,"query"); - result->add_column_definition(SQLITE_TEXT,"ref_count_client"); - result->add_column_definition(SQLITE_TEXT,"ref_count_server"); - result->add_column_definition(SQLITE_TEXT,"num_param_types"); + result->add_column_definition(SQLITE_TEXT, "stmt_id"); + result->add_column_definition(SQLITE_TEXT, "database"); + result->add_column_definition(SQLITE_TEXT, "username"); + result->add_column_definition(SQLITE_TEXT, "digest"); + result->add_column_definition(SQLITE_TEXT, "query"); + result->add_column_definition(SQLITE_TEXT, "ref_count_client"); + result->add_column_definition(SQLITE_TEXT, "ref_count_server"); + result->add_column_definition(SQLITE_TEXT, "num_param_types"); rdlock(); - for (auto it = map_stmt_id_to_info.begin(); it != map_stmt_id_to_info.end(); ++it) { - PgSQL_STMT_Global_info *stmt_global_info = it->second; + for (auto it = map_stmt_hash_to_info.begin(); it != map_stmt_hash_to_info.end(); ++it) { + const PgSQL_STMT_Global_info* stmt_global_info = it->second.get(); auto pgs = std::make_unique(stmt_global_info->statement_id, stmt_global_info->dbname, stmt_global_info->username, stmt_global_info->hash, stmt_global_info->query, - stmt_global_info->ref_count_client, stmt_global_info->ref_count_server, stmt_global_info->parse_param_types.size()); - char **pta = pgs->get_row(); + stmt_global_info->ref_count_client.load(std::memory_order_relaxed), stmt_global_info->ref_count_server.load(std::memory_order_relaxed), + stmt_global_info->parse_param_types.size()); + char** pta = pgs->get_row(); result->add_row(pta); pgs->free_row(pta); } diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index 46992a941..695f03595 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -89,7 +89,7 @@ extern PgSQL_Authentication* GloPgAuth; extern MySQL_LDAP_Authentication* GloMyLdapAuth; extern ProxySQL_Admin* GloAdmin; extern PgSQL_Logger* GloPgSQL_Logger; -extern PgSQL_STMT_Manager_v14* GloPgStmt; +extern PgSQL_STMT_Manager* GloPgStmt; extern SQLite3_Server* GloSQLite3Server; @@ -506,7 +506,10 @@ void PgSQL_Session::generate_proxysql_internal_session_json(json& j) { } //j["conn"]["no_backslash_escapes"] = client_myds->myconn->options.no_backslash_escapes; //j["conn"]["status"]["compression"] = client_myds->myconn->get_status(STATUS_PGSQL_CONNECTION_COMPRESSION); - j["conn"]["ps"]["stmt_name_to_global_ids"] = client_myds->myconn->local_stmts->stmt_name_to_global_ids; + json& stmt_name_to_global_ids = j["client"]["ps"]["stmt_name_to_global_ids"]; + for (const auto& [stmt_name, global_stmt_info] : client_myds->myconn->local_stmts->stmt_name_to_global_info) { + stmt_name_to_global_ids[stmt_name.c_str()] = global_stmt_info->statement_id; + } //j["conn"]["ps"]["global_id_to_stmt_names"] = client_myds->myconn->local_stmts->global_id_to_stmt_names; const PgSQL_Conn_Param& conn_params = client_myds->myconn->conn_params; @@ -550,11 +553,8 @@ void PgSQL_Session::generate_proxysql_internal_session_json(json& j) { j["backends"][i]["conn"]["questions"] = _myconn->statuses.questions; j["backends"][i]["conn"]["pgconnpoll_get"] = _myconn->statuses.pgconnpoll_get; j["backends"][i]["conn"]["pgconnpoll_put"] = _myconn->statuses.pgconnpoll_put; - //j["backend"][i]["conn"]["charset"] = _myds->myconn->options.charset; // not used for backend - //j["backends"][i]["conn"]["session_track_gtids"] = (_myconn->options.session_track_gtids ? _myconn->options.session_track_gtids : ""); j["backends"][i]["conn"]["init_connect"] = (_myconn->options.init_connect ? _myconn->options.init_connect : ""); j["backends"][i]["conn"]["init_connect_sent"] = _myds->myconn->options.init_connect_sent; - //j["backends"][i]["conn"]["standard_conforming_strings"] = _myconn->options.no_backslash_escapes; j["backends"][i]["conn"]["status"]["advisory_lock"] = _myconn->get_status(STATUS_PGSQL_CONNECTION_ADVISORY_LOCK); j["backends"][i]["conn"]["status"]["advisory_xact_lock"] = _myconn->get_status(STATUS_PGSQL_CONNECTION_ADVISORY_XACT_LOCK); j["backends"][i]["conn"]["status"]["lock_tables"] = _myconn->get_status(STATUS_PGSQL_CONNECTION_LOCK_TABLES); @@ -578,12 +578,13 @@ void PgSQL_Session::generate_proxysql_internal_session_json(json& j) { } j["backends"][i]["conn"]["MultiplexDisabled_ext"] = multiplex_disabled; } - j["backends"][i]["conn"]["ps"]["backend_stmt_to_global_ids"] = _myconn->local_stmts->backend_stmt_to_global_ids; + + json& backend_stmt_to_global_ids = j["backends"][i]["conn"]["ps"]["backend_stmt_to_global_ids"]; + for (const auto& [backend_stmt, global_stmt_info] : client_myds->myconn->local_stmts->backend_stmt_to_global_info) { + backend_stmt_to_global_ids[backend_stmt] = global_stmt_info->statement_id; + } + j["backends"][i]["conn"]["ps"]["global_stmt_to_backend_ids"] = _myconn->local_stmts->global_stmt_to_backend_ids; - //j["backends"][i]["conn"]["client_flag"]["value"] = _myconn->options.client_flag; - //j["backends"][i]["conn"]["client_flag"]["client_found_rows"] = (_myconn->options.client_flag & CLIENT_FOUND_ROWS ? 1 : 0); - //j["backends"][i]["conn"]["client_flag"]["client_multi_statements"] = (_myconn->options.client_flag & CLIENT_MULTI_STATEMENTS ? 1 : 0); - //j["backends"][i]["conn"]["client_flag"]["client_deprecate_eof"] = (_myconn->options.client_flag & CLIENT_DEPRECATE_EOF ? 1 : 0); if (_myconn->is_connected()) { sprintf(buff, "%p", _myconn->get_pg_connection()); j["backends"][i]["conn"]["pgsql"]["address"] = buff; @@ -5643,36 +5644,31 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg } } - // if the same statement name is used, we drop it - PgSQL_STMTs_local_v14* local_stmts = client_myds->myconn->local_stmts; - std::string stmt_name(extended_query_info.stmt_client_name); + // If a client provides a statement name that already exists in the local map, + // validate whether it can be reused. Only the *unnamed* statement ("") may be redefined. + PgSQL_STMT_Local* local_stmts = client_myds->myconn->local_stmts; + std::string client_stmt_name(extended_query_info.stmt_client_name); - if (auto it = local_stmts->stmt_name_to_global_ids.find(stmt_name); - it != local_stmts->stmt_name_to_global_ids.end()) { + // Try to find an existing statement entry for this name in Local map + auto local_stmt_info_itr = local_stmts->stmt_name_to_global_info.find(client_stmt_name); - if (!stmt_name.empty()) { - const std::string& errmsg = "prepared statement \"" + stmt_name + "\" already exist"; + // ---------------------------------------------------------------------- + // 1) Reject redefinition of a named prepared statement + // (only the empty statement name is allowed to be overwritten). + // ---------------------------------------------------------------------- + if (local_stmt_info_itr != local_stmts->stmt_name_to_global_info.end()) { + if (!client_stmt_name.empty()) { + const std::string& errmsg = "prepared statement \"" + client_stmt_name + "\" already exist"; handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_DUPLICATE_PSTATEMENT, errmsg.c_str(), false); l_free(parse_pkt.size, parse_pkt.ptr); return 2; } - - uint64_t global_id = it->second; - auto range = local_stmts->global_id_to_stmt_names.equal_range(global_id); - - for (auto iter = range.first; iter != range.second; ++iter) { - if (iter->second == stmt_name) { - local_stmts->global_id_to_stmt_names.erase(iter); - break; - } - } - - local_stmts->stmt_name_to_global_ids.erase(it); - local_stmts->client_close(stmt_name); } - // Hash the query + // ---------------------------------------------------------------------- + // 2) Compute hash for current query + // ---------------------------------------------------------------------- uint64_t hash = local_stmts->compute_hash( client_myds->myconn->userinfo->username, client_myds->myconn->userinfo->dbname, @@ -5681,24 +5677,63 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg CurrentQuery.extended_query_info.parse_param_types ); - // Check global statement cache - GloPgStmt->wrlock(); - PgSQL_STMT_Global_info* stmt_info = GloPgStmt->find_prepared_statement_by_hash(hash, false); + // ---------------------------------------------------------------------- + // 3) Local-cache fast path: + // If the client already prepared this same SQL under the same name, + // and the hash matches, reuse the existing global statement. + // ---------------------------------------------------------------------- + if (local_stmt_info_itr != local_stmts->stmt_name_to_global_info.end()) { + auto& local_stmt_info = local_stmt_info_itr->second; + + // Exact match found; treat as a parse success and continue normally. + if (local_stmt_info && local_stmt_info->hash == hash) { + extended_query_info.stmt_global_id = local_stmt_info->statement_id; + client_myds->setDSS_STATE_QUERY_SENT_NET(); + char txn_state = NumActiveTransactions() > 0 ? 'T' : 'I'; + bool send_ready_packet = is_extended_query_ready_for_query(); + client_myds->myprot.generate_parse_completion_packet(true, send_ready_packet, txn_state); + RequestEnd(NULL, false); + l_free(parse_pkt.size, parse_pkt.ptr); + return 0; + } + } + + // ---------------------------------------------------------------------- + // 4) Global-cache lookup: + // If another session already prepared an identical statement (same hash), + // link the local name to that shared global entry. + // ---------------------------------------------------------------------- + auto stmt_info = GloPgStmt->find_prepared_statement_by_hash(hash); if (stmt_info) { - local_stmts->client_insert(stmt_info->statement_id, stmt_name); + std::shared_ptr* local_stmt_info_ptr = nullptr; + if (local_stmt_info_itr != local_stmts->stmt_name_to_global_info.end()) { + local_stmt_info_ptr = &local_stmt_info_itr->second; // reference to shared_ptr inside map + } + local_stmts->client_insert(stmt_info, client_stmt_name, local_stmt_info_ptr); extended_query_info.stmt_global_id = stmt_info->statement_id; - GloPgStmt->unlock(); client_myds->setDSS_STATE_QUERY_SENT_NET(); char txn_state = NumActiveTransactions() > 0 ? 'T' : 'I'; bool send_ready_packet = is_extended_query_ready_for_query(); client_myds->myprot.generate_parse_completion_packet(true, send_ready_packet, txn_state); - //LogQuery(nullptr); - //CurrentQuery.end_time = thread->curtime; RequestEnd(NULL, false); l_free(parse_pkt.size, parse_pkt.ptr); return 0; } - GloPgStmt->unlock(); + + // ---------------------------------------------------------------------- + // 5) The local name is being reused but does not match the SQL/hash. + // Clean up the old entry before creating a new global statement later. + // ---------------------------------------------------------------------- + if (local_stmt_info_itr != local_stmts->stmt_name_to_global_info.end()) { + auto& local_stmt_info = local_stmt_info_itr->second; + + // Decrement global reference and remove stale local pointer + if (local_stmt_info) { + GloPgStmt->ref_count_client(local_stmt_info.get(), -1); + local_stmt_info.reset(); + } + local_stmts->stmt_name_to_global_info.erase(local_stmt_info_itr); + } if (extended_query_frame.empty() == true) { extended_query_info.flags |= PGSQL_EXTENDED_QUERY_FLAG_SYNC; @@ -5774,29 +5809,21 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des } assert(stmt_client_name); - uint64_t stmt_global_id = client_myds->myconn->local_stmts->find_global_id_from_stmt_name(stmt_client_name); - if (stmt_global_id == 0) { + // Look up an existing local statement info for client-provided statement name + const PgSQL_STMT_Global_info* stmt_info = client_myds->myconn->local_stmts->find_stmt_info_from_stmt_name(stmt_client_name); + if (!stmt_info) { const std::string& errmsg = stmt_client_name[0] != '\0' ? ("prepared statement \"" + std::string(stmt_client_name) + "\" does not exist") : "unnamed prepared statement does not exist"; handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, errmsg.c_str(), false); return 2; } - // now we get the statement information - PgSQL_STMT_Global_info* stmt_info = GloPgStmt->find_prepared_statement_by_stmt_id(stmt_global_id); - if (stmt_info == NULL) { - // we couldn't find it - const std::string& errmsg = stmt_client_name[0] != '\0' ? ("prepared statement \"" + std::string(stmt_client_name) + "\" does not exist") : - "unnamed prepared statement does not exist"; - handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, errmsg.c_str(), false); - return 2; - } // describe_msg memory will be freed in pgsql_real_query.end() // CurrentQuery.stmt_client_name may briefly become a dangling pointer until CurrentQuery.end() is invoked PgSQL_Extended_Query_Info& extended_query_info = CurrentQuery.extended_query_info; extended_query_info.stmt_client_name = stmt_client_name; extended_query_info.stmt_client_portal_name = portal_name; - extended_query_info.stmt_global_id = stmt_global_id; + extended_query_info.stmt_global_id = stmt_info->statement_id; extended_query_info.stmt_info = stmt_info; extended_query_info.stmt_type = stmt_type; CurrentQuery.start_time = thread->curtime; @@ -5921,18 +5948,9 @@ int PgSQL_Session::handle_post_sync_bind_message(PgSQL_Bind_Message* bind_msg) { return 2; } - uint64_t stmt_global_id = client_myds->myconn->local_stmts->find_global_id_from_stmt_name(stmt_client_name); - if (stmt_global_id == 0) { - const std::string& errmsg = stmt_client_name[0] != '\0' ? ("prepared statement \"" + std::string(stmt_client_name) + "\" does not exist") : - "unnamed prepared statement does not exist"; - handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, errmsg.c_str(), false); - return 2; - } - - // now we get the statement information - PgSQL_STMT_Global_info* stmt_info = GloPgStmt->find_prepared_statement_by_stmt_id(stmt_global_id); - if (stmt_info == NULL) { - // we couldn't find it + // Look up an existing local statement info for client-provided statement name + const PgSQL_STMT_Global_info* stmt_info = client_myds->myconn->local_stmts->find_stmt_info_from_stmt_name(stmt_client_name); + if (!stmt_info) { const std::string& errmsg = stmt_client_name[0] != '\0' ? ("prepared statement \"" + std::string(stmt_client_name) + "\" does not exist") : "unnamed prepared statement does not exist"; handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, errmsg.c_str(), false); @@ -5942,7 +5960,7 @@ int PgSQL_Session::handle_post_sync_bind_message(PgSQL_Bind_Message* bind_msg) { PgSQL_Extended_Query_Info& extended_query_info = CurrentQuery.extended_query_info; extended_query_info.stmt_client_name = stmt_client_name; extended_query_info.stmt_client_portal_name = portal_name; - extended_query_info.stmt_global_id = stmt_global_id; + extended_query_info.stmt_global_id = stmt_info->statement_id; extended_query_info.stmt_info = stmt_info; CurrentQuery.start_time = thread->curtime; @@ -6033,18 +6051,10 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu // bind_waiting_for_execute will be released on CurrentQuery.end() call or session destory const char* stmt_client_name = bind_waiting_for_execute->data().stmt_name; - uint64_t stmt_global_id = client_myds->myconn->local_stmts->find_global_id_from_stmt_name(stmt_client_name); - if (stmt_global_id == 0) { - const std::string& errmsg = stmt_client_name[0] != '\0' ? ("prepared statement \"" + std::string(stmt_client_name) + "\" does not exist") : - "unnamed prepared statement does not exist"; - handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, errmsg.c_str(), false); - return 2; - } - // now we get the statement information - PgSQL_STMT_Global_info* stmt_info = GloPgStmt->find_prepared_statement_by_stmt_id(stmt_global_id); - if (stmt_info == NULL) { - // we couldn't find it + // Look up an existing local statement info for client-provided statement name + const PgSQL_STMT_Global_info* stmt_info = client_myds->myconn->local_stmts->find_stmt_info_from_stmt_name(stmt_client_name); + if (!stmt_info) { const std::string& errmsg = stmt_client_name[0] != '\0' ? ("prepared statement \"" + std::string(stmt_client_name) + "\" does not exist") : "unnamed prepared statement does not exist"; handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, errmsg.c_str(), false); @@ -6054,7 +6064,7 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu PgSQL_Extended_Query_Info& extended_query_info = CurrentQuery.extended_query_info; extended_query_info.stmt_client_portal_name = portal_name; extended_query_info.stmt_client_name = stmt_client_name; - extended_query_info.stmt_global_id = stmt_global_id; + extended_query_info.stmt_global_id = stmt_info->statement_id; extended_query_info.stmt_info = stmt_info; extended_query_info.bind_msg = bind_waiting_for_execute.get(); extended_query_info.flags |= execute_msg->send_describe_portal_result ? @@ -6371,32 +6381,24 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_E bool PgSQL_Session::handler___rc0_PROCESSING_STMT_PREPARE(enum session_status& st, PgSQL_Data_Stream* myds) { thread->status_variables.stvar[st_var_backend_stmt_prepare]++; - uint64_t global_stmtid; - - PgSQL_STMT_Global_info* stmt_info = NULL; - GloPgStmt->wrlock(); - stmt_info = GloPgStmt->add_prepared_statement( - (char*)client_myds->myconn->userinfo->username, - (char*)client_myds->myconn->userinfo->dbname, - (char*)CurrentQuery.QueryPointer, + + auto stmt_info = GloPgStmt->add_prepared_statement( + client_myds->myconn->userinfo->username, + client_myds->myconn->userinfo->dbname, + (const char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength, - CurrentQuery.QueryParserArgs.first_comment, std::move(CurrentQuery.extended_query_info.parse_param_types), - false); + CurrentQuery.QueryParserArgs.first_comment, + CurrentQuery.QueryParserArgs.digest_text, + CurrentQuery.QueryParserArgs.digest, + CurrentQuery.PgQueryCmd + ); assert(stmt_info); // GloPgStmt->add_prepared_statement() should always return a valid pointer - if (CurrentQuery.QueryParserArgs.digest_text) { - if (stmt_info->digest_text == NULL) { - stmt_info->digest_text = strdup(CurrentQuery.QueryParserArgs.digest_text); - stmt_info->digest = CurrentQuery.QueryParserArgs.digest; // copy digest - stmt_info->PgQueryCmd = CurrentQuery.PgQueryCmd; // copy PgComQueryCmd - stmt_info->calculate_mem_usage(); - } - } + PgSQL_Extended_Query_Info& extended_query_info = CurrentQuery.extended_query_info; - extended_query_info.stmt_info = stmt_info; - global_stmtid = stmt_info->statement_id; - - myds->myconn->local_stmts->backend_insert(global_stmtid, extended_query_info.stmt_backend_id); + extended_query_info.stmt_info = stmt_info.get(); + + myds->myconn->local_stmts->backend_insert(stmt_info, extended_query_info.stmt_backend_id); st = status; if (previous_status.empty() == false) { @@ -6405,15 +6407,18 @@ bool PgSQL_Session::handler___rc0_PROCESSING_STMT_PREPARE(enum session_status& s myds->DSS = STATE_MARIADB_GENERIC; st = previous_status.top(); previous_status.pop(); - GloPgStmt->unlock(); + return true; } // We only perform the client_insert when there is no previous status, this // is, when 'PROCESSING_STMT_PREPARE' is reached directly without transitioning from a previous status // like 'PROCESSING_STMT_EXECUTE'. assert(extended_query_info.stmt_client_name); - client_myds->myconn->local_stmts->client_insert(global_stmtid, extended_query_info.stmt_client_name); - GloPgStmt->unlock(); +#ifdef DEBUG + auto* stmt_info_dbg = client_myds->myconn->local_stmts->find_stmt_info_from_stmt_name(extended_query_info.stmt_client_name); + assert(stmt_info_dbg == nullptr); +#endif + client_myds->myconn->local_stmts->client_insert(stmt_info, extended_query_info.stmt_client_name, nullptr); return false; } diff --git a/lib/ProxySQL_Admin_Stats.cpp b/lib/ProxySQL_Admin_Stats.cpp index 9c7c84a2e..1f8b500cd 100644 --- a/lib/ProxySQL_Admin_Stats.cpp +++ b/lib/ProxySQL_Admin_Stats.cpp @@ -40,7 +40,7 @@ extern ProxySQL_Admin *GloAdmin; extern MySQL_Threads_Handler *GloMTH; extern PgSQL_Threads_Handler* GloPTH; extern MySQL_STMT_Manager_v14 *GloMyStmt; -extern PgSQL_STMT_Manager_v14* GloPgStmt; +extern PgSQL_STMT_Manager* GloPgStmt; extern MySQL_Query_Processor* GloMyQPro; extern PgSQL_Query_Processor* GloPgQPro; extern ProxySQL_Cluster *GloProxyCluster; diff --git a/src/main.cpp b/src/main.cpp index 52ee9ef4a..aa78d0f79 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -479,7 +479,7 @@ MySQL_Threads_Handler *GloMTH = NULL; PgSQL_Threads_Handler* GloPTH = NULL; Web_Interface *GloWebInterface; MySQL_STMT_Manager_v14 *GloMyStmt; -PgSQL_STMT_Manager_v14 *GloPgStmt; +PgSQL_STMT_Manager *GloPgStmt; MySQL_Monitor *GloMyMon; PgSQL_Monitor *GloPgMon; @@ -923,7 +923,7 @@ void ProxySQL_Main_init_main_modules() { GloPgSQL_Logger = new PgSQL_Logger(); GloPgSQL_Logger->print_version(); GloMyStmt=new MySQL_STMT_Manager_v14(); - GloPgStmt=new PgSQL_STMT_Manager_v14(); + GloPgStmt=new PgSQL_STMT_Manager(); PgHGM = new PgSQL_HostGroups_Manager(); PgHGM->init(); PgSQL_Threads_Handler* _tmp_GloPTH = NULL;