diff --git a/include/PgSQL_Connection.h b/include/PgSQL_Connection.h index 5effde3d0..8f221f382 100644 --- a/include/PgSQL_Connection.h +++ b/include/PgSQL_Connection.h @@ -12,7 +12,7 @@ class PgSQL_SrvC; class PgSQL_Query_Result; -class PgSQL_STMTs_local_v14; +class PgSQL_STMT_Local; //class PgSQL_Describe_Prepared_Info; class PgSQL_Bind_Info; //#define STATUS_PGSQL_CONNECTION_SEQUENCE 0x00000001 @@ -639,7 +639,7 @@ public: bool exit_pipeline_mode; // true if it is safe to exit pipeline mode bool resync_failed; // true if the last resync attempt failed - PgSQL_STMTs_local_v14* local_stmts; + PgSQL_STMT_Local* local_stmts; PgSQL_SrvC *parent; PgSQL_Connection_userinfo* userinfo; PgSQL_Data_Stream* myds; diff --git a/include/PgSQL_PreparedStatement.h b/include/PgSQL_PreparedStatement.h index 37ec4cd7d..344373ecb 100644 --- a/include/PgSQL_PreparedStatement.h +++ b/include/PgSQL_PreparedStatement.h @@ -4,27 +4,30 @@ #include "proxysql.h" #include "cpp.h" +static constexpr uint16_t PGSQL_MAX_THREADS = 255; + // class PgSQL_STMT_Global_info represents information about a PgSQL Prepared Statement // it is an internal representation of prepared statement // it include all metadata associated with it class PgSQL_STMT_Global_info { public: - uint64_t digest; - PGSQL_QUERY_command PgQueryCmd; + char* query; char* digest_text; - uint64_t hash; - char *username; - char *dbname; - char *query; + char* first_comment; unsigned int query_length; - int ref_count_client; - int ref_count_server; uint64_t statement_id; - char* first_comment; - uint64_t total_mem_usage; + mutable std::atomic ref_count_client; + mutable std::atomic ref_count_server; Parse_Param_Types parse_param_types;// array of parameter types, used for prepared statements - - PgSQL_STMT_Global_info(uint64_t id, char* u, char* d, char* q, unsigned int ql, char* fc, Parse_Param_Types&& ppt, uint64_t _h); + char* username; + char* dbname; + uint64_t digest; + uint64_t hash; + uint64_t total_mem_usage; + PGSQL_QUERY_command PgQueryCmd; + + PgSQL_STMT_Global_info(uint64_t id, const char* _user, const char* _database, const char* _query, unsigned int _query_len, + Parse_Param_Types&& _ppt, const char* _first_comment, uint64_t _h); ~PgSQL_STMT_Global_info(); void calculate_mem_usage(); @@ -32,85 +35,284 @@ private: void compute_hash(); }; -class PgSQL_STMTs_local_v14 { +// class PgSQL_STMT_Local represents prepared statements local to a session/connection +class PgSQL_STMT_Local { public: - // this map associate client_stmt_id to global_stmt_id : this is used only for client connections - std::map stmt_name_to_global_ids; - // this multimap associate global_stmt_id to client_stmt_id : this is used only for client connections - std::multimap global_id_to_stmt_names; + explicit PgSQL_STMT_Local(bool _ic) : sess(nullptr), local_max_stmt_id(0), is_client_(_ic) { } + ~PgSQL_STMT_Local(); - // this map associate backend_stmt_id to global_stmt_id : this is used only for backend connections - std::map backend_stmt_to_global_ids; - // this map associate global_stmt_id to backend_stmt_id : this is used only for backend connections - std::map global_stmt_to_backend_ids; + inline void set_is_client(PgSQL_Session *_s) { sess=_s; is_client_ = true; } + inline bool is_client() const { return is_client_; } + inline unsigned int get_num_backend_stmts() const { return backend_stmt_to_global_info.size(); } + + /** + * Map client statement name to a global prepared statement. + * + * - If client statement (local_stmt_info_ptr) already references a different global stmt: decrement old refcount, + * increment new, and replace pointer. + * - If client statement (local_stmt_info_ptr) references the same stmt: no-op. + * - Otherwise: insert into stmt_name_to_global_info and increment client refcount. + * + * Parameters: + * stmt_info Global prepared statement (shared_ptr, must be valid). + * client_stmt_name Statement name from client scope. + * local_stmt_info_ptr Optional existing local pointer to update instead of map insert. + */ + void client_insert(std::shared_ptr& stmt_info, const std::string& client_stmt_name, + std::shared_ptr* local_stmt_info_ptr); - PgSQL_Session *sess; - PgSQL_STMTs_local_v14(bool _ic) : sess(NULL), is_client_(_ic) { } - ~PgSQL_STMTs_local_v14(); + /** + * @brief Find statement info by client statement name in stmt_name_to_global_info. + * + * Performs a map lookup using the provided client statement name and returns the + * associated PgSQL_STMT_Global_info pointer, or nullptr if not present. + */ + const PgSQL_STMT_Global_info* find_stmt_info_from_stmt_name(const std::string& client_stmt_name) const; - inline - void set_is_client(PgSQL_Session *_s) { - sess=_s; - is_client_ = true; - } - - inline - bool is_client() { return is_client_; } - inline - unsigned int get_num_backend_stmts() { return backend_stmt_to_global_ids.size(); } + /** + * Close a client-side prepared statement mapping by its name. + * + * - If the name exists: decrement the global statement's client refcount, + * remove the mapping, return true. + * - If not found: do nothing, return false. + * + * @param client_stmt_name Client statement identifier. + * @return true if a mapping was removed, false otherwise. + */ + bool client_close(const std::string& client_stmt_name); - void backend_insert(uint64_t global_stmt_id, uint32_t backend_stmt_id); - void client_insert(uint64_t global_stmt_id, const std::string& client_stmt_name); - uint64_t compute_hash(const char *user, const char *database, const char *query, unsigned int query_length, - const Parse_Param_Types& param_types); - uint32_t generate_new_backend_stmt_id(); - uint64_t find_global_id_from_stmt_name(const std::string& client_stmt_name); - uint32_t find_backend_stmt_id_from_global_id(uint64_t global_id); - bool client_close(const std::string& stmt_name); + /** + * Close all client-side prepared statement mappings. + * + * Decrements the client refcount for each associated global statement + * and clears the stmt_name_to_global_info map. + */ void client_close_all(); + /** + * Generate a new backend statement ID. + * + * @return A backend statement id. + */ + uint32_t generate_new_backend_stmt_id(); + + /** + * @brief Register a backend prepared statement mapping. + * + * Adds bidirectional associations: + * - backend_stmt_id -> global statement info + * - global statement_id -> backend_stmt_id + */ + void backend_insert(std::shared_ptr& stmt_info, uint32_t backend_stmt_id); + + /** + * @brief Find backend statement ID from global statement ID. + * + * Looks up the backend statement ID associated with the given global statement ID. + * + * @param global_id The global statement ID to look up. + * @return The associated backend statement ID, or 0 if not found. + */ + uint32_t find_backend_stmt_id_from_global_id(uint64_t global_id) const; + + /** + * @brief Compute a unique hash for a prepared statement. + * + * Combines user, database, query, and parameter types into a hash value. + * + * @param user The username. + * @param database The database name. + * @param query The SQL query string. + * @param query_length The length of the query string. + * @param param_types The parameter types for the prepared statement. + * @return A computed hash value representing the prepared statement. + */ + static uint64_t compute_hash(const char* user, const char* database, const char* query, + unsigned int query_length, const Parse_Param_Types& param_types); + private: - bool is_client_; + // this map associate client_stmt_id to global_stmt_info : this is used only for client connections + std::map> stmt_name_to_global_info; + + // this map associate backend_stmt_id to global_stmt_info : this is used only for backend connections + std::map> backend_stmt_to_global_info; + + // this map associate global_stmt_id to backend_stmt_id : this is used only for backend connections + std::map global_stmt_to_backend_ids; + + PgSQL_Session* sess = nullptr; + + // stack of free backend statement ids std::stack free_backend_ids; + + // maximum assigned statement id in this session uint32_t local_max_stmt_id = 0; -}; + // is client session ? + bool is_client_; + + friend class PgSQL_Session; +}; -class PgSQL_STMT_Manager_v14 { +// class PgSQL_STMT_Manager manages global prepared statements across all sessions +class PgSQL_STMT_Manager { public: - PgSQL_STMT_Manager_v14(); - ~PgSQL_STMT_Manager_v14(); - PgSQL_STMT_Global_info* find_prepared_statement_by_hash(uint64_t hash, bool lock=true); - PgSQL_STMT_Global_info* find_prepared_statement_by_stmt_id(uint64_t id, bool lock=true); - inline void rdlock() { pthread_rwlock_rdlock(&rwlock_); } - inline void wrlock() { pthread_rwlock_wrlock(&rwlock_); } - inline void unlock() { pthread_rwlock_unlock(&rwlock_); } - void ref_count_client(uint64_t _stmt, int _v, bool lock=true) noexcept; - void ref_count_server(uint64_t _stmt, int _v, bool lock=true) noexcept; - PgSQL_STMT_Global_info* add_prepared_statement(char *user, char *database, char *query, unsigned int query_len, - char *fc, Parse_Param_Types&& ppt, bool lock=true); + PgSQL_STMT_Manager(); + ~PgSQL_STMT_Manager(); + + /** + * @brief Lookup a prepared statement by its 64-bit hash. + * + * @param hash Unique hash computed from user, db, query, and parameter types. + * @param lock If true acquires/release internal read lock; set false only if caller already holds it. + * @return Shared pointer to global statement info or nullptr if not found. + */ + std::shared_ptr find_prepared_statement_by_hash(uint64_t hash, bool lock=true); + + /** + * @brief Retrieve existing or create a new global prepared statement. + * + * Computes a hash from user, database, query and parameter types. If an entry with that hash + * exists it is returned. Otherwise a new PgSQL_STMT_Global_info is allocated, assigned a + * statement_id (preferring reused IDs from free_stmt_ids), optional digest metadata copied, + * inserted into map_stmt_hash_to_info, and zero-refcount tracking counters updated. + * Always increments the server refcount for the returned statement. + * + * Concurrency: Acquires/release write lock when lock == true. + * + * @param user Client username (null-terminated). + * @param database Database/schema name. + * @param query SQL text. + * @param query_len Length of query. + * @param ppt Vector of parameter type OIDs (moved if new entry is created). + * @param first_comment First comment prefix (optional, may be null). + * @param digest_text Normalized digest text (optional, may be null). + * @param digest 64-bit digest value (used only if digest_text provided). + * @param PgQueryCmd Parsed command classification. + * @param lock If true, method manages locking internally. + * @return shared_ptr to the global prepared statement (never nullptr). + */ + std::shared_ptr add_prepared_statement(const char* user, const char* database, const char* query, + unsigned int query_len, Parse_Param_Types&& ppt, const char* first_comment, const char* digest_text, uint64_t digest, + PGSQL_QUERY_command PgQueryCmd, bool lock = true); + + /** + * @brief Adjust client refcount for a prepared statement and update per-thread stats. + * + * - Applies delta _v (positive or negative) atomically. + * - Maintains zero-refcount tracking counters on transitions to/from zero. + * - Triggers opportunistic purge heuristic after modification. + * + * Thread-safety: uses atomic operations; no external lock required. + * + * @param stmt_info Non-null pointer to global PS metadata. + * @param _v Delta to apply (typically +1 or -1). + */ + void ref_count_client(const PgSQL_STMT_Global_info* stmt_info, int _v) noexcept; + + /** + * @brief Adjust server refcount for a prepared statement and update per-thread stats. + * + * - Applies delta _v (positive or negative) atomically. + * - Maintains zero-refcount tracking counters on transitions to/from zero. + * + * Thread-safety: uses atomic operations; no external lock required. + * + * @param stmt_info Non-null pointer to global PS metadata. + * @param _v Delta to apply (typically +1 or -1). + */ + void ref_count_server(const PgSQL_STMT_Global_info* stmt_info, int _v) noexcept; + + /** + * @brief Retrieve global prepared statement metrics. + * + * Outputs: + * - c_unique: Number of unique prepared statements with client refcount > 0. + * - c_total: Total number of client references across all prepared statements. + * - stmt_max_stmt_id: Maximum assigned statement ID. + * - cached: Total number of unique prepared statements in the global cache. + * - s_unique: Number of unique prepared statements with server refcount > 0. + * - s_total: Total number of server references across all prepared statements. + */ void get_metrics(uint64_t *c_unique, uint64_t *c_total, uint64_t *stmt_max_stmt_id, uint64_t *cached, uint64_t *s_unique, uint64_t *s_total); - SQLite3_result* get_prepared_statements_global_infos(); + + /** + * @brief Retrieve memory usage statistics for prepared statements. + * + * Outputs: + * - prep_stmt_metadata_mem_usage: Total memory used for prepared statement metadata. + * - prep_stmt_backend_mem_usage: Total memory used for backend prepared statement allocations. + */ void get_memory_usage(uint64_t& prep_stmt_metadata_mem_usage, uint64_t& prep_stmt_backend_mem_usage); + /** + * @brief Build and return a snapshot of all global prepared statements. + * + * @return SQLite3_result* containing one row per prepared statement (must be freed by caller). + */ + SQLite3_result* get_prepared_statements_global_infos(); + private: + struct Statistics { + struct Totals { + uint64_t c_total; + uint64_t s_total; + } total; + uint64_t c_unique; + uint64_t s_unique; + uint64_t cached; + }; + + // map from statement hash to global prepared statement info + std::map> map_stmt_hash_to_info; // map using hashes + + // next statement ID to assign if no free IDs are available uint64_t next_statement_id; - uint64_t num_stmt_with_ref_client_count_zero; - uint64_t num_stmt_with_ref_server_count_zero; + + // counters to track number of statements with zero refcounts + std::atomic num_stmt_with_ref_client_count_zero; + + // counters to track number of statements with zero refcounts + std::atomic num_stmt_with_ref_server_count_zero; + + // read-write lock to protect map_stmt_hash_to_info and free_stmt_ids pthread_rwlock_t rwlock_; - std::map map_stmt_id_to_info; // map using statement id - std::map map_stmt_hash_to_info; // map using hashes + + // stack of freed statement IDs for reuses std::stack free_stmt_ids; - struct { - uint64_t c_unique; - uint64_t c_total; - uint64_t stmt_max_stmt_id; - uint64_t cached; - uint64_t s_unique; - uint64_t s_total; - } statuses; + + // last time we purged unused statements time_t last_purge_time; + + // to make lock free status counting per thread, we use thread local storage + inline static thread_local int thd_idx = -1; + std::atomic next_status_idx{}; + + // FIXME: should be equal to number of worker threads configured in ProxySQL + std::array stats_total{}; + + // get per thread statistics totals + inline + Statistics::Totals& get_current_thread_statistics_totals() noexcept { + if (thd_idx == -1) { + thd_idx = next_status_idx.fetch_add(1, std::memory_order_relaxed); + } + return stats_total[thd_idx]; + } + + // read-write lock wrappers + inline void rdlock() noexcept { pthread_rwlock_rdlock(&rwlock_); } + inline void wrlock() noexcept { pthread_rwlock_wrlock(&rwlock_); } + inline void unlock() noexcept { pthread_rwlock_unlock(&rwlock_); } + + /** + * @brief Opportunistically purge unused prepared statements from the global cache. + * + * Concurrency: + * - Fast lock-free pre-check avoids unnecessary write lock. + */ + void ref_count_client___purge_stmts_if_needed() noexcept; }; #endif /* CLASS_PGSQL_PREPARED_STATEMENT_H */ diff --git a/include/PgSQL_Session.h b/include/PgSQL_Session.h index 8c391455a..55b156429 100644 --- a/include/PgSQL_Session.h +++ b/include/PgSQL_Session.h @@ -149,7 +149,7 @@ struct PgSQL_Extended_Query_Info { const char* stmt_client_name; const char* stmt_client_portal_name; const PgSQL_Bind_Message* bind_msg; - PgSQL_STMT_Global_info* stmt_info; + const PgSQL_STMT_Global_info* stmt_info; uint64_t stmt_global_id; uint32_t stmt_backend_id; uint8_t stmt_type; diff --git a/lib/PgSQL_Connection.cpp b/lib/PgSQL_Connection.cpp index a77da0f74..0a6d0e50d 100644 --- a/lib/PgSQL_Connection.cpp +++ b/lib/PgSQL_Connection.cpp @@ -179,7 +179,7 @@ PgSQL_Connection::PgSQL_Connection(bool is_client_conn) { options.init_connect = NULL; options.init_connect_sent = false; userinfo = new PgSQL_Connection_userinfo(); - local_stmts = new PgSQL_STMTs_local_v14(false); // false by default, it is a backend + local_stmts = new PgSQL_STMT_Local(false); // false by default, it is a backend //for (int i = 0; i < PGSQL_NAME_LAST_HIGH_WM; i++) { // variables[i].value = NULL; @@ -2499,7 +2499,7 @@ void PgSQL_Connection::reset() { reusable = true; creation_time = monotonic_time(); delete local_stmts; - local_stmts = new PgSQL_STMTs_local_v14(false); + local_stmts = new PgSQL_STMT_Local(false); // reset all variables for (int i = 0; i < PGSQL_NAME_LAST_HIGH_WM; i++) { diff --git a/lib/PgSQL_PreparedStatement.cpp b/lib/PgSQL_PreparedStatement.cpp index 2d6d7118b..e0621b1ae 100644 --- a/lib/PgSQL_PreparedStatement.cpp +++ b/lib/PgSQL_PreparedStatement.cpp @@ -9,7 +9,7 @@ #include "PgSQL_PreparedStatement.h" #include "PgSQL_Protocol.h" -extern PgSQL_STMT_Manager_v14 *GloPgStmt; +extern PgSQL_STMT_Manager *GloPgStmt; const int PS_GLOBAL_STATUS_FIELD_NUM = 8; @@ -63,25 +63,22 @@ void PgSQL_STMT_Global_info::compute_hash() { query_length, parse_param_types); } -PgSQL_STMT_Global_info::PgSQL_STMT_Global_info(uint64_t id, - char *u, char *d, char *q, - unsigned int ql, - char *fc, - Parse_Param_Types&& ppt, - uint64_t _h) { +PgSQL_STMT_Global_info::PgSQL_STMT_Global_info(uint64_t id, const char *_user, const char *_database, const char *_query, + unsigned int _query_len, Parse_Param_Types&& _ppt, + const char *_first_comment, uint64_t _h) { total_mem_usage = 0; statement_id = id; ref_count_client = 0; ref_count_server = 0; digest_text = nullptr; - username = strdup(u); - dbname = strdup(d); - query = (char *)malloc(ql + 1); - memcpy(query, q, ql); - query[ql] = '\0'; // add NULL byte - query_length = ql; - first_comment = fc ? strdup(fc) : nullptr; - parse_param_types = std::move(ppt); + username = strdup(_user); + dbname = strdup(_database); + query_length = _query_len; + query = (char *)malloc(query_length + 1); + memcpy(query, _query, query_length); + query[query_length] = '\0'; // add NULL byte + first_comment = _first_comment ? strdup(_first_comment) : nullptr; + parse_param_types = std::move(_ppt); PgQueryCmd = PGSQL_QUERY__UNINITIALIZED; if (_h) { @@ -104,8 +101,7 @@ PgSQL_STMT_Global_info::~PgSQL_STMT_Global_info() { } void PgSQL_STMT_Global_info::calculate_mem_usage() { - total_mem_usage = sizeof(PgSQL_STMT_Global_info) + - query_length + 1; + total_mem_usage = sizeof(PgSQL_STMT_Global_info) + query_length + 1; // NOSONAR: strlen is safe here if (username) total_mem_usage += strlen(username) + 1; // NOSONAR @@ -114,182 +110,221 @@ void PgSQL_STMT_Global_info::calculate_mem_usage() { if (digest_text) total_mem_usage += strlen(digest_text) + 1; // NOSONAR } -void PgSQL_STMTs_local_v14::backend_insert(uint64_t global_stmt_id, uint32_t backend_stmt_id) { - global_stmt_to_backend_ids.insert(std::make_pair(global_stmt_id, backend_stmt_id)); - backend_stmt_to_global_ids.insert(std::make_pair(backend_stmt_id,global_stmt_id)); +PgSQL_STMT_Local::~PgSQL_STMT_Local() { + // Note: we do not free the prepared statements because we assume that + // if we call this destructor the connection is being destroyed anyway + if (is_client_) { + for (auto it = stmt_name_to_global_info.begin(); it != stmt_name_to_global_info.end(); ++it) { + auto* stmt_info = it->second.get(); + GloPgStmt->ref_count_client(stmt_info, -1); + } + stmt_name_to_global_info.clear(); + } + else { + for (auto it = backend_stmt_to_global_info.begin(); it != backend_stmt_to_global_info.end(); ++it) { + auto* stmt_info = it->second.get(); + GloPgStmt->ref_count_server(stmt_info, -1); + } + backend_stmt_to_global_info.clear(); + } } -void PgSQL_STMTs_local_v14::client_insert(uint64_t global_stmt_id, const std::string& client_stmt_name) { - // validate that client_stmt_name is not empty and global_stmt_id is a valid id - [[maybe_unused]] auto [it, inserted] = stmt_name_to_global_ids.try_emplace(client_stmt_name, global_stmt_id); - assert(inserted && "client_stmt_name already exists in stmt_name_to_global_ids"); // Should not happen, as we expect unique client_stmt_name -#ifdef DEBUG - auto range = global_id_to_stmt_names.equal_range(global_stmt_id); - for (auto it = range.first; it != range.second; ++it) { - assert(it->second != client_stmt_name && "client_stmt_name is already mapped to global_stmt_id in global_id_to_stmt_names"); // Should not happen, as we expect unique client_stmt_name per global_stmt_id +void PgSQL_STMT_Local::client_insert(std::shared_ptr& stmt_info, + const std::string& client_stmt_name, std::shared_ptr* local_stmt_info_ptr) { + assert(stmt_info); + + if (local_stmt_info_ptr && (*local_stmt_info_ptr)) { + auto& local_stmt_info = *local_stmt_info_ptr; + if (local_stmt_info->statement_id == stmt_info->statement_id) + return; // no change + + // Adjust refcounts: decrement old, increment new + GloPgStmt->ref_count_client(local_stmt_info.get(), -1); + local_stmt_info.reset(); + + GloPgStmt->ref_count_client(stmt_info.get(), 1); + // Update existing entry to new global stmt info one + local_stmt_info = stmt_info; + return; } -#endif - global_id_to_stmt_names.emplace(global_stmt_id, client_stmt_name); - GloPgStmt->ref_count_client(global_stmt_id, 1, false); // do not lock! + + // New statement name — just insert + stmt_name_to_global_info.emplace(client_stmt_name, stmt_info); + GloPgStmt->ref_count_client(stmt_info.get(), 1); +} + +const PgSQL_STMT_Global_info* PgSQL_STMT_Local::find_stmt_info_from_stmt_name(const std::string& client_stmt_name) const { + const PgSQL_STMT_Global_info* ret = nullptr; + if (auto s = stmt_name_to_global_info.find(client_stmt_name); s != stmt_name_to_global_info.end()) { + ret = s->second.get(); + } + return ret; +} + +bool PgSQL_STMT_Local::client_close(const std::string& client_stmt_name) { + if (auto s = stmt_name_to_global_info.find(client_stmt_name); s != stmt_name_to_global_info.end()) { // found + const PgSQL_STMT_Global_info* stmt_info = s->second.get(); + GloPgStmt->ref_count_client(stmt_info, -1); + stmt_name_to_global_info.erase(s); + return true; + } + return false; // we don't really remove the prepared statement +} + +void PgSQL_STMT_Local::client_close_all() { + for (auto& [_, global_stmt_info] : stmt_name_to_global_info) { + GloPgStmt->ref_count_client(global_stmt_info.get(), -1); + } + stmt_name_to_global_info.clear(); } -uint64_t PgSQL_STMTs_local_v14::compute_hash(const char *user, +uint32_t PgSQL_STMT_Local::generate_new_backend_stmt_id() { + assert(is_client_ == false); + if (free_backend_ids.empty() == false) { + uint32_t backend_stmt_id = free_backend_ids.top(); + free_backend_ids.pop(); + return backend_stmt_id; + } + local_max_stmt_id++; + return local_max_stmt_id; +} + +void PgSQL_STMT_Local::backend_insert(std::shared_ptr& stmt_info, uint32_t backend_stmt_id) { + backend_stmt_to_global_info.insert(std::make_pair(backend_stmt_id, stmt_info)); + global_stmt_to_backend_ids.insert(std::make_pair(stmt_info->statement_id, backend_stmt_id)); +} + +uint32_t PgSQL_STMT_Local::find_backend_stmt_id_from_global_id(uint64_t global_id) const { + if (auto s = global_stmt_to_backend_ids.find(global_id); s != global_stmt_to_backend_ids.end()) { + return s->second; + } + return 0; // not found +} + +uint64_t PgSQL_STMT_Local::compute_hash(const char *user, const char *database, const char *query, unsigned int query_length, const Parse_Param_Types& param_types) { uint64_t hash = stmt_compute_hash(user, database, query, query_length, param_types); return hash; } -PgSQL_STMT_Manager_v14::PgSQL_STMT_Manager_v14() { +PgSQL_STMT_Manager::PgSQL_STMT_Manager() { last_purge_time = time(NULL); pthread_rwlock_init(&rwlock_, NULL); next_statement_id = 1; // we initialize this as 1 because we 0 is not allowed num_stmt_with_ref_client_count_zero = 0; num_stmt_with_ref_server_count_zero = 0; - statuses.c_unique = 0; - statuses.c_total = 0; - statuses.stmt_max_stmt_id = 0; - statuses.cached = 0; - statuses.s_unique = 0; - statuses.s_total = 0; + next_status_idx = 0; } -PgSQL_STMT_Manager_v14::~PgSQL_STMT_Manager_v14() { - for (auto it = map_stmt_id_to_info.begin(); it != map_stmt_id_to_info.end(); ++it) { - PgSQL_STMT_Global_info * a = it->second; - delete a; - } +PgSQL_STMT_Manager::~PgSQL_STMT_Manager() { + wrlock(); + map_stmt_hash_to_info.clear(); + unlock(); } -void PgSQL_STMT_Manager_v14::ref_count_client(uint64_t _stmt_id ,int _v, bool lock) noexcept { - if (lock) - wrlock(); - - if (auto s = map_stmt_id_to_info.find(_stmt_id); s != map_stmt_id_to_info.end()) { - statuses.c_total += _v; - PgSQL_STMT_Global_info *stmt_info = s->second; - if (stmt_info->ref_count_client == 0 && _v == 1) { - __sync_sub_and_fetch(&num_stmt_with_ref_client_count_zero,1); - } else { - if (stmt_info->ref_count_client == 1 && _v == -1) { - __sync_add_and_fetch(&num_stmt_with_ref_client_count_zero,1); - } - } - stmt_info->ref_count_client += _v; - time_t ct = time(NULL); - uint64_t num_client_count_zero = __sync_add_and_fetch(&num_stmt_with_ref_client_count_zero, 0); - uint64_t num_server_count_zero = __sync_add_and_fetch(&num_stmt_with_ref_server_count_zero, 0); - - size_t map_size = map_stmt_id_to_info.size(); - if ( - (ct > last_purge_time+1) && - (map_size > (unsigned)pgsql_thread___max_stmts_cache) && - (num_client_count_zero > map_size/10) && - (num_server_count_zero > map_size/10) - ) { // purge only if there is at least 10% gain - last_purge_time = ct; - int max_purge = map_size ; - std::vector torem; - torem.reserve(max_purge); - - for (auto it = map_stmt_id_to_info.begin(); it != map_stmt_id_to_info.end(); ++it) { - if (torem.size() >= std::min(static_cast(max_purge), - static_cast(num_client_count_zero))) { - break; - } - PgSQL_STMT_Global_info *a = it->second; - if ((__sync_add_and_fetch(&a->ref_count_client, 0) == 0) && - (a->ref_count_server == 0) ) // this to avoid that IDs are incorrectly reused - { - uint64_t hash = a->hash; - map_stmt_hash_to_info.erase(hash); - __sync_sub_and_fetch(&num_stmt_with_ref_client_count_zero,1); - torem.emplace_back(it->first); - } - } - while (!torem.empty()) { - uint64_t id = torem.back(); - torem.pop_back(); - auto s3 = map_stmt_id_to_info.find(id); - PgSQL_STMT_Global_info *a = s3->second; - if (a->ref_count_server == 0) { - __sync_sub_and_fetch(&num_stmt_with_ref_server_count_zero,1); - free_stmt_ids.push(id); - } - map_stmt_id_to_info.erase(s3); - statuses.s_total -= a->ref_count_server; - delete a; - } - } +void PgSQL_STMT_Manager::ref_count_client___purge_stmts_if_needed() noexcept { + + /* Heuristic trigger(checked twice : before and after acquiring write lock) : + * 1. At least 1 second since last_purge_time. + * 2. map_stmt_hash_to_info.size() > pgsql_thread___max_stmts_cache. + * 3. >= 10% of entries have client refcount == 0. + * 4. >= 10% of entries have server refcount == 0. + */ + + time_t ct = time(NULL); + if (ct <= last_purge_time + 1) + return; // too soon, skip + + // --- Light pre-check without lock --- + size_t map_size = map_stmt_hash_to_info.size(); + uint64_t num_client_zero = num_stmt_with_ref_client_count_zero.load(std::memory_order_relaxed); + uint64_t num_server_zero = num_stmt_with_ref_server_count_zero.load(std::memory_order_relaxed); + + if (map_size <= (unsigned)pgsql_thread___max_stmts_cache || + num_client_zero <= map_size / 10 || + num_server_zero <= map_size / 10) { + // Heuristic says no purge needed + return; } - if (lock) - unlock(); -} -void PgSQL_STMT_Manager_v14::ref_count_server(uint64_t _stmt_id ,int _v, bool lock) noexcept { - if (lock) - wrlock(); - std::map::iterator s; - s = map_stmt_id_to_info.find(_stmt_id); - if (s != map_stmt_id_to_info.end()) { - statuses.s_total += _v; - PgSQL_STMT_Global_info *stmt_info = s->second; - if (stmt_info->ref_count_server == 0 && _v == 1) { - __sync_sub_and_fetch(&num_stmt_with_ref_server_count_zero,1); - } else { - if (stmt_info->ref_count_server == 1 && _v == -1) { - __sync_add_and_fetch(&num_stmt_with_ref_server_count_zero,1); - } - } - stmt_info->ref_count_server += _v; - } - if (lock) + // --- Now we know we might purge, take write lock --- + wrlock(); + + // Double-check under exclusive lock (authoritative) + ct = time(NULL); + if (ct <= last_purge_time + 1) { unlock(); -} + return; + } -PgSQL_STMTs_local_v14::~PgSQL_STMTs_local_v14() { - // Note: we do not free the prepared statements because we assume that - // if we call this destructor the connection is being destroyed anyway + map_size = map_stmt_hash_to_info.size(); + num_client_zero = num_stmt_with_ref_client_count_zero.load(std::memory_order_relaxed); + num_server_zero = num_stmt_with_ref_server_count_zero.load(std::memory_order_relaxed); - if (is_client_) { - for (auto it = stmt_name_to_global_ids.begin(); - it != stmt_name_to_global_ids.end(); ++it) { - uint64_t global_stmt_id = it->second; - GloPgStmt->ref_count_client(global_stmt_id, -1); - } - } else { - for (auto it = backend_stmt_to_global_ids.begin(); - it != backend_stmt_to_global_ids.end(); ++it) { - uint64_t global_stmt_id = it->second; - GloPgStmt->ref_count_server(global_stmt_id, -1); - } + if (map_size <= (unsigned)pgsql_thread___max_stmts_cache || + num_client_zero <= map_size / 10 || + num_server_zero <= map_size / 10) { + last_purge_time = ct; + unlock(); + return; } -} + // --- Actual purge happens here under wrlock() --- + last_purge_time = ct; -PgSQL_STMT_Global_info *PgSQL_STMT_Manager_v14::find_prepared_statement_by_hash(uint64_t hash, bool lock) { - PgSQL_STMT_Global_info *ret = nullptr; // assume we do not find it - if (lock) { - rdlock(); - } - - if (auto s = map_stmt_hash_to_info.find(hash); s != map_stmt_hash_to_info.end()) { - ret = s->second; - } + //auto& stat_totals = get_current_thread_statistics_totals(); - if (lock) { - unlock(); + // Determine how many entries we are allowed to remove + size_t remaining_removals = std::min(map_size, static_cast(num_client_zero)); + + /* Purge logic : + * - Iterate statements while remaining_removals > 0. + * - Candidate removal only when shared_ptr use_count() == 1 (only the map holds it). + * - Return its statement_id to free_stmt_ids stack. + * - Decrement zero-refcount counters. + */ + for (auto it = map_stmt_hash_to_info.begin(); it != map_stmt_hash_to_info.end() && remaining_removals > 0; ) { + + auto& global_stmt_info = it->second; + + // use_count() == 1 indicates that only map_stmt_hash_to_info holds a reference, + // meaning there are no other references (from client or server) to this prepared statement. + // So we can safely remove this entry. + if (global_stmt_info.use_count() == 1) { + + // ref_count_client and ref_count_server should both be 0 in this case + assert(global_stmt_info->ref_count_client.load(std::memory_order_relaxed) == 0); + assert(global_stmt_info->ref_count_server.load(std::memory_order_relaxed) == 0); + + // Atomic counters + num_stmt_with_ref_client_count_zero.fetch_sub(1, std::memory_order_relaxed); + num_stmt_with_ref_server_count_zero.fetch_sub(1, std::memory_order_relaxed); + + // Free ID + free_stmt_ids.push(global_stmt_info->statement_id); + + // Update totals + //stat_totals.s_total -= global_stmt_info->ref_count_server.load(std::memory_order_relaxed); + + // Safe erase from map while iterating + it = map_stmt_hash_to_info.erase(it); + remaining_removals--; + } else { + ++it; + } } - return ret; + + unlock(); } -PgSQL_STMT_Global_info* PgSQL_STMT_Manager_v14::find_prepared_statement_by_stmt_id( - uint64_t id, bool lock) { - PgSQL_STMT_Global_info*ret = nullptr; // assume we do not find it +std::shared_ptr PgSQL_STMT_Manager::find_prepared_statement_by_hash(uint64_t hash, bool lock) { + std::shared_ptr ret = nullptr; // assume we do not find it + if (lock) { rdlock(); } - if (auto s = map_stmt_id_to_info.find(id); s != map_stmt_id_to_info.end()) { + if (auto s = map_stmt_hash_to_info.find(hash); s != map_stmt_hash_to_info.end()) { ret = s->second; } @@ -299,64 +334,13 @@ PgSQL_STMT_Global_info* PgSQL_STMT_Manager_v14::find_prepared_statement_by_stmt_ return ret; } -uint32_t PgSQL_STMTs_local_v14::generate_new_backend_stmt_id() { - assert(is_client_ == false); - if (free_backend_ids.empty() == false) { - uint32_t backend_stmt_id = free_backend_ids.top(); - free_backend_ids.pop(); - return backend_stmt_id; - } - local_max_stmt_id++; - return local_max_stmt_id; -} - -uint64_t PgSQL_STMTs_local_v14::find_global_id_from_stmt_name(const std::string& client_stmt_name) { - uint64_t ret=0; - if (auto s = stmt_name_to_global_ids.find(client_stmt_name); s != stmt_name_to_global_ids.end()) { - ret = s->second; - } - return ret; -} - -uint32_t PgSQL_STMTs_local_v14::find_backend_stmt_id_from_global_id(uint64_t global_id) { - if (auto s = global_stmt_to_backend_ids.find(global_id); s != global_stmt_to_backend_ids.end()) { - return s->second; - } - return 0; // not found -} - -bool PgSQL_STMTs_local_v14::client_close(const std::string& stmt_name) { - if (auto s = stmt_name_to_global_ids.find(stmt_name); s != stmt_name_to_global_ids.end()) { // found - uint64_t global_stmt_id = s->second; - stmt_name_to_global_ids.erase(s); - GloPgStmt->ref_count_client(global_stmt_id, -1); - std::pair::iterator, std::multimap::iterator> ret; - ret = global_id_to_stmt_names.equal_range(global_stmt_id); - for (std::multimap::iterator it=ret.first; it!=ret.second; ++it) { - if (it->second == stmt_name) { - global_id_to_stmt_names.erase(it); - break; - } - } - return true; - } - return false; // we don't really remove the prepared statement -} +std::shared_ptr PgSQL_STMT_Manager::add_prepared_statement(const char* user, const char* database, const char* query, + unsigned int query_len, Parse_Param_Types&& ppt, const char* first_comment, const char* digest_text, uint64_t digest, + PGSQL_QUERY_command PgQueryCmd, bool lock) { + std::shared_ptr ret = nullptr; -void PgSQL_STMTs_local_v14::client_close_all() { - for (auto [_, global_stmt_id] : stmt_name_to_global_ids) { - GloPgStmt->ref_count_client(global_stmt_id, -1); - } - stmt_name_to_global_ids.clear(); - global_id_to_stmt_names.clear(); -} + uint64_t hash = stmt_compute_hash(user, database, query, query_len, ppt); // this identifies the prepared statement -PgSQL_STMT_Global_info* PgSQL_STMT_Manager_v14::add_prepared_statement( - char *u, char *d, char *q, unsigned int ql, - char *fc, Parse_Param_Types&& ppt, bool lock) { - PgSQL_STMT_Global_info *ret = nullptr; - uint64_t hash = stmt_compute_hash( - u, d, q, ql, ppt); // this identifies the prepared statement if (lock) { wrlock(); } @@ -374,105 +358,178 @@ PgSQL_STMT_Global_info* PgSQL_STMT_Manager_v14::add_prepared_statement( next_statement_id++; } - auto stmt_info = std::make_unique(next_id, u, d, q, ql, fc, std::move(ppt), hash); - // insert it in both maps - map_stmt_id_to_info.insert(std::make_pair(stmt_info->statement_id, stmt_info.get())); - map_stmt_hash_to_info.insert(std::make_pair(stmt_info->hash, stmt_info.get())); - ret = stmt_info.release(); - __sync_add_and_fetch(&num_stmt_with_ref_client_count_zero,1); - __sync_add_and_fetch(&num_stmt_with_ref_server_count_zero,1); - } - if (ret->ref_count_server == 0) { - __sync_sub_and_fetch(&num_stmt_with_ref_server_count_zero,1); + auto stmt_info = std::make_shared(next_id, user, database, query, query_len, std::move(ppt), first_comment, hash); + + if (digest_text) { + stmt_info->digest_text = strdup(digest_text); + stmt_info->digest = digest; // copy digest + stmt_info->PgQueryCmd = PgQueryCmd; // copy PgComQueryCmd + stmt_info->calculate_mem_usage(); + } + + ret = std::move(stmt_info); + + //map_stmt_hash_to_info[ret->hash] = ret; + map_stmt_hash_to_info.emplace(ret->hash, ret); + + num_stmt_with_ref_client_count_zero.fetch_add(1, std::memory_order_relaxed); + num_stmt_with_ref_server_count_zero.fetch_add(1, std::memory_order_relaxed); } - ret->ref_count_server++; - statuses.s_total++; + + // Server refcount increment logic + ref_count_server(ret.get(), 1); + if (lock) { unlock(); } return ret; } +void PgSQL_STMT_Manager::ref_count_client(const PgSQL_STMT_Global_info* stmt_info, int _v) noexcept { + assert(stmt_info); -void PgSQL_STMT_Manager_v14::get_memory_usage(uint64_t& prep_stmt_metadata_mem_usage, uint64_t& prep_stmt_backend_mem_usage) { - prep_stmt_backend_mem_usage = 0; - prep_stmt_metadata_mem_usage = sizeof(PgSQL_STMT_Manager_v14); - rdlock(); - prep_stmt_metadata_mem_usage += map_stmt_id_to_info.size() * (sizeof(uint64_t) + sizeof(PgSQL_STMT_Global_info*)); - prep_stmt_metadata_mem_usage += map_stmt_hash_to_info.size() * (sizeof(uint64_t) + sizeof(PgSQL_STMT_Global_info*)); - prep_stmt_metadata_mem_usage += free_stmt_ids.size() * (sizeof(uint64_t)); - for (const auto&[key, value] : map_stmt_id_to_info) { - const PgSQL_STMT_Global_info* stmt_global_info = value; - prep_stmt_metadata_mem_usage += stmt_global_info->total_mem_usage; - prep_stmt_metadata_mem_usage += stmt_global_info->ref_count_server * 16; // ~16 bytes of memory utilized by global_stmt_id and stmt_id mappings - prep_stmt_metadata_mem_usage += stmt_global_info->ref_count_client * 40; // ~40 bytes of memory utilized by client_stmt_name and global_stmt_id mappings; + auto& stat_totals = get_current_thread_statistics_totals(); - // backend - prep_stmt_backend_mem_usage += stmt_global_info->ref_count_server; // FIXME: add backend memory usage + stat_totals.c_total += _v; + + if (_v == 1) { + // increment: relaxed is fine for performance + int prev = stmt_info->ref_count_client.fetch_add(1, std::memory_order_relaxed); + // if prev was 0 -> we transitioned 0 -> 1: one fewer zero-count entry + if (prev == 0) { + num_stmt_with_ref_client_count_zero.fetch_sub(1, std::memory_order_relaxed); + } + } + else if (_v == -1) { + // decrement: use acq_rel to synchronize-with potential deleter + int prev = stmt_info->ref_count_client.fetch_sub(1, std::memory_order_acq_rel); + // prev is the value before subtraction + if (prev == 1) { + // we just transitioned to zero + num_stmt_with_ref_client_count_zero.fetch_add(1, std::memory_order_relaxed); + } + } + else { + // support other increments/decrements (if needed) + int prev = stmt_info->ref_count_client.fetch_add(_v, + (_v > 0) ? std::memory_order_relaxed : std::memory_order_acq_rel); + if (_v > 0 && prev == 0) num_stmt_with_ref_client_count_zero.fetch_sub(1, std::memory_order_relaxed); + if (_v < 0 && prev + _v == 0) num_stmt_with_ref_client_count_zero.fetch_add(1, std::memory_order_relaxed); + } + + ref_count_client___purge_stmts_if_needed(); +} + +void PgSQL_STMT_Manager::ref_count_server(const PgSQL_STMT_Global_info* stmt_info, int _v) noexcept { + assert(stmt_info); + + auto& stat_totals = get_current_thread_statistics_totals(); + + stat_totals.s_total += _v; + + if (_v == 1) { + int prev = stmt_info->ref_count_server.fetch_add(1, std::memory_order_relaxed); + if (prev == 0) { + num_stmt_with_ref_server_count_zero.fetch_sub(1, std::memory_order_relaxed); + } + } + else if (_v == -1) { + int prev = stmt_info->ref_count_server.fetch_sub(1, std::memory_order_acq_rel); + if (prev == 1) { + num_stmt_with_ref_server_count_zero.fetch_add(1, std::memory_order_relaxed); + } + } + else { + int prev = stmt_info->ref_count_server.fetch_add(_v, + (_v > 0) ? std::memory_order_relaxed : std::memory_order_acq_rel); + if (_v > 0 && prev == 0) num_stmt_with_ref_server_count_zero.fetch_sub(1, std::memory_order_relaxed); + if (_v < 0 && prev + _v == 0) num_stmt_with_ref_server_count_zero.fetch_add(1, std::memory_order_relaxed); } - unlock(); } -void PgSQL_STMT_Manager_v14::get_metrics(uint64_t *c_unique, uint64_t *c_total, - uint64_t *stmt_max_stmt_id, uint64_t *cached, - uint64_t *s_unique, uint64_t *s_total) { +void PgSQL_STMT_Manager::get_metrics(uint64_t* c_unique, uint64_t* c_total, uint64_t* stmt_max_stmt_id, uint64_t* cached, + uint64_t* s_unique, uint64_t* s_total) { + #ifdef DEBUG uint64_t c_u = 0; uint64_t c_t = 0; - uint64_t m = 0; + //uint64_t m = 0; uint64_t c = 0; uint64_t s_u = 0; uint64_t s_t = 0; #endif - wrlock(); - statuses.cached = map_stmt_id_to_info.size(); - statuses.c_unique = statuses.cached - num_stmt_with_ref_client_count_zero; - statuses.s_unique = statuses.cached - num_stmt_with_ref_server_count_zero; + Statistics stats{}; + + for (int i = 0; i < next_status_idx.load(std::memory_order_relaxed); ++i) { + stats.total.c_total += stats_total[i].c_total; + stats.total.s_total += stats_total[i].s_total; + } + + stats.cached = map_stmt_hash_to_info.size(); + stats.c_unique = stats.cached - num_stmt_with_ref_client_count_zero.load(std::memory_order_relaxed); + stats.s_unique = stats.cached - num_stmt_with_ref_server_count_zero.load(std::memory_order_relaxed); #ifdef DEBUG - for (std::map::iterator it = map_stmt_id_to_info.begin(); - it != map_stmt_id_to_info.end(); ++it) { - const PgSQL_STMT_Global_info *a = it->second; + rdlock(); + for (const auto& [_, value] : map_stmt_hash_to_info) { + const PgSQL_STMT_Global_info* a = value.get(); c++; - if (a->ref_count_client) { + if (a->ref_count_client.load(std::memory_order_relaxed)) { c_u++; - c_t += a->ref_count_client; + c_t += a->ref_count_client.load(std::memory_order_relaxed); } - if (a->ref_count_server) { + if (a->ref_count_server.load(std::memory_order_relaxed)) { s_u++; - s_t += a->ref_count_server; - } - if (it->first > m) { - m = it->first; + s_t += a->ref_count_server.load(std::memory_order_relaxed); } + //if (it->first > m) { + // m = it->first; + //} } - assert (c_u == statuses.c_unique); - assert (c_t == statuses.c_total); - assert (c == statuses.cached); - assert (s_t == statuses.s_total); - assert (s_u == statuses.s_unique); - *stmt_max_stmt_id = m; -#endif - *stmt_max_stmt_id = next_statement_id; // this is max stmt_id, no matter if in used or not - *c_unique = statuses.c_unique; - *c_total = statuses.c_total; - *cached = statuses.cached; - *s_total = statuses.s_total; - *s_unique = statuses.s_unique; unlock(); + assert(c_u == stats.c_unique); + assert(c_t == stats.total.c_total); + assert(c == stats.cached); + assert(s_t == stats.total.s_total); + assert(s_u == stats.s_unique); + //*stmt_max_stmt_id = m; +#endif + * stmt_max_stmt_id = next_statement_id; // this is max stmt_id, no matter if in used or not + *c_unique = stats.c_unique; + *c_total = stats.total.c_total; + *cached = stats.cached; + *s_total = stats.total.s_total; + *s_unique = stats.s_unique; } +void PgSQL_STMT_Manager::get_memory_usage(uint64_t& prep_stmt_metadata_mem_usage, uint64_t& prep_stmt_backend_mem_usage) { + prep_stmt_backend_mem_usage = 0; + prep_stmt_metadata_mem_usage = sizeof(PgSQL_STMT_Manager); + rdlock(); + prep_stmt_metadata_mem_usage += map_stmt_hash_to_info.size() * (sizeof(uint64_t) + sizeof(PgSQL_STMT_Global_info*)); + prep_stmt_metadata_mem_usage += free_stmt_ids.size() * (sizeof(uint64_t)); + for (const auto& [_, value] : map_stmt_hash_to_info) { + const PgSQL_STMT_Global_info* stmt_global_info = value.get(); + prep_stmt_metadata_mem_usage += stmt_global_info->total_mem_usage; + prep_stmt_metadata_mem_usage += stmt_global_info->ref_count_server * ((sizeof(uint64_t) * 2) + sizeof(std::shared_ptr)); + prep_stmt_metadata_mem_usage += stmt_global_info->ref_count_client * (sizeof(std::string) + 16 + sizeof(std::shared_ptr)); + + // backend + prep_stmt_backend_mem_usage += stmt_global_info->ref_count_server; // FIXME: add backend memory usage + } + unlock(); +} class PgSQL_PS_global_stats { - public: +public: uint64_t statement_id; - char *username; - char *dbname; + char* username; + char* dbname; uint64_t digest; unsigned long long ref_count_client; unsigned long long ref_count_server; - char *query; + char* query; int num_param_types; - PgSQL_PS_global_stats(uint64_t stmt_id, const char *d, const char *u, uint64_t dig, const char *q, + PgSQL_PS_global_stats(uint64_t stmt_id, const char* d, const char* u, uint64_t dig, const char* q, unsigned long long ref_c, unsigned long long ref_s, int params) { statement_id = stmt_id; digest = dig; @@ -484,38 +541,38 @@ class PgSQL_PS_global_stats { num_param_types = params; } ~PgSQL_PS_global_stats() { - if (query) + if (query) free(query); if (username) free(username); if (dbname) free(dbname); } - char **get_row() { + char** get_row() { char buf[128]; - char **pta=(char **)malloc(sizeof(char *)*PS_GLOBAL_STATUS_FIELD_NUM); - snprintf(buf,sizeof(buf),"%lu",statement_id); - pta[0]=strdup(buf); + char** pta = (char**)malloc(sizeof(char*) * PS_GLOBAL_STATUS_FIELD_NUM); + snprintf(buf, sizeof(buf), "%lu", statement_id); + pta[0] = strdup(buf); assert(dbname); - pta[1]=strdup(dbname); + pta[1] = strdup(dbname); assert(username); - pta[2]=strdup(username); - snprintf(buf,sizeof(buf),"0x%016llX", (long long unsigned int)digest); - pta[3]=strdup(buf); + pta[2] = strdup(username); + snprintf(buf, sizeof(buf), "0x%016llX", (long long unsigned int)digest); + pta[3] = strdup(buf); assert(query); - pta[4]=strdup(query); - snprintf(buf,sizeof(buf),"%llu",ref_count_client); - pta[5]=strdup(buf); - snprintf(buf,sizeof(buf),"%llu",ref_count_server); - pta[6]=strdup(buf); - snprintf(buf,sizeof(buf),"%d",num_param_types); - pta[7]=strdup(buf); + pta[4] = strdup(query); + snprintf(buf, sizeof(buf), "%llu", ref_count_client); + pta[5] = strdup(buf); + snprintf(buf, sizeof(buf), "%llu", ref_count_server); + pta[6] = strdup(buf); + snprintf(buf, sizeof(buf), "%d", num_param_types); + pta[7] = strdup(buf); return pta; } - void free_row(char **pta) { + void free_row(char** pta) { int i; - for (i=0;i(PS_GLOBAL_STATUS_FIELD_NUM); - result->add_column_definition(SQLITE_TEXT,"stmt_id"); - result->add_column_definition(SQLITE_TEXT,"database"); - result->add_column_definition(SQLITE_TEXT,"username"); - result->add_column_definition(SQLITE_TEXT,"digest"); - result->add_column_definition(SQLITE_TEXT,"query"); - result->add_column_definition(SQLITE_TEXT,"ref_count_client"); - result->add_column_definition(SQLITE_TEXT,"ref_count_server"); - result->add_column_definition(SQLITE_TEXT,"num_param_types"); + result->add_column_definition(SQLITE_TEXT, "stmt_id"); + result->add_column_definition(SQLITE_TEXT, "database"); + result->add_column_definition(SQLITE_TEXT, "username"); + result->add_column_definition(SQLITE_TEXT, "digest"); + result->add_column_definition(SQLITE_TEXT, "query"); + result->add_column_definition(SQLITE_TEXT, "ref_count_client"); + result->add_column_definition(SQLITE_TEXT, "ref_count_server"); + result->add_column_definition(SQLITE_TEXT, "num_param_types"); rdlock(); - for (auto it = map_stmt_id_to_info.begin(); it != map_stmt_id_to_info.end(); ++it) { - PgSQL_STMT_Global_info *stmt_global_info = it->second; + for (auto it = map_stmt_hash_to_info.begin(); it != map_stmt_hash_to_info.end(); ++it) { + const PgSQL_STMT_Global_info* stmt_global_info = it->second.get(); auto pgs = std::make_unique(stmt_global_info->statement_id, stmt_global_info->dbname, stmt_global_info->username, stmt_global_info->hash, stmt_global_info->query, - stmt_global_info->ref_count_client, stmt_global_info->ref_count_server, stmt_global_info->parse_param_types.size()); - char **pta = pgs->get_row(); + stmt_global_info->ref_count_client.load(std::memory_order_relaxed), stmt_global_info->ref_count_server.load(std::memory_order_relaxed), + stmt_global_info->parse_param_types.size()); + char** pta = pgs->get_row(); result->add_row(pta); pgs->free_row(pta); } diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index 46992a941..695f03595 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -89,7 +89,7 @@ extern PgSQL_Authentication* GloPgAuth; extern MySQL_LDAP_Authentication* GloMyLdapAuth; extern ProxySQL_Admin* GloAdmin; extern PgSQL_Logger* GloPgSQL_Logger; -extern PgSQL_STMT_Manager_v14* GloPgStmt; +extern PgSQL_STMT_Manager* GloPgStmt; extern SQLite3_Server* GloSQLite3Server; @@ -506,7 +506,10 @@ void PgSQL_Session::generate_proxysql_internal_session_json(json& j) { } //j["conn"]["no_backslash_escapes"] = client_myds->myconn->options.no_backslash_escapes; //j["conn"]["status"]["compression"] = client_myds->myconn->get_status(STATUS_PGSQL_CONNECTION_COMPRESSION); - j["conn"]["ps"]["stmt_name_to_global_ids"] = client_myds->myconn->local_stmts->stmt_name_to_global_ids; + json& stmt_name_to_global_ids = j["client"]["ps"]["stmt_name_to_global_ids"]; + for (const auto& [stmt_name, global_stmt_info] : client_myds->myconn->local_stmts->stmt_name_to_global_info) { + stmt_name_to_global_ids[stmt_name.c_str()] = global_stmt_info->statement_id; + } //j["conn"]["ps"]["global_id_to_stmt_names"] = client_myds->myconn->local_stmts->global_id_to_stmt_names; const PgSQL_Conn_Param& conn_params = client_myds->myconn->conn_params; @@ -550,11 +553,8 @@ void PgSQL_Session::generate_proxysql_internal_session_json(json& j) { j["backends"][i]["conn"]["questions"] = _myconn->statuses.questions; j["backends"][i]["conn"]["pgconnpoll_get"] = _myconn->statuses.pgconnpoll_get; j["backends"][i]["conn"]["pgconnpoll_put"] = _myconn->statuses.pgconnpoll_put; - //j["backend"][i]["conn"]["charset"] = _myds->myconn->options.charset; // not used for backend - //j["backends"][i]["conn"]["session_track_gtids"] = (_myconn->options.session_track_gtids ? _myconn->options.session_track_gtids : ""); j["backends"][i]["conn"]["init_connect"] = (_myconn->options.init_connect ? _myconn->options.init_connect : ""); j["backends"][i]["conn"]["init_connect_sent"] = _myds->myconn->options.init_connect_sent; - //j["backends"][i]["conn"]["standard_conforming_strings"] = _myconn->options.no_backslash_escapes; j["backends"][i]["conn"]["status"]["advisory_lock"] = _myconn->get_status(STATUS_PGSQL_CONNECTION_ADVISORY_LOCK); j["backends"][i]["conn"]["status"]["advisory_xact_lock"] = _myconn->get_status(STATUS_PGSQL_CONNECTION_ADVISORY_XACT_LOCK); j["backends"][i]["conn"]["status"]["lock_tables"] = _myconn->get_status(STATUS_PGSQL_CONNECTION_LOCK_TABLES); @@ -578,12 +578,13 @@ void PgSQL_Session::generate_proxysql_internal_session_json(json& j) { } j["backends"][i]["conn"]["MultiplexDisabled_ext"] = multiplex_disabled; } - j["backends"][i]["conn"]["ps"]["backend_stmt_to_global_ids"] = _myconn->local_stmts->backend_stmt_to_global_ids; + + json& backend_stmt_to_global_ids = j["backends"][i]["conn"]["ps"]["backend_stmt_to_global_ids"]; + for (const auto& [backend_stmt, global_stmt_info] : client_myds->myconn->local_stmts->backend_stmt_to_global_info) { + backend_stmt_to_global_ids[backend_stmt] = global_stmt_info->statement_id; + } + j["backends"][i]["conn"]["ps"]["global_stmt_to_backend_ids"] = _myconn->local_stmts->global_stmt_to_backend_ids; - //j["backends"][i]["conn"]["client_flag"]["value"] = _myconn->options.client_flag; - //j["backends"][i]["conn"]["client_flag"]["client_found_rows"] = (_myconn->options.client_flag & CLIENT_FOUND_ROWS ? 1 : 0); - //j["backends"][i]["conn"]["client_flag"]["client_multi_statements"] = (_myconn->options.client_flag & CLIENT_MULTI_STATEMENTS ? 1 : 0); - //j["backends"][i]["conn"]["client_flag"]["client_deprecate_eof"] = (_myconn->options.client_flag & CLIENT_DEPRECATE_EOF ? 1 : 0); if (_myconn->is_connected()) { sprintf(buff, "%p", _myconn->get_pg_connection()); j["backends"][i]["conn"]["pgsql"]["address"] = buff; @@ -5643,36 +5644,31 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg } } - // if the same statement name is used, we drop it - PgSQL_STMTs_local_v14* local_stmts = client_myds->myconn->local_stmts; - std::string stmt_name(extended_query_info.stmt_client_name); + // If a client provides a statement name that already exists in the local map, + // validate whether it can be reused. Only the *unnamed* statement ("") may be redefined. + PgSQL_STMT_Local* local_stmts = client_myds->myconn->local_stmts; + std::string client_stmt_name(extended_query_info.stmt_client_name); - if (auto it = local_stmts->stmt_name_to_global_ids.find(stmt_name); - it != local_stmts->stmt_name_to_global_ids.end()) { + // Try to find an existing statement entry for this name in Local map + auto local_stmt_info_itr = local_stmts->stmt_name_to_global_info.find(client_stmt_name); - if (!stmt_name.empty()) { - const std::string& errmsg = "prepared statement \"" + stmt_name + "\" already exist"; + // ---------------------------------------------------------------------- + // 1) Reject redefinition of a named prepared statement + // (only the empty statement name is allowed to be overwritten). + // ---------------------------------------------------------------------- + if (local_stmt_info_itr != local_stmts->stmt_name_to_global_info.end()) { + if (!client_stmt_name.empty()) { + const std::string& errmsg = "prepared statement \"" + client_stmt_name + "\" already exist"; handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_DUPLICATE_PSTATEMENT, errmsg.c_str(), false); l_free(parse_pkt.size, parse_pkt.ptr); return 2; } - - uint64_t global_id = it->second; - auto range = local_stmts->global_id_to_stmt_names.equal_range(global_id); - - for (auto iter = range.first; iter != range.second; ++iter) { - if (iter->second == stmt_name) { - local_stmts->global_id_to_stmt_names.erase(iter); - break; - } - } - - local_stmts->stmt_name_to_global_ids.erase(it); - local_stmts->client_close(stmt_name); } - // Hash the query + // ---------------------------------------------------------------------- + // 2) Compute hash for current query + // ---------------------------------------------------------------------- uint64_t hash = local_stmts->compute_hash( client_myds->myconn->userinfo->username, client_myds->myconn->userinfo->dbname, @@ -5681,24 +5677,63 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg CurrentQuery.extended_query_info.parse_param_types ); - // Check global statement cache - GloPgStmt->wrlock(); - PgSQL_STMT_Global_info* stmt_info = GloPgStmt->find_prepared_statement_by_hash(hash, false); + // ---------------------------------------------------------------------- + // 3) Local-cache fast path: + // If the client already prepared this same SQL under the same name, + // and the hash matches, reuse the existing global statement. + // ---------------------------------------------------------------------- + if (local_stmt_info_itr != local_stmts->stmt_name_to_global_info.end()) { + auto& local_stmt_info = local_stmt_info_itr->second; + + // Exact match found; treat as a parse success and continue normally. + if (local_stmt_info && local_stmt_info->hash == hash) { + extended_query_info.stmt_global_id = local_stmt_info->statement_id; + client_myds->setDSS_STATE_QUERY_SENT_NET(); + char txn_state = NumActiveTransactions() > 0 ? 'T' : 'I'; + bool send_ready_packet = is_extended_query_ready_for_query(); + client_myds->myprot.generate_parse_completion_packet(true, send_ready_packet, txn_state); + RequestEnd(NULL, false); + l_free(parse_pkt.size, parse_pkt.ptr); + return 0; + } + } + + // ---------------------------------------------------------------------- + // 4) Global-cache lookup: + // If another session already prepared an identical statement (same hash), + // link the local name to that shared global entry. + // ---------------------------------------------------------------------- + auto stmt_info = GloPgStmt->find_prepared_statement_by_hash(hash); if (stmt_info) { - local_stmts->client_insert(stmt_info->statement_id, stmt_name); + std::shared_ptr* local_stmt_info_ptr = nullptr; + if (local_stmt_info_itr != local_stmts->stmt_name_to_global_info.end()) { + local_stmt_info_ptr = &local_stmt_info_itr->second; // reference to shared_ptr inside map + } + local_stmts->client_insert(stmt_info, client_stmt_name, local_stmt_info_ptr); extended_query_info.stmt_global_id = stmt_info->statement_id; - GloPgStmt->unlock(); client_myds->setDSS_STATE_QUERY_SENT_NET(); char txn_state = NumActiveTransactions() > 0 ? 'T' : 'I'; bool send_ready_packet = is_extended_query_ready_for_query(); client_myds->myprot.generate_parse_completion_packet(true, send_ready_packet, txn_state); - //LogQuery(nullptr); - //CurrentQuery.end_time = thread->curtime; RequestEnd(NULL, false); l_free(parse_pkt.size, parse_pkt.ptr); return 0; } - GloPgStmt->unlock(); + + // ---------------------------------------------------------------------- + // 5) The local name is being reused but does not match the SQL/hash. + // Clean up the old entry before creating a new global statement later. + // ---------------------------------------------------------------------- + if (local_stmt_info_itr != local_stmts->stmt_name_to_global_info.end()) { + auto& local_stmt_info = local_stmt_info_itr->second; + + // Decrement global reference and remove stale local pointer + if (local_stmt_info) { + GloPgStmt->ref_count_client(local_stmt_info.get(), -1); + local_stmt_info.reset(); + } + local_stmts->stmt_name_to_global_info.erase(local_stmt_info_itr); + } if (extended_query_frame.empty() == true) { extended_query_info.flags |= PGSQL_EXTENDED_QUERY_FLAG_SYNC; @@ -5774,29 +5809,21 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des } assert(stmt_client_name); - uint64_t stmt_global_id = client_myds->myconn->local_stmts->find_global_id_from_stmt_name(stmt_client_name); - if (stmt_global_id == 0) { + // Look up an existing local statement info for client-provided statement name + const PgSQL_STMT_Global_info* stmt_info = client_myds->myconn->local_stmts->find_stmt_info_from_stmt_name(stmt_client_name); + if (!stmt_info) { const std::string& errmsg = stmt_client_name[0] != '\0' ? ("prepared statement \"" + std::string(stmt_client_name) + "\" does not exist") : "unnamed prepared statement does not exist"; handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, errmsg.c_str(), false); return 2; } - // now we get the statement information - PgSQL_STMT_Global_info* stmt_info = GloPgStmt->find_prepared_statement_by_stmt_id(stmt_global_id); - if (stmt_info == NULL) { - // we couldn't find it - const std::string& errmsg = stmt_client_name[0] != '\0' ? ("prepared statement \"" + std::string(stmt_client_name) + "\" does not exist") : - "unnamed prepared statement does not exist"; - handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, errmsg.c_str(), false); - return 2; - } // describe_msg memory will be freed in pgsql_real_query.end() // CurrentQuery.stmt_client_name may briefly become a dangling pointer until CurrentQuery.end() is invoked PgSQL_Extended_Query_Info& extended_query_info = CurrentQuery.extended_query_info; extended_query_info.stmt_client_name = stmt_client_name; extended_query_info.stmt_client_portal_name = portal_name; - extended_query_info.stmt_global_id = stmt_global_id; + extended_query_info.stmt_global_id = stmt_info->statement_id; extended_query_info.stmt_info = stmt_info; extended_query_info.stmt_type = stmt_type; CurrentQuery.start_time = thread->curtime; @@ -5921,18 +5948,9 @@ int PgSQL_Session::handle_post_sync_bind_message(PgSQL_Bind_Message* bind_msg) { return 2; } - uint64_t stmt_global_id = client_myds->myconn->local_stmts->find_global_id_from_stmt_name(stmt_client_name); - if (stmt_global_id == 0) { - const std::string& errmsg = stmt_client_name[0] != '\0' ? ("prepared statement \"" + std::string(stmt_client_name) + "\" does not exist") : - "unnamed prepared statement does not exist"; - handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, errmsg.c_str(), false); - return 2; - } - - // now we get the statement information - PgSQL_STMT_Global_info* stmt_info = GloPgStmt->find_prepared_statement_by_stmt_id(stmt_global_id); - if (stmt_info == NULL) { - // we couldn't find it + // Look up an existing local statement info for client-provided statement name + const PgSQL_STMT_Global_info* stmt_info = client_myds->myconn->local_stmts->find_stmt_info_from_stmt_name(stmt_client_name); + if (!stmt_info) { const std::string& errmsg = stmt_client_name[0] != '\0' ? ("prepared statement \"" + std::string(stmt_client_name) + "\" does not exist") : "unnamed prepared statement does not exist"; handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, errmsg.c_str(), false); @@ -5942,7 +5960,7 @@ int PgSQL_Session::handle_post_sync_bind_message(PgSQL_Bind_Message* bind_msg) { PgSQL_Extended_Query_Info& extended_query_info = CurrentQuery.extended_query_info; extended_query_info.stmt_client_name = stmt_client_name; extended_query_info.stmt_client_portal_name = portal_name; - extended_query_info.stmt_global_id = stmt_global_id; + extended_query_info.stmt_global_id = stmt_info->statement_id; extended_query_info.stmt_info = stmt_info; CurrentQuery.start_time = thread->curtime; @@ -6033,18 +6051,10 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu // bind_waiting_for_execute will be released on CurrentQuery.end() call or session destory const char* stmt_client_name = bind_waiting_for_execute->data().stmt_name; - uint64_t stmt_global_id = client_myds->myconn->local_stmts->find_global_id_from_stmt_name(stmt_client_name); - if (stmt_global_id == 0) { - const std::string& errmsg = stmt_client_name[0] != '\0' ? ("prepared statement \"" + std::string(stmt_client_name) + "\" does not exist") : - "unnamed prepared statement does not exist"; - handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, errmsg.c_str(), false); - return 2; - } - // now we get the statement information - PgSQL_STMT_Global_info* stmt_info = GloPgStmt->find_prepared_statement_by_stmt_id(stmt_global_id); - if (stmt_info == NULL) { - // we couldn't find it + // Look up an existing local statement info for client-provided statement name + const PgSQL_STMT_Global_info* stmt_info = client_myds->myconn->local_stmts->find_stmt_info_from_stmt_name(stmt_client_name); + if (!stmt_info) { const std::string& errmsg = stmt_client_name[0] != '\0' ? ("prepared statement \"" + std::string(stmt_client_name) + "\" does not exist") : "unnamed prepared statement does not exist"; handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, errmsg.c_str(), false); @@ -6054,7 +6064,7 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu PgSQL_Extended_Query_Info& extended_query_info = CurrentQuery.extended_query_info; extended_query_info.stmt_client_portal_name = portal_name; extended_query_info.stmt_client_name = stmt_client_name; - extended_query_info.stmt_global_id = stmt_global_id; + extended_query_info.stmt_global_id = stmt_info->statement_id; extended_query_info.stmt_info = stmt_info; extended_query_info.bind_msg = bind_waiting_for_execute.get(); extended_query_info.flags |= execute_msg->send_describe_portal_result ? @@ -6371,32 +6381,24 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_E bool PgSQL_Session::handler___rc0_PROCESSING_STMT_PREPARE(enum session_status& st, PgSQL_Data_Stream* myds) { thread->status_variables.stvar[st_var_backend_stmt_prepare]++; - uint64_t global_stmtid; - - PgSQL_STMT_Global_info* stmt_info = NULL; - GloPgStmt->wrlock(); - stmt_info = GloPgStmt->add_prepared_statement( - (char*)client_myds->myconn->userinfo->username, - (char*)client_myds->myconn->userinfo->dbname, - (char*)CurrentQuery.QueryPointer, + + auto stmt_info = GloPgStmt->add_prepared_statement( + client_myds->myconn->userinfo->username, + client_myds->myconn->userinfo->dbname, + (const char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength, - CurrentQuery.QueryParserArgs.first_comment, std::move(CurrentQuery.extended_query_info.parse_param_types), - false); + CurrentQuery.QueryParserArgs.first_comment, + CurrentQuery.QueryParserArgs.digest_text, + CurrentQuery.QueryParserArgs.digest, + CurrentQuery.PgQueryCmd + ); assert(stmt_info); // GloPgStmt->add_prepared_statement() should always return a valid pointer - if (CurrentQuery.QueryParserArgs.digest_text) { - if (stmt_info->digest_text == NULL) { - stmt_info->digest_text = strdup(CurrentQuery.QueryParserArgs.digest_text); - stmt_info->digest = CurrentQuery.QueryParserArgs.digest; // copy digest - stmt_info->PgQueryCmd = CurrentQuery.PgQueryCmd; // copy PgComQueryCmd - stmt_info->calculate_mem_usage(); - } - } + PgSQL_Extended_Query_Info& extended_query_info = CurrentQuery.extended_query_info; - extended_query_info.stmt_info = stmt_info; - global_stmtid = stmt_info->statement_id; - - myds->myconn->local_stmts->backend_insert(global_stmtid, extended_query_info.stmt_backend_id); + extended_query_info.stmt_info = stmt_info.get(); + + myds->myconn->local_stmts->backend_insert(stmt_info, extended_query_info.stmt_backend_id); st = status; if (previous_status.empty() == false) { @@ -6405,15 +6407,18 @@ bool PgSQL_Session::handler___rc0_PROCESSING_STMT_PREPARE(enum session_status& s myds->DSS = STATE_MARIADB_GENERIC; st = previous_status.top(); previous_status.pop(); - GloPgStmt->unlock(); + return true; } // We only perform the client_insert when there is no previous status, this // is, when 'PROCESSING_STMT_PREPARE' is reached directly without transitioning from a previous status // like 'PROCESSING_STMT_EXECUTE'. assert(extended_query_info.stmt_client_name); - client_myds->myconn->local_stmts->client_insert(global_stmtid, extended_query_info.stmt_client_name); - GloPgStmt->unlock(); +#ifdef DEBUG + auto* stmt_info_dbg = client_myds->myconn->local_stmts->find_stmt_info_from_stmt_name(extended_query_info.stmt_client_name); + assert(stmt_info_dbg == nullptr); +#endif + client_myds->myconn->local_stmts->client_insert(stmt_info, extended_query_info.stmt_client_name, nullptr); return false; } diff --git a/lib/ProxySQL_Admin_Stats.cpp b/lib/ProxySQL_Admin_Stats.cpp index 9c7c84a2e..1f8b500cd 100644 --- a/lib/ProxySQL_Admin_Stats.cpp +++ b/lib/ProxySQL_Admin_Stats.cpp @@ -40,7 +40,7 @@ extern ProxySQL_Admin *GloAdmin; extern MySQL_Threads_Handler *GloMTH; extern PgSQL_Threads_Handler* GloPTH; extern MySQL_STMT_Manager_v14 *GloMyStmt; -extern PgSQL_STMT_Manager_v14* GloPgStmt; +extern PgSQL_STMT_Manager* GloPgStmt; extern MySQL_Query_Processor* GloMyQPro; extern PgSQL_Query_Processor* GloPgQPro; extern ProxySQL_Cluster *GloProxyCluster; diff --git a/src/main.cpp b/src/main.cpp index 52ee9ef4a..aa78d0f79 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -479,7 +479,7 @@ MySQL_Threads_Handler *GloMTH = NULL; PgSQL_Threads_Handler* GloPTH = NULL; Web_Interface *GloWebInterface; MySQL_STMT_Manager_v14 *GloMyStmt; -PgSQL_STMT_Manager_v14 *GloPgStmt; +PgSQL_STMT_Manager *GloPgStmt; MySQL_Monitor *GloMyMon; PgSQL_Monitor *GloPgMon; @@ -923,7 +923,7 @@ void ProxySQL_Main_init_main_modules() { GloPgSQL_Logger = new PgSQL_Logger(); GloPgSQL_Logger->print_version(); GloMyStmt=new MySQL_STMT_Manager_v14(); - GloPgStmt=new PgSQL_STMT_Manager_v14(); + GloPgStmt=new PgSQL_STMT_Manager(); PgHGM = new PgSQL_HostGroups_Manager(); PgHGM->init(); PgSQL_Threads_Handler* _tmp_GloPTH = NULL;