@ -152,65 +152,55 @@ void consume_results(PGconn* conn) {
PGresult * res = nullptr ;
bool saw_error = false ;
std : : string errmsg ;
// Keep looping until PQgetResult() returns NULL and
// connection is not busy anymore.
for ( ; ; ) {
// Drain all immediately available results
while ( ( res = PQgetResult ( conn ) ) ! = nullptr ) {
ExecStatusType status = PQresultStatus ( res ) ;
if ( status = = PGRES_FATAL_ERROR ) {
saw_error = true ;
errmsg = PQresultErrorMessage ( res ) ;
}
if ( status = = PGRES_PIPELINE_ABORTED ) {
// If pipeline was aborted, we need to clear the error
else if ( status = = PGRES_PIPELINE_ABORTED ) {
saw_error = true ;
errmsg = std : : string ( " Pipeline aborted : " ) + PQresultErrorMessage ( res ) ;
errmsg = std : : string ( " Pipeline aborted : " ) + PQresultErrorMessage ( res ) ;
}
PQclear ( res ) ;
}
if ( ! PQisBusy ( conn ) ) {
while ( ( res = PQgetResult ( conn ) ) ! = nullptr ) {
// Ensure all results are drained
while ( ( res = PQgetResult ( conn ) ) ! = nullptr )
PQclear ( res ) ;
}
break ; // ReadyForQuery reached
break ;
}
// ---- handle flushing + reading ----
int f = PQflush ( conn ) ;
if ( f = = - 1 ) {
if ( PQflush ( conn ) = = - 1 ) {
throw std : : runtime_error ( std : : string ( " PQflush failed: " ) + PQerrorMessage ( conn ) ) ;
}
short events = POLLIN ;
if ( f = = 1 ) {
// still data to send also watch POLLOUT
events | = POLLOUT ;
}
struct pollfd pfd ;
pfd . fd = PQsocket ( conn ) ;
pfd . events = events ;
if ( pfd . fd < 0 ) {
pfd . events = POLLIN | POLLOUT ;
if ( pfd . fd < 0 )
throw std : : runtime_error ( " Invalid PostgreSQL socket " ) ;
}
if ( poll ( & pfd , 1 , - 1 ) < 0 ) {
if ( poll ( & pfd , 1 , - 1 ) < 0 )
throw std : : runtime_error ( " poll() failed " ) ;
}
// If socket readable consume input
if ( pfd . revents & POLLIN ) {
if ( PQconsumeInput ( conn ) = = 0 ) {
throw std : : runtime_error (
std : : string ( " PQconsumeInput failed: " ) + PQerrorMessage ( conn ) ) ;
}
if ( ( pfd . revents & POLLIN ) & & PQconsumeInput ( conn ) = = 0 ) {
throw std : : runtime_error ( std : : string ( " PQconsumeInput failed: " ) + PQerrorMessage ( conn ) ) ;
}
// If socket writable and f==1 PQflush() will be retried on next iteration
}
if ( saw_error ) {
throw std : : runtime_error ( " PostgreSQL error: " + errmsg ) ;
}
// call PQgetResult() one final time to clear any leftover
while ( ( res = PQgetResult ( conn ) ) ! = nullptr )
PQclear ( res ) ;
}
void test_query_processor ( PGconn * admin_conn ) {