Wishlist modal: merge sibling sub-batches into one status response

Phase 1c.2.1 splits each wishlist run across multiple
``download_batches`` rows (per-album bundle dispatch). The
download-missing modal opens against the original batch_id
allocated by ``start_manual_wishlist_download_batch`` /
``process_wishlist_automatically``. Pre-fix that batch_id was
just one sibling among N, so the modal went stale as soon as the
primary sub-batch finished — subsequent albums downloaded fine
but no live status reached the UI.

Fix: backend merges every sibling sub-batch's tasks +
analysis_results into the response keyed under the originally-
requested batch_id. Modal sees one unified view of the whole run
without knowing about the split. Frontend untouched.

Architecture (Kettui standards):

- ``core/downloads/wishlist_aggregator.py`` — pure
  ``merge_wishlist_run_status(primary, siblings)`` helper.
  No IO, no runtime state, no globals. Lifted out of
  ``status.py`` so the merge contract can be pinned via unit
  tests without standing up the live ``download_batches`` /
  ``download_tasks`` state.
- ``core/downloads/status.py``'s ``build_batched_status`` now
  pre-indexes ``download_batches`` by ``wishlist_run_id`` inside
  the existing ``tasks_lock`` snapshot, then runs the merge
  helper whenever a requested batch has a sibling.

Merge rules pinned by 12 tests:

- ``track_index`` re-indexed globally 0..N-1 across the merged
  ``analysis_results`` so the modal's ``data-track-index`` DOM
  keys don't collide between siblings. Tasks' ``track_index``
  follows the same remap so the analysis-results ↔ tasks
  cross-reference stays intact.
- ``task_id`` is uuid per task — no collision concern.
- Phase: error is sticky; otherwise the LEAST-complete
  pre-terminal phase wins (analysis < album_downloading <
  downloading). All-complete returns ``complete``; mixed
  complete + active returns ``downloading`` so the modal stays
  alive until every sibling lands.
- ``album_bundle``: picks whichever sibling currently has an
  active bundle download (state in
  ``{searching, downloading, downloading_release, staging}``).
  Falls back to the first non-empty bundle so a completed run
  still shows a progress bar.
- ``analysis_progress`` summed across siblings.
- ``active_count`` summed; ``max_concurrent`` keeps primary's
  value as the representative.
- ``playlist_id`` + ``playlist_name`` preserved from the primary
  (the row the modal originally opened against).

Legacy single-batch wishlist runs (no ``wishlist_run_id`` on the
batch) skip the merge entirely — passthrough. Back-compat by
absence.

1108 tests across downloads + wishlist + automation + imports +
playlist-sources + lb-series suites green. 12 new aggregator
tests pin the merge contract.

Closes the open UX gap from the Phase 1c.2.1 ship — modal now
tracks every sibling sub-batch's progress for the full duration
of the wishlist run.
pull/709/head
Broque Thomas 22 hours ago
parent c002014f10
commit 7f751202d2

@ -476,7 +476,16 @@ def build_single_batch_status(batch_id: str, deps: StatusDeps) -> tuple[Optional
def build_batched_status(requested_batch_ids: list, deps: StatusDeps) -> dict:
"""For /api/download_status/batch. Returns the full response dict (always 200)."""
"""For /api/download_status/batch. Returns the full response dict (always 200).
When a requested batch carries a ``wishlist_run_id`` (Phase 1c.2.1
per-album split), the response merges in every sibling sub-batch
of the same run via ``merge_wishlist_run_status``. The merged view
lands keyed under the originally-requested ``batch_id`` so the
frontend modal (which polls one batch id) sees every sibling's
tasks + progress without needing to know about the split."""
from core.downloads.wishlist_aggregator import merge_wishlist_run_status
live_transfers_lookup = deps.get_cached_transfer_data()
response: dict[str, Any] = {"batches": {}}
@ -489,11 +498,49 @@ def build_batched_status(requested_batch_ids: list, deps: StatusDeps) -> dict:
else:
target_batches = download_batches.copy()
# Pre-index sibling batch ids by wishlist_run_id so the per-
# batch loop below can find them in O(1). Snapshot under the
# held lock; subsequent dict mutations don't matter for this
# build.
run_id_to_batch_ids: dict[str, list[str]] = {}
for bid, batch_row in download_batches.items():
run_id = (batch_row or {}).get('wishlist_run_id') if isinstance(batch_row, dict) else None
if run_id:
run_id_to_batch_ids.setdefault(str(run_id), []).append(bid)
for batch_id, batch in target_batches.items():
try:
response["batches"][batch_id] = build_batch_status_data(
primary_status = build_batch_status_data(
batch_id, batch, live_transfers_lookup, deps,
)
# Wishlist-run merge — kicks in only when this batch
# has a run_id AND at least one sibling exists. Falls
# through to legacy single-batch shape otherwise.
run_id = batch.get('wishlist_run_id') if isinstance(batch, dict) else None
sibling_ids = run_id_to_batch_ids.get(str(run_id), []) if run_id else []
if run_id and len(sibling_ids) > 1:
sibling_statuses = []
for sib_id in sibling_ids:
if sib_id == batch_id:
continue
sib_batch = download_batches.get(sib_id)
if not isinstance(sib_batch, dict):
continue
try:
sibling_statuses.append(
build_batch_status_data(
sib_id, sib_batch, live_transfers_lookup, deps,
)
)
except Exception as sib_err:
logger.warning(
f"[Wishlist Run] Sibling status build failed for {sib_id}: {sib_err}"
)
merged = merge_wishlist_run_status(primary_status, sibling_statuses)
response["batches"][batch_id] = merged
else:
response["batches"][batch_id] = primary_status
except Exception as batch_error:
logger.error(f"Error processing batch {batch_id}: {batch_error}")
response["batches"][batch_id] = {"error": str(batch_error)}

@ -0,0 +1,186 @@
"""Merge sibling download_batches statuses into one view for the
wishlist-run model.
When the wishlist runs are split into per-album sub-batches
(Phase 1c.2.1), the frontend modal polls the ORIGINAL batch id
allocated by ``start_manual_wishlist_download_batch`` /
``process_wishlist_automatically``. That batch id is now just one
sibling among N. Without merging, the modal goes blank after the
first sibling finishes because subsequent siblings live under
fresh batch ids the modal never learned about.
This module is the merge layer: pure function, no IO, no runtime
state. ``build_batched_status`` in ``core/downloads/status.py``
calls into it when a requested batch has ``wishlist_run_id`` set
and at least one sibling exists.
Design notes:
- ``track_index`` re-indexed to a global 0..N-1 across the merged
results so the modal's ``data-track-index`` DOM keys don't
collide between siblings (each sibling locally starts at 0).
Tasks reference their analysis result via track_index, so the
remap is applied to tasks too.
- ``task_id`` is a uuid per task no collision concern across
siblings.
- Phase aggregation surfaces the LEAST-complete pre-terminal phase
so the modal stays "alive" until every sibling is done. Sticky
``error`` so failures don't get hidden by a running sibling.
- ``album_bundle`` is picked from whichever sibling currently has
an active bundle download gives the user a useful progress
bar even when the primary sibling is past its bundle stage.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
_PHASE_PRIORITY = (
'analysis',
'album_downloading',
'downloading',
'complete',
)
_ACTIVE_BUNDLE_STATES = frozenset({
'searching',
'downloading',
'downloading_release',
'staging',
})
def _aggregate_phases(phases: List[str]) -> str:
"""Pick the merged phase for a multi-sibling wishlist run.
Rules:
- ``error`` is sticky if any sibling errored, surface error.
- Otherwise return the LEAST-complete pre-terminal phase in
priority order (analysis < album_downloading < downloading
< complete).
- If all siblings are ``complete``, return ``complete``.
- Fallback to the first non-empty phase if nothing matches a
known priority.
"""
phases = [p for p in phases if p]
if not phases:
return 'unknown'
if 'error' in phases:
return 'error'
for p in _PHASE_PRIORITY:
if p in phases:
if p == 'complete':
return 'complete' if all(s == 'complete' for s in phases) else 'downloading'
return p
return phases[0]
def _pick_active_album_bundle(statuses: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""Return the album_bundle of whichever sibling is currently
staging or downloading. Falls back to the first non-empty
bundle when nothing is active (so a completed bundle still
shows up vs. a totally empty progress bar)."""
fallback = None
for s in statuses:
bundle = s.get('album_bundle')
if not bundle:
continue
if fallback is None:
fallback = bundle
state = (bundle.get('state') or '').lower()
if state in _ACTIVE_BUNDLE_STATES:
return bundle
return fallback
def merge_wishlist_run_status(
primary: Dict[str, Any],
siblings: List[Dict[str, Any]],
) -> Dict[str, Any]:
"""Return a status dict that merges ``siblings`` into ``primary``.
Empty ``siblings`` is the legacy single-batch case primary
is returned unchanged.
The returned dict has the same shape as a single-batch status
response from ``build_batch_status_data`` so the frontend
modal needs no changes to consume it. Tracks and tasks are
re-indexed globally; phase + progress + active_count
aggregated across the run.
"""
if not siblings:
return primary
all_statuses = [primary] + list(siblings)
# Phase aggregation.
merged_phase = _aggregate_phases([s.get('phase', '') for s in all_statuses])
# Analysis progress — sum across siblings.
total = 0
processed = 0
has_progress = False
for s in all_statuses:
ap = s.get('analysis_progress')
if isinstance(ap, dict):
total += int(ap.get('total') or 0)
processed += int(ap.get('processed') or 0)
has_progress = True
# Analysis results — concat + re-index. Build a (batch_obj_id,
# old_track_index) -> new_track_index map so tasks can be
# re-indexed consistently.
merged_results: List[Dict[str, Any]] = []
track_index_remap: Dict[tuple, int] = {}
next_index = 0
for s in all_statuses:
batch_ref = id(s)
for r in (s.get('analysis_results') or []):
old_idx = int(r.get('track_index') or 0)
track_index_remap[(batch_ref, old_idx)] = next_index
new_r = dict(r)
new_r['track_index'] = next_index
merged_results.append(new_r)
next_index += 1
# Tasks — concat + re-index using the remap above. Tasks
# without a remapped entry keep their original track_index
# (defensive — shouldn't happen if analysis_results is
# consistent with the task list).
merged_tasks: List[Dict[str, Any]] = []
for s in all_statuses:
batch_ref = id(s)
for t in (s.get('tasks') or []):
old_idx = int(t.get('track_index') or 0)
new_t = dict(t)
new_t['track_index'] = track_index_remap.get((batch_ref, old_idx), old_idx)
merged_tasks.append(new_t)
merged_tasks.sort(key=lambda x: x.get('track_index', 0))
# Album bundle — pick the active sibling's, fall back to first
# bundle present, omit if none.
merged_bundle = _pick_active_album_bundle(all_statuses)
# Worker accounting — sum active_count across siblings so the
# modal's overall download progress display reflects total
# in-flight work; max_concurrent stays from primary as
# representative.
active_total = sum(int(s.get('active_count') or 0) for s in all_statuses)
merged = dict(primary) # keeps playlist_id, playlist_name, error, etc.
merged['phase'] = merged_phase
if has_progress:
merged['analysis_progress'] = {'total': total, 'processed': processed}
merged['analysis_results'] = merged_results
if merged_tasks or 'tasks' in primary:
merged['tasks'] = merged_tasks
if merged_bundle:
merged['album_bundle'] = merged_bundle
elif 'album_bundle' in primary:
merged['album_bundle'] = primary['album_bundle']
merged['active_count'] = active_total
return merged
__all__ = ['merge_wishlist_run_status']

@ -0,0 +1,168 @@
"""Unit tests for ``core/downloads/wishlist_aggregator.merge_wishlist_run_status``.
Pins the merge contract the wishlist-modal status path depends on
(Phase 1c.2.1 follow-up): when one logical wishlist run is split
across N sub-batches, the frontend modal polls the original
batch_id and expects a unified view that covers every sibling.
"""
from __future__ import annotations
from core.downloads.wishlist_aggregator import merge_wishlist_run_status
def _status(phase, **kwargs):
"""Build a minimal per-batch status dict shaped like
``build_batch_status_data``'s output."""
base = {
'phase': phase,
'playlist_id': 'wishlist',
'playlist_name': 'Wishlist',
'active_count': 0,
'max_concurrent': 3,
}
base.update(kwargs)
return base
def test_empty_siblings_returns_primary_unchanged():
primary = _status('downloading', tasks=[{'task_id': 't1', 'track_index': 0}])
out = merge_wishlist_run_status(primary, [])
assert out is primary
def test_two_siblings_merge_tasks_with_reindexed_track_index():
"""Both siblings locally start at track_index 0 — after merge,
indices are globally unique 0..N-1."""
primary = _status(
'downloading',
analysis_results=[
{'track_index': 0, 'track': {'name': 'A1'}, 'found': False, 'confidence': 0.0},
{'track_index': 1, 'track': {'name': 'A2'}, 'found': False, 'confidence': 0.0},
],
tasks=[
{'task_id': 'task-a1', 'track_index': 0, 'status': 'downloading'},
{'task_id': 'task-a2', 'track_index': 1, 'status': 'downloading'},
],
)
sibling = _status(
'downloading',
analysis_results=[
{'track_index': 0, 'track': {'name': 'B1'}, 'found': False, 'confidence': 0.0},
],
tasks=[
{'task_id': 'task-b1', 'track_index': 0, 'status': 'searching'},
],
)
merged = merge_wishlist_run_status(primary, [sibling])
# Three globally-unique track indices.
assert [r['track_index'] for r in merged['analysis_results']] == [0, 1, 2]
# Each task's track_index re-indexed to match its analysis_result.
indices_by_task = {t['task_id']: t['track_index'] for t in merged['tasks']}
assert indices_by_task == {'task-a1': 0, 'task-a2': 1, 'task-b1': 2}
# Tasks sorted by their new track_index.
assert [t['task_id'] for t in merged['tasks']] == ['task-a1', 'task-a2', 'task-b1']
def test_phase_aggregation_least_complete_pre_terminal_wins():
"""analysis + downloading + complete → analysis."""
primary = _status('complete')
sibling1 = _status('downloading')
sibling2 = _status('analysis')
merged = merge_wishlist_run_status(primary, [sibling1, sibling2])
assert merged['phase'] == 'analysis'
def test_phase_aggregation_album_downloading_wins_over_downloading():
primary = _status('downloading')
sibling = _status('album_downloading')
merged = merge_wishlist_run_status(primary, [sibling])
assert merged['phase'] == 'album_downloading'
def test_phase_aggregation_all_complete_returns_complete():
primary = _status('complete')
sibling1 = _status('complete')
merged = merge_wishlist_run_status(primary, [sibling1])
assert merged['phase'] == 'complete'
def test_phase_aggregation_mixed_complete_and_other_returns_downloading():
"""A finished sibling alongside a still-downloading sibling
surfaces 'downloading' (the run isn't done)."""
primary = _status('complete')
sibling = _status('downloading')
merged = merge_wishlist_run_status(primary, [sibling])
assert merged['phase'] == 'downloading'
def test_phase_aggregation_error_is_sticky():
"""If any sibling errored, the merged phase is 'error' even
if other siblings are still running. Modal should show the
failure so the user notices."""
primary = _status('downloading')
sibling = _status('error')
merged = merge_wishlist_run_status(primary, [sibling])
assert merged['phase'] == 'error'
def test_analysis_progress_summed_across_siblings():
primary = _status(
'analysis',
analysis_progress={'total': 10, 'processed': 7},
)
sibling = _status(
'analysis',
analysis_progress={'total': 5, 'processed': 2},
)
merged = merge_wishlist_run_status(primary, [sibling])
assert merged['analysis_progress'] == {'total': 15, 'processed': 9}
def test_album_bundle_picks_active_sibling_over_idle():
"""Primary is past its bundle stage (state='staged');
sibling is currently downloading_release. Merge surfaces the
active sibling's bundle so the progress bar stays useful."""
primary = _status(
'downloading',
album_bundle={'state': 'staged', 'progress': 100, 'release': 'PRISM (Deluxe)'},
)
sibling = _status(
'album_downloading',
album_bundle={'state': 'downloading_release', 'progress': 42, 'release': '1432'},
)
merged = merge_wishlist_run_status(primary, [sibling])
assert merged['album_bundle']['release'] == '1432'
assert merged['album_bundle']['progress'] == 42
def test_album_bundle_falls_back_when_no_active_sibling():
primary = _status(
'complete',
album_bundle={'state': 'staged', 'progress': 100, 'release': 'PRISM (Deluxe)'},
)
sibling = _status(
'complete',
album_bundle={'state': 'staged', 'progress': 100, 'release': '1432'},
)
merged = merge_wishlist_run_status(primary, [sibling])
# Falls back to primary's bundle (first non-empty).
assert merged['album_bundle']['release'] == 'PRISM (Deluxe)'
def test_active_count_summed_across_siblings():
primary = _status('downloading', active_count=2)
sibling = _status('downloading', active_count=1)
merged = merge_wishlist_run_status(primary, [sibling])
assert merged['active_count'] == 3
def test_primary_playlist_id_preserved():
primary = _status('downloading', playlist_id='wishlist', playlist_name='Wishlist (Auto)')
sibling = _status('downloading', playlist_id='wishlist', playlist_name='Wishlist (Album: 1432)')
merged = merge_wishlist_run_status(primary, [sibling])
# Primary's playlist_name + playlist_id propagate (it's the row the modal opened against).
assert merged['playlist_id'] == 'wishlist'
assert merged['playlist_name'] == 'Wishlist (Auto)'

@ -3424,6 +3424,7 @@ const WHATS_NEW = {
{ title: 'Fix: album-bundle downloads all landing as track 1', desc: 'soulseek album-bundle downloads (and any other untagged release-staging path) were importing every track with track_number=1. the staging-file reader was using the auto-import\'s filename extractor that defaults to 1 when no NN- prefix is present in the filename — so for albums like Ryoto\'s "Cha-La Head-Cha-La" where slskd hands you bare titles, every file got "track 1" stamped on it. now the staging path uses a strict extractor that returns 0 when it can\'t see an explicit prefix, so the downstream resolver correctly falls through to the authoritative Spotify metadata and the right track numbers land in the library.' },
{ title: 'Wishlist albums-cycle: one album-bundle search per album', desc: 'auto-wishlist runs in two cycles — albums + singles. previously the albums cycle dumped every missing track from every album into one big batch and ran a separate per-track Soulseek / Prowlarr search for each (~50 searches for a typical scan). now the albums cycle splits the wishlist into per-album sub-batches at submission time, and each one engages the existing slskd / torrent / usenet album-bundle release-first flow with that album\'s context. tracks without resolvable album metadata stay on the classic per-track residual batch so nothing falls off. singles cycle is unchanged.' },
{ title: 'Wishlist cycle toggles once per run, not per sub-batch', desc: 'follow-up to the per-album split above. with N sub-batches per wishlist run, the cycle toggle (albums → singles or vice versa) was firing once per sub-batch as each one completed — so the cycle could flip mid-run and the automation completion event got emitted multiple times. each wishlist invocation now stamps every sub-batch with a shared run id; the completion handler waits until the LAST sibling finishes before toggling the cycle + emitting the run-complete event. cleaner state machine, future-proofs frontend grouping of sub-batches under one logical run.' },
{ title: 'Wishlist download modal: keeps tracking after first album', desc: 'with the per-album sub-batch split, the modal\'s polling kept hitting the original batch id only and went stale as soon as the first sub-batch finished — subsequent albums downloaded fine but the modal\'s rows stopped updating. backend now merges every sibling sub-batch\'s tasks + analysis results into the response under the original batch id (re-indexes track positions globally so the table doesn\'t collide on row keys, aggregates phase across siblings, picks whichever bundle is currently active). frontend untouched — the modal sees one unified view of the whole run.' },
],
'2.6.2': [
{ date: 'May 24, 2026 — 2.6.2 release' },

Loading…
Cancel
Save