|
|
"""Per-task download worker.
|
|
|
|
|
|
Runs as a background thread (one per task) that:
|
|
|
1. Tries source-reuse (use the batch's last good slskd peer if available)
|
|
|
2. Tries staging-match (file already in staging folder, no download needed)
|
|
|
3. Generates smart search queries via the matching engine + legacy fallbacks
|
|
|
4. Iterates queries sequentially against the soulseek client
|
|
|
5. For each query: validates results, attempts download with fallback candidates
|
|
|
6. If hybrid mode: falls back to remaining sources (youtube/tidal/qobuz/hifi/deezer_dl)
|
|
|
7. On total failure: marks task not_found + records search diagnostics
|
|
|
8. On any uncaught exception: marks failed + emergency worker-slot recovery
|
|
|
|
|
|
Lifted verbatim from web_server.py's `_download_track_worker`. The helpers
|
|
|
this calls into (try_source_reuse, store_batch_source, try_staging_match,
|
|
|
get_valid_candidates, attempt_download_with_candidates, on_download_completed,
|
|
|
recover_worker_slot) are passed via `TaskWorkerDeps` since each is itself
|
|
|
a large web_server.py helper that will get its own lift in subsequent PRs.
|
|
|
"""
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
import re
|
|
|
import traceback
|
|
|
from dataclasses import dataclass
|
|
|
from typing import Any, Callable, Optional
|
|
|
|
|
|
from core.runtime_state import download_batches, download_tasks, tasks_lock
|
|
|
from core.spotify_client import Track as SpotifyTrack
|
|
|
from utils.logging_config import get_logger
|
|
|
|
|
|
# Module logger. The name must end up under the soulsync.* namespace because
# handlers only attach there (get_logger is assumed to apply that prefix —
# confirm in utils.logging_config). The old bare getLogger(__name__)
# ("core.downloads.task_worker") had no handler, so the entire [Modal Worker]
# story — search queries, retry walks, candidate decisions — never reached app.log.
logger = get_logger("downloads.task_worker")
|
|
|
|
|
|
|
|
|
def _resolve_worker_source(username):
    """Map a candidate's username to its logical download-source bucket.

    Every Soulseek peer collapses to ``'soulseek'``; streaming sources keep
    their own name. The work is delegated to the monitor's resolver, which is
    imported at call time to avoid an import cycle. Any failure, whether in
    the import or in the lookup, yields ``'soulseek'``.
    """
    fallback_bucket = 'soulseek'
    try:
        from core.downloads.monitor import _resolve_download_source as resolve
        bucket = resolve(username)
    except Exception:
        return fallback_bucket
    return bucket
|
|
|
|
|
|
|
|
|
def _cand_user_file(candidate):
    """Return ``(username, filename)`` for a search candidate.

    The candidate may be a plain dict (tests, cached raw rows) or an object
    exposing ``username`` / ``filename`` attributes, such as a TrackResult.
    A missing field comes back as ``None``.
    """
    if isinstance(candidate, dict):
        read = candidate.get
    else:
        def read(field):
            return getattr(candidate, field, None)
    return read('username'), read('filename')
|
|
|
|
|
|
|
|
|
def _try_cached_candidates(task_id, batch_id, track, deps):
    """Cached-first path for a quarantine retry: reuse already-found candidates.

    A verified-bad file means the transfer itself worked and only the content
    was wrong or broken, so the next-best pick is usually already stored in
    the task's ``cached_candidates``. A candidate is dropped when:

    - it lacks a username or filename,
    - its ``"{username}_{filename}"`` key is in ``used_sources``, or
    - its resolved source is in ``exhausted_download_sources``.

    The survivors are handed to ``deps.attempt_download_with_candidates``.

    Returns that call's result. Returns False when the task is gone or no
    candidate survives, so the caller falls through to a fresh search.
    """
    with tasks_lock:
        task = download_tasks.get(task_id)
        if not task:
            return False
        cached = list(task.get('cached_candidates') or [])
        already_used = set(task.get('used_sources') or ())
        spent_sources = {str(name).lower() for name in (task.get('exhausted_download_sources') or ())}

    def _eligible(candidate):
        # Checks run cheapest-first; the source resolver is only consulted
        # for candidates that passed the identity and already-used checks.
        username, filename = _cand_user_file(candidate)
        if not (username and filename):
            return False
        if f"{username}_{filename}" in already_used:
            return False
        return _resolve_worker_source(username).lower() not in spent_sources

    remaining = [candidate for candidate in cached if _eligible(candidate)]
    if not remaining:
        return False

    logger.info(
        f"[Modal Worker] Quarantine retry: trying {len(remaining)} cached "
        f"candidate(s) before re-searching (task {task_id})"
    )
    return deps.attempt_download_with_candidates(task_id, remaining, track, batch_id)
|
|
|
|
|
|
|
|
|
def _private_album_bundle_staging_miss_reason(batch_id: Optional[str], deps: Any) -> Optional[str]:
    """Explain why per-track search should stop for this batch, or return None.

    Torrent and usenet album batches download one private staged release up
    front. Each track then claims its file from that release. If a claim
    fails after the release is staged, the normal per-track search can only
    retry release-level sources, N times over, and keeps re-adding the same
    torrent/NZB. For those two sources the staged release is therefore
    treated as authoritative for this pass.

    Soulseek is deliberately not short-circuited. A Soulseek album bundle
    stages the single best-scoring folder. ``album_bundle_partial`` only
    reflects whether the files found IN that folder downloaded, not whether
    the folder held every track the album needs. Short-circuiting would mark
    a track missing from that folder not_found with no fallback (#743).
    Soulseek's per-track search is a genuine per-file network search that
    does not re-add a release. Falling through to it (and, in hybrid mode,
    on to the next source) is therefore correct and cheap.

    Returns the user-facing miss reason only when all of these hold:

    - the batch is a fully staged, non-partial private torrent/usenet bundle;
    - that source is the active download mode, or, in hybrid mode, the first
      source in ``hybrid_order`` (``hybrid_primary`` when no order is set).
    """
    batch = download_batches.get(batch_id) if batch_id else None
    if not isinstance(batch, dict):
        return None

    orchestrator = deps.download_orchestrator
    source = (batch.get('album_bundle_source') or '').lower()
    mode = (getattr(orchestrator, 'mode', '') or '').lower()

    if mode == 'hybrid':
        order = getattr(orchestrator, 'hybrid_order', None) or []
        leading = order[0] if order else getattr(orchestrator, 'hybrid_primary', '')
        hybrid_first = str(leading or '').lower()
    else:
        hybrid_first = ''

    fully_staged = (
        batch.get('album_bundle_private_staging')
        and batch.get('album_bundle_state') == 'staged'
        and not batch.get('album_bundle_partial')
    )
    if not fully_staged or source not in ('torrent', 'usenet'):
        return None

    source_is_active = mode == source or (mode == 'hybrid' and hybrid_first == source)
    if not source_is_active:
        return None
    return f'Track was not found in the staged {source} album release'
|
|
|
|
|
|
|
|
|
@dataclass
class TaskWorkerDeps:
    """Bundle of cross-cutting deps the per-task download worker needs.

    Everything here is injected by the caller, so this module never imports
    these helpers directly. The trailing comment on each callable field is
    the argument list this module calls it with.
    """
    # Exposes ``mode``, ``search(query, timeout=..., exclude_sources=...)``
    # and ``client(name)``. Optional ``hybrid_order`` / ``hybrid_primary`` /
    # ``hybrid_secondary`` are read via getattr.
    download_orchestrator: Any
    # Must provide ``generate_download_queries(track)``.
    matching_engine: Any
    # Called with the object returned by ``...search(...)`` (presumably a
    # coroutine — confirm) and must return its result synchronously.
    run_async: Callable
    # Truthy result means the task was handled; the worker then stops.
    try_source_reuse: Callable  # (task_id, batch_id, track) -> bool
    # Remembers the peer/file a task used so later tasks in the batch can reuse it.
    store_batch_source: Callable  # (batch_id, username, filename) -> None
    # Truthy result means the file was found in staging; the worker then stops.
    try_staging_match: Callable  # (task_id, batch_id, track) -> bool
    # Filters raw search results down to acceptable candidates.
    get_valid_candidates: Callable  # (results, spotify_track, query) -> list
    # Truthy result means a download was started from one of the candidates.
    attempt_download_with_candidates: Callable  # (task_id, candidates, track, batch_id) -> bool
    # Batch-level completion notification; the worker only ever passes success=False.
    on_download_completed: Callable  # (batch_id, task_id, success) -> None
    # Last-resort worker-slot release, used when on_download_completed raises
    # inside the crash handler.
    recover_worker_slot: Callable  # (batch_id, task_id) -> None
    # Optional. Only consulted on a quarantine retry that found nothing new;
    # a truthy result ends the worker without marking the task not_found.
    try_version_mismatch_fallback: Optional[Callable] = None  # (title, artist, task_id, batch_id) -> bool
|
|
|
|
|
|
|
|
|
# Per-source clients the hybrid fallback may query directly, resolved through
# ``orchestrator.client(name)``.
_HYBRID_FALLBACK_SOURCES = (
    'soulseek', 'youtube', 'tidal', 'qobuz',
    'hifi', 'deezer_dl', 'lidarr', 'soundcloud', 'amazon',
)


def _task_gone_or_cancelled(task_id: str) -> bool:
    """Return True when *task_id* is no longer in ``download_tasks`` or its status is 'cancelled'."""
    with tasks_lock:
        task_state = download_tasks.get(task_id)
        return task_state is None or task_state.get('status') == 'cancelled'


def _remember_batch_source(task_id: str, batch_id: Optional[str], deps: TaskWorkerDeps) -> None:
    """Record the source a task just started downloading from, for batch reuse.

    Reads the ``username`` / ``filename`` written onto the task by the
    download path and passes them to ``deps.store_batch_source``, so the next
    worker in the batch can try that source first (cascading reuse).

    Does nothing when there is no batch or when either field is missing.
    """
    if not batch_id:
        return
    with tasks_lock:
        task_state = download_tasks.get(task_id, {})
        used_filename = task_state.get('filename')
        used_username = task_state.get('username')
    if used_filename and used_username:
        deps.store_batch_source(batch_id, used_username, used_filename)


def _build_spotify_track(track_data: dict) -> SpotifyTrack:
    """Rebuild a SpotifyTrack from a task's ``track_info`` dict for the matching engine.

    ``artists`` entries may be plain strings or Spotify-API dicts with a
    ``name`` key; anything else is stringified. ``album`` may be a dict with a
    ``name`` key or a string; anything else is stringified.
    """
    processed_artists = []
    for artist in track_data.get('artists') or []:
        if isinstance(artist, str):
            processed_artists.append(artist)
        elif isinstance(artist, dict) and 'name' in artist:
            processed_artists.append(artist['name'])
        else:
            processed_artists.append(str(artist))

    raw_album = track_data.get('album', '')
    if isinstance(raw_album, dict) and 'name' in raw_album:
        album_name = raw_album['name']
    elif isinstance(raw_album, str):
        album_name = raw_album
    else:
        album_name = str(raw_album)

    return SpotifyTrack(
        id=track_data.get('id', ''),
        name=track_data.get('name', ''),
        artists=processed_artists,
        album=album_name,
        duration_ms=track_data.get('duration_ms', 0),
        popularity=track_data.get('popularity', 0),
    )


def _build_search_queries(track: SpotifyTrack, deps: TaskWorkerDeps) -> list:
    """Return the ordered search queries for *track*, de-duplicated case-insensitively.

    The first spelling of each query wins. Order:

    1. ``deps.matching_engine.generate_download_queries(track)``.
    2. Legacy fallbacks (like the GUI):
       - title plus the first artist word, skipping a leading "The" and
         ignoring one-character words;
       - the bare title;
       - the title with ``(...)`` / ``[...]`` groups stripped, when that differs.
    3. Torrent/usenet mode only: an "artist album" release query. Torrent and
       usenet can fall back to a full album release for a single-track
       request, but trying the album first would make playlist batches pull
       whole albums. It goes last so singles stay light when the indexer has
       a direct result.
    """
    artist_name = track.artists[0] if track.artists else None
    track_name = track.name

    release_queries = []
    try:
        download_mode = (getattr(deps.download_orchestrator, 'mode', '') or '').lower()
        track_album = (getattr(track, 'album', '') or '').strip()
        track_title = (getattr(track, 'name', '') or '').strip()
        track_artists = list(getattr(track, 'artists', []) or [])
        first_artist = track_artists[0] if track_artists else ''
        primary_artist = (
            (first_artist.get('name', '') if isinstance(first_artist, dict) else str(first_artist))
            or ''
        ).strip()
        if (
            download_mode in ('torrent', 'usenet')
            and primary_artist
            and track_album
            and track_album.lower() not in ('unknown album', track_title.lower())
        ):
            release_queries.append(f"{primary_artist} {track_album}".strip())
    except Exception as release_query_exc:
        logger.debug("[Modal Worker] release query hint failed: %s", release_query_exc)

    engine_queries = deps.matching_engine.generate_download_queries(track) or ()

    legacy_queries = []
    if artist_name:
        artist_words = artist_name.split()
        if artist_words:
            first_word = artist_words[0]
            if first_word.lower() == 'the' and len(artist_words) > 1:
                first_word = artist_words[1]
            if len(first_word) > 1:
                legacy_queries.append(f"{track_name} {first_word}".strip())

    if track_name.strip():
        legacy_queries.append(track_name.strip())

    cleaned_name = re.sub(r'\s*\([^)]*\)', '', track_name).strip()
    cleaned_name = re.sub(r'\s*\[[^\]]*\]', '', cleaned_name).strip()
    if cleaned_name and cleaned_name.lower() != track_name.lower():
        legacy_queries.append(cleaned_name.strip())

    unique_queries = []
    seen = set()
    for query in [*engine_queries, *legacy_queries, *release_queries]:
        if query and query.lower() not in seen:
            unique_queries.append(query)
            seen.add(query.lower())
    return unique_queries


def _search_exclusions(batch_id: Optional[str], deps: TaskWorkerDeps, exhausted_sources: list) -> list:
    """Return the sources the orchestrator search must skip for this task.

    Starts from the sources whose per-source quarantine-retry budget is spent.

    Hybrid + album-download batches also skip torrent/usenet. These are
    release-level sources that can't match individual tracks meaningfully,
    and album-bundle handling only fires in single-source mode (see
    core/downloads/master.py). Without the exclusion, each track would
    trigger a redundant Prowlarr search that downloads the same album torrent
    and relies on the auto-import sweep to clean up. With it, the hybrid chain
    falls through to per-track-capable sources (soulseek / streaming).

    Both inputs are fixed for the lifetime of one worker run, so this is
    computed once rather than per query.
    """
    exclude_sources = list(exhausted_sources)
    try:
        batch = download_batches.get(batch_id) if batch_id else None
        batch_is_album = isinstance(batch, dict) and bool(batch.get('is_album_download'))
        if batch_is_album and getattr(deps.download_orchestrator, 'mode', '') == 'hybrid':
            exclude_sources.extend(['torrent', 'usenet'])
    except Exception as filter_err:
        logger.debug("[Modal Worker] album-source-exclusion check failed: %s", filter_err)
    return exclude_sources


def _try_hybrid_fallback(
    task_id: str,
    batch_id: Optional[str],
    track: SpotifyTrack,
    search_queries: list,
    exhausted_sources: list,
    deps: TaskWorkerDeps,
) -> bool:
    """Query the remaining hybrid sources directly after the main search found nothing usable.

    The orchestrator's hybrid search stops at the first source that returns
    results, even when all of them then fail quality filtering. This helper
    walks ``hybrid_order[1:]`` (or primary/secondary when no order is set).
    It skips sources with no client, unconfigured clients and
    budget-exhausted sources, and tries the first two queries on each
    remaining source. We can't tell which source the orchestrator stopped at,
    so every source after the first is retried; re-querying one it already
    fell through to is harmless (streaming sources return fast).

    Returns True when the worker should stop: either a download was started
    or the task was deleted/cancelled meanwhile. Errors are logged, never
    raised.
    """
    try:
        orch = deps.download_orchestrator
        hybrid_order = getattr(orch, 'hybrid_order', None) or []
        if not hybrid_order:
            primary = getattr(orch, 'hybrid_primary', 'soulseek')
            secondary = getattr(orch, 'hybrid_secondary', '')
            hybrid_order = [primary, secondary] if secondary and secondary != primary else [primary]

        # Resolve via the orchestrator's generic accessor. The legacy
        # per-source attrs were dropped in the registry refactor, so
        # getattr(orch, 'soulseek', None) etc. silently returned None.
        source_clients = {name: orch.client(name) for name in _HYBRID_FALLBACK_SOURCES}

        exhausted_lower = {s.lower() for s in exhausted_sources}
        remaining_sources = [
            s for s in hybrid_order[1:]
            if s in source_clients and source_clients[s]
            and s.lower() not in exhausted_lower
        ]
        if remaining_sources:
            logger.warning(f"[Hybrid Fallback] Primary source had no valid matches. Trying fallback sources: {remaining_sources}")

        for fallback_source in remaining_sources:
            fb_client = source_clients[fallback_source]
            if hasattr(fb_client, 'is_configured') and not fb_client.is_configured():
                continue

            # Use first 2 queries only for speed
            for fb_query in search_queries[:2]:
                # Each fallback search can take up to 20s. If the task was
                # cancelled or deleted in the meantime, stop instead of
                # starting a download anyway.
                if _task_gone_or_cancelled(task_id):
                    logger.info(f"[Hybrid Fallback] Task {task_id} cancelled or removed - stopping fallback")
                    return True
                try:
                    logger.warning(f"[Hybrid Fallback] Trying {fallback_source}: '{fb_query}'")
                    fb_results, _ = deps.run_async(fb_client.search(fb_query, timeout=20))
                    if not fb_results:
                        continue
                    fb_candidates = deps.get_valid_candidates(fb_results, track, fb_query)
                    if fb_candidates:
                        logger.warning(f"[Hybrid Fallback] {fallback_source} found {len(fb_candidates)} valid candidates!")
                        with tasks_lock:
                            task_state = download_tasks.get(task_id)
                            if task_state is None or task_state.get('status') == 'cancelled':
                                return True
                            task_state['cached_candidates'] = fb_candidates
                        if deps.attempt_download_with_candidates(task_id, fb_candidates, track, batch_id):
                            return True
                except Exception as e:
                    logger.error(f"[Hybrid Fallback] {fallback_source} search failed: {e}")
                    continue

            logger.warning(f"[Hybrid Fallback] {fallback_source} returned no valid candidates")

    except Exception as e:
        logger.error(f"[Hybrid Fallback] Error in fallback logic: {e}")
    return False


def download_track_worker(task_id: str, batch_id: Optional[str], deps: TaskWorkerDeps) -> None:
    """Search for and start the download of one task's track (thread entry point).

    Mirrors the GUI's retry logic, in this order: source reuse, staging match,
    cached-first quarantine retry, sequential query search with fallback
    candidates and, in hybrid mode, a direct walk of the remaining sources.

    The outcome is reported through ``download_tasks[task_id]`` and the
    ``deps`` callbacks; the function always returns None:

    * download started: the monitoring system owns completion from here;
    * task deleted or cancelled: return without touching its status. The one
      exception is a cancelled legacy batch task (no ``playlist_id``) that is
      noticed before starting: it still gets
      ``deps.on_download_completed(batch_id, task_id, False)``;
    * nothing matched: status ``not_found`` with a per-query breakdown, then
      ``on_download_completed(batch_id, task_id, False)`` for batch tasks;
    * unexpected exception: status ``failed``, then completion callback or
      ``deps.recover_worker_slot`` as a last resort.
    """
    track_name = 'unknown'  # reported by the crash handler if we fail early
    try:
        # Retrieve task details from global state
        with tasks_lock:
            if task_id not in download_tasks:
                logger.warning(f"[Modal Worker] Task {task_id} not found in download_tasks")
                return
            task = download_tasks[task_id].copy()

        # Cancellation Checkpoint 1: Before doing anything
        notify_legacy_cancel = False
        with tasks_lock:
            if task_id not in download_tasks:
                logger.info(f"[Modal Worker] Task {task_id} was deleted before starting")
                return
            if download_tasks[task_id]['status'] == 'cancelled':
                logger.warning(f"[Modal Worker] Task {task_id} cancelled before starting")
                # V2 tasks (those with a playlist_id): the atomic cancel
                # function already freed the worker slot, so don't call
                # on_download_completed.
                if download_tasks[task_id].get('playlist_id'):
                    logger.warning(f"[Modal Worker] V2 task {task_id} cancelled - worker slot already freed by V2 system")
                    return
                if not batch_id:
                    # Standalone task: no batch to notify, but a cancelled
                    # task must never go on to search and download.
                    return
                logger.warning(f"[Modal Worker] Legacy task {task_id} cancelled - using legacy completion callback")
                notify_legacy_cancel = True
        if notify_legacy_cancel:
            # Called after releasing tasks_lock so the callback never runs
            # while this thread holds the lock.
            deps.on_download_completed(batch_id, task_id, False)
            return

        track_data = task['track_info']
        track_name = track_data.get('name', 'Unknown Track')
        logger.info(f"[Modal Worker] Task {task_id} starting search for track: '{track_name}'")

        track = _build_spotify_track(track_data)
        artist_name = track.artists[0] if track.artists else None
        logger.info(f"[Modal Worker] Starting download task for: {track.name} by {track.artists[0] if track.artists else 'Unknown'}")

        # === SOURCE REUSE: Check batch's last good source before searching ===
        if deps.try_source_reuse(task_id, batch_id, track):
            _remember_batch_source(task_id, batch_id, deps)  # cascading reuse
            return

        # === STAGING CHECK: Check staging folder for existing file before searching ===
        if deps.try_staging_match(task_id, batch_id, track):
            return
        staging_miss_reason = _private_album_bundle_staging_miss_reason(batch_id, deps)
        if staging_miss_reason:
            logger.warning(
                "[Modal Worker] %s for '%s'; skipping redundant per-track %s search",
                staging_miss_reason,
                track.name,
                getattr(deps.download_orchestrator, 'mode', 'release-source'),
            )
            with tasks_lock:
                if task_id in download_tasks:
                    download_tasks[task_id]['status'] = 'not_found'
                    download_tasks[task_id]['error_message'] = staging_miss_reason
            if batch_id:
                deps.on_download_completed(batch_id, task_id, False)
            return

        # Initialize task state tracking (like GUI's parallel_search_tracking)
        with tasks_lock:
            if task_id in download_tasks:
                task_state = download_tasks[task_id]
                task_state['status'] = 'searching'  # Now actively being processed
                task_state['current_query_index'] = 0
                task_state['current_candidate_index'] = 0
                task_state['retry_count'] = 0
                task_state['candidates'] = []
                # Preserve used_sources from previous attempts: on a retry the
                # monitor has already marked failed hosts there.
                task_state.setdefault('used_sources', set())

        # The monitor sets ``_quarantine_retry`` when a verified-bad file is
        # re-queued. The connection was fine, only the content was wrong, so
        # the already-found candidates are walked before re-searching. Any
        # other entry (fresh download, dead-connection/stuck retry) starts a
        # new search generation and forgets which queries already ran.
        with tasks_lock:
            _t = download_tasks.get(task_id, {})
            is_quarantine_retry = bool(_t.pop('_quarantine_retry', False))
            if not is_quarantine_retry:
                _t.pop('searched_queries', None)

        if is_quarantine_retry and _try_cached_candidates(task_id, batch_id, track, deps):
            _remember_batch_source(task_id, batch_id, deps)
            return

        # 1. Generate multiple search queries (like GUI's generate_smart_search_queries)
        search_queries = _build_search_queries(track, deps)
        # Expose the query count so the quarantine-retry budget (exhaustive
        # mode) can size each source's budget as query_count × retries_per_query.
        with tasks_lock:
            if task_id in download_tasks:
                download_tasks[task_id]['query_count'] = len(search_queries)
        logger.info(f"[Modal Worker] Generated {len(search_queries)} smart search queries for '{track.name}': {search_queries}")
        logger.info(f"[Modal Worker] About to start search loop for task {task_id} (track: '{track.name}')")

        # 2. Sequential Query Search (matches GUI's start_search_worker_parallel logic)
        search_diagnostics = []  # per-query outcome, used in the not_found error message
        all_raw_results = []  # raw results across queries for the candidate review modal

        # ``exhausted_download_sources``: sources whose per-source quarantine
        # budget is spent (see monitor.requeue_quarantined_task_for_retry).
        # They are excluded so the chain falls through to the next source
        # instead of re-fetching an exhausted one.
        #
        # On a quarantine retry a source is NOT excluded just because it was
        # searched once. The first run only ran ONE query before starting a
        # download, so later queries may still surface the correct upload.
        # Instead, only the queries that already ran (``searched_queries``)
        # are skipped; their candidates were walked by the cached-first path.
        with tasks_lock:
            _t = download_tasks.get(task_id, {})
            exhausted_sources = [str(s) for s in (_t.get('exhausted_download_sources') or ())]
            already_searched = set(_t.get('searched_queries') or ()) if is_quarantine_retry else set()
        exclude_sources = _search_exclusions(batch_id, deps, exhausted_sources)

        for query_index, query in enumerate(search_queries):
            # Cancellation check before each query
            with tasks_lock:
                if task_id not in download_tasks:
                    logger.debug(f"[Modal Worker] Task {task_id} was deleted during query {query_index + 1}")
                    return
                if download_tasks[task_id]['status'] == 'cancelled':
                    logger.debug(f"[Modal Worker] Task {task_id} cancelled during query {query_index + 1}")
                    # Don't call on_download_completed for cancelled tasks as it can stop monitoring
                    return
                download_tasks[task_id]['current_query_index'] = query_index

            # Skip a query that already ran last generation. Its candidates
            # were served from cache above; re-running it is the wasted repeat
            # the cached-first path exists to avoid.
            if is_quarantine_retry and query in already_searched:
                logger.debug(
                    f"[Modal Worker] Skipping already-searched query '{query}' "
                    f"(candidates served from cache) for task {task_id}"
                )
                continue

            logger.debug(f"[Modal Worker] Query {query_index + 1}/{len(search_queries)}: '{query}'")
            logger.debug(f"About to call soulseek search for task {task_id}")

            try:
                tracks_result, _ = deps.run_async(deps.download_orchestrator.search(
                    query, timeout=30, exclude_sources=exclude_sources or None,
                ))
                logger.debug(f"Search completed for task {task_id}, got {len(tracks_result) if tracks_result else 0} results")

                # Check cancellation immediately after search returns
                with tasks_lock:
                    if task_id not in download_tasks:
                        logger.info(f"[Modal Worker] Task {task_id} was deleted after search returned")
                        return
                    # Remember this query ran so a later quarantine retry skips
                    # it. Recorded even with zero results: re-running the same
                    # query won't return anything new.
                    searched = download_tasks[task_id].get('searched_queries')
                    if not isinstance(searched, set):
                        searched = set()
                    searched.add(query)
                    download_tasks[task_id]['searched_queries'] = searched
                    if download_tasks[task_id]['status'] == 'cancelled':
                        logger.warning(f"[Modal Worker] Task {task_id} cancelled after search returned - ignoring results")
                        # The cancellation endpoint already handles batch management
                        return

                if not tracks_result:
                    search_diagnostics.append(f'"{query}": no results found')
                    continue

                result_count = len(tracks_result)
                # Validate candidates using GUI's get_valid_candidates logic
                candidates = deps.get_valid_candidates(tracks_result, track, query)
                if not candidates:
                    search_diagnostics.append(f'"{query}": {result_count} results but none passed quality/artist filters')
                    # Strip SoundCloud preview snippets before caching for the
                    # review modal: a 30s preview clip isn't a useful pick,
                    # and clicking one bypasses validation and downloads it.
                    from core.downloads.validation import filter_soundcloud_previews
                    all_raw_results.extend(filter_soundcloud_previews(tracks_result[:20], track))
                    continue

                logger.debug(f"[Modal Worker] Found {len(candidates)} valid candidates for query '{query}'")
                with tasks_lock:
                    if task_id not in download_tasks:
                        logger.info(f"[Modal Worker] Task {task_id} was deleted before processing candidates")
                        return
                    if download_tasks[task_id]['status'] == 'cancelled':
                        logger.warning(f"[Modal Worker] Task {task_id} cancelled before processing candidates")
                        return
                    # Store candidates for retry fallback (like GUI); a later
                    # quarantine retry walks these via the cached-first path.
                    download_tasks[task_id]['cached_candidates'] = candidates

                if deps.attempt_download_with_candidates(task_id, candidates, track, batch_id):
                    # Download initiated - the monitoring system handles completion
                    if batch_id:
                        logger.info(f"[Modal Worker] Download initiated successfully for task {task_id} - monitoring will handle completion")
                        _remember_batch_source(task_id, batch_id, deps)
                    return
                search_diagnostics.append(f'"{query}": {result_count} results, {len(candidates)} passed filters but download failed to start')

            except Exception as e:
                logger.debug(f"[Modal Worker] Search failed for query '{query}': {e}")
                search_diagnostics.append(f'"{query}": search error — {e}')
                continue

        # === HYBRID FALLBACK: If primary source failed, try remaining sources directly ===
        if getattr(deps.download_orchestrator, 'mode', '') == 'hybrid':
            if _try_hybrid_fallback(task_id, batch_id, track, search_queries, exhausted_sources, deps):
                return

        logger.warning(f"[Modal Worker] No valid candidates found for '{track.name}' after trying all {len(search_queries)} queries.")

        # A cancel or delete that landed during the fallback searches must win.
        # Don't run the version-mismatch fallback, don't overwrite 'cancelled'
        # with 'not_found', and don't fire the completion callback (the
        # in-loop cancellation checks skip it for the same reason).
        if _task_gone_or_cancelled(task_id):
            logger.info(f"[Modal Worker] Task {task_id} cancelled or removed during search - leaving its status untouched")
            return

        # Last resort on a quarantine retry with no new candidates: if enabled,
        # accept the best already-quarantined candidate rather than leave the
        # track missing.
        if is_quarantine_retry and deps.try_version_mismatch_fallback:
            fallback_artist = track.artists[0] if track.artists else ''
            if deps.try_version_mismatch_fallback(track.name, fallback_artist, task_id, batch_id):
                return  # fallback re-dispatched; batch completion handled by reprocess thread

        with tasks_lock:
            task_state = download_tasks.get(task_id)
            if task_state is None or task_state.get('status') == 'cancelled':
                return
            task_state['status'] = 'not_found'
            diag_summary = ' | '.join(search_diagnostics) if search_diagnostics else 'no queries attempted'
            task_state['error_message'] = f'No match found for "{track.name}" by {artist_name or "Unknown"} after {len(search_queries)} queries. Breakdown: {diag_summary}'
            # Store raw results so the user can review what the search returned
            if all_raw_results and not task_state.get('cached_candidates'):
                task_state['cached_candidates'] = all_raw_results

        # Notify batch manager that this task completed (failed)
        if batch_id:
            try:
                deps.on_download_completed(batch_id, task_id, False)
            except Exception as completion_error:
                logger.error(f"Error in batch completion callback for {task_id}: {completion_error}")

    except Exception as e:
        # Put the stack trace in the logged message so it reaches the
        # soulsync log handlers instead of only stderr.
        logger.error(
            f"CRITICAL ERROR in download task for '{track_name}' (task_id: {task_id}): {e}\n"
            f"{traceback.format_exc()}"
        )

        # Update task status with a bounded lock wait so a wedged lock can't
        # also hang the recovery path.
        try:
            lock_acquired = tasks_lock.acquire(timeout=2.0)
            if lock_acquired:
                try:
                    if task_id in download_tasks:
                        download_tasks[task_id]['status'] = 'failed'
                        download_tasks[task_id]['error_message'] = f'Unexpected error during download: {type(e).__name__}: {e}'
                        logger.error(f"[Exception Recovery] Set task {task_id} status to 'failed'")
                finally:
                    tasks_lock.release()
            else:
                logger.error(f"[Exception Recovery] Could not acquire lock to update task {task_id} status")
        except Exception as status_error:
            logger.error(f"Error updating task status in exception handler: {status_error}")

        # Notify batch manager that this task completed (failed), with manual
        # worker-slot recovery if the callback itself blows up.
        if batch_id:
            try:
                deps.on_download_completed(batch_id, task_id, False)
                logger.error(f"[Exception Recovery] Successfully freed worker slot for task {task_id}")
            except Exception as completion_error:
                logger.error(f"[Exception Recovery] Error in batch completion callback for {task_id}: {completion_error}")
                try:
                    logger.error(f"[Exception Recovery] Attempting manual worker slot recovery for batch {batch_id}")
                    deps.recover_worker_slot(batch_id, task_id)
                except Exception as recovery_error:
                    logger.error(f"[Exception Recovery] FATAL: Could not recover worker slot: {recovery_error}")
|