Reset Session Implementation

* Added Reset Session support
* Removed dead code
v2.x_pg_PrepStmtBase_240714
Rahim Kanji 2 years ago
parent 0331ca7ff7
commit a22aeca394

@ -179,6 +179,8 @@ static const Param_Name_Validation* PgSQL_Param_Name_Accepted_Values[PG_PARAM_SI
#define PG_EVENT_NONE 0x00
#define PG_EVENT_READ 0x01
#define PG_EVENT_WRITE 0x02
#define PG_EVENT_EXCEPT 0x04
#define PG_EVENT_TIMEOUT 0x08
class PgSQL_Conn_Param {
private:
@ -507,10 +509,15 @@ public:
void query_cont(short event);
void fetch_result_start();
void fetch_result_cont(short event);
void reset_session_start();
void reset_session_cont(short event);
int async_connect(short event);
int async_set_autocommit(short event, bool ac);
int async_query(short event, char* stmt, unsigned long length, MYSQL_STMT** _stmt = NULL, stmt_execute_metadata_t* _stmt_meta = NULL);
int async_ping(short event);
int async_reset_session(short event);
void next_event(PG_ASYNC_ST new_st);
bool IsAutoCommit();
bool is_connected() const;
@ -599,6 +606,8 @@ public:
void reset_error() { reset_error_info(error_info, false); }
bool reset_session_in_txn = false;
PGresult* get_result();
void next_multi_statement_result(PGresult* result);
bool set_single_row_mode();

@ -118,6 +118,12 @@ enum ASYNC_ST { // MariaDB Async State Machine
ASYNC_CLOSE_START,
ASYNC_CLOSE_CONT,
ASYNC_CLOSE_END,
ASYNC_RESET_SESSION_START,
ASYNC_RESET_SESSION_CONT,
ASYNC_RESET_SESSION_END,
ASYNC_RESET_SESSION_SUCCESSFUL,
ASYNC_RESET_SESSION_FAILED,
ASYNC_RESET_SESSION_TIMEOUT,
ASYNC_IDLE
};

@ -3324,659 +3324,55 @@ handler_again:
// should be NULL
assert(!pgsql_result);
break;
/* case ASYNC_CHANGE_USER_START:
change_user_start();
case ASYNC_RESET_SESSION_START:
reset_session_start();
if (async_exit_status) {
next_event(ASYNC_CHANGE_USER_CONT);
}
else {
NEXT_IMMEDIATE(ASYNC_CHANGE_USER_END);
}
break;
case ASYNC_CHANGE_USER_CONT:
assert(myds->sess->status == CHANGING_USER_SERVER || myds->sess->status == RESETTING_CONNECTION);
change_user_cont(event);
if (async_exit_status) {
if (myds->sess->thread->curtime >= myds->wait_until) {
NEXT_IMMEDIATE(ASYNC_CHANGE_USER_TIMEOUT);
}
else {
next_event(ASYNC_CHANGE_USER_CONT);
}
}
else {
NEXT_IMMEDIATE(ASYNC_CHANGE_USER_END);
}
break;
case ASYNC_CHANGE_USER_END:
if (ret_bool) {
NEXT_IMMEDIATE(ASYNC_CHANGE_USER_FAILED);
}
else {
NEXT_IMMEDIATE(ASYNC_CHANGE_USER_SUCCESSFUL);
}
break;
case ASYNC_CHANGE_USER_SUCCESSFUL:
pgsql->server_status = SERVER_STATUS_AUTOCOMMIT; // we reset this due to bug https://jira.mariadb.org/browse/CONC-332
break;
case ASYNC_CHANGE_USER_FAILED:
break;
case ASYNC_CHANGE_USER_TIMEOUT:
break;
case ASYNC_PING_START:
ping_start();
if (async_exit_status) {
next_event(ASYNC_PING_CONT);
}
else {
NEXT_IMMEDIATE(ASYNC_PING_END);
}
break;
case ASYNC_PING_CONT:
assert(myds->sess->status == PINGING_SERVER);
if (event) {
ping_cont(event);
}
if (async_exit_status) {
if (myds->sess->thread->curtime >= myds->wait_until) {
NEXT_IMMEDIATE(ASYNC_PING_TIMEOUT);
}
else {
next_event(ASYNC_PING_CONT);
}
}
else {
NEXT_IMMEDIATE(ASYNC_PING_END);
}
break;
case ASYNC_PING_END:
if (interr) {
NEXT_IMMEDIATE(ASYNC_PING_FAILED);
}
else {
NEXT_IMMEDIATE(ASYNC_PING_SUCCESSFUL);
}
break;
case ASYNC_PING_SUCCESSFUL:
break;
case ASYNC_PING_FAILED:
break;
case ASYNC_PING_TIMEOUT:
break;
case ASYNC_QUERY_START:
real_query_start();
__sync_fetch_and_add(&parent->queries_sent, 1);
__sync_fetch_and_add(&parent->bytes_sent, query.length);
statuses.questions++;
myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_sent] += query.length;
myds->bytes_info.bytes_sent += query.length;
bytes_info.bytes_sent += query.length;
if (myds->sess->with_gtid == true) {
__sync_fetch_and_add(&parent->queries_gtid_sync, 1);
}
if (async_exit_status) {
next_event(ASYNC_QUERY_CONT);
}
else {
#ifdef PROXYSQL_USE_RESULT
NEXT_IMMEDIATE(ASYNC_USE_RESULT_START);
#else
NEXT_IMMEDIATE(ASYNC_STORE_RESULT_START);
#endif
}
break;
case ASYNC_QUERY_CONT:
real_query_cont(event);
if (async_exit_status) {
next_event(ASYNC_QUERY_CONT);
}
else {
#ifdef PROXYSQL_USE_RESULT
NEXT_IMMEDIATE(ASYNC_USE_RESULT_START);
#else
NEXT_IMMEDIATE(ASYNC_STORE_RESULT_START);
#endif
}
break;
case ASYNC_STMT_PREPARE_START:
stmt_prepare_start();
__sync_fetch_and_add(&parent->queries_sent, 1);
__sync_fetch_and_add(&parent->bytes_sent, query.length);
myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_sent] += query.length;
myds->bytes_info.bytes_sent += query.length;
bytes_info.bytes_sent += query.length;
if (async_exit_status) {
next_event(ASYNC_STMT_PREPARE_CONT);
}
else {
NEXT_IMMEDIATE(ASYNC_STMT_PREPARE_END);
}
break;
case ASYNC_STMT_PREPARE_CONT:
stmt_prepare_cont(event);
if (async_exit_status) {
next_event(ASYNC_STMT_PREPARE_CONT);
}
else {
NEXT_IMMEDIATE(ASYNC_STMT_PREPARE_END);
}
break;
case ASYNC_STMT_PREPARE_END:
if (interr) {
NEXT_IMMEDIATE(ASYNC_STMT_PREPARE_FAILED);
}
else {
NEXT_IMMEDIATE(ASYNC_STMT_PREPARE_SUCCESSFUL);
}
break;
case ASYNC_STMT_PREPARE_SUCCESSFUL:
break;
case ASYNC_STMT_PREPARE_FAILED:
break;
case ASYNC_STMT_EXECUTE_START:
PROXY_TRACE2();
stmt_execute_start();
__sync_fetch_and_add(&parent->queries_sent, 1);
__sync_fetch_and_add(&parent->bytes_sent, query.stmt_meta->size);
myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_sent] += query.stmt_meta->size;
myds->bytes_info.bytes_sent += query.stmt_meta->size;
bytes_info.bytes_sent += query.stmt_meta->size;
if (async_exit_status) {
next_event(ASYNC_STMT_EXECUTE_CONT);
}
else {
NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_STORE_RESULT_START);
}
break;
case ASYNC_STMT_EXECUTE_CONT:
PROXY_TRACE2();
stmt_execute_cont(event);
if (async_exit_status) {
next_event(ASYNC_STMT_EXECUTE_CONT);
}
else {
NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_STORE_RESULT_START);
}
break;
case ASYNC_STMT_EXECUTE_STORE_RESULT_START:
PROXY_TRACE2();
if (mysql_stmt_errno(query.stmt)) {
NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END);
}
{
query.stmt_result = mysql_stmt_result_metadata(query.stmt);
if (query.stmt_result == NULL) {
NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END);
}
else {
update_warning_count_from_statement();
if (myds->sess->mirror == false) {
if (query_result_reuse == NULL) {
query_result = new MySQL_ResultSet();
query_result->init(&myds->sess->client_myds->myprot, query.stmt_result, pgsql, query.stmt);
}
else {
query_result = query_result_reuse;
query_result_reuse = NULL;
query_result->init(&myds->sess->client_myds->myprot, query.stmt_result, pgsql, query.stmt);
}
}
else {
}
//async_fetch_row_start=false;
}
}
stmt_execute_store_result_start();
if (async_exit_status) {
next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT);
}
else {
NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END);
}
break;
case ASYNC_STMT_EXECUTE_STORE_RESULT_CONT:
PROXY_TRACE2();
{ // this copied mostly from ASYNC_USE_RESULT_CONT
if (myds->sess && myds->sess->client_myds && myds->sess->mirror == false) {
unsigned int buffered_data = 0;
buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN;
buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN;
if (buffered_data > (unsigned int)pgsql_thread___threshold_resultset_size * 8) {
next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); // we temporarily pause . See #1232
break;
}
}
}
stmt_execute_store_result_cont(event);
//if (async_fetch_row_start==false) {
// async_fetch_row_start=true;
//}
if (async_exit_status) {
// this copied mostly from ASYNC_USE_RESULT_CONT
MYSQL_ROWS* r = query.stmt->result.data;
long long unsigned int rows_read_inner = 0;
if (r) {
rows_read_inner++;
while (rows_read_inner < query.stmt->result.rows) {
// it is very important to check rows_read_inner FIRST
// because r->next could point to an invalid memory
rows_read_inner++;
r = r->next;
}
if (rows_read_inner > 1) {
process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT(processed_bytes);
if (
(processed_bytes > (unsigned int)pgsql_thread___threshold_resultset_size * 8)
||
(pgsql_thread___throttle_max_bytes_per_second_to_client && pgsql_thread___throttle_max_bytes_per_second_to_client && (processed_bytes > (unsigned long long)mysql_thread___throttle_max_bytes_per_second_to_client / 10 * (unsigned long long)pgsql_thread___throttle_ratio_server_to_client))
) {
next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); // we temporarily pause
}
else {
NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); // we continue looping
}
}
}
next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT);
}
else {
NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END);
}
break;
case ASYNC_STMT_EXECUTE_END:
PROXY_TRACE2();
{
if (query.stmt_result) {
unsigned long long total_size = 0;
MYSQL_ROWS* r = query.stmt->result.data;
if (r) {
total_size += r->length;
if (r->length > 0xFFFFFF) {
total_size += (r->length / 0xFFFFFF) * sizeof(mysql_hdr);
}
total_size += sizeof(mysql_hdr);
while (r->next) {
r = r->next;
total_size += r->length;
if (r->length > 0xFFFFFF) {
total_size += (r->length / 0xFFFFFF) * sizeof(mysql_hdr);
}
total_size += sizeof(mysql_hdr);
}
}
__sync_fetch_and_add(&parent->bytes_recv, total_size);
myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_recv] += total_size;
myds->bytes_info.bytes_recv += total_size;
bytes_info.bytes_recv += total_size;
next_event(ASYNC_RESET_SESSION_CONT);
} else {
if (is_error_present()) {
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_END);
}
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_CONT);
}
update_warning_count_from_statement();
break;
// case ASYNC_STMT_EXECUTE_SUCCESSFUL:
// break;
// case ASYNC_STMT_EXECUTE_FAILED:
// break;
case ASYNC_NEXT_RESULT_START:
async_exit_status = mysql_next_result_start(&interr, pgsql);
if (async_exit_status) {
next_event(ASYNC_NEXT_RESULT_CONT);
}
else {
#ifdef PROXYSQL_USE_RESULT
NEXT_IMMEDIATE(ASYNC_USE_RESULT_START);
#else
NEXT_IMMEDIATE(ASYNC_STORE_RESULT_START);
#endif
}
break;
case ASYNC_NEXT_RESULT_CONT:
if (event) {
async_exit_status = mysql_next_result_cont(&interr, pgsql, mysql_status(event, true));
}
if (async_exit_status) {
next_event(ASYNC_NEXT_RESULT_CONT);
}
else {
#ifdef PROXYSQL_USE_RESULT
NEXT_IMMEDIATE(ASYNC_USE_RESULT_START);
#else
NEXT_IMMEDIATE(ASYNC_STORE_RESULT_START);
#endif
}
break;
case ASYNC_NEXT_RESULT_END:
break;
#ifndef PROXYSQL_USE_RESULT
case ASYNC_STORE_RESULT_START:
if (mysql_errno(pgsql)) {
NEXT_IMMEDIATE(ASYNC_QUERY_END);
}
store_result_start();
if (async_exit_status) {
next_event(ASYNC_STORE_RESULT_CONT);
}
else {
NEXT_IMMEDIATE(ASYNC_QUERY_END);
}
break;
case ASYNC_STORE_RESULT_CONT:
store_result_cont(event);
case ASYNC_RESET_SESSION_CONT:
{
reset_session_cont(event);
if (async_exit_status) {
next_event(ASYNC_STORE_RESULT_CONT);
}
else {
NEXT_IMMEDIATE(ASYNC_QUERY_END);
}
break;
#endif // PROXYSQL_USE_RESULT
case ASYNC_USE_RESULT_START:
if (mysql_errno(pgsql)) {
NEXT_IMMEDIATE(ASYNC_QUERY_END);
}
mysql_result = mysql_use_result(pgsql);
if (mysql_result == NULL) {
NEXT_IMMEDIATE(ASYNC_QUERY_END);
next_event(ASYNC_RESET_SESSION_CONT);
break;
}
else {
// since 'add_eof' utilizes 'warning_count,' we are setting the 'warning_count' here
// Note: There is a possibility of obtaining inaccurate warning_count and server_status at this point
// if the backend server has CLIENT_DEPRECATE_EOF enabled, and the client does not support CLIENT_DEPRECATE_EOF,
// especially when the query generates a warning. This information will be included in the intermediate EOF packet.
// Correct information becomes available only after fetching all rows,
// and the warning_count and status flag details are extracted from the final OK packet.
update_warning_count_from_connection();
if (myds->sess->mirror == false) {
if (query_result_reuse == NULL) {
query_result = new MySQL_ResultSet();
query_result->init(&myds->sess->client_myds->myprot, mysql_result, pgsql);
}
else {
query_result = query_result_reuse;
query_result_reuse = NULL;
query_result->init(&myds->sess->client_myds->myprot, mysql_result, pgsql);
}
}
else {
if (query_result_reuse == NULL) {
query_result = new MySQL_ResultSet();
query_result->init(NULL, mysql_result, pgsql);
}
else {
query_result = query_result_reuse;
query_result_reuse = NULL;
query_result->init(NULL, mysql_result, pgsql);
}
}
async_fetch_row_start = false;
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT);
if (is_error_present()) {
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_END);
}
break;
case ASYNC_USE_RESULT_CONT:
{
if (myds->sess && myds->sess->client_myds && myds->sess->mirror == false &&
myds->sess->status != SHOW_WARNINGS) { // see issue#4072
unsigned int buffered_data = 0;
buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN;
buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN;
if (buffered_data > (unsigned int)pgsql_thread___threshold_resultset_size * 8) {
next_event(ASYNC_USE_RESULT_CONT); // we temporarily pause . See #1232
break;
PGresult* result = get_result();
if (result) {
if (PQresultStatus(result) != PGRES_COMMAND_OK) {
set_error_from_result(result, PGSQL_ERROR_FIELD_ALL);
assert(is_error_present());
}
PQclear(result);
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_CONT);
}
}
if (async_fetch_row_start == false) {
async_exit_status = mysql_fetch_row_start(&mysql_row, mysql_result);
async_fetch_row_start = true;
}
else {
async_exit_status = mysql_fetch_row_cont(&mysql_row, mysql_result, mysql_status(event, true));
}
if (async_exit_status) {
next_event(ASYNC_USE_RESULT_CONT);
}
else {
async_fetch_row_start = false;
if (mysql_row) {
if (myds && myds->sess && myds->sess->status == SHOW_WARNINGS) {
if (pgsql_thread___verbose_query_error) {
PgSQL_Data_Stream* client_myds = myds->sess->client_myds;
const char* username = "";
const char* schema = "";
const char* client_addr = "";
const char* digest_text = myds->sess->CurrentQuery.show_warnings_prev_query_digest.c_str();
if (client_myds) {
client_addr = client_myds->addr.addr ? client_myds->addr.addr : (char*)"unknown";
if (client_myds->myconn && client_myds->myconn->userinfo) {
username = client_myds->myconn->userinfo->username;
schema = client_myds->myconn->userinfo->schemaname;
}
}
proxy_warning(
"Warning during query on (%d,%s,%d,%lu). User '%s@%s', schema '%s', digest_text '%s', level '%s', code '%s', message '%s'\n",
parent->myhgc->hid, parent->address, parent->port, get_mysql_thread_id(), username, client_addr,
schema, digest_text, mysql_row[0], mysql_row[1], mysql_row[2]
);
}
else {
proxy_warning(
"Warning during query on (%d,%s,%d,%lu). Level '%s', code '%s', message '%s'\n",
parent->myhgc->hid, parent->address, parent->port, get_mysql_thread_id(), mysql_row[0], mysql_row[1],
mysql_row[2]
);
}
}
unsigned int br = query_result->add_row(mysql_row);
__sync_fetch_and_add(&parent->bytes_recv, br);
myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_recv] += br;
myds->bytes_info.bytes_recv += br;
bytes_info.bytes_recv += br;
processed_bytes += br; // issue #527 : this variable will store the amount of bytes processed during this event
if (
(processed_bytes > (unsigned int)pgsql_thread___threshold_resultset_size * 8)
||
(pgsql_thread___throttle_max_bytes_per_second_to_client && pgsql_thread___throttle_max_bytes_per_second_to_client && (processed_bytes > (unsigned long long)mysql_thread___throttle_max_bytes_per_second_to_client / 10 * (unsigned long long)pgsql_thread___throttle_ratio_server_to_client))
) {
next_event(ASYNC_USE_RESULT_CONT); // we temporarily pause
}
else {
NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); // we continue looping
}
}
else {
if (pgsql) {
int _myerrno = mysql_errno(pgsql);
if (_myerrno) {
if (myds) {
//-- query_result->add_err(myds);
NEXT_IMMEDIATE(ASYNC_QUERY_END);
}
}
}
// since 'add_eof' utilizes 'warning_count,' we are setting the 'warning_count' here
update_warning_count_from_connection();
// we reach here if there was no error
// exclude warning_count from the OK/EOF packet for the SHOW WARNINGS statement
query_result->add_eof(query.length == 13 && strncasecmp(query.ptr, "SHOW WARNINGS", 13) == 0);
NEXT_IMMEDIATE(ASYNC_QUERY_END);
}
if (reset_session_in_txn) {
//assert(IsKnownActiveTransaction() == false);
reset_session_in_txn = false;
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_START);
}
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_END);
}
break;
case ASYNC_QUERY_END:
PROXY_TRACE2();
if (pgsql) {
int _myerrno = mysql_errno(pgsql);
if (_myerrno == 0) {
unknown_transaction_status = false;
update_warning_count_from_connection();
}
else {
compute_unknown_transaction_status();
}
if (_myerrno < 2000) {
// we can continue only if the error is coming from the backend.
// (or if zero)
// if the error comes from the client library, something terribly
// wrong happened and we cannot continue
if (pgsql->server_status & SERVER_MORE_RESULTS_EXIST) {
async_state_machine = ASYNC_NEXT_RESULT_START;
}
}
}
if (mysql_result) {
mysql_free_result(mysql_result);
mysql_result = NULL;
}
break;
case ASYNC_SET_AUTOCOMMIT_START:
set_autocommit_start();
if (async_exit_status) {
next_event(ASYNC_SET_AUTOCOMMIT_CONT);
}
else {
NEXT_IMMEDIATE(ASYNC_SET_AUTOCOMMIT_END);
}
break;
case ASYNC_SET_AUTOCOMMIT_CONT:
set_autocommit_cont(event);
if (async_exit_status) {
next_event(ASYNC_SET_AUTOCOMMIT_CONT);
}
else {
NEXT_IMMEDIATE(ASYNC_SET_AUTOCOMMIT_END);
}
break;
case ASYNC_SET_AUTOCOMMIT_END:
if (ret_bool) {
NEXT_IMMEDIATE(ASYNC_SET_AUTOCOMMIT_FAILED);
}
else {
NEXT_IMMEDIATE(ASYNC_SET_AUTOCOMMIT_SUCCESSFUL);
}
break;
case ASYNC_SET_AUTOCOMMIT_SUCCESSFUL:
options.last_set_autocommit = (options.autocommit ? 1 : 0); // we successfully set autocommit
if ((pgsql->server_status & SERVER_STATUS_AUTOCOMMIT) && options.autocommit == false) {
proxy_warning("It seems we are hitting bug http://bugs.pgsql.com/bug.php?id=66884\n");
}
break;
case ASYNC_SET_AUTOCOMMIT_FAILED:
//fprintf(stderr,"%s\n",mysql_error(pgsql));
proxy_error("Failed SET AUTOCOMMIT: %s\n", mysql_error(pgsql));
PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, parent->myhgc->hid, parent->address, parent->port, mysql_errno(pgsql));
break;
case ASYNC_SET_NAMES_START:
set_names_start();
if (async_exit_status) {
next_event(ASYNC_SET_NAMES_CONT);
}
else {
NEXT_IMMEDIATE(ASYNC_SET_NAMES_END);
}
break;
case ASYNC_SET_NAMES_CONT:
set_names_cont(event);
if (async_exit_status) {
next_event(ASYNC_SET_NAMES_CONT);
}
else {
NEXT_IMMEDIATE(ASYNC_SET_NAMES_END);
}
break;
case ASYNC_SET_NAMES_END:
if (interr) {
NEXT_IMMEDIATE(ASYNC_SET_NAMES_FAILED);
}
else {
NEXT_IMMEDIATE(ASYNC_SET_NAMES_SUCCESSFUL);
}
break;
case ASYNC_SET_NAMES_SUCCESSFUL:
break;
case ASYNC_SET_NAMES_FAILED:
//fprintf(stderr,"%s\n",mysql_error(pgsql));
proxy_error("Failed SET NAMES: %s\n", mysql_error(pgsql));
PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, parent->myhgc->hid, parent->address, parent->port, mysql_errno(pgsql));
break;
case ASYNC_INITDB_START:
initdb_start();
if (async_exit_status) {
next_event(ASYNC_INITDB_CONT);
}
else {
NEXT_IMMEDIATE(ASYNC_INITDB_END);
}
break;
case ASYNC_INITDB_CONT:
initdb_cont(event);
if (async_exit_status) {
next_event(ASYNC_INITDB_CONT);
}
else {
NEXT_IMMEDIATE(ASYNC_INITDB_END);
}
break;
case ASYNC_INITDB_END:
if (interr) {
NEXT_IMMEDIATE(ASYNC_INITDB_FAILED);
}
else {
NEXT_IMMEDIATE(ASYNC_INITDB_SUCCESSFUL);
}
break;
case ASYNC_INITDB_SUCCESSFUL:
break;
case ASYNC_INITDB_FAILED:
proxy_error("Failed INITDB: %s\n", mysql_error(pgsql));
PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, parent->myhgc->hid, parent->address, parent->port, mysql_errno(pgsql));
//fprintf(stderr,"%s\n",mysql_error(pgsql));
break;
case ASYNC_SET_OPTION_START:
set_option_start();
if (async_exit_status) {
next_event(ASYNC_SET_OPTION_CONT);
}
else {
NEXT_IMMEDIATE(ASYNC_SET_OPTION_END);
}
break;
case ASYNC_SET_OPTION_CONT:
set_option_cont(event);
if (async_exit_status) {
next_event(ASYNC_SET_OPTION_CONT);
}
else {
NEXT_IMMEDIATE(ASYNC_SET_OPTION_END);
}
break;
case ASYNC_SET_OPTION_END:
if (interr) {
NEXT_IMMEDIATE(ASYNC_SET_OPTION_FAILED);
}
else {
NEXT_IMMEDIATE(ASYNC_SET_OPTION_SUCCESSFUL);
case ASYNC_RESET_SESSION_END:
if (is_error_present()) {
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_FAILED);
}
NEXT_IMMEDIATE(ASYNC_RESET_SESSION_SUCCESSFUL);
break;
case ASYNC_SET_OPTION_SUCCESSFUL:
break;
case ASYNC_SET_OPTION_FAILED:
proxy_error("Error setting MYSQL_OPTION_MULTI_STATEMENTS : %s\n", mysql_error(pgsql));
PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, parent->myhgc->hid, parent->address, parent->port, mysql_errno(pgsql));
case ASYNC_RESET_SESSION_FAILED:
case ASYNC_RESET_SESSION_SUCCESSFUL:
case ASYNC_RESET_SESSION_TIMEOUT:
break;
*/
default:
// not implemented yet
assert(0);
@ -4483,6 +3879,86 @@ int PgSQL_Connection::async_query(short event, char* stmt, unsigned long length,
return 1;
}
// Returns:
// 0 when the query is completed
// 1 when the query is not completed
// the calling function should check pgsql error in pgsql struct
int PgSQL_Connection::async_reset_session(short event) {
PROXY_TRACE();
PROXY_TRACE2();
assert(pgsql_conn);
server_status = parent->status; // we copy it here to avoid race condition. The caller will see this
if (IsServerOffline())
return -1;
/*if (myds) {
if (myds->DSS != STATE_MARIADB_QUERY) {
myds->DSS = STATE_MARIADB_QUERY;
}
}*/
switch (async_state_machine) {
case ASYNC_RESET_SESSION_SUCCESSFUL:
unknown_transaction_status = false;
async_state_machine = ASYNC_IDLE;
return 0;
break;
case ASYNC_RESET_SESSION_FAILED:
return -1;
break;
case ASYNC_RESET_SESSION_TIMEOUT:
return -2;
break;
case ASYNC_IDLE:
if (myds && myds->sess) {
if (myds->sess->active_transactions == 0) {
myds->sess->active_transactions = 1;
myds->sess->transaction_started_at = myds->sess->thread->curtime;
}
}
async_state_machine = ASYNC_RESET_SESSION_START;
default:
handler(event);
break;
}
switch (async_state_machine) {
case ASYNC_RESET_SESSION_SUCCESSFUL:
if (myds && myds->sess) {
if (myds->sess->active_transactions != 0) {
myds->sess->active_transactions = 0;
myds->sess->transaction_started_at = 0;
}
}
unknown_transaction_status = false;
async_state_machine = ASYNC_IDLE;
return 0;
break;
case ASYNC_RESET_SESSION_FAILED:
if (myds && myds->sess) {
if (myds->sess->active_transactions != 0) {
myds->sess->active_transactions = 0;
myds->sess->transaction_started_at = 0;
}
}
return -1;
break;
case ASYNC_RESET_SESSION_TIMEOUT:
if (myds && myds->sess) {
if (myds->sess->active_transactions != 0) {
myds->sess->active_transactions = 0;
myds->sess->transaction_started_at = 0;
}
}
return -2;
break;
default:
break;
}
return 1;
}
// Returns:
// 0 when the ping is completed successfully
// -1 when the ping is completed not successfully
@ -4625,3 +4101,75 @@ void PgSQL_Connection::next_multi_statement_result(PGresult* result) {
// copy buffer to PSarrayOut
query_result->buffer_to_PSarrayOut();
}
static int wait_for_pgsql(PGconn* pgsql_conn, int wait_event) {
struct pollfd pfd;
int timeout, res;
pfd.fd = PQsocket(pgsql_conn);
pfd.events =
(wait_event & PG_EVENT_READ ? POLLIN : 0) |
(wait_event & PG_EVENT_WRITE ? POLLOUT : 0) |
(wait_event & PG_EVENT_EXCEPT ? POLLPRI : 0);
timeout = 1;
res = poll(&pfd, 1, timeout);
if (res == 0)
return PG_EVENT_TIMEOUT | wait_event;
else if (res < 0)
return PG_EVENT_TIMEOUT;
else {
int status = 0;
if (pfd.revents & POLLIN) status |= PG_EVENT_READ;
if (pfd.revents & POLLOUT) status |= PG_EVENT_WRITE;
if (pfd.revents & POLLPRI) status |= PG_EVENT_EXCEPT;
return status;
}
}
void PgSQL_Connection::reset_session_start() {
PROXY_TRACE();
assert(pgsql_conn);
reset_error();
async_exit_status = PG_EVENT_NONE;
reset_session_in_txn = IsKnownActiveTransaction();
if (PQsendQuery(pgsql_conn, (reset_session_in_txn == false ? "DISCARD ALL" : "ROLLBACK")) == 0) {
// WARNING: DO NOT RELEASE this PGresult
const PGresult* result = PQgetResultFromPGconn(pgsql_conn);
set_error_from_result(result);
proxy_error("Failed to send query. %s\n", get_error_code_with_message().c_str());
return;
}
flush();
}
void PgSQL_Connection::reset_session_cont(short event) {
PROXY_TRACE();
proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6, "event=%d\n", event);
reset_error();
async_exit_status = PG_EVENT_NONE;
if (event & POLLOUT) {
flush();
return;
}
if (PQconsumeInput(pgsql_conn) == 0) {
// WARNING: DO NOT RELEASE this PGresult
const PGresult* result = PQgetResultFromPGconn(pgsql_conn);
/* We will only set the error if the result is not NULL or we didn't capture error in last call. If the result is NULL,
* it indicates that an error was already captured during a previous PQconsumeInput call,
* and we do not want to overwrite that information.
*/
if (result || is_error_present() == false) {
set_error_from_result(result);
proxy_error("Failed to consume input. %s\n", get_error_code_with_message().c_str());
}
return;
}
if (PQisBusy(pgsql_conn)) {
async_exit_status = PG_EVENT_READ;
return;
}
pgsql_result = PQgetResult(pgsql_conn);
}

@ -1299,12 +1299,10 @@ void PgSQL_Data_Stream::return_MySQL_Connection_To_Pool() {
) {
if (pgsql_thread___reset_connection_algorithm == 2) {
sess->create_new_session_and_reset_connection(this);
}
else {
} else {
destroy_MySQL_Connection_From_Pool(true);
}
}
else {
} else {
detach_connection();
unplug_backend();
#ifdef STRESSTEST_POOL

@ -1473,44 +1473,36 @@ int PgSQL_Session::handler_again___status_RESETTING_CONNECTION() {
// we recreate local_stmts : see issue #752
delete myconn->local_stmts;
myconn->local_stmts = new MySQL_STMTs_local_v14(false); // false by default, it is a backend
int rc = myconn->async_change_user(myds->revents);
int rc = myconn->async_reset_session(myds->revents);
if (rc == 0) {
__sync_fetch_and_add(&PgHGM->status.backend_change_user, 1);
//myds->myconn->userinfo->set(client_myds->myconn->userinfo);
//__sync_fetch_and_add(&PgHGM->status.backend_change_user, 1);
myds->myconn->reset();
myds->DSS = STATE_MARIADB_GENERIC;
myconn->async_state_machine = ASYNC_IDLE;
// if (pgsql_thread___multiplexing && (myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) {
myds->return_MySQL_Connection_To_Pool();
// } else {
// myds->destroy_MySQL_Connection_From_Pool(true);
// }
delete mybe->server_myds;
mybe->server_myds = NULL;
set_status(session_status___NONE);
return -1;
}
else {
} else {
if (rc == -1 || rc == -2) {
if (rc == -2) {
proxy_error("Change user timeout during COM_CHANGE_USER on %s , %d\n", myconn->parent->address, myconn->parent->port);
proxy_error("Resetting Connection timeout during Reset Session on %s , %d\n", myconn->parent->address, myconn->parent->port);
PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, ER_PROXYSQL_CHANGE_USER_TIMEOUT);
}
else { // rc==-1
int myerr = mysql_errno(myconn->pgsql);
} else { // rc==-1
const bool error_present = myconn->is_error_present();
PgHGM->p_update_pgsql_error_counter(
p_pgsql_error_type::pgsql,
myconn->parent->myhgc->hid,
myconn->parent->address,
myconn->parent->port,
(myerr ? myerr : ER_PROXYSQL_OFFLINE_SRV)
(error_present ? 9999 : ER_PROXYSQL_OFFLINE_SRV) // TOFIX: 9999 is a placeholder for the actual error code
);
if (myerr != 0) {
proxy_error("Detected an error during COM_CHANGE_USER on (%d,%s,%d) , FD (Conn:%d , MyDS:%d) : %d, %s\n", myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myds->fd, myds->myconn->fd, myerr, mysql_error(myconn->pgsql));
}
else {
if (error_present) {
proxy_error("Detected an error during Reset Session on (%d,%s,%d) , FD (Conn:%d , MyDS:%d) : %s\n", myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myds->fd, myds->myconn->fd, myconn->get_error_code_with_message().c_str());
} else {
proxy_error(
"Detected an error during COM_CHANGE_USER on (%d,%s,%d) , FD (Conn:%d , MyDS:%d) : %d, %s\n",
"Detected an error during Reset Session on (%d,%s,%d) , FD (Conn:%d , MyDS:%d) : %d, %s\n",
myconn->parent->myhgc->hid,
myconn->parent->address,
myconn->parent->port,
@ -1523,12 +1515,9 @@ int PgSQL_Session::handler_again___status_RESETTING_CONNECTION() {
}
myds->destroy_MySQL_Connection_From_Pool(false);
myds->fd = 0;
//delete mybe->server_myds;
//mybe->server_myds=NULL;
RequestEnd(myds); //fix bug #682
return -1;
}
else {
} else {
// rc==1 , nothing to do for now
if (myds->mypolls == NULL) {
thread->mypolls.add(POLLIN | POLLOUT, myds->fd, myds, thread->curtime);
@ -7728,22 +7717,19 @@ void PgSQL_Session::finishQuery(PgSQL_Data_Stream* myds, PgSQL_Connection* mycon
uint64_t delay_us = delay_multiplex_us > auto_increment_delay_us ? delay_multiplex_us : auto_increment_delay_us;
myds->wait_until = thread->curtime + delay_us;
}
else {
} else {
myds->wait_until = thread->curtime + pgsql_thread___connection_delay_multiplex_ms * 1000;
}
myconn->async_state_machine = ASYNC_IDLE;
myconn->multiplex_delayed = true;
myds->DSS = STATE_MARIADB_GENERIC;
}
else if (prepared_stmt_with_no_params == true) { // see issue #1432
} else if (prepared_stmt_with_no_params == true) { // see issue #1432
myconn->async_state_machine = ASYNC_IDLE;
myds->DSS = STATE_MARIADB_GENERIC;
myds->wait_until = 0;
myconn->multiplex_delayed = false;
}
else {
} else {
myconn->multiplex_delayed = false;
myds->wait_until = 0;
myds->DSS = STATE_NOT_INITIALIZED;

Loading…
Cancel
Save