Moved dynamic fast-forward logic (used for COPY ... FROM STDIN) to execute after session variables are set on the backend connection

pull/4875/head
Rahim Kanji 1 year ago
parent 8f51df70b7
commit fa220398f6

@ -56,6 +56,12 @@ public:
pkt.ptr = NULL;
QueryPtr = NULL;
}
void reset() {
pkt.size = 0;
QuerySize = 0;
pkt.ptr = NULL;
QueryPtr = NULL;
}
};
enum pgsql_sslstatus { PGSQL_SSLSTATUS_OK, PGSQL_SSLSTATUS_WANT_IO, PGSQL_SSLSTATUS_FAIL };

@ -2557,13 +2557,6 @@ __get_pkts_from_client:
}
}
// Swtich to fast forward mode if the query matches copy ... stdin command
re2::StringPiece matched;
const char* query_to_match = (CurrentQuery.get_digest_text() ? CurrentQuery.get_digest_text() : (char*)CurrentQuery.QueryPointer);
if (copy_cmd_matcher->match(query_to_match, &matched)) {
switch_normal_to_fast_forward_mode(pkt, std::string(matched.data(), matched.size()), SESSION_FORWARD_TYPE_COPY_FROM_STDIN_STDOUT);
break;
}
mybe = find_or_create_backend(current_hostgroup);
status = PROCESSING_QUERY;
// set query retries
@ -2588,8 +2581,7 @@ __get_pkts_from_client:
}
}
proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Received query to be processed with MariaDB Client library\n");
proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Received query to be processed...\n");
mybe->server_myds->killed_at = 0;
mybe->server_myds->kill_type = 0;
mybe->server_myds->mysql_real_query.init(&pkt);
@ -3377,7 +3369,13 @@ handler_again:
}*/
}
}
// Swtich to fast forward mode if the query matches copy ... stdin command
re2::StringPiece matched;
const char* query_to_match = (CurrentQuery.get_digest_text() ? CurrentQuery.get_digest_text() : (char*)CurrentQuery.QueryPointer);
if (copy_cmd_matcher->match(query_to_match, &matched)) {
switch_normal_to_fast_forward_mode(pkt, std::string(matched.data(), matched.size()), SESSION_FORWARD_TYPE_COPY_FROM_STDIN_STDOUT);
break;
}
if (myconn->async_state_machine == ASYNC_IDLE) {
SetQueryTimeout();
}
@ -5966,59 +5964,45 @@ void PgSQL_Session::switch_normal_to_fast_forward_mode(PtrSize_t& pkt, std::stri
proxy_error("UNEXPECTED PACKET FROM CLIENT -- PLEASE REPORT A BUG\n");
assert(0);
}
client_myds->PSarrayIN->add(pkt.ptr, pkt.size);
// current_hostgroup should already be set to the correct hostgroup
mybe = find_or_create_backend(current_hostgroup); // set a backend
mybe->server_myds->reinit_queues(); // reinitialize the queues in the myds . By default, they are not active
// We reinitialize the 'wait_until' since this session shouldn't wait for processing as
// we are now transitioning to 'FAST_FORWARD'.
mybe->server_myds->wait_until = 0;
if (mybe->server_myds->DSS == STATE_NOT_INITIALIZED) {
// NOTE: This section is entirely borrowed from 'STATE_SLEEP' for 'session_fast_forward'.
// Check comments there for extra information.
// =============================================================================
if (mybe->server_myds->max_connect_time == 0) {
uint64_t connect_timeout =
pgsql_thread___connect_timeout_server < pgsql_thread___connect_timeout_server_max ?
pgsql_thread___connect_timeout_server_max : pgsql_thread___connect_timeout_server;
mybe->server_myds->max_connect_time = thread->curtime + connect_timeout * 1000;
}
mybe->server_myds->connect_retries_on_failure = pgsql_thread___connect_retries_on_failure;
CurrentQuery.start_time = thread->curtime;
// =============================================================================
// we don't have a connection
previous_status.push(FAST_FORWARD); // next status will be FAST_FORWARD
set_status(CONNECTING_SERVER); // now we need a connection
} else {
// In case of having a connection, we need to make user to reset the state machine
// for current server 'PgSQL_Data_Stream'
mybe->server_myds->DSS = STATE_READY;
// myds needs to have encrypted value set correctly
assert(mybe->server_myds->DSS != STATE_NOT_INITIALIZED);
// In case of having a connection, we need to make user to reset the state machine
// for current server 'PgSQL_Data_Stream'
mybe->server_myds->DSS = STATE_READY;
// myds needs to have encrypted value set correctly
PgSQL_Data_Stream* myds = mybe->server_myds;
PgSQL_Connection* myconn = myds->myconn;
assert(myconn != NULL);
// if backend connection uses SSL we will set
// encrypted = true and we will start using the SSL structure
// directly from PGconn SSL structure.
if (myconn->is_connected() && myconn->get_pg_ssl_in_use()) {
SSL* ssl_obj = myconn->get_pg_ssl_object();
if (ssl_obj != NULL) {
myds->encrypted = true;
myds->ssl = ssl_obj;
myds->rbio_ssl = BIO_new(BIO_s_mem());
myds->wbio_ssl = BIO_new(BIO_s_mem());
SSL_set_bio(myds->ssl, myds->rbio_ssl, myds->wbio_ssl);
} else {
// it means that ProxySQL tried to use SSL to connect to the backend
// but the backend didn't support SSL
}
PgSQL_Data_Stream* myds = mybe->server_myds;
PgSQL_Connection* myconn = myds->myconn;
assert(myconn != NULL);
// if backend connection uses SSL we will set
// encrypted = true and we will start using the SSL structure
// directly from PGconn SSL structure.
if (myconn->is_connected() && myconn->get_pg_ssl_in_use()) {
SSL* ssl_obj = myconn->get_pg_ssl_object();
if (ssl_obj != NULL) {
myds->encrypted = true;
myds->ssl = ssl_obj;
myds->rbio_ssl = BIO_new(BIO_s_mem());
myds->wbio_ssl = BIO_new(BIO_s_mem());
SSL_set_bio(myds->ssl, myds->rbio_ssl, myds->wbio_ssl);
} else {
// it means that ProxySQL tried to use SSL to connect to the backend
// but the backend didn't support SSL
}
set_status(FAST_FORWARD); // we can set status to FAST_FORWARD
}
set_status(FAST_FORWARD); // we can set status to FAST_FORWARD
//client_myds->PSarrayIN->add(pkt.ptr, pkt.size);
mybe->server_myds->PSarrayOUT->add(pkt.ptr, pkt.size);
// as we are in FAST_FORWARD mode, we directly send the packet to the backend.
// need to reset mysql_real_query
mybe->server_myds->mysql_real_query.reset();
}
void PgSQL_Session::switch_fast_forward_to_normal_mode() {

Loading…
Cancel
Save