Extract automation handlers (4/3 — finish): progress callbacks + scan-completion emitter

Cleans up the four remaining inline callbacks at the bottom of
`web_server._register_automation_handlers` so the function is now
purely deps-construction + register_all + a logger.info line.

Lifted:
- `_progress_init`, `_progress_finish`, `_record_automation_history`,
  and `_on_library_scan_completed` -> core/automation/handlers/progress_callbacks.py

Each is a top-level function that takes deps as a parameter; the
engine sees thin lambdas through `register_progress_callbacks` /
`register_library_scan_completed_emitter` (called from `register_all`).

Two new deps fields:
- `init_automation_progress` (delegates into the live progress tracker)
- `record_progress_history` (delegates into _auto_progress.record_history)

12 new boundary tests in tests/automation/test_progress_callbacks.py
pin every shape:
- progress_init forwards to init_automation_progress
- progress_finish skips when handler manages its own progress
  (prevents double-emit of finished status)
- progress_finish: completed -> finished/Complete/success;
  error -> error/Error/error; msg falls through error -> reason ->
  status -> 'done'
- record_history threads the live db into the recorder
- on_library_scan_completed: no engine = noop, server type taken
  from web_scan_manager._current_server_type, defaults to 'unknown'
- register_library_scan_completed_emitter: no scan manager = noop,
  registered callback emits the right event when invoked

3256 tests pass, no regression.

Final state of `_register_automation_handlers`:
- Was: 1530 lines, 21 nested closures + 4 progress callbacks
- Now: ~50 lines, builds AutomationDeps and calls register_all

web_server.py: 34,220 -> 34,187 lines (-33 net, -1,406 across the
whole branch).
pull/611/head
Broque Thomas 1 week ago
parent 017553193f
commit e140da117a

@ -133,3 +133,8 @@ class AutomationDeps:
get_watchlist_scanner: Callable[[Any], Any]
get_app: Callable[[], Any] # Flask app for test_client (beatport refresh)
get_beatport_data_cache: Callable[[], dict]
# --- Progress + history callbacks (used by register_all to wire
# the engine's progress callback hooks). ---
init_automation_progress: Callable[..., Any]
record_progress_history: Callable[..., Any]

@ -0,0 +1,89 @@
"""Progress + history callbacks the automation engine invokes around
each handler run.
Lifted from the closures at the bottom of
``web_server._register_automation_handlers``:
- ``_progress_init`` :func:`progress_init`
- ``_progress_finish`` :func:`progress_finish`
- ``_record_automation_history`` :func:`record_history`
- ``_on_library_scan_completed`` :func:`on_library_scan_completed`
The engine accepts four callables via
``register_progress_callbacks(init, finish, update, history)``;
``registration.register_all`` wires these here. The
``library_scan_completed`` callback is registered separately on the
``web_scan_manager`` (when one is available) -- see
``register_library_scan_completed_emitter``.
"""
from __future__ import annotations
from typing import Any, Dict
from core.automation.deps import AutomationDeps
def progress_init(aid: Any, name: str, action_type: str, deps: AutomationDeps) -> None:
"""Initialize per-automation progress state when the engine starts
a handler. Thin wrapper so the engine receives a closure that
delegates into the live progress tracker."""
deps.init_automation_progress(aid, name, action_type)
def progress_finish(aid: Any, result: Dict[str, Any], deps: AutomationDeps) -> None:
"""Emit the final progress update when a handler returns.
Skipped for handlers that manage their own progress lifecycle
(they call ``update_progress(status='finished')`` themselves and
set ``_manages_own_progress: True`` in the returned dict).
Otherwise translates the handler's status into a finished/error
progress emit with a status-appropriate phase + log line.
"""
if result.get('_manages_own_progress'):
return
result_status = result.get('status', '')
status = 'error' if result_status == 'error' else 'finished'
msg = result.get('error', result.get('reason', result_status or 'done'))
deps.update_progress(
aid,
status=status,
progress=100,
phase='Error' if status == 'error' else 'Complete',
log_line=msg,
log_type='error' if status == 'error' else 'success',
)
def record_history(aid: Any, result: Dict[str, Any], deps: AutomationDeps) -> None:
"""Capture progress state into run history before the engine's
cleanup pass clears it. Thin wrapper so the engine sees a stable
callable."""
deps.record_progress_history(aid, result, deps.get_database())
def on_library_scan_completed(deps: AutomationDeps) -> None:
"""Emit the ``library_scan_completed`` automation event with the
active media-server type. Replaces the hard-coded
``scan_completion_callback trigger_automatic_database_update``
chain so any automation can listen for scan completion as a
trigger."""
if not deps.engine:
return
server_type = (
getattr(deps.web_scan_manager, '_current_server_type', None)
or 'unknown'
)
deps.engine.emit('library_scan_completed', {
'server_type': server_type,
})
def register_library_scan_completed_emitter(deps: AutomationDeps) -> None:
"""Wire :func:`on_library_scan_completed` to the
``web_scan_manager``'s scan-completion callback list. No-op when
no scan manager is configured (e.g. headless / test contexts)."""
if not deps.web_scan_manager:
return
deps.web_scan_manager.add_scan_completion_callback(
lambda: on_library_scan_completed(deps),
)

@ -34,6 +34,12 @@ from core.automation.handlers.download_cleanup import (
)
from core.automation.handlers.run_script import auto_run_script
from core.automation.handlers.search_and_download import auto_search_and_download
from core.automation.handlers.progress_callbacks import (
progress_init,
progress_finish,
record_history,
register_library_scan_completed_emitter,
)
def register_all(deps: AutomationDeps) -> None:
@ -151,3 +157,19 @@ def register_all(deps: AutomationDeps) -> None:
'search_and_download',
lambda config: auto_search_and_download(config, deps),
)
# Progress + history callbacks: the engine invokes these around
# each handler run. Lift the closures from
# `web_server._register_automation_handlers` into thin lambdas
# that delegate into the extracted top-level functions.
engine.register_progress_callbacks(
lambda aid, name, action_type: progress_init(aid, name, action_type, deps),
lambda aid, result: progress_finish(aid, result, deps),
deps.update_progress,
lambda aid, result: record_history(aid, result, deps),
)
# `library_scan_completed` event: when the media-server scan
# manager finishes a scan, emit the event so any automation can
# trigger off it. No-op when no scan manager is configured.
register_library_scan_completed_emitter(deps)

@ -103,6 +103,8 @@ def _build_deps(**overrides) -> AutomationDeps:
get_watchlist_scanner=lambda spc: None,
get_app=lambda: None,
get_beatport_data_cache=lambda: {'cache_lock': threading.Lock(), 'homepage': {}},
init_automation_progress=lambda *a, **k: None,
record_progress_history=lambda *a, **k: None,
)
defaults.update(overrides)
return AutomationDeps(**defaults) # type: ignore[arg-type]

@ -120,6 +120,8 @@ def _build_deps(**overrides) -> AutomationDeps:
get_watchlist_scanner=lambda spc: None,
get_app=lambda: None,
get_beatport_data_cache=lambda: {'cache_lock': threading.Lock(), 'homepage': {}},
init_automation_progress=lambda *a, **k: None,
record_progress_history=lambda *a, **k: None,
)
defaults.update(overrides)
return AutomationDeps(**defaults) # type: ignore[arg-type]

@ -90,6 +90,8 @@ def _build_deps(**overrides: Any) -> AutomationDeps:
get_watchlist_scanner=lambda spc: None,
get_app=lambda: None,
get_beatport_data_cache=lambda: {'cache_lock': threading.Lock(), 'homepage': {}},
init_automation_progress=lambda *a, **k: None,
record_progress_history=lambda *a, **k: None,
)
defaults.update(overrides)
return AutomationDeps(**defaults) # type: ignore[arg-type]

@ -0,0 +1,243 @@
"""Boundary tests for the progress + history callbacks extracted
from ``web_server._register_automation_handlers``.
The callbacks are wired by the engine via ``register_progress_callbacks``;
each test invokes the extracted top-level function with stub deps
and verifies the right downstream call fires."""
from __future__ import annotations
import threading
from typing import Any, Dict, List, Tuple
import pytest
from core.automation.deps import AutomationDeps, AutomationState
from core.automation.handlers.progress_callbacks import (
progress_init,
progress_finish,
record_history,
on_library_scan_completed,
register_library_scan_completed_emitter,
)
def _build_deps(**overrides) -> AutomationDeps:
class _StubLogger:
def debug(self, *a, **k): pass
def info(self, *a, **k): pass
def warning(self, *a, **k): pass
def error(self, *a, **k): pass
defaults = dict(
engine=object(),
state=AutomationState(),
config_manager=object(),
update_progress=lambda *a, **k: None,
logger=_StubLogger(),
get_database=lambda: object(),
spotify_client=None,
tidal_client=None,
web_scan_manager=None,
process_wishlist_automatically=lambda **k: None,
process_watchlist_scan_automatically=lambda **k: None,
is_wishlist_actually_processing=lambda: False,
is_watchlist_actually_scanning=lambda: False,
get_watchlist_scan_state=lambda: {},
run_playlist_discovery_worker=lambda *a, **k: None,
run_sync_task=lambda *a, **k: None,
load_sync_status_file=lambda: {},
get_deezer_client=lambda: None,
parse_youtube_playlist=lambda url: None,
get_sync_states=lambda: {},
set_db_update_automation_id=lambda v: None,
get_db_update_state=lambda: {},
db_update_lock=threading.Lock(),
db_update_executor=None,
run_db_update_task=lambda *a, **k: None,
run_deep_scan_task=lambda *a, **k: None,
get_duplicate_cleaner_state=lambda: {},
duplicate_cleaner_lock=threading.Lock(),
duplicate_cleaner_executor=None,
run_duplicate_cleaner=lambda: None,
get_quality_scanner_state=lambda: {},
quality_scanner_lock=threading.Lock(),
quality_scanner_executor=None,
run_quality_scanner=lambda *a, **k: None,
download_orchestrator=None,
run_async=lambda coro: None,
tasks_lock=threading.Lock(),
get_download_batches=lambda: {},
get_download_tasks=lambda: {},
sweep_empty_download_directories=lambda: 0,
get_staging_path=lambda: '/staging',
docker_resolve_path=lambda p: p,
get_current_profile_id=lambda: 1,
get_watchlist_scanner=lambda spc: None,
get_app=lambda: None,
get_beatport_data_cache=lambda: {'cache_lock': threading.Lock(), 'homepage': {}},
init_automation_progress=lambda *a, **k: None,
record_progress_history=lambda *a, **k: None,
)
defaults.update(overrides)
return AutomationDeps(**defaults) # type: ignore[arg-type]
# ─── progress_init ───────────────────────────────────────────────────
class TestProgressInit:
def test_forwards_to_init_automation_progress(self):
captured: List[Tuple] = []
def fake(aid, name, action_type):
captured.append((aid, name, action_type))
deps = _build_deps(init_automation_progress=fake)
progress_init('auto-1', 'My Auto', 'wishlist', deps)
assert captured == [('auto-1', 'My Auto', 'wishlist')]
# ─── progress_finish ─────────────────────────────────────────────────
class TestProgressFinish:
def test_skips_when_handler_manages_own_progress(self):
# Handler set the flag — engine callback must NOT emit a
# second 'finished' over the top of the handler's own.
calls: List[Dict] = []
deps = _build_deps(update_progress=lambda *a, **k: calls.append({'a': a, 'k': k}))
progress_finish('auto-1', {'_manages_own_progress': True, 'status': 'completed'}, deps)
assert calls == []
def test_completed_emits_finished_status(self):
calls: List[Dict] = []
deps = _build_deps(update_progress=lambda aid, **kw: calls.append({'aid': aid, **kw}))
progress_finish('auto-1', {'status': 'completed'}, deps)
assert len(calls) == 1
assert calls[0]['aid'] == 'auto-1'
assert calls[0]['status'] == 'finished'
assert calls[0]['progress'] == 100
assert calls[0]['phase'] == 'Complete'
assert calls[0]['log_type'] == 'success'
def test_error_status_emits_error_phase(self):
calls: List[Dict] = []
deps = _build_deps(update_progress=lambda aid, **kw: calls.append({'aid': aid, **kw}))
progress_finish('auto-1', {'status': 'error', 'error': 'boom'}, deps)
assert calls[0]['status'] == 'error'
assert calls[0]['phase'] == 'Error'
assert calls[0]['log_line'] == 'boom'
assert calls[0]['log_type'] == 'error'
def test_msg_falls_back_through_keys(self):
# error -> reason -> status -> 'done'
calls: List[Dict] = []
deps = _build_deps(update_progress=lambda aid, **kw: calls.append({'aid': aid, **kw}))
progress_finish('auto-1', {'status': 'completed', 'reason': 'all good'}, deps)
assert calls[0]['log_line'] == 'all good'
def test_msg_default_done(self):
calls: List[Dict] = []
deps = _build_deps(update_progress=lambda aid, **kw: calls.append({'aid': aid, **kw}))
progress_finish('auto-1', {}, deps)
assert calls[0]['log_line'] == 'done'
# ─── record_history ──────────────────────────────────────────────────
class TestRecordHistory:
def test_passes_db_to_recorder(self):
captured: List[Tuple] = []
db_obj = object()
deps = _build_deps(
get_database=lambda: db_obj,
record_progress_history=lambda aid, result, db: captured.append((aid, result, db)),
)
record_history('auto-1', {'status': 'completed'}, deps)
assert captured == [('auto-1', {'status': 'completed'}, db_obj)]
# ─── on_library_scan_completed ───────────────────────────────────────
class TestOnLibraryScanCompleted:
def test_no_engine_skips(self):
deps = _build_deps(engine=None)
# Should not raise.
on_library_scan_completed(deps)
def test_emits_event_with_server_type(self):
emits: List[Tuple] = []
class _Engine:
def emit(self, name, payload):
emits.append((name, payload))
class _ScanMgr:
_current_server_type = 'plex'
deps = _build_deps(engine=_Engine(), web_scan_manager=_ScanMgr())
on_library_scan_completed(deps)
assert emits == [('library_scan_completed', {'server_type': 'plex'})]
def test_unknown_server_type_when_attr_missing(self):
emits: List[Tuple] = []
class _Engine:
def emit(self, name, payload):
emits.append((name, payload))
deps = _build_deps(engine=_Engine(), web_scan_manager=object())
on_library_scan_completed(deps)
assert emits[0][1] == {'server_type': 'unknown'}
# ─── register_library_scan_completed_emitter ─────────────────────────
class TestRegisterEmitter:
def test_no_scan_manager_noop(self):
# No web_scan_manager → no callback registered, no error.
deps = _build_deps(web_scan_manager=None)
register_library_scan_completed_emitter(deps)
def test_registers_callback_with_scan_manager(self):
callbacks: List = []
class _ScanMgr:
_current_server_type = 'plex'
def add_scan_completion_callback(self, cb):
callbacks.append(cb)
deps = _build_deps(web_scan_manager=_ScanMgr())
register_library_scan_completed_emitter(deps)
assert len(callbacks) == 1
# The registered callback must invoke without args (web_scan_manager
# calls completion callbacks with no params).
# Verify it does fire on_library_scan_completed when invoked.
emits: List = []
class _Engine:
def emit(self, name, payload):
emits.append((name, payload))
deps2 = _build_deps(engine=_Engine(), web_scan_manager=_ScanMgr())
register_library_scan_completed_emitter(deps2)
# The lambda captured deps2; we need to grab the registered
# callback to invoke it. Re-register and capture.
captured = []
class _Mgr2:
_current_server_type = 'jellyfin'
def add_scan_completion_callback(self, cb):
captured.append(cb)
deps3 = _build_deps(engine=_Engine(), web_scan_manager=_Mgr2())
emits3 = []
deps3 = _build_deps(
engine=type('E', (), {'emit': lambda self, n, p: emits3.append((n, p))})(),
web_scan_manager=_Mgr2(),
)
register_library_scan_completed_emitter(deps3)
captured[0]() # invoke the registered callback
assert emits3 == [('library_scan_completed', {'server_type': 'jellyfin'})]

@ -990,45 +990,12 @@ def _register_automation_handlers():
get_watchlist_scanner=_get_watchlist_scanner_fn,
get_app=lambda: app,
get_beatport_data_cache=lambda: beatport_data_cache,
init_automation_progress=_init_automation_progress,
record_progress_history=_auto_progress.record_history,
)
_register_extracted_handlers(_automation_deps)
# --- Phase 3 action handlers ---
# Register progress tracking callbacks
def _progress_init(aid, name, action_type):
_init_automation_progress(aid, name, action_type)
def _progress_finish(aid, result):
result_status = result.get('status', '')
# Skip for handlers that manage their own progress lifecycle
# (they call _update_automation_progress(status='finished') themselves)
if result.get('_manages_own_progress'):
return
status = 'error' if result_status == 'error' else 'finished'
msg = result.get('error', result.get('reason', result_status or 'done'))
_update_automation_progress(aid, status=status, progress=100,
phase='Error' if status == 'error' else 'Complete',
log_line=msg, log_type='error' if status == 'error' else 'success')
def _record_automation_history(aid, result):
"""Capture progress state into run history before cleanup clears it."""
_auto_progress.record_history(aid, result, get_database())
automation_engine.register_progress_callbacks(_progress_init, _progress_finish, _update_automation_progress, _record_automation_history)
# Register permanent callback: when any scan completes, emit library_scan_completed event
# This replaces the hardcoded scan_completion_callback → trigger_automatic_database_update chain
if web_scan_manager:
def _on_library_scan_completed():
if automation_engine:
server_type = getattr(web_scan_manager, '_current_server_type', None) or 'unknown'
automation_engine.emit('library_scan_completed', {
'server_type': server_type,
})
web_scan_manager.add_scan_completion_callback(_on_library_scan_completed)
logger.info("Automation action handlers registered")
# --- Register Public REST API Blueprint (v1) ---

Loading…
Cancel
Save