diff --git a/include/Base_Session.h b/include/Base_Session.h index 4316899db..16c66009c 100644 --- a/include/Base_Session.h +++ b/include/Base_Session.h @@ -1,4 +1,4 @@ -class Base_Session; +template class Base_Session; //// avoid loading definition of MySQL_Session and PgSQL_Session //#define __CLASS_MYSQL_SESSION_H @@ -17,6 +17,7 @@ class StmtLongDataHandler; class MySQL_Session; class PgSQL_Session; +template class Base_Session { public: Base_Session(); @@ -29,7 +30,10 @@ class Base_Session { unsigned long long idle_since; unsigned long long transaction_started_at; + T * thread; + B *mybe; PtrArray *mybes; + DS * client_myds; /* * @brief Store the hostgroups that hold connections that have been flagged as 'expired' by the * maintenance thread. These values will be used to release the retained connections in the specific @@ -90,10 +94,14 @@ class Base_Session { - template void init(); - template B * find_backend(int hostgroup_id); - template B * create_backend(int, D * _myds = NULL); - template B * find_or_create_backend(int, D * _myds = NULL); + void init(); + //template B * find_backend(int hostgroup_id); + //template B * create_backend(int, DS * _myds = NULL); + //template B * find_or_create_backend(int, DS * _myds = NULL); + B * find_backend(int hostgroup_id); + B * create_backend(int, DS * _myds = NULL); + B * find_or_create_backend(int, DS * _myds = NULL); + void writeout(); }; #endif // CLASS_BASE_SESSION_H diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index ddb7a04bd..23511a64b 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -106,7 +106,7 @@ class Query_Info { * This class is central to ProxySQL's handling of client connections. It manages the lifecycle * of a session, processes queries, and communicates with backend MySQL servers. */ -class MySQL_Session: public Base_Session +class MySQL_Session: public Base_Session { private: //int handler_ret; @@ -284,17 +284,17 @@ class MySQL_Session: public Base_Session unsigned long long idle_since; unsigned long long transaction_started_at; -#endif // 0 // pointers MySQL_Thread *thread; +#endif // 0 Query_Processor_Output *qpo; StatCounters *command_counters; - MySQL_Backend *mybe; #if 0 + MySQL_Backend *mybe; PtrArray *mybes; -#endif // 0 MySQL_Data_Stream *client_myds; +#endif // 0 MySQL_Data_Stream *server_myds; #if 0 /* @@ -417,7 +417,7 @@ class MySQL_Session: public Base_Session unsigned long long IdleTime(); void reset_all_backends(); - void writeout(); + //void writeout(); void Memory_Stats(); void create_new_session_and_reset_connection(MySQL_Data_Stream *_myds); bool handle_command_query_kill(PtrSize_t *); diff --git a/include/PgSQL_Session.h b/include/PgSQL_Session.h index 4ab975a4d..07909cabe 100644 --- a/include/PgSQL_Session.h +++ b/include/PgSQL_Session.h @@ -101,7 +101,7 @@ public: bool is_select_NOT_for_update(); }; -class PgSQL_Session : public Base_Session { +class PgSQL_Session : public Base_Session { private: //int handler_ret; void handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(PtrSize_t*, bool*); @@ -262,17 +262,17 @@ public: unsigned long long idle_since; unsigned long long transaction_started_at; -#endif // 0 // pointers PgSQL_Thread* thread; +#endif // 0 Query_Processor_Output* qpo; StatCounters* command_counters; - PgSQL_Backend* mybe; #if 0 + PgSQL_Backend* mybe; PtrArray* mybes; -#endif // 0 PgSQL_Data_Stream* client_myds; +#endif // 0 PgSQL_Data_Stream* server_myds; #if 0 /* @@ -395,7 +395,7 @@ public: unsigned long long IdleTime(); void reset_all_backends(); - void writeout(); + //void writeout(); void Memory_Stats(); void create_new_session_and_reset_connection(PgSQL_Data_Stream* _myds); bool handle_command_query_kill(PtrSize_t*); diff --git a/lib/Base_Session.cpp b/lib/Base_Session.cpp index ebfa2283b..ed9353769 100644 --- a/lib/Base_Session.cpp +++ b/lib/Base_Session.cpp @@ -5,8 +5,23 @@ #include "PgSQL_Data_Stream.h" // Explicitly instantiate the required template class and member functions -template void Base_Session::init(); -template void Base_Session::init(); +template void Base_Session::init(); +template void Base_Session::init(); + +template Base_Session::Base_Session(); +template Base_Session::Base_Session(); +template Base_Session::~Base_Session(); +template Base_Session::~Base_Session(); + +//template Base_Session::Base_Session(); +//emplate Base_Session::Base_Session(); + +//template void Base_Session::init(); +//template void Base_Session::init(); + +template MySQL_Backend * Base_Session::find_backend(int); +template PgSQL_Backend * Base_Session::find_backend(int); +/* template MySQL_Backend * Base_Session::find_backend(int); template PgSQL_Backend * Base_Session::find_backend(int); @@ -14,28 +29,36 @@ template MySQL_Backend * Base_Session::create_backend(int, PgSQL_Data_Stream *); template MySQL_Backend * Base_Session::find_or_create_backend(int, MySQL_Data_Stream *); template PgSQL_Backend * Base_Session::find_or_create_backend(int, PgSQL_Data_Stream *); +*/ +template MySQL_Backend * Base_Session::find_or_create_backend(int, MySQL_Data_Stream *); +template PgSQL_Backend * Base_Session::find_or_create_backend(int, PgSQL_Data_Stream *); -Base_Session::Base_Session() { +template void Base_Session::writeout(); +template void Base_Session::writeout(); + +template +Base_Session::Base_Session() { }; -Base_Session::~Base_Session() { +template +Base_Session::~Base_Session() { }; -template -void Base_Session::init() { +template +void Base_Session::init() { transaction_persistent_hostgroup = -1; transaction_persistent = false; mybes = new PtrArray(4); // Conditional initialization based on derived class - if constexpr (std::is_same_v) { + if constexpr (std::is_same_v) { sess_STMTs_meta = new MySQL_STMTs_meta(); SLDH = new StmtLongDataHandler(); } }; -template -B * Base_Session::find_backend(int hostgroup_id) { +template +B * Base_Session::find_backend(int hostgroup_id) { B *_mybe; unsigned int i; for (i=0; i < mybes->len; i++) { @@ -58,15 +81,15 @@ B * Base_Session::find_backend(int hostgroup_id) { * @param _myds The MySQL data stream associated with the backend. * @return A pointer to the newly created MySQL_Backend object. */ -template -B * Base_Session::create_backend(int hostgroup_id, D *_myds) { +template +B * Base_Session::create_backend(int hostgroup_id, DS *_myds) { B *_mybe = new B(); proxy_debug(PROXY_DEBUG_NET,4,"HID=%d, _myds=%p, _mybe=%p\n" , hostgroup_id, _myds, _mybe); _mybe->hostgroup_id=hostgroup_id; if (_myds) { _mybe->server_myds=_myds; } else { - _mybe->server_myds = new D(); + _mybe->server_myds = new DS(); _mybe->server_myds->DSS=STATE_NOT_INITIALIZED; _mybe->server_myds->init(MYDS_BACKEND_NOT_CONNECTED, static_cast(this), 0); } @@ -87,11 +110,167 @@ B * Base_Session::create_backend(int hostgroup_id, D *_myds) { * @param _myds The MySQL data stream associated with the backend. * @return A pointer to the MySQL_Backend object found or created. */ -template -B * Base_Session::find_or_create_backend(int hostgroup_id, D *_myds) { - B * _mybe = find_backend(hostgroup_id); +template +B * Base_Session::find_or_create_backend(int hostgroup_id, DS *_myds) { + B * _mybe = find_backend(hostgroup_id); proxy_debug(PROXY_DEBUG_NET,4,"HID=%d, _myds=%p, _mybe=%p\n" , hostgroup_id, _myds, _mybe); // The pointer to the found or newly created backend is returned. - return ( _mybe ? _mybe : create_backend(hostgroup_id, _myds) ); + return ( _mybe ? _mybe : create_backend(hostgroup_id, _myds) ); }; +/** + * @brief Writes data from the session to the network with optional throttling and flow control. + * + * The writeout() function in the MySQL_Session class is responsible for writing data from the session to the network. + * It supports throttling, which limits the rate at which data is sent to the client. Throttling is controlled by the + * mysql_thread___throttle_max_bytes_per_second_to_client configuration parameter. If throttling is disabled (the parameter + * is set to 0), the function bypasses throttling. + * + * This function first ensures that any pending data in the session's data stream (client_myds) is written to the network. + * This ensures that the network buffers are emptied, allowing new data to be sent. + * + * After writing data to the network, the function checks if flow control is necessary. If the total amount of data written + * exceeds the maximum allowed per call (mwpl), or if the data is sent too quickly, the function pauses writing for a brief + * period to control the flow of data. + * + * If throttling is enabled, the function adjusts the throttle based on the amount of data written and the configured maximum + * bytes per second. If the current throughput exceeds the configured limit, the function increases the pause duration to + * regulate the flow of data. + * + * Finally, if the session has a backend associated with it (mybe), and the backend has a server data stream (server_myds), + * the function also writes data from the server data stream to the network. + * + * @note This function assumes that necessary session and network structures are properly initialized. + * + * @see mysql_thread___throttle_max_bytes_per_second_to_client + * @see MySQL_Session::client_myds + * @see MySQL_Session::mybe + * @see MySQL_Backend::server_myds + */ + +template +void Base_Session::writeout() { + int tps = 10; // throttling per second , by default every 100ms + int total_written = 0; + unsigned long long last_sent_=0; + int tmbpstc = 0; // throttle_max_bytes_per_second_to_client + if constexpr (std::is_same::value) { + tmbpstc = mysql_thread___throttle_max_bytes_per_second_to_client; + } else if constexpr (std::is_same::value) { + tmbpstc = pgsql_thread___throttle_max_bytes_per_second_to_client; + } else { + assert(0); + } + bool disable_throttle = tmbpstc == 0; + int mwpl = tmbpstc; // max writes per call + mwpl = mwpl/tps; + // logic to disable throttling + if constexpr (std::is_same::value) { + if (session_type!=PROXYSQL_SESSION_MYSQL) { + disable_throttle = true; + } + } + if constexpr (std::is_same::value) { + if (session_type != PROXYSQL_SESSION_PGSQL) { + disable_throttle = true; + } + } + if (client_myds && thread->curtime >= client_myds->pause_until) { + if (mirror==false) { + bool runloop=false; + if (client_myds->mypolls) { + last_sent_ = client_myds->mypolls->last_sent[client_myds->poll_fds_idx]; + } + int retbytes=client_myds->write_to_net_poll(); + total_written+=retbytes; + if (retbytes==QUEUE_T_DEFAULT_SIZE) { // optimization to solve memory bloat + runloop=true; + } + while (runloop && (disable_throttle || total_written < mwpl)) { + runloop=false; // the default + client_myds->array2buffer_full(); + struct pollfd fds; + fds.fd=client_myds->fd; + fds.events=POLLOUT; + fds.revents=0; + int retpoll=poll(&fds, 1, 0); + if (retpoll>0) { + if (fds.revents==POLLOUT) { + retbytes=client_myds->write_to_net_poll(); + total_written+=retbytes; + if (retbytes==QUEUE_T_DEFAULT_SIZE) { // optimization to solve memory bloat + runloop=true; + } + } + } + } + } + } + + // flow control + if (!disable_throttle && total_written > 0) { + if (total_written > mwpl) { + unsigned long long add_ = 1000000 / tps + 1000000 / tps * ((unsigned long long)total_written - (unsigned long long)mwpl) / mwpl; + pause_until = thread->curtime + add_; + client_myds->remove_pollout(); + client_myds->pause_until = thread->curtime + add_; + } + else { + if (total_written >= QUEUE_T_DEFAULT_SIZE) { + unsigned long long time_diff = thread->curtime - last_sent_; + if (time_diff == 0) { // sending data really too fast! + unsigned long long add_ = 1000000 / tps + 1000000 / tps * ((unsigned long long)total_written - (unsigned long long)mwpl) / mwpl; + pause_until = thread->curtime + add_; + client_myds->remove_pollout(); + client_myds->pause_until = thread->curtime + add_; + } + else { + float current_Bps = (float)total_written * 1000 * 1000 / time_diff; + if (current_Bps > tmbpstc) { + unsigned long long add_ = 1000000 / tps; + pause_until = thread->curtime + add_; + assert(pause_until > thread->curtime); + client_myds->remove_pollout(); + client_myds->pause_until = thread->curtime + add_; + } + } + } + } + } + if (mybe) { + if (mybe->server_myds) mybe->server_myds->write_to_net_poll(); + } + proxy_debug(PROXY_DEBUG_NET,1,"Thread=%p, Session=%p -- Writeout Session %p\n" , this->thread, this, this); +} + +#if 0 +void MySQL_Session::writeout() { + if (client_myds) client_myds->array2buffer_full(); + if (mybe && mybe->server_myds && mybe->server_myds->myds_type==MYDS_BACKEND) { + if (session_type==PROXYSQL_SESSION_MYSQL) { + if (mybe->server_myds->net_failure==false) { + if (mybe->server_myds->poll_fds_idx>-1) { // NOTE: attempt to force writes + mybe->server_myds->array2buffer_full(); + } + } + } else { + mybe->server_myds->array2buffer_full(); + } + } + + +void PgSQL_Session::writeout() { + if (client_myds) client_myds->array2buffer_full(); + if (mybe && mybe->server_myds && mybe->server_myds->myds_type == MYDS_BACKEND) { + if (session_type == PROXYSQL_SESSION_PGSQL) { + if (mybe->server_myds->net_failure == false) { + if (mybe->server_myds->poll_fds_idx > -1) { // NOTE: attempt to force writes + mybe->server_myds->array2buffer_full(); + } + } + } + else { + mybe->server_myds->array2buffer_full(); + } + } +#endif // 0 diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 5ae340bbc..dafac743b 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -670,7 +670,7 @@ MySQL_Session::MySQL_Session() { match_regexes=NULL; - init(); // we moved this out to allow CHANGE_USER + init(); // we moved this out to allow CHANGE_USER last_insert_id=0; // #1093 @@ -838,6 +838,7 @@ void MySQL_Session::reset_all_backends() { } }; +#if 0 /** * @brief Writes data from the session to the network with optional throttling and flow control. * @@ -955,6 +956,7 @@ void MySQL_Session::writeout() { } proxy_debug(PROXY_DEBUG_NET,1,"Thread=%p, Session=%p -- Writeout Session %p\n" , this->thread, this, this); } +#endif // 0 /** * @brief Handles COMMIT or ROLLBACK commands received from the client. @@ -3479,7 +3481,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C } } } - mybe=find_or_create_backend(current_hostgroup); + mybe=find_or_create_backend(current_hostgroup); if (client_myds->myconn->local_stmts==NULL) { client_myds->myconn->local_stmts=new MySQL_STMTs_local_v14(true); } @@ -3508,7 +3510,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C CurrentQuery.end_time=thread->curtime; CurrentQuery.end(); } else { - mybe=find_or_create_backend(current_hostgroup); + mybe=find_or_create_backend(current_hostgroup); status=PROCESSING_STMT_PREPARE; mybe->server_myds->connect_retries_on_failure=mysql_thread___connect_retries_on_failure; mybe->server_myds->wait_until=0; @@ -3652,7 +3654,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C } } } - mybe=find_or_create_backend(current_hostgroup); + mybe=find_or_create_backend(current_hostgroup); status=PROCESSING_STMT_EXECUTE; mybe->server_myds->connect_retries_on_failure=mysql_thread___connect_retries_on_failure; mybe->server_myds->wait_until=0; @@ -3984,7 +3986,7 @@ int MySQL_Session::GPFC_WaitingClientData_FastForwardSession(PtrSize_t& pkt) { return handler_ret; } - mybe=find_or_create_backend(current_hostgroup); // set a backend + mybe=find_or_create_backend(current_hostgroup); // set a backend mybe->server_myds->reinit_queues(); // reinitialize the queues in the myds . By default, they are not active mybe->server_myds->PSarrayOUT->add(pkt.ptr, pkt.size); // move the first packet previous_status.push(FAST_FORWARD); // next status will be FAST_FORWARD . Now we need a connection @@ -4089,7 +4091,7 @@ void MySQL_Session::GPFC_Replication_SwitchToFastForward(PtrSize_t& pkt, unsigne // forward before receiving the command. This way the state machine will // handle the command automatically. current_hostgroup = previous_hostgroup; - mybe = find_or_create_backend(current_hostgroup); // set a backend + mybe = find_or_create_backend(current_hostgroup); // set a backend mybe->server_myds->reinit_queues(); // reinitialize the queues in the myds . By default, they are not active // We reinitialize the 'wait_until' since this session shouldn't wait for processing as // we are now transitioning to 'FAST_FORWARD'. @@ -4398,7 +4400,7 @@ __get_pkts_from_client: } } } - mybe=find_or_create_backend(current_hostgroup); + mybe=find_or_create_backend(current_hostgroup); status=PROCESSING_QUERY; // set query retries mybe->server_myds->query_retries_on_failure=mysql_thread___query_retries_on_failure; @@ -4979,7 +4981,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA() { void MySQL_Session::housekeeping_before_pkts() { if (mysql_thread___multiplexing) { for (const int hg_id : hgs_expired_conns) { - MySQL_Backend* mybe = find_backend(hg_id); + MySQL_Backend* mybe = find_backend(hg_id); if (mybe != nullptr) { MySQL_Data_Stream* myds = mybe->server_myds; @@ -7104,7 +7106,7 @@ bool MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C // we need to try to execute it where the last write was successful if (last_HG_affected_rows >= 0) { MySQL_Backend * _mybe = NULL; - _mybe = find_backend(last_HG_affected_rows); + _mybe = find_backend(last_HG_affected_rows); if (_mybe) { if (_mybe->server_myds) { if (_mybe->server_myds->myconn) { @@ -7262,7 +7264,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C //if (session_type == PROXYSQL_SESSION_MYSQL) { if (session_type == PROXYSQL_SESSION_MYSQL || session_type == PROXYSQL_SESSION_SQLITE) { reset(); - init(); + init(); if (client_authenticated) { if (use_ldap_auth == false) { GloMyAuth->decrease_frontend_user_connections(client_myds->myconn->userinfo->username); @@ -7343,7 +7345,7 @@ void MySQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C // Re-initialize the session reset(); - init(); + init(); // Recover the relevant session values this->default_hostgroup = default_hostgroup; @@ -7406,7 +7408,7 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED gtid_uuid = qpo->min_gtid; with_gtid = true; } else if (qpo->gtid_from_hostgroup >= 0) { - _gtid_from_backend = find_backend(qpo->gtid_from_hostgroup); + _gtid_from_backend = find_backend(qpo->gtid_from_hostgroup); if (_gtid_from_backend) { if (_gtid_from_backend->gtid_uuid[0]) { gtid_uuid = _gtid_from_backend->gtid_uuid; @@ -8026,7 +8028,7 @@ void MySQL_Session::create_new_session_and_reset_connection(MySQL_Data_Stream *_ // we create a brand new session, a new data stream, and attach the connection to it MySQL_Session * new_sess = new MySQL_Session(); - new_sess->mybe = new_sess->find_or_create_backend(mc->parent->myhgc->hid); + new_sess->mybe = new_sess->find_or_create_backend(mc->parent->myhgc->hid); new_myds = new_sess->mybe->server_myds; new_myds->attach_connection(mc); @@ -8435,7 +8437,7 @@ void MySQL_Session::reset_warning_hostgroup_flag_and_release_connection() { // if we've reached this point, it means that warning was found in the previous query, but the // current executed query is not 'SHOW WARNINGS' or 'SHOW COUNT(*) FROM WARNINGS', so we can safely reset warning_in_hg and // return connection back to the connection pool. - MySQL_Backend* _mybe = find_backend(warning_in_hg); + MySQL_Backend* _mybe = find_backend(warning_in_hg); if (_mybe) { MySQL_Data_Stream* myds = _mybe->server_myds; if (myds && myds->myconn) { diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 15f57ec94..350503ab4 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -2984,7 +2984,7 @@ void MySQL_Thread::run___get_multiple_idle_connections(int& num_idles) { MySQL_Data_Stream *myds; MySQL_Connection *mc=my_idle_conns[i]; MySQL_Session *sess=new MySQL_Session(); - sess->mybe=sess->find_or_create_backend(mc->parent->myhgc->hid); + sess->mybe=sess->find_or_create_backend(mc->parent->myhgc->hid); myds=sess->mybe->server_myds; myds->attach_connection(mc); diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index 8568f67db..9a6b5feb2 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -600,7 +600,7 @@ PgSQL_Session::PgSQL_Session() { match_regexes = NULL; - init(); // we moved this out to allow CHANGE_USER + init(); // we moved this out to allow CHANGE_USER last_insert_id = 0; // #1093 @@ -741,6 +741,7 @@ void PgSQL_Session::reset_all_backends() { } }; +#if 0 void PgSQL_Session::writeout() { int tps = 10; // throttling per second , by default every 100ms int total_written = 0; @@ -832,6 +833,7 @@ void PgSQL_Session::writeout() { } proxy_debug(PROXY_DEBUG_NET, 1, "Thread=%p, Session=%p -- Writeout Session %p\n", this->thread, this, this); } +#endif // 0 bool PgSQL_Session::handler_CommitRollback(PtrSize_t* pkt) { if (pkt->size <= 5) { return false; } @@ -3347,7 +3349,7 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C } } } - mybe = find_or_create_backend(current_hostgroup); + mybe = find_or_create_backend(current_hostgroup); if (client_myds->myconn->local_stmts == NULL) { client_myds->myconn->local_stmts = new MySQL_STMTs_local_v14(true); } @@ -3377,7 +3379,7 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C CurrentQuery.end(); } else { - mybe = find_or_create_backend(current_hostgroup); + mybe = find_or_create_backend(current_hostgroup); status = PROCESSING_STMT_PREPARE; mybe->server_myds->connect_retries_on_failure = pgsql_thread___connect_retries_on_failure; mybe->server_myds->wait_until = 0; @@ -3519,7 +3521,7 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C } } } - mybe = find_or_create_backend(current_hostgroup); + mybe = find_or_create_backend(current_hostgroup); status = PROCESSING_STMT_EXECUTE; mybe->server_myds->connect_retries_on_failure = pgsql_thread___connect_retries_on_failure; mybe->server_myds->wait_until = 0; @@ -3861,7 +3863,7 @@ __get_pkts_from_client: return handler_ret; } - mybe = find_or_create_backend(current_hostgroup); // set a backend + mybe = find_or_create_backend(current_hostgroup); // set a backend mybe->server_myds->reinit_queues(); // reinitialize the queues in the myds . By default, they are not active mybe->server_myds->PSarrayOUT->add(pkt.ptr, pkt.size); // move the first packet previous_status.push(FAST_FORWARD); // next status will be FAST_FORWARD . Now we need a connection @@ -4056,7 +4058,7 @@ __get_pkts_from_client: } } } - mybe = find_or_create_backend(current_hostgroup); + mybe = find_or_create_backend(current_hostgroup); status = PROCESSING_QUERY; // set query retries mybe->server_myds->query_retries_on_failure = pgsql_thread___query_retries_on_failure; @@ -4274,7 +4276,7 @@ __get_pkts_from_client: } } } - mybe = find_or_create_backend(current_hostgroup); + mybe = find_or_create_backend(current_hostgroup); status = PROCESSING_QUERY; // set query retries mybe->server_myds->query_retries_on_failure = pgsql_thread___query_retries_on_failure; @@ -4382,7 +4384,7 @@ __get_pkts_from_client: // forward before receiving the command. This way the state machine will // handle the command automatically. current_hostgroup = previous_hostgroup; - mybe = find_or_create_backend(current_hostgroup); // set a backend + mybe = find_or_create_backend(current_hostgroup); // set a backend mybe->server_myds->reinit_queues(); // reinitialize the queues in the myds . By default, they are not active // We reinitialize the 'wait_until' since this session shouldn't wait for processing as // we are now transitioning to 'FAST_FORWARD'. @@ -4928,7 +4930,7 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA() { void PgSQL_Session::housekeeping_before_pkts() { if (pgsql_thread___multiplexing) { for (const int hg_id : hgs_expired_conns) { - PgSQL_Backend* mybe = find_backend(hg_id); + PgSQL_Backend* mybe = find_backend(hg_id); if (mybe != nullptr) { PgSQL_Data_Stream* myds = mybe->server_myds; @@ -7163,7 +7165,7 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C // we need to try to execute it where the last write was successful if (last_HG_affected_rows >= 0) { PgSQL_Backend* _mybe = NULL; - _mybe = find_backend(last_HG_affected_rows); + _mybe = find_backend(last_HG_affected_rows); if (_mybe) { if (_mybe->server_myds) { if (_mybe->server_myds->myconn) { @@ -7322,7 +7324,7 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C //if (session_type == PROXYSQL_SESSION_PGSQL) { if (session_type == PROXYSQL_SESSION_PGSQL || session_type == PROXYSQL_SESSION_SQLITE) { reset(); - init(); + init(); if (client_authenticated) { if (use_ldap_auth == false) { GloPgAuth->decrease_frontend_user_connections(client_myds->myconn->userinfo->username); @@ -7407,7 +7409,7 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C // Re-initialize the session reset(); - init(); + init(); // Recover the relevant session values this->default_hostgroup = default_hostgroup; @@ -7472,7 +7474,7 @@ void PgSQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED with_gtid = true; } else if (qpo->gtid_from_hostgroup >= 0) { - _gtid_from_backend = find_backend(qpo->gtid_from_hostgroup); + _gtid_from_backend = find_backend(qpo->gtid_from_hostgroup); if (_gtid_from_backend) { if (_gtid_from_backend->gtid_uuid[0]) { gtid_uuid = _gtid_from_backend->gtid_uuid; @@ -8145,7 +8147,7 @@ void PgSQL_Session::create_new_session_and_reset_connection(PgSQL_Data_Stream* _ // we create a brand new session, a new data stream, and attach the connection to it PgSQL_Session* new_sess = new PgSQL_Session(); - new_sess->mybe = new_sess->find_or_create_backend(mc->parent->myhgc->hid); + new_sess->mybe = new_sess->find_or_create_backend(mc->parent->myhgc->hid); new_myds = new_sess->mybe->server_myds; new_myds->attach_connection(mc); @@ -8538,7 +8540,7 @@ void PgSQL_Session::reset_warning_hostgroup_flag_and_release_connection() // if we've reached this point, it means that warning was found in the previous query, but the // current executed query is not 'SHOW WARNINGS' or 'SHOW COUNT(*) FROM WARNINGS', so we can safely reset warning_in_hg and // return connection back to the connection pool. - PgSQL_Backend* _mybe = find_backend(warning_in_hg); + PgSQL_Backend* _mybe = find_backend(warning_in_hg); if (_mybe) { PgSQL_Data_Stream* myds = _mybe->server_myds; if (myds && myds->myconn) { diff --git a/lib/PgSQL_Thread.cpp b/lib/PgSQL_Thread.cpp index ab7264813..da6a4992a 100644 --- a/lib/PgSQL_Thread.cpp +++ b/lib/PgSQL_Thread.cpp @@ -2849,7 +2849,7 @@ void PgSQL_Thread::run___get_multiple_idle_connections(int& num_idles) { PgSQL_Data_Stream* myds; PgSQL_Connection* mc = my_idle_conns[i]; PgSQL_Session* sess = new PgSQL_Session(); - sess->mybe = sess->find_or_create_backend(mc->parent->myhgc->hid); + sess->mybe = sess->find_or_create_backend(mc->parent->myhgc->hid); myds = sess->mybe->server_myds; myds->attach_connection(mc);