Route query to PostgreSQL backend server

v2.x_pg_PrepStmtBase_240714
Rahim Kanji 2 years ago
parent d4cec89bfa
commit dcbe140199

@ -8,7 +8,7 @@
using json = nlohmann::json;
class PgSQL_SrvC;
class PgSQL_ResultSet;
//#define STATUS_MYSQL_CONNECTION_TRANSACTION 0x00000001 // DEPRECATED
#define STATUS_MYSQL_CONNECTION_COMPRESSION 0x00000002
#define STATUS_MYSQL_CONNECTION_USER_VARIABLE 0x00000004
@ -172,6 +172,10 @@ static const Param_Name_Validation* PgSQL_Param_Name_Accepted_Values[PG_PARAM_SI
&load_balance_hosts
};
#define PG_EVENT_NONE 0x00
#define PG_EVENT_READ 0x01
#define PG_EVENT_WRITE 0x02
class PgSQL_Conn_Param {
private:
bool validate(PgSQL_Param_Name key, const char* val) {
@ -263,7 +267,6 @@ enum pgsql_charset_action {
POSTGRESQL_CHARSET_ACTION_CONNECT_START
};
class PgSQL_Connection_userinfo {
private:
uint64_t compute_hash();
@ -282,7 +285,7 @@ class PgSQL_Connection_userinfo {
bool set_schemaname(char *, int);
};
class PgSQL_Connection {
class PgSQL_Connection_Placeholder {
private:
void update_warning_count_from_connection();
void update_warning_count_from_statement();
@ -371,7 +374,7 @@ class PgSQL_Connection {
uint32_t status_flags;
int async_exit_status; // exit status of MariaDB Client Library Non blocking API
int interr; // integer return
MDB_ASYNC_ST async_state_machine; // Async state machine
PG_ASYNC_ST async_state_machine; // Async state machine
short wait_events;
uint8_t compression_pkt_id;
my_bool ret_bool;
@ -383,8 +386,8 @@ class PgSQL_Connection {
bool unknown_transaction_status;
void compute_unknown_transaction_status();
char gtid_uuid[128];
PgSQL_Connection();
~PgSQL_Connection();
PgSQL_Connection_Placeholder();
~PgSQL_Connection_Placeholder();
bool set_autocommit(bool);
bool set_no_backslash_escapes(bool);
unsigned int set_charset(unsigned int, enum pgsql_charset_action);
@ -414,8 +417,8 @@ class PgSQL_Connection {
void set_option_start();
void set_option_cont(short event);
void set_query(char *stmt, unsigned long length);
MDB_ASYNC_ST handler(short event);
void next_event(MDB_ASYNC_ST new_st);
PG_ASYNC_ST handler(short event);
void next_event(PG_ASYNC_ST new_st);
int async_connect(short event);
int async_change_user(short event);
@ -481,4 +484,139 @@ class PgSQL_Connection {
unsigned int number_of_matching_session_variables(const PgSQL_Connection *client_conn, unsigned int& not_matching);
unsigned long get_mysql_thread_id() { return pgsql ? pgsql->thread_id : 0; }
};
enum PG_ERROR_TYPE {
PG_NO_ERROR,
PG_CONNECT_FAILED,
PG_QUERY_FAILED,
PG_RESULT_FAILED,
};
class PgSQL_Connection : public PgSQL_Connection_Placeholder {
public:
PgSQL_Connection();
~PgSQL_Connection();
PG_ASYNC_ST handler(short event);
void connect_start();
void connect_cont(short event);
void query_start();
void query_cont(short event);
void fetch_result_start();
void fetch_result_cont(short event);
int async_connect(short event);
int async_set_autocommit(short event, bool ac);
int async_query(short event, char* stmt, unsigned long length, MYSQL_STMT** _stmt = NULL, stmt_execute_metadata_t* _stmt_meta = NULL);
int async_ping(short event);
void next_event(PG_ASYNC_ST new_st);
bool IsAutoCommit();
bool is_connected() const;
void compute_unknown_transaction_status();
void async_free_result();
void flush();
std::string get_error_code_from_result() const;
bool is_error_present() const {
return err_type != PG_NO_ERROR;
}
PG_ERROR_TYPE get_error_type() const {
return err_type;
}
std::string get_error_message() const {
return err_msg;
}
void set_error(PG_ERROR_TYPE _err_type, const std::string& _err_msg) {
err_type = _err_type;
err_msg = _err_msg;
}
void reset_error() {
err_type = PG_NO_ERROR;
err_msg.clear();
}
PGresult* get_last_result() const {
return last_result;
}
void set_last_result(PGresult* res) {
if (last_result) {
PQclear(last_result);
}
last_result = res;
}
void reset_last_result() {
if (last_result) {
PQclear(last_result);
last_result = nullptr;
}
}
void optimize() {}
//PgSQL_Conn_Param conn_params;
PGconn* pgsql_conn;
PGresult* last_result;
PgSQL_ResultSet* MyRS;
PgSQL_ResultSet* MyRS_reuse;
PG_ERROR_TYPE err_type;
std::string err_msg;
//PgSQL_SrvC* parent;
//PgSQL_Connection_userinfo* userinfo;
//PgSQL_Data_Stream* myds;
//int fd;
/*std::string get_error_code(PG_ERROR_TYPE* errtype = NULL) {
assert(pgsql_conn);
std::string error_code = PGCONN_NO_ERROR;
ConnStatusType status = PQstatus(pgsql_conn);
if (status == CONNECTION_BAD) {
if (errtype) *errtype = PG_CONNECTION_ERROR;
error_code = PQparameterStatus(pgsql_conn, "SQLSTATE");
}
else if (pgsql_result != NULL) {
ExecStatusType status = PQresultStatus(pgsql_result);
if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK) {
if (errtype) *errtype = PG_SERVER_ERROR;
error_code = PQresultErrorField(pgsql_result, PG_DIAG_SQLSTATE);
}
}
return error_code;
}
std::string get_error_message(PG_ERROR_TYPE* errtype = NULL) {
assert(pgsql_conn);
std::string error_message{};
ConnStatusType status = PQstatus(pgsql_conn);
if (status == CONNECTION_BAD) {
if (errtype) *errtype = PG_CONNECTION_ERROR;
error_message = PQerrorMessage(pgsql_conn);
}
else if (pgsql_result != NULL) {
ExecStatusType status = PQresultStatus(pgsql_result);
if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK) {
if (errtype) *errtype = PG_SERVER_ERROR;
error_message = PQresultErrorMessage(pgsql_result);
}
}
return error_message;
}*/
};
#endif /* __CLASS_POSTGRESQL_CONNECTION_H */

@ -63,21 +63,40 @@ struct pgsql_hdr {
PtrSize_t data;
};
struct PG_Field {
char* name;
uint32_t tbl_oid;
uint16_t col_idx;
uint32_t type_oid;
uint16_t col_len;
uint32_t type_mod;
uint16_t fmt;
};
using PG_Fields = std::vector<PG_Field>;
class PG_pkt
{
public:
PG_pkt(unsigned c = PG_PKT_DEFAULT_SIZE) {
ownership = true;
capacity = l_near_pow_2(c);
size = 0;
ptr = (char*)malloc(capacity);
multiple_pkt_mode = false;
}
PG_pkt(void* _ptr, unsigned int _capacity) {
ownership = false;
ptr = (char*)_ptr;
capacity = _capacity;
size = 0;
}
~PG_pkt() {
reset();
}
void reset() {
if (ptr)
if (ptr && ownership == true)
free(ptr);
ptr = nullptr;
size = 0;
@ -154,10 +173,51 @@ private:
// currently for debug only. will replace this with a single variable that will contain last pkt offset
std::vector<unsigned int> pkt_offset;
bool multiple_pkt_mode = false;
bool ownership = true;
friend void SQLite3_to_Postgres(PtrSizeArray* psa, SQLite3_result* result, char* error, int affected_rows, const char* query_type);
};
class PgSQL_Protocol;
class PgSQL_ResultSet {
public:
PgSQL_Data_Stream* ds;
PgSQL_Protocol* proto;
PGconn* pgsql_conn;
PGresult* result;
PgSQL_ResultSet();
~PgSQL_ResultSet();
void init(PgSQL_Protocol* _proto, PGconn* _conn);
void buffer_init(PgSQL_Protocol* _proto);
unsigned int add_row_description(PGresult* result);
unsigned int add_row(PGresult* result);
unsigned int add_eof(PGresult* result);
void add_err(PgSQL_Data_Stream* _myds);
bool get_resultset(PtrSizeArray* PSarrayFinal);
void buffer_to_PSarrayOut(bool _last = false);
unsigned long long current_size();
//private:
unsigned char* buffer;
unsigned int buffer_used;
uint8_t sid;
bool transfer_started;
bool resultset_completed;
unsigned int num_fields;
unsigned long long num_rows;
unsigned long long resultset_size;
PtrSizeArray PSarrayOUT;
friend class PgSQL_Protocol;
};
class PgSQL_Protocol : public MySQL_Protocol {
public:
void init(PgSQL_Data_Stream** __myds, PgSQL_Connection_userinfo* __userinfo, PgSQL_Session* __sess) {
@ -166,14 +226,21 @@ public:
sess = __sess;
current_PreStmt = NULL;
}
PgSQL_Data_Stream* get_myds() { return *myds; }
bool generate_pkt_initial_handshake(bool send, void** ptr, unsigned int* len, uint32_t* thread_id, bool deprecate_eof_active) override;
bool process_startup_packet(unsigned char* pkt, unsigned int len, bool& ssl_request);
EXECUTION_STATE process_handshake_response_packet(unsigned char* pkt, unsigned int len);
void welcome_client();
void generate_error_packet(bool send_ready, const char* msg, const char* code, bool fatal);
bool generate_ok_packet(bool send, bool ready, const char* msg, int rows, const char* query);
void generate_error_packet(bool send, bool ready, const char* msg, const char* code, bool fatal, PtrSize_t* _ptr = NULL);
bool generate_ok_packet(bool send, bool ready, const char* msg, int rows, const char* query, PtrSize_t* _ptr = NULL);
//bool generate_row_description(bool send, PgSQL_ResultSet* rs, const PG_Fields& fields, unsigned int size);
unsigned int copy_row_description_to_PgSQL_ResultSet(bool send, PgSQL_ResultSet* pg_rs, PGresult* result);
unsigned int copy_row_to_PgSQL_ResultSet(bool send, PgSQL_ResultSet* pg_rs, PGresult* result);
unsigned int copy_eof_to_PgSQL_ResultSet(bool send, PgSQL_ResultSet* pg_rs, PGresult* result);
private:
bool get_header(unsigned char* pkt, unsigned int len, pgsql_hdr* hdr);
void load_conn_parameters(pgsql_hdr* pkt, bool startup);

@ -359,7 +359,7 @@ public:
PgSQL_Backend* find_or_create_backend(int, PgSQL_Data_Stream* _myds = NULL);
void SQLite3_to_MySQL(SQLite3_result*, char*, int, MySQL_Protocol*, bool in_transaction = false, bool deprecate_eof_active = false);
void MySQL_Result_to_MySQL_wire(MYSQL* mysql, MySQL_ResultSet* MyRS, unsigned int warning_count, PgSQL_Data_Stream* _myds = NULL);
void PgSQL_Result_to_PgSQL_wire(PGconn* mysql, PgSQL_ResultSet* MyRS, unsigned int warning_count, PgSQL_Data_Stream* _myds = NULL);
void MySQL_Stmt_Result_to_MySQL_wire(MYSQL_STMT* stmt, PgSQL_Connection* myconn);
unsigned int NumActiveTransactions(bool check_savpoint = false);
bool HasOfflineBackends();

@ -52,7 +52,7 @@
#include "mysql.h"
#include "mariadb_com.h"
#include "libpq-fe.h"
#include "proxysql_mem.h"
#include "proxysql_structs.h"

@ -47,7 +47,7 @@ enum cred_username_type { USERNAME_BACKEND, USERNAME_FRONTEND, USERNAME_NONE };
#define PROXYSQL_USE_RESULT
enum MDB_ASYNC_ST { // MariaDB Async State Machine
enum ASYNC_ST { // MariaDB Async State Machine
ASYNC_CONNECT_START,
ASYNC_CONNECT_CONT,
ASYNC_CONNECT_END,
@ -122,6 +122,10 @@ enum MDB_ASYNC_ST { // MariaDB Async State Machine
ASYNC_IDLE
};
using MDB_ASYNC_ST = ASYNC_ST;
using PG_ASYNC_ST = ASYNC_ST;
// list of possible debugging modules
enum debug_module {
PROXY_DEBUG_GENERIC,

@ -65,9 +65,13 @@ LIBSCRAM_PATH=$(DEPS_PATH)/libscram/
LIBSCRAM_IDIR=$(LIBSCRAM_PATH)/include/
LIBSCRAM_LDIR=$(LIBSCRAM_PATH)/lib/
POSTGRES_PATH=$(DEPS_PATH)/postgresql/postgresql/src/
POSTGRES_IFACE=$(POSTGRES_PATH)/interfaces/libpq/ -I$(POSTGRES_PATH)/include
POSTGRES_LDIR=$(POSTGRES_IFACES)/libpq/
IDIR := ../include
IDIRS := -I$(IDIR) -I$(JEMALLOC_IDIR) -I$(MARIADB_IDIR) $(LIBCONFIG_IDIR) -I$(RE2_IDIR) -I$(SQLITE3_DIR) -I$(PCRE_PATH) -I/usr/local/include -I$(CLICKHOUSE_CPP_DIR) -I$(CLICKHOUSE_CPP_DIR)/contrib/ $(MICROHTTPD_IDIR) $(LIBHTTPSERVER_IDIR) $(LIBINJECTION_IDIR) -I$(CURL_IDIR) -I$(EV_DIR) -I$(SSL_IDIR) -I$(PROMETHEUS_IDIR) -I$(LIBUSUAL_IDIR) -I$(LIBSCRAM_IDIR)
IDIRS := -I$(IDIR) -I$(JEMALLOC_IDIR) -I$(MARIADB_IDIR) $(LIBCONFIG_IDIR) -I$(RE2_IDIR) -I$(SQLITE3_DIR) -I$(PCRE_PATH) -I/usr/local/include -I$(CLICKHOUSE_CPP_DIR) -I$(CLICKHOUSE_CPP_DIR)/contrib/ $(MICROHTTPD_IDIR) $(LIBHTTPSERVER_IDIR) $(LIBINJECTION_IDIR) -I$(CURL_IDIR) -I$(EV_DIR) -I$(SSL_IDIR) -I$(PROMETHEUS_IDIR) -I$(LIBUSUAL_IDIR) -I$(LIBSCRAM_IDIR) -I$(POSTGRES_IFACE)
ifeq ($(UNAME_S),Linux)
IDIRS += -I$(COREDUMPER_IDIR)
endif

File diff suppressed because it is too large Load Diff

@ -32,6 +32,8 @@ extern PgSQL_Authentication* GloPgAuth;
void PG_pkt::make_space(unsigned int len) {
if (ownership == false) return;
if ((size + len) <= capacity) {
return;
} else {
@ -682,7 +684,7 @@ bool PgSQL_Protocol::process_startup_packet(unsigned char* pkt, unsigned int len
if (!user || *user == '\0') {
proxy_debug(PROXY_DEBUG_MYSQL_AUTH, 5, "Session=%p , DS=%p. no username supplied.\n", (*myds), (*myds)->sess);
generate_error_packet(false, "no username supplied", NULL, true);
generate_error_packet(true, false, "no username supplied", NULL, true);
return false;
}
@ -729,7 +731,7 @@ EXECUTION_STATE PgSQL_Protocol::process_handshake_response_packet(unsigned char*
if (!user || *user == '\0') {
proxy_debug(PROXY_DEBUG_MYSQL_AUTH, 5, "Session=%p , DS=%p , user='%s'. Client password pkt before startup packet.\n", (*myds), (*myds)->sess, user);
generate_error_packet(false, "client password pkt before startup packet", NULL, true);
generate_error_packet(true, false, "client password pkt before startup packet", NULL, true);
goto __exit_process_pkt_handshake_response;
}
@ -800,7 +802,7 @@ EXECUTION_STATE PgSQL_Protocol::process_handshake_response_packet(unsigned char*
if (!pass || *pass == '\0') {
proxy_debug(PROXY_DEBUG_MYSQL_AUTH, 5, "Session=%p , DS=%p , user='%s'. Empty password returned by client.\n", (*myds), (*myds)->sess, user);
generate_error_packet(false, "empty password returned by client", NULL, true);
generate_error_packet(true, false, "empty password returned by client", NULL, true);
break;
}
@ -836,7 +838,7 @@ EXECUTION_STATE PgSQL_Protocol::process_handshake_response_packet(unsigned char*
proxy_debug(PROXY_DEBUG_MYSQL_AUTH, 5, "Session=%p , DS=%p , user='%s'. Selected SASL mechanism: %s.\n", (*myds), (*myds)->sess, user, mech);
if (strcmp(mech, "SCRAM-SHA-256") != 0) {
proxy_debug(PROXY_DEBUG_MYSQL_AUTH, 5, "Session=%p , DS=%p , user='%s'. Client selected an invalid SASL authentication mechanism: %s.\n", (*myds), (*myds)->sess, user, mech);
generate_error_packet(false, "client selected an invalid SASL authentication mechanism", NULL, true);
generate_error_packet(true, false, "client selected an invalid SASL authentication mechanism", NULL, true);
break;
}
@ -856,7 +858,7 @@ EXECUTION_STATE PgSQL_Protocol::process_handshake_response_packet(unsigned char*
if (!scram_handle_client_first(&(*myds)->scram_state, &stored_user_info, ((const unsigned char*)hdr.data.ptr) + read_pos, length)) {
proxy_debug(PROXY_DEBUG_MYSQL_AUTH, 5, "Session=%p , DS=%p , user='%s'. SASL authentication failed\n", (*myds), (*myds)->sess, user);
generate_error_packet(false, "SASL authentication failed", NULL, true);
generate_error_packet(true,false, "SASL authentication failed", NULL, true);
break;
}
@ -902,7 +904,7 @@ EXECUTION_STATE PgSQL_Protocol::process_handshake_response_packet(unsigned char*
}
} else {
proxy_debug(PROXY_DEBUG_MYSQL_AUTH, 5, "Session=%p , DS=%p , user='%s'. User not found in the database.\n", (*myds), (*myds)->sess, user);
generate_error_packet(false, "User not found", NULL, true);
generate_error_packet(true,false, "User not found", NULL, true);
}
// set the default session charset
//(*myds)->sess->default_charset = charset;
@ -1004,10 +1006,13 @@ void PgSQL_Protocol::welcome_client() {
//(*myds)->sess->status = WAITING_CLIENT_DATA;
}
void PgSQL_Protocol::generate_error_packet(bool send_ready, const char* msg, const char* code, bool fatal) {
void PgSQL_Protocol::generate_error_packet(bool send, bool ready, const char* msg, const char* code, bool fatal, PtrSize_t* _ptr) {
// to avoid memory leak
assert(send == true || _ptr);
PG_pkt pgpkt{};
if (send_ready)
if (ready)
pgpkt.set_multi_pkt_mode(true);
pgpkt.write_generic('E', "cscscscsc",
@ -1015,31 +1020,39 @@ void PgSQL_Protocol::generate_error_packet(bool send_ready, const char* msg, con
'V', fatal ? "FATAL" : "ERROR",
'C', code ? code : "08P01", 'M', msg, 0);
if (send_ready) {
if (ready) {
pgpkt.write_ReadyForQuery();
pgpkt.set_multi_pkt_mode(false);
}
auto buff = pgpkt.detach();
(*myds)->PSarrayOUT->add((void*)buff.first, buff.second);
switch ((*myds)->DSS) {
case STATE_SERVER_HANDSHAKE:
case STATE_CLIENT_HANDSHAKE:
case STATE_QUERY_SENT_DS:
case STATE_QUERY_SENT_NET:
case STATE_ERR:
(*myds)->DSS = STATE_ERR;
break;
case STATE_OK:
break;
case STATE_SLEEP:
if ((*myds)->sess->session_fast_forward == true) { // see issue #733
if (send) {
(*myds)->PSarrayOUT->add((void*)buff.first, buff.second);
switch ((*myds)->DSS) {
case STATE_SERVER_HANDSHAKE:
case STATE_CLIENT_HANDSHAKE:
case STATE_QUERY_SENT_DS:
case STATE_QUERY_SENT_NET:
case STATE_ERR:
(*myds)->DSS = STATE_ERR;
break;
case STATE_OK:
break;
case STATE_SLEEP:
if ((*myds)->sess->session_fast_forward == true) { // see issue #733
break;
}
default:
// LCOV_EXCL_START
assert(0);
// LCOV_EXCL_STOP
}
default:
// LCOV_EXCL_START
assert(0);
// LCOV_EXCL_STOP
}
if (_ptr) {
_ptr->ptr = buff.first;
_ptr->size = buff.second;
}
}
@ -1184,11 +1197,13 @@ char* extract_tag_from_query(const char* query) {
}
bool PgSQL_Protocol::generate_ok_packet(bool send, bool ready, const char* msg, int rows, const char* query) {
bool PgSQL_Protocol::generate_ok_packet(bool send, bool ready, const char* msg, int rows, const char* query, PtrSize_t* _ptr) {
// to avoid memory leak
assert(send == true || _ptr);
PG_pkt pgpkt{};
if (send == true) {
if (ready == true) {
pgpkt.set_multi_pkt_mode(true);
}
@ -1199,8 +1214,7 @@ bool PgSQL_Protocol::generate_ok_packet(bool send, bool ready, const char* msg,
if (strcmp(tag, "INSERT") == 0) {
sprintf(tmpbuf, "%s 0 %d", tag, rows);
pgpkt.write_CommandComplete(tmpbuf);
}
else if (strcmp(tag, "UPDATE") == 0 ||
} else if (strcmp(tag, "UPDATE") == 0 ||
strcmp(tag, "DELETE") == 0 ||
strcmp(tag, "MERGE") == 0 ||
strcmp(tag, "MOVE") == 0 ||
@ -1210,21 +1224,492 @@ bool PgSQL_Protocol::generate_ok_packet(bool send, bool ready, const char* msg,
strcmp(tag, "COPY") == 0 ) {
sprintf(tmpbuf, "%s %d", tag, rows);
pgpkt.write_CommandComplete(tmpbuf);
}
else {
} else {
pgpkt.write_CommandComplete(tag);
}
if (ready == true) {
pgpkt.write_ReadyForQuery();
pgpkt.set_multi_pkt_mode(false);
}
auto buff = pgpkt.detach();
if (send == true) {
pgpkt.set_multi_pkt_mode(false);
auto buff = pgpkt.detach();
(*myds)->PSarrayOUT->add((void*)buff.first, buff.second);
} else {
_ptr->ptr = buff.first;
_ptr->size = buff.second;
}
free(tag);
return true;
}
//bool PgSQL_Protocol::generate_row_description(bool send, PgSQL_ResultSet* rs, const PG_Fields& fields, unsigned int size) {
// if ((*myds)->sess->mirror == true) {
// return true;
// }
//
// unsigned char* _ptr = NULL;
//
// if (rs) {
// if (size <= (RESULTSET_BUFLEN - rs->buffer_used)) {
// // there is space in the buffer, add the data to it
// _ptr = rs->buffer + rs->buffer_used;
// rs->buffer_used += size;
// } else {
// // there is no space in the buffer, we flush the buffer and recreate it
// rs->buffer_to_PSarrayOut();
// // now we can check again if there is space in the buffer
// if (size <= (RESULTSET_BUFLEN - rs->buffer_used)) {
// // there is space in the NEW buffer, add the data to it
// _ptr = rs->buffer + rs->buffer_used;
// rs->buffer_used += size;
// } else {
// // a new buffer is not enough to store the new row
// _ptr = (unsigned char*)l_alloc(size);
// }
// }
// } else {
// _ptr = (unsigned char*)l_alloc(size);
// }
//
// PG_pkt pgpkt(_ptr, 0);
//
// pgpkt.put_char('T');
// pgpkt.put_uint32(size );
// pgpkt.put_uint16(fields.size());
//
// for (unsigned int i = 0; i < fields.size(); i++) {
// pgpkt.put_string(fields[i].name);
// pgpkt.put_uint32(fields[i].tbl_oid);
// pgpkt.put_uint16(fields[i].col_idx);
// pgpkt.put_uint32(fields[i].type_oid);
// pgpkt.put_uint16(fields[i].col_len);
// pgpkt.put_uint32(fields[i].type_mod);
// pgpkt.put_uint16(fields[i].fmt);
// }
//
// if (send == true) { (*myds)->PSarrayOUT->add((void*)_ptr, size); }
//
////#ifdef DEBUG
//// if (dump_pkt) { __dump_pkt(__func__, _ptr, size); }
////#endif
// if (rs) {
// if (_ptr >= rs->buffer && _ptr < rs->buffer + RESULTSET_BUFLEN) {
// // we are writing within the buffer, do not add to PSarrayOUT
// } else {
// // we are writing outside the buffer, add to PSarrayOUT
// rs->PSarrayOUT.add(_ptr, size);
// }
// }
// return true;
//}
unsigned int PgSQL_Protocol::copy_row_description_to_PgSQL_ResultSet(bool send, PgSQL_ResultSet* pg_rs, PGresult* result) {
if ((*myds)->sess->mirror == true) {
return true;
}
assert(pg_rs);
assert(result);
unsigned int fields_cnt = PQnfields(result);
unsigned int size = 1 + 4 + 2;
for (int i = 0; i < fields_cnt; i++) {
size += strlen(PQfname(result, i)) + 1 + 18; // null terminator, name, reloid, colnr, oid, typsize, typmod, fmt
}
unsigned char* _ptr = NULL;
unsigned int available_capacity = (RESULTSET_BUFLEN - pg_rs->buffer_used);
if (size <= available_capacity) {
// there is space in the buffer, add the data to it
_ptr = pg_rs->buffer + pg_rs->buffer_used;
pg_rs->buffer_used += size;
} else {
// there is no space in the buffer, we flush the buffer and recreate it
pg_rs->buffer_to_PSarrayOut();
// now we can check again if there is space in the buffer
available_capacity = (RESULTSET_BUFLEN - pg_rs->buffer_used);
if (size <= available_capacity) {
// there is space in the NEW buffer, add the data to it
_ptr = pg_rs->buffer + pg_rs->buffer_used;
pg_rs->buffer_used += size;
} else {
// a new buffer is not enough to store the new row
_ptr = (unsigned char*)l_alloc(size);
available_capacity = size;
}
}
assert(_ptr);
PG_pkt pgpkt(_ptr, available_capacity);
pgpkt.put_char('T');
pgpkt.put_uint32(size - 1);
pgpkt.put_uint16(fields_cnt);
for (unsigned int i = 0; i < fields_cnt; i++) {
pgpkt.put_string(PQfname(result, i));
pgpkt.put_uint32(PQftable(result, i));
pgpkt.put_uint16(PQftablecol(result, i));
pgpkt.put_uint32(PQftype(result, i));
pgpkt.put_uint16(PQfsize(result, i));
pgpkt.put_uint32(PQfmod(result, i));
pgpkt.put_uint16(PQfformat(result, i));
}
if (send == true) { (*myds)->PSarrayOUT->add((void*)_ptr, size); }
//#ifdef DEBUG
// if (dump_pkt) { __dump_pkt(__func__, _ptr, size); }
//#endif
if (pg_rs) {
if (_ptr >= pg_rs->buffer && _ptr < pg_rs->buffer + RESULTSET_BUFLEN) {
// we are writing within the buffer, do not add to PSarrayOUT
}
else {
// we are writing outside the buffer, add to PSarrayOUT
pg_rs->PSarrayOUT.add(_ptr, size);
}
}
pg_rs->num_fields = fields_cnt;
pg_rs->resultset_size = size;
return size;
}
unsigned int PgSQL_Protocol::copy_row_to_PgSQL_ResultSet(bool send, PgSQL_ResultSet* pg_rs, PGresult* result) {
if ((*myds)->sess->mirror == true) {
return true;
}
assert(pg_rs);
assert(result);
assert(pg_rs->num_fields);
const unsigned int numRows = PQntuples(result);
unsigned int total_size = 0;
for (unsigned int i = 0; i < numRows; i++) {
unsigned int size = 1 + 4 + 2; // 'D', length, field count
for (int j = 0; j < pg_rs->num_fields; j++) {
size += PQgetlength(result, i, j) + 4; // length, value
}
total_size += size;
unsigned char* _ptr = NULL;
unsigned int available_capacity = (RESULTSET_BUFLEN - pg_rs->buffer_used);
if (size <= available_capacity) {
// there is space in the buffer, add the data to it
_ptr = pg_rs->buffer + pg_rs->buffer_used;
pg_rs->buffer_used += size;
}
else {
// there is no space in the buffer, we flush the buffer and recreate it
pg_rs->buffer_to_PSarrayOut();
// now we can check again if there is space in the buffer
available_capacity = (RESULTSET_BUFLEN - pg_rs->buffer_used);
if (size <= available_capacity) {
// there is space in the NEW buffer, add the data to it
_ptr = pg_rs->buffer + pg_rs->buffer_used;
pg_rs->buffer_used += size;
}
else {
// a new buffer is not enough to store the new row
_ptr = (unsigned char*)l_alloc(size);
available_capacity = size;
}
}
assert(_ptr);
PG_pkt pgpkt(_ptr, available_capacity);
pgpkt.put_char('D');
pgpkt.put_uint32(size - 1);
pgpkt.put_uint16(pg_rs->num_fields);
int column_value_len = 0;
for (int j = 0; j < pg_rs->num_fields; j++) {
column_value_len = PQgetlength(result, i, j);
pgpkt.put_uint32(column_value_len);
pgpkt.put_bytes(PQgetvalue(result, i, j), column_value_len);
}
if (send == true) { (*myds)->PSarrayOUT->add((void*)_ptr, size); }
pg_rs->resultset_size += size;
if (pg_rs) {
if (_ptr >= pg_rs->buffer && _ptr < pg_rs->buffer + RESULTSET_BUFLEN) {
// we are writing within the buffer, do not add to PSarrayOUT
}
else {
// we are writing outside the buffer, add to PSarrayOUT
pg_rs->PSarrayOUT.add(_ptr, size);
}
}
}
pg_rs->num_rows += numRows;
return total_size;
}
unsigned int PgSQL_Protocol::copy_eof_to_PgSQL_ResultSet(bool send, PgSQL_ResultSet* pg_rs, PGresult* result) {
if ((*myds)->sess->mirror == true) {
return true;
}
assert(pg_rs);
assert(result);
assert(pg_rs->num_fields);
const char* tag = PQcmdStatus(result);
if (!tag) assert(0); // for testing it should not be null
const unsigned int tag_len = strlen(tag) + 1;
unsigned int size = 1 + 4 + tag_len + 1 + 4 + 1; // 'C', length, tag, Z, length, I
unsigned char* _ptr = NULL;
unsigned int available_capacity = (RESULTSET_BUFLEN - pg_rs->buffer_used);
if (size <= available_capacity) {
// there is space in the buffer, add the data to it
_ptr = pg_rs->buffer + pg_rs->buffer_used;
pg_rs->buffer_used += size;
} else {
// there is no space in the buffer, we flush the buffer and recreate it
pg_rs->buffer_to_PSarrayOut();
// now we can check again if there is space in the buffer
available_capacity = (RESULTSET_BUFLEN - pg_rs->buffer_used);
if (size <= available_capacity) {
// there is space in the NEW buffer, add the data to it
_ptr = pg_rs->buffer + pg_rs->buffer_used;
pg_rs->buffer_used += size;
} else {
// a new buffer is not enough to store the new row
_ptr = (unsigned char*)l_alloc(size);
available_capacity = size;
}
}
assert(_ptr);
PG_pkt pgpkt(_ptr, available_capacity);
pgpkt.put_char('C');
pgpkt.put_uint32(tag_len + 4);
pgpkt.put_string(tag);
pgpkt.put_char('Z');
pgpkt.put_uint32(4 + 1);
pgpkt.put_char('I');
if (send == true) { (*myds)->PSarrayOUT->add((void*)_ptr, size); }
pg_rs->resultset_size += size;
if (pg_rs) {
if (_ptr >= pg_rs->buffer && _ptr < pg_rs->buffer + RESULTSET_BUFLEN) {
// we are writing within the buffer, do not add to PSarrayOUT
} else {
// we are writing outside the buffer, add to PSarrayOUT
pg_rs->PSarrayOUT.add(_ptr, size);
}
}
return size;
}
PgSQL_ResultSet::PgSQL_ResultSet() {
buffer = NULL;
transfer_started = false;
resultset_completed = false;
buffer_used = 0;
resultset_size = 0;
sid = 0;
num_rows = 0;
ds = NULL;
}
PgSQL_ResultSet::~PgSQL_ResultSet() {
PtrSize_t pkt;
while (PSarrayOUT.len) {
PSarrayOUT.remove_index_fast(0, &pkt);
l_free(pkt.size, pkt.ptr);
}
if (buffer) {
free(buffer);
buffer = NULL;
}
}
void PgSQL_ResultSet::buffer_init(PgSQL_Protocol* _proto) {
if (buffer == NULL) {
buffer = (unsigned char*)malloc(RESULTSET_BUFLEN);
}
buffer_used = 0;
proto = _proto;
}
void PgSQL_ResultSet::init(PgSQL_Protocol* _proto, PGconn* _conn) {
PROXY_TRACE2();
transfer_started = false;
resultset_completed = false;
//resultset_size = 0;
//sid = 0;
num_rows = 0;
proto = _proto;
result = NULL;
pgsql_conn = _conn;
buffer_init(_proto);
if (proto) { // if proto = NULL , this is a mirror
ds = proto->get_myds();
sid = ds->pkt_sid + 1;
}
// immediately generate the first set of packets
// columns count
if (proto == NULL) {
return; // this is a mirror
}
/*
num_fields = PQnfields(result);
unsigned int total_size = 0;
for (int i = 0; i < num_fields; i++) {
total_size += strlen(PQfname(result, i)) + 18; // name, reloid, colnr, oid, typsize, typmod, fmt
}
PG_Fields fields(num_fields);
for (int i = 0; i < num_fields; i++) {
fields[i].name = PQfname(result, i);
fields[i].tbl_oid = PQftable(result, i);
fields[i].col_idx = PQftablecol(result, i);
fields[i].type_oid = PQftype(result, i);
fields[i].col_len = PQfsize(result, i);
fields[i].type_mod = PQfmod(result, i);
fields[i].fmt = PQfformat(result, i);
}
proto->generate_row_description(false, fields, resultset_size);
*/
}
unsigned int PgSQL_ResultSet::add_row_description(PGresult* result) {
const unsigned int bytes = proto->copy_row_description_to_PgSQL_ResultSet(false, this, result);
if (RESULTSET_BUFLEN <= (buffer_used + 9)) {
buffer_to_PSarrayOut();
}
return bytes;
}
unsigned int PgSQL_ResultSet::add_row(PGresult* result) {
const unsigned int bytes = proto->copy_row_to_PgSQL_ResultSet(false,this, result);
if (RESULTSET_BUFLEN <= (buffer_used + 9)) {
buffer_to_PSarrayOut();
}
return bytes;
}
void PgSQL_ResultSet::add_err(PgSQL_Data_Stream* _myds) {
PtrSize_t pkt;
if (proto) {
buffer_to_PSarrayOut();
const char* sqlstate = NULL;
const char* errmsg = NULL;
if (result) {
sqlstate = PQresultErrorField(result, PG_DIAG_SQLSTATE);
errmsg = PQresultErrorMessage(result);
}
if (_myds && _myds->killed_at) { // see case #750
if (_myds->kill_type == 0) {
proto->generate_error_packet(false, true, (char*)"Query execution was interrupted, query_timeout exceeded", sqlstate ? sqlstate : "57014", false, &pkt);
PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::proxysql, _myds->myconn->parent->myhgc->hid, _myds->myconn->parent->address, _myds->myconn->parent->port, 1907);
}
else {
proto->generate_error_packet(false, true, (char*)"Query execution was interrupted", sqlstate ? sqlstate : "57014", false, &pkt);
PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::proxysql, _myds->myconn->parent->myhgc->hid, _myds->myconn->parent->address, _myds->myconn->parent->port, 1317);
}
} else {
proto->generate_error_packet(false, true, errmsg ? errmsg : "Unknown error", sqlstate, false, &pkt);
// TODO: Check this is a mysql error
PgHGM->p_update_pgsql_error_counter(p_pgsql_error_type::proxysql, _myds->myconn->parent->myhgc->hid, _myds->myconn->parent->address, _myds->myconn->parent->port, 1907);
}
PSarrayOUT.add(pkt.ptr, pkt.size);
sid++;
resultset_size += pkt.size;
}
resultset_completed = true;
}
bool PgSQL_ResultSet::get_resultset(PtrSizeArray* PSarrayFinal) {
transfer_started = true;
if (proto) {
PSarrayFinal->copy_add(&PSarrayOUT, 0, PSarrayOUT.len);
while (PSarrayOUT.len)
PSarrayOUT.remove_index(PSarrayOUT.len - 1, NULL);
}
return resultset_completed;
}
void PgSQL_ResultSet::buffer_to_PSarrayOut(bool _last) {
if (buffer_used == 0)
return; // exit immediately if the buffer is empty
if (buffer_used < RESULTSET_BUFLEN / 2) {
if (_last == false) {
buffer = (unsigned char*)realloc(buffer, buffer_used);
}
}
PSarrayOUT.add(buffer, buffer_used);
if (_last) {
buffer = NULL;
}
else {
buffer = (unsigned char*)malloc(RESULTSET_BUFLEN);
}
buffer_used = 0;
}
unsigned long long PgSQL_ResultSet::current_size() {
unsigned long long intsize = 0;
intsize += sizeof(PgSQL_ResultSet);
intsize += RESULTSET_BUFLEN; // size of buffer
if (PSarrayOUT.len == 0) // see bug #699
return intsize;
intsize += sizeof(PtrSizeArray);
intsize += (PSarrayOUT.size * sizeof(PtrSize_t*));
unsigned int i;
for (i = 0; i < PSarrayOUT.len; i++) {
PtrSize_t* pkt = PSarrayOUT.index(i);
if (pkt->size > RESULTSET_BUFLEN) {
intsize += pkt->size;
}
else {
intsize += RESULTSET_BUFLEN;
}
}
return intsize;
}
unsigned int PgSQL_ResultSet::add_eof(PGresult* result) {
const unsigned int bytes = proto->copy_eof_to_PgSQL_ResultSet(false, this, result);
buffer_to_PSarrayOut();
resultset_completed = true;
return bytes;
}

@ -3039,7 +3039,7 @@ bool PgSQL_Session::handler_again___status_CONNECTING_SERVER(int* _rc) {
mybe->server_myds->wait_until = thread->curtime + mysql_thread___connect_timeout_server * 1000;
pause_until = 0;
}
if (mybe->server_myds->max_connect_time) {
if (mybe->server_myds->max_connect_time ) {
if (thread->curtime >= mybe->server_myds->max_connect_time) {
if (mirror) {
PROXY_TRACE();
@ -3057,7 +3057,7 @@ bool PgSQL_Session::handler_again___status_CONNECTING_SERVER(int* _rc) {
if (thread) {
thread->status_variables.stvar[st_var_max_connect_timeout_err]++;
}
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 9001, (char*)"HY000", errmsg.c_str(), true);
client_myds->myprot.generate_error_packet(true, true, errmsg.c_str(), "57P03", false); // not sure if this is the right error code
RequestEnd(mybe->server_myds);
string hg_status{};
@ -3201,7 +3201,7 @@ bool PgSQL_Session::handler_again___status_CONNECTING_SERVER(int* _rc) {
else {
char buf[256];
sprintf(buf, "Max connect failure while reaching hostgroup %d", current_hostgroup);
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, 9002, (char*)"HY000", buf, true);
client_myds->myprot.generate_error_packet(true,true,buf, "57P03", false); // not sure if this is the right error code
if (thread) {
thread->status_variables.stvar[st_var_max_connect_timeout_err]++;
}
@ -4050,8 +4050,194 @@ __get_pkts_from_client:
}
}
else {
proxy_error("Experimental feature");
assert(0);
char command = c = *((unsigned char*)pkt.ptr + 0);
switch (command) {
case 'Q':
{
__sync_add_and_fetch(&thread->status_variables.stvar[st_var_queries], 1);
if (session_type == PROXYSQL_SESSION_PGSQL) {
bool rc_break = false;
bool lock_hostgroup = false;
if (session_fast_forward == false) {
// Note: CurrentQuery sees the query as sent by the client.
// shortly after, the packets it used to contain the query will be deallocated
CurrentQuery.begin((unsigned char*)pkt.ptr, pkt.size, true);
}
rc_break = handler_special_queries(&pkt);
if (rc_break == true) {
if (mirror == false) {
// track also special queries
//RequestEnd(NULL);
// we moved this inside handler_special_queries()
// because a pointer was becoming invalid
break;
}
else {
handler_ret = -1;
return handler_ret;
}
}
timespec begint;
timespec endt;
if (thread->variables.stats_time_query_processor) {
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &begint);
}
qpo = GloQPro->process_mysql_query(TO_CLIENT_SESSION(this), pkt.ptr, pkt.size, TO_QUERY_INFO(&CurrentQuery));
if (thread->variables.stats_time_query_processor) {
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &endt);
thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] +
(endt.tv_sec * 1000000000 + endt.tv_nsec) -
(begint.tv_sec * 1000000000 + begint.tv_nsec);
}
assert(qpo); // GloQPro->process_mysql_query() should always return a qpo
// This block was moved from 'handler_special_queries' to support
// handling of 'USE' statements which are preceded by a comment.
// For more context check issue: #3493.
// ===================================================
if (session_type != PROXYSQL_SESSION_CLICKHOUSE) {
const char* qd = CurrentQuery.get_digest_text();
bool use_db_query = false;
if (qd != NULL) {
if (
(strncasecmp((char*)"USE", qd, 3) == 0)
&&
(
(strncasecmp((char*)"USE ", qd, 4) == 0)
||
(strncasecmp((char*)"USE`", qd, 4) == 0)
)
) {
use_db_query = true;
}
}
else {
if (pkt.size > (5 + 4) && strncasecmp((char*)"USE ", (char*)pkt.ptr + 5, 4) == 0) {
use_db_query = true;
}
}
if (use_db_query) {
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_USE_DB(&pkt);
if (mirror == false) {
break;
}
else {
handler_ret = -1;
return handler_ret;
}
}
}
// ===================================================
if (qpo->max_lag_ms >= 0) {
thread->status_variables.stvar[st_var_queries_with_max_lag_ms]++;
}
rc_break = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt, &lock_hostgroup);
if (mirror == false && rc_break == false) {
if (mysql_thread___automatic_detect_sqli) {
if (handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_detect_SQLi()) {
handler_ret = -1;
return handler_ret;
}
}
}
if (rc_break == true) {
if (mirror == false) {
break;
}
else {
handler_ret = -1;
return handler_ret;
}
}
if (mirror == false) {
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___create_mirror_session();
}
if (autocommit_on_hostgroup >= 0) {
}
if (mysql_thread___set_query_lock_on_hostgroup == 1) { // algorithm introduced in 2.0.6
if (locked_on_hostgroup < 0) {
if (lock_hostgroup) {
// we are locking on hostgroup now
if (qpo->destination_hostgroup >= 0) {
if (transaction_persistent_hostgroup == -1) {
current_hostgroup = qpo->destination_hostgroup;
}
}
locked_on_hostgroup = current_hostgroup;
thread->status_variables.stvar[st_var_hostgroup_locked]++;
thread->status_variables.stvar[st_var_hostgroup_locked_set_cmds]++;
}
}
if (locked_on_hostgroup >= 0) {
if (current_hostgroup != locked_on_hostgroup) {
client_myds->DSS = STATE_QUERY_SENT_NET;
int l = CurrentQuery.QueryLength;
char* end = (char*)"";
if (l > 256) {
l = 253;
end = (char*)"...";
}
string nqn = string((char*)CurrentQuery.QueryPointer, l);
char* err_msg = (char*)"Session trying to reach HG %d while locked on HG %d . Rejecting query: %s";
char* buf = (char*)malloc(strlen(err_msg) + strlen(nqn.c_str()) + strlen(end) + 64);
sprintf(buf, err_msg, current_hostgroup, locked_on_hostgroup, nqn.c_str(), end);
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, client_myds->pkt_sid + 1, 9005, (char*)"HY000", buf, true);
thread->status_variables.stvar[st_var_hostgroup_locked_queries]++;
RequestEnd(NULL);
free(buf);
l_free(pkt.size, pkt.ptr);
break;
}
}
}
mybe = find_or_create_backend(current_hostgroup);
status = PROCESSING_QUERY;
// set query retries
mybe->server_myds->query_retries_on_failure = mysql_thread___query_retries_on_failure;
// if a number of retries is set in mysql_query_rules, that takes priority
if (qpo) {
if (qpo->retries >= 0) {
mybe->server_myds->query_retries_on_failure = qpo->retries;
}
}
mybe->server_myds->connect_retries_on_failure = mysql_thread___connect_retries_on_failure;
mybe->server_myds->wait_until = 0;
pause_until = 0;
if (mysql_thread___default_query_delay) {
pause_until = thread->curtime + mysql_thread___default_query_delay * 1000;
}
if (qpo) {
if (qpo->delay > 0) {
if (pause_until == 0)
pause_until = thread->curtime;
pause_until += qpo->delay * 1000;
}
}
proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Received query to be processed with MariaDB Client library\n");
mybe->server_myds->killed_at = 0;
mybe->server_myds->kill_type = 0;
if (GloMyLdapAuth) {
if (session_type == PROXYSQL_SESSION_PGSQL) {
if (mysql_thread___add_ldap_user_comment && strlen(mysql_thread___add_ldap_user_comment)) {
add_ldap_comment_to_pkt(&pkt);
}
}
}
mybe->server_myds->mysql_real_query.init(&pkt);
mybe->server_myds->statuses.questions++;
client_myds->setDSS_STATE_QUERY_SENT_NET();
}
}
break;
default:
// not implemented yet
assert(0);
}
}
break;
}
@ -4808,10 +4994,10 @@ void PgSQL_Session::handler_minus1_GenerateErrorMessage(PgSQL_Data_Stream* myds,
switch (status) {
case PROCESSING_QUERY:
if (myconn) {
MySQL_Result_to_MySQL_wire(myconn->pgsql, myconn->MyRS, myconn->warning_count, myds);
PgSQL_Result_to_PgSQL_wire(myconn->pgsql_conn, myconn->MyRS, myconn->warning_count, myds);
}
else {
MySQL_Result_to_MySQL_wire(NULL, NULL, 0, myds);
PgSQL_Result_to_PgSQL_wire(NULL, NULL, 0, myds);
}
break;
case PROCESSING_STMT_PREPARE:
@ -5154,23 +5340,23 @@ handler_again:
goto handler_again;
}
}
if (handler_again___verify_backend_autocommit()) {
goto handler_again;
}
//if (handler_again___verify_backend_autocommit()) {
// goto handler_again;
//}
if (locked_on_hostgroup == -1 || locked_on_hostgroup_and_all_variables_set == false) {
if (handler_again___verify_backend_multi_statement()) {
goto handler_again;
}
if (handler_again___verify_backend_session_track_gtids()) {
goto handler_again;
}
//if (handler_again___verify_backend_session_track_gtids()) {
// goto handler_again;
//}
// Optimize network traffic when we can use 'SET NAMES'
if (verify_set_names(this)) {
goto handler_again;
}
//if (verify_set_names(this)) {
// goto handler_again;
//}
for (auto i = 0; i < SQL_NAME_LAST_LOW_WM; i++) {
auto client_hash = client_myds->myconn->var_hash[i];
@ -5265,14 +5451,14 @@ handler_again:
if (rc == 0) {
if (active_transactions != 0) { // run this only if currently we think there is a transaction
if ((myconn->pgsql->server_status & SERVER_STATUS_IN_TRANS) == 0) { // there is no transaction on the backend connection
if (myconn->pgsql && (myconn->pgsql->server_status & SERVER_STATUS_IN_TRANS) == 0) { // there is no transaction on the backend connection
active_transactions = NumActiveTransactions(); // we check all the hostgroups/backends
if (active_transactions == 0)
transaction_started_at = 0; // reset it
}
}
handler_rc0_Process_GTID(myconn);
//handler_rc0_Process_GTID(myconn);
// if we are locked on hostgroup, the value of autocommit is copied from the backend connection
// see bug #3549
@ -5282,7 +5468,7 @@ handler_again:
autocommit = myconn->pgsql->server_status & SERVER_STATUS_AUTOCOMMIT;
}
if (mirror == false) {
if (mirror == false && myconn->pgsql) {
// Support for LAST_INSERT_ID()
if (myconn->pgsql->insert_id) {
last_insert_id = myconn->pgsql->insert_id;
@ -5300,7 +5486,7 @@ handler_again:
switch (status) {
case PROCESSING_QUERY:
MySQL_Result_to_MySQL_wire(myconn->pgsql, myconn->MyRS, myconn->warning_count, myconn->myds);
PgSQL_Result_to_PgSQL_wire(myconn->pgsql_conn, myconn->MyRS, myconn->warning_count, myconn->myds);
break;
case PROCESSING_STMT_PREPARE:
{
@ -5394,7 +5580,7 @@ handler_again:
break;
// rc==2 : a multi-resultset (or multi statement) was detected, and the current statement is completed
case 2:
MySQL_Result_to_MySQL_wire(myconn->pgsql, myconn->MyRS, myconn->warning_count, myconn->myds);
PgSQL_Result_to_PgSQL_wire(myconn->pgsql_conn, myconn->MyRS, myconn->warning_count, myconn->myds);
if (myconn->MyRS) { // we also need to clear MyRS, so that the next staement will recreate it if needed
if (myconn->MyRS_reuse) {
delete myconn->MyRS_reuse;
@ -5984,7 +6170,7 @@ void PgSQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(
}
#endif // DEBUG
sprintf(_s, "ProxySQL Error: Access denied for user '%s'@'%s' (using password: %s)", client_myds->myconn->userinfo->username, client_addr, (client_myds->myconn->userinfo->password ? "YES" : "NO"));
client_myds->myprot.generate_error_packet(false, _s, "28P01", true);
client_myds->myprot.generate_error_packet(true,false, _s, "28P01", true);
proxy_error("%s\n", _s);
free(_s);
__sync_fetch_and_add(&PgHGM->status.access_denied_wrong_password, 1);
@ -7653,7 +7839,7 @@ void PgSQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED
}
void PgSQL_Session::MySQL_Stmt_Result_to_MySQL_wire(MYSQL_STMT* stmt, PgSQL_Connection* myconn) {
MySQL_ResultSet* MyRS = NULL;
PgSQL_ResultSet* MyRS = NULL;
if (myconn) {
if (myconn->MyRS) {
MyRS = myconn->MyRS;
@ -7702,21 +7888,21 @@ void PgSQL_Session::MySQL_Stmt_Result_to_MySQL_wire(MYSQL_STMT* stmt, PgSQL_Conn
}
}
void PgSQL_Session::MySQL_Result_to_MySQL_wire(MYSQL* pgsql, MySQL_ResultSet* MyRS, unsigned int warning_count, PgSQL_Data_Stream* _myds) {
void PgSQL_Session::PgSQL_Result_to_PgSQL_wire(PGconn* pgsql, PgSQL_ResultSet* MyRS, unsigned int warning_count, PgSQL_Data_Stream* _myds) {
if (pgsql == NULL) {
// error
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, client_myds->pkt_sid + 1, 2013, (char*)"HY000", (char*)"Lost connection to MySQL server during query");
return;
}
if (MyRS) {
assert(MyRS->result);
//assert(MyRS->result);
bool transfer_started = MyRS->transfer_started;
bool resultset_completed = MyRS->get_resultset(client_myds->PSarrayOUT);
CurrentQuery.rows_sent = MyRS->num_rows;
bool com_field_list = client_myds->com_field_list;
assert(resultset_completed); // the resultset should always be completed if MySQL_Result_to_MySQL_wire is called
if (transfer_started == false) { // we have all the resultset when MySQL_Result_to_MySQL_wire was called
if (qpo && qpo->cache_ttl > 0 && com_field_list == false) { // the resultset should be cached
assert(resultset_completed); // the resultset should always be completed if PgSQL_Result_to_PgSQL_wire is called
if (transfer_started == false) { // we have all the resultset when PgSQL_Result_to_PgSQL_wire was called
/*if (qpo && qpo->cache_ttl > 0 && com_field_list == false) { // the resultset should be cached
if (mysql_errno(pgsql) == 0 &&
(mysql_warning_count(pgsql) == 0 ||
mysql_thread___query_cache_handle_warnings == 1)) { // no errors
@ -7748,11 +7934,10 @@ void PgSQL_Session::MySQL_Result_to_MySQL_wire(MYSQL* pgsql, MySQL_ResultSet* My
client_myds->resultset_length = 0;
}
}
}
}*/
}
}
else { // no result set
int myerrno = mysql_errno(pgsql);
} else { // no result set
/*int myerrno = mysql_errno(pgsql);
if (myerrno == 0) {
unsigned int num_rows = mysql_affected_rows(pgsql);
uint16_t setStatus = (active_transactions ? SERVER_STATUS_IN_TRANS : 0);
@ -7780,7 +7965,7 @@ void PgSQL_Session::MySQL_Result_to_MySQL_wire(MYSQL* pgsql, MySQL_ResultSet* My
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, client_myds->pkt_sid + 1, mysql_errno(pgsql), sqlstate, mysql_error(pgsql));
}
//client_myds->pkt_sid++;
}
}*/
}
}

@ -5395,7 +5395,7 @@ void PgSQL_Thread::push_MyConn_local(PgSQL_Connection * c) {
PgSQL_SrvC* mysrvc = NULL;
mysrvc = (PgSQL_SrvC*)c->parent;
// reset insert_id #1093
c->pgsql->insert_id = 0;
//c->pgsql->insert_id = 0;
if (mysrvc->status == MYSQL_SERVER_STATUS_ONLINE) {
if (c->async_state_machine == ASYNC_IDLE) {
cached_connections->add(c);

@ -160,6 +160,9 @@ static unordered_map<string, const vector<string>&> module_tablenames = {
{ "scheduler", scheduler_tablenames },
{ "proxysql_servers", proxysql_servers_tablenames },
{ "restapi", restapi_tablenames },
{ "pgsql_servers", pgsql_servers_tablenames },
{ "pgsql_firewall", pgsql_firewall_tablenames },
{ "pgsql_query_rules", pgsql_query_rules_tablenames },
};
static void BQE1(SQLite3DB *db, const vector<string>& tbs, const string& p1, const string& p2, const string& p3) {
@ -13596,7 +13599,7 @@ void ProxySQL_Admin::send_error_msg_to_client(Client_Session<T>& sess, const cha
PgSQL_Data_Stream* myds = sess->client_myds;
char* new_msg = (char*)malloc(strlen(msg) + sizeof(prefix_msg));
sprintf(new_msg, "%s%s", prefix_msg, msg);
myds->myprot.generate_error_packet(false, new_msg, NULL, false);
myds->myprot.generate_error_packet(true,true, new_msg, NULL, false);
free(new_msg);
myds->DSS = STATE_SLEEP;
}

@ -89,9 +89,9 @@ EV_PATH := $(DEPS_PATH)/libev/libev/
EV_IDIR := $(EV_PATH)
EV_LDIR := $(EV_PATH)/.libs
POSTGRESQL_PATH := $(DEPS_PATH)/postgresql/postgresql/
POSTGRESQL_IDIR := $(POSTGRESQL_PATH)/src/include -I$(POSTGRESQL_PATH)/src/interfaces/libpq
POSTGRESQL_LDIR := $(POSTGRESQL_PATH)/src/interfaces/libpq
POSTGRESQL_PATH := $(DEPS_PATH)/postgresql/postgresql/src
POSTGRESQL_IDIR := $(POSTGRESQL_PATH)/include -I$(POSTGRESQL_PATH)/interfaces/libpq
POSTGRESQL_LDIR := $(POSTGRESQL_PATH)/interfaces/libpq -L$(POSTGRESQL_PATH)/common -L$(POSTGRESQL_PATH)/port
LIBUSUAL_PATH=$(DEPS_PATH)/libusual/libusual
LIBUSUAL_IDIR=$(LIBUSUAL_PATH)
@ -167,7 +167,7 @@ endif
MYCXXFLAGS += $(IDIRS) $(OPTZ) $(DEBUG) $(PSQLCH) -DGITVERSION=\"$(GIT_VERSION)\" $(WGCOV) $(WASAN)
STATICMYLIBS := -Wl,-Bstatic -lconfig -lproxysql -ldaemon -lconfig++ -lre2 -lpcrecpp -lpcre -lmariadbclient -lpq -lhttpserver -lmicrohttpd -linjection -lcurl -lssl -lcrypto -lev -lscram -lusual
STATICMYLIBS := -Wl,-Bstatic -lconfig -lproxysql -ldaemon -lconfig++ -lre2 -lpcrecpp -lpcre -lmariadbclient -lhttpserver -lmicrohttpd -linjection -lcurl -lssl -lcrypto -lev -lscram -lusual -lpq -lpgcommon -lpgport
ifneq ($(NOJEMALLOC),1)
STATICMYLIBS += -ljemalloc
endif

Loading…
Cancel
Save