You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
SoulSync/core/downloads/master.py

1291 lines
69 KiB

"""Master worker for the missing-tracks download workflow.
`run_full_missing_tracks_process(batch_id, playlist_id, tracks_json, deps)` is
the single 580-line worker that orchestrates the entire pipeline:
1. PHASE 1 — Analysis: per-track DB ownership check, with album fast path
(lookup album by name+artist, match tracks within it) plus a
MusicBrainz release-cache preflight so per-track post-processing all
uses the same release MBID (prevents Navidrome album splits).
2. Wishlist removal for tracks already in the library.
3. Explicit-content filter.
4. PHASE 2 transition — if nothing missing, mark batch complete, update
per-source playlist phases, kick auto-wishlist completion handler.
5. Soulseek album pre-flight — search for a complete album folder before
falling back to track-by-track search, cache the source for reuse.
6. Wishlist album grouping — derive per-album disc counts and resolve
ONE artist context per album so collab albums don't fold-split.
7. Task creation with explicit album/artist context injection.
8. Hand off to download monitor + start_next_batch_of_downloads.
Lifted verbatim from web_server.py. Wide dependency surface (config, MB
caches, Soulseek client, source-page state dicts, multiple helper funcs)
all injected via `MasterDeps`.
"""
from __future__ import annotations
import json
import logging
import re
import time
import uuid
from dataclasses import dataclass
from difflib import SequenceMatcher
from pathlib import Path
from typing import Any, Callable
from core.downloads import album_bundle_dispatch as _album_bundle_dispatch
from core.runtime_state import download_batches, download_tasks, tasks_lock
logger = logging.getLogger(__name__)
_ALBUM_PREFLIGHT_MIN_SCORE = 0.62
_EDITION_WORDS = {
'deluxe', 'expanded', 'anniversary', 'special', 'platinum', 'bonus',
'remaster', 'remastered', 'edition', 'version',
}
_VARIANT_WORDS = {
'remix', 'rmx', 'acapella', 'a cappella', 'instrumental', 'karaoke',
'live', 'demo', 'extended',
}
_ALBUM_BUNDLE_SOURCES = frozenset(('torrent', 'usenet', 'soulseek'))
def _norm_text(value: Any) -> str:
text = str(value or '').lower()
text = re.sub(r'[_./\\|()[\]{}:;,+]', ' ', text)
text = re.sub(r'[^a-z0-9\s-]', '', text)
text = re.sub(r'\s+', ' ', text).strip()
return text
def _similarity(left: Any, right: Any) -> float:
    """Return a 0.0-1.0 similarity between two values after `_norm_text`.

    * either side normalises to an empty string -> ``0.0``
    * identical normalised strings -> ``1.0``
    * one contains the other -> ratio of the shorter length to the longer
    * otherwise -> ``difflib.SequenceMatcher`` ratio
    """
    first, second = _norm_text(left), _norm_text(right)
    if not (first and second):
        return 0.0
    if first == second:
        return 1.0
    if first in second or second in first:
        shorter, longer = sorted((len(first), len(second)))
        return shorter / longer
    matcher = SequenceMatcher(None, first, second)
    return matcher.ratio()
def _track_title_from_candidate(candidate: Any) -> str:
title = getattr(candidate, 'title', None)
if title:
return str(title)
filename = getattr(candidate, 'filename', '') or ''
stem = Path(filename.replace('\\', '/')).stem
stem = re.sub(r'^\s*(?:disc\s*)?\d+[-_.\s]+', '', stem, flags=re.IGNORECASE)
return stem
def _track_number_from_track(track_data: dict) -> int:
value = track_data.get('track_number') or track_data.get('trackNumber') or 0
try:
return int(str(value).split('/')[0])
except (TypeError, ValueError):
return 0
def _track_number_from_candidate(candidate: Any) -> int:
value = getattr(candidate, 'track_number', None) or 0
try:
return int(str(value).split('/')[0])
except (TypeError, ValueError):
return 0
def _folder_variant_penalty(expected_album_name: str, folder_text: str) -> float:
    """Penalise a folder whose name advertises a variant the request did not ask for.

    Both inputs are normalised with `_norm_text`. Each `_VARIANT_WORDS` entry
    found in the folder text but not in the expected album name adds 0.12;
    each such `_EDITION_WORDS` entry adds 0.06. The total is capped at 0.30.
    An empty folder text yields ``0.0``.

    Matching is by substring, not by whole word.
    """
    folder = _norm_text(folder_text)
    if not folder:
        return 0.0
    expected = _norm_text(expected_album_name)

    total = 0.0
    # Variant words are accumulated before edition words, each at its own weight.
    for vocabulary, weight in ((_VARIANT_WORDS, 0.12), (_EDITION_WORDS, 0.06)):
        for word in vocabulary:
            if word not in folder or word in expected:
                continue
            total += weight
    return min(total, 0.30)
def _source_quality_score(source: Any) -> float:
score = getattr(source, 'quality_score', None)
if callable(score):
try:
return float(score())
except Exception:
return 0.0
try:
return float(score or 0.0)
except (TypeError, ValueError):
return 0.0
def _album_context_richness(album_ctx: dict) -> int:
if not isinstance(album_ctx, dict):
return 0
fields = ('id', 'name', 'release_date', 'total_tracks', 'album_type')
score = sum(1 for field in fields if album_ctx.get(field))
images = album_ctx.get('images')
if images:
score += 1
artists = album_ctx.get('artists')
if artists:
score += 1
return score
def _score_album_folder(album_result: Any, album_context: dict, artist_context: dict,
                        tracks_json: list[dict], filtered_track_count: int) -> float:
    """Score one slskd folder as a whole release, not as isolated tracks.

    The result is a weighted sum of seven components, minus the
    `_folder_variant_penalty`, clamped to ``[0.0, 1.0]``:

    ========================  ======  ==========================================
    component                 weight  meaning
    ========================  ======  ==========================================
    album name similarity      0.24   best of folder title / folder path
    artist name similarity     0.16   best of folder artist / folder path
    track-count agreement      0.16   1.0 exact, 0.75 within 2, 0.35 within 5
    title coverage             0.28   share of expected tracks with a match
    year agreement             0.06   0.5 when the year cannot be compared
    usable-file ratio          0.06   ``filtered_track_count`` / expected count
    source quality             0.04   `_source_quality_score` of the folder
    ========================  ======  ==========================================

    Args:
        album_result: Folder candidate; read via ``album_title``,
            ``album_path``, ``artist``, ``track_count``, ``tracks``, ``year``
            and ``quality_score`` attributes, all optional.
        album_context: Expected album (``name``, ``total_tracks``,
            ``release_date``); may be ``None``.
        artist_context: Expected artist (``name``); may be ``None``.
        tracks_json: Expected tracks; entries without a ``name`` are ignored
            for coverage.
        filtered_track_count: Number of files in the folder that survived the
            caller's filtering.

    Returns:
        The folder score in ``[0.0, 1.0]``.
    """
    album_context = album_context or {}
    artist_context = artist_context or {}

    expected_album = str(album_context.get('name') or '')
    expected_artist = str(artist_context.get('name') or '')
    expected_count = int(album_context.get('total_tracks') or len(tracks_json) or 0)
    expected_year = str(album_context.get('release_date') or '')[:4]

    album_title = getattr(album_result, 'album_title', '')
    album_path = getattr(album_result, 'album_path', '')
    folder_text = ' '.join(str(part or '') for part in (album_title, album_path))

    # The path is checked as well as the dedicated fields because folder names
    # often carry the artist and album when the tags do not.
    album_score = max(
        _similarity(expected_album, album_title),
        _similarity(expected_album, album_path),
    )
    artist_score = max(
        _similarity(expected_artist, getattr(album_result, 'artist', '')),
        _similarity(expected_artist, album_path),
    )

    candidate_tracks = list(getattr(album_result, 'tracks', []) or [])
    actual_count = int(getattr(album_result, 'track_count', 0) or len(candidate_tracks))
    if expected_count > 0 and actual_count > 0:
        diff = abs(actual_count - expected_count)
        if diff == 0:
            count_score = 1.0
        elif diff <= 2:
            count_score = 0.75
        elif diff <= 5:
            count_score = 0.35
        else:
            count_score = 0.0
    else:
        # One side is unknown: neither reward nor punish strongly.
        count_score = 0.4

    expected_tracks = [
        (track_data, _norm_text(track_data.get('name', '')))
        for track_data in tracks_json
        if track_data.get('name')
    ]

    # A candidate's normalised title and track number do not depend on the
    # expected track, so derive them once instead of once per expected track.
    # Skipped entirely when there is nothing to compare against.
    candidate_keys = [
        (_norm_text(_track_title_from_candidate(candidate)),
         _track_number_from_candidate(candidate))
        for candidate in candidate_tracks
    ] if expected_tracks else []

    matched = 0
    for track_data, expected_title in expected_tracks:
        expected_number = _track_number_from_track(track_data)
        for cand_title, cand_number in candidate_keys:
            title_sim = _similarity(expected_title, cand_title)
            if expected_number and cand_number and expected_number == cand_number:
                # Agreeing track numbers nudge a near-miss title over the line.
                title_sim = min(1.0, title_sim + 0.12)
            if title_sim >= 0.72:
                # Only "is there a match" matters, so stop at the first one.
                matched += 1
                break
    coverage_score = matched / max(1, len(expected_tracks))

    year_score = 0.5
    folder_year = str(getattr(album_result, 'year', '') or '')
    if expected_year and folder_year:
        year_score = 1.0 if expected_year == folder_year else 0.2
    elif expected_year and expected_year in _norm_text(folder_text):
        year_score = 1.0

    quality_count_score = min(1.0, filtered_track_count / max(1, expected_count or actual_count or 1))
    peer_score = _source_quality_score(album_result)
    penalty = _folder_variant_penalty(expected_album, folder_text)

    score = (
        album_score * 0.24
        + artist_score * 0.16
        + count_score * 0.16
        + coverage_score * 0.28
        + year_score * 0.06
        + quality_count_score * 0.06
        + peer_score * 0.04
        - penalty
    )
    return max(0.0, min(score, 1.0))
def _resolve_soulseek_client(download_orchestrator: Any) -> Any:
if hasattr(download_orchestrator, 'client'):
try:
client = download_orchestrator.client('soulseek')
if client:
return client
except Exception as exc:
logger.debug("Soulseek client lookup through orchestrator failed: %s", exc)
return getattr(download_orchestrator, 'soulseek', download_orchestrator)
def _soulseek_album_preflight_enabled(config_manager: Any) -> bool:
mode = config_manager.get('download_source.mode', 'hybrid')
if mode == 'soulseek':
return True
if mode != 'hybrid':
return False
order = config_manager.get('download_source.hybrid_order', ['hifi', 'youtube', 'soulseek'])
if order:
return order[0] == 'soulseek'
primary = config_manager.get('download_source.hybrid_primary', '')
return primary == 'soulseek'
def _resolve_album_bundle_source(config_manager: Any) -> str:
    """Name the source allowed to fetch this batch as a whole album, if any.

    * Single-source mode: the configured source, provided it is one of
      `_ALBUM_BUNDLE_SOURCES`.
    * Hybrid mode: only the first entry of ``download_source.hybrid_order``
      (or ``download_source.hybrid_primary`` when the order is empty) may
      claim the album; later sources stay per-track fallbacks.

    Returns the lower-cased source name, or ``''`` when no source qualifies.
    """
    configured = config_manager.get('download_source.mode', 'soulseek') or 'soulseek'
    mode = configured.lower()
    if mode in _ALBUM_BUNDLE_SOURCES:
        return mode
    if mode != 'hybrid':
        return ''

    order = config_manager.get('download_source.hybrid_order', ['hifi', 'youtube', 'soulseek'])
    if order:
        leader = order[0]
    else:
        leader = config_manager.get('download_source.hybrid_primary', '')
    leader = str(leader or '').lower()
    if leader in _ALBUM_BUNDLE_SOURCES:
        return leader
    return ''
@dataclass
class MasterDeps:
    """Injected collaborators for `run_full_missing_tracks_process`.

    Everything the worker needs from the host application arrives through
    this single object. Fields are positional in declaration order, so
    reordering them would break positional construction.
    """

    # --- Configuration and download plumbing --------------------------------
    # Read through .get(key, default) and .get_active_media_server().
    config_manager: Any
    download_orchestrator: Any
    run_async: Callable[..., Any]

    # --- MusicBrainz release caches (each paired with its guarding lock) -----
    # May be falsy; otherwise its .mb_service attribute is used for lookups.
    mb_worker: Any
    # (album key, lower-cased artist name) -> release MBID
    mb_release_cache: dict
    mb_release_cache_lock: Any
    # release MBID -> full release record
    mb_release_detail_cache: dict
    mb_release_detail_cache_lock: Any
    # Produces the normalised album part of a release-cache key.
    normalize_album_cache_key: Callable[[str], str]

    # --- Wishlist and content filtering --------------------------------------
    # Called with one track dict once that track is known to be owned.
    check_and_remove_track_from_wishlist_by_metadata: Callable
    # Truthy result drops a track when explicit content is not allowed.
    is_explicit_blocked: Callable

    # --- Per-source playlist state, keyed by playlist id minus its prefix ----
    youtube_playlist_states: dict          # prefix 'youtube_'
    tidal_discovery_states: dict           # prefix 'tidal_'
    deezer_discovery_states: dict          # prefix 'deezer_'
    spotify_public_discovery_states: dict  # prefix 'spotify_public_'

    # --- Hand-off and completion ---------------------------------------------
    # Used through .submit(callable, batch_id).
    missing_download_executor: Any
    # Submitted to the executor above for auto-initiated wishlist batches.
    process_failed_tracks_to_wishlist_exact_with_auto_completion: Callable
    source_reuse_logger: Any
    download_monitor: Any
    start_next_batch_of_downloads: Callable[[str], None]
    reset_wishlist_auto_processing: Callable[[], None]
class _BatchStateAccessImpl:
    """``BatchStateAccess`` implementation over the runtime ``download_batches`` dict.

    Every method takes ``tasks_lock`` and does nothing when the batch id is
    unknown, which keeps the album-bundle dispatcher decoupled from
    ``runtime_state``.
    """

    def update_fields(self, batch_id: str, fields: dict) -> None:
        """Merge ``fields`` into the batch row, if the batch exists."""
        with tasks_lock:
            batch_row = download_batches.get(batch_id)
            if batch_row is None:
                return
            batch_row.update(fields)

    def mark_failed(self, batch_id: str, error: str) -> None:
        """Flag the batch (and its album-bundle state) as failed with ``error``."""
        with tasks_lock:
            batch_row = download_batches.get(batch_id)
            if batch_row is None:
                return
            batch_row.update(
                phase='failed',
                error=error,
                album_bundle_state='failed',
            )
# Task states that mean a batch still has work in flight. While ANY of a batch's
# tasks is in one of these, a serialized album-pool worker keeps its slot.
_NON_TERMINAL_TASK_STATUSES = ('pending', 'queued', 'searching', 'downloading', 'post_processing')


def _wait_for_batch_drain(batch_id: str, poll_seconds: float = 1.5,
                          max_wait_seconds: float = 3600.0) -> None:
    """Block until ``batch_id`` is fully drained or waiting becomes pointless.

    Returns as soon as one of these holds:

    * no task in the batch's ``queue`` has a status listed in
      ``_NON_TERMINAL_TASK_STATUSES``;
    * the batch is no longer in ``download_batches``;
    * ``core.downloads.monitor.IS_SHUTTING_DOWN`` is truthy;
    * ``max_wait_seconds`` have elapsed (a warning is logged).

    Used to make the dedicated album-bundle pool actually SERIALIZE albums: the
    worker holds its pool slot for the album's whole lifetime instead of
    returning the instant downloads are started. That stops every album from
    dumping its tracks into the shared download pool at once (Sokhi: "searching
    for way too many tracks at once"). It's a PASSIVE wait — the downloads are
    driven by the monitor + completion callbacks on other threads, so this never
    drives the work; worst case the cap releases the slot and the downloads
    simply finish in the background.

    Args:
        batch_id: Key of the batch in ``download_batches``.
        poll_seconds: Sleep between status checks.
        max_wait_seconds: Safety cap on the total wait.
    """
    from core.downloads import monitor as _monitor

    # Monotonic clock: the cap must measure elapsed time, so a wall-clock
    # adjustment (NTP step, DST, manual change) can neither release the slot
    # early nor hold it past the cap.
    started = time.monotonic()
    while True:
        if getattr(_monitor, 'IS_SHUTTING_DOWN', False):
            return

        with tasks_lock:
            batch = download_batches.get(batch_id)
            if not batch:
                return
            # Snapshot the queue so it is safe to iterate while it may change.
            queue = list(batch.get('queue', ()) or ())
            still_working = any(
                download_tasks.get(t, {}).get('status') in _NON_TERMINAL_TASK_STATUSES
                for t in queue
            )
        if not still_working:
            return

        if time.monotonic() - started > max_wait_seconds:
            logger.warning(
                "[Album Serialize] batch %s not drained after %.0fs — releasing the "
                "album-pool slot (its downloads continue in the background)",
                batch_id, max_wait_seconds)
            return
        time.sleep(poll_seconds)
def run_full_missing_tracks_process(batch_id, playlist_id, tracks_json, deps: MasterDeps,
serialize: bool = False):
"""
A master worker that handles the entire missing tracks process:
1. Runs the analysis.
2. If missing tracks are found, it automatically queues them for download.
"""
try:
# PHASE 1: ANALYSIS
with tasks_lock:
if batch_id in download_batches:
download_batches[batch_id]['phase'] = 'analysis'
download_batches[batch_id]['analysis_total'] = len(tracks_json)
download_batches[batch_id]['analysis_processed'] = 0
from database.music_database import MusicDatabase
from core.library import manual_library_match as _mlm
db = MusicDatabase()
active_server = deps.config_manager.get_active_media_server()
analysis_results = []
# Get force download flag and album context from batch
force_download_all = False
ignore_manual_matches = False
batch_album_context = None
batch_artist_context = None
batch_is_album = False
batch_profile_id = 1
batch_source = 'spotify'
batch_playlist_folder_mode = False
batch_playlist_name = 'Unknown Playlist'
batch_playlist_id = playlist_id
batch_source_playlist_ref = ''
# Issue #797 — per-request "Skip AcoustID verification" toggle from
# the album-download modal. When set, every track in this batch
# bypasses the AcoustID quarantine gate (the user has chosen to
# trust the metadata over fingerprint disagreement — useful for
# non-English artists whose native-script metadata AcoustID can't
# reconcile with the romanized request).
batch_skip_acoustid = False
with tasks_lock:
if batch_id in download_batches:
force_download_all = download_batches[batch_id].get('force_download_all', False)
ignore_manual_matches = download_batches[batch_id].get('ignore_manual_matches', False)
batch_is_album = download_batches[batch_id].get('is_album_download', False)
batch_album_context = download_batches[batch_id].get('album_context')
batch_artist_context = download_batches[batch_id].get('artist_context')
batch_profile_id = download_batches[batch_id].get('profile_id', 1) or 1
batch_source = download_batches[batch_id].get('batch_source', 'spotify') or 'spotify'
batch_playlist_folder_mode = download_batches[batch_id].get('playlist_folder_mode', False)
batch_playlist_name = download_batches[batch_id].get('playlist_name', 'Unknown Playlist')
batch_playlist_id = download_batches[batch_id].get('playlist_id', playlist_id)
batch_source_playlist_ref = (
download_batches[batch_id].get('source_playlist_ref') or ''
).strip()
batch_skip_acoustid = bool(download_batches[batch_id].get('skip_acoustid', False))
from core.downloads.playlist_folder import (
resolve_playlist_folder_mode_for_batch,
track_exists_in_playlist_folder_from_track_data,
)
effective_playlist_folder_mode, effective_playlist_name = resolve_playlist_folder_mode_for_batch(
db,
playlist_id=str(batch_playlist_id),
playlist_name=batch_playlist_name,
batch_playlist_folder_mode=batch_playlist_folder_mode,
profile_id=batch_profile_id,
source=batch_source,
)
if effective_playlist_folder_mode and not batch_playlist_folder_mode:
with tasks_lock:
if batch_id in download_batches:
download_batches[batch_id]['playlist_folder_mode'] = True
download_batches[batch_id]['playlist_name'] = effective_playlist_name
if force_download_all:
logger.warning(f"[Force Download] Force download mode enabled for batch {batch_id} - treating all tracks as missing")
# Allow duplicate tracks across albums — when enabled, only skip tracks already
# owned in THIS album, not tracks owned in other albums
allow_duplicates = deps.config_manager.get('wishlist.allow_duplicate_tracks', True)
if allow_duplicates and batch_is_album:
logger.info("[Duplicates] Allow duplicate tracks enabled — only checking ownership within target album")
# PREFLIGHT: Pre-populate MusicBrainz release cache for album downloads.
# This ensures ALL tracks in the album use the same release MBID during
# per-track post-processing, preventing Navidrome album splits.
if batch_is_album and batch_album_context and batch_artist_context:
try:
album_name_pf = batch_album_context.get('name', '')
artist_name_pf = batch_artist_context.get('name', '')
if album_name_pf and artist_name_pf:
mb_svc = deps.mb_worker.mb_service if deps.mb_worker else None
if mb_svc:
from core.album_consistency import _find_best_release
release = _find_best_release(album_name_pf, artist_name_pf, len(tracks_json), mb_svc)
if release and release.get('id'):
release_mbid = release['id']
_artist_key = artist_name_pf.lower().strip()
_rc_key_norm = (deps.normalize_album_cache_key(album_name_pf), _artist_key)
_rc_key_exact = (album_name_pf.lower().strip(), _artist_key)
with deps.mb_release_cache_lock:
deps.mb_release_cache[_rc_key_norm] = release_mbid
deps.mb_release_cache[_rc_key_exact] = release_mbid
# Also cache the full release detail for tag extraction
with deps.mb_release_detail_cache_lock:
deps.mb_release_detail_cache[release_mbid] = release
logger.info(f"[Preflight] Pre-cached MB release for '{album_name_pf}': "
f"'{release.get('title', '')}' ({release_mbid[:8]}...)")
else:
logger.warning(f"[Preflight] No MB release found for '{album_name_pf}' — per-track lookup will be used")
except Exception as pf_err:
logger.error(f"[Preflight] MB release preflight failed: {pf_err}")
# ALBUM FAST PATH: If this is an album download, try to find the album in the DB first
# and match tracks within it — faster and more accurate than N global searches
album_tracks_map = {} # Maps normalized title -> DatabaseTrack for album-scoped matching
if batch_is_album and batch_album_context and batch_artist_context and not force_download_all:
album_name = batch_album_context.get('name', '')
artist_name = batch_artist_context.get('name', '')
total_tracks = batch_album_context.get('total_tracks', 0)
if album_name and artist_name:
try:
db_album, album_confidence = db.check_album_exists_with_editions(
title=album_name, artist=artist_name,
confidence_threshold=0.7,
expected_track_count=total_tracks if total_tracks > 0 else None,
server_source=active_server
)
if db_album and album_confidence >= 0.7:
db_album_tracks = db.get_tracks_by_album(db_album.id)
for t in db_album_tracks:
album_tracks_map[t.title.lower().strip()] = t
logger.info(f"[Album Analysis] Found album '{db_album.title}' in DB with {len(db_album_tracks)} tracks (confidence: {album_confidence:.2f})")
else:
logger.warning(f"[Album Analysis] Album '{album_name}' not found in DB — falling back to per-track search")
except Exception as album_err:
logger.error(f"[Album Analysis] Album lookup error: {album_err} — falling back to per-track search")
for i, track_data in enumerate(tracks_json):
# Use original table index if provided (for partial track selection),
# otherwise fall back to enumeration index
track_index = track_data.get('_original_index', i)
track_name = track_data.get('name', '')
artists = track_data.get('artists', [])
found, confidence = False, 0.0
# Additive payload: the owned library track (DatabaseTrack) when this
# item is found in the library, so downstream (playlist materialization)
# knows WHERE the real file is without re-matching. None when not owned.
matched_track = None
# Manual library matches are authoritative unless the user explicitly
# requested a force re-download from the normal download modal.
_stid = track_data.get('spotify_track_id') or track_data.get('source_track_id') or track_data.get('id', '')
_manual_match = (
_mlm.get_match_for_track(db, batch_profile_id, track_data, default_source=batch_source)
if (not ignore_manual_matches and _stid) else None
)
if _manual_match:
logger.info(f"[Manual Match] '{track_name}' already matched in library — skipping download")
try:
deps.check_and_remove_track_from_wishlist_by_metadata(track_data)
except Exception as _wl_err:
logger.debug(f"[Manual Match] Wishlist removal attempt failed: {_wl_err}")
analysis_results.append({
'track_index': track_index,
'track': track_data,
'found': True,
'confidence': 1.0,
'match_reason': 'manual_library_match',
'matched_file_path': _manual_match.get('library_file_path'),
'matched_track_id': _manual_match.get('library_track_id'),
})
continue
if effective_playlist_folder_mode and not force_download_all:
if track_exists_in_playlist_folder_from_track_data(
effective_playlist_name,
track_data,
):
logger.info(
f"[Playlist Folder] '{track_name}' already on disk in playlist folder — skipping download"
)
try:
deps.check_and_remove_track_from_wishlist_by_metadata(track_data)
except Exception as _wl_err:
logger.debug(f"[Playlist Folder] Wishlist removal attempt failed: {_wl_err}")
analysis_results.append({
'track_index': track_index,
'track': track_data,
'found': True,
'confidence': 1.0,
'match_reason': 'playlist_folder_file',
})
continue
# Skip database check if force download is enabled
if force_download_all:
logger.warning(f"[Force Download] Skipping database check for '{track_name}' - treating as missing")
found, confidence = False, 0.0
elif album_tracks_map:
# Album-scoped matching: check against known album tracks first
track_name_lower = track_name.lower().strip()
# Issue #589 — strip suffixes that just repeat the album
# context (e.g. "Shy Away (MTV Unplugged Live)" on a
# "MTV Unplugged" album → "Shy Away") so album-owned
# tracks don't false-miss when the local DB stored the
# base title. Only fires inside the album-confirmed
# scope; global matching elsewhere is unchanged.
from core.matching.album_context_title import strip_redundant_album_suffix
_album_name_for_strip = (batch_album_context or {}).get('name', '')
_normalized_source_title = strip_redundant_album_suffix(
track_name, _album_name_for_strip
).lower().strip()
# Direct title match (try both raw and normalized)
if track_name_lower in album_tracks_map:
found, confidence = True, 1.0
matched_track = album_tracks_map[track_name_lower]
elif _normalized_source_title and _normalized_source_title in album_tracks_map:
found, confidence = True, 1.0
matched_track = album_tracks_map[_normalized_source_title]
else:
# Fuzzy match against album tracks using string similarity.
# Compare BOTH the raw and normalized source titles —
# whichever scores higher wins. Preserves strict
# matching when the album doesn't imply version
# context (helper returns the input unchanged).
best_sim = 0.0
best_track = None
for db_title_lower, _db_track in album_tracks_map.items():
sim_raw = db._string_similarity(track_name_lower, db_title_lower)
sim_norm = db._string_similarity(_normalized_source_title, db_title_lower) if _normalized_source_title else 0.0
sim = max(sim_raw, sim_norm)
if sim > best_sim:
best_sim = sim
best_track = _db_track
if best_sim >= 0.7:
found, confidence = True, best_sim
matched_track = best_track
else:
# Fall back to global per-track search for this track
# When allow_duplicates is on for album downloads, skip global
# search — the track isn't in THIS album so treat as missing
if allow_duplicates and batch_is_album:
found, confidence = False, 0.0
else:
_fallback_album = batch_album_context.get('name') if batch_album_context else None
for artist in artists:
if isinstance(artist, str):
artist_name = artist
elif isinstance(artist, dict) and 'name' in artist:
artist_name = artist['name']
else:
artist_name = str(artist)
db_track, track_confidence = db.check_track_exists(
track_name, artist_name, confidence_threshold=0.7, server_source=active_server, album=_fallback_album
)
if db_track and track_confidence >= 0.7:
found, confidence = True, track_confidence
matched_track = db_track
break
elif allow_duplicates and batch_is_album:
# Allow duplicates + album download + album not in DB yet → treat all as missing
found, confidence = False, 0.0
else:
# Non-album download (playlist/single track) — always check global
for artist in artists:
# Handle both string format and Spotify API format {'name': 'Artist Name'}
if isinstance(artist, str):
artist_name = artist
elif isinstance(artist, dict) and 'name' in artist:
artist_name = artist['name']
else:
artist_name = str(artist)
db_track, track_confidence = db.check_track_exists(
track_name, artist_name, confidence_threshold=0.7, server_source=active_server
)
if db_track and track_confidence >= 0.7:
found, confidence = True, track_confidence
matched_track = db_track
break
analysis_results.append({
'track_index': track_index, 'track': track_data, 'found': found, 'confidence': confidence,
# Additive: real on-disk location of the owned track (None when not
# owned), so playlist materialization links the right file.
'matched_file_path': getattr(matched_track, 'file_path', None),
'matched_track_id': getattr(matched_track, 'id', None),
})
# WISHLIST REMOVAL: If track is found in database, check if it should be removed from wishlist
if found and confidence >= 0.7:
try:
deps.check_and_remove_track_from_wishlist_by_metadata(track_data)
except Exception as wishlist_error:
logger.error(f"[Analysis] Error checking wishlist removal for found track: {wishlist_error}")
with tasks_lock:
if batch_id in download_batches:
download_batches[batch_id]['analysis_processed'] = i + 1
# Store incremental results for live updates
download_batches[batch_id]['analysis_results'] = analysis_results.copy()
missing_tracks = [res for res in analysis_results if not res['found']]
# Filter explicit tracks if content filter is enabled
if not deps.config_manager.get('content_filter.allow_explicit', True):
before_count = len(missing_tracks)
missing_tracks = [res for res in missing_tracks if not deps.is_explicit_blocked(res.get('track', {}))]
skipped = before_count - len(missing_tracks)
if skipped > 0:
logger.warning(f"[Content Filter] Filtered out {skipped} explicit track(s) from download queue")
# Blocklist (Phase 2a): drop banned artists/albums/tracks before queueing,
# so a blocked item can't slip in via playlist sync / album download /
# discography. Same ID-cascade brain as the wishlist guard (Phase 1) —
# the only other auto-acquisition path. Skipped when the user confirmed
# "download anyway" at the modal (Phase 2b override).
_ignore_blocklist = False
with tasks_lock:
if batch_id in download_batches:
_ignore_blocklist = download_batches[batch_id].get('ignore_blocklist', False)
if not _ignore_blocklist:
try:
_bl_before = len(missing_tracks)
_bl_kept = []
for res in missing_tracks:
reason = db.blocklist_reason_for_track(
batch_profile_id, res.get('track', {}), source=batch_source)
if reason:
logger.info("[Blocklist] Skipping %s '%s' from download queue (%s blocked)",
reason[0], res.get('track', {}).get('name', '?'), reason[0])
else:
_bl_kept.append(res)
if len(_bl_kept) != _bl_before:
logger.info("[Blocklist] Filtered out %d blocklisted track(s) from download queue",
_bl_before - len(_bl_kept))
missing_tracks = _bl_kept
except Exception as _bl_err:
logger.debug("blocklist queue filter skipped: %s", _bl_err)
with tasks_lock:
if batch_id in download_batches:
download_batches[batch_id]['analysis_results'] = analysis_results
# PHASE 2: TRANSITION TO DOWNLOAD (if necessary)
if not missing_tracks:
logger.warning(f"Analysis for batch {batch_id} complete. No missing tracks.")
# Record sync history — all tracks found, nothing to download
tracks_found = sum(1 for r in analysis_results if r.get('found'))
try:
db_sh = MusicDatabase()
db_sh.update_sync_history_completion(batch_id, tracks_found=tracks_found, tracks_downloaded=0, tracks_failed=0)
# Save per-track results (all found, no downloads)
track_results = []
for res in analysis_results:
td = res.get('track', {})
artists = td.get('artists', [])
first_artist = (artists[0].get('name', artists[0]) if isinstance(artists[0], dict) else str(artists[0])) if artists else ''
alb = td.get('album', '')
# Extract image
_img = ''
_alb_obj = td.get('album', {})
if isinstance(_alb_obj, dict):
_alb_imgs = _alb_obj.get('images', [])
if _alb_imgs and isinstance(_alb_imgs, list) and len(_alb_imgs) > 0:
_img = _alb_imgs[0].get('url', '') if isinstance(_alb_imgs[0], dict) else ''
track_results.append({
'index': res.get('track_index', 0),
'name': td.get('name', ''),
'artist': first_artist,
'album': alb.get('name', '') if isinstance(alb, dict) else str(alb or ''),
'image_url': _img,
'duration_ms': td.get('duration_ms', 0),
'source_track_id': td.get('id', ''),
'status': 'found' if res.get('found') else 'not_found',
'confidence': round(res.get('confidence', 0.0), 3),
'matched_track': None,
'download_status': None,
})
if track_results:
db_sh.update_sync_history_track_results(batch_id, json.dumps(track_results))
except Exception as e:
logger.debug("update sync_history track results failed: %s", e)
is_auto_batch = False
with tasks_lock:
if batch_id in download_batches:
is_auto_batch = download_batches[batch_id].get('auto_initiated', False)
download_batches[batch_id]['phase'] = 'complete'
download_batches[batch_id]['completion_time'] = time.time() # Track for auto-cleanup
# Update YouTube playlist phase to 'download_complete' if this is a YouTube playlist
if playlist_id.startswith('youtube_'):
url_hash = playlist_id.replace('youtube_', '')
if url_hash in deps.youtube_playlist_states:
deps.youtube_playlist_states[url_hash]['phase'] = 'download_complete'
logger.warning(f"Updated YouTube playlist {url_hash} to download_complete phase (no missing tracks)")
# Update Tidal playlist phase to 'download_complete' if this is a Tidal playlist
if playlist_id.startswith('tidal_'):
tidal_playlist_id = playlist_id.replace('tidal_', '')
if tidal_playlist_id in deps.tidal_discovery_states:
deps.tidal_discovery_states[tidal_playlist_id]['phase'] = 'download_complete'
logger.warning(f"Updated Tidal playlist {tidal_playlist_id} to download_complete phase (no missing tracks)")
# Update Deezer playlist phase to 'download_complete' if this is a Deezer playlist
if playlist_id.startswith('deezer_'):
deezer_playlist_id = playlist_id.replace('deezer_', '')
if deezer_playlist_id in deps.deezer_discovery_states:
deps.deezer_discovery_states[deezer_playlist_id]['phase'] = 'download_complete'
logger.warning(f"Updated Deezer playlist {deezer_playlist_id} to download_complete phase (no missing tracks)")
# Update Spotify Public playlist phase to 'download_complete' if this is a Spotify Public playlist
if playlist_id.startswith('spotify_public_'):
spotify_public_url_hash = playlist_id.replace('spotify_public_', '')
if spotify_public_url_hash in deps.spotify_public_discovery_states:
deps.spotify_public_discovery_states[spotify_public_url_hash]['phase'] = 'download_complete'
logger.warning(f"Updated Spotify Public playlist {spotify_public_url_hash} to download_complete phase (no missing tracks)")
# Handle auto-initiated wishlist completion even when no missing tracks
if is_auto_batch and playlist_id == 'wishlist':
logger.warning("[Auto-Wishlist] No missing tracks found - calling auto-completion handler to toggle cycle and reschedule")
deps.missing_download_executor.submit(deps.process_failed_tracks_to_wishlist_exact_with_auto_completion, batch_id)
# Organize-by-playlist with NOTHING to download (every track already
# owned): the batch never enters the download/lifecycle path, so build
# the playlist folder here from the owned files the analysis matched.
# Gated + non-fatal; runs once after analysis, not in the per-track loop.
if effective_playlist_folder_mode:
try:
from core.playlists.materialize_service import reconcile_batch_playlists
from database.music_database import MusicDatabase as _MDB
_batch = download_batches.get(batch_id)
if _batch is not None:
# We KNOW the intent is organize-by-playlist here (the gate
# above). The line-431 sync only writes the dict field when
# effective and NOT batch_playlist_folder_mode, so when the
# toggle itself drove it the dict field can still be falsy —
# which makes reconcile build no batch ref. Make the dict
# authoritative so reconcile sees the batch's own playlist.
_batch['playlist_folder_mode'] = True
if effective_playlist_name:
_batch['playlist_name'] = effective_playlist_name
_results = reconcile_batch_playlists(_MDB(), _batch, download_tasks, deps.config_manager)
if not _results:
logger.info(
f"[Playlist Folder] All-owned: nothing rebuilt for "
f"ref={_batch.get('source_playlist_ref') or _batch.get('playlist_id')} "
f"source={_batch.get('batch_source')}"
)
for _pl_name, _mat in _results:
logger.info(
f"[Playlist Folder] Rebuilt '{_mat.playlist_dir}' (all owned): "
f"{_mat.linked} linked, {_mat.copied} copied, {_mat.removed_stale} stale removed"
+ (" (symlinks unsupported here → copied)" if _mat.fellback else "")
)
except Exception as _mat_err:
logger.error(f"[Playlist Folder] All-owned materialize failed (non-fatal): {_mat_err}")
return
logger.warning(f" transitioning batch {batch_id} to download phase with {len(missing_tracks)} tracks.")
# Read batch context (quick lock) before doing any network I/O
with tasks_lock:
if batch_id not in download_batches: return
batch = download_batches[batch_id]
batch_album_context = batch.get('album_context')
batch_artist_context = batch.get('artist_context')
batch_is_album = batch.get('is_album_download', False)
batch_private_album_bundle = bool(batch.get('album_bundle_private_staging'))
batch_playlist_folder_mode = batch.get('playlist_folder_mode', False)
batch_playlist_name = batch.get('playlist_name', 'Unknown Playlist')
# Album-bundle sources download a whole release into private staging,
# then the normal per-track workers claim those staged files. Run this
# only after analysis has found missing tracks; otherwise an already
# owned album would still trigger a release download.
_bundle_state = _BatchStateAccessImpl()
_album_bundle_source = _resolve_album_bundle_source(deps.config_manager)
if _album_bundle_source and _album_bundle_source != 'soulseek':
if _album_bundle_dispatch.try_dispatch(
batch_id=batch_id,
is_album=batch_is_album,
album_context=batch_album_context,
artist_context=batch_artist_context,
config_get=deps.config_manager.get,
plugin_resolver=deps.download_orchestrator.client,
state=_bundle_state,
source_override=_album_bundle_source,
):
return
# === ALBUM PRE-FLIGHT: Search for complete album folder before track-by-track ===
# Only run pre-flight when Soulseek is the download source (or hybrid with soulseek)
preflight_source = None
preflight_tracks = None
soulseek_is_source = _soulseek_album_preflight_enabled(deps.config_manager)
if (batch_is_album and batch_album_context and batch_artist_context
and soulseek_is_source and not batch_private_album_bundle):
artist_name = batch_artist_context.get('name', '')
album_name = batch_album_context.get('name', '')
if artist_name and album_name:
try:
_sr = deps.source_reuse_logger
_sr.info(f"[Album Pre-flight] Searching for '{artist_name} {album_name}'")
logger.info(f"[Album Pre-flight] Searching Soulseek for complete album: '{artist_name} - {album_name}'")
slsk = _resolve_soulseek_client(deps.download_orchestrator)
# Try multiple query variations (banned keywords in artist/album name can return 0 results)
album_queries = [f"{artist_name} {album_name}"]
# Clean artist name (remove feat., parentheticals)
clean_artist = re.sub(r'\s*\(.*?\)', '', artist_name).strip()
clean_artist = re.sub(r'\s*(feat\.?|ft\.?|featuring)\s+.*$', '', clean_artist, flags=re.IGNORECASE).strip()
if clean_artist != artist_name:
album_queries.append(f"{clean_artist} {album_name}")
# Album name only (some users file by album)
album_queries.append(album_name)
album_results = []
track_results = []
album_results_by_source = {}
for aq in album_queries:
_sr.info(f"[Album Pre-flight] Trying query: '{aq}'")
track_results, album_results = deps.run_async(slsk.search(aq, timeout=30))
if album_results:
_sr.info(f"[Album Pre-flight] Found {len(album_results)} album results with query: '{aq}'")
for ar in album_results:
key = (getattr(ar, 'username', ''), getattr(ar, 'album_path', ''))
if key[0] and key[1] and key not in album_results_by_source:
album_results_by_source[key] = ar
else:
_sr.info(f"[Album Pre-flight] No album results for query: '{aq}'")
album_results = list(album_results_by_source.values())
if album_results:
# Score complete folders as releases before falling back to per-track search.
scored_albums = []
for ar in album_results:
filtered_tracks = slsk.filter_results_by_quality_preference(ar.tracks)
if filtered_tracks:
folder_score = _score_album_folder(
ar,
batch_album_context,
batch_artist_context,
tracks_json,
len(filtered_tracks),
)
scored_albums.append((ar, len(filtered_tracks), folder_score))
_sr.info(
f"[Album Pre-flight] Candidate {ar.username}:{ar.album_path} "
f"score={folder_score:.3f}, tracks={ar.track_count}, "
f"quality_tracks={len(filtered_tracks)}"
)
best_album = None
best_score = 0.0
if scored_albums:
scored_albums.sort(key=lambda x: (x[2], x[1], x[0].quality_score), reverse=True)
best_album, _best_filtered_count, best_score = scored_albums[0]
if best_score < _ALBUM_PREFLIGHT_MIN_SCORE:
_sr.info(
f"[Album Pre-flight] Best folder score {best_score:.3f} below "
f"threshold {_ALBUM_PREFLIGHT_MIN_SCORE:.2f}; falling back"
)
logger.warning("[Album Pre-flight] No Soulseek folder passed album-level validation")
best_album = None
if best_album:
_sr.info(f"[Album Pre-flight] Best album result: {best_album.username}:{best_album.album_path} "
f"({best_album.track_count} tracks, quality={best_album.dominant_quality}, score={best_score:.3f})")
logger.info(f"[Album Pre-flight] Found album folder: {best_album.username}"
f"{best_album.track_count} tracks ({best_album.dominant_quality})")
# Browse the user's folder to get all tracks (may have more than search returned)
browse_files = deps.run_async(slsk.browse_user_directory(best_album.username, best_album.album_path))
if browse_files:
folder_tracks = slsk.parse_browse_results_to_tracks(
best_album.username, browse_files, directory=best_album.album_path
)
if folder_tracks:
preflight_source = {
'username': best_album.username,
'folder_path': best_album.album_path
}
preflight_tracks = folder_tracks
_sr.info(f"[Album Pre-flight] Browsed folder: {len(folder_tracks)} audio tracks available")
logger.info(f"[Album Pre-flight] Cached {len(folder_tracks)} tracks from {best_album.username} for source reuse")
else:
_sr.info("[Album Pre-flight] Browse returned files but no audio tracks")
else:
# Browse failed — fall back to using the search result tracks directly
_sr.info("[Album Pre-flight] Browse failed, using search result tracks directly")
preflight_source = {
'username': best_album.username,
'folder_path': best_album.album_path
}
preflight_tracks = best_album.tracks
logger.info(f"[Album Pre-flight] Using {len(best_album.tracks)} tracks from search results (browse unavailable)")
elif not scored_albums:
_sr.info("[Album Pre-flight] No album results passed quality filter")
logger.warning("[Album Pre-flight] No album results matched quality preferences")
else:
_sr.info(f"[Album Pre-flight] Search returned no album results (got {len(track_results)} individual tracks)")
logger.warning("[Album Pre-flight] No complete album folders found, falling back to track-by-track search")
except Exception as preflight_err:
logger.error(f"[Album Pre-flight] Search failed (non-fatal, falling back to track-by-track): {preflight_err}")
deps.source_reuse_logger.info(f"[Album Pre-flight] Exception: {preflight_err}")
# Soulseek album bundles run after analysis so an already-owned
# album does not get downloaded just because the source supports a
# whole-folder flow. When preflight selected a folder, pass that
# exact source into the bundle downloader so we keep the richer
# tracklist-aware scoring instead of doing a weaker second pick.
_bundle_state = _BatchStateAccessImpl()
_album_bundle_source = _resolve_album_bundle_source(deps.config_manager)
if _album_bundle_source == 'soulseek':
if _album_bundle_dispatch.try_dispatch(
batch_id=batch_id,
is_album=batch_is_album,
album_context=batch_album_context,
artist_context=batch_artist_context,
config_get=deps.config_manager.get,
plugin_resolver=deps.download_orchestrator.client,
state=_bundle_state,
source_override=_album_bundle_source,
plugin_kwargs={
'preferred_source': preflight_source,
'preferred_tracks': preflight_tracks,
} if preflight_source and preflight_tracks else None,
):
return
with tasks_lock:
if batch_id not in download_batches: return
download_batches[batch_id]['phase'] = 'downloading'
# Store album pre-flight results on batch for source reuse
# unless the Soulseek album-bundle path already staged a private
# release. Task workers check source reuse before staging match, so
# preloading here would make the staged happy path re-download.
if (
preflight_source
and preflight_tracks
and not download_batches[batch_id].get('album_bundle_private_staging')
):
download_batches[batch_id]['last_good_source'] = preflight_source
download_batches[batch_id]['source_folder_tracks'] = preflight_tracks
download_batches[batch_id]['failed_sources'] = set()
logger.info(f"[Album Pre-flight] Pre-loaded source reuse data on batch {batch_id}")
# Compute total_discs for multi-disc album subfolder support
# Use ALL tracks (tracks_json), not just missing ones, to correctly detect multi-disc
# even when only one disc has missing tracks
if batch_is_album and batch_album_context:
total_discs = max((t.get('disc_number') or 1 for t in tracks_json), default=1)
batch_album_context['total_discs'] = total_discs
if total_discs > 1:
logger.info(f"[Multi-Disc] Detected {total_discs} discs for album '{batch_album_context.get('name')}'")
# Pre-compute per-album data for wishlist tracks (grouped by album ID)
# Wishlist tracks aren't batch_is_album but each track has disc_number in spotify_data
wishlist_album_disc_counts = {}
wishlist_album_artist_map = {} # album_id -> resolved artist context (consistent per album)
wishlist_album_context_map = {} # album_id -> richest shared album context
if playlist_id == 'wishlist':
import json as _json
# First pass: collect disc_number and resolve ONE artist per album
for t in tracks_json:
sp_data = t.get('spotify_data', {})
if isinstance(sp_data, str):
try:
sp_data = _json.loads(sp_data)
except:
sp_data = {}
album_val = sp_data.get('album')
album_id = album_val.get('id') if isinstance(album_val, dict) else album_val if isinstance(album_val, str) else None
# Fallback album key: use album name when ID is missing (e.g. mirrored playlist tracks)
if not album_id and isinstance(album_val, dict) and album_val.get('name'):
album_id = f"_name_{album_val['name'].lower().strip()}"
disc_num = sp_data.get('disc_number') or t.get('disc_number') or 1
if album_id:
wishlist_album_disc_counts[album_id] = max(
wishlist_album_disc_counts.get(album_id, 1), disc_num
)
if isinstance(album_val, dict):
existing_album_ctx = wishlist_album_context_map.get(album_id, {})
if _album_context_richness(album_val) > _album_context_richness(existing_album_ctx):
wishlist_album_context_map[album_id] = dict(album_val)
# Resolve album-level artist once per album (first track wins)
if album_id not in wishlist_album_artist_map:
_wl_source = t.get('source_info') or {}
if isinstance(_wl_source, str):
try:
_wl_source = _json.loads(_wl_source)
except:
_wl_source = {}
_wl_album = album_val if isinstance(album_val, dict) else {}
_wl_album_artists = _wl_album.get('artists', [])
# Priority: watchlist artist > source_info artist_name > album artists > track artists > top-level artists/artist > 'Unknown Artist'
if _wl_source.get('watchlist_artist_name'):
wishlist_album_artist_map[album_id] = {
'name': _wl_source['watchlist_artist_name'],
'id': _wl_source.get('watchlist_artist_id', '')
}
elif _wl_source.get('artist_name'):
wishlist_album_artist_map[album_id] = {'name': _wl_source['artist_name']}
elif _wl_album_artists:
_fa = _wl_album_artists[0]
wishlist_album_artist_map[album_id] = _fa if isinstance(_fa, dict) else {'name': str(_fa)}
else:
_wl_track_artists = sp_data.get('artists', [])
if _wl_track_artists:
_fa = _wl_track_artists[0]
wishlist_album_artist_map[album_id] = _fa if isinstance(_fa, dict) else {'name': str(_fa)}
else:
# Try top-level 'artists' (wishlist format uses plural)
_tl_artists = t.get('artists', [])
if _tl_artists:
_tla = _tl_artists[0]
_fallback_name = _tla.get('name', str(_tla)) if isinstance(_tla, dict) else str(_tla)
else:
_fallback_name = t.get('artist', '')
wishlist_album_artist_map[album_id] = {'name': _fallback_name or 'Unknown Artist'}
logger.info(f"[Wishlist Album Grouping] Album '{_wl_album.get('name', album_id)}' → artist: '{wishlist_album_artist_map[album_id].get('name', '?')}'")
for res in missing_tracks:
task_id = str(uuid.uuid4())
track_info = res['track'].copy()
# Add explicit album context to track_info for artist album downloads
if batch_is_album and batch_album_context and batch_artist_context:
track_info['_explicit_album_context'] = batch_album_context
track_info['_explicit_artist_context'] = batch_artist_context
track_info['_is_explicit_album_download'] = True
logger.info(f"[Task Creation] Added explicit album context for: {track_info.get('name')}")
# SPECIAL WISHLIST HANDLING: Inject album context if available to force grouping
elif playlist_id == 'wishlist':
# Extract spotify_data again since it might be buried
spotify_data = track_info.get('spotify_data')
if isinstance(spotify_data, str):
try:
spotify_data = json.loads(spotify_data)
except:
spotify_data = {}
if not spotify_data:
spotify_data = {}
s_album = spotify_data.get('album') or {}
if isinstance(s_album, str):
s_album = {'name': s_album} # Normalize string album to dict
s_artists = spotify_data.get('artists', [])
# We need at least an album name and artist
if s_album and isinstance(s_album, dict) and s_album.get('name'):
# Use pre-computed album-level artist for folder consistency.
# All tracks from the same album get the same artist context,
# preventing folder splits on collab albums (KPOP Demon Hunters, etc.)
album_id_for_lookup = s_album.get('id')
# Fallback album key: match first-pass logic for missing IDs
if not album_id_for_lookup and s_album.get('name'):
album_id_for_lookup = f"_name_{s_album['name'].lower().strip()}"
if not album_id_for_lookup:
album_id_for_lookup = 'wishlist_album'
artist_ctx = wishlist_album_artist_map.get(album_id_for_lookup, {})
if not artist_ctx or not artist_ctx.get('name'):
# Fallback: per-track resolution from artists array
_fb_artists = track_info.get('artists', [])
if _fb_artists:
_fb_a = _fb_artists[0]
_fb_name = _fb_a.get('name', str(_fb_a)) if isinstance(_fb_a, dict) else str(_fb_a)
else:
_fb_name = track_info.get('artist', '')
artist_ctx = {'name': _fb_name or 'Unknown Artist'}
# Construct a shared album context from the richest track in
# this album group so release_date/year and artwork do not
# vary per track and split folders.
album_id = s_album.get('id', 'wishlist_album')
shared_album = wishlist_album_context_map.get(album_id_for_lookup, s_album)
album_ctx = {
'id': album_id,
'name': shared_album.get('name') or s_album.get('name'),
'release_date': shared_album.get('release_date', ''),
'total_tracks': shared_album.get('total_tracks') or s_album.get('total_tracks', 1),
'total_discs': wishlist_album_disc_counts.get(album_id_for_lookup, 1),
'album_type': shared_album.get('album_type') or s_album.get('album_type', 'album'),
'images': shared_album.get('images') or s_album.get('images', []),
'artists': shared_album.get('artists') or s_album.get('artists', []),
}
track_info['_explicit_album_context'] = album_ctx
track_info['_explicit_artist_context'] = artist_ctx
track_info['_is_explicit_album_download'] = True
logger.info(f"[Wishlist] Added album context for: '{track_info.get('name')}' -> '{album_ctx['name']}'")
# Issue #797 — propagate the batch-level "skip AcoustID"
# toggle onto each track so the per-track download context
# (built in core/downloads/candidates.py) can set the
# AcoustID quarantine bypass. Mirrors the _playlist_folder_mode
# threading pattern below.
if batch_skip_acoustid:
track_info['_skip_acoustid'] = True
# Decide whether organize-by-playlist applies to this task: the batch-level
# setting (sync page playlists), or, for wishlist tracks, a mirrored playlist
# referenced by the track's source_info that has organize_by_playlist enabled.
task_pl_folder_mode = batch_playlist_folder_mode
task_pl_name = batch_playlist_name
if not task_pl_folder_mode and playlist_id == 'wishlist':
wl_source = track_info.get('source_info') or {}
if isinstance(wl_source, str):
try:
wl_source = json.loads(wl_source)
except (json.JSONDecodeError, TypeError):
wl_source = {}
wl_pl_ref = wl_source.get('playlist_id')
wl_pl_name = wl_source.get('playlist_name')
wl_pl_source = wl_source.get('source') or 'spotify'
if wl_pl_ref and hasattr(db, 'resolve_mirrored_playlist'):
wl_mirrored = db.resolve_mirrored_playlist(
wl_pl_ref,
profile_id=batch_profile_id,
default_source=wl_pl_source,
)
if wl_mirrored and wl_mirrored.get('organize_by_playlist'):
task_pl_folder_mode = True
task_pl_name = wl_pl_name or wl_mirrored.get('name') or batch_playlist_name
if task_pl_folder_mode:
# Organize-by-playlist now imports each track NORMALLY into the
# Artist/Album library (i.e. exactly what a normal download does)
# — the playlist folder is built as links/copies AFTER the batch
# from the real library files. So we deliberately DON'T set
# `_playlist_folder_mode` (which routed the real file into a flat
# Music/<playlist>/ dump). We keep `_playlist_name` + source_info
# — they're download provenance (core/downloads/origin.py).
track_info['_playlist_name'] = task_pl_name
if batch_source_playlist_ref:
track_info['source_info'] = {
'playlist_id': batch_source_playlist_ref,
'playlist_name': task_pl_name,
'source': batch_source,
}
logger.info(
f"[Task Creation] Organize-by-playlist (normal import + "
f"materialize after batch): {track_info.get('name')}{task_pl_name}"
)
else:
logger.debug(
f"[Debug] Task Creation - playlist folder mode NOT enabled for: "
f"{track_info.get('name')}"
)
# Download-origin provenance: stamp what TRIGGERED this download
# so the history chokepoint can record it (origin-history modal).
# Wishlist rows already ride their source_info in track_info
# (watchlist_artist_name / playlist_name — the deriver reads
# those directly); this stamp covers DIRECT playlist batches,
# where the playlist context otherwise only survives in
# folder mode.
if '_dl_origin' not in track_info and batch_source_playlist_ref and batch_playlist_name:
_prov_si = track_info.get('source_info') or {}
if isinstance(_prov_si, str):
try:
_prov_si = json.loads(_prov_si)
except (json.JSONDecodeError, TypeError):
_prov_si = {}
if not _prov_si.get('watchlist_artist_name'):
track_info['_dl_origin'] = 'playlist'
track_info['_dl_origin_context'] = (
_prov_si.get('playlist_name') or batch_playlist_name
)
download_tasks[task_id] = {
'status': 'pending', 'track_info': track_info,
'playlist_id': playlist_id, 'batch_id': batch_id,
'track_index': res['track_index'], 'retry_count': 0,
'cached_candidates': [], 'used_sources': set(),
'status_change_time': time.time(),
'metadata_enhanced': False
}
download_batches[batch_id]['queue'].append(task_id)
deps.download_monitor.start_monitoring(batch_id)
deps.start_next_batch_of_downloads(batch_id)
# Album-bundle batches run on the dedicated album pool and pass
# serialize=True: hold this pool slot until the album finishes so only a
# few albums are ever in flight at once, instead of every album batch
# immediately starting and flooding the shared download pool with
# 'searching' tracks (#740 / Sokhi). The residual + playlist + manual
# paths run on the shared download pool and DON'T serialize (blocking
# there would steal an actual download worker).
if serialize:
_wait_for_batch_drain(batch_id)
except Exception as e:
logger.error(f"Master worker for batch {batch_id} failed: {e}")
import traceback
traceback.print_exc()
is_auto_batch = False
with tasks_lock:
if batch_id in download_batches:
is_auto_batch = download_batches[batch_id].get('auto_initiated', False)
download_batches[batch_id]['phase'] = 'error'
download_batches[batch_id]['error'] = str(e)
# Reset YouTube playlist phase to 'discovered' if this is a YouTube playlist on error
if playlist_id.startswith('youtube_'):
url_hash = playlist_id.replace('youtube_', '')
if url_hash in deps.youtube_playlist_states:
deps.youtube_playlist_states[url_hash]['phase'] = 'discovered'
logger.error(f"Reset YouTube playlist {url_hash} to discovered phase (error)")
# Handle auto-initiated wishlist errors - reset flag
if is_auto_batch and playlist_id == 'wishlist':
logger.error("[Auto-Wishlist] Master worker error - resetting auto-processing flag")
deps.reset_wishlist_auto_processing()