From dcbe140199e884aa6af3f01235e9d78eac6852f3 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Mon, 20 May 2024 12:23:58 +0500 Subject: [PATCH] Route query to PostgreSQL backend server --- include/PgSQL_Connection.h | 154 +++- include/PgSQL_Protocol.h | 75 +- include/PgSQL_Session.h | 2 +- include/proxysql.h | 2 +- include/proxysql_structs.h | 6 +- lib/Makefile | 6 +- lib/PgSQL_Connection.cpp | 1725 ++++++++++++++++++++++++++++++++++-- lib/PgSQL_Protocol.cpp | 555 +++++++++++- lib/PgSQL_Session.cpp | 251 +++++- lib/PgSQL_Thread.cpp | 2 +- lib/ProxySQL_Admin.cpp | 5 +- src/Makefile | 8 +- 12 files changed, 2630 insertions(+), 161 deletions(-) diff --git a/include/PgSQL_Connection.h b/include/PgSQL_Connection.h index bab7641c2..836c78de4 100644 --- a/include/PgSQL_Connection.h +++ b/include/PgSQL_Connection.h @@ -8,7 +8,7 @@ using json = nlohmann::json; class PgSQL_SrvC; - +class PgSQL_ResultSet; //#define STATUS_MYSQL_CONNECTION_TRANSACTION 0x00000001 // DEPRECATED #define STATUS_MYSQL_CONNECTION_COMPRESSION 0x00000002 #define STATUS_MYSQL_CONNECTION_USER_VARIABLE 0x00000004 @@ -172,6 +172,10 @@ static const Param_Name_Validation* PgSQL_Param_Name_Accepted_Values[PG_PARAM_SI &load_balance_hosts }; +#define PG_EVENT_NONE 0x00 +#define PG_EVENT_READ 0x01 +#define PG_EVENT_WRITE 0x02 + class PgSQL_Conn_Param { private: bool validate(PgSQL_Param_Name key, const char* val) { @@ -263,7 +267,6 @@ enum pgsql_charset_action { POSTGRESQL_CHARSET_ACTION_CONNECT_START }; - class PgSQL_Connection_userinfo { private: uint64_t compute_hash(); @@ -282,7 +285,7 @@ class PgSQL_Connection_userinfo { bool set_schemaname(char *, int); }; -class PgSQL_Connection { +class PgSQL_Connection_Placeholder { private: void update_warning_count_from_connection(); void update_warning_count_from_statement(); @@ -371,7 +374,7 @@ class PgSQL_Connection { uint32_t status_flags; int async_exit_status; // exit status of MariaDB Client Library Non blocking API int interr; // integer return - MDB_ASYNC_ST async_state_machine; // Async state machine + PG_ASYNC_ST async_state_machine; // Async state machine short wait_events; uint8_t compression_pkt_id; my_bool ret_bool; @@ -383,8 +386,8 @@ class PgSQL_Connection { bool unknown_transaction_status; void compute_unknown_transaction_status(); char gtid_uuid[128]; - PgSQL_Connection(); - ~PgSQL_Connection(); + PgSQL_Connection_Placeholder(); + ~PgSQL_Connection_Placeholder(); bool set_autocommit(bool); bool set_no_backslash_escapes(bool); unsigned int set_charset(unsigned int, enum pgsql_charset_action); @@ -414,8 +417,8 @@ class PgSQL_Connection { void set_option_start(); void set_option_cont(short event); void set_query(char *stmt, unsigned long length); - MDB_ASYNC_ST handler(short event); - void next_event(MDB_ASYNC_ST new_st); + PG_ASYNC_ST handler(short event); + void next_event(PG_ASYNC_ST new_st); int async_connect(short event); int async_change_user(short event); @@ -481,4 +484,139 @@ class PgSQL_Connection { unsigned int number_of_matching_session_variables(const PgSQL_Connection *client_conn, unsigned int& not_matching); unsigned long get_mysql_thread_id() { return pgsql ? pgsql->thread_id : 0; } }; + +enum PG_ERROR_TYPE { + PG_NO_ERROR, + PG_CONNECT_FAILED, + PG_QUERY_FAILED, + PG_RESULT_FAILED, + +}; + +class PgSQL_Connection : public PgSQL_Connection_Placeholder { +public: + PgSQL_Connection(); + ~PgSQL_Connection(); + + PG_ASYNC_ST handler(short event); + void connect_start(); + void connect_cont(short event); + void query_start(); + void query_cont(short event); + void fetch_result_start(); + void fetch_result_cont(short event); + int async_connect(short event); + int async_set_autocommit(short event, bool ac); + int async_query(short event, char* stmt, unsigned long length, MYSQL_STMT** _stmt = NULL, stmt_execute_metadata_t* _stmt_meta = NULL); + int async_ping(short event); + void next_event(PG_ASYNC_ST new_st); + bool IsAutoCommit(); + bool is_connected() const; + void compute_unknown_transaction_status(); + void async_free_result(); + void flush(); + + std::string get_error_code_from_result() const; + + bool is_error_present() const { + return err_type != PG_NO_ERROR; + } + + PG_ERROR_TYPE get_error_type() const { + return err_type; + } + + std::string get_error_message() const { + return err_msg; + } + + void set_error(PG_ERROR_TYPE _err_type, const std::string& _err_msg) { + err_type = _err_type; + err_msg = _err_msg; + } + + void reset_error() { + err_type = PG_NO_ERROR; + err_msg.clear(); + } + + PGresult* get_last_result() const { + return last_result; + } + + void set_last_result(PGresult* res) { + if (last_result) { + PQclear(last_result); + } + + last_result = res; + } + + void reset_last_result() { + if (last_result) { + PQclear(last_result); + last_result = nullptr; + } + } + void optimize() {} + //PgSQL_Conn_Param conn_params; + + PGconn* pgsql_conn; + PGresult* last_result; + PgSQL_ResultSet* MyRS; + PgSQL_ResultSet* MyRS_reuse; + + PG_ERROR_TYPE err_type; + std::string err_msg; + //PgSQL_SrvC* parent; + //PgSQL_Connection_userinfo* userinfo; + //PgSQL_Data_Stream* myds; + //int fd; + + /*std::string get_error_code(PG_ERROR_TYPE* errtype = NULL) { + assert(pgsql_conn); + + std::string error_code = PGCONN_NO_ERROR; + + ConnStatusType status = PQstatus(pgsql_conn); + if (status == CONNECTION_BAD) { + if (errtype) *errtype = PG_CONNECTION_ERROR; + error_code = PQparameterStatus(pgsql_conn, "SQLSTATE"); + } + else if (pgsql_result != NULL) { + ExecStatusType status = PQresultStatus(pgsql_result); + if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK) { + if (errtype) *errtype = PG_SERVER_ERROR; + error_code = PQresultErrorField(pgsql_result, PG_DIAG_SQLSTATE); + } + } + + return error_code; + } + + std::string get_error_message(PG_ERROR_TYPE* errtype = NULL) { + assert(pgsql_conn); + + std::string error_message{}; + + ConnStatusType status = PQstatus(pgsql_conn); + if (status == CONNECTION_BAD) { + if (errtype) *errtype = PG_CONNECTION_ERROR; + error_message = PQerrorMessage(pgsql_conn); + } + else if (pgsql_result != NULL) { + ExecStatusType status = PQresultStatus(pgsql_result); + if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK) { + if (errtype) *errtype = PG_SERVER_ERROR; + error_message = PQresultErrorMessage(pgsql_result); + } + } + + return error_message; + }*/ +}; + + + + #endif /* __CLASS_POSTGRESQL_CONNECTION_H */ diff --git a/include/PgSQL_Protocol.h b/include/PgSQL_Protocol.h index a1fe4205e..95b534a6e 100644 --- a/include/PgSQL_Protocol.h +++ b/include/PgSQL_Protocol.h @@ -63,21 +63,40 @@ struct pgsql_hdr { PtrSize_t data; }; +struct PG_Field { + char* name; + uint32_t tbl_oid; + uint16_t col_idx; + uint32_t type_oid; + uint16_t col_len; + uint32_t type_mod; + uint16_t fmt; +}; + +using PG_Fields = std::vector; + class PG_pkt { public: PG_pkt(unsigned c = PG_PKT_DEFAULT_SIZE) { + ownership = true; capacity = l_near_pow_2(c); size = 0; ptr = (char*)malloc(capacity); multiple_pkt_mode = false; } + PG_pkt(void* _ptr, unsigned int _capacity) { + ownership = false; + ptr = (char*)_ptr; + capacity = _capacity; + size = 0; + } ~PG_pkt() { reset(); } void reset() { - if (ptr) + if (ptr && ownership == true) free(ptr); ptr = nullptr; size = 0; @@ -154,10 +173,51 @@ private: // currently for debug only. will replace this with a single variable that will contain last pkt offset std::vector pkt_offset; bool multiple_pkt_mode = false; - + bool ownership = true; friend void SQLite3_to_Postgres(PtrSizeArray* psa, SQLite3_result* result, char* error, int affected_rows, const char* query_type); }; +class PgSQL_Protocol; +class PgSQL_ResultSet { +public: + PgSQL_Data_Stream* ds; + PgSQL_Protocol* proto; + PGconn* pgsql_conn; + PGresult* result; + + PgSQL_ResultSet(); + ~PgSQL_ResultSet(); + + void init(PgSQL_Protocol* _proto, PGconn* _conn); + void buffer_init(PgSQL_Protocol* _proto); + + unsigned int add_row_description(PGresult* result); + unsigned int add_row(PGresult* result); + unsigned int add_eof(PGresult* result); + void add_err(PgSQL_Data_Stream* _myds); + bool get_resultset(PtrSizeArray* PSarrayFinal); + + void buffer_to_PSarrayOut(bool _last = false); + unsigned long long current_size(); + + + +//private: + unsigned char* buffer; + unsigned int buffer_used; + + uint8_t sid; + bool transfer_started; + bool resultset_completed; + + unsigned int num_fields; + unsigned long long num_rows; + unsigned long long resultset_size; + PtrSizeArray PSarrayOUT; + + friend class PgSQL_Protocol; +}; + class PgSQL_Protocol : public MySQL_Protocol { public: void init(PgSQL_Data_Stream** __myds, PgSQL_Connection_userinfo* __userinfo, PgSQL_Session* __sess) { @@ -166,14 +226,21 @@ public: sess = __sess; current_PreStmt = NULL; } + PgSQL_Data_Stream* get_myds() { return *myds; } bool generate_pkt_initial_handshake(bool send, void** ptr, unsigned int* len, uint32_t* thread_id, bool deprecate_eof_active) override; bool process_startup_packet(unsigned char* pkt, unsigned int len, bool& ssl_request); EXECUTION_STATE process_handshake_response_packet(unsigned char* pkt, unsigned int len); void welcome_client(); - void generate_error_packet(bool send_ready, const char* msg, const char* code, bool fatal); - bool generate_ok_packet(bool send, bool ready, const char* msg, int rows, const char* query); + void generate_error_packet(bool send, bool ready, const char* msg, const char* code, bool fatal, PtrSize_t* _ptr = NULL); + bool generate_ok_packet(bool send, bool ready, const char* msg, int rows, const char* query, PtrSize_t* _ptr = NULL); + + //bool generate_row_description(bool send, PgSQL_ResultSet* rs, const PG_Fields& fields, unsigned int size); + + unsigned int copy_row_description_to_PgSQL_ResultSet(bool send, PgSQL_ResultSet* pg_rs, PGresult* result); + unsigned int copy_row_to_PgSQL_ResultSet(bool send, PgSQL_ResultSet* pg_rs, PGresult* result); + unsigned int copy_eof_to_PgSQL_ResultSet(bool send, PgSQL_ResultSet* pg_rs, PGresult* result); private: bool get_header(unsigned char* pkt, unsigned int len, pgsql_hdr* hdr); void load_conn_parameters(pgsql_hdr* pkt, bool startup); diff --git a/include/PgSQL_Session.h b/include/PgSQL_Session.h index f91d5d403..68710e000 100644 --- a/include/PgSQL_Session.h +++ b/include/PgSQL_Session.h @@ -359,7 +359,7 @@ public: PgSQL_Backend* find_or_create_backend(int, PgSQL_Data_Stream* _myds = NULL); void SQLite3_to_MySQL(SQLite3_result*, char*, int, MySQL_Protocol*, bool in_transaction = false, bool deprecate_eof_active = false); - void MySQL_Result_to_MySQL_wire(MYSQL* mysql, MySQL_ResultSet* MyRS, unsigned int warning_count, PgSQL_Data_Stream* _myds = NULL); + void PgSQL_Result_to_PgSQL_wire(PGconn* mysql, PgSQL_ResultSet* MyRS, unsigned int warning_count, PgSQL_Data_Stream* _myds = NULL); void MySQL_Stmt_Result_to_MySQL_wire(MYSQL_STMT* stmt, PgSQL_Connection* myconn); unsigned int NumActiveTransactions(bool check_savpoint = false); bool HasOfflineBackends(); diff --git a/include/proxysql.h b/include/proxysql.h index 522bd4d1d..95a0a75b2 100644 --- a/include/proxysql.h +++ b/include/proxysql.h @@ -52,7 +52,7 @@ #include "mysql.h" #include "mariadb_com.h" - +#include "libpq-fe.h" #include "proxysql_mem.h" #include "proxysql_structs.h" diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 5d502d4bf..831e866da 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -47,7 +47,7 @@ enum cred_username_type { USERNAME_BACKEND, USERNAME_FRONTEND, USERNAME_NONE }; #define PROXYSQL_USE_RESULT -enum MDB_ASYNC_ST { // MariaDB Async State Machine +enum ASYNC_ST { // MariaDB Async State Machine ASYNC_CONNECT_START, ASYNC_CONNECT_CONT, ASYNC_CONNECT_END, @@ -122,6 +122,10 @@ enum MDB_ASYNC_ST { // MariaDB Async State Machine ASYNC_IDLE }; +using MDB_ASYNC_ST = ASYNC_ST; +using PG_ASYNC_ST = ASYNC_ST; + + // list of possible debugging modules enum debug_module { PROXY_DEBUG_GENERIC, diff --git a/lib/Makefile b/lib/Makefile index ae6e4b526..0ebb36a31 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -65,9 +65,13 @@ LIBSCRAM_PATH=$(DEPS_PATH)/libscram/ LIBSCRAM_IDIR=$(LIBSCRAM_PATH)/include/ LIBSCRAM_LDIR=$(LIBSCRAM_PATH)/lib/ +POSTGRES_PATH=$(DEPS_PATH)/postgresql/postgresql/src/ +POSTGRES_IFACE=$(POSTGRES_PATH)/interfaces/libpq/ -I$(POSTGRES_PATH)/include +POSTGRES_LDIR=$(POSTGRES_IFACES)/libpq/ + IDIR := ../include -IDIRS := -I$(IDIR) -I$(JEMALLOC_IDIR) -I$(MARIADB_IDIR) $(LIBCONFIG_IDIR) -I$(RE2_IDIR) -I$(SQLITE3_DIR) -I$(PCRE_PATH) -I/usr/local/include -I$(CLICKHOUSE_CPP_DIR) -I$(CLICKHOUSE_CPP_DIR)/contrib/ $(MICROHTTPD_IDIR) $(LIBHTTPSERVER_IDIR) $(LIBINJECTION_IDIR) -I$(CURL_IDIR) -I$(EV_DIR) -I$(SSL_IDIR) -I$(PROMETHEUS_IDIR) -I$(LIBUSUAL_IDIR) -I$(LIBSCRAM_IDIR) +IDIRS := -I$(IDIR) -I$(JEMALLOC_IDIR) -I$(MARIADB_IDIR) $(LIBCONFIG_IDIR) -I$(RE2_IDIR) -I$(SQLITE3_DIR) -I$(PCRE_PATH) -I/usr/local/include -I$(CLICKHOUSE_CPP_DIR) -I$(CLICKHOUSE_CPP_DIR)/contrib/ $(MICROHTTPD_IDIR) $(LIBHTTPSERVER_IDIR) $(LIBINJECTION_IDIR) -I$(CURL_IDIR) -I$(EV_DIR) -I$(SSL_IDIR) -I$(PROMETHEUS_IDIR) -I$(LIBUSUAL_IDIR) -I$(LIBSCRAM_IDIR) -I$(POSTGRES_IFACE) ifeq ($(UNAME_S),Linux) IDIRS += -I$(COREDUMPER_IDIR) endif diff --git a/lib/PgSQL_Connection.cpp b/lib/PgSQL_Connection.cpp index 7b5acf392..caebdd9e6 100644 --- a/lib/PgSQL_Connection.cpp +++ b/lib/PgSQL_Connection.cpp @@ -266,7 +266,7 @@ PgSQL_Connection_userinfo::~PgSQL_Connection_userinfo() { if (schemaname) free(schemaname); } -void PgSQL_Connection::compute_unknown_transaction_status() { +void PgSQL_Connection_Placeholder::compute_unknown_transaction_status() { if (pgsql) { int _myerrno=mysql_errno(pgsql); if (_myerrno == 0) { @@ -392,7 +392,7 @@ bool PgSQL_Connection_userinfo::set_schemaname(char *_new, int l) { -PgSQL_Connection::PgSQL_Connection() { +PgSQL_Connection_Placeholder::PgSQL_Connection_Placeholder() { pgsql=NULL; async_state_machine=ASYNC_CONNECT_START; ret_mysql=NULL; @@ -452,7 +452,7 @@ PgSQL_Connection::PgSQL_Connection() { memset(&connected_host_details, 0, sizeof(connected_host_details)); }; -PgSQL_Connection::~PgSQL_Connection() { +PgSQL_Connection_Placeholder::~PgSQL_Connection_Placeholder() { proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "Destroying PgSQL_Connection %p\n", this); if (options.server_version) free(options.server_version); if (options.init_connect) free(options.init_connect); @@ -509,20 +509,23 @@ PgSQL_Connection::~PgSQL_Connection() { } } - if (connected_host_details.hostname) + if (connected_host_details.hostname) { free(connected_host_details.hostname); - - if (connected_host_details.ip) + connected_host_details.hostname = NULL; + } + if (connected_host_details.ip) { free(connected_host_details.ip); + connected_host_details.hostname = NULL; + } }; -bool PgSQL_Connection::set_autocommit(bool _ac) { +bool PgSQL_Connection_Placeholder::set_autocommit(bool _ac) { proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "Setting autocommit %d\n", _ac); options.autocommit=_ac; return _ac; } -bool PgSQL_Connection::set_no_backslash_escapes(bool _ac) { +bool PgSQL_Connection_Placeholder::set_no_backslash_escapes(bool _ac) { proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "Setting no_backslash_escapes %d\n", _ac); options.no_backslash_escapes=_ac; return _ac; @@ -530,7 +533,7 @@ bool PgSQL_Connection::set_no_backslash_escapes(bool _ac) { void print_backtrace(void); -unsigned int PgSQL_Connection::set_charset(unsigned int _c, enum pgsql_charset_action action) { +unsigned int PgSQL_Connection_Placeholder::set_charset(unsigned int _c, enum pgsql_charset_action action) { proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "Setting charset %d\n", _c); // SQL_CHARACTER_SET should be set befor setting SQL_CHRACTER_ACTION @@ -548,7 +551,7 @@ unsigned int PgSQL_Connection::set_charset(unsigned int _c, enum pgsql_charset_a return _c; } -void PgSQL_Connection::update_warning_count_from_connection() { +void PgSQL_Connection_Placeholder::update_warning_count_from_connection() { // if a prepared statement was cached while 'mysql_thread_query_digest' was true, and subsequently, // 'mysql_thread_query_digest' is set to false, fetching that statement from the cache may still contain the digest text. // To prevent this, we will check the digest text in conjunction with 'mysql_thread_query_digest' to verify whether it @@ -565,7 +568,7 @@ void PgSQL_Connection::update_warning_count_from_connection() { } } -void PgSQL_Connection::update_warning_count_from_statement() { +void PgSQL_Connection_Placeholder::update_warning_count_from_statement() { // if a prepared statement was cached while 'mysql_thread_query_digest' was true, and subsequently, // 'mysql_thread_query_digest' is set to false, fetching that statement from the cache may still contain the digest text. // To prevent this, we will check the digest text in conjunction with 'mysql_thread_query_digest' to verify whether it @@ -578,13 +581,13 @@ void PgSQL_Connection::update_warning_count_from_statement() { } } -bool PgSQL_Connection::is_expired(unsigned long long timeout) { +bool PgSQL_Connection_Placeholder::is_expired(unsigned long long timeout) { // FIXME: here the check should be a sanity check // FIXME: for now this is just a temporary (and stupid) check return false; } -void PgSQL_Connection::set_status(bool set, uint32_t status_flag) { +void PgSQL_Connection_Placeholder::set_status(bool set, uint32_t status_flag) { if (set) { this->status_flags |= status_flag; } else { @@ -592,11 +595,11 @@ void PgSQL_Connection::set_status(bool set, uint32_t status_flag) { } } -bool PgSQL_Connection::get_status(uint32_t status_flag) { +bool PgSQL_Connection_Placeholder::get_status(uint32_t status_flag) { return this->status_flags & status_flag; } -void PgSQL_Connection::set_status_sql_log_bin0(bool v) { +void PgSQL_Connection_Placeholder::set_status_sql_log_bin0(bool v) { if (v) { status_flags |= STATUS_MYSQL_CONNECTION_SQL_LOG_BIN0; } else { @@ -604,11 +607,11 @@ void PgSQL_Connection::set_status_sql_log_bin0(bool v) { } } -bool PgSQL_Connection::get_status_sql_log_bin0() { +bool PgSQL_Connection_Placeholder::get_status_sql_log_bin0() { return status_flags & STATUS_MYSQL_CONNECTION_SQL_LOG_BIN0; } -bool PgSQL_Connection::requires_CHANGE_USER(const PgSQL_Connection *client_conn) { +bool PgSQL_Connection_Placeholder::requires_CHANGE_USER(const PgSQL_Connection *client_conn) { char *username = client_conn->userinfo->username; if (strcmp(userinfo->username,username)) { // the two connections use different usernames @@ -647,7 +650,7 @@ bool PgSQL_Connection::requires_CHANGE_USER(const PgSQL_Connection *client_conn) return false; } -unsigned int PgSQL_Connection::reorder_dynamic_variables_idx() { +unsigned int PgSQL_Connection_Placeholder::reorder_dynamic_variables_idx() { dynamic_variables_idx.clear(); // note that we are inserting the index already ordered for (auto i = SQL_NAME_LAST_LOW_WM + 1 ; i < SQL_NAME_LAST_HIGH_WM ; i++) { @@ -659,7 +662,7 @@ unsigned int PgSQL_Connection::reorder_dynamic_variables_idx() { return r; } -unsigned int PgSQL_Connection::number_of_matching_session_variables(const PgSQL_Connection *client_conn, unsigned int& not_matching) { +unsigned int PgSQL_Connection_Placeholder::number_of_matching_session_variables(const PgSQL_Connection *client_conn, unsigned int& not_matching) { unsigned int ret=0; for (auto i = 0; i < SQL_NAME_LAST_LOW_WM; i++) { if (client_conn->var_hash[i] && i != SQL_CHARACTER_ACTION) { // client has a variable set @@ -694,7 +697,7 @@ unsigned int PgSQL_Connection::number_of_matching_session_variables(const PgSQL_ } -bool PgSQL_Connection::match_tracked_options(const PgSQL_Connection *c) { +bool PgSQL_Connection_Placeholder::match_tracked_options(const PgSQL_Connection *c) { uint32_t cf1 = options.client_flag; // own client flags uint32_t cf2 = c->options.client_flag; // other client flags if ((cf1 & CLIENT_FOUND_ROWS) == (cf2 & CLIENT_FOUND_ROWS)) { @@ -710,7 +713,7 @@ bool PgSQL_Connection::match_tracked_options(const PgSQL_Connection *c) { } // non blocking API -void PgSQL_Connection::connect_start() { +void PgSQL_Connection_Placeholder::connect_start() { PROXY_TRACE(); pgsql=mysql_init(NULL); assert(pgsql); @@ -905,12 +908,12 @@ void PgSQL_Connection::connect_start() { // } } -void PgSQL_Connection::connect_cont(short event) { +void PgSQL_Connection_Placeholder::connect_cont(short event) { proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event); async_exit_status = mysql_real_connect_cont(&ret_mysql, pgsql, mysql_status(event, true)); } -void PgSQL_Connection::change_user_start() { +void PgSQL_Connection_Placeholder::change_user_start() { PROXY_TRACE(); //fprintf(stderr,"change_user_start FD %d\n", fd); PgSQL_Connection_userinfo *_ui = NULL; @@ -937,35 +940,35 @@ void PgSQL_Connection::change_user_start() { async_exit_status = mysql_change_user_start(&ret_bool,pgsql,_ui->username, auth_password, _ui->schemaname); } -void PgSQL_Connection::change_user_cont(short event) { +void PgSQL_Connection_Placeholder::change_user_cont(short event) { proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event); async_exit_status = mysql_change_user_cont(&ret_bool, pgsql, mysql_status(event, true)); } -void PgSQL_Connection::ping_start() { +void PgSQL_Connection_Placeholder::ping_start() { PROXY_TRACE(); //fprintf(stderr,"ping_start FD %d\n", fd); async_exit_status = mysql_ping_start(&interr,pgsql); } -void PgSQL_Connection::ping_cont(short event) { +void PgSQL_Connection_Placeholder::ping_cont(short event) { proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event); //fprintf(stderr,"ping_cont FD %d, event %d\n", fd, event); async_exit_status = mysql_ping_cont(&interr,pgsql, mysql_status(event, true)); } -void PgSQL_Connection::initdb_start() { +void PgSQL_Connection_Placeholder::initdb_start() { PROXY_TRACE(); PgSQL_Connection_userinfo *client_ui=myds->sess->client_myds->myconn->userinfo; async_exit_status = mysql_select_db_start(&interr,pgsql,client_ui->schemaname); } -void PgSQL_Connection::initdb_cont(short event) { +void PgSQL_Connection_Placeholder::initdb_cont(short event) { proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event); async_exit_status = mysql_select_db_cont(&interr,pgsql, mysql_status(event, true)); } -void PgSQL_Connection::set_option_start() { +void PgSQL_Connection_Placeholder::set_option_start() { PROXY_TRACE(); enum_mysql_set_option set_option; @@ -973,22 +976,22 @@ void PgSQL_Connection::set_option_start() { async_exit_status = mysql_set_server_option_start(&interr,pgsql,set_option); } -void PgSQL_Connection::set_option_cont(short event) { +void PgSQL_Connection_Placeholder::set_option_cont(short event) { proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event); async_exit_status = mysql_set_server_option_cont(&interr,pgsql, mysql_status(event, true)); } -void PgSQL_Connection::set_autocommit_start() { +void PgSQL_Connection_Placeholder::set_autocommit_start() { PROXY_TRACE(); async_exit_status = mysql_autocommit_start(&ret_bool, pgsql, options.autocommit); } -void PgSQL_Connection::set_autocommit_cont(short event) { +void PgSQL_Connection_Placeholder::set_autocommit_cont(short event) { proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event); async_exit_status = mysql_autocommit_cont(&ret_bool, pgsql, mysql_status(event, true)); } -void PgSQL_Connection::set_names_start() { +void PgSQL_Connection_Placeholder::set_names_start() { PROXY_TRACE(); const MARIADB_CHARSET_INFO * c = proxysql_find_charset_nr(atoi(pgsql_variables.client_get_value(myds->sess, SQL_CHARACTER_SET))); if (!c) { @@ -1000,12 +1003,12 @@ void PgSQL_Connection::set_names_start() { async_exit_status = mysql_set_character_set_start(&interr,pgsql, NULL, atoi(pgsql_variables.client_get_value(myds->sess, SQL_CHARACTER_SET))); } -void PgSQL_Connection::set_names_cont(short event) { +void PgSQL_Connection_Placeholder::set_names_cont(short event) { proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event); async_exit_status = mysql_set_character_set_cont(&interr,pgsql, mysql_status(event, true)); } -void PgSQL_Connection::set_query(char *stmt, unsigned long length) { +void PgSQL_Connection_Placeholder::set_query(char *stmt, unsigned long length) { query.length=length; query.ptr=stmt; if (length > largest_query_length) { @@ -1016,18 +1019,18 @@ void PgSQL_Connection::set_query(char *stmt, unsigned long length) { } } -void PgSQL_Connection::real_query_start() { +void PgSQL_Connection_Placeholder::real_query_start() { PROXY_TRACE(); async_exit_status = mysql_real_query_start(&interr , pgsql, query.ptr, query.length); } -void PgSQL_Connection::real_query_cont(short event) { +void PgSQL_Connection_Placeholder::real_query_cont(short event) { if (event == 0) return; proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event); async_exit_status = mysql_real_query_cont(&interr ,pgsql , mysql_status(event, true)); } -void PgSQL_Connection::stmt_prepare_start() { +void PgSQL_Connection_Placeholder::stmt_prepare_start() { PROXY_TRACE(); query.stmt=mysql_stmt_init(pgsql); my_bool my_arg=true; @@ -1035,12 +1038,12 @@ void PgSQL_Connection::stmt_prepare_start() { async_exit_status = mysql_stmt_prepare_start(&interr , query.stmt, query.ptr, query.length); } -void PgSQL_Connection::stmt_prepare_cont(short event) { +void PgSQL_Connection_Placeholder::stmt_prepare_cont(short event) { proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event); async_exit_status = mysql_stmt_prepare_cont(&interr , query.stmt , mysql_status(event, true)); } -void PgSQL_Connection::stmt_execute_start() { +void PgSQL_Connection_Placeholder::stmt_execute_start() { PROXY_TRACE(); int _rc=0; assert(query.stmt->mysql); // if we reached here, we hit bug #740 @@ -1057,40 +1060,40 @@ void PgSQL_Connection::stmt_execute_start() { async_exit_status = mysql_stmt_execute_start(&interr , query.stmt); } -void PgSQL_Connection::stmt_execute_cont(short event) { +void PgSQL_Connection_Placeholder::stmt_execute_cont(short event) { proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event); async_exit_status = mysql_stmt_execute_cont(&interr , query.stmt , mysql_status(event, true)); } -void PgSQL_Connection::stmt_execute_store_result_start() { +void PgSQL_Connection_Placeholder::stmt_execute_store_result_start() { PROXY_TRACE(); async_exit_status = mysql_stmt_store_result_start(&interr, query.stmt); } -void PgSQL_Connection::stmt_execute_store_result_cont(short event) { +void PgSQL_Connection_Placeholder::stmt_execute_store_result_cont(short event) { proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event); async_exit_status = mysql_stmt_store_result_cont(&interr , query.stmt , mysql_status(event, true)); } #ifndef PROXYSQL_USE_RESULT -void PgSQL_Connection::store_result_start() { +void PgSQL_Connection_Placeholder::store_result_start() { PROXY_TRACE(); async_exit_status = mysql_store_result_start(&mysql_result, pgsql); } -void PgSQL_Connection::store_result_cont(short event) { +void PgSQL_Connection_Placeholder::store_result_cont(short event) { proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event); async_exit_status = mysql_store_result_cont(&mysql_result , pgsql , mysql_status(event, true)); } #endif // PROXYSQL_USE_RESULT -void PgSQL_Connection::set_is_client() { +void PgSQL_Connection_Placeholder::set_is_client() { //-- local_stmts->set_is_client(myds->sess); } #define NEXT_IMMEDIATE(new_st) do { async_state_machine = new_st; goto handler_again; } while (0) -MDB_ASYNC_ST PgSQL_Connection::handler(short event) { +PG_ASYNC_ST PgSQL_Connection_Placeholder::handler(short event) { unsigned long long processed_bytes=0; // issue #527 : this variable will store the amount of bytes processed during this event if (pgsql==NULL) { // it is the first time handler() is being called @@ -1841,7 +1844,7 @@ handler_again: return async_state_machine; } -void PgSQL_Connection::process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT(unsigned long long& processed_bytes) { +void PgSQL_Connection_Placeholder::process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT(unsigned long long& processed_bytes) { PROXY_TRACE2(); // there is more than 1 row unsigned long long total_size=0; @@ -1907,7 +1910,7 @@ void PgSQL_Connection::process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT(unsi bytes_info.bytes_recv += total_size; } -void PgSQL_Connection::next_event(MDB_ASYNC_ST new_st) { +void PgSQL_Connection_Placeholder::next_event(PG_ASYNC_ST new_st) { #ifdef DEBUG int fd; #endif /* DEBUG */ @@ -1948,7 +1951,7 @@ void PgSQL_Connection::next_event(MDB_ASYNC_ST new_st) { }; -int PgSQL_Connection::async_connect(short event) { +int PgSQL_Connection_Placeholder::async_connect(short event) { PROXY_TRACE(); if (pgsql==NULL && async_state_machine!=ASYNC_CONNECT_START) { // LCOV_EXCL_START @@ -1987,7 +1990,7 @@ int PgSQL_Connection::async_connect(short event) { } -bool PgSQL_Connection::IsServerOffline() { +bool PgSQL_Connection_Placeholder::IsServerOffline() { bool ret=false; if (parent==NULL) return ret; @@ -2008,7 +2011,7 @@ bool PgSQL_Connection::IsServerOffline() { // 0 when the query is completed // 1 when the query is not completed // the calling function should check pgsql error in pgsql struct -int PgSQL_Connection::async_query(short event, char *stmt, unsigned long length, MYSQL_STMT **_stmt, stmt_execute_metadata_t *stmt_meta) { +int PgSQL_Connection_Placeholder::async_query(short event, char *stmt, unsigned long length, MYSQL_STMT **_stmt, stmt_execute_metadata_t *stmt_meta) { PROXY_TRACE(); PROXY_TRACE2(); assert(pgsql); @@ -2105,7 +2108,7 @@ int PgSQL_Connection::async_query(short event, char *stmt, unsigned long length, // 1 when the ping is not completed // -2 on timeout // the calling function should check pgsql error in pgsql struct -int PgSQL_Connection::async_ping(short event) { +int PgSQL_Connection_Placeholder::async_ping(short event) { PROXY_TRACE(); assert(pgsql); assert(ret_mysql); @@ -2148,7 +2151,7 @@ int PgSQL_Connection::async_ping(short event) { return 1; } -int PgSQL_Connection::async_change_user(short event) { +int PgSQL_Connection_Placeholder::async_change_user(short event) { PROXY_TRACE(); assert(pgsql); assert(ret_mysql); @@ -2195,7 +2198,7 @@ int PgSQL_Connection::async_change_user(short event) { return 1; } -int PgSQL_Connection::async_select_db(short event) { +int PgSQL_Connection_Placeholder::async_select_db(short event) { PROXY_TRACE(); assert(pgsql); assert(ret_mysql); @@ -2236,7 +2239,7 @@ int PgSQL_Connection::async_select_db(short event) { return 1; } -int PgSQL_Connection::async_set_autocommit(short event, bool ac) { +int PgSQL_Connection_Placeholder::async_set_autocommit(short event, bool ac) { PROXY_TRACE(); assert(pgsql); assert(ret_mysql); @@ -2279,7 +2282,7 @@ int PgSQL_Connection::async_set_autocommit(short event, bool ac) { return 1; } -int PgSQL_Connection::async_set_names(short event, unsigned int c) { +int PgSQL_Connection_Placeholder::async_set_names(short event, unsigned int c) { PROXY_TRACE(); assert(pgsql); assert(ret_mysql); @@ -2322,7 +2325,7 @@ int PgSQL_Connection::async_set_names(short event, unsigned int c) { return 1; } -int PgSQL_Connection::async_set_option(short event, bool mask) { +int PgSQL_Connection_Placeholder::async_set_option(short event, bool mask) { PROXY_TRACE(); assert(pgsql); assert(ret_mysql); @@ -2367,7 +2370,7 @@ int PgSQL_Connection::async_set_option(short event, bool mask) { return 1; } -void PgSQL_Connection::async_free_result() { +void PgSQL_Connection_Placeholder::async_free_result() { PROXY_TRACE(); assert(pgsql); //assert(ret_mysql); @@ -2420,7 +2423,7 @@ void PgSQL_Connection::async_free_result() { // This function check if autocommit=0 and if there are any savepoint. // this is an attempt to mitigate MySQL bug https://bugs.pgsql.com/bug.php?id=107875 -bool PgSQL_Connection::AutocommitFalse_AndSavepoint() { +bool PgSQL_Connection_Placeholder::AutocommitFalse_AndSavepoint() { bool ret=false; if (IsAutoCommit() == false) { if (get_status(STATUS_MYSQL_CONNECTION_HAS_SAVEPOINT) == true) { @@ -2430,7 +2433,7 @@ bool PgSQL_Connection::AutocommitFalse_AndSavepoint() { return ret; } -bool PgSQL_Connection::IsKnownActiveTransaction() { +bool PgSQL_Connection_Placeholder::IsKnownActiveTransaction() { bool in_trx = pgsql ? pgsql->server_status & SERVER_STATUS_IN_TRANS : false; if (in_trx == false) { @@ -2440,7 +2443,7 @@ bool PgSQL_Connection::IsKnownActiveTransaction() { return in_trx; } -bool PgSQL_Connection::IsActiveTransaction() { +bool PgSQL_Connection_Placeholder::IsActiveTransaction() { bool ret=false; if (pgsql) { ret = (pgsql->server_status & SERVER_STATUS_IN_TRANS); @@ -2466,7 +2469,7 @@ bool PgSQL_Connection::IsActiveTransaction() { } -bool PgSQL_Connection::IsAutoCommit() { +bool PgSQL_Connection_Placeholder::IsAutoCommit() { bool ret=false; if (pgsql) { ret = (pgsql->server_status & SERVER_STATUS_AUTOCOMMIT); @@ -2489,7 +2492,7 @@ bool PgSQL_Connection::IsAutoCommit() { return ret; } -bool PgSQL_Connection::MultiplexDisabled(bool check_delay_token) { +bool PgSQL_Connection_Placeholder::MultiplexDisabled(bool check_delay_token) { // status_flags stores information about the status of the connection // can be used to determine if multiplexing can be enabled or not bool ret=false; @@ -2503,7 +2506,7 @@ bool PgSQL_Connection::MultiplexDisabled(bool check_delay_token) { return ret; } -bool PgSQL_Connection::IsKeepMultiplexEnabledVariables(char *query_digest_text) { +bool PgSQL_Connection_Placeholder::IsKeepMultiplexEnabledVariables(char *query_digest_text) { if (query_digest_text==NULL) return true; char *query_digest_text_filter_select = NULL; @@ -2599,7 +2602,7 @@ bool PgSQL_Connection::IsKeepMultiplexEnabledVariables(char *query_digest_text) return true; } -void PgSQL_Connection::ProcessQueryAndSetStatusFlags(char *query_digest_text) { +void PgSQL_Connection_Placeholder::ProcessQueryAndSetStatusFlags(char *query_digest_text) { if (query_digest_text==NULL) return; // unknown what to do with multiplex int mul=-1; @@ -2764,7 +2767,7 @@ void PgSQL_Connection::ProcessQueryAndSetStatusFlags(char *query_digest_text) { } } -void PgSQL_Connection::optimize() { +void PgSQL_Connection_Placeholder::optimize() { if (pgsql->net.max_packet > 65536) { // FIXME: temporary, maybe for very long time . This needs to become a global variable if ( ( pgsql->net.buff == pgsql->net.read_pos ) && ( pgsql->net.read_pos == pgsql->net.write_pos ) ) { free(pgsql->net.buff); @@ -2781,7 +2784,7 @@ void PgSQL_Connection::optimize() { // close_mysql() is a replacement for mysql_close() // if avoids that a QUIT command stops forever // FIXME: currently doesn't support encryption and compression -void PgSQL_Connection::close_mysql() { +void PgSQL_Connection_Placeholder::close_mysql() { if ((send_quit) && (pgsql->net.pvio) && ret_mysql) { char buff[5]; mysql_hdr myhdr; @@ -2804,7 +2807,7 @@ void PgSQL_Connection::close_mysql() { // this function is identical to async_query() , with the only exception that MyRS should never be set -int PgSQL_Connection::async_send_simple_command(short event, char *stmt, unsigned long length) { +int PgSQL_Connection_Placeholder::async_send_simple_command(short event, char *stmt, unsigned long length) { PROXY_TRACE(); assert(pgsql); assert(ret_mysql); @@ -2861,7 +2864,7 @@ int PgSQL_Connection::async_send_simple_command(short event, char *stmt, unsigne return 1; } -void PgSQL_Connection::reset() { +void PgSQL_Connection_Placeholder::reset() { bool old_no_multiplex_hg = get_status(STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG); bool old_compress = get_status(STATUS_MYSQL_CONNECTION_COMPRESSION); status_flags=0; @@ -2908,7 +2911,7 @@ void PgSQL_Connection::reset() { } } -bool PgSQL_Connection::get_gtid(char *buff, uint64_t *trx_id) { +bool PgSQL_Connection_Placeholder::get_gtid(char *buff, uint64_t *trx_id) { // note: current implementation for for OWN GTID only! bool ret = false; if (buff==NULL || trx_id == NULL) { @@ -2939,3 +2942,1583 @@ bool PgSQL_Connection::get_gtid(char *buff, uint64_t *trx_id) { } return ret; } + + + +PgSQL_Connection::PgSQL_Connection() { + pgsql_conn = NULL; + last_result = NULL; + MyRS = NULL; + MyRS_reuse = NULL; + reset_error(); +} + +PgSQL_Connection::~PgSQL_Connection() { + reset_last_result(); + if (userinfo) { + delete userinfo; + userinfo = NULL; + } + if (pgsql_conn) { + PQfinish(pgsql_conn); + pgsql_conn = NULL; + } + if (MyRS) { + delete MyRS; + MyRS = NULL; + } + if (MyRS_reuse) { + delete MyRS_reuse; + MyRS_reuse = NULL; + } + for (auto i = 0; i < SQL_NAME_LAST_HIGH_WM; i++) { + if (variables[i].value) { + free(variables[i].value); + variables[i].value = NULL; + var_hash[i] = 0; + } + } + + if (connected_host_details.hostname) { + free(connected_host_details.hostname); + connected_host_details.hostname = NULL; + } + if (connected_host_details.ip) { + free(connected_host_details.ip); + connected_host_details.hostname = NULL; + } + +} + +void PgSQL_Connection::next_event(PG_ASYNC_ST new_st) { +#ifdef DEBUG + int fd; +#endif /* DEBUG */ + wait_events = 0; + + if (async_exit_status & PG_EVENT_READ) + wait_events |= POLLIN; + if (async_exit_status & PG_EVENT_WRITE) + wait_events |= POLLOUT; + if (wait_events) +#ifdef DEBUG + fd = PQsocket(pgsql_conn); +#else + PQsocket(pgsql_conn); +#endif /* DEBUG */ + else +#ifdef DEBUG + fd = -1; +#endif /* DEBUG */ + + proxy_debug(PROXY_DEBUG_NET, 8, "fd=%d, wait_events=%d , old_ST=%d, new_ST=%d\n", fd, wait_events, async_state_machine, new_st); + async_state_machine = new_st; +}; + + +PG_ASYNC_ST PgSQL_Connection::handler(short event) { + unsigned long long processed_bytes = 0; // issue #527 : this variable will store the amount of bytes processed during this event + if (pgsql_conn == NULL) { + // it is the first time handler() is being called + async_state_machine = ASYNC_CONNECT_START; + myds->wait_until = myds->sess->thread->curtime + mysql_thread___connect_timeout_server * 1000; + if (myds->max_connect_time) { + if (myds->wait_until > myds->max_connect_time) { + myds->wait_until = myds->max_connect_time; + } + } + } +handler_again: + proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6, "async_state_machine=%d\n", async_state_machine); + switch (async_state_machine) { + case ASYNC_CONNECT_START: + connect_start(); + if (async_exit_status) { + next_event(ASYNC_CONNECT_CONT); + } + else { + NEXT_IMMEDIATE(ASYNC_CONNECT_END); + } + break; + case ASYNC_CONNECT_CONT: + if (event) { + connect_cont(event); + } + if (async_exit_status) { + if (myds->sess->thread->curtime >= myds->wait_until) { + NEXT_IMMEDIATE(ASYNC_CONNECT_TIMEOUT); + } + next_event(ASYNC_CONNECT_CONT); + } + else { + NEXT_IMMEDIATE(ASYNC_CONNECT_END); + } + break; + case ASYNC_CONNECT_END: + if (myds) { + if (myds->sess) { + if (myds->sess->thread) { + unsigned long long curtime = monotonic_time(); + myds->sess->thread->atomic_curtime = curtime; + } + } + } + if (get_error_type() != PG_NO_ERROR) { + // always increase the counter + proxy_error("Failed to PQconnectStart() on %u:%s:%d , FD (Conn:%d , MyDS:%d) , %d: %s.\n", parent->myhgc->hid, parent->address, parent->port, PQsocket(pgsql_conn), myds->fd, PQstatus(pgsql_conn), get_error_message().c_str()); + NEXT_IMMEDIATE(ASYNC_CONNECT_FAILED); + } + else { + NEXT_IMMEDIATE(ASYNC_CONNECT_SUCCESSFUL); + } + break; + case ASYNC_CONNECT_SUCCESSFUL: + if (is_connected()) { + //if (pgsql->options.use_ssl == 1) + // if (myds) + // if (myds->sess != NULL) + // if (myds->sess->session_fast_forward == true) { + // assert(myds->ssl == NULL); + // if (myds->ssl == NULL) { + // // check the definition of P_MARIADB_TLS + // /* P_MARIADB_TLS* matls = (P_MARIADB_TLS*)pgsql->net.pvio->ctls; + // if (matls != NULL) { + // myds->encrypted = true; + // myds->ssl = (SSL *)matls->ssl; + // myds->rbio_ssl = BIO_new(BIO_s_mem()); + // myds->wbio_ssl = BIO_new(BIO_s_mem()); + // SSL_set_bio(myds->ssl, myds->rbio_ssl, myds->wbio_ssl); + // } else { + // // if pgsql->options.use_ssl == 1 but matls == NULL + // // it means that ProxySQL tried to use SSL to connect to the backend + // // but the backend didn't support SSL + // } + // */ + // } + // } + } + __sync_fetch_and_add(&PgHGM->status.server_connections_connected, 1); + __sync_fetch_and_add(&parent->connect_OK, 1); + + if (PQisnonblocking(pgsql_conn) == false) { + // Set non-blocking mode + if (PQsetnonblocking(pgsql_conn, 1) != 0) { + const std::string errmsg = PQerrorMessage(pgsql_conn); + set_error(PG_CONNECT_FAILED, errmsg); + proxy_error("Failed to set non-blocking mode: %s\n", errmsg.c_str()); + NEXT_IMMEDIATE(ASYNC_CONNECT_FAILED); + } + } + //MySQL_Monitor::update_dns_cache_from_mysql_conn(pgsql); + break; + case ASYNC_CONNECT_FAILED: + PQfinish(pgsql_conn);//release connection even on error + PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, parent->myhgc->hid, parent->address, parent->port, mysql_errno(pgsql)); + parent->connect_error(mysql_errno(pgsql)); + break; + case ASYNC_CONNECT_TIMEOUT: + // to fix + PQfinish(pgsql_conn);//release connection + proxy_error("Connect timeout on %s:%d : exceeded by %lluus\n", parent->address, parent->port, myds->sess->thread->curtime - myds->wait_until); + PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, parent->myhgc->hid, parent->address, parent->port, mysql_errno(pgsql)); + parent->connect_error(mysql_errno(pgsql)); + break; + case ASYNC_SET_AUTOCOMMIT_START: + //set_autocommit_start(); + async_exit_status = PG_EVENT_NONE; + if (async_exit_status) { + next_event(ASYNC_SET_AUTOCOMMIT_CONT); + } + else { + NEXT_IMMEDIATE(ASYNC_SET_AUTOCOMMIT_END); + } + break; + case ASYNC_SET_AUTOCOMMIT_CONT: + //set_autocommit_cont(event); + async_exit_status = PG_EVENT_NONE; + if (async_exit_status) { + next_event(ASYNC_SET_AUTOCOMMIT_CONT); + } + else { + NEXT_IMMEDIATE(ASYNC_SET_AUTOCOMMIT_END); + } + break; + case ASYNC_SET_AUTOCOMMIT_END: + if (/*ret_bool*/ false) { + NEXT_IMMEDIATE(ASYNC_SET_AUTOCOMMIT_FAILED); + } + else { + NEXT_IMMEDIATE(ASYNC_SET_AUTOCOMMIT_SUCCESSFUL); + } + break; + case ASYNC_SET_AUTOCOMMIT_SUCCESSFUL: + //options.last_set_autocommit = (options.autocommit ? 1 : 0); // we successfully set autocommit + //if ((pgsql->server_status & SERVER_STATUS_AUTOCOMMIT) && options.autocommit == false) { + // proxy_warning("It seems we are hitting bug http://bugs.pgsql.com/bug.php?id=66884\n"); + //} + break; + case ASYNC_SET_AUTOCOMMIT_FAILED: + //fprintf(stderr,"%s\n",mysql_error(pgsql)); + //proxy_error("Failed SET AUTOCOMMIT: %s\n", mysql_error(pgsql)); + //PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, parent->myhgc->hid, parent->address, parent->port, mysql_errno(pgsql)); + break; + + case ASYNC_QUERY_START: + query_start(); + __sync_fetch_and_add(&parent->queries_sent, 1); + __sync_fetch_and_add(&parent->bytes_sent, query.length); + statuses.questions++; + myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_sent] += query.length; + myds->bytes_info.bytes_sent += query.length; + bytes_info.bytes_sent += query.length; + if (myds->sess->with_gtid == true) { + __sync_fetch_and_add(&parent->queries_gtid_sync, 1); + } + if (async_exit_status) { + next_event(ASYNC_QUERY_CONT); + } else { +#ifdef PROXYSQL_USE_RESULT + NEXT_IMMEDIATE(ASYNC_USE_RESULT_START); +#else + NEXT_IMMEDIATE(ASYNC_STORE_RESULT_START); +#endif + } + break; + case ASYNC_QUERY_CONT: + if (event) { + query_cont(event); + } + if (async_exit_status) { + next_event(ASYNC_QUERY_CONT); + } else { +#ifdef PROXYSQL_USE_RESULT + NEXT_IMMEDIATE(ASYNC_USE_RESULT_START); +#else + NEXT_IMMEDIATE(ASYNC_STORE_RESULT_START); +#endif + } + break; + case ASYNC_USE_RESULT_START: + if (is_error_present()) { + NEXT_IMMEDIATE(ASYNC_QUERY_END); + } + fetch_result_start(); + if (async_exit_status == PG_EVENT_NONE) { + async_fetch_row_start = false; + //if (myds->sess->mirror == false) { + // if (MyRS_reuse == NULL) { + // MyRS = new PgSQL_ResultSet(); + // MyRS->init(&myds->sess->client_myds->myprot, pgsql_conn); + // } + // else { + // MyRS = MyRS_reuse; + // MyRS_reuse = NULL; + // MyRS->init(&myds->sess->client_myds->myprot, pgsql_conn); + // } + //} else { + // if (MyRS_reuse == NULL) { + // MyRS = new PgSQL_ResultSet(); + // MyRS->init(NULL, pgsql_conn); + // } + // else { + // MyRS = MyRS_reuse; + // MyRS_reuse = NULL; + // MyRS->init(NULL, pgsql_conn); + // } + //} + NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); + } + else { + assert(0); // shouldn't ever reach here + } + break; + case ASYNC_USE_RESULT_CONT: + { + if (is_error_present()) { + NEXT_IMMEDIATE(ASYNC_QUERY_END); + } + + if (myds->sess && myds->sess->client_myds && myds->sess->mirror == false && + myds->sess->status != SHOW_WARNINGS) { // see issue#4072 + unsigned int buffered_data = 0; + buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN; + buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN; + if (buffered_data > (unsigned int)mysql_thread___threshold_resultset_size * 8) { + next_event(ASYNC_USE_RESULT_CONT); // we temporarily pause . See #1232 + break; + } + } + + fetch_result_cont(event); + + if (async_exit_status) { + next_event(ASYNC_USE_RESULT_CONT); + break; + } + + PGresult* result = get_last_result(); + + /*if (result == NULL && async_fetch_row_start == false) { + const std::string errmsg = PQerrorMessage(pgsql_conn); + set_error(PG_RESULT_FAILED, errmsg); + proxy_error("Failed to get initial result. %s\n", errmsg.c_str()); + NEXT_IMMEDIATE(ASYNC_QUERY_END); + }*/ + + if (result) { + switch (PQresultStatus(result)) { + case PGRES_COMMAND_OK: + NEXT_IMMEDIATE(ASYNC_QUERY_END); + break; + case PGRES_TUPLES_OK: + case PGRES_SINGLE_TUPLE: + break; + default: + const std::string errmsg = PQresultErrorMessage(result); + set_error(PG_RESULT_FAILED, errmsg); + proxy_error("Failed condition in PQresultStatus(). %s\n", errmsg.c_str()); + NEXT_IMMEDIATE(ASYNC_QUERY_END); + } + + if (async_fetch_row_start == false) { + async_fetch_row_start = true; + if (myds->sess->mirror == false) { + if (MyRS_reuse == NULL) { + MyRS = new PgSQL_ResultSet(); + MyRS->init(&myds->sess->client_myds->myprot, pgsql_conn); + } + else { + MyRS = MyRS_reuse; + MyRS_reuse = NULL; + MyRS->init(&myds->sess->client_myds->myprot, pgsql_conn); + } + } + else { + if (MyRS_reuse == NULL) { + MyRS = new PgSQL_ResultSet(); + MyRS->init(NULL, pgsql_conn); + } + else { + MyRS = MyRS_reuse; + MyRS_reuse = NULL; + MyRS->init(NULL, pgsql_conn); + } + } + MyRS->add_row_description(result); + } + + if (PQntuples(result) > 0) { + + unsigned int br = MyRS->add_row(result); + __sync_fetch_and_add(&parent->bytes_recv, br); + myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_recv] += br; + myds->bytes_info.bytes_recv += br; + bytes_info.bytes_recv += br; + processed_bytes += br; // issue #527 : this variable will store the amount of bytes processed during this event + if ( + (processed_bytes > (unsigned int)mysql_thread___threshold_resultset_size * 8) + || + (mysql_thread___throttle_ratio_server_to_client && mysql_thread___throttle_max_bytes_per_second_to_client && (processed_bytes > (unsigned long long)mysql_thread___throttle_max_bytes_per_second_to_client / 10 * (unsigned long long)mysql_thread___throttle_ratio_server_to_client)) + ) { + next_event(ASYNC_USE_RESULT_CONT); // we temporarily pause + } + else { + NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); // we continue looping + } + } else { + MyRS->add_eof(result); + NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); + } + } + + if (MyRS->resultset_completed == false) { + if (myds) { + MyRS->add_err(myds); + } + } + // we reach here if there was no error + NEXT_IMMEDIATE(ASYNC_QUERY_END); + } + break; + case ASYNC_QUERY_END: + PROXY_TRACE2(); + if (is_error_present()) { + compute_unknown_transaction_status(); + } else { + unknown_transaction_status = false; + } + reset_last_result(); + break; +/* case ASYNC_CHANGE_USER_START: + change_user_start(); + if (async_exit_status) { + next_event(ASYNC_CHANGE_USER_CONT); + } + else { + NEXT_IMMEDIATE(ASYNC_CHANGE_USER_END); + } + break; + case ASYNC_CHANGE_USER_CONT: + assert(myds->sess->status == CHANGING_USER_SERVER || myds->sess->status == RESETTING_CONNECTION); + change_user_cont(event); + if (async_exit_status) { + if (myds->sess->thread->curtime >= myds->wait_until) { + NEXT_IMMEDIATE(ASYNC_CHANGE_USER_TIMEOUT); + } + else { + next_event(ASYNC_CHANGE_USER_CONT); + } + } + else { + NEXT_IMMEDIATE(ASYNC_CHANGE_USER_END); + } + break; + case ASYNC_CHANGE_USER_END: + if (ret_bool) { + NEXT_IMMEDIATE(ASYNC_CHANGE_USER_FAILED); + } + else { + NEXT_IMMEDIATE(ASYNC_CHANGE_USER_SUCCESSFUL); + } + break; + case ASYNC_CHANGE_USER_SUCCESSFUL: + pgsql->server_status = SERVER_STATUS_AUTOCOMMIT; // we reset this due to bug https://jira.mariadb.org/browse/CONC-332 + break; + case ASYNC_CHANGE_USER_FAILED: + break; + case ASYNC_CHANGE_USER_TIMEOUT: + break; + case ASYNC_PING_START: + ping_start(); + if (async_exit_status) { + next_event(ASYNC_PING_CONT); + } + else { + NEXT_IMMEDIATE(ASYNC_PING_END); + } + break; + case ASYNC_PING_CONT: + assert(myds->sess->status == PINGING_SERVER); + if (event) { + ping_cont(event); + } + if (async_exit_status) { + if (myds->sess->thread->curtime >= myds->wait_until) { + NEXT_IMMEDIATE(ASYNC_PING_TIMEOUT); + } + else { + next_event(ASYNC_PING_CONT); + } + } + else { + NEXT_IMMEDIATE(ASYNC_PING_END); + } + break; + case ASYNC_PING_END: + if (interr) { + NEXT_IMMEDIATE(ASYNC_PING_FAILED); + } + else { + NEXT_IMMEDIATE(ASYNC_PING_SUCCESSFUL); + } + break; + case ASYNC_PING_SUCCESSFUL: + break; + case ASYNC_PING_FAILED: + break; + case ASYNC_PING_TIMEOUT: + break; + case ASYNC_QUERY_START: + real_query_start(); + __sync_fetch_and_add(&parent->queries_sent, 1); + __sync_fetch_and_add(&parent->bytes_sent, query.length); + statuses.questions++; + myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_sent] += query.length; + myds->bytes_info.bytes_sent += query.length; + bytes_info.bytes_sent += query.length; + if (myds->sess->with_gtid == true) { + __sync_fetch_and_add(&parent->queries_gtid_sync, 1); + } + if (async_exit_status) { + next_event(ASYNC_QUERY_CONT); + } + else { +#ifdef PROXYSQL_USE_RESULT + NEXT_IMMEDIATE(ASYNC_USE_RESULT_START); +#else + NEXT_IMMEDIATE(ASYNC_STORE_RESULT_START); +#endif + } + break; + case ASYNC_QUERY_CONT: + real_query_cont(event); + if (async_exit_status) { + next_event(ASYNC_QUERY_CONT); + } + else { +#ifdef PROXYSQL_USE_RESULT + NEXT_IMMEDIATE(ASYNC_USE_RESULT_START); +#else + NEXT_IMMEDIATE(ASYNC_STORE_RESULT_START); +#endif + } + break; + + case ASYNC_STMT_PREPARE_START: + stmt_prepare_start(); + __sync_fetch_and_add(&parent->queries_sent, 1); + __sync_fetch_and_add(&parent->bytes_sent, query.length); + myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_sent] += query.length; + myds->bytes_info.bytes_sent += query.length; + bytes_info.bytes_sent += query.length; + if (async_exit_status) { + next_event(ASYNC_STMT_PREPARE_CONT); + } + else { + NEXT_IMMEDIATE(ASYNC_STMT_PREPARE_END); + } + break; + case ASYNC_STMT_PREPARE_CONT: + stmt_prepare_cont(event); + if (async_exit_status) { + next_event(ASYNC_STMT_PREPARE_CONT); + } + else { + NEXT_IMMEDIATE(ASYNC_STMT_PREPARE_END); + } + break; + + case ASYNC_STMT_PREPARE_END: + if (interr) { + NEXT_IMMEDIATE(ASYNC_STMT_PREPARE_FAILED); + } + else { + NEXT_IMMEDIATE(ASYNC_STMT_PREPARE_SUCCESSFUL); + } + break; + case ASYNC_STMT_PREPARE_SUCCESSFUL: + break; + case ASYNC_STMT_PREPARE_FAILED: + break; + + case ASYNC_STMT_EXECUTE_START: + PROXY_TRACE2(); + stmt_execute_start(); + __sync_fetch_and_add(&parent->queries_sent, 1); + __sync_fetch_and_add(&parent->bytes_sent, query.stmt_meta->size); + myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_sent] += query.stmt_meta->size; + myds->bytes_info.bytes_sent += query.stmt_meta->size; + bytes_info.bytes_sent += query.stmt_meta->size; + if (async_exit_status) { + next_event(ASYNC_STMT_EXECUTE_CONT); + } + else { + NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_STORE_RESULT_START); + } + break; + case ASYNC_STMT_EXECUTE_CONT: + PROXY_TRACE2(); + stmt_execute_cont(event); + if (async_exit_status) { + next_event(ASYNC_STMT_EXECUTE_CONT); + } + else { + NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_STORE_RESULT_START); + } + break; + + case ASYNC_STMT_EXECUTE_STORE_RESULT_START: + PROXY_TRACE2(); + if (mysql_stmt_errno(query.stmt)) { + NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END); + } + { + query.stmt_result = mysql_stmt_result_metadata(query.stmt); + if (query.stmt_result == NULL) { + NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END); + } + else { + update_warning_count_from_statement(); + if (myds->sess->mirror == false) { + if (MyRS_reuse == NULL) { + MyRS = new MySQL_ResultSet(); + MyRS->init(&myds->sess->client_myds->myprot, query.stmt_result, pgsql, query.stmt); + } + else { + MyRS = MyRS_reuse; + MyRS_reuse = NULL; + MyRS->init(&myds->sess->client_myds->myprot, query.stmt_result, pgsql, query.stmt); + } + } + else { + + } + //async_fetch_row_start=false; + } + } + stmt_execute_store_result_start(); + if (async_exit_status) { + next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); + } + else { + NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END); + } + break; + case ASYNC_STMT_EXECUTE_STORE_RESULT_CONT: + PROXY_TRACE2(); + { // this copied mostly from ASYNC_USE_RESULT_CONT + if (myds->sess && myds->sess->client_myds && myds->sess->mirror == false) { + unsigned int buffered_data = 0; + buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN; + buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN; + if (buffered_data > (unsigned int)mysql_thread___threshold_resultset_size * 8) { + next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); // we temporarily pause . See #1232 + break; + } + } + } + stmt_execute_store_result_cont(event); + //if (async_fetch_row_start==false) { + // async_fetch_row_start=true; + //} + if (async_exit_status) { + // this copied mostly from ASYNC_USE_RESULT_CONT + MYSQL_ROWS* r = query.stmt->result.data; + long long unsigned int rows_read_inner = 0; + + if (r) { + rows_read_inner++; + while (rows_read_inner < query.stmt->result.rows) { + // it is very important to check rows_read_inner FIRST + // because r->next could point to an invalid memory + rows_read_inner++; + r = r->next; + } + if (rows_read_inner > 1) { + process_rows_in_ASYNC_STMT_EXECUTE_STORE_RESULT_CONT(processed_bytes); + if ( + (processed_bytes > (unsigned int)mysql_thread___threshold_resultset_size * 8) + || + (mysql_thread___throttle_ratio_server_to_client && mysql_thread___throttle_max_bytes_per_second_to_client && (processed_bytes > (unsigned long long)mysql_thread___throttle_max_bytes_per_second_to_client / 10 * (unsigned long long)mysql_thread___throttle_ratio_server_to_client)) + ) { + next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); // we temporarily pause + } + else { + NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); // we continue looping + } + } + } + next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); + } + else { + NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END); + } + break; + case ASYNC_STMT_EXECUTE_END: + PROXY_TRACE2(); + { + if (query.stmt_result) { + unsigned long long total_size = 0; + MYSQL_ROWS* r = query.stmt->result.data; + if (r) { + total_size += r->length; + if (r->length > 0xFFFFFF) { + total_size += (r->length / 0xFFFFFF) * sizeof(mysql_hdr); + } + total_size += sizeof(mysql_hdr); + while (r->next) { + r = r->next; + total_size += r->length; + if (r->length > 0xFFFFFF) { + total_size += (r->length / 0xFFFFFF) * sizeof(mysql_hdr); + } + total_size += sizeof(mysql_hdr); + } + } + __sync_fetch_and_add(&parent->bytes_recv, total_size); + myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_recv] += total_size; + myds->bytes_info.bytes_recv += total_size; + bytes_info.bytes_recv += total_size; + } + } + + update_warning_count_from_statement(); + break; + // case ASYNC_STMT_EXECUTE_SUCCESSFUL: + // break; + // case ASYNC_STMT_EXECUTE_FAILED: + // break; + + case ASYNC_NEXT_RESULT_START: + async_exit_status = mysql_next_result_start(&interr, pgsql); + if (async_exit_status) { + next_event(ASYNC_NEXT_RESULT_CONT); + } + else { +#ifdef PROXYSQL_USE_RESULT + NEXT_IMMEDIATE(ASYNC_USE_RESULT_START); +#else + NEXT_IMMEDIATE(ASYNC_STORE_RESULT_START); +#endif + } + break; + + case ASYNC_NEXT_RESULT_CONT: + if (event) { + async_exit_status = mysql_next_result_cont(&interr, pgsql, mysql_status(event, true)); + } + if (async_exit_status) { + next_event(ASYNC_NEXT_RESULT_CONT); + } + else { +#ifdef PROXYSQL_USE_RESULT + NEXT_IMMEDIATE(ASYNC_USE_RESULT_START); +#else + NEXT_IMMEDIATE(ASYNC_STORE_RESULT_START); +#endif + } + break; + + case ASYNC_NEXT_RESULT_END: + break; +#ifndef PROXYSQL_USE_RESULT + case ASYNC_STORE_RESULT_START: + if (mysql_errno(pgsql)) { + NEXT_IMMEDIATE(ASYNC_QUERY_END); + } + store_result_start(); + if (async_exit_status) { + next_event(ASYNC_STORE_RESULT_CONT); + } + else { + NEXT_IMMEDIATE(ASYNC_QUERY_END); + } + break; + case ASYNC_STORE_RESULT_CONT: + store_result_cont(event); + if (async_exit_status) { + next_event(ASYNC_STORE_RESULT_CONT); + } + else { + NEXT_IMMEDIATE(ASYNC_QUERY_END); + } + break; +#endif // PROXYSQL_USE_RESULT + case ASYNC_USE_RESULT_START: + if (mysql_errno(pgsql)) { + NEXT_IMMEDIATE(ASYNC_QUERY_END); + } + mysql_result = mysql_use_result(pgsql); + if (mysql_result == NULL) { + NEXT_IMMEDIATE(ASYNC_QUERY_END); + } + else { + // since 'add_eof' utilizes 'warning_count,' we are setting the 'warning_count' here + + // Note: There is a possibility of obtaining inaccurate warning_count and server_status at this point + // if the backend server has CLIENT_DEPRECATE_EOF enabled, and the client does not support CLIENT_DEPRECATE_EOF, + // especially when the query generates a warning. This information will be included in the intermediate EOF packet. + // Correct information becomes available only after fetching all rows, + // and the warning_count and status flag details are extracted from the final OK packet. + update_warning_count_from_connection(); + if (myds->sess->mirror == false) { + if (MyRS_reuse == NULL) { + MyRS = new MySQL_ResultSet(); + MyRS->init(&myds->sess->client_myds->myprot, mysql_result, pgsql); + } + else { + MyRS = MyRS_reuse; + MyRS_reuse = NULL; + MyRS->init(&myds->sess->client_myds->myprot, mysql_result, pgsql); + } + } + else { + if (MyRS_reuse == NULL) { + MyRS = new MySQL_ResultSet(); + MyRS->init(NULL, mysql_result, pgsql); + } + else { + MyRS = MyRS_reuse; + MyRS_reuse = NULL; + MyRS->init(NULL, mysql_result, pgsql); + } + } + async_fetch_row_start = false; + NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); + } + break; + case ASYNC_USE_RESULT_CONT: + { + if (myds->sess && myds->sess->client_myds && myds->sess->mirror == false && + myds->sess->status != SHOW_WARNINGS) { // see issue#4072 + unsigned int buffered_data = 0; + buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN; + buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN; + if (buffered_data > (unsigned int)mysql_thread___threshold_resultset_size * 8) { + next_event(ASYNC_USE_RESULT_CONT); // we temporarily pause . See #1232 + break; + } + } + } + if (async_fetch_row_start == false) { + async_exit_status = mysql_fetch_row_start(&mysql_row, mysql_result); + async_fetch_row_start = true; + } + else { + async_exit_status = mysql_fetch_row_cont(&mysql_row, mysql_result, mysql_status(event, true)); + } + if (async_exit_status) { + next_event(ASYNC_USE_RESULT_CONT); + } + else { + async_fetch_row_start = false; + if (mysql_row) { + if (myds && myds->sess && myds->sess->status == SHOW_WARNINGS) { + if (mysql_thread___verbose_query_error) { + PgSQL_Data_Stream* client_myds = myds->sess->client_myds; + const char* username = ""; + const char* schema = ""; + const char* client_addr = ""; + const char* digest_text = myds->sess->CurrentQuery.show_warnings_prev_query_digest.c_str(); + + if (client_myds) { + client_addr = client_myds->addr.addr ? client_myds->addr.addr : (char*)"unknown"; + + if (client_myds->myconn && client_myds->myconn->userinfo) { + username = client_myds->myconn->userinfo->username; + schema = client_myds->myconn->userinfo->schemaname; + } + } + + proxy_warning( + "Warning during query on (%d,%s,%d,%lu). User '%s@%s', schema '%s', digest_text '%s', level '%s', code '%s', message '%s'\n", + parent->myhgc->hid, parent->address, parent->port, get_mysql_thread_id(), username, client_addr, + schema, digest_text, mysql_row[0], mysql_row[1], mysql_row[2] + ); + } + else { + proxy_warning( + "Warning during query on (%d,%s,%d,%lu). Level '%s', code '%s', message '%s'\n", + parent->myhgc->hid, parent->address, parent->port, get_mysql_thread_id(), mysql_row[0], mysql_row[1], + mysql_row[2] + ); + } + } + unsigned int br = MyRS->add_row(mysql_row); + __sync_fetch_and_add(&parent->bytes_recv, br); + myds->sess->thread->status_variables.stvar[st_var_queries_backends_bytes_recv] += br; + myds->bytes_info.bytes_recv += br; + bytes_info.bytes_recv += br; + processed_bytes += br; // issue #527 : this variable will store the amount of bytes processed during this event + if ( + (processed_bytes > (unsigned int)mysql_thread___threshold_resultset_size * 8) + || + (mysql_thread___throttle_ratio_server_to_client && mysql_thread___throttle_max_bytes_per_second_to_client && (processed_bytes > (unsigned long long)mysql_thread___throttle_max_bytes_per_second_to_client / 10 * (unsigned long long)mysql_thread___throttle_ratio_server_to_client)) + ) { + next_event(ASYNC_USE_RESULT_CONT); // we temporarily pause + } + else { + NEXT_IMMEDIATE(ASYNC_USE_RESULT_CONT); // we continue looping + } + } + else { + if (pgsql) { + int _myerrno = mysql_errno(pgsql); + if (_myerrno) { + if (myds) { + //-- MyRS->add_err(myds); + NEXT_IMMEDIATE(ASYNC_QUERY_END); + } + } + } + // since 'add_eof' utilizes 'warning_count,' we are setting the 'warning_count' here + update_warning_count_from_connection(); + // we reach here if there was no error + // exclude warning_count from the OK/EOF packet for the ‘SHOW WARNINGS’ statement + MyRS->add_eof(query.length == 13 && strncasecmp(query.ptr, "SHOW WARNINGS", 13) == 0); + NEXT_IMMEDIATE(ASYNC_QUERY_END); + } + } + break; + case ASYNC_QUERY_END: + PROXY_TRACE2(); + if (pgsql) { + int _myerrno = mysql_errno(pgsql); + if (_myerrno == 0) { + unknown_transaction_status = false; + update_warning_count_from_connection(); + } + else { + compute_unknown_transaction_status(); + } + if (_myerrno < 2000) { + // we can continue only if the error is coming from the backend. + // (or if zero) + // if the error comes from the client library, something terribly + // wrong happened and we cannot continue + if (pgsql->server_status & SERVER_MORE_RESULTS_EXIST) { + async_state_machine = ASYNC_NEXT_RESULT_START; + } + } + } + if (mysql_result) { + mysql_free_result(mysql_result); + mysql_result = NULL; + } + break; + case ASYNC_SET_AUTOCOMMIT_START: + set_autocommit_start(); + if (async_exit_status) { + next_event(ASYNC_SET_AUTOCOMMIT_CONT); + } + else { + NEXT_IMMEDIATE(ASYNC_SET_AUTOCOMMIT_END); + } + break; + case ASYNC_SET_AUTOCOMMIT_CONT: + set_autocommit_cont(event); + if (async_exit_status) { + next_event(ASYNC_SET_AUTOCOMMIT_CONT); + } + else { + NEXT_IMMEDIATE(ASYNC_SET_AUTOCOMMIT_END); + } + break; + case ASYNC_SET_AUTOCOMMIT_END: + if (ret_bool) { + NEXT_IMMEDIATE(ASYNC_SET_AUTOCOMMIT_FAILED); + } + else { + NEXT_IMMEDIATE(ASYNC_SET_AUTOCOMMIT_SUCCESSFUL); + } + break; + case ASYNC_SET_AUTOCOMMIT_SUCCESSFUL: + options.last_set_autocommit = (options.autocommit ? 1 : 0); // we successfully set autocommit + if ((pgsql->server_status & SERVER_STATUS_AUTOCOMMIT) && options.autocommit == false) { + proxy_warning("It seems we are hitting bug http://bugs.pgsql.com/bug.php?id=66884\n"); + } + break; + case ASYNC_SET_AUTOCOMMIT_FAILED: + //fprintf(stderr,"%s\n",mysql_error(pgsql)); + proxy_error("Failed SET AUTOCOMMIT: %s\n", mysql_error(pgsql)); + PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, parent->myhgc->hid, parent->address, parent->port, mysql_errno(pgsql)); + break; + case ASYNC_SET_NAMES_START: + set_names_start(); + if (async_exit_status) { + next_event(ASYNC_SET_NAMES_CONT); + } + else { + NEXT_IMMEDIATE(ASYNC_SET_NAMES_END); + } + break; + case ASYNC_SET_NAMES_CONT: + set_names_cont(event); + if (async_exit_status) { + next_event(ASYNC_SET_NAMES_CONT); + } + else { + NEXT_IMMEDIATE(ASYNC_SET_NAMES_END); + } + break; + case ASYNC_SET_NAMES_END: + if (interr) { + NEXT_IMMEDIATE(ASYNC_SET_NAMES_FAILED); + } + else { + NEXT_IMMEDIATE(ASYNC_SET_NAMES_SUCCESSFUL); + } + break; + case ASYNC_SET_NAMES_SUCCESSFUL: + break; + case ASYNC_SET_NAMES_FAILED: + //fprintf(stderr,"%s\n",mysql_error(pgsql)); + proxy_error("Failed SET NAMES: %s\n", mysql_error(pgsql)); + PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, parent->myhgc->hid, parent->address, parent->port, mysql_errno(pgsql)); + break; + case ASYNC_INITDB_START: + initdb_start(); + if (async_exit_status) { + next_event(ASYNC_INITDB_CONT); + } + else { + NEXT_IMMEDIATE(ASYNC_INITDB_END); + } + break; + case ASYNC_INITDB_CONT: + initdb_cont(event); + if (async_exit_status) { + next_event(ASYNC_INITDB_CONT); + } + else { + NEXT_IMMEDIATE(ASYNC_INITDB_END); + } + break; + case ASYNC_INITDB_END: + if (interr) { + NEXT_IMMEDIATE(ASYNC_INITDB_FAILED); + } + else { + NEXT_IMMEDIATE(ASYNC_INITDB_SUCCESSFUL); + } + break; + case ASYNC_INITDB_SUCCESSFUL: + break; + case ASYNC_INITDB_FAILED: + proxy_error("Failed INITDB: %s\n", mysql_error(pgsql)); + PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, parent->myhgc->hid, parent->address, parent->port, mysql_errno(pgsql)); + //fprintf(stderr,"%s\n",mysql_error(pgsql)); + break; + case ASYNC_SET_OPTION_START: + set_option_start(); + if (async_exit_status) { + next_event(ASYNC_SET_OPTION_CONT); + } + else { + NEXT_IMMEDIATE(ASYNC_SET_OPTION_END); + } + break; + case ASYNC_SET_OPTION_CONT: + set_option_cont(event); + if (async_exit_status) { + next_event(ASYNC_SET_OPTION_CONT); + } + else { + NEXT_IMMEDIATE(ASYNC_SET_OPTION_END); + } + break; + case ASYNC_SET_OPTION_END: + if (interr) { + NEXT_IMMEDIATE(ASYNC_SET_OPTION_FAILED); + } + else { + NEXT_IMMEDIATE(ASYNC_SET_OPTION_SUCCESSFUL); + } + break; + case ASYNC_SET_OPTION_SUCCESSFUL: + break; + case ASYNC_SET_OPTION_FAILED: + proxy_error("Error setting MYSQL_OPTION_MULTI_STATEMENTS : %s\n", mysql_error(pgsql)); + PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::pgsql, parent->myhgc->hid, parent->address, parent->port, mysql_errno(pgsql)); + break; +*/ + default: + // not implemented yet + assert(0); + } + return async_state_machine; +} + +void PgSQL_Connection::connect_start() { + PROXY_TRACE(); + reset_error(); + async_exit_status = PG_EVENT_NONE; + + std::ostringstream conninfo; + conninfo << "postgres://"; + conninfo << userinfo->username << ":" << userinfo->password; // username and password + conninfo << "@"; + conninfo << parent->address << ":" << parent->port; // backend address and port + conninfo << "/"; + conninfo << userinfo->schemaname; // currently schemaname consists of datasename (have to improve this in future). In PostgreSQL database and schema are NOT the same. + conninfo << "?"; + //conninfo << "require_auth=" << AUTHENTICATION_METHOD_STR[pgsql_thread___authentication_method]; // authentication method + conninfo << "application_name=proxysql"; + conninfo << "&sslmode=disable"; // currently we are not supporting SSL + /*conninfo << "user=" << userinfo->username << " "; + conninfo << "password=" << userinfo->password << " "; + conninfo << "host=" << parent->address << " "; + conninfo << "port=" << parent->port << " "; + conninfo << "dbname=" << userinfo->schemaname << " "; + conninfo << "application_name=proxysql "; + conninfo << "sslmode=disable";*/ + const std::string& conninfo_str = conninfo.str(); + pgsql_conn = PQconnectStart(conninfo_str.c_str()); + //pgsql_conn = PQconnectdb(conninfo_str.c_str()); + if (pgsql_conn == NULL || PQstatus(pgsql_conn) == CONNECTION_BAD) { + const std::string errmsg = PQerrorMessage(pgsql_conn); + set_error(PG_CONNECT_FAILED, errmsg); + proxy_error("Connect failed. %s\n", errmsg.c_str()); + return; + } + fd = PQsocket(pgsql_conn); + async_exit_status = PG_EVENT_WRITE; +} + +void PgSQL_Connection::connect_cont(short event) { + PROXY_TRACE(); + assert(pgsql_conn); + reset_error(); + async_exit_status = PG_EVENT_NONE; + +#if 0 + const char* message = nullptr; + switch (PQstatus(pgsql_conn)) + { + case CONNECTION_STARTED: + message = "Connecting..."; + break; + + case CONNECTION_MADE: + message = "Connected to server (waiting to send) ..."; + break; + + case CONNECTION_AWAITING_RESPONSE: + message = "Waiting for a response from the server..."; + break; + + case CONNECTION_AUTH_OK: + message = "Received authentication; waiting for backend start - up to finish..."; + break; + + case CONNECTION_SSL_STARTUP: + message = "Negotiating SSL encryption..."; + break; + + case CONNECTION_SETENV: + message = "Negotiating environment-driven parameter settings..."; + break; + + default: + message = "Connecting..."; + } + + proxy_info("Connection status: %d %s\n", PQsocket(pgsql_conn), message); +#endif + + PostgresPollingStatusType poll_res = PQconnectPoll(pgsql_conn); + switch (poll_res) { + case PGRES_POLLING_WRITING: + async_exit_status = PG_EVENT_WRITE; + break; + case PGRES_POLLING_ACTIVE: + case PGRES_POLLING_READING: + async_exit_status = PG_EVENT_READ; + break; + case PGRES_POLLING_FAILED: + case PGRES_POLLING_OK: + async_exit_status = PG_EVENT_NONE; + break; + default: + { + const std::string errmsg = PQerrorMessage(pgsql_conn); + set_error(PG_CONNECT_FAILED, errmsg); + proxy_error("Connect failed. %s\n", errmsg.c_str()); + return; + } + } +} + +void PgSQL_Connection::query_start() { + PROXY_TRACE(); + reset_error(); + async_exit_status = PG_EVENT_NONE; + if (PQsendQuery(pgsql_conn, query.ptr) == 0) { + const std::string errmsg = PQerrorMessage(pgsql_conn); + set_error(PG_QUERY_FAILED, errmsg); + proxy_error("Failed to send query. %s\n", errmsg.c_str()); + return; + } + flush(); +} + +void PgSQL_Connection::query_cont(short event) { + PROXY_TRACE(); + proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6, "event=%d\n", event); + reset_error(); + async_exit_status = PG_EVENT_NONE; + if (event & POLLOUT) { + flush(); + } +} + +void PgSQL_Connection::fetch_result_start() { + PROXY_TRACE(); + reset_error(); + async_exit_status = PG_EVENT_NONE; + if (PQsetSingleRowMode(pgsql_conn) == 0) { + const std::string errmsg = PQerrorMessage(pgsql_conn); + set_error(PG_RESULT_FAILED, errmsg); + proxy_error("Failed to set single row mode. %s\n", errmsg.c_str()); + return; + } +} + +void PgSQL_Connection::fetch_result_cont(short event) { + PROXY_TRACE(); + reset_error(); + async_exit_status = PG_EVENT_NONE; + + if (PQconsumeInput(pgsql_conn) == 0) { + const std::string errmsg = PQerrorMessage(pgsql_conn); + set_error(PG_RESULT_FAILED, errmsg); + proxy_error("Failed to consume input. %s\n", errmsg.c_str()); + return; + } + + if (PQisBusy(pgsql_conn)) { + async_exit_status = PG_EVENT_READ; + return; + } + + set_last_result(PQgetResult(pgsql_conn)); +} + +void PgSQL_Connection::flush() { + reset_error(); + int res = PQflush(pgsql_conn); + + if (res > 0) { + async_exit_status = PG_EVENT_WRITE; + } + else if (res == 0) { + async_exit_status = PG_EVENT_READ; + } + else { + const std::string errmsg = PQerrorMessage(pgsql_conn); + set_error(PG_QUERY_FAILED, errmsg); + proxy_error("Failed to flush data backend. %s\n", errmsg.c_str()); + async_exit_status = PG_EVENT_NONE; + } +} + +int PgSQL_Connection::async_connect(short event) { + PROXY_TRACE(); + if (pgsql_conn == NULL && async_state_machine != ASYNC_CONNECT_START) { + // LCOV_EXCL_START + assert(0); + // LCOV_EXCL_STOP + } + if (async_state_machine == ASYNC_IDLE) { + myds->wait_until = 0; + return 0; + } + if (async_state_machine == ASYNC_CONNECT_SUCCESSFUL) { + compute_unknown_transaction_status(); + async_state_machine = ASYNC_IDLE; + myds->wait_until = 0; + creation_time = monotonic_time(); + return 0; + } + handler(event); + switch (async_state_machine) { + case ASYNC_CONNECT_SUCCESSFUL: + compute_unknown_transaction_status(); + async_state_machine = ASYNC_IDLE; + myds->wait_until = 0; + return 0; + break; + case ASYNC_CONNECT_FAILED: + return -1; + break; + case ASYNC_CONNECT_TIMEOUT: + return -2; + break; + default: + return 1; + } + return 1; +} + +bool PgSQL_Connection::is_connected() const { + if (pgsql_conn == nullptr || PQstatus(pgsql_conn) != CONNECTION_OK) { + return false; + } + return true; +} + +std::string PgSQL_Connection::get_error_code_from_result() const { + assert(pgsql_conn); + std::string error_code{}; + if (last_result != nullptr) { + ExecStatusType status = PQresultStatus(last_result); + if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK) { + error_code = PQresultErrorField(last_result, PG_DIAG_SQLSTATE); + } + } + + return error_code; +} + +void PgSQL_Connection::compute_unknown_transaction_status() { + + if (is_connected()) { + const std::string& errocode = get_error_code_from_result(); + + if (errocode.empty()) { + unknown_transaction_status = false; // no error + return; + } + if (errocode[0] == 'C') { // client error + // do not change it + return; + } + if (errocode[0] == 'P') { // server error + unknown_transaction_status = true; + return; + } + // all other cases, server error + unknown_transaction_status = true; + } +} + +void PgSQL_Connection::async_free_result() { + PROXY_TRACE(); + assert(pgsql_conn); + + if (query.ptr) { + query.ptr = NULL; + query.length = 0; + } + if (query.stmt_result) { + mysql_free_result(query.stmt_result); + query.stmt_result = NULL; + } + if (userinfo) { + // if userinfo is NULL , the connection is being destroyed + // because it is reset on destructor ( ~PgSQL_Connection() ) + // therefore this section is skipped completely + // this should prevent bug #1046 + //if (query.stmt) { + // if (query.stmt->mysql) { + // if (query.stmt->mysql == pgsql) { // extra check + // mysql_stmt_free_result(query.stmt); + // } + // } + // // If we reached here from 'ASYNC_STMT_PREPARE_FAILED', the + // // prepared statement was never added to 'local_stmts', thus + // // it will never be freed when 'local_stmts' are purged. If + // // initialized, it must be freed. For more context see #3525. + // if (this->async_state_machine == ASYNC_STMT_PREPARE_FAILED) { + // if (query.stmt != NULL) { + // proxy_mysql_stmt_close(query.stmt); + // } + // } + // query.stmt = NULL; + //} + reset_last_result(); + } + compute_unknown_transaction_status(); + async_state_machine = ASYNC_IDLE; + if (MyRS) { + if (MyRS_reuse) { + delete (MyRS_reuse); + } + MyRS_reuse = MyRS; + MyRS = NULL; + } +} + +int PgSQL_Connection::async_set_autocommit(short event, bool ac) { + PROXY_TRACE(); + assert(pgsql_conn); + server_status = parent->status; // we copy it here to avoid race condition. The caller will see this + if (IsServerOffline()) + return -1; + + switch (async_state_machine) { + case ASYNC_SET_AUTOCOMMIT_SUCCESSFUL: + unknown_transaction_status = false; + async_state_machine = ASYNC_IDLE; + return 0; + break; + case ASYNC_SET_AUTOCOMMIT_FAILED: + return -1; + break; + case ASYNC_QUERY_END: + case ASYNC_IDLE: + set_autocommit(ac); + async_state_machine = ASYNC_SET_AUTOCOMMIT_START; + default: + handler(event); + break; + } + + // check again + switch (async_state_machine) { + case ASYNC_SET_AUTOCOMMIT_SUCCESSFUL: + unknown_transaction_status = false; + async_state_machine = ASYNC_IDLE; + return 0; + break; + case ASYNC_SET_AUTOCOMMIT_FAILED: + return -1; + break; + default: + return 1; + break; + } + return 1; +} + +bool PgSQL_Connection::IsAutoCommit() { + bool ret = true; + /*if (pgsql) { + ret = (pgsql->server_status & SERVER_STATUS_AUTOCOMMIT); + if (ret) { + if (options.last_set_autocommit == 0) { + // it seems we hit bug http://bugs.pgsql.com/bug.php?id=66884 + // we last sent SET AUTOCOMMIT = 0 , but the server says it is 1 + // we assume that what we sent last is correct . #873 + ret = false; + } + } + else { + if (options.last_set_autocommit == -1) { + // if a connection was reset (thus last_set_autocommit==-1) + // the information related to SERVER_STATUS_AUTOCOMMIT is lost + // therefore we fall back on the safe assumption that autocommit==1 + ret = true; + } + } + }*/ + return ret; +} + +// Returns: +// 0 when the query is completed +// 1 when the query is not completed +// the calling function should check pgsql error in pgsql struct +int PgSQL_Connection::async_query(short event, char* stmt, unsigned long length, MYSQL_STMT** _stmt, stmt_execute_metadata_t* stmt_meta) { + PROXY_TRACE(); + PROXY_TRACE2(); + assert(pgsql_conn); + + server_status = parent->status; // we copy it here to avoid race condition. The caller will see this + if (IsServerOffline()) + return -1; + + if (myds) { + if (myds->DSS != STATE_MARIADB_QUERY) { + myds->DSS = STATE_MARIADB_QUERY; + } + } + switch (async_state_machine) { + case ASYNC_QUERY_END: + processing_multi_statement = false; // no matter if we are processing a multi statement or not, we reached the end + return 0; + break; + case ASYNC_IDLE: + if (myds && myds->sess) { + if (myds->sess->active_transactions == 0) { + // every time we start a query (no matter if COM_QUERY, STMT_PREPARE or otherwise) + // also a transaction starts, even if in autocommit mode + myds->sess->active_transactions = 1; + myds->sess->transaction_started_at = myds->sess->thread->curtime; + } + } + if (stmt_meta == NULL) + set_query(stmt, length); + async_state_machine = ASYNC_QUERY_START; + if (_stmt) { + query.stmt = *_stmt; + if (stmt_meta == NULL) { + async_state_machine = ASYNC_STMT_PREPARE_START; + } + else { + if (query.stmt_meta == NULL) { + query.stmt_meta = stmt_meta; + } + async_state_machine = ASYNC_STMT_EXECUTE_START; + } + } + default: + handler(event); + break; + } + + if (async_state_machine == ASYNC_QUERY_END) { + PROXY_TRACE2(); + compute_unknown_transaction_status(); + if (is_error_present()) { + return -1; + } + else { + return 0; + } + } + if (async_state_machine == ASYNC_STMT_EXECUTE_END) { + PROXY_TRACE2(); + query.stmt_meta = NULL; + async_state_machine = ASYNC_QUERY_END; + compute_unknown_transaction_status(); + if (mysql_stmt_errno(query.stmt)) { + return -1; + } + else { + return 0; + } + } + if (async_state_machine == ASYNC_STMT_PREPARE_SUCCESSFUL || async_state_machine == ASYNC_STMT_PREPARE_FAILED) { + query.stmt_meta = NULL; + compute_unknown_transaction_status(); + if (async_state_machine == ASYNC_STMT_PREPARE_FAILED) { + return -1; + } + else { + *_stmt = query.stmt; + return 0; + } + } + if (async_state_machine == ASYNC_NEXT_RESULT_START) { + // if we reached this point it measn we are processing a multi-statement + // and we need to exit to give control to MySQL_Session + processing_multi_statement = true; + return 2; + } + if (processing_multi_statement == true) { + // we are in the middle of processing a multi-statement + return 3; + } + return 1; +} + +// Returns: +// 0 when the ping is completed successfully +// -1 when the ping is completed not successfully +// 1 when the ping is not completed +// -2 on timeout +// the calling function should check pgsql error in pgsql struct +int PgSQL_Connection::async_ping(short event) { + PROXY_TRACE(); + assert(pgsql_conn); + switch (async_state_machine) { + case ASYNC_PING_SUCCESSFUL: + unknown_transaction_status = false; + async_state_machine = ASYNC_IDLE; + return 0; + break; + case ASYNC_PING_FAILED: + return -1; + break; + case ASYNC_PING_TIMEOUT: + return -2; + break; + case ASYNC_IDLE: + async_state_machine = ASYNC_PING_START; + default: + //handler(event); + async_state_machine = ASYNC_PING_SUCCESSFUL; + break; + } + + // check again + switch (async_state_machine) { + case ASYNC_PING_SUCCESSFUL: + unknown_transaction_status = false; + async_state_machine = ASYNC_IDLE; + return 0; + break; + case ASYNC_PING_FAILED: + return -1; + break; + case ASYNC_PING_TIMEOUT: + return -2; + break; + default: + return 1; + break; + } + return 1; +} + diff --git a/lib/PgSQL_Protocol.cpp b/lib/PgSQL_Protocol.cpp index ed4515bfe..798e62434 100644 --- a/lib/PgSQL_Protocol.cpp +++ b/lib/PgSQL_Protocol.cpp @@ -32,6 +32,8 @@ extern PgSQL_Authentication* GloPgAuth; void PG_pkt::make_space(unsigned int len) { + if (ownership == false) return; + if ((size + len) <= capacity) { return; } else { @@ -682,7 +684,7 @@ bool PgSQL_Protocol::process_startup_packet(unsigned char* pkt, unsigned int len if (!user || *user == '\0') { proxy_debug(PROXY_DEBUG_MYSQL_AUTH, 5, "Session=%p , DS=%p. no username supplied.\n", (*myds), (*myds)->sess); - generate_error_packet(false, "no username supplied", NULL, true); + generate_error_packet(true, false, "no username supplied", NULL, true); return false; } @@ -729,7 +731,7 @@ EXECUTION_STATE PgSQL_Protocol::process_handshake_response_packet(unsigned char* if (!user || *user == '\0') { proxy_debug(PROXY_DEBUG_MYSQL_AUTH, 5, "Session=%p , DS=%p , user='%s'. Client password pkt before startup packet.\n", (*myds), (*myds)->sess, user); - generate_error_packet(false, "client password pkt before startup packet", NULL, true); + generate_error_packet(true, false, "client password pkt before startup packet", NULL, true); goto __exit_process_pkt_handshake_response; } @@ -800,7 +802,7 @@ EXECUTION_STATE PgSQL_Protocol::process_handshake_response_packet(unsigned char* if (!pass || *pass == '\0') { proxy_debug(PROXY_DEBUG_MYSQL_AUTH, 5, "Session=%p , DS=%p , user='%s'. Empty password returned by client.\n", (*myds), (*myds)->sess, user); - generate_error_packet(false, "empty password returned by client", NULL, true); + generate_error_packet(true, false, "empty password returned by client", NULL, true); break; } @@ -836,7 +838,7 @@ EXECUTION_STATE PgSQL_Protocol::process_handshake_response_packet(unsigned char* proxy_debug(PROXY_DEBUG_MYSQL_AUTH, 5, "Session=%p , DS=%p , user='%s'. Selected SASL mechanism: %s.\n", (*myds), (*myds)->sess, user, mech); if (strcmp(mech, "SCRAM-SHA-256") != 0) { proxy_debug(PROXY_DEBUG_MYSQL_AUTH, 5, "Session=%p , DS=%p , user='%s'. Client selected an invalid SASL authentication mechanism: %s.\n", (*myds), (*myds)->sess, user, mech); - generate_error_packet(false, "client selected an invalid SASL authentication mechanism", NULL, true); + generate_error_packet(true, false, "client selected an invalid SASL authentication mechanism", NULL, true); break; } @@ -856,7 +858,7 @@ EXECUTION_STATE PgSQL_Protocol::process_handshake_response_packet(unsigned char* if (!scram_handle_client_first(&(*myds)->scram_state, &stored_user_info, ((const unsigned char*)hdr.data.ptr) + read_pos, length)) { proxy_debug(PROXY_DEBUG_MYSQL_AUTH, 5, "Session=%p , DS=%p , user='%s'. SASL authentication failed\n", (*myds), (*myds)->sess, user); - generate_error_packet(false, "SASL authentication failed", NULL, true); + generate_error_packet(true,false, "SASL authentication failed", NULL, true); break; } @@ -902,7 +904,7 @@ EXECUTION_STATE PgSQL_Protocol::process_handshake_response_packet(unsigned char* } } else { proxy_debug(PROXY_DEBUG_MYSQL_AUTH, 5, "Session=%p , DS=%p , user='%s'. User not found in the database.\n", (*myds), (*myds)->sess, user); - generate_error_packet(false, "User not found", NULL, true); + generate_error_packet(true,false, "User not found", NULL, true); } // set the default session charset //(*myds)->sess->default_charset = charset; @@ -1004,10 +1006,13 @@ void PgSQL_Protocol::welcome_client() { //(*myds)->sess->status = WAITING_CLIENT_DATA; } -void PgSQL_Protocol::generate_error_packet(bool send_ready, const char* msg, const char* code, bool fatal) { +void PgSQL_Protocol::generate_error_packet(bool send, bool ready, const char* msg, const char* code, bool fatal, PtrSize_t* _ptr) { + // to avoid memory leak + assert(send == true || _ptr); + PG_pkt pgpkt{}; - if (send_ready) + if (ready) pgpkt.set_multi_pkt_mode(true); pgpkt.write_generic('E', "cscscscsc", @@ -1015,31 +1020,39 @@ void PgSQL_Protocol::generate_error_packet(bool send_ready, const char* msg, con 'V', fatal ? "FATAL" : "ERROR", 'C', code ? code : "08P01", 'M', msg, 0); - if (send_ready) { + if (ready) { pgpkt.write_ReadyForQuery(); pgpkt.set_multi_pkt_mode(false); } + auto buff = pgpkt.detach(); - (*myds)->PSarrayOUT->add((void*)buff.first, buff.second); - switch ((*myds)->DSS) { - case STATE_SERVER_HANDSHAKE: - case STATE_CLIENT_HANDSHAKE: - case STATE_QUERY_SENT_DS: - case STATE_QUERY_SENT_NET: - case STATE_ERR: - (*myds)->DSS = STATE_ERR; - break; - case STATE_OK: - break; - case STATE_SLEEP: - if ((*myds)->sess->session_fast_forward == true) { // see issue #733 + if (send) { + (*myds)->PSarrayOUT->add((void*)buff.first, buff.second); + switch ((*myds)->DSS) { + case STATE_SERVER_HANDSHAKE: + case STATE_CLIENT_HANDSHAKE: + case STATE_QUERY_SENT_DS: + case STATE_QUERY_SENT_NET: + case STATE_ERR: + (*myds)->DSS = STATE_ERR; + break; + case STATE_OK: break; + case STATE_SLEEP: + if ((*myds)->sess->session_fast_forward == true) { // see issue #733 + break; + } + default: + // LCOV_EXCL_START + assert(0); + // LCOV_EXCL_STOP } - default: - // LCOV_EXCL_START - assert(0); - // LCOV_EXCL_STOP + } + + if (_ptr) { + _ptr->ptr = buff.first; + _ptr->size = buff.second; } } @@ -1184,11 +1197,13 @@ char* extract_tag_from_query(const char* query) { } -bool PgSQL_Protocol::generate_ok_packet(bool send, bool ready, const char* msg, int rows, const char* query) { +bool PgSQL_Protocol::generate_ok_packet(bool send, bool ready, const char* msg, int rows, const char* query, PtrSize_t* _ptr) { + // to avoid memory leak + assert(send == true || _ptr); PG_pkt pgpkt{}; - if (send == true) { + if (ready == true) { pgpkt.set_multi_pkt_mode(true); } @@ -1199,8 +1214,7 @@ bool PgSQL_Protocol::generate_ok_packet(bool send, bool ready, const char* msg, if (strcmp(tag, "INSERT") == 0) { sprintf(tmpbuf, "%s 0 %d", tag, rows); pgpkt.write_CommandComplete(tmpbuf); - } - else if (strcmp(tag, "UPDATE") == 0 || + } else if (strcmp(tag, "UPDATE") == 0 || strcmp(tag, "DELETE") == 0 || strcmp(tag, "MERGE") == 0 || strcmp(tag, "MOVE") == 0 || @@ -1210,21 +1224,492 @@ bool PgSQL_Protocol::generate_ok_packet(bool send, bool ready, const char* msg, strcmp(tag, "COPY") == 0 ) { sprintf(tmpbuf, "%s %d", tag, rows); pgpkt.write_CommandComplete(tmpbuf); - } - else { + } else { pgpkt.write_CommandComplete(tag); } if (ready == true) { pgpkt.write_ReadyForQuery(); + pgpkt.set_multi_pkt_mode(false); } + auto buff = pgpkt.detach(); if (send == true) { - pgpkt.set_multi_pkt_mode(false); - auto buff = pgpkt.detach(); (*myds)->PSarrayOUT->add((void*)buff.first, buff.second); + } else { + _ptr->ptr = buff.first; + _ptr->size = buff.second; } - free(tag); return true; } + +//bool PgSQL_Protocol::generate_row_description(bool send, PgSQL_ResultSet* rs, const PG_Fields& fields, unsigned int size) { +// if ((*myds)->sess->mirror == true) { +// return true; +// } +// +// unsigned char* _ptr = NULL; +// +// if (rs) { +// if (size <= (RESULTSET_BUFLEN - rs->buffer_used)) { +// // there is space in the buffer, add the data to it +// _ptr = rs->buffer + rs->buffer_used; +// rs->buffer_used += size; +// } else { +// // there is no space in the buffer, we flush the buffer and recreate it +// rs->buffer_to_PSarrayOut(); +// // now we can check again if there is space in the buffer +// if (size <= (RESULTSET_BUFLEN - rs->buffer_used)) { +// // there is space in the NEW buffer, add the data to it +// _ptr = rs->buffer + rs->buffer_used; +// rs->buffer_used += size; +// } else { +// // a new buffer is not enough to store the new row +// _ptr = (unsigned char*)l_alloc(size); +// } +// } +// } else { +// _ptr = (unsigned char*)l_alloc(size); +// } +// +// PG_pkt pgpkt(_ptr, 0); +// +// pgpkt.put_char('T'); +// pgpkt.put_uint32(size ); +// pgpkt.put_uint16(fields.size()); +// +// for (unsigned int i = 0; i < fields.size(); i++) { +// pgpkt.put_string(fields[i].name); +// pgpkt.put_uint32(fields[i].tbl_oid); +// pgpkt.put_uint16(fields[i].col_idx); +// pgpkt.put_uint32(fields[i].type_oid); +// pgpkt.put_uint16(fields[i].col_len); +// pgpkt.put_uint32(fields[i].type_mod); +// pgpkt.put_uint16(fields[i].fmt); +// } +// +// if (send == true) { (*myds)->PSarrayOUT->add((void*)_ptr, size); } +// +////#ifdef DEBUG +//// if (dump_pkt) { __dump_pkt(__func__, _ptr, size); } +////#endif +// if (rs) { +// if (_ptr >= rs->buffer && _ptr < rs->buffer + RESULTSET_BUFLEN) { +// // we are writing within the buffer, do not add to PSarrayOUT +// } else { +// // we are writing outside the buffer, add to PSarrayOUT +// rs->PSarrayOUT.add(_ptr, size); +// } +// } +// return true; +//} + +unsigned int PgSQL_Protocol::copy_row_description_to_PgSQL_ResultSet(bool send, PgSQL_ResultSet* pg_rs, PGresult* result) { + if ((*myds)->sess->mirror == true) { + return true; + } + + assert(pg_rs); + assert(result); + + unsigned int fields_cnt = PQnfields(result); + unsigned int size = 1 + 4 + 2; + for (int i = 0; i < fields_cnt; i++) { + size += strlen(PQfname(result, i)) + 1 + 18; // null terminator, name, reloid, colnr, oid, typsize, typmod, fmt + } + + unsigned char* _ptr = NULL; + + unsigned int available_capacity = (RESULTSET_BUFLEN - pg_rs->buffer_used); + if (size <= available_capacity) { + // there is space in the buffer, add the data to it + _ptr = pg_rs->buffer + pg_rs->buffer_used; + pg_rs->buffer_used += size; + } else { + // there is no space in the buffer, we flush the buffer and recreate it + pg_rs->buffer_to_PSarrayOut(); + // now we can check again if there is space in the buffer + available_capacity = (RESULTSET_BUFLEN - pg_rs->buffer_used); + if (size <= available_capacity) { + // there is space in the NEW buffer, add the data to it + _ptr = pg_rs->buffer + pg_rs->buffer_used; + pg_rs->buffer_used += size; + } else { + // a new buffer is not enough to store the new row + _ptr = (unsigned char*)l_alloc(size); + available_capacity = size; + } + } + + assert(_ptr); + PG_pkt pgpkt(_ptr, available_capacity); + + pgpkt.put_char('T'); + pgpkt.put_uint32(size - 1); + pgpkt.put_uint16(fields_cnt); + + for (unsigned int i = 0; i < fields_cnt; i++) { + pgpkt.put_string(PQfname(result, i)); + pgpkt.put_uint32(PQftable(result, i)); + pgpkt.put_uint16(PQftablecol(result, i)); + pgpkt.put_uint32(PQftype(result, i)); + pgpkt.put_uint16(PQfsize(result, i)); + pgpkt.put_uint32(PQfmod(result, i)); + pgpkt.put_uint16(PQfformat(result, i)); + } + + if (send == true) { (*myds)->PSarrayOUT->add((void*)_ptr, size); } + +//#ifdef DEBUG +// if (dump_pkt) { __dump_pkt(__func__, _ptr, size); } +//#endif + if (pg_rs) { + if (_ptr >= pg_rs->buffer && _ptr < pg_rs->buffer + RESULTSET_BUFLEN) { + // we are writing within the buffer, do not add to PSarrayOUT + } + else { + // we are writing outside the buffer, add to PSarrayOUT + pg_rs->PSarrayOUT.add(_ptr, size); + } + } + + pg_rs->num_fields = fields_cnt; + pg_rs->resultset_size = size; + + return size; +} + +unsigned int PgSQL_Protocol::copy_row_to_PgSQL_ResultSet(bool send, PgSQL_ResultSet* pg_rs, PGresult* result) { + if ((*myds)->sess->mirror == true) { + return true; + } + + assert(pg_rs); + assert(result); + assert(pg_rs->num_fields); + + const unsigned int numRows = PQntuples(result); + unsigned int total_size = 0; + for (unsigned int i = 0; i < numRows; i++) { + unsigned int size = 1 + 4 + 2; // 'D', length, field count + for (int j = 0; j < pg_rs->num_fields; j++) { + size += PQgetlength(result, i, j) + 4; // length, value + } + total_size += size; + unsigned char* _ptr = NULL; + + unsigned int available_capacity = (RESULTSET_BUFLEN - pg_rs->buffer_used); + if (size <= available_capacity) { + // there is space in the buffer, add the data to it + _ptr = pg_rs->buffer + pg_rs->buffer_used; + pg_rs->buffer_used += size; + } + else { + // there is no space in the buffer, we flush the buffer and recreate it + pg_rs->buffer_to_PSarrayOut(); + // now we can check again if there is space in the buffer + available_capacity = (RESULTSET_BUFLEN - pg_rs->buffer_used); + if (size <= available_capacity) { + // there is space in the NEW buffer, add the data to it + _ptr = pg_rs->buffer + pg_rs->buffer_used; + pg_rs->buffer_used += size; + } + else { + // a new buffer is not enough to store the new row + _ptr = (unsigned char*)l_alloc(size); + available_capacity = size; + } + } + + assert(_ptr); + PG_pkt pgpkt(_ptr, available_capacity); + + pgpkt.put_char('D'); + pgpkt.put_uint32(size - 1); + pgpkt.put_uint16(pg_rs->num_fields); + int column_value_len = 0; + for (int j = 0; j < pg_rs->num_fields; j++) { + column_value_len = PQgetlength(result, i, j); + pgpkt.put_uint32(column_value_len); + pgpkt.put_bytes(PQgetvalue(result, i, j), column_value_len); + } + + if (send == true) { (*myds)->PSarrayOUT->add((void*)_ptr, size); } + + pg_rs->resultset_size += size; + + if (pg_rs) { + if (_ptr >= pg_rs->buffer && _ptr < pg_rs->buffer + RESULTSET_BUFLEN) { + // we are writing within the buffer, do not add to PSarrayOUT + } + else { + // we are writing outside the buffer, add to PSarrayOUT + pg_rs->PSarrayOUT.add(_ptr, size); + } + } + } + + pg_rs->num_rows += numRows; + + return total_size; +} + +unsigned int PgSQL_Protocol::copy_eof_to_PgSQL_ResultSet(bool send, PgSQL_ResultSet* pg_rs, PGresult* result) { + if ((*myds)->sess->mirror == true) { + return true; + } + + assert(pg_rs); + assert(result); + assert(pg_rs->num_fields); + + + const char* tag = PQcmdStatus(result); + + if (!tag) assert(0); // for testing it should not be null + + const unsigned int tag_len = strlen(tag) + 1; + + unsigned int size = 1 + 4 + tag_len + 1 + 4 + 1; // 'C', length, tag, Z, length, I + + unsigned char* _ptr = NULL; + + unsigned int available_capacity = (RESULTSET_BUFLEN - pg_rs->buffer_used); + if (size <= available_capacity) { + // there is space in the buffer, add the data to it + _ptr = pg_rs->buffer + pg_rs->buffer_used; + pg_rs->buffer_used += size; + } else { + // there is no space in the buffer, we flush the buffer and recreate it + pg_rs->buffer_to_PSarrayOut(); + // now we can check again if there is space in the buffer + available_capacity = (RESULTSET_BUFLEN - pg_rs->buffer_used); + if (size <= available_capacity) { + // there is space in the NEW buffer, add the data to it + _ptr = pg_rs->buffer + pg_rs->buffer_used; + pg_rs->buffer_used += size; + } else { + // a new buffer is not enough to store the new row + _ptr = (unsigned char*)l_alloc(size); + available_capacity = size; + } + } + + assert(_ptr); + PG_pkt pgpkt(_ptr, available_capacity); + + pgpkt.put_char('C'); + pgpkt.put_uint32(tag_len + 4); + pgpkt.put_string(tag); + pgpkt.put_char('Z'); + pgpkt.put_uint32(4 + 1); + pgpkt.put_char('I'); + + if (send == true) { (*myds)->PSarrayOUT->add((void*)_ptr, size); } + + pg_rs->resultset_size += size; + + if (pg_rs) { + if (_ptr >= pg_rs->buffer && _ptr < pg_rs->buffer + RESULTSET_BUFLEN) { + // we are writing within the buffer, do not add to PSarrayOUT + } else { + // we are writing outside the buffer, add to PSarrayOUT + pg_rs->PSarrayOUT.add(_ptr, size); + } + } + + return size; +} + + +PgSQL_ResultSet::PgSQL_ResultSet() { + buffer = NULL; + transfer_started = false; + resultset_completed = false; + buffer_used = 0; + resultset_size = 0; + sid = 0; + num_rows = 0; + ds = NULL; +} + +PgSQL_ResultSet::~PgSQL_ResultSet() { + PtrSize_t pkt; + while (PSarrayOUT.len) { + PSarrayOUT.remove_index_fast(0, &pkt); + l_free(pkt.size, pkt.ptr); + } + + if (buffer) { + free(buffer); + buffer = NULL; + } +} + +void PgSQL_ResultSet::buffer_init(PgSQL_Protocol* _proto) { + if (buffer == NULL) { + buffer = (unsigned char*)malloc(RESULTSET_BUFLEN); + } + buffer_used = 0; + proto = _proto; +} + +void PgSQL_ResultSet::init(PgSQL_Protocol* _proto, PGconn* _conn) { + PROXY_TRACE2(); + transfer_started = false; + resultset_completed = false; + //resultset_size = 0; + //sid = 0; + num_rows = 0; + proto = _proto; + result = NULL; + pgsql_conn = _conn; + buffer_init(_proto); + + if (proto) { // if proto = NULL , this is a mirror + ds = proto->get_myds(); + sid = ds->pkt_sid + 1; + } + + // immediately generate the first set of packets + // columns count + if (proto == NULL) { + return; // this is a mirror + } + + /* + num_fields = PQnfields(result); + unsigned int total_size = 0; + for (int i = 0; i < num_fields; i++) { + total_size += strlen(PQfname(result, i)) + 18; // name, reloid, colnr, oid, typsize, typmod, fmt + } + + PG_Fields fields(num_fields); + + for (int i = 0; i < num_fields; i++) { + fields[i].name = PQfname(result, i); + fields[i].tbl_oid = PQftable(result, i); + fields[i].col_idx = PQftablecol(result, i); + fields[i].type_oid = PQftype(result, i); + fields[i].col_len = PQfsize(result, i); + fields[i].type_mod = PQfmod(result, i); + fields[i].fmt = PQfformat(result, i); + } + + proto->generate_row_description(false, fields, resultset_size); + */ + +} + +unsigned int PgSQL_ResultSet::add_row_description(PGresult* result) { + + const unsigned int bytes = proto->copy_row_description_to_PgSQL_ResultSet(false, this, result); + + if (RESULTSET_BUFLEN <= (buffer_used + 9)) { + buffer_to_PSarrayOut(); + } + + return bytes; +} + +unsigned int PgSQL_ResultSet::add_row(PGresult* result) { + const unsigned int bytes = proto->copy_row_to_PgSQL_ResultSet(false,this, result); + + if (RESULTSET_BUFLEN <= (buffer_used + 9)) { + buffer_to_PSarrayOut(); + } + + return bytes; +} + +void PgSQL_ResultSet::add_err(PgSQL_Data_Stream* _myds) { + PtrSize_t pkt; + if (proto) { + buffer_to_PSarrayOut(); + + const char* sqlstate = NULL; + const char* errmsg = NULL; + + if (result) { + sqlstate = PQresultErrorField(result, PG_DIAG_SQLSTATE); + errmsg = PQresultErrorMessage(result); + } + + if (_myds && _myds->killed_at) { // see case #750 + if (_myds->kill_type == 0) { + proto->generate_error_packet(false, true, (char*)"Query execution was interrupted, query_timeout exceeded", sqlstate ? sqlstate : "57014", false, &pkt); + PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::proxysql, _myds->myconn->parent->myhgc->hid, _myds->myconn->parent->address, _myds->myconn->parent->port, 1907); + } + else { + proto->generate_error_packet(false, true, (char*)"Query execution was interrupted", sqlstate ? sqlstate : "57014", false, &pkt); + PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::proxysql, _myds->myconn->parent->myhgc->hid, _myds->myconn->parent->address, _myds->myconn->parent->port, 1317); + } + } else { + + proto->generate_error_packet(false, true, errmsg ? errmsg : "Unknown error", sqlstate, false, &pkt); + // TODO: Check this is a mysql error + PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::proxysql, _myds->myconn->parent->myhgc->hid, _myds->myconn->parent->address, _myds->myconn->parent->port, 1907); + } + PSarrayOUT.add(pkt.ptr, pkt.size); + sid++; + resultset_size += pkt.size; + } + resultset_completed = true; +} + +bool PgSQL_ResultSet::get_resultset(PtrSizeArray* PSarrayFinal) { + transfer_started = true; + if (proto) { + PSarrayFinal->copy_add(&PSarrayOUT, 0, PSarrayOUT.len); + while (PSarrayOUT.len) + PSarrayOUT.remove_index(PSarrayOUT.len - 1, NULL); + } + return resultset_completed; +} + +void PgSQL_ResultSet::buffer_to_PSarrayOut(bool _last) { + if (buffer_used == 0) + return; // exit immediately if the buffer is empty + if (buffer_used < RESULTSET_BUFLEN / 2) { + if (_last == false) { + buffer = (unsigned char*)realloc(buffer, buffer_used); + } + } + PSarrayOUT.add(buffer, buffer_used); + if (_last) { + buffer = NULL; + } + else { + buffer = (unsigned char*)malloc(RESULTSET_BUFLEN); + } + buffer_used = 0; +} + +unsigned long long PgSQL_ResultSet::current_size() { + unsigned long long intsize = 0; + intsize += sizeof(PgSQL_ResultSet); + intsize += RESULTSET_BUFLEN; // size of buffer + if (PSarrayOUT.len == 0) // see bug #699 + return intsize; + intsize += sizeof(PtrSizeArray); + intsize += (PSarrayOUT.size * sizeof(PtrSize_t*)); + unsigned int i; + for (i = 0; i < PSarrayOUT.len; i++) { + PtrSize_t* pkt = PSarrayOUT.index(i); + if (pkt->size > RESULTSET_BUFLEN) { + intsize += pkt->size; + } + else { + intsize += RESULTSET_BUFLEN; + } + } + return intsize; +} + +unsigned int PgSQL_ResultSet::add_eof(PGresult* result) { + const unsigned int bytes = proto->copy_eof_to_PgSQL_ResultSet(false, this, result); + buffer_to_PSarrayOut(); + resultset_completed = true; + return bytes; +} diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index 87bb16920..e4b9990eb 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -3039,7 +3039,7 @@ bool PgSQL_Session::handler_again___status_CONNECTING_SERVER(int* _rc) { mybe->server_myds->wait_until = thread->curtime + mysql_thread___connect_timeout_server * 1000; pause_until = 0; } - if (mybe->server_myds->max_connect_time) { + if (mybe->server_myds->max_connect_time ) { if (thread->curtime >= mybe->server_myds->max_connect_time) { if (mirror) { PROXY_TRACE(); @@ -3057,7 +3057,7 @@ bool PgSQL_Session::handler_again___status_CONNECTING_SERVER(int* _rc) { if (thread) { thread->status_variables.stvar[st_var_max_connect_timeout_err]++; } - client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 9001, (char*)"HY000", errmsg.c_str(), true); + client_myds->myprot.generate_error_packet(true, true, errmsg.c_str(), "57P03", false); // not sure if this is the right error code RequestEnd(mybe->server_myds); string hg_status{}; @@ -3201,7 +3201,7 @@ bool PgSQL_Session::handler_again___status_CONNECTING_SERVER(int* _rc) { else { char buf[256]; sprintf(buf, "Max connect failure while reaching hostgroup %d", current_hostgroup); - client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 9002, (char*)"HY000", buf, true); + client_myds->myprot.generate_error_packet(true,true,buf, "57P03", false); // not sure if this is the right error code if (thread) { thread->status_variables.stvar[st_var_max_connect_timeout_err]++; } @@ -4050,8 +4050,194 @@ __get_pkts_from_client: } } else { - proxy_error("Experimental feature"); - assert(0); + char command = c = *((unsigned char*)pkt.ptr + 0); + switch (command) { + case 'Q': + { + __sync_add_and_fetch(&thread->status_variables.stvar[st_var_queries], 1); + if (session_type == PROXYSQL_SESSION_PGSQL) { + bool rc_break = false; + bool lock_hostgroup = false; + if (session_fast_forward == false) { + // Note: CurrentQuery sees the query as sent by the client. + // shortly after, the packets it used to contain the query will be deallocated + CurrentQuery.begin((unsigned char*)pkt.ptr, pkt.size, true); + } + rc_break = handler_special_queries(&pkt); + if (rc_break == true) { + if (mirror == false) { + // track also special queries + //RequestEnd(NULL); + // we moved this inside handler_special_queries() + // because a pointer was becoming invalid + break; + } + else { + handler_ret = -1; + return handler_ret; + } + } + timespec begint; + timespec endt; + if (thread->variables.stats_time_query_processor) { + clock_gettime(CLOCK_THREAD_CPUTIME_ID, &begint); + } + qpo = GloQPro->process_mysql_query(TO_CLIENT_SESSION(this), pkt.ptr, pkt.size, TO_QUERY_INFO(&CurrentQuery)); + if (thread->variables.stats_time_query_processor) { + clock_gettime(CLOCK_THREAD_CPUTIME_ID, &endt); + thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] + + (endt.tv_sec * 1000000000 + endt.tv_nsec) - + (begint.tv_sec * 1000000000 + begint.tv_nsec); + } + assert(qpo); // GloQPro->process_mysql_query() should always return a qpo + // This block was moved from 'handler_special_queries' to support + // handling of 'USE' statements which are preceded by a comment. + // For more context check issue: #3493. + // =================================================== + if (session_type != PROXYSQL_SESSION_CLICKHOUSE) { + const char* qd = CurrentQuery.get_digest_text(); + bool use_db_query = false; + + if (qd != NULL) { + if ( + (strncasecmp((char*)"USE", qd, 3) == 0) + && + ( + (strncasecmp((char*)"USE ", qd, 4) == 0) + || + (strncasecmp((char*)"USE`", qd, 4) == 0) + ) + ) { + use_db_query = true; + } + } + else { + if (pkt.size > (5 + 4) && strncasecmp((char*)"USE ", (char*)pkt.ptr + 5, 4) == 0) { + use_db_query = true; + } + } + + if (use_db_query) { + handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_USE_DB(&pkt); + + if (mirror == false) { + break; + } + else { + handler_ret = -1; + return handler_ret; + } + } + } + // =================================================== + if (qpo->max_lag_ms >= 0) { + thread->status_variables.stvar[st_var_queries_with_max_lag_ms]++; + } + rc_break = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt, &lock_hostgroup); + if (mirror == false && rc_break == false) { + if (mysql_thread___automatic_detect_sqli) { + if (handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_detect_SQLi()) { + handler_ret = -1; + return handler_ret; + } + } + } + if (rc_break == true) { + if (mirror == false) { + break; + } + else { + handler_ret = -1; + return handler_ret; + } + } + if (mirror == false) { + handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___create_mirror_session(); + } + + if (autocommit_on_hostgroup >= 0) { + } + if (mysql_thread___set_query_lock_on_hostgroup == 1) { // algorithm introduced in 2.0.6 + if (locked_on_hostgroup < 0) { + if (lock_hostgroup) { + // we are locking on hostgroup now + if (qpo->destination_hostgroup >= 0) { + if (transaction_persistent_hostgroup == -1) { + current_hostgroup = qpo->destination_hostgroup; + } + } + locked_on_hostgroup = current_hostgroup; + thread->status_variables.stvar[st_var_hostgroup_locked]++; + thread->status_variables.stvar[st_var_hostgroup_locked_set_cmds]++; + } + } + if (locked_on_hostgroup >= 0) { + if (current_hostgroup != locked_on_hostgroup) { + client_myds->DSS = STATE_QUERY_SENT_NET; + int l = CurrentQuery.QueryLength; + char* end = (char*)""; + if (l > 256) { + l = 253; + end = (char*)"..."; + } + string nqn = string((char*)CurrentQuery.QueryPointer, l); + char* err_msg = (char*)"Session trying to reach HG %d while locked on HG %d . Rejecting query: %s"; + char* buf = (char*)malloc(strlen(err_msg) + strlen(nqn.c_str()) + strlen(end) + 64); + sprintf(buf, err_msg, current_hostgroup, locked_on_hostgroup, nqn.c_str(), end); + client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, client_myds->pkt_sid + 1, 9005, (char*)"HY000", buf, true); + thread->status_variables.stvar[st_var_hostgroup_locked_queries]++; + RequestEnd(NULL); + free(buf); + l_free(pkt.size, pkt.ptr); + break; + } + } + } + mybe = find_or_create_backend(current_hostgroup); + status = PROCESSING_QUERY; + // set query retries + mybe->server_myds->query_retries_on_failure = mysql_thread___query_retries_on_failure; + // if a number of retries is set in mysql_query_rules, that takes priority + if (qpo) { + if (qpo->retries >= 0) { + mybe->server_myds->query_retries_on_failure = qpo->retries; + } + } + mybe->server_myds->connect_retries_on_failure = mysql_thread___connect_retries_on_failure; + mybe->server_myds->wait_until = 0; + pause_until = 0; + if (mysql_thread___default_query_delay) { + pause_until = thread->curtime + mysql_thread___default_query_delay * 1000; + } + if (qpo) { + if (qpo->delay > 0) { + if (pause_until == 0) + pause_until = thread->curtime; + pause_until += qpo->delay * 1000; + } + } + + + proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Received query to be processed with MariaDB Client library\n"); + mybe->server_myds->killed_at = 0; + mybe->server_myds->kill_type = 0; + if (GloMyLdapAuth) { + if (session_type == PROXYSQL_SESSION_PGSQL) { + if (mysql_thread___add_ldap_user_comment && strlen(mysql_thread___add_ldap_user_comment)) { + add_ldap_comment_to_pkt(&pkt); + } + } + } + mybe->server_myds->mysql_real_query.init(&pkt); + mybe->server_myds->statuses.questions++; + client_myds->setDSS_STATE_QUERY_SENT_NET(); + } + } + break; + default: + // not implemented yet + assert(0); + } } break; } @@ -4808,10 +4994,10 @@ void PgSQL_Session::handler_minus1_GenerateErrorMessage(PgSQL_Data_Stream* myds, switch (status) { case PROCESSING_QUERY: if (myconn) { - MySQL_Result_to_MySQL_wire(myconn->pgsql, myconn->MyRS, myconn->warning_count, myds); + PgSQL_Result_to_PgSQL_wire(myconn->pgsql_conn, myconn->MyRS, myconn->warning_count, myds); } else { - MySQL_Result_to_MySQL_wire(NULL, NULL, 0, myds); + PgSQL_Result_to_PgSQL_wire(NULL, NULL, 0, myds); } break; case PROCESSING_STMT_PREPARE: @@ -5154,23 +5340,23 @@ handler_again: goto handler_again; } } - if (handler_again___verify_backend_autocommit()) { - goto handler_again; - } + //if (handler_again___verify_backend_autocommit()) { + // goto handler_again; + //} if (locked_on_hostgroup == -1 || locked_on_hostgroup_and_all_variables_set == false) { if (handler_again___verify_backend_multi_statement()) { goto handler_again; } - if (handler_again___verify_backend_session_track_gtids()) { - goto handler_again; - } + //if (handler_again___verify_backend_session_track_gtids()) { + // goto handler_again; + //} // Optimize network traffic when we can use 'SET NAMES' - if (verify_set_names(this)) { - goto handler_again; - } + //if (verify_set_names(this)) { + // goto handler_again; + //} for (auto i = 0; i < SQL_NAME_LAST_LOW_WM; i++) { auto client_hash = client_myds->myconn->var_hash[i]; @@ -5265,14 +5451,14 @@ handler_again: if (rc == 0) { if (active_transactions != 0) { // run this only if currently we think there is a transaction - if ((myconn->pgsql->server_status & SERVER_STATUS_IN_TRANS) == 0) { // there is no transaction on the backend connection + if (myconn->pgsql && (myconn->pgsql->server_status & SERVER_STATUS_IN_TRANS) == 0) { // there is no transaction on the backend connection active_transactions = NumActiveTransactions(); // we check all the hostgroups/backends if (active_transactions == 0) transaction_started_at = 0; // reset it } } - handler_rc0_Process_GTID(myconn); + //handler_rc0_Process_GTID(myconn); // if we are locked on hostgroup, the value of autocommit is copied from the backend connection // see bug #3549 @@ -5282,7 +5468,7 @@ handler_again: autocommit = myconn->pgsql->server_status & SERVER_STATUS_AUTOCOMMIT; } - if (mirror == false) { + if (mirror == false && myconn->pgsql) { // Support for LAST_INSERT_ID() if (myconn->pgsql->insert_id) { last_insert_id = myconn->pgsql->insert_id; @@ -5300,7 +5486,7 @@ handler_again: switch (status) { case PROCESSING_QUERY: - MySQL_Result_to_MySQL_wire(myconn->pgsql, myconn->MyRS, myconn->warning_count, myconn->myds); + PgSQL_Result_to_PgSQL_wire(myconn->pgsql_conn, myconn->MyRS, myconn->warning_count, myconn->myds); break; case PROCESSING_STMT_PREPARE: { @@ -5394,7 +5580,7 @@ handler_again: break; // rc==2 : a multi-resultset (or multi statement) was detected, and the current statement is completed case 2: - MySQL_Result_to_MySQL_wire(myconn->pgsql, myconn->MyRS, myconn->warning_count, myconn->myds); + PgSQL_Result_to_PgSQL_wire(myconn->pgsql_conn, myconn->MyRS, myconn->warning_count, myconn->myds); if (myconn->MyRS) { // we also need to clear MyRS, so that the next staement will recreate it if needed if (myconn->MyRS_reuse) { delete myconn->MyRS_reuse; @@ -5984,7 +6170,7 @@ void PgSQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE( } #endif // DEBUG sprintf(_s, "ProxySQL Error: Access denied for user '%s'@'%s' (using password: %s)", client_myds->myconn->userinfo->username, client_addr, (client_myds->myconn->userinfo->password ? "YES" : "NO")); - client_myds->myprot.generate_error_packet(false, _s, "28P01", true); + client_myds->myprot.generate_error_packet(true,false, _s, "28P01", true); proxy_error("%s\n", _s); free(_s); __sync_fetch_and_add(&PgHGM->status.access_denied_wrong_password, 1); @@ -7653,7 +7839,7 @@ void PgSQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED } void PgSQL_Session::MySQL_Stmt_Result_to_MySQL_wire(MYSQL_STMT* stmt, PgSQL_Connection* myconn) { - MySQL_ResultSet* MyRS = NULL; + PgSQL_ResultSet* MyRS = NULL; if (myconn) { if (myconn->MyRS) { MyRS = myconn->MyRS; @@ -7702,21 +7888,21 @@ void PgSQL_Session::MySQL_Stmt_Result_to_MySQL_wire(MYSQL_STMT* stmt, PgSQL_Conn } } -void PgSQL_Session::MySQL_Result_to_MySQL_wire(MYSQL* pgsql, MySQL_ResultSet* MyRS, unsigned int warning_count, PgSQL_Data_Stream* _myds) { +void PgSQL_Session::PgSQL_Result_to_PgSQL_wire(PGconn* pgsql, PgSQL_ResultSet* MyRS, unsigned int warning_count, PgSQL_Data_Stream* _myds) { if (pgsql == NULL) { // error client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, client_myds->pkt_sid + 1, 2013, (char*)"HY000", (char*)"Lost connection to MySQL server during query"); return; } if (MyRS) { - assert(MyRS->result); + //assert(MyRS->result); bool transfer_started = MyRS->transfer_started; bool resultset_completed = MyRS->get_resultset(client_myds->PSarrayOUT); CurrentQuery.rows_sent = MyRS->num_rows; bool com_field_list = client_myds->com_field_list; - assert(resultset_completed); // the resultset should always be completed if MySQL_Result_to_MySQL_wire is called - if (transfer_started == false) { // we have all the resultset when MySQL_Result_to_MySQL_wire was called - if (qpo && qpo->cache_ttl > 0 && com_field_list == false) { // the resultset should be cached + assert(resultset_completed); // the resultset should always be completed if PgSQL_Result_to_PgSQL_wire is called + if (transfer_started == false) { // we have all the resultset when PgSQL_Result_to_PgSQL_wire was called + /*if (qpo && qpo->cache_ttl > 0 && com_field_list == false) { // the resultset should be cached if (mysql_errno(pgsql) == 0 && (mysql_warning_count(pgsql) == 0 || mysql_thread___query_cache_handle_warnings == 1)) { // no errors @@ -7748,11 +7934,10 @@ void PgSQL_Session::MySQL_Result_to_MySQL_wire(MYSQL* pgsql, MySQL_ResultSet* My client_myds->resultset_length = 0; } } - } + }*/ } - } - else { // no result set - int myerrno = mysql_errno(pgsql); + } else { // no result set + /*int myerrno = mysql_errno(pgsql); if (myerrno == 0) { unsigned int num_rows = mysql_affected_rows(pgsql); uint16_t setStatus = (active_transactions ? SERVER_STATUS_IN_TRANS : 0); @@ -7780,7 +7965,7 @@ void PgSQL_Session::MySQL_Result_to_MySQL_wire(MYSQL* pgsql, MySQL_ResultSet* My client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, client_myds->pkt_sid + 1, mysql_errno(pgsql), sqlstate, mysql_error(pgsql)); } //client_myds->pkt_sid++; - } + }*/ } } diff --git a/lib/PgSQL_Thread.cpp b/lib/PgSQL_Thread.cpp index 2a6870740..7bd02f156 100644 --- a/lib/PgSQL_Thread.cpp +++ b/lib/PgSQL_Thread.cpp @@ -5395,7 +5395,7 @@ void PgSQL_Thread::push_MyConn_local(PgSQL_Connection * c) { PgSQL_SrvC* mysrvc = NULL; mysrvc = (PgSQL_SrvC*)c->parent; // reset insert_id #1093 - c->pgsql->insert_id = 0; + //c->pgsql->insert_id = 0; if (mysrvc->status == MYSQL_SERVER_STATUS_ONLINE) { if (c->async_state_machine == ASYNC_IDLE) { cached_connections->add(c); diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index 7edd95d2f..fefe32310 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -160,6 +160,9 @@ static unordered_map&> module_tablenames = { { "scheduler", scheduler_tablenames }, { "proxysql_servers", proxysql_servers_tablenames }, { "restapi", restapi_tablenames }, + { "pgsql_servers", pgsql_servers_tablenames }, + { "pgsql_firewall", pgsql_firewall_tablenames }, + { "pgsql_query_rules", pgsql_query_rules_tablenames }, }; static void BQE1(SQLite3DB *db, const vector& tbs, const string& p1, const string& p2, const string& p3) { @@ -13596,7 +13599,7 @@ void ProxySQL_Admin::send_error_msg_to_client(Client_Session& sess, const cha PgSQL_Data_Stream* myds = sess->client_myds; char* new_msg = (char*)malloc(strlen(msg) + sizeof(prefix_msg)); sprintf(new_msg, "%s%s", prefix_msg, msg); - myds->myprot.generate_error_packet(false, new_msg, NULL, false); + myds->myprot.generate_error_packet(true,true, new_msg, NULL, false); free(new_msg); myds->DSS = STATE_SLEEP; } diff --git a/src/Makefile b/src/Makefile index bbe652e85..9bc9daf80 100644 --- a/src/Makefile +++ b/src/Makefile @@ -89,9 +89,9 @@ EV_PATH := $(DEPS_PATH)/libev/libev/ EV_IDIR := $(EV_PATH) EV_LDIR := $(EV_PATH)/.libs -POSTGRESQL_PATH := $(DEPS_PATH)/postgresql/postgresql/ -POSTGRESQL_IDIR := $(POSTGRESQL_PATH)/src/include -I$(POSTGRESQL_PATH)/src/interfaces/libpq -POSTGRESQL_LDIR := $(POSTGRESQL_PATH)/src/interfaces/libpq +POSTGRESQL_PATH := $(DEPS_PATH)/postgresql/postgresql/src +POSTGRESQL_IDIR := $(POSTGRESQL_PATH)/include -I$(POSTGRESQL_PATH)/interfaces/libpq +POSTGRESQL_LDIR := $(POSTGRESQL_PATH)/interfaces/libpq -L$(POSTGRESQL_PATH)/common -L$(POSTGRESQL_PATH)/port LIBUSUAL_PATH=$(DEPS_PATH)/libusual/libusual LIBUSUAL_IDIR=$(LIBUSUAL_PATH) @@ -167,7 +167,7 @@ endif MYCXXFLAGS += $(IDIRS) $(OPTZ) $(DEBUG) $(PSQLCH) -DGITVERSION=\"$(GIT_VERSION)\" $(WGCOV) $(WASAN) -STATICMYLIBS := -Wl,-Bstatic -lconfig -lproxysql -ldaemon -lconfig++ -lre2 -lpcrecpp -lpcre -lmariadbclient -lpq -lhttpserver -lmicrohttpd -linjection -lcurl -lssl -lcrypto -lev -lscram -lusual +STATICMYLIBS := -Wl,-Bstatic -lconfig -lproxysql -ldaemon -lconfig++ -lre2 -lpcrecpp -lpcre -lmariadbclient -lhttpserver -lmicrohttpd -linjection -lcurl -lssl -lcrypto -lev -lscram -lusual -lpq -lpgcommon -lpgport ifneq ($(NOJEMALLOC),1) STATICMYLIBS += -ljemalloc endif