From 03c08d2e1f83836303d20e5566ab525e8dd07852 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Wed, 14 Aug 2024 12:46:48 +0500 Subject: [PATCH] Introduced a new function in libpq, PShandleRowData, to handle incoming messages from the PostgreSQL backend. This function specifically checks for messages containing row data (indicated by 'D') and processes them efficiently. By providing direct access to the internal libpq buffer, it eliminates the need to allocate new memory for each row of data. --- deps/Makefile | 4 +- include/PgSQL_Connection.h | 7 ++ include/PgSQL_Protocol.h | 2 + lib/PgSQL_Connection.cpp | 136 +++++++++++++++++++++++-------------- lib/PgSQL_Protocol.cpp | 44 +++++++++++- 5 files changed, 140 insertions(+), 53 deletions(-) diff --git a/deps/Makefile b/deps/Makefile index 0e5dc8f5c..5a64d0e0c 100644 --- a/deps/Makefile +++ b/deps/Makefile @@ -307,7 +307,9 @@ postgresql/postgresql/src/interfaces/libpq/libpq.a : libssl/openssl/libssl.a cd postgresql && rm -rf postgresql-*/ || true cd postgresql && tar -zxf postgresql-*.tar.gz cd postgresql/postgresql && patch -p0 < ../get_result_from_pgconn.patch - cd postgresql/postgresql && LD_LIBRARY_PATH="$(shell pwd)/libssl/openssl" ./configure --with-ssl=openssl --with-includes="$(shell pwd)/libssl/openssl/include/" --with-libraries="$(shell pwd)/libssl/openssl/" --without-readline + cd postgresql/postgresql && patch -p0 < ../handle_row_data.patch + #cd postgresql/postgresql && LD_LIBRARY_PATH="$(shell pwd)/libssl/openssl" ./configure --with-ssl=openssl --with-includes="$(shell pwd)/libssl/openssl/include/" --with-libraries="$(shell pwd)/libssl/openssl/" --without-readline --enable-debug CFLAGS="-ggdb -O0 -fno-omit-frame-pointer" CPPFLAGS="-g -O0" + cd postgresql/postgresql && LD_LIBRARY_PATH="$(shell pwd)/libssl/openssl" ./configure --with-ssl=openssl --with-includes="$(shell pwd)/libssl/openssl/include/" --with-libraries="$(shell pwd)/libssl/openssl/" --without-readline cd postgresql/postgresql/src/interfaces/libpq && CC=${CC} CXX=${CXX} ${MAKE} MAKELEVEL=0 #cd postgresql/postgresql && CC=${CC} CXX=${CXX} ${MAKE} -f src/interfaces/libpq/Makefile all diff --git a/include/PgSQL_Connection.h b/include/PgSQL_Connection.h index cb00ab2bf..2f49eb20c 100644 --- a/include/PgSQL_Connection.h +++ b/include/PgSQL_Connection.h @@ -259,6 +259,11 @@ public: char* param_value[PG_PARAM_SIZE]{}; }; +struct Pkt_result { + char type; + PtrSize_t pkt; +}; + class PgSQL_Variable { public: char *value = (char*)""; @@ -599,7 +604,9 @@ public: //PgSQL_Conn_Param conn_params; PgSQL_ErrorInfo error_info; PGconn* pgsql_conn; + uint8_t result_type; PGresult* pgsql_result; + PSresult ps_result; PgSQL_Query_Result* query_result; PgSQL_Query_Result* query_result_reuse; bool new_result; diff --git a/include/PgSQL_Protocol.h b/include/PgSQL_Protocol.h index 8580dea50..abe127fcb 100644 --- a/include/PgSQL_Protocol.h +++ b/include/PgSQL_Protocol.h @@ -198,6 +198,7 @@ public: void init(PgSQL_Protocol* _proto, PgSQL_Data_Stream* _myds, PgSQL_Connection* _conn); unsigned int add_row_description(const PGresult* result); unsigned int add_row(const PGresult* result); + unsigned int add_row(const PSresult* result); unsigned int add_command_completion(const PGresult* result); unsigned int add_error(const PGresult* result); unsigned int add_empty_query_response(const PGresult* result); @@ -261,6 +262,7 @@ public: unsigned int copy_error_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result); unsigned int copy_empty_query_response_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result); unsigned int copy_ready_status_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, PGTransactionStatusType txn_status); + unsigned int copy_buffer_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PSresult* result); private: bool get_header(unsigned char* pkt, unsigned int len, pgsql_hdr* hdr); diff --git a/lib/PgSQL_Connection.cpp b/lib/PgSQL_Connection.cpp index cb2b61a55..460424e9b 100644 --- a/lib/PgSQL_Connection.cpp +++ b/lib/PgSQL_Connection.cpp @@ -1530,6 +1530,7 @@ bool PgSQL_Connection_Placeholder::get_gtid(char *buff, uint64_t *trx_id) { PgSQL_Connection::PgSQL_Connection() { pgsql_conn = NULL; + result_type = 0; pgsql_result = NULL; query_result = NULL; query_result_reuse = NULL; @@ -1738,8 +1739,7 @@ handler_again: if (query_result_reuse == NULL) { query_result = new PgSQL_Query_Result(); query_result->init(&myds->sess->client_myds->myprot, myds, this); - } - else { + } else { query_result = query_result_reuse; query_result_reuse = NULL; query_result->init(&myds->sess->client_myds->myprot, myds, this); @@ -1748,8 +1748,7 @@ handler_again: if (query_result_reuse == NULL) { query_result = new PgSQL_Query_Result(); query_result->init(NULL, myds, this); - } - else { + } else { query_result = query_result_reuse; query_result_reuse = NULL; query_result->init(NULL, myds, this); @@ -1779,20 +1778,20 @@ handler_again: break; } - //PGresult* result = get_result(); - std::unique_ptr result(get_result(), PQclear); + if (result_type == 1) { + std::unique_ptr result(get_result(), PQclear); - if (result) { + if (result) { - const ExecStatusType exec_status_type = PQresultStatus(result.get()); + const ExecStatusType exec_status_type = PQresultStatus(result.get()); - if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR))) { - next_multi_statement_result(result.release()); - next_event(ASYNC_USE_RESULT_START); - break; - } + if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR))) { + next_multi_statement_result(result.release()); + next_event(ASYNC_USE_RESULT_START); + break; + } - switch (exec_status_type) { + switch (exec_status_type) { case PGRES_COMMAND_OK: query_result->add_command_completion(result.get()); NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); @@ -1824,46 +1823,64 @@ handler_again: // handle internal cleanup of libpq that might return residual protocol messages from the broken connection and // may add multiple final packets. //if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR)) == 0) { - set_error_from_result(result.get(), PGSQL_ERROR_FIELD_ALL); - assert(is_error_present()); + set_error_from_result(result.get(), PGSQL_ERROR_FIELD_ALL); + assert(is_error_present()); - // we will not send FATAL error messages to the client - const PGSQL_ERROR_SEVERITY severity = get_error_severity(); - if (severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_ERROR || - severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_WARNING || - severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_NOTICE) { + // we will not send FATAL error messages to the client + const PGSQL_ERROR_SEVERITY severity = get_error_severity(); + if (severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_ERROR || + severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_WARNING || + severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_NOTICE) { - query_result->add_error(result.get()); - } - - const PGSQL_ERROR_CATEGORY error_category = get_error_category(); - if (error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_SYNTAX_ERROR && - error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_STATUS && - error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_DATA_ERROR) { - proxy_error("Error: %s, Multi-Statement: %d\n", get_error_code_with_message().c_str(), processing_multi_statement); - } + query_result->add_error(result.get()); + } + + const PGSQL_ERROR_CATEGORY error_category = get_error_category(); + if (error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_SYNTAX_ERROR && + error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_STATUS && + error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_DATA_ERROR) { + proxy_error("Error: %s, Multi-Statement: %d\n", get_error_code_with_message().c_str(), processing_multi_statement); + } //} NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); - } - - if (new_result == true) { - query_result->add_row_description(result.get()); - new_result = false; - } + } - /*if (state == PGRES_COMMAND_OK || - state == PGRES_EMPTY_QUERY || - state == PGRES_TUPLES_OK) { - new_result = true; - }*/ + if (new_result == true) { + query_result->add_row_description(result.get()); + new_result = false; + } - if (PQntuples(result.get()) > 0) { - unsigned int br = query_result->add_row(result.get()); + if (PQntuples(result.get()) > 0) { + unsigned int br = query_result->add_row(result.get()); + __sync_fetch_and_add(&parent->bytes_recv, br); + myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_recv] += br; + myds->bytes_info.bytes_recv += br; + bytes_info.bytes_recv += br; + processed_bytes += br; // issue #527 : this variable will store the amount of bytes processed during this event + if ( + (processed_bytes > (unsigned int)pgsql_thread___threshold_resultset_size * 8) + || + (pgsql_thread___throttle_ratio_server_to_client && pgsql_thread___throttle_max_bytes_per_second_to_client && (processed_bytes > (unsigned long long)pgsql_thread___throttle_max_bytes_per_second_to_client / 10 * (unsigned long long)pgsql_thread___throttle_ratio_server_to_client)) + ) { + next_event(ASYNC_USE_RESULT_CONT); // we temporarily pause + break; + } else { + NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); // we continue looping + } + } else { + query_result->add_command_completion(result.get()); + NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); + } + } + } else if (result_type == 2) { + if (ps_result.id == 'D') { + unsigned int br = query_result->add_row(&ps_result); __sync_fetch_and_add(&parent->bytes_recv, br); myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_recv] += br; myds->bytes_info.bytes_recv += br; bytes_info.bytes_recv += br; processed_bytes += br; // issue #527 : this variable will store the amount of bytes processed during this event + if ( (processed_bytes > (unsigned int)pgsql_thread___threshold_resultset_size * 8) || @@ -1875,10 +1892,11 @@ handler_again: NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); // we continue looping } } else { - query_result->add_command_completion(result.get()); - NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); + assert(0); } - } + } else { + assert(0); + } if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR)) == 0) { // if we reach here we assume that error_info is already set in previous call @@ -2133,10 +2151,17 @@ void PgSQL_Connection::fetch_result_cont(short event) { if (pgsql_result) return; - // we already have data available in buffer - if (PQisBusy(pgsql_conn) == 0) { - pgsql_result = PQgetResult(pgsql_conn); + int rc = PShandleRowData(pgsql_conn, &ps_result); + if (rc == 0) { + result_type = 2; return; + } else if (rc == 1) { + // we already have data available in buffer + if (PQisBusy(pgsql_conn) == 0) { + result_type = 1; + pgsql_result = PQgetResult(pgsql_conn); + return; + } } if (PQconsumeInput(pgsql_conn) == 0) { @@ -2153,11 +2178,20 @@ void PgSQL_Connection::fetch_result_cont(short event) { return; } - if (PQisBusy(pgsql_conn)) { + rc = PShandleRowData(pgsql_conn, &ps_result); + if (rc == 0) { + result_type = 2; + return; + } else if (rc == 1) { + if (PQisBusy(pgsql_conn)) { + async_exit_status = PG_EVENT_READ; + return; + } + } else { async_exit_status = PG_EVENT_READ; return; } - + result_type = 1; pgsql_result = PQgetResult(pgsql_conn); } diff --git a/lib/PgSQL_Protocol.cpp b/lib/PgSQL_Protocol.cpp index ecc9ac478..ad90165ba 100644 --- a/lib/PgSQL_Protocol.cpp +++ b/lib/PgSQL_Protocol.cpp @@ -1711,6 +1711,41 @@ unsigned int PgSQL_Protocol::copy_ready_status_to_PgSQL_Query_Result(bool send, return size; } +unsigned int PgSQL_Protocol::copy_buffer_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PSresult* result) { + assert(pg_query_result); + assert(result); + assert(result->len); + assert(result->data); + + bool alloced_new_buffer = false; + + const unsigned int size = result->len; + unsigned char* _ptr = pg_query_result->buffer_reserve_space(size); + + // buffer is not enough to store the new row. Remember we have already pushed data to PSarrayOUT + if (_ptr == NULL) { + _ptr = (unsigned char*)l_alloc(size); + alloced_new_buffer = true; + } + + memcpy(_ptr, result->data, size); + + if (send == true) { + // not supported + //(*myds)->PSarrayOUT->add((void*)_ptr, size); + } + + pg_query_result->resultset_size += size; + + if (alloced_new_buffer) { + // we created new buffer + //pg_query_result->buffer_to_PSarrayOut(); + pg_query_result->PSarrayOUT.add(_ptr, size); + } + pg_query_result->pkt_count++; + return size; +} + PgSQL_Query_Result::PgSQL_Query_Result() { buffer = NULL; transfer_started = false; @@ -1763,10 +1798,17 @@ unsigned int PgSQL_Query_Result::add_row_description(const PGresult* result) { } unsigned int PgSQL_Query_Result::add_row(const PGresult* result) { - //result_type |= PGSQL_QUERY_RESULT_TUPLE; + return proto->copy_row_to_PgSQL_Query_Result(false,this, result); } +unsigned int PgSQL_Query_Result::add_row(const PSresult* result) { + + const unsigned int res = proto->copy_buffer_to_PgSQL_Query_Result(false, this, result); + result_packet_type |= PGSQL_QUERY_RESULT_TUPLE; // temporary + return res; +} + unsigned int PgSQL_Query_Result::add_error(const PGresult* result) { unsigned int size = 0;