You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
SoulSync/core/api_call_tracker.py

315 lines
12 KiB

"""
Centralized API call tracker for all enrichment services.
Tracks actual API calls (not items processed) with rolling timestamps
for real-time rate monitoring and minute-bucketed history for 24-hour graphs.
Thread-safe, persists 24h history to disk on shutdown and restores on startup.
"""
import json
import os
import threading
import time
from collections import deque, defaultdict
# Known rate limits per service, in calls per minute. Most values are
# 60 / MIN_API_INTERVAL of the corresponding client, rounded down.
RATE_LIMITS = dict(
    spotify=171,      # MIN_API_INTERVAL=0.35s -> ~171/min
    itunes=20,        # MIN_API_INTERVAL=3.0s  -> ~20/min
    deezer=60,        # MIN_API_INTERVAL=1.0s  -> ~60/min
    lastfm=300,       # MIN_API_INTERVAL=0.2s  -> ~300/min
    genius=30,        # MIN_API_INTERVAL=2.0s  -> ~30/min
    musicbrainz=60,   # MIN_API_INTERVAL=1.0s  -> ~60/min
    audiodb=30,       # MIN_API_INTERVAL=2.0s  -> ~30/min
    tidal=120,        # MIN_API_INTERVAL=0.5s  -> ~120/min
    qobuz=60,         # variable throttle; ~60/min is an estimate
    discogs=60,       # MIN_API_INTERVAL=1.0s with auth -> ~60/min
)

# Human-readable service names shown in the UI, keyed like RATE_LIMITS.
SERVICE_LABELS = dict(
    spotify='Spotify',
    itunes='Apple Music',
    deezer='Deezer',
    lastfm='Last.fm',
    genius='Genius',
    musicbrainz='MusicBrainz',
    audiodb='AudioDB',
    tidal='Tidal',
    qobuz='Qobuz',
    discogs='Discogs',
)

# Order in which services are reported and displayed.
SERVICE_ORDER = [
    'spotify',
    'itunes',
    'deezer',
    'lastfm',
    'genius',
    'musicbrainz',
    'audiodb',
    'tidal',
    'qobuz',
    'discogs',
]

# File used to persist the 24-hour history between runs. This is a relative
# path, so it resolves against the process working directory.
_PERSIST_PATH = os.path.join('database', 'api_call_history.json')
class ApiCallTracker:
    """Centralized tracker for actual API calls across all enrichment services.

    State is kept per tracking key. A key is a service name such as
    ``'spotify'``, or ``'spotify:<endpoint>'`` for Spotify per-endpoint
    tracking. Each key has two views:

    * ``_recent_calls``: the most recent raw call timestamps, used for the
      rolling 60-second rate.
    * ``_minute_history`` plus the in-progress bucket held in
      ``_current_minute_ts`` / ``_current_minute_counts``: per-minute call
      counts covering up to 24 hours.

    All mutable state is guarded by ``self._lock``, a non-reentrant
    ``threading.Lock``. Helpers documented as "must hold self._lock" must
    therefore not call public methods that acquire it.
    """

    # Prefix of Spotify per-endpoint tracking keys, e.g. 'spotify:search'.
    _ENDPOINT_PREFIX = 'spotify:'
    # Width of one history bucket, and length of the retained window, in seconds.
    _MINUTE = 60
    _WINDOW = 86400

    def __init__(self):
        self._lock = threading.Lock()
        # Recent call timestamps per key (read over the last 60s for the speedometer).
        # maxlen=600 covers 10 calls/sec for 60s, above every limit in RATE_LIMITS.
        self._recent_calls = defaultdict(lambda: deque(maxlen=600))
        # Completed minute buckets per key: (minute_floor_timestamp, call_count).
        # One entry per minute, so the maxlen keeps 24 hours (1440 entries).
        history_len = self._WINDOW // self._MINUTE
        self._minute_history = defaultdict(lambda: deque(maxlen=history_len))
        # In-progress minute bucket per key: its call count and minute-floor timestamp.
        self._current_minute_counts = defaultdict(int)
        self._current_minute_ts = {}
        # Rate limit event log: records bans, peaks, escalations (newest 200 kept).
        # Each entry: {ts, event, service, endpoint, duration, detail}
        self._events = deque(maxlen=200)
        # Restore persisted history from disk (best effort; failures are printed).
        self._load()

    def record_call(self, service_key, endpoint=None):
        """Record an API call. Called from rate_limited decorators.

        Args:
            service_key: Service identifier ('spotify', 'itunes', etc.)
            endpoint: Optional endpoint name for per-endpoint tracking (Spotify only)
        """
        now = time.time()
        minute_floor = int(now // self._MINUTE) * self._MINUTE
        with self._lock:
            self._recent_calls[service_key].append(now)
            self._roll_minute(service_key, minute_floor)
            # Spotify per-endpoint tracking
            if endpoint and service_key == 'spotify':
                ep_key = f"{self._ENDPOINT_PREFIX}{endpoint}"
                self._recent_calls[ep_key].append(now)
                self._roll_minute(ep_key, minute_floor)

    def _roll_minute(self, key, minute_floor):
        """Count one call for ``key`` in the minute starting at ``minute_floor``.

        If ``minute_floor`` is later than the in-progress bucket, three things
        happen: the old bucket is flushed to history, skipped minutes are
        recorded as zero-count entries, and a new bucket starts at 1.
        Otherwise the in-progress bucket is incremented. That branch also
        absorbs calls stamped with an earlier minute, e.g. after a backwards
        clock step.

        Must be called while holding self._lock.
        """
        prev_ts = self._current_minute_ts.get(key)
        if prev_ts is not None and minute_floor <= prev_ts:
            self._current_minute_counts[key] += 1
            return

        if prev_ts is not None:
            history = self._minute_history[key]
            if self._current_minute_counts[key] > 0:
                history.append((prev_ts, self._current_minute_counts[key]))
            # Zero-fill skipped minutes. The history deque holds exactly one
            # window of minutes, so entries older than that would be evicted
            # immediately. Starting no earlier than one window back gives the
            # same deque contents without looping over every idle minute.
            gap_start = max(prev_ts + self._MINUTE, minute_floor - self._WINDOW)
            while gap_start < minute_floor:
                history.append((gap_start, 0))
                gap_start += self._MINUTE

        self._current_minute_ts[key] = minute_floor
        self._current_minute_counts[key] = 1

    def _tracked_keys(self):
        """Return every key with history or an in-progress bucket, in first-seen order.

        A key whose only calls fall in its first minute has an in-progress
        bucket but no ``_minute_history`` entry yet, so both maps are consulted.
        Must be called while holding self._lock.
        """
        return list(dict.fromkeys([*self._minute_history, *self._current_minute_ts]))

    def _window_stats(self, key, cutoff):
        """Return ``(total, peak_count, peak_ts)`` for ``key`` over buckets with ``ts >= cutoff``.

        Both completed buckets and the in-progress bucket are counted. Ties
        for the peak keep the earliest minute. ``peak_ts`` is 0 when there
        were no calls. Must be called while holding self._lock.
        """
        buckets = list(self._minute_history.get(key, ()))
        cur_ts = self._current_minute_ts.get(key)
        if cur_ts is not None:
            buckets.append((cur_ts, self._current_minute_counts.get(key, 0)))

        total = 0
        peak_cpm = 0
        peak_ts = 0
        for ts, count in buckets:
            if ts < cutoff:
                continue
            total += count
            if count > peak_cpm:
                peak_cpm, peak_ts = count, ts
        return total, peak_cpm, peak_ts

    def _endpoint_totals(self, cutoff):
        """Return ``{endpoint_name: calls}`` for Spotify endpoints with calls since ``cutoff``.

        Must be called while holding self._lock.
        """
        prefix = self._ENDPOINT_PREFIX
        totals = {}
        for key in self._tracked_keys():
            if not key.startswith(prefix):
                continue
            total = self._window_stats(key, cutoff)[0]
            if total > 0:
                totals[key[len(prefix):]] = total
        return totals

    def record_event(self, service_key, event_type, detail='', endpoint='', duration=0):
        """Record a rate limit event (ban, escalation, cooldown, etc.).

        Called from spotify_client.py when rate limits are detected. Only the
        newest 200 events are retained.
        """
        with self._lock:
            self._events.append({
                'ts': time.time(),
                'event': event_type,
                'service': service_key,
                'endpoint': endpoint,
                'duration': duration,
                'detail': detail,
            })

    def get_events(self, since=None):
        """Return recorded rate limit events whose ``ts`` is at or after ``since``.

        Args:
            since: Epoch-seconds cutoff. ``None`` (the default) means "the last
                24 hours". Any other value, including ``0``, is used as given.

        Returns:
            A new list of the matching event dicts, in recorded order. The
            dicts themselves are the tracker's internal records.
        """
        cutoff = time.time() - self._WINDOW if since is None else since
        with self._lock:
            return [e for e in self._events if e['ts'] >= cutoff]

    def get_debug_summary(self):
        """Build a comprehensive debug summary for Copy Debug Info.

        Returns:
            A dict with one entry per service in SERVICE_ORDER that had calls
            in the last 24h. Each entry holds ``total_24h``, ``peak_cpm``,
            ``limit_cpm`` and, when known, ``peak_at``. The ``spotify`` entry
            may also hold ``endpoints``, the per-endpoint totals. When events
            occurred, the key ``_rate_limit_events`` holds the last 20 rate
            limit events from the same window.
        """
        cutoff_24h = time.time() - self._WINDOW
        summary = {}
        with self._lock:
            for svc in SERVICE_ORDER:
                total, peak_cpm, peak_ts = self._window_stats(svc, cutoff_24h)
                if total == 0:
                    continue
                entry = {
                    'total_24h': total,
                    'peak_cpm': peak_cpm,
                    'limit_cpm': RATE_LIMITS.get(svc, 60),
                }
                if peak_ts:
                    entry['peak_at'] = time.strftime('%Y-%m-%d %H:%M', time.localtime(peak_ts))
                if svc == 'spotify':
                    endpoints = self._endpoint_totals(cutoff_24h)
                    if endpoints:
                        entry['endpoints'] = endpoints
                summary[svc] = entry

            events = [e for e in self._events if e['ts'] >= cutoff_24h]
            if events:
                summary['_rate_limit_events'] = [{
                    'time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(e['ts'])),
                    'event': e['event'],
                    'service': e['service'],
                    'endpoint': e.get('endpoint', ''),
                    'duration': e.get('duration', 0),
                    'detail': e.get('detail', ''),
                } for e in events[-20:]]  # Last 20 events
        return summary

    def get_calls_per_minute(self, service_key):
        """Get the current calls/minute rate: recorded calls in the last 60 seconds.

        Only the newest 600 timestamps per key are retained.
        """
        cutoff = time.time() - 60.0
        with self._lock:
            dq = self._recent_calls.get(service_key)
            if not dq:
                return 0.0
            return float(sum(1 for ts in dq if ts > cutoff))

    def get_24h_history(self, service_key):
        """Return ``[minute_timestamp, count]`` pairs for the last 24 hours.

        The list includes the current in-progress minute when it has calls.
        """
        cutoff = time.time() - self._WINDOW
        with self._lock:
            history = [[ts, count] for ts, count in self._minute_history.get(service_key, ())
                       if ts >= cutoff]
            cur_ts = self._current_minute_ts.get(service_key)
            cur_count = self._current_minute_counts.get(service_key, 0)
            if cur_ts is not None and cur_count > 0 and cur_ts >= cutoff:
                history.append([cur_ts, cur_count])
            return history

    def get_all_rates(self):
        """Get current rates for all services. Used by WebSocket emission.

        Returns:
            ``{service: {'cpm': float, 'limit': int}}`` for each service in
            SERVICE_ORDER. The ``'spotify'`` entry also has ``'endpoints'``,
            which maps an endpoint name to its current calls/minute.
            Endpoints with no calls in the last 60s are omitted.
        """
        with self._lock:
            # Snapshot the keys under the lock, because record_call may insert
            # new keys concurrently. get_calls_per_minute() acquires the
            # (non-reentrant) lock itself, so rates are read after release.
            endpoint_keys = [k for k in self._recent_calls if k.startswith(self._ENDPOINT_PREFIX)]
        prefix_len = len(self._ENDPOINT_PREFIX)

        result = {}
        for svc in SERVICE_ORDER:
            entry = {
                'cpm': round(self.get_calls_per_minute(svc), 1),
                'limit': RATE_LIMITS.get(svc, 60),
            }
            if svc == 'spotify':
                endpoints = {}
                for key in endpoint_keys:
                    ep_cpm = self.get_calls_per_minute(key)
                    if ep_cpm > 0:
                        endpoints[key[prefix_len:]] = round(ep_cpm, 1)
                entry['endpoints'] = endpoints
            result[svc] = entry
        return result

    def save(self, path=None):
        """Persist the last 24h of minute history and events to disk. Call on shutdown.

        The file is written to a temporary sibling and then moved into place
        with ``os.replace``. An interrupted save therefore cannot leave a
        truncated file behind. Errors are printed, not raised.

        Args:
            path: Destination file. ``None`` (the default) uses ``_PERSIST_PATH``.
        """
        try:
            if path is None:
                path = _PERSIST_PATH
            now = time.time()
            cutoff = now - self._WINDOW
            data = {}
            with self._lock:
                # Iterate every tracked key: a key still in its first minute
                # has no _minute_history entry yet, only a current bucket.
                for key in self._tracked_keys():
                    entries = [[ts, count] for ts, count in self._minute_history.get(key, ())
                               if ts >= cutoff]
                    # Include current in-progress minute
                    cur_ts = self._current_minute_ts.get(key)
                    cur_count = self._current_minute_counts.get(key, 0)
                    if cur_ts is not None and cur_count > 0 and cur_ts >= cutoff:
                        entries.append([cur_ts, cur_count])
                    if entries:
                        data[key] = entries
                events = [dict(e) for e in self._events if e['ts'] >= cutoff]

            directory = os.path.dirname(path)
            if directory:
                os.makedirs(directory, exist_ok=True)
            tmp_path = f"{path}.tmp"
            with open(tmp_path, 'w') as f:
                json.dump({'ts': now, 'history': data, 'events': events}, f)
            os.replace(tmp_path, path)
        except Exception as e:
            print(f"[ApiCallTracker] Failed to save history: {e}")

    def _load(self, path=None):
        """Restore the last 24h of minute history and events from disk. Called on init.

        The newest saved bucket per key is resumed as the in-progress minute
        rather than appended to history. Calls recorded in that same minute
        after a restart therefore add to it, instead of producing a second
        entry with the same timestamp. Skipped minutes are zero-filled once
        the clock moves on. Errors are printed, not raised.

        Args:
            path: Source file. ``None`` (the default) uses ``_PERSIST_PATH``.
        """
        try:
            if path is None:
                path = _PERSIST_PATH
            if not os.path.exists(path):
                return
            with open(path, 'r') as f:
                raw = json.load(f)
            now = time.time()
            # Only restore if saved within last 24h
            if now - raw.get('ts', 0) > self._WINDOW:
                return
            cutoff = now - self._WINDOW

            restored_keys = 0
            restored_events = 0
            with self._lock:
                for key, entries in raw.get('history', {}).items():
                    buckets = [(ts, count) for ts, count in entries if ts >= cutoff]
                    if not buckets:
                        continue
                    restored_keys += 1
                    last_ts, last_count = buckets[-1]
                    if last_count > 0 and key not in self._current_minute_ts:
                        buckets.pop()
                        self._current_minute_ts[key] = last_ts
                        self._current_minute_counts[key] = last_count
                    self._minute_history[key].extend(buckets)
                for e in raw.get('events', []):
                    if e.get('ts', 0) >= cutoff:
                        self._events.append(e)
                        restored_events += 1
            print(f"[ApiCallTracker] Restored history for {restored_keys} services, {restored_events} events")
        except Exception as e:
            print(f"[ApiCallTracker] Failed to load history: {e}")
# Singleton instance, created when this module is first imported.
# Construction calls ApiCallTracker._load(), which reads any persisted history
# from _PERSIST_PATH at that point; load failures are printed, not raised.
api_call_tracker = ApiCallTracker()