Wishlist: serialize album-bundle downloads so they stop flooding the search pool (Sokhi #740)

Sokhi: "downloads searching for way too many tracks at once" — a wishlist run
that fanned out into ~one batch per album. Verified the actual search/download
concurrency IS capped at 3 (single shared missing_download_executor), so it
wasn't really hammering slskd — but the display showed ~20 "searching" and the
batch list was a mess.

Root cause: run_full_missing_tracks_process was supposed to "block its album-pool
worker for the whole search+download" (that's what the dedicated album_bundle_
executor is for), but it RETURNED the instant it had STARTED the downloads. So
the album pool only throttled the fast analysis phase — every album batch blew
through analysis and immediately dumped its tracks into the shared download pool,
all pre-marked 'searching'. The intended serialization never happened.

Fix: add serialize= to run_full_missing_tracks_process. Album-bundle batches
(dispatched on album_bundle_executor) pass serialize=True and now hold their pool
slot via _wait_for_batch_drain() until every task in the batch reaches a terminal
state — so only ~N albums are in flight at once. The wait is passive (downloads
are driven by the monitor + completion callbacks on other threads, so no
deadlock) and bails on shutdown, a removed batch, or a safety cap. The residual /
playlist / manual paths run on the SHARED pool and pass serialize=False (blocking
there would steal a real download worker), so they're unchanged.

Tests: _wait_for_batch_drain returns immediately when all-terminal, waits until
tasks finish, bails on shutdown, respects the cap, handles a missing batch. 975
download/wishlist tests pass (only the pre-existing soundcloud /app failures).
pull/848/head
BoulderBadgeDad 3 days ago
parent 6fa956d63a
commit 8d133ecd60

@ -315,7 +315,52 @@ class _BatchStateAccessImpl:
row['album_bundle_state'] = 'failed'
def run_full_missing_tracks_process(batch_id, playlist_id, tracks_json, deps: MasterDeps):
# Task states that mean a batch still has work in flight. While ANY of a batch's
# tasks is in one of these, a serialized album-pool worker keeps its slot.
_NON_TERMINAL_TASK_STATUSES = ('pending', 'queued', 'searching', 'downloading', 'post_processing')
def _wait_for_batch_drain(batch_id: str, poll_seconds: float = 1.5,
max_wait_seconds: float = 3600.0) -> None:
"""Block until every task in ``batch_id`` reaches a terminal state (the batch
is fully drained), the batch is removed, shutdown is requested, or a safety
cap elapses.
Used to make the dedicated album-bundle pool actually SERIALIZE albums: the
worker holds its pool slot for the album's whole lifetime instead of
returning the instant downloads are started. That stops every album from
dumping its tracks into the shared download pool at once (Sokhi: "searching
for way too many tracks at once"). It's a PASSIVE wait — the downloads are
driven by the monitor + completion callbacks on other threads, so this never
drives the work and can't deadlock; worst case the cap releases the slot and
the downloads simply finish in the background."""
from core.downloads import monitor as _monitor
start = time.time()
while True:
if getattr(_monitor, 'IS_SHUTTING_DOWN', False):
return
with tasks_lock:
batch = download_batches.get(batch_id)
if not batch:
return
queue = list(batch.get('queue', ()) or ())
still_working = any(
download_tasks.get(t, {}).get('status') in _NON_TERMINAL_TASK_STATUSES
for t in queue
)
if not still_working:
return
if time.time() - start > max_wait_seconds:
logger.warning(
"[Album Serialize] batch %s not drained after %.0fs — releasing the "
"album-pool slot (its downloads continue in the background)",
batch_id, max_wait_seconds)
return
time.sleep(poll_seconds)
def run_full_missing_tracks_process(batch_id, playlist_id, tracks_json, deps: MasterDeps,
serialize: bool = False):
"""
A master worker that handles the entire missing tracks process:
1. Runs the analysis.
@ -1150,6 +1195,16 @@ def run_full_missing_tracks_process(batch_id, playlist_id, tracks_json, deps: Ma
deps.download_monitor.start_monitoring(batch_id)
deps.start_next_batch_of_downloads(batch_id)
# Album-bundle batches run on the dedicated album pool and pass
# serialize=True: hold this pool slot until the album finishes so only a
# few albums are ever in flight at once, instead of every album batch
# immediately starting and flooding the shared download pool with
# 'searching' tracks (#740 / Sokhi). The residual + playlist + manual
# paths run on the shared download pool and DON'T serialize (blocking
# there would steal an actual download worker).
if serialize:
_wait_for_batch_drain(batch_id)
except Exception as e:
logger.error(f"Master worker for batch {batch_id} failed: {e}")
import traceback

@ -61,7 +61,7 @@ class WishlistAutoProcessingRuntime:
update_automation_progress: Callable[..., Any]
automation_engine: Any
missing_download_executor: Any
run_full_missing_tracks_process: Callable[[str, str, list[dict[str, Any]]], Any]
run_full_missing_tracks_process: Callable[..., Any] # (batch_id, playlist_id, tracks, serialize=False)
get_batch_max_concurrent: Callable[[], int]
get_active_server: Callable[[], str]
current_time_fn: Callable[[], float]
@ -246,11 +246,14 @@ def _run_wishlist_cycle(
f"'{album_name}' ({len(group.tracks)} tracks) → {album_batch_id}"
)
submitted.append(album_batch_id)
# Album bundles block their worker for the whole search+download → dedicated
# pool (falls back to the shared pool when unset). See #740.
# Album bundles block their worker for the whole search+download →
# dedicated pool (falls back to the shared pool when unset). serialize=True
# makes the worker actually HOLD its pool slot until the album drains, so
# only a few albums are in flight at once instead of every album flooding
# the shared download pool with 'searching' tracks (#740 / Sokhi).
album_executor.submit(
runtime.run_full_missing_tracks_process,
album_batch_id, playlist_id, group.tracks,
album_batch_id, playlist_id, group.tracks, True,
)
residual_tracks = grouping.residual_tracks if grouping is not None else tracks
@ -626,7 +629,7 @@ class WishlistManualDownloadRuntime:
download_batches: Dict[str, Dict[str, Any]]
tasks_lock: Any
missing_download_executor: Any
run_full_missing_tracks_process: Callable[[str, str, list[dict[str, Any]]], Any]
run_full_missing_tracks_process: Callable[..., Any] # (batch_id, playlist_id, tracks, serialize=False)
get_batch_max_concurrent: Callable[[], int]
add_activity_item: Callable[[Any, Any, Any, Any], Any]
active_server: str

@ -0,0 +1,73 @@
"""Album-bundle serialization wait (#740 / Sokhi "too many searching").
_wait_for_batch_drain holds the album-pool worker until the batch's tasks all
reach a terminal state so only a few albums are in flight at once instead of
every album flooding the shared download pool. It's a passive wait that must
also bail on shutdown / a removed batch / a safety cap.
"""
import threading
import time
import pytest
from core.runtime_state import download_batches, download_tasks, tasks_lock
from core.downloads import master, monitor
def _set_batch(bid, task_statuses):
with tasks_lock:
download_batches[bid] = {'queue': list(task_statuses.keys())}
for tid, st in task_statuses.items():
download_tasks[tid] = {'status': st}
@pytest.fixture(autouse=True)
def _cleanup():
yield
with tasks_lock:
for bid in ('b1', 'b2', 'b3', 'b4'):
download_batches.pop(bid, None)
for tid in ('t1', 't2', 't3'):
download_tasks.pop(tid, None)
def test_returns_immediately_when_all_terminal():
_set_batch('b1', {'t1': 'completed', 't2': 'failed', 't3': 'not_found'})
start = time.time()
master._wait_for_batch_drain('b1', poll_seconds=0.05, max_wait_seconds=5)
assert time.time() - start < 1.0 # nothing in flight → no block
def test_returns_when_batch_missing():
master._wait_for_batch_drain('nope', poll_seconds=0.05, max_wait_seconds=5) # no hang
def test_waits_until_tasks_go_terminal():
_set_batch('b2', {'t1': 'searching', 't2': 'downloading'})
def finish():
time.sleep(0.25)
with tasks_lock:
download_tasks['t1']['status'] = 'completed'
download_tasks['t2']['status'] = 'failed'
threading.Thread(target=finish, daemon=True).start()
start = time.time()
master._wait_for_batch_drain('b2', poll_seconds=0.05, max_wait_seconds=5)
assert 0.2 < time.time() - start < 3.0 # held the slot until they finished
def test_bails_on_shutdown(monkeypatch):
_set_batch('b3', {'t1': 'searching'}) # never terminal
monkeypatch.setattr(monitor, 'IS_SHUTTING_DOWN', True)
start = time.time()
master._wait_for_batch_drain('b3', poll_seconds=0.05, max_wait_seconds=10)
assert time.time() - start < 1.0 # didn't block app shutdown
def test_respects_safety_cap():
_set_batch('b4', {'t1': 'searching'}) # never terminal
start = time.time()
master._wait_for_batch_drain('b4', poll_seconds=0.05, max_wait_seconds=0.3)
assert 0.3 <= time.time() - start < 2.0 # released the slot after the cap

@ -17150,9 +17150,9 @@ def _build_master_deps():
def _run_full_missing_tracks_process(batch_id, playlist_id, tracks_json):
def _run_full_missing_tracks_process(batch_id, playlist_id, tracks_json, serialize=False):
return _downloads_master.run_full_missing_tracks_process(
batch_id, playlist_id, tracks_json, _build_master_deps()
batch_id, playlist_id, tracks_json, _build_master_deps(), serialize=serialize
)

Loading…
Cancel
Save