You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
SoulSync/core/downloads/status.py

947 lines
46 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""Batch + unified download status helpers.
`build_batch_status_data` is the per-batch payload formatter shared by:
- /api/playlists/<batch_id>/download_status (single batch)
- /api/download_status/batch (multiple batches in one call)
It's NOT pure read-only — it has a safety valve that mutates task state
when slskd reports terminal-but-stuck downloads, and it submits the
post-processing worker when slskd reports 'Succeeded' or when a stuck
file is recovered. Those side effects are preserved exactly.
`build_unified_downloads_response` powers /api/downloads/all — flattens
all tasks across batches into one sorted list with per-row metadata for
the centralized Downloads page.
Lifted verbatim from web_server.py. Dependencies that touch the live
runtime (config, file finder, post-processing submitter, transfer cache)
are passed via `StatusDeps` so the module is web_server-import-free.
"""
from __future__ import annotations

import math
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Optional

from core.runtime_state import (
    download_batches,
    download_tasks,
    tasks_lock,
)
from utils.logging_config import get_logger
# Project logger factory so these lines reach app.log (soulsync.* namespace).
logger = get_logger("downloads.status")
# #836 backstop: how long, in seconds, an slskd error state
# (Rejected/Failed/Errored/TimedOut) may persist on a non-manual task before
# the status formatter gives up on the retry monitor and marks it failed.
# The monitor's own retry window is ~15s (3 × 5s); this is well beyond it so
# a healthy retry always wins, and it only fires when the monitor genuinely
# can't make progress (e.g. a rejected transfer with no other source) — which
# otherwise hangs the task at 'downloading 0%' forever and blocks the whole
# batch from completing.
ERROR_STATE_TERMINAL_GRACE_SECONDS = 60
def _schedule_completion_callback(deps, batch_id: str, task_id: str, success: bool) -> None:
"""Fire ``deps.on_download_completed`` on a one-shot daemon thread so
the caller can hold ``tasks_lock`` without deadlocking.
``on_download_completed`` re-acquires ``tasks_lock`` (it removes the
completed task from the batch's active set, decrements active_count,
and may submit the next queued worker). Calling it synchronously
from within ``build_batch_status_data`` — which is invoked under the
same Lock — would self-deadlock since ``threading.Lock`` is not
reentrant. A daemon thread defers the call until after the lock is
released.
"""
if deps.on_download_completed is None:
return
def _run():
try:
deps.on_download_completed(batch_id, task_id, success)
except Exception as exc:
logger.error(
"[Status] deferred on_download_completed raised for task %s: %s",
task_id, exc,
)
threading.Thread(
target=_run,
name=f"on-completed-{task_id[:8]}",
daemon=True,
).start()
@dataclass
class StatusDeps:
    """Cross-cutting deps the status helpers need.

    Everything that touches the live runtime is injected through this
    container so the module does not have to import ``web_server``. The
    first six fields are required; the rest default to ``None`` and the
    helpers skip the corresponding feature when they are unset.
    """

    # Settings store, read via ``.get(key, default)`` (e.g.
    # 'soulseek.download_timeout', 'soulseek.download_path').
    config_manager: Any
    # Maps a configured path string to the path actually used for lookup.
    # NOTE(review): presumably translates host paths to container paths when
    # running under Docker — confirm against the implementation.
    docker_resolve_path: Callable[[str], str]
    # Called as ``find_completed_file(download_dir, filename, transfer_dir)``
    # and unpacked as ``(found_file, file_location)``.
    find_completed_file: Callable
    # Called as ``make_context_key(username, filename)``; the result is the
    # key used to index the live-transfer lookup.
    make_context_key: Callable[[str, str], str]
    submit_post_processing: Callable[[str, str], None]  # (task_id, batch_id) -> None
    # Returns the live-transfer lookup dict (context key -> transfer info).
    get_cached_transfer_data: Callable[[], dict]
    # Engine-state fallback for non-Soulseek (streaming) downloads.
    # Without these, YouTube/Tidal/Qobuz/HiFi/Deezer/SoundCloud/Lidarr
    # tasks never appear in live_transfers_lookup so their status never
    # advances out of 'downloading 0%'.
    download_orchestrator: Any = None
    # Receives the object returned by
    # ``download_orchestrator.get_download_status(download_id)`` and returns
    # the resulting record.
    run_async: Optional[Callable] = None
    # Called as ``on_download_completed(batch_id, task_id, success)``; always
    # invoked from a separate daemon thread by this module.
    on_download_completed: Optional[Callable[[str, str, bool], None]] = None
    # Called with a row limit; returns recent download-history rows.
    get_persistent_download_history: Optional[Callable[[int], list[dict]]] = None
    # Returns ALL library_history rows with verification_status in
    # ('unverified', 'force_imported') — no recency limit, so historical
    # entries are never buried by the general history tail cap.
    get_unverified_download_history: Optional[Callable[[], list[dict]]] = None
# Streaming sources the engine fallback applies to. Soulseek goes through
# slskd's live_transfers path and must NOT hit the engine fallback.
_STREAMING_SOURCE_NAMES = frozenset((
'youtube', 'tidal', 'qobuz', 'hifi', 'deezer_dl', 'lidarr', 'soundcloud', 'amazon',
'torrent', 'usenet',
))
_RELEASE_SOURCE_NAMES = frozenset(('torrent', 'usenet'))
# Keep these in sync with the engine plugins' state strings.
_ENGINE_FAILURE_STATES = ('Errored', 'Failed', 'Rejected', 'TimedOut', 'Aborted')
_ENGINE_CANCELLED_STATES = ('Cancelled', 'Canceled')
_ENGINE_SUCCESS_STATES = ('Succeeded', 'Completed, Succeeded')
def _engine_state_str(record: Any) -> str:
if record is None:
return ''
state = getattr(record, 'state', None)
if state is None and isinstance(record, dict):
state = record.get('state')
return str(state) if state is not None else ''
def _engine_progress_pct(record: Any) -> float:
if record is None:
return 0
progress = getattr(record, 'progress', None)
if progress is None and isinstance(record, dict):
progress = record.get('progress')
try:
progress = float(progress)
except (TypeError, ValueError):
return 0
if progress <= 1.0:
progress *= 100
return progress
def _apply_engine_state_fallback(
task_id: str,
task: dict,
task_status: dict,
batch_id: str,
deps: StatusDeps,
) -> None:
"""Populate ``task_status`` from the download engine's per-source
record when the task isn't in ``live_transfers_lookup`` — i.e. it's
a non-Soulseek streaming source. Mirrors the Soulseek branch's
Cancelled → Failed → Succeeded → InProgress priority order so
compound states like ``"Completed, Errored"`` hit the failure branch
first.
Mutates ``task`` in place (status / error_message) the same way the
Soulseek branch does, so the next status poll sees the new state.
Submits post-processing on terminal success. Manual-pick failures
are completed here; automatic failures keep the existing retry path
in charge.
"""
if deps.download_orchestrator is None or deps.run_async is None:
return
download_id = task.get('download_id')
if not download_id:
return
ti = task.get('track_info') if isinstance(task.get('track_info'), dict) else {}
username = task.get('username') or ti.get('username')
if username not in _STREAMING_SOURCE_NAMES:
return
manual_pick = bool(task.get('_user_manual_pick'))
if not manual_pick and username not in _RELEASE_SOURCE_NAMES:
return
release_source = username in _RELEASE_SOURCE_NAMES
terminal_status = task.get('status') in ('completed', 'failed', 'cancelled', 'not_found', 'post_processing')
if terminal_status and not (
release_source
and task.get('status') in ('failed', 'not_found')
and download_id
):
return
try:
record = deps.run_async(
deps.download_orchestrator.get_download_status(download_id)
)
except Exception as exc:
logger.debug(
"[Engine Fallback] get_download_status(%s) raised: %s",
download_id, exc,
)
return
if record is None:
return
state_str = _engine_state_str(record)
if not state_str:
return
if any(s in state_str for s in _ENGINE_CANCELLED_STATES):
if task['status'] != 'cancelled':
task['status'] = 'cancelled'
err = getattr(record, 'error_message', None) or getattr(record, 'error', None) or ''
if err:
task['error_message'] = str(err)
_schedule_completion_callback(deps, batch_id, task_id, False)
task_status['status'] = 'cancelled'
task_status['progress'] = _engine_progress_pct(record)
return
if any(s in state_str for s in _ENGINE_FAILURE_STATES):
if not manual_pick:
task_status['status'] = task.get('status', 'downloading')
task_status['progress'] = _engine_progress_pct(record)
logger.info(
"[Engine Fallback] Task %s engine reports '%s' for auto %s attempt; leaving retry handling to monitor",
task_id, state_str, username,
)
return
if task['status'] != 'failed':
task['status'] = 'failed'
err = getattr(record, 'error_message', None) or getattr(record, 'error', None) or ''
task['error_message'] = (
str(err) if err
else f'{username} download failed (engine state: {state_str})'
)
logger.info(
"[Engine Fallback] Task %s engine reports '%s' — marking failed",
task_id, state_str,
)
_schedule_completion_callback(deps, batch_id, task_id, False)
task_status['status'] = 'failed'
task_status['error_message'] = task.get('error_message')
task_status['progress'] = _engine_progress_pct(record)
return
if any(s in state_str for s in _ENGINE_SUCCESS_STATES):
if task['status'] != 'post_processing':
task['status'] = 'post_processing'
logger.info(
"[Engine Fallback] Task %s engine reports '%s' — starting post-processing verification",
task_id, state_str,
)
try:
deps.submit_post_processing(task_id, batch_id)
except Exception as exc:
logger.error(
"[Engine Fallback] submit_post_processing raised for task %s: %s",
task_id, exc,
)
task_status['status'] = 'post_processing'
task_status['progress'] = 95
return
if 'InProgress' in state_str:
task_status['status'] = 'downloading'
if task['status'] in ('searching', 'queued'):
task['status'] = 'downloading'
elif 'Queued' in state_str:
task_status['status'] = 'queued'
task_status['progress'] = _engine_progress_pct(record)
def build_batch_status_data(batch_id: str, batch: dict, live_transfers_lookup: dict, deps: StatusDeps) -> dict:
    """Build status payload for a single batch.

    Includes a safety-valve that mutates stuck task state and submits the
    post-processing worker when slskd reports 'Succeeded' or when a
    stuck-but-recovered file is found on disk.

    Args:
        batch_id: Id of the batch; passed on to the post-processing and
            completion callbacks.
        batch: The batch's row from ``download_batches``.
        live_transfers_lookup: Live-transfer info keyed by the value of
            ``deps.make_context_key(username, filename)``.
        deps: Injected runtime dependencies.

    Returns:
        The payload dict. ``phase``, ``error``, ``auto_initiated``,
        ``playlist_id``, ``playlist_name``, ``active_count`` and
        ``max_concurrent`` are always present. ``analysis_progress`` is
        added in the 'analysis' / 'queued' phases, ``tasks`` (sorted by
        ``track_index``) in the 'downloading' / 'complete' / 'error'
        phases, ``album_bundle`` when the batch has album-bundle state and
        ``wishlist_summary`` when a complete batch carries one.

    Side effects:
        Entries of ``download_tasks`` may have ``status``,
        ``error_message``, ``status_change_time`` and the private
        ``_error_state_since`` / ``_error_state_logged`` markers changed;
        ``deps.submit_post_processing`` may be called; and the completion
        callback may be scheduled on a background thread. The function
        reads and mutates ``download_tasks`` without taking
        ``tasks_lock`` itself — the route builders in this module call it
        with the lock held.
    """
    response_data = {
        "phase": batch.get('phase', 'unknown'),
        "error": batch.get('error'),
        "auto_initiated": batch.get('auto_initiated', False),
        "playlist_id": batch.get('playlist_id'),  # Include playlist_id for rehydration
        "playlist_name": batch.get('playlist_name'),  # Include playlist_name for reference
    }
    album_bundle = _build_album_bundle_status(batch)
    if album_bundle:
        response_data['album_bundle'] = album_bundle
    if response_data["phase"] in ('analysis', 'queued'):
        # Surface analysis_progress for both phases — ``queued`` rows
        # haven't started analysis yet (processed=0) but the UI still
        # needs ``total`` so it can render "Queued — N tracks". The
        # master worker flips phase from ``queued`` to ``analysis`` on
        # entry (see ``core/downloads/master.py:328``); the wishlist
        # submission sites pre-populate ``analysis_total`` so the
        # queued state isn't shape-mismatched against the analysis
        # state for downstream consumers.
        response_data['analysis_progress'] = {
            'total': batch.get('analysis_total', 0),
            'processed': batch.get('analysis_processed', 0),
        }
        response_data['analysis_results'] = batch.get('analysis_results', [])
    elif response_data["phase"] in ['downloading', 'complete', 'error']:
        response_data['analysis_results'] = batch.get('analysis_results', [])
        batch_tasks = []
        for task_id in batch.get('queue', []):
            task = download_tasks.get(task_id)
            if not task:
                # Queue entry with no task record: nothing to report.
                continue
            # SAFETY VALVE: Check for downloads stuck too long
            current_time = time.time()
            # A task without status_change_time gets age 0, so it can never
            # trip the valve.
            task_start_time = task.get('status_change_time', current_time)
            task_age = current_time - task_start_time
            # If task has been running too long, check if file completed
            _dl_timeout = deps.config_manager.get('soulseek.download_timeout', 600) or 600
            if task_age > _dl_timeout and task['status'] in ['downloading', 'queued', 'searching']:
                stuck_state = task['status']
                task_filename = task.get('filename') or (task.get('track_info') or {}).get('filename')
                # Before failing, check if the file actually downloaded successfully
                recovered = False
                if task_filename and stuck_state == 'downloading':
                    try:
                        download_dir = deps.docker_resolve_path(deps.config_manager.get('soulseek.download_path', './downloads'))
                        transfer_dir = deps.docker_resolve_path(deps.config_manager.get('soulseek.transfer_path', './Transfer'))
                        found_file, file_location = deps.find_completed_file(download_dir, task_filename, transfer_dir)
                        if found_file:
                            logger.info(f"[Safety Valve] Task {task_id} stuck but file found in {file_location} — routing to post-processing")
                            task['status'] = 'post_processing'
                            task['status_change_time'] = current_time
                            deps.submit_post_processing(task_id, batch_id)
                            recovered = True
                    except Exception as e:
                        logger.error(f"[Safety Valve] Error checking for completed file: {e}")
                if not recovered:
                    # NOTE(review): unlike the error-state paths further down,
                    # this path does not schedule the completion callback —
                    # confirm the worker slot is released elsewhere.
                    if stuck_state == 'searching':
                        logger.info(f"⏰ [Safety Valve] Task {task_id} stuck in searching for {task_age:.1f}s - marking not_found")
                        task['status'] = 'not_found'
                        task['error_message'] = f'Search stuck for {int(task_age // 60)} minutes with no results — timed out'
                    else:
                        logger.error(f"⏰ [Safety Valve] Task {task_id} stuck for {task_age:.1f}s - forcing failure")
                        task['status'] = 'failed'
                        task['error_message'] = f'Task stuck in {stuck_state} state for {int(task_age // 60)} minutes — forcibly stopped'
            # Baseline row, built after the safety valve so it already
            # reflects any status change made above.
            task_status = {
                'task_id': task_id,
                'track_index': task['track_index'],
                'status': task['status'],
                'track_info': task['track_info'],
                'progress': 0,
                # V2 SYSTEM: Add persistent state information
                'cancel_requested': task.get('cancel_requested', False),
                'cancel_timestamp': task.get('cancel_timestamp'),
                'ui_state': task.get('ui_state', 'normal'),  # normal|cancelling|cancelled
                'playlist_id': task.get('playlist_id'),  # For V2 system identification
                'error_message': task.get('error_message'),  # Surface failure reasons to UI
                'quarantine_entry_id': task.get('quarantine_entry_id'),
                'has_candidates': bool(task.get('cached_candidates')),  # Whether search found results (for clickable review)
                # 'verified' / 'unverified' / 'force_imported' — set by the
                # import pipeline once post-processing finishes.
                'verification_status': task.get('verification_status'),
                # "2/5" while the quarantine-retry engine walks candidates.
                'retry_info': task.get('retry_info'),
                'retry_trigger': task.get('retry_trigger'),
            }
            _ti = task.get('track_info') if isinstance(task.get('track_info'), dict) else {}
            task_filename = task.get('filename') or _ti.get('filename')
            task_username = task.get('username') or _ti.get('username')
            # Release sources (torrent/usenet) are resolved through the engine
            # fallback only and skip the slskd live-transfer lookup entirely.
            if (
                task_username in _RELEASE_SOURCE_NAMES
                and task['status'] not in ['completed', 'cancelled', 'post_processing']
                and task.get('download_id')
            ):
                _apply_engine_state_fallback(
                    task_id, task, task_status, batch_id, deps,
                )
                batch_tasks.append(task_status)
                continue
            if task_filename and task_username:
                lookup_key = deps.make_context_key(task_username, task_filename)
                if lookup_key in live_transfers_lookup:
                    live_info = live_transfers_lookup[lookup_key]
                    state_str = live_info.get('state', 'Unknown')
                    # Don't override tasks that are already in terminal states or post-processing
                    if task['status'] not in ['completed', 'failed', 'cancelled', 'not_found', 'post_processing']:
                        # SYNC.PY PARITY: Prioritized state checking (Errored/Cancelled before Completed)
                        # This prevents "Completed, Errored" states from being marked as completed
                        if 'Cancelled' in state_str or 'Canceled' in state_str:
                            task_status['status'] = 'cancelled'
                            task['status'] = 'cancelled'
                        elif 'Failed' in state_str or 'Errored' in state_str or 'Rejected' in state_str or 'TimedOut' in state_str:
                            # User-initiated manual pick — surface the failure
                            # immediately. The monitor's auto-retry path is gated
                            # on `_user_manual_pick` and won't fire, so deferring
                            # to it would leave the task stuck at 'downloading 0%'
                            # forever. Mark failed here and free the worker slot.
                            if task.get('_user_manual_pick'):
                                err_msg = live_info.get('errorMessage') or live_info.get('error') or ''
                                task['status'] = 'failed'
                                task['error_message'] = (
                                    str(err_msg) if err_msg
                                    else f'Manual pick failed (state: {state_str})'
                                )
                                task_status['status'] = 'failed'
                                task_status['error_message'] = task['error_message']
                                logger.info(
                                    f"[Manual Pick] Task {task_id} engine reports '{state_str}' — marking failed"
                                )
                                # NOTE: caller (build_batched_status) holds
                                # tasks_lock. on_download_completed re-acquires
                                # the same Lock — synchronous call would
                                # deadlock. Spawn a thread so it runs after we
                                # release the lock.
                                _schedule_completion_callback(deps, batch_id, task_id, False)
                            else:
                                # Normally the retry monitor picks up an errored state and
                                # retries within ~15s. But if it can't make progress — e.g. an
                                # slskd transfer rejected with no other source — the task would
                                # otherwise sit at 'downloading 0%' forever, spam an ERROR every
                                # poll, AND block its batch from ever completing (#836: a rejected
                                # wishlist track, or rejected tracks in an album download).
                                #
                                # Backstop: measure how long the ERROR state has persisted (not
                                # how long the task has downloaded, so a slow-but-healthy transfer
                                # isn't failed). Once it exceeds the monitor's retry window with no
                                # resolution, mark the task failed so the worker frees and the
                                # batch can finish. A working retry transitions the task out of the
                                # error state first, clearing the timer below — so the healthy path
                                # never hits this.
                                # A monitor retry transitions the task (newer
                                # status_change_time), which restarts the window so each
                                # error EPISODE gets a fresh grace. If the monitor never
                                # transitions it (the stuck case), the window keeps growing.
                                err_since = task.get('_error_state_since')
                                if err_since is None or task.get('status_change_time', 0) > err_since:
                                    task['_error_state_since'] = err_since = current_time
                                    task.pop('_error_state_logged', None)
                                error_age = current_time - err_since
                                if error_age > ERROR_STATE_TERMINAL_GRACE_SECONDS:
                                    err_msg = live_info.get('errorMessage') or live_info.get('error') or ''
                                    task['status'] = 'failed'
                                    task['error_message'] = (
                                        str(err_msg) if err_msg
                                        else f'Download failed (state: {state_str})'
                                    )
                                    task_status['status'] = 'failed'
                                    task_status['error_message'] = task['error_message']
                                    logger.warning(
                                        f"Task {task_id} stuck in error state '{state_str}' for "
                                        f"{error_age:.0f}s with no retry progress — marking failed (#836)"
                                    )
                                    _schedule_completion_callback(deps, batch_id, task_id, False)
                                else:
                                    # Within the retry window — keep current status so the monitor
                                    # can act. Log once per episode, not every poll, to stop the
                                    # 2-second ERROR spam the reporter saw.
                                    if not task.get('_error_state_logged'):
                                        logger.warning(
                                            f"Task {task_id} API shows error state: {state_str} "
                                            f"- letting monitor handle retry"
                                        )
                                        task['_error_state_logged'] = True
                                    if task['status'] in ['searching', 'downloading', 'queued']:
                                        task_status['status'] = task['status']  # Keep current status for monitor
                                    else:
                                        task_status['status'] = 'downloading'  # Default to downloading for error detection
                                        task['status'] = 'downloading'
                        elif 'Completed' in state_str or 'Succeeded' in state_str:
                            # Verify bytes actually transferred before trusting state string
                            expected_size = live_info.get('size', 0)
                            transferred = live_info.get('bytesTransferred', 0)
                            if expected_size > 0 and transferred < expected_size:
                                # State says complete but bytes don't match — keep current status
                                task_status['status'] = task['status']
                                logger.info(f"Task {task_id} state says complete but bytes incomplete ({transferred}/{expected_size})")
                            # NEW VERIFICATION WORKFLOW: Use intermediate post_processing status
                            # Only set this status once to prevent multiple worker submissions
                            elif task['status'] != 'post_processing':
                                task_status['status'] = 'post_processing'
                                task['status'] = 'post_processing'
                                logger.info(f"Task {task_id} API reports 'Succeeded' - starting post-processing verification")
                                # Submit post-processing worker to verify file and complete the task
                                deps.submit_post_processing(task_id, batch_id)
                            else:
                                # FIXED: Always require verification workflow - no bypass for stream processed tasks
                                # Stream processing only handles metadata, not file verification
                                task_status['status'] = 'post_processing'
                                logger.info(f"Task {task_id} waiting for verification worker to complete")
                        elif 'InProgress' in state_str:
                            task_status['status'] = 'downloading'
                        else:
                            # Any other state string is reported as queued.
                            task_status['status'] = 'queued'
                        task_status['progress'] = live_info.get('percentComplete', 0)
                    # For completed/post-processing tasks, keep appropriate progress
                    elif task['status'] == 'completed':
                        task_status['progress'] = 100
                    elif task['status'] == 'post_processing':
                        task_status['progress'] = 95  # Nearly complete, just verifying
                else:
                    # If task is completed but not in live transfers, keep appropriate status
                    if task['status'] == 'completed':
                        task_status['progress'] = 100
                    elif task['status'] == 'post_processing':
                        task_status['progress'] = 95  # Nearly complete, just verifying
                    else:
                        # Non-Soulseek (streaming) sources don't appear in
                        # slskd's live_transfers_lookup — poll the engine
                        # directly so YouTube/Tidal/Qobuz/HiFi/Deezer/
                        # SoundCloud/Lidarr tasks actually advance out of
                        # 'downloading 0%' instead of staying there forever.
                        _apply_engine_state_fallback(
                            task_id, task, task_status, batch_id, deps,
                        )
            batch_tasks.append(task_status)
        batch_tasks.sort(key=lambda x: x['track_index'])
        response_data['tasks'] = batch_tasks
    # CRITICAL: Add batch worker management metadata (was missing!)
    # This is essential for client-side worker validation and prevents false desync warnings
    response_data['active_count'] = batch.get('active_count', 0)
    response_data['max_concurrent'] = batch.get('max_concurrent', 3)
    # Add wishlist summary if batch is complete (matching sync.py behavior)
    if response_data["phase"] == 'complete' and 'wishlist_summary' in batch:
        response_data['wishlist_summary'] = batch['wishlist_summary']
    return response_data
# ---------------------------------------------------------------------------
# Route-shaped builders
# ---------------------------------------------------------------------------
def build_single_batch_status(batch_id: str, deps: StatusDeps) -> tuple[Optional[dict], int]:
    """Build the response for /api/playlists/<batch_id>/download_status.

    Returns a ``(payload, http_status)`` pair: the batch's status payload
    with 200, or an error payload with 404 when the batch id is unknown.
    The transfer snapshot is fetched before ``tasks_lock`` is taken; the
    payload itself is built while the lock is held.
    """
    transfers_snapshot = deps.get_cached_transfer_data()
    with tasks_lock:
        if batch_id in download_batches:
            payload = build_batch_status_data(
                batch_id, download_batches[batch_id], transfers_snapshot, deps,
            )
            return payload, 200
        return {"error": "Batch not found"}, 404
def build_batched_status(requested_batch_ids: list, deps: StatusDeps) -> dict:
    """For /api/download_status/batch. Returns the full response dict (always 200).

    When a requested batch carries a ``wishlist_run_id`` (Phase 1c.2.1
    per-album split), the response merges in every sibling sub-batch
    of the same run via ``merge_wishlist_run_status``. The merged view
    lands keyed under the originally-requested ``batch_id`` so the
    frontend modal (which polls one batch id) sees every sibling's
    tasks + progress without needing to know about the split.

    Args:
        requested_batch_ids: Batch ids to report on. An empty / falsy
            value means "every known batch".
        deps: Injected runtime dependencies.

    Returns:
        A dict with:

        - ``batches``: batch id -> status payload, or ``{"error": msg}``
          when building that batch's payload raised;
        - ``metadata``: batch count, the requested ids and a timestamp;
        - ``debug_info``: per-batch worker-count comparison for every
          batch whose payload was built successfully.
    """
    from core.downloads.wishlist_aggregator import merge_wishlist_run_status

    live_transfers_lookup = deps.get_cached_transfer_data()
    response: dict[str, Any] = {"batches": {}}
    # Ids of batches whose payload is only an error placeholder. Tracked
    # explicitly because every *successful* payload also carries an
    # "error" key (the batch's own error field, usually None), so the
    # presence of that key cannot be used to tell the two apart.
    failed_batch_ids: set = set()

    with tasks_lock:
        if requested_batch_ids:
            target_batches = {
                bid: batch for bid, batch in download_batches.items()
                if bid in requested_batch_ids
            }
        else:
            target_batches = download_batches.copy()

        # Pre-index sibling batch ids by wishlist_run_id so the per-
        # batch loop below can find them in O(1). Snapshot under the
        # held lock; subsequent dict mutations don't matter for this
        # build.
        run_id_to_batch_ids: dict[str, list[str]] = {}
        for bid, batch_row in download_batches.items():
            run_id = (batch_row or {}).get('wishlist_run_id') if isinstance(batch_row, dict) else None
            if run_id:
                run_id_to_batch_ids.setdefault(str(run_id), []).append(bid)

        for batch_id, batch in target_batches.items():
            try:
                primary_status = build_batch_status_data(
                    batch_id, batch, live_transfers_lookup, deps,
                )
                # Wishlist-run merge — kicks in only when this batch
                # has a run_id AND at least one sibling exists. Falls
                # through to legacy single-batch shape otherwise.
                run_id = batch.get('wishlist_run_id') if isinstance(batch, dict) else None
                sibling_ids = run_id_to_batch_ids.get(str(run_id), []) if run_id else []
                if run_id and len(sibling_ids) > 1:
                    sibling_statuses = []
                    for sib_id in sibling_ids:
                        if sib_id == batch_id:
                            continue
                        sib_batch = download_batches.get(sib_id)
                        if not isinstance(sib_batch, dict):
                            continue
                        try:
                            sibling_statuses.append(
                                build_batch_status_data(
                                    sib_id, sib_batch, live_transfers_lookup, deps,
                                )
                            )
                        except Exception as sib_err:
                            # One broken sibling must not hide the rest of
                            # the run; it is left out of the merged view.
                            logger.warning(
                                f"[Wishlist Run] Sibling status build failed for {sib_id}: {sib_err}"
                            )
                    merged = merge_wishlist_run_status(primary_status, sibling_statuses)
                    response["batches"][batch_id] = merged
                else:
                    response["batches"][batch_id] = primary_status
            except Exception as batch_error:
                logger.error(f"Error processing batch {batch_id}: {batch_error}")
                response["batches"][batch_id] = {"error": str(batch_error)}
                failed_batch_ids.add(batch_id)

    response["metadata"] = {
        "total_batches": len(response["batches"]),
        "requested_batch_ids": requested_batch_ids,
        "timestamp": time.time(),
    }

    # Compare the worker count each batch reports with the number of tasks
    # that are actually in an active state.
    debug_info = {}
    for batch_id, batch_status in response["batches"].items():
        if batch_id in failed_batch_ids:
            continue
        active_count = batch_status.get("active_count", 0)
        max_concurrent = batch_status.get("max_concurrent", 3)
        tasks = batch_status.get("tasks", [])
        active_tasks = sum(
            1 for t in tasks
            if t.get("status") in ('searching', 'downloading', 'queued')
        )
        debug_info[batch_id] = {
            "reported_active": active_count,
            "actual_active_tasks": active_tasks,
            "max_concurrent": max_concurrent,
            "total_tasks": len(tasks),
            "worker_discrepancy": active_count != active_tasks,
        }
    response["debug_info"] = debug_info

    logger.info(f"[Batched Status] Returning status for {len(response['batches'])} batches")
    discrepancies = [bid for bid, info in debug_info.items() if info.get("worker_discrepancy")]
    if discrepancies:
        logger.info(f"[Batched Status] Worker count discrepancies in batches: {discrepancies}")
    return response
_STATUS_PRIORITY = {
'downloading': 0, 'searching': 1, 'post_processing': 2,
'queued': 3, 'pending': 3,
'completed': 4, 'skipped': 5, 'already_owned': 5,
'not_found': 6, 'failed': 7, 'cancelled': 8,
}
_PERSISTENT_HISTORY_TAIL_LIMIT = 50
def _normalize_identity_part(value: Any) -> str:
return str(value or '').strip().casefold()
def _download_identity(title: Any, artist: Any, album: Any) -> tuple[str, str, str]:
    """Return the ``(title, artist, album)`` key used to de-duplicate rows.

    Each part is normalized on its own, so values that differ only in
    letter case or surrounding whitespace yield the same key.
    """
    title_key = _normalize_identity_part(title)
    artist_key = _normalize_identity_part(artist)
    album_key = _normalize_identity_part(album)
    return title_key, artist_key, album_key
def _history_timestamp(value: Any) -> float:
if not value:
return 0.0
if isinstance(value, (int, float)):
return float(value)
text = str(value).strip()
if not text:
return 0.0
try:
# SQLite CURRENT_TIMESTAMP uses "YYYY-MM-DD HH:MM:SS".
return datetime.fromisoformat(text.replace('Z', '+00:00')).timestamp()
except ValueError:
try:
return datetime.strptime(text[:19], '%Y-%m-%d %H:%M:%S').timestamp()
except ValueError:
return 0.0
def _build_history_download_item(entry: dict) -> dict:
    """Turn a persisted history row into a Downloads-page item.

    The result has the same shape as a live task row, reported as
    completed at 100% and flagged with ``is_persistent_history``. Fields
    that the row does not provide fall back to empty strings.
    """
    def _first_truthy(*keys: str) -> Any:
        # First truthy value among the given keys, or '' when none is set.
        for key in keys:
            candidate = entry.get(key)
            if candidate:
                return candidate
        return ''

    history_id = _first_truthy('id', 'history_id')
    title = _first_truthy('title', 'source_track_title')
    artist = _first_truthy('artist_name', 'source_artist')
    album = _first_truthy('album_name')
    created_at = _first_truthy('created_at', 'completed_at')
    source = _first_truthy('download_source')

    # Rows without an id get a synthetic one from title + creation time.
    if history_id:
        item_id = f'history-{history_id}'
    else:
        item_id = f'history-{title}-{created_at}'

    return {
        'task_id': item_id,
        'title': title,
        'artist': artist,
        'album': album,
        'artwork': _first_truthy('thumb_url'),
        'status': 'completed',
        'progress': 100,
        'error': None,
        'batch_id': '',
        'batch_name': source,
        'batch_source': source,
        'playlist_id': '',
        'track_index': 0,
        'batch_total': 1,
        'timestamp': _history_timestamp(created_at),
        'created_at': created_at,
        'priority': _STATUS_PRIORITY['completed'],
        'quality': _first_truthy('quality'),
        'file_path': _first_truthy('file_path'),
        'verification_status': entry.get('verification_status'),
        'is_persistent_history': True,
    }
def _extract_track_display_fields(track_info: Any) -> tuple[str, str, str, str]:
    """Return ``(title, artist, album, artwork)`` for a task's track info.

    Handles the metadata shapes that reach the downloads list: the artist
    may be a string, a list of strings, a list of dicts with ``'name'``
    or a single such dict; the album may be a string or a dict with
    ``'name'``; artwork falls back to the first entry of the album's
    ``images``. Anything that is not a dict yields four empty strings.
    """
    if not isinstance(track_info, dict):
        return '', '', '', ''

    title = track_info.get('title') or track_info.get('name') or track_info.get('track_name') or ''

    # Artist can be: string, list of strings, list of dicts with 'name'
    raw_artist = track_info.get('artist') or track_info.get('artist_name') or track_info.get('artists') or ''
    if isinstance(raw_artist, list):
        names = [
            entry.get('name', '') if isinstance(entry, dict) else str(entry)
            for entry in raw_artist
        ]
        artist = ', '.join(name for name in names if name)
    elif isinstance(raw_artist, dict):
        artist = raw_artist.get('name', '')
    else:
        artist = str(raw_artist) if raw_artist else ''

    # Album can be: string or dict with 'name'
    raw_album = track_info.get('album') or track_info.get('album_name') or ''
    if isinstance(raw_album, dict):
        album = raw_album.get('name', '')
    else:
        album = str(raw_album) if raw_album else ''

    artwork = track_info.get('artwork_url') or track_info.get('image_url') or track_info.get('album_art') or ''
    if not artwork:
        # Try album images
        album_obj = track_info.get('album')
        if isinstance(album_obj, dict):
            images = album_obj.get('images') or []
            if images and isinstance(images, list):
                first_image = images[0]
                artwork = first_image.get('url', '') if isinstance(first_image, dict) else str(first_image)

    return title, artist, album, artwork


def build_unified_downloads_response(limit: int, deps: StatusDeps) -> dict:
    """Flat list of every task across batches, sorted active-first then by recency.

    Powers /api/downloads/all for the centralized Downloads page.

    Args:
        limit: Caps only the recent-history tail. Live in-memory tasks and
            unverified history rows are always returned in full.
        deps: Injected runtime dependencies.

    Returns:
        A dict with ``success``, ``downloads`` (the sorted rows),
        ``total`` (row count), ``batches`` (one summary per known batch)
        and ``timestamp``.
    """
    items = []
    live_identities = set()
    # Live-transfer snapshot; fetched on first use and then reused for
    # every task instead of being re-requested per task.
    live_transfers = None
    with tasks_lock:
        for task_id, task in download_tasks.items():
            track_info = task.get('track_info') or {}
            # track_info is not guaranteed to be a dict; use an empty one
            # for key lookups so a non-dict value cannot raise.
            info_dict = track_info if isinstance(track_info, dict) else {}
            batch_id = task.get('batch_id', '')
            batch = download_batches.get(batch_id, {})

            # Extract track metadata — handle all format variations
            title, artist, album, artwork = _extract_track_display_fields(track_info)
            status = task.get('status', 'queued')
            live_identities.add(_download_identity(title, artist, album))

            # Determine download progress percentage
            progress = 0
            if status == 'completed':
                progress = 100
            elif status == 'post_processing':
                progress = 95
            elif status in ('downloading', 'searching'):
                # Check live transfer data for real progress
                task_filename = task.get('filename') or info_dict.get('filename')
                task_username = task.get('username') or info_dict.get('username')
                if task_filename and task_username:
                    lookup_key = deps.make_context_key(task_username, task_filename)
                    if live_transfers is None:
                        live_transfers = deps.get_cached_transfer_data() or {}
                    live_info = live_transfers.get(lookup_key)
                    if live_info:
                        progress = live_info.get('percentComplete', 0)

            items.append({
                'task_id': task_id,
                'title': title,
                'artist': artist,
                'album': album,
                'artwork': artwork,
                'status': status,
                'progress': progress,
                'error': task.get('error_message'),
                'verification_status': task.get('verification_status'),
                # library_history row id (set at import) so the Unverified review
                # queue can act on a still-live completed task before it becomes
                # a persistent-history row.
                'history_id': task.get('history_id'),
                # Real probed audio quality (mutagen-read from the actual file),
                # surfaced so the Downloads page can show what was downloaded.
                'quality': task.get('quality') or '',
                'retry_info': task.get('retry_info'),
                'retry_trigger': task.get('retry_trigger'),
                'batch_id': batch_id,
                'batch_name': batch.get('playlist_name') or batch.get('album_name') or '',
                'batch_source': batch.get('source_page') or batch.get('initiated_from') or '',
                # playlist_id is needed by per-row cancel (cancel_task_v2
                # takes playlist_id + track_index). Surfacing it here so
                # the frontend doesn't need a second lookup.
                'playlist_id': batch.get('playlist_id', ''),
                'track_index': task.get('track_index', 0),
                'batch_total': len(batch.get('queue', [])),
                # ``or 0`` keeps a stored None from breaking the sort below.
                'timestamp': task.get('status_change_time') or 0,
                'priority': _STATUS_PRIORITY.get(status, 9),
                'is_persistent_history': False,
            })

    # --- Unverified history (unconditional, no limit) ---
    # Always load every library_history row that still needs human confirmation
    # (verification_status IN ('unverified', 'force_imported')). This is NOT
    # gated on len(items) < limit so that historical entries from past batches
    # are visible even during a large active batch that would otherwise exhaust
    # the limit before the history tail is read. Dedup against live tasks by
    # identity so a track currently in post-processing isn't shown twice.
    if deps.get_unverified_download_history is not None:
        try:
            unverified_entries = deps.get_unverified_download_history() or []
        except Exception as exc:
            logger.debug("[Downloads] unverified history lookup failed: %s", exc)
            unverified_entries = []
        for entry in unverified_entries:
            item = _build_history_download_item(entry)
            identity = _download_identity(item.get('title'), item.get('artist'), item.get('album'))
            if identity in live_identities:
                continue
            items.append(item)
            live_identities.add(identity)

    # --- General recent-history tail (capped, recency-ordered) ---
    # Fills in the completed/verified tail so the full Downloads list looks
    # populated. Gated on len(items) < limit so a busy batch doesn't trigger
    # an extra DB round-trip when we're already at capacity.
    if deps.get_persistent_download_history is not None and len(items) < limit:
        history_limit = min(limit - len(items), _PERSISTENT_HISTORY_TAIL_LIMIT)
        try:
            history_entries = deps.get_persistent_download_history(history_limit) or []
        except Exception as exc:
            logger.debug("[Downloads] persistent history lookup failed: %s", exc)
            history_entries = []
        appended_history = 0
        for entry in history_entries:
            if len(items) >= limit or appended_history >= history_limit:
                break
            item = _build_history_download_item(entry)
            identity = _download_identity(item.get('title'), item.get('artist'), item.get('album'))
            if identity in live_identities:
                continue
            items.append(item)
            live_identities.add(identity)
            appended_history += 1

    # Sort: active first (by priority), then by timestamp desc within each group.
    # NOTE: the array order is presentation-only — the Downloads page filters
    # client-side per tab. What matters is that EVERY live task is present: an
    # earlier `items[:limit]` truncation (active-first) starved completed/failed/
    # unverified rows off the end during a busy batch, so those tabs stayed empty
    # until the batch drained. `limit` now bounds only the persistent-history
    # tail (handled above); live in-memory tasks are always returned in full
    # (they're already bounded by the 5-min cleanup automation).
    items.sort(key=lambda x: (x['priority'], -x['timestamp']))

    # Build batch summaries for the batch context panel
    batch_summaries = []
    with tasks_lock:
        for bid, batch in download_batches.items():
            queue = batch.get('queue', [])
            statuses = [download_tasks[tid]['status'] for tid in queue if tid in download_tasks]
            summary = {
                'batch_id': bid,
                'playlist_id': batch.get('playlist_id', ''),
                'batch_name': batch.get('playlist_name') or batch.get('album_name') or '',
                'source_page': batch.get('source_page') or batch.get('initiated_from') or '',
                'phase': batch.get('phase', 'unknown'),
                'total': len(queue),
                'completed': sum(1 for s in statuses if s in ('completed', 'skipped', 'already_owned')),
                'failed': sum(1 for s in statuses if s in ('failed', 'not_found', 'cancelled')),
                'active': sum(1 for s in statuses if s in ('downloading', 'searching', 'post_processing')),
                'queued': sum(1 for s in statuses if s in ('queued', 'pending')),
            }
            album_bundle = _build_album_bundle_status(batch)
            if album_bundle:
                summary['album_bundle'] = album_bundle
            batch_summaries.append(summary)

    return {
        'success': True,
        'downloads': items,
        'total': len(items),
        'batches': batch_summaries,
        'timestamp': time.time(),
    }
def _build_album_bundle_status(batch: dict) -> dict:
"""Return public batch-level status for torrent/Usenet album-bundle work."""
state = batch.get('album_bundle_state')
if not state:
return {}
status = {
'state': state,
'source': batch.get('album_bundle_source'),
'release': batch.get('album_bundle_release'),
'progress': batch.get('album_bundle_progress'),
'progress_percent': _album_bundle_progress_percent(
batch.get('album_bundle_progress')
),
'speed': batch.get('album_bundle_speed'),
'downloaded': batch.get('album_bundle_downloaded'),
'size': batch.get('album_bundle_size'),
'seeders': batch.get('album_bundle_seeders'),
'grabs': batch.get('album_bundle_grabs'),
'count': batch.get('album_bundle_count'),
}
return {key: value for key, value in status.items() if value is not None}
def _album_bundle_progress_percent(value: Any) -> int:
try:
progress = float(value)
except (TypeError, ValueError):
return 0
if progress <= 1:
progress *= 100
return max(0, min(100, int(round(progress))))