Refactor related to Data Stream and Session: 1st

Also fixed minor test bugs
v2.x_code_refactor_2206
René Cannaò 4 years ago
parent 9a6320309f
commit 1bc48ad073

@ -146,7 +146,7 @@ build_tap_test: build_src
cd test/tap && OPTZ="${O0} -ggdb -DDEBUG" CC=${CC} CXX=${CXX} ${MAKE}
.PHONY: build_tap_test_debug
build_tap_test_debug: build_src
build_tap_test_debug: build_src_debug
cd test/tap && OPTZ="${O0} -ggdb -DDEBUG" CC=${CC} CXX=${CXX} ${MAKE} debug
.PHONY: build_src_debug

@ -74,14 +74,15 @@ class Query_Info {
class Client_Session
{
friend class MySQL_Session;
friend class Admin_MySQL_Session;
private:
//int handler_ret;
void handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(PtrSize_t *, bool *);
// void handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(PtrSize_t *, bool *);
void handler___status_CHANGING_USER_CLIENT___STATE_CLIENT_HANDSHAKE(PtrSize_t *, bool *);
// void handler___status_CHANGING_USER_CLIENT___STATE_CLIENT_HANDSHAKE(PtrSize_t *, bool *);
// void handler_WCDSS_MYSQL_COM_FIELD_LIST(PtrSize_t *);
void handler_WCDSS_MYSQL_COM_INIT_DB(PtrSize_t *);
// void handler_WCDSS_MYSQL_COM_INIT_DB(PtrSize_t *);
/**
* @brief Handles 'COM_QUERIES' holding 'USE DB' statements.
*
@ -93,7 +94,7 @@ class Client_Session
* But since it was change for handling 'USE' statements which are preceded by
* comments, it's called after 'QueryProcessor' has processed the query.
*/
void handler_WCDSS_MYSQL_COM_QUERY_USE_DB(PtrSize_t *pkt);
// void handler_WCDSS_MYSQL_COM_QUERY_USE_DB(PtrSize_t *pkt);
// void handler_WCDSS_MYSQL_COM_PING(PtrSize_t *);
// void handler_WCDSS_MYSQL_COM_CHANGE_USER(PtrSize_t *, bool *);
@ -113,47 +114,51 @@ class Client_Session
*/
// void handler_WCDSS_MYSQL_COM_RESET_CONNECTION(PtrSize_t *pkt);
// void handler_WCDSS_MYSQL_COM_SET_OPTION(PtrSize_t *);
void handler_WCDSS_MYSQL_COM_STATISTICS(PtrSize_t *);
// void handler_WCDSS_MYSQL_COM_STATISTICS(PtrSize_t *);
// void handler_WCDSS_MYSQL_COM_PROCESS_KILL(PtrSize_t *);
bool handler_WCDSS_MYSQL_COM_QUERY_qpo(PtrSize_t *, bool *lock_hostgroup, bool ps=false);
// bool handler_WCDSS_MYSQL_COM_QUERY_qpo(PtrSize_t *, bool *lock_hostgroup, bool ps=false);
void handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED__get_mysql_connection();
void return_proxysql_internal(PtrSize_t *);
/*
bool handler_special_queries(PtrSize_t *);
bool handler_CommitRollback(PtrSize_t *);
bool handler_SetAutocommit(PtrSize_t *);
*/
/**
* @brief Performs the cleanup of current session state, and the required operations to the supplied
* 'ProxySQL_Data_Stream' required for processing further queries.
* @param The 'ProxySQL_Data_Stream' which executed the previous query and which status should be updated.
*/
void RequestEnd_mysql(ProxySQL_Data_Stream *);
// void RequestEnd_mysql(ProxySQL_Data_Stream *);
void LogQuery(ProxySQL_Data_Stream *);
/*
void handler_WCDSS_MYSQL_COM_QUERY___create_mirror_session();
int handler_again___status_PINGING_SERVER();
int handler_again___status_RESETTING_CONNECTION();
*/
void handler_again___new_thread_to_kill_connection();
bool handler_again___verify_init_connect();
bool handler_again___verify_ldap_user_variable();
bool handler_again___verify_backend_autocommit();
bool handler_again___verify_backend_session_track_gtids();
bool handler_again___verify_backend_multi_statement();
bool handler_again___verify_backend_user_schema();
bool handler_again___status_SETTING_INIT_CONNECT(int *);
bool handler_again___status_SETTING_LDAP_USER_VARIABLE(int *);
// bool handler_again___verify_init_connect();
// bool handler_again___verify_ldap_user_variable();
// bool handler_again___verify_backend_autocommit();
// bool handler_again___verify_backend_session_track_gtids();
// bool handler_again___verify_backend_multi_statement();
// bool handler_again___verify_backend_user_schema();
// bool handler_again___status_SETTING_INIT_CONNECT(int *);
// bool handler_again___status_SETTING_LDAP_USER_VARIABLE(int *);
bool handler_again___status_SETTING_SQL_MODE(int *);
bool handler_again___status_SETTING_SESSION_TRACK_GTIDS(int *);
bool handler_again___status_CHANGING_CHARSET(int *_rc);
bool handler_again___status_CHANGING_SCHEMA(int *);
// bool handler_again___status_SETTING_SESSION_TRACK_GTIDS(int *);
// bool handler_again___status_CHANGING_CHARSET(int *_rc);
// bool handler_again___status_CHANGING_SCHEMA(int *);
bool handler_again___status_CONNECTING_SERVER(int *);
bool handler_again___status_CHANGING_USER_SERVER(int *);
bool handler_again___status_CHANGING_AUTOCOMMIT(int *);
bool handler_again___status_SETTING_MULTI_STMT(int *_rc);
bool handler_again___multiple_statuses(int *rc);
void add_ldap_comment_to_pkt(PtrSize_t *);
// bool handler_again___status_CHANGING_AUTOCOMMIT(int *);
// bool handler_again___status_SETTING_MULTI_STMT(int *_rc);
// bool handler_again___multiple_statuses(int *rc);
// void add_ldap_comment_to_pkt(PtrSize_t *);
int get_pkts_from_client(bool&, PtrSize_t&);
/*
@ -170,29 +175,29 @@ class Client_Session
/*
bool handler_rc0_PROCESSING_STMT_PREPARE(enum session_status& st, ProxySQL_Data_Stream *myds, bool& prepared_stmt_with_no_params);
void handler_rc0_PROCESSING_STMT_EXECUTE(ProxySQL_Data_Stream *myds);
*/
bool handler_minus1_ClientLibraryError(ProxySQL_Data_Stream *myds, int myerr, char **errmsg);
void handler_minus1_LogErrorDuringQuery(MySQL_Connection *myconn, int myerr, char *errmsg);
bool handler_minus1_HandleErrorCodes(ProxySQL_Data_Stream *myds, int myerr, char **errmsg, int& handler_ret);
void handler_minus1_GenerateErrorMessage(ProxySQL_Data_Stream *myds, MySQL_Connection *myconn, bool& wrong_pass);
void handler_minus1_HandleBackendConnection(ProxySQL_Data_Stream *myds, MySQL_Connection *myconn);
*/
int RunQuery_mysql(ProxySQL_Data_Stream *myds, MySQL_Connection *myconn);
void handler___status_WAITING_CLIENT_DATA();
void handler_rc0_Process_GTID(MySQL_Connection *myconn);
void handler_WCDSS_MYSQL_COM_INIT_DB_replace_CLICKHOUSE(PtrSize_t& pkt);
void handler_WCDSS_MYSQL_COM_QUERY___not_mysql(PtrSize_t& pkt);
bool handler_WCDSS_MYSQL_COM_QUERY_detect_SQLi();
bool handler_WCDSS_MULTI_PACKET(PtrSize_t& pkt);
bool handler_WCDSS_MYSQL_COM__various(PtrSize_t* pkt, bool* wrong_pass);
void handler___status_WAITING_CLIENT_DATA___default();
// bool handler_WCDSS_MYSQL_COM_QUERY_detect_SQLi();
// bool handler_WCDSS_MULTI_PACKET(PtrSize_t& pkt);
// bool handler_WCDSS_MYSQL_COM__various(PtrSize_t* pkt, bool* wrong_pass);
// void handler___status_WAITING_CLIENT_DATA___default();
void handler___status_NONE_or_default(PtrSize_t& pkt);
void handler_WCDSS_MYSQL_COM_QUERY_inner1(PtrSize_t&);
void handler_WCD_SS_MCQ_qpo_QueryRewrite(PtrSize_t *pkt);
void handler_WCD_SS_MCQ_qpo_OK_msg(PtrSize_t *pkt);
void handler_WCD_SS_MCQ_qpo_error_msg(PtrSize_t *pkt);
void handler_WCD_SS_MCQ_qpo_LargePacket(PtrSize_t *pkt);
// void handler_WCD_SS_MCQ_qpo_QueryRewrite(PtrSize_t *pkt);
// void handler_WCD_SS_MCQ_qpo_OK_msg(PtrSize_t *pkt);
// void handler_WCD_SS_MCQ_qpo_error_msg(PtrSize_t *pkt);
// void handler_WCD_SS_MCQ_qpo_LargePacket(PtrSize_t *pkt);
// int handler_WCD_SS_MCQ_qpo_Parse_SQL_LOG_BIN(PtrSize_t *pkt, bool *lock_hostgroup, unsigned int nTrx, string& nq);
protected:
@ -200,8 +205,8 @@ class Client_Session
void reset();
public:
bool handler_again___status_SETTING_GENERIC_VARIABLE(int *_rc, const char *var_name, const char *var_value, bool no_quote=false, bool set_transaction=false);
bool handler_again___status_SETTING_SQL_LOG_BIN(int *);
// bool handler_again___status_SETTING_GENERIC_VARIABLE(int *_rc, const char *var_name, const char *var_value, bool no_quote=false, bool set_transaction=false);
// bool handler_again___status_SETTING_SQL_LOG_BIN(int *);
std::stack<enum session_status> previous_status;
void * operator new(size_t);
void operator delete(void *);
@ -222,7 +227,7 @@ class Client_Session
StatCounters *command_counters;
MySQL_Backend *mybe;
PtrArray *mybes;
ProxySQL_Data_Stream *client_myds;
// ProxySQL_Data_Stream *client_myds;
char * default_schema;
char * user_attributes;
@ -314,8 +319,8 @@ class Client_Session
MySQL_Backend * find_or_create_mysql_backend(int, ProxySQL_Data_Stream *_myds=NULL);
void SQLite3_to_MySQL(SQLite3_result *, char *, int , MySQL_Protocol *, bool in_transaction=false, bool deprecate_eof_active=false);
void MySQL_Result_to_MySQL_wire(MYSQL *mysql, MySQL_ResultSet *MyRS, ProxySQL_Data_Stream *_myds=NULL);
void MySQL_Stmt_Result_to_MySQL_wire(MYSQL_STMT *stmt, MySQL_Connection *myconn);
// void MySQL_Result_to_MySQL_wire(MYSQL *mysql, MySQL_ResultSet *MyRS, ProxySQL_Data_Stream *_myds=NULL);
// void MySQL_Stmt_Result_to_MySQL_wire(MYSQL_STMT *stmt, MySQL_Connection *myconn);
unsigned int NumActiveTransactions();
bool HasOfflineBackends();
bool SetEventInOfflineBackends();
@ -325,7 +330,7 @@ class Client_Session
void reset_all_mysql_backends();
void writeout();
void Memory_Stats();
void create_new_session_and_reset_mysql_connection(ProxySQL_Data_Stream *_myds);
// void create_new_session_and_reset_mysql_connection(ProxySQL_Data_Stream *_myds);
bool handle_command_query_kill(PtrSize_t *);
/**
* @brief Performs the final operations after current query has finished to be executed. It updates the session

@ -0,0 +1,209 @@
#ifndef __CLASS_MYSQL_DATA_STREAM_H
#define __CLASS_MYSQL_DATA_STREAM_H
#include "proxysql.h"
#include "cpp.h"
#include "MySQL_Protocol.h"
/*
#define QUEUE_T_DEFAULT_SIZE 32768
#define MY_SSL_BUFFER 8192
*/
// this class avoid copying data
class MyDS_real_query {
public:
PtrSize_t pkt; // packet coming from the client
char *QueryPtr; // pointer to beginning of the query
unsigned int QuerySize; // size of the query
void init(PtrSize_t *_pkt) {
pkt.ptr=_pkt->ptr;
pkt.size=_pkt->size;
QueryPtr=(char *)pkt.ptr+5;
QuerySize=pkt.size-5;
}
void end() {
l_free(pkt.size,pkt.ptr);
pkt.size=0;
QuerySize=0;
pkt.ptr=NULL;
QueryPtr=NULL;
}
};
/*
enum sslstatus { SSLSTATUS_OK, SSLSTATUS_WANT_IO, SSLSTATUS_FAIL};
*/
class MySQL_Data_Stream: public ProxySQL_Data_Stream
{
private:
int array2buffer();
int buffer2array();
void generate_compressed_packet();
enum sslstatus do_ssl_handshake();
void queue_encrypted_bytes(const char *buf, size_t len);
public:
/*
queue_t queueIN;
uint64_t pkts_recv; // counter of received packets
queue_t queueOUT;
uint64_t pkts_sent; // counter of sent packets
*/
struct {
PtrSize_t pkt;
unsigned int partial;
} CompPktIN;
struct {
PtrSize_t pkt;
unsigned int partial;
} CompPktOUT;
MySQL_Protocol myprot;
MyDS_real_query mysql_real_query;
/*
bytes_stats_t bytes_info; // bytes statistics
PtrSize_t multi_pkt;
unsigned long long pause_until;
unsigned long long wait_until;
unsigned long long killed_at;
unsigned long long max_connect_time;
struct {
unsigned long long questions;
unsigned long long myconnpoll_get;
unsigned long long myconnpoll_put;
} statuses;
PtrSizeArray *PSarrayIN;
PtrSizeArray *PSarrayOUT;
//PtrSizeArray *PSarrayOUTpending;
PtrSizeArray *resultset;
unsigned int resultset_length;
ProxySQL_Poll *mypolls;
//int listener;
*/
MySQL_Connection *myconn;
/*
Client_Session *sess; // pointer to the session using this data stream
*/
MySQL_Backend *mybe; // if this is a connection to a mysql server, this points to a backend structure
/*
char *x509_subject_alt_name;
SSL *ssl;
BIO *rbio_ssl;
BIO *wbio_ssl;
char *ssl_write_buf;
size_t ssl_write_len;
struct sockaddr *client_addr;
struct {
char *addr;
int port;
} addr;
struct {
char *addr;
int port;
} proxy_addr;
unsigned int connect_tries;
int query_retries_on_failure;
int connect_retries_on_failure;
enum data_stream_status DSS;
enum MySQL_DS_type myds_type;
socklen_t client_addrlen;
int fd; // file descriptor
int poll_fds_idx;
int active_transaction; // 1 if there is an active transaction
int active; // data stream is active. If not, shutdown+close needs to be called
int status; // status . FIXME: make it a ORable variable
int switching_auth_stage;
int switching_auth_type;
unsigned int tmp_charset;
short revents;
char kill_type;
bool encrypted;
bool net_failure;
uint8_t pkt_sid;
*/
bool com_field_list;
char *com_field_wild;
MySQL_Data_Stream();
~MySQL_Data_Stream();
int array2buffer_full();
void init(); // initialize the data stream
void init(enum MySQL_DS_type, Client_Session *, int); // initialize with arguments
/*
void shut_soft();
void shut_hard();
*/
int read_from_net();
int write_to_net();
int write_to_net_poll();
/*
bool available_data_out();
void remove_pollout();
*/
void set_pollout();
void mysql_free();
/*
void set_net_failure();
void setDSS_STATE_QUERY_SENT_NET();
void setDSS(enum data_stream_status dss) {
DSS=dss;
}
*/
int read_pkts();
int write_pkts();
void unplug_backend();
void check_data_flow();
int assign_fd_from_mysql_conn();
unsigned char * resultset2buffer(bool);
void buffer2resultset(unsigned char *, unsigned int);
// safe way to attach a MySQL Connection
void attach_connection(MySQL_Connection *mc) {
statuses.myconnpoll_get++;
myconn=mc;
myconn->statuses.myconnpoll_get++;
mc->myds=this;
}
// safe way to detach a MySQL Connection
void detach_connection() {
assert(myconn);
myconn->statuses.myconnpoll_put++;
statuses.myconnpoll_put++;
myconn->myds=NULL;
myconn=NULL;
}
void return_MySQL_Connection_To_Pool();
void destroy_MySQL_Connection_From_Pool(bool sq);
void free_mysql_real_query();
/*
void reinit_queues();
void destroy_queues();
bool data_in_rbio();
*/
};
#endif /* __CLASS_MYSQL_DATA_STREAM_H */

@ -24,7 +24,7 @@ class MySQL_ResultSet {
bool resultset_completed;
//bool reset_pid;
uint8_t sid;
ProxySQL_Data_Stream *myds;
MySQL_Data_Stream *myds;
MySQL_Protocol *myprot;
MYSQL *mysql;
MYSQL_RES *result;
@ -53,7 +53,7 @@ class MySQL_ResultSet {
unsigned int add_row2(MYSQL_ROWS *row, unsigned char *offset);
void add_eof();
void remove_last_eof();
void add_err(ProxySQL_Data_Stream *_myds);
void add_err(MySQL_Data_Stream *myds);
bool get_resultset(PtrSizeArray *PSarrayFinal);
//bool generate_COM_FIELD_LIST_response(PtrSizeArray *PSarrayFinal);
unsigned char *buffer;
@ -96,17 +96,17 @@ class MySQL_Protocol {
MySQL_Connection_userinfo *userinfo;
Client_Session *sess;
public:
ProxySQL_Data_Stream **myds;
MySQL_Data_Stream **myds;
#ifdef DEBUG
bool dump_pkt;
#endif
MySQL_Prepared_Stmt_info *current_PreStmt;
uint16_t prot_status;
ProxySQL_Data_Stream *get_myds() { return *myds; }
MySQL_Data_Stream *get_myds() { return *myds; }
MySQL_Protocol() {
prot_status=0;
}
void init(ProxySQL_Data_Stream **, MySQL_Connection_userinfo *, Client_Session *);
void init(MySQL_Data_Stream **, MySQL_Connection_userinfo *, Client_Session *);
// members get as arguments:
// - a data stream (optionally NULL for some)

@ -76,19 +76,16 @@ class Query_Info {
class MySQL_Session: public Client_Session
{
friend class Client_Session;
friend class Admin_MySQL_Session;
friend class Query_Info;
friend class MySQL_Protocol;
private:
/*
//int handler_ret;
void handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(PtrSize_t *, bool *);
void handler___status_CHANGING_USER_CLIENT___STATE_CLIENT_HANDSHAKE(PtrSize_t *, bool *);
*/
void handler_WCDSS_MYSQL_COM_FIELD_LIST(PtrSize_t *);
/*
void handler_WCDSS_MYSQL_COM_INIT_DB(PtrSize_t *);
*/
/**
* @brief Handles 'COM_QUERIES' holding 'USE DB' statements.
*
@ -100,9 +97,7 @@ class MySQL_Session: public Client_Session
* But since it was change for handling 'USE' statements which are preceded by
* comments, it's called after 'QueryProcessor' has processed the query.
*/
/*
void handler_WCDSS_MYSQL_COM_QUERY_USE_DB(PtrSize_t *pkt);
*/
void handler_WCDSS_MYSQL_COM_PING(PtrSize_t *);
void handler_WCDSS_MYSQL_COM_CHANGE_USER(PtrSize_t *, bool *);
/**
@ -121,33 +116,32 @@ class MySQL_Session: public Client_Session
*/
void handler_WCDSS_MYSQL_COM_RESET_CONNECTION(PtrSize_t *pkt);
void handler_WCDSS_MYSQL_COM_SET_OPTION(PtrSize_t *);
/*
void handler_WCDSS_MYSQL_COM_STATISTICS(PtrSize_t *);
*/
void handler_WCDSS_MYSQL_COM_PROCESS_KILL(PtrSize_t *);
/*
bool handler_WCDSS_MYSQL_COM_QUERY_qpo(PtrSize_t *, bool *lock_hostgroup, bool ps=false);
/*
void handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED__get_mysql_connection();
void return_proxysql_internal(PtrSize_t *);
*/
bool handler_special_queries(PtrSize_t *);
bool handler_CommitRollback(PtrSize_t *);
bool handler_SetAutocommit(PtrSize_t *);
*/
/**
* @brief Performs the cleanup of current session state, and the required operations to the supplied
* 'ProxySQL_Data_Stream' required for processing further queries.
* @param The 'ProxySQL_Data_Stream' which executed the previous query and which status should be updated.
*/
/*
void RequestEnd_mysql(ProxySQL_Data_Stream *);
/*
void LogQuery(ProxySQL_Data_Stream *);
*/
void handler_WCDSS_MYSQL_COM_QUERY___create_mirror_session();
int handler_again___status_PINGING_SERVER();
int handler_again___status_RESETTING_CONNECTION();
/*
void handler_again___new_thread_to_kill_connection();
*/
bool handler_again___verify_init_connect();
bool handler_again___verify_ldap_user_variable();
@ -157,21 +151,23 @@ class MySQL_Session: public Client_Session
bool handler_again___verify_backend_user_schema();
bool handler_again___status_SETTING_INIT_CONNECT(int *);
bool handler_again___status_SETTING_LDAP_USER_VARIABLE(int *);
/*
bool handler_again___status_SETTING_SQL_MODE(int *);
*/
bool handler_again___status_SETTING_SESSION_TRACK_GTIDS(int *);
bool handler_again___status_CHANGING_CHARSET(int *_rc);
bool handler_again___status_CHANGING_SCHEMA(int *);
/*
bool handler_again___status_CONNECTING_SERVER(int *);
bool handler_again___status_CHANGING_USER_SERVER(int *);
*/
bool handler_again___status_CHANGING_AUTOCOMMIT(int *);
bool handler_again___status_SETTING_MULTI_STMT(int *_rc);
bool handler_again___multiple_statuses(int *rc);
*/
void mysql_session_init();
void mysql_session_reset();
/*
void add_ldap_comment_to_pkt(PtrSize_t *);
/*
int get_pkts_from_client(bool&, PtrSize_t&);
*/
void handler_WCDSS_MYSQL_COM_STMT_RESET(PtrSize_t&);
@ -186,34 +182,39 @@ class MySQL_Session: public Client_Session
*/
bool handler_rc0_PROCESSING_STMT_PREPARE(enum session_status& st, ProxySQL_Data_Stream *myds, bool& prepared_stmt_with_no_params);
void handler_rc0_PROCESSING_STMT_EXECUTE(ProxySQL_Data_Stream *myds);
/*
bool handler_minus1_ClientLibraryError(ProxySQL_Data_Stream *myds, int myerr, char **errmsg);
void handler_minus1_LogErrorDuringQuery(MySQL_Connection *myconn, int myerr, char *errmsg);
bool handler_minus1_HandleErrorCodes(ProxySQL_Data_Stream *myds, int myerr, char **errmsg, int& handler_ret);
void handler_minus1_GenerateErrorMessage(ProxySQL_Data_Stream *myds, MySQL_Connection *myconn, bool& wrong_pass);
void handler_minus1_HandleBackendConnection(ProxySQL_Data_Stream *myds, MySQL_Connection *myconn);
/*
int RunQuery_mysql(ProxySQL_Data_Stream *myds, MySQL_Connection *myconn);
*/
void handler___status_WAITING_CLIENT_DATA();
/*
void handler_rc0_Process_GTID(MySQL_Connection *myconn);
void handler_WCDSS_MYSQL_COM_INIT_DB_replace_CLICKHOUSE(PtrSize_t& pkt);
void handler_WCDSS_MYSQL_COM_QUERY___not_mysql(PtrSize_t& pkt);
*/
bool handler_WCDSS_MYSQL_COM_QUERY_detect_SQLi();
bool handler_WCDSS_MULTI_PACKET(PtrSize_t& pkt);
bool handler_WCDSS_MYSQL_COM__various(PtrSize_t* pkt, bool* wrong_pass);
void handler___status_WAITING_CLIENT_DATA___default();
void handler___status_NONE_or_default(PtrSize_t& pkt);
/*
void handler_WCDSS_MYSQL_COM_QUERY_inner1(PtrSize_t&);
*/
void handler_WCD_SS_MCQ_qpo_QueryRewrite(PtrSize_t *pkt);
void handler_WCD_SS_MCQ_qpo_OK_msg(PtrSize_t *pkt);
void handler_WCD_SS_MCQ_qpo_error_msg(PtrSize_t *pkt);
void handler_WCD_SS_MCQ_qpo_LargePacket(PtrSize_t *pkt);
// int handler_WCD_SS_MCQ_qpo_Parse_SQL_LOG_BIN(PtrSize_t *pkt, bool *lock_hostgroup, unsigned int nTrx, string& nq);
public:
MySQL_Session();
~MySQL_Session();
bool handler_again___status_SETTING_GENERIC_VARIABLE(int *_rc, const char *var_name, const char *var_value, bool no_quote=false, bool set_transaction=false);
bool handler_again___status_SETTING_SQL_LOG_BIN(int *);
/*
std::stack<enum session_status> previous_status;
void * operator new(size_t);
void operator delete(void *);
@ -234,7 +235,9 @@ class MySQL_Session: public Client_Session
StatCounters *command_counters;
MySQL_Backend *mybe;
PtrArray *mybes;
ProxySQL_Data_Stream *client_myds;
*/
MySQL_Data_Stream *client_myds;
/*
char * default_schema;
char * user_attributes;
@ -330,8 +333,10 @@ class MySQL_Session: public Client_Session
MySQL_Backend * find_or_create_mysql_backend(int, ProxySQL_Data_Stream *_myds=NULL);
void SQLite3_to_MySQL(SQLite3_result *, char *, int , MySQL_Protocol *, bool in_transaction=false, bool deprecate_eof_active=false);
*/
void MySQL_Result_to_MySQL_wire(MYSQL *mysql, MySQL_ResultSet *MyRS, ProxySQL_Data_Stream *_myds=NULL);
void MySQL_Stmt_Result_to_MySQL_wire(MYSQL_STMT *stmt, MySQL_Connection *myconn);
/*
unsigned int NumActiveTransactions();
bool HasOfflineBackends();
bool SetEventInOfflineBackends();
@ -339,9 +344,13 @@ class MySQL_Session: public Client_Session
unsigned long long IdleTime();
void reset_all_mysql_backends();
*/
void writeout();
/*
void Memory_Stats();
void create_new_session_and_reset_mysql_connection(ProxySQL_Data_Stream *_myds);
*/
void create_new_session_and_reset_mysql_connection(MySQL_Data_Stream *_myds);
/*
bool handle_command_query_kill(PtrSize_t *);
*/
/**
@ -365,6 +374,13 @@ class MySQL_Session: public Client_Session
void detected_broken_connection(const char *file, unsigned int line, const char *func, const char *action, MySQL_Connection *myconn, int myerr, const char *message, bool verbose=false);
void generate_status_one_hostgroup(int hid, std::string& s);
*/
bool RunQuery_Success(MySQL_Connection *, bool& prepared_stmt_with_no_params);
bool RunQuery_Failed(MySQL_Connection *, bool&, int&);
bool RunQuery_Continue(MySQL_Connection *, int);
bool ProcessingRequest_MatchEnvironment(MySQL_Connection *myconn);
void LogKillQueryTimeout(MySQL_Data_Stream *myds, char *filename, int line);
};
/*

@ -8,21 +8,22 @@
#include <vector>
#include <memory>
class Client_Session;
class MySQL_Session;
extern const MARIADB_CHARSET_INFO * proxysql_find_charset_nr(unsigned int nr);
extern MARIADB_CHARSET_INFO * proxysql_find_charset_name(const char *name);
extern MARIADB_CHARSET_INFO * proxysql_find_charset_collate(const char *collatename);
extern void print_backtrace(void);
typedef bool (*verify_var)(Client_Session* session, int idx, uint32_t client_hash, uint32_t server_hash);
typedef bool (*update_var)(Client_Session* session, int idx, int &_rc);
typedef bool (*verify_var)(MySQL_Session* session, int idx, uint32_t client_hash, uint32_t server_hash);
typedef bool (*update_var)(MySQL_Session* session, int idx, int &_rc);
bool validate_charset(Client_Session* session, int idx, int &_rc);
bool update_server_variable(Client_Session* session, int idx, int &_rc);
bool verify_server_variable(Client_Session* session, int idx, uint32_t client_hash, uint32_t server_hash);
bool validate_charset(MySQL_Session* session, int idx, int &_rc);
bool update_server_variable(MySQL_Session* session, int idx, int &_rc);
bool verify_server_variable(MySQL_Session* session, int idx, uint32_t client_hash, uint32_t server_hash);
bool verify_set_names(Client_Session* session);
bool logbin_update_server_variable(Client_Session* session, int idx, int &_rc);
bool verify_set_names(MySQL_Session* session);
bool logbin_update_server_variable(MySQL_Session* session, int idx, int &_rc);
class MySQL_Variables {
static verify_var verifiers[SQL_NAME_LAST_HIGH_WM];
@ -37,20 +38,22 @@ public:
virtual ~MySQL_Variables();
bool client_set_value(Client_Session* session, int idx, const std::string& value);
bool client_set_hash_and_value(Client_Session* session, int idx, const std::string& value, uint32_t hash);
const char* client_get_value(Client_Session* session, int idx) const;
uint32_t client_get_hash(Client_Session* session, int idx) const;
bool client_set_value(MySQL_Session* session, int idx, const std::string& value);
bool client_set_hash_and_value(MySQL_Session* session, int idx, const std::string& value, uint32_t hash);
const char* client_get_value(MySQL_Session* session, int idx) const;
uint32_t client_get_hash(MySQL_Session* session, int idx) const;
void server_set_value(Client_Session* session, int idx, const char* value);
void server_set_hash_and_value(Client_Session* session, int idx, const char* value, uint32_t hash);
const char* server_get_value(Client_Session* session, int idx) const;
inline uint32_t server_get_hash(Client_Session* session, int idx) const;
void server_set_value(MySQL_Session* session, int idx, const char* value);
void server_set_hash_and_value(MySQL_Session* session, int idx, const char* value, uint32_t hash);
const char* server_get_value(MySQL_Session* session, int idx) const;
inline uint32_t server_get_hash(MySQL_Session* session, int idx) const;
bool verify_variable(Client_Session* session, int idx) const;
bool verify_variable(MySQL_Session* session, int idx) const;
bool update_variable(Client_Session* session, session_status status, int &_rc);
bool parse_variable_boolean(Client_Session *sess, int idx, std::string &value1, bool* lock_hostgroup);
bool parse_variable_number(Client_Session *sess, int idx, std::string &value1, bool* lock_hostgroup);
bool update_variable(MySQL_Session* session, session_status status, int &_rc);
bool parse_variable_boolean(MySQL_Session *sess, int idx, std::string &value1, bool* lock_hostgroup);
bool parse_variable_number(MySQL_Session *sess, int idx, std::string &value1, bool* lock_hostgroup);
};
#endif // #ifndef MYSQL_VARIABLES_H

@ -1,5 +1,5 @@
#ifndef __CLASS_MYSQL_DATA_STREAM_H
#define __CLASS_MYSQL_DATA_STREAM_H
#ifndef __CLASS_PROXYSQL_DATA_STREAM_H
#define __CLASS_PROXYSQL_DATA_STREAM_H
#include "proxysql.h"
#include "cpp.h"
@ -9,6 +9,7 @@
#define QUEUE_T_DEFAULT_SIZE 32768
#define MY_SSL_BUFFER 8192
/*
// this class avoid copying data
class MyDS_real_query {
public:
@ -16,12 +17,6 @@ class MyDS_real_query {
char *QueryPtr; // pointer to beginning of the query
unsigned int QuerySize; // size of the query
void init(PtrSize_t *_pkt) {
/*
assert(QueryPtr==NULL);
assert(QuerySize==0);
assert(pkt.ptr==NULL);
assert(pkt.size==0);
*/
pkt.ptr=_pkt->ptr;
pkt.size=_pkt->size;
QueryPtr=(char *)pkt.ptr+5;
@ -35,17 +30,20 @@ class MyDS_real_query {
QueryPtr=NULL;
}
};
*/
enum sslstatus { SSLSTATUS_OK, SSLSTATUS_WANT_IO, SSLSTATUS_FAIL};
class ProxySQL_Data_Stream
{
private:
/*
int array2buffer();
int buffer2array();
void generate_compressed_packet();
enum sslstatus do_ssl_handshake();
void queue_encrypted_bytes(const char *buf, size_t len);
*/
public:
queue_t queueIN;
@ -53,6 +51,7 @@ class ProxySQL_Data_Stream
queue_t queueOUT;
uint64_t pkts_sent; // counter of sent packets
/*
struct {
PtrSize_t pkt;
unsigned int partial;
@ -64,6 +63,7 @@ class ProxySQL_Data_Stream
MySQL_Protocol myprot;
MyDS_real_query mysql_real_query;
*/
bytes_stats_t bytes_info; // bytes statistics
PtrSize_t multi_pkt;
@ -87,9 +87,9 @@ class ProxySQL_Data_Stream
ProxySQL_Poll *mypolls;
//int listener;
MySQL_Connection *myconn;
// MySQL_Connection *myconn;
Client_Session *sess; // pointer to the session using this data stream
MySQL_Backend *mybe; // if this is a connection to a mysql server, this points to a backend structure
// MySQL_Backend *mybe; // if this is a connection to a mysql server, this points to a backend structure
char *x509_subject_alt_name;
SSL *ssl;
BIO *rbio_ssl;
@ -136,24 +136,24 @@ class ProxySQL_Data_Stream
uint8_t pkt_sid;
bool com_field_list;
char *com_field_wild;
// bool com_field_list;
// char *com_field_wild;
ProxySQL_Data_Stream();
~ProxySQL_Data_Stream();
int array2buffer_full();
void init(); // initialize the data stream
void init(enum MySQL_DS_type, Client_Session *, int); // initialize with arguments
// int array2buffer_full();
// void init(); // initialize the data stream
// void init(enum MySQL_DS_type, Client_Session *, int); // initialize with arguments
void shut_soft();
void shut_hard();
int read_from_net();
int write_to_net();
int write_to_net_poll();
// int read_from_net();
// int write_to_net();
// int write_to_net_poll();
bool available_data_out();
void remove_pollout();
void set_pollout();
void mysql_free();
// void set_pollout();
// void mysql_free();
void set_net_failure();
void setDSS_STATE_QUERY_SENT_NET();
@ -162,11 +162,12 @@ class ProxySQL_Data_Stream
DSS=dss;
}
int read_pkts();
int write_pkts();
// int read_pkts();
// int write_pkts();
void unplug_backend();
// void unplug_backend();
/*
void check_data_flow();
int assign_fd_from_mysql_conn();
@ -194,9 +195,10 @@ class ProxySQL_Data_Stream
void destroy_MySQL_Connection_From_Pool(bool sq);
void free_mysql_real_query();
*/
void reinit_queues();
void destroy_queues();
bool data_in_rbio();
};
#endif /* __CLASS_MYSQL_DATA_STREAM_H */
#endif /* __CLASS_PROXYSQL_DATA_STREAM_H */

@ -164,7 +164,7 @@ class ProxyWorker_Thread
void read_one_byte_from_pipe(unsigned int n);
void tune_timeout_for_myds_needs_pause(ProxySQL_Data_Stream *myds);
void tune_timeout_for_session_needs_pause(ProxySQL_Data_Stream *myds);
void configure_pollout(ProxySQL_Data_Stream *myds, unsigned int n);
void configure_pollout(MySQL_Data_Stream *myds, unsigned int n);
protected:
int nfds;
@ -216,7 +216,7 @@ class ProxyWorker_Thread
pthread_mutex_t thread_mutex;
ProxyWorker_Thread();
~ProxyWorker_Thread();
Client_Session * create_new_session_and_client_data_stream(int _fd);
MySQL_Session * create_new_session_and_client_mysql_data_stream(int _fd);
bool init();
void run___get_multiple_idle_connections(int& num_idles);
void run___cleanup_mirror_queue();
@ -231,7 +231,7 @@ class ProxyWorker_Thread
bool process_data_on_mysql_data_stream(ProxySQL_Data_Stream *myds, unsigned int n);
void ProcessAllSessions_SortingSessions();
void ProcessAllSessions_CompletedMirrorSession(unsigned int& n, Client_Session *sess);
void ProcessAllSessions_MaintenanceLoop(Client_Session *sess, unsigned long long sess_time, unsigned int& total_active_transactions_);
void ProcessAllSessions_MaintenanceLoop(MySQL_Session *sess, unsigned long long sess_time, unsigned int& total_active_transactions_);
void process_all_sessions();
void refresh_variables();
void register_session_connection_handler(Client_Session *_sess, bool _new=false);

@ -62,7 +62,7 @@ class SQLite3_Server {
unsigned int num_aurora_servers[3];
unsigned int max_num_aurora_servers;
pthread_mutex_t aurora_mutex;
void populate_aws_aurora_table(Client_Session *sess);
void populate_aws_aurora_table(MySQL_Session *sess);
void init_aurora_ifaces_string(std::string& s);
#endif // TEST_AURORA
#ifdef TEST_GALERA
@ -70,13 +70,13 @@ class SQLite3_Server {
unsigned int num_galera_servers[3];
unsigned int max_num_galera_servers;
pthread_mutex_t galera_mutex;
void populate_galera_table(Client_Session *sess);
void populate_galera_table(MySQL_Session *sess);
void init_galera_ifaces_string(std::string& s);
#endif // TEST_GALERA
#ifdef TEST_GROUPREP
unsigned int max_num_grouprep_servers;
pthread_mutex_t grouprep_mutex;
void populate_grouprep_table(Client_Session *sess, int txs_behind = 0);
void populate_grouprep_table(MySQL_Session *sess, int txs_behind = 0);
void init_grouprep_ifaces_string(std::string& s);
group_rep_status grouprep_test_value(const std::string& srv_addr);
#endif // TEST_GROUPREP

@ -18,7 +18,7 @@
#include "fileutils.hpp"
#include "configfile.hpp"
//#include "query_processor.h"
#include "proxysql_admin.h"
//#include "proxysql_admin.h"
//#include "SQLite3_Server.h"
#ifdef PROXYSQLCLICKHOUSE
#include "ClickHouse_Server.h"

@ -7,12 +7,10 @@
class MySQL_Backend
{
public:
void * operator new(size_t);
void operator delete(void *);
int hostgroup_id;
char gtid_uuid[128];
uint64_t gtid_trxid;
ProxySQL_Data_Stream *server_myds;
MySQL_Data_Stream *server_myds;
// mysql_cp_entry_t *server_mycpe;
bytes_stats_t server_bytes_at_cmd;
//MySQL_Hostgroup_Entry *mshge;

@ -107,7 +107,7 @@ class MySQL_Connection {
MySQL_ResultSet *MyRS_reuse;
MySrvC *parent;
MySQL_Connection_userinfo *userinfo;
ProxySQL_Data_Stream *myds;
MySQL_Data_Stream *myds;
/**
* @brief Keeps tracks of the 'server_status'. Do not confuse with the 'server_status' from the
* 'MYSQL' connection itself. This flag keeps track of the configured server status from the

@ -13,6 +13,7 @@
#include <array>
#include "ProxySQL_RESTAPI_Server.hpp"
//#include "MySQL_Session.h"
typedef struct { uint32_t hash; uint32_t key; } t_symstruct;
class ProxySQL_Config;
@ -116,6 +117,21 @@ struct admin_metrics_map_idx {
// ProxySQL_Admin shared variables
extern int admin__web_verbosity;
// class Admin_MySQL_Session is identical to MySQL_Session
// but has different session_type
// ... or no ?!?!!!
/*
class Admin_MySQL_Session: public MySQL_Session
{
friend class Client_Session;
friend class MySQL_Session;
friend class Query_Info;
friend class MySQL_Protocol;
private:
public:
};
*/
class ProxySQL_Admin {
private:
volatile int main_shutdown;

@ -458,9 +458,11 @@ typedef struct __SQP_query_parser_t SQP_par_t;
#ifndef PROXYSQL_CLASSES
#define PROXYSQL_CLASSES
class ProxySQL_Data_Stream;
class MySQL_Data_Stream;
class MySQL_Connection_userinfo;
class Client_Session;
class MySQL_Session;
class ProxySQL_Admin;
class MySQL_Backend;
class MySQL_Monitor;
class ProxyWorker_Thread;

@ -9,8 +9,12 @@
#include "MySQL_Logger.hpp"
#include "ProxySQL_Data_Stream.h"
#include "MySQL_Data_Stream.h"
#include "MySQL_Session.h"
#include "query_processor.h"
#include "proxysql_admin.h"
#include <search.h>
#include <stdlib.h>
#include <stdio.h>
@ -68,11 +72,11 @@
using namespace clickhouse;
__thread Client_Session * clickhouse_thread___mysql_sess;
__thread MySQL_Session * clickhouse_thread___mysql_sess;
//static void ClickHouse_to_MySQL(SQLite3_result *result, char *error, int affected_rows, MySQL_Protocol *myprot) {
inline void ClickHouse_to_MySQL(const Block& block) {
Client_Session *sess = clickhouse_thread___mysql_sess;
MySQL_Session *sess = clickhouse_thread___mysql_sess;
MySQL_Protocol *myprot=NULL;
myprot=&sess->client_myds->myprot;
@ -434,7 +438,8 @@ class sqlite3server_main_loop_listeners {
static sqlite3server_main_loop_listeners S_amll;
void ClickHouse_Server_session_handler(Client_Session *sess, void *_pa, PtrSize_t *pkt) {
void ClickHouse_Server_session_handler(Client_Session *c_sess, void *_pa, PtrSize_t *pkt) {
MySQL_Session *sess = (MySQL_Session *)c_sess;
char *error=NULL;
int cols;
int affected_rows;
@ -1077,7 +1082,7 @@ __run_query:
if (clickhouse_sess->connected == true) {
if (clickhouse_sess->schema_initialized == false) {
if (sess && sess->client_myds) {
ProxySQL_Data_Stream *ds = sess->client_myds;
MySQL_Data_Stream *ds = sess->client_myds;
if (ds->myconn && ds->myconn->userinfo && ds->myconn->userinfo->schemaname) {
char *sn = ds->myconn->userinfo->schemaname;
char *use_query = NULL;
@ -1215,8 +1220,8 @@ static void *child_mysql(void *arg) {
ProxyWorker_Thread *mysql_thr=new ProxyWorker_Thread();
mysql_thr->curtime=monotonic_time();
Client_Session *sess = NULL;
ProxySQL_Data_Stream *myds = NULL;
MySQL_Session *sess = NULL;
MySQL_Data_Stream *myds = NULL;
ClickHouse_Session *sqlite_sess = new ClickHouse_Session();
sqlite_sess->init();
@ -1224,7 +1229,7 @@ static void *child_mysql(void *arg) {
GloQPro->init_thread();
mysql_thr->refresh_variables();
sess=mysql_thr->create_new_session_and_client_data_stream(client);
sess=mysql_thr->create_new_session_and_client_mysql_data_stream(client);
sess->thread=mysql_thr;
sess->session_type = PROXYSQL_SESSION_CLICKHOUSE;
sess->handler_function=ClickHouse_Server_session_handler;

File diff suppressed because it is too large Load Diff

@ -123,7 +123,7 @@ MYCXXFLAGS=-std=c++11 $(MYCFLAGS) $(PSQLCH) $(ENABLE_EPOLL)
default: libproxysql.a
.PHONY: default
_OBJ_CXX = ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo SpookyV2.oo MySQL_Authentication.oo gen_utils.oo sqlite3db.oo mysql_connection.oo MySQL_HostGroups_Manager.oo ProxySQL_Data_Stream.oo ProxyWorker_Thread.oo Client_Session.oo MySQL_Session.oo MySQL_Protocol.oo mysql_backend.oo Query_Processor.oo ProxySQL_Admin.oo ProxySQL_Config.oo ProxySQL_Restapi.oo MySQL_Monitor.oo MySQL_Logger.oo thread.oo MySQL_PreparedStatement.oo ProxySQL_Cluster.oo ClickHouse_Authentication.oo ClickHouse_Server.oo ProxySQL_Statistics.oo Chart_bundle_js.oo ProxySQL_HTTP_Server.oo ProxySQL_RESTAPI_Server.oo font-awesome.min.css.oo main-bundle.min.css.oo set_parser.oo MySQL_Variables.oo c_tokenizer.oo proxysql_utils.oo
_OBJ_CXX = ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo SpookyV2.oo MySQL_Authentication.oo gen_utils.oo sqlite3db.oo mysql_connection.oo MySQL_HostGroups_Manager.oo ProxySQL_Data_Stream.oo MySQL_Data_Stream.oo ProxyWorker_Thread.oo Client_Session.oo MySQL_Session.oo MySQL_Protocol.oo mysql_backend.oo Query_Processor.oo ProxySQL_Admin.oo ProxySQL_Config.oo ProxySQL_Restapi.oo MySQL_Monitor.oo MySQL_Logger.oo thread.oo MySQL_PreparedStatement.oo ProxySQL_Cluster.oo ClickHouse_Authentication.oo ClickHouse_Server.oo ProxySQL_Statistics.oo Chart_bundle_js.oo ProxySQL_HTTP_Server.oo ProxySQL_RESTAPI_Server.oo font-awesome.min.css.oo main-bundle.min.css.oo set_parser.oo MySQL_Variables.oo c_tokenizer.oo proxysql_utils.oo
OBJ_CXX = $(patsubst %,$(ODIR)/%,$(_OBJ_CXX))
HEADERS = ../include/*.h ../include/*.hpp

File diff suppressed because it is too large Load Diff

@ -18,6 +18,11 @@
#include "prometheus_helpers.h"
#include "proxysql_utils.h"
#include "MySQL_Data_Stream.h"
#include "MySQL_Session.h"
#include "proxysql_admin.h"
#define char_malloc (char *)malloc
#define itostr(__s, __i) { __s=char_malloc(32); sprintf(__s, "%lld", __i); }
@ -3193,8 +3198,9 @@ MySQL_Connection * MySrvConnList::get_random_MyConn(Client_Session *sess, bool f
} else {
i=fastrand()%l;
}
if (sess && sess->client_myds && sess->client_myds->myconn && sess->client_myds->myconn->userinfo) {
MySQL_Connection * client_conn = sess->client_myds->myconn;
// APPLY THIS LOGIC ONLY FOR MYSQL FRONTEND
if (sess && ((MySQL_Session *)sess)->client_myds && ((MySQL_Session *)sess)->client_myds->myconn && ((MySQL_Session *)sess)->client_myds->myconn->userinfo) {
MySQL_Connection * client_conn = ((MySQL_Session *)sess)->client_myds->myconn;
get_random_MyConn_inner_search(i, l, conn_found_idx, connection_quality_level, number_of_matching_session_variables, client_conn);
if (connection_quality_level !=3 ) { // we didn't find the perfect connection
get_random_MyConn_inner_search(0, i, conn_found_idx, connection_quality_level, number_of_matching_session_variables, client_conn);

@ -7,6 +7,9 @@
#include "MySQL_PreparedStatement.h"
#include "MySQL_Logger.hpp"
#include "MySQL_Session.h"
#include "MySQL_Data_Stream.h"
#include <dirent.h>
#include <libgen.h>
@ -224,13 +227,16 @@ void MySQL_Event::write_auth(std::fstream *f, Client_Session *sess) {
default:
break;
}
if (sess->client_myds) {
if (sess->client_myds->proxy_addr.addr) {
std::string s = sess->client_myds->proxy_addr.addr;
s += ":" + std::to_string(sess->client_myds->proxy_addr.port);
j["proxy_addr"] = s;
if (sess->session_type==PROXYSQL_SESSION_MYSQL || sess->session_type==PROXYSQL_SESSION_ADMIN || sess->session_type==PROXYSQL_SESSION_SQLITE) {
MySQL_Data_Stream *client_myds = ((MySQL_Session *)sess)->client_myds;
if (client_myds) {
if (client_myds->proxy_addr.addr) {
std::string s = client_myds->proxy_addr.addr;
s += ":" + std::to_string(client_myds->proxy_addr.port);
j["proxy_addr"] = s;
}
j["ssl"] = client_myds->encrypted;
}
j["ssl"] = sess->client_myds->encrypted;
}
// for performance reason, we are moving the write lock
// right before the write to disk
@ -623,10 +629,16 @@ void MySQL_Logger::audit_set_datadir(char *s) {
flush_log();
};
void MySQL_Logger::log_request(Client_Session *sess, ProxySQL_Data_Stream *myds) {
void MySQL_Logger::log_request(Client_Session *c_sess, ProxySQL_Data_Stream *pds) {
if (events.enabled==false) return;
if (events.logfile==NULL) return;
if (c_sess->session_type != PROXYSQL_SESSION_MYSQL)
return;
MySQL_Session *sess = (MySQL_Session *)c_sess;
MySQL_Data_Stream *myds = (MySQL_Data_Stream *)pds;
MySQL_Connection_userinfo *ui=sess->client_myds->myconn->userinfo;
uint64_t curtime_real=realtime_time();
@ -741,11 +753,16 @@ void MySQL_Logger::log_request(Client_Session *sess, ProxySQL_Data_Stream *myds)
}
}
void MySQL_Logger::log_audit_entry(log_event_type _et, Client_Session *sess, ProxySQL_Data_Stream *myds, char *xi) {
void MySQL_Logger::log_audit_entry(log_event_type _et, Client_Session *c_sess, ProxySQL_Data_Stream *pds, char *xi) {
if (audit.enabled==false) return;
if (audit.logfile==NULL) return;
if (sess == NULL) return;
if (c_sess == NULL) return;
if (c_sess->session_type != PROXYSQL_SESSION_MYSQL)
return;
MySQL_Session *sess = (MySQL_Session *)c_sess;
MySQL_Data_Stream *myds = (MySQL_Data_Stream *)pds;
if (sess->client_myds == NULL) return;
MySQL_Connection_userinfo *ui= NULL;

@ -6,6 +6,7 @@
#include "MySQL_PreparedStatement.h"
#include "ProxySQL_Data_Stream.h"
#include "MySQL_Data_Stream.h"
#include "MySQL_Authentication.hpp"
#include "MySQL_LDAP_Authentication.hpp"
#include "MySQL_Variables.h"
@ -297,7 +298,7 @@ MySQL_Prepared_Stmt_info::MySQL_Prepared_Stmt_info(unsigned char *pkt, unsigned
void MySQL_Protocol::init(ProxySQL_Data_Stream **__myds, MySQL_Connection_userinfo *__userinfo, Client_Session *__sess) {
void MySQL_Protocol::init(MySQL_Data_Stream **__myds, MySQL_Connection_userinfo *__userinfo, Client_Session *__sess) {
myds=__myds;
userinfo=__userinfo;
sess=__sess;
@ -1464,7 +1465,8 @@ bool MySQL_Protocol::process_pkt_COM_CHANGE_USER(unsigned char *pkt, unsigned in
// Check and get 'Client Auth Plugin' if capability is supported
char* client_auth_plugin = nullptr;
if (pkt + len > pkt + cur) {
int capabilities = (*myds)->sess->client_myds->myconn->options.client_flag;
//int capabilities = (*myds)->sess->client_myds->myconn->options.client_flag; // was this a bug?
int capabilities = (*myds)->myconn->options.client_flag;
if (capabilities & CLIENT_PLUGIN_AUTH) {
client_auth_plugin = reinterpret_cast<char*>(pkt + cur);
}
@ -1601,13 +1603,14 @@ bool MySQL_Protocol::process_pkt_COM_CHANGE_USER(unsigned char *pkt, unsigned in
if (default_transaction_isolation != j_user_attributes.end()) {
std::string def_trx_isolation_val =
j_user_attributes["default-transaction_isolation"].get<std::string>();
mysql_variables.client_set_value((*myds)->sess, SQL_ISOLATION_LEVEL, def_trx_isolation_val.c_str());
mysql_variables.client_set_value((MySQL_Session *)((*myds)->sess), SQL_ISOLATION_LEVEL, def_trx_isolation_val.c_str());
}
}
}
assert(sess);
assert(sess->client_myds);
MySQL_Connection *myconn=sess->client_myds->myconn;
assert(sess->session_type==PROXYSQL_SESSION_MYSQL);
assert(((MySQL_Session *)sess)->client_myds);
MySQL_Connection *myconn=((MySQL_Session *)sess)->client_myds->myconn;
assert(myconn);
myconn->set_charset(charset, CONNECT_START);
@ -1618,10 +1621,10 @@ bool MySQL_Protocol::process_pkt_COM_CHANGE_USER(unsigned char *pkt, unsigned in
/* We are processing handshake from client. Client sends us a character set it will use in communication.
* we store this character set in the client's variables to use later in multiplexing with different backends
*/
mysql_variables.client_set_value(sess, SQL_CHARACTER_SET_RESULTS, ss.str().c_str());
mysql_variables.client_set_value(sess, SQL_CHARACTER_SET_CLIENT, ss.str().c_str());
mysql_variables.client_set_value(sess, SQL_CHARACTER_SET_CONNECTION, ss.str().c_str());
mysql_variables.client_set_value(sess, SQL_COLLATION_CONNECTION, ss.str().c_str());
mysql_variables.client_set_value((MySQL_Session *)sess, SQL_CHARACTER_SET_RESULTS, ss.str().c_str());
mysql_variables.client_set_value((MySQL_Session *)sess, SQL_CHARACTER_SET_CLIENT, ss.str().c_str());
mysql_variables.client_set_value((MySQL_Session *)sess, SQL_CHARACTER_SET_CONNECTION, ss.str().c_str());
mysql_variables.client_set_value((MySQL_Session *)sess, SQL_COLLATION_CONNECTION, ss.str().c_str());
}
return ret;
}
@ -2154,9 +2157,10 @@ __exit_do_auth:
free(tmp_pass);
}
#endif
assert(sess);
assert(sess->client_myds);
myconn=sess->client_myds->myconn;
assert(sess != NULL);
assert(sess->session_type==PROXYSQL_SESSION_MYSQL || sess->session_type==PROXYSQL_SESSION_ADMIN || sess->session_type==PROXYSQL_SESSION_STATS || sess->session_type==PROXYSQL_SESSION_SQLITE || sess->session_type==PROXYSQL_SESSION_CLICKHOUSE);
assert(((MySQL_Session *)sess)->client_myds != NULL);
myconn=((MySQL_Session *)sess)->client_myds->myconn;
assert(myconn);
myconn->set_charset(charset, CONNECT_START);
{
@ -2166,10 +2170,10 @@ __exit_do_auth:
/* We are processing handshake from client. Client sends us a character set it will use in communication.
* we store this character set in the client's variables to use later in multiplexing with different backends
*/
mysql_variables.client_set_value(sess, SQL_CHARACTER_SET_RESULTS, ss.str().c_str());
mysql_variables.client_set_value(sess, SQL_CHARACTER_SET_CLIENT, ss.str().c_str());
mysql_variables.client_set_value(sess, SQL_CHARACTER_SET_CONNECTION, ss.str().c_str());
mysql_variables.client_set_value(sess, SQL_COLLATION_CONNECTION, ss.str().c_str());
mysql_variables.client_set_value((MySQL_Session *)sess, SQL_CHARACTER_SET_RESULTS, ss.str().c_str());
mysql_variables.client_set_value((MySQL_Session *)sess, SQL_CHARACTER_SET_CLIENT, ss.str().c_str());
mysql_variables.client_set_value((MySQL_Session *)sess, SQL_CHARACTER_SET_CONNECTION, ss.str().c_str());
mysql_variables.client_set_value((MySQL_Session *)sess, SQL_COLLATION_CONNECTION, ss.str().c_str());
}
// enable compression
if (capabilities & CLIENT_COMPRESS) {
@ -2256,7 +2260,7 @@ bool MySQL_Protocol::verify_user_attributes(int calling_line, const char *callin
auto default_transaction_isolation = j.find("default-transaction_isolation");
if (default_transaction_isolation != j.end()) {
std::string default_transaction_isolation_value = j["default-transaction_isolation"].get<std::string>();
mysql_variables.client_set_value((*myds)->sess, SQL_ISOLATION_LEVEL, default_transaction_isolation_value.c_str());
mysql_variables.client_set_value((MySQL_Session *)((*myds)->sess), SQL_ISOLATION_LEVEL, default_transaction_isolation_value.c_str());
}
}
}
@ -2654,7 +2658,7 @@ void MySQL_ResultSet::init(MySQL_Protocol *_myprot, MYSQL_RES *_res, MYSQL *_my,
if (myprot==NULL) {
return; // this is a mirror
}
ProxySQL_Data_Stream * c_myds = *(myprot->myds);
MySQL_Data_Stream * c_myds = *(myprot->myds);
if (c_myds->com_field_list==false) {
myprot->generate_pkt_column_count(false,&pkt.ptr,&pkt.size,sid,num_fields,this);
sid++;
@ -2727,7 +2731,7 @@ void MySQL_ResultSet::init_with_stmt(MySQL_Connection *myconn) {
PROXY_TRACE2();
assert(stmt);
MYSQL_STMT *_stmt = stmt;
ProxySQL_Data_Stream * c_myds = *(myprot->myds);
MySQL_Data_Stream * c_myds = *(myprot->myds);
buffer_to_PSarrayOut();
unsigned long long total_size=0;
MYSQL_ROWS *r=_stmt->result.data;
@ -2986,7 +2990,7 @@ void MySQL_ResultSet::add_eof() {
resultset_completed=true;
}
void MySQL_ResultSet::add_err(ProxySQL_Data_Stream *_myds) {
void MySQL_ResultSet::add_err(MySQL_Data_Stream *_myds) {
PtrSize_t pkt;
if (myprot) {
MYSQL *_mysql=_myds->myconn->mysql;

File diff suppressed because it is too large Load Diff

@ -1,13 +1,15 @@
#include "MySQL_Variables.h"
#include "proxysql.h"
#include "Client_Session.h"
#include "MySQL_Session.h"
#include "ProxySQL_Data_Stream.h"
#include "MySQL_Data_Stream.h"
#include "SpookyV2.h"
#include <sstream>
static inline char is_digit(char c) {
if(c >= '0' && c <= '9')
return 1;
@ -76,7 +78,7 @@ MySQL_Variables::MySQL_Variables() {
MySQL_Variables::~MySQL_Variables() {}
bool MySQL_Variables::client_set_hash_and_value(Client_Session* session, int idx, const std::string& value, uint32_t hash) {
bool MySQL_Variables::client_set_hash_and_value(MySQL_Session* session, int idx, const std::string& value, uint32_t hash) {
if (!session || !session->client_myds || !session->client_myds->myconn) {
proxy_warning("Session validation failed\n");
return false;
@ -91,7 +93,7 @@ bool MySQL_Variables::client_set_hash_and_value(Client_Session* session, int idx
return true;
}
void MySQL_Variables::server_set_hash_and_value(Client_Session* session, int idx, const char* value, uint32_t hash) {
void MySQL_Variables::server_set_hash_and_value(MySQL_Session* session, int idx, const char* value, uint32_t hash) {
if (!session || !session->mybe || !session->mybe->server_myds || !session->mybe->server_myds->myconn || !value) {
proxy_warning("Session validation failed\n");
return;
@ -137,7 +139,7 @@ void MySQL_Variables::server_set_hash_and_value(Client_Session* session, int idx
*
* @return 'true' in case of success, 'false' otherwise.
*/
bool MySQL_Variables::client_set_value(Client_Session* session, int idx, const std::string& value) {
bool MySQL_Variables::client_set_value(MySQL_Session* session, int idx, const std::string& value) {
if (!session || !session->client_myds || !session->client_myds->myconn) {
proxy_warning("Session validation failed\n");
return false;
@ -189,21 +191,21 @@ bool MySQL_Variables::client_set_value(Client_Session* session, int idx, const s
return true;
}
const char* MySQL_Variables::client_get_value(Client_Session* session, int idx) const {
const char* MySQL_Variables::client_get_value(MySQL_Session* session, int idx) const {
assert(session);
assert(session->client_myds);
assert(session->client_myds->myconn);
return session->client_myds->myconn->variables[idx].value;
}
uint32_t MySQL_Variables::client_get_hash(Client_Session* session, int idx) const {
uint32_t MySQL_Variables::client_get_hash(MySQL_Session* session, int idx) const {
assert(session);
assert(session->client_myds);
assert(session->client_myds->myconn);
return session->client_myds->myconn->var_hash[idx];
}
void MySQL_Variables::server_set_value(Client_Session* session, int idx, const char* value) {
void MySQL_Variables::server_set_value(MySQL_Session* session, int idx, const char* value) {
assert(session);
assert(session->mybe);
assert(session->mybe->server_myds);
@ -219,7 +221,7 @@ void MySQL_Variables::server_set_value(Client_Session* session, int idx, const c
session->mybe->server_myds->myconn->reorder_dynamic_variables_idx();
}
const char* MySQL_Variables::server_get_value(Client_Session* session, int idx) const {
const char* MySQL_Variables::server_get_value(MySQL_Session* session, int idx) const {
assert(session);
assert(session->mybe);
assert(session->mybe->server_myds);
@ -227,7 +229,7 @@ const char* MySQL_Variables::server_get_value(Client_Session* session, int idx)
return session->mybe->server_myds->myconn->variables[idx].value;
}
uint32_t MySQL_Variables::server_get_hash(Client_Session* session, int idx) const {
uint32_t MySQL_Variables::server_get_hash(MySQL_Session* session, int idx) const {
assert(session);
assert(session->mybe);
assert(session->mybe->server_myds);
@ -236,6 +238,11 @@ uint32_t MySQL_Variables::server_get_hash(Client_Session* session, int idx) cons
}
bool MySQL_Variables::update_variable(Client_Session* session, session_status status, int &_rc) {
assert(session->session_type==PROXYSQL_SESSION_MYSQL);
return update_variable((MySQL_Session *)session, status, _rc);
}
bool MySQL_Variables::update_variable(MySQL_Session* session, session_status status, int &_rc) {
int idx = SQL_NAME_LAST_HIGH_WM;
if (session->status == SETTING_VARIABLE) {
// if status is SETTING_VARIABLE , what variable needs to be changed is defined in changing_variable_idx
@ -253,6 +260,11 @@ bool MySQL_Variables::update_variable(Client_Session* session, session_status st
}
bool MySQL_Variables::verify_variable(Client_Session* session, int idx) const {
assert(session->session_type==PROXYSQL_SESSION_MYSQL);
return verify_variable((MySQL_Session *)session, idx);
}
bool MySQL_Variables::verify_variable(MySQL_Session* session, int idx) const {
auto ret = false;
if (likely(verifiers[idx])) {
auto client_hash = session->client_myds->myconn->var_hash[idx];
@ -264,10 +276,10 @@ bool MySQL_Variables::verify_variable(Client_Session* session, int idx) const {
return ret;
}
bool validate_charset(Client_Session* session, int idx, int &_rc) {
bool validate_charset(MySQL_Session* session, int idx, int &_rc) {
if (idx == SQL_CHARACTER_SET || idx == SQL_CHARACTER_SET_CLIENT || idx == SQL_CHARACTER_SET_RESULTS ||
idx == SQL_CHARACTER_SET_CONNECTION || idx == SQL_CHARACTER_SET_DATABASE || idx == SQL_COLLATION_CONNECTION) {
ProxySQL_Data_Stream *myds = session->mybe->server_myds;
MySQL_Data_Stream *myds = session->mybe->server_myds;
MySQL_Connection *myconn = myds->myconn;
char msg[128];
const MARIADB_CHARSET_INFO *ci = NULL;
@ -357,7 +369,7 @@ bool validate_charset(Client_Session* session, int idx, int &_rc) {
return true;
}
bool update_server_variable(Client_Session* session, int idx, int &_rc) {
bool update_server_variable(MySQL_Session* session, int idx, int &_rc) {
bool no_quote = true;
if (mysql_tracked_variables[idx].quote) no_quote = false;
bool st = mysql_tracked_variables[idx].set_transaction;
@ -428,7 +440,12 @@ bool update_server_variable(Client_Session* session, int idx, int &_rc) {
return ret;
}
bool verify_set_names(Client_Session* session) {
bool verify_set_names(Client_Session * session) {
assert(session->session_type==PROXYSQL_SESSION_MYSQL);
return verify_set_names((MySQL_Session *)session);
}
bool verify_set_names(MySQL_Session* session) {
uint32_t client_charset_hash = mysql_variables.client_get_hash(session, SQL_CHARACTER_SET_CLIENT);
if (client_charset_hash == 0)
return false;
@ -482,7 +499,7 @@ bool verify_set_names(Client_Session* session) {
return false;
}
inline bool verify_server_variable(Client_Session* session, int idx, uint32_t client_hash, uint32_t server_hash) {
inline bool verify_server_variable(MySQL_Session* session, int idx, uint32_t client_hash, uint32_t server_hash) {
if (client_hash && client_hash != server_hash) {
// Edge case for set charset command, because we do not know database character set
// for now we are setting connection and collation to empty
@ -518,12 +535,12 @@ inline bool verify_server_variable(Client_Session* session, int idx, uint32_t cl
return false;
}
bool logbin_update_server_variable(Client_Session* session, int idx, int &_rc) {
bool logbin_update_server_variable(MySQL_Session* session, int idx, int &_rc) {
return session->handler_again___status_SETTING_SQL_LOG_BIN(&_rc);
}
bool MySQL_Variables::parse_variable_boolean(Client_Session *sess, int idx, string& value1, bool * lock_hostgroup) {
bool MySQL_Variables::parse_variable_boolean(MySQL_Session *sess, int idx, string& value1, bool * lock_hostgroup) {
proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Processing SET %s value %s\n", mysql_tracked_variables[idx].set_variable_name, value1.c_str());
int __tmp_value = -1;
if (
@ -564,7 +581,7 @@ bool MySQL_Variables::parse_variable_boolean(Client_Session *sess, int idx, stri
bool MySQL_Variables::parse_variable_number(Client_Session *sess, int idx, string& value1, bool * lock_hostgroup) {
bool MySQL_Variables::parse_variable_number(MySQL_Session *sess, int idx, string& value1, bool * lock_hostgroup) {
int vl = strlen(value1.c_str());
const char *v = value1.c_str();
bool only_digit_chars = true;

@ -6,7 +6,6 @@
#include <prometheus/exposer.h>
#include <prometheus/counter.h>
#include "MySQL_HostGroups_Manager.h"
#include "proxysql_admin.h"
#include "re2/re2.h"
#include "re2/regexp.h"
#include "proxysql.h"
@ -15,8 +14,11 @@
#include "proxysql_utils.h"
#include "prometheus_helpers.h"
#include "cpp.h"
#include "MySQL_Session.h"
#include "proxysql_admin.h"
#include "ProxySQL_Data_Stream.h"
#include "MySQL_Data_Stream.h"
#include "query_processor.h"
#include "ProxySQL_HTTP_Server.hpp" // HTTP server
#include "MySQL_Authentication.hpp"
@ -1171,9 +1173,12 @@ int ProxySQL_Test___PurgeDigestTable(bool async_purge, bool parallel, char **msg
int ProxySQL_Test___GenerateRandomQueryInDigestTable(int n) {
//unsigned long long queries=n;
//queries *= 1000;
Client_Session *sess = new Client_Session();
sess->client_myds = new ProxySQL_Data_Stream();
MySQL_Session *sess = new MySQL_Session();
// When the session is destroyed, client_connections is automatically decreased.
// Because this is not a real connection, we artificially increase
// client_connections
__sync_fetch_and_add(&MyHGM->status.client_connections,1);
sess->client_myds = new MySQL_Data_Stream();
sess->client_myds->fd=0;
sess->client_myds->init(MYDS_FRONTEND, sess, sess->client_myds->fd);
MySQL_Connection *myconn=new MySQL_Connection();
@ -1393,7 +1398,7 @@ static admin_main_loop_listeners S_amll;
bool admin_handler_command_kill_connection(char *query_no_space, unsigned int query_no_space_length, Client_Session *sess, ProxySQL_Admin *pa) {
bool admin_handler_command_kill_connection(char *query_no_space, unsigned int query_no_space_length, MySQL_Session *sess, ProxySQL_Admin *pa) {
uint32_t id=atoi(query_no_space+16);
proxy_debug(PROXY_DEBUG_ADMIN, 4, "Trying to kill session %u\n", id);
bool rc=GloPWTH->kill_session(id);
@ -1412,7 +1417,7 @@ bool admin_handler_command_kill_connection(char *query_no_space, unsigned int qu
* returns false if the command is a valid one and is processed
* return true if the command is not a valid one and needs to be executed by SQLite (that will return an error)
*/
bool admin_handler_command_proxysql(char *query_no_space, unsigned int query_no_space_length, Client_Session *sess, ProxySQL_Admin *pa) {
bool admin_handler_command_proxysql(char *query_no_space, unsigned int query_no_space_length, MySQL_Session *sess, ProxySQL_Admin *pa) {
if (!(strncasecmp("PROXYSQL CLUSTER_NODE_UUID ", query_no_space, strlen("PROXYSQL CLUSTER_NODE_UUID ")))) {
int l = strlen("PROXYSQL CLUSTER_NODE_UUID ");
if (sess->client_myds->addr.port == 0) {
@ -1750,7 +1755,7 @@ bool is_valid_global_variable(const char *var_name) {
// multiple variables at once.
//
// It modifies the original query.
bool admin_handler_command_set(char *query_no_space, unsigned int query_no_space_length, Client_Session *sess, ProxySQL_Admin *pa, char **q, unsigned int *ql) {
bool admin_handler_command_set(char *query_no_space, unsigned int query_no_space_length, MySQL_Session *sess, ProxySQL_Admin *pa, char **q, unsigned int *ql) {
if (!strstr(query_no_space,(char *)"password")) { // issue #599
proxy_debug(PROXY_DEBUG_ADMIN, 4, "Received command %s\n", query_no_space);
if (strncasecmp(query_no_space,(char *)"set autocommit",strlen((char *)"set autocommit"))) {
@ -1809,7 +1814,7 @@ bool admin_handler_command_set(char *query_no_space, unsigned int query_no_space
/* Note:
* This function can modify the original query
*/
bool admin_handler_command_load_or_save(char *query_no_space, unsigned int query_no_space_length, Client_Session *sess, ProxySQL_Admin *pa, char **q, unsigned int *ql) {
bool admin_handler_command_load_or_save(char *query_no_space, unsigned int query_no_space_length, MySQL_Session *sess, ProxySQL_Admin *pa, char **q, unsigned int *ql) {
proxy_debug(PROXY_DEBUG_ADMIN, 5, "Received command %s\n", query_no_space);
#ifdef DEBUG
@ -3652,8 +3657,8 @@ std::string timediff_timezone_offset() {
return time_zone_offset;
}
void admin_session_handler(Client_Session *sess, void *_pa, PtrSize_t *pkt) {
void admin_session_handler(Client_Session *c_sess, void *_pa, PtrSize_t *pkt) {
MySQL_Session *sess = (MySQL_Session *)c_sess;
ProxySQL_Admin *pa=(ProxySQL_Admin *)_pa;
bool needs_vacuum = false;
char *error=NULL;
@ -5199,11 +5204,11 @@ void *child_mysql(void *arg) {
mysql_thr->curtime=monotonic_time();
GloQPro->init_thread();
mysql_thr->refresh_variables();
Client_Session *sess=mysql_thr->create_new_session_and_client_data_stream(client);
MySQL_Session *sess = mysql_thr->create_new_session_and_client_mysql_data_stream(client);
sess->thread=mysql_thr;
sess->session_type = PROXYSQL_SESSION_ADMIN;
sess->handler_function=admin_session_handler;
ProxySQL_Data_Stream *myds=sess->client_myds;
MySQL_Data_Stream *myds=sess->client_myds;
sess->start_time=mysql_thr->curtime;

@ -9,6 +9,8 @@
#include "ProxySQL_Cluster.hpp"
#include "MySQL_LDAP_Authentication.hpp"
#include "proxysql_admin.h"
#ifdef DEBUG
#define DEB "_DEBUG"
#else

@ -160,7 +160,7 @@ static enum sslstatus get_sslstatus(SSL* ssl, int n)
}
}
/*
void ProxySQL_Data_Stream::queue_encrypted_bytes(const char *buf, size_t len) {
ssl_write_buf = (char*)realloc(ssl_write_buf, ssl_write_len + len);
memcpy(ssl_write_buf + ssl_write_len, buf, len);
@ -219,7 +219,7 @@ enum sslstatus ProxySQL_Data_Stream::do_ssl_handshake() {
}
status = get_sslstatus(ssl, n);
//proxy_info("SSL status = %d\n", status);
/* Did SSL request to write bytes? */
// Did SSL request to write bytes?
if (status == SSLSTATUS_WANT_IO) {
//proxy_info("SSL status is WANT_IO %d\n", status);
do {
@ -236,6 +236,7 @@ enum sslstatus ProxySQL_Data_Stream::do_ssl_handshake() {
}
return status;
}
*/
// Constructor
ProxySQL_Data_Stream::ProxySQL_Data_Stream() {
@ -251,11 +252,12 @@ ProxySQL_Data_Stream::ProxySQL_Data_Stream() {
proxy_addr.port=0;
sess=NULL;
/*
mysql_real_query.pkt.ptr=NULL;
mysql_real_query.pkt.size=0;
mysql_real_query.QueryPtr=NULL;
mysql_real_query.QuerySize=0;
*/
query_retries_on_failure=0;
connect_retries_on_failure=0;
max_connect_time=0;
@ -273,10 +275,10 @@ ProxySQL_Data_Stream::ProxySQL_Data_Stream() {
resultset=NULL;
queue_init(queueIN,QUEUE_T_DEFAULT_SIZE);
queue_init(queueOUT,QUEUE_T_DEFAULT_SIZE);
mybe=NULL;
// mybe=NULL;
active=1;
mypolls=NULL;
myconn=NULL; // 20141011
// myconn=NULL; // 20141011
DSS=STATE_NOT_CONNECTED;
encrypted=false;
switching_auth_stage = 0;
@ -288,12 +290,14 @@ ProxySQL_Data_Stream::ProxySQL_Data_Stream() {
ssl_write_len = 0;
ssl_write_buf = NULL;
net_failure=false;
/*
CompPktIN.pkt.ptr=NULL;
CompPktIN.pkt.size=0;
CompPktIN.partial=0;
CompPktOUT.pkt.ptr=NULL;
CompPktOUT.pkt.size=0;
CompPktOUT.partial=0;
*/
multi_pkt.ptr=NULL;
multi_pkt.size=0;
@ -301,7 +305,7 @@ ProxySQL_Data_Stream::ProxySQL_Data_Stream() {
statuses.myconnpoll_get = 0;
statuses.myconnpoll_put = 0;
com_field_wild=NULL;
// com_field_wild=NULL;
}
// Destructor
@ -322,13 +326,14 @@ ProxySQL_Data_Stream::~ProxySQL_Data_Stream() {
proxy_addr.addr=NULL;
}
/*
free_mysql_real_query();
if (com_field_wild) {
free(com_field_wild);
com_field_wild=NULL;
}
*/
proxy_debug(PROXY_DEBUG_NET,1, "Shutdown Data Stream. Session=%p, DataStream=%p\n" , sess, this);
PtrSize_t pkt;
if (PSarrayIN) {
@ -355,6 +360,7 @@ ProxySQL_Data_Stream::~ProxySQL_Data_Stream() {
if (mypolls) mypolls->remove_index_fast(poll_fds_idx);
/*
if (fd>0) {
// // Changing logic here. The socket should be closed only if it is not a backend
if (myds_type==MYDS_FRONTEND) {
@ -377,21 +383,14 @@ ProxySQL_Data_Stream::~ProxySQL_Data_Stream() {
SSL_shutdown(ssl);
}
if (ssl) SSL_free(ssl);
/*
SSL_free() should also take care of these
if (rbio_ssl) {
BIO_free(rbio_ssl);
}
if (wbio_ssl) {
BIO_free(wbio_ssl);
}
*/
}
*/
if (multi_pkt.ptr) {
l_free(multi_pkt.size,multi_pkt.ptr);
multi_pkt.ptr=NULL;
multi_pkt.size=0;
}
/*
if (CompPktIN.pkt.ptr) {
l_free(CompPktIN.pkt.size,CompPktIN.pkt.ptr);
CompPktIN.pkt.ptr=NULL;
@ -402,12 +401,14 @@ ProxySQL_Data_Stream::~ProxySQL_Data_Stream() {
CompPktOUT.pkt.ptr=NULL;
CompPktOUT.pkt.size=0;
}
*/
if (x509_subject_alt_name) {
free(x509_subject_alt_name);
x509_subject_alt_name=NULL;
}
}
/*
// this function initializes a ProxySQL_Data_Stream
void ProxySQL_Data_Stream::init() {
if (myds_type!=MYDS_LISTENER) {
@ -422,7 +423,7 @@ void ProxySQL_Data_Stream::init() {
queue_destroy(queueOUT);
}
}
*/
void ProxySQL_Data_Stream::reinit_queues() {
if (queueIN.buffer==NULL)
queue_init(queueIN,QUEUE_T_DEFAULT_SIZE);
@ -430,6 +431,7 @@ void ProxySQL_Data_Stream::reinit_queues() {
queue_init(queueOUT,QUEUE_T_DEFAULT_SIZE);
}
/*
// this function initializes a ProxySQL_Data_Stream with arguments
void ProxySQL_Data_Stream::init(enum MySQL_DS_type _type, Client_Session *_sess, int _fd) {
myds_type=_type;
@ -440,6 +442,7 @@ void ProxySQL_Data_Stream::init(enum MySQL_DS_type _type, Client_Session *_sess,
//if (myconn==NULL) myconn = new MySQL_Connection();
if (myconn) myconn->fd=fd;
}
*/
// Soft shutdown of socket : it only deactivate the data stream
// TODO: should check the status of the data stream, and identify if it is safe to reconnect or if the session should be destroyed
@ -469,6 +472,7 @@ void ProxySQL_Data_Stream::shut_hard() {
}
}
/*
void ProxySQL_Data_Stream::check_data_flow() {
if ( (PSarrayIN->len || queue_data(queueIN) ) && ( PSarrayOUT->len || queue_data(queueOUT) ) ){
// there is data at both sides of the data stream: this is considered a fatal error
@ -525,20 +529,6 @@ int ProxySQL_Data_Stream::read_from_net() {
}
}
} else {
/*
if (!SSL_is_init_finished(ssl)) {
int ret = SSL_do_handshake(ssl);
int ret2;
if (ret != 1) {
//ERR_print_errors_fp(stderr);
ret2 = SSL_get_error(ssl, ret);
fprintf(stderr,"%d\n",ret2);
}
return 0;
} else {
r = SSL_read (ssl, queue_w_ptr(queueIN), s);
}
*/
PROXY_TRACE();
if (s < MY_SSL_BUFFER) {
return 0; // no enough space for reads
@ -587,14 +577,6 @@ int ProxySQL_Data_Stream::read_from_net() {
//proxy_info("Read %d bytes from SSL\n", r);
if (n2 > 0) {
}
/*
do {
n2 = SSL_read(ssl, buf2, sizeof(buf2));
if (n2 > 0) {
}
} while (n > 0);
*/
status = get_sslstatus(ssl, n2);
//proxy_info("SSL status = %d\n", status);
if (status == SSLSTATUS_WANT_IO) {
@ -745,7 +727,7 @@ int ProxySQL_Data_Stream::write_to_net() {
}
return bytes_io;
}
*/
bool ProxySQL_Data_Stream::available_data_out() {
int buflen=queue_data(queueOUT);
if (buflen || PSarrayOUT->len) {
@ -760,6 +742,7 @@ void ProxySQL_Data_Stream::remove_pollout() {
_pollfd->events = 0;
}
/*
void ProxySQL_Data_Stream::set_pollout() {
struct pollfd *_pollfd;
_pollfd=&mypolls->fds[poll_fds_idx];
@ -799,18 +782,6 @@ void ProxySQL_Data_Stream::set_pollout() {
int ProxySQL_Data_Stream::write_to_net_poll() {
int rc=0;
if (active==0) return rc;
/*
if (encrypted && !SSL_is_init_finished(ssl)) {
int ret = SSL_do_handshake(ssl);
int ret2;
if (ret != 1) {
//ERR_print_errors_fp(stderr);
ret2 = SSL_get_error(ssl, ret);
fprintf(stderr,"%d\n",ret2);
}
return 0;
}
*/
if (encrypted) {
if (!SSL_is_init_finished(ssl)) {
//proxy_info("SSL_is_init_finished completed: NO!\n");
@ -822,15 +793,6 @@ int ProxySQL_Data_Stream::write_to_net_poll() {
} else {
//proxy_info("SSL_is_init_finished completed: YES\n");
}
/*
if (!SSL_is_init_finished(ssl)) {
proxy_info("SSL_is_init_finished completed: NO!\n");
if (fd>0 && sess->session_type == PROXYSQL_SESSION_MYSQL) {
set_pollout();
return 0;
}
}
*/
//proxy_info("ssl_write_len: %u\n", ssl_write_len);
if (ssl_write_len) {
int n = write(fd, ssl_write_buf, ssl_write_len);
@ -887,7 +849,6 @@ int ProxySQL_Data_Stream::write_to_net_poll() {
}
return rc;
}
int ProxySQL_Data_Stream::read_pkts() {
int rc=0;
int r=0;
@ -1195,8 +1156,8 @@ void ProxySQL_Data_Stream::generate_compressed_packet() {
l_free(p2.size,p2.ptr);
}
}
*/
/*
int ProxySQL_Data_Stream::array2buffer() {
int ret=0;
unsigned int idx=0;
@ -1275,7 +1236,8 @@ __exit_array2buffer:
}
return ret;
}
*/
/*
unsigned char * ProxySQL_Data_Stream::resultset2buffer(bool del) {
unsigned int i;
unsigned int l=0;
@ -1322,27 +1284,23 @@ void ProxySQL_Data_Stream::buffer2resultset(unsigned char *ptr, unsigned int siz
memcpy((char *)buff + (bl-bf), __ptr, l);
bf -= l;
__ptr+=l;
/*
l=hdr.pkt_length+sizeof(mysql_hdr);
pkt=l_alloc(l);
memcpy(pkt,__ptr,l);
resultset->add(pkt,l);
__ptr+=l;
*/
}
if (buff) {
// last buffer to add
resultset->add(buff,bl-bf);
}
};
*/
/*
int ProxySQL_Data_Stream::array2buffer_full() {
int rc=0;
int r=0;
while((r=array2buffer())) rc+=r;
return rc;
}
*/
/*
int ProxySQL_Data_Stream::assign_fd_from_mysql_conn() {
assert(myconn);
//proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, myds=%p, oldFD=%d, newFD=%d\n", this->sess, this, fd, myconn->myconn.net.fd);
@ -1359,6 +1317,7 @@ void ProxySQL_Data_Stream::unplug_backend() {
mypolls=NULL;
fd=0;
}
*/
void ProxySQL_Data_Stream::set_net_failure() {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, myds=%p , myds_type:%d\n", this->sess, this, myds_type);
@ -1375,6 +1334,7 @@ void ProxySQL_Data_Stream::setDSS_STATE_QUERY_SENT_NET() {
DSS=STATE_QUERY_SENT_NET;
}
/*
void ProxySQL_Data_Stream::return_MySQL_Connection_To_Pool() {
MySQL_Connection *mc=myconn;
mc->last_time_used=sess->thread->curtime;
@ -1419,12 +1379,14 @@ void ProxySQL_Data_Stream::free_mysql_real_query() {
mysql_real_query.end();
}
}
*/
void ProxySQL_Data_Stream::destroy_queues() {
queue_destroy(queueIN);
queue_destroy(queueOUT);
}
/*
void ProxySQL_Data_Stream::destroy_MySQL_Connection_From_Pool(bool sq) {
MySQL_Connection *mc=myconn;
mc->last_time_used=sess->thread->curtime;
@ -1433,6 +1395,7 @@ void ProxySQL_Data_Stream::destroy_MySQL_Connection_From_Pool(bool sq) {
mc->send_quit=sq;
MyHGM->destroy_MyConn_from_pool(mc);
}
*/
bool ProxySQL_Data_Stream::data_in_rbio() {
if (rbio_ssl->num_write > rbio_ssl->num_read) {

@ -10,6 +10,8 @@
#include "SQLite3_Server.h"
#include "MySQL_Authentication.hpp"
#include "proxysql_admin.h"
#include <search.h>
#include <stdlib.h>
#include <stdio.h>

@ -8,6 +8,8 @@
#include "ProxySQL_RESTAPI_Server.hpp"
#include "proxysql_utils.h"
#include "proxysql_admin.h"
using namespace httpserver;

@ -12,6 +12,8 @@
#include "re2/regexp.h"
#include "ProxySQL_Data_Stream.h"
#include "MySQL_Data_Stream.h"
#include "MySQL_Session.h"
#include "query_processor.h"
#include "StatCounters.h"
#include "MySQL_PreparedStatement.h"
@ -2801,11 +2803,11 @@ ProxyWorker_Thread::~ProxyWorker_Thread() {
}
Client_Session * ProxyWorker_Thread::create_new_session_and_client_data_stream(int _fd) {
MySQL_Session * ProxyWorker_Thread::create_new_session_and_client_mysql_data_stream(int _fd) {
int arg_on=1;
Client_Session *sess=new Client_Session;
MySQL_Session *sess=new MySQL_Session;
register_session(sess); // register session
sess->client_myds = new ProxySQL_Data_Stream();
sess->client_myds = new MySQL_Data_Stream();
sess->client_myds->fd=_fd;
setsockopt(sess->client_myds->fd, IPPROTO_TCP, TCP_NODELAY, (char *) &arg_on, sizeof(arg_on));
@ -2945,9 +2947,9 @@ void ProxyWorker_Thread::run___get_multiple_idle_connections(int& num_idles) {
int i;
num_idles=MyHGM->get_multiple_idle_connections(-1, curtime-mysql_thread___ping_interval_server_msec*1000, my_idle_conns, SESSIONS_FOR_CONNECTIONS_HANDLER);
for (i=0; i<num_idles; i++) {
ProxySQL_Data_Stream *myds;
MySQL_Data_Stream *myds;
MySQL_Connection *mc=my_idle_conns[i];
Client_Session *sess=new Client_Session();
MySQL_Session *sess=new MySQL_Session();
sess->mybe=sess->find_or_create_mysql_backend(mc->parent->myhgc->hid);
myds=sess->mybe->server_myds;
@ -3012,7 +3014,7 @@ void ProxyWorker_Thread::ProcessAllMyDS_BeforePoll() {
}
myds->revents=0;
if (myds->myds_type!=MYDS_LISTENER) {
configure_pollout(myds, n);
configure_pollout((MySQL_Data_Stream *)myds, n);
}
}
proxy_debug(PROXY_DEBUG_NET,1,"Poll for DataStream=%p will be called with FD=%d and events=%d\n", mypolls.myds[n], mypolls.fds[n].fd, mypolls.fds[n].events);
@ -3355,10 +3357,10 @@ void ProxyWorker_Thread::idle_thread_to_kill_idle_sessions() {
}
for (i=0;i<SESS_TO_SCAN && mysess_idx < mysql_sessions->len; i++) {
uint32_t sess_pos=mysess_idx;
Client_Session *mysess=(Client_Session *)mysql_sessions->index(sess_pos);
MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(sess_pos);
if (mysess->idle_since < min_idle || mysess->killed==true) {
mysess->killed=true;
ProxySQL_Data_Stream *tmp_myds=mysess->client_myds;
MySQL_Data_Stream *tmp_myds=mysess->client_myds;
int dsidx=tmp_myds->poll_fds_idx;
//fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx);
mypolls.remove_index_fast(dsidx);
@ -3368,7 +3370,7 @@ void ProxyWorker_Thread::idle_thread_to_kill_idle_sessions() {
sessmap.erase(mysess->thread_session_id);
if (mysql_sessions->len > 1) {
// take the last element and adjust the map
Client_Session *mysess_last=(Client_Session *)mysql_sessions->index(mysql_sessions->len-1);
MySQL_Session *mysess_last=(MySQL_Session *)mysql_sessions->index(mysql_sessions->len-1);
if (mysess->thread_session_id != mysess_last->thread_session_id)
sessmap[mysess_last->thread_session_id]=sess_pos;
}
@ -3385,8 +3387,8 @@ void ProxyWorker_Thread::idle_thread_prepares_session_to_send_to_worker_thread(i
if (events[i].events) {
uint32_t sess_thr_id=events[i].data.u32;
uint32_t sess_pos=sessmap[sess_thr_id];
Client_Session *mysess=(Client_Session *)mysql_sessions->index(sess_pos);
ProxySQL_Data_Stream *tmp_myds=mysess->client_myds;
MySQL_Session *mysess=(MySQL_Session *)mysql_sessions->index(sess_pos);
MySQL_Data_Stream *tmp_myds=mysess->client_myds;
int dsidx=tmp_myds->poll_fds_idx;
//fprintf(stderr,"Removing session %p, DS %p idx %d\n",mysess,tmp_myds,dsidx);
mypolls.remove_index_fast(dsidx);
@ -3473,7 +3475,7 @@ void ProxyWorker_Thread::worker_thread_gets_sessions_from_idle_thread() {
while (myexchange.resume_mysql_sessions->len) {
Client_Session *mysess=(Client_Session *)myexchange.resume_mysql_sessions->remove_index_fast(0);
register_session(mysess, false);
ProxySQL_Data_Stream *myds=mysess->client_myds;
ProxySQL_Data_Stream *myds=((MySQL_Session *)mysess)->client_myds;
mypolls.add(POLLIN, myds->fd, myds, monotonic_time());
}
}
@ -3528,10 +3530,11 @@ bool ProxyWorker_Thread::process_data_on_mysql_data_stream(ProxySQL_Data_Stream
//
// this can happen, for example, with a low wait_timeout and running transaction
if (myds->sess->status==WAITING_CLIENT_DATA) {
if (myds->myconn->async_state_machine==ASYNC_IDLE) {
proxy_warning("Detected broken idle connection on %s:%d\n", myds->myconn->parent->address, myds->myconn->parent->port);
myds->destroy_MySQL_Connection_From_Pool(false);
myds->sess->set_unhealthy();
MySQL_Data_Stream *_myds = (MySQL_Data_Stream *)myds;
if (_myds->myconn->async_state_machine==ASYNC_IDLE) {
proxy_warning("Detected broken idle connection on %s:%d\n", _myds->myconn->parent->address, _myds->myconn->parent->port);
_myds->destroy_MySQL_Connection_From_Pool(false);
_myds->sess->set_unhealthy();
return false;
}
}
@ -3543,14 +3546,15 @@ bool ProxyWorker_Thread::process_data_on_mysql_data_stream(ProxySQL_Data_Stream
// only if we aren't using MariaDB Client Library
int rb = 0;
do {
rb = myds->read_from_net();
if (rb > 0 && myds->myds_type == MYDS_FRONTEND) {
MySQL_Data_Stream *_myds = (MySQL_Data_Stream *)myds;
rb = _myds->read_from_net();
if (rb > 0 && _myds->myds_type == MYDS_FRONTEND) {
status_variables.stvar[st_var_queries_frontends_bytes_recv] += rb;
}
myds->read_pkts();
_myds->read_pkts();
if (rb > 0 && myds->myds_type == MYDS_BACKEND) {
if (myds->sess->session_fast_forward) {
if (rb > 0 && _myds->myds_type == MYDS_BACKEND) {
if (_myds->sess->session_fast_forward) {
struct pollfd _fds;
nfds_t _nfds = 1;
_fds.fd = mypolls.fds[n].fd;
@ -3559,7 +3563,7 @@ bool ProxyWorker_Thread::process_data_on_mysql_data_stream(ProxySQL_Data_Stream
int _rc = poll(&_fds, _nfds, 0);
if ((_rc > 0) && _fds.revents == POLLIN) {
// there is more data
myds->revents = _fds.revents;
_myds->revents = _fds.revents;
} else {
rb = 0; // exit loop
}
@ -3568,10 +3572,10 @@ bool ProxyWorker_Thread::process_data_on_mysql_data_stream(ProxySQL_Data_Stream
}
} else {
bool set_rb_zero = true;
if (rb > 0 && myds->myds_type == MYDS_FRONTEND) {
if (myds->encrypted == true) {
if (SSL_is_init_finished(myds->ssl)) {
if (myds->data_in_rbio()) {
if (rb > 0 && _myds->myds_type == MYDS_FRONTEND) {
if (_myds->encrypted == true) {
if (SSL_is_init_finished(_myds->ssl)) {
if (_myds->data_in_rbio()) {
set_rb_zero = false;
}
}
@ -3584,7 +3588,8 @@ bool ProxyWorker_Thread::process_data_on_mysql_data_stream(ProxySQL_Data_Stream
} else {
if (mypolls.fds[n].revents) {
myds->myconn->handler(mypolls.fds[n].revents);
MySQL_Data_Stream *_myds = (MySQL_Data_Stream *)myds;
_myds->myconn->handler(mypolls.fds[n].revents);
}
}
if ( (mypolls.fds[n].events & POLLOUT)
@ -3593,20 +3598,21 @@ bool ProxyWorker_Thread::process_data_on_mysql_data_stream(ProxySQL_Data_Stream
) {
myds->set_net_failure();
}
myds->check_data_flow();
((MySQL_Data_Stream *)myds)->check_data_flow();
}
if (myds->active==0) {
if (myds->sess->client_myds==myds) {
proxy_debug(PROXY_DEBUG_NET,1, "Session=%p, DataStream=%p -- Deleting FD %d\n", myds->sess, myds, myds->fd);
myds->sess->set_unhealthy();
MySQL_Session *mysess = (MySQL_Session *)(myds->sess);
if (mysess->client_myds==myds) {
proxy_debug(PROXY_DEBUG_NET,1, "Session=%p, DataStream=%p -- Deleting FD %d\n", mysess, myds, myds->fd);
mysess->set_unhealthy();
} else {
// if this is a backend with fast_forward, set unhealthy
// if this is a backend without fast_forward, do not set unhealthy: it will be handled by client library
if (myds->sess->session_fast_forward) { // if fast forward
if (mysess->session_fast_forward) { // if fast forward
if (myds->myds_type==MYDS_BACKEND) { // and backend
myds->sess->set_unhealthy(); // set unhealthy
mysess->set_unhealthy(); // set unhealthy
}
}
}
@ -3664,7 +3670,7 @@ void ProxyWorker_Thread::ProcessAllSessions_CompletedMirrorSession(unsigned int&
// this function was inline in ProxyWorker_Thread::process_all_sessions()
void ProxyWorker_Thread::ProcessAllSessions_MaintenanceLoop(Client_Session *sess, unsigned long long sess_time, unsigned int& total_active_transactions_) {
void ProxyWorker_Thread::ProcessAllSessions_MaintenanceLoop(MySQL_Session *sess, unsigned long long sess_time, unsigned int& total_active_transactions_) {
unsigned int numTrx=0;
sess->active_transactions=sess->NumActiveTransactions();
{
@ -3759,7 +3765,11 @@ void ProxyWorker_Thread::process_all_sessions() {
ProcessAllSessions_SortingSessions();
}
for (n=0; n<mysql_sessions->len; n++) {
Client_Session *sess=(Client_Session *)mysql_sessions->index(n);
Client_Session *c_sess=(Client_Session *)mysql_sessions->index(n);
if (c_sess->session_type != PROXYSQL_SESSION_MYSQL) {
continue;
}
MySQL_Session *sess = (MySQL_Session *)c_sess;
#ifdef DEBUG
if(sess==sess_stopat) {
sess_stopat=sess;
@ -3787,7 +3797,9 @@ void ProxyWorker_Thread::process_all_sessions() {
if (idle_maintenance_thread==false)
#endif // IDLE_THREADS
{
ProcessAllSessions_MaintenanceLoop(sess, sess_time, total_active_transactions_);
if (sess->session_type==PROXYSQL_SESSION_MYSQL) {
ProcessAllSessions_MaintenanceLoop(sess, sess_time, total_active_transactions_);
}
}
#ifdef IDLE_THREADS
else
@ -4172,7 +4184,7 @@ void ProxyWorker_Thread::listener_handle_new_connection(ProxySQL_Data_Stream *my
// create a new client connection
mypolls.fds[n].revents=0;
Client_Session *sess=create_new_session_and_client_data_stream(c);
MySQL_Session *sess=create_new_session_and_client_mysql_data_stream(c);
__sync_add_and_fetch(&MyHGM->status.client_connections_created,1);
if (__sync_add_and_fetch(&MyHGM->status.client_connections,1) > mysql_thread___max_connections) {
sess->max_connections_reached=true;
@ -4621,7 +4633,11 @@ SQLite3_result * ProxyWorker_Threads_Handler::SQL3_Processlist() {
pthread_mutex_lock(&thr->thread_mutex);
unsigned int j;
for (j=0; j<thr->mysql_sessions->len; j++) {
Client_Session *sess=(Client_Session *)thr->mysql_sessions->pdata[j];
Client_Session *c_sess=(Client_Session *)thr->mysql_sessions->pdata[j];
if (c_sess->session_type != PROXYSQL_SESSION_MYSQL) {
continue;
}
MySQL_Session *sess = (MySQL_Session *)c_sess;
if (sess->client_myds) {
char buf[1024];
char **pta=(char **)malloc(sizeof(char *)*colnum);
@ -5231,9 +5247,13 @@ void ProxyWorker_Thread::Get_Memory_Stats() {
}
MySQL_Connection * ProxyWorker_Thread::get_MyConn_local(unsigned int _hid, Client_Session *sess, char *gtid_uuid, uint64_t gtid_trxid, int max_lag_ms) {
MySQL_Connection * ProxyWorker_Thread::get_MyConn_local(unsigned int _hid, Client_Session *_sess, char *gtid_uuid, uint64_t gtid_trxid, int max_lag_ms) {
// some sanity check
if (sess == NULL) return NULL;
if (_sess == NULL) return NULL;
if (_sess->session_type != PROXYSQL_SESSION_MYSQL) {
return NULL;
}
MySQL_Session *sess = (MySQL_Session *)_sess;
if (sess->client_myds == NULL) return NULL;
if (sess->client_myds->myconn == NULL) return NULL;
if (sess->client_myds->myconn->userinfo == NULL) return NULL;
@ -5360,7 +5380,11 @@ void ProxyWorker_Thread::Scan_Sessions_to_Kill_All() {
void ProxyWorker_Thread::Scan_Sessions_to_Kill(PtrArray *mysess) {
for (unsigned int n=0; n<mysess->len && ( kq.conn_ids.size() + kq.query_ids.size() ) ; n++) {
Client_Session *_sess=(Client_Session *)mysess->index(n);
Client_Session *c_sess=(Client_Session *)mysess->index(n);
if (c_sess->session_type != PROXYSQL_SESSION_MYSQL) {
continue;
}
MySQL_Session *_sess = (MySQL_Session *)c_sess;
bool cont=true;
for (std::vector<thr_id_usr *>::iterator it=kq.conn_ids.begin(); cont && it!=kq.conn_ids.end(); ++it) {
thr_id_usr *t = *it;
@ -5405,9 +5429,13 @@ bool ProxyWorker_Thread::move_session_to_idle_mysql_sessions(ProxySQL_Data_Strea
if (_tmp_idle < ( (curtime > (unsigned int)mysql_thread___session_idle_ms * 1000) ? (curtime - mysql_thread___session_idle_ms * 1000) : 0)) {
// make sure data stream has no pending data out and session is not throttled (#1939)
// because epoll thread does not handle data stream with data out
if (myds->sess->client_myds == myds && !myds->available_data_out() && myds->sess->pause_until <= curtime) {
if (myds->sess->session_type != PROXYSQL_SESSION_MYSQL) {
return false;
}
MySQL_Session *mysess = (MySQL_Session *)(myds->sess);
if (mysess->client_myds == myds && !myds->available_data_out() && mysess->pause_until <= curtime) {
//unsigned int j;
bool has_backends = myds->sess->has_any_backend();
bool has_backends = mysess->has_any_backend();
/*
for (j=0;j<myds->sess->mybes->len;j++) {
MySQL_Backend *tmp_mybe=(MySQL_Backend *)myds->sess->mybes->index(j);
@ -5418,14 +5446,14 @@ bool ProxyWorker_Thread::move_session_to_idle_mysql_sessions(ProxySQL_Data_Strea
}
*/
if (has_backends==false) {
unsigned long long idle_since = curtime - myds->sess->IdleTime();
unsigned long long idle_since = curtime - mysess->IdleTime();
mypolls.remove_index_fast(n);
myds->mypolls=NULL;
unsigned int i = find_session_idx_in_mysql_sessions(myds->sess);
myds->sess->thread=NULL;
unsigned int i = find_session_idx_in_mysql_sessions(mysess);
mysess->thread=NULL;
unregister_session(i);
myds->sess->idle_since = idle_since;
idle_mysql_sessions->add(myds->sess);
mysess->idle_since = idle_since;
idle_mysql_sessions->add(mysess);
return true;
}
}
@ -5435,10 +5463,14 @@ bool ProxyWorker_Thread::move_session_to_idle_mysql_sessions(ProxySQL_Data_Strea
#endif // IDLE_THREADS
bool ProxyWorker_Thread::set_backend_to_be_skipped_if_frontend_is_slow(ProxySQL_Data_Stream *myds, unsigned int n) {
if (myds->sess && myds->sess->client_myds && myds->sess->mirror==false) {
if (myds->sess->session_type != PROXYSQL_SESSION_MYSQL) {
return false;
}
MySQL_Session *mysess = (MySQL_Session *)(myds->sess);
if (mysess && mysess->client_myds && mysess->mirror==false) {
unsigned int buffered_data=0;
buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN;
buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN;
buffered_data = mysess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN;
buffered_data += mysess->client_myds->resultset->len * RESULTSET_BUFLEN;
// we pause receiving from backend at mysql_thread___threshold_resultset_size * 8
// but assuming that client isn't completely blocked, we will stop checking for data
// only at mysql_thread___threshold_resultset_size * 4
@ -5454,9 +5486,9 @@ bool ProxyWorker_Thread::set_backend_to_be_skipped_if_frontend_is_slow(ProxySQL_
void ProxyWorker_Thread::idle_thread_gets_sessions_from_worker_thread() {
pthread_mutex_lock(&myexchange.mutex_idles);
while (myexchange.idle_mysql_sessions->len) {
Client_Session *mysess=(Client_Session *)myexchange.idle_mysql_sessions->remove_index_fast(0);
MySQL_Session *mysess=(MySQL_Session *)myexchange.idle_mysql_sessions->remove_index_fast(0);
register_session(mysess, false);
ProxySQL_Data_Stream *myds=mysess->client_myds;
MySQL_Data_Stream *myds=mysess->client_myds;
mypolls.add(POLLIN, myds->fd, myds, monotonic_time());
// add in epoll()
struct epoll_event event;
@ -5541,15 +5573,18 @@ void ProxyWorker_Thread::check_for_invalid_fd(unsigned int n) {
// check if the FD is valid
if (mypolls.fds[n].revents==POLLNVAL) {
// debugging output before assert
ProxySQL_Data_Stream *_myds=mypolls.myds[n];
if (_myds) {
if (_myds->myconn) {
proxy_error("revents==POLLNVAL for FD=%d, events=%d, MyDSFD=%d, MyConnFD=%d\n", mypolls.fds[n].fd, mypolls.fds[n].events, _myds->fd, _myds->myconn->fd);
assert(mypolls.fds[n].revents!=POLLNVAL);
ProxySQL_Data_Stream *pds=mypolls.myds[n];
if (pds) {
if (pds->sess->session_type == PROXYSQL_SESSION_MYSQL) {
MySQL_Data_Stream * _myds = (MySQL_Data_Stream *)pds;
if (_myds->myconn) {
proxy_error("revents==POLLNVAL for FD=%d, events=%d, MyDSFD=%d, MyConnFD=%d\n", mypolls.fds[n].fd, mypolls.fds[n].events, _myds->fd, _myds->myconn->fd);
assert(mypolls.fds[n].revents!=POLLNVAL);
}
}
}
// if we reached her, we didn't assert() yet
proxy_error("revents==POLLNVAL for FD=%d, events=%d, MyDSFD=%d\n", mypolls.fds[n].fd, mypolls.fds[n].events, _myds->fd);
proxy_error("revents==POLLNVAL for FD=%d, events=%d, MyDSFD=%d\n", mypolls.fds[n].fd, mypolls.fds[n].events, pds->fd);
assert(mypolls.fds[n].revents!=POLLNVAL);
}
}
@ -5590,13 +5625,14 @@ void ProxyWorker_Thread::tune_timeout_for_session_needs_pause(ProxySQL_Data_Stre
}
}
void ProxyWorker_Thread::configure_pollout(ProxySQL_Data_Stream *myds, unsigned int n) {
void ProxyWorker_Thread::configure_pollout(MySQL_Data_Stream *myds, unsigned int n) {
if (myds->myds_type==MYDS_FRONTEND && myds->DSS==STATE_SLEEP && myds->sess && myds->sess->status==WAITING_CLIENT_DATA) {
myds->set_pollout();
} else {
if (myds->DSS > STATE_MARIADB_BEGIN && myds->DSS < STATE_MARIADB_END) {
mypolls.fds[n].events = POLLIN;
if (mypolls.myds[n]->myconn->async_exit_status & MYSQL_WAIT_WRITE)
assert(myds == mypolls.myds[n]);
if (myds->myconn->async_exit_status & MYSQL_WAIT_WRITE)
mypolls.fds[n].events |= POLLOUT;
} else {
myds->set_pollout();

@ -6,6 +6,8 @@
#include "proxysql_atomic.h"
#include "SpookyV2.h"
#include "prometheus_helpers.h"
#include "ProxySQL_Data_Stream.h"
#include "MySQL_Data_Stream.h"
#include "MySQL_Protocol.h"
#define THR_UPDATE_CNT(__a, __b, __c, __d) \

@ -8,6 +8,8 @@
#include "MySQL_PreparedStatement.h"
#include "ProxySQL_Data_Stream.h"
#include "MySQL_Data_Stream.h"
#include "MySQL_Session.h"
#include "query_processor.h"
#include "SpookyV2.h"
@ -1432,6 +1434,10 @@ Query_Processor_Output * Query_Processor::process_mysql_query(Client_Session *se
//}
pthread_rwlock_unlock(&rwlock);
}
MySQL_Data_Stream * client_myds = NULL;
if (sess->session_type==PROXYSQL_SESSION_MYSQL) {
client_myds = ((MySQL_Session *)sess)->client_myds;
}
QP_rule_t *qr = NULL;
re2_t *re2p;
int flagIN=0;
@ -1462,13 +1468,13 @@ __internal_loop:
continue;
}
if (qr->username && strlen(qr->username)) {
if (strcmp(qr->username,sess->client_myds->myconn->userinfo->username)!=0) {
if ( client_myds!= NULL && strcmp(qr->username,client_myds->myconn->userinfo->username)!=0) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has no matching username\n", qr->rule_id);
continue;
}
}
if (qr->schemaname && strlen(qr->schemaname)) {
if (strcmp(qr->schemaname,sess->client_myds->myconn->userinfo->schemaname)!=0) {
if ( client_myds!= NULL && strcmp(qr->schemaname,client_myds->myconn->userinfo->schemaname)!=0) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has no matching schemaname\n", qr->rule_id);
continue;
}
@ -1476,9 +1482,9 @@ __internal_loop:
// match on client address
if (qr->client_addr && strlen(qr->client_addr)) {
if (sess->client_myds->addr.addr) {
if (client_myds!= NULL && client_myds->addr.addr) {
if (qr->client_addr_wildcard_position == -1) { // no wildcard , old algorithm
if (strcmp(qr->client_addr,sess->client_myds->addr.addr)!=0) {
if (strcmp(qr->client_addr,client_myds->addr.addr)!=0) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has no matching client_addr\n", qr->rule_id);
continue;
}
@ -1486,7 +1492,7 @@ __internal_loop:
// catch all!
// therefore we have a match
} else { // client_addr_wildcard_position > 0
if (strncmp(qr->client_addr,sess->client_myds->addr.addr,qr->client_addr_wildcard_position)!=0) {
if (strncmp(qr->client_addr,client_myds->addr.addr,qr->client_addr_wildcard_position)!=0) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has no matching client_addr\n", qr->rule_id);
continue;
}
@ -1495,9 +1501,9 @@ __internal_loop:
}
// match on proxy_addr
if (qr->proxy_addr && strlen(qr->proxy_addr)) {
if (sess->client_myds->proxy_addr.addr) {
if (strcmp(qr->proxy_addr,sess->client_myds->proxy_addr.addr)!=0) {
if (client_myds!= NULL && qr->proxy_addr && strlen(qr->proxy_addr)) {
if (client_myds->proxy_addr.addr) {
if (strcmp(qr->proxy_addr,client_myds->proxy_addr.addr)!=0) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has no matching proxy_addr\n", qr->rule_id);
continue;
}
@ -1505,8 +1511,8 @@ __internal_loop:
}
// match on proxy_port
if (qr->proxy_port>=0) {
if (qr->proxy_port!=sess->client_myds->proxy_addr.port) {
if (client_myds!= NULL && qr->proxy_port>=0) {
if (qr->proxy_port!=client_myds->proxy_addr.port) {
proxy_debug(PROXY_DEBUG_MYSQL_QUERY_PROCESSOR, 5, "query rule %d has no matching proxy_port\n", qr->rule_id);
continue;
}
@ -1701,8 +1707,8 @@ __exit_process_mysql_query:
if (_thr___rules_fast_routing___keys_values) {
char keybuf[256];
char * keybuf_ptr = keybuf;
const char * u = sess->client_myds->myconn->userinfo->username;
const char * s = sess->client_myds->myconn->userinfo->schemaname;
const char * u = client_myds->myconn->userinfo->username;
const char * s = client_myds->myconn->userinfo->schemaname;
size_t keylen = strlen(u)+strlen(rand_del)+strlen(s)+30; // 30 is a big number
if (keylen > 250) {
keybuf_ptr = (char *)malloc(keylen);
@ -1742,13 +1748,13 @@ __exit_process_mysql_query:
char *username = NULL;
char *client_address = NULL;
bool check_run = true;
if (sess->client_myds) {
if (client_myds != NULL) {
check_run = false;
if (sess->client_myds->myconn && sess->client_myds->myconn->userinfo && sess->client_myds->myconn->userinfo->username) {
if (sess->client_myds->addr.addr) {
if (client_myds->myconn && client_myds->myconn->userinfo && client_myds->myconn->userinfo->username) {
if (client_myds->addr.addr) {
check_run = true;
username = sess->client_myds->myconn->userinfo->username;
client_address = sess->client_myds->addr.addr;
username = client_myds->myconn->userinfo->username;
client_address = client_myds->addr.addr;
pthread_mutex_lock(&global_mysql_firewall_whitelist_mutex);
// FIXME
// for now this function search for either username@ip or username@''
@ -1763,7 +1769,7 @@ __exit_process_mysql_query:
ret->firewall_whitelist_mode = wus_status;
if (wus_status == WUS_DETECTING || wus_status == WUS_PROTECTING) {
bool allowed_query = false;
char * schemaname = sess->client_myds->myconn->userinfo->schemaname;
char * schemaname = client_myds->myconn->userinfo->schemaname;
if (qp && qp->digest) {
allowed_query = find_firewall_whitelist_rule(username, client_address, schemaname, flagIN, qp->digest);
}
@ -1791,7 +1797,7 @@ __exit_process_mysql_query:
if (ret->firewall_whitelist_mode == WUS_DETECTING) {
action = (char *)"detected unknown";
}
proxy_warning("Firewall %s query with digest %s from user %s@%s\n", action, buf, username, sess->client_myds->addr.addr);
proxy_warning("Firewall %s query with digest %s from user %s@%s\n", action, buf, username, client_myds->addr.addr);
}
}
}
@ -1926,10 +1932,14 @@ unsigned long long Query_Processor::query_parser_update_counters(Client_Session
unsigned long long ret=_thr_commands_counters[c]->add_time(t);
char *ca = (char *)"";
MySQL_Data_Stream * client_myds = NULL;
if (sess->session_type==PROXYSQL_SESSION_MYSQL) {
client_myds = ((MySQL_Session *)sess)->client_myds;
}
if (mysql_thread___query_digests_track_hostname) {
if (sess->client_myds) {
if (sess->client_myds->addr.addr) {
ca = sess->client_myds->addr.addr;
if (client_myds) {
if (client_myds->addr.addr) {
ca = client_myds->addr.addr;
}
}
}
@ -1940,10 +1950,10 @@ unsigned long long Query_Processor::query_parser_update_counters(Client_Session
SpookyHash myhash;
myhash.Init(19,3);
assert(sess);
assert(sess->client_myds);
assert(sess->client_myds->myconn);
assert(sess->client_myds->myconn->userinfo);
MySQL_Connection_userinfo *ui=sess->client_myds->myconn->userinfo;
assert(client_myds);
assert(client_myds->myconn);
assert(client_myds->myconn->userinfo);
MySQL_Connection_userinfo *ui=client_myds->myconn->userinfo;
assert(ui->username);
assert(ui->schemaname);
myhash.Update(ui->username,strlen(ui->username));
@ -1959,10 +1969,10 @@ unsigned long long Query_Processor::query_parser_update_counters(Client_Session
SpookyHash myhash;
myhash.Init(19,3);
assert(sess);
assert(sess->client_myds);
assert(sess->client_myds->myconn);
assert(sess->client_myds->myconn->userinfo);
MySQL_Connection_userinfo *ui=sess->client_myds->myconn->userinfo;
assert(client_myds);
assert(client_myds->myconn);
assert(client_myds->myconn->userinfo);
MySQL_Connection_userinfo *ui=client_myds->myconn->userinfo;
assert(ui->username);
assert(ui->schemaname);
MySQL_STMT_Global_info *stmt_info=sess->CurrentQuery.stmt_info;
@ -1979,6 +1989,10 @@ unsigned long long Query_Processor::query_parser_update_counters(Client_Session
}
void Query_Processor::update_query_digest(SQP_par_t *qp, int hid, MySQL_Connection_userinfo *ui, unsigned long long t, unsigned long long n, MySQL_STMT_Global_info *_stmt_info, Client_Session *sess) {
MySQL_Data_Stream * client_myds = NULL;
if (sess->session_type==PROXYSQL_SESSION_MYSQL) {
client_myds = ((MySQL_Session *)sess)->client_myds;
}
pthread_rwlock_wrlock(&digest_rwlock);
QP_query_digest_stats *qds;
@ -2007,9 +2021,9 @@ void Query_Processor::update_query_digest(SQP_par_t *qp, int hid, MySQL_Connecti
}
char *ca = (char *)"";
if (mysql_thread___query_digests_track_hostname) {
if (sess->client_myds) {
if (sess->client_myds->addr.addr) {
ca = sess->client_myds->addr.addr;
if (client_myds) {
if (client_myds->addr.addr) {
ca = client_myds->addr.addr;
}
}
}

@ -1,14 +1,7 @@
#include "proxysql.h"
#include "cpp.h"
#include "ProxySQL_Data_Stream.h"
void * MySQL_Backend::operator new(size_t size) {
return l_alloc(size);
}
void MySQL_Backend::operator delete(void *ptr) {
l_free(sizeof(MySQL_Backend),ptr);
}
#include "MySQL_Data_Stream.h"
MySQL_Backend::MySQL_Backend() {
hostgroup_id=-1;

@ -7,8 +7,10 @@
#include "MySQL_PreparedStatement.h"
#include "ProxySQL_Data_Stream.h"
#include "MySQL_Data_Stream.h"
#include "query_processor.h"
#include "MySQL_Variables.h"
#include "MySQL_Session.h"
#include <atomic>
@ -530,14 +532,14 @@ unsigned int MySQL_Connection::set_charset(unsigned int _c, enum charset_action
// SQL_CHARACTER_SET should be set befor setting SQL_CHRACTER_ACTION
std::stringstream ss;
ss << _c;
mysql_variables.client_set_value(myds->sess, SQL_CHARACTER_SET, ss.str());
mysql_variables.client_set_value((MySQL_Session *)(myds->sess), SQL_CHARACTER_SET, ss.str());
// When SQL_CHARACTER_ACTION is set character set variables are set according to
// SQL_CHRACTER_SET value
ss.str(std::string());
ss.clear();
ss << action;
mysql_variables.client_set_value(myds->sess, SQL_CHARACTER_ACTION, ss.str());
mysql_variables.client_set_value((MySQL_Session *)(myds->sess), SQL_CHARACTER_ACTION, ss.str());
return _c;
}
@ -718,7 +720,7 @@ void MySQL_Connection::connect_start() {
mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (void *)&timeout);
/* Take client character set and use it to connect to backend */
if (myds && myds->sess) {
csname = mysql_variables.client_get_value(myds->sess, SQL_CHARACTER_SET);
csname = mysql_variables.client_get_value((MySQL_Session *)(myds->sess), SQL_CHARACTER_SET);
}
const MARIADB_CHARSET_INFO * c = NULL;
@ -741,11 +743,11 @@ void MySQL_Connection::connect_start() {
std::stringstream ss;
ss << c->nr;
mysql_variables.server_set_value(myds->sess, SQL_CHARACTER_SET, ss.str().c_str());
mysql_variables.server_set_value(myds->sess, SQL_CHARACTER_SET_RESULTS, ss.str().c_str());
mysql_variables.server_set_value(myds->sess, SQL_CHARACTER_SET_CLIENT, ss.str().c_str());
mysql_variables.server_set_value(myds->sess, SQL_CHARACTER_SET_CONNECTION, ss.str().c_str());
mysql_variables.server_set_value(myds->sess, SQL_COLLATION_CONNECTION, ss.str().c_str());
mysql_variables.server_set_value((MySQL_Session *)(myds->sess), SQL_CHARACTER_SET, ss.str().c_str());
mysql_variables.server_set_value((MySQL_Session *)(myds->sess), SQL_CHARACTER_SET_RESULTS, ss.str().c_str());
mysql_variables.server_set_value((MySQL_Session *)(myds->sess), SQL_CHARACTER_SET_CLIENT, ss.str().c_str());
mysql_variables.server_set_value((MySQL_Session *)(myds->sess), SQL_CHARACTER_SET_CONNECTION, ss.str().c_str());
mysql_variables.server_set_value((MySQL_Session *)(myds->sess), SQL_COLLATION_CONNECTION, ss.str().c_str());
}
//mysql_options(mysql, MYSQL_SET_CHARSET_NAME, c->csname);
mysql->charset = c;
@ -755,9 +757,10 @@ void MySQL_Connection::connect_start() {
if (myds) {
if (myds->sess) {
if (myds->sess->client_myds) {
if (myds->sess->client_myds->myconn) {
uint32_t orig_client_flags = myds->sess->client_myds->myconn->options.client_flag;
MySQL_Session * mysess = (MySQL_Session *)myds->sess;
if (mysess->client_myds) {
if (mysess->client_myds->myconn) {
uint32_t orig_client_flags = mysess->client_myds->myconn->options.client_flag;
if (orig_client_flags & CLIENT_FOUND_ROWS) {
client_flags |= CLIENT_FOUND_ROWS;
}
@ -784,9 +787,10 @@ void MySQL_Connection::connect_start() {
if (myds != NULL) {
if (myds->sess != NULL) {
if (myds->sess->session_fast_forward == true) { // this is a fast_forward connection
assert(myds->sess->client_myds != NULL);
MySQL_Connection * c = myds->sess->client_myds->myconn;
MySQL_Session * mysess = (MySQL_Session *)myds->sess;
if (mysess->session_fast_forward == true) { // this is a fast_forward connection
assert(mysess->client_myds != NULL);
MySQL_Connection * c = mysess->client_myds->myconn;
assert(c != NULL);
mysql->options.client_flag &= ~(CLIENT_DEPRECATE_EOF); // we disable it by default
// if both client_flag and server_capabilities (used for client) , set CLIENT_DEPRECATE_EOF
@ -836,11 +840,12 @@ void MySQL_Connection::change_user_start() {
PROXY_TRACE();
//fprintf(stderr,"change_user_start FD %d\n", fd);
MySQL_Connection_userinfo *_ui = NULL;
if (myds->sess->client_myds == NULL) {
MySQL_Session * mysess = (MySQL_Session *)myds->sess;
if (mysess->client_myds == NULL) {
// if client_myds is not defined, we are using CHANGE_USER to reset the connection
_ui = userinfo;
} else {
_ui = myds->sess->client_myds->myconn->userinfo;
_ui = mysess->client_myds->myconn->userinfo;
userinfo->set(_ui); // fix for bug #605
}
char *auth_password=NULL;
@ -878,7 +883,8 @@ void MySQL_Connection::ping_cont(short event) {
void MySQL_Connection::initdb_start() {
PROXY_TRACE();
MySQL_Connection_userinfo *client_ui=myds->sess->client_myds->myconn->userinfo;
MySQL_Session * mysess = (MySQL_Session *)myds->sess;
MySQL_Connection_userinfo *client_ui=mysess->client_myds->myconn->userinfo;
async_exit_status = mysql_select_db_start(&interr,mysql,client_ui->schemaname);
}
@ -912,14 +918,15 @@ void MySQL_Connection::set_autocommit_cont(short event) {
void MySQL_Connection::set_names_start() {
PROXY_TRACE();
const MARIADB_CHARSET_INFO * c = proxysql_find_charset_nr(atoi(mysql_variables.client_get_value(myds->sess, SQL_CHARACTER_SET)));
MySQL_Session * mysess = (MySQL_Session *)myds->sess;
const MARIADB_CHARSET_INFO * c = proxysql_find_charset_nr(atoi(mysql_variables.client_get_value(mysess, SQL_CHARACTER_SET)));
if (!c) {
// LCOV_EXCL_START
proxy_error("Not existing charset number %u\n", atoi(mysql_variables.client_get_value(myds->sess, SQL_CHARACTER_SET)));
proxy_error("Not existing charset number %u\n", atoi(mysql_variables.client_get_value(mysess, SQL_CHARACTER_SET)));
assert(0);
// LCOV_EXCL_STOP
}
async_exit_status = mysql_set_character_set_start(&interr,mysql, NULL, atoi(mysql_variables.client_get_value(myds->sess, SQL_CHARACTER_SET)));
async_exit_status = mysql_set_character_set_start(&interr,mysql, NULL, atoi(mysql_variables.client_get_value(mysess, SQL_CHARACTER_SET)));
}
void MySQL_Connection::set_names_cont(short event) {
@ -1276,13 +1283,14 @@ handler_again:
NEXT_IMMEDIATE(ASYNC_STMT_EXECUTE_END);
} else {
if (myds->sess->mirror==false) {
MySQL_Session * mysess = (MySQL_Session *)myds->sess;
if (MyRS_reuse == NULL) {
MyRS = new MySQL_ResultSet();
MyRS->init(&myds->sess->client_myds->myprot, query.stmt_result, mysql, query.stmt);
MyRS->init(&mysess->client_myds->myprot, query.stmt_result, mysql, query.stmt);
} else {
MyRS = MyRS_reuse;
MyRS_reuse = NULL;
MyRS->init(&myds->sess->client_myds->myprot, query.stmt_result, mysql, query.stmt);
MyRS->init(&mysess->client_myds->myprot, query.stmt_result, mysql, query.stmt);
}
} else {
/*
@ -1310,10 +1318,11 @@ handler_again:
case ASYNC_STMT_EXECUTE_STORE_RESULT_CONT:
PROXY_TRACE2();
{ // this copied mostly from ASYNC_USE_RESULT_CONT
if (myds->sess && myds->sess->client_myds && myds->sess->mirror==false) {
MySQL_Session * mysess = (MySQL_Session *)myds->sess;
if (myds->sess && mysess->client_myds && mysess->mirror==false) {
unsigned int buffered_data=0;
buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN;
buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN;
buffered_data = mysess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN;
buffered_data += mysess->client_myds->resultset->len * RESULTSET_BUFLEN;
if (buffered_data > (unsigned int)mysql_thread___threshold_resultset_size*8) {
next_event(ASYNC_STMT_EXECUTE_STORE_RESULT_CONT); // we temporarily pause . See #1232
break;
@ -1454,14 +1463,15 @@ handler_again:
if (mysql_result==NULL) {
NEXT_IMMEDIATE(ASYNC_QUERY_END);
} else {
if (myds->sess->mirror==false) {
MySQL_Session * mysess = (MySQL_Session *)myds->sess;
if (mysess->mirror==false) {
if (MyRS_reuse == NULL) {
MyRS = new MySQL_ResultSet();
MyRS->init(&myds->sess->client_myds->myprot, mysql_result, mysql);
MyRS->init(&mysess->client_myds->myprot, mysql_result, mysql);
} else {
MyRS = MyRS_reuse;
MyRS_reuse = NULL;
MyRS->init(&myds->sess->client_myds->myprot, mysql_result, mysql);
MyRS->init(&mysess->client_myds->myprot, mysql_result, mysql);
}
} else {
if (MyRS_reuse == NULL) {
@ -1479,10 +1489,11 @@ handler_again:
break;
case ASYNC_USE_RESULT_CONT:
{
if (myds->sess && myds->sess->client_myds && myds->sess->mirror==false) {
MySQL_Session * mysess = (MySQL_Session *)myds->sess;
if (mysess && mysess->client_myds && mysess->mirror==false) {
unsigned int buffered_data=0;
buffered_data = myds->sess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN;
buffered_data += myds->sess->client_myds->resultset->len * RESULTSET_BUFLEN;
buffered_data = mysess->client_myds->PSarrayOUT->len * RESULTSET_BUFLEN;
buffered_data += mysess->client_myds->resultset->len * RESULTSET_BUFLEN;
if (buffered_data > (unsigned int)mysql_thread___threshold_resultset_size*8) {
next_event(ASYNC_USE_RESULT_CONT); // we temporarily pause . See #1232
break;
@ -2538,12 +2549,13 @@ void MySQL_Connection::ProcessQueryAndSetStatusFlags(char *query_digest_text) {
}
if (mysql) {
if (myds && myds->sess) {
if (myds->sess->client_myds && myds->sess->client_myds->myconn) {
MySQL_Session * mysess = (MySQL_Session *)myds->sess;
if (mysess->client_myds && mysess->client_myds->myconn) {
// if SERVER_STATUS_NO_BACKSLASH_ESCAPES is changed it is likely
// because of sql_mode was changed
// we set the same on the client connection
unsigned int ss = mysql->server_status & SERVER_STATUS_NO_BACKSLASH_ESCAPES;
myds->sess->client_myds->myconn->set_no_backslash_escapes(ss);
mysess->client_myds->myconn->set_no_backslash_escapes(ss);
}
}
}

@ -8,10 +8,14 @@
#include "MySQL_Logger.hpp"
#include "ProxySQL_Data_Stream.h"
#include "MySQL_Data_Stream.h"
#include "MySQL_Session.h"
#include "proxysql_utils.h"
#include "query_processor.h"
#include "SQLite3_Server.h"
#include "proxysql_admin.h"
#include <search.h>
#include <stdlib.h>
#include <stdio.h>
@ -309,8 +313,8 @@ bool match_monitor_query(const std::string& monitor_query, const std::string& qu
}
}
void SQLite3_Server_session_handler(Client_Session *sess, void *_pa, PtrSize_t *pkt) {
void SQLite3_Server_session_handler(Client_Session *c_sess, void *_pa, PtrSize_t *pkt) {
MySQL_Session *sess = (MySQL_Session *)c_sess;
char *error=NULL;
int cols;
int affected_rows;
@ -784,11 +788,11 @@ static void *child_mysql(void *arg) {
GloQPro->init_thread();
mysql_thr->refresh_variables();
Client_Session *sess=mysql_thr->create_new_session_and_client_data_stream(client);
MySQL_Session *sess=mysql_thr->create_new_session_and_client_mysql_data_stream(client);
sess->thread=mysql_thr;
sess->session_type = PROXYSQL_SESSION_SQLITE;
sess->handler_function=SQLite3_Server_session_handler;
ProxySQL_Data_Stream *myds=sess->client_myds;
MySQL_Data_Stream *myds=sess->client_myds;
fds[0].fd=client;
fds[0].revents=0;
@ -1105,7 +1109,7 @@ SQLite3_Server::SQLite3_Server() {
#ifdef TEST_GALERA
void SQLite3_Server::populate_galera_table(Client_Session *sess) {
void SQLite3_Server::populate_galera_table(MySQL_Session *sess) {
// this function needs to be called with lock on mutex galera_mutex already acquired
sessdb->execute("BEGIN TRANSACTION");
char *error=NULL;
@ -1158,7 +1162,7 @@ void SQLite3_Server::populate_galera_table(Client_Session *sess) {
#endif // TEST_GALERA
#ifdef TEST_AURORA
void SQLite3_Server::populate_aws_aurora_table(Client_Session *sess) {
void SQLite3_Server::populate_aws_aurora_table(MySQL_Session *sess) {
// this function needs to be called with lock on mutex aurora_mutex already acquired
sessdb->execute("DELETE FROM REPLICA_HOST_STATUS");
sqlite3_stmt *statement=NULL;
@ -1244,7 +1248,7 @@ void SQLite3_Server::populate_aws_aurora_table(Client_Session *sess) {
* @param sess The current session performing a query.
* @param txs_behind Unused parameter.
*/
void SQLite3_Server::populate_grouprep_table(Client_Session *sess, int txs_behind) {
void SQLite3_Server::populate_grouprep_table(MySQL_Session *sess, int txs_behind) {
GloAdmin->mysql_servers_wrlock();
// We are going to repopulate the map
this->grouprep_map.clear();
@ -1481,7 +1485,7 @@ bool SQLite3_Server::set_variable(char *name, char *value) { // this is the pub
void SQLite3_Server::send_MySQL_OK(MySQL_Protocol *myprot, char *msg, int rows, uint16_t status) {
assert(myprot);
ProxySQL_Data_Stream *myds=myprot->get_myds();
MySQL_Data_Stream *myds=myprot->get_myds();
myds->DSS=STATE_QUERY_SENT_DS;
myprot->generate_pkt_OK(true,NULL,NULL,1,rows,0,status,0,msg,false);
myds->DSS=STATE_SLEEP;
@ -1489,7 +1493,7 @@ void SQLite3_Server::send_MySQL_OK(MySQL_Protocol *myprot, char *msg, int rows,
void SQLite3_Server::send_MySQL_ERR(MySQL_Protocol *myprot, char *msg) {
assert(myprot);
ProxySQL_Data_Stream *myds=myprot->get_myds();
MySQL_Data_Stream *myds=myprot->get_myds();
myds->DSS=STATE_QUERY_SENT_DS;
myprot->generate_pkt_ERR(true,NULL,NULL,1,1045,(char *)"28000",msg);
myds->DSS=STATE_SLEEP;

@ -22,6 +22,8 @@
#include "proxysql_restapi.h"
#include "Web_Interface.hpp"
#include "proxysql_admin.h"
#include <libdaemon/dfork.h>
#include <libdaemon/dsignal.h>
#include <libdaemon/dlog.h>

@ -12,6 +12,15 @@
* @date 2021-10-28
*/
/*
NOTE:
Test 4 assumes that no new variable was introduced after
table history_mysql_status_variables was initialized.
If this is not true, manually run the following before running the test:
DELETE FROM history_mysql_status_variables;
*/
#include <algorithm>
#include <string>
#include <stdio.h>

@ -13,6 +13,12 @@
* any error reported by the client due to broken connections.
*/
/*
NOTE: the parameters in this test are tuned in a way that if proxysql starts
with only 1 worker thread, it is unlikely to ping all connections on time.
See note on wait_timeout
*/
#include <string>
#include <vector>
#include <map>
@ -33,6 +39,45 @@ using std::pair;
using srv_cfg = vector<pair<string,int>>;
int wait_timeout = 10;
// if only 1 worker thread is running, wait_timeout should be bigger
// 1 worker thread : wait_timeout = 45
// 4 worker threads : wait_timeout = 10
int compute_wait_timeout(MYSQL *my_conn) {
int res = EXIT_SUCCESS;
res = mysql_query(my_conn, "SELECT @@mysql-threads");
if (res != EXIT_SUCCESS) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(my_conn));
res = EXIT_FAILURE;
return res;
}
MYSQL_RES* my_res = mysql_store_result(my_conn);
if (my_res == nullptr) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(my_conn));
res = EXIT_FAILURE;
return res;
}
MYSQL_ROW row = mysql_fetch_row(my_res);
if (row == nullptr || row[0] == nullptr) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(my_conn));
res = EXIT_FAILURE;
return res;
} else {
const char *val = row[0];
diag("mysql-threads = %s", val);
if (strcmp(val,"1")==0) {
diag("Setting wait_timeout to 45 instead of 10");
wait_timeout = 45;
}
}
mysql_free_result(my_res);
return res;
}
int change_mysql_cfg(
const CommandLine& cl, const string& host, const string& port, const srv_cfg& new_srv_cfg, srv_cfg& out_old_srv_cfg
) {
@ -78,7 +123,9 @@ int change_mysql_cfg(
mysql_free_result(my_res);
mysql_query(my_conn, string { "SET GLOBAL " + config_var.first + "=" + std::to_string(config_var.second) }.c_str());
string query = string { "SET GLOBAL " + config_var.first + "=" + std::to_string(config_var.second) };
diag("Setting on %s:%s : %s", host.c_str(), port.c_str(), query.c_str());
mysql_query(my_conn, query.c_str());
if (res != EXIT_SUCCESS) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(my_conn));
res = EXIT_FAILURE;
@ -337,6 +384,7 @@ int wait_target_backend_conns(MYSQL* admin, uint32_t tg_backend_conns, uint32_t
break;
} else {
waited += 1;
diag("tg_backend_conns: %d, cur_conn_num: %ld , not matching after %lu checks", tg_backend_conns, cur_conn_num, waited);
sleep(1);
}
}
@ -387,6 +435,10 @@ int main(int, char**) {
return EXIT_FAILURE;
}
if (compute_wait_timeout(proxy_admin) != EXIT_SUCCESS) {
return EXIT_FAILURE;
}
double intv = 5;
double b = 128;
double b_0 = 256;
@ -416,15 +468,22 @@ int main(int, char**) {
MYSQL_QUERY(proxy_admin, "LOAD MYSQL SERVERS TO RUNTIME");
diag("Setting ProxySQL config...");
// Set the backend connections ping frequency
MYSQL_QUERY(proxy_admin, string { "SET mysql-ping_interval_server_msec=" + std::to_string(freq) }.c_str());
// Make sure no connection cleanup takes place
MYSQL_QUERY(proxy_admin, "SET mysql-free_connections_pct=100");
// Don't retry on failure
MYSQL_QUERY(proxy_admin, "SET mysql-query_retries_on_failure=0");
// Set a higher max_connection number for the servers
MYSQL_QUERY(proxy_admin, "LOAD MYSQL VARIABLES TO RUNTIME");
{
// Set the backend connections ping frequency
string query = string { "SET mysql-ping_interval_server_msec=" + std::to_string(freq) };
diag("%s", query.c_str());
MYSQL_QUERY(proxy_admin, query.c_str());
// Make sure no connection cleanup takes place
query = "SET mysql-free_connections_pct=100";
diag("%s", query.c_str());
MYSQL_QUERY(proxy_admin, query.c_str());
// Don't retry on failure
query = "SET mysql-query_retries_on_failure=0";
diag("%s", query.c_str());
MYSQL_QUERY(proxy_admin, query.c_str());
// Set a higher max_connection number for the servers
MYSQL_QUERY(proxy_admin, "LOAD MYSQL VARIABLES TO RUNTIME");
}
// Configure MySQL infra servers with: 'wait_timeout' and 'max_connections'
vector<pair<mysql_res_row, srv_cfg>> servers_old_configs {};
@ -440,7 +499,7 @@ int main(int, char**) {
return EXIT_FAILURE;
}
srv_cfg new_srv_cfg { { "wait_timeout", 10 }, { "max_connections", 2500 } };
srv_cfg new_srv_cfg { { "wait_timeout", wait_timeout }, { "max_connections", 2500 } };
for (const mysql_res_row& srv_row : servers_rows) {
srv_cfg old_srv_cfg {};

Loading…
Cancel
Save