* Added hostname and ip information in mysql_connection

* Added multiple IP support with load balancing (Round Robin Scheduling)
* Added DNS resolver request max queue size
* Added AI_ADDRCONFIG flag
* Exception handling
pull/4024/head
Rahim Kanji 3 years ago
parent eef3ddfcd6
commit dbfb99ce71

@ -131,7 +131,6 @@ class MySrvC { // MySQL Server Container
public:
MyHGC *myhgc;
char *address;
char* resolved_ip;
uint16_t port;
uint16_t gtid_port;
uint16_t flags;

@ -273,25 +273,24 @@ struct DNS_Cache_Record {
DNS_Cache_Record(const DNS_Cache_Record&) = default;
DNS_Cache_Record& operator=(DNS_Cache_Record&&) = default;
DNS_Cache_Record& operator=(const DNS_Cache_Record&) = default;
DNS_Cache_Record(const std::string& hostname, const std::string& ip, unsigned long long ttl = 0) : hostname(hostname),
ip(ip), ttl(ttl)
DNS_Cache_Record(const std::string& hostname, const std::vector<std::string>& ips, unsigned long long ttl = 0) : hostname_(hostname),
ttl_(ttl) {
std::copy(ips.begin(), ips.end(), std::inserter(ips_, ips_.end()));
}
DNS_Cache_Record(const std::string& hostname, std::set<std::string>&& ips, unsigned long long ttl = 0) : hostname_(hostname),
ips_(std::move(ips)), ttl_(ttl)
{ }
~DNS_Cache_Record() = default;
std::string hostname;
std::string ip;
unsigned long long ttl = 0;
std::string hostname_;
std::set<std::string> ips_;
unsigned long long ttl_ = 0;
};
class DNS_Cache {
public:
enum class OPERATION {
UPDATE,
REMOVE
};
DNS_Cache() : enabled(true) {
int rc = pthread_rwlock_init(&rwlock_, NULL);
assert(rc == 0);
@ -306,14 +305,19 @@ public:
enabled = value;
}
bool add(const std::string& hostname, const std::string& ip);
bool add(const std::string& hostname, std::vector<std::string>&& ips);
void remove(const std::string& hostname);
void clear();
std::string lookup(const std::string& hostname) const;
void bulk_update(const std::list<std::pair<DNS_Cache_Record, OPERATION>> bulk_record);
private:
std::unordered_map<std::string, std::string> records;
struct IP_ADDR {
std::vector<std::string> ips;
unsigned long counter = 0;
};
std::string get_next_ip(const IP_ADDR& ip_addr) const;
std::unordered_map<std::string, IP_ADDR> records;
std::atomic_bool enabled;
mutable pthread_rwlock_t rwlock_;
};
@ -322,7 +326,7 @@ struct DNS_Resolve_Data {
std::promise<std::tuple<bool, DNS_Cache_Record>> result;
std::shared_ptr<DNS_Cache> dns_cache;
std::string hostname;
std::string cached_ip;
std::set<std::string> cached_ips;
unsigned int ttl;
};

@ -457,6 +457,7 @@ class MySQL_Threads_Handler
int monitor_threads_queue_maxsize;
int monitor_local_dns_cache_ttl;
int monitor_local_dns_cache_refresh_interval;
int monitor_local_dns_resolver_queue_maxsize;
char *monitor_username;
char *monitor_password;
char * monitor_replication_lag_use_percona_heartbeat;

@ -108,6 +108,11 @@ class MySQL_Connection {
MySrvC *parent;
MySQL_Connection_userinfo *userinfo;
MySQL_Data_Stream *myds;
struct {
char* hostname;
char* ip;
} connected_host_details;
/**
* @brief Keeps tracks of the 'server_status'. Do not confuse with the 'server_status' from the
* 'MYSQL' connection itself. This flag keeps track of the configured server status from the

@ -895,6 +895,7 @@ __thread int mysql_thread___monitor_threads_max;
__thread int mysql_thread___monitor_threads_queue_maxsize;
__thread int mysql_thread___monitor_local_dns_cache_ttl;
__thread int mysql_thread___monitor_local_dns_cache_refresh_interval;
__thread int mysql_thread___monitor_local_dns_resolver_queue_maxsize;
__thread char * mysql_thread___monitor_username;
__thread char * mysql_thread___monitor_password;
__thread char * mysql_thread___monitor_replication_lag_use_percona_heartbeat;
@ -1058,6 +1059,7 @@ extern __thread int mysql_thread___monitor_threads_max;
extern __thread int mysql_thread___monitor_threads_queue_maxsize;
extern __thread int mysql_thread___monitor_local_dns_cache_ttl;
extern __thread int mysql_thread___monitor_local_dns_cache_refresh_interval;
extern __thread int mysql_thread___monitor_local_dns_resolver_queue_maxsize;
extern __thread char * mysql_thread___monitor_username;
extern __thread char * mysql_thread___monitor_password;
extern __thread char * mysql_thread___monitor_replication_lag_use_percona_heartbeat;

@ -839,7 +839,7 @@ void MySrvConnList::drop_all_connections() {
}
MySrvC::MySrvC(char *add, uint16_t p, uint16_t gp, unsigned int _weight, enum MySerStatus _status, unsigned int _compression /*, uint8_t _charset */, unsigned int _max_connections, unsigned int _max_replication_lag, unsigned int _use_ssl, unsigned int _max_latency_ms, char *_comment) : resolved_ip(NULL) {
MySrvC::MySrvC(char *add, uint16_t p, uint16_t gp, unsigned int _weight, enum MySerStatus _status, unsigned int _compression /*, uint8_t _charset */, unsigned int _max_connections, unsigned int _max_replication_lag, unsigned int _use_ssl, unsigned int _max_latency_ms, char *_comment) {
address=strdup(add);
port=p;
gtid_port=gp;
@ -951,7 +951,6 @@ void MySrvC::shun_and_killall() {
MySrvC::~MySrvC() {
if (address) free(address);
if (comment) free(comment);
if (resolved_ip) free(resolved_ip);
delete ConnectionsUsed;
delete ConnectionsFree;
}

@ -3418,71 +3418,96 @@ void* monitor_dns_resolver_thread(void* args) {
DNS_Resolve_Data* dns_resolve_data = static_cast<DNS_Resolve_Data*>(args);
struct addrinfo hints, *res = NULL;
int gai_rc;
/* set hints for getaddrinfo */
memset(&hints, 0, sizeof(hints));
hints.ai_protocol = IPPROTO_TCP; /* TCP connections only */
hints.ai_family = AF_UNSPEC; /* includes: IPv4, IPv6 or hostname */
hints.ai_protocol = IPPROTO_TCP;
hints.ai_family = AF_UNSPEC; /*includes: IPv4, IPv6*/
hints.ai_socktype = SOCK_STREAM;
/* AI_ADDRCONFIG: IPv4 addresses are returned in the list pointed to by res only if the
local system has at least one IPv4 address configured, and IPv6
addresses are returned only if the local system has at least one
IPv6 address configured. The loopback address is not considered
for this case as valid as a configured address. This flag is
useful on, for example, IPv4-only systems, to ensure that
getaddrinfo() does not return IPv6 socket addresses that would
always fail in connect or bind. */
hints.ai_flags = AI_ADDRCONFIG;
int gai_rc = getaddrinfo(dns_resolve_data->hostname.c_str(), NULL, &hints, &res);
if (gai_rc != 0 || !res)
{
proxy_error("An error occurred while resolving hostname: %s [%d]\n", dns_resolve_data->hostname.c_str(), gai_rc);
goto __error;
}
//unsigned int wait_gai = 1;
try {
std::vector<std::string> ips;
ips.reserve(64);
//time_t start_t = time(NULL);
char ip_addr[INET6_ADDRSTRLEN];
//while ((gai_rc = getaddrinfo(hostname.c_str(), NULL, &hints, &res)) == EAI_AGAIN)
//{
// if (time(NULL) - start_t > (time_t)10)
// break;
for (auto p = res; p != NULL; p = p->ai_next) {
if (p->ai_family == AF_INET) {
struct sockaddr_in* ipv4 = (struct sockaddr_in*)p->ai_addr;
inet_ntop(p->ai_addr->sa_family, &ipv4->sin_addr, ip_addr, INET_ADDRSTRLEN);
ips.push_back(ip_addr);
}
else {
struct sockaddr_in6* ipv6 = (struct sockaddr_in6*)p->ai_addr;
inet_ntop(p->ai_addr->sa_family, &ipv6->sin6_addr, ip_addr, INET6_ADDRSTRLEN);
ips.push_back(ip_addr);
}
}
// usleep(wait_gai);
// wait_gai *= 2;
//}
try {
gai_rc = getaddrinfo(dns_resolve_data->hostname.c_str(), NULL, &hints, &res);
freeaddrinfo(res);
if (gai_rc != 0 || !res)
{
proxy_error("An error occurred while resolving hostname: %s [%d]\n", dns_resolve_data->hostname.c_str(), gai_rc);
dns_resolve_data->result.set_value(std::make_tuple<>(false, DNS_Cache_Record()));
return NULL;
}
if (!ips.empty()) {
bool to_update_cache = false;
char ip_addr[INET6_ADDRSTRLEN];
if (!dns_resolve_data->cached_ips.empty()) {
switch (res->ai_family) {
case AF_INET: {
struct sockaddr_in* ipv4 = (struct sockaddr_in*)res->ai_addr;
inet_ntop(res->ai_addr->sa_family, &ipv4->sin_addr, ip_addr, INET_ADDRSTRLEN);
break;
}
case AF_INET6: {
struct sockaddr_in6* ipv6 = (struct sockaddr_in6*)res->ai_addr;
inet_ntop(res->ai_addr->sa_family, &ipv6->sin6_addr, ip_addr, INET6_ADDRSTRLEN);
break;
}
}
if (dns_resolve_data->cached_ips.size() == ips.size()) {
for (const std::string& ip : ips) {
freeaddrinfo(res);
if (dns_resolve_data->cached_ips.find(ip) == dns_resolve_data->cached_ips.end()) {
to_update_cache = true;
break;
}
}
}
else
to_update_cache = true;
if (ip_addr) {
if (!dns_resolve_data->cached_ip.empty() && strncmp(dns_resolve_data->cached_ip.c_str(), ip_addr, sizeof(ip_addr) - 1) == 0) {
dns_resolve_data->result.set_value(std::make_tuple<>(true, DNS_Cache_Record(dns_resolve_data->hostname, ip_addr, monotonic_time() + (1000 * dns_resolve_data->ttl))));
// only update dns_records_bookkeeping
if (!to_update_cache) {
dns_resolve_data->result.set_value(std::make_tuple<>(true, DNS_Cache_Record(dns_resolve_data->hostname, std::move(dns_resolve_data->cached_ips), monotonic_time() + (1000 * dns_resolve_data->ttl))));
}
}
else {
dns_resolve_data->dns_cache->add(dns_resolve_data->hostname, ip_addr);
dns_resolve_data->result.set_value(std::make_tuple<>(true, DNS_Cache_Record(dns_resolve_data->hostname, ip_addr, monotonic_time() + (1000 * dns_resolve_data->ttl))));
else
to_update_cache = true;
if (to_update_cache) {
dns_resolve_data->result.set_value(std::make_tuple<>(true, DNS_Cache_Record(dns_resolve_data->hostname, ips, monotonic_time() + (1000 * dns_resolve_data->ttl))));
dns_resolve_data->dns_cache->add(dns_resolve_data->hostname, std::move(ips));
}
return NULL;
}
}
catch (...)
{
catch (std::exception& ex) {
proxy_error("An exception occurred while resolving hostname: %s [%s]\n", dns_resolve_data->hostname.c_str(), ex.what());
}
catch (...) {
proxy_error("An unknown exception has occurred while resolving hostname: %s\n", dns_resolve_data->hostname.c_str());
}
__error:
dns_resolve_data->result.set_value(std::make_tuple<>(false, DNS_Cache_Record()));
return NULL;
}
@ -3495,12 +3520,17 @@ void* MySQL_Monitor::monitor_dns_cache() {
mysql_thr->refresh_variables();
if (!GloMTH) return NULL; // quick exit during shutdown/restart
constexpr unsigned int num_dns_resolver_threads = 1;
constexpr unsigned int num_dns_resolver_max_threads = 32;
unsigned long long t1 = 0;
unsigned long long t2 = 0;
unsigned long long next_loop_at = 0;
bool dns_cache_enable = true;
// Bookkeeper for dns records and ttl
std::list<DNS_Cache_Record> dns_records_bookkeeping;
// Queue for DNS resolver request
wqueue<WorkItem<DNS_Resolve_Data>*> dns_resolver_queue;
while (GloMyMon->shutdown == false) {
@ -3514,8 +3544,7 @@ void* MySQL_Monitor::monitor_dns_cache() {
// dns cache is disabled
if (mysql_thread___monitor_local_dns_cache_ttl == 0 ||
mysql_thread___monitor_local_dns_cache_refresh_interval == 0)
{
mysql_thread___monitor_local_dns_cache_refresh_interval == 0) {
dns_cache_enable = false;
dns_cache->set_enabled_flag(false);
dns_cache->clear();
@ -3532,8 +3561,9 @@ void* MySQL_Monitor::monitor_dns_cache() {
}*/
}
else {
//dns cache enabled
dns_cache_enable = true;
dns_cache->set_enabled_flag(true); //dns cache enabled
dns_cache->set_enabled_flag(true);
}
}
@ -3569,13 +3599,16 @@ void* MySQL_Monitor::monitor_dns_cache() {
}
else {
if (resultset->rows_count == 0) {
goto __end_monitor_dns_cache_loop;
}
// Remove orphaned records if any
if (dns_records_bookkeeping.empty() == false) {
for (const auto& dns_record : dns_records_bookkeeping)
dns_cache->remove(dns_record.hostname_);
constexpr unsigned int num_dns_resolver_threads = 1;
constexpr unsigned int num_dns_resolver_max_threads = 16;
constexpr unsigned int num_dns_resolver_queue_max_size = 256;
dns_records_bookkeeping.clear();
}
goto __end_monitor_dns_cache_loop;
}
std::vector<DNSResolverThread*> dns_resolver_threads(num_dns_resolver_threads);
@ -3589,6 +3622,7 @@ void* MySQL_Monitor::monitor_dns_cache() {
for (const auto row : resultset->rows) {
const std::string& hostname = row->fields[0];
// Add only hostnames/domain and ignore IPs
if (!validate_ip(hostname))
hostnames.insert(hostname);
}
@ -3600,22 +3634,23 @@ void* MySQL_Monitor::monitor_dns_cache() {
for (auto itr = dns_records_bookkeeping.cbegin();
itr != dns_records_bookkeeping.cend();) {
if (hostnames.find(itr->hostname) == hostnames.end()) {
dns_cache->remove(itr->hostname); // to check
// remove orphaned records
if (hostnames.find(itr->hostname_) == hostnames.end()) {
dns_cache->remove(itr->hostname_);
itr = dns_records_bookkeeping.erase(itr);
}
else {
hostnames.erase(itr->hostname);
hostnames.erase(itr->hostname_);
if (current_time > itr->ttl) {
// Renew dns records if expired
if (current_time > itr->ttl_) {
std::unique_ptr<DNS_Resolve_Data> dns_resolve_data(new DNS_Resolve_Data());
dns_resolve_data->hostname = itr->hostname;
dns_resolve_data->cached_ip = itr->ip;
dns_resolve_data->hostname = std::move(itr->hostname_);
dns_resolve_data->cached_ips = std::move(itr->ips_);
dns_resolve_data->ttl = mysql_thread___monitor_local_dns_cache_ttl;
dns_resolve_data->dns_cache = dns_cache;
dns_resolve_result.emplace_back(dns_resolve_data->result.get_future());
dns_resolver_queue.add(new WorkItem<DNS_Resolve_Data>(dns_resolve_data.release(), monitor_dns_resolver_thread));
itr = dns_records_bookkeeping.erase(itr);
continue;
}
@ -3629,7 +3664,9 @@ void* MySQL_Monitor::monitor_dns_cache() {
unsigned int qsize = dns_resolver_queue.size();
unsigned int num_threads = dns_resolver_threads.size();
if (qsize > num_dns_resolver_queue_max_size / 8) {
if (qsize > mysql_thread___monitor_local_dns_resolver_queue_maxsize / 8) {
proxy_warning("DNS resolver queue too big: %d\n", qsize);
unsigned int threads_max = num_dns_resolver_max_threads;
if (threads_max > num_threads) {
@ -3669,7 +3706,9 @@ void* MySQL_Monitor::monitor_dns_cache() {
unsigned int qsize = dns_resolver_queue.size();
unsigned int num_threads = dns_resolver_threads.size();
if (qsize > num_dns_resolver_queue_max_size / 8) {
if (qsize > mysql_thread___monitor_local_dns_resolver_queue_maxsize / 8) {
proxy_warning("DNS resolver queue too big: %d\n", qsize);
unsigned int threads_max = num_dns_resolver_max_threads;
if (threads_max > num_threads) {
@ -3684,6 +3723,8 @@ void* MySQL_Monitor::monitor_dns_cache() {
num_threads += new_threads;
dns_resolver_threads.resize(num_threads);
proxy_info("Starting %d helper threads\n", new_threads);
for (unsigned int i = old_num_threads; i < num_threads; i++) {
dns_resolver_threads[i] = new DNSResolverThread(dns_resolver_queue, 0);
dns_resolver_threads[i]->start(2048, false);
@ -3693,10 +3734,11 @@ void* MySQL_Monitor::monitor_dns_cache() {
}
}
// close all worker threads
for (size_t i = 0; i < dns_resolver_threads.size(); i++)
dns_resolver_queue.add(NULL);
// update dns records with ip and ttl
for (auto& dns_result : dns_resolve_result) {
auto ret_value = dns_result.get();
@ -3706,7 +3748,6 @@ void* MySQL_Monitor::monitor_dns_cache() {
}
}
for (DNSResolverThread* const dns_resolver_thread : dns_resolver_threads) {
dns_resolver_thread->join();
delete dns_resolver_thread;
@ -3755,6 +3796,7 @@ void * MySQL_Monitor::run() {
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, 2048 * 1024);
// DNS Cache is not dependent on monitor enable flag, so need to initialize it here
pthread_t monitor_dns_cache_thread;
if (pthread_create(&monitor_dns_cache_thread, &attr, &monitor_dns_cache_pthread, NULL) != 0) {
// LCOV_EXCL_START
@ -5427,16 +5469,17 @@ void MySQL_Monitor::evaluate_aws_aurora_results(unsigned int wHG, unsigned int r
std::string MySQL_Monitor::dns_lookup(const std::string& hostname, bool return_hostname_if_lookup_fails) {
static thread_local std::shared_ptr<DNS_Cache> dns_cache_thread;
// if IP was provided, no need to do lookup
if (validate_ip(hostname))
return hostname;
static thread_local std::shared_ptr<DNS_Cache> dns_cache_thread;
std::string ip;
if (!dns_cache_thread && GloMyMon)
dns_cache_thread = GloMyMon->dns_cache;
std::string ip;
if (dns_cache_thread) {
ip = dns_cache_thread->lookup(trim(hostname)) ;
@ -5452,16 +5495,15 @@ std::string MySQL_Monitor::dns_lookup(const char* hostname, bool return_hostname
return MySQL_Monitor::dns_lookup(std::string(hostname), return_hostname_if_lookup_fails);
}
bool DNS_Cache::add(const std::string& hostname, const std::string& ip) {
bool DNS_Cache::add(const std::string& hostname, std::vector<std::string>&& ips) {
if (!enabled) return false;
int rc = pthread_rwlock_wrlock(&rwlock_);
assert(rc == 0);
try {
records[hostname] = ip;
}
catch (...) {}
auto& ip_addr = records[hostname];
ip_addr.ips = std::move(ips);
__sync_fetch_and_and(&ip_addr.counter, 0);
rc = pthread_rwlock_unlock(&rwlock_);
assert(rc == 0);
@ -5471,6 +5513,16 @@ bool DNS_Cache::add(const std::string& hostname, const std::string& ip) {
return true;
}
std::string DNS_Cache::get_next_ip(const IP_ADDR& ip_addr) const {
if (ip_addr.ips.empty())
return "";
const auto counter_val = __sync_fetch_and_add(const_cast<unsigned long*>(&ip_addr.counter), 1);
return ip_addr.ips[counter_val%ip_addr.ips.size()];
}
std::string DNS_Cache::lookup(const std::string& hostname) const {
if (!enabled) return "";
@ -5483,7 +5535,7 @@ std::string DNS_Cache::lookup(const std::string& hostname) const {
auto itr = records.find(hostname);
if (itr != records.end()) {
ip = itr->second;
ip = get_next_ip(itr->second);
}
rc = pthread_rwlock_unlock(&rwlock_);
assert(rc == 0);
@ -5496,14 +5548,23 @@ std::string DNS_Cache::lookup(const std::string& hostname) const {
}
void DNS_Cache::remove(const std::string& hostname) {
bool item_removed = false;
int rc = pthread_rwlock_wrlock(&rwlock_);
assert(rc == 0);
records.erase(hostname);
auto itr = records.find(hostname);
if (itr != records.end()) {
records.erase(itr);
item_removed = true;
}
rc = pthread_rwlock_unlock(&rwlock_);
assert(rc == 0);
if (GloMyMon)
if (item_removed && GloMyMon)
__sync_fetch_and_add(&GloMyMon->dns_cache_record_updated, 1);
assert(rc == 0);
}
void DNS_Cache::clear() {
@ -5514,24 +5575,5 @@ void DNS_Cache::clear() {
assert(rc == 0);
}
void DNS_Cache::bulk_update(const std::list<std::pair<DNS_Cache_Record, OPERATION>> bulk_record) {
if (!enabled) return;
pthread_rwlock_wrlock(&rwlock_);
for (const auto& dns_record : bulk_record) {
if (dns_record.second == DNS_Cache::OPERATION::UPDATE) {
records[dns_record.first.hostname] = dns_record.first.ip;
}
else if (dns_record.second == DNS_Cache::OPERATION::REMOVE) {
records.erase(dns_record.first.hostname);
}
if (GloMyMon)
__sync_fetch_and_add(&GloMyMon->dns_cache_record_updated, 1);
}
pthread_rwlock_unlock(&rwlock_);
}
template class WorkItem<MySQL_Monitor_State_Data>;
template class WorkItem<DNS_Resolve_Data>;

@ -482,6 +482,7 @@ static char * mysql_thread_variables_names[]= {
(char *)"monitor_threads_queue_maxsize",
(char *)"monitor_local_dns_cache_ttl",
(char *)"monitor_local_dns_cache_refresh_interval",
(char *)"monitor_local_dns_resolver_queue_maxsize",
(char *)"monitor_wait_timeout",
(char *)"monitor_writer_is_also_reader",
(char *)"max_allowed_packet",
@ -1094,6 +1095,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() {
variables.monitor_threads_queue_maxsize = 128;
variables.monitor_local_dns_cache_ttl = 300000;
variables.monitor_local_dns_cache_refresh_interval = 60000;
variables.monitor_local_dns_resolver_queue_maxsize = 128;
variables.monitor_username=strdup((char *)"monitor");
variables.monitor_password=strdup((char *)"monitor");
variables.monitor_replication_lag_use_percona_heartbeat=strdup((char *)"");
@ -2182,7 +2184,7 @@ char ** MySQL_Threads_Handler::get_variables_list() {
VariablesPointers_int["monitor_local_dns_cache_ttl"] = make_tuple(&variables.monitor_local_dns_cache_ttl, 0, 7*24*3600*1000, false);
VariablesPointers_int["monitor_local_dns_cache_refresh_interval"] = make_tuple(&variables.monitor_local_dns_cache_refresh_interval, 0, 7*24*3600*1000, false);
VariablesPointers_int["monitor_local_dns_resolver_queue_maxsize"] = make_tuple(&variables.monitor_local_dns_resolver_queue_maxsize, 16, 1024, false);
// mirroring
VariablesPointers_int["mirror_max_concurrency"] = make_tuple(&variables.mirror_max_concurrency, 1, 8*1024, false);
VariablesPointers_int["mirror_max_queue_length"] = make_tuple(&variables.mirror_max_queue_length, 0, 1024*1024, false);
@ -4003,7 +4005,7 @@ void MySQL_Thread::refresh_variables() {
mysql_thread___monitor_threads_queue_maxsize = GloMTH->get_variable_int((char *)"monitor_threads_queue_maxsize");
mysql_thread___monitor_local_dns_cache_ttl = GloMTH->get_variable_int((char*)"monitor_local_dns_cache_ttl");
mysql_thread___monitor_local_dns_cache_refresh_interval = GloMTH->get_variable_int((char*)"monitor_local_dns_cache_refresh_interval");
mysql_thread___monitor_local_dns_resolver_queue_maxsize = GloMTH->get_variable_int((char*)"monitor_local_dns_resolver_queue_maxsize");
if (mysql_thread___firewall_whitelist_errormsg) free(mysql_thread___firewall_whitelist_errormsg);
mysql_thread___firewall_whitelist_errormsg=GloMTH->get_variable_string((char *)"firewall_whitelist_errormsg");

@ -449,6 +449,7 @@ MySQL_Connection::MySQL_Connection() {
statuses.myconnpoll_get = 0;
statuses.myconnpoll_put = 0;
memset(gtid_uuid,0,sizeof(gtid_uuid));
memset(&connected_host_details, 0, sizeof(connected_host_details));
};
MySQL_Connection::~MySQL_Connection() {
@ -508,6 +509,11 @@ MySQL_Connection::~MySQL_Connection() {
}
}
if (connected_host_details.hostname)
free(connected_host_details.hostname);
if (connected_host_details.ip)
free(connected_host_details.ip);
};
bool MySQL_Connection::set_autocommit(bool _ac) {
@ -813,18 +819,27 @@ void MySQL_Connection::connect_start() {
const std::string& res_ip = MySQL_Monitor::dns_lookup(parent->address, false);
if (!res_ip.empty()) {
if (connected_host_details.hostname) {
if (strcmp(connected_host_details.hostname, parent->address) != 0) {
connected_host_details.hostname = (char*)realloc(connected_host_details.hostname, strlen(parent->address) + 1);
strcpy(connected_host_details.hostname, parent->address);
}
}
else {
connected_host_details.hostname = strdup(parent->address);
}
if (parent->resolved_ip) {
if (strcmp(parent->resolved_ip, res_ip.c_str()) != 0) {
free(parent->resolved_ip);
parent->resolved_ip = strdup(res_ip.c_str());
if (connected_host_details.ip) {
if (strcmp(connected_host_details.ip, res_ip.c_str()) != 0) {
connected_host_details.ip = (char*)realloc(connected_host_details.ip, res_ip.size() * sizeof(char) + 1);
strcpy(connected_host_details.ip, res_ip.c_str());
}
}
else {
parent->resolved_ip = strdup(res_ip.c_str());
connected_host_details.ip = strdup(res_ip.c_str());
}
host_ip = parent->resolved_ip;
host_ip = connected_host_details.ip;
}
else {
host_ip = parent->address;

@ -0,0 +1,280 @@
/**
* @file test_dns_cache-t.cpp
* @brief This test will verify dns cache is working properly.
*/
#include <stdio.h>
#include <unistd.h>
#include <string>
#include <algorithm>
#include <functional>
#include <map>
#include <vector>
#include <chrono>
#include <thread>
#include <mysql.h>
#include <mysql/mysqld_error.h>
#include "tap.h"
#include "command_line.h"
#include "utils.h"
std::vector<std::string> split(const std::string& s, char delimiter) {
std::vector<std::string> tokens {};
std::string token {};
std::istringstream tokenStream(s);
while (std::getline(tokenStream, token, delimiter)) {
tokens.push_back(token);
}
return tokens;
}
/**
* @brief Extract the metrics values from the output of the admin command
* 'SHOW PROMETHEUS METRICS'.
* @param metrics_output The output of the command 'SHOW PROMETHEUS METRICS'.
* @return A map holding the metrics identifier and its current value.
*/
std::map<std::string, double> get_metric_values(const std::string& metrics_output) {
std::vector<std::string> output_lines { split(metrics_output, '\n') };
std::map<std::string, double> metrics_map {};
for (const std::string line : output_lines) {
const std::vector<std::string> line_values { split(line, ' ') };
if (line.empty() == false && line[0] != '#') {
if (line_values.size() > 2) {
size_t delim_pos_st = line.rfind("} ");
std::string metric_key = line.substr(0, delim_pos_st);
std::string metric_val = line.substr(delim_pos_st + 2);
metrics_map.insert({metric_key, std::stod(metric_val)});
} else {
metrics_map.insert({line_values.front(), std::stod(line_values.back())});
}
}
}
return metrics_map;
}
bool get_prometheus_metrics(MYSQL* proxysql_admin, std::map<std::string, double>& matric_val) {
matric_val.clear();
if (mysql_query(proxysql_admin, "SHOW PROMETHEUS METRICS\\G")) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql_admin));
return false;
}
MYSQL_RES* p_resulset = mysql_store_result(proxysql_admin);
MYSQL_ROW data_row = mysql_fetch_row(p_resulset);
std::string row_value{};
if (data_row[0]) {
row_value = data_row[0];
}
else {
row_value = "NULL";
}
mysql_free_result(p_resulset);
matric_val = get_metric_values(row_value);
return true;
}
#define STEP_START {
#define STEP_END }
#define DECLARE_PREV_AFTER_METRICS() std::map<std::string, double> prev_metrics, after_metrics
#define EXECUTE_QUERY(QUERY,MYSQL_CONNECTION,IGNORE_RESULT) [&MYSQL_CONNECTION]() -> bool { if (mysql_query(std::ref(MYSQL_CONNECTION), QUERY) && !IGNORE_RESULT) { \
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(std::ref(MYSQL_CONNECTION))); \
return false; } return true; }
#define DELAY_SEC(SECONDS) []() -> bool { std::this_thread::sleep_for(std::chrono::seconds(SECONDS)); return true; }
#define UPDATE_PREV_METRICS(PROXYSQL_ADMIN) std::bind(get_prometheus_metrics, std::ref(PROXYSQL_ADMIN), std::ref(prev_metrics))
#define UPDATE_AFTER_METRICS(PROXYSQL_ADMIN) std::bind(get_prometheus_metrics, std::ref(PROXYSQL_ADMIN), std::ref(after_metrics))
#define CHECK_RESULT(a,b) std::bind(check_result<a>, b, std::ref(prev_metrics), std::ref(after_metrics))
#define LOOP_FUNC(FUNC,TIMES) [&]() -> bool { for(int i=0; i < TIMES; i++) { \
if (FUNC() == false) return false; } \
return true;}
template<typename COMPARE>
bool check_result(const std::string& key, const std::map<std::string, double>& prev_metrics, std::map<std::string, double>& after_metrics) {
auto prev_metric_key = prev_metrics.find(key);
auto after_metric_key = after_metrics.find(key);
bool metric_found = prev_metric_key != prev_metrics.end() && after_metric_key != after_metrics.end();
ok(metric_found, "'%s' metric was present in output from 'SHOW PROMETHEUS METRICS'", key.c_str());
if (metric_found) {
const double prev_metric_val = prev_metric_key->second;
const double after_metric_val = after_metric_key->second;
diag("Started test for metric '%s'", key.c_str());
COMPARE fn;
bool res = fn(after_metric_val, prev_metric_val);
ok(res, "'%s' metric result success.",key.c_str());
}
return true;
}
int main(int argc, char** argv) {
putenv("TAP_HOST=127.0.0.1");
putenv("TAP_PORT=6033");
putenv("TAP_USERNAME=root");
putenv("TAP_PASSWORD=root");
putenv("TAP_WORKDIR=./tests/");
CommandLine cl;
if (cl.getEnv()) {
diag("Failed to get the required environmental variables.");
return -1;
}
// Initialize Admin connection
MYSQL* proxysql_admin = mysql_init(NULL);
if (!proxysql_admin) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql_admin));
return -1;
}
// Connnect to ProxySQL Admin
if (!mysql_real_connect(proxysql_admin, cl.host, cl.admin_username, cl.admin_password, NULL, cl.admin_port, NULL, 0)) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql_admin));
return -1;
}
// Initialize ProxySQL connection
MYSQL* proxysql = mysql_init(NULL);
if (!proxysql) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql));
return -1;
}
// Connect to ProxySQL
if (!mysql_real_connect(proxysql, cl.host, cl.username, cl.password, NULL, cl.port, NULL, 0)) {
fprintf(stderr, "File %s, line %d, Error: %s\n", __FILE__, __LINE__, mysql_error(proxysql));
return exit_status();
}
DECLARE_PREV_AFTER_METRICS();
std::vector<std::vector<std::function<bool()>>> dns_cache_check_steps = {
STEP_START
EXECUTE_QUERY("SET mysql-monitor_enabled='false'", proxysql_admin, false),
EXECUTE_QUERY("SET mysql-monitor_local_dns_cache_refresh_interval=1000", proxysql_admin, false),
EXECUTE_QUERY("SET mysql-monitor_local_dns_cache_ttl=5000", proxysql_admin, false),
EXECUTE_QUERY("LOAD MYSQL VARIABLES TO RUNTIME", proxysql_admin, false),
EXECUTE_QUERY("DELETE FROM mysql_servers", proxysql_admin, false),
EXECUTE_QUERY("LOAD MYSQL SERVERS TO RUNTIME", proxysql_admin, false),
DELAY_SEC(2)
STEP_END,
STEP_START
UPDATE_PREV_METRICS(proxysql_admin),
EXECUTE_QUERY("INSERT INTO mysql_servers (hostgroup_id,hostname,port,max_replication_lag,max_connections,comment) VALUES (0,'0.0.0.0',7861,0,1000,'dummy entry')", proxysql_admin, false),
EXECUTE_QUERY("LOAD MYSQL SERVERS TO RUNTIME", proxysql_admin, false),
DELAY_SEC(2),
UPDATE_AFTER_METRICS(proxysql_admin),
CHECK_RESULT(std::equal_to<double>, "proxysql_mysql_monitor_dns_cache_record_updated"),
CHECK_RESULT(std::equal_to<double>, "proxysql_mysql_monitor_dns_cache_lookup_success"),
CHECK_RESULT(std::equal_to<double>, "proxysql_mysql_monitor_dns_cache_queried")
STEP_END,
STEP_START
UPDATE_PREV_METRICS(proxysql_admin),
LOOP_FUNC(EXECUTE_QUERY("SELECT 1", proxysql, true), 2),
DELAY_SEC(2),
UPDATE_AFTER_METRICS(proxysql_admin),
CHECK_RESULT(std::equal_to<double>, "proxysql_mysql_monitor_dns_cache_record_updated"),
CHECK_RESULT(std::equal_to<double>, "proxysql_mysql_monitor_dns_cache_lookup_success"),
CHECK_RESULT(std::equal_to<double>, "proxysql_mysql_monitor_dns_cache_queried")
STEP_END,
STEP_START
UPDATE_PREV_METRICS(proxysql_admin),
LOOP_FUNC(EXECUTE_QUERY("SELECT 1", proxysql, true), 2),
DELAY_SEC(2),
UPDATE_AFTER_METRICS(proxysql_admin),
CHECK_RESULT(std::equal_to<double>, "proxysql_mysql_monitor_dns_cache_record_updated"),
CHECK_RESULT(std::equal_to<double>, "proxysql_mysql_monitor_dns_cache_lookup_success"),
CHECK_RESULT(std::equal_to<double>, "proxysql_mysql_monitor_dns_cache_queried")
STEP_END,
STEP_START
UPDATE_PREV_METRICS(proxysql_admin),
EXECUTE_QUERY("INSERT INTO mysql_servers (hostgroup_id,hostname,port,max_replication_lag,max_connections,comment) VALUES (0,'google.com',7861,0,1000,'dummy entry')", proxysql_admin, false),
EXECUTE_QUERY("LOAD MYSQL SERVERS TO RUNTIME", proxysql_admin, false),
DELAY_SEC(2),
LOOP_FUNC(EXECUTE_QUERY("SELECT 1", proxysql, true), 2),
UPDATE_AFTER_METRICS(proxysql_admin),
CHECK_RESULT(std::greater<double>, "proxysql_mysql_monitor_dns_cache_record_updated"),
CHECK_RESULT(std::greater<double>, "proxysql_mysql_monitor_dns_cache_lookup_success"),
CHECK_RESULT(std::greater<double>, "proxysql_mysql_monitor_dns_cache_queried")
STEP_END,
STEP_START
UPDATE_PREV_METRICS(proxysql_admin),
EXECUTE_QUERY("INSERT INTO mysql_servers (hostgroup_id,hostname,port,max_replication_lag,max_connections,comment) VALUES (0,' yahoo.com ',7861,0,1000,'dummy entry')", proxysql_admin, false),
EXECUTE_QUERY("LOAD MYSQL SERVERS TO RUNTIME", proxysql_admin, false),
DELAY_SEC(2),
LOOP_FUNC(EXECUTE_QUERY("SELECT 1", proxysql, true), 2),
UPDATE_AFTER_METRICS(proxysql_admin),
CHECK_RESULT(std::greater<double>, "proxysql_mysql_monitor_dns_cache_record_updated"),
CHECK_RESULT(std::greater<double>, "proxysql_mysql_monitor_dns_cache_lookup_success"),
CHECK_RESULT(std::greater<double>, "proxysql_mysql_monitor_dns_cache_queried")
STEP_END,
STEP_START
UPDATE_PREV_METRICS(proxysql_admin),
EXECUTE_QUERY("DELETE FROM mysql_servers", proxysql_admin, false),
EXECUTE_QUERY("LOAD MYSQL SERVERS TO RUNTIME", proxysql_admin, false),
DELAY_SEC(2),
UPDATE_AFTER_METRICS(proxysql_admin),
CHECK_RESULT(std::greater<double>, "proxysql_mysql_monitor_dns_cache_record_updated"),
CHECK_RESULT(std::equal_to<double>, "proxysql_mysql_monitor_dns_cache_lookup_success"),
CHECK_RESULT(std::equal_to<double>, "proxysql_mysql_monitor_dns_cache_queried")
STEP_END,
STEP_START
UPDATE_PREV_METRICS(proxysql_admin),
EXECUTE_QUERY("INSERT INTO mysql_servers (hostgroup_id,hostname,port,max_replication_lag,max_connections,comment) VALUES (0,'INVALID_DOMAIN',7861,0,1000,'dummy entry')", proxysql_admin, false),
EXECUTE_QUERY("LOAD MYSQL SERVERS TO RUNTIME", proxysql_admin, false),
DELAY_SEC(2),
LOOP_FUNC(EXECUTE_QUERY("SELECT 1", proxysql, true), 2),
UPDATE_AFTER_METRICS(proxysql_admin),
CHECK_RESULT(std::equal_to<double>, "proxysql_mysql_monitor_dns_cache_record_updated"),
CHECK_RESULT(std::equal_to<double>, "proxysql_mysql_monitor_dns_cache_lookup_success"),
CHECK_RESULT(std::greater<double>, "proxysql_mysql_monitor_dns_cache_queried")
STEP_END,
STEP_START
//disable dns cache
EXECUTE_QUERY("SET mysql-monitor_local_dns_cache_refresh_interval=0", proxysql_admin, false),
EXECUTE_QUERY("LOAD MYSQL VARIABLES TO RUNTIME", proxysql_admin, false),
DELAY_SEC(2),
UPDATE_PREV_METRICS(proxysql_admin),
LOOP_FUNC(EXECUTE_QUERY("SELECT 1", proxysql, true), 2),
UPDATE_AFTER_METRICS(proxysql_admin),
CHECK_RESULT(std::equal_to<double>, "proxysql_mysql_monitor_dns_cache_record_updated"),
CHECK_RESULT(std::equal_to<double>, "proxysql_mysql_monitor_dns_cache_lookup_success"),
CHECK_RESULT(std::equal_to<double>, "proxysql_mysql_monitor_dns_cache_queried")
STEP_END
};
plan((dns_cache_check_steps.size() -1) * 3 * 2);
for (size_t i = 0; i < dns_cache_check_steps.size(); i++) {
diag("Starting Step:'%ld'", i);
for (const auto fn : dns_cache_check_steps[i])
if (fn() == false)
goto __cleanup;
diag("Ending Step:'%ld'\n", i);
}
__cleanup:
mysql_close(proxysql);
mysql_close(proxysql_admin);
return exit_status();
}
Loading…
Cancel
Save