From badb5dd7de589400a2d228957c899d5d1dd35439 Mon Sep 17 00:00:00 2001 From: Broque Thomas <26755000+Nezreka@users.noreply.github.com> Date: Mon, 4 May 2026 12:42:15 -0700 Subject: [PATCH] B2: Engine owns cross-source query dispatch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `DownloadEngine` grows async query methods that wrap plugin iteration: `get_all_downloads` (concatenates every plugin's active downloads), `get_download_status` (first plugin to recognize the id wins), `cancel_download` (with source-hint routing — streaming sources go direct, unknown hints route to Soulseek as peer username), and `clear_all_completed_downloads` (skips unconfigured plugins). Code moved from the orchestrator's hand-iterated loops into the engine. Orchestrator delegation comes in B3 — for B2 the engine methods exist but nothing calls them yet. Per-plugin behavior preserved verbatim (defensive `try ... except` swallows per-iteration, unconfigured-skip on clear, source-hint routing semantics). Phase A pinning tests + 8 new engine query tests catch any drift. Pure additive — zero behavior change for users. --- core/download_engine/engine.py | 101 ++++++++++++++ tests/downloads/test_download_engine.py | 170 ++++++++++++++++++++++++ 2 files changed, 271 insertions(+) diff --git a/core/download_engine/engine.py b/core/download_engine/engine.py index 74cfebf7..2ea4bf3c 100644 --- a/core/download_engine/engine.py +++ b/core/download_engine/engine.py @@ -171,3 +171,104 @@ class DownloadEngine: if dl_id == download_id: return source, dict(record) return None + + # ------------------------------------------------------------------ + # Cross-source query dispatch — Phase B2 surface + # ------------------------------------------------------------------ + # + # The orchestrator historically iterated every plugin in its own + # ``get_all_downloads`` / ``get_download_status`` / ``cancel_download`` + # methods (with hand-maintained client lists, before the registry + # came along). That iteration logic moves into the engine here so + # the orchestrator becomes a thin pass-through (Phase B3). + # + # In Phase B these methods iterate the registered plugins and call + # their existing ``get_all_downloads`` / ``cancel_download`` + # methods — same behavior as today, just in a new home. Phase C/D + # will replace plugin-iteration with direct engine-state queries + # once the thread worker is also lifted. + # + # All methods are async to match the per-plugin contract. + + async def get_all_downloads(self): + """Aggregated view across every registered plugin's active + downloads. Returns a flat list of DownloadStatus objects.""" + all_downloads = [] + for plugin in self._plugins.values(): + if plugin is None: + continue + try: + all_downloads.extend(await plugin.get_all_downloads()) + except Exception: + pass + return all_downloads + + async def get_download_status(self, download_id: str): + """Find a download_id across every plugin. Returns the first + plugin's response or None if no plugin owns it.""" + for plugin in self._plugins.values(): + if plugin is None: + continue + try: + status = await plugin.get_download_status(download_id) + if status: + return status + except Exception: + pass + return None + + async def cancel_download(self, download_id: str, + source_hint: Optional[str] = None, + remove: bool = False) -> bool: + """Cancel a download. ``source_hint`` is the source name (or + legacy username string like ``'deezer_dl'``) — when provided, + routes directly to that plugin. When omitted, every plugin + is asked in turn until one accepts the cancel.""" + # Direct routing when the caller knows the source. + if source_hint: + # Streaming source names ARE the username. Soulseek + # uses a real peer username (anything not in our plugin + # registry), so route those to the soulseek plugin. + target_plugin = self._plugins.get(source_hint) + if target_plugin is not None and source_hint != 'soulseek': + try: + return await target_plugin.cancel_download( + download_id, source_hint, remove, + ) + except Exception: + return False + soulseek = self._plugins.get('soulseek') + if soulseek is not None: + try: + return await soulseek.cancel_download(download_id, source_hint, remove) + except Exception: + return False + + # No hint → ask every plugin until one cancels successfully. + for plugin in self._plugins.values(): + if plugin is None: + continue + try: + if await plugin.cancel_download(download_id, source_hint, remove): + return True + except Exception: + pass + return False + + async def clear_all_completed_downloads(self) -> bool: + """Best-effort cleanup of every plugin's completed-downloads + list. Skips plugins that report not-configured (saves API + calls + log noise).""" + results = [] + for source_name, plugin in self._plugins.items(): + if plugin is None: + continue + if hasattr(plugin, 'is_configured') and not plugin.is_configured(): + logger.debug("Skipping %s clear_all_completed_downloads (not configured)", source_name) + continue + try: + results.append(await plugin.clear_all_completed_downloads()) + except Exception as exc: + logger.warning("%s clear_all_completed_downloads failed: %s", source_name, exc) + results.append(False) + return all(results) if results else True diff --git a/tests/downloads/test_download_engine.py b/tests/downloads/test_download_engine.py index e5ec49af..4597e9b0 100644 --- a/tests/downloads/test_download_engine.py +++ b/tests/downloads/test_download_engine.py @@ -217,3 +217,173 @@ def test_concurrent_adds_dont_lose_records(): total = sum(1 for _ in engine.iter_all_records()) assert total == 4 * 50 # 200 records, none lost + + +# --------------------------------------------------------------------------- +# Cross-source query dispatch (Phase B2) +# --------------------------------------------------------------------------- + + +def _run_async(coro): + import asyncio + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(coro) + finally: + loop.close() + + +class _FakePlugin: + """Minimal plugin double for engine query tests. Exposes the + methods engine.get_all_downloads / get_download_status / + cancel_download / clear_all_completed_downloads call.""" + + def __init__(self, name, configured=True, downloads=None, + cancel_result=True, clear_result=True): + self.name = name + self._configured = configured + self._downloads = downloads or [] + self._cancel_result = cancel_result + self._clear_result = clear_result + self.cancel_calls = [] + self.clear_calls = 0 + + def is_configured(self): + return self._configured + + async def get_all_downloads(self): + return list(self._downloads) + + async def get_download_status(self, download_id): + for d in self._downloads: + if getattr(d, 'id', None) == download_id: + return d + return None + + async def cancel_download(self, download_id, source_hint, remove): + self.cancel_calls.append((download_id, source_hint, remove)) + return self._cancel_result + + async def clear_all_completed_downloads(self): + self.clear_calls += 1 + return self._clear_result + + +class _FakeStatus: + def __init__(self, id, source): + self.id = id + self.source = source + + +def test_engine_get_all_downloads_aggregates_across_plugins(): + """Engine concatenates every plugin's get_all_downloads output — + same behavior as the legacy orchestrator.""" + engine = DownloadEngine() + yt_plugin = _FakePlugin('youtube', downloads=[_FakeStatus('yt-1', 'youtube')]) + td_plugin = _FakePlugin('tidal', downloads=[_FakeStatus('td-1', 'tidal'), + _FakeStatus('td-2', 'tidal')]) + engine.register_plugin('youtube', yt_plugin) + engine.register_plugin('tidal', td_plugin) + + result = _run_async(engine.get_all_downloads()) + assert len(result) == 3 + assert {r.id for r in result} == {'yt-1', 'td-1', 'td-2'} + + +def test_engine_get_all_downloads_swallows_per_plugin_exceptions(): + """One plugin throwing must NOT take down the whole list — same + defensive behavior as the legacy orchestrator (matched by + `try ... except: pass` on every iteration).""" + engine = DownloadEngine() + + class _BrokenPlugin: + async def get_all_downloads(self): + raise RuntimeError("boom") + + yt_plugin = _FakePlugin('youtube', downloads=[_FakeStatus('yt-1', 'youtube')]) + engine.register_plugin('broken', _BrokenPlugin()) + engine.register_plugin('youtube', yt_plugin) + + result = _run_async(engine.get_all_downloads()) + assert [r.id for r in result] == ['yt-1'] + + +def test_engine_get_download_status_returns_first_match(): + engine = DownloadEngine() + yt_plugin = _FakePlugin('youtube', downloads=[_FakeStatus('shared', 'youtube')]) + td_plugin = _FakePlugin('tidal', downloads=[]) + engine.register_plugin('youtube', yt_plugin) + engine.register_plugin('tidal', td_plugin) + + result = _run_async(engine.get_download_status('shared')) + assert result is not None + assert result.id == 'shared' + + +def test_engine_cancel_routes_streaming_source_directly(): + """When source_hint is a known streaming-source name (not + 'soulseek'), engine routes the cancel to that specific plugin + only — doesn't ask every other plugin first.""" + engine = DownloadEngine() + yt_plugin = _FakePlugin('youtube') + td_plugin = _FakePlugin('tidal') + engine.register_plugin('youtube', yt_plugin) + engine.register_plugin('tidal', td_plugin) + + _run_async(engine.cancel_download('dl-1', 'tidal', remove=False)) + assert yt_plugin.cancel_calls == [] + assert td_plugin.cancel_calls == [('dl-1', 'tidal', False)] + + +def test_engine_cancel_routes_unknown_source_hint_to_soulseek(): + """A username that's NOT in the plugin registry is a real + Soulseek peer name — route to the soulseek plugin.""" + engine = DownloadEngine() + sl_plugin = _FakePlugin('soulseek') + yt_plugin = _FakePlugin('youtube') + engine.register_plugin('soulseek', sl_plugin) + engine.register_plugin('youtube', yt_plugin) + + _run_async(engine.cancel_download('dl-1', 'random_peer_username', remove=False)) + assert sl_plugin.cancel_calls == [('dl-1', 'random_peer_username', False)] + assert yt_plugin.cancel_calls == [] + + +def test_engine_cancel_falls_back_to_iterating_all_plugins_without_hint(): + """No source hint → ask every plugin until one accepts the + cancel (returns True). Mirrors legacy orchestrator behavior.""" + engine = DownloadEngine() + yt_plugin = _FakePlugin('youtube', cancel_result=False) + td_plugin = _FakePlugin('tidal', cancel_result=True) + engine.register_plugin('youtube', yt_plugin) + engine.register_plugin('tidal', td_plugin) + + result = _run_async(engine.cancel_download('dl-1', None, remove=False)) + assert result is True + # Both plugins were asked; tidal accepted. + assert len(yt_plugin.cancel_calls) == 1 + assert len(td_plugin.cancel_calls) == 1 + + +def test_engine_clear_all_skips_unconfigured_plugins(): + """Unconfigured plugins are silently skipped (no API call, no + error) — matches legacy orchestrator's defensive handling.""" + engine = DownloadEngine() + configured = _FakePlugin('youtube', configured=True, clear_result=True) + unconfigured = _FakePlugin('tidal', configured=False) + engine.register_plugin('youtube', configured) + engine.register_plugin('tidal', unconfigured) + + result = _run_async(engine.clear_all_completed_downloads()) + assert result is True + assert configured.clear_calls == 1 + assert unconfigured.clear_calls == 0 + + +def test_engine_clear_all_returns_false_when_any_configured_plugin_fails(): + engine = DownloadEngine() + failing = _FakePlugin('youtube', configured=True, clear_result=False) + engine.register_plugin('youtube', failing) + + result = _run_async(engine.clear_all_completed_downloads()) + assert result is False