Merge pull request #742 from Nezreka/fix/wishlist-batch-jam-740

Fix #740: run wishlist album-bundle downloads on a dedicated pool
pull/748/head
BoulderBadgeDad 4 weeks ago committed by GitHub
commit 1801bfc8f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -67,6 +67,14 @@ class WishlistAutoProcessingRuntime:
current_time_fn: Callable[[], float]
profile_id: int = 1
logger: Any = module_logger
# Dedicated pool for the inline-blocking per-album bundle downloads.
# Album-bundle batches block their worker thread for the whole search +
# download; running them on the shared ``missing_download_executor`` lets a
# burst of album batches (e.g. a big Album-Completeness "Fix all" → wishlist)
# starve the per-track flow AND the user's manual "Download Wishlist" (#740).
# Routing them here keeps the shared pool free. Falls back to the shared
# executor when unset (older callers / tests) — see the submit site below.
album_bundle_executor: Any = None
def remove_completed_tracks_from_wishlist(
@ -901,7 +909,15 @@ def process_wishlist_automatically(runtime: WishlistAutoProcessingRuntime, autom
f"({len(group.tracks)} tracks) → {album_batch_id} [run {wishlist_run_id[:8]}]"
)
_submitted_batches.append(album_batch_id)
runtime.missing_download_executor.submit(
# Album bundles block their worker thread for the whole
# search+download, so run them on the dedicated album
# pool — never the shared pool that serves analysis,
# per-track downloads and the manual wishlist (#740).
# Fall back to the shared pool if unset (older callers).
_album_executor = (
runtime.album_bundle_executor or runtime.missing_download_executor
)
_album_executor.submit(
runtime.run_full_missing_tracks_process,
album_batch_id, playlist_id, group.tracks,
)

@ -131,6 +131,7 @@ def _build_runtime(
batch_map=None,
guard_acquired=True,
is_actually_processing=False,
album_bundle_executor=None,
):
if progress_calls is None:
progress_calls = []
@ -171,6 +172,7 @@ def _build_runtime(
update_automation_progress=progress_callback,
automation_engine=None,
missing_download_executor=executor,
album_bundle_executor=album_bundle_executor,
run_full_missing_tracks_process=lambda *args, **kwargs: None,
get_batch_max_concurrent=lambda: 4,
get_active_server=lambda: active_server,
@ -466,3 +468,86 @@ def test_process_wishlist_automatically_skips_when_wishlist_batch_is_already_act
assert guard_events == ["enter", "exit"]
assert [kwargs.get("progress") for _args, kwargs in progress_calls if "progress" in kwargs] == [10]
assert any("already active in another batch" in msg for msg in logger.info_messages)
# --- #740: album-bundle batches must route to the dedicated pool ------------
def _two_album_tracks_plus_orphan():
"""2 missing tracks from one album (→ album-bundle batch) + 1 orphan
track with no album metadata ( residual per-track batch)."""
return [
{
"name": "Album Song 1",
"artists": [{"name": "Artist 1"}],
"spotify_data": {
"album": {"id": "alb1", "name": "Album One", "album_type": "album"},
"artists": [{"name": "Artist 1"}],
},
},
{
"name": "Album Song 2",
"artists": [{"name": "Artist 1"}],
"spotify_data": {
"album": {"id": "alb1", "name": "Album One", "album_type": "album"},
"artists": [{"name": "Artist 1"}],
},
},
{
"name": "Orphan",
"artists": [{"name": "X"}],
"spotify_data": {"album": {"album_type": "album"}, "artists": [{"name": "X"}]},
},
]
def test_album_subbatches_route_to_dedicated_album_pool():
"""#740: per-album bundle batches (which block their worker thread for the
whole search+download) must be submitted to the dedicated album_bundle_executor,
NOT the shared missing_download_executor so a burst of album batches can't
starve the per-track flow / manual wishlist. The residual per-track batch
still goes to the shared pool."""
album_executor = _FakeExecutor()
batch_map = {}
runtime, _s, _p, _db, shared_executor, _l, _pr, _g = _build_runtime(
tracks=_two_album_tracks_plus_orphan(),
cycle_value="albums",
count=3,
batch_map=batch_map,
album_bundle_executor=album_executor,
)
process_wishlist_automatically(runtime, automation_id="auto-pool")
# Album sub-batch → dedicated album pool; residual → shared pool.
assert len(album_executor.submissions) == 1
assert len(shared_executor.submissions) == 1
album_batch_id = album_executor.submissions[0][1][0]
assert batch_map[album_batch_id]["is_album_download"] is True
assert batch_map[album_batch_id]["album_context"]["name"] == "Album One"
residual_batch_id = shared_executor.submissions[0][1][0]
assert batch_map[residual_batch_id].get("is_album_download") is not True
def test_album_subbatches_fall_back_to_shared_pool_when_no_album_pool():
"""Bulletproofing: if no dedicated album pool is wired (older callers /
tests), album batches fall back to the shared executor i.e. exactly the
pre-fix behavior, so nothing breaks."""
batch_map = {}
runtime, _s, _p, _db, shared_executor, _l, _pr, _g = _build_runtime(
tracks=_two_album_tracks_plus_orphan(),
cycle_value="albums",
count=3,
batch_map=batch_map,
album_bundle_executor=None, # not wired
)
process_wishlist_automatically(runtime, automation_id="auto-fallback")
# Both the album sub-batch and the residual land on the shared pool.
assert len(shared_executor.submissions) == 2
assert any(
batch_map[args[0]].get("is_album_download") is True
for _fn, args, _kw in shared_executor.submissions
)

@ -845,6 +845,16 @@ retag_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="RetagWork
# Shared task/batch state now lives in core.runtime_state.
missing_download_executor = ThreadPoolExecutor(max_workers=3, thread_name_prefix="MissingTrackWorker")
# Dedicated pool for per-album bundle downloads (#740). An album-bundle batch
# blocks its worker thread for the entire search+download; if these run on the
# shared missing_download_executor, a burst of album batches (e.g. a large
# Album-Completeness "Fix all" → wishlist that splits into ~one batch per album)
# saturates all 3 workers and starves the per-track flow AND the user's manual
# "Download Wishlist" analysis, which then never starts. Keeping them on their
# own bounded pool decouples that: hung/slow album downloads can only delay
# other album downloads, never the user-facing path.
album_bundle_executor = ThreadPoolExecutor(max_workers=3, thread_name_prefix="AlbumBundleWorker")
# Parallelizes the per-file metadata-lookup + post-processing in
# /api/import/singles/process. Single-file work is dominated by
# Spotify/iTunes/Deezer search round-trips so 3 workers give a near-
@ -1663,6 +1673,7 @@ def _shutdown_runtime_components():
(retag_executor, "retag executor"),
(sync_executor, "sync executor"),
(missing_download_executor, "missing download executor"),
(album_bundle_executor, "album bundle executor"),
(import_singles_executor, "import singles executor"),
(tidal_discovery_executor, "tidal discovery executor"),
(deezer_discovery_executor, "deezer discovery executor"),
@ -14290,6 +14301,7 @@ def _process_wishlist_automatically(automation_id=None):
update_automation_progress=_update_automation_progress,
automation_engine=automation_engine,
missing_download_executor=missing_download_executor,
album_bundle_executor=album_bundle_executor,
run_full_missing_tracks_process=_run_full_missing_tracks_process,
get_batch_max_concurrent=_get_batch_max_concurrent,
get_active_server=config_manager.get_active_media_server,

Loading…
Cancel
Save