Initial implementation for 'client_error_limit'

Add initial support for FR limiting the number of connection errors that
can be initiated from a particular address before deniying future
connections from that address. Feature is controlled by new introduced
variables:

- mysql-client_host_error_counts
- mysql-client_host_cache_size
pull/3617/head
Javier Jaramago Fernández 5 years ago
parent bc4109404b
commit e2dd0a30cd

@ -118,6 +118,7 @@ enum MySQL_Thread_status_variable {
st_var_aws_aurora_replicas_skipped_during_query,
st_var_automatic_detected_sqli,
st_var_whitelisted_sqli_fingerprint,
st_var_client_host_error_killed_connections,
st_var_END
};
@ -320,6 +321,7 @@ struct p_th_counter {
whitelisted_sqli_fingerprint,
mysql_killed_backend_connections,
mysql_killed_backend_queries,
client_host_error_killed_connections,
__size
};
};
@ -359,6 +361,20 @@ struct th_metrics_map_idx {
};
};
/**
* @brief Structure holding the data for a Client_Host_Cache entry.
*/
typedef struct _MySQL_Client_Host_Cache_Entry {
/**
* @brief Last time the entry was updated.
*/
uint64_t updated_at;
/**
* @brief Error count associated with the entry.
*/
uint32_t error_count;
} MySQL_Client_Host_Cache_Entry;
class MySQL_Threads_Handler
{
private:
@ -382,6 +398,22 @@ class MySQL_Threads_Handler
// variable address
// special variable : if true, further input validation is required
std::unordered_map<std::string, std::tuple<bool *, bool>> VariablesPointers_bool;
/**
* @brief Holds the clients host cache. It keeps track of the number of
* errors associated to a specific client:
* - Key: client identifier, based on 'clientaddr'.
* - Value: Structure of type 'MySQL_Client_Host_Cache_Entry' holding
* the last time the entry was updated and the error count associated
* with the client.
*/
std::unordered_map<std::string, MySQL_Client_Host_Cache_Entry> client_host_cache;
/**
* @brief Holds the mutex for accessing 'client_host_cache', since every
* access can potentially perform 'read/write' operations, a regular mutex
* is enough.
*/
pthread_mutex_t mutex_client_host_cache;
public:
struct {
int monitor_history;
@ -429,6 +461,8 @@ class MySQL_Threads_Handler
int query_retries_on_failure;
bool client_multi_statements;
bool connection_warming;
int client_host_cache_size;
int client_host_error_counts;
int connect_retries_on_failure;
int connect_retries_delay;
int connection_delay_multiplex_ms;
@ -545,6 +579,52 @@ class MySQL_Threads_Handler
std::array<prometheus::Counter*, p_th_counter::__size> p_counter_array {};
std::array<prometheus::Gauge*, p_th_gauge::__size> p_gauge_array {};
} status_variables;
/**
* @brief Update the client host cache with the supplied 'client_sockaddr',
* and the supplied 'error' parameter specifying if there was a connection
* error or not.
*
* NOTE: This function is not safe, the supplied 'client_sockaddr' should
* have been initialized by 'accept' or 'getpeername'. NULL checks are not
* performed.
*
* @details The 'client_sockaddr' parameter is inspected, and the
* 'client_host_cache' map is only updated in case of:
* - 'address_family' is either 'AF_INET' or 'AF_INET6'.
* - The address obtained from it isn't '127.0.0.1'.
*
* @param client_sockaddr A 'sockaddr' holding the required client information
* to update the 'client_host_cache_map'.
* @param error 'true' if there was an error in the connection that should be
* register, 'false' otherwise.
*/
void update_client_host_cache(struct sockaddr* client_sockaddr, bool error);
/**
* @brief Retrieves the entry of the underlying 'client_host_cache' map for
* the supplied 'client_sockaddr' in case of existing. In case it doesn't
* exist or the supplied 'client_sockaddr' doesn't met the requirements
* for being registered in the map, and zeroed 'MySQL_Client_Host_Cache_Entry'
* is returned.
*
* NOTE: This function is not safe, the supplied 'client_sockaddr' should
* have been initialized by 'accept' or 'getpeername'. NULL checks are not
* performed.
*
* @details The 'client_sockaddr' parameter is inspected, and the
* 'client_host_cache' map is only searched in case of:
* - 'address_family' is either 'AF_INET' or 'AF_INET6'.
* - The address obtained from it isn't '127.0.0.1'.
*
* @param client_sockaddr A 'sockaddr' holding the required client information
* to update the 'client_host_cache_map'.
* @return If found, the corresponding entry for the supplied 'client_sockaddr',
* a zeroed 'MySQL_Client_Host_Cache_Entry' otherwise.
*/
MySQL_Client_Host_Cache_Entry find_client_host_cache(struct sockaddr* client_sockaddr);
/**
* @brief Delete all the entries in the 'client_host_cache' internal map.
*/
void flush_client_host_cache();
/**
* @brief Callback to update the metrics.
*/

@ -799,6 +799,8 @@ __thread bool mysql_thread___enable_client_deprecate_eof;
__thread bool mysql_thread___enable_server_deprecate_eof;
__thread bool mysql_thread___log_mysql_warnings_enabled;
__thread bool mysql_thread___enable_load_data_local_infile;
__thread int mysql_thread___client_host_cache_size;
__thread int mysql_thread___client_host_error_counts;
/* variables used for Query Cache */
__thread int mysql_thread___query_cache_size_MB;
@ -951,6 +953,8 @@ extern __thread bool mysql_thread___enable_client_deprecate_eof;
extern __thread bool mysql_thread___enable_server_deprecate_eof;
extern __thread bool mysql_thread___log_mysql_warnings_enabled;
extern __thread bool mysql_thread___enable_load_data_local_infile;
extern __thread int mysql_thread___client_host_cache_size;
extern __thread int mysql_thread___client_host_error_counts;
/* variables used for Query Cache */
extern __thread int mysql_thread___query_cache_size_MB;

@ -4760,6 +4760,7 @@ void MySQL_Session::handler___status_CHANGING_USER_CLIENT___STATE_CLIENT_HANDSHA
void MySQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(PtrSize_t *pkt, bool *wrong_pass) {
bool is_encrypted = client_myds->encrypted;
bool handshake_response_return = client_myds->myprot.process_pkt_handshake_response((unsigned char *)pkt->ptr,pkt->size);
bool handshake_err = true;
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION,8,"Session=%p , DS=%p , handshake_response=%d , switching_auth_stage=%d , is_encrypted=%d , client_encrypted=%d\n", this, client_myds, handshake_response_return, client_myds->switching_auth_stage, is_encrypted, client_myds->encrypted);
if (
@ -4943,6 +4944,7 @@ void MySQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(
) {
// we are good!
client_myds->myprot.generate_pkt_OK(true,NULL,NULL, _pid, 0,0,0,0,NULL);
handshake_err = false;
GloMyLogger->log_audit_entry(PROXYSQL_MYSQL_AUTH_OK, this, NULL);
status=WAITING_CLIENT_DATA;
client_myds->DSS=STATE_CLIENT_AUTH_OK;
@ -4979,6 +4981,7 @@ void MySQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION,8,"Session=%p , DS=%p . STATE_CLIENT_AUTH_OK\n", this, client_myds);
GloMyLogger->log_audit_entry(PROXYSQL_MYSQL_AUTH_OK, this, NULL);
client_myds->myprot.generate_pkt_OK(true,NULL,NULL, _pid, 0,0,0,0,NULL);
handshake_err = false;
status=WAITING_CLIENT_DATA;
client_myds->DSS=STATE_CLIENT_AUTH_OK;
}
@ -5045,6 +5048,10 @@ void MySQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(
__sync_add_and_fetch(&MyHGM->status.client_connections_aborted,1);
client_myds->DSS=STATE_SLEEP;
}
if (mysql_thread___client_host_cache_size) {
GloMTH->update_client_host_cache(client_myds->client_addr, handshake_err);
}
}
// Note: as commented in issue #546 and #547 , some clients ignore the status of CLIENT_MULTI_STATEMENTS

@ -88,6 +88,7 @@ mythr_st_vars_t MySQL_Thread_status_variables_counter_array[] {
{ st_var_whitelisted_sqli_fingerprint,p_th_counter::whitelisted_sqli_fingerprint, (char *)"whitelisted_sqli_fingerprint" },
{ st_var_max_connect_timeout_err, p_th_counter::max_connect_timeouts, (char *)"max_connect_timeouts" },
{ st_var_generated_pkt_err, p_th_counter::generated_error_packets, (char *)"generated_error_packets" },
{ st_var_client_host_error_killed_connections, p_th_counter::client_host_error_killed_connections, (char *)"client_host_error_killed_connections" },
};
mythr_g_st_vars_t MySQL_Thread_status_variables_gauge_array[] {
@ -412,6 +413,8 @@ static char * mysql_thread_variables_names[]= {
(char *)"shun_recovery_time_sec",
(char *)"query_retries_on_failure",
(char *)"client_multi_statements",
(char *)"client_host_cache_size",
(char *)"client_host_error_counts",
(char *)"connect_retries_on_failure",
(char *)"connect_retries_delay",
(char *)"connection_delay_multiplex_ms",
@ -863,6 +866,12 @@ th_metrics_map = std::make_tuple(
"proxysql_mysql_killed_backend_queries_total",
"Killed backend queries.",
metric_tags {}
),
std::make_tuple (
p_th_counter::client_host_error_killed_connections,
"proxysql_client_host_error_killed_connections",
"Killed client connections because address exceeded 'client_host_error_counts'.",
metric_tags {}
)
},
th_gauge_vector {
@ -1169,8 +1178,10 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() {
// Initialize prometheus metrics
init_prometheus_counter_array<th_metrics_map_idx, p_th_counter>(th_metrics_map, this->status_variables.p_counter_array);
init_prometheus_gauge_array<th_metrics_map_idx, p_th_gauge>(th_metrics_map, this->status_variables.p_gauge_array);
}
// Init client_host_cache mutex
pthread_mutex_init(&mutex_client_host_cache, NULL);
}
unsigned int MySQL_Threads_Handler::get_global_version() {
return __sync_fetch_and_add(&__global_MySQL_Thread_Variables_version,0);
@ -2090,6 +2101,9 @@ char ** MySQL_Threads_Handler::get_variables_list() {
VariablesPointers_int["handle_unknown_charset"] = make_tuple(&variables.handle_unknown_charset, 0, HANDLE_UNKNOWN_CHARSET__MAX_HANDLE_VALUE, false);
VariablesPointers_int["ping_interval_server_msec"] = make_tuple(&variables.ping_interval_server_msec, 1000, 7*24*3600*1000, false);
VariablesPointers_int["ping_timeout_server"] = make_tuple(&variables.ping_timeout_server, 10, 600*1000, false);
VariablesPointers_int["client_host_cache_size"] = make_tuple(&variables.client_host_cache_size, 0, 1024*1024, false);
VariablesPointers_int["client_host_error_counts"] = make_tuple(&variables.client_host_error_counts, 0, 1024*1024, false);
// logs
VariablesPointers_int["auditlog_filesize"] = make_tuple(&variables.auditlog_filesize, 1024*1024, 1*1024*1024*1024, false);
VariablesPointers_int["eventslog_filesize"] = make_tuple(&variables.eventslog_filesize, 1024*1024, 1*1024*1024*1024, false);
@ -2285,6 +2299,120 @@ void MySQL_Threads_Handler::stop_listeners() {
free_tokenizer( &tok );
}
/**
* @brief Gets the client address stored in 'client_addr' member as
* an string if available. If member 'client_addr' is NULL, returns an
* empty string.
*
* @return Either an string holding the string representation of internal
* member 'client_addr', or empty string if this member is NULL.
*/
std::string get_client_addr(struct sockaddr* client_addr) {
char buf[512];
std::string str_client_addr {};
if (client_addr == NULL) {
return str_client_addr;
}
switch (client_addr->sa_family) {
case AF_INET: {
struct sockaddr_in *ipv4 = (struct sockaddr_in *)client_addr;
if (ipv4->sin_port) {
inet_ntop(client_addr->sa_family, &ipv4->sin_addr, buf, INET_ADDRSTRLEN);
str_client_addr = std::string { buf };
} else {
str_client_addr = std::string { "localhost" };
}
break;
}
case AF_INET6: {
struct sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)client_addr;
inet_ntop(client_addr->sa_family, &ipv6->sin6_addr, buf, INET6_ADDRSTRLEN);
str_client_addr = std::string { buf };
break;
}
default:
str_client_addr = std::string { "localhost" };
break;
}
return str_client_addr;
}
MySQL_Client_Host_Cache_Entry MySQL_Threads_Handler::find_client_host_cache(struct sockaddr* client_sockaddr) {
MySQL_Client_Host_Cache_Entry entry { 0, 0 };
if (client_sockaddr->sa_family != AF_INET && client_sockaddr->sa_family != AF_INET6) {
return entry;
}
std::string client_addr = get_client_addr(client_sockaddr);
if (client_addr == "127.0.0.1") {
return entry;
}
pthread_mutex_lock(&mutex_client_host_cache);
auto found_entry = client_host_cache.find(client_addr);
if (found_entry != client_host_cache.end()) {
entry = found_entry->second;
}
pthread_mutex_unlock(&mutex_client_host_cache);
return entry;
}
void MySQL_Threads_Handler::update_client_host_cache(struct sockaddr* client_sockaddr, bool error) {
if (client_sockaddr->sa_family != AF_INET && client_sockaddr->sa_family != AF_INET6) {
return;
}
std::string client_addr = get_client_addr(client_sockaddr);
if (client_addr == "127.0.0.1") {
return;
}
if (error) {
pthread_mutex_lock(&mutex_client_host_cache);
// If the cache is full, find the latest entry on it, and update/remove it.
if (client_host_cache.size() == static_cast<size_t>(mysql_thread___client_host_cache_size)) {
auto older_elem = std::min_element(
client_host_cache.begin(),
client_host_cache.end(),
[] (const std::pair<std::string, MySQL_Client_Host_Cache_Entry>& f_entry,
const std::pair<std::string, MySQL_Client_Host_Cache_Entry>& s_entry)
{
return f_entry.second.updated_at < s_entry.second.updated_at;
}
);
if (older_elem->first == client_addr) {
older_elem->second.error_count += 1;
older_elem->second.updated_at = monotonic_time();
} else {
client_host_cache.erase(older_elem);
}
}
// Find the entry for the client, and update/insert it.
auto cache_entry = client_host_cache.find(client_addr);
if (cache_entry != client_host_cache.end()) {
cache_entry->second.error_count += 1;
cache_entry->second.updated_at = monotonic_time();
} else {
MySQL_Client_Host_Cache_Entry new_entry { monotonic_time(), 1 };
client_host_cache.insert({client_addr, new_entry});
}
pthread_mutex_unlock(&mutex_client_host_cache);
} else {
pthread_mutex_lock(&mutex_client_host_cache);
client_host_cache.erase(client_addr);
pthread_mutex_unlock(&mutex_client_host_cache);
}
}
void MySQL_Threads_Handler::flush_client_host_cache() {
pthread_mutex_lock(&mutex_client_host_cache);
client_host_cache.clear();
pthread_mutex_unlock(&mutex_client_host_cache);
}
MySQL_Threads_Handler::~MySQL_Threads_Handler() {
if (variables.monitor_username) { free(variables.monitor_username); variables.monitor_username=NULL; }
if (variables.monitor_password) { free(variables.monitor_password); variables.monitor_password=NULL; }
@ -3401,6 +3529,7 @@ void MySQL_Thread::process_all_sessions() {
if (sess_time/1000 > (unsigned long long)mysql_thread___connect_timeout_client) {
proxy_warning("Closing not established client connection %s:%d after %llums\n",sess->client_myds->addr.addr,sess->client_myds->addr.port, sess_time/1000);
sess->healthy = 0;
GloMTH->update_client_host_cache(sess->client_myds->client_addr, true);
}
}
if (maintenance_loop) {
@ -3660,6 +3789,8 @@ void MySQL_Thread::refresh_variables() {
mysql_thread___enable_server_deprecate_eof=(bool)GloMTH->get_variable_int((char *)"enable_server_deprecate_eof");
mysql_thread___enable_load_data_local_infile=(bool)GloMTH->get_variable_int((char *)"enable_load_data_local_infile");
mysql_thread___log_mysql_warnings_enabled=(bool)GloMTH->get_variable_int((char *)"log_mysql_warnings_enabled");
mysql_thread___client_host_cache_size=GloMTH->get_variable_int((char *)"client_host_cache_size");
mysql_thread___client_host_error_counts=GloMTH->get_variable_int((char *)"client_host_error_counts");
#ifdef DEBUG
mysql_thread___session_debug=(bool)GloMTH->get_variable_int((char *)"session_debug");
#endif /* DEBUG */
@ -3736,7 +3867,6 @@ void MySQL_Thread::unregister_session_connection_handler(int idx, bool _new) {
mysql_sessions->remove_index_fast(idx);
}
void MySQL_Thread::listener_handle_new_connection(MySQL_Data_Stream *myds, unsigned int n) {
int c;
union {
@ -3758,6 +3888,19 @@ void MySQL_Thread::listener_handle_new_connection(MySQL_Data_Stream *myds, unsig
}
c=accept(myds->fd, addr, &addrlen);
if (c>-1) { // accept() succeeded
if (mysql_thread___client_host_cache_size) {
MySQL_Client_Host_Cache_Entry client_host_entry =
GloMTH->find_client_host_cache(addr);
if (
client_host_entry.updated_at != 0 &&
client_host_entry.error_count == static_cast<uint32_t>(mysql_thread___client_host_error_counts)
) {
close(c);
status_variables.stvar[st_var_client_host_error_killed_connections] += 1;
return;
}
}
// create a new client connection
mypolls.fds[n].revents=0;
MySQL_Session *sess=create_new_session_and_client_data_stream(c);

@ -1523,6 +1523,17 @@ bool admin_handler_command_proxysql(char *query_no_space, unsigned int query_no_
return false;
}
if (query_no_space_length==strlen("PROXYSQL FLUSH MYSQL CLIENT HOSTS") && !strncasecmp("PROXYSQL FLUSH MYSQL CLIENT HOSTS",query_no_space, query_no_space_length)) {
proxy_info("Received PROXYSQL FLUSH MYSQL CLIENT HOSTS command\n");
ProxySQL_Admin *SPA=(ProxySQL_Admin *)pa;
if (GloMTH) {
GloMTH->flush_client_host_cache();
}
SPA->flush_error_log();
SPA->send_MySQL_OK(&sess->client_myds->myprot, NULL);
return false;
}
if (
(query_no_space_length==strlen("PROXYSQL FLUSH CONFIGDB") && !strncasecmp("PROXYSQL FLUSH CONFIGDB",query_no_space, query_no_space_length)) // see #923
) {

Loading…
Cancel
Save