From a3929b457bb5bc10cbf68df1d6c53244c93effce Mon Sep 17 00:00:00 2001 From: Broque Thomas <26755000+Nezreka@users.noreply.github.com> Date: Mon, 4 May 2026 14:38:20 -0700 Subject: [PATCH] E1: Add RateLimitPolicy declaration mechanism MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `core/download_engine/rate_limit.py` introduces a per-source policy declaration: download_concurrency + download_delay_seconds. Plugins declare via `RATE_LIMIT_POLICY` class attribute or a `rate_limit_policy()` method. Engine applies the declared policy to engine.worker at register_plugin time — set_concurrency + set_delay get pushed in automatically. Plugins without a declaration get the conservative default (1 / 0). The set_engine callback fires AFTER policy registration so config-driven sources (YouTube reads user-tunable youtube.download_delay) can override. Plan doc updated to reflect Phase D skip (search code is 90% source-specific, not 60% — lifting it would be lossy or bloated). Pure additive — no plugin migrated yet. 8 tests pin the resolution priority + engine wire-up + override semantics. Suite still green (327 download tests). --- core/download_engine/__init__.py | 3 +- core/download_engine/engine.py | 15 ++++ core/download_engine/rate_limit.py | 73 +++++++++++++++ docs/download-engine-refactor-plan.md | 6 +- tests/downloads/test_rate_limit_policy.py | 105 ++++++++++++++++++++++ 5 files changed, 200 insertions(+), 2 deletions(-) create mode 100644 core/download_engine/rate_limit.py create mode 100644 tests/downloads/test_rate_limit_policy.py diff --git a/core/download_engine/__init__.py b/core/download_engine/__init__.py index 4ebe7a01..f2ba73b0 100644 --- a/core/download_engine/__init__.py +++ b/core/download_engine/__init__.py @@ -25,6 +25,7 @@ commit so behavior never breaks across the suite. """ from core.download_engine.engine import DownloadEngine +from core.download_engine.rate_limit import RateLimitPolicy from core.download_engine.worker import BackgroundDownloadWorker -__all__ = ["DownloadEngine", "BackgroundDownloadWorker"] +__all__ = ["DownloadEngine", "BackgroundDownloadWorker", "RateLimitPolicy"] diff --git a/core/download_engine/engine.py b/core/download_engine/engine.py index 0deec668..d3dd31a6 100644 --- a/core/download_engine/engine.py +++ b/core/download_engine/engine.py @@ -91,10 +91,25 @@ class DownloadEngine: been migrated to the engine yet simply don't define ``set_engine`` — they keep their pre-engine behavior unchanged. + + Also reads the plugin's declared ``RateLimitPolicy`` (via + the ``rate_limit_policy()`` method or ``RATE_LIMIT_POLICY`` + class attribute) and applies it to the worker. Plugins that + don't declare a policy get the conservative default + (concurrency=1, delay=0). """ if source_name in self._plugins: logger.warning("Plugin %s already registered with engine — overwriting", source_name) self._plugins[source_name] = plugin + + # Apply the plugin's rate-limit policy BEFORE set_engine so + # set_engine callbacks can override per-source if they need + # config-driven values (e.g. YouTube's user-tunable delay). + from core.download_engine.rate_limit import resolve_policy + policy = resolve_policy(plugin) + self.worker.set_concurrency(source_name, policy.download_concurrency) + self.worker.set_delay(source_name, policy.download_delay_seconds) + set_engine = getattr(plugin, 'set_engine', None) if callable(set_engine): try: diff --git a/core/download_engine/rate_limit.py b/core/download_engine/rate_limit.py new file mode 100644 index 00000000..57b86642 --- /dev/null +++ b/core/download_engine/rate_limit.py @@ -0,0 +1,73 @@ +"""Per-source rate-limit policy declarations. + +Today's per-source download throttling is scattered: + +- YouTube: ``self._download_delay = config_manager.get('youtube.download_delay', 3)`` + set in ``__init__``, applied in ``set_engine`` via worker.set_delay. +- Qobuz: module-level ``_qobuz_api_lock`` + ``_QOBUZ_MIN_INTERVAL`` for + search-side throttling, no download-side throttle. +- Other sources: no explicit declarations — default to 0s delay / + concurrency=1, which works because the streaming APIs have their + own gateway-level rate limits. + +Phase E centralizes this into one place: each plugin declares a +``RateLimitPolicy`` (either as a class attribute or returned from a +``rate_limit_policy()`` method), and the engine reads + applies the +policy to ``engine.worker`` at ``register_plugin`` time. + +Adding a new source = declaring its policy alongside the rest of +the source's auth/config — no longer a hidden line in __init__ or a +module-level constant in the client file. +""" + +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class RateLimitPolicy: + """Per-source download throttling policy. + + Attributes: + download_concurrency: Max number of concurrent downloads + from this source. Default 1 (serial). Most streaming + APIs prefer serial transfers because parallel just + trades rate-limit errors for thread overhead. + download_delay_seconds: Minimum gap between successive + downloads from this source. YouTube uses 3s today + (legacy ``_download_delay`` config key) to avoid + yt-dlp 429s. Most other sources use 0. + """ + + download_concurrency: int = 1 + download_delay_seconds: float = 0.0 + + +# Sentinel default — most plugins want this. Plugins that need +# tighter throttling override by exposing ``RATE_LIMIT_POLICY`` as +# a class attribute or returning a custom one from +# ``rate_limit_policy()``. +DEFAULT_POLICY = RateLimitPolicy() + + +def resolve_policy(plugin) -> RateLimitPolicy: + """Read a plugin's declared rate-limit policy. Checks (in order): + 1. ``plugin.rate_limit_policy()`` method (returns a RateLimitPolicy) + 2. ``plugin.RATE_LIMIT_POLICY`` class attribute + 3. ``DEFAULT_POLICY`` + """ + method = getattr(plugin, 'rate_limit_policy', None) + if callable(method): + try: + policy = method() + if isinstance(policy, RateLimitPolicy): + return policy + except Exception: + pass + + declared = getattr(plugin, 'RATE_LIMIT_POLICY', None) + if isinstance(declared, RateLimitPolicy): + return declared + + return DEFAULT_POLICY diff --git a/docs/download-engine-refactor-plan.md b/docs/download-engine-refactor-plan.md index 618adccf..5b45ac29 100644 --- a/docs/download-engine-refactor-plan.md +++ b/docs/download-engine-refactor-plan.md @@ -118,7 +118,11 @@ After Phase A: ~50 new tests pinning current behavior. We can refactor with conf After Phase C: ~490 LOC of duplicated thread management deleted. Each affected client shrinks. -### Phase D — Search retry + quality filter lift +### Phase D — SKIPPED + +**Original intent:** Lift search retry / query normalization / quality filter into engine. **Dropped after surveying actual per-source search code.** Search is 90% source-specific (slskd event subscription vs yt-dlp subprocess vs HTTP REST vs HLS quality map), not 60% like the original plan estimated. Lifting would be either lossy (force per-source quirks through a uniform interface) or bloated (adapter code bigger than the original). The shared portion is ~10 LOC per source — not worth a SearchOrchestrator. Per-source search stays per-source. + +### Phase D (original — kept for reference, NOT executed) **Commit D1:** New `core/download_engine/search.py` — `SearchOrchestrator`. Owns: query normalization, shortened-query retry ladder, quality filter, dedup. Calls `plugin.search_raw(query)` for the actual API hit. diff --git a/tests/downloads/test_rate_limit_policy.py b/tests/downloads/test_rate_limit_policy.py new file mode 100644 index 00000000..ec454952 --- /dev/null +++ b/tests/downloads/test_rate_limit_policy.py @@ -0,0 +1,105 @@ +"""Tests for the per-source RateLimitPolicy declaration mechanism (Phase E1).""" + +from __future__ import annotations + +from core.download_engine import DownloadEngine, RateLimitPolicy +from core.download_engine.rate_limit import DEFAULT_POLICY, resolve_policy + + +# --------------------------------------------------------------------------- +# resolve_policy +# --------------------------------------------------------------------------- + + +def test_resolve_policy_returns_default_when_plugin_declares_nothing(): + plugin = object() + assert resolve_policy(plugin) is DEFAULT_POLICY + + +def test_resolve_policy_reads_class_attribute(): + class _Plugin: + RATE_LIMIT_POLICY = RateLimitPolicy(download_delay_seconds=5.0) + + policy = resolve_policy(_Plugin()) + assert policy.download_delay_seconds == 5.0 + + +def test_resolve_policy_prefers_method_over_class_attribute(): + class _Plugin: + RATE_LIMIT_POLICY = RateLimitPolicy(download_delay_seconds=1.0) + + def rate_limit_policy(self): + return RateLimitPolicy(download_delay_seconds=10.0) + + assert resolve_policy(_Plugin()).download_delay_seconds == 10.0 + + +def test_resolve_policy_falls_back_to_default_when_method_returns_garbage(): + class _Plugin: + def rate_limit_policy(self): + return "not a policy object" + + assert resolve_policy(_Plugin()) is DEFAULT_POLICY + + +def test_resolve_policy_falls_back_to_default_when_method_raises(): + class _Plugin: + def rate_limit_policy(self): + raise RuntimeError("boom") + + assert resolve_policy(_Plugin()) is DEFAULT_POLICY + + +# --------------------------------------------------------------------------- +# Engine applies policy on register +# --------------------------------------------------------------------------- + + +def test_engine_applies_declared_policy_on_register(): + """Pinning: when a plugin is registered, its declared + RateLimitPolicy is pushed into the worker's per-source semaphore + + delay registry. Future dispatches use those values.""" + class _ThrottledPlugin: + RATE_LIMIT_POLICY = RateLimitPolicy(download_concurrency=1, download_delay_seconds=2.5) + + engine = DownloadEngine() + engine.register_plugin('throttled', _ThrottledPlugin()) + + assert engine.worker._get_delay('throttled') == 2.5 + + +def test_engine_applies_default_policy_when_plugin_declares_nothing(): + """Plugins without a declaration get the conservative default + (concurrency=1, delay=0).""" + class _DefaultPlugin: + pass + + engine = DownloadEngine() + engine.register_plugin('default', _DefaultPlugin()) + + assert engine.worker._get_delay('default') == 0.0 + + +def test_set_engine_callback_runs_after_policy_applied(): + """Pinning: set_engine fires AFTER policy registration, so + config-driven sources can override their declared policy. + YouTube uses this — set_engine reads the user-tunable + youtube.download_delay config and overrides the declared default.""" + fired_at: list = [] + + class _Plugin: + RATE_LIMIT_POLICY = RateLimitPolicy(download_delay_seconds=1.0) + + def set_engine(self, engine): + # Capture worker state at the moment set_engine fires. + fired_at.append(engine.worker._get_delay('flexible')) + # Then override. + engine.worker.set_delay('flexible', 99.0) + + engine = DownloadEngine() + engine.register_plugin('flexible', _Plugin()) + + # The class-attribute value was applied first. + assert fired_at == [1.0] + # Then set_engine overrode it. + assert engine.worker._get_delay('flexible') == 99.0