From e2dd0a30cd79700f7da3ced1ecdaf2c1387006c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Jaramago=20Fern=C3=A1ndez?= Date: Tue, 10 Aug 2021 21:00:06 +0200 Subject: [PATCH] Initial implementation for 'client_error_limit' Add initial support for FR limiting the number of connection errors that can be initiated from a particular address before deniying future connections from that address. Feature is controlled by new introduced variables: - mysql-client_host_error_counts - mysql-client_host_cache_size --- include/MySQL_Thread.h | 80 ++++++++++++++++++++ include/proxysql_structs.h | 4 + lib/MySQL_Session.cpp | 7 ++ lib/MySQL_Thread.cpp | 147 ++++++++++++++++++++++++++++++++++++- lib/ProxySQL_Admin.cpp | 11 +++ 5 files changed, 247 insertions(+), 2 deletions(-) diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 4afa51740..7b9f7d015 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -118,6 +118,7 @@ enum MySQL_Thread_status_variable { st_var_aws_aurora_replicas_skipped_during_query, st_var_automatic_detected_sqli, st_var_whitelisted_sqli_fingerprint, + st_var_client_host_error_killed_connections, st_var_END }; @@ -320,6 +321,7 @@ struct p_th_counter { whitelisted_sqli_fingerprint, mysql_killed_backend_connections, mysql_killed_backend_queries, + client_host_error_killed_connections, __size }; }; @@ -359,6 +361,20 @@ struct th_metrics_map_idx { }; }; +/** + * @brief Structure holding the data for a Client_Host_Cache entry. + */ +typedef struct _MySQL_Client_Host_Cache_Entry { + /** + * @brief Last time the entry was updated. + */ + uint64_t updated_at; + /** + * @brief Error count associated with the entry. + */ + uint32_t error_count; +} MySQL_Client_Host_Cache_Entry; + class MySQL_Threads_Handler { private: @@ -382,6 +398,22 @@ class MySQL_Threads_Handler // variable address // special variable : if true, further input validation is required std::unordered_map> VariablesPointers_bool; + /** + * @brief Holds the clients host cache. It keeps track of the number of + * errors associated to a specific client: + * - Key: client identifier, based on 'clientaddr'. + * - Value: Structure of type 'MySQL_Client_Host_Cache_Entry' holding + * the last time the entry was updated and the error count associated + * with the client. + */ + std::unordered_map client_host_cache; + /** + * @brief Holds the mutex for accessing 'client_host_cache', since every + * access can potentially perform 'read/write' operations, a regular mutex + * is enough. + */ + pthread_mutex_t mutex_client_host_cache; + public: struct { int monitor_history; @@ -429,6 +461,8 @@ class MySQL_Threads_Handler int query_retries_on_failure; bool client_multi_statements; bool connection_warming; + int client_host_cache_size; + int client_host_error_counts; int connect_retries_on_failure; int connect_retries_delay; int connection_delay_multiplex_ms; @@ -545,6 +579,52 @@ class MySQL_Threads_Handler std::array p_counter_array {}; std::array p_gauge_array {}; } status_variables; + /** + * @brief Update the client host cache with the supplied 'client_sockaddr', + * and the supplied 'error' parameter specifying if there was a connection + * error or not. + * + * NOTE: This function is not safe, the supplied 'client_sockaddr' should + * have been initialized by 'accept' or 'getpeername'. NULL checks are not + * performed. + * + * @details The 'client_sockaddr' parameter is inspected, and the + * 'client_host_cache' map is only updated in case of: + * - 'address_family' is either 'AF_INET' or 'AF_INET6'. + * - The address obtained from it isn't '127.0.0.1'. + * + * @param client_sockaddr A 'sockaddr' holding the required client information + * to update the 'client_host_cache_map'. + * @param error 'true' if there was an error in the connection that should be + * register, 'false' otherwise. + */ + void update_client_host_cache(struct sockaddr* client_sockaddr, bool error); + /** + * @brief Retrieves the entry of the underlying 'client_host_cache' map for + * the supplied 'client_sockaddr' in case of existing. In case it doesn't + * exist or the supplied 'client_sockaddr' doesn't met the requirements + * for being registered in the map, and zeroed 'MySQL_Client_Host_Cache_Entry' + * is returned. + * + * NOTE: This function is not safe, the supplied 'client_sockaddr' should + * have been initialized by 'accept' or 'getpeername'. NULL checks are not + * performed. + * + * @details The 'client_sockaddr' parameter is inspected, and the + * 'client_host_cache' map is only searched in case of: + * - 'address_family' is either 'AF_INET' or 'AF_INET6'. + * - The address obtained from it isn't '127.0.0.1'. + * + * @param client_sockaddr A 'sockaddr' holding the required client information + * to update the 'client_host_cache_map'. + * @return If found, the corresponding entry for the supplied 'client_sockaddr', + * a zeroed 'MySQL_Client_Host_Cache_Entry' otherwise. + */ + MySQL_Client_Host_Cache_Entry find_client_host_cache(struct sockaddr* client_sockaddr); + /** + * @brief Delete all the entries in the 'client_host_cache' internal map. + */ + void flush_client_host_cache(); /** * @brief Callback to update the metrics. */ diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 57a47eb81..6b08ec1f2 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -799,6 +799,8 @@ __thread bool mysql_thread___enable_client_deprecate_eof; __thread bool mysql_thread___enable_server_deprecate_eof; __thread bool mysql_thread___log_mysql_warnings_enabled; __thread bool mysql_thread___enable_load_data_local_infile; +__thread int mysql_thread___client_host_cache_size; +__thread int mysql_thread___client_host_error_counts; /* variables used for Query Cache */ __thread int mysql_thread___query_cache_size_MB; @@ -951,6 +953,8 @@ extern __thread bool mysql_thread___enable_client_deprecate_eof; extern __thread bool mysql_thread___enable_server_deprecate_eof; extern __thread bool mysql_thread___log_mysql_warnings_enabled; extern __thread bool mysql_thread___enable_load_data_local_infile; +extern __thread int mysql_thread___client_host_cache_size; +extern __thread int mysql_thread___client_host_error_counts; /* variables used for Query Cache */ extern __thread int mysql_thread___query_cache_size_MB; diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 9a9f583ea..62ba5b3c4 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -4760,6 +4760,7 @@ void MySQL_Session::handler___status_CHANGING_USER_CLIENT___STATE_CLIENT_HANDSHA void MySQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(PtrSize_t *pkt, bool *wrong_pass) { bool is_encrypted = client_myds->encrypted; bool handshake_response_return = client_myds->myprot.process_pkt_handshake_response((unsigned char *)pkt->ptr,pkt->size); + bool handshake_err = true; proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION,8,"Session=%p , DS=%p , handshake_response=%d , switching_auth_stage=%d , is_encrypted=%d , client_encrypted=%d\n", this, client_myds, handshake_response_return, client_myds->switching_auth_stage, is_encrypted, client_myds->encrypted); if ( @@ -4943,6 +4944,7 @@ void MySQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE( ) { // we are good! client_myds->myprot.generate_pkt_OK(true,NULL,NULL, _pid, 0,0,0,0,NULL); + handshake_err = false; GloMyLogger->log_audit_entry(PROXYSQL_MYSQL_AUTH_OK, this, NULL); status=WAITING_CLIENT_DATA; client_myds->DSS=STATE_CLIENT_AUTH_OK; @@ -4979,6 +4981,7 @@ void MySQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE( proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION,8,"Session=%p , DS=%p . STATE_CLIENT_AUTH_OK\n", this, client_myds); GloMyLogger->log_audit_entry(PROXYSQL_MYSQL_AUTH_OK, this, NULL); client_myds->myprot.generate_pkt_OK(true,NULL,NULL, _pid, 0,0,0,0,NULL); + handshake_err = false; status=WAITING_CLIENT_DATA; client_myds->DSS=STATE_CLIENT_AUTH_OK; } @@ -5045,6 +5048,10 @@ void MySQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE( __sync_add_and_fetch(&MyHGM->status.client_connections_aborted,1); client_myds->DSS=STATE_SLEEP; } + + if (mysql_thread___client_host_cache_size) { + GloMTH->update_client_host_cache(client_myds->client_addr, handshake_err); + } } // Note: as commented in issue #546 and #547 , some clients ignore the status of CLIENT_MULTI_STATEMENTS diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 946942294..8b90af25a 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -88,6 +88,7 @@ mythr_st_vars_t MySQL_Thread_status_variables_counter_array[] { { st_var_whitelisted_sqli_fingerprint,p_th_counter::whitelisted_sqli_fingerprint, (char *)"whitelisted_sqli_fingerprint" }, { st_var_max_connect_timeout_err, p_th_counter::max_connect_timeouts, (char *)"max_connect_timeouts" }, { st_var_generated_pkt_err, p_th_counter::generated_error_packets, (char *)"generated_error_packets" }, + { st_var_client_host_error_killed_connections, p_th_counter::client_host_error_killed_connections, (char *)"client_host_error_killed_connections" }, }; mythr_g_st_vars_t MySQL_Thread_status_variables_gauge_array[] { @@ -412,6 +413,8 @@ static char * mysql_thread_variables_names[]= { (char *)"shun_recovery_time_sec", (char *)"query_retries_on_failure", (char *)"client_multi_statements", + (char *)"client_host_cache_size", + (char *)"client_host_error_counts", (char *)"connect_retries_on_failure", (char *)"connect_retries_delay", (char *)"connection_delay_multiplex_ms", @@ -863,6 +866,12 @@ th_metrics_map = std::make_tuple( "proxysql_mysql_killed_backend_queries_total", "Killed backend queries.", metric_tags {} + ), + std::make_tuple ( + p_th_counter::client_host_error_killed_connections, + "proxysql_client_host_error_killed_connections", + "Killed client connections because address exceeded 'client_host_error_counts'.", + metric_tags {} ) }, th_gauge_vector { @@ -1169,8 +1178,10 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { // Initialize prometheus metrics init_prometheus_counter_array(th_metrics_map, this->status_variables.p_counter_array); init_prometheus_gauge_array(th_metrics_map, this->status_variables.p_gauge_array); -} + // Init client_host_cache mutex + pthread_mutex_init(&mutex_client_host_cache, NULL); +} unsigned int MySQL_Threads_Handler::get_global_version() { return __sync_fetch_and_add(&__global_MySQL_Thread_Variables_version,0); @@ -2090,6 +2101,9 @@ char ** MySQL_Threads_Handler::get_variables_list() { VariablesPointers_int["handle_unknown_charset"] = make_tuple(&variables.handle_unknown_charset, 0, HANDLE_UNKNOWN_CHARSET__MAX_HANDLE_VALUE, false); VariablesPointers_int["ping_interval_server_msec"] = make_tuple(&variables.ping_interval_server_msec, 1000, 7*24*3600*1000, false); VariablesPointers_int["ping_timeout_server"] = make_tuple(&variables.ping_timeout_server, 10, 600*1000, false); + VariablesPointers_int["client_host_cache_size"] = make_tuple(&variables.client_host_cache_size, 0, 1024*1024, false); + VariablesPointers_int["client_host_error_counts"] = make_tuple(&variables.client_host_error_counts, 0, 1024*1024, false); + // logs VariablesPointers_int["auditlog_filesize"] = make_tuple(&variables.auditlog_filesize, 1024*1024, 1*1024*1024*1024, false); VariablesPointers_int["eventslog_filesize"] = make_tuple(&variables.eventslog_filesize, 1024*1024, 1*1024*1024*1024, false); @@ -2285,6 +2299,120 @@ void MySQL_Threads_Handler::stop_listeners() { free_tokenizer( &tok ); } +/** + * @brief Gets the client address stored in 'client_addr' member as + * an string if available. If member 'client_addr' is NULL, returns an + * empty string. + * + * @return Either an string holding the string representation of internal + * member 'client_addr', or empty string if this member is NULL. + */ +std::string get_client_addr(struct sockaddr* client_addr) { + char buf[512]; + std::string str_client_addr {}; + + if (client_addr == NULL) { + return str_client_addr; + } + + switch (client_addr->sa_family) { + case AF_INET: { + struct sockaddr_in *ipv4 = (struct sockaddr_in *)client_addr; + if (ipv4->sin_port) { + inet_ntop(client_addr->sa_family, &ipv4->sin_addr, buf, INET_ADDRSTRLEN); + str_client_addr = std::string { buf }; + } else { + str_client_addr = std::string { "localhost" }; + } + break; + } + case AF_INET6: { + struct sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)client_addr; + inet_ntop(client_addr->sa_family, &ipv6->sin6_addr, buf, INET6_ADDRSTRLEN); + str_client_addr = std::string { buf }; + break; + } + default: + str_client_addr = std::string { "localhost" }; + break; + } + + return str_client_addr; +} + +MySQL_Client_Host_Cache_Entry MySQL_Threads_Handler::find_client_host_cache(struct sockaddr* client_sockaddr) { + MySQL_Client_Host_Cache_Entry entry { 0, 0 }; + if (client_sockaddr->sa_family != AF_INET && client_sockaddr->sa_family != AF_INET6) { + return entry; + } + std::string client_addr = get_client_addr(client_sockaddr); + if (client_addr == "127.0.0.1") { + return entry; + } + + pthread_mutex_lock(&mutex_client_host_cache); + auto found_entry = client_host_cache.find(client_addr); + if (found_entry != client_host_cache.end()) { + entry = found_entry->second; + } + pthread_mutex_unlock(&mutex_client_host_cache); + + return entry; +} + +void MySQL_Threads_Handler::update_client_host_cache(struct sockaddr* client_sockaddr, bool error) { + if (client_sockaddr->sa_family != AF_INET && client_sockaddr->sa_family != AF_INET6) { + return; + } + std::string client_addr = get_client_addr(client_sockaddr); + if (client_addr == "127.0.0.1") { + return; + } + + if (error) { + pthread_mutex_lock(&mutex_client_host_cache); + // If the cache is full, find the latest entry on it, and update/remove it. + if (client_host_cache.size() == static_cast(mysql_thread___client_host_cache_size)) { + auto older_elem = std::min_element( + client_host_cache.begin(), + client_host_cache.end(), + [] (const std::pair& f_entry, + const std::pair& s_entry) + { + return f_entry.second.updated_at < s_entry.second.updated_at; + } + ); + if (older_elem->first == client_addr) { + older_elem->second.error_count += 1; + older_elem->second.updated_at = monotonic_time(); + } else { + client_host_cache.erase(older_elem); + } + } + + // Find the entry for the client, and update/insert it. + auto cache_entry = client_host_cache.find(client_addr); + if (cache_entry != client_host_cache.end()) { + cache_entry->second.error_count += 1; + cache_entry->second.updated_at = monotonic_time(); + } else { + MySQL_Client_Host_Cache_Entry new_entry { monotonic_time(), 1 }; + client_host_cache.insert({client_addr, new_entry}); + } + pthread_mutex_unlock(&mutex_client_host_cache); + } else { + pthread_mutex_lock(&mutex_client_host_cache); + client_host_cache.erase(client_addr); + pthread_mutex_unlock(&mutex_client_host_cache); + } +} + +void MySQL_Threads_Handler::flush_client_host_cache() { + pthread_mutex_lock(&mutex_client_host_cache); + client_host_cache.clear(); + pthread_mutex_unlock(&mutex_client_host_cache); +} + MySQL_Threads_Handler::~MySQL_Threads_Handler() { if (variables.monitor_username) { free(variables.monitor_username); variables.monitor_username=NULL; } if (variables.monitor_password) { free(variables.monitor_password); variables.monitor_password=NULL; } @@ -3401,6 +3529,7 @@ void MySQL_Thread::process_all_sessions() { if (sess_time/1000 > (unsigned long long)mysql_thread___connect_timeout_client) { proxy_warning("Closing not established client connection %s:%d after %llums\n",sess->client_myds->addr.addr,sess->client_myds->addr.port, sess_time/1000); sess->healthy = 0; + GloMTH->update_client_host_cache(sess->client_myds->client_addr, true); } } if (maintenance_loop) { @@ -3660,6 +3789,8 @@ void MySQL_Thread::refresh_variables() { mysql_thread___enable_server_deprecate_eof=(bool)GloMTH->get_variable_int((char *)"enable_server_deprecate_eof"); mysql_thread___enable_load_data_local_infile=(bool)GloMTH->get_variable_int((char *)"enable_load_data_local_infile"); mysql_thread___log_mysql_warnings_enabled=(bool)GloMTH->get_variable_int((char *)"log_mysql_warnings_enabled"); + mysql_thread___client_host_cache_size=GloMTH->get_variable_int((char *)"client_host_cache_size"); + mysql_thread___client_host_error_counts=GloMTH->get_variable_int((char *)"client_host_error_counts"); #ifdef DEBUG mysql_thread___session_debug=(bool)GloMTH->get_variable_int((char *)"session_debug"); #endif /* DEBUG */ @@ -3736,7 +3867,6 @@ void MySQL_Thread::unregister_session_connection_handler(int idx, bool _new) { mysql_sessions->remove_index_fast(idx); } - void MySQL_Thread::listener_handle_new_connection(MySQL_Data_Stream *myds, unsigned int n) { int c; union { @@ -3758,6 +3888,19 @@ void MySQL_Thread::listener_handle_new_connection(MySQL_Data_Stream *myds, unsig } c=accept(myds->fd, addr, &addrlen); if (c>-1) { // accept() succeeded + if (mysql_thread___client_host_cache_size) { + MySQL_Client_Host_Cache_Entry client_host_entry = + GloMTH->find_client_host_cache(addr); + if ( + client_host_entry.updated_at != 0 && + client_host_entry.error_count == static_cast(mysql_thread___client_host_error_counts) + ) { + close(c); + status_variables.stvar[st_var_client_host_error_killed_connections] += 1; + return; + } + } + // create a new client connection mypolls.fds[n].revents=0; MySQL_Session *sess=create_new_session_and_client_data_stream(c); diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index ef20bdc7e..42227a51d 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -1523,6 +1523,17 @@ bool admin_handler_command_proxysql(char *query_no_space, unsigned int query_no_ return false; } + if (query_no_space_length==strlen("PROXYSQL FLUSH MYSQL CLIENT HOSTS") && !strncasecmp("PROXYSQL FLUSH MYSQL CLIENT HOSTS",query_no_space, query_no_space_length)) { + proxy_info("Received PROXYSQL FLUSH MYSQL CLIENT HOSTS command\n"); + ProxySQL_Admin *SPA=(ProxySQL_Admin *)pa; + if (GloMTH) { + GloMTH->flush_client_host_cache(); + } + SPA->flush_error_log(); + SPA->send_MySQL_OK(&sess->client_myds->myprot, NULL); + return false; + } + if ( (query_no_space_length==strlen("PROXYSQL FLUSH CONFIGDB") && !strncasecmp("PROXYSQL FLUSH CONFIGDB",query_no_space, query_no_space_length)) // see #923 ) {