From a22aeca39486bf9a62f7027ce5a04887f98a5441 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Sun, 21 Jul 2024 11:26:12 +0500 Subject: [PATCH] Reset Session Implementation * Added Reset Session support * Removed dead code --- include/PgSQL_Connection.h | 9 + include/proxysql_structs.h | 6 + lib/PgSQL_Connection.cpp | 824 +++++++++---------------------------- lib/PgSQL_Data_Stream.cpp | 6 +- lib/PgSQL_Session.cpp | 44 +- 5 files changed, 218 insertions(+), 671 deletions(-) diff --git a/include/PgSQL_Connection.h b/include/PgSQL_Connection.h index faa61bc90..191e9a40e 100644 --- a/include/PgSQL_Connection.h +++ b/include/PgSQL_Connection.h @@ -179,6 +179,8 @@ static const Param_Name_Validation* PgSQL_Param_Name_Accepted_Values[PG_PARAM_SI #define PG_EVENT_NONE 0x00 #define PG_EVENT_READ 0x01 #define PG_EVENT_WRITE 0x02 +#define PG_EVENT_EXCEPT 0x04 +#define PG_EVENT_TIMEOUT 0x08 class PgSQL_Conn_Param { private: @@ -507,10 +509,15 @@ public: void query_cont(short event); void fetch_result_start(); void fetch_result_cont(short event); + void reset_session_start(); + void reset_session_cont(short event); + int async_connect(short event); int async_set_autocommit(short event, bool ac); int async_query(short event, char* stmt, unsigned long length, MYSQL_STMT** _stmt = NULL, stmt_execute_metadata_t* _stmt_meta = NULL); int async_ping(short event); + int async_reset_session(short event); + void next_event(PG_ASYNC_ST new_st); bool IsAutoCommit(); bool is_connected() const; @@ -599,6 +606,8 @@ public: void reset_error() { reset_error_info(error_info, false); } + bool reset_session_in_txn = false; + PGresult* get_result(); void next_multi_statement_result(PGresult* result); bool set_single_row_mode(); diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index d021a4ea3..e34837234 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -118,6 +118,12 @@ enum ASYNC_ST { // MariaDB Async State Machine ASYNC_CLOSE_START, ASYNC_CLOSE_CONT, ASYNC_CLOSE_END, + ASYNC_RESET_SESSION_START, + ASYNC_RESET_SESSION_CONT, + ASYNC_RESET_SESSION_END, + ASYNC_RESET_SESSION_SUCCESSFUL, + ASYNC_RESET_SESSION_FAILED, + ASYNC_RESET_SESSION_TIMEOUT, ASYNC_IDLE }; diff --git a/lib/PgSQL_Connection.cpp b/lib/PgSQL_Connection.cpp index f83b6595c..cc8bc429b 100644 --- a/lib/PgSQL_Connection.cpp +++ b/lib/PgSQL_Connection.cpp @@ -3324,659 +3324,55 @@ handler_again: // should be NULL assert(!pgsql_result); break; -/* case ASYNC_CHANGE_USER_START: - change_user_start(); + case ASYNC_RESET_SESSION_START: + reset_session_start(); if (async_exit_status) { - next_event(ASYNC_CHANGE_USER_CONT); - } - else { - NEXT_IMMEDIATE(ASYNC_CHANGE_USER_END); - } - break; - case ASYNC_CHANGE_USER_CONT: - assert(myds->sess->status == CHANGING_USER_SERVER || myds->sess->status == RESETTING_CONNECTION); - change_user_cont(event); - if (async_exit_status) { - if (myds->sess->thread->curtime >= myds->wait_until) { - NEXT_IMMEDIATE(ASYNC_CHANGE_USER_TIMEOUT); - } - else { - next_event(ASYNC_CHANGE_USER_CONT); - } - } - else { - NEXT_IMMEDIATE(ASYNC_CHANGE_USER_END); - } - break; - case ASYNC_CHANGE_USER_END: - if (ret_bool) { - NEXT_IMMEDIATE(ASYNC_CHANGE_USER_FAILED); - } - else { - NEXT_IMMEDIATE(ASYNC_CHANGE_USER_SUCCESSFUL); - } - break; - case ASYNC_CHANGE_USER_SUCCESSFUL: - pgsql->server_status = SERVER_STATUS_AUTOCOMMIT; // we reset this due to bug https://jira.mariadb.org/browse/CONC-332 - break; - case ASYNC_CHANGE_USER_FAILED: - break; - case ASYNC_CHANGE_USER_TIMEOUT: - break; - case ASYNC_PING_START: - ping_start(); - if (async_exit_status) { - next_event(ASYNC_PING_CONT); - } - else { - NEXT_IMMEDIATE(ASYNC_PING_END); - } - break; - case ASYNC_PING_CONT: - assert(myds->sess->status == PINGING_SERVER); - if (event) { - ping_cont(event); - } - if (async_exit_status) { - if (myds->sess->thread->curtime >= myds->wait_until) { - NEXT_IMMEDIATE(ASYNC_PING_TIMEOUT); - } - else { - next_event(ASYNC_PING_CONT); - } - } - else { - NEXT_IMMEDIATE(ASYNC_PING_END); - } - break; - case ASYNC_PING_END: - if (interr) { - NEXT_IMMEDIATE(ASYNC_PING_FAILED); - } - else { - NEXT_IMMEDIATE(ASYNC_PING_SUCCESSFUL); - } - break; - case ASYNC_PING_SUCCESSFUL: - break; - case ASYNC_PING_FAILED: - break; - case ASYNC_PING_TIMEOUT: - break; - case ASYNC_QUERY_START: - real_query_start(); - __sync_fetch_and_add(&parent->queries_sent, 1); - __sync_fetch_and_add(&parent->bytes_sent, query.length); - statuses.questions++; - myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_sent] += query.length; - myds->bytes_info.bytes_sent += query.length; - bytes_info.bytes_sent += query.length; - if (myds->sess->with_gtid == true) { - __sync_fetch_and_add(&parent->queries_gtid_sync, 1); - } - if (async_exit_status) { - next_event(ASYNC_QUERY_CONT); - } - else { -#ifdef PROXYSQL_USE_RESULT - NEXT_IMMEDIATE(ASYNC_USE_RESULT_START); -#else - NEXT_IMMEDIATE(ASYNC_STORE_RESULT_START); -#endif - } - break; - case ASYNC_QUERY_CONT: - real_query_cont(event); - if (async_exit_status) { - next_event(ASYNC_QUERY_CONT); - } - else { -#ifdef PROXYSQL_USE_RESULT - NEXT_IMMEDIATE(ASYNC_USE_RESULT_START); -#else - NEXT_IMMEDIATE(ASYNC_STORE_RESULT_START); -#endif - } - break; - - case ASYNC_STMT_PREPARE_START: - stmt_prepare_start(); - __sync_fetch_and_add(&parent->queries_sent, 1); - __sync_fetch_and_add(&parent->bytes_sent, query.length); - myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_sent] += query.length; - myds->bytes_info.bytes_sent += query.length; - bytes_info.bytes_sent += query.length; - if (async_exit_status) { - next_event(ASYNC_STMT_PREPARE_CONT); - } - else { - NEXT_IMMEDIATE(ASYNC_STMT_PREPARE_END); - } - break; - case ASYNC_STMT_PREPARE_CONT: - stmt_prepare_cont(event); - if (async_exit_status) { - next_event(ASYNC_STMT_PREPARE_CONT); - } - else { - NEXT_IMMEDIATE(ASYNC_STMT_PREPARE_END); - } - break; - - case ASYNC_STMT_PREPARE_END: - if (interr) { - NEXT_IMMEDIATE(ASYNC_STMT_PREPARE_FAILED); - } - else { - NEXT_IMMEDIATE(ASYNC_STMT_PREPARE_SUCCESSFUL); - } - break; - case ASYNC_STMT_PREPARE_SUCCESSFUL: - break; - case ASYNC_STMT_PREPARE_FAILED: - break; - - case ASYNC_STMT_EXECUTE_START: - PROXY_TRACE2(); - stmt_execute_start(); - __sync_fetch_and_add(&parent->queries_sent, 1); - __sync_fetch_and_add(&parent->bytes_sent, query.stmt_meta->size); - myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_sent] += query.stmt_meta->size; - myds->bytes_info.bytes_sent += query.stmt_meta->size; - bytes_info.bytes_sent += query.stmt_meta->size; - if (async_exit_status) { - next_event(ASYNC_STMT_EXECUTE_CONT); - } - else { - NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_STORE_RESULT_START); - } - break; - case ASYNC_STMT_EXECUTE_CONT: - PROXY_TRACE2(); - stmt_execute_cont(event); - if (async_exit_status) { - next_event(ASYNC_STMT_EXECUTE_CONT); - } - else { - NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_STORE_RESULT_START); - } - break; - - case ASYNC_STMT_EXECUTE_STORE_RESULT_START: - PROXY_TRACE2(); - if (mysql_stmt_errno(query.stmt)) { - NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END); - } - { - query.stmt_result = mysql_stmt_result_metadata(query.stmt); - if (query.stmt_result == NULL) { - NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END); - } - else { - update_warning_count_from_statement(); - if (myds->sess->mirror == false) { - if (query_result_reuse == NULL) { - query_result = new MySQL_ResultSet(); - query_result->init(&myds->sess->client_myds->myprot, query.stmt_result, pgsql, query.stmt); - } - else { - query_result = query_result_reuse; - query_result_reuse = NULL; - query_result->init(&myds->sess->client_myds->myprot, query.stmt_result, pgsql, query.stmt); - } - } - else { - - } - //async_fetch_row_start=false; - } - } - stmt_execute_store_result_start(); - if (async_exit_status) { - next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); - } - else { - NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END); - } - break; - case ASYNC_STMT_EXECUTE_STORE_RESULT_CONT: - PROXY_TRACE2(); - { // this copied mostly from ASYNC_USE_RESULT_CONT - if (myds->sess && myds->sess->client_myds && myds->sess->mirror == false) { - unsigned int buffered_data = 0; - buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN; - buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN; - if (buffered_data > (unsigned int)pgsql_thread___threshold_resultset_size * 8) { - next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); // we temporarily pause . See #1232 - break; - } - } - } - stmt_execute_store_result_cont(event); - //if (async_fetch_row_start==false) { - // async_fetch_row_start=true; - //} - if (async_exit_status) { - // this copied mostly from ASYNC_USE_RESULT_CONT - MYSQL_ROWS* r = query.stmt->result.data; - long long unsigned int rows_read_inner = 0; - - if (r) { - rows_read_inner++; - while (rows_read_inner < query.stmt->result.rows) { - // it is very important to check rows_read_inner FIRST - // because r->next could point to an invalid memory - rows_read_inner++; - r = r->next; - } - if (rows_read_inner > 1) { - process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT(processed_bytes); - if ( - (processed_bytes > (unsigned int)pgsql_thread___threshold_resultset_size * 8) - || - (pgsql_thread___throttle_max_bytes_per_second_to_client && pgsql_thread___throttle_max_bytes_per_second_to_client && (processed_bytes > (unsigned long long)mysql_thread___throttle_max_bytes_per_second_to_client / 10 * (unsigned long long)pgsql_thread___throttle_ratio_server_to_client)) - ) { - next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); // we temporarily pause - } - else { - NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); // we continue looping - } - } - } - next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); - } - else { - NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END); - } - break; - case ASYNC_STMT_EXECUTE_END: - PROXY_TRACE2(); - { - if (query.stmt_result) { - unsigned long long total_size = 0; - MYSQL_ROWS* r = query.stmt->result.data; - if (r) { - total_size += r->length; - if (r->length > 0xFFFFFF) { - total_size += (r->length / 0xFFFFFF) * sizeof(mysql_hdr); - } - total_size += sizeof(mysql_hdr); - while (r->next) { - r = r->next; - total_size += r->length; - if (r->length > 0xFFFFFF) { - total_size += (r->length / 0xFFFFFF) * sizeof(mysql_hdr); - } - total_size += sizeof(mysql_hdr); - } - } - __sync_fetch_and_add(&parent->bytes_recv, total_size); - myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_recv] += total_size; - myds->bytes_info.bytes_recv += total_size; - bytes_info.bytes_recv += total_size; + next_event(ASYNC_RESET_SESSION_CONT); + } else { + if (is_error_present()) { + NEXT_IMMEDIATE(ASYNC_RESET_SESSION_END); } + NEXT_IMMEDIATE(ASYNC_RESET_SESSION_CONT); } - - update_warning_count_from_statement(); break; - // case ASYNC_STMT_EXECUTE_SUCCESSFUL: - // break; - // case ASYNC_STMT_EXECUTE_FAILED: - // break; - - case ASYNC_NEXT_RESULT_START: - async_exit_status = mysql_next_result_start(&interr, pgsql); - if (async_exit_status) { - next_event(ASYNC_NEXT_RESULT_CONT); - } - else { -#ifdef PROXYSQL_USE_RESULT - NEXT_IMMEDIATE(ASYNC_USE_RESULT_START); -#else - NEXT_IMMEDIATE(ASYNC_STORE_RESULT_START); -#endif - } - break; - - case ASYNC_NEXT_RESULT_CONT: - if (event) { - async_exit_status = mysql_next_result_cont(&interr, pgsql, mysql_status(event, true)); - } - if (async_exit_status) { - next_event(ASYNC_NEXT_RESULT_CONT); - } - else { -#ifdef PROXYSQL_USE_RESULT - NEXT_IMMEDIATE(ASYNC_USE_RESULT_START); -#else - NEXT_IMMEDIATE(ASYNC_STORE_RESULT_START); -#endif - } - break; - - case ASYNC_NEXT_RESULT_END: - break; -#ifndef PROXYSQL_USE_RESULT - case ASYNC_STORE_RESULT_START: - if (mysql_errno(pgsql)) { - NEXT_IMMEDIATE(ASYNC_QUERY_END); - } - store_result_start(); - if (async_exit_status) { - next_event(ASYNC_STORE_RESULT_CONT); - } - else { - NEXT_IMMEDIATE(ASYNC_QUERY_END); - } - break; - case ASYNC_STORE_RESULT_CONT: - store_result_cont(event); + case ASYNC_RESET_SESSION_CONT: + { + reset_session_cont(event); if (async_exit_status) { - next_event(ASYNC_STORE_RESULT_CONT); - } - else { - NEXT_IMMEDIATE(ASYNC_QUERY_END); - } - break; -#endif // PROXYSQL_USE_RESULT - case ASYNC_USE_RESULT_START: - if (mysql_errno(pgsql)) { - NEXT_IMMEDIATE(ASYNC_QUERY_END); - } - mysql_result = mysql_use_result(pgsql); - if (mysql_result == NULL) { - NEXT_IMMEDIATE(ASYNC_QUERY_END); + next_event(ASYNC_RESET_SESSION_CONT); + break; } - else { - // since 'add_eof' utilizes 'warning_count,' we are setting the 'warning_count' here - - // Note: There is a possibility of obtaining inaccurate warning_count and server_status at this point - // if the backend server has CLIENT_DEPRECATE_EOF enabled, and the client does not support CLIENT_DEPRECATE_EOF, - // especially when the query generates a warning. This information will be included in the intermediate EOF packet. - // Correct information becomes available only after fetching all rows, - // and the warning_count and status flag details are extracted from the final OK packet. - update_warning_count_from_connection(); - if (myds->sess->mirror == false) { - if (query_result_reuse == NULL) { - query_result = new MySQL_ResultSet(); - query_result->init(&myds->sess->client_myds->myprot, mysql_result, pgsql); - } - else { - query_result = query_result_reuse; - query_result_reuse = NULL; - query_result->init(&myds->sess->client_myds->myprot, mysql_result, pgsql); - } - } - else { - if (query_result_reuse == NULL) { - query_result = new MySQL_ResultSet(); - query_result->init(NULL, mysql_result, pgsql); - } - else { - query_result = query_result_reuse; - query_result_reuse = NULL; - query_result->init(NULL, mysql_result, pgsql); - } - } - async_fetch_row_start = false; - NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); + if (is_error_present()) { + NEXT_IMMEDIATE(ASYNC_RESET_SESSION_END); } - break; - case ASYNC_USE_RESULT_CONT: - { - if (myds->sess && myds->sess->client_myds && myds->sess->mirror == false && - myds->sess->status != SHOW_WARNINGS) { // see issue#4072 - unsigned int buffered_data = 0; - buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN; - buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN; - if (buffered_data > (unsigned int)pgsql_thread___threshold_resultset_size * 8) { - next_event(ASYNC_USE_RESULT_CONT); // we temporarily pause . See #1232 - break; + PGresult* result = get_result(); + if (result) { + if (PQresultStatus(result) != PGRES_COMMAND_OK) { + set_error_from_result(result, PGSQL_ERROR_FIELD_ALL); + assert(is_error_present()); } + PQclear(result); + NEXT_IMMEDIATE(ASYNC_RESET_SESSION_CONT); } - } - if (async_fetch_row_start == false) { - async_exit_status = mysql_fetch_row_start(&mysql_row, mysql_result); - async_fetch_row_start = true; - } - else { - async_exit_status = mysql_fetch_row_cont(&mysql_row, mysql_result, mysql_status(event, true)); - } - if (async_exit_status) { - next_event(ASYNC_USE_RESULT_CONT); - } - else { - async_fetch_row_start = false; - if (mysql_row) { - if (myds && myds->sess && myds->sess->status == SHOW_WARNINGS) { - if (pgsql_thread___verbose_query_error) { - PgSQL_Data_Stream* client_myds = myds->sess->client_myds; - const char* username = ""; - const char* schema = ""; - const char* client_addr = ""; - const char* digest_text = myds->sess->CurrentQuery.show_warnings_prev_query_digest.c_str(); - - if (client_myds) { - client_addr = client_myds->addr.addr ? client_myds->addr.addr : (char*)"unknown"; - - if (client_myds->myconn && client_myds->myconn->userinfo) { - username = client_myds->myconn->userinfo->username; - schema = client_myds->myconn->userinfo->schemaname; - } - } - proxy_warning( - "Warning during query on (%d,%s,%d,%lu). User '%s@%s', schema '%s', digest_text '%s', level '%s', code '%s', message '%s'\n", - parent->myhgc->hid, parent->address, parent->port, get_mysql_thread_id(), username, client_addr, - schema, digest_text, mysql_row[0], mysql_row[1], mysql_row[2] - ); - } - else { - proxy_warning( - "Warning during query on (%d,%s,%d,%lu). Level '%s', code '%s', message '%s'\n", - parent->myhgc->hid, parent->address, parent->port, get_mysql_thread_id(), mysql_row[0], mysql_row[1], - mysql_row[2] - ); - } - } - unsigned int br = query_result->add_row(mysql_row); - __sync_fetch_and_add(&parent->bytes_recv, br); - myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_recv] += br; - myds->bytes_info.bytes_recv += br; - bytes_info.bytes_recv += br; - processed_bytes += br; // issue #527 : this variable will store the amount of bytes processed during this event - if ( - (processed_bytes > (unsigned int)pgsql_thread___threshold_resultset_size * 8) - || - (pgsql_thread___throttle_max_bytes_per_second_to_client && pgsql_thread___throttle_max_bytes_per_second_to_client && (processed_bytes > (unsigned long long)mysql_thread___throttle_max_bytes_per_second_to_client / 10 * (unsigned long long)pgsql_thread___throttle_ratio_server_to_client)) - ) { - next_event(ASYNC_USE_RESULT_CONT); // we temporarily pause - } - else { - NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); // we continue looping - } - } - else { - if (pgsql) { - int _myerrno = mysql_errno(pgsql); - if (_myerrno) { - if (myds) { - //-- query_result->add_err(myds); - NEXT_IMMEDIATE(ASYNC_QUERY_END); - } - } - } - // since 'add_eof' utilizes 'warning_count,' we are setting the 'warning_count' here - update_warning_count_from_connection(); - // we reach here if there was no error - // exclude warning_count from the OK/EOF packet for the ‘SHOW WARNINGS’ statement - query_result->add_eof(query.length == 13 && strncasecmp(query.ptr, "SHOW WARNINGS", 13) == 0); - NEXT_IMMEDIATE(ASYNC_QUERY_END); - } + if (reset_session_in_txn) { + //assert(IsKnownActiveTransaction() == false); + reset_session_in_txn = false; + NEXT_IMMEDIATE(ASYNC_RESET_SESSION_START); + } + NEXT_IMMEDIATE(ASYNC_RESET_SESSION_END); } - break; - case ASYNC_QUERY_END: - PROXY_TRACE2(); - if (pgsql) { - int _myerrno = mysql_errno(pgsql); - if (_myerrno == 0) { - unknown_transaction_status = false; - update_warning_count_from_connection(); - } - else { - compute_unknown_transaction_status(); - } - if (_myerrno < 2000) { - // we can continue only if the error is coming from the backend. - // (or if zero) - // if the error comes from the client library, something terribly - // wrong happened and we cannot continue - if (pgsql->server_status & SERVER_MORE_RESULTS_EXIST) { - async_state_machine = ASYNC_NEXT_RESULT_START; - } - } - } - if (mysql_result) { - mysql_free_result(mysql_result); - mysql_result = NULL; - } - break; - case ASYNC_SET_AUTOCOMMIT_START: - set_autocommit_start(); - if (async_exit_status) { - next_event(ASYNC_SET_AUTOCOMMIT_CONT); - } - else { - NEXT_IMMEDIATE(ASYNC_SET_AUTOCOMMIT_END); - } break; - case ASYNC_SET_AUTOCOMMIT_CONT: - set_autocommit_cont(event); - if (async_exit_status) { - next_event(ASYNC_SET_AUTOCOMMIT_CONT); - } - else { - NEXT_IMMEDIATE(ASYNC_SET_AUTOCOMMIT_END); - } - break; - case ASYNC_SET_AUTOCOMMIT_END: - if (ret_bool) { - NEXT_IMMEDIATE(ASYNC_SET_AUTOCOMMIT_FAILED); - } - else { - NEXT_IMMEDIATE(ASYNC_SET_AUTOCOMMIT_SUCCESSFUL); - } - break; - case ASYNC_SET_AUTOCOMMIT_SUCCESSFUL: - options.last_set_autocommit = (options.autocommit ? 1 : 0); // we successfully set autocommit - if ((pgsql->server_status & SERVER_STATUS_AUTOCOMMIT) && options.autocommit == false) { - proxy_warning("It seems we are hitting bug http://bugs.pgsql.com/bug.php?id=66884\n"); - } - break; - case ASYNC_SET_AUTOCOMMIT_FAILED: - //fprintf(stderr,"%s\n",mysql_error(pgsql)); - proxy_error("Failed SET AUTOCOMMIT: %s\n", mysql_error(pgsql)); - PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, parent->myhgc->hid, parent->address, parent->port, mysql_errno(pgsql)); - break; - case ASYNC_SET_NAMES_START: - set_names_start(); - if (async_exit_status) { - next_event(ASYNC_SET_NAMES_CONT); - } - else { - NEXT_IMMEDIATE(ASYNC_SET_NAMES_END); - } - break; - case ASYNC_SET_NAMES_CONT: - set_names_cont(event); - if (async_exit_status) { - next_event(ASYNC_SET_NAMES_CONT); - } - else { - NEXT_IMMEDIATE(ASYNC_SET_NAMES_END); - } - break; - case ASYNC_SET_NAMES_END: - if (interr) { - NEXT_IMMEDIATE(ASYNC_SET_NAMES_FAILED); - } - else { - NEXT_IMMEDIATE(ASYNC_SET_NAMES_SUCCESSFUL); - } - break; - case ASYNC_SET_NAMES_SUCCESSFUL: - break; - case ASYNC_SET_NAMES_FAILED: - //fprintf(stderr,"%s\n",mysql_error(pgsql)); - proxy_error("Failed SET NAMES: %s\n", mysql_error(pgsql)); - PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, parent->myhgc->hid, parent->address, parent->port, mysql_errno(pgsql)); - break; - case ASYNC_INITDB_START: - initdb_start(); - if (async_exit_status) { - next_event(ASYNC_INITDB_CONT); - } - else { - NEXT_IMMEDIATE(ASYNC_INITDB_END); - } - break; - case ASYNC_INITDB_CONT: - initdb_cont(event); - if (async_exit_status) { - next_event(ASYNC_INITDB_CONT); - } - else { - NEXT_IMMEDIATE(ASYNC_INITDB_END); - } - break; - case ASYNC_INITDB_END: - if (interr) { - NEXT_IMMEDIATE(ASYNC_INITDB_FAILED); - } - else { - NEXT_IMMEDIATE(ASYNC_INITDB_SUCCESSFUL); - } - break; - case ASYNC_INITDB_SUCCESSFUL: - break; - case ASYNC_INITDB_FAILED: - proxy_error("Failed INITDB: %s\n", mysql_error(pgsql)); - PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, parent->myhgc->hid, parent->address, parent->port, mysql_errno(pgsql)); - //fprintf(stderr,"%s\n",mysql_error(pgsql)); - break; - case ASYNC_SET_OPTION_START: - set_option_start(); - if (async_exit_status) { - next_event(ASYNC_SET_OPTION_CONT); - } - else { - NEXT_IMMEDIATE(ASYNC_SET_OPTION_END); - } - break; - case ASYNC_SET_OPTION_CONT: - set_option_cont(event); - if (async_exit_status) { - next_event(ASYNC_SET_OPTION_CONT); - } - else { - NEXT_IMMEDIATE(ASYNC_SET_OPTION_END); - } - break; - case ASYNC_SET_OPTION_END: - if (interr) { - NEXT_IMMEDIATE(ASYNC_SET_OPTION_FAILED); - } - else { - NEXT_IMMEDIATE(ASYNC_SET_OPTION_SUCCESSFUL); + case ASYNC_RESET_SESSION_END: + if (is_error_present()) { + NEXT_IMMEDIATE(ASYNC_RESET_SESSION_FAILED); } + NEXT_IMMEDIATE(ASYNC_RESET_SESSION_SUCCESSFUL); break; - case ASYNC_SET_OPTION_SUCCESSFUL: - break; - case ASYNC_SET_OPTION_FAILED: - proxy_error("Error setting MYSQL_OPTION_MULTI_STATEMENTS : %s\n", mysql_error(pgsql)); - PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, parent->myhgc->hid, parent->address, parent->port, mysql_errno(pgsql)); + case ASYNC_RESET_SESSION_FAILED: + case ASYNC_RESET_SESSION_SUCCESSFUL: + case ASYNC_RESET_SESSION_TIMEOUT: break; -*/ default: // not implemented yet assert(0); @@ -4483,6 +3879,86 @@ int PgSQL_Connection::async_query(short event, char* stmt, unsigned long length, return 1; } +// Returns: +// 0 when the query is completed +// 1 when the query is not completed +// the calling function should check pgsql error in pgsql struct +int PgSQL_Connection::async_reset_session(short event) { + PROXY_TRACE(); + PROXY_TRACE2(); + assert(pgsql_conn); + + server_status = parent->status; // we copy it here to avoid race condition. The caller will see this + if (IsServerOffline()) + return -1; + + /*if (myds) { + if (myds->DSS != STATE_MARIADB_QUERY) { + myds->DSS = STATE_MARIADB_QUERY; + } + }*/ + + switch (async_state_machine) { + case ASYNC_RESET_SESSION_SUCCESSFUL: + unknown_transaction_status = false; + async_state_machine = ASYNC_IDLE; + return 0; + break; + case ASYNC_RESET_SESSION_FAILED: + return -1; + break; + case ASYNC_RESET_SESSION_TIMEOUT: + return -2; + break; + case ASYNC_IDLE: + if (myds && myds->sess) { + if (myds->sess->active_transactions == 0) { + myds->sess->active_transactions = 1; + myds->sess->transaction_started_at = myds->sess->thread->curtime; + } + } + async_state_machine = ASYNC_RESET_SESSION_START; + default: + handler(event); + break; + } + + switch (async_state_machine) { + case ASYNC_RESET_SESSION_SUCCESSFUL: + if (myds && myds->sess) { + if (myds->sess->active_transactions != 0) { + myds->sess->active_transactions = 0; + myds->sess->transaction_started_at = 0; + } + } + unknown_transaction_status = false; + async_state_machine = ASYNC_IDLE; + return 0; + break; + case ASYNC_RESET_SESSION_FAILED: + if (myds && myds->sess) { + if (myds->sess->active_transactions != 0) { + myds->sess->active_transactions = 0; + myds->sess->transaction_started_at = 0; + } + } + return -1; + break; + case ASYNC_RESET_SESSION_TIMEOUT: + if (myds && myds->sess) { + if (myds->sess->active_transactions != 0) { + myds->sess->active_transactions = 0; + myds->sess->transaction_started_at = 0; + } + } + return -2; + break; + default: + break; + } + return 1; +} + // Returns: // 0 when the ping is completed successfully // -1 when the ping is completed not successfully @@ -4625,3 +4101,75 @@ void PgSQL_Connection::next_multi_statement_result(PGresult* result) { // copy buffer to PSarrayOut query_result->buffer_to_PSarrayOut(); } + +static int wait_for_pgsql(PGconn* pgsql_conn, int wait_event) { + struct pollfd pfd; + int timeout, res; + + pfd.fd = PQsocket(pgsql_conn); + pfd.events = + (wait_event & PG_EVENT_READ ? POLLIN : 0) | + (wait_event & PG_EVENT_WRITE ? POLLOUT : 0) | + (wait_event & PG_EVENT_EXCEPT ? POLLPRI : 0); + timeout = 1; + res = poll(&pfd, 1, timeout); + if (res == 0) + return PG_EVENT_TIMEOUT | wait_event; + else if (res < 0) + return PG_EVENT_TIMEOUT; + else { + int status = 0; + if (pfd.revents & POLLIN) status |= PG_EVENT_READ; + if (pfd.revents & POLLOUT) status |= PG_EVENT_WRITE; + if (pfd.revents & POLLPRI) status |= PG_EVENT_EXCEPT; + return status; + } +} + +void PgSQL_Connection::reset_session_start() { + PROXY_TRACE(); + assert(pgsql_conn); + reset_error(); + async_exit_status = PG_EVENT_NONE; + reset_session_in_txn = IsKnownActiveTransaction(); + if (PQsendQuery(pgsql_conn, (reset_session_in_txn == false ? "DISCARD ALL" : "ROLLBACK")) == 0) { + // WARNING: DO NOT RELEASE this PGresult + const PGresult* result = PQgetResultFromPGconn(pgsql_conn); + set_error_from_result(result); + proxy_error("Failed to send query. %s\n", get_error_code_with_message().c_str()); + return; + } + flush(); +} + +void PgSQL_Connection::reset_session_cont(short event) { + PROXY_TRACE(); + proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6, "event=%d\n", event); + reset_error(); + async_exit_status = PG_EVENT_NONE; + if (event & POLLOUT) { + flush(); + return; + } + + if (PQconsumeInput(pgsql_conn) == 0) { + // WARNING: DO NOT RELEASE this PGresult + const PGresult* result = PQgetResultFromPGconn(pgsql_conn); + /* We will only set the error if the result is not NULL or we didn't capture error in last call. If the result is NULL, + * it indicates that an error was already captured during a previous PQconsumeInput call, + * and we do not want to overwrite that information. + */ + if (result || is_error_present() == false) { + set_error_from_result(result); + proxy_error("Failed to consume input. %s\n", get_error_code_with_message().c_str()); + } + return; + } + + if (PQisBusy(pgsql_conn)) { + async_exit_status = PG_EVENT_READ; + return; + } + + pgsql_result = PQgetResult(pgsql_conn); +} diff --git a/lib/PgSQL_Data_Stream.cpp b/lib/PgSQL_Data_Stream.cpp index 7aa786e19..43bee338b 100644 --- a/lib/PgSQL_Data_Stream.cpp +++ b/lib/PgSQL_Data_Stream.cpp @@ -1299,12 +1299,10 @@ void PgSQL_Data_Stream::return_MySQL_Connection_To_Pool() { ) { if (pgsql_thread___reset_connection_algorithm == 2) { sess->create_new_session_and_reset_connection(this); - } - else { + } else { destroy_MySQL_Connection_From_Pool(true); } - } - else { + } else { detach_connection(); unplug_backend(); #ifdef STRESSTEST_POOL diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index ff39f1e57..c8f3335ca 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -1473,44 +1473,36 @@ int PgSQL_Session::handler_again___status_RESETTING_CONNECTION() { // we recreate local_stmts : see issue #752 delete myconn->local_stmts; myconn->local_stmts = new MySQL_STMTs_local_v14(false); // false by default, it is a backend - int rc = myconn->async_change_user(myds->revents); + int rc = myconn->async_reset_session(myds->revents); if (rc == 0) { - __sync_fetch_and_add(&PgHGM->status.backend_change_user, 1); - //myds->myconn->userinfo->set(client_myds->myconn->userinfo); + //__sync_fetch_and_add(&PgHGM->status.backend_change_user, 1); myds->myconn->reset(); myds->DSS = STATE_MARIADB_GENERIC; myconn->async_state_machine = ASYNC_IDLE; - // if (pgsql_thread___multiplexing && (myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) { myds->return_MySQL_Connection_To_Pool(); - // } else { - // myds->destroy_MySQL_Connection_From_Pool(true); - // } delete mybe->server_myds; mybe->server_myds = NULL; set_status(session_status___NONE); return -1; - } - else { + } else { if (rc == -1 || rc == -2) { if (rc == -2) { - proxy_error("Change user timeout during COM_CHANGE_USER on %s , %d\n", myconn->parent->address, myconn->parent->port); + proxy_error("Resetting Connection timeout during Reset Session on %s , %d\n", myconn->parent->address, myconn->parent->port); PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, ER_PROXYSQL_CHANGE_USER_TIMEOUT); - } - else { // rc==-1 - int myerr = mysql_errno(myconn->pgsql); + } else { // rc==-1 + const bool error_present = myconn->is_error_present(); PgHGM->p_update_pgsql_error_counter( p_pgsql_error_type::pgsql, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, - (myerr ? myerr : ER_PROXYSQL_OFFLINE_SRV) + (error_present ? 9999 : ER_PROXYSQL_OFFLINE_SRV) // TOFIX: 9999 is a placeholder for the actual error code ); - if (myerr != 0) { - proxy_error("Detected an error during COM_CHANGE_USER on (%d,%s,%d) , FD (Conn:%d , MyDS:%d) : %d, %s\n", myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myds->fd, myds->myconn->fd, myerr, mysql_error(myconn->pgsql)); - } - else { + if (error_present) { + proxy_error("Detected an error during Reset Session on (%d,%s,%d) , FD (Conn:%d , MyDS:%d) : %s\n", myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, myds->fd, myds->myconn->fd, myconn->get_error_code_with_message().c_str()); + } else { proxy_error( - "Detected an error during COM_CHANGE_USER on (%d,%s,%d) , FD (Conn:%d , MyDS:%d) : %d, %s\n", + "Detected an error during Reset Session on (%d,%s,%d) , FD (Conn:%d , MyDS:%d) : %d, %s\n", myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, @@ -1523,12 +1515,9 @@ int PgSQL_Session::handler_again___status_RESETTING_CONNECTION() { } myds->destroy_MySQL_Connection_From_Pool(false); myds->fd = 0; - //delete mybe->server_myds; - //mybe->server_myds=NULL; RequestEnd(myds); //fix bug #682 return -1; - } - else { + } else { // rc==1 , nothing to do for now if (myds->mypolls == NULL) { thread->mypolls.add(POLLIN | POLLOUT, myds->fd, myds, thread->curtime); @@ -7728,22 +7717,19 @@ void PgSQL_Session::finishQuery(PgSQL_Data_Stream* myds, PgSQL_Connection* mycon uint64_t delay_us = delay_multiplex_us > auto_increment_delay_us ? delay_multiplex_us : auto_increment_delay_us; myds->wait_until = thread->curtime + delay_us; - } - else { + } else { myds->wait_until = thread->curtime + pgsql_thread___connection_delay_multiplex_ms * 1000; } myconn->async_state_machine = ASYNC_IDLE; myconn->multiplex_delayed = true; myds->DSS = STATE_MARIADB_GENERIC; - } - else if (prepared_stmt_with_no_params == true) { // see issue #1432 + } else if (prepared_stmt_with_no_params == true) { // see issue #1432 myconn->async_state_machine = ASYNC_IDLE; myds->DSS = STATE_MARIADB_GENERIC; myds->wait_until = 0; myconn->multiplex_delayed = false; - } - else { + } else { myconn->multiplex_delayed = false; myds->wait_until = 0; myds->DSS = STATE_NOT_INITIALIZED;