mirror of https://github.com/Nezreka/SoulSync.git
The dashboard's enrichment-status bubbles (MusicBrainz, AudioDB,
Discogs, Deezer, Spotify, iTunes, Last.fm, Genius, Tidal, Qobuz) each
had its own copy-pasted /status, /pause, /resume route in web_server.py
— 30 routes that differed only in the worker reference and a couple
of per-service quirks (Spotify's rate-limit guard, Last.fm/Genius
yield-override behavior, Tidal/Qobuz extra status fields).
Replace them with a registry-driven blueprint:
- core/enrichment/services.py declares an EnrichmentService dataclass
with worker_getter, config_paused_key, pre_resume_check,
auto_pause_token, and extra_status_defaults — all variation captured
as data, no branching on service id.
- core/enrichment/api.py exposes a Flask blueprint with three routes
(/api/enrichment/<service>/{status,pause,resume}). Per-service
quirks are honored via the descriptor: Spotify's rate-limit ban
still returns 429 with `rate_limited: true`, Last.fm/Genius still
drop the auto-pause token and add the yield override, Tidal/Qobuz
still merge `authenticated: false` into the fallback payload.
- web_server.py registers all 10 services after their workers
initialize, wires the host-side hooks (config_manager.set,
_download_auto_paused.discard, _download_yield_override.add), and
registers the blueprint.
- webui/static/enrichment.js polling + click handlers now hit the
generic endpoints. The per-service `update<Service>StatusFromData`
functions are unchanged — they still process the same payload.
This is the cutover step. Old per-service routes are intentionally
left in place as a fallback during the soak period — they currently
have zero callers in the codebase and will be deleted in a follow-up
patch once production has run on the new pipeline for a few days.
27 new tests in tests/test_enrichment_services.py cover the registry
behavior + every quirk path through the generic blueprint (rate-limit
guard, auto-pause token cleanup, persisted-pause config keys, extra
default fields, worker-not-initialized fallback, exceptions). Full
suite 1541 passed; ruff clean.
pull/465/head
parent
30c506cd12
commit
98c04cf332
@ -0,0 +1,156 @@
|
||||
"""Generic Flask routes for enrichment-bubble status / pause / resume.
|
||||
|
||||
Replaces 30 near-identical per-service routes that web_server.py used
|
||||
to hand-roll. The blueprint reads the registry in ``core.enrichment.services``
|
||||
and dispatches:
|
||||
|
||||
GET /api/enrichment/<service_id>/status
|
||||
POST /api/enrichment/<service_id>/pause
|
||||
POST /api/enrichment/<service_id>/resume
|
||||
|
||||
A 404 is returned for unknown service ids. Per-service quirks (Spotify
|
||||
rate-limit guard, auto-pause token cleanup, persisted-pause config keys)
|
||||
are encoded as data on the ``EnrichmentService`` descriptor — there is
|
||||
no branching on service id inside this module.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Callable, Optional
|
||||
|
||||
from flask import Blueprint, jsonify
|
||||
|
||||
from core.enrichment.services import EnrichmentService, get_service
|
||||
from utils.logging_config import get_logger
|
||||
|
||||
|
||||
logger = get_logger("enrichment.api")
|
||||
|
||||
|
||||
# Hooks the host wires up so the blueprint can persist pause state and
|
||||
# clean up auto-pause / yield-override sets without circular imports.
|
||||
_config_set: Optional[Callable[[str, Any], None]] = None
|
||||
_auto_paused_discard: Optional[Callable[[str], None]] = None
|
||||
_yield_override_add: Optional[Callable[[str], None]] = None
|
||||
|
||||
|
||||
def configure(
|
||||
*,
|
||||
config_set: Optional[Callable[[str, Any], None]] = None,
|
||||
auto_paused_discard: Optional[Callable[[str], None]] = None,
|
||||
yield_override_add: Optional[Callable[[str], None]] = None,
|
||||
) -> None:
|
||||
"""Wire host-side mutators that the generic routes call after pause/resume.
|
||||
|
||||
Each is optional — pass None for hosts that don't have a corresponding
|
||||
mechanism (e.g. tests).
|
||||
"""
|
||||
global _config_set, _auto_paused_discard, _yield_override_add
|
||||
_config_set = config_set
|
||||
_auto_paused_discard = auto_paused_discard
|
||||
_yield_override_add = yield_override_add
|
||||
|
||||
|
||||
def _persist_paused(service: EnrichmentService, paused: bool) -> None:
|
||||
if not service.config_paused_key or _config_set is None:
|
||||
return
|
||||
try:
|
||||
_config_set(service.config_paused_key, paused)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Persisting pause flag for %s failed: %s", service.id, e
|
||||
)
|
||||
|
||||
|
||||
def _drop_auto_pause_marker(service: EnrichmentService) -> None:
|
||||
if service.auto_pause_token is None or _auto_paused_discard is None:
|
||||
return
|
||||
try:
|
||||
_auto_paused_discard(service.auto_pause_token)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def _add_yield_override(service: EnrichmentService) -> None:
|
||||
if service.auto_pause_token is None or _yield_override_add is None:
|
||||
return
|
||||
try:
|
||||
_yield_override_add(service.auto_pause_token)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def create_blueprint() -> Blueprint:
|
||||
"""Build the Flask blueprint — call once during host startup."""
|
||||
bp = Blueprint('enrichment_api', __name__)
|
||||
|
||||
@bp.route('/api/enrichment/<service_id>/status', methods=['GET'])
|
||||
def enrichment_status(service_id: str):
|
||||
service = get_service(service_id)
|
||||
if service is None:
|
||||
return jsonify({'error': f'Unknown enrichment service: {service_id}'}), 404
|
||||
try:
|
||||
worker = service.get_worker()
|
||||
if worker is None:
|
||||
return jsonify(service.fallback_status()), 200
|
||||
return jsonify(worker.get_stats()), 200
|
||||
except Exception as e:
|
||||
logger.error("Error getting %s enrichment status: %s", service.id, e)
|
||||
return jsonify({'error': str(e)}), 500
|
||||
|
||||
@bp.route('/api/enrichment/<service_id>/pause', methods=['POST'])
|
||||
def enrichment_pause(service_id: str):
|
||||
service = get_service(service_id)
|
||||
if service is None:
|
||||
return jsonify({'error': f'Unknown enrichment service: {service_id}'}), 404
|
||||
worker = service.get_worker()
|
||||
if worker is None:
|
||||
return jsonify({
|
||||
'error': f'{service.display_name} enrichment worker not initialized',
|
||||
}), 400
|
||||
try:
|
||||
worker.pause()
|
||||
_persist_paused(service, True)
|
||||
_drop_auto_pause_marker(service)
|
||||
logger.info("%s worker paused via UI", service.display_name)
|
||||
return jsonify({'status': 'paused'}), 200
|
||||
except Exception as e:
|
||||
logger.error("Error pausing %s worker: %s", service.id, e)
|
||||
return jsonify({'error': str(e)}), 500
|
||||
|
||||
@bp.route('/api/enrichment/<service_id>/resume', methods=['POST'])
|
||||
def enrichment_resume(service_id: str):
|
||||
service = get_service(service_id)
|
||||
if service is None:
|
||||
return jsonify({'error': f'Unknown enrichment service: {service_id}'}), 404
|
||||
worker = service.get_worker()
|
||||
if worker is None:
|
||||
return jsonify({
|
||||
'error': f'{service.display_name} enrichment worker not initialized',
|
||||
}), 400
|
||||
# Pre-resume guard (e.g. Spotify rate-limit ban). Returns
|
||||
# (http_status, error_message) when blocking, None when ok.
|
||||
if service.pre_resume_check is not None:
|
||||
try:
|
||||
blocked = service.pre_resume_check()
|
||||
except Exception as e:
|
||||
logger.error("Pre-resume check for %s raised: %s", service.id, e)
|
||||
blocked = None
|
||||
if blocked is not None:
|
||||
http_status, message = blocked
|
||||
payload: dict = {'error': message}
|
||||
if http_status == 429:
|
||||
payload['rate_limited'] = True
|
||||
return jsonify(payload), http_status
|
||||
try:
|
||||
worker.resume()
|
||||
_persist_paused(service, False)
|
||||
_drop_auto_pause_marker(service)
|
||||
_add_yield_override(service)
|
||||
logger.info("%s worker resumed via UI", service.display_name)
|
||||
return jsonify({'status': 'running'}), 200
|
||||
except Exception as e:
|
||||
logger.error("Error resuming %s worker: %s", service.id, e)
|
||||
return jsonify({'error': str(e)}), 500
|
||||
|
||||
return bp
|
||||
@ -0,0 +1,125 @@
|
||||
"""Registry of enrichment workers exposed via the dashboard bubble UI.
|
||||
|
||||
Every "bubble" on the dashboard (MusicBrainz, Spotify, iTunes, Last.fm,
|
||||
Genius, Deezer, Discogs, AudioDB, Tidal, Qobuz) used to have its own
|
||||
copy-pasted ``status`` / ``pause`` / ``resume`` Flask routes — 30 routes
|
||||
that differed only in the worker reference and a couple of per-service
|
||||
quirks. This module collapses them into a single ``EnrichmentService``
|
||||
descriptor + registry so the generic blueprint in ``core.enrichment.api``
|
||||
can drive every bubble from one place.
|
||||
|
||||
Hydrabase (P2P mirror) and SoulID (entity ID generation) are intentionally
|
||||
out of scope here — their workers report fundamentally different status
|
||||
shapes and don't share the bubble pause/resume contract.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Callable, Dict, List, Optional, Tuple
|
||||
|
||||
|
||||
# Default status payload shape returned when a worker isn't initialized.
|
||||
# Mirrors the shape every per-service route used to inline before this
|
||||
# refactor; UI consumers depend on these exact keys.
|
||||
_DEFAULT_STATUS_FALLBACK: Dict[str, Any] = {
|
||||
'enabled': False,
|
||||
'running': False,
|
||||
'paused': False,
|
||||
'current_item': None,
|
||||
'stats': {'matched': 0, 'not_found': 0, 'pending': 0, 'errors': 0},
|
||||
'progress': {},
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class EnrichmentService:
|
||||
"""Descriptor for one enrichment worker exposed via the dashboard.
|
||||
|
||||
The dashboard talks to every worker through three identical-looking
|
||||
endpoints (status / pause / resume). The variation between services
|
||||
is captured here as data, not branching code:
|
||||
|
||||
- ``worker_getter`` returns the live worker reference (or None when
|
||||
initialization failed). Lazy so the registry can be defined before
|
||||
web_server.py finishes module-level imports.
|
||||
- ``config_paused_key`` is the ``config_manager`` key that persists
|
||||
the user's pause / resume choice across restarts. Empty string
|
||||
means "do not persist" (Hydrabase historically did this).
|
||||
- ``pre_resume_check`` runs before resume — return ``(http_status,
|
||||
error_message)`` to short-circuit (Spotify uses this for the
|
||||
rate-limit guard).
|
||||
- ``auto_pause_token`` matches an entry in
|
||||
``_download_auto_paused`` / ``_download_yield_override`` so the
|
||||
pause/resume routes can clean those up correctly. None means
|
||||
this service doesn't participate in the auto-pause-during-download
|
||||
mechanism.
|
||||
- ``extra_status_defaults`` is merged into the fallback status
|
||||
payload (Tidal / Qobuz add ``'authenticated': False``).
|
||||
"""
|
||||
|
||||
id: str
|
||||
display_name: str
|
||||
worker_getter: Callable[[], Any]
|
||||
config_paused_key: str = ''
|
||||
pre_resume_check: Optional[Callable[[], Optional[Tuple[int, str]]]] = None
|
||||
auto_pause_token: Optional[str] = None
|
||||
extra_status_defaults: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
def get_worker(self) -> Any:
|
||||
"""Resolve the worker reference (None if init failed)."""
|
||||
try:
|
||||
return self.worker_getter()
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def fallback_status(self) -> Dict[str, Any]:
|
||||
"""Return the shape we serve when the worker isn't initialized."""
|
||||
payload = dict(_DEFAULT_STATUS_FALLBACK)
|
||||
# stats dict is shared — copy so callers can't mutate the module
|
||||
# default.
|
||||
payload['stats'] = dict(_DEFAULT_STATUS_FALLBACK['stats'])
|
||||
if self.extra_status_defaults:
|
||||
payload.update(self.extra_status_defaults)
|
||||
return payload
|
||||
|
||||
|
||||
# Module-level registry. Populated by ``register_services`` so the host
|
||||
# (web_server.py) can wire its module-local worker globals + downstream
|
||||
# state collections (auto-pause sets, rate-limit guard) without circular
|
||||
# imports.
|
||||
_REGISTRY: Dict[str, EnrichmentService] = {}
|
||||
|
||||
|
||||
def register_services(services: List[EnrichmentService]) -> None:
|
||||
"""Replace the active service registry.
|
||||
|
||||
The host registers all services in one call after its workers are
|
||||
initialized. Re-registering is allowed (used by tests) — clears the
|
||||
previous set.
|
||||
"""
|
||||
_REGISTRY.clear()
|
||||
for svc in services:
|
||||
if not svc.id:
|
||||
raise ValueError("EnrichmentService.id must be non-empty")
|
||||
_REGISTRY[svc.id] = svc
|
||||
|
||||
|
||||
def get_service(service_id: str) -> Optional[EnrichmentService]:
|
||||
"""Return the registered service with this id, or None."""
|
||||
return _REGISTRY.get(service_id)
|
||||
|
||||
|
||||
def all_services() -> List[EnrichmentService]:
|
||||
"""Return every registered service in registration order."""
|
||||
return list(_REGISTRY.values())
|
||||
|
||||
|
||||
def all_service_ids() -> List[str]:
|
||||
"""Return the ids of every registered service."""
|
||||
return list(_REGISTRY.keys())
|
||||
|
||||
|
||||
def clear_registry() -> None:
|
||||
"""Wipe the registry. Test-only — production code uses ``register_services``."""
|
||||
_REGISTRY.clear()
|
||||
@ -0,0 +1,402 @@
|
||||
"""Tests for the enrichment service registry + generic Flask blueprint.
|
||||
|
||||
Covers the registry contract (registration / lookup / fallback status
|
||||
shape) and the generic ``/api/enrichment/<service_id>/...`` routes,
|
||||
including per-service quirks (rate-limit pre-resume guard, auto-pause
|
||||
token cleanup, persisted-pause config keys, extra default fields).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import pytest
|
||||
from flask import Flask
|
||||
|
||||
from core.enrichment.api import configure as configure_api, create_blueprint
|
||||
from core.enrichment.services import (
|
||||
EnrichmentService,
|
||||
all_service_ids,
|
||||
all_services,
|
||||
clear_registry,
|
||||
get_service,
|
||||
register_services,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Test doubles
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class _FakeWorker:
|
||||
"""Captures pause / resume calls + returns controllable get_stats."""
|
||||
|
||||
def __init__(self, stats: Dict[str, Any] | None = None):
|
||||
self.stats = stats or {
|
||||
'enabled': True, 'running': True, 'paused': False,
|
||||
'current_item': None,
|
||||
'stats': {'matched': 5, 'not_found': 1, 'pending': 10, 'errors': 0},
|
||||
'progress': {},
|
||||
}
|
||||
self.pause_calls = 0
|
||||
self.resume_calls = 0
|
||||
self.pause_should_raise: Exception | None = None
|
||||
self.resume_should_raise: Exception | None = None
|
||||
|
||||
def pause(self) -> None:
|
||||
if self.pause_should_raise:
|
||||
raise self.pause_should_raise
|
||||
self.pause_calls += 1
|
||||
|
||||
def resume(self) -> None:
|
||||
if self.resume_should_raise:
|
||||
raise self.resume_should_raise
|
||||
self.resume_calls += 1
|
||||
|
||||
def get_stats(self) -> Dict[str, Any]:
|
||||
return self.stats
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _reset_registry():
|
||||
"""Every test starts from a clean registry."""
|
||||
clear_registry()
|
||||
yield
|
||||
clear_registry()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def host_state():
|
||||
"""Host-side state collections (config, auto-pause set, yield-override set)."""
|
||||
state = {
|
||||
'config': {},
|
||||
'auto_paused': set(),
|
||||
'yield_override': set(),
|
||||
}
|
||||
configure_api(
|
||||
config_set=lambda k, v: state['config'].__setitem__(k, v),
|
||||
auto_paused_discard=lambda token: state['auto_paused'].discard(token),
|
||||
yield_override_add=lambda token: state['yield_override'].add(token),
|
||||
)
|
||||
yield state
|
||||
# Reset hooks so other tests run on a clean slate.
|
||||
configure_api(config_set=None, auto_paused_discard=None, yield_override_add=None)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def app(host_state):
|
||||
"""Flask app with the enrichment blueprint registered."""
|
||||
app = Flask(__name__)
|
||||
app.register_blueprint(create_blueprint())
|
||||
return app
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client(app):
|
||||
return app.test_client()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Registry behavior
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRegistry:
|
||||
def test_register_and_lookup(self):
|
||||
worker = _FakeWorker()
|
||||
svc = EnrichmentService(
|
||||
id='example', display_name='Example', worker_getter=lambda: worker,
|
||||
)
|
||||
register_services([svc])
|
||||
assert get_service('example') is svc
|
||||
assert all_service_ids() == ['example']
|
||||
assert all_services() == [svc]
|
||||
|
||||
def test_unknown_service_returns_none(self):
|
||||
register_services([])
|
||||
assert get_service('does_not_exist') is None
|
||||
|
||||
def test_re_register_replaces(self):
|
||||
register_services([
|
||||
EnrichmentService(id='a', display_name='A', worker_getter=lambda: None),
|
||||
])
|
||||
register_services([
|
||||
EnrichmentService(id='b', display_name='B', worker_getter=lambda: None),
|
||||
])
|
||||
assert get_service('a') is None
|
||||
assert get_service('b') is not None
|
||||
|
||||
def test_empty_id_rejected(self):
|
||||
with pytest.raises(ValueError):
|
||||
register_services([
|
||||
EnrichmentService(id='', display_name='X', worker_getter=lambda: None),
|
||||
])
|
||||
|
||||
def test_worker_getter_exception_returns_none(self):
|
||||
def boom():
|
||||
raise RuntimeError("init failed")
|
||||
|
||||
svc = EnrichmentService(id='broken', display_name='Broken', worker_getter=boom)
|
||||
register_services([svc])
|
||||
assert svc.get_worker() is None
|
||||
|
||||
def test_fallback_status_default_shape(self):
|
||||
svc = EnrichmentService(id='x', display_name='X', worker_getter=lambda: None)
|
||||
fb = svc.fallback_status()
|
||||
assert fb['enabled'] is False
|
||||
assert fb['running'] is False
|
||||
assert fb['paused'] is False
|
||||
assert fb['current_item'] is None
|
||||
assert fb['stats'] == {'matched': 0, 'not_found': 0, 'pending': 0, 'errors': 0}
|
||||
assert fb['progress'] == {}
|
||||
|
||||
def test_fallback_status_extra_defaults_merged(self):
|
||||
"""Tidal / Qobuz add ``'authenticated': False`` to the fallback."""
|
||||
svc = EnrichmentService(
|
||||
id='tidal', display_name='Tidal', worker_getter=lambda: None,
|
||||
extra_status_defaults={'authenticated': False},
|
||||
)
|
||||
fb = svc.fallback_status()
|
||||
assert fb['authenticated'] is False
|
||||
# And the standard keys still present.
|
||||
assert fb['enabled'] is False
|
||||
|
||||
def test_fallback_status_does_not_share_stats_dict(self):
|
||||
svc = EnrichmentService(id='x', display_name='X', worker_getter=lambda: None)
|
||||
fb1 = svc.fallback_status()
|
||||
fb1['stats']['matched'] = 999
|
||||
fb2 = svc.fallback_status()
|
||||
assert fb2['stats']['matched'] == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Status route
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestStatusRoute:
|
||||
def test_returns_worker_stats_when_initialized(self, client):
|
||||
worker = _FakeWorker(stats={'enabled': True, 'matched': 42})
|
||||
register_services([
|
||||
EnrichmentService(id='spotify', display_name='Spotify', worker_getter=lambda: worker),
|
||||
])
|
||||
resp = client.get('/api/enrichment/spotify/status')
|
||||
assert resp.status_code == 200
|
||||
assert resp.get_json() == {'enabled': True, 'matched': 42}
|
||||
|
||||
def test_returns_fallback_when_worker_none(self, client):
|
||||
register_services([
|
||||
EnrichmentService(id='spotify', display_name='Spotify', worker_getter=lambda: None),
|
||||
])
|
||||
resp = client.get('/api/enrichment/spotify/status')
|
||||
assert resp.status_code == 200
|
||||
body = resp.get_json()
|
||||
assert body['enabled'] is False
|
||||
assert body['stats'] == {'matched': 0, 'not_found': 0, 'pending': 0, 'errors': 0}
|
||||
|
||||
def test_unknown_service_returns_404(self, client):
|
||||
register_services([])
|
||||
resp = client.get('/api/enrichment/no_such_service/status')
|
||||
assert resp.status_code == 404
|
||||
|
||||
def test_get_stats_exception_returns_500(self, client):
|
||||
class BoomWorker:
|
||||
def get_stats(self):
|
||||
raise RuntimeError("db gone")
|
||||
|
||||
register_services([
|
||||
EnrichmentService(id='x', display_name='X', worker_getter=lambda: BoomWorker()),
|
||||
])
|
||||
resp = client.get('/api/enrichment/x/status')
|
||||
assert resp.status_code == 500
|
||||
assert 'db gone' in resp.get_json()['error']
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pause route
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestPauseRoute:
|
||||
def test_pause_calls_worker_and_persists_config(self, client, host_state):
|
||||
worker = _FakeWorker()
|
||||
register_services([
|
||||
EnrichmentService(
|
||||
id='itunes', display_name='iTunes', worker_getter=lambda: worker,
|
||||
config_paused_key='itunes_enrichment_paused',
|
||||
),
|
||||
])
|
||||
resp = client.post('/api/enrichment/itunes/pause')
|
||||
assert resp.status_code == 200
|
||||
assert resp.get_json() == {'status': 'paused'}
|
||||
assert worker.pause_calls == 1
|
||||
assert host_state['config']['itunes_enrichment_paused'] is True
|
||||
|
||||
def test_pause_drops_auto_pause_token(self, client, host_state):
|
||||
worker = _FakeWorker()
|
||||
host_state['auto_paused'].add('lastfm-enrichment')
|
||||
register_services([
|
||||
EnrichmentService(
|
||||
id='lastfm', display_name='Last.fm', worker_getter=lambda: worker,
|
||||
config_paused_key='lastfm_enrichment_paused',
|
||||
auto_pause_token='lastfm-enrichment',
|
||||
),
|
||||
])
|
||||
resp = client.post('/api/enrichment/lastfm/pause')
|
||||
assert resp.status_code == 200
|
||||
assert 'lastfm-enrichment' not in host_state['auto_paused']
|
||||
|
||||
def test_pause_without_config_key_skips_persistence(self, client, host_state):
|
||||
worker = _FakeWorker()
|
||||
register_services([
|
||||
EnrichmentService(
|
||||
id='hydra', display_name='Hydra', worker_getter=lambda: worker,
|
||||
config_paused_key='', # No persistence
|
||||
),
|
||||
])
|
||||
resp = client.post('/api/enrichment/hydra/pause')
|
||||
assert resp.status_code == 200
|
||||
assert host_state['config'] == {} # Nothing persisted
|
||||
|
||||
def test_pause_when_worker_none_returns_400(self, client):
|
||||
register_services([
|
||||
EnrichmentService(id='x', display_name='X', worker_getter=lambda: None),
|
||||
])
|
||||
resp = client.post('/api/enrichment/x/pause')
|
||||
assert resp.status_code == 400
|
||||
assert 'not initialized' in resp.get_json()['error']
|
||||
|
||||
def test_pause_worker_exception_returns_500(self, client):
|
||||
worker = _FakeWorker()
|
||||
worker.pause_should_raise = RuntimeError("worker dead")
|
||||
register_services([
|
||||
EnrichmentService(id='x', display_name='X', worker_getter=lambda: worker),
|
||||
])
|
||||
resp = client.post('/api/enrichment/x/pause')
|
||||
assert resp.status_code == 500
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Resume route
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestResumeRoute:
|
||||
def test_resume_calls_worker_persists_and_adds_yield_override(self, client, host_state):
|
||||
worker = _FakeWorker()
|
||||
register_services([
|
||||
EnrichmentService(
|
||||
id='spotify', display_name='Spotify', worker_getter=lambda: worker,
|
||||
config_paused_key='spotify_enrichment_paused',
|
||||
auto_pause_token='spotify-enrichment',
|
||||
),
|
||||
])
|
||||
resp = client.post('/api/enrichment/spotify/resume')
|
||||
assert resp.status_code == 200
|
||||
assert resp.get_json() == {'status': 'running'}
|
||||
assert worker.resume_calls == 1
|
||||
assert host_state['config']['spotify_enrichment_paused'] is False
|
||||
assert 'spotify-enrichment' in host_state['yield_override']
|
||||
|
||||
def test_resume_blocked_by_pre_check_returns_429(self, client):
|
||||
"""Spotify rate-limit guard: pre-check returns (429, message)."""
|
||||
worker = _FakeWorker()
|
||||
register_services([
|
||||
EnrichmentService(
|
||||
id='spotify', display_name='Spotify', worker_getter=lambda: worker,
|
||||
config_paused_key='spotify_enrichment_paused',
|
||||
pre_resume_check=lambda: (429, 'Cannot resume while Spotify is rate limited'),
|
||||
),
|
||||
])
|
||||
resp = client.post('/api/enrichment/spotify/resume')
|
||||
assert resp.status_code == 429
|
||||
body = resp.get_json()
|
||||
assert body['rate_limited'] is True
|
||||
assert 'rate limited' in body['error']
|
||||
assert worker.resume_calls == 0 # Worker not touched
|
||||
|
||||
def test_resume_pre_check_returning_none_passes(self, client):
|
||||
worker = _FakeWorker()
|
||||
register_services([
|
||||
EnrichmentService(
|
||||
id='spotify', display_name='Spotify', worker_getter=lambda: worker,
|
||||
pre_resume_check=lambda: None,
|
||||
),
|
||||
])
|
||||
resp = client.post('/api/enrichment/spotify/resume')
|
||||
assert resp.status_code == 200
|
||||
assert worker.resume_calls == 1
|
||||
|
||||
def test_resume_pre_check_exception_treated_as_pass(self, client):
|
||||
"""A buggy pre-check shouldn't permanently lock out resume."""
|
||||
worker = _FakeWorker()
|
||||
|
||||
def boom():
|
||||
raise RuntimeError("pre-check broke")
|
||||
|
||||
register_services([
|
||||
EnrichmentService(
|
||||
id='spotify', display_name='Spotify', worker_getter=lambda: worker,
|
||||
pre_resume_check=boom,
|
||||
),
|
||||
])
|
||||
resp = client.post('/api/enrichment/spotify/resume')
|
||||
assert resp.status_code == 200
|
||||
assert worker.resume_calls == 1
|
||||
|
||||
def test_resume_when_worker_none_returns_400(self, client):
|
||||
register_services([
|
||||
EnrichmentService(id='x', display_name='X', worker_getter=lambda: None),
|
||||
])
|
||||
resp = client.post('/api/enrichment/x/resume')
|
||||
assert resp.status_code == 400
|
||||
|
||||
def test_resume_worker_exception_returns_500(self, client):
|
||||
worker = _FakeWorker()
|
||||
worker.resume_should_raise = RuntimeError("worker dead")
|
||||
register_services([
|
||||
EnrichmentService(id='x', display_name='X', worker_getter=lambda: worker),
|
||||
])
|
||||
resp = client.post('/api/enrichment/x/resume')
|
||||
assert resp.status_code == 500
|
||||
|
||||
def test_resume_without_auto_pause_token_skips_yield_override(self, client, host_state):
|
||||
"""Services without an auto_pause_token (e.g. iTunes, Deezer) should
|
||||
NOT add to yield_override — that's a Spotify/LastFM/Genius-only
|
||||
mechanism."""
|
||||
worker = _FakeWorker()
|
||||
register_services([
|
||||
EnrichmentService(
|
||||
id='itunes', display_name='iTunes', worker_getter=lambda: worker,
|
||||
config_paused_key='itunes_enrichment_paused',
|
||||
auto_pause_token=None,
|
||||
),
|
||||
])
|
||||
resp = client.post('/api/enrichment/itunes/resume')
|
||||
assert resp.status_code == 200
|
||||
assert host_state['yield_override'] == set()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 404 path
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestUnknownService:
|
||||
@pytest.mark.parametrize('verb,path', [
|
||||
('get', '/api/enrichment/no_such/status'),
|
||||
('post', '/api/enrichment/no_such/pause'),
|
||||
('post', '/api/enrichment/no_such/resume'),
|
||||
])
|
||||
def test_404_for_unknown_service(self, client, verb, path):
|
||||
register_services([])
|
||||
method = getattr(client, verb)
|
||||
resp = method(path)
|
||||
assert resp.status_code == 404
|
||||
assert 'no_such' in resp.get_json()['error']
|
||||
Loading…
Reference in new issue