You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
SoulSync/core/downloads/monitor.py

1106 lines
59 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""WebUIDownloadMonitor — lifted from web_server.py.
The class body is byte-identical to the original. Module-level globals
(injected via ``init()`` from web_server) include the worker / completion
helpers and orchestrator handles. ``IS_SHUTTING_DOWN`` is a module-level
flag mirrored from web_server's own flag in ``_shutdown_runtime_components``.
"""
import threading
import time
from config.settings import config_manager
from core.runtime_state import (
download_batches,
download_tasks,
matched_context_lock,
matched_downloads_context,
tasks_lock,
)
from utils.async_helpers import run_async
from utils.logging_config import get_logger
# Project logger factory so these lines reach app.log (soulsync.* namespace).
logger = get_logger("downloads.monitor")
# Mirrored from web_server.IS_SHUTTING_DOWN via _shutdown_runtime_components.
IS_SHUTTING_DOWN = False
# Injected at runtime via init() — these are defined later in web_server.py
# than the class is instantiated, so we late-bind them.
_make_context_key = None
_on_download_completed = None
_download_track_worker = None
_run_post_processing_worker = None
_start_next_batch_of_downloads = None
_orphaned_download_keys = None
missing_download_executor = None
download_orchestrator = None
_RELEASE_SOURCE_NAMES = frozenset(('torrent', 'usenet'))
# Hard ceiling on automatic next-candidate retries after a download was
# quarantined (AcoustID mismatch / integrity / duration). The natural
# terminator is used_sources exhaustion — once every candidate the worker can
# find has been tried, attempt_download_with_candidates returns False and the
# worker reports a clean failure. This cap is a safety net against a pathological
# quarantine→retry→quarantine loop (e.g. a source that keeps returning fresh
# wrong files).
#
# Default (non-exhaustive) mode uses this single global cap. The opt-in
# exhaustive mode (post_processing.retry_exhaustive) instead budgets retries
# PER SOURCE — see requeue_quarantined_task_for_retry.
MAX_QUARANTINE_RETRIES = 5
# Absolute runaway guard for exhaustive mode. Per-source budgets are already
# finite (query_count × retries_per_query, and Soulseek peers all collapse to
# one 'soulseek' bucket), but this ceiling caps the TOTAL retries across every
# source so a misbehaving source-resolution can never loop forever.
MAX_TOTAL_QUARANTINE_RETRIES = 100
# Streaming plugins report their source name as the download's "username"
# (see download_orchestrator._streaming_sources). Soulseek uses the peer name
# instead, so anything not in this set is bucketed under 'soulseek' for the
# per-source retry budget.
_STREAMING_SOURCE_NAMES = frozenset((
'youtube', 'tidal', 'qobuz', 'hifi', 'deezer_dl', 'lidarr', 'soundcloud', 'amazon',
))
def _resolve_download_source(username):
"""Map a download's username to its logical source for per-source budgeting.
Streaming sources use the source name as username; Soulseek uses the peer
name, so every Soulseek peer collapses to a single 'soulseek' bucket.
"""
if username and username in _STREAMING_SOURCE_NAMES:
return username
return 'soulseek'
def _remaining_fallback_sources(exhausted):
"""Sources in the configured hybrid chain that haven't exhausted their
per-source budget yet.
When a source spends its whole budget (exhaustive mode), the task switches
to the next source instead of failing — but only if there *is* another
source. Single-source mode has nothing to fall back to, so this returns
empty there (and when the orchestrator isn't wired). The returned list
drives both the give-up decision here and the worker's search-exclusion on
the next attempt (see task_worker: exhausted_download_sources).
"""
orch = download_orchestrator
if orch is None or getattr(orch, 'mode', None) != 'hybrid':
return []
chain = getattr(orch, 'hybrid_order', None) or []
blocked = {str(s).lower() for s in exhausted}
return [s for s in chain if str(s).lower() not in blocked]
def _download_id_key(download_id):
return f"download_id::{download_id}" if download_id else None
def requeue_quarantined_task_for_retry(task_id, batch_id, trigger):
"""Re-queue a task whose download was just quarantined so the worker tries
the NEXT best candidate instead of failing outright.
Called from the post-processing verification wrapper when AcoustID
verification or the integrity/duration check quarantines a file. It mirrors
the monitor's transfer-error retry path: mark the bad source as used, clear
the stale download identity, reset the task to ``searching`` and resubmit
the download worker. Because ``used_sources`` is preserved across the
re-run, the worker skips the quarantined source and picks the next-best
candidate (see ``attempt_download_with_candidates``).
Returns True if a retry was queued — the caller must then NOT mark the task
failed or notify batch completion, since the task is going around again.
Returns False when no retry is possible (retry engine unwired, manual pick,
cancelled, or retry budget exhausted); the caller falls through to its
existing failure handling.
"""
# Opt-out escape hatch — default on. Lets users restore the old
# quarantine-and-fail behaviour without a code change.
if not config_manager.get('post_processing.retry_next_candidate_on_mismatch', True):
return False
# Retry engine not wired (e.g. manual-import path that never started a
# download worker). Nothing to re-run.
if missing_download_executor is None or _download_track_worker is None:
return False
with tasks_lock:
task = download_tasks.get(task_id)
if not task:
return False
# The user explicitly picked this candidate via the candidates modal —
# honour their choice rather than silently swapping in another file.
# (Matches the monitor's transfer-retry guards.)
if task.get('_user_manual_pick'):
return False
if task.get('status') == 'cancelled':
return False
username = task.get('username')
filename = task.get('filename')
# No source identity means this wasn't a worker-dispatched download we
# can retry — without the "{username}_{filename}" key we can't flag the
# bad source as used, so a re-run could re-pick the same file and loop.
# Bail and let the caller fail it normally.
if not username or not filename:
return False
total_count = task.get('quarantine_retry_count', 0)
if config_manager.get('post_processing.retry_exhaustive', False):
# Exhaustive mode: a SEPARATE budget per source. The budget scales
# with the track's own query count (the worker generates a variable
# number of search queries per track) × the configured retries per
# query. Soulseek candidates are walked first (one per retry), then
# the worker's hybrid fallback moves to the next source — each source
# spending its own budget. The natural terminator (used_sources
# exhaustion → worker clean-fail) still ends most tracks well before
# any budget is reached; the budget is the per-source safety ceiling.
source = _resolve_download_source(username)
retries_per_query = config_manager.get('post_processing.retries_per_query', 5)
try:
retries_per_query = int(retries_per_query)
except (TypeError, ValueError):
retries_per_query = 5
if retries_per_query < 1:
retries_per_query = 1
query_count = task.get('query_count') or 1
if query_count < 1:
query_count = 1
budget = query_count * retries_per_query
counts = task.get('quarantine_retry_counts_by_source')
if not isinstance(counts, dict):
counts = {}
source_count = counts.get(source, 0)
if source_count >= budget:
# This source spent its whole budget. Rather than fail the
# track outright, mark the source exhausted and fall through to
# the next source in the hybrid chain (the worker excludes
# exhausted sources from its next search). Only give up once no
# fallback source remains — or the absolute ceiling trips.
exhausted = set(task.get('exhausted_download_sources') or ())
exhausted.add(source)
remaining = _remaining_fallback_sources(exhausted)
if not remaining:
logger.warning(
f"[Retry:{trigger}] Task {task_id} exhausted its retry "
f"budget for source '{source}' ({source_count}/{budget}) "
f"and no fallback source remains — giving up, marking failed"
)
return False
if total_count >= MAX_TOTAL_QUARANTINE_RETRIES:
logger.warning(
f"[Retry:{trigger}] Task {task_id} hit the absolute retry "
f"ceiling ({MAX_TOTAL_QUARANTINE_RETRIES}) — giving up, "
f"marking failed"
)
return False
task['exhausted_download_sources'] = exhausted
# Don't push this source's counter past its budget — it's done.
# The next source starts spending its own fresh budget when its
# first candidate fails verification.
attempt_desc = (
f"source '{source}' budget spent ({source_count}/{budget}) "
f"— switching sources (remaining: {', '.join(remaining)})"
)
else:
if total_count >= MAX_TOTAL_QUARANTINE_RETRIES:
logger.warning(
f"[Retry:{trigger}] Task {task_id} hit the absolute retry "
f"ceiling ({MAX_TOTAL_QUARANTINE_RETRIES}) — giving up, "
f"marking failed"
)
return False
counts[source] = source_count + 1
task['quarantine_retry_counts_by_source'] = counts
attempt_desc = f"source '{source}' {source_count + 1}/{budget}"
else:
# Default mode: a single global cap, conservative and predictable.
if total_count >= MAX_QUARANTINE_RETRIES:
logger.warning(
f"[Retry:{trigger}] Task {task_id} hit the quarantine-retry cap "
f"({MAX_QUARANTINE_RETRIES}) — giving up, marking failed"
)
return False
attempt_desc = f"{total_count + 1}/{MAX_QUARANTINE_RETRIES}"
# Mark the quarantined source as used so the re-run won't pick it again.
# Uses the same "{username}_{filename}" key the worker dedups against.
used_sources = task.get('used_sources', set())
used_sources.add(f"{username}_{filename}")
task['used_sources'] = used_sources
task['quarantine_retry_count'] = total_count + 1
# Flag the re-run as a quarantine retry so the worker walks the
# already-found candidates (cached-first) before re-searching — the
# connection was fine, the content was just wrong. Dead-connection /
# stuck retries (handled elsewhere in the monitor) deliberately do NOT
# set this, so they re-search fresh.
task['_quarantine_retry'] = True
# Drop the stale download identity + the prior attempt's quarantine link.
task.pop('download_id', None)
task.pop('username', None)
task.pop('filename', None)
task.pop('quarantine_entry_id', None)
task['status'] = 'searching'
task['status_change_time'] = time.time()
# Surface the retry progress to the UI ("attempt 2/5" next to the
# status while the task goes around again). Cleared implicitly on
# completion (UI only renders it for active/queued states).
task['retry_info'] = attempt_desc
task['retry_trigger'] = trigger
logger.info(
f"[Retry:{trigger}] Re-queuing task {task_id} for next-best candidate "
f"(attempt {attempt_desc})"
)
missing_download_executor.submit(_download_track_worker, task_id, batch_id)
return True
def _is_release_task(task):
ti = task.get('track_info') if isinstance(task.get('track_info'), dict) else {}
username = task.get('username') or ti.get('username')
return username in _RELEASE_SOURCE_NAMES
def _lookup_live_info(task, live_transfers_lookup):
ti = task.get('track_info') if isinstance(task.get('track_info'), dict) else {}
download_id = task.get('download_id')
if _is_release_task(task):
by_id = live_transfers_lookup.get(_download_id_key(download_id))
if by_id:
return by_id
task_filename = task.get('filename') or ti.get('filename')
task_username = task.get('username') or ti.get('username')
if not task_filename or not task_username:
return None
return live_transfers_lookup.get(_make_context_key(task_username, task_filename))
def init(
make_context_key,
on_download_completed,
download_track_worker,
run_post_processing_worker,
start_next_batch_of_downloads,
orphaned_download_keys,
missing_download_executor_obj,
download_orchestrator_obj,
):
"""Bind web_server-side helpers/globals so the class body can resolve them."""
global _make_context_key, _on_download_completed, _download_track_worker
global _run_post_processing_worker, _start_next_batch_of_downloads
global _orphaned_download_keys, missing_download_executor, download_orchestrator
_make_context_key = make_context_key
_on_download_completed = on_download_completed
_download_track_worker = download_track_worker
_run_post_processing_worker = run_post_processing_worker
_start_next_batch_of_downloads = start_next_batch_of_downloads
_orphaned_download_keys = orphaned_download_keys
missing_download_executor = missing_download_executor_obj
download_orchestrator = download_orchestrator_obj
class WebUIDownloadMonitor:
"""
Background monitor for download progress and retry logic, matching GUI's SyncStatusProcessingWorker.
Implements identical timeout detection and automatic retry functionality.
"""
def __init__(self):
self.monitoring = False
self.monitor_thread = None
self.monitored_batches = set()
self._lock = threading.Lock()
def start_monitoring(self, batch_id):
"""Start monitoring a download batch"""
with self._lock:
self.monitored_batches.add(batch_id)
if not self.monitoring:
self.monitoring = True
self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
self.monitor_thread.start()
logger.info(f"Started download monitor for batch {batch_id}")
def stop_monitoring(self, batch_id):
"""Stop monitoring a specific batch"""
with self._lock:
self.monitored_batches.discard(batch_id)
if not self.monitored_batches:
self.monitoring = False
logger.debug("Stopped download monitor (no active batches)")
def shutdown(self):
"""Stop the monitor loop and clear active batch tracking."""
with self._lock:
self.monitoring = False
self.monitored_batches.clear()
self.monitor_thread = None
logger.info("Download monitor shutdown requested")
def _monitor_loop(self):
"""Main monitoring loop - checks downloads every 1 second for responsive web UX"""
while self.monitoring and self.monitored_batches:
try:
if globals().get('IS_SHUTTING_DOWN', False):
self.monitoring = False
break
self._check_all_downloads()
time.sleep(1) # 1-second polling for fast web UI updates
except Exception as e:
# If we get shutdown errors, stop monitoring gracefully
if "interpreter shutdown" in str(e) or "cannot schedule new futures" in str(e):
logger.info("Monitor detected shutdown, stopping gracefully")
self.monitoring = False
break
logger.error(f"Download monitor error: {e}")
logger.info("Download monitor loop ended")
def _check_all_downloads(self):
"""Check all active downloads for timeouts and failures"""
current_time = time.time()
# Get live transfer data from slskd
live_transfers_lookup = self._get_live_transfers()
# Track tasks with exhausted retries to handle after releasing lock
exhausted_tasks = [] # List of (batch_id, task_id) tuples
# Track completed downloads to handle after releasing lock (prevents deadlock)
completed_tasks = [] # List of (batch_id, task_id) tuples
# Track deferred operations (network calls, nested locks) to run after releasing tasks_lock
deferred_ops = []
with tasks_lock:
# Check all monitored batches for timeouts and errors
for batch_id in list(self.monitored_batches):
if batch_id not in download_batches:
self.monitored_batches.discard(batch_id)
continue
for task_id in download_batches[batch_id].get('queue', []):
task = download_tasks.get(task_id)
if not task:
continue
release_recoverable = (
_is_release_task(task)
and task.get('download_id')
and task.get('status') in ['failed', 'not_found']
)
if task['status'] not in ['downloading', 'queued'] and not release_recoverable:
continue
# Check for timeouts and errors - retries handled directly in _should_retry_task
# If _should_retry_task returns True, it means retries were exhausted
retry_exhausted = False
if not release_recoverable:
retry_exhausted = self._should_retry_task(task_id, task, live_transfers_lookup, current_time, deferred_ops)
# Collect exhausted tasks to handle outside lock (prevents deadlock)
if retry_exhausted:
exhausted_tasks.append((batch_id, task_id))
# ENHANCED: Check for successful completions (especially YouTube).
# Release-style sources can report a completed audio file
# name that differs from the original indexer URL/title
# stored on the task, so prefer the stable download_id.
live_info = _lookup_live_info(task, live_transfers_lookup)
if live_info:
state = live_info.get('state', '')
# Trigger post-processing if download is completed successfully
# slskd uses compound states like 'Completed, Succeeded' - use substring matching
# Must exclude error states first (matching _build_batch_status_data's prioritized checking)
has_error = ('Errored' in state or 'Failed' in state or 'Rejected' in state or 'TimedOut' in state)
has_completion = ('Completed' in state or 'Succeeded' in state)
# Verify bytes actually transferred before trusting state string.
# slskd can report "Completed" before the full file is flushed to disk,
# or on connection drops that leave a partial file.
if has_completion and not has_error:
expected_size = live_info.get('size', 0)
transferred = live_info.get('bytesTransferred', 0)
if expected_size > 0 and transferred < expected_size:
if not task.get('_incomplete_warned'):
logger.debug("Monitor: %s state=%s but bytes incomplete (%s/%s) - waiting", task_id, state, transferred, expected_size)
task['_incomplete_warned'] = True
continue
if has_completion and not has_error and (
task['status'] == 'downloading' or release_recoverable
):
task.pop('_incomplete_warned', None)
# CRITICAL FIX: Transition to 'post_processing' HERE so downloads
# don't depend on browser polling to trigger post-processing.
# Previously, post-processing was only submitted by _build_batch_status_data
# (called from browser-polled endpoints), meaning closing the browser
# left tasks stuck in 'downloading' forever.
task['status'] = 'post_processing'
task['status_change_time'] = current_time
logger.info(f"Monitor detected completed download for {task_id} ({state}) - submitting post-processing")
# Collect for handling outside the lock to prevent deadlock.
# _on_download_completed acquires tasks_lock which is non-reentrant.
completed_tasks.append((batch_id, task_id))
# ---- All work below runs WITHOUT tasks_lock held ----
if globals().get('IS_SHUTTING_DOWN', False) or not self.monitoring:
return
# Execute deferred operations from _should_retry_task (network calls, nested locks)
for op in deferred_ops:
try:
if op[0] == 'cancel_download':
# Issue #648 diagnostic — `op` now carries a trigger
# label (4-tuple, was 3-tuple) so the next log dump
# tells us WHICH path in `_should_retry_task` is
# firing for users seeing "Tidal downloads failed to
# start" mass-cancels. Label format pinned in commit
# message for grep-ability.
if len(op) >= 4:
_, download_id, username, trigger = op[0], op[1], op[2], op[3]
else:
_, download_id, username = op
trigger = 'unlabeled'
logger.info(
f"[CancelTrigger:monitor.{trigger}] download_id={download_id} "
f"username={username}"
)
run_async(download_orchestrator.cancel_download(download_id, username, remove=True))
logger.debug(f"[Deferred] Successfully cancelled download {download_id}")
elif op[0] == 'cleanup_orphan':
_, context_key = op
with matched_context_lock:
matched_downloads_context.pop(context_key, None)
logger.debug(f"[Deferred] Cleaned up orphaned download context: {context_key}")
elif op[0] == 'restart_worker':
_, task_id, batch_id = op
logger.debug(f"[Deferred] Restarting worker for task {task_id}")
missing_download_executor.submit(_download_track_worker, task_id, batch_id)
logger.debug(f"[Deferred] Successfully restarted worker for task {task_id}")
except Exception as e:
logger.error(f"[Deferred] Error executing deferred operation {op[0]}: {e}")
# Handle completed transfers outside the lock. The transfer engine's
# "complete" state only means the remote download finished; the
# post-processing worker still has to find, verify, tag, and move the
# file before it can report real batch success or failure.
for batch_id, task_id in completed_tasks:
try:
# Submit post-processing worker (file move, tagging, AcoustID verification)
# This makes batch downloads fully independent of browser polling.
logger.info(f"[Monitor] Submitting post-processing worker for task {task_id}")
missing_download_executor.submit(_run_post_processing_worker, task_id, batch_id)
except Exception as e:
logger.error(f"[Monitor] Error handling completed task {task_id}: {e}")
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'failed'
download_tasks[task_id]['error_message'] = f'Post-processing could not be scheduled: {e}'
try:
_on_download_completed(batch_id, task_id, success=False)
except Exception as completion_error:
logger.error(
f"[Monitor] Error marking failed post-processing submit for task {task_id}: {completion_error}"
)
# Handle exhausted retry tasks outside the lock to prevent deadlock
for batch_id, task_id in exhausted_tasks:
try:
logger.info(f"[Monitor] Calling completion callback for exhausted task {task_id}")
_on_download_completed(batch_id, task_id, success=False)
except Exception as e:
logger.error(f"[Monitor] Error handling exhausted task {task_id}: {e}")
# ENHANCED: Add worker count validation to detect ghost workers
self._validate_worker_counts()
def _get_live_transfers(self):
"""Get current transfer status from slskd API and YouTube client"""
try:
# Check if we should stop due to shutdown
if not self.monitoring:
return {}
live_transfers = {}
# Only hit slskd API if soulseek is actually configured and active
dl_mode = config_manager.get('download_source.mode', 'hybrid')
hybrid_order = config_manager.get('download_source.hybrid_order', ['hifi', 'youtube', 'soulseek'])
soulseek_active = (dl_mode == 'soulseek' or
(dl_mode == 'hybrid' and 'soulseek' in hybrid_order))
# Get Soulseek downloads from API
transfers_data = None
_slsk = download_orchestrator.client('soulseek') if download_orchestrator and hasattr(download_orchestrator, 'client') else None
if soulseek_active and _slsk and _slsk.base_url:
transfers_data = run_async(download_orchestrator._make_request('GET', 'transfers/downloads'))
if transfers_data:
for user_data in transfers_data:
username = user_data.get('username', 'Unknown')
if 'directories' in user_data:
for directory in user_data['directories']:
if 'files' in directory:
for file_info in directory['files']:
key = _make_context_key(username, file_info.get('filename', ''))
live_transfers[key] = file_info
# Also get non-Soulseek downloads via the engine — single
# cross-source aggregation, no per-source iteration.
try:
all_downloads = []
if download_orchestrator and hasattr(download_orchestrator, 'engine'):
try:
# Exclude soulseek — slskd transfers were already
# pulled via the transfers/downloads endpoint above.
# Without the exclude both fetch paths run, doubling
# the per-tick slskd API hit.
all_downloads = run_async(
download_orchestrator.engine.get_all_downloads(exclude=('soulseek',))
)
except Exception as e:
logger.debug("get_all_downloads failed: %s", e)
for download in all_downloads:
key = _make_context_key(download.username, download.filename)
# Convert DownloadStatus to transfer dict format for monitor compatibility
transfer_row = {
'id': download.id,
'filename': download.filename,
'username': download.username,
'state': download.state,
'percentComplete': download.progress,
'size': download.size,
'bytesTransferred': download.transferred,
'averageSpeed': download.speed,
}
live_transfers[key] = transfer_row
id_key = _download_id_key(download.id)
if id_key:
live_transfers[id_key] = transfer_row
except Exception as yt_error:
logger.error(f"Monitor: Could not fetch streaming source downloads: {yt_error}")
return live_transfers
except Exception as e:
# If we get shutdown-related errors, stop monitoring immediately
if ("interpreter shutdown" in str(e) or
"cannot schedule new futures" in str(e) or
"Event loop is closed" in str(e)):
logger.info("Monitor detected shutdown, stopping immediately")
self.monitoring = False
return {}
else:
logger.error(f"Monitor: Could not fetch live transfers: {e}")
return {}
def _should_retry_task(self, task_id, task, live_transfers_lookup, current_time, deferred_ops):
"""
Determine if a task should be retried due to timeout (matches GUI logic).
IMPORTANT: This runs while tasks_lock is held. All network calls (slskd API)
and nested lock acquisitions (matched_context_lock) are collected into deferred_ops
to be executed AFTER releasing tasks_lock. This prevents deadlocks and long lock holds.
Returns True if retries are exhausted and _on_download_completed should be called outside the lock.
"""
ti = task.get('track_info') if isinstance(task.get('track_info'), dict) else {}
task_filename = task.get('filename') or ti.get('filename')
task_username = task.get('username') or ti.get('username')
if not task_filename or not task_username:
return False
lookup_key = _make_context_key(task_username, task_filename)
live_info = _lookup_live_info(task, live_transfers_lookup)
if not live_info:
# User-initiated manual pick — skip auto-retry. The status
# engine fallback owns the terminal transition for non-Soulseek
# manual downloads. Yanking the task back to 'searching' here
# would defeat the user's explicit selection.
if task.get('_user_manual_pick'):
return False
# Task not in live transfers but status is downloading/queued - likely stuck
if current_time - task.get('status_change_time', current_time) > 90:
retry_count = task.get('stuck_retry_count', 0)
last_retry = task.get('last_retry_time', 0)
if retry_count < 3 and (current_time - last_retry) > 30:
logger.warning(f"Task not in live transfers for >90s - retry {retry_count + 1}/3")
task['stuck_retry_count'] = retry_count + 1
task['last_retry_time'] = current_time
download_id = task.get('download_id')
# Defer slskd cancel to outside the lock
if task_username and download_id:
deferred_ops.append(('cancel_download', download_id, task_username,
'not_in_live_transfers_90s'))
# Mark current source as used (full filename to match worker format)
if task_username and task_filename:
used_sources = task.get('used_sources', set())
source_key = f"{task_username}_{task_filename}"
used_sources.add(source_key)
task['used_sources'] = used_sources
logger.warning(f"Marked missing-transfer source as used: {source_key}")
# Defer orphan cleanup
if task_username and task_filename:
_orphaned_download_keys.add(lookup_key)
deferred_ops.append(('cleanup_orphan', lookup_key))
# Clear download info and reset for retry
task.pop('download_id', None)
task.pop('username', None)
task.pop('filename', None)
task['status'] = 'searching'
task.pop('queued_start_time', None)
task.pop('downloading_start_time', None)
task['status_change_time'] = current_time
logger.warning(f"Task {task.get('track_info', {}).get('name', 'Unknown')} reset for missing-transfer retry")
batch_id = task.get('batch_id')
if task_id and batch_id:
deferred_ops.append(('restart_worker', task_id, batch_id))
return False
elif retry_count < 3:
return False
else:
track_label = task.get('track_info', {}).get('name', 'Unknown')
tried_sources = task.get('used_sources', set())
sources_str = f' (tried {len(tried_sources)} source{"s" if len(tried_sources) != 1 else ""})' if tried_sources else ''
logger.error("Task failed after 3 retry attempts (not in live transfers)")
task['status'] = 'failed'
task['error_message'] = f'Download disappeared from transfer list 3 times for "{track_label}"{sources_str} — source may be unavailable'
batch_id = task.get('batch_id')
if batch_id:
return True
return False
return False
state_str = live_info.get('state', '')
progress = live_info.get('percentComplete', 0)
# IMMEDIATE ERROR RETRY: Check for errored/rejected/timed-out downloads first (no timeout needed)
if 'Errored' in state_str or 'Failed' in state_str or 'Rejected' in state_str or 'TimedOut' in state_str:
# Same manual-pick guard as the not-in-live-transfers path —
# user explicitly selected this candidate, surface the failure.
if task.get('_user_manual_pick'):
return False
retry_count = task.get('error_retry_count', 0)
last_retry = task.get('last_error_retry_time', 0)
# Don't retry too frequently (wait at least 5 seconds between error retries)
if retry_count < 3 and (current_time - last_retry) > 5: # Max 3 error retry attempts
logger.error(f"Task errored (state: {state_str}) - immediate retry {retry_count + 1}/3")
task['error_retry_count'] = retry_count + 1
task['last_error_retry_time'] = current_time
_ti = task.get('track_info') if isinstance(task.get('track_info'), dict) else {}
username = task.get('username') or _ti.get('username')
filename = task.get('filename') or _ti.get('filename')
download_id = task.get('download_id')
# Defer slskd cancel to outside the lock
if username and download_id:
deferred_ops.append(('cancel_download', download_id, username,
'errored_state_retry'))
# Mark current source as used to prevent retry loops
# CRITICAL: Use full filename (not basename) to match worker's source_key format
if username and filename:
used_sources = task.get('used_sources', set())
source_key = f"{username}_{filename}"
used_sources.add(source_key)
task['used_sources'] = used_sources
logger.error(f"Marked errored source as used: {source_key}")
# Defer orphan cleanup to outside the lock (needs matched_context_lock)
if username and filename:
old_context_key = _make_context_key(username, filename)
_orphaned_download_keys.add(old_context_key)
deferred_ops.append(('cleanup_orphan', old_context_key))
# Clear download info since we cancelled it
task.pop('download_id', None)
task.pop('username', None)
task.pop('filename', None)
# Reset task state for immediate retry
task['status'] = 'searching'
task.pop('queued_start_time', None)
task.pop('downloading_start_time', None)
task['status_change_time'] = current_time
logger.error(f"Task {task.get('track_info', {}).get('name', 'Unknown')} reset for error retry")
# Defer worker restart to outside the lock
batch_id = task.get('batch_id')
if task_id and batch_id:
deferred_ops.append(('restart_worker', task_id, batch_id))
return False
elif retry_count < 3:
# Wait a bit before next error retry
return False
else:
# Too many error retries, mark as failed
track_label = task.get('track_info', {}).get('name', 'Unknown')
tried_sources = task.get('used_sources', set())
sources_str = f' (tried {len(tried_sources)} source{"s" if len(tried_sources) != 1 else ""})' if tried_sources else ''
logger.error("Task failed after 3 error retry attempts")
task['status'] = 'failed'
# Tidal-specific error: check if this was a quality issue.
# task['username'] is popped on error-retry (line ~2866) so we can't rely on it;
# used_sources keys are formatted as "{username}_{filename}", so startswith is exact.
is_tidal = any(s.startswith('tidal_') for s in tried_sources)
if is_tidal:
tidal_quality = config_manager.get('tidal_download.quality', 'lossless')
allow_fb = config_manager.get('tidal_download.allow_fallback', True)
if tidal_quality == 'hires' and not allow_fb:
task['error_message'] = (
f'Tidal download failed for "{track_label}" — HiRes quality is unavailable for this track '
f'on your account or in your region. Enable "Quality Fallback" in Tidal settings to fall back to Lossless.'
)
else:
task['error_message'] = (
f'Tidal download failed for "{track_label}"{sources_str}'
f'check Tidal authentication and quality settings.'
)
else:
task['error_message'] = f'Soulseek transfer errored 3 times for "{track_label}"{sources_str} — all sources failed or became unavailable'
# CRITICAL: Notify batch manager so track is added to permanently_failed_tracks
batch_id = task.get('batch_id')
if batch_id:
logger.error(f"[Retry Exhausted] Notifying batch manager of permanent failure for task {task_id}")
return True # Signal that we need to call completion outside the lock
return False
# Check for queued timeout (90 seconds like GUI)
elif 'Queued' in state_str or task['status'] == 'queued':
if 'queued_start_time' not in task:
task['queued_start_time'] = current_time
return False
else:
queue_time = current_time - task['queued_start_time']
# Use context-aware timeouts like GUI:
# - 15 seconds for artist album downloads (streaming context)
# - 90 seconds for background playlist downloads
is_streaming_context = task.get('track_info', {}).get('is_album_download', False)
timeout_threshold = 15.0 if is_streaming_context else 90.0
if queue_time > timeout_threshold:
# Track retry attempts to prevent rapid loops
retry_count = task.get('stuck_retry_count', 0)
last_retry = task.get('last_retry_time', 0)
# Don't retry too frequently (wait at least 30 seconds between retries)
if retry_count < 3 and (current_time - last_retry) > 30: # Max 3 retry attempts
logger.warning(f"Task stuck in queue for {queue_time:.1f}s - immediate retry {retry_count + 1}/3")
task['stuck_retry_count'] = retry_count + 1
task['last_retry_time'] = current_time
_ti = task.get('track_info') if isinstance(task.get('track_info'), dict) else {}
username = task.get('username') or _ti.get('username')
filename = task.get('filename') or _ti.get('filename')
download_id = task.get('download_id')
# Defer slskd cancel to outside the lock
if username and download_id:
deferred_ops.append(('cancel_download', download_id, username,
'queued_state_timeout'))
# UNIFIED RETRY LOGIC: Handle timeout retry exactly like error retry
# Mark current source as used to prevent retry loops
# CRITICAL: Use full filename (not basename) to match worker's source_key format
if username and filename:
used_sources = task.get('used_sources', set())
source_key = f"{username}_{filename}"
used_sources.add(source_key)
task['used_sources'] = used_sources
logger.error(f"Marked timeout source as used: {source_key}")
# Defer orphan cleanup to outside the lock (needs matched_context_lock)
if username and filename:
old_context_key = _make_context_key(username, filename)
_orphaned_download_keys.add(old_context_key)
deferred_ops.append(('cleanup_orphan', old_context_key))
# Clear download info since we cancelled it
task.pop('download_id', None)
task.pop('username', None)
task.pop('filename', None)
# Reset task state for immediate retry (like error retry)
task['status'] = 'searching'
task.pop('queued_start_time', None)
task.pop('downloading_start_time', None)
task['status_change_time'] = current_time
logger.error(f"Task {task.get('track_info', {}).get('name', 'Unknown')} reset for timeout retry")
# Defer worker restart to outside the lock
batch_id = task.get('batch_id')
if task_id and batch_id:
deferred_ops.append(('restart_worker', task_id, batch_id))
return False
elif retry_count < 3:
# Wait longer before next retry
return False
else:
# Too many retries, mark as failed
track_label = task.get('track_info', {}).get('name', 'Unknown')
tried_sources = task.get('used_sources', set())
sources_str = f' (tried {len(tried_sources)} source{"s" if len(tried_sources) != 1 else ""})' if tried_sources else ''
logger.error("Task failed after 3 retry attempts (queue timeout)")
task['status'] = 'failed'
task['error_message'] = f'Download stayed queued too long 3 times for "{track_label}"{sources_str} — peers may be offline or have full queues'
# Clear timers to prevent further retry loops
task.pop('queued_start_time', None)
task.pop('downloading_start_time', None)
# CRITICAL: Notify batch manager so track is added to permanently_failed_tracks
batch_id = task.get('batch_id')
if batch_id:
logger.error(f"[Retry Exhausted] Notifying batch manager of permanent failure for task {task_id}")
return True # Signal that we need to call completion outside the lock
return False
# Check for downloading at 0% timeout (90 seconds like GUI)
elif 'InProgress' in state_str and progress < 1:
if 'downloading_start_time' not in task:
task['downloading_start_time'] = current_time
return False
else:
download_time = current_time - task['downloading_start_time']
# Use context-aware timeouts like GUI:
# - 15 seconds for artist album downloads (streaming context)
# - 90 seconds for background playlist downloads
is_streaming_context = task.get('track_info', {}).get('is_album_download', False)
timeout_threshold = 15.0 if is_streaming_context else 90.0
if download_time > timeout_threshold:
retry_count = task.get('stuck_retry_count', 0)
last_retry = task.get('last_retry_time', 0)
# Don't retry too frequently (wait at least 30 seconds between retries)
if retry_count < 3 and (current_time - last_retry) > 30: # Max 3 retry attempts
logger.warning(f"Task stuck at 0% for {download_time:.1f}s - immediate retry {retry_count + 1}/3")
task['stuck_retry_count'] = retry_count + 1
task['last_retry_time'] = current_time
_ti = task.get('track_info') if isinstance(task.get('track_info'), dict) else {}
username = task.get('username') or _ti.get('username')
filename = task.get('filename') or _ti.get('filename')
download_id = task.get('download_id')
# Defer slskd cancel to outside the lock
if username and download_id:
deferred_ops.append(('cancel_download', download_id, username,
'stuck_at_0pct_timeout'))
# UNIFIED RETRY LOGIC: Handle 0% timeout retry exactly like error retry
# Mark current source as used to prevent retry loops
# CRITICAL: Use full filename (not basename) to match worker's source_key format
if username and filename:
used_sources = task.get('used_sources', set())
source_key = f"{username}_{filename}"
used_sources.add(source_key)
task['used_sources'] = used_sources
logger.info(f"Marked 0% progress source as used: {source_key}")
# Defer orphan cleanup to outside the lock (needs matched_context_lock)
if username and filename:
old_context_key = _make_context_key(username, filename)
_orphaned_download_keys.add(old_context_key)
deferred_ops.append(('cleanup_orphan', old_context_key))
# Clear download info since we cancelled it
task.pop('download_id', None)
task.pop('username', None)
task.pop('filename', None)
# Reset task state for immediate retry (like error retry)
task['status'] = 'searching'
task.pop('queued_start_time', None)
task.pop('downloading_start_time', None)
task['status_change_time'] = current_time
logger.warning(f"Task {task.get('track_info', {}).get('name', 'Unknown')} reset for 0% retry")
# Defer worker restart to outside the lock
batch_id = task.get('batch_id')
if task_id and batch_id:
deferred_ops.append(('restart_worker', task_id, batch_id))
return False
elif retry_count < 3:
# Wait longer before next retry
return False
else:
track_label = task.get('track_info', {}).get('name', 'Unknown')
tried_sources = task.get('used_sources', set())
sources_str = f' (tried {len(tried_sources)} source{"s" if len(tried_sources) != 1 else ""})' if tried_sources else ''
logger.error("Task failed after 3 retry attempts (0% progress timeout)")
task['status'] = 'failed'
task['error_message'] = f'Download stuck at 0% three times for "{track_label}"{sources_str} — peers may have connection issues'
# Clear timers to prevent further retry loops
task.pop('queued_start_time', None)
task.pop('downloading_start_time', None)
# CRITICAL: Notify batch manager so track is added to permanently_failed_tracks
batch_id = task.get('batch_id')
if batch_id:
logger.error(f"[Retry Exhausted] Notifying batch manager of permanent failure for task {task_id}")
return True # Signal that we need to call completion outside the lock
return False
else:
# Only reset timers if actual byte progress is being made
bytes_transferred = live_info.get('bytesTransferred', 0)
if progress >= 1 or bytes_transferred > 0:
# Real progress happening, reset timers and retry counts
task.pop('queued_start_time', None)
task.pop('downloading_start_time', None)
task.pop('stuck_retry_count', None)
else:
# Unknown state with no progress (e.g., "Requested", "Initializing")
# Treat like 0% stuck — start/keep the downloading timer running
if 'downloading_start_time' not in task:
task['downloading_start_time'] = current_time
download_time = current_time - task['downloading_start_time']
# Use context-aware timeouts
is_streaming_context = task.get('track_info', {}).get('is_album_download', False)
timeout_threshold = 15.0 if is_streaming_context else 90.0
if download_time > timeout_threshold:
retry_count = task.get('stuck_retry_count', 0)
last_retry = task.get('last_retry_time', 0)
if retry_count < 3 and (current_time - last_retry) > 30:
logger.warning(f"Task stuck in unknown state '{state_str}' with 0 progress for {download_time:.1f}s - retry {retry_count + 1}/3")
task['stuck_retry_count'] = retry_count + 1
task['last_retry_time'] = current_time
_ti = task.get('track_info') if isinstance(task.get('track_info'), dict) else {}
username = task.get('username') or _ti.get('username')
filename = task.get('filename') or _ti.get('filename')
download_id = task.get('download_id')
if username and download_id:
deferred_ops.append(('cancel_download', download_id, username,
'unknown_state_no_progress_timeout'))
if username and filename:
used_sources = task.get('used_sources', set())
source_key = f"{username}_{filename}"
used_sources.add(source_key)
task['used_sources'] = used_sources
logger.info(f"Marked unknown-state source as used: {source_key}")
if username and filename:
old_context_key = _make_context_key(username, filename)
_orphaned_download_keys.add(old_context_key)
deferred_ops.append(('cleanup_orphan', old_context_key))
task.pop('download_id', None)
task.pop('username', None)
task.pop('filename', None)
task['status'] = 'searching'
task.pop('queued_start_time', None)
task.pop('downloading_start_time', None)
task['status_change_time'] = current_time
batch_id = task.get('batch_id')
if task_id and batch_id:
deferred_ops.append(('restart_worker', task_id, batch_id))
return False
elif retry_count >= 3:
track_label = task.get('track_info', {}).get('name', 'Unknown')
tried_sources = task.get('used_sources', set())
sources_str = f' (tried {len(tried_sources)} source{"s" if len(tried_sources) != 1 else ""})' if tried_sources else ''
logger.error(f"Task failed after 3 retry attempts (unknown state '{state_str}')")
task['status'] = 'failed'
task['error_message'] = f'Download stuck in "{state_str}" state 3 times for "{track_label}"{sources_str}'
task.pop('queued_start_time', None)
task.pop('downloading_start_time', None)
batch_id = task.get('batch_id')
if batch_id:
return True
return False
return False
def _validate_worker_counts(self):
"""
Validate worker counts to detect and fix ghost workers or orphaned tasks.
This prevents the modal from showing wrong worker counts permanently.
"""
try:
batches_needing_workers = []
with tasks_lock:
for batch_id in list(self.monitored_batches):
if batch_id not in download_batches:
continue
batch = download_batches[batch_id]
reported_active = batch['active_count']
max_concurrent = batch['max_concurrent']
queue = batch.get('queue', [])
queue_index = batch.get('queue_index', 0)
# Count actually active tasks based on status
actually_active = 0
orphaned_tasks = []
# Tasks already processed by _on_download_completed should NOT be counted
# as active, even if their status hasn't been updated yet (race condition
# between stream processor calling _on_download_completed and
# _run_post_processing_worker setting status to 'completed')
completed_task_ids = batch.get('_completed_task_ids', set())
for task_id in queue:
if task_id in download_tasks:
task_status = download_tasks[task_id]['status']
if task_status in ['searching', 'downloading', 'queued', 'post_processing']:
if task_id not in completed_task_ids:
actually_active += 1
elif task_status in ['failed', 'completed', 'cancelled', 'not_found'] and task_id in queue[queue_index:]:
# These are orphaned tasks - they're done but still in active queue
orphaned_tasks.append(task_id)
# Check for discrepancies
if reported_active != actually_active or orphaned_tasks:
logger.warning(f"[Worker Validation] Batch {batch_id}: reported={reported_active}, actual={actually_active}, orphaned={len(orphaned_tasks)}")
if orphaned_tasks:
logger.warning(f"[Worker Validation] Found {len(orphaned_tasks)} orphaned tasks to cleanup")
# Fix the active count if it's wrong
if reported_active != actually_active:
old_count = batch['active_count']
batch['active_count'] = actually_active
logger.info(f"[Worker Validation] Fixed active count: {old_count}{actually_active}")
# Defer starting workers to outside the lock
if actually_active < max_concurrent and queue_index < len(queue):
batches_needing_workers.append(batch_id)
# Start replacement workers outside the lock
for batch_id in batches_needing_workers:
try:
logger.info(f"[Worker Validation] Starting replacement workers for {batch_id}")
_start_next_batch_of_downloads(batch_id)
except Exception as e:
logger.error(f"[Worker Validation] Error starting workers for {batch_id}: {e}")
except Exception as validation_error:
logger.error(f"Error in worker count validation: {validation_error}")