From 8d133ecd60c7d293c2175aea5f79329c84899566 Mon Sep 17 00:00:00 2001 From: BoulderBadgeDad Date: Tue, 9 Jun 2026 10:06:53 -0700 Subject: [PATCH] Wishlist: serialize album-bundle downloads so they stop flooding the search pool (Sokhi #740) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sokhi: "downloads searching for way too many tracks at once" — a wishlist run that fanned out into ~one batch per album. Verified the actual search/download concurrency IS capped at 3 (single shared missing_download_executor), so it wasn't really hammering slskd — but the display showed ~20 "searching" and the batch list was a mess. Root cause: run_full_missing_tracks_process was supposed to "block its album-pool worker for the whole search+download" (that's what the dedicated album_bundle_ executor is for), but it RETURNED the instant it had STARTED the downloads. So the album pool only throttled the fast analysis phase — every album batch blew through analysis and immediately dumped its tracks into the shared download pool, all pre-marked 'searching'. The intended serialization never happened. Fix: add serialize= to run_full_missing_tracks_process. Album-bundle batches (dispatched on album_bundle_executor) pass serialize=True and now hold their pool slot via _wait_for_batch_drain() until every task in the batch reaches a terminal state — so only ~N albums are in flight at once. The wait is passive (downloads are driven by the monitor + completion callbacks on other threads, so no deadlock) and bails on shutdown, a removed batch, or a safety cap. The residual / playlist / manual paths run on the SHARED pool and pass serialize=False (blocking there would steal a real download worker), so they're unchanged. Tests: _wait_for_batch_drain returns immediately when all-terminal, waits until tasks finish, bails on shutdown, respects the cap, handles a missing batch. 975 download/wishlist tests pass (only the pre-existing soundcloud /app failures). --- core/downloads/master.py | 57 ++++++++++++++- core/wishlist/processing.py | 13 ++-- tests/downloads/test_album_serialize_drain.py | 73 +++++++++++++++++++ web_server.py | 4 +- 4 files changed, 139 insertions(+), 8 deletions(-) create mode 100644 tests/downloads/test_album_serialize_drain.py diff --git a/core/downloads/master.py b/core/downloads/master.py index 56c99d90..ce30b8d5 100644 --- a/core/downloads/master.py +++ b/core/downloads/master.py @@ -315,7 +315,52 @@ class _BatchStateAccessImpl: row['album_bundle_state'] = 'failed' -def run_full_missing_tracks_process(batch_id, playlist_id, tracks_json, deps: MasterDeps): +# Task states that mean a batch still has work in flight. While ANY of a batch's +# tasks is in one of these, a serialized album-pool worker keeps its slot. +_NON_TERMINAL_TASK_STATUSES = ('pending', 'queued', 'searching', 'downloading', 'post_processing') + + +def _wait_for_batch_drain(batch_id: str, poll_seconds: float = 1.5, + max_wait_seconds: float = 3600.0) -> None: + """Block until every task in ``batch_id`` reaches a terminal state (the batch + is fully drained), the batch is removed, shutdown is requested, or a safety + cap elapses. + + Used to make the dedicated album-bundle pool actually SERIALIZE albums: the + worker holds its pool slot for the album's whole lifetime instead of + returning the instant downloads are started. That stops every album from + dumping its tracks into the shared download pool at once (Sokhi: "searching + for way too many tracks at once"). It's a PASSIVE wait — the downloads are + driven by the monitor + completion callbacks on other threads, so this never + drives the work and can't deadlock; worst case the cap releases the slot and + the downloads simply finish in the background.""" + from core.downloads import monitor as _monitor + start = time.time() + while True: + if getattr(_monitor, 'IS_SHUTTING_DOWN', False): + return + with tasks_lock: + batch = download_batches.get(batch_id) + if not batch: + return + queue = list(batch.get('queue', ()) or ()) + still_working = any( + download_tasks.get(t, {}).get('status') in _NON_TERMINAL_TASK_STATUSES + for t in queue + ) + if not still_working: + return + if time.time() - start > max_wait_seconds: + logger.warning( + "[Album Serialize] batch %s not drained after %.0fs — releasing the " + "album-pool slot (its downloads continue in the background)", + batch_id, max_wait_seconds) + return + time.sleep(poll_seconds) + + +def run_full_missing_tracks_process(batch_id, playlist_id, tracks_json, deps: MasterDeps, + serialize: bool = False): """ A master worker that handles the entire missing tracks process: 1. Runs the analysis. @@ -1150,6 +1195,16 @@ def run_full_missing_tracks_process(batch_id, playlist_id, tracks_json, deps: Ma deps.download_monitor.start_monitoring(batch_id) deps.start_next_batch_of_downloads(batch_id) + # Album-bundle batches run on the dedicated album pool and pass + # serialize=True: hold this pool slot until the album finishes so only a + # few albums are ever in flight at once, instead of every album batch + # immediately starting and flooding the shared download pool with + # 'searching' tracks (#740 / Sokhi). The residual + playlist + manual + # paths run on the shared download pool and DON'T serialize (blocking + # there would steal an actual download worker). + if serialize: + _wait_for_batch_drain(batch_id) + except Exception as e: logger.error(f"Master worker for batch {batch_id} failed: {e}") import traceback diff --git a/core/wishlist/processing.py b/core/wishlist/processing.py index b9aabad2..cac989fe 100644 --- a/core/wishlist/processing.py +++ b/core/wishlist/processing.py @@ -61,7 +61,7 @@ class WishlistAutoProcessingRuntime: update_automation_progress: Callable[..., Any] automation_engine: Any missing_download_executor: Any - run_full_missing_tracks_process: Callable[[str, str, list[dict[str, Any]]], Any] + run_full_missing_tracks_process: Callable[..., Any] # (batch_id, playlist_id, tracks, serialize=False) get_batch_max_concurrent: Callable[[], int] get_active_server: Callable[[], str] current_time_fn: Callable[[], float] @@ -246,11 +246,14 @@ def _run_wishlist_cycle( f"'{album_name}' ({len(group.tracks)} tracks) → {album_batch_id}" ) submitted.append(album_batch_id) - # Album bundles block their worker for the whole search+download → dedicated - # pool (falls back to the shared pool when unset). See #740. + # Album bundles block their worker for the whole search+download → + # dedicated pool (falls back to the shared pool when unset). serialize=True + # makes the worker actually HOLD its pool slot until the album drains, so + # only a few albums are in flight at once instead of every album flooding + # the shared download pool with 'searching' tracks (#740 / Sokhi). album_executor.submit( runtime.run_full_missing_tracks_process, - album_batch_id, playlist_id, group.tracks, + album_batch_id, playlist_id, group.tracks, True, ) residual_tracks = grouping.residual_tracks if grouping is not None else tracks @@ -626,7 +629,7 @@ class WishlistManualDownloadRuntime: download_batches: Dict[str, Dict[str, Any]] tasks_lock: Any missing_download_executor: Any - run_full_missing_tracks_process: Callable[[str, str, list[dict[str, Any]]], Any] + run_full_missing_tracks_process: Callable[..., Any] # (batch_id, playlist_id, tracks, serialize=False) get_batch_max_concurrent: Callable[[], int] add_activity_item: Callable[[Any, Any, Any, Any], Any] active_server: str diff --git a/tests/downloads/test_album_serialize_drain.py b/tests/downloads/test_album_serialize_drain.py new file mode 100644 index 00000000..364452ff --- /dev/null +++ b/tests/downloads/test_album_serialize_drain.py @@ -0,0 +1,73 @@ +"""Album-bundle serialization wait (#740 / Sokhi "too many searching"). + +_wait_for_batch_drain holds the album-pool worker until the batch's tasks all +reach a terminal state — so only a few albums are in flight at once instead of +every album flooding the shared download pool. It's a passive wait that must +also bail on shutdown / a removed batch / a safety cap. +""" + +import threading +import time + +import pytest + +from core.runtime_state import download_batches, download_tasks, tasks_lock +from core.downloads import master, monitor + + +def _set_batch(bid, task_statuses): + with tasks_lock: + download_batches[bid] = {'queue': list(task_statuses.keys())} + for tid, st in task_statuses.items(): + download_tasks[tid] = {'status': st} + + +@pytest.fixture(autouse=True) +def _cleanup(): + yield + with tasks_lock: + for bid in ('b1', 'b2', 'b3', 'b4'): + download_batches.pop(bid, None) + for tid in ('t1', 't2', 't3'): + download_tasks.pop(tid, None) + + +def test_returns_immediately_when_all_terminal(): + _set_batch('b1', {'t1': 'completed', 't2': 'failed', 't3': 'not_found'}) + start = time.time() + master._wait_for_batch_drain('b1', poll_seconds=0.05, max_wait_seconds=5) + assert time.time() - start < 1.0 # nothing in flight → no block + + +def test_returns_when_batch_missing(): + master._wait_for_batch_drain('nope', poll_seconds=0.05, max_wait_seconds=5) # no hang + + +def test_waits_until_tasks_go_terminal(): + _set_batch('b2', {'t1': 'searching', 't2': 'downloading'}) + + def finish(): + time.sleep(0.25) + with tasks_lock: + download_tasks['t1']['status'] = 'completed' + download_tasks['t2']['status'] = 'failed' + + threading.Thread(target=finish, daemon=True).start() + start = time.time() + master._wait_for_batch_drain('b2', poll_seconds=0.05, max_wait_seconds=5) + assert 0.2 < time.time() - start < 3.0 # held the slot until they finished + + +def test_bails_on_shutdown(monkeypatch): + _set_batch('b3', {'t1': 'searching'}) # never terminal + monkeypatch.setattr(monitor, 'IS_SHUTTING_DOWN', True) + start = time.time() + master._wait_for_batch_drain('b3', poll_seconds=0.05, max_wait_seconds=10) + assert time.time() - start < 1.0 # didn't block app shutdown + + +def test_respects_safety_cap(): + _set_batch('b4', {'t1': 'searching'}) # never terminal + start = time.time() + master._wait_for_batch_drain('b4', poll_seconds=0.05, max_wait_seconds=0.3) + assert 0.3 <= time.time() - start < 2.0 # released the slot after the cap diff --git a/web_server.py b/web_server.py index 9e7bc148..bd396d6c 100644 --- a/web_server.py +++ b/web_server.py @@ -17150,9 +17150,9 @@ def _build_master_deps(): -def _run_full_missing_tracks_process(batch_id, playlist_id, tracks_json): +def _run_full_missing_tracks_process(batch_id, playlist_id, tracks_json, serialize=False): return _downloads_master.run_full_missing_tracks_process( - batch_id, playlist_id, tracks_json, _build_master_deps() + batch_id, playlist_id, tracks_json, _build_master_deps(), serialize=serialize )