Introduced a new function in libpq, PShandleRowData, to handle incoming

messages from the PostgreSQL backend. This function specifically checks
for messages containing row data (indicated by 'D') and processes them
efficiently. By providing direct access to the internal libpq buffer, it
eliminates the need to allocate new memory for each row of data.
v2.x_pg_PrepStmtBase_240714
Rahim Kanji 2 years ago
parent c51a746f96
commit 03c08d2e1f

4
deps/Makefile vendored

@ -307,7 +307,9 @@ postgresql/postgresql/src/interfaces/libpq/libpq.a : libssl/openssl/libssl.a
cd postgresql && rm -rf postgresql-*/ || true
cd postgresql && tar -zxf postgresql-*.tar.gz
cd postgresql/postgresql && patch -p0 < ../get_result_from_pgconn.patch
cd postgresql/postgresql && LD_LIBRARY_PATH="$(shell pwd)/libssl/openssl" ./configure --with-ssl=openssl --with-includes="$(shell pwd)/libssl/openssl/include/" --with-libraries="$(shell pwd)/libssl/openssl/" --without-readline
cd postgresql/postgresql && patch -p0 < ../handle_row_data.patch
#cd postgresql/postgresql && LD_LIBRARY_PATH="$(shell pwd)/libssl/openssl" ./configure --with-ssl=openssl --with-includes="$(shell pwd)/libssl/openssl/include/" --with-libraries="$(shell pwd)/libssl/openssl/" --without-readline --enable-debug CFLAGS="-ggdb -O0 -fno-omit-frame-pointer" CPPFLAGS="-g -O0"
cd postgresql/postgresql && LD_LIBRARY_PATH="$(shell pwd)/libssl/openssl" ./configure --with-ssl=openssl --with-includes="$(shell pwd)/libssl/openssl/include/" --with-libraries="$(shell pwd)/libssl/openssl/" --without-readline
cd postgresql/postgresql/src/interfaces/libpq && CC=${CC} CXX=${CXX} ${MAKE} MAKELEVEL=0
#cd postgresql/postgresql && CC=${CC} CXX=${CXX} ${MAKE} -f src/interfaces/libpq/Makefile all

@ -259,6 +259,11 @@ public:
char* param_value[PG_PARAM_SIZE]{};
};
struct Pkt_result {
char type;
PtrSize_t pkt;
};
class PgSQL_Variable {
public:
char *value = (char*)"";
@ -599,7 +604,9 @@ public:
//PgSQL_Conn_Param conn_params;
PgSQL_ErrorInfo error_info;
PGconn* pgsql_conn;
uint8_t result_type;
PGresult* pgsql_result;
PSresult ps_result;
PgSQL_Query_Result* query_result;
PgSQL_Query_Result* query_result_reuse;
bool new_result;

@ -198,6 +198,7 @@ public:
void init(PgSQL_Protocol* _proto, PgSQL_Data_Stream* _myds, PgSQL_Connection* _conn);
unsigned int add_row_description(const PGresult* result);
unsigned int add_row(const PGresult* result);
unsigned int add_row(const PSresult* result);
unsigned int add_command_completion(const PGresult* result);
unsigned int add_error(const PGresult* result);
unsigned int add_empty_query_response(const PGresult* result);
@ -261,6 +262,7 @@ public:
unsigned int copy_error_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result);
unsigned int copy_empty_query_response_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result);
unsigned int copy_ready_status_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, PGTransactionStatusType txn_status);
unsigned int copy_buffer_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PSresult* result);
private:
bool get_header(unsigned char* pkt, unsigned int len, pgsql_hdr* hdr);

@ -1530,6 +1530,7 @@ bool PgSQL_Connection_Placeholder::get_gtid(char *buff, uint64_t *trx_id) {
PgSQL_Connection::PgSQL_Connection() {
pgsql_conn = NULL;
result_type = 0;
pgsql_result = NULL;
query_result = NULL;
query_result_reuse = NULL;
@ -1738,8 +1739,7 @@ handler_again:
if (query_result_reuse == NULL) {
query_result = new PgSQL_Query_Result();
query_result->init(&myds->sess->client_myds->myprot, myds, this);
}
else {
} else {
query_result = query_result_reuse;
query_result_reuse = NULL;
query_result->init(&myds->sess->client_myds->myprot, myds, this);
@ -1748,8 +1748,7 @@ handler_again:
if (query_result_reuse == NULL) {
query_result = new PgSQL_Query_Result();
query_result->init(NULL, myds, this);
}
else {
} else {
query_result = query_result_reuse;
query_result_reuse = NULL;
query_result->init(NULL, myds, this);
@ -1779,20 +1778,20 @@ handler_again:
break;
}
//PGresult* result = get_result();
std::unique_ptr<PGresult, decltype(&PQclear)> result(get_result(), PQclear);
if (result_type == 1) {
std::unique_ptr<PGresult, decltype(&PQclear)> result(get_result(), PQclear);
if (result) {
if (result) {
const ExecStatusType exec_status_type = PQresultStatus(result.get());
const ExecStatusType exec_status_type = PQresultStatus(result.get());
if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR))) {
next_multi_statement_result(result.release());
next_event(ASYNC_USE_RESULT_START);
break;
}
if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR))) {
next_multi_statement_result(result.release());
next_event(ASYNC_USE_RESULT_START);
break;
}
switch (exec_status_type) {
switch (exec_status_type) {
case PGRES_COMMAND_OK:
query_result->add_command_completion(result.get());
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT);
@ -1824,46 +1823,64 @@ handler_again:
// handle internal cleanup of libpq that might return residual protocol messages from the broken connection and
// may add multiple final packets.
//if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR)) == 0) {
set_error_from_result(result.get(), PGSQL_ERROR_FIELD_ALL);
assert(is_error_present());
set_error_from_result(result.get(), PGSQL_ERROR_FIELD_ALL);
assert(is_error_present());
// we will not send FATAL error messages to the client
const PGSQL_ERROR_SEVERITY severity = get_error_severity();
if (severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_ERROR ||
severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_WARNING ||
severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_NOTICE) {
// we will not send FATAL error messages to the client
const PGSQL_ERROR_SEVERITY severity = get_error_severity();
if (severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_ERROR ||
severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_WARNING ||
severity == PGSQL_ERROR_SEVERITY::ERRSEVERITY_NOTICE) {
query_result->add_error(result.get());
}
const PGSQL_ERROR_CATEGORY error_category = get_error_category();
if (error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_SYNTAX_ERROR &&
error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_STATUS &&
error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_DATA_ERROR) {
proxy_error("Error: %s, Multi-Statement: %d\n", get_error_code_with_message().c_str(), processing_multi_statement);
}
query_result->add_error(result.get());
}
const PGSQL_ERROR_CATEGORY error_category = get_error_category();
if (error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_SYNTAX_ERROR &&
error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_STATUS &&
error_category != PGSQL_ERROR_CATEGORY::ERRCATEGORY_DATA_ERROR) {
proxy_error("Error: %s, Multi-Statement: %d\n", get_error_code_with_message().c_str(), processing_multi_statement);
}
//}
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT);
}
if (new_result == true) {
query_result->add_row_description(result.get());
new_result = false;
}
}
/*if (state == PGRES_COMMAND_OK ||
state == PGRES_EMPTY_QUERY ||
state == PGRES_TUPLES_OK) {
new_result = true;
}*/
if (new_result == true) {
query_result->add_row_description(result.get());
new_result = false;
}
if (PQntuples(result.get()) > 0) {
unsigned int br = query_result->add_row(result.get());
if (PQntuples(result.get()) > 0) {
unsigned int br = query_result->add_row(result.get());
__sync_fetch_and_add(&parent->bytes_recv, br);
myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_recv] += br;
myds->bytes_info.bytes_recv += br;
bytes_info.bytes_recv += br;
processed_bytes += br; // issue #527 : this variable will store the amount of bytes processed during this event
if (
(processed_bytes > (unsigned int)pgsql_thread___threshold_resultset_size * 8)
||
(pgsql_thread___throttle_ratio_server_to_client && pgsql_thread___throttle_max_bytes_per_second_to_client && (processed_bytes > (unsigned long long)pgsql_thread___throttle_max_bytes_per_second_to_client / 10 * (unsigned long long)pgsql_thread___throttle_ratio_server_to_client))
) {
next_event(ASYNC_USE_RESULT_CONT); // we temporarily pause
break;
} else {
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); // we continue looping
}
} else {
query_result->add_command_completion(result.get());
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT);
}
}
} else if (result_type == 2) {
if (ps_result.id == 'D') {
unsigned int br = query_result->add_row(&ps_result);
__sync_fetch_and_add(&parent->bytes_recv, br);
myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_recv] += br;
myds->bytes_info.bytes_recv += br;
bytes_info.bytes_recv += br;
processed_bytes += br; // issue #527 : this variable will store the amount of bytes processed during this event
if (
(processed_bytes > (unsigned int)pgsql_thread___threshold_resultset_size * 8)
||
@ -1875,10 +1892,11 @@ handler_again:
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); // we continue looping
}
} else {
query_result->add_command_completion(result.get());
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT);
assert(0);
}
}
} else {
assert(0);
}
if ((query_result->get_result_packet_type() & (PGSQL_QUERY_RESULT_COMMAND | PGSQL_QUERY_RESULT_EMPTY | PGSQL_QUERY_RESULT_ERROR)) == 0) {
// if we reach here we assume that error_info is already set in previous call
@ -2133,10 +2151,17 @@ void PgSQL_Connection::fetch_result_cont(short event) {
if (pgsql_result)
return;
// we already have data available in buffer
if (PQisBusy(pgsql_conn) == 0) {
pgsql_result = PQgetResult(pgsql_conn);
int rc = PShandleRowData(pgsql_conn, &ps_result);
if (rc == 0) {
result_type = 2;
return;
} else if (rc == 1) {
// we already have data available in buffer
if (PQisBusy(pgsql_conn) == 0) {
result_type = 1;
pgsql_result = PQgetResult(pgsql_conn);
return;
}
}
if (PQconsumeInput(pgsql_conn) == 0) {
@ -2153,11 +2178,20 @@ void PgSQL_Connection::fetch_result_cont(short event) {
return;
}
if (PQisBusy(pgsql_conn)) {
rc = PShandleRowData(pgsql_conn, &ps_result);
if (rc == 0) {
result_type = 2;
return;
} else if (rc == 1) {
if (PQisBusy(pgsql_conn)) {
async_exit_status = PG_EVENT_READ;
return;
}
} else {
async_exit_status = PG_EVENT_READ;
return;
}
result_type = 1;
pgsql_result = PQgetResult(pgsql_conn);
}

@ -1711,6 +1711,41 @@ unsigned int PgSQL_Protocol::copy_ready_status_to_PgSQL_Query_Result(bool send,
return size;
}
unsigned int PgSQL_Protocol::copy_buffer_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PSresult* result) {
assert(pg_query_result);
assert(result);
assert(result->len);
assert(result->data);
bool alloced_new_buffer = false;
const unsigned int size = result->len;
unsigned char* _ptr = pg_query_result->buffer_reserve_space(size);
// buffer is not enough to store the new row. Remember we have already pushed data to PSarrayOUT
if (_ptr == NULL) {
_ptr = (unsigned char*)l_alloc(size);
alloced_new_buffer = true;
}
memcpy(_ptr, result->data, size);
if (send == true) {
// not supported
//(*myds)->PSarrayOUT->add((void*)_ptr, size);
}
pg_query_result->resultset_size += size;
if (alloced_new_buffer) {
// we created new buffer
//pg_query_result->buffer_to_PSarrayOut();
pg_query_result->PSarrayOUT.add(_ptr, size);
}
pg_query_result->pkt_count++;
return size;
}
PgSQL_Query_Result::PgSQL_Query_Result() {
buffer = NULL;
transfer_started = false;
@ -1763,10 +1798,17 @@ unsigned int PgSQL_Query_Result::add_row_description(const PGresult* result) {
}
unsigned int PgSQL_Query_Result::add_row(const PGresult* result) {
//result_type |= PGSQL_QUERY_RESULT_TUPLE;
return proto->copy_row_to_PgSQL_Query_Result(false,this, result);
}
unsigned int PgSQL_Query_Result::add_row(const PSresult* result) {
const unsigned int res = proto->copy_buffer_to_PgSQL_Query_Result(false, this, result);
result_packet_type |= PGSQL_QUERY_RESULT_TUPLE; // temporary
return res;
}
unsigned int PgSQL_Query_Result::add_error(const PGresult* result) {
unsigned int size = 0;

Loading…
Cancel
Save