You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
SoulSync/core/personalized/manager.py

479 lines
21 KiB

"""Storage layer + lifecycle for personalized playlists.
The manager is the ONLY place that touches the
``personalized_playlists`` / ``personalized_playlist_tracks`` /
``personalized_track_history`` tables. Generators (in
``core/personalized/generators/``) produce track lists; the manager
persists, refreshes, and serves them.
Key invariants:
- ``(profile_id, kind, variant)`` uniquely identifies a playlist.
Variant '' (empty string) means singleton — used for kinds like
``hidden_gems`` that don't have multiple instances per profile.
- A playlist row is auto-created on first access (``ensure_playlist``)
using the kind's default config.
- Track lists are atomically replaced on refresh — never partial-
mutated. Either the new snapshot lands fully or the old one
remains.
- Refresh failures get logged on the row
(``last_generation_error``) so the UI can surface them without
losing the previous good snapshot.
- Staleness history is append-only and queried by the
``exclude_recent_days`` config option (handled by individual
generators when they want to honor it).
"""
from __future__ import annotations
import json
import time
from dataclasses import asdict
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from utils.logging_config import get_logger
from core.personalized.specs import PlaylistKindRegistry, get_registry
from core.personalized.types import PlaylistConfig, PlaylistRecord, Track
logger = get_logger("personalized.manager")
class PersonalizedPlaylistManager:
"""Owns persistence + refresh lifecycle for personalized playlists."""
def __init__(self, database, deps: Any, registry: Optional[PlaylistKindRegistry] = None):
"""
Args:
database: MusicDatabase singleton (exposes ``_get_connection``).
deps: Opaque object passed through to each generator
callable. Whatever a generator needs (the legacy
``PersonalizedPlaylistsService`` instance, the
``config_manager``, a metadata client) goes in here.
Manager doesn't inspect it.
registry: optional PlaylistKindRegistry override (for tests).
"""
self.database = database
self.deps = deps
self.registry = registry or get_registry()
# ─── playlist row lifecycle ──────────────────────────────────────
def ensure_playlist(self, kind: str, variant: str = '', profile_id: int = 1) -> PlaylistRecord:
"""Return the playlist row for ``(profile, kind, variant)``,
creating it from the kind's default config if it doesn't exist."""
spec = self.registry.get(kind)
if spec is None:
raise ValueError(f"Unknown playlist kind: {kind!r}")
if spec.requires_variant and not variant:
raise ValueError(f"Kind {kind!r} requires a variant")
existing = self._fetch_playlist_row(kind, variant, profile_id)
if existing is not None:
return existing
# Insert new row using the kind's defaults.
config = spec.default_config
name = spec.display_name(variant)
with self.database._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
INSERT INTO personalized_playlists
(profile_id, kind, variant, name, config_json,
track_count, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, 0, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
""",
(profile_id, kind, variant, name, json.dumps(config.to_json_dict())),
)
conn.commit()
return self._fetch_playlist_row(kind, variant, profile_id) # type: ignore[return-value]
def get_playlist(self, kind: str, variant: str = '', profile_id: int = 1) -> Optional[PlaylistRecord]:
"""Return the playlist row if it exists. Does NOT auto-create."""
return self._fetch_playlist_row(kind, variant, profile_id)
def list_playlists(self, profile_id: int = 1) -> List[PlaylistRecord]:
"""List every persisted playlist for a profile, newest-first."""
with self.database._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
SELECT id, profile_id, kind, variant, name, config_json,
track_count, last_generated_at, last_synced_at,
last_generation_source, last_generation_error,
is_stale
FROM personalized_playlists
WHERE profile_id = ?
ORDER BY COALESCE(last_generated_at, created_at) DESC
""",
(profile_id,),
)
rows = cursor.fetchall()
return [self._row_to_record(r) for r in rows]
def update_config(self, kind: str, variant: str, profile_id: int, overrides: Dict[str, Any]) -> PlaylistRecord:
"""Patch the per-playlist config with the provided overrides."""
record = self.ensure_playlist(kind, variant, profile_id)
merged = record.config.merged(overrides)
with self.database._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
UPDATE personalized_playlists
SET config_json = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(json.dumps(merged.to_json_dict()), record.id),
)
conn.commit()
return self._fetch_playlist_row(kind, variant, profile_id) # type: ignore[return-value]
# ─── refresh / generation ─────────────────────────────────────────
def refresh_playlist(
self,
kind: str,
variant: str = '',
profile_id: int = 1,
config_overrides: Optional[Dict[str, Any]] = None,
) -> PlaylistRecord:
"""Run the kind's generator and persist the result as the
playlist's current snapshot.
Atomic: track list is replaced in a single transaction. On
generator exception, the previous snapshot is preserved and
the error is recorded on the row.
Args:
kind: registered kind identifier.
variant: e.g. '1980s' for time machine, '' for singletons.
profile_id: profile to refresh under.
config_overrides: optional per-call config tweaks merged on
top of the stored config (e.g. UI lets the user "preview
with limit=100" without persisting that change).
Returns:
Updated PlaylistRecord with fresh ``track_count`` /
``last_generated_at`` (or ``last_generation_error`` on
failure).
"""
spec = self.registry.get(kind)
if spec is None:
raise ValueError(f"Unknown playlist kind: {kind!r}")
record = self.ensure_playlist(kind, variant, profile_id)
config = record.config
if config_overrides:
config = config.merged(config_overrides)
try:
tracks = spec.generator(self.deps, variant, config)
except Exception as exc: # noqa: BLE001 — record + re-raise after persisting
logger.exception("Generator failed for kind=%s variant=%s: %s", kind, variant, exc)
self._record_generation_failure(record.id, str(exc))
return self._fetch_playlist_row(kind, variant, profile_id) # type: ignore[return-value]
# Quality post-filters — applied uniformly to every kind so
# generators stay focused on selection logic, not staleness
# bookkeeping. Filters are config-driven; defaults preserve
# the pre-overhaul behavior (no filtering).
tracks = self._apply_quality_filters(tracks, kind, profile_id, config)
return self._persist_snapshot(record.id, kind, profile_id, tracks)
def _apply_quality_filters(
self,
tracks: List[Track],
kind: str,
profile_id: int,
config: PlaylistConfig,
) -> List[Track]:
"""Apply manager-level quality filters to a generator's output.
Currently:
- **Staleness window** (`config.exclude_recent_days > 0`): drops
any track whose primary id was served by this `kind` for this
`profile_id` in the last N days. Prevents the same track
from showing up across consecutive refreshes — e.g. a daily
Discovery Shuffle that shouldn't replay yesterday's picks.
Tracks without a primary id pass through unchanged (nothing
to dedupe on).
Returns a new list (never mutates input). When no filter
applies, returns ``tracks`` unchanged."""
if config.exclude_recent_days <= 0 or not tracks:
return tracks
recent_set = set(self.recent_track_ids(profile_id, kind, config.exclude_recent_days))
if not recent_set:
return tracks
return [t for t in tracks if not t.primary_id() or t.primary_id() not in recent_set]
# ─── track read ──────────────────────────────────────────────────
def get_playlist_tracks(self, playlist_id: int) -> List[Track]:
"""Return the persisted track list for a playlist row."""
with self.database._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
SELECT spotify_track_id, itunes_track_id, deezer_track_id,
track_name, artist_name, album_name, album_cover_url,
duration_ms, popularity, track_data_json
FROM personalized_playlist_tracks
WHERE playlist_id = ?
ORDER BY position ASC
""",
(playlist_id,),
)
rows = cursor.fetchall()
return [self._row_to_track(r) for r in rows]
# ─── snapshot freshness vs source data ───────────────────────────
def mark_kinds_stale(self, kinds: List[str], profile_id: Optional[int] = None) -> int:
"""Flag every playlist row matching one of ``kinds`` as stale.
Called by upstream data refreshers (watchlist scan finishing
/ Spotify enrichment worker re-pulling Release Radar / etc)
so pipelines auto-regenerate snapshots before the next sync
instead of pushing stale data to the media server.
Returns the number of rows touched. When ``profile_id`` is
None, flags rows across every profile.
"""
if not kinds:
return 0
placeholders = ','.join('?' * len(kinds))
sql = f"UPDATE personalized_playlists SET is_stale = 1, updated_at = CURRENT_TIMESTAMP WHERE kind IN ({placeholders})"
params: List[Any] = list(kinds)
if profile_id is not None:
sql += " AND profile_id = ?"
params.append(profile_id)
with self.database._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(sql, params)
count = cursor.rowcount
conn.commit()
return count
# ─── staleness history ───────────────────────────────────────────
def recent_track_ids(self, profile_id: int, kind: str, days: int) -> List[str]:
"""Return track IDs served by ``kind`` for ``profile_id`` in
the last ``days`` days. Generators query this when honoring
the ``exclude_recent_days`` config knob."""
if days <= 0:
return []
with self.database._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
SELECT DISTINCT track_id
FROM personalized_track_history
WHERE profile_id = ?
AND kind = ?
AND served_at >= datetime('now', ?)
""",
(profile_id, kind, f'-{int(days)} days'),
)
return [r[0] for r in cursor.fetchall() if r[0]]
# ─── internal helpers ────────────────────────────────────────────
def _persist_snapshot(self, playlist_id: int, kind: str, profile_id: int, tracks: List[Track]) -> PlaylistRecord:
"""Atomic replace of a playlist's track list + history append."""
now = datetime.now(timezone.utc).isoformat(timespec='seconds')
primary_source = next(
(t.source for t in tracks if t.source), None,
)
with self.database._get_connection() as conn:
cursor = conn.cursor()
try:
cursor.execute("BEGIN")
cursor.execute(
"DELETE FROM personalized_playlist_tracks WHERE playlist_id = ?",
(playlist_id,),
)
for position, track in enumerate(tracks):
td = track.track_data_json
if td is not None and not isinstance(td, str):
td = json.dumps(td)
cursor.execute(
"""
INSERT INTO personalized_playlist_tracks
(playlist_id, position,
spotify_track_id, itunes_track_id, deezer_track_id,
track_name, artist_name, album_name, album_cover_url,
duration_ms, popularity, track_data_json)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
playlist_id, position,
track.spotify_track_id, track.itunes_track_id, track.deezer_track_id,
track.track_name, track.artist_name, track.album_name, track.album_cover_url,
int(track.duration_ms or 0), int(track.popularity or 0), td,
),
)
cursor.execute(
"""
UPDATE personalized_playlists
SET track_count = ?, last_generated_at = CURRENT_TIMESTAMP,
last_generation_source = ?, last_generation_error = NULL,
is_stale = 0,
updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(len(tracks), primary_source, playlist_id),
)
# History append — only for tracks with a primary ID,
# used by exclude_recent_days filter on next refresh.
for track in tracks:
tid = track.primary_id()
if not tid:
continue
cursor.execute(
"""
INSERT INTO personalized_track_history
(profile_id, kind, track_id, served_at)
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
""",
(profile_id, kind, tid),
)
conn.commit()
except Exception:
conn.rollback()
raise
# Re-read the row so the returned record carries the fresh
# last_generated_at timestamp.
record = self._fetch_playlist_row_by_id(playlist_id)
if record is None:
raise RuntimeError(f"playlist row {playlist_id} disappeared mid-refresh")
return record
def _record_generation_failure(self, playlist_id: int, error_message: str) -> None:
"""Stamp the error on the row WITHOUT touching tracks."""
with self.database._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
UPDATE personalized_playlists
SET last_generation_error = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(error_message[:500], playlist_id),
)
conn.commit()
def _fetch_playlist_row(self, kind: str, variant: str, profile_id: int) -> Optional[PlaylistRecord]:
with self.database._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
SELECT id, profile_id, kind, variant, name, config_json,
track_count, last_generated_at, last_synced_at,
last_generation_source, last_generation_error,
is_stale
FROM personalized_playlists
WHERE profile_id = ? AND kind = ? AND variant = ?
""",
(profile_id, kind, variant),
)
row = cursor.fetchone()
return self._row_to_record(row) if row else None
def _fetch_playlist_row_by_id(self, playlist_id: int) -> Optional[PlaylistRecord]:
with self.database._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""
SELECT id, profile_id, kind, variant, name, config_json,
track_count, last_generated_at, last_synced_at,
last_generation_source, last_generation_error,
is_stale
FROM personalized_playlists
WHERE id = ?
""",
(playlist_id,),
)
row = cursor.fetchone()
return self._row_to_record(row) if row else None
@staticmethod
def _row_to_record(row: Any) -> PlaylistRecord:
# Tolerate sqlite3.Row + plain tuples.
if hasattr(row, 'keys'):
row = dict(row)
return PlaylistRecord(
id=row['id'], profile_id=row['profile_id'],
kind=row['kind'], variant=row['variant'] or '',
name=row['name'],
config=PlaylistConfig.from_json_dict(_safe_json_loads(row['config_json'])),
track_count=row['track_count'] or 0,
last_generated_at=row.get('last_generated_at'),
last_synced_at=row.get('last_synced_at'),
last_generation_source=row.get('last_generation_source'),
last_generation_error=row.get('last_generation_error'),
is_stale=bool(row.get('is_stale') or 0),
)
# Tuple form: positional access matches SELECT order above.
return PlaylistRecord(
id=row[0], profile_id=row[1],
kind=row[2], variant=row[3] or '',
name=row[4],
config=PlaylistConfig.from_json_dict(_safe_json_loads(row[5])),
track_count=row[6] or 0,
last_generated_at=row[7],
last_synced_at=row[8],
last_generation_source=row[9],
last_generation_error=row[10],
is_stale=bool(row[11] or 0) if len(row) > 11 else False,
)
@staticmethod
def _row_to_track(row: Any) -> Track:
if hasattr(row, 'keys'):
row = dict(row)
return Track(
spotify_track_id=row.get('spotify_track_id'),
itunes_track_id=row.get('itunes_track_id'),
deezer_track_id=row.get('deezer_track_id'),
track_name=row.get('track_name', ''),
artist_name=row.get('artist_name', ''),
album_name=row.get('album_name') or '',
album_cover_url=row.get('album_cover_url'),
duration_ms=int(row.get('duration_ms') or 0),
popularity=int(row.get('popularity') or 0),
track_data_json=_safe_json_loads(row.get('track_data_json')),
)
return Track(
spotify_track_id=row[0], itunes_track_id=row[1], deezer_track_id=row[2],
track_name=row[3], artist_name=row[4], album_name=row[5] or '',
album_cover_url=row[6], duration_ms=int(row[7] or 0),
popularity=int(row[8] or 0),
track_data_json=_safe_json_loads(row[9]),
)
def _safe_json_loads(value: Any) -> Any:
"""Tolerant JSON parse — returns None / dict / passes through
non-string values. Avoids exceptions on bad rows so the manager
never breaks on a single corrupt record."""
if value is None:
return None
if not isinstance(value, str):
return value
if not value.strip():
return None
try:
return json.loads(value)
except (ValueError, TypeError):
return None
__all__ = ['PersonalizedPlaylistManager']