From 9fa3d75fb365d48db4836f9e4d893d35a96a3525 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Wed, 29 Oct 2025 16:40:06 +0500 Subject: [PATCH] Backport PQsendPipelineSync from PostgreSQL 17 and update code to use it - Backport PQsendPipelineSync to PostgreSQL 16.3, enabling pipeline synchronization without flushing the send buffer. - Replace calls to PQPipelineSync in code with PQsendPipelineSync to use the new functionality. --- deps/Makefile | 1 + deps/postgresql/pqsendpipelinesync.patch | 84 ++++++++++++++++++++++++ lib/PgSQL_Connection.cpp | 15 ++--- 3 files changed, 90 insertions(+), 10 deletions(-) create mode 100644 deps/postgresql/pqsendpipelinesync.patch diff --git a/deps/Makefile b/deps/Makefile index 59440ec1f..25627777c 100644 --- a/deps/Makefile +++ b/deps/Makefile @@ -306,6 +306,7 @@ postgresql/postgresql/src/interfaces/libpq/libpq.a: cd postgresql/postgresql && patch -p0 < ../handle_row_data.patch cd postgresql/postgresql && patch -p0 < ../fmt_err_msg.patch cd postgresql/postgresql && patch -p0 < ../bind_fmt_text.patch + cd postgresql/postgresql && patch -p0 < ../pqsendpipelinesync.patch #cd postgresql/postgresql && LD_LIBRARY_PATH="$(shell pwd)/libssl/openssl" ./configure --with-ssl=openssl --with-includes="$(shell pwd)/libssl/openssl/include/" --with-libraries="$(shell pwd)/libssl/openssl/" --without-readline --enable-debug CFLAGS="-ggdb -O0 -fno-omit-frame-pointer" CPPFLAGS="-g -O0" cd postgresql/postgresql && LD_LIBRARY_PATH="$(SSL_LDIR)" ./configure --with-ssl=openssl --with-includes="$(SSL_IDIR)" --with-libraries="$(SSL_LDIR)" --without-readline cd postgresql/postgresql/src/interfaces/libpq && CC=${CC} CXX=${CXX} ${MAKE} MAKELEVEL=0 diff --git a/deps/postgresql/pqsendpipelinesync.patch b/deps/postgresql/pqsendpipelinesync.patch new file mode 100644 index 000000000..f5eff6624 --- /dev/null +++ b/deps/postgresql/pqsendpipelinesync.patch @@ -0,0 +1,84 @@ +diff --git src/interfaces/libpq/fe-exec.c src/interfaces/libpq/fe-exec.c +index b833e76..51ad8d8 100644 +--- src/interfaces/libpq/fe-exec.c ++++ src/interfaces/libpq/fe-exec.c +@@ -4558,3 +4558,65 @@ int PShandleRowData(PGconn *conn, bool is_first_packet, PSresult* result) { + return psHandleRowData(conn, is_first_packet, result); + } + ++int ++PQsendPipelineSync(PGconn *conn) ++{ ++ PGcmdQueueEntry *entry; ++ ++ if (!conn) ++ return 0; ++ ++ if (conn->pipelineStatus == PQ_PIPELINE_OFF) ++ { ++ libpq_append_conn_error(conn, "cannot send pipeline when not in pipeline mode"); ++ return 0; ++ } ++ ++ switch (conn->asyncStatus) ++ { ++ case PGASYNC_COPY_IN: ++ case PGASYNC_COPY_OUT: ++ case PGASYNC_COPY_BOTH: ++ /* should be unreachable */ ++ appendPQExpBufferStr(&conn->errorMessage, ++ "internal error: cannot send pipeline while in COPY\n"); ++ return 0; ++ case PGASYNC_READY: ++ case PGASYNC_READY_MORE: ++ case PGASYNC_BUSY: ++ case PGASYNC_IDLE: ++ case PGASYNC_PIPELINE_IDLE: ++ /* OK to send sync */ ++ break; ++ } ++ ++ entry = pqAllocCmdQueueEntry(conn); ++ if (entry == NULL) ++ return 0; /* error msg already set */ ++ ++ entry->queryclass = PGQUERY_SYNC; ++ entry->query = NULL; ++ ++ /* construct the Sync message */ ++ if (pqPutMsgStart('S', conn) < 0 || ++ pqPutMsgEnd(conn) < 0) ++ goto sendFailed; ++ ++ /* ++ * Give the data a push (in pipeline mode, only if we're past the size ++ * threshold). In nonblock mode, don't complain if we're unable to send ++ * it all; PQgetResult() will do any additional flushing needed. ++ */ ++ if (pqPipelineFlush(conn) < 0) ++ goto sendFailed; ++ ++ /* OK, it's launched! */ ++ pqAppendCmdQueueEntry(conn, entry); ++ ++ return 1; ++ ++sendFailed: ++ pqRecycleCmdQueueEntry(conn, entry); ++ /* error message should be set up already */ ++ return 0; ++} +diff --git src/interfaces/libpq/libpq-fe.h src/interfaces/libpq/libpq-fe.h +index 47f25e0..b769b64 100644 +--- src/interfaces/libpq/libpq-fe.h ++++ src/interfaces/libpq/libpq-fe.h +@@ -688,6 +688,9 @@ extern const PGresult *PQgetResultFromPGconn(PGconn *conn); + /* ProxySQL special handler function */ + extern int PShandleRowData(PGconn *conn, bool is_first_packet, PSresult* result); + ++/* Send a pipeline sync message without flushing the send buffer */ ++extern int PQsendPipelineSync(PGconn *conn); ++ + #ifdef __cplusplus + } + #endif diff --git a/lib/PgSQL_Connection.cpp b/lib/PgSQL_Connection.cpp index 2f7b9953b..a77da0f74 100644 --- a/lib/PgSQL_Connection.cpp +++ b/lib/PgSQL_Connection.cpp @@ -1663,8 +1663,7 @@ void PgSQL_Connection::stmt_prepare_start() { return; } } else { - // FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher - if (PQpipelineSync(pgsql_conn) == 0) { + if (PQsendPipelineSync(pgsql_conn) == 0) { set_error_from_PQerrorMessage(); proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str()); return; @@ -1730,8 +1729,7 @@ void PgSQL_Connection::stmt_describe_start() { return; } } else { - // FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher - if (PQpipelineSync(pgsql_conn) == 0) { + if (PQsendPipelineSync(pgsql_conn) == 0) { set_error_from_PQerrorMessage(); proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str()); return; @@ -1755,8 +1753,7 @@ void PgSQL_Connection::resync_start() { PQsetNoticeReceiver(pgsql_conn, &PgSQL_Connection::notice_handler_cb, this); - // FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher - if (PQpipelineSync(pgsql_conn) == 0) { + if (PQsendPipelineSync(pgsql_conn) == 0) { proxy_error("Failed to send pipeline sync.\n"); resync_failed = true; return; @@ -1878,8 +1875,7 @@ void PgSQL_Connection::stmt_execute_start() { return; } } else { - // FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher - if (PQpipelineSync(pgsql_conn) == 0) { + if (PQsendPipelineSync(pgsql_conn) == 0) { set_error_from_PQerrorMessage(); proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str()); return; @@ -1905,8 +1901,7 @@ void PgSQL_Connection::reset_session_start() { reset_session_in_pipeline = is_pipeline_active(); if (reset_session_in_pipeline) { - // FIXME: Switch to PQsendPipelineSync once libpq is updated to version 17 or higher - if (PQpipelineSync(pgsql_conn) == 0) { + if (PQsendPipelineSync(pgsql_conn) == 0) { set_error_from_PQerrorMessage(); proxy_error("Failed to send pipeline sync. %s\n", get_error_code_with_message().c_str()); return;