mirror of https://github.com/Nezreka/SoulSync.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
828 lines
47 KiB
828 lines
47 KiB
"""Batch lifecycle: start workers, on-completion accounting, completion check.
|
|
|
|
Three deeply-coupled functions:
|
|
|
|
- `start_next_batch_of_downloads(batch_id, deps)` — launches workers up to
|
|
the batch's max_concurrent. Skips cancelled tasks, sets searching status,
|
|
submits to the executor, decrement-safe on submit failures (no ghost
|
|
workers).
|
|
|
|
- `on_download_completed(batch_id, task_id, success, deps)` — called when
|
|
a single track download finishes (good or bad). Tracks failed/cancelled
|
|
tracks for wishlist replay, decrements active count, then runs the full
|
|
batch-completion check — which is its own beast: stuck-task detection
|
|
(searching > 10min → not_found, post_processing > 5min → completed),
|
|
M3U regeneration, repair worker hand-off, album consistency pass,
|
|
wishlist failed-tracks processing.
|
|
|
|
- `check_batch_completion_v2(batch_id, deps)` — same completion check
|
|
but called from the V2 atomic cancel path (which bypasses
|
|
on_download_completed). Duplicate logic preserved verbatim.
|
|
|
|
Lifted verbatim from web_server.py. Dependencies injected via
|
|
`LifecycleDeps` since the surface is wide (15+ callbacks/refs).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import shutil
|
|
import time
|
|
import traceback
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any, Callable, Optional
|
|
|
|
from core.downloads.history import record_sync_history_completion
|
|
from core.runtime_state import (
|
|
add_activity_item,
|
|
download_batches,
|
|
download_tasks,
|
|
tasks_lock,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _safe_batch_dirname(batch_id: str) -> str:
|
|
safe = ''.join(ch if ch.isalnum() or ch in ('-', '_') else '_' for ch in str(batch_id or 'batch'))
|
|
return safe or 'batch'
|
|
|
|
|
|
_ALBUM_BUNDLE_CLEANED_SOURCES = ('soulseek', 'torrent', 'usenet')
|
|
|
|
|
|
def _cleanup_private_album_bundle_staging(batch_id: str, batch: dict) -> None:
|
|
"""Best-effort cleanup for album-bundle private staging copies.
|
|
|
|
Fires when a batch reaches a terminal state. SoulSync's per-batch
|
|
copy lives at ``storage/album_bundle_staging/<batch_id>/`` —
|
|
safe to remove because by the time the batch is "complete" the
|
|
per-track workers have already claimed their files out of staging
|
|
via ``try_staging_match`` and moved them to the Transfer dir.
|
|
|
|
Pre-fix this only ran for torrent / usenet bundles because the
|
|
comment was "slskd keeps its own completed folders" — but the
|
|
Soulseek bundle path ALSO copies files into the private staging
|
|
dir (``soulseek_client.py:1599``), so slskd bundle copies were
|
|
leaking forever. Coverage extended to all three bundle-capable
|
|
sources.
|
|
"""
|
|
if not batch.get('album_bundle_private_staging'):
|
|
return
|
|
if (batch.get('album_bundle_source') or '').lower() not in _ALBUM_BUNDLE_CLEANED_SOURCES:
|
|
return
|
|
|
|
staging_path = batch.get('album_bundle_staging_path')
|
|
if not staging_path:
|
|
return
|
|
|
|
path = Path(staging_path)
|
|
expected_name = _safe_batch_dirname(batch_id)
|
|
if path.name != expected_name:
|
|
logger.warning(
|
|
"[Album Bundle] Refusing to clean private staging path with unexpected name: %s",
|
|
staging_path,
|
|
)
|
|
return
|
|
if not path.exists():
|
|
return
|
|
if not path.is_dir():
|
|
logger.warning("[Album Bundle] Refusing to clean non-directory staging path: %s", staging_path)
|
|
return
|
|
|
|
try:
|
|
shutil.rmtree(path)
|
|
logger.info("[Album Bundle] Cleaned private staging folder for batch %s: %s", batch_id, staging_path)
|
|
except Exception as exc:
|
|
logger.warning("[Album Bundle] Could not clean private staging folder %s: %s", staging_path, exc)
|
|
|
|
|
|
def sweep_orphan_album_bundle_staging(
|
|
staging_root: str,
|
|
*,
|
|
active_batch_ids: Optional[set] = None,
|
|
) -> int:
|
|
"""Remove orphan per-batch dirs from album-bundle staging.
|
|
|
|
An orphan is a ``<staging_root>/<dirname>`` subdir whose ``dirname``
|
|
matches no batch_id in the current ``download_batches`` runtime
|
|
state. Happens when:
|
|
|
|
- The app crashed mid-bundle (cleanup never fired).
|
|
- A batch errored on a non-completion code path.
|
|
- A pre-extension Soulseek bundle (where cleanup was gated to
|
|
torrent/usenet) left a copy behind.
|
|
|
|
Intended to run ONCE at server startup, before any new batch can
|
|
register an active staging dir. That guarantees ``active_batch_ids``
|
|
is genuinely empty / pre-existing; we don't race a starting batch.
|
|
|
|
Returns the count of dirs removed. Safe-by-design:
|
|
- Only touches subdirs of the configured staging root.
|
|
- Each candidate goes through the same ``_safe_batch_dirname``
|
|
name-guard as the per-batch cleanup, so escape-via-symlink
|
|
isn't possible.
|
|
- Refuses to act on non-directories.
|
|
- ``shutil.rmtree`` errors are logged, not raised — sweep must
|
|
not crash app startup over a permission glitch.
|
|
"""
|
|
if not staging_root:
|
|
return 0
|
|
root = Path(staging_root)
|
|
if not root.exists() or not root.is_dir():
|
|
return 0
|
|
|
|
active = active_batch_ids if active_batch_ids is not None else set()
|
|
# Normalize active batch ids to their on-disk dirname form so the
|
|
# set lookup matches what's actually on disk.
|
|
active_dirnames = {_safe_batch_dirname(bid) for bid in active if bid}
|
|
|
|
removed = 0
|
|
try:
|
|
entries = list(root.iterdir())
|
|
except OSError as exc:
|
|
logger.warning("[Album Bundle Sweep] Could not list staging root %s: %s", staging_root, exc)
|
|
return 0
|
|
|
|
for entry in entries:
|
|
if not entry.is_dir():
|
|
continue
|
|
# The directory name MUST match what _safe_batch_dirname
|
|
# produces — anything else was hand-created and we leave it
|
|
# alone. Defensive against stray dirs the user might have
|
|
# placed in the staging root.
|
|
if entry.name != _safe_batch_dirname(entry.name):
|
|
continue
|
|
if entry.name in active_dirnames:
|
|
continue
|
|
try:
|
|
shutil.rmtree(entry)
|
|
removed += 1
|
|
logger.info("[Album Bundle Sweep] Removed orphan staging dir: %s", entry)
|
|
except OSError as exc:
|
|
logger.warning("[Album Bundle Sweep] Could not remove orphan staging dir %s: %s", entry, exc)
|
|
|
|
if removed:
|
|
logger.info("[Album Bundle Sweep] Cleaned %d orphan staging dir(s) under %s", removed, staging_root)
|
|
return removed
|
|
|
|
|
|
@dataclass
|
|
class LifecycleDeps:
|
|
"""Bundle of cross-cutting deps the batch lifecycle needs."""
|
|
config_manager: Any
|
|
automation_engine: Any
|
|
download_monitor: Any
|
|
repair_worker: Any
|
|
mb_worker: Any
|
|
is_shutting_down: Callable[[], bool]
|
|
get_batch_lock: Callable[[str], Any] # (batch_id) -> threading.Lock
|
|
submit_download_track_worker: Callable # (task_id, batch_id) -> None (submits to executor)
|
|
submit_failed_to_wishlist: Callable[[str], None] # async — submits to executor
|
|
submit_failed_to_wishlist_with_auto_completion: Callable[[str], None] # async — submits to executor
|
|
process_failed_to_wishlist: Callable[[str], None] # sync — direct call (used by v2 path)
|
|
process_failed_to_wishlist_with_auto_completion: Callable[[str], None] # sync — direct call (used by v2 path)
|
|
get_track_artist_name: Callable
|
|
check_and_remove_from_wishlist: Callable
|
|
regenerate_batch_m3u: Callable
|
|
youtube_playlist_states: dict
|
|
tidal_discovery_states: dict
|
|
deezer_discovery_states: dict
|
|
spotify_public_discovery_states: dict
|
|
ensure_wishlist_track_format: Callable | None = None
|
|
ensure_spotify_track_format: Callable | None = None
|
|
|
|
def __post_init__(self) -> None:
|
|
if self.ensure_wishlist_track_format is None:
|
|
self.ensure_wishlist_track_format = self.ensure_spotify_track_format
|
|
if self.ensure_spotify_track_format is None:
|
|
self.ensure_spotify_track_format = self.ensure_wishlist_track_format
|
|
|
|
if self.ensure_wishlist_track_format is None:
|
|
raise ValueError("LifecycleDeps requires a wishlist track format helper")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# start_next_batch_of_downloads
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def start_next_batch_of_downloads(batch_id: str, deps: LifecycleDeps) -> None:
|
|
"""Start the next batch of downloads up to the concurrent limit (like GUI)."""
|
|
# ENHANCED: Use batch-specific lock to prevent race conditions when multiple threads
|
|
# try to start workers for the same batch concurrently
|
|
batch_lock = deps.get_batch_lock(batch_id)
|
|
|
|
with batch_lock:
|
|
# Prevent starting new tasks if shutting down
|
|
if deps.is_shutting_down():
|
|
logger.info(f"[Batch Manager] Server shutting down - skipping new tasks for batch {batch_id}")
|
|
return
|
|
|
|
with tasks_lock:
|
|
if batch_id not in download_batches:
|
|
return
|
|
|
|
batch = download_batches[batch_id]
|
|
max_concurrent = batch['max_concurrent']
|
|
queue = batch['queue']
|
|
queue_index = batch['queue_index']
|
|
active_count = batch['active_count']
|
|
|
|
logger.info(f"[Batch Lock] Starting workers for {batch_id}: active={active_count}, max={max_concurrent}, queue_pos={queue_index}/{len(queue)}")
|
|
|
|
# Start downloads up to the concurrent limit
|
|
while active_count < max_concurrent and queue_index < len(queue):
|
|
task_id = queue[queue_index]
|
|
|
|
# CRITICAL V2 FIX: Skip cancelled tasks instead of trying to restart them
|
|
if task_id in download_tasks:
|
|
current_status = download_tasks[task_id]['status']
|
|
if current_status == 'cancelled':
|
|
logger.warning(f"[Batch Lock] Skipping cancelled task {task_id} (queue position {queue_index + 1})")
|
|
download_batches[batch_id]['queue_index'] += 1
|
|
queue_index += 1
|
|
continue # Skip to next task without consuming worker slot
|
|
|
|
# IMPORTANT: Set status to 'searching' BEFORE starting worker (like GUI)
|
|
# Must be done INSIDE the lock to prevent race conditions with status polling
|
|
download_tasks[task_id]['status'] = 'searching'
|
|
download_tasks[task_id]['status_change_time'] = time.time()
|
|
logger.info(f"[Batch Manager] Set task {task_id} status to 'searching'")
|
|
else:
|
|
logger.warning(f"[Batch Lock] Task {task_id} not found in download_tasks - skipping")
|
|
download_batches[batch_id]['queue_index'] += 1
|
|
queue_index += 1
|
|
continue
|
|
|
|
# CRITICAL FIX: Submit to executor BEFORE incrementing counters to prevent ghost workers
|
|
try:
|
|
# Submit to executor first - this can fail
|
|
deps.submit_download_track_worker(task_id, batch_id)
|
|
|
|
# Only increment counters AFTER successful submit
|
|
download_batches[batch_id]['active_count'] += 1
|
|
download_batches[batch_id]['queue_index'] += 1
|
|
|
|
logger.info(f"[Batch Lock] Started download {queue_index + 1}/{len(queue)} - Active: {active_count + 1}/{max_concurrent}")
|
|
|
|
# Update local counters for next iteration
|
|
active_count += 1
|
|
queue_index += 1
|
|
|
|
except Exception as submit_error:
|
|
logger.error(f"[Batch Lock] CRITICAL: Failed to submit task {task_id} to executor: {submit_error}")
|
|
logger.info("[Batch Lock] Worker slot NOT consumed - preventing ghost worker")
|
|
|
|
# Reset task status since worker never started
|
|
if task_id in download_tasks:
|
|
download_tasks[task_id]['status'] = 'failed'
|
|
logger.error(f"[Batch Lock] Set task {task_id} status to 'failed' due to submit failure")
|
|
|
|
# Don't increment counters - no worker was actually started
|
|
# This prevents the "ghost worker" issue where active_count is incremented but no actual worker runs
|
|
break # Stop trying to start more workers if executor is failing
|
|
|
|
logger.info(f"[Batch Lock] Finished starting workers for {batch_id}: final_active={download_batches[batch_id]['active_count']}, max={max_concurrent}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# on_download_completed
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def on_download_completed(batch_id: str, task_id: str, success: bool, deps: LifecycleDeps) -> None:
|
|
"""Called when a download completes to start the next one in queue."""
|
|
with tasks_lock:
|
|
if batch_id not in download_batches:
|
|
logger.warning(f"[Batch Manager] Batch {batch_id} not found for completed task {task_id}")
|
|
return
|
|
|
|
# Guard against double-calling: track which tasks have already been completed
|
|
# This prevents active_count from being decremented multiple times for the same task
|
|
# (e.g. status polling and post-processing both observe the same terminal task)
|
|
# NOTE: On duplicate calls, we skip decrement/tracking but STILL check batch completion.
|
|
# This is critical because the first call may see the task in 'post_processing' (not finished),
|
|
# and the second call (from post-processing worker) arrives after the task is truly 'completed'.
|
|
# Without the fallthrough, batch_complete would never be emitted.
|
|
completed_tasks = download_batches[batch_id].setdefault('_completed_task_ids', set())
|
|
_is_duplicate_completion = task_id in completed_tasks
|
|
if _is_duplicate_completion:
|
|
logger.info(f"[Batch Manager] Task {task_id} already completed — skipping decrement, still checking batch completion")
|
|
# Set terminal status so the monitor loop stops re-processing this task
|
|
if task_id in download_tasks and download_tasks[task_id].get('status') in ('downloading', 'queued'):
|
|
download_tasks[task_id]['status'] = 'completed'
|
|
# Fall through to batch completion check below (don't return)
|
|
else:
|
|
completed_tasks.add(task_id)
|
|
|
|
if not _is_duplicate_completion:
|
|
# Track failed/cancelled tasks in batch state (replicating sync.py)
|
|
if not success and task_id in download_tasks:
|
|
task = download_tasks[task_id]
|
|
task_status = task.get('status', 'unknown')
|
|
|
|
# Build track_info structure matching sync.py's permanently_failed_tracks format
|
|
original_track_info = task.get('track_info', {})
|
|
|
|
# Ensure wishlist track has proper structure for wishlist service
|
|
wishlist_track_data = deps.ensure_wishlist_track_format(original_track_info)
|
|
|
|
track_info = {
|
|
'download_index': task.get('track_index', 0),
|
|
'table_index': task.get('track_index', 0),
|
|
'track_name': original_track_info.get('name', 'Unknown Track'),
|
|
'artist_name': deps.get_track_artist_name(original_track_info),
|
|
'retry_count': task.get('retry_count', 0),
|
|
'track_data': wishlist_track_data,
|
|
'spotify_track': wishlist_track_data, # Backward-compatible alias for older callers
|
|
'failure_reason': 'Download cancelled' if task_status == 'cancelled' else ('No matching track found' if task_status == 'not_found' else 'Download failed'),
|
|
'candidates': task.get('cached_candidates', []), # Include search results if available
|
|
}
|
|
|
|
if task_status == 'cancelled':
|
|
download_batches[batch_id]['cancelled_tracks'].add(task.get('track_index', 0))
|
|
logger.warning(f"[Batch Manager] Added cancelled track to batch tracking: {track_info['track_name']}")
|
|
add_activity_item("", "Download Cancelled", f"'{track_info['track_name']}'", "Now")
|
|
elif task_status in ('failed', 'not_found'):
|
|
download_batches[batch_id]['permanently_failed_tracks'].append(track_info)
|
|
if task_status == 'not_found':
|
|
logger.info(f"[Batch Manager] Added not-found track to batch tracking: {track_info['track_name']}")
|
|
add_activity_item("", "Not Found", f"'{track_info['track_name']}'", "Now")
|
|
else:
|
|
logger.error(f"[Batch Manager] Added failed track to batch tracking: {track_info['track_name']}")
|
|
add_activity_item("", "Download Failed", f"'{track_info['track_name']}'", "Now")
|
|
|
|
try:
|
|
if deps.automation_engine:
|
|
deps.automation_engine.emit('download_failed', {
|
|
'artist': track_info.get('artist_name', ''),
|
|
'title': track_info.get('track_name', ''),
|
|
'reason': track_info.get('failure_reason', 'Unknown'),
|
|
})
|
|
except Exception as e:
|
|
logger.debug("download_failed emit failed: %s", e)
|
|
|
|
# WISHLIST REMOVAL: Handle successful downloads for wishlist removal
|
|
if success and task_id in download_tasks:
|
|
try:
|
|
task = download_tasks[task_id]
|
|
track_info = task.get('track_info', {})
|
|
logger.info(f"[Batch Manager] Successful download - checking wishlist removal for task {task_id}")
|
|
|
|
# Add activity for successful download
|
|
track_name = track_info.get('name', 'Unknown Track')
|
|
|
|
# Safely extract artist name (handle both list and string formats)
|
|
artists = track_info.get('artists', [])
|
|
if isinstance(artists, list) and len(artists) > 0:
|
|
first_artist = artists[0]
|
|
artist_name = first_artist.get('name', 'Unknown Artist') if isinstance(first_artist, dict) else str(first_artist)
|
|
elif isinstance(artists, str):
|
|
artist_name = artists
|
|
else:
|
|
artist_name = 'Unknown Artist'
|
|
|
|
add_activity_item("", "Download Complete", f"'{track_name}' by {artist_name}", "Now")
|
|
|
|
# Try to remove from wishlist using track info
|
|
if track_info:
|
|
# Create a context-like structure for the wishlist removal function
|
|
context = {
|
|
'track_info': track_info,
|
|
'original_search_result': track_info, # fallback
|
|
}
|
|
deps.check_and_remove_from_wishlist(context)
|
|
except Exception as wishlist_error:
|
|
logger.error(f"[Batch Manager] Error checking wishlist removal for successful download: {wishlist_error}")
|
|
|
|
# Decrement active count
|
|
old_active = download_batches[batch_id]['active_count']
|
|
download_batches[batch_id]['active_count'] -= 1
|
|
new_active = download_batches[batch_id]['active_count']
|
|
|
|
logger.error(f"[Batch Manager] Task {task_id} completed ({'success' if success else 'failed/cancelled'}). Active workers: {old_active} → {new_active}/{download_batches[batch_id]['max_concurrent']}")
|
|
|
|
# ENHANCED: Always check batch completion after any task completes (including duplicate calls)
|
|
# This ensures completion is detected even when mixing normal downloads with cancelled tasks
|
|
logger.info(f"[Batch Manager] Checking batch completion after task {task_id} completed")
|
|
|
|
# FIXED: Check if batch is truly complete (all tasks finished, not just workers freed)
|
|
batch = download_batches[batch_id]
|
|
all_tasks_started = batch['queue_index'] >= len(batch['queue'])
|
|
no_active_workers = batch['active_count'] == 0
|
|
|
|
# Count actually finished tasks (completed, failed, or cancelled)
|
|
# CRITICAL: Don't include 'post_processing' as finished - it's still in progress (unless stuck)!
|
|
# CRITICAL: Don't include 'searching' as finished - task is being retried (unless stuck)!
|
|
finished_count = 0
|
|
retrying_count = 0
|
|
queue = batch.get('queue', [])
|
|
current_time = time.time()
|
|
for queue_task_id in queue:
|
|
if queue_task_id in download_tasks:
|
|
task = download_tasks[queue_task_id]
|
|
task_status = task['status']
|
|
|
|
# STUCK DETECTION: Force fail tasks that have been in transitional states too long
|
|
if task_status == 'searching':
|
|
task_age = current_time - task.get('status_change_time', current_time)
|
|
if task_age > 600: # 10 minutes
|
|
logger.info(f"⏰ [Stuck Detection] Task {queue_task_id} stuck in searching for {task_age:.0f}s - forcing not_found")
|
|
task['status'] = 'not_found'
|
|
task['error_message'] = f'Search stuck for {int(task_age // 60)} minutes with no results — timed out'
|
|
finished_count += 1
|
|
else:
|
|
retrying_count += 1
|
|
elif task_status == 'post_processing':
|
|
task_age = current_time - task.get('status_change_time', current_time)
|
|
if task_age > 300: # 5 minutes (post-processing should be fast)
|
|
logger.info(f"⏰ [Stuck Detection] Task {queue_task_id} stuck in post_processing for {task_age:.0f}s - forcing completion")
|
|
task['status'] = 'completed' # Assume it worked if file verification is taking too long
|
|
finished_count += 1
|
|
else:
|
|
retrying_count += 1
|
|
elif task_status in ['completed', 'failed', 'cancelled', 'not_found']:
|
|
finished_count += 1
|
|
else:
|
|
# Task ID in queue but not in download_tasks - treat as completed to prevent blocking
|
|
logger.warning(f"[Orphaned Task] Task {queue_task_id} in queue but not in download_tasks - counting as finished")
|
|
finished_count += 1
|
|
|
|
all_tasks_truly_finished = finished_count >= len(queue)
|
|
has_retrying_tasks = retrying_count > 0
|
|
|
|
if all_tasks_started and no_active_workers and all_tasks_truly_finished and not has_retrying_tasks:
|
|
logger.error(f"[Batch Manager] Batch {batch_id} truly complete - all {finished_count}/{len(queue)} tasks finished - processing failed tracks to wishlist")
|
|
elif all_tasks_started and no_active_workers and has_retrying_tasks:
|
|
logger.warning(f"[Batch Manager] Batch {batch_id}: all workers free but {retrying_count} tasks retrying - continuing monitoring")
|
|
elif all_tasks_started and no_active_workers:
|
|
# This used to incorrectly mark batch as complete!
|
|
logger.info(f"[Batch Manager] Batch {batch_id}: all workers free but only {finished_count}/{len(queue)} tasks finished - continuing monitoring")
|
|
|
|
if all_tasks_started and no_active_workers and all_tasks_truly_finished and not has_retrying_tasks:
|
|
|
|
# Check if this is an auto-initiated batch
|
|
is_auto_batch = batch.get('auto_initiated', False)
|
|
|
|
# FIXED: Ensure batch is not already marked as complete to prevent duplicate processing
|
|
if batch.get('phase') != 'complete':
|
|
# Mark batch as complete and set completion timestamp for auto-cleanup
|
|
batch['phase'] = 'complete'
|
|
batch['completion_time'] = time.time() # Track when batch completed
|
|
|
|
# Record sync history completion
|
|
from database.music_database import MusicDatabase
|
|
record_sync_history_completion(MusicDatabase(), batch_id, batch)
|
|
|
|
# Add activity for batch completion
|
|
playlist_name = batch.get('playlist_name', 'Unknown Playlist')
|
|
failed_count = len(batch.get('permanently_failed_tracks', []))
|
|
successful_downloads = finished_count - failed_count
|
|
add_activity_item("", "Download Batch Complete", f"'{playlist_name}' - {successful_downloads} tracks downloaded", "Now")
|
|
|
|
# Emit batch_complete event for automation engine (only if something downloaded)
|
|
if successful_downloads > 0:
|
|
try:
|
|
if deps.automation_engine:
|
|
deps.automation_engine.emit('batch_complete', {
|
|
'playlist_name': playlist_name,
|
|
'total_tracks': str(len(queue)),
|
|
'completed_tracks': str(successful_downloads),
|
|
'failed_tracks': str(failed_count),
|
|
})
|
|
except Exception as e:
|
|
logger.debug("batch_complete emit failed: %s", e)
|
|
|
|
# Update YouTube playlist phase to 'download_complete' if this is a YouTube playlist
|
|
playlist_id = batch.get('playlist_id')
|
|
if playlist_id and playlist_id.startswith('youtube_'):
|
|
url_hash = playlist_id.replace('youtube_', '')
|
|
if url_hash in deps.youtube_playlist_states:
|
|
deps.youtube_playlist_states[url_hash]['phase'] = 'download_complete'
|
|
logger.info(f"Updated YouTube playlist {url_hash} to download_complete phase")
|
|
|
|
# Update Tidal playlist phase to 'download_complete' if this is a Tidal playlist
|
|
if playlist_id and playlist_id.startswith('tidal_'):
|
|
tidal_playlist_id = playlist_id.replace('tidal_', '')
|
|
if tidal_playlist_id in deps.tidal_discovery_states:
|
|
deps.tidal_discovery_states[tidal_playlist_id]['phase'] = 'download_complete'
|
|
logger.info(f"Updated Tidal playlist {tidal_playlist_id} to download_complete phase")
|
|
|
|
# Update Deezer playlist phase to 'download_complete' if this is a Deezer playlist
|
|
if playlist_id and playlist_id.startswith('deezer_'):
|
|
deezer_playlist_id = playlist_id.replace('deezer_', '')
|
|
if deezer_playlist_id in deps.deezer_discovery_states:
|
|
deps.deezer_discovery_states[deezer_playlist_id]['phase'] = 'download_complete'
|
|
logger.info(f"Updated Deezer playlist {deezer_playlist_id} to download_complete phase")
|
|
|
|
# Update Spotify Public playlist phase to 'download_complete' if this is a Spotify Public playlist
|
|
if playlist_id and playlist_id.startswith('spotify_public_'):
|
|
spotify_public_url_hash = playlist_id.replace('spotify_public_', '')
|
|
if spotify_public_url_hash in deps.spotify_public_discovery_states:
|
|
deps.spotify_public_discovery_states[spotify_public_url_hash]['phase'] = 'download_complete'
|
|
logger.info(f"Updated Spotify Public playlist {spotify_public_url_hash} to download_complete phase")
|
|
|
|
logger.info(f"[Batch Manager] Batch {batch_id} complete - stopping monitor")
|
|
deps.download_monitor.stop_monitoring(batch_id)
|
|
_cleanup_private_album_bundle_staging(batch_id, batch)
|
|
|
|
# M3U REGENERATION: Regenerate M3U with real library paths now that
|
|
# all post-processing (tagging, moving, DB writes) is complete.
|
|
# The frontend M3U save may fire too early — this ensures paths resolve.
|
|
if deps.config_manager.get('m3u_export.enabled', False):
|
|
try:
|
|
m3u_tracks = []
|
|
for tid in queue:
|
|
if tid in download_tasks and download_tasks[tid].get('status') == 'completed':
|
|
ti = download_tasks[tid].get('track_info', {})
|
|
artists = ti.get('artists', [])
|
|
artist_str = artists[0] if isinstance(artists, list) and artists else ''
|
|
if isinstance(artist_str, dict):
|
|
artist_str = artist_str.get('name', '')
|
|
m3u_tracks.append({
|
|
'name': ti.get('name', ''),
|
|
'artist': artist_str,
|
|
'duration_ms': ti.get('duration_ms', 0),
|
|
})
|
|
if m3u_tracks:
|
|
deps.regenerate_batch_m3u(batch, m3u_tracks)
|
|
except Exception as m3u_err:
|
|
logger.error(f"[M3U] Error regenerating M3U on batch complete: {m3u_err}")
|
|
|
|
# PLAYLIST MATERIALIZE: one path-independent reconcile — drop this
|
|
# batch's newly-resolved tracks into the right Playlists/<name>/
|
|
# folders. Covers an organize-by-playlist download AND a late
|
|
# wishlist arrival (via each track's playlist provenance). Built
|
|
# from the batch's own captured paths — non-fatal, derived view.
|
|
try:
|
|
from core.playlists.materialize_service import reconcile_batch_playlists
|
|
from database.music_database import MusicDatabase
|
|
for _pl_name, _mat in reconcile_batch_playlists(MusicDatabase(), batch, download_tasks, deps.config_manager):
|
|
logger.info(
|
|
f"[Playlist Folder] Rebuilt '{_mat.playlist_dir}': "
|
|
f"{_mat.linked} linked, {_mat.copied} copied, "
|
|
f"{_mat.unchanged} unchanged, {_mat.removed_stale} stale removed"
|
|
+ (" (symlinks unsupported here → copied)" if _mat.fellback else "")
|
|
)
|
|
except Exception as _mat_err:
|
|
logger.error(f"[Playlist Folder] Materialize failed (non-fatal): {_mat_err}")
|
|
|
|
# REPAIR: Scan all album folders from this batch for track number issues
|
|
if deps.repair_worker:
|
|
deps.repair_worker.process_batch(batch_id)
|
|
|
|
# ALBUM CONSISTENCY: Picard-style post-batch pass — pick ONE MusicBrainz
|
|
# release and overwrite album-level tags on all files to guarantee consistency.
|
|
# This is the safety net: even if per-track MB lookups drifted (different cache
|
|
# keys, API hiccups), this pass forces every file to share the same release MBID,
|
|
# album artist ID, release group ID, etc. — preventing Navidrome album splits.
|
|
_cons_files = batch.get('_consistency_files', [])
|
|
if batch.get('is_album_download') and _cons_files and len(_cons_files) >= 2:
|
|
_cons_album = batch.get('album_context', {})
|
|
_cons_artist = batch.get('artist_context', {})
|
|
_cons_album_name = _cons_album.get('name', '') if isinstance(_cons_album, dict) else ''
|
|
_cons_artist_name = _cons_artist.get('name', '') if isinstance(_cons_artist, dict) else ''
|
|
if _cons_album_name and _cons_artist_name:
|
|
try:
|
|
_cons_mb_svc = deps.mb_worker.mb_service if deps.mb_worker else None
|
|
if _cons_mb_svc and deps.config_manager.get('musicbrainz.embed_tags', True):
|
|
from core.album_consistency import run_album_consistency
|
|
from core.metadata.common import get_file_lock
|
|
_cons_result = run_album_consistency(
|
|
file_infos=_cons_files,
|
|
album_name=_cons_album_name,
|
|
artist_name=_cons_artist_name,
|
|
mb_service=_cons_mb_svc,
|
|
total_discs=_cons_album.get('total_discs', 1),
|
|
file_lock_fn=get_file_lock,
|
|
)
|
|
if _cons_result.get('success'):
|
|
logger.info(f"[Album Consistency] {_cons_result['tags_written']}/{_cons_result['total_files']} files "
|
|
f"harmonized to release {_cons_result.get('release_mbid', '')[:8]}...")
|
|
elif _cons_result.get('error'):
|
|
logger.error(f"[Album Consistency] Skipped: {_cons_result['error']}")
|
|
except Exception as cons_err:
|
|
logger.error(f"[Album Consistency] Failed (non-fatal): {cons_err}")
|
|
|
|
# Mark that wishlist processing is starting (prevents premature cleanup)
|
|
batch['wishlist_processing_started'] = True
|
|
|
|
# Process wishlist outside of the lock to prevent threading issues
|
|
if is_auto_batch:
|
|
# For auto-initiated batches, handle completion and schedule next cycle
|
|
deps.submit_failed_to_wishlist_with_auto_completion(batch_id)
|
|
else:
|
|
# For manual batches, use standard wishlist processing
|
|
deps.submit_failed_to_wishlist(batch_id)
|
|
else:
|
|
logger.warning(f"[Batch Manager] Batch {batch_id} already marked complete - skipping duplicate processing")
|
|
|
|
return # Don't start next batch if we're done
|
|
|
|
# Start next downloads in queue
|
|
logger.info(f"[Batch Manager] Starting next batch for {batch_id}")
|
|
start_next_batch_of_downloads(batch_id, deps)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# check_batch_completion_v2
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def check_batch_completion_v2(batch_id: str, deps: LifecycleDeps) -> Optional[bool]:
|
|
"""V2 SYSTEM: Check if batch is complete after worker slot changes.
|
|
|
|
This is needed because V2 atomic cancel bypasses on_download_completed,
|
|
so we need to manually check for batch completion.
|
|
"""
|
|
try:
|
|
with tasks_lock:
|
|
if batch_id not in download_batches:
|
|
logger.warning(f"[Completion Check V2] Batch {batch_id} not found")
|
|
return
|
|
|
|
batch = download_batches[batch_id]
|
|
all_tasks_started = batch['queue_index'] >= len(batch['queue'])
|
|
no_active_workers = batch['active_count'] == 0
|
|
|
|
# Count actually finished tasks (completed, failed, or cancelled)
|
|
finished_count = 0
|
|
retrying_count = 0
|
|
queue = batch.get('queue', [])
|
|
current_time = time.time()
|
|
|
|
for task_id in queue:
|
|
if task_id in download_tasks:
|
|
task = download_tasks[task_id]
|
|
task_status = task['status']
|
|
|
|
# STUCK DETECTION: Force fail tasks that have been in transitional states too long
|
|
if task_status == 'searching':
|
|
task_age = current_time - task.get('status_change_time', current_time)
|
|
if task_age > 600: # 10 minutes
|
|
logger.info(f"⏰ [Stuck Detection V2] Task {task_id} stuck in searching for {task_age:.0f}s - forcing not_found")
|
|
task['status'] = 'not_found'
|
|
task['error_message'] = f'Search stuck for {int(task_age // 60)} minutes with no results — timed out'
|
|
finished_count += 1
|
|
else:
|
|
retrying_count += 1
|
|
elif task_status == 'post_processing':
|
|
task_age = current_time - task.get('status_change_time', current_time)
|
|
if task_age > 300: # 5 minutes (post-processing should be fast)
|
|
logger.info(f"⏰ [Stuck Detection V2] Task {task_id} stuck in post_processing for {task_age:.0f}s - forcing completion")
|
|
task['status'] = 'completed' # Assume it worked if file verification is taking too long
|
|
finished_count += 1
|
|
else:
|
|
retrying_count += 1
|
|
elif task_status in ['completed', 'failed', 'cancelled', 'not_found']:
|
|
finished_count += 1
|
|
else:
|
|
# Task ID in queue but not in download_tasks - treat as completed to prevent blocking
|
|
logger.warning(f"[Orphaned Task V2] Task {task_id} in queue but not in download_tasks - counting as finished")
|
|
finished_count += 1
|
|
|
|
all_tasks_truly_finished = finished_count >= len(queue)
|
|
has_retrying_tasks = retrying_count > 0
|
|
|
|
logger.warning(f"[Completion Check V2] Batch {batch_id}: tasks_started={all_tasks_started}, workers={no_active_workers}, finished={finished_count}/{len(queue)}, retrying={retrying_count}")
|
|
|
|
is_auto_batch = False
|
|
if all_tasks_started and no_active_workers and all_tasks_truly_finished and not has_retrying_tasks:
|
|
# FIXED: Ensure batch is not already marked as complete to prevent duplicate processing
|
|
if batch.get('phase') != 'complete':
|
|
logger.info(f"[Completion Check V2] Batch {batch_id} is complete - marking as finished")
|
|
|
|
# Check if this is an auto-initiated batch
|
|
is_auto_batch = batch.get('auto_initiated', False)
|
|
|
|
# Mark batch as complete and set completion timestamp for auto-cleanup
|
|
batch['phase'] = 'complete'
|
|
batch['completion_time'] = time.time() # Track when batch completed
|
|
|
|
# Add activity for batch completion
|
|
playlist_name = batch.get('playlist_name', 'Unknown Playlist')
|
|
failed_count = len(batch.get('permanently_failed_tracks', []))
|
|
successful_downloads = finished_count - failed_count
|
|
add_activity_item("", "Download Batch Complete", f"'{playlist_name}' - {successful_downloads} tracks downloaded", "Now")
|
|
|
|
# Emit batch_complete event for automation engine (only if something downloaded)
|
|
if successful_downloads > 0:
|
|
try:
|
|
if deps.automation_engine:
|
|
deps.automation_engine.emit('batch_complete', {
|
|
'playlist_name': playlist_name,
|
|
'total_tracks': str(len(queue)),
|
|
'completed_tracks': str(successful_downloads),
|
|
'failed_tracks': str(failed_count),
|
|
})
|
|
except Exception as e:
|
|
logger.debug("batch_complete emit failed: %s", e)
|
|
else:
|
|
logger.warning(f"[Completion Check V2] Batch {batch_id} already marked complete - skipping duplicate processing")
|
|
return True # Already complete
|
|
|
|
# Update YouTube playlist phase to 'download_complete' if this is a YouTube playlist
|
|
playlist_id = batch.get('playlist_id')
|
|
if playlist_id and playlist_id.startswith('youtube_'):
|
|
url_hash = playlist_id.replace('youtube_', '')
|
|
if url_hash in deps.youtube_playlist_states:
|
|
deps.youtube_playlist_states[url_hash]['phase'] = 'download_complete'
|
|
logger.info(f"[Completion Check V2] Updated YouTube playlist {url_hash} to download_complete phase")
|
|
|
|
# Update Tidal playlist phase to 'download_complete' if this is a Tidal playlist
|
|
if playlist_id and playlist_id.startswith('tidal_'):
|
|
tidal_playlist_id = playlist_id.replace('tidal_', '')
|
|
if tidal_playlist_id in deps.tidal_discovery_states:
|
|
deps.tidal_discovery_states[tidal_playlist_id]['phase'] = 'download_complete'
|
|
logger.info(f"[Completion Check V2] Updated Tidal playlist {tidal_playlist_id} to download_complete phase")
|
|
|
|
# Update Deezer playlist phase to 'download_complete' if this is a Deezer playlist
|
|
if playlist_id and playlist_id.startswith('deezer_'):
|
|
deezer_playlist_id = playlist_id.replace('deezer_', '')
|
|
if deezer_playlist_id in deps.deezer_discovery_states:
|
|
deps.deezer_discovery_states[deezer_playlist_id]['phase'] = 'download_complete'
|
|
logger.info(f"[Completion Check V2] Updated Deezer playlist {deezer_playlist_id} to download_complete phase")
|
|
|
|
# Update Spotify Public playlist phase to 'download_complete' if this is a Spotify Public playlist
|
|
if playlist_id and playlist_id.startswith('spotify_public_'):
|
|
spotify_public_url_hash = playlist_id.replace('spotify_public_', '')
|
|
if spotify_public_url_hash in deps.spotify_public_discovery_states:
|
|
deps.spotify_public_discovery_states[spotify_public_url_hash]['phase'] = 'download_complete'
|
|
logger.info(f"[Completion Check V2] Updated Spotify Public playlist {spotify_public_url_hash} to download_complete phase")
|
|
|
|
logger.info(f"[Completion Check V2] Batch {batch_id} complete - stopping monitor")
|
|
deps.download_monitor.stop_monitoring(batch_id)
|
|
_cleanup_private_album_bundle_staging(batch_id, batch)
|
|
|
|
# PLAYLIST MATERIALIZE: same reconcile as the primary completion path
|
|
# (on_download_completed). Monitor-detected downloads complete via THIS
|
|
# V2 path, so the reconcile must run here too or playlist folders never
|
|
# get built for them. Path-independent, non-fatal, derived view.
|
|
try:
|
|
from core.playlists.materialize_service import reconcile_batch_playlists
|
|
from database.music_database import MusicDatabase
|
|
for _pl_name, _mat in reconcile_batch_playlists(MusicDatabase(), batch, download_tasks, deps.config_manager):
|
|
logger.info(
|
|
f"[Playlist Folder] Rebuilt '{_mat.playlist_dir}': "
|
|
f"{_mat.linked} linked, {_mat.copied} copied, "
|
|
f"{_mat.unchanged} unchanged, {_mat.removed_stale} stale removed"
|
|
+ (" (symlinks unsupported here → copied)" if _mat.fellback else "")
|
|
)
|
|
except Exception as _mat_err:
|
|
logger.error(f"[Playlist Folder] Materialize failed (non-fatal): {_mat_err}")
|
|
|
|
# REPAIR: Scan all album folders from this batch for track number issues
|
|
if deps.repair_worker:
|
|
deps.repair_worker.process_batch(batch_id)
|
|
|
|
# ALBUM CONSISTENCY: Same Picard-style pass as the primary completion path
|
|
_cons_files = batch.get('_consistency_files', [])
|
|
if batch.get('is_album_download') and _cons_files and len(_cons_files) >= 2:
|
|
_cons_album = batch.get('album_context', {})
|
|
_cons_artist = batch.get('artist_context', {})
|
|
_cons_album_name = _cons_album.get('name', '') if isinstance(_cons_album, dict) else ''
|
|
_cons_artist_name = _cons_artist.get('name', '') if isinstance(_cons_artist, dict) else ''
|
|
if _cons_album_name and _cons_artist_name:
|
|
try:
|
|
_cons_mb_svc = deps.mb_worker.mb_service if deps.mb_worker else None
|
|
if _cons_mb_svc and deps.config_manager.get('musicbrainz.embed_tags', True):
|
|
from core.album_consistency import run_album_consistency
|
|
from core.metadata.common import get_file_lock
|
|
_cons_result = run_album_consistency(
|
|
file_infos=_cons_files,
|
|
album_name=_cons_album_name,
|
|
artist_name=_cons_artist_name,
|
|
mb_service=_cons_mb_svc,
|
|
total_discs=_cons_album.get('total_discs', 1),
|
|
file_lock_fn=get_file_lock,
|
|
)
|
|
if _cons_result.get('success'):
|
|
logger.info(f"[Album Consistency V2] {_cons_result['tags_written']}/{_cons_result['total_files']} files "
|
|
f"harmonized to release {_cons_result.get('release_mbid', '')[:8]}...")
|
|
elif _cons_result.get('error'):
|
|
logger.error(f"[Album Consistency V2] Skipped: {_cons_result['error']}")
|
|
except Exception as cons_err:
|
|
logger.error(f"[Album Consistency V2] Failed (non-fatal): {cons_err}")
|
|
|
|
# Process wishlist outside of the lock to prevent threading issues
|
|
if all_tasks_started and no_active_workers and all_tasks_truly_finished and not has_retrying_tasks:
|
|
# Call wishlist processing outside the lock — DIRECT (synchronous) call
|
|
# to match original v2 behavior. The non-v2 path (on_download_completed)
|
|
# uses the async submit_* deps; v2 calls directly because v2 itself runs
|
|
# from a context where blocking is acceptable.
|
|
if is_auto_batch:
|
|
logger.info("[Completion Check V2] Processing auto-initiated batch completion")
|
|
deps.process_failed_to_wishlist_with_auto_completion(batch_id)
|
|
else:
|
|
logger.info("[Completion Check V2] Processing regular batch completion")
|
|
deps.process_failed_to_wishlist(batch_id)
|
|
|
|
return True # Batch was completed
|
|
else:
|
|
logger.warning(f"[Completion Check V2] Batch {batch_id} not yet complete: finished={finished_count}/{len(queue)}, retrying={retrying_count}, workers={batch['active_count']}")
|
|
return False # Batch still in progress
|
|
|
|
except Exception as e:
|
|
logger.error(f"[Completion Check V2] Error checking batch completion: {e}")
|
|
traceback.print_exc()
|
|
return False
|