From 6c226613bf5be1d3198ead0e9dc7ffb5ba443e9f Mon Sep 17 00:00:00 2001 From: Broque Thomas <26755000+Nezreka@users.noreply.github.com> Date: Sat, 23 May 2026 15:08:21 -0700 Subject: [PATCH] Add Soulseek album bundle downloads Route primary Soulseek album downloads through the album-bundle staging flow, reusing preflight-selected folders when available. In hybrid mode, only the first configured source can claim whole-album bundle behavior so later Soulseek fallback keeps the existing per-track/source-reuse path. Allow Soulseek album bundles to stage completed tracks when some same-source transfers fail or time out, and keep partial bundles from blocking per-track fallback. Add coverage for dispatcher gating, master flow ordering, task-worker staged-miss behavior, and Soulseek bundle polling. --- core/downloads/album_bundle_dispatch.py | 28 +- core/downloads/master.py | 84 ++++- core/downloads/task_worker.py | 14 +- core/soulseek_client.py | 326 ++++++++++++++++++ tests/downloads/test_downloads_master.py | 158 +++++++++ tests/downloads/test_downloads_task_worker.py | 86 +++++ tests/downloads/test_soulseek_pinning.py | 148 ++++++++ tests/test_album_bundle_dispatch.py | 53 ++- 8 files changed, 872 insertions(+), 25 deletions(-) diff --git a/core/downloads/album_bundle_dispatch.py b/core/downloads/album_bundle_dispatch.py index 6a74914f..09ec1a46 100644 --- a/core/downloads/album_bundle_dispatch.py +++ b/core/downloads/album_bundle_dispatch.py @@ -7,8 +7,9 @@ can be unit-tested in isolation. The gate fires only when ALL conditions hold: - Batch is an album-context download (``is_album_download`` flag). -- Active download source is ``torrent`` or ``usenet`` (single-source - mode — hybrid stays per-track to preserve fallback). +- Active download source is ``torrent``, ``usenet``, or ``soulseek``. + In hybrid mode the caller may pass the first configured source as a + source override; later hybrid sources stay per-track to preserve fallback. 
- Both album-name and artist-name are populated in batch context. - The resolved plugin exposes ``download_album_to_staging``. @@ -57,7 +58,7 @@ class BatchStateAccess(Protocol): # so the Downloads page can render it without coupling to the # specific payload shape. _MIRRORED_KEYS = ('progress', 'release', 'speed', 'downloaded', - 'size', 'seeders', 'grabs', 'count') + 'size', 'seeders', 'grabs', 'count', 'failed') def is_eligible( @@ -72,7 +73,7 @@ def is_eligible( the gate logic without standing up a plugin.""" if not is_album: return False - if (mode or '').lower() not in ('torrent', 'usenet'): + if (mode or '').lower() not in ('torrent', 'usenet', 'soulseek'): return False if not (album_name or '').strip(): return False @@ -90,6 +91,8 @@ def try_dispatch( config_get: Callable[..., Any], plugin_resolver: Callable[[str], Optional[Any]], state: BatchStateAccess, + source_override: Optional[str] = None, + plugin_kwargs: Optional[dict] = None, ) -> bool: """Attempt the album-bundle flow. Returns ``True`` iff the master worker should return early (gate engaged and completed @@ -102,7 +105,7 @@ def try_dispatch( BatchStateAccess shim. Injecting these keeps the module dependency-light + unit-testable. 
""" - mode = (config_get('download_source.mode', 'soulseek') or 'soulseek').lower() + mode = (source_override or config_get('download_source.mode', 'soulseek') or 'soulseek').lower() album_name = (album_context or {}).get('name') or '' artist_name = (artist_context or {}).get('name') or '' @@ -159,6 +162,7 @@ def try_dispatch( try: outcome = plugin.download_album_to_staging( album_name, artist_name, staging_dir, _emit, + **(plugin_kwargs or {}), ) except Exception as exc: logger.exception("[Album Bundle] %s plugin raised: %s", mode, exc) @@ -166,6 +170,17 @@ def try_dispatch( if not outcome.get('success'): err = outcome.get('error', 'Album bundle download failed') + if outcome.get('fallback'): + logger.warning( + "[Album Bundle] %s flow could not commit for '%s': %s — falling back to per-track flow", + mode, album_name, err, + ) + state.update_fields(batch_id, { + 'phase': 'analysis', + 'album_bundle_state': 'fallback', + 'album_bundle_error': err, + }) + return False logger.error("[Album Bundle] %s flow failed for '%s': %s", mode, album_name, err) state.mark_failed(batch_id, err) @@ -178,6 +193,9 @@ def try_dispatch( state.update_fields(batch_id, { 'phase': 'analysis', 'album_bundle_state': 'staged', + 'album_bundle_partial': bool(outcome.get('partial')), + 'album_bundle_expected_count': outcome.get('expected_count'), + 'album_bundle_completed_count': outcome.get('completed_count', len(outcome.get('files', []))), }) # Engaged-and-succeeded: we DON'T early-return because the # per-track flow needs to run to create + complete the per-track diff --git a/core/downloads/master.py b/core/downloads/master.py index 90adbb4e..83842f17 100644 --- a/core/downloads/master.py +++ b/core/downloads/master.py @@ -50,6 +50,7 @@ _VARIANT_WORDS = { 'remix', 'rmx', 'acapella', 'a cappella', 'instrumental', 'karaoke', 'live', 'demo', 'extended', } +_ALBUM_BUNDLE_SOURCES = frozenset(('torrent', 'usenet', 'soulseek')) def _norm_text(value: Any) -> str: @@ -245,6 +246,29 @@ def 
_soulseek_album_preflight_enabled(config_manager: Any) -> bool: return primary == 'soulseek' +def _resolve_album_bundle_source(config_manager: Any) -> str: + """Return the album-bundle source for this batch. + + In single-source mode, the active source may own the whole album if + it supports album bundles. In hybrid mode, only the first source in + the configured order may claim the whole album; later sources remain + per-track fallback. + """ + mode = (config_manager.get('download_source.mode', 'soulseek') or 'soulseek').lower() + if mode in _ALBUM_BUNDLE_SOURCES: + return mode + if mode != 'hybrid': + return '' + + order = config_manager.get('download_source.hybrid_order', ['hifi', 'youtube', 'soulseek']) + first = '' + if order: + first = str(order[0] or '').lower() + else: + first = str(config_manager.get('download_source.hybrid_primary', '') or '').lower() + return first if first in _ALBUM_BUNDLE_SOURCES else '' + + @dataclass class MasterDeps: """Bundle of cross-cutting deps the master worker needs.""" @@ -338,16 +362,19 @@ def run_full_missing_tracks_process(batch_id, playlist_id, tracks_json, deps: Ma # should stop (gate fired and failed); False = engaged-and- # succeeded OR didn't engage, both fall through to per-track. 
_bundle_state = _BatchStateAccessImpl() - if _album_bundle_dispatch.try_dispatch( - batch_id=batch_id, - is_album=batch_is_album, - album_context=batch_album_context, - artist_context=batch_artist_context, - config_get=deps.config_manager.get, - plugin_resolver=deps.download_orchestrator.client, - state=_bundle_state, - ): - return + _album_bundle_source = _resolve_album_bundle_source(deps.config_manager) + if _album_bundle_source and _album_bundle_source != 'soulseek': + if _album_bundle_dispatch.try_dispatch( + batch_id=batch_id, + is_album=batch_is_album, + album_context=batch_album_context, + artist_context=batch_artist_context, + config_get=deps.config_manager.get, + plugin_resolver=deps.download_orchestrator.client, + state=_bundle_state, + source_override=_album_bundle_source, + ): + return # Allow duplicate tracks across albums — when enabled, only skip tracks already # owned in THIS album, not tracks owned in other albums @@ -640,6 +667,7 @@ def run_full_missing_tracks_process(batch_id, playlist_id, tracks_json, deps: Ma batch_album_context = batch.get('album_context') batch_artist_context = batch.get('artist_context') batch_is_album = batch.get('is_album_download', False) + batch_private_album_bundle = bool(batch.get('album_bundle_private_staging')) batch_playlist_folder_mode = batch.get('playlist_folder_mode', False) batch_playlist_name = batch.get('playlist_name', 'Unknown Playlist') @@ -648,7 +676,8 @@ def run_full_missing_tracks_process(batch_id, playlist_id, tracks_json, deps: Ma preflight_source = None preflight_tracks = None soulseek_is_source = _soulseek_album_preflight_enabled(deps.config_manager) - if batch_is_album and batch_album_context and batch_artist_context and soulseek_is_source: + if (batch_is_album and batch_album_context and batch_artist_context + and soulseek_is_source and not batch_private_album_bundle): artist_name = batch_artist_context.get('name', '') album_name = batch_album_context.get('name', '') if artist_name and album_name: 
@@ -761,13 +790,44 @@ def run_full_missing_tracks_process(batch_id, playlist_id, tracks_json, deps: Ma logger.error(f"[Album Pre-flight] Search failed (non-fatal, falling back to track-by-track): {preflight_err}") deps.source_reuse_logger.info(f"[Album Pre-flight] Exception: {preflight_err}") + # Soulseek album bundles run after analysis so an already-owned + # album does not get downloaded just because the source supports a + # whole-folder flow. When preflight selected a folder, pass that + # exact source into the bundle downloader so we keep the richer + # tracklist-aware scoring instead of doing a weaker second pick. + _bundle_state = _BatchStateAccessImpl() + _album_bundle_source = _resolve_album_bundle_source(deps.config_manager) + if _album_bundle_source == 'soulseek': + if _album_bundle_dispatch.try_dispatch( + batch_id=batch_id, + is_album=batch_is_album, + album_context=batch_album_context, + artist_context=batch_artist_context, + config_get=deps.config_manager.get, + plugin_resolver=deps.download_orchestrator.client, + state=_bundle_state, + source_override=_album_bundle_source, + plugin_kwargs={ + 'preferred_source': preflight_source, + 'preferred_tracks': preflight_tracks, + } if preflight_source and preflight_tracks else None, + ): + return + with tasks_lock: if batch_id not in download_batches: return download_batches[batch_id]['phase'] = 'downloading' # Store album pre-flight results on batch for source reuse - if preflight_source and preflight_tracks: + # unless the Soulseek album-bundle path already staged a private + # release. Task workers check source reuse before staging match, so + # preloading here would make the staged happy path re-download. 
+ if ( + preflight_source + and preflight_tracks + and not download_batches[batch_id].get('album_bundle_private_staging') + ): download_batches[batch_id]['last_good_source'] = preflight_source download_batches[batch_id]['source_folder_tracks'] = preflight_tracks download_batches[batch_id]['failed_sources'] = set() diff --git a/core/downloads/task_worker.py b/core/downloads/task_worker.py index 67b2a3fe..77cb803f 100644 --- a/core/downloads/task_worker.py +++ b/core/downloads/task_worker.py @@ -34,7 +34,7 @@ logger = logging.getLogger(__name__) def _private_album_bundle_staging_miss_reason(batch_id: Optional[str], deps: Any) -> Optional[str]: """Return a user-facing miss reason when per-track search should stop. - Torrent / usenet album batches first download one private staged release, + Torrent / usenet / Soulseek album batches first download one private staged release, then each track claims the matching staged file. If that claim fails after the release is already staged, falling through to the normal per-track search only retries release-level sources N times and can keep re-adding @@ -49,11 +49,19 @@ def _private_album_bundle_staging_miss_reason(batch_id: Optional[str], deps: Any source = (batch.get('album_bundle_source') or '').lower() mode = (getattr(deps.download_orchestrator, 'mode', '') or '').lower() + hybrid_first = '' + if mode == 'hybrid': + order = getattr(deps.download_orchestrator, 'hybrid_order', None) or [] + if order: + hybrid_first = str(order[0] or '').lower() + else: + hybrid_first = str(getattr(deps.download_orchestrator, 'hybrid_primary', '') or '').lower() if ( batch.get('album_bundle_private_staging') and batch.get('album_bundle_state') == 'staged' - and source in ('torrent', 'usenet') - and mode == source + and not batch.get('album_bundle_partial') + and source in ('torrent', 'usenet', 'soulseek') + and (mode == source or (mode == 'hybrid' and hybrid_first == source)) ): return f'Track was not found in the staged {source} album release' 
diff --git a/core/soulseek_client.py b/core/soulseek_client.py index 71b20345..8a7f82dd 100644 --- a/core/soulseek_client.py +++ b/core/soulseek_client.py @@ -18,7 +18,13 @@ from core.download_plugins.types import ( SearchResult, TrackResult, ) +from core.download_plugins.album_bundle import ( + copy_audio_files_atomically, + get_poll_interval, + get_poll_timeout, +) from core.download_plugins.base import DownloadSourcePlugin +from utils.async_helpers import run_async logger = get_logger("soulseek_client") @@ -1456,6 +1462,326 @@ class SoulseekClient(DownloadSourcePlugin): logger.info(f"Downloading: {best_result.filename} ({quality_info}) from {best_result.username}") return await self.download(best_result.username, best_result.filename, best_result.size) + + def download_album_to_staging( + self, + album_name: str, + artist_name: str, + staging_dir: str, + progress_callback=None, + *, + preferred_source: Optional[Dict[str, Any]] = None, + preferred_tracks: Optional[List[TrackResult]] = None, + ) -> Dict[str, Any]: + """One-shot Soulseek album download. + + Search for one album folder, enqueue files from that single + ``username + folder_path``, wait for slskd to report completion, + then copy completed files into the private album-bundle staging + directory. If the folder cannot be selected or enqueued cleanly, + callers may fall back to the existing per-track Soulseek flow. + Once files are staged, the per-track staging matcher owns final + import, same as torrent / usenet album bundles. 
+ """ + result: Dict[str, Any] = { + 'success': False, + 'files': [], + 'error': None, + 'fallback': True, + 'partial': False, + } + if not self.is_configured(): + result['error'] = 'Soulseek source not configured' + return result + + def _emit(state: str, **extra) -> None: + if progress_callback: + try: + progress_callback({'state': state, **extra}) + except Exception as cb_exc: + logger.debug("[Soulseek album] progress callback failed: %s", cb_exc) + + picked = None + folder_tracks = list(preferred_tracks or []) + username = (preferred_source or {}).get('username', '') if preferred_source else '' + folder_path = (preferred_source or {}).get('folder_path', '') if preferred_source else '' + if username and folder_path: + logger.info( + "[Soulseek album] Using preflight-selected folder %s:%s", + username, + folder_path, + ) + _emit('searching', query=f"{artist_name} {album_name}".strip(), release=folder_path) + else: + query = f"{artist_name} {album_name}".strip() + _emit('searching', query=query) + try: + _, albums = run_async(self.search(query, timeout=30)) + except Exception as exc: + result['error'] = f'Soulseek album search failed: {exc}' + return result + + if not albums: + result['error'] = 'No complete Soulseek album folders found' + return result + + picked = self._pick_album_bundle_folder(albums, album_name, artist_name) + if picked is None: + result['error'] = 'No suitable Soulseek album folder after filtering' + return result + + folder_path = getattr(picked, 'album_path', '') or '' + username = getattr(picked, 'username', '') or '' + if not username or not folder_path: + result['error'] = 'No suitable Soulseek album folder after filtering' + return result + + logger.info( + "[Soulseek album] Picked %s:%s (%s tracks, quality=%s)", + username, + folder_path, + getattr(picked, 'track_count', 0), + getattr(picked, 'dominant_quality', ''), + ) + _emit( + 'queued', + release=getattr(picked, 'album_title', folder_path) if picked else folder_path, + 
count=getattr(picked, 'track_count', 0) if picked else len(folder_tracks), + ) + + if not folder_tracks: + try: + browse_files = run_async(self.browse_user_directory(username, folder_path)) + except Exception as exc: + result['error'] = f'Soulseek folder browse failed: {exc}' + return result + + if not browse_files: + result['error'] = 'Could not browse selected Soulseek album folder' + return result + + folder_tracks = self.parse_browse_results_to_tracks( + username, + browse_files, + directory=folder_path, + ) + folder_tracks = self.filter_results_by_quality_preference(folder_tracks) + if not folder_tracks: + result['error'] = 'Selected Soulseek album folder contained no audio files' + return result + + transfer_keys: Dict[tuple, TrackResult] = {} + _emit( + 'downloading', + release=getattr(picked, 'album_title', folder_path) if picked else folder_path, + count=len(folder_tracks), + ) + for track in folder_tracks: + try: + download_id = run_async(self.download(track.username, track.filename, track.size)) + except Exception as exc: + logger.warning("[Soulseek album] Failed to enqueue %s: %s", track.filename, exc) + continue + if download_id: + transfer_keys[(track.username, track.filename)] = track + + if not transfer_keys: + result['error'] = 'No Soulseek album files could be enqueued' + return result + + result['fallback'] = False + completed = self._poll_album_bundle_downloads(transfer_keys, _emit) + if not completed: + result['error'] = 'Soulseek album download failed or timed out' + return result + + _emit('staging', release=getattr(picked, 'album_title', folder_path) if picked else folder_path) + copied = copy_audio_files_atomically(completed, Path(staging_dir)) + if not copied: + result['error'] = 'No Soulseek album files copied to staging' + return result + + partial = len(copied) < len(folder_tracks) + if partial: + logger.warning( + "[Soulseek album] Staged partial album for '%s': %d/%d files", + album_name, + len(copied), + len(folder_tracks), + ) + 
else: + logger.info("[Soulseek album] Staged %d files for '%s'", len(copied), album_name) + _emit('staged', count=len(copied)) + result['success'] = True + result['files'] = copied + result['partial'] = partial + result['expected_count'] = len(folder_tracks) + result['completed_count'] = len(copied) + return result + + def _pick_album_bundle_folder( + self, + albums: List[AlbumResult], + album_name: str, + artist_name: str, + ) -> Optional[AlbumResult]: + scored = [] + for album in albums: + tracks = self.filter_results_by_quality_preference(list(getattr(album, 'tracks', []) or [])) + if not tracks: + continue + album_text = f"{getattr(album, 'album_title', '')} {getattr(album, 'album_path', '')}" + artist_text = f"{getattr(album, 'artist', '')} {getattr(album, 'album_path', '')}" + album_score = self._bundle_similarity(album_name, album_text) + artist_score = self._bundle_similarity(artist_name, artist_text) + track_count = int(getattr(album, 'track_count', 0) or len(tracks)) + count_score = 1.0 if track_count >= 3 else 0.35 + score = ( + album_score * 0.42 + + artist_score * 0.22 + + count_score * 0.12 + + min(1.0, len(tracks) / max(1, track_count)) * 0.12 + + float(getattr(album, 'quality_score', 0.0) or 0.0) * 0.12 + ) + scored.append((score, len(tracks), album)) + if not scored: + return None + scored.sort(key=lambda row: (row[0], row[1], getattr(row[2], 'quality_score', 0.0)), reverse=True) + best_score, _, best = scored[0] + if best_score < 0.58: + logger.warning("[Soulseek album] Best folder score %.3f below threshold", best_score) + return None + return best + + @staticmethod + def _bundle_similarity(expected: Any, actual: Any) -> float: + import re + from difflib import SequenceMatcher + left = re.sub(r'[^a-z0-9]+', ' ', str(expected or '').lower()).strip() + right = re.sub(r'[^a-z0-9]+', ' ', str(actual or '').lower()).strip() + if not left or not right: + return 0.0 + if left == right: + return 1.0 + left_words = set(left.split()) + right_words = 
set(right.split()) + if left_words and left_words.issubset(right_words): + return 0.92 + if right_words and right_words.issubset(left_words): + return 0.86 + if left in right or right in left: + return min(len(left), len(right)) / max(len(left), len(right)) + return SequenceMatcher(None, left, right).ratio() + + def _poll_album_bundle_downloads(self, transfer_keys: Dict[tuple, TrackResult], emit) -> List[Path]: + deadline = time.monotonic() + get_poll_timeout() + interval = get_poll_interval() + completed_paths: Dict[tuple, Path] = {} + failed_states: Dict[tuple, str] = {} + while time.monotonic() < deadline: + try: + downloads = run_async(self.get_all_downloads()) + except Exception as exc: + logger.warning("[Soulseek album] Poll error: %s", exc) + downloads = [] + + by_key = {} + for dl in downloads: + exact_key = (dl.username, dl.filename) + by_key[exact_key] = dl + basename_key = ( + dl.username, + os.path.basename((dl.filename or '').replace('\\', '/')), + ) + by_key.setdefault(basename_key, dl) + for key, track in transfer_keys.items(): + if key in completed_paths or key in failed_states: + continue + dl = by_key.get(key) or by_key.get(( + key[0], + os.path.basename((key[1] or '').replace('\\', '/')), + )) + state = (getattr(dl, 'state', '') or '') if dl else '' + if any(token in state for token in ('Errored', 'Failed', 'Rejected', 'TimedOut', 'Cancelled', 'Aborted')): + failed_states[key] = state or 'Failed' + logger.warning( + "[Soulseek album] Transfer failed from selected folder: %s (%s)", + os.path.basename((track.filename or '').replace('\\', '/')), + failed_states[key], + ) + continue + if dl and ('Completed' in state or 'Succeeded' in state): + if dl.size and dl.transferred and dl.transferred < dl.size: + continue + path = self._resolve_downloaded_album_file(track.filename) + if path: + completed_paths[key] = path + else: + logger.debug( + "[Soulseek album] Transfer completed but local file not found yet: %s", + track.filename, + ) + emit( + 'downloading', 
progress=round(len(completed_paths) / max(1, len(transfer_keys)) * 100, 1), + count=len(completed_paths), + failed=len(failed_states), + ) + if completed_paths and failed_states and len(completed_paths) + len(failed_states) == len(transfer_keys): + logger.warning( + "[Soulseek album] Selected folder finished with %d completed and %d failed transfer(s)", + len(completed_paths), + len(failed_states), + ) + return list(completed_paths.values()) + if not completed_paths and len(failed_states) == len(transfer_keys): + logger.warning("[Soulseek album] All %d transfer(s) failed from selected folder", len(failed_states)) + return [] + if len(completed_paths) == len(transfer_keys): + return list(completed_paths.values()) + time.sleep(interval) + pending = len(transfer_keys) - len(completed_paths) - len(failed_states) + if completed_paths: + logger.warning( + "[Soulseek album] Timed out with partial album: %d completed, %d failed, %d pending", + len(completed_paths), + len(failed_states), + pending, + ) + return list(completed_paths.values()) + logger.error( + "[Soulseek album] Timed out waiting for %d album files (%d failed, %d pending)", + len(transfer_keys), + len(failed_states), + pending, + ) + return [] + + def _resolve_downloaded_album_file(self, remote_filename: str) -> Optional[Path]: + basename = os.path.basename((remote_filename or '').replace('\\', '/')) + if not basename: + return None + candidates = [ + self.download_path / remote_filename, + self.download_path / basename, + ] + normalized_parts = [p for p in remote_filename.replace('\\', '/').split('/') if p] + if normalized_parts: + candidates.append(self.download_path.joinpath(*normalized_parts)) + for candidate in candidates: + try: + if candidate.exists() and candidate.is_file(): + return candidate + except OSError: + continue + try: + matches = [entry for entry in self.download_path.rglob('*') if entry.name == basename] + except OSError: + matches = [] + for match in matches: + if match.is_file(): + return match + return None async def check_connection(self) 
-> bool: """Check if slskd is running and connected to the Soulseek network""" diff --git a/tests/downloads/test_downloads_master.py b/tests/downloads/test_downloads_master.py index 80daa759..a1586028 100644 --- a/tests/downloads/test_downloads_master.py +++ b/tests/downloads/test_downloads_master.py @@ -132,6 +132,37 @@ class _FakeSoulseekWrapper: return self.soulseek if name == 'soulseek' else None +class _FakePluginWrapper: + def __init__(self, plugins): + self._plugins = dict(plugins) + + def client(self, name): + return self._plugins.get(name) + + +class _FakeAlbumBundleSoulseek: + def __init__(self, outcome=None): + self.calls = [] + self.outcome = outcome or {'success': True, 'files': ['/tmp/a.flac']} + + def download_album_to_staging(self, album, artist, staging, emit, **kwargs): + self.calls.append((album, artist, staging, kwargs)) + emit({'state': 'staged', 'count': len(self.outcome.get('files', []))}) + return self.outcome + + +class _FakePreflightAlbumBundleSoulseek(_FakeSoulseek): + def __init__(self, *args, outcome=None, **kwargs): + super().__init__(*args, **kwargs) + self.calls = [] + self.outcome = outcome or {'success': True, 'files': ['/tmp/a.flac']} + + def download_album_to_staging(self, album, artist, staging, emit, **kwargs): + self.calls.append((album, artist, staging, kwargs)) + emit({'state': 'staged', 'count': len(self.outcome.get('files', []))}) + return self.outcome + + class _FakeMonitor: def __init__(self): self.started = [] @@ -704,6 +735,133 @@ def test_soulseek_album_preflight_does_not_jump_ahead_of_hybrid_primary(monkeypa assert 'last_good_source' not in download_batches['B24'] +def test_soulseek_album_bundle_runs_after_missing_analysis(monkeypatch): + """Soulseek whole-folder bundles should engage only after analysis + has confirmed there is something missing.""" + db = _FakeDB() + monkeypatch.setattr('database.music_database.MusicDatabase', lambda: db) + + plugin = _FakeAlbumBundleSoulseek() + deps = _build_deps( + 
config=_FakeConfig({'download_source.mode': 'soulseek'}), + soulseek=_FakeSoulseekWrapper(plugin), + ) + _seed_batch( + 'B25', + is_album_download=True, + album_context={'name': 'Test Album', 'total_tracks': 1}, + artist_context={'name': 'Artist'}, + ) + tracks = [{'name': 'T1', 'artists': ['Artist'], 'track_number': 1}] + + mw.run_full_missing_tracks_process('B25', 'album:1', tracks, deps) + + assert len(plugin.calls) == 1 + album, artist, staging, kwargs = plugin.calls[0] + assert (album, artist) == ('Test Album', 'Artist') + assert staging.replace('\\', '/').endswith('storage/album_bundle_staging/B25') + assert kwargs == {} + assert download_batches['B25']['album_bundle_source'] == 'soulseek' + assert download_batches['B25']['album_bundle_private_staging'] is True + assert download_batches['B25']['album_bundle_state'] == 'staged' + assert 'last_good_source' not in download_batches['B25'] + + +def test_hybrid_first_soulseek_uses_album_bundle(monkeypatch): + """Hybrid keeps fallback semantics, but the first source can own + album-bundle downloads when it supports them.""" + db = _FakeDB() + monkeypatch.setattr('database.music_database.MusicDatabase', lambda: db) + + plugin = _FakeAlbumBundleSoulseek() + deps = _build_deps( + config=_FakeConfig({ + 'download_source.mode': 'hybrid', + 'download_source.hybrid_order': ['soulseek', 'hifi'], + }), + soulseek=_FakeSoulseekWrapper(plugin), + ) + _seed_batch( + 'B26', + is_album_download=True, + album_context={'name': 'Test Album', 'total_tracks': 1}, + artist_context={'name': 'Artist'}, + ) + tracks = [{'name': 'T1', 'artists': ['Artist'], 'track_number': 1}] + + mw.run_full_missing_tracks_process('B26', 'album:1', tracks, deps) + + assert len(plugin.calls) == 1 + assert download_batches['B26']['album_bundle_source'] == 'soulseek' + assert download_batches['B26']['album_bundle_private_staging'] is True + + +def test_soulseek_album_bundle_uses_preflight_source_without_preloading_reuse(monkeypatch): + """When the bundle 
path stages files, workers must claim staging + before any Soulseek source-reuse attempt can fire.""" + db = _FakeDB() + monkeypatch.setattr('database.music_database.MusicDatabase', lambda: db) + + tracks = [{'name': 'T1', 'artists': ['Artist'], 'track_number': 1}] + folder_tracks = [_slsk_track('T1', 1, folder='Artist/Test Album')] + album = _album_result('peer', 'Artist/Test Album', 'Test Album', folder_tracks) + slsk = _FakePreflightAlbumBundleSoulseek( + album_results=[album], + browse_files=None, + parsed_tracks=folder_tracks, + ) + deps = _build_deps( + config=_FakeConfig({'download_source.mode': 'soulseek'}), + soulseek=_FakeSoulseekWrapper(slsk), + ) + _seed_batch( + 'B28', + is_album_download=True, + album_context={'name': 'Test Album', 'total_tracks': 1}, + artist_context={'name': 'Artist'}, + ) + + mw.run_full_missing_tracks_process('B28', 'album:1', tracks, deps) + + assert len(slsk.calls) == 1 + assert slsk.calls[0][3] == { + 'preferred_source': { + 'username': 'peer', + 'folder_path': 'Artist/Test Album', + }, + 'preferred_tracks': folder_tracks, + } + assert download_batches['B28']['album_bundle_private_staging'] is True + assert 'last_good_source' not in download_batches['B28'] + assert 'source_folder_tracks' not in download_batches['B28'] + + +def test_hybrid_first_torrent_uses_album_bundle_before_per_track(monkeypatch): + db = _FakeDB() + monkeypatch.setattr('database.music_database.MusicDatabase', lambda: db) + + plugin = _FakeAlbumBundleSoulseek() + deps = _build_deps( + config=_FakeConfig({ + 'download_source.mode': 'hybrid', + 'download_source.hybrid_order': ['torrent', 'soulseek'], + }), + soulseek=_FakePluginWrapper({'torrent': plugin}), + ) + _seed_batch( + 'B27', + is_album_download=True, + album_context={'name': 'Test Album', 'total_tracks': 1}, + artist_context={'name': 'Artist'}, + ) + tracks = [{'name': 'T1', 'artists': ['Artist'], 'track_number': 1}] + + mw.run_full_missing_tracks_process('B27', 'album:1', tracks, deps) + + assert 
len(plugin.calls) == 1 + assert download_batches['B27']['album_bundle_source'] == 'torrent' + + # --------------------------------------------------------------------------- # Task creation # --------------------------------------------------------------------------- diff --git a/tests/downloads/test_downloads_task_worker.py b/tests/downloads/test_downloads_task_worker.py index e5156b4a..a222b311 100644 --- a/tests/downloads/test_downloads_task_worker.py +++ b/tests/downloads/test_downloads_task_worker.py @@ -223,6 +223,92 @@ def test_private_torrent_album_staging_miss_skips_per_track_search(): assert ('done', ('b1', 't1', False), {}) in rec.calls +def test_private_soulseek_album_staging_miss_skips_per_track_search(): + _seed_task(track_info={ + 'id': 'sp-1', 'name': 'Song', 'artists': ['Artist'], + 'album': 'Album', 'duration_ms': 180000, + }) + download_batches['b1'] = { + 'album_bundle_private_staging': True, + 'album_bundle_state': 'staged', + 'album_bundle_source': 'soulseek', + } + client = _FakeClient(results=['should-not-search'], mode='soulseek') + rec = _Recorder() + deps, _ = _build_deps( + soulseek=client, + matching=_FakeMatchEngine(queries=['Artist Song']), + try_staging_match=lambda *a, **kw: False, + on_download_completed=rec('done'), + ) + + tw.download_track_worker('t1', 'b1', deps) + + assert client.search_calls == [] + assert download_tasks['t1']['status'] == 'not_found' + assert 'staged soulseek album release' in download_tasks['t1']['error_message'] + assert ('done', ('b1', 't1', False), {}) in rec.calls + + +def test_private_hybrid_first_soulseek_album_staging_miss_skips_per_track_search(): + _seed_task(track_info={ + 'id': 'sp-1', 'name': 'Song', 'artists': ['Artist'], + 'album': 'Album', 'duration_ms': 180000, + }) + download_batches['b1'] = { + 'album_bundle_private_staging': True, + 'album_bundle_state': 'staged', + 'album_bundle_source': 'soulseek', + } + client = _FakeClient( + results=['should-not-search'], + mode='hybrid', + 
subclients={'hybrid_order': ['soulseek', 'hifi']}, + ) + rec = _Recorder() + deps, _ = _build_deps( + soulseek=client, + matching=_FakeMatchEngine(queries=['Artist Song']), + try_staging_match=lambda *a, **kw: False, + on_download_completed=rec('done'), + ) + + tw.download_track_worker('t1', 'b1', deps) + + assert client.search_calls == [] + assert download_tasks['t1']['status'] == 'not_found' + assert 'staged soulseek album release' in download_tasks['t1']['error_message'] + + +def test_partial_private_hybrid_first_soulseek_album_staging_miss_allows_per_track_search(): + _seed_task(track_info={ + 'id': 'sp-1', 'name': 'Song', 'artists': ['Artist'], + 'album': 'Album', 'duration_ms': 180000, + }) + download_batches['b1'] = { + 'album_bundle_private_staging': True, + 'album_bundle_state': 'staged', + 'album_bundle_source': 'soulseek', + 'album_bundle_partial': True, + } + client = _FakeClient( + results=[], + mode='hybrid', + subclients={'hybrid_order': ['soulseek', 'hifi']}, + ) + deps, _ = _build_deps( + soulseek=client, + matching=_FakeMatchEngine(queries=['Artist Song']), + try_staging_match=lambda *a, **kw: False, + ) + + tw.download_track_worker('t1', 'b1', deps) + + assert client.search_calls + assert download_tasks['t1']['status'] == 'not_found' + assert 'staged soulseek album release' not in download_tasks['t1']['error_message'] + + # --------------------------------------------------------------------------- # Search loop happy path # --------------------------------------------------------------------------- diff --git a/tests/downloads/test_soulseek_pinning.py b/tests/downloads/test_soulseek_pinning.py index 0c7a8202..3ba3508b 100644 --- a/tests/downloads/test_soulseek_pinning.py +++ b/tests/downloads/test_soulseek_pinning.py @@ -28,6 +28,7 @@ from unittest.mock import AsyncMock, patch import pytest from core.soulseek_client import SoulseekClient +from core.download_plugins.types import AlbumResult, DownloadStatus, TrackResult def _run_async(coro): @@ 
-121,6 +122,153 @@ def test_download_extracts_id_from_dict_response(configured_client): assert result == 'abc123' +# --------------------------------------------------------------------------- +# album bundle +# --------------------------------------------------------------------------- + + +def _track(username='peer', filename='Artist/Album/01 - Song.flac', title='Song', number=1, size=10): + return TrackResult( + username=username, + filename=filename, + size=size, + bitrate=None, + duration=180000, + quality='flac', + free_upload_slots=1, + upload_speed=1_000_000, + queue_length=0, + artist='Artist', + title=title, + album='Album', + track_number=number, + ) + + +def test_album_bundle_stages_one_selected_soulseek_folder(configured_client, tmp_path): + configured_client.download_path = tmp_path + local_file = tmp_path / '01 - Song.flac' + local_file.write_bytes(b'audio') + track = _track(filename='Artist/Album/01 - Song.flac') + album = AlbumResult( + username='peer', + album_path='Artist/Album', + album_title='Album', + artist='Artist', + track_count=1, + total_size=10, + tracks=[track], + dominant_quality='flac', + free_upload_slots=1, + upload_speed=1_000_000, + queue_length=0, + ) + events = [] + + with patch.object(configured_client, 'search', AsyncMock(return_value=([], [album]))), \ + patch.object(configured_client, 'browse_user_directory', AsyncMock(return_value=[ + {'filename': '01 - Song.flac', 'size': 10} + ])), \ + patch.object(configured_client, 'filter_results_by_quality_preference', side_effect=lambda tracks: tracks), \ + patch.object(configured_client, 'download', AsyncMock(return_value='dl-1')) as download_mock, \ + patch.object(configured_client, 'get_all_downloads', AsyncMock(return_value=[ + DownloadStatus( + id='dl-1', + username='peer', + filename='Artist/Album/01 - Song.flac', + state='Completed, Succeeded', + progress=100, + size=10, + transferred=10, + speed=0, + ) + ])), \ + patch('core.soulseek_client.get_poll_timeout', return_value=1), 
\ + patch('core.soulseek_client.get_poll_interval', return_value=0.01): + outcome = configured_client.download_album_to_staging( + 'Album', + 'Artist', + str(tmp_path / 'staging'), + events.append, + ) + + assert outcome['success'] is True + assert outcome['fallback'] is False + assert len(outcome['files']) == 1 + assert Path(outcome['files'][0]).read_bytes() == b'audio' + download_mock.assert_awaited_once_with( + 'peer', + 'Artist/Album/01 - Song.flac', + 10, + ) + assert events[-1]['state'] == 'staged' + + +def test_album_bundle_stages_completed_files_when_same_source_partially_times_out(configured_client, tmp_path): + configured_client.download_path = tmp_path + (tmp_path / '01 - Ready.flac').write_bytes(b'audio') + ready = _track(filename='Artist/Album/01 - Ready.flac', title='Ready', number=1) + timed_out = _track(filename='Artist/Album/02 - Waiting.flac', title='Waiting', number=2) + events = [] + + with patch.object(configured_client, 'download', AsyncMock(side_effect=['dl-1', 'dl-2'])), \ + patch.object(configured_client, 'filter_results_by_quality_preference', side_effect=lambda tracks: tracks), \ + patch.object(configured_client, 'get_all_downloads', AsyncMock(return_value=[ + DownloadStatus( + id='dl-1', + username='peer', + filename='Artist/Album/01 - Ready.flac', + state='Completed, Succeeded', + progress=100, + size=10, + transferred=10, + speed=0, + ), + DownloadStatus( + id='dl-2', + username='peer', + filename='Artist/Album/02 - Waiting.flac', + state='TimedOut', + progress=0, + size=10, + transferred=0, + speed=0, + ), + ])), \ + patch('core.soulseek_client.get_poll_timeout', return_value=1), \ + patch('core.soulseek_client.get_poll_interval', return_value=0.01): + outcome = configured_client.download_album_to_staging( + 'Album', + 'Artist', + str(tmp_path / 'staging'), + events.append, + preferred_source={'username': 'peer', 'folder_path': 'Artist/Album'}, + preferred_tracks=[ready, timed_out], + ) + + assert outcome['success'] is True + assert 
outcome['fallback'] is False + assert len(outcome['files']) == 1 + assert Path(outcome['files'][0]).name == '01 - Ready.flac' + assert any(event.get('failed') == 1 for event in events) + + +def test_album_bundle_falls_back_when_no_album_folder(configured_client, tmp_path): + configured_client.download_path = tmp_path + with patch.object(configured_client, 'search', AsyncMock(return_value=([], []))), \ + patch.object(configured_client, 'download', AsyncMock(return_value='dl-1')) as dl: + outcome = configured_client.download_album_to_staging( + 'Missing Album', + 'Artist', + str(tmp_path / 'staging'), + ) + + assert outcome['success'] is False + assert outcome['fallback'] is True + assert 'No complete Soulseek album folders' in outcome['error'] + dl.assert_not_awaited() + + def test_download_extracts_id_from_list_response(configured_client): """Pinning: slskd sometimes returns a list of file objects. The first item's id is the download_id.""" diff --git a/tests/test_album_bundle_dispatch.py b/tests/test_album_bundle_dispatch.py index 68579f6d..5cafa065 100644 --- a/tests/test_album_bundle_dispatch.py +++ b/tests/test_album_bundle_dispatch.py @@ -56,18 +56,20 @@ def test_is_eligible_requires_album_flag() -> None: album_name='X', artist_name='Y') is False -def test_is_eligible_requires_torrent_or_usenet_mode() -> None: - for mode in ('soulseek', 'youtube', 'tidal', 'qobuz', 'hifi', +def test_is_eligible_requires_album_bundle_mode() -> None: + for mode in ('youtube', 'tidal', 'qobuz', 'hifi', 'deezer_dl', 'amazon', 'lidarr', 'soundcloud', 'hybrid'): assert is_eligible(mode=mode, is_album=True, album_name='X', artist_name='Y') is False -def test_is_eligible_accepts_torrent_and_usenet() -> None: +def test_is_eligible_accepts_torrent_usenet_and_soulseek() -> None: assert is_eligible(mode='torrent', is_album=True, album_name='X', artist_name='Y') is True assert is_eligible(mode='usenet', is_album=True, album_name='X', artist_name='Y') is True + assert 
is_eligible(mode='soulseek', is_album=True, + album_name='X', artist_name='Y') is True def test_is_eligible_requires_non_empty_names() -> None: @@ -103,13 +105,13 @@ def test_dispatch_returns_false_when_not_album() -> None: plugin.download_album_to_staging.assert_not_called() -def test_dispatch_returns_false_for_non_torrent_modes() -> None: +def test_dispatch_returns_false_for_non_album_bundle_modes() -> None: state = _FakeState() plugin = MagicMock() result = try_dispatch( batch_id='b1', is_album=True, album_context={'name': 'X'}, artist_context={'name': 'Y'}, - config_get=_config({'download_source.mode': 'soulseek'}), + config_get=_config({'download_source.mode': 'youtube'}), plugin_resolver=lambda _name: plugin, state=state, ) assert result is False @@ -234,6 +236,28 @@ def test_dispatch_failure_returns_true_so_master_stops() -> None: assert state.fields['phase'] == 'failed' +def test_dispatch_fallback_failure_returns_false_for_per_track_flow() -> None: + state = _FakeState() + plugin = MagicMock() + plugin.download_album_to_staging.return_value = { + 'success': False, + 'files': [], + 'error': 'No complete Soulseek album folders found', + 'fallback': True, + } + result = try_dispatch( + batch_id='b1', is_album=True, + album_context={'name': 'Album'}, artist_context={'name': 'Artist'}, + config_get=_config({'download_source.mode': 'soulseek'}), + plugin_resolver=lambda _name: plugin, state=state, + ) + assert result is False + assert state.failed_with == '' + assert state.fields['phase'] == 'analysis' + assert state.fields['album_bundle_state'] == 'fallback' + assert state.fields['album_bundle_error'] == 'No complete Soulseek album folders found' + + def test_dispatch_plugin_exception_treated_as_failure() -> None: """A bug / network error in the plugin must not propagate into the master worker — caught + treated as a normal failure so @@ -269,6 +293,25 @@ def test_dispatch_strips_whitespace_from_names() -> None: assert args.args[1] == 'Kendrick' +def 
test_dispatch_source_override_uses_first_hybrid_source() -> None: + state = _FakeState() + plugin = MagicMock() + plugin.download_album_to_staging.return_value = {'success': True, 'files': ['/x']} + seen = [] + + try_dispatch( + batch_id='b1', is_album=True, + album_context={'name': 'GNX'}, artist_context={'name': 'Kendrick Lamar'}, + config_get=_config({'download_source.mode': 'hybrid'}), + plugin_resolver=lambda name: seen.append(name) or plugin, + state=state, + source_override='soulseek', + ) + + assert seen == ['soulseek'] + assert state.fields['album_bundle_source'] == 'soulseek' + + def test_dispatch_progress_callback_mirrors_payload_to_state() -> None: """The progress callback the plugin gets must mirror its payload onto the batch state under ``album_bundle_*`` keys so