From 9e4b76d6d19459a2a150a6635dbfd9600cde6f86 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Tue, 19 Aug 2025 11:28:12 +0500 Subject: [PATCH] * Refactored PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo. * Fixed a crash occurring during session destruction. * Query rules will now apply only to the first message in an extended query frame. * OK message will apply to execute message. * Query rewrite, error messages, and large packet handling will apply to parse message. * Added query processing support for the Bind message. --- include/Base_Thread.h | 4 +- include/PgSQL_Extended_Query_Message.h | 15 +- include/PgSQL_Session.h | 10 + lib/Base_Session.cpp | 4 +- lib/PgSQL_Connection.cpp | 3 + lib/PgSQL_Protocol.cpp | 47 +- lib/PgSQL_Session.cpp | 1332 ++++++++++++------------ 7 files changed, 704 insertions(+), 711 deletions(-) diff --git a/include/Base_Thread.h b/include/Base_Thread.h index 096e9f19c..485fec913 100644 --- a/include/Base_Thread.h +++ b/include/Base_Thread.h @@ -28,9 +28,9 @@ private: void* re; char* s; public: - Session_Regex(char* p); + Session_Regex(const char* p); ~Session_Regex(); - bool match(char* m); + bool match(const char* m); }; class MySQL_Thread; diff --git a/include/PgSQL_Extended_Query_Message.h b/include/PgSQL_Extended_Query_Message.h index a5a66f248..f54f52e64 100644 --- a/include/PgSQL_Extended_Query_Message.h +++ b/include/PgSQL_Extended_Query_Message.h @@ -50,6 +50,19 @@ public: return _data; } + /** + * @brief Returns a reference to the internal packet data. + * + * @return Reference to the PtrSize_t structure containing packet data. + */ + inline const PtrSize_t& get_raw_pkt() const noexcept { + return _pkt; + } + + inline PtrSize_t& get_raw_pkt() noexcept { + return _pkt; + } + protected: /** * @brief Provides mutable access to the internal data. @@ -161,7 +174,6 @@ private: uint16_t remaining; ///< Number of fields remaining to read. }; - struct PgSQL_Parse_Data { const char* stmt_name; // The name of the prepared statement const char* query_string; // The query string to be prepared @@ -171,6 +183,7 @@ private: const unsigned char* param_types_start_ptr; // Array of parameter types (can be nullptr if none) friend class PgSQL_Parse_Message; + friend class PgSQL_Session; // need it for void PgSQL_Session::handler_WCD_SS_MCQ_qpo_QueryRewrite(PtrSize_t* pkt); }; class PgSQL_Parse_Message : public Base_Extended_Query_Message { diff --git a/include/PgSQL_Session.h b/include/PgSQL_Session.h index ed4dd246e..664aaeaee 100644 --- a/include/PgSQL_Session.h +++ b/include/PgSQL_Session.h @@ -34,6 +34,8 @@ enum PgSQL_Extended_Query_Type : uint8_t { PGSQL_EXTENDED_QUERY_TYPE_PARSE = 0x01, PGSQL_EXTENDED_QUERY_TYPE_DESCRIBE = 0x02, PGSQL_EXTENDED_QUERY_TYPE_EXECUTE = 0x04, + PGSQL_EXTENDED_QUERY_TYPE_BIND = 0x08, + PGSQL_EXTENDED_QUERY_TYPE_CLOSE = 0x10, }; /* Enumerated types for output format and date order */ @@ -188,6 +190,7 @@ private: using PktType = std::variant,std::unique_ptr, std::unique_ptr, std::unique_ptr, std::unique_ptr>; + bool extended_query_exec_qp = false; std::queue extended_query_frame; std::unique_ptr bind_waiting_for_execute; @@ -236,6 +239,13 @@ private: #endif void handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED__get_connection(); + + bool is_multi_statement_command(const char* cmd); + bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_SET_command(const char* dig, bool* lock_hostgroup); + bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_RESET_command(const char* dig, bool* lock_hostgroup); + bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_DISCARD_command(const char* dig); + bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_DEALLOCATE_command(const char* dig); + bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_special_commands(const char* dig, bool* lock_hostgroup); bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(PtrSize_t*, bool* lock_hostgroup, PgSQL_Extended_Query_Type stmt_type = PGSQL_EXTENDED_QUERY_TYPE_NOT_SET); bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_PARSE(PtrSize_t& pkt); diff --git a/lib/Base_Session.cpp b/lib/Base_Session.cpp index e8bc7f44a..95d66e6b9 100644 --- a/lib/Base_Session.cpp +++ b/lib/Base_Session.cpp @@ -683,7 +683,7 @@ int Base_Session::FindOneActiveTransaction(bool check_savepoint) { return ret; } -Session_Regex::Session_Regex(char* p) { +Session_Regex::Session_Regex(const char* p) { s = strdup(p); re2::RE2::Options* opt2 = new re2::RE2::Options(RE2::Quiet); opt2->set_case_sensitive(false); @@ -697,7 +697,7 @@ Session_Regex::~Session_Regex() { delete (re2::RE2::Options*)opt; } -bool Session_Regex::match(char* m) { +bool Session_Regex::match(const char* m) { bool rc = false; rc = RE2::PartialMatch(m, *(RE2*)re); return rc; diff --git a/lib/PgSQL_Connection.cpp b/lib/PgSQL_Connection.cpp index fb74a0a3b..926f10074 100644 --- a/lib/PgSQL_Connection.cpp +++ b/lib/PgSQL_Connection.cpp @@ -702,6 +702,9 @@ handler_again: break; case ASYNC_STMT_PREPARE_START: stmt_prepare_start(); + __sync_fetch_and_add(&parent->queries_sent, 1); + update_bytes_sent(query.length + 5); + statuses.questions++; if (async_exit_status) { next_event(ASYNC_STMT_PREPARE_CONT); } diff --git a/lib/PgSQL_Protocol.cpp b/lib/PgSQL_Protocol.cpp index fab67357a..6dc2693ed 100644 --- a/lib/PgSQL_Protocol.cpp +++ b/lib/PgSQL_Protocol.cpp @@ -1480,27 +1480,34 @@ bool PgSQL_Protocol::generate_ok_packet(bool send, bool ready, const char* msg, pgpkt.set_multi_pkt_mode(true); } - char* tag = extract_tag_from_query(query); - assert(tag); - - char tmpbuf[128]; - if (strcmp(tag, "INSERT") == 0) { - sprintf(tmpbuf, "%s 0 %d", tag, rows); - pgpkt.write_CommandComplete(tmpbuf); - } else if (strcmp(tag, "UPDATE") == 0 || - strcmp(tag, "DELETE") == 0 || - strcmp(tag, "MERGE") == 0 || - strcmp(tag, "MOVE") == 0 || - strcmp(tag, "FETCH") == 0 || - strcmp(tag, "COPY") == 0 || - strcmp(tag, "SELECT") == 0) { - sprintf(tmpbuf, "%s %d", tag, rows); - pgpkt.write_CommandComplete(tmpbuf); - } else { - pgpkt.write_CommandComplete(tag); + if (query) { + char* tag = extract_tag_from_query(query); + assert(tag); + + char tmpbuf[128]; + if (strcmp(tag, "INSERT") == 0) { + sprintf(tmpbuf, "%s 0 %d", tag, rows); + pgpkt.write_CommandComplete(tmpbuf); + } + else if (strcmp(tag, "UPDATE") == 0 || + strcmp(tag, "DELETE") == 0 || + strcmp(tag, "MERGE") == 0 || + strcmp(tag, "MOVE") == 0 || + strcmp(tag, "FETCH") == 0 || + strcmp(tag, "COPY") == 0 || + strcmp(tag, "SELECT") == 0) { + sprintf(tmpbuf, "%s %d", tag, rows); + pgpkt.write_CommandComplete(tmpbuf); + } + else { + pgpkt.write_CommandComplete(tag); + } + free(tag); + } else if (msg) { + // if no query, but message is provided, use it as tag + pgpkt.write_CommandComplete(msg); } - free(tag); - + for (auto& [param_name, param_value] : param_status) { pgpkt.write_ParameterStatus(param_name.c_str(), param_value.c_str()); } diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index f44303e8b..7971b620d 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -457,8 +457,6 @@ void PgSQL_Session::reset() { PgSQL_Session::~PgSQL_Session() { - //reset(); - if (locked_on_hostgroup >= 0) { thread->status_variables.stvar[st_var_hostgroup_locked]--; } @@ -482,7 +480,11 @@ PgSQL_Session::~PgSQL_Session() { } } delete client_myds; + client_myds = nullptr; } + // Important: Keep the reset order as-is + reset(); + if (default_schema) { free(default_schema); } @@ -2255,6 +2257,7 @@ int PgSQL_Session::get_pkts_from_client(bool& wrong_pass, PtrSize_t& pkt) { l_free(pkt.size, pkt.ptr); pkt = { 0, nullptr }; bind_waiting_for_execute.reset(nullptr); + extended_query_exec_qp = true; __run_sync_again: int rc = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_SYNC(); @@ -3792,7 +3795,7 @@ void PgSQL_Session::handler_WCD_SS_MCQ_qpo_OK_msg(PtrSize_t* pkt) { client_myds->DSS = STATE_QUERY_SENT_NET; unsigned int nTrx = NumActiveTransactions(); const char txn_state = (nTrx ? 'T' : 'I'); - client_myds->myprot.generate_ok_packet(true, true, qpo->OK_msg, 0, (const char*)pkt->ptr + 5, txn_state); + client_myds->myprot.generate_ok_packet(true, true, qpo->OK_msg, 0, nullptr, txn_state); RequestEnd(NULL); l_free(pkt->size, pkt->ptr); } @@ -3816,650 +3819,527 @@ void PgSQL_Session::handler_WCD_SS_MCQ_qpo_LargePacket(PtrSize_t* pkt) { l_free(pkt->size, pkt->ptr); } -bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(PtrSize_t* pkt, bool* lock_hostgroup, PgSQL_Extended_Query_Type stmt_type) { - /* - lock_hostgroup: - If this variable is set to true, this session will get lock to a - specific hostgroup, and also have multiplexing disabled. - It means that parsing the query wasn't completely possible (mostly - a SET statement) and proxysql won't be able to set the same variable - in another connection. - This algorithm will be become obsolete once we implement session - tracking for MySQL 5.7+ - */ - //bool exit_after_SetParse = true; - - if (qpo->new_query) { - handler_WCD_SS_MCQ_qpo_QueryRewrite(pkt); - } - - if (pkt->size > (unsigned int)pgsql_thread___max_allowed_packet) { - handler_WCD_SS_MCQ_qpo_LargePacket(pkt); - return true; - } - - if (qpo->OK_msg) { - handler_WCD_SS_MCQ_qpo_OK_msg(pkt); - return true; +bool PgSQL_Session::is_multi_statement_command(const char* cmd) { + const char* dig = CurrentQuery.QueryParserArgs.digest_text; +#ifdef DEBUG + { + std::string nqn = std::string((char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength); + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Parsing %s command = %s\n", cmd, nqn.c_str()); } - - if (qpo->error_msg) { - handler_WCD_SS_MCQ_qpo_error_msg(pkt); +#endif + if (index(dig, ';') && (index(dig, ';') != dig + strlen(dig) - 1)) { + std::string nqn; + if (pgsql_thread___parse_failure_logs_digest) + nqn = std::string(CurrentQuery.get_digest_text()); + else + nqn = std::string((char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength); + proxy_warning( + "Unable to parse multi-statements command with %s statement from client" + " %s:%d: setting lock hostgroup. Command: %s\n", cmd, client_myds->addr.addr, + client_myds->addr.port, nqn.c_str() + ); return true; } - // Check if the session is not locked on a hostgroup and there are untracked option parameters - if (locked_on_hostgroup < 0 && untracked_option_parameters.empty() == false) { - if (client_myds && client_myds->addr.addr) { - proxy_warning("Unknown connection options from client %s:%d. Setting lock_hostgroup. Please report a bug for future enhancements:%s\n", client_myds->addr.addr, client_myds->addr.port, untracked_option_parameters.c_str()); - } else { - // Log a warning message without client address and port - proxy_warning("Unknown connection options. Setting lock_hostgroup. Please report a bug for future enhancements:%s\n", untracked_option_parameters.c_str()); - } - - // If there are untracked option parameters, lock the hostgroup - *lock_hostgroup = true; - - // Always create a new connection to pass untracked options to the server - qpo->create_new_conn = true; - return false; - } - - if (stmt_type == PGSQL_EXTENDED_QUERY_TYPE_PARSE || - stmt_type == PGSQL_EXTENDED_QUERY_TYPE_DESCRIBE) { // for Parse and Describe we exit here - goto __exit_set_destination_hostgroup; - } - - // handle here #509, #815 and #816 - if (CurrentQuery.QueryParserArgs.digest_text) { - char* dig = CurrentQuery.QueryParserArgs.digest_text; + return false; +} - if ((locked_on_hostgroup == -1) && (strncasecmp(dig, "SET ", 4) == 0)) { - // this code is executed only if locked_on_hostgroup is not set yet - // if locked_on_hostgroup is set, we do not try to parse the SET statement -#ifdef DEBUG - { - string nqn = string((char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength); - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Parsing SET command = %s\n", nqn.c_str()); - } -#endif - if (index(dig, ';') && (index(dig, ';') != dig + strlen(dig) - 1)) { +bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_SET_command(const char* dig, bool* lock_hostgroup) { + // this code is executed only if locked_on_hostgroup is not set yet + // if locked_on_hostgroup is set, we do not try to parse the SET statement + std::string nq = std::string((char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength); + RE2::GlobalReplace(&nq, "^/\\*!\\d\\d\\d\\d\\d SET(.*)\\*/", "SET\\1"); + RE2::GlobalReplace(&nq, "(?U)/\\*.*\\*/", ""); + // remove trailing space and semicolon if present. See issue#4380 + nq.erase(nq.find_last_not_of(" ;") + 1); + if ( + (match_regexes && match_regexes[1]->match(dig)) + ) + { + proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Parsing SET command %s\n", nq.c_str()); + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Parsing SET command = %s\n", nq.c_str()); + PgSQL_Set_Stmt_Parser parser(nq); + std::map> set = {}; + std::vector> param_status = {}; + bool send_param_status = false; + + thread->thr_SetParser->set_query(nq); // replace the query + set = thread->thr_SetParser->parse1v2(); // use algorithm v2 + + // Flag to be set if any variable within the 'SET' statement fails to be tracked, + // due to being unknown or because it's an user defined variable. + bool failed_to_parse_var = set.empty(); + for (auto it = std::begin(set); it != std::end(set); ++it) { + std::string var = it->first; + proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Processing SET variable %s\n", var.c_str()); + if (it->second.size() < 1 || it->second.size() > 2) { + // error not enough arguments + string query_str = string((char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength); + string digest_str = string(CurrentQuery.get_digest_text()); string nqn; if (pgsql_thread___parse_failure_logs_digest) - nqn = string(CurrentQuery.get_digest_text()); + nqn = digest_str; else - nqn = string((char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength); - proxy_warning( - "Unable to parse multi-statements command with SET statement from client" - " %s:%d: setting lock hostgroup. Command: %s\n", client_myds->addr.addr, - client_myds->addr.port, nqn.c_str() - ); - *lock_hostgroup = true; + nqn = query_str; + // PMC-10002: A query has failed to be parsed. This can be due a incorrect query or + // due to ProxySQL not being able to properly parse it. In case the query is correct a + // bug report should be filed including the offending query. + proxy_error2(10002, "Unable to parse query. If correct, report it as a bug: %s\n", nqn.c_str()); + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Locking hostgroup for query %s\n", + query_str.c_str()); + unable_to_parse_set_statement(lock_hostgroup); return false; } - - string nq = string((char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength); - RE2::GlobalReplace(&nq, "^/\\*!\\d\\d\\d\\d\\d SET(.*)\\*/", "SET\\1"); - RE2::GlobalReplace(&nq, "(?U)/\\*.*\\*/", ""); - // remove trailing space and semicolon if present. See issue#4380 - nq.erase(nq.find_last_not_of(" ;") + 1); - if ( - (match_regexes && match_regexes[1]->match(dig)) - ) - { - proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Parsing SET command %s\n", nq.c_str()); - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Parsing SET command = %s\n", nq.c_str()); - PgSQL_Set_Stmt_Parser parser(nq); - std::map> set = {}; - std::vector> param_status = {}; - bool send_param_status = false; - - thread->thr_SetParser->set_query(nq); // replace the query - set = thread->thr_SetParser->parse1v2(); // use algorithm v2 - - // Flag to be set if any variable within the 'SET' statement fails to be tracked, - // due to being unknown or because it's an user defined variable. - bool failed_to_parse_var = set.empty(); - for (auto it = std::begin(set); it != std::end(set); ++it) { - std::string var = it->first; - proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Processing SET variable %s\n", var.c_str()); - if (it->second.size() < 1 || it->second.size() > 2) { - // error not enough arguments - string query_str = string((char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength); - string digest_str = string(CurrentQuery.get_digest_text()); - string nqn; - if (pgsql_thread___parse_failure_logs_digest) - nqn = digest_str; - else - nqn = query_str; - // PMC-10002: A query has failed to be parsed. This can be due a incorrect query or - // due to ProxySQL not being able to properly parse it. In case the query is correct a - // bug report should be filed including the offending query. - proxy_error2(10002, "Unable to parse query. If correct, report it as a bug: %s\n", nqn.c_str()); - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Locking hostgroup for query %s\n", - query_str.c_str()); - unable_to_parse_set_statement(lock_hostgroup); - return false; + auto values = std::begin(it->second); + if (std::find(pgsql_critical_variables.begin(), pgsql_critical_variables.end(), var) != pgsql_critical_variables.end() || + pgsql_other_variables.find(var) != pgsql_other_variables.end()) { + std::string value1 = *values; + + int idx = PGSQL_NAME_LAST_HIGH_WM; + for (int i = 0; i < PGSQL_NAME_LAST_HIGH_WM; i++) { + // skip low water mark + if (i == PGSQL_NAME_LAST_LOW_WM) continue; + + if (variable_name_exists(pgsql_tracked_variables[i], var.c_str()) == true) { + idx = i; + break; } - auto values = std::begin(it->second); - if (std::find(pgsql_critical_variables.begin(), pgsql_critical_variables.end(), var) != pgsql_critical_variables.end() || - pgsql_other_variables.find(var) != pgsql_other_variables.end()) { - std::string value1 = *values; - - int idx = PGSQL_NAME_LAST_HIGH_WM; - for (int i = 0; i < PGSQL_NAME_LAST_HIGH_WM; i++) { - // skip low water mark - if (i == PGSQL_NAME_LAST_LOW_WM) continue; - - if (variable_name_exists(pgsql_tracked_variables[i], var.c_str()) == true) { - idx = i; - break; + } + if (idx != PGSQL_NAME_LAST_HIGH_WM) { + uint32_t current_hash = pgsql_variables.client_get_hash(this, idx); + if ((value1.size() == sizeof("DEFAULT") - 1) && strncasecmp(value1.c_str(), "DEFAULT", sizeof("DEFAULT") - 1) == 0) { + auto [value, hash] = client_myds->myconn->get_startup_parameter_and_hash((enum pgsql_variable_name)idx); + if (hash == 0) { + if (current_hash != 0) { + proxy_debug(PROXY_DEBUG_MYSQL_COM, 8, "Resetting connection variable %s to DEFAULT\n", var.c_str()); + pgsql_variables.client_reset_value(this, idx, true); } + client_myds->DSS = STATE_QUERY_SENT_NET; + unsigned int nTrx = NumActiveTransactions(); + const char trx_state = (nTrx ? 'T' : 'I'); + client_myds->myprot.generate_ok_packet(true, true, NULL, 0, dig, trx_state, NULL, param_status); + RequestEnd(NULL); + return true; } - if (idx != PGSQL_NAME_LAST_HIGH_WM) { - uint32_t current_hash = pgsql_variables.client_get_hash(this, idx); - if ((value1.size() == sizeof("DEFAULT") - 1) && strncasecmp(value1.c_str(), "DEFAULT",sizeof("DEFAULT")-1) == 0) { - auto [value, hash] = client_myds->myconn->get_startup_parameter_and_hash((enum pgsql_variable_name)idx); - if (hash == 0) { - if (current_hash != 0) { - proxy_debug(PROXY_DEBUG_MYSQL_COM, 8, "Resetting connection variable %s to DEFAULT\n", var.c_str()); - pgsql_variables.client_reset_value(this, idx, true); - } - client_myds->DSS = STATE_QUERY_SENT_NET; - unsigned int nTrx = NumActiveTransactions(); - const char trx_state = (nTrx ? 'T' : 'I'); - client_myds->myprot.generate_ok_packet(true, true, NULL, 0, dig, trx_state, NULL, param_status); - RequestEnd(NULL); - l_free(pkt->size, pkt->ptr); - return true; - } - value1 = value; - } - - char* transformed_value = nullptr; - if (pgsql_tracked_variables[idx].validator && pgsql_tracked_variables[idx].validator->validate && - ( - *pgsql_tracked_variables[idx].validator->validate)( - value1.c_str(), &pgsql_tracked_variables[idx].validator->params, this, &transformed_value) == false - ) { - char* m = NULL; - char* errmsg = NULL; - proxy_error("invalid value for parameter \"%s\": \"%s\"\n", pgsql_tracked_variables[idx].set_variable_name, value1.c_str()); - m = (char*)"invalid value for parameter \"%s\": \"%s\""; - errmsg = (char*)malloc(value1.length() + strlen(pgsql_tracked_variables[idx].set_variable_name) + strlen(m)); - sprintf(errmsg, m, pgsql_tracked_variables[idx].set_variable_name, value1.c_str()); + value1 = value; + } - client_myds->DSS = STATE_QUERY_SENT_NET; - client_myds->myprot.generate_error_packet(true, true, errmsg, - PGSQL_ERROR_CODES::ERRCODE_INVALID_PARAMETER_VALUE, false, true); - free(errmsg); - RequestEnd(NULL); - l_free(pkt->size, pkt->ptr); - return true; - } + char* transformed_value = nullptr; + if (pgsql_tracked_variables[idx].validator && pgsql_tracked_variables[idx].validator->validate && + ( + *pgsql_tracked_variables[idx].validator->validate)( + value1.c_str(), &pgsql_tracked_variables[idx].validator->params, this, &transformed_value) == false + ) { + char* m = NULL; + char* errmsg = NULL; + proxy_error("invalid value for parameter \"%s\": \"%s\"\n", pgsql_tracked_variables[idx].set_variable_name, value1.c_str()); + m = (char*)"invalid value for parameter \"%s\": \"%s\""; + errmsg = (char*)malloc(value1.length() + strlen(pgsql_tracked_variables[idx].set_variable_name) + strlen(m)); + sprintf(errmsg, m, pgsql_tracked_variables[idx].set_variable_name, value1.c_str()); + + client_myds->DSS = STATE_QUERY_SENT_NET; + client_myds->myprot.generate_error_packet(true, true, errmsg, + PGSQL_ERROR_CODES::ERRCODE_INVALID_PARAMETER_VALUE, false, true); + free(errmsg); + RequestEnd(NULL); + return true; + } - if (transformed_value) { - value1 = transformed_value; - free(transformed_value); - } + if (transformed_value) { + value1 = transformed_value; + free(transformed_value); + } - if (idx == PGSQL_DATESTYLE) { - if (value1.empty()) { - client_myds->DSS = STATE_QUERY_SENT_NET; - unsigned int nTrx = NumActiveTransactions(); - const char txn_state = (nTrx ? 'T' : 'I'); - client_myds->myprot.generate_ok_packet(true, true, NULL, 0, dig, txn_state, NULL, param_status); - RequestEnd(NULL); - l_free(pkt->size, pkt->ptr); - return true; - } - } + if (idx == PGSQL_DATESTYLE) { + if (value1.empty()) { + client_myds->DSS = STATE_QUERY_SENT_NET; + unsigned int nTrx = NumActiveTransactions(); + const char txn_state = (nTrx ? 'T' : 'I'); + client_myds->myprot.generate_ok_packet(true, true, NULL, 0, dig, txn_state, NULL, param_status); + RequestEnd(NULL); + return true; + } + } - uint32_t var_hash_int = SpookyHash::Hash32(value1.c_str(), value1.length(), 10); - if (current_hash != var_hash_int) { - proxy_debug(PROXY_DEBUG_MYSQL_COM, 8, "Changing connection %s to %s\n", var.c_str(), value1.c_str()); - if (!pgsql_variables.client_set_value(this, idx, value1.c_str(), true)) { - return false; - } - if (idx == PGSQL_DATESTYLE) { - // always set current_datestyle - current_datestyle = PgSQL_DateStyle_Util::parse_datestyle(value1); - // No need to set send_param_status to true, as the original DateStyle value may have been modified. - // When send_param_status is true, it always sends the original value provided by the user in the SET statement. - if (IS_PGTRACKED_VAR_OPTION_SET_PARAM_STATUS(pgsql_tracked_variables[idx])) { - param_status.emplace_back(var, value1); - } - } else { - send_param_status = IS_PGTRACKED_VAR_OPTION_SET_PARAM_STATUS(pgsql_tracked_variables[idx]); - } + uint32_t var_hash_int = SpookyHash::Hash32(value1.c_str(), value1.length(), 10); + if (current_hash != var_hash_int) { + proxy_debug(PROXY_DEBUG_MYSQL_COM, 8, "Changing connection %s to %s\n", var.c_str(), value1.c_str()); + if (!pgsql_variables.client_set_value(this, idx, value1.c_str(), true)) { + return false; + } + if (idx == PGSQL_DATESTYLE) { + // always set current_datestyle + current_datestyle = PgSQL_DateStyle_Util::parse_datestyle(value1); + // No need to set send_param_status to true, as the original DateStyle value may have been modified. + // When send_param_status is true, it always sends the original value provided by the user in the SET statement. + if (IS_PGTRACKED_VAR_OPTION_SET_PARAM_STATUS(pgsql_tracked_variables[idx])) { + param_status.emplace_back(var, value1); } + } else { + send_param_status = IS_PGTRACKED_VAR_OPTION_SET_PARAM_STATUS(pgsql_tracked_variables[idx]); } - } else if (std::find(pgsql_variables.ignore_vars.begin(), pgsql_variables.ignore_vars.end(), var) != pgsql_variables.ignore_vars.end()) { - // this is a variable we parse but ignore - // see MySQL_Variables::MySQL_Variables() for a list of ignored variables + } + } + } else if (std::find(pgsql_variables.ignore_vars.begin(), pgsql_variables.ignore_vars.end(), var) != pgsql_variables.ignore_vars.end()) { + // this is a variable we parse but ignore + // see MySQL_Variables::MySQL_Variables() for a list of ignored variables #ifdef DEBUG - std::string value1 = *values; - proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Processing SET %s value %s\n", var.c_str(), value1.c_str()); + std::string value1 = *values; + proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Processing SET %s value %s\n", var.c_str(), value1.c_str()); #endif // DEBUG - } else { - // At this point the variable is unknown to us, or it's a user variable - // prefixed by '@', in both cases, we should fail to parse. We don't - // fail inmediately so we can anyway keep track of the other variables - // supplied within the 'SET' statement being parsed. - failed_to_parse_var = true; - } + } else { + // At this point the variable is unknown to us, or it's a user variable + // prefixed by '@', in both cases, we should fail to parse. We don't + // fail inmediately so we can anyway keep track of the other variables + // supplied within the 'SET' statement being parsed. + failed_to_parse_var = true; + } - if (send_param_status) - param_status.emplace_back(var, *values); - } + if (send_param_status) + param_status.emplace_back(var, *values); + } - if (failed_to_parse_var) { - unable_to_parse_set_statement(lock_hostgroup); - return false; - } + if (failed_to_parse_var) { + unable_to_parse_set_statement(lock_hostgroup); + return false; + } - client_myds->DSS = STATE_QUERY_SENT_NET; - unsigned int nTrx = NumActiveTransactions(); - const char txn_state = (nTrx ? 'T' : 'I'); - client_myds->myprot.generate_ok_packet(true, true, NULL, 0, dig, txn_state, NULL, param_status); - RequestEnd(NULL); - l_free(pkt->size, pkt->ptr); - return true; - } else { - unable_to_parse_set_statement(lock_hostgroup); - return false; - } - } else if ((locked_on_hostgroup == -1) && (strncasecmp(dig, "RESET ", 6) == 0)) { -#ifdef DEBUG - { - std::string nqn = string((char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength); - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Parsing RESET command = %s\n", nqn.c_str()); - } -#endif - if (index(dig, ';') && (index(dig, ';') != dig + strlen(dig) - 1)) { - string nqn; - if (pgsql_thread___parse_failure_logs_digest) - nqn = string(CurrentQuery.get_digest_text()); - else - nqn = string((char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength); - proxy_warning( - "Unable to parse multi-statements command with RESET statement from client" - " %s:%d: setting lock hostgroup. Command: %s\n", client_myds->addr.addr, - client_myds->addr.port, nqn.c_str() - ); - *lock_hostgroup = true; - return false; - } + client_myds->DSS = STATE_QUERY_SENT_NET; + unsigned int nTrx = NumActiveTransactions(); + const char txn_state = (nTrx ? 'T' : 'I'); + client_myds->myprot.generate_ok_packet(true, true, NULL, 0, dig, txn_state, NULL, param_status); + RequestEnd(NULL); + return true; + } else { + unable_to_parse_set_statement(lock_hostgroup); + return false; + } + return false; +} - string nq = string((char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength); +bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_RESET_command(const char* dig, bool* lock_hostgroup) { + std::string nq = std::string((char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength); - RE2::GlobalReplace(&nq, "(?U)/\\*.*\\*/", ""); - RE2::GlobalReplace(&nq, "(?i)\\bRESET\\b", ""); - RE2::GlobalReplace(&nq, "[^\\w]*", ""); + RE2::GlobalReplace(&nq, "(?U)/\\*.*\\*/", ""); + RE2::GlobalReplace(&nq, "(?i)\\bRESET\\b", ""); + RE2::GlobalReplace(&nq, "[^\\w]*", ""); - proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Parsing RESET command %s\n", nq.c_str()); - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Parsing RESET command = %s\n", nq.c_str()); + proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Parsing RESET command %s\n", nq.c_str()); + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Parsing RESET command = %s\n", nq.c_str()); - std::vector> param_status = {}; + std::vector> param_status = {}; - if (strncasecmp(nq.c_str(), "ALL", 3) == 0) { + if (strncasecmp(nq.c_str(), "ALL", 3) == 0) { - for (int idx = 0; idx < PGSQL_NAME_LAST_LOW_WM; idx++) { + for (int idx = 0; idx < PGSQL_NAME_LAST_LOW_WM; idx++) { - const char* name = pgsql_tracked_variables[idx].set_variable_name; - auto [value, hash] = client_myds->myconn->get_startup_parameter_and_hash((enum pgsql_variable_name)idx); - // hash can never be 0 for critical variables - uint32_t current_hash = pgsql_variables.client_get_hash(this, idx); - if (current_hash != hash) { - proxy_debug(PROXY_DEBUG_MYSQL_COM, 8, "Changing connection %s to %s\n", name, value); - if (!pgsql_variables.client_set_value(this, idx, value, false)) { - return false; - } - if (IS_PGTRACKED_VAR_OPTION_SET_PARAM_STATUS(pgsql_tracked_variables[idx])) { - param_status.emplace_back(name, value); - } - } + const char* name = pgsql_tracked_variables[idx].set_variable_name; + auto [value, hash] = client_myds->myconn->get_startup_parameter_and_hash((enum pgsql_variable_name)idx); + // hash can never be 0 for critical variables + uint32_t current_hash = pgsql_variables.client_get_hash(this, idx); + if (current_hash != hash) { + proxy_debug(PROXY_DEBUG_MYSQL_COM, 8, "Changing connection %s to %s\n", name, value); + if (!pgsql_variables.client_set_value(this, idx, value, false)) { + return false; } + if (IS_PGTRACKED_VAR_OPTION_SET_PARAM_STATUS(pgsql_tracked_variables[idx])) { + param_status.emplace_back(name, value); + } + } + } - for (int idx : client_myds->myconn->dynamic_variables_idx) { - const char* name = pgsql_tracked_variables[idx].set_variable_name; - auto [value, hash] = client_myds->myconn->get_startup_parameter_and_hash((enum pgsql_variable_name)idx); - uint32_t current_hash = pgsql_variables.client_get_hash(this, idx); - if (hash == 0 && current_hash != 0) { - proxy_debug(PROXY_DEBUG_MYSQL_COM, 8, "Resetting connection variable %s to DEFAULT\n", name); - pgsql_variables.client_reset_value(this, idx, false); - } else if (hash != 0 && current_hash != hash) { - proxy_debug(PROXY_DEBUG_MYSQL_COM, 8, "Changing connection %s to %s\n", name, value); - if (!pgsql_variables.client_set_value(this, idx, value, false)) { - return false; - } - } + for (int idx : client_myds->myconn->dynamic_variables_idx) { + const char* name = pgsql_tracked_variables[idx].set_variable_name; + auto [value, hash] = client_myds->myconn->get_startup_parameter_and_hash((enum pgsql_variable_name)idx); + uint32_t current_hash = pgsql_variables.client_get_hash(this, idx); + if (hash == 0 && current_hash != 0) { + proxy_debug(PROXY_DEBUG_MYSQL_COM, 8, "Resetting connection variable %s to DEFAULT\n", name); + pgsql_variables.client_reset_value(this, idx, false); + } else if (hash != 0 && current_hash != hash) { + proxy_debug(PROXY_DEBUG_MYSQL_COM, 8, "Changing connection %s to %s\n", name, value); + if (!pgsql_variables.client_set_value(this, idx, value, false)) { + return false; } - client_myds->myconn->reorder_dynamic_variables_idx(); + } + } + client_myds->myconn->reorder_dynamic_variables_idx(); - } else if (std::find(pgsql_variables.ignore_vars.begin(), pgsql_variables.ignore_vars.end(), nq) != pgsql_variables.ignore_vars.end()) { - // this is a variable we parse but ignore + } else if (std::find(pgsql_variables.ignore_vars.begin(), pgsql_variables.ignore_vars.end(), nq) != pgsql_variables.ignore_vars.end()) { + // this is a variable we parse but ignore #ifdef DEBUG - proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Processing RESET %s\n", nq.c_str()); + proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Processing RESET %s\n", nq.c_str()); #endif // DEBUG - } else { - int idx = PGSQL_NAME_LAST_HIGH_WM; - for (int i = 0; i < PGSQL_NAME_LAST_HIGH_WM; i++) { - if (i == PGSQL_NAME_LAST_LOW_WM) - continue; + } else { + int idx = PGSQL_NAME_LAST_HIGH_WM; + for (int i = 0; i < PGSQL_NAME_LAST_HIGH_WM; i++) { + if (i == PGSQL_NAME_LAST_LOW_WM) + continue; - if (variable_name_exists(pgsql_tracked_variables[i], nq.c_str()) == true) { - idx = i; - break; - } - } - if (idx != PGSQL_NAME_LAST_HIGH_WM) { - const char* name = pgsql_tracked_variables[idx].set_variable_name; - auto [value, hash] = client_myds->myconn->get_startup_parameter_and_hash((enum pgsql_variable_name)idx); - uint32_t current_hash = pgsql_variables.client_get_hash(this, idx); - // Reset to default if hash is zero, means startup parameter is not set - if (hash == 0 && current_hash != 0) { - proxy_debug(PROXY_DEBUG_MYSQL_COM, 8, "Resetting connection variable %s to DEFAULT\n", name); - pgsql_variables.client_reset_value(this, idx, true); - } else if (hash != 0 && current_hash != hash) { - proxy_debug(PROXY_DEBUG_MYSQL_COM, 8, "Changing connection %s to %s\n", name, value); - if (!pgsql_variables.client_set_value(this, idx, value, true)) { - return false; - } - if (IS_PGTRACKED_VAR_OPTION_SET_PARAM_STATUS(pgsql_tracked_variables[idx])) { - param_status.emplace_back(name, value); - } - } - } else { - unable_to_parse_set_statement(lock_hostgroup); + if (variable_name_exists(pgsql_tracked_variables[i], nq.c_str()) == true) { + idx = i; + break; + } + } + if (idx != PGSQL_NAME_LAST_HIGH_WM) { + const char* name = pgsql_tracked_variables[idx].set_variable_name; + auto [value, hash] = client_myds->myconn->get_startup_parameter_and_hash((enum pgsql_variable_name)idx); + uint32_t current_hash = pgsql_variables.client_get_hash(this, idx); + // Reset to default if hash is zero, means startup parameter is not set + if (hash == 0 && current_hash != 0) { + proxy_debug(PROXY_DEBUG_MYSQL_COM, 8, "Resetting connection variable %s to DEFAULT\n", name); + pgsql_variables.client_reset_value(this, idx, true); + } else if (hash != 0 && current_hash != hash) { + proxy_debug(PROXY_DEBUG_MYSQL_COM, 8, "Changing connection %s to %s\n", name, value); + if (!pgsql_variables.client_set_value(this, idx, value, true)) { return false; } + if (IS_PGTRACKED_VAR_OPTION_SET_PARAM_STATUS(pgsql_tracked_variables[idx])) { + param_status.emplace_back(name, value); + } } - client_myds->DSS = STATE_QUERY_SENT_NET; - unsigned int nTrx = NumActiveTransactions(); - const char txn_state = (nTrx ? 'T' : 'I'); - client_myds->myprot.generate_ok_packet(true, true, NULL, 0, dig, txn_state, NULL, param_status); + } else { + unable_to_parse_set_statement(lock_hostgroup); + return false; + } + } + client_myds->DSS = STATE_QUERY_SENT_NET; + unsigned int nTrx = NumActiveTransactions(); + const char txn_state = (nTrx ? 'T' : 'I'); + client_myds->myprot.generate_ok_packet(true, true, NULL, 0, dig, txn_state, NULL, param_status); - if (mirror == false) { - RequestEnd(NULL); - } else { - client_myds->DSS = STATE_SLEEP; - status = WAITING_CLIENT_DATA; - } - l_free(pkt->size, pkt->ptr); - return true; - } else if (strncasecmp(dig, "DISCARD ", 8) == 0) { -#ifdef DEBUG - { - std::string nqn = string((char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength); - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Parsing DISCARD command = %s\n", nqn.c_str()); - } -#endif - if (index(dig, ';') && (index(dig, ';') != dig + strlen(dig) - 1)) { - string nqn; - if (pgsql_thread___parse_failure_logs_digest) - nqn = string(CurrentQuery.get_digest_text()); - else - nqn = string((char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength); - proxy_warning( - "Unable to parse multi-statements command with DISCARD statement from client" - " %s:%d: setting lock hostgroup. Command: %s\n", client_myds->addr.addr, - client_myds->addr.port, nqn.c_str() - ); - *lock_hostgroup = true; - return false; - } + if (mirror == false) { + RequestEnd(NULL); + } else { + client_myds->DSS = STATE_SLEEP; + status = WAITING_CLIENT_DATA; + } - std::string nq = string((char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength); + return true; +} - RE2::GlobalReplace(&nq, "(?U)/\\*.*\\*/", ""); - RE2::GlobalReplace(&nq, "(?i)\\bDISCARD\\b", ""); - RE2::GlobalReplace(&nq, "[^\\w]*", ""); +bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_DISCARD_command(const char* dig) { - proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Parsing DISCARD command %s\n", nq.c_str()); - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Parsing DISCARD command = %s\n", nq.c_str()); - bool handled = false; - const char* discard_value = nq.c_str(); - if (strncasecmp(discard_value, "ALL", 3) == 0) { - // Backup the current relevant session values - int default_hostgroup = this->default_hostgroup; - bool transaction_persistent = this->transaction_persistent; + std::string nq = string((char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength); - // Re-initialize the session - reset(); - init(); + RE2::GlobalReplace(&nq, "(?U)/\\*.*\\*/", ""); + RE2::GlobalReplace(&nq, "(?i)\\bDISCARD\\b", ""); + RE2::GlobalReplace(&nq, "[^\\w]*", ""); - // Recover the relevant session values - this->default_hostgroup = default_hostgroup; - this->transaction_persistent = transaction_persistent; - handled = true; - } else if (strncasecmp(discard_value, "PLANS", 5) == 0) { - // ignore - handled = true; - } + proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Parsing DISCARD command %s\n", nq.c_str()); + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Parsing DISCARD command = %s\n", nq.c_str()); + bool handled = false; + const char* discard_value = nq.c_str(); + if (strncasecmp(discard_value, "ALL", 3) == 0) { + // Backup the current relevant session values + int default_hostgroup = this->default_hostgroup; + bool transaction_persistent = this->transaction_persistent; - if (handled) { - client_myds->DSS = STATE_QUERY_SENT_NET; - unsigned int nTrx = NumActiveTransactions(); - const char txn_state = (nTrx ? 'T' : 'I'); - client_myds->myprot.generate_ok_packet(true, true, NULL, 0, dig, txn_state, NULL, {}); + // Re-initialize the session + reset(); + init(); - if (mirror == false) { - RequestEnd(NULL); - } - else { - client_myds->DSS = STATE_SLEEP; - status = WAITING_CLIENT_DATA; - } - l_free(pkt->size, pkt->ptr); - return true; - } - // send other DISCARD variants to Backend - } else if (strncasecmp(dig, "DEALLOCATE ", 11) == 0) { -#ifdef DEBUG - { - std::string nqn = string((char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength); - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Parsing DEALLOCATE command = %s\n", nqn.c_str()); - } -#endif - if (index(dig, ';') && (index(dig, ';') != dig + strlen(dig) - 1)) { - string nqn; - if (pgsql_thread___parse_failure_logs_digest) - nqn = string(CurrentQuery.get_digest_text()); - else - nqn = string((char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength); - proxy_warning( - "Unable to parse multi-statements command with DEALLOCATE statement from client" - " %s:%d: setting lock hostgroup. Command: %s\n", client_myds->addr.addr, - client_myds->addr.port, nqn.c_str() - ); - *lock_hostgroup = true; - return false; - } + // Recover the relevant session values + this->default_hostgroup = default_hostgroup; + this->transaction_persistent = transaction_persistent; + handled = true; + } else if (strncasecmp(discard_value, "PLANS", 5) == 0) { + // ignore + handled = true; + } - std::string nq = string((char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength); + if (handled) { + client_myds->DSS = STATE_QUERY_SENT_NET; + unsigned int nTrx = NumActiveTransactions(); + const char txn_state = (nTrx ? 'T' : 'I'); + client_myds->myprot.generate_ok_packet(true, true, NULL, 0, dig, txn_state, NULL, {}); - RE2::GlobalReplace(&nq, "(?U)/\\*.*\\*/", ""); - RE2::GlobalReplace(&nq, "(?i)\\bDEALLOCATE\\b(\\s+PREPARE)?", ""); - RE2::GlobalReplace(&nq, "[^\\w]*", ""); + if (mirror == false) { + RequestEnd(NULL); + } else { + client_myds->DSS = STATE_SLEEP; + status = WAITING_CLIENT_DATA; + } + return true; + } + // send other DISCARD variants to Backend + return false; +} - proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Parsing DEALLOCATE command %s\n", nq.c_str()); - proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Parsing DEALLOCATE command = %s\n", nq.c_str()); +bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_DEALLOCATE_command(const char* dig) { + + std::string nq = string((char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength); - const char* dealloc_value = nq.c_str(); - if (strncasecmp(dealloc_value, "ALL", 3) == 0) { - client_myds->myconn->local_stmts->client_close_all(); - } else { - client_myds->myconn->local_stmts->client_close(dealloc_value); - } - client_myds->DSS = STATE_QUERY_SENT_NET; - unsigned int nTrx = NumActiveTransactions(); - const char txn_state = (nTrx ? 'T' : 'I'); - client_myds->myprot.generate_ok_packet(true, true, NULL, 0, dig, txn_state, NULL, {}); + RE2::GlobalReplace(&nq, "(?U)/\\*.*\\*/", ""); + RE2::GlobalReplace(&nq, "(?i)\\bDEALLOCATE\\b(\\s+PREPARE)?", ""); + RE2::GlobalReplace(&nq, "[^\\w]*", ""); + proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Parsing DEALLOCATE command %s\n", nq.c_str()); + proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "Parsing DEALLOCATE command = %s\n", nq.c_str()); + + const char* dealloc_value = nq.c_str(); + if (strncasecmp(dealloc_value, "ALL", 3) == 0) { + client_myds->myconn->local_stmts->client_close_all(); + } else { + if (client_myds->myconn->local_stmts->client_close(dealloc_value) == false) { + client_myds->DSS = STATE_QUERY_SENT_NET; + const std::string& errmsg = "prepared statement \"" + std::string(dealloc_value) + "\" does not exist"; + client_myds->myprot.generate_error_packet(true, true, errmsg.c_str(), PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, false, true); if (mirror == false) { RequestEnd(NULL); } else { client_myds->DSS = STATE_SLEEP; status = WAITING_CLIENT_DATA; } - l_free(pkt->size, pkt->ptr); return true; } } - if (mirror == true) { // for mirror session we exit here - current_hostgroup = qpo->destination_hostgroup; - return false; + client_myds->DSS = STATE_QUERY_SENT_NET; + unsigned int nTrx = NumActiveTransactions(); + const char txn_state = (nTrx ? 'T' : 'I'); + client_myds->myprot.generate_ok_packet(true, true, NULL, 0, dig, txn_state, NULL, {}); + + if (mirror == false) { + RequestEnd(NULL); + } else { + client_myds->DSS = STATE_SLEEP; + status = WAITING_CLIENT_DATA; } + return true; +} -#if 0 - // handle case #1797 - // handle case #2564 - if ((pkt->size == SELECT_CONNECTION_ID_LEN + 5 && *((char*)(pkt->ptr) + 4) == (char)0x03 && strncasecmp((char*)SELECT_CONNECTION_ID, (char*)pkt->ptr + 5, pkt->size - 5) == 0)) { - char buf[32]; - char buf2[32]; - sprintf(buf, "%u", thread_session_id); - int l0 = strlen("CONNECTION_ID()"); - memcpy(buf2, (char*)pkt->ptr + 5 + SELECT_CONNECTION_ID_LEN - l0, l0); - buf2[l0] = 0; - unsigned int nTrx = NumActiveTransactions(); - uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0); - if (autocommit) setStatus |= SERVER_STATUS_AUTOCOMMIT; - PgSQL_Data_Stream* myds = client_myds; - MySQL_Protocol* myprot = &client_myds->myprot; - myds->DSS = STATE_QUERY_SENT_DS; - int sid = 1; - myprot->generate_pkt_column_count(true, NULL, NULL, sid, 1); sid++; - myprot->generate_pkt_field(true, NULL, NULL, sid, (char*)"", (char*)"", (char*)"", buf2, (char*)"", 63, 31, MYSQL_TYPE_LONGLONG, 161, 0, false, 0, NULL); sid++; - myds->DSS = STATE_COLUMN_DEFINITION; +bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_special_commands(const char* dig, bool* lock_hostgroup) { + if (!dig) return false; - bool deprecate_eof_active = myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF; - if (!deprecate_eof_active) { - myprot->generate_pkt_EOF(true, NULL, NULL, sid, 0, setStatus); sid++; + if (locked_on_hostgroup == -1) { + if (strncasecmp(dig, "SET ", 4) == 0) { + if (is_multi_statement_command("SET") == true) { + *lock_hostgroup = true; + return false; + } + if (handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_SET_command(dig, lock_hostgroup)) { + return true; + } + return false; + } else if (strncasecmp(dig, "RESET ", 6) == 0) { + if (is_multi_statement_command("RESET") == true) { + *lock_hostgroup = true; + return false; + } + if (handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_RESET_command(dig, lock_hostgroup)) { + return true; + } + return false; + } + } + if (strncasecmp(dig, "DISCARD ", 8) == 0) { + if (is_multi_statement_command("DISCARD") == true) { + *lock_hostgroup = true; + return false; + } + if (handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_DISCARD_command(dig)) { + return true; } + } else if (strncasecmp(dig, "DEALLOCATE ", 11) == 0) { + if (is_multi_statement_command("DEALLOCATE") == true) { + *lock_hostgroup = true; + return false; + } + if (handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_DEALLOCATE_command(dig)) { + return true; + } + } + // If we reach here, it means the command was not handled + return false; +} - char** p = (char**)malloc(sizeof(char*) * 1); - unsigned long* l = (unsigned long*)malloc(sizeof(unsigned long*) * 1); - l[0] = strlen(buf); - p[0] = buf; - myprot->generate_pkt_row(true, NULL, NULL, sid, 1, l, p); sid++; - myds->DSS = STATE_ROW; +bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(PtrSize_t* pkt, bool* lock_hostgroup, PgSQL_Extended_Query_Type stmt_type) { + /* + lock_hostgroup: + If this variable is set to true, this session will get lock to a + specific hostgroup, and also have multiplexing disabled. + It means that parsing the query wasn't completely possible (mostly + a SET statement) and proxysql won't be able to set the same variable + in another connection. + */ - if (deprecate_eof_active) { - myprot->generate_pkt_OK(true, NULL, NULL, sid, 0, 0, setStatus, 0, NULL, true); sid++; + // Initial query processing + if (stmt_type == PGSQL_EXTENDED_QUERY_TYPE_NOT_SET || + stmt_type == PGSQL_EXTENDED_QUERY_TYPE_PARSE) { + + if (qpo->new_query) { + handler_WCD_SS_MCQ_qpo_QueryRewrite(pkt); } - else { - myprot->generate_pkt_EOF(true, NULL, NULL, sid, 0, setStatus); sid++; + + if (pkt->size > (unsigned int)pgsql_thread___max_allowed_packet) { + handler_WCD_SS_MCQ_qpo_LargePacket(pkt); + return true; + } + + if (qpo->error_msg) { + handler_WCD_SS_MCQ_qpo_error_msg(pkt); + return true; } - myds->DSS = STATE_SLEEP; - RequestEnd(NULL); - l_free(pkt->size, pkt->ptr); - free(p); - free(l); - return true; } - // handle case #1421 , about LAST_INSERT_ID - if (CurrentQuery.QueryParserArgs.digest_text) { - char* dig = CurrentQuery.QueryParserArgs.digest_text; - if (strcasestr(dig, "LAST_INSERT_ID") || strcasestr(dig, "@@IDENTITY")) { - // we need to try to execute it where the last write was successful - if (last_HG_affected_rows >= 0) { - PgSQL_Backend* _mybe = NULL; - _mybe = find_backend(last_HG_affected_rows); - if (_mybe) { - if (_mybe->server_myds) { - if (_mybe->server_myds->myconn) { - if (_mybe->server_myds->myconn->pgsql_conn) { // we have an established connection - // this seems to be the right backend - qpo->destination_hostgroup = last_HG_affected_rows; - current_hostgroup = qpo->destination_hostgroup; - return false; // execute it on backend! - } - } - } - } - } + if (stmt_type == PGSQL_EXTENDED_QUERY_TYPE_NOT_SET || + stmt_type == PGSQL_EXTENDED_QUERY_TYPE_EXECUTE) { + if (qpo->OK_msg) { + handler_WCD_SS_MCQ_qpo_OK_msg(pkt); + return true; + } + } - // if we reached here, we don't know the right backend - // we try to determine if it is a simple "SELECT LAST_INSERT_ID()" or "SELECT @@IDENTITY" and we return pgsql->last_insert_id + // Check if the session is not locked on a hostgroup and there are untracked option parameters + if (locked_on_hostgroup < 0 && untracked_option_parameters.empty() == false) { + if (client_myds && client_myds->addr.addr) { + proxy_warning("Unknown connection options from client %s:%d. Setting lock_hostgroup. Please report a bug for future enhancements:%s\n", client_myds->addr.addr, client_myds->addr.port, untracked_option_parameters.c_str()); + } + else { + // Log a warning message without client address and port + proxy_warning("Unknown connection options. Setting lock_hostgroup. Please report a bug for future enhancements:%s\n", untracked_option_parameters.c_str()); + } - //handle 2564 - if ( - (pkt->size == SELECT_LAST_INSERT_ID_LEN + 5 && *((char*)(pkt->ptr) + 4) == (char)0x03 && strncasecmp((char*)SELECT_LAST_INSERT_ID, (char*)pkt->ptr + 5, pkt->size - 5) == 0) - || - (pkt->size == SELECT_LAST_INSERT_ID_LIMIT1_LEN + 5 && *((char*)(pkt->ptr) + 4) == (char)0x03 && strncasecmp((char*)SELECT_LAST_INSERT_ID_LIMIT1, (char*)pkt->ptr + 5, pkt->size - 5) == 0) - || - (pkt->size == SELECT_VARIABLE_IDENTITY_LEN + 5 && *((char*)(pkt->ptr) + 4) == (char)0x03 && strncasecmp((char*)SELECT_VARIABLE_IDENTITY, (char*)pkt->ptr + 5, pkt->size - 5) == 0) - || - (pkt->size == SELECT_VARIABLE_IDENTITY_LIMIT1_LEN + 5 && *((char*)(pkt->ptr) + 4) == (char)0x03 && strncasecmp((char*)SELECT_VARIABLE_IDENTITY_LIMIT1, (char*)pkt->ptr + 5, pkt->size - 5) == 0) - ) { - char buf[32]; - sprintf(buf, "%llu", last_insert_id); - char buf2[32]; - int l0 = 0; - if (strcasestr(dig, "LAST_INSERT_ID")) { - l0 = strlen("LAST_INSERT_ID()"); - memcpy(buf2, (char*)pkt->ptr + 5 + SELECT_LAST_INSERT_ID_LEN - l0, l0); - } - else if (strcasestr(dig, "@@IDENTITY")) { - l0 = strlen("@@IDENTITY"); - memcpy(buf2, (char*)pkt->ptr + 5 + SELECT_VARIABLE_IDENTITY_LEN - l0, l0); - } - buf2[l0] = 0; - unsigned int nTrx = NumActiveTransactions(); - uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0); - if (autocommit) setStatus |= SERVER_STATUS_AUTOCOMMIT; - PgSQL_Data_Stream* myds = client_myds; - MySQL_Protocol* myprot = &client_myds->myprot; - myds->DSS = STATE_QUERY_SENT_DS; - int sid = 1; - myprot->generate_pkt_column_count(true, NULL, NULL, sid, 1); sid++; - myprot->generate_pkt_field(true, NULL, NULL, sid, (char*)"", (char*)"", (char*)"", buf2, (char*)"", 63, 31, MYSQL_TYPE_LONGLONG, 161, 0, false, 0, NULL); sid++; - myds->DSS = STATE_COLUMN_DEFINITION; - - bool deprecate_eof_active = myds->myconn->options.client_flag & CLIENT_DEPRECATE_EOF; - if (!deprecate_eof_active) { - myprot->generate_pkt_EOF(true, NULL, NULL, sid, 0, setStatus); sid++; - } - char** p = (char**)malloc(sizeof(char*) * 1); - unsigned long* l = (unsigned long*)malloc(sizeof(unsigned long*) * 1); - l[0] = strlen(buf); - p[0] = buf; - myprot->generate_pkt_row(true, NULL, NULL, sid, 1, l, p); sid++; - myds->DSS = STATE_ROW; - if (deprecate_eof_active) { - myprot->generate_pkt_OK(true, NULL, NULL, sid, 0, 0, setStatus, 0, NULL, true); sid++; - } - else { - myprot->generate_pkt_EOF(true, NULL, NULL, sid, 0, setStatus); sid++; - } - myds->DSS = STATE_SLEEP; - RequestEnd(NULL); - l_free(pkt->size, pkt->ptr); - free(p); - free(l); - return true; - } + // If there are untracked option parameters, lock the hostgroup + *lock_hostgroup = true; + + // Always create a new connection to pass untracked options to the server + qpo->create_new_conn = true; + return false; + } - // if we reached here, we don't know the right backend and we cannot answer the query directly - // We continue the normal way + // Exit early for specific statement types + switch (stmt_type) { + case PGSQL_EXTENDED_QUERY_TYPE_NOT_SET: + case PGSQL_EXTENDED_QUERY_TYPE_EXECUTE: + break; + default: + goto __exit_set_destination_hostgroup; + } - // as a precaution, we reset cache_ttl - qpo->cache_ttl = 0; + // Handle special commands + if (CurrentQuery.QueryParserArgs.digest_text) { + const char* dig = CurrentQuery.QueryParserArgs.digest_text; + if (handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_special_commands(dig, lock_hostgroup)) { + l_free(pkt->size, pkt->ptr); + return true; } } -#endif - // handle command KILL #860 + // Mirror session handling + if (mirror) { + current_hostgroup = qpo->destination_hostgroup; + return false; + } + + // Handle KILL command //if (prepared == false) { if (handle_command_query_kill(pkt)) { return true; } - //} + // + // Query cache handling if (qpo->cache_ttl > 0 && ((stmt_type & PGSQL_EXTENDED_QUERY_TYPE_EXECUTE) == 0)) { - const std::shared_ptr pgsql_qc_entry = GloPgQC->get( client_myds->myconn->userinfo->hash, (const unsigned char*)CurrentQuery.QueryPointer, @@ -4467,16 +4347,18 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_Q thread->curtime / 1000, qpo->cache_ttl ); + if (pgsql_qc_entry) { // FIXME: Add Error Transaction state detection unsigned int nTrx = NumActiveTransactions(); - PgSQL_Data_Stream::copy_buffer_to_resultset(client_myds->PSarrayOUT, + PgSQL_Data_Stream::copy_buffer_to_resultset(client_myds->PSarrayOUT, pgsql_qc_entry->value, pgsql_qc_entry->length, (nTrx ? 'T' : 'I')); - //client_myds->PSarrayOUT->copy_add(resultset, 0, resultset->len); + if (transaction_persistent_hostgroup == -1) { // not active, we can change it current_hostgroup = -1; } + RequestEnd(NULL); l_free(pkt->size, pkt->ptr); return true; @@ -4484,31 +4366,31 @@ bool PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_Q } __exit_set_destination_hostgroup: - + // Set query flags and hostgroups if (qpo->next_query_flagIN >= 0) { next_query_flagIN = qpo->next_query_flagIN; } - if (qpo->destination_hostgroup >= 0) { - if (transaction_persistent_hostgroup == -1) { - current_hostgroup = qpo->destination_hostgroup; - } + + if (qpo->destination_hostgroup >= 0 && transaction_persistent_hostgroup == -1) { + current_hostgroup = qpo->destination_hostgroup; } - if (pgsql_thread___set_query_lock_on_hostgroup == 1) { // algorithm introduced in 2.0.6 - if (locked_on_hostgroup >= 0) { - if (current_hostgroup != locked_on_hostgroup) { - client_myds->DSS = STATE_QUERY_SENT_NET; - char buf[140]; - sprintf(buf, "ProxySQL Error: connection is locked to hostgroup %d but trying to reach hostgroup %d", locked_on_hostgroup, current_hostgroup); - client_myds->myprot.generate_error_packet(true, true, buf, - PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION, false); - thread->status_variables.stvar[st_var_hostgroup_locked_queries]++; - RequestEnd(NULL); - l_free(pkt->size, pkt->ptr); - return true; - } + // Hostgroup locking check + if (pgsql_thread___set_query_lock_on_hostgroup == 1 && locked_on_hostgroup >= 0) { + if (current_hostgroup != locked_on_hostgroup) { + client_myds->DSS = STATE_QUERY_SENT_NET; + char buf[140]; + sprintf(buf, "ProxySQL Error: connection is locked to hostgroup %d but trying to reach hostgroup %d", + locked_on_hostgroup, current_hostgroup); + client_myds->myprot.generate_error_packet(true, true, buf, + PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION, false); + thread->status_variables.stvar[st_var_hostgroup_locked_queries]++; + RequestEnd(NULL); + l_free(pkt->size, pkt->ptr); + return true; } } + return false; } @@ -4849,9 +4731,6 @@ void PgSQL_Session::PgSQL_Result_to_PgSQL_wire(PgSQL_Connection* _conn, PgSQL_Da } } else { // if query result is empty, means there was an error before query result was generated - if (!_conn->is_error_present()) - assert(0); // if query result is empty, there should be an error present in connection. - if (_myds && _myds->killed_at) { if (_myds->kill_type == 0) { client_myds->myprot.generate_error_packet(true, true, (char*)"Query execution was interrupted, query_timeout exceeded", @@ -4863,42 +4742,13 @@ void PgSQL_Session::PgSQL_Result_to_PgSQL_wire(PgSQL_Connection* _conn, PgSQL_Da PGSQL_ERROR_CODES::ERRCODE_QUERY_CANCELED, false); //PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::proxysql, _conn->parent->myhgc->hid, _conn->parent->address, _conn->parent->port, 1317); } - } - else { + } else { + if (!_conn->is_error_present()) + assert(0); // if query result is empty, there should be an error present in connection. + client_myds->myprot.generate_error_packet(true, true, _conn->get_error_message().c_str(), _conn->get_error_code(), false); //PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::proxysql, _conn->parent->myhgc->hid, _conn->parent->address, _conn->parent->port, 1907); } - - /*int myerrno = mysql_errno(pgsql); - if (myerrno == 0) { - unsigned int num_rows = mysql_affected_rows(pgsql); - uint16_t setStatus = (active_transactions ? SERVER_STATUS_IN_TRANS : 0); - if (autocommit) setStatus |= SERVER_STATUS_AUTOCOMMIT; - if (pgsql->server_status & SERVER_MORE_RESULTS_EXIST) - setStatus |= SERVER_MORE_RESULTS_EXIST; - setStatus |= (pgsql->server_status & ~SERVER_STATUS_AUTOCOMMIT); // get flags from server_status but ignore autocommit - setStatus = setStatus & ~SERVER_STATUS_CURSOR_EXISTS; // Do not send cursor #1128 - client_myds->myprot.generate_pkt_OK(true, NULL, NULL, client_myds->pkt_sid + 1, num_rows, pgsql->insert_id, setStatus, warning_count, pgsql->info); - //client_myds->pkt_sid++; - } - else { - // error - char sqlstate[10]; - sprintf(sqlstate, "%s", mysql_sqlstate(pgsql)); - if (_myds && _myds->killed_at) { // see case #750 - if (_myds->kill_type == 0) { - client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, client_myds->pkt_sid + 1, 1907, sqlstate, (char*)"Query execution was interrupted, query_timeout exceeded"); - } - else { - client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, client_myds->pkt_sid + 1, 1317, sqlstate, (char*)"Query execution was interrupted"); - } - } - else { - client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, client_myds->pkt_sid + 1, mysql_errno(pgsql), sqlstate, mysql_error(pgsql)); - } - //client_myds->pkt_sid++; - } - */ } } @@ -5584,20 +5434,23 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg // CurrentQuery.stmt_client_name may briefly become a dangling pointer until CurrentQuery.end() is invoked extended_query_info.stmt_client_name = parse_data.stmt_name; - timespec begint; - timespec endt; - if (thread->variables.stats_time_query_processor) { - clock_gettime(CLOCK_THREAD_CPUTIME_ID, &begint); - } - qpo = GloPgQPro->process_query(this, (unsigned char*)parse_data.query_string, strlen(parse_data.query_string) + 1, &CurrentQuery); - if (thread->variables.stats_time_query_processor) { - clock_gettime(CLOCK_THREAD_CPUTIME_ID, &endt); - thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] + - (endt.tv_sec * 1000000000 + endt.tv_nsec) - - (begint.tv_sec * 1000000000 + begint.tv_nsec); + if (extended_query_exec_qp) { + timespec begint; + timespec endt; + if (thread->variables.stats_time_query_processor) { + clock_gettime(CLOCK_THREAD_CPUTIME_ID, &begint); + } + qpo = GloPgQPro->process_query(this, (unsigned char*)parse_data.query_string, strlen(parse_data.query_string) + 1, &CurrentQuery); + if (thread->variables.stats_time_query_processor) { + clock_gettime(CLOCK_THREAD_CPUTIME_ID, &endt); + thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] + + (endt.tv_sec * 1000000000 + endt.tv_nsec) - + (begint.tv_sec * 1000000000 + begint.tv_nsec); + } + assert(qpo); // GloPgQPro->process_mysql_query() should always return a qpo + } - assert(qpo); // GloPgQPro->process_mysql_query() should always return a qpo - + if (parse_data.num_param_types > 0) { Parse_Param_Types parse_param_type; parse_param_type.resize(parse_data.num_param_types); @@ -5613,11 +5466,20 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg auto parse_pkt = parse_msg->detach(); // detach the packet from the parse message - // setting 'prepared' to prevent fetching results from the cache if the digest matches - bool handled_in_handler = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(&parse_pkt, &lock_hostgroup, PGSQL_EXTENDED_QUERY_TYPE_PARSE); - if (handled_in_handler == true) - // no need to release parse_pkt, it has been released in handler - return 0; + if (extended_query_exec_qp) { + // setting 'prepared' to prevent fetching results from the cache if the digest matches + bool handled_in_handler = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(&parse_pkt, &lock_hostgroup, PGSQL_EXTENDED_QUERY_TYPE_PARSE); + if (handled_in_handler == true) { + // parse_pkt memory is already freed in the handler + return 0; + } + extended_query_exec_qp = false; // reset the flag, we have processed the query and destination_hostgroup is set + } else { + assert(previous_hostgroup != -1); // previous_hostgroup should be set before + current_hostgroup = previous_hostgroup; // reset current hostgroup to previous hostgroup + proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Session=%p client_myds=%p. Using previous hostgroup '%d'\n", + this, client_myds, previous_hostgroup); + } if (pgsql_thread___set_query_lock_on_hostgroup == 1) { if (locked_on_hostgroup < 0) { @@ -5701,7 +5563,6 @@ int PgSQL_Session::handle_post_sync_parse_message(PgSQL_Parse_Message* parse_msg mybe->server_myds->wait_until = 0; mybe->server_myds->killed_at = 0; mybe->server_myds->kill_type = 0; - mybe->server_myds->pgsql_real_query.init(&parse_pkt); // Transfer packet ownership mybe->server_myds->statuses.questions++; @@ -5784,24 +5645,25 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des extended_query_info.stmt_type = stmt_type; CurrentQuery.start_time = thread->curtime; - timespec begint; - timespec endt; - if (thread->variables.stats_time_query_processor) { - clock_gettime(CLOCK_THREAD_CPUTIME_ID, &begint); - } - qpo = GloPgQPro->process_query(this, nullptr, 0, &CurrentQuery); - assert(qpo); // GloPgQPro->process_mysql_query() should always return a qpo + if (extended_query_exec_qp) { + timespec begint; + timespec endt; + if (thread->variables.stats_time_query_processor) { + clock_gettime(CLOCK_THREAD_CPUTIME_ID, &begint); + } + qpo = GloPgQPro->process_query(this, nullptr, 0, &CurrentQuery); + assert(qpo); // GloPgQPro->process_mysql_query() should always return a qpo - if (qpo->max_lag_ms >= 0) { - thread->status_variables.stvar[st_var_queries_with_max_lag_ms]++; - } - if (thread->variables.stats_time_query_processor) { - clock_gettime(CLOCK_THREAD_CPUTIME_ID, &endt); - thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] + - (endt.tv_sec * 1000000000 + endt.tv_nsec) - - (begint.tv_sec * 1000000000 + begint.tv_nsec); + if (qpo->max_lag_ms >= 0) { + thread->status_variables.stvar[st_var_queries_with_max_lag_ms]++; + } + if (thread->variables.stats_time_query_processor) { + clock_gettime(CLOCK_THREAD_CPUTIME_ID, &endt); + thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] + + (endt.tv_sec * 1000000000 + endt.tv_nsec) - + (begint.tv_sec * 1000000000 + begint.tv_nsec); + } } - // Use cached stmt_metadata only for statements; for portals, forward the describe request to backend. if (extended_query_info.stmt_type == 'S') { stmt_info->rdlock(); @@ -5822,15 +5684,24 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des stmt_info->unlock(); } - auto describe_pkt = describe_msg->detach(); // detach the packet from the describe message // setting 'prepared' to prevent fetching results from the cache if the digest matches - bool handled_in_handler = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(&describe_pkt, - &lock_hostgroup, PGSQL_EXTENDED_QUERY_TYPE_DESCRIBE); - if (handled_in_handler == true) { - // no need to free describe_pkt, it is already freed in the handler - return 0; + if (extended_query_exec_qp) { + auto describe_pkt = describe_msg->get_raw_pkt(); + bool handled_in_handler = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(&describe_pkt, + &lock_hostgroup, PGSQL_EXTENDED_QUERY_TYPE_DESCRIBE); + if (handled_in_handler == true) { + // describe_pkt memory is already freed in the handler + // we can detach the describe_msg now + describe_msg->detach(); + return 0; + } + extended_query_exec_qp = false; + } else { + assert(previous_hostgroup != -1); // previous_hostgroup should be set before + current_hostgroup = previous_hostgroup; // reset current hostgroup to previous hostgroup + proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Session=%p client_myds=%p. Using previous hostgroup '%d'\n", + this, client_myds, previous_hostgroup); } - if (pgsql_thread___set_query_lock_on_hostgroup == 1) { if (locked_on_hostgroup < 0) { if (lock_hostgroup) { @@ -5843,7 +5714,6 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des handle_post_sync_locked_on_hostgroup_error(CurrentQuery.extended_query_info.stmt_info->query, CurrentQuery.extended_query_info.stmt_info->query_length); RequestEnd(NULL); - l_free(describe_pkt.size, describe_pkt.ptr); return 2; } } @@ -5856,6 +5726,7 @@ int PgSQL_Session::handle_post_sync_describe_message(PgSQL_Describe_Message* des pause_until = 0; mybe->server_myds->killed_at = 0; mybe->server_myds->kill_type = 0; + auto describe_pkt = describe_msg->detach(); // detach the packet from the describe message mybe->server_myds->pgsql_real_query.init(&describe_pkt); // Transfer packet ownership mybe->server_myds->statuses.questions++; client_myds->setDSS_STATE_QUERY_SENT_NET(); @@ -5866,7 +5737,6 @@ int PgSQL_Session::handle_post_sync_close_message(PgSQL_Close_Message* close_msg PROXY_TRACE(); thread->status_variables.stvar[st_var_frontend_stmt_close]++; thread->status_variables.stvar[st_var_queries]++; - const PgSQL_Close_Data& close_data = close_msg->data(); // this will always be a valid pointer uint8_t stmt_type = close_data.stmt_type; @@ -5902,15 +5772,16 @@ int PgSQL_Session::handle_post_sync_bind_message(PgSQL_Bind_Message* bind_msg) { thread->status_variables.stvar[st_var_queries]++; const PgSQL_Bind_Data& bind_data = bind_msg->data(); + bool lock_hostgroup = false; + const char* portal_name = bind_data.portal_name; + const char* stmt_client_name = bind_data.stmt_name; - if (bind_data.portal_name[0] != '\0') { + if (portal_name[0] != '\0') { // we don't support portals yet handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_FEATURE_NOT_SUPPORTED, "only unnamed portals are supported", false); return 2; } - const char* stmt_client_name = bind_data.stmt_name; - uint64_t stmt_global_id = client_myds->myconn->local_stmts->find_global_id_from_stmt_name(stmt_client_name); if (stmt_global_id == 0) { const std::string& errmsg = stmt_client_name[0] != '\0' ? ("prepared statement \"" + std::string(stmt_client_name) + "\" does not exist") : @@ -5919,6 +5790,76 @@ int PgSQL_Session::handle_post_sync_bind_message(PgSQL_Bind_Message* bind_msg) { return 2; } + // now we get the statement information + PgSQL_STMT_Global_info* stmt_info = GloPgStmt->find_prepared_statement_by_stmt_id(stmt_global_id); + if (stmt_info == NULL) { + // we couldn't find it + const std::string& errmsg = stmt_client_name[0] != '\0' ? ("prepared statement \"" + std::string(stmt_client_name) + "\" does not exist") : + "unnamed prepared statement does not exist"; + handle_post_sync_error(PGSQL_ERROR_CODES::ERRCODE_INVALID_SQL_STATEMENT_NAME, errmsg.c_str(), false); + return 2; + } + + PgSQL_Extended_Query_Info& extended_query_info = CurrentQuery.extended_query_info; + extended_query_info.stmt_client_name = stmt_client_name; + extended_query_info.stmt_client_portal_name = portal_name; + extended_query_info.stmt_global_id = stmt_global_id; + extended_query_info.stmt_info = stmt_info; + CurrentQuery.start_time = thread->curtime; + + if (extended_query_exec_qp) { + timespec begint; + timespec endt; + if (thread->variables.stats_time_query_processor) { + clock_gettime(CLOCK_THREAD_CPUTIME_ID, &begint); + } + qpo = GloPgQPro->process_query(this, nullptr, 0, &CurrentQuery); + assert(qpo); // GloPgQPro->process_mysql_query() should always return a qpo + + if (qpo->max_lag_ms >= 0) { + thread->status_variables.stvar[st_var_queries_with_max_lag_ms]++; + } + if (thread->variables.stats_time_query_processor) { + clock_gettime(CLOCK_THREAD_CPUTIME_ID, &endt); + thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] + + (endt.tv_sec * 1000000000 + endt.tv_nsec) - + (begint.tv_sec * 1000000000 + begint.tv_nsec); + } + + auto bind_pkt = bind_msg->get_raw_pkt(); + + bool handled_in_handler = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(&bind_pkt, + &lock_hostgroup, PGSQL_EXTENDED_QUERY_TYPE_BIND); + if (handled_in_handler == true) { + // bind_msg now has dangling pointer to bind_pkt, which is already freed in the handler + bind_msg->detach(); // detach the packet from the bind message + return 0; + } + extended_query_exec_qp = false; + } else { + assert(previous_hostgroup != -1); // previous_hostgroup should be set before + current_hostgroup = previous_hostgroup; // reset current hostgroup to previous hostgroup + proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Session=%p client_myds=%p. Using previous hostgroup '%d'\n", + this, client_myds, previous_hostgroup); + } + + if (pgsql_thread___set_query_lock_on_hostgroup == 1) { + if (locked_on_hostgroup < 0) { + if (lock_hostgroup) { + // we are locking on hostgroup now + locked_on_hostgroup = current_hostgroup; + } + } + if (locked_on_hostgroup >= 0) { + if (current_hostgroup != locked_on_hostgroup) { + handle_post_sync_locked_on_hostgroup_error(CurrentQuery.extended_query_info.stmt_info->query, + CurrentQuery.extended_query_info.stmt_info->query_length); + RequestEnd(NULL); + return 2; + } + } + } + bind_waiting_for_execute.reset(bind_msg->release()); // release the ownership of the bind message client_myds->setDSS_STATE_QUERY_SENT_NET(); unsigned int nTxn = NumActiveTransactions(); @@ -5982,12 +5923,24 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu timespec begint; timespec endt; - if (thread->variables.stats_time_query_processor) { - clock_gettime(CLOCK_THREAD_CPUTIME_ID, &begint); - } - qpo = GloPgQPro->process_query(this, nullptr, 0, &CurrentQuery); - assert(qpo); // GloPgQPro->process_mysql_query() should always return a qpo + if (extended_query_exec_qp) { + if (thread->variables.stats_time_query_processor) { + clock_gettime(CLOCK_THREAD_CPUTIME_ID, &begint); + } + qpo = GloPgQPro->process_query(this, nullptr, 0, &CurrentQuery); + assert(qpo); // GloPgQPro->process_mysql_query() should always return a qpo + + if (qpo->max_lag_ms >= 0) { + thread->status_variables.stvar[st_var_queries_with_max_lag_ms]++; + } + if (thread->variables.stats_time_query_processor) { + clock_gettime(CLOCK_THREAD_CPUTIME_ID, &endt); + thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] + + (endt.tv_sec * 1000000000 + endt.tv_nsec) - + (begint.tv_sec * 1000000000 + begint.tv_nsec); + } + } // required for SET statement parsing CurrentQuery.QueryPointer = (unsigned char*)stmt_info->query; CurrentQuery.QueryLength = stmt_info->query_length; @@ -5996,22 +5949,29 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu CurrentQuery.QueryParserArgs.first_comment = stmt_info->first_comment ? strdup(stmt_info->first_comment) : nullptr; // - if (qpo->max_lag_ms >= 0) { - thread->status_variables.stvar[st_var_queries_with_max_lag_ms]++; - } - if (thread->variables.stats_time_query_processor) { - clock_gettime(CLOCK_THREAD_CPUTIME_ID, &endt); - thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] + - (endt.tv_sec * 1000000000 + endt.tv_nsec) - - (begint.tv_sec * 1000000000 + begint.tv_nsec); - } - - auto execute_pkt = execute_msg->detach(); // detach the packet from the describe message - // setting 'prepared' to prevent fetching results from the cache if the digest matches - bool handled_in_handler = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(&execute_pkt, &lock_hostgroup, PGSQL_EXTENDED_QUERY_TYPE_EXECUTE); - if (handled_in_handler == true) { - // no need to free execute_pkt, it is already freed in the handler - return 0; + if (extended_query_exec_qp) { + auto execute_pkt = execute_msg->get_raw_pkt(); // detach the packet from the describe message + // setting 'prepared' to prevent fetching results from the cache if the digest matches + bool handled_in_handler = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(&execute_pkt, &lock_hostgroup, + PGSQL_EXTENDED_QUERY_TYPE_EXECUTE); + if (handled_in_handler == true) { + // execute_pkt memory is already freed in the handler + execute_msg->detach(); // detach the packet from the execute message + return 0; + } + extended_query_exec_qp = false; + } else { + assert(previous_hostgroup != -1); // previous_hostgroup should be set before + if (CurrentQuery.QueryParserArgs.digest_text) { + const char* dig = CurrentQuery.QueryParserArgs.digest_text; + if (handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_special_commands(dig, &lock_hostgroup)) { + // if we are here, it means we have handled the special command + return 0; + } + } + current_hostgroup = previous_hostgroup; // reset current hostgroup to previous hostgroup + proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Session=%p client_myds=%p. Using previous hostgroup '%d'\n", + this, client_myds, previous_hostgroup); } if (pgsql_thread___set_query_lock_on_hostgroup == 1) { @@ -6026,7 +5986,6 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu handle_post_sync_locked_on_hostgroup_error(CurrentQuery.extended_query_info.stmt_info->query, CurrentQuery.extended_query_info.stmt_info->query_length); RequestEnd(NULL); - l_free(execute_pkt.size, execute_pkt.ptr); return 2; } } @@ -6038,6 +5997,7 @@ int PgSQL_Session::handle_post_sync_execute_message(PgSQL_Execute_Message* execu pause_until = 0; mybe->server_myds->killed_at = 0; mybe->server_myds->kill_type = 0; + auto execute_pkt = execute_msg->detach(); // detach the packet from the execute message mybe->server_myds->pgsql_real_query.init(&execute_pkt); // Transfer ownership of the packet mybe->server_myds->statuses.questions++; client_myds->setDSS_STATE_QUERY_SENT_NET();