Lift _try_staging_match to core/downloads/staging.py

Pulls the 201-line staging-folder shortcut out of `web_server.py` into
its own module under the existing `core/downloads/` package. Pure 1:1
lift — wrapper keeps the original entry-point name so the task worker's
existing call site continues to work without changes.

What `try_staging_match` does:

1. Pull the per-batch staging-file cache (one filesystem scan per batch).
2. For each staging entry, compute title + artist similarity using
   SequenceMatcher and the matching engine's `normalize_string`. Require
   title >= 0.80, then a combined score >= 0.75. The weighting flips
   based on whether artist info is available on both sides:
   - both have artist: 0.55*title + 0.45*artist
   - either side missing artist: 0.80*title + 0.20*artist (lean on title)
3. Copy the matched file to the configured transfer dir (with a
   "_staging" suffix when the destination filename already exists, to
   avoid overwriting a legitimate prior download).
4. Mark the task as 'post_processing', username='staging',
   staging_match=True.
5. Build a synthetic spotify_artist / spotify_album context (mirroring
   the modal-worker logic so the file-organization template applies
   cleanly) and store it under "staging_<task_id>". Two paths:
   - Explicit context branch (track_info has _is_explicit_album_download)
     → real album/artist data copied through.
   - Fallback branch → synthesized from track + track_info, with
     `is_album_download` heuristically derived (album differs from title
     and isn't "Unknown Album").
6. Hand off to `_post_process_matched_download_with_verification` which
   does tagging, path building, AcoustID verification, and DB insertion.

Returns True if the staging shortcut won; False to fall through to the
normal Soulseek search path.

Dependencies injected via `StagingDeps` (5 fields) — config_manager,
matching_engine, get_staging_file_cache, docker_resolve_path,
post_process_matched_download_with_verification.

After normalizing `deps.X` back to the global `X`, a diff against the
original shows **zero differences**: 201 original lines = 201 lifted
lines, with a byte-identical body (including all whitespace, comments,
log strings, and the inline `from difflib import SequenceMatcher` /
`import shutil` imports inside the function body).

Tests: 9 new under tests/downloads/test_downloads_staging.py covering
no staging files / no track title / low-confidence match returning
False, exact match copying file + transitioning task state + invoking
post-processing, existing-file rename via `_staging` suffix, explicit
album context branch, fallback context synthesis (with both album-as-
album and album-equals-title cases), and copy failure (missing source
file) returning False.

Full suite: 1308 passing (was 1299). Ruff clean.
pull/418/head
Broque Thomas 4 weeks ago
parent 7d9ee39090
commit a2e068eaba

@ -0,0 +1,263 @@
"""Staging-folder match shortcut for downloads.
`try_staging_match(task_id, batch_id, track, deps)` is the per-track
shortcut the task worker calls before kicking off a Soulseek search.
If the user has dropped audio files matching the track into the
configured staging folder, we copy directly to the transfer dir and
hand off to post-processing, skipping the network round-trip entirely.
1. Pull the staging-file cache for the batch (one scan per batch).
2. Compute title + artist similarity (SequenceMatcher) against each
staging entry; require title >= 0.80 and combined score >= 0.75.
Score weighting flips based on whether artist info is available on
both sides:
- both have artist: 0.55*title + 0.45*artist
- either side missing artist: 0.80*title + 0.20*artist (lean on title)
3. Copy the matched file to the transfer dir (suffix "_staging" if a
file with that name already exists).
4. Mark the task as 'post_processing' with username='staging'.
5. Build a synthetic spotify_artist / spotify_album context (mirrors
the modal-worker's logic so the path template applies cleanly) and
store it in matched_downloads_context under "staging_<task_id>".
6. Hand off to `_post_process_matched_download_with_verification` which
does tagging, path building, AcoustID verification, and DB insertion.
Returns True if the staging shortcut won; False to fall through to the
normal Soulseek search path.
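Originally lifted verbatim from web_server.py. The wide dependency
surface (config manager, matching engine, staging cache, path resolver,
post-processing helper) is injected via `StagingDeps`; the shared
runtime state (task and matched-context stores plus their locks) is
imported directly from `core.runtime_state`.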
"""
from __future__ import annotations
import logging
import os
from dataclasses import dataclass
from typing import Any, Callable
# `shutil` and `SequenceMatcher` are imported inline inside try_staging_match()
# to keep the lift byte-identical with the original web_server.py function body.
from core.runtime_state import (
download_tasks,
matched_context_lock,
matched_downloads_context,
tasks_lock,
)
logger = logging.getLogger(__name__)
@dataclass
class StagingDeps:
    """Bundle of cross-cutting deps the staging-match helper needs.

    Every field is supplied by the caller so `try_staging_match` does not
    have to import them from the web server module.
    """
    # Configuration accessor; this module only calls
    # `.get('soulseek.transfer_path', './Transfer')` on it.
    config_manager: Any
    # Object exposing `normalize_string(value)`, applied to titles and
    # artists on both sides before they are compared.
    matching_engine: Any
    # Called with `batch_id or task_id`; returns the staging entries, each
    # indexed by 'title', 'artist' and 'full_path'.
    get_staging_file_cache: Callable[[str], list]
    # Maps the configured transfer path to the directory actually written to.
    docker_resolve_path: Callable[[str], str]
    # Invoked as (context_key, context, dest_path, task_id, batch_id) once
    # the staging file has been copied and its context stored.
    post_process_matched_download_with_verification: Callable
# Similarity thresholds a staging entry must clear to be accepted as a match.
_MIN_TITLE_SIMILARITY = 0.80
_MIN_COMBINED_SCORE = 0.75


def _pick_transfer_destination(transfer_dir, filename):
    """Return a path for *filename* inside *transfer_dir* that does not exist yet.

    The plain filename is preferred. On a collision "_staging" is appended to
    the stem, and if that name is taken as well a numeric counter is added
    ("_staging_2", "_staging_3", ...) so an earlier file is never overwritten.
    """
    dest_path = os.path.join(transfer_dir, filename)
    if not os.path.exists(dest_path):
        return dest_path

    base, ext = os.path.splitext(filename)
    candidate = os.path.join(transfer_dir, f"{base}_staging{ext}")
    counter = 2
    while os.path.exists(candidate):
        candidate = os.path.join(transfer_dir, f"{base}_staging_{counter}{ext}")
        counter += 1
    return candidate


def _build_staging_context(track, track_info, track_title, track_artist):
    """Build the post-processing context dict for a staging match.

    Two shapes are produced, selected by `_is_explicit_album_download` in
    *track_info*:

    - explicit: artist/album data is copied from `_explicit_artist_context`
      and `_explicit_album_context`, with 'staging' as the fallback id;
    - fallback: artist/album data is synthesized from the track and
      `track_info['album']`, and `is_album_download` is only set when the
      album name is non-blank, is not "Unknown Album" and differs from the
      track title.
    """
    if track_info.get('_is_explicit_album_download', False):
        explicit_artist = track_info.get('_explicit_artist_context', {})
        if isinstance(explicit_artist, str):
            explicit_artist = {'name': explicit_artist}
        elif not isinstance(explicit_artist, dict):
            explicit_artist = {}
        spotify_artist_ctx = {
            'id': explicit_artist.get('id', 'staging'),
            'name': explicit_artist.get('name', track_artist),
            'genres': explicit_artist.get('genres', []),
        }

        explicit_album = track_info.get('_explicit_album_context', {})
        if not isinstance(explicit_album, dict):
            explicit_album = {}
        # Prefer a flat image_url; otherwise fall back to the first entry of
        # an `images` list when that entry is a dict.
        album_image_url = explicit_album.get('image_url')
        if not album_image_url and explicit_album.get('images'):
            images = explicit_album['images']
            if isinstance(images, list) and images:
                first_image = images[0]
                album_image_url = first_image.get('url') if isinstance(first_image, dict) else None
        spotify_album_ctx = {
            'id': explicit_album.get('id', 'staging'),
            'name': explicit_album.get('name', getattr(track, 'album', '') or ''),
            'release_date': explicit_album.get('release_date', ''),
            'image_url': album_image_url,
            'album_type': explicit_album.get('album_type', 'album'),
            'total_tracks': explicit_album.get('total_tracks', 0),
            'total_discs': explicit_album.get('total_discs', 1),
            'artists': explicit_album.get('artists', [{'name': spotify_artist_ctx['name']}]),
        }
        is_album_ctx = True
        has_clean_data = True
    else:
        fallback_album = track_info.get('album', {})
        if isinstance(fallback_album, str):
            fallback_album = {'name': fallback_album}
        elif not isinstance(fallback_album, dict):
            fallback_album = {}
        track_album_name = getattr(track, 'album', '') or fallback_album.get('name', '') or ''
        spotify_artist_ctx = {
            'id': 'staging',
            'name': track_artist or 'Unknown',
            'genres': [],
        }
        spotify_album_ctx = {
            'id': 'staging',
            'name': track_album_name,
            'release_date': fallback_album.get('release_date', ''),
            'image_url': fallback_album.get('image_url'),
            'album_type': fallback_album.get('album_type', 'album'),
            'total_tracks': fallback_album.get('total_tracks', 0),
            'total_discs': fallback_album.get('total_discs', 1),
            'artists': [{'name': track_artist}] if track_artist else [],
        }
        # An album named like its own track is treated as a single release.
        is_album_ctx = bool(
            track_album_name
            and track_album_name.strip()
            and track_album_name.lower() != 'unknown album'
            and track_album_name.lower() != track_title.lower()
        )
        has_clean_data = bool(track_title and track_artist and track_album_name)

    track_number = (
        track_info.get('track_number', 0) or
        getattr(track, 'track_number', 0) or 0
    )
    disc_number = (
        track_info.get('disc_number', 1) or
        getattr(track, 'disc_number', 1) or 1
    )

    return {
        'track_info': track_info,
        'spotify_artist': spotify_artist_ctx,
        'spotify_album': spotify_album_ctx,
        'original_search_result': {
            'title': track_title,
            'artist': track_artist,
            'spotify_clean_title': track_title,
            'spotify_clean_album': spotify_album_ctx.get('name', ''),
            'spotify_clean_artist': track_artist,
            'track_number': track_number,
            'disc_number': disc_number,
        },
        'is_album_download': is_album_ctx,
        'has_clean_spotify_data': has_clean_data,
        'staging_source': True,
    }


def try_staging_match(task_id, batch_id, track, deps: "StagingDeps"):
    """Check if a matching file exists in the staging folder before downloading.

    The best-scoring staging entry (title similarity >= 0.80 and combined
    title/artist score >= 0.75) is copied - not moved - into the transfer
    folder, the task is switched to 'post_processing', a context is stored
    under "staging_<task_id>" and post-processing is invoked.

    Args:
        task_id: Key of the task in `download_tasks`.
        batch_id: Batch the task belongs to; used as the staging-cache key,
            falling back to `task_id` when falsy.
        track: Object exposing `name` and `artists` (first artist is used);
            `album`, `track_number` and `disc_number` are read when present.
        deps: Injected collaborators, see `StagingDeps`.

    Returns:
        True if a match was found, copied and handed to post-processing.
        False to fall through to the normal download path (no staging files,
        no track title, no confident match, or any error while copying or
        post-processing).
    """
    staging_files = deps.get_staging_file_cache(batch_id or task_id)
    if not staging_files:
        return False

    track_title = track.name or ''
    track_artist = track.artists[0] if track.artists else ''
    if not track_title:
        return False

    from difflib import SequenceMatcher
    normalize = deps.matching_engine.normalize_string
    norm_title = normalize(track_title)
    norm_artist = normalize(track_artist)

    best_match = None
    best_score = 0.0
    for sf in staging_files:
        sf_norm_title = normalize(sf['title'])
        sf_norm_artist = normalize(sf['artist'])
        if not sf_norm_title:
            continue

        # Title similarity (primary gate)
        title_sim = SequenceMatcher(None, norm_title, sf_norm_title).ratio()
        if title_sim < _MIN_TITLE_SIMILARITY:
            continue

        # Combined score: title-weighted (these are user-curated staging files).
        # If artist info is available on both sides, require it to match.
        # If not, lean on the title and give the artist a fixed partial credit.
        if norm_artist and sf_norm_artist:
            artist_sim = SequenceMatcher(None, norm_artist, sf_norm_artist).ratio()
            combined = (title_sim * 0.55) + (artist_sim * 0.45)
        else:
            # 0.5 when both artists are unknown (neutral), 0.3 when only one is.
            artist_sim = 0.5 if not norm_artist and not sf_norm_artist else 0.3
            combined = (title_sim * 0.80) + (artist_sim * 0.20)

        if combined > best_score:
            best_score = combined
            best_match = sf

    # Require high confidence to avoid false positives
    if not best_match or best_score < _MIN_COMBINED_SCORE:
        return False

    source_path = best_match['full_path']
    logger.info(f"[Staging] Match found for '{track_title}' by '{track_artist}': "
                f"{os.path.basename(source_path)} (score: {best_score:.2f})")

    try:
        import shutil

        # Copy the file to the transfer folder without clobbering anything
        # that is already there.
        transfer_dir = deps.docker_resolve_path(deps.config_manager.get('soulseek.transfer_path', './Transfer'))
        os.makedirs(transfer_dir, exist_ok=True)
        dest_path = _pick_transfer_destination(transfer_dir, os.path.basename(source_path))
        shutil.copy2(source_path, dest_path)
        logger.info(f"[Staging] Copied to transfer: {dest_path}")

        # Hand the task to post-processing and read its track_info under a
        # single lock acquisition so both see the same task entry.
        track_info = {}
        with tasks_lock:
            if task_id in download_tasks:
                task = download_tasks[task_id]
                task['status'] = 'post_processing'
                task['filename'] = dest_path
                task['username'] = 'staging'
                task['staging_match'] = True
                track_info = task.get('track_info', {})
        if not isinstance(track_info, dict):
            track_info = {}

        # Build spotify_artist / spotify_album context so post-processing can apply
        # the path template. Without these, _post_process_matched_download returns
        # early and the file stays at the transfer root with its original filename.
        context_key = f"staging_{task_id}"
        context = _build_staging_context(track, track_info, track_title, track_artist)

        # Store context in the matched downloads context store (used by post-processing)
        with matched_context_lock:
            matched_downloads_context[context_key] = context

        # Trigger post-processing which handles tagging, path building, and DB insertion
        deps.post_process_matched_download_with_verification(context_key, context, dest_path, task_id, batch_id)
        return True

    except Exception as e:
        # NOTE(review): if the failure happens after the copy, the task keeps
        # status 'post_processing' and the stored context stays in place while
        # the caller falls through to the normal path - confirm that is intended.
        logger.error(f"[Staging] Failed to use staging file: {e}", exc_info=True)
        return False

@ -0,0 +1,293 @@
"""Tests for core/downloads/staging.py — staging-folder match shortcut."""
from __future__ import annotations
import os
from dataclasses import dataclass
import pytest
from core.downloads import staging as ds
from core.runtime_state import (
download_tasks,
matched_context_lock,
matched_downloads_context,
)
# ---------------------------------------------------------------------------
# Fixtures + fakes
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def reset_state():
    """Empty the shared task and matched-context stores around every test."""
    def _wipe():
        download_tasks.clear()
        matched_downloads_context.clear()

    _wipe()
    yield
    _wipe()
@dataclass
class _Track:
    """Minimal track stand-in exposing the attributes the staging helper reads."""
    name: str = 'Hello'
    artists: list = None
    album: str = 'Album'

    def __post_init__(self):
        # None is a sentinel so each instance gets its own default list.
        self.artists = ['Artist One'] if self.artists is None else self.artists
class _FakeMatchingEngine:
    """Stand-in matching engine whose normalization is lowercase + strip."""

    @staticmethod
    def normalize_string(s):
        # Falsy input (None, '') normalizes to the empty string.
        text = s if s else ''
        return text.lower().strip()
class _FakeConfig:
    """Config stand-in that only knows the 'soulseek.transfer_path' key."""

    def __init__(self, transfer_path):
        self._transfer_path = transfer_path

    def get(self, key, default=None):
        # Every other key behaves as if it were unset.
        return self._transfer_path if key == 'soulseek.transfer_path' else default
def _build_deps(*, transfer_path, staging_files=None, post_process_calls=None):
    """Assemble a `StagingDeps` wired to in-memory fakes.

    Post-processing invocations are recorded as `(args, kwargs)` tuples in
    `post_process_calls`, which is also exposed as `deps._post_process_calls`.
    """
    if post_process_calls is None:
        post_process_calls = []

    def _cache(batch_id):
        return staging_files or []

    def _passthrough(p):
        return p

    def _record(*a, **kw):
        post_process_calls.append((a, kw))

    deps = ds.StagingDeps(
        config_manager=_FakeConfig(transfer_path),
        matching_engine=_FakeMatchingEngine(),
        get_staging_file_cache=_cache,
        docker_resolve_path=_passthrough,
        post_process_matched_download_with_verification=_record,
    )
    deps._post_process_calls = post_process_calls
    return deps
def _seed_task(task_id, *, track_info=None):
    """Register a task in the 'searching' state in the shared task store."""
    task = dict(
        status='searching',
        track_info=track_info or {},
        used_sources=set(),
        download_id=None,
    )
    download_tasks[task_id] = task
# ---------------------------------------------------------------------------
# No staging files / no match
# ---------------------------------------------------------------------------
def test_no_staging_files_returns_false(tmp_path):
    """An empty staging cache makes the shortcut decline immediately."""
    _seed_task('t1')
    deps = _build_deps(transfer_path=str(tmp_path), staging_files=[])
    assert ds.try_staging_match('t1', 'b1', _Track(), deps) is False
def test_no_track_title_returns_false(tmp_path):
    """A track with an empty name is not matched even when staging has entries."""
    candidate = {
        'full_path': str(tmp_path / 'src.flac'),
        'title': 'Hello',
        'artist': 'Artist One',
    }
    deps = _build_deps(transfer_path=str(tmp_path), staging_files=[candidate])
    _seed_task('t2')
    untitled = _Track(name='')
    assert ds.try_staging_match('t2', 'b1', untitled, deps) is False
def test_low_confidence_match_returns_false(tmp_path):
    """A staging entry unrelated to the track is rejected → fall through."""
    unrelated = {
        'full_path': str(tmp_path / 'src.flac'),
        'title': 'Completely Different Song',
        'artist': 'Different Artist',
    }
    deps = _build_deps(transfer_path=str(tmp_path), staging_files=[unrelated])
    _seed_task('t3')
    outcome = ds.try_staging_match('t3', 'b1', _Track(name='Hello'), deps)
    assert outcome is False
# ---------------------------------------------------------------------------
# High-confidence match — file copy + post-processing
# ---------------------------------------------------------------------------
def test_exact_match_copies_to_transfer_and_marks_post_processing(tmp_path):
    """Exact match → file copied, task moved to post_processing, post-processing called once."""
    staging_dir = tmp_path / 'staging'
    staging_dir.mkdir()
    src_file = staging_dir / 'Hello.flac'
    src_file.write_bytes(b'fake audio')
    transfer_dir = tmp_path / 'transfer'

    entry = {'full_path': str(src_file), 'title': 'Hello', 'artist': 'Artist One'}
    deps = _build_deps(transfer_path=str(transfer_dir), staging_files=[entry])
    _seed_task('t4')

    assert ds.try_staging_match('t4', 'b1', _Track(), deps) is True

    # The copy landed in the transfer dir under the original filename.
    assert (transfer_dir / 'Hello.flac').exists()

    # The task now carries the staging markers.
    task = download_tasks['t4']
    assert task['status'] == 'post_processing'
    assert task['username'] == 'staging'
    assert task['staging_match'] is True

    # Exactly one post-processing call, keyed by the staging context key.
    assert len(deps._post_process_calls) == 1
    call_args, _ = deps._post_process_calls[0]
    assert call_args[0] == 'staging_t4'
def test_existing_file_in_transfer_gets_staging_suffix(tmp_path):
    """A name collision in the transfer dir is resolved with a '_staging' suffix."""
    staging_dir = tmp_path / 'staging'
    staging_dir.mkdir()
    src_file = staging_dir / 'Hello.flac'
    src_file.write_bytes(b'new audio')

    transfer_dir = tmp_path / 'transfer'
    transfer_dir.mkdir()
    # A file with the same name is already sitting in the transfer dir.
    existing = transfer_dir / 'Hello.flac'
    existing.write_bytes(b'old audio')

    entry = {'full_path': str(src_file), 'title': 'Hello', 'artist': 'Artist One'}
    deps = _build_deps(transfer_path=str(transfer_dir), staging_files=[entry])
    _seed_task('t5')

    assert ds.try_staging_match('t5', 'b1', _Track(), deps) is True

    # The pre-existing file keeps its contents.
    assert existing.read_bytes() == b'old audio'
    # The staging copy was written next to it under the suffixed name.
    suffixed = transfer_dir / 'Hello_staging.flac'
    assert suffixed.exists()
    assert suffixed.read_bytes() == b'new audio'
# ---------------------------------------------------------------------------
# Context building
# ---------------------------------------------------------------------------
def test_explicit_album_context_uses_real_data(tmp_path):
    """With _is_explicit_album_download set, the stored context carries the supplied album/artist data."""
    staging_dir = tmp_path / 'staging'
    staging_dir.mkdir()
    src_file = staging_dir / 'Hello.flac'
    src_file.touch()

    explicit_artist = {'id': 'art-real', 'name': 'Real Artist'}
    explicit_album = {
        'id': 'alb-real',
        'name': 'Real Album',
        'release_date': '2024-05-05',
        'total_tracks': 12,
        'total_discs': 2,
        'album_type': 'album',
        'image_url': 'http://img/a.jpg',
    }
    track_info = {
        '_is_explicit_album_download': True,
        '_explicit_album_context': explicit_album,
        '_explicit_artist_context': explicit_artist,
        'track_number': 5,
        'disc_number': 2,
    }

    entry = {'full_path': str(src_file), 'title': 'Hello', 'artist': 'Real Artist'}
    deps = _build_deps(transfer_path=str(tmp_path / 'transfer'), staging_files=[entry])
    _seed_task('t6', track_info=track_info)

    ds.try_staging_match('t6', 'b1', _Track(name='Hello', artists=['Real Artist']), deps)

    ctx = matched_downloads_context['staging_t6']
    album_ctx = ctx['spotify_album']
    assert album_ctx['id'] == 'alb-real'
    assert album_ctx['total_discs'] == 2
    assert ctx['spotify_artist']['id'] == 'art-real'
    assert ctx['is_album_download'] is True
    assert ctx['has_clean_spotify_data'] is True
    assert ctx['staging_source'] is True
def test_fallback_context_synthesizes_from_track(tmp_path):
    """With no explicit context, artist/album context is derived from the track itself."""
    staging_dir = tmp_path / 'staging'
    staging_dir.mkdir()
    src_file = staging_dir / 'Hello.flac'
    src_file.touch()

    entry = {'full_path': str(src_file), 'title': 'Hello', 'artist': 'Artist One'}
    deps = _build_deps(transfer_path=str(tmp_path / 'transfer'), staging_files=[entry])
    _seed_task('t7')

    ds.try_staging_match('t7', 'b1', _Track(name='Hello', album='Some Album'), deps)

    ctx = matched_downloads_context['staging_t7']
    artist_ctx = ctx['spotify_artist']
    album_ctx = ctx['spotify_album']
    assert artist_ctx['id'] == 'staging'
    assert artist_ctx['name'] == 'Artist One'
    assert album_ctx['id'] == 'staging'
    assert album_ctx['name'] == 'Some Album'
    assert ctx['is_album_download'] is True  # album differs from title
def test_album_same_as_title_not_treated_as_album(tmp_path):
    """An album named exactly like the track leaves is_album_download False."""
    staging_dir = tmp_path / 'staging'
    staging_dir.mkdir()
    src_file = staging_dir / 'Hello.flac'
    src_file.touch()

    entry = {'full_path': str(src_file), 'title': 'Hello', 'artist': 'Artist One'}
    deps = _build_deps(transfer_path=str(tmp_path / 'transfer'), staging_files=[entry])
    _seed_task('t8')

    # album == name → single-track release pattern
    single = _Track(name='Hello', album='Hello')
    ds.try_staging_match('t8', 'b1', single, deps)

    assert matched_downloads_context['staging_t8']['is_album_download'] is False
# ---------------------------------------------------------------------------
# Error path
# ---------------------------------------------------------------------------
def test_copy_failure_returns_false(tmp_path):
    """A staging entry whose source file is missing yields False and no post-processing call."""
    # The cache advertises a file that does not exist on disk, so the copy fails.
    missing_source = tmp_path / 'staging' / 'missing.flac'
    entry = {'full_path': str(missing_source), 'title': 'Hello', 'artist': 'Artist One'}
    deps = _build_deps(transfer_path=str(tmp_path / 'transfer'), staging_files=[entry])
    _seed_task('t9')

    outcome = ds.try_staging_match('t9', 'b1', _Track(), deps)

    assert outcome is False
    assert deps._post_process_calls == []

@ -20221,207 +20221,24 @@ def _get_staging_file_cache(batch_id):
return files
def _try_staging_match(task_id, batch_id, track):
"""Check if a matching file exists in the staging folder before downloading.
Returns True if a match was found and the file was moved to the transfer folder.
Returns False to fall through to normal download.
"""
staging_files = _get_staging_file_cache(batch_id or task_id)
if not staging_files:
return False
track_title = track.name or ''
track_artist = track.artists[0] if track.artists else ''
if not track_title:
return False
from difflib import SequenceMatcher
normalize = matching_engine.normalize_string
norm_title = normalize(track_title)
norm_artist = normalize(track_artist)
best_match = None
best_score = 0.0
for sf in staging_files:
sf_norm_title = normalize(sf['title'])
sf_norm_artist = normalize(sf['artist'])
if not sf_norm_title:
continue
# Title similarity (primary)
title_sim = SequenceMatcher(None, norm_title, sf_norm_title).ratio()
if title_sim < 0.80:
continue
# Artist similarity (secondary)
artist_sim = 0.0
if norm_artist and sf_norm_artist:
artist_sim = SequenceMatcher(None, norm_artist, sf_norm_artist).ratio()
elif not norm_artist and not sf_norm_artist:
artist_sim = 0.5 # Both unknown — neutral
elif norm_artist and not sf_norm_artist:
artist_sim = 0.3 # Staging file lacks artist — partial credit if title is strong
elif sf_norm_artist and not norm_artist:
artist_sim = 0.3 # Track lacks artist — same partial credit
# Combined score: title-weighted (these are user-curated staging files)
# If artist info is available, require it to match. If not, lean on title.
if norm_artist and sf_norm_artist:
combined = (title_sim * 0.55) + (artist_sim * 0.45)
else:
combined = (title_sim * 0.80) + (artist_sim * 0.20)
if combined > best_score:
best_score = combined
best_match = sf
# Require high confidence to avoid false positives
if not best_match or best_score < 0.75:
return False
logger.info(f"[Staging] Match found for '{track_title}' by '{track_artist}': "
f"{os.path.basename(best_match['full_path'])} (score: {best_score:.2f})")
# Copy the file to the transfer folder
try:
transfer_dir = docker_resolve_path(config_manager.get('soulseek.transfer_path', './Transfer'))
dest_filename = os.path.basename(best_match['full_path'])
dest_path = os.path.join(transfer_dir, dest_filename)
os.makedirs(transfer_dir, exist_ok=True)
# Staging-folder match shortcut lives in core/downloads/staging.py.
from core.downloads import staging as _downloads_staging
# Don't overwrite existing files
if os.path.exists(dest_path):
base, ext = os.path.splitext(dest_filename)
dest_path = os.path.join(transfer_dir, f"{base}_staging{ext}")
import shutil
shutil.copy2(best_match['full_path'], dest_path)
logger.info(f"[Staging] Copied to transfer: {dest_path}")
# Mark task as completed with staging context
with tasks_lock:
if task_id in download_tasks:
download_tasks[task_id]['status'] = 'post_processing'
download_tasks[task_id]['filename'] = dest_path
download_tasks[task_id]['username'] = 'staging'
download_tasks[task_id]['staging_match'] = True
# Run post-processing (tagging, AcoustID verification, path building)
context_key = f"staging_{task_id}"
with tasks_lock:
track_info = download_tasks.get(task_id, {}).get('track_info', {})
if not isinstance(track_info, dict):
track_info = {}
# Build spotify_artist / spotify_album context so post-processing can apply
# the path template. Without these, _post_process_matched_download returns
# early and the file stays at the transfer root with its original filename.
# Mirror the context-building logic from the sync modal worker.
has_explicit_context = track_info.get('_is_explicit_album_download', False)
if has_explicit_context:
explicit_artist = track_info.get('_explicit_artist_context', {})
if isinstance(explicit_artist, str):
explicit_artist = {'name': explicit_artist}
elif not isinstance(explicit_artist, dict):
explicit_artist = {}
spotify_artist_ctx = {
'id': explicit_artist.get('id', 'staging'),
'name': explicit_artist.get('name', track_artist),
'genres': explicit_artist.get('genres', [])
}
explicit_album = track_info.get('_explicit_album_context', {})
if not isinstance(explicit_album, dict):
explicit_album = {}
_album_image_url = explicit_album.get('image_url')
if not _album_image_url and explicit_album.get('images'):
_imgs = explicit_album['images']
if isinstance(_imgs, list) and _imgs:
_album_image_url = _imgs[0].get('url') if isinstance(_imgs[0], dict) else None
spotify_album_ctx = {
'id': explicit_album.get('id', 'staging'),
'name': explicit_album.get('name', getattr(track, 'album', '') or ''),
'release_date': explicit_album.get('release_date', ''),
'image_url': _album_image_url,
'album_type': explicit_album.get('album_type', 'album'),
'total_tracks': explicit_album.get('total_tracks', 0),
'total_discs': explicit_album.get('total_discs', 1),
'artists': explicit_album.get('artists', [{'name': spotify_artist_ctx.get('name', '')}])
}
is_album_ctx = True
has_clean_data = True
else:
fallback_album = track_info.get('album', {})
if isinstance(fallback_album, str):
fallback_album = {'name': fallback_album}
elif not isinstance(fallback_album, dict):
fallback_album = {}
track_album_name = getattr(track, 'album', '') or fallback_album.get('name', '') or ''
spotify_artist_ctx = {
'id': 'staging',
'name': track_artist or 'Unknown',
'genres': []
}
spotify_album_ctx = {
'id': 'staging',
'name': track_album_name,
'release_date': fallback_album.get('release_date', ''),
'image_url': fallback_album.get('image_url'),
'album_type': fallback_album.get('album_type', 'album'),
'total_tracks': fallback_album.get('total_tracks', 0),
'total_discs': fallback_album.get('total_discs', 1),
'artists': [{'name': track_artist}] if track_artist else []
}
is_album_ctx = bool(
track_album_name and
track_album_name.strip() and
track_album_name.lower() not in ('unknown album', '') and
track_album_name.lower() != track_title.lower()
)
has_clean_data = bool(track_title and track_artist and track_album_name)
track_number = (
track_info.get('track_number', 0) or
getattr(track, 'track_number', 0) or 0
)
disc_number = (
track_info.get('disc_number', 1) or
getattr(track, 'disc_number', 1) or 1
)
context = {
'track_info': track_info,
'spotify_artist': spotify_artist_ctx,
'spotify_album': spotify_album_ctx,
'original_search_result': {
'title': track_title,
'artist': track_artist,
'spotify_clean_title': track_title,
'spotify_clean_album': spotify_album_ctx.get('name', ''),
'spotify_clean_artist': track_artist,
'track_number': track_number,
'disc_number': disc_number,
},
'is_album_download': is_album_ctx,
'has_clean_spotify_data': has_clean_data,
'staging_source': True,
}
def _build_staging_deps():
    """Build the StagingDeps bundle from web_server.py globals on each call.

    The globals are looked up when this function runs, so every call hands
    the staging helper whatever objects those names are bound to at that
    moment.
    """
    return _downloads_staging.StagingDeps(
        config_manager=config_manager,
        matching_engine=matching_engine,
        get_staging_file_cache=_get_staging_file_cache,
        docker_resolve_path=docker_resolve_path,
        post_process_matched_download_with_verification=_post_process_matched_download_with_verification,
    )
# Store context in the matched downloads context store (used by post-processing)
with matched_context_lock:
matched_downloads_context[context_key] = context
# Trigger post-processing which handles tagging, path building, and DB insertion
_post_process_matched_download_with_verification(context_key, context, dest_path, task_id, batch_id)
return True
def _try_staging_match(task_id, batch_id, track):
    """Delegate to `core.downloads.staging.try_staging_match` with freshly built deps.

    Returns whatever the delegated call returns.
    """
    return _downloads_staging.try_staging_match(task_id, batch_id, track, _build_staging_deps())
except Exception as e:
logger.error(f"[Staging] Failed to use staging file: {e}")
return False
def _try_source_reuse(task_id, batch_id, track):

Loading…
Cancel
Save