Merge pull request #613 from Nezreka/feature/personalized-playlists-overhaul

Feature/personalized playlists overhaul
pull/529/head
BoulderBadgeDad 1 week ago committed by GitHub
commit fe6f196cac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,24 @@
"""Standardized personalized-playlist subsystem.
Replaces the patchwork of `PersonalizedPlaylistsService` (computed-on-
demand views, no persistence) + `discovery_curated_playlists` (ID-only
storage) + `curated_seasonal_playlists` (full storage) with a single
unified abstraction:
- ``manager.PersonalizedPlaylistManager`` owns the storage layer +
generator dispatch + refresh lifecycle.
- ``specs.PlaylistKindSpec`` one spec per playlist KIND
(``hidden_gems``, ``time_machine``, ``seasonal_mix``, etc.) with
generator function, default config, variant resolver, and display-
name template.
- ``types.Track`` / ``types.PlaylistConfig`` shared dataclasses.
The legacy ``PersonalizedPlaylistsService`` keeps its existing
generator implementations they're called BY the manager rather than
duplicated. This means:
- The improved diversity logic / popularity thresholds / blacklist
filtering all stays.
- New behavior layered on top: persistence, refresh-on-demand,
per-playlist user-tweakable config, staleness windows, listening-
history cross-reference, seeded randomization.
"""

@ -0,0 +1,151 @@
"""HTTP endpoint handlers for the personalized-playlists subsystem.
Wired into the Flask app from web_server.py. Each handler is a thin
wrapper that:
1. Pulls profile id + manager from request context.
2. Calls one PersonalizedPlaylistManager method.
3. Returns a JSON-serializable shape.
Live routes (registered against the main Flask app):
- GET /api/personalized/playlists list
- GET /api/personalized/kinds registry
- GET /api/personalized/playlist/<kind> singleton
- GET /api/personalized/playlist/<kind>/<variant> variant
- POST /api/personalized/playlist/<kind>/refresh singleton
- POST /api/personalized/playlist/<kind>/<variant>/refresh variant
- PUT /api/personalized/playlist/<kind>/config singleton
- PUT /api/personalized/playlist/<kind>/<variant>/config variant
The handlers themselves are pure functions returning Python dicts so
they're testable without spinning up Flask. The wiring step in
web_server.py wraps them in `jsonify` + URL routing.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from core.personalized.manager import PersonalizedPlaylistManager
from core.personalized.specs import PlaylistKindRegistry, get_registry
from core.personalized.types import PlaylistRecord, Track
def _record_to_dict(record: PlaylistRecord) -> Dict[str, Any]:
return {
'id': record.id,
'profile_id': record.profile_id,
'kind': record.kind,
'variant': record.variant,
'name': record.name,
'config': record.config.to_json_dict(),
'track_count': record.track_count,
'last_generated_at': record.last_generated_at,
'last_synced_at': record.last_synced_at,
'last_generation_source': record.last_generation_source,
'last_generation_error': record.last_generation_error,
}
def _track_to_dict(track: Track) -> Dict[str, Any]:
return {
'spotify_track_id': track.spotify_track_id,
'itunes_track_id': track.itunes_track_id,
'deezer_track_id': track.deezer_track_id,
'track_name': track.track_name,
'artist_name': track.artist_name,
'album_name': track.album_name,
'album_cover_url': track.album_cover_url,
'duration_ms': track.duration_ms,
'popularity': track.popularity,
'track_data_json': track.track_data_json,
'source': track.source,
}
def list_kinds(registry: Optional[PlaylistKindRegistry] = None) -> Dict[str, Any]:
"""Return every registered playlist kind with metadata.
UI uses this to render the "available playlists" picker. Each
kind reports whether it requires a variant and the resolved
variant set so the UI can render variant choices when relevant."""
reg = registry or get_registry()
out = []
for spec in reg.all():
out.append({
'kind': spec.kind,
'name_template': spec.name_template,
'description': spec.description,
'requires_variant': spec.requires_variant,
'tags': list(spec.tags),
'default_config': spec.default_config.to_json_dict(),
})
return {'success': True, 'kinds': out}
def list_playlists(manager: PersonalizedPlaylistManager, profile_id: int) -> Dict[str, Any]:
"""List every persisted playlist for a profile."""
records = manager.list_playlists(profile_id)
return {
'success': True,
'playlists': [_record_to_dict(r) for r in records],
}
def get_playlist_with_tracks(
manager: PersonalizedPlaylistManager,
kind: str,
variant: str,
profile_id: int,
) -> Dict[str, Any]:
"""Get the playlist row + its current track snapshot. Auto-creates
the row from default config if it doesn't exist (so the UI's first-
paint of an unseen kind works without a separate ensure call)."""
record = manager.ensure_playlist(kind, variant, profile_id)
tracks = manager.get_playlist_tracks(record.id)
return {
'success': True,
'playlist': _record_to_dict(record),
'tracks': [_track_to_dict(t) for t in tracks],
}
def refresh_playlist(
manager: PersonalizedPlaylistManager,
kind: str,
variant: str,
profile_id: int,
config_overrides: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""Run the kind's generator and persist the snapshot. Returns the
fresh row + tracks."""
record = manager.refresh_playlist(kind, variant, profile_id, config_overrides=config_overrides)
tracks = manager.get_playlist_tracks(record.id)
return {
'success': True,
'playlist': _record_to_dict(record),
'tracks': [_track_to_dict(t) for t in tracks],
}
def update_config(
manager: PersonalizedPlaylistManager,
kind: str,
variant: str,
profile_id: int,
overrides: Dict[str, Any],
) -> Dict[str, Any]:
"""Patch the playlist's config with the provided fields."""
record = manager.update_config(kind, variant, profile_id, overrides)
return {
'success': True,
'playlist': _record_to_dict(record),
}
__all__ = [
'list_kinds',
'list_playlists',
'get_playlist_with_tracks',
'refresh_playlist',
'update_config',
]

@ -0,0 +1,27 @@
"""Per-kind generators for the personalized-playlists subsystem.
Each module in this subpackage:
1. Defines a generator function ``generate(deps, variant, config)``
that returns a ``List[Track]``.
2. Calls ``get_registry().register(spec)`` at import time so the
manager auto-discovers it.
The legacy ``core.personalized_playlists.PersonalizedPlaylistsService``
keeps its existing implementations the wrappers in this package
just adapt the call surface (`PlaylistConfig` method kwargs) and
coerce results into ``Track`` instances.
To register every generator, import this package `from
core.personalized import generators` typically done once at
application startup."""
# Importing each module triggers its registration side-effect.
from core.personalized.generators import hidden_gems # noqa: F401
from core.personalized.generators import discovery_shuffle # noqa: F401
from core.personalized.generators import popular_picks # noqa: F401
from core.personalized.generators import time_machine # noqa: F401
from core.personalized.generators import genre_playlist # noqa: F401
from core.personalized.generators import daily_mix # noqa: F401
from core.personalized.generators import fresh_tape # noqa: F401
from core.personalized.generators import archives # noqa: F401
from core.personalized.generators import seasonal_mix # noqa: F401

@ -0,0 +1,37 @@
"""Shared helpers for personalized-playlist generators.
Each per-kind generator module is small + mechanical it pulls the
legacy ``PersonalizedPlaylistsService`` instance off the deps object
and calls the matching method, then coerces results. This module
holds the bits every generator reuses so we don't repeat them
five times."""
from __future__ import annotations
from typing import Any, List
from core.personalized.types import Track
def get_service(deps: Any):
"""Pull the ``PersonalizedPlaylistsService`` instance from deps.
Generators access the service via ``deps.service``. Tests can
pass a fake deps namespace with a ``service`` attribute that
returns a stub. Raises a clear error if the dep isn't wired."""
service = getattr(deps, 'service', None) or (deps.get('service') if isinstance(deps, dict) else None)
if service is None:
raise RuntimeError(
"Personalized generator deps missing `service` "
"(PersonalizedPlaylistsService instance). Wire it during "
"PersonalizedPlaylistManager construction."
)
return service
def coerce_tracks(rows: List[dict]) -> List[Track]:
"""Convert legacy generator output (list of dicts) into Track
instances. Tolerates None / non-list inputs by returning []."""
if not rows:
return []
return [Track.from_dict(row) for row in rows if isinstance(row, dict)]

@ -0,0 +1,35 @@
"""The Archives (Spotify Discover Weekly) generator.
Same shape as Fresh Tape read curated track-id list from
``discovery_curated_playlists`` under ``discovery_weekly_<source>``
(fallback ``discovery_weekly``), hydrate via discovery pool."""
from __future__ import annotations
from typing import Any, List
from core.personalized.generators.fresh_tape import _hydrate_curated
from core.personalized.specs import PlaylistKindSpec, get_registry
from core.personalized.types import PlaylistConfig, Track
KIND = 'archives'
def generate(deps: Any, variant: str, config: PlaylistConfig) -> List[Track]:
return _hydrate_curated(deps, 'discovery_weekly', config)
SPEC = PlaylistKindSpec(
kind=KIND,
name_template='The Archives',
description='Your Spotify Discover Weekly — curated discovery picks.',
default_config=PlaylistConfig(limit=50, max_per_album=5, max_per_artist=10),
generator=generate,
requires_variant=False,
tags=['curated', 'spotify'],
)
if get_registry().get(KIND) is None:
get_registry().register(SPEC)

@ -0,0 +1,83 @@
"""Daily Mix generator — top library genre → discovery picks.
Variant = rank position as a string ('1' / '2' / '3' / '4'). Each
mix tracks the user's Nth top library genre and returns discovery
picks within it. Top genres recompute at refresh time, so as the
library evolves a mix's underlying genre can shift -- the playlist
metadata records which genre was used at the most recent refresh
so the UI can label the mix accurately.
Note: previously this kind ambitiously promised 50% library + 50%
discovery. The library half was a stub (`tracks` table has no
source IDs to sync), so the new generator is discovery-only.
A future enhancement can backfill source IDs into library rows
and re-add the hybrid behavior."""
from __future__ import annotations
from typing import Any, List
from core.personalized.generators._common import coerce_tracks, get_service
from core.personalized.specs import PlaylistKindSpec, get_registry
from core.personalized.types import PlaylistConfig, Track
KIND = 'daily_mix'
# Default rank set — UI surfaces 4 daily mixes by default.
_DEFAULT_RANKS = ('1', '2', '3', '4')
# Number of top library genres to consider when ranking.
_MAX_TOP_GENRES = 8
def _resolve_genre_for_rank(service, rank: int) -> str:
"""Look up the user's Nth-ranked top library genre. Returns the
genre key or '' when no genre at that rank.
Calls ``service.get_top_genres_from_library(limit=...)`` and
indexes the resulting (genre, count) tuples by 0-based rank.
"""
top = service.get_top_genres_from_library(limit=_MAX_TOP_GENRES) or []
if rank < 1 or rank > len(top):
return ''
pair = top[rank - 1]
if not pair:
return ''
# `top` is List[Tuple[str, int]] per service signature.
return pair[0] if isinstance(pair, (tuple, list)) else str(pair)
def generate(deps: Any, variant: str, config: PlaylistConfig) -> List[Track]:
service = get_service(deps)
try:
rank = int(variant)
except (TypeError, ValueError) as exc:
raise ValueError(f"Daily Mix variant {variant!r} must be a rank int") from exc
genre = _resolve_genre_for_rank(service, rank)
if not genre:
# User's library doesn't have enough genres for this rank.
return []
rows = service.get_genre_playlist(genre=genre, limit=config.limit)
return coerce_tracks(rows)
def variant_resolver(deps: Any) -> List[str]:
"""Return the standard rank set."""
return list(_DEFAULT_RANKS)
SPEC = PlaylistKindSpec(
kind=KIND,
name_template='Daily Mix {variant}',
description='Personalized mix based on your top library genres. One mix per top genre rank.',
default_config=PlaylistConfig(limit=50, max_per_album=2, max_per_artist=3),
generator=generate,
variant_resolver=variant_resolver,
requires_variant=True,
tags=['discovery', 'personalized'],
)
if get_registry().get(KIND) is None:
get_registry().register(SPEC)

@ -0,0 +1,33 @@
"""Discovery Shuffle generator — pure-random discovery pool exploration."""
from __future__ import annotations
from typing import Any, List
from core.personalized.generators._common import coerce_tracks, get_service
from core.personalized.specs import PlaylistKindSpec, get_registry
from core.personalized.types import PlaylistConfig, Track
KIND = 'discovery_shuffle'
def generate(deps: Any, variant: str, config: PlaylistConfig) -> List[Track]:
service = get_service(deps)
rows = service.get_discovery_shuffle(limit=config.limit)
return coerce_tracks(rows)
SPEC = PlaylistKindSpec(
kind=KIND,
name_template='Discovery Shuffle',
description='Pure random shuffle from the discovery pool — different every refresh.',
default_config=PlaylistConfig(limit=50, max_per_album=2, max_per_artist=2),
generator=generate,
requires_variant=False,
tags=['discovery'],
)
if get_registry().get(KIND) is None:
get_registry().register(SPEC)

@ -0,0 +1,119 @@
"""Fresh Tape (Spotify Release Radar) generator.
Reads the curated track-id list cached in ``discovery_curated_playlists``
under ``release_radar_<source>`` (with fallback to ``release_radar``)
and hydrates each ID against the discovery pool to produce full Track
records. The Spotify enrichment worker is responsible for keeping the
curated list fresh this generator is just a read-and-hydrate path."""
from __future__ import annotations
import json
from typing import Any, List
from core.personalized.generators._common import coerce_tracks
from core.personalized.specs import PlaylistKindSpec, get_registry
from core.personalized.types import PlaylistConfig, Track
KIND = 'fresh_tape'
def _hydrate_curated(deps: Any, curated_type_prefix: str, config: PlaylistConfig) -> List[Track]:
"""Shared body for Fresh Tape + Archives — pulls the cached IDs
from discovery_curated_playlists and hydrates them via the live
discovery pool. Returns a Track list trimmed to ``config.limit``."""
# Allow tests to inject a fake db / service; production flow gets
# them from the manager's deps.
db = getattr(deps, 'database', None) or (deps.get('database') if isinstance(deps, dict) else None)
if db is None:
raise RuntimeError("Curated-playlist generator deps missing `database`")
profile_id = _resolve_profile_id(deps)
active_source = _resolve_active_source(deps)
# Try source-specific then generic, mirrors web_server endpoint behavior.
curated_ids = (
db.get_curated_playlist(f'{curated_type_prefix}_{active_source}', profile_id=profile_id)
or db.get_curated_playlist(curated_type_prefix, profile_id=profile_id)
or []
)
if not curated_ids:
return []
pool_rows = db.get_discovery_pool_tracks(
limit=5000, new_releases_only=False,
source=active_source, profile_id=profile_id,
)
by_id = {}
for t in pool_rows:
if active_source == 'spotify' and getattr(t, 'spotify_track_id', None):
by_id[t.spotify_track_id] = t
elif active_source == 'deezer' and getattr(t, 'deezer_track_id', None):
by_id[t.deezer_track_id] = t
elif active_source == 'itunes' and getattr(t, 'itunes_track_id', None):
by_id[t.itunes_track_id] = t
tracks: List[Track] = []
for tid in curated_ids:
candidate = by_id.get(tid)
if candidate is None:
continue
# The pool track is a row-like object; coerce to dict for
# Track.from_dict's existing tolerance.
td = getattr(candidate, 'track_data_json', None)
if isinstance(td, str):
try:
td = json.loads(td)
except (ValueError, TypeError):
td = None
track_dict = {
'spotify_track_id': getattr(candidate, 'spotify_track_id', None),
'itunes_track_id': getattr(candidate, 'itunes_track_id', None),
'deezer_track_id': getattr(candidate, 'deezer_track_id', None),
'track_name': getattr(candidate, 'track_name', ''),
'artist_name': getattr(candidate, 'artist_name', ''),
'album_name': getattr(candidate, 'album_name', ''),
'album_cover_url': getattr(candidate, 'album_cover_url', None),
'duration_ms': getattr(candidate, 'duration_ms', 0),
'popularity': getattr(candidate, 'popularity', 0),
'track_data_json': td,
'source': getattr(candidate, 'source', active_source),
}
tracks.append(Track.from_dict(track_dict))
if len(tracks) >= config.limit:
break
return tracks
def _resolve_profile_id(deps: Any) -> int:
fn = getattr(deps, 'get_current_profile_id', None) or (
deps.get('get_current_profile_id') if isinstance(deps, dict) else None
)
return fn() if callable(fn) else 1
def _resolve_active_source(deps: Any) -> str:
fn = getattr(deps, 'get_active_discovery_source', None) or (
deps.get('get_active_discovery_source') if isinstance(deps, dict) else None
)
return fn() if callable(fn) else 'spotify'
def generate(deps: Any, variant: str, config: PlaylistConfig) -> List[Track]:
return _hydrate_curated(deps, 'release_radar', config)
SPEC = PlaylistKindSpec(
kind=KIND,
name_template='Fresh Tape',
description='Your Spotify Release Radar — new releases from artists you follow.',
default_config=PlaylistConfig(limit=50, max_per_album=5, max_per_artist=10),
generator=generate,
requires_variant=False,
tags=['curated', 'spotify'],
)
if get_registry().get(KIND) is None:
get_registry().register(SPEC)

@ -0,0 +1,78 @@
"""Genre Playlist generator — discovery picks within one genre.
Variant = either a parent-genre key from
``PersonalizedPlaylistsService.GENRE_MAPPING`` (e.g. ``'rock'``,
``'electronic_dance'``) or a specific child-genre keyword (e.g.
``'house'``). Stored variant is always normalized to lowercase
underscore-separated form so the UI and storage agree.
"""
from __future__ import annotations
from typing import Any, List
from core.personalized.generators._common import coerce_tracks, get_service
from core.personalized.specs import PlaylistKindSpec, get_registry
from core.personalized.types import PlaylistConfig, Track
KIND = 'genre_playlist'
def _normalize_variant_to_genre_key(variant: str, service) -> str:
"""Resolve a variant string back into the genre identifier the
legacy service expects.
Service accepts both parent-genre KEYS from GENRE_MAPPING (e.g.
'Electronic/Dance', 'Hip Hop/Rap') and free-form keywords.
The URL-safe variant we store is the parent key with `/` replaced
by `_` and lowercased e.g. 'electronic_dance'. This helper
inverts that mapping."""
if not variant:
raise ValueError('Genre playlist requires a variant')
# Build a once-computed lookup of normalized → original parent key.
mapping = getattr(service, 'GENRE_MAPPING', {})
for parent_key in mapping.keys():
normalized = parent_key.lower().replace('/', '_').replace(' ', '_')
if normalized == variant.lower():
return parent_key
# Fall through: treat the variant as a free-form keyword (the
# legacy service handles partial matching for those).
return variant
def generate(deps: Any, variant: str, config: PlaylistConfig) -> List[Track]:
service = get_service(deps)
genre_key = _normalize_variant_to_genre_key(variant, service)
rows = service.get_genre_playlist(genre=genre_key, limit=config.limit)
return coerce_tracks(rows)
def variant_resolver(deps: Any) -> List[str]:
"""Return the URL-safe variant for every parent genre defined on
the service. Specific (free-form) genre variants aren't enumerated
they're created on demand when the user requests a custom one."""
service = get_service(deps)
mapping = getattr(service, 'GENRE_MAPPING', {})
return [
parent.lower().replace('/', '_').replace(' ', '_')
for parent in mapping.keys()
]
SPEC = PlaylistKindSpec(
kind=KIND,
name_template='Genre — {variant}',
description='Discovery picks within one genre. Supports parent-genre families + free-form genre keywords.',
default_config=PlaylistConfig(limit=50, max_per_album=3, max_per_artist=5),
generator=generate,
variant_resolver=variant_resolver,
requires_variant=True,
tags=['genre'],
)
if get_registry().get(KIND) is None:
get_registry().register(SPEC)

@ -0,0 +1,42 @@
"""Hidden Gems generator — low-popularity tracks from discovery pool.
Wraps ``PersonalizedPlaylistsService.get_hidden_gems`` so the
existing source-aware popularity threshold + diversity filter
behavior is preserved verbatim. The user-tweakable knobs that
arrive via ``PlaylistConfig`` (limit) flow through; future config
options (popularity_max override, exclude_recent_days) get layered
on the wrapper without changing the legacy implementation."""
from __future__ import annotations
from typing import Any, List
from core.personalized.generators._common import coerce_tracks, get_service
from core.personalized.specs import PlaylistKindSpec, get_registry
from core.personalized.types import PlaylistConfig, Track
KIND = 'hidden_gems'
def generate(deps: Any, variant: str, config: PlaylistConfig) -> List[Track]:
service = get_service(deps)
rows = service.get_hidden_gems(limit=config.limit)
return coerce_tracks(rows)
SPEC = PlaylistKindSpec(
kind=KIND,
name_template='Hidden Gems',
description='Low-popularity discovery picks — underground / indie tracks you probably haven\'t heard.',
default_config=PlaylistConfig(limit=50, max_per_album=2, max_per_artist=3),
generator=generate,
requires_variant=False,
tags=['discovery'],
)
# Register at import time so the manager auto-discovers this kind.
# Re-import (e.g. test reloads) is tolerated: only register if absent.
if get_registry().get(KIND) is None:
get_registry().register(SPEC)

@ -0,0 +1,33 @@
"""Popular Picks generator — high-popularity discovery pool picks."""
from __future__ import annotations
from typing import Any, List
from core.personalized.generators._common import coerce_tracks, get_service
from core.personalized.specs import PlaylistKindSpec, get_registry
from core.personalized.types import PlaylistConfig, Track
KIND = 'popular_picks'
def generate(deps: Any, variant: str, config: PlaylistConfig) -> List[Track]:
service = get_service(deps)
rows = service.get_popular_picks(limit=config.limit)
return coerce_tracks(rows)
SPEC = PlaylistKindSpec(
kind=KIND,
name_template='Popular Picks',
description='High-popularity tracks from the discovery pool — what most people are listening to.',
default_config=PlaylistConfig(limit=50, max_per_album=2, max_per_artist=3),
generator=generate,
requires_variant=False,
tags=['discovery'],
)
if get_registry().get(KIND) is None:
get_registry().register(SPEC)

@ -0,0 +1,137 @@
"""Seasonal Mix generator (variant = season key).
Variant = season key from ``SEASONAL_CONFIG`` (``'halloween'`` /
``'christmas'`` / ``'valentines'`` / ``'summer'`` / ``'spring'`` /
``'autumn'``). One playlist per season user picks which seasons
to enable; idle seasons can stay un-refreshed until their active
period.
Reads curated track IDs from ``curated_seasonal_playlists`` (via
``SeasonalDiscoveryService.get_curated_seasonal_playlist``) and
hydrates them against ``seasonal_tracks`` (which carries full
metadata including ``track_data_json`` for sync-ready downstream
use)."""
from __future__ import annotations
import json
from typing import Any, List
from core.personalized.specs import PlaylistKindSpec, get_registry
from core.personalized.types import PlaylistConfig, Track
KIND = 'seasonal_mix'
def _resolve_seasonal_service(deps: Any):
"""Pull the SeasonalDiscoveryService instance from deps."""
svc = getattr(deps, 'seasonal_service', None) or (
deps.get('seasonal_service') if isinstance(deps, dict) else None
)
if svc is None:
raise RuntimeError(
"Seasonal mix generator deps missing `seasonal_service` "
"(SeasonalDiscoveryService instance)."
)
return svc
def _resolve_database(deps: Any):
db = getattr(deps, 'database', None) or (
deps.get('database') if isinstance(deps, dict) else None
)
if db is None:
raise RuntimeError("Seasonal mix generator deps missing `database`")
return db
def _resolve_active_source(deps: Any) -> str:
fn = getattr(deps, 'get_active_discovery_source', None) or (
deps.get('get_active_discovery_source') if isinstance(deps, dict) else None
)
return fn() if callable(fn) else 'spotify'
def _hydrate_seasonal_tracks(db, season_key: str, source: str, track_ids: List[str]) -> List[Track]:
"""Look up the seasonal_tracks rows for the given IDs."""
if not track_ids:
return []
placeholders = ','.join('?' * len(track_ids))
with db._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
f"""
SELECT spotify_track_id, track_name, artist_name, album_name,
album_cover_url, duration_ms, popularity, track_data_json
FROM seasonal_tracks
WHERE season_key = ? AND source = ?
AND spotify_track_id IN ({placeholders})
""",
(season_key, source, *track_ids),
)
rows = cursor.fetchall()
by_id = {}
for r in rows:
if hasattr(r, 'keys'):
r = dict(r)
else:
r = dict(zip(
('spotify_track_id', 'track_name', 'artist_name', 'album_name',
'album_cover_url', 'duration_ms', 'popularity', 'track_data_json'),
r,
strict=False,
))
td = r.get('track_data_json')
if isinstance(td, str):
try:
td = json.loads(td)
except (ValueError, TypeError):
td = None
r['track_data_json'] = td
r['source'] = source
by_id[r['spotify_track_id']] = r
# Preserve curated order.
return [
Track.from_dict(by_id[tid])
for tid in track_ids
if tid in by_id
]
def generate(deps: Any, variant: str, config: PlaylistConfig) -> List[Track]:
if not variant:
raise ValueError('Seasonal Mix requires a season variant')
seasonal_service = _resolve_seasonal_service(deps)
db = _resolve_database(deps)
source = _resolve_active_source(deps)
track_ids = seasonal_service.get_curated_seasonal_playlist(variant, source=source) or []
tracks = _hydrate_seasonal_tracks(db, variant, source, track_ids)
return tracks[:config.limit]
def variant_resolver(deps: Any) -> List[str]:
"""Return every season key from SEASONAL_CONFIG."""
try:
from core.seasonal_discovery import SEASONAL_CONFIG
except Exception:
return []
return list(SEASONAL_CONFIG.keys())
SPEC = PlaylistKindSpec(
kind=KIND,
name_template='Seasonal — {variant}',
description='Holiday / season-themed picks. One playlist per season; user enables which to track.',
default_config=PlaylistConfig(limit=50, max_per_album=2, max_per_artist=3),
generator=generate,
variant_resolver=variant_resolver,
requires_variant=True,
tags=['curated', 'seasonal'],
)
if get_registry().get(KIND) is None:
get_registry().register(SPEC)

@ -0,0 +1,66 @@
"""Time Machine generator — by-decade discovery picks.
Variant = decade label like ``'1980s'`` / ``'1990s'`` / ``'2000s'``.
The variant resolver returns the standard decade set; users see one
playlist per decade (each independently configurable / refreshable).
"""
from __future__ import annotations
from typing import Any, List
from core.personalized.generators._common import coerce_tracks, get_service
from core.personalized.specs import PlaylistKindSpec, get_registry
from core.personalized.types import PlaylistConfig, Track
KIND = 'time_machine'
# Standard decades the UI exposes. Adjust here when adding eras.
_DEFAULT_DECADES = ('1960s', '1970s', '1980s', '1990s', '2000s', '2010s', '2020s')
def _decade_to_year(variant: str) -> int:
"""'1980s' -> 1980. Tolerates ' 1980 ', '1980'.
Raises ValueError for anything that doesn't look like a decade
label so the manager surfaces a clear error instead of generating
garbage."""
cleaned = (variant or '').strip().rstrip('sS')
try:
year = int(cleaned)
except ValueError as exc:
raise ValueError(f"Time Machine variant {variant!r} not a decade label") from exc
if year < 1900 or year > 2100:
raise ValueError(f"Time Machine variant {variant!r} out of range")
return year
def generate(deps: Any, variant: str, config: PlaylistConfig) -> List[Track]:
service = get_service(deps)
decade_year = _decade_to_year(variant)
rows = service.get_decade_playlist(decade=decade_year, limit=config.limit)
return coerce_tracks(rows)
def variant_resolver(deps: Any) -> List[str]:
"""Return the standard decade set. Future enhancement: filter to
decades that actually have data in the discovery pool."""
return list(_DEFAULT_DECADES)
SPEC = PlaylistKindSpec(
kind=KIND,
name_template='Time Machine — {variant}',
description='Tracks from a specific decade. One playlist per decade.',
default_config=PlaylistConfig(limit=100, max_per_album=3, max_per_artist=5),
generator=generate,
variant_resolver=variant_resolver,
requires_variant=True,
tags=['time'],
)
if get_registry().get(KIND) is None:
get_registry().register(SPEC)

@ -0,0 +1,444 @@
"""Storage layer + lifecycle for personalized playlists.
The manager is the ONLY place that touches the
``personalized_playlists`` / ``personalized_playlist_tracks`` /
``personalized_track_history`` tables. Generators (in
``core/personalized/generators/``) produce track lists; the manager
persists, refreshes, and serves them.
Key invariants:
- ``(profile_id, kind, variant)`` uniquely identifies a playlist.
Variant '' (empty string) means singleton used for kinds like
``hidden_gems`` that don't have multiple instances per profile.
- A playlist row is auto-created on first access (``ensure_playlist``)
using the kind's default config.
- Track lists are atomically replaced on refresh never partial-
mutated. Either the new snapshot lands fully or the old one
remains.
- Refresh failures get logged on the row
(``last_generation_error``) so the UI can surface them without
losing the previous good snapshot.
- Staleness history is append-only and queried by the
``exclude_recent_days`` config option (handled by individual
generators when they want to honor it).
"""
from __future__ import annotations
import json
import time
from dataclasses import asdict
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from utils.logging_config import get_logger
from core.personalized.specs import PlaylistKindRegistry, get_registry
from core.personalized.types import PlaylistConfig, PlaylistRecord, Track
logger = get_logger("personalized.manager")
class PersonalizedPlaylistManager:
"""Owns persistence + refresh lifecycle for personalized playlists."""
def __init__(self, database, deps: Any, registry: Optional[PlaylistKindRegistry] = None):
"""
Args:
database: MusicDatabase singleton (exposes ``_get_connection``).
deps: Opaque object passed through to each generator
callable. Whatever a generator needs (the legacy
``PersonalizedPlaylistsService`` instance, the
``config_manager``, a metadata client) goes in here.
Manager doesn't inspect it.
registry: optional PlaylistKindRegistry override (for tests).
"""
self.database = database
self.deps = deps
self.registry = registry or get_registry()
# ─── playlist row lifecycle ──────────────────────────────────────
def ensure_playlist(self, kind: str, variant: str = '', profile_id: int = 1) -> PlaylistRecord:
"""Return the playlist row for ``(profile, kind, variant)``,
creating it from the kind's default config if it doesn't exist."""
spec = self.registry.get(kind)
if spec is None:
raise ValueError(f"Unknown playlist kind: {kind!r}")
if spec.requires_variant and not variant:
raise ValueError(f"Kind {kind!r} requires a variant")
existing = self._fetch_playlist_row(kind, variant, profile_id)
if existing is not None:
return existing
# Insert new row using the kind's defaults.
config = spec.default_config
name = spec.display_name(variant)
with self.database._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
INSERT INTO personalized_playlists
(profile_id, kind, variant, name, config_json,
track_count, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, 0, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
""",
(profile_id, kind, variant, name, json.dumps(config.to_json_dict())),
)
conn.commit()
return self._fetch_playlist_row(kind, variant, profile_id) # type: ignore[return-value]
def get_playlist(self, kind: str, variant: str = '', profile_id: int = 1) -> Optional[PlaylistRecord]:
"""Return the playlist row if it exists. Does NOT auto-create."""
return self._fetch_playlist_row(kind, variant, profile_id)
def list_playlists(self, profile_id: int = 1) -> List[PlaylistRecord]:
"""List every persisted playlist for a profile, newest-first."""
with self.database._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
SELECT id, profile_id, kind, variant, name, config_json,
track_count, last_generated_at, last_synced_at,
last_generation_source, last_generation_error
FROM personalized_playlists
WHERE profile_id = ?
ORDER BY COALESCE(last_generated_at, created_at) DESC
""",
(profile_id,),
)
rows = cursor.fetchall()
return [self._row_to_record(r) for r in rows]
def update_config(self, kind: str, variant: str, profile_id: int, overrides: Dict[str, Any]) -> PlaylistRecord:
"""Patch the per-playlist config with the provided overrides."""
record = self.ensure_playlist(kind, variant, profile_id)
merged = record.config.merged(overrides)
with self.database._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
UPDATE personalized_playlists
SET config_json = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(json.dumps(merged.to_json_dict()), record.id),
)
conn.commit()
return self._fetch_playlist_row(kind, variant, profile_id) # type: ignore[return-value]
# ─── refresh / generation ─────────────────────────────────────────
def refresh_playlist(
self,
kind: str,
variant: str = '',
profile_id: int = 1,
config_overrides: Optional[Dict[str, Any]] = None,
) -> PlaylistRecord:
"""Run the kind's generator and persist the result as the
playlist's current snapshot.
Atomic: track list is replaced in a single transaction. On
generator exception, the previous snapshot is preserved and
the error is recorded on the row.
Args:
kind: registered kind identifier.
variant: e.g. '1980s' for time machine, '' for singletons.
profile_id: profile to refresh under.
config_overrides: optional per-call config tweaks merged on
top of the stored config (e.g. UI lets the user "preview
with limit=100" without persisting that change).
Returns:
Updated PlaylistRecord with fresh ``track_count`` /
``last_generated_at`` (or ``last_generation_error`` on
failure).
"""
spec = self.registry.get(kind)
if spec is None:
raise ValueError(f"Unknown playlist kind: {kind!r}")
record = self.ensure_playlist(kind, variant, profile_id)
config = record.config
if config_overrides:
config = config.merged(config_overrides)
try:
tracks = spec.generator(self.deps, variant, config)
except Exception as exc: # noqa: BLE001 — record + re-raise after persisting
logger.exception("Generator failed for kind=%s variant=%s: %s", kind, variant, exc)
self._record_generation_failure(record.id, str(exc))
return self._fetch_playlist_row(kind, variant, profile_id) # type: ignore[return-value]
# Quality post-filters — applied uniformly to every kind so
# generators stay focused on selection logic, not staleness
# bookkeeping. Filters are config-driven; defaults preserve
# the pre-overhaul behavior (no filtering).
tracks = self._apply_quality_filters(tracks, kind, profile_id, config)
return self._persist_snapshot(record.id, kind, profile_id, tracks)
def _apply_quality_filters(
self,
tracks: List[Track],
kind: str,
profile_id: int,
config: PlaylistConfig,
) -> List[Track]:
"""Apply manager-level quality filters to a generator's output.
Currently:
- **Staleness window** (`config.exclude_recent_days > 0`): drops
any track whose primary id was served by this `kind` for this
`profile_id` in the last N days. Prevents the same track
from showing up across consecutive refreshes e.g. a daily
Discovery Shuffle that shouldn't replay yesterday's picks.
Tracks without a primary id pass through unchanged (nothing
to dedupe on).
Returns a new list (never mutates input). When no filter
applies, returns ``tracks`` unchanged."""
if config.exclude_recent_days <= 0 or not tracks:
return tracks
recent_set = set(self.recent_track_ids(profile_id, kind, config.exclude_recent_days))
if not recent_set:
return tracks
return [t for t in tracks if not t.primary_id() or t.primary_id() not in recent_set]
# ─── track read ──────────────────────────────────────────────────
def get_playlist_tracks(self, playlist_id: int) -> List[Track]:
"""Return the persisted track list for a playlist row."""
with self.database._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
SELECT spotify_track_id, itunes_track_id, deezer_track_id,
track_name, artist_name, album_name, album_cover_url,
duration_ms, popularity, track_data_json
FROM personalized_playlist_tracks
WHERE playlist_id = ?
ORDER BY position ASC
""",
(playlist_id,),
)
rows = cursor.fetchall()
return [self._row_to_track(r) for r in rows]
# ─── staleness history ───────────────────────────────────────────
def recent_track_ids(self, profile_id: int, kind: str, days: int) -> List[str]:
"""Return track IDs served by ``kind`` for ``profile_id`` in
the last ``days`` days. Generators query this when honoring
the ``exclude_recent_days`` config knob."""
if days <= 0:
return []
with self.database._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
SELECT DISTINCT track_id
FROM personalized_track_history
WHERE profile_id = ?
AND kind = ?
AND served_at >= datetime('now', ?)
""",
(profile_id, kind, f'-{int(days)} days'),
)
return [r[0] for r in cursor.fetchall() if r[0]]
# ─── internal helpers ────────────────────────────────────────────
def _persist_snapshot(self, playlist_id: int, kind: str, profile_id: int, tracks: List[Track]) -> PlaylistRecord:
"""Atomic replace of a playlist's track list + history append."""
now = datetime.now(timezone.utc).isoformat(timespec='seconds')
primary_source = next(
(t.source for t in tracks if t.source), None,
)
with self.database._get_connection() as conn:
cursor = conn.cursor()
try:
cursor.execute("BEGIN")
cursor.execute(
"DELETE FROM personalized_playlist_tracks WHERE playlist_id = ?",
(playlist_id,),
)
for position, track in enumerate(tracks):
td = track.track_data_json
if td is not None and not isinstance(td, str):
td = json.dumps(td)
cursor.execute(
"""
INSERT INTO personalized_playlist_tracks
(playlist_id, position,
spotify_track_id, itunes_track_id, deezer_track_id,
track_name, artist_name, album_name, album_cover_url,
duration_ms, popularity, track_data_json)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
playlist_id, position,
track.spotify_track_id, track.itunes_track_id, track.deezer_track_id,
track.track_name, track.artist_name, track.album_name, track.album_cover_url,
int(track.duration_ms or 0), int(track.popularity or 0), td,
),
)
cursor.execute(
"""
UPDATE personalized_playlists
SET track_count = ?, last_generated_at = CURRENT_TIMESTAMP,
last_generation_source = ?, last_generation_error = NULL,
updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(len(tracks), primary_source, playlist_id),
)
# History append — only for tracks with a primary ID,
# used by exclude_recent_days filter on next refresh.
for track in tracks:
tid = track.primary_id()
if not tid:
continue
cursor.execute(
"""
INSERT INTO personalized_track_history
(profile_id, kind, track_id, served_at)
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
""",
(profile_id, kind, tid),
)
conn.commit()
except Exception:
conn.rollback()
raise
# Re-read the row so the returned record carries the fresh
# last_generated_at timestamp.
record = self._fetch_playlist_row_by_id(playlist_id)
if record is None:
raise RuntimeError(f"playlist row {playlist_id} disappeared mid-refresh")
return record
def _record_generation_failure(self, playlist_id: int, error_message: str) -> None:
"""Stamp the error on the row WITHOUT touching tracks."""
with self.database._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
UPDATE personalized_playlists
SET last_generation_error = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(error_message[:500], playlist_id),
)
conn.commit()
def _fetch_playlist_row(self, kind: str, variant: str, profile_id: int) -> Optional[PlaylistRecord]:
with self.database._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
SELECT id, profile_id, kind, variant, name, config_json,
track_count, last_generated_at, last_synced_at,
last_generation_source, last_generation_error
FROM personalized_playlists
WHERE profile_id = ? AND kind = ? AND variant = ?
""",
(profile_id, kind, variant),
)
row = cursor.fetchone()
return self._row_to_record(row) if row else None
def _fetch_playlist_row_by_id(self, playlist_id: int) -> Optional[PlaylistRecord]:
with self.database._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
SELECT id, profile_id, kind, variant, name, config_json,
track_count, last_generated_at, last_synced_at,
last_generation_source, last_generation_error
FROM personalized_playlists
WHERE id = ?
""",
(playlist_id,),
)
row = cursor.fetchone()
return self._row_to_record(row) if row else None
@staticmethod
def _row_to_record(row: Any) -> PlaylistRecord:
# Tolerate sqlite3.Row + plain tuples.
if hasattr(row, 'keys'):
row = dict(row)
return PlaylistRecord(
id=row['id'], profile_id=row['profile_id'],
kind=row['kind'], variant=row['variant'] or '',
name=row['name'],
config=PlaylistConfig.from_json_dict(_safe_json_loads(row['config_json'])),
track_count=row['track_count'] or 0,
last_generated_at=row.get('last_generated_at'),
last_synced_at=row.get('last_synced_at'),
last_generation_source=row.get('last_generation_source'),
last_generation_error=row.get('last_generation_error'),
)
# Tuple form: positional access matches SELECT order above.
return PlaylistRecord(
id=row[0], profile_id=row[1],
kind=row[2], variant=row[3] or '',
name=row[4],
config=PlaylistConfig.from_json_dict(_safe_json_loads(row[5])),
track_count=row[6] or 0,
last_generated_at=row[7],
last_synced_at=row[8],
last_generation_source=row[9],
last_generation_error=row[10],
)
@staticmethod
def _row_to_track(row: Any) -> Track:
if hasattr(row, 'keys'):
row = dict(row)
return Track(
spotify_track_id=row.get('spotify_track_id'),
itunes_track_id=row.get('itunes_track_id'),
deezer_track_id=row.get('deezer_track_id'),
track_name=row.get('track_name', ''),
artist_name=row.get('artist_name', ''),
album_name=row.get('album_name') or '',
album_cover_url=row.get('album_cover_url'),
duration_ms=int(row.get('duration_ms') or 0),
popularity=int(row.get('popularity') or 0),
track_data_json=_safe_json_loads(row.get('track_data_json')),
)
return Track(
spotify_track_id=row[0], itunes_track_id=row[1], deezer_track_id=row[2],
track_name=row[3], artist_name=row[4], album_name=row[5] or '',
album_cover_url=row[6], duration_ms=int(row[7] or 0),
popularity=int(row[8] or 0),
track_data_json=_safe_json_loads(row[9]),
)
def _safe_json_loads(value: Any) -> Any:
"""Tolerant JSON parse — returns None / dict / passes through
non-string values. Avoids exceptions on bad rows so the manager
never breaks on a single corrupt record."""
if value is None:
return None
if not isinstance(value, str):
return value
if not value.strip():
return None
try:
return json.loads(value)
except (ValueError, TypeError):
return None
__all__ = ['PersonalizedPlaylistManager']

@ -0,0 +1,121 @@
"""Per-kind specifications for the personalized-playlist subsystem.
A ``PlaylistKindSpec`` declares everything the manager needs to know
about one playlist type:
- The kind identifier (stable string used in URLs / configs / DB).
- Human-readable display name template (with optional ``{variant}``
substitution).
- Whether the kind supports / requires variants and what valid
variants look like.
- The default user-tweakable config for this kind.
- A generator callable that produces a fresh track list given
``(deps, variant, config)``.
Generators live in ``core/personalized/generators/`` (added in
later commits as each kind is migrated). For commit 1 the registry
ships empty schema + manager land first; generators arrive
incrementally with their per-kind tests.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
from core.personalized.types import PlaylistConfig, Track
# Generator callable signature.
# Args:
# deps: opaque object the generator may need (DB, service handle,
# config_manager). Each generator declares what it pulls.
# variant: e.g. '1980s' for time machine, 'halloween' for seasonal.
# Empty string '' for singleton kinds.
# config: PlaylistConfig with the user's per-playlist overrides
# merged onto the kind's defaults.
# Returns:
# List[Track] — the fresh snapshot to persist as the playlist's
# current track list.
GeneratorFn = Callable[[Any, str, PlaylistConfig], List[Track]]
# Variant resolver: returns the list of currently-valid variant
# identifiers for a kind that supports multiple instances. Used by
# the manager when auto-creating playlist rows for newly available
# variants (e.g. a new decade, a new season). Singletons return [''].
VariantResolver = Callable[[Any], List[str]]
@dataclass
class PlaylistKindSpec:
"""Declaration of one playlist kind.
See module docstring for the contract.
"""
kind: str
name_template: str # e.g. 'Time Machine — {variant}', 'Hidden Gems'
description: str
default_config: PlaylistConfig
generator: GeneratorFn
variant_resolver: Optional[VariantResolver] = None
requires_variant: bool = False
# Tags for UI grouping ('curated' / 'discovery' / 'time' / 'genre').
tags: List[str] = field(default_factory=list)
def display_name(self, variant: str) -> str:
"""Render the human-readable playlist name for a given variant."""
if not variant:
return self.name_template.replace('{variant}', '').strip(' —-')
return self.name_template.format(variant=variant)
class PlaylistKindRegistry:
"""Module-level registry of every kind the manager knows about.
Populated at import time as each generator module is loaded. The
manager queries the registry at runtime to dispatch refresh
requests, list available kinds for the UI, and resolve variants.
"""
def __init__(self) -> None:
self._kinds: Dict[str, PlaylistKindSpec] = {}
def register(self, spec: PlaylistKindSpec) -> None:
if spec.kind in self._kinds:
raise ValueError(f"Kind {spec.kind!r} already registered")
self._kinds[spec.kind] = spec
def get(self, kind: str) -> Optional[PlaylistKindSpec]:
return self._kinds.get(kind)
def all(self) -> List[PlaylistKindSpec]:
return list(self._kinds.values())
def kinds(self) -> List[str]:
return list(self._kinds.keys())
def reset_for_tests(self) -> None:
"""Drop every registration. Tests only — production runs
register at module import and never reset."""
self._kinds.clear()
# Module-level singleton. Generators register against this on import.
_registry = PlaylistKindRegistry()
def get_registry() -> PlaylistKindRegistry:
"""Public accessor for the module-level registry. Tests can reset
via ``get_registry().reset_for_tests()``."""
return _registry
__all__ = [
'PlaylistKindSpec',
'PlaylistKindRegistry',
'GeneratorFn',
'VariantResolver',
'get_registry',
]

@ -0,0 +1,174 @@
"""Shared dataclasses for the personalized-playlist subsystem.
These are pure data containers no business logic, no IO. The
manager + specs + generators all speak in these types so the seam
between them stays mechanical.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
@dataclass
class Track:
"""A track in a personalized playlist.
Mirrors the shape returned by
``PersonalizedPlaylistsService._build_track_dict`` so the legacy
generators can be wrapped without translating fields. Always at
least one of the source IDs is populated; ``track_data_json`` is
the full enriched track object when available (used by sync /
download paths that need richer metadata than just the ID)."""
track_name: str
artist_name: str
album_name: str = ''
spotify_track_id: Optional[str] = None
itunes_track_id: Optional[str] = None
deezer_track_id: Optional[str] = None
album_cover_url: Optional[str] = None
duration_ms: int = 0
popularity: int = 0
track_data_json: Optional[Any] = None # dict OR JSON string OR None
source: Optional[str] = None
@classmethod
def from_dict(cls, d: Dict[str, Any]) -> 'Track':
"""Coerce a legacy generator's output dict into a Track.
Tolerates the ``_artist_genres_raw`` / ``_release_date`` extra-
column passthroughs by ignoring them this dataclass only
carries the storage-layer fields."""
return cls(
track_name=d.get('track_name', 'Unknown'),
artist_name=d.get('artist_name', 'Unknown'),
album_name=d.get('album_name', '') or '',
spotify_track_id=d.get('spotify_track_id'),
itunes_track_id=d.get('itunes_track_id'),
deezer_track_id=d.get('deezer_track_id'),
album_cover_url=d.get('album_cover_url'),
duration_ms=int(d.get('duration_ms') or 0),
popularity=int(d.get('popularity') or 0),
track_data_json=d.get('track_data_json'),
source=d.get('source'),
)
def primary_id(self) -> Optional[str]:
"""Return the first non-empty source ID. Used as the staleness-
history key + the dedupe key when persisting tracks."""
return (
self.spotify_track_id
or self.itunes_track_id
or self.deezer_track_id
or None
)
@dataclass
class PlaylistConfig:
"""User-tweakable knobs per playlist instance.
Stored as JSON in `personalized_playlists.config_json`. Defaults
come from the kind's spec; user overrides override per-playlist.
Fields:
- limit: target number of tracks
- max_per_album / max_per_artist: diversity caps
- popularity_min / popularity_max: filter bounds (None = ignore)
- exclude_recent_days: avoid tracks served by this kind in the
last N days (0 = no exclusion)
- recency_days: only include tracks released in the last N days
(None = all-time)
- seed: optional deterministic seed for randomization (None =
use system random; same seed + same pool = same output)
- extra: free-form per-kind extension dict (e.g. seasonal mix
stores ``selected_seasons``, time machine stores
``selected_decades``, genre stores ``selected_genres``).
"""
limit: int = 50
max_per_album: int = 2
max_per_artist: int = 3
popularity_min: Optional[int] = None
popularity_max: Optional[int] = None
exclude_recent_days: int = 0
recency_days: Optional[int] = None
seed: Optional[int] = None
extra: Dict[str, Any] = field(default_factory=dict)
def to_json_dict(self) -> Dict[str, Any]:
"""Serialise to a JSON-safe dict for storage."""
return {
'limit': self.limit,
'max_per_album': self.max_per_album,
'max_per_artist': self.max_per_artist,
'popularity_min': self.popularity_min,
'popularity_max': self.popularity_max,
'exclude_recent_days': self.exclude_recent_days,
'recency_days': self.recency_days,
'seed': self.seed,
'extra': dict(self.extra),
}
@classmethod
def from_json_dict(cls, d: Optional[Dict[str, Any]]) -> 'PlaylistConfig':
"""Reconstruct from a stored JSON dict. Missing fields fall
back to defaults so old rows + new code stay compatible."""
if not isinstance(d, dict):
return cls()
return cls(
limit=int(d.get('limit', 50)),
max_per_album=int(d.get('max_per_album', 2)),
max_per_artist=int(d.get('max_per_artist', 3)),
popularity_min=d.get('popularity_min'),
popularity_max=d.get('popularity_max'),
exclude_recent_days=int(d.get('exclude_recent_days', 0)),
recency_days=d.get('recency_days'),
seed=d.get('seed'),
extra=dict(d.get('extra') or {}),
)
def merged(self, overrides: Dict[str, Any]) -> 'PlaylistConfig':
"""Return a new PlaylistConfig with `overrides` merged in.
Used when a user PATCHes their per-playlist config apply
only the fields they sent, leave the rest at their stored
values."""
base = self.to_json_dict()
for key, value in (overrides or {}).items():
if key == 'extra' and isinstance(value, dict):
base['extra'] = {**base.get('extra', {}), **value}
elif key in base:
base[key] = value
return PlaylistConfig.from_json_dict(base)
@dataclass
class PlaylistRecord:
"""One row of `personalized_playlists` plus its track count.
The live track list is fetched separately via
``PersonalizedPlaylistManager.get_playlist_tracks(playlist_id)``
so list / detail responses can stay cheap when the caller only
needs metadata."""
id: int
profile_id: int
kind: str
variant: str
name: str
config: PlaylistConfig
track_count: int
last_generated_at: Optional[str]
last_synced_at: Optional[str]
last_generation_source: Optional[str]
last_generation_error: Optional[str]
__all__ = [
'Track',
'PlaylistConfig',
'PlaylistRecord',
]

@ -779,19 +779,21 @@ class PersonalizedPlaylistsService:
}
def _get_library_tracks_by_category(self, category: str, limit: int) -> List[Dict]:
"""Library tracks intentionally excluded from daily mixes.
The legacy ambition was 50% library + 50% discovery, but the
``tracks`` table doesn't carry source IDs (no
``spotify_track_id`` / ``itunes_track_id`` / ``deezer_track_id``
column) so library rows can't flow through the same
sync / wishlist pipeline that discovery tracks do. Returning
them here would produce un-syncable, un-downloadable phantom
entries.
Returns ``[]`` so callers compose with ``_get_discovery_tracks_by_category``
for a discovery-only mix. A future PR can backfill source IDs
into library rows and lift this restriction.
"""
Get tracks from library matching genre or artist
NOTE: This requires library tracks to have Spotify metadata which may not be available.
Returns empty list if schema incompatible.
"""
try:
logger.warning("Library tracks by category requires Spotify-linked library - returning empty")
return []
except Exception as e:
logger.error(f"Error getting library tracks by category: {e}")
return []
return []
def _get_discovery_tracks_by_category(self, category: str, limit: int) -> List[Dict]:
"""Get tracks from discovery pool matching genre or artist"""

@ -752,13 +752,21 @@ class MusicDatabase:
)
""")
# Personalized-playlists subsystem schema (Group A + Group B
# unified storage). Idempotent — safe on every startup.
try:
from database.personalized_schema import ensure_personalized_schema
ensure_personalized_schema(conn)
except Exception as ps_err:
logger.error(f"Personalized-playlist schema init failed: {ps_err}")
conn.commit()
logger.info("Database initialized successfully")
except Exception as e:
logger.error(f"Error initializing database: {e}")
raise
def _add_mirrored_playlist_explored_column(self, cursor):
"""Add explored_at column to mirrored_playlists to persist explore badge."""
try:

@ -0,0 +1,139 @@
"""Schema + migration helpers for the personalized-playlists subsystem.
Pre-existing state (this PR replaces over time):
- Group A (Fresh Tape / The Archives / Seasonal Mix) lives in
`discovery_curated_playlists` (track_ids only) and
`curated_seasonal_playlists` (track_ids + seasonal_tracks join).
Read paths exist; refresh paths are tied to specific workers.
- Group B (Hidden Gems / Discovery Shuffle / Time Machine / Popular
Picks / Genre / Daily Mixes) is computed on-demand by
`PersonalizedPlaylistsService` no persistence, every call reruns
the generator with `ORDER BY RANDOM()` so the result rotates.
Post-overhaul (this module's responsibility):
- ALL personalized playlists land in a unified storage layer with a
stable (profile_id, kind, variant) identity, JSON config per
playlist (limit, diversity caps, popularity / recency filters,
exclude-recent-days, randomization seed), and a persistent track
list that only mutates when the playlist is explicitly refreshed.
Tables created here:
- ``personalized_playlists`` one row per (profile, kind, variant).
Variants disambiguate kinds with multiple instances:
* ``time_machine``: variant = ``'1980s'`` / ``'1990s'`` / ...
* ``seasonal_mix``: variant = ``'halloween'`` / ``'christmas'`` / ...
* ``genre_playlist``: variant = ``'rock'`` / ``'electronic_dance'`` / ...
* ``daily_mix``: variant = ``'1'`` / ``'2'`` / ``'3'`` / ``'4'``
* Singletons (``hidden_gems``, ``discovery_shuffle``,
``popular_picks``, ``fresh_tape``, ``archives``): variant = ``''``.
Variant '' (empty) is used instead of NULL so the UNIQUE
constraint behaves predictably (NULL doesn't collide with NULL in
SQLite UNIQUE indexes would let multiple singleton rows
coexist).
- ``personalized_playlist_tracks`` current snapshot per playlist.
Cleared + repopulated on refresh; never partial-mutates.
- ``personalized_track_history`` append-only log of which tracks
were served by which (profile, kind) over time. Powers the
``exclude_recent_days`` config option so generators can avoid
recommending the same track twice in N days.
The schema is created idempotently `ensure_personalized_schema`
runs CREATE TABLE IF NOT EXISTS at startup, so existing installs
upgrade silently."""
from __future__ import annotations
from typing import Any
from utils.logging_config import get_logger
logger = get_logger("database.personalized_schema")
PERSONALIZED_PLAYLISTS_DDL = """
CREATE TABLE IF NOT EXISTS personalized_playlists (
id INTEGER PRIMARY KEY AUTOINCREMENT,
profile_id INTEGER NOT NULL DEFAULT 1,
kind TEXT NOT NULL,
variant TEXT NOT NULL DEFAULT '',
name TEXT NOT NULL,
config_json TEXT NOT NULL DEFAULT '{}',
track_count INTEGER NOT NULL DEFAULT 0,
last_generated_at TIMESTAMP,
last_synced_at TIMESTAMP,
last_generation_source TEXT,
last_generation_error TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE (profile_id, kind, variant)
)
"""
PERSONALIZED_PLAYLIST_TRACKS_DDL = """
CREATE TABLE IF NOT EXISTS personalized_playlist_tracks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
playlist_id INTEGER NOT NULL,
position INTEGER NOT NULL,
spotify_track_id TEXT,
itunes_track_id TEXT,
deezer_track_id TEXT,
track_name TEXT,
artist_name TEXT,
album_name TEXT,
album_cover_url TEXT,
duration_ms INTEGER,
popularity INTEGER,
track_data_json TEXT,
FOREIGN KEY (playlist_id) REFERENCES personalized_playlists(id) ON DELETE CASCADE,
UNIQUE (playlist_id, position)
)
"""
PERSONALIZED_PLAYLIST_TRACKS_INDEX = """
CREATE INDEX IF NOT EXISTS idx_personalized_tracks_playlist
ON personalized_playlist_tracks(playlist_id)
"""
PERSONALIZED_TRACK_HISTORY_DDL = """
CREATE TABLE IF NOT EXISTS personalized_track_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
profile_id INTEGER NOT NULL DEFAULT 1,
kind TEXT NOT NULL,
track_id TEXT NOT NULL,
served_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
PERSONALIZED_TRACK_HISTORY_INDEX = """
CREATE INDEX IF NOT EXISTS idx_personalized_history_lookup
ON personalized_track_history(profile_id, kind, track_id, served_at)
"""
def ensure_personalized_schema(connection: Any) -> None:
"""Create the personalized-playlist tables + indexes if missing.
Idempotent. Safe to call on every app startup. Caller is responsible
for committing the connection (we leave that to the caller so this
composes with other schema-init steps in one transaction).
"""
cursor = connection.cursor()
cursor.execute(PERSONALIZED_PLAYLISTS_DDL)
cursor.execute(PERSONALIZED_PLAYLIST_TRACKS_DDL)
cursor.execute(PERSONALIZED_PLAYLIST_TRACKS_INDEX)
cursor.execute(PERSONALIZED_TRACK_HISTORY_DDL)
cursor.execute(PERSONALIZED_TRACK_HISTORY_INDEX)
logger.debug("Personalized-playlist schema ensured")
__all__ = [
'ensure_personalized_schema',
'PERSONALIZED_PLAYLISTS_DDL',
'PERSONALIZED_PLAYLIST_TRACKS_DDL',
'PERSONALIZED_PLAYLIST_TRACKS_INDEX',
'PERSONALIZED_TRACK_HISTORY_DDL',
'PERSONALIZED_TRACK_HISTORY_INDEX',
]

@ -0,0 +1,181 @@
"""Boundary tests for `core.personalized.api` handler functions.
These are pure-function dispatchers they take a manager + ids,
return a JSON-serializable dict. No Flask required, no real DB.
The Flask wiring in `web_server.py` adds `jsonify` + URL routing
on top.
"""
from __future__ import annotations
import sqlite3
from types import SimpleNamespace
from typing import Any, List
import pytest
from core.personalized import api as _api
from core.personalized.manager import PersonalizedPlaylistManager
from core.personalized.specs import PlaylistKindRegistry, PlaylistKindSpec
from core.personalized.types import PlaylistConfig, Track
from database.personalized_schema import ensure_personalized_schema
class _FakeDB:
def __init__(self, path):
self.path = path
def _get_connection(self):
c = sqlite3.connect(self.path)
c.row_factory = sqlite3.Row
return c
@pytest.fixture
def db(tmp_path):
p = str(tmp_path / 't.db')
conn = sqlite3.connect(p)
ensure_personalized_schema(conn)
conn.commit()
conn.close()
return _FakeDB(p)
@pytest.fixture
def registry():
return PlaylistKindRegistry()
@pytest.fixture
def manager(db, registry):
return PersonalizedPlaylistManager(db, deps=None, registry=registry)
def _register(reg, kind='hidden_gems', requires_variant=False, generator=None):
spec = PlaylistKindSpec(
kind=kind, name_template=kind.replace('_', ' ').title(),
description=f'Description for {kind}',
default_config=PlaylistConfig(limit=20),
generator=generator or (lambda *a, **k: []),
requires_variant=requires_variant,
tags=['test'],
)
reg.register(spec)
return spec
# ─── list_kinds ──────────────────────────────────────────────────────
class TestListKinds:
def test_lists_every_registered_kind(self, registry):
_register(registry, kind='hidden_gems')
_register(registry, kind='time_machine', requires_variant=True)
out = _api.list_kinds(registry)
assert out['success'] is True
kinds = {k['kind'] for k in out['kinds']}
assert kinds == {'hidden_gems', 'time_machine'}
def test_kind_metadata_shape(self, registry):
_register(registry, kind='hidden_gems')
out = _api.list_kinds(registry)
kind = out['kinds'][0]
assert kind['kind'] == 'hidden_gems'
assert kind['requires_variant'] is False
assert kind['tags'] == ['test']
assert kind['default_config']['limit'] == 20
def test_empty_registry(self):
out = _api.list_kinds(PlaylistKindRegistry())
assert out == {'success': True, 'kinds': []}
# ─── list_playlists ─────────────────────────────────────────────────
class TestListPlaylists:
def test_returns_empty_when_no_playlists(self, manager, registry):
_register(registry)
out = _api.list_playlists(manager, profile_id=1)
assert out == {'success': True, 'playlists': []}
def test_serializes_playlist_record(self, manager, registry):
_register(registry)
manager.ensure_playlist('hidden_gems', '', 1)
out = _api.list_playlists(manager, profile_id=1)
assert out['success'] is True
assert len(out['playlists']) == 1
pl = out['playlists'][0]
assert pl['kind'] == 'hidden_gems'
assert pl['variant'] == ''
assert pl['name'] == 'Hidden Gems'
assert pl['track_count'] == 0
assert pl['config']['limit'] == 20
# ─── get_playlist_with_tracks ───────────────────────────────────────
class TestGetPlaylistWithTracks:
def test_auto_creates_on_first_get(self, manager, registry):
_register(registry)
out = _api.get_playlist_with_tracks(manager, 'hidden_gems', '', 1)
assert out['success'] is True
assert out['playlist']['kind'] == 'hidden_gems'
assert out['tracks'] == []
def test_returns_persisted_tracks(self, manager, registry):
gen_calls = []
def gen(deps, variant, config):
gen_calls.append(1)
return [Track(track_name='X', artist_name='Y', spotify_track_id='sp-1')]
_register(registry, generator=gen)
manager.refresh_playlist('hidden_gems', '', 1)
out = _api.get_playlist_with_tracks(manager, 'hidden_gems', '', 1)
assert len(out['tracks']) == 1
assert out['tracks'][0]['track_name'] == 'X'
assert out['tracks'][0]['spotify_track_id'] == 'sp-1'
def test_unknown_kind_raises_value_error(self, manager):
with pytest.raises(ValueError):
_api.get_playlist_with_tracks(manager, 'nope', '', 1)
# ─── refresh_playlist ───────────────────────────────────────────────
class TestRefreshPlaylist:
def test_refresh_runs_generator_and_returns_tracks(self, manager, registry):
_register(registry, generator=lambda *a, **k: [
Track(track_name='T1', artist_name='A', spotify_track_id='1'),
Track(track_name='T2', artist_name='B', spotify_track_id='2'),
])
out = _api.refresh_playlist(manager, 'hidden_gems', '', 1)
assert out['success'] is True
assert len(out['tracks']) == 2
assert out['playlist']['track_count'] == 2
assert out['playlist']['last_generated_at'] is not None
def test_config_overrides_passed_through(self, manager, registry):
captured = {}
def gen(deps, variant, config):
captured['limit'] = config.limit
return []
_register(registry, generator=gen)
_api.refresh_playlist(manager, 'hidden_gems', '', 1, config_overrides={'limit': 99})
assert captured['limit'] == 99
# ─── update_config ──────────────────────────────────────────────────
class TestUpdateConfig:
def test_patches_config(self, manager, registry):
_register(registry)
out = _api.update_config(manager, 'hidden_gems', '', 1, {'limit': 75})
assert out['success'] is True
assert out['playlist']['config']['limit'] == 75

@ -0,0 +1,316 @@
"""Boundary tests for the curated / hybrid personalized generators
(`daily_mix`, `fresh_tape`, `archives`, `seasonal_mix`)."""
from __future__ import annotations
import sqlite3
from types import SimpleNamespace
from typing import Any, List
from unittest.mock import MagicMock
import pytest
from core.personalized.generators import archives as _arch_mod
from core.personalized.generators import daily_mix as _dm_mod
from core.personalized.generators import fresh_tape as _ft_mod
from core.personalized.generators import seasonal_mix as _sm_mod
from core.personalized.specs import get_registry
from core.personalized.types import PlaylistConfig
# ─── daily_mix ───────────────────────────────────────────────────────
class _DailyMixService:
"""Stub PersonalizedPlaylistsService for daily_mix tests."""
GENRE_MAPPING = {}
def __init__(self, top_genres=None, genre_tracks=None):
self._top = top_genres or []
self._tracks = genre_tracks or {}
self.calls: List[dict] = []
def get_top_genres_from_library(self, limit):
self.calls.append({'method': 'get_top_genres_from_library', 'limit': limit})
return self._top
def get_genre_playlist(self, genre, limit, **kw):
self.calls.append({'method': 'get_genre_playlist', 'genre': genre, 'limit': limit})
return self._tracks.get(genre, [])
class TestDailyMix:
def test_registered(self):
spec = get_registry().get('daily_mix')
assert spec is not None
assert spec.requires_variant is True
def test_variant_resolver_returns_ranks(self):
spec = get_registry().get('daily_mix')
ranks = spec.variant_resolver(SimpleNamespace(service=_DailyMixService()))
assert ranks == ['1', '2', '3', '4']
def test_resolves_rank_to_top_genre(self):
svc = _DailyMixService(
top_genres=[('Rock', 100), ('Pop', 80), ('Jazz', 30)],
genre_tracks={'Rock': [{'track_name': 'R', 'artist_name': 'A'}]},
)
out = _dm_mod.generate(SimpleNamespace(service=svc), '1', PlaylistConfig(limit=10))
assert len(out) == 1
assert out[0].track_name == 'R'
# Service called for top-genre lookup + genre playlist.
assert {c['method'] for c in svc.calls} == {
'get_top_genres_from_library', 'get_genre_playlist',
}
def test_rank_beyond_top_returns_empty(self):
svc = _DailyMixService(top_genres=[('Rock', 100)]) # only 1 top genre
out = _dm_mod.generate(SimpleNamespace(service=svc), '4', PlaylistConfig())
assert out == []
def test_invalid_variant_raises(self):
deps = SimpleNamespace(service=_DailyMixService())
with pytest.raises(ValueError, match='must be a rank int'):
_dm_mod.generate(deps, 'abc', PlaylistConfig())
# ─── fresh_tape / archives shared shape ─────────────────────────────
class _StubPoolTrack:
def __init__(self, sid, name='T', artist='A', source='spotify'):
self.spotify_track_id = sid
self.itunes_track_id = None
self.deezer_track_id = None
self.track_name = name
self.artist_name = artist
self.album_name = 'Album'
self.album_cover_url = None
self.duration_ms = 200000
self.popularity = 50
self.track_data_json = None
self.source = source
class _CuratedDB:
def __init__(self, curated_ids=None, pool_tracks=None):
self.curated_ids = curated_ids or []
self.pool_tracks = pool_tracks or []
self.requested_keys: List[str] = []
def get_curated_playlist(self, key, profile_id=1):
self.requested_keys.append(key)
return list(self.curated_ids)
def get_discovery_pool_tracks(self, **kwargs):
return list(self.pool_tracks)
def _curated_deps(db):
return SimpleNamespace(
database=db,
get_current_profile_id=lambda: 1,
get_active_discovery_source=lambda: 'spotify',
)
class TestFreshTape:
def test_registered(self):
spec = get_registry().get('fresh_tape')
assert spec is not None
assert spec.requires_variant is False
assert spec.display_name('') == 'Fresh Tape'
def test_returns_empty_when_no_curated_ids(self):
db = _CuratedDB(curated_ids=[])
out = _ft_mod.generate(_curated_deps(db), '', PlaylistConfig())
assert out == []
def test_hydrates_curated_ids_from_pool(self):
db = _CuratedDB(
curated_ids=['sp-1', 'sp-2', 'sp-missing'],
pool_tracks=[
_StubPoolTrack('sp-1', name='Song1', artist='Artist1'),
_StubPoolTrack('sp-2', name='Song2', artist='Artist2'),
],
)
out = _ft_mod.generate(_curated_deps(db), '', PlaylistConfig())
# Missing IDs silently skipped; order preserved.
assert [t.track_name for t in out] == ['Song1', 'Song2']
def test_tries_source_specific_then_fallback_key(self):
# First lookup (source-specific) returns []; second (generic) returns IDs.
class _DB:
def __init__(self):
self.calls = []
self.responses = {
'release_radar_spotify': [],
'release_radar': ['sp-1'],
}
def get_curated_playlist(self, key, profile_id=1):
self.calls.append(key)
return self.responses.get(key, [])
def get_discovery_pool_tracks(self, **kw):
return [_StubPoolTrack('sp-1', name='Hit')]
db = _DB()
out = _ft_mod.generate(_curated_deps(db), '', PlaylistConfig())
assert db.calls == ['release_radar_spotify', 'release_radar']
assert len(out) == 1
def test_respects_limit(self):
db = _CuratedDB(
curated_ids=[f'sp-{i}' for i in range(20)],
pool_tracks=[_StubPoolTrack(f'sp-{i}', name=f'T{i}') for i in range(20)],
)
out = _ft_mod.generate(_curated_deps(db), '', PlaylistConfig(limit=5))
assert len(out) == 5
def test_missing_database_dep_raises(self):
with pytest.raises(RuntimeError, match='missing `database`'):
_ft_mod.generate(SimpleNamespace(), '', PlaylistConfig())
class TestArchives:
def test_registered(self):
spec = get_registry().get('archives')
assert spec is not None
assert spec.display_name('') == 'The Archives'
def test_uses_discovery_weekly_curated_key(self):
db = _CuratedDB(
curated_ids=['sp-1'],
pool_tracks=[_StubPoolTrack('sp-1', name='Discover')],
)
_arch_mod.generate(_curated_deps(db), '', PlaylistConfig())
# Source-specific request fires first; fallback only fires
# when source-specific returns empty. Stub returns IDs on
# every call, so only the first key gets queried.
assert db.requested_keys[0] == 'discovery_weekly_spotify'
# ─── seasonal_mix ───────────────────────────────────────────────────
class _SeasonalService:
def __init__(self, track_ids):
self.track_ids = track_ids
def get_curated_seasonal_playlist(self, season_key, source=None):
return list(self.track_ids)
@pytest.fixture
def seasonal_db(tmp_path):
"""Real sqlite DB with seasonal_tracks rows for hydration."""
p = str(tmp_path / 'seasonal.db')
conn = sqlite3.connect(p)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE seasonal_tracks (
id INTEGER PRIMARY KEY,
season_key TEXT, source TEXT,
spotify_track_id TEXT, track_name TEXT, artist_name TEXT,
album_name TEXT, album_cover_url TEXT,
duration_ms INTEGER, popularity INTEGER, track_data_json TEXT
)
""")
seed = [
('halloween', 'spotify', 'sp-1', 'Spooky', 'Ghost Band', 'Album1', None, 200000, 80, '{"id":"sp-1"}'),
('halloween', 'spotify', 'sp-2', 'Haunted', 'Ghost Band', 'Album2', None, 210000, 70, None),
('halloween', 'spotify', 'sp-extra', 'Extra', 'Other', 'Album3', None, 200000, 60, None),
]
cursor.executemany("""
INSERT INTO seasonal_tracks
(season_key, source, spotify_track_id, track_name, artist_name,
album_name, album_cover_url, duration_ms, popularity, track_data_json)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", seed)
conn.commit()
conn.close()
class _DB:
def __init__(self, path): self.path = path
def _get_connection(self):
c = sqlite3.connect(self.path)
c.row_factory = sqlite3.Row
return c
return _DB(p)
class TestSeasonalMix:
def test_registered(self):
spec = get_registry().get('seasonal_mix')
assert spec is not None
assert spec.requires_variant is True
def test_variant_resolver_returns_seasons(self):
spec = get_registry().get('seasonal_mix')
seasons = spec.variant_resolver(None)
assert 'halloween' in seasons
assert 'christmas' in seasons
def test_no_variant_raises(self, seasonal_db):
deps = SimpleNamespace(
seasonal_service=_SeasonalService(['sp-1']),
database=seasonal_db,
get_active_discovery_source=lambda: 'spotify',
)
with pytest.raises(ValueError, match='requires a season variant'):
_sm_mod.generate(deps, '', PlaylistConfig())
def test_hydrates_curated_ids_in_order(self, seasonal_db):
deps = SimpleNamespace(
seasonal_service=_SeasonalService(['sp-2', 'sp-1']),
database=seasonal_db,
get_active_discovery_source=lambda: 'spotify',
)
out = _sm_mod.generate(deps, 'halloween', PlaylistConfig())
assert [t.track_name for t in out] == ['Haunted', 'Spooky']
def test_missing_track_id_silently_skipped(self, seasonal_db):
deps = SimpleNamespace(
seasonal_service=_SeasonalService(['sp-1', 'sp-not-in-db']),
database=seasonal_db,
get_active_discovery_source=lambda: 'spotify',
)
out = _sm_mod.generate(deps, 'halloween', PlaylistConfig())
assert len(out) == 1
assert out[0].track_name == 'Spooky'
def test_track_data_json_round_trips(self, seasonal_db):
deps = SimpleNamespace(
seasonal_service=_SeasonalService(['sp-1']),
database=seasonal_db,
get_active_discovery_source=lambda: 'spotify',
)
out = _sm_mod.generate(deps, 'halloween', PlaylistConfig())
# sp-1 had JSON; sp-2 had None.
assert out[0].track_data_json == {'id': 'sp-1'}
def test_empty_curated_returns_empty(self, seasonal_db):
deps = SimpleNamespace(
seasonal_service=_SeasonalService([]),
database=seasonal_db,
get_active_discovery_source=lambda: 'spotify',
)
out = _sm_mod.generate(deps, 'halloween', PlaylistConfig())
assert out == []
def test_respects_limit(self, seasonal_db):
deps = SimpleNamespace(
seasonal_service=_SeasonalService(['sp-1', 'sp-2', 'sp-extra']),
database=seasonal_db,
get_active_discovery_source=lambda: 'spotify',
)
out = _sm_mod.generate(deps, 'halloween', PlaylistConfig(limit=2))
assert len(out) == 2
def test_missing_seasonal_service_raises(self):
deps = SimpleNamespace(database=object())
with pytest.raises(RuntimeError, match='missing `seasonal_service`'):
_sm_mod.generate(deps, 'halloween', PlaylistConfig())

@ -0,0 +1,147 @@
"""Boundary tests for the singleton-kind personalized generators
(`hidden_gems`, `discovery_shuffle`, `popular_picks`).
Each generator wraps the legacy
``PersonalizedPlaylistsService`` method 1:1, so the tests pin:
- registration side-effect at import
- generator forwards `config.limit` correctly
- empty / None / non-dict service output []
- tracks coerced through `Track.from_dict`
- missing service in deps raises a clear error"""
from __future__ import annotations
from types import SimpleNamespace
from typing import Any, List
import pytest
# Importing each generator triggers registration as a side-effect.
from core.personalized.generators import discovery_shuffle as _ds_mod
from core.personalized.generators import hidden_gems as _hg_mod
from core.personalized.generators import popular_picks as _pp_mod
from core.personalized.specs import get_registry
from core.personalized.types import PlaylistConfig
class _StubService:
"""Records every call so tests can assert on dispatched limits."""
def __init__(self, return_value=None):
self.calls: List[dict] = []
self.return_value = return_value if return_value is not None else []
def get_hidden_gems(self, limit):
self.calls.append({'method': 'get_hidden_gems', 'limit': limit})
return self.return_value
def get_discovery_shuffle(self, limit):
self.calls.append({'method': 'get_discovery_shuffle', 'limit': limit})
return self.return_value
def get_popular_picks(self, limit):
self.calls.append({'method': 'get_popular_picks', 'limit': limit})
return self.return_value
def _deps(svc):
return SimpleNamespace(service=svc)
# ─── registration ────────────────────────────────────────────────────
class TestRegistration:
def test_hidden_gems_registered(self):
spec = get_registry().get('hidden_gems')
assert spec is not None
assert spec.kind == 'hidden_gems'
assert spec.requires_variant is False
assert spec.default_config.limit == 50
def test_discovery_shuffle_registered(self):
spec = get_registry().get('discovery_shuffle')
assert spec is not None
assert spec.requires_variant is False
def test_popular_picks_registered(self):
spec = get_registry().get('popular_picks')
assert spec is not None
assert spec.requires_variant is False
def test_display_names(self):
assert get_registry().get('hidden_gems').display_name('') == 'Hidden Gems'
assert get_registry().get('discovery_shuffle').display_name('') == 'Discovery Shuffle'
assert get_registry().get('popular_picks').display_name('') == 'Popular Picks'
# ─── generator dispatch ──────────────────────────────────────────────
class TestHiddenGemsGenerator:
def test_forwards_limit(self):
svc = _StubService()
_hg_mod.generate(_deps(svc), '', PlaylistConfig(limit=75))
assert svc.calls == [{'method': 'get_hidden_gems', 'limit': 75}]
def test_uses_default_limit_when_config_default(self):
svc = _StubService()
_hg_mod.generate(_deps(svc), '', PlaylistConfig())
assert svc.calls[0]['limit'] == 50
def test_coerces_tracks(self):
svc = _StubService(return_value=[
{'track_name': 'A', 'artist_name': 'X', 'spotify_track_id': 'sp-1'},
{'track_name': 'B', 'artist_name': 'Y', 'spotify_track_id': 'sp-2'},
])
out = _hg_mod.generate(_deps(svc), '', PlaylistConfig())
assert len(out) == 2
assert out[0].track_name == 'A'
assert out[0].spotify_track_id == 'sp-1'
def test_empty_service_output_returns_empty_list(self):
svc = _StubService(return_value=[])
out = _hg_mod.generate(_deps(svc), '', PlaylistConfig())
assert out == []
def test_none_service_output_returns_empty_list(self):
svc = _StubService(return_value=None)
out = _hg_mod.generate(_deps(svc), '', PlaylistConfig())
assert out == []
class TestDiscoveryShuffleGenerator:
def test_forwards_limit(self):
svc = _StubService()
_ds_mod.generate(_deps(svc), '', PlaylistConfig(limit=42))
assert svc.calls == [{'method': 'get_discovery_shuffle', 'limit': 42}]
def test_coerces_tracks(self):
svc = _StubService(return_value=[{'track_name': 'Z', 'artist_name': 'Q'}])
out = _ds_mod.generate(_deps(svc), '', PlaylistConfig())
assert out[0].track_name == 'Z'
class TestPopularPicksGenerator:
def test_forwards_limit(self):
svc = _StubService()
_pp_mod.generate(_deps(svc), '', PlaylistConfig(limit=10))
assert svc.calls == [{'method': 'get_popular_picks', 'limit': 10}]
# ─── deps validation ─────────────────────────────────────────────────
class TestDepsValidation:
def test_missing_service_raises(self):
# No `service` attribute on deps.
deps = SimpleNamespace()
with pytest.raises(RuntimeError, match='missing `service`'):
_hg_mod.generate(deps, '', PlaylistConfig())
def test_dict_form_deps_accepted(self):
# generators._common.get_service tolerates dict deps too.
svc = _StubService()
out = _hg_mod.generate({'service': svc}, '', PlaylistConfig())
assert isinstance(out, list)
assert svc.calls

@ -0,0 +1,131 @@
"""Boundary tests for variant-bearing personalized generators
(`time_machine` per decade, `genre_playlist` per genre).
Each generator coerces a URL-safe variant string into the form the
legacy service expects, then forwards. Tests pin the variant
parsing + service dispatch + variant_resolver listing."""
from __future__ import annotations
from types import SimpleNamespace
from typing import List
import pytest
from core.personalized.generators import genre_playlist as _gp_mod
from core.personalized.generators import time_machine as _tm_mod
from core.personalized.specs import get_registry
from core.personalized.types import PlaylistConfig
class _StubService:
GENRE_MAPPING = {
'Electronic/Dance': ['house', 'techno'],
'Hip Hop/Rap': ['hip hop', 'rap'],
'Rock': ['rock', 'punk'],
}
def __init__(self):
self.calls: List[dict] = []
def get_decade_playlist(self, decade, limit, **kw):
self.calls.append({'method': 'get_decade_playlist', 'decade': decade, 'limit': limit})
return [{'track_name': f'D{decade}', 'artist_name': 'A'}]
def get_genre_playlist(self, genre, limit, **kw):
self.calls.append({'method': 'get_genre_playlist', 'genre': genre, 'limit': limit})
return [{'track_name': f'G{genre}', 'artist_name': 'A'}]
def _deps():
return SimpleNamespace(service=_StubService())
# ─── time_machine ───────────────────────────────────────────────────
class TestTimeMachine:
def test_registered(self):
spec = get_registry().get('time_machine')
assert spec is not None
assert spec.requires_variant is True
assert spec.variant_resolver is not None
def test_variant_resolver_returns_decades(self):
spec = get_registry().get('time_machine')
decades = spec.variant_resolver(_deps())
assert '1980s' in decades
assert '2020s' in decades
# All decades should be 4-digit + 's'
for d in decades:
assert d.endswith('s')
assert d[:-1].isdigit()
def test_decade_label_to_year(self):
deps = _deps()
_tm_mod.generate(deps, '1980s', PlaylistConfig(limit=20))
assert deps.service.calls == [
{'method': 'get_decade_playlist', 'decade': 1980, 'limit': 20}
]
def test_invalid_variant_raises(self):
deps = _deps()
with pytest.raises(ValueError, match='not a decade label'):
_tm_mod.generate(deps, 'banana', PlaylistConfig())
def test_out_of_range_year_raises(self):
deps = _deps()
with pytest.raises(ValueError, match='out of range'):
_tm_mod.generate(deps, '1500s', PlaylistConfig())
def test_tolerates_no_s_suffix(self):
deps = _deps()
_tm_mod.generate(deps, '1990', PlaylistConfig())
assert deps.service.calls[0]['decade'] == 1990
def test_default_limit_is_100(self):
spec = get_registry().get('time_machine')
assert spec.default_config.limit == 100
def test_display_name_with_variant(self):
spec = get_registry().get('time_machine')
assert spec.display_name('1980s') == 'Time Machine — 1980s'
# ─── genre_playlist ─────────────────────────────────────────────────
class TestGenrePlaylist:
def test_registered(self):
spec = get_registry().get('genre_playlist')
assert spec is not None
assert spec.requires_variant is True
def test_variant_resolver_normalizes_parent_keys(self):
spec = get_registry().get('genre_playlist')
variants = spec.variant_resolver(_deps())
# 'Electronic/Dance' → 'electronic_dance' (slash → underscore + lowercase)
assert 'electronic_dance' in variants
assert 'hip_hop_rap' in variants
assert 'rock' in variants
def test_normalized_variant_resolves_to_parent_key(self):
deps = _deps()
_gp_mod.generate(deps, 'electronic_dance', PlaylistConfig())
# Service receives ORIGINAL parent key.
assert deps.service.calls[0]['genre'] == 'Electronic/Dance'
def test_unknown_variant_passed_through_as_freeform(self):
# Service handles partial-matching for free-form keywords.
deps = _deps()
_gp_mod.generate(deps, 'shoegaze', PlaylistConfig())
assert deps.service.calls[0]['genre'] == 'shoegaze'
def test_empty_variant_raises(self):
deps = _deps()
with pytest.raises(ValueError, match='requires a variant'):
_gp_mod.generate(deps, '', PlaylistConfig())
def test_display_name(self):
spec = get_registry().get('genre_playlist')
assert spec.display_name('electronic_dance') == 'Genre — electronic_dance'

@ -0,0 +1,476 @@
"""Boundary tests for the personalized-playlists foundation
(``core.personalized.types`` + ``core.personalized.specs`` +
``core.personalized.manager``).
Pin every shape the storage layer + lifecycle has to handle so the
generators that arrive in subsequent commits can rely on a stable
contract: ensure_playlist auto-creates from default config, refresh
atomically replaces the snapshot + appends history, generator
exceptions don't lose the previous good snapshot, config patches
preserve unsent fields, recent_track_ids honors the day window,
list_playlists orders newest-first."""
from __future__ import annotations
import os
import sqlite3
import tempfile
from typing import Any, List
from unittest.mock import MagicMock
import pytest
from core.personalized.manager import PersonalizedPlaylistManager
from core.personalized.specs import PlaylistKindRegistry, PlaylistKindSpec
from core.personalized.types import PlaylistConfig, Track
from database.personalized_schema import ensure_personalized_schema
# ─── shared fixtures ─────────────────────────────────────────────────
class _FakeDB:
"""Minimal MusicDatabase stand-in — gives the manager a real
sqlite connection so the manager exercises actual SQL."""
def __init__(self, path: str):
self.path = path
def _get_connection(self):
conn = sqlite3.connect(self.path)
conn.row_factory = sqlite3.Row
return conn
@pytest.fixture
def db_path(tmp_path):
p = str(tmp_path / 'test.db')
conn = sqlite3.connect(p)
ensure_personalized_schema(conn)
conn.commit()
conn.close()
return p
@pytest.fixture
def db(db_path):
return _FakeDB(db_path)
@pytest.fixture
def registry():
r = PlaylistKindRegistry()
return r
def _make_track(name='T1', artist='A1', sid='spot-1', source='spotify') -> Track:
return Track(
track_name=name, artist_name=artist, album_name='Album',
spotify_track_id=sid, source=source,
duration_ms=200000, popularity=50,
)
# ─── PlaylistConfig ──────────────────────────────────────────────────
class TestPlaylistConfig:
def test_default_values(self):
c = PlaylistConfig()
assert c.limit == 50
assert c.max_per_album == 2
assert c.max_per_artist == 3
assert c.popularity_min is None
assert c.popularity_max is None
assert c.exclude_recent_days == 0
assert c.recency_days is None
assert c.seed is None
assert c.extra == {}
def test_round_trip_through_json_dict(self):
c = PlaylistConfig(
limit=100, max_per_album=5, max_per_artist=10,
popularity_min=20, popularity_max=80,
exclude_recent_days=14, recency_days=180,
seed=42, extra={'selected_seasons': ['halloween', 'christmas']},
)
d = c.to_json_dict()
c2 = PlaylistConfig.from_json_dict(d)
assert c2 == c
def test_from_json_dict_handles_none(self):
c = PlaylistConfig.from_json_dict(None)
assert c == PlaylistConfig()
def test_from_json_dict_handles_non_dict(self):
c = PlaylistConfig.from_json_dict('garbage') # type: ignore
assert c == PlaylistConfig()
def test_from_json_dict_missing_fields_use_defaults(self):
c = PlaylistConfig.from_json_dict({'limit': 75})
assert c.limit == 75
assert c.max_per_album == 2 # default
def test_merged_overrides_only_named_fields(self):
base = PlaylistConfig(limit=50, popularity_min=20)
out = base.merged({'limit': 100})
assert out.limit == 100
assert out.popularity_min == 20 # untouched
def test_merged_extra_dict_is_deep_merged(self):
base = PlaylistConfig(extra={'a': 1, 'b': 2})
out = base.merged({'extra': {'b': 99, 'c': 3}})
assert out.extra == {'a': 1, 'b': 99, 'c': 3}
def test_merged_ignores_unknown_keys(self):
base = PlaylistConfig()
out = base.merged({'unknown_field': 'foo'})
assert out == base
# ─── Track ────────────────────────────────────────────────────────────
class TestTrack:
def test_from_dict_legacy_shape(self):
d = {
'track_name': 'Song', 'artist_name': 'Band',
'album_name': 'Album', 'spotify_track_id': 'spot-1',
'duration_ms': 200000, 'popularity': 60,
'_artist_genres_raw': '["rock"]', # ignored extra
}
t = Track.from_dict(d)
assert t.track_name == 'Song'
assert t.spotify_track_id == 'spot-1'
assert t.duration_ms == 200000
def test_primary_id_prefers_spotify(self):
t = Track(
track_name='', artist_name='',
spotify_track_id='spot', itunes_track_id='itu', deezer_track_id='dee',
)
assert t.primary_id() == 'spot'
def test_primary_id_falls_back_through_sources(self):
t = Track(track_name='', artist_name='', itunes_track_id='itu')
assert t.primary_id() == 'itu'
t2 = Track(track_name='', artist_name='', deezer_track_id='dee')
assert t2.primary_id() == 'dee'
def test_primary_id_none_when_no_sources(self):
t = Track(track_name='', artist_name='')
assert t.primary_id() is None
# ─── PlaylistKindRegistry ────────────────────────────────────────────
class TestRegistry:
def test_register_and_get(self, registry):
spec = PlaylistKindSpec(
kind='hidden_gems', name_template='Hidden Gems',
description='', default_config=PlaylistConfig(),
generator=lambda *a, **k: [],
)
registry.register(spec)
assert registry.get('hidden_gems') is spec
assert registry.get('nonexistent') is None
def test_duplicate_registration_raises(self, registry):
spec = PlaylistKindSpec(
kind='x', name_template='X', description='',
default_config=PlaylistConfig(), generator=lambda *a, **k: [],
)
registry.register(spec)
with pytest.raises(ValueError, match='already registered'):
registry.register(spec)
def test_display_name_singleton(self):
spec = PlaylistKindSpec(
kind='x', name_template='Hidden Gems', description='',
default_config=PlaylistConfig(), generator=lambda *a, **k: [],
)
assert spec.display_name('') == 'Hidden Gems'
def test_display_name_with_variant(self):
spec = PlaylistKindSpec(
kind='x', name_template='Time Machine — {variant}',
description='', default_config=PlaylistConfig(),
generator=lambda *a, **k: [],
)
assert spec.display_name('1980s') == 'Time Machine — 1980s'
def test_kinds_listing(self, registry):
for k in ('a', 'b', 'c'):
registry.register(PlaylistKindSpec(
kind=k, name_template=k, description='',
default_config=PlaylistConfig(), generator=lambda *a, **k: [],
))
assert set(registry.kinds()) == {'a', 'b', 'c'}
# ─── PersonalizedPlaylistManager ─────────────────────────────────────
def _register_simple_kind(registry, generator, kind='hidden_gems', requires_variant=False):
spec = PlaylistKindSpec(
kind=kind, name_template=kind.replace('_', ' ').title(),
description='', default_config=PlaylistConfig(limit=10),
generator=generator, requires_variant=requires_variant,
)
registry.register(spec)
return spec
class TestEnsurePlaylist:
def test_creates_row_with_default_config(self, db, registry):
_register_simple_kind(registry, lambda *a, **k: [])
mgr = PersonalizedPlaylistManager(db, deps=None, registry=registry)
record = mgr.ensure_playlist('hidden_gems', '', 1)
assert record.id > 0
assert record.kind == 'hidden_gems'
assert record.variant == ''
assert record.profile_id == 1
assert record.config.limit == 10 # from default
assert record.track_count == 0
assert record.last_generated_at is None
def test_returns_same_row_on_second_call(self, db, registry):
_register_simple_kind(registry, lambda *a, **k: [])
mgr = PersonalizedPlaylistManager(db, deps=None, registry=registry)
r1 = mgr.ensure_playlist('hidden_gems', '', 1)
r2 = mgr.ensure_playlist('hidden_gems', '', 1)
assert r1.id == r2.id
def test_variant_creates_separate_row(self, db, registry):
_register_simple_kind(
registry, lambda *a, **k: [], kind='time_machine', requires_variant=True,
)
mgr = PersonalizedPlaylistManager(db, deps=None, registry=registry)
r1 = mgr.ensure_playlist('time_machine', '1980s', 1)
r2 = mgr.ensure_playlist('time_machine', '1990s', 1)
assert r1.id != r2.id
def test_unknown_kind_raises(self, db, registry):
mgr = PersonalizedPlaylistManager(db, deps=None, registry=registry)
with pytest.raises(ValueError, match='Unknown playlist kind'):
mgr.ensure_playlist('does_not_exist', '', 1)
def test_required_variant_missing_raises(self, db, registry):
_register_simple_kind(
registry, lambda *a, **k: [], kind='time_machine', requires_variant=True,
)
mgr = PersonalizedPlaylistManager(db, deps=None, registry=registry)
with pytest.raises(ValueError, match='requires a variant'):
mgr.ensure_playlist('time_machine', '', 1)
class TestRefreshPlaylist:
def test_refresh_persists_tracks(self, db, registry):
tracks = [_make_track('S1', 'A1', 'sp1'), _make_track('S2', 'A1', 'sp2')]
_register_simple_kind(registry, lambda deps, variant, config: tracks)
mgr = PersonalizedPlaylistManager(db, deps=None, registry=registry)
record = mgr.refresh_playlist('hidden_gems', '', 1)
assert record.track_count == 2
assert record.last_generated_at is not None
assert record.last_generation_error is None
persisted = mgr.get_playlist_tracks(record.id)
assert len(persisted) == 2
assert persisted[0].track_name == 'S1'
assert persisted[1].track_name == 'S2'
def test_refresh_replaces_previous_snapshot_atomically(self, db, registry):
run = {'tracks': [_make_track('first')]}
def gen(deps, variant, config):
return run['tracks']
_register_simple_kind(registry, gen)
mgr = PersonalizedPlaylistManager(db, deps=None, registry=registry)
r1 = mgr.refresh_playlist('hidden_gems', '', 1)
assert r1.track_count == 1
run['tracks'] = [_make_track('A'), _make_track('B'), _make_track('C')]
r2 = mgr.refresh_playlist('hidden_gems', '', 1)
assert r2.id == r1.id
assert r2.track_count == 3
persisted = mgr.get_playlist_tracks(r2.id)
assert [t.track_name for t in persisted] == ['A', 'B', 'C']
def test_generator_exception_preserves_previous_snapshot(self, db, registry):
run = {'mode': 'success'}
def gen(deps, variant, config):
if run['mode'] == 'fail':
raise RuntimeError('generator boom')
return [_make_track('first')]
_register_simple_kind(registry, gen)
mgr = PersonalizedPlaylistManager(db, deps=None, registry=registry)
r1 = mgr.refresh_playlist('hidden_gems', '', 1)
assert r1.track_count == 1
run['mode'] = 'fail'
r2 = mgr.refresh_playlist('hidden_gems', '', 1)
# Previous snapshot preserved.
assert r2.track_count == 1
# Error stamped on row.
assert r2.last_generation_error is not None
assert 'generator boom' in r2.last_generation_error
# Tracks still queryable.
persisted = mgr.get_playlist_tracks(r2.id)
assert len(persisted) == 1
def test_config_overrides_passed_to_generator(self, db, registry):
captured = {}
def gen(deps, variant, config):
captured['limit'] = config.limit
return []
_register_simple_kind(registry, gen)
mgr = PersonalizedPlaylistManager(db, deps=None, registry=registry)
mgr.refresh_playlist('hidden_gems', '', 1, config_overrides={'limit': 200})
assert captured['limit'] == 200
def test_refresh_records_source_from_first_track(self, db, registry):
tracks = [_make_track(source='spotify'), _make_track(source='deezer')]
_register_simple_kind(registry, lambda *a, **k: tracks)
mgr = PersonalizedPlaylistManager(db, deps=None, registry=registry)
record = mgr.refresh_playlist('hidden_gems', '', 1)
assert record.last_generation_source == 'spotify'
def test_track_data_json_round_trips(self, db, registry):
nested = {'id': 'spot-1', 'name': 'Foo', 'artists': [{'name': 'Bar'}]}
track = Track(
track_name='Foo', artist_name='Bar',
spotify_track_id='spot-1', track_data_json=nested,
)
_register_simple_kind(registry, lambda *a, **k: [track])
mgr = PersonalizedPlaylistManager(db, deps=None, registry=registry)
record = mgr.refresh_playlist('hidden_gems', '', 1)
persisted = mgr.get_playlist_tracks(record.id)
assert persisted[0].track_data_json == nested
class TestUpdateConfig:
def test_patch_merges_with_stored(self, db, registry):
_register_simple_kind(registry, lambda *a, **k: [])
mgr = PersonalizedPlaylistManager(db, deps=None, registry=registry)
mgr.ensure_playlist('hidden_gems', '', 1)
record = mgr.update_config('hidden_gems', '', 1, {'limit': 75})
assert record.config.limit == 75
# Other fields kept.
assert record.config.max_per_album == 2
def test_patch_extra_dict_deep_merges(self, db, registry):
_register_simple_kind(registry, lambda *a, **k: [])
mgr = PersonalizedPlaylistManager(db, deps=None, registry=registry)
mgr.ensure_playlist('hidden_gems', '', 1)
mgr.update_config('hidden_gems', '', 1, {'extra': {'a': 1}})
record = mgr.update_config('hidden_gems', '', 1, {'extra': {'b': 2}})
assert record.config.extra == {'a': 1, 'b': 2}
class TestListPlaylists:
def test_lists_all_playlists_for_profile(self, db, registry):
_register_simple_kind(registry, lambda *a, **k: [], kind='hidden_gems')
_register_simple_kind(registry, lambda *a, **k: [], kind='popular_picks')
mgr = PersonalizedPlaylistManager(db, deps=None, registry=registry)
mgr.ensure_playlist('hidden_gems', '', 1)
mgr.ensure_playlist('popular_picks', '', 1)
records = mgr.list_playlists(1)
kinds = {r.kind for r in records}
assert kinds == {'hidden_gems', 'popular_picks'}
def test_does_not_list_other_profiles(self, db, registry):
_register_simple_kind(registry, lambda *a, **k: [])
mgr = PersonalizedPlaylistManager(db, deps=None, registry=registry)
mgr.ensure_playlist('hidden_gems', '', 1)
mgr.ensure_playlist('hidden_gems', '', 2)
assert len(mgr.list_playlists(1)) == 1
assert len(mgr.list_playlists(2)) == 1
class TestStalenessFilter:
"""`config.exclude_recent_days > 0` drops tracks served by this
kind for this profile in the last N days."""
def test_zero_days_means_no_filter(self, db, registry):
# Default config has exclude_recent_days=0; everything passes.
tracks = [_make_track(sid='spot-1'), _make_track(sid='spot-2')]
run = {'tracks': tracks}
_register_simple_kind(registry, lambda *a, **k: run['tracks'])
mgr = PersonalizedPlaylistManager(db, deps=None, registry=registry)
r1 = mgr.refresh_playlist('hidden_gems', '', 1)
# Refresh again with same tracks — no filter, all should persist.
r2 = mgr.refresh_playlist('hidden_gems', '', 1)
assert r2.track_count == 2
def test_positive_days_filters_recently_served(self, db, registry):
run = {'tracks': [_make_track(sid='spot-1'), _make_track(sid='spot-2')]}
_register_simple_kind(registry, lambda *a, **k: run['tracks'])
mgr = PersonalizedPlaylistManager(db, deps=None, registry=registry)
r1 = mgr.refresh_playlist('hidden_gems', '', 1)
assert r1.track_count == 2
# Update config to exclude tracks served in last 7 days.
mgr.update_config('hidden_gems', '', 1, {'exclude_recent_days': 7})
# Same generator output now → all tracks just got served, all filtered out.
r2 = mgr.refresh_playlist('hidden_gems', '', 1)
assert r2.track_count == 0
def test_filter_preserves_non_recent_tracks(self, db, registry):
run = {'tracks': [_make_track(sid='spot-1')]}
_register_simple_kind(registry, lambda *a, **k: run['tracks'])
mgr = PersonalizedPlaylistManager(db, deps=None, registry=registry)
r1 = mgr.refresh_playlist('hidden_gems', '', 1)
mgr.update_config('hidden_gems', '', 1, {'exclude_recent_days': 7})
# New generator output with a NEW id — should pass.
run['tracks'] = [_make_track(sid='spot-1'), _make_track(sid='spot-NEW')]
r2 = mgr.refresh_playlist('hidden_gems', '', 1)
# spot-1 was just served, dropped. spot-NEW is fresh, kept.
assert r2.track_count == 1
persisted = mgr.get_playlist_tracks(r2.id)
assert persisted[0].spotify_track_id == 'spot-NEW'
def test_tracks_without_primary_id_pass_through(self, db, registry):
# Track with no source IDs — primary_id() is None — staleness
# filter has nothing to dedupe on, so the track passes.
track_no_id = Track(track_name='X', artist_name='Y', source='spotify')
run = {'tracks': [track_no_id]}
_register_simple_kind(registry, lambda *a, **k: run['tracks'])
mgr = PersonalizedPlaylistManager(db, deps=None, registry=registry)
mgr.refresh_playlist('hidden_gems', '', 1)
mgr.update_config('hidden_gems', '', 1, {'exclude_recent_days': 7})
r2 = mgr.refresh_playlist('hidden_gems', '', 1)
# Track is kept because there's no id to match against history.
assert r2.track_count == 1
class TestStalenessHistory:
def test_recent_track_ids_returns_zero_when_days_zero(self, db, registry):
_register_simple_kind(registry, lambda *a, **k: [_make_track(sid='spot-1')])
mgr = PersonalizedPlaylistManager(db, deps=None, registry=registry)
mgr.refresh_playlist('hidden_gems', '', 1)
assert mgr.recent_track_ids(1, 'hidden_gems', 0) == []
def test_recent_track_ids_after_refresh(self, db, registry):
_register_simple_kind(
registry,
lambda *a, **k: [_make_track(sid='spot-1'), _make_track(sid='spot-2')],
)
mgr = PersonalizedPlaylistManager(db, deps=None, registry=registry)
mgr.refresh_playlist('hidden_gems', '', 1)
recent = mgr.recent_track_ids(1, 'hidden_gems', 7)
assert set(recent) == {'spot-1', 'spot-2'}
def test_recent_track_ids_scoped_to_kind(self, db, registry):
_register_simple_kind(registry, lambda *a, **k: [_make_track(sid='gem-1')], kind='hidden_gems')
_register_simple_kind(registry, lambda *a, **k: [_make_track(sid='pop-1')], kind='popular_picks')
mgr = PersonalizedPlaylistManager(db, deps=None, registry=registry)
mgr.refresh_playlist('hidden_gems', '', 1)
mgr.refresh_playlist('popular_picks', '', 1)
assert mgr.recent_track_ids(1, 'hidden_gems', 7) == ['gem-1']
assert mgr.recent_track_ids(1, 'popular_picks', 7) == ['pop-1']

@ -26818,6 +26818,115 @@ def get_discovery_shuffle():
logger.error(f"Error getting discovery shuffle playlist: {e}")
return jsonify({"success": False, "error": str(e)}), 500
# ========================================================================
# Personalized Playlists v2 — unified storage + manager-backed routes.
# Wraps every personalized playlist (Group A + Group B) behind one API
# surface. Generators in `core/personalized/generators/` register at
# import time; this set of routes exposes the manager for the UI.
# Legacy `/api/discover/personalized/...` endpoints stay alive for
# backward compat during the UI migration window.
# ========================================================================
# Trigger registration of every generator (side-effect import).
from core.personalized import generators as _personalized_generators # noqa: F401
from core.personalized import api as _personalized_api
from core.personalized.manager import PersonalizedPlaylistManager as _PersonalizedManager
def _build_personalized_manager():
"""Construct a manager wired with whatever each generator needs.
Per-request construction: the underlying services are cheap
accessors, so we don't bother caching. If profiling shows
overhead, this becomes a module-level lazy singleton."""
from core.personalized_playlists import get_personalized_playlists_service
from core.seasonal_discovery import get_seasonal_discovery_service
database = get_database()
deps = types.SimpleNamespace(
database=database,
service=get_personalized_playlists_service(database, spotify_client),
seasonal_service=get_seasonal_discovery_service(spotify_client, database),
get_current_profile_id=get_current_profile_id,
get_active_discovery_source=_get_active_discovery_source,
)
return _PersonalizedManager(database=database, deps=deps)
@app.route('/api/personalized/kinds', methods=['GET'])
def personalized_list_kinds():
"""List every registered personalized-playlist kind."""
try:
return jsonify(_personalized_api.list_kinds())
except Exception as e:
logger.error(f"Personalized kinds list error: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/personalized/playlists', methods=['GET'])
def personalized_list_playlists():
"""List every persisted personalized playlist for the active profile."""
try:
manager = _build_personalized_manager()
return jsonify(_personalized_api.list_playlists(manager, get_current_profile_id()))
except Exception as e:
logger.error(f"Personalized playlists list error: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/personalized/playlist/<kind>', methods=['GET'])
@app.route('/api/personalized/playlist/<kind>/<variant>', methods=['GET'])
def personalized_get_playlist(kind, variant=''):
"""Get one personalized playlist + its current track snapshot.
Auto-creates the row from default config if it doesn't exist."""
try:
manager = _build_personalized_manager()
return jsonify(_personalized_api.get_playlist_with_tracks(
manager, kind, variant, get_current_profile_id(),
))
except ValueError as e:
return jsonify({"success": False, "error": str(e)}), 400
except Exception as e:
logger.error(f"Personalized playlist get error ({kind}/{variant}): {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/personalized/playlist/<kind>/refresh', methods=['POST'])
@app.route('/api/personalized/playlist/<kind>/<variant>/refresh', methods=['POST'])
def personalized_refresh_playlist(kind, variant=''):
"""Run the kind's generator and persist the snapshot."""
try:
manager = _build_personalized_manager()
body = request.get_json(silent=True) or {}
overrides = body.get('config_overrides') if isinstance(body.get('config_overrides'), dict) else None
return jsonify(_personalized_api.refresh_playlist(
manager, kind, variant, get_current_profile_id(), config_overrides=overrides,
))
except ValueError as e:
return jsonify({"success": False, "error": str(e)}), 400
except Exception as e:
logger.error(f"Personalized playlist refresh error ({kind}/{variant}): {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/personalized/playlist/<kind>/config', methods=['PUT'])
@app.route('/api/personalized/playlist/<kind>/<variant>/config', methods=['PUT'])
def personalized_update_config(kind, variant=''):
"""Patch the playlist's per-instance config."""
try:
manager = _build_personalized_manager()
body = request.get_json(silent=True) or {}
return jsonify(_personalized_api.update_config(
manager, kind, variant, get_current_profile_id(), body,
))
except ValueError as e:
return jsonify({"success": False, "error": str(e)}), 400
except Exception as e:
logger.error(f"Personalized playlist config error ({kind}/{variant}): {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/discover/artist-blacklist', methods=['GET'])
def get_discovery_artist_blacklist():
"""Get all blacklisted discovery artists."""

@ -3416,6 +3416,7 @@ const WHATS_NEW = {
'2.5.2': [
// --- May 13, 2026 — 2.5.2 release ---
{ date: 'May 13, 2026 — 2.5.2 release' },
{ title: 'Personalized Playlists Standardization', desc: 'all 8 personalized / discover-page playlists (Hidden Gems, Discovery Shuffle, Popular Picks, Time Machine per-decade, Genre playlists per-genre, Daily Mixes, Fresh Tape, The Archives, Seasonal Mix per-season) now share one unified storage layer. pre-overhaul: Group A (Fresh Tape / Archives / Seasonal Mix) lived in one shape, Group B (everything else) was computed-on-demand with no persistence — every page-load re-rolled the dice and tracks rotated under your feet. post-overhaul: every playlist has a stable identity, persistent track snapshot, explicit refresh button, and per-playlist tweakable config (limit, diversity caps, popularity bounds, recency window, exclude-recent-days staleness window). prerequisite for the playlist pipeline integration coming in the next PR (sync these to your media server + send missing tracks to wishlist on a timer). fixed a stub: Daily Mixes used to promise 50% library + 50% discovery but the library half always returned [] (tracks table has no source IDs to sync) — now honestly discovery-only so it actually works. also: each kind\'s body lifted into its own module under `core/personalized/generators/`, behavior preserved verbatim from the legacy `PersonalizedPlaylistsService` and `SeasonalDiscoveryService`. new REST endpoints under `/api/personalized/*`. 134 boundary tests cover every kind + the manager + the API + staleness filter; full suite at 3369 tests.', page: 'discover' },
{ title: 'Dashboard Activity Feed: Stop Showing "NaNmo ago"', desc: 'recent activity items on the dashboard all rendered "NaNmo ago" because the formatter was parsing `activity.time` (a human label like "Now") as a date. backend has always emitted `activity.timestamp` (Unix epoch seconds) alongside the label — frontend now uses that for relative-time formatting. falls back to the literal label only when no timestamp present (legacy items / future shapes).', page: 'home' },
{ title: 'Token Leak Round 2: URL-Encoded Form In Artist Endpoint + Playlist Sync', desc: 'security follow-up to the prior token-leak fix. found three sites in `web_server.py` (artist endpoint) that logged the full `image_url` and the entire artist_info dict at INFO on every artist-page render — the dict contained the `image_url` field routed through the image proxy (`/api/image-proxy?url=<encoded>`), URL-encoding the X-Plex-Token / X-Emby-Token / Subsonic auth straight into the log line. also one site in `core/discovery/sync.py` logged the playlist poster URL during sync. fixes: dropped the three artist-endpoint dev-time debug log lines entirely (before-fix, after-fix, "Final artist data being sent"). playlist-image log now logs `has_image=True/False`, not the URL. strengthened `_redact_url_secrets` with a second regex pattern that matches the URL-encoded form (`%3FX-Plex-Token%3D...`) so any future log-through-redactor catches both plain and encoded shapes. wipe your existing app.log if it captured tokens in either form, and rotate Plex / Jellyfin / Navidrome credentials.', page: 'settings' },
{ title: 'Stop Leaking Plex / Jellyfin / Navidrome Tokens Into app.log', desc: 'security: artwork URL fixer was logging full media-server URLs (including the X-Plex-Token / X-Emby-Token / Subsonic auth params) at INFO level on every cover-art lookup. tokens piled up in app.log on disk — anyone with read access to the log file gained full read access to the user\'s media server. fix: log lines moved to DEBUG (so they don\'t persist by default) and routed through a new `_redact_url_secrets` helper that masks the values of `X-Plex-Token` / `X-Emby-Token` / `api_key` / `apikey` / Subsonic `t` / `s` / `p` / generic `token` / `password` query params. anchor regex on `?` or `&` boundary so short keys like `t` don\'t false-match inside `format=Jpg`. also dropped the noisy per-call "Plex/Jellyfin/Navidrome config - base_url: ..., token: ..." INFO lines that fired on every thumbnail. wipe your existing app.log if your config has been logged.', page: 'settings' },

Loading…
Cancel
Save