diff --git a/include/PgSQL_Connection.h b/include/PgSQL_Connection.h index cf32e7c16..faa61bc90 100644 --- a/include/PgSQL_Connection.h +++ b/include/PgSQL_Connection.h @@ -531,6 +531,7 @@ public: return PQprotocolVersion(pgsql_conn); } + inline bool is_error_present() const { if (error_info.severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_FATAL || error_info.severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_ERROR || @@ -540,30 +541,36 @@ public: return false; } + inline PGSQL_ERROR_SEVERITY get_error_severity() const { return error_info.severity; } + inline PGSQL_ERROR_CATEGORY get_error_category() const { return error_info.category; } + inline const std::string& get_error_message() const { return error_info.message; } - std::string get_error_code_with_message() const { - return ("[" + std::string(error_info.sqlstate) + "] " + error_info.message); - } - - const char* get_error_code_str() const { + inline + const char* get_error_code_str() const { return error_info.sqlstate; } + inline PGSQL_ERROR_CODES get_error_code() const { return error_info.code; } + inline + std::string get_error_code_with_message() const { + return ("[" + std::string(error_info.sqlstate) + "] " + error_info.message); + } + void set_error(const char* code, const char* message, bool is_fatal) { PgSQL_Error_Helper::fill_error_info(error_info, code, message, is_fatal ? "FATAL" : "ERROR"); } @@ -573,8 +580,15 @@ public: PGSQL_ERROR_SEVERITY::ERRSEVERITY_FATAL : PGSQL_ERROR_SEVERITY::ERRSEVERITY_ERROR); } + // safety check. Sometimes libpq return garbage result when connection is lost with the backend + bool is_error_result_valid(const PGresult* result) const { + if (result == nullptr) + return false; + return (PQresultErrorField(result, PG_DIAG_SQLSTATE) != nullptr); + } + void set_error_from_result(const PGresult* result, uint16_t ext_fields = 0) { - if (result) { + if (is_error_result_valid(result)) { PgSQL_Error_Helper::fill_error_info(error_info, result, ext_fields); } else { const char* errmsg = PQerrorMessage(pgsql_conn); @@ -583,37 +597,20 @@ public: } } - void reset_error() { - reset_error_info(error_info, false); - } - - PGresult* get_last_result() const { - return last_result; - } - - void set_last_result(PGresult* result) { - if (last_result) { - PQclear(last_result); - } + void reset_error() { reset_error_info(error_info, false); } - last_result = result; - } - - void reset_last_result() { - if (last_result) { - PQclear(last_result); - last_result = nullptr; - } - } + PGresult* get_result(); + void next_multi_statement_result(PGresult* result); + bool set_single_row_mode(); void optimize() {} //PgSQL_Conn_Param conn_params; PgSQL_ErrorInfo error_info; PGconn* pgsql_conn; - PGresult* last_result; + PGresult* pgsql_result; PgSQL_Query_Result* query_result; PgSQL_Query_Result* query_result_reuse; - bool first_result; + bool new_result; //PgSQL_SrvC* parent; //PgSQL_Connection_userinfo* userinfo; //PgSQL_Data_Stream* myds; diff --git a/include/PgSQL_Protocol.h b/include/PgSQL_Protocol.h index 62b533a8b..8580dea50 100644 --- a/include/PgSQL_Protocol.h +++ b/include/PgSQL_Protocol.h @@ -232,6 +232,7 @@ private: uint8_t result_packet_type; friend class PgSQL_Protocol; + friend class PgSQL_Connection; }; class PgSQL_Protocol : public MySQL_Protocol { diff --git a/lib/PgSQL_Connection.cpp b/lib/PgSQL_Connection.cpp index 0d574c851..f83b6595c 100644 --- a/lib/PgSQL_Connection.cpp +++ b/lib/PgSQL_Connection.cpp @@ -2951,19 +2951,23 @@ bool PgSQL_Connection_Placeholder::get_gtid(char *buff, uint64_t *trx_id) { PgSQL_Connection::PgSQL_Connection() { pgsql_conn = NULL; - last_result = NULL; + pgsql_result = NULL; query_result = NULL; query_result_reuse = NULL; - first_result = true; + new_result = true; reset_error(); } PgSQL_Connection::~PgSQL_Connection() { - reset_last_result(); + if (userinfo) { delete userinfo; userinfo = NULL; } + if (pgsql_result) { + PQclear(pgsql_result); + pgsql_result = NULL; + } if (pgsql_conn) { PQfinish(pgsql_conn); pgsql_conn = NULL; @@ -3137,7 +3141,8 @@ handler_again: if (async_exit_status) { next_event(ASYNC_QUERY_CONT); } else { - if (is_error_present()) { + if (is_error_present() || + !set_single_row_mode()) { NEXT_IMMEDIATE(ASYNC_QUERY_END); } NEXT_IMMEDIATE(ASYNC_USE_RESULT_START); @@ -3149,7 +3154,7 @@ handler_again: if (is_error_present()) { NEXT_IMMEDIATE(ASYNC_QUERY_END); } - first_result = true; + new_result = true; if (myds->sess->mirror == false) { if (query_result_reuse == NULL) { query_result = new PgSQL_Query_Result(); @@ -3195,16 +3200,26 @@ handler_again: break; } - const PGresult* result = get_last_result(); + //PGresult* result = get_result(); + std::unique_ptr result(get_result(), PQclear); + if (result) { - auto state = PQresultStatus(result); - switch (state) { + + const ExecStatusType exec_status_type = PQresultStatus(result.get()); + + if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR))) { + next_multi_statement_result(result.release()); + next_event(ASYNC_USE_RESULT_START); + break; + } + + switch (exec_status_type) { case PGRES_COMMAND_OK: - query_result->add_command_completion(result); + query_result->add_command_completion(result.get()); NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); break; case PGRES_EMPTY_QUERY: - query_result->add_empty_query_response(result); + query_result->add_empty_query_response(result.get()); NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); break; case PGRES_TUPLES_OK: @@ -3220,11 +3235,17 @@ handler_again: case PGRES_NONFATAL_ERROR: case PGRES_FATAL_ERROR: default: + // if on previous call we encountered a FATAL error, we will not process the result, as it will contain residual protocol messages + // from the broken connection + if (is_error_present() == true && get_error_severity() == PGSQL_ERROR_SEVERITY::ERRSEVERITY_FATAL) { + NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); + } + // we don't have a command completion, empty query responseor error packet in the result. This check is here to // handle internal cleanup of libpq that might return residual protocol messages from the broken connection and // may add multiple final packets. - if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR)) == 0) { - set_error_from_result(result, PGSQL_ERROR_FIELD_ALL); + //if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR)) == 0) { + set_error_from_result(result.get(), PGSQL_ERROR_FIELD_ALL); assert(is_error_present()); // we will not send FATAL error messages to the client @@ -3233,26 +3254,32 @@ handler_again: severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_WARNING || severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_NOTICE) { - query_result->add_error(result); + query_result->add_error(result.get()); } const PGSQL_ERROR_CATEGORY error_category = get_error_category(); if (error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_SYNTAX_ERROR && error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_STATUS && error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_DATA_ERROR) { - proxy_error("Error: %s\n", get_error_code_with_message().c_str()); + proxy_error("Error: %s, Multi-Statement: %d\n", get_error_code_with_message().c_str(), processing_multi_statement); } - } + //} NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); } - if (first_result == true) { - query_result->add_row_description(result); - first_result = false; + if (new_result == true) { + query_result->add_row_description(result.get()); + new_result = false; } - if (PQntuples(result) > 0) { - unsigned int br = query_result->add_row(result); + /*if (state == PGRES_COMMAND_OK || + state == PGRES_EMPTY_QUERY || + state == PGRES_TUPLES_OK) { + new_result = true; + }*/ + + if (PQntuples(result.get()) > 0) { + unsigned int br = query_result->add_row(result.get()); __sync_fetch_and_add(&parent->bytes_recv, br); myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_recv] += br; myds->bytes_info.bytes_recv += br; @@ -3268,7 +3295,7 @@ handler_again: NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); // we continue looping } } else { - query_result->add_command_completion(result); + query_result->add_command_completion(result.get()); NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); } } @@ -3283,7 +3310,7 @@ handler_again: // finally add ready for query packet query_result->add_ready_status(PQtransactionStatus(pgsql_conn)); - + //processing_multi_statement = false; NEXT_IMMEDIATE(ASYNC_QUERY_END); } break; @@ -3294,7 +3321,8 @@ handler_again: } else { unknown_transaction_status = false; } - reset_last_result(); + // should be NULL + assert(!pgsql_result); break; /* case ASYNC_CHANGE_USER_START: change_user_start(); @@ -4089,6 +4117,7 @@ void PgSQL_Connection::connect_cont(short event) { void PgSQL_Connection::query_start() { PROXY_TRACE(); reset_error(); + processing_multi_statement = false; async_exit_status = PG_EVENT_NONE; if (PQsendQuery(pgsql_conn, query.ptr) == 0) { // WARNING: DO NOT RELEASE this PGresult @@ -4114,20 +4143,17 @@ void PgSQL_Connection::fetch_result_start() { PROXY_TRACE(); reset_error(); async_exit_status = PG_EVENT_NONE; - if (PQsetSingleRowMode(pgsql_conn) == 0) { - // WARNING: DO NOT RELEASE this PGresult - const PGresult* result = PQgetResultFromPGconn(pgsql_conn); - set_error_from_result(result); - proxy_error("Failed to set single row mode. %s\n", get_error_code_with_message().c_str()); - return; - } } void PgSQL_Connection::fetch_result_cont(short event) { PROXY_TRACE(); - reset_last_result(); async_exit_status = PG_EVENT_NONE; + // Avoid fetching a new result if one is already available. + // This situation can happen when a multi-statement query has been executed. + if (pgsql_result) + return; + if (PQconsumeInput(pgsql_conn) == 0) { // WARNING: DO NOT RELEASE this PGresult const PGresult* result = PQgetResultFromPGconn(pgsql_conn); @@ -4139,6 +4165,7 @@ void PgSQL_Connection::fetch_result_cont(short event) { set_error_from_result(result); proxy_error("Failed to consume input. %s\n", get_error_code_with_message().c_str()); } + return; } if (PQisBusy(pgsql_conn)) { @@ -4146,7 +4173,7 @@ void PgSQL_Connection::fetch_result_cont(short event) { return; } - set_last_result(PQgetResult(pgsql_conn)); + pgsql_result = PQgetResult(pgsql_conn); } void PgSQL_Connection::flush() { @@ -4276,7 +4303,10 @@ void PgSQL_Connection::async_free_result() { // query.stmt = NULL; //} } - reset_last_result(); + if (pgsql_result) { + PQclear(pgsql_result); + pgsql_result = NULL; + } compute_unknown_transaction_status(); async_state_machine = ASYNC_IDLE; if (query_result) { @@ -4286,7 +4316,7 @@ void PgSQL_Connection::async_free_result() { query_result_reuse = query_result; query_result = NULL; } - first_result = false; + new_result = false; } int PgSQL_Connection::async_set_autocommit(short event, bool ac) { @@ -4440,7 +4470,7 @@ int PgSQL_Connection::async_query(short event, char* stmt, unsigned long length, return 0; } } - if (async_state_machine == ASYNC_NEXT_RESULT_START) { + if (async_state_machine == ASYNC_USE_RESULT_START) { // if we reached this point it measn we are processing a multi-statement // and we need to exit to give control to MySQL_Session processing_multi_statement = true; @@ -4570,3 +4600,28 @@ bool PgSQL_Connection::is_connection_in_reusable_state() const { assert(!(conn_usable == false && is_error_present() == false)); return conn_usable; } + +PGresult* PgSQL_Connection::get_result() { + PGresult* result_tmp = pgsql_result; + pgsql_result = nullptr; + return result_tmp; +} + +bool PgSQL_Connection::set_single_row_mode() { + assert(pgsql_conn); + if (PQsetSingleRowMode(pgsql_conn) == 0) { + // WARNING: DO NOT RELEASE this PGresult + const PGresult* result = PQgetResultFromPGconn(pgsql_conn); + set_error_from_result(result); + proxy_error("Failed to set single row mode. %s\n", get_error_code_with_message().c_str()); + return false; + } + return true; +} + +void PgSQL_Connection::next_multi_statement_result(PGresult* result) { + // set unprocessed result to pgsql_result + pgsql_result = result; + // copy buffer to PSarrayOut + query_result->buffer_to_PSarrayOut(); +} diff --git a/lib/PgSQL_HostGroups_Manager.cpp b/lib/PgSQL_HostGroups_Manager.cpp index 4d0e4cce8..34dd04a48 100644 --- a/lib/PgSQL_HostGroups_Manager.cpp +++ b/lib/PgSQL_HostGroups_Manager.cpp @@ -165,7 +165,7 @@ static void * HGCU_thread_run() { myconn->reset(); PgHGM->increase_reset_counter(); myconn=(PgSQL_Connection *)conn_array->index(i); - if (myconn->pgsql->net.pvio && myconn->pgsql->net.fd && myconn->pgsql->net.buff) { + if (myconn->pgsql && myconn->pgsql->net.pvio && myconn->pgsql->net.fd && myconn->pgsql->net.buff) { PgSQL_Connection_userinfo *userinfo = myconn->userinfo; char *auth_password = NULL; if (userinfo->password) { @@ -209,7 +209,7 @@ static void * HGCU_thread_run() { usleep(50); for (i=0;i<(int)conn_array->len;i++) { myconn=(PgSQL_Connection *)conn_array->index(i); - if (myconn->pgsql->net.pvio && myconn->pgsql->net.fd && myconn->pgsql->net.buff) { + if (myconn->pgsql && myconn->pgsql->net.pvio && myconn->pgsql->net.fd && myconn->pgsql->net.buff) { statuses[i]=wait_for_pgsql(myconn->pgsql, statuses[i]); if (myconn->pgsql->net.pvio && myconn->pgsql->net.fd && myconn->pgsql->net.buff) { if ((statuses[i] & MYSQL_WAIT_TIMEOUT) == 0) { diff --git a/lib/PgSQL_Protocol.cpp b/lib/PgSQL_Protocol.cpp index c1397a0fe..1b4979b1b 100644 --- a/lib/PgSQL_Protocol.cpp +++ b/lib/PgSQL_Protocol.cpp @@ -1749,6 +1749,7 @@ void PgSQL_Query_Result::init(PgSQL_Protocol* _proto, PgSQL_Data_Stream* _myds, conn = _conn; myds = _myds; buffer_init(); + reset(); if (proto == NULL) { return; // this is a mirror diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index be9287df7..25bbab869 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -4151,11 +4151,9 @@ bool PgSQL_Session::handler_minus1_ClientLibraryError(PgSQL_Data_Stream* myds) { if (myconn->query_result && myconn->query_result->is_transfer_started()) { // transfer to frontend has started, we cannot retry } else { - // a hack to check if we have pending results. This should never occur. - if (myconn->get_last_result() != nullptr) { - // transfer to frontend has started, because this is, at least, - // the second resultset coming from the server - // we cannot retry + // This should never occur. + if (myconn->processing_multi_statement == true) { + // we are in the process of retriving results from a multi-statement query proxy_warning("Disabling query retry because we were in middle of processing results\n"); } else { retry_conn = true; @@ -4642,7 +4640,7 @@ handler_again: if (rc == 0) { if (active_transactions != 0) { // run this only if currently we think there is a transaction - if (myconn->pgsql && (myconn->pgsql->server_status & SERVER_STATUS_IN_TRANS) == 0) { // there is no transaction on the backend connection + if (myconn->IsKnownActiveTransaction() == false) { // there is no transaction on the backend connection active_transactions = NumActiveTransactions(); // we check all the hostgroups/backends if (active_transactions == 0) transaction_started_at = 0; // reset it @@ -4655,8 +4653,8 @@ handler_again: // see bug #3549 if (locked_on_hostgroup >= 0) { assert(myconn != NULL); - assert(myconn->pgsql != NULL); - autocommit = myconn->pgsql->server_status & SERVER_STATUS_AUTOCOMMIT; + assert(myconn->pgsql_conn != NULL); + //autocommit = myconn->pgsql->server_status & SERVER_STATUS_AUTOCOMMIT; } if (mirror == false && myconn->pgsql) { @@ -4773,7 +4771,6 @@ handler_again: if (myconn->query_result_reuse) { delete myconn->query_result_reuse; } - //myconn->query_result->reset_pid = false; myconn->query_result_reuse = myconn->query_result; myconn->query_result = NULL; } @@ -7009,7 +7006,8 @@ void PgSQL_Session::PgSQL_Result_to_PgSQL_wire(PgSQL_Connection* _conn, PgSQL_Da bool is_tuple = query_result->get_result_packet_type() == (PGSQL_QUERY_RESULT_TUPLE | PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_READY); bool resultset_completed = query_result->get_resultset(client_myds->PSarrayOUT); CurrentQuery.rows_sent = query_result->get_num_rows(); - assert(resultset_completed); // the resultset should always be completed if PgSQL_Result_to_PgSQL_wire is called + if (_conn->processing_multi_statement == false) + assert(resultset_completed); // the resultset should always be completed if PgSQL_Result_to_PgSQL_wire is called if (transfer_started == false) { // we have all the resultset when PgSQL_Result_to_PgSQL_wire was called if (qpo && qpo->cache_ttl > 0 && is_tuple == true) { // the resultset should be cached /*if (mysql_errno(pgsql) == 0 && diff --git a/test/tap/tests/pgsql-basic_tests-t.cpp b/test/tap/tests/pgsql-basic_tests-t.cpp index 955fdb5f1..fa94f1044 100644 --- a/test/tap/tests/pgsql-basic_tests-t.cpp +++ b/test/tap/tests/pgsql-basic_tests-t.cpp @@ -25,6 +25,15 @@ res; \ }) +#define PQSENDQUERY(conn,query) ({int send_status = PQsendQuery(conn,query); \ + if (send_status != 1) { \ + fprintf(stderr, "File %s, line %d, status %d, %s\n", \ + __FILE__, __LINE__, status, PQerrorMessage(conn)); \ + } \ + send_status; \ + }) + + CommandLine cl; PGconn* create_new_connection(bool with_ssl) { @@ -35,18 +44,18 @@ PGconn* create_new_connection(bool with_ssl) { if (with_ssl) { ss << " sslmode=require"; - } else { + } else { ss << " sslmode=disable"; - } - - PGconn* conn = PQconnectdb(ss.str().c_str()); + } + + PGconn* conn = PQconnectdb(ss.str().c_str()); const bool res = (conn && PQstatus(conn) == CONNECTION_OK); ok(res, "Connection created successfully. %s", PQerrorMessage(conn)); if (res) return conn; - PQfinish(conn); - return nullptr; + PQfinish(conn); + return nullptr; } // Function to set up the test environment @@ -259,6 +268,324 @@ void test_constraint_violation(PGconn* conn) { PQclear(res); } +void test_multi_statement_transaction(PGconn* conn) { + PGresult* res; + int status; + + // Execute multi-statement transaction + status = PQsendQuery(conn, "BEGIN; " + "INSERT INTO test_table (value) VALUES ('multi statement'); " + "UPDATE test_table SET value = 'multi statement updated' WHERE value = 'multi statement'; " + "COMMIT;"); + ok(status == 1, "Multi-statement transaction sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + + // Check result of BEGIN + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "BEGIN executed successfully"); + PQclear(res); + + // Check result of INSERT + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "INSERT executed successfully"); + PQclear(res); + + // Check result of UPDATE + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "UPDATE executed successfully"); + PQclear(res); + + // Check result of COMMIT + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "COMMIT executed successfully"); + PQclear(res); + + res = PQgetResult(conn); + ok(PQtransactionStatus(conn) == PQTRANS_IDLE, "Connection in Idle state"); + + // Verify the results + status = PQsendQuery(conn, "SELECT value FROM test_table WHERE value = 'multi statement updated'"); + ok(status == 1, "Verification query sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + res = PQgetResult(conn); + if (PQresultStatus(res) == PGRES_TUPLES_OK) { + int nRows = PQntuples(res); + ok(nRows == 1, "Multi-statement transaction committed correctly"); + char* result = PQgetvalue(res, 0, 0); + ok(strcmp(result, "multi statement updated") == 0, "Multi-statement transaction result is correct"); + } else { + ok(0, "Failed to verify multi-statement transaction"); + } + PQclear(res); + PQgetResult(conn); +} + +void test_multi_statement_transaction_with_error(PGconn* conn) { + PGresult* res; + int status; + + // Execute multi-statement transaction with an error + status = PQSENDQUERY(conn, "BEGIN; " + "INSERT INTO test_table (value) VALUES ('multi statement error'); " + "UPDATE test_table SET value = 'multi statement error updated' WHERE value = 'multi statement error'; " + "INSERT INTO test_table (non_existent_column) VALUES ('error'); " + "COMMIT;"); + ok(status == 1, "Multi-statement transaction with error sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + + // Check result of BEGIN + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "BEGIN executed successfully"); + PQclear(res); + + // Check result of INSERT + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "INSERT executed successfully"); + PQclear(res); + + // Check result of UPDATE + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "UPDATE executed successfully"); + PQclear(res); + + // Check result of erroneous INSERT + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_FATAL_ERROR, "Erroneous INSERT failed as expected"); + PQclear(res); + + PQgetResult(conn); + // Ensure the transaction is in error state + ok(PQtransactionStatus(conn) == PQTRANS_INERROR, "Connection in Error Transaction state"); + + // Rollback the transaction + status = PQsendQuery(conn, "ROLLBACK"); + ok(status == 1, "ROLLBACK sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "ROLLBACK executed successfully"); + PQclear(res); + + PQgetResult(conn); + + ok(PQtransactionStatus(conn) == PQTRANS_IDLE, "Connection in Idle state"); + + // Verify the results + status = PQsendQuery(conn, "SELECT value FROM test_table WHERE value = 'multi statement error' OR value = 'multi statement error updated'"); + ok(status == 1, "Verification query sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + res = PQgetResult(conn); + if (PQresultStatus(res) == PGRES_TUPLES_OK) { + int nRows = PQntuples(res); + ok(nRows == 0, "Multi-statement transaction with error rolled back correctly"); + } else { + ok(0, "Failed to verify rollback of multi-statement transaction with error"); + } + PQclear(res); + PQgetResult(conn); +} + +void test_multi_statement_select_insert(PGconn* conn) { + PGresult* res; + int status; + + // Execute multi-statement SELECT and INSERT + status = PQsendQuery(conn, "SELECT value FROM test_table WHERE id = 1; " + "INSERT INTO test_table (value) VALUES ('multi statement select insert');"); + ok(status == 1, "Multi-statement SELECT and INSERT sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + + // Check result of SELECT + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_TUPLES_OK, "SELECT executed successfully"); + PQclear(res); + + // Check result of INSERT + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "INSERT executed successfully"); + PQclear(res); + + PQgetResult(conn); + + // Verify the results + status = PQsendQuery(conn, "SELECT value FROM test_table WHERE value = 'multi statement select insert'"); + ok(status == 1, "Verification query sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + res = PQgetResult(conn); + if (PQresultStatus(res) == PGRES_TUPLES_OK) { + int nRows = PQntuples(res); + ok(nRows == 1, "Multi-statement SELECT and INSERT committed correctly"); + char* result = PQgetvalue(res, 0, 0); + ok(strcmp(result, "multi statement select insert") == 0, "Multi-statement SELECT and INSERT result is correct"); + } else { + ok(0, "Failed to verify multi-statement SELECT and INSERT"); + } + PQclear(res); + PQgetResult(conn); +} + +void test_multi_statement_delete_update(PGconn* conn) { + PGresult* res; + int status; + + // Execute multi-statement DELETE and UPDATE + status = PQsendQuery(conn, "DELETE FROM test_table WHERE value = 'test1'; " + "UPDATE test_table SET value = 'multi statement delete update' WHERE value = 'test4';"); + ok(status == 1, "Multi-statement DELETE and UPDATE sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + + // Check result of DELETE + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "DELETE executed successfully"); + PQclear(res); + + // Check result of UPDATE + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "UPDATE executed successfully"); + PQclear(res); + + PQgetResult(conn); + + // Verify the results + status = PQsendQuery(conn, "SELECT value FROM test_table WHERE value = 'multi statement delete update'"); + ok(status == 1, "Verification query sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + res = PQgetResult(conn); + if (PQresultStatus(res) == PGRES_TUPLES_OK) { + int nRows = PQntuples(res); + ok(nRows == 1, "Multi-statement DELETE and UPDATE committed correctly"); + char* result = PQgetvalue(res, 0, 0); + ok(strcmp(result, "multi statement delete update") == 0, "Multi-statement DELETE and UPDATE result is correct"); + } else { + ok(0, "Failed to verify multi-statement DELETE and UPDATE"); + } + PQclear(res); + PQgetResult(conn); +} + +void test_multi_statement_with_error(PGconn* conn) { + PGresult* res; + int status; + + // Execute multi-statement with an error + status = PQsendQuery(conn, "INSERT INTO test_table (value) VALUES ('multi statement error'); " + "UPDATE test_table SET value = 'multi statement error updated' WHERE value = 'multi statement error'; " + "INSERT INTO test_table (non_existent_column) VALUES ('error');"); + ok(status == 1, "Multi-statement with error sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + + // Check result of INSERT + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "INSERT executed successfully"); + PQclear(res); + + // Check result of UPDATE + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "UPDATE executed successfully"); + PQclear(res); + + // Check result of erroneous INSERT + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_FATAL_ERROR, "Erroneous INSERT failed as expected"); + PQclear(res); + + PQgetResult(conn); + // Verify the results + status = PQsendQuery(conn, "SELECT value FROM test_table WHERE value = 'multi statement error' OR value = 'multi statement error updated'"); + ok(status == 1, "Verification query sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + res = PQgetResult(conn); + if (PQresultStatus(res) == PGRES_TUPLES_OK) { + int nRows = PQntuples(res); + ok(nRows == 0, "No rows are inserted or updated"); + } else { + ok(0, "Failed to verify rows from multi-statement with error"); + } + PQclear(res); + PQgetResult(conn); +} + +void test_multi_statement_insert_select_select(PGconn* conn) { + PGresult* res; + int status; + + // Execute multi-statement INSERT, SELECT, and SELECT + status = PQsendQuery(conn, "INSERT INTO test_table (value) VALUES ('multi statement select1'), ('multi statement select2'); " + "SELECT value FROM test_table WHERE value = 'multi statement select1'; " + "SELECT value FROM test_table WHERE value = 'multi statement select2';"); + ok(status == 1, "Multi-statement INSERT and SELECTs sent"); + PQconsumeInput(conn); + while (PQisBusy(conn)) { + PQconsumeInput(conn); + } + + // Check result of the INSERT + res = PQgetResult(conn); + ok(PQresultStatus(res) == PGRES_COMMAND_OK, "INSERT executed successfully"); + PQclear(res); + + // Check result of the first SELECT + res = PQgetResult(conn); + if (PQresultStatus(res) == PGRES_TUPLES_OK) { + int nRows = PQntuples(res); + ok(nRows == 1, "First SELECT executed successfully"); + if (nRows > 0) { + char* result = PQgetvalue(res, 0, 0); + ok(strcmp(result, "multi statement select1") == 0, "First SELECT result is correct"); + } + } else { + ok(0, "First SELECT failed"); + } + PQclear(res); + + // Check result of the second SELECT + res = PQgetResult(conn); + if (PQresultStatus(res) == PGRES_TUPLES_OK) { + int nRows = PQntuples(res); + ok(nRows == 1, "Second SELECT executed successfully"); + if (nRows > 0) { + char* result = PQgetvalue(res, 0, 0); + ok(strcmp(result, "multi statement select2") == 0, "Second SELECT result is correct"); + } + } else { + ok(0, "Second SELECT failed"); + } + PQclear(res); + PQgetResult(conn); +} + void teardown_database(PGconn* conn) { PGresult* res; @@ -271,7 +598,7 @@ void test_invalid_connection(bool with_ssl) { std::stringstream ss; ss << "host=invalid_host port=invalid_port dbname=invalid_db user=invalid_user password=invalid_password"; - + if (with_ssl) { ss << " sslmode=require"; } else { @@ -300,6 +627,12 @@ void execute_tests(bool with_ssl) { test_transaction_error(conn); test_null_value(conn); test_constraint_violation(conn); + test_multi_statement_transaction(conn); + test_multi_statement_transaction_with_error(conn); + test_multi_statement_select_insert(conn); + test_multi_statement_delete_update(conn); + test_multi_statement_with_error(conn); + test_multi_statement_insert_select_select(conn); teardown_database(conn); test_invalid_connection(with_ssl); @@ -307,8 +640,8 @@ void execute_tests(bool with_ssl) { } int main(int argc, char** argv) { - - plan(88); // Total number of tests planned + + plan(176); // Total number of tests planned if (cl.getEnv()) return exit_status();