fix: move manual wishlist cleanup into background worker

The manual wishlist download endpoint blocked the request thread on a
slow library-cleanup pass before submitting the batch — for a 24-track
wishlist that's ~50 per-track DB lookups serialised in the request
handler, taking 30+ seconds before the frontend got a response. The
modal sat at "Pending..." with no progress visible the whole time.

Split start_manual_wishlist_download_batch into:

1. SYNC path (request handler):
   - Generate batch_id, create download_batches entry with phase=analysis
     and analysis_total=0 placeholder.
   - Submit a single bg job (`_prepare_and_run_manual_wishlist_batch`) to
     the missing-download executor.
   - Return 200 with batch_id immediately. Frontend can start polling
     /api/active-processes status right away.

2. BG path (executor thread):
   - db.remove_wishlist_duplicates (slow-ish, single SQL)
   - remove_tracks_already_in_library (the slow one — per-track DB checks)
   - wishlist_service.get_wishlist_tracks_for_download
   - sanitize + dedupe + filter (track_ids / category)
   - Update batch.analysis_total with the real filtered count
   - add_activity_item("Wishlist Download Started", ...)
   - run_full_missing_tracks_process (master worker)

Edge case: if cleanup empties the wishlist, the bg job marks the batch
phase='complete' with error='No tracks in wishlist' (instead of the old
synchronous 400 response). Frontend status poll picks this up and the
modal can close cleanly.

Tests: existing 2 manual-download tests updated to drive the bg job
explicitly via a new `_run_submitted_bg_job` helper. Added 2 new tests:
- `..._returns_immediately_with_placeholder` — proves the sync path
  doesn't trigger any cleanup or master-worker calls; analysis_total=0.
- `..._marks_batch_complete_when_wishlist_empty_after_cleanup` —
  cleanup empties the list, master worker never invoked, batch ends
  with phase='complete'.

Full suite: 1232 passing (was 1230). Ruff clean.
pull/413/head
Broque Thomas 1 month ago
parent 8019e13a2e
commit 6a25dcd49e

@ -311,7 +311,66 @@ def start_manual_wishlist_download_batch(
category: str | None = None,
force_download_all: bool = False,
) -> tuple[Dict[str, Any], int]:
"""Prepare and submit a manual wishlist batch."""
"""Submit a manual wishlist batch.
The batch entry is created synchronously so the frontend can start polling
status immediately. The slow library-cleanup pass and master-worker hand-off
run in the background, freeing the request handler from a 30s+ block on
per-track DB checks for large wishlists.
"""
logger = runtime.logger
try:
batch_id = str(uuid.uuid4())
playlist_id = "wishlist"
playlist_name = "Wishlist"
with runtime.tasks_lock:
runtime.download_batches[batch_id] = {
'phase': 'analysis',
'playlist_id': playlist_id,
'playlist_name': playlist_name,
'queue': [],
'active_count': 0,
'max_concurrent': runtime.get_batch_max_concurrent(),
'queue_index': 0,
# analysis_total starts at 0; the bg job updates it after cleanup
# finishes and the real track count is known.
'analysis_total': 0,
'analysis_processed': 0,
'analysis_results': [],
'permanently_failed_tracks': [],
'cancelled_tracks': set(),
'force_download_all': True,
'profile_id': runtime.profile_id,
}
runtime.missing_download_executor.submit(
_prepare_and_run_manual_wishlist_batch,
runtime,
batch_id,
track_ids,
category,
)
return {"success": True, "batch_id": batch_id}, 200
except Exception as e:
logger.error(f"Error starting wishlist download process: {e}")
import traceback
traceback.print_exc()
return {"success": False, "error": str(e)}, 500
def _prepare_and_run_manual_wishlist_batch(
runtime: WishlistManualDownloadRuntime,
batch_id: str,
track_ids,
category: str | None,
) -> None:
"""Background worker for the manual wishlist batch — does the slow cleanup
+ sanitize + filter + master-worker hand-off off the request thread."""
logger = runtime.logger
try:
@ -340,7 +399,12 @@ def start_manual_wishlist_download_batch(
raw_wishlist_tracks = wishlist_service.get_wishlist_tracks_for_download(profile_id=manual_profile_id)
if not raw_wishlist_tracks:
return {"success": False, "error": "No tracks in wishlist"}, 400
logger.warning("[Manual-Wishlist] No tracks in wishlist after cleanup — marking batch complete")
with runtime.tasks_lock:
if batch_id in runtime.download_batches:
runtime.download_batches[batch_id]['phase'] = 'complete'
runtime.download_batches[batch_id]['error'] = 'No tracks in wishlist'
return
wishlist_tracks, duplicates_found = sanitize_and_dedupe_wishlist_tracks(raw_wishlist_tracks)
if duplicates_found > 0:
@ -372,41 +436,25 @@ def start_manual_wishlist_download_batch(
for i, track in enumerate(wishlist_tracks):
track['_original_index'] = i
runtime.add_activity_item("", "Wishlist Download Started", f"{len(wishlist_tracks)} tracks", "Now")
batch_id = str(uuid.uuid4())
playlist_id = "wishlist"
playlist_name = "Wishlist"
task_queue = []
# Update batch with the real track count now that filtering is done
with runtime.tasks_lock:
runtime.download_batches[batch_id] = {
'phase': 'analysis',
'playlist_id': playlist_id,
'playlist_name': playlist_name,
'queue': task_queue,
'active_count': 0,
'max_concurrent': runtime.get_batch_max_concurrent(),
'queue_index': 0,
'analysis_total': len(wishlist_tracks),
'analysis_processed': 0,
'analysis_results': [],
'permanently_failed_tracks': [],
'cancelled_tracks': set(),
'force_download_all': True,
'profile_id': manual_profile_id,
}
if batch_id in runtime.download_batches:
runtime.download_batches[batch_id]['analysis_total'] = len(wishlist_tracks)
logger.info(f"Starting wishlist batch {batch_id} with {len(wishlist_tracks)} tracks")
runtime.missing_download_executor.submit(runtime.run_full_missing_tracks_process, batch_id, playlist_id, wishlist_tracks)
runtime.add_activity_item("", "Wishlist Download Started", f"{len(wishlist_tracks)} tracks", "Now")
return {"success": True, "batch_id": batch_id}, 200
logger.info(f"Starting wishlist batch {batch_id} with {len(wishlist_tracks)} tracks")
runtime.run_full_missing_tracks_process(batch_id, "wishlist", wishlist_tracks)
except Exception as e:
logger.error(f"Error starting wishlist download process: {e}")
except Exception as exc:
logger.error(f"Error preparing manual wishlist batch {batch_id}: {exc}")
import traceback
traceback.print_exc()
return {"success": False, "error": str(e)}, 500
with runtime.tasks_lock:
if batch_id in runtime.download_batches:
runtime.download_batches[batch_id]['phase'] = 'error'
runtime.download_batches[batch_id]['error'] = str(exc)
def cleanup_wishlist_against_library(

@ -85,6 +85,7 @@ def _build_runtime(tracks, owned_matches=None, batch_map=None):
executor = _FakeExecutor()
logger = _FakeLogger()
activity_calls = []
master_calls = []
batch_map = batch_map or {}
runtime = WishlistManualDownloadRuntime(
@ -92,18 +93,57 @@ def _build_runtime(tracks, owned_matches=None, batch_map=None):
download_batches=batch_map,
tasks_lock=_FakeLock(),
missing_download_executor=executor,
run_full_missing_tracks_process=lambda *args, **kwargs: None,
run_full_missing_tracks_process=lambda *args, **kwargs: master_calls.append((args, kwargs)),
get_batch_max_concurrent=lambda: 4,
add_activity_item=lambda *args: activity_calls.append(args),
active_server="navidrome",
logger=logger,
profile_id=1,
)
return runtime, wishlist_service, music_db, executor, logger, activity_calls, batch_map
return runtime, wishlist_service, music_db, executor, logger, activity_calls, batch_map, master_calls
def _run_submitted_bg_job(executor):
"""Execute the bg job the executor received — simulates ThreadPoolExecutor."""
assert len(executor.submissions) == 1, "expected exactly one bg submission"
fn, args, kwargs = executor.submissions[0]
fn(*args, **kwargs)
def test_start_manual_wishlist_download_batch_returns_immediately_with_placeholder():
"""Endpoint returns 200 immediately; cleanup runs in the bg job."""
runtime, service, _db, executor, _logger, activity_calls, batch_map, master_calls = _build_runtime(
tracks=[
{
"id": "track-1",
"name": "Song 1",
"artists": [{"name": "Artist 1"}],
"album": {"name": "Album 1", "album_type": "album"},
},
]
)
payload, status = processing.start_manual_wishlist_download_batch(runtime)
# Synchronous response: 200 with batch_id, batch entry created with placeholder count.
assert status == 200
assert payload["success"] is True
assert "batch_id" in payload
assert batch_map[payload["batch_id"]]["analysis_total"] == 0 # placeholder
assert batch_map[payload["batch_id"]]["phase"] == "analysis"
assert batch_map[payload["batch_id"]]["force_download_all"] is True
# Cleanup has NOT yet run (no DB calls, no master worker invocation).
assert service.removed_ids == set()
assert master_calls == []
assert activity_calls == []
# The bg job is queued.
assert len(executor.submissions) == 1
def test_start_manual_wishlist_download_batch_filters_track_ids_and_starts_batch():
runtime, _service, _db, executor, logger, activity_calls, batch_map = _build_runtime(
runtime, _service, _db, executor, logger, activity_calls, batch_map, master_calls = _build_runtime(
tracks=[
{
"id": "track-1",
@ -129,19 +169,23 @@ def test_start_manual_wishlist_download_batch_filters_track_ids_and_starts_batch
assert status == 200
assert payload["success"] is True
assert "batch_id" in payload
# Run the bg job that the executor would have run on a real thread.
_run_submitted_bg_job(executor)
assert activity_calls == [("", "Wishlist Download Started", "1 tracks", "Now")]
assert len(executor.submissions) == 1
_submitted_fn, submitted_args, _submitted_kwargs = executor.submissions[0]
assert submitted_args[1] == "wishlist"
assert submitted_args[2][0]["id"] == "track-2"
assert submitted_args[2][0]["_original_index"] == 0
assert len(master_calls) == 1
master_args, _ = master_calls[0]
assert master_args[1] == "wishlist"
assert master_args[2][0]["id"] == "track-2"
assert master_args[2][0]["_original_index"] == 0
assert batch_map[payload["batch_id"]]["analysis_total"] == 1
assert batch_map[payload["batch_id"]]["force_download_all"] is True
assert any("Filtered to 1 specific tracks by ID" in msg for msg in logger.info_messages)
def test_start_manual_wishlist_download_batch_skips_enhance_tracks_during_cleanup():
runtime, service, _db, executor, logger, activity_calls, batch_map = _build_runtime(
runtime, service, _db, executor, logger, activity_calls, batch_map, master_calls = _build_runtime(
tracks=[
{
"id": "enhance-1",
@ -164,10 +208,39 @@ def test_start_manual_wishlist_download_batch_skips_enhance_tracks_during_cleanu
assert status == 200
assert payload["success"] is True
# Run the bg job — cleanup happens here, not in the request handler.
_run_submitted_bg_job(executor)
assert service.removed_ids == {"owned-1"}
assert len(executor.submissions) == 1
_submitted_fn, submitted_args, _submitted_kwargs = executor.submissions[0]
assert [track["id"] for track in submitted_args[2]] == ["enhance-1"]
assert len(master_calls) == 1
master_args, _ = master_calls[0]
assert [track["id"] for track in master_args[2]] == ["enhance-1"]
assert batch_map[payload["batch_id"]]["analysis_total"] == 1
assert activity_calls == [("", "Wishlist Download Started", "1 tracks", "Now")]
assert any("Cleaned up 1 already-owned tracks" in msg for msg in logger.info_messages)
def test_bg_job_marks_batch_complete_when_wishlist_empty_after_cleanup():
"""If cleanup empties the wishlist, the bg job marks the batch complete (not error 400)."""
runtime, _service, _db, executor, _logger, _activity, batch_map, master_calls = _build_runtime(
tracks=[
{
"id": "owned-1",
"name": "Owned Song",
"artists": [{"name": "Artist B"}],
"album": {"name": "Owned Album", "album_type": "album"},
},
],
owned_matches={("Owned Song", "Artist B")},
)
payload, status = processing.start_manual_wishlist_download_batch(runtime)
assert status == 200
_run_submitted_bg_job(executor)
# Cleanup removed the only track. Master worker never called. Batch marked complete.
assert master_calls == []
assert batch_map[payload["batch_id"]]["phase"] == "complete"
assert batch_map[payload["batch_id"]]["error"] == "No tracks in wishlist"

Loading…
Cancel
Save