#!/usr/bin/env python3 """ Personalized Playlists Service - Creates Spotify-quality personalized playlists from user's library and discovery pool """ from typing import List, Dict, Any, Optional, Tuple from datetime import datetime, timedelta from collections import Counter import random import json from utils.logging_config import get_logger logger = get_logger("personalized_playlists") class PersonalizedPlaylistsService: """Service for generating personalized playlists from library and discovery pool""" # Genre consolidation mapping - maps specific Spotify genres to broad parent categories GENRE_MAPPING = { 'Electronic/Dance': [ 'house', 'techno', 'trance', 'edm', 'electro', 'dubstep', 'drum and bass', 'breakbeat', 'jungle', 'dnb', 'bass', 'garage', 'uk garage', 'future bass', 'trap', 'hardstyle', 'hardcore', 'rave', 'dance', 'electronic', 'electronica', 'synth', 'downtempo', 'chillwave', 'vaporwave', 'synthwave', 'idm', 'glitch' ], 'Hip Hop/Rap': [ 'hip hop', 'rap', 'trap', 'drill', 'grime', 'boom bap', 'underground hip hop', 'conscious hip hop', 'gangsta rap', 'southern hip hop', 'east coast', 'west coast', 'crunk', 'hyphy', 'cloud rap', 'emo rap', 'mumble rap' ], 'Rock': [ 'rock', 'alternative rock', 'indie rock', 'garage rock', 'post-punk', 'punk', 'hard rock', 'psychedelic rock', 'progressive rock', 'art rock', 'glam rock', 'blues rock', 'southern rock', 'surf rock', 'rockabilly', 'grunge', 'shoegaze', 'noise rock', 'post-rock', 'math rock', 'emo', 'screamo' ], 'Pop': [ 'pop', 'dance pop', 'electropop', 'synth pop', 'indie pop', 'chamber pop', 'art pop', 'baroque pop', 'dream pop', 'power pop', 'bubblegum pop', 'k-pop', 'j-pop', 'hyperpop', 'pop rock', 'teen pop' ], 'R&B/Soul': [ 'r&b', 'soul', 'neo soul', 'contemporary r&b', 'alternative r&b', 'funk', 'disco', 'motown', 'northern soul', 'quiet storm', 'new jack swing' ], 'Jazz': [ 'jazz', 'bebop', 'cool jazz', 'hard bop', 'modal jazz', 'free jazz', 'fusion', 'jazz fusion', 'smooth jazz', 'contemporary jazz', 'latin jazz', 'afro-cuban jazz', 'swing', 'big band', 'ragtime', 'dixieland' ], 'Classical': [ 'classical', 'baroque', 'romantic', 'contemporary classical', 'minimalism', 'opera', 'orchestral', 'chamber music', 'choral', 'renaissance', 'medieval' ], 'Metal': [ 'metal', 'heavy metal', 'thrash metal', 'death metal', 'black metal', 'doom metal', 'power metal', 'progressive metal', 'metalcore', 'deathcore', 'djent', 'nu metal', 'industrial metal', 'symphonic metal', 'gothic metal' ], 'Country': [ 'country', 'bluegrass', 'americana', 'outlaw country', 'country rock', 'alt-country', 'contemporary country', 'traditional country', 'honky tonk', 'western', 'nashville sound' ], 'Folk/Indie': [ 'folk', 'indie folk', 'folk rock', 'freak folk', 'anti-folk', 'singer-songwriter', 'acoustic', 'indie', 'lo-fi', 'bedroom pop', 'slowcore', 'sadcore' ], 'Latin': [ 'latin', 'reggaeton', 'salsa', 'bachata', 'merengue', 'cumbia', 'banda', 'regional mexican', 'mariachi', 'ranchera', 'corrido', 'latin pop', 'latin trap', 'urbano latino', 'bossa nova', 'samba', 'tango' ], 'Reggae/Dancehall': [ 'reggae', 'dancehall', 'dub', 'roots reggae', 'ska', 'rocksteady', 'lovers rock', 'reggae fusion' ], 'World': [ 'afrobeat', 'afropop', 'african', 'world', 'worldbeat', 'ethnic', 'traditional', 'folk music', 'celtic', 'klezmer', 'flamenco', 'fado', 'indian classical', 'raga', 'qawwali', 'k-indie', 'j-indie' ], 'Alternative': [ 'alternative', 'experimental', 'avant-garde', 'noise', 'ambient', 'industrial', 'new wave', 'no wave', 'gothic', 'darkwave', 'coldwave', 'witch house', 'trip hop', 'downtempo' ], 'Blues': [ 'blues', 'delta blues', 'chicago blues', 'electric blues', 'blues rock', 'rhythm and blues', 'soul blues', 'gospel blues' ], 'Funk/Disco': [ 'funk', 'disco', 'p-funk', 'boogie', 'electro-funk', 'g-funk' ] } def __init__(self, database, spotify_client=None): self.database = database self.spotify_client = spotify_client def _get_active_source(self) -> str: """Determine which music source is active — delegates to centralized metadata_service.""" from core.metadata_service import get_primary_source return get_primary_source() def _get_popularity_thresholds(self, source: str) -> Tuple[Optional[int], Optional[int]]: """Return (popular_min, hidden_max) thresholds for the given source. Either value can be None to skip that filter for that source. - Spotify: 60 / 40 (the existing 0-100 popularity scale) - Deezer: 500000 / 100000 (rank values — ballpark from real data) - iTunes / others: None / None (no popularity data; fall back to no threshold filter, just diversity-on-random) Sources are normalized lowercase so 'Spotify'/'spotify' both match. """ normalized = (source or '').lower() if normalized == 'spotify': return 60, 40 if normalized == 'deezer': return 500_000, 100_000 # iTunes, hydrabase, anything else — no usable popularity data return None, None # Standard column set returned by every discovery_pool selector. # Callers can request additional columns via the `extra_columns` parameter # of `_select_discovery_tracks` (e.g. `release_date`, `artist_genres`). _STANDARD_DISCOVERY_COLUMNS: Tuple[str, ...] = ( 'spotify_track_id', 'itunes_track_id', 'deezer_track_id', 'track_name', 'artist_name', 'album_name', 'album_cover_url', 'duration_ms', 'popularity', 'track_data_json', 'source', ) def _select_discovery_tracks( self, *, source: str, extra_where: str = "", extra_params: tuple = (), order_by: str = "RANDOM()", fetch_limit: int, extra_columns: tuple = (), exclude_owned: bool = True, ) -> List[Dict]: """ Shared selector for discovery_pool playlist methods. Builds and runs a SELECT against `discovery_pool` with a baked-in ID-validity gate so callers cannot accidentally return rows with no usable source IDs (which would fail downstream when the user clicks download). The WHERE clause always includes: source = ? AND (spotify_track_id IS NOT NULL OR itunes_track_id IS NOT NULL OR deezer_track_id IS NOT NULL) AND LOWER(artist_name) NOT IN (SELECT LOWER(artist_name) FROM discovery_artist_blacklist) When `exclude_owned=True` (default) the WHERE additionally excludes any discovery_pool row whose IDs already match a row in the local `tracks` table — i.e. tracks the user already has in their library. Without this filter, Discovery / Hidden Gems / Popular Picks etc. would happily surface tracks the user owns. Note the column-name asymmetry: `tracks.deezer_id` ↔ `discovery_pool.deezer_track_id`. The ID gate is mandatory and not opt-out by design — if a future method needs to skip it, that's a design discussion, not a flag. Callers compose additional filters via `extra_where` (a SQL fragment beginning with "AND ...") and `extra_params` (positional bindings for any `?` placeholders inside `extra_where`). Diversity filtering is the caller's responsibility — apply `_apply_diversity_filter` to the returned list if needed. Args: source: discovery_pool.source value to filter on. extra_where: optional SQL fragment appended to the WHERE clause. Must start with "AND " if non-empty. extra_params: positional bindings for `?` placeholders in `extra_where`. order_by: ORDER BY expression, used as-is (e.g. "RANDOM()", "popularity DESC, RANDOM()"). fetch_limit: LIMIT applied to the query. Callers that intend to run a diversity filter should over-fetch (e.g. `limit * 3`). extra_columns: additional columns to SELECT beyond `_STANDARD_DISCOVERY_COLUMNS` (e.g. `('release_date',)`). exclude_owned: when True (default), filter out discovery rows whose IDs match any row in the local `tracks` table. Returns: List of track dicts via `_build_track_dict`. Returns `[]` on any error (logged at error level). """ try: columns = self._STANDARD_DISCOVERY_COLUMNS + tuple(extra_columns) select_cols = ",\n ".join(columns) owned_clause = "" if exclude_owned: # Note column-name asymmetry: discovery_pool.deezer_track_id # but tracks.deezer_id. Don't refactor without checking. owned_clause = """ AND NOT EXISTS ( SELECT 1 FROM tracks t WHERE (t.spotify_track_id IS NOT NULL AND t.spotify_track_id = discovery_pool.spotify_track_id) OR (t.itunes_track_id IS NOT NULL AND t.itunes_track_id = discovery_pool.itunes_track_id) OR (t.deezer_id IS NOT NULL AND t.deezer_id = discovery_pool.deezer_track_id) )""" query = f""" SELECT {select_cols} FROM discovery_pool WHERE source = ? AND (spotify_track_id IS NOT NULL OR itunes_track_id IS NOT NULL OR deezer_track_id IS NOT NULL) AND LOWER(artist_name) NOT IN (SELECT LOWER(artist_name) FROM discovery_artist_blacklist) {owned_clause} {extra_where} ORDER BY {order_by} LIMIT ? """ params = (source,) + tuple(extra_params) + (fetch_limit,) with self.database._get_connection() as conn: cursor = conn.cursor() cursor.execute(query, params) rows = cursor.fetchall() return [self._build_track_dict(row, source) for row in rows] except Exception as e: logger.error(f"Error in _select_discovery_tracks (source={source}): {e}") return [] def _apply_diversity_filter( self, tracks: List[Dict], *, max_per_album: int, max_per_artist: int, limit: int, ) -> List[Dict]: """ Apply per-album / per-artist diversity caps to a track list. Iterates `tracks` in order, accepting each only if its album count is still under `max_per_album` AND its artist count is still under `max_per_artist`. Stops once `limit` tracks have been accepted. Returns a new list, already trimmed to at most `limit` items. """ tracks_by_album: Dict[str, int] = {} tracks_by_artist: Dict[str, int] = {} diverse_tracks: List[Dict] = [] for track in tracks: album = track['album_name'] artist = track['artist_name'] album_count = tracks_by_album.get(album, 0) artist_count = tracks_by_artist.get(artist, 0) if album_count < max_per_album and artist_count < max_per_artist: diverse_tracks.append(track) tracks_by_album[album] = album_count + 1 tracks_by_artist[artist] = artist_count + 1 if len(diverse_tracks) >= limit: break return diverse_tracks def _compute_adaptive_diversity_limits( self, tracks: List[Dict], *, relaxed: bool = False, ) -> Tuple[int, int]: """ Pick (max_per_album, max_per_artist) caps based on artist variety. Mirrors the step-functions previously inlined in the decade and genre playlist methods. With more unique artists we can be strict; with fewer we relax to still hit the requested track count. When `relaxed=True` (used for genre playlists), the moderate and low bands use looser caps and an extra "very limited" tier kicks in below 5 unique artists. Args: tracks: candidate track list to inspect. relaxed: True to apply the looser genre-playlist limits. Returns: Tuple of (max_per_album, max_per_artist). """ unique_artists = len(set(t['artist_name'] for t in tracks)) if relaxed: # Genre playlist tiers if unique_artists >= 20: return 3, 5 if unique_artists >= 10: return 4, 10 if unique_artists >= 5: return 6, 15 return 8, 25 # Decade-style strict tiers if unique_artists >= 20: return 3, 5 if unique_artists >= 10: return 4, 8 return 5, 12 def _build_track_dict(self, row, source: str) -> Dict: """Build a standardized track dictionary from a database row. If the row carries the optional `artist_genres` column (selected via `_select_discovery_tracks(extra_columns=('artist_genres',))`), the raw JSON string is passed through under `_artist_genres_raw` so callers can run Python-side genre matching without re-querying the row. """ # Convert sqlite3.Row to dict if needed (Row objects don't support .get()) if hasattr(row, 'keys'): row = dict(row) track_data = row.get('track_data_json') if isinstance(track_data, str): try: track_data = json.loads(track_data) except: track_data = None result = { 'track_id': row.get('spotify_track_id') or row.get('itunes_track_id') or row.get('deezer_track_id'), 'spotify_track_id': row.get('spotify_track_id'), 'itunes_track_id': row.get('itunes_track_id'), 'deezer_track_id': row.get('deezer_track_id'), 'track_name': row.get('track_name', 'Unknown'), 'artist_name': row.get('artist_name', 'Unknown'), 'album_name': row.get('album_name', 'Unknown'), 'album_cover_url': row.get('album_cover_url'), 'duration_ms': row.get('duration_ms', 0), 'popularity': row.get('popularity', 0), 'track_data_json': track_data, 'source': source, } # Pass through optional extra columns under underscore-prefixed keys # so callers that requested them via `extra_columns` can use them # without having to re-query. if 'artist_genres' in row: result['_artist_genres_raw'] = row.get('artist_genres') if 'release_date' in row: result['_release_date'] = row.get('release_date') return result @staticmethod def get_parent_genre(spotify_genre: str) -> str: """ Map a specific Spotify genre to its parent category. Returns the parent genre or 'Other' if no match found. """ spotify_genre_lower = spotify_genre.lower() for parent_genre, keywords in PersonalizedPlaylistsService.GENRE_MAPPING.items(): for keyword in keywords: if keyword in spotify_genre_lower: return parent_genre return 'Other' def get_decade_playlist(self, decade: int, limit: int = 100, source: str = None) -> List[Dict]: """ Get tracks from a specific decade from discovery pool with diversity filtering. Args: decade: Decade year (e.g., 2020 for 2020s, 2010 for 2010s) limit: Maximum tracks to return source: Optional source filter ('spotify' or 'itunes'), auto-detects if not provided """ start_year = decade end_year = decade + 9 active_source = source or self._get_active_source() # Over-fetch 10x for diversity filtering headroom. all_tracks = self._select_discovery_tracks( source=active_source, extra_where=( "AND release_date IS NOT NULL " "AND CAST(SUBSTR(release_date, 1, 4) AS INTEGER) BETWEEN ? AND ?" ), extra_params=(start_year, end_year), order_by="RANDOM()", fetch_limit=limit * 10, extra_columns=('release_date',), ) if not all_tracks: logger.warning(f"No tracks found for {decade}s") return [] random.shuffle(all_tracks) max_per_album, max_per_artist = self._compute_adaptive_diversity_limits(all_tracks) unique_artists = len(set(t['artist_name'] for t in all_tracks)) logger.info( f"{decade}s has {unique_artists} unique artists - using limits: " f"{max_per_album} per album, {max_per_artist} per artist" ) diverse_tracks = self._apply_diversity_filter( all_tracks, max_per_album=max_per_album, max_per_artist=max_per_artist, limit=limit, ) logger.info(f"Found {len(diverse_tracks)} tracks from {decade}s in discovery pool (adaptive diversity)") return diverse_tracks def get_available_genres(self, source: str = None) -> List[Dict]: """ Get list of consolidated parent genres with track counts from discovery pool. Uses cached artist genres from database (populated during discovery scan). Consolidates specific Spotify genres into broader parent categories. """ try: # Determine active source if not specified active_source = source or self._get_active_source() with self.database._get_connection() as conn: cursor = conn.cursor() # Get all tracks with genres from discovery pool, filtered by source cursor.execute(""" SELECT artist_genres FROM discovery_pool WHERE artist_genres IS NOT NULL AND source = ? """, (active_source,)) rows = cursor.fetchall() if not rows: logger.warning(f"No genres found in discovery pool for source {active_source}") return [] # Count tracks per PARENT genre (consolidated) parent_genre_track_count = {} # {parent_genre: count} for row in rows: try: artist_genres_json = row[0] if artist_genres_json: genres = json.loads(artist_genres_json) # Map each Spotify genre to parent and count tracks mapped_parents = set() # Use set to avoid double-counting per track for genre in genres: parent_genre = self.get_parent_genre(genre) mapped_parents.add(parent_genre) # Add this track to all parent genres for parent_genre in mapped_parents: parent_genre_track_count[parent_genre] = parent_genre_track_count.get(parent_genre, 0) + 1 except Exception as e: logger.debug(f"Error parsing genres JSON: {e}") continue # Filter genres with at least 10 tracks and sort by count # Exclude 'Other' category available_genres = [ {'name': genre, 'track_count': count} for genre, count in parent_genre_track_count.items() if count >= 10 and genre != 'Other' ] available_genres.sort(key=lambda x: x['track_count'], reverse=True) logger.info(f"Found {len(available_genres)} consolidated genres with 10+ tracks") return available_genres[:20] # Top 20 parent genres except Exception as e: logger.error(f"Error getting available genres: {e}") return [] def get_genre_playlist(self, genre: str, limit: int = 50, source: str = None) -> List[Dict]: """ Get tracks from a specific genre with diversity filtering. Uses cached artist genres from database (populated during discovery scan). Supports both parent genres (e.g., "Electronic/Dance") and specific genres (e.g., "house"). The keyword match is pushed into SQL as an OR-chain of LIKE clauses on `artist_genres`, so we no longer have to over-fetch the entire pool to filter Python-side. SQLite's LIKE is case-insensitive for ASCII, matching the previous case-insensitive Python comparison. """ active_source = source or self._get_active_source() # Build keyword list: parent genre expands to all child keywords; # specific genre uses its own name for partial matching. if genre in self.GENRE_MAPPING: search_keywords = [k.lower() for k in self.GENRE_MAPPING[genre]] logger.info(f"Matching parent genre '{genre}' with {len(search_keywords)} child keywords") else: search_keywords = [genre.lower()] logger.info(f"Matching specific genre '{genre}' with partial matching") # Build the SQL keyword OR-chain. One LIKE per keyword, all OR'd # together inside a single parenthesized clause so the surrounding # AND structure isn't broken. like_clauses = " OR ".join(["artist_genres LIKE ?"] * len(search_keywords)) like_params = tuple(f"%{k}%" for k in search_keywords) # Over-fetch 10x for diversity filter headroom — the SQL filter # has already narrowed to genre matches, so this is bounded. all_tracks = self._select_discovery_tracks( source=active_source, extra_where=f"AND artist_genres IS NOT NULL AND ({like_clauses})", extra_params=like_params, order_by="RANDOM()", fetch_limit=limit * 10, extra_columns=('artist_genres',), ) if not all_tracks: logger.warning(f"No tracks found for genre: {genre}") return [] max_per_album, max_per_artist = self._compute_adaptive_diversity_limits(all_tracks, relaxed=True) unique_artists = len(set(t['artist_name'] for t in all_tracks)) logger.info( f"Genre '{genre}' has {unique_artists} artists, {len(all_tracks)} total tracks - " f"limits: {max_per_album}/album, {max_per_artist}/artist" ) diverse_tracks = self._apply_diversity_filter( all_tracks, max_per_album=max_per_album, max_per_artist=max_per_artist, limit=limit, ) logger.info(f"Found {len(diverse_tracks)} tracks for genre '{genre}'") return diverse_tracks # ======================================== # DISCOVERY POOL PLAYLISTS # ======================================== def get_popular_picks(self, limit: int = 50) -> List[Dict]: """Get high popularity tracks from discovery pool with diversity (max 2 per album, 3 per artist). Popularity threshold is source-aware via `_get_popularity_thresholds` — sources without a usable popularity scale (iTunes / Hydrabase) skip the threshold filter and fall back to RANDOM + diversity only. """ active_source = self._get_active_source() popular_min, _ = self._get_popularity_thresholds(active_source) if popular_min is not None: extra_where = "AND popularity >= ?" extra_params: tuple = (popular_min,) order_by = "popularity DESC, RANDOM()" else: extra_where = "" extra_params = () order_by = "RANDOM()" # Over-fetch 3x so the diversity filter has room to spread albums/artists. all_tracks = self._select_discovery_tracks( source=active_source, extra_where=extra_where, extra_params=extra_params, order_by=order_by, fetch_limit=limit * 3, ) diverse_tracks = self._apply_diversity_filter( all_tracks, max_per_album=2, max_per_artist=3, limit=limit, ) logger.info(f"Popular Picks ({active_source}): selected {len(diverse_tracks)} tracks with diversity") return diverse_tracks def get_hidden_gems(self, limit: int = 50) -> List[Dict]: """Get low-popularity (underground/indie) tracks from discovery pool with diversity. Mirrors Popular Picks' diversity caps (2 per album, 3 per artist) since both are curated-feel sections. Popularity threshold is source-aware — iTunes/Hydrabase fall back to RANDOM + diversity only. """ active_source = self._get_active_source() _, hidden_max = self._get_popularity_thresholds(active_source) if hidden_max is not None: extra_where = "AND popularity < ?" extra_params: tuple = (hidden_max,) else: extra_where = "" extra_params = () # Over-fetch 3x for diversity filter headroom. all_tracks = self._select_discovery_tracks( source=active_source, extra_where=extra_where, extra_params=extra_params, order_by="RANDOM()", fetch_limit=limit * 3, ) diverse_tracks = self._apply_diversity_filter( all_tracks, max_per_album=2, max_per_artist=3, limit=limit, ) logger.info(f"Hidden Gems ({active_source}): selected {len(diverse_tracks)} tracks with diversity") return diverse_tracks def get_discovery_shuffle(self, limit: int = 50) -> List[Dict]: """ Get random tracks from discovery pool - pure exploration. Different every time you call it! Tighter diversity than Popular Picks / Hidden Gems (2 per album, 2 per artist) since shuffle should feel maximally varied. """ active_source = self._get_active_source() # Over-fetch 3x for diversity filter headroom. all_tracks = self._select_discovery_tracks( source=active_source, order_by="RANDOM()", fetch_limit=limit * 3, ) diverse_tracks = self._apply_diversity_filter( all_tracks, max_per_album=2, max_per_artist=2, limit=limit, ) logger.info(f"Discovery Shuffle ({active_source}): selected {len(diverse_tracks)} tracks with diversity") return diverse_tracks # ======================================== # DAILY MIX (HYBRID PLAYLISTS) # ======================================== def get_top_genres_from_library(self, limit: int = 5) -> List[Tuple[str, int]]: """ Get top genres from user's library by track count. Returns: List of (genre_name, track_count) tuples """ try: # Get all genres from library tracks with self.database._get_connection() as conn: cursor = conn.cursor() # Try to get genres from tracks or albums cursor.execute("PRAGMA table_info(tracks)") columns = [row['name'] for row in cursor.fetchall()] if 'genres' in columns: # Get genres directly from tracks cursor.execute(""" SELECT genres FROM tracks WHERE genres IS NOT NULL """) rows = cursor.fetchall() # Parse genres (assuming JSON array or comma-separated) all_genres = [] for row in rows: genres_str = row['genres'] if genres_str: # Try JSON parse first try: import json genres = json.loads(genres_str) all_genres.extend(genres) except: # Fallback to comma-separated genres = [g.strip() for g in genres_str.split(',')] all_genres.extend(genres) # Count genres genre_counts = Counter(all_genres) return genre_counts.most_common(limit) else: # Fallback: use artist names as "genres" logger.warning("No genres column - using top artists as categories") cursor.execute(""" SELECT ar.name, COUNT(*) as count FROM tracks t LEFT JOIN artists ar ON t.artist_id = ar.id WHERE ar.name IS NOT NULL GROUP BY ar.name ORDER BY count DESC LIMIT ? """, (limit,)) rows = cursor.fetchall() return [(row['name'], row['count']) for row in rows] except Exception as e: logger.error(f"Error getting top genres: {e}") return [] def create_daily_mix(self, genre_or_artist: str, mix_number: int = 1) -> Dict[str, Any]: """ Create a Daily Mix playlist - hybrid of library + discovery pool. Strategy: - 50% tracks from user's library matching genre/artist - 50% tracks from discovery pool matching genre/artist Args: genre_or_artist: Genre name or artist name to base mix on mix_number: Mix number (1, 2, 3, etc.) Returns: Dict with playlist metadata and tracks """ try: logger.info(f"Creating Daily Mix #{mix_number} for: {genre_or_artist}") mix_size = 50 library_portion = mix_size // 2 # 25 tracks discovery_portion = mix_size - library_portion # 25 tracks # Get tracks from library library_tracks = self._get_library_tracks_by_category(genre_or_artist, library_portion) # Get tracks from discovery pool discovery_tracks = self._get_discovery_tracks_by_category(genre_or_artist, discovery_portion) # Combine and shuffle all_tracks = library_tracks + discovery_tracks random.shuffle(all_tracks) return { 'mix_number': mix_number, 'name': f"Daily Mix {mix_number}", 'description': f"{genre_or_artist} mix", 'category': genre_or_artist, 'track_count': len(all_tracks), 'tracks': all_tracks } except Exception as e: logger.error(f"Error creating daily mix: {e}") return { 'mix_number': mix_number, 'name': f"Daily Mix {mix_number}", 'description': 'Mix', 'category': genre_or_artist, 'track_count': 0, 'tracks': [] } def _get_library_tracks_by_category(self, category: str, limit: int) -> List[Dict]: """Library tracks intentionally excluded from daily mixes. The legacy ambition was 50% library + 50% discovery, but the ``tracks`` table doesn't carry source IDs (no ``spotify_track_id`` / ``itunes_track_id`` / ``deezer_track_id`` column) — so library rows can't flow through the same sync / wishlist pipeline that discovery tracks do. Returning them here would produce un-syncable, un-downloadable phantom entries. Returns ``[]`` so callers compose with ``_get_discovery_tracks_by_category`` for a discovery-only mix. A future PR can backfill source IDs into library rows and lift this restriction. """ return [] def _get_discovery_tracks_by_category(self, category: str, limit: int) -> List[Dict]: """Get tracks from discovery pool matching genre or artist""" # Determine active source active_source = self._get_active_source() try: with self.database._get_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT spotify_track_id, itunes_track_id, track_name, artist_name, album_name, album_cover_url, duration_ms, popularity, track_data_json, source FROM discovery_pool WHERE (artist_name LIKE ? OR track_name LIKE ?) AND source = ? AND LOWER(artist_name) NOT IN (SELECT LOWER(artist_name) FROM discovery_artist_blacklist) ORDER BY RANDOM() LIMIT ? """, (f'%{category}%', f'%{category}%', active_source, limit)) rows = cursor.fetchall() return [self._build_track_dict(row, active_source) for row in rows] except Exception as e: logger.error(f"Error getting discovery tracks by category: {e}") return [] def get_all_daily_mixes(self, max_mixes: int = 4) -> List[Dict]: """ Generate multiple Daily Mix playlists based on top genres/artists. Args: max_mixes: Maximum number of mixes to generate (default: 4) Returns: List of daily mix dictionaries """ try: # Get top categories (genres or artists) top_categories = self.get_top_genres_from_library(limit=max_mixes) if not top_categories: logger.warning("No categories found for Daily Mixes") return [] daily_mixes = [] for i, (category, _count) in enumerate(top_categories, 1): mix = self.create_daily_mix(category, mix_number=i) if mix['track_count'] > 0: daily_mixes.append(mix) logger.info(f"Created {len(daily_mixes)} Daily Mixes") return daily_mixes except Exception as e: logger.error(f"Error getting all daily mixes: {e}") return [] # ======================================== # BUILD A PLAYLIST (CUSTOM GENERATOR) # ======================================== def build_custom_playlist(self, seed_artist_ids: List[str], playlist_size: int = 50) -> Dict[str, Any]: """ Build a custom playlist from seed artists. Process: 1. Get similar artists for each seed artist (max 25 total) 2. Get albums from those similar artists 3. Select 20 random albums 4. Build playlist from tracks in those albums (max 50 tracks) Args: seed_artist_ids: List of 1-5 artist IDs (Spotify or iTunes) playlist_size: Maximum tracks in final playlist (default: 50) Returns: Dict with playlist metadata and tracks """ try: if not seed_artist_ids or len(seed_artist_ids) > 5: logger.error(f"Invalid seed artists count: {len(seed_artist_ids)}") return {'tracks': [], 'error': 'Must provide 1-5 seed artists'} active_source = self._get_active_source() use_spotify = (active_source == 'spotify') and self.spotify_client and self.spotify_client.sp logger.info(f"Building custom playlist from {len(seed_artist_ids)} seed artists (source: {active_source})") # Step 1: Get similar artists for each seed all_similar_artists = [] seen_artist_ids = set(seed_artist_ids) for seed_artist_id in seed_artist_ids: try: # Try database first (cached from MusicMap/watchlist scans) db_results = [] with self.database._get_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT similar_artist_spotify_id, similar_artist_itunes_id, similar_artist_deezer_id, similar_artist_musicbrainz_id, similar_artist_name FROM similar_artists WHERE source_artist_id = ? ORDER BY similarity_rank ASC LIMIT 10 """, (seed_artist_id,)) db_results = cursor.fetchall() if db_results: source_id_col = { 'spotify': 'similar_artist_spotify_id', 'itunes': 'similar_artist_itunes_id', 'deezer': 'similar_artist_deezer_id', 'musicbrainz': 'similar_artist_musicbrainz_id', }.get(active_source, 'similar_artist_itunes_id') for row in db_results: r = dict(row) artist_id = r.get(source_id_col) or r.get('similar_artist_spotify_id') or r.get('similar_artist_itunes_id') artist_name = r['similar_artist_name'] if artist_id and artist_id not in seen_artist_ids: all_similar_artists.append({'id': artist_id, 'name': artist_name}) seen_artist_ids.add(artist_id) if len(all_similar_artists) >= 25: break elif self.spotify_client and self.spotify_client.sp: # Fallback: fetch related artists from Spotify API (no Deezer/iTunes equivalent) logger.info(f"No cached similar artists for {seed_artist_id}, trying Spotify related artists API") try: related = self.spotify_client.sp.artist_related_artists(seed_artist_id) if related and 'artists' in related: for artist in related['artists'][:10]: artist_id = artist['id'] if artist_id not in seen_artist_ids: all_similar_artists.append({'id': artist_id, 'name': artist['name']}) seen_artist_ids.add(artist_id) if len(all_similar_artists) >= 25: break except Exception as e2: logger.warning(f"Spotify related artists fallback failed for {seed_artist_id}: {e2}") if len(all_similar_artists) >= 25: break except Exception as e: logger.warning(f"Error getting similar artists for {seed_artist_id}: {e}") continue logger.info(f"Found {len(all_similar_artists)} similar artists") # Always include seed artists alongside similar artists # so the playlist has tracks from both the selected and discovered artists artists_for_albums = [{'id': sid, 'name': '', 'is_seed': True} for sid in seed_artist_ids] for sa in all_similar_artists[:22]: # Cap similar to leave room for seeds artists_for_albums.append({**sa, 'is_seed': False}) # Step 2: Get albums from seed + similar artists all_albums = [] if use_spotify: for artist in artists_for_albums: try: albums = self.spotify_client.get_artist_albums( artist['id'], album_type='album,single', limit=10 ) if albums: all_albums.extend(albums) import time time.sleep(0.3) except Exception as e: logger.warning(f"Error getting albums for {artist.get('name', artist['id'])}: {e}") continue else: from core.metadata_service import get_primary_client itunes = get_primary_client() for artist in artists_for_albums: try: albums = itunes.get_artist_albums(artist['id'], limit=10) if albums: all_albums.extend(albums) import time time.sleep(0.3) except Exception as e: logger.warning(f"Error getting albums for {artist.get('name', artist['id'])}: {e}") continue logger.info(f"Found {len(all_albums)} total albums") if not all_albums: return {'tracks': [], 'error': 'No albums found for the selected artists'} # Step 3: Select 20 random albums random.shuffle(all_albums) selected_albums = all_albums[:20] logger.info(f"Selected {len(selected_albums)} random albums") # Step 4: Build playlist from tracks in those albums all_tracks = [] if use_spotify: for album in selected_albums: try: album_data = self.spotify_client.get_album(album.id) if album_data and 'tracks' in album_data: tracks = album_data['tracks'].get('items', []) for track in tracks: if track['id']: all_tracks.append({ 'spotify_track_id': track['id'], 'track_name': track['name'], 'artist_name': ', '.join([a['name'] for a in track.get('artists', [])]), 'album_name': album_data.get('name', 'Unknown'), 'album_cover_url': album_data.get('images', [{}])[0].get('url') if album_data.get('images') else None, 'duration_ms': track.get('duration_ms', 0), 'popularity': album_data.get('popularity', 0), 'id': track['id'], 'name': track['name'], 'artists': [a['name'] for a in track.get('artists', [])], 'album': { 'name': album_data.get('name', 'Unknown'), 'images': album_data.get('images', []) } }) import time time.sleep(0.3) except Exception as e: logger.warning(f"Error getting tracks from album: {e}") continue else: from core.metadata_service import get_primary_client itunes = get_primary_client() for album in selected_albums: try: album_data = itunes.get_album(album.id, include_tracks=True) if album_data and 'tracks' in album_data: tracks = album_data['tracks'].get('items', []) album_name = album_data.get('name', 'Unknown') album_images = album_data.get('images', []) album_cover = album_images[0].get('url') if album_images else None for track in tracks: track_id = track.get('id', '') if track_id: # iTunes artists are [{'name': '...'}] dicts track_artists = track.get('artists', []) artist_names = [a['name'] for a in track_artists] if isinstance(track_artists, list) and track_artists and isinstance(track_artists[0], dict) else (track_artists if isinstance(track_artists, list) else []) all_tracks.append({ 'spotify_track_id': track_id, 'track_name': track.get('name', ''), 'artist_name': ', '.join(artist_names) if artist_names else 'Unknown', 'album_name': album_name, 'album_cover_url': album_cover, 'duration_ms': track.get('duration_ms', 0), 'popularity': 0, 'id': track_id, 'name': track.get('name', ''), 'artists': artist_names, 'album': { 'name': album_name, 'images': album_images } }) import time time.sleep(0.3) except Exception as e: logger.warning(f"Error getting tracks from album: {e}") continue logger.info(f"Collected {len(all_tracks)} total tracks") if not all_tracks: return {'tracks': [], 'error': 'No tracks found'} # Shuffle and limit to playlist_size random.shuffle(all_tracks) final_tracks = all_tracks[:playlist_size] logger.info(f"Built custom playlist with {len(final_tracks)} tracks") return { 'name': 'Custom Playlist', 'description': f'Built from {len(seed_artist_ids)} seed artists', 'track_count': len(final_tracks), 'tracks': final_tracks, 'metadata': { 'total_tracks': len(final_tracks), 'similar_artists_count': len(all_similar_artists), 'albums_count': len(selected_albums) } } except Exception as e: logger.error(f"Error building custom playlist: {e}") import traceback traceback.print_exc() return {'tracks': [], 'error': str(e)} # Singleton instance _personalized_playlists_instance = None def get_personalized_playlists_service(database, spotify_client=None): """Get the global personalized playlists service instance""" global _personalized_playlists_instance if _personalized_playlists_instance is None: _personalized_playlists_instance = PersonalizedPlaylistsService(database, spotify_client) return _personalized_playlists_instance