Using template for writeout()

v2.x_pg_PrepStmtBase_240714
Rene Cannao 2 years ago
parent ae1e088d82
commit 649c985384

@ -1,4 +1,4 @@
class Base_Session;
template<typename S, typename DSi, typename B, typename T> class Base_Session;
//// avoid loading definition of MySQL_Session and PgSQL_Session
//#define __CLASS_MYSQL_SESSION_H
@ -17,6 +17,7 @@ class StmtLongDataHandler;
class MySQL_Session;
class PgSQL_Session;
template<typename S, typename DS, typename B, typename T>
class Base_Session {
public:
Base_Session();
@ -29,7 +30,10 @@ class Base_Session {
unsigned long long idle_since;
unsigned long long transaction_started_at;
T * thread;
B *mybe;
PtrArray *mybes;
DS * client_myds;
/*
* @brief Store the hostgroups that hold connections that have been flagged as 'expired' by the
* maintenance thread. These values will be used to release the retained connections in the specific
@ -90,10 +94,14 @@ class Base_Session {
template <typename T> void init();
template<typename B, typename S> B * find_backend(int hostgroup_id);
template<typename B, typename S, typename D> B * create_backend(int, D * _myds = NULL);
template<typename B, typename S, typename D> B * find_or_create_backend(int, D * _myds = NULL);
void init();
//template<typename B> B * find_backend(int hostgroup_id);
//template<typename B> B * create_backend(int, DS * _myds = NULL);
//template<typename B> B * find_or_create_backend(int, DS * _myds = NULL);
B * find_backend(int hostgroup_id);
B * create_backend(int, DS * _myds = NULL);
B * find_or_create_backend(int, DS * _myds = NULL);
void writeout();
};
#endif // CLASS_BASE_SESSION_H

@ -106,7 +106,7 @@ class Query_Info {
* This class is central to ProxySQL's handling of client connections. It manages the lifecycle
* of a session, processes queries, and communicates with backend MySQL servers.
*/
class MySQL_Session: public Base_Session
class MySQL_Session: public Base_Session<MySQL_Session, MySQL_Data_Stream, MySQL_Backend, MySQL_Thread>
{
private:
//int handler_ret;
@ -284,17 +284,17 @@ class MySQL_Session: public Base_Session
unsigned long long idle_since;
unsigned long long transaction_started_at;
#endif // 0
// pointers
MySQL_Thread *thread;
#endif // 0
Query_Processor_Output *qpo;
StatCounters *command_counters;
MySQL_Backend *mybe;
#if 0
MySQL_Backend *mybe;
PtrArray *mybes;
#endif // 0
MySQL_Data_Stream *client_myds;
#endif // 0
MySQL_Data_Stream *server_myds;
#if 0
/*
@ -417,7 +417,7 @@ class MySQL_Session: public Base_Session
unsigned long long IdleTime();
void reset_all_backends();
void writeout();
//void writeout();
void Memory_Stats();
void create_new_session_and_reset_connection(MySQL_Data_Stream *_myds);
bool handle_command_query_kill(PtrSize_t *);

@ -101,7 +101,7 @@ public:
bool is_select_NOT_for_update();
};
class PgSQL_Session : public Base_Session {
class PgSQL_Session : public Base_Session<PgSQL_Session, PgSQL_Data_Stream, PgSQL_Backend, PgSQL_Thread> {
private:
//int handler_ret;
void handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(PtrSize_t*, bool*);
@ -262,17 +262,17 @@ public:
unsigned long long idle_since;
unsigned long long transaction_started_at;
#endif // 0
// pointers
PgSQL_Thread* thread;
#endif // 0
Query_Processor_Output* qpo;
StatCounters* command_counters;
PgSQL_Backend* mybe;
#if 0
PgSQL_Backend* mybe;
PtrArray* mybes;
#endif // 0
PgSQL_Data_Stream* client_myds;
#endif // 0
PgSQL_Data_Stream* server_myds;
#if 0
/*
@ -395,7 +395,7 @@ public:
unsigned long long IdleTime();
void reset_all_backends();
void writeout();
//void writeout();
void Memory_Stats();
void create_new_session_and_reset_connection(PgSQL_Data_Stream* _myds);
bool handle_command_query_kill(PtrSize_t*);

@ -5,8 +5,23 @@
#include "PgSQL_Data_Stream.h"
// Explicitly instantiate the required template class and member functions
template void Base_Session::init<MySQL_Session>();
template void Base_Session::init<PgSQL_Session>();
template void Base_Session<MySQL_Session,MySQL_Data_Stream,MySQL_Backend,MySQL_Thread>::init();
template void Base_Session<PgSQL_Session,PgSQL_Data_Stream,PgSQL_Backend,PgSQL_Thread>::init();
template Base_Session<MySQL_Session,MySQL_Data_Stream,MySQL_Backend,MySQL_Thread>::Base_Session();
template Base_Session<PgSQL_Session,PgSQL_Data_Stream,PgSQL_Backend,PgSQL_Thread>::Base_Session();
template Base_Session<MySQL_Session,MySQL_Data_Stream,MySQL_Backend,MySQL_Thread>::~Base_Session();
template Base_Session<PgSQL_Session,PgSQL_Data_Stream,PgSQL_Backend,PgSQL_Thread>::~Base_Session();
//template Base_Session<MySQL_Session,MySQL_Data_Stream>::Base_Session();
//emplate Base_Session<PgSQL_Session,PgSQL_Data_Stream>::Base_Session();
//template void Base_Session::init<MySQL_Session>();
//template void Base_Session::init<PgSQL_Session>();
template MySQL_Backend * Base_Session<MySQL_Session,MySQL_Data_Stream,MySQL_Backend,MySQL_Thread>::find_backend(int);
template PgSQL_Backend * Base_Session<PgSQL_Session,PgSQL_Data_Stream,PgSQL_Backend,PgSQL_Thread>::find_backend(int);
/*
template MySQL_Backend * Base_Session::find_backend<MySQL_Backend,MySQL_Session>(int);
template PgSQL_Backend * Base_Session::find_backend<PgSQL_Backend,PgSQL_Session>(int);
@ -14,28 +29,36 @@ template MySQL_Backend * Base_Session::create_backend<MySQL_Backend,MySQL_Sessio
template PgSQL_Backend * Base_Session::create_backend<PgSQL_Backend,PgSQL_Session,PgSQL_Data_Stream>(int, PgSQL_Data_Stream *);
template MySQL_Backend * Base_Session::find_or_create_backend<MySQL_Backend,MySQL_Session,MySQL_Data_Stream>(int, MySQL_Data_Stream *);
template PgSQL_Backend * Base_Session::find_or_create_backend<PgSQL_Backend,PgSQL_Session,PgSQL_Data_Stream>(int, PgSQL_Data_Stream *);
*/
template MySQL_Backend * Base_Session<MySQL_Session,MySQL_Data_Stream,MySQL_Backend,MySQL_Thread>::find_or_create_backend(int, MySQL_Data_Stream *);
template PgSQL_Backend * Base_Session<PgSQL_Session,PgSQL_Data_Stream,PgSQL_Backend,PgSQL_Thread>::find_or_create_backend(int, PgSQL_Data_Stream *);
Base_Session::Base_Session() {
template void Base_Session<MySQL_Session,MySQL_Data_Stream,MySQL_Backend,MySQL_Thread>::writeout();
template void Base_Session<PgSQL_Session,PgSQL_Data_Stream,PgSQL_Backend,PgSQL_Thread>::writeout();
template<typename S, typename DS, typename B, typename T>
Base_Session<S,DS,B,T>::Base_Session() {
};
Base_Session::~Base_Session() {
template<typename S, typename DS, typename B, typename T>
Base_Session<S,DS,B,T>::~Base_Session() {
};
template<typename T>
void Base_Session::init() {
template<typename S, typename DS, typename B, typename T>
void Base_Session<S,DS,B,T>::init() {
transaction_persistent_hostgroup = -1;
transaction_persistent = false;
mybes = new PtrArray(4);
// Conditional initialization based on derived class
if constexpr (std::is_same_v<T, MySQL_Session>) {
if constexpr (std::is_same_v<S, MySQL_Session>) {
sess_STMTs_meta = new MySQL_STMTs_meta();
SLDH = new StmtLongDataHandler();
}
};
template<typename B, typename S>
B * Base_Session::find_backend(int hostgroup_id) {
template<typename S, typename DS, typename B, typename T>
B * Base_Session<S,DS,B,T>::find_backend(int hostgroup_id) {
B *_mybe;
unsigned int i;
for (i=0; i < mybes->len; i++) {
@ -58,15 +81,15 @@ B * Base_Session::find_backend(int hostgroup_id) {
* @param _myds The MySQL data stream associated with the backend.
* @return A pointer to the newly created MySQL_Backend object.
*/
template<typename B, typename S, typename D>
B * Base_Session::create_backend(int hostgroup_id, D *_myds) {
template<typename S, typename DS, typename B, typename T>
B * Base_Session<S,DS,B,T>::create_backend(int hostgroup_id, DS *_myds) {
B *_mybe = new B();
proxy_debug(PROXY_DEBUG_NET,4,"HID=%d, _myds=%p, _mybe=%p\n" , hostgroup_id, _myds, _mybe);
_mybe->hostgroup_id=hostgroup_id;
if (_myds) {
_mybe->server_myds=_myds;
} else {
_mybe->server_myds = new D();
_mybe->server_myds = new DS();
_mybe->server_myds->DSS=STATE_NOT_INITIALIZED;
_mybe->server_myds->init(MYDS_BACKEND_NOT_CONNECTED, static_cast<S*>(this), 0);
}
@ -87,11 +110,167 @@ B * Base_Session::create_backend(int hostgroup_id, D *_myds) {
* @param _myds The MySQL data stream associated with the backend.
* @return A pointer to the MySQL_Backend object found or created.
*/
template<typename B, typename S, typename D>
B * Base_Session::find_or_create_backend(int hostgroup_id, D *_myds) {
B * _mybe = find_backend<B,S>(hostgroup_id);
template<typename S, typename DS, typename B, typename T>
B * Base_Session<S,DS,B,T>::find_or_create_backend(int hostgroup_id, DS *_myds) {
B * _mybe = find_backend(hostgroup_id);
proxy_debug(PROXY_DEBUG_NET,4,"HID=%d, _myds=%p, _mybe=%p\n" , hostgroup_id, _myds, _mybe);
// The pointer to the found or newly created backend is returned.
return ( _mybe ? _mybe : create_backend<B,S,D>(hostgroup_id, _myds) );
return ( _mybe ? _mybe : create_backend(hostgroup_id, _myds) );
};
/**
* @brief Writes data from the session to the network with optional throttling and flow control.
*
* The writeout() function in the MySQL_Session class is responsible for writing data from the session to the network.
* It supports throttling, which limits the rate at which data is sent to the client. Throttling is controlled by the
* mysql_thread___throttle_max_bytes_per_second_to_client configuration parameter. If throttling is disabled (the parameter
* is set to 0), the function bypasses throttling.
*
* This function first ensures that any pending data in the session's data stream (client_myds) is written to the network.
* This ensures that the network buffers are emptied, allowing new data to be sent.
*
* After writing data to the network, the function checks if flow control is necessary. If the total amount of data written
* exceeds the maximum allowed per call (mwpl), or if the data is sent too quickly, the function pauses writing for a brief
* period to control the flow of data.
*
* If throttling is enabled, the function adjusts the throttle based on the amount of data written and the configured maximum
* bytes per second. If the current throughput exceeds the configured limit, the function increases the pause duration to
* regulate the flow of data.
*
* Finally, if the session has a backend associated with it (mybe), and the backend has a server data stream (server_myds),
* the function also writes data from the server data stream to the network.
*
* @note This function assumes that necessary session and network structures are properly initialized.
*
* @see mysql_thread___throttle_max_bytes_per_second_to_client
* @see MySQL_Session::client_myds
* @see MySQL_Session::mybe
* @see MySQL_Backend::server_myds
*/
template<typename S, typename DS, typename B, typename T>
void Base_Session<S,DS,B,T>::writeout() {
int tps = 10; // throttling per second , by default every 100ms
int total_written = 0;
unsigned long long last_sent_=0;
int tmbpstc = 0; // throttle_max_bytes_per_second_to_client
if constexpr (std::is_same<S, MySQL_Session>::value) {
tmbpstc = mysql_thread___throttle_max_bytes_per_second_to_client;
} else if constexpr (std::is_same<S, PgSQL_Session>::value) {
tmbpstc = pgsql_thread___throttle_max_bytes_per_second_to_client;
} else {
assert(0);
}
bool disable_throttle = tmbpstc == 0;
int mwpl = tmbpstc; // max writes per call
mwpl = mwpl/tps;
// logic to disable throttling
if constexpr (std::is_same<S, MySQL_Session>::value) {
if (session_type!=PROXYSQL_SESSION_MYSQL) {
disable_throttle = true;
}
}
if constexpr (std::is_same<S, PgSQL_Session>::value) {
if (session_type != PROXYSQL_SESSION_PGSQL) {
disable_throttle = true;
}
}
if (client_myds && thread->curtime >= client_myds->pause_until) {
if (mirror==false) {
bool runloop=false;
if (client_myds->mypolls) {
last_sent_ = client_myds->mypolls->last_sent[client_myds->poll_fds_idx];
}
int retbytes=client_myds->write_to_net_poll();
total_written+=retbytes;
if (retbytes==QUEUE_T_DEFAULT_SIZE) { // optimization to solve memory bloat
runloop=true;
}
while (runloop && (disable_throttle || total_written < mwpl)) {
runloop=false; // the default
client_myds->array2buffer_full();
struct pollfd fds;
fds.fd=client_myds->fd;
fds.events=POLLOUT;
fds.revents=0;
int retpoll=poll(&fds, 1, 0);
if (retpoll>0) {
if (fds.revents==POLLOUT) {
retbytes=client_myds->write_to_net_poll();
total_written+=retbytes;
if (retbytes==QUEUE_T_DEFAULT_SIZE) { // optimization to solve memory bloat
runloop=true;
}
}
}
}
}
}
// flow control
if (!disable_throttle && total_written > 0) {
if (total_written > mwpl) {
unsigned long long add_ = 1000000 / tps + 1000000 / tps * ((unsigned long long)total_written - (unsigned long long)mwpl) / mwpl;
pause_until = thread->curtime + add_;
client_myds->remove_pollout();
client_myds->pause_until = thread->curtime + add_;
}
else {
if (total_written >= QUEUE_T_DEFAULT_SIZE) {
unsigned long long time_diff = thread->curtime - last_sent_;
if (time_diff == 0) { // sending data really too fast!
unsigned long long add_ = 1000000 / tps + 1000000 / tps * ((unsigned long long)total_written - (unsigned long long)mwpl) / mwpl;
pause_until = thread->curtime + add_;
client_myds->remove_pollout();
client_myds->pause_until = thread->curtime + add_;
}
else {
float current_Bps = (float)total_written * 1000 * 1000 / time_diff;
if (current_Bps > tmbpstc) {
unsigned long long add_ = 1000000 / tps;
pause_until = thread->curtime + add_;
assert(pause_until > thread->curtime);
client_myds->remove_pollout();
client_myds->pause_until = thread->curtime + add_;
}
}
}
}
}
if (mybe) {
if (mybe->server_myds) mybe->server_myds->write_to_net_poll();
}
proxy_debug(PROXY_DEBUG_NET,1,"Thread=%p, Session=%p -- Writeout Session %p\n" , this->thread, this, this);
}
#if 0
void MySQL_Session::writeout() {
if (client_myds) client_myds->array2buffer_full();
if (mybe && mybe->server_myds && mybe->server_myds->myds_type==MYDS_BACKEND) {
if (session_type==PROXYSQL_SESSION_MYSQL) {
if (mybe->server_myds->net_failure==false) {
if (mybe->server_myds->poll_fds_idx>-1) { // NOTE: attempt to force writes
mybe->server_myds->array2buffer_full();
}
}
} else {
mybe->server_myds->array2buffer_full();
}
}
void PgSQL_Session::writeout() {
if (client_myds) client_myds->array2buffer_full();
if (mybe && mybe->server_myds && mybe->server_myds->myds_type == MYDS_BACKEND) {
if (session_type == PROXYSQL_SESSION_PGSQL) {
if (mybe->server_myds->net_failure == false) {
if (mybe->server_myds->poll_fds_idx > -1) { // NOTE: attempt to force writes
mybe->server_myds->array2buffer_full();
}
}
}
else {
mybe->server_myds->array2buffer_full();
}
}
#endif // 0

@ -670,7 +670,7 @@ MySQL_Session::MySQL_Session() {
match_regexes=NULL;
init<MySQL_Session>(); // we moved this out to allow CHANGE_USER
init(); // we moved this out to allow CHANGE_USER
last_insert_id=0; // #1093
@ -838,6 +838,7 @@ void MySQL_Session::reset_all_backends() {
}
};
#if 0
/**
* @brief Writes data from the session to the network with optional throttling and flow control.
*
@ -955,6 +956,7 @@ void MySQL_Session::writeout() {
}
proxy_debug(PROXY_DEBUG_NET,1,"Thread=%p, Session=%p -- Writeout Session %p\n" , this->thread, this, this);
}
#endif // 0
/**
* @brief Handles COMMIT or ROLLBACK commands received from the client.
@ -3479,7 +3481,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
}
}
}
mybe=find_or_create_backend<MySQL_Backend,MySQL_Session,MySQL_Data_Stream>(current_hostgroup);
mybe=find_or_create_backend(current_hostgroup);
if (client_myds->myconn->local_stmts==NULL) {
client_myds->myconn->local_stmts=new MySQL_STMTs_local_v14(true);
}
@ -3508,7 +3510,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
CurrentQuery.end_time=thread->curtime;
CurrentQuery.end();
} else {
mybe=find_or_create_backend<MySQL_Backend,MySQL_Session,MySQL_Data_Stream>(current_hostgroup);
mybe=find_or_create_backend(current_hostgroup);
status=PROCESSING_STMT_PREPARE;
mybe->server_myds->connect_retries_on_failure=mysql_thread___connect_retries_on_failure;
mybe->server_myds->wait_until=0;
@ -3652,7 +3654,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
}
}
}
mybe=find_or_create_backend<MySQL_Backend,MySQL_Session,MySQL_Data_Stream>(current_hostgroup);
mybe=find_or_create_backend(current_hostgroup);
status=PROCESSING_STMT_EXECUTE;
mybe->server_myds->connect_retries_on_failure=mysql_thread___connect_retries_on_failure;
mybe->server_myds->wait_until=0;
@ -3984,7 +3986,7 @@ int MySQL_Session::GPFC_WaitingClientData_FastForwardSession(PtrSize_t& pkt) {
return handler_ret;
}
mybe=find_or_create_backend<MySQL_Backend,MySQL_Session,MySQL_Data_Stream>(current_hostgroup); // set a backend
mybe=find_or_create_backend(current_hostgroup); // set a backend
mybe->server_myds->reinit_queues(); // reinitialize the queues in the myds . By default, they are not active
mybe->server_myds->PSarrayOUT->add(pkt.ptr, pkt.size); // move the first packet
previous_status.push(FAST_FORWARD); // next status will be FAST_FORWARD . Now we need a connection
@ -4089,7 +4091,7 @@ void MySQL_Session::GPFC_Replication_SwitchToFastForward(PtrSize_t& pkt, unsigne
// forward before receiving the command. This way the state machine will
// handle the command automatically.
current_hostgroup = previous_hostgroup;
mybe = find_or_create_backend<MySQL_Backend,MySQL_Session,MySQL_Data_Stream>(current_hostgroup); // set a backend
mybe = find_or_create_backend(current_hostgroup); // set a backend
mybe->server_myds->reinit_queues(); // reinitialize the queues in the myds . By default, they are not active
// We reinitialize the 'wait_until' since this session shouldn't wait for processing as
// we are now transitioning to 'FAST_FORWARD'.
@ -4398,7 +4400,7 @@ __get_pkts_from_client:
}
}
}
mybe=find_or_create_backend<MySQL_Backend,MySQL_Session,MySQL_Data_Stream>(current_hostgroup);
mybe=find_or_create_backend(current_hostgroup);
status=PROCESSING_QUERY;
// set query retries
mybe->server_myds->query_retries_on_failure=mysql_thread___query_retries_on_failure;
@ -4979,7 +4981,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA() {
void MySQL_Session::housekeeping_before_pkts() {
if (mysql_thread___multiplexing) {
for (const int hg_id : hgs_expired_conns) {
MySQL_Backend* mybe = find_backend<MySQL_Backend,MySQL_Session>(hg_id);
MySQL_Backend* mybe = find_backend(hg_id);
if (mybe != nullptr) {
MySQL_Data_Stream* myds = mybe->server_myds;
@ -7104,7 +7106,7 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
// we need to try to execute it where the last write was successful
if (last_HG_affected_rows >= 0) {
MySQL_Backend * _mybe = NULL;
_mybe = find_backend<MySQL_Backend,MySQL_Session>(last_HG_affected_rows);
_mybe = find_backend(last_HG_affected_rows);
if (_mybe) {
if (_mybe->server_myds) {
if (_mybe->server_myds->myconn) {
@ -7262,7 +7264,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
//if (session_type == PROXYSQL_SESSION_MYSQL) {
if (session_type == PROXYSQL_SESSION_MYSQL || session_type == PROXYSQL_SESSION_SQLITE) {
reset();
init<MySQL_Session>();
init();
if (client_authenticated) {
if (use_ldap_auth == false) {
GloMyAuth->decrease_frontend_user_connections(client_myds->myconn->userinfo->username);
@ -7343,7 +7345,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
// Re-initialize the session
reset();
init<MySQL_Session>();
init();
// Recover the relevant session values
this->default_hostgroup = default_hostgroup;
@ -7406,7 +7408,7 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED
gtid_uuid = qpo->min_gtid;
with_gtid = true;
} else if (qpo->gtid_from_hostgroup >= 0) {
_gtid_from_backend = find_backend<MySQL_Backend,MySQL_Session>(qpo->gtid_from_hostgroup);
_gtid_from_backend = find_backend(qpo->gtid_from_hostgroup);
if (_gtid_from_backend) {
if (_gtid_from_backend->gtid_uuid[0]) {
gtid_uuid = _gtid_from_backend->gtid_uuid;
@ -8026,7 +8028,7 @@ void MySQL_Session::create_new_session_and_reset_connection(MySQL_Data_Stream *_
// we create a brand new session, a new data stream, and attach the connection to it
MySQL_Session * new_sess = new MySQL_Session();
new_sess->mybe = new_sess->find_or_create_backend<MySQL_Backend,MySQL_Session,MySQL_Data_Stream>(mc->parent->myhgc->hid);
new_sess->mybe = new_sess->find_or_create_backend(mc->parent->myhgc->hid);
new_myds = new_sess->mybe->server_myds;
new_myds->attach_connection(mc);
@ -8435,7 +8437,7 @@ void MySQL_Session::reset_warning_hostgroup_flag_and_release_connection() {
// if we've reached this point, it means that warning was found in the previous query, but the
// current executed query is not 'SHOW WARNINGS' or 'SHOW COUNT(*) FROM WARNINGS', so we can safely reset warning_in_hg and
// return connection back to the connection pool.
MySQL_Backend* _mybe = find_backend<MySQL_Backend,MySQL_Session>(warning_in_hg);
MySQL_Backend* _mybe = find_backend(warning_in_hg);
if (_mybe) {
MySQL_Data_Stream* myds = _mybe->server_myds;
if (myds && myds->myconn) {

@ -2984,7 +2984,7 @@ void MySQL_Thread::run___get_multiple_idle_connections(int& num_idles) {
MySQL_Data_Stream *myds;
MySQL_Connection *mc=my_idle_conns[i];
MySQL_Session *sess=new MySQL_Session();
sess->mybe=sess->find_or_create_backend<MySQL_Backend,MySQL_Session,MySQL_Data_Stream>(mc->parent->myhgc->hid);
sess->mybe=sess->find_or_create_backend(mc->parent->myhgc->hid);
myds=sess->mybe->server_myds;
myds->attach_connection(mc);

@ -600,7 +600,7 @@ PgSQL_Session::PgSQL_Session() {
match_regexes = NULL;
init<PgSQL_Session>(); // we moved this out to allow CHANGE_USER
init(); // we moved this out to allow CHANGE_USER
last_insert_id = 0; // #1093
@ -741,6 +741,7 @@ void PgSQL_Session::reset_all_backends() {
}
};
#if 0
void PgSQL_Session::writeout() {
int tps = 10; // throttling per second , by default every 100ms
int total_written = 0;
@ -832,6 +833,7 @@ void PgSQL_Session::writeout() {
}
proxy_debug(PROXY_DEBUG_NET, 1, "Thread=%p, Session=%p -- Writeout Session %p\n", this->thread, this, this);
}
#endif // 0
bool PgSQL_Session::handler_CommitRollback(PtrSize_t* pkt) {
if (pkt->size <= 5) { return false; }
@ -3347,7 +3349,7 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
}
}
}
mybe = find_or_create_backend<PgSQL_Backend,PgSQL_Session,PgSQL_Data_Stream>(current_hostgroup);
mybe = find_or_create_backend(current_hostgroup);
if (client_myds->myconn->local_stmts == NULL) {
client_myds->myconn->local_stmts = new MySQL_STMTs_local_v14(true);
}
@ -3377,7 +3379,7 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
CurrentQuery.end();
}
else {
mybe = find_or_create_backend<PgSQL_Backend,PgSQL_Session,PgSQL_Data_Stream>(current_hostgroup);
mybe = find_or_create_backend(current_hostgroup);
status = PROCESSING_STMT_PREPARE;
mybe->server_myds->connect_retries_on_failure = pgsql_thread___connect_retries_on_failure;
mybe->server_myds->wait_until = 0;
@ -3519,7 +3521,7 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
}
}
}
mybe = find_or_create_backend<PgSQL_Backend,PgSQL_Session,PgSQL_Data_Stream>(current_hostgroup);
mybe = find_or_create_backend(current_hostgroup);
status = PROCESSING_STMT_EXECUTE;
mybe->server_myds->connect_retries_on_failure = pgsql_thread___connect_retries_on_failure;
mybe->server_myds->wait_until = 0;
@ -3861,7 +3863,7 @@ __get_pkts_from_client:
return handler_ret;
}
mybe = find_or_create_backend<PgSQL_Backend,PgSQL_Session,PgSQL_Data_Stream>(current_hostgroup); // set a backend
mybe = find_or_create_backend(current_hostgroup); // set a backend
mybe->server_myds->reinit_queues(); // reinitialize the queues in the myds . By default, they are not active
mybe->server_myds->PSarrayOUT->add(pkt.ptr, pkt.size); // move the first packet
previous_status.push(FAST_FORWARD); // next status will be FAST_FORWARD . Now we need a connection
@ -4056,7 +4058,7 @@ __get_pkts_from_client:
}
}
}
mybe = find_or_create_backend<PgSQL_Backend,PgSQL_Session,PgSQL_Data_Stream>(current_hostgroup);
mybe = find_or_create_backend(current_hostgroup);
status = PROCESSING_QUERY;
// set query retries
mybe->server_myds->query_retries_on_failure = pgsql_thread___query_retries_on_failure;
@ -4274,7 +4276,7 @@ __get_pkts_from_client:
}
}
}
mybe = find_or_create_backend<PgSQL_Backend,PgSQL_Session,PgSQL_Data_Stream>(current_hostgroup);
mybe = find_or_create_backend(current_hostgroup);
status = PROCESSING_QUERY;
// set query retries
mybe->server_myds->query_retries_on_failure = pgsql_thread___query_retries_on_failure;
@ -4382,7 +4384,7 @@ __get_pkts_from_client:
// forward before receiving the command. This way the state machine will
// handle the command automatically.
current_hostgroup = previous_hostgroup;
mybe = find_or_create_backend<PgSQL_Backend,PgSQL_Session,PgSQL_Data_Stream>(current_hostgroup); // set a backend
mybe = find_or_create_backend(current_hostgroup); // set a backend
mybe->server_myds->reinit_queues(); // reinitialize the queues in the myds . By default, they are not active
// We reinitialize the 'wait_until' since this session shouldn't wait for processing as
// we are now transitioning to 'FAST_FORWARD'.
@ -4928,7 +4930,7 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA() {
void PgSQL_Session::housekeeping_before_pkts() {
if (pgsql_thread___multiplexing) {
for (const int hg_id : hgs_expired_conns) {
PgSQL_Backend* mybe = find_backend<PgSQL_Backend,PgSQL_Session>(hg_id);
PgSQL_Backend* mybe = find_backend(hg_id);
if (mybe != nullptr) {
PgSQL_Data_Stream* myds = mybe->server_myds;
@ -7163,7 +7165,7 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
// we need to try to execute it where the last write was successful
if (last_HG_affected_rows >= 0) {
PgSQL_Backend* _mybe = NULL;
_mybe = find_backend<PgSQL_Backend,PgSQL_Session>(last_HG_affected_rows);
_mybe = find_backend(last_HG_affected_rows);
if (_mybe) {
if (_mybe->server_myds) {
if (_mybe->server_myds->myconn) {
@ -7322,7 +7324,7 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
//if (session_type == PROXYSQL_SESSION_PGSQL) {
if (session_type == PROXYSQL_SESSION_PGSQL || session_type == PROXYSQL_SESSION_SQLITE) {
reset();
init<PgSQL_Session>();
init();
if (client_authenticated) {
if (use_ldap_auth == false) {
GloPgAuth->decrease_frontend_user_connections(client_myds->myconn->userinfo->username);
@ -7407,7 +7409,7 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
// Re-initialize the session
reset();
init<PgSQL_Session>();
init();
// Recover the relevant session values
this->default_hostgroup = default_hostgroup;
@ -7472,7 +7474,7 @@ void PgSQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED
with_gtid = true;
}
else if (qpo->gtid_from_hostgroup >= 0) {
_gtid_from_backend = find_backend<PgSQL_Backend,PgSQL_Session>(qpo->gtid_from_hostgroup);
_gtid_from_backend = find_backend(qpo->gtid_from_hostgroup);
if (_gtid_from_backend) {
if (_gtid_from_backend->gtid_uuid[0]) {
gtid_uuid = _gtid_from_backend->gtid_uuid;
@ -8145,7 +8147,7 @@ void PgSQL_Session::create_new_session_and_reset_connection(PgSQL_Data_Stream* _
// we create a brand new session, a new data stream, and attach the connection to it
PgSQL_Session* new_sess = new PgSQL_Session();
new_sess->mybe = new_sess->find_or_create_backend<PgSQL_Backend,PgSQL_Session,PgSQL_Data_Stream>(mc->parent->myhgc->hid);
new_sess->mybe = new_sess->find_or_create_backend(mc->parent->myhgc->hid);
new_myds = new_sess->mybe->server_myds;
new_myds->attach_connection(mc);
@ -8538,7 +8540,7 @@ void PgSQL_Session::reset_warning_hostgroup_flag_and_release_connection()
// if we've reached this point, it means that warning was found in the previous query, but the
// current executed query is not 'SHOW WARNINGS' or 'SHOW COUNT(*) FROM WARNINGS', so we can safely reset warning_in_hg and
// return connection back to the connection pool.
PgSQL_Backend* _mybe = find_backend<PgSQL_Backend,PgSQL_Session>(warning_in_hg);
PgSQL_Backend* _mybe = find_backend(warning_in_hg);
if (_mybe) {
PgSQL_Data_Stream* myds = _mybe->server_myds;
if (myds && myds->myconn) {

@ -2849,7 +2849,7 @@ void PgSQL_Thread::run___get_multiple_idle_connections(int& num_idles) {
PgSQL_Data_Stream* myds;
PgSQL_Connection* mc = my_idle_conns[i];
PgSQL_Session* sess = new PgSQL_Session();
sess->mybe = sess->find_or_create_backend<PgSQL_Backend,PgSQL_Session,PgSQL_Data_Stream>(mc->parent->myhgc->hid);
sess->mybe = sess->find_or_create_backend(mc->parent->myhgc->hid);
myds = sess->mybe->server_myds;
myds->attach_connection(mc);

Loading…
Cancel
Save