fix: add periodic cleanup timer for api/request in-memory store

Inbound music requests are tracked in an in-memory _pending_requests

dict with a 1-hour TTL. Cleanup was only triggered inside

create_request(), so during quiet periods stale entries stayed in

memory until the next inbound request.

Add a background thread that wakes every 5 minutes and evicts any

entry older than _MAX_REQUEST_AGE. The thread is started once during

API blueprint registration (start_cleanup_thread is idempotent) and

is a daemon, so it exits automatically on process shutdown.

stop_cleanup_thread() is exposed for tests and future graceful-

shutdown hooks. It signals the stop event so the thread exits

without waiting for the next cleanup interval.
pull/330/head
JohnBaumb 4 weeks ago
parent af8a2ea31a
commit 06220a5a83

@ -43,6 +43,7 @@ def create_api_blueprint():
from .listenbrainz import register_routes as reg_listenbrainz
from .cache import register_routes as reg_cache
from .request import register_routes as reg_request
from .request import start_cleanup_thread as _start_request_cleanup
# ---- rate-limit only /api/v1 routes (not the whole app) ----
limiter.limit("60 per minute")(bp)
@ -62,6 +63,11 @@ def create_api_blueprint():
reg_cache(bp)
reg_request(bp)
# Start the periodic cleanup timer for in-memory request tracking so
# idle periods don't leave stale entries in memory. Idempotent across
# calls; safe with multi-blueprint registration.
_start_request_cleanup()
# ---- error handlers (scoped to this Blueprint) ----
@bp.errorhandler(400)
def _bad_request(e):

@ -23,6 +23,16 @@ _requests_lock = threading.Lock()
# Max age before auto-cleanup: entries older than this are evicted.
_MAX_REQUEST_AGE = timedelta(hours=1)
# How often the background cleanup timer runs. Short enough to keep memory
# bounded during idle periods, long enough that slow-polling external clients
# still see their request for close to the TTL.
_CLEANUP_INTERVAL_SECONDS = 300 # 5 minutes
# Guards for the singleton background cleanup thread.
# The single live worker thread, or None when not (yet) running.
_cleanup_thread: "threading.Thread | None" = None
# Set to request shutdown; _cleanup_loop waits on this between passes.
_cleanup_stop_event = threading.Event()
# Serializes start/stop so concurrent callers cannot race on _cleanup_thread.
_cleanup_thread_lock = threading.Lock()
def _cleanup_old_requests():
"""Remove requests older than 1 hour to prevent unbounded growth."""
@ -32,6 +42,57 @@ def _cleanup_old_requests():
if r.get('created_at', datetime.now()) < cutoff]
for rid in expired:
del _pending_requests[rid]
return len(expired) if expired else 0
def _cleanup_loop():
    """Background thread body: periodically evict expired requests.

    Sleeps on ``_cleanup_stop_event`` between passes so a shutdown signal
    interrupts the wait immediately instead of blocking for up to the full
    interval. Returns (ending the thread) as soon as the event is set.
    """
    while not _cleanup_stop_event.is_set():
        # wait() returns True if the event was set (shutdown), False on timeout
        if _cleanup_stop_event.wait(timeout=_CLEANUP_INTERVAL_SECONDS):
            return
        try:
            removed = _cleanup_old_requests()
            if removed:
                # Lazy %-args: no string formatting unless DEBUG is enabled.
                logger.debug("Request cleanup: evicted %d stale entries", removed)
        except Exception as e:
            # Broad catch is deliberate: a transient error must not kill the
            # daemon; log and try again on the next interval.
            logger.warning("Request cleanup loop error: %s", e)
def start_cleanup_thread() -> bool:
    """Start the background cleanup timer once per process.

    Returns:
        True if a new thread was started, False if one was already running.

    Safe to call multiple times; callers in multi-worker setups should
    still gate on worker identity if they want exactly one thread across
    the entire deployment.
    """
    global _cleanup_thread
    with _cleanup_thread_lock:
        # Idempotence: a live thread means there is nothing to do.
        if _cleanup_thread is not None and _cleanup_thread.is_alive():
            return False
        # Re-arm the stop flag in case a previous stop_cleanup_thread()
        # left it set.
        _cleanup_stop_event.clear()
        _cleanup_thread = threading.Thread(
            target=_cleanup_loop,
            name="api-request-cleanup",
            daemon=True,  # exits automatically on interpreter shutdown
        )
        _cleanup_thread.start()
        # Lazy %-args instead of eager "%" formatting: the record is only
        # formatted if this log level is actually emitted.
        logger.info("Started api/request cleanup timer (interval=%ss)",
                    _CLEANUP_INTERVAL_SECONDS)
        return True
def stop_cleanup_thread(timeout: float = 2.0) -> None:
    """Signal the cleanup thread to exit. Used in tests and shutdown paths.

    Args:
        timeout: Maximum seconds to wait for the thread to finish its
            current pass and exit.
    """
    global _cleanup_thread
    with _cleanup_thread_lock:
        thread = _cleanup_thread
        _cleanup_stop_event.set()
    if thread is not None and thread.is_alive():
        thread.join(timeout=timeout)
        if thread.is_alive():
            # Join timed out: the thread is mid-pass. Leave _cleanup_thread
            # and the (still-set) stop event alone — clearing the event here
            # would let the old thread resume looping, and a subsequent
            # start_cleanup_thread() would then spawn a duplicate worker.
            logger.warning(
                "api/request cleanup thread did not stop within %ss", timeout)
            return
    with _cleanup_thread_lock:
        # Thread is confirmed gone: reset state so a later start works.
        _cleanup_thread = None
        _cleanup_stop_event.clear()
def _run_search_and_download(request_id, query, notify_url):

Loading…
Cancel
Save