mirror of https://github.com/Nezreka/SoulSync.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
392 lines
16 KiB
392 lines
16 KiB
"""Engine-boundary tests for the automation handler registration.
|
|
|
|
Per-handler boundary tests in the sibling test files prove each
|
|
handler's body works in isolation. These tests prove the
|
|
**registration layer** wires every handler to the right action name,
|
|
attaches the right guard, and registers the four progress callbacks
|
|
in the slots the engine expects.
|
|
|
|
The kettui standard for refactor PRs: don't ship a "behavior
|
|
preserved" claim that's only validated at the function boundary.
|
|
Wire the seam — engine + register_all + deps — and exercise it.
|
|
|
|
These tests use a minimal recording engine that captures every
|
|
``register_action_handler`` / ``register_progress_callbacks`` call,
|
|
plus a no-op ``add_scan_completion_callback`` on a fake scan
|
|
manager. No real AutomationEngine, no real DB, no real Flask app.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import threading
|
|
from typing import Any, Dict, List, Tuple
|
|
|
|
import pytest
|
|
|
|
from core.automation.deps import AutomationDeps, AutomationState
|
|
from core.automation.handlers import register_all
|
|
|
|
|
|
# Every action name `register_all` is expected to register. Drift
|
|
# (rename / new handler / removed handler) fails this test
|
|
# immediately so refactor PRs can't quietly drop a handler.
|
|
EXPECTED_ACTION_NAMES = frozenset({
|
|
'process_wishlist',
|
|
'scan_watchlist',
|
|
'scan_library',
|
|
'refresh_mirrored',
|
|
'sync_playlist',
|
|
'discover_playlist',
|
|
'playlist_pipeline',
|
|
'personalized_pipeline',
|
|
'start_database_update',
|
|
'deep_scan_library',
|
|
'run_duplicate_cleaner',
|
|
'clear_quarantine',
|
|
'cleanup_wishlist',
|
|
'update_discovery_pool',
|
|
'start_quality_scan',
|
|
'backup_database',
|
|
'refresh_beatport_cache',
|
|
'clean_search_history',
|
|
'clean_completed_downloads',
|
|
'full_cleanup',
|
|
'run_script',
|
|
'search_and_download',
|
|
})
|
|
|
|
# Action names that MUST register a guard (duplicate-run prevention).
|
|
EXPECTED_GUARDED_ACTIONS = frozenset({
|
|
'process_wishlist',
|
|
'scan_watchlist',
|
|
'scan_library',
|
|
'playlist_pipeline',
|
|
'personalized_pipeline',
|
|
'start_database_update',
|
|
'deep_scan_library',
|
|
'run_duplicate_cleaner',
|
|
'start_quality_scan',
|
|
})
|
|
|
|
|
|
class _RecordingEngine:
|
|
"""Minimal AutomationEngine stand-in. Captures everything
|
|
register_all does so tests can assert on it."""
|
|
|
|
def __init__(self):
|
|
self.action_handlers: Dict[str, Dict[str, Any]] = {}
|
|
self.progress_callbacks: Tuple = ()
|
|
self.emits: List[Tuple[str, dict]] = []
|
|
|
|
def register_action_handler(self, action_type, handler_fn, guard_fn=None):
|
|
self.action_handlers[action_type] = {'handler': handler_fn, 'guard': guard_fn}
|
|
|
|
def register_progress_callbacks(self, init_fn, finish_fn, update_fn=None, history_fn=None):
|
|
self.progress_callbacks = (init_fn, finish_fn, update_fn, history_fn)
|
|
|
|
def emit(self, event_name, payload):
|
|
self.emits.append((event_name, payload))
|
|
|
|
|
|
class _RecordingScanMgr:
|
|
_current_server_type = 'plex'
|
|
|
|
def __init__(self):
|
|
self.callbacks: List = []
|
|
|
|
def add_scan_completion_callback(self, cb):
|
|
self.callbacks.append(cb)
|
|
|
|
|
|
def _build_deps(engine, scan_mgr=None) -> AutomationDeps:
|
|
class _Logger:
|
|
def debug(self, *a, **k): pass
|
|
def info(self, *a, **k): pass
|
|
def warning(self, *a, **k): pass
|
|
def error(self, *a, **k): pass
|
|
|
|
class _Cfg:
|
|
def get(self, key, default=None): return default
|
|
def get_active_media_server(self): return 'plex'
|
|
|
|
return AutomationDeps(
|
|
engine=engine,
|
|
state=AutomationState(),
|
|
config_manager=_Cfg(),
|
|
update_progress=lambda *a, **k: None,
|
|
logger=_Logger(),
|
|
get_database=lambda: object(),
|
|
spotify_client=None,
|
|
tidal_client=None,
|
|
web_scan_manager=scan_mgr,
|
|
process_wishlist_automatically=lambda **k: None,
|
|
process_watchlist_scan_automatically=lambda **k: None,
|
|
is_wishlist_actually_processing=lambda: False,
|
|
is_watchlist_actually_scanning=lambda: False,
|
|
get_watchlist_scan_state=lambda: {},
|
|
run_playlist_discovery_worker=lambda *a, **k: None,
|
|
run_sync_task=lambda *a, **k: None,
|
|
load_sync_status_file=lambda: {},
|
|
get_deezer_client=lambda: None,
|
|
parse_youtube_playlist=lambda url: None,
|
|
get_sync_states=lambda: {},
|
|
set_db_update_automation_id=lambda v: None,
|
|
get_db_update_state=lambda: {},
|
|
db_update_lock=threading.Lock(),
|
|
db_update_executor=None,
|
|
run_db_update_task=lambda *a, **k: None,
|
|
run_deep_scan_task=lambda *a, **k: None,
|
|
get_duplicate_cleaner_state=lambda: {},
|
|
duplicate_cleaner_lock=threading.Lock(),
|
|
duplicate_cleaner_executor=None,
|
|
run_duplicate_cleaner=lambda: None,
|
|
get_quality_scanner_state=lambda: {},
|
|
quality_scanner_lock=threading.Lock(),
|
|
quality_scanner_executor=None,
|
|
run_quality_scanner=lambda *a, **k: None,
|
|
download_orchestrator=None,
|
|
run_async=lambda coro: None,
|
|
tasks_lock=threading.Lock(),
|
|
get_download_batches=lambda: {},
|
|
get_download_tasks=lambda: {},
|
|
sweep_empty_download_directories=lambda: 0,
|
|
get_staging_path=lambda: '/staging',
|
|
docker_resolve_path=lambda p: p,
|
|
get_current_profile_id=lambda: 1,
|
|
get_watchlist_scanner=lambda spc: None,
|
|
get_app=lambda: None,
|
|
get_beatport_data_cache=lambda: {'cache_lock': threading.Lock(), 'homepage': {}},
|
|
init_automation_progress=lambda *a, **k: None,
|
|
record_progress_history=lambda *a, **k: None,
|
|
build_personalized_manager=lambda: None,
|
|
)
|
|
|
|
|
|
# ─── action handler registration ─────────────────────────────────────
|
|
|
|
|
|
class TestActionHandlerRegistration:
|
|
def test_every_expected_action_name_registered(self):
|
|
engine = _RecordingEngine()
|
|
register_all(_build_deps(engine))
|
|
registered = set(engine.action_handlers.keys())
|
|
missing = EXPECTED_ACTION_NAMES - registered
|
|
extra = registered - EXPECTED_ACTION_NAMES
|
|
assert not missing, f"register_all dropped: {missing}"
|
|
assert not extra, f"register_all added unexpected: {extra}"
|
|
|
|
def test_guarded_actions_have_a_guard(self):
|
|
engine = _RecordingEngine()
|
|
register_all(_build_deps(engine))
|
|
for name in EXPECTED_GUARDED_ACTIONS:
|
|
assert engine.action_handlers[name]['guard'] is not None, (
|
|
f"action {name!r} expected to register a guard but didn't"
|
|
)
|
|
|
|
def test_unguarded_actions_have_no_guard(self):
|
|
engine = _RecordingEngine()
|
|
register_all(_build_deps(engine))
|
|
unguarded = EXPECTED_ACTION_NAMES - EXPECTED_GUARDED_ACTIONS
|
|
for name in unguarded:
|
|
assert engine.action_handlers[name]['guard'] is None, (
|
|
f"action {name!r} unexpectedly registered a guard"
|
|
)
|
|
|
|
def test_every_handler_callable(self):
|
|
# Every registered handler must be callable with a config dict.
|
|
engine = _RecordingEngine()
|
|
register_all(_build_deps(engine))
|
|
for name, entry in engine.action_handlers.items():
|
|
handler = entry['handler']
|
|
assert callable(handler), f"{name} handler is not callable"
|
|
|
|
def test_every_guard_callable_returns_bool(self):
|
|
engine = _RecordingEngine()
|
|
register_all(_build_deps(engine))
|
|
for name, entry in engine.action_handlers.items():
|
|
guard = entry['guard']
|
|
if guard is None:
|
|
continue
|
|
value = guard()
|
|
assert isinstance(value, bool), (
|
|
f"{name} guard returned non-bool: {type(value).__name__}"
|
|
)
|
|
|
|
|
|
# ─── progress callback registration ──────────────────────────────────
|
|
|
|
|
|
class TestProgressCallbackRegistration:
|
|
def test_all_four_callbacks_registered(self):
|
|
engine = _RecordingEngine()
|
|
register_all(_build_deps(engine))
|
|
init_fn, finish_fn, update_fn, history_fn = engine.progress_callbacks
|
|
assert callable(init_fn)
|
|
assert callable(finish_fn)
|
|
assert callable(update_fn)
|
|
assert callable(history_fn)
|
|
|
|
def test_progress_init_callback_invocable(self):
|
|
# Engine signature: init_fn(aid, name, action_type)
|
|
engine = _RecordingEngine()
|
|
captured: List[Tuple] = []
|
|
deps = _build_deps(engine)
|
|
deps = AutomationDeps(**{
|
|
**{f.name: getattr(deps, f.name) for f in deps.__dataclass_fields__.values()},
|
|
'init_automation_progress': lambda aid, name, at: captured.append((aid, name, at)),
|
|
})
|
|
register_all(deps)
|
|
init_fn, _, _, _ = engine.progress_callbacks
|
|
init_fn('auto-1', 'My Auto', 'wishlist')
|
|
assert captured == [('auto-1', 'My Auto', 'wishlist')]
|
|
|
|
def test_progress_finish_callback_invocable(self):
|
|
# Engine signature: finish_fn(aid, result)
|
|
engine = _RecordingEngine()
|
|
captured: List[Tuple] = []
|
|
deps = _build_deps(engine)
|
|
deps = AutomationDeps(**{
|
|
**{f.name: getattr(deps, f.name) for f in deps.__dataclass_fields__.values()},
|
|
'update_progress': lambda aid, **kw: captured.append((aid, kw)),
|
|
})
|
|
register_all(deps)
|
|
_, finish_fn, _, _ = engine.progress_callbacks
|
|
# Non-_manages_own_progress result triggers update_progress emit.
|
|
finish_fn('auto-1', {'status': 'completed'})
|
|
assert len(captured) == 1
|
|
assert captured[0][0] == 'auto-1'
|
|
assert captured[0][1]['status'] == 'finished'
|
|
|
|
def test_progress_finish_skips_self_managed(self):
|
|
engine = _RecordingEngine()
|
|
captured: List[Tuple] = []
|
|
deps = _build_deps(engine)
|
|
deps = AutomationDeps(**{
|
|
**{f.name: getattr(deps, f.name) for f in deps.__dataclass_fields__.values()},
|
|
'update_progress': lambda aid, **kw: captured.append((aid, kw)),
|
|
})
|
|
register_all(deps)
|
|
_, finish_fn, _, _ = engine.progress_callbacks
|
|
finish_fn('auto-1', {'_manages_own_progress': True, 'status': 'completed'})
|
|
assert captured == []
|
|
|
|
def test_history_callback_invocable_with_db(self):
|
|
# Engine signature: history_fn(aid, result)
|
|
engine = _RecordingEngine()
|
|
captured: List[Tuple] = []
|
|
db_obj = object()
|
|
deps = _build_deps(engine)
|
|
deps = AutomationDeps(**{
|
|
**{f.name: getattr(deps, f.name) for f in deps.__dataclass_fields__.values()},
|
|
'get_database': lambda: db_obj,
|
|
'record_progress_history': lambda aid, result, db: captured.append((aid, result, db)),
|
|
})
|
|
register_all(deps)
|
|
_, _, _, history_fn = engine.progress_callbacks
|
|
history_fn('auto-1', {'status': 'completed'})
|
|
assert captured == [('auto-1', {'status': 'completed'}, db_obj)]
|
|
|
|
|
|
# ─── library_scan_completed wiring ───────────────────────────────────
|
|
|
|
|
|
class TestLibraryScanCompletedEmitter:
|
|
def test_no_scan_manager_safe(self):
|
|
# Should not raise when scan manager is absent (test/headless mode).
|
|
engine = _RecordingEngine()
|
|
register_all(_build_deps(engine, scan_mgr=None))
|
|
# No callbacks captured (no scan manager to register against),
|
|
# but engine still has all the handlers.
|
|
assert engine.action_handlers
|
|
|
|
def test_scan_completion_callback_registered(self):
|
|
engine = _RecordingEngine()
|
|
scan_mgr = _RecordingScanMgr()
|
|
register_all(_build_deps(engine, scan_mgr=scan_mgr))
|
|
assert len(scan_mgr.callbacks) == 1
|
|
# Invoking the callback should fire engine.emit('library_scan_completed', ...).
|
|
scan_mgr.callbacks[0]()
|
|
assert engine.emits == [('library_scan_completed', {'server_type': 'plex'})]
|
|
|
|
|
|
# ─── handler invocation through the engine boundary ─────────────────
|
|
|
|
|
|
class TestHandlerInvocation:
|
|
"""For each registered handler, exercise the lambda the engine
|
|
would call. Verifies the deps closure is captured correctly + the
|
|
handler returns a result dict.
|
|
|
|
Forces every long-running handler down its guard short-circuit
|
|
path by pre-setting the relevant `*_state` dicts to ``running``
|
|
(database_update, deep_scan, duplicate_cleaner, quality_scanner)
|
|
or by pre-occupying the state flags (scan_library, playlist_pipeline).
|
|
Other handlers either return error early (no playlist specified,
|
|
no scan manager, etc) or run cleanly against the no-op stub deps.
|
|
"""
|
|
|
|
def test_every_handler_returns_dict(self):
|
|
engine = _RecordingEngine()
|
|
# Pre-set state dicts so guarded handlers skip-return cleanly.
|
|
running_state = {'status': 'running'}
|
|
active_batches = {'b1': {'phase': 'downloading'}}
|
|
|
|
# Stub DB that satisfies the handlers reaching for it on
|
|
# short paths. cleanup_wishlist calls remove_wishlist_duplicates.
|
|
# update_discovery_pool's exception path swallows missing
|
|
# scanner. Other handlers either short-circuit or use stub
|
|
# callables.
|
|
class _StubDB:
|
|
def remove_wishlist_duplicates(self, profile_id): return 0
|
|
def get_mirrored_playlists(self): return []
|
|
def get_mirrored_playlist(self, _id): return None
|
|
def get_mirrored_playlist_tracks(self, _id): return []
|
|
|
|
# Stub Flask app so refresh_beatport_cache's test_client call
|
|
# doesn't crash. Use a minimal context manager.
|
|
class _FakeResponse:
|
|
status_code = 200
|
|
class _FakeClient:
|
|
def __enter__(self): return self
|
|
def __exit__(self, *a): return False
|
|
def get(self, _path): return _FakeResponse()
|
|
class _StubApp:
|
|
def test_client(self): return _FakeClient()
|
|
|
|
deps = _build_deps(engine)
|
|
deps = AutomationDeps(**{
|
|
**{f.name: getattr(deps, f.name) for f in deps.__dataclass_fields__.values()},
|
|
'get_db_update_state': lambda: running_state,
|
|
'get_duplicate_cleaner_state': lambda: running_state,
|
|
'get_quality_scanner_state': lambda: running_state,
|
|
'get_download_batches': lambda: active_batches, # forces clean_completed_downloads to skip
|
|
'get_database': lambda: _StubDB(),
|
|
'get_app': lambda: _StubApp(),
|
|
})
|
|
# Pre-set state flags too so scan_library + playlist_pipeline guards fire.
|
|
deps.state.scan_library_automation_id = 'someone-else'
|
|
deps.state.pipeline_running = True
|
|
|
|
# Patch time.sleep across handler modules so refresh_beatport_cache
|
|
# (which sleeps 2s between sections) doesn't extend the test.
|
|
import core.automation.handlers.maintenance as maint_mod
|
|
original_sleep = maint_mod.time.sleep
|
|
maint_mod.time.sleep = lambda _: None
|
|
|
|
register_all(deps)
|
|
|
|
try:
|
|
for name, entry in engine.action_handlers.items():
|
|
handler = entry['handler']
|
|
# Minimal config: trigger the natural error/skip paths
|
|
# for handlers that need a playlist_id, query, script_name etc.
|
|
result = handler({'_automation_id': 'test', 'all': False})
|
|
assert isinstance(result, dict), (
|
|
f"handler {name!r} returned {type(result).__name__}, expected dict"
|
|
)
|
|
assert 'status' in result, (
|
|
f"handler {name!r} returned dict without 'status': {list(result.keys())}"
|
|
)
|
|
finally:
|
|
maint_mod.time.sleep = original_sleep
|