Address CodeRabbit + Gemini review feedback

Fixes for actionable findings on PR #5806:

DNS_Cache.cpp / hpp:
- lookup(): set *ip_count to 0 (when ip_count is non-null) on the
  disabled-cache fast path so callers don't observe a stale count from
  a previous lookup (CodeRabbit).
- add_if_not_exist(): only bump counter_record_updated_ when an insert
  actually happened, and return whether it did.  Previously a call that
  found the hostname already cached still bumped the counter and
  returned true, inflating the cache-update stats (CodeRabbit).
- get_connected_peer_ip_from_socket(): drop the malloc, switch to a
  stack sockaddr_storage, and only assign result on AF_INET / AF_INET6
  with a successful inet_ntop.  Previously, on an unknown address
  family, we'd copy the uninitialized ip_addr buffer into result
  (Gemini + CodeRabbit).
- monitor_dns_resolver_thread(): cache_ttl is a signed int, and at the
  maximum configured TTL (7*24*3600*1000 ms) the product
  `1000 * cache_ttl` overflows int before it is added to
  monotonic_time().  Do the multiplication in unsigned long long
  instead, using 1000ULL and an explicit cast of cache_ttl (Gemini).
- DNS_Cache::IP_ADDR::counter is now mutable so get_next_ip() (const)
  can __sync_fetch_and_add it without a const_cast (Gemini).

PgSQL_Monitor.cpp:
- monitor_dns_cache(): refresh pgsql_thread___monitor_local_dns_*
  before the loop, not just on the first version-bump.  Otherwise the
  resolver runs its first pass against zero-initialized TTL / refresh
  interval and starts in the wrong enabled/disabled state until a
  later config bump (CodeRabbit).

test/tap/tests/pgsql-test_dns_cache-t.cpp:
- Step 7 ("Cache disabled by refresh_interval=0") was inserting an IP
  literal (0.0.0.0), which bypasses DNS regardless of the cache state.
  Switched to a resolvable hostname (example.com) so a regression in
  the cache-disable path would actually move counters and fail the
  assertion.  Added a third assertion on dns_cache_lookup_success for
  symmetry.  Plan adjusted to 17.

Deliberately skipped:
- CodeRabbit (CR) comment about mysql_thread___resolution_family change
  requiring a cache flush: pre-existing MySQL behavior, out of scope
  for this PR.
- CR comment about pthread_attr_init / setstacksize return checks in
  main.cpp: matches the existing un-checked pattern in
  MySQL_Monitor::run().  No need to diverge here.
- Gemini comment about persistent thread pool vs. per-iteration spawn:
  MySQL_Monitor::monitor_dns_cache() also recreates the resolver
  threads each loop iteration; the PgSQL variant follows the same
  pattern.
- Gemini comment about 2048-byte stack size: Thread::start takes ss
  in KB (lib/thread.cpp:46 `pthread_attr_setstacksize(&attr, ss*1024)`),
  so 2048 here means 2 MB, matching the MySQL resolver pool.
pgsql_dns_cache
Rene Cannao 1 month ago
parent c10b30523c
commit a62d1b4e99

@ -85,7 +85,11 @@ public:
private:
struct IP_ADDR {
std::vector<std::string> ips;
unsigned long counter = 0;
// 'counter' is bumped by get_next_ip() (a const method) for
// round-robin selection; the logical state of the cache record is
// unchanged, so mutable is the right tool here and lets us drop a
// const_cast at the call site.
mutable unsigned long counter = 0;
};
std::string get_next_ip(const IP_ADDR& ip_addr) const;

@ -35,31 +35,27 @@ std::string get_connected_peer_ip_from_socket(int socket_fd) {
std::string result;
char ip_addr[INET6_ADDRSTRLEN];
union {
struct sockaddr_in in;
struct sockaddr_in6 in6;
} custom_sockaddr;
struct sockaddr* addr = (struct sockaddr*)malloc(sizeof(custom_sockaddr));
struct sockaddr_storage custom_sockaddr;
socklen_t addrlen = sizeof(custom_sockaddr);
memset(addr, 0, sizeof(custom_sockaddr));
int rc = getpeername(socket_fd, addr, &addrlen);
if (rc == 0) {
if (addr->sa_family == AF_INET) {
struct sockaddr_in* ipv4 = (struct sockaddr_in*)addr;
inet_ntop(addr->sa_family, &ipv4->sin_addr, ip_addr, INET_ADDRSTRLEN);
}
else if (addr->sa_family == AF_INET6) {
struct sockaddr_in6* ipv6 = (struct sockaddr_in6*)addr;
inet_ntop(addr->sa_family, &ipv6->sin6_addr, ip_addr, INET6_ADDRSTRLEN);
}
result = ip_addr;
memset(&custom_sockaddr, 0, sizeof(custom_sockaddr));
if (getpeername(socket_fd, (struct sockaddr*)&custom_sockaddr, &addrlen) != 0)
return result;
// Only assign to result when ss_family is one we know how to format.
// Other families (AF_UNIX, AF_NETLINK, ...) shouldn't happen for a TCP
// peer fd, but if they ever did, the previous code copied the
// uninitialized ip_addr buffer into result.
if (custom_sockaddr.ss_family == AF_INET) {
const struct sockaddr_in* ipv4 = (const struct sockaddr_in*)&custom_sockaddr;
if (inet_ntop(AF_INET, &ipv4->sin_addr, ip_addr, INET_ADDRSTRLEN))
result = ip_addr;
}
else if (custom_sockaddr.ss_family == AF_INET6) {
const struct sockaddr_in6* ipv6 = (const struct sockaddr_in6*)&custom_sockaddr;
if (inet_ntop(AF_INET6, &ipv6->sin6_addr, ip_addr, INET6_ADDRSTRLEN))
result = ip_addr;
}
free(addr);
return result;
}
@ -144,7 +140,7 @@ void* monitor_dns_resolver_thread(const std::vector<DNS_Resolve_Data*>& dns_reso
dns_resolve_data->result.set_value(std::make_tuple<>(true,
DNS_Cache_Record(dns_resolve_data->hostname,
std::move(dns_resolve_data->cached_ips),
monotonic_time() + (1000 * cache_ttl))));
monotonic_time() + (1000ULL * static_cast<unsigned long long>(cache_ttl)))));
}
}
else
@ -153,7 +149,7 @@ void* monitor_dns_resolver_thread(const std::vector<DNS_Resolve_Data*>& dns_reso
if (to_update_cache) {
dns_resolve_data->result.set_value(std::make_tuple<>(true,
DNS_Cache_Record(dns_resolve_data->hostname, ips,
monotonic_time() + (1000 * cache_ttl))));
monotonic_time() + (1000ULL * static_cast<unsigned long long>(cache_ttl)))));
dns_resolve_data->dns_cache->add(dns_resolve_data->hostname, std::move(ips));
}
@ -217,6 +213,7 @@ bool DNS_Cache::add(const std::string& hostname, std::vector<std::string>&& ips)
bool DNS_Cache::add_if_not_exist(const std::string& hostname, std::vector<std::string>&& ips) {
if (!enabled) return false;
bool inserted = false;
int rc = pthread_rwlock_wrlock(&rwlock_);
assert(rc == 0);
if (records.find(hostname) == records.end()) {
@ -226,14 +223,15 @@ bool DNS_Cache::add_if_not_exist(const std::string& hostname, std::vector<std::s
auto& ip_addr = records[hostname];
ip_addr.ips = std::move(ips);
__sync_fetch_and_and(&ip_addr.counter, 0);
inserted = true;
}
rc = pthread_rwlock_unlock(&rwlock_);
assert(rc == 0);
if (counter_record_updated_)
if (inserted && counter_record_updated_)
__sync_fetch_and_add(counter_record_updated_, 1);
return true;
return inserted;
}
std::string DNS_Cache::get_next_ip(const IP_ADDR& ip_addr) const {
@ -241,13 +239,17 @@ std::string DNS_Cache::get_next_ip(const IP_ADDR& ip_addr) const {
if (ip_addr.ips.empty())
return "";
const auto counter_val = __sync_fetch_and_add(const_cast<unsigned long*>(&ip_addr.counter), 1);
const auto counter_val = __sync_fetch_and_add(&ip_addr.counter, 1);
return ip_addr.ips[counter_val % ip_addr.ips.size()];
}
std::string DNS_Cache::lookup(const std::string& hostname, size_t* ip_count) const {
if (!enabled) return "";
if (!enabled) {
if (ip_count)
*ip_count = 0;
return "";
}
std::string ip;

@ -2837,10 +2837,21 @@ void* PgSQL_Monitor::monitor_dns_cache() {
bool dns_cache_enable = true;
// Per-instance Thread variable refresher. Without this the pgsql_thread___
// globals stay at their startup defaults inside this worker.
unsigned int local_thread_vars_version = 0;
// globals stay at their startup defaults inside this worker. Refresh
// once up-front so the first loop iteration sees the configured TTL /
// refresh interval rather than zero-initialized values — the
// version-bump check below then only fires on later config changes.
std::unique_ptr<PgSQL_Thread> pgsql_thr { new PgSQL_Thread() };
pgsql_thr->curtime = monotonic_time();
pgsql_thr->refresh_variables();
unsigned int local_thread_vars_version = GloPTH ? GloPTH->get_global_version() : 0;
if (pgsql_thread___monitor_local_dns_cache_ttl == 0 ||
pgsql_thread___monitor_local_dns_cache_refresh_interval == 0) {
dns_cache_enable = false;
dns_cache->set_enabled_flag(false);
} else {
dns_cache->set_enabled_flag(true);
}
std::list<DNS_Cache_Record> dns_records_bookkeeping;
wqueue<DNS_Resolve_Data*> dns_resolver_queue;

@ -156,7 +156,7 @@ int main(int /*argc*/, char** /*argv*/) {
return -1;
}
plan(16);
plan(17);
PGConnPtr admin = admin_connect();
if (!admin) {
@ -324,23 +324,31 @@ int main(int /*argc*/, char** /*argv*/) {
// Step 7: Disabled cache (refresh_interval=0) flatlines counters
// =====================================================================
diag("---- Step 7: Cache disabled by refresh_interval=0");
admin_exec(admin, "DELETE FROM pgsql_servers WHERE hostgroup_id=999");
admin_exec(admin, "LOAD PGSQL SERVERS TO RUNTIME");
sleep_seconds(1);
admin_exec(admin, "SET pgsql-monitor_local_dns_cache_refresh_interval=0");
admin_exec(admin, "LOAD PGSQL VARIABLES TO RUNTIME");
// Use an IP literal so any client connect attempt does not invoke
// DNS code at all — keeps the assertion clean.
admin_exec(admin, "DELETE FROM pgsql_servers WHERE hostgroup_id=999");
// Use a resolvable hostname so the assertion exercises the actual cache
// path — if disabling the cache regressed (resolver still ran, or the
// connect path still called into the cache), record_updated and queried
// would move and the test would fail. An IP literal would bypass DNS
// regardless of refresh_interval and miss that regression.
admin_exec(admin,
"INSERT INTO pgsql_servers (hostgroup_id,hostname,port,max_connections,comment) "
"VALUES (999,'0.0.0.0',7861,10,'pgsql-dns-test cache-off')");
"VALUES (999,'example.com',7861,10,'pgsql-dns-test cache-off')");
admin_exec(admin, "LOAD PGSQL SERVERS TO RUNTIME");
sleep_seconds(2);
sleep_seconds(3);
before = read_pg_counters(admin);
hammer_proxy(3);
sleep_seconds(1);
sleep_seconds(2);
after = read_pg_counters(admin);
ok(after.queried == before.queried,
"cache off: dns_cache_queried unchanged (%ld -> %ld)",
before.queried, after.queried);
ok(after.lookup_success == before.lookup_success,
"cache off: dns_cache_lookup_success unchanged (%ld -> %ld)",
before.lookup_success, after.lookup_success);
ok(after.record_updated == before.record_updated,
"cache off: dns_cache_record_updated unchanged (%ld -> %ld)",
before.record_updated, after.record_updated);

Loading…
Cancel
Save