refactor(downloads): extract album_bundle shared helpers + atomic copy

Per code review: the album-bundle helpers (release picker + staging
collision suffix) were defined as private symbols in torrent.py and
imported by usenet.py through ``from core.download_plugins.torrent
import _pick_best_album_release, _unique_staging_path``. Sibling
plugins shouldn't reach into each other's private surface — leaky
module boundary, and the underscore prefix says don't import.

Also addressed two latent issues at the same time:

- The Auto-Import sweep race: my plugin copied audio files into
  staging via plain ``shutil.copy2``, which exposes a partial file
  at the audio extension for the duration of the copy. The Auto-
  Import worker filters by audio extension when scanning Staging
  (AUDIO_EXTENSIONS in core/auto_import_worker.py), so a mid-flight
  scan could pick up a truncated file. Fix: copy to a
  ``.tmp.<random>`` sidecar first, then atomically rename via
  ``Path.replace`` (which is ``os.replace`` — atomic on the same
  filesystem). Auto-Import sees the file either at its final name
  or not at all.

- The 6-hour poll timeout was a hard-coded magic constant. Users
  with slow private trackers or large box sets would silently time
  out after 6h. Both the timeout and the poll interval are now
  read from config (``download_source.album_bundle_timeout_seconds``
  / ``..._poll_interval_seconds``) with safe fallback to the
  existing defaults when unset / non-numeric.

- core/download_plugins/album_bundle.py: new module owns the
  shared surface — ``pick_best_album_release`` (with quality_guess
  passed in as a parameter to avoid the circular import that would
  result from this module trying to know about torrent.py's title
  parser), ``unique_staging_path``, ``atomic_copy_to_staging``,
  ``copy_audio_files_atomically``, ``get_poll_interval``,
  ``get_poll_timeout``. Module-level size constants and quality
  weights live here too. Usenet's grabs-as-popularity-proxy is
  built into the picker so both plugins get the right behavior
  without divergent local logic.
- core/download_plugins/torrent.py: drops the local helpers + the
  hard-coded poll constants, imports from album_bundle. Per-track
  download flow still uses module-level ``_POLL_TIMEOUT_SECONDS``
  / ``_POLL_INTERVAL_SECONDS`` aliases (read from config once at
  import time, same as before from a per-track perspective).
- core/download_plugins/usenet.py: drops the imports of the
  torrent.py private helpers; everything goes through album_bundle
  now. Stops the cross-plugin private-import leak that started
  this whole refactor.
- tests/test_album_bundle.py: 23 new tests covering the picker
  heuristic (empty input, singleton drop, FLAC preference, grabs
  fallback for usenet, size-floor / ceiling boundaries), the
  collision-suffix logic, the atomic-copy invariant (concurrent
  scanner thread asserts it never observes a partial audio file
  during five sequential copies), the failure-skip behavior of the
  batch copier, and the config-driven poll cadence including
  garbage-input fallback.
- tests/test_torrent_usenet_plugins.py: existing picker tests
  updated to call the new module-level helpers instead of the
  former torrent.py privates.
pull/671/head
Broque Thomas 5 days ago
parent 8975031e3a
commit 670a2db95e

@ -0,0 +1,217 @@
"""Shared helpers for the album-bundle download flow.
The torrent and usenet download plugins both implement a
``download_album_to_staging`` method that searches Prowlarr for a
whole release, hands it to the active downloader, walks the
resulting audio files, and copies them into the staging folder. The
two implementations share the same release-picker heuristic and the
same staging-path collision logic.
Pulled out of ``core/download_plugins/torrent.py`` so the usenet
plugin doesn't have to import private helpers from a sibling
plugin (Cin's "no leaky module boundaries" standard).
Also exposes ``atomic_copy_to_staging`` the audio file is copied
to a ``.tmp.<random>`` sidecar first and atomically renamed onto its
final extension. The Auto-Import worker filters by audio extension
so the in-flight ``.tmp`` file is never picked up mid-copy, closing
the race between the album-bundle copy loop and Auto-Import's
folder scan.
"""
from __future__ import annotations
import shutil
import time
import uuid
from pathlib import Path
from typing import Iterable, Optional
from config.settings import config_manager
from utils.logging_config import get_logger
logger = get_logger("download_plugins.album_bundle")
# Album-pick size floor / ceiling. Single-track torrents (~10 MB)
# are rejected when bigger candidates exist; anything past 3 GB is
# treated as suspicious (multi-disc box-set + scans + extras).
ALBUM_PICK_MIN_BYTES = 40 * 1024 * 1024
ALBUM_PICK_MAX_BYTES = 3 * 1024 * 1024 * 1024
# Quality-score weights for the album-pick heuristic. Mirrors the
# tier order in ``core/imports/file_ops.py``'s ``quality_tiers`` —
# higher number = preferred.
_QUALITY_SCORE = {'flac': 4, 'ogg': 3, 'aac': 2, 'mp3': 1}
# Default poll cadence + timeout for the album-download poll loop.
# Both are overridable through config so users with slow trackers
# / large box-sets can extend the deadline without editing code.
DEFAULT_POLL_INTERVAL_SECONDS = 2.0
DEFAULT_POLL_TIMEOUT_SECONDS = 6 * 60 * 60
def get_poll_interval() -> float:
"""Return the per-poll sleep duration (seconds). Configurable via
``download_source.album_bundle_poll_interval_seconds``."""
raw = config_manager.get('download_source.album_bundle_poll_interval_seconds',
DEFAULT_POLL_INTERVAL_SECONDS)
try:
value = float(raw)
if value > 0:
return value
except (TypeError, ValueError):
pass
return DEFAULT_POLL_INTERVAL_SECONDS
def get_poll_timeout() -> float:
"""Return the total deadline for an album-bundle download
(seconds). Configurable via
``download_source.album_bundle_timeout_seconds``."""
raw = config_manager.get('download_source.album_bundle_timeout_seconds',
DEFAULT_POLL_TIMEOUT_SECONDS)
try:
value = float(raw)
if value > 0:
return value
except (TypeError, ValueError):
pass
return DEFAULT_POLL_TIMEOUT_SECONDS
def quality_score(title: str, quality_guess) -> int:
"""Map a release title's inferred quality to a sortable integer.
``quality_guess`` is the function from each plugin that maps a
title string to a quality string ('flac' / 'mp3' / etc.) passed
in so this module doesn't have to import either plugin and risk
a circular import."""
return _QUALITY_SCORE.get(quality_guess(title) or '', 0)
def pick_best_album_release(candidates, quality_guess) -> Optional[object]:
"""Pick the single best torrent / NZB for an album-bundle download.
Heuristic, in priority order:
1. Reasonable album-ish size (40 MB 3 GB) drops single-track
releases that snuck in and quarantines suspicious giants.
2. Higher seeders > lower (dead torrents = dead downloads).
Usenet releases use ``grabs`` as a popularity proxy when
seeders is None.
3. Higher quality (FLAC > AAC > MP3) inferred from title.
4. Larger size as tiebreaker (often = higher bitrate).
"""
if not candidates:
return None
sized = [c for c in candidates
if ALBUM_PICK_MIN_BYTES <= (c.size or 0) <= ALBUM_PICK_MAX_BYTES]
pool = sized or list(candidates)
if not pool:
return None
def _score(c) -> tuple:
seeders = c.seeders if c.seeders is not None else (c.grabs or 0)
return (seeders, quality_score(c.title or '', quality_guess), c.size or 0)
return max(pool, key=_score)
def unique_staging_path(staging_dir: Path, src: Path) -> Path:
"""Return a destination path inside ``staging_dir`` that doesn't
collide with an existing file. Appends ``_1``, ``_2``, ... before
the extension when needed; gives up after 1000 candidates and
returns the unsuffixed path so the caller will overwrite (better
than infinite loop or crash)."""
dest = staging_dir / src.name
if not dest.exists():
return dest
stem = dest.stem
suffix = dest.suffix
for i in range(1, 1000):
candidate = staging_dir / f"{stem}_{i}{suffix}"
if not candidate.exists():
return candidate
return dest
def atomic_copy_to_staging(src: Path, dest: Path) -> bool:
"""Copy ``src`` to ``dest`` without exposing a partial file to
folder scanners.
The Auto-Import worker filters by audio extension when scanning
Staging see ``AUDIO_EXTENSIONS`` in ``core/auto_import_worker.py``.
Naming the in-flight file ``<dest>.tmp.<random>`` keeps it
invisible until the rename atomically swings it to its final
extension. ``os.replace`` (used by ``Path.rename`` on Python 3.x)
is atomic on the same filesystem, so Auto-Import either sees the
file at its final name (complete) or doesn't see it at all
(in flight).
Returns True on success, False on copy / rename failure. Caller
is expected to log the failure case so we don't double-log here.
"""
tmp = dest.with_name(f"{dest.name}.tmp.{uuid.uuid4().hex[:8]}")
try:
shutil.copy2(src, tmp)
except Exception:
# Best-effort cleanup of the partial file. If unlink fails
# (locked, permissions) we leave it — Auto-Import ignores it
# anyway because of the .tmp extension.
try:
if tmp.exists():
tmp.unlink()
except Exception as cleanup_exc:
logger.debug("album_bundle tmp cleanup failed: %s", cleanup_exc)
raise
try:
tmp.replace(dest)
return True
except Exception:
try:
tmp.unlink(missing_ok=True)
except Exception as cleanup_exc:
logger.debug("album_bundle tmp cleanup failed: %s", cleanup_exc)
raise
def copy_audio_files_atomically(
sources: Iterable[Path], staging_dir: Path,
) -> list:
"""Convenience wrapper: pick a non-colliding staging path for
each source, copy via ``atomic_copy_to_staging``. Returns the
list of final destination paths (as strings). Files that fail
to copy are logged and skipped; the caller decides what to do
with a partial result."""
staging_dir.mkdir(parents=True, exist_ok=True)
out: list = []
for src in sources:
dest = unique_staging_path(staging_dir, src)
try:
atomic_copy_to_staging(src, dest)
out.append(str(dest))
except Exception as e:
logger.warning("[album_bundle] Failed to stage %s -> %s: %s", src, dest, e)
return out
# Re-export so callers don't have to remember which module owns
# what. The ``time`` import is kept so plugins can ``from
# core.download_plugins.album_bundle import time`` if they want to,
# avoiding a second std-lib import line for a single use.
__all__ = [
"ALBUM_PICK_MIN_BYTES",
"ALBUM_PICK_MAX_BYTES",
"DEFAULT_POLL_INTERVAL_SECONDS",
"DEFAULT_POLL_TIMEOUT_SECONDS",
"atomic_copy_to_staging",
"copy_audio_files_atomically",
"get_poll_interval",
"get_poll_timeout",
"pick_best_album_release",
"quality_score",
"time",
"unique_staging_path",
]

@ -49,7 +49,6 @@ from __future__ import annotations
import asyncio
import re
import shutil
import threading
import time
import uuid
@ -58,6 +57,12 @@ from typing import Any, Dict, List, Optional, Tuple
from config.settings import config_manager
from core.archive_pipeline import collect_audio_after_extraction
from core.download_plugins.album_bundle import (
copy_audio_files_atomically,
get_poll_interval,
get_poll_timeout,
pick_best_album_release,
)
from core.download_plugins.base import DownloadSourcePlugin
from core.download_plugins.types import AlbumResult, DownloadStatus, TrackResult
from core.prowlarr_client import (
@ -83,14 +88,13 @@ _FILENAME_SEP = '||'
# don't want to keep sharing.
_COMPLETE_STATES = frozenset(['seeding', 'completed'])
# Max seconds the poll thread keeps watching one download before
# giving up. 6 hours covers slow private trackers without leaking
# threads forever on dead torrents.
_POLL_TIMEOUT_SECONDS = 6 * 60 * 60
# Poll cadence — torrent state changes slowly; no point hammering
# the WebUI more than once a second.
_POLL_INTERVAL_SECONDS = 2.0
# Poll cadence / timeout — both pull from config via the shared
# album_bundle helpers so users can extend the deadline for slow
# trackers without editing source. Kept as module aliases so the
# per-track flow at the bottom of this file can still import them
# under the legacy names without re-reading config every loop.
_POLL_TIMEOUT_SECONDS = get_poll_timeout()
_POLL_INTERVAL_SECONDS = get_poll_interval()
class TorrentDownloadPlugin(DownloadSourcePlugin):
@ -466,7 +470,7 @@ class TorrentDownloadPlugin(DownloadSourcePlugin):
result['error'] = f'No torrent results found for "{query}"'
return result
picked = _pick_best_album_release(candidates)
picked = pick_best_album_release(candidates, _guess_quality_from_title)
if picked is None:
result['error'] = 'No suitable torrent candidate after filtering'
return result
@ -504,16 +508,7 @@ class TorrentDownloadPlugin(DownloadSourcePlugin):
result['error'] = f'No audio files found in {save_path}'
return result
staging_path = Path(staging_dir)
staging_path.mkdir(parents=True, exist_ok=True)
copied: List[str] = []
for src in audio_files:
dst = _unique_staging_path(staging_path, src)
try:
shutil.copy2(src, dst)
copied.append(str(dst))
except Exception as e:
logger.warning("[Torrent album] Failed to copy %s -> %s: %s", src, dst, e)
copied = copy_audio_files_atomically(audio_files, Path(staging_dir))
if not copied:
result['error'] = 'No audio files copied to staging'
return result
@ -553,57 +548,6 @@ class TorrentDownloadPlugin(DownloadSourcePlugin):
return None
# ---------------------------------------------------------------------------
# Album-pick helpers
# ---------------------------------------------------------------------------
_QUALITY_SCORE = {'flac': 4, 'ogg': 3, 'aac': 2, 'mp3': 1}
def _pick_best_album_release(candidates) -> Optional[Any]:
"""Pick the single best torrent for an album-bundle download.
Heuristic, in priority order:
1. Reasonable album-ish size (40 MB 3 GB) drops single-track
releases that snuck in and quarantines suspicious giants.
2. Higher seeders > lower (dead torrents = dead downloads).
3. Higher quality (FLAC > AAC > MP3) inferred from title.
4. Larger size as tiebreaker (often = higher bitrate).
"""
MIN_BYTES = 40 * 1024 * 1024
MAX_BYTES = 3 * 1024 * 1024 * 1024
sized = [c for c in candidates if MIN_BYTES <= (c.size or 0) <= MAX_BYTES]
pool = sized or candidates
if not pool:
return None
def _score(c) -> tuple:
seeders = c.seeders or 0
quality = _QUALITY_SCORE.get(_guess_quality_from_title(c.title), 0)
size = c.size or 0
return (seeders, quality, size)
return max(pool, key=_score)
def _unique_staging_path(staging_dir: Path, src: Path) -> Path:
"""Return a destination path inside ``staging_dir`` that doesn't
collide with an existing file. Appends ``_1``, ``_2``, etc. before
the extension when needed."""
dest = staging_dir / src.name
if not dest.exists():
return dest
stem = dest.stem
suffix = dest.suffix
for i in range(1, 1000):
candidate = staging_dir / f"{stem}_{i}{suffix}"
if not candidate.exists():
return candidate
return dest # give up — overwrite
# ---------------------------------------------------------------------------
# Module-level helpers (pure functions — easy to unit-test)
# ---------------------------------------------------------------------------

@ -14,7 +14,6 @@ module's docstring for the full pipeline rationale). Differences:
from __future__ import annotations
import shutil
import threading
import time
import uuid
@ -22,6 +21,10 @@ from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from core.archive_pipeline import collect_audio_after_extraction
from core.download_plugins.album_bundle import (
copy_audio_files_atomically,
pick_best_album_release,
)
from core.download_plugins.base import DownloadSourcePlugin
from core.download_plugins.torrent import (
_adapter_state_to_display,
@ -29,9 +32,7 @@ from core.download_plugins.torrent import (
_guess_quality_from_title,
_parse_indexer_id_filter,
_parse_release_title,
_pick_best_album_release,
_row_to_status,
_unique_staging_path,
_COMPLETE_STATES,
_FILENAME_SEP,
_POLL_INTERVAL_SECONDS,
@ -386,7 +387,7 @@ class UsenetDownloadPlugin(DownloadSourcePlugin):
result['error'] = f'No usenet results found for "{query}"'
return result
picked = _pick_best_album_release(candidates)
picked = pick_best_album_release(candidates, _guess_quality_from_title)
if picked is None:
result['error'] = 'No suitable NZB candidate after filtering'
return result
@ -420,16 +421,7 @@ class UsenetDownloadPlugin(DownloadSourcePlugin):
result['error'] = f'No audio files found in {save_path}'
return result
staging_path = Path(staging_dir)
staging_path.mkdir(parents=True, exist_ok=True)
copied: List[str] = []
for src in audio_files:
dst = _unique_staging_path(staging_path, src)
try:
shutil.copy2(src, dst)
copied.append(str(dst))
except Exception as e:
logger.warning("[Usenet album] Failed to copy %s -> %s: %s", src, dst, e)
copied = copy_audio_files_atomically(audio_files, Path(staging_dir))
if not copied:
result['error'] = 'No audio files copied to staging'
return result

@ -0,0 +1,301 @@
"""Tests for ``core/download_plugins/album_bundle.py``.
The shared helpers used by both the torrent and usenet album-bundle
flows. Pins the pick heuristic, the atomic-copy invariant
(no partial files ever visible at the audio extension), the
collision-suffix logic, and the config-driven poll cadence so a
future tweak in either plugin can't break the contract.
"""
from __future__ import annotations
import os
import threading
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from unittest.mock import patch
import pytest
from core.download_plugins.album_bundle import (
ALBUM_PICK_MAX_BYTES,
ALBUM_PICK_MIN_BYTES,
DEFAULT_POLL_INTERVAL_SECONDS,
DEFAULT_POLL_TIMEOUT_SECONDS,
atomic_copy_to_staging,
copy_audio_files_atomically,
get_poll_interval,
get_poll_timeout,
pick_best_album_release,
quality_score,
unique_staging_path,
)
# Minimal release-result shim — duck-types the fields the picker reads.
@dataclass
class _Release:
title: str
size: int
seeders: Optional[int] = None
grabs: Optional[int] = None
def _flac_quality_guess(title: str) -> str:
"""Stand-in for the plugin's title→quality function."""
t = (title or '').lower()
if 'flac' in t:
return 'flac'
if 'aac' in t:
return 'aac'
if 'ogg' in t:
return 'ogg'
return 'mp3'
# ---------------------------------------------------------------------------
# pick_best_album_release
# ---------------------------------------------------------------------------
def test_picker_returns_none_for_empty_input() -> None:
assert pick_best_album_release([], _flac_quality_guess) is None
def test_picker_drops_singletons_when_albums_present() -> None:
"""Single-track torrents under 40 MB shouldn't beat an album-sized
candidate even if the single has thousands of seeders."""
single = _Release(title='Track [MP3]', size=10_000_000, seeders=10_000)
album = _Release(title='Album [MP3]', size=120_000_000, seeders=5)
assert pick_best_album_release([single, album], _flac_quality_guess) is album
def test_picker_prefers_flac_when_tied_on_seeders() -> None:
flac = _Release(title='Album [FLAC]', size=400_000_000, seeders=50)
mp3 = _Release(title='Album [MP3]', size=130_000_000, seeders=50)
assert pick_best_album_release([flac, mp3], _flac_quality_guess) is flac
def test_picker_uses_grabs_when_seeders_is_none() -> None:
"""Usenet results have ``seeders=None`` — the picker should fall
back to ``grabs`` so popularity still drives the ranking."""
cold = _Release(title='Album A [MP3]', size=200_000_000, seeders=None, grabs=1)
popular = _Release(title='Album B [MP3]', size=200_000_000, seeders=None, grabs=999)
assert pick_best_album_release([cold, popular], _flac_quality_guess) is popular
def test_picker_falls_back_when_all_below_floor() -> None:
"""When every candidate is below the 40 MB album-size floor,
return the most-seeded one rather than None the user still
wants a download attempt."""
small_low = _Release(title='X', size=5_000_000, seeders=10)
small_high = _Release(title='Y', size=8_000_000, seeders=200)
assert pick_best_album_release([small_low, small_high], _flac_quality_guess) is small_high
def test_picker_size_floor_matches_constant() -> None:
"""If someone moves the constant the floor moves with it — pin
the relationship to catch accidental literals creeping back in."""
just_below = _Release(title='Below', size=ALBUM_PICK_MIN_BYTES - 1, seeders=999)
just_above = _Release(title='Above', size=ALBUM_PICK_MIN_BYTES + 1, seeders=1)
assert pick_best_album_release([just_below, just_above], _flac_quality_guess) is just_above
def test_picker_rejects_oversized_box_sets() -> None:
"""Anything past 3 GB drops out of the preferred pool — most likely
a multi-disc box set with scans + bonus material, not what the
user asked for."""
sane = _Release(title='Album [FLAC]', size=400_000_000, seeders=10)
box = _Release(title='Album Box [FLAC]', size=ALBUM_PICK_MAX_BYTES + 1_000_000, seeders=999)
# Sane wins even with 100x fewer seeders, because box is outside
# the preferred range.
assert pick_best_album_release([sane, box], _flac_quality_guess) is sane
# ---------------------------------------------------------------------------
# quality_score
# ---------------------------------------------------------------------------
def test_quality_score_orders_formats() -> None:
assert quality_score('Album [FLAC]', _flac_quality_guess) > quality_score('Album [MP3]', _flac_quality_guess)
assert quality_score('Album [AAC]', _flac_quality_guess) > quality_score('Album [MP3]', _flac_quality_guess)
assert quality_score('Bare title', _flac_quality_guess) == quality_score('Album [MP3]', _flac_quality_guess)
# ---------------------------------------------------------------------------
# unique_staging_path
# ---------------------------------------------------------------------------
def test_unique_staging_path_returns_natural_when_clear(tmp_path: Path) -> None:
src = tmp_path / 'src.flac'
src.write_bytes(b'fLaC')
staging = tmp_path / 'staging'
staging.mkdir()
assert unique_staging_path(staging, src) == staging / 'src.flac'
def test_unique_staging_path_suffixes_on_collision(tmp_path: Path) -> None:
src = tmp_path / 'src.flac'
src.write_bytes(b'fLaC')
staging = tmp_path / 'staging'
staging.mkdir()
(staging / 'src.flac').write_bytes(b'existing')
assert unique_staging_path(staging, src) == staging / 'src_1.flac'
def test_unique_staging_path_increments_suffix(tmp_path: Path) -> None:
src = tmp_path / 'src.flac'
src.write_bytes(b'fLaC')
staging = tmp_path / 'staging'
staging.mkdir()
(staging / 'src.flac').write_bytes(b'1')
(staging / 'src_1.flac').write_bytes(b'2')
(staging / 'src_2.flac').write_bytes(b'3')
assert unique_staging_path(staging, src) == staging / 'src_3.flac'
# ---------------------------------------------------------------------------
# atomic_copy_to_staging
# ---------------------------------------------------------------------------
def test_atomic_copy_lands_at_final_path(tmp_path: Path) -> None:
src = tmp_path / 'src.flac'
src.write_bytes(b'fLaC payload')
dest = tmp_path / 'staging' / 'track.flac'
dest.parent.mkdir()
assert atomic_copy_to_staging(src, dest) is True
assert dest.read_bytes() == b'fLaC payload'
def test_atomic_copy_leaves_no_tmp_files_after_success(tmp_path: Path) -> None:
"""The .tmp.<random> sidecar must be cleaned up by the rename —
no orphan files left behind on a successful copy."""
src = tmp_path / 'src.flac'
src.write_bytes(b'data')
dest = tmp_path / 'staging' / 'track.flac'
dest.parent.mkdir()
atomic_copy_to_staging(src, dest)
tmp_files = list(dest.parent.glob('*.tmp.*'))
assert tmp_files == []
def test_atomic_copy_never_exposes_partial_to_extension_scanner(tmp_path: Path) -> None:
"""Auto-Import filters by audio extension — the in-flight file
must NEVER be visible at its final extension until the copy is
complete. We probe this by scanning the staging dir in parallel
with the copy and assert the audio file is either absent OR
fully written.
"""
src = tmp_path / 'src.flac'
src.write_bytes(b'x' * (2 * 1024 * 1024))
dest = tmp_path / 'staging' / 'track.flac'
dest.parent.mkdir()
stop = threading.Event()
saw_partial = threading.Event()
expected_size = src.stat().st_size
def _scan_loop():
while not stop.is_set():
try:
files = [p for p in dest.parent.iterdir() if p.suffix == '.flac']
except FileNotFoundError:
continue
for fp in files:
size = fp.stat().st_size
if 0 < size < expected_size:
saw_partial.set()
return
scanner = threading.Thread(target=_scan_loop, daemon=True)
scanner.start()
try:
for i in range(5):
target = dest.with_name(f'track_{i}.flac')
atomic_copy_to_staging(src, target)
# Give the scanner a moment to drain any final scan iteration.
time.sleep(0.05)
finally:
stop.set()
scanner.join(timeout=1.0)
assert not saw_partial.is_set(), \
"Scanner observed a partial audio file — atomic copy contract broken"
def test_copy_audio_files_atomically_skips_failures(tmp_path: Path) -> None:
"""One file failing to copy shouldn't stop the rest from being
staged partial results are better than a complete bailout."""
src_a = tmp_path / 'a.flac'
src_a.write_bytes(b'a')
src_missing = tmp_path / 'does-not-exist.flac' # never created
src_c = tmp_path / 'c.flac'
src_c.write_bytes(b'c')
staging = tmp_path / 'staging'
out = copy_audio_files_atomically([src_a, src_missing, src_c], staging)
assert len(out) == 2
landed = sorted(Path(p).name for p in out)
assert landed == ['a.flac', 'c.flac']
def test_copy_audio_files_atomically_creates_staging_dir(tmp_path: Path) -> None:
src = tmp_path / 'a.flac'
src.write_bytes(b'a')
staging = tmp_path / 'nested' / 'staging' / 'dir'
out = copy_audio_files_atomically([src], staging)
assert len(out) == 1
assert staging.exists()
# ---------------------------------------------------------------------------
# Config-driven poll cadence
# ---------------------------------------------------------------------------
def test_get_poll_interval_uses_default_when_unset() -> None:
with patch('core.download_plugins.album_bundle.config_manager') as cm:
cm.get.return_value = DEFAULT_POLL_INTERVAL_SECONDS
assert get_poll_interval() == DEFAULT_POLL_INTERVAL_SECONDS
def test_get_poll_interval_honours_override() -> None:
with patch('core.download_plugins.album_bundle.config_manager') as cm:
cm.get.return_value = 5
assert get_poll_interval() == 5.0
def test_get_poll_interval_falls_back_on_garbage() -> None:
"""Non-numeric / non-positive values fall back to the default
rather than crashing the poll loop."""
with patch('core.download_plugins.album_bundle.config_manager') as cm:
cm.get.return_value = 'not-a-number'
assert get_poll_interval() == DEFAULT_POLL_INTERVAL_SECONDS
cm.get.return_value = -1
assert get_poll_interval() == DEFAULT_POLL_INTERVAL_SECONDS
def test_get_poll_timeout_uses_default_when_unset() -> None:
with patch('core.download_plugins.album_bundle.config_manager') as cm:
cm.get.return_value = DEFAULT_POLL_TIMEOUT_SECONDS
assert get_poll_timeout() == DEFAULT_POLL_TIMEOUT_SECONDS
def test_get_poll_timeout_honours_override() -> None:
"""Users with slow trackers / large box sets can extend the
deadline without touching code."""
with patch('core.download_plugins.album_bundle.config_manager') as cm:
cm.get.return_value = 86_400 # 24h
assert get_poll_timeout() == 86_400.0
def test_get_poll_timeout_falls_back_on_garbage() -> None:
with patch('core.download_plugins.album_bundle.config_manager') as cm:
cm.get.return_value = ''
assert get_poll_timeout() == DEFAULT_POLL_TIMEOUT_SECONDS
cm.get.return_value = 0
assert get_poll_timeout() == DEFAULT_POLL_TIMEOUT_SECONDS

@ -395,20 +395,22 @@ def test_plugins_conform_to_protocol() -> None:
def test_torrent_album_pick_prefers_seeded_flac(tmp_path: Path) -> None:
"""Album bundle picker prefers high-seeded FLAC over low-seeded MP3
of comparable size protects against picking a dead torrent."""
from core.download_plugins.torrent import _pick_best_album_release
from core.download_plugins.album_bundle import pick_best_album_release
from core.download_plugins.torrent import _guess_quality_from_title
flac = _make_torrent_result(title='Kendrick Lamar - GNX [FLAC]', size=400_000_000, seeders=120)
mp3 = _make_torrent_result(title='Kendrick Lamar - GNX [MP3 320]', size=120_000_000, seeders=5, guid='guid-2')
picked = _pick_best_album_release([flac, mp3])
picked = pick_best_album_release([flac, mp3], _guess_quality_from_title)
assert picked is flac
def test_torrent_album_pick_drops_too_small() -> None:
"""Single-track torrents (~10 MB) shouldn't be picked when the user
is downloading a whole album the size floor (40 MB) catches them."""
from core.download_plugins.torrent import _pick_best_album_release
from core.download_plugins.album_bundle import pick_best_album_release
from core.download_plugins.torrent import _guess_quality_from_title
single = _make_torrent_result(title='Kendrick Lamar - HUMBLE', size=10_000_000, seeders=500)
album = _make_torrent_result(title='Kendrick Lamar - DAMN [MP3]', size=120_000_000, seeders=50, guid='guid-2')
picked = _pick_best_album_release([single, album])
picked = pick_best_album_release([single, album], _guess_quality_from_title)
assert picked is album
@ -416,26 +418,27 @@ def test_torrent_album_pick_falls_back_when_all_outside_size_range() -> None:
"""If every candidate is below the floor (e.g. all results are
singles), pick the most-seeded one rather than returning None
user still wants a download even if it's a track torrent."""
from core.download_plugins.torrent import _pick_best_album_release
from core.download_plugins.album_bundle import pick_best_album_release
from core.download_plugins.torrent import _guess_quality_from_title
small_a = _make_torrent_result(title='X [MP3]', size=8_000_000, seeders=5)
small_b = _make_torrent_result(title='Y [MP3]', size=9_000_000, seeders=80, guid='guid-2')
picked = _pick_best_album_release([small_a, small_b])
picked = pick_best_album_release([small_a, small_b], _guess_quality_from_title)
assert picked is small_b
def test_unique_staging_path_handles_collision(tmp_path: Path) -> None:
from core.download_plugins.torrent import _unique_staging_path
from core.download_plugins.album_bundle import unique_staging_path
src = tmp_path / 'src' / 'track.flac'
src.parent.mkdir()
src.write_bytes(b'fLaC')
dest_dir = tmp_path / 'staging'
dest_dir.mkdir()
# First call returns the natural name.
first = _unique_staging_path(dest_dir, src)
first = unique_staging_path(dest_dir, src)
assert first == dest_dir / 'track.flac'
first.write_bytes(b'fLaC')
# Second call picks a non-colliding suffix.
second = _unique_staging_path(dest_dir, src)
second = unique_staging_path(dest_dir, src)
assert second == dest_dir / 'track_1.flac'

Loading…
Cancel
Save