Checking the data stream on both ends doesn’t apply to frontend connections, since response data is buffered during extended queries.

Fixed TAP test
v3.0_optimizations_and_stability
Rahim Kanji 6 months ago
parent 7a3a5c71df
commit 7c665b9f78

@ -438,7 +438,8 @@ void PgSQL_Data_Stream::shut_hard() {
}
void PgSQL_Data_Stream::check_data_flow() {
if ((PSarrayIN->len || queue_data(queueIN)) && (PSarrayOUT->len || queue_data(queueOUT))) {
// This does not apply to frontend data streams because response data is buffered during extended queries.
if ((myds_type != MYDS_FRONTEND) && (PSarrayIN->len || queue_data(queueIN)) && (PSarrayOUT->len || queue_data(queueOUT))) {
// there is data at both sides of the data stream: this is considered a fatal error
proxy_error("Session=%p, DataStream=%p -- Data at both ends of a PgSQL data stream: IN <%d bytes %d packets> , OUT <%d bytes %d packets>\n", sess, this, PSarrayIN->len, queue_data(queueIN), PSarrayOUT->len, queue_data(queueOUT));
shut_soft();

@ -152,65 +152,55 @@ void consume_results(PGconn* conn) {
PGresult* res = nullptr;
bool saw_error = false;
std::string errmsg;
// Keep looping until PQgetResult() returns NULL and
// connection is not busy anymore.
for (;;) {
// Drain all immediately available results
while ((res = PQgetResult(conn)) != nullptr) {
ExecStatusType status = PQresultStatus(res);
if (status == PGRES_FATAL_ERROR) {
saw_error = true;
errmsg = PQresultErrorMessage(res);
}
if (status == PGRES_PIPELINE_ABORTED) {
// If pipeline was aborted, we need to clear the error
else if (status == PGRES_PIPELINE_ABORTED) {
saw_error = true;
errmsg = std::string("Pipeline aborted : ") + PQresultErrorMessage(res);
errmsg = std::string("Pipeline aborted: ") + PQresultErrorMessage(res);
}
PQclear(res);
}
if (!PQisBusy(conn)) {
while ((res = PQgetResult(conn)) != nullptr) {
// Ensure all results are drained
while ((res = PQgetResult(conn)) != nullptr)
PQclear(res);
}
break; // ReadyForQuery reached
break;
}
// ---- handle flushing + reading ----
int f = PQflush(conn);
if (f == -1) {
if (PQflush(conn) == -1) {
throw std::runtime_error(std::string("PQflush failed: ") + PQerrorMessage(conn));
}
short events = POLLIN;
if (f == 1) {
// still data to send also watch POLLOUT
events |= POLLOUT;
}
struct pollfd pfd;
pfd.fd = PQsocket(conn);
pfd.events = events;
if (pfd.fd < 0) {
pfd.events = POLLIN | POLLOUT;
if (pfd.fd < 0)
throw std::runtime_error("Invalid PostgreSQL socket");
}
if (poll(&pfd, 1, -1) < 0) {
if (poll(&pfd, 1, -1) < 0)
throw std::runtime_error("poll() failed");
}
// If socket readable consume input
if (pfd.revents & POLLIN) {
if (PQconsumeInput(conn) == 0) {
throw std::runtime_error(
std::string("PQconsumeInput failed: ") + PQerrorMessage(conn));
}
if ((pfd.revents & POLLIN) && PQconsumeInput(conn) == 0) {
throw std::runtime_error(std::string("PQconsumeInput failed: ") + PQerrorMessage(conn));
}
// If socket writable and f==1 PQflush() will be retried on next iteration
}
if (saw_error) {
throw std::runtime_error("PostgreSQL error: " + errmsg);
}
// call PQgetResult() one final time to clear any leftover
while ((res = PQgetResult(conn)) != nullptr)
PQclear(res);
}
void test_query_processor(PGconn* admin_conn) {

Loading…
Cancel
Save