E1: Add RateLimitPolicy declaration mechanism

`core/download_engine/rate_limit.py` introduces a per-source
policy declaration: download_concurrency + download_delay_seconds.
Plugins declare via `RATE_LIMIT_POLICY` class attribute or a
`rate_limit_policy()` method.

Engine applies the declared policy to engine.worker at
register_plugin time — set_concurrency + set_delay get pushed
in automatically. Plugins without a declaration get the
conservative default (1 / 0). The set_engine callback fires
AFTER policy registration so config-driven sources (YouTube
reads user-tunable youtube.download_delay) can override.

Plan doc updated to reflect Phase D skip (search code is 90%
source-specific, not 60% — lifting it would be lossy or
bloated).

Purely additive — no plugin has been migrated yet. 8 tests pin the
resolution priority + engine wire-up + override semantics.
Suite still green (327 download tests).
pull/495/head
Broque Thomas 3 weeks ago
parent fdb3e44965
commit a3929b457b

@ -25,6 +25,7 @@ commit so behavior never breaks across the suite.
"""
from core.download_engine.engine import DownloadEngine
from core.download_engine.rate_limit import RateLimitPolicy
from core.download_engine.worker import BackgroundDownloadWorker
__all__ = ["DownloadEngine", "BackgroundDownloadWorker"]
__all__ = ["DownloadEngine", "BackgroundDownloadWorker", "RateLimitPolicy"]

@ -91,10 +91,25 @@ class DownloadEngine:
been migrated to the engine yet simply don't define
``set_engine`` they keep their pre-engine behavior
unchanged.
Also reads the plugin's declared ``RateLimitPolicy`` (via
the ``rate_limit_policy()`` method or ``RATE_LIMIT_POLICY``
class attribute) and applies it to the worker. Plugins that
don't declare a policy get the conservative default
(concurrency=1, delay=0).
"""
if source_name in self._plugins:
logger.warning("Plugin %s already registered with engine — overwriting", source_name)
self._plugins[source_name] = plugin
# Apply the plugin's rate-limit policy BEFORE set_engine so
# set_engine callbacks can override per-source if they need
# config-driven values (e.g. YouTube's user-tunable delay).
from core.download_engine.rate_limit import resolve_policy
policy = resolve_policy(plugin)
self.worker.set_concurrency(source_name, policy.download_concurrency)
self.worker.set_delay(source_name, policy.download_delay_seconds)
set_engine = getattr(plugin, 'set_engine', None)
if callable(set_engine):
try:

@ -0,0 +1,73 @@
"""Per-source rate-limit policy declarations.
Today's per-source download throttling is scattered:
- YouTube: ``self._download_delay = config_manager.get('youtube.download_delay', 3)``
set in ``__init__``, applied in ``set_engine`` via worker.set_delay.
- Qobuz: module-level ``_qobuz_api_lock`` + ``_QOBUZ_MIN_INTERVAL`` for
search-side throttling, no download-side throttle.
- Other sources: no explicit declarations — they default to 0s delay /
concurrency=1, which works because the streaming APIs have their
own gateway-level rate limits.
Phase E centralizes this into one place: each plugin declares a
``RateLimitPolicy`` (either as a class attribute or returned from a
``rate_limit_policy()`` method), and the engine reads + applies the
policy to ``engine.worker`` at ``register_plugin`` time.
Adding a new source = declaring its policy alongside the rest of
the source's auth/config — no longer a hidden line in __init__ or a
module-level constant in the client file.
"""
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True)
class RateLimitPolicy:
    """Immutable download-throttling settings declared by one source.

    Attributes:
        download_concurrency: Upper bound on simultaneous downloads
            from the source. Defaults to 1, i.e. strictly serial;
            most streaming APIs prefer serial transfers because
            parallelism just trades rate-limit errors for thread
            overhead.
        download_delay_seconds: Smallest pause, in seconds, between
            consecutive downloads from the source. YouTube uses 3s
            today (legacy ``_download_delay`` config key) to avoid
            yt-dlp 429s; most other sources use 0.
    """

    download_concurrency: int = 1
    download_delay_seconds: float = 0.0
# Shared default policy, built from the dataclass defaults
# (download_concurrency=1, download_delay_seconds=0.0). Most plugins
# want this. ``resolve_policy`` returns this exact object when a
# plugin declares nothing usable, so callers can compare against it
# by identity. Plugins that need tighter throttling override it by
# exposing ``RATE_LIMIT_POLICY`` as a class attribute or returning a
# custom one from ``rate_limit_policy()``.
DEFAULT_POLICY = RateLimitPolicy()
def resolve_policy(plugin) -> RateLimitPolicy:
    """Return the rate-limit policy a plugin declares, or the default.

    Lookup order (the first valid ``RateLimitPolicy`` wins):

    1. ``plugin.rate_limit_policy()`` — called with no arguments.
    2. ``plugin.RATE_LIMIT_POLICY`` attribute.
    3. ``DEFAULT_POLICY``.

    Resolution is best-effort: a ``rate_limit_policy()`` hook that
    raises, or a declaration that is not a ``RateLimitPolicy``, is
    logged as a warning and skipped, so lookup continues with the next
    step instead of failing.

    Args:
        plugin: Any plugin object; it need not define either hook.

    Returns:
        The resolved ``RateLimitPolicy``; ``DEFAULT_POLICY`` itself when
        nothing valid is declared.
    """
    # Function-scope import: this module defines no module-level logger.
    import logging

    log = logging.getLogger(__name__)
    plugin_name = type(plugin).__name__

    method = getattr(plugin, 'rate_limit_policy', None)
    if callable(method):
        try:
            policy = method()
        except Exception:
            # Deliberately best-effort, but no longer silent: a broken
            # hook means the source runs with a policy its author did
            # not intend, so leave a trace.
            log.warning(
                "rate_limit_policy() raised for plugin %s; ignoring it",
                plugin_name,
                exc_info=True,
            )
        else:
            if isinstance(policy, RateLimitPolicy):
                return policy
            log.warning(
                "rate_limit_policy() of plugin %s returned %s, expected "
                "RateLimitPolicy; ignoring it",
                plugin_name,
                type(policy).__name__,
            )

    declared = getattr(plugin, 'RATE_LIMIT_POLICY', None)
    if isinstance(declared, RateLimitPolicy):
        return declared
    if declared is not None:
        log.warning(
            "RATE_LIMIT_POLICY of plugin %s is %s, expected "
            "RateLimitPolicy; using the default policy",
            plugin_name,
            type(declared).__name__,
        )
    return DEFAULT_POLICY

@ -118,7 +118,11 @@ After Phase A: ~50 new tests pinning current behavior. We can refactor with conf
After Phase C: ~490 LOC of duplicated thread management deleted. Each affected client shrinks.
### Phase D — Search retry + quality filter lift
### Phase D — SKIPPED
**Original intent:** Lift search retry / query normalization / quality filter into engine. **Dropped after surveying actual per-source search code.** Search is 90% source-specific (slskd event subscription vs yt-dlp subprocess vs HTTP REST vs HLS quality map), not 60% like the original plan estimated. Lifting would be either lossy (force per-source quirks through a uniform interface) or bloated (adapter code bigger than the original). The shared portion is ~10 LOC per source — not worth a SearchOrchestrator. Per-source search stays per-source.
### Phase D (original — kept for reference, NOT executed)
**Commit D1:** New `core/download_engine/search.py` — `SearchOrchestrator`. Owns: query normalization, shortened-query retry ladder, quality filter, dedup. Calls `plugin.search_raw(query)` for the actual API hit.

@ -0,0 +1,105 @@
"""Tests for the per-source RateLimitPolicy declaration mechanism (Phase E1)."""
from __future__ import annotations
from core.download_engine import DownloadEngine, RateLimitPolicy
from core.download_engine.rate_limit import DEFAULT_POLICY, resolve_policy
# ---------------------------------------------------------------------------
# resolve_policy
# ---------------------------------------------------------------------------
def test_resolve_policy_returns_default_when_plugin_declares_nothing():
    """A bare object declares neither hook, so the shared default comes back."""
    undeclared = object()
    resolved = resolve_policy(undeclared)
    assert resolved is DEFAULT_POLICY
def test_resolve_policy_reads_class_attribute():
    """A ``RATE_LIMIT_POLICY`` class attribute is picked up via an instance."""
    class _Plugin:
        RATE_LIMIT_POLICY = RateLimitPolicy(download_delay_seconds=5.0)

    resolved = resolve_policy(_Plugin())
    assert resolved.download_delay_seconds == 5.0
def test_resolve_policy_prefers_method_over_class_attribute():
    """When both hooks exist, the method's return value takes precedence."""
    class _BothHooks:
        RATE_LIMIT_POLICY = RateLimitPolicy(download_delay_seconds=1.0)

        def rate_limit_policy(self):
            return RateLimitPolicy(download_delay_seconds=10.0)

    resolved = resolve_policy(_BothHooks())
    assert resolved.download_delay_seconds == 10.0
def test_resolve_policy_falls_back_to_default_when_method_returns_garbage():
    """A hook returning a non-policy value is ignored in favour of the default."""
    class _BadReturn:
        def rate_limit_policy(self):
            return "not a policy object"

    resolved = resolve_policy(_BadReturn())
    assert resolved is DEFAULT_POLICY
def test_resolve_policy_falls_back_to_default_when_method_raises():
    """A hook that raises is swallowed and the default policy is returned."""
    class _Exploding:
        def rate_limit_policy(self):
            raise RuntimeError("boom")

    resolved = resolve_policy(_Exploding())
    assert resolved is DEFAULT_POLICY
# ---------------------------------------------------------------------------
# Engine applies policy on register
# ---------------------------------------------------------------------------
def test_engine_applies_declared_policy_on_register():
    """Pinning: registering a plugin pushes its declared
    RateLimitPolicy into the engine's worker, so future dispatches use
    those values. Only the per-source delay is asserted here."""
    declared = RateLimitPolicy(download_concurrency=1, download_delay_seconds=2.5)

    class _ThrottledPlugin:
        RATE_LIMIT_POLICY = declared

    engine = DownloadEngine()
    engine.register_plugin('throttled', _ThrottledPlugin())

    worker = engine.worker
    assert worker._get_delay('throttled') == 2.5
def test_engine_applies_default_policy_when_plugin_declares_nothing():
    """A plugin with no declaration ends up with the conservative
    default; the zero delay is what gets asserted."""
    class _DefaultPlugin:
        pass

    engine = DownloadEngine()
    engine.register_plugin('default', _DefaultPlugin())

    worker = engine.worker
    assert worker._get_delay('default') == 0.0
def test_set_engine_callback_runs_after_policy_applied():
    """Pinning: ``set_engine`` fires AFTER the declared policy has been
    applied, so config-driven sources can override it.

    YouTube relies on this: its ``set_engine`` reads the user-tunable
    ``youtube.download_delay`` config and overrides the declared
    default."""
    delays_seen_by_callback: list = []

    class _OverridingPlugin:
        RATE_LIMIT_POLICY = RateLimitPolicy(download_delay_seconds=1.0)

        def set_engine(self, engine):
            worker = engine.worker
            # Snapshot the delay as it stands when the callback runs...
            delays_seen_by_callback.append(worker._get_delay('flexible'))
            # ...then replace it with the plugin's own value.
            worker.set_delay('flexible', 99.0)

    engine = DownloadEngine()
    engine.register_plugin('flexible', _OverridingPlugin())

    # The declared class-attribute value was already in place.
    assert delays_seen_by_callback == [1.0]
    # The override made inside set_engine is what sticks.
    assert engine.worker._get_delay('flexible') == 99.0
Loading…
Cancel
Save