You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
SoulSync/core/imports/pipeline.py

1119 lines
56 KiB

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""Import/post-processing pipeline for downloads and imported files."""
from __future__ import annotations
import json
import os
import threading
import time
from types import SimpleNamespace
from typing import Any
from config.settings import config_manager
from core.imports.file_ops import (
cleanup_empty_directories,
cleanup_slskd_dedup_siblings,
create_lossy_copy,
downsample_hires_flac,
get_audio_quality_string,
get_quality_tier_from_extension,
safe_move_file,
)
from core.imports.context import (
build_import_album_info,
detect_album_info_web,
extract_artist_name,
get_import_clean_artist,
get_import_clean_title,
get_import_context_artist,
get_import_has_clean_metadata,
get_import_original_search,
get_import_source,
get_import_track_info,
normalize_import_context,
)
from core.imports.file_integrity import check_audio_integrity, resolve_duration_tolerance
from core.imports.filename import extract_track_number_from_filename
from core.imports.guards import check_flac_bit_depth, move_to_quarantine
from core.imports.quarantine import entry_id_from_quarantined_filename
from core.imports.side_effects import (
emit_track_downloaded,
record_download_provenance,
record_library_history_download,
record_retag_download,
record_soulsync_library_entry,
)
from core.wishlist.resolution import check_and_remove_from_wishlist
from core.runtime_state import (
add_activity_item,
download_batches,
download_tasks,
matched_context_lock,
matched_downloads_context,
mark_task_completed as _mark_task_completed,
post_process_locks,
post_process_locks_lock,
processed_download_ids,
tasks_lock,
)
from core.metadata.artwork import download_cover_art
from core.metadata.common import wipe_source_tags
from core.metadata.enrichment import enhance_file_metadata
from core.imports.paths import (
build_final_path_for_track,
build_simple_download_destination,
docker_resolve_path,
)
from core.imports.album_naming import resolve_album_group
from core.metadata.lyrics import generate_lrc_file
from database.music_database import get_database
from utils.logging_config import get_logger
logger = get_logger("imports.pipeline")
pp_logger = get_logger("post_processing")
__all__ = [
"build_import_pipeline_runtime",
"post_process_matched_download",
"post_process_matched_download_with_verification",
]
def _should_skip_quarantine_check(context: dict, check_name: str) -> bool:
bypass = context.get('_skip_quarantine_check')
if bypass == 'all':
return True
if isinstance(bypass, (list, tuple, set)):
return 'all' in bypass or check_name in bypass
return bypass == check_name
def _mark_task_quarantined(context: dict, quarantine_path: str | None) -> None:
if not quarantine_path:
return
task_id = context.get('task_id')
if not task_id:
return
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['quarantine_entry_id'] = entry_id_from_quarantined_filename(quarantine_path)
def build_import_pipeline_runtime(
*,
automation_engine: Any | None = None,
on_download_completed: Any | None = None,
web_scan_manager: Any | None = None,
repair_worker: Any | None = None,
) -> SimpleNamespace:
"""Build the runtime object consumed by core.imports.pipeline."""
return SimpleNamespace(
automation_engine=automation_engine,
on_download_completed=on_download_completed,
web_scan_manager=web_scan_manager,
repair_worker=repair_worker,
)
def post_process_matched_download(context_key, context, file_path, runtime, metadata_runtime=None):
on_download_completed = getattr(runtime, "on_download_completed", None)
automation_engine = getattr(runtime, "automation_engine", None)
web_scan_manager = getattr(runtime, "web_scan_manager", None)
repair_worker = getattr(runtime, "repair_worker", None)
metadata_runtime = metadata_runtime or runtime
def _notify_download_completed(batch_id, task_id, success=True):
if on_download_completed:
on_download_completed(batch_id, task_id, success=success)
with post_process_locks_lock:
if context_key not in post_process_locks:
post_process_locks[context_key] = threading.Lock()
file_lock = post_process_locks[context_key]
file_lock.acquire()
try:
if not os.path.exists(file_path):
existing_final = context.get('_final_processed_path')
if existing_final and os.path.exists(existing_final):
logger.info(
f"[Race Guard] Source gone but destination exists — already processed by another thread: "
f"{os.path.basename(existing_final)}"
)
return
logger.error(
f"[Race Guard] Source file gone and no known destination — marking as failed: "
f"{os.path.basename(file_path)}"
)
context['_race_guard_failed'] = True
return
_basename = os.path.basename(file_path)
_prev_size = -1
for _stability_check in range(5):
try:
_cur_size = os.path.getsize(file_path)
except OSError:
_cur_size = -1
if _cur_size == _prev_size and _cur_size > 0:
break
_prev_size = _cur_size
if _stability_check == 0:
logger.info(f"Waiting for file to stabilise: {_basename} ({_cur_size} bytes)")
time.sleep(1.5)
else:
logger.info(f"File may still be writing after stability checks: {_basename} ({_prev_size} bytes)")
# File integrity check: catches broken slskd transfers (truncated,
# corrupted, wrong file masquerading as the target) before we burn
# cycles on AcoustID + tagging + library sync. Universal across
# formats; failed files get quarantined and the slot freed.
try:
_normalized_for_duration = normalize_import_context(context)
_duration_track = get_import_track_info(_normalized_for_duration)
_expected_duration_ms = int(_duration_track.get("duration_ms", 0) or 0) or None
except Exception:
_expected_duration_ms = None
# User-configurable tolerance override. None = use built-in
# auto-scaled defaults (3s normal / 5s for tracks >10min). Set
# higher (e.g. 10) when matched files routinely drift from the
# source's reported duration (live recordings, alternate
# masterings, etc).
_duration_tolerance_override = resolve_duration_tolerance(
config_manager.get('post_processing.duration_tolerance_seconds', 0)
)
# User-approved quarantine restores can bypass quarantine gates
# for this one post-processing pass.
if _should_skip_quarantine_check(context, 'integrity'):
logger.info(f"[Integrity] Skipped (user approval) for {_basename}")
integrity = None
else:
try:
integrity = check_audio_integrity(
file_path,
_expected_duration_ms,
length_tolerance_s=_duration_tolerance_override,
)
except Exception as integrity_error:
logger.error(f"[Integrity] Check raised unexpectedly (continuing): {integrity_error}")
integrity = None
if integrity is not None and not integrity.ok:
logger.error(f"[Integrity] Rejected {_basename}: {integrity.reason}")
context['_integrity_failure_msg'] = integrity.reason
context['_integrity_checks'] = integrity.checks
try:
quarantine_path = move_to_quarantine(
file_path,
context,
f"Integrity check failed: {integrity.reason}",
automation_engine,
trigger='integrity',
)
_mark_task_quarantined(context, quarantine_path)
logger.error(f"File quarantined due to integrity failure: {quarantine_path}")
except Exception as quarantine_error:
logger.error(f"Quarantine failed ({quarantine_error}), deleting broken file: {file_path}")
try:
os.remove(file_path)
except Exception as del_error:
logger.error(f"Could not delete broken file either: {del_error}")
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
task_id = context.get('task_id')
batch_id = context.get('batch_id')
if task_id:
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'failed'
download_tasks[task_id]['error_message'] = (
f"File integrity check failed: {integrity.reason}"
)
if task_id and batch_id:
_notify_download_completed(batch_id, task_id, success=False)
return
if integrity is not None:
logger.info(
f"[Integrity] {_basename} passed "
f"(size={integrity.checks.get('size_bytes', '?')}b, "
f"length={integrity.checks.get('actual_length_s', 0):.1f}s, "
f"drift={integrity.checks.get('length_drift_s', 'n/a')})"
)
_skip_acoustid = _should_skip_quarantine_check(context, 'acoustid')
if _skip_acoustid:
logger.info(f"[AcoustID] Skipped (user approval) for {_basename}")
try:
from core.acoustid_verification import AcoustIDVerification, VerificationResult
verifier = AcoustIDVerification()
available, available_reason = verifier.quick_check_available()
if available and not _skip_acoustid:
context = normalize_import_context(context)
track_info = get_import_track_info(context)
original_search = get_import_original_search(context)
artist_context = get_import_context_artist(context)
expected_track = get_import_clean_title(context, default=original_search.get('title', ''))
expected_artist = ''
track_artists = track_info.get('artists', [])
if track_artists:
first = track_artists[0]
if isinstance(first, dict):
expected_artist = first.get('name', '')
elif isinstance(first, str):
expected_artist = first
if not expected_artist:
expected_artist = extract_artist_name(artist_context) or get_import_clean_artist(context, default='')
if expected_track and expected_artist:
logger.info(f"Running AcoustID verification for: '{expected_track}' by '{expected_artist}'")
verification_result, verification_msg = verifier.verify_audio_file(
file_path,
expected_track,
expected_artist,
context,
)
logger.info(f"AcoustID verification result: {verification_result.value} - {verification_msg}")
context['_acoustid_result'] = verification_result.value
if verification_result == VerificationResult.FAIL:
try:
quarantine_path = move_to_quarantine(
file_path,
context,
verification_msg,
automation_engine,
trigger='acoustid',
)
_mark_task_quarantined(context, quarantine_path)
logger.error(f"File quarantined due to verification failure: {quarantine_path}")
except Exception as quarantine_error:
logger.error(f"Quarantine failed ({quarantine_error}), deleting wrong file: {file_path}")
logger.error(f"Quarantine failed, deleting wrong file: {file_path}")
try:
os.remove(file_path)
except Exception as del_error:
logger.error(f"Could not delete wrong file either: {del_error}")
context['_acoustid_quarantined'] = True
context['_acoustid_failure_msg'] = verification_msg
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
task_id = context.get('task_id')
batch_id = context.get('batch_id')
if task_id:
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'failed'
download_tasks[task_id]['error_message'] = (
f"AcoustID verification failed: {verification_msg}"
)
if task_id and batch_id:
_notify_download_completed(batch_id, task_id, success=False)
return
else:
logger.warning("AcoustID verification skipped: missing track/artist info")
context['_acoustid_result'] = 'skip'
else:
logger.info(f" AcoustID verification not available: {available_reason}")
context['_acoustid_result'] = 'disabled'
except Exception as verify_error:
logger.error(f"AcoustID verification error (continuing normally): {verify_error}")
context['_acoustid_result'] = 'error'
search_result = context.get('search_result', {}) or {}
if not isinstance(search_result, dict):
search_result = {}
is_simple_download = search_result.get('is_simple_download', False)
if is_simple_download:
logger.info(f"Processing simple download (no metadata enhancement): {file_path}")
destination, album_name, filename = build_simple_download_destination(context, file_path)
if album_name:
logger.info(f"Moving to album folder: {album_name}")
else:
logger.info("Moving to Transfer root (single track)")
safe_move_file(file_path, destination)
logger.info(f"Moved simple download to: {destination}")
cleanup_slskd_dedup_siblings(file_path)
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
if web_scan_manager:
threading.Thread(
target=lambda: web_scan_manager.request_scan("Simple download completed"),
daemon=True,
).start()
activity_target = f"{album_name}/{filename}" if album_name else filename
add_activity_item("", "Download Complete", activity_target, "Now")
logger.info(f"Simple download post-processing complete: {activity_target}")
context['_simple_download_completed'] = True
context['_final_path'] = str(destination)
emit_track_downloaded(context, automation_engine)
record_library_history_download(context)
record_download_provenance(context)
try:
check_and_remove_from_wishlist(context)
except Exception as wishlist_error:
logger.error(f"[Simple Download] Error checking wishlist removal: {wishlist_error}")
return
logger.info(f"Starting robust post-processing for: {context_key}")
context = normalize_import_context(context)
artist_context = get_import_context_artist(context)
track_info = get_import_track_info(context)
original_search = get_import_original_search(context)
has_clean_metadata = get_import_has_clean_metadata(context)
if not artist_context:
logger.error("Post-processing failed: Missing artist context.")
return
_junk_artist_names = {'', 'unknown', 'unknown artist', 'various artists', 'none', 'null'}
_artist_name = (artist_context.get('name', '') if isinstance(artist_context, dict) else '').strip()
if _artist_name.lower() in _junk_artist_names:
logger.info(f"[Unknown Artist Guard] Artist name is '{_artist_name}' — attempting to resolve")
_resolved = False
track_info_guard = track_info or {}
original_search_guard = original_search or {}
_ti_artists = track_info_guard.get('artists', [])
if isinstance(_ti_artists, list) and _ti_artists:
_first = _ti_artists[0]
_name = _first.get('name', '') if isinstance(_first, dict) else str(_first)
if _name and _name.strip().lower() not in _junk_artist_names:
artist_context['name'] = _name.strip()
logger.info(f"[Unknown Artist Guard] Resolved from track_info.artists: '{_name}'")
_resolved = True
if not _resolved:
_os_artist = original_search_guard.get('artist') or original_search_guard.get('artist_name') or ''
if isinstance(_os_artist, str) and _os_artist.strip().lower() not in _junk_artist_names:
artist_context['name'] = _os_artist.strip()
logger.info(f"[Unknown Artist Guard] Resolved from original_search_result: '{_os_artist}'")
_resolved = True
if not _resolved:
_track_id = track_info_guard.get('id') or track_info_guard.get('track_id') or ''
if _track_id:
try:
from core.metadata_service import get_client_for_source, get_primary_source
_guard_source = get_import_source(context) or get_primary_source()
_fb_client = get_client_for_source(_guard_source) or get_client_for_source(get_primary_source())
if hasattr(_fb_client, 'get_track_details'):
_details = _fb_client.get_track_details(str(_track_id))
if _details and isinstance(_details, dict):
_d_artists = _details.get('artists', [])
if isinstance(_d_artists, list) and _d_artists:
_d_first = _d_artists[0]
_d_name = _d_first.get('name', '') if isinstance(_d_first, dict) else str(_d_first)
if _d_name and _d_name.strip().lower() not in _junk_artist_names:
artist_context['name'] = _d_name.strip()
logger.info(f"[Unknown Artist Guard] Resolved from metadata API: '{_d_name}'")
_resolved = True
except Exception as _guard_err:
logger.error(f"[Unknown Artist Guard] Metadata re-fetch failed: {_guard_err}")
if not _resolved:
logger.error(f"[Unknown Artist Guard] Could not resolve artist — proceeding with '{_artist_name}'")
context['artist'] = artist_context
playlist_folder_mode = track_info.get("_playlist_folder_mode", False)
logger.debug(f"[Debug] Post-processing - track_info type: {type(track_info)}, is None: {track_info is None}, is empty: {not track_info}")
logger.debug(f"[Debug] Post-processing - playlist_folder_mode: {playlist_folder_mode}")
if track_info:
logger.debug(f"[Debug] Post-processing - track_info keys: {list(track_info.keys())}")
if playlist_folder_mode:
playlist_name = track_info.get("_playlist_name", "Unknown Playlist")
logger.info(f"[Playlist Folder Mode] Organizing in playlist folder: {playlist_name}")
file_ext = os.path.splitext(file_path)[1]
final_path, _ = build_final_path_for_track(context, artist_context, None, file_ext)
logger.info(f"Playlist mode final path: '{final_path}'")
if not os.path.exists(file_path):
if os.path.exists(final_path):
logger.info(
f"[Playlist Folder Mode] Source gone but destination exists — already processed by another thread: "
f"{os.path.basename(final_path)}"
)
context['_final_processed_path'] = final_path
return
pp_logger.info(f"[inner] EXCEPTION in post-processing for {context_key}: Source file not found and destination does not exist: {file_path}")
raise FileNotFoundError(f"Source file not found and destination does not exist: {file_path}")
context['_audio_quality'] = get_audio_quality_string(file_path)
if context['_audio_quality']:
logger.info(f"Audio quality detected: {context['_audio_quality']}")
_skip_bit_depth = _should_skip_quarantine_check(context, 'bit_depth')
rejection_reason = None if _skip_bit_depth else check_flac_bit_depth(file_path, context)
if _skip_bit_depth:
logger.info(f"[BitDepth] Skipped (user approval) for {_basename}")
if rejection_reason:
try:
quarantine_path = move_to_quarantine(
file_path,
context,
rejection_reason,
automation_engine,
trigger='bit_depth',
)
_mark_task_quarantined(context, quarantine_path)
logger.info(f"File quarantined due to bit depth filter: {quarantine_path}")
except Exception as quarantine_error:
logger.error(f"Quarantine failed ({quarantine_error}), deleting file: {file_path}")
try:
os.remove(file_path)
except Exception as e:
logger.debug("delete quarantine fallback: %s", e)
context['_bitdepth_rejected'] = True
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
task_id = context.get('task_id')
batch_id = context.get('batch_id')
if task_id:
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'failed'
download_tasks[task_id]['error_message'] = f"Bit depth filter: {rejection_reason}"
if task_id and batch_id:
_notify_download_completed(batch_id, task_id, success=False)
return
try:
logger.warning(
f"[Metadata Input] Playlist mode - artist: '{artist_context.get('name', 'MISSING')}' "
f"(id: {artist_context.get('id', 'MISSING')})"
)
enhance_file_metadata(file_path, context, artist_context, None, runtime=metadata_runtime)
except Exception as meta_err:
import traceback
pp_logger.info(f"[inner] Metadata enhancement FAILED for {context_key}: {meta_err}\n{traceback.format_exc()}")
wipe_source_tags(file_path)
logger.info(f"Moving '{os.path.basename(file_path)}' to '{final_path}'")
safe_move_file(file_path, final_path)
context['_final_processed_path'] = final_path
cleanup_slskd_dedup_siblings(file_path)
if config_manager.get('post_processing.replaygain_enabled', False):
try:
from core.replaygain import analyze_track as _rg_analyze, write_replaygain_tags as _rg_write, is_ffmpeg_available as _rg_ffmpeg_ok, RG_REFERENCE_LUFS as _RG_REF
if _rg_ffmpeg_ok():
lufs, peak_dbfs = _rg_analyze(final_path)
gain_db = _RG_REF - lufs
_rg_write(final_path, gain_db, peak_dbfs)
pp_logger.info(f"ReplayGain: {gain_db:+.2f} dB — {os.path.basename(final_path)}")
except Exception as rg_err:
pp_logger.debug(f"ReplayGain analysis skipped: {rg_err}")
downsampled_path = downsample_hires_flac(final_path, context)
if downsampled_path:
final_path = downsampled_path
context['_final_processed_path'] = final_path
blasphemy_path = create_lossy_copy(final_path)
if blasphemy_path:
context['_final_processed_path'] = blasphemy_path
downloads_path = docker_resolve_path(config_manager.get('soulseek.download_path', './downloads'))
cleanup_empty_directories(downloads_path, file_path)
logger.info(f"[Playlist Folder Mode] Post-processing complete: {final_path}")
try:
check_and_remove_from_wishlist(context)
except Exception as wishlist_error:
logger.error(f"[Playlist Folder] Error checking wishlist removal: {wishlist_error}")
emit_track_downloaded(context, automation_engine)
record_library_history_download(context)
record_download_provenance(context)
task_id = context.get('task_id')
batch_id = context.get('batch_id')
if task_id and batch_id:
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['stream_processed'] = True
download_tasks[task_id]['status'] = 'completed'
logger.info(f"[Playlist Folder Mode] Marked task {task_id} as completed")
_notify_download_completed(batch_id, task_id, success=True)
return
is_album_download = bool(context.get("is_album_download", False))
album_info = build_import_album_info(context, force_album=is_album_download)
if is_album_download:
if has_clean_metadata:
logger.info("Album context with clean metadata found - using normalized album info")
else:
logger.warning("Album context found without clean metadata - using normalized album info")
elif not album_info.get('is_album'):
logger.info("Single track download - attempting album detection")
detected_album_info = detect_album_info_web(context, artist_context)
if detected_album_info:
album_info = detected_album_info
if album_info and album_info['is_album'] and not is_album_download:
logger.info(
"SMART ALBUM GROUPING for track=%r original_album=%r",
album_info.get('clean_track_name', 'Unknown'),
album_info.get('album_name', 'None'),
)
original_album = original_search.get("album") if original_search.get("album") else None
consistent_album_name = resolve_album_group(artist_context, album_info, original_album)
album_info['album_name'] = consistent_album_name
logger.info("Album grouping complete: final_album=%r", consistent_album_name)
elif album_info and album_info['is_album'] and is_album_download:
logger.info(
"EXPLICIT ALBUM DOWNLOAD - preserving album name=%r; skipping smart grouping",
album_info.get('album_name', 'None'),
)
context['_audio_quality'] = get_audio_quality_string(file_path)
if context['_audio_quality']:
logger.info(f"Audio quality detected: {context['_audio_quality']}")
_skip_bit_depth = _should_skip_quarantine_check(context, 'bit_depth')
rejection_reason = None if _skip_bit_depth else check_flac_bit_depth(file_path, context)
if _skip_bit_depth:
logger.info(f"[BitDepth] Skipped (user approval) for {_basename}")
if rejection_reason:
try:
quarantine_path = move_to_quarantine(
file_path,
context,
rejection_reason,
automation_engine,
trigger='bit_depth',
)
_mark_task_quarantined(context, quarantine_path)
logger.info(f"File quarantined due to bit depth filter: {quarantine_path}")
except Exception as quarantine_error:
logger.error(f"Quarantine failed ({quarantine_error}), deleting file: {file_path}")
try:
os.remove(file_path)
except Exception as e:
logger.debug("delete quarantine fallback: %s", e)
context['_bitdepth_rejected'] = True
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
task_id = context.get('task_id')
batch_id = context.get('batch_id')
if task_id:
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'failed'
download_tasks[task_id]['error_message'] = f"Bit depth filter: {rejection_reason}"
if task_id and batch_id:
_notify_download_completed(batch_id, task_id, success=False)
return
file_ext = os.path.splitext(file_path)[1]
clean_track_name = get_import_clean_title(
context,
album_info=album_info,
default=original_search.get('title', 'Unknown Track'),
)
track_number = album_info.get('track_number', 1)
logger.debug(
"Final track_number processing: source=%s album_info_track_number=%s track_number=%s",
album_info.get('source', 'unknown'),
album_info.get('track_number', 'NOT_FOUND'),
track_number,
)
if track_number is None:
track_number = extract_track_number_from_filename(file_path)
logger.info(
"Track number was None; extracted from filename=%r -> %s",
os.path.basename(file_path),
track_number,
)
if not isinstance(track_number, int) or track_number < 1:
logger.error(f"Invalid track number ({track_number}), defaulting to 1")
track_number = 1
logger.debug(f"FINAL track_number used for filename: {track_number}")
album_info['track_number'] = track_number
album_info['clean_track_name'] = clean_track_name
logger.info(f"[FIX] Updated album_info track_number to {track_number} for consistent metadata")
final_path, _ = build_final_path_for_track(context, artist_context, album_info, file_ext)
logger.info(f"Resolved path: '{final_path}'")
context['_final_processed_path'] = final_path
try:
logger.warning(f"[Metadata Input] artist: '{artist_context.get('name', 'MISSING')}' (id: {artist_context.get('id', 'MISSING')})")
if album_info:
logger.warning(
f"[Metadata Input] album: '{album_info.get('album_name', 'MISSING')}', "
f"track#: {album_info.get('track_number', 'MISSING')}, disc#: {album_info.get('disc_number', 'MISSING')}, "
f"source: {album_info.get('source', 'unknown')}"
)
else:
logger.info("[Metadata Input] album_info: None (single track)")
enhance_file_metadata(file_path, context, artist_context, album_info, runtime=metadata_runtime)
except Exception as meta_err:
import traceback
pp_logger.info(f"[inner] Metadata enhancement FAILED for {context_key}: {meta_err}\n{traceback.format_exc()}")
wipe_source_tags(file_path)
_enhance_source_info = context.get('track_info', {}).get('source_info') or {}
if isinstance(_enhance_source_info, str):
try:
_enhance_source_info = json.loads(_enhance_source_info)
except (json.JSONDecodeError, TypeError):
_enhance_source_info = {}
is_enhance_download = _enhance_source_info.get('enhance', False)
logger.info(f"Moving '{os.path.basename(file_path)}' to '{final_path}'")
if os.path.exists(final_path):
if not os.path.exists(file_path):
logger.info(f"[Protection] Destination exists and source already gone - file already transferred: {os.path.basename(final_path)}")
return
try:
from mutagen import File as MutagenFile
existing_file = MutagenFile(final_path)
has_metadata = existing_file is not None and len(existing_file.tags or {}) > 2
if has_metadata and not is_enhance_download:
_replace_lower = config_manager.get('import.replace_lower_quality', False)
if _replace_lower:
_existing_tier = get_quality_tier_from_extension(final_path)
_incoming_tier = get_quality_tier_from_extension(file_path)
if _incoming_tier[1] < _existing_tier[1]:
logger.info(f"[Quality Replace] Replacing {_existing_tier[0]} with {_incoming_tier[0]}: {os.path.basename(final_path)}")
try:
os.remove(final_path)
except Exception as e:
logger.error(f"[Quality Replace] Could not remove existing file: {e}")
else:
logger.info(
f"[Protection] Existing file is same or better quality ({_existing_tier[0]} vs {_incoming_tier[0]}) - skipping: "
f"{os.path.basename(final_path)}"
)
try:
os.remove(file_path)
except FileNotFoundError:
pass
except Exception as e:
logger.error(f"[Protection] Error removing redundant file: {e}")
return
else:
logger.info(f"[Protection] Existing file already has metadata enhancement - skipping overwrite: {os.path.basename(final_path)}")
logger.info(f"[Protection] Removing redundant download file: {os.path.basename(file_path)}")
try:
os.remove(file_path)
except FileNotFoundError:
logger.error(f"[Protection] Could not remove redundant file (already gone): {file_path}")
except Exception as e:
logger.error(f"[Protection] Error removing redundant file: {e}")
return
elif is_enhance_download:
logger.info(f"[Enhance] Quality enhance mode — replacing existing file: {os.path.basename(final_path)}")
try:
os.remove(final_path)
except Exception as e:
logger.error(f"[Enhance] Could not remove existing file for replacement: {e}")
else:
logger.info(f"[Protection] Existing file lacks metadata - safe to overwrite: {os.path.basename(final_path)}")
try:
os.remove(final_path)
except FileNotFoundError:
pass
except Exception as check_error:
logger.error(f"[Protection] Error checking existing file metadata, proceeding with overwrite: {check_error}")
try:
if os.path.exists(final_path):
os.remove(final_path)
except Exception as e:
logger.error(f"[Protection] Failed to remove existing file for overwrite: {e}")
if not os.path.exists(file_path):
if os.path.exists(final_path):
logger.info(f"[Pre-Move] Source already gone and destination exists - another thread completed transfer: {os.path.basename(final_path)}")
download_cover_art(album_info, os.path.dirname(final_path), context)
generate_lrc_file(final_path, context, artist_context, album_info)
return
expected_dir = os.path.dirname(final_path)
expected_stem = os.path.splitext(os.path.basename(final_path))[0]
expected_ext = os.path.splitext(final_path)[1]
found_variant = None
check_exts = {expected_ext}
if expected_ext == '.flac' and config_manager.get('lossy_copy.enabled', False) and config_manager.get('lossy_copy.delete_original', False):
_lossy_ext_map = {'mp3': '.mp3', 'opus': '.opus', 'aac': '.m4a'}
_lossy_codec = config_manager.get('lossy_copy.codec', 'mp3')
check_exts.add(_lossy_ext_map.get(_lossy_codec, '.mp3'))
if os.path.exists(expected_dir):
for f in os.listdir(expected_dir):
f_ext = os.path.splitext(f)[1].lower()
if f_ext in check_exts and os.path.splitext(f)[0].startswith(expected_stem):
found_variant = os.path.join(expected_dir, f)
break
if found_variant:
logger.debug(f"[Pre-Move] Source gone but found variant in destination (stream processor handled it): {os.path.basename(found_variant)}")
context['_final_processed_path'] = found_variant
download_cover_art(album_info, expected_dir, context)
generate_lrc_file(found_variant, context, artist_context, album_info)
return
logger.warning(f"[Pre-Move] Source file gone and no matching file in destination: {os.path.basename(file_path)}")
raise FileNotFoundError(f"Source file vanished before move and destination does not exist: {file_path}")
safe_move_file(file_path, final_path)
cleanup_slskd_dedup_siblings(file_path)
if is_enhance_download and _enhance_source_info.get('original_file_path'):
original_enhance_path = _enhance_source_info['original_file_path']
if os.path.normpath(original_enhance_path) != os.path.normpath(final_path) and os.path.exists(original_enhance_path):
try:
os.remove(original_enhance_path)
old_fmt = os.path.splitext(original_enhance_path)[1]
new_fmt = os.path.splitext(final_path)[1]
logger.info(f"[Enhance] Upgraded {old_fmt}{new_fmt}: {os.path.basename(final_path)}")
except Exception as e:
logger.error(f"[Enhance] Could not remove old-format file: {e}")
elif is_enhance_download:
old_fmt = _enhance_source_info.get('original_format', 'unknown')
new_fmt = os.path.splitext(final_path)[1]
logger.info(f"[Enhance] Replaced in-place ({old_fmt}{new_fmt}): {os.path.basename(final_path)}")
download_cover_art(album_info, os.path.dirname(final_path), context)
generate_lrc_file(final_path, context, artist_context, album_info)
if config_manager.get('post_processing.replaygain_enabled', False):
try:
from core.replaygain import analyze_track as _rg_analyze, write_replaygain_tags as _rg_write, is_ffmpeg_available as _rg_ffmpeg_ok, RG_REFERENCE_LUFS as _RG_REF
if _rg_ffmpeg_ok():
lufs, peak_dbfs = _rg_analyze(final_path)
gain_db = _RG_REF - lufs
_rg_write(final_path, gain_db, peak_dbfs)
pp_logger.info(f"ReplayGain: {gain_db:+.2f} dB, peak {peak_dbfs:.2f} dBFS — {os.path.basename(final_path)}")
except Exception as rg_err:
pp_logger.debug(f"ReplayGain analysis skipped: {rg_err}")
downsampled_path = downsample_hires_flac(final_path, context)
if downsampled_path:
final_path = downsampled_path
context['_final_processed_path'] = final_path
blasphemy_path = create_lossy_copy(final_path)
if blasphemy_path:
context['_final_processed_path'] = blasphemy_path
downloads_path = docker_resolve_path(config_manager.get('soulseek.download_path', './downloads'))
cleanup_empty_directories(downloads_path, file_path)
logger.info(f"Post-processing complete for: {context.get('_final_processed_path', final_path)}")
emit_track_downloaded(context, automation_engine)
record_library_history_download(context)
record_download_provenance(context)
record_soulsync_library_entry(context, artist_context, album_info)
try:
if not playlist_folder_mode:
completed_path = context.get('_final_processed_path', final_path)
record_retag_download(context, artist_context, album_info, completed_path)
except Exception as retag_err:
logger.error(f"[Post-Process] Retag data capture failed (non-fatal): {retag_err}")
try:
completed_path = context.get('_final_processed_path', final_path)
batch_id_for_repair = context.get('batch_id')
if completed_path and batch_id_for_repair and repair_worker:
album_folder = os.path.dirname(str(completed_path))
if album_folder:
repair_worker.register_folder(batch_id_for_repair, album_folder)
except Exception as repair_err:
logger.error(f"[Post-Process] Repair folder registration failed: {repair_err}")
try:
completed_path = context.get('_final_processed_path', final_path)
batch_id_for_consistency = context.get('batch_id')
if completed_path and batch_id_for_consistency and album_info and album_info.get('is_album'):
_file_info = {
'path': str(completed_path),
'track_number': album_info.get('track_number', 1),
'disc_number': album_info.get('disc_number', 1),
'title': get_import_clean_title(
context,
album_info=album_info,
default=album_info.get('clean_track_name', ''),
),
}
with tasks_lock:
if batch_id_for_consistency in download_batches:
download_batches[batch_id_for_consistency].setdefault('_consistency_files', []).append(_file_info)
except Exception as cons_err:
logger.error(f"[Post-Process] Album consistency registration failed: {cons_err}")
try:
check_and_remove_from_wishlist(context)
except Exception as wishlist_error:
logger.error(f"[Post-Process] Error checking wishlist removal: {wishlist_error}")
task_id = context.get('task_id')
batch_id = context.get('batch_id')
if task_id and batch_id:
logger.info(f"[Post-Process] Calling completion callback for task {task_id} in batch {batch_id}")
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['stream_processed'] = True
download_tasks[task_id]['status'] = 'completed'
logger.info(f"[Post-Process] Marked task {task_id} as completed")
_notify_download_completed(batch_id, task_id, success=True)
except Exception as e:
import traceback
pp_logger.info(f"[inner] EXCEPTION in post-processing for {context_key}: {e}")
pp_logger.info(traceback.format_exc())
logger.error(f"\nCRITICAL ERROR in post-processing for {context_key}: {e}")
traceback.print_exc()
source_exists = os.path.exists(file_path) if file_path else False
if source_exists:
if context_key in processed_download_ids:
processed_download_ids.remove(context_key)
logger.warning(f"Removed {context_key} from processed set - will retry on next check")
with matched_context_lock:
if context_key not in matched_downloads_context:
matched_downloads_context[context_key] = context
logger.warning(f"Re-added {context_key} to context for retry")
else:
logger.warning(f"Source file gone, not retrying: {context_key}")
finally:
file_lock.release()
with post_process_locks_lock:
post_process_locks.pop(context_key, None)
def post_process_matched_download_with_verification(context_key, context, file_path, task_id, batch_id, runtime, metadata_runtime=None):
on_download_completed = getattr(runtime, "on_download_completed", None)
def _notify_download_completed(batch_id, task_id, success=True):
if on_download_completed:
on_download_completed(batch_id, task_id, success=success)
logger = pp_logger
try:
original_task_id = context.pop('task_id', None)
original_batch_id = context.pop('batch_id', None)
post_process_matched_download(context_key, context, file_path, runtime, metadata_runtime=metadata_runtime)
if original_task_id:
context['task_id'] = original_task_id
if original_batch_id:
context['batch_id'] = original_batch_id
if context.get('_race_guard_failed'):
logger.info(f"Race guard: source file gone for task {task_id} — marking as failed")
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'failed'
download_tasks[task_id]['error_message'] = 'Source file was already processed or removed by another task'
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
_notify_download_completed(batch_id, task_id, success=False)
return
if context.get('_acoustid_quarantined'):
failure_msg = context.get('_acoustid_failure_msg', 'AcoustID verification failed')
logger.info(f"File was quarantined by AcoustID verification (task={task_id}): {failure_msg}")
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'failed'
download_tasks[task_id]['error_message'] = f"AcoustID verification failed: {failure_msg}"
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
_notify_download_completed(batch_id, task_id, success=False)
return
if context.get('_simple_download_completed'):
expected_final_path = context.get('_final_path')
if expected_final_path and os.path.exists(expected_final_path):
with tasks_lock:
if task_id in download_tasks:
_mark_task_completed(task_id, context.get('track_info'))
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
_notify_download_completed(batch_id, task_id, success=True)
return
logger.info(
f"FAILED simple download file not found at: {expected_final_path} "
f"(task={task_id}, context={context_key})"
)
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'failed'
download_tasks[task_id]['error_message'] = (
f"Downloaded file not found at expected location: {os.path.basename(expected_final_path)}"
)
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
_notify_download_completed(batch_id, task_id, success=False)
return
# Integrity rejection — the inner pipeline quarantined the file
# because audio integrity (size / parse / duration) failed. Wrapper
# was previously falling through to "assuming success" because
# quarantined files have no _final_processed_path, which left the
# task showing ✅ Completed in the UI even though the file is in
# quarantine. Reported by user when downloading Mr. Morale: 3
# tracks (Rich Interlude, Savior Interlude, Savior) showed
# Completed in the modal but were missing on disk because their
# source files failed integrity and were quarantined.
if context.get('_integrity_failure_msg'):
failure_msg = context.get('_integrity_failure_msg', 'unknown')
logger.error(
f"Task {task_id} failed integrity check — marking failed: {failure_msg}"
)
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'failed'
download_tasks[task_id]['error_message'] = (
f"File integrity check failed: {failure_msg}"
)
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
_notify_download_completed(batch_id, task_id, success=False)
return
# Race guard failure — inner code set this when the source file
# disappeared and there was no known destination to fall back on
# (vs the legitimate race-guard skip where a sibling thread
# already moved the file to its destination).
if context.get('_race_guard_failed'):
logger.error(f"Task {task_id} failed race guard — source file gone with no known destination")
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'failed'
download_tasks[task_id]['error_message'] = (
"Source file disappeared before post-processing could complete"
)
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
_notify_download_completed(batch_id, task_id, success=False)
return
expected_final_path = context.get('_final_processed_path')
if not expected_final_path:
logger.info(f"No _final_processed_path in context for task {task_id} — cannot verify, assuming success")
with tasks_lock:
if task_id in download_tasks:
_mark_task_completed(task_id, context.get('track_info'))
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
_notify_download_completed(batch_id, task_id, success=True)
return
if os.path.exists(expected_final_path):
redownload_ctx = None
with tasks_lock:
if task_id in download_tasks:
_mark_task_completed(task_id, context.get('track_info'))
download_tasks[task_id]['metadata_enhanced'] = True
redownload_ctx = download_tasks[task_id].get('_redownload_context')
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
if redownload_ctx:
try:
old_path = redownload_ctx.get('old_file_path')
lib_track_id = redownload_ctx.get('library_track_id')
if redownload_ctx.get('delete_old_file') and old_path and os.path.exists(old_path):
if os.path.normpath(old_path) != os.path.normpath(expected_final_path):
os.remove(old_path)
logger.info(f"[Redownload] Deleted old file: {old_path}")
if lib_track_id and expected_final_path:
_rd_db = get_database()
_rd_conn = _rd_db._get_connection()
_rd_cursor = _rd_conn.cursor()
_rd_cursor.execute(
"""
UPDATE tracks SET file_path = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(expected_final_path, lib_track_id),
)
_rd_conn.commit()
_rd_conn.close()
logger.info(f"[Redownload] Updated DB path for track {lib_track_id}")
except Exception as e:
logger.error(f"[Redownload] Post-processing hook error: {e}")
_notify_download_completed(batch_id, task_id, success=True)
else:
track_name = get_import_clean_title(context, default=context_key)
logger.info(f"FAILED verification for '{track_name}' (task={task_id})")
logger.info(f" expected_final_path: {expected_final_path}")
logger.info(f" file_path (source): {file_path}, exists={os.path.exists(file_path)}")
logger.info(
f" is_album={context.get('is_album_download', False)}, "
f"has_clean_data={get_import_has_clean_metadata(context)}"
)
expected_dir = os.path.dirname(expected_final_path)
if os.path.exists(expected_dir):
dir_contents = os.listdir(expected_dir)
logger.info(f" directory contains {len(dir_contents)} files: {dir_contents[:20]}")
else:
logger.info(f" directory does not exist: {expected_dir}")
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'failed'
download_tasks[task_id]['error_message'] = (
f'File verification failed: expected file at {os.path.basename(expected_final_path)} but it was not found after processing'
)
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
_notify_download_completed(batch_id, task_id, success=False)
except Exception as e:
import traceback
logger.info(f"EXCEPTION in post-processing for '{context_key}' (task={task_id}): {e}")
logger.info(traceback.format_exc())
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'failed'
download_tasks[task_id]['error_message'] = f"Post-processing verification failed: {str(e)}"
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
_notify_download_completed(batch_id, task_id, success=False)