From 63624a4f6ea65fd31c7f025ea20987412862f553 Mon Sep 17 00:00:00 2001 From: Broque Thomas Date: Tue, 24 Feb 2026 19:16:14 -0800 Subject: [PATCH] Use nonces and drain interleaved stats in Hydrabase Use UUID nonces to correlate requests/responses and robustly handle interleaved stats/heartbeat messages from the Hydrabase server. Adds _extract_stats and _extract_results helpers, records last_peer_count and timestamp, and loops on recv() with timeouts to drain non-result messages until the matching response (or a results message without a nonce) arrives. Mirrors the same nonce/send-and-drain logic in HydrabaseWorker, adds necessary imports (time, uuid), and improves logging and timeout handling to avoid returning stale or misattributed data. --- core/hydrabase_client.py | 107 ++++++++++++++++++++++++++------------- core/hydrabase_worker.py | 30 ++++++++++- 2 files changed, 100 insertions(+), 37 deletions(-) diff --git a/core/hydrabase_client.py b/core/hydrabase_client.py index 86956f7b..2578ab09 100644 --- a/core/hydrabase_client.py +++ b/core/hydrabase_client.py @@ -9,6 +9,8 @@ and iTunesClient (Track, Artist, Album). import json import logging import re +import time +import uuid from typing import List, Optional, Callable, Tuple from core.itunes_client import Track, Artist, Album @@ -44,8 +46,35 @@ class HydrabaseClient: except Exception: return False + def _extract_stats(self, data): + """Extract peer stats from any message that contains them.""" + if isinstance(data, dict) and 'stats' in data: + stats = data['stats'] + if isinstance(stats, dict) and 'connectedPeers' in stats: + self.last_peer_count = stats['connectedPeers'] + self.last_peer_count_time = time.time() + + @staticmethod + def _extract_results(data) -> Optional[list]: + """Extract results array from a response dict. Returns None if not a results message.""" + if not isinstance(data, dict): + return None + if 'response' in data: + resp = data['response'] + return resp if isinstance(resp, list) else [resp] + if 'results' in data: + return data['results'] + if 'data' in data: + result = data['data'] + return result if isinstance(result, list) else [result] + return None + def _send_and_recv(self, request_type: str, query: str) -> Optional[list]: - """Send a search request and return the response array.""" + """Send a search request and return the response array. + + Uses a nonce for request-response correlation and loops on recv() + to drain any interleaved stats/heartbeat messages from the server. + """ ws, lock = self.get_ws_and_lock() if ws is None: return None @@ -55,50 +84,58 @@ class HydrabaseClient: except Exception: return None + nonce = uuid.uuid4().hex payload = json.dumps({ 'request': { 'type': request_type, 'query': query - } + }, + 'nonce': nonce }) try: with lock: ws.settimeout(self.timeout) ws.send(payload) - raw = ws.recv() - - data = json.loads(raw) - - # Extract stats if present (can appear alongside results) - if isinstance(data, dict) and 'stats' in data: - stats = data['stats'] - if isinstance(stats, dict) and 'connectedPeers' in stats: - import time - self.last_peer_count = stats['connectedPeers'] - self.last_peer_count_time = time.time() - - # Handle various response shapes - if isinstance(data, list): - return data - if isinstance(data, dict): - # Hydrabase returns results under "response" key - if 'response' in data: - resp = data['response'] - return resp if isinstance(resp, list) else [resp] - if 'results' in data: - return data['results'] - if 'data' in data: - result = data['data'] - return result if isinstance(result, list) else [result] - # Stats-only or empty response — no search results - if 'stats' in data: - logger.debug(f"Hydrabase stats-only response for ({request_type}, '{query}')") - return [] - - # Unknown shape — return empty rather than wrapping garbage - logger.debug(f"Hydrabase unexpected response shape for ({request_type}, '{query}'): {list(data.keys()) if isinstance(data, dict) else type(data).__name__}") - return [] + + deadline = time.time() + self.timeout + while True: + remaining = deadline - time.time() + if remaining <= 0: + logger.warning(f"Hydrabase response timeout for ({request_type}, '{query}')") + return None + + ws.settimeout(remaining) + raw = ws.recv() + data = json.loads(raw) + + # Always extract stats from any message + self._extract_stats(data) + + # Bare list — results with no envelope + if isinstance(data, list): + return data + + if not isinstance(data, dict): + continue + + # Response has our nonce — definitely ours + if data.get('nonce') == nonce: + results = self._extract_results(data) + return results if results is not None else [] + + # Response has results but no nonce (server doesn't echo nonces) + if 'nonce' not in data: + results = self._extract_results(data) + if results is not None: + return results + # Stats-only message with no nonce — skip and recv again + logger.debug(f"Hydrabase draining non-result message for ({request_type}, '{query}')") + continue + + # Has a nonce but not ours — stale response, skip it + logger.debug(f"Hydrabase draining stale nonce response for ({request_type}, '{query}')") + except Exception as e: logger.error(f"Hydrabase query failed ({request_type}, '{query}'): {e}") return None diff --git a/core/hydrabase_worker.py b/core/hydrabase_worker.py index e9558cce..f6e44c6b 100644 --- a/core/hydrabase_worker.py +++ b/core/hydrabase_worker.py @@ -12,6 +12,7 @@ import logging import queue import threading import time +import uuid logger = logging.getLogger(__name__) @@ -125,11 +126,13 @@ class HydrabaseWorker: self.stats['dropped'] += 1 return + nonce = uuid.uuid4().hex payload = json.dumps({ 'request': { 'type': item['type'], 'query': item['query'] - } + }, + 'nonce': nonce }) try: @@ -137,8 +140,31 @@ class HydrabaseWorker: if not ws.connected: self.stats['dropped'] += 1 return + ws.settimeout(15) ws.send(payload) - ws.recv() # Required by protocol, response discarded + + # Loop to drain stats/heartbeat messages until we get our response + deadline = time.time() + 15 + while True: + remaining = deadline - time.time() + if remaining <= 0: + break + ws.settimeout(remaining) + raw = ws.recv() + data = json.loads(raw) + + # Got our nonce back — done + if isinstance(data, dict) and data.get('nonce') == nonce: + break + # Got results without nonce — assume it's ours + if isinstance(data, dict) and 'nonce' not in data: + if 'response' in data or 'results' in data or 'data' in data: + break + # Bare list — results + if isinstance(data, list): + break + # Stats/heartbeat — drain and recv again + self.stats['sent'] += 1 except Exception as e: logger.debug(f"Hydrabase send failed: {e}")