Add Soulseek album bundle downloads

Route primary Soulseek album downloads through the album-bundle staging flow, reusing preflight-selected folders when available. In hybrid mode, only the first configured source can claim whole-album bundle behavior so later Soulseek fallback keeps the existing per-track/source-reuse path.

Allow Soulseek album bundles to stage completed tracks when some same-source transfers fail or time out, and keep partial bundles from blocking per-track fallback. Add coverage for dispatcher gating, master flow ordering, task-worker staged-miss behavior, and Soulseek bundle polling.
pull/685/head
Broque Thomas 2 days ago
parent e6c4cc3d87
commit 6c226613bf

@ -7,8 +7,9 @@ can be unit-tested in isolation.
The gate fires only when ALL conditions hold:
- Batch is an album-context download (``is_album_download`` flag).
- Active download source is ``torrent`` or ``usenet`` (single-source
mode hybrid stays per-track to preserve fallback).
- Active download source is ``torrent``, ``usenet``, or ``soulseek``.
In hybrid mode the caller may pass the first configured source as a
source override; later hybrid sources stay per-track to preserve fallback.
- Both album-name and artist-name are populated in batch context.
- The resolved plugin exposes ``download_album_to_staging``.
@ -57,7 +58,7 @@ class BatchStateAccess(Protocol):
# so the Downloads page can render it without coupling to the
# specific payload shape.
_MIRRORED_KEYS = ('progress', 'release', 'speed', 'downloaded',
'size', 'seeders', 'grabs', 'count')
'size', 'seeders', 'grabs', 'count', 'failed')
def is_eligible(
@ -72,7 +73,7 @@ def is_eligible(
the gate logic without standing up a plugin."""
if not is_album:
return False
if (mode or '').lower() not in ('torrent', 'usenet'):
if (mode or '').lower() not in ('torrent', 'usenet', 'soulseek'):
return False
if not (album_name or '').strip():
return False
@ -90,6 +91,8 @@ def try_dispatch(
config_get: Callable[..., Any],
plugin_resolver: Callable[[str], Optional[Any]],
state: BatchStateAccess,
source_override: Optional[str] = None,
plugin_kwargs: Optional[dict] = None,
) -> bool:
"""Attempt the album-bundle flow. Returns ``True`` iff the
master worker should return early (gate engaged and completed
@ -102,7 +105,7 @@ def try_dispatch(
BatchStateAccess shim. Injecting these keeps the module
dependency-light + unit-testable.
"""
mode = (config_get('download_source.mode', 'soulseek') or 'soulseek').lower()
mode = (source_override or config_get('download_source.mode', 'soulseek') or 'soulseek').lower()
album_name = (album_context or {}).get('name') or ''
artist_name = (artist_context or {}).get('name') or ''
@ -159,6 +162,7 @@ def try_dispatch(
try:
outcome = plugin.download_album_to_staging(
album_name, artist_name, staging_dir, _emit,
**(plugin_kwargs or {}),
)
except Exception as exc:
logger.exception("[Album Bundle] %s plugin raised: %s", mode, exc)
@ -166,6 +170,17 @@ def try_dispatch(
if not outcome.get('success'):
err = outcome.get('error', 'Album bundle download failed')
if outcome.get('fallback'):
logger.warning(
"[Album Bundle] %s flow could not commit for '%s': %s — falling back to per-track flow",
mode, album_name, err,
)
state.update_fields(batch_id, {
'phase': 'analysis',
'album_bundle_state': 'fallback',
'album_bundle_error': err,
})
return False
logger.error("[Album Bundle] %s flow failed for '%s': %s",
mode, album_name, err)
state.mark_failed(batch_id, err)
@ -178,6 +193,9 @@ def try_dispatch(
state.update_fields(batch_id, {
'phase': 'analysis',
'album_bundle_state': 'staged',
'album_bundle_partial': bool(outcome.get('partial')),
'album_bundle_expected_count': outcome.get('expected_count'),
'album_bundle_completed_count': outcome.get('completed_count', len(outcome.get('files', []))),
})
# Engaged-and-succeeded: we DON'T early-return because the
# per-track flow needs to run to create + complete the per-track

@ -50,6 +50,7 @@ _VARIANT_WORDS = {
'remix', 'rmx', 'acapella', 'a cappella', 'instrumental', 'karaoke',
'live', 'demo', 'extended',
}
_ALBUM_BUNDLE_SOURCES = frozenset(('torrent', 'usenet', 'soulseek'))
def _norm_text(value: Any) -> str:
@ -245,6 +246,29 @@ def _soulseek_album_preflight_enabled(config_manager: Any) -> bool:
return primary == 'soulseek'
def _resolve_album_bundle_source(config_manager: Any) -> str:
    """Pick which download source, if any, may own the whole album.

    A single-source configuration yields that source when it is one of
    the album-bundle capable sources. A hybrid configuration yields only
    the leading entry of the hybrid order (or the configured hybrid
    primary when the order is empty), and only when that entry is
    bundle capable. Every other case yields an empty string, meaning
    the batch stays on the per-track flow.
    """
    configured_mode = config_manager.get('download_source.mode', 'soulseek') or 'soulseek'
    active = configured_mode.lower()

    if active == 'hybrid':
        hybrid_order = config_manager.get(
            'download_source.hybrid_order', ['hifi', 'youtube', 'soulseek'])
        if hybrid_order:
            leader = hybrid_order[0]
        else:
            # Only consulted when no order is configured at all.
            leader = config_manager.get('download_source.hybrid_primary', '')
        candidate = str(leader or '').lower()
    else:
        candidate = active

    if candidate in _ALBUM_BUNDLE_SOURCES:
        return candidate
    return ''
@dataclass
class MasterDeps:
"""Bundle of cross-cutting deps the master worker needs."""
@ -338,16 +362,19 @@ def run_full_missing_tracks_process(batch_id, playlist_id, tracks_json, deps: Ma
# should stop (gate fired and failed); False = engaged-and-
# succeeded OR didn't engage, both fall through to per-track.
_bundle_state = _BatchStateAccessImpl()
if _album_bundle_dispatch.try_dispatch(
batch_id=batch_id,
is_album=batch_is_album,
album_context=batch_album_context,
artist_context=batch_artist_context,
config_get=deps.config_manager.get,
plugin_resolver=deps.download_orchestrator.client,
state=_bundle_state,
):
return
_album_bundle_source = _resolve_album_bundle_source(deps.config_manager)
if _album_bundle_source and _album_bundle_source != 'soulseek':
if _album_bundle_dispatch.try_dispatch(
batch_id=batch_id,
is_album=batch_is_album,
album_context=batch_album_context,
artist_context=batch_artist_context,
config_get=deps.config_manager.get,
plugin_resolver=deps.download_orchestrator.client,
state=_bundle_state,
source_override=_album_bundle_source,
):
return
# Allow duplicate tracks across albums — when enabled, only skip tracks already
# owned in THIS album, not tracks owned in other albums
@ -640,6 +667,7 @@ def run_full_missing_tracks_process(batch_id, playlist_id, tracks_json, deps: Ma
batch_album_context = batch.get('album_context')
batch_artist_context = batch.get('artist_context')
batch_is_album = batch.get('is_album_download', False)
batch_private_album_bundle = bool(batch.get('album_bundle_private_staging'))
batch_playlist_folder_mode = batch.get('playlist_folder_mode', False)
batch_playlist_name = batch.get('playlist_name', 'Unknown Playlist')
@ -648,7 +676,8 @@ def run_full_missing_tracks_process(batch_id, playlist_id, tracks_json, deps: Ma
preflight_source = None
preflight_tracks = None
soulseek_is_source = _soulseek_album_preflight_enabled(deps.config_manager)
if batch_is_album and batch_album_context and batch_artist_context and soulseek_is_source:
if (batch_is_album and batch_album_context and batch_artist_context
and soulseek_is_source and not batch_private_album_bundle):
artist_name = batch_artist_context.get('name', '')
album_name = batch_album_context.get('name', '')
if artist_name and album_name:
@ -761,13 +790,44 @@ def run_full_missing_tracks_process(batch_id, playlist_id, tracks_json, deps: Ma
logger.error(f"[Album Pre-flight] Search failed (non-fatal, falling back to track-by-track): {preflight_err}")
deps.source_reuse_logger.info(f"[Album Pre-flight] Exception: {preflight_err}")
# Soulseek album bundles run after analysis so an already-owned
# album does not get downloaded just because the source supports a
# whole-folder flow. When preflight selected a folder, pass that
# exact source into the bundle downloader so we keep the richer
# tracklist-aware scoring instead of doing a weaker second pick.
_bundle_state = _BatchStateAccessImpl()
_album_bundle_source = _resolve_album_bundle_source(deps.config_manager)
if _album_bundle_source == 'soulseek':
if _album_bundle_dispatch.try_dispatch(
batch_id=batch_id,
is_album=batch_is_album,
album_context=batch_album_context,
artist_context=batch_artist_context,
config_get=deps.config_manager.get,
plugin_resolver=deps.download_orchestrator.client,
state=_bundle_state,
source_override=_album_bundle_source,
plugin_kwargs={
'preferred_source': preflight_source,
'preferred_tracks': preflight_tracks,
} if preflight_source and preflight_tracks else None,
):
return
with tasks_lock:
if batch_id not in download_batches: return
download_batches[batch_id]['phase'] = 'downloading'
# Store album pre-flight results on batch for source reuse
if preflight_source and preflight_tracks:
# unless the Soulseek album-bundle path already staged a private
# release. Task workers check source reuse before staging match, so
# preloading here would make the staged happy path re-download.
if (
preflight_source
and preflight_tracks
and not download_batches[batch_id].get('album_bundle_private_staging')
):
download_batches[batch_id]['last_good_source'] = preflight_source
download_batches[batch_id]['source_folder_tracks'] = preflight_tracks
download_batches[batch_id]['failed_sources'] = set()

@ -34,7 +34,7 @@ logger = logging.getLogger(__name__)
def _private_album_bundle_staging_miss_reason(batch_id: Optional[str], deps: Any) -> Optional[str]:
"""Return a user-facing miss reason when per-track search should stop.
Torrent / usenet album batches first download one private staged release,
Torrent / usenet / Soulseek album batches first download one private staged release,
then each track claims the matching staged file. If that claim fails after
the release is already staged, falling through to the normal per-track
search only retries release-level sources N times and can keep re-adding
@ -49,11 +49,19 @@ def _private_album_bundle_staging_miss_reason(batch_id: Optional[str], deps: Any
source = (batch.get('album_bundle_source') or '').lower()
mode = (getattr(deps.download_orchestrator, 'mode', '') or '').lower()
hybrid_first = ''
if mode == 'hybrid':
order = getattr(deps.download_orchestrator, 'hybrid_order', None) or []
if order:
hybrid_first = str(order[0] or '').lower()
else:
hybrid_first = str(getattr(deps.download_orchestrator, 'hybrid_primary', '') or '').lower()
if (
batch.get('album_bundle_private_staging')
and batch.get('album_bundle_state') == 'staged'
and source in ('torrent', 'usenet')
and mode == source
and not batch.get('album_bundle_partial')
and source in ('torrent', 'usenet', 'soulseek')
and (mode == source or (mode == 'hybrid' and hybrid_first == source))
):
return f'Track was not found in the staged {source} album release'

@ -18,7 +18,13 @@ from core.download_plugins.types import (
SearchResult,
TrackResult,
)
from core.download_plugins.album_bundle import (
copy_audio_files_atomically,
get_poll_interval,
get_poll_timeout,
)
from core.download_plugins.base import DownloadSourcePlugin
from utils.async_helpers import run_async
logger = get_logger("soulseek_client")
@ -1456,6 +1462,326 @@ class SoulseekClient(DownloadSourcePlugin):
logger.info(f"Downloading: {best_result.filename} ({quality_info}) from {best_result.username}")
return await self.download(best_result.username, best_result.filename, best_result.size)
def download_album_to_staging(
    self,
    album_name: str,
    artist_name: str,
    staging_dir: str,
    progress_callback=None,
    *,
    preferred_source: Optional[Dict[str, Any]] = None,
    preferred_tracks: Optional[List[TrackResult]] = None,
) -> Dict[str, Any]:
    """One-shot Soulseek album download.

    Search for one album folder (or reuse the caller-supplied one),
    enqueue files from that single ``username + folder_path``, wait for
    the transfers to finish, then copy completed files into the private
    album-bundle staging directory.

    Args:
        album_name: Album title used for the search query and scoring.
        artist_name: Artist name used for the search query and scoring.
        staging_dir: Directory the completed files are copied into.
        progress_callback: Optional callable receiving ``{'state': ..., ...}``
            dicts; exceptions it raises are logged and ignored.
        preferred_source: Optional dict with ``username`` and
            ``folder_path`` selecting the folder to download from.
        preferred_tracks: Optional track list belonging to
            ``preferred_source``. It is only honoured when
            ``preferred_source`` names both a user and a folder.

    Returns:
        A dict with ``success``, ``files``, ``error``, ``fallback`` and
        ``partial``. ``fallback`` stays ``True`` until at least one file
        has been enqueued, signalling that callers may still use the
        per-track flow. On success ``expected_count`` and
        ``completed_count`` are added; ``partial`` is ``True`` whenever
        fewer files were staged than the selected folder contained,
        including files that could not be enqueued at all.
    """
    result: Dict[str, Any] = {
        'success': False,
        'files': [],
        'error': None,
        'fallback': True,
        'partial': False,
    }

    if not self.is_configured():
        result['error'] = 'Soulseek source not configured'
        return result

    def _emit(state: str, **extra) -> None:
        # Progress reporting is best-effort; never let it break the download.
        if progress_callback:
            try:
                progress_callback({'state': state, **extra})
            except Exception as cb_exc:
                logger.debug("[Soulseek album] progress callback failed: %s", cb_exc)

    picked = None
    source_hint = preferred_source or {}
    username = source_hint.get('username', '') or ''
    folder_path = source_hint.get('folder_path', '') or ''

    if username and folder_path:
        folder_tracks = list(preferred_tracks or [])
        logger.info(
            "[Soulseek album] Using preflight-selected folder %s:%s",
            username,
            folder_path,
        )
        _emit('searching', query=f"{artist_name} {album_name}".strip(), release=folder_path)
    else:
        # The preferred tracks belong to a folder we are not using, so
        # they must not be enqueued against whatever the search picks.
        folder_tracks = []
        query = f"{artist_name} {album_name}".strip()
        _emit('searching', query=query)
        try:
            _, albums = run_async(self.search(query, timeout=30))
        except Exception as exc:
            result['error'] = f'Soulseek album search failed: {exc}'
            return result

        if not albums:
            result['error'] = 'No complete Soulseek album folders found'
            return result

        picked = self._pick_album_bundle_folder(albums, album_name, artist_name)
        if picked is None:
            result['error'] = 'No suitable Soulseek album folder after filtering'
            return result

        folder_path = getattr(picked, 'album_path', '') or ''
        username = getattr(picked, 'username', '') or ''
        if not username or not folder_path:
            result['error'] = 'No suitable Soulseek album folder after filtering'
            return result

        logger.info(
            "[Soulseek album] Picked %s:%s (%s tracks, quality=%s)",
            username,
            folder_path,
            getattr(picked, 'track_count', 0),
            getattr(picked, 'dominant_quality', ''),
        )

    release_label = getattr(picked, 'album_title', folder_path) if picked else folder_path

    _emit(
        'queued',
        release=release_label,
        count=getattr(picked, 'track_count', 0) if picked else len(folder_tracks),
    )

    if not folder_tracks:
        try:
            browse_files = run_async(self.browse_user_directory(username, folder_path))
        except Exception as exc:
            result['error'] = f'Soulseek folder browse failed: {exc}'
            return result
        if not browse_files:
            result['error'] = 'Could not browse selected Soulseek album folder'
            return result
        folder_tracks = self.parse_browse_results_to_tracks(
            username,
            browse_files,
            directory=folder_path,
        )

    folder_tracks = self.filter_results_by_quality_preference(folder_tracks)
    if not folder_tracks:
        result['error'] = 'Selected Soulseek album folder contained no audio files'
        return result

    # Expected size of the bundle is the selected folder, not merely the
    # subset that slskd accepted; otherwise an enqueue failure would be
    # reported as a complete album and block per-track fallback.
    expected_total = len({(track.username, track.filename) for track in folder_tracks})

    transfer_keys: Dict[tuple, TrackResult] = {}
    _emit(
        'downloading',
        release=release_label,
        count=len(folder_tracks),
    )
    for track in folder_tracks:
        try:
            download_id = run_async(self.download(track.username, track.filename, track.size))
        except Exception as exc:
            logger.warning("[Soulseek album] Failed to enqueue %s: %s", track.filename, exc)
            continue
        if download_id:
            transfer_keys[(track.username, track.filename)] = track

    if not transfer_keys:
        result['error'] = 'No Soulseek album files could be enqueued'
        return result

    # Transfers are now queued in slskd; from here on a failure is a
    # real bundle failure rather than a "could not start" fallback.
    result['fallback'] = False
    completed = self._poll_album_bundle_downloads(transfer_keys, _emit)
    if not completed:
        result['error'] = 'Soulseek album download failed or timed out'
        return result

    _emit('staging', release=release_label)
    copied = copy_audio_files_atomically(completed, Path(staging_dir))
    if not copied:
        result['error'] = 'No Soulseek album files copied to staging'
        return result

    partial = len(copied) < expected_total
    if partial:
        logger.warning(
            "[Soulseek album] Staged partial album for '%s': %d/%d files",
            album_name,
            len(copied),
            expected_total,
        )
    else:
        logger.info("[Soulseek album] Staged %d files for '%s'", len(copied), album_name)
    _emit('staged', count=len(copied))
    result['success'] = True
    result['files'] = copied
    result['partial'] = partial
    result['expected_count'] = expected_total
    result['completed_count'] = len(copied)
    return result
def _pick_album_bundle_folder(
    self,
    albums: List[AlbumResult],
    album_name: str,
    artist_name: str,
) -> Optional[AlbumResult]:
    """Choose the best-scoring album folder, or ``None`` if none qualifies.

    Folders with no tracks left after the quality-preference filter are
    ignored. Each remaining folder gets a weighted score built from album
    and artist text similarity, a track-count bonus, the share of tracks
    surviving the quality filter, and the folder's ``quality_score``.
    The top folder is rejected when its score is under 0.58.
    """
    ranked = []
    for folder in albums:
        usable = self.filter_results_by_quality_preference(
            list(getattr(folder, 'tracks', []) or []))
        if not usable:
            continue

        # The folder path is folded into both haystacks so a bare
        # "Artist/Album" directory can match even with empty metadata.
        title_haystack = f"{getattr(folder, 'album_title', '')} {getattr(folder, 'album_path', '')}"
        artist_haystack = f"{getattr(folder, 'artist', '')} {getattr(folder, 'album_path', '')}"
        album_score = self._bundle_similarity(album_name, title_haystack)
        artist_score = self._bundle_similarity(artist_name, artist_haystack)

        advertised = int(getattr(folder, 'track_count', 0) or len(usable))
        count_score = 1.0 if advertised >= 3 else 0.35
        coverage = min(1.0, len(usable) / max(1, advertised))
        quality = float(getattr(folder, 'quality_score', 0.0) or 0.0)

        total = (
            album_score * 0.42
            + artist_score * 0.22
            + count_score * 0.12
            + coverage * 0.12
            + quality * 0.12
        )
        ranked.append((total, len(usable), folder))

    if not ranked:
        return None

    ordered = sorted(
        ranked,
        key=lambda row: (row[0], row[1], getattr(row[2], 'quality_score', 0.0)),
        reverse=True,
    )
    top_score, _, winner = ordered[0]
    if top_score < 0.58:
        logger.warning("[Soulseek album] Best folder score %.3f below threshold", top_score)
        return None
    return winner
@staticmethod
def _bundle_similarity(expected: Any, actual: Any) -> float:
import re
from difflib import SequenceMatcher
left = re.sub(r'[^a-z0-9]+', ' ', str(expected or '').lower()).strip()
right = re.sub(r'[^a-z0-9]+', ' ', str(actual or '').lower()).strip()
if not left or not right:
return 0.0
if left == right:
return 1.0
left_words = set(left.split())
right_words = set(right.split())
if left_words and left_words.issubset(right_words):
return 0.92
if right_words and right_words.issubset(left_words):
return 0.86
if left in right or right in left:
return min(len(left), len(right)) / max(len(left), len(right))
return SequenceMatcher(None, left, right).ratio()
def _poll_album_bundle_downloads(self, transfer_keys: Dict[tuple, TrackResult], emit) -> List[Path]:
    """Wait for the enqueued album transfers and return the local paths.

    Polls ``get_all_downloads`` until every transfer in ``transfer_keys``
    has either completed (and its file was found on disk) or reached a
    terminal failure state, or until the poll timeout elapses.

    Args:
        transfer_keys: Mapping of ``(username, remote filename)`` to the
            track that was enqueued under that key.
        emit: Callable ``emit(state, **extra)`` used for progress events.

    Returns:
        Local paths of the transfers that completed. The list may cover
        only part of the album when other transfers failed or were still
        pending at the deadline; it is empty when nothing completed.
    """
    # Terminal non-success markers. Cancelled / Aborted transfers are
    # reported as "Completed, <reason>", so they must be recognised
    # before the generic "Completed" check below.
    failure_tokens = ('Errored', 'Failed', 'Rejected', 'TimedOut', 'Cancelled', 'Aborted')

    def _basename(remote) -> str:
        return os.path.basename((remote or '').replace('\\', '/'))

    deadline = time.monotonic() + get_poll_timeout()
    interval = get_poll_interval()
    total = len(transfer_keys)
    completed_paths: Dict[tuple, Path] = {}
    failed_states: Dict[tuple, str] = {}

    while time.monotonic() < deadline:
        try:
            downloads = run_async(self.get_all_downloads())
        except Exception as exc:
            # A transient poll failure just means "no news this round".
            logger.warning("[Soulseek album] Poll error: %s", exc)
            downloads = []

        # Index by exact remote filename, with a basename alias as a
        # fallback for when the reported filename differs in its path.
        by_key = {}
        for dl in downloads:
            by_key[(dl.username, dl.filename)] = dl
            by_key.setdefault((dl.username, _basename(dl.filename)), dl)

        for key, track in transfer_keys.items():
            if key in completed_paths or key in failed_states:
                continue
            dl = by_key.get(key) or by_key.get((key[0], _basename(key[1])))
            state = (getattr(dl, 'state', '') or '') if dl else ''
            if any(token in state for token in failure_tokens):
                failed_states[key] = state or 'Failed'
                logger.warning(
                    "[Soulseek album] Transfer failed from selected folder: %s (%s)",
                    _basename(track.filename),
                    failed_states[key],
                )
                continue
            if dl and ('Completed' in state or 'Succeeded' in state):
                # Do not trust a "completed" state with a short byte count.
                if dl.size and dl.transferred and dl.transferred < dl.size:
                    continue
                path = self._resolve_downloaded_album_file(track.filename)
                if path:
                    completed_paths[key] = path
                else:
                    logger.debug(
                        "[Soulseek album] Transfer completed but local file not found yet: %s",
                        track.filename,
                    )

        emit(
            'downloading',
            progress=round(len(completed_paths) / max(1, total) * 100, 1),
            count=len(completed_paths),
            failed=len(failed_states),
        )

        # Clean success first, so it is not reported at warning level.
        if len(completed_paths) == total:
            return list(completed_paths.values())
        if completed_paths and len(completed_paths) + len(failed_states) == total:
            logger.warning(
                "[Soulseek album] Selected folder finished with %d completed and %d failed transfer(s)",
                len(completed_paths),
                len(failed_states),
            )
            return list(completed_paths.values())
        if not completed_paths and len(failed_states) == total:
            logger.warning("[Soulseek album] All %d transfer(s) failed from selected folder", len(failed_states))
            return []
        time.sleep(interval)

    pending = total - len(completed_paths) - len(failed_states)
    if completed_paths:
        logger.warning(
            "[Soulseek album] Timed out with partial album: %d completed, %d failed, %d pending",
            len(completed_paths),
            len(failed_states),
            pending,
        )
        return list(completed_paths.values())

    logger.error(
        "[Soulseek album] Timed out waiting for %d album files (%d failed, %d pending)",
        total,
        len(failed_states),
        pending,
    )
    return []
def _resolve_downloaded_album_file(self, remote_filename: str) -> Optional[Path]:
basename = os.path.basename((remote_filename or '').replace('\\', '/'))
if not basename:
return None
candidates = [
self.download_path / remote_filename,
self.download_path / basename,
]
normalized_parts = [p for p in remote_filename.replace('\\', '/').split('/') if p]
if normalized_parts:
candidates.append(self.download_path.joinpath(*normalized_parts))
for candidate in candidates:
try:
if candidate.exists() and candidate.is_file():
return candidate
except OSError:
continue
try:
matches = list(self.download_path.rglob(basename))
except OSError:
matches = []
for match in matches:
if match.is_file():
return match
return None
async def check_connection(self) -> bool:
"""Check if slskd is running and connected to the Soulseek network"""

@ -132,6 +132,37 @@ class _FakeSoulseekWrapper:
return self.soulseek if name == 'soulseek' else None
class _FakePluginWrapper:
def __init__(self, plugins):
self._plugins = dict(plugins)
def client(self, name):
return self._plugins.get(name)
class _FakeAlbumBundleSoulseek:
def __init__(self, outcome=None):
self.calls = []
self.outcome = outcome or {'success': True, 'files': ['/tmp/a.flac']}
def download_album_to_staging(self, album, artist, staging, emit, **kwargs):
self.calls.append((album, artist, staging, kwargs))
emit({'state': 'staged', 'count': len(self.outcome.get('files', []))})
return self.outcome
class _FakePreflightAlbumBundleSoulseek(_FakeSoulseek):
    """``_FakeSoulseek`` extended with a recording album-bundle entry point.

    Positional and keyword arguments other than ``outcome`` are forwarded
    to the base fake unchanged.
    """

    def __init__(self, *args, outcome=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.calls = []
        if outcome:
            self.outcome = outcome
        else:
            self.outcome = {'success': True, 'files': ['/tmp/a.flac']}

    def download_album_to_staging(self, album, artist, staging, emit, **kwargs):
        """Log the call, emit one ``staged`` event, return the outcome."""
        self.calls.append((album, artist, staging, kwargs))
        staged_files = self.outcome.get('files', [])
        emit({'state': 'staged', 'count': len(staged_files)})
        return self.outcome
class _FakeMonitor:
def __init__(self):
self.started = []
@ -704,6 +735,133 @@ def test_soulseek_album_preflight_does_not_jump_ahead_of_hybrid_primary(monkeypa
assert 'last_good_source' not in download_batches['B24']
def test_soulseek_album_bundle_runs_after_missing_analysis(monkeypatch):
    """Soulseek whole-folder bundles should engage only after analysis
    has confirmed there is something missing."""
    db = _FakeDB()
    monkeypatch.setattr('database.music_database.MusicDatabase', lambda: db)
    # Single-source Soulseek mode with a plugin that only offers the
    # album-bundle entry point.
    plugin = _FakeAlbumBundleSoulseek()
    deps = _build_deps(
        config=_FakeConfig({'download_source.mode': 'soulseek'}),
        soulseek=_FakeSoulseekWrapper(plugin),
    )
    _seed_batch(
        'B25',
        is_album_download=True,
        album_context={'name': 'Test Album', 'total_tracks': 1},
        artist_context={'name': 'Artist'},
    )
    tracks = [{'name': 'T1', 'artists': ['Artist'], 'track_number': 1}]
    mw.run_full_missing_tracks_process('B25', 'album:1', tracks, deps)
    # Exactly one bundle call, aimed at the batch's private staging dir.
    assert len(plugin.calls) == 1
    album, artist, staging, kwargs = plugin.calls[0]
    assert (album, artist) == ('Test Album', 'Artist')
    assert staging.replace('\\', '/').endswith('storage/album_bundle_staging/B25')
    # No preferred source/tracks are forwarded in this scenario.
    assert kwargs == {}
    assert download_batches['B25']['album_bundle_source'] == 'soulseek'
    assert download_batches['B25']['album_bundle_private_staging'] is True
    assert download_batches['B25']['album_bundle_state'] == 'staged'
    # Source-reuse state must not be preloaded once the bundle is staged.
    assert 'last_good_source' not in download_batches['B25']
def test_hybrid_first_soulseek_uses_album_bundle(monkeypatch):
    """Hybrid keeps fallback semantics, but the first source can own
    album-bundle downloads when it supports them."""
    db = _FakeDB()
    monkeypatch.setattr('database.music_database.MusicDatabase', lambda: db)
    plugin = _FakeAlbumBundleSoulseek()
    # Soulseek leads the hybrid order, so it is the source under test.
    deps = _build_deps(
        config=_FakeConfig({
            'download_source.mode': 'hybrid',
            'download_source.hybrid_order': ['soulseek', 'hifi'],
        }),
        soulseek=_FakeSoulseekWrapper(plugin),
    )
    _seed_batch(
        'B26',
        is_album_download=True,
        album_context={'name': 'Test Album', 'total_tracks': 1},
        artist_context={'name': 'Artist'},
    )
    tracks = [{'name': 'T1', 'artists': ['Artist'], 'track_number': 1}]
    mw.run_full_missing_tracks_process('B26', 'album:1', tracks, deps)
    # The bundle flow ran once and was recorded against Soulseek.
    assert len(plugin.calls) == 1
    assert download_batches['B26']['album_bundle_source'] == 'soulseek'
    assert download_batches['B26']['album_bundle_private_staging'] is True
def test_soulseek_album_bundle_uses_preflight_source_without_preloading_reuse(monkeypatch):
    """When the bundle path stages files, workers must claim staging
    before any Soulseek source-reuse attempt can fire."""
    db = _FakeDB()
    monkeypatch.setattr('database.music_database.MusicDatabase', lambda: db)
    tracks = [{'name': 'T1', 'artists': ['Artist'], 'track_number': 1}]
    # One album folder on one peer; the fake is set up to return it.
    folder_tracks = [_slsk_track('T1', 1, folder='Artist/Test Album')]
    album = _album_result('peer', 'Artist/Test Album', 'Test Album', folder_tracks)
    slsk = _FakePreflightAlbumBundleSoulseek(
        album_results=[album],
        browse_files=None,
        parsed_tracks=folder_tracks,
    )
    deps = _build_deps(
        config=_FakeConfig({'download_source.mode': 'soulseek'}),
        soulseek=_FakeSoulseekWrapper(slsk),
    )
    _seed_batch(
        'B28',
        is_album_download=True,
        album_context={'name': 'Test Album', 'total_tracks': 1},
        artist_context={'name': 'Artist'},
    )
    mw.run_full_missing_tracks_process('B28', 'album:1', tracks, deps)
    assert len(slsk.calls) == 1
    # The preflight-selected peer/folder and its tracks are handed to the
    # bundle downloader as keyword arguments.
    assert slsk.calls[0][3] == {
        'preferred_source': {
            'username': 'peer',
            'folder_path': 'Artist/Test Album',
        },
        'preferred_tracks': folder_tracks,
    }
    assert download_batches['B28']['album_bundle_private_staging'] is True
    # With a private bundle staged, source-reuse fields stay unset.
    assert 'last_good_source' not in download_batches['B28']
    assert 'source_folder_tracks' not in download_batches['B28']
def test_hybrid_first_torrent_uses_album_bundle_before_per_track(monkeypatch):
    """With ``torrent`` first in the hybrid order, the torrent plugin's
    album-bundle entry point is called once and recorded on the batch."""
    db = _FakeDB()
    monkeypatch.setattr('database.music_database.MusicDatabase', lambda: db)
    # The recording fake is reused here as a stand-in torrent plugin.
    plugin = _FakeAlbumBundleSoulseek()
    deps = _build_deps(
        config=_FakeConfig({
            'download_source.mode': 'hybrid',
            'download_source.hybrid_order': ['torrent', 'soulseek'],
        }),
        soulseek=_FakePluginWrapper({'torrent': plugin}),
    )
    _seed_batch(
        'B27',
        is_album_download=True,
        album_context={'name': 'Test Album', 'total_tracks': 1},
        artist_context={'name': 'Artist'},
    )
    tracks = [{'name': 'T1', 'artists': ['Artist'], 'track_number': 1}]
    mw.run_full_missing_tracks_process('B27', 'album:1', tracks, deps)
    assert len(plugin.calls) == 1
    assert download_batches['B27']['album_bundle_source'] == 'torrent'
# ---------------------------------------------------------------------------
# Task creation
# ---------------------------------------------------------------------------

@ -223,6 +223,92 @@ def test_private_torrent_album_staging_miss_skips_per_track_search():
assert ('done', ('b1', 't1', False), {}) in rec.calls
def test_private_soulseek_album_staging_miss_skips_per_track_search():
    """A track that misses a fully staged Soulseek bundle is marked
    ``not_found`` without running any per-track search."""
    _seed_task(track_info={
        'id': 'sp-1', 'name': 'Song', 'artists': ['Artist'],
        'album': 'Album', 'duration_ms': 180000,
    })
    # Batch state as left behind by a complete (non-partial) bundle.
    download_batches['b1'] = {
        'album_bundle_private_staging': True,
        'album_bundle_state': 'staged',
        'album_bundle_source': 'soulseek',
    }
    client = _FakeClient(results=['should-not-search'], mode='soulseek')
    rec = _Recorder()
    deps, _ = _build_deps(
        soulseek=client,
        matching=_FakeMatchEngine(queries=['Artist Song']),
        # Force the staging claim to miss.
        try_staging_match=lambda *a, **kw: False,
        on_download_completed=rec('done'),
    )
    tw.download_track_worker('t1', 'b1', deps)
    assert client.search_calls == []
    assert download_tasks['t1']['status'] == 'not_found'
    assert 'staged soulseek album release' in download_tasks['t1']['error_message']
    assert ('done', ('b1', 't1', False), {}) in rec.calls
def test_private_hybrid_first_soulseek_album_staging_miss_skips_per_track_search():
    """In hybrid mode with Soulseek first, a miss against a fully staged
    Soulseek bundle also ends the task without a per-track search."""
    _seed_task(track_info={
        'id': 'sp-1', 'name': 'Song', 'artists': ['Artist'],
        'album': 'Album', 'duration_ms': 180000,
    })
    download_batches['b1'] = {
        'album_bundle_private_staging': True,
        'album_bundle_state': 'staged',
        'album_bundle_source': 'soulseek',
    }
    # Hybrid orchestrator whose configured order starts with Soulseek.
    client = _FakeClient(
        results=['should-not-search'],
        mode='hybrid',
        subclients={'hybrid_order': ['soulseek', 'hifi']},
    )
    rec = _Recorder()
    deps, _ = _build_deps(
        soulseek=client,
        matching=_FakeMatchEngine(queries=['Artist Song']),
        # Force the staging claim to miss.
        try_staging_match=lambda *a, **kw: False,
        on_download_completed=rec('done'),
    )
    tw.download_track_worker('t1', 'b1', deps)
    assert client.search_calls == []
    assert download_tasks['t1']['status'] == 'not_found'
    assert 'staged soulseek album release' in download_tasks['t1']['error_message']
def test_partial_private_hybrid_first_soulseek_album_staging_miss_allows_per_track_search():
    """When the staged bundle is flagged partial, a staging miss must
    fall through to the normal per-track search."""
    _seed_task(track_info={
        'id': 'sp-1', 'name': 'Song', 'artists': ['Artist'],
        'album': 'Album', 'duration_ms': 180000,
    })
    download_batches['b1'] = {
        'album_bundle_private_staging': True,
        'album_bundle_state': 'staged',
        'album_bundle_source': 'soulseek',
        # The partial flag is what distinguishes this case.
        'album_bundle_partial': True,
    }
    client = _FakeClient(
        results=[],
        mode='hybrid',
        subclients={'hybrid_order': ['soulseek', 'hifi']},
    )
    deps, _ = _build_deps(
        soulseek=client,
        matching=_FakeMatchEngine(queries=['Artist Song']),
        try_staging_match=lambda *a, **kw: False,
    )
    tw.download_track_worker('t1', 'b1', deps)
    # A search was attempted; the empty results still end in not_found,
    # but not with the staged-release miss message.
    assert client.search_calls
    assert download_tasks['t1']['status'] == 'not_found'
    assert 'staged soulseek album release' not in download_tasks['t1']['error_message']
# ---------------------------------------------------------------------------
# Search loop happy path
# ---------------------------------------------------------------------------

@ -28,6 +28,7 @@ from unittest.mock import AsyncMock, patch
import pytest
from core.soulseek_client import SoulseekClient
from core.download_plugins.types import AlbumResult, DownloadStatus, TrackResult
def _run_async(coro):
@ -121,6 +122,153 @@ def test_download_extracts_id_from_dict_response(configured_client):
assert result == 'abc123'
# ---------------------------------------------------------------------------
# album bundle
# ---------------------------------------------------------------------------
def _track(username='peer', filename='Artist/Album/01 - Song.flac', title='Song', number=1, size=10):
    """Build a FLAC ``TrackResult`` for the album-bundle tests.

    Callers may override the peer, remote filename, title, track number
    and size; every other field is fixed.
    """
    return TrackResult(
        username=username,
        filename=filename,
        size=size,
        bitrate=None,
        duration=180000,
        quality='flac',
        free_upload_slots=1,
        upload_speed=1_000_000,
        queue_length=0,
        artist='Artist',
        title=title,
        album='Album',
        track_number=number,
    )
def test_album_bundle_stages_one_selected_soulseek_folder(configured_client, tmp_path):
    """A single-folder search result is enqueued, polled to completion
    and copied into the staging directory."""
    configured_client.download_path = tmp_path
    # Pre-create the "downloaded" file where the resolver will find it.
    local_file = tmp_path / '01 - Song.flac'
    local_file.write_bytes(b'audio')
    track = _track(filename='Artist/Album/01 - Song.flac')
    album = AlbumResult(
        username='peer',
        album_path='Artist/Album',
        album_title='Album',
        artist='Artist',
        track_count=1,
        total_size=10,
        tracks=[track],
        dominant_quality='flac',
        free_upload_slots=1,
        upload_speed=1_000_000,
        queue_length=0,
    )
    events = []
    # Search, browse, enqueue and transfer status are all stubbed; the
    # poll timeout/interval are shortened to keep the test fast.
    with patch.object(configured_client, 'search', AsyncMock(return_value=([], [album]))), \
         patch.object(configured_client, 'browse_user_directory', AsyncMock(return_value=[
             {'filename': '01 - Song.flac', 'size': 10}
         ])), \
         patch.object(configured_client, 'filter_results_by_quality_preference', side_effect=lambda tracks: tracks), \
         patch.object(configured_client, 'download', AsyncMock(return_value='dl-1')) as download_mock, \
         patch.object(configured_client, 'get_all_downloads', AsyncMock(return_value=[
             DownloadStatus(
                 id='dl-1',
                 username='peer',
                 filename='Artist/Album/01 - Song.flac',
                 state='Completed, Succeeded',
                 progress=100,
                 size=10,
                 transferred=10,
                 speed=0,
             )
         ])), \
         patch('core.soulseek_client.get_poll_timeout', return_value=1), \
         patch('core.soulseek_client.get_poll_interval', return_value=0.01):
        outcome = configured_client.download_album_to_staging(
            'Album',
            'Artist',
            str(tmp_path / 'staging'),
            events.append,
        )
    assert outcome['success'] is True
    assert outcome['fallback'] is False
    assert len(outcome['files']) == 1
    assert Path(outcome['files'][0]).read_bytes() == b'audio'
    download_mock.assert_awaited_once_with(
        'peer',
        'Artist/Album/01 - Song.flac',
        10,
    )
    assert events[-1]['state'] == 'staged'
def test_album_bundle_stages_completed_files_when_same_source_partially_times_out(configured_client, tmp_path):
    """A bundle still stages its finished track when a sibling transfer times out.

    The source folder and track list are passed in directly via
    ``preferred_source`` / ``preferred_tracks``, so search is not patched.
    Of the two transfers, one is ``Completed, Succeeded`` and the other
    ``TimedOut``; the call must report success with only the finished
    file, and some progress event must carry ``failed == 1``.
    """
    configured_client.download_path = tmp_path
    # Only the first track has a file on disk.
    (tmp_path / '01 - Ready.flac').write_bytes(b'audio')

    ready_track = _track(filename='Artist/Album/01 - Ready.flac', title='Ready', number=1)
    stalled_track = _track(filename='Artist/Album/02 - Waiting.flac', title='Waiting', number=2)

    transfer_states = [
        DownloadStatus(
            id='dl-1',
            username='peer',
            filename='Artist/Album/01 - Ready.flac',
            state='Completed, Succeeded',
            progress=100,
            size=10,
            transferred=10,
            speed=0,
        ),
        DownloadStatus(
            id='dl-2',
            username='peer',
            filename='Artist/Album/02 - Waiting.flac',
            state='TimedOut',
            progress=0,
            size=10,
            transferred=0,
            speed=0,
        ),
    ]
    download_mock = AsyncMock(side_effect=['dl-1', 'dl-2'])
    transfers_mock = AsyncMock(return_value=transfer_states)
    progress_events = []

    with patch.object(configured_client, 'download', download_mock), \
            patch.object(configured_client, 'filter_results_by_quality_preference',
                         side_effect=lambda tracks: tracks), \
            patch.object(configured_client, 'get_all_downloads', transfers_mock), \
            patch('core.soulseek_client.get_poll_timeout', return_value=1), \
            patch('core.soulseek_client.get_poll_interval', return_value=0.01):
        result = configured_client.download_album_to_staging(
            'Album',
            'Artist',
            str(tmp_path / 'staging'),
            progress_events.append,
            preferred_source={'username': 'peer', 'folder_path': 'Artist/Album'},
            preferred_tracks=[ready_track, stalled_track],
        )

    assert result['success'] is True
    assert result['fallback'] is False
    staged_files = result['files']
    assert len(staged_files) == 1
    assert Path(staged_files[0]).name == '01 - Ready.flac'
    assert any(event.get('failed') == 1 for event in progress_events)
def test_album_bundle_falls_back_when_no_album_folder(configured_client, tmp_path):
    """An empty search result yields a fallback outcome and no download call.

    With search mocked to return no tracks and no album folders, the
    outcome must be unsuccessful, flagged ``fallback``, carry the
    "No complete Soulseek album folders" message, and ``download`` must
    never be awaited.
    """
    configured_client.download_path = tmp_path
    empty_search = AsyncMock(return_value=([], []))
    download_spy = AsyncMock(return_value='dl-1')

    with patch.object(configured_client, 'search', empty_search), \
            patch.object(configured_client, 'download', download_spy):
        result = configured_client.download_album_to_staging(
            'Missing Album',
            'Artist',
            str(tmp_path / 'staging'),
        )

    assert result['success'] is False
    assert result['fallback'] is True
    assert 'No complete Soulseek album folders' in result['error']
    download_spy.assert_not_awaited()
def test_download_extracts_id_from_list_response(configured_client):
"""Pinning: slskd sometimes returns a list of file objects.
The first item's id is the download_id."""

@ -56,18 +56,20 @@ def test_is_eligible_requires_album_flag() -> None:
album_name='X', artist_name='Y') is False
def test_is_eligible_requires_album_bundle_mode() -> None:
    """Modes outside torrent / usenet / soulseek are rejected for album batches.

    ``soulseek`` is deliberately absent from this list: the eligibility
    gate now accepts it, so the superseded
    ``test_is_eligible_requires_torrent_or_usenet_mode`` header and its
    mode list (which still contained ``soulseek``) are dropped.
    """
    for mode in ('youtube', 'tidal', 'qobuz', 'hifi',
                 'deezer_dl', 'amazon', 'lidarr', 'soundcloud', 'hybrid'):
        assert is_eligible(mode=mode, is_album=True,
                           album_name='X', artist_name='Y') is False
def test_is_eligible_accepts_torrent_usenet_and_soulseek() -> None:
    """Each bundle-capable mode passes the gate for a named album batch.

    Replaces the superseded ``test_is_eligible_accepts_torrent_and_usenet``
    header; the three per-mode assertions are expressed as one loop.
    """
    for mode in ('torrent', 'usenet', 'soulseek'):
        assert is_eligible(mode=mode, is_album=True,
                           album_name='X', artist_name='Y') is True
def test_is_eligible_requires_non_empty_names() -> None:
@ -103,13 +105,13 @@ def test_dispatch_returns_false_when_not_album() -> None:
plugin.download_album_to_staging.assert_not_called()
def test_dispatch_returns_false_for_non_album_bundle_modes() -> None:
    """``try_dispatch`` declines an album batch whose mode has no bundle flow.

    The mode under test is ``youtube``. The earlier ``soulseek`` variant
    of the ``config_get`` argument is removed: keeping both would pass
    the same keyword twice, and ``soulseek`` is now an eligible mode.
    """
    state = _FakeState()
    plugin = MagicMock()
    result = try_dispatch(
        batch_id='b1', is_album=True,
        album_context={'name': 'X'}, artist_context={'name': 'Y'},
        config_get=_config({'download_source.mode': 'youtube'}),
        plugin_resolver=lambda _name: plugin, state=state,
    )
    assert result is False
@ -234,6 +236,28 @@ def test_dispatch_failure_returns_true_so_master_stops() -> None:
assert state.fields['phase'] == 'failed'
def test_dispatch_fallback_failure_returns_false_for_per_track_flow() -> None:
    """A plugin failure flagged ``fallback`` leaves the batch for per-track flow.

    The plugin reports ``success: False`` with ``fallback: True``.
    ``try_dispatch`` must return ``False``, must not mark the batch as
    failed, and must record phase ``analysis`` plus the bundle state and
    error message on the batch state.
    """
    fallback_error = 'No complete Soulseek album folders found'
    bundle_plugin = MagicMock()
    bundle_plugin.download_album_to_staging.return_value = {
        'success': False,
        'files': [],
        'error': fallback_error,
        'fallback': True,
    }
    batch_state = _FakeState()

    handled = try_dispatch(
        batch_id='b1',
        is_album=True,
        album_context={'name': 'Album'},
        artist_context={'name': 'Artist'},
        config_get=_config({'download_source.mode': 'soulseek'}),
        plugin_resolver=lambda _name: bundle_plugin,
        state=batch_state,
    )

    assert handled is False
    assert batch_state.failed_with == ''
    recorded = batch_state.fields
    assert recorded['phase'] == 'analysis'
    assert recorded['album_bundle_state'] == 'fallback'
    assert recorded['album_bundle_error'] == fallback_error
def test_dispatch_plugin_exception_treated_as_failure() -> None:
"""A bug / network error in the plugin must not propagate into
the master worker caught + treated as a normal failure so
@ -269,6 +293,25 @@ def test_dispatch_strips_whitespace_from_names() -> None:
assert args.args[1] == 'Kendrick'
def test_dispatch_source_override_uses_first_hybrid_source() -> None:
    """In hybrid mode, ``source_override`` decides which plugin is resolved.

    The configured mode is ``hybrid`` but the call passes
    ``source_override='soulseek'``; the resolver must be asked for
    ``soulseek`` exactly once and that name must be recorded as the
    bundle source on the batch state.
    """
    batch_state = _FakeState()
    bundle_plugin = MagicMock()
    bundle_plugin.download_album_to_staging.return_value = {'success': True, 'files': ['/x']}
    resolved_names = []

    def resolve(name):
        # Record every lookup so the test can check which source was requested.
        resolved_names.append(name)
        return bundle_plugin

    try_dispatch(
        batch_id='b1',
        is_album=True,
        album_context={'name': 'GNX'},
        artist_context={'name': 'Kendrick Lamar'},
        config_get=_config({'download_source.mode': 'hybrid'}),
        plugin_resolver=resolve,
        state=batch_state,
        source_override='soulseek',
    )

    assert resolved_names == ['soulseek']
    assert batch_state.fields['album_bundle_source'] == 'soulseek'
def test_dispatch_progress_callback_mirrors_payload_to_state() -> None:
"""The progress callback the plugin gets must mirror its
payload onto the batch state under ``album_bundle_*`` keys so

Loading…
Cancel
Save