diff --git a/include/PgSQL_Connection.h b/include/PgSQL_Connection.h index c25982e6e..cf32e7c16 100644 --- a/include/PgSQL_Connection.h +++ b/include/PgSQL_Connection.h @@ -520,6 +520,8 @@ public: bool IsActiveTransaction(); bool IsKnownActiveTransaction(); bool IsServerOffline(); + + bool is_connection_in_reusable_state() const; int get_server_version() { return PQserverVersion(pgsql_conn); @@ -538,11 +540,15 @@ public: return false; } + PGSQL_ERROR_SEVERITY get_error_severity() const { + return error_info.severity; + } + PGSQL_ERROR_CATEGORY get_error_category() const { return error_info.category; } - std::string get_error_message() const { + const std::string& get_error_message() const { return error_info.message; } @@ -562,8 +568,19 @@ public: PgSQL_Error_Helper::fill_error_info(error_info, code, message, is_fatal ? "FATAL" : "ERROR"); } + void set_error(PGSQL_ERROR_CODES code, const char* message, bool is_fatal) { + PgSQL_Error_Helper::fill_error_info(error_info, code, message, is_fatal ? + PGSQL_ERROR_SEVERITY::ERRSEVERITY_FATAL : PGSQL_ERROR_SEVERITY::ERRSEVERITY_ERROR); + } + void set_error_from_result(const PGresult* result, uint16_t ext_fields = 0) { - PgSQL_Error_Helper::fill_error_info(error_info, result, ext_fields); + if (result) { + PgSQL_Error_Helper::fill_error_info(error_info, result, ext_fields); + } else { + const char* errmsg = PQerrorMessage(pgsql_conn); + set_error(PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION, errmsg ? errmsg : "Unknown error", true); + //PgSQL_Error_Helper::fill_error_info_from_error_message(error_info, errmsg); + } } void reset_error() { diff --git a/include/PgSQL_Error_Helper.h b/include/PgSQL_Error_Helper.h index 34d1a4472..5ae88da5f 100644 --- a/include/PgSQL_Error_Helper.h +++ b/include/PgSQL_Error_Helper.h @@ -388,12 +388,18 @@ void reset_error_info(PgSQL_ErrorInfo& err_info, bool release_extented); class PgSQL_Error_Helper { public: - static constexpr const char* get_error_code(PGSQL_ERROR_CODES err_code) { - return error_code_str[static_cast(err_code)]; + static constexpr const char* get_error_code(PGSQL_ERROR_CODES code) { + return error_code_str[static_cast(code)]; + } + + static constexpr const char* get_severity(PGSQL_ERROR_SEVERITY severity) { + return severity_str[static_cast(severity)]; } static void fill_error_info(PgSQL_ErrorInfo& err_info, const PGresult* result, uint16_t ext_fields); static void fill_error_info(PgSQL_ErrorInfo& err_info, const char* code, const char* msg, const char* severity); + static void fill_error_info(PgSQL_ErrorInfo& err_info, PGSQL_ERROR_CODES code, const char* msg, PGSQL_ERROR_SEVERITY severity); + //static void fill_error_info_from_error_message(PgSQL_ErrorInfo& err_info, const char* error_msg); private: static PGSQL_ERROR_CODES identify_error_code(const char* code); @@ -646,6 +652,19 @@ private: }; static_assert(static_cast(PGSQL_ERROR_CODES::PGSQL_ERROR_CODES_COUNT) == sizeof(error_code_str) / sizeof(char*), "Mismatch between PGSQL_ERROR_CODES_COUNT and error_code_str array size"); + + static constexpr const char* severity_str[] = { + "UNKNOWN", + "FATAL", + "PANIC", + "ERROR", + "WARNING", + "NOTICE", + "DEBUG", + "INFO", + "LOG" + }; + static_assert(static_cast(PGSQL_ERROR_SEVERITY::PGSQL_ERROR_SEVERITY_COUNT) == sizeof(severity_str) / sizeof(char*), "Mismatch between PGSQL_ERROR_SEVERITY_COUNT and severity_str array size"); }; #define PGSQL_GET_ERROR_CODE_STR(ENUM_CODE) PgSQL_Error_Helper::get_error_code(PGSQL_ERROR_CODES::ENUM_CODE) diff --git a/include/PgSQL_Protocol.h b/include/PgSQL_Protocol.h index 1b0f7db96..62b533a8b 100644 --- a/include/PgSQL_Protocol.h +++ b/include/PgSQL_Protocol.h @@ -183,12 +183,12 @@ private: class PgSQL_Protocol; -#define PGSQL_QUERY_RESULT_EMPTY 0x00 +#define PGSQL_QUERY_RESULT_NO_DATA 0x00 #define PGSQL_QUERY_RESULT_TUPLE 0x01 -#define PGSQL_QUERY_RESULT_COMMAND 0x02 +#define PGSQL_QUERY_RESULT_COMMAND 0x02 #define PGSQL_QUERY_RESULT_READY 0x04 #define PGSQL_QUERY_RESULT_ERROR 0x08 -#define PGSQL_QUERY_RESULT_WARNING 0x10 +#define PGSQL_QUERY_RESULT_EMPTY 0x10 class PgSQL_Query_Result { public: @@ -200,6 +200,7 @@ public: unsigned int add_row(const PGresult* result); unsigned int add_command_completion(const PGresult* result); unsigned int add_error(const PGresult* result); + unsigned int add_empty_query_response(const PGresult* result); unsigned int add_ready_status(PGTransactionStatusType txn_status); bool get_resultset(PtrSizeArray* PSarrayFinal); @@ -257,7 +258,9 @@ public: unsigned int copy_row_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result); unsigned int copy_command_completion_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result); unsigned int copy_error_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result); + unsigned int copy_empty_query_response_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result); unsigned int copy_ready_status_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, PGTransactionStatusType txn_status); + private: bool get_header(unsigned char* pkt, unsigned int len, pgsql_hdr* hdr); void load_conn_parameters(pgsql_hdr* pkt, bool startup); diff --git a/include/PgSQL_Session.h b/include/PgSQL_Session.h index 02bb94fb0..292461230 100644 --- a/include/PgSQL_Session.h +++ b/include/PgSQL_Session.h @@ -221,11 +221,11 @@ private: void SetQueryTimeout(); bool handler_rc0_PROCESSING_STMT_PREPARE(enum session_status& st, PgSQL_Data_Stream* myds, bool& prepared_stmt_with_no_params); void handler_rc0_PROCESSING_STMT_EXECUTE(PgSQL_Data_Stream* myds); - bool handler_minus1_ClientLibraryError(PgSQL_Data_Stream* myds, int myerr, char** errmsg); - void handler_minus1_LogErrorDuringQuery(PgSQL_Connection* myconn, int myerr, char* errmsg); - bool handler_minus1_HandleErrorCodes(PgSQL_Data_Stream* myds, int myerr, char** errmsg, int& handler_ret); - void handler_minus1_GenerateErrorMessage(PgSQL_Data_Stream* myds, PgSQL_Connection* myconn, bool& wrong_pass); - void handler_minus1_HandleBackendConnection(PgSQL_Data_Stream* myds, PgSQL_Connection* myconn); + bool handler_minus1_ClientLibraryError(PgSQL_Data_Stream* myds); + void handler_minus1_LogErrorDuringQuery(PgSQL_Connection* myconn); + bool handler_minus1_HandleErrorCodes(PgSQL_Data_Stream* myds, int& handler_ret); + void handler_minus1_GenerateErrorMessage(PgSQL_Data_Stream* myds, bool& wrong_pass); + void handler_minus1_HandleBackendConnection(PgSQL_Data_Stream* myds); int RunQuery(PgSQL_Data_Stream* myds, PgSQL_Connection* myconn); void handler___status_WAITING_CLIENT_DATA(); void handler_rc0_Process_GTID(PgSQL_Connection* myconn); @@ -410,9 +410,10 @@ public: bool known_query_for_locked_on_hostgroup(uint64_t); void unable_to_parse_set_statement(bool*); bool has_any_backend(); - void detected_broken_connection(const char* file, unsigned int line, const char* func, const char* action, PgSQL_Connection* myconn, int myerr, const char* message, bool verbose = false); + void detected_broken_connection(const char* file, unsigned int line, const char* func, const char* action, PgSQL_Connection* myconn, bool verbose = false); void generate_status_one_hostgroup(int hid, std::string& s); void reset_warning_hostgroup_flag_and_release_connection(); + void set_previous_status_mode3(bool allow_execute = true); }; #define PgSQL_KILL_QUERY 1 diff --git a/include/PgSQL_Thread.h b/include/PgSQL_Thread.h index 56c8c5f28..07477fec1 100644 --- a/include/PgSQL_Thread.h +++ b/include/PgSQL_Thread.h @@ -107,8 +107,8 @@ enum PgSQL_Thread_status_variable { st_var_automatic_detected_sqli, st_var_mysql_whitelisted_sqli_fingerprint, st_var_client_host_error_killed_connections, - st_var_END*/ - PG_st_var_END + */ + PG_st_var_END = 42 // to avoid ASAN complaining. TO FIX }; class __attribute__((aligned(64))) PgSQL_Thread : public Base_Thread diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index e0cda1bb2..30dbf1958 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -2198,7 +2198,7 @@ void MySQL_HostGroups_Manager::update_table_mysql_servers_for_monitor(bool lock) int cols = 0; int affected_rows = 0; SQLite3_result* resultset = NULL; - char* query = const_cast("SELECT hostname, port, status, use_ssl FROM mysql_servers WHERE status != 3 GROUP BY hostname, port"); + const char* query = "SELECT hostname, port, status, use_ssl FROM mysql_servers WHERE status != 3 GROUP BY hostname, port"; proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query); mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset); diff --git a/lib/PgSQL_Connection.cpp b/lib/PgSQL_Connection.cpp index af200ea00..05b45bc6e 100644 --- a/lib/PgSQL_Connection.cpp +++ b/lib/PgSQL_Connection.cpp @@ -3193,26 +3193,53 @@ handler_again: const PGresult* result = get_last_result(); if (result) { - switch (PQresultStatus(result)) { - case PGRES_COMMAND_OK: - query_result->add_command_completion(result); - NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); - break; - case PGRES_TUPLES_OK: - case PGRES_SINGLE_TUPLE: - break; - default: - { - set_error_from_result(result, PGSQL_ERROR_FIELD_ALL); - query_result->add_error(result); - const PGSQL_ERROR_CATEGORY error_category = get_error_category(); - if (error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_SYNTAX_ERROR && - error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_STATUS && - error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_DATA_ERROR) { - proxy_error("Error: %s\n", get_error_code_with_message().c_str()); - } - } - NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); + auto state = PQresultStatus(result); + switch (state) { + case PGRES_COMMAND_OK: + query_result->add_command_completion(result); + NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); + break; + case PGRES_EMPTY_QUERY: + query_result->add_empty_query_response(result); + NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); + break; + case PGRES_TUPLES_OK: + case PGRES_SINGLE_TUPLE: + break; + case PGRES_COPY_OUT: + case PGRES_COPY_IN: + case PGRES_COPY_BOTH: + // NOT IMPLEMENTED + assert(0); + break; + case PGRES_BAD_RESPONSE: + case PGRES_NONFATAL_ERROR: + case PGRES_FATAL_ERROR: + default: + // we don't have a command completion, empty query responseor error packet in the result. This check is here to + // handle internal cleanup of libpq that might return residual protocol messages from the broken connection and + // may add multiple final packets. + if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR)) == 0) { + set_error_from_result(result, PGSQL_ERROR_FIELD_ALL); + assert(is_error_present()); + + // we will not send FATAL error messages to the client + const PGSQL_ERROR_SEVERITY severity = get_error_severity(); + if (severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_ERROR || + severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_WARNING || + severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_NOTICE) { + + query_result->add_error(result); + } + + const PGSQL_ERROR_CATEGORY error_category = get_error_category(); + if (error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_SYNTAX_ERROR && + error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_STATUS && + error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_DATA_ERROR) { + proxy_error("Error: %s\n", get_error_code_with_message().c_str()); + } + } + NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); } if (first_result == true) { @@ -3242,7 +3269,7 @@ handler_again: } } - if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_ERROR)) == 0) { + if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR)) == 0) { // if we reach here we assume that error_info is already set in previous call if (!is_error_present()) assert(0); // we might have missed setting error_info in previous call @@ -3974,7 +4001,7 @@ void PgSQL_Connection::connect_start() { //pgsql_conn = PQconnectdb(conninfo_str.c_str()); //PQsetErrorVerbosity(pgsql_conn, PQERRORS_VERBOSE); - //PQsetErrorContextVisibility(pgsql_conn, PQSHOW_CONTEXT_ALWAYS); + //PQsetErrorContextVisibility(pgsql_conn, PQSHOW_CONTEXT_ERRORS); if (pgsql_conn == NULL || PQstatus(pgsql_conn) == CONNECTION_BAD) { if (pgsql_conn) { @@ -4100,10 +4127,14 @@ void PgSQL_Connection::fetch_result_cont(short event) { if (PQconsumeInput(pgsql_conn) == 0) { // WARNING: DO NOT RELEASE this PGresult const PGresult* result = PQgetResultFromPGconn(pgsql_conn); - set_error_from_result(result); - // this is not actually an error, a hint to the user - error_info.severity = PGSQL_ERROR_SEVERITY::ERRSEVERITY_INFO; - proxy_warning("Failed to consume input. %s\n", get_error_code_with_message().c_str()); + /* We will only set the error if the result is not NULL or we didn't capture error in last call. If the result is NULL, + * it indicates that an error was already captured during a previous PQconsumeInput call, + * and we do not want to overwrite that information. + */ + if (result || is_error_present() == false) { + set_error_from_result(result); + proxy_error("Failed to consume input. %s\n", get_error_code_with_message().c_str()); + } } if (PQisBusy(pgsql_conn)) { @@ -4187,20 +4218,21 @@ void PgSQL_Connection::compute_unknown_transaction_status() { return; } - if (is_connected() == false) { + /*if (is_connected() == false) { unknown_transaction_status = true; return; - } + }*/ switch (PQtransactionStatus(pgsql_conn)) { case PQTRANS_INTRANS: case PQTRANS_INERROR: - case PQTRANS_UNKNOWN: case PQTRANS_ACTIVE: unknown_transaction_status = true; break; + case PQTRANS_UNKNOWN: default: - unknown_transaction_status = false; + //unknown_transaction_status = false; + break; } } } @@ -4470,7 +4502,8 @@ bool PgSQL_Connection::IsKnownActiveTransaction() { bool in_txn = false; if (pgsql_conn) { // Get the transaction status - if (PQtransactionStatus(pgsql_conn) == PQTRANS_INTRANS) { + PGTransactionStatusType status = PQtransactionStatus(pgsql_conn); + if (status == PQTRANS_INTRANS || status == PQTRANS_INERROR) { in_txn = true; } } @@ -4487,12 +4520,12 @@ bool PgSQL_Connection::IsActiveTransaction() { switch (status) { case PQTRANS_INTRANS: case PQTRANS_INERROR: - case PQTRANS_UNKNOWN: in_txn = true; break; + case PQTRANS_UNKNOWN: + case PQTRANS_IDLE: + case PQTRANS_ACTIVE: default: - //case PQTRANS_IDLE: - //case PQTRANS_ACTIVE: in_txn = false; } @@ -4526,3 +4559,10 @@ bool PgSQL_Connection::IsServerOffline() { } return ret; } + +bool PgSQL_Connection::is_connection_in_reusable_state() const { + const PGTransactionStatusType txn_status = PQtransactionStatus(pgsql_conn); + const bool conn_usable = !(txn_status == PQTRANS_UNKNOWN || txn_status == PQTRANS_ACTIVE); + assert(!(conn_usable == false && is_error_present() == false)); + return conn_usable; +} \ No newline at end of file diff --git a/lib/PgSQL_Error_Helper.cpp b/lib/PgSQL_Error_Helper.cpp index f8e6ce1b5..7b172ebf3 100644 --- a/lib/PgSQL_Error_Helper.cpp +++ b/lib/PgSQL_Error_Helper.cpp @@ -253,6 +253,10 @@ PGSQL_ERROR_SEVERITY PgSQL_Error_Helper::identify_error_severity(const char* sev ret = PGSQL_ERROR_SEVERITY::ERRSEVERITY_DEBUG; } else if (strcasecmp(severity, "LOG") == 0) { ret = PGSQL_ERROR_SEVERITY::ERRSEVERITY_LOG; + } else if (strcasecmp(severity, "INFO") == 0) { + ret = PGSQL_ERROR_SEVERITY::ERRSEVERITY_INFO; + } else { + ret = PGSQL_ERROR_SEVERITY::ERRSEVERITY_UNKNOWN_SEVERITY; } return ret; } @@ -286,6 +290,46 @@ void PgSQL_Error_Helper::fill_error_info(PgSQL_ErrorInfo& err_info, const char* err_info.message = msg; } +void PgSQL_Error_Helper::fill_error_info(PgSQL_ErrorInfo& err_info, PGSQL_ERROR_CODES code, const char* msg, PGSQL_ERROR_SEVERITY severity) { + fill_error_info(err_info, get_error_code(code), msg, PgSQL_Error_Helper::get_severity(severity)); +} + +/* +void PgSQL_Error_Helper::fill_error_info_from_error_message(PgSQL_ErrorInfo& err_info, const char* error_msg) { + std::string errorMsgStr(error_msg); + std::string sqlState; + std::string primaryErrorMsg; + std::string severity; + + // Initialize positions + size_t startPos = 0; + size_t endPos = 0; + + // Extract severity (assume it's the first word in the primary error message) + size_t severityEndPos = errorMsgStr.find(": "); + if (severityEndPos != std::string::npos) { + severity = errorMsgStr.substr(0, severityEndPos); + startPos = severityEndPos + 2; // Skip the ": " + } else { + severity = get_severity(PGSQL_ERROR_SEVERITY::ERRSEVERITY_UNKNOWN_SEVERITY); + } + + // Extract SQL state in the format [XXXXX] + startPos = errorMsgStr.find('[', startPos); + endPos = (startPos != std::string::npos) ? errorMsgStr.find(']', startPos) : std::string::npos; + + if (startPos != std::string::npos && endPos != std::string::npos && endPos == startPos + 6) { + sqlState = errorMsgStr.substr(startPos + 1, 5); // Extract the SQL state + primaryErrorMsg = errorMsgStr.substr(severityEndPos + 2, startPos - (severityEndPos + 2)); // Extract the primary error message up to the SQL state + } else { + sqlState = get_error_code(PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION); // Default SQL state + primaryErrorMsg = errorMsgStr.substr(severityEndPos + 2); // No SQL state found, remainder is the error message + } + + fill_error_info(err_info, sqlState.c_str(), primaryErrorMsg.c_str(), severity.c_str()); +} +*/ + void reset_error_info(PgSQL_ErrorInfo& err_info, bool release_extented) { err_info.sqlstate[0] = '\0'; err_info.code = PGSQL_ERROR_CODES::ERRCODE_SUCCESSFUL_COMPLETION; diff --git a/lib/PgSQL_HostGroups_Manager.cpp b/lib/PgSQL_HostGroups_Manager.cpp index c623dc614..b8798d2d2 100644 --- a/lib/PgSQL_HostGroups_Manager.cpp +++ b/lib/PgSQL_HostGroups_Manager.cpp @@ -3358,10 +3358,9 @@ void PgSQL_HostGroups_Manager::destroy_MyConn_from_pool(PgSQL_Connection *c, boo if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE && c->send_quit && queue.size() < __sync_fetch_and_add(&GloMTH->variables.connpoll_reset_queue_length,0)) { if (c->async_state_machine==ASYNC_IDLE) { // overall, the backend seems healthy and so it is the connection. Try to reset it - int myerr=mysql_errno(c->pgsql); - if (myerr >= 2000 && myerr < 3000) { + if (c->is_connection_in_reusable_state() == false) { // client library error . We must not try to save the connection - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Not trying to reset PgSQL_Connection %p, server %s:%d . Error code %d\n", c, mysrvc->address, mysrvc->port, myerr); + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Not trying to reset PgSQL_Connection %p, server %s:%d . Error %s\n", c, mysrvc->address, mysrvc->port, c->get_error_code_with_message().c_str()); } else { proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Trying to reset PgSQL_Connection %p, server %s:%d\n", c, mysrvc->address, mysrvc->port); to_del=false; diff --git a/lib/PgSQL_Protocol.cpp b/lib/PgSQL_Protocol.cpp index 89f7d4053..a071c278a 100644 --- a/lib/PgSQL_Protocol.cpp +++ b/lib/PgSQL_Protocol.cpp @@ -1627,6 +1627,42 @@ unsigned int PgSQL_Protocol::copy_error_to_PgSQL_Query_Result(bool send, PgSQL_Q return size; } +unsigned int PgSQL_Protocol::copy_empty_query_response_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result) { + assert(pg_query_result); + // we are currently not using result. It is just for future use + + const unsigned int size = 1 + 4; // I, length + bool alloced_new_buffer = false; + + unsigned char* _ptr = pg_query_result->buffer_reserve_space(size); + + // buffer is not enough to store the new row. Remember we have already pushed data to PSarrayOUT + if (_ptr == NULL) { + _ptr = (unsigned char*)l_alloc(size); + alloced_new_buffer = true; + } + + PG_pkt pgpkt(_ptr, size); + + pgpkt.put_char('I'); + pgpkt.put_uint32(size - 1); + + if (send == true) { + // not supported + //(*myds)->PSarrayOUT->add((void*)_ptr, size); + } + + pg_query_result->resultset_size += size; + + if (alloced_new_buffer) { + // we created new buffer + //pg_query_result->buffer_to_PSarrayOut(); + pg_query_result->PSarrayOUT.add(_ptr, size); + } + pg_query_result->pkt_count++; + return size; +} + unsigned int PgSQL_Protocol::copy_ready_status_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, PGTransactionStatusType txn_status) { assert(pg_query_result); @@ -1677,7 +1713,7 @@ PgSQL_Query_Result::PgSQL_Query_Result() { num_fields = 0; num_rows = 0; pkt_count = 0; - result_packet_type = PGSQL_QUERY_RESULT_EMPTY; + result_packet_type = PGSQL_QUERY_RESULT_NO_DATA; } PgSQL_Query_Result::~PgSQL_Query_Result() { @@ -1759,6 +1795,12 @@ unsigned int PgSQL_Query_Result::add_error(const PGresult* result) { return size; } +unsigned int PgSQL_Query_Result::add_empty_query_response(const PGresult* result) { + const unsigned int bytes = proto->copy_empty_query_response_to_PgSQL_Query_Result(false, this, result); + result_packet_type |= PGSQL_QUERY_RESULT_EMPTY; + return bytes; +} + unsigned int PgSQL_Query_Result::add_ready_status(PGTransactionStatusType txn_status) { const unsigned int bytes = proto->copy_ready_status_to_PgSQL_Query_Result(false, this, txn_status); buffer_to_PSarrayOut(); @@ -1843,5 +1885,5 @@ void PgSQL_Query_Result::reset() { num_fields = 0; num_rows = 0; pkt_count = 0; - result_packet_type = PGSQL_QUERY_RESULT_EMPTY; + result_packet_type = PGSQL_QUERY_RESULT_NO_DATA; } diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index 1faf14f05..c15cc5a21 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -1851,7 +1851,7 @@ int PgSQL_Session::handler_again___status_PINGING_SERVER() { } else { // rc==-1 int myerr = mysql_errno(myconn->pgsql); - detected_broken_connection(__FILE__, __LINE__, __func__, "during ping", myconn, myerr, mysql_error(myconn->pgsql), true); + detected_broken_connection(__FILE__, __LINE__, __func__, "during ping", myconn, true); PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myerr); } myds->destroy_MySQL_Connection_From_Pool(false); @@ -2031,22 +2031,8 @@ bool PgSQL_Session::handler_again___verify_init_connect() { if (tmp_init_connect) { // we send init connect queries only if set mybe->server_myds->myconn->options.init_connect = strdup(tmp_init_connect); - switch (status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility - case PROCESSING_QUERY: - previous_status.push(PROCESSING_QUERY); - break; - case PROCESSING_STMT_PREPARE: - previous_status.push(PROCESSING_STMT_PREPARE); - break; - case PROCESSING_STMT_EXECUTE: - previous_status.push(PROCESSING_STMT_EXECUTE); - break; - default: - // LCOV_EXCL_START - assert(0); - break; - // LCOV_EXCL_STOP - } + // Sets the previous status of the PgSQL session according to the current status. + set_previous_status_mode3(); NEXT_IMMEDIATE_NEW(SETTING_INIT_CONNECT); } } @@ -2101,18 +2087,8 @@ bool PgSQL_Session::handler_again___verify_backend_session_track_gtids() { mybe->server_myds->myconn->options.session_track_gtids_int = SpookyHash::Hash32((char*)"OWN_GTID", strlen((char*)"OWN_GTID"), 10); // we now switch status to set session_track_gtids - switch (status) { - case PROCESSING_QUERY: - case PROCESSING_STMT_PREPARE: - case PROCESSING_STMT_EXECUTE: - previous_status.push(status); - break; - default: - // LCOV_EXCL_START - assert(0); - break; - // LCOV_EXCL_STOP - } + // Sets the previous status of the PgSQL session according to the current status. + set_previous_status_mode3(); NEXT_IMMEDIATE_NEW(SETTING_SESSION_TRACK_GTIDS); } return ret; @@ -2200,45 +2176,15 @@ bool PgSQL_Session::handler_again___verify_backend_autocommit() { // enforce_autocommit_on_reads is disabled // we need to check if it is a SELECT not FOR UPDATE if (CurrentQuery.is_select_NOT_for_update() == false) { - //previous_status.push(PROCESSING_QUERY); - switch (status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility - case PROCESSING_QUERY: - previous_status.push(PROCESSING_QUERY); - break; - case PROCESSING_STMT_PREPARE: - previous_status.push(PROCESSING_STMT_PREPARE); - break; - case PROCESSING_STMT_EXECUTE: - previous_status.push(PROCESSING_STMT_EXECUTE); - break; - default: - // LCOV_EXCL_START - assert(0); - break; - // LCOV_EXCL_STOP - } + // Sets the previous status of the PgSQL session according to the current status. + set_previous_status_mode3(); NEXT_IMMEDIATE_NEW(CHANGING_AUTOCOMMIT); } } else { // in every other cases, enforce autocommit - //previous_status.push(PROCESSING_QUERY); - switch (status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility - case PROCESSING_QUERY: - previous_status.push(PROCESSING_QUERY); - break; - case PROCESSING_STMT_PREPARE: - previous_status.push(PROCESSING_STMT_PREPARE); - break; - case PROCESSING_STMT_EXECUTE: - previous_status.push(PROCESSING_STMT_EXECUTE); - break; - default: - // LCOV_EXCL_START - assert(0); - break; - // LCOV_EXCL_STOP - } + // Sets the previous status of the PgSQL session according to the current status. + set_previous_status_mode3(); NEXT_IMMEDIATE_NEW(CHANGING_AUTOCOMMIT); } } @@ -2287,44 +2233,14 @@ bool PgSQL_Session::handler_again___verify_backend_user_schema() { proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Session %p , client: %s , backend: %s\n", this, client_myds->myconn->userinfo->schemaname, mybe->server_myds->myconn->userinfo->schemaname); if (client_myds->myconn->userinfo->hash != mybe->server_myds->myconn->userinfo->hash) { if (strcmp(client_myds->myconn->userinfo->username, myds->myconn->userinfo->username)) { - //previous_status.push(PROCESSING_QUERY); - switch (status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility - case PROCESSING_QUERY: - previous_status.push(PROCESSING_QUERY); - break; - case PROCESSING_STMT_PREPARE: - previous_status.push(PROCESSING_STMT_PREPARE); - break; - case PROCESSING_STMT_EXECUTE: - previous_status.push(PROCESSING_STMT_EXECUTE); - break; - default: - // LCOV_EXCL_START - assert(0); - break; - // LCOV_EXCL_STOP - } + // Sets the previous status of the PgSQL session according to the current status. + set_previous_status_mode3(); mybe->server_myds->wait_until = thread->curtime + pgsql_thread___connect_timeout_server * 1000; // max_timeout NEXT_IMMEDIATE_NEW(CHANGING_USER_SERVER); } if (strcmp(client_myds->myconn->userinfo->schemaname, myds->myconn->userinfo->schemaname)) { - //previous_status.push(PROCESSING_QUERY); - switch (status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility - case PROCESSING_QUERY: - previous_status.push(PROCESSING_QUERY); - break; - case PROCESSING_STMT_PREPARE: - previous_status.push(PROCESSING_STMT_PREPARE); - break; - case PROCESSING_STMT_EXECUTE: - previous_status.push(PROCESSING_STMT_EXECUTE); - break; - default: - // LCOV_EXCL_START - assert(0); - break; - // LCOV_EXCL_STOP - } + // Sets the previous status of the PgSQL session according to the current status. + set_previous_status_mode3(); NEXT_IMMEDIATE_NEW(CHANGING_SCHEMA); } } @@ -2334,18 +2250,9 @@ bool PgSQL_Session::handler_again___verify_backend_user_schema() { // the backend connection has some session variable set // that the client never asked for // because we can't unset variables, we will reset the connection - switch (status) { - case PROCESSING_QUERY: - case PROCESSING_STMT_PREPARE: - case PROCESSING_STMT_EXECUTE: - previous_status.push(status); - break; - default: - // LCOV_EXCL_START - assert(0); - break; - // LCOV_EXCL_STOP - } + // + // Sets the previous status of the PgSQL session according to the current status. + set_previous_status_mode3(); mybe->server_myds->wait_until = thread->curtime + pgsql_thread___connect_timeout_server * 1000; // max_timeout NEXT_IMMEDIATE_NEW(CHANGING_USER_SERVER); } @@ -2385,7 +2292,7 @@ bool PgSQL_Session::handler_again___status_SETTING_INIT_CONNECT(int* _rc) { if (myerr >= 2000 || myerr == 0) { bool retry_conn = false; // client error, serious - detected_broken_connection(__FILE__, __LINE__, __func__, "while setting INIT CONNECT", myconn, myerr, mysql_error(myconn->pgsql)); + detected_broken_connection(__FILE__, __LINE__, __func__, "while setting INIT CONNECT", myconn); //if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) { if (rc != -2) { // see PMC-10003 if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { @@ -2493,7 +2400,7 @@ bool PgSQL_Session::handler_again___status_SETTING_LDAP_USER_VARIABLE(int* _rc) if (myerr >= 2000 || myerr == 0) { bool retry_conn = false; // client error, serious - detected_broken_connection(__FILE__, __LINE__, __func__, "while setting LDAP USER VARIABLE", myconn, myerr, mysql_error(myconn->pgsql)); + detected_broken_connection(__FILE__, __LINE__, __func__, "while setting LDAP USER VARIABLE", myconn); if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { retry_conn = true; } @@ -2580,7 +2487,7 @@ bool PgSQL_Session::handler_again___status_SETTING_SQL_LOG_BIN(int* _rc) { if (myerr >= 2000 || myerr == 0) { bool retry_conn = false; // client error, serious - detected_broken_connection(__FILE__, __LINE__, __func__, "while setting SQL_LOG_BIN", myconn, myerr, mysql_error(myconn->pgsql)); + detected_broken_connection(__FILE__, __LINE__, __func__, "while setting SQL_LOG_BIN", myconn); if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { retry_conn = true; } @@ -2660,7 +2567,7 @@ bool PgSQL_Session::handler_again___status_CHANGING_CHARSET(int* _rc) { } bool retry_conn = false; // client error, serious - detected_broken_connection(__FILE__, __LINE__, __func__, "during SET NAMES", myconn, myerr, mysql_error(myconn->pgsql)); + detected_broken_connection(__FILE__, __LINE__, __func__, "during SET NAMES", myconn); if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { retry_conn = true; } @@ -2806,7 +2713,7 @@ bool PgSQL_Session::handler_again___status_SETTING_GENERIC_VARIABLE(int* _rc, co // client error, serious std::string action = "while setting "; action += var_name; - detected_broken_connection(__FILE__, __LINE__, __func__, action.c_str(), myconn, myerr, mysql_error(myconn->pgsql)); + detected_broken_connection(__FILE__, __LINE__, __func__, action.c_str(), myconn); //if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) { if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { retry_conn = true; @@ -2925,7 +2832,7 @@ bool PgSQL_Session::handler_again___status_SETTING_MULTI_STMT(int* _rc) { if (myerr >= 2000 || myerr == 0) { bool retry_conn = false; // client error, serious - detected_broken_connection(__FILE__, __LINE__, __func__, "while setting MYSQL_OPTION_MULTI_STATEMENTS", myconn, myerr, mysql_error(myconn->pgsql)); + detected_broken_connection(__FILE__, __LINE__, __func__, "while setting MYSQL_OPTION_MULTI_STATEMENTS", myconn); //if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) { if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { retry_conn = true; @@ -3000,7 +2907,7 @@ bool PgSQL_Session::handler_again___status_CHANGING_SCHEMA(int* _rc) { if (myerr >= 2000 || myerr == 0) { bool retry_conn = false; // client error, serious - detected_broken_connection(__FILE__, __LINE__, __func__, "during INIT_DB", myconn, myerr, mysql_error(myconn->pgsql)); + detected_broken_connection(__FILE__, __LINE__, __func__, "during INIT_DB", myconn); //if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) { if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { retry_conn = true; @@ -3276,7 +3183,7 @@ bool PgSQL_Session::handler_again___status_CHANGING_USER_SERVER(int* _rc) { if (myerr >= 2000 || myerr == 0) { bool retry_conn = false; // client error, serious - detected_broken_connection(__FILE__, __LINE__, __func__, "during CHANGE_USER", myconn, myerr, mysql_error(myconn->pgsql)); + detected_broken_connection(__FILE__, __LINE__, __func__, "during CHANGE_USER", myconn); if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { retry_conn = true; } @@ -3381,7 +3288,7 @@ bool PgSQL_Session::handler_again___status_CHANGING_AUTOCOMMIT(int* _rc) { if (myerr >= 2000 || myerr == 0) { bool retry_conn = false; // client error, serious - detected_broken_connection(__FILE__, __LINE__, __func__, "during SET AUTOCOMMIT", myconn, myerr, mysql_error(myconn->pgsql)); + detected_broken_connection(__FILE__, __LINE__, __func__, "during SET AUTOCOMMIT", myconn); if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { retry_conn = true; } @@ -4656,35 +4563,31 @@ __get_pkts_from_client: int PgSQL_Session::handler_ProcessingQueryError_CheckBackendConnectionStatus(PgSQL_Data_Stream* myds) { PgSQL_Connection* myconn = myds->myconn; // the query failed - if ( - // due to #774 , we now read myconn->server_status instead of myconn->parent->status - (myconn->server_status == MYSQL_SERVER_STATUS_OFFLINE_HARD) // the query failed because the server is offline hard - || - (myconn->server_status == MYSQL_SERVER_STATUS_SHUNNED && myconn->parent->shunned_automatic == true && myconn->parent->shunned_and_kill_all_connections == true) // the query failed because the server is shunned due to a serious failure - || - (myconn->server_status == MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG) // slave is lagging! see #774 - ) { + if (myconn->IsServerOffline()) { + // Set maximum connect time if connect timeout is configured if (pgsql_thread___connect_timeout_server_max) { myds->max_connect_time = thread->curtime + pgsql_thread___connect_timeout_server_max * 1000; } + + // Variables to track retry and error conditions bool retry_conn = false; if (myconn->server_status == MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG) { thread->status_variables.stvar[st_var_backend_lagging_during_query]++; proxy_error("Detected a lagging server during query: %s, %d\n", myconn->parent->address, myconn->parent->port); PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::proxysql, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, ER_PROXYSQL_LAGGING_SRV); - } - else { + } else { thread->status_variables.stvar[st_var_backend_offline_during_query]++; proxy_error("Detected an offline server during query: %s, %d\n", myconn->parent->address, myconn->parent->port); PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::proxysql, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, ER_PROXYSQL_OFFLINE_SRV); } + + // Retry the query if retries are allowed and conditions permit if (myds->query_retries_on_failure > 0) { myds->query_retries_on_failure--; if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { if (myds->myconn->query_result && myds->myconn->query_result->is_transfer_started()) { // transfer to frontend has started, we cannot retry - } - else { + } else { retry_conn = true; proxy_warning("Retrying query.\n"); } @@ -4694,23 +4597,8 @@ int PgSQL_Session::handler_ProcessingQueryError_CheckBackendConnectionStatus(PgS myds->fd = 0; if (retry_conn) { myds->DSS = STATE_NOT_INITIALIZED; - //previous_status.push(PROCESSING_QUERY); - switch (status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility - case PROCESSING_QUERY: - previous_status.push(PROCESSING_QUERY); - break; - case PROCESSING_STMT_PREPARE: - previous_status.push(PROCESSING_STMT_PREPARE); - break; - case PROCESSING_STMT_EXECUTE: - previous_status.push(PROCESSING_STMT_EXECUTE); - break; - default: - // LCOV_EXCL_START - assert(0); - break; - // LCOV_EXCL_STOP - } + // Sets the previous status of the PgSQL session according to the current status. + set_previous_status_mode3(); return 1; } return -1; @@ -4852,25 +4740,24 @@ void PgSQL_Session::handler_rc0_PROCESSING_STMT_EXECUTE(PgSQL_Data_Stream* myds) // now it returns: // true: NEXT_IMMEDIATE(CONNECTING_SERVER) needs to be called // false: continue -bool PgSQL_Session::handler_minus1_ClientLibraryError(PgSQL_Data_Stream* myds, int myerr, char** errmsg) { +bool PgSQL_Session::handler_minus1_ClientLibraryError(PgSQL_Data_Stream* myds) { PgSQL_Connection* myconn = myds->myconn; bool retry_conn = false; // client error, serious - detected_broken_connection(__FILE__, __LINE__, __func__, "running query", myconn, myerr, mysql_error(myconn->pgsql), true); + detected_broken_connection(__FILE__, __LINE__, __func__, "running query", myconn, true); if (myds->query_retries_on_failure > 0) { myds->query_retries_on_failure--; - if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { - if (myds->myconn->query_result && myds->myconn->query_result->is_transfer_started()) { + if ((myconn->reusable == true) && myconn->IsActiveTransaction() == false && myconn->MultiplexDisabled() == false) { + if (myconn->query_result && myconn->query_result->is_transfer_started()) { // transfer to frontend has started, we cannot retry - } - else { - if (myds->myconn->pgsql->server_status & SERVER_MORE_RESULTS_EXIST) { + } else { + // a hack to check if we have pending results. This should never occur. + if (myconn->get_last_result() != nullptr) { // transfer to frontend has started, because this is, at least, // the second resultset coming from the server // we cannot retry - proxy_warning("Disabling query retry because SERVER_MORE_RESULTS_EXIST is set\n"); - } - else { + proxy_warning("Disabling query retry because we were in middle of processing results\n"); + } else { retry_conn = true; proxy_warning("Retrying query.\n"); } @@ -4881,46 +4768,23 @@ bool PgSQL_Session::handler_minus1_ClientLibraryError(PgSQL_Data_Stream* myds, i myds->fd = 0; if (retry_conn) { myds->DSS = STATE_NOT_INITIALIZED; - //previous_status.push(PROCESSING_QUERY); - switch (status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility - case PROCESSING_QUERY: - previous_status.push(PROCESSING_QUERY); - break; - case PROCESSING_STMT_PREPARE: - previous_status.push(PROCESSING_STMT_PREPARE); - break; - case PROCESSING_STMT_EXECUTE: - previous_status.push(PROCESSING_STMT_EXECUTE); - break; - default: - // LCOV_EXCL_START - assert(0); - break; - // LCOV_EXCL_STOP - } - if (*errmsg) { - free(*errmsg); - *errmsg = NULL; - } + // Sets the previous status of the PgSQL session according to the current status. + set_previous_status_mode3(); return true; } - if (*errmsg) { - free(*errmsg); - *errmsg = NULL; - } return false; } // this function was inline -void PgSQL_Session::handler_minus1_LogErrorDuringQuery(PgSQL_Connection* myconn, int myerr, char* errmsg) { +void PgSQL_Session::handler_minus1_LogErrorDuringQuery(PgSQL_Connection* myconn) { if (pgsql_thread___verbose_query_error) { proxy_warning("Error during query on (%d,%s,%d,%lu) , user \"%s@%s\" , schema \"%s\" , %s . digest_text = \"%s\"\n", myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myconn->get_mysql_thread_id(), client_myds->myconn->userinfo->username, (client_myds->addr.addr ? client_myds->addr.addr : (char*)"unknown"), client_myds->myconn->userinfo->schemaname, myconn->get_error_code_with_message().c_str(), CurrentQuery.QueryParserArgs.digest_text); } else { proxy_warning("Error during query on (%d,%s,%d,%lu): %s\n", myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myconn->get_mysql_thread_id(), myconn->get_error_code_with_message().c_str()); } - PgHGM->add_pgsql_errors(myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, client_myds->myconn->userinfo->username, (client_myds->addr.addr ? client_myds->addr.addr : (char*)"unknown"), client_myds->myconn->userinfo->schemaname, myerr, (char*)(errmsg ? errmsg : mysql_error(myconn->pgsql))); + PgHGM->add_pgsql_errors(myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, client_myds->myconn->userinfo->username, (client_myds->addr.addr ? client_myds->addr.addr : (char*)"unknown"), client_myds->myconn->userinfo->schemaname, 9999, (char*)myconn->get_error_code_with_message().c_str()); } @@ -4930,12 +4794,12 @@ void PgSQL_Session::handler_minus1_LogErrorDuringQuery(PgSQL_Connection* myconn, // if handler_ret == -1 : return // if handler_ret == 0 : NEXT_IMMEDIATE(CONNECTING_SERVER) needs to be called // false: continue -bool PgSQL_Session::handler_minus1_HandleErrorCodes(PgSQL_Data_Stream* myds, int myerr, char** errmsg, int& handler_ret) { +bool PgSQL_Session::handler_minus1_HandleErrorCodes(PgSQL_Data_Stream* myds, int& handler_ret) { bool retry_conn = false; PgSQL_Connection* myconn = myds->myconn; handler_ret = 0; // default - switch (myerr) { - case 1317: // Query execution was interrupted + switch (myconn->get_error_code()) { + case PGSQL_ERROR_CODES::ERRCODE_QUERY_CANCELED: // Query execution was interrupted if (killed == true) { // this session is being kiled handler_ret = -1; return true; @@ -4945,60 +4809,32 @@ bool PgSQL_Session::handler_minus1_HandleErrorCodes(PgSQL_Data_Stream* myds, int break; } break; - case 1047: // WSREP has not yet prepared node for application use - case 1053: // Server shutdown in progress - myconn->parent->connect_error(myerr); + case PGSQL_ERROR_CODES::ERRCODE_ADMIN_SHUTDOWN: // Server shutdown in progress. Requested by Admin + case PGSQL_ERROR_CODES::ERRCODE_CRASH_SHUTDOWN: // Server shutdown in progress + case PGSQL_ERROR_CODES::ERRCODE_CANNOT_CONNECT_NOW: // Server in initialization mode and not ready to handle new connections + myconn->parent->connect_error(9999); if (myds->query_retries_on_failure > 0) { myds->query_retries_on_failure--; - if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { + if ((myconn->reusable == true) && myconn->IsActiveTransaction() == false && myconn->MultiplexDisabled() == false) { retry_conn = true; proxy_warning("Retrying query.\n"); } } - switch (myerr) { - case 1047: // WSREP has not yet prepared node for application use - case 1053: // Server shutdown in progress - myds->destroy_MySQL_Connection_From_Pool(false); - break; - default: - if (pgsql_thread___reset_connection_algorithm == 2) { - create_new_session_and_reset_connection(myds); - } - else { - myds->destroy_MySQL_Connection_From_Pool(true); - } - break; - } + myds->destroy_MySQL_Connection_From_Pool(false); myconn = myds->myconn; myds->fd = 0; if (retry_conn) { myds->DSS = STATE_NOT_INITIALIZED; //previous_status.push(PROCESSING_QUERY); - switch (status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility - case PROCESSING_QUERY: - previous_status.push(PROCESSING_QUERY); - break; - case PROCESSING_STMT_PREPARE: - previous_status.push(PROCESSING_STMT_PREPARE); - break; - default: - // LCOV_EXCL_START - assert(0); - break; - // LCOV_EXCL_STOP - } - if (*errmsg) { - free(*errmsg); - *errmsg = NULL; - } + set_previous_status_mode3(false); return true; // it will call NEXT_IMMEDIATE(CONNECTING_SERVER); //NEXT_IMMEDIATE(CONNECTING_SERVER); } //handler_ret = -1; //return handler_ret; break; - case 1153: // ER_NET_PACKET_TOO_LARGE - proxy_warning("Error ER_NET_PACKET_TOO_LARGE during query on (%d,%s,%d,%lu): %d, %s\n", myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myconn->get_mysql_thread_id(), myerr, mysql_error(myconn->pgsql)); + case PGSQL_ERROR_CODES::ERRCODE_OUT_OF_MEMORY: + proxy_warning("Error OUT_OF_MEMORY during query on (%d,%s,%d,%lu): %s\n", myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myconn->get_mysql_thread_id(), myconn->get_error_code_with_message().c_str()); break; default: break; // continue normally @@ -5007,7 +4843,8 @@ bool PgSQL_Session::handler_minus1_HandleErrorCodes(PgSQL_Data_Stream* myds, int } // this function used to be inline. -void PgSQL_Session::handler_minus1_GenerateErrorMessage(PgSQL_Data_Stream* myds, PgSQL_Connection* myconn, bool& wrong_pass) { +void PgSQL_Session::handler_minus1_GenerateErrorMessage(PgSQL_Data_Stream* myds, bool& wrong_pass) { + PgSQL_Connection* myconn = myds->myconn; switch (status) { case PROCESSING_QUERY: if (myconn) { @@ -5067,24 +4904,22 @@ void PgSQL_Session::handler_minus1_GenerateErrorMessage(PgSQL_Data_Stream* myds, } // this function was inline -void PgSQL_Session::handler_minus1_HandleBackendConnection(PgSQL_Data_Stream* myds, PgSQL_Connection* myconn) { - if (myds->myconn) { - myds->myconn->reduce_auto_increment_delay_token(); - if (pgsql_thread___multiplexing && (myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) { +void PgSQL_Session::handler_minus1_HandleBackendConnection(PgSQL_Data_Stream* myds) { + PgSQL_Connection* myconn = myds->myconn; + if (myconn) { + myconn->reduce_auto_increment_delay_token(); + if (pgsql_thread___multiplexing && (myconn->reusable == true) && myconn->IsActiveTransaction() == false && myconn->MultiplexDisabled() == false) { myds->DSS = STATE_NOT_INITIALIZED; - if (mysql_thread___autocommit_false_not_reusable && myds->myconn->IsAutoCommit() == false) { + if (mysql_thread___autocommit_false_not_reusable && myconn->IsAutoCommit() == false) { if (pgsql_thread___reset_connection_algorithm == 2) { create_new_session_and_reset_connection(myds); - } - else { + } else { myds->destroy_MySQL_Connection_From_Pool(true); } - } - else { + } else { myds->return_MySQL_Connection_To_Pool(); } - } - else { + } else { myconn->async_state_machine = ASYNC_IDLE; myds->DSS = STATE_MARIADB_GENERIC; } @@ -5316,25 +5151,12 @@ handler_again: } if (mybe->server_myds->DSS == STATE_NOT_INITIALIZED) { // we don't have a backend yet - switch (status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility - case PROCESSING_QUERY: - previous_status.push(PROCESSING_QUERY); - break; - case PROCESSING_STMT_PREPARE: - previous_status.push(PROCESSING_STMT_PREPARE); - break; - case PROCESSING_STMT_EXECUTE: - previous_status.push(PROCESSING_STMT_EXECUTE); - break; - default: - // LCOV_EXCL_START - assert(0); - break; - // LCOV_EXCL_STOP - } + // It saves the current processing status of the session (status) onto the previous_status stack + // Sets the previous status of the PgSQL session according to the current status. + set_previous_status_mode3(); + // It transitions the session to the CONNECTING_SERVER state immediately. NEXT_IMMEDIATE(CONNECTING_SERVER); - } - else { + } else { PgSQL_Data_Stream* myds = mybe->server_myds; PgSQL_Connection* myconn = myds->myconn; mybe->server_myds->max_connect_time = 0; @@ -5546,16 +5368,16 @@ handler_again: else { if (rc == -1) { // the query failed - int myerr = mysql_errno(myconn->pgsql); - char* errmsg = NULL; - if (myerr == 0) { - if (CurrentQuery.mysql_stmt) { + const bool is_error_present = myconn->is_error_present(); // false means failure is due to server being in OFFLINE state + if (is_error_present == false) { + + /*if (CurrentQuery.mysql_stmt) { myerr = mysql_stmt_errno(CurrentQuery.mysql_stmt); errmsg = strdup(mysql_stmt_error(CurrentQuery.mysql_stmt)); - } + }*/ } - PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myerr); - CurrentQuery.mysql_stmt = NULL; // immediately reset mysql_stmt + PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, 9999); // TOFIX + //CurrentQuery.mysql_stmt = NULL; // immediately reset mysql_stmt int rc1 = handler_ProcessingQueryError_CheckBackendConnectionStatus(myds); if (rc1 == -1) { handler_ret = -1; @@ -5565,28 +5387,25 @@ handler_again: if (rc1 == 1) NEXT_IMMEDIATE(CONNECTING_SERVER); } - if (myerr >= 2000 && myerr < 3000) { - if (handler_minus1_ClientLibraryError(myds, myerr, &errmsg)) { + if (myconn->is_connection_in_reusable_state() == false) { + if (handler_minus1_ClientLibraryError(myds)) { NEXT_IMMEDIATE(CONNECTING_SERVER); - } - else { + } else { handler_ret = -1; return handler_ret; } - } - else { - handler_minus1_LogErrorDuringQuery(myconn, myerr, errmsg); - if (handler_minus1_HandleErrorCodes(myds, myerr, &errmsg, handler_ret)) { + } else { + handler_minus1_LogErrorDuringQuery(myconn); + if (handler_minus1_HandleErrorCodes(myds, handler_ret)) { if (handler_ret == 0) NEXT_IMMEDIATE(CONNECTING_SERVER); return handler_ret; } - handler_minus1_GenerateErrorMessage(myds, myconn, wrong_pass); + handler_minus1_GenerateErrorMessage(myds, wrong_pass); RequestEnd(myds); - handler_minus1_HandleBackendConnection(myds, myconn); + handler_minus1_HandleBackendConnection(myds); } - } - else { + } else { switch (rc) { // rc==1 , query is still running // start sending to frontend if pgsql_thread___threshold_resultset_size is reached @@ -5620,10 +5439,7 @@ handler_again: } } } - goto __exit_DSS__STATE_NOT_INITIALIZED; - - } break; @@ -7922,7 +7738,7 @@ void PgSQL_Session::PgSQL_Result_to_PgSQL_wire(PgSQL_Connection* _conn, PgSQL_Da PgSQL_Query_Result* query_result = _conn->query_result; - if (query_result && query_result->get_result_packet_type() != PGSQL_QUERY_RESULT_EMPTY) { + if (query_result && query_result->get_result_packet_type() != PGSQL_QUERY_RESULT_NO_DATA) { bool transfer_started = query_result->is_transfer_started(); // if there is an error, it will be false so results are not cached bool is_tuple = query_result->get_result_packet_type() == (PGSQL_QUERY_RESULT_TUPLE | PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_READY); @@ -8729,22 +8545,22 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C l_free(pkt.size, pkt.ptr); } -void PgSQL_Session::detected_broken_connection(const char* file, unsigned int line, const char* func, const char* action, PgSQL_Connection* myconn, int myerr, const char* message, bool verbose) { - char* msg = (char*)message; - if (msg == NULL) { - msg = (char*)"Detected offline server prior to statement execution"; - } - if (myerr == 0) { - myerr = ER_PROXYSQL_OFFLINE_SRV; - msg = (char*)"Detected offline server prior to statement execution"; +void PgSQL_Session::detected_broken_connection(const char* file, unsigned int line, const char* func, const char* action, PgSQL_Connection* myconn, bool verbose) { + + const char* code = PgSQL_Error_Helper::get_error_code(PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION);; + const char* msg = "Detected offline server prior to statement execution"; + + if (myconn->is_error_present() == true) { + code = myconn->get_error_code_str(); + msg = myconn->get_error_message().c_str(); } + unsigned long long last_used = thread->curtime - myconn->last_time_used; last_used /= 1000; if (verbose) { - proxy_error_inline(file, line, func, "Detected a broken connection while %s on (%d,%s,%d,%lu) , FD (Conn:%d , MyDS:%d) , user %s , last_used %llums ago : %d, %s\n", action, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myconn->get_mysql_thread_id(), myconn->myds->fd, myconn->fd, myconn->userinfo->username, last_used, myerr, msg); - } - else { - proxy_error_inline(file, line, func, "Detected a broken connection while %s on (%d,%s,%d,%lu) , user %s , last_used %llums ago : %d, %s\n", action, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myconn->get_mysql_thread_id(), myconn->userinfo->username, last_used, myerr, msg); + proxy_error_inline(file, line, func, "Detected a broken connection while %s on (%d,%s,%d,%lu) , FD (Conn:%d , MyDS:%d) , user %s , last_used %llums ago : %s, %s\n", action, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myconn->get_mysql_thread_id(), myconn->myds->fd, myconn->fd, myconn->userinfo->username, last_used, code, msg); + } else { + proxy_error_inline(file, line, func, "Detected a broken connection while %s on (%d,%s,%d,%lu) , user %s , last_used %llums ago : %s, %s\n", action, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myconn->get_mysql_thread_id(), myconn->userinfo->username, last_used, code, msg); } } @@ -8789,3 +8605,49 @@ void PgSQL_Session::reset_warning_hostgroup_flag_and_release_connection() warning_in_hg = -1; } } + +/** + * @brief Sets the previous status of the PgSQL session according to the current status, with an option to allow EXECUTE statements. + * + * This method updates the previous status of the PgSQL session based on its current status. It employs a switch statement + * to determine the current status and then pushes the corresponding status value onto the `previous_status` stack. If the + * `allow_execute` parameter is set to true and the current status is `PROCESSING_STMT_EXECUTE`, the method pushes this status + * onto the stack; otherwise, it skips pushing the status for EXECUTE statements. If the current status does not match any known + * status value (which should not occur under normal circumstances), the method asserts to indicate a programming error. + * It currently works with only 3 possible status: + * - PROCESSING_QUERY + * - PROCESSING_STMT_PREPARE + * - PROCESSING_STMT_EXECUTE + * + * @param allow_execute A boolean value indicating whether to allow the status of EXECUTE statements to be pushed onto the + * `previous_status` stack. If set to true, the method will include EXECUTE statements in the session's status history. + * + * @return void. + * @note This method assumes that the `status` member variable has been properly initialized with one of the predefined + * status values. + * @note This method is primarily used to maintain a history of the session's previous states for later reference or + * recovery purposes. + * @note The LCOV_EXCL_START and LCOV_EXCL_STOP directives are used to exclude the assert statement from code coverage + * analysis because the condition should not occur during normal execution and is included as a safeguard against + * programming errors. + */ +void PgSQL_Session::set_previous_status_mode3(bool allow_execute) { + switch (status) { + case PROCESSING_QUERY: + previous_status.push(PROCESSING_QUERY); + break; + case PROCESSING_STMT_PREPARE: + previous_status.push(PROCESSING_STMT_PREPARE); + break; + case PROCESSING_STMT_EXECUTE: + if (allow_execute == true) { + previous_status.push(PROCESSING_STMT_EXECUTE); + break; + } + default: + // LCOV_EXCL_START + assert(0); // Assert to indicate an unexpected status value + break; + // LCOV_EXCL_STOP + } +}