You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
SoulSync/core/downloads/post_processing.py

626 lines
35 KiB

"""Post-processing worker for completed downloads.
The verification workflow that runs AFTER a slskd transfer reports as
'Succeeded' but BEFORE the task is marked completed in the UI. Locates
the file on disk (with retries + multiple search strategies), routes
it through metadata enhancement and the import pipeline, and finally
calls the batch lifecycle completion callback.
Lifted verbatim from web_server.py's `_run_post_processing_worker`.
The single function is intentionally kept as one ~400-line block to
preserve byte-for-byte parity with the original — refactoring into
smaller helpers gets its own follow-up PR.
Dependencies are passed in via `PostProcessDeps` since the function
needs ~9 callbacks/refs and direct injection beats hidden imports.
"""
from __future__ import annotations
import logging
import os
import shutil
import time
import traceback
from dataclasses import dataclass
from difflib import SequenceMatcher
from pathlib import Path
from typing import Any, Callable, Optional
from core.imports.album_naming import resolve_album_group as _resolve_album_group
from core.imports.context import (
get_import_clean_album,
get_import_clean_title,
get_import_context_album,
get_import_context_artist,
get_import_original_search,
normalize_import_context,
)
from core.imports.filename import extract_track_number_from_filename, parse_filename_metadata
from core.metadata import enrichment as metadata_enrichment
from core.runtime_state import (
download_tasks,
matched_context_lock,
matched_downloads_context,
tasks_lock,
)
logger = logging.getLogger(__name__)
_AUDIO_EXTENSIONS = {
'.flac', '.ape', '.wav', '.alac', '.dsf', '.dff', '.aiff', '.aif',
'.opus', '.ogg', '.m4a', '.aac', '.mp3', '.wma',
}
def _is_audio_file(path: str) -> bool:
return Path(str(path or '')).suffix.lower() in _AUDIO_EXTENSIONS
def _reject_non_audio_found_file(found_file: Optional[str], file_location: Optional[str]) -> tuple[Optional[str], Optional[str]]:
if found_file and not _is_audio_file(found_file):
logger.warning(
"[Post-Processing] Ignoring non-audio candidate found during file search: %s",
found_file,
)
return None, None
return found_file, file_location
def _normalize_match_text(value: str) -> str:
return ''.join(ch.lower() for ch in str(value or '') if ch.isalnum())
def _release_audio_match_score(path: str, expected_title: str, expected_artist: str) -> float:
parsed = parse_filename_metadata(path)
parsed_title = parsed.get('title') or Path(path).stem
parsed_artist = parsed.get('artist') or ''
expected_title_norm = _normalize_match_text(expected_title)
parsed_title_norm = _normalize_match_text(parsed_title)
if expected_title_norm and (
expected_title_norm in parsed_title_norm or parsed_title_norm in expected_title_norm
):
title_score = 1.0
else:
title_score = SequenceMatcher(
None,
expected_title_norm,
parsed_title_norm,
).ratio()
if expected_artist and parsed_artist:
artist_score = SequenceMatcher(
None,
_normalize_match_text(expected_artist),
_normalize_match_text(parsed_artist),
).ratio()
return (title_score * 0.75) + (artist_score * 0.25)
return title_score
def _track_title_from_task(track_info: Any, context: Optional[dict]) -> str:
if isinstance(track_info, dict):
title = track_info.get('name') or track_info.get('title')
if title:
return str(title)
return get_import_clean_title(context or {}, default='')
def _copy_release_audio_to_transfer(source_path: str, transfer_dir: str) -> Optional[str]:
try:
src = Path(source_path)
if not src.exists() or not src.is_file():
return None
dest_dir = Path(transfer_dir)
dest_dir.mkdir(parents=True, exist_ok=True)
dest = dest_dir / src.name
if dest.exists():
stem, suffix = src.stem, src.suffix
counter = 1
while dest.exists():
dest = dest_dir / f"{stem}_release_{counter}{suffix}"
counter += 1
shutil.copy2(src, dest)
return str(dest)
except Exception as exc:
logger.warning("[Post-Processing] Could not copy release audio to transfer: %s", exc)
return None
@dataclass
class PostProcessDeps:
"""Bundle of dependencies the post-processing worker needs.
Constructed per-call by the route layer so client / config refs are
always live (no caching of pre-init Spotify clients etc).
"""
config_manager: Any
download_orchestrator: Any
run_async: Callable
docker_resolve_path: Callable[[str], str]
extract_filename: Callable[[str], str]
make_context_key: Callable[[str, str], str]
find_completed_file: Callable
enhance_file_metadata: Callable
wipe_source_tags: Callable[[str], bool]
post_process_with_verification: Callable
mark_task_completed: Callable[[str, Optional[dict]], None]
on_download_completed: Callable[[str, str, bool], None]
def run_post_processing_worker(task_id: str, batch_id: str, deps: PostProcessDeps) -> None:
"""NEW VERIFICATION WORKFLOW: Post-processing worker that only sets 'completed'
status after successful file verification and processing. This matches sync.py's
reliability.
"""
try:
logger.info(f"[Post-Processing] Starting verification for task {task_id}")
# Retrieve task details from global state
with tasks_lock:
if task_id not in download_tasks:
logger.warning(f"[Post-Processing] Task {task_id} not found in download_tasks")
return
task = download_tasks[task_id].copy()
# Check if task was cancelled or already completed during post-processing
if task['status'] == 'cancelled':
logger.warning(f"[Post-Processing] Task {task_id} was cancelled, skipping verification")
return
if task['status'] == 'completed' or task.get('stream_processed'):
logger.info(f"[Post-Processing] Task {task_id} already completed by stream processor, skipping verification")
return
# Extract file information for verification
track_info = task.get('track_info', {})
task_filename = task.get('filename') or track_info.get('filename')
task_username = task.get('username') or track_info.get('username')
if not task_filename or not task_username:
logger.warning(f"[Post-Processing] Missing filename or username for task {task_id}")
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'failed'
download_tasks[task_id]['error_message'] = 'Post-processing failed: missing file or source information from Soulseek transfer'
deps.on_download_completed(batch_id, task_id, False)
return
download_dir = deps.docker_resolve_path(deps.config_manager.get('soulseek.download_path', './downloads'))
transfer_dir = deps.docker_resolve_path(deps.config_manager.get('soulseek.transfer_path', './Transfer'))
# Try to get context for generating the correct final filename
task_basename = deps.extract_filename(task_filename)
context_key = deps.make_context_key(task_username, task_filename)
expected_final_filename = None
logger.info(f"[Post-Processing] Looking up context with key: {context_key}")
with matched_context_lock:
context = matched_downloads_context.get(context_key)
# Debug: Show all available context keys
available_keys = list(matched_downloads_context.keys())
logger.info(f"[Post-Processing] Available context keys: {available_keys[:10]}...") # Show first 10 keys
if context:
logger.info(f"[Post-Processing] Found context for key: {context_key}")
try:
original_search = context.get("original_search_result", {})
logger.info(f"[Post-Processing] original_search keys: {list(original_search.keys())}")
clean_title = get_import_clean_title(context, default=original_search.get('title', ''))
track_number = original_search.get('track_number')
logger.info(f"[Post-Processing] clean_title: '{clean_title}', track_number: {track_number}")
if clean_title and track_number:
# Generate expected final filename that stream processor would create
# Pattern: f"{track_number:02d} - {clean_title}.flac"
sanitized_title = clean_title.replace('/', '_').replace('\\', '_').replace(':', '_').replace('*', '_').replace('?', '_').replace('"', '_').replace('<', '_').replace('>', '_').replace('|', '_')
expected_final_filename = f"{track_number:02d} - {sanitized_title}.flac"
logger.info(f"[Post-Processing] Generated expected final filename: {expected_final_filename}")
else:
logger.warning(f"[Post-Processing] Missing required data - clean_title: {bool(clean_title)}, track_number: {bool(track_number)}")
except Exception as e:
logger.error(f"[Post-Processing] Error generating expected filename: {e}")
traceback.print_exc()
else:
logger.warning(f"[Post-Processing] No context found for key: {context_key}")
# Try fuzzy matching with similar keys containing the filename
# SAFETY: Constrain to same Soulseek username to prevent cross-album
# metadata contamination during mass downloads (e.g., two albums both
# having "01 - Intro.flac" would match the wrong context without this)
with matched_context_lock:
similar_keys = [k for k in matched_downloads_context.keys()
if k.startswith(f"{task_username}::") and task_basename in k]
if similar_keys:
# Use the first similar key found
fuzzy_key = similar_keys[0]
context = matched_downloads_context.get(fuzzy_key)
logger.info(f"[Post-Processing] Found context using fuzzy key matching: {fuzzy_key}")
# Generate expected final filename using the found context
try:
original_search = context.get("original_search_result", {})
logger.info(f"[Post-Processing] fuzzy context original_search keys: {list(original_search.keys())}")
clean_title = get_import_clean_title(context, default=original_search.get('title', ''))
track_number = original_search.get('track_number')
logger.info(f"[Post-Processing] fuzzy context clean_title: '{clean_title}', track_number: {track_number}")
if clean_title and track_number:
# Generate expected final filename that stream processor would create
# Pattern: f"{track_number:02d} - {clean_title}.flac"
sanitized_title = clean_title.replace('/', '_').replace('\\', '_').replace(':', '_').replace('*', '_').replace('?', '_').replace('"', '_').replace('<', '_').replace('>', '_').replace('|', '_')
expected_final_filename = f"{track_number:02d} - {sanitized_title}.flac"
logger.info(f"[Post-Processing] Generated expected final filename from fuzzy match: {expected_final_filename}")
else:
logger.warning(f"[Post-Processing] Missing required data from fuzzy match - clean_title: {bool(clean_title)}, track_number: {bool(track_number)}")
except Exception as e:
logger.error(f"[Post-Processing] Error generating expected filename from fuzzy match: {e}")
traceback.print_exc()
else:
logger.warning(f"[Post-Processing] No similar keys found containing '{task_basename}'")
# Show a sample of what keys actually exist for debugging
sample_keys = list(matched_downloads_context.keys())[:5]
logger.info(f"[Post-Processing] Sample of existing keys: {sample_keys}")
# RESILIENT FILE-FINDING LOOP: Try up to 3 times with delays
found_file = None
file_location = None
# CRITICAL FIX: For YouTube downloads, the filename in task is 'id||title' (metadata),
# but the actual file on disk is 'Title.mp3'. We must ask the client for the real path.
# Torrent/usenet also use opaque "url||display" filenames. Their completed
# job may contain a full release, so choose the best matching audio file
# and copy it to transfer before importing instead of moving the client's
# original completed download.
if (task.get('username') == 'youtube' or '||' in str(task_filename)) and not found_file:
logger.info(f"[Post-Processing] Detected engine-backed download task: {task_id}")
try:
# Query the download orchestrator for the status which contains the real file path
# CRITICAL FIX: Use the actual download_id designated by the client, not the internal task_id
actual_download_id = task.get('download_id') or task_id
status = deps.run_async(deps.download_orchestrator.get_download_status(actual_download_id))
if status and task.get('username') in ('torrent', 'usenet'):
audio_files = list(getattr(status, 'audio_files', None) or [])
if not audio_files and getattr(status, 'file_path', None):
audio_files = [status.file_path]
expected_title = _track_title_from_task(track_info, context)
expected_artist = ''
if isinstance(track_info, dict):
artists = track_info.get('artists', [])
if isinstance(artists, list) and artists:
first_artist = artists[0]
expected_artist = first_artist.get('name', '') if isinstance(first_artist, dict) else str(first_artist)
elif isinstance(artists, str):
expected_artist = artists
if not expected_artist and context:
artist_ctx = get_import_context_artist(context)
expected_artist = artist_ctx.get('name', '') if isinstance(artist_ctx, dict) else ''
scored_files = [
(_release_audio_match_score(path, expected_title, expected_artist), path)
for path in audio_files
if _is_audio_file(path)
]
scored_files.sort(reverse=True)
if scored_files:
best_score, best_path = scored_files[0]
logger.info(
"[Post-Processing] Best %s release file for '%s': %s (score %.2f)",
task.get('username'), expected_title, best_path, best_score,
)
if best_score >= 0.80:
copied_path = _copy_release_audio_to_transfer(best_path, transfer_dir)
if copied_path:
found_file = copied_path
file_location = 'download'
logger.info(
"[Post-Processing] Copied matched %s release file to transfer: %s",
task.get('username'), copied_path,
)
else:
logger.warning(
"[Post-Processing] No %s release file met match threshold for '%s' (best %.2f)",
task.get('username'), expected_title, best_score,
)
if not found_file:
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'failed'
download_tasks[task_id]['error_message'] = (
f"No matching audio file found in {task.get('username')} release"
)
deps.on_download_completed(batch_id, task_id, False)
return
if status and status.file_path and not found_file and task.get('username') not in ('torrent', 'usenet'):
real_path = status.file_path
if os.path.exists(real_path):
# Determine if it's in download or transfer directory
real_path_obj = Path(real_path)
download_dir_obj = Path(download_dir)
transfer_dir_obj = Path(transfer_dir)
# Use absolute path comparison
try:
if download_dir_obj.resolve() in real_path_obj.resolve().parents:
file_location = 'download'
elif transfer_dir_obj.resolve() in real_path_obj.resolve().parents:
file_location = 'transfer'
else:
file_location = 'absolute'
except: # noqa: E722 -- byte-faithful to original (catches even KeyboardInterrupt)
# Fallback if resolve fails (e.g. permission or path issues)
file_location = 'absolute'
if file_location:
# We found the file! Use the absolute path if it confuses the joining logic,
# but usually we want just the filename if location is 'download'/'transfer'
# CRITICAL FIX: Always use the absolute real_path.
# Stripping to basename causes FileNotFoundError because post-processing
# runs with CWD as project root, not download dir.
found_file = real_path
logger.info(f"[Post-Processing] Resolved actual YouTube filename: {found_file} (Location: {file_location})")
else:
logger.warning(f"[Post-Processing] Engine status reported path but file missing: {real_path}")
else:
if not found_file:
logger.warning(f"[Post-Processing] Engine status returned no file_path for task {task_id}")
except Exception as e:
logger.error(f"[Post-Processing] Failed to retrieve engine task status: {e}")
_file_search_max_retries = 5
for retry_count in range(_file_search_max_retries):
# If we already resolved the file (e.g. via YouTube status), skip searching
if found_file:
logger.info(f"[Post-Processing] Skipping search loop, file already resolved: {found_file}")
break
# Check if stream processor already completed this task while we were waiting
with tasks_lock:
if task_id in download_tasks:
if download_tasks[task_id].get('stream_processed') or download_tasks[task_id]['status'] == 'completed':
logger.info(f"[Post-Processing] Task {task_id} was completed by stream processor during file search - done")
return
logger.warning(f"[Post-Processing] Attempt {retry_count + 1}/{_file_search_max_retries} to find file")
logger.info(f"[Post-Processing] Original filename: {task_basename}")
if expected_final_filename:
logger.info(f"[Post-Processing] Expected final filename: {expected_final_filename}")
else:
logger.warning("[Post-Processing] No expected final filename available")
# Strategy 1: Try with original filename in both downloads and transfer
logger.info("[Post-Processing] Strategy 1: Searching with original filename...")
found_file, file_location = deps.find_completed_file(download_dir, task_filename, transfer_dir)
found_file, file_location = _reject_non_audio_found_file(found_file, file_location)
if found_file:
logger.info(f"[Post-Processing] Strategy 1 SUCCESS: Found file with original filename in {file_location}: {found_file}")
else:
logger.error("[Post-Processing] Strategy 1 FAILED: Original filename not found in either location")
# Strategy 2: If not found and we have an expected final filename, try that in transfer folder
if not found_file and expected_final_filename:
logger.info("[Post-Processing] Strategy 2: Searching transfer folder with expected final filename...")
found_result = deps.find_completed_file(transfer_dir, expected_final_filename)
if found_result and found_result[0]:
found_file, file_location = found_result[0], 'transfer'
found_file, file_location = _reject_non_audio_found_file(found_file, file_location)
if found_file:
logger.info(f"[Post-Processing] Strategy 2 SUCCESS: Found file with expected final filename: {found_file}")
else:
logger.error("[Post-Processing] Strategy 2 FAILED: Expected final filename resolved to a non-audio file")
else:
logger.error("[Post-Processing] Strategy 2 FAILED: Expected final filename not found in transfer folder")
elif not expected_final_filename:
logger.warning("[Post-Processing] Strategy 2 SKIPPED: No expected final filename available")
if found_file:
logger.warning(f"[Post-Processing] FILE FOUND after {retry_count + 1} attempts in {file_location}: {found_file}")
break
else:
logger.error(f"[Post-Processing] All search strategies failed on attempt {retry_count + 1}/{_file_search_max_retries}")
if retry_count < _file_search_max_retries - 1: # Don't sleep on final attempt
logger.info("[Post-Processing] Waiting 5 seconds before next attempt...")
time.sleep(5)
if not found_file:
# CRITICAL: Before marking as failed, check if stream processor already handled this
# The /api/downloads/status polling endpoint processes files independently and may have
# already moved/renamed/tagged the file successfully while we were searching
with tasks_lock:
if task_id in download_tasks:
if download_tasks[task_id].get('stream_processed') or download_tasks[task_id]['status'] == 'completed':
logger.error(f"[Post-Processing] Task {task_id} was completed by stream processor - not marking as failed")
return
download_tasks[task_id]['status'] = 'failed'
download_tasks[task_id]['error_message'] = f'File not found on disk after {_file_search_max_retries} search attempts. Expected: {os.path.basename(task_filename)}'
deps.on_download_completed(batch_id, task_id, False)
return
# Handle file found in transfer folder - already completed by stream processor
if file_location == 'transfer':
logger.info(f"[Post-Processing] File found in transfer folder - already completed by stream processor: {found_file}")
# Check if metadata enhancement was completed
metadata_enhanced = False
with tasks_lock:
if task_id in download_tasks:
metadata_enhanced = download_tasks[task_id].get('metadata_enhanced', False)
if not metadata_enhanced:
logger.warning("[Post-Processing] File in transfer folder missing metadata enhancement - completing now")
# Attempt to complete metadata enhancement using context
if context and expected_final_filename:
try:
context = normalize_import_context(context)
# Extract required data from context
original_search = get_import_original_search(context)
artist_context = get_import_context_artist(context)
album_context = get_import_context_album(context)
if artist_context and album_context:
# CRITICAL FIX: Create album_info dict with proper structure for metadata enhancement
# This must match the format used in main stream processor to ensure consistency
# Extract track number from context (should be available from fuzzy match)
track_number = original_search.get('track_number', 1)
# If no track number in context, extract from filename
if track_number == 1 and found_file:
track_number = extract_track_number_from_filename(found_file)
logger.warning(
"[Verification] missing track_number; extracted from filename=%r -> %s",
os.path.basename(found_file),
track_number,
)
# Ensure track_number is valid
if not isinstance(track_number, int) or track_number < 1:
logger.error(f"[Verification] Invalid track number ({track_number}), defaulting to 1")
track_number = 1
# Get clean track name
clean_track_name = get_import_clean_title(context, default=original_search.get('title', 'Unknown Track'))
album_name = get_import_clean_album(context, default=album_context.get('name', 'Unknown Album'))
album_image_url = album_context.get('image_url')
if not album_image_url and album_context.get('images'):
album_images = album_context.get('images', [])
if album_images and isinstance(album_images[0], dict):
album_image_url = album_images[0].get('url')
album_info = {
'is_album': True, # CRITICAL: Mark as album track
'album_name': album_name,
'track_number': track_number, # CORRECTED TRACK NUMBER
'disc_number': original_search.get('disc_number', 1),
'clean_track_name': clean_track_name,
'album_image_url': album_image_url,
'confidence': 0.9,
'source': 'verification_worker_corrected',
}
# Apply album grouping for consistency with stream processor path.
# Only for singles/auto-detected — explicit album downloads already
# have the correct Spotify name and re-grouping would mangle it.
if not context.get("is_album_download", False):
try:
raw_album_ctx = original_search.get('album')
if isinstance(raw_album_ctx, str):
original_album_ctx = raw_album_ctx
elif isinstance(raw_album_ctx, dict) and 'name' in raw_album_ctx:
original_album_ctx = raw_album_ctx['name']
else:
original_album_ctx = None
consistent_album_name = _resolve_album_group(artist_context, album_info, original_album_ctx)
album_info['album_name'] = consistent_album_name
except Exception as group_err:
logger.error(f"[Verification] Album grouping failed, using raw name: {group_err}")
else:
logger.info(f"[Verification] Explicit album download - preserving album name: '{album_info['album_name']}'")
logger.info(f"[Verification] Created proper album_info - track_number: {track_number}, album: {album_info['album_name']}")
logger.info(f"[Post-Processing] Attempting metadata enhancement for: {found_file}")
logger.warning(f"[Metadata Input] Verification worker - artist: '{artist_context.get('name', 'MISSING')}' (id: {artist_context.get('id', 'MISSING')})")
logger.warning(f"[Metadata Input] Verification worker - album: '{album_info.get('album_name', 'MISSING')}', track#: {album_info.get('track_number', 'MISSING')}, source: {album_info.get('source', 'unknown')}")
enhancement_success = deps.enhance_file_metadata(found_file, context, artist_context, album_info)
if enhancement_success:
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['metadata_enhanced'] = True
logger.info(f"[Post-Processing] Successfully completed metadata enhancement for: {os.path.basename(found_file)}")
else:
logger.info(f"[Post-Processing] Metadata enhancement returned False for: {os.path.basename(found_file)}")
else:
logger.warning("[Post-Processing] Missing artist or album in context")
logger.info(f"[Post-Processing] artist_context: {artist_context is not None}, album_context: {album_context is not None}")
# Wipe source tags even without full enhancement — prevents
# Soulseek uploader's MusicBrainz IDs from causing album splits
if found_file and os.path.exists(found_file):
deps.wipe_source_tags(found_file)
except Exception as enhancement_error:
logger.error(f"[Post-Processing] Error during metadata enhancement: {enhancement_error}\n{traceback.format_exc()}")
if found_file and os.path.exists(found_file):
deps.wipe_source_tags(found_file)
else:
logger.warning("[Post-Processing] Cannot complete metadata enhancement - missing context or expected filename")
if found_file and os.path.exists(found_file):
deps.wipe_source_tags(found_file)
else:
logger.info("[Post-Processing] File already has metadata enhancement completed")
with tasks_lock:
if task_id in download_tasks:
track_info = download_tasks[task_id].get('track_info')
deps.mark_task_completed(task_id, track_info)
# Clean up context now that both stream processor and verification worker are done
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
logger.info(f"[Verification] Cleaned up context after successful verification: {context_key}")
deps.on_download_completed(batch_id, task_id, True)
return
# File found in downloads folder - attempt post-processing
try:
# Rebuild the context key using the same function that stored it
context_key = deps.make_context_key(task_username, task_filename)
# Check if this download has matched context for post-processing
with matched_context_lock:
context = matched_downloads_context.get(context_key)
if context:
logger.info(f"[Post-Processing] Found matched context, running full post-processing for: {context_key}")
# Run the existing post-processing logic with verification
deps.post_process_with_verification(context_key, context, found_file, task_id, batch_id)
else:
# No matched context - just mark as completed since file exists
logger.warning(f"[Post-Processing] No matched context, marking as completed: {os.path.basename(found_file)}")
with tasks_lock:
if task_id in download_tasks:
track_info = download_tasks[task_id].get('track_info')
deps.mark_task_completed(task_id, track_info)
# Clean up context if it exists (might be leftover from stream processor)
with matched_context_lock:
if context_key in matched_downloads_context:
del matched_downloads_context[context_key]
logger.info(f"[Verification] Cleaned up leftover context: {context_key}")
# Call completion callback since there's no other post-processing to handle it
deps.on_download_completed(batch_id, task_id, True)
except Exception as processing_error:
logger.error(f"[Post-Processing] Processing failed for task {task_id}: {processing_error}")
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'failed'
download_tasks[task_id]['error_message'] = f"Post-processing failed: {str(processing_error)}"
deps.on_download_completed(batch_id, task_id, False)
except Exception as e:
logger.error(f"[Post-Processing] Critical error in post-processing worker for task {task_id}: {e}")
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'failed'
download_tasks[task_id]['error_message'] = f"Critical post-processing error: {str(e)}"
deps.on_download_completed(batch_id, task_id, False)
# Re-export the metadata helper so callers can wrap it in a callback without
# importing from core.metadata directly.
__all__ = [
'PostProcessDeps',
'run_post_processing_worker',
'metadata_enrichment',
]