mirror of https://github.com/Nezreka/SoulSync.git
Per code review: ~90 lines of inline gate logic in ``run_full_missing_tracks_process`` was inflating an already-580- line worker function and was non-testable in isolation. Lifted to ``core/downloads/album_bundle_dispatch.py`` with two entry points: - ``is_eligible(mode, is_album, album_name, artist_name)`` — pure predicate, no side effects, easy to assert against. Splits the gate decision from the resolution + run step so tests can pin the gate semantics without standing up a plugin. - ``try_dispatch(...)`` — full flow. Returns True iff the master worker should stop (gate fired + failed); False = engaged-and- succeeded OR didn't engage, both fall through to per-track. State access is now decoupled from ``runtime_state``: - New ``BatchStateAccess`` Protocol with two methods (``update_fields``, ``mark_failed``). - Concrete impl ``_BatchStateAccessImpl`` lives in master.py and wraps the tasks_lock + dict ops the original inline code did. - Injected via parameter so the dispatch module never imports ``download_batches`` / ``tasks_lock`` directly. Same goes for the plugin resolver and config getter — both injected, so the dispatcher works against any orchestrator / config implementation (including the in-test fakes). Behavior unchanged. The master worker call site is now 11 lines of boilerplate instead of 90 lines of inline conditional. Plugin contract (``download_album_to_staging`` return dict shape) unchanged. - core/downloads/album_bundle_dispatch.py: new module owning the gate + execution. ~150 lines including docstrings and the Protocol definition. - core/downloads/master.py: gate call site shrunk to a single ``if _album_bundle_dispatch.try_dispatch(...): return``. New ``_BatchStateAccessImpl`` class implements the Protocol against the existing ``download_batches`` dict + ``tasks_lock`` so the dispatcher gets injected access instead of importing them. - tests/test_album_bundle_dispatch.py: 16 new tests covering the pure predicate (album-required, mode allowlist, name validation, case insensitivity), the resolver-failure fall-through (plugin missing, plugin lacks method, resolver raises), the success path returning False so per-track flows, the failure path returning True with state.mark_failed called, plugin-raise treated as a normal failure, whitespace stripping on names, and progress-callback mirroring lifecycle events into batch state.pull/671/head
parent
670a2db95e
commit
ad59bf05a1
@ -0,0 +1,179 @@
|
||||
"""Album-bundle dispatch for torrent / usenet single-source downloads.
|
||||
|
||||
Lifted from ``run_full_missing_tracks_process`` so the master
|
||||
worker doesn't carry a 90-line inline branch and so the gate logic
|
||||
can be unit-tested in isolation.
|
||||
|
||||
The gate fires only when ALL conditions hold:
|
||||
|
||||
- Batch is an album-context download (``is_album_download`` flag).
|
||||
- Active download source is ``torrent`` or ``usenet`` (single-source
|
||||
mode — hybrid stays per-track to preserve fallback).
|
||||
- Both album-name and artist-name are populated in batch context.
|
||||
- The resolved plugin exposes ``download_album_to_staging``.
|
||||
|
||||
When the gate engages it runs the plugin synchronously (the master
|
||||
worker is already on a thread-pool executor) and mirrors the
|
||||
plugin's lifecycle payloads into the batch state so the Downloads
|
||||
page can render meaningful progress before per-track tasks exist.
|
||||
|
||||
Return semantics: ``True`` means the gate handled the batch — the
|
||||
master worker should stop and not run per-track analysis. ``False``
|
||||
means the gate didn't engage (or engaged-and-fell-back) — caller
|
||||
continues the normal per-track flow.
|
||||
|
||||
The ``BatchStateAccess`` Protocol exists so this module doesn't
|
||||
import ``download_batches`` from runtime_state directly. The
|
||||
caller (master worker) injects accessors so this module stays
|
||||
testable without touching live runtime state.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any, Callable, Optional, Protocol
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BatchStateAccess(Protocol):
|
||||
"""Narrow shim around the batch-state dict ops the dispatch needs.
|
||||
|
||||
Two methods to keep the surface small:
|
||||
- ``update_fields(batch_id, fields)`` — atomic merge into the
|
||||
batch dict under tasks_lock.
|
||||
- ``mark_failed(batch_id, error)`` — convenience for the failure
|
||||
path (sets phase + error + album_bundle_state in one shot).
|
||||
"""
|
||||
|
||||
def update_fields(self, batch_id: str, fields: dict) -> None: ...
|
||||
|
||||
def mark_failed(self, batch_id: str, error: str) -> None: ...
|
||||
|
||||
|
||||
# Fields the album-bundle progress callback may carry. Anything in
|
||||
# this set gets mirrored onto the batch row as ``album_bundle_<key>``
|
||||
# so the Downloads page can render it without coupling to the
|
||||
# specific payload shape.
|
||||
_MIRRORED_KEYS = ('progress', 'release', 'speed', 'downloaded',
|
||||
'size', 'seeders', 'grabs', 'count')
|
||||
|
||||
|
||||
def is_eligible(
|
||||
*,
|
||||
mode: str,
|
||||
is_album: bool,
|
||||
album_name: str,
|
||||
artist_name: str,
|
||||
) -> bool:
|
||||
"""Pure predicate: does this batch even qualify for the album
|
||||
flow? Separate from the resolution+run step so tests can pin
|
||||
the gate logic without standing up a plugin."""
|
||||
if not is_album:
|
||||
return False
|
||||
if (mode or '').lower() not in ('torrent', 'usenet'):
|
||||
return False
|
||||
if not (album_name or '').strip():
|
||||
return False
|
||||
if not (artist_name or '').strip():
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def try_dispatch(
|
||||
*,
|
||||
batch_id: str,
|
||||
is_album: bool,
|
||||
album_context: Optional[dict],
|
||||
artist_context: Optional[dict],
|
||||
config_get: Callable[..., Any],
|
||||
plugin_resolver: Callable[[str], Optional[Any]],
|
||||
state: BatchStateAccess,
|
||||
) -> bool:
|
||||
"""Attempt the album-bundle flow. Returns ``True`` iff the
|
||||
master worker should return early (gate engaged and completed
|
||||
— success OR failure). ``False`` means fall through to the
|
||||
normal per-track flow.
|
||||
|
||||
``config_get`` is a callable shaped like ``config_manager.get``;
|
||||
``plugin_resolver`` resolves a source-name string to an
|
||||
initialised plugin instance (or None); ``state`` is the
|
||||
BatchStateAccess shim. Injecting these keeps the module
|
||||
dependency-light + unit-testable.
|
||||
"""
|
||||
mode = (config_get('download_source.mode', 'soulseek') or 'soulseek').lower()
|
||||
album_name = (album_context or {}).get('name') or ''
|
||||
artist_name = (artist_context or {}).get('name') or ''
|
||||
|
||||
if not is_eligible(mode=mode, is_album=is_album,
|
||||
album_name=album_name, artist_name=artist_name):
|
||||
return False
|
||||
|
||||
album_name = album_name.strip()
|
||||
artist_name = artist_name.strip()
|
||||
|
||||
plugin = None
|
||||
try:
|
||||
plugin = plugin_resolver(mode)
|
||||
except Exception as exc:
|
||||
logger.warning("[Album Bundle] Could not resolve %s plugin: %s", mode, exc)
|
||||
|
||||
if plugin is None or not hasattr(plugin, 'download_album_to_staging'):
|
||||
logger.warning(
|
||||
"[Album Bundle] Gate matched but plugin / context unavailable "
|
||||
"(mode=%s album=%r artist=%r plugin=%s) — falling back to per-track flow",
|
||||
mode, album_name, artist_name,
|
||||
type(plugin).__name__ if plugin else None,
|
||||
)
|
||||
return False
|
||||
|
||||
staging_dir = config_get('import.staging_path', './Staging') or './Staging'
|
||||
logger.info(
|
||||
"[Album Bundle] Engaging %s album flow for '%s' by '%s' -> %s",
|
||||
mode, album_name, artist_name, staging_dir,
|
||||
)
|
||||
state.update_fields(batch_id, {
|
||||
'phase': 'album_downloading',
|
||||
'album_bundle_state': 'searching',
|
||||
'album_bundle_source': mode,
|
||||
})
|
||||
|
||||
def _emit(payload):
|
||||
"""Mirror plugin lifecycle into batch state for UI rendering."""
|
||||
try:
|
||||
fields = {'album_bundle_state': payload.get('state', '')}
|
||||
for key in _MIRRORED_KEYS:
|
||||
if key in payload:
|
||||
fields[f'album_bundle_{key}'] = payload[key]
|
||||
state.update_fields(batch_id, fields)
|
||||
except Exception as exc:
|
||||
logger.debug("[Album Bundle] emit failed: %s", exc)
|
||||
|
||||
try:
|
||||
outcome = plugin.download_album_to_staging(
|
||||
album_name, artist_name, staging_dir, _emit,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.exception("[Album Bundle] %s plugin raised: %s", mode, exc)
|
||||
outcome = {'success': False, 'error': f'Plugin error: {exc}'}
|
||||
|
||||
if not outcome.get('success'):
|
||||
err = outcome.get('error', 'Album bundle download failed')
|
||||
logger.error("[Album Bundle] %s flow failed for '%s': %s",
|
||||
mode, album_name, err)
|
||||
state.mark_failed(batch_id, err)
|
||||
return True
|
||||
|
||||
logger.info(
|
||||
"[Album Bundle] %s staged %d files for '%s' — handing off to per-track staging matcher",
|
||||
mode, len(outcome.get('files', [])), album_name,
|
||||
)
|
||||
state.update_fields(batch_id, {
|
||||
'phase': 'analysis',
|
||||
'album_bundle_state': 'staged',
|
||||
})
|
||||
# Engaged-and-succeeded: we DON'T early-return because the
|
||||
# per-track flow needs to run to create + complete the per-track
|
||||
# task rows. Those tasks will hit try_staging_match and pull the
|
||||
# files we just staged.
|
||||
return False
|
||||
@ -0,0 +1,300 @@
|
||||
"""Tests for ``core/downloads/album_bundle_dispatch.py``.
|
||||
|
||||
Pins the gate predicate, the resolution + run flow, and the
|
||||
fail / fall-through return contract. Mocks the config, plugin
|
||||
resolver, and state access so the dispatcher is testable without
|
||||
standing up runtime_state or a real plugin.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from core.downloads.album_bundle_dispatch import (
|
||||
BatchStateAccess,
|
||||
is_eligible,
|
||||
try_dispatch,
|
||||
)
|
||||
|
||||
|
||||
class _FakeState:
|
||||
"""In-memory ``BatchStateAccess`` for tests — records every
|
||||
update so assertions can check the sequence of fields set."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.fields: dict = {}
|
||||
self.update_calls: list = []
|
||||
self.failed_with: str = ''
|
||||
|
||||
def update_fields(self, batch_id: str, fields: dict) -> None:
|
||||
self.update_calls.append((batch_id, dict(fields)))
|
||||
self.fields.update(fields)
|
||||
|
||||
def mark_failed(self, batch_id: str, error: str) -> None:
|
||||
self.failed_with = error
|
||||
self.fields['phase'] = 'failed'
|
||||
self.fields['error'] = error
|
||||
self.fields['album_bundle_state'] = 'failed'
|
||||
|
||||
|
||||
def _config(values: dict):
|
||||
"""Build a config_get callable from a flat dict."""
|
||||
def _get(key, default=None):
|
||||
return values.get(key, default)
|
||||
return _get
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# is_eligible pure predicate
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_is_eligible_requires_album_flag() -> None:
|
||||
assert is_eligible(mode='torrent', is_album=False,
|
||||
album_name='X', artist_name='Y') is False
|
||||
|
||||
|
||||
def test_is_eligible_requires_torrent_or_usenet_mode() -> None:
|
||||
for mode in ('soulseek', 'youtube', 'tidal', 'qobuz', 'hifi',
|
||||
'deezer_dl', 'amazon', 'lidarr', 'soundcloud', 'hybrid'):
|
||||
assert is_eligible(mode=mode, is_album=True,
|
||||
album_name='X', artist_name='Y') is False
|
||||
|
||||
|
||||
def test_is_eligible_accepts_torrent_and_usenet() -> None:
|
||||
assert is_eligible(mode='torrent', is_album=True,
|
||||
album_name='X', artist_name='Y') is True
|
||||
assert is_eligible(mode='usenet', is_album=True,
|
||||
album_name='X', artist_name='Y') is True
|
||||
|
||||
|
||||
def test_is_eligible_requires_non_empty_names() -> None:
|
||||
assert is_eligible(mode='torrent', is_album=True,
|
||||
album_name='', artist_name='Y') is False
|
||||
assert is_eligible(mode='torrent', is_album=True,
|
||||
album_name='X', artist_name='') is False
|
||||
assert is_eligible(mode='torrent', is_album=True,
|
||||
album_name=' ', artist_name='Y') is False
|
||||
|
||||
|
||||
def test_is_eligible_case_insensitive_mode() -> None:
|
||||
assert is_eligible(mode='TORRENT', is_album=True,
|
||||
album_name='X', artist_name='Y') is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# try_dispatch — gate evaluation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_dispatch_returns_false_when_not_album() -> None:
|
||||
state = _FakeState()
|
||||
plugin = MagicMock()
|
||||
result = try_dispatch(
|
||||
batch_id='b1', is_album=False,
|
||||
album_context={'name': 'X'}, artist_context={'name': 'Y'},
|
||||
config_get=_config({'download_source.mode': 'torrent'}),
|
||||
plugin_resolver=lambda _name: plugin, state=state,
|
||||
)
|
||||
assert result is False
|
||||
assert state.update_calls == []
|
||||
plugin.download_album_to_staging.assert_not_called()
|
||||
|
||||
|
||||
def test_dispatch_returns_false_for_non_torrent_modes() -> None:
|
||||
state = _FakeState()
|
||||
plugin = MagicMock()
|
||||
result = try_dispatch(
|
||||
batch_id='b1', is_album=True,
|
||||
album_context={'name': 'X'}, artist_context={'name': 'Y'},
|
||||
config_get=_config({'download_source.mode': 'soulseek'}),
|
||||
plugin_resolver=lambda _name: plugin, state=state,
|
||||
)
|
||||
assert result is False
|
||||
assert state.update_calls == []
|
||||
|
||||
|
||||
def test_dispatch_returns_false_when_plugin_missing() -> None:
|
||||
"""No plugin available → fall through to per-track flow with a
|
||||
warning. The state SHOULD NOT have been touched."""
|
||||
state = _FakeState()
|
||||
result = try_dispatch(
|
||||
batch_id='b1', is_album=True,
|
||||
album_context={'name': 'X'}, artist_context={'name': 'Y'},
|
||||
config_get=_config({'download_source.mode': 'torrent'}),
|
||||
plugin_resolver=lambda _name: None, state=state,
|
||||
)
|
||||
assert result is False
|
||||
assert state.update_calls == []
|
||||
|
||||
|
||||
def test_dispatch_returns_false_when_plugin_lacks_method() -> None:
|
||||
state = _FakeState()
|
||||
# Plugin that doesn't implement download_album_to_staging.
|
||||
class _LegacyPlugin:
|
||||
pass
|
||||
result = try_dispatch(
|
||||
batch_id='b1', is_album=True,
|
||||
album_context={'name': 'X'}, artist_context={'name': 'Y'},
|
||||
config_get=_config({'download_source.mode': 'torrent'}),
|
||||
plugin_resolver=lambda _name: _LegacyPlugin(), state=state,
|
||||
)
|
||||
assert result is False
|
||||
assert state.update_calls == []
|
||||
|
||||
|
||||
def test_dispatch_returns_false_when_resolver_raises() -> None:
|
||||
"""Plugin resolution can fail (registry not initialised); we log
|
||||
and fall through rather than crashing the master worker."""
|
||||
state = _FakeState()
|
||||
def _boom(_name):
|
||||
raise RuntimeError("registry not initialised")
|
||||
result = try_dispatch(
|
||||
batch_id='b1', is_album=True,
|
||||
album_context={'name': 'X'}, artist_context={'name': 'Y'},
|
||||
config_get=_config({'download_source.mode': 'torrent'}),
|
||||
plugin_resolver=_boom, state=state,
|
||||
)
|
||||
assert result is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# try_dispatch — success / failure paths
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_dispatch_success_returns_false_so_per_track_can_run() -> None:
|
||||
"""Success → master worker should CONTINUE to per-track flow so
|
||||
each task can hit try_staging_match and find its file."""
|
||||
state = _FakeState()
|
||||
plugin = MagicMock()
|
||||
plugin.download_album_to_staging.return_value = {
|
||||
'success': True, 'files': ['/tmp/a.flac', '/tmp/b.flac'], 'error': None,
|
||||
}
|
||||
result = try_dispatch(
|
||||
batch_id='b1', is_album=True,
|
||||
album_context={'name': 'GNX'}, artist_context={'name': 'Kendrick Lamar'},
|
||||
config_get=_config({
|
||||
'download_source.mode': 'torrent',
|
||||
'import.staging_path': '/staging/path',
|
||||
}),
|
||||
plugin_resolver=lambda _name: plugin, state=state,
|
||||
)
|
||||
assert result is False
|
||||
# Plugin was called with the right args.
|
||||
args = plugin.download_album_to_staging.call_args
|
||||
assert args.args[0] == 'GNX'
|
||||
assert args.args[1] == 'Kendrick Lamar'
|
||||
assert args.args[2] == '/staging/path'
|
||||
# Phase transitioned through searching → analysis.
|
||||
assert state.fields['phase'] == 'analysis'
|
||||
assert state.fields['album_bundle_state'] == 'staged'
|
||||
assert state.fields['album_bundle_source'] == 'torrent'
|
||||
assert state.failed_with == ''
|
||||
|
||||
|
||||
def test_dispatch_failure_returns_true_so_master_stops() -> None:
|
||||
state = _FakeState()
|
||||
plugin = MagicMock()
|
||||
plugin.download_album_to_staging.return_value = {
|
||||
'success': False, 'files': [], 'error': 'No torrent results found',
|
||||
}
|
||||
result = try_dispatch(
|
||||
batch_id='b1', is_album=True,
|
||||
album_context={'name': 'GNX'}, artist_context={'name': 'Kendrick Lamar'},
|
||||
config_get=_config({'download_source.mode': 'torrent'}),
|
||||
plugin_resolver=lambda _name: plugin, state=state,
|
||||
)
|
||||
assert result is True
|
||||
assert state.failed_with == 'No torrent results found'
|
||||
assert state.fields['phase'] == 'failed'
|
||||
|
||||
|
||||
def test_dispatch_plugin_exception_treated_as_failure() -> None:
|
||||
"""A bug / network error in the plugin must not propagate into
|
||||
the master worker — caught + treated as a normal failure so
|
||||
the batch reports the error cleanly."""
|
||||
state = _FakeState()
|
||||
plugin = MagicMock()
|
||||
plugin.download_album_to_staging.side_effect = RuntimeError("network down")
|
||||
result = try_dispatch(
|
||||
batch_id='b1', is_album=True,
|
||||
album_context={'name': 'GNX'}, artist_context={'name': 'Kendrick Lamar'},
|
||||
config_get=_config({'download_source.mode': 'torrent'}),
|
||||
plugin_resolver=lambda _name: plugin, state=state,
|
||||
)
|
||||
assert result is True
|
||||
assert 'network down' in state.failed_with
|
||||
|
||||
|
||||
def test_dispatch_strips_whitespace_from_names() -> None:
|
||||
"""Trailing whitespace in batch context shouldn't fail the
|
||||
eligibility predicate AND should be cleaned before passing to
|
||||
the plugin."""
|
||||
state = _FakeState()
|
||||
plugin = MagicMock()
|
||||
plugin.download_album_to_staging.return_value = {'success': True, 'files': ['/x']}
|
||||
try_dispatch(
|
||||
batch_id='b1', is_album=True,
|
||||
album_context={'name': ' GNX '}, artist_context={'name': ' Kendrick '},
|
||||
config_get=_config({'download_source.mode': 'torrent'}),
|
||||
plugin_resolver=lambda _name: plugin, state=state,
|
||||
)
|
||||
args = plugin.download_album_to_staging.call_args
|
||||
assert args.args[0] == 'GNX'
|
||||
assert args.args[1] == 'Kendrick'
|
||||
|
||||
|
||||
def test_dispatch_progress_callback_mirrors_payload_to_state() -> None:
|
||||
"""The progress callback the plugin gets must mirror its
|
||||
payload onto the batch state under ``album_bundle_*`` keys so
|
||||
the Downloads page can render progress while the torrent
|
||||
download runs."""
|
||||
state = _FakeState()
|
||||
captured_emit = {}
|
||||
|
||||
def _capture(album, artist, staging, emit):
|
||||
captured_emit['fn'] = emit
|
||||
emit({'state': 'searching', 'release': 'GNX [FLAC]'})
|
||||
emit({'state': 'downloading', 'progress': 0.42, 'speed': 1024 * 1024})
|
||||
emit({'state': 'staged', 'count': 12})
|
||||
return {'success': True, 'files': []}
|
||||
|
||||
plugin = MagicMock()
|
||||
plugin.download_album_to_staging.side_effect = _capture
|
||||
try_dispatch(
|
||||
batch_id='b1', is_album=True,
|
||||
album_context={'name': 'GNX'}, artist_context={'name': 'Kendrick Lamar'},
|
||||
config_get=_config({'download_source.mode': 'torrent'}),
|
||||
plugin_resolver=lambda _name: plugin, state=state,
|
||||
)
|
||||
# State should have seen each of the three lifecycle emissions.
|
||||
states_seen = [fields.get('album_bundle_state')
|
||||
for _, fields in state.update_calls
|
||||
if 'album_bundle_state' in fields]
|
||||
assert 'searching' in states_seen
|
||||
assert 'downloading' in states_seen
|
||||
assert 'staged' in states_seen
|
||||
# Numeric progress + release name made it through.
|
||||
assert state.fields['album_bundle_release'] == 'GNX [FLAC]'
|
||||
assert state.fields['album_bundle_progress'] == 0.42
|
||||
assert state.fields['album_bundle_count'] == 12
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Protocol conformance — runtime impl must satisfy the contract
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_runtime_state_impl_matches_protocol() -> None:
|
||||
"""Sanity check that the concrete BatchStateAccess impl in
|
||||
master.py implements both methods. We don't import master.py
|
||||
here (would pull in heavy deps); duck-check on the _FakeState
|
||||
instead since it's a sibling impl of the same Protocol."""
|
||||
state: BatchStateAccess = _FakeState()
|
||||
state.update_fields('b1', {'x': 1})
|
||||
state.mark_failed('b1', 'oops')
|
||||
assert state.fields['x'] == 1
|
||||
assert state.fields['error'] == 'oops'
|
||||
Loading…
Reference in new issue