From e140da117a399ce30b11f1ff6d2e99c748680b05 Mon Sep 17 00:00:00 2001 From: Broque Thomas <26755000+Nezreka@users.noreply.github.com> Date: Fri, 15 May 2026 11:59:32 -0700 Subject: [PATCH] =?UTF-8?q?Extract=20automation=20handlers=20(4/3=20?= =?UTF-8?q?=E2=80=94=20finish):=20progress=20callbacks=20+=20scan-completi?= =?UTF-8?q?on=20emitter?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cleans up the four remaining inline callbacks at the bottom of `web_server._register_automation_handlers` so the function is now purely deps-construction + register_all + a logger.info line. Lifted: - `_progress_init`, `_progress_finish`, `_record_automation_history`, and `_on_library_scan_completed` -> core/automation/handlers/progress_callbacks.py Each is a top-level function that takes deps as a parameter; the engine sees thin lambdas through `register_progress_callbacks` / `register_library_scan_completed_emitter` (called from `register_all`). Two new deps fields: - `init_automation_progress` (delegates into the live progress tracker) - `record_progress_history` (delegates into _auto_progress.record_history) 12 new boundary tests in tests/automation/test_progress_callbacks.py pin every shape: - progress_init forwards to init_automation_progress - progress_finish skips when handler manages its own progress (prevents double-emit of finished status) - progress_finish: completed -> finished/Complete/success; error -> error/Error/error; msg falls through error -> reason -> status -> 'done' - record_history threads the live db into the recorder - on_library_scan_completed: no engine = noop, server type taken from web_scan_manager._current_server_type, defaults to 'unknown' - register_library_scan_completed_emitter: no scan manager = noop, registered callback emits the right event when invoked 3256 tests pass, no regression. Final state of `_register_automation_handlers`: - Was: 1530 lines, 21 nested closures + 4 progress callbacks - Now: ~50 lines, builds AutomationDeps and calls register_all web_server.py: 34,220 -> 34,187 lines (-33 net, -1,406 across the whole branch). --- core/automation/deps.py | 5 + .../automation/handlers/progress_callbacks.py | 89 +++++++ core/automation/handlers/registration.py | 22 ++ tests/automation/test_handlers_maintenance.py | 2 + tests/automation/test_handlers_playlist.py | 2 + tests/automation/test_handlers_simple.py | 2 + tests/automation/test_progress_callbacks.py | 243 ++++++++++++++++++ web_server.py | 37 +-- 8 files changed, 367 insertions(+), 35 deletions(-) create mode 100644 core/automation/handlers/progress_callbacks.py create mode 100644 tests/automation/test_progress_callbacks.py diff --git a/core/automation/deps.py b/core/automation/deps.py index 83b60fb0..106d9c94 100644 --- a/core/automation/deps.py +++ b/core/automation/deps.py @@ -133,3 +133,8 @@ class AutomationDeps: get_watchlist_scanner: Callable[[Any], Any] get_app: Callable[[], Any] # Flask app for test_client (beatport refresh) get_beatport_data_cache: Callable[[], dict] + + # --- Progress + history callbacks (used by register_all to wire + # the engine's progress callback hooks). --- + init_automation_progress: Callable[..., Any] + record_progress_history: Callable[..., Any] diff --git a/core/automation/handlers/progress_callbacks.py b/core/automation/handlers/progress_callbacks.py new file mode 100644 index 00000000..a097864b --- /dev/null +++ b/core/automation/handlers/progress_callbacks.py @@ -0,0 +1,89 @@ +"""Progress + history callbacks the automation engine invokes around +each handler run. + +Lifted from the closures at the bottom of +``web_server._register_automation_handlers``: +- ``_progress_init`` → :func:`progress_init` +- ``_progress_finish`` → :func:`progress_finish` +- ``_record_automation_history`` → :func:`record_history` +- ``_on_library_scan_completed`` → :func:`on_library_scan_completed` + +The engine accepts four callables via +``register_progress_callbacks(init, finish, update, history)``; +``registration.register_all`` wires these here. The +``library_scan_completed`` callback is registered separately on the +``web_scan_manager`` (when one is available) -- see +``register_library_scan_completed_emitter``. +""" + +from __future__ import annotations + +from typing import Any, Dict + +from core.automation.deps import AutomationDeps + + +def progress_init(aid: Any, name: str, action_type: str, deps: AutomationDeps) -> None: + """Initialize per-automation progress state when the engine starts + a handler. Thin wrapper so the engine receives a closure that + delegates into the live progress tracker.""" + deps.init_automation_progress(aid, name, action_type) + + +def progress_finish(aid: Any, result: Dict[str, Any], deps: AutomationDeps) -> None: + """Emit the final progress update when a handler returns. + + Skipped for handlers that manage their own progress lifecycle + (they call ``update_progress(status='finished')`` themselves and + set ``_manages_own_progress: True`` in the returned dict). + Otherwise translates the handler's status into a finished/error + progress emit with a status-appropriate phase + log line. + """ + if result.get('_manages_own_progress'): + return + result_status = result.get('status', '') + status = 'error' if result_status == 'error' else 'finished' + msg = result.get('error', result.get('reason', result_status or 'done')) + deps.update_progress( + aid, + status=status, + progress=100, + phase='Error' if status == 'error' else 'Complete', + log_line=msg, + log_type='error' if status == 'error' else 'success', + ) + + +def record_history(aid: Any, result: Dict[str, Any], deps: AutomationDeps) -> None: + """Capture progress state into run history before the engine's + cleanup pass clears it. Thin wrapper so the engine sees a stable + callable.""" + deps.record_progress_history(aid, result, deps.get_database()) + + +def on_library_scan_completed(deps: AutomationDeps) -> None: + """Emit the ``library_scan_completed`` automation event with the + active media-server type. Replaces the hard-coded + ``scan_completion_callback → trigger_automatic_database_update`` + chain so any automation can listen for scan completion as a + trigger.""" + if not deps.engine: + return + server_type = ( + getattr(deps.web_scan_manager, '_current_server_type', None) + or 'unknown' + ) + deps.engine.emit('library_scan_completed', { + 'server_type': server_type, + }) + + +def register_library_scan_completed_emitter(deps: AutomationDeps) -> None: + """Wire :func:`on_library_scan_completed` to the + ``web_scan_manager``'s scan-completion callback list. No-op when + no scan manager is configured (e.g. headless / test contexts).""" + if not deps.web_scan_manager: + return + deps.web_scan_manager.add_scan_completion_callback( + lambda: on_library_scan_completed(deps), + ) diff --git a/core/automation/handlers/registration.py b/core/automation/handlers/registration.py index 7044b75a..de868bd9 100644 --- a/core/automation/handlers/registration.py +++ b/core/automation/handlers/registration.py @@ -34,6 +34,12 @@ from core.automation.handlers.download_cleanup import ( ) from core.automation.handlers.run_script import auto_run_script from core.automation.handlers.search_and_download import auto_search_and_download +from core.automation.handlers.progress_callbacks import ( + progress_init, + progress_finish, + record_history, + register_library_scan_completed_emitter, +) def register_all(deps: AutomationDeps) -> None: @@ -151,3 +157,19 @@ def register_all(deps: AutomationDeps) -> None: 'search_and_download', lambda config: auto_search_and_download(config, deps), ) + + # Progress + history callbacks: the engine invokes these around + # each handler run. Lift the closures from + # `web_server._register_automation_handlers` into thin lambdas + # that delegate into the extracted top-level functions. + engine.register_progress_callbacks( + lambda aid, name, action_type: progress_init(aid, name, action_type, deps), + lambda aid, result: progress_finish(aid, result, deps), + deps.update_progress, + lambda aid, result: record_history(aid, result, deps), + ) + + # `library_scan_completed` event: when the media-server scan + # manager finishes a scan, emit the event so any automation can + # trigger off it. No-op when no scan manager is configured. + register_library_scan_completed_emitter(deps) diff --git a/tests/automation/test_handlers_maintenance.py b/tests/automation/test_handlers_maintenance.py index 280ee3d6..a780d88a 100644 --- a/tests/automation/test_handlers_maintenance.py +++ b/tests/automation/test_handlers_maintenance.py @@ -103,6 +103,8 @@ def _build_deps(**overrides) -> AutomationDeps: get_watchlist_scanner=lambda spc: None, get_app=lambda: None, get_beatport_data_cache=lambda: {'cache_lock': threading.Lock(), 'homepage': {}}, + init_automation_progress=lambda *a, **k: None, + record_progress_history=lambda *a, **k: None, ) defaults.update(overrides) return AutomationDeps(**defaults) # type: ignore[arg-type] diff --git a/tests/automation/test_handlers_playlist.py b/tests/automation/test_handlers_playlist.py index b831b6fd..a017c592 100644 --- a/tests/automation/test_handlers_playlist.py +++ b/tests/automation/test_handlers_playlist.py @@ -120,6 +120,8 @@ def _build_deps(**overrides) -> AutomationDeps: get_watchlist_scanner=lambda spc: None, get_app=lambda: None, get_beatport_data_cache=lambda: {'cache_lock': threading.Lock(), 'homepage': {}}, + init_automation_progress=lambda *a, **k: None, + record_progress_history=lambda *a, **k: None, ) defaults.update(overrides) return AutomationDeps(**defaults) # type: ignore[arg-type] diff --git a/tests/automation/test_handlers_simple.py b/tests/automation/test_handlers_simple.py index 30b220ad..21e6e9d2 100644 --- a/tests/automation/test_handlers_simple.py +++ b/tests/automation/test_handlers_simple.py @@ -90,6 +90,8 @@ def _build_deps(**overrides: Any) -> AutomationDeps: get_watchlist_scanner=lambda spc: None, get_app=lambda: None, get_beatport_data_cache=lambda: {'cache_lock': threading.Lock(), 'homepage': {}}, + init_automation_progress=lambda *a, **k: None, + record_progress_history=lambda *a, **k: None, ) defaults.update(overrides) return AutomationDeps(**defaults) # type: ignore[arg-type] diff --git a/tests/automation/test_progress_callbacks.py b/tests/automation/test_progress_callbacks.py new file mode 100644 index 00000000..de844f62 --- /dev/null +++ b/tests/automation/test_progress_callbacks.py @@ -0,0 +1,243 @@ +"""Boundary tests for the progress + history callbacks extracted +from ``web_server._register_automation_handlers``. + +The callbacks are wired by the engine via ``register_progress_callbacks``; +each test invokes the extracted top-level function with stub deps +and verifies the right downstream call fires.""" + +from __future__ import annotations + +import threading +from typing import Any, Dict, List, Tuple + +import pytest + +from core.automation.deps import AutomationDeps, AutomationState +from core.automation.handlers.progress_callbacks import ( + progress_init, + progress_finish, + record_history, + on_library_scan_completed, + register_library_scan_completed_emitter, +) + + +def _build_deps(**overrides) -> AutomationDeps: + class _StubLogger: + def debug(self, *a, **k): pass + def info(self, *a, **k): pass + def warning(self, *a, **k): pass + def error(self, *a, **k): pass + + defaults = dict( + engine=object(), + state=AutomationState(), + config_manager=object(), + update_progress=lambda *a, **k: None, + logger=_StubLogger(), + get_database=lambda: object(), + spotify_client=None, + tidal_client=None, + web_scan_manager=None, + process_wishlist_automatically=lambda **k: None, + process_watchlist_scan_automatically=lambda **k: None, + is_wishlist_actually_processing=lambda: False, + is_watchlist_actually_scanning=lambda: False, + get_watchlist_scan_state=lambda: {}, + run_playlist_discovery_worker=lambda *a, **k: None, + run_sync_task=lambda *a, **k: None, + load_sync_status_file=lambda: {}, + get_deezer_client=lambda: None, + parse_youtube_playlist=lambda url: None, + get_sync_states=lambda: {}, + set_db_update_automation_id=lambda v: None, + get_db_update_state=lambda: {}, + db_update_lock=threading.Lock(), + db_update_executor=None, + run_db_update_task=lambda *a, **k: None, + run_deep_scan_task=lambda *a, **k: None, + get_duplicate_cleaner_state=lambda: {}, + duplicate_cleaner_lock=threading.Lock(), + duplicate_cleaner_executor=None, + run_duplicate_cleaner=lambda: None, + get_quality_scanner_state=lambda: {}, + quality_scanner_lock=threading.Lock(), + quality_scanner_executor=None, + run_quality_scanner=lambda *a, **k: None, + download_orchestrator=None, + run_async=lambda coro: None, + tasks_lock=threading.Lock(), + get_download_batches=lambda: {}, + get_download_tasks=lambda: {}, + sweep_empty_download_directories=lambda: 0, + get_staging_path=lambda: '/staging', + docker_resolve_path=lambda p: p, + get_current_profile_id=lambda: 1, + get_watchlist_scanner=lambda spc: None, + get_app=lambda: None, + get_beatport_data_cache=lambda: {'cache_lock': threading.Lock(), 'homepage': {}}, + init_automation_progress=lambda *a, **k: None, + record_progress_history=lambda *a, **k: None, + ) + defaults.update(overrides) + return AutomationDeps(**defaults) # type: ignore[arg-type] + + +# ─── progress_init ─────────────────────────────────────────────────── + + +class TestProgressInit: + def test_forwards_to_init_automation_progress(self): + captured: List[Tuple] = [] + + def fake(aid, name, action_type): + captured.append((aid, name, action_type)) + + deps = _build_deps(init_automation_progress=fake) + progress_init('auto-1', 'My Auto', 'wishlist', deps) + assert captured == [('auto-1', 'My Auto', 'wishlist')] + + +# ─── progress_finish ───────────────────────────────────────────────── + + +class TestProgressFinish: + def test_skips_when_handler_manages_own_progress(self): + # Handler set the flag — engine callback must NOT emit a + # second 'finished' over the top of the handler's own. + calls: List[Dict] = [] + deps = _build_deps(update_progress=lambda *a, **k: calls.append({'a': a, 'k': k})) + progress_finish('auto-1', {'_manages_own_progress': True, 'status': 'completed'}, deps) + assert calls == [] + + def test_completed_emits_finished_status(self): + calls: List[Dict] = [] + deps = _build_deps(update_progress=lambda aid, **kw: calls.append({'aid': aid, **kw})) + progress_finish('auto-1', {'status': 'completed'}, deps) + assert len(calls) == 1 + assert calls[0]['aid'] == 'auto-1' + assert calls[0]['status'] == 'finished' + assert calls[0]['progress'] == 100 + assert calls[0]['phase'] == 'Complete' + assert calls[0]['log_type'] == 'success' + + def test_error_status_emits_error_phase(self): + calls: List[Dict] = [] + deps = _build_deps(update_progress=lambda aid, **kw: calls.append({'aid': aid, **kw})) + progress_finish('auto-1', {'status': 'error', 'error': 'boom'}, deps) + assert calls[0]['status'] == 'error' + assert calls[0]['phase'] == 'Error' + assert calls[0]['log_line'] == 'boom' + assert calls[0]['log_type'] == 'error' + + def test_msg_falls_back_through_keys(self): + # error -> reason -> status -> 'done' + calls: List[Dict] = [] + deps = _build_deps(update_progress=lambda aid, **kw: calls.append({'aid': aid, **kw})) + progress_finish('auto-1', {'status': 'completed', 'reason': 'all good'}, deps) + assert calls[0]['log_line'] == 'all good' + + def test_msg_default_done(self): + calls: List[Dict] = [] + deps = _build_deps(update_progress=lambda aid, **kw: calls.append({'aid': aid, **kw})) + progress_finish('auto-1', {}, deps) + assert calls[0]['log_line'] == 'done' + + +# ─── record_history ────────────────────────────────────────────────── + + +class TestRecordHistory: + def test_passes_db_to_recorder(self): + captured: List[Tuple] = [] + db_obj = object() + deps = _build_deps( + get_database=lambda: db_obj, + record_progress_history=lambda aid, result, db: captured.append((aid, result, db)), + ) + record_history('auto-1', {'status': 'completed'}, deps) + assert captured == [('auto-1', {'status': 'completed'}, db_obj)] + + +# ─── on_library_scan_completed ─────────────────────────────────────── + + +class TestOnLibraryScanCompleted: + def test_no_engine_skips(self): + deps = _build_deps(engine=None) + # Should not raise. + on_library_scan_completed(deps) + + def test_emits_event_with_server_type(self): + emits: List[Tuple] = [] + + class _Engine: + def emit(self, name, payload): + emits.append((name, payload)) + + class _ScanMgr: + _current_server_type = 'plex' + + deps = _build_deps(engine=_Engine(), web_scan_manager=_ScanMgr()) + on_library_scan_completed(deps) + assert emits == [('library_scan_completed', {'server_type': 'plex'})] + + def test_unknown_server_type_when_attr_missing(self): + emits: List[Tuple] = [] + + class _Engine: + def emit(self, name, payload): + emits.append((name, payload)) + + deps = _build_deps(engine=_Engine(), web_scan_manager=object()) + on_library_scan_completed(deps) + assert emits[0][1] == {'server_type': 'unknown'} + + +# ─── register_library_scan_completed_emitter ───────────────────────── + + +class TestRegisterEmitter: + def test_no_scan_manager_noop(self): + # No web_scan_manager → no callback registered, no error. + deps = _build_deps(web_scan_manager=None) + register_library_scan_completed_emitter(deps) + + def test_registers_callback_with_scan_manager(self): + callbacks: List = [] + + class _ScanMgr: + _current_server_type = 'plex' + def add_scan_completion_callback(self, cb): + callbacks.append(cb) + + deps = _build_deps(web_scan_manager=_ScanMgr()) + register_library_scan_completed_emitter(deps) + assert len(callbacks) == 1 + # The registered callback must invoke without args (web_scan_manager + # calls completion callbacks with no params). + # Verify it does fire on_library_scan_completed when invoked. + emits: List = [] + + class _Engine: + def emit(self, name, payload): + emits.append((name, payload)) + + deps2 = _build_deps(engine=_Engine(), web_scan_manager=_ScanMgr()) + register_library_scan_completed_emitter(deps2) + # The lambda captured deps2; we need to grab the registered + # callback to invoke it. Re-register and capture. + captured = [] + class _Mgr2: + _current_server_type = 'jellyfin' + def add_scan_completion_callback(self, cb): + captured.append(cb) + deps3 = _build_deps(engine=_Engine(), web_scan_manager=_Mgr2()) + emits3 = [] + deps3 = _build_deps( + engine=type('E', (), {'emit': lambda self, n, p: emits3.append((n, p))})(), + web_scan_manager=_Mgr2(), + ) + register_library_scan_completed_emitter(deps3) + captured[0]() # invoke the registered callback + assert emits3 == [('library_scan_completed', {'server_type': 'jellyfin'})] diff --git a/web_server.py b/web_server.py index d5965540..208bf49a 100644 --- a/web_server.py +++ b/web_server.py @@ -990,45 +990,12 @@ def _register_automation_handlers(): get_watchlist_scanner=_get_watchlist_scanner_fn, get_app=lambda: app, get_beatport_data_cache=lambda: beatport_data_cache, + init_automation_progress=_init_automation_progress, + record_progress_history=_auto_progress.record_history, ) _register_extracted_handlers(_automation_deps) - # --- Phase 3 action handlers --- - - # Register progress tracking callbacks - def _progress_init(aid, name, action_type): - _init_automation_progress(aid, name, action_type) - - def _progress_finish(aid, result): - result_status = result.get('status', '') - # Skip for handlers that manage their own progress lifecycle - # (they call _update_automation_progress(status='finished') themselves) - if result.get('_manages_own_progress'): - return - status = 'error' if result_status == 'error' else 'finished' - msg = result.get('error', result.get('reason', result_status or 'done')) - _update_automation_progress(aid, status=status, progress=100, - phase='Error' if status == 'error' else 'Complete', - log_line=msg, log_type='error' if status == 'error' else 'success') - - def _record_automation_history(aid, result): - """Capture progress state into run history before cleanup clears it.""" - _auto_progress.record_history(aid, result, get_database()) - - automation_engine.register_progress_callbacks(_progress_init, _progress_finish, _update_automation_progress, _record_automation_history) - - # Register permanent callback: when any scan completes, emit library_scan_completed event - # This replaces the hardcoded scan_completion_callback → trigger_automatic_database_update chain - if web_scan_manager: - def _on_library_scan_completed(): - if automation_engine: - server_type = getattr(web_scan_manager, '_current_server_type', None) or 'unknown' - automation_engine.emit('library_scan_completed', { - 'server_type': server_type, - }) - web_scan_manager.add_scan_completion_callback(_on_library_scan_completed) - logger.info("Automation action handlers registered") # --- Register Public REST API Blueprint (v1) ---