You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
SoulSync/core/downloads/task_worker.py

656 lines
35 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""Per-task download worker.
Runs as a background thread (one per task) that:
1. Tries source-reuse (use the batch's last good slskd peer if available)
2. Tries staging-match (file already in staging folder, no download needed)
3. Stops early with not_found when a private torrent/usenet album release is
   already staged but this track wasn't in it (re-searching per track would
   only re-add the same release)
4. On a quarantine retry: walks the previously cached candidates before
   searching again
5. Generates smart search queries via the matching engine + legacy fallbacks
   (plus an "artist album" release query, tried last, in torrent/usenet mode)
6. Iterates queries sequentially through the download orchestrator's search,
   excluding budget-exhausted sources (and torrent/usenet for hybrid album
   batches)
7. For each query: validates results, attempts download with fallback candidates
8. If hybrid mode: falls back to the remaining sources in the hybrid order
   (soulseek/youtube/tidal/qobuz/hifi/deezer_dl/lidarr/soundcloud/amazon)
9. On total failure: on a quarantine retry, optionally hands off to the
   version-mismatch fallback; otherwise marks task not_found + records
   per-query search diagnostics
10. On any uncaught exception: marks failed + emergency worker-slot recovery
Originally lifted verbatim from web_server.py's `_download_track_worker`. The
helpers this calls into (try_source_reuse, store_batch_source,
try_staging_match, get_valid_candidates, attempt_download_with_candidates,
on_download_completed, recover_worker_slot, and the optional
try_version_mismatch_fallback) are passed via `TaskWorkerDeps` since each is
itself a large web_server.py helper that will get its own lift in subsequent
PRs.
"""
from __future__ import annotations
import re
import traceback
from dataclasses import dataclass
from typing import Any, Callable, Optional
from core.runtime_state import download_batches, download_tasks, tasks_lock
from core.spotify_client import Track as SpotifyTrack
from utils.logging_config import get_logger
# Module-level logger shared by every function in this file.
# Must live under the soulsync.* namespace — handlers only attach there. The
# old bare getLogger(__name__) ("core.downloads.task_worker") had no handler,
# so the entire [Modal Worker] story — search queries, retry walks, candidate
# decisions — never reached app.log.
# NOTE(review): relies on get_logger() mapping "downloads.task_worker" into
# the soulsync.* hierarchy — confirm in utils/logging_config.py.
logger = get_logger("downloads.task_worker")
def _resolve_worker_source(username):
    """Map a candidate's username to its logical download-source bucket.

    Delegates to the monitor's resolver so both sides agree (Soulseek peers
    collapse to 'soulseek'; streaming sources keep their own name). The import
    is deferred to call time to avoid an import cycle. Any failure — import or
    resolution — degrades to 'soulseek'.
    """
    try:
        from core.downloads.monitor import _resolve_download_source as resolve
        bucket = resolve(username)
    except Exception:
        bucket = 'soulseek'
    return bucket
def _cand_user_file(candidate):
    """Return ``(username, filename)`` for a candidate.

    Candidates arrive either as TrackResult-like objects (attribute access) or
    as plain dicts (tests / cached raw rows); missing values come back as None.
    """
    wanted = ('username', 'filename')
    if isinstance(candidate, dict):
        username, filename = (candidate.get(key) for key in wanted)
    else:
        username, filename = (getattr(candidate, key, None) for key in wanted)
    return username, filename
def _try_cached_candidates(task_id, batch_id, track, deps):
    """Quarantine-retry fast path: reuse the candidates found last time.

    A quarantined (verified-bad) file means the transfer itself worked and only
    the content was wrong, so the next-best pick is usually already sitting in
    the task's ``cached_candidates``. Candidates are dropped when they lack a
    username/filename, when their ``"{username}_{filename}"`` key is already in
    ``used_sources``, or when their resolved source appears in
    ``exhausted_download_sources``. Whatever survives is handed to
    ``deps.attempt_download_with_candidates``.

    Returns:
        The attempt's result (True when a download was started), or False when
        the task is missing/empty or no eligible candidate remains — in which
        case the caller falls through to a fresh search.
    """
    with tasks_lock:
        task = download_tasks.get(task_id)
        if not task:
            return False
        cached = list(task.get('cached_candidates') or [])
        used = set(task.get('used_sources') or ())
        exhausted = {str(s).lower() for s in (task.get('exhausted_download_sources') or ())}

    def _still_eligible(candidate):
        uname, fname = _cand_user_file(candidate)
        if not (uname and fname):
            return False
        if f"{uname}_{fname}" in used:
            return False
        return _resolve_worker_source(uname).lower() not in exhausted

    remaining = [candidate for candidate in cached if _still_eligible(candidate)]
    if not remaining:
        return False

    logger.info(
        f"[Modal Worker] Quarantine retry: trying {len(remaining)} cached "
        f"candidate(s) before re-searching (task {task_id})"
    )
    return deps.attempt_download_with_candidates(task_id, remaining, track, batch_id)
def _private_album_bundle_staging_miss_reason(batch_id: Optional[str], deps: Any) -> Optional[str]:
    """Return a user-facing miss reason when per-track search should be skipped.

    Torrent / usenet album batches download one private staged release first;
    each track then claims its file from that staging area. If the claim fails
    once the release is already staged, falling through to per-track search
    would just hit the same release-level source N more times and could keep
    re-adding the same torrent/NZB. For those two sources the staged release is
    treated as authoritative for this pass — but only when that source is the
    active one (its own mode, or first in hybrid order) and the bundle was not
    marked partial.

    Soulseek is intentionally excluded. A Soulseek album bundle stages whichever
    single folder scored best, and ``album_bundle_partial`` only says whether
    the files IN that folder downloaded — not whether the folder held every
    track the album needs. Short-circuiting would mark such missing tracks
    not_found with no fallback (#743). Soulseek per-track search is a genuine
    per-file network search (no release gets re-added), so letting misses fall
    through to the normal flow — and onward to the next hybrid source — is
    correct and cheap.

    Returns None whenever the normal per-track search should proceed.
    """
    batch = download_batches.get(batch_id) if batch_id else None
    if not isinstance(batch, dict):
        return None

    source = (batch.get('album_bundle_source') or '').lower()
    orchestrator = deps.download_orchestrator
    mode = (getattr(orchestrator, 'mode', '') or '').lower()

    if mode == 'hybrid':
        order = getattr(orchestrator, 'hybrid_order', None) or []
        leading = order[0] if order else getattr(orchestrator, 'hybrid_primary', '')
        hybrid_first = str(leading or '').lower()
    else:
        hybrid_first = ''

    # Guard clauses, evaluated in the same order as a single chained condition.
    if not batch.get('album_bundle_private_staging'):
        return None
    if batch.get('album_bundle_state') != 'staged':
        return None
    if batch.get('album_bundle_partial'):
        return None
    if source not in ('torrent', 'usenet'):
        return None
    source_is_active = mode == source or (mode == 'hybrid' and hybrid_first == source)
    if not source_is_active:
        return None
    return f'Track was not found in the staged {source} album release'
@dataclass
class TaskWorkerDeps:
    """Bundle of cross-cutting deps the per-task download worker needs.

    The callables are injected rather than imported because (per the module
    docstring) each is still a web_server.py helper awaiting its own lift.
    Trailing comments give the positional call shape this module uses.
    """
    # Read attributes: mode, hybrid_order, hybrid_primary, hybrid_secondary.
    # Called: search(query, timeout=..., exclude_sources=...) and client(name).
    download_orchestrator: Any
    # Must provide generate_download_queries(track) returning a list of
    # query strings (it is concatenated with other query lists).
    matching_engine: Any
    # Called with the value returned by a client's search(...) — presumably a
    # coroutine, TODO confirm — and must return that search's 2-tuple
    # (results, _), which the worker unpacks.
    run_async: Callable
    try_source_reuse: Callable  # (task_id, batch_id, track) -> bool
    store_batch_source: Callable  # (batch_id, username, filename) -> None
    try_staging_match: Callable  # (task_id, batch_id, track) -> bool
    get_valid_candidates: Callable  # (results, spotify_track, query) -> list
    # Truthy return means a download was started from one of the candidates.
    attempt_download_with_candidates: Callable  # (task_id, candidates, track, batch_id) -> bool
    on_download_completed: Callable  # (batch_id, task_id, success) -> None
    # Last-ditch slot release, used only when on_download_completed itself
    # raised inside the worker's crash handler.
    recover_worker_slot: Callable  # (batch_id, task_id) -> None
    # Optional. Only consulted on a quarantine retry after every search failed;
    # a truthy return means the task was re-dispatched and the worker stops.
    try_version_mismatch_fallback: Optional[Callable] = None  # (title, artist, task_id, batch_id) -> bool
def _worker_task_cancelled_or_gone(task_id: str) -> bool:
    """Return True when *task_id* is no longer in ``download_tasks`` or is cancelled.

    Acquires ``tasks_lock`` itself, so call it without holding the lock (its
    reentrancy isn't established in this module).
    """
    with tasks_lock:
        task_state = download_tasks.get(task_id)
        return task_state is None or task_state.get('status') == 'cancelled'


def _worker_build_track(track_data: dict) -> SpotifyTrack:
    """Rebuild a SpotifyTrack for the matching engine from a task's ``track_info``.

    ``artists`` entries may be plain strings or Spotify-API dicts with a
    ``name`` key; ``album`` may be a string or such a dict. Anything else is
    stringified.
    """
    raw_artists = track_data.get('artists') or []
    if isinstance(raw_artists, str):
        # A bare string is a single artist; iterating it would yield characters.
        raw_artists = [raw_artists]
    processed_artists = []
    for artist in raw_artists:
        if isinstance(artist, str):
            processed_artists.append(artist)
        elif isinstance(artist, dict) and 'name' in artist:
            processed_artists.append(artist['name'])
        else:
            processed_artists.append(str(artist))

    raw_album = track_data.get('album', '')
    if isinstance(raw_album, dict) and 'name' in raw_album:
        album_name = raw_album['name']
    elif isinstance(raw_album, str):
        album_name = raw_album
    else:
        album_name = str(raw_album)

    return SpotifyTrack(
        id=track_data.get('id', ''),
        name=track_data.get('name', ''),
        artists=processed_artists,
        album=album_name,
        duration_ms=track_data.get('duration_ms', 0),
        popularity=track_data.get('popularity', 0),
    )


def _worker_store_used_source(task_id: str, batch_id: Optional[str], deps: TaskWorkerDeps) -> None:
    """Publish the username/filename this task started downloading as the
    batch's reuse source (cascading source reuse for the next worker)."""
    with tasks_lock:
        task_state = download_tasks.get(task_id, {})
        used_filename = task_state.get('filename')
        used_username = task_state.get('username')
    if used_filename and used_username:
        deps.store_batch_source(batch_id, used_username, used_filename)


def _worker_generate_search_queries(track: SpotifyTrack, deps: TaskWorkerDeps) -> list:
    """Build the ordered, case-insensitively de-duplicated search queries for *track*.

    Order: matching-engine queries, then legacy fallbacks (title + first
    significant artist word, bare title, title with ()/[] groups removed),
    then — torrent/usenet mode only — an "artist album" release query. The
    release query goes last so playlist singles don't pull whole albums when
    the indexer has a track-shaped result.
    """
    artist_name = track.artists[0] if track.artists else None
    track_name = track.name or ''

    release_queries = []
    try:
        download_mode = (getattr(deps.download_orchestrator, 'mode', '') or '').lower()
        track_album = (getattr(track, 'album', '') or '').strip()
        track_title = (getattr(track, 'name', '') or '').strip()
        track_artists = list(getattr(track, 'artists', []) or [])
        first_artist = track_artists[0] if track_artists else ''
        primary_artist = (
            (first_artist.get('name', '') if isinstance(first_artist, dict) else str(first_artist))
            or ''
        ).strip()
        if (
            download_mode in ('torrent', 'usenet')
            and primary_artist
            and track_album
            and track_album.lower() not in ('unknown album', track_title.lower())
        ):
            release_queries.append(f"{primary_artist} {track_album}".strip())
    except Exception as release_query_exc:
        logger.debug("[Modal Worker] release query hint failed: %s", release_query_exc)

    engine_queries = list(deps.matching_engine.generate_download_queries(track) or [])

    # Legacy fallback queries (like the GUI does)
    legacy_queries = []
    if artist_name:
        # Title + first word of the artist, skipping a leading "The"
        artist_words = artist_name.split()
        if artist_words:
            first_word = artist_words[0]
            if first_word.lower() == 'the' and len(artist_words) > 1:
                first_word = artist_words[1]
            if len(first_word) > 1:
                legacy_queries.append(f"{track_name} {first_word}".strip())
    if track_name.strip():
        legacy_queries.append(track_name.strip())
    cleaned_name = re.sub(r'\s*\([^)]*\)', '', track_name).strip()
    cleaned_name = re.sub(r'\s*\[[^\]]*\]', '', cleaned_name).strip()
    if cleaned_name and cleaned_name.lower() != track_name.lower():
        legacy_queries.append(cleaned_name.strip())

    unique_queries = []
    seen = set()
    for query in engine_queries + legacy_queries + release_queries:
        if not query:
            continue
        key = query.lower()
        if key not in seen:
            seen.add(key)
            unique_queries.append(query)
    return unique_queries


def _worker_exclude_sources(batch_id: Optional[str], deps: TaskWorkerDeps, exhausted_sources: list):
    """Return the ``exclude_sources`` list for one orchestrator search, or None.

    Always excludes budget-exhausted sources. Hybrid + album-context batches
    also skip torrent/usenet during the per-track loop: they are release-level
    sources that can't match individual tracks, and album-bundle handling only
    fires in single-source mode (see core/downloads/master.py). Excluding them
    lets the hybrid chain fall through to per-track-compatible sources instead
    of N redundant Prowlarr searches that all grab the same album torrent.
    """
    exclude = list(exhausted_sources)
    try:
        batch = download_batches.get(batch_id) if batch_id else None
        batch_is_album = isinstance(batch, dict) and bool(batch.get('is_album_download'))
        if batch_is_album and getattr(deps.download_orchestrator, 'mode', '') == 'hybrid':
            exclude.extend(['torrent', 'usenet'])
    except Exception as exc_filter_err:
        logger.debug("[Modal Worker] album-source-exclusion check failed: %s", exc_filter_err)
    return exclude or None


def _worker_try_hybrid_fallback(task_id, batch_id, track, search_queries, exhausted_sources, deps) -> bool:
    """Query the remaining hybrid sources directly with the first two queries.

    The orchestrator's hybrid search stops at the first source with results,
    even if they all fail quality filtering, so every source after the first
    in the hybrid order is tried individually (re-trying one the orchestrator
    already reached is harmless — streaming sources return fast). Exhausted
    and unconfigured sources are skipped.

    Returns True if a fallback download was started. Returns False otherwise,
    including when the task is cancelled or deleted mid-fallback (checked
    before every search and again before starting a download).
    """
    try:
        orch = deps.download_orchestrator
        hybrid_order = getattr(orch, 'hybrid_order', None) or []
        if not hybrid_order:
            primary = getattr(orch, 'hybrid_primary', 'soulseek')
            secondary = getattr(orch, 'hybrid_secondary', '')
            hybrid_order = [primary, secondary] if secondary and secondary != primary else [primary]

        # Resolve via the orchestrator's generic accessor — the legacy
        # per-source attrs were dropped in the registry refactor, so
        # getattr(orch, 'soulseek', None) etc. silently returned None and the
        # fallback never fired.
        source_clients = {
            name: orch.client(name)
            for name in ('soulseek', 'youtube', 'tidal', 'qobuz',
                         'hifi', 'deezer_dl', 'lidarr', 'soundcloud', 'amazon')
        }
        exhausted_lower = {s.lower() for s in exhausted_sources}
        remaining_sources = [
            s for s in hybrid_order[1:]
            if s in source_clients and source_clients[s]
            and s.lower() not in exhausted_lower
        ]
        if remaining_sources:
            logger.warning(f"[Hybrid Fallback] Primary source had no valid matches. Trying fallback sources: {remaining_sources}")

        for fallback_source in remaining_sources:
            fb_client = source_clients[fallback_source]
            if hasattr(fb_client, 'is_configured') and not fb_client.is_configured():
                continue
            # First 2 queries only, for speed
            for fb_query in search_queries[:2]:
                if _worker_task_cancelled_or_gone(task_id):
                    logger.info(f"[Hybrid Fallback] Task {task_id} cancelled or removed - stopping fallback")
                    return False
                try:
                    logger.warning(f"[Hybrid Fallback] Trying {fallback_source}: '{fb_query}'")
                    fb_results, _ = deps.run_async(fb_client.search(fb_query, timeout=20))
                    if not fb_results:
                        continue
                    fb_candidates = deps.get_valid_candidates(fb_results, track, fb_query)
                    if fb_candidates:
                        logger.warning(f"[Hybrid Fallback] {fallback_source} found {len(fb_candidates)} valid candidates!")
                        with tasks_lock:
                            task_state = download_tasks.get(task_id)
                            if task_state is None or task_state.get('status') == 'cancelled':
                                logger.info(f"[Hybrid Fallback] Task {task_id} cancelled or removed - not starting download")
                                return False
                            task_state['cached_candidates'] = fb_candidates
                        if deps.attempt_download_with_candidates(task_id, fb_candidates, track, batch_id):
                            return True
                except Exception as e:
                    logger.error(f"[Hybrid Fallback] {fallback_source} search failed: {e}")
                    continue
            logger.warning(f"[Hybrid Fallback] {fallback_source} returned no valid candidates")
    except Exception as e:
        logger.error(f"[Hybrid Fallback] Error in fallback logic: {e}")
    return False


def _worker_recover_from_crash(task_id, batch_id, exc, track_name, deps) -> None:
    """Mark the task failed and release its batch worker slot after an unexpected error.

    The traceback goes through ``logger`` (not stderr) so it reaches app.log.
    The lock wait is bounded to 2s so a wedged lock can't hang recovery.
    """
    logger.error(
        f"CRITICAL ERROR in download task for '{track_name}' (task_id: {task_id}): {exc}",
        exc_info=exc,
    )
    try:
        lock_acquired = tasks_lock.acquire(timeout=2.0)
        if lock_acquired:
            try:
                if task_id in download_tasks:
                    download_tasks[task_id]['status'] = 'failed'
                    download_tasks[task_id]['error_message'] = f'Unexpected error during download: {type(exc).__name__}: {exc}'
                    logger.error(f"[Exception Recovery] Set task {task_id} status to 'failed'")
            finally:
                tasks_lock.release()
        else:
            logger.error(f"[Exception Recovery] Could not acquire lock to update task {task_id} status")
    except Exception as status_error:
        logger.error(f"Error updating task status in exception handler: {status_error}")

    # Notify batch manager that this task completed (failed), with manual
    # slot recovery if the normal completion callback itself fails.
    if batch_id:
        try:
            deps.on_download_completed(batch_id, task_id, False)
            logger.error(f"[Exception Recovery] Successfully freed worker slot for task {task_id}")
        except Exception as completion_error:
            logger.error(f"[Exception Recovery] Error in batch completion callback for {task_id}: {completion_error}")
            try:
                logger.error(f"[Exception Recovery] Attempting manual worker slot recovery for batch {batch_id}")
                deps.recover_worker_slot(batch_id, task_id)
            except Exception as recovery_error:
                logger.error(f"[Exception Recovery] FATAL: Could not recover worker slot: {recovery_error}")


def download_track_worker(task_id: str, batch_id: Optional[str], deps: TaskWorkerDeps) -> None:
    """Enhanced download worker that matches the GUI's exact retry logic.

    Implements sequential query retry, fallback candidates, and download
    failure retry for one entry of ``download_tasks``.

    Args:
        task_id: Key into ``download_tasks``; the entry's ``track_info`` drives
            the search.
        batch_id: Owning batch, or None. When set, ``deps.on_download_completed``
            is called on the not_found and crash paths.
        deps: Injected helpers (see ``TaskWorkerDeps``).

    Cancellation: once the task is cancelled (or deleted) the worker returns
    without changing its status and without calling ``on_download_completed``
    — the cancel path owns slot management. The one exception is a legacy
    task (no ``playlist_id``) already cancelled before the worker starts,
    which still gets the legacy completion callback.

    Unexpected exceptions are caught: the task is marked ``failed`` and the
    batch worker slot is released (with manual recovery as a last resort).
    """
    track_name = 'unknown'  # refined below; read by the crash handler
    try:
        # Retrieve task details from global state
        with tasks_lock:
            if task_id not in download_tasks:
                logger.warning(f"[Modal Worker] Task {task_id} not found in download_tasks")
                return
            task = download_tasks[task_id].copy()

        # Cancellation Checkpoint 1: before doing anything. Callbacks run after
        # the lock is released.
        with tasks_lock:
            if task_id not in download_tasks:
                logger.info(f"[Modal Worker] Task {task_id} was deleted before starting")
                return
            cancelled_early = download_tasks[task_id]['status'] == 'cancelled'
            task_playlist_id = download_tasks[task_id].get('playlist_id')
        if cancelled_early:
            logger.warning(f"[Modal Worker] Task {task_id} cancelled before starting")
            if task_playlist_id:
                # V2 system frees the worker slot in its atomic cancel function
                logger.warning(f"[Modal Worker] V2 task {task_id} cancelled - worker slot already freed by V2 system")
            elif batch_id:
                # Legacy system - use old completion callback
                logger.warning(f"[Modal Worker] Legacy task {task_id} cancelled - using legacy completion callback")
                deps.on_download_completed(batch_id, task_id, False)
            return

        track_data = task['track_info']
        track_name = track_data.get('name', 'Unknown Track')
        logger.info(f"[Modal Worker] Task {task_id} starting search for track: '{track_name}'")

        # Recreate a SpotifyTrack object for the matching engine
        track = _worker_build_track(track_data)
        logger.info(f"[Modal Worker] Starting download task for: {track.name} by {track.artists[0] if track.artists else 'Unknown'}")

        # === SOURCE REUSE: check the batch's last good source before searching ===
        if deps.try_source_reuse(task_id, batch_id, track):
            _worker_store_used_source(task_id, batch_id, deps)  # cascading reuse
            return

        # === STAGING CHECK: file may already be in the staging folder ===
        if deps.try_staging_match(task_id, batch_id, track):
            return

        staging_miss_reason = _private_album_bundle_staging_miss_reason(batch_id, deps)
        if staging_miss_reason:
            logger.warning(
                "[Modal Worker] %s for '%s'; skipping redundant per-track %s search",
                staging_miss_reason,
                track.name,
                getattr(deps.download_orchestrator, 'mode', 'release-source'),
            )
            with tasks_lock:
                task_state = download_tasks.get(task_id)
                if task_state is None or task_state.get('status') == 'cancelled':
                    return  # cancel path owns completion
                task_state['status'] = 'not_found'
                task_state['error_message'] = staging_miss_reason
            if batch_id:
                deps.on_download_completed(batch_id, task_id, False)
            return

        # Initialize task state tracking (like GUI's parallel_search_tracking).
        # Re-check cancellation first: source reuse / staging may have taken a
        # while, and writing 'searching' would erase a pending cancel.
        with tasks_lock:
            task_state = download_tasks.get(task_id)
            if task_state is None or task_state.get('status') == 'cancelled':
                logger.info(f"[Modal Worker] Task {task_id} cancelled or removed before searching")
                return
            task_state['status'] = 'searching'  # Now actively being processed
            task_state['current_query_index'] = 0
            task_state['current_candidate_index'] = 0
            task_state['retry_count'] = 0
            task_state['candidates'] = []
            # Preserve used_sources from previous attempts: the monitor already
            # marked failed hosts there, and resetting would retry them.
            if 'used_sources' not in task_state:
                task_state['used_sources'] = set()

        # Cached-first quarantine retry. The monitor sets ``_quarantine_retry``
        # when a verified-bad file is re-queued; then the already-found
        # candidates are walked before re-searching (connection was fine, the
        # content was wrong). Any other entry (fresh download, or the monitor's
        # dead-connection/stuck retry) starts a new search generation: forget
        # which queries already ran so every source is searched fresh.
        with tasks_lock:
            task_state = download_tasks.get(task_id, {})
            is_quarantine_retry = bool(task_state.pop('_quarantine_retry', False))
            if not is_quarantine_retry:
                task_state.pop('searched_queries', None)
        if is_quarantine_retry and _try_cached_candidates(task_id, batch_id, track, deps):
            _worker_store_used_source(task_id, batch_id, deps)
            return

        # 1. Generate multiple search queries (like GUI's generate_smart_search_queries)
        artist_name = track.artists[0] if track.artists else None
        track_name = track.name
        search_queries = _worker_generate_search_queries(track, deps)

        # Expose the query count so the quarantine-retry budget (exhaustive
        # mode) can size each source's budget as query_count × retries_per_query.
        with tasks_lock:
            if task_id in download_tasks:
                download_tasks[task_id]['query_count'] = len(search_queries)
        logger.info(f"[Modal Worker] Generated {len(search_queries)} smart search queries for '{track.name}': {search_queries}")
        logger.info(f"[Modal Worker] About to start search loop for task {task_id} (track: '{track.name}')")

        # 2. Sequential Query Search (matches GUI's start_search_worker_parallel logic)
        search_diagnostics = []  # per-query outcome, for the final error message
        all_raw_results = []  # raw results across queries, for the candidate review modal

        # Budget-exhausted sources (set by the monitor when a source gives up,
        # see monitor.requeue_quarantined_task_for_retry) are excluded so the
        # chain falls through to the next source. On a quarantine retry a
        # source is NOT excluded just for having been searched: only queries
        # that already ran (``searched_queries``) are skipped — their
        # candidates were walked via the cached-first path — while unrun
        # queries still search the same source.
        with tasks_lock:
            task_state = download_tasks.get(task_id, {})
            exhausted_sources = [str(s) for s in (task_state.get('exhausted_download_sources') or ())]
            searched_queries = (
                set(task_state.get('searched_queries') or ()) if is_quarantine_retry else set()
            )

        for query_index, query in enumerate(search_queries):
            # Cancellation check before each query
            with tasks_lock:
                if task_id not in download_tasks:
                    logger.debug(f"[Modal Worker] Task {task_id} was deleted during query {query_index + 1}")
                    return
                if download_tasks[task_id]['status'] == 'cancelled':
                    logger.debug(f"[Modal Worker] Task {task_id} cancelled during query {query_index + 1}")
                    # Don't call on_download_completed for cancelled tasks as it can stop monitoring
                    return
                download_tasks[task_id]['current_query_index'] = query_index

            if is_quarantine_retry and query in searched_queries:
                logger.debug(
                    f"[Modal Worker] Skipping already-searched query '{query}' "
                    f"(candidates served from cache) for task {task_id}"
                )
                continue

            logger.debug(f"[Modal Worker] Query {query_index + 1}/{len(search_queries)}: '{query}'")
            logger.debug(f"About to call soulseek search for task {task_id}")
            try:
                exclude_sources = _worker_exclude_sources(batch_id, deps, exhausted_sources)
                tracks_result, _ = deps.run_async(deps.download_orchestrator.search(
                    query, timeout=30, exclude_sources=exclude_sources,
                ))
                logger.debug(f"Search completed for task {task_id}, got {len(tracks_result) if tracks_result else 0} results")

                # CRITICAL: check cancellation immediately after search returns
                with tasks_lock:
                    if task_id not in download_tasks:
                        logger.info(f"[Modal Worker] Task {task_id} was deleted after search returned")
                        return
                    # Remember this query ran so a later quarantine retry skips
                    # it. Recorded regardless of result count: re-running a
                    # query is deterministic, so an empty one stays empty.
                    recorded_queries = download_tasks[task_id].get('searched_queries')
                    if not isinstance(recorded_queries, set):
                        recorded_queries = set()
                    recorded_queries.add(query)
                    download_tasks[task_id]['searched_queries'] = recorded_queries
                    if download_tasks[task_id]['status'] == 'cancelled':
                        logger.warning(f"[Modal Worker] Task {task_id} cancelled after search returned - ignoring results")
                        # The cancellation endpoint already handles batch management
                        return

                if tracks_result:
                    result_count = len(tracks_result)
                    candidates = deps.get_valid_candidates(tracks_result, track, query)
                    if candidates:
                        logger.debug(f"[Modal Worker] Found {len(candidates)} valid candidates for query '{query}'")
                        with tasks_lock:
                            if task_id not in download_tasks:
                                logger.info(f"[Modal Worker] Task {task_id} was deleted before processing candidates")
                                return
                            if download_tasks[task_id]['status'] == 'cancelled':
                                logger.warning(f"[Modal Worker] Task {task_id} cancelled before processing candidates")
                                return
                            # A later quarantine retry walks these via cached-first.
                            download_tasks[task_id]['cached_candidates'] = candidates

                        if deps.attempt_download_with_candidates(task_id, candidates, track, batch_id):
                            # Download initiated - the monitor handles completion
                            if batch_id:
                                logger.info(f"[Modal Worker] Download initiated successfully for task {task_id} - monitoring will handle completion")
                                _worker_store_used_source(task_id, batch_id, deps)
                            return
                        search_diagnostics.append(f'"{query}": {result_count} results, {len(candidates)} passed filters but download failed to start')
                    else:
                        search_diagnostics.append(f'"{query}": {result_count} results but none passed quality/artist filters')

                    # Strip SoundCloud preview snippets before caching for the
                    # review modal — a 30s preview clip isn't a useful pick, and
                    # clicking one bypasses validation and downloads it anyway.
                    from core.downloads.validation import filter_soundcloud_previews
                    all_raw_results.extend(filter_soundcloud_previews(tracks_result[:20], track))
                else:
                    search_diagnostics.append(f'"{query}": no results found')
            except Exception as e:
                logger.debug(f"[Modal Worker] Search failed for query '{query}': {e}")
                search_diagnostics.append(f'"{query}": search error — {e}')

        # === HYBRID FALLBACK: primary source failed, try remaining sources directly ===
        if getattr(deps.download_orchestrator, 'mode', '') == 'hybrid':
            if _worker_try_hybrid_fallback(task_id, batch_id, track, search_queries, exhausted_sources, deps):
                return

        # A cancel may have landed after the last per-query check (e.g. during
        # the hybrid fallback). Don't overwrite it with not_found and don't
        # fire the completion callback — the cancel path already freed the slot.
        if _worker_task_cancelled_or_gone(task_id):
            logger.info(f"[Modal Worker] Task {task_id} cancelled or removed during search - leaving status untouched")
            return

        logger.warning(f"[Modal Worker] No valid candidates found for '{track.name}' after trying all {len(search_queries)} queries.")

        # Last resort on a quarantine retry: optionally accept the best
        # already-quarantined candidate rather than leaving the track missing.
        if is_quarantine_retry and deps.try_version_mismatch_fallback:
            fallback_artist = track.artists[0] if track.artists else ''
            if deps.try_version_mismatch_fallback(track.name, fallback_artist, task_id, batch_id):
                return  # fallback re-dispatched; batch completion handled by reprocess thread

        with tasks_lock:
            task_state = download_tasks.get(task_id)
            if task_state is None or task_state.get('status') == 'cancelled':
                # Re-checked under the lock that writes the status.
                return
            task_state['status'] = 'not_found'
            diag_summary = ' | '.join(search_diagnostics) if search_diagnostics else 'no queries attempted'
            task_state['error_message'] = f'No match found for "{track_name}" by {artist_name or "Unknown"} after {len(search_queries)} queries. Breakdown: {diag_summary}'
            # Store raw results so the user can review what the sources returned
            if all_raw_results and not task_state.get('cached_candidates'):
                task_state['cached_candidates'] = all_raw_results

        # Notify batch manager that this task completed (failed)
        if batch_id:
            try:
                deps.on_download_completed(batch_id, task_id, False)
            except Exception as completion_error:
                logger.error(f"Error in batch completion callback for {task_id}: {completion_error}")

    except Exception as e:
        _worker_recover_from_crash(task_id, batch_id, e, track_name, deps)