Merge pull request #416 from Nezreka/refactor/lift-downloads-candidates-context

PR6: lift _attempt_download_with_candidates to core/downloads/candida…
pull/417/head
BoulderBadgeDad 4 weeks ago committed by GitHub
commit fe936c4c7c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,372 @@
"""Candidate fallback download logic.
`attempt_download_with_candidates(task_id, candidates, track, batch_id, deps)`
is the function the search/match pipeline calls once it has a sorted list of
Soulseek candidates for a track. It walks the candidates by descending
confidence and starts the first one that:
1. Hasn't been tried for this task already (`used_sources` dedup).
2. Isn't blacklisted (user-flagged bad match).
3. Doesn't trigger a cancellation race (checked at three points).
When a candidate accepts:
- Stores rich post-processing context in `matched_downloads_context` keyed by
`make_context_key(username, filename)` clean Spotify metadata, album
context (real or synthesized), `is_album_download` flag, batch/task IDs.
- For tracks with clean Spotify data, resolves track_number / disc_number
from (1) track_info (2) track object (3) Spotify API call, with album
metadata backfilled from the API response when local context is incomplete.
- Updates the task with the assigned `download_id`, falls through with a
"searching" reset on failure so the next attempt finds a clean state.
On cancellation mid-download, attempts to cancel the active Soulseek transfer
and notifies the lifecycle via `on_download_completed(success=False)` so the
worker slot frees up.
Lifted verbatim from web_server.py. Wide dependency surface
(soulseek_client, spotify_client, lifecycle callback, context-key helper,
status updater, DB) all injected via `CandidatesDeps`.
"""
from __future__ import annotations
import logging
import os
from dataclasses import dataclass
from typing import Any, Callable
from core.runtime_state import (
download_tasks,
matched_context_lock,
matched_downloads_context,
tasks_lock,
)
logger = logging.getLogger(__name__)
@dataclass
class CandidatesDeps:
"""Bundle of cross-cutting deps the candidate-fallback logic needs."""
soulseek_client: Any
spotify_client: Any
run_async: Callable[..., Any]
get_database: Callable[[], Any]
update_task_status: Callable
make_context_key: Callable[[str, str], str]
on_download_completed: Callable
def attempt_download_with_candidates(task_id, candidates, track, batch_id=None, deps: CandidatesDeps = None):
"""
Attempts to download with fallback candidate logic (matches GUI's retry_parallel_download_with_fallback).
Returns True if successful, False if all candidates fail.
"""
# Sort candidates by confidence (best first)
candidates.sort(key=lambda r: r.confidence, reverse=True)
with tasks_lock:
task = download_tasks.get(task_id)
if not task:
return False
used_sources = task.get('used_sources', set())
# Try each candidate until one succeeds (like GUI's fallback logic)
for candidate_index, candidate in enumerate(candidates):
# Check cancellation before each attempt
with tasks_lock:
if task_id not in download_tasks:
logger.info(f"[Modal Worker] Task {task_id} was deleted during candidate {candidate_index + 1}")
return False
if download_tasks[task_id]['status'] == 'cancelled':
logger.warning(f"[Modal Worker] Task {task_id} cancelled during candidate {candidate_index + 1}")
# Don't call _on_download_completed for cancelled tasks as it can stop monitoring
return False
download_tasks[task_id]['current_candidate_index'] = candidate_index
# Create source key to avoid duplicate attempts (like GUI)
source_key = f"{candidate.username}_{candidate.filename}"
if source_key in used_sources:
logger.info(f"[Modal Worker] Skipping already tried source: {source_key}")
continue
# Blacklist check — skip sources the user has flagged as bad matches
try:
_bl_db = deps.get_database()
if _bl_db.is_blacklisted(candidate.username, candidate.filename):
logger.info(f"[Modal Worker] Skipping blacklisted source: {source_key}")
continue
except Exception:
pass
# CRITICAL: Add source to used_sources IMMEDIATELY to prevent race conditions
# This must happen BEFORE starting download to prevent multiple retries from picking same source
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['used_sources'].add(source_key)
logger.info(f"[Modal Worker] Marked source as used before download attempt: {source_key}")
logger.info(f"[Modal Worker] Trying candidate {candidate_index + 1}/{len(candidates)}: {candidate.filename} (Confidence: {candidate.confidence:.2f})")
try:
# Update task status to downloading
deps.update_task_status(task_id, 'downloading')
# Prepare download - check if we have explicit album context from artist page
track_info = {}
with tasks_lock:
if task_id in download_tasks:
raw_track_info = download_tasks[task_id].get('track_info')
track_info = raw_track_info if isinstance(raw_track_info, dict) else {}
# Use explicit album/artist context if available (from artist album downloads)
has_explicit_context = track_info and track_info.get('_is_explicit_album_download', False)
if has_explicit_context:
# Use the real Spotify album/artist data from the UI
explicit_album = track_info.get('_explicit_album_context', {})
explicit_artist = track_info.get('_explicit_artist_context', {})
# Normalize artist context if it's a plain string (e.g. from wishlist spotify_data)
if isinstance(explicit_artist, str):
explicit_artist = {'name': explicit_artist}
spotify_artist_context = {
'id': explicit_artist.get('id', 'explicit_artist'),
'name': explicit_artist.get('name', track.artists[0] if track.artists else 'Unknown'),
'genres': explicit_artist.get('genres', [])
}
# Handle both image_url formats (direct string or images array)
album_image_url = None
if explicit_album.get('image_url'):
# Backend API returns image_url as direct string
album_image_url = explicit_album.get('image_url')
elif explicit_album.get('images'):
# Fallback: images array format from Spotify API
album_image_url = explicit_album.get('images', [{}])[0].get('url')
spotify_album_context = {
'id': explicit_album.get('id', 'explicit_album'),
'name': explicit_album.get('name', track.album),
'release_date': explicit_album.get('release_date', ''),
'image_url': album_image_url,
'total_tracks': explicit_album.get('total_tracks', 0),
'total_discs': explicit_album.get('total_discs', 1),
'album_type': explicit_album.get('album_type', 'album'),
'artists': explicit_album.get('artists', [{'name': spotify_artist_context.get('name', '')}])
}
logger.info(f"[Explicit Context] Using real album data: '{spotify_album_context['name']}' ({spotify_album_context['album_type']}, {spotify_album_context['total_discs']} disc(s))")
else:
# Fallback to generic context for playlists/wishlists
# Extract album metadata from track_info if available (discovery enriches tracks with full album objects)
fallback_album = track_info.get('album', {}) if track_info else {}
if isinstance(fallback_album, str):
fallback_album = {'name': fallback_album}
elif not isinstance(fallback_album, dict):
fallback_album = {}
fallback_image_url = None
fallback_images = fallback_album.get('images', [])
if fallback_album.get('image_url'):
fallback_image_url = fallback_album['image_url']
elif fallback_images and isinstance(fallback_images, list) and len(fallback_images) > 0:
fallback_image_url = fallback_images[0].get('url') if isinstance(fallback_images[0], dict) else None
spotify_artist_context = {'id': 'from_sync_modal', 'name': track.artists[0] if track.artists else 'Unknown', 'genres': []}
# Preserve album-level artists for consistent folder naming
_fallback_album_artists = fallback_album.get('artists', [])
if not _fallback_album_artists:
_fallback_album_artists = [{'name': track.artists[0]}] if track.artists else []
spotify_album_context = {
'id': fallback_album.get('id', 'from_sync_modal'),
'name': fallback_album.get('name', '') or track.album,
'release_date': fallback_album.get('release_date', ''),
'image_url': fallback_image_url,
'album_type': fallback_album.get('album_type', 'album'),
'total_tracks': fallback_album.get('total_tracks', 0),
'total_discs': fallback_album.get('total_discs', 1),
'artists': _fallback_album_artists
}
download_payload = candidate.__dict__
username = download_payload.get('username')
filename = download_payload.get('filename')
size = download_payload.get('size', 0)
if not username or not filename:
logger.error("[Modal Worker] Invalid candidate data: missing username or filename")
continue
# PROTECTION: Check if there's already an active download for this task
current_download_id = None
with tasks_lock:
if task_id in download_tasks:
current_download_id = download_tasks[task_id].get('download_id')
if current_download_id:
logger.info(f"[Modal Worker] Task {task_id} already has active download {current_download_id} - skipping new download attempt")
logger.info("[Modal Worker] This prevents race condition where multiple retries start overlapping downloads")
continue
# Initiate download
logger.info(f"[Modal Worker] Starting download: {username} / {os.path.basename(filename)}")
download_id = deps.run_async(deps.soulseek_client.download(username, filename, size))
if download_id:
# Store context for post-processing with complete Spotify metadata (GUI PARITY)
context_key = deps.make_context_key(username, filename)
with matched_context_lock:
# Create WebUI equivalent of GUI's SpotifyBasedSearchResult data structure
enhanced_payload = download_payload.copy()
# Extract clean Spotify metadata from track object (same as GUI)
has_clean_spotify_data = track and hasattr(track, 'name') and hasattr(track, 'album')
if has_clean_spotify_data:
# Use clean Spotify metadata (matches GUI's SpotifyBasedSearchResult)
enhanced_payload['spotify_clean_title'] = track.name
enhanced_payload['spotify_clean_album'] = track.album
enhanced_payload['spotify_clean_artist'] = track.artists[0] if track.artists else enhanced_payload.get('artist', '')
# Preserve all artists for metadata tagging
enhanced_payload['artists'] = [{'name': artist} for artist in track.artists] if track.artists else []
logger.info(f"[Context] Using clean Spotify metadata - Album: '{track.album}', Title: '{track.name}'")
# Get track_number and disc_number — prefer track data we already have,
# fall back to detailed API call only if needed
got_track_number = False
# 1. Try track_info (from frontend, has album track data)
tn = track_info.get('track_number', 0) if isinstance(track_info, dict) else 0
dn = track_info.get('disc_number', 1) if isinstance(track_info, dict) else 1
if tn and tn > 0:
enhanced_payload['track_number'] = tn
enhanced_payload['disc_number'] = dn
got_track_number = True
logger.info(f"[Context] Added track_number from track_info: {tn}, disc_number: {dn}")
# 2. Try the track object itself (from album tracks response)
if not got_track_number and hasattr(track, 'track_number') and track.track_number:
enhanced_payload['track_number'] = track.track_number
enhanced_payload['disc_number'] = getattr(track, 'disc_number', 1) or 1
got_track_number = True
logger.info(f"[Context] Added track_number from track object: {track.track_number}, disc_number: {enhanced_payload['disc_number']}")
# 3. Last resort — fetch from metadata source API
if not got_track_number and hasattr(track, 'id') and track.id:
try:
detailed_track = deps.spotify_client.get_track_details(track.id)
if detailed_track and detailed_track.get('track_number'):
enhanced_payload['track_number'] = detailed_track['track_number']
enhanced_payload['disc_number'] = detailed_track.get('disc_number', 1)
got_track_number = True
logger.info(f"[Context] Added track_number from API: {detailed_track['track_number']}, disc_number: {enhanced_payload['disc_number']}")
# Backfill album metadata from detailed track when context
# has incomplete data (missing release_date, total_tracks, etc.)
if isinstance(detailed_track.get('album'), dict):
dt_album = detailed_track['album']
if not spotify_album_context.get('release_date') and dt_album.get('release_date'):
spotify_album_context['release_date'] = dt_album['release_date']
logger.info(f"[Context] Backfilled release_date from API: {dt_album['release_date']}")
if not spotify_album_context.get('album_type') and dt_album.get('album_type'):
spotify_album_context['album_type'] = dt_album['album_type']
if not spotify_album_context.get('total_tracks') and dt_album.get('total_tracks'):
spotify_album_context['total_tracks'] = dt_album['total_tracks']
if not spotify_album_context.get('id') and dt_album.get('id'):
spotify_album_context['id'] = dt_album['id']
if not spotify_album_context.get('image_url') and dt_album.get('images'):
spotify_album_context['image_url'] = dt_album['images'][0].get('url', '')
except Exception as e:
logger.error(f"[Context] API track details failed: {e}")
if not got_track_number:
enhanced_payload.setdefault('track_number', 0)
enhanced_payload.setdefault('disc_number', 1)
logger.warning("[Context] No track_number found from any source")
# Determine if this should be treated as album download
# First check if we have explicit album context from artist page
if has_explicit_context:
is_album_context = True
logger.info("[Context] Using explicit album context flag from artist page")
else:
# Fall back to guessing based on clean data
is_album_context = (
track.album and
track.album.strip() and
track.album != "Unknown Album" and
track.album.lower() != track.name.lower() # Album different from track
)
else:
# Fallback to original data
enhanced_payload['spotify_clean_title'] = enhanced_payload.get('title', '')
enhanced_payload['spotify_clean_album'] = enhanced_payload.get('album', '')
enhanced_payload['spotify_clean_artist'] = enhanced_payload.get('artist', '')
# Preserve existing artists array if available, otherwise create from single artist
if 'artists' not in enhanced_payload and enhanced_payload.get('artist'):
enhanced_payload['artists'] = [{'name': enhanced_payload['artist']}]
enhanced_payload['track_number'] = track_info.get('track_number', 1) # Fallback when no clean Spotify data
is_album_context = False
logger.warning(f"[Context] Using fallback data - no clean Spotify metadata available, track_number={enhanced_payload['track_number']}")
matched_downloads_context[context_key] = {
"spotify_artist": spotify_artist_context,
"spotify_album": spotify_album_context,
"original_search_result": enhanced_payload,
"is_album_download": is_album_context, # Critical fix: Use actual album context
"has_clean_spotify_data": has_clean_spotify_data, # Flag for post-processing
"task_id": task_id, # Add task_id for completion callbacks
"batch_id": batch_id, # Add batch_id for completion callbacks
"track_info": track_info, # Add track_info for playlist folder mode
"_download_username": username, # Source username for AcoustID skip logic
}
logger.info(f"[Context] Set is_album_download: {is_album_context} (has clean data: {has_clean_spotify_data})")
logger.debug(f"[Debug] Context creation - track_info: {track_info is not None}, playlist_folder_mode: {track_info.get('_playlist_folder_mode', False) if track_info else False}")
# Update task with successful download info
with tasks_lock:
if task_id in download_tasks:
# PHASE 3: Final cancellation check after download started (GUI PARITY)
if download_tasks[task_id]['status'] == 'cancelled':
logger.warning(f"[Modal Worker] Task {task_id} cancelled after download {download_id} started - attempting to cancel download")
# Try to cancel the download immediately
try:
deps.run_async(deps.soulseek_client.cancel_download(download_id, username, remove=True))
logger.warning(f"Successfully cancelled active download {download_id}")
except Exception as cancel_error:
logger.error(f"Failed to cancel active download {download_id}: {cancel_error}")
# Free worker slot
if batch_id:
deps.on_download_completed(batch_id, task_id, success=False)
return False
# Store download information - use real download ID from soulseek_client
# CRITICAL FIX: Trust the download ID returned by soulseek_client.download()
download_tasks[task_id]['download_id'] = download_id
download_tasks[task_id]['username'] = username
download_tasks[task_id]['filename'] = filename
logger.info(f"[Modal Worker] Download started successfully for '{filename}'. Download ID: {download_id}")
return True # Success!
else:
logger.error(f"[Modal Worker] Failed to start download for '{filename}'")
# Reset status back to searching for next attempt
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'searching'
continue
except Exception as e:
import traceback
logger.error(f"[Modal Worker] Error attempting download for '{candidate.filename}': {e}")
traceback.print_exc()
# Reset status back to searching for next attempt
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'searching'
continue
# All candidates failed
logger.error(f"[Modal Worker] All {len(candidates)} candidates failed for '{track.name}'")
return False

@ -0,0 +1,415 @@
"""Tests for core/downloads/candidates.py — candidate fallback download logic."""
from __future__ import annotations
import threading
from dataclasses import dataclass
import pytest
from core.downloads import candidates as dc
from core.runtime_state import (
download_tasks,
matched_context_lock,
matched_downloads_context,
tasks_lock,
)
# ---------------------------------------------------------------------------
# Fixtures + fakes
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def reset_state():
download_tasks.clear()
matched_downloads_context.clear()
yield
download_tasks.clear()
matched_downloads_context.clear()
@dataclass
class _Candidate:
username: str = "user1"
filename: str = "song.flac"
confidence: float = 0.9
size: int = 1000
title: str = "Song"
artist: str = "Artist"
album: str = "Album"
@dataclass
class _Track:
name: str = "Song Title"
album: str = "Album Name"
artists: list = None
id: str = "spt-1"
def __post_init__(self):
if self.artists is None:
self.artists = ["Artist Name"]
class _FakeSoulseek:
def __init__(self, download_id="dl-1"):
self._download_id = download_id
self.download_calls = []
self.cancel_calls = []
async def download(self, username, filename, size):
self.download_calls.append((username, filename, size))
return self._download_id
async def cancel_download(self, download_id, username, remove=True):
self.cancel_calls.append((download_id, username, remove))
class _FakeSpotify:
def __init__(self, track_details=None):
self._track_details = track_details
def get_track_details(self, track_id):
return self._track_details
class _FakeDB:
def __init__(self, blacklisted=None):
self._blacklisted = blacklisted or set()
def is_blacklisted(self, username, filename):
return (username, filename) in self._blacklisted
def _run_async(coro):
"""Drive async functions synchronously for tests."""
import asyncio
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(coro)
finally:
loop.close()
def _build_deps(
*,
soulseek=None,
spotify=None,
db=None,
update_status=None,
on_complete=None,
):
deps = dc.CandidatesDeps(
soulseek_client=soulseek or _FakeSoulseek(),
spotify_client=spotify or _FakeSpotify(),
run_async=_run_async,
get_database=lambda: db or _FakeDB(),
update_task_status=update_status or (lambda task_id, status: None),
make_context_key=lambda u, f: f"{u}::{f}",
on_download_completed=on_complete or (lambda *a, **kw: None),
)
return deps
def _seed_task(task_id, *, status="pending", track_info=None, used_sources=None,
download_id=None):
download_tasks[task_id] = {
"status": status,
"track_info": track_info or {},
"used_sources": used_sources or set(),
"download_id": download_id,
}
# ---------------------------------------------------------------------------
# Happy path — first candidate succeeds
# ---------------------------------------------------------------------------
def test_first_candidate_starts_download_and_returns_true():
"""High-confidence candidate accepts → download_id stored, context populated, returns True."""
deps = _build_deps()
_seed_task("t1")
candidates = [_Candidate(filename="best.flac", confidence=0.95)]
track = _Track()
result = dc.attempt_download_with_candidates("t1", candidates, track, batch_id="b1", deps=deps)
assert result is True
assert deps.soulseek_client.download_calls == [("user1", "best.flac", 1000)]
assert download_tasks["t1"]["download_id"] == "dl-1"
assert "user1::best.flac" in matched_downloads_context
def test_candidates_tried_in_confidence_order():
"""Multiple candidates → tried highest-confidence first."""
deps = _build_deps()
_seed_task("t2")
candidates = [
_Candidate(filename="low.flac", confidence=0.5),
_Candidate(filename="high.flac", confidence=0.95),
_Candidate(filename="mid.flac", confidence=0.7),
]
track = _Track()
dc.attempt_download_with_candidates("t2", candidates, track, batch_id=None, deps=deps)
# First call should be the highest-confidence one
assert deps.soulseek_client.download_calls[0][1] == "high.flac"
# ---------------------------------------------------------------------------
# used_sources dedupe
# ---------------------------------------------------------------------------
def test_already_tried_source_skipped():
"""Source in used_sources is skipped (no duplicate download attempt)."""
deps = _build_deps()
_seed_task("t3", used_sources={"user1_already.flac"})
candidates = [
_Candidate(filename="already.flac", confidence=0.9),
_Candidate(filename="fresh.flac", confidence=0.85),
]
track = _Track()
dc.attempt_download_with_candidates("t3", candidates, track, batch_id=None, deps=deps)
# First candidate skipped (already used), second one tried
assert len(deps.soulseek_client.download_calls) == 1
assert deps.soulseek_client.download_calls[0][1] == "fresh.flac"
# ---------------------------------------------------------------------------
# Blacklist
# ---------------------------------------------------------------------------
def test_blacklisted_source_skipped():
"""Blacklisted candidate is skipped."""
db = _FakeDB(blacklisted={("user1", "blacklisted.flac")})
deps = _build_deps(db=db)
_seed_task("t4")
candidates = [
_Candidate(filename="blacklisted.flac", confidence=0.95),
_Candidate(filename="ok.flac", confidence=0.85),
]
track = _Track()
dc.attempt_download_with_candidates("t4", candidates, track, batch_id=None, deps=deps)
assert deps.soulseek_client.download_calls[0][1] == "ok.flac"
# ---------------------------------------------------------------------------
# Cancellation paths
# ---------------------------------------------------------------------------
def test_cancellation_before_attempt_returns_false():
"""status=cancelled at top of loop → return False, no download attempted."""
deps = _build_deps()
_seed_task("t5", status="cancelled")
candidates = [_Candidate()]
track = _Track()
result = dc.attempt_download_with_candidates("t5", candidates, track, batch_id=None, deps=deps)
assert result is False
assert deps.soulseek_client.download_calls == []
def test_task_deleted_returns_false():
"""Task removed from download_tasks mid-loop → return False."""
deps = _build_deps()
# Task NOT seeded — looks deleted
candidates = [_Candidate()]
track = _Track()
result = dc.attempt_download_with_candidates("missing", candidates, track, batch_id=None, deps=deps)
assert result is False
def test_active_download_id_skips_new_download():
"""If task already has download_id, candidate skipped (race protection)."""
deps = _build_deps()
_seed_task("t6", download_id="existing-dl")
candidates = [_Candidate(), _Candidate(filename="other.flac")]
track = _Track()
dc.attempt_download_with_candidates("t6", candidates, track, batch_id=None, deps=deps)
# Both candidates skipped (download_id already present)
assert deps.soulseek_client.download_calls == []
def test_cancellation_after_download_starts_calls_cancel_and_lifecycle():
"""If task is cancelled after download_id assigned, cancel_download fires + on_complete(False)."""
completion_calls = []
deps = _build_deps(on_complete=lambda batch_id, task_id, success=None: completion_calls.append((batch_id, task_id, success)))
_seed_task("t7")
candidates = [_Candidate()]
track = _Track()
# Simulate cancel happening between download_id assignment and final lock check.
# update_task_status is the callback that runs RIGHT after the download starts.
# We use it to flip status to cancelled.
def cancel_mid_flight(task_id, status):
if status == "downloading":
with tasks_lock:
if task_id in download_tasks:
pass # status set legitimately
# No-op here; we'll cancel via the lock directly below
download_tasks[task_id]["status"] = "cancelled"
deps.update_task_status = cancel_mid_flight
result = dc.attempt_download_with_candidates("t7", candidates, track, batch_id="b7", deps=deps)
assert result is False
# cancel_download was called for the in-flight transfer
assert deps.soulseek_client.cancel_calls
# on_download_completed fired with success=False to free the worker slot
assert completion_calls == [("b7", "t7", False)]
# ---------------------------------------------------------------------------
# Failure path — all candidates exhausted
# ---------------------------------------------------------------------------
def test_all_candidates_failed_returns_false():
"""If soulseek_client.download returns None (failure) for all candidates, returns False."""
soulseek = _FakeSoulseek(download_id=None)
deps = _build_deps(soulseek=soulseek)
_seed_task("t8")
candidates = [_Candidate(filename="c1.flac"), _Candidate(filename="c2.flac")]
track = _Track()
result = dc.attempt_download_with_candidates("t8", candidates, track, batch_id=None, deps=deps)
assert result is False
# Both candidates were tried
assert len(soulseek.download_calls) == 2
def test_exception_during_download_continues_to_next_candidate():
"""An exception on one candidate → continue to the next."""
call_count = [0]
class _FlakySoulseek(_FakeSoulseek):
async def download(self, username, filename, size):
call_count[0] += 1
if call_count[0] == 1:
raise RuntimeError("network blip")
return "dl-2"
soulseek = _FlakySoulseek()
deps = _build_deps(soulseek=soulseek)
_seed_task("t9")
candidates = [_Candidate(filename="c1.flac"), _Candidate(filename="c2.flac")]
track = _Track()
result = dc.attempt_download_with_candidates("t9", candidates, track, batch_id=None, deps=deps)
assert result is True
assert download_tasks["t9"]["download_id"] == "dl-2"
# ---------------------------------------------------------------------------
# Context payload
# ---------------------------------------------------------------------------
def test_explicit_album_context_uses_real_album_data():
"""track_info with _is_explicit_album_download=True copies real album/artist context."""
deps = _build_deps()
explicit_album = {
"id": "alb-real",
"name": "Real Album",
"release_date": "2024-05-05",
"total_tracks": 12,
"total_discs": 2,
"album_type": "album",
"image_url": "http://img/a.jpg",
}
explicit_artist = {"id": "art-real", "name": "Real Artist"}
_seed_task("t10", track_info={
"_is_explicit_album_download": True,
"_explicit_album_context": explicit_album,
"_explicit_artist_context": explicit_artist,
"track_number": 5,
"disc_number": 2,
})
candidates = [_Candidate(filename="explicit.flac")]
track = _Track(album="Real Album", artists=["Real Artist"])
dc.attempt_download_with_candidates("t10", candidates, track, batch_id=None, deps=deps)
ctx = matched_downloads_context["user1::explicit.flac"]
assert ctx["spotify_album"]["id"] == "alb-real"
assert ctx["spotify_album"]["total_discs"] == 2
assert ctx["spotify_artist"]["id"] == "art-real"
assert ctx["is_album_download"] is True
def test_track_number_from_track_info_preferred_over_api():
"""track_number from track_info wins over track object and API."""
api_track = {"track_number": 99, "disc_number": 9, "album": {}}
deps = _build_deps(spotify=_FakeSpotify(track_details=api_track))
_seed_task("t11", track_info={"track_number": 5, "disc_number": 1})
candidates = [_Candidate()]
track = _Track()
dc.attempt_download_with_candidates("t11", candidates, track, batch_id=None, deps=deps)
enhanced = matched_downloads_context["user1::song.flac"]["original_search_result"]
assert enhanced["track_number"] == 5
assert enhanced["disc_number"] == 1
def test_api_backfills_album_context_when_missing():
"""When local album context is incomplete, Spotify API backfills release_date / album_type."""
api_track = {
"track_number": 7,
"disc_number": 1,
"album": {
"id": "alb-from-api",
"release_date": "2025-01-01",
"album_type": "album",
"total_tracks": 10,
"images": [{"url": "http://api/img.jpg"}],
},
}
deps = _build_deps(spotify=_FakeSpotify(track_details=api_track))
# No track_info track_number → triggers API fallback
_seed_task("t12")
candidates = [_Candidate()]
track = _Track(album="Album Name") # truthy but no detailed metadata locally
dc.attempt_download_with_candidates("t12", candidates, track, batch_id=None, deps=deps)
ctx = matched_downloads_context["user1::song.flac"]
# release_date defaults to '' in the fallback context, so backfill fires.
assert ctx["spotify_album"]["release_date"] == "2025-01-01"
# Note: id stays "from_sync_modal" because the fallback assigns a non-empty
# placeholder, and the backfill only fires when `not spotify_album_context.get('id')`.
# The current behavior is what production does — assertion documents that.
assert ctx["spotify_album"]["id"] == "from_sync_modal"
# ---------------------------------------------------------------------------
# Sort by confidence is stable for equal scores
# ---------------------------------------------------------------------------
def test_candidates_with_equal_confidence_both_tried():
"""Equal-confidence candidates are tried in their existing order."""
deps = _build_deps()
_seed_task("t13")
candidates = [
_Candidate(filename="a.flac", confidence=0.9),
_Candidate(filename="b.flac", confidence=0.9),
]
track = _Track()
dc.attempt_download_with_candidates("t13", candidates, track, batch_id=None, deps=deps)
# First one wins — second never tried because download succeeded
assert len(deps.soulseek_client.download_calls) == 1
assert deps.soulseek_client.download_calls[0][1] == "a.flac"

@ -20154,318 +20154,28 @@ def _download_track_worker(task_id, batch_id=None):
def _attempt_download_with_candidates(task_id, candidates, track, batch_id=None):
"""
Attempts to download with fallback candidate logic (matches GUI's retry_parallel_download_with_fallback).
Returns True if successful, False if all candidates fail.
"""
# Sort candidates by confidence (best first)
candidates.sort(key=lambda r: r.confidence, reverse=True)
with tasks_lock:
task = download_tasks.get(task_id)
if not task:
return False
used_sources = task.get('used_sources', set())
# Try each candidate until one succeeds (like GUI's fallback logic)
for candidate_index, candidate in enumerate(candidates):
# Check cancellation before each attempt
with tasks_lock:
if task_id not in download_tasks:
logger.info(f"[Modal Worker] Task {task_id} was deleted during candidate {candidate_index + 1}")
return False
if download_tasks[task_id]['status'] == 'cancelled':
logger.warning(f"[Modal Worker] Task {task_id} cancelled during candidate {candidate_index + 1}")
# Don't call _on_download_completed for cancelled tasks as it can stop monitoring
return False
download_tasks[task_id]['current_candidate_index'] = candidate_index
# Create source key to avoid duplicate attempts (like GUI)
source_key = f"{candidate.username}_{candidate.filename}"
if source_key in used_sources:
logger.info(f"[Modal Worker] Skipping already tried source: {source_key}")
continue
# Candidate fallback download logic lives in core/downloads/candidates.py.
from core.downloads import candidates as _downloads_candidates
# Blacklist check — skip sources the user has flagged as bad matches
try:
_bl_db = get_database()
if _bl_db.is_blacklisted(candidate.username, candidate.filename):
logger.info(f"[Modal Worker] Skipping blacklisted source: {source_key}")
continue
except Exception:
pass
# CRITICAL: Add source to used_sources IMMEDIATELY to prevent race conditions
# This must happen BEFORE starting download to prevent multiple retries from picking same source
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['used_sources'].add(source_key)
logger.info(f"[Modal Worker] Marked source as used before download attempt: {source_key}")
logger.info(f"[Modal Worker] Trying candidate {candidate_index + 1}/{len(candidates)}: {candidate.filename} (Confidence: {candidate.confidence:.2f})")
try:
# Update task status to downloading
_update_task_status(task_id, 'downloading')
# Prepare download - check if we have explicit album context from artist page
track_info = {}
with tasks_lock:
if task_id in download_tasks:
raw_track_info = download_tasks[task_id].get('track_info')
track_info = raw_track_info if isinstance(raw_track_info, dict) else {}
# Use explicit album/artist context if available (from artist album downloads)
has_explicit_context = track_info and track_info.get('_is_explicit_album_download', False)
if has_explicit_context:
# Use the real Spotify album/artist data from the UI
explicit_album = track_info.get('_explicit_album_context', {})
explicit_artist = track_info.get('_explicit_artist_context', {})
# Normalize artist context if it's a plain string (e.g. from wishlist spotify_data)
if isinstance(explicit_artist, str):
explicit_artist = {'name': explicit_artist}
spotify_artist_context = {
'id': explicit_artist.get('id', 'explicit_artist'),
'name': explicit_artist.get('name', track.artists[0] if track.artists else 'Unknown'),
'genres': explicit_artist.get('genres', [])
}
# Handle both image_url formats (direct string or images array)
album_image_url = None
if explicit_album.get('image_url'):
# Backend API returns image_url as direct string
album_image_url = explicit_album.get('image_url')
elif explicit_album.get('images'):
# Fallback: images array format from Spotify API
album_image_url = explicit_album.get('images', [{}])[0].get('url')
spotify_album_context = {
'id': explicit_album.get('id', 'explicit_album'),
'name': explicit_album.get('name', track.album),
'release_date': explicit_album.get('release_date', ''),
'image_url': album_image_url,
'total_tracks': explicit_album.get('total_tracks', 0),
'total_discs': explicit_album.get('total_discs', 1),
'album_type': explicit_album.get('album_type', 'album'),
'artists': explicit_album.get('artists', [{'name': spotify_artist_context.get('name', '')}])
}
logger.info(f"[Explicit Context] Using real album data: '{spotify_album_context['name']}' ({spotify_album_context['album_type']}, {spotify_album_context['total_discs']} disc(s))")
else:
# Fallback to generic context for playlists/wishlists
# Extract album metadata from track_info if available (discovery enriches tracks with full album objects)
fallback_album = track_info.get('album', {}) if track_info else {}
if isinstance(fallback_album, str):
fallback_album = {'name': fallback_album}
elif not isinstance(fallback_album, dict):
fallback_album = {}
fallback_image_url = None
fallback_images = fallback_album.get('images', [])
if fallback_album.get('image_url'):
fallback_image_url = fallback_album['image_url']
elif fallback_images and isinstance(fallback_images, list) and len(fallback_images) > 0:
fallback_image_url = fallback_images[0].get('url') if isinstance(fallback_images[0], dict) else None
spotify_artist_context = {'id': 'from_sync_modal', 'name': track.artists[0] if track.artists else 'Unknown', 'genres': []}
# Preserve album-level artists for consistent folder naming
_fallback_album_artists = fallback_album.get('artists', [])
if not _fallback_album_artists:
_fallback_album_artists = [{'name': track.artists[0]}] if track.artists else []
spotify_album_context = {
'id': fallback_album.get('id', 'from_sync_modal'),
'name': fallback_album.get('name', '') or track.album,
'release_date': fallback_album.get('release_date', ''),
'image_url': fallback_image_url,
'album_type': fallback_album.get('album_type', 'album'),
'total_tracks': fallback_album.get('total_tracks', 0),
'total_discs': fallback_album.get('total_discs', 1),
'artists': _fallback_album_artists
}
download_payload = candidate.__dict__
username = download_payload.get('username')
filename = download_payload.get('filename')
size = download_payload.get('size', 0)
if not username or not filename:
logger.error("[Modal Worker] Invalid candidate data: missing username or filename")
continue
# PROTECTION: Check if there's already an active download for this task
current_download_id = None
with tasks_lock:
if task_id in download_tasks:
current_download_id = download_tasks[task_id].get('download_id')
if current_download_id:
logger.info(f"[Modal Worker] Task {task_id} already has active download {current_download_id} - skipping new download attempt")
logger.info("[Modal Worker] This prevents race condition where multiple retries start overlapping downloads")
continue
# Initiate download
logger.info(f"[Modal Worker] Starting download: {username} / {os.path.basename(filename)}")
download_id = run_async(soulseek_client.download(username, filename, size))
if download_id:
# Store context for post-processing with complete Spotify metadata (GUI PARITY)
context_key = _make_context_key(username, filename)
with matched_context_lock:
# Create WebUI equivalent of GUI's SpotifyBasedSearchResult data structure
enhanced_payload = download_payload.copy()
# Extract clean Spotify metadata from track object (same as GUI)
has_clean_spotify_data = track and hasattr(track, 'name') and hasattr(track, 'album')
if has_clean_spotify_data:
# Use clean Spotify metadata (matches GUI's SpotifyBasedSearchResult)
enhanced_payload['spotify_clean_title'] = track.name
enhanced_payload['spotify_clean_album'] = track.album
enhanced_payload['spotify_clean_artist'] = track.artists[0] if track.artists else enhanced_payload.get('artist', '')
# Preserve all artists for metadata tagging
enhanced_payload['artists'] = [{'name': artist} for artist in track.artists] if track.artists else []
logger.info(f"[Context] Using clean Spotify metadata - Album: '{track.album}', Title: '{track.name}'")
# Get track_number and disc_number — prefer track data we already have,
# fall back to detailed API call only if needed
got_track_number = False
# 1. Try track_info (from frontend, has album track data)
tn = track_info.get('track_number', 0) if isinstance(track_info, dict) else 0
dn = track_info.get('disc_number', 1) if isinstance(track_info, dict) else 1
if tn and tn > 0:
enhanced_payload['track_number'] = tn
enhanced_payload['disc_number'] = dn
got_track_number = True
logger.info(f"[Context] Added track_number from track_info: {tn}, disc_number: {dn}")
# 2. Try the track object itself (from album tracks response)
if not got_track_number and hasattr(track, 'track_number') and track.track_number:
enhanced_payload['track_number'] = track.track_number
enhanced_payload['disc_number'] = getattr(track, 'disc_number', 1) or 1
got_track_number = True
logger.info(f"[Context] Added track_number from track object: {track.track_number}, disc_number: {enhanced_payload['disc_number']}")
# 3. Last resort — fetch from metadata source API
if not got_track_number and hasattr(track, 'id') and track.id:
try:
detailed_track = spotify_client.get_track_details(track.id)
if detailed_track and detailed_track.get('track_number'):
enhanced_payload['track_number'] = detailed_track['track_number']
enhanced_payload['disc_number'] = detailed_track.get('disc_number', 1)
got_track_number = True
logger.info(f"[Context] Added track_number from API: {detailed_track['track_number']}, disc_number: {enhanced_payload['disc_number']}")
# Backfill album metadata from detailed track when context
# has incomplete data (missing release_date, total_tracks, etc.)
if isinstance(detailed_track.get('album'), dict):
dt_album = detailed_track['album']
if not spotify_album_context.get('release_date') and dt_album.get('release_date'):
spotify_album_context['release_date'] = dt_album['release_date']
logger.info(f"[Context] Backfilled release_date from API: {dt_album['release_date']}")
if not spotify_album_context.get('album_type') and dt_album.get('album_type'):
spotify_album_context['album_type'] = dt_album['album_type']
if not spotify_album_context.get('total_tracks') and dt_album.get('total_tracks'):
spotify_album_context['total_tracks'] = dt_album['total_tracks']
if not spotify_album_context.get('id') and dt_album.get('id'):
spotify_album_context['id'] = dt_album['id']
if not spotify_album_context.get('image_url') and dt_album.get('images'):
spotify_album_context['image_url'] = dt_album['images'][0].get('url', '')
except Exception as e:
logger.error(f"[Context] API track details failed: {e}")
def _build_candidates_deps():
"""Build the CandidatesDeps bundle from web_server.py globals on each call."""
return _downloads_candidates.CandidatesDeps(
soulseek_client=soulseek_client,
spotify_client=spotify_client,
run_async=run_async,
get_database=get_database,
update_task_status=_update_task_status,
make_context_key=_make_context_key,
on_download_completed=_on_download_completed,
)
if not got_track_number:
enhanced_payload.setdefault('track_number', 0)
enhanced_payload.setdefault('disc_number', 1)
logger.warning("[Context] No track_number found from any source")
# Determine if this should be treated as album download
# First check if we have explicit album context from artist page
if has_explicit_context:
is_album_context = True
logger.info("[Context] Using explicit album context flag from artist page")
else:
# Fall back to guessing based on clean data
is_album_context = (
track.album and
track.album.strip() and
track.album != "Unknown Album" and
track.album.lower() != track.name.lower() # Album different from track
)
else:
# Fallback to original data
enhanced_payload['spotify_clean_title'] = enhanced_payload.get('title', '')
enhanced_payload['spotify_clean_album'] = enhanced_payload.get('album', '')
enhanced_payload['spotify_clean_artist'] = enhanced_payload.get('artist', '')
# Preserve existing artists array if available, otherwise create from single artist
if 'artists' not in enhanced_payload and enhanced_payload.get('artist'):
enhanced_payload['artists'] = [{'name': enhanced_payload['artist']}]
enhanced_payload['track_number'] = track_info.get('track_number', 1) # Fallback when no clean Spotify data
is_album_context = False
logger.warning(f"[Context] Using fallback data - no clean Spotify metadata available, track_number={enhanced_payload['track_number']}")
matched_downloads_context[context_key] = {
"spotify_artist": spotify_artist_context,
"spotify_album": spotify_album_context,
"original_search_result": enhanced_payload,
"is_album_download": is_album_context, # Critical fix: Use actual album context
"has_clean_spotify_data": has_clean_spotify_data, # Flag for post-processing
"task_id": task_id, # Add task_id for completion callbacks
"batch_id": batch_id, # Add batch_id for completion callbacks
"track_info": track_info, # Add track_info for playlist folder mode
"_download_username": username, # Source username for AcoustID skip logic
}
logger.info(f"[Context] Set is_album_download: {is_album_context} (has clean data: {has_clean_spotify_data})")
logger.debug(f"[Debug] Context creation - track_info: {track_info is not None}, playlist_folder_mode: {track_info.get('_playlist_folder_mode', False) if track_info else False}")
# Update task with successful download info
with tasks_lock:
if task_id in download_tasks:
# PHASE 3: Final cancellation check after download started (GUI PARITY)
if download_tasks[task_id]['status'] == 'cancelled':
logger.warning(f"[Modal Worker] Task {task_id} cancelled after download {download_id} started - attempting to cancel download")
# Try to cancel the download immediately
try:
run_async(soulseek_client.cancel_download(download_id, username, remove=True))
logger.warning(f"Successfully cancelled active download {download_id}")
except Exception as cancel_error:
logger.error(f"Failed to cancel active download {download_id}: {cancel_error}")
# Free worker slot
if batch_id:
_on_download_completed(batch_id, task_id, success=False)
return False
# Store download information - use real download ID from soulseek_client
# CRITICAL FIX: Trust the download ID returned by soulseek_client.download()
download_tasks[task_id]['download_id'] = download_id
download_tasks[task_id]['username'] = username
download_tasks[task_id]['filename'] = filename
logger.info(f"[Modal Worker] Download started successfully for '{filename}'. Download ID: {download_id}")
return True # Success!
else:
logger.error(f"[Modal Worker] Failed to start download for '{filename}'")
# Reset status back to searching for next attempt
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'searching'
continue
except Exception as e:
import traceback
logger.error(f"[Modal Worker] Error attempting download for '{candidate.filename}': {e}")
traceback.print_exc()
# Reset status back to searching for next attempt
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'searching'
continue
def _attempt_download_with_candidates(task_id, candidates, track, batch_id=None):
return _downloads_candidates.attempt_download_with_candidates(
task_id, candidates, track, batch_id, _build_candidates_deps()
)
# All candidates failed
logger.error(f"[Modal Worker] All {len(candidates)} candidates failed for '{track.name}'")
return False
# ── Staging folder match cache (per-batch, avoids re-scanning for every track) ──
_staging_cache = {} # batch_id -> list of {full_path, title, artist, album, extension}

Loading…
Cancel
Save