Refactor: Improved Prepared-Statement Cache Design (Lock-Free Hot Path) #5211

Concurrency and Memory Management
* Lock-Free Ref Counting: Replaced global mutex-protected integer reference counts with `std::atomic<uint32_t>` within `PgSQL_STMT_Global_info`, eliminating lock contention during statement referencing.
* Modern Ownership: Adopted std::shared_ptr<const PgSQL_STMT_Global_info> for global and local storage, providing automatic, thread-safe memory and lifecycle management.
* Memory Optimization: Removed redundant auxiliary maps `global_id_to_stmt_names` and `map_stmt_id_to_info` from local and global statement managers respectively, reducing overall memory overhead.
* Optimized Purging: Statement removal logic was simplified for efficiently identifying and cleaning up unused statements.

Hot Path Performance (`BIND`, `DESCRIBE`, `EXECUTE`)
* Bypassed Global Lookups: Local session maps now store the `shared_ptr` directly, removing the need to acquire the global lock and search the global map during hot path operations.
* Direct Refcount Manipulation: Refcount modification functions now operate directly on the passed statement object, eliminating the overhead of searching the global map to find the object pointer based on statement id.

Safety and Protocol Logic (`PARSE`)
* Efficient Statement Reuse: Implemented a **local fast path** check for the unnamed statement (`""`), allowing immediate reuse of an identical query (same hash) upon re-parse, which bypasses global processing and locks.

Cleanup
* Cleaned up and class rename `PgSQL_STMT_Manager_v14` -> `PgSQL_STMT_Manager`.
v3.0_refactor_prepared_statement_cache_design_5211
Rahim Kanji 3 months ago
parent d84444724a
commit c0f99c0e15

@ -12,7 +12,7 @@
class PgSQL_SrvC;
class PgSQL_Query_Result;
class PgSQL_STMTs_local_v14;
class PgSQL_STMT_Local;
//class PgSQL_Describe_Prepared_Info;
class PgSQL_Bind_Info;
//#define STATUS_PGSQL_CONNECTION_SEQUENCE 0x00000001
@ -639,7 +639,7 @@ public:
bool exit_pipeline_mode; // true if it is safe to exit pipeline mode
bool resync_failed; // true if the last resync attempt failed
PgSQL_STMTs_local_v14* local_stmts;
PgSQL_STMT_Local* local_stmts;
PgSQL_SrvC *parent;
PgSQL_Connection_userinfo* userinfo;
PgSQL_Data_Stream* myds;

@ -4,27 +4,30 @@
#include "proxysql.h"
#include "cpp.h"
static constexpr uint16_t PGSQL_MAX_THREADS = 255;
// class PgSQL_STMT_Global_info represents information about a PgSQL Prepared Statement
// it is an internal representation of prepared statement
// it include all metadata associated with it
class PgSQL_STMT_Global_info {
public:
uint64_t digest;
PGSQL_QUERY_command PgQueryCmd;
char* query;
char* digest_text;
uint64_t hash;
char *username;
char *dbname;
char *query;
char* first_comment;
unsigned int query_length;
int ref_count_client;
int ref_count_server;
uint64_t statement_id;
char* first_comment;
uint64_t total_mem_usage;
mutable std::atomic<uint32_t> ref_count_client;
mutable std::atomic<uint32_t> ref_count_server;
Parse_Param_Types parse_param_types;// array of parameter types, used for prepared statements
PgSQL_STMT_Global_info(uint64_t id, char* u, char* d, char* q, unsigned int ql, char* fc, Parse_Param_Types&& ppt, uint64_t _h);
char* username;
char* dbname;
uint64_t digest;
uint64_t hash;
uint64_t total_mem_usage;
PGSQL_QUERY_command PgQueryCmd;
PgSQL_STMT_Global_info(uint64_t id, const char* _user, const char* _database, const char* _query, unsigned int _query_len,
Parse_Param_Types&& _ppt, const char* _first_comment, uint64_t _h);
~PgSQL_STMT_Global_info();
void calculate_mem_usage();
@ -32,85 +35,284 @@ private:
void compute_hash();
};
class PgSQL_STMTs_local_v14 {
// class PgSQL_STMT_Local represents prepared statements local to a session/connection
class PgSQL_STMT_Local {
public:
// this map associate client_stmt_id to global_stmt_id : this is used only for client connections
std::map<std::string, uint64_t> stmt_name_to_global_ids;
// this multimap associate global_stmt_id to client_stmt_id : this is used only for client connections
std::multimap<uint64_t, std::string> global_id_to_stmt_names;
explicit PgSQL_STMT_Local(bool _ic) : sess(nullptr), local_max_stmt_id(0), is_client_(_ic) { }
~PgSQL_STMT_Local();
// this map associate backend_stmt_id to global_stmt_id : this is used only for backend connections
std::map<uint32_t, uint64_t> backend_stmt_to_global_ids;
// this map associate global_stmt_id to backend_stmt_id : this is used only for backend connections
std::map<uint64_t, uint32_t> global_stmt_to_backend_ids;
inline void set_is_client(PgSQL_Session *_s) { sess=_s; is_client_ = true; }
inline bool is_client() const { return is_client_; }
inline unsigned int get_num_backend_stmts() const { return backend_stmt_to_global_info.size(); }
/**
* Map client statement name to a global prepared statement.
*
* - If client statement (local_stmt_info_ptr) already references a different global stmt: decrement old refcount,
* increment new, and replace pointer.
* - If client statement (local_stmt_info_ptr) references the same stmt: no-op.
* - Otherwise: insert into stmt_name_to_global_info and increment client refcount.
*
* Parameters:
* stmt_info Global prepared statement (shared_ptr, must be valid).
* client_stmt_name Statement name from client scope.
* local_stmt_info_ptr Optional existing local pointer to update instead of map insert.
*/
void client_insert(std::shared_ptr<const PgSQL_STMT_Global_info>& stmt_info, const std::string& client_stmt_name,
std::shared_ptr<const PgSQL_STMT_Global_info>* local_stmt_info_ptr);
PgSQL_Session *sess;
PgSQL_STMTs_local_v14(bool _ic) : sess(NULL), is_client_(_ic) { }
~PgSQL_STMTs_local_v14();
/**
* @brief Find statement info by client statement name in stmt_name_to_global_info.
*
* Performs a map lookup using the provided client statement name and returns the
* associated PgSQL_STMT_Global_info pointer, or nullptr if not present.
*/
const PgSQL_STMT_Global_info* find_stmt_info_from_stmt_name(const std::string& client_stmt_name) const;
inline
void set_is_client(PgSQL_Session *_s) {
sess=_s;
is_client_ = true;
}
inline
bool is_client() { return is_client_; }
inline
unsigned int get_num_backend_stmts() { return backend_stmt_to_global_ids.size(); }
/**
* Close a client-side prepared statement mapping by its name.
*
* - If the name exists: decrement the global statement's client refcount,
* remove the mapping, return true.
* - If not found: do nothing, return false.
*
* @param client_stmt_name Client statement identifier.
* @return true if a mapping was removed, false otherwise.
*/
bool client_close(const std::string& client_stmt_name);
void backend_insert(uint64_t global_stmt_id, uint32_t backend_stmt_id);
void client_insert(uint64_t global_stmt_id, const std::string& client_stmt_name);
uint64_t compute_hash(const char *user, const char *database, const char *query, unsigned int query_length,
const Parse_Param_Types& param_types);
uint32_t generate_new_backend_stmt_id();
uint64_t find_global_id_from_stmt_name(const std::string& client_stmt_name);
uint32_t find_backend_stmt_id_from_global_id(uint64_t global_id);
bool client_close(const std::string& stmt_name);
/**
* Close all client-side prepared statement mappings.
*
* Decrements the client refcount for each associated global statement
* and clears the stmt_name_to_global_info map.
*/
void client_close_all();
/**
* Generate a new backend statement ID.
*
* @return A backend statement id.
*/
uint32_t generate_new_backend_stmt_id();
/**
* @brief Register a backend prepared statement mapping.
*
* Adds bidirectional associations:
* - backend_stmt_id -> global statement info
* - global statement_id -> backend_stmt_id
*/
void backend_insert(std::shared_ptr<const PgSQL_STMT_Global_info>& stmt_info, uint32_t backend_stmt_id);
/**
* @brief Find backend statement ID from global statement ID.
*
* Looks up the backend statement ID associated with the given global statement ID.
*
* @param global_id The global statement ID to look up.
* @return The associated backend statement ID, or 0 if not found.
*/
uint32_t find_backend_stmt_id_from_global_id(uint64_t global_id) const;
/**
* @brief Compute a unique hash for a prepared statement.
*
* Combines user, database, query, and parameter types into a hash value.
*
* @param user The username.
* @param database The database name.
* @param query The SQL query string.
* @param query_length The length of the query string.
* @param param_types The parameter types for the prepared statement.
* @return A computed hash value representing the prepared statement.
*/
static uint64_t compute_hash(const char* user, const char* database, const char* query,
unsigned int query_length, const Parse_Param_Types& param_types);
private:
bool is_client_;
// this map associate client_stmt_id to global_stmt_info : this is used only for client connections
std::map<std::string, std::shared_ptr<const PgSQL_STMT_Global_info>> stmt_name_to_global_info;
// this map associate backend_stmt_id to global_stmt_info : this is used only for backend connections
std::map<uint32_t, std::shared_ptr<const PgSQL_STMT_Global_info>> backend_stmt_to_global_info;
// this map associate global_stmt_id to backend_stmt_id : this is used only for backend connections
std::map<uint64_t, uint32_t> global_stmt_to_backend_ids;
PgSQL_Session* sess = nullptr;
// stack of free backend statement ids
std::stack<uint32_t> free_backend_ids;
// maximum assigned statement id in this session
uint32_t local_max_stmt_id = 0;
};
// is client session ?
bool is_client_;
friend class PgSQL_Session;
};
class PgSQL_STMT_Manager_v14 {
// class PgSQL_STMT_Manager manages global prepared statements across all sessions
class PgSQL_STMT_Manager {
public:
PgSQL_STMT_Manager_v14();
~PgSQL_STMT_Manager_v14();
PgSQL_STMT_Global_info* find_prepared_statement_by_hash(uint64_t hash, bool lock=true);
PgSQL_STMT_Global_info* find_prepared_statement_by_stmt_id(uint64_t id, bool lock=true);
inline void rdlock() { pthread_rwlock_rdlock(&rwlock_); }
inline void wrlock() { pthread_rwlock_wrlock(&rwlock_); }
inline void unlock() { pthread_rwlock_unlock(&rwlock_); }
void ref_count_client(uint64_t _stmt, int _v, bool lock=true) noexcept;
void ref_count_server(uint64_t _stmt, int _v, bool lock=true) noexcept;
PgSQL_STMT_Global_info* add_prepared_statement(char *user, char *database, char *query, unsigned int query_len,
char *fc, Parse_Param_Types&& ppt, bool lock=true);
PgSQL_STMT_Manager();
~PgSQL_STMT_Manager();
/**
* @brief Lookup a prepared statement by its 64-bit hash.
*
* @param hash Unique hash computed from user, db, query, and parameter types.
* @param lock If true acquires/release internal read lock; set false only if caller already holds it.
* @return Shared pointer to global statement info or nullptr if not found.
*/
std::shared_ptr<const PgSQL_STMT_Global_info> find_prepared_statement_by_hash(uint64_t hash, bool lock=true);
/**
* @brief Retrieve existing or create a new global prepared statement.
*
* Computes a hash from user, database, query and parameter types. If an entry with that hash
* exists it is returned. Otherwise a new PgSQL_STMT_Global_info is allocated, assigned a
* statement_id (preferring reused IDs from free_stmt_ids), optional digest metadata copied,
* inserted into map_stmt_hash_to_info, and zero-refcount tracking counters updated.
* Always increments the server refcount for the returned statement.
*
* Concurrency: Acquires/release write lock when lock == true.
*
* @param user Client username (null-terminated).
* @param database Database/schema name.
* @param query SQL text.
* @param query_len Length of query.
* @param ppt Vector of parameter type OIDs (moved if new entry is created).
* @param first_comment First comment prefix (optional, may be null).
* @param digest_text Normalized digest text (optional, may be null).
* @param digest 64-bit digest value (used only if digest_text provided).
* @param PgQueryCmd Parsed command classification.
* @param lock If true, method manages locking internally.
* @return shared_ptr to the global prepared statement (never nullptr).
*/
std::shared_ptr<const PgSQL_STMT_Global_info> add_prepared_statement(const char* user, const char* database, const char* query,
unsigned int query_len, Parse_Param_Types&& ppt, const char* first_comment, const char* digest_text, uint64_t digest,
PGSQL_QUERY_command PgQueryCmd, bool lock = true);
/**
* @brief Adjust client refcount for a prepared statement and update per-thread stats.
*
* - Applies delta _v (positive or negative) atomically.
* - Maintains zero-refcount tracking counters on transitions to/from zero.
* - Triggers opportunistic purge heuristic after modification.
*
* Thread-safety: uses atomic operations; no external lock required.
*
* @param stmt_info Non-null pointer to global PS metadata.
* @param _v Delta to apply (typically +1 or -1).
*/
void ref_count_client(const PgSQL_STMT_Global_info* stmt_info, int _v) noexcept;
/**
* @brief Adjust server refcount for a prepared statement and update per-thread stats.
*
* - Applies delta _v (positive or negative) atomically.
* - Maintains zero-refcount tracking counters on transitions to/from zero.
*
* Thread-safety: uses atomic operations; no external lock required.
*
* @param stmt_info Non-null pointer to global PS metadata.
* @param _v Delta to apply (typically +1 or -1).
*/
void ref_count_server(const PgSQL_STMT_Global_info* stmt_info, int _v) noexcept;
/**
* @brief Retrieve global prepared statement metrics.
*
* Outputs:
* - c_unique: Number of unique prepared statements with client refcount > 0.
* - c_total: Total number of client references across all prepared statements.
* - stmt_max_stmt_id: Maximum assigned statement ID.
* - cached: Total number of unique prepared statements in the global cache.
* - s_unique: Number of unique prepared statements with server refcount > 0.
* - s_total: Total number of server references across all prepared statements.
*/
void get_metrics(uint64_t *c_unique, uint64_t *c_total, uint64_t *stmt_max_stmt_id, uint64_t *cached,
uint64_t *s_unique, uint64_t *s_total);
SQLite3_result* get_prepared_statements_global_infos();
/**
* @brief Retrieve memory usage statistics for prepared statements.
*
* Outputs:
* - prep_stmt_metadata_mem_usage: Total memory used for prepared statement metadata.
* - prep_stmt_backend_mem_usage: Total memory used for backend prepared statement allocations.
*/
void get_memory_usage(uint64_t& prep_stmt_metadata_mem_usage, uint64_t& prep_stmt_backend_mem_usage);
/**
* @brief Build and return a snapshot of all global prepared statements.
*
* @return SQLite3_result* containing one row per prepared statement (must be freed by caller).
*/
SQLite3_result* get_prepared_statements_global_infos();
private:
struct Statistics {
struct Totals {
uint64_t c_total;
uint64_t s_total;
} total;
uint64_t c_unique;
uint64_t s_unique;
uint64_t cached;
};
// map from statement hash to global prepared statement info
std::map<uint64_t, std::shared_ptr<const PgSQL_STMT_Global_info>> map_stmt_hash_to_info; // map using hashes
// next statement ID to assign if no free IDs are available
uint64_t next_statement_id;
uint64_t num_stmt_with_ref_client_count_zero;
uint64_t num_stmt_with_ref_server_count_zero;
// counters to track number of statements with zero refcounts
std::atomic<uint64_t> num_stmt_with_ref_client_count_zero;
// counters to track number of statements with zero refcounts
std::atomic<uint64_t> num_stmt_with_ref_server_count_zero;
// read-write lock to protect map_stmt_hash_to_info and free_stmt_ids
pthread_rwlock_t rwlock_;
std::map<uint64_t, PgSQL_STMT_Global_info*> map_stmt_id_to_info; // map using statement id
std::map<uint64_t, PgSQL_STMT_Global_info*> map_stmt_hash_to_info; // map using hashes
// stack of freed statement IDs for reuses
std::stack<uint64_t> free_stmt_ids;
struct {
uint64_t c_unique;
uint64_t c_total;
uint64_t stmt_max_stmt_id;
uint64_t cached;
uint64_t s_unique;
uint64_t s_total;
} statuses;
// last time we purged unused statements
time_t last_purge_time;
// to make lock free status counting per thread, we use thread local storage
inline static thread_local int thd_idx = -1;
std::atomic<int> next_status_idx{};
// FIXME: should be equal to number of worker threads configured in ProxySQL
std::array<Statistics::Totals, PGSQL_MAX_THREADS> stats_total{};
// get per thread statistics totals
inline
Statistics::Totals& get_current_thread_statistics_totals() noexcept {
if (thd_idx == -1) {
thd_idx = next_status_idx.fetch_add(1, std::memory_order_relaxed);
}
return stats_total[thd_idx];
}
// read-write lock wrappers
inline void rdlock() noexcept { pthread_rwlock_rdlock(&rwlock_); }
inline void wrlock() noexcept { pthread_rwlock_wrlock(&rwlock_); }
inline void unlock() noexcept { pthread_rwlock_unlock(&rwlock_); }
/**
* @brief Opportunistically purge unused prepared statements from the global cache.
*
* Concurrency:
* - Fast lock-free pre-check avoids unnecessary write lock.
*/
void ref_count_client___purge_stmts_if_needed() noexcept;
};
#endif /* CLASS_PGSQL_PREPARED_STATEMENT_H */

@ -149,7 +149,7 @@ struct PgSQL_Extended_Query_Info {
const char* stmt_client_name;
const char* stmt_client_portal_name;
const PgSQL_Bind_Message* bind_msg;
PgSQL_STMT_Global_info* stmt_info;
const PgSQL_STMT_Global_info* stmt_info;
uint64_t stmt_global_id;
uint32_t stmt_backend_id;
uint8_t stmt_type;

@ -179,7 +179,7 @@ PgSQL_Connection::PgSQL_Connection(bool is_client_conn) {
options.init_connect = NULL;
options.init_connect_sent = false;
userinfo = new PgSQL_Connection_userinfo();
local_stmts = new PgSQL_STMTs_local_v14(false); // false by default, it is a backend
local_stmts = new PgSQL_STMT_Local(false); // false by default, it is a backend
//for (int i = 0; i < PGSQL_NAME_LAST_HIGH_WM; i++) {
// variables[i].value = NULL;
@ -2499,7 +2499,7 @@ void PgSQL_Connection::reset() {
reusable = true;
creation_time = monotonic_time();
delete local_stmts;
local_stmts = new PgSQL_STMTs_local_v14(false);
local_stmts = new PgSQL_STMT_Local(false);
// reset all variables
for (int i = 0; i < PGSQL_NAME_LAST_HIGH_WM; i++) {

@ -9,7 +9,7 @@
#include "PgSQL_PreparedStatement.h"
#include "PgSQL_Protocol.h"
extern PgSQL_STMT_Manager_v14 *GloPgStmt;
extern PgSQL_STMT_Manager *GloPgStmt;
const int PS_GLOBAL_STATUS_FIELD_NUM = 8;
@ -63,25 +63,22 @@ void PgSQL_STMT_Global_info::compute_hash() {
query_length, parse_param_types);
}
PgSQL_STMT_Global_info::PgSQL_STMT_Global_info(uint64_t id,
char *u, char *d, char *q,
unsigned int ql,
char *fc,
Parse_Param_Types&& ppt,
uint64_t _h) {
PgSQL_STMT_Global_info::PgSQL_STMT_Global_info(uint64_t id, const char *_user, const char *_database, const char *_query,
unsigned int _query_len, Parse_Param_Types&& _ppt,
const char *_first_comment, uint64_t _h) {
total_mem_usage = 0;
statement_id = id;
ref_count_client = 0;
ref_count_server = 0;
digest_text = nullptr;
username = strdup(u);
dbname = strdup(d);
query = (char *)malloc(ql + 1);
memcpy(query, q, ql);
query[ql] = '\0'; // add NULL byte
query_length = ql;
first_comment = fc ? strdup(fc) : nullptr;
parse_param_types = std::move(ppt);
username = strdup(_user);
dbname = strdup(_database);
query_length = _query_len;
query = (char *)malloc(query_length + 1);
memcpy(query, _query, query_length);
query[query_length] = '\0'; // add NULL byte
first_comment = _first_comment ? strdup(_first_comment) : nullptr;
parse_param_types = std::move(_ppt);
PgQueryCmd = PGSQL_QUERY__UNINITIALIZED;
if (_h) {
@ -104,8 +101,7 @@ PgSQL_STMT_Global_info::~PgSQL_STMT_Global_info() {
}
void PgSQL_STMT_Global_info::calculate_mem_usage() {
total_mem_usage = sizeof(PgSQL_STMT_Global_info) +
query_length + 1;
total_mem_usage = sizeof(PgSQL_STMT_Global_info) + query_length + 1;
// NOSONAR: strlen is safe here
if (username) total_mem_usage += strlen(username) + 1; // NOSONAR
@ -114,182 +110,221 @@ void PgSQL_STMT_Global_info::calculate_mem_usage() {
if (digest_text) total_mem_usage += strlen(digest_text) + 1; // NOSONAR
}
void PgSQL_STMTs_local_v14::backend_insert(uint64_t global_stmt_id, uint32_t backend_stmt_id) {
global_stmt_to_backend_ids.insert(std::make_pair(global_stmt_id, backend_stmt_id));
backend_stmt_to_global_ids.insert(std::make_pair(backend_stmt_id,global_stmt_id));
PgSQL_STMT_Local::~PgSQL_STMT_Local() {
// Note: we do not free the prepared statements because we assume that
// if we call this destructor the connection is being destroyed anyway
if (is_client_) {
for (auto it = stmt_name_to_global_info.begin(); it != stmt_name_to_global_info.end(); ++it) {
auto* stmt_info = it->second.get();
GloPgStmt->ref_count_client(stmt_info, -1);
}
stmt_name_to_global_info.clear();
}
else {
for (auto it = backend_stmt_to_global_info.begin(); it != backend_stmt_to_global_info.end(); ++it) {
auto* stmt_info = it->second.get();
GloPgStmt->ref_count_server(stmt_info, -1);
}
backend_stmt_to_global_info.clear();
}
}
void PgSQL_STMTs_local_v14::client_insert(uint64_t global_stmt_id, const std::string& client_stmt_name) {
// validate that client_stmt_name is not empty and global_stmt_id is a valid id
[[maybe_unused]] auto [it, inserted] = stmt_name_to_global_ids.try_emplace(client_stmt_name, global_stmt_id);
assert(inserted && "client_stmt_name already exists in stmt_name_to_global_ids"); // Should not happen, as we expect unique client_stmt_name
#ifdef DEBUG
auto range = global_id_to_stmt_names.equal_range(global_stmt_id);
for (auto it = range.first; it != range.second; ++it) {
assert(it->second != client_stmt_name && "client_stmt_name is already mapped to global_stmt_id in global_id_to_stmt_names"); // Should not happen, as we expect unique client_stmt_name per global_stmt_id
void PgSQL_STMT_Local::client_insert(std::shared_ptr<const PgSQL_STMT_Global_info>& stmt_info,
const std::string& client_stmt_name, std::shared_ptr<const PgSQL_STMT_Global_info>* local_stmt_info_ptr) {
assert(stmt_info);
if (local_stmt_info_ptr && (*local_stmt_info_ptr)) {
auto& local_stmt_info = *local_stmt_info_ptr;
if (local_stmt_info->statement_id == stmt_info->statement_id)
return; // no change
// Adjust refcounts: decrement old, increment new
GloPgStmt->ref_count_client(local_stmt_info.get(), -1);
local_stmt_info.reset();
GloPgStmt->ref_count_client(stmt_info.get(), 1);
// Update existing entry to new global stmt info one
local_stmt_info = stmt_info;
return;
}
#endif
global_id_to_stmt_names.emplace(global_stmt_id, client_stmt_name);
GloPgStmt->ref_count_client(global_stmt_id, 1, false); // do not lock!
// New statement name — just insert
stmt_name_to_global_info.emplace(client_stmt_name, stmt_info);
GloPgStmt->ref_count_client(stmt_info.get(), 1);
}
const PgSQL_STMT_Global_info* PgSQL_STMT_Local::find_stmt_info_from_stmt_name(const std::string& client_stmt_name) const {
const PgSQL_STMT_Global_info* ret = nullptr;
if (auto s = stmt_name_to_global_info.find(client_stmt_name); s != stmt_name_to_global_info.end()) {
ret = s->second.get();
}
return ret;
}
bool PgSQL_STMT_Local::client_close(const std::string& client_stmt_name) {
if (auto s = stmt_name_to_global_info.find(client_stmt_name); s != stmt_name_to_global_info.end()) { // found
const PgSQL_STMT_Global_info* stmt_info = s->second.get();
GloPgStmt->ref_count_client(stmt_info, -1);
stmt_name_to_global_info.erase(s);
return true;
}
return false; // we don't really remove the prepared statement
}
void PgSQL_STMT_Local::client_close_all() {
for (auto& [_, global_stmt_info] : stmt_name_to_global_info) {
GloPgStmt->ref_count_client(global_stmt_info.get(), -1);
}
stmt_name_to_global_info.clear();
}
uint64_t PgSQL_STMTs_local_v14::compute_hash(const char *user,
uint32_t PgSQL_STMT_Local::generate_new_backend_stmt_id() {
assert(is_client_ == false);
if (free_backend_ids.empty() == false) {
uint32_t backend_stmt_id = free_backend_ids.top();
free_backend_ids.pop();
return backend_stmt_id;
}
local_max_stmt_id++;
return local_max_stmt_id;
}
void PgSQL_STMT_Local::backend_insert(std::shared_ptr<const PgSQL_STMT_Global_info>& stmt_info, uint32_t backend_stmt_id) {
backend_stmt_to_global_info.insert(std::make_pair(backend_stmt_id, stmt_info));
global_stmt_to_backend_ids.insert(std::make_pair(stmt_info->statement_id, backend_stmt_id));
}
uint32_t PgSQL_STMT_Local::find_backend_stmt_id_from_global_id(uint64_t global_id) const {
if (auto s = global_stmt_to_backend_ids.find(global_id); s != global_stmt_to_backend_ids.end()) {
return s->second;
}
return 0; // not found
}
uint64_t PgSQL_STMT_Local::compute_hash(const char *user,
const char *database, const char *query, unsigned int query_length, const Parse_Param_Types& param_types) {
uint64_t hash = stmt_compute_hash(user, database, query, query_length, param_types);
return hash;
}
PgSQL_STMT_Manager_v14::PgSQL_STMT_Manager_v14() {
PgSQL_STMT_Manager::PgSQL_STMT_Manager() {
last_purge_time = time(NULL);
pthread_rwlock_init(&rwlock_, NULL);
next_statement_id = 1; // we initialize this as 1 because we 0 is not allowed
num_stmt_with_ref_client_count_zero = 0;
num_stmt_with_ref_server_count_zero = 0;
statuses.c_unique = 0;
statuses.c_total = 0;
statuses.stmt_max_stmt_id = 0;
statuses.cached = 0;
statuses.s_unique = 0;
statuses.s_total = 0;
next_status_idx = 0;
}
PgSQL_STMT_Manager_v14::~PgSQL_STMT_Manager_v14() {
for (auto it = map_stmt_id_to_info.begin(); it != map_stmt_id_to_info.end(); ++it) {
PgSQL_STMT_Global_info * a = it->second;
delete a;
}
PgSQL_STMT_Manager::~PgSQL_STMT_Manager() {
wrlock();
map_stmt_hash_to_info.clear();
unlock();
}
void PgSQL_STMT_Manager_v14::ref_count_client(uint64_t _stmt_id ,int _v, bool lock) noexcept {
if (lock)
wrlock();
if (auto s = map_stmt_id_to_info.find(_stmt_id); s != map_stmt_id_to_info.end()) {
statuses.c_total += _v;
PgSQL_STMT_Global_info *stmt_info = s->second;
if (stmt_info->ref_count_client == 0 && _v == 1) {
__sync_sub_and_fetch(&num_stmt_with_ref_client_count_zero,1);
} else {
if (stmt_info->ref_count_client == 1 && _v == -1) {
__sync_add_and_fetch(&num_stmt_with_ref_client_count_zero,1);
}
}
stmt_info->ref_count_client += _v;
time_t ct = time(NULL);
uint64_t num_client_count_zero = __sync_add_and_fetch(&num_stmt_with_ref_client_count_zero, 0);
uint64_t num_server_count_zero = __sync_add_and_fetch(&num_stmt_with_ref_server_count_zero, 0);
size_t map_size = map_stmt_id_to_info.size();
if (
(ct > last_purge_time+1) &&
(map_size > (unsigned)pgsql_thread___max_stmts_cache) &&
(num_client_count_zero > map_size/10) &&
(num_server_count_zero > map_size/10)
) { // purge only if there is at least 10% gain
last_purge_time = ct;
int max_purge = map_size ;
std::vector<uint64_t> torem;
torem.reserve(max_purge);
for (auto it = map_stmt_id_to_info.begin(); it != map_stmt_id_to_info.end(); ++it) {
if (torem.size() >= std::min(static_cast<size_t>(max_purge),
static_cast<size_t>(num_client_count_zero))) {
break;
}
PgSQL_STMT_Global_info *a = it->second;
if ((__sync_add_and_fetch(&a->ref_count_client, 0) == 0) &&
(a->ref_count_server == 0) ) // this to avoid that IDs are incorrectly reused
{
uint64_t hash = a->hash;
map_stmt_hash_to_info.erase(hash);
__sync_sub_and_fetch(&num_stmt_with_ref_client_count_zero,1);
torem.emplace_back(it->first);
}
}
while (!torem.empty()) {
uint64_t id = torem.back();
torem.pop_back();
auto s3 = map_stmt_id_to_info.find(id);
PgSQL_STMT_Global_info *a = s3->second;
if (a->ref_count_server == 0) {
__sync_sub_and_fetch(&num_stmt_with_ref_server_count_zero,1);
free_stmt_ids.push(id);
}
map_stmt_id_to_info.erase(s3);
statuses.s_total -= a->ref_count_server;
delete a;
}
}
void PgSQL_STMT_Manager::ref_count_client___purge_stmts_if_needed() noexcept {
/* Heuristic trigger(checked twice : before and after acquiring write lock) :
* 1. At least 1 second since last_purge_time.
* 2. map_stmt_hash_to_info.size() > pgsql_thread___max_stmts_cache.
* 3. >= 10% of entries have client refcount == 0.
* 4. >= 10% of entries have server refcount == 0.
*/
time_t ct = time(NULL);
if (ct <= last_purge_time + 1)
return; // too soon, skip
// --- Light pre-check without lock ---
size_t map_size = map_stmt_hash_to_info.size();
uint64_t num_client_zero = num_stmt_with_ref_client_count_zero.load(std::memory_order_relaxed);
uint64_t num_server_zero = num_stmt_with_ref_server_count_zero.load(std::memory_order_relaxed);
if (map_size <= (unsigned)pgsql_thread___max_stmts_cache ||
num_client_zero <= map_size / 10 ||
num_server_zero <= map_size / 10) {
// Heuristic says no purge needed
return;
}
if (lock)
unlock();
}
void PgSQL_STMT_Manager_v14::ref_count_server(uint64_t _stmt_id ,int _v, bool lock) noexcept {
if (lock)
wrlock();
std::map<uint64_t, PgSQL_STMT_Global_info *>::iterator s;
s = map_stmt_id_to_info.find(_stmt_id);
if (s != map_stmt_id_to_info.end()) {
statuses.s_total += _v;
PgSQL_STMT_Global_info *stmt_info = s->second;
if (stmt_info->ref_count_server == 0 && _v == 1) {
__sync_sub_and_fetch(&num_stmt_with_ref_server_count_zero,1);
} else {
if (stmt_info->ref_count_server == 1 && _v == -1) {
__sync_add_and_fetch(&num_stmt_with_ref_server_count_zero,1);
}
}
stmt_info->ref_count_server += _v;
}
if (lock)
// --- Now we know we might purge, take write lock ---
wrlock();
// Double-check under exclusive lock (authoritative)
ct = time(NULL);
if (ct <= last_purge_time + 1) {
unlock();
}
return;
}
PgSQL_STMTs_local_v14::~PgSQL_STMTs_local_v14() {
// Note: we do not free the prepared statements because we assume that
// if we call this destructor the connection is being destroyed anyway
map_size = map_stmt_hash_to_info.size();
num_client_zero = num_stmt_with_ref_client_count_zero.load(std::memory_order_relaxed);
num_server_zero = num_stmt_with_ref_server_count_zero.load(std::memory_order_relaxed);
if (is_client_) {
for (auto it = stmt_name_to_global_ids.begin();
it != stmt_name_to_global_ids.end(); ++it) {
uint64_t global_stmt_id = it->second;
GloPgStmt->ref_count_client(global_stmt_id, -1);
}
} else {
for (auto it = backend_stmt_to_global_ids.begin();
it != backend_stmt_to_global_ids.end(); ++it) {
uint64_t global_stmt_id = it->second;
GloPgStmt->ref_count_server(global_stmt_id, -1);
}
if (map_size <= (unsigned)pgsql_thread___max_stmts_cache ||
num_client_zero <= map_size / 10 ||
num_server_zero <= map_size / 10) {
last_purge_time = ct;
unlock();
return;
}
}
// --- Actual purge happens here under wrlock() ---
last_purge_time = ct;
PgSQL_STMT_Global_info *PgSQL_STMT_Manager_v14::find_prepared_statement_by_hash(uint64_t hash, bool lock) {
PgSQL_STMT_Global_info *ret = nullptr; // assume we do not find it
if (lock) {
rdlock();
}
if (auto s = map_stmt_hash_to_info.find(hash); s != map_stmt_hash_to_info.end()) {
ret = s->second;
}
//auto& stat_totals = get_current_thread_statistics_totals();
if (lock) {
unlock();
// Determine how many entries we are allowed to remove
size_t remaining_removals = std::min(map_size, static_cast<size_t>(num_client_zero));
/* Purge logic :
* - Iterate statements while remaining_removals > 0.
* - Candidate removal only when shared_ptr use_count() == 1 (only the map holds it).
* - Return its statement_id to free_stmt_ids stack.
* - Decrement zero-refcount counters.
*/
for (auto it = map_stmt_hash_to_info.begin(); it != map_stmt_hash_to_info.end() && remaining_removals > 0; ) {
auto& global_stmt_info = it->second;
// use_count() == 1 indicates that only map_stmt_hash_to_info holds a reference,
// meaning there are no other references (from client or server) to this prepared statement.
// So we can safely remove this entry.
if (global_stmt_info.use_count() == 1) {
// ref_count_client and ref_count_server should both be 0 in this case
assert(global_stmt_info->ref_count_client.load(std::memory_order_relaxed) == 0);
assert(global_stmt_info->ref_count_server.load(std::memory_order_relaxed) == 0);
// Atomic counters
num_stmt_with_ref_client_count_zero.fetch_sub(1, std::memory_order_relaxed);
num_stmt_with_ref_server_count_zero.fetch_sub(1, std::memory_order_relaxed);
// Free ID
free_stmt_ids.push(global_stmt_info->statement_id);
// Update totals
//stat_totals.s_total -= global_stmt_info->ref_count_server.load(std::memory_order_relaxed);
// Safe erase from map while iterating
it = map_stmt_hash_to_info.erase(it);
remaining_removals--;
} else {
++it;
}
}
return ret;
unlock();
}
PgSQL_STMT_Global_info* PgSQL_STMT_Manager_v14::find_prepared_statement_by_stmt_id(
uint64_t id, bool lock) {
PgSQL_STMT_Global_info*ret = nullptr; // assume we do not find it
std::shared_ptr<const PgSQL_STMT_Global_info> PgSQL_STMT_Manager::find_prepared_statement_by_hash(uint64_t hash, bool lock) {
std::shared_ptr<const PgSQL_STMT_Global_info> ret = nullptr; // assume we do not find it
if (lock) {
rdlock();
}
if (auto s = map_stmt_id_to_info.find(id); s != map_stmt_id_to_info.end()) {
if (auto s = map_stmt_hash_to_info.find(hash); s != map_stmt_hash_to_info.end()) {
ret = s->second;
}
@ -299,64 +334,13 @@ PgSQL_STMT_Global_info* PgSQL_STMT_Manager_v14::find_prepared_statement_by_stmt_
return ret;
}
uint32_t PgSQL_STMTs_local_v14::generate_new_backend_stmt_id() {
assert(is_client_ == false);
if (free_backend_ids.empty() == false) {
uint32_t backend_stmt_id = free_backend_ids.top();
free_backend_ids.pop();
return backend_stmt_id;
}
local_max_stmt_id++;
return local_max_stmt_id;
}
uint64_t PgSQL_STMTs_local_v14::find_global_id_from_stmt_name(const std::string& client_stmt_name) {
uint64_t ret=0;
if (auto s = stmt_name_to_global_ids.find(client_stmt_name); s != stmt_name_to_global_ids.end()) {
ret = s->second;
}
return ret;
}
uint32_t PgSQL_STMTs_local_v14::find_backend_stmt_id_from_global_id(uint64_t global_id) {
if (auto s = global_stmt_to_backend_ids.find(global_id); s != global_stmt_to_backend_ids.end()) {
return s->second;
}
return 0; // not found
}
bool PgSQL_STMTs_local_v14::client_close(const std::string& stmt_name) {
if (auto s = stmt_name_to_global_ids.find(stmt_name); s != stmt_name_to_global_ids.end()) { // found
uint64_t global_stmt_id = s->second;
stmt_name_to_global_ids.erase(s);
GloPgStmt->ref_count_client(global_stmt_id, -1);
std::pair<std::multimap<uint64_t,std::string>::iterator, std::multimap<uint64_t,std::string>::iterator> ret;
ret = global_id_to_stmt_names.equal_range(global_stmt_id);
for (std::multimap<uint64_t, std::string>::iterator it=ret.first; it!=ret.second; ++it) {
if (it->second == stmt_name) {
global_id_to_stmt_names.erase(it);
break;
}
}
return true;
}
return false; // we don't really remove the prepared statement
}
std::shared_ptr<const PgSQL_STMT_Global_info> PgSQL_STMT_Manager::add_prepared_statement(const char* user, const char* database, const char* query,
unsigned int query_len, Parse_Param_Types&& ppt, const char* first_comment, const char* digest_text, uint64_t digest,
PGSQL_QUERY_command PgQueryCmd, bool lock) {
std::shared_ptr<const PgSQL_STMT_Global_info> ret = nullptr;
void PgSQL_STMTs_local_v14::client_close_all() {
for (auto [_, global_stmt_id] : stmt_name_to_global_ids) {
GloPgStmt->ref_count_client(global_stmt_id, -1);
}
stmt_name_to_global_ids.clear();
global_id_to_stmt_names.clear();
}
uint64_t hash = stmt_compute_hash(user, database, query, query_len, ppt); // this identifies the prepared statement
PgSQL_STMT_Global_info* PgSQL_STMT_Manager_v14::add_prepared_statement(
char *u, char *d, char *q, unsigned int ql,
char *fc, Parse_Param_Types&& ppt, bool lock) {
PgSQL_STMT_Global_info *ret = nullptr;
uint64_t hash = stmt_compute_hash(
u, d, q, ql, ppt); // this identifies the prepared statement
if (lock) {
wrlock();
}
@ -374,105 +358,178 @@ PgSQL_STMT_Global_info* PgSQL_STMT_Manager_v14::add_prepared_statement(
next_statement_id++;
}
auto stmt_info = std::make_unique<PgSQL_STMT_Global_info>(next_id, u, d, q, ql, fc, std::move(ppt), hash);
// insert it in both maps
map_stmt_id_to_info.insert(std::make_pair(stmt_info->statement_id, stmt_info.get()));
map_stmt_hash_to_info.insert(std::make_pair(stmt_info->hash, stmt_info.get()));
ret = stmt_info.release();
__sync_add_and_fetch(&num_stmt_with_ref_client_count_zero,1);
__sync_add_and_fetch(&num_stmt_with_ref_server_count_zero,1);
}
if (ret->ref_count_server == 0) {
__sync_sub_and_fetch(&num_stmt_with_ref_server_count_zero,1);
auto stmt_info = std::make_shared<PgSQL_STMT_Global_info>(next_id, user, database, query, query_len, std::move(ppt), first_comment, hash);
if (digest_text) {
stmt_info->digest_text = strdup(digest_text);
stmt_info->digest = digest; // copy digest
stmt_info->PgQueryCmd = PgQueryCmd; // copy PgComQueryCmd
stmt_info->calculate_mem_usage();
}
ret = std::move(stmt_info);
//map_stmt_hash_to_info[ret->hash] = ret;
map_stmt_hash_to_info.emplace(ret->hash, ret);
num_stmt_with_ref_client_count_zero.fetch_add(1, std::memory_order_relaxed);
num_stmt_with_ref_server_count_zero.fetch_add(1, std::memory_order_relaxed);
}
ret->ref_count_server++;
statuses.s_total++;
// Server refcount increment logic
ref_count_server(ret.get(), 1);
if (lock) {
unlock();
}
return ret;
}
void PgSQL_STMT_Manager::ref_count_client(const PgSQL_STMT_Global_info* stmt_info, int _v) noexcept {
assert(stmt_info);
void PgSQL_STMT_Manager_v14::get_memory_usage(uint64_t& prep_stmt_metadata_mem_usage, uint64_t& prep_stmt_backend_mem_usage) {
prep_stmt_backend_mem_usage = 0;
prep_stmt_metadata_mem_usage = sizeof(PgSQL_STMT_Manager_v14);
rdlock();
prep_stmt_metadata_mem_usage += map_stmt_id_to_info.size() * (sizeof(uint64_t) + sizeof(PgSQL_STMT_Global_info*));
prep_stmt_metadata_mem_usage += map_stmt_hash_to_info.size() * (sizeof(uint64_t) + sizeof(PgSQL_STMT_Global_info*));
prep_stmt_metadata_mem_usage += free_stmt_ids.size() * (sizeof(uint64_t));
for (const auto&[key, value] : map_stmt_id_to_info) {
const PgSQL_STMT_Global_info* stmt_global_info = value;
prep_stmt_metadata_mem_usage += stmt_global_info->total_mem_usage;
prep_stmt_metadata_mem_usage += stmt_global_info->ref_count_server * 16; // ~16 bytes of memory utilized by global_stmt_id and stmt_id mappings
prep_stmt_metadata_mem_usage += stmt_global_info->ref_count_client * 40; // ~40 bytes of memory utilized by client_stmt_name and global_stmt_id mappings;
auto& stat_totals = get_current_thread_statistics_totals();
// backend
prep_stmt_backend_mem_usage += stmt_global_info->ref_count_server; // FIXME: add backend memory usage
stat_totals.c_total += _v;
if (_v == 1) {
// increment: relaxed is fine for performance
int prev = stmt_info->ref_count_client.fetch_add(1, std::memory_order_relaxed);
// if prev was 0 -> we transitioned 0 -> 1: one fewer zero-count entry
if (prev == 0) {
num_stmt_with_ref_client_count_zero.fetch_sub(1, std::memory_order_relaxed);
}
}
else if (_v == -1) {
// decrement: use acq_rel to synchronize-with potential deleter
int prev = stmt_info->ref_count_client.fetch_sub(1, std::memory_order_acq_rel);
// prev is the value before subtraction
if (prev == 1) {
// we just transitioned to zero
num_stmt_with_ref_client_count_zero.fetch_add(1, std::memory_order_relaxed);
}
}
else {
// support other increments/decrements (if needed)
int prev = stmt_info->ref_count_client.fetch_add(_v,
(_v > 0) ? std::memory_order_relaxed : std::memory_order_acq_rel);
if (_v > 0 && prev == 0) num_stmt_with_ref_client_count_zero.fetch_sub(1, std::memory_order_relaxed);
if (_v < 0 && prev + _v == 0) num_stmt_with_ref_client_count_zero.fetch_add(1, std::memory_order_relaxed);
}
ref_count_client___purge_stmts_if_needed();
}
void PgSQL_STMT_Manager::ref_count_server(const PgSQL_STMT_Global_info* stmt_info, int _v) noexcept {
assert(stmt_info);
auto& stat_totals = get_current_thread_statistics_totals();
stat_totals.s_total += _v;
if (_v == 1) {
int prev = stmt_info->ref_count_server.fetch_add(1, std::memory_order_relaxed);
if (prev == 0) {
num_stmt_with_ref_server_count_zero.fetch_sub(1, std::memory_order_relaxed);
}
}
else if (_v == -1) {
int prev = stmt_info->ref_count_server.fetch_sub(1, std::memory_order_acq_rel);
if (prev == 1) {
num_stmt_with_ref_server_count_zero.fetch_add(1, std::memory_order_relaxed);
}
}
else {
int prev = stmt_info->ref_count_server.fetch_add(_v,
(_v > 0) ? std::memory_order_relaxed : std::memory_order_acq_rel);
if (_v > 0 && prev == 0) num_stmt_with_ref_server_count_zero.fetch_sub(1, std::memory_order_relaxed);
if (_v < 0 && prev + _v == 0) num_stmt_with_ref_server_count_zero.fetch_add(1, std::memory_order_relaxed);
}
unlock();
}
void PgSQL_STMT_Manager_v14::get_metrics(uint64_t *c_unique, uint64_t *c_total,
uint64_t *stmt_max_stmt_id, uint64_t *cached,
uint64_t *s_unique, uint64_t *s_total) {
void PgSQL_STMT_Manager::get_metrics(uint64_t* c_unique, uint64_t* c_total, uint64_t* stmt_max_stmt_id, uint64_t* cached,
uint64_t* s_unique, uint64_t* s_total) {
#ifdef DEBUG
uint64_t c_u = 0;
uint64_t c_t = 0;
uint64_t m = 0;
//uint64_t m = 0;
uint64_t c = 0;
uint64_t s_u = 0;
uint64_t s_t = 0;
#endif
wrlock();
statuses.cached = map_stmt_id_to_info.size();
statuses.c_unique = statuses.cached - num_stmt_with_ref_client_count_zero;
statuses.s_unique = statuses.cached - num_stmt_with_ref_server_count_zero;
Statistics stats{};
for (int i = 0; i < next_status_idx.load(std::memory_order_relaxed); ++i) {
stats.total.c_total += stats_total[i].c_total;
stats.total.s_total += stats_total[i].s_total;
}
stats.cached = map_stmt_hash_to_info.size();
stats.c_unique = stats.cached - num_stmt_with_ref_client_count_zero.load(std::memory_order_relaxed);
stats.s_unique = stats.cached - num_stmt_with_ref_server_count_zero.load(std::memory_order_relaxed);
#ifdef DEBUG
for (std::map<uint64_t, PgSQL_STMT_Global_info *>::iterator it = map_stmt_id_to_info.begin();
it != map_stmt_id_to_info.end(); ++it) {
const PgSQL_STMT_Global_info *a = it->second;
rdlock();
for (const auto& [_, value] : map_stmt_hash_to_info) {
const PgSQL_STMT_Global_info* a = value.get();
c++;
if (a->ref_count_client) {
if (a->ref_count_client.load(std::memory_order_relaxed)) {
c_u++;
c_t += a->ref_count_client;
c_t += a->ref_count_client.load(std::memory_order_relaxed);
}
if (a->ref_count_server) {
if (a->ref_count_server.load(std::memory_order_relaxed)) {
s_u++;
s_t += a->ref_count_server;
}
if (it->first > m) {
m = it->first;
s_t += a->ref_count_server.load(std::memory_order_relaxed);
}
//if (it->first > m) {
// m = it->first;
//}
}
assert (c_u == statuses.c_unique);
assert (c_t == statuses.c_total);
assert (c == statuses.cached);
assert (s_t == statuses.s_total);
assert (s_u == statuses.s_unique);
*stmt_max_stmt_id = m;
#endif
*stmt_max_stmt_id = next_statement_id; // this is max stmt_id, no matter if in used or not
*c_unique = statuses.c_unique;
*c_total = statuses.c_total;
*cached = statuses.cached;
*s_total = statuses.s_total;
*s_unique = statuses.s_unique;
unlock();
assert(c_u == stats.c_unique);
assert(c_t == stats.total.c_total);
assert(c == stats.cached);
assert(s_t == stats.total.s_total);
assert(s_u == stats.s_unique);
//*stmt_max_stmt_id = m;
#endif
* stmt_max_stmt_id = next_statement_id; // this is max stmt_id, no matter if in used or not
*c_unique = stats.c_unique;
*c_total = stats.total.c_total;
*cached = stats.cached;
*s_total = stats.total.s_total;
*s_unique = stats.s_unique;
}
void PgSQL_STMT_Manager::get_memory_usage(uint64_t& prep_stmt_metadata_mem_usage, uint64_t& prep_stmt_backend_mem_usage) {
prep_stmt_backend_mem_usage = 0;
prep_stmt_metadata_mem_usage = sizeof(PgSQL_STMT_Manager);
rdlock();
prep_stmt_metadata_mem_usage += map_stmt_hash_to_info.size() * (sizeof(uint64_t) + sizeof(PgSQL_STMT_Global_info*));
prep_stmt_metadata_mem_usage += free_stmt_ids.size() * (sizeof(uint64_t));
for (const auto& [_, value] : map_stmt_hash_to_info) {
const PgSQL_STMT_Global_info* stmt_global_info = value.get();
prep_stmt_metadata_mem_usage += stmt_global_info->total_mem_usage;
prep_stmt_metadata_mem_usage += stmt_global_info->ref_count_server * ((sizeof(uint64_t) * 2) + sizeof(std::shared_ptr<PgSQL_STMT_Global_info>));
prep_stmt_metadata_mem_usage += stmt_global_info->ref_count_client * (sizeof(std::string) + 16 + sizeof(std::shared_ptr<PgSQL_STMT_Global_info>));
// backend
prep_stmt_backend_mem_usage += stmt_global_info->ref_count_server; // FIXME: add backend memory usage
}
unlock();
}
class PgSQL_PS_global_stats {
public:
public:
uint64_t statement_id;
char *username;
char *dbname;
char* username;
char* dbname;
uint64_t digest;
unsigned long long ref_count_client;
unsigned long long ref_count_server;
char *query;
char* query;
int num_param_types;
PgSQL_PS_global_stats(uint64_t stmt_id, const char *d, const char *u, uint64_t dig, const char *q,
PgSQL_PS_global_stats(uint64_t stmt_id, const char* d, const char* u, uint64_t dig, const char* q,
unsigned long long ref_c, unsigned long long ref_s, int params) {
statement_id = stmt_id;
digest = dig;
@ -484,38 +541,38 @@ class PgSQL_PS_global_stats {
num_param_types = params;
}
~PgSQL_PS_global_stats() {
if (query)
if (query)
free(query);
if (username)
free(username);
if (dbname)
free(dbname);
}
char **get_row() {
char** get_row() {
char buf[128];
char **pta=(char **)malloc(sizeof(char *)*PS_GLOBAL_STATUS_FIELD_NUM);
snprintf(buf,sizeof(buf),"%lu",statement_id);
pta[0]=strdup(buf);
char** pta = (char**)malloc(sizeof(char*) * PS_GLOBAL_STATUS_FIELD_NUM);
snprintf(buf, sizeof(buf), "%lu", statement_id);
pta[0] = strdup(buf);
assert(dbname);
pta[1]=strdup(dbname);
pta[1] = strdup(dbname);
assert(username);
pta[2]=strdup(username);
snprintf(buf,sizeof(buf),"0x%016llX", (long long unsigned int)digest);
pta[3]=strdup(buf);
pta[2] = strdup(username);
snprintf(buf, sizeof(buf), "0x%016llX", (long long unsigned int)digest);
pta[3] = strdup(buf);
assert(query);
pta[4]=strdup(query);
snprintf(buf,sizeof(buf),"%llu",ref_count_client);
pta[5]=strdup(buf);
snprintf(buf,sizeof(buf),"%llu",ref_count_server);
pta[6]=strdup(buf);
snprintf(buf,sizeof(buf),"%d",num_param_types);
pta[7]=strdup(buf);
pta[4] = strdup(query);
snprintf(buf, sizeof(buf), "%llu", ref_count_client);
pta[5] = strdup(buf);
snprintf(buf, sizeof(buf), "%llu", ref_count_server);
pta[6] = strdup(buf);
snprintf(buf, sizeof(buf), "%d", num_param_types);
pta[7] = strdup(buf);
return pta;
}
void free_row(char **pta) {
void free_row(char** pta) {
int i;
for (i=0;i<PS_GLOBAL_STATUS_FIELD_NUM;i++) {
for (i = 0; i < PS_GLOBAL_STATUS_FIELD_NUM; i++) {
assert(pta[i]);
free(pta[i]);
}
@ -523,27 +580,27 @@ class PgSQL_PS_global_stats {
}
};
SQLite3_result* PgSQL_STMT_Manager_v14::get_prepared_statements_global_infos() {
SQLite3_result* PgSQL_STMT_Manager::get_prepared_statements_global_infos() {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 4, "Dumping current prepared statements global info\n");
auto result = std::make_unique<SQLite3_result>(PS_GLOBAL_STATUS_FIELD_NUM);
result->add_column_definition(SQLITE_TEXT,"stmt_id");
result->add_column_definition(SQLITE_TEXT,"database");
result->add_column_definition(SQLITE_TEXT,"username");
result->add_column_definition(SQLITE_TEXT,"digest");
result->add_column_definition(SQLITE_TEXT,"query");
result->add_column_definition(SQLITE_TEXT,"ref_count_client");
result->add_column_definition(SQLITE_TEXT,"ref_count_server");
result->add_column_definition(SQLITE_TEXT,"num_param_types");
result->add_column_definition(SQLITE_TEXT, "stmt_id");
result->add_column_definition(SQLITE_TEXT, "database");
result->add_column_definition(SQLITE_TEXT, "username");
result->add_column_definition(SQLITE_TEXT, "digest");
result->add_column_definition(SQLITE_TEXT, "query");
result->add_column_definition(SQLITE_TEXT, "ref_count_client");
result->add_column_definition(SQLITE_TEXT, "ref_count_server");
result->add_column_definition(SQLITE_TEXT, "num_param_types");
rdlock();
for (auto it = map_stmt_id_to_info.begin(); it != map_stmt_id_to_info.end(); ++it) {
PgSQL_STMT_Global_info *stmt_global_info = it->second;
for (auto it = map_stmt_hash_to_info.begin(); it != map_stmt_hash_to_info.end(); ++it) {
const PgSQL_STMT_Global_info* stmt_global_info = it->second.get();
auto pgs = std::make_unique<PgSQL_PS_global_stats>(stmt_global_info->statement_id,
stmt_global_info->dbname, stmt_global_info->username, stmt_global_info->hash, stmt_global_info->query,
stmt_global_info->ref_count_client, stmt_global_info->ref_count_server, stmt_global_info->parse_param_types.size());
char **pta = pgs->get_row();
stmt_global_info->ref_count_client.load(std::memory_order_relaxed), stmt_global_info->ref_count_server.load(std::memory_order_relaxed),
stmt_global_info->parse_param_types.size());
char** pta = pgs->get_row();
result->add_row(pta);
pgs->free_row(pta);
}

@ -89,7 +89,7 @@ extern PgSQL_Authentication* GloPgAuth;
extern MySQL_LDAP_Authentication* GloMyLdapAuth;
extern ProxySQL_Admin* GloAdmin;
extern PgSQL_Logger* GloPgSQL_Logger;
extern PgSQL_STMT_Manager_v14* GloPgStmt;
extern PgSQL_STMT_Manager* GloPgStmt;
extern SQLite3_Server* GloSQLite3Server;
@ -506,7 +506,10 @@ void PgSQL_Session::generate_proxysql_internal_session_json(json& j) {
}
//j["conn"]["no_backslash_escapes"] = client_myds->myconn->options.no_backslash_escapes;
//j["conn"]["status"]["compression"] = client_myds->myconn->get_status(STATUS_PGSQL_CONNECTION_COMPRESSION);
j["conn"]["ps"]["stmt_name_to_global_ids"] = client_myds->myconn->local_stmts->stmt_name_to_global_ids;
json& stmt_name_to_global_ids = j["client"]["ps"]["stmt_name_to_global_ids"];
for (const auto& [stmt_name, global_stmt_info] : client_myds->myconn->local_stmts->stmt_name_to_global_info) {
stmt_name_to_global_ids[stmt_name.c_str()] = global_stmt_info->statement_id;
}
//j["conn"]["ps"]["global_id_to_stmt_names"] = client_myds->myconn->local_stmts->global_id_to_stmt_names;
const PgSQL_Conn_Param& conn_params = client_myds->myconn->conn_params;
@ -550,11 +553,8 @@ void PgSQL_Session::generate_proxysql_internal_session_json(json& j) {
j["backends"][i]["conn"]["questions"] = _myconn->statuses.questions;
j["backends"][i]["conn"]["pgconnpoll_get"] = _myconn->statuses.pgconnpoll_get;
j["backends"][i]["conn"]["pgconnpoll_put"] = _myconn->statuses.pgconnpoll_put;
//j["backend"][i]["conn"]["charset"] = _myds->myconn->options.charset; // not used for backend
//j["backends"][i]["conn"]["session_track_gtids"] = (_myconn->options.session_track_gtids ? _myconn->options.session_track_gtids : "");
j["backends"][i]["conn"]["init_connect"] = (_myconn->options.init_connect ? _myconn->options.init_connect : "");
j["backends"][i]["conn"]["init_connect_sent"] = _myds->myconn->options.init_connect_sent;
//j["backends"][i]["conn"]["standard_conforming_strings"] = _myconn->options.no_backslash_escapes;
j["backends"][i]["conn"]["status"]["advisory_lock"] = _myconn->get_status(STATUS_PGSQL_CONNECTION_ADVISORY_LOCK);
j["backends"][i]["conn"]["status"]["advisory_xact_lock"] = _myconn->get_status(STATUS_PGSQL_CONNECTION_ADVISORY_XACT_LOCK);
j["backends"][i]["conn"]["status"]["lock_tables"] = _myconn->get_status(STATUS_PGSQL_CONNECTION_LOCK_TABLES);
@ -578,12 +578,13 @@ void PgSQL_Session::generate_proxysql_internal_session_json(json& j) {
}
j["backends"][i]["conn"]["MultiplexDisabled_ext"] = multiplex_disabled;
}
j["backends"][i]["conn"]["ps"]["backend_stmt_to_global_ids"] = _myconn->local_stmts->backend_stmt_to_global_ids;
json& backend_stmt_to_global_ids = j["backends"][i]["conn"]["ps"]["backend_stmt_to_global_ids"];
for (const auto& [backend_stmt, global_stmt_info] : client_myds->myconn->local_stmts->backend_stmt_to_global_info) {
backend_stmt_to_global_ids[backend_stmt] = global_stmt_info->statement_id;
}
j["backends"][i]["conn"]["ps"]["global_stmt_to_backend_ids"] = _myconn->local_stmts->global_stmt_to_backend_ids;
//j["backends"][i]["conn"]["client_flag"]["value"] = _myconn->options.client_flag;
//j["backends"][i]["conn"]["client_flag"]["client_found_rows"] = (_myconn->options.client_flag & CLIENT_FOUND_ROWS ? 1 : 0);
//j["backends"][i]["conn"]["client_flag"]["client_multi_statements"] = (_myconn->options.client_flag & CLIENT_MULTI_STATEMENTS ? 1 : 0);
//j["backends"][i]["conn"]["client_flag"]["client_deprecate_eof"] = (_myconn->options.client_flag & CLIENT_DEPRECATE_EOF ? 1 : 0);
if (_myconn->is_connected()) {
sprintf(buff, "%p", _myconn->get_pg_connection());
j["backends"][i]["conn"]["pgsql"]["address"] = buff;
@ -5643,36 +5644,31 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg
}
}
// if the same statement name is used, we drop it
PgSQL_STMTs_local_v14* local_stmts = client_myds->myconn->local_stmts;
std::string stmt_name(extended_query_info.stmt_client_name);
// If a client provides a statement name that already exists in the local map,
// validate whether it can be reused. Only the *unnamed* statement ("") may be redefined.
PgSQL_STMT_Local* local_stmts = client_myds->myconn->local_stmts;
std::string client_stmt_name(extended_query_info.stmt_client_name);
if (auto it = local_stmts->stmt_name_to_global_ids.find(stmt_name);
it != local_stmts->stmt_name_to_global_ids.end()) {
// Try to find an existing statement entry for this name in Local map
auto local_stmt_info_itr = local_stmts->stmt_name_to_global_info.find(client_stmt_name);
if (!stmt_name.empty()) {
const std::string& errmsg = "prepared statement \"" + stmt_name + "\" already exist";
// ----------------------------------------------------------------------
// 1) Reject redefinition of a named prepared statement
// (only the empty statement name is allowed to be overwritten).
// ----------------------------------------------------------------------
if (local_stmt_info_itr != local_stmts->stmt_name_to_global_info.end()) {
if (!client_stmt_name.empty()) {
const std::string& errmsg = "prepared statement \"" + client_stmt_name + "\" already exist";
handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_DUPLICATE_PSTATEMENT,
errmsg.c_str(), false);
l_free(parse_pkt.size, parse_pkt.ptr);
return 2;
}
uint64_t global_id = it->second;
auto range = local_stmts->global_id_to_stmt_names.equal_range(global_id);
for (auto iter = range.first; iter != range.second; ++iter) {
if (iter->second == stmt_name) {
local_stmts->global_id_to_stmt_names.erase(iter);
break;
}
}
local_stmts->stmt_name_to_global_ids.erase(it);
local_stmts->client_close(stmt_name);
}
// Hash the query
// ----------------------------------------------------------------------
// 2) Compute hash for current query
// ----------------------------------------------------------------------
uint64_t hash = local_stmts->compute_hash(
client_myds->myconn->userinfo->username,
client_myds->myconn->userinfo->dbname,
@ -5681,24 +5677,63 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg
CurrentQuery.extended_query_info.parse_param_types
);
// Check global statement cache
GloPgStmt->wrlock();
PgSQL_STMT_Global_info* stmt_info = GloPgStmt->find_prepared_statement_by_hash(hash, false);
// ----------------------------------------------------------------------
// 3) Local-cache fast path:
// If the client already prepared this same SQL under the same name,
// and the hash matches, reuse the existing global statement.
// ----------------------------------------------------------------------
if (local_stmt_info_itr != local_stmts->stmt_name_to_global_info.end()) {
auto& local_stmt_info = local_stmt_info_itr->second;
// Exact match found; treat as a parse success and continue normally.
if (local_stmt_info && local_stmt_info->hash == hash) {
extended_query_info.stmt_global_id = local_stmt_info->statement_id;
client_myds->setDSS_STATE_QUERY_SENT_NET();
char txn_state = NumActiveTransactions() > 0 ? 'T' : 'I';
bool send_ready_packet = is_extended_query_ready_for_query();
client_myds->myprot.generate_parse_completion_packet(true, send_ready_packet, txn_state);
RequestEnd(NULL, false);
l_free(parse_pkt.size, parse_pkt.ptr);
return 0;
}
}
// ----------------------------------------------------------------------
// 4) Global-cache lookup:
// If another session already prepared an identical statement (same hash),
// link the local name to that shared global entry.
// ----------------------------------------------------------------------
auto stmt_info = GloPgStmt->find_prepared_statement_by_hash(hash);
if (stmt_info) {
local_stmts->client_insert(stmt_info->statement_id, stmt_name);
std::shared_ptr<const PgSQL_STMT_Global_info>* local_stmt_info_ptr = nullptr;
if (local_stmt_info_itr != local_stmts->stmt_name_to_global_info.end()) {
local_stmt_info_ptr = &local_stmt_info_itr->second; // reference to shared_ptr inside map
}
local_stmts->client_insert(stmt_info, client_stmt_name, local_stmt_info_ptr);
extended_query_info.stmt_global_id = stmt_info->statement_id;
GloPgStmt->unlock();
client_myds->setDSS_STATE_QUERY_SENT_NET();
char txn_state = NumActiveTransactions() > 0 ? 'T' : 'I';
bool send_ready_packet = is_extended_query_ready_for_query();
client_myds->myprot.generate_parse_completion_packet(true, send_ready_packet, txn_state);
//LogQuery(nullptr);
//CurrentQuery.end_time = thread->curtime;
RequestEnd(NULL, false);
l_free(parse_pkt.size, parse_pkt.ptr);
return 0;
}
GloPgStmt->unlock();
// ----------------------------------------------------------------------
// 5) The local name is being reused but does not match the SQL/hash.
// Clean up the old entry before creating a new global statement later.
// ----------------------------------------------------------------------
if (local_stmt_info_itr != local_stmts->stmt_name_to_global_info.end()) {
auto& local_stmt_info = local_stmt_info_itr->second;
// Decrement global reference and remove stale local pointer
if (local_stmt_info) {
GloPgStmt->ref_count_client(local_stmt_info.get(), -1);
local_stmt_info.reset();
}
local_stmts->stmt_name_to_global_info.erase(local_stmt_info_itr);
}
if (extended_query_frame.empty() == true) {
extended_query_info.flags |= PGSQL_EXTENDED_QUERY_FLAG_SYNC;
@ -5774,29 +5809,21 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des
}
assert(stmt_client_name);
uint64_t stmt_global_id = client_myds->myconn->local_stmts->find_global_id_from_stmt_name(stmt_client_name);
if (stmt_global_id == 0) {
// Look up an existing local statement info for client-provided statement name
const PgSQL_STMT_Global_info* stmt_info = client_myds->myconn->local_stmts->find_stmt_info_from_stmt_name(stmt_client_name);
if (!stmt_info) {
const std::string& errmsg = stmt_client_name[0] != '\0' ? ("prepared statement \"" + std::string(stmt_client_name) + "\" does not exist") :
"unnamed prepared statement does not exist";
handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, errmsg.c_str(), false);
return 2;
}
// now we get the statement information
PgSQL_STMT_Global_info* stmt_info = GloPgStmt->find_prepared_statement_by_stmt_id(stmt_global_id);
if (stmt_info == NULL) {
// we couldn't find it
const std::string& errmsg = stmt_client_name[0] != '\0' ? ("prepared statement \"" + std::string(stmt_client_name) + "\" does not exist") :
"unnamed prepared statement does not exist";
handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, errmsg.c_str(), false);
return 2;
}
// describe_msg memory will be freed in pgsql_real_query.end()
// CurrentQuery.stmt_client_name may briefly become a dangling pointer until CurrentQuery.end() is invoked
PgSQL_Extended_Query_Info& extended_query_info = CurrentQuery.extended_query_info;
extended_query_info.stmt_client_name = stmt_client_name;
extended_query_info.stmt_client_portal_name = portal_name;
extended_query_info.stmt_global_id = stmt_global_id;
extended_query_info.stmt_global_id = stmt_info->statement_id;
extended_query_info.stmt_info = stmt_info;
extended_query_info.stmt_type = stmt_type;
CurrentQuery.start_time = thread->curtime;
@ -5921,18 +5948,9 @@ int PgSQL_Session::handle_post_sync_bind_message(PgSQL_Bind_Message* bind_msg) {
return 2;
}
uint64_t stmt_global_id = client_myds->myconn->local_stmts->find_global_id_from_stmt_name(stmt_client_name);
if (stmt_global_id == 0) {
const std::string& errmsg = stmt_client_name[0] != '\0' ? ("prepared statement \"" + std::string(stmt_client_name) + "\" does not exist") :
"unnamed prepared statement does not exist";
handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, errmsg.c_str(), false);
return 2;
}
// now we get the statement information
PgSQL_STMT_Global_info* stmt_info = GloPgStmt->find_prepared_statement_by_stmt_id(stmt_global_id);
if (stmt_info == NULL) {
// we couldn't find it
// Look up an existing local statement info for client-provided statement name
const PgSQL_STMT_Global_info* stmt_info = client_myds->myconn->local_stmts->find_stmt_info_from_stmt_name(stmt_client_name);
if (!stmt_info) {
const std::string& errmsg = stmt_client_name[0] != '\0' ? ("prepared statement \"" + std::string(stmt_client_name) + "\" does not exist") :
"unnamed prepared statement does not exist";
handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, errmsg.c_str(), false);
@ -5942,7 +5960,7 @@ int PgSQL_Session::handle_post_sync_bind_message(PgSQL_Bind_Message* bind_msg) {
PgSQL_Extended_Query_Info& extended_query_info = CurrentQuery.extended_query_info;
extended_query_info.stmt_client_name = stmt_client_name;
extended_query_info.stmt_client_portal_name = portal_name;
extended_query_info.stmt_global_id = stmt_global_id;
extended_query_info.stmt_global_id = stmt_info->statement_id;
extended_query_info.stmt_info = stmt_info;
CurrentQuery.start_time = thread->curtime;
@ -6033,18 +6051,10 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu
// bind_waiting_for_execute will be released on CurrentQuery.end() call or session destory
const char* stmt_client_name = bind_waiting_for_execute->data().stmt_name;
uint64_t stmt_global_id = client_myds->myconn->local_stmts->find_global_id_from_stmt_name(stmt_client_name);
if (stmt_global_id == 0) {
const std::string& errmsg = stmt_client_name[0] != '\0' ? ("prepared statement \"" + std::string(stmt_client_name) + "\" does not exist") :
"unnamed prepared statement does not exist";
handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, errmsg.c_str(), false);
return 2;
}
// now we get the statement information
PgSQL_STMT_Global_info* stmt_info = GloPgStmt->find_prepared_statement_by_stmt_id(stmt_global_id);
if (stmt_info == NULL) {
// we couldn't find it
// Look up an existing local statement info for client-provided statement name
const PgSQL_STMT_Global_info* stmt_info = client_myds->myconn->local_stmts->find_stmt_info_from_stmt_name(stmt_client_name);
if (!stmt_info) {
const std::string& errmsg = stmt_client_name[0] != '\0' ? ("prepared statement \"" + std::string(stmt_client_name) + "\" does not exist") :
"unnamed prepared statement does not exist";
handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, errmsg.c_str(), false);
@ -6054,7 +6064,7 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu
PgSQL_Extended_Query_Info& extended_query_info = CurrentQuery.extended_query_info;
extended_query_info.stmt_client_portal_name = portal_name;
extended_query_info.stmt_client_name = stmt_client_name;
extended_query_info.stmt_global_id = stmt_global_id;
extended_query_info.stmt_global_id = stmt_info->statement_id;
extended_query_info.stmt_info = stmt_info;
extended_query_info.bind_msg = bind_waiting_for_execute.get();
extended_query_info.flags |= execute_msg->send_describe_portal_result ?
@ -6371,32 +6381,24 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_E
bool PgSQL_Session::handler___rc0_PROCESSING_STMT_PREPARE(enum session_status& st, PgSQL_Data_Stream* myds) {
thread->status_variables.stvar[st_var_backend_stmt_prepare]++;
uint64_t global_stmtid;
PgSQL_STMT_Global_info* stmt_info = NULL;
GloPgStmt->wrlock();
stmt_info = GloPgStmt->add_prepared_statement(
(char*)client_myds->myconn->userinfo->username,
(char*)client_myds->myconn->userinfo->dbname,
(char*)CurrentQuery.QueryPointer,
auto stmt_info = GloPgStmt->add_prepared_statement(
client_myds->myconn->userinfo->username,
client_myds->myconn->userinfo->dbname,
(const char*)CurrentQuery.QueryPointer,
CurrentQuery.QueryLength,
CurrentQuery.QueryParserArgs.first_comment,
std::move(CurrentQuery.extended_query_info.parse_param_types),
false);
CurrentQuery.QueryParserArgs.first_comment,
CurrentQuery.QueryParserArgs.digest_text,
CurrentQuery.QueryParserArgs.digest,
CurrentQuery.PgQueryCmd
);
assert(stmt_info); // GloPgStmt->add_prepared_statement() should always return a valid pointer
if (CurrentQuery.QueryParserArgs.digest_text) {
if (stmt_info->digest_text == NULL) {
stmt_info->digest_text = strdup(CurrentQuery.QueryParserArgs.digest_text);
stmt_info->digest = CurrentQuery.QueryParserArgs.digest; // copy digest
stmt_info->PgQueryCmd = CurrentQuery.PgQueryCmd; // copy PgComQueryCmd
stmt_info->calculate_mem_usage();
}
}
PgSQL_Extended_Query_Info& extended_query_info = CurrentQuery.extended_query_info;
extended_query_info.stmt_info = stmt_info;
global_stmtid = stmt_info->statement_id;
myds->myconn->local_stmts->backend_insert(global_stmtid, extended_query_info.stmt_backend_id);
extended_query_info.stmt_info = stmt_info.get();
myds->myconn->local_stmts->backend_insert(stmt_info, extended_query_info.stmt_backend_id);
st = status;
if (previous_status.empty() == false) {
@ -6405,15 +6407,18 @@ bool PgSQL_Session::handler___rc0_PROCESSING_STMT_PREPARE(enum session_status& s
myds->DSS = STATE_MARIADB_GENERIC;
st = previous_status.top();
previous_status.pop();
GloPgStmt->unlock();
return true;
}
// We only perform the client_insert when there is no previous status, this
// is, when 'PROCESSING_STMT_PREPARE' is reached directly without transitioning from a previous status
// like 'PROCESSING_STMT_EXECUTE'.
assert(extended_query_info.stmt_client_name);
client_myds->myconn->local_stmts->client_insert(global_stmtid, extended_query_info.stmt_client_name);
GloPgStmt->unlock();
#ifdef DEBUG
auto* stmt_info_dbg = client_myds->myconn->local_stmts->find_stmt_info_from_stmt_name(extended_query_info.stmt_client_name);
assert(stmt_info_dbg == nullptr);
#endif
client_myds->myconn->local_stmts->client_insert(stmt_info, extended_query_info.stmt_client_name, nullptr);
return false;
}

@ -40,7 +40,7 @@ extern ProxySQL_Admin *GloAdmin;
extern MySQL_Threads_Handler *GloMTH;
extern PgSQL_Threads_Handler* GloPTH;
extern MySQL_STMT_Manager_v14 *GloMyStmt;
extern PgSQL_STMT_Manager_v14* GloPgStmt;
extern PgSQL_STMT_Manager* GloPgStmt;
extern MySQL_Query_Processor* GloMyQPro;
extern PgSQL_Query_Processor* GloPgQPro;
extern ProxySQL_Cluster *GloProxyCluster;

@ -479,7 +479,7 @@ MySQL_Threads_Handler *GloMTH = NULL;
PgSQL_Threads_Handler* GloPTH = NULL;
Web_Interface *GloWebInterface;
MySQL_STMT_Manager_v14 *GloMyStmt;
PgSQL_STMT_Manager_v14 *GloPgStmt;
PgSQL_STMT_Manager *GloPgStmt;
MySQL_Monitor *GloMyMon;
PgSQL_Monitor *GloPgMon;
@ -923,7 +923,7 @@ void ProxySQL_Main_init_main_modules() {
GloPgSQL_Logger = new PgSQL_Logger();
GloPgSQL_Logger->print_version();
GloMyStmt=new MySQL_STMT_Manager_v14();
GloPgStmt=new PgSQL_STMT_Manager_v14();
GloPgStmt=new PgSQL_STMT_Manager();
PgHGM = new PgSQL_HostGroups_Manager();
PgHGM->init();
PgSQL_Threads_Handler* _tmp_GloPTH = NULL;

Loading…
Cancel
Save