diff --git src/interfaces/libpq/fe-exec.c src/interfaces/libpq/fe-exec.c
index b833e76..51ad8d8 100644
--- src/interfaces/libpq/fe-exec.c
+++ src/interfaces/libpq/fe-exec.c
@@ -4558,3 +4558,75 @@ int PShandleRowData(PGconn *conn, bool is_first_packet, PSresult* result) {
 	return psHandleRowData(conn, is_first_packet, result);
 }
 
+/*
+ * PQsendPipelineSync
+ *		Queue a Sync message on a connection that is in pipeline mode,
+ *		without forcing the send buffer out to the server.
+ *
+ * Returns 1 once the message has been placed in the output buffer and the
+ * matching command-queue entry has been appended; returns 0 on any failure.
+ * A NULL conn yields 0 with no error text recorded anywhere; the
+ * other failures are expected to leave a message in conn->errorMessage.
+ */
+int
+PQsendPipelineSync(PGconn *conn)
+{
+	PGcmdQueueEntry *sync_cmd;
+
+	if (conn == NULL)
+		return 0;
+
+	/* A Sync can only be sent while pipeline mode is active. */
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+	{
+		libpq_append_conn_error(conn, "cannot send pipeline when not in pipeline mode");
+		return 0;
+	}
+
+	/* COPY states are rejected; the other listed states may carry a Sync. */
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_IDLE:
+		case PGASYNC_PIPELINE_IDLE:
+		case PGASYNC_BUSY:
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+			/* OK to send sync */
+			break;
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			/* should be unreachable */
+			appendPQExpBufferStr(&conn->errorMessage,
+								 "internal error: cannot send pipeline while in COPY\n");
+			return 0;
+	}
+
+	/* Allocate the queue entry now; it is appended only if the send succeeds. */
+	sync_cmd = pqAllocCmdQueueEntry(conn);
+	if (sync_cmd == NULL)
+		return 0;				/* error msg already set */
+
+	sync_cmd->query = NULL;
+	sync_cmd->queryclass = PGQUERY_SYNC;
+
+	/*
+	 * Build the Sync message (type 'S', empty body) and then give the output
+	 * buffer a push.  In pipeline mode the push only happens once we're past
+	 * the size threshold, and in nonblock mode a partial send is not an
+	 * error: PQgetResult() will do any additional flushing needed.  The three
+	 * calls run in this order and stop at the first failure.
+	 */
+	if (pqPutMsgStart('S', conn) < 0 ||
+		pqPutMsgEnd(conn) < 0 ||
+		pqPipelineFlush(conn) < 0)
+	{
+		/* error message should be set up already */
+		pqRecycleCmdQueueEntry(conn, sync_cmd);
+		return 0;
+	}
+
+	/* OK, it's launched: record the Sync in the command queue. */
+	pqAppendCmdQueueEntry(conn, sync_cmd);
+	return 1;
+}
diff --git src/interfaces/libpq/libpq-fe.h src/interfaces/libpq/libpq-fe.h
index 47f25e0..b769b64 100644
--- src/interfaces/libpq/libpq-fe.h
+++ src/interfaces/libpq/libpq-fe.h
@@ -688,6 +688,9 @@ extern const PGresult *PQgetResultFromPGconn(PGconn *conn);
 /* ProxySQL special handler function */
 extern int PShandleRowData(PGconn *conn, bool is_first_packet, PSresult* result);
 
+/* Send a pipeline sync message without forcing a flush of the send buffer */
+extern int PQsendPipelineSync(PGconn *conn);
+
 #ifdef __cplusplus
 }
 #endif