From f40c6d3b552f73aba268c8cca2baf6c95f9e86a2 Mon Sep 17 00:00:00 2001
From: Broque Thomas <26755000+Nezreka@users.noreply.github.com>
Date: Mon, 4 May 2026 12:39:41 -0700
Subject: [PATCH] B1: Add DownloadEngine skeleton
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

`core/download_engine/` package with the engine class that will own
cross-source state, threading, search retry, rate-limits, and
fallback chains. Orchestrator constructs an engine and registers
each plugin with it.

Phase B1 scope: skeleton only. Engine stores active_downloads
records keyed by (source, download_id), provides thread-safe
add/update/remove/iterate primitives, and holds plugin references
for later phases. NOT on any code path yet — pure additive
scaffolding so subsequent commits can introduce engine-driven
behavior one piece at a time without a big-bang switchover.

15 new tests pin the engine's state-storage contract: shallow-copy
reads, partial-patch updates, no-op-on-missing semantics, per-source
iteration, id-only find, concurrent-add safety. Suite still 290
(download subset) green. Zero behavior change.
---
 core/download_engine/__init__.py        |  29 ++++
 core/download_engine/engine.py          | 173 +++++++++++++++++++
 core/download_orchestrator.py           |  16 +-
 tests/downloads/test_download_engine.py | 219 ++++++++++++++++++++++++
 4 files changed, 436 insertions(+), 1 deletion(-)
 create mode 100644 core/download_engine/__init__.py
 create mode 100644 core/download_engine/engine.py
 create mode 100644 tests/downloads/test_download_engine.py

diff --git a/core/download_engine/__init__.py b/core/download_engine/__init__.py
new file mode 100644
index 00000000..1e945207
--- /dev/null
+++ b/core/download_engine/__init__.py
@@ -0,0 +1,29 @@
+"""Download Engine — central owner of cross-source download state,
+thread workers, search retry, rate-limits, and fallback chains.
+
+This is the second leg of the multi-source download dispatcher
+refactor (the first leg, ``core/download_plugins/``, defined the
+contract). The engine takes ownership of everything that used to
+be duplicated across the per-source clients (background thread
+workers, active_downloads dicts, search retry ladders, quality
+filtering, hybrid fallback). Clients become DUMB — just hit the
+API for their source, manage their own auth state, and let the
+engine drive everything else.
+
+This package is built up in phases (see
+``docs/download-engine-refactor-plan.md`` for the full plan):
+
+- Phase B (current) — engine skeleton + state lift.
+- Phase C — background download worker.
+- Phase D — search retry + quality filter.
+- Phase E — rate-limit pool.
+- Phase F — fallback chain.
+
+Each phase is purely additive at first (engine grows, clients
+unchanged). Migration to the new shape happens one source per
+commit so behavior never breaks across the suite.
+"""
+
+from core.download_engine.engine import DownloadEngine
+
+__all__ = ["DownloadEngine"]
diff --git a/core/download_engine/engine.py b/core/download_engine/engine.py
new file mode 100644
index 00000000..74cfebf7
--- /dev/null
+++ b/core/download_engine/engine.py
@@ -0,0 +1,173 @@
+"""DownloadEngine — central owner of cross-source download state.
+
+Phase B scope: skeleton only. The engine exposes a place for
+plugins to register, a single ``active_downloads`` dict keyed by
+``(source, download_id)``, and a ``state_lock`` that guards mutations
+across the multi-threaded download worker pool.
+
+Subsequent phases bolt more capability on top:
+- ``dispatch_download(plugin, target_id)`` (Phase C — replaces every
+  client's ``_download_thread_worker`` boilerplate).
+- ``search(query, source_chain)`` (Phase D — replaces every client's
+  retry ladder + quality filter).
+- ``rate_limit.acquire(source)`` (Phase E — replaces every client's
+  semaphore + last-download-timestamp dance).
+- ``search_with_fallback`` / ``download_with_fallback`` (Phase F —
+  unifies hybrid mode across search and download).
+
+The engine is constructed by ``DownloadOrchestrator.__init__`` and
+each plugin from the registry is registered with it. In Phase B
+nothing in the existing code paths goes through the engine yet —
+this commit is pure additive scaffolding so subsequent commits can
+introduce engine-driven behavior one piece at a time without a
+big-bang switchover.
+"""
+
+from __future__ import annotations
+
+import threading
+from typing import Any, Dict, Iterator, List, Optional, Tuple
+
+from utils.logging_config import get_logger
+
+logger = get_logger("download_engine")
+
+
+# Type alias for the per-download state dict. Today's clients each
+# define their own slightly-different shape (see Phase A pinning
+# tests); the engine stores them as opaque dicts and the per-plugin
+# accessor preserves the source-specific fields.
+DownloadRecord = Dict[str, Any]
+
+
+class DownloadEngine:
+    """Central state for every active download across every source.
+
+    State is keyed by ``(source_name, download_id)`` so the same
+    UUID could hypothetically appear in two sources without
+    collision (in practice each source generates its own UUID4
+    so collisions are negligible — the source qualifier exists
+    so the engine can answer "which plugin owns this download" in
+    O(1) without iterating every plugin).
+
+    Thread safety: every record mutation goes through ``state_lock``
+    (plugin registration is NOT locked). Read-only accessors
+    (``get_record``, ``iter_records_for_source``) take the lock
+    briefly and return SHALLOW COPIES so the caller can iterate
+    without holding the lock. Callers that need to mutate a record
+    should use ``update_record``, which locks and patches atomically.
+    """
+
+    def __init__(self) -> None:
+        # RLock so a plugin's worker callback can re-enter while
+        # holding the lock for its own update.
+        self.state_lock = threading.RLock()
+        # Composite key: (source_name, download_id) → record dict.
+        self._records: Dict[Tuple[str, str], DownloadRecord] = {}
+        # Plugins that have registered with the engine. Source name
+        # → plugin instance. The engine itself doesn't use plugins
+        # until later phases, but holding the references here keeps
+        # plugin lookup local to the engine instead of forcing every
+        # caller to also touch the registry.
+        self._plugins: Dict[str, Any] = {}
+
+    # ------------------------------------------------------------------
+    # Plugin registration
+    # ------------------------------------------------------------------
+
+    def register_plugin(self, source_name: str, plugin: Any) -> None:
+        """Register a plugin under its canonical source name. Called
+        once per source by the orchestrator after the registry's
+        ``initialize`` builds the client instances.
+
+        Phase B is purely informational — the engine doesn't yet
+        dispatch through plugins. Subsequent phases use these
+        references to call ``plugin._download_impl`` /
+        ``plugin._search_raw`` etc.
+        """
+        if source_name in self._plugins:
+            logger.warning("Plugin %s already registered with engine — overwriting", source_name)
+        self._plugins[source_name] = plugin
+
+    def get_plugin(self, source_name: str) -> Optional[Any]:
+        return self._plugins.get(source_name)
+
+    def registered_sources(self) -> List[str]:
+        return list(self._plugins.keys())
+
+    # ------------------------------------------------------------------
+    # Active-downloads state — Phase B core surface
+    # ------------------------------------------------------------------
+
+    def add_record(self, source_name: str, download_id: str, record: DownloadRecord) -> None:
+        """Insert a fresh download record (stored as a shallow copy).
+        Used by clients (today directly via their own dicts; Phase B2
+        routes them through here). Replaces any existing record."""
+        with self.state_lock:
+            key = (source_name, download_id)
+            if key in self._records:
+                logger.warning("Replacing existing download record for %s/%s", source_name, download_id)
+            self._records[key] = dict(record)
+
+    def update_record(self, source_name: str, download_id: str, patch: DownloadRecord) -> None:
+        """Apply a partial patch to an existing record. No-op if the
+        record was already removed (e.g. cancelled mid-update)."""
+        with self.state_lock:
+            existing = self._records.get((source_name, download_id))
+            if existing is None:
+                return
+            existing.update(patch)
+
+    def remove_record(self, source_name: str, download_id: str) -> Optional[DownloadRecord]:
+        """Delete a record (cancellation cleanup). Returns the removed
+        record (the stored dict itself, not a copy) or None if missing."""
+        with self.state_lock:
+            return self._records.pop((source_name, download_id), None)
+
+    def get_record(self, source_name: str, download_id: str) -> Optional[DownloadRecord]:
+        """Return a SHALLOW COPY of the record. Caller mutations
+        don't affect engine state — use ``update_record`` for that."""
+        with self.state_lock:
+            record = self._records.get((source_name, download_id))
+            return dict(record) if record is not None else None
+
+    def iter_records_for_source(self, source_name: str) -> Iterator[DownloadRecord]:
+        """Yield SHALLOW COPIES of every record owned by a source.
+        Generator: on first ``next()`` it snapshots under the lock,
+        then yields outside it so callers can take their time."""
+        with self.state_lock:
+            snapshot = [
+                dict(record)
+                for (source, _), record in self._records.items()
+                if source == source_name
+            ]
+        for record in snapshot:
+            yield record
+
+    def iter_all_records(self) -> Iterator[Tuple[str, DownloadRecord]]:
+        """Yield ``(source_name, record_copy)`` for every active
+        download across every source. Used by Phase B3's unified
+        ``get_all_downloads`` query."""
+        with self.state_lock:
+            snapshot = [
+                (source, dict(record))
+                for (source, _), record in self._records.items()
+            ]
+        for source, record in snapshot:
+            yield source, record
+
+    def find_record(self, download_id: str) -> Optional[Tuple[str, DownloadRecord]]:
+        """Look up a record by download_id alone (no source hint).
+        Used by ``cancel_download`` / ``get_download_status`` API
+        endpoints that don't pass the source name. Returns the first
+        matching ``(source_name, record_copy)`` or None.
+
+        O(N) over total downloads — fine for the tens-to-hundreds
+        of in-flight transfers SoulSync sees, would need an index
+        if downloads scaled to thousands.
+        """
+        with self.state_lock:
+            for (source, dl_id), record in self._records.items():
+                if dl_id == download_id:
+                    return source, dict(record)
+            return None
diff --git a/core/download_orchestrator.py b/core/download_orchestrator.py
index 590a8c90..41f34eb3 100644
--- a/core/download_orchestrator.py
+++ b/core/download_orchestrator.py
@@ -26,6 +26,7 @@ from pathlib import Path
 
 from utils.logging_config import get_logger
 from config.settings import config_manager
+from core.download_engine import DownloadEngine
 from core.download_plugins.registry import DownloadPluginRegistry, build_default_registry
 from core.soulseek_client import TrackResult, AlbumResult, DownloadStatus
 
@@ -40,12 +41,17 @@ class DownloadOrchestrator:
     Routes requests to the appropriate client(s) based on configured mode.
     """
 
-    def __init__(self, registry: Optional[DownloadPluginRegistry] = None):
+    def __init__(self, registry: Optional[DownloadPluginRegistry] = None,
+                 engine: Optional[DownloadEngine] = None):
         """Initialize orchestrator with a plugin registry.
         Each plugin is built and registered independently — one failing
         plugin doesn't prevent others from working. The ``registry`` arg
         exists so tests can inject a registry with mock plugins; in
         production callers leave it None and get the default.
+
+        ``engine`` is the cross-source state owner. Phase B introduces
+        it as a held reference; it isn't on any code path yet — Phase
+        C/D/E/F migrate behavior into it incrementally.
         """
         self.registry = registry if registry is not None else build_default_registry()
         self.registry.initialize()
@@ -64,6 +70,14 @@ class DownloadOrchestrator:
         self.lidarr = self.registry.get('lidarr')
         self.soundcloud = self.registry.get('soundcloud')
 
+        # Engine — owns cross-source state, threading, search retry,
+        # rate-limits, fallback. Built in subsequent phases. For Phase
+        # B it's just an empty registry of plugins so future phases
+        # can route through it without further orchestrator changes.
+        self.engine = engine if engine is not None else DownloadEngine()
+        for source_name, plugin in self.registry.all_plugins():
+            self.engine.register_plugin(source_name, plugin)
+
         if self._init_failures:
             logger.warning(f"Download clients failed to initialize: {', '.join(self._init_failures)}")
 
diff --git a/tests/downloads/test_download_engine.py b/tests/downloads/test_download_engine.py
new file mode 100644
index 00000000..e5ec49af
--- /dev/null
+++ b/tests/downloads/test_download_engine.py
@@ -0,0 +1,219 @@
+"""Tests for the DownloadEngine skeleton (Phase B).
+
+Pinning the engine's state-storage contract: add/update/remove,
+per-source iteration, find-by-id, plugin registration, lock-held
+mutations vs lock-released reads. Future phases (C/D/E/F) bolt
+behavior on top of this surface — these tests stay green and act
+as the regression net while behavior moves in.
+"""
+
+from __future__ import annotations
+
+import threading
+
+import pytest
+
+from core.download_engine import DownloadEngine
+
+
+# ---------------------------------------------------------------------------
+# Plugin registration
+# ---------------------------------------------------------------------------
+
+
+def test_register_plugin_stores_under_source_name():
+    engine = DownloadEngine()
+    plugin = object()
+    engine.register_plugin('soulseek', plugin)
+    assert engine.get_plugin('soulseek') is plugin
+    assert 'soulseek' in engine.registered_sources()
+
+
+def test_get_plugin_returns_none_for_unknown_source():
+    engine = DownloadEngine()
+    assert engine.get_plugin('made_up') is None
+
+
+def test_register_plugin_overwrites_on_duplicate(caplog):
+    """Re-registering under the same name overwrites (the engine also
+    logs a warning, which this test does not assert). Useful so test
+    fixtures can swap a mock plugin in without setup gymnastics."""
+    engine = DownloadEngine()
+    first = object()
+    second = object()
+    engine.register_plugin('soulseek', first)
+    engine.register_plugin('soulseek', second)
+    assert engine.get_plugin('soulseek') is second
+
+
+# ---------------------------------------------------------------------------
+# Active-download state — add / get / update / remove
+# ---------------------------------------------------------------------------
+
+
+def test_add_record_inserts_under_composite_key():
+    engine = DownloadEngine()
+    engine.add_record('youtube', 'dl-1', {'state': 'Initializing', 'progress': 0.0})
+
+    rec = engine.get_record('youtube', 'dl-1')
+    assert rec is not None
+    assert rec['state'] == 'Initializing'
+    assert rec['progress'] == 0.0
+
+
+def test_get_record_returns_shallow_copy():
+    """Mutating the returned dict must NOT affect engine state.
+    Engine reads should be safe to hold / iterate without locks."""
+    engine = DownloadEngine()
+    engine.add_record('youtube', 'dl-1', {'state': 'Initializing'})
+
+    rec = engine.get_record('youtube', 'dl-1')
+    rec['state'] = 'TamperedByCaller'
+
+    # Engine state still has the original.
+    fresh = engine.get_record('youtube', 'dl-1')
+    assert fresh['state'] == 'Initializing'
+
+
+def test_update_record_applies_partial_patch():
+    engine = DownloadEngine()
+    engine.add_record('tidal', 'dl-2', {'state': 'Initializing', 'progress': 0.0,
+                                        'file_path': None})
+
+    engine.update_record('tidal', 'dl-2', {'state': 'Completed, Succeeded',
+                                           'progress': 100.0,
+                                           'file_path': '/tmp/song.flac'})
+
+    rec = engine.get_record('tidal', 'dl-2')
+    assert rec['state'] == 'Completed, Succeeded'
+    assert rec['progress'] == 100.0
+    assert rec['file_path'] == '/tmp/song.flac'
+
+
+def test_update_record_is_noop_when_record_removed():
+    """If a record was removed (e.g. user cancelled mid-download),
+    the worker thread's late update is silently dropped — never
+    raises. Mirrors the per-client `if download_id in active_downloads`
+    guard pattern that's all over the existing clients."""
+    engine = DownloadEngine()
+    engine.add_record('tidal', 'dl-2', {'state': 'Initializing'})
+    engine.remove_record('tidal', 'dl-2')
+
+    # Should not raise.
+    engine.update_record('tidal', 'dl-2', {'state': 'Completed, Succeeded'})
+
+    assert engine.get_record('tidal', 'dl-2') is None
+
+
+def test_remove_record_returns_removed_record():
+    engine = DownloadEngine()
+    engine.add_record('qobuz', 'dl-3', {'state': 'InProgress'})
+
+    removed = engine.remove_record('qobuz', 'dl-3')
+    assert removed is not None
+    assert removed['state'] == 'InProgress'
+    assert engine.get_record('qobuz', 'dl-3') is None
+
+
+def test_remove_record_returns_none_when_missing():
+    engine = DownloadEngine()
+    assert engine.remove_record('qobuz', 'never-existed') is None
+
+
+# ---------------------------------------------------------------------------
+# Iteration
+# ---------------------------------------------------------------------------
+
+
+def test_iter_records_for_source_filters_correctly():
+    engine = DownloadEngine()
+    engine.add_record('youtube', 'yt-1', {'title': 'A'})
+    engine.add_record('youtube', 'yt-2', {'title': 'B'})
+    engine.add_record('tidal', 'td-1', {'title': 'C'})
+
+    yt_records = list(engine.iter_records_for_source('youtube'))
+    assert len(yt_records) == 2
+    assert {r['title'] for r in yt_records} == {'A', 'B'}
+
+    td_records = list(engine.iter_records_for_source('tidal'))
+    assert len(td_records) == 1
+    assert td_records[0]['title'] == 'C'
+
+
+def test_iter_all_records_yields_source_paired_with_record():
+    engine = DownloadEngine()
+    engine.add_record('youtube', 'yt-1', {'title': 'A'})
+    engine.add_record('tidal', 'td-1', {'title': 'B'})
+
+    pairs = list(engine.iter_all_records())
+    assert len(pairs) == 2
+    sources = {source for source, _ in pairs}
+    titles = {record['title'] for _, record in pairs}
+    assert sources == {'youtube', 'tidal'}
+    assert titles == {'A', 'B'}
+
+
+def test_iter_yields_shallow_copies():
+    """Iteration returns COPIES — caller can hold the list and mutate
+    each record without affecting engine state. Important: future
+    Phase B3's `get_all_downloads` will iterate then build
+    DownloadStatus objects from the snapshots."""
+    engine = DownloadEngine()
+    engine.add_record('youtube', 'yt-1', {'title': 'A'})
+
+    snapshot = list(engine.iter_records_for_source('youtube'))
+    snapshot[0]['title'] = 'TAMPERED'
+
+    fresh = engine.get_record('youtube', 'yt-1')
+    assert fresh['title'] == 'A'
+
+
+# ---------------------------------------------------------------------------
+# find_record — id-only lookup
+# ---------------------------------------------------------------------------
+
+
+def test_find_record_returns_source_and_copy():
+    engine = DownloadEngine()
+    engine.add_record('youtube', 'shared-id-shape', {'title': 'A'})
+
+    result = engine.find_record('shared-id-shape')
+    assert result is not None
+    source, record = result
+    assert source == 'youtube'
+    assert record['title'] == 'A'
+
+
+def test_find_record_returns_none_for_unknown_id():
+    engine = DownloadEngine()
+    engine.add_record('youtube', 'yt-1', {})
+    assert engine.find_record('nonexistent') is None
+
+
+# ---------------------------------------------------------------------------
+# Thread safety — basic concurrent-mutation smoke
+# ---------------------------------------------------------------------------
+
+
+def test_concurrent_adds_dont_lose_records():
+    """Hammer the engine with concurrent add_record from multiple
+    threads. With proper locking, every record lands in state.
+    Future Phase C BackgroundDownloadWorker spawns N threads doing
+    exactly this kind of mutation."""
+    engine = DownloadEngine()
+
+    def add_records(source, base):
+        for i in range(50):
+            engine.add_record(source, f'{base}-{i}', {'i': i})
+
+    threads = [
+        threading.Thread(target=add_records, args=(f'src-{n}', f'dl-{n}'))
+        for n in range(4)
+    ]
+    for t in threads:
+        t.start()
+    for t in threads:
+        t.join()
+
+    total = sum(1 for _ in engine.iter_all_records())
+    assert total == 4 * 50  # 200 records, none lost