diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index c45f25fbc..3f48b8b9d 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -1,6 +1,6 @@ #ifndef __CLASS_MYSQL_MONITOR_H #define __CLASS_MYSQL_MONITOR_H - +#include #include #include @@ -222,12 +222,13 @@ class MySQL_Monitor_State_Data { bool set_wait_timeout(); }; +template class WorkItem { public: - MySQL_Monitor_State_Data *mmsd; + T *data; void *(*routine) (void *); - WorkItem(MySQL_Monitor_State_Data *_mmsd, void *(*start_routine) (void *)) { - mmsd=_mmsd; + WorkItem(T*_data, void *(*start_routine) (void *)) { + data=_data; routine=start_routine; } ~WorkItem() {} @@ -244,6 +245,9 @@ struct p_mon_counter { mysql_monitor_read_only_check_err, mysql_monitor_replication_lag_check_ok, mysql_monitor_replication_lag_check_err, + mysql_monitor_dns_cache_queried, + mysql_monitor_dns_cache_lookup_success, + mysql_monitor_dns_cache_record_updated, __size }; }; @@ -263,7 +267,64 @@ struct mon_metrics_map_idx { }; }; +struct DNS_Cache_Record { + DNS_Cache_Record() = default; + DNS_Cache_Record(DNS_Cache_Record&&) = default; + DNS_Cache_Record(const DNS_Cache_Record&) = default; + DNS_Cache_Record& operator=(DNS_Cache_Record&&) = default; + DNS_Cache_Record& operator=(const DNS_Cache_Record&) = default; + DNS_Cache_Record(const std::string& hostname, const std::string& ip, unsigned long long ttl = 0) : hostname(hostname), + ip(ip), ttl(ttl) + { } + ~DNS_Cache_Record() = default; + + std::string hostname; + std::string ip; + unsigned long long ttl = 0; +}; + +class DNS_Cache { + +public: + enum class OPERATION { + UPDATE, + REMOVE + }; + + + DNS_Cache() { + int rc = pthread_rwlock_init(&rwlock_, NULL); + assert(rc == 0); + } + + ~DNS_Cache() { + pthread_rwlock_destroy(&rwlock_); + } + + bool add(const std::string& hostname, const std::string& ip); + void remove(const std::string& hostname); + std::string lookup(const std::string& hostname, bool return_hostname_if_lookup_fails) const; + void bulk_update(const std::list> bulk_record); + +private: + mutable pthread_rwlock_t rwlock_; + std::unordered_map records; +}; + +struct DNS_Resolve_Data { + std::promise> result; + std::shared_ptr dns_cache; + std::string hostname; + std::string cached_ip; + unsigned int ttl; +}; + + class MySQL_Monitor { + public: + static std::string dns_lookup(const std::string& hostname); + static std::string dns_lookup(const char* hostname); + private: std::vector *tables_defs_monitor; std::vector *tables_defs_monitor_internal; @@ -295,13 +356,16 @@ class MySQL_Monitor { unsigned long long read_only_check_ERR; unsigned long long replication_lag_check_OK; unsigned long long replication_lag_check_ERR; + unsigned long long dns_cache_queried; + unsigned long long dns_cache_lookup_success; //cache hit + unsigned long long dns_cache_record_updated; struct { /// Prometheus metrics arrays std::array p_counter_array {}; std::array p_gauge_array {}; } metrics; void p_update_metrics(); - std::unique_ptr> queue; + std::unique_ptr*>> queue; MySQL_Monitor_Connection_Pool *My_Conn_Pool; bool shutdown; pthread_mutex_t mon_en_mutex; @@ -309,6 +373,9 @@ class MySQL_Monitor { SQLite3DB *admindb; // internal database SQLite3DB *monitordb; // internal database SQLite3DB *monitor_internal_db; // internal database + + std::shared_ptr dns_cache; + MySQL_Monitor(); ~MySQL_Monitor(); void print_version(); @@ -319,6 +386,7 @@ class MySQL_Monitor { void * monitor_galera(); void * monitor_aws_aurora(); void * monitor_replication_lag(); + void * monitor_dns_cache(); void * run(); void populate_monitor_mysql_server_group_replication_log(); void populate_monitor_mysql_server_galera_log(); diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index f7a6b9b6a..f84bfee6f 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -455,6 +455,8 @@ class MySQL_Threads_Handler int monitor_threads_min; int monitor_threads_max; int monitor_threads_queue_maxsize; + int monitor_local_dns_cache_ttl; + int monitor_local_dns_cache_refresh_interval; char *monitor_username; char *monitor_password; char * monitor_replication_lag_use_percona_heartbeat; diff --git a/include/gen_utils.h b/include/gen_utils.h index 507114f09..571c797f6 100644 --- a/include/gen_utils.h +++ b/include/gen_utils.h @@ -244,6 +244,7 @@ int remove_spaces(const char *); char *trim_spaces_in_place(char *str); char *trim_spaces_and_quotes_in_place(char *str); bool mywildcmp(const char *p, const char *str); +std::string trim(const std::string& s); /** * @brief Helper function that converts a MYSQL_RES into a 'SQLite3_result'. diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 9ccdc7098..8b54f4cdd 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -893,6 +893,8 @@ __thread int mysql_thread___monitor_slave_lag_when_null; __thread int mysql_thread___monitor_threads_min; __thread int mysql_thread___monitor_threads_max; __thread int mysql_thread___monitor_threads_queue_maxsize; +__thread int mysql_thread___monitor_local_dns_cache_ttl; +__thread int mysql_thread___monitor_local_dns_cache_refresh_interval; __thread char * mysql_thread___monitor_username; __thread char * mysql_thread___monitor_password; __thread char * mysql_thread___monitor_replication_lag_use_percona_heartbeat; @@ -1054,6 +1056,8 @@ extern __thread int mysql_thread___monitor_slave_lag_when_null; extern __thread int mysql_thread___monitor_threads_min; extern __thread int mysql_thread___monitor_threads_max; extern __thread int mysql_thread___monitor_threads_queue_maxsize; +extern __thread int mysql_thread___monitor_local_dns_cache_ttl; +extern __thread int mysql_thread___monitor_local_dns_cache_refresh_interval; extern __thread char * mysql_thread___monitor_username; extern __thread char * mysql_thread___monitor_password; extern __thread char * mysql_thread___monitor_replication_lag_use_percona_heartbeat; diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index a826bc8ec..d25a54b5e 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include "MySQL_Protocol.h" #include "MySQL_HostGroups_Manager.h" @@ -55,45 +56,53 @@ static MySQL_Monitor *GloMyMon; } while (rc==SQLITE_LOCKED || rc==SQLITE_BUSY);\ } while (0) +template class ConsumerThread : public Thread { - wqueue& m_queue; + wqueue*>& m_queue; int thrn; public: - ConsumerThread(wqueue& queue, int _n) : m_queue(queue) { + ConsumerThread(wqueue*>& queue, int _n) : m_queue(queue) { thrn=_n; } void* run() { // Remove 1 item at a time and process it. Blocks if no items are // available to process. - for (int i = 0; ( thrn ? i < thrn : 1) ; i++) { -//VALGRIND_DISABLE_ERROR_REPORTING; - WorkItem* item = (WorkItem*)m_queue.remove(); -//VALGRIND_ENABLE_ERROR_REPORTING; - if (item==NULL) { + for (int i = 0; (thrn ? i < thrn : 1); i++) { + //VALGRIND_DISABLE_ERROR_REPORTING; + WorkItem* item = (WorkItem*)m_queue.remove(); + //VALGRIND_ENABLE_ERROR_REPORTING; + if (item == NULL) { if (thrn) { // we took a NULL item that wasn't meant to reach here! Add it again - WorkItem *item=NULL; - GloMyMon->queue->add(item); + WorkItem* item = NULL; + m_queue.add(item); } // this is intentional to EXIT immediately return NULL; } + + if (item->routine) { // NULL is allowed, do nothing for it + bool me = true; + + if (check_monitor_enabled_flag) { + pthread_mutex_lock(&GloMyMon->mon_en_mutex); + me = GloMyMon->monitor_enabled; + pthread_mutex_unlock(&GloMyMon->mon_en_mutex); + } - pthread_mutex_lock(&GloMyMon->mon_en_mutex); - bool me = GloMyMon->monitor_enabled; - pthread_mutex_unlock(&GloMyMon->mon_en_mutex); if (me) { - item->routine((void *)item->mmsd); + item->routine((void *)item->data); } } - delete item->mmsd; + delete item->data; delete item; } return NULL; } }; +using DNSResolverThread = ConsumerThread; static int wait_for_mysql(MYSQL *mysql, int status) { struct pollfd pfd; @@ -363,7 +372,7 @@ void MySQL_Monitor_Connection_Pool::purge_some_connections() { MYSQL *my = (MYSQL *)srv->conns->remove_index_fast(0); MySQL_Monitor_State_Data *mmsd= new MySQL_Monitor_State_Data((char *)"",0,NULL,false); mmsd->mysql=my; - GloMyMon->queue->add(new WorkItem(mmsd,NULL)); + GloMyMon->queue->add(new WorkItem(mmsd,NULL)); } for (unsigned int j=0 ; jconns->len ; j++) { MYSQL *my = (MYSQL *)srv->conns->index(j); @@ -372,7 +381,7 @@ void MySQL_Monitor_Connection_Pool::purge_some_connections() { srv->conns->remove_index_fast(j); MySQL_Monitor_State_Data *mmsd= new MySQL_Monitor_State_Data((char *)"",0,NULL,false); mmsd->mysql=my; - GloMyMon->queue->add(new WorkItem(mmsd,NULL)); + GloMyMon->queue->add(new WorkItem(mmsd,NULL)); } } } @@ -560,6 +569,19 @@ void * monitor_replication_lag_pthread(void *arg) { return NULL; } +void* monitor_dns_cache_pthread(void* arg) { +#ifndef NOJEM + bool cache = false; + mallctl("thread.tcache.enabled", NULL, NULL, &cache, sizeof(bool)); +#endif + while (GloMTH == NULL) { + usleep(50000); + } + usleep(100000); + GloMyMon->monitor_dns_cache(); + return NULL; +} + using metric_name = std::string; using metric_help = std::string; using metric_tags = std::map; @@ -674,6 +696,27 @@ mon_metrics_map = std::make_tuple( metric_tags { { "status", "err" } } + ), + // ==================================================================== + + // ==================================================================== + std::make_tuple( + p_mon_counter::mysql_monitor_dns_cache_queried, + "proxysql_mysql_monitor_dns_cache_queried", + "Number of dns queried 'dns_cache_queried' from 'monitor_dns_resolver_thread'.", + metric_tags {} + ), + std::make_tuple( + p_mon_counter::mysql_monitor_dns_cache_lookup_success, + "proxysql_mysql_monitor_dns_cache_lookup_success", + "Number of dns queried 'dns_cache_lookup_success' from 'monitor_dns_resolver_thread'.", + metric_tags {} + ), + std::make_tuple( + p_mon_counter::mysql_monitor_dns_cache_record_updated, + "proxysql_mysql_monitor_dns_cache_record_updated", + "Number of dns queried 'dns_cache_record_updated' from 'monitor_dns_resolver_thread'.", + metric_tags {} ) // ==================================================================== }, @@ -694,12 +737,12 @@ mon_metrics_map = std::make_tuple( ); MySQL_Monitor::MySQL_Monitor() { - + dns_cache = std::make_shared(); GloMyMon = this; My_Conn_Pool=new MySQL_Monitor_Connection_Pool(); - queue = std::unique_ptr>(new wqueue()); + queue = std::unique_ptr*>>(new wqueue*>()); pthread_mutex_init(&group_replication_mutex,NULL); Group_Replication_Hosts_resultset=NULL; @@ -754,7 +797,7 @@ MySQL_Monitor::MySQL_Monitor() { num_threads=2; aux_threads=0; started_threads=0; - + connect_check_OK = 0; connect_check_ERR = 0; ping_check_OK = 0; @@ -763,8 +806,9 @@ MySQL_Monitor::MySQL_Monitor() { read_only_check_ERR = 0; replication_lag_check_OK = 0; replication_lag_check_ERR = 0; - - + dns_cache_queried = 0; + dns_cache_lookup_success = 0; + dns_cache_record_updated = 0; /* if (GloMTH) { @@ -826,6 +870,9 @@ void MySQL_Monitor::p_update_metrics() { p_update_counter(this->metrics.p_counter_array[p_mon_counter::mysql_monitor_read_only_check_err], GloMyMon->read_only_check_ERR); p_update_counter(this->metrics.p_counter_array[p_mon_counter::mysql_monitor_replication_lag_check_ok], GloMyMon->replication_lag_check_OK); p_update_counter(this->metrics.p_counter_array[p_mon_counter::mysql_monitor_replication_lag_check_err], GloMyMon->replication_lag_check_ERR); + p_update_counter(this->metrics.p_counter_array[p_mon_counter::mysql_monitor_dns_cache_queried], GloMyMon->dns_cache_queried); + p_update_counter(this->metrics.p_counter_array[p_mon_counter::mysql_monitor_dns_cache_lookup_success], GloMyMon->dns_cache_lookup_success); + p_update_counter(this->metrics.p_counter_array[p_mon_counter::mysql_monitor_dns_cache_record_updated], GloMyMon->dns_cache_record_updated); } } @@ -1184,7 +1231,7 @@ bool MySQL_Monitor_State_Data::create_new_connection() { mysql_options4(mysql, MYSQL_OPT_CONNECT_ATTR_ADD, "_server_host", hostname); MYSQL *myrc=NULL; if (port) { - myrc=mysql_real_connect(mysql, hostname, mysql_thread___monitor_username, mysql_thread___monitor_password, NULL, port, NULL, 0); + myrc=mysql_real_connect(mysql, MySQL_Monitor::dns_lookup(hostname).c_str(), mysql_thread___monitor_username, mysql_thread___monitor_password, NULL, port, NULL, 0); } else { myrc=mysql_real_connect(mysql, "localhost", mysql_thread___monitor_username, mysql_thread___monitor_password, NULL, 0, hostname, 0); } @@ -2544,8 +2591,8 @@ void * MySQL_Monitor::monitor_connect() { if (rc_ping) { // only if server is responding to pings MySQL_Monitor_State_Data *mmsd=new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]), NULL, atoi(r->fields[2])); mmsd->mondb=monitordb; - WorkItem* item; - item=new WorkItem(mmsd,monitor_connect_thread); + WorkItem* item; + item=new WorkItem(mmsd,monitor_connect_thread); GloMyMon->queue->add(item); usleep(us); } @@ -2594,7 +2641,7 @@ __sleep_monitor_connect_loop: mysql_thr=NULL; } for (unsigned int i=0;i *item=NULL; GloMyMon->queue->add(item); } return NULL; @@ -2670,8 +2717,8 @@ void * MySQL_Monitor::monitor_ping() { SQLite3_row *r=*it; MySQL_Monitor_State_Data *mmsd = new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]), NULL, atoi(r->fields[2])); mmsd->mondb=monitordb; - WorkItem* item; - item=new WorkItem(mmsd,monitor_ping_thread); + WorkItem* item; + item=new WorkItem(mmsd,monitor_ping_thread); GloMyMon->queue->add(item); usleep(us); if (GloMyMon->shutdown) return NULL; @@ -2839,7 +2886,7 @@ __sleep_monitor_ping_loop: mysql_thr=NULL; } for (unsigned int i=0;i* item=NULL; GloMyMon->queue->add(item); } return NULL; @@ -2957,8 +3004,8 @@ void * MySQL_Monitor::monitor_read_only() { } } mmsd->mondb=monitordb; - WorkItem* item; - item=new WorkItem(mmsd,monitor_read_only_thread); + WorkItem* item; + item=new WorkItem(mmsd,monitor_read_only_thread); GloMyMon->queue->add(item); usleep(us); } @@ -3007,7 +3054,7 @@ __sleep_monitor_read_only: mysql_thr=NULL; } for (unsigned int i=0;i *item=NULL; GloMyMon->queue->add(item); } return NULL; @@ -3079,8 +3126,8 @@ void * MySQL_Monitor::monitor_group_replication() { mmsd->max_transactions_behind=atoi(r->fields[5]); mmsd->max_transactions_behind_count=mysql_thread___monitor_groupreplication_max_transactions_behind_count; mmsd->mondb=monitordb; - WorkItem* item; - item=new WorkItem(mmsd,monitor_group_replication_thread); + WorkItem* item; + item=new WorkItem(mmsd,monitor_group_replication_thread); GloMyMon->queue->add(item); usleep(us); } @@ -3137,7 +3184,7 @@ __sleep_monitor_group_replication: mysql_thr=NULL; } for (unsigned int i=0;i*item=NULL; GloMyMon->queue->add(item); } return NULL; @@ -3194,8 +3241,8 @@ void * MySQL_Monitor::monitor_galera() { mmsd->writer_is_also_reader=atoi(r->fields[4]); mmsd->max_transactions_behind=atoi(r->fields[5]); mmsd->mondb=monitordb; - WorkItem* item; - item=new WorkItem(mmsd,monitor_galera_thread); + WorkItem* item; + item=new WorkItem(mmsd,monitor_galera_thread); GloMyMon->queue->add(item); usleep(us); } @@ -3228,7 +3275,7 @@ __sleep_monitor_galera: mysql_thr=NULL; } for (unsigned int i=0;i*item=NULL; GloMyMon->queue->add(item); } return NULL; @@ -3295,8 +3342,8 @@ void * MySQL_Monitor::monitor_replication_lag() { if (rc_ping) { // only if server is responding to pings MySQL_Monitor_State_Data *mmsd = new MySQL_Monitor_State_Data(r->fields[1], atoi(r->fields[2]), NULL, atoi(r->fields[4]), atoi(r->fields[0])); mmsd->mondb=monitordb; - WorkItem* item; - item=new WorkItem(mmsd,monitor_replication_lag_thread); + WorkItem* item; + item=new WorkItem(mmsd,monitor_replication_lag_thread); GloMyMon->queue->add(item); usleep(us); } @@ -3345,12 +3392,328 @@ __sleep_monitor_replication_lag: mysql_thr=NULL; } for (unsigned int i=0;i*item=NULL; GloMyMon->queue->add(item); } return NULL; } +bool validate_ip(const std::string& ip) { + + // check if ip is vaild IPV4 ip address + struct sockaddr_in sa4; + if (inet_pton(AF_INET, ip.c_str(), &(sa4.sin_addr)) != 0) + return true; + + // check if ip is vaild IPV6 ip address + struct sockaddr_in6 sa6; + if (inet_pton(AF_INET6, ip.c_str(), &(sa6.sin6_addr)) != 0) + return true; + + return false; +} + +void* monitor_dns_resolver_thread(void* args) { + + DNS_Resolve_Data* dns_resolve_data = static_cast(args); + + struct addrinfo hints, *res = NULL; + int gai_rc; + int rc = 0; + + /* set hints for getaddrinfo */ + memset(&hints, 0, sizeof(hints)); + hints.ai_protocol = IPPROTO_TCP; /* TCP connections only */ + hints.ai_family = AF_UNSPEC; /* includes: IPv4, IPv6 or hostname */ + hints.ai_socktype = SOCK_STREAM; + + //unsigned int wait_gai = 1; + + //time_t start_t = time(NULL); + + //while ((gai_rc = getaddrinfo(hostname.c_str(), NULL, &hints, &res)) == EAI_AGAIN) + //{ + // if (time(NULL) - start_t > (time_t)10) + // break; + + // usleep(wait_gai); + // wait_gai *= 2; + //} + try { + gai_rc = getaddrinfo(dns_resolve_data->hostname.c_str(), NULL, &hints, &res); + + if (gai_rc != 0 || !res) + { + proxy_error("An error occurred while resolving hostname: %s [%d]\n", dns_resolve_data->hostname.c_str(), gai_rc); + dns_resolve_data->result.set_value(std::make_tuple<>(false, DNS_Cache_Record())); + return NULL; + } + + + char ip_addr[INET6_ADDRSTRLEN]; + + switch (res->ai_family) { + case AF_INET: { + struct sockaddr_in* ipv4 = (struct sockaddr_in*)res->ai_addr; + inet_ntop(res->ai_addr->sa_family, &ipv4->sin_addr, ip_addr, INET_ADDRSTRLEN); + break; + } + case AF_INET6: { + struct sockaddr_in6* ipv6 = (struct sockaddr_in6*)res->ai_addr; + inet_ntop(res->ai_addr->sa_family, &ipv6->sin6_addr, ip_addr, INET6_ADDRSTRLEN); + break; + } + } + + freeaddrinfo(res); + + if (ip_addr) { + if (!dns_resolve_data->cached_ip.empty() && strncmp(dns_resolve_data->cached_ip.c_str(), ip_addr, sizeof(ip_addr) - 1) == 0) { + dns_resolve_data->result.set_value(std::make_tuple<>(true, DNS_Cache_Record(dns_resolve_data->hostname, ip_addr, monotonic_time() + (1000 * dns_resolve_data->ttl)))); + } + else { + dns_resolve_data->dns_cache->add(dns_resolve_data->hostname, ip_addr); + dns_resolve_data->result.set_value(std::make_tuple<>(true, DNS_Cache_Record(dns_resolve_data->hostname, ip_addr, monotonic_time() + (1000 * dns_resolve_data->ttl)))); + } + + return NULL; + } + } + catch (...) + { + } + + dns_resolve_data->result.set_value(std::make_tuple<>(false, DNS_Cache_Record())); + return NULL; +} + +void* MySQL_Monitor::monitor_dns_cache() { + // initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it) + unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version; + std::unique_ptr mysql_thr(new MySQL_Thread()); + mysql_thr->curtime = monotonic_time(); + MySQL_Monitor__thread_MySQL_Thread_Variables_version = GloMTH->get_global_version(); + mysql_thr->refresh_variables(); + if (!GloMTH) return NULL; // quick exit during shutdown/restart + + unsigned long long t1; + unsigned long long t2; + unsigned long long next_loop_at = 0; + + std::list dns_records_bookkeeping; + wqueue*> dns_resolver_queue; + + while (GloMyMon->shutdown == false) { + // update the 'monitor_internal.mysql_servers' table with the latest 'mysql_servers' from 'MyHGM' + { + std::lock_guard mysql_servers_guard(MyHGM->mysql_servers_to_monitor_mutex); + update_monitor_mysql_servers(MyHGM->mysql_servers_to_monitor); + } + + //if (mysql_thread___monitor_local_dns_cache_ttl == 0 || + // mysql_thread___monitor_local_dns_cache_refresh_interval == 0 + // ) { + // + // break; + //} + + unsigned int glover; + char* error = NULL; + int cols = 0; + int affected_rows = 0; + SQLite3_result* resultset = NULL; + char* query = (char*)"SELECT DISTINCT trim(hostname) FROM monitor_internal.mysql_servers"; + t1 = monotonic_time(); + + if (!GloMTH) return NULL; // quick exit during shutdown/restart + glover = GloMTH->get_global_version(); + if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover) { + MySQL_Monitor__thread_MySQL_Thread_Variables_version = glover; + mysql_thr->refresh_variables(); + next_loop_at = 0; + } + + if (t1 < next_loop_at) { + goto __sleep_monitor_dns_cache_loop; + } + next_loop_at = t1 + (1000 * mysql_thread___monitor_local_dns_cache_refresh_interval); + + proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); + admindb->execute_statement(query, &error, &cols, &affected_rows, &resultset); + if (error) { + proxy_error("Error on %s : %s\n", query, error); + goto __end_monitor_dns_cache_loop; + } + else { + if (resultset->rows_count == 0) { + goto __end_monitor_dns_cache_loop; + } + + + constexpr unsigned int num_dns_resolver_threads = 1; + constexpr unsigned int num_dns_resolver_max_threads = 16; + constexpr unsigned int num_dns_resolver_queue_max_size = 128; + + std::vector dns_resolver_threads(num_dns_resolver_threads); + + for (unsigned int i = 0; i < num_dns_resolver_threads; i++) { + dns_resolver_threads[i] = new DNSResolverThread(dns_resolver_queue, 0); + dns_resolver_threads[i]->start(2048, false); + } + + std::set hostnames; + + for (const auto row : resultset->rows) { + const std::string& hostname = row->fields[0]; + + if (!validate_ip(hostname)) + hostnames.insert(hostname); + } + + std::list>> dns_resolve_result; + + if (dns_records_bookkeeping.empty() == false) { + unsigned long long current_time = monotonic_time(); + + for (auto itr = dns_records_bookkeeping.cbegin(); + itr != dns_records_bookkeeping.cend();) { + if (hostnames.find(itr->hostname) == hostnames.end()) { + dns_cache->remove(itr->hostname); // to check + itr = dns_records_bookkeeping.erase(itr); + } + else { + hostnames.erase(itr->hostname); + + if (current_time >= itr->ttl) { + std::unique_ptr dns_resolve_data(new DNS_Resolve_Data()); + dns_resolve_data->hostname = itr->hostname; + dns_resolve_data->cached_ip = itr->ip; + dns_resolve_data->ttl = mysql_thread___monitor_local_dns_cache_ttl; + dns_resolve_data->dns_cache = dns_cache; + dns_resolve_result.emplace_back(dns_resolve_data->result.get_future()); + dns_resolver_queue.add(new WorkItem(dns_resolve_data.release(), monitor_dns_resolver_thread)); + + itr = dns_records_bookkeeping.erase(itr); + continue; + } + + itr++; + } + } + } + + { + unsigned int qsize = dns_resolver_queue.size(); + unsigned int num_threads = dns_resolver_threads.size(); + + if (qsize > num_dns_resolver_queue_max_size / 8) { + unsigned int threads_max = num_dns_resolver_max_threads; + + if (threads_max > num_threads) { + unsigned int new_threads = threads_max - num_threads; + + if ((qsize / 8) < new_threads) { + new_threads = qsize / 8; // try to not burst threads + } + + if (new_threads) { + unsigned int old_num_threads = num_threads; + num_threads += new_threads; + dns_resolver_threads.resize(num_threads); + + for (unsigned int i = old_num_threads; i < num_threads; i++) { + dns_resolver_threads[i] = new DNSResolverThread(dns_resolver_queue, 0); + dns_resolver_threads[i]->start(2048, false); + } + } + } + } + } + + if (hostnames.empty() == false) { + + for (const std::string& host_name : hostnames) { + std::unique_ptr dns_resolve_data(new DNS_Resolve_Data()); + dns_resolve_data->hostname = host_name; + dns_resolve_data->ttl = mysql_thread___monitor_local_dns_cache_ttl; + dns_resolve_data->dns_cache = dns_cache; + dns_resolve_result.emplace_back(dns_resolve_data->result.get_future()); + dns_resolver_queue.add(new WorkItem(dns_resolve_data.release(), monitor_dns_resolver_thread)); + } + } + + { + unsigned int qsize = dns_resolver_queue.size(); + unsigned int num_threads = dns_resolver_threads.size(); + + if (qsize > num_dns_resolver_queue_max_size / 8) { + unsigned int threads_max = num_dns_resolver_max_threads; + + if (threads_max > num_threads) { + unsigned int new_threads = threads_max - num_threads; + + if ((qsize / 8) < new_threads) { + new_threads = qsize / 8; // try to not burst threads + } + + if (new_threads) { + unsigned int old_num_threads = num_threads; + num_threads += new_threads; + dns_resolver_threads.resize(num_threads); + + for (unsigned int i = old_num_threads; i < num_threads; i++) { + dns_resolver_threads[i] = new DNSResolverThread(dns_resolver_queue, 0); + dns_resolver_threads[i]->start(2048, false); + } + } + } + } + } + + for (size_t i = 0; i < dns_resolver_threads.size(); i++) + dns_resolver_queue.add(NULL); + + + for (auto& dns_result : dns_resolve_result) { + auto ret_value = dns_result.get(); + + if (std::get<0>(ret_value)) { + DNS_Cache_Record dns_record = get<1>(ret_value); + dns_records_bookkeeping.emplace_back(dns_record); + proxy_error("Hostname: %s IP: %s\n", dns_record.hostname.c_str(), dns_record.ip.c_str()); + } + } + + + for (DNSResolverThread* const dns_resolver_thread : dns_resolver_threads) { + dns_resolver_thread->join(); + delete dns_resolver_thread; + } + + if (GloMyMon->shutdown) return NULL; + } + + __end_monitor_dns_cache_loop: + if (resultset) { + delete resultset; + resultset = NULL; + } + + __sleep_monitor_dns_cache_loop: + t2 = monotonic_time(); + if (t2 < next_loop_at) { + unsigned long long st = 0; + st = next_loop_at - t2; + if (st > 500000) { + st = 500000; + } + usleep(st); + } + } + + return NULL; +} + void * MySQL_Monitor::run() { while (GloMTH==NULL) { @@ -3365,26 +3728,37 @@ void * MySQL_Monitor::run() { MySQL_Monitor__thread_MySQL_Thread_Variables_version=GloMTH->get_global_version(); mysql_thr->refresh_variables(); //if (!GloMTH) return NULL; // quick exit during shutdown/restart + + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setstacksize(&attr, 2048 * 1024); + + pthread_t monitor_dns_cache_thread; + if (pthread_create(&monitor_dns_cache_thread, &attr, &monitor_dns_cache_pthread, NULL) != 0) { + // LCOV_EXCL_START + proxy_error("Thread creation\n"); + assert(0); + // LCOV_EXCL_STOP + } + __monitor_run: while (queue->size()) { // this is a clean up in case Monitor was restarted - WorkItem* item = (WorkItem*)queue->remove(); + WorkItem* item = (WorkItem*)queue->remove(); if (item) { - if (item->mmsd) { - delete item->mmsd; + if (item->data) { + delete item->data; } delete item; } } - ConsumerThread **threads= (ConsumerThread **)malloc(sizeof(ConsumerThread *)*num_threads); + ConsumerThread **threads= (ConsumerThread **)malloc(sizeof(ConsumerThread *)*num_threads); for (unsigned int i=0;i(*queue, 0); threads[i]->start(2048,false); } started_threads += num_threads; this->metrics.p_counter_array[p_mon_counter::mysql_monitor_workers_started]->Increment(num_threads); - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setstacksize (&attr, 2048*1024); + pthread_t monitor_connect_thread; if (pthread_create(&monitor_connect_thread, &attr, &monitor_connect_pthread,NULL) != 0) { // LCOV_EXCL_START @@ -3446,10 +3820,10 @@ __monitor_run: if (old_num_threads < threads_min) { num_threads = threads_min; this->metrics.p_gauge_array[p_mon_gauge::mysql_monitor_workers]->Set(threads_min); - threads= (ConsumerThread **)realloc(threads, sizeof(ConsumerThread *)*num_threads); + threads= (ConsumerThread **)realloc(threads, sizeof(ConsumerThread *)*num_threads); started_threads += (num_threads - old_num_threads); for (unsigned int i = old_num_threads ; i < num_threads ; i++) { - threads[i] = new ConsumerThread(*queue, 0); + threads[i] = new ConsumerThread(*queue, 0); threads[i]->start(2048,false); } } @@ -3475,10 +3849,10 @@ __monitor_run: unsigned int old_num_threads = num_threads; num_threads += new_threads; this->metrics.p_gauge_array[p_mon_gauge::mysql_monitor_workers]->Increment(new_threads); - threads= (ConsumerThread **)realloc(threads, sizeof(ConsumerThread *)*num_threads); + threads= (ConsumerThread **)realloc(threads, sizeof(ConsumerThread *)*num_threads); started_threads += new_threads; for (unsigned int i = old_num_threads ; i < num_threads ; i++) { - threads[i] = new ConsumerThread(*queue, 0); + threads[i] = new ConsumerThread(*queue, 0); threads[i]->start(2048,false); } } @@ -3494,11 +3868,11 @@ __monitor_run: } if (qsize > 0) { proxy_info("Monitor is starting %d helper threads\n", qsize); - ConsumerThread **threads_aux= (ConsumerThread **)malloc(sizeof(ConsumerThread *)*qsize); + ConsumerThread **threads_aux= (ConsumerThread **)malloc(sizeof(ConsumerThread *)*qsize); aux_threads = qsize; started_threads += aux_threads; for (unsigned int i=0; i(*queue, 245); threads_aux[i]->start(2048,false); } for (unsigned int i=0; i*item=NULL; GloMyMon->queue->add(item); } for (unsigned int i=0;i *item=NULL; GloMyMon->queue->add(item); } return NULL; @@ -5025,4 +5403,95 @@ void MySQL_Monitor::evaluate_aws_aurora_results(unsigned int wHG, unsigned int r #endif // TEST_AURORA } +std::string MySQL_Monitor::dns_lookup(const std::string& hostname) { + static thread_local std::shared_ptr dns_cache_thread; + const std::string& hostname_trim = trim(hostname); + std::string ip; + + if (!dns_cache_thread && GloMyMon) + dns_cache_thread = GloMyMon->dns_cache; + + if (dns_cache_thread) { + ip = dns_cache_thread->lookup(hostname_trim, false) ; + } + + return !ip.empty() ? ip : hostname_trim; +} + +std::string MySQL_Monitor::dns_lookup(const char* hostname) { + return MySQL_Monitor::dns_lookup(std::string(hostname)); +} + +bool DNS_Cache::add(const std::string& hostname, const std::string& ip) { + int rc = pthread_rwlock_wrlock(&rwlock_); + assert(rc == 0); + try { + records.emplace(hostname, ip); + } + catch (...) {} + rc = pthread_rwlock_unlock(&rwlock_); + assert(rc == 0); + + if (GloMyMon) + __sync_fetch_and_add(&GloMyMon->dns_cache_record_updated, 1); + + return true; +} + +std::string DNS_Cache::lookup(const std::string& hostname, bool return_hostname_if_lookup_fails) const { + std::string ip; + + __sync_fetch_and_add(&GloMyMon->dns_cache_queried, 1); + + int rc = pthread_rwlock_rdlock(&rwlock_); + assert(rc == 0); + auto itr = records.find(hostname); + + if (itr != records.end()) { + ip = itr->second; + } + rc = pthread_rwlock_unlock(&rwlock_); + assert(rc == 0); + + if (!ip.empty() && GloMyMon) { + __sync_fetch_and_add(&GloMyMon->dns_cache_lookup_success, 1); + } + + if (ip.empty() && return_hostname_if_lookup_fails) { + ip = hostname; + } + + return ip; +} + +void DNS_Cache::remove(const std::string& hostname) { + int rc = pthread_rwlock_wrlock(&rwlock_); + assert(rc == 0); + records.erase(hostname); + rc = pthread_rwlock_unlock(&rwlock_); + assert(rc == 0); + + if (GloMyMon) + __sync_fetch_and_add(&GloMyMon->dns_cache_record_updated, 1); +} + +void DNS_Cache::bulk_update(const std::list> bulk_record) { + + pthread_rwlock_wrlock(&rwlock_); + for (const auto& dns_record : bulk_record) { + if (dns_record.second == DNS_Cache::OPERATION::UPDATE) { + records[dns_record.first.hostname] = dns_record.first.ip; + } + else if (dns_record.second == DNS_Cache::OPERATION::REMOVE) { + records.erase(dns_record.first.hostname); + } + + if (GloMyMon) + __sync_fetch_and_add(&GloMyMon->dns_cache_record_updated, 1); + } + pthread_rwlock_unlock(&rwlock_); +} + +template class WorkItem; +template class WorkItem; \ No newline at end of file diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index 966270e3a..9f25729fa 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -218,7 +218,7 @@ void * kill_query_thread(void *arg) { default: break; } - ret=mysql_real_connect(mysql,ka->hostname,ka->username,ka->password,NULL,ka->port,NULL,0); + ret=mysql_real_connect(mysql, MySQL_Monitor::dns_lookup(ka->hostname).c_str(), ka->username, ka->password, NULL, ka->port, NULL, 0); } else { switch (ka->kill_type) { case KILL_QUERY: @@ -5271,8 +5271,8 @@ void MySQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE( proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Session=%p , DS=%p . User '%s' has exceeded the 'max_user_connections' resource (current value: %d)\n", this, client_myds, client_myds->myconn->userinfo->username, used_users); char *a=(char *)"User '%s' has exceeded the 'max_user_connections' resource (current value: %d)"; char *b=(char *)malloc(strlen(a)+strlen(client_myds->myconn->userinfo->username)+16); - GloMyLogger->log_audit_entry(PROXYSQL_MYSQL_AUTH_ERR, this, NULL, b); sprintf(b,a,client_myds->myconn->userinfo->username,used_users); + GloMyLogger->log_audit_entry(PROXYSQL_MYSQL_AUTH_ERR, this, NULL, b); client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,2,1226,(char *)"42000", b, true); proxy_warning("User '%s' has exceeded the 'max_user_connections' resource (current value: %d)\n",client_myds->myconn->userinfo->username,used_users); free(b); diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 4993b14af..f564d2e09 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -480,6 +480,8 @@ static char * mysql_thread_variables_names[]= { (char *)"monitor_threads_min", (char *)"monitor_threads_max", (char *)"monitor_threads_queue_maxsize", + (char *)"monitor_local_dns_cache_ttl", + (char *)"monitor_local_dns_cache_refresh_interval", (char *)"monitor_wait_timeout", (char *)"monitor_writer_is_also_reader", (char *)"max_allowed_packet", @@ -1090,6 +1092,8 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.monitor_threads_min = 8; variables.monitor_threads_max = 128; variables.monitor_threads_queue_maxsize = 128; + variables.monitor_local_dns_cache_ttl = 300000; + variables.monitor_local_dns_cache_refresh_interval = 60000; variables.monitor_username=strdup((char *)"monitor"); variables.monitor_password=strdup((char *)"monitor"); variables.monitor_replication_lag_use_percona_heartbeat=strdup((char *)""); @@ -2176,6 +2180,9 @@ char ** MySQL_Threads_Handler::get_variables_list() { VariablesPointers_int["monitor_slave_lag_when_null"] = make_tuple(&variables.monitor_slave_lag_when_null, 0, 604800, false); VariablesPointers_int["monitor_threads_queue_maxsize"] = make_tuple(&variables.monitor_threads_queue_maxsize, 16, 1024, false); + VariablesPointers_int["monitor_local_dns_cache_ttl"] = make_tuple(&variables.monitor_local_dns_cache_ttl, 100, 7*24*3600*1000, false); + VariablesPointers_int["monitor_local_dns_cache_refresh_interval"] = make_tuple(&variables.monitor_local_dns_cache_refresh_interval, 100, 600*1000, false); + // mirroring VariablesPointers_int["mirror_max_concurrency"] = make_tuple(&variables.mirror_max_concurrency, 1, 8*1024, false); VariablesPointers_int["mirror_max_queue_length"] = make_tuple(&variables.mirror_max_queue_length, 0, 1024*1024, false); @@ -3994,6 +4001,9 @@ void MySQL_Thread::refresh_variables() { mysql_thread___monitor_threads_min = GloMTH->get_variable_int((char *)"monitor_threads_min"); mysql_thread___monitor_threads_max = GloMTH->get_variable_int((char *)"monitor_threads_max"); mysql_thread___monitor_threads_queue_maxsize = GloMTH->get_variable_int((char *)"monitor_threads_queue_maxsize"); + mysql_thread___monitor_local_dns_cache_ttl = GloMTH->get_variable_int((char*)"monitor_local_dns_cache_ttl"); + mysql_thread___monitor_local_dns_cache_refresh_interval = GloMTH->get_variable_int((char*)"monitor_local_dns_cache_refresh_interval"); + if (mysql_thread___firewall_whitelist_errormsg) free(mysql_thread___firewall_whitelist_errormsg); mysql_thread___firewall_whitelist_errormsg=GloMTH->get_variable_string((char *)"firewall_whitelist_errormsg"); @@ -4572,6 +4582,24 @@ SQLite3_result * MySQL_Threads_Handler::SQL3_GlobalStatus(bool _memory) { pta[1]=buf; result->add_row(pta); } + { + pta[0] = (char*)"MySQL_Monitor_dns_cache_queried"; + sprintf(buf, "%llu", GloMyMon->dns_cache_queried); + pta[1] = buf; + result->add_row(pta); + } + { + pta[0] = (char*)"MySQL_Monitor_dns_cache_lookup_success"; + sprintf(buf, "%llu", GloMyMon->dns_cache_lookup_success); + pta[1] = buf; + result->add_row(pta); + } + { + pta[0] = (char*)"MySQL_Monitor_dns_cache_record_updated"; + sprintf(buf, "%llu", GloMyMon->dns_cache_record_updated); + pta[1] = buf; + result->add_row(pta); + } } free(pta); return result; diff --git a/lib/ProxySQL_Cluster.cpp b/lib/ProxySQL_Cluster.cpp index ac7789a3f..af18bc2f2 100644 --- a/lib/ProxySQL_Cluster.cpp +++ b/lib/ProxySQL_Cluster.cpp @@ -100,7 +100,7 @@ void * ProxySQL_Cluster_Monitor_thread(void *args) { //mysql_options(conn, MYSQL_OPT_WRITE_TIMEOUT, &timeout); { unsigned char val = 1; mysql_options(conn, MYSQL_OPT_SSL_ENFORCE, &val); } //rc_conn = mysql_real_connect(conn, node->hostname, username, password, NULL, node->port, NULL, CLIENT_COMPRESS); // FIXME: add optional support for compression - rc_conn = mysql_real_connect(conn, node->hostname, username, password, NULL, node->port, NULL, 0); + rc_conn = mysql_real_connect(conn, MySQL_Monitor::dns_lookup(node->hostname).c_str(), username, password, NULL, node->port, NULL, 0); // if (rc_conn) { // } //char *query = query1; @@ -881,7 +881,7 @@ void ProxySQL_Cluster::pull_mysql_query_rules_from_peer(const string& expected_c //mysql_options(conn, MYSQL_OPT_WRITE_TIMEOUT, &timeout); { unsigned char val = 1; mysql_options(conn, MYSQL_OPT_SSL_ENFORCE, &val); } proxy_info("Cluster: Fetching MySQL Query Rules from peer %s:%d started. Expected checksum: %s\n", hostname, port, expected_checksum.c_str()); - rc_conn = mysql_real_connect(conn, hostname, username, password, NULL, port, NULL, 0); + rc_conn = mysql_real_connect(conn, MySQL_Monitor::dns_lookup(hostname).c_str(), username, password, NULL, port, NULL, 0); if (rc_conn) { MYSQL_RES *result1 = NULL; MYSQL_RES *result2 = NULL; @@ -1147,7 +1147,7 @@ void ProxySQL_Cluster::pull_mysql_users_from_peer(const string& expected_checksu { unsigned char val = 1; mysql_options(conn, MYSQL_OPT_SSL_ENFORCE, &val); } proxy_info("Cluster: Fetching MySQL Users from peer %s:%d started. Expected checksum: %s\n", hostname, port, expected_checksum.c_str()); - rc_conn = mysql_real_connect(conn, hostname, username, password, NULL, port, NULL, 0); + rc_conn = mysql_real_connect(conn, MySQL_Monitor::dns_lookup(hostname).c_str(), username, password, NULL, port, NULL, 0); if (rc_conn == nullptr) { proxy_info("Cluster: Fetching MySQL Users from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn)); metrics.p_counter_array[p_cluster_counter::pulled_mysql_users_failure]->Increment(); @@ -1461,7 +1461,7 @@ void ProxySQL_Cluster::pull_mysql_servers_from_peer(const std::string& checksum, //mysql_options(conn, MYSQL_OPT_WRITE_TIMEOUT, &timeout); { unsigned char val = 1; mysql_options(conn, MYSQL_OPT_SSL_ENFORCE, &val); } proxy_info("Cluster: Fetching MySQL Servers from peer %s:%d started. Expected checksum %s\n", hostname, port, peer_checksum); - rc_conn = mysql_real_connect(conn, hostname, username, password, NULL, port, NULL, 0); + rc_conn = mysql_real_connect(conn, MySQL_Monitor::dns_lookup(hostname).c_str(), username, password, NULL, port, NULL, 0); if (rc_conn) { std::vector results {}; @@ -1825,7 +1825,7 @@ void ProxySQL_Cluster::pull_global_variables_from_peer(const string& var_type, c //mysql_options(conn, MYSQL_OPT_WRITE_TIMEOUT, &timeout); { unsigned char val = 1; mysql_options(conn, MYSQL_OPT_SSL_ENFORCE, &val); } proxy_info("Cluster: Fetching %s variables from peer %s:%d started\n", vars_type_str, hostname, port); - rc_conn = mysql_real_connect(conn, hostname, username, password, NULL, port, NULL, 0); + rc_conn = mysql_real_connect(conn, MySQL_Monitor::dns_lookup(hostname).c_str(), username, password, NULL, port, NULL, 0); if (rc_conn) { std::string s_query = ""; @@ -1974,7 +1974,7 @@ void ProxySQL_Cluster::pull_proxysql_servers_from_peer(const std::string& expect "Cluster: Fetching ProxySQL Servers from peer %s:%d started. Expected checksum: %s\n", hostname, port, expected_checksum.c_str() ); - rc_conn = mysql_real_connect(conn, hostname, username, password, NULL, port, NULL, 0); + rc_conn = mysql_real_connect(conn, MySQL_Monitor::dns_lookup(hostname).c_str(), username, password, NULL, port, NULL, 0); if (rc_conn) { rc_query = mysql_query(conn,"SELECT hostname, port, weight, comment FROM runtime_proxysql_servers ORDER BY hostname, port"); if ( rc_query == 0 ) { diff --git a/lib/gen_utils.cpp b/lib/gen_utils.cpp index d02e3faf2..7be885797 100644 --- a/lib/gen_utils.cpp +++ b/lib/gen_utils.cpp @@ -112,7 +112,6 @@ char *trim_spaces_and_quotes_in_place(char *str) { return str; } - bool mywildcmp(const char *p, const char *str) { if (*p == '\0') { if (*str == '\0') { @@ -144,6 +143,18 @@ bool mywildcmp(const char *p, const char *str) { return false; } +std::string trim(const std::string& s) +{ + if (s.length() == 0) + return s; + + std::string::size_type b = s.find_first_not_of(" \t\n"); + std::string::size_type e = s.find_last_not_of(" \t\n"); + if (b == std::string::npos) + return ""; + + return std::string(s, b, e - b + 1); +} void * PtrSizeArray::operator new(size_t size) { return l_alloc(size); diff --git a/lib/mysql_connection.cpp b/lib/mysql_connection.cpp index 81646e719..344c87862 100644 --- a/lib/mysql_connection.cpp +++ b/lib/mysql_connection.cpp @@ -808,7 +808,7 @@ void MySQL_Connection::connect_start() { } } if (parent->port) { - async_exit_status=mysql_real_connect_start(&ret_mysql, mysql, parent->address, userinfo->username, auth_password, userinfo->schemaname, parent->port, NULL, client_flags); + async_exit_status=mysql_real_connect_start(&ret_mysql, mysql, MySQL_Monitor::dns_lookup(parent->address).c_str(), userinfo->username, auth_password, userinfo->schemaname, parent->port, NULL, client_flags); } else { async_exit_status=mysql_real_connect_start(&ret_mysql, mysql, "localhost", userinfo->username, auth_password, userinfo->schemaname, parent->port, parent->address, client_flags); }