Use nonces and drain interleaved stats in Hydrabase

Use UUID nonces to correlate requests/responses and robustly handle interleaved stats/heartbeat messages from the Hydrabase server. Adds _extract_stats and _extract_results helpers, records last_peer_count and timestamp, and loops on recv() with timeouts to drain non-result messages until the matching response (or a results message without a nonce) arrives. Mirrors the same nonce/send-and-drain logic in HydrabaseWorker, adds necessary imports (time, uuid), and improves logging and timeout handling to avoid returning stale or misattributed data.
pull/165/head
Broque Thomas 3 months ago
parent 7261b04950
commit 63624a4f6e

@ -9,6 +9,8 @@ and iTunesClient (Track, Artist, Album).
import json
import logging
import re
import time
import uuid
from typing import List, Optional, Callable, Tuple
from core.itunes_client import Track, Artist, Album
@ -44,8 +46,35 @@ class HydrabaseClient:
except Exception:
return False
def _extract_stats(self, data):
"""Extract peer stats from any message that contains them."""
if isinstance(data, dict) and 'stats' in data:
stats = data['stats']
if isinstance(stats, dict) and 'connectedPeers' in stats:
self.last_peer_count = stats['connectedPeers']
self.last_peer_count_time = time.time()
@staticmethod
def _extract_results(data) -> Optional[list]:
"""Extract results array from a response dict. Returns None if not a results message."""
if not isinstance(data, dict):
return None
if 'response' in data:
resp = data['response']
return resp if isinstance(resp, list) else [resp]
if 'results' in data:
return data['results']
if 'data' in data:
result = data['data']
return result if isinstance(result, list) else [result]
return None
def _send_and_recv(self, request_type: str, query: str) -> Optional[list]:
"""Send a search request and return the response array."""
"""Send a search request and return the response array.
Uses a nonce for request-response correlation and loops on recv()
to drain any interleaved stats/heartbeat messages from the server.
"""
ws, lock = self.get_ws_and_lock()
if ws is None:
return None
@ -55,50 +84,58 @@ class HydrabaseClient:
except Exception:
return None
nonce = uuid.uuid4().hex
payload = json.dumps({
'request': {
'type': request_type,
'query': query
}
},
'nonce': nonce
})
try:
with lock:
ws.settimeout(self.timeout)
ws.send(payload)
raw = ws.recv()
data = json.loads(raw)
# Extract stats if present (can appear alongside results)
if isinstance(data, dict) and 'stats' in data:
stats = data['stats']
if isinstance(stats, dict) and 'connectedPeers' in stats:
import time
self.last_peer_count = stats['connectedPeers']
self.last_peer_count_time = time.time()
# Handle various response shapes
if isinstance(data, list):
return data
if isinstance(data, dict):
# Hydrabase returns results under "response" key
if 'response' in data:
resp = data['response']
return resp if isinstance(resp, list) else [resp]
if 'results' in data:
return data['results']
if 'data' in data:
result = data['data']
return result if isinstance(result, list) else [result]
# Stats-only or empty response — no search results
if 'stats' in data:
logger.debug(f"Hydrabase stats-only response for ({request_type}, '{query}')")
return []
# Unknown shape — return empty rather than wrapping garbage
logger.debug(f"Hydrabase unexpected response shape for ({request_type}, '{query}'): {list(data.keys()) if isinstance(data, dict) else type(data).__name__}")
return []
deadline = time.time() + self.timeout
while True:
remaining = deadline - time.time()
if remaining <= 0:
logger.warning(f"Hydrabase response timeout for ({request_type}, '{query}')")
return None
ws.settimeout(remaining)
raw = ws.recv()
data = json.loads(raw)
# Always extract stats from any message
self._extract_stats(data)
# Bare list — results with no envelope
if isinstance(data, list):
return data
if not isinstance(data, dict):
continue
# Response has our nonce — definitely ours
if data.get('nonce') == nonce:
results = self._extract_results(data)
return results if results is not None else []
# Response has results but no nonce (server doesn't echo nonces)
if 'nonce' not in data:
results = self._extract_results(data)
if results is not None:
return results
# Stats-only message with no nonce — skip and recv again
logger.debug(f"Hydrabase draining non-result message for ({request_type}, '{query}')")
continue
# Has a nonce but not ours — stale response, skip it
logger.debug(f"Hydrabase draining stale nonce response for ({request_type}, '{query}')")
except Exception as e:
logger.error(f"Hydrabase query failed ({request_type}, '{query}'): {e}")
return None

@ -12,6 +12,7 @@ import logging
import queue
import threading
import time
import uuid
logger = logging.getLogger(__name__)
@ -125,11 +126,13 @@ class HydrabaseWorker:
self.stats['dropped'] += 1
return
nonce = uuid.uuid4().hex
payload = json.dumps({
'request': {
'type': item['type'],
'query': item['query']
}
},
'nonce': nonce
})
try:
@ -137,8 +140,31 @@ class HydrabaseWorker:
if not ws.connected:
self.stats['dropped'] += 1
return
ws.settimeout(15)
ws.send(payload)
ws.recv() # Required by protocol, response discarded
# Loop to drain stats/heartbeat messages until we get our response
deadline = time.time() + 15
while True:
remaining = deadline - time.time()
if remaining <= 0:
break
ws.settimeout(remaining)
raw = ws.recv()
data = json.loads(raw)
# Got our nonce back — done
if isinstance(data, dict) and data.get('nonce') == nonce:
break
# Got results without nonce — assume it's ours
if isinstance(data, dict) and 'nonce' not in data:
if 'response' in data or 'results' in data or 'data' in data:
break
# Bare list — results
if isinstance(data, list):
break
# Stats/heartbeat — drain and recv again
self.stats['sent'] += 1
except Exception as e:
logger.debug(f"Hydrabase send failed: {e}")

Loading…
Cancel
Save