From ea7d5c65bb6de386b85b4699c5508db09642e378 Mon Sep 17 00:00:00 2001 From: Broque Thomas <26755000+Nezreka@users.noreply.github.com> Date: Fri, 15 May 2026 10:25:41 -0700 Subject: [PATCH] Extract automation handlers (1/N): infrastructure + 3 simple handlers Begins the lift of `web_server._register_automation_handlers` (1530 lines, 20 nested closures) into `core/automation/handlers/`. Each extracted handler is a top-level function that accepts `(config, deps)` instead of reaching for module-level globals -- makes them unit-testable in isolation. Infrastructure: - `core/automation/deps.py`: `AutomationDeps` (dependency-injection bundle of clients + callables) and `AutomationState` (mutable flags shared across handler invocations, with thread-safe accessors). - `core/automation/handlers/__init__.py` + `registration.py`: one-stop `register_all(deps)` that wires every extracted handler to the engine. First batch of handlers extracted: - `process_wishlist` -> `core/automation/handlers/process_wishlist.py` - `scan_watchlist` -> `core/automation/handlers/scan_watchlist.py` - `scan_library` -> `core/automation/handlers/scan_library.py` `web_server._register_automation_handlers` now builds the deps once and calls `register_all(deps)` for the extracted batch. Remaining 17 closures still live below; subsequent commits in this branch finish the lift. 14 boundary tests in `tests/automation/test_handlers_simple.py` pin every shape: success path, exception swallow contract, fresh-vs-stale state detection (scan_watchlist's id() trick), guard short-circuits, state cleanup on exceptions, AutomationState concurrent-safe accessors. All 101 automation tests pass; no regression. --- core/automation/__init__.py | 14 +- core/automation/deps.py | 95 ++++++ core/automation/handlers/__init__.py | 23 ++ core/automation/handlers/process_wishlist.py | 27 ++ core/automation/handlers/registration.py | 45 +++ core/automation/handlers/scan_library.py | 158 ++++++++++ core/automation/handlers/scan_watchlist.py | 46 +++ tests/automation/test_handlers_simple.py | 287 +++++++++++++++++++ web_server.py | 156 +++------- 9 files changed, 728 insertions(+), 123 deletions(-) create mode 100644 core/automation/deps.py create mode 100644 core/automation/handlers/__init__.py create mode 100644 core/automation/handlers/process_wishlist.py create mode 100644 core/automation/handlers/registration.py create mode 100644 core/automation/handlers/scan_library.py create mode 100644 core/automation/handlers/scan_watchlist.py create mode 100644 tests/automation/test_handlers_simple.py diff --git a/core/automation/__init__.py b/core/automation/__init__.py index 7841a13e..043b8173 100644 --- a/core/automation/__init__.py +++ b/core/automation/__init__.py @@ -1,7 +1,11 @@ -"""Automation API + progress tracking helpers package. +"""Automation API + progress + handlers package. -Lifted from web_server.py /api/automations/* routes and progress -emitters. The action handler registration (`_register_automation_handlers`) -stays in web_server.py because each handler closure is tightly coupled -to other application features. +Lifted from web_server.py: + - `/api/automations/*` route helpers → `api.py` + - block library used by the trigger/action UI → `blocks.py` + - progress tracker (init / update / finish) → `progress.py` + - cross-handler signal bus → `signals.py` + - per-action handler functions → `handlers/` subpackage (with + `deps.py` defining the dependency-injection surface so handlers + stay testable in isolation) """ diff --git a/core/automation/deps.py b/core/automation/deps.py new file mode 100644 index 00000000..58b8c9aa --- /dev/null +++ b/core/automation/deps.py @@ -0,0 +1,95 @@ +"""Dependency-injection surface for automation handlers. + +Each handler in ``core.automation.handlers`` is a top-level pure +function that accepts ``(config: dict, deps: AutomationDeps)`` instead +of reaching for module-level globals in ``web_server``. The deps +namespace bundles every callable, client, and mutable-state container +the handlers need. + +Construction happens once at app startup in ``web_server.py``: + + from core.automation.deps import AutomationDeps, AutomationState + state = AutomationState() + deps = AutomationDeps( + engine=automation_engine, + state=state, + get_database=get_database, + spotify_client=spotify_client, + ... + ) + register_all(deps) + +Tests construct a fake ``AutomationDeps`` with stub callables — every +handler is then exercisable without spinning up Flask, the DB, or +the real media-server clients. +""" + +from __future__ import annotations + +import threading +from dataclasses import dataclass, field +from typing import Any, Callable, Optional + + +@dataclass +class AutomationState: + """Mutable flags shared across handler invocations. + + Pre-refactor each was a ``global`` or ``nonlocal`` variable inside + the registration closure. Lifted here so handlers + their guards + can read/write a single object instead of importing globals. + + All mutations should hold ``lock``; the helper methods below do + so for the common get/set patterns. + """ + + scan_library_automation_id: Optional[str] = None + db_update_automation_id: Optional[str] = None + pipeline_running: bool = False + lock: threading.Lock = field(default_factory=threading.Lock) + + def is_scan_library_active(self) -> bool: + with self.lock: + return self.scan_library_automation_id is not None + + def is_pipeline_running(self) -> bool: + with self.lock: + return self.pipeline_running + + def set_scan_library_id(self, automation_id: Optional[str]) -> None: + with self.lock: + self.scan_library_automation_id = automation_id + + def set_pipeline_running(self, value: bool) -> None: + with self.lock: + self.pipeline_running = value + + +@dataclass +class AutomationDeps: + """Bundle of every callable + client an automation handler may need. + + Add fields as new handlers are extracted. Every field is required + at construction (no defaults) so a missing dep fails loudly at + startup, not silently mid-handler. + """ + + # --- Engine + shared state --- + engine: Any # AutomationEngine instance + state: AutomationState + config_manager: Any # config.settings.ConfigManager singleton + update_progress: Callable[..., None] # _update_automation_progress + logger: Any # module-level logger from utils.logging_config + + # --- Service clients (each may be None depending on user config) --- + get_database: Callable[[], Any] # late-binding so tests don't need DB + spotify_client: Any + tidal_client: Any + web_scan_manager: Any + + # --- Background-task entry points --- + process_wishlist_automatically: Callable[..., Any] + process_watchlist_scan_automatically: Callable[..., Any] + is_wishlist_actually_processing: Callable[[], bool] + is_watchlist_actually_scanning: Callable[[], bool] + get_watchlist_scan_state: Callable[[], dict] # accessor returns the live mutable dict diff --git a/core/automation/handlers/__init__.py b/core/automation/handlers/__init__.py new file mode 100644 index 00000000..8c8120f2 --- /dev/null +++ b/core/automation/handlers/__init__.py @@ -0,0 +1,23 @@ +"""Per-action automation handlers. + +Each module in this subpackage exposes one top-level handler function +(or a small cluster of related handlers) of the form:: + + def auto_(config: dict, deps: AutomationDeps) -> dict + +The ``register_all`` helper in :mod:`registration` wires every handler +to the engine in one place. ``web_server.py`` calls +``register_all(deps)`` once at startup. +""" + +from core.automation.handlers.process_wishlist import auto_process_wishlist +from core.automation.handlers.scan_watchlist import auto_scan_watchlist +from core.automation.handlers.scan_library import auto_scan_library +from core.automation.handlers.registration import register_all + +__all__ = [ + 'auto_process_wishlist', + 'auto_scan_watchlist', + 'auto_scan_library', + 'register_all', +] diff --git a/core/automation/handlers/process_wishlist.py b/core/automation/handlers/process_wishlist.py new file mode 100644 index 00000000..27a08046 --- /dev/null +++ b/core/automation/handlers/process_wishlist.py @@ -0,0 +1,27 @@ +"""Automation handler: ``process_wishlist`` action. + +Lifted from ``web_server._register_automation_handlers`` (the +``_auto_process_wishlist`` closure). Wishlist processing is async — +the helper submits a batch to an executor and returns immediately; +per-track stats arrive later via batch-completion callbacks. +""" + +from __future__ import annotations + +from typing import Any, Dict + +from core.automation.deps import AutomationDeps + + +def auto_process_wishlist(config: Dict[str, Any], deps: AutomationDeps) -> Dict[str, Any]: + """Kick off the wishlist processor for an automation trigger. + + Returns immediately after submission; the wishlist worker emits + per-batch progress via its own callbacks. We only report + ``status: completed`` to mark the trigger fired successfully. + """ + try: + deps.process_wishlist_automatically(automation_id=config.get('_automation_id')) + return {'status': 'completed'} + except Exception as e: # noqa: BLE001 — automation handlers must never raise into the engine + return {'status': 'error', 'error': str(e)} diff --git a/core/automation/handlers/registration.py b/core/automation/handlers/registration.py new file mode 100644 index 00000000..6fcf5f9c --- /dev/null +++ b/core/automation/handlers/registration.py @@ -0,0 +1,45 @@ +"""One-stop registration of every extracted automation handler. + +``web_server`` builds the deps once at startup and calls +:func:`register_all` here. Each new handler module gets one line in +this file when it lands. +""" + +from __future__ import annotations + +from core.automation.deps import AutomationDeps +from core.automation.handlers.process_wishlist import auto_process_wishlist +from core.automation.handlers.scan_watchlist import auto_scan_watchlist +from core.automation.handlers.scan_library import auto_scan_library + + +def register_all(deps: AutomationDeps) -> None: + """Wire every extracted handler to the engine. + + Each ``register_action_handler`` call binds the action name (the + string the trigger uses to look up its action) to a thin lambda + that injects ``deps`` and forwards the engine-supplied config. + Guards stay alongside their handler so duplicate-run prevention + behaves identically to the pre-extraction code. + """ + engine = deps.engine + + # Self-guards prevent duplicate runs of the SAME operation, but + # different operations can run concurrently — wishlist downloads + # use bandwidth, watchlist scans use API calls, library scans use + # media-server CPU. Different resources, no contention. + engine.register_action_handler( + 'process_wishlist', + lambda config: auto_process_wishlist(config, deps), + guard_fn=deps.is_wishlist_actually_processing, + ) + engine.register_action_handler( + 'scan_watchlist', + lambda config: auto_scan_watchlist(config, deps), + guard_fn=deps.is_watchlist_actually_scanning, + ) + engine.register_action_handler( + 'scan_library', + lambda config: auto_scan_library(config, deps), + deps.state.is_scan_library_active, + ) diff --git a/core/automation/handlers/scan_library.py b/core/automation/handlers/scan_library.py new file mode 100644 index 00000000..824d293f --- /dev/null +++ b/core/automation/handlers/scan_library.py @@ -0,0 +1,158 @@ +"""Automation handler: ``scan_library`` action. + +Lifted from ``web_server._register_automation_handlers`` (the +``_auto_scan_library`` closure). The handler triggers a media-server +scan via ``web_scan_manager``, then polls the manager's status until +the scan completes (or a 30-minute timeout fires). Progress phases +are emitted via :func:`AutomationDeps.update_progress` so the +trigger card stays current throughout the run. + +The handler manages its own progress reporting (it sets +``_manages_own_progress: True`` in the result) so the engine doesn't +overwrite the live phase string with a generic 'completed' label. +""" + +from __future__ import annotations + +import time +from typing import Any, Dict + +from core.automation.deps import AutomationDeps + + +# Outer poll cap — covers extreme worst case (long Plex scans on +# huge libraries). Past this point we surface a clear timeout error +# so users notice rather than letting the trigger hang forever. +_SCAN_TIMEOUT_SECONDS = 1800 + +# Per-phase poll intervals. +_POLL_SCHEDULED_SECONDS = 2 +_POLL_SCANNING_SECONDS = 5 +_POLL_UNKNOWN_SECONDS = 2 + +# Progress percentage waypoints. +_PROGRESS_SCHEDULED_MAX = 14 +_PROGRESS_SCAN_START = 15 +_PROGRESS_SCAN_MAX = 95 + + +def auto_scan_library(config: Dict[str, Any], deps: AutomationDeps) -> Dict[str, Any]: + """Run a media-server library scan and stream progress to the + trigger card. + + Returns one of: + - ``{'status': 'completed', '_manages_own_progress': True, ...}`` + - ``{'status': 'skipped', 'reason': 'Scan already being tracked'}`` + - ``{'status': 'error', 'reason': '...', '_manages_own_progress': True}`` + """ + automation_id = config.get('_automation_id') + + if not deps.web_scan_manager: + return {'status': 'error', 'reason': 'Scan manager not available'} + + # If another automation is already tracking the scan, just forward + # the request — the original tracker keeps emitting progress. + if deps.state.is_scan_library_active(): + deps.web_scan_manager.request_scan('Automation trigger (additional batch)') + return {'status': 'skipped', 'reason': 'Scan already being tracked'} + + deps.state.set_scan_library_id(automation_id) + + try: + result = deps.web_scan_manager.request_scan('Automation trigger') + scan_status_val = result.get('status', 'unknown') + + if scan_status_val == 'queued': + deps.update_progress( + automation_id, + log_line='Scan already in progress — waiting for completion', + log_type='info', + ) + else: + delay = result.get('delay_seconds', 60) + deps.update_progress( + automation_id, + log_line=f'Scan scheduled (debounce: {delay}s)', + log_type='info', + ) + + # Unified polling loop — handles debounce → scanning → idle. + poll_start = time.time() + scan_started = (scan_status_val == 'queued') + while time.time() - poll_start < _SCAN_TIMEOUT_SECONDS: + status = deps.web_scan_manager.get_scan_status() + st = status.get('status') + + if st == 'idle': + break # Scan completed (or finished before we polled) + + if st == 'scheduled': + elapsed = int(time.time() - poll_start) + deps.update_progress( + automation_id, + phase=f'Waiting for scan to start... ({elapsed}s)', + progress=min(int(elapsed / 60 * 10), _PROGRESS_SCHEDULED_MAX), + ) + time.sleep(_POLL_SCHEDULED_SECONDS) + continue + + if st == 'scanning': + if not scan_started: + scan_started = True + deps.update_progress( + automation_id, + progress=_PROGRESS_SCAN_START, + log_line='Scan triggered on media server', + log_type='success', + ) + elapsed = status.get('elapsed_seconds', 0) + max_time = status.get('max_time_seconds', 300) + pct = min(_PROGRESS_SCAN_START + int(elapsed / max_time * 80), _PROGRESS_SCAN_MAX) + mins, secs = divmod(elapsed, 60) + deps.update_progress( + automation_id, + phase=f'Library scan in progress... ({mins}m {secs}s)', + progress=pct, + ) + time.sleep(_POLL_SCANNING_SECONDS) + continue + + time.sleep(_POLL_UNKNOWN_SECONDS) + else: + # 30-min timeout reached + deps.update_progress( + automation_id, + status='error', + phase='Timed out', + log_line='Library scan timed out after 30 minutes', + log_type='error', + ) + return {'status': 'error', 'reason': 'Timed out', '_manages_own_progress': True} + + elapsed = round(time.time() - poll_start, 1) + deps.update_progress( + automation_id, + status='finished', + progress=100, + phase='Complete', + log_line='Library scan completed', + log_type='success', + ) + return { + 'status': 'completed', + '_manages_own_progress': True, + 'scan_duration_seconds': elapsed, + } + + except Exception as e: # noqa: BLE001 — automation handlers must never raise into the engine + deps.update_progress( + automation_id, + status='error', + phase='Error', + log_line=str(e), + log_type='error', + ) + return {'status': 'error', 'error': str(e), '_manages_own_progress': True} + + finally: + deps.state.set_scan_library_id(None) diff --git a/core/automation/handlers/scan_watchlist.py b/core/automation/handlers/scan_watchlist.py new file mode 100644 index 00000000..fe522f62 --- /dev/null +++ b/core/automation/handlers/scan_watchlist.py @@ -0,0 +1,46 @@ +"""Automation handler: ``scan_watchlist`` action. + +Lifted from ``web_server._register_automation_handlers`` (the +``_auto_scan_watchlist`` closure). The watchlist scanner returns +summary stats for the trigger card only when a fresh scan actually +ran — detected by snapshotting ``id(state_dict)`` before/after, since +the live processor reassigns the dict on each new scan. +""" + +from __future__ import annotations + +from typing import Any, Dict + +from core.automation.deps import AutomationDeps + + +def auto_scan_watchlist(config: Dict[str, Any], deps: AutomationDeps) -> Dict[str, Any]: + """Run a watchlist scan when the automation triggers. + + Pre-scan we capture ``id(watchlist_scan_state)`` so we can tell + afterwards whether the worker ran (and reassigned the state dict) + or short-circuited (kept the same dict). Only fresh scans report + summary stats — repeat triggers without an intervening run return + a bare ``completed``. + """ + try: + pre_state = deps.get_watchlist_scan_state() + pre_state_id = id(pre_state) + deps.process_watchlist_scan_automatically( + automation_id=config.get('_automation_id'), + profile_id=config.get('_profile_id'), + ) + post_state = deps.get_watchlist_scan_state() + # Fresh scan = state dict was reassigned mid-run. + if id(post_state) != pre_state_id: + summary = post_state.get('summary', {}) if isinstance(post_state, dict) else {} + return { + 'status': 'completed', + 'artists_scanned': summary.get('total_artists', 0), + 'successful_scans': summary.get('successful_scans', 0), + 'new_tracks_found': summary.get('new_tracks_found', 0), + 'tracks_added_to_wishlist': summary.get('tracks_added_to_wishlist', 0), + } + return {'status': 'completed'} + except Exception as e: # noqa: BLE001 — automation handlers must never raise into the engine + return {'status': 'error', 'error': str(e)} diff --git a/tests/automation/test_handlers_simple.py b/tests/automation/test_handlers_simple.py new file mode 100644 index 00000000..3c5d0bc9 --- /dev/null +++ b/tests/automation/test_handlers_simple.py @@ -0,0 +1,287 @@ +"""Boundary tests for the simple extracted automation handlers +(``process_wishlist``, ``scan_watchlist``, ``scan_library``). + +Each handler is tested as a pure function: real ``AutomationDeps`` +constructed with stub callables, no Flask, no DB, no media-server +clients. The tests exercise the success path, the guard paths +(handler short-circuits when another instance is running), the +exception-swallowing contract (handlers must NEVER raise into the +engine), and the mutable-state machinery for handlers that own a +flag in ``AutomationState``. + +Pre-extraction these closures lived inside +``web_server._register_automation_handlers`` and were essentially +un-testable — every test would have needed to spin up the whole +Flask app and stub a dozen module-level globals.""" + +from __future__ import annotations + +import threading +import time +from dataclasses import dataclass, field +from typing import Any, Callable, List, Optional + +import pytest + +from core.automation.deps import AutomationDeps, AutomationState +from core.automation.handlers.process_wishlist import auto_process_wishlist +from core.automation.handlers.scan_watchlist import auto_scan_watchlist +from core.automation.handlers.scan_library import auto_scan_library + + +# ─── shared test scaffolding ────────────────────────────────────────── + + +def _build_deps(**overrides: Any) -> AutomationDeps: + """Return a default `AutomationDeps` with no-op callables. Tests + pass ``overrides`` to install behaviour on the specific deps they + care about.""" + defaults = dict( + engine=object(), + state=AutomationState(), + config_manager=object(), + update_progress=lambda *a, **k: None, + logger=object(), + get_database=lambda: object(), + spotify_client=None, + tidal_client=None, + web_scan_manager=None, + process_wishlist_automatically=lambda **k: None, + process_watchlist_scan_automatically=lambda **k: None, + is_wishlist_actually_processing=lambda: False, + is_watchlist_actually_scanning=lambda: False, + get_watchlist_scan_state=lambda: {}, + ) + defaults.update(overrides) + return AutomationDeps(**defaults) # type: ignore[arg-type] + + +# ─── process_wishlist ───────────────────────────────────────────────── + + +class TestProcessWishlist: + def test_success_returns_completed_status(self): + called: List[Any] = [] + + def stub(automation_id=None): + called.append(automation_id) + + deps = _build_deps(process_wishlist_automatically=stub) + result = auto_process_wishlist({'_automation_id': 'auto-1'}, deps) + assert result == {'status': 'completed'} + assert called == ['auto-1'] + + def test_passes_none_when_no_automation_id(self): + called: List[Any] = [] + + def stub(automation_id=None): + called.append(automation_id) + + deps = _build_deps(process_wishlist_automatically=stub) + result = auto_process_wishlist({}, deps) + assert result == {'status': 'completed'} + assert called == [None] + + def test_handler_swallows_exceptions(self): + def stub(**_kwargs): + raise RuntimeError('boom') + + deps = _build_deps(process_wishlist_automatically=stub) + result = auto_process_wishlist({'_automation_id': 'a'}, deps) + assert result == {'status': 'error', 'error': 'boom'} + + +# ─── scan_watchlist ────────────────────────────────────────────────── + + +class TestScanWatchlist: + def test_fresh_scan_reports_summary_stats(self): + # Worker reassigns the state dict mid-run — handler detects + # via id() change and reports stats. + states = [ + {'summary': {}}, + {'summary': { + 'total_artists': 5, + 'successful_scans': 4, + 'new_tracks_found': 12, + 'tracks_added_to_wishlist': 8, + }}, + ] + idx = {'i': 0} + + def get_state(): + return states[idx['i']] + + def stub(**_kwargs): + idx['i'] = 1 # simulate the worker swapping the dict + + deps = _build_deps( + process_watchlist_scan_automatically=stub, + get_watchlist_scan_state=get_state, + ) + result = auto_scan_watchlist({}, deps) + assert result == { + 'status': 'completed', + 'artists_scanned': 5, + 'successful_scans': 4, + 'new_tracks_found': 12, + 'tracks_added_to_wishlist': 8, + } + + def test_no_fresh_scan_returns_bare_completed(self): + # Same dict identity before and after = no fresh scan ran. + same_dict = {'summary': {'total_artists': 999}} + deps = _build_deps( + process_watchlist_scan_automatically=lambda **_k: None, + get_watchlist_scan_state=lambda: same_dict, + ) + result = auto_scan_watchlist({}, deps) + assert result == {'status': 'completed'} + + def test_handler_swallows_exceptions(self): + def stub(**_kwargs): + raise ValueError('no scanner') + + deps = _build_deps(process_watchlist_scan_automatically=stub) + result = auto_scan_watchlist({}, deps) + assert result == {'status': 'error', 'error': 'no scanner'} + + +# ─── scan_library ──────────────────────────────────────────────────── + + +@dataclass +class _StubScanManager: + """Minimal fake of ``web_scan_manager`` — records calls + lets + tests script its responses.""" + + request_responses: List[dict] = field(default_factory=list) + status_responses: List[dict] = field(default_factory=list) + request_calls: List[str] = field(default_factory=list) + + def request_scan(self, label: str) -> dict: + self.request_calls.append(label) + return self.request_responses.pop(0) if self.request_responses else {'status': 'queued'} + + def get_scan_status(self) -> dict: + return self.status_responses.pop(0) if self.status_responses else {'status': 'idle'} + + +class TestScanLibrary: + def test_no_scan_manager_returns_error(self): + deps = _build_deps(web_scan_manager=None) + result = auto_scan_library({'_automation_id': 'a'}, deps) + assert result == {'status': 'error', 'reason': 'Scan manager not available'} + + def test_already_tracked_returns_skipped(self): + # Pre-set the state flag — handler should short-circuit. + state = AutomationState() + state.scan_library_automation_id = 'someone-else' + scanner = _StubScanManager(request_responses=[{'status': 'queued'}]) + deps = _build_deps(state=state, web_scan_manager=scanner) + result = auto_scan_library({'_automation_id': 'a'}, deps) + assert result == {'status': 'skipped', 'reason': 'Scan already being tracked'} + assert scanner.request_calls == ['Automation trigger (additional batch)'] + + def test_scan_completes_normally(self): + # request_scan returns scheduled; first poll = scheduled; + # second poll = scanning; third poll = idle. + scanner = _StubScanManager( + request_responses=[{'status': 'scheduled', 'delay_seconds': 5}], + status_responses=[ + {'status': 'scheduled'}, + {'status': 'scanning', 'elapsed_seconds': 10, 'max_time_seconds': 100}, + {'status': 'idle'}, + ], + ) + progress: List[dict] = [] + + def stub_progress(automation_id, **kwargs): + progress.append({'aid': automation_id, **kwargs}) + + deps = _build_deps( + web_scan_manager=scanner, + update_progress=stub_progress, + ) + # Patch time.sleep so the test runs instantly. + import core.automation.handlers.scan_library as module + original = module.time.sleep + module.time.sleep = lambda _: None + try: + result = auto_scan_library({'_automation_id': 'auto-1'}, deps) + finally: + module.time.sleep = original + + assert result['status'] == 'completed' + assert result.get('_manages_own_progress') is True + # State flag cleaned up after run + assert deps.state.scan_library_automation_id is None + # Progress phases emitted: scheduled, scan-start, scanning, completed + statuses = [p.get('status') for p in progress] + assert 'finished' in statuses + + def test_state_cleanup_on_exception(self): + class ExplodingScanner: + def request_scan(self, _): + raise RuntimeError('boom') + + def get_scan_status(self): + return {'status': 'idle'} + + progress: List[dict] = [] + deps = _build_deps( + web_scan_manager=ExplodingScanner(), + update_progress=lambda aid, **kw: progress.append({'aid': aid, **kw}), + ) + result = auto_scan_library({'_automation_id': 'auto-x'}, deps) + assert result['status'] == 'error' + assert result['_manages_own_progress'] is True + # State flag still cleaned up + assert deps.state.scan_library_automation_id is None + # Error progress emitted + assert any(p.get('status') == 'error' for p in progress) + + +# ─── AutomationState ────────────────────────────────────────────────── + + +class TestAutomationState: + def test_default_state(self): + s = AutomationState() + assert s.scan_library_automation_id is None + assert s.db_update_automation_id is None + assert s.pipeline_running is False + assert s.is_scan_library_active() is False + assert s.is_pipeline_running() is False + + def test_set_scan_library_id(self): + s = AutomationState() + s.set_scan_library_id('auto-1') + assert s.scan_library_automation_id == 'auto-1' + assert s.is_scan_library_active() is True + s.set_scan_library_id(None) + assert s.is_scan_library_active() is False + + def test_set_pipeline_running(self): + s = AutomationState() + s.set_pipeline_running(True) + assert s.is_pipeline_running() is True + s.set_pipeline_running(False) + assert s.is_pipeline_running() is False + + def test_concurrent_set_safe_via_lock(self): + # Smoke test: two threads flipping the same field don't crash. + # Lock ensures the final value is consistent. + s = AutomationState() + + def worker(): + for _ in range(100): + s.set_pipeline_running(True) + s.set_pipeline_running(False) + + threads = [threading.Thread(target=worker) for _ in range(4)] + for t in threads: + t.start() + for t in threads: + t.join() + assert s.pipeline_running is False diff --git a/web_server.py b/web_server.py index 557d7c5a..aedbfd9d 100644 --- a/web_server.py +++ b/web_server.py @@ -906,129 +906,49 @@ _scan_library_automation_id = None def _register_automation_handlers(): - """Register real SoulSync action handlers with the automation engine.""" + """Register real SoulSync action handlers with the automation engine. + + Per-handler bodies live in ``core.automation.handlers``. This + function wires the dependency-injection surface (clients, + callables, mutable state) into a single ``AutomationDeps`` object + and hands it to ``register_all``. + + NOTE: extraction is in progress. The first batch of handlers + (``process_wishlist`` / ``scan_watchlist`` / ``scan_library``) + has been moved to ``core/automation/handlers/``; the remaining + closures still live below until subsequent commits in the same + branch finish the lift. + """ if not automation_engine: return - def _auto_process_wishlist(config): - try: - _process_wishlist_automatically(automation_id=config.get('_automation_id')) - return {'status': 'completed'} - except Exception as e: - return {'status': 'error', 'error': str(e)} - # Note: wishlist processing is async (batch submitted to executor), stats come via batch completion - - def _auto_scan_watchlist(config): - try: - pre_state_id = id(watchlist_scan_state) - _process_watchlist_scan_automatically( - automation_id=config.get('_automation_id'), - profile_id=config.get('_profile_id') - ) - # Only report stats if a fresh scan actually ran (state dict was reassigned) - if id(watchlist_scan_state) != pre_state_id: - summary = watchlist_scan_state.get('summary', {}) - return { - 'status': 'completed', - 'artists_scanned': summary.get('total_artists', 0), - 'successful_scans': summary.get('successful_scans', 0), - 'new_tracks_found': summary.get('new_tracks_found', 0), - 'tracks_added_to_wishlist': summary.get('tracks_added_to_wishlist', 0), - } - return {'status': 'completed'} - except Exception as e: - return {'status': 'error', 'error': str(e)} - - def _auto_scan_library(config): - global _scan_library_automation_id - automation_id = config.get('_automation_id') - - if not web_scan_manager: - return {'status': 'error', 'reason': 'Scan manager not available'} + from core.automation.deps import AutomationDeps, AutomationState + from core.automation.handlers import register_all as _register_extracted_handlers - # If another automation is already tracking the scan, just forward the request - if _scan_library_automation_id is not None: - web_scan_manager.request_scan('Automation trigger (additional batch)') - return {'status': 'skipped', 'reason': 'Scan already being tracked'} - - _scan_library_automation_id = automation_id - - try: - result = web_scan_manager.request_scan('Automation trigger') - scan_status_val = result.get('status', 'unknown') - - if scan_status_val == 'queued': - _update_automation_progress(automation_id, - log_line='Scan already in progress — waiting for completion', log_type='info') - else: - delay = result.get('delay_seconds', 60) - _update_automation_progress(automation_id, - log_line=f'Scan scheduled (debounce: {delay}s)', log_type='info') - - # Unified polling loop — handles debounce → scanning → idle transitions - poll_start = time.time() - scan_started = (scan_status_val == 'queued') # Already scanning if queued - while time.time() - poll_start < 1800: # Max 30 min overall - status = web_scan_manager.get_scan_status() - st = status.get('status') + # Mutable shared state previously lived as module-level globals + # (`_scan_library_automation_id`, `_pipeline_running`, etc). + # Keeping the legacy globals AS WELL as the new state object during + # the transitional period so the not-yet-extracted closures still + # work; they'll be removed once the rest of the lift is done. + _automation_state = AutomationState() - if st == 'idle': - break # Scan completed (or finished before we started polling) - - elif st == 'scheduled': - elapsed = int(time.time() - poll_start) - _update_automation_progress(automation_id, - phase=f'Waiting for scan to start... ({elapsed}s)', - progress=min(int(elapsed / 60 * 10), 14)) - time.sleep(2) - - elif st == 'scanning': - if not scan_started: - scan_started = True - _update_automation_progress(automation_id, progress=15, - log_line='Scan triggered on media server', log_type='success') - elapsed = status.get('elapsed_seconds', 0) - max_time = status.get('max_time_seconds', 300) - pct = min(15 + int(elapsed / max_time * 80), 95) - mins, secs = divmod(elapsed, 60) - _update_automation_progress(automation_id, - phase=f'Library scan in progress... ({mins}m {secs}s)', - progress=pct) - time.sleep(5) - - else: - time.sleep(2) # Unknown status, avoid tight loop - else: - # 30-min timeout reached - _update_automation_progress(automation_id, status='error', - phase='Timed out', log_line='Library scan timed out after 30 minutes', log_type='error') - return {'status': 'error', 'reason': 'Timed out', '_manages_own_progress': True} - - elapsed = round(time.time() - poll_start, 1) - _update_automation_progress(automation_id, status='finished', progress=100, - phase='Complete', - log_line='Library scan completed', log_type='success') - - return {'status': 'completed', '_manages_own_progress': True, 'scan_duration_seconds': elapsed} - - except Exception as e: - _update_automation_progress(automation_id, status='error', - phase='Error', log_line=str(e), log_type='error') - return {'status': 'error', 'error': str(e), '_manages_own_progress': True} - - finally: - _scan_library_automation_id = None - - # Self-guards only — prevent duplicate runs of the same operation, - # but allow wishlist processing and watchlist scanning to run concurrently. - # Downloads use bandwidth (Soulseek/Tidal/etc), scans use API calls — different resources. - # The per-call rate limiter handles any API contention during post-processing. - automation_engine.register_action_handler('process_wishlist', _auto_process_wishlist, - guard_fn=lambda: is_wishlist_actually_processing()) - automation_engine.register_action_handler('scan_watchlist', _auto_scan_watchlist, - guard_fn=lambda: is_watchlist_actually_scanning()) - automation_engine.register_action_handler('scan_library', _auto_scan_library, - lambda: _scan_library_automation_id is not None) + _automation_deps = AutomationDeps( + engine=automation_engine, + state=_automation_state, + config_manager=config_manager, + update_progress=_update_automation_progress, + logger=logger, + get_database=get_database, + spotify_client=spotify_client, + tidal_client=tidal_client, + web_scan_manager=web_scan_manager, + process_wishlist_automatically=_process_wishlist_automatically, + process_watchlist_scan_automatically=_process_watchlist_scan_automatically, + is_wishlist_actually_processing=is_wishlist_actually_processing, + is_watchlist_actually_scanning=is_watchlist_actually_scanning, + get_watchlist_scan_state=lambda: watchlist_scan_state, + ) + _register_extracted_handlers(_automation_deps) def _auto_refresh_mirrored(config): """Refresh mirrored playlist(s) from source."""