Initial work to write Base_Session

v2.x_pg_PrepStmtBase_240714
Rene Cannao 2 years ago
parent a1fd4af1a6
commit ae1e088d82

@ -1,8 +1,19 @@
#ifndef CLASS_BASE_SESSION_H
#define CLASS_BASE_SESSION_H
class Base_Session;
//// avoid loading definition of MySQL_Session and PgSQL_Session
//#define __CLASS_MYSQL_SESSION_H
//#define __CLASS_PGSQL_SESSION_H
#include "Client_Session.h"
#include "proxysql.h"
#include "cpp.h"
#ifndef CLASS_BASE_SESSION_H
#define CLASS_BASE_SESSION_H
class MySQL_STMTs_meta;
class StmtLongDataHandler;
class MySQL_Session;
class PgSQL_Session;
@ -10,6 +21,79 @@ class Base_Session {
public:
Base_Session();
~Base_Session();
// uint64_t
unsigned long long start_time;
unsigned long long pause_until;
unsigned long long idle_since;
unsigned long long transaction_started_at;
PtrArray *mybes;
/*
* @brief Store the hostgroups that hold connections that have been flagged as 'expired' by the
* maintenance thread. These values will be used to release the retained connections in the specific
* hostgroups in housekeeping operations, before client packet processing. Currently 'housekeeping_before_pkts'.
*/
std::vector<int32_t> hgs_expired_conns {};
char * default_schema;
char * user_attributes;
//this pointer is always initialized inside handler().
// it is an attempt to start simplifying the complexing of handler()
uint32_t thread_session_id;
unsigned long long last_insert_id;
int last_HG_affected_rows;
enum session_status status;
int healthy;
int user_max_connections;
int current_hostgroup;
int default_hostgroup;
int previous_hostgroup;
/**
* @brief Charset directly specified by the client. Supplied and updated via 'HandshakeResponse'
* and 'COM_CHANGE_USER' packets.
* @details Used when session needs to be restored via 'COM_RESET_CONNECTION'.
*/
int default_charset;
int locked_on_hostgroup;
int next_query_flagIN;
int mirror_hostgroup;
int mirror_flagOUT;
unsigned int active_transactions;
int autocommit_on_hostgroup;
int transaction_persistent_hostgroup;
int to_process;
int pending_connect;
enum proxysql_session_type session_type;
int warning_in_hg;
// bool
bool autocommit;
bool autocommit_handled;
bool sending_set_autocommit;
bool killed;
bool locked_on_hostgroup_and_all_variables_set;
//bool admin;
bool max_connections_reached;
bool client_authenticated;
bool connections_handler;
bool mirror;
//bool stats;
bool schema_locked;
bool transaction_persistent;
bool session_fast_forward;
bool started_sending_data_to_client; // this status variable tracks if some result set was sent to the client, or if proxysql is still buffering everything
bool use_ssl;
MySQL_STMTs_meta *sess_STMTs_meta;
StmtLongDataHandler *SLDH;
template <typename T> void init();
template<typename B, typename S> B * find_backend(int hostgroup_id);
template<typename B, typename S, typename D> B * create_backend(int, D * _myds = NULL);
template<typename B, typename S, typename D> B * find_or_create_backend(int, D * _myds = NULL);
};
#endif // CLASS_BASE_SESSION_H

@ -118,17 +118,6 @@ public:
};
#define TO_CONNECTION_INFO(connection_info) Connection_Info_T<decltype(connection_info)>(connection_info)
enum proxysql_session_type {
PROXYSQL_SESSION_MYSQL,
PROXYSQL_SESSION_ADMIN,
PROXYSQL_SESSION_STATS,
PROXYSQL_SESSION_SQLITE,
PROXYSQL_SESSION_CLICKHOUSE,
PROXYSQL_SESSION_MYSQL_EMU,
PROXYSQL_SESSION_PGSQL,
PROXYSQL_SESSION_NONE
};
std::string proxysql_session_type_str(enum proxysql_session_type session_type);
#endif /* __CLASS_CLIENT_SESSION_H */

@ -3,7 +3,7 @@
* @brief Declaration of the MySQL_Session class and associated types and enums.
*/
#ifdef CLASS_BASE_SESSION_H
#ifndef __CLASS_MYSQL_SESSION_H
#define __CLASS_MYSQL_SESSION_H
@ -14,6 +14,7 @@
#include "proxysql.h"
#include "cpp.h"
#include "MySQL_Variables.h"
#include "Base_Session.h"
#ifndef PROXYJSON
#define PROXYJSON
@ -51,7 +52,7 @@ enum ps_type : uint8_t {
std::string proxysql_session_type_str(enum proxysql_session_type session_type);
//std::string proxysql_session_type_str(enum proxysql_session_type session_type);
/**
* @class Query_Info
@ -105,7 +106,7 @@ class Query_Info {
* This class is central to ProxySQL's handling of client connections. It manages the lifecycle
* of a session, processes queries, and communicates with backend MySQL servers.
*/
class MySQL_Session
class MySQL_Session: public Base_Session
{
private:
//int handler_ret;
@ -214,7 +215,7 @@ class MySQL_Session
bool handler_again___status_SETTING_MULTI_STMT(int *_rc);
bool handler_again___multiple_statuses(int *rc);
void init();
//void init();
void reset();
void add_ldap_comment_to_pkt(PtrSize_t *);
/**
@ -271,28 +272,31 @@ class MySQL_Session
bool handler_again___status_SETTING_GENERIC_VARIABLE(int *_rc, const char *var_name, const char *var_value, bool no_quote=false, bool set_transaction=false);
bool handler_again___status_SETTING_SQL_LOG_BIN(int *);
std::stack<enum session_status> previous_status;
void * operator new(size_t);
void operator delete(void *);
Query_Info CurrentQuery;
PtrSize_t mirrorPkt;
PtrSize_t pkt;
#if 0
// uint64_t
unsigned long long start_time;
unsigned long long pause_until;
unsigned long long idle_since;
unsigned long long transaction_started_at;
#endif // 0
// pointers
MySQL_Thread *thread;
Query_Processor_Output *qpo;
StatCounters *command_counters;
MySQL_Backend *mybe;
#if 0
PtrArray *mybes;
#endif // 0
MySQL_Data_Stream *client_myds;
MySQL_Data_Stream *server_myds;
#if 0
/*
* @brief Store the hostgroups that hold connections that have been flagged as 'expired' by the
* maintenance thread. These values will be used to release the retained connections in the specific
@ -349,6 +353,7 @@ class MySQL_Session
bool session_fast_forward;
bool started_sending_data_to_client; // this status variable tracks if some result set was sent to the client, or if proxysql is still buffering everything
bool use_ssl;
#endif // 0
/**
* @brief This status variable tracks whether the session is performing an
* 'Auth Switch' due to a 'COM_CHANGE_USER' packet.
@ -366,8 +371,8 @@ class MySQL_Session
//uint64_t gtid_trxid;
int gtid_hid;
MySQL_STMTs_meta *sess_STMTs_meta;
StmtLongDataHandler *SLDH;
// MySQL_STMTs_meta *sess_STMTs_meta;
// StmtLongDataHandler *SLDH;
Session_Regex **match_regexes;
@ -386,9 +391,9 @@ class MySQL_Session
int handler();
void (*handler_function) (Client_Session<MySQL_Session*> arg, void *, PtrSize_t *pkt);
MySQL_Backend * find_backend(int);
MySQL_Backend * create_backend(int, MySQL_Data_Stream *_myds=NULL);
MySQL_Backend * find_or_create_backend(int, MySQL_Data_Stream *_myds=NULL);
//MySQL_Backend * find_backend(int);
//MySQL_Backend * create_backend(int, MySQL_Data_Stream *_myds=NULL);
//MySQL_Backend * find_or_create_backend(int, MySQL_Data_Stream *_myds=NULL);
void SQLite3_to_MySQL(SQLite3_result *, char *, int , MySQL_Protocol *, bool in_transaction=false, bool deprecate_eof_active=false);
void MySQL_Result_to_MySQL_wire(MYSQL *mysql, MySQL_ResultSet *MyRS, unsigned int warning_count, MySQL_Data_Stream *_myds=NULL);
@ -469,3 +474,4 @@ private:
void * kill_query_thread(void *arg);
#endif /* __CLASS_MYSQL_SESSION_ H */
#endif // CLASS_BASE_SESSION_H

@ -1,3 +1,5 @@
#ifdef CLASS_BASE_SESSION_H
#ifndef __CLASS_PGSQL_SESSION_H
#define __CLASS_PGSQL_SESSION_H
@ -9,6 +11,7 @@
#include "Client_Session.h"
#include "cpp.h"
#include "PgSQL_Variables.h"
#include "Base_Session.h"
class PgSQL_Query_Result;
@ -43,7 +46,7 @@ enum PgSQL_ps_type : uint8_t {
std::string proxysql_session_type_str(enum proxysql_session_type session_type);
//std::string proxysql_session_type_str(enum proxysql_session_type session_type);
// these structs will be used for various regex hardcoded
// their initial use will be for sql_log_bin , sql_mode and time_zone
@ -201,7 +204,7 @@ private:
bool handler_again___status_CHANGING_AUTOCOMMIT(int*);
bool handler_again___status_SETTING_MULTI_STMT(int* _rc);
bool handler_again___multiple_statuses(int* rc);
void init();
//void init();
void reset();
void add_ldap_comment_to_pkt(PtrSize_t*);
/**
@ -247,28 +250,31 @@ public:
bool handler_again___status_SETTING_GENERIC_VARIABLE(int* _rc, const char* var_name, const char* var_value, bool no_quote = false, bool set_transaction = false);
bool handler_again___status_SETTING_SQL_LOG_BIN(int*);
std::stack<enum session_status> previous_status;
void* operator new(size_t);
void operator delete(void*);
PgSQL_Query_Info CurrentQuery;
PtrSize_t mirrorPkt;
PtrSize_t pkt;
#if 0
// uint64_t
unsigned long long start_time;
unsigned long long pause_until;
unsigned long long idle_since;
unsigned long long transaction_started_at;
#endif // 0
// pointers
PgSQL_Thread* thread;
Query_Processor_Output* qpo;
StatCounters* command_counters;
PgSQL_Backend* mybe;
#if 0
PtrArray* mybes;
#endif // 0
PgSQL_Data_Stream* client_myds;
PgSQL_Data_Stream* server_myds;
#if 0
/*
* @brief Store the hostgroups that hold connections that have been flagged as 'expired' by the
* maintenance thread. These values will be used to release the retained connections in the specific
@ -325,6 +331,7 @@ public:
bool session_fast_forward;
bool started_sending_data_to_client; // this status variable tracks if some result set was sent to the client, or if proxysql is still buffering everything
bool use_ssl;
#endif // 0
/**
* @brief This status variable tracks whether the session is performing an
* 'Auth Switch' due to a 'COM_CHANGE_USER' packet.
@ -342,8 +349,8 @@ public:
//uint64_t gtid_trxid;
int gtid_hid;
MySQL_STMTs_meta* sess_STMTs_meta;
StmtLongDataHandler* SLDH;
// MySQL_STMTs_meta* sess_STMTs_meta;
// StmtLongDataHandler* SLDH;
Session_Regex** match_regexes;
@ -362,9 +369,9 @@ public:
int handler();
void (*handler_function) (Client_Session<PgSQL_Session*> arg, void*, PtrSize_t* pkt);
PgSQL_Backend* find_backend(int);
PgSQL_Backend* create_backend(int, PgSQL_Data_Stream* _myds = NULL);
PgSQL_Backend* find_or_create_backend(int, PgSQL_Data_Stream* _myds = NULL);
//PgSQL_Backend* find_backend(int);
//PgSQL_Backend* create_backend(int, PgSQL_Data_Stream* _myds = NULL);
//PgSQL_Backend* find_or_create_backend(int, PgSQL_Data_Stream* _myds = NULL);
void SQLite3_to_MySQL(SQLite3_result*, char*, int, MySQL_Protocol*, bool in_transaction = false, bool deprecate_eof_active = false);
void PgSQL_Result_to_PgSQL_wire(PgSQL_Connection* conn, PgSQL_Data_Stream* _myds = NULL);
@ -443,3 +450,4 @@ private:
void* PgSQL_kill_query_thread(void* arg);
#endif /* __CLASS_PGSQL_SESSION_H */
#endif // CLASS_BASE_SESSION_H

@ -482,6 +482,17 @@ enum PROXYSQL_MYSQL_ERR {
ER_PROXYSQL_SRV_NULL_REPLICATION_LAG = 9019,
};
enum proxysql_session_type {
PROXYSQL_SESSION_MYSQL,
PROXYSQL_SESSION_ADMIN,
PROXYSQL_SESSION_STATS,
PROXYSQL_SESSION_SQLITE,
PROXYSQL_SESSION_CLICKHOUSE,
PROXYSQL_SESSION_MYSQL_EMU,
PROXYSQL_SESSION_PGSQL,
PROXYSQL_SESSION_NONE
};
#endif /* PROXYSQL_ENUMS */

@ -1,7 +1,97 @@
#include "Base_Session.h"
#include "MySQL_PreparedStatement.h"
#include "MySQL_Data_Stream.h"
#include "PgSQL_Data_Stream.h"
// Explicitly instantiate the required template class and member functions
template void Base_Session::init<MySQL_Session>();
template void Base_Session::init<PgSQL_Session>();
template MySQL_Backend * Base_Session::find_backend<MySQL_Backend,MySQL_Session>(int);
template PgSQL_Backend * Base_Session::find_backend<PgSQL_Backend,PgSQL_Session>(int);
template MySQL_Backend * Base_Session::create_backend<MySQL_Backend,MySQL_Session,MySQL_Data_Stream>(int, MySQL_Data_Stream *);
template PgSQL_Backend * Base_Session::create_backend<PgSQL_Backend,PgSQL_Session,PgSQL_Data_Stream>(int, PgSQL_Data_Stream *);
template MySQL_Backend * Base_Session::find_or_create_backend<MySQL_Backend,MySQL_Session,MySQL_Data_Stream>(int, MySQL_Data_Stream *);
template PgSQL_Backend * Base_Session::find_or_create_backend<PgSQL_Backend,PgSQL_Session,PgSQL_Data_Stream>(int, PgSQL_Data_Stream *);
Base_Session::Base_Session() {
};
Base_Session::~Base_Session() {
};
template<typename T>
void Base_Session::init() {
transaction_persistent_hostgroup = -1;
transaction_persistent = false;
mybes = new PtrArray(4);
// Conditional initialization based on derived class
if constexpr (std::is_same_v<T, MySQL_Session>) {
sess_STMTs_meta = new MySQL_STMTs_meta();
SLDH = new StmtLongDataHandler();
}
};
template<typename B, typename S>
B * Base_Session::find_backend(int hostgroup_id) {
B *_mybe;
unsigned int i;
for (i=0; i < mybes->len; i++) {
_mybe=(B *)mybes->index(i);
if (_mybe->hostgroup_id==hostgroup_id) {
return _mybe;
}
}
return NULL; // NULL = backend not found
};
/**
* @brief Create a new MySQL backend associated with the specified hostgroup ID and data stream.
*
* This function creates a new MySQL backend object and associates it with the provided hostgroup ID
* and data stream. If the data stream is not provided (_myds is nullptr), a new MySQL_Data_Stream
* object is created and initialized.
*
* @param hostgroup_id The ID of the hostgroup to which the backend belongs.
* @param _myds The MySQL data stream associated with the backend.
* @return A pointer to the newly created MySQL_Backend object.
*/
template<typename B, typename S, typename D>
B * Base_Session::create_backend(int hostgroup_id, D *_myds) {
B *_mybe = new B();
proxy_debug(PROXY_DEBUG_NET,4,"HID=%d, _myds=%p, _mybe=%p\n" , hostgroup_id, _myds, _mybe);
_mybe->hostgroup_id=hostgroup_id;
if (_myds) {
_mybe->server_myds=_myds;
} else {
_mybe->server_myds = new D();
_mybe->server_myds->DSS=STATE_NOT_INITIALIZED;
_mybe->server_myds->init(MYDS_BACKEND_NOT_CONNECTED, static_cast<S*>(this), 0);
}
// the newly created backend is added to the session's list of backends (mybes) and a pointer to it is returned.
mybes->add(_mybe);
return _mybe;
};
/**
* @brief Find or create a MySQL backend associated with the specified hostgroup ID and data stream.
*
* This function first attempts to find an existing MySQL backend associated with the provided
* hostgroup ID. If a backend is found, its pointer is returned. Otherwise, a new MySQL backend
* is created and associated with the hostgroup ID and data stream. If the data stream is not provided
* (_myds is nullptr), a new MySQL_Data_Stream object is created and initialized for the new backend.
*
* @param hostgroup_id The ID of the hostgroup to which the backend belongs.
* @param _myds The MySQL data stream associated with the backend.
* @return A pointer to the MySQL_Backend object found or created.
*/
template<typename B, typename S, typename D>
B * Base_Session::find_or_create_backend(int hostgroup_id, D *_myds) {
B * _mybe = find_backend<B,S>(hostgroup_id);
proxy_debug(PROXY_DEBUG_NET,4,"HID=%d, _myds=%p, _mybe=%p\n" , hostgroup_id, _myds, _mybe);
// The pointer to the found or newly created backend is returned.
return ( _mybe ? _mybe : create_backend<B,S,D>(hostgroup_id, _myds) );
};

@ -589,14 +589,6 @@ bool Query_Info::is_select_NOT_for_update() {
return true;
}
void * MySQL_Session::operator new(size_t size) {
return l_alloc(size);
}
void MySQL_Session::operator delete(void *ptr) {
l_free(sizeof(MySQL_Session),ptr);
}
void MySQL_Session::set_status(enum session_status e) {
if (e==session_status___NONE) {
@ -678,7 +670,7 @@ MySQL_Session::MySQL_Session() {
match_regexes=NULL;
init(); // we moved this out to allow CHANGE_USER
init<MySQL_Session>(); // we moved this out to allow CHANGE_USER
last_insert_id=0; // #1093
@ -687,17 +679,6 @@ MySQL_Session::MySQL_Session() {
use_ldap_auth = false;
}
/**
* @brief Initializes the MySQL session.
*/
void MySQL_Session::init() {
transaction_persistent_hostgroup=-1;
transaction_persistent=false;
mybes= new PtrArray(4);
sess_STMTs_meta=new MySQL_STMTs_meta();
SLDH=new StmtLongDataHandler();
}
/**
* @brief Resets the MySQL session to its initial state.
*/
@ -803,24 +784,6 @@ MySQL_Session::~MySQL_Session() {
}
/**
* @brief Find a backend associated with the specified hostgroup ID.
*
* @param hostgroup_id The ID of the hostgroup to search for.
* @return A pointer to the MySQL backend associated with the specified hostgroup ID, or nullptr if not found.
*/
MySQL_Backend * MySQL_Session::find_backend(int hostgroup_id) {
MySQL_Backend *_mybe;
unsigned int i;
for (i=0; i < mybes->len; i++) {
_mybe=(MySQL_Backend *)mybes->index(i);
if (_mybe->hostgroup_id==hostgroup_id) {
return _mybe;
}
}
return NULL; // NULL = backend not found
};
/**
* @brief Update expired connections based on specified checks.
*
@ -859,52 +822,6 @@ void MySQL_Session::update_expired_conns(const vector<function<bool(MySQL_Connec
}
}
/**
* @brief Create a new MySQL backend associated with the specified hostgroup ID and data stream.
*
* This function creates a new MySQL backend object and associates it with the provided hostgroup ID
* and data stream. If the data stream is not provided (_myds is nullptr), a new MySQL_Data_Stream
* object is created and initialized.
*
* @param hostgroup_id The ID of the hostgroup to which the backend belongs.
* @param _myds The MySQL data stream associated with the backend.
* @return A pointer to the newly created MySQL_Backend object.
*/
MySQL_Backend * MySQL_Session::create_backend(int hostgroup_id, MySQL_Data_Stream *_myds) {
MySQL_Backend *_mybe=new MySQL_Backend();
proxy_debug(PROXY_DEBUG_NET,4,"HID=%d, _myds=%p, _mybe=%p\n" , hostgroup_id, _myds, _mybe);
_mybe->hostgroup_id=hostgroup_id;
if (_myds) {
_mybe->server_myds=_myds;
} else {
_mybe->server_myds = new MySQL_Data_Stream();
_mybe->server_myds->DSS=STATE_NOT_INITIALIZED;
_mybe->server_myds->init(MYDS_BACKEND_NOT_CONNECTED, this, 0);
}
// the newly created backend is added to the session's list of backends (mybes) and a pointer to it is returned.
mybes->add(_mybe);
return _mybe;
};
/**
* @brief Find or create a MySQL backend associated with the specified hostgroup ID and data stream.
*
* This function first attempts to find an existing MySQL backend associated with the provided
* hostgroup ID. If a backend is found, its pointer is returned. Otherwise, a new MySQL backend
* is created and associated with the hostgroup ID and data stream. If the data stream is not provided
* (_myds is nullptr), a new MySQL_Data_Stream object is created and initialized for the new backend.
*
* @param hostgroup_id The ID of the hostgroup to which the backend belongs.
* @param _myds The MySQL data stream associated with the backend.
* @return A pointer to the MySQL_Backend object found or created.
*/
MySQL_Backend * MySQL_Session::find_or_create_backend(int hostgroup_id, MySQL_Data_Stream *_myds) {
MySQL_Backend *_mybe=find_backend(hostgroup_id);
proxy_debug(PROXY_DEBUG_NET,4,"HID=%d, _myds=%p, _mybe=%p\n" , hostgroup_id, _myds, _mybe);
// The pointer to the found or newly created backend is returned.
return ( _mybe ? _mybe : create_backend(hostgroup_id, _myds) );
};
/**
* @brief Reset all MySQL backends associated with this session.
*
@ -3562,7 +3479,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
}
}
}
mybe=find_or_create_backend(current_hostgroup);
mybe=find_or_create_backend<MySQL_Backend,MySQL_Session,MySQL_Data_Stream>(current_hostgroup);
if (client_myds->myconn->local_stmts==NULL) {
client_myds->myconn->local_stmts=new MySQL_STMTs_local_v14(true);
}
@ -3591,7 +3508,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
CurrentQuery.end_time=thread->curtime;
CurrentQuery.end();
} else {
mybe=find_or_create_backend(current_hostgroup);
mybe=find_or_create_backend<MySQL_Backend,MySQL_Session,MySQL_Data_Stream>(current_hostgroup);
status=PROCESSING_STMT_PREPARE;
mybe->server_myds->connect_retries_on_failure=mysql_thread___connect_retries_on_failure;
mybe->server_myds->wait_until=0;
@ -3735,7 +3652,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
}
}
}
mybe=find_or_create_backend(current_hostgroup);
mybe=find_or_create_backend<MySQL_Backend,MySQL_Session,MySQL_Data_Stream>(current_hostgroup);
status=PROCESSING_STMT_EXECUTE;
mybe->server_myds->connect_retries_on_failure=mysql_thread___connect_retries_on_failure;
mybe->server_myds->wait_until=0;
@ -4067,7 +3984,7 @@ int MySQL_Session::GPFC_WaitingClientData_FastForwardSession(PtrSize_t& pkt) {
return handler_ret;
}
mybe=find_or_create_backend(current_hostgroup); // set a backend
mybe=find_or_create_backend<MySQL_Backend,MySQL_Session,MySQL_Data_Stream>(current_hostgroup); // set a backend
mybe->server_myds->reinit_queues(); // reinitialize the queues in the myds . By default, they are not active
mybe->server_myds->PSarrayOUT->add(pkt.ptr, pkt.size); // move the first packet
previous_status.push(FAST_FORWARD); // next status will be FAST_FORWARD . Now we need a connection
@ -4172,7 +4089,7 @@ void MySQL_Session::GPFC_Replication_SwitchToFastForward(PtrSize_t& pkt, unsigne
// forward before receiving the command. This way the state machine will
// handle the command automatically.
current_hostgroup = previous_hostgroup;
mybe = find_or_create_backend(current_hostgroup); // set a backend
mybe = find_or_create_backend<MySQL_Backend,MySQL_Session,MySQL_Data_Stream>(current_hostgroup); // set a backend
mybe->server_myds->reinit_queues(); // reinitialize the queues in the myds . By default, they are not active
// We reinitialize the 'wait_until' since this session shouldn't wait for processing as
// we are now transitioning to 'FAST_FORWARD'.
@ -4481,7 +4398,7 @@ __get_pkts_from_client:
}
}
}
mybe=find_or_create_backend(current_hostgroup);
mybe=find_or_create_backend<MySQL_Backend,MySQL_Session,MySQL_Data_Stream>(current_hostgroup);
status=PROCESSING_QUERY;
// set query retries
mybe->server_myds->query_retries_on_failure=mysql_thread___query_retries_on_failure;
@ -5062,7 +4979,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA() {
void MySQL_Session::housekeeping_before_pkts() {
if (mysql_thread___multiplexing) {
for (const int hg_id : hgs_expired_conns) {
MySQL_Backend* mybe = find_backend(hg_id);
MySQL_Backend* mybe = find_backend<MySQL_Backend,MySQL_Session>(hg_id);
if (mybe != nullptr) {
MySQL_Data_Stream* myds = mybe->server_myds;
@ -7187,7 +7104,7 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
// we need to try to execute it where the last write was successful
if (last_HG_affected_rows >= 0) {
MySQL_Backend * _mybe = NULL;
_mybe = find_backend(last_HG_affected_rows);
_mybe = find_backend<MySQL_Backend,MySQL_Session>(last_HG_affected_rows);
if (_mybe) {
if (_mybe->server_myds) {
if (_mybe->server_myds->myconn) {
@ -7345,7 +7262,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
//if (session_type == PROXYSQL_SESSION_MYSQL) {
if (session_type == PROXYSQL_SESSION_MYSQL || session_type == PROXYSQL_SESSION_SQLITE) {
reset();
init();
init<MySQL_Session>();
if (client_authenticated) {
if (use_ldap_auth == false) {
GloMyAuth->decrease_frontend_user_connections(client_myds->myconn->userinfo->username);
@ -7426,7 +7343,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
// Re-initialize the session
reset();
init();
init<MySQL_Session>();
// Recover the relevant session values
this->default_hostgroup = default_hostgroup;
@ -7489,7 +7406,7 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED
gtid_uuid = qpo->min_gtid;
with_gtid = true;
} else if (qpo->gtid_from_hostgroup >= 0) {
_gtid_from_backend = find_backend(qpo->gtid_from_hostgroup);
_gtid_from_backend = find_backend<MySQL_Backend,MySQL_Session>(qpo->gtid_from_hostgroup);
if (_gtid_from_backend) {
if (_gtid_from_backend->gtid_uuid[0]) {
gtid_uuid = _gtid_from_backend->gtid_uuid;
@ -8109,7 +8026,7 @@ void MySQL_Session::create_new_session_and_reset_connection(MySQL_Data_Stream *_
// we create a brand new session, a new data stream, and attach the connection to it
MySQL_Session * new_sess = new MySQL_Session();
new_sess->mybe = new_sess->find_or_create_backend(mc->parent->myhgc->hid);
new_sess->mybe = new_sess->find_or_create_backend<MySQL_Backend,MySQL_Session,MySQL_Data_Stream>(mc->parent->myhgc->hid);
new_myds = new_sess->mybe->server_myds;
new_myds->attach_connection(mc);
@ -8518,7 +8435,7 @@ void MySQL_Session::reset_warning_hostgroup_flag_and_release_connection() {
// if we've reached this point, it means that warning was found in the previous query, but the
// current executed query is not 'SHOW WARNINGS' or 'SHOW COUNT(*) FROM WARNINGS', so we can safely reset warning_in_hg and
// return connection back to the connection pool.
MySQL_Backend* _mybe = find_backend(warning_in_hg);
MySQL_Backend* _mybe = find_backend<MySQL_Backend,MySQL_Session>(warning_in_hg);
if (_mybe) {
MySQL_Data_Stream* myds = _mybe->server_myds;
if (myds && myds->myconn) {

@ -2984,7 +2984,7 @@ void MySQL_Thread::run___get_multiple_idle_connections(int& num_idles) {
MySQL_Data_Stream *myds;
MySQL_Connection *mc=my_idle_conns[i];
MySQL_Session *sess=new MySQL_Session();
sess->mybe=sess->find_or_create_backend(mc->parent->myhgc->hid);
sess->mybe=sess->find_or_create_backend<MySQL_Backend,MySQL_Session,MySQL_Data_Stream>(mc->parent->myhgc->hid);
myds=sess->mybe->server_myds;
myds->attach_connection(mc);

@ -522,15 +522,6 @@ bool PgSQL_Query_Info::is_select_NOT_for_update() {
return true;
}
void* PgSQL_Session::operator new(size_t size) {
return l_alloc(size);
}
void PgSQL_Session::operator delete(void* ptr) {
l_free(sizeof(PgSQL_Session), ptr);
}
void PgSQL_Session::set_status(enum session_status e) {
if (e == session_status___NONE) {
if (mybe) {
@ -609,7 +600,7 @@ PgSQL_Session::PgSQL_Session() {
match_regexes = NULL;
init(); // we moved this out to allow CHANGE_USER
init<PgSQL_Session>(); // we moved this out to allow CHANGE_USER
last_insert_id = 0; // #1093
@ -618,14 +609,6 @@ PgSQL_Session::PgSQL_Session() {
use_ldap_auth = false;
}
void PgSQL_Session::init() {
transaction_persistent_hostgroup = -1;
transaction_persistent = false;
mybes = new PtrArray(4);
sess_STMTs_meta = new MySQL_STMTs_meta();
SLDH = new StmtLongDataHandler();
}
void PgSQL_Session::reset() {
autocommit = true;
autocommit_handled = false;
@ -725,20 +708,6 @@ PgSQL_Session::~PgSQL_Session() {
}
}
// scan the pointer array of pgsql backends (mybes) looking for a backend for the specified hostgroup_id
PgSQL_Backend* PgSQL_Session::find_backend(int hostgroup_id) {
PgSQL_Backend* _mybe;
unsigned int i;
for (i = 0; i < mybes->len; i++) {
_mybe = (PgSQL_Backend*)mybes->index(i);
if (_mybe->hostgroup_id == hostgroup_id) {
return _mybe;
}
}
return NULL; // NULL = backend not found
};
void PgSQL_Session::update_expired_conns(const vector<function<bool(PgSQL_Connection*)>>& checks) {
for (uint32_t i = 0; i < mybes->len; i++) {
PgSQL_Backend* mybe = static_cast<PgSQL_Backend*>(mybes->index(i));
@ -763,28 +732,6 @@ void PgSQL_Session::update_expired_conns(const vector<function<bool(PgSQL_Connec
}
}
PgSQL_Backend* PgSQL_Session::create_backend(int hostgroup_id, PgSQL_Data_Stream* _myds) {
PgSQL_Backend* _mybe = new PgSQL_Backend();
proxy_debug(PROXY_DEBUG_NET, 4, "HID=%d, _myds=%p, _mybe=%p\n", hostgroup_id, _myds, _mybe);
_mybe->hostgroup_id = hostgroup_id;
if (_myds) {
_mybe->server_myds = _myds;
}
else {
_mybe->server_myds = new PgSQL_Data_Stream();
_mybe->server_myds->DSS = STATE_NOT_INITIALIZED;
_mybe->server_myds->init(MYDS_BACKEND_NOT_CONNECTED, this, 0);
}
mybes->add(_mybe);
return _mybe;
};
PgSQL_Backend* PgSQL_Session::find_or_create_backend(int hostgroup_id, PgSQL_Data_Stream* _myds) {
PgSQL_Backend* _mybe = find_backend(hostgroup_id);
proxy_debug(PROXY_DEBUG_NET, 4, "HID=%d, _myds=%p, _mybe=%p\n", hostgroup_id, _myds, _mybe);
return (_mybe ? _mybe : create_backend(hostgroup_id, _myds));
};
void PgSQL_Session::reset_all_backends() {
PgSQL_Backend* mybe;
while (mybes->len) {
@ -3400,7 +3347,7 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
}
}
}
mybe = find_or_create_backend(current_hostgroup);
mybe = find_or_create_backend<PgSQL_Backend,PgSQL_Session,PgSQL_Data_Stream>(current_hostgroup);
if (client_myds->myconn->local_stmts == NULL) {
client_myds->myconn->local_stmts = new MySQL_STMTs_local_v14(true);
}
@ -3430,7 +3377,7 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
CurrentQuery.end();
}
else {
mybe = find_or_create_backend(current_hostgroup);
mybe = find_or_create_backend<PgSQL_Backend,PgSQL_Session,PgSQL_Data_Stream>(current_hostgroup);
status = PROCESSING_STMT_PREPARE;
mybe->server_myds->connect_retries_on_failure = pgsql_thread___connect_retries_on_failure;
mybe->server_myds->wait_until = 0;
@ -3572,7 +3519,7 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
}
}
}
mybe = find_or_create_backend(current_hostgroup);
mybe = find_or_create_backend<PgSQL_Backend,PgSQL_Session,PgSQL_Data_Stream>(current_hostgroup);
status = PROCESSING_STMT_EXECUTE;
mybe->server_myds->connect_retries_on_failure = pgsql_thread___connect_retries_on_failure;
mybe->server_myds->wait_until = 0;
@ -3914,7 +3861,7 @@ __get_pkts_from_client:
return handler_ret;
}
mybe = find_or_create_backend(current_hostgroup); // set a backend
mybe = find_or_create_backend<PgSQL_Backend,PgSQL_Session,PgSQL_Data_Stream>(current_hostgroup); // set a backend
mybe->server_myds->reinit_queues(); // reinitialize the queues in the myds . By default, they are not active
mybe->server_myds->PSarrayOUT->add(pkt.ptr, pkt.size); // move the first packet
previous_status.push(FAST_FORWARD); // next status will be FAST_FORWARD . Now we need a connection
@ -4109,7 +4056,7 @@ __get_pkts_from_client:
}
}
}
mybe = find_or_create_backend(current_hostgroup);
mybe = find_or_create_backend<PgSQL_Backend,PgSQL_Session,PgSQL_Data_Stream>(current_hostgroup);
status = PROCESSING_QUERY;
// set query retries
mybe->server_myds->query_retries_on_failure = pgsql_thread___query_retries_on_failure;
@ -4327,7 +4274,7 @@ __get_pkts_from_client:
}
}
}
mybe = find_or_create_backend(current_hostgroup);
mybe = find_or_create_backend<PgSQL_Backend,PgSQL_Session,PgSQL_Data_Stream>(current_hostgroup);
status = PROCESSING_QUERY;
// set query retries
mybe->server_myds->query_retries_on_failure = pgsql_thread___query_retries_on_failure;
@ -4435,7 +4382,7 @@ __get_pkts_from_client:
// forward before receiving the command. This way the state machine will
// handle the command automatically.
current_hostgroup = previous_hostgroup;
mybe = find_or_create_backend(current_hostgroup); // set a backend
mybe = find_or_create_backend<PgSQL_Backend,PgSQL_Session,PgSQL_Data_Stream>(current_hostgroup); // set a backend
mybe->server_myds->reinit_queues(); // reinitialize the queues in the myds . By default, they are not active
// We reinitialize the 'wait_until' since this session shouldn't wait for processing as
// we are now transitioning to 'FAST_FORWARD'.
@ -4981,7 +4928,7 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA() {
void PgSQL_Session::housekeeping_before_pkts() {
if (pgsql_thread___multiplexing) {
for (const int hg_id : hgs_expired_conns) {
PgSQL_Backend* mybe = find_backend(hg_id);
PgSQL_Backend* mybe = find_backend<PgSQL_Backend,PgSQL_Session>(hg_id);
if (mybe != nullptr) {
PgSQL_Data_Stream* myds = mybe->server_myds;
@ -7216,7 +7163,7 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
// we need to try to execute it where the last write was successful
if (last_HG_affected_rows >= 0) {
PgSQL_Backend* _mybe = NULL;
_mybe = find_backend(last_HG_affected_rows);
_mybe = find_backend<PgSQL_Backend,PgSQL_Session>(last_HG_affected_rows);
if (_mybe) {
if (_mybe->server_myds) {
if (_mybe->server_myds->myconn) {
@ -7375,7 +7322,7 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
//if (session_type == PROXYSQL_SESSION_PGSQL) {
if (session_type == PROXYSQL_SESSION_PGSQL || session_type == PROXYSQL_SESSION_SQLITE) {
reset();
init();
init<PgSQL_Session>();
if (client_authenticated) {
if (use_ldap_auth == false) {
GloPgAuth->decrease_frontend_user_connections(client_myds->myconn->userinfo->username);
@ -7460,7 +7407,7 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
// Re-initialize the session
reset();
init();
init<PgSQL_Session>();
// Recover the relevant session values
this->default_hostgroup = default_hostgroup;
@ -7525,7 +7472,7 @@ void PgSQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED
with_gtid = true;
}
else if (qpo->gtid_from_hostgroup >= 0) {
_gtid_from_backend = find_backend(qpo->gtid_from_hostgroup);
_gtid_from_backend = find_backend<PgSQL_Backend,PgSQL_Session>(qpo->gtid_from_hostgroup);
if (_gtid_from_backend) {
if (_gtid_from_backend->gtid_uuid[0]) {
gtid_uuid = _gtid_from_backend->gtid_uuid;
@ -8198,7 +8145,7 @@ void PgSQL_Session::create_new_session_and_reset_connection(PgSQL_Data_Stream* _
// we create a brand new session, a new data stream, and attach the connection to it
PgSQL_Session* new_sess = new PgSQL_Session();
new_sess->mybe = new_sess->find_or_create_backend(mc->parent->myhgc->hid);
new_sess->mybe = new_sess->find_or_create_backend<PgSQL_Backend,PgSQL_Session,PgSQL_Data_Stream>(mc->parent->myhgc->hid);
new_myds = new_sess->mybe->server_myds;
new_myds->attach_connection(mc);
@ -8591,7 +8538,7 @@ void PgSQL_Session::reset_warning_hostgroup_flag_and_release_connection()
// if we've reached this point, it means that warning was found in the previous query, but the
// current executed query is not 'SHOW WARNINGS' or 'SHOW COUNT(*) FROM WARNINGS', so we can safely reset warning_in_hg and
// return connection back to the connection pool.
PgSQL_Backend* _mybe = find_backend(warning_in_hg);
PgSQL_Backend* _mybe = find_backend<PgSQL_Backend,PgSQL_Session>(warning_in_hg);
if (_mybe) {
PgSQL_Data_Stream* myds = _mybe->server_myds;
if (myds && myds->myconn) {

@ -2849,7 +2849,7 @@ void PgSQL_Thread::run___get_multiple_idle_connections(int& num_idles) {
PgSQL_Data_Stream* myds;
PgSQL_Connection* mc = my_idle_conns[i];
PgSQL_Session* sess = new PgSQL_Session();
sess->mybe = sess->find_or_create_backend(mc->parent->myhgc->hid);
sess->mybe = sess->find_or_create_backend<PgSQL_Backend,PgSQL_Session,PgSQL_Data_Stream>(mc->parent->myhgc->hid);
myds = sess->mybe->server_myds;
myds->attach_connection(mc);

Loading…
Cancel
Save