diff --git a/core/automation/blocks.py b/core/automation/blocks.py index c84d1b37..2697af26 100644 --- a/core/automation/blocks.py +++ b/core/automation/blocks.py @@ -146,6 +146,15 @@ ACTIONS: list[dict] = [ {"key": "all", "type": "checkbox", "label": "Process all mirrored playlists", "default": False}, {"key": "skip_wishlist", "type": "checkbox", "label": "Skip wishlist processing", "default": False}, ]}, + {"type": "personalized_pipeline", "label": "Personalized Playlist Pipeline", "icon": "sparkles", + "description": "Sync personalized / discover-page playlists (Hidden Gems, Time Machine, Fresh Tape, etc.) to your media server + queue missing tracks for download.", + "available": True, + "config_fields": [ + {"key": "kinds", "type": "personalized_playlist_select", "label": "Playlists to sync", + "description": "Multi-select: Hidden Gems, Discovery Shuffle, Time Machine (per decade), Genre playlists, Fresh Tape, The Archives, Seasonal Mix (per season)"}, + {"key": "refresh_first", "type": "checkbox", "label": "Refresh playlists before sync (regenerate snapshots)", "default": False}, + {"key": "skip_wishlist", "type": "checkbox", "label": "Skip wishlist processing", "default": False}, + ]}, {"type": "notify_only", "label": "Notify Only", "icon": "bell", "description": "No action — just send notification", "available": True}, # Phase 3 actions {"type": "start_database_update", "label": "Update Database", "icon": "database", diff --git a/core/automation/deps.py b/core/automation/deps.py index 106d9c94..a521a496 100644 --- a/core/automation/deps.py +++ b/core/automation/deps.py @@ -138,3 +138,9 @@ class AutomationDeps: # the engine's progress callback hooks). --- init_automation_progress: Callable[..., Any] record_progress_history: Callable[..., Any] + + # --- Personalized playlist pipeline --- + # Lazy builder so the pipeline handler can construct a fresh + # PersonalizedPlaylistManager per run (cheap accessors inside, + # no caching needed yet). + build_personalized_manager: Callable[[], Any] diff --git a/core/automation/handlers/__init__.py b/core/automation/handlers/__init__.py index 95f357a4..7a02f4d4 100644 --- a/core/automation/handlers/__init__.py +++ b/core/automation/handlers/__init__.py @@ -17,6 +17,7 @@ from core.automation.handlers.refresh_mirrored import auto_refresh_mirrored from core.automation.handlers.sync_playlist import auto_sync_playlist from core.automation.handlers.discover_playlist import auto_discover_playlist from core.automation.handlers.playlist_pipeline import auto_playlist_pipeline +from core.automation.handlers.personalized_pipeline import auto_personalized_pipeline from core.automation.handlers.database_update import auto_start_database_update, auto_deep_scan_library from core.automation.handlers.duplicate_cleaner import auto_run_duplicate_cleaner from core.automation.handlers.quality_scanner import auto_start_quality_scan @@ -44,6 +45,7 @@ __all__ = [ 'auto_sync_playlist', 'auto_discover_playlist', 'auto_playlist_pipeline', + 'auto_personalized_pipeline', 'auto_start_database_update', 'auto_deep_scan_library', 'auto_run_duplicate_cleaner', diff --git a/core/automation/handlers/_pipeline_shared.py b/core/automation/handlers/_pipeline_shared.py new file mode 100644 index 00000000..c7427f8a --- /dev/null +++ b/core/automation/handlers/_pipeline_shared.py @@ -0,0 +1,201 @@ +"""Shared helpers between mirrored + personalized playlist pipelines. + +Both pipelines end in the same shape: +1. SYNC each playlist to the active media server. +2. WISHLIST: trigger the wishlist processor for missing tracks. + +The differing prefix (mirrored = REFRESH external sources + DISCOVER +metadata; personalized = SNAPSHOT manager-backed playlists) is owned +by each pipeline. This module owns the SYNC + WISHLIST tail so both +pipelines stay consistent + DRY. +""" + +from __future__ import annotations + +import time +from typing import Any, Callable, Dict, List, Optional + +from core.automation.deps import AutomationDeps + + +# Per-playlist sync poll cap (mirrored side already used this). +_SYNC_PER_PLAYLIST_TIMEOUT_SECONDS = 600 +# Sync-status final-state markers. +_SYNC_TERMINAL_STATUSES = ('finished', 'complete', 'error', 'failed') + + +def run_sync_and_wishlist( + deps: AutomationDeps, + automation_id: Optional[str], + playlists: List[Dict[str, Any]], + *, + sync_one_fn: Callable[[Dict[str, Any]], Dict[str, Any]], + sync_id_for_fn: Callable[[Dict[str, Any]], str], + skip_wishlist: bool = False, + progress_start: int = 56, + progress_end: int = 85, + sync_phase_label: str = 'Phase: Syncing to server...', + sync_phase_start_log: str = 'Sync', + wishlist_phase_label: str = 'Phase: Processing wishlist...', + wishlist_phase_start_log: str = 'Wishlist', +) -> Dict[str, int]: + """Run the SYNC + WISHLIST tail of a playlist pipeline. + + The caller supplies: + - ``playlists``: list of playlist payload dicts. Each must have at + least a ``name`` key (used in progress logs). The shape beyond + ``name`` is opaque to the helper — ``sync_one_fn`` receives the + payload and returns a sync_result dict. + - ``sync_one_fn(payload) -> sync_result``: launches sync for one + playlist. Result dict must carry ``status`` ∈ ``('started', + 'skipped', 'error')`` and may carry ``reason``. + - ``sync_id_for_fn(payload) -> str``: returns the sync-state key + the helper polls on (so we can wait for the background sync + thread to complete + read the matched_tracks count). + + Returns ``{'synced': int, 'skipped': int, 'errors': int, + 'wishlist_queued': int}`` so the caller can stitch it into its + final status. + """ + deps.update_progress( + automation_id, + progress=progress_start, + phase=sync_phase_label, + log_line=sync_phase_start_log, + log_type='info', + ) + + total_synced = 0 + total_skipped = 0 + sync_errors = 0 + sync_states = deps.get_sync_states() + n_playlists = max(1, len(playlists)) + progress_span = max(1, progress_end - progress_start - 1) + + for pl_idx, pl in enumerate(playlists): + pl_name = pl.get('name', '') + sync_result = sync_one_fn(pl) + sync_status = sync_result.get('status', '') + + if sync_status == 'started': + sync_id = sync_id_for_fn(pl) + sync_poll_start = time.time() + while time.time() - sync_poll_start < _SYNC_PER_PLAYLIST_TIMEOUT_SECONDS: + if (sync_id in sync_states + and sync_states[sync_id].get('status') in _SYNC_TERMINAL_STATUSES): + break + time.sleep(2) + elapsed = int(time.time() - sync_poll_start) + sub_progress = progress_start + 1 + ((pl_idx + 1) / n_playlists) * progress_span + deps.update_progress( + automation_id, + progress=min(int(sub_progress), progress_end - 1), + phase=f'{sync_phase_label.rstrip(".")} — "{pl_name}" ({elapsed}s)', + ) + + ss = sync_states.get(sync_id, {}) + ss_result = ss.get('result', ss.get('progress', {})) + matched = ss_result.get('matched_tracks', 0) if isinstance(ss_result, dict) else 0 + total_synced += int(matched) if matched else 0 + deps.update_progress( + automation_id, + log_line=f'Synced "{pl_name}": {matched} tracks matched', + log_type='success', + ) + + elif sync_status == 'skipped': + total_skipped += 1 + reason = sync_result.get('reason', 'unchanged') + deps.update_progress( + automation_id, + log_line=f'Skipped "{pl_name}": {reason}', + log_type='skip', + ) + elif sync_status == 'error': + sync_errors += 1 + deps.update_progress( + automation_id, + log_line=f'Sync error "{pl_name}": {sync_result.get("reason", "unknown")}', + log_type='error', + ) + + deps.update_progress( + automation_id, + progress=progress_end, + phase=f'{sync_phase_label.rstrip(".")} complete', + log_line=f'Sync done: {total_synced} matched, {total_skipped} skipped, {sync_errors} errors', + log_type='success' if sync_errors == 0 else 'warning', + ) + + wishlist_queued = run_wishlist_phase( + deps, automation_id, + skip=skip_wishlist, + progress_pct=progress_end + 1, + wishlist_phase_label=wishlist_phase_label, + wishlist_phase_start_log=wishlist_phase_start_log, + ) + + return { + 'synced': total_synced, + 'skipped': total_skipped, + 'errors': sync_errors, + 'wishlist_queued': wishlist_queued, + } + + +def run_wishlist_phase( + deps: AutomationDeps, + automation_id: Optional[str], + *, + skip: bool, + progress_pct: int, + wishlist_phase_label: str = 'Phase: Processing wishlist...', + wishlist_phase_start_log: str = 'Wishlist', +) -> int: + """Trigger the wishlist processor unless skipped or already running. + + Returns 1 when the processor was triggered, 0 otherwise. Errors are + logged but never raised — wishlist failure should not abort the + pipeline.""" + if skip: + deps.update_progress( + automation_id, + progress=progress_pct, + log_line=f'{wishlist_phase_start_log}: skipped (disabled)', + log_type='skip', + ) + return 0 + + deps.update_progress( + automation_id, + progress=progress_pct, + phase=wishlist_phase_label, + log_line=wishlist_phase_start_log, + log_type='info', + ) + + try: + if not deps.is_wishlist_actually_processing(): + deps.process_wishlist_automatically(automation_id=None) + deps.update_progress( + automation_id, + log_line='Wishlist processing triggered', + log_type='success', + ) + return 1 + deps.update_progress( + automation_id, + log_line='Wishlist already running — skipped', + log_type='skip', + ) + return 0 + except Exception as e: # noqa: BLE001 — wishlist failure must never abort pipeline + deps.update_progress( + automation_id, + log_line=f'Wishlist error: {e}', + log_type='warning', + ) + return 0 + + +__all__ = ['run_sync_and_wishlist', 'run_wishlist_phase'] diff --git a/core/automation/handlers/personalized_pipeline.py b/core/automation/handlers/personalized_pipeline.py new file mode 100644 index 00000000..a076bafa --- /dev/null +++ b/core/automation/handlers/personalized_pipeline.py @@ -0,0 +1,269 @@ +"""Personalized Playlist Pipeline automation handler. + +Sibling to ``auto_playlist_pipeline`` (mirrored). Where the mirrored +pipeline runs REFRESH external sources → DISCOVER metadata → SYNC → +WISHLIST, the personalized pipeline is simpler: + + SNAPSHOT → SYNC → WISHLIST + +SNAPSHOT reads the persisted track list from +``PersonalizedPlaylistManager``. When ``refresh_first=True`` (config), +each playlist is refreshed BEFORE syncing — useful when the user +wants the cron to capture a fresh-each-run view (e.g. "give me a new +Hidden Gems set every night"). Default is to sync the existing +snapshot, on the assumption the user / a separate cron has already +refreshed when they wanted to. + +Config schema: + { + 'kinds': [ + {'kind': 'hidden_gems'}, + {'kind': 'time_machine', 'variant': '1980s'}, + {'kind': 'seasonal_mix', 'variant': 'halloween'}, + ... + ], + 'refresh_first': bool, # default false + 'skip_wishlist': bool, # default false + } + +Each kind dict has at minimum ``kind``; ``variant`` is required for +kinds that need it (time_machine, genre_playlist, daily_mix, +seasonal_mix). Singleton kinds (hidden_gems, discovery_shuffle, +popular_picks, fresh_tape, archives) ignore variant. + +Pipeline-running flag (``deps.state.pipeline_running``) is shared +with the mirrored pipeline so the two can't overlap. (One sync +queue, one wishlist worker — overlapping triggers would step on +each other.)""" + +from __future__ import annotations + +import threading +import time +from typing import Any, Dict, List, Optional + +from core.automation.deps import AutomationDeps +from core.automation.handlers._pipeline_shared import run_sync_and_wishlist + + +# Sync state key prefix so personalized syncs don't collide with +# mirrored ones (`auto_mirror_`). +_SYNC_ID_PREFIX = 'auto_personalized' + + +def auto_personalized_pipeline(config: Dict[str, Any], deps: AutomationDeps) -> Dict[str, Any]: + """Run SNAPSHOT → SYNC → WISHLIST for selected personalized playlists.""" + deps.state.set_pipeline_running(True) + automation_id = config.get('_automation_id') + pipeline_start = time.time() + + try: + kinds_config = config.get('kinds') or [] + if not isinstance(kinds_config, list) or not kinds_config: + deps.state.set_pipeline_running(False) + return { + 'status': 'error', + 'error': 'No personalized playlist kinds selected', + } + + refresh_first = bool(config.get('refresh_first', False)) + skip_wishlist = bool(config.get('skip_wishlist', False)) + + manager = deps.build_personalized_manager() + + deps.update_progress( + automation_id, + progress=2, + phase=f'Personalized pipeline: {len(kinds_config)} playlist(s)', + log_line=f'Starting pipeline for {len(kinds_config)} playlist(s)', + log_type='info', + ) + + # ── PHASE 1: SNAPSHOT (optionally refresh) ────────────────── + deps.update_progress( + automation_id, + progress=3, + phase='Phase 1/2: Loading snapshots...' if not refresh_first + else 'Phase 1/2: Refreshing snapshots...', + log_line='Phase 1: Snapshot' + (' (with refresh)' if refresh_first else ''), + log_type='info', + ) + + profile_id = deps.get_current_profile_id() + playload_payloads = _build_payloads_for_kinds( + deps, manager, kinds_config, profile_id, + automation_id=automation_id, + refresh_first=refresh_first, + ) + + if not playload_payloads: + deps.state.set_pipeline_running(False) + deps.update_progress( + automation_id, + status='finished', progress=100, + phase='No playlists to sync', + log_line='No personalized playlists had tracks to sync', + log_type='warning', + ) + return { + 'status': 'completed', + '_manages_own_progress': True, + 'playlists_synced': '0', + 'tracks_synced': '0', + 'duration_seconds': str(int(time.time() - pipeline_start)), + } + + deps.update_progress( + automation_id, + progress=50, + phase='Phase 1/2: Snapshot complete', + log_line=f'Phase 1 done: {len(playload_payloads)} playlist(s) ready to sync', + log_type='success', + ) + + # ── PHASE 2: SYNC + WISHLIST (shared helper) ──────────────── + sync_summary = run_sync_and_wishlist( + deps, + automation_id, + playload_payloads, + sync_one_fn=lambda pl: _sync_personalized_playlist(deps, pl), + sync_id_for_fn=lambda pl: pl['sync_id'], + skip_wishlist=skip_wishlist, + progress_start=51, + progress_end=90, + sync_phase_label='Phase 2/2: Syncing to server...', + sync_phase_start_log='Phase 2: Sync', + wishlist_phase_label='Phase 2/2: Processing wishlist...', + wishlist_phase_start_log='Wishlist', + ) + + # ── COMPLETE ──────────────────────────────────────────────── + duration = int(time.time() - pipeline_start) + deps.update_progress( + automation_id, + status='finished', progress=100, + phase='Pipeline complete', + log_line=f'Personalized pipeline finished in {duration // 60}m {duration % 60}s', + log_type='success', + ) + + deps.state.set_pipeline_running(False) + return { + 'status': 'completed', + '_manages_own_progress': True, + 'playlists_synced': str(len(playload_payloads)), + 'tracks_synced': str(sync_summary['synced']), + 'sync_skipped': str(sync_summary['skipped']), + 'wishlist_queued': str(sync_summary['wishlist_queued']), + 'duration_seconds': str(duration), + } + + except Exception as e: # noqa: BLE001 — automation handlers must never raise into engine + deps.state.set_pipeline_running(False) + deps.update_progress( + automation_id, + status='error', progress=100, + phase='Pipeline error', + log_line=f'Personalized pipeline failed: {e}', + log_type='error', + ) + return {'status': 'error', 'error': str(e), '_manages_own_progress': True} + + +def _build_payloads_for_kinds( + deps: AutomationDeps, + manager: Any, + kinds_config: List[Dict[str, Any]], + profile_id: int, + *, + automation_id: Optional[str], + refresh_first: bool, +) -> List[Dict[str, Any]]: + """Resolve each requested kind+variant into a sync-payload dict. + + Each payload has: ``{'name', 'kind', 'variant', 'tracks_json', + 'image_url', 'sync_id'}``. Playlists with no tracks (e.g. a + seasonal mix that hasn't been populated yet) are omitted from + the result so the sync loop doesn't waste time on empty pushes. + """ + payloads: List[Dict[str, Any]] = [] + for entry in kinds_config: + if not isinstance(entry, dict): + continue + kind = entry.get('kind') + variant = entry.get('variant') or '' + if not kind: + continue + + try: + if refresh_first: + record = manager.refresh_playlist(kind, variant, profile_id) + else: + record = manager.ensure_playlist(kind, variant, profile_id) + except Exception as exc: # noqa: BLE001 — log + continue with next kind + deps.update_progress( + automation_id, + log_line=f'Skipping {kind}{("/" + variant) if variant else ""}: {exc}', + log_type='warning', + ) + continue + + tracks = manager.get_playlist_tracks(record.id) + if not tracks: + deps.update_progress( + automation_id, + log_line=f'No tracks in {record.name} — skipping sync', + log_type='skip', + ) + continue + + tracks_json = [_track_to_sync_shape(t) for t in tracks] + payloads.append({ + 'name': record.name, + 'kind': record.kind, + 'variant': record.variant, + 'tracks_json': tracks_json, + 'image_url': '', # personalized playlists don't have a cover image yet + 'sync_id': f'{_SYNC_ID_PREFIX}_{record.kind}_{record.variant or "_"}', + }) + return payloads + + +def _track_to_sync_shape(track: Any) -> Dict[str, Any]: + """Convert a personalized.types.Track into the dict shape + `_run_sync_task` expects. Mirrors what the mirrored pipeline + builds from extra_data.matched_data — name/artists/album/duration/id.""" + primary_id = track.spotify_track_id or track.itunes_track_id or track.deezer_track_id or '' + artists = [{'name': track.artist_name}] + return { + 'name': track.track_name, + 'artists': artists, + 'album': {'name': track.album_name or ''}, + 'duration_ms': int(track.duration_ms or 0), + 'id': primary_id, + } + + +def _sync_personalized_playlist(deps: AutomationDeps, payload: Dict[str, Any]) -> Dict[str, Any]: + """Launch a personalized playlist sync via _run_sync_task on a + daemon thread + return immediately with status='started'. + + Mirrors the mirrored ``auto_sync_playlist`` return contract so the + shared helper can poll on ``sync_states[sync_id]`` and aggregate + results identically.""" + sync_id = payload['sync_id'] + name = payload['name'] + tracks_json = payload['tracks_json'] + profile_id = deps.get_current_profile_id() + + threading.Thread( + target=deps.run_sync_task, + args=(sync_id, name, tracks_json, None, profile_id, payload.get('image_url', '')), + daemon=True, + name=f'auto-personalized-{sync_id}', + ).start() + return { + 'status': 'started', + 'playlist_name': name, + '_manages_own_progress': True, + } diff --git a/core/automation/handlers/playlist_pipeline.py b/core/automation/handlers/playlist_pipeline.py index 2ef18669..8fc98df2 100644 --- a/core/automation/handlers/playlist_pipeline.py +++ b/core/automation/handlers/playlist_pipeline.py @@ -28,12 +28,12 @@ import time from typing import Any, Dict from core.automation.deps import AutomationDeps +from core.automation.handlers._pipeline_shared import run_sync_and_wishlist from core.automation.handlers.refresh_mirrored import auto_refresh_mirrored from core.automation.handlers.sync_playlist import auto_sync_playlist # Per-playlist sync poll cap inside Phase 3. -_SYNC_PER_PLAYLIST_TIMEOUT_SECONDS = 600 # Discovery poll cap inside Phase 2. _DISCOVERY_TIMEOUT_SECONDS = 3600 @@ -165,124 +165,31 @@ def auto_playlist_pipeline(config: Dict[str, Any], deps: AutomationDeps) -> Dict log_type='success', ) - # ── PHASE 3: SYNC ───────────────────────────────────────────── - deps.update_progress( - automation_id, - progress=56, - phase='Phase 3/4: Syncing to server...', - log_line='Phase 3: Sync', - log_type='info', - ) - - total_synced = 0 - total_skipped = 0 - sync_errors = 0 - sync_states = deps.get_sync_states() - - for pl_idx, pl in enumerate(playlists): - pl_id = pl.get('id') - if not pl_id: - continue - - sync_config = { - 'playlist_id': str(pl_id), - '_automation_id': None, # Don't let sync handler hijack our progress. - } - sync_result = auto_sync_playlist(sync_config, deps) - sync_status = sync_result.get('status', '') - - if sync_status == 'started': - # Sync launched a background thread — wait for it. - sync_id = f"auto_mirror_{pl_id}" - sync_poll_start = time.time() - while time.time() - sync_poll_start < _SYNC_PER_PLAYLIST_TIMEOUT_SECONDS: - if (sync_id in sync_states - and sync_states[sync_id].get('status') - in ('finished', 'complete', 'error', 'failed')): - break - time.sleep(2) - elapsed = int(time.time() - sync_poll_start) - sub_progress = 56 + ((pl_idx + 1) / max(1, len(playlists))) * 29 - deps.update_progress( - automation_id, - progress=min(int(sub_progress), 84), - phase=f'Phase 3/4: Syncing "{pl.get("name", "")}" ({elapsed}s)', - ) - - # Check result. - ss = sync_states.get(sync_id, {}) - ss_result = ss.get('result', ss.get('progress', {})) - matched = ss_result.get('matched_tracks', 0) if isinstance(ss_result, dict) else 0 - total_synced += int(matched) if matched else 0 - deps.update_progress( - automation_id, - log_line=f'Synced "{pl.get("name", "")}": {matched} tracks matched', - log_type='success', - ) - - elif sync_status == 'skipped': - total_skipped += 1 - reason = sync_result.get('reason', 'unchanged') - deps.update_progress( - automation_id, - log_line=f'Skipped "{pl.get("name", "")}": {reason}', - log_type='skip', - ) - elif sync_status == 'error': - sync_errors += 1 - deps.update_progress( - automation_id, - log_line=f'Sync error "{pl.get("name", "")}": {sync_result.get("reason", "unknown")}', - log_type='error', - ) - - deps.update_progress( + # ── PHASE 3 + 4: SYNC + WISHLIST (delegated to shared helper) ── + # Each mirrored playlist payload only needs `id` + `name` for + # the helper; `auto_sync_playlist` reads the rest from the + # mirrored DB by id. + sync_summary = run_sync_and_wishlist( + deps, automation_id, - progress=85, - phase='Phase 3/4: Sync complete', - log_line=f'Phase 3 done: {total_synced} matched, {total_skipped} skipped, {sync_errors} errors', - log_type='success' if sync_errors == 0 else 'warning', + [pl for pl in playlists if pl.get('id')], + sync_one_fn=lambda pl: auto_sync_playlist( + {'playlist_id': str(pl['id']), '_automation_id': None}, + deps, + ), + sync_id_for_fn=lambda pl: f"auto_mirror_{pl['id']}", + skip_wishlist=skip_wishlist, + progress_start=56, + progress_end=85, + sync_phase_label='Phase 3/4: Syncing to server...', + sync_phase_start_log='Phase 3: Sync', + wishlist_phase_label='Phase 4/4: Processing wishlist...', + wishlist_phase_start_log='Phase 4: Wishlist', ) - - # ── PHASE 4: WISHLIST ───────────────────────────────────────── - wishlist_queued = 0 - if not skip_wishlist: - deps.update_progress( - automation_id, - progress=86, - phase='Phase 4/4: Processing wishlist...', - log_line='Phase 4: Wishlist', - log_type='info', - ) - - try: - if not deps.is_wishlist_actually_processing(): - deps.process_wishlist_automatically(automation_id=None) - deps.update_progress( - automation_id, - log_line='Wishlist processing triggered', - log_type='success', - ) - wishlist_queued = 1 - else: - deps.update_progress( - automation_id, - log_line='Wishlist already running — skipped', - log_type='skip', - ) - except Exception as e: - deps.update_progress( - automation_id, - log_line=f'Wishlist error: {e}', - log_type='warning', - ) - else: - deps.update_progress( - automation_id, - progress=86, - log_line='Phase 4: Wishlist skipped (disabled)', - log_type='skip', - ) + total_synced = sync_summary['synced'] + total_skipped = sync_summary['skipped'] + sync_errors = sync_summary['errors'] + wishlist_queued = sync_summary['wishlist_queued'] # ── COMPLETE ────────────────────────────────────────────────── duration = int(time.time() - pipeline_start) diff --git a/core/automation/handlers/registration.py b/core/automation/handlers/registration.py index de868bd9..94f23db6 100644 --- a/core/automation/handlers/registration.py +++ b/core/automation/handlers/registration.py @@ -15,6 +15,7 @@ from core.automation.handlers.refresh_mirrored import auto_refresh_mirrored from core.automation.handlers.sync_playlist import auto_sync_playlist from core.automation.handlers.discover_playlist import auto_discover_playlist from core.automation.handlers.playlist_pipeline import auto_playlist_pipeline +from core.automation.handlers.personalized_pipeline import auto_personalized_pipeline from core.automation.handlers.database_update import ( auto_start_database_update, auto_deep_scan_library, ) @@ -94,6 +95,14 @@ def register_all(deps: AutomationDeps) -> None: lambda config: auto_playlist_pipeline(config, deps), deps.state.is_pipeline_running, ) + # Personalized pipeline shares the pipeline_running flag with the + # mirrored pipeline so the two can't overlap (single sync queue, + # single wishlist worker). + engine.register_action_handler( + 'personalized_pipeline', + lambda config: auto_personalized_pipeline(config, deps), + deps.state.is_pipeline_running, + ) # Database update + deep scan share the db_update_state guard — # only one operation can mutate that state at a time. diff --git a/tests/automation/test_handler_registration.py b/tests/automation/test_handler_registration.py index e684bc3a..b6660541 100644 --- a/tests/automation/test_handler_registration.py +++ b/tests/automation/test_handler_registration.py @@ -38,6 +38,7 @@ EXPECTED_ACTION_NAMES = frozenset({ 'sync_playlist', 'discover_playlist', 'playlist_pipeline', + 'personalized_pipeline', 'start_database_update', 'deep_scan_library', 'run_duplicate_cleaner', @@ -60,6 +61,7 @@ EXPECTED_GUARDED_ACTIONS = frozenset({ 'scan_watchlist', 'scan_library', 'playlist_pipeline', + 'personalized_pipeline', 'start_database_update', 'deep_scan_library', 'run_duplicate_cleaner', @@ -156,6 +158,7 @@ def _build_deps(engine, scan_mgr=None) -> AutomationDeps: get_beatport_data_cache=lambda: {'cache_lock': threading.Lock(), 'homepage': {}}, init_automation_progress=lambda *a, **k: None, record_progress_history=lambda *a, **k: None, + build_personalized_manager=lambda: None, ) diff --git a/tests/automation/test_handlers_maintenance.py b/tests/automation/test_handlers_maintenance.py index a780d88a..a3844b77 100644 --- a/tests/automation/test_handlers_maintenance.py +++ b/tests/automation/test_handlers_maintenance.py @@ -105,6 +105,7 @@ def _build_deps(**overrides) -> AutomationDeps: get_beatport_data_cache=lambda: {'cache_lock': threading.Lock(), 'homepage': {}}, init_automation_progress=lambda *a, **k: None, record_progress_history=lambda *a, **k: None, + build_personalized_manager=lambda: None, ) defaults.update(overrides) return AutomationDeps(**defaults) # type: ignore[arg-type] diff --git a/tests/automation/test_handlers_personalized_pipeline.py b/tests/automation/test_handlers_personalized_pipeline.py new file mode 100644 index 00000000..c195d14d --- /dev/null +++ b/tests/automation/test_handlers_personalized_pipeline.py @@ -0,0 +1,372 @@ +"""Boundary tests for the personalized playlist pipeline handler. + +Pin every shape: empty kinds error, refresh_first behaviour, snapshot +load + sync dispatch, missing-tracks skip, exception swallowing, +pipeline_running flag cleanup, sync payload shape passed to +_run_sync_task.""" + +from __future__ import annotations + +import threading +from types import SimpleNamespace +from typing import Any, List + +import pytest + +from core.automation.deps import AutomationDeps, AutomationState +from core.automation.handlers.personalized_pipeline import ( + auto_personalized_pipeline, + _build_payloads_for_kinds, + _track_to_sync_shape, + _sync_personalized_playlist, +) + + +class _StubLogger: + def debug(self, *a, **k): pass + def info(self, *a, **k): pass + def warning(self, *a, **k): pass + def error(self, *a, **k): pass + + +def _build_deps(**overrides) -> AutomationDeps: + defaults = dict( + engine=object(), + state=AutomationState(), + config_manager=object(), + update_progress=lambda *a, **k: None, + logger=_StubLogger(), + get_database=lambda: object(), + spotify_client=None, + tidal_client=None, + web_scan_manager=None, + process_wishlist_automatically=lambda **k: None, + process_watchlist_scan_automatically=lambda **k: None, + is_wishlist_actually_processing=lambda: False, + is_watchlist_actually_scanning=lambda: False, + get_watchlist_scan_state=lambda: {}, + run_playlist_discovery_worker=lambda *a, **k: None, + run_sync_task=lambda *a, **k: None, + load_sync_status_file=lambda: {}, + get_deezer_client=lambda: None, + parse_youtube_playlist=lambda url: None, + get_sync_states=lambda: {}, + set_db_update_automation_id=lambda v: None, + get_db_update_state=lambda: {}, + db_update_lock=threading.Lock(), + db_update_executor=None, + run_db_update_task=lambda *a, **k: None, + run_deep_scan_task=lambda *a, **k: None, + get_duplicate_cleaner_state=lambda: {}, + duplicate_cleaner_lock=threading.Lock(), + duplicate_cleaner_executor=None, + run_duplicate_cleaner=lambda: None, + get_quality_scanner_state=lambda: {}, + quality_scanner_lock=threading.Lock(), + quality_scanner_executor=None, + run_quality_scanner=lambda *a, **k: None, + download_orchestrator=None, + run_async=lambda coro: None, + tasks_lock=threading.Lock(), + get_download_batches=lambda: {}, + get_download_tasks=lambda: {}, + sweep_empty_download_directories=lambda: 0, + get_staging_path=lambda: '/staging', + docker_resolve_path=lambda p: p, + get_current_profile_id=lambda: 1, + get_watchlist_scanner=lambda spc: None, + get_app=lambda: None, + get_beatport_data_cache=lambda: {'cache_lock': threading.Lock(), 'homepage': {}}, + init_automation_progress=lambda *a, **k: None, + record_progress_history=lambda *a, **k: None, + build_personalized_manager=lambda: None, + ) + defaults.update(overrides) + return AutomationDeps(**defaults) # type: ignore[arg-type] + + +# ─── Track shape converter ─────────────────────────────────────────── + + +class TestTrackToSyncShape: + def test_basic_shape(self): + track = SimpleNamespace( + track_name='Song', artist_name='Artist', album_name='Album', + spotify_track_id='sp-1', itunes_track_id=None, deezer_track_id=None, + duration_ms=200000, + ) + out = _track_to_sync_shape(track) + assert out == { + 'name': 'Song', + 'artists': [{'name': 'Artist'}], + 'album': {'name': 'Album'}, + 'duration_ms': 200000, + 'id': 'sp-1', + } + + def test_falls_back_through_source_ids(self): + t1 = SimpleNamespace(track_name='', artist_name='', album_name='', + spotify_track_id=None, itunes_track_id='it-1', + deezer_track_id=None, duration_ms=0) + assert _track_to_sync_shape(t1)['id'] == 'it-1' + + t2 = SimpleNamespace(track_name='', artist_name='', album_name='', + spotify_track_id=None, itunes_track_id=None, + deezer_track_id='dz-1', duration_ms=0) + assert _track_to_sync_shape(t2)['id'] == 'dz-1' + + def test_no_id_returns_empty_string(self): + t = SimpleNamespace(track_name='X', artist_name='Y', album_name='Z', + spotify_track_id=None, itunes_track_id=None, + deezer_track_id=None, duration_ms=0) + assert _track_to_sync_shape(t)['id'] == '' + + +# ─── Empty / config validation ────────────────────────────────────── + + +class TestEmptyConfig: + def test_no_kinds_returns_error_and_clears_flag(self): + deps = _build_deps() + deps.state.set_pipeline_running(True) # simulate already-running + result = auto_personalized_pipeline({}, deps) + assert result['status'] == 'error' + assert 'No personalized playlist' in result['error'] + assert deps.state.pipeline_running is False + + def test_empty_kinds_list_returns_error(self): + deps = _build_deps() + result = auto_personalized_pipeline({'kinds': []}, deps) + assert result['status'] == 'error' + assert deps.state.pipeline_running is False + + def test_non_list_kinds_returns_error(self): + deps = _build_deps() + result = auto_personalized_pipeline({'kinds': 'not_a_list'}, deps) + assert result['status'] == 'error' + + +# ─── Payload building ─────────────────────────────────────────────── + + +class _StubManagerNoTracks: + def ensure_playlist(self, kind, variant, profile_id): + return SimpleNamespace( + id=1, name=f'{kind}-{variant}', kind=kind, variant=variant, + ) + + def refresh_playlist(self, kind, variant, profile_id): + return self.ensure_playlist(kind, variant, profile_id) + + def get_playlist_tracks(self, playlist_id): + return [] + + +class _StubManagerWithTracks: + def __init__(self, tracks_per_kind=None): + self.tracks_per_kind = tracks_per_kind or {} + self.refresh_calls: List[tuple] = [] + self.ensure_calls: List[tuple] = [] + + def ensure_playlist(self, kind, variant, profile_id): + self.ensure_calls.append((kind, variant, profile_id)) + return SimpleNamespace( + id=hash((kind, variant)) % 10000, + name=f'{kind}-{variant or "S"}', kind=kind, variant=variant, + ) + + def refresh_playlist(self, kind, variant, profile_id): + self.refresh_calls.append((kind, variant, profile_id)) + # Mirror real manager: refresh returns a record without invoking + # the public ensure_playlist API path again. + return SimpleNamespace( + id=hash((kind, variant)) % 10000, + name=f'{kind}-{variant or "S"}', kind=kind, variant=variant, + ) + + def get_playlist_tracks(self, playlist_id): + # Return all tracks regardless of id — tests scope to one playlist at a time. + for tracks in self.tracks_per_kind.values(): + if tracks: + return [SimpleNamespace( + track_name=t['name'], artist_name=t.get('artist', 'A'), + album_name=t.get('album', 'Al'), + spotify_track_id=t.get('id'), + itunes_track_id=None, deezer_track_id=None, + duration_ms=200000, + ) for t in tracks] + return [] + + +class TestPayloadBuilding: + def test_skips_kinds_with_no_tracks(self): + deps = _build_deps() + manager = _StubManagerNoTracks() + payloads = _build_payloads_for_kinds( + deps, manager, + [{'kind': 'hidden_gems'}, {'kind': 'discovery_shuffle'}], + profile_id=1, automation_id=None, refresh_first=False, + ) + assert payloads == [] + + def test_skips_invalid_entries(self): + deps = _build_deps() + manager = _StubManagerNoTracks() + payloads = _build_payloads_for_kinds( + deps, manager, + ['not-a-dict', {}, {'variant': 'no-kind'}], # all invalid + profile_id=1, automation_id=None, refresh_first=False, + ) + assert payloads == [] + + def test_refresh_first_calls_refresh(self): + deps = _build_deps() + manager = _StubManagerWithTracks( + tracks_per_kind={'hidden_gems': [{'name': 'T', 'id': 'sp-1'}]}, + ) + _build_payloads_for_kinds( + deps, manager, + [{'kind': 'hidden_gems'}], + profile_id=1, automation_id=None, refresh_first=True, + ) + assert manager.refresh_calls == [('hidden_gems', '', 1)] + assert manager.ensure_calls == [] + + def test_no_refresh_calls_ensure(self): + deps = _build_deps() + manager = _StubManagerWithTracks( + tracks_per_kind={'hidden_gems': [{'name': 'T', 'id': 'sp-1'}]}, + ) + _build_payloads_for_kinds( + deps, manager, + [{'kind': 'hidden_gems'}], + profile_id=1, automation_id=None, refresh_first=False, + ) + assert manager.ensure_calls == [('hidden_gems', '', 1)] + assert manager.refresh_calls == [] + + def test_payload_shape(self): + deps = _build_deps() + manager = _StubManagerWithTracks( + tracks_per_kind={'hidden_gems': [ + {'name': 'Track1', 'id': 'sp-1'}, + {'name': 'Track2', 'id': 'sp-2'}, + ]}, + ) + payloads = _build_payloads_for_kinds( + deps, manager, + [{'kind': 'hidden_gems'}], + profile_id=1, automation_id=None, refresh_first=False, + ) + assert len(payloads) == 1 + p = payloads[0] + assert p['kind'] == 'hidden_gems' + assert p['variant'] == '' + assert p['name'] == 'hidden_gems-S' + assert p['sync_id'].startswith('auto_personalized_hidden_gems_') + assert len(p['tracks_json']) == 2 + assert p['tracks_json'][0]['id'] == 'sp-1' + + def test_manager_exception_swallowed_continues_to_next(self): + deps = _build_deps() + + class _ExplodingMgr: + def __init__(self): + self.calls = [] + def ensure_playlist(self, kind, variant, profile_id): + self.calls.append(kind) + if kind == 'broken': + raise RuntimeError('manager boom') + return SimpleNamespace(id=1, name=kind, kind=kind, variant=variant) + def get_playlist_tracks(self, _id): + return [] + + mgr = _ExplodingMgr() + # broken raises, hidden_gems proceeds (just no tracks). + payloads = _build_payloads_for_kinds( + deps, mgr, + [{'kind': 'broken'}, {'kind': 'hidden_gems'}], + profile_id=1, automation_id=None, refresh_first=False, + ) + assert mgr.calls == ['broken', 'hidden_gems'] + assert payloads == [] # neither produced tracks + + +# ─── Sync launch ──────────────────────────────────────────────────── + + +class TestSyncLaunch: + def test_sync_one_playlist_starts_thread(self): + captured: List[tuple] = [] + + def fake_run_sync_task(*args): + captured.append(args) + + deps = _build_deps( + run_sync_task=fake_run_sync_task, + get_current_profile_id=lambda: 7, + ) + payload = { + 'sync_id': 'auto_personalized_hidden_gems_', + 'name': 'Hidden Gems', + 'tracks_json': [{'name': 'X', 'id': 'sp-1'}], + 'image_url': '', + } + result = _sync_personalized_playlist(deps, payload) + assert result['status'] == 'started' + # Wait for thread to invoke fake_run_sync_task. + for _ in range(100): + if captured: + break + import time + time.sleep(0.01) + assert len(captured) == 1 + # Args: (sync_id, name, tracks_json, automation_id, profile_id, image_url) + assert captured[0][0] == 'auto_personalized_hidden_gems_' + assert captured[0][1] == 'Hidden Gems' + assert captured[0][3] is None # automation_id muted + assert captured[0][4] == 7 # profile_id + + +# ─── Full pipeline (with stubbed manager + sync states) ───────────── + + +class TestPipelineHappyPath: + def test_pipeline_completes_with_synced_count(self): + # Stub manager returns one playlist with 2 tracks. + manager = _StubManagerWithTracks( + tracks_per_kind={'hidden_gems': [ + {'name': 'A', 'id': 'sp-1'}, + {'name': 'B', 'id': 'sp-2'}, + ]}, + ) + + # sync_states populated as if the sync background task finished. + sync_states_storage = {} + + def fake_run_sync(sync_id, name, tracks, aid, pid, img): + sync_states_storage[sync_id] = { + 'status': 'finished', + 'result': {'matched_tracks': 2}, + } + + deps = _build_deps( + build_personalized_manager=lambda: manager, + run_sync_task=fake_run_sync, + get_sync_states=lambda: sync_states_storage, + ) + # Patch time.sleep in shared helper so test doesn't take 2s per iter. + import core.automation.handlers._pipeline_shared as shared + orig = shared.time.sleep + shared.time.sleep = lambda _: None + try: + result = auto_personalized_pipeline( + {'_automation_id': 'auto-1', 'kinds': [{'kind': 'hidden_gems'}]}, + deps, + ) + finally: + shared.time.sleep = orig + assert result['status'] == 'completed' + assert result['_manages_own_progress'] is True + # Pipeline-running flag cleaned up. + assert deps.state.pipeline_running is False diff --git a/tests/automation/test_handlers_playlist.py b/tests/automation/test_handlers_playlist.py index a017c592..1a1548ef 100644 --- a/tests/automation/test_handlers_playlist.py +++ b/tests/automation/test_handlers_playlist.py @@ -122,6 +122,7 @@ def _build_deps(**overrides) -> AutomationDeps: get_beatport_data_cache=lambda: {'cache_lock': threading.Lock(), 'homepage': {}}, init_automation_progress=lambda *a, **k: None, record_progress_history=lambda *a, **k: None, + build_personalized_manager=lambda: None, ) defaults.update(overrides) return AutomationDeps(**defaults) # type: ignore[arg-type] diff --git a/tests/automation/test_handlers_simple.py b/tests/automation/test_handlers_simple.py index 21e6e9d2..ac8b2ef4 100644 --- a/tests/automation/test_handlers_simple.py +++ b/tests/automation/test_handlers_simple.py @@ -92,6 +92,7 @@ def _build_deps(**overrides: Any) -> AutomationDeps: get_beatport_data_cache=lambda: {'cache_lock': threading.Lock(), 'homepage': {}}, init_automation_progress=lambda *a, **k: None, record_progress_history=lambda *a, **k: None, + build_personalized_manager=lambda: None, ) defaults.update(overrides) return AutomationDeps(**defaults) # type: ignore[arg-type] diff --git a/tests/automation/test_progress_callbacks.py b/tests/automation/test_progress_callbacks.py index de844f62..10456bfd 100644 --- a/tests/automation/test_progress_callbacks.py +++ b/tests/automation/test_progress_callbacks.py @@ -78,6 +78,7 @@ def _build_deps(**overrides) -> AutomationDeps: get_beatport_data_cache=lambda: {'cache_lock': threading.Lock(), 'homepage': {}}, init_automation_progress=lambda *a, **k: None, record_progress_history=lambda *a, **k: None, + build_personalized_manager=lambda: None, ) defaults.update(overrides) return AutomationDeps(**defaults) # type: ignore[arg-type] diff --git a/web_server.py b/web_server.py index 7bc3ec5c..86269fa0 100644 --- a/web_server.py +++ b/web_server.py @@ -992,6 +992,7 @@ def _register_automation_handlers(): get_beatport_data_cache=lambda: beatport_data_cache, init_automation_progress=_init_automation_progress, record_progress_history=_auto_progress.record_history, + build_personalized_manager=_build_personalized_manager, ) _register_extracted_handlers(_automation_deps) diff --git a/webui/static/helper.js b/webui/static/helper.js index 8eb59c13..e04d8333 100644 --- a/webui/static/helper.js +++ b/webui/static/helper.js @@ -3416,6 +3416,7 @@ const WHATS_NEW = { '2.5.2': [ // --- May 13, 2026 — 2.5.2 release --- { date: 'May 13, 2026 — 2.5.2 release' }, + { title: 'Personalized Playlist Pipeline: Auto-Sync Discover-Page Playlists', desc: 'follow-up to the personalized-playlists standardization PR. new automation action `personalized_pipeline` syncs your selected discover-page playlists (Hidden Gems, Time Machine per-decade, Fresh Tape, The Archives, Seasonal Mix per-season, etc.) to your active media server + queues missing tracks for download — same pattern as the existing mirrored playlist pipeline but two phases instead of four (no REFRESH or DISCOVER needed since manager-backed snapshots are already metadata-matched). config: pick which kinds+variants to include, optional `refresh_first` to regenerate snapshots before syncing, optional `skip_wishlist`. shares the pipeline_running guard with the mirrored pipeline so the two can\'t overlap (one sync queue, one wishlist worker). lifted PHASE 3 (SYNC loop) + PHASE 4 (WISHLIST tail) of the mirrored pipeline into shared `core/automation/handlers/_pipeline_shared.run_sync_and_wishlist` so both pipelines reuse the same sync-state polling / progress emission / wishlist trigger logic — 0 duplication. trigger UI block declared at `core/automation/blocks.py` (full multi-select picker UI is its own follow-up). 14 new boundary tests pin: track→sync_shape conversion + source ID fallback, empty-kinds error, payload building skips no-tracks playlists, refresh_first vs ensure dispatch, manager exception swallowed continues to next kind, full pipeline happy-path with stubbed sync_states. 3383 tests pass total. now usable via API today, polished UI dropping next.', page: 'discover' }, { title: 'Personalized Playlists Standardization', desc: 'all 8 personalized / discover-page playlists (Hidden Gems, Discovery Shuffle, Popular Picks, Time Machine per-decade, Genre playlists per-genre, Daily Mixes, Fresh Tape, The Archives, Seasonal Mix per-season) now share one unified storage layer. pre-overhaul: Group A (Fresh Tape / Archives / Seasonal Mix) lived in one shape, Group B (everything else) was computed-on-demand with no persistence — every page-load re-rolled the dice and tracks rotated under your feet. post-overhaul: every playlist has a stable identity, persistent track snapshot, explicit refresh button, and per-playlist tweakable config (limit, diversity caps, popularity bounds, recency window, exclude-recent-days staleness window). prerequisite for the playlist pipeline integration coming in the next PR (sync these to your media server + send missing tracks to wishlist on a timer). fixed a stub: Daily Mixes used to promise 50% library + 50% discovery but the library half always returned [] (tracks table has no source IDs to sync) — now honestly discovery-only so it actually works. also: each kind\'s body lifted into its own module under `core/personalized/generators/`, behavior preserved verbatim from the legacy `PersonalizedPlaylistsService` and `SeasonalDiscoveryService`. new REST endpoints under `/api/personalized/*`. 134 boundary tests cover every kind + the manager + the API + staleness filter; full suite at 3369 tests.', page: 'discover' }, { title: 'Dashboard Activity Feed: Stop Showing "NaNmo ago"', desc: 'recent activity items on the dashboard all rendered "NaNmo ago" because the formatter was parsing `activity.time` (a human label like "Now") as a date. backend has always emitted `activity.timestamp` (Unix epoch seconds) alongside the label — frontend now uses that for relative-time formatting. falls back to the literal label only when no timestamp present (legacy items / future shapes).', page: 'home' }, { title: 'Token Leak Round 2: URL-Encoded Form In Artist Endpoint + Playlist Sync', desc: 'security follow-up to the prior token-leak fix. found three sites in `web_server.py` (artist endpoint) that logged the full `image_url` and the entire artist_info dict at INFO on every artist-page render — the dict contained the `image_url` field routed through the image proxy (`/api/image-proxy?url=`), URL-encoding the X-Plex-Token / X-Emby-Token / Subsonic auth straight into the log line. also one site in `core/discovery/sync.py` logged the playlist poster URL during sync. fixes: dropped the three artist-endpoint dev-time debug log lines entirely (before-fix, after-fix, "Final artist data being sent"). playlist-image log now logs `has_image=True/False`, not the URL. strengthened `_redact_url_secrets` with a second regex pattern that matches the URL-encoded form (`%3FX-Plex-Token%3D...`) so any future log-through-redactor catches both plain and encoded shapes. wipe your existing app.log if it captured tokens in either form, and rotate Plex / Jellyfin / Navidrome credentials.', page: 'settings' },