Many improvements and bug fixes

* Fixed OpenSSL includes and linking for LibPQ building.
* Handled Empty query response.
* Improve transaction status detection.
* Implemented detection for unusable connections and fixed query retry
  mechanism.
* Resolved Heap overflow issue.
* Forward non-FATAL errors to client.
* Added duplicate error detection to avoid adding multiple error packets in
query result.
v2.x_pg_PrepStmtBase_240714
Rahim Kanji 2 years ago
parent 20cb1149df
commit 27e1e4fb7a

@ -520,6 +520,8 @@ public:
bool IsActiveTransaction();
bool IsKnownActiveTransaction();
bool IsServerOffline();
bool is_connection_in_reusable_state() const;
int get_server_version() {
return PQserverVersion(pgsql_conn);
@ -538,11 +540,15 @@ public:
return false;
}
PGSQL_ERROR_SEVERITY get_error_severity() const {
return error_info.severity;
}
PGSQL_ERROR_CATEGORY get_error_category() const {
return error_info.category;
}
std::string get_error_message() const {
const std::string& get_error_message() const {
return error_info.message;
}
@ -562,8 +568,19 @@ public:
PgSQL_Error_Helper::fill_error_info(error_info, code, message, is_fatal ? "FATAL" : "ERROR");
}
void set_error(PGSQL_ERROR_CODES code, const char* message, bool is_fatal) {
PgSQL_Error_Helper::fill_error_info(error_info, code, message, is_fatal ?
PGSQL_ERROR_SEVERITY::ERRSEVERITY_FATAL : PGSQL_ERROR_SEVERITY::ERRSEVERITY_ERROR);
}
void set_error_from_result(const PGresult* result, uint16_t ext_fields = 0) {
PgSQL_Error_Helper::fill_error_info(error_info, result, ext_fields);
if (result) {
PgSQL_Error_Helper::fill_error_info(error_info, result, ext_fields);
} else {
const char* errmsg = PQerrorMessage(pgsql_conn);
set_error(PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION, errmsg ? errmsg : "Unknown error", true);
//PgSQL_Error_Helper::fill_error_info_from_error_message(error_info, errmsg);
}
}
void reset_error() {

@ -388,12 +388,18 @@ void reset_error_info(PgSQL_ErrorInfo& err_info, bool release_extented);
class PgSQL_Error_Helper {
public:
static constexpr const char* get_error_code(PGSQL_ERROR_CODES err_code) {
return error_code_str[static_cast<uint8_t>(err_code)];
static constexpr const char* get_error_code(PGSQL_ERROR_CODES code) {
return error_code_str[static_cast<uint8_t>(code)];
}
static constexpr const char* get_severity(PGSQL_ERROR_SEVERITY severity) {
return severity_str[static_cast<uint8_t>(severity)];
}
static void fill_error_info(PgSQL_ErrorInfo& err_info, const PGresult* result, uint16_t ext_fields);
static void fill_error_info(PgSQL_ErrorInfo& err_info, const char* code, const char* msg, const char* severity);
static void fill_error_info(PgSQL_ErrorInfo& err_info, PGSQL_ERROR_CODES code, const char* msg, PGSQL_ERROR_SEVERITY severity);
//static void fill_error_info_from_error_message(PgSQL_ErrorInfo& err_info, const char* error_msg);
private:
static PGSQL_ERROR_CODES identify_error_code(const char* code);
@ -646,6 +652,19 @@ private:
};
static_assert(static_cast<uint8_t>(PGSQL_ERROR_CODES::PGSQL_ERROR_CODES_COUNT) == sizeof(error_code_str) / sizeof(char*), "Mismatch between PGSQL_ERROR_CODES_COUNT and error_code_str array size");
static constexpr const char* severity_str[] = {
"UNKNOWN",
"FATAL",
"PANIC",
"ERROR",
"WARNING",
"NOTICE",
"DEBUG",
"INFO",
"LOG"
};
static_assert(static_cast<uint8_t>(PGSQL_ERROR_SEVERITY::PGSQL_ERROR_SEVERITY_COUNT) == sizeof(severity_str) / sizeof(char*), "Mismatch between PGSQL_ERROR_SEVERITY_COUNT and severity_str array size");
};
#define PGSQL_GET_ERROR_CODE_STR(ENUM_CODE) PgSQL_Error_Helper::get_error_code(PGSQL_ERROR_CODES::ENUM_CODE)

@ -183,12 +183,12 @@ private:
class PgSQL_Protocol;
#define PGSQL_QUERY_RESULT_EMPTY 0x00
#define PGSQL_QUERY_RESULT_NO_DATA 0x00
#define PGSQL_QUERY_RESULT_TUPLE 0x01
#define PGSQL_QUERY_RESULT_COMMAND 0x02
#define PGSQL_QUERY_RESULT_COMMAND 0x02
#define PGSQL_QUERY_RESULT_READY 0x04
#define PGSQL_QUERY_RESULT_ERROR 0x08
#define PGSQL_QUERY_RESULT_WARNING 0x10
#define PGSQL_QUERY_RESULT_EMPTY 0x10
class PgSQL_Query_Result {
public:
@ -200,6 +200,7 @@ public:
unsigned int add_row(const PGresult* result);
unsigned int add_command_completion(const PGresult* result);
unsigned int add_error(const PGresult* result);
unsigned int add_empty_query_response(const PGresult* result);
unsigned int add_ready_status(PGTransactionStatusType txn_status);
bool get_resultset(PtrSizeArray* PSarrayFinal);
@ -257,7 +258,9 @@ public:
unsigned int copy_row_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result);
unsigned int copy_command_completion_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result);
unsigned int copy_error_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result);
unsigned int copy_empty_query_response_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result);
unsigned int copy_ready_status_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, PGTransactionStatusType txn_status);
private:
bool get_header(unsigned char* pkt, unsigned int len, pgsql_hdr* hdr);
void load_conn_parameters(pgsql_hdr* pkt, bool startup);

@ -221,11 +221,11 @@ private:
void SetQueryTimeout();
bool handler_rc0_PROCESSING_STMT_PREPARE(enum session_status& st, PgSQL_Data_Stream* myds, bool& prepared_stmt_with_no_params);
void handler_rc0_PROCESSING_STMT_EXECUTE(PgSQL_Data_Stream* myds);
bool handler_minus1_ClientLibraryError(PgSQL_Data_Stream* myds, int myerr, char** errmsg);
void handler_minus1_LogErrorDuringQuery(PgSQL_Connection* myconn, int myerr, char* errmsg);
bool handler_minus1_HandleErrorCodes(PgSQL_Data_Stream* myds, int myerr, char** errmsg, int& handler_ret);
void handler_minus1_GenerateErrorMessage(PgSQL_Data_Stream* myds, PgSQL_Connection* myconn, bool& wrong_pass);
void handler_minus1_HandleBackendConnection(PgSQL_Data_Stream* myds, PgSQL_Connection* myconn);
bool handler_minus1_ClientLibraryError(PgSQL_Data_Stream* myds);
void handler_minus1_LogErrorDuringQuery(PgSQL_Connection* myconn);
bool handler_minus1_HandleErrorCodes(PgSQL_Data_Stream* myds, int& handler_ret);
void handler_minus1_GenerateErrorMessage(PgSQL_Data_Stream* myds, bool& wrong_pass);
void handler_minus1_HandleBackendConnection(PgSQL_Data_Stream* myds);
int RunQuery(PgSQL_Data_Stream* myds, PgSQL_Connection* myconn);
void handler___status_WAITING_CLIENT_DATA();
void handler_rc0_Process_GTID(PgSQL_Connection* myconn);
@ -410,9 +410,10 @@ public:
bool known_query_for_locked_on_hostgroup(uint64_t);
void unable_to_parse_set_statement(bool*);
bool has_any_backend();
void detected_broken_connection(const char* file, unsigned int line, const char* func, const char* action, PgSQL_Connection* myconn, int myerr, const char* message, bool verbose = false);
void detected_broken_connection(const char* file, unsigned int line, const char* func, const char* action, PgSQL_Connection* myconn, bool verbose = false);
void generate_status_one_hostgroup(int hid, std::string& s);
void reset_warning_hostgroup_flag_and_release_connection();
void set_previous_status_mode3(bool allow_execute = true);
};
#define PgSQL_KILL_QUERY 1

@ -107,8 +107,8 @@ enum PgSQL_Thread_status_variable {
st_var_automatic_detected_sqli,
st_var_mysql_whitelisted_sqli_fingerprint,
st_var_client_host_error_killed_connections,
st_var_END*/
PG_st_var_END
*/
PG_st_var_END = 42 // to avoid ASAN complaining. TO FIX
};
class __attribute__((aligned(64))) PgSQL_Thread : public Base_Thread

@ -2198,7 +2198,7 @@ void MySQL_HostGroups_Manager::update_table_mysql_servers_for_monitor(bool lock)
int cols = 0;
int affected_rows = 0;
SQLite3_result* resultset = NULL;
char* query = const_cast<char*>("SELECT hostname, port, status, use_ssl FROM mysql_servers WHERE status != 3 GROUP BY hostname, port");
const char* query = "SELECT hostname, port, status, use_ssl FROM mysql_servers WHERE status != 3 GROUP BY hostname, port";
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query);
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);

@ -3193,26 +3193,53 @@ handler_again:
const PGresult* result = get_last_result();
if (result) {
switch (PQresultStatus(result)) {
case PGRES_COMMAND_OK:
query_result->add_command_completion(result);
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT);
break;
case PGRES_TUPLES_OK:
case PGRES_SINGLE_TUPLE:
break;
default:
{
set_error_from_result(result, PGSQL_ERROR_FIELD_ALL);
query_result->add_error(result);
const PGSQL_ERROR_CATEGORY error_category = get_error_category();
if (error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_SYNTAX_ERROR &&
error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_STATUS &&
error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_DATA_ERROR) {
proxy_error("Error: %s\n", get_error_code_with_message().c_str());
}
}
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT);
auto state = PQresultStatus(result);
switch (state) {
case PGRES_COMMAND_OK:
query_result->add_command_completion(result);
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT);
break;
case PGRES_EMPTY_QUERY:
query_result->add_empty_query_response(result);
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT);
break;
case PGRES_TUPLES_OK:
case PGRES_SINGLE_TUPLE:
break;
case PGRES_COPY_OUT:
case PGRES_COPY_IN:
case PGRES_COPY_BOTH:
// NOT IMPLEMENTED
assert(0);
break;
case PGRES_BAD_RESPONSE:
case PGRES_NONFATAL_ERROR:
case PGRES_FATAL_ERROR:
default:
// we don't have a command completion, empty query responseor error packet in the result. This check is here to
// handle internal cleanup of libpq that might return residual protocol messages from the broken connection and
// may add multiple final packets.
if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR)) == 0) {
set_error_from_result(result, PGSQL_ERROR_FIELD_ALL);
assert(is_error_present());
// we will not send FATAL error messages to the client
const PGSQL_ERROR_SEVERITY severity = get_error_severity();
if (severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_ERROR ||
severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_WARNING ||
severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_NOTICE) {
query_result->add_error(result);
}
const PGSQL_ERROR_CATEGORY error_category = get_error_category();
if (error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_SYNTAX_ERROR &&
error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_STATUS &&
error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_DATA_ERROR) {
proxy_error("Error: %s\n", get_error_code_with_message().c_str());
}
}
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT);
}
if (first_result == true) {
@ -3242,7 +3269,7 @@ handler_again:
}
}
if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_ERROR)) == 0) {
if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR)) == 0) {
// if we reach here we assume that error_info is already set in previous call
if (!is_error_present())
assert(0); // we might have missed setting error_info in previous call
@ -3974,7 +4001,7 @@ void PgSQL_Connection::connect_start() {
//pgsql_conn = PQconnectdb(conninfo_str.c_str());
//PQsetErrorVerbosity(pgsql_conn, PQERRORS_VERBOSE);
//PQsetErrorContextVisibility(pgsql_conn, PQSHOW_CONTEXT_ALWAYS);
//PQsetErrorContextVisibility(pgsql_conn, PQSHOW_CONTEXT_ERRORS);
if (pgsql_conn == NULL || PQstatus(pgsql_conn) == CONNECTION_BAD) {
if (pgsql_conn) {
@ -4100,10 +4127,14 @@ void PgSQL_Connection::fetch_result_cont(short event) {
if (PQconsumeInput(pgsql_conn) == 0) {
// WARNING: DO NOT RELEASE this PGresult
const PGresult* result = PQgetResultFromPGconn(pgsql_conn);
set_error_from_result(result);
// this is not actually an error, a hint to the user
error_info.severity = PGSQL_ERROR_SEVERITY::ERRSEVERITY_INFO;
proxy_warning("Failed to consume input. %s\n", get_error_code_with_message().c_str());
/* We will only set the error if the result is not NULL or we didn't capture error in last call. If the result is NULL,
* it indicates that an error was already captured during a previous PQconsumeInput call,
* and we do not want to overwrite that information.
*/
if (result || is_error_present() == false) {
set_error_from_result(result);
proxy_error("Failed to consume input. %s\n", get_error_code_with_message().c_str());
}
}
if (PQisBusy(pgsql_conn)) {
@ -4187,20 +4218,21 @@ void PgSQL_Connection::compute_unknown_transaction_status() {
return;
}
if (is_connected() == false) {
/*if (is_connected() == false) {
unknown_transaction_status = true;
return;
}
}*/
switch (PQtransactionStatus(pgsql_conn)) {
case PQTRANS_INTRANS:
case PQTRANS_INERROR:
case PQTRANS_UNKNOWN:
case PQTRANS_ACTIVE:
unknown_transaction_status = true;
break;
case PQTRANS_UNKNOWN:
default:
unknown_transaction_status = false;
//unknown_transaction_status = false;
break;
}
}
}
@ -4470,7 +4502,8 @@ bool PgSQL_Connection::IsKnownActiveTransaction() {
bool in_txn = false;
if (pgsql_conn) {
// Get the transaction status
if (PQtransactionStatus(pgsql_conn) == PQTRANS_INTRANS) {
PGTransactionStatusType status = PQtransactionStatus(pgsql_conn);
if (status == PQTRANS_INTRANS || status == PQTRANS_INERROR) {
in_txn = true;
}
}
@ -4487,12 +4520,12 @@ bool PgSQL_Connection::IsActiveTransaction() {
switch (status) {
case PQTRANS_INTRANS:
case PQTRANS_INERROR:
case PQTRANS_UNKNOWN:
in_txn = true;
break;
case PQTRANS_UNKNOWN:
case PQTRANS_IDLE:
case PQTRANS_ACTIVE:
default:
//case PQTRANS_IDLE:
//case PQTRANS_ACTIVE:
in_txn = false;
}
@ -4526,3 +4559,10 @@ bool PgSQL_Connection::IsServerOffline() {
}
return ret;
}
bool PgSQL_Connection::is_connection_in_reusable_state() const {
const PGTransactionStatusType txn_status = PQtransactionStatus(pgsql_conn);
const bool conn_usable = !(txn_status == PQTRANS_UNKNOWN || txn_status == PQTRANS_ACTIVE);
assert(!(conn_usable == false && is_error_present() == false));
return conn_usable;
}

@ -253,6 +253,10 @@ PGSQL_ERROR_SEVERITY PgSQL_Error_Helper::identify_error_severity(const char* sev
ret = PGSQL_ERROR_SEVERITY::ERRSEVERITY_DEBUG;
} else if (strcasecmp(severity, "LOG") == 0) {
ret = PGSQL_ERROR_SEVERITY::ERRSEVERITY_LOG;
} else if (strcasecmp(severity, "INFO") == 0) {
ret = PGSQL_ERROR_SEVERITY::ERRSEVERITY_INFO;
} else {
ret = PGSQL_ERROR_SEVERITY::ERRSEVERITY_UNKNOWN_SEVERITY;
}
return ret;
}
@ -286,6 +290,46 @@ void PgSQL_Error_Helper::fill_error_info(PgSQL_ErrorInfo& err_info, const char*
err_info.message = msg;
}
void PgSQL_Error_Helper::fill_error_info(PgSQL_ErrorInfo& err_info, PGSQL_ERROR_CODES code, const char* msg, PGSQL_ERROR_SEVERITY severity) {
fill_error_info(err_info, get_error_code(code), msg, PgSQL_Error_Helper::get_severity(severity));
}
/*
void PgSQL_Error_Helper::fill_error_info_from_error_message(PgSQL_ErrorInfo& err_info, const char* error_msg) {
std::string errorMsgStr(error_msg);
std::string sqlState;
std::string primaryErrorMsg;
std::string severity;
// Initialize positions
size_t startPos = 0;
size_t endPos = 0;
// Extract severity (assume it's the first word in the primary error message)
size_t severityEndPos = errorMsgStr.find(": ");
if (severityEndPos != std::string::npos) {
severity = errorMsgStr.substr(0, severityEndPos);
startPos = severityEndPos + 2; // Skip the ": "
} else {
severity = get_severity(PGSQL_ERROR_SEVERITY::ERRSEVERITY_UNKNOWN_SEVERITY);
}
// Extract SQL state in the format [XXXXX]
startPos = errorMsgStr.find('[', startPos);
endPos = (startPos != std::string::npos) ? errorMsgStr.find(']', startPos) : std::string::npos;
if (startPos != std::string::npos && endPos != std::string::npos && endPos == startPos + 6) {
sqlState = errorMsgStr.substr(startPos + 1, 5); // Extract the SQL state
primaryErrorMsg = errorMsgStr.substr(severityEndPos + 2, startPos - (severityEndPos + 2)); // Extract the primary error message up to the SQL state
} else {
sqlState = get_error_code(PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION); // Default SQL state
primaryErrorMsg = errorMsgStr.substr(severityEndPos + 2); // No SQL state found, remainder is the error message
}
fill_error_info(err_info, sqlState.c_str(), primaryErrorMsg.c_str(), severity.c_str());
}
*/
void reset_error_info(PgSQL_ErrorInfo& err_info, bool release_extented) {
err_info.sqlstate[0] = '\0';
err_info.code = PGSQL_ERROR_CODES::ERRCODE_SUCCESSFUL_COMPLETION;

@ -3358,10 +3358,9 @@ void PgSQL_HostGroups_Manager::destroy_MyConn_from_pool(PgSQL_Connection *c, boo
if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE && c->send_quit && queue.size() < __sync_fetch_and_add(&GloMTH->variables.connpoll_reset_queue_length,0)) {
if (c->async_state_machine==ASYNC_IDLE) {
// overall, the backend seems healthy and so it is the connection. Try to reset it
int myerr=mysql_errno(c->pgsql);
if (myerr >= 2000 && myerr < 3000) {
if (c->is_connection_in_reusable_state() == false) {
// client library error . We must not try to save the connection
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Not trying to reset PgSQL_Connection %p, server %s:%d . Error code %d\n", c, mysrvc->address, mysrvc->port, myerr);
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Not trying to reset PgSQL_Connection %p, server %s:%d . Error %s\n", c, mysrvc->address, mysrvc->port, c->get_error_code_with_message().c_str());
} else {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Trying to reset PgSQL_Connection %p, server %s:%d\n", c, mysrvc->address, mysrvc->port);
to_del=false;

@ -1627,6 +1627,42 @@ unsigned int PgSQL_Protocol::copy_error_to_PgSQL_Query_Result(bool send, PgSQL_Q
return size;
}
unsigned int PgSQL_Protocol::copy_empty_query_response_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result) {
assert(pg_query_result);
// we are currently not using result. It is just for future use
const unsigned int size = 1 + 4; // I, length
bool alloced_new_buffer = false;
unsigned char* _ptr = pg_query_result->buffer_reserve_space(size);
// buffer is not enough to store the new row. Remember we have already pushed data to PSarrayOUT
if (_ptr == NULL) {
_ptr = (unsigned char*)l_alloc(size);
alloced_new_buffer = true;
}
PG_pkt pgpkt(_ptr, size);
pgpkt.put_char('I');
pgpkt.put_uint32(size - 1);
if (send == true) {
// not supported
//(*myds)->PSarrayOUT->add((void*)_ptr, size);
}
pg_query_result->resultset_size += size;
if (alloced_new_buffer) {
// we created new buffer
//pg_query_result->buffer_to_PSarrayOut();
pg_query_result->PSarrayOUT.add(_ptr, size);
}
pg_query_result->pkt_count++;
return size;
}
unsigned int PgSQL_Protocol::copy_ready_status_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, PGTransactionStatusType txn_status) {
assert(pg_query_result);
@ -1677,7 +1713,7 @@ PgSQL_Query_Result::PgSQL_Query_Result() {
num_fields = 0;
num_rows = 0;
pkt_count = 0;
result_packet_type = PGSQL_QUERY_RESULT_EMPTY;
result_packet_type = PGSQL_QUERY_RESULT_NO_DATA;
}
PgSQL_Query_Result::~PgSQL_Query_Result() {
@ -1759,6 +1795,12 @@ unsigned int PgSQL_Query_Result::add_error(const PGresult* result) {
return size;
}
unsigned int PgSQL_Query_Result::add_empty_query_response(const PGresult* result) {
const unsigned int bytes = proto->copy_empty_query_response_to_PgSQL_Query_Result(false, this, result);
result_packet_type |= PGSQL_QUERY_RESULT_EMPTY;
return bytes;
}
unsigned int PgSQL_Query_Result::add_ready_status(PGTransactionStatusType txn_status) {
const unsigned int bytes = proto->copy_ready_status_to_PgSQL_Query_Result(false, this, txn_status);
buffer_to_PSarrayOut();
@ -1843,5 +1885,5 @@ void PgSQL_Query_Result::reset() {
num_fields = 0;
num_rows = 0;
pkt_count = 0;
result_packet_type = PGSQL_QUERY_RESULT_EMPTY;
result_packet_type = PGSQL_QUERY_RESULT_NO_DATA;
}

@ -1851,7 +1851,7 @@ int PgSQL_Session::handler_again___status_PINGING_SERVER() {
}
else { // rc==-1
int myerr = mysql_errno(myconn->pgsql);
detected_broken_connection(__FILE__, __LINE__, __func__, "during ping", myconn, myerr, mysql_error(myconn->pgsql), true);
detected_broken_connection(__FILE__, __LINE__, __func__, "during ping", myconn, true);
PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myerr);
}
myds->destroy_MySQL_Connection_From_Pool(false);
@ -2031,22 +2031,8 @@ bool PgSQL_Session::handler_again___verify_init_connect() {
if (tmp_init_connect) {
// we send init connect queries only if set
mybe->server_myds->myconn->options.init_connect = strdup(tmp_init_connect);
switch (status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
// LCOV_EXCL_START
assert(0);
break;
// LCOV_EXCL_STOP
}
// Sets the previous status of the PgSQL session according to the current status.
set_previous_status_mode3();
NEXT_IMMEDIATE_NEW(SETTING_INIT_CONNECT);
}
}
@ -2101,18 +2087,8 @@ bool PgSQL_Session::handler_again___verify_backend_session_track_gtids() {
mybe->server_myds->myconn->options.session_track_gtids_int =
SpookyHash::Hash32((char*)"OWN_GTID", strlen((char*)"OWN_GTID"), 10);
// we now switch status to set session_track_gtids
switch (status) {
case PROCESSING_QUERY:
case PROCESSING_STMT_PREPARE:
case PROCESSING_STMT_EXECUTE:
previous_status.push(status);
break;
default:
// LCOV_EXCL_START
assert(0);
break;
// LCOV_EXCL_STOP
}
// Sets the previous status of the PgSQL session according to the current status.
set_previous_status_mode3();
NEXT_IMMEDIATE_NEW(SETTING_SESSION_TRACK_GTIDS);
}
return ret;
@ -2200,45 +2176,15 @@ bool PgSQL_Session::handler_again___verify_backend_autocommit() {
// enforce_autocommit_on_reads is disabled
// we need to check if it is a SELECT not FOR UPDATE
if (CurrentQuery.is_select_NOT_for_update() == false) {
//previous_status.push(PROCESSING_QUERY);
switch (status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
// LCOV_EXCL_START
assert(0);
break;
// LCOV_EXCL_STOP
}
// Sets the previous status of the PgSQL session according to the current status.
set_previous_status_mode3();
NEXT_IMMEDIATE_NEW(CHANGING_AUTOCOMMIT);
}
}
else {
// in every other cases, enforce autocommit
//previous_status.push(PROCESSING_QUERY);
switch (status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
// LCOV_EXCL_START
assert(0);
break;
// LCOV_EXCL_STOP
}
// Sets the previous status of the PgSQL session according to the current status.
set_previous_status_mode3();
NEXT_IMMEDIATE_NEW(CHANGING_AUTOCOMMIT);
}
}
@ -2287,44 +2233,14 @@ bool PgSQL_Session::handler_again___verify_backend_user_schema() {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Session %p , client: %s , backend: %s\n", this, client_myds->myconn->userinfo->schemaname, mybe->server_myds->myconn->userinfo->schemaname);
if (client_myds->myconn->userinfo->hash != mybe->server_myds->myconn->userinfo->hash) {
if (strcmp(client_myds->myconn->userinfo->username, myds->myconn->userinfo->username)) {
//previous_status.push(PROCESSING_QUERY);
switch (status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
// LCOV_EXCL_START
assert(0);
break;
// LCOV_EXCL_STOP
}
// Sets the previous status of the PgSQL session according to the current status.
set_previous_status_mode3();
mybe->server_myds->wait_until = thread->curtime + pgsql_thread___connect_timeout_server * 1000; // max_timeout
NEXT_IMMEDIATE_NEW(CHANGING_USER_SERVER);
}
if (strcmp(client_myds->myconn->userinfo->schemaname, myds->myconn->userinfo->schemaname)) {
//previous_status.push(PROCESSING_QUERY);
switch (status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
// LCOV_EXCL_START
assert(0);
break;
// LCOV_EXCL_STOP
}
// Sets the previous status of the PgSQL session according to the current status.
set_previous_status_mode3();
NEXT_IMMEDIATE_NEW(CHANGING_SCHEMA);
}
}
@ -2334,18 +2250,9 @@ bool PgSQL_Session::handler_again___verify_backend_user_schema() {
// the backend connection has some session variable set
// that the client never asked for
// because we can't unset variables, we will reset the connection
switch (status) {
case PROCESSING_QUERY:
case PROCESSING_STMT_PREPARE:
case PROCESSING_STMT_EXECUTE:
previous_status.push(status);
break;
default:
// LCOV_EXCL_START
assert(0);
break;
// LCOV_EXCL_STOP
}
//
// Sets the previous status of the PgSQL session according to the current status.
set_previous_status_mode3();
mybe->server_myds->wait_until = thread->curtime + pgsql_thread___connect_timeout_server * 1000; // max_timeout
NEXT_IMMEDIATE_NEW(CHANGING_USER_SERVER);
}
@ -2385,7 +2292,7 @@ bool PgSQL_Session::handler_again___status_SETTING_INIT_CONNECT(int* _rc) {
if (myerr >= 2000 || myerr == 0) {
bool retry_conn = false;
// client error, serious
detected_broken_connection(__FILE__, __LINE__, __func__, "while setting INIT CONNECT", myconn, myerr, mysql_error(myconn->pgsql));
detected_broken_connection(__FILE__, __LINE__, __func__, "while setting INIT CONNECT", myconn);
//if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) {
if (rc != -2) { // see PMC-10003
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
@ -2493,7 +2400,7 @@ bool PgSQL_Session::handler_again___status_SETTING_LDAP_USER_VARIABLE(int* _rc)
if (myerr >= 2000 || myerr == 0) {
bool retry_conn = false;
// client error, serious
detected_broken_connection(__FILE__, __LINE__, __func__, "while setting LDAP USER VARIABLE", myconn, myerr, mysql_error(myconn->pgsql));
detected_broken_connection(__FILE__, __LINE__, __func__, "while setting LDAP USER VARIABLE", myconn);
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
retry_conn = true;
}
@ -2580,7 +2487,7 @@ bool PgSQL_Session::handler_again___status_SETTING_SQL_LOG_BIN(int* _rc) {
if (myerr >= 2000 || myerr == 0) {
bool retry_conn = false;
// client error, serious
detected_broken_connection(__FILE__, __LINE__, __func__, "while setting SQL_LOG_BIN", myconn, myerr, mysql_error(myconn->pgsql));
detected_broken_connection(__FILE__, __LINE__, __func__, "while setting SQL_LOG_BIN", myconn);
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
retry_conn = true;
}
@ -2660,7 +2567,7 @@ bool PgSQL_Session::handler_again___status_CHANGING_CHARSET(int* _rc) {
}
bool retry_conn = false;
// client error, serious
detected_broken_connection(__FILE__, __LINE__, __func__, "during SET NAMES", myconn, myerr, mysql_error(myconn->pgsql));
detected_broken_connection(__FILE__, __LINE__, __func__, "during SET NAMES", myconn);
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
retry_conn = true;
}
@ -2806,7 +2713,7 @@ bool PgSQL_Session::handler_again___status_SETTING_GENERIC_VARIABLE(int* _rc, co
// client error, serious
std::string action = "while setting ";
action += var_name;
detected_broken_connection(__FILE__, __LINE__, __func__, action.c_str(), myconn, myerr, mysql_error(myconn->pgsql));
detected_broken_connection(__FILE__, __LINE__, __func__, action.c_str(), myconn);
//if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) {
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
retry_conn = true;
@ -2925,7 +2832,7 @@ bool PgSQL_Session::handler_again___status_SETTING_MULTI_STMT(int* _rc) {
if (myerr >= 2000 || myerr == 0) {
bool retry_conn = false;
// client error, serious
detected_broken_connection(__FILE__, __LINE__, __func__, "while setting MYSQL_OPTION_MULTI_STATEMENTS", myconn, myerr, mysql_error(myconn->pgsql));
detected_broken_connection(__FILE__, __LINE__, __func__, "while setting MYSQL_OPTION_MULTI_STATEMENTS", myconn);
//if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) {
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
retry_conn = true;
@ -3000,7 +2907,7 @@ bool PgSQL_Session::handler_again___status_CHANGING_SCHEMA(int* _rc) {
if (myerr >= 2000 || myerr == 0) {
bool retry_conn = false;
// client error, serious
detected_broken_connection(__FILE__, __LINE__, __func__, "during INIT_DB", myconn, myerr, mysql_error(myconn->pgsql));
detected_broken_connection(__FILE__, __LINE__, __func__, "during INIT_DB", myconn);
//if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) {
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
retry_conn = true;
@ -3276,7 +3183,7 @@ bool PgSQL_Session::handler_again___status_CHANGING_USER_SERVER(int* _rc) {
if (myerr >= 2000 || myerr == 0) {
bool retry_conn = false;
// client error, serious
detected_broken_connection(__FILE__, __LINE__, __func__, "during CHANGE_USER", myconn, myerr, mysql_error(myconn->pgsql));
detected_broken_connection(__FILE__, __LINE__, __func__, "during CHANGE_USER", myconn);
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
retry_conn = true;
}
@ -3381,7 +3288,7 @@ bool PgSQL_Session::handler_again___status_CHANGING_AUTOCOMMIT(int* _rc) {
if (myerr >= 2000 || myerr == 0) {
bool retry_conn = false;
// client error, serious
detected_broken_connection(__FILE__, __LINE__, __func__, "during SET AUTOCOMMIT", myconn, myerr, mysql_error(myconn->pgsql));
detected_broken_connection(__FILE__, __LINE__, __func__, "during SET AUTOCOMMIT", myconn);
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
retry_conn = true;
}
@ -4656,35 +4563,31 @@ __get_pkts_from_client:
int PgSQL_Session::handler_ProcessingQueryError_CheckBackendConnectionStatus(PgSQL_Data_Stream* myds) {
PgSQL_Connection* myconn = myds->myconn;
// the query failed
if (
// due to #774 , we now read myconn->server_status instead of myconn->parent->status
(myconn->server_status == MYSQL_SERVER_STATUS_OFFLINE_HARD) // the query failed because the server is offline hard
||
(myconn->server_status == MYSQL_SERVER_STATUS_SHUNNED && myconn->parent->shunned_automatic == true && myconn->parent->shunned_and_kill_all_connections == true) // the query failed because the server is shunned due to a serious failure
||
(myconn->server_status == MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG) // slave is lagging! see #774
) {
if (myconn->IsServerOffline()) {
// Set maximum connect time if connect timeout is configured
if (pgsql_thread___connect_timeout_server_max) {
myds->max_connect_time = thread->curtime + pgsql_thread___connect_timeout_server_max * 1000;
}
// Variables to track retry and error conditions
bool retry_conn = false;
if (myconn->server_status == MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG) {
thread->status_variables.stvar[st_var_backend_lagging_during_query]++;
proxy_error("Detected a lagging server during query: %s, %d\n", myconn->parent->address, myconn->parent->port);
PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::proxysql, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, ER_PROXYSQL_LAGGING_SRV);
}
else {
} else {
thread->status_variables.stvar[st_var_backend_offline_during_query]++;
proxy_error("Detected an offline server during query: %s, %d\n", myconn->parent->address, myconn->parent->port);
PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::proxysql, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, ER_PROXYSQL_OFFLINE_SRV);
}
// Retry the query if retries are allowed and conditions permit
if (myds->query_retries_on_failure > 0) {
myds->query_retries_on_failure--;
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
if (myds->myconn->query_result && myds->myconn->query_result->is_transfer_started()) {
// transfer to frontend has started, we cannot retry
}
else {
} else {
retry_conn = true;
proxy_warning("Retrying query.\n");
}
@ -4694,23 +4597,8 @@ int PgSQL_Session::handler_ProcessingQueryError_CheckBackendConnectionStatus(PgS
myds->fd = 0;
if (retry_conn) {
myds->DSS = STATE_NOT_INITIALIZED;
//previous_status.push(PROCESSING_QUERY);
switch (status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
// LCOV_EXCL_START
assert(0);
break;
// LCOV_EXCL_STOP
}
// Sets the previous status of the PgSQL session according to the current status.
set_previous_status_mode3();
return 1;
}
return -1;
@ -4852,25 +4740,24 @@ void PgSQL_Session::handler_rc0_PROCESSING_STMT_EXECUTE(PgSQL_Data_Stream* myds)
// now it returns:
// true: NEXT_IMMEDIATE(CONNECTING_SERVER) needs to be called
// false: continue
bool PgSQL_Session::handler_minus1_ClientLibraryError(PgSQL_Data_Stream* myds, int myerr, char** errmsg) {
bool PgSQL_Session::handler_minus1_ClientLibraryError(PgSQL_Data_Stream* myds) {
PgSQL_Connection* myconn = myds->myconn;
bool retry_conn = false;
// client error, serious
detected_broken_connection(__FILE__, __LINE__, __func__, "running query", myconn, myerr, mysql_error(myconn->pgsql), true);
detected_broken_connection(__FILE__, __LINE__, __func__, "running query", myconn, true);
if (myds->query_retries_on_failure > 0) {
myds->query_retries_on_failure--;
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
if (myds->myconn->query_result && myds->myconn->query_result->is_transfer_started()) {
if ((myconn->reusable == true) && myconn->IsActiveTransaction() == false && myconn->MultiplexDisabled() == false) {
if (myconn->query_result && myconn->query_result->is_transfer_started()) {
// transfer to frontend has started, we cannot retry
}
else {
if (myds->myconn->pgsql->server_status & SERVER_MORE_RESULTS_EXIST) {
} else {
// a hack to check if we have pending results. This should never occur.
if (myconn->get_last_result() != nullptr) {
// transfer to frontend has started, because this is, at least,
// the second resultset coming from the server
// we cannot retry
proxy_warning("Disabling query retry because SERVER_MORE_RESULTS_EXIST is set\n");
}
else {
proxy_warning("Disabling query retry because we were in middle of processing results\n");
} else {
retry_conn = true;
proxy_warning("Retrying query.\n");
}
@ -4881,46 +4768,23 @@ bool PgSQL_Session::handler_minus1_ClientLibraryError(PgSQL_Data_Stream* myds, i
myds->fd = 0;
if (retry_conn) {
myds->DSS = STATE_NOT_INITIALIZED;
//previous_status.push(PROCESSING_QUERY);
switch (status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
// LCOV_EXCL_START
assert(0);
break;
// LCOV_EXCL_STOP
}
if (*errmsg) {
free(*errmsg);
*errmsg = NULL;
}
// Sets the previous status of the PgSQL session according to the current status.
set_previous_status_mode3();
return true;
}
if (*errmsg) {
free(*errmsg);
*errmsg = NULL;
}
return false;
}
// this function was inline
void PgSQL_Session::handler_minus1_LogErrorDuringQuery(PgSQL_Connection* myconn, int myerr, char* errmsg) {
void PgSQL_Session::handler_minus1_LogErrorDuringQuery(PgSQL_Connection* myconn) {
if (pgsql_thread___verbose_query_error) {
proxy_warning("Error during query on (%d,%s,%d,%lu) , user \"%s@%s\" , schema \"%s\" , %s . digest_text = \"%s\"\n", myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myconn->get_mysql_thread_id(), client_myds->myconn->userinfo->username, (client_myds->addr.addr ? client_myds->addr.addr : (char*)"unknown"), client_myds->myconn->userinfo->schemaname, myconn->get_error_code_with_message().c_str(), CurrentQuery.QueryParserArgs.digest_text);
}
else {
proxy_warning("Error during query on (%d,%s,%d,%lu): %s\n", myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myconn->get_mysql_thread_id(), myconn->get_error_code_with_message().c_str());
}
PgHGM->add_pgsql_errors(myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, client_myds->myconn->userinfo->username, (client_myds->addr.addr ? client_myds->addr.addr : (char*)"unknown"), client_myds->myconn->userinfo->schemaname, myerr, (char*)(errmsg ? errmsg : mysql_error(myconn->pgsql)));
PgHGM->add_pgsql_errors(myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, client_myds->myconn->userinfo->username, (client_myds->addr.addr ? client_myds->addr.addr : (char*)"unknown"), client_myds->myconn->userinfo->schemaname, 9999, (char*)myconn->get_error_code_with_message().c_str());
}
@ -4930,12 +4794,12 @@ void PgSQL_Session::handler_minus1_LogErrorDuringQuery(PgSQL_Connection* myconn,
// if handler_ret == -1 : return
// if handler_ret == 0 : NEXT_IMMEDIATE(CONNECTING_SERVER) needs to be called
// false: continue
bool PgSQL_Session::handler_minus1_HandleErrorCodes(PgSQL_Data_Stream* myds, int myerr, char** errmsg, int& handler_ret) {
bool PgSQL_Session::handler_minus1_HandleErrorCodes(PgSQL_Data_Stream* myds, int& handler_ret) {
bool retry_conn = false;
PgSQL_Connection* myconn = myds->myconn;
handler_ret = 0; // default
switch (myerr) {
case 1317: // Query execution was interrupted
switch (myconn->get_error_code()) {
case PGSQL_ERROR_CODES::ERRCODE_QUERY_CANCELED: // Query execution was interrupted
if (killed == true) { // this session is being kiled
handler_ret = -1;
return true;
@ -4945,60 +4809,32 @@ bool PgSQL_Session::handler_minus1_HandleErrorCodes(PgSQL_Data_Stream* myds, int
break;
}
break;
case 1047: // WSREP has not yet prepared node for application use
case 1053: // Server shutdown in progress
myconn->parent->connect_error(myerr);
case PGSQL_ERROR_CODES::ERRCODE_ADMIN_SHUTDOWN: // Server shutdown in progress. Requested by Admin
case PGSQL_ERROR_CODES::ERRCODE_CRASH_SHUTDOWN: // Server shutdown in progress
case PGSQL_ERROR_CODES::ERRCODE_CANNOT_CONNECT_NOW: // Server in initialization mode and not ready to handle new connections
myconn->parent->connect_error(9999);
if (myds->query_retries_on_failure > 0) {
myds->query_retries_on_failure--;
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
if ((myconn->reusable == true) && myconn->IsActiveTransaction() == false && myconn->MultiplexDisabled() == false) {
retry_conn = true;
proxy_warning("Retrying query.\n");
}
}
switch (myerr) {
case 1047: // WSREP has not yet prepared node for application use
case 1053: // Server shutdown in progress
myds->destroy_MySQL_Connection_From_Pool(false);
break;
default:
if (pgsql_thread___reset_connection_algorithm == 2) {
create_new_session_and_reset_connection(myds);
}
else {
myds->destroy_MySQL_Connection_From_Pool(true);
}
break;
}
myds->destroy_MySQL_Connection_From_Pool(false);
myconn = myds->myconn;
myds->fd = 0;
if (retry_conn) {
myds->DSS = STATE_NOT_INITIALIZED;
//previous_status.push(PROCESSING_QUERY);
switch (status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
default:
// LCOV_EXCL_START
assert(0);
break;
// LCOV_EXCL_STOP
}
if (*errmsg) {
free(*errmsg);
*errmsg = NULL;
}
set_previous_status_mode3(false);
return true; // it will call NEXT_IMMEDIATE(CONNECTING_SERVER);
//NEXT_IMMEDIATE(CONNECTING_SERVER);
}
//handler_ret = -1;
//return handler_ret;
break;
case 1153: // ER_NET_PACKET_TOO_LARGE
proxy_warning("Error ER_NET_PACKET_TOO_LARGE during query on (%d,%s,%d,%lu): %d, %s\n", myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myconn->get_mysql_thread_id(), myerr, mysql_error(myconn->pgsql));
case PGSQL_ERROR_CODES::ERRCODE_OUT_OF_MEMORY:
proxy_warning("Error OUT_OF_MEMORY during query on (%d,%s,%d,%lu): %s\n", myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myconn->get_mysql_thread_id(), myconn->get_error_code_with_message().c_str());
break;
default:
break; // continue normally
@ -5007,7 +4843,8 @@ bool PgSQL_Session::handler_minus1_HandleErrorCodes(PgSQL_Data_Stream* myds, int
}
// this function used to be inline.
void PgSQL_Session::handler_minus1_GenerateErrorMessage(PgSQL_Data_Stream* myds, PgSQL_Connection* myconn, bool& wrong_pass) {
void PgSQL_Session::handler_minus1_GenerateErrorMessage(PgSQL_Data_Stream* myds, bool& wrong_pass) {
PgSQL_Connection* myconn = myds->myconn;
switch (status) {
case PROCESSING_QUERY:
if (myconn) {
@ -5067,24 +4904,22 @@ void PgSQL_Session::handler_minus1_GenerateErrorMessage(PgSQL_Data_Stream* myds,
}
// this function was inline
void PgSQL_Session::handler_minus1_HandleBackendConnection(PgSQL_Data_Stream* myds, PgSQL_Connection* myconn) {
if (myds->myconn) {
myds->myconn->reduce_auto_increment_delay_token();
if (pgsql_thread___multiplexing && (myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
void PgSQL_Session::handler_minus1_HandleBackendConnection(PgSQL_Data_Stream* myds) {
PgSQL_Connection* myconn = myds->myconn;
if (myconn) {
myconn->reduce_auto_increment_delay_token();
if (pgsql_thread___multiplexing && (myconn->reusable == true) && myconn->IsActiveTransaction() == false && myconn->MultiplexDisabled() == false) {
myds->DSS = STATE_NOT_INITIALIZED;
if (mysql_thread___autocommit_false_not_reusable && myds->myconn->IsAutoCommit() == false) {
if (mysql_thread___autocommit_false_not_reusable && myconn->IsAutoCommit() == false) {
if (pgsql_thread___reset_connection_algorithm == 2) {
create_new_session_and_reset_connection(myds);
}
else {
} else {
myds->destroy_MySQL_Connection_From_Pool(true);
}
}
else {
} else {
myds->return_MySQL_Connection_To_Pool();
}
}
else {
} else {
myconn->async_state_machine = ASYNC_IDLE;
myds->DSS = STATE_MARIADB_GENERIC;
}
@ -5316,25 +5151,12 @@ handler_again:
}
if (mybe->server_myds->DSS == STATE_NOT_INITIALIZED) {
// we don't have a backend yet
switch (status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
// LCOV_EXCL_START
assert(0);
break;
// LCOV_EXCL_STOP
}
// It saves the current processing status of the session (status) onto the previous_status stack
// Sets the previous status of the PgSQL session according to the current status.
set_previous_status_mode3();
// It transitions the session to the CONNECTING_SERVER state immediately.
NEXT_IMMEDIATE(CONNECTING_SERVER);
}
else {
} else {
PgSQL_Data_Stream* myds = mybe->server_myds;
PgSQL_Connection* myconn = myds->myconn;
mybe->server_myds->max_connect_time = 0;
@ -5546,16 +5368,16 @@ handler_again:
else {
if (rc == -1) {
// the query failed
int myerr = mysql_errno(myconn->pgsql);
char* errmsg = NULL;
if (myerr == 0) {
if (CurrentQuery.mysql_stmt) {
const bool is_error_present = myconn->is_error_present(); // false means failure is due to server being in OFFLINE state
if (is_error_present == false) {
/*if (CurrentQuery.mysql_stmt) {
myerr = mysql_stmt_errno(CurrentQuery.mysql_stmt);
errmsg = strdup(mysql_stmt_error(CurrentQuery.mysql_stmt));
}
}*/
}
PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myerr);
CurrentQuery.mysql_stmt = NULL; // immediately reset mysql_stmt
PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, 9999); // TOFIX
//CurrentQuery.mysql_stmt = NULL; // immediately reset mysql_stmt
int rc1 = handler_ProcessingQueryError_CheckBackendConnectionStatus(myds);
if (rc1 == -1) {
handler_ret = -1;
@ -5565,28 +5387,25 @@ handler_again:
if (rc1 == 1)
NEXT_IMMEDIATE(CONNECTING_SERVER);
}
if (myerr >= 2000 && myerr < 3000) {
if (handler_minus1_ClientLibraryError(myds, myerr, &errmsg)) {
if (myconn->is_connection_in_reusable_state() == false) {
if (handler_minus1_ClientLibraryError(myds)) {
NEXT_IMMEDIATE(CONNECTING_SERVER);
}
else {
} else {
handler_ret = -1;
return handler_ret;
}
}
else {
handler_minus1_LogErrorDuringQuery(myconn, myerr, errmsg);
if (handler_minus1_HandleErrorCodes(myds, myerr, &errmsg, handler_ret)) {
} else {
handler_minus1_LogErrorDuringQuery(myconn);
if (handler_minus1_HandleErrorCodes(myds, handler_ret)) {
if (handler_ret == 0)
NEXT_IMMEDIATE(CONNECTING_SERVER);
return handler_ret;
}
handler_minus1_GenerateErrorMessage(myds, myconn, wrong_pass);
handler_minus1_GenerateErrorMessage(myds, wrong_pass);
RequestEnd(myds);
handler_minus1_HandleBackendConnection(myds, myconn);
handler_minus1_HandleBackendConnection(myds);
}
}
else {
} else {
switch (rc) {
// rc==1 , query is still running
// start sending to frontend if pgsql_thread___threshold_resultset_size is reached
@ -5620,10 +5439,7 @@ handler_again:
}
}
}
goto __exit_DSS__STATE_NOT_INITIALIZED;
}
break;
@ -7922,7 +7738,7 @@ void PgSQL_Session::PgSQL_Result_to_PgSQL_wire(PgSQL_Connection* _conn, PgSQL_Da
PgSQL_Query_Result* query_result = _conn->query_result;
if (query_result && query_result->get_result_packet_type() != PGSQL_QUERY_RESULT_EMPTY) {
if (query_result && query_result->get_result_packet_type() != PGSQL_QUERY_RESULT_NO_DATA) {
bool transfer_started = query_result->is_transfer_started();
// if there is an error, it will be false so results are not cached
bool is_tuple = query_result->get_result_packet_type() == (PGSQL_QUERY_RESULT_TUPLE | PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_READY);
@ -8729,22 +8545,22 @@ void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_C
l_free(pkt.size, pkt.ptr);
}
void PgSQL_Session::detected_broken_connection(const char* file, unsigned int line, const char* func, const char* action, PgSQL_Connection* myconn, int myerr, const char* message, bool verbose) {
char* msg = (char*)message;
if (msg == NULL) {
msg = (char*)"Detected offline server prior to statement execution";
}
if (myerr == 0) {
myerr = ER_PROXYSQL_OFFLINE_SRV;
msg = (char*)"Detected offline server prior to statement execution";
void PgSQL_Session::detected_broken_connection(const char* file, unsigned int line, const char* func, const char* action, PgSQL_Connection* myconn, bool verbose) {
const char* code = PgSQL_Error_Helper::get_error_code(PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION);;
const char* msg = "Detected offline server prior to statement execution";
if (myconn->is_error_present() == true) {
code = myconn->get_error_code_str();
msg = myconn->get_error_message().c_str();
}
unsigned long long last_used = thread->curtime - myconn->last_time_used;
last_used /= 1000;
if (verbose) {
proxy_error_inline(file, line, func, "Detected a broken connection while %s on (%d,%s,%d,%lu) , FD (Conn:%d , MyDS:%d) , user %s , last_used %llums ago : %d, %s\n", action, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myconn->get_mysql_thread_id(), myconn->myds->fd, myconn->fd, myconn->userinfo->username, last_used, myerr, msg);
}
else {
proxy_error_inline(file, line, func, "Detected a broken connection while %s on (%d,%s,%d,%lu) , user %s , last_used %llums ago : %d, %s\n", action, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myconn->get_mysql_thread_id(), myconn->userinfo->username, last_used, myerr, msg);
proxy_error_inline(file, line, func, "Detected a broken connection while %s on (%d,%s,%d,%lu) , FD (Conn:%d , MyDS:%d) , user %s , last_used %llums ago : %s, %s\n", action, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myconn->get_mysql_thread_id(), myconn->myds->fd, myconn->fd, myconn->userinfo->username, last_used, code, msg);
} else {
proxy_error_inline(file, line, func, "Detected a broken connection while %s on (%d,%s,%d,%lu) , user %s , last_used %llums ago : %s, %s\n", action, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myconn->get_mysql_thread_id(), myconn->userinfo->username, last_used, code, msg);
}
}
@ -8789,3 +8605,49 @@ void PgSQL_Session::reset_warning_hostgroup_flag_and_release_connection()
warning_in_hg = -1;
}
}
/**
* @brief Sets the previous status of the PgSQL session according to the current status, with an option to allow EXECUTE statements.
*
* This method updates the previous status of the PgSQL session based on its current status. It employs a switch statement
* to determine the current status and then pushes the corresponding status value onto the `previous_status` stack. If the
* `allow_execute` parameter is set to true and the current status is `PROCESSING_STMT_EXECUTE`, the method pushes this status
* onto the stack; otherwise, it skips pushing the status for EXECUTE statements. If the current status does not match any known
* status value (which should not occur under normal circumstances), the method asserts to indicate a programming error.
* It currently works with only 3 possible status:
* - PROCESSING_QUERY
* - PROCESSING_STMT_PREPARE
* - PROCESSING_STMT_EXECUTE
*
* @param allow_execute A boolean value indicating whether to allow the status of EXECUTE statements to be pushed onto the
* `previous_status` stack. If set to true, the method will include EXECUTE statements in the session's status history.
*
* @return void.
* @note This method assumes that the `status` member variable has been properly initialized with one of the predefined
* status values.
* @note This method is primarily used to maintain a history of the session's previous states for later reference or
* recovery purposes.
* @note The LCOV_EXCL_START and LCOV_EXCL_STOP directives are used to exclude the assert statement from code coverage
* analysis because the condition should not occur during normal execution and is included as a safeguard against
* programming errors.
*/
void PgSQL_Session::set_previous_status_mode3(bool allow_execute) {
switch (status) {
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
if (allow_execute == true) {
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
}
default:
// LCOV_EXCL_START
assert(0); // Assert to indicate an unexpected status value
break;
// LCOV_EXCL_STOP
}
}

Loading…
Cancel
Save