B1: Add DownloadEngine skeleton

`core/download_engine/` package with the engine class that will own
cross-source state, threading, search retry, rate-limits, and
fallback chains. Orchestrator constructs an engine and registers
each plugin with it.

Phase B1 scope: skeleton only. Engine stores active_downloads
records keyed by (source, download_id), provides thread-safe
add/update/remove/iterate primitives, and holds plugin references
for later phases. NOT on any code path yet — pure additive
scaffolding so subsequent commits can introduce engine-driven
behavior one piece at a time without a big-bang switchover.
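
The engine in this commit guards its records with a `threading.RLock`, so code that already holds the lock can still call the locked primitives. A minimal sketch of that re-entrancy and of the no-op-on-missing update, using a hypothetical `TinyStore` class that is not part of the commit:

```python
import threading
from typing import Any, Dict, Optional, Tuple

Record = Dict[str, Any]


class TinyStore:
    """Hypothetical stand-in for the engine's record storage."""

    def __init__(self) -> None:
        # RLock: the owning thread may acquire it again without deadlocking.
        self.state_lock = threading.RLock()
        self._records: Dict[Tuple[str, str], Record] = {}

    def add_record(self, source: str, download_id: str, record: Record) -> None:
        with self.state_lock:
            self._records[(source, download_id)] = dict(record)

    def update_record(self, source: str, download_id: str, patch: Record) -> None:
        with self.state_lock:
            existing = self._records.get((source, download_id))
            if existing is None:
                return  # record already removed: drop the patch silently
            existing.update(patch)

    def get_record(self, source: str, download_id: str) -> Optional[Record]:
        with self.state_lock:
            record = self._records.get((source, download_id))
            return dict(record) if record is not None else None


store = TinyStore()
store.add_record("youtube", "dl-1", {"state": "Initializing"})

# A caller holding the lock for a check-then-update sequence can still use
# the locked primitives. With a plain threading.Lock this would deadlock.
with store.state_lock:
    current = store.get_record("youtube", "dl-1")
    if current is not None and current["state"] == "Initializing":
        store.update_record("youtube", "dl-1", {"state": "InProgress"})

final_state = store.get_record("youtube", "dl-1")["state"]

# Patching a record that was never added (or was removed) does nothing.
store.update_record("tidal", "gone", {"state": "Completed"})
missing = store.get_record("tidal", "gone")
```

Holding `state_lock` across several calls like this is only a sketch of what the re-entrant lock permits. Whether later phases rely on it is not stated in the commit.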

15 new tests pin the engine's state-storage contract: shallow-copy
reads, partial-patch updates, no-op-on-missing semantics,
per-source iteration, id-only find, concurrent-add safety.
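
One consequence of the shallow-copy contract is worth spelling out: `dict(record)` copies only the top-level mapping, so nested mutable values stay shared between the copy and the stored record. The sketch below uses plain dicts rather than the engine itself. The `tags` field is a hypothetical example, not a field from this commit.

```python
import copy

# Stand-in for a record held inside the engine.
stored = {"state": "Initializing", "tags": ["flac"]}

# What a shallow-copy read hands back to the caller.
snapshot = dict(stored)
snapshot["state"] = "Tampered"        # rebinding a top-level key: isolated
snapshot["tags"].append("lossless")   # mutating a nested list: shared

top_level_isolated = stored["state"] == "Initializing"
nested_is_shared = stored["tags"] == ["flac", "lossless"]

# copy.deepcopy is the standard-library way to isolate nested values too.
deep = copy.deepcopy(stored)
deep["tags"].append("extra")
deep_is_isolated = stored["tags"] == ["flac", "lossless"]
```

The records in the tests hold only flat values (strings, floats, `None`), where a shallow copy is enough.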

Suite still 290 (download subset) green. Zero behavior change.
pull/495/head
Broque Thomas 3 weeks ago
parent 4c2fd49df2
commit f40c6d3b55

@@ -0,0 +1,29 @@
"""Download Engine — central owner of cross-source download state,
thread workers, search retry, rate-limits, and fallback chains.

This is the second leg of the multi-source download dispatcher
refactor (the first leg, ``core/download_plugins/``, defined the
contract). The engine takes ownership of everything that used to
be duplicated across the per-source clients (background thread
workers, active_downloads dicts, search retry ladders, quality
filtering, hybrid fallback). Clients become DUMB: they just hit the
API for their source, manage their own auth state, and let the
engine drive everything else.

This package is built up in phases (see
``docs/download-engine-refactor-plan.md`` for the full plan):

- Phase B (current): engine skeleton + state lift.
- Phase C: background download worker.
- Phase D: search retry + quality filter.
- Phase E: rate-limit pool.
- Phase F: fallback chain.

Each phase is purely additive at first (engine grows, clients
unchanged). Migration to the new shape happens one source per
commit so behavior never breaks across the suite.
"""

from core.download_engine.engine import DownloadEngine

__all__ = ["DownloadEngine"]

@@ -0,0 +1,173 @@
"""DownloadEngine — central owner of cross-source download state.

Phase B scope: skeleton only. The engine exposes a place for
plugins to register, a single ``active_downloads`` dict keyed by
``(source, download_id)``, and a ``state_lock`` that guards mutations
across the multi-threaded download worker pool.

Subsequent phases bolt more capability on top:

- ``dispatch_download(plugin, target_id)`` (Phase C: replaces every
  client's ``_download_thread_worker`` boilerplate).
- ``search(query, source_chain)`` (Phase D: replaces every client's
  retry ladder + quality filter).
- ``rate_limit.acquire(source)`` (Phase E: replaces every client's
  semaphore + last-download-timestamp dance).
- ``search_with_fallback`` / ``download_with_fallback`` (Phase F:
  unifies hybrid mode across search and download).

The engine is constructed by ``DownloadOrchestrator.__init__`` and
each plugin from the registry is registered with it. In Phase B
nothing in the existing code paths goes through the engine yet;
this commit is pure additive scaffolding so subsequent commits can
introduce engine-driven behavior one piece at a time without a
big-bang switchover.
"""

from __future__ import annotations

import threading
from typing import Any, Dict, Iterator, List, Optional, Tuple

from utils.logging_config import get_logger

logger = get_logger("download_engine")

# Type alias for the per-download state dict. Today's clients each
# define their own slightly-different shape (see Phase A pinning
# tests); the engine stores them as opaque dicts and the per-plugin
# accessor preserves the source-specific fields.
DownloadRecord = Dict[str, Any]


class DownloadEngine:
    """Central state for every active download across every source.

    State is keyed by ``(source_name, download_id)`` so the same
    UUID could hypothetically appear in two sources without
    collision. In practice each source generates its own UUID4,
    so collisions are negligible; the source qualifier exists
    so the engine can answer "which plugin owns this download" in
    O(1) without iterating every plugin.

    Thread safety: every state mutation goes through ``state_lock``.
    Read-only accessors (``get_record``, ``iter_records_for_source``)
    take the lock briefly and return a SHALLOW COPY so the caller
    can iterate without holding the lock. Callers that need to
    mutate a record should use ``update_record``, which takes the
    lock and applies the patch atomically.
    """

    def __init__(self) -> None:
        # RLock so a plugin's worker callback can re-enter while
        # holding the lock for its own update.
        self.state_lock = threading.RLock()
        # Composite key: (source_name, download_id) → record dict.
        self._records: Dict[Tuple[str, str], DownloadRecord] = {}
        # Plugins that have registered with the engine. Source name
        # → plugin instance. The engine itself doesn't use plugins
        # until later phases, but holding the references here keeps
        # plugin lookup local to the engine instead of forcing every
        # caller to also touch the registry.
        self._plugins: Dict[str, Any] = {}

    # ------------------------------------------------------------------
    # Plugin registration
    # ------------------------------------------------------------------

    def register_plugin(self, source_name: str, plugin: Any) -> None:
        """Register a plugin under its canonical source name. Called
        once per source by the orchestrator after the registry's
        ``initialize`` builds the client instances.

        Phase B is purely informational: the engine doesn't yet
        dispatch through plugins. Subsequent phases use these
        references to call ``plugin._download_impl`` /
        ``plugin._search_raw`` etc.
        """
        if source_name in self._plugins:
            logger.warning("Plugin %s already registered with engine — overwriting", source_name)
        self._plugins[source_name] = plugin

    def get_plugin(self, source_name: str) -> Optional[Any]:
        return self._plugins.get(source_name)

    def registered_sources(self) -> List[str]:
        return list(self._plugins.keys())

    # ------------------------------------------------------------------
    # Active-downloads state — Phase B core surface
    # ------------------------------------------------------------------

    def add_record(self, source_name: str, download_id: str, record: DownloadRecord) -> None:
        """Insert a fresh download record. Used by clients (today
        directly via their own dicts; Phase B2 routes them through
        here)."""
        with self.state_lock:
            key = (source_name, download_id)
            if key in self._records:
                logger.warning("Replacing existing download record for %s/%s", source_name, download_id)
            self._records[key] = dict(record)

    def update_record(self, source_name: str, download_id: str, patch: DownloadRecord) -> None:
        """Apply a partial patch to an existing record. No-op if the
        record was already removed (e.g. cancelled mid-update)."""
        with self.state_lock:
            existing = self._records.get((source_name, download_id))
            if existing is None:
                return
            existing.update(patch)

    def remove_record(self, source_name: str, download_id: str) -> Optional[DownloadRecord]:
        """Delete a record (cancellation cleanup). Returns the
        removed record or None if not found."""
        with self.state_lock:
            return self._records.pop((source_name, download_id), None)

    def get_record(self, source_name: str, download_id: str) -> Optional[DownloadRecord]:
        """Return a SHALLOW COPY of the record. Caller mutations
        don't affect engine state — use ``update_record`` for that."""
        with self.state_lock:
            record = self._records.get((source_name, download_id))
            return dict(record) if record is not None else None

    def iter_records_for_source(self, source_name: str) -> Iterator[DownloadRecord]:
        """Yield SHALLOW COPIES of every record owned by a source.
        Holds the lock briefly to snapshot, then yields outside the
        lock so callers can spend arbitrary time on each record."""
        with self.state_lock:
            snapshot = [
                dict(record)
                for (source, _), record in self._records.items()
                if source == source_name
            ]
        for record in snapshot:
            yield record

    def iter_all_records(self) -> Iterator[Tuple[str, DownloadRecord]]:
        """Yield ``(source_name, record_copy)`` for every active
        download across every source. Used by Phase B3's unified
        ``get_all_downloads`` query."""
        with self.state_lock:
            snapshot = [
                (source, dict(record))
                for (source, _), record in self._records.items()
            ]
        for source, record in snapshot:
            yield source, record

    def find_record(self, download_id: str) -> Optional[Tuple[str, DownloadRecord]]:
        """Look up a record by download_id alone (no source hint).
        Used by ``cancel_download`` / ``get_download_status`` API
        endpoints that don't pass the source name. Returns
        ``(source_name, record_copy)`` or None.

        O(N) over total downloads: fine for the tens-to-hundreds
        of in-flight transfers SoulSync sees, but it would need an
        index if downloads scaled to thousands.
        """
        with self.state_lock:
            for (source, dl_id), record in self._records.items():
                if dl_id == download_id:
                    return source, dict(record)
        return None
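
The `find_record` docstring notes that an index would be needed if downloads scaled to thousands. One possible shape for that, as a sketch only: keep a second dict from `download_id` to the set of sources that hold it, updated under the same lock. `IndexedRecords` and its `_sources_by_id` attribute are hypothetical names, not part of this commit.

```python
import threading
from typing import Any, Dict, Optional, Set, Tuple

Record = Dict[str, Any]


class IndexedRecords:
    """Hypothetical variant of the engine's storage with an id-only index."""

    def __init__(self) -> None:
        self.state_lock = threading.RLock()
        self._records: Dict[Tuple[str, str], Record] = {}
        # Secondary index: download_id -> sources holding a record with that id.
        self._sources_by_id: Dict[str, Set[str]] = {}

    def add_record(self, source: str, download_id: str, record: Record) -> None:
        with self.state_lock:
            self._records[(source, download_id)] = dict(record)
            self._sources_by_id.setdefault(download_id, set()).add(source)

    def remove_record(self, source: str, download_id: str) -> Optional[Record]:
        with self.state_lock:
            removed = self._records.pop((source, download_id), None)
            sources = self._sources_by_id.get(download_id)
            if sources is not None:
                sources.discard(source)
                if not sources:
                    # Drop empty entries so the index does not grow forever.
                    del self._sources_by_id[download_id]
            return removed

    def find_record(self, download_id: str) -> Optional[Tuple[str, Record]]:
        with self.state_lock:
            sources = self._sources_by_id.get(download_id)
            if not sources:
                return None
            source = next(iter(sources))
            return source, dict(self._records[(source, download_id)])


store = IndexedRecords()
store.add_record("youtube", "yt-1", {"title": "A"})
store.add_record("tidal", "td-1", {"title": "B"})

found = store.find_record("td-1")
store.remove_record("tidal", "td-1")
missing = store.find_record("td-1")
```

The lookup becomes a dict access instead of a scan, at the cost of keeping two structures in sync on every add and remove. If the same id ever appeared under two sources, this sketch returns an arbitrary one of them, whereas the scan in the commit returns the first match in insertion order.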

@@ -26,6 +26,7 @@ from pathlib import Path
 from utils.logging_config import get_logger
 from config.settings import config_manager
+from core.download_engine import DownloadEngine
 from core.download_plugins.registry import DownloadPluginRegistry, build_default_registry
 from core.soulseek_client import TrackResult, AlbumResult, DownloadStatus
@@ -40,12 +41,17 @@ class DownloadOrchestrator:
     Routes requests to the appropriate client(s) based on configured mode.
     """
-    def __init__(self, registry: Optional[DownloadPluginRegistry] = None):
+    def __init__(self, registry: Optional[DownloadPluginRegistry] = None,
+                 engine: Optional[DownloadEngine] = None):
         """Initialize orchestrator with a plugin registry. Each plugin
         is built and registered independently; one failing plugin
         doesn't prevent others from working. The ``registry`` arg
         exists so tests can inject a registry with mock plugins; in
         production callers leave it None and get the default.
+
+        ``engine`` is the cross-source state owner. Phase B introduces
+        it as a held reference; it isn't on any code path yet — Phase
+        C/D/E/F migrate behavior into it incrementally.
         """
         self.registry = registry if registry is not None else build_default_registry()
         self.registry.initialize()
@@ -64,6 +70,14 @@ class DownloadOrchestrator:
         self.lidarr = self.registry.get('lidarr')
         self.soundcloud = self.registry.get('soundcloud')
+
+        # Engine — owns cross-source state, threading, search retry,
+        # rate-limits, fallback. Built in subsequent phases. For Phase
+        # B it's just an empty registry of plugins so future phases
+        # can route through it without further orchestrator changes.
+        self.engine = engine if engine is not None else DownloadEngine()
+        for source_name, plugin in self.registry.all_plugins():
+            self.engine.register_plugin(source_name, plugin)
         if self._init_failures:
             logger.warning(f"Download clients failed to initialize: {', '.join(self._init_failures)}")
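
The optional `engine` argument follows the same injection pattern as `registry`. A sketch of how a test might use it, built entirely from stand-ins: `MiniOrchestrator`, `FakeRegistry`, and `RecordingEngine` are hypothetical classes that only mirror the wiring in this hunk, where `all_plugins()` is iterated as `(source_name, plugin)` pairs.

```python
from typing import Any, Dict, List, Optional, Tuple


class FakeRegistry:
    """Hypothetical registry double exposing only what the wiring needs."""

    def __init__(self, plugins: Dict[str, Any]) -> None:
        self._plugins = dict(plugins)

    def initialize(self) -> None:
        pass  # nothing to build in the double

    def all_plugins(self) -> List[Tuple[str, Any]]:
        return list(self._plugins.items())


class RecordingEngine:
    """Hypothetical engine double that records registration calls."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def register_plugin(self, source_name: str, plugin: Any) -> None:
        self.calls.append(source_name)


class MiniOrchestrator:
    """Mirrors only the engine wiring from the hunk, nothing else."""

    def __init__(self, registry: FakeRegistry,
                 engine: Optional[RecordingEngine] = None) -> None:
        self.registry = registry
        self.registry.initialize()
        # Explicit None check, as in the commit: an injected engine is
        # always used, and a default is built only when none was passed.
        self.engine = engine if engine is not None else RecordingEngine()
        for source_name, plugin in self.registry.all_plugins():
            self.engine.register_plugin(source_name, plugin)


engine = RecordingEngine()
registry = FakeRegistry({"soulseek": object(), "youtube": object()})
orchestrator = MiniOrchestrator(registry, engine=engine)

registered = sorted(engine.calls)
```

Injecting the engine lets a test assert on what the orchestrator registered without constructing any real client.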

@@ -0,0 +1,219 @@
"""Tests for the DownloadEngine skeleton (Phase B).

Pinning the engine's state-storage contract: add/update/remove,
per-source iteration, find-by-id, plugin registration, lock-held
mutations vs lock-released reads. Future phases (C/D/E/F) bolt
behavior on top of this surface; these tests stay green and act
as the regression net while behavior moves in.
"""

from __future__ import annotations

import threading

import pytest

from core.download_engine import DownloadEngine


# ---------------------------------------------------------------------------
# Plugin registration
# ---------------------------------------------------------------------------


def test_register_plugin_stores_under_source_name():
    engine = DownloadEngine()
    plugin = object()
    engine.register_plugin('soulseek', plugin)
    assert engine.get_plugin('soulseek') is plugin
    assert 'soulseek' in engine.registered_sources()


def test_get_plugin_returns_none_for_unknown_source():
    engine = DownloadEngine()
    assert engine.get_plugin('made_up') is None


def test_register_plugin_overwrites_on_duplicate(caplog):
    """Re-registering under the same name overwrites and warns. Not a
    common path but useful so test fixtures that build a fresh engine
    can swap a mock plugin in without setup gymnastics."""
    engine = DownloadEngine()
    first = object()
    second = object()
    engine.register_plugin('soulseek', first)
    engine.register_plugin('soulseek', second)
    assert engine.get_plugin('soulseek') is second


# ---------------------------------------------------------------------------
# Active-download state — add / get / update / remove
# ---------------------------------------------------------------------------


def test_add_record_inserts_under_composite_key():
    engine = DownloadEngine()
    engine.add_record('youtube', 'dl-1', {'state': 'Initializing', 'progress': 0.0})
    rec = engine.get_record('youtube', 'dl-1')
    assert rec is not None
    assert rec['state'] == 'Initializing'
    assert rec['progress'] == 0.0


def test_get_record_returns_shallow_copy():
    """Mutating the returned dict must NOT affect engine state.
    Engine reads should be safe to hold / iterate without locks."""
    engine = DownloadEngine()
    engine.add_record('youtube', 'dl-1', {'state': 'Initializing'})
    rec = engine.get_record('youtube', 'dl-1')
    rec['state'] = 'TamperedByCaller'
    # Engine state still has the original.
    fresh = engine.get_record('youtube', 'dl-1')
    assert fresh['state'] == 'Initializing'


def test_update_record_applies_partial_patch():
    engine = DownloadEngine()
    engine.add_record('tidal', 'dl-2', {'state': 'Initializing', 'progress': 0.0,
                                        'file_path': None})
    engine.update_record('tidal', 'dl-2', {'state': 'Completed, Succeeded',
                                           'progress': 100.0,
                                           'file_path': '/tmp/song.flac'})
    rec = engine.get_record('tidal', 'dl-2')
    assert rec['state'] == 'Completed, Succeeded'
    assert rec['progress'] == 100.0
    assert rec['file_path'] == '/tmp/song.flac'


def test_update_record_is_noop_when_record_removed():
    """If a record was removed (e.g. user cancelled mid-download),
    the worker thread's late update is silently dropped — never
    raises. Mirrors the per-client `if download_id in active_downloads`
    guard pattern that's all over the existing clients."""
    engine = DownloadEngine()
    engine.add_record('tidal', 'dl-2', {'state': 'Initializing'})
    engine.remove_record('tidal', 'dl-2')
    # Should not raise.
    engine.update_record('tidal', 'dl-2', {'state': 'Completed, Succeeded'})
    assert engine.get_record('tidal', 'dl-2') is None


def test_remove_record_returns_removed_record():
    engine = DownloadEngine()
    engine.add_record('qobuz', 'dl-3', {'state': 'InProgress'})
    removed = engine.remove_record('qobuz', 'dl-3')
    assert removed is not None
    assert removed['state'] == 'InProgress'
    assert engine.get_record('qobuz', 'dl-3') is None


def test_remove_record_returns_none_when_missing():
    engine = DownloadEngine()
    assert engine.remove_record('qobuz', 'never-existed') is None


# ---------------------------------------------------------------------------
# Iteration
# ---------------------------------------------------------------------------


def test_iter_records_for_source_filters_correctly():
    engine = DownloadEngine()
    engine.add_record('youtube', 'yt-1', {'title': 'A'})
    engine.add_record('youtube', 'yt-2', {'title': 'B'})
    engine.add_record('tidal', 'td-1', {'title': 'C'})
    yt_records = list(engine.iter_records_for_source('youtube'))
    assert len(yt_records) == 2
    assert {r['title'] for r in yt_records} == {'A', 'B'}
    td_records = list(engine.iter_records_for_source('tidal'))
    assert len(td_records) == 1
    assert td_records[0]['title'] == 'C'


def test_iter_all_records_yields_source_paired_with_record():
    engine = DownloadEngine()
    engine.add_record('youtube', 'yt-1', {'title': 'A'})
    engine.add_record('tidal', 'td-1', {'title': 'B'})
    pairs = list(engine.iter_all_records())
    assert len(pairs) == 2
    sources = {source for source, _ in pairs}
    titles = {record['title'] for _, record in pairs}
    assert sources == {'youtube', 'tidal'}
    assert titles == {'A', 'B'}


def test_iter_yields_shallow_copies():
    """Iteration returns COPIES — caller can hold the list and mutate
    each record without affecting engine state. Important: future
    Phase B3's `get_all_downloads` will iterate then build
    DownloadStatus objects from the snapshots."""
    engine = DownloadEngine()
    engine.add_record('youtube', 'yt-1', {'title': 'A'})
    snapshot = list(engine.iter_records_for_source('youtube'))
    snapshot[0]['title'] = 'TAMPERED'
    fresh = engine.get_record('youtube', 'yt-1')
    assert fresh['title'] == 'A'


# ---------------------------------------------------------------------------
# find_record — id-only lookup
# ---------------------------------------------------------------------------


def test_find_record_returns_source_and_copy():
    engine = DownloadEngine()
    engine.add_record('youtube', 'shared-id-shape', {'title': 'A'})
    result = engine.find_record('shared-id-shape')
    assert result is not None
    source, record = result
    assert source == 'youtube'
    assert record['title'] == 'A'


def test_find_record_returns_none_for_unknown_id():
    engine = DownloadEngine()
    engine.add_record('youtube', 'yt-1', {})
    assert engine.find_record('nonexistent') is None


# ---------------------------------------------------------------------------
# Thread safety — basic concurrent-mutation smoke
# ---------------------------------------------------------------------------


def test_concurrent_adds_dont_lose_records():
    """Hammer the engine with concurrent add_record from multiple
    threads. With proper locking, every record lands in state.
    Future Phase C BackgroundDownloadWorker spawns N threads doing
    exactly this kind of mutation."""
    engine = DownloadEngine()

    def add_records(source, base):
        for i in range(50):
            engine.add_record(source, f'{base}-{i}', {'i': i})

    threads = [
        threading.Thread(target=add_records, args=(f'src-{n}', f'dl-{n}'))
        for n in range(4)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    total = sum(1 for _ in engine.iter_all_records())
    assert total == 4 * 50  # 200 records, none lost
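
The iteration methods in this commit snapshot under the lock and yield outside it. One practical effect, sketched here with plain dicts and two hypothetical helpers (`iter_snapshot`, `iter_live`) rather than the engine itself: a caller can remove entries while iterating a snapshot, whereas walking the live dict raises `RuntimeError` as soon as its size changes.

```python
import threading
from typing import Any, Dict, Iterator, List, Tuple

Key = Tuple[str, str]
Record = Dict[str, Any]

lock = threading.RLock()


def make_records() -> Dict[Key, Record]:
    return {
        ("youtube", "yt-1"): {"id": "yt-1"},
        ("youtube", "yt-2"): {"id": "yt-2"},
        ("tidal", "td-1"): {"id": "td-1"},
    }


def iter_snapshot(records: Dict[Key, Record], source_name: str) -> Iterator[Record]:
    """Copy matching records under the lock, then yield with the lock released."""
    with lock:
        snapshot = [dict(r) for (s, _), r in records.items() if s == source_name]
    for record in snapshot:
        yield record


def iter_live(records: Dict[Key, Record], source_name: str) -> Iterator[Record]:
    """Counter-example: yields straight from the live dict."""
    for (s, _), r in records.items():
        if s == source_name:
            yield r


# Removing while iterating a snapshot is safe.
records = make_records()
seen: List[str] = []
for record in iter_snapshot(records, "youtube"):
    seen.append(record["id"])
    with lock:
        records.pop(("youtube", record["id"]), None)
remaining = sorted(records)

# The same removal against the live dict fails on the next step.
records = make_records()
live_failed = False
try:
    for record in iter_live(records, "youtube"):
        records.pop(("youtube", record["id"]), None)
except RuntimeError:
    live_failed = True
```

Because these are generator functions, the snapshot is taken when iteration starts (the first `next()`), not when the function is called.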