Extract automation handlers (1/N): infrastructure + 3 simple handlers

Begins the lift of `web_server._register_automation_handlers` (1530
lines, 20 nested closures) into `core/automation/handlers/`. Each
extracted handler is a top-level function that accepts
`(config, deps)` instead of reaching for module-level globals --
makes them unit-testable in isolation.

Infrastructure:
- `core/automation/deps.py`: `AutomationDeps` (dependency-injection
  bundle of clients + callables) and `AutomationState` (mutable flags
  shared across handler invocations, with thread-safe accessors).
- `core/automation/handlers/__init__.py` + `registration.py`: one-stop
  `register_all(deps)` that wires every extracted handler to the
  engine.

First batch of handlers extracted:
- `process_wishlist` -> `core/automation/handlers/process_wishlist.py`
- `scan_watchlist`   -> `core/automation/handlers/scan_watchlist.py`
- `scan_library`     -> `core/automation/handlers/scan_library.py`

`web_server._register_automation_handlers` now builds the deps once
and calls `register_all(deps)` for the extracted batch. Remaining
17 closures still live below; subsequent commits in this branch
finish the lift.

14 boundary tests in `tests/automation/test_handlers_simple.py` pin
every shape: success path, exception swallow contract, fresh-vs-stale
state detection (scan_watchlist's id() trick), guard short-circuits,
state cleanup on exceptions, AutomationState concurrent-safe accessors.
All 101 automation tests pass; no regression.
pull/611/head
Broque Thomas 1 week ago
parent d9529fc801
commit ea7d5c65bb

@ -1,7 +1,11 @@
"""Automation API + progress tracking helpers package.
"""Automation API + progress + handlers package.
Lifted from web_server.py /api/automations/* routes and progress
emitters. The action handler registration (`_register_automation_handlers`)
stays in web_server.py because each handler closure is tightly coupled
to other application features.
Lifted from web_server.py:
- `/api/automations/*` route helpers `api.py`
- block library used by the trigger/action UI `blocks.py`
- progress tracker (init / update / finish) `progress.py`
- cross-handler signal bus `signals.py`
- per-action handler functions `handlers/` subpackage (with
`deps.py` defining the dependency-injection surface so handlers
stay testable in isolation)
"""

@ -0,0 +1,95 @@
"""Dependency-injection surface for automation handlers.
Each handler in ``core.automation.handlers`` is a top-level pure
function that accepts ``(config: dict, deps: AutomationDeps)`` instead
of reaching for module-level globals in ``web_server``. The deps
namespace bundles every callable, client, and mutable-state container
the handlers need.
Construction happens once at app startup in ``web_server.py``:
from core.automation.deps import AutomationDeps, AutomationState
state = AutomationState()
deps = AutomationDeps(
engine=automation_engine,
state=state,
get_database=get_database,
spotify_client=spotify_client,
...
)
register_all(deps)
Tests construct a fake ``AutomationDeps`` with stub callables every
handler is then exercisable without spinning up Flask, the DB, or
the real media-server clients.
"""
from __future__ import annotations
import threading
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
@dataclass
class AutomationState:
"""Mutable flags shared across handler invocations.
Pre-refactor each was a ``global`` or ``nonlocal`` variable inside
the registration closure. Lifted here so handlers + their guards
can read/write a single object instead of importing globals.
All mutations should hold ``lock``; the helper methods below do
so for the common get/set patterns.
"""
scan_library_automation_id: Optional[str] = None
db_update_automation_id: Optional[str] = None
pipeline_running: bool = False
lock: threading.Lock = field(default_factory=threading.Lock)
def is_scan_library_active(self) -> bool:
with self.lock:
return self.scan_library_automation_id is not None
def is_pipeline_running(self) -> bool:
with self.lock:
return self.pipeline_running
def set_scan_library_id(self, automation_id: Optional[str]) -> None:
with self.lock:
self.scan_library_automation_id = automation_id
def set_pipeline_running(self, value: bool) -> None:
with self.lock:
self.pipeline_running = value
@dataclass
class AutomationDeps:
"""Bundle of every callable + client an automation handler may need.
Add fields as new handlers are extracted. Every field is required
at construction (no defaults) so a missing dep fails loudly at
startup, not silently mid-handler.
"""
# --- Engine + shared state ---
engine: Any # AutomationEngine instance
state: AutomationState
config_manager: Any # config.settings.ConfigManager singleton
update_progress: Callable[..., None] # _update_automation_progress
logger: Any # module-level logger from utils.logging_config
# --- Service clients (each may be None depending on user config) ---
get_database: Callable[[], Any] # late-binding so tests don't need DB
spotify_client: Any
tidal_client: Any
web_scan_manager: Any
# --- Background-task entry points ---
process_wishlist_automatically: Callable[..., Any]
process_watchlist_scan_automatically: Callable[..., Any]
is_wishlist_actually_processing: Callable[[], bool]
is_watchlist_actually_scanning: Callable[[], bool]
get_watchlist_scan_state: Callable[[], dict] # accessor returns the live mutable dict

@ -0,0 +1,23 @@
"""Per-action automation handlers.
Each module in this subpackage exposes one top-level handler function
(or a small cluster of related handlers) of the form::
def auto_<action_name>(config: dict, deps: AutomationDeps) -> dict
The ``register_all`` helper in :mod:`registration` wires every handler
to the engine in one place. ``web_server.py`` calls
``register_all(deps)`` once at startup.
"""
from core.automation.handlers.process_wishlist import auto_process_wishlist
from core.automation.handlers.scan_watchlist import auto_scan_watchlist
from core.automation.handlers.scan_library import auto_scan_library
from core.automation.handlers.registration import register_all
__all__ = [
'auto_process_wishlist',
'auto_scan_watchlist',
'auto_scan_library',
'register_all',
]

@ -0,0 +1,27 @@
"""Automation handler: ``process_wishlist`` action.
Lifted from ``web_server._register_automation_handlers`` (the
``_auto_process_wishlist`` closure). Wishlist processing is async
the helper submits a batch to an executor and returns immediately;
per-track stats arrive later via batch-completion callbacks.
"""
from __future__ import annotations
from typing import Any, Dict
from core.automation.deps import AutomationDeps
def auto_process_wishlist(config: Dict[str, Any], deps: AutomationDeps) -> Dict[str, Any]:
"""Kick off the wishlist processor for an automation trigger.
Returns immediately after submission; the wishlist worker emits
per-batch progress via its own callbacks. We only report
``status: completed`` to mark the trigger fired successfully.
"""
try:
deps.process_wishlist_automatically(automation_id=config.get('_automation_id'))
return {'status': 'completed'}
except Exception as e: # noqa: BLE001 — automation handlers must never raise into the engine
return {'status': 'error', 'error': str(e)}

@ -0,0 +1,45 @@
"""One-stop registration of every extracted automation handler.
``web_server`` builds the deps once at startup and calls
:func:`register_all` here. Each new handler module gets one line in
this file when it lands.
"""
from __future__ import annotations
from core.automation.deps import AutomationDeps
from core.automation.handlers.process_wishlist import auto_process_wishlist
from core.automation.handlers.scan_watchlist import auto_scan_watchlist
from core.automation.handlers.scan_library import auto_scan_library
def register_all(deps: AutomationDeps) -> None:
"""Wire every extracted handler to the engine.
Each ``register_action_handler`` call binds the action name (the
string the trigger uses to look up its action) to a thin lambda
that injects ``deps`` and forwards the engine-supplied config.
Guards stay alongside their handler so duplicate-run prevention
behaves identically to the pre-extraction code.
"""
engine = deps.engine
# Self-guards prevent duplicate runs of the SAME operation, but
# different operations can run concurrently — wishlist downloads
# use bandwidth, watchlist scans use API calls, library scans use
# media-server CPU. Different resources, no contention.
engine.register_action_handler(
'process_wishlist',
lambda config: auto_process_wishlist(config, deps),
guard_fn=deps.is_wishlist_actually_processing,
)
engine.register_action_handler(
'scan_watchlist',
lambda config: auto_scan_watchlist(config, deps),
guard_fn=deps.is_watchlist_actually_scanning,
)
engine.register_action_handler(
'scan_library',
lambda config: auto_scan_library(config, deps),
deps.state.is_scan_library_active,
)

@ -0,0 +1,158 @@
"""Automation handler: ``scan_library`` action.
Lifted from ``web_server._register_automation_handlers`` (the
``_auto_scan_library`` closure). The handler triggers a media-server
scan via ``web_scan_manager``, then polls the manager's status until
the scan completes (or a 30-minute timeout fires). Progress phases
are emitted via :func:`AutomationDeps.update_progress` so the
trigger card stays current throughout the run.
The handler manages its own progress reporting (it sets
``_manages_own_progress: True`` in the result) so the engine doesn't
overwrite the live phase string with a generic 'completed' label.
"""
from __future__ import annotations
import time
from typing import Any, Dict
from core.automation.deps import AutomationDeps
# Outer poll cap — covers extreme worst case (long Plex scans on
# huge libraries). Past this point we surface a clear timeout error
# so users notice rather than letting the trigger hang forever.
_SCAN_TIMEOUT_SECONDS = 1800
# Per-phase poll intervals.
_POLL_SCHEDULED_SECONDS = 2
_POLL_SCANNING_SECONDS = 5
_POLL_UNKNOWN_SECONDS = 2
# Progress percentage waypoints.
_PROGRESS_SCHEDULED_MAX = 14
_PROGRESS_SCAN_START = 15
_PROGRESS_SCAN_MAX = 95
def auto_scan_library(config: Dict[str, Any], deps: AutomationDeps) -> Dict[str, Any]:
"""Run a media-server library scan and stream progress to the
trigger card.
Returns one of:
- ``{'status': 'completed', '_manages_own_progress': True, ...}``
- ``{'status': 'skipped', 'reason': 'Scan already being tracked'}``
- ``{'status': 'error', 'reason': '...', '_manages_own_progress': True}``
"""
automation_id = config.get('_automation_id')
if not deps.web_scan_manager:
return {'status': 'error', 'reason': 'Scan manager not available'}
# If another automation is already tracking the scan, just forward
# the request — the original tracker keeps emitting progress.
if deps.state.is_scan_library_active():
deps.web_scan_manager.request_scan('Automation trigger (additional batch)')
return {'status': 'skipped', 'reason': 'Scan already being tracked'}
deps.state.set_scan_library_id(automation_id)
try:
result = deps.web_scan_manager.request_scan('Automation trigger')
scan_status_val = result.get('status', 'unknown')
if scan_status_val == 'queued':
deps.update_progress(
automation_id,
log_line='Scan already in progress — waiting for completion',
log_type='info',
)
else:
delay = result.get('delay_seconds', 60)
deps.update_progress(
automation_id,
log_line=f'Scan scheduled (debounce: {delay}s)',
log_type='info',
)
# Unified polling loop — handles debounce → scanning → idle.
poll_start = time.time()
scan_started = (scan_status_val == 'queued')
while time.time() - poll_start < _SCAN_TIMEOUT_SECONDS:
status = deps.web_scan_manager.get_scan_status()
st = status.get('status')
if st == 'idle':
break # Scan completed (or finished before we polled)
if st == 'scheduled':
elapsed = int(time.time() - poll_start)
deps.update_progress(
automation_id,
phase=f'Waiting for scan to start... ({elapsed}s)',
progress=min(int(elapsed / 60 * 10), _PROGRESS_SCHEDULED_MAX),
)
time.sleep(_POLL_SCHEDULED_SECONDS)
continue
if st == 'scanning':
if not scan_started:
scan_started = True
deps.update_progress(
automation_id,
progress=_PROGRESS_SCAN_START,
log_line='Scan triggered on media server',
log_type='success',
)
elapsed = status.get('elapsed_seconds', 0)
max_time = status.get('max_time_seconds', 300)
pct = min(_PROGRESS_SCAN_START + int(elapsed / max_time * 80), _PROGRESS_SCAN_MAX)
mins, secs = divmod(elapsed, 60)
deps.update_progress(
automation_id,
phase=f'Library scan in progress... ({mins}m {secs}s)',
progress=pct,
)
time.sleep(_POLL_SCANNING_SECONDS)
continue
time.sleep(_POLL_UNKNOWN_SECONDS)
else:
# 30-min timeout reached
deps.update_progress(
automation_id,
status='error',
phase='Timed out',
log_line='Library scan timed out after 30 minutes',
log_type='error',
)
return {'status': 'error', 'reason': 'Timed out', '_manages_own_progress': True}
elapsed = round(time.time() - poll_start, 1)
deps.update_progress(
automation_id,
status='finished',
progress=100,
phase='Complete',
log_line='Library scan completed',
log_type='success',
)
return {
'status': 'completed',
'_manages_own_progress': True,
'scan_duration_seconds': elapsed,
}
except Exception as e: # noqa: BLE001 — automation handlers must never raise into the engine
deps.update_progress(
automation_id,
status='error',
phase='Error',
log_line=str(e),
log_type='error',
)
return {'status': 'error', 'error': str(e), '_manages_own_progress': True}
finally:
deps.state.set_scan_library_id(None)

@ -0,0 +1,46 @@
"""Automation handler: ``scan_watchlist`` action.
Lifted from ``web_server._register_automation_handlers`` (the
``_auto_scan_watchlist`` closure). The watchlist scanner returns
summary stats for the trigger card only when a fresh scan actually
ran detected by snapshotting ``id(state_dict)`` before/after, since
the live processor reassigns the dict on each new scan.
"""
from __future__ import annotations
from typing import Any, Dict
from core.automation.deps import AutomationDeps
def auto_scan_watchlist(config: Dict[str, Any], deps: AutomationDeps) -> Dict[str, Any]:
"""Run a watchlist scan when the automation triggers.
Pre-scan we capture ``id(watchlist_scan_state)`` so we can tell
afterwards whether the worker ran (and reassigned the state dict)
or short-circuited (kept the same dict). Only fresh scans report
summary stats repeat triggers without an intervening run return
a bare ``completed``.
"""
try:
pre_state = deps.get_watchlist_scan_state()
pre_state_id = id(pre_state)
deps.process_watchlist_scan_automatically(
automation_id=config.get('_automation_id'),
profile_id=config.get('_profile_id'),
)
post_state = deps.get_watchlist_scan_state()
# Fresh scan = state dict was reassigned mid-run.
if id(post_state) != pre_state_id:
summary = post_state.get('summary', {}) if isinstance(post_state, dict) else {}
return {
'status': 'completed',
'artists_scanned': summary.get('total_artists', 0),
'successful_scans': summary.get('successful_scans', 0),
'new_tracks_found': summary.get('new_tracks_found', 0),
'tracks_added_to_wishlist': summary.get('tracks_added_to_wishlist', 0),
}
return {'status': 'completed'}
except Exception as e: # noqa: BLE001 — automation handlers must never raise into the engine
return {'status': 'error', 'error': str(e)}

@ -0,0 +1,287 @@
"""Boundary tests for the simple extracted automation handlers
(``process_wishlist``, ``scan_watchlist``, ``scan_library``).
Each handler is tested as a pure function: real ``AutomationDeps``
constructed with stub callables, no Flask, no DB, no media-server
clients. The tests exercise the success path, the guard paths
(handler short-circuits when another instance is running), the
exception-swallowing contract (handlers must NEVER raise into the
engine), and the mutable-state machinery for handlers that own a
flag in ``AutomationState``.
Pre-extraction these closures lived inside
``web_server._register_automation_handlers`` and were essentially
un-testable every test would have needed to spin up the whole
Flask app and stub a dozen module-level globals."""
from __future__ import annotations
import threading
import time
from dataclasses import dataclass, field
from typing import Any, Callable, List, Optional
import pytest
from core.automation.deps import AutomationDeps, AutomationState
from core.automation.handlers.process_wishlist import auto_process_wishlist
from core.automation.handlers.scan_watchlist import auto_scan_watchlist
from core.automation.handlers.scan_library import auto_scan_library
# ─── shared test scaffolding ──────────────────────────────────────────
def _build_deps(**overrides: Any) -> AutomationDeps:
"""Return a default `AutomationDeps` with no-op callables. Tests
pass ``overrides`` to install behaviour on the specific deps they
care about."""
defaults = dict(
engine=object(),
state=AutomationState(),
config_manager=object(),
update_progress=lambda *a, **k: None,
logger=object(),
get_database=lambda: object(),
spotify_client=None,
tidal_client=None,
web_scan_manager=None,
process_wishlist_automatically=lambda **k: None,
process_watchlist_scan_automatically=lambda **k: None,
is_wishlist_actually_processing=lambda: False,
is_watchlist_actually_scanning=lambda: False,
get_watchlist_scan_state=lambda: {},
)
defaults.update(overrides)
return AutomationDeps(**defaults) # type: ignore[arg-type]
# ─── process_wishlist ─────────────────────────────────────────────────
class TestProcessWishlist:
def test_success_returns_completed_status(self):
called: List[Any] = []
def stub(automation_id=None):
called.append(automation_id)
deps = _build_deps(process_wishlist_automatically=stub)
result = auto_process_wishlist({'_automation_id': 'auto-1'}, deps)
assert result == {'status': 'completed'}
assert called == ['auto-1']
def test_passes_none_when_no_automation_id(self):
called: List[Any] = []
def stub(automation_id=None):
called.append(automation_id)
deps = _build_deps(process_wishlist_automatically=stub)
result = auto_process_wishlist({}, deps)
assert result == {'status': 'completed'}
assert called == [None]
def test_handler_swallows_exceptions(self):
def stub(**_kwargs):
raise RuntimeError('boom')
deps = _build_deps(process_wishlist_automatically=stub)
result = auto_process_wishlist({'_automation_id': 'a'}, deps)
assert result == {'status': 'error', 'error': 'boom'}
# ─── scan_watchlist ──────────────────────────────────────────────────
class TestScanWatchlist:
def test_fresh_scan_reports_summary_stats(self):
# Worker reassigns the state dict mid-run — handler detects
# via id() change and reports stats.
states = [
{'summary': {}},
{'summary': {
'total_artists': 5,
'successful_scans': 4,
'new_tracks_found': 12,
'tracks_added_to_wishlist': 8,
}},
]
idx = {'i': 0}
def get_state():
return states[idx['i']]
def stub(**_kwargs):
idx['i'] = 1 # simulate the worker swapping the dict
deps = _build_deps(
process_watchlist_scan_automatically=stub,
get_watchlist_scan_state=get_state,
)
result = auto_scan_watchlist({}, deps)
assert result == {
'status': 'completed',
'artists_scanned': 5,
'successful_scans': 4,
'new_tracks_found': 12,
'tracks_added_to_wishlist': 8,
}
def test_no_fresh_scan_returns_bare_completed(self):
# Same dict identity before and after = no fresh scan ran.
same_dict = {'summary': {'total_artists': 999}}
deps = _build_deps(
process_watchlist_scan_automatically=lambda **_k: None,
get_watchlist_scan_state=lambda: same_dict,
)
result = auto_scan_watchlist({}, deps)
assert result == {'status': 'completed'}
def test_handler_swallows_exceptions(self):
def stub(**_kwargs):
raise ValueError('no scanner')
deps = _build_deps(process_watchlist_scan_automatically=stub)
result = auto_scan_watchlist({}, deps)
assert result == {'status': 'error', 'error': 'no scanner'}
# ─── scan_library ────────────────────────────────────────────────────
@dataclass
class _StubScanManager:
"""Minimal fake of ``web_scan_manager`` — records calls + lets
tests script its responses."""
request_responses: List[dict] = field(default_factory=list)
status_responses: List[dict] = field(default_factory=list)
request_calls: List[str] = field(default_factory=list)
def request_scan(self, label: str) -> dict:
self.request_calls.append(label)
return self.request_responses.pop(0) if self.request_responses else {'status': 'queued'}
def get_scan_status(self) -> dict:
return self.status_responses.pop(0) if self.status_responses else {'status': 'idle'}
class TestScanLibrary:
def test_no_scan_manager_returns_error(self):
deps = _build_deps(web_scan_manager=None)
result = auto_scan_library({'_automation_id': 'a'}, deps)
assert result == {'status': 'error', 'reason': 'Scan manager not available'}
def test_already_tracked_returns_skipped(self):
# Pre-set the state flag — handler should short-circuit.
state = AutomationState()
state.scan_library_automation_id = 'someone-else'
scanner = _StubScanManager(request_responses=[{'status': 'queued'}])
deps = _build_deps(state=state, web_scan_manager=scanner)
result = auto_scan_library({'_automation_id': 'a'}, deps)
assert result == {'status': 'skipped', 'reason': 'Scan already being tracked'}
assert scanner.request_calls == ['Automation trigger (additional batch)']
def test_scan_completes_normally(self):
# request_scan returns scheduled; first poll = scheduled;
# second poll = scanning; third poll = idle.
scanner = _StubScanManager(
request_responses=[{'status': 'scheduled', 'delay_seconds': 5}],
status_responses=[
{'status': 'scheduled'},
{'status': 'scanning', 'elapsed_seconds': 10, 'max_time_seconds': 100},
{'status': 'idle'},
],
)
progress: List[dict] = []
def stub_progress(automation_id, **kwargs):
progress.append({'aid': automation_id, **kwargs})
deps = _build_deps(
web_scan_manager=scanner,
update_progress=stub_progress,
)
# Patch time.sleep so the test runs instantly.
import core.automation.handlers.scan_library as module
original = module.time.sleep
module.time.sleep = lambda _: None
try:
result = auto_scan_library({'_automation_id': 'auto-1'}, deps)
finally:
module.time.sleep = original
assert result['status'] == 'completed'
assert result.get('_manages_own_progress') is True
# State flag cleaned up after run
assert deps.state.scan_library_automation_id is None
# Progress phases emitted: scheduled, scan-start, scanning, completed
statuses = [p.get('status') for p in progress]
assert 'finished' in statuses
def test_state_cleanup_on_exception(self):
class ExplodingScanner:
def request_scan(self, _):
raise RuntimeError('boom')
def get_scan_status(self):
return {'status': 'idle'}
progress: List[dict] = []
deps = _build_deps(
web_scan_manager=ExplodingScanner(),
update_progress=lambda aid, **kw: progress.append({'aid': aid, **kw}),
)
result = auto_scan_library({'_automation_id': 'auto-x'}, deps)
assert result['status'] == 'error'
assert result['_manages_own_progress'] is True
# State flag still cleaned up
assert deps.state.scan_library_automation_id is None
# Error progress emitted
assert any(p.get('status') == 'error' for p in progress)
# ─── AutomationState ──────────────────────────────────────────────────
class TestAutomationState:
def test_default_state(self):
s = AutomationState()
assert s.scan_library_automation_id is None
assert s.db_update_automation_id is None
assert s.pipeline_running is False
assert s.is_scan_library_active() is False
assert s.is_pipeline_running() is False
def test_set_scan_library_id(self):
s = AutomationState()
s.set_scan_library_id('auto-1')
assert s.scan_library_automation_id == 'auto-1'
assert s.is_scan_library_active() is True
s.set_scan_library_id(None)
assert s.is_scan_library_active() is False
def test_set_pipeline_running(self):
s = AutomationState()
s.set_pipeline_running(True)
assert s.is_pipeline_running() is True
s.set_pipeline_running(False)
assert s.is_pipeline_running() is False
def test_concurrent_set_safe_via_lock(self):
# Smoke test: two threads flipping the same field don't crash.
# Lock ensures the final value is consistent.
s = AutomationState()
def worker():
for _ in range(100):
s.set_pipeline_running(True)
s.set_pipeline_running(False)
threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
assert s.pipeline_running is False

@ -906,129 +906,49 @@ _scan_library_automation_id = None
def _register_automation_handlers():
"""Register real SoulSync action handlers with the automation engine."""
"""Register real SoulSync action handlers with the automation engine.
Per-handler bodies live in ``core.automation.handlers``. This
function wires the dependency-injection surface (clients,
callables, mutable state) into a single ``AutomationDeps`` object
and hands it to ``register_all``.
NOTE: extraction is in progress. The first batch of handlers
(``process_wishlist`` / ``scan_watchlist`` / ``scan_library``)
has been moved to ``core/automation/handlers/``; the remaining
closures still live below until subsequent commits in the same
branch finish the lift.
"""
if not automation_engine:
return
def _auto_process_wishlist(config):
try:
_process_wishlist_automatically(automation_id=config.get('_automation_id'))
return {'status': 'completed'}
except Exception as e:
return {'status': 'error', 'error': str(e)}
# Note: wishlist processing is async (batch submitted to executor), stats come via batch completion
def _auto_scan_watchlist(config):
try:
pre_state_id = id(watchlist_scan_state)
_process_watchlist_scan_automatically(
automation_id=config.get('_automation_id'),
profile_id=config.get('_profile_id')
)
# Only report stats if a fresh scan actually ran (state dict was reassigned)
if id(watchlist_scan_state) != pre_state_id:
summary = watchlist_scan_state.get('summary', {})
return {
'status': 'completed',
'artists_scanned': summary.get('total_artists', 0),
'successful_scans': summary.get('successful_scans', 0),
'new_tracks_found': summary.get('new_tracks_found', 0),
'tracks_added_to_wishlist': summary.get('tracks_added_to_wishlist', 0),
}
return {'status': 'completed'}
except Exception as e:
return {'status': 'error', 'error': str(e)}
def _auto_scan_library(config):
global _scan_library_automation_id
automation_id = config.get('_automation_id')
if not web_scan_manager:
return {'status': 'error', 'reason': 'Scan manager not available'}
from core.automation.deps import AutomationDeps, AutomationState
from core.automation.handlers import register_all as _register_extracted_handlers
# If another automation is already tracking the scan, just forward the request
if _scan_library_automation_id is not None:
web_scan_manager.request_scan('Automation trigger (additional batch)')
return {'status': 'skipped', 'reason': 'Scan already being tracked'}
_scan_library_automation_id = automation_id
try:
result = web_scan_manager.request_scan('Automation trigger')
scan_status_val = result.get('status', 'unknown')
if scan_status_val == 'queued':
_update_automation_progress(automation_id,
log_line='Scan already in progress — waiting for completion', log_type='info')
else:
delay = result.get('delay_seconds', 60)
_update_automation_progress(automation_id,
log_line=f'Scan scheduled (debounce: {delay}s)', log_type='info')
# Unified polling loop — handles debounce → scanning → idle transitions
poll_start = time.time()
scan_started = (scan_status_val == 'queued') # Already scanning if queued
while time.time() - poll_start < 1800: # Max 30 min overall
status = web_scan_manager.get_scan_status()
st = status.get('status')
# Mutable shared state previously lived as module-level globals
# (`_scan_library_automation_id`, `_pipeline_running`, etc).
# Keeping the legacy globals AS WELL as the new state object during
# the transitional period so the not-yet-extracted closures still
# work; they'll be removed once the rest of the lift is done.
_automation_state = AutomationState()
if st == 'idle':
break # Scan completed (or finished before we started polling)
elif st == 'scheduled':
elapsed = int(time.time() - poll_start)
_update_automation_progress(automation_id,
phase=f'Waiting for scan to start... ({elapsed}s)',
progress=min(int(elapsed / 60 * 10), 14))
time.sleep(2)
elif st == 'scanning':
if not scan_started:
scan_started = True
_update_automation_progress(automation_id, progress=15,
log_line='Scan triggered on media server', log_type='success')
elapsed = status.get('elapsed_seconds', 0)
max_time = status.get('max_time_seconds', 300)
pct = min(15 + int(elapsed / max_time * 80), 95)
mins, secs = divmod(elapsed, 60)
_update_automation_progress(automation_id,
phase=f'Library scan in progress... ({mins}m {secs}s)',
progress=pct)
time.sleep(5)
else:
time.sleep(2) # Unknown status, avoid tight loop
else:
# 30-min timeout reached
_update_automation_progress(automation_id, status='error',
phase='Timed out', log_line='Library scan timed out after 30 minutes', log_type='error')
return {'status': 'error', 'reason': 'Timed out', '_manages_own_progress': True}
elapsed = round(time.time() - poll_start, 1)
_update_automation_progress(automation_id, status='finished', progress=100,
phase='Complete',
log_line='Library scan completed', log_type='success')
return {'status': 'completed', '_manages_own_progress': True, 'scan_duration_seconds': elapsed}
except Exception as e:
_update_automation_progress(automation_id, status='error',
phase='Error', log_line=str(e), log_type='error')
return {'status': 'error', 'error': str(e), '_manages_own_progress': True}
finally:
_scan_library_automation_id = None
# Self-guards only — prevent duplicate runs of the same operation,
# but allow wishlist processing and watchlist scanning to run concurrently.
# Downloads use bandwidth (Soulseek/Tidal/etc), scans use API calls — different resources.
# The per-call rate limiter handles any API contention during post-processing.
automation_engine.register_action_handler('process_wishlist', _auto_process_wishlist,
guard_fn=lambda: is_wishlist_actually_processing())
automation_engine.register_action_handler('scan_watchlist', _auto_scan_watchlist,
guard_fn=lambda: is_watchlist_actually_scanning())
automation_engine.register_action_handler('scan_library', _auto_scan_library,
lambda: _scan_library_automation_id is not None)
_automation_deps = AutomationDeps(
engine=automation_engine,
state=_automation_state,
config_manager=config_manager,
update_progress=_update_automation_progress,
logger=logger,
get_database=get_database,
spotify_client=spotify_client,
tidal_client=tidal_client,
web_scan_manager=web_scan_manager,
process_wishlist_automatically=_process_wishlist_automatically,
process_watchlist_scan_automatically=_process_watchlist_scan_automatically,
is_wishlist_actually_processing=is_wishlist_actually_processing,
is_watchlist_actually_scanning=is_watchlist_actually_scanning,
get_watchlist_scan_state=lambda: watchlist_scan_state,
)
_register_extracted_handlers(_automation_deps)
def _auto_refresh_mirrored(config):
"""Refresh mirrored playlist(s) from source."""

Loading…
Cancel
Save