Merge pull request #48 from sysown/v2.x_postgres_backend

Added Multi-Statement support for PostgreSQL
v2.x_pg_PrepStmtBase_240714
René Cannaò 2 years ago committed by GitHub
commit 526113dd59
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -531,6 +531,7 @@ public:
return PQprotocolVersion(pgsql_conn);
}
inline
bool is_error_present() const {
if (error_info.severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_FATAL ||
error_info.severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_ERROR ||
@ -540,30 +541,36 @@ public:
return false;
}
inline
PGSQL_ERROR_SEVERITY get_error_severity() const {
return error_info.severity;
}
inline
PGSQL_ERROR_CATEGORY get_error_category() const {
return error_info.category;
}
inline
const std::string& get_error_message() const {
return error_info.message;
}
std::string get_error_code_with_message() const {
return ("[" + std::string(error_info.sqlstate) + "] " + error_info.message);
}
const char* get_error_code_str() const {
inline
const char* get_error_code_str() const {
return error_info.sqlstate;
}
inline
PGSQL_ERROR_CODES get_error_code() const {
return error_info.code;
}
inline
std::string get_error_code_with_message() const {
return ("[" + std::string(error_info.sqlstate) + "] " + error_info.message);
}
void set_error(const char* code, const char* message, bool is_fatal) {
PgSQL_Error_Helper::fill_error_info(error_info, code, message, is_fatal ? "FATAL" : "ERROR");
}
@ -573,8 +580,15 @@ public:
PGSQL_ERROR_SEVERITY::ERRSEVERITY_FATAL : PGSQL_ERROR_SEVERITY::ERRSEVERITY_ERROR);
}
// safety check. Sometimes libpq return garbage result when connection is lost with the backend
bool is_error_result_valid(const PGresult* result) const {
if (result == nullptr)
return false;
return (PQresultErrorField(result, PG_DIAG_SQLSTATE) != nullptr);
}
void set_error_from_result(const PGresult* result, uint16_t ext_fields = 0) {
if (result) {
if (is_error_result_valid(result)) {
PgSQL_Error_Helper::fill_error_info(error_info, result, ext_fields);
} else {
const char* errmsg = PQerrorMessage(pgsql_conn);
@ -583,37 +597,20 @@ public:
}
}
void reset_error() {
reset_error_info(error_info, false);
}
PGresult* get_last_result() const {
return last_result;
}
void set_last_result(PGresult* result) {
if (last_result) {
PQclear(last_result);
}
void reset_error() { reset_error_info(error_info, false); }
last_result = result;
}
void reset_last_result() {
if (last_result) {
PQclear(last_result);
last_result = nullptr;
}
}
PGresult* get_result();
void next_multi_statement_result(PGresult* result);
bool set_single_row_mode();
void optimize() {}
//PgSQL_Conn_Param conn_params;
PgSQL_ErrorInfo error_info;
PGconn* pgsql_conn;
PGresult* last_result;
PGresult* pgsql_result;
PgSQL_Query_Result* query_result;
PgSQL_Query_Result* query_result_reuse;
bool first_result;
bool new_result;
//PgSQL_SrvC* parent;
//PgSQL_Connection_userinfo* userinfo;
//PgSQL_Data_Stream* myds;

@ -232,6 +232,7 @@ private:
uint8_t result_packet_type;
friend class PgSQL_Protocol;
friend class PgSQL_Connection;
};
class PgSQL_Protocol : public MySQL_Protocol {

@ -2951,19 +2951,23 @@ bool PgSQL_Connection_Placeholder::get_gtid(char *buff, uint64_t *trx_id) {
PgSQL_Connection::PgSQL_Connection() {
pgsql_conn = NULL;
last_result = NULL;
pgsql_result = NULL;
query_result = NULL;
query_result_reuse = NULL;
first_result = true;
new_result = true;
reset_error();
}
PgSQL_Connection::~PgSQL_Connection() {
reset_last_result();
if (userinfo) {
delete userinfo;
userinfo = NULL;
}
if (pgsql_result) {
PQclear(pgsql_result);
pgsql_result = NULL;
}
if (pgsql_conn) {
PQfinish(pgsql_conn);
pgsql_conn = NULL;
@ -3137,7 +3141,8 @@ handler_again:
if (async_exit_status) {
next_event(ASYNC_QUERY_CONT);
} else {
if (is_error_present()) {
if (is_error_present() ||
!set_single_row_mode()) {
NEXT_IMMEDIATE(ASYNC_QUERY_END);
}
NEXT_IMMEDIATE(ASYNC_USE_RESULT_START);
@ -3149,7 +3154,7 @@ handler_again:
if (is_error_present()) {
NEXT_IMMEDIATE(ASYNC_QUERY_END);
}
first_result = true;
new_result = true;
if (myds->sess->mirror == false) {
if (query_result_reuse == NULL) {
query_result = new PgSQL_Query_Result();
@ -3195,16 +3200,26 @@ handler_again:
break;
}
const PGresult* result = get_last_result();
//PGresult* result = get_result();
std::unique_ptr<PGresult, decltype(&PQclear)> result(get_result(), PQclear);
if (result) {
auto state = PQresultStatus(result);
switch (state) {
const ExecStatusType exec_status_type = PQresultStatus(result.get());
if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR))) {
next_multi_statement_result(result.release());
next_event(ASYNC_USE_RESULT_START);
break;
}
switch (exec_status_type) {
case PGRES_COMMAND_OK:
query_result->add_command_completion(result);
query_result->add_command_completion(result.get());
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT);
break;
case PGRES_EMPTY_QUERY:
query_result->add_empty_query_response(result);
query_result->add_empty_query_response(result.get());
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT);
break;
case PGRES_TUPLES_OK:
@ -3220,11 +3235,17 @@ handler_again:
case PGRES_NONFATAL_ERROR:
case PGRES_FATAL_ERROR:
default:
// if on previous call we encountered a FATAL error, we will not process the result, as it will contain residual protocol messages
// from the broken connection
if (is_error_present() == true && get_error_severity() == PGSQL_ERROR_SEVERITY::ERRSEVERITY_FATAL) {
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT);
}
// we don't have a command completion, empty query responseor error packet in the result. This check is here to
// handle internal cleanup of libpq that might return residual protocol messages from the broken connection and
// may add multiple final packets.
if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR)) == 0) {
set_error_from_result(result, PGSQL_ERROR_FIELD_ALL);
//if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR)) == 0) {
set_error_from_result(result.get(), PGSQL_ERROR_FIELD_ALL);
assert(is_error_present());
// we will not send FATAL error messages to the client
@ -3233,26 +3254,32 @@ handler_again:
severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_WARNING ||
severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_NOTICE) {
query_result->add_error(result);
query_result->add_error(result.get());
}
const PGSQL_ERROR_CATEGORY error_category = get_error_category();
if (error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_SYNTAX_ERROR &&
error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_STATUS &&
error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_DATA_ERROR) {
proxy_error("Error: %s\n", get_error_code_with_message().c_str());
proxy_error("Error: %s, Multi-Statement: %d\n", get_error_code_with_message().c_str(), processing_multi_statement);
}
}
//}
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT);
}
if (first_result == true) {
query_result->add_row_description(result);
first_result = false;
if (new_result == true) {
query_result->add_row_description(result.get());
new_result = false;
}
if (PQntuples(result) > 0) {
unsigned int br = query_result->add_row(result);
/*if (state == PGRES_COMMAND_OK ||
state == PGRES_EMPTY_QUERY ||
state == PGRES_TUPLES_OK) {
new_result = true;
}*/
if (PQntuples(result.get()) > 0) {
unsigned int br = query_result->add_row(result.get());
__sync_fetch_and_add(&parent->bytes_recv, br);
myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_recv] += br;
myds->bytes_info.bytes_recv += br;
@ -3268,7 +3295,7 @@ handler_again:
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); // we continue looping
}
} else {
query_result->add_command_completion(result);
query_result->add_command_completion(result.get());
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT);
}
}
@ -3283,7 +3310,7 @@ handler_again:
// finally add ready for query packet
query_result->add_ready_status(PQtransactionStatus(pgsql_conn));
//processing_multi_statement = false;
NEXT_IMMEDIATE(ASYNC_QUERY_END);
}
break;
@ -3294,7 +3321,8 @@ handler_again:
} else {
unknown_transaction_status = false;
}
reset_last_result();
// should be NULL
assert(!pgsql_result);
break;
/* case ASYNC_CHANGE_USER_START:
change_user_start();
@ -4089,6 +4117,7 @@ void PgSQL_Connection::connect_cont(short event) {
void PgSQL_Connection::query_start() {
PROXY_TRACE();
reset_error();
processing_multi_statement = false;
async_exit_status = PG_EVENT_NONE;
if (PQsendQuery(pgsql_conn, query.ptr) == 0) {
// WARNING: DO NOT RELEASE this PGresult
@ -4114,20 +4143,17 @@ void PgSQL_Connection::fetch_result_start() {
PROXY_TRACE();
reset_error();
async_exit_status = PG_EVENT_NONE;
if (PQsetSingleRowMode(pgsql_conn) == 0) {
// WARNING: DO NOT RELEASE this PGresult
const PGresult* result = PQgetResultFromPGconn(pgsql_conn);
set_error_from_result(result);
proxy_error("Failed to set single row mode. %s\n", get_error_code_with_message().c_str());
return;
}
}
void PgSQL_Connection::fetch_result_cont(short event) {
PROXY_TRACE();
reset_last_result();
async_exit_status = PG_EVENT_NONE;
// Avoid fetching a new result if one is already available.
// This situation can happen when a multi-statement query has been executed.
if (pgsql_result)
return;
if (PQconsumeInput(pgsql_conn) == 0) {
// WARNING: DO NOT RELEASE this PGresult
const PGresult* result = PQgetResultFromPGconn(pgsql_conn);
@ -4139,6 +4165,7 @@ void PgSQL_Connection::fetch_result_cont(short event) {
set_error_from_result(result);
proxy_error("Failed to consume input. %s\n", get_error_code_with_message().c_str());
}
return;
}
if (PQisBusy(pgsql_conn)) {
@ -4146,7 +4173,7 @@ void PgSQL_Connection::fetch_result_cont(short event) {
return;
}
set_last_result(PQgetResult(pgsql_conn));
pgsql_result = PQgetResult(pgsql_conn);
}
void PgSQL_Connection::flush() {
@ -4276,7 +4303,10 @@ void PgSQL_Connection::async_free_result() {
// query.stmt = NULL;
//}
}
reset_last_result();
if (pgsql_result) {
PQclear(pgsql_result);
pgsql_result = NULL;
}
compute_unknown_transaction_status();
async_state_machine = ASYNC_IDLE;
if (query_result) {
@ -4286,7 +4316,7 @@ void PgSQL_Connection::async_free_result() {
query_result_reuse = query_result;
query_result = NULL;
}
first_result = false;
new_result = false;
}
int PgSQL_Connection::async_set_autocommit(short event, bool ac) {
@ -4440,7 +4470,7 @@ int PgSQL_Connection::async_query(short event, char* stmt, unsigned long length,
return 0;
}
}
if (async_state_machine == ASYNC_NEXT_RESULT_START) {
if (async_state_machine == ASYNC_USE_RESULT_START) {
// if we reached this point it measn we are processing a multi-statement
// and we need to exit to give control to MySQL_Session
processing_multi_statement = true;
@ -4570,3 +4600,28 @@ bool PgSQL_Connection::is_connection_in_reusable_state() const {
assert(!(conn_usable == false && is_error_present() == false));
return conn_usable;
}
PGresult* PgSQL_Connection::get_result() {
PGresult* result_tmp = pgsql_result;
pgsql_result = nullptr;
return result_tmp;
}
bool PgSQL_Connection::set_single_row_mode() {
assert(pgsql_conn);
if (PQsetSingleRowMode(pgsql_conn) == 0) {
// WARNING: DO NOT RELEASE this PGresult
const PGresult* result = PQgetResultFromPGconn(pgsql_conn);
set_error_from_result(result);
proxy_error("Failed to set single row mode. %s\n", get_error_code_with_message().c_str());
return false;
}
return true;
}
void PgSQL_Connection::next_multi_statement_result(PGresult* result) {
// set unprocessed result to pgsql_result
pgsql_result = result;
// copy buffer to PSarrayOut
query_result->buffer_to_PSarrayOut();
}

@ -165,7 +165,7 @@ static void * HGCU_thread_run() {
myconn->reset();
PgHGM->increase_reset_counter();
myconn=(PgSQL_Connection *)conn_array->index(i);
if (myconn->pgsql->net.pvio && myconn->pgsql->net.fd && myconn->pgsql->net.buff) {
if (myconn->pgsql && myconn->pgsql->net.pvio && myconn->pgsql->net.fd && myconn->pgsql->net.buff) {
PgSQL_Connection_userinfo *userinfo = myconn->userinfo;
char *auth_password = NULL;
if (userinfo->password) {
@ -209,7 +209,7 @@ static void * HGCU_thread_run() {
usleep(50);
for (i=0;i<(int)conn_array->len;i++) {
myconn=(PgSQL_Connection *)conn_array->index(i);
if (myconn->pgsql->net.pvio && myconn->pgsql->net.fd && myconn->pgsql->net.buff) {
if (myconn->pgsql && myconn->pgsql->net.pvio && myconn->pgsql->net.fd && myconn->pgsql->net.buff) {
statuses[i]=wait_for_pgsql(myconn->pgsql, statuses[i]);
if (myconn->pgsql->net.pvio && myconn->pgsql->net.fd && myconn->pgsql->net.buff) {
if ((statuses[i] & MYSQL_WAIT_TIMEOUT) == 0) {

@ -1749,6 +1749,7 @@ void PgSQL_Query_Result::init(PgSQL_Protocol* _proto, PgSQL_Data_Stream* _myds,
conn = _conn;
myds = _myds;
buffer_init();
reset();
if (proto == NULL) {
return; // this is a mirror

@ -4151,11 +4151,9 @@ bool PgSQL_Session::handler_minus1_ClientLibraryError(PgSQL_Data_Stream* myds) {
if (myconn->query_result && myconn->query_result->is_transfer_started()) {
// transfer to frontend has started, we cannot retry
} else {
// a hack to check if we have pending results. This should never occur.
if (myconn->get_last_result() != nullptr) {
// transfer to frontend has started, because this is, at least,
// the second resultset coming from the server
// we cannot retry
// This should never occur.
if (myconn->processing_multi_statement == true) {
// we are in the process of retriving results from a multi-statement query
proxy_warning("Disabling query retry because we were in middle of processing results\n");
} else {
retry_conn = true;
@ -4642,7 +4640,7 @@ handler_again:
if (rc == 0) {
if (active_transactions != 0) { // run this only if currently we think there is a transaction
if (myconn->pgsql && (myconn->pgsql->server_status & SERVER_STATUS_IN_TRANS) == 0) { // there is no transaction on the backend connection
if (myconn->IsKnownActiveTransaction() == false) { // there is no transaction on the backend connection
active_transactions = NumActiveTransactions(); // we check all the hostgroups/backends
if (active_transactions == 0)
transaction_started_at = 0; // reset it
@ -4655,8 +4653,8 @@ handler_again:
// see bug #3549
if (locked_on_hostgroup >= 0) {
assert(myconn != NULL);
assert(myconn->pgsql != NULL);
autocommit = myconn->pgsql->server_status & SERVER_STATUS_AUTOCOMMIT;
assert(myconn->pgsql_conn != NULL);
//autocommit = myconn->pgsql->server_status & SERVER_STATUS_AUTOCOMMIT;
}
if (mirror == false && myconn->pgsql) {
@ -4773,7 +4771,6 @@ handler_again:
if (myconn->query_result_reuse) {
delete myconn->query_result_reuse;
}
//myconn->query_result->reset_pid = false;
myconn->query_result_reuse = myconn->query_result;
myconn->query_result = NULL;
}
@ -7009,7 +7006,8 @@ void PgSQL_Session::PgSQL_Result_to_PgSQL_wire(PgSQL_Connection* _conn, PgSQL_Da
bool is_tuple = query_result->get_result_packet_type() == (PGSQL_QUERY_RESULT_TUPLE | PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_READY);
bool resultset_completed = query_result->get_resultset(client_myds->PSarrayOUT);
CurrentQuery.rows_sent = query_result->get_num_rows();
assert(resultset_completed); // the resultset should always be completed if PgSQL_Result_to_PgSQL_wire is called
if (_conn->processing_multi_statement == false)
assert(resultset_completed); // the resultset should always be completed if PgSQL_Result_to_PgSQL_wire is called
if (transfer_started == false) { // we have all the resultset when PgSQL_Result_to_PgSQL_wire was called
if (qpo && qpo->cache_ttl > 0 && is_tuple == true) { // the resultset should be cached
/*if (mysql_errno(pgsql) == 0 &&

@ -25,6 +25,15 @@
res; \
})
#define PQSENDQUERY(conn,query) ({int send_status = PQsendQuery(conn,query); \
if (send_status != 1) { \
fprintf(stderr, "File %s, line %d, status %d, %s\n", \
__FILE__, __LINE__, status, PQerrorMessage(conn)); \
} \
send_status; \
})
CommandLine cl;
PGconn* create_new_connection(bool with_ssl) {
@ -35,18 +44,18 @@ PGconn* create_new_connection(bool with_ssl) {
if (with_ssl) {
ss << " sslmode=require";
} else {
} else {
ss << " sslmode=disable";
}
PGconn* conn = PQconnectdb(ss.str().c_str());
}
PGconn* conn = PQconnectdb(ss.str().c_str());
const bool res = (conn && PQstatus(conn) == CONNECTION_OK);
ok(res, "Connection created successfully. %s", PQerrorMessage(conn));
if (res) return conn;
PQfinish(conn);
return nullptr;
PQfinish(conn);
return nullptr;
}
// Function to set up the test environment
@ -259,6 +268,324 @@ void test_constraint_violation(PGconn* conn) {
PQclear(res);
}
void test_multi_statement_transaction(PGconn* conn) {
PGresult* res;
int status;
// Execute multi-statement transaction
status = PQsendQuery(conn, "BEGIN; "
"INSERT INTO test_table (value) VALUES ('multi statement'); "
"UPDATE test_table SET value = 'multi statement updated' WHERE value = 'multi statement'; "
"COMMIT;");
ok(status == 1, "Multi-statement transaction sent");
PQconsumeInput(conn);
while (PQisBusy(conn)) {
PQconsumeInput(conn);
}
// Check result of BEGIN
res = PQgetResult(conn);
ok(PQresultStatus(res) == PGRES_COMMAND_OK, "BEGIN executed successfully");
PQclear(res);
// Check result of INSERT
res = PQgetResult(conn);
ok(PQresultStatus(res) == PGRES_COMMAND_OK, "INSERT executed successfully");
PQclear(res);
// Check result of UPDATE
res = PQgetResult(conn);
ok(PQresultStatus(res) == PGRES_COMMAND_OK, "UPDATE executed successfully");
PQclear(res);
// Check result of COMMIT
res = PQgetResult(conn);
ok(PQresultStatus(res) == PGRES_COMMAND_OK, "COMMIT executed successfully");
PQclear(res);
res = PQgetResult(conn);
ok(PQtransactionStatus(conn) == PQTRANS_IDLE, "Connection in Idle state");
// Verify the results
status = PQsendQuery(conn, "SELECT value FROM test_table WHERE value = 'multi statement updated'");
ok(status == 1, "Verification query sent");
PQconsumeInput(conn);
while (PQisBusy(conn)) {
PQconsumeInput(conn);
}
res = PQgetResult(conn);
if (PQresultStatus(res) == PGRES_TUPLES_OK) {
int nRows = PQntuples(res);
ok(nRows == 1, "Multi-statement transaction committed correctly");
char* result = PQgetvalue(res, 0, 0);
ok(strcmp(result, "multi statement updated") == 0, "Multi-statement transaction result is correct");
} else {
ok(0, "Failed to verify multi-statement transaction");
}
PQclear(res);
PQgetResult(conn);
}
void test_multi_statement_transaction_with_error(PGconn* conn) {
PGresult* res;
int status;
// Execute multi-statement transaction with an error
status = PQSENDQUERY(conn, "BEGIN; "
"INSERT INTO test_table (value) VALUES ('multi statement error'); "
"UPDATE test_table SET value = 'multi statement error updated' WHERE value = 'multi statement error'; "
"INSERT INTO test_table (non_existent_column) VALUES ('error'); "
"COMMIT;");
ok(status == 1, "Multi-statement transaction with error sent");
PQconsumeInput(conn);
while (PQisBusy(conn)) {
PQconsumeInput(conn);
}
// Check result of BEGIN
res = PQgetResult(conn);
ok(PQresultStatus(res) == PGRES_COMMAND_OK, "BEGIN executed successfully");
PQclear(res);
// Check result of INSERT
res = PQgetResult(conn);
ok(PQresultStatus(res) == PGRES_COMMAND_OK, "INSERT executed successfully");
PQclear(res);
// Check result of UPDATE
res = PQgetResult(conn);
ok(PQresultStatus(res) == PGRES_COMMAND_OK, "UPDATE executed successfully");
PQclear(res);
// Check result of erroneous INSERT
res = PQgetResult(conn);
ok(PQresultStatus(res) == PGRES_FATAL_ERROR, "Erroneous INSERT failed as expected");
PQclear(res);
PQgetResult(conn);
// Ensure the transaction is in error state
ok(PQtransactionStatus(conn) == PQTRANS_INERROR, "Connection in Error Transaction state");
// Rollback the transaction
status = PQsendQuery(conn, "ROLLBACK");
ok(status == 1, "ROLLBACK sent");
PQconsumeInput(conn);
while (PQisBusy(conn)) {
PQconsumeInput(conn);
}
res = PQgetResult(conn);
ok(PQresultStatus(res) == PGRES_COMMAND_OK, "ROLLBACK executed successfully");
PQclear(res);
PQgetResult(conn);
ok(PQtransactionStatus(conn) == PQTRANS_IDLE, "Connection in Idle state");
// Verify the results
status = PQsendQuery(conn, "SELECT value FROM test_table WHERE value = 'multi statement error' OR value = 'multi statement error updated'");
ok(status == 1, "Verification query sent");
PQconsumeInput(conn);
while (PQisBusy(conn)) {
PQconsumeInput(conn);
}
res = PQgetResult(conn);
if (PQresultStatus(res) == PGRES_TUPLES_OK) {
int nRows = PQntuples(res);
ok(nRows == 0, "Multi-statement transaction with error rolled back correctly");
} else {
ok(0, "Failed to verify rollback of multi-statement transaction with error");
}
PQclear(res);
PQgetResult(conn);
}
void test_multi_statement_select_insert(PGconn* conn) {
PGresult* res;
int status;
// Execute multi-statement SELECT and INSERT
status = PQsendQuery(conn, "SELECT value FROM test_table WHERE id = 1; "
"INSERT INTO test_table (value) VALUES ('multi statement select insert');");
ok(status == 1, "Multi-statement SELECT and INSERT sent");
PQconsumeInput(conn);
while (PQisBusy(conn)) {
PQconsumeInput(conn);
}
// Check result of SELECT
res = PQgetResult(conn);
ok(PQresultStatus(res) == PGRES_TUPLES_OK, "SELECT executed successfully");
PQclear(res);
// Check result of INSERT
res = PQgetResult(conn);
ok(PQresultStatus(res) == PGRES_COMMAND_OK, "INSERT executed successfully");
PQclear(res);
PQgetResult(conn);
// Verify the results
status = PQsendQuery(conn, "SELECT value FROM test_table WHERE value = 'multi statement select insert'");
ok(status == 1, "Verification query sent");
PQconsumeInput(conn);
while (PQisBusy(conn)) {
PQconsumeInput(conn);
}
res = PQgetResult(conn);
if (PQresultStatus(res) == PGRES_TUPLES_OK) {
int nRows = PQntuples(res);
ok(nRows == 1, "Multi-statement SELECT and INSERT committed correctly");
char* result = PQgetvalue(res, 0, 0);
ok(strcmp(result, "multi statement select insert") == 0, "Multi-statement SELECT and INSERT result is correct");
} else {
ok(0, "Failed to verify multi-statement SELECT and INSERT");
}
PQclear(res);
PQgetResult(conn);
}
void test_multi_statement_delete_update(PGconn* conn) {
PGresult* res;
int status;
// Execute multi-statement DELETE and UPDATE
status = PQsendQuery(conn, "DELETE FROM test_table WHERE value = 'test1'; "
"UPDATE test_table SET value = 'multi statement delete update' WHERE value = 'test4';");
ok(status == 1, "Multi-statement DELETE and UPDATE sent");
PQconsumeInput(conn);
while (PQisBusy(conn)) {
PQconsumeInput(conn);
}
// Check result of DELETE
res = PQgetResult(conn);
ok(PQresultStatus(res) == PGRES_COMMAND_OK, "DELETE executed successfully");
PQclear(res);
// Check result of UPDATE
res = PQgetResult(conn);
ok(PQresultStatus(res) == PGRES_COMMAND_OK, "UPDATE executed successfully");
PQclear(res);
PQgetResult(conn);
// Verify the results
status = PQsendQuery(conn, "SELECT value FROM test_table WHERE value = 'multi statement delete update'");
ok(status == 1, "Verification query sent");
PQconsumeInput(conn);
while (PQisBusy(conn)) {
PQconsumeInput(conn);
}
res = PQgetResult(conn);
if (PQresultStatus(res) == PGRES_TUPLES_OK) {
int nRows = PQntuples(res);
ok(nRows == 1, "Multi-statement DELETE and UPDATE committed correctly");
char* result = PQgetvalue(res, 0, 0);
ok(strcmp(result, "multi statement delete update") == 0, "Multi-statement DELETE and UPDATE result is correct");
} else {
ok(0, "Failed to verify multi-statement DELETE and UPDATE");
}
PQclear(res);
PQgetResult(conn);
}
void test_multi_statement_with_error(PGconn* conn) {
PGresult* res;
int status;
// Execute multi-statement with an error
status = PQsendQuery(conn, "INSERT INTO test_table (value) VALUES ('multi statement error'); "
"UPDATE test_table SET value = 'multi statement error updated' WHERE value = 'multi statement error'; "
"INSERT INTO test_table (non_existent_column) VALUES ('error');");
ok(status == 1, "Multi-statement with error sent");
PQconsumeInput(conn);
while (PQisBusy(conn)) {
PQconsumeInput(conn);
}
// Check result of INSERT
res = PQgetResult(conn);
ok(PQresultStatus(res) == PGRES_COMMAND_OK, "INSERT executed successfully");
PQclear(res);
// Check result of UPDATE
res = PQgetResult(conn);
ok(PQresultStatus(res) == PGRES_COMMAND_OK, "UPDATE executed successfully");
PQclear(res);
// Check result of erroneous INSERT
res = PQgetResult(conn);
ok(PQresultStatus(res) == PGRES_FATAL_ERROR, "Erroneous INSERT failed as expected");
PQclear(res);
PQgetResult(conn);
// Verify the results
status = PQsendQuery(conn, "SELECT value FROM test_table WHERE value = 'multi statement error' OR value = 'multi statement error updated'");
ok(status == 1, "Verification query sent");
PQconsumeInput(conn);
while (PQisBusy(conn)) {
PQconsumeInput(conn);
}
res = PQgetResult(conn);
if (PQresultStatus(res) == PGRES_TUPLES_OK) {
int nRows = PQntuples(res);
ok(nRows == 0, "No rows are inserted or updated");
} else {
ok(0, "Failed to verify rows from multi-statement with error");
}
PQclear(res);
PQgetResult(conn);
}
void test_multi_statement_insert_select_select(PGconn* conn) {
PGresult* res;
int status;
// Execute multi-statement INSERT, SELECT, and SELECT
status = PQsendQuery(conn, "INSERT INTO test_table (value) VALUES ('multi statement select1'), ('multi statement select2'); "
"SELECT value FROM test_table WHERE value = 'multi statement select1'; "
"SELECT value FROM test_table WHERE value = 'multi statement select2';");
ok(status == 1, "Multi-statement INSERT and SELECTs sent");
PQconsumeInput(conn);
while (PQisBusy(conn)) {
PQconsumeInput(conn);
}
// Check result of the INSERT
res = PQgetResult(conn);
ok(PQresultStatus(res) == PGRES_COMMAND_OK, "INSERT executed successfully");
PQclear(res);
// Check result of the first SELECT
res = PQgetResult(conn);
if (PQresultStatus(res) == PGRES_TUPLES_OK) {
int nRows = PQntuples(res);
ok(nRows == 1, "First SELECT executed successfully");
if (nRows > 0) {
char* result = PQgetvalue(res, 0, 0);
ok(strcmp(result, "multi statement select1") == 0, "First SELECT result is correct");
}
} else {
ok(0, "First SELECT failed");
}
PQclear(res);
// Check result of the second SELECT
res = PQgetResult(conn);
if (PQresultStatus(res) == PGRES_TUPLES_OK) {
int nRows = PQntuples(res);
ok(nRows == 1, "Second SELECT executed successfully");
if (nRows > 0) {
char* result = PQgetvalue(res, 0, 0);
ok(strcmp(result, "multi statement select2") == 0, "Second SELECT result is correct");
}
} else {
ok(0, "Second SELECT failed");
}
PQclear(res);
PQgetResult(conn);
}
void teardown_database(PGconn* conn) {
PGresult* res;
@ -271,7 +598,7 @@ void test_invalid_connection(bool with_ssl) {
std::stringstream ss;
ss << "host=invalid_host port=invalid_port dbname=invalid_db user=invalid_user password=invalid_password";
if (with_ssl) {
ss << " sslmode=require";
} else {
@ -300,6 +627,12 @@ void execute_tests(bool with_ssl) {
test_transaction_error(conn);
test_null_value(conn);
test_constraint_violation(conn);
test_multi_statement_transaction(conn);
test_multi_statement_transaction_with_error(conn);
test_multi_statement_select_insert(conn);
test_multi_statement_delete_update(conn);
test_multi_statement_with_error(conn);
test_multi_statement_insert_select_select(conn);
teardown_database(conn);
test_invalid_connection(with_ssl);
@ -307,8 +640,8 @@ void execute_tests(bool with_ssl) {
}
int main(int argc, char** argv) {
plan(88); // Total number of tests planned
plan(176); // Total number of tests planned
if (cl.getEnv())
return exit_status();

Loading…
Cancel
Save