mirror of https://github.com/Nezreka/SoulSync.git
Phase 1c.2.1 splits each wishlist run across multiple
``download_batches`` rows (per-album bundle dispatch). The
download-missing modal opens against the original batch_id
allocated by ``start_manual_wishlist_download_batch`` /
``process_wishlist_automatically``. Pre-fix that batch_id was
just one sibling among N, so the modal went stale as soon as the
primary sub-batch finished — subsequent albums downloaded fine
but no live status reached the UI.
Fix: backend merges every sibling sub-batch's tasks +
analysis_results into the response keyed under the originally-
requested batch_id. Modal sees one unified view of the whole run
without knowing about the split. Frontend untouched.
Architecture (Kettui standards):
- ``core/downloads/wishlist_aggregator.py`` — pure
``merge_wishlist_run_status(primary, siblings)`` helper.
No IO, no runtime state, no globals. Lifted out of
``status.py`` so the merge contract can be pinned via unit
tests without standing up the live ``download_batches`` /
``download_tasks`` state.
- ``core/downloads/status.py``'s ``build_batched_status`` now
pre-indexes ``download_batches`` by ``wishlist_run_id`` inside
the existing ``tasks_lock`` snapshot, then runs the merge
helper whenever a requested batch has a sibling.
Merge rules pinned by 12 tests:
- ``track_index`` re-indexed globally 0..N-1 across the merged
``analysis_results`` so the modal's ``data-track-index`` DOM
keys don't collide between siblings. Tasks' ``track_index``
follows the same remap so the analysis-results ↔ tasks
cross-reference stays intact.
- ``task_id`` is uuid per task — no collision concern.
- Phase: error is sticky; otherwise the LEAST-complete
pre-terminal phase wins (analysis < album_downloading <
downloading). All-complete returns ``complete``; mixed
complete + active returns ``downloading`` so the modal stays
alive until every sibling lands.
- ``album_bundle``: picks whichever sibling currently has an
active bundle download (state in
``{searching, downloading, downloading_release, staging}``).
Falls back to the first non-empty bundle so a completed run
still shows a progress bar.
- ``analysis_progress`` summed across siblings.
- ``active_count`` summed; ``max_concurrent`` keeps primary's
value as the representative.
- ``playlist_id`` + ``playlist_name`` preserved from the primary
(the row the modal originally opened against).
Legacy single-batch wishlist runs (no ``wishlist_run_id`` on the
batch) skip the merge entirely — passthrough. Back-compat by
absence.
1108 tests across downloads + wishlist + automation + imports +
playlist-sources + lb-series suites green. 12 new aggregator
tests pin the merge contract.
Closes the open UX gap from the Phase 1c.2.1 ship — modal now
tracks every sibling sub-batch's progress for the full duration
of the wishlist run.
pull/709/head
parent
c002014f10
commit
7f751202d2
@ -0,0 +1,186 @@
|
||||
"""Merge sibling download_batches statuses into one view for the
|
||||
wishlist-run model.
|
||||
|
||||
When the wishlist runs are split into per-album sub-batches
|
||||
(Phase 1c.2.1), the frontend modal polls the ORIGINAL batch id
|
||||
allocated by ``start_manual_wishlist_download_batch`` /
|
||||
``process_wishlist_automatically``. That batch id is now just one
|
||||
sibling among N. Without merging, the modal goes blank after the
|
||||
first sibling finishes because subsequent siblings live under
|
||||
fresh batch ids the modal never learned about.
|
||||
|
||||
This module is the merge layer: pure function, no IO, no runtime
|
||||
state. ``build_batched_status`` in ``core/downloads/status.py``
|
||||
calls into it when a requested batch has ``wishlist_run_id`` set
|
||||
and at least one sibling exists.
|
||||
|
||||
Design notes:
|
||||
|
||||
- ``track_index`` re-indexed to a global 0..N-1 across the merged
|
||||
results so the modal's ``data-track-index`` DOM keys don't
|
||||
collide between siblings (each sibling locally starts at 0).
|
||||
Tasks reference their analysis result via track_index, so the
|
||||
remap is applied to tasks too.
|
||||
- ``task_id`` is a uuid per task — no collision concern across
|
||||
siblings.
|
||||
- Phase aggregation surfaces the LEAST-complete pre-terminal phase
|
||||
so the modal stays "alive" until every sibling is done. Sticky
|
||||
``error`` so failures don't get hidden by a running sibling.
|
||||
- ``album_bundle`` is picked from whichever sibling currently has
|
||||
an active bundle download — gives the user a useful progress
|
||||
bar even when the primary sibling is past its bundle stage.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
|
||||
_PHASE_PRIORITY = (
|
||||
'analysis',
|
||||
'album_downloading',
|
||||
'downloading',
|
||||
'complete',
|
||||
)
|
||||
_ACTIVE_BUNDLE_STATES = frozenset({
|
||||
'searching',
|
||||
'downloading',
|
||||
'downloading_release',
|
||||
'staging',
|
||||
})
|
||||
|
||||
|
||||
def _aggregate_phases(phases: List[str]) -> str:
|
||||
"""Pick the merged phase for a multi-sibling wishlist run.
|
||||
|
||||
Rules:
|
||||
- ``error`` is sticky — if any sibling errored, surface error.
|
||||
- Otherwise return the LEAST-complete pre-terminal phase in
|
||||
priority order (analysis < album_downloading < downloading
|
||||
< complete).
|
||||
- If all siblings are ``complete``, return ``complete``.
|
||||
- Fallback to the first non-empty phase if nothing matches a
|
||||
known priority.
|
||||
"""
|
||||
phases = [p for p in phases if p]
|
||||
if not phases:
|
||||
return 'unknown'
|
||||
if 'error' in phases:
|
||||
return 'error'
|
||||
for p in _PHASE_PRIORITY:
|
||||
if p in phases:
|
||||
if p == 'complete':
|
||||
return 'complete' if all(s == 'complete' for s in phases) else 'downloading'
|
||||
return p
|
||||
return phases[0]
|
||||
|
||||
|
||||
def _pick_active_album_bundle(statuses: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
|
||||
"""Return the album_bundle of whichever sibling is currently
|
||||
staging or downloading. Falls back to the first non-empty
|
||||
bundle when nothing is active (so a completed bundle still
|
||||
shows up vs. a totally empty progress bar)."""
|
||||
fallback = None
|
||||
for s in statuses:
|
||||
bundle = s.get('album_bundle')
|
||||
if not bundle:
|
||||
continue
|
||||
if fallback is None:
|
||||
fallback = bundle
|
||||
state = (bundle.get('state') or '').lower()
|
||||
if state in _ACTIVE_BUNDLE_STATES:
|
||||
return bundle
|
||||
return fallback
|
||||
|
||||
|
||||
def merge_wishlist_run_status(
|
||||
primary: Dict[str, Any],
|
||||
siblings: List[Dict[str, Any]],
|
||||
) -> Dict[str, Any]:
|
||||
"""Return a status dict that merges ``siblings`` into ``primary``.
|
||||
|
||||
Empty ``siblings`` is the legacy single-batch case — primary
|
||||
is returned unchanged.
|
||||
|
||||
The returned dict has the same shape as a single-batch status
|
||||
response from ``build_batch_status_data`` so the frontend
|
||||
modal needs no changes to consume it. Tracks and tasks are
|
||||
re-indexed globally; phase + progress + active_count
|
||||
aggregated across the run.
|
||||
"""
|
||||
if not siblings:
|
||||
return primary
|
||||
|
||||
all_statuses = [primary] + list(siblings)
|
||||
|
||||
# Phase aggregation.
|
||||
merged_phase = _aggregate_phases([s.get('phase', '') for s in all_statuses])
|
||||
|
||||
# Analysis progress — sum across siblings.
|
||||
total = 0
|
||||
processed = 0
|
||||
has_progress = False
|
||||
for s in all_statuses:
|
||||
ap = s.get('analysis_progress')
|
||||
if isinstance(ap, dict):
|
||||
total += int(ap.get('total') or 0)
|
||||
processed += int(ap.get('processed') or 0)
|
||||
has_progress = True
|
||||
|
||||
# Analysis results — concat + re-index. Build a (batch_obj_id,
|
||||
# old_track_index) -> new_track_index map so tasks can be
|
||||
# re-indexed consistently.
|
||||
merged_results: List[Dict[str, Any]] = []
|
||||
track_index_remap: Dict[tuple, int] = {}
|
||||
next_index = 0
|
||||
for s in all_statuses:
|
||||
batch_ref = id(s)
|
||||
for r in (s.get('analysis_results') or []):
|
||||
old_idx = int(r.get('track_index') or 0)
|
||||
track_index_remap[(batch_ref, old_idx)] = next_index
|
||||
new_r = dict(r)
|
||||
new_r['track_index'] = next_index
|
||||
merged_results.append(new_r)
|
||||
next_index += 1
|
||||
|
||||
# Tasks — concat + re-index using the remap above. Tasks
|
||||
# without a remapped entry keep their original track_index
|
||||
# (defensive — shouldn't happen if analysis_results is
|
||||
# consistent with the task list).
|
||||
merged_tasks: List[Dict[str, Any]] = []
|
||||
for s in all_statuses:
|
||||
batch_ref = id(s)
|
||||
for t in (s.get('tasks') or []):
|
||||
old_idx = int(t.get('track_index') or 0)
|
||||
new_t = dict(t)
|
||||
new_t['track_index'] = track_index_remap.get((batch_ref, old_idx), old_idx)
|
||||
merged_tasks.append(new_t)
|
||||
merged_tasks.sort(key=lambda x: x.get('track_index', 0))
|
||||
|
||||
# Album bundle — pick the active sibling's, fall back to first
|
||||
# bundle present, omit if none.
|
||||
merged_bundle = _pick_active_album_bundle(all_statuses)
|
||||
|
||||
# Worker accounting — sum active_count across siblings so the
|
||||
# modal's overall download progress display reflects total
|
||||
# in-flight work; max_concurrent stays from primary as
|
||||
# representative.
|
||||
active_total = sum(int(s.get('active_count') or 0) for s in all_statuses)
|
||||
|
||||
merged = dict(primary) # keeps playlist_id, playlist_name, error, etc.
|
||||
merged['phase'] = merged_phase
|
||||
if has_progress:
|
||||
merged['analysis_progress'] = {'total': total, 'processed': processed}
|
||||
merged['analysis_results'] = merged_results
|
||||
if merged_tasks or 'tasks' in primary:
|
||||
merged['tasks'] = merged_tasks
|
||||
if merged_bundle:
|
||||
merged['album_bundle'] = merged_bundle
|
||||
elif 'album_bundle' in primary:
|
||||
merged['album_bundle'] = primary['album_bundle']
|
||||
merged['active_count'] = active_total
|
||||
|
||||
return merged
|
||||
|
||||
|
||||
__all__ = ['merge_wishlist_run_status']
|
||||
@ -0,0 +1,168 @@
|
||||
"""Unit tests for ``core/downloads/wishlist_aggregator.merge_wishlist_run_status``.
|
||||
|
||||
Pins the merge contract the wishlist-modal status path depends on
|
||||
(Phase 1c.2.1 follow-up): when one logical wishlist run is split
|
||||
across N sub-batches, the frontend modal polls the original
|
||||
batch_id and expects a unified view that covers every sibling.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from core.downloads.wishlist_aggregator import merge_wishlist_run_status
|
||||
|
||||
|
||||
def _status(phase, **kwargs):
|
||||
"""Build a minimal per-batch status dict shaped like
|
||||
``build_batch_status_data``'s output."""
|
||||
base = {
|
||||
'phase': phase,
|
||||
'playlist_id': 'wishlist',
|
||||
'playlist_name': 'Wishlist',
|
||||
'active_count': 0,
|
||||
'max_concurrent': 3,
|
||||
}
|
||||
base.update(kwargs)
|
||||
return base
|
||||
|
||||
|
||||
def test_empty_siblings_returns_primary_unchanged():
|
||||
primary = _status('downloading', tasks=[{'task_id': 't1', 'track_index': 0}])
|
||||
out = merge_wishlist_run_status(primary, [])
|
||||
assert out is primary
|
||||
|
||||
|
||||
def test_two_siblings_merge_tasks_with_reindexed_track_index():
|
||||
"""Both siblings locally start at track_index 0 — after merge,
|
||||
indices are globally unique 0..N-1."""
|
||||
primary = _status(
|
||||
'downloading',
|
||||
analysis_results=[
|
||||
{'track_index': 0, 'track': {'name': 'A1'}, 'found': False, 'confidence': 0.0},
|
||||
{'track_index': 1, 'track': {'name': 'A2'}, 'found': False, 'confidence': 0.0},
|
||||
],
|
||||
tasks=[
|
||||
{'task_id': 'task-a1', 'track_index': 0, 'status': 'downloading'},
|
||||
{'task_id': 'task-a2', 'track_index': 1, 'status': 'downloading'},
|
||||
],
|
||||
)
|
||||
sibling = _status(
|
||||
'downloading',
|
||||
analysis_results=[
|
||||
{'track_index': 0, 'track': {'name': 'B1'}, 'found': False, 'confidence': 0.0},
|
||||
],
|
||||
tasks=[
|
||||
{'task_id': 'task-b1', 'track_index': 0, 'status': 'searching'},
|
||||
],
|
||||
)
|
||||
|
||||
merged = merge_wishlist_run_status(primary, [sibling])
|
||||
|
||||
# Three globally-unique track indices.
|
||||
assert [r['track_index'] for r in merged['analysis_results']] == [0, 1, 2]
|
||||
# Each task's track_index re-indexed to match its analysis_result.
|
||||
indices_by_task = {t['task_id']: t['track_index'] for t in merged['tasks']}
|
||||
assert indices_by_task == {'task-a1': 0, 'task-a2': 1, 'task-b1': 2}
|
||||
# Tasks sorted by their new track_index.
|
||||
assert [t['task_id'] for t in merged['tasks']] == ['task-a1', 'task-a2', 'task-b1']
|
||||
|
||||
|
||||
def test_phase_aggregation_least_complete_pre_terminal_wins():
|
||||
"""analysis + downloading + complete → analysis."""
|
||||
primary = _status('complete')
|
||||
sibling1 = _status('downloading')
|
||||
sibling2 = _status('analysis')
|
||||
merged = merge_wishlist_run_status(primary, [sibling1, sibling2])
|
||||
assert merged['phase'] == 'analysis'
|
||||
|
||||
|
||||
def test_phase_aggregation_album_downloading_wins_over_downloading():
|
||||
primary = _status('downloading')
|
||||
sibling = _status('album_downloading')
|
||||
merged = merge_wishlist_run_status(primary, [sibling])
|
||||
assert merged['phase'] == 'album_downloading'
|
||||
|
||||
|
||||
def test_phase_aggregation_all_complete_returns_complete():
|
||||
primary = _status('complete')
|
||||
sibling1 = _status('complete')
|
||||
merged = merge_wishlist_run_status(primary, [sibling1])
|
||||
assert merged['phase'] == 'complete'
|
||||
|
||||
|
||||
def test_phase_aggregation_mixed_complete_and_other_returns_downloading():
|
||||
"""A finished sibling alongside a still-downloading sibling
|
||||
surfaces 'downloading' (the run isn't done)."""
|
||||
primary = _status('complete')
|
||||
sibling = _status('downloading')
|
||||
merged = merge_wishlist_run_status(primary, [sibling])
|
||||
assert merged['phase'] == 'downloading'
|
||||
|
||||
|
||||
def test_phase_aggregation_error_is_sticky():
|
||||
"""If any sibling errored, the merged phase is 'error' even
|
||||
if other siblings are still running. Modal should show the
|
||||
failure so the user notices."""
|
||||
primary = _status('downloading')
|
||||
sibling = _status('error')
|
||||
merged = merge_wishlist_run_status(primary, [sibling])
|
||||
assert merged['phase'] == 'error'
|
||||
|
||||
|
||||
def test_analysis_progress_summed_across_siblings():
|
||||
primary = _status(
|
||||
'analysis',
|
||||
analysis_progress={'total': 10, 'processed': 7},
|
||||
)
|
||||
sibling = _status(
|
||||
'analysis',
|
||||
analysis_progress={'total': 5, 'processed': 2},
|
||||
)
|
||||
merged = merge_wishlist_run_status(primary, [sibling])
|
||||
assert merged['analysis_progress'] == {'total': 15, 'processed': 9}
|
||||
|
||||
|
||||
def test_album_bundle_picks_active_sibling_over_idle():
|
||||
"""Primary is past its bundle stage (state='staged');
|
||||
sibling is currently downloading_release. Merge surfaces the
|
||||
active sibling's bundle so the progress bar stays useful."""
|
||||
primary = _status(
|
||||
'downloading',
|
||||
album_bundle={'state': 'staged', 'progress': 100, 'release': 'PRISM (Deluxe)'},
|
||||
)
|
||||
sibling = _status(
|
||||
'album_downloading',
|
||||
album_bundle={'state': 'downloading_release', 'progress': 42, 'release': '1432'},
|
||||
)
|
||||
merged = merge_wishlist_run_status(primary, [sibling])
|
||||
assert merged['album_bundle']['release'] == '1432'
|
||||
assert merged['album_bundle']['progress'] == 42
|
||||
|
||||
|
||||
def test_album_bundle_falls_back_when_no_active_sibling():
|
||||
primary = _status(
|
||||
'complete',
|
||||
album_bundle={'state': 'staged', 'progress': 100, 'release': 'PRISM (Deluxe)'},
|
||||
)
|
||||
sibling = _status(
|
||||
'complete',
|
||||
album_bundle={'state': 'staged', 'progress': 100, 'release': '1432'},
|
||||
)
|
||||
merged = merge_wishlist_run_status(primary, [sibling])
|
||||
# Falls back to primary's bundle (first non-empty).
|
||||
assert merged['album_bundle']['release'] == 'PRISM (Deluxe)'
|
||||
|
||||
|
||||
def test_active_count_summed_across_siblings():
|
||||
primary = _status('downloading', active_count=2)
|
||||
sibling = _status('downloading', active_count=1)
|
||||
merged = merge_wishlist_run_status(primary, [sibling])
|
||||
assert merged['active_count'] == 3
|
||||
|
||||
|
||||
def test_primary_playlist_id_preserved():
|
||||
primary = _status('downloading', playlist_id='wishlist', playlist_name='Wishlist (Auto)')
|
||||
sibling = _status('downloading', playlist_id='wishlist', playlist_name='Wishlist (Album: 1432)')
|
||||
merged = merge_wishlist_run_status(primary, [sibling])
|
||||
# Primary's playlist_name + playlist_id propagate (it's the row the modal opened against).
|
||||
assert merged['playlist_id'] == 'wishlist'
|
||||
assert merged['playlist_name'] == 'Wishlist (Auto)'
|
||||
Loading…
Reference in new issue