Backport PQsendPipelineSync from PostgreSQL 17 and update code to use it

- Backport PQsendPipelineSync to PostgreSQL 16.3, enabling pipeline
  synchronization without flushing the send buffer.
- Replace calls to PQPipelineSync in code with PQsendPipelineSync
  to use the new functionality.
v3.0_optimizations_and_stability
Rahim Kanji 4 months ago
parent f1426ff99e
commit 9fa3d75fb3

1
deps/Makefile vendored

@ -306,6 +306,7 @@ postgresql/postgresql/src/interfaces/libpq/libpq.a:
cd postgresql/postgresql && patch -p0 < ../handle_row_data.patch
cd postgresql/postgresql && patch -p0 < ../fmt_err_msg.patch
cd postgresql/postgresql && patch -p0 < ../bind_fmt_text.patch
cd postgresql/postgresql && patch -p0 < ../pqsendpipelinesync.patch
#cd postgresql/postgresql && LD_LIBRARY_PATH="$(shell pwd)/libssl/openssl" ./configure --with-ssl=openssl --with-includes="$(shell pwd)/libssl/openssl/include/" --with-libraries="$(shell pwd)/libssl/openssl/" --without-readline --enable-debug CFLAGS="-ggdb -O0 -fno-omit-frame-pointer" CPPFLAGS="-g -O0"
cd postgresql/postgresql && LD_LIBRARY_PATH="$(SSL_LDIR)" ./configure --with-ssl=openssl --with-includes="$(SSL_IDIR)" --with-libraries="$(SSL_LDIR)" --without-readline
cd postgresql/postgresql/src/interfaces/libpq && CC=${CC} CXX=${CXX} ${MAKE} MAKELEVEL=0

@ -0,0 +1,84 @@
diff --git src/interfaces/libpq/fe-exec.c src/interfaces/libpq/fe-exec.c
index b833e76..51ad8d8 100644
--- src/interfaces/libpq/fe-exec.c
+++ src/interfaces/libpq/fe-exec.c
@@ -4558,3 +4558,65 @@ int PShandleRowData(PGconn *conn, bool is_first_packet, PSresult* result) {
return psHandleRowData(conn, is_first_packet, result);
}
+int
+PQsendPipelineSync(PGconn *conn)
+{
+ PGcmdQueueEntry *entry;
+
+ if (!conn)
+ return 0;
+
+ if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+ {
+ libpq_append_conn_error(conn, "cannot send pipeline when not in pipeline mode");
+ return 0;
+ }
+
+ switch (conn->asyncStatus)
+ {
+ case PGASYNC_COPY_IN:
+ case PGASYNC_COPY_OUT:
+ case PGASYNC_COPY_BOTH:
+ /* should be unreachable */
+ appendPQExpBufferStr(&conn->errorMessage,
+ "internal error: cannot send pipeline while in COPY\n");
+ return 0;
+ case PGASYNC_READY:
+ case PGASYNC_READY_MORE:
+ case PGASYNC_BUSY:
+ case PGASYNC_IDLE:
+ case PGASYNC_PIPELINE_IDLE:
+ /* OK to send sync */
+ break;
+ }
+
+ entry = pqAllocCmdQueueEntry(conn);
+ if (entry == NULL)
+ return 0; /* error msg already set */
+
+ entry->queryclass = PGQUERY_SYNC;
+ entry->query = NULL;
+
+ /* construct the Sync message */
+ if (pqPutMsgStart('S', conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
+
+ /*
+ * Give the data a push (in pipeline mode, only if we're past the size
+ * threshold). In nonblock mode, don't complain if we're unable to send
+ * it all; PQgetResult() will do any additional flushing needed.
+ */
+ if (pqPipelineFlush(conn) < 0)
+ goto sendFailed;
+
+ /* OK, it's launched! */
+ pqAppendCmdQueueEntry(conn, entry);
+
+ return 1;
+
+sendFailed:
+ pqRecycleCmdQueueEntry(conn, entry);
+ /* error message should be set up already */
+ return 0;
+}
diff --git src/interfaces/libpq/libpq-fe.h src/interfaces/libpq/libpq-fe.h
index 47f25e0..b769b64 100644
--- src/interfaces/libpq/libpq-fe.h
+++ src/interfaces/libpq/libpq-fe.h
@@ -688,6 +688,9 @@ extern const PGresult *PQgetResultFromPGconn(PGconn *conn);
/* ProxySQL special handler function */
extern int PShandleRowData(PGconn *conn, bool is_first_packet, PSresult* result);
+/* Send a pipeline sync message without flushing the send buffer */
+extern int PQsendPipelineSync(PGconn *conn);
+
#ifdef __cplusplus
}
#endif

@ -1663,8 +1663,7 @@ void PgSQL_Connection::stmt_prepare_start() {
return;
}
} else {
// FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher
if (PQpipelineSync(pgsql_conn) == 0) {
if (PQsendPipelineSync(pgsql_conn) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str());
return;
@ -1730,8 +1729,7 @@ void PgSQL_Connection::stmt_describe_start() {
return;
}
} else {
// FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher
if (PQpipelineSync(pgsql_conn) == 0) {
if (PQsendPipelineSync(pgsql_conn) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str());
return;
@ -1755,8 +1753,7 @@ void PgSQL_Connection::resync_start() {
PQsetNoticeReceiver(pgsql_conn, &PgSQL_Connection::notice_handler_cb, this);
// FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher
if (PQpipelineSync(pgsql_conn) == 0) {
if (PQsendPipelineSync(pgsql_conn) == 0) {
proxy_error("Failed to send pipeline sync.\n");
resync_failed = true;
return;
@ -1878,8 +1875,7 @@ void PgSQL_Connection::stmt_execute_start() {
return;
}
} else {
// FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher
if (PQpipelineSync(pgsql_conn) == 0) {
if (PQsendPipelineSync(pgsql_conn) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str());
return;
@ -1905,8 +1901,7 @@ void PgSQL_Connection::reset_session_start() {
reset_session_in_pipeline = is_pipeline_active();
if (reset_session_in_pipeline) {
// FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher
if (PQpipelineSync(pgsql_conn) == 0) {
if (PQsendPipelineSync(pgsql_conn) == 0) {
set_error_from_PQerrorMessage();
proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str());
return;

Loading…
Cancel
Save