Personalized playlist pipeline: auto-sync discover-page playlists

Follow-up to the personalized-playlists standardization PR. New
`personalized_pipeline` automation action syncs selected discover-
page playlists (Hidden Gems / Discovery Shuffle / Time Machine /
Genre / Daily Mix / Fresh Tape / The Archives / Seasonal Mix) to
the active media server + queues missing tracks for download.

Same pattern as the existing mirrored `playlist_pipeline` but two
phases instead of four — no REFRESH (no external source to re-pull)
and no DISCOVER (manager-backed snapshots are already metadata-
matched). Pipeline shape:

    SNAPSHOT → SYNC → WISHLIST

Where SNAPSHOT either reads the persisted track list from
`PersonalizedPlaylistManager` (default) or refreshes it first when
`refresh_first=true` (cron use case: regenerate Hidden Gems nightly
and sync the fresh set).

Shared helper extraction:

PHASE 3 (SYNC loop) + PHASE 4 (WISHLIST tail) lifted out of mirrored
`playlist_pipeline` into `core/automation/handlers/_pipeline_shared.py`
as `run_sync_and_wishlist(deps, automation_id, playlists, sync_one_fn,
sync_id_for_fn, ...)`. Both pipelines call it. Mirrored injects
`auto_sync_playlist` as the per-playlist sync function; personalized
injects a thin wrapper that launches `_run_sync_task` directly with
a pre-built tracks_json. Same sync-state polling / progress emission
/ status counting / wishlist trigger logic — 0 duplication.

Files added:
- core/automation/handlers/_pipeline_shared.py
- core/automation/handlers/personalized_pipeline.py
- tests/automation/test_handlers_personalized_pipeline.py

Files changed:
- core/automation/handlers/playlist_pipeline.py: PHASE 3+4 replaced
  with shared helper call (~100 lines deleted, 1 helper invocation
  added; behavior identical).
- core/automation/deps.py: new `build_personalized_manager` field
  (lazy builder so the pipeline gets a fresh PersonalizedPlaylistManager
  per run).
- core/automation/handlers/__init__.py + registration.py: register
  `personalized_pipeline` action with the shared `pipeline_running`
  guard so it can't overlap mirrored.
- core/automation/blocks.py: new `personalized_pipeline` block
  declaration with config_fields (kinds multi-select, refresh_first,
  skip_wishlist).
- web_server.py: thread `_build_personalized_manager` into
  AutomationDeps construction.
- All 5 automation test fixtures: `_build_deps` adds
  `build_personalized_manager=lambda: None` stub.
- tests/automation/test_handler_registration.py:
  EXPECTED_ACTION_NAMES + EXPECTED_GUARDED_ACTIONS gain
  `personalized_pipeline`.

Trigger schema:

    {
      "_automation_id": "...",
      "kinds": [
        {"kind": "hidden_gems"},
        {"kind": "time_machine", "variant": "1980s"},
        {"kind": "seasonal_mix", "variant": "halloween"}
      ],
      "refresh_first": false,
      "skip_wishlist": false
    }

Tests (14 new, 178 automation total):
- _track_to_sync_shape: basic shape, source ID fallback chain,
  no-id returns empty string
- empty config / non-list kinds / empty kinds list all return
  error + clear pipeline_running flag
- _build_payloads_for_kinds: skips invalid entries, skips kinds
  with no tracks, refresh_first vs ensure dispatch, payload shape
  + sync_id format, manager exception swallowed continues
- _sync_personalized_playlist: launches background thread + returns
  status='started'
- happy path: stubbed sync_states drives helper to completion, flag
  cleaned up

Full suite: 3383 passed.

Note: the trigger UI block declares config_fields but the frontend
doesn't yet render the `personalized_playlist_select` multi-select
type — usable today via API; polished UI ships in a follow-up
frontend PR.
pull/614/head
Broque Thomas 1 week ago
parent fe6f196cac
commit cc44254bf9

@ -146,6 +146,15 @@ ACTIONS: list[dict] = [
{"key": "all", "type": "checkbox", "label": "Process all mirrored playlists", "default": False},
{"key": "skip_wishlist", "type": "checkbox", "label": "Skip wishlist processing", "default": False},
]},
{"type": "personalized_pipeline", "label": "Personalized Playlist Pipeline", "icon": "sparkles",
"description": "Sync personalized / discover-page playlists (Hidden Gems, Time Machine, Fresh Tape, etc.) to your media server + queue missing tracks for download.",
"available": True,
"config_fields": [
{"key": "kinds", "type": "personalized_playlist_select", "label": "Playlists to sync",
"description": "Multi-select: Hidden Gems, Discovery Shuffle, Time Machine (per decade), Genre playlists, Fresh Tape, The Archives, Seasonal Mix (per season)"},
{"key": "refresh_first", "type": "checkbox", "label": "Refresh playlists before sync (regenerate snapshots)", "default": False},
{"key": "skip_wishlist", "type": "checkbox", "label": "Skip wishlist processing", "default": False},
]},
{"type": "notify_only", "label": "Notify Only", "icon": "bell", "description": "No action — just send notification", "available": True},
# Phase 3 actions
{"type": "start_database_update", "label": "Update Database", "icon": "database",

@ -138,3 +138,9 @@ class AutomationDeps:
# the engine's progress callback hooks). ---
init_automation_progress: Callable[..., Any]
record_progress_history: Callable[..., Any]
# --- Personalized playlist pipeline ---
# Lazy builder so the pipeline handler can construct a fresh
# PersonalizedPlaylistManager per run (cheap accessors inside,
# no caching needed yet).
build_personalized_manager: Callable[[], Any]

@ -17,6 +17,7 @@ from core.automation.handlers.refresh_mirrored import auto_refresh_mirrored
from core.automation.handlers.sync_playlist import auto_sync_playlist
from core.automation.handlers.discover_playlist import auto_discover_playlist
from core.automation.handlers.playlist_pipeline import auto_playlist_pipeline
from core.automation.handlers.personalized_pipeline import auto_personalized_pipeline
from core.automation.handlers.database_update import auto_start_database_update, auto_deep_scan_library
from core.automation.handlers.duplicate_cleaner import auto_run_duplicate_cleaner
from core.automation.handlers.quality_scanner import auto_start_quality_scan
@ -44,6 +45,7 @@ __all__ = [
'auto_sync_playlist',
'auto_discover_playlist',
'auto_playlist_pipeline',
'auto_personalized_pipeline',
'auto_start_database_update',
'auto_deep_scan_library',
'auto_run_duplicate_cleaner',

@ -0,0 +1,201 @@
"""Shared helpers between mirrored + personalized playlist pipelines.
Both pipelines end in the same shape:
1. SYNC each playlist to the active media server.
2. WISHLIST: trigger the wishlist processor for missing tracks.
The differing prefix (mirrored = REFRESH external sources + DISCOVER
metadata; personalized = SNAPSHOT manager-backed playlists) is owned
by each pipeline. This module owns the SYNC + WISHLIST tail so both
pipelines stay consistent + DRY.
"""
from __future__ import annotations
import time
from typing import Any, Callable, Dict, List, Optional
from core.automation.deps import AutomationDeps
# Per-playlist sync poll cap (mirrored side already used this).
_SYNC_PER_PLAYLIST_TIMEOUT_SECONDS = 600
# Sync-status final-state markers.
_SYNC_TERMINAL_STATUSES = ('finished', 'complete', 'error', 'failed')
def run_sync_and_wishlist(
deps: AutomationDeps,
automation_id: Optional[str],
playlists: List[Dict[str, Any]],
*,
sync_one_fn: Callable[[Dict[str, Any]], Dict[str, Any]],
sync_id_for_fn: Callable[[Dict[str, Any]], str],
skip_wishlist: bool = False,
progress_start: int = 56,
progress_end: int = 85,
sync_phase_label: str = 'Phase: Syncing to server...',
sync_phase_start_log: str = 'Sync',
wishlist_phase_label: str = 'Phase: Processing wishlist...',
wishlist_phase_start_log: str = 'Wishlist',
) -> Dict[str, int]:
"""Run the SYNC + WISHLIST tail of a playlist pipeline.
The caller supplies:
- ``playlists``: list of playlist payload dicts. Each must have at
least a ``name`` key (used in progress logs). The shape beyond
``name`` is opaque to the helper ``sync_one_fn`` receives the
payload and returns a sync_result dict.
- ``sync_one_fn(payload) -> sync_result``: launches sync for one
playlist. Result dict must carry ``status`` ``('started',
'skipped', 'error')`` and may carry ``reason``.
- ``sync_id_for_fn(payload) -> str``: returns the sync-state key
the helper polls on (so we can wait for the background sync
thread to complete + read the matched_tracks count).
Returns ``{'synced': int, 'skipped': int, 'errors': int,
'wishlist_queued': int}`` so the caller can stitch it into its
final status.
"""
deps.update_progress(
automation_id,
progress=progress_start,
phase=sync_phase_label,
log_line=sync_phase_start_log,
log_type='info',
)
total_synced = 0
total_skipped = 0
sync_errors = 0
sync_states = deps.get_sync_states()
n_playlists = max(1, len(playlists))
progress_span = max(1, progress_end - progress_start - 1)
for pl_idx, pl in enumerate(playlists):
pl_name = pl.get('name', '')
sync_result = sync_one_fn(pl)
sync_status = sync_result.get('status', '')
if sync_status == 'started':
sync_id = sync_id_for_fn(pl)
sync_poll_start = time.time()
while time.time() - sync_poll_start < _SYNC_PER_PLAYLIST_TIMEOUT_SECONDS:
if (sync_id in sync_states
and sync_states[sync_id].get('status') in _SYNC_TERMINAL_STATUSES):
break
time.sleep(2)
elapsed = int(time.time() - sync_poll_start)
sub_progress = progress_start + 1 + ((pl_idx + 1) / n_playlists) * progress_span
deps.update_progress(
automation_id,
progress=min(int(sub_progress), progress_end - 1),
phase=f'{sync_phase_label.rstrip(".")}"{pl_name}" ({elapsed}s)',
)
ss = sync_states.get(sync_id, {})
ss_result = ss.get('result', ss.get('progress', {}))
matched = ss_result.get('matched_tracks', 0) if isinstance(ss_result, dict) else 0
total_synced += int(matched) if matched else 0
deps.update_progress(
automation_id,
log_line=f'Synced "{pl_name}": {matched} tracks matched',
log_type='success',
)
elif sync_status == 'skipped':
total_skipped += 1
reason = sync_result.get('reason', 'unchanged')
deps.update_progress(
automation_id,
log_line=f'Skipped "{pl_name}": {reason}',
log_type='skip',
)
elif sync_status == 'error':
sync_errors += 1
deps.update_progress(
automation_id,
log_line=f'Sync error "{pl_name}": {sync_result.get("reason", "unknown")}',
log_type='error',
)
deps.update_progress(
automation_id,
progress=progress_end,
phase=f'{sync_phase_label.rstrip(".")} complete',
log_line=f'Sync done: {total_synced} matched, {total_skipped} skipped, {sync_errors} errors',
log_type='success' if sync_errors == 0 else 'warning',
)
wishlist_queued = run_wishlist_phase(
deps, automation_id,
skip=skip_wishlist,
progress_pct=progress_end + 1,
wishlist_phase_label=wishlist_phase_label,
wishlist_phase_start_log=wishlist_phase_start_log,
)
return {
'synced': total_synced,
'skipped': total_skipped,
'errors': sync_errors,
'wishlist_queued': wishlist_queued,
}
def run_wishlist_phase(
deps: AutomationDeps,
automation_id: Optional[str],
*,
skip: bool,
progress_pct: int,
wishlist_phase_label: str = 'Phase: Processing wishlist...',
wishlist_phase_start_log: str = 'Wishlist',
) -> int:
"""Trigger the wishlist processor unless skipped or already running.
Returns 1 when the processor was triggered, 0 otherwise. Errors are
logged but never raised wishlist failure should not abort the
pipeline."""
if skip:
deps.update_progress(
automation_id,
progress=progress_pct,
log_line=f'{wishlist_phase_start_log}: skipped (disabled)',
log_type='skip',
)
return 0
deps.update_progress(
automation_id,
progress=progress_pct,
phase=wishlist_phase_label,
log_line=wishlist_phase_start_log,
log_type='info',
)
try:
if not deps.is_wishlist_actually_processing():
deps.process_wishlist_automatically(automation_id=None)
deps.update_progress(
automation_id,
log_line='Wishlist processing triggered',
log_type='success',
)
return 1
deps.update_progress(
automation_id,
log_line='Wishlist already running — skipped',
log_type='skip',
)
return 0
except Exception as e: # noqa: BLE001 — wishlist failure must never abort pipeline
deps.update_progress(
automation_id,
log_line=f'Wishlist error: {e}',
log_type='warning',
)
return 0
__all__ = ['run_sync_and_wishlist', 'run_wishlist_phase']

@ -0,0 +1,269 @@
"""Personalized Playlist Pipeline automation handler.
Sibling to ``auto_playlist_pipeline`` (mirrored). Where the mirrored
pipeline runs REFRESH external sources DISCOVER metadata SYNC
WISHLIST, the personalized pipeline is simpler:
SNAPSHOT SYNC WISHLIST
SNAPSHOT reads the persisted track list from
``PersonalizedPlaylistManager``. When ``refresh_first=True`` (config),
each playlist is refreshed BEFORE syncing useful when the user
wants the cron to capture a fresh-each-run view (e.g. "give me a new
Hidden Gems set every night"). Default is to sync the existing
snapshot, on the assumption the user / a separate cron has already
refreshed when they wanted to.
Config schema:
{
'kinds': [
{'kind': 'hidden_gems'},
{'kind': 'time_machine', 'variant': '1980s'},
{'kind': 'seasonal_mix', 'variant': 'halloween'},
...
],
'refresh_first': bool, # default false
'skip_wishlist': bool, # default false
}
Each kind dict has at minimum ``kind``; ``variant`` is required for
kinds that need it (time_machine, genre_playlist, daily_mix,
seasonal_mix). Singleton kinds (hidden_gems, discovery_shuffle,
popular_picks, fresh_tape, archives) ignore variant.
Pipeline-running flag (``deps.state.pipeline_running``) is shared
with the mirrored pipeline so the two can't overlap. (One sync
queue, one wishlist worker overlapping triggers would step on
each other.)"""
from __future__ import annotations
import threading
import time
from typing import Any, Dict, List, Optional
from core.automation.deps import AutomationDeps
from core.automation.handlers._pipeline_shared import run_sync_and_wishlist
# Sync state key prefix so personalized syncs don't collide with
# mirrored ones (`auto_mirror_<id>`).
_SYNC_ID_PREFIX = 'auto_personalized'
def auto_personalized_pipeline(config: Dict[str, Any], deps: AutomationDeps) -> Dict[str, Any]:
"""Run SNAPSHOT → SYNC → WISHLIST for selected personalized playlists."""
deps.state.set_pipeline_running(True)
automation_id = config.get('_automation_id')
pipeline_start = time.time()
try:
kinds_config = config.get('kinds') or []
if not isinstance(kinds_config, list) or not kinds_config:
deps.state.set_pipeline_running(False)
return {
'status': 'error',
'error': 'No personalized playlist kinds selected',
}
refresh_first = bool(config.get('refresh_first', False))
skip_wishlist = bool(config.get('skip_wishlist', False))
manager = deps.build_personalized_manager()
deps.update_progress(
automation_id,
progress=2,
phase=f'Personalized pipeline: {len(kinds_config)} playlist(s)',
log_line=f'Starting pipeline for {len(kinds_config)} playlist(s)',
log_type='info',
)
# ── PHASE 1: SNAPSHOT (optionally refresh) ──────────────────
deps.update_progress(
automation_id,
progress=3,
phase='Phase 1/2: Loading snapshots...' if not refresh_first
else 'Phase 1/2: Refreshing snapshots...',
log_line='Phase 1: Snapshot' + (' (with refresh)' if refresh_first else ''),
log_type='info',
)
profile_id = deps.get_current_profile_id()
playload_payloads = _build_payloads_for_kinds(
deps, manager, kinds_config, profile_id,
automation_id=automation_id,
refresh_first=refresh_first,
)
if not playload_payloads:
deps.state.set_pipeline_running(False)
deps.update_progress(
automation_id,
status='finished', progress=100,
phase='No playlists to sync',
log_line='No personalized playlists had tracks to sync',
log_type='warning',
)
return {
'status': 'completed',
'_manages_own_progress': True,
'playlists_synced': '0',
'tracks_synced': '0',
'duration_seconds': str(int(time.time() - pipeline_start)),
}
deps.update_progress(
automation_id,
progress=50,
phase='Phase 1/2: Snapshot complete',
log_line=f'Phase 1 done: {len(playload_payloads)} playlist(s) ready to sync',
log_type='success',
)
# ── PHASE 2: SYNC + WISHLIST (shared helper) ────────────────
sync_summary = run_sync_and_wishlist(
deps,
automation_id,
playload_payloads,
sync_one_fn=lambda pl: _sync_personalized_playlist(deps, pl),
sync_id_for_fn=lambda pl: pl['sync_id'],
skip_wishlist=skip_wishlist,
progress_start=51,
progress_end=90,
sync_phase_label='Phase 2/2: Syncing to server...',
sync_phase_start_log='Phase 2: Sync',
wishlist_phase_label='Phase 2/2: Processing wishlist...',
wishlist_phase_start_log='Wishlist',
)
# ── COMPLETE ────────────────────────────────────────────────
duration = int(time.time() - pipeline_start)
deps.update_progress(
automation_id,
status='finished', progress=100,
phase='Pipeline complete',
log_line=f'Personalized pipeline finished in {duration // 60}m {duration % 60}s',
log_type='success',
)
deps.state.set_pipeline_running(False)
return {
'status': 'completed',
'_manages_own_progress': True,
'playlists_synced': str(len(playload_payloads)),
'tracks_synced': str(sync_summary['synced']),
'sync_skipped': str(sync_summary['skipped']),
'wishlist_queued': str(sync_summary['wishlist_queued']),
'duration_seconds': str(duration),
}
except Exception as e: # noqa: BLE001 — automation handlers must never raise into engine
deps.state.set_pipeline_running(False)
deps.update_progress(
automation_id,
status='error', progress=100,
phase='Pipeline error',
log_line=f'Personalized pipeline failed: {e}',
log_type='error',
)
return {'status': 'error', 'error': str(e), '_manages_own_progress': True}
def _build_payloads_for_kinds(
deps: AutomationDeps,
manager: Any,
kinds_config: List[Dict[str, Any]],
profile_id: int,
*,
automation_id: Optional[str],
refresh_first: bool,
) -> List[Dict[str, Any]]:
"""Resolve each requested kind+variant into a sync-payload dict.
Each payload has: ``{'name', 'kind', 'variant', 'tracks_json',
'image_url', 'sync_id'}``. Playlists with no tracks (e.g. a
seasonal mix that hasn't been populated yet) are omitted from
the result so the sync loop doesn't waste time on empty pushes.
"""
payloads: List[Dict[str, Any]] = []
for entry in kinds_config:
if not isinstance(entry, dict):
continue
kind = entry.get('kind')
variant = entry.get('variant') or ''
if not kind:
continue
try:
if refresh_first:
record = manager.refresh_playlist(kind, variant, profile_id)
else:
record = manager.ensure_playlist(kind, variant, profile_id)
except Exception as exc: # noqa: BLE001 — log + continue with next kind
deps.update_progress(
automation_id,
log_line=f'Skipping {kind}{("/" + variant) if variant else ""}: {exc}',
log_type='warning',
)
continue
tracks = manager.get_playlist_tracks(record.id)
if not tracks:
deps.update_progress(
automation_id,
log_line=f'No tracks in {record.name} — skipping sync',
log_type='skip',
)
continue
tracks_json = [_track_to_sync_shape(t) for t in tracks]
payloads.append({
'name': record.name,
'kind': record.kind,
'variant': record.variant,
'tracks_json': tracks_json,
'image_url': '', # personalized playlists don't have a cover image yet
'sync_id': f'{_SYNC_ID_PREFIX}_{record.kind}_{record.variant or "_"}',
})
return payloads
def _track_to_sync_shape(track: Any) -> Dict[str, Any]:
"""Convert a personalized.types.Track into the dict shape
`_run_sync_task` expects. Mirrors what the mirrored pipeline
builds from extra_data.matched_data name/artists/album/duration/id."""
primary_id = track.spotify_track_id or track.itunes_track_id or track.deezer_track_id or ''
artists = [{'name': track.artist_name}]
return {
'name': track.track_name,
'artists': artists,
'album': {'name': track.album_name or ''},
'duration_ms': int(track.duration_ms or 0),
'id': primary_id,
}
def _sync_personalized_playlist(deps: AutomationDeps, payload: Dict[str, Any]) -> Dict[str, Any]:
"""Launch a personalized playlist sync via _run_sync_task on a
daemon thread + return immediately with status='started'.
Mirrors the mirrored ``auto_sync_playlist`` return contract so the
shared helper can poll on ``sync_states[sync_id]`` and aggregate
results identically."""
sync_id = payload['sync_id']
name = payload['name']
tracks_json = payload['tracks_json']
profile_id = deps.get_current_profile_id()
threading.Thread(
target=deps.run_sync_task,
args=(sync_id, name, tracks_json, None, profile_id, payload.get('image_url', '')),
daemon=True,
name=f'auto-personalized-{sync_id}',
).start()
return {
'status': 'started',
'playlist_name': name,
'_manages_own_progress': True,
}

@ -28,12 +28,12 @@ import time
from typing import Any, Dict
from core.automation.deps import AutomationDeps
from core.automation.handlers._pipeline_shared import run_sync_and_wishlist
from core.automation.handlers.refresh_mirrored import auto_refresh_mirrored
from core.automation.handlers.sync_playlist import auto_sync_playlist
# Per-playlist sync poll cap inside Phase 3.
_SYNC_PER_PLAYLIST_TIMEOUT_SECONDS = 600
# Discovery poll cap inside Phase 2.
_DISCOVERY_TIMEOUT_SECONDS = 3600
@ -165,124 +165,31 @@ def auto_playlist_pipeline(config: Dict[str, Any], deps: AutomationDeps) -> Dict
log_type='success',
)
# ── PHASE 3: SYNC ─────────────────────────────────────────────
deps.update_progress(
automation_id,
progress=56,
phase='Phase 3/4: Syncing to server...',
log_line='Phase 3: Sync',
log_type='info',
)
total_synced = 0
total_skipped = 0
sync_errors = 0
sync_states = deps.get_sync_states()
for pl_idx, pl in enumerate(playlists):
pl_id = pl.get('id')
if not pl_id:
continue
sync_config = {
'playlist_id': str(pl_id),
'_automation_id': None, # Don't let sync handler hijack our progress.
}
sync_result = auto_sync_playlist(sync_config, deps)
sync_status = sync_result.get('status', '')
if sync_status == 'started':
# Sync launched a background thread — wait for it.
sync_id = f"auto_mirror_{pl_id}"
sync_poll_start = time.time()
while time.time() - sync_poll_start < _SYNC_PER_PLAYLIST_TIMEOUT_SECONDS:
if (sync_id in sync_states
and sync_states[sync_id].get('status')
in ('finished', 'complete', 'error', 'failed')):
break
time.sleep(2)
elapsed = int(time.time() - sync_poll_start)
sub_progress = 56 + ((pl_idx + 1) / max(1, len(playlists))) * 29
deps.update_progress(
automation_id,
progress=min(int(sub_progress), 84),
phase=f'Phase 3/4: Syncing "{pl.get("name", "")}" ({elapsed}s)',
)
# Check result.
ss = sync_states.get(sync_id, {})
ss_result = ss.get('result', ss.get('progress', {}))
matched = ss_result.get('matched_tracks', 0) if isinstance(ss_result, dict) else 0
total_synced += int(matched) if matched else 0
deps.update_progress(
automation_id,
log_line=f'Synced "{pl.get("name", "")}": {matched} tracks matched',
log_type='success',
)
elif sync_status == 'skipped':
total_skipped += 1
reason = sync_result.get('reason', 'unchanged')
deps.update_progress(
automation_id,
log_line=f'Skipped "{pl.get("name", "")}": {reason}',
log_type='skip',
)
elif sync_status == 'error':
sync_errors += 1
deps.update_progress(
automation_id,
log_line=f'Sync error "{pl.get("name", "")}": {sync_result.get("reason", "unknown")}',
log_type='error',
)
deps.update_progress(
# ── PHASE 3 + 4: SYNC + WISHLIST (delegated to shared helper) ──
# Each mirrored playlist payload only needs `id` + `name` for
# the helper; `auto_sync_playlist` reads the rest from the
# mirrored DB by id.
sync_summary = run_sync_and_wishlist(
deps,
automation_id,
progress=85,
phase='Phase 3/4: Sync complete',
log_line=f'Phase 3 done: {total_synced} matched, {total_skipped} skipped, {sync_errors} errors',
log_type='success' if sync_errors == 0 else 'warning',
[pl for pl in playlists if pl.get('id')],
sync_one_fn=lambda pl: auto_sync_playlist(
{'playlist_id': str(pl['id']), '_automation_id': None},
deps,
),
sync_id_for_fn=lambda pl: f"auto_mirror_{pl['id']}",
skip_wishlist=skip_wishlist,
progress_start=56,
progress_end=85,
sync_phase_label='Phase 3/4: Syncing to server...',
sync_phase_start_log='Phase 3: Sync',
wishlist_phase_label='Phase 4/4: Processing wishlist...',
wishlist_phase_start_log='Phase 4: Wishlist',
)
# ── PHASE 4: WISHLIST ─────────────────────────────────────────
wishlist_queued = 0
if not skip_wishlist:
deps.update_progress(
automation_id,
progress=86,
phase='Phase 4/4: Processing wishlist...',
log_line='Phase 4: Wishlist',
log_type='info',
)
try:
if not deps.is_wishlist_actually_processing():
deps.process_wishlist_automatically(automation_id=None)
deps.update_progress(
automation_id,
log_line='Wishlist processing triggered',
log_type='success',
)
wishlist_queued = 1
else:
deps.update_progress(
automation_id,
log_line='Wishlist already running — skipped',
log_type='skip',
)
except Exception as e:
deps.update_progress(
automation_id,
log_line=f'Wishlist error: {e}',
log_type='warning',
)
else:
deps.update_progress(
automation_id,
progress=86,
log_line='Phase 4: Wishlist skipped (disabled)',
log_type='skip',
)
total_synced = sync_summary['synced']
total_skipped = sync_summary['skipped']
sync_errors = sync_summary['errors']
wishlist_queued = sync_summary['wishlist_queued']
# ── COMPLETE ──────────────────────────────────────────────────
duration = int(time.time() - pipeline_start)

@ -15,6 +15,7 @@ from core.automation.handlers.refresh_mirrored import auto_refresh_mirrored
from core.automation.handlers.sync_playlist import auto_sync_playlist
from core.automation.handlers.discover_playlist import auto_discover_playlist
from core.automation.handlers.playlist_pipeline import auto_playlist_pipeline
from core.automation.handlers.personalized_pipeline import auto_personalized_pipeline
from core.automation.handlers.database_update import (
auto_start_database_update, auto_deep_scan_library,
)
@ -94,6 +95,14 @@ def register_all(deps: AutomationDeps) -> None:
lambda config: auto_playlist_pipeline(config, deps),
deps.state.is_pipeline_running,
)
# Personalized pipeline shares the pipeline_running flag with the
# mirrored pipeline so the two can't overlap (single sync queue,
# single wishlist worker).
engine.register_action_handler(
'personalized_pipeline',
lambda config: auto_personalized_pipeline(config, deps),
deps.state.is_pipeline_running,
)
# Database update + deep scan share the db_update_state guard —
# only one operation can mutate that state at a time.

@ -38,6 +38,7 @@ EXPECTED_ACTION_NAMES = frozenset({
'sync_playlist',
'discover_playlist',
'playlist_pipeline',
'personalized_pipeline',
'start_database_update',
'deep_scan_library',
'run_duplicate_cleaner',
@ -60,6 +61,7 @@ EXPECTED_GUARDED_ACTIONS = frozenset({
'scan_watchlist',
'scan_library',
'playlist_pipeline',
'personalized_pipeline',
'start_database_update',
'deep_scan_library',
'run_duplicate_cleaner',
@ -156,6 +158,7 @@ def _build_deps(engine, scan_mgr=None) -> AutomationDeps:
get_beatport_data_cache=lambda: {'cache_lock': threading.Lock(), 'homepage': {}},
init_automation_progress=lambda *a, **k: None,
record_progress_history=lambda *a, **k: None,
build_personalized_manager=lambda: None,
)

@ -105,6 +105,7 @@ def _build_deps(**overrides) -> AutomationDeps:
get_beatport_data_cache=lambda: {'cache_lock': threading.Lock(), 'homepage': {}},
init_automation_progress=lambda *a, **k: None,
record_progress_history=lambda *a, **k: None,
build_personalized_manager=lambda: None,
)
defaults.update(overrides)
return AutomationDeps(**defaults) # type: ignore[arg-type]

@ -0,0 +1,372 @@
"""Boundary tests for the personalized playlist pipeline handler.
Pin every shape: empty kinds error, refresh_first behaviour, snapshot
load + sync dispatch, missing-tracks skip, exception swallowing,
pipeline_running flag cleanup, sync payload shape passed to
_run_sync_task."""
from __future__ import annotations
import threading
from types import SimpleNamespace
from typing import Any, List
import pytest
from core.automation.deps import AutomationDeps, AutomationState
from core.automation.handlers.personalized_pipeline import (
auto_personalized_pipeline,
_build_payloads_for_kinds,
_track_to_sync_shape,
_sync_personalized_playlist,
)
class _StubLogger:
def debug(self, *a, **k): pass
def info(self, *a, **k): pass
def warning(self, *a, **k): pass
def error(self, *a, **k): pass
def _build_deps(**overrides) -> AutomationDeps:
defaults = dict(
engine=object(),
state=AutomationState(),
config_manager=object(),
update_progress=lambda *a, **k: None,
logger=_StubLogger(),
get_database=lambda: object(),
spotify_client=None,
tidal_client=None,
web_scan_manager=None,
process_wishlist_automatically=lambda **k: None,
process_watchlist_scan_automatically=lambda **k: None,
is_wishlist_actually_processing=lambda: False,
is_watchlist_actually_scanning=lambda: False,
get_watchlist_scan_state=lambda: {},
run_playlist_discovery_worker=lambda *a, **k: None,
run_sync_task=lambda *a, **k: None,
load_sync_status_file=lambda: {},
get_deezer_client=lambda: None,
parse_youtube_playlist=lambda url: None,
get_sync_states=lambda: {},
set_db_update_automation_id=lambda v: None,
get_db_update_state=lambda: {},
db_update_lock=threading.Lock(),
db_update_executor=None,
run_db_update_task=lambda *a, **k: None,
run_deep_scan_task=lambda *a, **k: None,
get_duplicate_cleaner_state=lambda: {},
duplicate_cleaner_lock=threading.Lock(),
duplicate_cleaner_executor=None,
run_duplicate_cleaner=lambda: None,
get_quality_scanner_state=lambda: {},
quality_scanner_lock=threading.Lock(),
quality_scanner_executor=None,
run_quality_scanner=lambda *a, **k: None,
download_orchestrator=None,
run_async=lambda coro: None,
tasks_lock=threading.Lock(),
get_download_batches=lambda: {},
get_download_tasks=lambda: {},
sweep_empty_download_directories=lambda: 0,
get_staging_path=lambda: '/staging',
docker_resolve_path=lambda p: p,
get_current_profile_id=lambda: 1,
get_watchlist_scanner=lambda spc: None,
get_app=lambda: None,
get_beatport_data_cache=lambda: {'cache_lock': threading.Lock(), 'homepage': {}},
init_automation_progress=lambda *a, **k: None,
record_progress_history=lambda *a, **k: None,
build_personalized_manager=lambda: None,
)
defaults.update(overrides)
return AutomationDeps(**defaults) # type: ignore[arg-type]
# ─── Track shape converter ───────────────────────────────────────────
class TestTrackToSyncShape:
def test_basic_shape(self):
track = SimpleNamespace(
track_name='Song', artist_name='Artist', album_name='Album',
spotify_track_id='sp-1', itunes_track_id=None, deezer_track_id=None,
duration_ms=200000,
)
out = _track_to_sync_shape(track)
assert out == {
'name': 'Song',
'artists': [{'name': 'Artist'}],
'album': {'name': 'Album'},
'duration_ms': 200000,
'id': 'sp-1',
}
def test_falls_back_through_source_ids(self):
t1 = SimpleNamespace(track_name='', artist_name='', album_name='',
spotify_track_id=None, itunes_track_id='it-1',
deezer_track_id=None, duration_ms=0)
assert _track_to_sync_shape(t1)['id'] == 'it-1'
t2 = SimpleNamespace(track_name='', artist_name='', album_name='',
spotify_track_id=None, itunes_track_id=None,
deezer_track_id='dz-1', duration_ms=0)
assert _track_to_sync_shape(t2)['id'] == 'dz-1'
def test_no_id_returns_empty_string(self):
t = SimpleNamespace(track_name='X', artist_name='Y', album_name='Z',
spotify_track_id=None, itunes_track_id=None,
deezer_track_id=None, duration_ms=0)
assert _track_to_sync_shape(t)['id'] == ''
# ─── Empty / config validation ──────────────────────────────────────
class TestEmptyConfig:
def test_no_kinds_returns_error_and_clears_flag(self):
deps = _build_deps()
deps.state.set_pipeline_running(True) # simulate already-running
result = auto_personalized_pipeline({}, deps)
assert result['status'] == 'error'
assert 'No personalized playlist' in result['error']
assert deps.state.pipeline_running is False
def test_empty_kinds_list_returns_error(self):
deps = _build_deps()
result = auto_personalized_pipeline({'kinds': []}, deps)
assert result['status'] == 'error'
assert deps.state.pipeline_running is False
def test_non_list_kinds_returns_error(self):
deps = _build_deps()
result = auto_personalized_pipeline({'kinds': 'not_a_list'}, deps)
assert result['status'] == 'error'
# ─── Payload building ───────────────────────────────────────────────
class _StubManagerNoTracks:
def ensure_playlist(self, kind, variant, profile_id):
return SimpleNamespace(
id=1, name=f'{kind}-{variant}', kind=kind, variant=variant,
)
def refresh_playlist(self, kind, variant, profile_id):
return self.ensure_playlist(kind, variant, profile_id)
def get_playlist_tracks(self, playlist_id):
return []
class _StubManagerWithTracks:
def __init__(self, tracks_per_kind=None):
self.tracks_per_kind = tracks_per_kind or {}
self.refresh_calls: List[tuple] = []
self.ensure_calls: List[tuple] = []
def ensure_playlist(self, kind, variant, profile_id):
self.ensure_calls.append((kind, variant, profile_id))
return SimpleNamespace(
id=hash((kind, variant)) % 10000,
name=f'{kind}-{variant or "S"}', kind=kind, variant=variant,
)
def refresh_playlist(self, kind, variant, profile_id):
self.refresh_calls.append((kind, variant, profile_id))
# Mirror real manager: refresh returns a record without invoking
# the public ensure_playlist API path again.
return SimpleNamespace(
id=hash((kind, variant)) % 10000,
name=f'{kind}-{variant or "S"}', kind=kind, variant=variant,
)
def get_playlist_tracks(self, playlist_id):
# Return all tracks regardless of id — tests scope to one playlist at a time.
for tracks in self.tracks_per_kind.values():
if tracks:
return [SimpleNamespace(
track_name=t['name'], artist_name=t.get('artist', 'A'),
album_name=t.get('album', 'Al'),
spotify_track_id=t.get('id'),
itunes_track_id=None, deezer_track_id=None,
duration_ms=200000,
) for t in tracks]
return []
class TestPayloadBuilding:
def test_skips_kinds_with_no_tracks(self):
deps = _build_deps()
manager = _StubManagerNoTracks()
payloads = _build_payloads_for_kinds(
deps, manager,
[{'kind': 'hidden_gems'}, {'kind': 'discovery_shuffle'}],
profile_id=1, automation_id=None, refresh_first=False,
)
assert payloads == []
def test_skips_invalid_entries(self):
deps = _build_deps()
manager = _StubManagerNoTracks()
payloads = _build_payloads_for_kinds(
deps, manager,
['not-a-dict', {}, {'variant': 'no-kind'}], # all invalid
profile_id=1, automation_id=None, refresh_first=False,
)
assert payloads == []
def test_refresh_first_calls_refresh(self):
deps = _build_deps()
manager = _StubManagerWithTracks(
tracks_per_kind={'hidden_gems': [{'name': 'T', 'id': 'sp-1'}]},
)
_build_payloads_for_kinds(
deps, manager,
[{'kind': 'hidden_gems'}],
profile_id=1, automation_id=None, refresh_first=True,
)
assert manager.refresh_calls == [('hidden_gems', '', 1)]
assert manager.ensure_calls == []
def test_no_refresh_calls_ensure(self):
deps = _build_deps()
manager = _StubManagerWithTracks(
tracks_per_kind={'hidden_gems': [{'name': 'T', 'id': 'sp-1'}]},
)
_build_payloads_for_kinds(
deps, manager,
[{'kind': 'hidden_gems'}],
profile_id=1, automation_id=None, refresh_first=False,
)
assert manager.ensure_calls == [('hidden_gems', '', 1)]
assert manager.refresh_calls == []
def test_payload_shape(self):
deps = _build_deps()
manager = _StubManagerWithTracks(
tracks_per_kind={'hidden_gems': [
{'name': 'Track1', 'id': 'sp-1'},
{'name': 'Track2', 'id': 'sp-2'},
]},
)
payloads = _build_payloads_for_kinds(
deps, manager,
[{'kind': 'hidden_gems'}],
profile_id=1, automation_id=None, refresh_first=False,
)
assert len(payloads) == 1
p = payloads[0]
assert p['kind'] == 'hidden_gems'
assert p['variant'] == ''
assert p['name'] == 'hidden_gems-S'
assert p['sync_id'].startswith('auto_personalized_hidden_gems_')
assert len(p['tracks_json']) == 2
assert p['tracks_json'][0]['id'] == 'sp-1'
def test_manager_exception_swallowed_continues_to_next(self):
deps = _build_deps()
class _ExplodingMgr:
def __init__(self):
self.calls = []
def ensure_playlist(self, kind, variant, profile_id):
self.calls.append(kind)
if kind == 'broken':
raise RuntimeError('manager boom')
return SimpleNamespace(id=1, name=kind, kind=kind, variant=variant)
def get_playlist_tracks(self, _id):
return []
mgr = _ExplodingMgr()
# broken raises, hidden_gems proceeds (just no tracks).
payloads = _build_payloads_for_kinds(
deps, mgr,
[{'kind': 'broken'}, {'kind': 'hidden_gems'}],
profile_id=1, automation_id=None, refresh_first=False,
)
assert mgr.calls == ['broken', 'hidden_gems']
assert payloads == [] # neither produced tracks
# ─── Sync launch ────────────────────────────────────────────────────
class TestSyncLaunch:
def test_sync_one_playlist_starts_thread(self):
captured: List[tuple] = []
def fake_run_sync_task(*args):
captured.append(args)
deps = _build_deps(
run_sync_task=fake_run_sync_task,
get_current_profile_id=lambda: 7,
)
payload = {
'sync_id': 'auto_personalized_hidden_gems_',
'name': 'Hidden Gems',
'tracks_json': [{'name': 'X', 'id': 'sp-1'}],
'image_url': '',
}
result = _sync_personalized_playlist(deps, payload)
assert result['status'] == 'started'
# Wait for thread to invoke fake_run_sync_task.
for _ in range(100):
if captured:
break
import time
time.sleep(0.01)
assert len(captured) == 1
# Args: (sync_id, name, tracks_json, automation_id, profile_id, image_url)
assert captured[0][0] == 'auto_personalized_hidden_gems_'
assert captured[0][1] == 'Hidden Gems'
assert captured[0][3] is None # automation_id muted
assert captured[0][4] == 7 # profile_id
# ─── Full pipeline (with stubbed manager + sync states) ─────────────
class TestPipelineHappyPath:
def test_pipeline_completes_with_synced_count(self):
# Stub manager returns one playlist with 2 tracks.
manager = _StubManagerWithTracks(
tracks_per_kind={'hidden_gems': [
{'name': 'A', 'id': 'sp-1'},
{'name': 'B', 'id': 'sp-2'},
]},
)
# sync_states populated as if the sync background task finished.
sync_states_storage = {}
def fake_run_sync(sync_id, name, tracks, aid, pid, img):
sync_states_storage[sync_id] = {
'status': 'finished',
'result': {'matched_tracks': 2},
}
deps = _build_deps(
build_personalized_manager=lambda: manager,
run_sync_task=fake_run_sync,
get_sync_states=lambda: sync_states_storage,
)
# Patch time.sleep in shared helper so test doesn't take 2s per iter.
import core.automation.handlers._pipeline_shared as shared
orig = shared.time.sleep
shared.time.sleep = lambda _: None
try:
result = auto_personalized_pipeline(
{'_automation_id': 'auto-1', 'kinds': [{'kind': 'hidden_gems'}]},
deps,
)
finally:
shared.time.sleep = orig
assert result['status'] == 'completed'
assert result['_manages_own_progress'] is True
# Pipeline-running flag cleaned up.
assert deps.state.pipeline_running is False

@ -122,6 +122,7 @@ def _build_deps(**overrides) -> AutomationDeps:
get_beatport_data_cache=lambda: {'cache_lock': threading.Lock(), 'homepage': {}},
init_automation_progress=lambda *a, **k: None,
record_progress_history=lambda *a, **k: None,
build_personalized_manager=lambda: None,
)
defaults.update(overrides)
return AutomationDeps(**defaults) # type: ignore[arg-type]

@ -92,6 +92,7 @@ def _build_deps(**overrides: Any) -> AutomationDeps:
get_beatport_data_cache=lambda: {'cache_lock': threading.Lock(), 'homepage': {}},
init_automation_progress=lambda *a, **k: None,
record_progress_history=lambda *a, **k: None,
build_personalized_manager=lambda: None,
)
defaults.update(overrides)
return AutomationDeps(**defaults) # type: ignore[arg-type]

@ -78,6 +78,7 @@ def _build_deps(**overrides) -> AutomationDeps:
get_beatport_data_cache=lambda: {'cache_lock': threading.Lock(), 'homepage': {}},
init_automation_progress=lambda *a, **k: None,
record_progress_history=lambda *a, **k: None,
build_personalized_manager=lambda: None,
)
defaults.update(overrides)
return AutomationDeps(**defaults) # type: ignore[arg-type]

@ -992,6 +992,7 @@ def _register_automation_handlers():
get_beatport_data_cache=lambda: beatport_data_cache,
init_automation_progress=_init_automation_progress,
record_progress_history=_auto_progress.record_history,
build_personalized_manager=_build_personalized_manager,
)
_register_extracted_handlers(_automation_deps)

@ -3416,6 +3416,7 @@ const WHATS_NEW = {
'2.5.2': [
// --- May 13, 2026 — 2.5.2 release ---
{ date: 'May 13, 2026 — 2.5.2 release' },
{ title: 'Personalized Playlist Pipeline: Auto-Sync Discover-Page Playlists', desc: 'follow-up to the personalized-playlists standardization PR. new automation action `personalized_pipeline` syncs your selected discover-page playlists (Hidden Gems, Time Machine per-decade, Fresh Tape, The Archives, Seasonal Mix per-season, etc.) to your active media server + queues missing tracks for download — same pattern as the existing mirrored playlist pipeline but two phases instead of four (no REFRESH or DISCOVER needed since manager-backed snapshots are already metadata-matched). config: pick which kinds+variants to include, optional `refresh_first` to regenerate snapshots before syncing, optional `skip_wishlist`. shares the pipeline_running guard with the mirrored pipeline so the two can\'t overlap (one sync queue, one wishlist worker). lifted PHASE 3 (SYNC loop) + PHASE 4 (WISHLIST tail) of the mirrored pipeline into shared `core/automation/handlers/_pipeline_shared.run_sync_and_wishlist` so both pipelines reuse the same sync-state polling / progress emission / wishlist trigger logic — 0 duplication. trigger UI block declared at `core/automation/blocks.py` (full multi-select picker UI is its own follow-up). 14 new boundary tests pin: track→sync_shape conversion + source ID fallback, empty-kinds error, payload building skips no-tracks playlists, refresh_first vs ensure dispatch, manager exception swallowed continues to next kind, full pipeline happy-path with stubbed sync_states. 3383 tests pass total. now usable via API today, polished UI dropping next.', page: 'discover' },
{ title: 'Personalized Playlists Standardization', desc: 'all 8 personalized / discover-page playlists (Hidden Gems, Discovery Shuffle, Popular Picks, Time Machine per-decade, Genre playlists per-genre, Daily Mixes, Fresh Tape, The Archives, Seasonal Mix per-season) now share one unified storage layer. pre-overhaul: Group A (Fresh Tape / Archives / Seasonal Mix) lived in one shape, Group B (everything else) was computed-on-demand with no persistence — every page-load re-rolled the dice and tracks rotated under your feet. post-overhaul: every playlist has a stable identity, persistent track snapshot, explicit refresh button, and per-playlist tweakable config (limit, diversity caps, popularity bounds, recency window, exclude-recent-days staleness window). prerequisite for the playlist pipeline integration coming in the next PR (sync these to your media server + send missing tracks to wishlist on a timer). fixed a stub: Daily Mixes used to promise 50% library + 50% discovery but the library half always returned [] (tracks table has no source IDs to sync) — now honestly discovery-only so it actually works. also: each kind\'s body lifted into its own module under `core/personalized/generators/`, behavior preserved verbatim from the legacy `PersonalizedPlaylistsService` and `SeasonalDiscoveryService`. new REST endpoints under `/api/personalized/*`. 134 boundary tests cover every kind + the manager + the API + staleness filter; full suite at 3369 tests.', page: 'discover' },
{ title: 'Dashboard Activity Feed: Stop Showing "NaNmo ago"', desc: 'recent activity items on the dashboard all rendered "NaNmo ago" because the formatter was parsing `activity.time` (a human label like "Now") as a date. backend has always emitted `activity.timestamp` (Unix epoch seconds) alongside the label — frontend now uses that for relative-time formatting. falls back to the literal label only when no timestamp present (legacy items / future shapes).', page: 'home' },
{ title: 'Token Leak Round 2: URL-Encoded Form In Artist Endpoint + Playlist Sync', desc: 'security follow-up to the prior token-leak fix. found three sites in `web_server.py` (artist endpoint) that logged the full `image_url` and the entire artist_info dict at INFO on every artist-page render — the dict contained the `image_url` field routed through the image proxy (`/api/image-proxy?url=<encoded>`), URL-encoding the X-Plex-Token / X-Emby-Token / Subsonic auth straight into the log line. also one site in `core/discovery/sync.py` logged the playlist poster URL during sync. fixes: dropped the three artist-endpoint dev-time debug log lines entirely (before-fix, after-fix, "Final artist data being sent"). playlist-image log now logs `has_image=True/False`, not the URL. strengthened `_redact_url_secrets` with a second regex pattern that matches the URL-encoded form (`%3FX-Plex-Token%3D...`) so any future log-through-redactor catches both plain and encoded shapes. wipe your existing app.log if it captured tokens in either form, and rotate Plex / Jellyfin / Navidrome credentials.', page: 'settings' },

Loading…
Cancel
Save