diff --git a/deps/Makefile b/deps/Makefile index 0e5dc8f5c..5a64d0e0c 100644 --- a/deps/Makefile +++ b/deps/Makefile @@ -307,7 +307,9 @@ postgresql/postgresql/src/interfaces/libpq/libpq.a : libssl/openssl/libssl.a cd postgresql && rm -rf postgresql-*/ || true cd postgresql && tar -zxf postgresql-*.tar.gz cd postgresql/postgresql && patch -p0 < ../get_result_from_pgconn.patch - cd postgresql/postgresql && LD_LIBRARY_PATH="$(shell pwd)/libssl/openssl" ./configure --with-ssl=openssl --with-includes="$(shell pwd)/libssl/openssl/include/" --with-libraries="$(shell pwd)/libssl/openssl/" --without-readline + cd postgresql/postgresql && patch -p0 < ../handle_row_data.patch + #cd postgresql/postgresql && LD_LIBRARY_PATH="$(shell pwd)/libssl/openssl" ./configure --with-ssl=openssl --with-includes="$(shell pwd)/libssl/openssl/include/" --with-libraries="$(shell pwd)/libssl/openssl/" --without-readline --enable-debug CFLAGS="-ggdb -O0 -fno-omit-frame-pointer" CPPFLAGS="-g -O0" + cd postgresql/postgresql && LD_LIBRARY_PATH="$(shell pwd)/libssl/openssl" ./configure --with-ssl=openssl --with-includes="$(shell pwd)/libssl/openssl/include/" --with-libraries="$(shell pwd)/libssl/openssl/" --without-readline cd postgresql/postgresql/src/interfaces/libpq && CC=${CC} CXX=${CXX} ${MAKE} MAKELEVEL=0 #cd postgresql/postgresql && CC=${CC} CXX=${CXX} ${MAKE} -f src/interfaces/libpq/Makefile all diff --git a/include/PgSQL_Connection.h b/include/PgSQL_Connection.h index cb00ab2bf..2f49eb20c 100644 --- a/include/PgSQL_Connection.h +++ b/include/PgSQL_Connection.h @@ -259,6 +259,11 @@ public: char* param_value[PG_PARAM_SIZE]{}; }; +struct Pkt_result { + char type; + PtrSize_t pkt; +}; + class PgSQL_Variable { public: char *value = (char*)""; @@ -599,7 +604,9 @@ public: //PgSQL_Conn_Param conn_params; PgSQL_ErrorInfo error_info; PGconn* pgsql_conn; + uint8_t result_type; PGresult* pgsql_result; + PSresult ps_result; PgSQL_Query_Result* query_result; PgSQL_Query_Result* query_result_reuse; bool new_result; diff --git a/include/PgSQL_Protocol.h b/include/PgSQL_Protocol.h index 8580dea50..abe127fcb 100644 --- a/include/PgSQL_Protocol.h +++ b/include/PgSQL_Protocol.h @@ -198,6 +198,7 @@ public: void init(PgSQL_Protocol* _proto, PgSQL_Data_Stream* _myds, PgSQL_Connection* _conn); unsigned int add_row_description(const PGresult* result); unsigned int add_row(const PGresult* result); + unsigned int add_row(const PSresult* result); unsigned int add_command_completion(const PGresult* result); unsigned int add_error(const PGresult* result); unsigned int add_empty_query_response(const PGresult* result); @@ -261,6 +262,7 @@ public: unsigned int copy_error_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result); unsigned int copy_empty_query_response_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result); unsigned int copy_ready_status_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, PGTransactionStatusType txn_status); + unsigned int copy_buffer_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PSresult* result); private: bool get_header(unsigned char* pkt, unsigned int len, pgsql_hdr* hdr); diff --git a/lib/PgSQL_Connection.cpp b/lib/PgSQL_Connection.cpp index cb2b61a55..460424e9b 100644 --- a/lib/PgSQL_Connection.cpp +++ b/lib/PgSQL_Connection.cpp @@ -1530,6 +1530,7 @@ bool PgSQL_Connection_Placeholder::get_gtid(char *buff, uint64_t *trx_id) { PgSQL_Connection::PgSQL_Connection() { pgsql_conn = NULL; + result_type = 0; pgsql_result = NULL; query_result = NULL; query_result_reuse = NULL; @@ -1738,8 +1739,7 @@ handler_again: if (query_result_reuse == NULL) { query_result = new PgSQL_Query_Result(); query_result->init(&myds->sess->client_myds->myprot, myds, this); - } - else { + } else { query_result = query_result_reuse; query_result_reuse = NULL; query_result->init(&myds->sess->client_myds->myprot, myds, this); @@ -1748,8 +1748,7 @@ handler_again: if (query_result_reuse == NULL) { query_result = new PgSQL_Query_Result(); query_result->init(NULL, myds, this); - } - else { + } else { query_result = query_result_reuse; query_result_reuse = NULL; query_result->init(NULL, myds, this); @@ -1779,20 +1778,20 @@ handler_again: break; } - //PGresult* result = get_result(); - std::unique_ptr result(get_result(), PQclear); + if (result_type == 1) { + std::unique_ptr result(get_result(), PQclear); - if (result) { + if (result) { - const ExecStatusType exec_status_type = PQresultStatus(result.get()); + const ExecStatusType exec_status_type = PQresultStatus(result.get()); - if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR))) { - next_multi_statement_result(result.release()); - next_event(ASYNC_USE_RESULT_START); - break; - } + if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR))) { + next_multi_statement_result(result.release()); + next_event(ASYNC_USE_RESULT_START); + break; + } - switch (exec_status_type) { + switch (exec_status_type) { case PGRES_COMMAND_OK: query_result->add_command_completion(result.get()); NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); @@ -1824,46 +1823,64 @@ handler_again: // handle internal cleanup of libpq that might return residual protocol messages from the broken connection and // may add multiple final packets. //if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR)) == 0) { - set_error_from_result(result.get(), PGSQL_ERROR_FIELD_ALL); - assert(is_error_present()); + set_error_from_result(result.get(), PGSQL_ERROR_FIELD_ALL); + assert(is_error_present()); - // we will not send FATAL error messages to the client - const PGSQL_ERROR_SEVERITY severity = get_error_severity(); - if (severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_ERROR || - severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_WARNING || - severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_NOTICE) { + // we will not send FATAL error messages to the client + const PGSQL_ERROR_SEVERITY severity = get_error_severity(); + if (severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_ERROR || + severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_WARNING || + severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_NOTICE) { - query_result->add_error(result.get()); - } - - const PGSQL_ERROR_CATEGORY error_category = get_error_category(); - if (error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_SYNTAX_ERROR && - error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_STATUS && - error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_DATA_ERROR) { - proxy_error("Error: %s, Multi-Statement: %d\n", get_error_code_with_message().c_str(), processing_multi_statement); - } + query_result->add_error(result.get()); + } + + const PGSQL_ERROR_CATEGORY error_category = get_error_category(); + if (error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_SYNTAX_ERROR && + error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_STATUS && + error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_DATA_ERROR) { + proxy_error("Error: %s, Multi-Statement: %d\n", get_error_code_with_message().c_str(), processing_multi_statement); + } //} NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); - } - - if (new_result == true) { - query_result->add_row_description(result.get()); - new_result = false; - } + } - /*if (state == PGRES_COMMAND_OK || - state == PGRES_EMPTY_QUERY || - state == PGRES_TUPLES_OK) { - new_result = true; - }*/ + if (new_result == true) { + query_result->add_row_description(result.get()); + new_result = false; + } - if (PQntuples(result.get()) > 0) { - unsigned int br = query_result->add_row(result.get()); + if (PQntuples(result.get()) > 0) { + unsigned int br = query_result->add_row(result.get()); + __sync_fetch_and_add(&parent->bytes_recv, br); + myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_recv] += br; + myds->bytes_info.bytes_recv += br; + bytes_info.bytes_recv += br; + processed_bytes += br; // issue #527 : this variable will store the amount of bytes processed during this event + if ( + (processed_bytes > (unsigned int)pgsql_thread___threshold_resultset_size * 8) + || + (pgsql_thread___throttle_ratio_server_to_client && pgsql_thread___throttle_max_bytes_per_second_to_client && (processed_bytes > (unsigned long long)pgsql_thread___throttle_max_bytes_per_second_to_client / 10 * (unsigned long long)pgsql_thread___throttle_ratio_server_to_client)) + ) { + next_event(ASYNC_USE_RESULT_CONT); // we temporarily pause + break; + } else { + NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); // we continue looping + } + } else { + query_result->add_command_completion(result.get()); + NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); + } + } + } else if (result_type == 2) { + if (ps_result.id == 'D') { + unsigned int br = query_result->add_row(&ps_result); __sync_fetch_and_add(&parent->bytes_recv, br); myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_recv] += br; myds->bytes_info.bytes_recv += br; bytes_info.bytes_recv += br; processed_bytes += br; // issue #527 : this variable will store the amount of bytes processed during this event + if ( (processed_bytes > (unsigned int)pgsql_thread___threshold_resultset_size * 8) || @@ -1875,10 +1892,11 @@ handler_again: NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); // we continue looping } } else { - query_result->add_command_completion(result.get()); - NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); + assert(0); } - } + } else { + assert(0); + } if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR)) == 0) { // if we reach here we assume that error_info is already set in previous call @@ -2133,10 +2151,17 @@ void PgSQL_Connection::fetch_result_cont(short event) { if (pgsql_result) return; - // we already have data available in buffer - if (PQisBusy(pgsql_conn) == 0) { - pgsql_result = PQgetResult(pgsql_conn); + int rc = PShandleRowData(pgsql_conn, &ps_result); + if (rc == 0) { + result_type = 2; return; + } else if (rc == 1) { + // we already have data available in buffer + if (PQisBusy(pgsql_conn) == 0) { + result_type = 1; + pgsql_result = PQgetResult(pgsql_conn); + return; + } } if (PQconsumeInput(pgsql_conn) == 0) { @@ -2153,11 +2178,20 @@ void PgSQL_Connection::fetch_result_cont(short event) { return; } - if (PQisBusy(pgsql_conn)) { + rc = PShandleRowData(pgsql_conn, &ps_result); + if (rc == 0) { + result_type = 2; + return; + } else if (rc == 1) { + if (PQisBusy(pgsql_conn)) { + async_exit_status = PG_EVENT_READ; + return; + } + } else { async_exit_status = PG_EVENT_READ; return; } - + result_type = 1; pgsql_result = PQgetResult(pgsql_conn); } diff --git a/lib/PgSQL_Protocol.cpp b/lib/PgSQL_Protocol.cpp index ecc9ac478..ad90165ba 100644 --- a/lib/PgSQL_Protocol.cpp +++ b/lib/PgSQL_Protocol.cpp @@ -1711,6 +1711,41 @@ unsigned int PgSQL_Protocol::copy_ready_status_to_PgSQL_Query_Result(bool send, return size; } +unsigned int PgSQL_Protocol::copy_buffer_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PSresult* result) { + assert(pg_query_result); + assert(result); + assert(result->len); + assert(result->data); + + bool alloced_new_buffer = false; + + const unsigned int size = result->len; + unsigned char* _ptr = pg_query_result->buffer_reserve_space(size); + + // buffer is not enough to store the new row. Remember we have already pushed data to PSarrayOUT + if (_ptr == NULL) { + _ptr = (unsigned char*)l_alloc(size); + alloced_new_buffer = true; + } + + memcpy(_ptr, result->data, size); + + if (send == true) { + // not supported + //(*myds)->PSarrayOUT->add((void*)_ptr, size); + } + + pg_query_result->resultset_size += size; + + if (alloced_new_buffer) { + // we created new buffer + //pg_query_result->buffer_to_PSarrayOut(); + pg_query_result->PSarrayOUT.add(_ptr, size); + } + pg_query_result->pkt_count++; + return size; +} + PgSQL_Query_Result::PgSQL_Query_Result() { buffer = NULL; transfer_started = false; @@ -1763,10 +1798,17 @@ unsigned int PgSQL_Query_Result::add_row_description(const PGresult* result) { } unsigned int PgSQL_Query_Result::add_row(const PGresult* result) { - //result_type |= PGSQL_QUERY_RESULT_TUPLE; + return proto->copy_row_to_PgSQL_Query_Result(false,this, result); } +unsigned int PgSQL_Query_Result::add_row(const PSresult* result) { + + const unsigned int res = proto->copy_buffer_to_PgSQL_Query_Result(false, this, result); + result_packet_type |= PGSQL_QUERY_RESULT_TUPLE; // temporary + return res; +} + unsigned int PgSQL_Query_Result::add_error(const PGresult* result) { unsigned int size = 0;