From b01792cae9125760397ad37a637fba7a9bb3ebe4 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Tue, 9 Jul 2024 13:05:46 +0500 Subject: [PATCH 1/4] Added multi-statement support --- include/PgSQL_Connection.h | 55 ++++++------- include/PgSQL_Protocol.h | 1 + lib/PgSQL_Connection.cpp | 132 ++++++++++++++++++++++--------- lib/PgSQL_HostGroups_Manager.cpp | 4 +- lib/PgSQL_Protocol.cpp | 1 + lib/PgSQL_Session.cpp | 19 ++--- 6 files changed, 134 insertions(+), 78 deletions(-) diff --git a/include/PgSQL_Connection.h b/include/PgSQL_Connection.h index cf32e7c16..faa61bc90 100644 --- a/include/PgSQL_Connection.h +++ b/include/PgSQL_Connection.h @@ -531,6 +531,7 @@ public: return PQprotocolVersion(pgsql_conn); } + inline bool is_error_present() const { if (error_info.severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_FATAL || error_info.severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_ERROR || @@ -540,30 +541,36 @@ public: return false; } + inline PGSQL_ERROR_SEVERITY get_error_severity() const { return error_info.severity; } + inline PGSQL_ERROR_CATEGORY get_error_category() const { return error_info.category; } + inline const std::string& get_error_message() const { return error_info.message; } - std::string get_error_code_with_message() const { - return ("[" + std::string(error_info.sqlstate) + "] " + error_info.message); - } - - const char* get_error_code_str() const { + inline + const char* get_error_code_str() const { return error_info.sqlstate; } + inline PGSQL_ERROR_CODES get_error_code() const { return error_info.code; } + inline + std::string get_error_code_with_message() const { + return ("[" + std::string(error_info.sqlstate) + "] " + error_info.message); + } + void set_error(const char* code, const char* message, bool is_fatal) { PgSQL_Error_Helper::fill_error_info(error_info, code, message, is_fatal ? "FATAL" : "ERROR"); } @@ -573,8 +580,15 @@ public: PGSQL_ERROR_SEVERITY::ERRSEVERITY_FATAL : PGSQL_ERROR_SEVERITY::ERRSEVERITY_ERROR); } + // safety check. Sometimes libpq return garbage result when connection is lost with the backend + bool is_error_result_valid(const PGresult* result) const { + if (result == nullptr) + return false; + return (PQresultErrorField(result, PG_DIAG_SQLSTATE) != nullptr); + } + void set_error_from_result(const PGresult* result, uint16_t ext_fields = 0) { - if (result) { + if (is_error_result_valid(result)) { PgSQL_Error_Helper::fill_error_info(error_info, result, ext_fields); } else { const char* errmsg = PQerrorMessage(pgsql_conn); @@ -583,37 +597,20 @@ public: } } - void reset_error() { - reset_error_info(error_info, false); - } - - PGresult* get_last_result() const { - return last_result; - } - - void set_last_result(PGresult* result) { - if (last_result) { - PQclear(last_result); - } + void reset_error() { reset_error_info(error_info, false); } - last_result = result; - } - - void reset_last_result() { - if (last_result) { - PQclear(last_result); - last_result = nullptr; - } - } + PGresult* get_result(); + void next_multi_statement_result(PGresult* result); + bool set_single_row_mode(); void optimize() {} //PgSQL_Conn_Param conn_params; PgSQL_ErrorInfo error_info; PGconn* pgsql_conn; - PGresult* last_result; + PGresult* pgsql_result; PgSQL_Query_Result* query_result; PgSQL_Query_Result* query_result_reuse; - bool first_result; + bool new_result; //PgSQL_SrvC* parent; //PgSQL_Connection_userinfo* userinfo; //PgSQL_Data_Stream* myds; diff --git a/include/PgSQL_Protocol.h b/include/PgSQL_Protocol.h index 62b533a8b..8580dea50 100644 --- a/include/PgSQL_Protocol.h +++ b/include/PgSQL_Protocol.h @@ -232,6 +232,7 @@ private: uint8_t result_packet_type; friend class PgSQL_Protocol; + friend class PgSQL_Connection; }; class PgSQL_Protocol : public MySQL_Protocol { diff --git a/lib/PgSQL_Connection.cpp b/lib/PgSQL_Connection.cpp index 05b45bc6e..b6ac5c6aa 100644 --- a/lib/PgSQL_Connection.cpp +++ b/lib/PgSQL_Connection.cpp @@ -2950,19 +2950,23 @@ bool PgSQL_Connection_Placeholder::get_gtid(char *buff, uint64_t *trx_id) { PgSQL_Connection::PgSQL_Connection() { pgsql_conn = NULL; - last_result = NULL; + pgsql_result = NULL; query_result = NULL; query_result_reuse = NULL; - first_result = true; + new_result = true; reset_error(); } PgSQL_Connection::~PgSQL_Connection() { - reset_last_result(); + if (userinfo) { delete userinfo; userinfo = NULL; } + if (pgsql_result) { + PQclear(pgsql_result); + pgsql_result = NULL; + } if (pgsql_conn) { PQfinish(pgsql_conn); pgsql_conn = NULL; @@ -3133,7 +3137,8 @@ handler_again: if (async_exit_status) { next_event(ASYNC_QUERY_CONT); } else { - if (is_error_present()) { + if (is_error_present() || + !set_single_row_mode()) { NEXT_IMMEDIATE(ASYNC_QUERY_END); } NEXT_IMMEDIATE(ASYNC_USE_RESULT_START); @@ -3145,7 +3150,7 @@ handler_again: if (is_error_present()) { NEXT_IMMEDIATE(ASYNC_QUERY_END); } - first_result = true; + new_result = true; if (myds->sess->mirror == false) { if (query_result_reuse == NULL) { query_result = new PgSQL_Query_Result(); @@ -3191,16 +3196,30 @@ handler_again: break; } - const PGresult* result = get_last_result(); + //PGresult* result = get_result(); + std::unique_ptr result(get_result(), PQclear); + if (result) { - auto state = PQresultStatus(result); - switch (state) { + + const ExecStatusType exec_status_type = PQresultStatus(result.get()); + + if (exec_status_type != PGRES_BAD_RESPONSE && + exec_status_type != PGRES_NONFATAL_ERROR && + exec_status_type != PGRES_FATAL_ERROR) { + if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY /*| PGSQL_QUERY_RESULT_ERROR*/))) { + next_multi_statement_result(result.release()); + next_event(ASYNC_USE_RESULT_START); + break; + } + } + + switch (exec_status_type) { case PGRES_COMMAND_OK: - query_result->add_command_completion(result); + query_result->add_command_completion(result.get()); NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); break; case PGRES_EMPTY_QUERY: - query_result->add_empty_query_response(result); + query_result->add_empty_query_response(result.get()); NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); break; case PGRES_TUPLES_OK: @@ -3216,11 +3235,17 @@ handler_again: case PGRES_NONFATAL_ERROR: case PGRES_FATAL_ERROR: default: + // if on previous call we encountered a FATAL error, we will not process the result, as it will contain residual protocol messages + // from the broken connection + if (is_error_present() == true && get_error_severity() == PGSQL_ERROR_SEVERITY::ERRSEVERITY_FATAL) { + NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); + } + // we don't have a command completion, empty query responseor error packet in the result. This check is here to // handle internal cleanup of libpq that might return residual protocol messages from the broken connection and // may add multiple final packets. - if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR)) == 0) { - set_error_from_result(result, PGSQL_ERROR_FIELD_ALL); + //if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR)) == 0) { + set_error_from_result(result.get(), PGSQL_ERROR_FIELD_ALL); assert(is_error_present()); // we will not send FATAL error messages to the client @@ -3229,26 +3254,32 @@ handler_again: severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_WARNING || severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_NOTICE) { - query_result->add_error(result); + query_result->add_error(result.get()); } const PGSQL_ERROR_CATEGORY error_category = get_error_category(); if (error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_SYNTAX_ERROR && error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_STATUS && error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_DATA_ERROR) { - proxy_error("Error: %s\n", get_error_code_with_message().c_str()); + proxy_error("Error: %s, Multi-Statement: %d\n", get_error_code_with_message().c_str(), processing_multi_statement); } - } + //} NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); } - if (first_result == true) { - query_result->add_row_description(result); - first_result = false; + if (new_result == true) { + query_result->add_row_description(result.get()); + new_result = false; } - if (PQntuples(result) > 0) { - unsigned int br = query_result->add_row(result); + /*if (state == PGRES_COMMAND_OK || + state == PGRES_EMPTY_QUERY || + state == PGRES_TUPLES_OK) { + new_result = true; + }*/ + + if (PQntuples(result.get()) > 0) { + unsigned int br = query_result->add_row(result.get()); __sync_fetch_and_add(&parent->bytes_recv, br); myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_recv] += br; myds->bytes_info.bytes_recv += br; @@ -3264,7 +3295,7 @@ handler_again: NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); // we continue looping } } else { - query_result->add_command_completion(result); + query_result->add_command_completion(result.get()); NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); } } @@ -3279,7 +3310,7 @@ handler_again: // finally add ready for query packet query_result->add_ready_status(PQtransactionStatus(pgsql_conn)); - + //processing_multi_statement = false; NEXT_IMMEDIATE(ASYNC_QUERY_END); } break; @@ -3290,7 +3321,8 @@ handler_again: } else { unknown_transaction_status = false; } - reset_last_result(); + // should be NULL + assert(!pgsql_result); break; /* case ASYNC_CHANGE_USER_START: change_user_start(); @@ -4085,6 +4117,7 @@ void PgSQL_Connection::connect_cont(short event) { void PgSQL_Connection::query_start() { PROXY_TRACE(); reset_error(); + processing_multi_statement = false; async_exit_status = PG_EVENT_NONE; if (PQsendQuery(pgsql_conn, query.ptr) == 0) { // WARNING: DO NOT RELEASE this PGresult @@ -4110,20 +4143,17 @@ void PgSQL_Connection::fetch_result_start() { PROXY_TRACE(); reset_error(); async_exit_status = PG_EVENT_NONE; - if (PQsetSingleRowMode(pgsql_conn) == 0) { - // WARNING: DO NOT RELEASE this PGresult - const PGresult* result = PQgetResultFromPGconn(pgsql_conn); - set_error_from_result(result); - proxy_error("Failed to set single row mode. %s\n", get_error_code_with_message().c_str()); - return; - } } void PgSQL_Connection::fetch_result_cont(short event) { PROXY_TRACE(); - reset_last_result(); async_exit_status = PG_EVENT_NONE; + // Avoid fetching a new result if one is already available. + // This situation can happen when a multi-statement query has been executed. + if (pgsql_result) + return; + if (PQconsumeInput(pgsql_conn) == 0) { // WARNING: DO NOT RELEASE this PGresult const PGresult* result = PQgetResultFromPGconn(pgsql_conn); @@ -4135,6 +4165,7 @@ void PgSQL_Connection::fetch_result_cont(short event) { set_error_from_result(result); proxy_error("Failed to consume input. %s\n", get_error_code_with_message().c_str()); } + return; } if (PQisBusy(pgsql_conn)) { @@ -4142,7 +4173,7 @@ void PgSQL_Connection::fetch_result_cont(short event) { return; } - set_last_result(PQgetResult(pgsql_conn)); + pgsql_result = PQgetResult(pgsql_conn); } void PgSQL_Connection::flush() { @@ -4272,7 +4303,10 @@ void PgSQL_Connection::async_free_result() { // query.stmt = NULL; //} } - reset_last_result(); + if (pgsql_result) { + PQclear(pgsql_result); + pgsql_result = NULL; + } compute_unknown_transaction_status(); async_state_machine = ASYNC_IDLE; if (query_result) { @@ -4282,7 +4316,7 @@ void PgSQL_Connection::async_free_result() { query_result_reuse = query_result; query_result = NULL; } - first_result = false; + new_result = false; } int PgSQL_Connection::async_set_autocommit(short event, bool ac) { @@ -4436,7 +4470,7 @@ int PgSQL_Connection::async_query(short event, char* stmt, unsigned long length, return 0; } } - if (async_state_machine == ASYNC_NEXT_RESULT_START) { + if (async_state_machine == ASYNC_USE_RESULT_START) { // if we reached this point it measn we are processing a multi-statement // and we need to exit to give control to MySQL_Session processing_multi_statement = true; @@ -4565,4 +4599,30 @@ bool PgSQL_Connection::is_connection_in_reusable_state() const { const bool conn_usable = !(txn_status == PQTRANS_UNKNOWN || txn_status == PQTRANS_ACTIVE); assert(!(conn_usable == false && is_error_present() == false)); return conn_usable; -} \ No newline at end of file +} + +PGresult* PgSQL_Connection::get_result() { + PGresult* result_tmp = pgsql_result; + pgsql_result = nullptr; + return result_tmp; +} + +bool PgSQL_Connection::set_single_row_mode() { + assert(pgsql_conn); + if (PQsetSingleRowMode(pgsql_conn) == 0) { + // WARNING: DO NOT RELEASE this PGresult + const PGresult* result = PQgetResultFromPGconn(pgsql_conn); + set_error_from_result(result); + proxy_error("Failed to set single row mode. %s\n", get_error_code_with_message().c_str()); + return false; + } + return true; +} + +void PgSQL_Connection::next_multi_statement_result(PGresult* result) { + // set unprocessed result to pgsql_result + pgsql_result = result; + // copy buffer to PSarrayOut + query_result->buffer_to_PSarrayOut(); + async_exit_status = PG_EVENT_READ | PG_EVENT_WRITE; +} diff --git a/lib/PgSQL_HostGroups_Manager.cpp b/lib/PgSQL_HostGroups_Manager.cpp index 3c6bb7564..6bb72640f 100644 --- a/lib/PgSQL_HostGroups_Manager.cpp +++ b/lib/PgSQL_HostGroups_Manager.cpp @@ -627,7 +627,7 @@ static void * HGCU_thread_run() { myconn->reset(); PgHGM->increase_reset_counter(); myconn=(PgSQL_Connection *)conn_array->index(i); - if (myconn->pgsql->net.pvio && myconn->pgsql->net.fd && myconn->pgsql->net.buff) { + if (myconn->pgsql && myconn->pgsql->net.pvio && myconn->pgsql->net.fd && myconn->pgsql->net.buff) { PgSQL_Connection_userinfo *userinfo = myconn->userinfo; char *auth_password = NULL; if (userinfo->password) { @@ -671,7 +671,7 @@ static void * HGCU_thread_run() { usleep(50); for (i=0;i<(int)conn_array->len;i++) { myconn=(PgSQL_Connection *)conn_array->index(i); - if (myconn->pgsql->net.pvio && myconn->pgsql->net.fd && myconn->pgsql->net.buff) { + if (myconn->pgsql && myconn->pgsql->net.pvio && myconn->pgsql->net.fd && myconn->pgsql->net.buff) { statuses[i]=wait_for_pgsql(myconn->pgsql, statuses[i]); if (myconn->pgsql->net.pvio && myconn->pgsql->net.fd && myconn->pgsql->net.buff) { if ((statuses[i] & MYSQL_WAIT_TIMEOUT) == 0) { diff --git a/lib/PgSQL_Protocol.cpp b/lib/PgSQL_Protocol.cpp index c1397a0fe..1b4979b1b 100644 --- a/lib/PgSQL_Protocol.cpp +++ b/lib/PgSQL_Protocol.cpp @@ -1749,6 +1749,7 @@ void PgSQL_Query_Result::init(PgSQL_Protocol* _proto, PgSQL_Data_Stream* _myds, conn = _conn; myds = _myds; buffer_init(); + reset(); if (proto == NULL) { return; // this is a mirror diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index c15cc5a21..670afa41c 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -4751,11 +4751,9 @@ bool PgSQL_Session::handler_minus1_ClientLibraryError(PgSQL_Data_Stream* myds) { if (myconn->query_result && myconn->query_result->is_transfer_started()) { // transfer to frontend has started, we cannot retry } else { - // a hack to check if we have pending results. This should never occur. - if (myconn->get_last_result() != nullptr) { - // transfer to frontend has started, because this is, at least, - // the second resultset coming from the server - // we cannot retry + // This should never occur. + if (myconn->processing_multi_statement == true) { + // we are in the process of retriving results from a multi-statement query proxy_warning("Disabling query retry because we were in middle of processing results\n"); } else { retry_conn = true; @@ -5290,7 +5288,7 @@ handler_again: if (rc == 0) { if (active_transactions != 0) { // run this only if currently we think there is a transaction - if (myconn->pgsql && (myconn->pgsql->server_status & SERVER_STATUS_IN_TRANS) == 0) { // there is no transaction on the backend connection + if (myconn->IsKnownActiveTransaction() == false) { // there is no transaction on the backend connection active_transactions = NumActiveTransactions(); // we check all the hostgroups/backends if (active_transactions == 0) transaction_started_at = 0; // reset it @@ -5303,8 +5301,8 @@ handler_again: // see bug #3549 if (locked_on_hostgroup >= 0) { assert(myconn != NULL); - assert(myconn->pgsql != NULL); - autocommit = myconn->pgsql->server_status & SERVER_STATUS_AUTOCOMMIT; + assert(myconn->pgsql_conn != NULL); + //autocommit = myconn->pgsql->server_status & SERVER_STATUS_AUTOCOMMIT; } if (mirror == false && myconn->pgsql) { @@ -5421,11 +5419,9 @@ handler_again: if (myconn->query_result_reuse) { delete myconn->query_result_reuse; } - //myconn->query_result->reset_pid = false; myconn->query_result_reuse = myconn->query_result; myconn->query_result = NULL; } - NEXT_IMMEDIATE(PROCESSING_QUERY); break; // rc==3 , a multi statement query is still running // start sending to frontend if pgsql_thread___threshold_resultset_size is reached @@ -7744,7 +7740,8 @@ void PgSQL_Session::PgSQL_Result_to_PgSQL_wire(PgSQL_Connection* _conn, PgSQL_Da bool is_tuple = query_result->get_result_packet_type() == (PGSQL_QUERY_RESULT_TUPLE | PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_READY); bool resultset_completed = query_result->get_resultset(client_myds->PSarrayOUT); CurrentQuery.rows_sent = query_result->get_num_rows(); - assert(resultset_completed); // the resultset should always be completed if PgSQL_Result_to_PgSQL_wire is called + if (_conn->processing_multi_statement == false) + assert(resultset_completed); // the resultset should always be completed if PgSQL_Result_to_PgSQL_wire is called if (transfer_started == false) { // we have all the resultset when PgSQL_Result_to_PgSQL_wire was called if (qpo && qpo->cache_ttl > 0 && is_tuple == true) { // the resultset should be cached /*if (mysql_errno(pgsql) == 0 && From 2766627b3ff86c4b31cc47f57eeed8a35d32ac7d Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Tue, 9 Jul 2024 13:23:20 +0500 Subject: [PATCH 2/4] TAP test now checks multi-statement --- test/tap/tests/pgsql-basic_tests-t.cpp | 351 ++++++++++++++++++++++++- 1 file changed, 342 insertions(+), 9 deletions(-) diff --git a/test/tap/tests/pgsql-basic_tests-t.cpp b/test/tap/tests/pgsql-basic_tests-t.cpp index 955fdb5f1..fa94f1044 100644 --- a/test/tap/tests/pgsql-basic_tests-t.cpp +++ b/test/tap/tests/pgsql-basic_tests-t.cpp @@ -25,6 +25,15 @@ res; \ }) +#define PQSENDQUERY(conn,query) ({int send_status = PQsendQuery(conn,query); \ + if (send_status != 1) { \ + fprintf(stderr, "File %s, line %d, status %d, %s\n", \ + __FILE__, __LINE__, status, PQerrorMessage(conn)); \ + } \ + send_status; \ + }) + + CommandLine cl; PGconn* create_new_connection(bool with_ssl) { @@ -35,18 +44,18 @@ PGconn* create_new_connection(bool with_ssl) { if (with_ssl) { ss << " sslmode=require"; - } else { + } else { ss << " sslmode=disable"; - } - - PGconn* conn = PQconnectdb(ss.str().c_str()); + } + + PGconn* conn = PQconnectdb(ss.str().c_str()); const bool res = (conn && PQstatus(conn) == CONNECTION_OK); ok(res, "Connection created successfully. %s", PQerrorMessage(conn)); if (res) return conn; - PQfinish(conn); - return nullptr; + PQfinish(conn); + return nullptr; } // Function to set up the test environment @@ -259,6 +268,324 @@ void test_constraint_violation(PGconn* conn) { PQclear(res); } +void test_multi_statement_transaction(PGconn* conn) { + PGresult* res; + int status; + + // Execute multi-statement transaction + status = PQsendQuery(conn, "BEGIN; " + "INSERT INTO test_table (value) VALUES ('multi statement'); " + "UPDATE test_table SET value = 'multi statement updated' WHERE value = 'multi statement'; " + "COMMIT;"); + ok(status == 1, "Multi-statement transaction sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + + // Check result of BEGIN + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "BEGIN executed successfully"); + PQclear(res); + + // Check result of INSERT + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "INSERT executed successfully"); + PQclear(res); + + // Check result of UPDATE + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "UPDATE executed successfully"); + PQclear(res); + + // Check result of COMMIT + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "COMMIT executed successfully"); + PQclear(res); + + res = PQgetResult(conn); + ok(PQtransactionStatus(conn) == PQTRANS_IDLE, "Connection in Idle state"); + + // Verify the results + status = PQsendQuery(conn, "SELECT value FROM test_table WHERE value = 'multi statement updated'"); + ok(status == 1, "Verification query sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + res = PQgetResult(conn); + if (PQresultStatus(res) == PGRES_TUPLES_OK) { + int nRows = PQntuples(res); + ok(nRows == 1, "Multi-statement transaction committed correctly"); + char* result = PQgetvalue(res, 0, 0); + ok(strcmp(result, "multi statement updated") == 0, "Multi-statement transaction result is correct"); + } else { + ok(0, "Failed to verify multi-statement transaction"); + } + PQclear(res); + PQgetResult(conn); +} + +void test_multi_statement_transaction_with_error(PGconn* conn) { + PGresult* res; + int status; + + // Execute multi-statement transaction with an error + status = PQSENDQUERY(conn, "BEGIN; " + "INSERT INTO test_table (value) VALUES ('multi statement error'); " + "UPDATE test_table SET value = 'multi statement error updated' WHERE value = 'multi statement error'; " + "INSERT INTO test_table (non_existent_column) VALUES ('error'); " + "COMMIT;"); + ok(status == 1, "Multi-statement transaction with error sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + + // Check result of BEGIN + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "BEGIN executed successfully"); + PQclear(res); + + // Check result of INSERT + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "INSERT executed successfully"); + PQclear(res); + + // Check result of UPDATE + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "UPDATE executed successfully"); + PQclear(res); + + // Check result of erroneous INSERT + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_FATAL_ERROR, "Erroneous INSERT failed as expected"); + PQclear(res); + + PQgetResult(conn); + // Ensure the transaction is in error state + ok(PQtransactionStatus(conn) == PQTRANS_INERROR, "Connection in Error Transaction state"); + + // Rollback the transaction + status = PQsendQuery(conn, "ROLLBACK"); + ok(status == 1, "ROLLBACK sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "ROLLBACK executed successfully"); + PQclear(res); + + PQgetResult(conn); + + ok(PQtransactionStatus(conn) == PQTRANS_IDLE, "Connection in Idle state"); + + // Verify the results + status = PQsendQuery(conn, "SELECT value FROM test_table WHERE value = 'multi statement error' OR value = 'multi statement error updated'"); + ok(status == 1, "Verification query sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + res = PQgetResult(conn); + if (PQresultStatus(res) == PGRES_TUPLES_OK) { + int nRows = PQntuples(res); + ok(nRows == 0, "Multi-statement transaction with error rolled back correctly"); + } else { + ok(0, "Failed to verify rollback of multi-statement transaction with error"); + } + PQclear(res); + PQgetResult(conn); +} + +void test_multi_statement_select_insert(PGconn* conn) { + PGresult* res; + int status; + + // Execute multi-statement SELECT and INSERT + status = PQsendQuery(conn, "SELECT value FROM test_table WHERE id = 1; " + "INSERT INTO test_table (value) VALUES ('multi statement select insert');"); + ok(status == 1, "Multi-statement SELECT and INSERT sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + + // Check result of SELECT + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_TUPLES_OK, "SELECT executed successfully"); + PQclear(res); + + // Check result of INSERT + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "INSERT executed successfully"); + PQclear(res); + + PQgetResult(conn); + + // Verify the results + status = PQsendQuery(conn, "SELECT value FROM test_table WHERE value = 'multi statement select insert'"); + ok(status == 1, "Verification query sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + res = PQgetResult(conn); + if (PQresultStatus(res) == PGRES_TUPLES_OK) { + int nRows = PQntuples(res); + ok(nRows == 1, "Multi-statement SELECT and INSERT committed correctly"); + char* result = PQgetvalue(res, 0, 0); + ok(strcmp(result, "multi statement select insert") == 0, "Multi-statement SELECT and INSERT result is correct"); + } else { + ok(0, "Failed to verify multi-statement SELECT and INSERT"); + } + PQclear(res); + PQgetResult(conn); +} + +void test_multi_statement_delete_update(PGconn* conn) { + PGresult* res; + int status; + + // Execute multi-statement DELETE and UPDATE + status = PQsendQuery(conn, "DELETE FROM test_table WHERE value = 'test1'; " + "UPDATE test_table SET value = 'multi statement delete update' WHERE value = 'test4';"); + ok(status == 1, "Multi-statement DELETE and UPDATE sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + + // Check result of DELETE + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "DELETE executed successfully"); + PQclear(res); + + // Check result of UPDATE + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "UPDATE executed successfully"); + PQclear(res); + + PQgetResult(conn); + + // Verify the results + status = PQsendQuery(conn, "SELECT value FROM test_table WHERE value = 'multi statement delete update'"); + ok(status == 1, "Verification query sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + res = PQgetResult(conn); + if (PQresultStatus(res) == PGRES_TUPLES_OK) { + int nRows = PQntuples(res); + ok(nRows == 1, "Multi-statement DELETE and UPDATE committed correctly"); + char* result = PQgetvalue(res, 0, 0); + ok(strcmp(result, "multi statement delete update") == 0, "Multi-statement DELETE and UPDATE result is correct"); + } else { + ok(0, "Failed to verify multi-statement DELETE and UPDATE"); + } + PQclear(res); + PQgetResult(conn); +} + +void test_multi_statement_with_error(PGconn* conn) { + PGresult* res; + int status; + + // Execute multi-statement with an error + status = PQsendQuery(conn, "INSERT INTO test_table (value) VALUES ('multi statement error'); " + "UPDATE test_table SET value = 'multi statement error updated' WHERE value = 'multi statement error'; " + "INSERT INTO test_table (non_existent_column) VALUES ('error');"); + ok(status == 1, "Multi-statement with error sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + + // Check result of INSERT + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "INSERT executed successfully"); + PQclear(res); + + // Check result of UPDATE + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "UPDATE executed successfully"); + PQclear(res); + + // Check result of erroneous INSERT + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_FATAL_ERROR, "Erroneous INSERT failed as expected"); + PQclear(res); + + PQgetResult(conn); + // Verify the results + status = PQsendQuery(conn, "SELECT value FROM test_table WHERE value = 'multi statement error' OR value = 'multi statement error updated'"); + ok(status == 1, "Verification query sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + res = PQgetResult(conn); + if (PQresultStatus(res) == PGRES_TUPLES_OK) { + int nRows = PQntuples(res); + ok(nRows == 0, "No rows are inserted or updated"); + } else { + ok(0, "Failed to verify rows from multi-statement with error"); + } + PQclear(res); + PQgetResult(conn); +} + +void test_multi_statement_insert_select_select(PGconn* conn) { + PGresult* res; + int status; + + // Execute multi-statement INSERT, SELECT, and SELECT + status = PQsendQuery(conn, "INSERT INTO test_table (value) VALUES ('multi statement select1'), ('multi statement select2'); " + "SELECT value FROM test_table WHERE value = 'multi statement select1'; " + "SELECT value FROM test_table WHERE value = 'multi statement select2';"); + ok(status == 1, "Multi-statement INSERT and SELECTs sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + + // Check result of the INSERT + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "INSERT executed successfully"); + PQclear(res); + + // Check result of the first SELECT + res = PQgetResult(conn); + if (PQresultStatus(res) == PGRES_TUPLES_OK) { + int nRows = PQntuples(res); + ok(nRows == 1, "First SELECT executed successfully"); + if (nRows > 0) { + char* result = PQgetvalue(res, 0, 0); + ok(strcmp(result, "multi statement select1") == 0, "First SELECT result is correct"); + } + } else { + ok(0, "First SELECT failed"); + } + PQclear(res); + + // Check result of the second SELECT + res = PQgetResult(conn); + if (PQresultStatus(res) == PGRES_TUPLES_OK) { + int nRows = PQntuples(res); + ok(nRows == 1, "Second SELECT executed successfully"); + if (nRows > 0) { + char* result = PQgetvalue(res, 0, 0); + ok(strcmp(result, "multi statement select2") == 0, "Second SELECT result is correct"); + } + } else { + ok(0, "Second SELECT failed"); + } + PQclear(res); + PQgetResult(conn); +} + void teardown_database(PGconn* conn) { PGresult* res; @@ -271,7 +598,7 @@ void test_invalid_connection(bool with_ssl) { std::stringstream ss; ss << "host=invalid_host port=invalid_port dbname=invalid_db user=invalid_user password=invalid_password"; - + if (with_ssl) { ss << " sslmode=require"; } else { @@ -300,6 +627,12 @@ void execute_tests(bool with_ssl) { test_transaction_error(conn); test_null_value(conn); test_constraint_violation(conn); + test_multi_statement_transaction(conn); + test_multi_statement_transaction_with_error(conn); + test_multi_statement_select_insert(conn); + test_multi_statement_delete_update(conn); + test_multi_statement_with_error(conn); + test_multi_statement_insert_select_select(conn); teardown_database(conn); test_invalid_connection(with_ssl); @@ -307,8 +640,8 @@ void execute_tests(bool with_ssl) { } int main(int argc, char** argv) { - - plan(88); // Total number of tests planned + + plan(176); // Total number of tests planned if (cl.getEnv()) return exit_status(); From 7e66bfa5809fbd86ea06016da2fae2ca3e945ea7 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Tue, 9 Jul 2024 16:17:56 +0500 Subject: [PATCH 3/4] Fixed crash --- lib/Base_Session.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/Base_Session.cpp b/lib/Base_Session.cpp index 7bd3ebe98..7ed8081ff 100644 --- a/lib/Base_Session.cpp +++ b/lib/Base_Session.cpp @@ -65,6 +65,11 @@ void Base_Session::init() { if constexpr (std::is_same_v) { sess_STMTs_meta = new MySQL_STMTs_meta(); SLDH = new StmtLongDataHandler(); + } else if constexpr (std::is_same_v) { + sess_STMTs_meta = NULL; + SLDH = NULL; + } else { + assert(0); } }; From bd842d4f76f3d59420c38f1f963239b896b453a4 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Tue, 9 Jul 2024 16:32:33 +0500 Subject: [PATCH 4/4] Ensure writeout is called only after all multi-statement results are fetched and processed --- lib/PgSQL_Connection.cpp | 13 ++++--------- lib/PgSQL_Session.cpp | 1 + 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/lib/PgSQL_Connection.cpp b/lib/PgSQL_Connection.cpp index b6ac5c6aa..311d7a3ff 100644 --- a/lib/PgSQL_Connection.cpp +++ b/lib/PgSQL_Connection.cpp @@ -3203,14 +3203,10 @@ handler_again: const ExecStatusType exec_status_type = PQresultStatus(result.get()); - if (exec_status_type != PGRES_BAD_RESPONSE && - exec_status_type != PGRES_NONFATAL_ERROR && - exec_status_type != PGRES_FATAL_ERROR) { - if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY /*| PGSQL_QUERY_RESULT_ERROR*/))) { - next_multi_statement_result(result.release()); - next_event(ASYNC_USE_RESULT_START); - break; - } + if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR))) { + next_multi_statement_result(result.release()); + next_event(ASYNC_USE_RESULT_START); + break; } switch (exec_status_type) { @@ -4624,5 +4620,4 @@ void PgSQL_Connection::next_multi_statement_result(PGresult* result) { pgsql_result = result; // copy buffer to PSarrayOut query_result->buffer_to_PSarrayOut(); - async_exit_status = PG_EVENT_READ | PG_EVENT_WRITE; } diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index a98ad2815..b2560f686 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -4795,6 +4795,7 @@ handler_again: myconn->query_result_reuse = myconn->query_result; myconn->query_result = NULL; } + NEXT_IMMEDIATE(PROCESSING_QUERY); break; // rc==3 , a multi statement query is still running // start sending to frontend if pgsql_thread___threshold_resultset_size is reached