From dbfb99ce71393719b5dbb1e61d4a7b7d5221ed20 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Tue, 29 Nov 2022 14:05:17 +0500 Subject: [PATCH] * Added hostname and ip information in mysql_connection * Added multiple IP support with load balancing (Round Robin Scheduling) * Added DNS resolver request max queue size * Added AI_ADDRCONFIG flag * Exception handling --- include/MySQL_HostGroups_Manager.h | 1 - include/MySQL_Monitor.hpp | 34 ++-- include/MySQL_Thread.h | 1 + include/mysql_connection.h | 5 + include/proxysql_structs.h | 2 + lib/MySQL_HostGroups_Manager.cpp | 3 +- lib/MySQL_Monitor.cpp | 228 +++++++++++++--------- lib/MySQL_Thread.cpp | 6 +- lib/mysql_connection.cpp | 27 ++- test/tap/tests/test_dns_cache-t.cpp | 280 ++++++++++++++++++++++++++++ 10 files changed, 468 insertions(+), 119 deletions(-) create mode 100644 test/tap/tests/test_dns_cache-t.cpp diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index cef0858cc..9014767fd 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -131,7 +131,6 @@ class MySrvC { // MySQL Server Container public: MyHGC *myhgc; char *address; - char* resolved_ip; uint16_t port; uint16_t gtid_port; uint16_t flags; diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index 113ecfcf1..7cd32c794 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -273,25 +273,24 @@ struct DNS_Cache_Record { DNS_Cache_Record(const DNS_Cache_Record&) = default; DNS_Cache_Record& operator=(DNS_Cache_Record&&) = default; DNS_Cache_Record& operator=(const DNS_Cache_Record&) = default; - DNS_Cache_Record(const std::string& hostname, const std::string& ip, unsigned long long ttl = 0) : hostname(hostname), - ip(ip), ttl(ttl) + DNS_Cache_Record(const std::string& hostname, const std::vector& ips, unsigned long long ttl = 0) : hostname_(hostname), + ttl_(ttl) { + std::copy(ips.begin(), ips.end(), std::inserter(ips_, ips_.end())); + } + DNS_Cache_Record(const std::string& hostname, std::set&& ips, unsigned long long ttl = 0) : hostname_(hostname), + ips_(std::move(ips)), ttl_(ttl) { } + ~DNS_Cache_Record() = default; - std::string hostname; - std::string ip; - unsigned long long ttl = 0; + std::string hostname_; + std::set ips_; + unsigned long long ttl_ = 0; }; class DNS_Cache { public: - enum class OPERATION { - UPDATE, - REMOVE - }; - - DNS_Cache() : enabled(true) { int rc = pthread_rwlock_init(&rwlock_, NULL); assert(rc == 0); @@ -306,14 +305,19 @@ public: enabled = value; } - bool add(const std::string& hostname, const std::string& ip); + bool add(const std::string& hostname, std::vector&& ips); void remove(const std::string& hostname); void clear(); std::string lookup(const std::string& hostname) const; - void bulk_update(const std::list> bulk_record); private: - std::unordered_map records; + struct IP_ADDR { + std::vector ips; + unsigned long counter = 0; + }; + + std::string get_next_ip(const IP_ADDR& ip_addr) const; + std::unordered_map records; std::atomic_bool enabled; mutable pthread_rwlock_t rwlock_; }; @@ -322,7 +326,7 @@ struct DNS_Resolve_Data { std::promise> result; std::shared_ptr dns_cache; std::string hostname; - std::string cached_ip; + std::set cached_ips; unsigned int ttl; }; diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index f84bfee6f..a9533ba2f 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -457,6 +457,7 @@ class MySQL_Threads_Handler int monitor_threads_queue_maxsize; int monitor_local_dns_cache_ttl; int monitor_local_dns_cache_refresh_interval; + int monitor_local_dns_resolver_queue_maxsize; char *monitor_username; char *monitor_password; char * monitor_replication_lag_use_percona_heartbeat; diff --git a/include/mysql_connection.h b/include/mysql_connection.h index 186bc25fc..43e8dadf3 100644 --- a/include/mysql_connection.h +++ b/include/mysql_connection.h @@ -108,6 +108,11 @@ class MySQL_Connection { MySrvC *parent; MySQL_Connection_userinfo *userinfo; MySQL_Data_Stream *myds; + + struct { + char* hostname; + char* ip; + } connected_host_details; /** * @brief Keeps tracks of the 'server_status'. Do not confuse with the 'server_status' from the * 'MYSQL' connection itself. This flag keeps track of the configured server status from the diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 8b54f4cdd..6737f5843 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -895,6 +895,7 @@ __thread int mysql_thread___monitor_threads_max; __thread int mysql_thread___monitor_threads_queue_maxsize; __thread int mysql_thread___monitor_local_dns_cache_ttl; __thread int mysql_thread___monitor_local_dns_cache_refresh_interval; +__thread int mysql_thread___monitor_local_dns_resolver_queue_maxsize; __thread char * mysql_thread___monitor_username; __thread char * mysql_thread___monitor_password; __thread char * mysql_thread___monitor_replication_lag_use_percona_heartbeat; @@ -1058,6 +1059,7 @@ extern __thread int mysql_thread___monitor_threads_max; extern __thread int mysql_thread___monitor_threads_queue_maxsize; extern __thread int mysql_thread___monitor_local_dns_cache_ttl; extern __thread int mysql_thread___monitor_local_dns_cache_refresh_interval; +extern __thread int mysql_thread___monitor_local_dns_resolver_queue_maxsize; extern __thread char * mysql_thread___monitor_username; extern __thread char * mysql_thread___monitor_password; extern __thread char * mysql_thread___monitor_replication_lag_use_percona_heartbeat; diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index b8c43a2a2..36e9cd72a 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -839,7 +839,7 @@ void MySrvConnList::drop_all_connections() { } -MySrvC::MySrvC(char *add, uint16_t p, uint16_t gp, unsigned int _weight, enum MySerStatus _status, unsigned int _compression /*, uint8_t _charset */, unsigned int _max_connections, unsigned int _max_replication_lag, unsigned int _use_ssl, unsigned int _max_latency_ms, char *_comment) : resolved_ip(NULL) { +MySrvC::MySrvC(char *add, uint16_t p, uint16_t gp, unsigned int _weight, enum MySerStatus _status, unsigned int _compression /*, uint8_t _charset */, unsigned int _max_connections, unsigned int _max_replication_lag, unsigned int _use_ssl, unsigned int _max_latency_ms, char *_comment) { address=strdup(add); port=p; gtid_port=gp; @@ -951,7 +951,6 @@ void MySrvC::shun_and_killall() { MySrvC::~MySrvC() { if (address) free(address); if (comment) free(comment); - if (resolved_ip) free(resolved_ip); delete ConnectionsUsed; delete ConnectionsFree; } diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 41279dbb3..e77d7fc6d 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -3418,71 +3418,96 @@ void* monitor_dns_resolver_thread(void* args) { DNS_Resolve_Data* dns_resolve_data = static_cast(args); struct addrinfo hints, *res = NULL; - int gai_rc; /* set hints for getaddrinfo */ memset(&hints, 0, sizeof(hints)); - hints.ai_protocol = IPPROTO_TCP; /* TCP connections only */ - hints.ai_family = AF_UNSPEC; /* includes: IPv4, IPv6 or hostname */ + hints.ai_protocol = IPPROTO_TCP; + hints.ai_family = AF_UNSPEC; /*includes: IPv4, IPv6*/ hints.ai_socktype = SOCK_STREAM; + /* AI_ADDRCONFIG: IPv4 addresses are returned in the list pointed to by res only if the + local system has at least one IPv4 address configured, and IPv6 + addresses are returned only if the local system has at least one + IPv6 address configured. The loopback address is not considered + for this case as valid as a configured address. This flag is + useful on, for example, IPv4-only systems, to ensure that + getaddrinfo() does not return IPv6 socket addresses that would + always fail in connect or bind. */ + hints.ai_flags = AI_ADDRCONFIG; + + int gai_rc = getaddrinfo(dns_resolve_data->hostname.c_str(), NULL, &hints, &res); + + if (gai_rc != 0 || !res) + { + proxy_error("An error occurred while resolving hostname: %s [%d]\n", dns_resolve_data->hostname.c_str(), gai_rc); + goto __error; + } - //unsigned int wait_gai = 1; + try { + std::vector ips; + ips.reserve(64); - //time_t start_t = time(NULL); + char ip_addr[INET6_ADDRSTRLEN]; - //while ((gai_rc = getaddrinfo(hostname.c_str(), NULL, &hints, &res)) == EAI_AGAIN) - //{ - // if (time(NULL) - start_t > (time_t)10) - // break; + for (auto p = res; p != NULL; p = p->ai_next) { + + if (p->ai_family == AF_INET) { + struct sockaddr_in* ipv4 = (struct sockaddr_in*)p->ai_addr; + inet_ntop(p->ai_addr->sa_family, &ipv4->sin_addr, ip_addr, INET_ADDRSTRLEN); + ips.push_back(ip_addr); + } + else { + struct sockaddr_in6* ipv6 = (struct sockaddr_in6*)p->ai_addr; + inet_ntop(p->ai_addr->sa_family, &ipv6->sin6_addr, ip_addr, INET6_ADDRSTRLEN); + ips.push_back(ip_addr); + } + } - // usleep(wait_gai); - // wait_gai *= 2; - //} - try { - gai_rc = getaddrinfo(dns_resolve_data->hostname.c_str(), NULL, &hints, &res); + freeaddrinfo(res); - if (gai_rc != 0 || !res) - { - proxy_error("An error occurred while resolving hostname: %s [%d]\n", dns_resolve_data->hostname.c_str(), gai_rc); - dns_resolve_data->result.set_value(std::make_tuple<>(false, DNS_Cache_Record())); - return NULL; - } + if (!ips.empty()) { + bool to_update_cache = false; - char ip_addr[INET6_ADDRSTRLEN]; + if (!dns_resolve_data->cached_ips.empty()) { - switch (res->ai_family) { - case AF_INET: { - struct sockaddr_in* ipv4 = (struct sockaddr_in*)res->ai_addr; - inet_ntop(res->ai_addr->sa_family, &ipv4->sin_addr, ip_addr, INET_ADDRSTRLEN); - break; - } - case AF_INET6: { - struct sockaddr_in6* ipv6 = (struct sockaddr_in6*)res->ai_addr; - inet_ntop(res->ai_addr->sa_family, &ipv6->sin6_addr, ip_addr, INET6_ADDRSTRLEN); - break; - } - } + if (dns_resolve_data->cached_ips.size() == ips.size()) { + for (const std::string& ip : ips) { - freeaddrinfo(res); + if (dns_resolve_data->cached_ips.find(ip) == dns_resolve_data->cached_ips.end()) { + to_update_cache = true; + break; + } + } + } + else + to_update_cache = true; - if (ip_addr) { - if (!dns_resolve_data->cached_ip.empty() && strncmp(dns_resolve_data->cached_ip.c_str(), ip_addr, sizeof(ip_addr) - 1) == 0) { - dns_resolve_data->result.set_value(std::make_tuple<>(true, DNS_Cache_Record(dns_resolve_data->hostname, ip_addr, monotonic_time() + (1000 * dns_resolve_data->ttl)))); + // only update dns_records_bookkeeping + if (!to_update_cache) { + dns_resolve_data->result.set_value(std::make_tuple<>(true, DNS_Cache_Record(dns_resolve_data->hostname, std::move(dns_resolve_data->cached_ips), monotonic_time() + (1000 * dns_resolve_data->ttl)))); + } } - else { - dns_resolve_data->dns_cache->add(dns_resolve_data->hostname, ip_addr); - dns_resolve_data->result.set_value(std::make_tuple<>(true, DNS_Cache_Record(dns_resolve_data->hostname, ip_addr, monotonic_time() + (1000 * dns_resolve_data->ttl)))); + else + to_update_cache = true; + + if (to_update_cache) { + dns_resolve_data->result.set_value(std::make_tuple<>(true, DNS_Cache_Record(dns_resolve_data->hostname, ips, monotonic_time() + (1000 * dns_resolve_data->ttl)))); + dns_resolve_data->dns_cache->add(dns_resolve_data->hostname, std::move(ips)); } return NULL; } } - catch (...) - { + catch (std::exception& ex) { + proxy_error("An exception occurred while resolving hostname: %s [%s]\n", dns_resolve_data->hostname.c_str(), ex.what()); } - + catch (...) { + proxy_error("An unknown exception has occurred while resolving hostname: %s\n", dns_resolve_data->hostname.c_str()); + } + +__error: dns_resolve_data->result.set_value(std::make_tuple<>(false, DNS_Cache_Record())); + return NULL; } @@ -3495,12 +3520,17 @@ void* MySQL_Monitor::monitor_dns_cache() { mysql_thr->refresh_variables(); if (!GloMTH) return NULL; // quick exit during shutdown/restart + constexpr unsigned int num_dns_resolver_threads = 1; + constexpr unsigned int num_dns_resolver_max_threads = 32; unsigned long long t1 = 0; unsigned long long t2 = 0; unsigned long long next_loop_at = 0; bool dns_cache_enable = true; + // Bookkeeper for dns records and ttl std::list dns_records_bookkeeping; + + // Queue for DNS resolver request wqueue*> dns_resolver_queue; while (GloMyMon->shutdown == false) { @@ -3514,8 +3544,7 @@ void* MySQL_Monitor::monitor_dns_cache() { // dns cache is disabled if (mysql_thread___monitor_local_dns_cache_ttl == 0 || - mysql_thread___monitor_local_dns_cache_refresh_interval == 0) - { + mysql_thread___monitor_local_dns_cache_refresh_interval == 0) { dns_cache_enable = false; dns_cache->set_enabled_flag(false); dns_cache->clear(); @@ -3532,8 +3561,9 @@ void* MySQL_Monitor::monitor_dns_cache() { }*/ } else { + //dns cache enabled dns_cache_enable = true; - dns_cache->set_enabled_flag(true); //dns cache enabled + dns_cache->set_enabled_flag(true); } } @@ -3569,13 +3599,16 @@ void* MySQL_Monitor::monitor_dns_cache() { } else { if (resultset->rows_count == 0) { - goto __end_monitor_dns_cache_loop; - } + // Remove orphaned records if any + if (dns_records_bookkeeping.empty() == false) { + for (const auto& dns_record : dns_records_bookkeeping) + dns_cache->remove(dns_record.hostname_); - constexpr unsigned int num_dns_resolver_threads = 1; - constexpr unsigned int num_dns_resolver_max_threads = 16; - constexpr unsigned int num_dns_resolver_queue_max_size = 256; + dns_records_bookkeeping.clear(); + } + goto __end_monitor_dns_cache_loop; + } std::vector dns_resolver_threads(num_dns_resolver_threads); @@ -3589,6 +3622,7 @@ void* MySQL_Monitor::monitor_dns_cache() { for (const auto row : resultset->rows) { const std::string& hostname = row->fields[0]; + // Add only hostnames/domain and ignore IPs if (!validate_ip(hostname)) hostnames.insert(hostname); } @@ -3600,22 +3634,23 @@ void* MySQL_Monitor::monitor_dns_cache() { for (auto itr = dns_records_bookkeeping.cbegin(); itr != dns_records_bookkeeping.cend();) { - if (hostnames.find(itr->hostname) == hostnames.end()) { - dns_cache->remove(itr->hostname); // to check + // remove orphaned records + if (hostnames.find(itr->hostname_) == hostnames.end()) { + dns_cache->remove(itr->hostname_); itr = dns_records_bookkeeping.erase(itr); } else { - hostnames.erase(itr->hostname); + hostnames.erase(itr->hostname_); - if (current_time > itr->ttl) { + // Renew dns records if expired + if (current_time > itr->ttl_) { std::unique_ptr dns_resolve_data(new DNS_Resolve_Data()); - dns_resolve_data->hostname = itr->hostname; - dns_resolve_data->cached_ip = itr->ip; + dns_resolve_data->hostname = std::move(itr->hostname_); + dns_resolve_data->cached_ips = std::move(itr->ips_); dns_resolve_data->ttl = mysql_thread___monitor_local_dns_cache_ttl; dns_resolve_data->dns_cache = dns_cache; dns_resolve_result.emplace_back(dns_resolve_data->result.get_future()); dns_resolver_queue.add(new WorkItem(dns_resolve_data.release(), monitor_dns_resolver_thread)); - itr = dns_records_bookkeeping.erase(itr); continue; } @@ -3629,7 +3664,9 @@ void* MySQL_Monitor::monitor_dns_cache() { unsigned int qsize = dns_resolver_queue.size(); unsigned int num_threads = dns_resolver_threads.size(); - if (qsize > num_dns_resolver_queue_max_size / 8) { + if (qsize > mysql_thread___monitor_local_dns_resolver_queue_maxsize / 8) { + proxy_warning("DNS resolver queue too big: %d\n", qsize); + unsigned int threads_max = num_dns_resolver_max_threads; if (threads_max > num_threads) { @@ -3669,7 +3706,9 @@ void* MySQL_Monitor::monitor_dns_cache() { unsigned int qsize = dns_resolver_queue.size(); unsigned int num_threads = dns_resolver_threads.size(); - if (qsize > num_dns_resolver_queue_max_size / 8) { + if (qsize > mysql_thread___monitor_local_dns_resolver_queue_maxsize / 8) { + proxy_warning("DNS resolver queue too big: %d\n", qsize); + unsigned int threads_max = num_dns_resolver_max_threads; if (threads_max > num_threads) { @@ -3684,6 +3723,8 @@ void* MySQL_Monitor::monitor_dns_cache() { num_threads += new_threads; dns_resolver_threads.resize(num_threads); + proxy_info("Starting %d helper threads\n", new_threads); + for (unsigned int i = old_num_threads; i < num_threads; i++) { dns_resolver_threads[i] = new DNSResolverThread(dns_resolver_queue, 0); dns_resolver_threads[i]->start(2048, false); @@ -3693,10 +3734,11 @@ void* MySQL_Monitor::monitor_dns_cache() { } } + // close all worker threads for (size_t i = 0; i < dns_resolver_threads.size(); i++) dns_resolver_queue.add(NULL); - + // update dns records with ip and ttl for (auto& dns_result : dns_resolve_result) { auto ret_value = dns_result.get(); @@ -3706,7 +3748,6 @@ void* MySQL_Monitor::monitor_dns_cache() { } } - for (DNSResolverThread* const dns_resolver_thread : dns_resolver_threads) { dns_resolver_thread->join(); delete dns_resolver_thread; @@ -3755,6 +3796,7 @@ void * MySQL_Monitor::run() { pthread_attr_init(&attr); pthread_attr_setstacksize(&attr, 2048 * 1024); + // DNS Cache is not dependent on monitor enable flag, so need to initialize it here pthread_t monitor_dns_cache_thread; if (pthread_create(&monitor_dns_cache_thread, &attr, &monitor_dns_cache_pthread, NULL) != 0) { // LCOV_EXCL_START @@ -5427,16 +5469,17 @@ void MySQL_Monitor::evaluate_aws_aurora_results(unsigned int wHG, unsigned int r std::string MySQL_Monitor::dns_lookup(const std::string& hostname, bool return_hostname_if_lookup_fails) { + static thread_local std::shared_ptr dns_cache_thread; + // if IP was provided, no need to do lookup if (validate_ip(hostname)) return hostname; - static thread_local std::shared_ptr dns_cache_thread; - std::string ip; - if (!dns_cache_thread && GloMyMon) dns_cache_thread = GloMyMon->dns_cache; + std::string ip; + if (dns_cache_thread) { ip = dns_cache_thread->lookup(trim(hostname)) ; @@ -5452,16 +5495,15 @@ std::string MySQL_Monitor::dns_lookup(const char* hostname, bool return_hostname return MySQL_Monitor::dns_lookup(std::string(hostname), return_hostname_if_lookup_fails); } -bool DNS_Cache::add(const std::string& hostname, const std::string& ip) { +bool DNS_Cache::add(const std::string& hostname, std::vector&& ips) { if (!enabled) return false; int rc = pthread_rwlock_wrlock(&rwlock_); assert(rc == 0); - try { - records[hostname] = ip; - } - catch (...) {} + auto& ip_addr = records[hostname]; + ip_addr.ips = std::move(ips); + __sync_fetch_and_and(&ip_addr.counter, 0); rc = pthread_rwlock_unlock(&rwlock_); assert(rc == 0); @@ -5471,6 +5513,16 @@ bool DNS_Cache::add(const std::string& hostname, const std::string& ip) { return true; } +std::string DNS_Cache::get_next_ip(const IP_ADDR& ip_addr) const { + + if (ip_addr.ips.empty()) + return ""; + + const auto counter_val = __sync_fetch_and_add(const_cast(&ip_addr.counter), 1); + + return ip_addr.ips[counter_val%ip_addr.ips.size()]; +} + std::string DNS_Cache::lookup(const std::string& hostname) const { if (!enabled) return ""; @@ -5483,7 +5535,7 @@ std::string DNS_Cache::lookup(const std::string& hostname) const { auto itr = records.find(hostname); if (itr != records.end()) { - ip = itr->second; + ip = get_next_ip(itr->second); } rc = pthread_rwlock_unlock(&rwlock_); assert(rc == 0); @@ -5496,14 +5548,23 @@ std::string DNS_Cache::lookup(const std::string& hostname) const { } void DNS_Cache::remove(const std::string& hostname) { + bool item_removed = false; + int rc = pthread_rwlock_wrlock(&rwlock_); assert(rc == 0); - records.erase(hostname); + auto itr = records.find(hostname); + if (itr != records.end()) { + records.erase(itr); + item_removed = true; + } rc = pthread_rwlock_unlock(&rwlock_); - assert(rc == 0); - if (GloMyMon) + if (item_removed && GloMyMon) __sync_fetch_and_add(&GloMyMon->dns_cache_record_updated, 1); + + assert(rc == 0); + + } void DNS_Cache::clear() { @@ -5514,24 +5575,5 @@ void DNS_Cache::clear() { assert(rc == 0); } -void DNS_Cache::bulk_update(const std::list> bulk_record) { - if (!enabled) return; - - pthread_rwlock_wrlock(&rwlock_); - for (const auto& dns_record : bulk_record) { - if (dns_record.second == DNS_Cache::OPERATION::UPDATE) { - records[dns_record.first.hostname] = dns_record.first.ip; - } - else if (dns_record.second == DNS_Cache::OPERATION::REMOVE) { - records.erase(dns_record.first.hostname); - } - - if (GloMyMon) - __sync_fetch_and_add(&GloMyMon->dns_cache_record_updated, 1); - } - pthread_rwlock_unlock(&rwlock_); -} - - template class WorkItem; template class WorkItem; \ No newline at end of file diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 805345e48..52d3096f9 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -482,6 +482,7 @@ static char * mysql_thread_variables_names[]= { (char *)"monitor_threads_queue_maxsize", (char *)"monitor_local_dns_cache_ttl", (char *)"monitor_local_dns_cache_refresh_interval", + (char *)"monitor_local_dns_resolver_queue_maxsize", (char *)"monitor_wait_timeout", (char *)"monitor_writer_is_also_reader", (char *)"max_allowed_packet", @@ -1094,6 +1095,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.monitor_threads_queue_maxsize = 128; variables.monitor_local_dns_cache_ttl = 300000; variables.monitor_local_dns_cache_refresh_interval = 60000; + variables.monitor_local_dns_resolver_queue_maxsize = 128; variables.monitor_username=strdup((char *)"monitor"); variables.monitor_password=strdup((char *)"monitor"); variables.monitor_replication_lag_use_percona_heartbeat=strdup((char *)""); @@ -2182,7 +2184,7 @@ char ** MySQL_Threads_Handler::get_variables_list() { VariablesPointers_int["monitor_local_dns_cache_ttl"] = make_tuple(&variables.monitor_local_dns_cache_ttl, 0, 7*24*3600*1000, false); VariablesPointers_int["monitor_local_dns_cache_refresh_interval"] = make_tuple(&variables.monitor_local_dns_cache_refresh_interval, 0, 7*24*3600*1000, false); - + VariablesPointers_int["monitor_local_dns_resolver_queue_maxsize"] = make_tuple(&variables.monitor_local_dns_resolver_queue_maxsize, 16, 1024, false); // mirroring VariablesPointers_int["mirror_max_concurrency"] = make_tuple(&variables.mirror_max_concurrency, 1, 8*1024, false); VariablesPointers_int["mirror_max_queue_length"] = make_tuple(&variables.mirror_max_queue_length, 0, 1024*1024, false); @@ -4003,7 +4005,7 @@ void MySQL_Thread::refresh_variables() { mysql_thread___monitor_threads_queue_maxsize = GloMTH->get_variable_int((char *)"monitor_threads_queue_maxsize"); mysql_thread___monitor_local_dns_cache_ttl = GloMTH->get_variable_int((char*)"monitor_local_dns_cache_ttl"); mysql_thread___monitor_local_dns_cache_refresh_interval = GloMTH->get_variable_int((char*)"monitor_local_dns_cache_refresh_interval"); - + mysql_thread___monitor_local_dns_resolver_queue_maxsize = GloMTH->get_variable_int((char*)"monitor_local_dns_resolver_queue_maxsize"); if (mysql_thread___firewall_whitelist_errormsg) free(mysql_thread___firewall_whitelist_errormsg); mysql_thread___firewall_whitelist_errormsg=GloMTH->get_variable_string((char *)"firewall_whitelist_errormsg"); diff --git a/lib/mysql_connection.cpp b/lib/mysql_connection.cpp index 6e4e97733..60937b7da 100644 --- a/lib/mysql_connection.cpp +++ b/lib/mysql_connection.cpp @@ -449,6 +449,7 @@ MySQL_Connection::MySQL_Connection() { statuses.myconnpoll_get = 0; statuses.myconnpoll_put = 0; memset(gtid_uuid,0,sizeof(gtid_uuid)); + memset(&connected_host_details, 0, sizeof(connected_host_details)); }; MySQL_Connection::~MySQL_Connection() { @@ -508,6 +509,11 @@ MySQL_Connection::~MySQL_Connection() { } } + if (connected_host_details.hostname) + free(connected_host_details.hostname); + + if (connected_host_details.ip) + free(connected_host_details.ip); }; bool MySQL_Connection::set_autocommit(bool _ac) { @@ -813,18 +819,27 @@ void MySQL_Connection::connect_start() { const std::string& res_ip = MySQL_Monitor::dns_lookup(parent->address, false); if (!res_ip.empty()) { + if (connected_host_details.hostname) { + if (strcmp(connected_host_details.hostname, parent->address) != 0) { + connected_host_details.hostname = (char*)realloc(connected_host_details.hostname, strlen(parent->address) + 1); + strcpy(connected_host_details.hostname, parent->address); + } + } + else { + connected_host_details.hostname = strdup(parent->address); + } - if (parent->resolved_ip) { - if (strcmp(parent->resolved_ip, res_ip.c_str()) != 0) { - free(parent->resolved_ip); - parent->resolved_ip = strdup(res_ip.c_str()); + if (connected_host_details.ip) { + if (strcmp(connected_host_details.ip, res_ip.c_str()) != 0) { + connected_host_details.ip = (char*)realloc(connected_host_details.ip, res_ip.size() * sizeof(char) + 1); + strcpy(connected_host_details.ip, res_ip.c_str()); } } else { - parent->resolved_ip = strdup(res_ip.c_str()); + connected_host_details.ip = strdup(res_ip.c_str()); } - host_ip = parent->resolved_ip; + host_ip = connected_host_details.ip; } else { host_ip = parent->address; diff --git a/test/tap/tests/test_dns_cache-t.cpp b/test/tap/tests/test_dns_cache-t.cpp new file mode 100644 index 000000000..ffe1920ce --- /dev/null +++ b/test/tap/tests/test_dns_cache-t.cpp @@ -0,0 +1,280 @@ +/** + * @file test_dns_cache-t.cpp + * @brief This test will verify dns cache is working properly. + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "tap.h" +#include "command_line.h" +#include "utils.h" + +std::vector split(const std::string& s, char delimiter) { + std::vector tokens {}; + std::string token {}; + std::istringstream tokenStream(s); + + while (std::getline(tokenStream, token, delimiter)) { + tokens.push_back(token); + } + + return tokens; +} + +/** + * @brief Extract the metrics values from the output of the admin command + * 'SHOW PROMETHEUS METRICS'. + * @param metrics_output The output of the command 'SHOW PROMETHEUS METRICS'. + * @return A map holding the metrics identifier and its current value. + */ +std::map get_metric_values(const std::string& metrics_output) { + std::vector output_lines { split(metrics_output, '\n') }; + std::map metrics_map {}; + + for (const std::string line : output_lines) { + const std::vector line_values { split(line, ' ') }; + + if (line.empty() == false && line[0] != '#') { + if (line_values.size() > 2) { + size_t delim_pos_st = line.rfind("} "); + std::string metric_key = line.substr(0, delim_pos_st); + std::string metric_val = line.substr(delim_pos_st + 2); + + metrics_map.insert({metric_key, std::stod(metric_val)}); + } else { + metrics_map.insert({line_values.front(), std::stod(line_values.back())}); + } + } + } + + return metrics_map; +} + +bool get_prometheus_metrics(MYSQL* proxysql_admin, std::map& matric_val) { + matric_val.clear(); + + if (mysql_query(proxysql_admin, "SHOW PROMETHEUS METRICS\\G")) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql_admin)); + return false; + } + + MYSQL_RES* p_resulset = mysql_store_result(proxysql_admin); + MYSQL_ROW data_row = mysql_fetch_row(p_resulset); + std::string row_value{}; + if (data_row[0]) { + row_value = data_row[0]; + } + else { + row_value = "NULL"; + } + mysql_free_result(p_resulset); + + matric_val = get_metric_values(row_value); + + return true; +} + + + +#define STEP_START { +#define STEP_END } + +#define DECLARE_PREV_AFTER_METRICS() std::map prev_metrics, after_metrics +#define EXECUTE_QUERY(QUERY,MYSQL_CONNECTION,IGNORE_RESULT) [&MYSQL_CONNECTION]() -> bool { if (mysql_query(std::ref(MYSQL_CONNECTION), QUERY) && !IGNORE_RESULT) { \ + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(std::ref(MYSQL_CONNECTION))); \ + return false; } return true; } +#define DELAY_SEC(SECONDS) []() -> bool { std::this_thread::sleep_for(std::chrono::seconds(SECONDS)); return true; } +#define UPDATE_PREV_METRICS(PROXYSQL_ADMIN) std::bind(get_prometheus_metrics, std::ref(PROXYSQL_ADMIN), std::ref(prev_metrics)) +#define UPDATE_AFTER_METRICS(PROXYSQL_ADMIN) std::bind(get_prometheus_metrics, std::ref(PROXYSQL_ADMIN), std::ref(after_metrics)) +#define CHECK_RESULT(a,b) std::bind(check_result, b, std::ref(prev_metrics), std::ref(after_metrics)) +#define LOOP_FUNC(FUNC,TIMES) [&]() -> bool { for(int i=0; i < TIMES; i++) { \ + if (FUNC() == false) return false; } \ + return true;} + +template +bool check_result(const std::string& key, const std::map& prev_metrics, std::map& after_metrics) { + auto prev_metric_key = prev_metrics.find(key); + auto after_metric_key = after_metrics.find(key); + + bool metric_found = prev_metric_key != prev_metrics.end() && after_metric_key != after_metrics.end(); + + ok(metric_found, "'%s' metric was present in output from 'SHOW PROMETHEUS METRICS'", key.c_str()); + + if (metric_found) { + const double prev_metric_val = prev_metric_key->second; + const double after_metric_val = after_metric_key->second; + + diag("Started test for metric '%s'", key.c_str()); + + COMPARE fn; + bool res = fn(after_metric_val, prev_metric_val); + + ok(res, "'%s' metric result success.",key.c_str()); + } + + return true; +} + +int main(int argc, char** argv) { + putenv("TAP_HOST=127.0.0.1"); + putenv("TAP_PORT=6033"); + putenv("TAP_USERNAME=root"); + putenv("TAP_PASSWORD=root"); + putenv("TAP_WORKDIR=./tests/"); + + CommandLine cl; + + if (cl.getEnv()) { + diag("Failed to get the required environmental variables."); + return -1; + } + + // Initialize Admin connection + MYSQL* proxysql_admin = mysql_init(NULL); + if (!proxysql_admin) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql_admin)); + return -1; + } + // Connnect to ProxySQL Admin + if (!mysql_real_connect(proxysql_admin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql_admin)); + return -1; + } + + // Initialize ProxySQL connection + MYSQL* proxysql = mysql_init(NULL); + if (!proxysql) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql)); + return -1; + } + // Connect to ProxySQL + if (!mysql_real_connect(proxysql, cl.host, cl.username, cl.password, NULL, cl.port, NULL, 0)) { + fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql)); + return exit_status(); + } + + DECLARE_PREV_AFTER_METRICS(); + + std::vector>> dns_cache_check_steps = { + STEP_START + EXECUTE_QUERY("SET mysql-monitor_enabled='false'", proxysql_admin, false), + EXECUTE_QUERY("SET mysql-monitor_local_dns_cache_refresh_interval=1000", proxysql_admin, false), + EXECUTE_QUERY("SET mysql-monitor_local_dns_cache_ttl=5000", proxysql_admin, false), + EXECUTE_QUERY("LOAD MYSQL VARIABLES TO RUNTIME", proxysql_admin, false), + EXECUTE_QUERY("DELETE FROM mysql_servers", proxysql_admin, false), + EXECUTE_QUERY("LOAD MYSQL SERVERS TO RUNTIME", proxysql_admin, false), + DELAY_SEC(2) + STEP_END, + STEP_START + UPDATE_PREV_METRICS(proxysql_admin), + EXECUTE_QUERY("INSERT INTO mysql_servers (hostgroup_id,hostname,port,max_replication_lag,max_connections,comment) VALUES (0,'0.0.0.0',7861,0,1000,'dummy entry')", proxysql_admin, false), + EXECUTE_QUERY("LOAD MYSQL SERVERS TO RUNTIME", proxysql_admin, false), + DELAY_SEC(2), + UPDATE_AFTER_METRICS(proxysql_admin), + CHECK_RESULT(std::equal_to, "proxysql_mysql_monitor_dns_cache_record_updated"), + CHECK_RESULT(std::equal_to, "proxysql_mysql_monitor_dns_cache_lookup_success"), + CHECK_RESULT(std::equal_to, "proxysql_mysql_monitor_dns_cache_queried") + STEP_END, + STEP_START + UPDATE_PREV_METRICS(proxysql_admin), + LOOP_FUNC(EXECUTE_QUERY("SELECT 1", proxysql, true), 2), + DELAY_SEC(2), + UPDATE_AFTER_METRICS(proxysql_admin), + CHECK_RESULT(std::equal_to, "proxysql_mysql_monitor_dns_cache_record_updated"), + CHECK_RESULT(std::equal_to, "proxysql_mysql_monitor_dns_cache_lookup_success"), + CHECK_RESULT(std::equal_to, "proxysql_mysql_monitor_dns_cache_queried") + STEP_END, + STEP_START + UPDATE_PREV_METRICS(proxysql_admin), + LOOP_FUNC(EXECUTE_QUERY("SELECT 1", proxysql, true), 2), + DELAY_SEC(2), + UPDATE_AFTER_METRICS(proxysql_admin), + CHECK_RESULT(std::equal_to, "proxysql_mysql_monitor_dns_cache_record_updated"), + CHECK_RESULT(std::equal_to, "proxysql_mysql_monitor_dns_cache_lookup_success"), + CHECK_RESULT(std::equal_to, "proxysql_mysql_monitor_dns_cache_queried") + STEP_END, + STEP_START + UPDATE_PREV_METRICS(proxysql_admin), + EXECUTE_QUERY("INSERT INTO mysql_servers (hostgroup_id,hostname,port,max_replication_lag,max_connections,comment) VALUES (0,'google.com',7861,0,1000,'dummy entry')", proxysql_admin, false), + EXECUTE_QUERY("LOAD MYSQL SERVERS TO RUNTIME", proxysql_admin, false), + DELAY_SEC(2), + LOOP_FUNC(EXECUTE_QUERY("SELECT 1", proxysql, true), 2), + UPDATE_AFTER_METRICS(proxysql_admin), + CHECK_RESULT(std::greater, "proxysql_mysql_monitor_dns_cache_record_updated"), + CHECK_RESULT(std::greater, "proxysql_mysql_monitor_dns_cache_lookup_success"), + CHECK_RESULT(std::greater, "proxysql_mysql_monitor_dns_cache_queried") + STEP_END, + STEP_START + UPDATE_PREV_METRICS(proxysql_admin), + EXECUTE_QUERY("INSERT INTO mysql_servers (hostgroup_id,hostname,port,max_replication_lag,max_connections,comment) VALUES (0,' yahoo.com ',7861,0,1000,'dummy entry')", proxysql_admin, false), + EXECUTE_QUERY("LOAD MYSQL SERVERS TO RUNTIME", proxysql_admin, false), + DELAY_SEC(2), + LOOP_FUNC(EXECUTE_QUERY("SELECT 1", proxysql, true), 2), + UPDATE_AFTER_METRICS(proxysql_admin), + CHECK_RESULT(std::greater, "proxysql_mysql_monitor_dns_cache_record_updated"), + CHECK_RESULT(std::greater, "proxysql_mysql_monitor_dns_cache_lookup_success"), + CHECK_RESULT(std::greater, "proxysql_mysql_monitor_dns_cache_queried") + STEP_END, + STEP_START + UPDATE_PREV_METRICS(proxysql_admin), + EXECUTE_QUERY("DELETE FROM mysql_servers", proxysql_admin, false), + EXECUTE_QUERY("LOAD MYSQL SERVERS TO RUNTIME", proxysql_admin, false), + DELAY_SEC(2), + UPDATE_AFTER_METRICS(proxysql_admin), + CHECK_RESULT(std::greater, "proxysql_mysql_monitor_dns_cache_record_updated"), + CHECK_RESULT(std::equal_to, "proxysql_mysql_monitor_dns_cache_lookup_success"), + CHECK_RESULT(std::equal_to, "proxysql_mysql_monitor_dns_cache_queried") + STEP_END, + STEP_START + UPDATE_PREV_METRICS(proxysql_admin), + EXECUTE_QUERY("INSERT INTO mysql_servers (hostgroup_id,hostname,port,max_replication_lag,max_connections,comment) VALUES (0,'INVALID_DOMAIN',7861,0,1000,'dummy entry')", proxysql_admin, false), + EXECUTE_QUERY("LOAD MYSQL SERVERS TO RUNTIME", proxysql_admin, false), + DELAY_SEC(2), + LOOP_FUNC(EXECUTE_QUERY("SELECT 1", proxysql, true), 2), + UPDATE_AFTER_METRICS(proxysql_admin), + CHECK_RESULT(std::equal_to, "proxysql_mysql_monitor_dns_cache_record_updated"), + CHECK_RESULT(std::equal_to, "proxysql_mysql_monitor_dns_cache_lookup_success"), + CHECK_RESULT(std::greater, "proxysql_mysql_monitor_dns_cache_queried") + STEP_END, + STEP_START + //disable dns cache + EXECUTE_QUERY("SET mysql-monitor_local_dns_cache_refresh_interval=0", proxysql_admin, false), + EXECUTE_QUERY("LOAD MYSQL VARIABLES TO RUNTIME", proxysql_admin, false), + DELAY_SEC(2), + UPDATE_PREV_METRICS(proxysql_admin), + LOOP_FUNC(EXECUTE_QUERY("SELECT 1", proxysql, true), 2), + UPDATE_AFTER_METRICS(proxysql_admin), + CHECK_RESULT(std::equal_to, "proxysql_mysql_monitor_dns_cache_record_updated"), + CHECK_RESULT(std::equal_to, "proxysql_mysql_monitor_dns_cache_lookup_success"), + CHECK_RESULT(std::equal_to, "proxysql_mysql_monitor_dns_cache_queried") + STEP_END + }; + + plan((dns_cache_check_steps.size() -1) * 3 * 2); + + for (size_t i = 0; i < dns_cache_check_steps.size(); i++) { + diag("Starting Step:'%ld'", i); + for (const auto fn : dns_cache_check_steps[i]) + if (fn() == false) + goto __cleanup; + diag("Ending Step:'%ld'\n", i); + } + +__cleanup: + mysql_close(proxysql); + mysql_close(proxysql_admin); + + return exit_status(); +}